3 # Copyright (C) 2014 Simo Sorce <simo@redhat.com>
5 # see file 'COPYING' for use and warranty information
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
26 from urllib import urlencode
27 from requests_kerberos import HTTPKerberosAuth, OPTIONAL
30 class WrongPage(Exception):
34 class PageTree(object):
36 def __init__(self, result):
38 self.text = result.text
43 if self._tree is None:
44 self._tree = html.fromstring(self.text)
47 def first_value(self, rule):
48 result = self.tree.xpath(rule)
49 if type(result) is list:
56 def all_values(self, rule):
57 result = self.tree.xpath(rule)
58 if type(result) is list:
def make_referer(self):
    """Return the URL to use as the Referer for follow-up requests.

    The value is the ``url`` attribute of the response object kept on
    this page as ``self.result``.
    """
    response = self.result
    return response.url
def expected_value(self, rule, expected):
    """Verify that the first match of an XPath rule equals ``expected``.

    rule - XPath expression handed to first_value()
    expected - value the first match has to compare equal to

    Returns None when the values agree.
    Raises ValueError when they differ.
    """
    found = self.first_value(rule)
    if found != expected:
        message = "Expected [%s], got [%s]" % (expected, found)
        raise ValueError(message)
71 class HttpSessions(object):
76 def add_server(self, name, baseuri, user=None, pwd=None):
77 new = {'baseuri': baseuri,
78 'session': requests.Session()}
83 self.servers[name] = new
def get_session(self, url):
    """Return the session of the registered server that owns ``url``.

    url - absolute URL of the request about to be made

    The entries of self.servers are checked in iteration order; the
    first one whose 'baseuri' is a prefix of url supplies its 'session'.

    Raises ValueError if no registered base URI is a prefix of url.
    """
    for server in self.servers.values():
        if url.startswith(server['baseuri']):
            return server['session']
    raise ValueError("Unknown URL: %s" % url)
def get(self, url, krb=False, **kwargs):
    """Issue a GET request through the session that owns ``url``.

    url - absolute URL; resolved to a session with get_session()
    krb - when true, attach Kerberos authentication to the request and
          let the session follow redirects; otherwise redirects are
          not followed
    kwargs - passed through to the session's get()

    Returns whatever the session's get() returns.
    """
    session = self.get_session(url)
    if not krb:
        return session.get(url, allow_redirects=False, **kwargs)

    # python-requests-kerberos isn't too bright about doing mutual
    # authentication and it tries to do it on any non-401 response
    # which doesn't work in our case since we follow redirects.
    # Mutual authentication is therefore only OPTIONAL.
    kwargs['auth'] = HTTPKerberosAuth(mutual_authentication=OPTIONAL)
    return session.get(url, allow_redirects=True, **kwargs)
def post(self, url, **kwargs):
    """Issue a POST request through the session that owns ``url``.

    url - absolute URL; resolved to a session with get_session()
    kwargs - passed through to the session's post()

    Redirects are never followed. Returns whatever the session's
    post() returns.
    """
    http = self.get_session(url)
    response = http.post(url, allow_redirects=False, **kwargs)
    return response
def access(self, action, url, krb=False, **kwargs):
    """Dispatch an HTTP request according to a method name.

    action - 'get' or 'post', matched case-insensitively
    url - target URL
    krb - forwarded to get(); not used for 'post'
    kwargs - forwarded to the selected method

    Returns the result of get() or post().
    Raises ValueError for any other action.
    """
    # str.lower() instead of string.lower(): the module-level function
    # only exists on Python 2, the method works everywhere.
    action = action.lower()
    if action == 'get':
        return self.get(url, krb, **kwargs)
    if action == 'post':
        return self.post(url, **kwargs)
    raise ValueError("Unknown action type: [%s]" % action)
def new_url(self, referer, action):
    """Turn a form action into the URL to submit to.

    referer - URL of the page the action was taken from
    action - value of the action attribute

    An action starting with '/' is combined with the scheme and network
    location of referer; any other action is returned unchanged.
    """
    if not action.startswith('/'):
        return action
    origin = urlparse.urlparse(referer)
    return '%s://%s%s' % (origin.scheme, origin.netloc, action)
