Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Authlib / integrations / asgi_client / base_app.py
Size: Mime:
import logging

from authlib.common.urls import urlparse
from authlib.jose import JsonWebToken, jwk
from authlib.oidc.core import UserInfo, CodeIDToken, ImplicitIDToken
from .._client import BaseApp
from .._client import (
    MissingRequestTokenError,
    MissingTokenError,
)

__all__ = ['AsyncBaseApp']

log = logging.getLogger(__name__)


class AsyncBaseApp(BaseApp):
    """Asynchronous variant of :class:`BaseApp`.

    Every method that talks to the remote server is a coroutine.

    The class relies on attributes and helpers that are not defined in this
    module (``server_metadata``, ``_server_metadata_url``, ``authorize_url``,
    ``request_token_url``, ``_get_oauth_client``, ``_update_token``,
    ``_fetch_token``, ``_create_oauth2_authorization_url``, ``get`` ...).
    NOTE(review): these are presumably supplied by :class:`BaseApp` or by a
    framework-specific subclass -- confirm against ``_client``.
    """

    async def _load_server_metadata(self):
        """Return the server metadata, fetching the remote document if needed.

        When ``_server_metadata_url`` is set, the document behind it is
        downloaded and merged into ``self.server_metadata``.

        :return: ``self.server_metadata``
        """
        if self._server_metadata_url:
            metadata = await self._fetch_server_metadata(self._server_metadata_url)
            # Only load once: the URL is cleared after a successful fetch, so
            # later calls skip the request (a failed fetch leaves it in place).
            self._server_metadata_url = None
            self.server_metadata.update(metadata)
        return self.server_metadata

    async def _send_token_update(self, token, refresh_token=None, access_token=None):
        """Forward a token update to ``self._update_token``.

        :param token: The new token.
        :param refresh_token: Passed through as ``refresh_token``.
        :param access_token: Passed through as ``access_token``.
        """
        await self._update_token(
            token,
            refresh_token=refresh_token,
            access_token=access_token,
        )

    async def _create_oauth1_authorization_url(self, client, authorization_endpoint, **kwargs):
        """Fetch an OAuth 1 request token and build the authorization URL.

        :param client: OAuth client used to fetch the request token and to
                       create the URL.
        :param authorization_endpoint: Endpoint the user is redirected to.
        :param kwargs: Extra parameters for ``client.create_authorization_url``.
        :return: dict with the keys ``url`` and ``request_token``.
        """
        params = {}
        if self.request_token_params:
            params.update(self.request_token_params)
        token = await client.fetch_request_token(
            self.request_token_url, **params
        )
        # Lazy %-style arguments: repr(token) is only computed when the
        # DEBUG level is actually enabled for this logger.
        log.debug('Fetch request token: %r', token)
        url = client.create_authorization_url(authorization_endpoint, **kwargs)
        return {'url': url, 'request_token': token}

    async def create_authorization_url(self, redirect_uri=None, **kwargs):
        """Generate the authorization url and state for HTTP redirect.

        :param redirect_uri: Callback or redirect URI for authorization.
        :param kwargs: Extra parameters to include.
        :return: For OAuth 1 (``request_token_url`` is set) a dict with the
                 keys ``url`` and ``request_token``; otherwise whatever
                 ``_create_oauth2_authorization_url`` returns.
        :raise RuntimeError: If no authorization endpoint can be determined.
        """
        metadata = await self._load_server_metadata()
        authorization_endpoint = self.authorize_url
        # The metadata fallback is only consulted when no request token URL
        # is configured, i.e. not for OAuth 1.
        if not authorization_endpoint and not self.request_token_url:
            authorization_endpoint = metadata.get('authorization_endpoint')

        if not authorization_endpoint:
            raise RuntimeError('Missing "authorize_url" value')

        if self.authorize_params:
            # Applied last, so configured values override same-named
            # parameters passed by the caller.
            kwargs.update(self.authorize_params)

        async with self._get_oauth_client(**metadata) as client:
            client.redirect_uri = redirect_uri

            if self.request_token_url:
                return await self._create_oauth1_authorization_url(
                    client, authorization_endpoint, **kwargs)
            else:
                return self._create_oauth2_authorization_url(
                    client, authorization_endpoint, **kwargs)

    async def fetch_access_token(self, redirect_uri=None, request_token=None, **params):
        """Fetch access token in one step.

        :param redirect_uri: Callback or Redirect URI that is used in
                             previous :meth:`authorize_redirect`.
        :param request_token: A previous request token for OAuth 1.
        :param params: Extra parameters to fetch access token.
        :return: A token dict.
        :raise MissingRequestTokenError: If ``request_token_url`` is set
                                         (OAuth 1) and ``request_token`` is
                                         ``None``.
        """
        metadata = await self._load_server_metadata()
        token_endpoint = self.access_token_url
        if not token_endpoint and not self.request_token_url:
            token_endpoint = metadata.get('token_endpoint')

        async with self._get_oauth_client(**metadata) as client:
            if self.request_token_url:
                client.redirect_uri = redirect_uri
                if request_token is None:
                    raise MissingRequestTokenError()
                # merge request token with verifier
                token = {}
                token.update(request_token)
                token.update(params)
                client.token = token
                kwargs = self.access_token_params or {}
                token = await client.fetch_access_token(token_endpoint, **kwargs)
                client.redirect_uri = None
            else:
                client.redirect_uri = redirect_uri
                kwargs = {}
                if self.access_token_params:
                    kwargs.update(self.access_token_params)
                # Caller-supplied parameters win over the configured ones.
                kwargs.update(params)
                token = await client.fetch_token(token_endpoint, **kwargs)
            return token

    async def request(self, method, url, token=None, **kwargs):
        """Send an HTTP request through the OAuth client.

        :param method: HTTP method.
        :param url: Target URL; when ``api_base_url`` is set and ``url`` does
                    not start with ``http://`` or ``https://`` it is joined
                    onto ``api_base_url``.
        :param token: Token to attach. When ``None`` and a ``request``
                      keyword is given, the token is obtained from
                      ``self._fetch_token(request)``.
        :param kwargs: Passed to ``client.request``. The ``request`` key is
                       removed first; a truthy ``withhold_token`` sends the
                       request without resolving a token.
        :return: The result of ``client.request``.
        :raise MissingTokenError: If a token is needed but none is available.
        """
        if self.api_base_url and not url.startswith(('https://', 'http://')):
            url = urlparse.urljoin(self.api_base_url, url)

        async with self._get_oauth_client() as client:
            # ``request`` is only used to look up the token; it is not
            # forwarded to the HTTP client.
            request = kwargs.pop('request', None)

            if kwargs.get('withhold_token'):
                return await client.request(method, url, **kwargs)

            if token is None and request:
                token = await self._fetch_token(request)

            if token is None:
                raise MissingTokenError()

            client.token = token
            return await client.request(method, url, **kwargs)

    async def userinfo(self, **kwargs):
        """Fetch user info from ``userinfo_endpoint``.

        If the metadata contains a ``userinfo_compliance_fix`` callable it is
        invoked as ``compliance_fix(self, data)``. The callable may be a
        coroutine function or a plain function: its result is awaited only
        when it is awaitable.

        :param kwargs: Extra parameters passed to ``self.get``.
        :return: A :class:`UserInfo` built from the (possibly fixed) payload.
        :raise KeyError: If the metadata has no ``userinfo_endpoint``.
        """
        metadata = await self._load_server_metadata()
        resp = await self.get(metadata['userinfo_endpoint'], **kwargs)
        data = resp.json()

        compliance_fix = metadata.get('userinfo_compliance_fix')
        if compliance_fix:
            data = compliance_fix(self, data)
            if inspect.isawaitable(data):
                data = await data
        return UserInfo(data)

    async def _parse_id_token(self, token, nonce, claims_options=None):
        """Return an instance of UserInfo from token's ``id_token``.

        :param token: Token dict; must contain ``id_token``. When it also
                      contains ``access_token`` the claims are handled as
                      :class:`CodeIDToken`, otherwise as
                      :class:`ImplicitIDToken`.
        :param nonce: Nonce passed to the claims as a parameter.
        :param claims_options: Claims options for decoding. When ``None`` and
                               the metadata has an ``issuer``, ``iss`` is
                               restricted to that issuer.
        :return: A :class:`UserInfo` built from the validated claims.
        """
        claims_params = {
            'nonce': nonce,
            'client_id': self.client_id,
        }
        if 'access_token' in token:
            claims_params['access_token'] = token['access_token']
            claims_cls = CodeIDToken
        else:
            claims_cls = ImplicitIDToken

        metadata = await self._load_server_metadata()
        if claims_options is None and 'issuer' in metadata:
            claims_options = {'iss': {'values': [metadata['issuer']]}}

        # Fall back to RS256 when the server does not advertise algorithms.
        alg_values = metadata.get('id_token_signing_alg_values_supported') or ['RS256']
        jwt = JsonWebToken(alg_values)

        decode_kwargs = {
            'claims_cls': claims_cls,
            'claims_options': claims_options,
            'claims_params': claims_params,
        }

        jwk_set = await self._fetch_jwk_set()
        try:
            claims = jwt.decode(token['id_token'], key=jwk_set, **decode_kwargs)
        except ValueError:
            # Retry once with a freshly downloaded key set -- presumably the
            # cached keys are stale (e.g. after a key rotation).
            jwk_set = await self._fetch_jwk_set(force=True)
            claims = jwt.decode(token['id_token'], key=jwk_set, **decode_kwargs)

        # NOTE(review): leeway is presumably in seconds -- confirm.
        claims.validate(leeway=120)
        return UserInfo(claims)

    async def _fetch_jwk_set(self, force=False):
        """Return the JWK set, downloading it from ``jwks_uri`` when needed.

        A key set already present in the metadata under ``jwks`` is reused
        unless ``force`` is true. A downloaded key set is stored in
        ``self.server_metadata['jwks']``.

        :param force: Download the key set even if one is already stored.
        :return: The JWK set.
        :raise RuntimeError: If a download is required but the metadata has
                             no ``jwks_uri``.
        """
        metadata = await self._load_server_metadata()
        jwk_set = metadata.get('jwks')
        if jwk_set and not force:
            return jwk_set

        uri = metadata.get('jwks_uri')
        if not uri:
            raise RuntimeError('Missing "jwks_uri" in metadata')

        jwk_set = await self._fetch_server_metadata(uri)
        self.server_metadata['jwks'] = jwk_set
        return jwk_set

    async def _fetch_server_metadata(self, url):
        """GET ``url`` without attaching a token and return the JSON body.

        :param url: URL of the document to download.
        :return: The result of ``resp.json()``.
        """
        resp = await self.request('GET', url, withhold_token=True)
        return resp.json()