Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
ls-trace / contrib / vertica / patch.py
Size: Mime:
import importlib

from ddtrace.vendor import wrapt

import ddtrace
from ...constants import ANALYTICS_SAMPLE_RATE_KEY, SPAN_MEASURED_KEY
from ...ext import SpanTypes, db as dbx
from ...ext import net
from ...internal.logger import get_logger
from ...pin import Pin
from ...settings import config
from ...utils.wrappers import unwrap
from .constants import APP


log = get_logger(__name__)

_PATCHED = False


def copy_span_start(instance, span, conf, *args, **kwargs):
    span.resource = args[0]


def execute_span_start(instance, span, conf, *args, **kwargs):
    span.resource = args[0]


def execute_span_end(instance, result, span, conf, *args, **kwargs):
    span.set_metric(dbx.ROWCOUNT, instance.rowcount)


def fetch_span_end(instance, result, span, conf, *args, **kwargs):
    span.set_metric(dbx.ROWCOUNT, instance.rowcount)


def cursor_span_end(instance, cursor, _, conf, *args, **kwargs):
    tags = {}
    tags[net.TARGET_HOST] = instance.options['host']
    tags[net.TARGET_PORT] = instance.options['port']
    if 'user' in instance.options:
        tags[dbx.USER] = instance.options['user']
    if 'database' in instance.options:
        tags[dbx.NAME] = instance.options['database']

    pin = Pin(
        service=config.vertica['service_name'],
        app=APP,
        tags=tags,
        _config=config.vertica['patch']['vertica_python.vertica.cursor.Cursor'],
    )
    pin.onto(cursor)


# tracing configuration
config._add(
    'vertica',
    {
        'service_name': 'vertica',
        'app': 'vertica',
        'patch': {
            'vertica_python.vertica.connection.Connection': {
                'routines': {
                    'cursor': {
                        'trace_enabled': False,
                        'span_end': cursor_span_end,
                    },
                },
            },
            'vertica_python.vertica.cursor.Cursor': {
                'routines': {
                    'execute': {
                        'operation_name': 'vertica.query',
                        'span_type': SpanTypes.SQL,
                        'span_start': execute_span_start,
                        'span_end': execute_span_end,
                        'measured': True,
                    },
                    'copy': {
                        'operation_name': 'vertica.copy',
                        'span_type': SpanTypes.SQL,
                        'span_start': copy_span_start,
                        'measured': False,
                    },
                    'fetchone': {
                        'operation_name': 'vertica.fetchone',
                        'span_type': SpanTypes.SQL,
                        'span_end': fetch_span_end,
                        'measured': False,
                    },
                    'fetchall': {
                        'operation_name': 'vertica.fetchall',
                        'span_type': SpanTypes.SQL,
                        'span_end': fetch_span_end,
                        'measured': False,
                    },
                    'nextset': {
                        'operation_name': 'vertica.nextset',
                        'span_type': SpanTypes.SQL,
                        'span_end': fetch_span_end,
                        'measured': False,
                    },
                },
            },
        },
    },
)


def patch():
    global _PATCHED
    if _PATCHED:
        return

    _install(config.vertica)
    _PATCHED = True


def unpatch():
    global _PATCHED
    if _PATCHED:
        _uninstall(config.vertica)
        _PATCHED = False


def _uninstall(config):
    for patch_class_path in config['patch']:
        patch_mod, _, patch_class = patch_class_path.rpartition('.')
        mod = importlib.import_module(patch_mod)
        cls = getattr(mod, patch_class, None)

        if not cls:
            log.debug(
                """
                Unable to find corresponding class for tracing configuration.
                This version may not be supported.
                """
            )
            continue

        for patch_routine in config['patch'][patch_class_path]['routines']:
            unwrap(cls, patch_routine)


def _find_routine_config(config, instance, routine_name):
    """Attempts to find the config for a routine based on the bases of the
    class of the instance.
    """
    bases = instance.__class__.__mro__
    for base in bases:
        full_name = '{}.{}'.format(base.__module__, base.__name__)
        if full_name not in config['patch']:
            continue

        config_routines = config['patch'][full_name]['routines']

        if routine_name in config_routines:
            return config_routines[routine_name]
    return {}


def _install_init(patch_item, patch_class, patch_mod, config):
    patch_class_routine = '{}.{}'.format(patch_class, '__init__')

    # patch the __init__ of the class with a Pin instance containing the defaults
    @wrapt.patch_function_wrapper(patch_mod, patch_class_routine)
    def init_wrapper(wrapped, instance, args, kwargs):
        r = wrapped(*args, **kwargs)

        # create and attach a pin with the defaults
        Pin(
            service=config['service_name'],
            app=config['app'],
            tags=config.get('tags', {}),
            tracer=config.get('tracer', ddtrace.tracer),
            _config=config['patch'][patch_item],
        ).onto(instance)
        return r


def _install_routine(patch_routine, patch_class, patch_mod, config):
    patch_class_routine = '{}.{}'.format(patch_class, patch_routine)

    @wrapt.patch_function_wrapper(patch_mod, patch_class_routine)
    def wrapper(wrapped, instance, args, kwargs):
        # TODO?: remove Pin dependence
        pin = Pin.get_from(instance)

        if patch_routine in pin._config['routines']:
            conf = pin._config['routines'][patch_routine]
        else:
            conf = _find_routine_config(config, instance, patch_routine)

        enabled = conf.get('trace_enabled', True)

        span = None

        try:
            # shortcut if not enabled
            if not enabled:
                result = wrapped(*args, **kwargs)
                return result

            operation_name = conf['operation_name']
            tracer = pin.tracer
            with tracer.trace(operation_name, service=pin.service, span_type=conf.get('span_type')) as span:
                if conf.get('measured', False):
                    span.set_tag(SPAN_MEASURED_KEY)
                span.set_tags(pin.tags)

                if 'span_start' in conf:
                    conf['span_start'](instance, span, conf, *args, **kwargs)

                # set analytics sample rate
                span.set_tag(
                    ANALYTICS_SAMPLE_RATE_KEY,
                    config.get_analytics_sample_rate()
                )

                result = wrapped(*args, **kwargs)
                return result
        except Exception as err:
            if 'on_error' in conf:
                conf['on_error'](instance, err, span, conf, *args, **kwargs)
            raise
        finally:
            # if an exception is raised result will not exist
            if 'result' not in locals():
                result = None
            if 'span_end' in conf:
                conf['span_end'](instance, result, span, conf, *args, **kwargs)


def _install(config):
    for patch_class_path in config['patch']:
        patch_mod, _, patch_class = patch_class_path.rpartition('.')
        _install_init(patch_class_path, patch_class, patch_mod, config)

        for patch_routine in config['patch'][patch_class_path]['routines']:
            _install_routine(patch_routine, patch_class, patch_mod, config)