Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
hub-client / dockerhub / utils.py
Size: Mime:
# Copyright (c) 2015 Docker, Inc. All rights reserved.
import base64
import binascii
import datetime
import logging
import os
from calendar import timegm

import jwt
from cryptography.hazmat.primitives.serialization import Encoding
from django.contrib.auth import get_user_model
from OpenSSL import crypto
from OpenSSL._util import lib as _lib


from rest_framework_jwt.settings import api_settings
from rest_framework import exceptions
from django.conf import settings
from base64 import b64decode
from dockerhub.telemetry.client import statsd

logger = logging.getLogger(__name__)

_x509_store = None


def jwt_encode_handler(payload):
    """A custom encode handler that supports x5c headers from JWK

    When ``settings.JWT_CERT`` is set, the certificate's DER encoding is
    base64-encoded and sent as the only entry of the ``x5c`` header. The token
    is signed with ``api_settings.JWT_SECRET_KEY`` using
    ``api_settings.JWT_ALGORITHM``.

    :param dict payload: Claims to encode into the token
    :return: The encoded token as text
    """
    headers = None
    signing_cert = getattr(settings, 'JWT_CERT', False)
    if signing_cert:
        # adds a der cert as per:
        # https://tools.ietf.org/html/draft-ietf-jose-json-web-key-41#section-4.7
        der_cert = base64.b64encode(signing_cert.public_bytes(Encoding.DER))
        # b64encode() returns bytes, but the header is serialized as JSON, so
        # the value has to be text or encoding fails on Python 3.
        headers = {"x5c": [der_cert.decode('ascii')]}

    token = jwt.encode(
        payload,
        api_settings.JWT_SECRET_KEY,
        api_settings.JWT_ALGORITHM,
        headers=headers
    )
    # jwt.encode() returns bytes in older PyJWT releases and text in newer
    # ones; only decode when there is something to decode.
    if isinstance(token, bytes):
        token = token.decode('utf-8')
    return token


def jwt_payload_handler(user):
    """
    Build the JWT claims for ``user``.

    'sub' (Subject): The subject account of the JWT
    'username': The Account username
    'jti' (JWT ID): unique identifier for the JWT
    'iat' (Issued At): The time at which the JWT was issued in "Seconds Since the Epoch"
    'exp' (Expiration): Expiration time in "Seconds Since the Epoch"

    See Spec for details
    https://self-issued.info/docs/draft-ietf-oauth-json-web-token.html#RegisteredClaimName

    Deprecated:
    session_id - use jti
    user_id - use sub
    email - Relying on the email on the JWT is deprecated due to long lived
    nature of the token.

    Note that 'iat' and 'exp' are placed in the returned dict as naive UTC
    ``datetime`` objects; presumably the JWT encoder turns them into epoch
    seconds when the token is serialized.

    :param user: Account to issue the token for. Must expose ``uuid`` and
        ``email``, plus either ``get_username()`` or ``username``.
    :return: The claims as a dict
    """
    try:
        username = user.get_username()
    except AttributeError:
        username = user.username

    # matches gateway implementation
    # https://github.com/docker/hub-gateway/blob/2f66a4eb24fa2c87513cb6c856ebe99d33793cf4/nginx/resty/api_gateway_utils.lua#L59
    # hexlify() returns bytes on Python 3; decode it so the claim is text and
    # the payload stays JSON-serializable.
    jti = binascii.hexlify(os.urandom(15)).upper().decode('ascii')
    now = datetime.datetime.utcnow()
    account_id = user.uuid.hex

    return {
        'jti': jti,
        'session_id': jti,  # deprecated in favor of jti
        'sub': account_id,
        'user_id': account_id,  # deprecated in favor of sub
        'email': user.email,
        'username': username,
        'iat': now,
        'exp': now + api_settings.JWT_EXPIRATION_DELTA
    }


def jwt_get_user_id_from_payload_handler(payload):
    """
    Return the value identifying the user in a decoded JWT payload.

    This implementation reads the 'username' claim and yields ``None`` when
    the claim is absent. Override this function if the identifier is stored
    differently in the payload.
    """
    return payload.get('username')


def jwt_cache_key(username):
    """Return the namespaced JWT cache key string for ``username``."""
    key_template = "dockerhub:jwt:token:{}"
    return key_template.format(username)


def get_jwt(username):
    """
    Issue a freshly encoded JWT for the account named ``username``.

    The account is looked up through the configured user model, its claims are
    built with ``jwt_payload_handler`` and the result is encoded with
    ``jwt_encode_handler``.

    :param str username: Username of the account to issue the token for
    :return: The encoded token
    """
    account = get_user_model().objects.get(username=username)
    claims = jwt_payload_handler(account)

    if api_settings.JWT_ALLOW_REFRESH:
        # A brand new token records its original issue time so that it can be
        # refreshed later.
        issued_at = datetime.datetime.utcnow()
        claims['orig_iat'] = timegm(issued_at.utctimetuple())

    return jwt_encode_handler(claims)


def datetime_to_epoch(dt):
    """
    Express a UTC datetime as seconds since the EPOCH, keeping sub-second precision

    :param datetime dt: A datetime object
    :return: EPOCH seconds as float
    """
    # utctimetuple() carries no microseconds, so they are added separately.
    whole_seconds = timegm(dt.utctimetuple())
    fractional_seconds = dt.microsecond / 1000000.
    return whole_seconds + fractional_seconds


