from __future__ import absolute_import
import os
from itertools import count
from celery.concurrency.base import apply_target, BasePool
from celery.tests.case import AppCase, Mock
class test_BasePool(AppCase):
    """Tests for ``apply_target`` and the default ``BasePool`` interface."""

    def test_apply_target(self):
        # Every stub records (call order, positional args) under its label,
        # so the assertions below can check both ordering and arguments.
        calls = {}
        sequence = count(0)

        def recorder(label, result=None):
            def stub(*received):
                calls[label] = (next(sequence), received)
                return result
            return stub

        apply_target(
            recorder('target', 42),
            args=(8, 16),
            callback=recorder('callback'),
            accept_callback=recorder('accept_callback')
        )
        # The target runs second with the given args; the result callback
        # runs third and receives the target's return value.
        self.assertDictContainsSubset(
            {'target': (1, (8, 16)), 'callback': (2, (42, ))},
            calls,
        )
        # The accept callback runs first and is handed the current pid
        # followed by a truthy second argument.
        accept_order, accept_args = calls['accept_callback']
        self.assertEqual(0, accept_order)
        self.assertEqual(accept_args[0], os.getpid())
        self.assertTrue(accept_args[1])

        # Without an accept callback only the target and the result
        # callback are recorded; the shared counter keeps running (3, 4).
        calls.clear()
        apply_target(
            recorder('target', 42),
            args=(8, 16),
            callback=recorder('callback'),
            accept_callback=None
        )
        self.assertDictEqual(
            calls,
            {'target': (3, (8, 16)), 'callback': (4, (42, ))},
        )

    def test_does_not_debug(self):
        # apply_async must not raise when the debug flag is switched off.
        pool = BasePool(10)
        pool._does_debug = False
        pool.apply_async(object)

    def test_num_processes(self):
        pool = BasePool(7)
        self.assertEqual(pool.num_processes, 7)

    def test_interface_on_start(self):
        pool = BasePool(10)
        pool.on_start()

    def test_interface_on_stop(self):
        pool = BasePool(10)
        pool.on_stop()

    def test_interface_on_apply(self):
        pool = BasePool(10)
        pool.on_apply()

    def test_interface_info(self):
        pool = BasePool(10)
        self.assertDictEqual(pool.info, {})

    def test_active(self):
        # A fresh pool is inactive until its state is set to RUN.
        pool = BasePool(10)
        self.assertFalse(pool.active)
        pool._state = pool.RUN
        self.assertTrue(pool.active)

    def test_restart(self):
        pool = BasePool(10)
        with self.assertRaises(NotImplementedError):
            pool.restart()

    def test_interface_on_terminate(self):
        pool = BasePool(10)
        pool.on_terminate()

    def test_interface_terminate_job(self):
        pool = BasePool(10)
        with self.assertRaises(NotImplementedError):
            pool.terminate_job(101)

    def test_interface_did_start_ok(self):
        pool = BasePool(10)
        self.assertTrue(pool.did_start_ok())

    def test_interface_register_with_event_loop(self):
        pool = BasePool(10)
        self.assertIsNone(pool.register_with_event_loop(Mock()))

    def test_interface_on_soft_timeout(self):
        pool = BasePool(10)
        self.assertIsNone(pool.on_soft_timeout(Mock()))

    def test_interface_on_hard_timeout(self):
        pool = BasePool(10)
        self.assertIsNone(pool.on_hard_timeout(Mock()))

    def test_interface_close(self):
        # close() must move the pool to CLOSE and invoke the on_close hook
        # with no arguments.
        pool = BasePool(10)
        close_hook = pool.on_close = Mock()
        pool.close()
        self.assertEqual(pool._state, pool.CLOSE)
        close_hook.assert_called_with()

    def test_interface_no_close(self):
        pool = BasePool(10)
        self.assertIsNone(pool.on_close())