#!/usr/bin/python
#
-# Copyright (C) 2014 Simo Sorce <simo@redhat.com>
-#
-# see file 'COPYING' for use and warranty information
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
+# Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING
from lxml import html
import requests
import urlparse
import json
from urllib import urlencode
+from requests_kerberos import HTTPKerberosAuth, OPTIONAL
class WrongPage(Exception):
def first_value(self, rule):
result = self.tree.xpath(rule)
- if type(result) is list:
+ if isinstance(result, list):
if len(result) > 0:
result = result[0]
else:
def all_values(self, rule):
result = self.tree.xpath(rule)
- if type(result) is list:
+ if isinstance(result, list):
return result
return [result]
raise ValueError("Unknown URL: %s" % url)
- def get(self, url, **kwargs):
+ def get(self, url, krb=False, **kwargs):
session = self.get_session(url)
- return session.get(url, allow_redirects=False, **kwargs)
+ allow_redirects = False
+ if krb:
+ # python-requests-kerberos isn't too bright about doing mutual
+ # authentication and it tries to do it on any non-401 response
+ # which doesn't work in our case since we follow redirects.
+ kerberos_auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL)
+ kwargs['auth'] = kerberos_auth
+ allow_redirects = True
+ return session.get(url, allow_redirects=allow_redirects, **kwargs)
def post(self, url, **kwargs):
session = self.get_session(url)
return session.post(url, allow_redirects=False, **kwargs)
- def access(self, action, url, **kwargs):
+ def access(self, action, url, krb=False, **kwargs):
action = string.lower(action)
if action == 'get':
- return self.get(url, **kwargs)
+ return self.get(url, krb, **kwargs)
elif action == 'post':
return self.post(url, **kwargs)
else:
return action
def get_form_data(self, page, form_id, input_fields):
+ form_selector = '//form'
+ if form_id:
+ form_selector += '[@id="%s"]' % form_id
values = []
- action = page.first_value('//form[@id="%s"]/@action' % form_id)
+ action = page.first_value('%s/@action' % form_selector)
values.append(action)
- method = page.first_value('//form[@id="%s"]/@method' % form_id)
+ method = page.first_value('%s/@method' % form_selector)
values.append(method)
for field in input_fields:
- value = page.all_values('//form[@id="%s"]/input/@%s' % (form_id,
- field))
+ value = page.all_values('%s/input/@%s' % (form_selector,
+ field))
values.append(value)
return values
def handle_login_form(self, idp, page):
- if type(page) != PageTree:
+ if not isinstance(page, PageTree):
raise TypeError("Expected PageTree object")
srv = self.servers[idp]
{'headers': headers, 'data': payload}]
def handle_return_form(self, page):
- if type(page) != PageTree:
+ if not isinstance(page, PageTree):
raise TypeError("Expected PageTree object")
try:
return [method, self.new_url(referer, action_url),
{'headers': headers, 'data': payload}]
- def fetch_page(self, idp, target_url, follow_redirect=True):
+ def handle_openid_form(self, page):
+ if not isinstance(page, PageTree):
+ raise TypeError("Expected PageTree object")
+
+ if not page.first_value('//title/text()') == \
+ 'OpenID transaction in progress':
+ raise WrongPage('Not OpenID autosubmit form')
+
+ try:
+ results = self.get_form_data(page, None,
+ ["name", "value"])
+ action_url = results[0]
+ if action_url is None:
+ raise Exception
+ method = results[1]
+ names = results[2]
+ values = results[3]
+ except Exception: # pylint: disable=broad-except
+ raise WrongPage("Not OpenID autosubmit form")
+
+ referer = page.make_referer()
+ headers = {'referer': referer}
+
+ payload = {}
+ for i in range(0, len(names)):
+ payload[names[i]] = values[i]
+
+ return [method, self.new_url(referer, action_url),
+ {'headers': headers, 'data': payload}]
+
+ def handle_openid_consent_form(self, page):
+ if not isinstance(page, PageTree):
+ raise TypeError("Expected PageTree object")
+
+ try:
+ results = self.get_form_data(page, "consent_form",
+ ['name', 'value'])
+ action_url = results[0]
+ if action_url is None:
+ raise Exception
+ method = results[1]
+ names = results[2]
+ values = results[3]
+ except Exception: # pylint: disable=broad-except
+ raise WrongPage("Not an OpenID Consent Form Page")
+
+ referer = page.make_referer()
+ headers = {'referer': referer}
+
+ payload = {}
+ for i in range(0, len(names)):
+ payload[names[i]] = values[i]
+
+ # Replace known values
+ payload['decided_allow'] = 'Allow'
+
+ return [method, self.new_url(referer, action_url),
+ {'headers': headers, 'data': payload}]
+
+ def fetch_page(self, idp, target_url, follow_redirect=True, krb=False):
+ """
+ Fetch a page and parse the response code to determine what to do
+ next.
+
+ The login process consists of redirections (302/303) and
+ potentially an unauthorized (401). For the case of unauthorized
+ try the page returned in case of fallback authentication.
+ """
url = target_url
action = 'get'
args = {}
while True:
- r = self.access(action, url, **args) # pylint: disable=star-args
- if r.status_code == 303:
+ r = self.access(action, url, krb=krb, **args)
+ if r.status_code == 303 or r.status_code == 302:
if not follow_redirect:
return PageTree(r)
url = r.headers['location']
action = 'get'
args = {}
+ elif r.status_code == 401:
+ page = PageTree(r)
+ if r.headers.get('WWW-Authenticate', None) is None:
+ return page
+
+ # Fall back, hopefully to testauth authentication.
+ try:
+ (action, url, args) = self.handle_login_form(idp, page)
+ continue
+ except WrongPage:
+ pass
elif r.status_code == 200:
page = PageTree(r)
except WrongPage:
pass
+ try:
+ (action, url, args) = self.handle_openid_consent_form(page)
+ continue
+ except WrongPage:
+ pass
+
+ try:
+ (action, url, args) = self.handle_openid_form(page)
+ continue
+ except WrongPage:
+ pass
+
# Either we got what we wanted, or we have to stop anyway
return page
else:
raise ValueError("Unhandled status (%d) on url %s" % (
r.status_code, url))
- def auth_to_idp(self, idp):
+ def auth_to_idp(self, idp, krb=False):
srv = self.servers[idp]
target_url = '%s/%s/' % (srv['baseuri'], idp)
- r = self.access('get', target_url)
+ r = self.access('get', target_url, krb=krb)
if r.status_code != 200:
raise ValueError("Access to idp failed: %s" % repr(r))
href = page.first_value('//div[@id="content"]/p/a/@href')
url = self.new_url(target_url, href)
- page = self.fetch_page(idp, url)
+ page = self.fetch_page(idp, url, krb=krb)
+
page.expected_value('//div[@id="welcome"]/p/text()',
'Welcome %s!' % srv['user'])
def add_sp_metadata(self, idp, sp, rest=False):
expected_status = 200
- idpsrv = self.servers[idp]
(idpuri, m) = self.get_sp_metadata(idp, sp)
url = '%s/%s/admin/providers/saml2/admin/new' % (idpuri, idp)
headers = {'referer': url}
payload = {'metadata': m.content}
headers['content-type'] = 'application/x-www-form-urlencoded'
url = '%s/%s/rest/providers/saml2/SPS/%s' % (idpuri, idp, sp)
- r = idpsrv['session'].post(url, headers=headers,
- data=urlencode(payload))
+ r = self.post(url, headers=headers, data=urlencode(payload))
else:
metafile = {'metafile': m.content}
payload = {'name': sp}
- r = idpsrv['session'].post(url, headers=headers,
- data=payload, files=metafile)
+ r = self.post(url, headers=headers, data=payload, files=metafile)
if r.status_code != expected_status:
raise ValueError('Failed to post SP data [%s]' % repr(r))