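# NOTE: the following lines are page chrome captured together with the source
# (package-browser navigation); kept as comments so the module remains valid Python.
# Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages,
# Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, NuGet packages
#
# Repository URL to install this package:
#
# Details
# supermeter / supermeter / managers / boto3 / boto3_inst.py
# Size: Mime: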
"""
SuperTenant Boto3 instrumentation
- Based on Instana boto3 instrumentation:
    - https://github.com/instana/python-sensor/blob/master/instana/instrumentation/boto3_inst.py
- Main logic:
  - extract relevant info from high-level boto3 method (BaseClient._make_api_call)
  - on the lower level (Endpoint._get_response):
    - retrieve the request data (from the high-level method) and open the span with the relevant info
    - possibly block the request if the action is REJECT
    - close the span with the relevant request-completion data
"""
from __future__ import absolute_import

import threading
from copy import deepcopy

import wrapt

try:
    from typing import Any, Callable, Dict, List
except ImportError:
    pass

from supertenant import consts
from supertenant.supermeter.data import aws_data, http_requests_utils
from supertenant.supermeter.logger import (
    log_instrumentation_failed,
    log_instrumentation_skipped,
    log_instrumentation_success,
    log_integration_module_exception,
)
from supertenant.supermeter.managers import http_manager
from supertenant.supermeter.managers.actions import SyncActions
from supertenant.supermeter.scope_manager import Span

# Per-thread scratch space: st_make_api_call stores the collected request data
# here (as ``before_data``) and st_get_response reads it back on the same thread.
thread_data = threading.local()

try:
    import boto3

    # AWS service names whose API calls are instrumented; for any other service
    # st_make_api_call records ``None`` and the request is sent without a span.
    supported_aws_services = ["s3", "sqs"]

    @wrapt.patch_function_wrapper("botocore.client", "BaseClient._make_api_call")
    def st_make_api_call(wrapped, instance, arg_list, kwargs):
        # type: (Callable[..., Any], Any, List[Any], Dict[str, Any]) -> Any
        """Collect request data for an AWS API call, then run the real ``_make_api_call``.

        The collected data is stored in ``thread_data.before_data`` so that the
        lower-level ``st_get_response`` wrapper can open a span for it. ``None``
        is stored when the service is not supported or when collecting the data
        failed.

        Args:
            wrapped: the original ``BaseClient._make_api_call``.
            instance: the client the call was made on.
            arg_list: positional arguments of the call (operation name, payload).
            kwargs: keyword arguments of the call.

        Returns:
            Whatever the wrapped call returns; exceptions raised by the wrapped
            call propagate unchanged. Instrumentation failures are logged and
            never interrupt the call.
        """
        # Reset the slot before collecting anything. Without this, a failure
        # below (e.g. ``deepcopy`` on a payload that cannot be copied) would
        # leave the data of the *previous* call on this thread in place, and
        # st_get_response would open a span describing the wrong request.
        thread_data.before_data = None

        try:
            service_name = instance._service_model.service_name  # pylint: disable=protected-access

            if is_service_supported(service_name):
                # define arguments
                assert len(arg_list) == 2, "Boto3 instrumentation: _make_api_call has more than 2 arguments"
                operation = arg_list[0]
                # copy the payload so the stored data is independent of the caller's object
                payload = deepcopy(arg_list[1])
                host = instance._endpoint.host  # pylint: disable=protected-access

                # build the before-data object and save it in the thread-local slot
                thread_data.before_data = aws_data.create_aws_data_instance(
                    host, service_name, instance.meta.region_name, operation, payload
                )
            # else: service is not supported, the slot stays None

        except Exception as exc:
            log_integration_module_exception(consts.INTEGRATION_MODULE_PYTHON_BOTO3, "st_make_api_call", exc)

        return wrapped(*arg_list, **kwargs)

    @wrapt.patch_function_wrapper("botocore.endpoint", "Endpoint._get_response")
    def st_get_response(wrapped, instance, arg_list, kwargs):
        # type: (Callable[..., Any], Any, List[Any], Dict[str, Any]) -> Any
        """Wrap ``Endpoint._get_response`` in a span built from the thread-local request data.

        Flow:
          1. Read ``thread_data.before_data`` (stored by ``st_make_api_call``) and
             open a span for it. If there is no data, or opening the span fails,
             the wrapped call runs untraced.
          2. Ask ``SyncActions`` for the action to take; on ``ACTION_REJECT`` the
             request is handed to ``reject_http_request`` instead of being sent.
          3. Otherwise send the request and record the HTTP status code found in
             the parsed response's ``ResponseMetadata``.

        Returns:
            The unmodified result of the wrapped ``_get_response``.

        Raises:
            Any exception raised inside the span is logged, the span data is
            marked as an error, and the exception is re-raised.
        """
        span_id, act, poll_key = None, None, None
        before_data = None
        try:
            # get the before data from the thread local variables
            if not hasattr(thread_data, "before_data"):
                log_instrumentation_failed(consts.INTEGRATION_MODULE_PYTHON_BOTO3, "local_thread_data_not_found")
            else:
                before_data = thread_data.before_data
                # when before data is None, it means that the service is not supported
                if before_data is not None:
                    span_id, act, poll_key = http_manager.HTTPManager.open_span(before_data)
        except Exception as exc:
            span_id, act, poll_key = None, None, None
            log_integration_module_exception(consts.INTEGRATION_MODULE_PYTHON_BOTO3, "before_request", exc)

        # If we're not tracing, just return
        if span_id is None:
            return wrapped(*arg_list, **kwargs)

        with Span(span_id, before_data) as span:
            # Bound up front so the error handler below never trips over an
            # unbound name if reading ``span.finish_data`` itself fails.
            finish_data = None
            try:
                # finish data is simply a pointer to the before data,
                # redefining it here to make it clear that we add data
                # after the request is made
                finish_data = span.finish_data
                assert isinstance(finish_data, aws_data.AwsData)
                action, action_desc = SyncActions.get_action(span_id, act, poll_key)
                if action == consts.ACTION_REJECT:
                    # NOTE(review): presumably this raises to abort the request; if it
                    # ever returns normally, ``result`` below is unbound - TODO confirm.
                    http_requests_utils.reject_http_request(action, action_desc, finish_data)
                else:
                    result = wrapped(*arg_list, **kwargs)
                    # botocore returns (success_response, exception); success_response
                    # is an (http_response, parsed_response) pair, or None when the
                    # request failed. Unpack in two steps so a failed request is
                    # handed back to botocore (retries, original exception) instead
                    # of blowing up here with a TypeError.
                    success_response, _ = result
                    if success_response is None:
                        finish_data.mark_error()
                    else:
                        parsed_response = success_response[1]
                        if isinstance(parsed_response, dict):
                            http_dict = parsed_response.get("ResponseMetadata")
                            if isinstance(http_dict, dict):
                                status = http_dict.get("HTTPStatusCode")
                                if status is not None:
                                    finish_data.set_status(status)
                return result
            except Exception as exc:
                log_integration_module_exception(consts.INTEGRATION_MODULE_PYTHON_BOTO3, "after_request", exc)
                if finish_data is not None:
                    finish_data.mark_error()
                raise

    # Reached only when boto3 imported and both wrappers above were applied
    # without raising; report success together with the boto3 version.
    log_instrumentation_success(consts.INTEGRATION_MODULE_PYTHON_BOTO3, getattr(boto3, "__version__"))

    def is_service_supported(service_name):
        # type: (str) -> bool
        """Return True when ``service_name`` is listed in ``supported_aws_services``."""
        supported = service_name in supported_aws_services
        return supported

except ImportError as import_exc:
    # An ImportError was raised inside the block above: leave everything
    # unpatched and only record why the instrumentation was skipped.
    log_instrumentation_skipped(consts.INTEGRATION_MODULE_PYTHON_BOTO3, "boto3_import_error", import_exc)