1 # Copyright (C) 2015 Ipsilon project Contributors, for license see COPYING
3 from ipsilon.util.log import Log
6 class SAMLSession(Log):
8 A SAML login session used to track login/logout state.
10 session_id - ID of the login session
11 provider_id - ID of the SP
12 session - the Login session object
13 logoutstate - dict containing logout state info
14 session_indexes - the IDs of any login session we've seen
17 When a new session is seen for the same user any existing session
18 is thrown away. We keep the original session_id though and send
19 all that we've seen to the SP when performing a logout to ensure
20 that all sessions get logged out.
22 logout state is a dictionary containing (potentially)
25 relaystate - The relaystate from the Logout Request or Response
26 id - The Logout request id that initiated the logout
27 request - Dump of the initial logout request
29 def __init__(self, session_id, provider_id, session,
32 self.session_id = session_id
33 self.provider_id = provider_id
34 self.session = session
35 self.logoutstate = logoutstate
36 self.session_indexes = [session_id]
38 def set_logoutstate(self, relaystate, request_id, request=None):
39 self.logoutstate = dict(relaystate=relaystate,
44 self.debug('session_id %s' % self.session_id)
45 self.debug('session_index %s' % self.session_indexes)
46 self.debug('provider_id %s' % self.provider_id)
47 self.debug('session %s' % self.session)
48 self.debug('logoutstate %s' % self.logoutstate)
51 class SAMLSessionsContainer(Log):
53 Store SAML session information.
55 The sessions are stored in two dicts which represent the state that
58 When a user logs in, add_session() is called and a new SAMLSession
59 created and added to the sessions dict, keyed on provider_id.
61 When a user logs out, the next login session is found and moved to
62 sessions_logging_out. remove_session() will look in both when trying
67 self.sessions = dict()
68 self.sessions_logging_out = dict()
70 def add_session(self, session_id, provider_id, session):
72 Add a new session to the logged-in bucket.
74 Drop any existing sessions that might exist for this
75 provider. We have no control over the SP's so if it sends
76 us another login, accept it.
78 If an existing session exists drop it but keep a copy of
79 its session index. When we logout we send ALL session indexes
80 we've received to ensure that they are all logged out.
82 samlsession = SAMLSession(session_id, provider_id, session)
84 old_session = self.find_session_by_provider(provider_id)
85 if old_session is not None:
86 samlsession.session_indexes.extend(old_session.session_indexes)
87 self.debug("old session: %s" % old_session.session_indexes)
88 self.debug("new session: %s" % samlsession.session_indexes)
89 self.remove_session_by_provider(provider_id)
90 self.sessions[provider_id] = samlsession
93 def remove_session_by_provider(self, provider_id):
95 Remove all instances of this provider from either session
98 if provider_id in self.sessions:
99 self.sessions.pop(provider_id)
100 if provider_id in self.sessions_logging_out:
101 self.sessions_logging_out.pop(provider_id)
103 def find_session_by_provider(self, provider_id):
105 Return a given session from either pool.
107 Return None if no session for a provider is found.
109 if provider_id in self.sessions:
110 return self.sessions[provider_id]
111 if provider_id in self.sessions_logging_out:
112 return self.sessions_logging_out[provider_id]
115 def start_logout(self, session):
117 Move a session into the logging_out state
121 if session.provider_id in self.sessions_logging_out:
124 session = self.sessions.pop(session.provider_id)
126 self.sessions_logging_out[session.provider_id] = session
128 def get_next_logout(self, remove=True):
130 Get the next session in the logged-in state and move
131 it to the logging_out state. Return the session that is
134 :param remove: for IdP-initiated logout we can't remove the
135 session otherwise when the request comes back
136 in the user won't be seen as being logged-on.
138 Return None if no more sessions in login state.
141 provider_id = self.sessions.keys()[0]
146 session = self.sessions.pop(provider_id)
148 session = self.sessions.itervalues().next()
150 if provider_id in self.sessions_logging_out:
151 self.sessions_logging_out.pop(provider_id)
153 self.sessions_logging_out[provider_id] = session
157 def get_last_session(self):
158 if self.count() != 1:
159 raise ValueError('Not exactly one session left')
162 provider_id = self.sessions_logging_out.keys()[0]
166 return self.sessions_logging_out.pop(provider_id)
170 Return number of active login/logging out sessions.
172 return len(self.sessions) + len(self.sessions_logging_out)
176 for s in self.sessions:
177 self.debug('Login Session: %d' % count)
178 session = self.sessions[s]
180 self.debug('-----------------------')
182 for s in self.sessions_logging_out:
183 self.debug('Logging-out Session: %d' % count)
184 session = self.sessions_logging_out[s]
186 self.debug('-----------------------')
189 if __name__ == '__main__':
190 provider1 = "http://127.0.0.10/saml2"
191 provider2 = "http://127.0.0.11/saml2"
193 saml_sessions = SAMLSessionsContainer()
196 testsession = saml_sessions.get_last_session()
198 assert(saml_sessions.count() == 0)
200 saml_sessions.add_session("_123456",
204 saml_sessions.add_session("_789012",
209 testsession = saml_sessions.get_last_session()
211 assert(saml_sessions.count() == 2)
213 testsession = saml_sessions.find_session_by_provider(provider1)
214 assert(testsession.provider_id == provider1)
215 assert(testsession.session_id == "_123456")
216 assert(testsession.session == "sessiondata")
218 # Test get_next_logout() by fetching both values out. Do some
219 # basic accounting to ensure we get both values eventually.
220 providers = [provider1, provider2]
221 testsession = saml_sessions.get_next_logout()
222 providers.remove(testsession.provider_id) # should be one of them
224 testsession = saml_sessions.get_next_logout()
225 assert(testsession.provider_id == providers[0]) # should be the other
227 saml_sessions.start_logout(testsession)
228 saml_sessions.remove_session_by_provider(provider2)
230 assert(saml_sessions.count() == 1)
232 testsession = saml_sessions.get_last_session()
233 assert(testsession.provider_id == provider1)
235 saml_sessions.remove_session_by_provider(provider1)
236 assert(saml_sessions.count() == 0)