Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
hub-client / dockerhub / authentication.py
Size: Mime:
import jwt
import logging

from django.contrib.auth import get_user_model
from django.utils.encoding import smart_text
from django.utils.translation import ugettext as _
from rest_framework_jwt.authentication import (JSONWebTokenAuthentication,
                                               get_authorization_header,)
from rest_framework_jwt.settings import api_settings
from rest_framework import exceptions

from dockerhub.constants import AccountTypes

# request.META key of the header the gateway uses to report the outcome of
# its CSRF check; enforce_csrf() accepts only the string 'true' as a pass.
CSRF_VERIFIED_HEADER = "HTTP_X_CSRF_VERIFIED"
# Detail message of the AuthenticationFailed raised when the gateway reports
# that the CSRF check did not pass.
CSRF_BAD_TOKEN = "CSRF Failed: CSRF token missing or incorrect."

# User model class, resolved through Django's get_user_model().
User = get_user_model()

logger = logging.getLogger(__name__)

# Handlers looked up once, at import time, from the rest_framework_jwt
# settings object.
jwt_decode_handler = api_settings.JWT_DECODE_HANDLER
jwt_get_user_id_from_payload = api_settings.JWT_PAYLOAD_GET_USER_ID_HANDLER


def enforce_csrf(request):
    """
    Reject the request when the gateway reported a failed CSRF check.

    The gateway communicates its verdict through the request header stored
    under `CSRF_VERIFIED_HEADER` in `request.META`. The function returns
    `None` when no enforcement is needed or the check passed, and raises
    `exceptions.AuthenticationFailed` (detail `CSRF_BAD_TOKEN`) when the
    header is present with any value other than the string 'true'.
    """
    gateway_verdict = request.META.get(CSRF_VERIFIED_HEADER)

    # Methods defined as 'safe' by RFC 2616 do not need CSRF protection.
    uses_safe_method = request.method in ('GET', 'HEAD', 'OPTIONS', 'TRACE')

    # `_dont_enforce_csrf_checks` is the switch used to turn CSRF checks off
    # for the test suite, mirroring Django's CSRF middleware:
    # https://github.com/django/django/blob/2d66d04e7376a66774a7fec5eecdbde6971f6ad2/django/middleware/csrf.py
    if uses_safe_method or getattr(request, '_dont_enforce_csrf_checks',
                                   False):
        return

    if request.is_secure():
        # TODO: strict referrer checking is not implemented here; see the
        # Django implementation for reference:
        # https://github.com/django/django/blob/2d66d04e7376a66774a7fec5eecdbde6971f6ad2/django/middleware/csrf.py#L135
        pass

    if gateway_verdict is None:
        # Header absent: assume the gateway had no CSRF check to perform
        # because the request is not session based.
        # TODO: once the gateway exposes the authentication method on the
        # JWT token, require the header on non-safe session requests and
        # reject the request when it is missing.
        return

    # The header carries the strings 'true' / 'false', not Python booleans.
    if gateway_verdict == 'true':
        # CSRF passed verification.
        return

    logger.warning("CSRF Validation Failed", extra={'request': request})
    raise exceptions.AuthenticationFailed(CSRF_BAD_TOKEN)


class HubJSONWebTokenAuthentication(JSONWebTokenAuthentication):
    """
    JWT authentication accepting both `JWT` and `Bearer` header prefixes.

    On top of decoding the token, it only authenticates active accounts of
    type `AccountTypes.USER` and runs `enforce_csrf` on the request once the
    credentials have been validated.
    """

    def get_jwt_value(self, request):
        """
        Override the `get_jwt_value` to allow us to parse multiple types
        of headers. This should allow us to support multiple headers to align
        with Garant:

        "Authorization: Bearer <Token>"
        "Authorization: JWT <Token>"

        This can be removed when the callers are using `bearer` instead of
        `JWT` for the token.

        Returns the raw token taken from the header, or `None` when the
        header is missing, uses another prefix, does not consist of exactly
        two parts, or its second part cannot be parsed as a JWT. Returning
        `None` lets the next authentication class handle the request.

        https://github.com/GetBlimp/django-rest-framework-jwt/blob/3223c0d6f48c48f91abe3cc53a8c377a7817ab24/rest_framework_jwt/authentication.py#L74
        """
        auth = get_authorization_header(request).split()

        auth_header_prefixes = ('jwt', 'bearer',)

        if not auth or smart_text(auth[0].lower()) not in auth_header_prefixes:
            return None

        if len(auth) != 2:
            return None

        # if the token doesn't look like a JWT, we let the next
        # authentication method try to handle it.
        # InvalidTokenError is the base class of DecodeError, so malformed
        # tokens are still covered; catching the base class also covers
        # header validation errors (e.g. an invalid `kid`) that would
        # otherwise propagate as an unhandled exception.
        try:
            jwt.get_unverified_header(auth[1])
        except jwt.exceptions.InvalidTokenError:
            return None

        return auth[1]

    def authenticate(self, request):
        """
        Returns a two-tuple of `User` and token if a valid signature has been
        supplied using JWT-based authentication.  If authentication failed it
        raises an AuthenticationFailed exception. Otherwise returns `None` if
        no authentication has been attempted.

        `enforce_csrf` runs after the credentials have been validated and may
        raise AuthenticationFailed as well.
        """
        jwt_value = self.get_jwt_value(request)
        if jwt_value is None:
            return None

        try:
            payload = jwt_decode_handler(jwt_value)
        except jwt.ExpiredSignature:
            msg = _('Signature has expired.')
            raise exceptions.AuthenticationFailed(msg)
        except jwt.DecodeError:
            msg = _('Error decoding signature.')
            raise exceptions.AuthenticationFailed(msg)
        except jwt.InvalidTokenError:
            # Any other token validation failure (audience, issuer,
            # not-yet-valid token, ...) is an authentication failure, not a
            # server error.
            raise exceptions.AuthenticationFailed()

        user = self.authenticate_credentials(payload)

        enforce_csrf(request)

        return (user, jwt_value)

    def authenticate_credentials(self, payload):
        """
        Returns the active user account whose username matches the user id
        extracted from the payload.

        Only accounts of type `AccountTypes.USER` are considered. Raises
        AuthenticationFailed when the payload carries no user id
        ('Invalid payload') or when no matching account exists
        ('Invalid signature').
        """

        try:
            user_id = jwt_get_user_id_from_payload(payload)

            if user_id is not None:
                user = User.objects.get(
                    username=user_id,
                    is_active=True,
                    type=AccountTypes.USER,
                )
            else:
                msg = 'Invalid payload'
                raise exceptions.AuthenticationFailed(msg)
        except User.DoesNotExist:
            msg = 'Invalid signature'
            raise exceptions.AuthenticationFailed(msg)

        return user