Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
hub-client / dockerhub / telemetry / celery.py
Size: Mime:
# Copyright 2015 Docker, Inc. All rights reserved.

from __future__ import absolute_import

import threading
import timeit

from .client import statsd

# Thread-local storage for in-flight timers. start_timer() lazily attaches a
# ``task_timer`` dict (timer key -> start time) to it, so every thread keeps
# its own independent set of timers.
_state = threading.local()

# Each prefix is used twice: as the prefix of the timer key built by
# _task_key(), and as the statsd metric name passed to stop_timer().
# Durations are reported in milliseconds.
TASK_DURATION_PREFIX = 'celery_task_duration_ms'
PUBLISH_DURATION_PREFIX = 'celery_task_publish_duration_ms'


def _sanitize_task_name(name):
    return name.replace('.', '-')


def _task_key(prefix, task_id):
    return '{}.{}'.format(prefix, task_id)


def start_timer(task_key):
    """
    remember the current time as the start time of task_key

    Start times are kept in a dict on the thread-local ``_state``;
    the dict is created the first time a thread starts a timer.
    """
    if not hasattr(_state, 'task_timer'):
        # first timer on this thread: create the per-thread registry
        _state.task_timer = {}
    _state.task_timer[task_key] = timeit.default_timer()


def stop_timer(task_key, metric_name, tags=None):
    """
    stop the timer with the given task_key, and flush
    a statsd metric using the given metric_name

    The elapsed time since the matching start_timer() call is sent to
    statsd as a timing in milliseconds, together with ``tags``. The
    timer entry is removed, so a second call with the same key is a
    no-op.

    Nothing is reported (and nothing is raised) when no timer is
    recorded under ``task_key`` on the calling thread, including the
    case where this thread never started any timer at all.
    """
    # ``task_timer`` only exists on threads that have called
    # start_timer(); a bare attribute access would raise AttributeError
    # on any other thread.
    timers = getattr(_state, 'task_timer', None)
    if timers is None:
        return

    start_time = timers.pop(task_key, None)
    # Compare against None rather than truthiness: a start time of 0.0
    # is falsy but still a valid clock reading.
    if start_time is None:
        return

    # default_timer() works in seconds; statsd timings are in ms
    duration = float((timeit.default_timer() - start_time) * 1000)
    statsd.timing(metric_name, duration, tags=tags)


def on_task_prerun(sender, task_id, **kwargs):
    """
    task_prerun handler: start the execution timer for task_id
    """
    start_timer(_task_key(TASK_DURATION_PREFIX, task_id))


def on_task_postrun(sender, task_id, task, state, **kwargs):
    """
    task_postrun handler: stop the execution timer for task_id and
    report the duration to statsd, tagged with the final state and
    the sanitized task name
    """
    timer_key = _task_key(TASK_DURATION_PREFIX, task_id)
    metric_tags = [
        'state:{}'.format(state),
        'task:{}'.format(_sanitize_task_name(task.name)),
    ]
    # the prefix doubles as the statsd metric name
    stop_timer(timer_key, TASK_DURATION_PREFIX, metric_tags)


def on_before_task_publish(sender, body, headers=None, **kwargs):
    """
    record task publish start time

    The timer is keyed by the id of the task being published.

    :param sender: name of the task being published (unused here)
    :param body: task message body
    :param headers: task message headers, when the signal provides them
    """
    # Per the Celery signals documentation, task protocol 2 carries the
    # task information (including ``id``) in the message headers, while
    # protocol 1 carries it in the body dict. Headers are only used
    # when they actually describe the task, so protocol 1 messages keep
    # reading the id from the body.
    info = headers if headers and 'task' in headers else body
    task_id = info.get('id')

    task_key = _task_key(PUBLISH_DURATION_PREFIX, task_id)
    start_timer(task_key)


def on_after_task_publish(sender, body, headers=None, **kwargs):
    """
    flush a statsd metric upon publishing a task

    Stops the publish timer for the published task id and reports the
    duration to statsd, tagged with the sanitized task name.

    :param sender: name of the task that was published
    :param body: task message body
    :param headers: task message headers, when the signal provides them
    """
    prefix = PUBLISH_DURATION_PREFIX

    # Per the Celery signals documentation, task protocol 2 carries the
    # task information (including ``id``) in the message headers, while
    # protocol 1 carries it in the body dict. Headers are only used
    # when they actually describe the task, so protocol 1 messages keep
    # reading the id from the body.
    info = headers if headers and 'task' in headers else body
    task_id = info.get('id')
    task_key = _task_key(prefix, task_id)

    task_name = _sanitize_task_name(sender)
    tags = ['task:{}'.format(task_name)]
    # the prefix doubles as the statsd metric name
    stop_timer(task_key, prefix, tags)


def register_celery_events():
    """
    connect the telemetry handlers of this module to the celery
    publish and run signals

    celery is imported when this function is called, not when the
    module is imported.
    """
    from celery import signals

    # (signal, handler) pairs, connected in this order
    hooks = (
        (signals.before_task_publish, on_before_task_publish),
        (signals.after_task_publish, on_after_task_publish),
        (signals.task_prerun, on_task_prerun),
        (signals.task_postrun, on_task_postrun),
    )
    for signal, handler in hooks:
        signal.connect(handler)