Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

hemamaps / celery   python

Repository URL to install this package:

Version: 3.1.23 

/ bin / amqp.py

# -*- coding: utf-8 -*-
"""
The :program:`celery amqp` command.

.. program:: celery amqp

"""
from __future__ import absolute_import, print_function, unicode_literals

import cmd
import sys
import shlex
import pprint

from functools import partial
from itertools import count

from kombu.utils.encoding import safe_str

from celery.utils.functional import padlist

from celery.bin.base import Command
from celery.five import string_t
from celery.utils import strtobool

__all__ = ['AMQPAdmin', 'AMQShell', 'Spec', 'amqp']

# Map to coerce strings to other types.
COERCE = {bool: strtobool}

HELP_HEADER = """
Commands
--------
""".rstrip()

EXAMPLE_TEXT = """
Example:
    -> queue.delete myqueue yes no
"""

say = partial(print, file=sys.stderr)


class Spec(object):
    """AMQP Command specification.

    Used to convert arguments to Python values and display various help
    and tooltips.

    :param args: see :attr:`args`.
    :keyword returns: see :attr:`returns`.

    .. attribute args::

        List of arguments this command takes. Should
        contain `(argument_name, argument_type)` tuples.

    .. attribute returns:

        Helpful human string representation of what this command returns.
        May be :const:`None`, to signify the return type is unknown.

    """
    def __init__(self, *args, **kwargs):
        self.args = args
        self.returns = kwargs.get('returns')

    def coerce(self, index, value):
        """Coerce value for argument at index."""
        arg_info = self.args[index]
        arg_type = arg_info[1]
        # Might be a custom way to coerce the string value,
        # so look in the coercion map.
        return COERCE.get(arg_type, arg_type)(value)

    def str_args_to_python(self, arglist):
        """Process list of string arguments to values according to spec.

        e.g:

            >>> spec = Spec([('queue', str), ('if_unused', bool)])
            >>> spec.str_args_to_python('pobox', 'true')
            ('pobox', True)

        """
        return tuple(
            self.coerce(index, value) for index, value in enumerate(arglist))

    def format_response(self, response):
        """Format the return value of this command in a human-friendly way."""
        if not self.returns:
            return 'ok.' if response is None else response
        if callable(self.returns):
            return self.returns(response)
        return self.returns.format(response)

    def format_arg(self, name, type, default_value=None):
        if default_value is not None:
            return '{0}:{1}'.format(name, default_value)
        return name

    def format_signature(self):
        return ' '.join(self.format_arg(*padlist(list(arg), 3))
                        for arg in self.args)


def dump_message(message):
    if message is None:
        return 'No messages in queue. basic.publish something.'
    return {'body': message.body,
            'properties': message.properties,
            'delivery_info': message.delivery_info}


def format_declare_queue(ret):
    return 'ok. queue:{0} messages:{1} consumers:{2}.'.format(*ret)


