Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

hemamaps / celery   python

Repository URL to install this package:

Version: 3.1.23 

/ bin / multi.py

# -*- coding: utf-8 -*-
"""

.. program:: celery multi

Examples
========

.. code-block:: bash

    # Single worker with explicit name and events enabled.
    $ celery multi start Leslie -E

    # Pidfiles and logfiles are stored in the current directory
    # by default.  Use --pidfile and --logfile argument to change
    # this.  The abbreviation %N will be expanded to the current
    # node name.
    $ celery multi start Leslie -E --pidfile=/var/run/celery/%N.pid
                                    --logfile=/var/log/celery/%N.log


    # You need to add the same arguments when you restart,
    # as these are not persisted anywhere.
    $ celery multi restart Leslie -E --pidfile=/var/run/celery/%N.pid
                                     --logfile=/var/run/celery/%N.log

    # To stop the node, you need to specify the same pidfile.
    $ celery multi stop Leslie --pidfile=/var/run/celery/%N.pid

    # 3 workers, with 3 processes each
    $ celery multi start 3 -c 3
    celery worker -n celery1@myhost -c 3
    celery worker -n celery2@myhost -c 3
    celery worker -n celery3@myhost -c 3

    # start 3 named workers
    $ celery multi start image video data -c 3
    celery worker -n image@myhost -c 3
    celery worker -n video@myhost -c 3
    celery worker -n data@myhost -c 3

    # specify custom hostname
    $ celery multi start 2 --hostname=worker.example.com -c 3
    celery worker -n celery1@worker.example.com -c 3
    celery worker -n celery2@worker.example.com -c 3

    # specify fully qualified nodenames
    $ celery multi start foo@worker.example.com bar@worker.example.com -c 3

    # Advanced example starting 10 workers in the background:
    #   * Three of the workers processes the images and video queue
    #   * Two of the workers processes the data queue with loglevel DEBUG
    #   * the rest processes the default' queue.
    $ celery multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data
        -Q default -L:4,5 DEBUG

    # You can show the commands necessary to start the workers with
    # the 'show' command:
    $ celery multi show 10 -l INFO -Q:1-3 images,video -Q:4,5 data
        -Q default -L:4,5 DEBUG

    # Additional options are added to each celery worker' comamnd,
    # but you can also modify the options for ranges of, or specific workers

    # 3 workers: Two with 3 processes, and one with 10 processes.
    $ celery multi start 3 -c 3 -c:1 10
    celery worker -n celery1@myhost -c 10
    celery worker -n celery2@myhost -c 3
    celery worker -n celery3@myhost -c 3

    # can also specify options for named workers
    $ celery multi start image video data -c 3 -c:image 10
    celery worker -n image@myhost -c 10
    celery worker -n video@myhost -c 3
    celery worker -n data@myhost -c 3

    # ranges and lists of workers in options is also allowed:
    # (-c:1-3 can also be written as -c:1,2,3)
    $ celery multi start 5 -c 3  -c:1-3 10
    celery worker -n celery1@myhost -c 10
    celery worker -n celery2@myhost -c 10
    celery worker -n celery3@myhost -c 10
    celery worker -n celery4@myhost -c 3
    celery worker -n celery5@myhost -c 3

    # lists also works with named workers
    $ celery multi start foo bar baz xuzzy -c 3 -c:foo,bar,baz 10
    celery worker -n foo@myhost -c 10
    celery worker -n bar@myhost -c 10
    celery worker -n baz@myhost -c 10
    celery worker -n xuzzy@myhost -c 3

"""
from __future__ import absolute_import, print_function, unicode_literals

import errno
import os
import shlex
import signal
import socket
import sys

from collections import defaultdict, namedtuple
from subprocess import Popen
from time import sleep

from kombu.utils import cached_property
from kombu.utils.compat import OrderedDict
from kombu.utils.encoding import from_utf8

from celery import VERSION_BANNER
from celery.five import items
from celery.platforms import Pidfile, IS_WINDOWS
from celery.utils import term, nodesplit
from celery.utils.text import pluralize

__all__ = ['MultiTool']

