Repository URL to install this package:
|
Version:
0.4.2 ▾
|
import sys
from attribute_converter import to_local
from saml2 import time_util
from saml2 import s_utils
from saml2.s_utils import OtherError
from saml2.validate import valid_instance
from saml2.validate import NotValid
from saml2.response import IncorrectlySigned
def _dummy(_arg):
return None
class Request(object):
def __init__(self, sec_context, receiver_addrs, log=None, timeslack=0,
debug=0):
self.sec = sec_context
self.receiver_addrs = receiver_addrs
self.timeslack = timeslack
self.log = log
self.debug = debug
if self.debug and not self.log:
self.debug = 0
self.xmlstr = ""
self.name_id = ""
self.message = None
self.not_on_or_after = 0
self.signature_check = _dummy # has to be set !!!
def _clear(self):
self.xmlstr = ""
self.name_id = ""
self.message = None
self.not_on_or_after = 0
def _loads(self, xmldata, decode=True):
if decode:
if self.debug:
self.log.debug("Expected to decode and inflate xml data")
decoded_xml = s_utils.decode_base64_and_inflate(xmldata)
else:
decoded_xml = xmldata
# own copy
self.xmlstr = decoded_xml[:]
if self.debug:
self.log.info("xmlstr: %s" % (self.xmlstr,))
try:
self.message = self.signature_check(decoded_xml)
except TypeError:
raise
except Exception, excp:
if self.log:
self.log.info("EXCEPTION: %s", excp)
if not self.message:
if self.log:
self.log.error("Response was not correctly signed")
self.log.info(decoded_xml)
raise IncorrectlySigned()
if self.debug:
self.log.info("request: %s" % (self.message,))
try:
valid_instance(self.message)
except NotValid, exc:
if self.log:
self.log.error("Not valid request: %s" % exc.args[0])
else:
print >> sys.stderr, "Not valid request: %s" % exc.args[0]
raise
return self
def issue_instant_ok(self):
""" Check that the request was issued at a reasonable time """
upper = time_util.shift_time(time_util.time_in_a_while(days=1),
self.timeslack).timetuple()
lower = time_util.shift_time(time_util.time_a_while_ago(days=1),
-self.timeslack).timetuple()
# print "issue_instant: %s" % self.message.issue_instant
# print "%s < x < %s" % (lower, upper)
issued_at = time_util.str_to_time(self.message.issue_instant)
return issued_at > lower and issued_at < upper
def _verify(self):
assert self.message.version == "2.0"
if self.message.destination and \
self.message.destination not in self.receiver_addrs:
if self.log:
self.log.error("%s != %s" % (self.message.destination,
self.receiver_addrs))
else:
print >> sys.stderr, "%s != %s" % (self.message.destination,
self.receiver_addrs)
raise OtherError("Not destined for me!")
assert self.issue_instant_ok()
return self
def loads(self, xmldata, decode=True):
return self._loads(xmldata, decode)
def verify(self):
try:
return self._verify()
except AssertionError:
return None
def subject_id(self):
""" The name of the subject can be in either of
BaseID, NameID or EncryptedID
:return: The identifier if there is one
"""
if "subject" in self.message.keys():
_subj = self.message.subject
if "base_id" in _subj.keys() and _subj.base_id:
return _subj.base_id
elif _subj.name_id:
return _subj.name_id
else:
if "base_id" in self.message.keys() and self.message.base_id:
return self.message.base_id
elif self.message.name_id:
return self.message.name_id
else: # EncryptedID
pass
def sender(self):
return self.message.issuer.text()
class LogoutRequest(Request):
def __init__(self, sec_context, receiver_addrs, log=None, timeslack=0,
debug=0):
Request.__init__(self, sec_context, receiver_addrs, log, timeslack,
debug)
self.signature_check = self.sec.correctly_signed_logout_request
class AttributeQuery(Request):
def __init__(self, sec_context, receiver_addrs, log=None, timeslack=0,
debug=0):
Request.__init__(self, sec_context, receiver_addrs, log, timeslack,
debug)
self.signature_check = self.sec.correctly_signed_attribute_query
def attribute(self):
""" Which attributes that are sought for """
return []
class AuthnRequest(Request):
def __init__(self, sec_context, attribute_converters, receiver_addrs,
log=None, timeslack=0, debug=0):
Request.__init__(self, sec_context, receiver_addrs, log, timeslack,
debug)
self.attribute_converters = attribute_converters
self.signature_check = self.sec.correctly_signed_authn_request
def attributes(self):
return to_local(self.attribute_converters, self.message)
class AuthzRequest(Request):
def __init__(self, sec_context, receiver_addrs, log=None, timeslack=0,
debug=0):
Request.__init__(self, sec_context, receiver_addrs, log, timeslack,
debug)
self.signature_check = self.sec.correctly_signed_logout_request
def action(self):
""" Which action authorization is requested for """
pass
def evidence(self):
""" The evidence on which the decision is based """
pass
def resource(self):
""" On which resource the action is expected to occur """
pass