Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

hemamaps / celery   python

Repository URL to install this package:

Version: 3.1.23 

/ app / control.py

# -*- coding: utf-8 -*-
"""
    celery.app.control
    ~~~~~~~~~~~~~~~~~~~

    Client for worker remote control commands.
    Server implementation is in :mod:`celery.worker.control`.

"""
from __future__ import absolute_import

import warnings

from kombu.pidbox import Mailbox
from kombu.utils import cached_property

from celery.exceptions import DuplicateNodenameWarning
from celery.utils.text import pluralize

__all__ = ['Inspect', 'Control', 'flatten_reply']

W_DUPNODE = """\
Received multiple replies from node {0}: {1}.
Please make sure you give each node a unique nodename using the `-n` option.\
"""


def flatten_reply(reply):
    nodes, dupes = {}, set()
    for item in reply:
        [dupes.add(name) for name in item if name in nodes]
        nodes.update(item)
    if dupes:
        warnings.warn(DuplicateNodenameWarning(
            W_DUPNODE.format(
                pluralize(len(dupes), 'name'), ', '.join(sorted(dupes)),
            ),
        ))
    return nodes


class Inspect(object):
    app = None

    def __init__(self, destination=None, timeout=1, callback=None,
                 connection=None, app=None, limit=None):
        self.app = app or self.app
        self.destination = destination
        self.timeout = timeout
        self.callback = callback
        self.connection = connection
        self.limit = limit

    def _prepare(self, reply):
        if not reply:
            return
        by_node = flatten_reply(reply)
        if self.destination and \
                not isinstance(self.destination, (list, tuple)):
            return by_node.get(self.destination)
        return by_node

    def _request(self, command, **kwargs):
        return self._prepare(self.app.control.broadcast(
            command,
            arguments=kwargs,
            destination=self.destination,
            callback=self.callback,
            connection=self.connection,
            limit=self.limit,
            timeout=self.timeout, reply=True,
        ))

    def report(self):
        return self._request('report')

    def clock(self):
        return self._request('clock')

    def active(self, safe=False):
        return self._request('dump_active', safe=safe)

    def scheduled(self, safe=False):
        return self._request('dump_schedule', safe=safe)

    def reserved(self, safe=False):
        return self._request('dump_reserved', safe=safe)

    def stats(self):
        return self._request('stats')

    def revoked(self):
        return self._request('dump_revoked')

    def registered(self, *taskinfoitems):
        return self._request('dump_tasks', taskinfoitems=taskinfoitems)
    registered_tasks = registered

    def ping(self):
        return self._request('ping')

    def active_queues(self):
        return self._request('active_queues')

    def query_task(self, ids):
        return self._request('query_task', ids=ids)

    def conf(self, with_defaults=False):
        return self._request('dump_conf', with_defaults=with_defaults)

    def hello(self, from_node, revoked=None):
        return self._request('hello', from_node=from_node, revoked=revoked)

    def memsample(self):
        return self._request('memsample')

    def memdump(self, samples=10):
        return self._request('memdump', samples=samples)

    def objgraph(self, type='Request', n=200, max_depth=10):
        return self._request('objgraph', num=n, max_depth=max_depth, type=type)


