from __future__ import absolute_import
from celery import bootsteps
from celery.tests.case import AppCase, Mock, patch
class test_StepFormatter(AppCase):
    """Tests for ``bootsteps.StepFormatter`` prefixes, nodes and edges."""

    def test_get_prefix(self):
        formatter = bootsteps.StepFormatter()

        # A step marked ``last`` is expected to get the blueprint prefix.
        last_step = Mock()
        last_step.last = True
        self.assertEqual(
            formatter._get_prefix(last_step), formatter.blueprint_prefix,
        )

        # A conditional step that is not ``last`` gets the conditional prefix.
        conditional_step = Mock()
        conditional_step.last = False
        conditional_step.conditional = True
        self.assertEqual(
            formatter._get_prefix(conditional_step),
            formatter.conditional_prefix,
        )

        # Neither ``last`` nor ``conditional``: the prefix is empty.
        plain_step = Mock()
        plain_step.last = False
        plain_step.conditional = False
        self.assertEqual(formatter._get_prefix(plain_step), '')

    def test_node(self):
        formatter = bootsteps.StepFormatter()
        formatter.draw_node = Mock()
        step = Mock()

        # The scheme handed to ``draw_node`` depends on ``step.last``.
        for is_last, scheme_attr in ((False, 'node_scheme'),
                                     (True, 'blueprint_scheme')):
            step.last = is_last
            formatter.node(step, x=3)
            formatter.draw_node.assert_called_with(
                step, getattr(formatter, scheme_attr), {'x': 3},
            )

    def test_edge(self):
        formatter = bootsteps.StepFormatter()
        formatter.draw_edge = Mock()
        source, target = Mock(), Mock()

        # An edge starting at a ``last`` step carries extra attributes.
        source.last = True
        formatter.edge(source, target, x=6)
        attrs_for_last = {
            'x': 6, 'arrowhead': 'none', 'color': 'darkseagreen3',
        }
        formatter.draw_edge.assert_called_with(
            source, target, formatter.edge_scheme, attrs_for_last,
        )

        # Otherwise only the caller's attributes are passed through.
        source.last = False
        formatter.edge(source, target, x=6)
        attrs_plain = {'x': 6}
        formatter.draw_edge.assert_called_with(
            source, target, formatter.edge_scheme, attrs_plain,
        )
class test_Step(AppCase):
    """Tests for the step API, exercised through a ``StartStopStep`` subclass."""

    class Def(bootsteps.StartStopStep):
        name = 'test_Step.Def'

    def setup(self):
        # The test case instance is passed to the steps below in place of
        # a parent object; start every test with an empty ``steps`` list.
        self.steps = []

    def test_blueprint_name(self, bp='test_blueprint_name'):
        # An explicitly assigned ``name`` is what the class ends up with,
        # whether or not a ``blueprint`` attribute is also set.
        class X(bootsteps.Step):
            blueprint = bp
            name = 'X'
        self.assertEqual(X.name, 'X')

        class Y(bootsteps.Step):
            name = '%s.Y' % bp
        expected_name = '%s.Y' % bp
        self.assertEqual(Y.name, expected_name)

    def test_init(self):
        step = self.Def(self)
        self.assertTrue(step)

    def test_create(self):
        step = self.Def(self)
        step.create(self)

    def test_include_if(self):
        step = self.Def(self)

        step.enabled = True
        when_enabled = step.include_if(self)
        self.assertTrue(when_enabled)

        step.enabled = False
        when_disabled = step.include_if(self)
        self.assertFalse(when_disabled)

    def test_instantiate(self):
        instance = self.Def(self).instantiate(self.Def, self)
        self.assertIsInstance(instance, self.Def)

    def test_include_when_enabled(self):
        step = self.Def(self)
        step.create = Mock(return_value='George')

        # ``include`` is expected to call ``create`` with the parent and
        # store whatever it returns on ``obj``.
        self.assertTrue(step.include(self))
        self.assertEqual(step.obj, 'George')
        step.create.assert_called_with(self)

    def test_include_when_disabled(self):
        step = self.Def(self)
        step.enabled = False
        create = step.create = Mock()

        # A disabled step must not be created at all.
        self.assertFalse(step.include(self))
        self.assertFalse(create.call_count)

    def test_repr(self):
        step = self.Def(self)
        text = repr(step)
        self.assertTrue(text)