class AMQShell(cmd.Cmd):
    """AMQP API Shell.

    :keyword connect: Function used to connect to the server, must return
        connection object.

    :keyword silent: If :const:`True`, the commands won't have annoying
                     output not relevant when running in non-shell mode.


    .. attribute: builtins

        Mapping of built-in command names -> method names

    .. attribute:: amqp

        Mapping of AMQP API commands and their :class:`Spec`.

    """
    conn = None
    chan = None
    prompt_fmt = '{self.counter}> '
    identchars = cmd.IDENTCHARS = '.'
    needs_reconnect = False
    counter = 1
    inc_counter = count(2)

    builtins = {'EOF': 'do_exit',
                'exit': 'do_exit',
                'help': 'do_help'}

    amqp = {
        'exchange.declare': Spec(('exchange', str),
                                 ('type', str),
                                 ('passive', bool, 'no'),
                                 ('durable', bool, 'no'),
                                 ('auto_delete', bool, 'no'),
                                 ('internal', bool, 'no')),
        'exchange.delete': Spec(('exchange', str),
                                ('if_unused', bool)),
        'queue.bind': Spec(('queue', str),
                           ('exchange', str),
                           ('routing_key', str)),
        'queue.declare': Spec(('queue', str),
                              ('passive', bool, 'no'),
                              ('durable', bool, 'no'),
                              ('exclusive', bool, 'no'),
                              ('auto_delete', bool, 'no'),
                              returns=format_declare_queue),
        'queue.delete': Spec(('queue', str),
                             ('if_unused', bool, 'no'),
                             ('if_empty', bool, 'no'),
                             returns='ok. {0} messages deleted.'),
        'queue.purge': Spec(('queue', str),
                            returns='ok. {0} messages deleted.'),
        'basic.get': Spec(('queue', str),
                          ('no_ack', bool, 'off'),
                          returns=dump_message),
        'basic.publish': Spec(('msg', str),
                              ('exchange', str),
                              ('routing_key', str),
                              ('mandatory', bool, 'no'),
                              ('immediate', bool, 'no')),
        'basic.ack': Spec(('delivery_tag', int)),
    }

    def _prepare_spec(self, conn):
        # XXX Hack to fix Issue #2013
        from amqp import Connection, Message
        if isinstance(conn.connection, Connection):
            self.amqp['basic.publish'] = Spec(('msg', Message),
                                              ('exchange', str),
                                              ('routing_key', str),
                                              ('mandatory', bool, 'no'),
                                              ('immediate', bool, 'no'))

    def __init__(self, *args, **kwargs):
        self.connect = kwargs.pop('connect')
        self.silent = kwargs.pop('silent', False)
        self.out = kwargs.pop('out', sys.stderr)
        cmd.Cmd.__init__(self, *args, **kwargs)
        self._reconnect()

    def note(self, m):
        """Say something to the user. Disabled if :attr:`silent`."""
        if not self.silent:
            say(m, file=self.out)

    def say(self, m):
        say(m, file=self.out)

    def get_amqp_api_command(self, cmd, arglist):
        """With a command name and a list of arguments, convert the arguments
        to Python values and find the corresponding method on the AMQP channel
        object.

        :returns: tuple of `(method, processed_args)`.

        """
        spec = self.amqp[cmd]
        args = spec.str_args_to_python(arglist)
        attr_name = cmd.replace('.', '_')
        if self.needs_reconnect:
            self._reconnect()
        return getattr(self.chan, attr_name), args, spec.format_response

    def do_exit(self, *args):
        """The `'exit'` command."""
        self.note("\n-> please, don't leave!")
        sys.exit(0)

    def display_command_help(self, cmd, short=False):
        spec = self.amqp[cmd]
        self.say('{0} {1}'.format(cmd, spec.format_signature()))

    def do_help(self, *args):
        if not args:
            self.say(HELP_HEADER)
            for cmd_name in self.amqp:
                self.display_command_help(cmd_name, short=True)
            self.say(EXAMPLE_TEXT)
        else:
            self.display_command_help(args[0])

    def default(self, line):
        self.say("unknown syntax: {0!r}. how about some 'help'?".format(line))

    def get_names(self):
        return set(self.builtins) | set(self.amqp)

    def completenames(self, text, *ignored):
        """Return all commands starting with `text`, for tab-completion."""
        names = self.get_names()
        first = [cmd for cmd in names
                 if cmd.startswith(text.replace('_', '.'))]
        if first:
            return first
        return [cmd for cmd in names
                if cmd.partition('.')[2].startswith(text)]

    def dispatch(self, cmd, argline):
        """Dispatch and execute the command.

        Lookup order is: :attr:`builtins` -> :attr:`amqp`.

        """
        arglist = shlex.split(safe_str(argline))
        if cmd in self.builtins:
            return getattr(self, self.builtins[cmd])(*arglist)
        fun, args, formatter = self.get_amqp_api_command(cmd, arglist)
        return formatter(fun(*args))

    def parseline(self, line):
        """Parse input line.

        :returns: tuple of three items:
            `(command_name, arglist, original_line)`

        """
        parts = line.split()
        if parts:
            return parts[0], ' '.join(parts[1:]), line
        return '', '', line

    def onecmd(self, line):
        """Parse line and execute command."""
        cmd, arg, line = self.parseline(line)
        if not line:
            return self.emptyline()
        self.lastcmd = line
        self.counter = next(self.inc_counter)
        try:
            self.respond(self.dispatch(cmd, arg))
        except (AttributeError, KeyError) as exc:
            self.default(line)
        except Exception as exc:
            self.say(exc)
            self.needs_reconnect = True

    def respond(self, retval):
        """What to do with the return value of a command."""
        if retval is not None:
            if isinstance(retval, string_t):
                self.say(retval)
            else:
                self.say(pprint.pformat(retval))

    def _reconnect(self):
        """Re-establish connection to the AMQP server."""
        self.conn = self.connect(self.conn)
        self._prepare_spec(self.conn)
        self.chan = self.conn.default_channel
        self.needs_reconnect = False

    @property
    def prompt(self):
        return self.prompt_fmt.format(self=self)


class AMQPAdmin(object):
    """The celery :program:`celery amqp` utility."""
    Shell = AMQShell

    def __init__(self, *args, **kwargs):
        self.app = kwargs['app']
        self.out = kwargs.setdefault('out', sys.stderr)
        self.silent = kwargs.get('silent')
        self.args = args

    def connect(self, conn=None):
        if conn:
            conn.close()
        conn = self.app.connection()
        self.note('-> connecting to {0}.'.format(conn.as_uri()))
        conn.connect()
        self.note('-> connected.')
        return conn

    def run(self):
        shell = self.Shell(connect=self.connect, out=self.out)
        if self.args:
            return shell.onecmd(' '.join(self.args))
        try:
            return shell.cmdloop()
        except KeyboardInterrupt:
            self.note('(bibi)')
            pass

    def note(self, m):
        if not self.silent:
            say(m, file=self.out)


class amqp(Command):
    """AMQP Administration Shell.

    Also works for non-amqp transports (but not ones that
    store declarations in memory).

    Examples::

        celery amqp
            start shell mode
        celery amqp help
            show list of commands

        celery amqp exchange.delete name
        celery amqp queue.delete queue
        celery amqp queue.delete queue yes yes

    """

    def run(self, *args, **options):
        options['app'] = self.app
        return AMQPAdmin(*args, **options).run()


def main():
    amqp().execute_from_commandline()

if __name__ == '__main__':  # pragma: no cover
    main()