SIGNAMES = set(sig for sig in dir(signal)
               if sig.startswith('SIG') and '_' not in sig)
SIGMAP = dict((getattr(signal, name), name) for name in SIGNAMES)

USAGE = """\
usage: {prog_name} start <node1 node2 nodeN|range> [worker options]
       {prog_name} stop <n1 n2 nN|range> [-SIG (default: -TERM)]
       {prog_name} stopwait <n1 n2 nN|range> [-SIG (default: -TERM)]
       {prog_name} restart <n1 n2 nN|range> [-SIG] [worker options]
       {prog_name} kill <n1 n2 nN|range>

       {prog_name} show <n1 n2 nN|range> [worker options]
       {prog_name} get hostname <n1 n2 nN|range> [-qv] [worker options]
       {prog_name} names <n1 n2 nN|range>
       {prog_name} expand template <n1 n2 nN|range>
       {prog_name} help

additional options (must appear after command name):

    * --nosplash:   Don't display program info.
    * --quiet:      Don't show as much output.
    * --verbose:    Show more output.
    * --no-color:   Don't display colors.
"""

multi_args_t = namedtuple(
    'multi_args_t', ('name', 'argv', 'expander', 'namespace'),
)


def main():
    sys.exit(MultiTool().execute_from_commandline(sys.argv))


CELERY_EXE = 'celery'
if sys.version_info < (2, 7):
    # pkg.__main__ first supported in Py2.7
    CELERY_EXE = 'celery.__main__'


def celery_exe(*args):
    return ' '.join((CELERY_EXE, ) + args)


