Repository URL to install this package:
|
Version:
0.13 ▾
|
import time
from authlib.consts import default_json_headers
from ..rfc6749 import (
TokenEndpoint,
InvalidRequestError,
UnsupportedTokenTypeError,
)
class IntrospectionEndpoint(TokenEndpoint):
"""Implementation of introspection endpoint which is described in
`RFC7662`_.
.. _RFC7662: https://tools.ietf.org/html/rfc7662
"""
#: Endpoint name to be registered
ENDPOINT_NAME = 'introspection'
def authenticate_endpoint_credential(self, request, client):
"""The protected resource calls the introspection endpoint using an HTTP
``POST`` request with parameters sent as
"application/x-www-form-urlencoded" data. The protected resource sends a
parameter representing the token along with optional parameters
representing additional context that is known by the protected resource
to aid the authorization server in its response.
token
**REQUIRED** The string value of the token. For access tokens, this
is the ``access_token`` value returned from the token endpoint
defined in OAuth 2.0. For refresh tokens, this is the
``refresh_token`` value returned from the token endpoint as defined
in OAuth 2.0.
token_type_hint
**OPTIONAL** A hint about the type of the token submitted for
introspection.
"""
params = request.form
if 'token' not in params:
raise InvalidRequestError()
token_type = params.get('token_type_hint')
if token_type and token_type not in self.SUPPORTED_TOKEN_TYPES:
raise UnsupportedTokenTypeError()
return self.query_token(params['token'], token_type, client)
def create_endpoint_response(self, request):
"""Validate introspection request and create the response.
:returns: (status_code, body, headers)
"""
# The authorization server first validates the client credentials
client = self.authenticate_endpoint_client(request)
# then verifies whether the token was issued to the client making
# the revocation request
credential = self.authenticate_endpoint_credential(request, client)
# the authorization server invalidates the token
body = self.create_introspection_payload(credential)
return 200, body, default_json_headers
def create_introspection_payload(self, token):
# the token is not active, does not exist on this server, or the
# protected resource is not allowed to introspect this particular
# token, then the authorization server MUST return an introspection
# response with the "active" field set to "false"
if not token:
return {'active': False}
expires_at = token.get_expires_at()
if expires_at < time.time() or token.revoked:
return {'active': False}
payload = self.introspect_token(token)
if 'active' not in payload:
payload['active'] = True
return payload
def query_token(self, token, token_type_hint, client):
"""Get the token from database/storage by the given token string.
Developers should implement this method::
def query_token(self, token, token_type_hint, client):
if token_type_hint == 'access_token':
tok = Token.query_by_access_token(token)
elif token_type_hint == 'refresh_token':
tok = Token.query_by_refresh_token(token)
else:
tok = Token.query_by_access_token(token)
if not tok:
tok = Token.query_by_refresh_token(token)
if check_client_permission(client, tok):
return tok
"""
raise NotImplementedError()
def introspect_token(self, token):
"""Read given token and return its introspection metadata as a
dictionary following `Section 2.2`_::
def introspect_token(self, token):
active = is_token_active(token)
return {
'active': active,
'client_id': token.client_id,
'token_type': token.token_type,
'username': get_token_username(token),
'scope': token.get_scope(),
'sub': get_token_user_sub(token),
'aud': token.client_id,
'iss': 'https://server.example.com/',
'exp': token.expires_at,
'iat': token.issued_at,
}
.. _`Section 2.2`: https://tools.ietf.org/html/rfc7662#section-2.2
"""
raise NotImplementedError()