Why Gemfury? Push, build, and install: RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Bower components, Debian packages, RPM packages, NuGet packages

hemamaps / celery   python

Repository URL to install this package:

Version: 3.1.23 

/ tests / utils / test_timer2.py

from __future__ import absolute_import

import sys
import time

import celery.utils.timer2 as timer2

from celery.tests.case import Case, Mock, patch
from kombu.tests.case import redirect_stdouts


class test_Entry(Case):
    """Tests for ``timer2.Entry``: calling, cancelling and repr."""

    def test_call(self):
        # Calling the entry must forward both the positional and the
        # keyword arguments it was constructed with.
        scratch = [None]

        def timed(x, y, moo='foo'):
            scratch[0] = (x, y, moo)

        tref = timer2.Entry(timed, (4, 4), {'moo': 'baz'})
        tref()

        self.assertTupleEqual(scratch[0], (4, 4, 'baz'))

    def test_cancel(self):
        tref = timer2.Entry(lambda x: x, (1, ), {})
        tref.cancel()
        self.assertTrue(tref.cancelled)

    def test_repr(self):
        # Construct the entry in the same (fun, args, kwargs) form as the
        # other tests.  A misplaced parenthesis previously produced
        # ``Entry(lambda x: x(1, ), {})``, which put the empty kwargs dict
        # in the ``args`` position.
        tref = timer2.Entry(lambda x: x, (1, ), {})
        self.assertTrue(repr(tref))


class test_Schedule(Case):
    """Tests for ``timer2.Schedule``: Timer-style interface and errors."""

    def test_supports_Timer_interface(self):
        schedule = timer2.Schedule()
        schedule.stop()

        # cancel() on the schedule is expected to delegate to the entry.
        entry = Mock()
        schedule.cancel(entry)
        entry.cancel.assert_called_with()

        self.assertIs(schedule.schedule, schedule)

    def test_handle_error(self):
        from datetime import datetime

        # Holds whatever the on_error callback was last handed.
        captured = {'exc': None}

        def on_error(exc_info):
            captured['exc'] = exc_info

        def make_entry():
            return timer2.Entry(lambda: None, (), {})

        schedule = timer2.Schedule(on_error=on_error)

        with patch('kombu.async.timer.to_timestamp') as to_timestamp:
            to_timestamp.side_effect = OverflowError()

            # With an on_error handler installed these calls must not raise.
            schedule.enter_at(make_entry(), eta=datetime.now())
            schedule.enter_at(make_entry(), eta=None)

            # Without a handler the conversion error must propagate.
            schedule.on_error = None
            with self.assertRaises(OverflowError):
                schedule.enter_at(make_entry(), eta=datetime.now())

        self.assertIsInstance(captured['exc'], OverflowError)


class test_Timer(Case):
    """Tests for the ``timer2.Timer`` scheduling helpers and error paths."""

    def test_enter_after(self):
        # Schedule a callback on a real timer and poll until it has run.
        t = timer2.Timer()
        try:
            done = [False]

            def set_done():
                done[0] = True

            t.call_after(0.3, set_done)
            # Count polls with an integer (20 polls * 0.1s = 2.0s) rather
            # than accumulating 0.1 in a float, which drifts.
            polls = 0
            while not done[0]:
                if polls >= 20:
                    raise Exception('test timed out')
                time.sleep(0.1)
                polls += 1
        finally:
            t.stop()

    def test_exit_after(self):
        # exit_after() should delegate to call_after() with sys.exit.
        t = timer2.Timer()
        t.call_after = Mock()
        t.exit_after(0.3, priority=10)
        t.call_after.assert_called_with(0.3, sys.exit, 10)

    def test_ensure_started_not_started(self):
        # With ``running`` already set, ensure_started() must not call
        # start() again.
        t = timer2.Timer()
        t.running = True
        t.start = Mock()
        t.ensure_started()
        self.assertFalse(t.start.called)

    def test_call_repeatedly(self):
        t = timer2.Timer()
        try:
            t.schedule.enter_after = Mock()

            myfun = Mock()
            myfun.__name__ = 'myfun'
            t.call_repeatedly(0.03, myfun)

            # The initial call schedules exactly one entry.
            self.assertEqual(t.schedule.enter_after.call_count, 1)
            args1, _ = t.schedule.enter_after.call_args_list[0]
            sec1, tref1, _ = args1
            self.assertEqual(sec1, 0.03)
            tref1()

            # Running the entry schedules the next repetition.
            self.assertEqual(t.schedule.enter_after.call_count, 2)
            args2, _ = t.schedule.enter_after.call_args_list[1]
            sec2, tref2, _ = args2
            self.assertEqual(sec2, 0.03)
            tref2.cancelled = True
            tref2()

            # A cancelled entry must not schedule another repetition.
            self.assertEqual(t.schedule.enter_after.call_count, 2)
        finally:
            t.stop()

    @patch('kombu.async.timer.logger')
    def test_apply_entry_error_handled(self, logger):
        # Without an on_error callback the failure is expected to be logged.
        t = timer2.Timer()
        t.schedule.on_error = None

        fun = Mock()
        fun.side_effect = ValueError()

        t.schedule.apply_entry(fun)
        self.assertTrue(logger.error.called)

    @redirect_stdouts
    def test_apply_entry_error_not_handled(self, stdout, stderr):
        # With an on_error callback installed nothing may reach stderr.
        t = timer2.Timer()
        t.schedule.on_error = Mock()

        fun = Mock()
        fun.side_effect = ValueError()
        t.schedule.apply_entry(fun)
        fun.assert_called_with()
        self.assertFalse(stderr.getvalue())

    @patch('os._exit')
    def test_thread_crash(self, _exit):
        # An unexpected error inside run() should end in os._exit(1);
        # os._exit is patched so the test process survives.
        t = timer2.Timer()
        t._next_entry = Mock()
        t._next_entry.side_effect = OSError(131)
        t.run()
        _exit.assert_called_with(1)

    def test_gc_race_lost(self):
        # run() must not propagate a TypeError raised by _is_stopped.set().
        t = timer2.Timer()
        t._is_stopped.set = Mock()
        t._is_stopped.set.side_effect = TypeError()

        t._is_shutdown.set()
        t.run()
        t._is_stopped.set.assert_called_with()

    def test_to_timestamp(self):
        # A plain number must come back as the very same object.  Bind it
        # to a name first so the identity check does not depend on the
        # interpreter sharing two equal float literals.
        timestamp = 3.13
        self.assertIs(timer2.to_timestamp(timestamp), timestamp)

    def test_test_enter(self):
        t = timer2.Timer()
        t._do_enter = Mock()
        e = Mock()
        t.enter(e, 13, 0)
        t._do_enter.assert_called_with('enter_at', e, 13, priority=0)

    def test_test_enter_after(self):
        t = timer2.Timer()
        t._do_enter = Mock()
        t.enter_after()
        t._do_enter.assert_called_with('enter_after')

    def test_cancel(self):
        t = timer2.Timer()
        tref = Mock()
        t.cancel(tref)
        tref.cancel.assert_called_with()