Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Cython / tests / run / asyncio_generators.srctree
Size: Mime:
# mode: run
# tag: asyncio, pep492

"""
PYTHON setup.py build_ext -i
PYTHON test_from_import.py
PYTHON test_import.py
PYTHON test_async_def.py
PYTHON test_async_def_future.py
PYTHON test_all.py
"""

######## setup.py ########

from Cython.Build.Dependencies import cythonize
from distutils.core import setup

setup(
  ext_modules = cythonize("*.pyx"),
)


######## test_from_import.py ########

import from_asyncio_import
import asyncio

def runloop(task):
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(task())
    assert 3 == result, result

import sys
if sys.version_info < (3, 7):
    runloop(from_asyncio_import.wait3)


######## test_import.py ########

import import_asyncio
import asyncio

def runloop(task):
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(task())
    assert 3 == result, result

import sys
if sys.version_info < (3, 7):
    runloop(import_asyncio.wait3)


######## test_async_def.py ########

import sys

ASYNCIO_SUPPORTS_COROUTINE = sys.version_info[:2] >= (3, 5)

if ASYNCIO_SUPPORTS_COROUTINE:
    import async_def
    import asyncio

    def runloop(task):
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(task())
        assert 3 == result, result

    runloop(async_def.wait3)


######## test_async_def_future.py ########

import sys

ASYNCIO_SUPPORTS_COROUTINE = sys.version_info[:2] >= (3, 5)

if ASYNCIO_SUPPORTS_COROUTINE:
    from async_def_future import await_future
    import asyncio

    def runloop():
        loop = asyncio.get_event_loop()
        task, events, expected = await_future(loop)
        result = loop.run_until_complete(task())
        assert events == expected, 'expected %s, got %s' % (expected, events)

    runloop()


######## test_all.py ########

import sys
import asyncio

ASYNCIO_SUPPORTS_COROUTINE = sys.version_info[:2] >= (3, 5)
ASYNCIO_SUPPORTS_YIELD_FROM = sys.version_info[:2] < (3, 7)

def runloop(task):
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(task())
    assert 3 == result, result

import import_asyncio
if ASYNCIO_SUPPORTS_YIELD_FROM:
    runloop(import_asyncio.wait3)       # 1a)
import from_asyncio_import
if ASYNCIO_SUPPORTS_YIELD_FROM:
    runloop(from_asyncio_import.wait3)  # 1b)

import async_def
if ASYNCIO_SUPPORTS_COROUTINE:
    runloop(async_def.wait3)            # 1c)

if ASYNCIO_SUPPORTS_YIELD_FROM:
    runloop(from_asyncio_import.wait3)  # 2a)
    runloop(import_asyncio.wait3)       # 2b)
if ASYNCIO_SUPPORTS_COROUTINE:
    runloop(async_def.wait3)            # 2c)

import sys
if ASYNCIO_SUPPORTS_YIELD_FROM:
    runloop(from_asyncio_import.wait3)  # 3a)
    runloop(import_asyncio.wait3)       # 3b)
if ASYNCIO_SUPPORTS_COROUTINE:
    runloop(async_def.wait3)            # 3c)

try:
    from collections.abc import Generator
except ImportError:
    try:
        from collections import Generator
    except ImportError:
        assert sys.version_info < (3,5), "Python 3.5+ should have collections.abc.Generator"
        Generator = object  # easy win :)

assert isinstance(from_asyncio_import.wait3(), Generator)
assert isinstance(import_asyncio.wait3(), Generator)
assert isinstance((lambda:(yield))(), Generator)

try:
    from collections.abc import Awaitable
except ImportError:
    try:
        from collections import Awaitable
    except ImportError:
        assert sys.version_info < (3,5), "Python 3.5+ should have collections.abc.Awaitable"
        Awaitable = object  # easy win :)

assert isinstance(async_def.wait3(), Awaitable)

try:
    from collections.abc import Coroutine
except ImportError:
    try:
        from collections import Coroutine
    except ImportError:
        assert sys.version_info < (3,5), "Python 3.5+ should have collections.abc.Coroutine"
        Coroutine = object  # easy win :)

assert isinstance(async_def.wait3(), Coroutine)


######## import_asyncio.pyx ########
# cython: binding=True

try:
    from types import coroutine as types_coroutine
except ImportError:
    types_coroutine = lambda f:f

import asyncio

@asyncio.coroutine
@types_coroutine
def wait3():
    counter = 0
    for i in range(3):
        print(counter)
        yield from asyncio.sleep(0.01)
        counter += 1
    return counter


######## from_asyncio_import.pyx ########
# cython: binding=True

try:
    from types import coroutine as types_coroutine
except ImportError:
    types_coroutine = lambda f:f

from asyncio import coroutine, sleep

@coroutine
@types_coroutine
def wait3():
    counter = 0
    for i in range(3):
        print(counter)
        yield from sleep(0.01)
        counter += 1
    return counter


######## async_def.pyx ########
# cython: binding=True

import asyncio

async def wait3():
    counter = 0
    for i in range(3):
        print(counter)
        await asyncio.sleep(0.01)
        counter += 1
    return counter


######## async_def_future.pyx ########
# cython: binding=True

import asyncio

def await_future(loop):
    events = []
    async def worker():
        fut = asyncio.Future()

        def setval():
            events.append('setval')
            fut.set_result(123)

        events.append('setup')
        loop.call_later(0.2, setval)
        events.append(await fut)

    async def test():
        await worker()

    expected = ['setup', 'setval', 123]
    return test, events, expected