class MultiTool(object):
    retcode = 0  # Final exit code.

    def __init__(self, env=None, fh=None, quiet=False, verbose=False,
                 no_color=False, nosplash=False, stdout=None, stderr=None):
        """fh is an old alias to stdout."""
        self.stdout = self.fh = stdout or fh or sys.stdout
        self.stderr = stderr or sys.stderr
        self.env = env
        self.nosplash = nosplash
        self.quiet = quiet
        self.verbose = verbose
        self.no_color = no_color
        self.prog_name = 'celery multi'
        self.commands = {'start': self.start,
                         'show': self.show,
                         'stop': self.stop,
                         'stopwait': self.stopwait,
                         'stop_verify': self.stopwait,  # compat alias
                         'restart': self.restart,
                         'kill': self.kill,
                         'names': self.names,
                         'expand': self.expand,
                         'get': self.get,
                         'help': self.help}

    def execute_from_commandline(self, argv, cmd='celery worker'):
        argv = list(argv)   # don't modify callers argv.

        # Reserve the --nosplash|--quiet|-q/--verbose options.
        if '--nosplash' in argv:
            self.nosplash = argv.pop(argv.index('--nosplash'))
        if '--quiet' in argv:
            self.quiet = argv.pop(argv.index('--quiet'))
        if '-q' in argv:
            self.quiet = argv.pop(argv.index('-q'))
        if '--verbose' in argv:
            self.verbose = argv.pop(argv.index('--verbose'))
        if '--no-color' in argv:
            self.no_color = argv.pop(argv.index('--no-color'))

        self.prog_name = os.path.basename(argv.pop(0))
        if not argv or argv[0][0] == '-':
            return self.error()

        try:
            self.commands[argv[0]](argv[1:], cmd)
        except KeyError:
            self.error('Invalid command: {0}'.format(argv[0]))

        return self.retcode

    def say(self, m, newline=True, file=None):
        print(m, file=file or self.stdout, end='\n' if newline else '')

    def carp(self, m, newline=True, file=None):
        return self.say(m, newline, file or self.stderr)

    def names(self, argv, cmd):
        p = NamespacedOptionParser(argv)
        self.say('\n'.join(
            n.name for n in multi_args(p, cmd)),
        )

    def get(self, argv, cmd):
        wanted = argv[0]
        p = NamespacedOptionParser(argv[1:])
        for node in multi_args(p, cmd):
            if node.name == wanted:
                self.say(' '.join(node.argv))
                return

    def show(self, argv, cmd):
        p = NamespacedOptionParser(argv)
        self.with_detacher_default_options(p)
        self.say('\n'.join(
            ' '.join([sys.executable] + n.argv) for n in multi_args(p, cmd)),
        )

    def start(self, argv, cmd):
        self.splash()
        p = NamespacedOptionParser(argv)
        self.with_detacher_default_options(p)
        retcodes = []
        self.note('> Starting nodes...')
        for node in multi_args(p, cmd):
            self.note('\t> {0}: '.format(node.name), newline=False)
            retcode = self.waitexec(node.argv, path=p.options['--executable'])
            self.note(retcode and self.FAILED or self.OK)
            retcodes.append(retcode)
        self.retcode = int(any(retcodes))

    def with_detacher_default_options(self, p):
        _setdefaultopt(p.options, ['--pidfile', '-p'], '%N.pid')
        _setdefaultopt(p.options, ['--logfile', '-f'], '%N.log')
        p.options.setdefault(
            '--cmd',
            '-m {0}'.format(celery_exe('worker', '--detach')),
        )
        _setdefaultopt(p.options, ['--executable'], sys.executable)

    def signal_node(self, nodename, pid, sig):
        try:
            os.kill(pid, sig)
        except OSError as exc:
            if exc.errno != errno.ESRCH:
                raise
            self.note('Could not signal {0} ({1}): No such process'.format(
                nodename, pid))
            return False
        return True

    def node_alive(self, pid):
        try:
            os.kill(pid, 0)
        except OSError as exc:
            if exc.errno == errno.ESRCH:
                return False
            raise
        return True

    def shutdown_nodes(self, nodes, sig=signal.SIGTERM, retry=None,
                       callback=None):
        if not nodes:
            return
        P = set(nodes)

        def on_down(node):
            P.discard(node)
            if callback:
                callback(*node)

        self.note(self.colored.blue('> Stopping nodes...'))
        for node in list(P):
            if node in P:
                nodename, _, pid = node
                self.note('\t> {0}: {1} -> {2}'.format(
                    nodename, SIGMAP[sig][3:], pid))
                if not self.signal_node(nodename, pid, sig):
                    on_down(node)

        def note_waiting():
            left = len(P)
            if left:
                pids = ', '.join(str(pid) for _, _, pid in P)
                self.note(self.colored.blue(
                    '> Waiting for {0} {1} -> {2}...'.format(
                        left, pluralize(left, 'node'), pids)), newline=False)

        if retry:
            note_waiting()
            its = 0
            while P:
                for node in P:
                    its += 1
                    self.note('.', newline=False)
                    nodename, _, pid = node
                    if not self.node_alive(pid):
                        self.note('\n\t> {0}: {1}'.format(nodename, self.OK))
                        on_down(node)
                        note_waiting()
                        break
                if P and not its % len(P):
                    sleep(float(retry))
            self.note('')

    def getpids(self, p, cmd, callback=None):
        _setdefaultopt(p.options, ['--pidfile', '-p'], '%N.pid')

        nodes = []
        for node in multi_args(p, cmd):
            try:
                pidfile_template = _getopt(
                    p.namespaces[node.namespace], ['--pidfile', '-p'],
                )
            except KeyError:
                pidfile_template = _getopt(p.options, ['--pidfile', '-p'])
            pid = None
            pidfile = node.expander(pidfile_template)
            try:
                pid = Pidfile(pidfile).read_pid()
            except ValueError:
                pass
            if pid:
                nodes.append((node.name, tuple(node.argv), pid))
            else:
                self.note('> {0.name}: {1}'.format(node, self.DOWN))
                if callback:
                    callback(node.name, node.argv, pid)

        return nodes

    def kill(self, argv, cmd):
        self.splash()
        p = NamespacedOptionParser(argv)
        for nodename, _, pid in self.getpids(p, cmd):
            self.note('Killing node {0} ({1})'.format(nodename, pid))
            self.signal_node(nodename, pid, signal.SIGKILL)

    def stop(self, argv, cmd, retry=None, callback=None):
        self.splash()
        p = NamespacedOptionParser(argv)
        return self._stop_nodes(p, cmd, retry=retry, callback=callback)

    def _stop_nodes(self, p, cmd, retry=None, callback=None):
        restargs = p.args[len(p.values):]
        self.shutdown_nodes(self.getpids(p, cmd, callback=callback),
                            sig=findsig(restargs),
                            retry=retry,
                            callback=callback)

    def restart(self, argv, cmd):
        self.splash()
        p = NamespacedOptionParser(argv)
        self.with_detacher_default_options(p)
        retvals = []

        def on_node_shutdown(nodename, argv, pid):
            self.note(self.colored.blue(
                '> Restarting node {0}: '.format(nodename)), newline=False)
            retval = self.waitexec(argv, path=p.options['--executable'])
            self.note(retval and self.FAILED or self.OK)
            retvals.append(retval)

        self._stop_nodes(p, cmd, retry=2, callback=on_node_shutdown)
        self.retval = int(any(retvals))

    def stopwait(self, argv, cmd):
        self.splash()
        p = NamespacedOptionParser(argv)
        self.with_detacher_default_options(p)
        return self._stop_nodes(p, cmd, retry=2)
    stop_verify = stopwait  # compat

    def expand(self, argv, cmd=None):
        template = argv[0]
        p = NamespacedOptionParser(argv[1:])
        for node in multi_args(p, cmd):
            self.say(node.expander(template))

    def help(self, argv, cmd=None):
        self.say(__doc__)

    def usage(self):
        self.splash()
        self.say(USAGE.format(prog_name=self.prog_name))

    def splash(self):
        if not self.nosplash:
            c = self.colored
            self.note(c.cyan('celery multi v{0}'.format(VERSION_BANNER)))

    def waitexec(self, argv, path=sys.executable):
        args = ' '.join([path] + list(argv))
        argstr = shlex.split(from_utf8(args), posix=not IS_WINDOWS)
        pipe = Popen(argstr, env=self.env)
        self.info('  {0}'.format(' '.join(argstr)))
        retcode = pipe.wait()
        if retcode < 0:
            self.note('* Child was terminated by signal {0}'.format(-retcode))
            return -retcode
        elif retcode > 0:
            self.note('* Child terminated with errorcode {0}'.format(retcode))
        return retcode

    def error(self, msg=None):
        if msg:
            self.carp(msg)
        self.usage()
        self.retcode = 1
        return 1

    def info(self, msg, newline=True):
        if self.verbose:
            self.note(msg, newline=newline)

    def note(self, msg, newline=True):
        if not self.quiet:
            self.say(str(msg), newline=newline)

    @cached_property
    def colored(self):
        return term.colored(enabled=not self.no_color)

    @cached_property
    def OK(self):
        return str(self.colored.green('OK'))

    @cached_property
    def FAILED(self):
        return str(self.colored.red('FAILED'))

    @cached_property
    def DOWN(self):
        return str(self.colored.magenta('DOWN'))


