Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

hemamaps / celery   python

Repository URL to install this package:

Version: 3.1.23 

/ tests / bin / test_celery.py

from __future__ import absolute_import

import sys

from anyjson import dumps
from datetime import datetime

from celery import __main__
from celery.platforms import EX_FAILURE, EX_USAGE, EX_OK
from celery.bin.base import Error
from celery.bin.celery import (
    Command,
    list_,
    call,
    purge,
    result,
    inspect,
    control,
    status,
    migrate,
    help,
    report,
    CeleryCommand,
    determine_exit_status,
    multi,
    main as mainfun,
    _RemoteControl,
    command,
)

from celery.tests.case import (
    AppCase, Mock, WhateverIO, override_stdouts, patch,
)


class test__main__(AppCase):

    def test_warn_deprecated(self):
        with override_stdouts() as (stdout, _):
            __main__._warn_deprecated('YADDA YADDA')
            self.assertIn('command is deprecated', stdout.getvalue())
            self.assertIn('YADDA YADDA', stdout.getvalue())

    def test_main(self):
        with patch('celery.__main__.maybe_patch_concurrency') as mpc:
            with patch('celery.bin.celery.main') as main:
                __main__.main()
                mpc.assert_called_with()
                main.assert_called_with()

    def test_compat_worker(self):
        with patch('celery.__main__.maybe_patch_concurrency') as mpc:
            with patch('celery.__main__._warn_deprecated') as depr:
                with patch('celery.bin.worker.main') as main:
                    __main__._compat_worker()
                    mpc.assert_called_with()
                    depr.assert_called_with('celery worker')
                    main.assert_called_with()

    def test_compat_multi(self):
        with patch('celery.__main__.maybe_patch_concurrency') as mpc:
            with patch('celery.__main__._warn_deprecated') as depr:
                with patch('celery.bin.multi.main') as main:
                    __main__._compat_multi()
                    self.assertFalse(mpc.called)
                    depr.assert_called_with('celery multi')
                    main.assert_called_with()

    def test_compat_beat(self):
        with patch('celery.__main__.maybe_patch_concurrency') as mpc:
            with patch('celery.__main__._warn_deprecated') as depr:
                with patch('celery.bin.beat.main') as main:
                    __main__._compat_beat()
                    mpc.assert_called_with()
                    depr.assert_called_with('celery beat')
                    main.assert_called_with()


class test_Command(AppCase):

    def test_Error_repr(self):
        x = Error('something happened')
        self.assertIsNotNone(x.status)
        self.assertTrue(x.reason)
        self.assertTrue(str(x))

    def setup(self):
        self.out = WhateverIO()
        self.err = WhateverIO()
        self.cmd = Command(self.app, stdout=self.out, stderr=self.err)

    def test_error(self):
        self.cmd.out = Mock()
        self.cmd.error('FOO')
        self.assertTrue(self.cmd.out.called)

    def test_out(self):
        f = Mock()
        self.cmd.out('foo', f)

    def test_call(self):

        def ok_run():
            pass

        self.cmd.run = ok_run
        self.assertEqual(self.cmd(), EX_OK)

        def error_run():
            raise Error('error', EX_FAILURE)
        self.cmd.run = error_run
        self.assertEqual(self.cmd(), EX_FAILURE)

    def test_run_from_argv(self):
        with self.assertRaises(NotImplementedError):
            self.cmd.run_from_argv('prog', ['foo', 'bar'])

    def test_pretty_list(self):
        self.assertEqual(self.cmd.pretty([])[1], '- empty -')
        self.assertIn('bar', self.cmd.pretty(['foo', 'bar'])[1])

    def test_pretty_dict(self):
        self.assertIn(
            'OK',
            str(self.cmd.pretty({'ok': 'the quick brown fox'})[0]),
        )
        self.assertIn(
            'ERROR',
            str(self.cmd.pretty({'error': 'the quick brown fox'})[0]),
        )

    def test_pretty(self):
        self.assertIn('OK', str(self.cmd.pretty('the quick brown')))
        self.assertIn('OK', str(self.cmd.pretty(object())))
        self.assertIn('OK', str(self.cmd.pretty({'foo': 'bar'})))


class test_list(AppCase):

    def test_list_bindings_no_support(self):
        l = list_(app=self.app, stderr=WhateverIO())
        management = Mock()
        management.get_bindings.side_effect = NotImplementedError()
        with self.assertRaises(Error):
            l.list_bindings(management)

    def test_run(self):
        l = list_(app=self.app, stderr=WhateverIO())
        l.run('bindings')

        with self.assertRaises(Error):
            l.run(None)

        with self.assertRaises(Error):
            l.run('foo')


