Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
newrelic / newrelic / hooks / application_gearman.py
Size: Mime:
from newrelic.agent import (application as default_application,
        wrap_function_wrapper, BackgroundTask, FunctionTrace, FunctionWrapper,
        current_transaction, callable_name, ExternalTrace, wrap_function_trace)

# Following wrappers are specifically for a gearman client.

def instrument_gearman_client(module):
    wrap_function_trace(module, 'GearmanClient.submit_job')
    wrap_function_trace(module, 'GearmanClient.submit_multiple_jobs')
    wrap_function_trace(module, 'GearmanClient.submit_multiple_requests')
    wrap_function_trace(module, 'GearmanClient.wait_until_jobs_accepted')
    wrap_function_trace(module, 'GearmanClient.wait_until_jobs_completed')
    wrap_function_trace(module, 'GearmanClient.get_job_status')
    wrap_function_trace(module, 'GearmanClient.get_job_statuses')
    wrap_function_trace(module, 'GearmanClient.wait_until_job_statuses_received')

def wrapper_GearmanConnectionManager_poll_connections_until_stopped(
        wrapped, instance, args, kwargs):

    def _bind_params(submitted_connections, *args, **kwargs):
        return submitted_connections

    transaction = current_transaction()

    if transaction is None:
        return wrapped(*args, **kwargs)

    # Because gearman uses a custom message based protocol over a raw
    # socket, we can't readily wrap a single function which is
    # performing a request and then returning a response. The best we
    # can do is wrap as an external the poll_connections_until_stopped()
    # function. This is what manages looking for whether data is
    # available from the server, or whether data can be written, and
    # then handles those events.
    #
    # This is complicated somewhat though due to a gearman client being
    # able to be supplied multiple servers to communicate with. We can
    # not actually determine which server communication will occur with
    # until the specific handle function for read, write or error is
    # called but that is too late in cases where a failure of some sort
    # occurs such as a timeout. What we therefore do is presume
    # initially that the server will be whatever is the first in the
    # list of server connections and we will override this latter based
    # on which server we ended up communicating with. It is possible this
    # still will not always be correct if data is handled for multiple
    # servers in the one call, but it is likely as close as we can get.
    # As likely that most clients will only be talking to a single
    # server, it likely will not matter too much.

    submitted_connections = _bind_params(*args, **kwargs)

    if not submitted_connections:
        return wrapped(*args, **kwargs)

    first_connection = list(submitted_connections)[0]

    url = 'gearman://%s:%s' % (first_connection.gearman_host,
            first_connection.gearman_port)

    with ExternalTrace(transaction, 'gearman', url):
        return wrapped(*args, **kwargs)

def wrapper_GearmanConnectionManager_handle_function(wrapped, instance,
        args, kwargs):

    def _bind_params(current_connection, *args, **kwargs):
        return current_connection

    transaction = current_transaction()

    if transaction is None:
        return wrapped(*args, **kwargs)

    tracer = transaction.active_node()

    if not isinstance(tracer, ExternalTrace):
        return wrapped(*args, **kwargs)

    # Now override the URL for the external to be the specific server we
    # ended up communicating with. This could get overridden multiple
    # times in the context of a single poll_connections_until_stopped()
    # call and so will be set to the last server data was processed for.
    # This thus may not necessarily be correct if commnicating with
    # multiple servers and data from more than one was being handled for
    # some reason. Can't really do much better than this though but will
    # be fine for the expected typical use case of a single server.

    if not tracer.url.startswith('gearman:'):
        return wrapped(*args, **kwargs)

    current_connection = _bind_params(*args, **kwargs)

    tracer.url = 'gearman://%s:%s' % (current_connection.gearman_host,
            current_connection.gearman_port)

    return wrapped(*args, **kwargs)

def instrument_gearman_connection_manager(module):
    wrap_function_wrapper(module, 'GearmanConnectionManager.handle_read',
            wrapper_GearmanConnectionManager_handle_function)
    wrap_function_wrapper(module, 'GearmanConnectionManager.handle_write',
            wrapper_GearmanConnectionManager_handle_function)
    wrap_function_wrapper(module, 'GearmanConnectionManager.handle_error',
            wrapper_GearmanConnectionManager_handle_function)

    wrap_function_wrapper(module,
            'GearmanConnectionManager.poll_connections_until_stopped',
            wrapper_GearmanConnectionManager_poll_connections_until_stopped)

# Following wrappers are specifically for a gearman worker.

def wrapper_GearmanWorker_on_job_execute(wrapped, instance, args, kwargs):
    def _bind_params(current_job, *args, **kwargs):
        return current_job

    # The background task is always created against the default
    # application specified by the agent configuration. The background
    # task is named after the name the task function was registered as,
    # and prefixed by the special 'Gearman' group.

    application = default_application()
    current_job = _bind_params(*args, **kwargs)

    with BackgroundTask(application, current_job.task, 'Gearman'):
        return wrapped(*args, **kwargs)

def wrapper_callback_function(wrapped, instance, args, kwargs):
    transaction = current_transaction()

    if transaction is None:
        return wrapped(*args, **kwargs)

    # This tracks as a separate function trace the call of the actual
    # task function so that the original function name also appears in
    # the performance breakdown. We catch exceptions and record them at
    # this point as otherwise they are caught by the gearman worker
    # dispatch code and do not actually propagate up to the level of the
    # background task wrapper.

    with FunctionTrace(transaction, callable_name(wrapped)):
        try:
            return wrapped(*args, **kwargs)
        except:  # Catch all
            transaction.record_exception()
            raise

def wrapper_GearmanWorker_register_task(wrapped, instance, args, kwargs):
    def _bind_params(task, callback_function, *args, **kwargs):
        return task, callback_function, args, kwargs

    # This applies a wrapper around the task function at the point that
    # it is registered. This is so we can later wrap execution with a
    # function trace and catch and record any exceptions in the task
    # function.

    task, callback_function, _args, _kwargs = _bind_params(*args, **kwargs)
    callback_function = FunctionWrapper(callback_function,
            wrapper_callback_function)

    return wrapped(task, callback_function, *_args, **_kwargs)

def instrument_gearman_worker(module):
    wrap_function_wrapper(module, 'GearmanWorker.on_job_execute',
            wrapper_GearmanWorker_on_job_execute)
    wrap_function_wrapper(module, 'GearmanWorker.register_task',
            wrapper_GearmanWorker_register_task)