Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
newrelic / newrelic / hooks / application_celery.py
Size: Mime:
"""This module provides instrumentation for Celery.

Note that Celery has a habit of moving things around in code base or of
completely rewriting stuff across minor versions. See additional notes
about this below.

"""

import functools

from newrelic.api.application import application_instance
from newrelic.api.background_task import BackgroundTask
from newrelic.api.function_trace import FunctionTrace
from newrelic.api.pre_function import wrap_pre_function
from newrelic.api.web_transaction import WebTransaction
from newrelic.api.object_wrapper import callable_name, ObjectWrapper
from newrelic.api.transaction import current_transaction

def CeleryTaskWrapper(wrapped, application=None, name=None):

    def wrapper(wrapped, instance, args, kwargs):
        transaction = current_transaction(active_only=False)

        if callable(name):
            # Start Hotfix v2.2.1.
            #if instance and inspect.ismethod(wrapped):
            #    _name = name(instance, *args, **kwargs)
            #else:
            #    _name = name(*args, **kwargs)

            if instance is not None:
                _name = name(instance, *args, **kwargs)
            else:
                _name = name(*args, **kwargs)
            # End Hotfix v2.2.1.

        elif name is None:
            _name = callable_name(wrapped)

        else:
            _name = name

        # Helper for obtaining the appropriate application object. If
        # has an activate() method assume it is a valid application
        # object. Don't check by type so se can easily mock it for
        # testing if need be.

        def _application():
            if hasattr(application, 'activate'):
                return application
            return application_instance(application)

        # A Celery Task can be called either outside of a transaction, or
        # within the context of an existing transaction. There are 3
        # possibilities we need to handle:
        #
        #   1. In an inactive transaction
        #
        #      If the end_of_transaction() or ignore_transaction() API calls
        #      have been invoked, this task may be called in the context
        #      of an inactive transaction. In this case, don't wrap the task
        #      in any way. Just run the original function.
        #
        #   2. In an active transaction
        #
        #      Run the original function inside a FunctionTrace.
        #
        #   3. Outside of a transaction
        #
        #      This is the typical case for a celery Task. Since it's not
        #      running inside of an existing transaction, we want to create
        #      a new background transaction for it.

        if transaction and (transaction.ignore_transaction
                or transaction.stopped):
            return wrapped(*args, **kwargs)

        elif transaction:
            with FunctionTrace(transaction, callable_name(wrapped)):
                return wrapped(*args, **kwargs)

        else:
            with BackgroundTask(_application(), _name, 'Celery'):
                return wrapped(*args, **kwargs)

    # Start Hotfix v2.2.1.
    #obj = ObjectWrapper(wrapped, None, wrapper)
    # End Hotfix v2.2.1.

    # Celery tasks that inherit from celery.app.task must implement a run()
    # method.
    # ref: (http://docs.celeryproject.org/en/2.5/reference/
    #                            celery.app.task.html#celery.app.task.BaseTask)
    # Celery task's __call__ method then calls the run() method to execute the
    # task. But celery does a micro-optimization where if the __call__ method
    # was not overridden by an inherited task, then it will directly execute
    # the run() method without going through the __call__ method. Our
    # instrumentation via ObjectWrapper() relies on __call__ being called which
    # in turn executes the wrapper() function defined above. Since the micro
    # optimization bypasses __call__ method it breaks our instrumentation of
    # celery. To circumvent this problem, we added a run() attribute to our
    # ObjectWrapper which points to our __call__ method. This causes Celery
    # to execute our __call__ method which in turn applies the wrapper
    # correctly before executing the task.
    #
    # This is only a problem in Celery versions 2.5.3 to 2.5.5. The later
    # versions included a monkey-patching provision which did not perform this
    # optimization on functions that were monkey-patched.

    # Start Hotfix v2.2.1.
    #obj.__dict__['run'] = obj.__call__

    class _ObjectWrapper(ObjectWrapper):
        def run(self, *args, **kwargs):
            return self.__call__(*args, **kwargs)

    obj = _ObjectWrapper(wrapped, None, wrapper)
    # End Hotfix v2.2.1.

    return obj

def instrument_celery_app_task(module):

    # Triggered for both 'celery.app.task' and 'celery.task.base'.

    if hasattr(module, 'BaseTask'):

        # Need to add a wrapper for background task entry point.

        # In Celery 2.2 the 'BaseTask' class actually resided in the
        # module 'celery.task.base'. In Celery 2.3 the 'BaseTask' class
        # moved to 'celery.app.task' but an alias to it was retained in
        # the module 'celery.task.base'. We need to detect both module
        # imports, but we check the module name associated with
        # 'BaseTask' to ensure that we do not instrument the class via
        # the alias in Celery 2.3 and later.

        # In Celery 2.5+, although 'BaseTask' still exists execution of
        # the task doesn't pass through it. For Celery 2.5+ need to wrap
        # the tracer instead.

        def task_name(task, *args, **kwargs):
            return task.name

        if module.BaseTask.__module__ == module.__name__:
            module.BaseTask.__call__ = CeleryTaskWrapper(
                    module.BaseTask.__call__, name=task_name)

def instrument_celery_execute_trace(module):

    # Triggered for 'celery.execute_trace'.

    if hasattr(module, 'build_tracer'):

        # Need to add a wrapper for background task entry point.

        # In Celery 2.5+ we need to wrap the task when tracer is being
        # created. Note that in Celery 2.5 the 'build_tracer' function
        # actually resided in the module 'celery.execute.task'. In
        # Celery 3.0 the 'build_tracer' function moved to
        # 'celery.task.trace'.

        _build_tracer = module.build_tracer

        def build_tracer(name, task, *args, **kwargs):
            task = task or module.tasks[name]
            task = CeleryTaskWrapper(task, name=name)
            return _build_tracer(name, task, *args, **kwargs)

        module.build_tracer = build_tracer

def instrument_celery_worker(module):

    # Triggered for 'celery.worker' and 'celery.concurrency.processes'.

    if hasattr(module, 'process_initializer'):

        # We try and force registration of default application after
        # fork of worker process rather than lazily on first request.

        # Originally the 'process_initializer' function was located in
        # 'celery.worker'. In Celery 2.5 the function 'process_initializer'
        # was moved to the module 'celery.concurrency.processes'.

        _process_initializer = module.process_initializer

        @functools.wraps(module.process_initializer)
        def process_initializer(*args, **kwargs):
            application_instance().activate()
            return _process_initializer(*args, **kwargs)

        module.process_initializer = process_initializer

def instrument_celery_loaders_base(module):

    def force_application_activation(*args, **kwargs):
        application_instance().activate()

    wrap_pre_function(module, 'BaseLoader.init_worker',
            force_application_activation)