Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Bower components, Debian packages, RPM packages, and NuGet packages.

hemamaps / celery   python

Repository URL to install this package:

Version: 3.1.23 

/ tests / concurrency / test_concurrency.py

from __future__ import absolute_import

import os

from itertools import count

from celery.concurrency.base import apply_target, BasePool
from celery.tests.case import AppCase, Mock


class test_BasePool(AppCase):
    # Covers ``apply_target`` plus the hooks and properties that a bare
    # ``BasePool`` instance exposes.  Comments are used instead of
    # docstrings so test runners keep reporting the method names.

    def test_apply_target(self):
        # ``calls`` maps a callback name to ``(sequence number, args)``;
        # the shared counter makes the firing order observable.
        calls = {}
        order = count(0)

        def gen_callback(name, retval=None):

            def callback(*args):
                calls[name] = (next(order), args)
                return retval

            return callback

        apply_target(gen_callback('target', 42),
                     args=(8, 16),
                     callback=gen_callback('callback'),
                     accept_callback=gen_callback('accept_callback'))

        # Explicit per-key checks replace ``assertDictContainsSubset``,
        # which is deprecated since Python 3.2 and removed in 3.12.
        # ``assertIn`` keeps a missing key an assertion failure rather
        # than a KeyError.
        expected = {'target': (1, (8, 16)), 'callback': (2, (42, ))}
        for name, record in expected.items():
            self.assertIn(name, calls)
            self.assertEqual(calls[name], record)

        # The accept callback must have fired first (sequence number 0),
        # with this process' pid followed by a truthy second argument.
        accepted = calls['accept_callback']
        self.assertEqual(0, accepted[0])
        self.assertEqual(accepted[1][0], os.getpid())
        self.assertTrue(accepted[1][1])

        # No accept callback
        calls.clear()
        apply_target(gen_callback('target', 42),
                     args=(8, 16),
                     callback=gen_callback('callback'),
                     accept_callback=None)
        # The counter is shared with the first run, so numbering resumes
        # at 3 and only the target and the callback are recorded.
        self.assertDictEqual(calls,
                             {'target': (3, (8, 16)),
                              'callback': (4, (42, ))})

    def test_does_not_debug(self):
        # Smoke test: ``apply_async`` must not raise when the debug flag
        # is switched off.
        pool = BasePool(10)
        pool._does_debug = False
        pool.apply_async(object)

    def test_num_processes(self):
        # The constructor argument is reported back as ``num_processes``.
        self.assertEqual(BasePool(7).num_processes, 7)

    def test_interface_on_start(self):
        # Smoke test: the base hook is callable without arguments.
        BasePool(10).on_start()

    def test_interface_on_stop(self):
        # Smoke test: the base hook is callable without arguments.
        BasePool(10).on_stop()

    def test_interface_on_apply(self):
        # Smoke test: the base hook is callable without arguments.
        BasePool(10).on_apply()

    def test_interface_info(self):
        # The base pool reports an empty info mapping.
        self.assertDictEqual(BasePool(10).info, {})

    def test_active(self):
        # ``active`` is falsy on a fresh pool and truthy once the state
        # is set to ``RUN``.
        pool = BasePool(10)
        self.assertFalse(pool.active)
        pool._state = pool.RUN
        self.assertTrue(pool.active)

    def test_restart(self):
        # The base class does not implement ``restart``.
        pool = BasePool(10)
        with self.assertRaises(NotImplementedError):
            pool.restart()

    def test_interface_on_terminate(self):
        # Smoke test: the base hook is callable without arguments.
        pool = BasePool(10)
        pool.on_terminate()

    def test_interface_terminate_job(self):
        # The base class does not implement ``terminate_job``.
        with self.assertRaises(NotImplementedError):
            BasePool(10).terminate_job(101)

    def test_interface_did_start_ok(self):
        self.assertTrue(BasePool(10).did_start_ok())

    def test_interface_register_with_event_loop(self):
        # The base implementation returns ``None`` for any loop object.
        self.assertIsNone(
            BasePool(10).register_with_event_loop(Mock()),
        )

    def test_interface_on_soft_timeout(self):
        self.assertIsNone(BasePool(10).on_soft_timeout(Mock()))

    def test_interface_on_hard_timeout(self):
        self.assertIsNone(BasePool(10).on_hard_timeout(Mock()))

    def test_interface_close(self):
        # ``close`` must move the pool to the ``CLOSE`` state and invoke
        # the ``on_close`` hook with no arguments.
        pool = BasePool(10)
        pool.on_close = Mock()
        pool.close()
        self.assertEqual(pool._state, pool.CLOSE)
        pool.on_close.assert_called_with()

    def test_interface_no_close(self):
        # The un-overridden ``on_close`` hook returns ``None``.
        self.assertIsNone(BasePool(10).on_close())