Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

hemamaps / celery   python

Repository URL to install this package:

Version: 3.1.23 

/ tests / worker / test_bootsteps.py

from __future__ import absolute_import

from celery import bootsteps

from celery.tests.case import AppCase, Mock, patch


class test_StepFormatter(AppCase):
    # Covers the prefix, node and edge helpers of bootsteps.StepFormatter.
    # Steps are stood in for by Mock objects carrying only the flags
    # (`last`, `conditional`) that each test sets explicitly.

    def test_get_prefix(self):
        formatter = bootsteps.StepFormatter()

        # last=True is expected to yield the blueprint prefix.
        final_step = Mock()
        final_step.last = True
        self.assertEqual(
            formatter._get_prefix(final_step), formatter.blueprint_prefix,
        )

        # Not last, but conditional: expect the conditional prefix.
        optional_step = Mock()
        optional_step.last = False
        optional_step.conditional = True
        self.assertEqual(
            formatter._get_prefix(optional_step),
            formatter.conditional_prefix,
        )

        # Neither flag set: expect an empty prefix.
        plain_step = Mock()
        plain_step.last = False
        plain_step.conditional = False
        self.assertEqual(formatter._get_prefix(plain_step), '')

    def test_node(self):
        formatter = bootsteps.StepFormatter()
        formatter.draw_node = Mock()
        step = Mock()

        # Pairs of (value given to step.last, formatter attribute holding
        # the scheme that draw_node is expected to receive).
        expectations = (
            (False, 'node_scheme'),
            (True, 'blueprint_scheme'),
        )
        for is_last, scheme_attr in expectations:
            step.last = is_last
            formatter.node(step, x=3)
            formatter.draw_node.assert_called_with(
                step, getattr(formatter, scheme_attr), {'x': 3},
            )

    def test_edge(self):
        formatter = bootsteps.StepFormatter()
        formatter.draw_edge = Mock()
        source, target = Mock(), Mock()

        # When the source step is `last`, extra arrow/color attributes
        # are expected on top of the caller-supplied ones.
        source.last = True
        formatter.edge(source, target, x=6)
        decorated_attrs = {
            'x': 6, 'arrowhead': 'none', 'color': 'darkseagreen3',
        }
        formatter.draw_edge.assert_called_with(
            source, target, formatter.edge_scheme, decorated_attrs,
        )

        # Otherwise only the caller-supplied attributes are passed on.
        source.last = False
        formatter.edge(source, target, x=6)
        plain_attrs = {'x': 6}
        formatter.draw_edge.assert_called_with(
            source, target, formatter.edge_scheme, plain_attrs,
        )


class test_Step(AppCase):
    # Basic Step behaviour, exercised through a minimal StartStopStep
    # subclass.  The test case instance itself is handed to the step API
    # wherever an argument is required, so `setup` gives it a `steps` list.

    class Def(bootsteps.StartStopStep):
        name = 'test_Step.Def'

    def setup(self):
        self.steps = []

    def test_blueprint_name(self, bp='test_blueprint_name'):
        qualified = '%s.Y' % bp

        class X(bootsteps.Step):
            blueprint = bp
            name = 'X'

        class Y(bootsteps.Step):
            name = qualified

        # An explicitly assigned name must be kept as given.
        self.assertEqual(X.name, 'X')
        self.assertEqual(Y.name, qualified)

    def test_init(self):
        step = self.Def(self)
        self.assertTrue(step)

    def test_create(self):
        # Only checks that create() can be called without raising.
        step = self.Def(self)
        step.create(self)

    def test_include_if(self):
        step = self.Def(self)
        # include_if() is expected to be truthy exactly when `enabled` is.
        for enabled, check in ((True, self.assertTrue),
                               (False, self.assertFalse)):
            step.enabled = enabled
            check(step.include_if(self))

    def test_instantiate(self):
        step = self.Def(self)
        created = step.instantiate(self.Def, self)
        self.assertIsInstance(created, self.Def)

    def test_include_when_enabled(self):
        step = self.Def(self)
        step.create = Mock(return_value='George')

        included = step.include(self)
        self.assertTrue(included)

        # The value returned by create() must end up on step.obj.
        self.assertEqual(step.obj, 'George')
        step.create.assert_called_with(self)

    def test_include_when_disabled(self):
        step = self.Def(self)
        step.enabled = False
        step.create = Mock()

        included = step.include(self)
        self.assertFalse(included)
        # A disabled step must never have create() invoked.
        self.assertFalse(step.create.call_count)

    def test_repr(self):
        step = self.Def(self)
        text = repr(step)
        self.assertTrue(text)


