Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

hemamaps / celery   python

Repository URL to install this package:

Version: 3.1.23 

/ tests / bin / test_beat.py

from __future__ import absolute_import

import logging
import sys

from collections import defaultdict

from celery import beat
from celery import platforms
from celery.bin import beat as beat_bin
from celery.apps import beat as beatapp

from celery.tests.case import AppCase, Mock, patch, restore_logging
from kombu.tests.case import redirect_stdouts


class MockedShelveModule(object):
    """In-memory stand-in exposing a ``shelve``-like ``open`` call.

    Every filename maps to a plain dict that is created on first access
    and kept in a class-level registry shared by all instances.
    """

    # filename -> fake shelf; unknown names get a fresh empty dict.
    shelves = defaultdict(dict)

    def open(self, filename, *args, **kwargs):
        """Return the dict for *filename*; extra arguments are ignored."""
        shelf = self.shelves[filename]
        return shelf


mocked_shelve = MockedShelveModule()


class MockService(beat.Service):
    """``beat.Service`` stub that records calls instead of doing work.

    ``start`` and ``sync`` only flip class-level flags so tests can check
    that they were invoked; ``persistence`` points at the in-memory
    ``mocked_shelve`` object.
    """

    persistence = mocked_shelve

    # Call markers; tests reset these on the class before asserting.
    started = False
    in_sync = False

    def start(self):
        # Stored on the class so the flag can be read without holding a
        # reference to the instance that was started.
        type(self).started = True

    def sync(self):
        type(self).in_sync = True


class MockBeat(beatapp.Beat):
    """``beatapp.Beat`` subclass whose ``run`` only records that it ran."""

    # Set to True by run(); always stored on MockBeat itself.
    running = False

    def run(self):
        """Set ``MockBeat.running`` to True and do nothing else."""
        MockBeat.running = True


class MockBeat2(beatapp.Beat):
    """``beatapp.Beat`` using ``MockService`` and a no-op sync handler."""

    Service = MockService

    def install_sync_handler(self, b):
        """Do nothing with *b*, so no handler is installed."""
        return None


class MockBeat3(beatapp.Beat):
    """``beatapp.Beat`` using ``MockService`` whose sync-handler setup fails.

    Used by the error-logging test, which expects a critical log record
    after ``start_scheduler`` is called on this class.
    """

    Service = MockService

    def install_sync_handler(self, b):
        """Always raise ``TypeError`` to simulate a failing install step."""
        raise TypeError('xxx')


