# Repository URL to install this package:
# Version: 0.32.0
# Copyright (c) 2015 Docker, Inc. All rights reserved.
import base64
import binascii
import datetime
import logging
import os
from calendar import timegm
import jwt
from cryptography.hazmat.primitives.serialization import Encoding
from django.contrib.auth import get_user_model
from OpenSSL import crypto
from OpenSSL._util import lib as _lib
from rest_framework_jwt.settings import api_settings
from rest_framework import exceptions
from django.conf import settings
from base64 import b64decode
from dockerhub.telemetry.client import statsd
logger = logging.getLogger(__name__)
_x509_store = None
def jwt_encode_handler(payload):
    """A custom encode handler that supports x5c headers from JWK.

    When ``settings.JWT_CERT`` is set (truthy), its DER encoding is
    base64-encoded and sent as the only entry of the ``x5c`` header;
    otherwise no extra headers are added.

    :param payload: The claims to sign (passed straight to ``jwt.encode``).
    :return: The encoded token as text.
    """
    headers = None
    if getattr(settings, 'JWT_CERT', False):
        # adds a der cert as per:
        # https://tools.ietf.org/html/draft-ietf-jose-json-web-key-41#section-4.7
        der_cert = base64.b64encode(settings.JWT_CERT.public_bytes(Encoding.DER))
        # b64encode() yields bytes on Python 3, which cannot be serialized
        # into the JSON header. Base64 output is pure ASCII, so decoding is
        # lossless.
        headers = {"x5c": [der_cert.decode('ascii')]}
    token = jwt.encode(
        payload,
        api_settings.JWT_SECRET_KEY,
        api_settings.JWT_ALGORITHM,
        headers=headers
    )
    # Older PyJWT releases return bytes while newer ones return text; only
    # decode when there is something to decode.
    if isinstance(token, bytes):
        token = token.decode('utf-8')
    return token
def jwt_payload_handler(user):
    """
    Build the JWT claims for ``user``.

    'sub' (Subject): The subject account of the JWT
    'username': The Account username
    'jti' (JWT ID): unique identifier for the JWT
    'iat' (Issued At): The time at which the JWT was issued in "Seconds Since the Epoch"
    'exp' (Expiration): Expiration time in "Seconds Since the Epoch"
    See Spec for details
    https://self-issued.info/docs/draft-ietf-oauth-json-web-token.html#RegisteredClaimName

    Deprecated:
    session_id - use jti
    user_id - use sub
    email - Relying on the email on the JWT is deprecated due to long lived
    nature of the token.

    Note that 'iat' and 'exp' are placed in the returned dict as naive UTC
    ``datetime`` objects; converting them to epoch seconds is left to the
    encoder.

    :param user: Account object exposing ``uuid``, ``email`` and either
        ``get_username()`` or ``username``.
    :return: dict of claims.
    """
    try:
        username = user.get_username()
    except AttributeError:
        username = user.username
    # matches gateway implementation
    # https://github.com/docker/hub-gateway/blob/2f66a4eb24fa2c87513cb6c856ebe99d33793cf4/nginx/resty/api_gateway_utils.lua#L59
    # hexlify() returns bytes on Python 3, which is not JSON serializable;
    # the hex digits are ASCII so decoding is lossless.
    jti = binascii.hexlify(os.urandom(15)).upper().decode('ascii')
    now = datetime.datetime.utcnow()
    account_id = user.uuid.hex
    return {
        'jti': jti,
        'session_id': jti,  # deprecated in favor of jti
        'sub': account_id,
        'user_id': account_id,  # deprecated in favor of sub
        'email': user.email,
        'username': username,
        'iat': now,
        'exp': now + api_settings.JWT_EXPIRATION_DELTA
    }
def jwt_get_user_id_from_payload_handler(payload):
    """
    Return the identifier stored under the 'username' claim of ``payload``
    (``None`` when the claim is absent).

    Override this function if user_id is formatted differently in payload
    """
    return payload.get('username')
def jwt_cache_key(username):
    """Return the cache key under which the JWT for ``username`` is stored."""
    key_template = "dockerhub:jwt:token:{}"
    return key_template.format(username)
def get_jwt(username):
    """
    Create a freshly signed JWT for the account named ``username``.

    The account is looked up through the configured user model; any lookup
    error propagates to the caller.

    :param str username: Username of the account to issue the token for
    :return: The encoded token
    """
    account = get_user_model().objects.get(username=username)
    claims = jwt_payload_handler(account)
    if api_settings.JWT_ALLOW_REFRESH:
        # A brand new token records its original issue time so that it can
        # later be refreshed.
        issued_at = datetime.datetime.utcnow()
        claims['orig_iat'] = timegm(issued_at.utctimetuple())
    return jwt_encode_handler(claims)
def datetime_to_epoch(dt):
    """
    Express a UTC datetime as seconds since the EPOCH, keeping the
    sub-second part.

    :param datetime dt: A datetime object
    :return: EPOCH seconds as float
    """
    whole_seconds = timegm(dt.utctimetuple())
    fraction = dt.microsecond / 1000000.
    return whole_seconds + fraction
def update_cert_expiry_stat(name, cert):
    """
    Publish a certificate's expiry time to statsd as a gauge, expressed as
    seconds since EPOCH and tagged ``name:jwt_inter_cert``.

    :param str name: Name of the metric to update
    :param cert: X509 certificate whose expiry time is updated
    :type: :obj:`cryptography.x509.Certificate`
    """
    expiry_epoch = datetime_to_epoch(cert.not_valid_after)
    statsd.gauge(name, expiry_epoch, tags=["name:jwt_inter_cert"])
