Repository URL to install this package:
|
Version:
0.32.0 ▾
|
# Copyright 2015 Docker, Inc. All rights reserved.
import mock
from django.test import TestCase
import celery
from dockerhub.telemetry.celery import (
register_celery_events,
on_before_task_publish,
on_after_task_publish
)
celery.current_app.conf.CELERY_ALWAYS_EAGER = True
register_celery_events()
@celery.task
def _success_task(arg):
return arg
@celery.task
def _failed_task(arg):
raise RuntimeError("My Error")
class TelemetryCeleryTestCase(TestCase):
def test_publish_duration_metric(self):
with mock.patch('dockerhub.telemetry.celery.statsd', autospec=True) as mock_client:
# note that we are just testing the publish metric
# recording by hand. it would be preferable to test
# this through the celery API, but celery does not
# offer a convenient way to do this. see:
# http://stackoverflow.com/questions/22233680/in-memory-broker-for-celery-unit-tests
prefix = 'celery_task_publish_duration_ms'
sender = 'tests-telemetry-test_celery-_success_task'
body = {
'id': 'test-task-id'
}
on_before_task_publish(sender, body)
on_after_task_publish(sender, body)
expected_tags = ['task:tests-telemetry-test_celery-_success_task']
mock_client.timing.assert_called_with(prefix, mock.ANY,
tags=expected_tags)
def test_timing_for_success_task(self):
with mock.patch('dockerhub.telemetry.celery.statsd', autospec=True) as mock_client:
_success_task.delay(1)
valid_metric = 'celery_task_duration_ms'
valid_tags = ['state:SUCCESS', 'task:tests-telemetry-test_celery-_success_task']
mock_client.timing.assert_called_with(valid_metric, mock.ANY, tags=valid_tags)
def test_timing_for_failed_task(self):
with mock.patch('dockerhub.telemetry.celery.statsd', autospec=True) as mock_client:
_failed_task.delay(1)
valid_metric = 'celery_task_duration_ms'
valid_tags = ['state:FAILURE', 'task:tests-telemetry-test_celery-_failed_task']
mock_client.timing.assert_called_with(valid_metric, mock.ANY, tags=valid_tags)