class test_ConsumerStep(AppCase):
    # ConsumerStep tests.  The test case instance is passed as the
    # argument to the step methods; tests that start a step first give
    # it a Mock `connection` attribute.

    def test_interface(self):
        # The base class leaves get_consumers() unimplemented.
        base_step = bootsteps.ConsumerStep(self)
        self.assertRaises(
            NotImplementedError, base_step.get_consumers, self,
        )

    def test_start_stop_shutdown(self):
        self.connection = Mock()
        fake_consumer = Mock()

        class Step(bootsteps.ConsumerStep):

            def get_consumers(self, c):
                return [fake_consumer]

        step = Step(self)
        self.assertEqual(step.get_consumers(self), [fake_consumer])

        # Each lifecycle call is expected to reach the consumer.
        step.start(self)
        fake_consumer.consume.assert_called_with()

        step.stop(self)
        fake_consumer.cancel.assert_called_with()

        step.shutdown(self)
        fake_consumer.channel.close.assert_called_with()

    def test_start_no_consumers(self):
        self.connection = Mock()

        class Step(bootsteps.ConsumerStep):

            def get_consumers(self, c):
                return ()

        # Starting with an empty consumer collection must not raise.
        empty_step = Step(self)
        empty_step.start(self)


class test_StartStopStep(AppCase):
    # StartStopStep tests.  The test case instance is passed as the
    # argument to include/start/stop/terminate, and `self.steps` is the
    # list the tests inspect after include().

    class Def(bootsteps.StartStopStep):
        name = 'test_StartStopStep.Def'

    def setup(self):
        self.steps = []

    def test_start__stop(self):
        step = self.Def(self)
        step.create = Mock()

        # include() creates the underlying object, stores it on
        # step.obj, and appends the step to the parent's steps list.
        step.include(self)
        self.assertTrue(self.steps)
        self.assertIs(self.steps[0], step)

        # start/stop are expected to be forwarded to the created object.
        step.start(self)
        step.obj.start.assert_called_with()

        step.stop(self)
        step.obj.stop.assert_called_with()

        # With no underlying object, start() is expected to return None.
        step.obj = None
        outcome = step.start(self)
        self.assertIsNone(outcome)

    def test_include_when_disabled(self):
        step = self.Def(self)
        step.enabled = False
        step.include(self)
        # A disabled step must not be registered with the parent.
        self.assertFalse(self.steps)

    def test_terminate(self):
        step = self.Def(self)
        step.create = Mock()

        step.include(self)
        # Remove `terminate` from the mocked object so it no longer
        # exposes one; terminate() is then expected to call stop().
        del step.obj.terminate
        step.terminate(self)
        step.obj.stop.assert_called_with()


