Repository URL to install this package:
Version:
0.7.0 ▾
|
aiohttp-cors
/
preflight_handler.py
|
---|
from aiohttp import hdrs, web
# Positive response to Access-Control-Allow-Credentials
_TRUE = "true"
class _PreflightHandler:
@staticmethod
def _parse_request_method(request: web.Request):
"""Parse Access-Control-Request-Method header of the preflight request
"""
method = request.headers.get(hdrs.ACCESS_CONTROL_REQUEST_METHOD)
if method is None:
raise web.HTTPForbidden(
text="CORS preflight request failed: "
"'Access-Control-Request-Method' header is not specified")
# FIXME: validate method string (ABNF: method = token), if parsing
# fails, raise HTTPForbidden.
return method
@staticmethod
def _parse_request_headers(request: web.Request):
"""Parse Access-Control-Request-Headers header or the preflight request
Returns set of headers in upper case.
"""
headers = request.headers.get(hdrs.ACCESS_CONTROL_REQUEST_HEADERS)
if headers is None:
return frozenset()
# FIXME: validate each header string, if parsing fails, raise
# HTTPForbidden.
# FIXME: check, that headers split and stripped correctly (according
# to ABNF).
headers = (h.strip(" \t").upper() for h in headers.split(","))
# pylint: disable=bad-builtin
return frozenset(filter(None, headers))
async def _get_config(self, request, origin, request_method):
raise NotImplementedError()
async def _preflight_handler(self, request: web.Request):
"""CORS preflight request handler"""
# Handle according to part 6.2 of the CORS specification.
origin = request.headers.get(hdrs.ORIGIN)
if origin is None:
# Terminate CORS according to CORS 6.2.1.
raise web.HTTPForbidden(
text="CORS preflight request failed: "
"origin header is not specified in the request")
# CORS 6.2.3. Doing it out of order is not an error.
request_method = self._parse_request_method(request)
# CORS 6.2.5. Doing it out of order is not an error.
try:
config = \
await self._get_config(request, origin, request_method)
except KeyError:
raise web.HTTPForbidden(
text="CORS preflight request failed: "
"request method {!r} is not allowed "
"for {!r} origin".format(request_method, origin))
if not config:
# No allowed origins for the route.
# Terminate CORS according to CORS 6.2.1.
raise web.HTTPForbidden(
text="CORS preflight request failed: "
"no origins are allowed")
options = config.get(origin, config.get("*"))
if options is None:
# No configuration for the origin - deny.
# Terminate CORS according to CORS 6.2.2.
raise web.HTTPForbidden(
text="CORS preflight request failed: "
"origin '{}' is not allowed".format(origin))
# CORS 6.2.4
request_headers = self._parse_request_headers(request)
# CORS 6.2.6
if options.allow_headers == "*":
pass
else:
disallowed_headers = request_headers - options.allow_headers
if disallowed_headers:
raise web.HTTPForbidden(
text="CORS preflight request failed: "
"headers are not allowed: {}".format(
", ".join(disallowed_headers)))
# Ok, CORS actual request with specified in the preflight request
# parameters is allowed.
# Set appropriate headers and return 200 response.
response = web.Response()
# CORS 6.2.7
response.headers[hdrs.ACCESS_CONTROL_ALLOW_ORIGIN] = origin
if options.allow_credentials:
# Set allowed credentials.
response.headers[hdrs.ACCESS_CONTROL_ALLOW_CREDENTIALS] = _TRUE
# CORS 6.2.8
if options.max_age is not None:
response.headers[hdrs.ACCESS_CONTROL_MAX_AGE] = \
str(options.max_age)
# CORS 6.2.9
# TODO: more optimal for client preflight request cache would be to
# respond with ALL allowed methods.
response.headers[hdrs.ACCESS_CONTROL_ALLOW_METHODS] = request_method
# CORS 6.2.10
if request_headers:
# Note: case of the headers in the request is changed, but this
# shouldn't be a problem, since the headers should be compared in
# the case-insensitive way.
response.headers[hdrs.ACCESS_CONTROL_ALLOW_HEADERS] = \
",".join(request_headers)
return response