def multi_args(p, cmd='celery worker', append='', prefix='', suffix=''):
    names = p.values
    options = dict(p.options)
    passthrough = p.passthrough
    ranges = len(names) == 1
    if ranges:
        try:
            noderange = int(names[0])
        except ValueError:
            pass
        else:
            names = [str(n) for n in range(1, noderange + 1)]
            prefix = 'celery'
    cmd = options.pop('--cmd', cmd)
    append = options.pop('--append', append)
    hostname = options.pop('--hostname',
                           options.pop('-n', socket.gethostname()))
    prefix = options.pop('--prefix', prefix) or ''
    suffix = options.pop('--suffix', suffix) or hostname
    if suffix in ('""', "''"):
        suffix = ''

    for ns_name, ns_opts in list(items(p.namespaces)):
        if ',' in ns_name or (ranges and '-' in ns_name):
            for subns in parse_ns_range(ns_name, ranges):
                p.namespaces[subns].update(ns_opts)
            p.namespaces.pop(ns_name)

    # Numbers in args always refers to the index in the list of names.
    # (e.g. `start foo bar baz -c:1` where 1 is foo, 2 is bar, and so on).
    for ns_name, ns_opts in list(items(p.namespaces)):
        if ns_name.isdigit():
            ns_index = int(ns_name) - 1
            if ns_index < 0:
                raise KeyError('Indexes start at 1 got: %r' % (ns_name, ))
            try:
                p.namespaces[names[ns_index]].update(ns_opts)
            except IndexError:
                raise KeyError('No node at index %r' % (ns_name, ))

    for name in names:
        this_suffix = suffix
        if '@' in name:
            this_name = options['-n'] = name
            nodename, this_suffix = nodesplit(name)
            name = nodename
        else:
            nodename = '%s%s' % (prefix, name)
            this_name = options['-n'] = '%s@%s' % (nodename, this_suffix)
        expand = abbreviations({'%h': this_name,
                                '%n': name,
                                '%N': nodename,
                                '%d': this_suffix})
        argv = ([expand(cmd)] +
                [format_opt(opt, expand(value))
                 for opt, value in items(p.optmerge(name, options))] +
                [passthrough])
        if append:
            argv.append(expand(append))
        yield multi_args_t(this_name, argv, expand, name)


