Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
pytype / typeshed / stdlib / asyncio / streams.pyi
Size: Mime:
import sys
from _typeshed import StrPath
from typing import Any, AsyncIterator, Awaitable, Callable, Iterable, Optional

from . import events, protocols, transports
from .base_events import Server

_ClientConnectedCallback = Callable[[StreamReader, StreamWriter], Optional[Awaitable[None]]]

if sys.version_info < (3, 8):
    class IncompleteReadError(EOFError):
        expected: int | None
        partial: bytes
        def __init__(self, partial: bytes, expected: int | None) -> None: ...
    class LimitOverrunError(Exception):
        consumed: int
        def __init__(self, message: str, consumed: int) -> None: ...

async def open_connection(
    host: str | None = ...,
    port: int | str | None = ...,
    *,
    loop: events.AbstractEventLoop | None = ...,
    limit: int = ...,
    ssl_handshake_timeout: float | None = ...,
    **kwds: Any,
) -> tuple[StreamReader, StreamWriter]: ...
async def start_server(
    client_connected_cb: _ClientConnectedCallback,
    host: str | None = ...,
    port: int | str | None = ...,
    *,
    loop: events.AbstractEventLoop | None = ...,
    limit: int = ...,
    ssl_handshake_timeout: float | None = ...,
    **kwds: Any,
) -> Server: ...

if sys.platform != "win32":
    if sys.version_info >= (3, 7):
        _PathType = StrPath
    else:
        _PathType = str
    async def open_unix_connection(
        path: _PathType | None = ..., *, loop: events.AbstractEventLoop | None = ..., limit: int = ..., **kwds: Any
    ) -> tuple[StreamReader, StreamWriter]: ...
    async def start_unix_server(
        client_connected_cb: _ClientConnectedCallback,
        path: _PathType | None = ...,
        *,
        loop: events.AbstractEventLoop | None = ...,
        limit: int = ...,
        **kwds: Any,
    ) -> Server: ...

class FlowControlMixin(protocols.Protocol):
    def __init__(self, loop: events.AbstractEventLoop | None = ...) -> None: ...

class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    def __init__(
        self,
        stream_reader: StreamReader,
        client_connected_cb: _ClientConnectedCallback | None = ...,
        loop: events.AbstractEventLoop | None = ...,
    ) -> None: ...
    def connection_made(self, transport: transports.BaseTransport) -> None: ...
    def connection_lost(self, exc: Exception | None) -> None: ...
    def data_received(self, data: bytes) -> None: ...
    def eof_received(self) -> bool: ...

class StreamWriter:
    def __init__(
        self,
        transport: transports.BaseTransport,
        protocol: protocols.BaseProtocol,
        reader: StreamReader | None,
        loop: events.AbstractEventLoop,
    ) -> None: ...
    @property
    def transport(self) -> transports.BaseTransport: ...
    def write(self, data: bytes) -> None: ...
    def writelines(self, data: Iterable[bytes]) -> None: ...
    def write_eof(self) -> None: ...
    def can_write_eof(self) -> bool: ...
    def close(self) -> None: ...
    if sys.version_info >= (3, 7):
        def is_closing(self) -> bool: ...
        async def wait_closed(self) -> None: ...
    def get_extra_info(self, name: str, default: Any = ...) -> Any: ...
    async def drain(self) -> None: ...

class StreamReader:
    def __init__(self, limit: int = ..., loop: events.AbstractEventLoop | None = ...) -> None: ...
    def exception(self) -> Exception: ...
    def set_exception(self, exc: Exception) -> None: ...
    def set_transport(self, transport: transports.BaseTransport) -> None: ...
    def feed_eof(self) -> None: ...
    def at_eof(self) -> bool: ...
    def feed_data(self, data: bytes) -> None: ...
    async def readline(self) -> bytes: ...
    async def readuntil(self, separator: bytes = ...) -> bytes: ...
    async def read(self, n: int = ...) -> bytes: ...
    async def readexactly(self, n: int) -> bytes: ...
    def __aiter__(self) -> AsyncIterator[bytes]: ...
    async def __anext__(self) -> bytes: ...