# Eventually the JWT library will support all the cert verification logic
# until then, we have to work around it...
def load_store(force_reload=False):
    """
    Return the module-wide X509 store of trusted CA certificates.

    The store is built from ``settings.JWT_TRUSTED_CA_CERTS`` (PEM strings)
    on first use and cached in ``_x509_store``; later calls reuse the cached
    store unless ``force_reload`` is true.

    :param bool force_reload: Rebuild the store even if one is cached
    :return: The cached ``crypto.X509Store``
    """
    global _x509_store
    if _x509_store is not None and not force_reload:
        return _x509_store
    trusted = crypto.X509Store()
    for pem in settings.JWT_TRUSTED_CA_CERTS:
        logger.debug("Loading trusted certificate {}...".format(pem[:40]))
        ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, pem)
        trusted.add_cert(ca_cert)
    # Only publish the store once every certificate has loaded successfully.
    _x509_store = trusted
    return _x509_store
def get_pub_key(cert):
    """
    Given a certificate object, extract the PEM formatted public key

    :param cert: pyOpenSSL certificate object (anything offering
        ``get_pubkey()``)
    :return: The public key as PEM text
    """
    # Yuck.  There isn't a clean API to Get the PEM public key from a cert,
    # so we're mucking around at the crypto back-end layer, and unstable APIs
    # This is likely to break at some point in the future
    pub_pkey = cert.get_pubkey()
    bio = crypto._new_mem_buf()
    # OpenSSL's i2d_*_bio functions report failure with 0 (success is 1), so
    # a "< 0" test would never detect an error.
    if _lib.i2d_PUBKEY_bio(bio, pub_pkey._pkey) <= 0:
        crypto._raise_current_error()
    # Hand convert from DER to PEM format.
    # The 'base64' str codec only exists on Python 2; use the base64 module
    # instead (encodebytes on Python 3, encodestring on Python 2).  Both emit
    # newline-terminated lines, matching what the codec produced.
    b64_encode = getattr(base64, 'encodebytes', None) or base64.encodestring
    der_b64 = b64_encode(crypto._bio_to_string(bio))
    if not isinstance(der_b64, str):
        der_b64 = der_b64.decode('ascii')
    return '-----BEGIN PUBLIC KEY-----\n{}-----END PUBLIC KEY-----'.format(
        der_b64)
def jwt_decode_handler(token):
    """
    Validate ``token`` and return its decoded claims.

    Two signing schemes are accepted, selected by the token's (unverified)
    ``alg`` header:

    * ``RS256`` - the token must carry an ``x5c`` header.  Its first
      certificate is checked against the trusted store from ``load_store()``
      and the token signature is then verified with that certificate's
      public key.
    * ``HS256`` - the token is verified with ``api_settings.JWT_SECRET_KEY``.

    :param token: The encoded JWT
    :return: The decoded claims
    :raises rest_framework.exceptions.AuthenticationFailed: on any failure.
        Every error raised inside the ``try`` (including the more specific
        ``AuthenticationFailed`` instances raised below) is caught by the
        generic handler and replaced with the message
        "Failed to validate JWT".
    """
    options = {
        'verify_exp': api_settings.JWT_VERIFY_EXPIRATION,
    }
    try:
        headers = jwt.get_unverified_header(token)
        if headers['alg'] == 'RS256':
            if 'x5c' in headers:
                token_certs = headers['x5c']
                # XXX This doesn't support intermediary certificates, but we're
                # not using them anyway.  If this code starts accepting tokens from
                # someplace else with intermediaries that aren't explicitly trusted,
                # this will need to be refactored to walk the list of token_certs
                cert = crypto.load_certificate(
                    crypto.FILETYPE_ASN1, b64decode(token_certs[0]))
                # This will raise if it's not verified
                crypto.X509StoreContext(load_store(), cert).verify_certificate()
                # If we got here, then the cert for this token is OK
                # Now we can decode the token with verification
                # `algorithms` must be a list: with a bare string the
                # library's membership test degrades to substring matching.
                return jwt.decode(token, get_pub_key(cert),
                                  api_settings.JWT_VERIFY,
                                  options=options,
                                  leeway=api_settings.JWT_LEEWAY,
                                  audience=api_settings.JWT_AUDIENCE,
                                  issuer=api_settings.JWT_ISSUER,
                                  algorithms=['RS256'])
            else:
                # This shouldn't happen with our implementation
                msg = 'Invalid token: headers {}'.format(headers)
                logger.error("Unsupported RS256 model " + msg)
                raise exceptions.AuthenticationFailed(msg)
        elif headers['alg'] == 'HS256':
            return jwt.decode(token, api_settings.JWT_SECRET_KEY,
                              api_settings.JWT_VERIFY,
                              options=options,
                              leeway=api_settings.JWT_LEEWAY,
                              audience=api_settings.JWT_AUDIENCE,
                              issuer=api_settings.JWT_ISSUER,
                              algorithms=['HS256'])
        else:
            logger.warning("Authentication failed due to unsupported JWT algorithm: %s", headers['alg'])
            raise exceptions.AuthenticationFailed("Failed to validate JWT")
    except crypto.X509StoreContextError:
        logger.exception("Token validation failed against trusted cert")
        raise exceptions.AuthenticationFailed("Failed to validate JWT")
    except Exception:
        logger.exception("Token validation failed")
        raise exceptions.AuthenticationFailed("Failed to validate JWT")