Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / aiohttp   python

Repository URL to install this package:

/ connector.py

import asyncio
import functools
import random
import sys
import traceback
import warnings
from collections import defaultdict, deque
from contextlib import suppress
from http.cookies import SimpleCookie
from itertools import cycle, islice
from time import monotonic
from types import TracebackType
from typing import (  # noqa
    TYPE_CHECKING,
    Any,
    Awaitable,
    Callable,
    DefaultDict,
    Dict,
    Iterator,
    List,
    Optional,
    Set,
    Tuple,
    Type,
    Union,
    cast,
)

import attr

from . import hdrs, helpers
from .abc import AbstractResolver
from .client_exceptions import (
    ClientConnectionError,
    ClientConnectorCertificateError,
    ClientConnectorError,
    ClientConnectorSSLError,
    ClientHttpProxyError,
    ClientProxyConnectionError,
    ServerFingerprintMismatch,
    cert_errors,
    ssl_errors,
)
from .client_proto import ResponseHandler
from .client_reqrep import ClientRequest, Fingerprint, _merge_ssl_params
from .helpers import (
    PY_36,
    CeilTimeout,
    get_running_loop,
    is_ip_address,
    noop2,
    sentinel,
)
from .http import RESPONSES
from .locks import EventResultOrError
from .resolver import DefaultResolver

try:
    import ssl
    SSLContext = ssl.SSLContext
except ImportError:  # pragma: no cover
    ssl = None  # type: ignore
    SSLContext = object  # type: ignore


__all__ = ('BaseConnector', 'TCPConnector', 'UnixConnector')


if TYPE_CHECKING:  # pragma: no cover
    from .client import ClientTimeout  # noqa
    from .client_reqrep import ConnectionKey  # noqa
    from .tracing import Trace  # noqa


class _DeprecationWaiter:
    __slots__ = ('_awaitable', '_awaited')

    def __init__(self, awaitable: Awaitable[Any]) -> None:
        self._awaitable = awaitable
        self._awaited = False

    def __await__(self) -> Any:
        self._awaited = True
        return self._awaitable.__await__()

    def __del__(self) -> None:
        if not self._awaited:
            warnings.warn("Connector.close() is a coroutine, "
                          "please use await connector.close()",
                          DeprecationWarning)


class Connection:

    _source_traceback = None
    _transport = None

    def __init__(self, connector: 'BaseConnector',
                 key: 'ConnectionKey',
                 protocol: ResponseHandler,
                 loop: asyncio.AbstractEventLoop) -> None:
        self._key = key
        self._connector = connector
        self._loop = loop
        self._protocol = protocol  # type: Optional[ResponseHandler]
        self._callbacks = []  # type: List[Callable[[], None]]

        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __repr__(self) -> str:
        return 'Connection<{}>'.format(self._key)

    def __del__(self, _warnings: Any=warnings) -> None:
        if self._protocol is not None:
            if PY_36:
                kwargs = {'source': self}
            else:
                kwargs = {}
            _warnings.warn('Unclosed connection {!r}'.format(self),
                           ResourceWarning,
                           **kwargs)
            if self._loop.is_closed():
                return

            self._connector._release(
                self._key, self._protocol, should_close=True)

            context = {'client_connection': self,
                       'message': 'Unclosed connection'}
            if self._source_traceback is not None:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        warnings.warn("connector.loop property is deprecated",
                      DeprecationWarning,
                      stacklevel=2)
        return self._loop

    @property
    def transport(self) -> Optional[asyncio.Transport]:
        if self._protocol is None:
            return None
        return self._protocol.transport

    @property
    def protocol(self) -> Optional[ResponseHandler]:
        return self._protocol

    def add_callback(self, callback: Callable[[], None]) -> None:
        if callback is not None:
            self._callbacks.append(callback)

    def _notify_release(self) -> None:
        callbacks, self._callbacks = self._callbacks[:], []

        for cb in callbacks:
            with suppress(Exception):
                cb()

    def close(self) -> None:
        self._notify_release()

        if self._protocol is not None:
            self._connector._release(
                self._key, self._protocol, should_close=True)
            self._protocol = None

    def release(self) -> None:
        self._notify_release()

        if self._protocol is not None:
            self._connector._release(
                self._key, self._protocol,
                should_close=self._protocol.should_close)
            self._protocol = None

    @property
    def closed(self) -> bool:
        return self._protocol is None or not self._protocol.is_connected()


