Repository URL to install this package:
|
Version:
1.0 ▾
|
from __future__ import unicode_literals
from django.utils.encoding import iri_to_uri
from django.utils.timezone import utc
from django.utils.translation import ugettext_lazy as _
from hashlib import sha1
from rest_framework import exceptions
from rest_framework.authentication import BaseAuthentication
from . import settings
from .models import ApiKey
import datetime
import dateutil.parser
import hmac
import logging
import uuid
logger = logging.getLogger(__name__)
class SignatureAuthentication(BaseAuthentication):
def authenticate(self, request):
return self._authenticate(request)
def _authenticate(self, request):
logging.debug('Trying signature authentication.')
if not self._try_authorize(request):
logger.debug('The user is not trying to use signature authentication.')
return None # The user is not trying to use HMAC authentication.
# If we're here, then the user is trying to authenticate using Signature Auth
# If something fails here, raise an error!
logging.debug('Success! Starting signature authentication.')
key_str = self._get_api_key(request)
if not key_str:
msg = 'No key_str specified.'
logging.info('Signature authentication failed: %s', msg)
raise exceptions.AuthenticationFailed(msg)
key = self._find_key(key_str)
if not key:
msg = 'Given key #{} does not exist!'.format(key_str)
logging.info('Signature authentication failed: %s', msg)
raise exceptions.AuthenticationFailed(msg)
if not self._validate_timestamp_expiration(request):
msg = 'X-Auth-Timestamp: {} is invalid or has expired'.format(self._get_timestamp(request))
logging.info('Signature authentication failed: %s', msg)
raise exceptions.AuthenticationFailed(msg)
if not self._verify_signature(request, key):
msg = 'Invalid auth signature.'
logging.info('Signature authentication failed: %s', msg)
raise exceptions.AuthenticationFailed(msg)
logging.debug('Signature authentication succeeded for user %s', str(key.user))
return key.user, key
def _get_api_key(self, request):
api_key_header = request.META.get(settings.API_KEY_HEADER, '')
api_key_url = request.GET.get(settings.API_KEY_QUERY_PARAM, '')
if api_key_url and api_key_header and api_key_url != api_key_header:
msg = _('Conflicting url and header versions of api key are defined.')
raise exceptions.AuthenticationFailed(msg)
if api_key_header:
return api_key_header
if api_key_url:
msg = _('Providing api key in the url is not supported as of now.')
raise exceptions.AuthenticationFailed(msg)
return ''
def _find_key(self, api_key):
if type(api_key) != uuid.UUID:
try:
api_key = uuid.UUID(api_key)
except ValueError:
msg = 'Invalid api_key.'
logging.info('Signature authentication failed: ' + msg)
raise exceptions.AuthenticationFailed(msg)
try:
return ApiKey.objects.get(key=api_key)
except ApiKey.DoesNotExist:
return None
def _try_authorize(self, request):
auth = request.META.get(settings.SIGNATURE_HEADER, '')
if not auth:
return False
parts = auth.split(' ')
if len(parts) != 2 or parts[0].lower() != settings.SIGNATURE_START_VALUE:
return False
return True
def _validate_timestamp_expiration(self, request):
utc_now = datetime.datetime.utcnow().replace(tzinfo=utc)
ts = self._get_timestamp(request)
try:
ts_datetime = dateutil.parser.parse(ts)
except ValueError:
return False
if not ts_datetime.tzinfo:
ts_datetime = ts_datetime.replace(tzinfo=utc)
expires_in = datetime.timedelta(seconds=settings.EXPIRATION_TIME)
if (ts_datetime - expires_in) > utc_now:
# Request from too far into the future.
return False
if (ts_datetime + expires_in) < utc_now:
# Expired.
return False
return True
def _verify_signature(self, request, key):
given_signature = self._get_signature(request)
own_signature = self._create_signature(request, key)
logger.debug('EXPECTED: "{}", GIVEN: "{}"'.format(own_signature, given_signature))
return given_signature == own_signature
def _create_signature(self, request, key):
return hmac.new(str(key.secret), self._create_signature_base_string(request), sha1).hexdigest()
def _create_signature_base_string(self, request):
"""
Headers: (request-target) X-Auth-Timestamp X-Auth-User-Id
Ref: http://tools.ietf.org/html/draft-cavage-http-signatures-03#section-2.3
"""
signature_value = '(request-target): {0} {1}\nx-auth-timestamp: {2}\nx-auth-api-key: {3}'.format(
request.method.lower(),
iri_to_uri(request.get_full_path()),
self._get_timestamp(request).strip(),
self._get_api_key(request).strip())
logger.debug('RAW EXPECTED: {0}'.format(signature_value))
return signature_value
def _get_signature(self, request):
try:
return request.META.get(settings.SIGNATURE_HEADER).split(' ')[1]
except IndexError:
return ''
def _get_timestamp(self, request):
return request.META.get(settings.TIMESTAMP_HEADER, '')