Repository URL to install this package:
Version:
6.0.19 ▾
|
python3-tvault-contego
/
usr
/
lib
/
python3
/
dist-packages
/
contego
/
nova
/
extension
/
driver
/
loopingcall.py
|
---|
# Copyright 2014 TrilioData Inc.
# All Rights Reserved.
import logging
import sys
import time
from eventlet import event
from eventlet import greenthread
# Prefer the log-translation markers exported by nova.openstack.common._i18n;
# when that import fails, build them from nova.i18n's translator factory.
try:
    from nova.openstack.common._i18n import _LE, _LW
except ImportError:
    # Only a failed import should trigger the fallback.  Catching
    # BaseException here would also swallow KeyboardInterrupt / SystemExit
    # raised while the import is in progress.
    from nova.i18n import _translators

    _LW = _translators.log_warning
    _LE = _translators.log_error
LOG = logging.getLogger(__name__)
def _ts():
    """Return the current reading of the monotonic clock, in seconds.

    Callers subtract two readings to obtain an elapsed time, so the
    monotonic clock is used: unlike ``time.time()`` it cannot jump forwards
    or backwards when the system clock is adjusted, which would otherwise
    distort the computed delays and duration checks.

    :returns: A float number of seconds from an unspecified reference
        point; only differences between readings are meaningful.
    """
    return time.monotonic()
class LoopingCallDone(Exception):
    """Exception to break out and stop a LoopingCallBase.

    The poll-function passed to LoopingCallBase can raise this exception to
    break out of the loop normally. This is somewhat analogous to
    StopIteration.

    An optional return-value can be included as the argument to the exception;
    this return-value will be returned by LoopingCallBase.wait()
    """

    def __init__(self, retvalue=True):
        """:param retvalue: Value that LoopingCallBase.wait() should return."""
        # Hand the value to Exception as well, so that ``args``, ``str()``
        # and ``repr()`` show it even when it is defaulted or passed by
        # keyword (in which case Exception would otherwise record nothing).
        super(LoopingCallDone, self).__init__(retvalue)
        self.retvalue = retvalue
class LoopingCallBase(object):
    """Shared state and controls for the looping-call classes.

    Stores the callable together with the arguments it is invoked with, a
    running flag, and the completion object that ``wait()`` blocks on.
    """

    def __init__(self, f=None, *args, **kw):
        """Remember the callable and its call arguments.

        :param f: Callable to invoke on every iteration.
        :param args: Positional arguments passed to ``f``.
        :param kw: Keyword arguments passed to ``f``.
        """
        self.f = f
        self.args = args
        self.kw = kw
        # No loop is active until a subclass's start() says so.
        self.done = None
        self._running = False

    def stop(self):
        """Clear the running flag."""
        self._running = False

    def wait(self):
        """Return whatever the stored completion object's ``wait()`` returns."""
        completion = self.done
        return completion.wait()
