3 # Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING
10 from urllib import urlencode
11 from requests_kerberos import HTTPKerberosAuth, OPTIONAL
14 class WrongPage(Exception):
18 class PageTree(object):
20 def __init__(self, result):
22 self.text = result.text
27 if self._tree is None:
28 self._tree = html.fromstring(self.text)
31 def first_value(self, rule):
32 result = self.tree.xpath(rule)
33 if isinstance(result, list):
40 def all_values(self, rule):
41 result = self.tree.xpath(rule)
42 if isinstance(result, list):
46 def make_referer(self):
47 return self.result.url
49 def expected_value(self, rule, expected):
50 value = self.first_value(rule)
52 raise ValueError("Expected [%s], got [%s]" % (expected, value))
55 class HttpSessions(object):
60 def add_server(self, name, baseuri, user=None, pwd=None):
61 new = {'baseuri': baseuri,
62 'session': requests.Session()}
67 self.servers[name] = new
69 def get_session(self, url):
70 for srv in self.servers:
72 if url.startswith(d['baseuri']):
75 raise ValueError("Unknown URL: %s" % url)
77 def get(self, url, krb=False, **kwargs):
78 session = self.get_session(url)
79 allow_redirects = False
81 # python-requests-kerberos isn't too bright about doing mutual
82 # authentication and it tries to do it on any non-401 response
83 # which doesn't work in our case since we follow redirects.
84 kerberos_auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL)
85 kwargs['auth'] = kerberos_auth
86 allow_redirects = True
87 return session.get(url, allow_redirects=allow_redirects, **kwargs)
89 def post(self, url, **kwargs):
90 session = self.get_session(url)
91 return session.post(url, allow_redirects=False, **kwargs)
93 def access(self, action, url, krb=False, **kwargs):
94 action = string.lower(action)
96 return self.get(url, krb, **kwargs)
97 elif action == 'post':
98 return self.post(url, **kwargs)
100 raise ValueError("Unknown action type: [%s]" % action)
102 def new_url(self, referer, action):
103 if action.startswith('/'):
104 u = urlparse.urlparse(referer)
105 return '%s://%s%s' % (u.scheme, u.netloc, action)
108 def get_form_data(self, page, form_id, input_fields):
109 form_selector = '//form'
111 form_selector += '[@id="%s"]' % form_id
113 action = page.first_value('%s/@action' % form_selector)
114 values.append(action)
115 method = page.first_value('%s/@method' % form_selector)
116 values.append(method)
117 for field in input_fields:
118 value = page.all_values('%s/input/@%s' % (form_selector,
123 def handle_login_form(self, idp, page):
124 if not isinstance(page, PageTree):
125 raise TypeError("Expected PageTree object")
127 srv = self.servers[idp]
130 results = self.get_form_data(page, "login_form", ["name", "value"])
131 action_url = results[0]
135 if action_url is None:
137 except Exception: # pylint: disable=broad-except
138 raise WrongPage("Not a Login Form Page")
140 referer = page.make_referer()
141 headers = {'referer': referer}
143 for i in range(0, len(names)):
144 payload[names[i]] = values[i]
146 # replace known values
147 payload['login_name'] = srv['user']
148 payload['login_password'] = srv['pwd']
150 return [method, self.new_url(referer, action_url),
151 {'headers': headers, 'data': payload}]
153 def handle_return_form(self, page):
154 if not isinstance(page, PageTree):
155 raise TypeError("Expected PageTree object")
158 results = self.get_form_data(page, "saml-response",
160 action_url = results[0]
161 if action_url is None:
166 except Exception: # pylint: disable=broad-except
167 raise WrongPage("Not a Return Form Page")
169 referer = page.make_referer()
170 headers = {'referer': referer}
173 for i in range(0, len(names)):
174 payload[names[i]] = values[i]
176 return [method, self.new_url(referer, action_url),
177 {'headers': headers, 'data': payload}]
179 def handle_openid_form(self, page):
180 if not isinstance(page, PageTree):
181 raise TypeError("Expected PageTree object")
183 if not page.first_value('//title/text()') == \
184 'OpenID transaction in progress':
185 raise WrongPage('Not OpenID autosubmit form')
188 results = self.get_form_data(page, None,
190 action_url = results[0]
191 if action_url is None:
196 except Exception: # pylint: disable=broad-except
197 raise WrongPage("Not OpenID autosubmit form")
199 referer = page.make_referer()
200 headers = {'referer': referer}
203 for i in range(0, len(names)):
204 payload[names[i]] = values[i]
206 return [method, self.new_url(referer, action_url),
207 {'headers': headers, 'data': payload}]
209 def handle_openid_consent_form(self, page):
210 if not isinstance(page, PageTree):
211 raise TypeError("Expected PageTree object")
214 results = self.get_form_data(page, "consent_form",
216 action_url = results[0]
217 if action_url is None:
222 except Exception: # pylint: disable=broad-except
223 raise WrongPage("Not an OpenID Consent Form Page")
225 referer = page.make_referer()
226 headers = {'referer': referer}
229 for i in range(0, len(names)):
230 payload[names[i]] = values[i]
232 # Replace known values
233 payload['decided_allow'] = 'Allow'
235 return [method, self.new_url(referer, action_url),
236 {'headers': headers, 'data': payload}]
238 def fetch_page(self, idp, target_url, follow_redirect=True, krb=False):
240 Fetch a page and parse the response code to determine what to do
243 The login process consists of redirections (302/303) and
244 potentially an unauthorized (401). For the case of unauthorized
245 try the page returned in case of fallback authentication.
252 r = self.access(action, url, krb=krb, **args)
253 if r.status_code == 303 or r.status_code == 302:
254 if not follow_redirect:
256 url = r.headers['location']
259 elif r.status_code == 401:
261 if r.headers.get('WWW-Authenticate', None) is None:
264 # Fall back, hopefully to testauth authentication.
266 (action, url, args) = self.handle_login_form(idp, page)
270 elif r.status_code == 200:
274 (action, url, args) = self.handle_login_form(idp, page)
280 (action, url, args) = self.handle_return_form(page)
286 (action, url, args) = self.handle_openid_consent_form(page)
292 (action, url, args) = self.handle_openid_form(page)
297 # Either we got what we wanted, or we have to stop anyway
300 raise ValueError("Unhandled status (%d) on url %s" % (
303 def auth_to_idp(self, idp, krb=False, rule=None, expected=None):
305 srv = self.servers[idp]
306 target_url = '%s/%s/' % (srv['baseuri'], idp)
308 r = self.access('get', target_url, krb=krb)
309 if r.status_code != 200:
310 raise ValueError("Access to idp failed: %s" % repr(r))
313 page.expected_value('//div[@id="content"]/p/a/text()', 'Log In')
314 href = page.first_value('//div[@id="content"]/p/a/@href')
315 url = self.new_url(target_url, href)
317 page = self.fetch_page(idp, url, krb=krb)
320 rule = '//div[@id="welcome"]/p/text()'
322 expected = 'Welcome %s!' % srv['user']
324 page.expected_value(rule, expected)
326 def logout_from_idp(self, idp):
328 srv = self.servers[idp]
329 target_url = '%s/%s/logout' % (srv['baseuri'], idp)
331 r = self.access('get', target_url)
332 if r.status_code != 200:
333 raise ValueError("Logout from idp failed: %s" % repr(r))
335 def get_sp_metadata(self, idp, sp):
336 idpsrv = self.servers[idp]
337 idpuri = idpsrv['baseuri']
339 spuri = self.servers[sp]['baseuri']
341 return (idpuri, requests.get('%s/saml2/metadata' % spuri))
343 def add_sp_metadata(self, idp, sp, rest=False):
344 expected_status = 200
345 (idpuri, m) = self.get_sp_metadata(idp, sp)
346 url = '%s/%s/admin/providers/saml2/admin/new' % (idpuri, idp)
347 headers = {'referer': url}
349 expected_status = 201
350 payload = {'metadata': m.content}
351 headers['content-type'] = 'application/x-www-form-urlencoded'
352 url = '%s/%s/rest/providers/saml2/SPS/%s' % (idpuri, idp, sp)
353 r = self.post(url, headers=headers, data=urlencode(payload))
355 metafile = {'metafile': m.content}
356 payload = {'name': sp}
357 r = self.post(url, headers=headers, data=payload, files=metafile)
358 if r.status_code != expected_status:
359 raise ValueError('Failed to post SP data [%s]' % repr(r))
363 page.expected_value('//div[@class="alert alert-success"]/p/text()',
364 'SP Successfully added')
366 def set_sp_default_nameids(self, idp, sp, nameids):
368 nameids is a list of Name ID formats to enable
370 idpsrv = self.servers[idp]
371 idpuri = idpsrv['baseuri']
372 url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (idpuri, idp, sp)
373 headers = {'referer': url}
374 headers['content-type'] = 'application/x-www-form-urlencoded'
375 payload = {'submit': 'Submit',
376 'allowed_nameids': ', '.join(nameids)}
377 r = idpsrv['session'].post(url, headers=headers,
379 if r.status_code != 200:
380 raise ValueError('Failed to post SP data [%s]' % repr(r))
382 # pylint: disable=dangerous-default-value
383 def set_attributes_and_mapping(self, idp, mapping=[], attrs=[],
386 Set allowed attributes and mapping in the IDP or the SP. In the
387 case of the SP both allowed attributes and the mapping need to
388 be provided. An empty option for either means delete all values.
390 mapping is a list of list of rules of the form:
391 [['from-1', 'to-1'], ['from-2', 'from-2']]
393 ex. [['*', '*'], ['fullname', 'namefull']]
395 attrs is the list of attributes that will be allowed:
396 ['fullname', 'givenname', 'surname']
398 idpsrv = self.servers[idp]
399 idpuri = idpsrv['baseuri']
400 if spname: # per-SP setting
401 url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (
403 mapname = 'Attribute Mapping'
404 attrname = 'Allowed Attributes'
405 else: # global default
406 url = '%s/%s/admin/providers/saml2' % (idpuri, idp)
407 mapname = 'default attribute mapping'
408 attrname = 'default allowed attributes'
410 headers = {'referer': url}
411 headers['content-type'] = 'application/x-www-form-urlencoded'
412 payload = {'submit': 'Submit'}
415 payload['%s %s-from' % (mapname, count)] = m[0]
416 payload['%s %s-to' % (mapname, count)] = m[1]
420 payload['%s %s-name' % (attrname, count)] = attr
422 r = idpsrv['session'].post(url, headers=headers,
424 if r.status_code != 200:
425 raise ValueError('Failed to post IDP data [%s]' % repr(r))
427 def fetch_rest_page(self, idpname, uri):
429 idpname - the name of the IDP to fetch the page from
430 uri - the URI of the page to retrieve
432 The URL for the request is built from known-information in
435 returns dict if successful
436 returns ValueError if the output is unparseable
438 baseurl = self.servers[idpname].get('baseuri')
439 page = self.fetch_page(
441 '%s%s' % (baseurl, uri)
443 return json.loads(page.text)
445 def get_rest_sp(self, idpname, spname=None):
447 uri = '/%s/rest/providers/saml2/SPS/' % idpname
449 uri = '/%s/rest/providers/saml2/SPS/%s' % (idpname, spname)
451 return self.fetch_rest_page(idpname, uri)