Repository URL to install this package:
|
Version:
0.13 ▾
|
import logging
from authlib.common.security import generate_token
from authlib.consts import default_user_agent
from .errors import (
MismatchingStateError,
)
__all__ = ['BaseApp']
log = logging.getLogger(__name__)
class BaseApp(object):
"""The remote application for OAuth 1 and OAuth 2. It is used together
with OAuth registry.
:param name: The name of the OAuth client, like: github, twitter
:param fetch_token: A function to fetch access token from database
:param update_token: A function to update access token to database
:param client_id: Client key of OAuth 1, or Client ID of OAuth 2
:param client_secret: Client secret of OAuth 2, or Client Secret of OAuth 2
:param request_token_url: Request Token endpoint for OAuth 1
:param request_token_params: Extra parameters for Request Token endpoint
:param access_token_url: Access Token endpoint for OAuth 1 and OAuth 2
:param access_token_params: Extra parameters for Access Token endpoint
:param authorize_url: Endpoint for user authorization of OAuth 1 or OAuth 2
:param authorize_params: Extra parameters for Authorization Endpoint
:param api_base_url: The base API endpoint to make requests simple
:param client_kwargs: Extra keyword arguments for session
:param server_metadata_url: Discover server metadata from this URL
:param kwargs: Extra server metadata
Create an instance of ``RemoteApp``. If ``request_token_url`` is configured,
it would be an OAuth 1 instance, otherwise it is OAuth 2 instance::
oauth1_client = RemoteApp(
client_id='Twitter Consumer Key',
client_secret='Twitter Consumer Secret',
request_token_url='https://api.twitter.com/oauth/request_token',
access_token_url='https://api.twitter.com/oauth/access_token',
authorize_url='https://api.twitter.com/oauth/authenticate',
api_base_url='https://api.twitter.com/1.1/',
)
oauth2_client = RemoteApp(
client_id='GitHub Client ID',
client_secret='GitHub Client Secret',
api_base_url='https://api.github.com/',
access_token_url='https://github.com/login/oauth/access_token',
authorize_url='https://github.com/login/oauth/authorize',
client_kwargs={'scope': 'user:email'},
)
"""
DEFAULT_USER_AGENT = default_user_agent
OAUTH_APP_CONFIG = None
def __init__(
self, name=None, fetch_token=None, update_token=None,
client_id=None, client_secret=None,
request_token_url=None, request_token_params=None,
access_token_url=None, access_token_params=None,
authorize_url=None, authorize_params=None,
api_base_url=None, client_kwargs=None, server_metadata_url=None,
oauth1_client_cls=None, oauth2_client_cls=None,
compliance_fix=None, client_auth_methods=None, **kwargs):
self.name = name
self.client_id = client_id
self.client_secret = client_secret
self.request_token_url = request_token_url
self.request_token_params = request_token_params
self.access_token_url = access_token_url
self.access_token_params = access_token_params
self.authorize_url = authorize_url
self.authorize_params = authorize_params
self.api_base_url = api_base_url
self.client_kwargs = client_kwargs or {}
self.oauth1_client_cls = oauth1_client_cls
self.oauth2_client_cls = oauth2_client_cls
self.compliance_fix = compliance_fix
self.client_auth_methods = client_auth_methods
self._fetch_token = fetch_token
self._update_token = update_token
self._server_metadata_url = server_metadata_url
self.server_metadata = kwargs
def _send_token_update(self, token, refresh_token=None, access_token=None):
raise NotImplementedError()
def _generate_access_token_params(self, request):
raise NotImplementedError()
def _set_session_data(self, request, key, value):
sess_key = '_{}_authlib_{}_'.format(self.name, key)
request.session[sess_key] = value
def _get_session_data(self, request, key):
sess_key = '_{}_authlib_{}_'.format(self.name, key)
return request.session.pop(sess_key, None)
def _get_oauth_client(self, **kwargs):
client_kwargs = {}
client_kwargs.update(self.client_kwargs)
client_kwargs.update(kwargs)
if self.request_token_url:
session = self.oauth1_client_cls(
self.client_id, self.client_secret,
**client_kwargs
)
else:
if self.authorize_url:
client_kwargs['authorization_endpoint'] = self.authorize_url
if self.access_token_url:
client_kwargs['token_endpoint'] = self.access_token_url
session = self.oauth2_client_cls(
client_id=self.client_id,
client_secret=self.client_secret,
update_token=self._send_token_update,
**client_kwargs
)
if self.client_auth_methods:
for f in self.client_auth_methods:
session.register_client_auth_method(f)
# only OAuth2 has compliance_fix currently
if self.compliance_fix:
self.compliance_fix(session)
session.headers['User-Agent'] = self.DEFAULT_USER_AGENT
return session
def _retrieve_oauth2_access_token_params(self, request, params):
request_state = params.pop('state', None)
state = self._get_session_data(request, 'state')
if state:
if state != request_state:
raise MismatchingStateError()
params['state'] = state
code_verifier = self._get_session_data(request, 'code_verifier')
if code_verifier:
params['code_verifier'] = code_verifier
return params
def retrieve_access_token_params(self, request, request_token=None):
"""Retrieve parameters for fetching access token, those parameters come
from request and previously saved temporary data in session.
"""
params = self._generate_access_token_params(request)
if self.request_token_url:
if request_token is None:
request_token = self._get_session_data(request, 'request_token')
params['request_token'] = request_token
else:
params = self._retrieve_oauth2_access_token_params(request, params)
redirect_uri = self._get_session_data(request, 'redirect_uri')
if redirect_uri:
params['redirect_uri'] = redirect_uri
log.debug('Retrieve temporary data: {!r}'.format(params))
return params
def save_authorize_data(self, request, **kwargs):
"""Save temporary data into session for the authorization step. These
data can be retrieved later when fetching access token.
"""
log.debug('Saving authorize data: {!r}'.format(kwargs))
keys = [
'redirect_uri', 'request_token',
'state', 'code_verifier', 'nonce'
]
for k in keys:
if k in kwargs:
self._set_session_data(request, k, kwargs[k])
@staticmethod
def _create_oauth2_authorization_url(client, authorization_endpoint, **kwargs):
rv = {}
if client.code_challenge_method:
code_verifier = generate_token(20)
kwargs['code_verifier'] = code_verifier
rv['code_verifier'] = code_verifier
scope = kwargs.get('scope', client.scope)
if scope and scope.startswith('openid'):
# this is an OpenID Connect service
nonce = generate_token(20)
kwargs['nonce'] = nonce
rv['nonce'] = nonce
url, state = client.create_authorization_url(
authorization_endpoint, **kwargs)
rv['url'] = url
rv['state'] = state
return rv
def request(self, method, url, token=None, **kwargs):
raise NotImplementedError()
def get(self, url, **kwargs):
"""Invoke GET http request.
If ``api_base_url`` configured, shortcut is available::
client.get('users/lepture')
"""
return self.request('GET', url, **kwargs)
def post(self, url, **kwargs):
"""Invoke POST http request.
If ``api_base_url`` configured, shortcut is available::
client.post('timeline', json={'text': 'Hi'})
"""
return self.request('POST', url, **kwargs)
def patch(self, url, **kwargs):
"""Invoke PATCH http request.
If ``api_base_url`` configured, shortcut is available::
client.patch('profile', json={'name': 'Hsiaoming Yang'})
"""
return self.request('PATCH', url, **kwargs)
def put(self, url, **kwargs):
"""Invoke PUT http request.
If ``api_base_url`` configured, shortcut is available::
client.put('profile', json={'name': 'Hsiaoming Yang'})
"""
return self.request('PUT', url, **kwargs)
def delete(self, url, **kwargs):
"""Invoke DELETE http request.
If ``api_base_url`` configured, shortcut is available::
client.delete('posts/123')
"""
return self.request('DELETE', url, **kwargs)
def _fetch_server_metadata(self, url):
resp = self.request('GET', url, withhold_token=True)
return resp.json()