# Why Gemfury? Push, build, and install: RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, NuGet packages
#
# Repository URL to install this package:
#
# Details
# idna / lib / python2.7 / site-packages / oslo_service / tests / test_loopingcall.py
# Size: Mime:
# Copyright 2012 Red Hat, Inc.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import eventlet
from eventlet.green import threading as greenthreading
import mock
from oslotest import base as test_base

import oslo_service
from oslo_service import loopingcall

# Fetch the 'threading' and 'time' modules through eventlet's patcher rather
# than plain imports; per the eventlet documentation, patcher.original()
# returns the standard-library module without eventlet's green patches.
# 'threading' is used below to run a hub-override check in a separate thread,
# and 'time' to probe for time.monotonic.
threading = eventlet.patcher.original('threading')
time = eventlet.patcher.original('time')


class LoopingCallTestCase(test_base.BaseTestCase):
    """Tests for the fixed-interval looping call classes."""

    def setUp(self):
        """Reset the run counter consumed by the helper tasks."""
        super(LoopingCallTestCase, self).setUp()
        self.num_runs = 0

    def test_return_true(self):
        """wait() is truthy when the task ends with LoopingCallDone(True)."""
        def _raise_it():
            raise loopingcall.LoopingCallDone(True)

        looper = loopingcall.FixedIntervalLoopingCall(_raise_it)
        outcome = looper.start(interval=0.5).wait()
        self.assertTrue(outcome)

    def test_monotonic_timer(self):
        """Inside the task, the hub clock is oslo_service._monotonic."""
        def _raise_it():
            hub_clock = eventlet.hubs.get_hub().clock
            raise loopingcall.LoopingCallDone(
                hub_clock == oslo_service._monotonic)

        looper = loopingcall.FixedIntervalLoopingCall(_raise_it)
        outcome = looper.start(interval=0.5).wait()
        self.assertTrue(outcome)

    def test_eventlet_clock(self):
        """The main thread's hub uses oslo_service._monotonic by default."""
        # Make sure that by default the oslo_service.service_hub() kicks in;
        # this check runs in the main thread.
        current_clock = eventlet.hubs.get_hub().clock
        self.assertEqual(oslo_service._monotonic, current_clock)

    def test_eventlet_use_hub_override(self):
        """Run the hub-override check in a separate thread."""
        outcome = {}

        def task():
            # Record either the raised exception or the literal 'ok' so the
            # calling thread can assert on what happened here.
            try:
                self._test_eventlet_use_hub_override()
            except Exception as exc:
                outcome['result'] = exc
                return
            outcome['result'] = 'ok'

        # Override the hub in a new thread so that the hub of the main
        # thread is not modified.
        worker = threading.Thread(target=task)
        worker.start()
        worker.join()
        self.assertEqual('ok', outcome['result'])

    def _test_eventlet_use_hub_override(self):
        """Check the default hub clock, then try replacing the hub.

        Called from the worker thread of test_eventlet_use_hub_override.
        """
        # Make sure that by default the oslo_service.service_hub() kicks in.
        default_clock = eventlet.hubs.get_hub().clock
        self.assertEqual(oslo_service._monotonic, default_clock)

        # eventlet will use time.monotonic() by default, the same clock as
        # oslo.service_hub():
        # https://github.com/eventlet/eventlet/pull/303
        if hasattr(time, 'monotonic'):
            return

        # If anyone wants to override it
        try:
            eventlet.hubs.use_hub('poll')
        except Exception:
            eventlet.hubs.use_hub('selects')

        # then we get a new clock and the override works fine too!
        replaced_clock = eventlet.hubs.get_hub().clock
        self.assertNotEqual(default_clock, replaced_clock)

    def test_return_false(self):
        """wait() is falsy when the task ends with LoopingCallDone(False)."""
        def _raise_it():
            raise loopingcall.LoopingCallDone(False)

        looper = loopingcall.FixedIntervalLoopingCall(_raise_it)
        outcome = looper.start(interval=0.5).wait()
        self.assertFalse(outcome)

    def test_terminate_on_exception(self):
        """A RuntimeError raised by the task surfaces from wait()."""
        def _raise_it():
            raise RuntimeError()

        looper = loopingcall.FixedIntervalLoopingCall(_raise_it)
        # start() is evaluated outside assertRaises; only wait() is expected
        # to raise.
        pending = looper.start(interval=0.5)
        self.assertRaises(RuntimeError, pending.wait)

    def _raise_and_then_done(self):
        """Raise RuntimeError num_runs times, then LoopingCallDone(False)."""
        if self.num_runs == 0:
            raise loopingcall.LoopingCallDone(False)
        self.num_runs -= 1
        raise RuntimeError()

    def test_do_not_stop_on_exception(self):
        """With stop_on_exception=False the loop runs on to LoopingCallDone."""
        self.num_runs = 2

        looper = loopingcall.FixedIntervalLoopingCall(
            self._raise_and_then_done)
        pending = looper.start(interval=0.5, stop_on_exception=False)
        self.assertFalse(pending.wait())

    def _wait_for_zero(self):
        """Called at an interval until num_runs == 0."""
        if self.num_runs == 0:
            raise loopingcall.LoopingCallDone(False)
        self.num_runs -= 1

    def test_no_double_start(self):
        """Starting an already running timer raises RuntimeError."""
        wait_ev = greenthreading.Event()

        def _run_forever_until_set():
            if wait_ev.is_set():
                raise loopingcall.LoopingCallDone(True)

        looper = loopingcall.FixedIntervalLoopingCall(_run_forever_until_set)
        looper.start(interval=0.01)

        self.assertRaises(RuntimeError, looper.start, interval=0.01)

        # Let the task finish so the loop can be waited on.
        wait_ev.set()
        looper.wait()

    def test_repeat(self):
        """The task is re-invoked until it raises LoopingCallDone."""
        self.num_runs = 2

        looper = loopingcall.FixedIntervalLoopingCall(self._wait_for_zero)
        outcome = looper.start(interval=0.5).wait()
        self.assertFalse(outcome)

    def assertAlmostEqual(self, expected, actual, precision=7, message=None):
        """Assert actual - expected rounds to zero at `precision` digits."""
        rounded_delta = round(actual - expected, precision)
        self.assertEqual(0, rounded_delta, message)

    @mock.patch('eventlet.greenthread.sleep')
    @mock.patch('oslo_utils.timeutils.now')
    def test_interval_adjustment(self, time_mock, sleep_mock):
        """Ensure the interval is adjusted to account for task duration."""
        self.num_runs = 3

        now = 1234567890
        second = 1
        smidgen = 0.01

        # Alternating "restart" and "end" readings handed out by the mocked
        # clock, in the order they are requested.
        clock_readings = [
            now,  # restart
            now + second - smidgen,  # end
            now,  # restart
            now + second + second,  # end
            now,  # restart
            now + second + smidgen,  # end
            now,  # restart
        ]
        time_mock.side_effect = clock_readings

        looper = loopingcall.FixedIntervalLoopingCall(self._wait_for_zero)
        looper.start(interval=1.01).wait()

        expected_calls = [0.02, 0.00, 0.00]
        for index, recorded in enumerate(sleep_mock.call_args_list):
            expected = expected_calls[index]
            # Each recorded call unpacks to (args, kwargs); the sleep
            # duration is the first positional argument.
            positional, _keywords = recorded
            actual = positional[0]
            message = ('Call #%d, expected: %s, actual: %s' %
                       (index, expected, actual))
            self.assertAlmostEqual(expected, actual, message=message)

    def test_looping_call_timed_out(self):
        """A task that never finishes hits LoopingCallTimeOut."""

        def _fake_task():
            pass

        looper = loopingcall.FixedIntervalWithTimeoutLoopingCall(_fake_task)
        pending = looper.start(interval=0.1, timeout=0.3)
        self.assertRaises(loopingcall.LoopingCallTimeOut, pending.wait)


