Repository URL to install this package:
|
Version:
1:7.26.0-1 ▾
|
datadog-agent
/
opt
/
datadog-agent
/
embedded
/
lib
/
python3.8
/
site-packages
/
kazoo
/
tests
/
test_lease.py
|
|---|
import datetime
import uuid
from kazoo.recipe.lease import NonBlockingLease
from kazoo.recipe.lease import MultiNonBlockingLease
from kazoo.testing import KazooTestCase
class MockClock(object):
def __init__(self, epoch=0):
self.epoch = epoch
def forward(self, seconds):
self.epoch += seconds
def __call__(self):
return datetime.datetime.utcfromtimestamp(self.epoch)
class KazooLeaseTests(KazooTestCase):
def setUp(self):
super(KazooLeaseTests, self).setUp()
self.client2 = self._get_client(timeout=0.8)
self.client2.start()
self.client3 = self._get_client(timeout=0.8)
self.client3.start()
self.path = "/" + uuid.uuid4().hex
self.clock = MockClock(10)
def tearDown(self):
for cl in [self.client2, self.client3]:
if cl.connected:
cl.stop()
cl.close()
del self.client2
del self.client3
class NonBlockingLeaseTests(KazooLeaseTests):
def test_renew(self):
# Use client convenience method here to test it at least once. Use
# class directly in
# other tests in order to get better IDE support.
lease = self.client.NonBlockingLease(self.path,
datetime.timedelta(seconds=3),
utcnow=self.clock)
self.assertTrue(lease)
self.assertTrue(lease.obtained)
self.clock.forward(2)
renewed_lease = self.client.NonBlockingLease(
self.path, datetime.timedelta(seconds=3), utcnow=self.clock)
self.assertTrue(renewed_lease)
def test_busy(self):
lease = NonBlockingLease(self.client, self.path,
datetime.timedelta(seconds=3),
utcnow=self.clock)
self.assertTrue(lease)
self.clock.forward(2)
foreigner_lease = NonBlockingLease(
self.client2, self.path, datetime.timedelta(seconds=3),
identifier="some.other.host", utcnow=self.clock)
self.assertFalse(foreigner_lease)
self.assertFalse(foreigner_lease.obtained)
def test_overtake(self):
lease = NonBlockingLease(self.client, self.path,
datetime.timedelta(seconds=3),
utcnow=self.clock)
self.assertTrue(lease)
self.clock.forward(4)
foreigner_lease = NonBlockingLease(
self.client2, self.path, datetime.timedelta(seconds=3),
identifier="some.other.host", utcnow=self.clock)
self.assertTrue(foreigner_lease)
def test_renew_no_overtake(self):
lease = self.client.NonBlockingLease(self.path,
datetime.timedelta(seconds=3),
utcnow=self.clock)
self.assertTrue(lease)
self.assertTrue(lease.obtained)
self.clock.forward(2)
renewed_lease = self.client.NonBlockingLease(
self.path, datetime.timedelta(seconds=3), utcnow=self.clock)
self.assertTrue(renewed_lease)
self.clock.forward(2)
foreigner_lease = NonBlockingLease(
self.client2, self.path, datetime.timedelta(seconds=3),
identifier="some.other.host", utcnow=self.clock)
self.assertFalse(foreigner_lease)
def test_overtaker_renews(self):
lease = NonBlockingLease(self.client, self.path,
datetime.timedelta(seconds=3),
utcnow=self.clock)
self.assertTrue(lease)
self.clock.forward(4)
foreigner_lease = NonBlockingLease(
self.client2, self.path, datetime.timedelta(seconds=3),
identifier="some.other.host", utcnow=self.clock)
self.assertTrue(foreigner_lease)
self.clock.forward(2)
foreigner_renew = NonBlockingLease(
self.client2, self.path, datetime.timedelta(seconds=3),
identifier="some.other.host", utcnow=self.clock)
self.assertTrue(foreigner_renew)
def test_overtake_refuse_first(self):
lease = NonBlockingLease(self.client, self.path,
datetime.timedelta(seconds=3),
utcnow=self.clock)
self.assertTrue(lease)
self.clock.forward(4)
foreigner_lease = NonBlockingLease(
self.client2, self.path, datetime.timedelta(seconds=3),
identifier="some.other.host", utcnow=self.clock)
self.assertTrue(foreigner_lease)
self.clock.forward(2)
first_again_lease = NonBlockingLease(
self.client, self.path, datetime.timedelta(seconds=3),
utcnow=self.clock)
self.assertFalse(first_again_lease)
def test_old_version(self):
# Skip to a future version
NonBlockingLease._version += 1
lease = NonBlockingLease(self.client, self.path,
datetime.timedelta(seconds=3),
utcnow=self.clock)
self.assertTrue(lease)
# Then back to today.
NonBlockingLease._version -= 1
self.clock.forward(4)
foreigner_lease = NonBlockingLease(
self.client2, self.path, datetime.timedelta(seconds=3),
identifier="some.other.host", utcnow=self.clock)
# Since a newer version wrote the lease file, the lease is not taken.
self.assertFalse(foreigner_lease)
class MultiNonBlockingLeaseTest(KazooLeaseTests):
def test_1_renew(self):
ls = self.client.MultiNonBlockingLease(1, self.path,
datetime.timedelta(seconds=4),
utcnow=self.clock)
self.assertTrue(ls)
self.clock.forward(2)
ls2 = MultiNonBlockingLease(self.client, 1, self.path,
datetime.timedelta(seconds=4),
utcnow=self.clock)
self.assertTrue(ls2)
def test_1_reject(self):
ls = MultiNonBlockingLease(self.client, 1, self.path,
datetime.timedelta(seconds=4),
utcnow=self.clock)
self.assertTrue(ls)
self.clock.forward(2)
ls2 = MultiNonBlockingLease(self.client2, 1, self.path,
datetime.timedelta(seconds=4),
identifier="some.other.host",
utcnow=self.clock)
self.assertFalse(ls2)
def test_2_renew(self):
ls = MultiNonBlockingLease(self.client, 2, self.path,
datetime.timedelta(seconds=7),
utcnow=self.clock)
self.assertTrue(ls)
self.clock.forward(2)
ls2 = MultiNonBlockingLease(self.client2, 2, self.path,
datetime.timedelta(seconds=7),
identifier="host2", utcnow=self.clock)
self.assertTrue(ls2)
self.clock.forward(2)
ls3 = MultiNonBlockingLease(self.client, 2, self.path,
datetime.timedelta(seconds=7),
utcnow=self.clock)
self.assertTrue(ls3)
self.clock.forward(2)
ls4 = MultiNonBlockingLease(self.client2, 2, self.path,
datetime.timedelta(seconds=7),
identifier="host2", utcnow=self.clock)
self.assertTrue(ls4)
def test_2_reject(self):
ls = MultiNonBlockingLease(self.client, 2, self.path,
datetime.timedelta(seconds=7),
utcnow=self.clock)
self.assertTrue(ls)
self.clock.forward(2)
ls2 = MultiNonBlockingLease(self.client2, 2, self.path,
datetime.timedelta(seconds=7),
identifier="host2", utcnow=self.clock)
self.assertTrue(ls2)
self.clock.forward(2)
ls3 = MultiNonBlockingLease(self.client3, 2, self.path,
datetime.timedelta(seconds=7),
identifier="host3", utcnow=self.clock)
self.assertFalse(ls3)
def test_2_handover(self):
ls = MultiNonBlockingLease(self.client, 2, self.path,
datetime.timedelta(seconds=4),
utcnow=self.clock)
self.assertTrue(ls)
self.clock.forward(2)
ls2 = MultiNonBlockingLease(self.client2, 2, self.path,
datetime.timedelta(seconds=4),
identifier="host2", utcnow=self.clock)
self.assertTrue(ls2)
self.clock.forward(3)
ls3 = MultiNonBlockingLease(self.client3, 2, self.path,
datetime.timedelta(seconds=4),
identifier="host3", utcnow=self.clock)
self.assertTrue(ls3)
self.clock.forward(2)
ls4 = MultiNonBlockingLease(self.client, 2, self.path,
datetime.timedelta(seconds=4),
utcnow=self.clock)
self.assertTrue(ls4)