Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
hub-client / dockerhub / utils.py
Size: Mime:
import logging
from calendar import timegm
import datetime

from django.contrib.auth import get_user_model
import jwt
from OpenSSL import crypto
from OpenSSL._util import lib as _lib


from rest_framework_jwt.settings import api_settings
from rest_framework import exceptions
from django.conf import settings
from base64 import b64decode

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Token encoder callable taken from the rest_framework_jwt settings
# (JWT_ENCODE_HANDLER); get_jwt() uses it to serialise the payload.
jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER


def jwt_payload_handler(user):
    """
    Build the JWT claims dictionary for ``user``.

    The claims are ``user_id`` (hex form of ``user.uuid``), ``email``,
    ``username`` and ``exp`` (current UTC time plus
    ``api_settings.JWT_EXPIRATION_DELTA``).
    """
    # Prefer the get_username() accessor; objects without it fall back to
    # the plain ``username`` attribute.
    try:
        name = user.get_username()
    except AttributeError:
        name = user.username

    return dict(
        user_id=user.uuid.hex,
        email=user.email,
        username=name,
        exp=datetime.datetime.utcnow() + api_settings.JWT_EXPIRATION_DELTA,
    )


def jwt_get_user_id_from_payload_handler(payload):
    """
    Return the identifier stored in a decoded JWT payload.

    Despite the name, the value read is the ``username`` claim; ``None`` is
    returned when the claim is absent. Override this function if the
    identifier is formatted differently in the payload.
    """
    return payload.get('username')


def jwt_cache_key(username):
    """Return the cache key under which the JWT for ``username`` is stored."""
    key_template = "dockerhub:jwt:token:{}"
    return key_template.format(username)


def get_jwt(username):
    """
    Create an encoded JWT for the user whose username is ``username``.

    The user is looked up through the configured user model, so a missing
    user propagates that model's ``DoesNotExist`` error to the caller.
    """
    user_model = get_user_model()
    claims = jwt_payload_handler(user_model.objects.get(username=username))

    # A brand new token records its original issue time so that it can be
    # refreshed later on.
    if api_settings.JWT_ALLOW_REFRESH:
        issued_at = datetime.datetime.utcnow()
        claims['orig_iat'] = timegm(issued_at.utctimetuple())

    return jwt_encode_handler(claims)


def get_user(request):
    """
    Resolve the user attached to ``request``'s session.

    Reads ``_user_uuid`` from the session and fetches the matching user.
    If the key is missing, the lookup asserts, or no such user exists, the
    failure is logged at info level and an ``AnonymousUser`` is returned.
    """
    from django.contrib.auth.models import AnonymousUser
    user_model = get_user_model()
    try:
        return user_model.objects.get(uuid=request.session['_user_uuid'])
    except (KeyError, AssertionError, user_model.DoesNotExist) as err:
        logger.info("Exception error: {}".format(err))
    return AnonymousUser()


# Eventually the JWT library will support all of the cert verification logic;
# until then, we have to work around it.
def load_store():
    """
    Build an X509 store holding every certificate listed in
    ``settings.JWT_TRUSTED_CA_CERTS`` (each entry is parsed as PEM).
    """
    trusted = crypto.X509Store()
    for pem in settings.JWT_TRUSTED_CA_CERTS:
        # Only the first 40 characters are logged to keep the line short.
        logger.debug("Loading trusted certificate {}...".format(pem[:40]))
        certificate = crypto.load_certificate(crypto.FILETYPE_PEM, pem)
        trusted.add_cert(certificate)
    return trusted


# Trust store built once, when this module is imported; jwt_decode_handler
# verifies the certificate carried by RS256 tokens against it.
store = load_store()


