Repository URL to install this package:
|
Version:
0.24.7.dev0 ▾
|
import jwt
import logging
from django.contrib.auth import get_user_model
from django.utils.encoding import smart_text
from django.utils.translation import ugettext as _
from rest_framework_jwt.authentication import (JSONWebTokenAuthentication,
get_authorization_header,)
from rest_framework_jwt.settings import api_settings
from rest_framework import exceptions
from dockerhub.constants import AccountTypes
# request.META key set by the gateway to report the outcome of its CSRF
# check; enforce_csrf() accepts the request only when the value is the
# string 'true' (or when the key is absent altogether).
CSRF_VERIFIED_HEADER = "HTTP_X_CSRF_VERIFIED"
# Message carried by the AuthenticationFailed raised when the gateway
# reports a failed CSRF check.
CSRF_BAD_TOKEN = "CSRF Failed: CSRF token missing or incorrect."
# Active Django user model; queried by username in
# HubJSONWebTokenAuthentication.authenticate_credentials().
User = get_user_model()
logger = logging.getLogger(__name__)
# Handlers resolved from the rest_framework_jwt settings: one decodes a raw
# token into its payload, the other extracts the user identifier from that
# payload (used below as the `username` lookup value).
jwt_decode_handler = api_settings.JWT_DECODE_HANDLER
jwt_get_user_id_from_payload = api_settings.JWT_PAYLOAD_GET_USER_ID_HANDLER
def enforce_csrf(request):
    """
    Reject the request if the gateway reported a failed CSRF check.

    The gateway communicates its verdict through the
    `CSRF_VERIFIED_HEADER` entry of `request.META`. The request is let
    through (returns `None`) when:

    * the HTTP method is one of GET / HEAD / OPTIONS / TRACE,
    * the request carries a truthy `_dont_enforce_csrf_checks` attribute,
    * the header is absent, or
    * the header value is exactly the string 'true'.

    Any other header value is logged and raises
    `exceptions.AuthenticationFailed` with `CSRF_BAD_TOKEN`.
    """
    gateway_verdict = request.META.get(CSRF_VERIFIED_HEADER)
    safe_methods = ('GET', 'HEAD', 'OPTIONS', 'TRACE')

    # Methods defined as 'safe' by RFC 2616 need no CSRF protection.
    # The `_dont_enforce_csrf_checks` flag is the same escape hatch Django's
    # own CSRF middleware honours so that test suites can switch the check
    # off:
    # https://github.com/django/django/blob/2d66d04e7376a66774a7fec5eecdbde6971f6ad2/django/middleware/csrf.py
    if (request.method in safe_methods
            or getattr(request, '_dont_enforce_csrf_checks', False)):
        return

    if request.is_secure():
        # TODO: strict referrer checking is not implemented here; see the
        # Django implementation for reference:
        # https://github.com/django/django/blob/2d66d04e7376a66774a7fec5eecdbde6971f6ad2/django/middleware/csrf.py#L135
        pass

    if gateway_verdict is None:
        # No verdict header: assume the gateway had no CSRF check to perform
        # because the request is not session based.
        # TODO: once the gateway exposes the authentication method on the
        # JWT token, require the header on non-safe session requests and
        # reject the request when it is missing.
        return

    # The verdict is a header string, so compare against the literal text
    # 'true' rather than a Python boolean.
    if gateway_verdict != 'true':
        logger.warning("CSRF Validation Failed", extra={'request': request})
        raise exceptions.AuthenticationFailed(CSRF_BAD_TOKEN)

    # Reaching this point means the gateway confirmed the CSRF check.
    return
class HubJSONWebTokenAuthentication(JSONWebTokenAuthentication):
    """
    JWT authentication that accepts both `JWT` and `Bearer` authorization
    prefixes, only authenticates active accounts of type
    `AccountTypes.USER`, and applies the gateway CSRF verdict via
    `enforce_csrf` once the token has been validated.
    """

    def get_jwt_value(self, request):
        """
        Override the `get_jwt_value` to allow us to parse multiple types
        of headers. This should allow us to support multiple headers to align
        with Garant:

            "Authorization: Bearer <Token>"
            "Authorization: JWT <Token>"

        This can be removed when the callers are using `bearer` instead of
        `JWT` for the token.

        Returns the raw token taken from the header, or `None` when the
        header is missing, uses another prefix, does not consist of exactly
        two parts, or carries a value that cannot be parsed as a JWT.

        https://github.com/GetBlimp/django-rest-framework-jwt/blob/3223c0d6f48c48f91abe3cc53a8c377a7817ab24/rest_framework_jwt/authentication.py#L74
        """
        auth = get_authorization_header(request).split()
        auth_header_prefixes = ('jwt', 'bearer',)

        if not auth or smart_text(auth[0].lower()) not in auth_header_prefixes:
            return None

        if len(auth) != 2:
            return None

        # if the token doesn't look like a JWT, we let the next
        # authentication method try to handle it
        try:
            jwt.get_unverified_header(auth[1])
        except jwt.exceptions.DecodeError:
            return None

        return auth[1]

    def authenticate(self, request):
        """
        Returns a two-tuple of `User` and token if a valid signature has been
        supplied using JWT-based authentication. If authentication failed it
        raises an AuthenticationFailed exception. Otherwise returns `None` if
        no authentication has been attempted.

        CSRF enforcement (`enforce_csrf`) runs only after the token has been
        decoded and the user resolved, and may itself raise
        AuthenticationFailed.
        """
        jwt_value = self.get_jwt_value(request)
        if jwt_value is None:
            return None

        try:
            payload = jwt_decode_handler(jwt_value)
        except jwt.ExpiredSignature:
            msg = _('Signature has expired.')
            raise exceptions.AuthenticationFailed(msg)
        except jwt.DecodeError:
            msg = _('Error decoding signature.')
            raise exceptions.AuthenticationFailed(msg)
        except jwt.InvalidTokenError:
            # Every remaining token validation failure (invalid audience or
            # issuer, token not yet valid, missing claim, ...) derives from
            # InvalidTokenError. Without this clause such a token would
            # surface as an unhandled server error instead of a failed
            # authentication.
            raise exceptions.AuthenticationFailed()

        user = self.authenticate_credentials(payload)
        enforce_csrf(request)

        return (user, jwt_value)

    def authenticate_credentials(self, payload):
        """
        Returns the active account of type `AccountTypes.USER` whose username
        equals the user id extracted from the payload.

        Raises AuthenticationFailed when the payload carries no user id or
        when no such account exists.
        """
        user_id = jwt_get_user_id_from_payload(payload)
        if user_id is None:
            msg = 'Invalid payload'
            raise exceptions.AuthenticationFailed(msg)

        # Only the lookup itself can raise User.DoesNotExist, so it is the
        # only statement guarded by the try.
        try:
            user = User.objects.get(
                username=user_id,
                is_active=True,
                type=AccountTypes.USER,
            )
        except User.DoesNotExist:
            msg = 'Invalid signature'
            raise exceptions.AuthenticationFailed(msg)

        return user