class DynamicLoopingCallTestCase(test_base.BaseTestCase):
    """Tests for DynamicLoopingCall, whose task returns the next delay."""

    def setUp(self):
        """Reset the run counter consumed by the helper tasks."""
        super(DynamicLoopingCallTestCase, self).setUp()
        self.num_runs = 0

    def test_return_true(self):
        """wait() is truthy when the task ends with LoopingCallDone(True)."""
        def _raise_it():
            raise loopingcall.LoopingCallDone(True)

        timer = loopingcall.DynamicLoopingCall(_raise_it)
        self.assertTrue(timer.start().wait())

    def test_monotonic_timer(self):
        """Inside the task, the hub clock is oslo_service._monotonic."""
        def _raise_it():
            clock = eventlet.hubs.get_hub().clock
            ok = (clock == oslo_service._monotonic)
            raise loopingcall.LoopingCallDone(ok)

        timer = loopingcall.DynamicLoopingCall(_raise_it)
        self.assertTrue(timer.start().wait())

    def test_no_double_start(self):
        """Starting an already running timer raises RuntimeError."""
        wait_ev = greenthreading.Event()

        def _run_forever_until_set():
            if wait_ev.is_set():
                raise loopingcall.LoopingCallDone(True)
            else:
                return 0.01

        timer = loopingcall.DynamicLoopingCall(_run_forever_until_set)
        timer.start()

        self.assertRaises(RuntimeError, timer.start)

        # Let the task finish so the loop can be waited on.
        wait_ev.set()
        timer.wait()

    def test_return_false(self):
        """wait() is falsy when the task ends with LoopingCallDone(False)."""
        def _raise_it():
            raise loopingcall.LoopingCallDone(False)

        timer = loopingcall.DynamicLoopingCall(_raise_it)
        self.assertFalse(timer.start().wait())

    def test_terminate_on_exception(self):
        """A RuntimeError raised by the task surfaces from wait()."""
        def _raise_it():
            raise RuntimeError()

        timer = loopingcall.DynamicLoopingCall(_raise_it)
        self.assertRaises(RuntimeError, timer.start().wait)

    def _raise_and_then_done(self):
        """Raise RuntimeError num_runs times, then LoopingCallDone(False)."""
        if self.num_runs == 0:
            raise loopingcall.LoopingCallDone(False)
        else:
            self.num_runs = self.num_runs - 1
            raise RuntimeError()

    def test_do_not_stop_on_exception(self):
        """With stop_on_exception=False the loop runs on to LoopingCallDone."""
        self.num_runs = 2

        timer = loopingcall.DynamicLoopingCall(self._raise_and_then_done)
        res = timer.start(stop_on_exception=False).wait()
        # The helper finishes with LoopingCallDone(False); check the result
        # the same way the fixed-interval variant of this test does instead
        # of discarding it.
        self.assertFalse(res)

    def _wait_for_zero(self):
        """Called at an interval until num_runs == 0."""
        if self.num_runs == 0:
            raise loopingcall.LoopingCallDone(False)
        else:
            self.num_runs = self.num_runs - 1
            sleep_for = self.num_runs * 10 + 1  # dynamic duration
            return sleep_for

    def test_repeat(self):
        """The task is re-invoked until it raises LoopingCallDone."""
        self.num_runs = 2

        timer = loopingcall.DynamicLoopingCall(self._wait_for_zero)
        self.assertFalse(timer.start().wait())

    def _timeout_task_without_any_return(self):
        """Task that returns None, i.e. suggests no delay at all."""
        pass

    def test_timeout_task_without_return_and_max_periodic(self):
        """No returned delay and no periodic_interval_max: RuntimeError."""
        timer = loopingcall.DynamicLoopingCall(
            self._timeout_task_without_any_return
        )
        self.assertRaises(RuntimeError, timer.start().wait)

    def _timeout_task_without_return_but_with_done(self):
        """Return None num_runs times, then raise LoopingCallDone(False)."""
        if self.num_runs == 0:
            raise loopingcall.LoopingCallDone(False)
        else:
            self.num_runs = self.num_runs - 1

    @mock.patch('eventlet.greenthread.sleep')
    def test_timeout_task_without_return(self, sleep_mock):
        """Without a returned delay, periodic_interval_max is slept."""
        self.num_runs = 1
        timer = loopingcall.DynamicLoopingCall(
            self._timeout_task_without_return_but_with_done
        )
        timer.start(periodic_interval_max=5).wait()
        sleep_mock.assert_has_calls([mock.call(5)])

    @mock.patch('eventlet.greenthread.sleep')
    def test_interval_adjustment(self, sleep_mock):
        """Returned delays are capped by periodic_interval_max."""
        self.num_runs = 2

        timer = loopingcall.DynamicLoopingCall(self._wait_for_zero)
        timer.start(periodic_interval_max=5).wait()

        # _wait_for_zero returns 11 and then 1; the expected sleeps show the
        # first value limited to the maximum of 5.
        sleep_mock.assert_has_calls([mock.call(5), mock.call(1)])

    @mock.patch('eventlet.greenthread.sleep')
    def test_initial_delay(self, sleep_mock):
        """initial_delay is slept before the delay returned by the task."""
        self.num_runs = 1

        timer = loopingcall.DynamicLoopingCall(self._wait_for_zero)
        timer.start(initial_delay=3).wait()

        sleep_mock.assert_has_calls([mock.call(3), mock.call(1)])