class test_Blueprint(AppCase):
    # Blueprint tests, built around a named Blueprint subclass and
    # throwaway Step subclasses declared inside each test.

    class Blueprint(bootsteps.Blueprint):
        name = 'test_Blueprint'

    def test_steps_added_to_unclaimed(self):

        class tnA(bootsteps.Step):
            name = 'test_Blueprint.A'

        class tnB(bootsteps.Step):
            name = 'test_Blueprint.B'

        class xxA(bootsteps.Step):
            name = 'xx.A'

        class Blueprint(self.Blueprint):
            default_steps = [tnA, tnB]

        blueprint = Blueprint(app=self.app)

        # Both default steps must be reported; the unrelated one must not.
        for default_step in (tnA, tnB):
            self.assertIn(default_step, blueprint._all_steps())
        self.assertNotIn(xxA, blueprint._all_steps())

    def test_init(self):
        blueprint = self.Blueprint(app=self.app)
        self.assertIs(blueprint.app, self.app)
        self.assertEqual(blueprint.name, 'test_Blueprint')

    def test_close__on_close_is_None(self):
        blueprint = self.Blueprint(app=self.app)
        blueprint.on_close = None
        blueprint.send_all = Mock()

        parent = 1
        blueprint.close(parent)
        blueprint.send_all.assert_called_with(
            parent, 'close', 'closing', reverse=False,
        )

    def test_send_all_with_None_steps(self):
        # A parent whose steps are all None must not make send_all raise.
        parent = Mock()
        parent.steps = [None] * 3
        blueprint = self.Blueprint(app=self.app)
        blueprint.send_all(parent, 'close', 'Closing', reverse=False)

    def test_join_raises_IGNORE_ERRORS(self):
        # Temporarily replace the module-level IGNORE_ERRORS tuple;
        # patch.object puts the previous value back on exit, including
        # when an assertion inside the block fails.
        with patch.object(bootsteps, 'IGNORE_ERRORS', (KeyError, )):
            blueprint = self.Blueprint(app=self.app)
            waiter = Mock()
            waiter.wait.side_effect = KeyError('luke')
            blueprint.shutdown_complete = waiter

            # The KeyError raised by wait() must not escape join().
            blueprint.join(timeout=10)
            waiter.wait.assert_called_with(timeout=10)

    def test_connect_with(self):

        class b1s1(bootsteps.Step):
            pass

        class b1s2(bootsteps.Step):
            last = True

        class b2s1(bootsteps.Step):
            pass

        class b2s2(bootsteps.Step):
            last = True

        first = self.Blueprint([b1s1, b1s2], app=self.app)
        second = self.Blueprint([b2s1, b2s2], app=self.app)
        first.apply(Mock())
        second.apply(Mock())
        first.connect_with(second)

        # After connecting, the first graph must contain its own step
        # as well as the steps of the second blueprint.
        for step_cls in (b1s1, b2s1, b2s2):
            self.assertIn(step_cls, first.graph)

        self.assertTrue(repr(b1s1))
        self.assertTrue(str(b1s1))

    def test_topsort_raises_KeyError(self):

        class Step(bootsteps.Step):
            requires = ('xyxxx.fsdasewe.Unknown', )

        blueprint = self.Blueprint([Step], app=self.app)

        # Stage 1: the unknown requirement name is expected to surface
        # as an ImportError while finalizing.
        blueprint.steps = blueprint.claim_steps()
        with self.assertRaises(ImportError):
            blueprint._finalize_steps(blueprint.steps)

        # Stage 2: with the requirement removed, finalizing succeeds.
        Step.requires = ()
        blueprint.steps = blueprint.claim_steps()
        blueprint._finalize_steps(blueprint.steps)

        # Stage 3: a KeyError raised by topsort() must propagate.
        with patch('celery.bootsteps.DependencyGraph') as graph_cls:
            graph = Mock()
            graph_cls.return_value = graph
            graph.topsort.side_effect = KeyError('foo')
            with self.assertRaises(KeyError):
                blueprint._finalize_steps(blueprint.steps)

    def test_apply(self):

        class MyBlueprint(bootsteps.Blueprint):
            name = 'test_apply'

            def modules(self):
                return ['A', 'B']

        class B(bootsteps.Step):
            name = 'test_apply.B'

        class C(bootsteps.Step):
            name = 'test_apply.C'
            requires = [B]

        class A(bootsteps.Step):
            name = 'test_apply.A'
            requires = [C]

        class D(bootsteps.Step):
            name = 'test_apply.D'
            last = True

        blueprint = MyBlueprint([A, D], app=self.app)
        blueprint.apply(self)

        # Only A and D were passed in, yet B and C (reachable through
        # `requires`) must appear, ahead of A, with the `last` step D
        # at the end.
        for position, step_cls in enumerate((B, C, A, D)):
            self.assertIsInstance(blueprint.order[position], step_cls)

        self.assertIn(A, blueprint.types)
        # Looking a step up by name returns the instance held in `order`.
        self.assertIs(blueprint[A.name], blueprint.order[2])

    def test_find_last_but_no_steps(self):

        class MyBlueprint(bootsteps.Blueprint):
            name = 'qwejwioqjewoqiej'

        blueprint = MyBlueprint(app=self.app)
        blueprint.apply(self)
        last_step = blueprint._find_last()
        self.assertIsNone(last_step)