Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
ls-trace / tracer.py
Size: Mime:
import functools
import logging
import re
from os import environ, getpid

from ddtrace.vendor import debtcollector

from .constants import FILTERS_KEY, SAMPLE_RATE_METRIC_KEY
from .ext import system
from .ext.priority import AUTO_REJECT, AUTO_KEEP
from .filters import FilterRequestsOnUrl
from .internal.logger import get_logger
from .internal.runtime import RuntimeTags, RuntimeWorker
from .internal.writer import AgentWriter
from .propagation.http import set_http_propagator_factory
from .provider import DefaultContextProvider
from .context import Context
from .sampler import DatadogSampler, RateSampler, RateByServiceSampler
from .span import Span
from .utils.formats import get_env
from .utils.deprecation import deprecated, RemovedInDDTrace10Warning
from .vendor.dogstatsd import DogStatsd
from .vendor.lightstep import DEFAULT_METRICS_URL, MetricsReporter
from .vendor.lightstep.constants import ACCESS_TOKEN
from . import compat


log = get_logger(__name__)


def _parse_dogstatsd_url(url):
    if url is None:
        return

    # url can be either of the form `udp://<host>:<port>` or `unix://<path>`
    # also support without url scheme included
    if url.startswith('/'):
        url = 'unix://' + url
    elif '://' not in url:
        url = 'udp://' + url

    parsed = compat.parse.urlparse(url)

    if parsed.scheme == 'unix':
        return dict(socket_path=parsed.path)
    elif parsed.scheme == 'udp':
        return dict(host=parsed.hostname, port=parsed.port)
    else:
        raise ValueError('Unknown scheme `%s` for DogStatsD URL `{}`'.format(parsed.scheme))


_INTERNAL_APPLICATION_SPAN_TYPES = [
    "custom",
    "template",
    "web",
    "worker"
]


