Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
supermeter / supermeter / managers / aiohttp / client_py3.py
Size: Mime:
try:
    from typing import Any, Awaitable, Callable, Dict, List
except ImportError:
    pass

from supertenant import consts
from supertenant.supermeter.logger import (
    log_instrumentation_skipped,
    log_instrumentation_success,
    log_integration_module_exception,
)

try:
    import aiohttp  # noqa: F401
    import wrapt

    from supertenant.supermeter.data import http_data
    from supertenant.supermeter.managers import http_manager
    from supertenant.supermeter.scope_manager import Span

    class _WrappedConnectorClass(wrapt.ObjectProxy):
        # Proxy around a connector object. ``connect`` and ``_create_connection``
        # are forwarded to the wrapped object; when HTTPManager.open_span returns
        # a span id, the forwarded call is awaited inside a Span context.
        # TODO: any data to collect here?

        async def connect(self, req, *args, **kwargs):
            # type: (aiohttp.ClientRequest, Any, Any) -> Any
            """Forward ``connect`` to the wrapped object.

            The call is awaited inside a ``Span`` when a span id could be
            opened; otherwise it is awaited directly. The wrapped method's
            result is returned unchanged and its exceptions propagate.
            """
            open_data = http_data.HTTPClientData(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP)
            wrapper_tag = "%s.connect" % self.__class__.__name__
            open_data.set_tag("_st.instrumentation_wrapper", wrapper_tag)
            opened_span_id, _, _ = http_manager.HTTPManager.open_span(open_data)

            # No span available: plain pass-through.
            if opened_span_id is None:
                return await self.__wrapped__.connect(req, *args, **kwargs)

            finish_data = http_data.HTTPClientData(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP)
            with Span(opened_span_id, finish_data):
                return await self.__wrapped__.connect(req, *args, **kwargs)

        async def _create_connection(self, req, *args, **kwargs):
            # type: (aiohttp.ClientRequest, Any, Any) -> Any
            """Forward ``_create_connection`` to the wrapped object.

            The call is awaited inside a ``Span`` when a span id could be
            opened; otherwise it is awaited directly. The wrapped method's
            result is returned unchanged and its exceptions propagate.
            """
            open_data = http_data.HTTPClientData(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP)
            wrapper_tag = "%s._create_connection" % self.__class__.__name__
            open_data.set_tag("_st.instrumentation_wrapper", wrapper_tag)
            opened_span_id, _, _ = http_manager.HTTPManager.open_span(open_data)

            # No span available: plain pass-through.
            if opened_span_id is None:
                return await self.__wrapped__._create_connection(req, *args, **kwargs)

            finish_data = http_data.HTTPClientData(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP)
            with Span(opened_span_id, finish_data):
                return await self.__wrapped__._create_connection(req, *args, **kwargs)

    # @wrapt.patch_function_wrapper("aiohttp.client", "ClientSession.__init__")
    # def init_with_supertenant(wrapped, instance, argv, kwargs):
    #     # type: (Callable[..., None], Any, List[Any], Dict[str, Any]) -> None
    #     # We can, and maybe should, wrap the connector so we can trace it (using wrapt.ObjectProxy, as ddtrace does).
    #     # We should wrap ClientSession._request, as ddtrace does.
    #     wrapped(*argv, **kwargs)
    #     instance._connector = _WrappedConnectorClass(instance._connector)

    @wrapt.patch_function_wrapper("aiohttp.client", "ClientSession._request")
    async def request_with_supertenant(wrapped, instance, argv, kwargs):
        # type: (Callable[...,Awaitable[aiohttp.ClientResponse]], Any, List[Any], Dict[str, Any]) -> aiohttp.ClientResponse # noqa: E501
        """Wrapper installed over ``aiohttp.client.ClientSession._request``.

        Collects method, URL parts and headers of the outgoing request into an
        ``HTTPClientData`` (best effort: any failure while collecting is logged
        and the request still goes out), asks ``HTTPManager`` to open a span,
        injects the header overrides the manager returns for that span, and
        records the response status (or marks an error) on the span's finish
        data.

        :param wrapped: the original ``_request`` coroutine function.
        :param instance: the ``ClientSession`` the call was made on.
        :param argv: positional arguments of the call (method, URL).
        :param kwargs: keyword arguments of the call; ``kwargs["headers"]`` is
            replaced when header overrides have to be injected.
        :return: the response returned by ``wrapped``.
        :raises Exception: whatever ``wrapped`` raises is re-raised unchanged.
        """
        before_data = http_data.HTTPClientData(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP)
        # Stays None if the request could not be inspected far enough to prepare
        # headers; the override step below must not assume it is bound.
        headers = None
        try:
            method = kwargs.get("method")
            if method is None:
                method = argv[0]
            # aiohttp names this parameter ``str_or_url``; "url" is kept for
            # backward compatibility with the previous lookup.
            url = kwargs.get("url")
            if url is None:
                url = kwargs.get("str_or_url")
            if url is None:
                url = argv[1]

            if isinstance(url, str):
                url = aiohttp.client.URL(url)  # type: ignore

            params = kwargs.get("params")

            headers = instance._prepare_headers(kwargs.get("headers", {}))  # type: ignore
            if instance._request_class.update_headers is not None:
                # Build a throwaway request object so the recorded headers
                # include whatever the request class adds on its own.
                req_class = instance._request_class(method, url)  # type: ignore
                if req_class.update_headers is not None:
                    req_class.update_headers(headers)
                if req_class.update_auto_headers is not None:
                    req_class.update_auto_headers(set(instance._skip_auto_headers))  # type: ignore
                if req_class.headers is not None:
                    headers = req_class.headers

            # headers can be a multidict, so we may be overriding the same header
            for k, v in headers.items():
                if k.lower().replace("_", "-") == "user-agent":
                    before_data.set_user_agent(v)
                before_data.add_header(str(k), str(v))

            host_str = str(url.host)
            before_data.set_host(host_str)
            before_data.set_integration_module_resource_id(host_str)
            before_data.set_path(url.path)
            if params:
                qs = url.update_query(params).query_string
                before_data.set_params(qs)
            before_data.set_method(method)
        except Exception as e:
            log_integration_module_exception(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP, "request", e)

        span_id, _, _ = http_manager.HTTPManager.open_span(before_data)
        if span_id is not None:
            headers_override = http_manager.HTTPManager.get_headers_overrides(span_id)
            # Without prepared headers (inspection failed above) there is no
            # safe mapping to inject into, so the request is sent untouched.
            if headers_override and headers is not None:
                if "headers" in kwargs:
                    # ``headers`` was rebound above and is no longer the object
                    # the caller passed, so writing overrides into it would
                    # never reach the request. Rebuild the outgoing headers
                    # from the caller's value and inject into that instead.
                    outgoing_headers = instance._prepare_headers(kwargs["headers"])  # type: ignore
                else:
                    outgoing_headers = headers
                for k, v in headers_override.items():
                    outgoing_headers[k] = v
                kwargs["headers"] = outgoing_headers

            with Span(span_id, http_data.HTTPClientData(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP)) as span:
                after_data = span.finish_data
                try:
                    resp = await wrapped(*argv, **kwargs)
                except Exception:
                    span.finish_data.mark_error()
                    raise
                if isinstance(after_data, http_data.HTTPClientData):
                    after_data.set_status(resp.status)
        else:
            resp = await wrapped(*argv, **kwargs)
        return resp

    log_instrumentation_success(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP, getattr(aiohttp, "__version__"))
except ImportError as exc:
    log_instrumentation_skipped(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP, "", {"exc": exc})