class TestBackOffLoopingCall(test_base.BaseTestCase):
    """Tests for BackOffLoopingCall with sleep and jitter mocked out."""

    @mock.patch('random.SystemRandom.gauss')
    @mock.patch('eventlet.greenthread.sleep')
    def test_exponential_backoff(self, sleep_mock, random_mock):
        """A task that keeps returning False backs off until it times out."""
        def false():
            return False

        random_mock.return_value = .8

        self.assertRaises(loopingcall.LoopingCallTimeOut,
                          loopingcall.BackOffLoopingCall(false).start()
                          .wait)

        expected_times = [mock.call(1.6),
                          mock.call(2.4000000000000004),
                          mock.call(3.6),
                          mock.call(5.4),
                          mock.call(8.1),
                          mock.call(12.15),
                          mock.call(18.225),
                          mock.call(27.337500000000002),
                          mock.call(41.00625),
                          mock.call(61.509375000000006),
                          mock.call(92.26406250000001)]
        self.assertEqual(expected_times, sleep_mock.call_args_list)

    @mock.patch('random.SystemRandom.gauss')
    @mock.patch('eventlet.greenthread.sleep')
    def test_exponential_backoff_negative_value(self, sleep_mock, random_mock):
        """A negative jitter value yields the same sleeps as a positive one."""
        def false():
            return False

        # random.gauss() can return negative values
        random_mock.return_value = -.8

        self.assertRaises(loopingcall.LoopingCallTimeOut,
                          loopingcall.BackOffLoopingCall(false).start()
                          .wait)

        expected_times = [mock.call(1.6),
                          mock.call(2.4000000000000004),
                          mock.call(3.6),
                          mock.call(5.4),
                          mock.call(8.1),
                          mock.call(12.15),
                          mock.call(18.225),
                          mock.call(27.337500000000002),
                          mock.call(41.00625),
                          mock.call(61.509375000000006),
                          mock.call(92.26406250000001)]
        self.assertEqual(expected_times, sleep_mock.call_args_list)

    @mock.patch('random.SystemRandom.gauss')
    @mock.patch('eventlet.greenthread.sleep')
    def test_no_backoff(self, sleep_mock, random_mock):
        """Successful calls sleep a constant interval; retvalue is returned."""
        random_mock.return_value = 1
        func = mock.Mock()
        # Three successful calls, then the loop is ended with a return value.
        func.side_effect = [True, True, True, loopingcall.LoopingCallDone(
            retvalue='return value')]

        retvalue = loopingcall.BackOffLoopingCall(func).start().wait()

        expected_times = [mock.call(1), mock.call(1), mock.call(1)]
        self.assertEqual(expected_times, sleep_mock.call_args_list)
        # assertTrue(retvalue, 'return value') would treat the string as the
        # failure message and accept any truthy result; compare the value.
        self.assertEqual('return value', retvalue)

    @mock.patch('random.SystemRandom.gauss')
    @mock.patch('eventlet.greenthread.sleep')
    def test_no_sleep(self, sleep_mock, random_mock):
        """A task that finishes on its first call never sleeps."""
        # Any call that executes properly the first time shouldn't sleep
        random_mock.return_value = 1
        func = mock.Mock()
        func.side_effect = loopingcall.LoopingCallDone(retvalue='return value')

        retvalue = loopingcall.BackOffLoopingCall(func).start().wait()
        self.assertFalse(sleep_mock.called)
        # Compare the value itself; as the second argument of assertTrue the
        # string would only be a failure message.
        self.assertEqual('return value', retvalue)

    @mock.patch('random.SystemRandom.gauss')
    @mock.patch('eventlet.greenthread.sleep')
    def test_max_interval(self, sleep_mock, random_mock):
        """Sleeps grow as in the backoff test but are capped at max_interval."""
        def false():
            return False

        random_mock.return_value = .8

        self.assertRaises(loopingcall.LoopingCallTimeOut,
                          loopingcall.BackOffLoopingCall(false).start(
                              max_interval=60)
                          .wait)

        expected_times = [mock.call(1.6),
                          mock.call(2.4000000000000004),
                          mock.call(3.6),
                          mock.call(5.4),
                          mock.call(8.1),
                          mock.call(12.15),
                          mock.call(18.225),
                          mock.call(27.337500000000002),
                          mock.call(41.00625),
                          mock.call(60),
                          mock.call(60),
                          mock.call(60)]
        self.assertEqual(expected_times, sleep_mock.call_args_list)