class test_call(AppCase):

    def setup(self):

        @self.app.task(shared=False)
        def add(x, y):
            return x + y
        self.add = add

    @patch('celery.app.base.Celery.send_task')
    def test_run(self, send_task):
        a = call(app=self.app, stderr=WhateverIO(), stdout=WhateverIO())
        a.run(self.add.name)
        self.assertTrue(send_task.called)

        a.run(self.add.name,
              args=dumps([4, 4]),
              kwargs=dumps({'x': 2, 'y': 2}))
        self.assertEqual(send_task.call_args[1]['args'], [4, 4])
        self.assertEqual(send_task.call_args[1]['kwargs'], {'x': 2, 'y': 2})

        a.run(self.add.name, expires=10, countdown=10)
        self.assertEqual(send_task.call_args[1]['expires'], 10)
        self.assertEqual(send_task.call_args[1]['countdown'], 10)

        now = datetime.now()
        iso = now.isoformat()
        a.run(self.add.name, expires=iso)
        self.assertEqual(send_task.call_args[1]['expires'], now)
        with self.assertRaises(ValueError):
            a.run(self.add.name, expires='foobaribazibar')


class test_purge(AppCase):

    @patch('celery.app.control.Control.purge')
    def test_run(self, purge_):
        out = WhateverIO()
        a = purge(app=self.app, stdout=out)
        purge_.return_value = 0
        a.run(force=True)
        self.assertIn('No messages purged', out.getvalue())

        purge_.return_value = 100
        a.run(force=True)
        self.assertIn('100 messages', out.getvalue())


class test_result(AppCase):

    def setup(self):

        @self.app.task(shared=False)
        def add(x, y):
            return x + y
        self.add = add

    def test_run(self):
        with patch('celery.result.AsyncResult.get') as get:
            out = WhateverIO()
            r = result(app=self.app, stdout=out)
            get.return_value = 'Jerry'
            r.run('id')
            self.assertIn('Jerry', out.getvalue())

            get.return_value = 'Elaine'
            r.run('id', task=self.add.name)
            self.assertIn('Elaine', out.getvalue())

            with patch('celery.result.AsyncResult.traceback') as tb:
                r.run('id', task=self.add.name, traceback=True)
                self.assertIn(str(tb), out.getvalue())


class test_status(AppCase):

    @patch('celery.bin.celery.inspect')
    def test_run(self, inspect_):
        out, err = WhateverIO(), WhateverIO()
        ins = inspect_.return_value = Mock()
        ins.run.return_value = []
        s = status(self.app, stdout=out, stderr=err)
        with self.assertRaises(Error):
            s.run()

        ins.run.return_value = ['a', 'b', 'c']
        s.run()
        self.assertIn('3 nodes online', out.getvalue())
        s.run(quiet=True)


class test_migrate(AppCase):

    @patch('celery.contrib.migrate.migrate_tasks')
    def test_run(self, migrate_tasks):
        out = WhateverIO()
        m = migrate(app=self.app, stdout=out, stderr=WhateverIO())
        with self.assertRaises(TypeError):
            m.run()
        self.assertFalse(migrate_tasks.called)

        m.run('memory://foo', 'memory://bar')
        self.assertTrue(migrate_tasks.called)

        state = Mock()
        state.count = 10
        state.strtotal = 30
        m.on_migrate_task(state, {'task': 'tasks.add', 'id': 'ID'}, None)
        self.assertIn('10/30', out.getvalue())


class test_report(AppCase):

    def test_run(self):
        out = WhateverIO()
        r = report(app=self.app, stdout=out)
        self.assertEqual(r.run(), EX_OK)
        self.assertTrue(out.getvalue())


class test_help(AppCase):

    def test_run(self):
        out = WhateverIO()
        h = help(app=self.app, stdout=out)
        h.parser = Mock()
        self.assertEqual(h.run(), EX_USAGE)
        self.assertTrue(out.getvalue())
        self.assertTrue(h.usage('help'))
        h.parser.print_help.assert_called_with()


