Repository URL to install this package:
|
Version:
0.8.1 ▾
|
try:
from typing import Any, Awaitable, Callable, Dict, List
except ImportError:
pass
from supertenant import consts
from supertenant.supermeter.logger import (
log_instrumentation_skipped,
log_instrumentation_success,
log_integration_module_exception,
)
try:
import aiohttp # noqa: F401
import wrapt
from supertenant.supermeter.data import http_data
from supertenant.supermeter.managers import http_manager
from supertenant.supermeter.scope_manager import Span
class _WrappedConnectorClass(wrapt.ObjectProxy):
    # Proxy around a connector object: ``connect`` and ``_create_connection``
    # are forwarded to the wrapped object and, when the HTTP manager opens a
    # span for the call, executed inside that span.
    #
    # The instrumentation tag is built from ``self.__class__.__name__``.
    # NOTE(review): wrapt's ObjectProxy is documented to report the wrapped
    # object's class through ``__class__``, so this is presumably the
    # connector's class name rather than this proxy's -- do not swap it for
    # ``type(self).__name__`` without confirming.

    # TODO: any data to collect here?
    async def connect(self, req, *args, **kwargs):
        # type: (aiohttp.ClientRequest, Any, Any) -> Any
        """Forward ``connect`` to the wrapped object, inside a span if one opens."""
        open_data = http_data.HTTPClientData(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP)
        wrapper_tag = "%s.connect" % self.__class__.__name__
        open_data.set_tag("_st.instrumentation_wrapper", wrapper_tag)
        span_id, _, _ = http_manager.HTTPManager.open_span(open_data)

        # No span was opened: plain pass-through.
        if span_id is None:
            return await self.__wrapped__.connect(req, *args, **kwargs)

        finish_data = http_data.HTTPClientData(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP)
        with Span(span_id, finish_data):
            return await self.__wrapped__.connect(req, *args, **kwargs)

    async def _create_connection(self, req, *args, **kwargs):
        # type: (aiohttp.ClientRequest, Any, Any) -> Any
        """Forward ``_create_connection`` to the wrapped object, inside a span if one opens."""
        open_data = http_data.HTTPClientData(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP)
        wrapper_tag = "%s._create_connection" % self.__class__.__name__
        open_data.set_tag("_st.instrumentation_wrapper", wrapper_tag)
        span_id, _, _ = http_manager.HTTPManager.open_span(open_data)

        # No span was opened: plain pass-through.
        if span_id is None:
            return await self.__wrapped__._create_connection(req, *args, **kwargs)

        finish_data = http_data.HTTPClientData(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP)
        with Span(span_id, finish_data):
            return await self.__wrapped__._create_connection(req, *args, **kwargs)