class test_Beat(AppCase):
    """Tests for ``beatapp.Beat`` option handling and scheduler start-up."""

    def test_loglevel_string(self):
        # A level name and the numeric constant must both end up as the
        # numeric logging level.
        for level in ('DEBUG', logging.DEBUG):
            instance = beatapp.Beat(app=self.app, loglevel=level,
                                    redirect_stdouts=False)
            self.assertEqual(instance.loglevel, logging.DEBUG)

    def test_colorize(self):
        # no_color=True must be passed on to log setup as colorize=False.
        self.app.log.setup = Mock()
        instance = beatapp.Beat(app=self.app, no_color=True,
                                redirect_stdouts=False)
        instance.setup_logging()
        log_setup = self.app.log.setup
        self.assertTrue(log_setup.called)
        self.assertEqual(log_setup.call_args[1]['colorize'], False)

    def test_init_loader(self):
        # Smoke test: only checks that the call does not raise.
        beatapp.Beat(app=self.app, redirect_stdouts=False).init_loader()

    def test_process_title(self):
        # Smoke test: only checks that the call does not raise.
        beatapp.Beat(app=self.app, redirect_stdouts=False).set_process_title()

    def test_run(self):
        instance = MockBeat2(app=self.app, redirect_stdouts=False)
        MockService.started = False
        instance.run()
        self.assertTrue(MockService.started)

    def psig(self, fun, *args, **kwargs):
        """Call *fun* while capturing signal handler registrations.

        ``platforms.signals`` is temporarily replaced by an object whose
        item assignment stores the handler in a dict; the original object
        is put back even when *fun* raises.  Returns that dict, keyed by
        whatever signal identifier was used in the assignment.
        """
        captured = {}

        class Signals(platforms.Signals):

            def __setitem__(self, sig, handler):
                captured[sig] = handler

        original = platforms.signals
        platforms.signals = Signals()
        try:
            fun(*args, **kwargs)
        finally:
            platforms.signals = original
        return captured

    def test_install_sync_handler(self):
        instance = beatapp.Beat(app=self.app, redirect_stdouts=False)
        clock = MockService(app=self.app)
        MockService.in_sync = False
        installed = self.psig(instance.install_sync_handler, clock)
        sigint_handler = installed['SIGINT']
        # The handler is expected to sync the service and then exit.
        with self.assertRaises(SystemExit):
            sigint_handler('SIGINT', object())
        self.assertTrue(MockService.in_sync)
        MockService.in_sync = False

    def test_setup_logging(self):
        with restore_logging():
            try:
                # py3k
                del sys.stdout.logger
            except AttributeError:
                pass
            instance = beatapp.Beat(app=self.app, redirect_stdouts=False)
            instance.redirect_stdouts = False
            instance.app.log.already_setup = False
            instance.setup_logging()
            # With redirection off, stdout must not have gained a logger.
            with self.assertRaises(AttributeError):
                sys.stdout.logger

    @redirect_stdouts
    @patch('celery.apps.beat.logger')
    def test_logs_errors(self, logger, stdout, stderr):
        # MockBeat3 raises while installing the sync handler; the failure
        # must be reported through logger.critical.
        with restore_logging():
            failing = MockBeat3(
                app=self.app, redirect_stdouts=False, socket_timeout=None,
            )
            failing.start_scheduler()
            self.assertTrue(logger.critical.called)

    @redirect_stdouts
    @patch('celery.platforms.create_pidlock')
    def test_use_pidfile(self, create_pidlock, stdout, stderr):
        # Passing a pidfile must result in a pid lock being requested.
        instance = MockBeat2(app=self.app, pidfile='pidfilelockfilepid',
                             socket_timeout=None, redirect_stdouts=False)
        instance.start_scheduler()
        self.assertTrue(create_pidlock.called)


class MockDaemonContext(object):
    """Fake daemon context that records open/close on the class.

    Usable both through explicit ``open()``/``close()`` calls and as a
    context manager; constructor arguments are accepted and discarded.
    """

    opened = closed = False

    def __init__(self, *args, **kwargs):
        """Accept and ignore any arguments."""

    def open(self):
        """Set the class-level ``opened`` flag and return ``self``."""
        type(self).opened = True
        return self

    def close(self, *args):
        """Set the class-level ``closed`` flag; arguments are ignored."""
        type(self).closed = True

    # The context-manager protocol reuses the very same function objects.
    __enter__ = open
    __exit__ = close


class test_div(AppCase):
    """Tests for the ``beat_bin`` command entry points.

    ``beatapp.Beat`` and ``beat_bin.detached`` are swapped for mocks in
    ``setup`` and both are put back in ``teardown``.
    """

    def setup(self):
        self.prev, beatapp.Beat = beatapp.Beat, MockBeat
        self.ctx, beat_bin.detached = (
            beat_bin.detached, MockDaemonContext,
        )

    def teardown(self):
        beatapp.Beat = self.prev
        # Also undo the ``detached`` patch; leaving the mock in place
        # would leak into every test that runs after this class.
        beat_bin.detached = self.ctx

    def test_main(self):
        # Keep the real argv so the fake command line does not leak into
        # later tests.
        prev_argv = sys.argv
        sys.argv = [sys.argv[0], '-s', 'foo']
        try:
            beat_bin.main(app=self.app)
            self.assertTrue(MockBeat.running)
        finally:
            MockBeat.running = False
            sys.argv = prev_argv

    def test_detach(self):
        # The flags live on the class, so clear them first; otherwise a
        # previous use of the mock could satisfy the assertions below.
        MockDaemonContext.opened = MockDaemonContext.closed = False
        cmd = beat_bin.beat()
        cmd.app = self.app
        cmd.run(detach=True)
        self.assertTrue(MockDaemonContext.opened)
        self.assertTrue(MockDaemonContext.closed)

    def test_parse_options(self):
        cmd = beat_bin.beat()
        cmd.app = self.app
        options, args = cmd.parse_options('celery beat', ['-s', 'foo'])
        self.assertEqual(options.schedule, 'foo')