Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

hemamaps / celery   python

Repository URL to install this package:

Version: 3.1.23 

/ tests / utils / test_dispatcher.py

from __future__ import absolute_import


import gc
import sys
import time

from celery.utils.dispatch import Signal
from celery.tests.case import Case


if sys.platform.startswith('java'):

    def garbage_collect():
        # Some JVM GCs will execute finalizers in a different thread, meaning
        # we need to wait for that to complete before we go on looking for the
        # effects of that.
        gc.collect()
        time.sleep(0.1)

elif hasattr(sys, 'pypy_version_info'):

    def garbage_collect():  # noqa
        # Collecting weakreferences can take two collections on PyPy.
        gc.collect()
        gc.collect()
else:

    def garbage_collect():  # noqa
        gc.collect()


def receiver_1_arg(val, **kwargs):
    return val


class Callable(object):

    def __call__(self, val, **kwargs):
        return val

    def a(self, val, **kwargs):
        return val

a_signal = Signal(providing_args=['val'])


class DispatcherTests(Case):
    """Test suite for dispatcher (barely started)"""

    def _testIsClean(self, signal):
        """Assert that everything has been cleaned up automatically"""
        self.assertEqual(signal.receivers, [])

        # force cleanup just in case
        signal.receivers = []

    def test_exact(self):
        a_signal.connect(receiver_1_arg, sender=self)
        expected = [(receiver_1_arg, 'test')]
        result = a_signal.send(sender=self, val='test')
        self.assertEqual(result, expected)
        a_signal.disconnect(receiver_1_arg, sender=self)
        self._testIsClean(a_signal)

    def test_ignored_sender(self):
        a_signal.connect(receiver_1_arg)
        expected = [(receiver_1_arg, 'test')]
        result = a_signal.send(sender=self, val='test')
        self.assertEqual(result, expected)
        a_signal.disconnect(receiver_1_arg)
        self._testIsClean(a_signal)

    def test_garbage_collected(self):
        a = Callable()
        a_signal.connect(a.a, sender=self)
        expected = []
        del a
        garbage_collect()
        result = a_signal.send(sender=self, val='test')
        self.assertEqual(result, expected)
        self._testIsClean(a_signal)

    def test_multiple_registration(self):
        a = Callable()
        a_signal.connect(a)
        a_signal.connect(a)
        a_signal.connect(a)
        a_signal.connect(a)
        a_signal.connect(a)
        a_signal.connect(a)
        result = a_signal.send(sender=self, val='test')
        self.assertEqual(len(result), 1)
        self.assertEqual(len(a_signal.receivers), 1)
        del a
        del result
        garbage_collect()
        self._testIsClean(a_signal)

    def test_uid_registration(self):

        def uid_based_receiver_1(**kwargs):
            pass

        def uid_based_receiver_2(**kwargs):
            pass

        a_signal.connect(uid_based_receiver_1, dispatch_uid='uid')
        a_signal.connect(uid_based_receiver_2, dispatch_uid='uid')
        self.assertEqual(len(a_signal.receivers), 1)
        a_signal.disconnect(dispatch_uid='uid')
        self._testIsClean(a_signal)

    def test_robust(self):

        def fails(val, **kwargs):
            raise ValueError('this')

        a_signal.connect(fails)
        result = a_signal.send_robust(sender=self, val='test')
        err = result[0][1]
        self.assertTrue(isinstance(err, ValueError))
        self.assertEqual(err.args, ('this',))
        a_signal.disconnect(fails)
        self._testIsClean(a_signal)

    def test_disconnection(self):
        receiver_1 = Callable()
        receiver_2 = Callable()
        receiver_3 = Callable()
        a_signal.connect(receiver_1)
        a_signal.connect(receiver_2)
        a_signal.connect(receiver_3)
        a_signal.disconnect(receiver_1)
        del receiver_2
        garbage_collect()
        a_signal.disconnect(receiver_3)
        self._testIsClean(a_signal)