class NamespacedOptionParser(object):

    def __init__(self, args):
        self.args = args
        self.options = OrderedDict()
        self.values = []
        self.passthrough = ''
        self.namespaces = defaultdict(lambda: OrderedDict())

        self.parse()

    def parse(self):
        rargs = list(self.args)
        pos = 0
        while pos < len(rargs):
            arg = rargs[pos]
            if arg == '--':
                self.passthrough = ' '.join(rargs[pos:])
                break
            elif arg[0] == '-':
                if arg[1] == '-':
                    self.process_long_opt(arg[2:])
                else:
                    value = None
                    if len(rargs) > pos + 1 and rargs[pos + 1][0] != '-':
                        value = rargs[pos + 1]
                        pos += 1
                    self.process_short_opt(arg[1:], value)
            else:
                self.values.append(arg)
            pos += 1

    def process_long_opt(self, arg, value=None):
        if '=' in arg:
            arg, value = arg.split('=', 1)
        self.add_option(arg, value, short=False)

    def process_short_opt(self, arg, value=None):
        self.add_option(arg, value, short=True)

    def optmerge(self, ns, defaults=None):
        if defaults is None:
            defaults = self.options
        return OrderedDict(defaults, **self.namespaces[ns])

    def add_option(self, name, value, short=False, ns=None):
        prefix = short and '-' or '--'
        dest = self.options
        if ':' in name:
            name, ns = name.split(':')
            dest = self.namespaces[ns]
        dest[prefix + name] = value


def quote(v):
    return "\\'".join("'" + p + "'" for p in v.split("'"))


def format_opt(opt, value):
    if not value:
        return opt
    if opt.startswith('--'):
        return '{0}={1}'.format(opt, value)
    return '{0} {1}'.format(opt, value)


def parse_ns_range(ns, ranges=False):
    ret = []
    for space in ',' in ns and ns.split(',') or [ns]:
        if ranges and '-' in space:
            start, stop = space.split('-')
            ret.extend(
                str(n) for n in range(int(start), int(stop) + 1)
            )
        else:
            ret.append(space)
    return ret


def abbreviations(mapping):

    def expand(S):
        ret = S
        if S is not None:
            for short_opt, long_opt in items(mapping):
                ret = ret.replace(short_opt, long_opt)
        return ret

    return expand


def findsig(args, default=signal.SIGTERM):
    for arg in reversed(args):
        if len(arg) == 2 and arg[0] == '-':
            try:
                return int(arg[1])
            except ValueError:
                pass
        if arg[0] == '-':
            maybe_sig = 'SIG' + arg[1:]
            if maybe_sig in SIGNAMES:
                return getattr(signal, maybe_sig)
    return default


def _getopt(d, alt):
    for opt in alt:
        try:
            return d[opt]
        except KeyError:
            pass
    raise KeyError(alt[0])


def _setdefaultopt(d, alt, value):
    for opt in alt[1:]:
        try:
            return d[opt]
        except KeyError:
            pass
    return d.setdefault(alt[0], value)


if __name__ == '__main__':              # pragma: no cover
    main()