Repository URL to install this package:
Version:
3.5.0 ▾
|
__all__ = ('AsyncResource', 'IPAddressType', 'IPSockAddrType', 'SocketAttribute', 'SocketStream',
'SocketListener', 'UDPSocket', 'UNIXSocketStream', 'UDPPacketType',
'ConnectedUDPSocket', 'UnreliableObjectReceiveStream', 'UnreliableObjectSendStream',
'UnreliableObjectStream', 'ObjectReceiveStream', 'ObjectSendStream', 'ObjectStream',
'ByteReceiveStream', 'ByteSendStream', 'ByteStream', 'AnyUnreliableByteReceiveStream',
'AnyUnreliableByteSendStream', 'AnyUnreliableByteStream', 'AnyByteReceiveStream',
'AnyByteSendStream', 'AnyByteStream', 'Listener', 'Process', 'Event',
'Condition', 'Lock', 'Semaphore', 'CapacityLimiter', 'CancelScope', 'TaskGroup',
'TaskStatus', 'TestRunner', 'BlockingPortal')
from typing import Any
from ._resources import AsyncResource
from ._sockets import (
ConnectedUDPSocket, IPAddressType, IPSockAddrType, SocketAttribute, SocketListener,
SocketStream, UDPPacketType, UDPSocket, UNIXSocketStream)
from ._streams import (
AnyByteReceiveStream, AnyByteSendStream, AnyByteStream, AnyUnreliableByteReceiveStream,
AnyUnreliableByteSendStream, AnyUnreliableByteStream, ByteReceiveStream, ByteSendStream,
ByteStream, Listener, ObjectReceiveStream, ObjectSendStream, ObjectStream,
UnreliableObjectReceiveStream, UnreliableObjectSendStream, UnreliableObjectStream)
from ._subprocesses import Process
from ._tasks import TaskGroup, TaskStatus
from ._testing import TestRunner
# Re-exported here, for backwards compatibility
# isort: off
from .._core._synchronization import CapacityLimiter, Condition, Event, Lock, Semaphore
from .._core._tasks import CancelScope
from ..from_thread import BlockingPortal
# Re-export imports so they look like they live directly in this package
key: str
value: Any
for key, value in list(locals().items()):
if getattr(value, '__module__', '').startswith('anyio.abc.'):
value.__module__ = __name__