class Control(object):
    Mailbox = Mailbox

    def __init__(self, app=None):
        self.app = app
        self.mailbox = self.Mailbox('celery', type='fanout', accept=['json'])

    @cached_property
    def inspect(self):
        return self.app.subclass_with_self(Inspect, reverse='control.inspect')

    def purge(self, connection=None):
        """Discard all waiting tasks.

        This will ignore all tasks waiting for execution, and they will
        be deleted from the messaging server.

        :returns: the number of tasks discarded.

        """
        with self.app.connection_or_acquire(connection) as conn:
            return self.app.amqp.TaskConsumer(conn).purge()
    discard_all = purge

    def election(self, id, topic, action=None, connection=None):
        self.broadcast('election', connection=connection, arguments={
            'id': id, 'topic': topic, 'action': action,
        })

    def revoke(self, task_id, destination=None, terminate=False,
               signal='SIGTERM', **kwargs):
        """Tell all (or specific) workers to revoke a task by id.

        If a task is revoked, the workers will ignore the task and
        not execute it after all.

        :param task_id: Id of the task to revoke.
        :keyword terminate: Also terminate the process currently working
            on the task (if any).
        :keyword signal: Name of signal to send to process if terminate.
            Default is TERM.

        See :meth:`broadcast` for supported keyword arguments.

        """
        return self.broadcast('revoke', destination=destination,
                              arguments={'task_id': task_id,
                                         'terminate': terminate,
                                         'signal': signal}, **kwargs)

    def ping(self, destination=None, timeout=1, **kwargs):
        """Ping all (or specific) workers.

        Will return the list of answers.

        See :meth:`broadcast` for supported keyword arguments.

        """
        return self.broadcast('ping', reply=True, destination=destination,
                              timeout=timeout, **kwargs)

    def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
        """Tell all (or specific) workers to set a new rate limit
        for task by type.

        :param task_name: Name of task to change rate limit for.
        :param rate_limit: The rate limit as tasks per second, or a rate limit
            string (`'100/m'`, etc.
            see :attr:`celery.task.base.Task.rate_limit` for
            more information).

        See :meth:`broadcast` for supported keyword arguments.

        """
        return self.broadcast('rate_limit', destination=destination,
                              arguments={'task_name': task_name,
                                         'rate_limit': rate_limit},
                              **kwargs)

    def add_consumer(self, queue, exchange=None, exchange_type='direct',
                     routing_key=None, options=None, **kwargs):
        """Tell all (or specific) workers to start consuming from a new queue.

        Only the queue name is required as if only the queue is specified
        then the exchange/routing key will be set to the same name (
        like automatic queues do).

        .. note::

            This command does not respect the default queue/exchange
            options in the configuration.

        :param queue: Name of queue to start consuming from.
        :keyword exchange: Optional name of exchange.
        :keyword exchange_type: Type of exchange (defaults to 'direct')
            command to, when empty broadcast to all workers.
        :keyword routing_key: Optional routing key.
        :keyword options: Additional options as supported
            by :meth:`kombu.entitiy.Queue.from_dict`.

        See :meth:`broadcast` for supported keyword arguments.

        """
        return self.broadcast(
            'add_consumer',
            arguments=dict({'queue': queue, 'exchange': exchange,
                            'exchange_type': exchange_type,
                            'routing_key': routing_key}, **options or {}),
            **kwargs
        )

    def cancel_consumer(self, queue, **kwargs):
        """Tell all (or specific) workers to stop consuming from ``queue``.

        Supports the same keyword arguments as :meth:`broadcast`.

        """
        return self.broadcast(
            'cancel_consumer', arguments={'queue': queue}, **kwargs
        )

    def time_limit(self, task_name, soft=None, hard=None, **kwargs):
        """Tell all (or specific) workers to set time limits for
        a task by type.

        :param task_name: Name of task to change time limits for.
        :keyword soft: New soft time limit (in seconds).
        :keyword hard: New hard time limit (in seconds).

        Any additional keyword arguments are passed on to :meth:`broadcast`.

        """
        return self.broadcast(
            'time_limit',
            arguments={'task_name': task_name,
                       'hard': hard, 'soft': soft}, **kwargs)

    def enable_events(self, destination=None, **kwargs):
        """Tell all (or specific) workers to enable events."""
        return self.broadcast('enable_events', {}, destination, **kwargs)

    def disable_events(self, destination=None, **kwargs):
        """Tell all (or specific) workers to disable events."""
        return self.broadcast('disable_events', {}, destination, **kwargs)

    def pool_grow(self, n=1, destination=None, **kwargs):
        """Tell all (or specific) workers to grow the pool by ``n``.

        Supports the same arguments as :meth:`broadcast`.

        """
        return self.broadcast('pool_grow', {'n': n}, destination, **kwargs)

    def pool_shrink(self, n=1, destination=None, **kwargs):
        """Tell all (or specific) workers to shrink the pool by ``n``.

        Supports the same arguments as :meth:`broadcast`.

        """
        return self.broadcast('pool_shrink', {'n': n}, destination, **kwargs)

    def autoscale(self, max, min, destination=None, **kwargs):
        """Change worker(s) autoscale setting.

        Supports the same arguments as :meth:`broadcast`.

        """
        return self.broadcast(
            'autoscale', {'max': max, 'min': min}, destination, **kwargs)

    def broadcast(self, command, arguments=None, destination=None,
                  connection=None, reply=False, timeout=1, limit=None,
                  callback=None, channel=None, **extra_kwargs):
        """Broadcast a control command to the celery workers.

        :param command: Name of command to send.
        :param arguments: Keyword arguments for the command.
        :keyword destination: If set, a list of the hosts to send the
            command to, when empty broadcast to all workers.
        :keyword connection: Custom broker connection to use, if not set,
            a connection will be established automatically.
        :keyword reply: Wait for and return the reply.
        :keyword timeout: Timeout in seconds to wait for the reply.
        :keyword limit: Limit number of replies.
        :keyword callback: Callback called immediately for each reply
            received.

        """
        with self.app.connection_or_acquire(connection) as conn:
            arguments = dict(arguments or {}, **extra_kwargs)
            return self.mailbox(conn)._broadcast(
                command, arguments, destination, reply, timeout,
                limit, callback, channel=channel,
            )