class FixedIntervalLoopingCall(LoopingCallBase):
    """A fixed interval looping call.

    ``stop()`` and ``wait()`` are inherited from ``LoopingCallBase``.
    """

    def start(self, interval, initial_delay=None, max_duration=None):
        """Run ``self.f`` repeatedly in a green thread.

        :param interval: Target number of seconds from the start of one call
            to the start of the next.
        :param initial_delay: Seconds to sleep before the first call; a
            falsy value skips the sleep.
        :param max_duration: When not None, the number of seconds (counted
            from just after the initial delay) the loop may keep going; once
            exceeded, the loop ends with an ``Exception``.
        :returns: The event object that receives the outcome: the
            ``LoopingCallDone`` return value, ``True`` when the loop ends
            because it was stopped, or the exception that ended the loop.
        """
        self._running = True
        done = event.Event()

        def _inner():
            if initial_delay:
                greenthread.sleep(initial_delay)

            incept = _ts()
            try:
                while self._running:
                    start = _ts()
                    self.f(*self.args, **self.kw)
                    end = _ts()
                    if not self._running:
                        break
                    # Positive: the call overran the interval by this much.
                    # Negative: time still to sleep before the next call.
                    delay = end - start - interval
                    if delay > 10:
                        # Logger.warn is a deprecated alias that newer
                        # Python versions no longer provide.
                        LOG.warning(
                            _LW(
                                "task %(func_name)r run outlasted "
                                "interval by %(delay).2f sec"
                            ),
                            {"func_name": self.f, "delay": delay},
                        )
                    if (max_duration is not None
                            and end - incept > max_duration):
                        msg = _LW(
                            "task %(func_name)r could not be "
                            "completed in %(max_duration).2f sec"
                        ) % {"func_name": self.f, "max_duration": max_duration}
                        LOG.error(msg)
                        raise Exception(msg)
                    greenthread.sleep(-delay if delay < 0 else 0)
            except LoopingCallDone as e:
                # The poll function asked to finish: pass its value on.
                self.stop()
                done.send(e.retvalue)
            except Exception:
                LOG.exception(_LE("in fixed duration looping call"))
                done.send_exception(*sys.exc_info())
                return
            else:
                # The loop ended because the running flag was cleared.
                done.send(True)

        self.done = done
        greenthread.spawn_n(_inner)
        return self.done
class DynamicLoopingCall(LoopingCallBase):
    """Looping call whose callable decides the pause before the next call.

    Each invocation of the callable should return the number of seconds to
    sleep before it is invoked again.
    """

    def start(self, initial_delay=None, periodic_interval_max=None):
        """Run ``self.f`` repeatedly in a green thread.

        :param initial_delay: Seconds to sleep before the first call; a
            falsy value skips the sleep.
        :param periodic_interval_max: When not None, upper bound applied to
            the sleep time returned by the callable.
        :returns: The event object that receives the outcome: the
            ``LoopingCallDone`` return value, ``True`` when the loop ends
            because it was stopped, or the exception that ended the loop.
        """
        self._running = True
        finished = event.Event()

        def _run():
            if initial_delay:
                greenthread.sleep(initial_delay)

            try:
                while self._running:
                    pause = self.f(*self.args, **self.kw)
                    if not self._running:
                        break

                    if periodic_interval_max is not None:
                        pause = min(pause, periodic_interval_max)
                    log_fields = {"func_name": self.f, "idle": pause}
                    LOG.debug(
                        "Dynamic looping call %(func_name)r sleeping "
                        "for %(idle).02f seconds",
                        log_fields,
                    )
                    greenthread.sleep(pause)
            except LoopingCallDone as finish:
                # The callable asked to finish: pass its value on.
                self.stop()
                finished.send(finish.retvalue)
            except Exception:
                LOG.exception(_LE("in dynamic looping call"))
                finished.send_exception(*sys.exc_info())
            else:
                # The loop ended because the running flag was cleared.
                finished.send(True)

        self.done = finished
        greenthread.spawn(_run)
        return finished
def _wait_for_ping(retvalue):
    """Poll function that immediately ends the loop with ``retvalue``.

    :param retvalue: Value carried by the raised exception.
    :raises LoopingCallDone: always.
    """
    finish = LoopingCallDone(retvalue=retvalue)
    raise finish
def _wait_for_ping_never_return():
    """Poll function that does nothing and never raises LoopingCallDone."""
    return None
"""
# Unit tests
timer = FixedIntervalLoopingCall(_wait_for_ping, (True))
ret = timer.start(interval=2,
max_duration=10).wait()
assert ret
timer = FixedIntervalLoopingCall(_wait_for_ping, (False))
ret = timer.start(interval=2,
max_duration=10).wait()
assert not ret
try:
timer = FixedIntervalLoopingCall(_wait_for_ping, (False))
ret = timer.start(interval=2,
max_duration=10).wait()
assert False
except:
assert True
"""