# Repository URL to install this package:
"""
Various tests for synchronization primitives.
"""
import sys
import time
from _thread import start_new_thread, TIMEOUT_MAX
import threading
import unittest
import weakref
from test import support
def _wait():
# A crude wait/yield function not relying on synchronization primitives.
time.sleep(0.01)
class Bunch:
    """
    A group of raw threads that all execute the same callable.
    """

    def __init__(self, f, n, wait_before_exit=False):
        """
        Start `n` threads, each of which calls `f` once.

        When `wait_before_exit` is true, every thread keeps spinning after
        `f` returns and only terminates once do_finish() has been called.
        """
        self.f = f
        self.n = n
        self.started = []
        self.finished = []
        self._can_exit = not wait_before_exit
        # The context manager is entered by hand here; the matching
        # __exit__ call is made in wait_for_finished().
        self.wait_thread = support.wait_threads_exit()
        self.wait_thread.__enter__()

        def runner():
            ident = threading.get_ident()
            self.started.append(ident)
            try:
                f()
            finally:
                self.finished.append(ident)
                # Stay alive until the owner allows the thread to exit.
                while not self._can_exit:
                    _wait()

        try:
            for _ in range(n):
                start_new_thread(runner, ())
        except BaseException:
            # Let the threads that did start run to completion, then
            # propagate the original error unchanged.
            self._can_exit = True
            raise

    def wait_for_started(self):
        """Poll until all `n` threads have recorded themselves as started."""
        while self.n > len(self.started):
            _wait()

    def wait_for_finished(self):
        """Poll until all `n` threads have finished, then wait for them to exit."""
        while self.n > len(self.finished):
            _wait()
        # Leave the context entered in __init__ so the threads' exit is
        # waited for.
        self.wait_thread.__exit__(None, None, None)

    def do_finish(self):
        """Allow threads created with `wait_before_exit` to terminate."""
        self._can_exit = True
class BaseTestCase(unittest.TestCase):
    """Shared set-up/tear-down and a tolerant timeout assertion."""

    def setUp(self):
        # Keep whatever threading_setup() returns so that tearDown() can
        # hand it back to threading_cleanup().
        self._threads = support.threading_setup()

    def tearDown(self):
        saved = self._threads
        support.threading_cleanup(*saved)
        support.reap_children()

    def assertTimeout(self, actual, expected):
        # Waiting and time.monotonic() can both be imprecise (especially
        # under Windows), so a strict comparison against the expected
        # value would fail now and then.  Accept 60% of it as a minimum...
        lower_bound = expected * 0.6
        self.assertGreaterEqual(actual, lower_bound)
        # ...and only make sure nothing insane happened at the other end.
        upper_bound = expected * 10.0
        self.assertLess(actual, upper_bound)
class BaseLockTests(BaseTestCase):
    """
    Tests for both recursive and non-recursive locks.

    Concrete subclasses are expected to provide a `locktype` attribute:
    a callable that returns a fresh lock object.
    """

    def test_constructor(self):
        # Creating and dropping a lock must not raise.
        lock = self.locktype()
        del lock

    def test_repr(self):
        lock = self.locktype()
        self.assertRegex(repr(lock), "<unlocked .* object (.*)?at .*>")
        del lock

    def test_locked_repr(self):
        lock = self.locktype()
        lock.acquire()
        self.assertRegex(repr(lock), "<locked .* object (.*)?at .*>")
        del lock

    def test_acquire_destroy(self):
        # Dropping a lock that is still held must not raise.
        lock = self.locktype()
        lock.acquire()
        del lock

    def test_acquire_release(self):
        lock = self.locktype()
        lock.acquire()
        lock.release()
        del lock

    def test_try_acquire(self):
        # A non-blocking acquire of a free lock succeeds.
        lock = self.locktype()
        self.assertTrue(lock.acquire(False))
        lock.release()

    def test_try_acquire_contended(self):
        # A non-blocking acquire from another thread fails while this
        # thread holds the lock.
        lock = self.locktype()
        lock.acquire()
        result = []

        def f():
            result.append(lock.acquire(False))

        Bunch(f, 1).wait_for_finished()
        self.assertFalse(result[0])
        lock.release()

    def test_acquire_contended(self):
        lock = self.locktype()
        lock.acquire()
        N = 5

        def f():
            lock.acquire()
            lock.release()

        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        # Every thread must still be blocked on the lock held here.
        self.assertEqual(len(b.finished), 0)
        lock.release()
        b.wait_for_finished()
        self.assertEqual(len(b.finished), N)

    def test_with(self):
        lock = self.locktype()

        def f():
            lock.acquire()
            lock.release()

        def _with(err=None):
            with lock:
                if err is not None:
                    raise err

        _with()
        # Check the lock is unacquired
        Bunch(f, 1).wait_for_finished()
        self.assertRaises(TypeError, _with, TypeError)
        # Check the lock is unacquired
        Bunch(f, 1).wait_for_finished()

    def test_thread_leak(self):
        # The lock shouldn't leak a Thread instance when used from a foreign
        # (non-threading) thread.
        lock = self.locktype()

        def f():
            lock.acquire()
            lock.release()

        n = len(threading.enumerate())
        # We run many threads in the hope that existing threads ids won't
        # be recycled.
        Bunch(f, 15).wait_for_finished()
        if len(threading.enumerate()) != n:
            # There is a small window during which a Thread instance's
            # target function has finished running, but the Thread is still
            # alive and registered.  Avoid spurious failures by waiting a
            # bit more (seen on a buildbot).
            time.sleep(0.4)
            self.assertEqual(n, len(threading.enumerate()))

    def test_timeout(self):
        lock = self.locktype()
        # Can't set timeout if not blocking
        self.assertRaises(ValueError, lock.acquire, 0, 1)
        # Invalid timeout values
        self.assertRaises(ValueError, lock.acquire, timeout=-100)
        self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
        self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
        # TIMEOUT_MAX is ok
        lock.acquire(timeout=TIMEOUT_MAX)
        lock.release()
        t1 = time.monotonic()
        self.assertTrue(lock.acquire(timeout=5))
        t2 = time.monotonic()
        # Just a sanity test that it didn't actually wait for the timeout.
        self.assertLess(t2 - t1, 5)
        results = []

        def f():
            # Runs in another thread while this thread still holds the
            # lock, so the acquire is expected to time out.
            t1 = time.monotonic()
            results.append(lock.acquire(timeout=0.5))
            t2 = time.monotonic()
            results.append(t2 - t1)

        Bunch(f, 1).wait_for_finished()
        self.assertFalse(results[0])
        self.assertTimeout(results[1], 0.5)

    def test_weakref_exists(self):
        lock = self.locktype()
        ref = weakref.ref(lock)
        self.assertIsNotNone(ref())

    def test_weakref_deleted(self):
        lock = self.locktype()
        ref = weakref.ref(lock)
        del lock
        # Reference counting frees the lock right away on CPython, but
        # other garbage collectors (e.g. PyPy's) need an explicit
        # collection before the weak reference is cleared.
        support.gc_collect()
        self.assertIsNone(ref())
