Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

hemamaps / celery   python

Repository URL to install this package:

Version: 3.1.23 

/ tests / bin / test_celeryevdump.py

from __future__ import absolute_import

from time import time

from celery.events.dumper import (
    humanize_type,
    Dumper,
    evdump,
)

from celery.tests.case import AppCase, Mock, WhateverIO, patch


class test_Dumper(AppCase):

    def setup(self):
        self.out = WhateverIO()
        self.dumper = Dumper(out=self.out)

    def test_humanize_type(self):
        self.assertEqual(humanize_type('worker-offline'), 'shutdown')
        self.assertEqual(humanize_type('task-started'), 'task started')

    def test_format_task_event(self):
        self.dumper.format_task_event(
            'worker@example.com', time(), 'task-started', 'tasks.add', {})
        self.assertTrue(self.out.getvalue())

    def test_on_event(self):
        event = {
            'hostname': 'worker@example.com',
            'timestamp': time(),
            'uuid': '1ef',
            'name': 'tasks.add',
            'args': '(2, 2)',
            'kwargs': '{}',
        }
        self.dumper.on_event(dict(event, type='task-received'))
        self.assertTrue(self.out.getvalue())
        self.dumper.on_event(dict(event, type='task-revoked'))
        self.dumper.on_event(dict(event, type='worker-online'))

    @patch('celery.events.EventReceiver.capture')
    def test_evdump(self, capture):
        capture.side_effect = KeyboardInterrupt()
        evdump(app=self.app)

    def test_evdump_error_handler(self):
        app = Mock(name='app')
        with patch('celery.events.dumper.Dumper') as Dumper:
            Dumper.return_value = Mock(name='dumper')
            recv = app.events.Receiver.return_value = Mock()

            def se(*_a, **_k):
                recv.capture.side_effect = SystemExit()
                raise KeyError()
            recv.capture.side_effect = se

            Conn = app.connection.return_value = Mock(name='conn')
            conn = Conn.clone.return_value = Mock(name='cloned_conn')
            conn.connection_errors = (KeyError, )
            conn.channel_errors = ()

            evdump(app)
            self.assertTrue(conn.ensure_connection.called)
            errback = conn.ensure_connection.call_args[0][0]
            errback(KeyError(), 1)
            self.assertTrue(conn.as_uri.called)