class test_ConsumerStep(AppCase):
    """Tests for ``bootsteps.ConsumerStep``."""

    def test_interface(self):
        # The base class leaves ``get_consumers`` unimplemented.
        step = bootsteps.ConsumerStep(self)
        self.assertRaises(NotImplementedError, step.get_consumers, self)

    def test_start_stop_shutdown(self):
        consumer = Mock()
        # NOTE(review): the test case stands in for the object passed to
        # the step; ``connection`` is presumably read by ``start`` -- confirm.
        self.connection = Mock()

        class Step(bootsteps.ConsumerStep):

            def get_consumers(self, c):
                return [consumer]

        step = Step(self)
        returned = step.get_consumers(self)
        self.assertEqual(returned, [consumer])

        # Each lifecycle call is expected to reach the consumer.
        step.start(self)
        consumer.consume.assert_called_with()

        step.stop(self)
        consumer.cancel.assert_called_with()

        step.shutdown(self)
        consumer.channel.close.assert_called_with()

    def test_start_no_consumers(self):
        self.connection = Mock()

        class Step(bootsteps.ConsumerStep):

            def get_consumers(self, c):
                return ()

        # Starting with an empty consumer collection must not raise.
        step = Step(self)
        step.start(self)
class test_StartStopStep(AppCase):
    """Tests for ``bootsteps.StartStopStep`` start/stop/terminate handling."""

    class Def(bootsteps.StartStopStep):
        name = 'test_StartStopStep.Def'

    def setup(self):
        # The test case instance acts as the parent handed to each step.
        self.steps = []

    def test_start__stop(self):
        step = self.Def(self)
        step.create = Mock()

        # Including the step builds the underlying object, stores it
        # as ``step.obj`` and appends the step to the parent's
        # ``steps`` list.
        step.include(self)
        self.assertTrue(self.steps)
        first_registered = self.steps[0]
        self.assertIs(first_registered, step)

        # start/stop are forwarded to the underlying object.
        step.start(self)
        step.obj.start.assert_called_with()

        step.stop(self)
        step.obj.stop.assert_called_with()

        # Without an underlying object, ``start`` returns None.
        step.obj = None
        result = step.start(self)
        self.assertIsNone(result)

    def test_include_when_disabled(self):
        step = self.Def(self)
        step.enabled = False
        step.include(self)
        # A disabled step is not added to the parent's list.
        self.assertFalse(self.steps)

    def test_terminate(self):
        step = self.Def(self)
        step.create = Mock()
        step.include(self)

        # Remove ``terminate`` from the mocked object so it no longer
        # exposes that attribute; ``stop`` is expected to be used instead.
        del step.obj.terminate
        step.terminate(self)
        step.obj.stop.assert_called_with()
