Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

hemamaps / celery   python

Repository URL to install this package:

Version: 3.1.23 

/ worker / control.py

# -*- coding: utf-8 -*-
"""
    celery.worker.control
    ~~~~~~~~~~~~~~~~~~~~~

    Remote control commands.

"""
from __future__ import absolute_import

import io
import tempfile

from kombu.utils.encoding import safe_repr

from celery.exceptions import WorkerShutdown
from celery.five import UserDict, items, string_t
from celery.platforms import signals as _signals
from celery.utils import timeutils
from celery.utils.functional import maybe_list
from celery.utils.log import get_logger
from celery.utils import jsonify

from . import state as worker_state
from .state import revoked
from .job import Request

__all__ = ['Panel']
DEFAULT_TASK_INFO_ITEMS = ('exchange', 'routing_key', 'rate_limit')
logger = get_logger(__name__)


class Panel(UserDict):
    data = dict()  # Global registry.

    @classmethod
    def register(cls, method, name=None):
        cls.data[name or method.__name__] = method
        return method


def _find_requests_by_id(ids, requests):
    found, total = 0, len(ids)
    for request in requests:
        if request.id in ids:
            yield request
            found += 1
            if found >= total:
                break


@Panel.register
def query_task(state, ids, **kwargs):
    ids = maybe_list(ids)

    def reqinfo(state, req):
        return state, req.info()

    reqs = dict((req.id, ('reserved', req.info()))
                for req in _find_requests_by_id(
                    ids, worker_state.reserved_requests))
    reqs.update(dict(
        (req.id, ('active', req.info()))
        for req in _find_requests_by_id(
            ids, worker_state.active_requests,
        )
    ))

    return reqs


@Panel.register
def revoke(state, task_id, terminate=False, signal=None, **kwargs):
    """Revoke task by task id."""
    # supports list argument since 3.1
    task_ids, task_id = set(maybe_list(task_id) or []), None
    size = len(task_ids)
    terminated = set()

    revoked.update(task_ids)
    if terminate:
        signum = _signals.signum(signal or 'TERM')
        # reserved_requests changes size during iteration
        # so need to consume the items first, then terminate after.
        requests = set(_find_requests_by_id(
            task_ids,
            worker_state.reserved_requests,
        ))
        for request in requests:
            if request.id not in terminated:
                terminated.add(request.id)
                logger.info('Terminating %s (%s)', request.id, signum)
                request.terminate(state.consumer.pool, signal=signum)
                if len(terminated) >= size:
                    break

        if not terminated:
            return {'ok': 'terminate: tasks unknown'}
        return {'ok': 'terminate: {0}'.format(', '.join(terminated))}

    idstr = ', '.join(task_ids)
    logger.info('Tasks flagged as revoked: %s', idstr)
    return {'ok': 'tasks {0} flagged as revoked'.format(idstr)}


@Panel.register
def report(state):
    return {'ok': state.app.bugreport()}


@Panel.register
def enable_events(state):
    dispatcher = state.consumer.event_dispatcher
    if dispatcher.groups and 'task' not in dispatcher.groups:
        dispatcher.groups.add('task')
        logger.info('Events of group {task} enabled by remote.')
        return {'ok': 'task events enabled'}
    return {'ok': 'task events already enabled'}


@Panel.register
def disable_events(state):
    dispatcher = state.consumer.event_dispatcher
    if 'task' in dispatcher.groups:
        dispatcher.groups.discard('task')
        logger.info('Events of group {task} disabled by remote.')
        return {'ok': 'task events disabled'}
    return {'ok': 'task events already disabled'}


@Panel.register
def heartbeat(state):
    logger.debug('Heartbeat requested by remote.')
    dispatcher = state.consumer.event_dispatcher
    dispatcher.send('worker-heartbeat', freq=5, **worker_state.SOFTWARE_INFO)


@Panel.register
def rate_limit(state, task_name, rate_limit, **kwargs):
    """Set new rate limit for a task type.

    See :attr:`celery.task.base.Task.rate_limit`.

    :param task_name: Type of task.
    :param rate_limit: New rate limit.

    """

    try:
        timeutils.rate(rate_limit)
    except ValueError as exc:
        return {'error': 'Invalid rate limit string: {0!r}'.format(exc)}

    try:
        state.app.tasks[task_name].rate_limit = rate_limit
    except KeyError:
        logger.error('Rate limit attempt for unknown task %s',
                     task_name, exc_info=True)
        return {'error': 'unknown task'}

    state.consumer.reset_rate_limits()

    if not rate_limit:
        logger.info('Rate limits disabled for tasks of type %s', task_name)
        return {'ok': 'rate limit disabled successfully'}

    logger.info('New rate limit for tasks of type %s: %s.',
                task_name, rate_limit)
    return {'ok': 'new rate limit set successfully'}


@Panel.register
def time_limit(state, task_name=None, hard=None, soft=None, **kwargs):
    try:
        task = state.app.tasks[task_name]
    except KeyError:
        logger.error('Change time limit attempt for unknown task %s',
                     task_name, exc_info=True)
        return {'error': 'unknown task'}

    task.soft_time_limit = soft
    task.time_limit = hard

    logger.info('New time limits for tasks of type %s: soft=%s hard=%s',
                task_name, soft, hard)
    return {'ok': 'time limits set successfully'}


@Panel.register
def dump_schedule(state, safe=False, **kwargs):

    def prepare_entries():
        for waiting in state.consumer.timer.schedule.queue:
            try:
                arg0 = waiting.entry.args[0]
            except (IndexError, TypeError):
                continue
            else:
                if isinstance(arg0, Request):
                    yield {'eta': arg0.eta.isoformat() if arg0.eta else None,
                           'priority': waiting.priority,
                           'request': arg0.info(safe=safe)}
    return list(prepare_entries())


