Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

hemamaps / celery   python

Repository URL to install this package:

Version: 3.1.23 

/ tests / worker / test_components.py

from __future__ import absolute_import

# some of these are tested in test_worker, so I've only written tests
# here to complete coverage.  Should move everyting to this module at some
# point [-ask]

from celery.worker.components import (
    Queues,
    Pool,
)

from celery.tests.case import AppCase, Mock


class test_Queues(AppCase):

    def test_create_when_eventloop(self):
        w = Mock()
        w.use_eventloop = w.pool_putlocks = w.pool_cls.uses_semaphore = True
        q = Queues(w)
        q.create(w)
        self.assertIs(w.process_task, w._process_task_sem)


class test_Pool(AppCase):

    def test_close_terminate(self):
        w = Mock()
        comp = Pool(w)
        pool = w.pool = Mock()
        comp.close(w)
        pool.close.assert_called_with()
        comp.terminate(w)
        pool.terminate.assert_called_with()

        w.pool = None
        comp.close(w)
        comp.terminate(w)