class test_Blueprint(AppCase):
    """Tests for ``bootsteps.Blueprint``."""

    class Blueprint(bootsteps.Blueprint):
        name = 'test_Blueprint'

    def test_steps_added_to_unclaimed(self):

        class tnA(bootsteps.Step):
            name = 'test_Blueprint.A'

        class tnB(bootsteps.Step):
            name = 'test_Blueprint.B'

        class xxA(bootsteps.Step):
            name = 'xx.A'

        class Blueprint(self.Blueprint):
            default_steps = [tnA, tnB]

        instance = Blueprint(app=self.app)

        # Only the steps listed in ``default_steps`` should be reported.
        for expected_step in (tnA, tnB):
            self.assertIn(expected_step, instance._all_steps())
        self.assertNotIn(xxA, instance._all_steps())

    def test_init(self):
        instance = self.Blueprint(app=self.app)
        self.assertIs(instance.app, self.app)
        self.assertEqual(instance.name, 'test_Blueprint')

    def test_close__on_close_is_None(self):
        instance = self.Blueprint(app=self.app)
        instance.on_close = None
        instance.send_all = Mock()

        # Closing without an ``on_close`` callback still notifies the steps.
        instance.close(1)
        instance.send_all.assert_called_with(
            1, 'close', 'closing', reverse=False,
        )

    def test_send_all_with_None_steps(self):
        parent = Mock()
        instance = self.Blueprint(app=self.app)
        # ``None`` entries in the parent's step list must not raise.
        parent.steps = [None, None, None]
        instance.send_all(parent, 'close', 'Closing', reverse=False)

    def test_join_raises_IGNORE_ERRORS(self):
        # Temporarily make KeyError an ignored error; always restore the
        # module-level value afterwards.
        original_ignored = bootsteps.IGNORE_ERRORS
        bootsteps.IGNORE_ERRORS = (KeyError, )
        try:
            instance = self.Blueprint(app=self.app)
            shutdown_event = instance.shutdown_complete = Mock()
            shutdown_event.wait.side_effect = KeyError('luke')
            instance.join(timeout=10)
            instance.shutdown_complete.wait.assert_called_with(timeout=10)
        finally:
            bootsteps.IGNORE_ERRORS = original_ignored

    def test_connect_with(self):

        class b1s1(bootsteps.Step):
            pass

        class b1s2(bootsteps.Step):
            last = True

        class b2s1(bootsteps.Step):
            pass

        class b2s2(bootsteps.Step):
            last = True

        first = self.Blueprint([b1s1, b1s2], app=self.app)
        second = self.Blueprint([b2s1, b2s2], app=self.app)
        first.apply(Mock())
        second.apply(Mock())
        first.connect_with(second)

        # After connecting, the first graph also holds the second's steps.
        for step_cls in (b1s1, b2s1, b2s2):
            self.assertIn(step_cls, first.graph)

        self.assertTrue(repr(b1s1))
        self.assertTrue(str(b1s1))

    def test_topsort_raises_KeyError(self):

        class Step(bootsteps.Step):
            requires = ('xyxxx.fsdasewe.Unknown', )

        instance = self.Blueprint([Step], app=self.app)

        # A requirement naming an unknown dotted path raises ImportError.
        instance.steps = instance.claim_steps()
        with self.assertRaises(ImportError):
            instance._finalize_steps(instance.steps)

        # With no requirements, finalizing goes through.
        Step.requires = ()
        instance.steps = instance.claim_steps()
        instance._finalize_steps(instance.steps)

        # A KeyError raised by the graph's ``topsort`` reaches the caller.
        with patch('celery.bootsteps.DependencyGraph') as Dep:
            graph = Mock()
            Dep.return_value = graph
            graph.topsort.side_effect = KeyError('foo')
            with self.assertRaises(KeyError):
                instance._finalize_steps(instance.steps)

    def test_apply(self):

        class MyBlueprint(bootsteps.Blueprint):
            name = 'test_apply'

            def modules(self):
                return ['A', 'B']

        class B(bootsteps.Step):
            name = 'test_apply.B'

        class C(bootsteps.Step):
            name = 'test_apply.C'
            requires = [B]

        class A(bootsteps.Step):
            name = 'test_apply.A'
            requires = [C]

        class D(bootsteps.Step):
            name = 'test_apply.D'
            last = True

        applied = MyBlueprint([A, D], app=self.app)
        applied.apply(self)

        # Requirements come first (B, then C, then A); the ``last`` step
        # D is at the end.
        for position, step_cls in enumerate((B, C, A, D)):
            self.assertIsInstance(applied.order[position], step_cls)

        self.assertIn(A, applied.types)
        self.assertIs(applied[A.name], applied.order[2])

    def test_find_last_but_no_steps(self):

        class MyBlueprint(bootsteps.Blueprint):
            name = 'qwejwioqjewoqiej'

        empty = MyBlueprint(app=self.app)
        empty.apply(self)
        # With no steps at all there is no ``last`` step to find.
        self.assertIsNone(empty._find_last())