Why Gemfury? Push, build, and install: RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, NuGet packages

Repository URL to install this package:

Details    
supermeter / supermeter / managers / aiohttp / server_py3.py
Size: Mime:
try:
    from typing import Any, Awaitable, Callable, Dict, List
except ImportError:
    pass

from supertenant import consts
from supertenant.supermeter.logger import (
    log_instrumentation_skipped,
    log_instrumentation_success,
    log_integration_module_exception,
)

try:
    import aiohttp  # noqa: F401
    import wrapt
    from aiohttp.web import HTTPServiceUnavailable, HTTPTooManyRequests, middleware

    from supertenant.supermeter.data.http_data import HTTPServerData
    from supertenant.supermeter.data.http_requests_utils import reject_http_request
    from supertenant.supermeter.managers import http_manager
    from supertenant.supermeter.managers.actions.async_actions_manager_py3 import AsyncActions
    from supertenant.supermeter.scope_manager import Span

    def _host_without_port(host):
        # type: (str) -> str
        """Return the host portion of a ``Host`` value, dropping any ``:port`` suffix.

        A bracketed IPv6 literal such as ``[::1]:8080`` is returned with its
        brackets (``[::1]``); splitting on the first ``:`` would reduce it to ``[``.
        Any other value is cut at its first ``:``.
        """
        if host.startswith("["):
            closing = host.find("]")
            if closing != -1:
                return host[: closing + 1]
        return host.split(":", 1)[0]

    @middleware
    async def st_middleware(request, handler):
        # type: (aiohttp.web.BaseRequest, Callable[[aiohttp.web.BaseRequest], Awaitable[Any]]) -> Any
        """aiohttp middleware that reports each request to the HTTP manager.

        The request's method, path, query string, host and headers are copied into
        an ``HTTPServerData`` which is used to open a span. If an action manager is
        available and it answers ``ACTION_REJECT``, the request is rejected with
        ``HTTPServiceUnavailable`` (status 503) or ``HTTPTooManyRequests`` (any
        other status). Otherwise the wrapped handler runs and the status of its
        response (or of the ``HTTPException`` it raised) is stored on the span's
        finish data.

        Failures while collecting request data are logged and the handler is then
        called without any span, so instrumentation problems do not fail requests.

        :param request: the incoming aiohttp request.
        :param handler: the next handler in the middleware chain.
        :returns: whatever ``handler`` returns.
        :raises aiohttp.web.HTTPException: when the request is rejected, or when
            ``handler`` raised one (it is re-raised unchanged).
        """
        before_data = HTTPServerData(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP)
        try:
            before_data.set_method(request.method)
            before_data.set_path(request.path)
            if request.query_string:
                before_data.set_params(request.query_string)
            if request.host:
                host = _host_without_port(request.host)
                before_data.set_integration_module_resource_id(host)
                before_data.set_host(host)
            headers = request.headers or {}  # type: ignore
            # headers can be a multidict, so we may be overriding the same header
            for k, v in headers.items():
                if k.lower().replace("_", "-") == "user-agent":
                    before_data.set_user_agent(v)
                before_data.add_header(str(k), str(v))
        except Exception as e:
            # Best effort: never let data collection break the actual request.
            log_integration_module_exception(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP, "st_middleware", e)
            return await handler(request)

        span_id, act, poll_key = http_manager.HTTPManager.open_span(before_data)
        if span_id is None:
            # No span was opened for this request; just pass it through.
            return await handler(request)

        resp = None
        http_error = None
        with Span(span_id, HTTPServerData(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP)) as span:
            if AsyncActions is not None:
                action, action_desc = await AsyncActions.get_action(span_id, act, poll_key)
                if action == consts.ACTION_REJECT:
                    status = reject_http_request(action, action_desc, span.finish_data)  # type: ignore
                    if status == 503:
                        raise HTTPServiceUnavailable()
                    else:
                        # if it's not 503 it needs to be 429 otherwise we don't know what to do w/ it.
                        # we can theoretically map all exceptions to their appropriate status code and instantiate
                        # the right exception, but it's not worth the effort for now.
                        raise HTTPTooManyRequests()

            after_data = span.finish_data
            try:
                resp = await handler(request)
            except aiohttp.web.HTTPException as exc:
                # Keep the exception so its status can be recorded while the span
                # is still open; it is re-raised once the span has been closed.
                http_error = exc
                resp = exc
            if resp is not None:
                after_data.set_status(resp.status)  # type: ignore

        if http_error is not None:
            # Re-raise instead of returning the exception object: aiohttp deprecates
            # returning HTTPException instances, and enclosing middlewares that
            # catch HTTPException would otherwise never see it.
            raise http_error
        return resp

    @wrapt.patch_function_wrapper("aiohttp.web", "Application.__init__")
    def init_with_supertenant(wrapped, instance, argv, kwargs):
        # type: (Callable[..., None], Any, List[Any], Dict[str, Any]) -> Any
        """Wrapper around ``aiohttp.web.Application.__init__`` that installs ``st_middleware``.

        ``st_middleware`` is placed first in the ``middlewares`` keyword argument,
        ahead of any middlewares supplied by the caller, and the original
        ``__init__`` is then invoked.

        :param wrapped: the original ``Application.__init__``.
        :param instance: the ``Application`` being initialised (unused here).
        :param argv: positional arguments of the original call.
        :param kwargs: keyword arguments of the original call.
        :returns: the result of the original ``__init__``.
        """
        user_middlewares = kwargs.get("middlewares")
        if user_middlewares is None:
            user_middlewares = ()
        # Build a fresh list rather than calling ``insert`` on the caller's object:
        # the argument may be a tuple or another non-list iterable, and mutating a
        # list the caller reuses would add st_middleware again for every application.
        kwargs["middlewares"] = [st_middleware] + list(user_middlewares)
        return wrapped(*argv, **kwargs)

    log_instrumentation_success(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP, getattr(aiohttp, "__version__"))
except ImportError as exc:
    log_instrumentation_skipped(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP, "", {"exc": exc})