Repository URL to install this package:
|
Version:
0.11.1.dev0 ▾
|
import logging
from calendar import timegm
import datetime
from django.contrib.auth import get_user_model
import jwt
from OpenSSL import crypto
from OpenSSL._util import lib as _lib
from rest_framework_jwt.settings import api_settings
from rest_framework import exceptions
from django.conf import settings
from base64 import b64decode
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Token encoder taken from rest_framework_jwt's api_settings; get_jwt() calls
# it to turn a payload dict into the token it returns.
jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
def jwt_payload_handler(user):
    """
    Build the JWT payload dictionary for ``user``.

    The username comes from ``user.get_username()``; when that raises
    ``AttributeError`` the ``username`` attribute is used instead.  The
    ``exp`` entry is the current (naive) UTC time plus
    ``api_settings.JWT_EXPIRATION_DELTA``.

    :param user: object providing ``uuid``, ``email`` and a username.
    :returns: dict with the keys ``user_id``, ``email``, ``username``, ``exp``.
    """
    try:
        name = user.get_username()
    except AttributeError:
        name = user.username

    # Evaluate the fields in the same order the payload lists them.
    uuid_hex = user.uuid.hex
    email_address = user.email
    expires_at = datetime.datetime.utcnow() + api_settings.JWT_EXPIRATION_DELTA

    return {
        'user_id': uuid_hex,
        'email': email_address,
        'username': name,
        'exp': expires_at,
    }
def jwt_get_user_id_from_payload_handler(payload):
    """
    Return the user identifier stored in a decoded JWT payload.

    The identifier is read from the ``username`` entry; ``None`` is returned
    when the payload has no such entry.  Override this function if the
    identifier is formatted differently in the payload.
    """
    return payload.get('username')
def jwt_cache_key(username):
    """Format the ``dockerhub:jwt:token:<username>`` key string for ``username``."""
    key_template = "dockerhub:jwt:token:{}"
    return key_template.format(username)
def get_jwt(username):
    """
    Create an encoded JWT for the user named ``username``.

    The user is looked up through the configured user model, its payload is
    built with ``jwt_payload_handler`` and encoded with
    ``jwt_encode_handler``.  Errors raised by the user lookup are not caught
    here.

    :param username: value matched against the user model's ``username``.
    :returns: whatever ``jwt_encode_handler`` returns for the payload.
    """
    user_model = get_user_model()
    account = user_model.objects.get(username=username)
    claims = jwt_payload_handler(account)

    if api_settings.JWT_ALLOW_REFRESH:
        # A brand new token records its original issued-at time so that it
        # can be refreshed later.
        issued_at = datetime.datetime.utcnow().utctimetuple()
        claims['orig_iat'] = timegm(issued_at)

    return jwt_encode_handler(claims)
def get_user(request):
    """
    Return the user referenced by the request's session.

    The session entry ``_user_uuid`` is used to look the user up by ``uuid``.
    If the entry is missing, or the lookup raises ``AssertionError`` or the
    model's ``DoesNotExist``, the error is logged at INFO level and an
    ``AnonymousUser`` is returned instead.
    """
    from django.contrib.auth.models import AnonymousUser

    user_model = get_user_model()
    try:
        session_uuid = request.session['_user_uuid']
        return user_model.objects.get(uuid=session_uuid)
    except (KeyError, AssertionError, user_model.DoesNotExist) as err:
        logger.info("Exception error: {}".format(err))
    return AnonymousUser()
# Eventually the JWT library will support all the cert verification logic
# until then, we have to work around it...
def load_store():
    """
    Build an X509 store holding every certificate listed in
    ``settings.JWT_TRUSTED_CA_CERTS``.

    Each entry is parsed as a PEM certificate and added to the store; the
    first 40 characters of each entry are written to the debug log.

    :returns: the populated ``crypto.X509Store``.
    """
    trusted = crypto.X509Store()
    for pem_text in settings.JWT_TRUSTED_CA_CERTS:
        logger.debug("Loading trusted certificate {}...".format(pem_text[:40]))
        ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, pem_text)
        trusted.add_cert(ca_cert)
    return trusted
