Repository URL to install this package:
|
Version:
1.3.7 ▾
|
from time import time
import gevent
import gevent.pool
from gevent.event import Event
from gevent.queue import Queue
import greentest
import greentest.timing
import random
from greentest import ExpectedException
import unittest
class TestCoroutinePool(unittest.TestCase):
klass = gevent.pool.Pool
def test_apply_async(self):
done = Event()
def some_work(_):
done.set()
pool = self.klass(2)
pool.apply_async(some_work, ('x', ))
done.wait()
def test_apply(self):
value = 'return value'
def some_work():
return value
pool = self.klass(2)
result = pool.apply(some_work)
self.assertEqual(value, result)
def test_apply_raises(self):
pool = self.klass(1)
def raiser():
raise ExpectedException()
try:
pool.apply(raiser)
except ExpectedException:
pass
else:
self.fail("Should have raised ExpectedException")
# Don't let the metaclass automatically force any error
# that reaches the hub from a spawned greenlet to become
# fatal; that defeats the point of the test.
test_apply_raises.error_fatal = False
def test_multiple_coros(self):
evt = Event()
results = []
def producer():
gevent.sleep(0.001)
results.append('prod')
evt.set()
def consumer():
results.append('cons1')
evt.wait()
results.append('cons2')
pool = self.klass(2)
done = pool.spawn(consumer)
pool.apply_async(producer)
done.get()
self.assertEqual(['cons1', 'prod', 'cons2'], results)
def dont_test_timer_cancel(self):
timer_fired = []
def fire_timer():
timer_fired.append(True)
def some_work():
gevent.timer(0, fire_timer)
pool = self.klass(2)
pool.apply(some_work)
gevent.sleep(0)
self.assertEqual(timer_fired, [])
def test_reentrant(self):
pool = self.klass(1)
result = pool.apply(pool.apply, (lambda a: a + 1, (5, )))
self.assertEqual(result, 6)
evt = Event()
pool.apply_async(evt.set)
evt.wait()
@greentest.skipOnPyPy("Does not work on PyPy") # Why?
def test_stderr_raising(self):
# testing that really egregious errors in the error handling code
# (that prints tracebacks to stderr) don't cause the pool to lose
# any members
import sys
pool = self.klass(size=1)
# we're going to do this by causing the traceback.print_exc in
# safe_apply to raise an exception and thus exit _main_loop
normal_err = sys.stderr
try:
sys.stderr = FakeFile()
waiter = pool.spawn(crash)
with gevent.Timeout(2):
self.assertRaises(RuntimeError, waiter.get)
# the pool should have something free at this point since the
# waiter returned
# pool.Pool change: if an exception is raised during execution of a link,
# the rest of the links are scheduled to be executed on the next hub iteration
# this introduces a delay in updating pool.sem which makes pool.free_count() report 0
# therefore, sleep:
gevent.sleep(0)
self.assertEqual(pool.free_count(), 1)
# shouldn't block when trying to get
with gevent.Timeout.start_new(0.1):
pool.apply(gevent.sleep, (0, ))
finally:
sys.stderr = normal_err
pool.join()
def crash(*_args, **_kw):
raise RuntimeError("Whoa")
class FakeFile(object):
def write(self, *_args):
raise RuntimeError('Whaaa')
class PoolBasicTests(greentest.TestCase):
klass = gevent.pool.Pool
def test_execute_async(self):
p = self.klass(size=2)
self.assertEqual(p.free_count(), 2)
r = []
first = p.spawn(r.append, 1)
self.assertEqual(p.free_count(), 1)
first.get()
self.assertEqual(r, [1])
gevent.sleep(0)
self.assertEqual(p.free_count(), 2)
#Once the pool is exhausted, calling an execute forces a yield.
p.apply_async(r.append, (2, ))
self.assertEqual(1, p.free_count())
self.assertEqual(r, [1])
p.apply_async(r.append, (3, ))
self.assertEqual(0, p.free_count())
self.assertEqual(r, [1])
p.apply_async(r.append, (4, ))
self.assertEqual(r, [1])
gevent.sleep(0.01)
self.assertEqual(sorted(r), [1, 2, 3, 4])
def test_discard(self):
p = self.klass(size=1)
first = p.spawn(gevent.sleep, 1000)
p.discard(first)
first.kill()
assert not first, first
self.assertEqual(len(p), 0)
self.assertEqual(p._semaphore.counter, 1)
def test_add_method(self):
p = self.klass(size=1)
first = gevent.spawn(gevent.sleep, 1000)
try:
second = gevent.spawn(gevent.sleep, 1000)
try:
self.assertEqual(p.free_count(), 1)
self.assertEqual(len(p), 0)
p.add(first)
self.assertEqual(p.free_count(), 0)
self.assertEqual(len(p), 1)
with self.assertRaises(gevent.Timeout):
with gevent.Timeout(0.1):
p.add(second)
self.assertEqual(p.free_count(), 0)
self.assertEqual(len(p), 1)
finally:
second.kill()
finally:
first.kill()
@greentest.ignores_leakcheck
def test_add_method_non_blocking(self):
p = self.klass(size=1)
first = gevent.spawn(gevent.sleep, 1000)
try:
second = gevent.spawn(gevent.sleep, 1000)
try:
p.add(first)
with self.assertRaises(gevent.pool.PoolFull):
p.add(second, blocking=False)
finally:
second.kill()
finally:
first.kill()
@greentest.ignores_leakcheck
def test_add_method_timeout(self):
p = self.klass(size=1)
first = gevent.spawn(gevent.sleep, 1000)
try:
second = gevent.spawn(gevent.sleep, 1000)
try:
p.add(first)
with self.assertRaises(gevent.pool.PoolFull):
p.add(second, timeout=0.100)
finally:
second.kill()
finally:
first.kill()
@greentest.ignores_leakcheck
def test_start_method_timeout(self):
p = self.klass(size=1)
first = gevent.spawn(gevent.sleep, 1000)
try:
second = gevent.Greenlet(gevent.sleep, 1000)
try:
p.add(first)
with self.assertRaises(gevent.pool.PoolFull):
p.start(second, timeout=0.100)
finally:
second.kill()
finally:
first.kill()
def test_apply(self):
p = self.klass()
result = p.apply(lambda a: ('foo', a), (1, ))
self.assertEqual(result, ('foo', 1))
def test_init_error(self):
self.switch_expected = False
self.assertRaises(ValueError, self.klass, -1)
#
# tests from standard library test/test_multiprocessing.py
class TimingWrapper(object):
def __init__(self, func):
self.func = func
self.elapsed = None
def __call__(self, *args, **kwds):
t = time()
try:
return self.func(*args, **kwds)
finally:
self.elapsed = time() - t
def sqr(x, wait=0.0):
gevent.sleep(wait)
return x * x
def squared(x):
return x * x
def sqr_random_sleep(x):
gevent.sleep(random.random() * 0.1)
return x * x
def final_sleep():
for i in range(3):
yield i
gevent.sleep(0.2)
TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.082, 0.035, 0.14
SMALL_RANGE = 10
LARGE_RANGE = 1000
if (greentest.PYPY and greentest.WIN) or greentest.RUN_LEAKCHECKS or greentest.RUN_COVERAGE:
# See comments in test__threadpool.py.
LARGE_RANGE = 25
elif greentest.RUNNING_ON_CI or greentest.EXPECT_POOR_TIMER_RESOLUTION:
LARGE_RANGE = 100
class TestPool(greentest.TestCase): # pylint:disable=too-many-public-methods
__timeout__ = greentest.LARGE_TIMEOUT
size = 1
def setUp(self):
greentest.TestCase.setUp(self)
self.pool = gevent.pool.Pool(self.size)
def cleanup(self):
self.pool.join()
def test_apply(self):
papply = self.pool.apply
self.assertEqual(papply(sqr, (5,)), 25)
self.assertEqual(papply(sqr, (), {'x': 3}), 9)
def test_map(self):
pmap = self.pool.map
self.assertEqual(pmap(sqr, range(SMALL_RANGE)), list(map(squared, range(SMALL_RANGE))))
self.assertEqual(pmap(sqr, range(100)), list(map(squared, range(100))))
def test_async(self):
res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
get = TimingWrapper(res.get)
self.assertEqual(get(), 49)
self.assertTimeoutAlmostEqual(get.elapsed, TIMEOUT1, 1)
def test_async_callback(self):
result = []
res = self.pool.apply_async(sqr, (7, TIMEOUT1,), callback=result.append)
get = TimingWrapper(res.get)
self.assertEqual(get(), 49)
self.assertTimeoutAlmostEqual(get.elapsed, TIMEOUT1, 1)
gevent.sleep(0) # lets the callback run
assert result == [49], result
def test_async_timeout(self):
res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
get = TimingWrapper(res.get)
self.assertRaises(gevent.Timeout, get, timeout=TIMEOUT2)
self.assertTimeoutAlmostEqual(get.elapsed, TIMEOUT2, 1)
self.pool.join()
def test_imap_list_small(self):
it = self.pool.imap(sqr, range(SMALL_RANGE))
self.assertEqual(list(it), list(map(sqr, range(SMALL_RANGE))))
def test_imap_it_small(self):
it = self.pool.imap(sqr, range(SMALL_RANGE))
for i in range(SMALL_RANGE):
self.assertEqual(next(it), i * i)
self.assertRaises(StopIteration, next, it)
def test_imap_it_large(self):
it = self.pool.imap(sqr, range(LARGE_RANGE))
for i in range(LARGE_RANGE):
self.assertEqual(next(it), i * i)
self.assertRaises(StopIteration, next, it)
def test_imap_random(self):
it = self.pool.imap(sqr_random_sleep, range(SMALL_RANGE))
self.assertEqual(list(it), list(map(squared, range(SMALL_RANGE))))
def test_imap_unordered(self):
it = self.pool.imap_unordered(sqr, range(LARGE_RANGE))
self.assertEqual(sorted(it), list(map(squared, range(LARGE_RANGE))))
it = self.pool.imap_unordered(sqr, range(LARGE_RANGE))
self.assertEqual(sorted(it), list(map(squared, range(LARGE_RANGE))))
def test_imap_unordered_random(self):
it = self.pool.imap_unordered(sqr_random_sleep, range(SMALL_RANGE))
self.assertEqual(sorted(it), list(map(squared, range(SMALL_RANGE))))
def test_empty_imap_unordered(self):
it = self.pool.imap_unordered(sqr, [])
self.assertEqual(list(it), [])
def test_empty_imap(self):
it = self.pool.imap(sqr, [])
self.assertEqual(list(it), [])
def test_empty_map(self):
self.assertEqual(self.pool.map(sqr, []), [])
def test_terminate(self):
result = self.pool.map_async(gevent.sleep, [0.1] * ((self.size or 10) * 2))
gevent.sleep(0.1)
kill = TimingWrapper(self.pool.kill)
kill()
assert kill.elapsed < 0.5, kill.elapsed
result.join()
def sleep(self, x):
gevent.sleep(float(x) / 10.)
return str(x)
def test_imap_unordered_sleep(self):
# testing that imap_unordered returns items in competion order
result = list(self.pool.imap_unordered(self.sleep, [10, 1, 2]))
if self.pool.size == 1:
expected = ['10', '1', '2']
else:
expected = ['1', '2', '10']
self.assertEqual(result, expected)
# https://github.com/gevent/gevent/issues/423
def test_imap_no_stop(self):
q = Queue()
q.put(123)
gevent.spawn_later(0.1, q.put, StopIteration)
result = list(self.pool.imap(lambda _: _, q))
self.assertEqual(result, [123])
def test_imap_unordered_no_stop(self):
q = Queue()
q.put(1234)
gevent.spawn_later(0.1, q.put, StopIteration)
result = list(self.pool.imap_unordered(lambda _: _, q))
self.assertEqual(result, [1234])
# same issue, but different test: https://github.com/gevent/gevent/issues/311
def test_imap_final_sleep(self):
result = list(self.pool.imap(sqr, final_sleep()))
self.assertEqual(result, [0, 1, 4])
def test_imap_unordered_final_sleep(self):
result = list(self.pool.imap_unordered(sqr, final_sleep()))
self.assertEqual(result, [0, 1, 4])
# Issue 638
def test_imap_unordered_bounded_queue(self):
iterable = list(range(100))
running = [0]
def short_running_func(i, _j):
running[0] += 1
return i
def make_reader(mapping):
# Simulate a long running reader. No matter how many workers
# we have, we will never have a queue more than size 1
def reader():
result = []
for i, x in enumerate(mapping):
self.assertTrue(running[0] <= i + 2, running[0])
result.append(x)
gevent.sleep(0.01)
self.assertTrue(len(mapping.queue) <= 2, len(mapping.queue))
return result
return reader
# Send two iterables to make sure varargs and kwargs are handled
# correctly
for meth in self.pool.imap_unordered, self.pool.imap:
running[0] = 0
mapping = meth(short_running_func, iterable, iterable,
maxsize=1)
reader = make_reader(mapping)
l = reader()
self.assertEqual(sorted(l), iterable)
@greentest.ignores_leakcheck
class TestPool2(TestPool):
size = 2
@greentest.ignores_leakcheck
class TestPool3(TestPool):
size = 3
@greentest.ignores_leakcheck
class TestPool10(TestPool):
size = 10
class TestPoolUnlimit(TestPool):
size = None
class TestPool0(greentest.TestCase):
size = 0
def test_wait_full(self):
p = gevent.pool.Pool(size=0)
self.assertEqual(0, p.free_count())
self.assertTrue(p.full())
self.assertEqual(0, p.wait_available(timeout=0.01))
class TestJoinSleep(greentest.timing.AbstractGenericWaitTestCase):
def wait(self, timeout):
p = gevent.pool.Pool()
g = p.spawn(gevent.sleep, 10)
try:
p.join(timeout=timeout)
finally:
g.kill()
class TestJoinSleep_raise_error(greentest.timing.AbstractGenericWaitTestCase):
def wait(self, timeout):
p = gevent.pool.Pool()
g = p.spawn(gevent.sleep, 10)
try:
p.join(timeout=timeout, raise_error=True)
finally:
g.kill()
class TestJoinEmpty(greentest.TestCase):
switch_expected = False
def test(self):
p = gevent.pool.Pool()
res = p.join()
self.assertTrue(res, "empty should return true")
class TestSpawn(greentest.TestCase):
switch_expected = True
def test(self):
p = gevent.pool.Pool(1)
self.assertEqual(len(p), 0)
p.spawn(gevent.sleep, 0.1)
self.assertEqual(len(p), 1)
p.spawn(gevent.sleep, 0.1) # this spawn blocks until the old one finishes
self.assertEqual(len(p), 1)
gevent.sleep(0.19 if not greentest.RUNNING_ON_APPVEYOR else 0.5)
self.assertEqual(len(p), 0)
def testSpawnAndWait(self):
p = gevent.pool.Pool(1)
self.assertEqual(len(p), 0)
p.spawn(gevent.sleep, 0.1)
self.assertEqual(len(p), 1)
res = p.join(0.01)
self.assertFalse(res, "waiting on a full pool should return false")
res = p.join()
self.assertTrue(res, "waiting to finish should be true")
self.assertEqual(len(p), 0)
def error_iter():
yield 1
yield 2
raise ExpectedException
class TestErrorInIterator(greentest.TestCase):
error_fatal = False
def test(self):
p = gevent.pool.Pool(3)
self.assertRaises(ExpectedException, p.map, lambda x: None, error_iter())
gevent.sleep(0.001)
def test_unordered(self):
p = gevent.pool.Pool(3)
def unordered():
return list(p.imap_unordered(lambda x: None, error_iter()))
self.assertRaises(ExpectedException, unordered)
gevent.sleep(0.001)
def divide_by(x):
return 1.0 / x
class TestErrorInHandler(greentest.TestCase):
error_fatal = False
def test_map(self):
p = gevent.pool.Pool(3)
self.assertRaises(ZeroDivisionError, p.map, divide_by, [1, 0, 2])
def test_imap(self):
p = gevent.pool.Pool(1)
it = p.imap(divide_by, [1, 0, 2])
self.assertEqual(next(it), 1.0)
self.assertRaises(ZeroDivisionError, next, it)
self.assertEqual(next(it), 0.5)
self.assertRaises(StopIteration, next, it)
def test_imap_unordered(self):
p = gevent.pool.Pool(1)
it = p.imap_unordered(divide_by, [1, 0, 2])
self.assertEqual(next(it), 1.0)
self.assertRaises(ZeroDivisionError, next, it)
self.assertEqual(next(it), 0.5)
self.assertRaises(StopIteration, next, it)
if __name__ == '__main__':
greentest.main()