Repository URL to install this package:
|
Version:
0.8.1 ▾
|
try:
from typing import Any, Awaitable, Callable, Dict, List
except ImportError:
pass
from supertenant import consts
from supertenant.supermeter.logger import (
log_instrumentation_skipped,
log_instrumentation_success,
log_integration_module_exception,
)
try:
import aiohttp # noqa: F401
import wrapt
from aiohttp.web import HTTPServiceUnavailable, HTTPTooManyRequests, middleware
from supertenant.supermeter.data.http_data import HTTPServerData
from supertenant.supermeter.data.http_requests_utils import reject_http_request
from supertenant.supermeter.managers import http_manager
from supertenant.supermeter.managers.actions.async_actions_manager_py3 import AsyncActions
from supertenant.supermeter.scope_manager import Span
def _strip_port(host):
    # type: (str) -> str
    """Return the host portion of a ``Host`` header value, dropping any port.

    A bracketed IPv6 literal (``[::1]:8080``) contains colons inside the
    brackets, so splitting on the first ``:`` would yield ``"["``. For such
    values everything up to and including the closing bracket is returned;
    for anything else the text before the first ``:`` is returned.
    """
    if host.startswith("["):
        closing = host.find("]")
        if closing != -1:
            return host[: closing + 1]
    return host.split(":", 1)[0]


@middleware
async def st_middleware(request, handler):
    # type: (aiohttp.web.BaseRequest, Callable[[aiohttp.web.BaseRequest], Awaitable[Any]]) -> Any
    """aiohttp middleware that reports each request to the supertenant HTTP manager.

    Collects method, path, query string, host and headers from ``request``,
    opens a span through ``HTTPManager.open_span`` and, when an action of
    ``consts.ACTION_REJECT`` is returned, raises ``HTTPServiceUnavailable``
    (status 503) or ``HTTPTooManyRequests`` (any other status) instead of
    calling ``handler``. Otherwise the handler is awaited and the status of
    its response is stored on the span's finish data.

    If collecting the request data raises, the error is logged and the
    handler is awaited without any span. If no span id is returned, the
    handler is likewise awaited directly.
    """
    before_data = HTTPServerData(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP)
    try:
        before_data.set_method(request.method)
        before_data.set_path(request.path)
        if request.query_string:
            before_data.set_params(request.query_string)
        if request.host:
            host = _strip_port(request.host)
            before_data.set_integration_module_resource_id(host)
            before_data.set_host(host)
        headers = request.headers or {}  # type: ignore
        # headers can be a multidict, so we may be overriding the same header
        for k, v in headers.items():
            if k.lower().replace("_", "-") == "user-agent":
                before_data.set_user_agent(v)
            before_data.add_header(str(k), str(v))
    except Exception as e:
        # Instrumentation must never break the application: log and fall
        # through to the un-instrumented handler.
        log_integration_module_exception(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP, "st_middleware", e)
        return await handler(request)

    span_id, act, poll_key = http_manager.HTTPManager.open_span(before_data)
    if span_id is None:
        # No span was opened for this request; run the handler untouched.
        return await handler(request)

    with Span(span_id, HTTPServerData(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP)) as span:
        if AsyncActions is not None:
            action, action_desc = await AsyncActions.get_action(span_id, act, poll_key)
            if action == consts.ACTION_REJECT:
                status = reject_http_request(action, action_desc, span.finish_data)  # type: ignore
                if status == 503:
                    raise HTTPServiceUnavailable()
                # if it's not 503 it needs to be 429 otherwise we don't know what to do w/ it.
                # we can theoretically map all exceptions to their appropriate status code and instantiate
                # the right exception, but it's not worth the effort for now.
                raise HTTPTooManyRequests()

        after_data = span.finish_data
        resp = None
        try:
            resp = await handler(request)
        except aiohttp.web.HTTPException as exc:
            # The raised HTTP exception is used as the response so its status
            # can be recorded below.
            # NOTE(review): aiohttp deprecates *returning* HTTPException
            # objects from handlers; re-raising after set_status may be
            # preferable, but how Span treats an in-flight exception is not
            # visible here -- confirm before changing.
            resp = exc
        if resp is not None:
            after_data.set_status(resp.status)  # type: ignore
        return resp
@wrapt.patch_function_wrapper("aiohttp.web", "Application.__init__")
def init_with_supertenant(wrapped, instance, argv, kwargs):
    # type: (Callable[..., None], Any, List[Any], Dict[str, Any]) -> Any
    """Wrap ``aiohttp.web.Application.__init__`` to install ``st_middleware`` first.

    ``wrapped`` is the original ``__init__``; ``argv`` and ``kwargs`` are the
    arguments the application was constructed with. The ``middlewares``
    keyword is replaced by a new list that starts with ``st_middleware``
    followed by whatever the caller supplied, and the original ``__init__``
    is then invoked with the updated arguments.
    """
    user_middlewares = kwargs.get("middlewares")
    # Build a fresh list instead of calling ``.insert`` on the caller's
    # object: the caller may pass a tuple (or any other iterable, or None),
    # which has no ``insert``, and a caller-owned list must not be mutated
    # behind the caller's back (it could be reused for another Application).
    kwargs["middlewares"] = [st_middleware] + (
        [] if user_middlewares is None else list(user_middlewares)
    )
    return wrapped(*argv, **kwargs)
# Last statement of the try body: reached once the imports and definitions
# above have run without raising; logs success together with aiohttp's
# ``__version__`` attribute.
log_instrumentation_success(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP, getattr(aiohttp, "__version__"))
except ImportError as exc:
# An import inside the try body failed; log that the aiohttp
# instrumentation was skipped and pass the exception along in the details.
log_instrumentation_skipped(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP, "", {"exc": exc})