Repository URL to install this package:
|
Version:
0.29.7 ▾
|
# mode: run
# tag: asyncio, pep492
"""
PYTHON setup.py build_ext -i
PYTHON test_from_import.py
PYTHON test_import.py
PYTHON test_async_def.py
PYTHON test_async_def_future.py
PYTHON test_all.py
"""
######## setup.py ########
from Cython.Build.Dependencies import cythonize
from distutils.core import setup
setup(
ext_modules = cythonize("*.pyx"),
)
######## test_from_import.py ########
import from_asyncio_import
import asyncio
def runloop(task):
loop = asyncio.get_event_loop()
result = loop.run_until_complete(task())
assert 3 == result, result
import sys
if sys.version_info < (3, 7):
runloop(from_asyncio_import.wait3)
######## test_import.py ########
import import_asyncio
import asyncio
def runloop(task):
loop = asyncio.get_event_loop()
result = loop.run_until_complete(task())
assert 3 == result, result
import sys
if sys.version_info < (3, 7):
runloop(import_asyncio.wait3)
######## test_async_def.py ########
import sys
ASYNCIO_SUPPORTS_COROUTINE = sys.version_info[:2] >= (3, 5)
if ASYNCIO_SUPPORTS_COROUTINE:
import async_def
import asyncio
def runloop(task):
loop = asyncio.get_event_loop()
result = loop.run_until_complete(task())
assert 3 == result, result
runloop(async_def.wait3)
######## test_async_def_future.py ########
import sys
ASYNCIO_SUPPORTS_COROUTINE = sys.version_info[:2] >= (3, 5)
if ASYNCIO_SUPPORTS_COROUTINE:
from async_def_future import await_future
import asyncio
def runloop():
loop = asyncio.get_event_loop()
task, events, expected = await_future(loop)
result = loop.run_until_complete(task())
assert events == expected, 'expected %s, got %s' % (expected, events)
runloop()
######## test_all.py ########
import sys
import asyncio
ASYNCIO_SUPPORTS_COROUTINE = sys.version_info[:2] >= (3, 5)
ASYNCIO_SUPPORTS_YIELD_FROM = sys.version_info[:2] < (3, 7)
def runloop(task):
loop = asyncio.get_event_loop()
result = loop.run_until_complete(task())
assert 3 == result, result
import import_asyncio
if ASYNCIO_SUPPORTS_YIELD_FROM:
runloop(import_asyncio.wait3) # 1a)
import from_asyncio_import
if ASYNCIO_SUPPORTS_YIELD_FROM:
runloop(from_asyncio_import.wait3) # 1b)
import async_def
if ASYNCIO_SUPPORTS_COROUTINE:
runloop(async_def.wait3) # 1c)
if ASYNCIO_SUPPORTS_YIELD_FROM:
runloop(from_asyncio_import.wait3) # 2a)
runloop(import_asyncio.wait3) # 2b)
if ASYNCIO_SUPPORTS_COROUTINE:
runloop(async_def.wait3) # 2c)
import sys
if ASYNCIO_SUPPORTS_YIELD_FROM:
runloop(from_asyncio_import.wait3) # 3a)
runloop(import_asyncio.wait3) # 3b)
if ASYNCIO_SUPPORTS_COROUTINE:
runloop(async_def.wait3) # 3c)
try:
from collections.abc import Generator
except ImportError:
try:
from collections import Generator
except ImportError:
assert sys.version_info < (3,5), "Python 3.5+ should have collections.abc.Generator"
Generator = object # easy win :)
assert isinstance(from_asyncio_import.wait3(), Generator)
assert isinstance(import_asyncio.wait3(), Generator)
assert isinstance((lambda:(yield))(), Generator)
try:
from collections.abc import Awaitable
except ImportError:
try:
from collections import Awaitable
except ImportError:
assert sys.version_info < (3,5), "Python 3.5+ should have collections.abc.Awaitable"
Awaitable = object # easy win :)
assert isinstance(async_def.wait3(), Awaitable)
try:
from collections.abc import Coroutine
except ImportError:
try:
from collections import Coroutine
except ImportError:
assert sys.version_info < (3,5), "Python 3.5+ should have collections.abc.Coroutine"
Coroutine = object # easy win :)
assert isinstance(async_def.wait3(), Coroutine)
######## import_asyncio.pyx ########
# cython: binding=True
try:
from types import coroutine as types_coroutine
except ImportError:
types_coroutine = lambda f:f
import asyncio
@asyncio.coroutine
@types_coroutine
def wait3():
counter = 0
for i in range(3):
print(counter)
yield from asyncio.sleep(0.01)
counter += 1
return counter
######## from_asyncio_import.pyx ########
# cython: binding=True
try:
from types import coroutine as types_coroutine
except ImportError:
types_coroutine = lambda f:f
from asyncio import coroutine, sleep
@coroutine
@types_coroutine
def wait3():
counter = 0
for i in range(3):
print(counter)
yield from sleep(0.01)
counter += 1
return counter
######## async_def.pyx ########
# cython: binding=True
import asyncio
async def wait3():
counter = 0
for i in range(3):
print(counter)
await asyncio.sleep(0.01)
counter += 1
return counter
######## async_def_future.pyx ########
# cython: binding=True
import asyncio
def await_future(loop):
events = []
async def worker():
fut = asyncio.Future()
def setval():
events.append('setval')
fut.set_result(123)
events.append('setup')
loop.call_later(0.2, setval)
events.append(await fut)
async def test():
await worker()
expected = ['setup', 'setval', 123]
return test, events, expected