Report to user if an LDAP error occurs
[cascardo/ipsilon.git] / tests / helpers / http.py
1 #!/usr/bin/python
2 #
3 # Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING
4
5 from lxml import html
6 import requests
7 import string
8 import urlparse
9 import json
10 from urllib import urlencode
11 from requests_kerberos import HTTPKerberosAuth, OPTIONAL
12
13
class WrongPage(Exception):
    """Signal that a page does not contain the form a handler looked for."""
16
17
class PageTree(object):
    """Wrap an HTTP response and expose XPath queries on its HTML body.

    The body is parsed with ``lxml.html`` the first time ``tree`` is read
    and the parsed document is reused afterwards.
    """

    def __init__(self, result):
        """Keep the response object and its ``text``; nothing is parsed yet."""
        self._tree = None
        self.result = result
        self.text = result.text

    @property
    def tree(self):
        """Parsed form of ``self.text``, built on first access and cached."""
        parsed = self._tree
        if parsed is None:
            parsed = self._tree = html.fromstring(self.text)
        return parsed

    def first_value(self, rule):
        """Evaluate the XPath ``rule`` and return its first match.

        A list result yields its first element, or None when it is empty;
        any result that is not a list is returned unchanged.
        """
        matches = self.tree.xpath(rule)
        if not isinstance(matches, list):
            return matches
        return matches[0] if matches else None

    def all_values(self, rule):
        """Evaluate the XPath ``rule`` and always return a list.

        A result that is not already a list is wrapped in a one-item list.
        """
        matches = self.tree.xpath(rule)
        return matches if isinstance(matches, list) else [matches]

    def make_referer(self):
        """Return the URL of the wrapped response, for use as a referer."""
        return self.result.url

    def expected_value(self, rule, expected):
        """Raise ValueError unless the first match of ``rule`` is ``expected``."""
        found = self.first_value(rule)
        if found != expected:
            raise ValueError("Expected [%s], got [%s]" % (expected, found))
