Repository URL to install this package:
|
Version:
0.32.0 ▾
|
# Copyright 2015 Docker, Inc. All rights reserved.
from __future__ import absolute_import
import threading
import timeit
from .client import statsd
_state = threading.local()
TASK_DURATION_PREFIX = 'celery_task_duration_ms'
PUBLISH_DURATION_PREFIX = 'celery_task_publish_duration_ms'
def _sanitize_task_name(name):
return name.replace('.', '-')
def _task_key(prefix, task_id):
return '{}.{}'.format(prefix, task_id)
def start_timer(task_key):
"""
record the start time for the given task
"""
try:
_state.task_timer[task_key] = timeit.default_timer()
except AttributeError:
_state.task_timer = {}
_state.task_timer[task_key] = timeit.default_timer()
def stop_timer(task_key, metric_name, tags=None):
"""
stop the timer with the given task_key, and flush
a statsd metric using the given metric_name
"""
start_time = _state.task_timer.pop(task_key, None)
if start_time:
duration = float((timeit.default_timer() - start_time) * 1000)
statsd.timing(metric_name, duration, tags=tags)
def on_task_prerun(sender, task_id, **kwargs):
"""
record task execution start time
"""
task_key = _task_key(TASK_DURATION_PREFIX, task_id)
start_timer(task_key)
def on_task_postrun(sender, task_id, task, state, **kwargs):
"""
flush a statsd metric upon task execution completion
"""
prefix = TASK_DURATION_PREFIX
task_key = _task_key(prefix, task_id)
task_name = _sanitize_task_name(task.name)
tags = ['state:{}'.format(state), 'task:{}'.format(task_name)]
stop_timer(task_key, prefix, tags)
def on_before_task_publish(sender, body, **kwargs):
"""
record task publish start time
"""
task_id = body.get('id')
task_key = _task_key(PUBLISH_DURATION_PREFIX, task_id)
start_timer(task_key)
def on_after_task_publish(sender, body, **kwargs):
"""
flush a statsd metric upon publishing a task
"""
prefix = PUBLISH_DURATION_PREFIX
task_id = body.get('id')
task_key = _task_key(prefix, task_id)
task_name = _sanitize_task_name(sender)
tags = ['task:{}'.format(task_name)]
stop_timer(task_key, prefix, tags)
def register_celery_events():
from celery import signals
signals.before_task_publish.connect(on_before_task_publish)
signals.after_task_publish.connect(on_after_task_publish)
signals.task_prerun.connect(on_task_prerun)
signals.task_postrun.connect(on_task_postrun)