class Tracer(object):
    """
    Tracer is used to create, sample and submit spans that measure the
    execution time of sections of code.

    If you're running an application that will serve a single trace per thread,
    you can use the global tracer instance::

        from ddtrace import tracer
        trace = tracer.trace('app.request', 'web-server').finish()
    """
    _RUNTIME_METRICS_INTERVAL = 10

    DEFAULT_HOSTNAME = environ.get('DD_AGENT_HOST', environ.get('DATADOG_TRACE_AGENT_HOSTNAME', 'localhost'))
    DEFAULT_PORT = int(environ.get('DD_TRACE_AGENT_PORT', 8126))
    DEFAULT_DOGSTATSD_PORT = int(get_env('dogstatsd', 'port', default=8125))
    DEFAULT_DOGSTATSD_URL = get_env('dogstatsd', 'url',
                                    default='udp://{}:{}'.format(DEFAULT_HOSTNAME, DEFAULT_DOGSTATSD_PORT))
    DEFAULT_AGENT_URL = environ.get('DD_TRACE_AGENT_URL', 'http://%s:%d' % (DEFAULT_HOSTNAME, DEFAULT_PORT))

    def __init__(self, url=DEFAULT_AGENT_URL, dogstatsd_url=DEFAULT_DOGSTATSD_URL):
        """
        Create a new ``Tracer`` instance. A global tracer is already initialized
        for common usage, so there is no need to initialize your own ``Tracer``.

        :param url: The Datadog agent URL.
        :param url: The DogStatsD URL.
        """
        self.log = log
        self.sampler = None
        self.priority_sampler = None
        self._runtime_worker = None

        uds_path = None
        https = None
        hostname = self.DEFAULT_HOSTNAME
        port = self.DEFAULT_PORT
        if url is not None:
            url_parsed = compat.parse.urlparse(url)
            if url_parsed.scheme in ('http', 'https'):
                hostname = url_parsed.hostname
                port = url_parsed.port
                https = url_parsed.scheme == 'https'
                # FIXME This is needed because of the way of configure() works right now, where it considers `port=None`
                # to be "no port set so let's use the default".
                # It should go away when we remove configure()
                if port is None:
                    if https:
                        port = 443
                    else:
                        port = 80
            elif url_parsed.scheme == 'unix':
                uds_path = url_parsed.path
            else:
                raise ValueError('Unknown scheme `%s` for agent URL' % url_parsed.scheme)

        # Apply the default configuration
        self.configure(
            enabled=True,
            hostname=hostname,
            port=port,
            https=https,
            uds_path=uds_path,
            sampler=DatadogSampler(),
            context_provider=DefaultContextProvider(),
            dogstatsd_url=dogstatsd_url,
        )

        # globally set tags
        self.tags = {}

        # a buffer for service info so we don't perpetually send the same things
        self._services = set()

        # Runtime id used for associating data collected during runtime to
        # traces
        self._pid = getpid()

    @property
    def debug_logging(self):
        return self.log.isEnabledFor(logging.DEBUG)

    @debug_logging.setter
    @deprecated(message='Use logging.setLevel instead', version='1.0.0')
    def debug_logging(self, value):
        self.log.setLevel(logging.DEBUG if value else logging.WARN)

    @deprecated('Use .tracer, not .tracer()', '1.0.0')
    def __call__(self):
        return self

    def global_excepthook(self, tp, value, traceback):
        """The global tracer except hook."""
        self._dogstatsd_client.increment('datadog.tracer.uncaught_exceptions', 1,
                                         tags=['class:%s' % tp.__name__])

    def get_call_context(self, *args, **kwargs):
        """
        Return the current active ``Context`` for this traced execution. This method is
        automatically called in the ``tracer.trace()``, but it can be used in the application
        code during manual instrumentation like::

            from ddtrace import tracer

            async def web_handler(request):
                context = tracer.get_call_context()
                # use the context if needed
                # ...

        This method makes use of a ``ContextProvider`` that is automatically set during the tracer
        initialization, or while using a library instrumentation.
        """
        return self._context_provider.active(*args, **kwargs)

    @property
    def context_provider(self):
        """Returns the current Tracer Context Provider"""
        return self._context_provider

    # TODO: deprecate this method and make sure users create a new tracer if they need different parameters
    @debtcollector.removals.removed_kwarg("dogstatsd_host", "Use `dogstatsd_url` instead",
                                          category=RemovedInDDTrace10Warning)
    @debtcollector.removals.removed_kwarg("dogstatsd_port", "Use `dogstatsd_url` instead",
                                          category=RemovedInDDTrace10Warning)
    def configure(self, enabled=None, hostname=None, port=None, uds_path=None, https=None,
                  sampler=None, context_provider=None, wrap_executor=None, priority_sampling=None,
                  settings=None, collect_metrics=None, dogstatsd_host=None, dogstatsd_port=None,
                  dogstatsd_url=None, http_propagator=None):
        """
        Configure an existing Tracer the easy way.
        Allow to configure or reconfigure a Tracer instance.

        :param bool enabled: If True, finished traces will be submitted to the API.
            Otherwise they'll be dropped.
        :param str hostname: Hostname running the Trace Agent
        :param int port: Port of the Trace Agent
        :param str uds_path: The Unix Domain Socket path of the agent.
        :param bool https: Whether to use HTTPS or HTTP.
        :param object sampler: A custom Sampler instance, locally deciding to totally drop the trace or not.
        :param object context_provider: The ``ContextProvider`` that will be used to retrieve
            automatically the current call context. This is an advanced option that usually
            doesn't need to be changed from the default value
        :param object wrap_executor: callable that is used when a function is decorated with
            ``Tracer.wrap()``. This is an advanced option that usually doesn't need to be changed
            from the default value
        :param priority_sampling: enable priority sampling, this is required for
            complete distributed tracing support. Enabled by default.
        :param collect_metrics: Whether to enable runtime metrics collection.
        :param str dogstatsd_host: Host for UDP connection to DogStatsD (deprecated: use dogstatsd_url)
        :param int dogstatsd_port: Port for UDP connection to DogStatsD (deprecated: use dogstatsd_url)
        :param str dogstatsd_url: URL for UDP or Unix socket connection to DogStatsD
        :param class http_propagator: type of propagator to be used to distribute the tracing context.
        """
        if enabled is not None:
            self.enabled = enabled

        filters = None
        if settings is not None:
            filters = settings.get(FILTERS_KEY)

        # filter out the metrics URL
        if collect_metrics:
            if filters is None:
                filters = []
            filters.append(FilterRequestsOnUrl(re.escape(DEFAULT_METRICS_URL)))

        # If priority sampling is not set or is True and no priority sampler is set yet
        if priority_sampling in (None, True) and not self.priority_sampler:
            self.priority_sampler = RateByServiceSampler()
        # Explicitly disable priority sampling
        elif priority_sampling is False:
            self.priority_sampler = None

        if sampler is not None:
            self.sampler = sampler

        if dogstatsd_host is not None and dogstatsd_url is None:
            dogstatsd_url = 'udp://{}:{}'.format(dogstatsd_host, dogstatsd_port or self.DEFAULT_DOGSTATSD_PORT)

        if dogstatsd_url is not None:
            dogstatsd_kwargs = _parse_dogstatsd_url(dogstatsd_url)
            self.log.debug('Connecting to DogStatsd(%s)', dogstatsd_url)
            self._dogstatsd_client = DogStatsd(**dogstatsd_kwargs)

        if hostname is not None or port is not None or uds_path is not None or https is not None or \
                filters is not None or priority_sampling is not None or sampler is not None:
            # Preserve hostname and port when overriding filters or priority sampling
            # This is clumsy and a good reason to get rid of this configure() API
            if hasattr(self, 'writer') and hasattr(self.writer, 'api'):
                default_hostname = self.writer.api.hostname
                default_port = self.writer.api.port
                if https is None:
                    https = self.writer.api.https
            else:
                default_hostname = self.DEFAULT_HOSTNAME
                default_port = self.DEFAULT_PORT

            self.writer = AgentWriter(
                hostname or default_hostname,
                port or default_port,
                uds_path=uds_path,
                https=https,
                filters=filters,
                sampler=self.sampler,
                priority_sampler=self.priority_sampler,
                dogstatsd=self._dogstatsd_client,
            )

        # HACK: since we recreated our dogstatsd agent, replace the old write one
        self.writer.dogstatsd = self._dogstatsd_client

        if context_provider is not None:
            self._context_provider = context_provider

        if wrap_executor is not None:
            self._wrap_executor = wrap_executor

        # Since we've recreated our dogstatsd agent, we need to restart metric collection with that new agent
        if self._runtime_worker:
            runtime_metrics_was_running = True
            self._runtime_worker.stop()
            self._runtime_worker.join()
            self._runtime_worker = None
        else:
            runtime_metrics_was_running = False

        if (collect_metrics is None and runtime_metrics_was_running) or collect_metrics:
            self._start_runtime_worker()

        if http_propagator is not None:
            set_http_propagator_factory(http_propagator)

    def start_span(self, name, child_of=None, service=None, resource=None, span_type=None):
        """
        Return a span that will trace an operation called `name`. This method allows
        parenting using the ``child_of`` kwarg. If it's missing, the newly created span is a
        root span.

        :param str name: the name of the operation being traced.
        :param object child_of: a ``Span`` or a ``Context`` instance representing the parent for this span.
        :param str service: the name of the service being traced.
        :param str resource: an optional name of the resource being tracked.
        :param str span_type: an optional operation type.

        To start a new root span, simply::

            span = tracer.start_span('web.request')

        If you want to create a child for a root span, just::

            root_span = tracer.start_span('web.request')
            span = tracer.start_span('web.decoder', child_of=root_span)

        Or if you have a ``Context`` object::

            context = tracer.get_call_context()
            span = tracer.start_span('web.worker', child_of=context)
        """
        if child_of is not None:
            # retrieve if the span is a child_of a Span or a of Context
            child_of_context = isinstance(child_of, Context)
            context = child_of if child_of_context else child_of.context
            parent = child_of.get_current_span() if child_of_context else child_of
        else:
            context = Context()
            parent = None

        if parent:
            trace_id = parent.trace_id
            parent_span_id = parent.span_id
        else:
            trace_id = context.trace_id
            parent_span_id = context.span_id

        if trace_id:
            # child_of a non-empty context, so either a local child span or from a remote context

            # when not provided, inherit from parent's service
            if parent:
                service = service or parent.service

            span = Span(
                self,
                name,
                trace_id=trace_id,
                parent_id=parent_span_id,
                service=service,
                resource=resource,
                span_type=span_type,
            )

            # Extra attributes when from a local parent
            if parent:
                span.sampled = parent.sampled
                span._parent = parent

        else:
            # this is the root span of a new trace
            span = Span(
                self,
                name,
                service=service,
                resource=resource,
                span_type=span_type,
            )

            span.sampled = self.sampler.sample(span)
            # Old behavior
            # DEV: The new sampler sets metrics and priority sampling on the span for us
            if not isinstance(self.sampler, DatadogSampler):
                if span.sampled:
                    # When doing client sampling in the client, keep the sample rate so that we can
                    # scale up statistics in the next steps of the pipeline.
                    if isinstance(self.sampler, RateSampler):
                        span.set_metric(SAMPLE_RATE_METRIC_KEY, self.sampler.sample_rate)

                    if self.priority_sampler:
                        # At this stage, it's important to have the service set. If unset,
                        # priority sampler will use the default sampling rate, which might
                        # lead to oversampling (that is, dropping too many traces).
                        if self.priority_sampler.sample(span):
                            context.sampling_priority = AUTO_KEEP
                        else:
                            context.sampling_priority = AUTO_REJECT
                else:
                    if self.priority_sampler:
                        # If dropped by the local sampler, distributed instrumentation can drop it too.
                        context.sampling_priority = AUTO_REJECT
            else:
                context.sampling_priority = AUTO_KEEP if span.sampled else AUTO_REJECT
                # We must always mark the span as sampled so it is forwarded to the agent
                span.sampled = True

            # add tags to root span to correlate trace with runtime metrics
            # only applied to spans with types that are internal to applications
            if self._runtime_worker and span.span_type in _INTERNAL_APPLICATION_SPAN_TYPES:
                span.set_tag('language', 'python')

        # add common tags
        if self.tags:
            span.set_tags(self.tags)
        if not span._parent:
            span.set_tag(system.PID, getpid())

        # add it to the current context
        context.add_span(span)

        # check for new process if runtime metrics worker has already been started
        self._check_new_process()

        # update set of services handled by tracer
        if service and service not in self._services:
            self._services.add(service)

            # The constant tags for the dogstatsd client needs to updated with any new
            # service(s) that may have been added.
            self._update_dogstatsd_constant_tags()

        return span

    def _update_dogstatsd_constant_tags(self):
        """ Prepare runtime tags for ddstatsd.
        """
        # DEV: ddstatsd expects tags in the form ['key1:value1', 'key2:value2', ...]
        tags = [
            '{}:{}'.format(k, v)
            for k, v in RuntimeTags()
        ]
        self.log.debug('Updating constant tags %s', tags)
        self._dogstatsd_client.constant_tags = tags

    def _start_runtime_worker(self):
        if environ.get("DD_METRICS_RUNTIME") == "RuntimeWorker":
            self._runtime_worker = RuntimeWorker(self._dogstatsd_client, self._RUNTIME_METRICS_INTERVAL)
        else:
            from .internal.runtime.lightstep_metrics import LightstepMetricsWorker
            self._runtime_worker = LightstepMetricsWorker(
                MetricsReporter(token=self.tags.get(ACCESS_TOKEN)),
            )
        self._runtime_worker.start()

    def _check_new_process(self):
        """ Checks if the tracer is in a new process (was forked) and performs
            the necessary updates if it is a new process
        """
        pid = getpid()
        if self._pid == pid:
            return

        self._pid = pid

        # Assume that the services of the child are not necessarily a subset of those
        # of the parent.
        self._services = set()

        if self._runtime_worker is not None:
            self._start_runtime_worker()

        # force an immediate update constant tags since we have reset services
        # and generated a new runtime id
        self._update_dogstatsd_constant_tags()

        # Re-create the background writer thread
        self.writer = self.writer.recreate()

    def trace(self, name, service=None, resource=None, span_type=None):
        """
        Return a span that will trace an operation called `name`. The context that created
        the span as well as the span parenting, are automatically handled by the tracing
        function.

        :param str name: the name of the operation being traced
        :param str service: the name of the service being traced. If not set,
                            it will inherit the service from its parent.
        :param str resource: an optional name of the resource being tracked.
        :param str span_type: an optional operation type.

        You must call `finish` on all spans, either directly or with a context
        manager::

            >>> span = tracer.trace('web.request')
                try:
                    # do something
                finally:
                    span.finish()

            >>> with tracer.trace('web.request') as span:
                    # do something

        Trace will store the current active span and subsequent child traces will
        become its children::

            parent = tracer.trace('parent')     # has no parent span
            child  = tracer.trace('child')      # is a child of a parent
            child.finish()
            parent.finish()

            parent2 = tracer.trace('parent2')   # has no parent span
            parent2.finish()
        """
        # retrieve the Context using the context provider and create
        # a new Span that could be a root or a nested span
        context = self.get_call_context()
        return self.start_span(
            name,
            child_of=context,
            service=service,
            resource=resource,
            span_type=span_type,
        )

    def current_root_span(self):
        """Returns the root span of the current context.

        This is useful for attaching information related to the trace as a
        whole without needing to add to child spans.

        Usage is simple, for example::

            # get the root span
            root_span = tracer.current_root_span()
            # set the host just once on the root span
            if root_span:
                root_span.set_tag('host', '127.0.0.1')
        """
        ctx = self.get_call_context()
        if ctx:
            return ctx.get_current_root_span()
        return None

    def current_span(self):
        """
        Return the active span for the current call context or ``None``
        if no spans are available.
        """
        ctx = self.get_call_context()
        if ctx:
            return ctx.get_current_span()
        return None

    def record(self, context):
        """
        Record the given ``Context`` if it's finished.
        """
        # extract and enqueue the trace if it's sampled
        trace, sampled = context.get()
        if trace and sampled:
            self.write(trace)

    def write(self, spans):
        """
        Send the trace to the writer to enqueue the spans list in the agent
        sending queue.
        """
        if not spans:
            return  # nothing to do

        if self.log.isEnabledFor(logging.DEBUG):
            self.log.debug('writing %s spans (enabled:%s)', len(spans), self.enabled)
            for span in spans:
                self.log.debug('\n%s', span.pprint())

        if self.enabled and self.writer:
            # only submit the spans if we're actually enabled (and don't crash :)
            self.writer.write(spans=spans)

    @deprecated(message='Manually setting service info is no longer necessary', version='1.0.0')
    def set_service_info(self, *args, **kwargs):
        """Set the information about the given service.
        """
        return

    def wrap(self, name=None, service=None, resource=None, span_type=None):
        """
        A decorator used to trace an entire function. If the traced function
        is a coroutine, it traces the coroutine execution when is awaited.
        If a ``wrap_executor`` callable has been provided in the ``Tracer.configure()``
        method, it will be called instead of the default one when the function
        decorator is invoked.

        :param str name: the name of the operation being traced. If not set,
                         defaults to the fully qualified function name.
        :param str service: the name of the service being traced. If not set,
                            it will inherit the service from it's parent.
        :param str resource: an optional name of the resource being tracked.
        :param str span_type: an optional operation type.

        >>> @tracer.wrap('my.wrapped.function', service='my.service')
            def run():
                return 'run'

        >>> # name will default to 'execute' if unset
            @tracer.wrap()
            def execute():
                return 'executed'

        >>> # or use it in asyncio coroutines
            @tracer.wrap()
            async def coroutine():
                return 'executed'

        >>> @tracer.wrap()
            @asyncio.coroutine
            def coroutine():
                return 'executed'

        You can access the current span using `tracer.current_span()` to set
        tags:

        >>> @tracer.wrap()
            def execute():
                span = tracer.current_span()
                span.set_tag('a', 'b')
        """
        def wrap_decorator(f):
            # FIXME[matt] include the class name for methods.
            span_name = name if name else '%s.%s' % (f.__module__, f.__name__)

            # detect if the the given function is a coroutine to use the
            # right decorator; this initial check ensures that the
            # evaluation is done only once for each @tracer.wrap
            if compat.iscoroutinefunction(f):
                # call the async factory that creates a tracing decorator capable
                # to await the coroutine execution before finishing the span. This
                # code is used for compatibility reasons to prevent Syntax errors
                # in Python 2
                func_wrapper = compat.make_async_decorator(
                    self, f, span_name,
                    service=service,
                    resource=resource,
                    span_type=span_type,
                )
            else:
                @functools.wraps(f)
                def func_wrapper(*args, **kwargs):
                    # if a wrap executor has been configured, it is used instead
                    # of the default tracing function
                    if getattr(self, '_wrap_executor', None):
                        return self._wrap_executor(
                            self,
                            f, args, kwargs,
                            span_name,
                            service=service,
                            resource=resource,
                            span_type=span_type,
                        )

                    # otherwise fallback to a default tracing
                    with self.trace(span_name, service=service, resource=resource, span_type=span_type):
                        return f(*args, **kwargs)

            return func_wrapper
        return wrap_decorator

    def set_tags(self, tags):
        """ Set some tags at the tracer level.
        This will append those tags to each span created by the tracer.

        :param dict tags: dict of tags to set at tracer level
        """
        self.tags.update(tags)