Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
hub-client / tests / test_authentication.py
Size: Mime:
import uuid
from datetime import datetime, timedelta

from django.contrib.auth import get_user_model
from django.test import TestCase
from django.test.client import RequestFactory
from rest_framework import exceptions

from dockerhub import utils
from dockerhub.authentication import (HubJSONWebTokenAuthentication,
                                      CSRF_VERIFIED_HEADER)
from dockerhub.constants import AccountTypes

User = get_user_model()


# a random not-signed but valid jwt token
SAMPLE_JWT_TOKEN = ("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
                    ".eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiYWRtaW4iOnRydWV9."
                    "TJVA95OrM7E2cBab30RMHrHDcEfxjoYZgeFONFh7HgQ")


class AuthenticationTestCase(TestCase):
    auth = HubJSONWebTokenAuthentication()

    def setUp(self):
        self.factory = RequestFactory()

        self.user = User.objects.create(
            username='myuser',
            type=AccountTypes.USER,
            is_active=True,
        )
        self.user.uuid = uuid.uuid4()

    def test_authenticate_succeeds_with_token(self):
        payload = utils.jwt_payload_handler(self.user)
        token = utils.jwt_encode_handler(payload)

        auth_header = 'Bearer {0}'.format(token)
        request = self.factory.get('/auth/', HTTP_AUTHORIZATION=auth_header)

        user, jwt_token = self.auth.authenticate(request)
        self.assertEqual(jwt_token, token)
        self.assertEqual(self.user, user)

    def test_authenticate_fails_with_expired_token(self):
        payload = utils.jwt_payload_handler(self.user)
        # make sure expired tokens are not valid
        payload['exp'] = datetime.utcnow() - timedelta(hours=1)
        token = utils.jwt_encode_handler(payload)
        auth_header = 'Bearer {0}'.format(token)
        request = self.factory.get('/auth/', HTTP_AUTHORIZATION=auth_header)

        with self.assertRaises(exceptions.AuthenticationFailed):
            self.auth.authenticate(request)

    def test_authenticate_fails_with_invalid_signature(self):
        auth_header = 'Bearer {0}'.format(SAMPLE_JWT_TOKEN)
        request = self.factory.get('/auth/', HTTP_AUTHORIZATION=auth_header)

        with self.assertRaises(exceptions.AuthenticationFailed):
            self.auth.authenticate(request)

    def test_authenticate_fails_for_unknown_user(self):
        payload = utils.jwt_payload_handler(self.user)
        payload['username'] = 'johndoe'
        token = utils.jwt_encode_handler(payload)
        auth_header = 'Bearer {0}'.format(token)
        request = self.factory.get('/auth/', HTTP_AUTHORIZATION=auth_header)

        with self.assertRaises(exceptions.AuthenticationFailed):
            self.auth.authenticate(request)

    def test_authenticate_with_unsafe_method_fails(self):
        payload = utils.jwt_payload_handler(self.user)
        token = utils.jwt_encode_handler(payload)

        auth_header = 'Bearer {0}'.format(token)
        for method in ('post', 'put', 'patch'):
            request = getattr(self.factory, method)('/auth/', HTTP_AUTHORIZATION=auth_header)
            request.META[CSRF_VERIFIED_HEADER] = 'false'
            with self.assertRaises(exceptions.AuthenticationFailed):
                user, jwt_token = self.auth.authenticate(request)

    def test_authenticate_succeeds_with_csrf_header(self):
        payload = utils.jwt_payload_handler(self.user)
        token = utils.jwt_encode_handler(payload)

        auth_header = 'Bearer {0}'.format(token)
        for method in ('post', 'put', 'patch'):
            request = getattr(self.factory, method)('/auth/', HTTP_AUTHORIZATION=auth_header)
            request.META[CSRF_VERIFIED_HEADER] = 'true'
            user, jwt_token = self.auth.authenticate(request)
            self.assertEqual(jwt_token, token)
            self.assertEqual(self.user, user)

    def test_authenticate_with_get_pass_csrf_checks(self):
        payload = utils.jwt_payload_handler(self.user)
        token = utils.jwt_encode_handler(payload)

        auth_header = 'Bearer {0}'.format(token)
        request = self.factory.get('/auth/', HTTP_AUTHORIZATION=auth_header)
        request.META[CSRF_VERIFIED_HEADER] = 'false'

        user, jwt_token = self.auth.authenticate(request)
        self.assertEqual(jwt_token, token)
        self.assertEqual(self.user, user)

    def test_authenticate_credentials_success(self):
        test_user = User.objects.create(
            username='test',
            type=AccountTypes.USER,
            is_active=True,
        )
        payload = {
            'username': test_user.username
        }
        auth_client = HubJSONWebTokenAuthentication()
        user = auth_client.authenticate_credentials(payload)
        self.assertEqual(user, test_user)

    def test_user_inactive(self):
        test_user = User.objects.create(
            username='test',
            type=AccountTypes.USER,
            is_active=False,
        )
        payload = {
            'username': test_user.username
        }
        auth_client = HubJSONWebTokenAuthentication()
        with self.assertRaises(exceptions.AuthenticationFailed):
            auth_client.authenticate_credentials(payload)

    def test_user_is_org(self):
        test_user = User.objects.create(
            username='test',
            type=AccountTypes.ORGANIZATION,
            is_active=True,
        )
        payload = {'username': test_user.username}
        auth_client = HubJSONWebTokenAuthentication()
        with self.assertRaises(exceptions.AuthenticationFailed):
            auth_client.authenticate_credentials(payload)

    def test_get_jwt_value_returns_none_when_no_auth_is_present(self):
        request = self.factory.get('/v2/myendpoint/')
        auth = HubJSONWebTokenAuthentication()
        self.assertIsNone(auth.get_jwt_value(request))

    def test_get_jwt_value_returns_none_when_auth_is_not_jwt_or_bearer(self):
        request = self.factory.get('/v2/myendpoint/', HTTP_AUTHORIZATION="Test Hello")
        auth = HubJSONWebTokenAuthentication()
        self.assertIsNone(auth.get_jwt_value(request))

    def test_get_jwt_with_malformed_jwt_header_returns_none(self):
        """
        The Authentication class should return none to allow another
        Auth class to try to authenticate.
        """
        bad_headers = ('JWT ', 'jwt', 'JWT multiple spaces', 'bearer',
                       'Bearer', 'Bearer multiple spaces',
                       'Bearer dXNlcm5hbWU6cGFzc3dvcmQ=')
        for header in bad_headers:
            request = self.factory.get('/v2/myendpoint/', HTTP_AUTHORIZATION=header)
            auth = HubJSONWebTokenAuthentication()
            self.assertIsNone(auth.get_jwt_value(request))

    def test_get_jwt_with_jwt_header_returns_valid_token(self):
        request = self.factory.get('/v2/myendpoint/',
                                   HTTP_AUTHORIZATION="JWT " + SAMPLE_JWT_TOKEN)
        auth = HubJSONWebTokenAuthentication()
        self.assertEquals(auth.get_jwt_value(request), SAMPLE_JWT_TOKEN)

    def test_get_jwt_with_bearer_header_returns_valid_token(self):
        request = self.factory.get('/v2/myendpoint/',
                                   HTTP_AUTHORIZATION="Bearer " + SAMPLE_JWT_TOKEN)
        auth = HubJSONWebTokenAuthentication()
        self.assertEquals(auth.get_jwt_value(request), SAMPLE_JWT_TOKEN)