Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

hemamaps / celery   python

Repository URL to install this package:

Version: 3.1.23 

/ tests / app / test_routes.py

from __future__ import absolute_import

from kombu import Exchange
from kombu.utils.functional import maybe_evaluate

from celery.app import routes
from celery.exceptions import QueueNotFound
from celery.utils.functional import LRUCache

from celery.tests.case import AppCase


def Router(app, *args, **kwargs):
    return routes.Router(*args, app=app, **kwargs)


def E(app, queues):
    def expand(answer):
        return Router(app, [], queues).expand_destination(answer)
    return expand


def set_queues(app, **queues):
    app.conf.CELERY_QUEUES = queues
    app.amqp.queues = app.amqp.Queues(queues)


class RouteCase(AppCase):

    def setup(self):
        self.a_queue = {
            'exchange': 'fooexchange',
            'exchange_type': 'fanout',
            'routing_key': 'xuzzy',
        }
        self.b_queue = {
            'exchange': 'barexchange',
            'exchange_type': 'topic',
            'routing_key': 'b.b.#',
        }
        self.d_queue = {
            'exchange': self.app.conf.CELERY_DEFAULT_EXCHANGE,
            'exchange_type': self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE,
            'routing_key': self.app.conf.CELERY_DEFAULT_ROUTING_KEY,
        }

        @self.app.task(shared=False)
        def mytask():
            pass
        self.mytask = mytask


class test_MapRoute(RouteCase):

    def test_route_for_task_expanded_route(self):
        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
        expand = E(self.app, self.app.amqp.queues)
        route = routes.MapRoute({self.mytask.name: {'queue': 'foo'}})
        self.assertEqual(
            expand(route.route_for_task(self.mytask.name))['queue'].name,
            'foo',
        )
        self.assertIsNone(route.route_for_task('celery.awesome'))

    def test_route_for_task(self):
        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
        expand = E(self.app, self.app.amqp.queues)
        route = routes.MapRoute({self.mytask.name: self.b_queue})
        self.assertDictContainsSubset(
            self.b_queue,
            expand(route.route_for_task(self.mytask.name)),
        )
        self.assertIsNone(route.route_for_task('celery.awesome'))

    def test_expand_route_not_found(self):
        expand = E(self.app, self.app.amqp.Queues(
                   self.app.conf.CELERY_QUEUES, False))
        route = routes.MapRoute({'a': {'queue': 'x'}})
        with self.assertRaises(QueueNotFound):
            expand(route.route_for_task('a'))


class test_lookup_route(RouteCase):

    def test_init_queues(self):
        router = Router(self.app, queues=None)
        self.assertDictEqual(router.queues, {})

    def test_lookup_takes_first(self):
        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
        R = routes.prepare(({self.mytask.name: {'queue': 'bar'}},
                            {self.mytask.name: {'queue': 'foo'}}))
        router = Router(self.app, R, self.app.amqp.queues)
        self.assertEqual(router.route({}, self.mytask.name,
                         args=[1, 2], kwargs={})['queue'].name, 'bar')

    def test_expands_queue_in_options(self):
        set_queues(self.app)
        R = routes.prepare(())
        router = Router(
            self.app, R, self.app.amqp.queues, create_missing=True,
        )
        # apply_async forwards all arguments, even exchange=None etc,
        # so need to make sure it's merged correctly.
        route = router.route(
            {'queue': 'testq',
             'exchange': None,
             'routing_key': None,
             'immediate': False},
            self.mytask.name,
            args=[1, 2], kwargs={},
        )
        self.assertEqual(route['queue'].name, 'testq')
        self.assertEqual(route['queue'].exchange, Exchange('testq'))
        self.assertEqual(route['queue'].routing_key, 'testq')
        self.assertEqual(route['immediate'], False)

    def test_expand_destination_string(self):
        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
        x = Router(self.app, {}, self.app.amqp.queues)
        dest = x.expand_destination('foo')
        self.assertEqual(dest['queue'].name, 'foo')

    def test_lookup_paths_traversed(self):
        set_queues(
            self.app, foo=self.a_queue, bar=self.b_queue,
            **{self.app.conf.CELERY_DEFAULT_QUEUE: self.d_queue}
        )
        R = routes.prepare((
            {'celery.xaza': {'queue': 'bar'}},
            {self.mytask.name: {'queue': 'foo'}}
        ))
        router = Router(self.app, R, self.app.amqp.queues)
        self.assertEqual(router.route({}, self.mytask.name,
                         args=[1, 2], kwargs={})['queue'].name, 'foo')
        self.assertEqual(
            router.route({}, 'celery.poza')['queue'].name,
            self.app.conf.CELERY_DEFAULT_QUEUE,
        )


class test_prepare(AppCase):

    def test_prepare(self):
        o = object()
        R = [{'foo': 'bar'},
             'celery.utils.functional.LRUCache', o]
        p = routes.prepare(R)
        self.assertIsInstance(p[0], routes.MapRoute)
        self.assertIsInstance(maybe_evaluate(p[1]), LRUCache)
        self.assertIs(p[2], o)

        self.assertEqual(routes.prepare(o), [o])

    def test_prepare_item_is_dict(self):
        R = {'foo': 'bar'}
        p = routes.prepare(R)
        self.assertIsInstance(p[0], routes.MapRoute)