class _TransportPlaceholder:
    """ placeholder for BaseConnector.connect function """

    def close(self) -> None:
        pass


class BaseConnector:
    """Base connector class.

    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables clean-up closed ssl transports.
                            Disabled by default.
    loop - Optional event loop.
    """

    _closed = True  # prevent AttributeError in __del__ if ctor was failed
    _source_traceback = None

    # abort transport after 2 seconds (cleanup broken connections)
    _cleanup_closed_period = 2.0

    def __init__(self, *,
                 keepalive_timeout: Union[object, None, float]=sentinel,
                 force_close: bool=False,
                 limit: int=100, limit_per_host: int=0,
                 enable_cleanup_closed: bool=False,
                 loop: Optional[asyncio.AbstractEventLoop]=None) -> None:

        if force_close:
            if keepalive_timeout is not None and \
               keepalive_timeout is not sentinel:
                raise ValueError('keepalive_timeout cannot '
                                 'be set if force_close is True')
        else:
            if keepalive_timeout is sentinel:
                keepalive_timeout = 15.0

        loop = get_running_loop(loop)

        self._closed = False
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        self._conns = {}  # type: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]]  # noqa
        self._limit = limit
        self._limit_per_host = limit_per_host
        self._acquired = set()  # type: Set[ResponseHandler]
        self._acquired_per_host = defaultdict(set)  # type: DefaultDict[ConnectionKey, Set[ResponseHandler]]  # noqa
        self._keepalive_timeout = cast(float, keepalive_timeout)
        self._force_close = force_close

        # {host_key: FIFO list of waiters}
        self._waiters = defaultdict(deque)  # type: ignore

        self._loop = loop
        self._factory = functools.partial(ResponseHandler, loop=loop)

        self.cookies = SimpleCookie()

        # start keep-alive connection cleanup task
        self._cleanup_handle = None

        # start cleanup closed transports task
        self._cleanup_closed_handle = None
        self._cleanup_closed_disabled = not enable_cleanup_closed
        self._cleanup_closed_transports = []  # type: List[Optional[asyncio.Transport]]  # noqa
        self._cleanup_closed()

    def __del__(self, _warnings: Any=warnings) -> None:
        if self._closed:
            return
        if not self._conns:
            return

        conns = [repr(c) for c in self._conns.values()]

        self._close()

        if PY_36:
            kwargs = {'source': self}
        else:
            kwargs = {}
        _warnings.warn("Unclosed connector {!r}".format(self),
                       ResourceWarning,
                       **kwargs)
        context = {'connector': self,
                   'connections': conns,
                   'message': 'Unclosed connector'}
        if self._source_traceback is not None:
            context['source_traceback'] = self._source_traceback
        self._loop.call_exception_handler(context)

    def __enter__(self) -> 'BaseConnector':
        warnings.warn('"witn Connector():" is deprecated, '
                      'use "async with Connector():" instead',
                      DeprecationWarning)
        return self

    def __exit__(self, *exc: Any) -> None:
        self.close()

    async def __aenter__(self) -> 'BaseConnector':
        return self

    async def __aexit__(self,
                        exc_type: Optional[Type[BaseException]]=None,
                        exc_value: Optional[BaseException]=None,
                        exc_traceback: Optional[TracebackType]=None
                        ) -> None:
        await self.close()

    @property
    def force_close(self) -> bool:
        """Ultimately close connection on releasing if True."""
        return self._force_close

    @property
    def limit(self) -> int:
        """The total number for simultaneous connections.

        If limit is 0 the connector has no limit.
        The default limit size is 100.
        """
        return self._limit

    @property
    def limit_per_host(self) -> int:
        """The limit_per_host for simultaneous connections
        to the same endpoint.

        Endpoints are the same if they are have equal
        (host, port, is_ssl) triple.

        """
        return self._limit_per_host

    def _cleanup(self) -> None:
        """Cleanup unused transports."""
        if self._cleanup_handle:
            self._cleanup_handle.cancel()

        now = self._loop.time()
        timeout = self._keepalive_timeout

        if self._conns:
            connections = {}
            deadline = now - timeout
            for key, conns in self._conns.items():
                alive = []
                for proto, use_time in conns:
                    if proto.is_connected():
                        if use_time - deadline < 0:
                            transport = proto.transport
                            proto.close()
                            if (key.is_ssl and
Loading ...