Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Bower components, Debian packages, RPM packages, and NuGet packages.

hemamaps / celery   python

Repository URL to install this package:

Version: 3.1.23 

/ app / routes.py

# -*- coding: utf-8 -*-
"""
    celery.routes
    ~~~~~~~~~~~~~

    Contains utilities for working with task routers,
    (:setting:`CELERY_ROUTES`).

"""
from __future__ import absolute_import

from celery.exceptions import QueueNotFound
from celery.five import string_t
from celery.utils import lpmerge
from celery.utils.functional import firstmethod, mlazy
from celery.utils.imports import instantiate

# Names exported by ``from celery.routes import *``.
__all__ = ['MapRoute', 'Router', 'prepare']

# Callable used by ``Router.lookup_route`` to query a sequence of routers
# through their ``route_for_task`` method.
# NOTE(review): presumably yields the first non-None answer, going by the
# helper's name -- confirm against ``celery.utils.functional.firstmethod``.
_first_route = firstmethod('route_for_task')


class MapRoute(object):
    """Router backed by a mapping of task name to destination."""

    def __init__(self, map):
        # ``map`` shadows the builtin, but the name is part of the public
        # signature and is therefore kept.
        self.map = map

    def route_for_task(self, task, *args, **kwargs):
        """Return the route configured for ``task``.

        The mapped value is copied into a new dict.  If that conversion
        raises :exc:`ValueError` (e.g. the value is a queue name string)
        the value is wrapped as ``{'queue': value}`` instead.  ``None``
        is returned when a :exc:`KeyError` occurs, i.e. the task has no
        entry.  Extra positional/keyword arguments are accepted but unused.

        """
        table = self.map
        try:
            found = dict(table[task])
        except KeyError:
            found = None
        except ValueError:
            # Not convertible to a dict: treat the value as the queue.
            found = {'queue': table[task]}
        return found


class Router(object):
    """Resolves the routing options to use for a task.

    Combines the options given by the caller, the answer of the
    configured routers and the app's default queue, replacing queue
    names with the matching entries from ``queues``.

    :keyword routes: sequence of router objects consulted by
        :meth:`lookup_route` (defaults to an empty list).
    :keyword queues: mapping of queue name to queue entry, used by
        :meth:`expand_destination` (defaults to an empty dict).
    :keyword create_missing: stored on the instance; no method of this
        class reads it.
    :keyword app: app instance; only ``app.conf.CELERY_DEFAULT_QUEUE``
        is read, and only when no queue was otherwise selected.

    """

    def __init__(self, routes=None, queues=None,
                 create_missing=False, app=None):
        self.app = app
        self.queues = {} if queues is None else queues
        self.routes = [] if routes is None else routes
        self.create_missing = create_missing

    def route(self, options, task, args=(), kwargs=None):
        """Return the routing options for ``task``.

        :param options: queue name or dict of options supplied by the
            caller; a dict is modified in place by
            :meth:`expand_destination`.
        :param task: task name passed on to the routers.
        :keyword args: positional task arguments passed on to the routers.
        :keyword kwargs: keyword task arguments passed on to the routers;
            ``None`` (the default) is treated as an empty dict.

        If routers are configured and one returns a truthy route, the
        result is that (expanded) route merged with ``options`` by
        ``lpmerge``.  Otherwise, when ``options`` has no ``'queue'``, the
        expanded ``CELERY_DEFAULT_QUEUE`` is merged with ``options``.

        :raises QueueNotFound: if a selected queue name is not in
            ``self.queues``.

        """
        # A literal ``{}`` default would be a single dict shared by every
        # call and handed to every router; create a fresh one per call.
        if kwargs is None:
            kwargs = {}
        options = self.expand_destination(options)  # expands 'queue'
        if self.routes:
            route = self.lookup_route(task, args, kwargs)
            if route:  # expands 'queue' in route.
                return lpmerge(self.expand_destination(route), options)
        if 'queue' not in options:
            options = lpmerge(self.expand_destination(
                              self.app.conf.CELERY_DEFAULT_QUEUE), options)
        return options

    def expand_destination(self, route):
        """Replace the queue name in ``route`` with its ``queues`` entry.

        :param route: a queue name (string) or a dict of options.  A dict
            is modified in place and is the object returned: its
            ``'queue'`` key is popped and, when the value is truthy,
            set again to ``self.queues[value]``.  A falsy value leaves
            the key removed.

        :returns: dict of options (a new dict when ``route`` was a string).
        :raises QueueNotFound: if the queue name is not in ``self.queues``.

        """
        # Route can be a queue name: convenient for direct exchanges.
        if isinstance(route, string_t):
            queue, route = route, {}
        else:
            # can use defaults from configured queue, but override specific
            # things (like the routing_key): great for topic exchanges.
            queue = route.pop('queue', None)

        if queue:
            try:
                Q = self.queues[queue]  # noqa
            except KeyError:
                raise QueueNotFound(
                    'Queue {0!r} missing from CELERY_QUEUES'.format(queue))
            # needs to be declared by publisher
            route['queue'] = Q
        return route

    def lookup_route(self, task, args=None, kwargs=None):
        """Ask the configured routers for a route for ``task``.

        Delegates to the module-level ``_first_route`` with
        ``self.routes`` and the given task name and arguments.

        """
        return _first_route(self.routes, task, args, kwargs)


def prepare(routes):
    """Normalize the value of the :setting:`CELERY_ROUTES` setting.

    ``None`` gives an empty tuple.  Any value that is not a list or
    tuple is treated as a single entry.  The result is otherwise a list
    in which every dict entry is wrapped in :class:`MapRoute`, every
    string entry becomes ``mlazy(instantiate, entry)`` and any other
    entry is kept as is.

    """
    if routes is None:
        return ()

    def _as_router(entry):
        if isinstance(entry, dict):
            return MapRoute(entry)
        is_name = isinstance(entry, string_t)
        return mlazy(instantiate, entry) if is_name else entry

    entries = routes if isinstance(routes, (list, tuple)) else (routes, )
    return list(map(_as_router, entries))