@Panel.register
def dump_reserved(state, safe=False, **kwargs):
    reserved = worker_state.reserved_requests - worker_state.active_requests
    if not reserved:
        return []
    return [request.info(safe=safe) for request in reserved]


@Panel.register
def dump_active(state, safe=False, **kwargs):
    return [request.info(safe=safe)
            for request in worker_state.active_requests]


@Panel.register
def stats(state, **kwargs):
    return state.consumer.controller.stats()


@Panel.register
def objgraph(state, num=200, max_depth=10, type='Request'):  # pragma: no cover
    try:
        import objgraph
    except ImportError:
        raise ImportError('Requires the objgraph library')
    print('Dumping graph for type %r' % (type, ))
    with tempfile.NamedTemporaryFile(prefix='cobjg',
                                     suffix='.png', delete=False) as fh:
        objects = objgraph.by_type(type)[:num]
        objgraph.show_backrefs(
            objects,
            max_depth=max_depth, highlight=lambda v: v in objects,
            filename=fh.name,
        )
        return {'filename': fh.name}


@Panel.register
def memsample(state, **kwargs):  # pragma: no cover
    from celery.utils.debug import sample_mem
    return sample_mem()


@Panel.register
def memdump(state, samples=10, **kwargs):  # pragma: no cover
    from celery.utils.debug import memdump
    out = io.StringIO()
    memdump(file=out)
    return out.getvalue()


@Panel.register
def clock(state, **kwargs):
    return {'clock': state.app.clock.value}


@Panel.register
def dump_revoked(state, **kwargs):
    return list(worker_state.revoked)


@Panel.register
def hello(state, from_node, revoked=None, **kwargs):
    if from_node != state.hostname:
        logger.info('sync with %s', from_node)
        if revoked:
            worker_state.revoked.update(revoked)
        return {'revoked': worker_state.revoked._data,
                'clock': state.app.clock.forward()}


@Panel.register
def dump_tasks(state, taskinfoitems=None, builtins=False, **kwargs):
    reg = state.app.tasks
    taskinfoitems = taskinfoitems or DEFAULT_TASK_INFO_ITEMS

    tasks = reg if builtins else (
        task for task in reg if not task.startswith('celery.'))

    def _extract_info(task):
        fields = dict((field, str(getattr(task, field, None)))
                      for field in taskinfoitems
                      if getattr(task, field, None) is not None)
        if fields:
            info = ['='.join(f) for f in items(fields)]
            return '{0} [{1}]'.format(task.name, ' '.join(info))
        return task.name

    return [_extract_info(reg[task]) for task in sorted(tasks)]


@Panel.register
def ping(state, **kwargs):
    return {'ok': 'pong'}


@Panel.register
def pool_grow(state, n=1, **kwargs):
    if state.consumer.controller.autoscaler:
        state.consumer.controller.autoscaler.force_scale_up(n)
    else:
        state.consumer.pool.grow(n)
        state.consumer._update_prefetch_count(n)
    return {'ok': 'pool will grow'}


@Panel.register
def pool_shrink(state, n=1, **kwargs):
    if state.consumer.controller.autoscaler:
        state.consumer.controller.autoscaler.force_scale_down(n)
    else:
        state.consumer.pool.shrink(n)
        state.consumer._update_prefetch_count(-n)
    return {'ok': 'pool will shrink'}


@Panel.register
def pool_restart(state, modules=None, reload=False, reloader=None, **kwargs):
    if state.app.conf.CELERYD_POOL_RESTARTS:
        state.consumer.controller.reload(modules, reload, reloader=reloader)
        return {'ok': 'reload started'}
    else:
        raise ValueError('Pool restarts not enabled')


@Panel.register
def autoscale(state, max=None, min=None):
    autoscaler = state.consumer.controller.autoscaler
    if autoscaler:
        max_, min_ = autoscaler.update(max, min)
        return {'ok': 'autoscale now min={0} max={1}'.format(max_, min_)}
    raise ValueError('Autoscale not enabled')


@Panel.register
def shutdown(state, msg='Got shutdown from remote', **kwargs):
    logger.warning(msg)
    raise WorkerShutdown(msg)


@Panel.register
def add_consumer(state, queue, exchange=None, exchange_type=None,
                 routing_key=None, **options):
    state.consumer.add_task_queue(queue, exchange, exchange_type,
                                  routing_key, **options)
    return {'ok': 'add consumer {0}'.format(queue)}


@Panel.register
def cancel_consumer(state, queue=None, **_):
    state.consumer.cancel_task_queue(queue)
    return {'ok': 'no longer consuming from {0}'.format(queue)}


@Panel.register
def active_queues(state):
    """Return information about the queues a worker consumes from."""
    if state.consumer.task_consumer:
        return [dict(queue.as_dict(recurse=True))
                for queue in state.consumer.task_consumer.queues]
    return []


def _wanted_config_key(key):
    return (isinstance(key, string_t) and
            key.isupper() and
            not key.startswith('__'))


@Panel.register
def dump_conf(state, with_defaults=False, **kwargs):
    return jsonify(state.app.conf.table(with_defaults=with_defaults),
                   keyfilter=_wanted_config_key,
                   unknown_type_filter=safe_repr)


@Panel.register
def election(state, id, topic, action=None, **kwargs):
    if state.consumer.gossip:
        state.consumer.gossip.election(id, topic, action)