# Trusted-certificate store, built once when this module is imported;
# jwt_decode_handler() verifies token certificates against it.
store = load_store()
def get_pub_key(cert):
    """
    Given a certificate object, extract the PEM formatted public key.

    :param cert: certificate object exposing ``get_pubkey()``; in this module
        it is the result of ``crypto.load_certificate``.
    :returns: text consisting of a ``-----BEGIN PUBLIC KEY-----`` line, the
        base64 of the DER-encoded key wrapped at 76 characters per line, and
        a closing ``-----END PUBLIC KEY-----`` marker.
    :raises: whatever ``crypto._raise_current_error()`` raises when OpenSSL
        fails to serialize the key.
    """
    # Imported locally: the module only imports ``b64decode`` at file level.
    from base64 import b64encode

    # Yuck. There isn't a clean API to get the PEM public key from a cert,
    # so we're mucking around at the crypto back-end layer and unstable APIs.
    # This is likely to break at some point in the future.
    pub_pkey = cert.get_pubkey()
    bio = crypto._new_mem_buf()
    # i2d_PUBKEY_bio returns 1 on success and 0 on failure, so a "< 0" test
    # would never detect an error.
    if _lib.i2d_PUBKEY_bio(bio, pub_pkey._pkey) != 1:
        crypto._raise_current_error()

    # Hand convert from DER to PEM format.  b64encode works on both Python 2
    # and 3 (the 'base64' str codec is Python 2 only); wrapping at 76 columns
    # with a trailing newline reproduces what that codec emitted.
    der_bytes = crypto._bio_to_string(bio)
    b64_text = b64encode(der_bytes).decode('ascii')
    wrapped = ''.join(
        b64_text[start:start + 76] + '\n'
        for start in range(0, len(b64_text), 76)
    )
    return '-----BEGIN PUBLIC KEY-----\n{}-----END PUBLIC KEY-----'.format(
        wrapped)
def jwt_decode_handler(token):
    """
    Decode and verify ``token`` and return its payload.

    The algorithm is read from the token's unverified header:

    * ``RS256`` -- the header must carry an ``x5c`` certificate list.  The
      first certificate is verified against the module-level trusted
      ``store`` and its public key is then used to verify the token.
    * ``HS256`` -- the token is verified with ``api_settings.JWT_SECRET_KEY``.
    * anything else is rejected.

    :param token: the encoded JWT.
    :returns: the decoded payload as returned by ``jwt.decode``.
    :raises rest_framework.exceptions.AuthenticationFailed: for an
        unsupported algorithm, an RS256 token without ``x5c``, an untrusted
        certificate, or any other decoding/verification failure.
    """
    options = {
        'verify_exp': api_settings.JWT_VERIFY_EXPIRATION,
    }
    try:
        headers = jwt.get_unverified_header(token)
        if headers['alg'] == 'RS256':
            if 'x5c' not in headers:
                # This shouldn't happen with our implementation
                msg = 'Invalid token: headers {}'.format(headers)
                logger.error("Unsupported RS256 model " + msg)
                raise exceptions.AuthenticationFailed(msg)

            token_certs = headers['x5c']
            # XXX This doesn't support intermediary certificates, but we're
            # not using them anyway. If this code starts accepting tokens
            # from someplace else with intermediaries that aren't explicitly
            # trusted, this will need to be refactored to walk the list of
            # token_certs
            cert = crypto.load_certificate(
                crypto.FILETYPE_ASN1, b64decode(token_certs[0]))
            # This will raise if it's not verified
            crypto.X509StoreContext(store, cert).verify_certificate()
            # If we got here, then the cert for this token is OK
            # Now we can decode the token with verification.
            # ``algorithms`` is a list: a bare string would be matched by
            # substring rather than by exact algorithm name.
            return jwt.decode(token, get_pub_key(cert),
                              api_settings.JWT_VERIFY,
                              options=options,
                              leeway=api_settings.JWT_LEEWAY,
                              audience=api_settings.JWT_AUDIENCE,
                              issuer=api_settings.JWT_ISSUER,
                              algorithms=['RS256'])
        elif headers['alg'] == 'HS256':
            return jwt.decode(token, api_settings.JWT_SECRET_KEY,
                              api_settings.JWT_VERIFY,
                              options=options,
                              leeway=api_settings.JWT_LEEWAY,
                              audience=api_settings.JWT_AUDIENCE,
                              issuer=api_settings.JWT_ISSUER,
                              algorithms=['HS256'])
        else:
            # Interpolate the algorithm into the message; passing it as a
            # second positional argument would set the error code instead.
            raise exceptions.AuthenticationFailed(
                "Unsupported JWT Algorithm: %r" % (headers['alg'],))
    except exceptions.AuthenticationFailed:
        # Raised deliberately above with a specific message; don't let the
        # catch-all handler below re-wrap it as a generic decode failure.
        raise
    except crypto.X509StoreContextError as e:
        logger.exception("Token validation failed %r", e)
        msg = "Failed to validate JWT token against trusted cert"
        raise exceptions.AuthenticationFailed(msg)
    except Exception as e:
        logger.exception("Token validation failed %r", e)
        msg = "Failed to decode JWT token: %r" % (e,)
        raise exceptions.AuthenticationFailed(msg)