Why Gemfury? Push, build, and install: RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Bower components, Debian packages, RPM packages, NuGet packages

hemamaps / celery   python

Repository URL to install this package:

Version: 3.1.23 

/ tests / concurrency / test_gevent.py

from __future__ import absolute_import

from celery.concurrency.gevent import (
    Schedule,
    Timer,
    TaskPool,
    apply_timeout,
)

from celery.tests.case import (
    AppCase, Mock, SkipTest, mock_module, patch, patch_many, skip_if_pypy,
)

# Module names unpacked into ``mock_module(*gevent_modules)`` by the tests
# in this file.  Presumably this lets the tests run against stand-ins
# rather than a real gevent/greenlet installation -- confirm against
# ``celery.tests.case.mock_module``.
gevent_modules = (
    'gevent',
    'gevent.monkey',
    'gevent.greenlet',
    'gevent.pool',
    'greenlet',
)


class GeventCase(AppCase):
    """Base case whose setup skips the test when gevent is not importable."""

    @skip_if_pypy
    def setup(self):
        # Import by name so a missing gevent surfaces here as ImportError
        # and can be turned into a skip instead of an error.
        try:
            gevent_module = __import__('gevent')
        except ImportError:
            raise SkipTest('gevent not installed, skipping related tests.')
        self.gevent = gevent_module


class test_gevent_patch(GeventCase):
    """Checks that asking for the gevent pool calls ``monkey.patch_all``."""

    def test_is_patched(self):
        # The managers are evaluated and entered left to right, i.e. the
        # mocked modules are in place before ``patch_all`` is patched.
        with mock_module(*gevent_modules), \
                patch('gevent.monkey.patch_all', create=True) as patch_all:
            import gevent
            gevent.version_info = (1, 0, 0)
            from celery import maybe_patch_concurrency

            argv = ['x', '-P', 'gevent']
            maybe_patch_concurrency(argv)
            self.assertTrue(patch_all.called)


class test_Schedule(AppCase):
    """Exercises ``Schedule`` enter/exit/clear with mocked greenlets."""

    def test_sched(self):
        with mock_module(*gevent_modules):
            with patch_many('gevent.greenlet',
                            'gevent.greenlet.GreenletExit') as patched:
                greenlet_mod, _greenlet_exit = patched

                # Greenlet is a plain class while the Schedule is built
                # and is swapped for a Mock right afterwards.
                greenlet_mod.Greenlet = object
                schedule = Schedule()
                greenlet_mod.Greenlet = Mock()
                schedule._Greenlet.spawn_later = Mock()
                schedule._GreenletExit = KeyError

                # Entering an entry must leave something in the queue.
                entry = Mock()
                spawned = schedule._enter(1, 0, entry)
                self.assertTrue(schedule.queue)

                # Exiting kills the greenlet and empties the queue.
                schedule._entry_exit(spawned)
                spawned.kill.assert_called_with()
                self.assertFalse(schedule._queue)

                # clear() is run once with a well-behaved kill() and once
                # with a kill() that raises KeyError.
                schedule._queue.add(spawned)
                schedule.clear()
                schedule._queue.add(spawned)
                spawned.kill.side_effect = KeyError()
                schedule.clear()

                schedule._Greenlet().cancel()


class test_TaskPool(AppCase):
    """Exercises ``TaskPool`` lifecycle, resizing and ``num_processes``."""

    def test_pool(self):
        with mock_module(*gevent_modules):
            with patch_many('gevent.spawn_raw',
                            'gevent.pool.Pool') as patched:
                _spawn_raw, _pool_cls = patched

                # Start/stop/apply, then stop again with no pool set.
                pool = TaskPool()
                pool.on_start()
                pool.on_stop()
                pool.on_apply(Mock())
                pool._pool = None
                pool.on_stop()

                # grow()/shrink() move both the size and the semaphore
                # counter by one.
                pool._pool = Mock()
                pool._pool._semaphore.counter = 1
                pool._pool.size = 1

                pool.grow()
                self.assertEqual(pool._pool.size, 2)
                self.assertEqual(pool._pool._semaphore.counter, 2)

                pool.shrink()
                self.assertEqual(pool._pool.size, 1)
                self.assertEqual(pool._pool._semaphore.counter, 1)

                # With a three-element list as the pool, num_processes
                # is expected to be 3.
                pool._pool = [4, 5, 6]
                self.assertEqual(pool.num_processes, 3)


class test_Timer(AppCase):
    """Checks that stopping a ``Timer`` clears its schedule."""

    def test_timer(self):
        with mock_module(*gevent_modules):
            timer = Timer()
            timer.ensure_started()

            # Swap in a Mock schedule so the clear() call can be observed.
            timer.schedule = Mock()
            timer.start()
            timer.stop()

            timer.schedule.clear.assert_called_with()


class test_apply_timeout(AppCase):
    """Exercises ``apply_timeout`` with a stand-in ``Timeout`` class."""

    def test_apply_timeout(self):

        class Timeout(Exception):
            # Records the most recent timeout value on the class itself
            # so the test can inspect it after the call.
            value = None

            def __init__(self, value):
                type(self).value = value

            def __enter__(self):
                return self

            def __exit__(self, *exc_info):
                pass

        timeout_callback = Mock(name='timeout_callback')
        apply_target = Mock(name='apply_target')

        # Keyword arguments shared by both apply_timeout() calls.
        shared = dict(
            timeout=10,
            timeout_callback=timeout_callback,
            apply_target=apply_target,
            Timeout=Timeout,
        )

        # First call: the target runs normally.
        apply_timeout(Mock(), callback=Mock(name='callback'), **shared)
        self.assertEqual(Timeout.value, 10)
        self.assertTrue(apply_target.called)

        # Second call: the target raises Timeout, so the timeout callback
        # is expected to be called with (False, 10).
        apply_target.side_effect = Timeout(10)
        apply_timeout(Mock(), callback=Mock(), **shared)
        timeout_callback.assert_called_with(False, 10)