from __future__ import absolute_import
import sys
from anyjson import dumps
from datetime import datetime
from celery import __main__
from celery.platforms import EX_FAILURE, EX_USAGE, EX_OK
from celery.bin.base import Error
from celery.bin.celery import (
Command,
list_,
call,
purge,
result,
inspect,
control,
status,
migrate,
help,
report,
CeleryCommand,
determine_exit_status,
multi,
main as mainfun,
_RemoteControl,
command,
)
from celery.tests.case import (
AppCase, Mock, WhateverIO, override_stdouts, patch,
)
class test__main__(AppCase):
def test_warn_deprecated(self):
with override_stdouts() as (stdout, _):
__main__._warn_deprecated('YADDA YADDA')
self.assertIn('command is deprecated', stdout.getvalue())
self.assertIn('YADDA YADDA', stdout.getvalue())
def test_main(self):
with patch('celery.__main__.maybe_patch_concurrency') as mpc:
with patch('celery.bin.celery.main') as main:
__main__.main()
mpc.assert_called_with()
main.assert_called_with()
def test_compat_worker(self):
with patch('celery.__main__.maybe_patch_concurrency') as mpc:
with patch('celery.__main__._warn_deprecated') as depr:
with patch('celery.bin.worker.main') as main:
__main__._compat_worker()
mpc.assert_called_with()
depr.assert_called_with('celery worker')
main.assert_called_with()
def test_compat_multi(self):
with patch('celery.__main__.maybe_patch_concurrency') as mpc:
with patch('celery.__main__._warn_deprecated') as depr:
with patch('celery.bin.multi.main') as main:
__main__._compat_multi()
self.assertFalse(mpc.called)
depr.assert_called_with('celery multi')
main.assert_called_with()
def test_compat_beat(self):
with patch('celery.__main__.maybe_patch_concurrency') as mpc:
with patch('celery.__main__._warn_deprecated') as depr:
with patch('celery.bin.beat.main') as main:
__main__._compat_beat()
mpc.assert_called_with()
depr.assert_called_with('celery beat')
main.assert_called_with()
class test_Command(AppCase):
def test_Error_repr(self):
x = Error('something happened')
self.assertIsNotNone(x.status)
self.assertTrue(x.reason)
self.assertTrue(str(x))
def setup(self):
self.out = WhateverIO()
self.err = WhateverIO()
self.cmd = Command(self.app, stdout=self.out, stderr=self.err)
def test_error(self):
self.cmd.out = Mock()
self.cmd.error('FOO')
self.assertTrue(self.cmd.out.called)
def test_out(self):
f = Mock()
self.cmd.out('foo', f)
def test_call(self):
def ok_run():
pass
self.cmd.run = ok_run
self.assertEqual(self.cmd(), EX_OK)
def error_run():
raise Error('error', EX_FAILURE)
self.cmd.run = error_run
self.assertEqual(self.cmd(), EX_FAILURE)
def test_run_from_argv(self):
with self.assertRaises(NotImplementedError):
self.cmd.run_from_argv('prog', ['foo', 'bar'])
def test_pretty_list(self):
self.assertEqual(self.cmd.pretty([])[1], '- empty -')
self.assertIn('bar', self.cmd.pretty(['foo', 'bar'])[1])
def test_pretty_dict(self):
self.assertIn(
'OK',
str(self.cmd.pretty({'ok': 'the quick brown fox'})[0]),
)
self.assertIn(
'ERROR',
str(self.cmd.pretty({'error': 'the quick brown fox'})[0]),
)
def test_pretty(self):
self.assertIn('OK', str(self.cmd.pretty('the quick brown')))
self.assertIn('OK', str(self.cmd.pretty(object())))
self.assertIn('OK', str(self.cmd.pretty({'foo': 'bar'})))
class test_list(AppCase):
def test_list_bindings_no_support(self):
l = list_(app=self.app, stderr=WhateverIO())
management = Mock()
management.get_bindings.side_effect = NotImplementedError()
with self.assertRaises(Error):
l.list_bindings(management)
def test_run(self):
l = list_(app=self.app, stderr=WhateverIO())
l.run('bindings')
with self.assertRaises(Error):
l.run(None)
with self.assertRaises(Error):
l.run('foo')
class test_call(AppCase):
def setup(self):
@self.app.task(shared=False)
def add(x, y):
return x + y
self.add = add
@patch('celery.app.base.Celery.send_task')
def test_run(self, send_task):
a = call(app=self.app, stderr=WhateverIO(), stdout=WhateverIO())
a.run(self.add.name)
self.assertTrue(send_task.called)
a.run(self.add.name,
args=dumps([4, 4]),
kwargs=dumps({'x': 2, 'y': 2}))
self.assertEqual(send_task.call_args[1]['args'], [4, 4])
self.assertEqual(send_task.call_args[1]['kwargs'], {'x': 2, 'y': 2})
a.run(self.add.name, expires=10, countdown=10)
self.assertEqual(send_task.call_args[1]['expires'], 10)
self.assertEqual(send_task.call_args[1]['countdown'], 10)
now = datetime.now()
iso = now.isoformat()
a.run(self.add.name, expires=iso)
self.assertEqual(send_task.call_args[1]['expires'], now)
with self.assertRaises(ValueError):
a.run(self.add.name, expires='foobaribazibar')
class test_purge(AppCase):
@patch('celery.app.control.Control.purge')
def test_run(self, purge_):
out = WhateverIO()
a = purge(app=self.app, stdout=out)
purge_.return_value = 0
a.run(force=True)
self.assertIn('No messages purged', out.getvalue())
purge_.return_value = 100
a.run(force=True)
self.assertIn('100 messages', out.getvalue())
class test_result(AppCase):
def setup(self):
@self.app.task(shared=False)
def add(x, y):
return x + y
self.add = add
def test_run(self):
with patch('celery.result.AsyncResult.get') as get:
out = WhateverIO()
r = result(app=self.app, stdout=out)
get.return_value = 'Jerry'
r.run('id')
self.assertIn('Jerry', out.getvalue())
get.return_value = 'Elaine'
r.run('id', task=self.add.name)
self.assertIn('Elaine', out.getvalue())
with patch('celery.result.AsyncResult.traceback') as tb:
r.run('id', task=self.add.name, traceback=True)
self.assertIn(str(tb), out.getvalue())
class test_status(AppCase):
@patch('celery.bin.celery.inspect')
def test_run(self, inspect_):
out, err = WhateverIO(), WhateverIO()
ins = inspect_.return_value = Mock()
ins.run.return_value = []
s = status(self.app, stdout=out, stderr=err)
with self.assertRaises(Error):
s.run()
ins.run.return_value = ['a', 'b', 'c']
s.run()
self.assertIn('3 nodes online', out.getvalue())
s.run(quiet=True)
class test_migrate(AppCase):
@patch('celery.contrib.migrate.migrate_tasks')
def test_run(self, migrate_tasks):
out = WhateverIO()
m = migrate(app=self.app, stdout=out, stderr=WhateverIO())
with self.assertRaises(TypeError):
m.run()
self.assertFalse(migrate_tasks.called)
m.run('memory://foo', 'memory://bar')
self.assertTrue(migrate_tasks.called)
state = Mock()
state.count = 10
state.strtotal = 30
m.on_migrate_task(state, {'task': 'tasks.add', 'id': 'ID'}, None)
self.assertIn('10/30', out.getvalue())
class test_report(AppCase):
def test_run(self):
out = WhateverIO()
r = report(app=self.app, stdout=out)
self.assertEqual(r.run(), EX_OK)
self.assertTrue(out.getvalue())
class test_help(AppCase):
def test_run(self):
out = WhateverIO()
h = help(app=self.app, stdout=out)
h.parser = Mock()
self.assertEqual(h.run(), EX_USAGE)
self.assertTrue(out.getvalue())
self.assertTrue(h.usage('help'))
h.parser.print_help.assert_called_with()
class test_CeleryCommand(AppCase):
def test_execute_from_commandline(self):
x = CeleryCommand(app=self.app)
x.handle_argv = Mock()
x.handle_argv.return_value = 1
with self.assertRaises(SystemExit):
x.execute_from_commandline()
x.handle_argv.return_value = True
with self.assertRaises(SystemExit):
x.execute_from_commandline()
x.handle_argv.side_effect = KeyboardInterrupt()
with self.assertRaises(SystemExit):
x.execute_from_commandline()
x.respects_app_option = True
with self.assertRaises(SystemExit):
x.execute_from_commandline(['celery', 'multi'])
self.assertFalse(x.respects_app_option)
x.respects_app_option = True
with self.assertRaises(SystemExit):
x.execute_from_commandline(['manage.py', 'celery', 'multi'])
self.assertFalse(x.respects_app_option)
def test_with_pool_option(self):
x = CeleryCommand(app=self.app)
self.assertIsNone(x.with_pool_option(['celery', 'events']))
self.assertTrue(x.with_pool_option(['celery', 'worker']))
self.assertTrue(x.with_pool_option(['manage.py', 'celery', 'worker']))
def test_load_extensions_no_commands(self):
with patch('celery.bin.celery.Extensions') as Ext:
ext = Ext.return_value = Mock(name='Extension')
ext.load.return_value = None
x = CeleryCommand(app=self.app)
x.load_extension_commands()
def test_determine_exit_status(self):
self.assertEqual(determine_exit_status('true'), EX_OK)
self.assertEqual(determine_exit_status(''), EX_FAILURE)
def test_relocate_args_from_start(self):
x = CeleryCommand(app=self.app)
self.assertEqual(x._relocate_args_from_start(None), [])
self.assertEqual(
x._relocate_args_from_start(
['-l', 'debug', 'worker', '-c', '3', '--foo'],
),
['worker', '-c', '3', '--foo', '-l', 'debug'],
)
self.assertEqual(
x._relocate_args_from_start(
['--pool=gevent', '-l', 'debug', 'worker', '--foo', '-c', '3'],
),
['worker', '--foo', '-c', '3', '--pool=gevent', '-l', 'debug'],
)
self.assertEqual(
x._relocate_args_from_start(['foo', '--foo=1']),
['foo', '--foo=1'],
)
def test_handle_argv(self):
x = CeleryCommand(app=self.app)
x.execute = Mock()
x.handle_argv('celery', [])
x.execute.assert_called_with('help', ['help'])
x.handle_argv('celery', ['start', 'foo'])
x.execute.assert_called_with('start', ['start', 'foo'])
def test_execute(self):
x = CeleryCommand(app=self.app)
Help = x.commands['help'] = Mock()
help = Help.return_value = Mock()
x.execute('fooox', ['a'])
help.run_from_argv.assert_called_with(x.prog_name, [], command='help')
help.reset()
x.execute('help', ['help'])
help.run_from_argv.assert_called_with(x.prog_name, [], command='help')
Dummy = x.commands['dummy'] = Mock()
dummy = Dummy.return_value = Mock()
exc = dummy.run_from_argv.side_effect = Error(
'foo', status='EX_FAILURE',
)
x.on_error = Mock(name='on_error')
help.reset()
x.execute('dummy', ['dummy'])
x.on_error.assert_called_with(exc)
dummy.run_from_argv.assert_called_with(
x.prog_name, [], command='dummy',
)
help.run_from_argv.assert_called_with(
x.prog_name, [], command='help',
)
exc = dummy.run_from_argv.side_effect = x.UsageError('foo')
x.on_usage_error = Mock()
x.execute('dummy', ['dummy'])
x.on_usage_error.assert_called_with(exc)
def test_on_usage_error(self):
x = CeleryCommand(app=self.app)
x.error = Mock()
x.on_usage_error(x.UsageError('foo'), command=None)
self.assertTrue(x.error.called)
x.on_usage_error(x.UsageError('foo'), command='dummy')
def test_prepare_prog_name(self):
x = CeleryCommand(app=self.app)
main = Mock(name='__main__')
main.__file__ = '/opt/foo.py'
with patch.dict(sys.modules, __main__=main):
self.assertEqual(x.prepare_prog_name('__main__.py'), '/opt/foo.py')
self.assertEqual(x.prepare_prog_name('celery'), 'celery')
class test_RemoteControl(AppCase):
def test_call_interface(self):
with self.assertRaises(NotImplementedError):
_RemoteControl(app=self.app).call()
class test_inspect(AppCase):
def test_usage(self):
self.assertTrue(inspect(app=self.app).usage('foo'))
def test_command_info(self):
i = inspect(app=self.app)
self.assertTrue(i.get_command_info(
'ping', help=True, color=i.colored.red,
))
def test_list_commands_color(self):
i = inspect(app=self.app)
self.assertTrue(i.list_commands(
help=True, color=i.colored.red,
))
self.assertTrue(i.list_commands(
help=False, color=None,
))
def test_epilog(self):
self.assertTrue(inspect(app=self.app).epilog)
def test_do_call_method_sql_transport_type(self):
self.app.connection = Mock()
conn = self.app.connection.return_value = Mock(name='Connection')
conn.transport.driver_type = 'sql'
i = inspect(app=self.app)
with self.assertRaises(i.Error):
i.do_call_method(['ping'])
def test_say_directions(self):
i = inspect(self.app)
i.out = Mock()
i.quiet = True
i.say_chat('<-', 'hello out')
self.assertFalse(i.out.called)
i.say_chat('->', 'hello in')
self.assertTrue(i.out.called)
i.quiet = False
i.out.reset_mock()
i.say_chat('<-', 'hello out', 'body')
self.assertTrue(i.out.called)
@patch('celery.app.control.Control.inspect')
def test_run(self, real):
out = WhateverIO()
i = inspect(app=self.app, stdout=out)
with self.assertRaises(Error):
i.run()
with self.assertRaises(Error):
i.run('help')
with self.assertRaises(Error):
i.run('xyzzybaz')
i.run('ping')
self.assertTrue(real.called)
i.run('ping', destination='foo,bar')
self.assertEqual(real.call_args[1]['destination'], ['foo', 'bar'])
self.assertEqual(real.call_args[1]['timeout'], 0.2)
callback = real.call_args[1]['callback']
callback({'foo': {'ok': 'pong'}})
self.assertIn('OK', out.getvalue())
instance = real.return_value = Mock()
instance.ping.return_value = None
with self.assertRaises(Error):
i.run('ping')
out.seek(0)
out.truncate()
i.quiet = True
i.say_chat('<-', 'hello')
self.assertFalse(out.getvalue())
class test_control(AppCase):
def control(self, patch_call, *args, **kwargs):
kwargs.setdefault('app', Mock(name='app'))
c = control(*args, **kwargs)
if patch_call:
c.call = Mock(name='control.call')
return c
def test_call(self):
i = self.control(False)
i.call('foo', 1, kw=2)
i.app.control.foo.assert_called_with(1, kw=2, reply=True)
def test_pool_grow(self):
i = self.control(True)
i.pool_grow('pool_grow', n=2)
i.call.assert_called_with('pool_grow', 2)
def test_pool_shrink(self):
i = self.control(True)
i.pool_shrink('pool_shrink', n=2)
i.call.assert_called_with('pool_shrink', 2)
def test_autoscale(self):
i = self.control(True)
i.autoscale('autoscale', max=3, min=2)
i.call.assert_called_with('autoscale', 3, 2)
def test_rate_limit(self):
i = self.control(True)
i.rate_limit('rate_limit', 'proj.add', '1/s')
i.call.assert_called_with('rate_limit', 'proj.add', '1/s')
def test_time_limit(self):
i = self.control(True)
i.time_limit('time_limit', 'proj.add', 10, 30)
i.call.assert_called_with('time_limit', 'proj.add', 10, 30)
def test_add_consumer(self):
i = self.control(True)
i.add_consumer(
'add_consumer', 'queue', 'exchange', 'topic', 'rkey',
durable=True,
)
i.call.assert_called_with(
'add_consumer', 'queue', 'exchange', 'topic', 'rkey',
durable=True,
)
def test_cancel_consumer(self):
i = self.control(True)
i.cancel_consumer('cancel_consumer', 'queue')
i.call.assert_called_with('cancel_consumer', 'queue')
class test_multi(AppCase):
def test_get_options(self):
self.assertTupleEqual(multi(app=self.app).get_options(), ())
def test_run_from_argv(self):
with patch('celery.bin.multi.MultiTool') as MultiTool:
m = MultiTool.return_value = Mock()
multi(self.app).run_from_argv('celery', ['arg'], command='multi')
m.execute_from_commandline.assert_called_with(
['multi', 'arg'], 'celery',
)
class test_main(AppCase):
@patch('celery.bin.celery.CeleryCommand')
def test_main(self, Command):
cmd = Command.return_value = Mock()
mainfun()
cmd.execute_from_commandline.assert_called_with(None)
@patch('celery.bin.celery.CeleryCommand')
def test_main_KeyboardInterrupt(self, Command):
cmd = Command.return_value = Mock()
cmd.execute_from_commandline.side_effect = KeyboardInterrupt()
mainfun()
cmd.execute_from_commandline.assert_called_with(None)
class test_compat(AppCase):
def test_compat_command_decorator(self):
with patch('celery.bin.celery.CeleryCommand') as CC:
self.assertEqual(command(), CC.register_command)
fun = Mock(name='fun')
command(fun)
CC.register_command.assert_called_with(fun)