3 # Copyright (C) 2014 Simo Sorce <simo@redhat.com>
5 # see file 'COPYING' for use and warranty information
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
26 from urllib import urlencode
27 from requests_kerberos import HTTPKerberosAuth, OPTIONAL
30 class WrongPage(Exception):
34 class PageTree(object):
36 def __init__(self, result):
38 self.text = result.text
43 if self._tree is None:
44 self._tree = html.fromstring(self.text)
47 def first_value(self, rule):
48 result = self.tree.xpath(rule)
49 if type(result) is list:
56 def all_values(self, rule):
57 result = self.tree.xpath(rule)
58 if type(result) is list:
62 def make_referer(self):
63 return self.result.url
65 def expected_value(self, rule, expected):
66 value = self.first_value(rule)
68 raise ValueError("Expected [%s], got [%s]" % (expected, value))
71 class HttpSessions(object):
76 def add_server(self, name, baseuri, user=None, pwd=None):
77 new = {'baseuri': baseuri,
78 'session': requests.Session()}
83 self.servers[name] = new
85 def get_session(self, url):
86 for srv in self.servers:
88 if url.startswith(d['baseuri']):
91 raise ValueError("Unknown URL: %s" % url)
93 def get(self, url, krb=False, **kwargs):
94 session = self.get_session(url)
95 allow_redirects = False
97 # python-requests-kerberos isn't too bright about doing mutual
98 # authentication and it tries to do it on any non-401 response
99 # which doesn't work in our case since we follow redirects.
100 kerberos_auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL)
101 kwargs['auth'] = kerberos_auth
102 allow_redirects = True
103 return session.get(url, allow_redirects=allow_redirects, **kwargs)
105 def post(self, url, **kwargs):
106 session = self.get_session(url)
107 return session.post(url, allow_redirects=False, **kwargs)
109 def access(self, action, url, krb=False, **kwargs):
110 action = string.lower(action)
112 return self.get(url, krb, **kwargs)
113 elif action == 'post':
114 return self.post(url, **kwargs)
116 raise ValueError("Unknown action type: [%s]" % action)
118 def new_url(self, referer, action):
119 if action.startswith('/'):
120 u = urlparse.urlparse(referer)
121 return '%s://%s%s' % (u.scheme, u.netloc, action)
124 def get_form_data(self, page, form_id, input_fields):
125 form_selector = '//form'
127 form_selector += '[@id="%s"]' % form_id
129 action = page.first_value('%s/@action' % form_selector)
130 values.append(action)
131 method = page.first_value('%s/@method' % form_selector)
132 values.append(method)
133 for field in input_fields:
134 value = page.all_values('%s/input/@%s' % (form_selector,
139 def handle_login_form(self, idp, page):
140 if type(page) != PageTree:
141 raise TypeError("Expected PageTree object")
143 srv = self.servers[idp]
146 results = self.get_form_data(page, "login_form", ["name", "value"])
147 action_url = results[0]
151 if action_url is None:
153 except Exception: # pylint: disable=broad-except
154 raise WrongPage("Not a Login Form Page")
156 referer = page.make_referer()
157 headers = {'referer': referer}
159 for i in range(0, len(names)):
160 payload[names[i]] = values[i]
162 # replace known values
163 payload['login_name'] = srv['user']
164 payload['login_password'] = srv['pwd']
166 return [method, self.new_url(referer, action_url),
167 {'headers': headers, 'data': payload}]
169 def handle_return_form(self, page):
170 if type(page) != PageTree:
171 raise TypeError("Expected PageTree object")
174 results = self.get_form_data(page, "saml-response",
176 action_url = results[0]
177 if action_url is None:
182 except Exception: # pylint: disable=broad-except
183 raise WrongPage("Not a Return Form Page")
185 referer = page.make_referer()
186 headers = {'referer': referer}
189 for i in range(0, len(names)):
190 payload[names[i]] = values[i]
192 return [method, self.new_url(referer, action_url),
193 {'headers': headers, 'data': payload}]
195 def handle_openid_form(self, page):
196 if type(page) != PageTree:
197 raise TypeError("Expected PageTree object")
199 if not page.first_value('//title/text()') == \
200 'OpenID transaction in progress':
201 raise WrongPage('Not OpenID autosubmit form')
204 results = self.get_form_data(page, None,
206 action_url = results[0]
207 if action_url is None:
212 except Exception: # pylint: disable=broad-except
213 raise WrongPage("Not OpenID autosubmit form")
215 referer = page.make_referer()
216 headers = {'referer': referer}
219 for i in range(0, len(names)):
220 payload[names[i]] = values[i]
222 return [method, self.new_url(referer, action_url),
223 {'headers': headers, 'data': payload}]
225 def handle_openid_consent_form(self, page):
226 if type(page) != PageTree:
227 raise TypeError("Expected PageTree object")
230 results = self.get_form_data(page, "consent_form",
232 action_url = results[0]
233 if action_url is None:
238 except Exception: # pylint: disable=broad-except
239 raise WrongPage("Not an OpenID Consent Form Page")
241 referer = page.make_referer()
242 headers = {'referer': referer}
245 for i in range(0, len(names)):
246 payload[names[i]] = values[i]
248 # Replace known values
249 payload['decided_allow'] = 'Allow'
251 return [method, self.new_url(referer, action_url),
252 {'headers': headers, 'data': payload}]
254 def fetch_page(self, idp, target_url, follow_redirect=True, krb=False):
256 Fetch a page and parse the response code to determine what to do
259 The login process consists of redirections (302/303) and
260 potentially an unauthorized (401). For the case of unauthorized
261 try the page returned in case of fallback authentication.
268 r = self.access(action, url, krb=krb, **args)
269 if r.status_code == 303 or r.status_code == 302:
270 if not follow_redirect:
272 url = r.headers['location']
275 elif r.status_code == 401:
277 if r.headers.get('WWW-Authenticate', None) is None:
280 # Fall back, hopefully to testauth authentication.
282 (action, url, args) = self.handle_login_form(idp, page)
286 elif r.status_code == 200:
290 (action, url, args) = self.handle_login_form(idp, page)
296 (action, url, args) = self.handle_return_form(page)
302 (action, url, args) = self.handle_openid_consent_form(page)
308 (action, url, args) = self.handle_openid_form(page)
313 # Either we got what we wanted, or we have to stop anyway
316 raise ValueError("Unhandled status (%d) on url %s" % (
319 def auth_to_idp(self, idp, krb=False):
321 srv = self.servers[idp]
322 target_url = '%s/%s/' % (srv['baseuri'], idp)
324 r = self.access('get', target_url, krb=krb)
325 if r.status_code != 200:
326 raise ValueError("Access to idp failed: %s" % repr(r))
329 page.expected_value('//div[@id="content"]/p/a/text()', 'Log In')
330 href = page.first_value('//div[@id="content"]/p/a/@href')
331 url = self.new_url(target_url, href)
333 page = self.fetch_page(idp, url, krb=krb)
335 page.expected_value('//div[@id="welcome"]/p/text()',
336 'Welcome %s!' % srv['user'])
338 def logout_from_idp(self, idp):
340 srv = self.servers[idp]
341 target_url = '%s/%s/logout' % (srv['baseuri'], idp)
343 r = self.access('get', target_url)
344 if r.status_code != 200:
345 raise ValueError("Logout from idp failed: %s" % repr(r))
347 def get_sp_metadata(self, idp, sp):
348 idpsrv = self.servers[idp]
349 idpuri = idpsrv['baseuri']
351 spuri = self.servers[sp]['baseuri']
353 return (idpuri, requests.get('%s/saml2/metadata' % spuri))
355 def add_sp_metadata(self, idp, sp, rest=False):
356 expected_status = 200
357 (idpuri, m) = self.get_sp_metadata(idp, sp)
358 url = '%s/%s/admin/providers/saml2/admin/new' % (idpuri, idp)
359 headers = {'referer': url}
361 expected_status = 201
362 payload = {'metadata': m.content}
363 headers['content-type'] = 'application/x-www-form-urlencoded'
364 url = '%s/%s/rest/providers/saml2/SPS/%s' % (idpuri, idp, sp)
365 r = self.post(url, headers=headers, data=urlencode(payload))
367 metafile = {'metafile': m.content}
368 payload = {'name': sp}
369 r = self.post(url, headers=headers, data=payload, files=metafile)
370 if r.status_code != expected_status:
371 raise ValueError('Failed to post SP data [%s]' % repr(r))
375 page.expected_value('//div[@class="alert alert-success"]/p/text()',
376 'SP Successfully added')
378 def set_sp_default_nameids(self, idp, sp, nameids):
380 nameids is a list of Name ID formats to enable
382 idpsrv = self.servers[idp]
383 idpuri = idpsrv['baseuri']
384 url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (idpuri, idp, sp)
385 headers = {'referer': url}
386 headers['content-type'] = 'application/x-www-form-urlencoded'
387 payload = {'submit': 'Submit',
388 'allowed_nameids': ', '.join(nameids)}
389 r = idpsrv['session'].post(url, headers=headers,
391 if r.status_code != 200:
392 raise ValueError('Failed to post SP data [%s]' % repr(r))
394 # pylint: disable=dangerous-default-value
395 def set_attributes_and_mapping(self, idp, mapping=[], attrs=[],
398 Set allowed attributes and mapping in the IDP or the SP. In the
399 case of the SP both allowed attributes and the mapping need to
400 be provided. An empty option for either means delete all values.
402 mapping is a list of list of rules of the form:
403 [['from-1', 'to-1'], ['from-2', 'from-2']]
405 ex. [['*', '*'], ['fullname', 'namefull']]
407 attrs is the list of attributes that will be allowed:
408 ['fullname', 'givenname', 'surname']
410 idpsrv = self.servers[idp]
411 idpuri = idpsrv['baseuri']
412 if spname: # per-SP setting
413 url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (
415 mapname = 'Attribute Mapping'
416 attrname = 'Allowed Attributes'
417 else: # global default
418 url = '%s/%s/admin/providers/saml2' % (idpuri, idp)
419 mapname = 'default attribute mapping'
420 attrname = 'default allowed attributes'
422 headers = {'referer': url}
423 headers['content-type'] = 'application/x-www-form-urlencoded'
424 payload = {'submit': 'Submit'}
427 payload['%s %s-from' % (mapname, count)] = m[0]
428 payload['%s %s-to' % (mapname, count)] = m[1]
432 payload['%s %s-name' % (attrname, count)] = attr
434 r = idpsrv['session'].post(url, headers=headers,
436 if r.status_code != 200:
437 raise ValueError('Failed to post IDP data [%s]' % repr(r))
439 def fetch_rest_page(self, idpname, uri):
441 idpname - the name of the IDP to fetch the page from
442 uri - the URI of the page to retrieve
444 The URL for the request is built from known-information in
447 returns dict if successful
448 returns ValueError if the output is unparseable
450 baseurl = self.servers[idpname].get('baseuri')
451 page = self.fetch_page(
453 '%s%s' % (baseurl, uri)
455 return json.loads(page.text)
457 def get_rest_sp(self, idpname, spname=None):
459 uri = '/%s/rest/providers/saml2/SPS/' % idpname
461 uri = '/%s/rest/providers/saml2/SPS/%s' % (idpname, spname)
463 return self.fetch_rest_page(idpname, uri)