def get_pub_key(cert):
    """
    Given a certificate object, extract the PEM formatted public key.

    :param cert: an ``OpenSSL.crypto.X509`` certificate.
    :returns: the public key as a PEM text block
        (``-----BEGIN PUBLIC KEY-----`` ... ``-----END PUBLIC KEY-----``).
    :raises OpenSSL.crypto.Error: if the key cannot be serialised.
    """
    # Imported locally so this function does not rely on additional
    # module-level imports.
    import base64

    # Yuck. There isn't a clean API to get the PEM public key from a cert,
    # so we're mucking around at the crypto back-end layer, and unstable APIs.
    # This is likely to break at some point in the future.
    pub_pkey = cert.get_pubkey()
    bio = crypto._new_mem_buf()
    # i2d_*_bio() reports success as 1 and failure as 0, so a "< 0" test
    # would never notice a failed write.
    if _lib.i2d_PUBKEY_bio(bio, pub_pkey._pkey) <= 0:
        crypto._raise_current_error()
    der = crypto._bio_to_string(bio)

    # Hand convert from DER to PEM format. The 'base64' str codec only
    # exists on Python 2; the base64 module produces the same line-wrapped
    # output there (encodestring) and also works on Python 3 (encodebytes).
    encode = getattr(base64, 'encodebytes', None) or base64.encodestring
    body = encode(der)
    if not isinstance(body, str):
        # Python 3: the encoder returns bytes, the PEM template is text.
        body = body.decode('ascii')
    return '-----BEGIN PUBLIC KEY-----\n{}-----END PUBLIC KEY-----'.format(
        body)


def jwt_decode_handler(token):
    """
    Decode and verify a JWT, choosing the key from the token's ``alg`` header.

    * ``RS256``: the token must carry an ``x5c`` header. Its first
      certificate is verified against the module-level trust ``store`` and
      the certificate's public key is then used to verify the token.
    * ``HS256``: the token is verified with ``api_settings.JWT_SECRET_KEY``.
    * Any other algorithm is rejected.

    :param token: the encoded JWT.
    :returns: the decoded payload.
    :raises rest_framework.exceptions.AuthenticationFailed: for every
        failure (untrusted certificate, bad signature, malformed token,
        unsupported algorithm, ...).
    """
    options = {
        'verify_exp': api_settings.JWT_VERIFY_EXPIRATION,
    }

    try:
        headers = jwt.get_unverified_header(token)
        alg = headers['alg']

        if alg == 'RS256':
            if 'x5c' not in headers:
                # This shouldn't happen with our implementation
                msg = 'Invalid token: headers {}'.format(headers)
                logger.error("Unsupported RS256 model " + msg)
                raise exceptions.AuthenticationFailed(msg)

            token_certs = headers['x5c']

            # XXX This doesn't support intermediary certificates, but we're
            # not using them anyway. If this code starts accepting tokens from
            # someplace else with intermediaries that aren't explicitly
            # trusted, this will need to be refactored to walk the list of
            # token_certs
            cert = crypto.load_certificate(
                crypto.FILETYPE_ASN1, b64decode(token_certs[0]))

            # This will raise if it's not verified
            crypto.X509StoreContext(store, cert).verify_certificate()
            # If we got here, then the cert for this token is OK

            # Now we can decode the token with verification. The allowed
            # algorithms are passed as a list so the check is an exact
            # membership test rather than a substring match.
            return jwt.decode(token, get_pub_key(cert),
                              api_settings.JWT_VERIFY,
                              options=options,
                              leeway=api_settings.JWT_LEEWAY,
                              audience=api_settings.JWT_AUDIENCE,
                              issuer=api_settings.JWT_ISSUER,
                              algorithms=['RS256'])

        if alg == 'HS256':
            return jwt.decode(token, api_settings.JWT_SECRET_KEY,
                              api_settings.JWT_VERIFY,
                              options=options,
                              leeway=api_settings.JWT_LEEWAY,
                              audience=api_settings.JWT_AUDIENCE,
                              issuer=api_settings.JWT_ISSUER,
                              algorithms=['HS256'])

        # Format the message here: the exception constructor does not apply
        # %-style arguments (a second positional value would be taken as the
        # error code).
        msg = "Unsupported JWT Algorithm: %r" % (alg,)
        logger.error(msg)
        raise exceptions.AuthenticationFailed(msg)
    except exceptions.AuthenticationFailed:
        # Raised deliberately above with a precise message; let it through
        # instead of re-wrapping it in the generic handler below.
        raise
    except crypto.X509StoreContextError as e:
        logger.exception("Token validation failed %r", e)
        msg = "Failed to validate JWT token against trusted cert"
        raise exceptions.AuthenticationFailed(msg)
    except Exception as e:
        # Boundary handler: any other failure (bad signature, expired token,
        # malformed header, ...) is reported as an authentication failure.
        logger.exception("Token validation failed %r", e)
        msg = "Failed to decode JWT token: %r" % (e,)
        raise exceptions.AuthenticationFailed(msg)