53
54
55 class HttpSessions(object):
56
57     def __init__(self):
58         self.servers = dict()
59
60     def add_server(self, name, baseuri, user=None, pwd=None):
61         new = {'baseuri': baseuri,
62                'session': requests.Session()}
63         if user:
64             new['user'] = user
65         if pwd:
66             new['pwd'] = pwd
67         self.servers[name] = new
68
69     def get_session(self, url):
70         for srv in self.servers:
71             d = self.servers[srv]
72             if url.startswith(d['baseuri']):
73                 return d['session']
74
75         raise ValueError("Unknown URL: %s" % url)
76
77     def get(self, url, krb=False, **kwargs):
78         session = self.get_session(url)
79         allow_redirects = False
80         if krb:
81             # python-requests-kerberos isn't too bright about doing mutual
82             # authentication and it tries to do it on any non-401 response
83             # which doesn't work in our case since we follow redirects.
84             kerberos_auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL)
85             kwargs['auth'] = kerberos_auth
86             allow_redirects = True
87         return session.get(url, allow_redirects=allow_redirects, **kwargs)
88
89     def post(self, url, **kwargs):
90         session = self.get_session(url)
91         return session.post(url, allow_redirects=False, **kwargs)
92
93     def access(self, action, url, krb=False, **kwargs):
94         action = string.lower(action)
95         if action == 'get':
96             return self.get(url, krb, **kwargs)
97         elif action == 'post':
98             return self.post(url, **kwargs)
99         else:
100             raise ValueError("Unknown action type: [%s]" % action)
101
102     def new_url(self, referer, action):
103         if action.startswith('/'):
104             u = urlparse.urlparse(referer)
105             return '%s://%s%s' % (u.scheme, u.netloc, action)
106         return action
107
108     def get_form_data(self, page, form_id, input_fields):
109         form_selector = '//form'
110         if form_id:
111             form_selector += '[@id="%s"]' % form_id
112         values = []
113         action = page.first_value('%s/@action' % form_selector)
114         values.append(action)
115         method = page.first_value('%s/@method' % form_selector)
116         values.append(method)
117         for field in input_fields:
118             value = page.all_values('%s/input/@%s' % (form_selector,
119                                                       field))
120             values.append(value)
121         return values
122
123     def handle_login_form(self, idp, page):
124         if not isinstance(page, PageTree):
125             raise TypeError("Expected PageTree object")
126
127         srv = self.servers[idp]
128
129         try:
130             results = self.get_form_data(page, "login_form", ["name", "value"])
131             action_url = results[0]
132             method = results[1]
133             names = results[2]
134             values = results[3]
135             if action_url is None:
136                 raise Exception
137         except Exception:  # pylint: disable=broad-except
138             raise WrongPage("Not a Login Form Page")
139
140         referer = page.make_referer()
141         headers = {'referer': referer}
142         payload = {}
143         for i in range(0, len(names)):
144             payload[names[i]] = values[i]
145
146         # replace known values
147         payload['login_name'] = srv['user']
148         payload['login_password'] = srv['pwd']
149
150         return [method, self.new_url(referer, action_url),
151                 {'headers': headers, 'data': payload}]
152
153     def handle_return_form(self, page):
154         if not isinstance(page, PageTree):
155             raise TypeError("Expected PageTree object")
156
157         try:
158             results = self.get_form_data(page, "saml-response",
159                                          ["name", "value"])
160             action_url = results[0]
161             if action_url is None:
162                 raise Exception
163             method = results[1]
164             names = results[2]
165             values = results[3]
166         except Exception:  # pylint: disable=broad-except
167             raise WrongPage("Not a Return Form Page")
168
169         referer = page.make_referer()
170         headers = {'referer': referer}
171
172         payload = {}
173         for i in range(0, len(names)):
174             payload[names[i]] = values[i]
175
176         return [method, self.new_url(referer, action_url),
177                 {'headers': headers, 'data': payload}]
178
179     def handle_openid_form(self, page):
180         if not isinstance(page, PageTree):
181             raise TypeError("Expected PageTree object")
182
183         if not page.first_value('//title/text()') == \
184                 'OpenID transaction in progress':
185             raise WrongPage('Not OpenID autosubmit form')
186
187         try:
188             results = self.get_form_data(page, None,
189                                          ["name", "value"])
190             action_url = results[0]
191             if action_url is None:
192                 raise Exception
193             method = results[1]
194             names = results[2]
195             values = results[3]
196         except Exception:  # pylint: disable=broad-except
197             raise WrongPage("Not OpenID autosubmit form")
198
199         referer = page.make_referer()
200         headers = {'referer': referer}
201
202         payload = {}
203         for i in range(0, len(names)):
204             payload[names[i]] = values[i]
205
206         return [method, self.new_url(referer, action_url),
207                 {'headers': headers, 'data': payload}]
208
209     def handle_openid_consent_form(self, page):
210         if not isinstance(page, PageTree):
211             raise TypeError("Expected PageTree object")
212
213         try:
214             results = self.get_form_data(page, "consent_form",
215                                          ['name', 'value'])
216             action_url = results[0]
217             if action_url is None:
218                 raise Exception
219             method = results[1]
220             names = results[2]
221             values = results[3]
222         except Exception:  # pylint: disable=broad-except
223             raise WrongPage("Not an OpenID Consent Form Page")
224
225         referer = page.make_referer()
226         headers = {'referer': referer}
227
228         payload = {}
229         for i in range(0, len(names)):
230             payload[names[i]] = values[i]
231
232         # Replace known values
233         payload['decided_allow'] = 'Allow'
234
235         return [method, self.new_url(referer, action_url),
236                 {'headers': headers, 'data': payload}]
237
238     def fetch_page(self, idp, target_url, follow_redirect=True, krb=False):
239         """
240         Fetch a page and parse the response code to determine what to do
241         next.
242
243         The login process consists of redirections (302/303) and
244         potentially an unauthorized (401). For the case of unauthorized
245         try the page returned in case of fallback authentication.
246         """
247         url = target_url
248         action = 'get'
249         args = {}
250
251         while True:
252             r = self.access(action, url, krb=krb, **args)
253             if r.status_code == 303 or r.status_code == 302:
254                 if not follow_redirect:
255                     return PageTree(r)
256                 url = r.headers['location']
257                 action = 'get'
258                 args = {}
259             elif r.status_code == 401:
260                 page = PageTree(r)
261                 if r.headers.get('WWW-Authenticate', None) is None:
262                     return page
263
264                 # Fall back, hopefully to testauth authentication.
265                 try:
266                     (action, url, args) = self.handle_login_form(idp, page)
267                     continue
268                 except WrongPage:
269                     pass
270             elif r.status_code == 200:
271                 page = PageTree(r)
272
273                 try:
274                     (action, url, args) = self.handle_login_form(idp, page)
275                     continue
276                 except WrongPage:
277                     pass
278
279                 try:
280                     (action, url, args) = self.handle_return_form(page)
281                     continue
282                 except WrongPage:
283                     pass
284
285                 try:
286                     (action, url, args) = self.handle_openid_consent_form(page)
287                     continue
288                 except WrongPage:
289                     pass
290
291                 try:
292                     (action, url, args) = self.handle_openid_form(page)
293                     continue
294                 except WrongPage:
295                     pass
296
297                 # Either we got what we wanted, or we have to stop anyway
298                 return page
299             else:
300                 raise ValueError("Unhandled status (%d) on url %s" % (
301                                  r.status_code, url))
302
303     def auth_to_idp(self, idp, krb=False, rule=None, expected=None):
304
305         srv = self.servers[idp]
306         target_url = '%s/%s/' % (srv['baseuri'], idp)
307
308         r = self.access('get', target_url, krb=krb)
309         if r.status_code != 200:
310             raise ValueError("Access to idp failed: %s" % repr(r))
311
312         page = PageTree(r)
313         page.expected_value('//div[@id="content"]/p/a/text()', 'Log In')
314         href = page.first_value('//div[@id="content"]/p/a/@href')
315         url = self.new_url(target_url, href)
316
317         page = self.fetch_page(idp, url, krb=krb)
318
319         if rule is None:
320             rule = '//div[@id="welcome"]/p/text()'
321         if expected is None:
322             expected = 'Welcome %s!' % srv['user']
323
324         page.expected_value(rule, expected)
325
326     def logout_from_idp(self, idp):
327
328         srv = self.servers[idp]
329         target_url = '%s/%s/logout' % (srv['baseuri'], idp)
330
331         r = self.access('get', target_url)
332         if r.status_code != 200:
333             raise ValueError("Logout from idp failed: %s" % repr(r))
334
335     def get_sp_metadata(self, idp, sp):
336         idpsrv = self.servers[idp]
337         idpuri = idpsrv['baseuri']
338
339         spuri = self.servers[sp]['baseuri']
340
341         return (idpuri, requests.get('%s/saml2/metadata' % spuri))
342
343     def add_sp_metadata(self, idp, sp, rest=False):
344         expected_status = 200
345         (idpuri, m) = self.get_sp_metadata(idp, sp)
346         url = '%s/%s/admin/providers/saml2/admin/new' % (idpuri, idp)
347         headers = {'referer': url}
348         if rest:
349             expected_status = 201
350             payload = {'metadata': m.content}
351             headers['content-type'] = 'application/x-www-form-urlencoded'
352             url = '%s/%s/rest/providers/saml2/SPS/%s' % (idpuri, idp, sp)
353             r = self.post(url, headers=headers, data=urlencode(payload))
354         else:
355             metafile = {'metafile': m.content}
356             payload = {'name': sp}
357             r = self.post(url, headers=headers, data=payload, files=metafile)
358         if r.status_code != expected_status:
359             raise ValueError('Failed to post SP data [%s]' % repr(r))
360
361         if not rest:
362             page = PageTree(r)
363             page.expected_value('//div[@class="alert alert-success"]/p/text()',
364                                 'SP Successfully added')
365
366     def set_sp_default_nameids(self, idp, sp, nameids):
367         """
368         nameids is a list of Name ID formats to enable
369         """
370         idpsrv = self.servers[idp]
371         idpuri = idpsrv['baseuri']
372         url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (idpuri, idp, sp)
373         headers = {'referer': url}
374         headers['content-type'] = 'application/x-www-form-urlencoded'
375         payload = {'submit': 'Submit',
376                    'allowed_nameids': ', '.join(nameids)}
377         r = idpsrv['session'].post(url, headers=headers,
378                                    data=payload)
379         if r.status_code != 200:
380             raise ValueError('Failed to post SP data [%s]' % repr(r))
381
382     # pylint: disable=dangerous-default-value
383     def set_attributes_and_mapping(self, idp, mapping=[], attrs=[],
384                                    spname=None):
385         """
386         Set allowed attributes and mapping in the IDP or the SP. In the
387         case of the SP both allowed attributes and the mapping need to
388         be provided. An empty option for either means delete all values.
389
390         mapping is a list of list of rules of the form:
391            [['from-1', 'to-1'], ['from-2', 'from-2']]
392
393         ex. [['*', '*'], ['fullname', 'namefull']]
394
395         attrs is the list of attributes that will be allowed:
396            ['fullname', 'givenname', 'surname']
397         """
398         idpsrv = self.servers[idp]
399         idpuri = idpsrv['baseuri']
400         if spname:  # per-SP setting
401             url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (
402                 idpuri, idp, spname)
403             mapname = 'Attribute Mapping'
404             attrname = 'Allowed Attributes'
405         else:  # global default
406             url = '%s/%s/admin/providers/saml2' % (idpuri, idp)
407             mapname = 'default attribute mapping'
408             attrname = 'default allowed attributes'
409
410         headers = {'referer': url}
411         headers['content-type'] = 'application/x-www-form-urlencoded'
412         payload = {'submit': 'Submit'}
413         count = 0
414         for m in mapping:
415             payload['%s %s-from' % (mapname, count)] = m[0]
416             payload['%s %s-to' % (mapname, count)] = m[1]
417             count += 1
418         count = 0
419         for attr in attrs:
420             payload['%s %s-name' % (attrname, count)] = attr
421             count += 1
422         r = idpsrv['session'].post(url, headers=headers,
423                                    data=payload)
424         if r.status_code != 200:
425             raise ValueError('Failed to post IDP data [%s]' % repr(r))
426
427     def fetch_rest_page(self, idpname, uri):
428         """
429         idpname - the name of the IDP to fetch the page from
430         uri - the URI of the page to retrieve
431
432         The URL for the request is built from known-information in
433         the session.
434
435         returns dict if successful
436         returns ValueError if the output is unparseable
437         """
438         baseurl = self.servers[idpname].get('baseuri')
439         page = self.fetch_page(
440             idpname,
441             '%s%s' % (baseurl, uri)
442         )
443         return json.loads(page.text)
444
445     def get_rest_sp(self, idpname, spname=None):
446         if spname is None:
447             uri = '/%s/rest/providers/saml2/SPS/' % idpname
448         else:
449             uri = '/%s/rest/providers/saml2/SPS/%s' % (idpname, spname)
450
451         return self.fetch_rest_page(idpname, uri)