class LockTests(BaseLockTests):
    """
    Tests specific to plain (non-recursive) locks, i.e. weak locks that
    one thread may acquire and a different thread may release.
    """

    def test_reacquire(self):
        # A second acquire() blocks until somebody releases the lock.
        mutex = self.locktype()
        steps = []

        def double_acquire():
            mutex.acquire()
            steps.append(None)
            mutex.acquire()
            steps.append(None)

        with support.wait_threads_exit():
            start_new_thread(double_acquire, ())
            # Wait for the first acquire() to go through...
            while not steps:
                _wait()
            _wait()
            # ...and check that the second one is still pending.
            self.assertEqual(len(steps), 1)
            mutex.release()
            while len(steps) == 1:
                _wait()
            self.assertEqual(len(steps), 2)

    def test_different_thread(self):
        # Releasing from a thread other than the acquiring one is allowed.
        mutex = self.locktype()
        mutex.acquire()

        def releaser():
            mutex.release()

        bunch = Bunch(releaser, 1)
        bunch.wait_for_finished()
        # The lock must be free again at this point.
        mutex.acquire()
        mutex.release()

    def test_state_after_timeout(self):
        # Issue #11618: the lock must be left in a proper state after a
        # (non-zero) timeout has expired.
        mutex = self.locktype()
        mutex.acquire()
        timed_out_result = mutex.acquire(timeout=0.01)
        self.assertFalse(timed_out_result)
        mutex.release()
        self.assertFalse(mutex.locked())
        self.assertTrue(mutex.acquire(blocking=False))
class RLockTests(BaseLockTests):
    """
    Tests specific to recursive (re-entrant) locks.
    """

    def test_reacquire(self):
        # Nested acquire/release pairs from the same thread must not block.
        rlock = self.locktype()
        rlock.acquire()
        rlock.acquire()
        rlock.release()
        rlock.acquire()
        rlock.release()
        rlock.release()

    def test_release_unacquired(self):
        # release() on a lock that is not held raises RuntimeError,
        # both before any use and after balanced nested use.
        rlock = self.locktype()
        self.assertRaises(RuntimeError, rlock.release)
        rlock.acquire()
        rlock.acquire()
        rlock.release()
        rlock.acquire()
        rlock.release()
        rlock.release()
        self.assertRaises(RuntimeError, rlock.release)

    def test_release_save_unacquired(self):
        # _release_save() on a lock that is not held raises RuntimeError,
        # both before any use and after balanced nested use.
        rlock = self.locktype()
        self.assertRaises(RuntimeError, rlock._release_save)
        rlock.acquire()
        rlock.acquire()
        rlock.release()
        rlock.acquire()
        rlock.release()
        rlock.release()
        self.assertRaises(RuntimeError, rlock._release_save)

    def test_different_thread(self):
        # release() from a thread other than the acquiring one must fail.
        rlock = self.locktype()

        def grab():
            rlock.acquire()

        # The helper thread is kept alive (wait_before_exit=True) until
        # do_finish() is called.
        bunch = Bunch(grab, 1, True)
        try:
            self.assertRaises(RuntimeError, rlock.release)
        finally:
            bunch.do_finish()
        bunch.wait_for_finished()

    def test__is_owned(self):
        rlock = self.locktype()
        self.assertFalse(rlock._is_owned())
        rlock.acquire()
        self.assertTrue(rlock._is_owned())
        rlock.acquire()
        self.assertTrue(rlock._is_owned())
        seen = []

        def probe():
            seen.append(rlock._is_owned())

        # Ownership is per thread: another thread must not see itself as
        # the owner.
        Bunch(probe, 1).wait_for_finished()
        self.assertFalse(seen[0])
        rlock.release()
        # One nesting level is still held.
        self.assertTrue(rlock._is_owned())
        rlock.release()
        self.assertFalse(rlock._is_owned())
class EventTests(BaseTestCase):
"""
Tests for Event objects.
"""
def test_is_set(self):
evt = self.eventtype()
self.assertFalse(evt.is_set())
evt.set()
self.assertTrue(evt.is_set())
# Loading ...