124 def get_form_data(self, page, form_id, input_fields):
125 form_selector = '//form'
127 form_selector += '[@id="%s"]' % form_id
129 action = page.first_value('%s/@action' % form_selector)
130 values.append(action)
131 method = page.first_value('%s/@method' % form_selector)
132 values.append(method)
133 for field in input_fields:
134 value = page.all_values('%s/input/@%s' % (form_selector,
139 def handle_login_form(self, idp, page):
140 if type(page) != PageTree:
141 raise TypeError("Expected PageTree object")
143 srv = self.servers[idp]
146 results = self.get_form_data(page, "login_form", ["name", "value"])
147 action_url = results[0]
151 if action_url is None:
153 except Exception: # pylint: disable=broad-except
154 raise WrongPage("Not a Login Form Page")
156 referer = page.make_referer()
157 headers = {'referer': referer}
159 for i in range(0, len(names)):
160 payload[names[i]] = values[i]
162 # replace known values
163 payload['login_name'] = srv['user']
164 payload['login_password'] = srv['pwd']
166 return [method, self.new_url(referer, action_url),
167 {'headers': headers, 'data': payload}]
169 def handle_return_form(self, page):
170 if type(page) != PageTree:
171 raise TypeError("Expected PageTree object")
174 results = self.get_form_data(page, "saml-response",
176 action_url = results[0]
177 if action_url is None:
182 except Exception: # pylint: disable=broad-except
183 raise WrongPage("Not a Return Form Page")
185 referer = page.make_referer()
186 headers = {'referer': referer}
189 for i in range(0, len(names)):
190 payload[names[i]] = values[i]
192 return [method, self.new_url(referer, action_url),
193 {'headers': headers, 'data': payload}]
195 def handle_openid_form(self, page):
196 if type(page) != PageTree:
197 raise TypeError("Expected PageTree object")
199 if not page.first_value('//title/text()') == \
200 'OpenID transaction in progress':
201 raise WrongPage('Not OpenID autosubmit form')
204 results = self.get_form_data(page, None,
206 action_url = results[0]
207 if action_url is None:
212 except Exception: # pylint: disable=broad-except
213 raise WrongPage("Not OpenID autosubmit form")
215 referer = page.make_referer()
216 headers = {'referer': referer}
219 for i in range(0, len(names)):
220 payload[names[i]] = values[i]
222 return [method, self.new_url(referer, action_url),
223 {'headers': headers, 'data': payload}]
225 def handle_openid_consent_form(self, page):
226 if type(page) != PageTree:
227 raise TypeError("Expected PageTree object")
230 results = self.get_form_data(page, "consent_form",
232 action_url = results[0]
233 if action_url is None:
238 except Exception: # pylint: disable=broad-except
239 raise WrongPage("Not an OpenID Consent Form Page")
241 referer = page.make_referer()
242 headers = {'referer': referer}
245 for i in range(0, len(names)):
246 payload[names[i]] = values[i]
248 # Replace known values
249 payload['decided_allow'] = 'Allow'
251 return [method, self.new_url(referer, action_url),
252 {'headers': headers, 'data': payload}]
254 def fetch_page(self, idp, target_url, follow_redirect=True, krb=False):
256 Fetch a page and parse the response code to determine what to do
259 The login process consists of redirections (302/303) and
260 potentially an unauthorized (401). For the case of unauthorized
261 try the page returned in case of fallback authentication.
268 # pylint: disable=star-args
269 r = self.access(action, url, krb=krb, **args)
270 if r.status_code == 303 or r.status_code == 302:
271 if not follow_redirect:
273 url = r.headers['location']
276 elif r.status_code == 401:
278 if r.headers.get('WWW-Authenticate', None) is None:
281 # Fall back, hopefully to testauth authentication.
283 (action, url, args) = self.handle_login_form(idp, page)
287 elif r.status_code == 200:
291 (action, url, args) = self.handle_login_form(idp, page)
297 (action, url, args) = self.handle_return_form(page)
303 (action, url, args) = self.handle_openid_consent_form(page)
309 (action, url, args) = self.handle_openid_form(page)
314 # Either we got what we wanted, or we have to stop anyway
317 raise ValueError("Unhandled status (%d) on url %s" % (
def auth_to_idp(self, idp, krb=False):
    """Log in to the named IdP and check the resulting welcome page.

    idp - key into self.servers; also used as the first path segment
    krb - forwarded to access() and fetch_page()

    The IdP front page must answer 200 and contain a 'Log In' link.
    That link is then followed with fetch_page(), and the page reached
    has to greet the user configured for this server.

    Raises ValueError if the front page does not answer 200 or if one
    of the expected page values does not match.
    """
    server = self.servers[idp]
    target_url = '%s/%s/' % (server['baseuri'], idp)

    response = self.access('get', target_url, krb=krb)
    if response.status_code != 200:
        raise ValueError("Access to idp failed: %s" % repr(response))

    landing = PageTree(response)
    landing.expected_value('//div[@id="content"]/p/a/text()', 'Log In')
    login_href = landing.first_value('//div[@id="content"]/p/a/@href')
    login_url = self.new_url(target_url, login_href)

    welcome = self.fetch_page(idp, login_url, krb=krb)
    welcome.expected_value('//div[@id="welcome"]/p/text()',
                           'Welcome %s!' % server['user'])
def logout_from_idp(self, idp):
    """Request the logout page of the named IdP.

    idp - key into self.servers; also used as the first path segment

    Raises ValueError if the response status is not 200.
    """
    base = self.servers[idp]['baseuri']
    response = self.access('get', '%s/%s/logout' % (base, idp))
    if response.status_code != 200:
        raise ValueError("Logout from idp failed: %s" % repr(response))
def get_sp_metadata(self, idp, sp):
    """Download the SAML2 metadata published by a service provider.

    idp - key of the identity provider in self.servers
    sp - key of the service provider in self.servers

    The metadata is fetched with a plain requests.get(), not through
    one of the stored sessions.

    Returns a tuple (IdP base URI, response of the metadata request).
    Raises KeyError if either name is not registered.
    """
    idp_base = self.servers[idp]['baseuri']
    sp_base = self.servers[sp]['baseuri']
    metadata_response = requests.get('%s/saml2/metadata' % sp_base)
    return (idp_base, metadata_response)
356 def add_sp_metadata(self, idp, sp, rest=False):
357 expected_status = 200
358 (idpuri, m) = self.get_sp_metadata(idp, sp)
359 url = '%s/%s/admin/providers/saml2/admin/new' % (idpuri, idp)
360 headers = {'referer': url}
362 expected_status = 201
363 payload = {'metadata': m.content}
364 headers['content-type'] = 'application/x-www-form-urlencoded'
365 url = '%s/%s/rest/providers/saml2/SPS/%s' % (idpuri, idp, sp)
366 r = self.post(url, headers=headers, data=urlencode(payload))
368 metafile = {'metafile': m.content}
369 payload = {'name': sp}
370 r = self.post(url, headers=headers, data=payload, files=metafile)
371 if r.status_code != expected_status:
372 raise ValueError('Failed to post SP data [%s]' % repr(r))
376 page.expected_value('//div[@class="alert alert-success"]/p/text()',
377 'SP Successfully added')
379 def set_sp_default_nameids(self, idp, sp, nameids):
381 nameids is a list of Name ID formats to enable
383 idpsrv = self.servers[idp]
384 idpuri = idpsrv['baseuri']
385 url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (idpuri, idp, sp)
386 headers = {'referer': url}
387 headers['content-type'] = 'application/x-www-form-urlencoded'
388 payload = {'submit': 'Submit',
389 'allowed_nameids': ', '.join(nameids)}
390 r = idpsrv['session'].post(url, headers=headers,
392 if r.status_code != 200:
393 raise ValueError('Failed to post SP data [%s]' % repr(r))
395 # pylint: disable=dangerous-default-value
396 def set_attributes_and_mapping(self, idp, mapping=[], attrs=[],
399 Set allowed attributes and mapping in the IDP or the SP. In the
400 case of the SP both allowed attributes and the mapping need to
401 be provided. An empty option for either means delete all values.
403 mapping is a list of list of rules of the form:
404 [['from-1', 'to-1'], ['from-2', 'from-2']]
406 ex. [['*', '*'], ['fullname', 'namefull']]
408 attrs is the list of attributes that will be allowed:
409 ['fullname', 'givenname', 'surname']
411 idpsrv = self.servers[idp]
412 idpuri = idpsrv['baseuri']
413 if spname: # per-SP setting
414 url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (
416 mapname = 'Attribute Mapping'
417 attrname = 'Allowed Attributes'
418 else: # global default
419 url = '%s/%s/admin/providers/saml2' % (idpuri, idp)
420 mapname = 'default attribute mapping'
421 attrname = 'default allowed attributes'
423 headers = {'referer': url}
424 headers['content-type'] = 'application/x-www-form-urlencoded'
425 payload = {'submit': 'Submit'}
428 payload['%s %s-from' % (mapname, count)] = m[0]
429 payload['%s %s-to' % (mapname, count)] = m[1]
433 payload['%s %s-name' % (attrname, count)] = attr
435 r = idpsrv['session'].post(url, headers=headers,
437 if r.status_code != 200:
438 raise ValueError('Failed to post IDP data [%s]' % repr(r))
def fetch_rest_page(self, idpname, uri):
    """Fetch a REST page from an IdP and decode its JSON body.

    idpname - the name of the IdP to fetch the page from
    uri - the URI of the page to retrieve; it is appended to the
          'baseuri' registered for idpname

    Returns the decoded JSON document.
    Raises ValueError if the output is unparseable.
    """
    base = self.servers[idpname].get('baseuri')
    full_url = '%s%s' % (base, uri)
    page = self.fetch_page(idpname, full_url)
    return json.loads(page.text)
def get_rest_sp(self, idpname, spname=None):
    """Query the IdP REST interface for SAML2 service providers.

    idpname - the name of the IdP to query
    spname - name of a single SP to look up; when None the SPS
             collection URI is requested instead

    Returns whatever fetch_rest_page() returns for the chosen URI.
    """
    uri = '/%s/rest/providers/saml2/SPS/' % idpname
    if spname is not None:
        uri = '/%s/rest/providers/saml2/SPS/%s' % (idpname, spname)
    return self.fetch_rest_page(idpname, uri)