Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Authlib / integrations / _client / base_app.py
Size: Mime:
import logging

from authlib.common.security import generate_token
from authlib.consts import default_user_agent
from .errors import (
    MismatchingStateError,
)

__all__ = ['BaseApp']

log = logging.getLogger(__name__)


class BaseApp(object):
    """The remote application for OAuth 1 and OAuth 2. It is used together
    with OAuth registry.

    :param name: The name of the OAuth client, like: github, twitter
    :param fetch_token: A function to fetch access token from database
    :param update_token: A function to update access token to database
    :param client_id: Client key of OAuth 1, or Client ID of OAuth 2
    :param client_secret: Client secret of OAuth 2, or Client Secret of OAuth 2
    :param request_token_url: Request Token endpoint for OAuth 1
    :param request_token_params: Extra parameters for Request Token endpoint
    :param access_token_url: Access Token endpoint for OAuth 1 and OAuth 2
    :param access_token_params: Extra parameters for Access Token endpoint
    :param authorize_url: Endpoint for user authorization of OAuth 1 or OAuth 2
    :param authorize_params: Extra parameters for Authorization Endpoint
    :param api_base_url: The base API endpoint to make requests simple
    :param client_kwargs: Extra keyword arguments for session
    :param server_metadata_url: Discover server metadata from this URL
    :param kwargs: Extra server metadata

    Create an instance of ``RemoteApp``. If ``request_token_url`` is configured,
    it would be an OAuth 1 instance, otherwise it is OAuth 2 instance::

        oauth1_client = RemoteApp(
            client_id='Twitter Consumer Key',
            client_secret='Twitter Consumer Secret',
            request_token_url='https://api.twitter.com/oauth/request_token',
            access_token_url='https://api.twitter.com/oauth/access_token',
            authorize_url='https://api.twitter.com/oauth/authenticate',
            api_base_url='https://api.twitter.com/1.1/',
        )

        oauth2_client = RemoteApp(
            client_id='GitHub Client ID',
            client_secret='GitHub Client Secret',
            api_base_url='https://api.github.com/',
            access_token_url='https://github.com/login/oauth/access_token',
            authorize_url='https://github.com/login/oauth/authorize',
            client_kwargs={'scope': 'user:email'},
        )
    """
    DEFAULT_USER_AGENT = default_user_agent
    OAUTH_APP_CONFIG = None

    def __init__(
            self, name=None, fetch_token=None, update_token=None,
            client_id=None, client_secret=None,
            request_token_url=None, request_token_params=None,
            access_token_url=None, access_token_params=None,
            authorize_url=None, authorize_params=None,
            api_base_url=None, client_kwargs=None, server_metadata_url=None,
            oauth1_client_cls=None, oauth2_client_cls=None,
            compliance_fix=None, client_auth_methods=None, **kwargs):

        self.name = name
        self.client_id = client_id
        self.client_secret = client_secret
        self.request_token_url = request_token_url
        self.request_token_params = request_token_params
        self.access_token_url = access_token_url
        self.access_token_params = access_token_params
        self.authorize_url = authorize_url
        self.authorize_params = authorize_params
        self.api_base_url = api_base_url
        self.client_kwargs = client_kwargs or {}

        self.oauth1_client_cls = oauth1_client_cls
        self.oauth2_client_cls = oauth2_client_cls

        self.compliance_fix = compliance_fix
        self.client_auth_methods = client_auth_methods
        self._fetch_token = fetch_token
        self._update_token = update_token

        self._server_metadata_url = server_metadata_url
        self.server_metadata = kwargs

    def _send_token_update(self, token, refresh_token=None, access_token=None):
        raise NotImplementedError()

    def _generate_access_token_params(self, request):
        raise NotImplementedError()

    def _set_session_data(self, request, key, value):
        sess_key = '_{}_authlib_{}_'.format(self.name, key)
        request.session[sess_key] = value

    def _get_session_data(self, request, key):
        sess_key = '_{}_authlib_{}_'.format(self.name, key)
        return request.session.pop(sess_key, None)

    def _get_oauth_client(self, **kwargs):
        client_kwargs = {}
        client_kwargs.update(self.client_kwargs)
        client_kwargs.update(kwargs)
        if self.request_token_url:
            session = self.oauth1_client_cls(
                self.client_id, self.client_secret,
                **client_kwargs
            )
        else:
            if self.authorize_url:
                client_kwargs['authorization_endpoint'] = self.authorize_url
            if self.access_token_url:
                client_kwargs['token_endpoint'] = self.access_token_url
            session = self.oauth2_client_cls(
                client_id=self.client_id,
                client_secret=self.client_secret,
                update_token=self._send_token_update,
                **client_kwargs
            )
            if self.client_auth_methods:
                for f in self.client_auth_methods:
                    session.register_client_auth_method(f)
            # only OAuth2 has compliance_fix currently
            if self.compliance_fix:
                self.compliance_fix(session)

        session.headers['User-Agent'] = self.DEFAULT_USER_AGENT
        return session

    def _retrieve_oauth2_access_token_params(self, request, params):
        request_state = params.pop('state', None)
        state = self._get_session_data(request, 'state')
        if state:
            if state != request_state:
                raise MismatchingStateError()
            params['state'] = state

        code_verifier = self._get_session_data(request, 'code_verifier')
        if code_verifier:
            params['code_verifier'] = code_verifier
        return params

    def retrieve_access_token_params(self, request, request_token=None):
        """Retrieve parameters for fetching access token, those parameters come
        from request and previously saved temporary data in session.
        """
        params = self._generate_access_token_params(request)
        if self.request_token_url:
            if request_token is None:
                request_token = self._get_session_data(request, 'request_token')
            params['request_token'] = request_token
        else:
            params = self._retrieve_oauth2_access_token_params(request, params)

        redirect_uri = self._get_session_data(request, 'redirect_uri')
        if redirect_uri:
            params['redirect_uri'] = redirect_uri

        log.debug('Retrieve temporary data: {!r}'.format(params))
        return params

    def save_authorize_data(self, request, **kwargs):
        """Save temporary data into session for the authorization step. These
        data can be retrieved later when fetching access token.
        """
        log.debug('Saving authorize data: {!r}'.format(kwargs))
        keys = [
            'redirect_uri', 'request_token',
            'state', 'code_verifier', 'nonce'
        ]
        for k in keys:
            if k in kwargs:
                self._set_session_data(request, k, kwargs[k])

    @staticmethod
    def _create_oauth2_authorization_url(client, authorization_endpoint, **kwargs):
        rv = {}
        if client.code_challenge_method:
            code_verifier = generate_token(20)
            kwargs['code_verifier'] = code_verifier
            rv['code_verifier'] = code_verifier

        scope = kwargs.get('scope', client.scope)
        if scope and scope.startswith('openid'):
            # this is an OpenID Connect service
            nonce = generate_token(20)
            kwargs['nonce'] = nonce
            rv['nonce'] = nonce

        url, state = client.create_authorization_url(
            authorization_endpoint, **kwargs)
        rv['url'] = url
        rv['state'] = state
        return rv

    def request(self, method, url, token=None, **kwargs):
        raise NotImplementedError()

    def get(self, url, **kwargs):
        """Invoke GET http request.

        If ``api_base_url`` configured, shortcut is available::

            client.get('users/lepture')
        """
        return self.request('GET', url, **kwargs)

    def post(self, url, **kwargs):
        """Invoke POST http request.

        If ``api_base_url`` configured, shortcut is available::

            client.post('timeline', json={'text': 'Hi'})
        """
        return self.request('POST', url, **kwargs)

    def patch(self, url, **kwargs):
        """Invoke PATCH http request.

        If ``api_base_url`` configured, shortcut is available::

            client.patch('profile', json={'name': 'Hsiaoming Yang'})
        """
        return self.request('PATCH', url, **kwargs)

    def put(self, url, **kwargs):
        """Invoke PUT http request.

        If ``api_base_url`` configured, shortcut is available::

            client.put('profile', json={'name': 'Hsiaoming Yang'})
        """
        return self.request('PUT', url, **kwargs)

    def delete(self, url, **kwargs):
        """Invoke DELETE http request.

        If ``api_base_url`` configured, shortcut is available::

            client.delete('posts/123')
        """
        return self.request('DELETE', url, **kwargs)

    def _fetch_server_metadata(self, url):
        resp = self.request('GET', url, withhold_token=True)
        return resp.json()