X-Git-Url: http://git.cascardo.eti.br/?a=blobdiff_plain;f=tests%2Fhelpers%2Fhttp.py;h=69d40a50dfcb8822f9e6b66ee8246a08b8a45bb1;hb=264c3d054f68a0729085961464f0c8aa44f52cc6;hp=2478e2a590093719953067ee040e9c2d3bff3f8c;hpb=800b39df6e2c65fa06a0d5da48002bb26b83b435;p=cascardo%2Fipsilon.git diff --git a/tests/helpers/http.py b/tests/helpers/http.py index 2478e2a..69d40a5 100755 --- a/tests/helpers/http.py +++ b/tests/helpers/http.py @@ -1,22 +1,6 @@ #!/usr/bin/python # -# Copyright (C) 2014 Simo Sorce -# -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - +# Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING from lxml import html import requests @@ -24,6 +8,7 @@ import string import urlparse import json from urllib import urlencode +from requests_kerberos import HTTPKerberosAuth, OPTIONAL class WrongPage(Exception): @@ -45,7 +30,7 @@ class PageTree(object): def first_value(self, rule): result = self.tree.xpath(rule) - if type(result) is list: + if isinstance(result, list): if len(result) > 0: result = result[0] else: @@ -54,7 +39,7 @@ class PageTree(object): def all_values(self, rule): result = self.tree.xpath(rule) - if type(result) is list: + if isinstance(result, list): return result return [result] @@ -89,18 +74,26 @@ class HttpSessions(object): raise ValueError("Unknown URL: %s" % url) - def get(self, url, **kwargs): + def get(self, url, krb=False, **kwargs): session = self.get_session(url) - return session.get(url, allow_redirects=False, **kwargs) + allow_redirects = False + if krb: + # python-requests-kerberos isn't too bright about doing mutual + # authentication and it tries to do it on any non-401 response + # which doesn't work in our case since we follow redirects. + kerberos_auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL) + kwargs['auth'] = kerberos_auth + allow_redirects = True + return session.get(url, allow_redirects=allow_redirects, **kwargs) def post(self, url, **kwargs): session = self.get_session(url) return session.post(url, allow_redirects=False, **kwargs) - def access(self, action, url, **kwargs): + def access(self, action, url, krb=False, **kwargs): action = string.lower(action) if action == 'get': - return self.get(url, **kwargs) + return self.get(url, krb, **kwargs) elif action == 'post': return self.post(url, **kwargs) else: @@ -113,19 +106,22 @@ class HttpSessions(object): return action def get_form_data(self, page, form_id, input_fields): + form_selector = '//form' + if form_id: + form_selector += '[@id="%s"]' % form_id values = [] - action = page.first_value('//form[@id="%s"]/@action' % form_id) + action = page.first_value('%s/@action' % form_selector) values.append(action) - method = page.first_value('//form[@id="%s"]/@method' % form_id) + method = page.first_value('%s/@method' % form_selector) values.append(method) for field in input_fields: - value = page.all_values('//form[@id="%s"]/input/@%s' % (form_id, - field)) + value = page.all_values('%s/input/@%s' % (form_selector, + field)) values.append(value) return values def handle_login_form(self, idp, page): - if type(page) != PageTree: + if not isinstance(page, PageTree): raise TypeError("Expected PageTree object") srv = self.servers[idp] @@ -155,7 +151,7 @@ class HttpSessions(object): {'headers': headers, 'data': payload}] def handle_return_form(self, page): - if type(page) != PageTree: + if not isinstance(page, PageTree): raise TypeError("Expected PageTree object") try: @@ -180,19 +176,97 @@ class HttpSessions(object): return [method, self.new_url(referer, action_url), {'headers': headers, 'data': payload}] - def fetch_page(self, idp, target_url, follow_redirect=True): + def handle_openid_form(self, page): + if not isinstance(page, PageTree): + raise TypeError("Expected PageTree object") + + if not page.first_value('//title/text()') == \ + 'OpenID transaction in progress': + raise WrongPage('Not OpenID autosubmit form') + + try: + results = self.get_form_data(page, None, + ["name", "value"]) + action_url = results[0] + if action_url is None: + raise Exception + method = results[1] + names = results[2] + values = results[3] + except Exception: # pylint: disable=broad-except + raise WrongPage("Not OpenID autosubmit form") + + referer = page.make_referer() + headers = {'referer': referer} + + payload = {} + for i in range(0, len(names)): + payload[names[i]] = values[i] + + return [method, self.new_url(referer, action_url), + {'headers': headers, 'data': payload}] + + def handle_openid_consent_form(self, page): + if not isinstance(page, PageTree): + raise TypeError("Expected PageTree object") + + try: + results = self.get_form_data(page, "consent_form", + ['name', 'value']) + action_url = results[0] + if action_url is None: + raise Exception + method = results[1] + names = results[2] + values = results[3] + except Exception: # pylint: disable=broad-except + raise WrongPage("Not an OpenID Consent Form Page") + + referer = page.make_referer() + headers = {'referer': referer} + + payload = {} + for i in range(0, len(names)): + payload[names[i]] = values[i] + + # Replace known values + payload['decided_allow'] = 'Allow' + + return [method, self.new_url(referer, action_url), + {'headers': headers, 'data': payload}] + + def fetch_page(self, idp, target_url, follow_redirect=True, krb=False): + """ + Fetch a page and parse the response code to determine what to do + next. + + The login process consists of redirections (302/303) and + potentially an unauthorized (401). For the case of unauthorized + try the page returned in case of fallback authentication. + """ url = target_url action = 'get' args = {} while True: - r = self.access(action, url, **args) # pylint: disable=star-args - if r.status_code == 303: + r = self.access(action, url, krb=krb, **args) + if r.status_code == 303 or r.status_code == 302: if not follow_redirect: return PageTree(r) url = r.headers['location'] action = 'get' args = {} + elif r.status_code == 401: + page = PageTree(r) + if r.headers.get('WWW-Authenticate', None) is None: + return page + + # Fall back, hopefully to testauth authentication. + try: + (action, url, args) = self.handle_login_form(idp, page) + continue + except WrongPage: + pass elif r.status_code == 200: page = PageTree(r) @@ -208,18 +282,30 @@ class HttpSessions(object): except WrongPage: pass + try: + (action, url, args) = self.handle_openid_consent_form(page) + continue + except WrongPage: + pass + + try: + (action, url, args) = self.handle_openid_form(page) + continue + except WrongPage: + pass + # Either we got what we wanted, or we have to stop anyway return page else: raise ValueError("Unhandled status (%d) on url %s" % ( r.status_code, url)) - def auth_to_idp(self, idp): + def auth_to_idp(self, idp, krb=False, rule=None, expected=None): srv = self.servers[idp] target_url = '%s/%s/' % (srv['baseuri'], idp) - r = self.access('get', target_url) + r = self.access('get', target_url, krb=krb) if r.status_code != 200: raise ValueError("Access to idp failed: %s" % repr(r)) @@ -228,9 +314,14 @@ class HttpSessions(object): href = page.first_value('//div[@id="content"]/p/a/@href') url = self.new_url(target_url, href) - page = self.fetch_page(idp, url) - page.expected_value('//div[@id="welcome"]/p/text()', - 'Welcome %s!' % srv['user']) + page = self.fetch_page(idp, url, krb=krb) + + if rule is None: + rule = '//div[@id="welcome"]/p/text()' + if expected is None: + expected = 'Welcome %s!' % srv['user'] + + page.expected_value(rule, expected) def logout_from_idp(self, idp): @@ -251,22 +342,25 @@ class HttpSessions(object): def add_sp_metadata(self, idp, sp, rest=False): expected_status = 200 - idpsrv = self.servers[idp] (idpuri, m) = self.get_sp_metadata(idp, sp) url = '%s/%s/admin/providers/saml2/admin/new' % (idpuri, idp) headers = {'referer': url} if rest: expected_status = 201 - payload = {'metadata': m.content} + payload = { + 'metadata': m.content, + 'visible': True, + 'description': sp, + 'image': 'Zm9v', + 'splink': 'http://test.example.com/secret/', + } headers['content-type'] = 'application/x-www-form-urlencoded' url = '%s/%s/rest/providers/saml2/SPS/%s' % (idpuri, idp, sp) - r = idpsrv['session'].post(url, headers=headers, - data=urlencode(payload)) + r = self.post(url, headers=headers, data=urlencode(payload)) else: metafile = {'metafile': m.content} payload = {'name': sp} - r = idpsrv['session'].post(url, headers=headers, - data=payload, files=metafile) + r = self.post(url, headers=headers, data=payload, files=metafile) if r.status_code != expected_status: raise ValueError('Failed to post SP data [%s]' % repr(r))