Repository URL to install this package:
| 
      
        
        
        Version: 
        
         
          
          3.3.41  ▾
        
         | 
| 
    
    tvault-contego
  
    /
        
    usr
  
        /
        
    lib
  
        /
        
    python2.7
  
        /
        
    dist-packages
  
        /
        
    contego
  
        /
        
    nova
  
        /
        
    extension
  
        /
        
    driver
  
        /
        loopingcall.py
   | 
|---|
# Copyright 2014 TrilioData Inc.
# All Rights Reserved.
import logging
import sys
import time
from eventlet import event
from eventlet import greenthread
try:
    from nova.openstack.common._i18n import _LE, _LW
except BaseException:
    from nova.i18n import _LE, _LW
LOG = logging.getLogger(__name__)
def _ts():
    """Return the current time as reported by ``time.time()``.

    Kept as a tiny indirection so the looping calls share one clock source.
    """
    now = time.time()
    return now
class LoopingCallDone(Exception):
    """Signal that a looping call should finish normally.

    The function driven by a looping call raises this exception to leave
    the loop without reporting a failure, in the same spirit as
    StopIteration ending an iteration.

    The exception may carry a value; the looping calls in this module hand
    that value to whoever is blocked in LoopingCallBase.wait().
    """

    def __init__(self, retvalue=True):
        """Remember the value to deliver to the waiter.

        :param retvalue: Value that LoopingCallBase.wait() should return.
        """
        # NOTE: Exception.__init__ is intentionally not invoked here, which
        # keeps ``args`` exactly as the interpreter populated it.
        self.retvalue = retvalue
class LoopingCallBase(object):
    """Shared state and controls for the looping-call variants.

    Stores the callable together with its positional and keyword
    arguments. Subclasses supply ``start()``, which sets ``self.done`` to
    the object that ``wait()`` blocks on.
    """

    def __init__(self, f=None, *args, **kw):
        """Record the callable and the arguments it will be invoked with.

        :param f: Callable to run on every iteration of the loop.
        :param args: Positional arguments forwarded to ``f``.
        :param kw: Keyword arguments forwarded to ``f``.
        """
        self.f = f
        self.args, self.kw = args, kw
        # Populated by start(); stays None until the loop has been started.
        self.done = None
        self._running = False

    def stop(self):
        """Ask the loop to exit; the flag is checked between iterations."""
        self._running = False

    def wait(self):
        """Block on ``self.done`` and return whatever it delivers."""
        completion = self.done
        return completion.wait()
class FixedIntervalLoopingCall(LoopingCallBase):
    """A fixed interval looping call."""

    def start(self, interval, initial_delay=None, max_duration=None):
        """Start calling ``self.f`` repeatedly in a green thread.

        :param interval: Target number of seconds between the start of one
                         call and the start of the next; the time spent
                         inside the call is deducted from the sleep.
        :param initial_delay: Optional number of seconds to sleep before
                              the first call.
        :param max_duration: Optional limit, in seconds, measured from just
                             before the first call. It is checked after
                             each call returns; once exceeded, the loop
                             ends with an Exception.
        :returns: The event stored in ``self.done``. It receives the
                  ``retvalue`` of a LoopingCallDone raised by ``self.f``,
                  ``True`` when the loop exits after ``stop()``, or the
                  exception that ended the loop.
        """
        self._running = True
        done = event.Event()

        def _inner():
            if initial_delay:
                greenthread.sleep(initial_delay)

            # max_duration is counted from here, i.e. after initial_delay.
            incept = _ts()
            try:
                while self._running:
                    start = _ts()
                    self.f(*self.args, **self.kw)
                    end = _ts()
                    if not self._running:
                        break

                    # Positive: the call overran the interval by this many
                    # seconds. Negative: time still left to sleep.
                    delay = end - start - interval
                    # Only overruns of more than 10 seconds are reported.
                    if delay > 10:
                        LOG.warning(_LW('task %(func_name)r run outlasted '
                                        'interval by %(delay).2f sec'),
                                    {'func_name': self.f, 'delay': delay})

                    if (max_duration is not None and
                            end - incept > max_duration):
                        msg = (_LE('task %(func_name)r could not be '
                                   'completed in %(max_duration).2f sec') %
                               {'func_name': self.f,
                                'max_duration': max_duration})
                        LOG.error(msg)
                        # Caught by the generic handler below, which
                        # forwards it to the waiter.
                        raise Exception(msg)

                    greenthread.sleep(-delay if delay < 0 else 0)
            except LoopingCallDone as e:
                # Normal termination requested by the polled function.
                self.stop()
                done.send(e.retvalue)
            except Exception:
                LOG.exception(_LE('in fixed duration looping call'))
                done.send_exception(*sys.exc_info())
                return
            else:
                # Loop left because stop() cleared the running flag.
                done.send(True)

        self.done = done
        greenthread.spawn_n(_inner)
        return self.done
class DynamicLoopingCall(LoopingCallBase):
    """Looping call whose pause between runs is chosen by the callee.

    Every invocation of the wrapped function returns the number of seconds
    to sleep before it is invoked again.
    """

    def start(self, initial_delay=None, periodic_interval_max=None):
        """Start calling ``self.f`` repeatedly in a green thread.

        :param initial_delay: Optional number of seconds to sleep before
                              the first call.
        :param periodic_interval_max: Optional upper bound, in seconds, on
                                      the sleep requested by ``self.f``.
        :returns: The event stored in ``self.done``. It receives the
                  ``retvalue`` of a LoopingCallDone raised by ``self.f``,
                  ``True`` when the loop exits after ``stop()``, or the
                  exception that ended the loop.
        """
        self._running = True
        finished = event.Event()

        def _run_loop():
            if initial_delay:
                greenthread.sleep(initial_delay)

            try:
                while self._running:
                    pause = self.f(*self.args, **self.kw)
                    if not self._running:
                        break

                    # Clamp the requested sleep when a ceiling was given.
                    if periodic_interval_max is not None:
                        pause = min(pause, periodic_interval_max)

                    log_fields = {'func_name': self.f, 'idle': pause}
                    LOG.debug('Dynamic looping call %(func_name)r sleeping '
                              'for %(idle).02f seconds',
                              log_fields)
                    greenthread.sleep(pause)
            except LoopingCallDone as stop_signal:
                # Normal termination requested by the polled function.
                self.stop()
                finished.send(stop_signal.retvalue)
            except Exception:
                LOG.exception(_LE('in dynamic looping call'))
                finished.send_exception(*sys.exc_info())
            else:
                # Loop left because stop() cleared the running flag.
                finished.send(True)

        self.done = finished
        greenthread.spawn(_run_loop)
        return self.done
def _wait_for_ping(retvalue):
    """End the calling loop at once by raising LoopingCallDone.

    :param retvalue: Value attached to the raised LoopingCallDone.
    :raises LoopingCallDone: always.
    """
    stop_signal = LoopingCallDone(retvalue=retvalue)
    raise stop_signal
def _wait_for_ping_never_return():
    """Do nothing and return None.

    Unlike _wait_for_ping, this never raises LoopingCallDone.
    """
    return None
"""
# Unit tests
timer = FixedIntervalLoopingCall(_wait_for_ping, (True))
ret = timer.start(interval=2,
            max_duration=10).wait()
assert ret
timer = FixedIntervalLoopingCall(_wait_for_ping, (False))
ret = timer.start(interval=2,
            max_duration=10).wait()
assert not ret
try:
    timer = FixedIntervalLoopingCall(_wait_for_ping_never_return)
    ret = timer.start(interval=2,
                max_duration=10).wait()
    assert False
except:
    assert True
"""