# @wrapt.patch_function_wrapper("aiohttp.client", "ClientSession.__init__")
# def init_with_supertenant(wrapped, instance, argv, kwargs):
# # type: (Callable[..., None], Any, List[Any], Dict[str, Any]) -> None
# # We can and maybe should wrap the connector, so we can trace it (using wrapt.ObjectProxy as in ddtrace)
# # We should wrap ClientSession._request like ddtrace do
# wrapped(*argv, **kwargs)
# instance._connector = _WrappedConnectorClass(instance._connector)
@wrapt.patch_function_wrapper("aiohttp.client", "ClientSession._request")
async def request_with_supertenant(wrapped, instance, argv, kwargs):
    # type: (Callable[...,Awaitable[aiohttp.ClientResponse]], Any, List[Any], Dict[str, Any]) -> aiohttp.ClientResponse # noqa: E501
    """Trace a call to ``ClientSession._request``.

    Collects method, host, path, query parameters and headers of the outgoing
    request into an ``HTTPClientData`` record and opens a span with it. If a
    span is opened, the manager's header overrides are merged into the
    request headers, the wrapped call runs inside the span, and the response
    status (or an error mark) is stored on the span's finish data. If no span
    is opened the wrapped call runs unchanged.

    Data collection is best effort: any exception raised while gathering the
    request details is logged and never prevents the request from being sent.

    Args:
        wrapped: The original ``ClientSession._request`` coroutine function.
        instance: The object ``_request`` was called on.
        argv: Positional arguments of the call (method first, URL second).
        kwargs: Keyword arguments of the call. ``kwargs["headers"]`` is
            replaced with the merged headers when overrides must be sent.

    Returns:
        The response returned by the wrapped call.

    Raises:
        Exception: whatever the wrapped call raises is re-raised after the
            span's finish data has been marked as an error.
    """
    before_data = http_data.HTTPClientData(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP)
    # Stays None when collection fails before the headers were prepared, so the
    # override step below never reads an unbound name and never mutates the
    # caller's own headers object.
    headers = None  # type: Any
    try:
        method = kwargs.get("method")
        if method is None:
            method = argv[0]
        url = kwargs.get("url")
        if url is None:
            url = argv[1]
        if isinstance(url, str):
            url = aiohttp.client.URL(url)  # type: ignore
        params = kwargs.get("params")
        headers = instance._prepare_headers(kwargs.get("headers", {}))  # type: ignore
        if instance._request_class.update_headers is not None:
            # Build a throwaway request object to learn which headers would
            # actually be sent, including the automatically generated ones.
            req_class = instance._request_class(method, url)  # type: ignore
            if req_class.update_headers is not None:
                req_class.update_headers(headers)
            if req_class.update_auto_headers is not None:
                req_class.update_auto_headers(set(instance._skip_auto_headers))  # type: ignore
            if req_class.headers is not None:
                headers = req_class.headers
        # headers can be a multidict, so we may be overriding the same header
        for k, v in headers.items():
            if k.lower().replace("_", "-") == "user-agent":
                before_data.set_user_agent(v)
            before_data.add_header(str(k), str(v))
        host_str = str(url.host)
        before_data.set_host(host_str)
        before_data.set_integration_module_resource_id(host_str)
        before_data.set_path(url.path)
        if params:
            qs = url.update_query(params).query_string
            before_data.set_params(qs)
        before_data.set_method(method)
    except Exception as e:
        log_integration_module_exception(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP, "request", e)

    span_id, _, _ = http_manager.HTTPManager.open_span(before_data)
    if span_id is None:
        # Nothing to trace: run the request exactly as the caller issued it.
        return await wrapped(*argv, **kwargs)

    headers_override = http_manager.HTTPManager.get_headers_overrides(span_id)
    if headers_override:
        try:
            if headers is None:
                # Collection failed early; start from a copy of whatever the
                # caller supplied.
                headers = dict(kwargs.get("headers") or {})
            for k, v in headers_override.items():
                headers[k] = v
            # Always hand the merged headers to the wrapped call. The merge
            # target is the prepared/request-class headers object rather than
            # kwargs["headers"] itself, so without this assignment the
            # overrides would be lost whenever the caller passed ``headers``.
            # NOTE(review): the merged set may include headers generated by
            # the request class (taken from req_class.headers above) --
            # confirm that sending them explicitly is intended.
            kwargs["headers"] = headers
        except Exception as e:
            # Instrumentation must not break the request; send it without
            # the overrides instead.
            log_integration_module_exception(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP, "request", e)

    with Span(span_id, http_data.HTTPClientData(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP)) as span:
        after_data = span.finish_data
        try:
            resp = await wrapped(*argv, **kwargs)
        except Exception:
            span.finish_data.mark_error()
            raise
        if isinstance(after_data, http_data.HTTPClientData):
            after_data.set_status(resp.status)
    return resp
# Last statement of the guarded try body, so it runs only if nothing above
# raised; reports the aiohttp integration together with aiohttp's version.
log_instrumentation_success(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP, getattr(aiohttp, "__version__"))
# An import inside the try body failed (aiohttp, wrapt or one of the
# supertenant modules): record that the integration was skipped and pass the
# exception along as context.
except ImportError as exc:
log_instrumentation_skipped(consts.INTEGRATION_MODULE_PYTHON_AIOHTTP, "", {"exc": exc})