class AnException(Exception):
    """Exception type the retry tests hand to RetryDecorator and raise."""


class UnknownException(Exception):
    """Exception type raised by a test task without being given to RetryDecorator."""


class RetryDecoratorTest(test_base.BaseTestCase):
    """Tests for retry decorator class."""

    def test_retry(self):
        """A function that succeeds at once is returned without retries."""
        result = "RESULT"

        # Used as a decorator ...
        @loopingcall.RetryDecorator()
        def func(*args, **kwargs):
            return result

        self.assertEqual(result, func())

        # ... and applied explicitly, so the decorator's state can be checked.
        def func2(*args, **kwargs):
            return result

        retry = loopingcall.RetryDecorator()
        self.assertEqual(result, retry(func2)())
        # assertEqual reports both values on failure, unlike
        # assertTrue(a == b).
        self.assertEqual(0, retry._retry_count)

    def test_retry_with_expected_exceptions(self):
        """Listed exceptions are retried until the call succeeds."""
        result = "RESULT"
        # Two failures followed by a success, consumed one per call.
        responses = [AnException(None),
                     AnException(None),
                     result]

        def func(*args, **kwargs):
            response = responses.pop(0)
            if isinstance(response, Exception):
                raise response
            return response

        sleep_time_incr = 0.01
        retry_count = 2
        retry = loopingcall.RetryDecorator(10, sleep_time_incr, 10,
                                           (AnException,))
        self.assertEqual(result, retry(func)())
        self.assertEqual(retry_count, retry._retry_count)
        self.assertEqual(retry_count * sleep_time_incr, retry._sleep_time)

    def test_retry_with_max_retries(self):
        """The exception is raised to the caller after two retries."""
        responses = [AnException(None),
                     AnException(None),
                     AnException(None)]

        def func(*args, **kwargs):
            response = responses.pop(0)
            if isinstance(response, Exception):
                raise response
            return response

        retry = loopingcall.RetryDecorator(2, 0, 0,
                                           (AnException,))
        self.assertRaises(AnException, retry(func))
        self.assertEqual(2, retry._retry_count)

    def test_retry_with_unexpected_exception(self):
        """An exception that was not listed propagates without retrying."""

        def func(*args, **kwargs):
            raise UnknownException(None)

        retry = loopingcall.RetryDecorator()
        self.assertRaises(UnknownException, retry(func))
        self.assertEqual(0, retry._retry_count)