class test_CeleryCommand(AppCase):

    def test_execute_from_commandline(self):
        x = CeleryCommand(app=self.app)
        x.handle_argv = Mock()
        x.handle_argv.return_value = 1
        with self.assertRaises(SystemExit):
            x.execute_from_commandline()

        x.handle_argv.return_value = True
        with self.assertRaises(SystemExit):
            x.execute_from_commandline()

        x.handle_argv.side_effect = KeyboardInterrupt()
        with self.assertRaises(SystemExit):
            x.execute_from_commandline()

        x.respects_app_option = True
        with self.assertRaises(SystemExit):
            x.execute_from_commandline(['celery', 'multi'])
        self.assertFalse(x.respects_app_option)
        x.respects_app_option = True
        with self.assertRaises(SystemExit):
            x.execute_from_commandline(['manage.py', 'celery', 'multi'])
        self.assertFalse(x.respects_app_option)

    def test_with_pool_option(self):
        x = CeleryCommand(app=self.app)
        self.assertIsNone(x.with_pool_option(['celery', 'events']))
        self.assertTrue(x.with_pool_option(['celery', 'worker']))
        self.assertTrue(x.with_pool_option(['manage.py', 'celery', 'worker']))

    def test_load_extensions_no_commands(self):
        with patch('celery.bin.celery.Extensions') as Ext:
            ext = Ext.return_value = Mock(name='Extension')
            ext.load.return_value = None
            x = CeleryCommand(app=self.app)
            x.load_extension_commands()

    def test_determine_exit_status(self):
        self.assertEqual(determine_exit_status('true'), EX_OK)
        self.assertEqual(determine_exit_status(''), EX_FAILURE)

    def test_relocate_args_from_start(self):
        x = CeleryCommand(app=self.app)
        self.assertEqual(x._relocate_args_from_start(None), [])
        self.assertEqual(
            x._relocate_args_from_start(
                ['-l', 'debug', 'worker', '-c', '3', '--foo'],
            ),
            ['worker', '-c', '3', '--foo', '-l', 'debug'],
        )
        self.assertEqual(
            x._relocate_args_from_start(
                ['--pool=gevent', '-l', 'debug', 'worker', '--foo', '-c', '3'],
            ),
            ['worker', '--foo', '-c', '3', '--pool=gevent', '-l', 'debug'],
        )
        self.assertEqual(
            x._relocate_args_from_start(['foo', '--foo=1']),
            ['foo', '--foo=1'],
        )

    def test_handle_argv(self):
        x = CeleryCommand(app=self.app)
        x.execute = Mock()
        x.handle_argv('celery', [])
        x.execute.assert_called_with('help', ['help'])

        x.handle_argv('celery', ['start', 'foo'])
        x.execute.assert_called_with('start', ['start', 'foo'])

    def test_execute(self):
        x = CeleryCommand(app=self.app)
        Help = x.commands['help'] = Mock()
        help = Help.return_value = Mock()
        x.execute('fooox', ['a'])
        help.run_from_argv.assert_called_with(x.prog_name, [], command='help')
        help.reset()
        x.execute('help', ['help'])
        help.run_from_argv.assert_called_with(x.prog_name, [], command='help')

        Dummy = x.commands['dummy'] = Mock()
        dummy = Dummy.return_value = Mock()
        exc = dummy.run_from_argv.side_effect = Error(
            'foo', status='EX_FAILURE',
        )
        x.on_error = Mock(name='on_error')
        help.reset()
        x.execute('dummy', ['dummy'])
        x.on_error.assert_called_with(exc)
        dummy.run_from_argv.assert_called_with(
            x.prog_name, [], command='dummy',
        )
        help.run_from_argv.assert_called_with(
            x.prog_name, [], command='help',
        )

        exc = dummy.run_from_argv.side_effect = x.UsageError('foo')
        x.on_usage_error = Mock()
        x.execute('dummy', ['dummy'])
        x.on_usage_error.assert_called_with(exc)

    def test_on_usage_error(self):
        x = CeleryCommand(app=self.app)
        x.error = Mock()
        x.on_usage_error(x.UsageError('foo'), command=None)
        self.assertTrue(x.error.called)
        x.on_usage_error(x.UsageError('foo'), command='dummy')

    def test_prepare_prog_name(self):
        x = CeleryCommand(app=self.app)
        main = Mock(name='__main__')
        main.__file__ = '/opt/foo.py'
        with patch.dict(sys.modules, __main__=main):
            self.assertEqual(x.prepare_prog_name('__main__.py'), '/opt/foo.py')
            self.assertEqual(x.prepare_prog_name('celery'), 'celery')


class test_RemoteControl(AppCase):

    def test_call_interface(self):
        with self.assertRaises(NotImplementedError):
            _RemoteControl(app=self.app).call()


