Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
from __future__ import unicode_literals

import datetime
import hmac
import logging
import uuid
from hashlib import sha1

import dateutil.parser
from django.utils.encoding import force_bytes, iri_to_uri
from django.utils.timezone import utc
from django.utils.translation import ugettext_lazy as _
from rest_framework import exceptions
from rest_framework.authentication import BaseAuthentication

from . import settings
from .models import ApiKey


logger = logging.getLogger(__name__)


class SignatureAuthentication(BaseAuthentication):
    """
    HMAC-SHA1 request-signature authentication for Django REST Framework.

    A request opts in by sending ``settings.SIGNATURE_HEADER`` in the form
    ``"<settings.SIGNATURE_START_VALUE> <hex signature>"``. It must also carry
    an API key (``settings.API_KEY_HEADER``) and a timestamp
    (``settings.TIMESTAMP_HEADER``). The signature is recomputed server-side
    from the request method, full path, timestamp and API key using the
    secret stored on the matching ``ApiKey`` row.
    """

    def authenticate(self, request):
        """
        DRF entry point.

        Returns ``None`` when the request does not attempt signature
        authentication, ``(user, api_key)`` on success, and raises
        ``exceptions.AuthenticationFailed`` otherwise.
        """
        return self._authenticate(request)

    def _authenticate(self, request):
        """Run every validation step in order; see ``authenticate``."""
        logger.debug('Trying signature authentication.')
        if not self._try_authorize(request):
            logger.debug('The user is not trying to use signature authentication.')
            return None  # The user is not trying to use HMAC authentication.

        # If we're here, then the user is trying to authenticate using Signature Auth
        # If something fails here, raise an error!
        logger.debug('Success! Starting signature authentication.')

        key_str = self._get_api_key(request)
        if not key_str:
            self._fail('No key_str specified.')

        key = self._find_key(key_str)
        if not key:
            self._fail('Given key #{} does not exist!'.format(key_str))

        if not self._validate_timestamp_expiration(request):
            self._fail('X-Auth-Timestamp: {} is invalid or has expired'.format(
                self._get_timestamp(request)))

        if not self._verify_signature(request, key):
            self._fail('Invalid auth signature.')

        # Pass the user object lazily instead of str(user): on Python 2,
        # str() of a user with a non-ASCII representation raises
        # UnicodeEncodeError and would turn a valid login into a 500.
        logger.debug('Signature authentication succeeded for user %s', key.user)
        return key.user, key

    @staticmethod
    def _fail(msg):
        """Log an authentication failure and raise ``AuthenticationFailed``."""
        logger.info('Signature authentication failed: %s', msg)
        raise exceptions.AuthenticationFailed(msg)

    def _get_api_key(self, request):
        """
        Return the API key sent in ``settings.API_KEY_HEADER``, or ``''``.

        Raises ``AuthenticationFailed`` when the key is supplied through the
        ``settings.API_KEY_QUERY_PARAM`` query parameter, either on its own
        (not supported) or with a value that conflicts with the header.
        """
        api_key_header = request.META.get(settings.API_KEY_HEADER, '')
        api_key_url = request.GET.get(settings.API_KEY_QUERY_PARAM, '')

        if api_key_url and api_key_header and api_key_url != api_key_header:
            msg = _('Conflicting url and header versions of api key are defined.')
            raise exceptions.AuthenticationFailed(msg)

        if api_key_header:
            return api_key_header

        if api_key_url:
            msg = _('Providing api key in the url is not supported as of now.')
            raise exceptions.AuthenticationFailed(msg)

        return ''

    def _find_key(self, api_key):
        """
        Look up the ``ApiKey`` whose ``key`` equals ``api_key``.

        ``api_key`` may be a ``uuid.UUID`` or its string form. Returns the
        ``ApiKey`` instance, or ``None`` if no row matches. Raises
        ``AuthenticationFailed`` when the string is not a valid UUID.
        """
        if not isinstance(api_key, uuid.UUID):
            try:
                api_key = uuid.UUID(api_key)
            except ValueError:
                self._fail('Invalid api_key.')

        try:
            return ApiKey.objects.get(key=api_key)
        except ApiKey.DoesNotExist:
            return None

    def _try_authorize(self, request):
        """
        Tell whether the request is attempting signature authentication.

        True only when ``settings.SIGNATURE_HEADER`` holds exactly two
        space-separated parts and the first one, lower-cased, equals
        ``settings.SIGNATURE_START_VALUE``.
        """
        auth = request.META.get(settings.SIGNATURE_HEADER, '')
        if not auth:
            return False

        parts = auth.split(' ')
        return len(parts) == 2 and parts[0].lower() == settings.SIGNATURE_START_VALUE

    def _validate_timestamp_expiration(self, request):
        """
        Check that the request timestamp is parseable and fresh.

        The timestamp is accepted when it lies within
        ``settings.EXPIRATION_TIME`` seconds of the current UTC time, in
        either direction. Timestamps without a timezone are treated as UTC.
        Returns ``False`` for a missing, unparseable, too old or too far in
        the future timestamp.
        """
        ts = self._get_timestamp(request)
        if not ts.strip():
            # Reject a missing/blank timestamp up front instead of relying on
            # how the installed dateutil version treats an empty string.
            return False

        try:
            ts_datetime = dateutil.parser.parse(ts)
        except (ValueError, OverflowError):
            # OverflowError is raised for dates exceeding the platform's
            # largest integer; treat it like any other malformed timestamp.
            return False

        if ts_datetime.tzinfo is None:
            ts_datetime = ts_datetime.replace(tzinfo=utc)

        utc_now = datetime.datetime.now(utc)
        expires_in = datetime.timedelta(seconds=settings.EXPIRATION_TIME)

        # Compare the distance between the two instants rather than shifting
        # the parsed datetime by ``expires_in``: shifting a timestamp close to
        # datetime.min / datetime.max raises OverflowError, while the
        # difference of two datetimes always fits in a timedelta.
        return abs(utc_now - ts_datetime) <= expires_in

    def _verify_signature(self, request, key):
        """
        Return ``True`` when the signature sent by the client matches the one
        computed from the request with ``key.secret``.
        """
        given_signature = force_bytes(self._get_signature(request))
        own_signature = force_bytes(self._create_signature(request, key))
        # The expected signature is deliberately not logged: anyone able to
        # read the log could replay it as a valid credential for the request.
        # compare_digest gives a constant-time comparison (no timing oracle);
        # both operands are bytes so non-ASCII client input cannot make it
        # raise TypeError.
        return hmac.compare_digest(given_signature, own_signature)

    def _create_signature(self, request, key):
        """
        Return the hex HMAC-SHA1 of the signature base string, keyed with
        ``key.secret``.
        """
        # hmac requires bytes for key and message on Python 3; force_bytes
        # leaves byte strings untouched and UTF-8 encodes text.
        secret = force_bytes(str(key.secret))
        message = force_bytes(self._create_signature_base_string(request))
        return hmac.new(secret, message, sha1).hexdigest()

    def _create_signature_base_string(self, request):
        """
        Build the string that gets signed.

        Headers: (request-target) X-Auth-Timestamp X-Auth-Api-Key
        Ref: http://tools.ietf.org/html/draft-cavage-http-signatures-03#section-2.3
        """
        signature_value = '(request-target): {0} {1}\nx-auth-timestamp: {2}\nx-auth-api-key: {3}'.format(
            request.method.lower(),
            iri_to_uri(request.get_full_path()),
            self._get_timestamp(request).strip(),
            self._get_api_key(request).strip())
        logger.debug('RAW EXPECTED: {0}'.format(signature_value))
        return signature_value

    def _get_signature(self, request):
        """
        Return the second space-separated part of
        ``settings.SIGNATURE_HEADER``, or ``''`` when the header is missing
        or has no second part.
        """
        # Default to '' so a missing header yields '' instead of raising
        # AttributeError on None.
        parts = request.META.get(settings.SIGNATURE_HEADER, '').split(' ')
        return parts[1] if len(parts) > 1 else ''

    def _get_timestamp(self, request):
        """Return the raw ``settings.TIMESTAMP_HEADER`` value, or ``''``."""
        return request.META.get(settings.TIMESTAMP_HEADER, '')