def update_cert_expiry_stat(name, cert):
    """
    Publish a certificate's expiry time to statsd as a gauge

    The gauge value is the certificate's ``not_valid_after`` time in seconds
    since EPOCH, tagged with ``name:jwt_inter_cert``.

    :param str name: Name of the metric to update
    :param cert: X509 certificate whose expiry time is updated
    :type cert: :obj:`cryptography.x509.Certificate`
    """
    expiry_epoch = datetime_to_epoch(cert.not_valid_after)
    metric_tags = ["name:jwt_inter_cert"]
    statsd.gauge(name, expiry_epoch, tags=metric_tags)


# Eventually the JWT library will support all the cert verification logic;
# until then, we have to work around it...
def load_store(force_reload=False):
    """
    Return the module-wide X509 store of trusted CA certificates.

    The store is built from the PEM entries in
    ``settings.JWT_TRUSTED_CA_CERTS`` on first use and cached in the
    module-level ``_x509_store``.

    :param bool force_reload: Rebuild the store even if one is already cached
    :return: The cached :obj:`OpenSSL.crypto.X509Store`
    """
    global _x509_store
    if _x509_store is not None and not force_reload:
        return _x509_store

    # Fill a fresh store and publish it only once every certificate loaded.
    trusted = crypto.X509Store()
    for pem in settings.JWT_TRUSTED_CA_CERTS:
        logger.debug("Loading trusted certificate {}...".format(pem[:40]))
        ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, pem)
        trusted.add_cert(ca_cert)
    _x509_store = trusted
    return _x509_store


def get_pub_key(cert):
    """
    Given a certificate object, extract the PEM formatted public key

    :param cert: Certificate to read the public key from
    :type cert: :obj:`OpenSSL.crypto.X509`
    :return: The key as a ``-----BEGIN PUBLIC KEY-----`` PEM block, as a
        native ``str``
    """
    # Use pyOpenSSL's public dump_publickey() (available since pyOpenSSL 16.0)
    # instead of reaching into private crypto/_util helpers and hand-building
    # the PEM with the 'base64' str codec, which only exists on Python 2.
    pem = crypto.dump_publickey(crypto.FILETYPE_PEM, cert.get_pubkey())
    # dump_publickey() returns bytes; keep returning a native str as before
    # (on Python 2 bytes is str, so nothing is converted there).
    if not isinstance(pem, str):
        pem = pem.decode('ascii')
    return pem


def jwt_decode_handler(token):
    """
    Validate ``token`` and return its decoded payload.

    The verification scheme is picked from the token's (unverified) ``alg``
    header:

    * ``RS256`` - the token must carry its signing certificate in the ``x5c``
      header. That certificate is checked against the trusted store from
      ``load_store()`` and its public key is then used to verify the token.
    * ``HS256`` - the token is verified with ``api_settings.JWT_SECRET_KEY``.

    Any other algorithm is rejected.

    :param token: The encoded JWT
    :return: The decoded claims
    :raises rest_framework.exceptions.AuthenticationFailed: On any failure.
        The detail is always the same generic message; specifics are only
        written to the log.
    """
    options = {
        'verify_exp': api_settings.JWT_VERIFY_EXPIRATION,
    }
    failure_detail = "Failed to validate JWT"

    try:
        headers = jwt.get_unverified_header(token)
        algorithm = headers['alg']

        if algorithm == 'RS256':
            if 'x5c' not in headers:
                # This shouldn't happen with our implementation
                msg = 'Invalid token: headers {}'.format(headers)
                logger.error("Unsupported RS256 model " + msg)
                raise exceptions.AuthenticationFailed(failure_detail)

            # XXX This doesn't support intermediary certificates, but we're
            # not using them anyway. If this code starts accepting tokens from
            # someplace else with intermediaries that aren't explicitly trusted,
            # this will need to be refactored to walk the list of token certs
            cert = crypto.load_certificate(
                crypto.FILETYPE_ASN1, b64decode(headers['x5c'][0]))

            # This will raise if it's not verified
            crypto.X509StoreContext(load_store(), cert).verify_certificate()
            # If we got here, then the cert for this token is OK and its key
            # can be used to verify the token itself
            key = get_pub_key(cert)
        elif algorithm == 'HS256':
            key = api_settings.JWT_SECRET_KEY
        else:
            logger.warning("Authentication failed due to unsupported JWT algorithm: %s", algorithm)
            raise exceptions.AuthenticationFailed(failure_detail)

        # `algorithms` must be a list: PyJWT tests membership with `in`, which
        # on a bare string is a substring match. `algorithm` is exactly
        # 'RS256' or 'HS256' at this point.
        return jwt.decode(token, key,
                          verify=api_settings.JWT_VERIFY,
                          options=options,
                          leeway=api_settings.JWT_LEEWAY,
                          audience=api_settings.JWT_AUDIENCE,
                          issuer=api_settings.JWT_ISSUER,
                          algorithms=[algorithm])
    except exceptions.AuthenticationFailed:
        # Raised above and already logged there; let it through instead of
        # logging a second, misleading traceback in the catch-all below.
        raise
    except crypto.X509StoreContextError:
        logger.exception("Token validation failed against trusted cert")
        raise exceptions.AuthenticationFailed(failure_detail)
    except Exception:
        # Boundary handler: malformed tokens, bad signatures, expired claims
        # etc. must all surface as an authentication failure.
        logger.exception("Token validation failed")
        raise exceptions.AuthenticationFailed(failure_detail)