class test_inspect(AppCase):

    def test_usage(self):
        self.assertTrue(inspect(app=self.app).usage('foo'))

    def test_command_info(self):
        i = inspect(app=self.app)
        self.assertTrue(i.get_command_info(
            'ping', help=True, color=i.colored.red,
        ))

    def test_list_commands_color(self):
        i = inspect(app=self.app)
        self.assertTrue(i.list_commands(
            help=True, color=i.colored.red,
        ))
        self.assertTrue(i.list_commands(
            help=False, color=None,
        ))

    def test_epilog(self):
        self.assertTrue(inspect(app=self.app).epilog)

    def test_do_call_method_sql_transport_type(self):
        self.app.connection = Mock()
        conn = self.app.connection.return_value = Mock(name='Connection')
        conn.transport.driver_type = 'sql'
        i = inspect(app=self.app)
        with self.assertRaises(i.Error):
            i.do_call_method(['ping'])

    def test_say_directions(self):
        i = inspect(self.app)
        i.out = Mock()
        i.quiet = True
        i.say_chat('<-', 'hello out')
        self.assertFalse(i.out.called)

        i.say_chat('->', 'hello in')
        self.assertTrue(i.out.called)

        i.quiet = False
        i.out.reset_mock()
        i.say_chat('<-', 'hello out', 'body')
        self.assertTrue(i.out.called)

    @patch('celery.app.control.Control.inspect')
    def test_run(self, real):
        out = WhateverIO()
        i = inspect(app=self.app, stdout=out)
        with self.assertRaises(Error):
            i.run()
        with self.assertRaises(Error):
            i.run('help')
        with self.assertRaises(Error):
            i.run('xyzzybaz')

        i.run('ping')
        self.assertTrue(real.called)
        i.run('ping', destination='foo,bar')
        self.assertEqual(real.call_args[1]['destination'], ['foo', 'bar'])
        self.assertEqual(real.call_args[1]['timeout'], 0.2)
        callback = real.call_args[1]['callback']

        callback({'foo': {'ok': 'pong'}})
        self.assertIn('OK', out.getvalue())

        instance = real.return_value = Mock()
        instance.ping.return_value = None
        with self.assertRaises(Error):
            i.run('ping')

        out.seek(0)
        out.truncate()
        i.quiet = True
        i.say_chat('<-', 'hello')
        self.assertFalse(out.getvalue())


class test_control(AppCase):

    def control(self, patch_call, *args, **kwargs):
        kwargs.setdefault('app', Mock(name='app'))
        c = control(*args, **kwargs)
        if patch_call:
            c.call = Mock(name='control.call')
        return c

    def test_call(self):
        i = self.control(False)
        i.call('foo', 1, kw=2)
        i.app.control.foo.assert_called_with(1, kw=2, reply=True)

    def test_pool_grow(self):
        i = self.control(True)
        i.pool_grow('pool_grow', n=2)
        i.call.assert_called_with('pool_grow', 2)

    def test_pool_shrink(self):
        i = self.control(True)
        i.pool_shrink('pool_shrink', n=2)
        i.call.assert_called_with('pool_shrink', 2)

    def test_autoscale(self):
        i = self.control(True)
        i.autoscale('autoscale', max=3, min=2)
        i.call.assert_called_with('autoscale', 3, 2)

    def test_rate_limit(self):
        i = self.control(True)
        i.rate_limit('rate_limit', 'proj.add', '1/s')
        i.call.assert_called_with('rate_limit', 'proj.add', '1/s')

    def test_time_limit(self):
        i = self.control(True)
        i.time_limit('time_limit', 'proj.add', 10, 30)
        i.call.assert_called_with('time_limit', 'proj.add', 10, 30)

    def test_add_consumer(self):
        i = self.control(True)
        i.add_consumer(
            'add_consumer', 'queue', 'exchange', 'topic', 'rkey',
            durable=True,
        )
        i.call.assert_called_with(
            'add_consumer', 'queue', 'exchange', 'topic', 'rkey',
            durable=True,
        )

    def test_cancel_consumer(self):
        i = self.control(True)
        i.cancel_consumer('cancel_consumer', 'queue')
        i.call.assert_called_with('cancel_consumer', 'queue')


class test_multi(AppCase):

    def test_get_options(self):
        self.assertTupleEqual(multi(app=self.app).get_options(), ())

    def test_run_from_argv(self):
        with patch('celery.bin.multi.MultiTool') as MultiTool:
            m = MultiTool.return_value = Mock()
            multi(self.app).run_from_argv('celery', ['arg'], command='multi')
            m.execute_from_commandline.assert_called_with(
                ['multi', 'arg'], 'celery',
            )


class test_main(AppCase):

    @patch('celery.bin.celery.CeleryCommand')
    def test_main(self, Command):
        cmd = Command.return_value = Mock()
        mainfun()
        cmd.execute_from_commandline.assert_called_with(None)

    @patch('celery.bin.celery.CeleryCommand')
    def test_main_KeyboardInterrupt(self, Command):
        cmd = Command.return_value = Mock()
        cmd.execute_from_commandline.side_effect = KeyboardInterrupt()
        mainfun()
        cmd.execute_from_commandline.assert_called_with(None)


class test_compat(AppCase):

    def test_compat_command_decorator(self):
        with patch('celery.bin.celery.CeleryCommand') as CC:
            self.assertEqual(command(), CC.register_command)
            fun = Mock(name='fun')
            command(fun)
            CC.register_command.assert_called_with(fun)