Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
hub-client / tests / telemetry / test_celery.py
Size: Mime:
# Copyright 2015 Docker, Inc. All rights reserved.

import mock

from django.test import TestCase
import celery

from dockerhub.telemetry.celery import (
    register_celery_events,
    on_before_task_publish,
    on_after_task_publish
)

# Make .delay() execute tasks locally and synchronously instead of sending
# them to a broker, so the task tests below run without a worker.
# NOTE(review): this mutates the process-wide current app config at import
# time, so it also applies to any other test module loaded afterwards.
celery.current_app.conf.CELERY_ALWAYS_EAGER = True

# Presumably connects the telemetry handlers to celery's signals so that the
# eager task runs below emit timing metrics -- confirm against
# dockerhub.telemetry.celery, where this function is defined.
register_celery_events()


@celery.task
def _success_task(arg):
    """Celery task fixture that completes normally and returns ``arg`` as is.

    The function name is part of the ``task:`` tag asserted in
    ``TelemetryCeleryTestCase``, so renaming it requires updating those
    expected tags as well.
    """
    return arg


@celery.task
def _failed_task(arg):
    """Celery task fixture that always raises ``RuntimeError("My Error")``.

    ``arg`` is accepted but never used. The function name is part of the
    ``task:`` tag asserted in ``TelemetryCeleryTestCase``, so renaming it
    requires updating that expected tag as well.
    """
    raise RuntimeError("My Error")


class TelemetryCeleryTestCase(TestCase):
    """Verify that the celery telemetry hooks send timing metrics to statsd.

    Each test replaces ``dockerhub.telemetry.celery.statsd`` with an
    autospec'd mock and checks the arguments of its latest ``timing`` call.
    The timing value itself is never checked (``mock.ANY``).
    """

    def _statsd_patch(self):
        """Build the patcher that swaps out the telemetry statsd client."""
        return mock.patch('dockerhub.telemetry.celery.statsd', autospec=True)

    def _assert_duration_reported(self, task, tags):
        """Call ``task.delay(1)`` and check the task-duration timing call.

        ``tags`` is the exact tag list expected on the
        ``celery_task_duration_ms`` metric.
        """
        with self._statsd_patch() as statsd_mock:
            task.delay(1)
            statsd_mock.timing.assert_called_with(
                'celery_task_duration_ms', mock.ANY, tags=tags)

    def test_publish_duration_metric(self):
        # The publish handlers are invoked by hand here. Driving them
        # through the celery API would be preferable, but celery does not
        # offer a convenient way to do that. See:
        # http://stackoverflow.com/questions/22233680/in-memory-broker-for-celery-unit-tests
        task_name = 'tests-telemetry-test_celery-_success_task'
        message = {'id': 'test-task-id'}

        with self._statsd_patch() as statsd_mock:
            on_before_task_publish(task_name, message)
            on_after_task_publish(task_name, message)
            statsd_mock.timing.assert_called_with(
                'celery_task_publish_duration_ms',
                mock.ANY,
                tags=['task:tests-telemetry-test_celery-_success_task'])

    def test_timing_for_success_task(self):
        self._assert_duration_reported(
            _success_task,
            ['state:SUCCESS', 'task:tests-telemetry-test_celery-_success_task'])

    def test_timing_for_failed_task(self):
        self._assert_duration_reported(
            _failed_task,
            ['state:FAILURE', 'task:tests-telemetry-test_celery-_failed_task'])