Repository URL to install this package:
|
Version:
0.8.1 ▾
|
"""
SuperTenant Boto3 instrumentation
- Based on Instana boto3 instrumentation:
- https://github.com/instana/python-sensor/blob/master/instana/instrumentation/boto3_inst.py
- Main logic:
- extract relevant info from high-level boto3 method (BaseClient._make_api_call)
- on the lower level (Endpoint._get_response):
- retrieve the request data (from the high-level method) and open the span with the relevant info
- possibly block the request if the action is REJECT
- close the span with the relevant info request completion data
"""
from __future__ import absolute_import
import threading
from copy import deepcopy
import wrapt
try:
from typing import Any, Callable, Dict, List
except ImportError:
pass
from supertenant import consts
from supertenant.supermeter.data import aws_data, http_requests_utils
from supertenant.supermeter.logger import (
log_instrumentation_failed,
log_instrumentation_skipped,
log_instrumentation_success,
log_integration_module_exception,
)
from supertenant.supermeter.managers import http_manager
from supertenant.supermeter.managers.actions import SyncActions
from supertenant.supermeter.scope_manager import Span
thread_data = threading.local()
try:
import boto3
supported_aws_services = ["s3", "sqs"]
@wrapt.patch_function_wrapper("botocore.client", "BaseClient._make_api_call")
def st_make_api_call(wrapped, instance, arg_list, kwargs):
# type: (Callable[..., Any], Any, List[Any], Dict[str, Any]) -> Any
try:
service_name = instance._service_model.service_name # pylint: disable=protected-access
if is_service_supported(service_name):
# define arguments
assert len(arg_list) == 2, "Boto3 instrumentation: _make_api_call has more than 2 arguments"
operation = arg_list[0]
payload = deepcopy(arg_list[1])
host = instance._endpoint.host # pylint: disable=protected-access
# build before data object
before_data = aws_data.create_aws_data_instance(
host, service_name, instance.meta.region_name, operation, payload
)
else:
# service is not supported
before_data = None
# save the collected data in a thread-local object
thread_data.before_data = before_data
except Exception as exc:
log_integration_module_exception(consts.INTEGRATION_MODULE_PYTHON_BOTO3, "st_make_api_call", exc)
return wrapped(*arg_list, **kwargs)
@wrapt.patch_function_wrapper("botocore.endpoint", "Endpoint._get_response")
def st_get_response(wrapped, instance, arg_list, kwargs):
# type: (Callable[..., Any], Any, List[Any], Dict[str, Any]) -> Any
try:
# get the before data from the thread local variables
if not hasattr(thread_data, "before_data"):
log_instrumentation_failed(consts.INTEGRATION_MODULE_PYTHON_BOTO3, "local_thread_data_not_found")
span_id, act, poll_key = None, None, None
else:
before_data = thread_data.before_data
# when before data is None, it means that the service is not supported
if before_data is not None:
span_id, act, poll_key = http_manager.HTTPManager.open_span(before_data)
else:
span_id, act, poll_key = None, None, None
except Exception as exc:
span_id, act, poll_key = None, None, None
log_integration_module_exception(consts.INTEGRATION_MODULE_PYTHON_BOTO3, "before_request", exc)
# If we're not tracing, just return
if span_id is None:
return wrapped(*arg_list, **kwargs)
with Span(span_id, before_data) as span:
try:
# finish data is simply a pointer to the before data,
# redefining it here to make it clear that we add data
# after the request is made
finish_data = span.finish_data
assert isinstance(finish_data, aws_data.AwsData)
action, action_desc = SyncActions.get_action(span_id, act, poll_key)
if action == consts.ACTION_REJECT:
http_requests_utils.reject_http_request(action, action_desc, finish_data)
else:
result = wrapped(*arg_list, **kwargs)
(http_response, parsed_response), exception = result # noqa: F841 (unused variable)
if isinstance(parsed_response, dict):
http_dict = parsed_response.get("ResponseMetadata")
if isinstance(http_dict, dict):
status = http_dict.get("HTTPStatusCode")
if status is not None:
finish_data.set_status(status)
return result
except Exception as exc:
log_integration_module_exception(consts.INTEGRATION_MODULE_PYTHON_BOTO3, "after_request", exc)
finish_data.mark_error()
raise
log_instrumentation_success(consts.INTEGRATION_MODULE_PYTHON_BOTO3, getattr(boto3, "__version__"))
def is_service_supported(service_name):
# type: (str) -> bool
return service_name in supported_aws_services
except ImportError as import_exc:
log_instrumentation_skipped(consts.INTEGRATION_MODULE_PYTHON_BOTO3, "boto3_import_error", import_exc)