import asyncio
import functools
import random
import sys
import traceback
import warnings
from collections import defaultdict, deque
from contextlib import suppress
from http.cookies import SimpleCookie
from itertools import cycle, islice
from time import monotonic
from types import TracebackType
from typing import ( # noqa
TYPE_CHECKING,
Any,
Awaitable,
Callable,
DefaultDict,
Dict,
Iterator,
List,
Optional,
Set,
Tuple,
Type,
Union,
cast,
)
import attr
from . import hdrs, helpers
from .abc import AbstractResolver
from .client_exceptions import (
ClientConnectionError,
ClientConnectorCertificateError,
ClientConnectorError,
ClientConnectorSSLError,
ClientHttpProxyError,
ClientProxyConnectionError,
ServerFingerprintMismatch,
cert_errors,
ssl_errors,
)
from .client_proto import ResponseHandler
from .client_reqrep import ClientRequest, Fingerprint, _merge_ssl_params
from .helpers import (
PY_36,
CeilTimeout,
get_running_loop,
is_ip_address,
noop2,
sentinel,
)
from .http import RESPONSES
from .locks import EventResultOrError
from .resolver import DefaultResolver
try:
import ssl
SSLContext = ssl.SSLContext
except ImportError: # pragma: no cover
ssl = None # type: ignore
SSLContext = object # type: ignore
__all__ = ('BaseConnector', 'TCPConnector', 'UnixConnector')
if TYPE_CHECKING: # pragma: no cover
from .client import ClientTimeout # noqa
from .client_reqrep import ConnectionKey # noqa
from .tracing import Trace # noqa
class _DeprecationWaiter:
__slots__ = ('_awaitable', '_awaited')
def __init__(self, awaitable: Awaitable[Any]) -> None:
self._awaitable = awaitable
self._awaited = False
def __await__(self) -> Any:
self._awaited = True
return self._awaitable.__await__()
def __del__(self) -> None:
if not self._awaited:
warnings.warn("Connector.close() is a coroutine, "
"please use await connector.close()",
DeprecationWarning)
class Connection:
_source_traceback = None
_transport = None
def __init__(self, connector: 'BaseConnector',
key: 'ConnectionKey',
protocol: ResponseHandler,
loop: asyncio.AbstractEventLoop) -> None:
self._key = key
self._connector = connector
self._loop = loop
self._protocol = protocol # type: Optional[ResponseHandler]
self._callbacks = [] # type: List[Callable[[], None]]
if loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(1))
def __repr__(self) -> str:
return 'Connection<{}>'.format(self._key)
def __del__(self, _warnings: Any=warnings) -> None:
if self._protocol is not None:
if PY_36:
kwargs = {'source': self}
else:
kwargs = {}
_warnings.warn('Unclosed connection {!r}'.format(self),
ResourceWarning,
**kwargs)
if self._loop.is_closed():
return
self._connector._release(
self._key, self._protocol, should_close=True)
context = {'client_connection': self,
'message': 'Unclosed connection'}
if self._source_traceback is not None:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
@property
def loop(self) -> asyncio.AbstractEventLoop:
warnings.warn("connector.loop property is deprecated",
DeprecationWarning,
stacklevel=2)
return self._loop
@property
def transport(self) -> Optional[asyncio.Transport]:
if self._protocol is None:
return None
return self._protocol.transport
@property
def protocol(self) -> Optional[ResponseHandler]:
return self._protocol
def add_callback(self, callback: Callable[[], None]) -> None:
if callback is not None:
self._callbacks.append(callback)
def _notify_release(self) -> None:
callbacks, self._callbacks = self._callbacks[:], []
for cb in callbacks:
with suppress(Exception):
cb()
def close(self) -> None:
self._notify_release()
if self._protocol is not None:
self._connector._release(
self._key, self._protocol, should_close=True)
self._protocol = None
def release(self) -> None:
self._notify_release()
if self._protocol is not None:
self._connector._release(
self._key, self._protocol,
should_close=self._protocol.should_close)
self._protocol = None
@property
def closed(self) -> bool:
return self._protocol is None or not self._protocol.is_connected()
class _TransportPlaceholder:
""" placeholder for BaseConnector.connect function """
def close(self) -> None:
pass
class BaseConnector:
"""Base connector class.
keepalive_timeout - (optional) Keep-alive timeout.
force_close - Set to True to force close and do reconnect
after each request (and between redirects).
limit - The total number of simultaneous connections.
limit_per_host - Number of simultaneous connections to one host.
enable_cleanup_closed - Enables clean-up closed ssl transports.
Disabled by default.
loop - Optional event loop.
"""
_closed = True # prevent AttributeError in __del__ if ctor was failed
_source_traceback = None
# abort transport after 2 seconds (cleanup broken connections)
_cleanup_closed_period = 2.0
def __init__(self, *,
keepalive_timeout: Union[object, None, float]=sentinel,
force_close: bool=False,
limit: int=100, limit_per_host: int=0,
enable_cleanup_closed: bool=False,
loop: Optional[asyncio.AbstractEventLoop]=None) -> None:
if force_close:
if keepalive_timeout is not None and \
keepalive_timeout is not sentinel:
raise ValueError('keepalive_timeout cannot '
'be set if force_close is True')
else:
if keepalive_timeout is sentinel:
keepalive_timeout = 15.0
loop = get_running_loop(loop)
self._closed = False
if loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(1))
self._conns = {} # type: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]] # noqa
self._limit = limit
self._limit_per_host = limit_per_host
self._acquired = set() # type: Set[ResponseHandler]
self._acquired_per_host = defaultdict(set) # type: DefaultDict[ConnectionKey, Set[ResponseHandler]] # noqa
self._keepalive_timeout = cast(float, keepalive_timeout)
self._force_close = force_close
# {host_key: FIFO list of waiters}
self._waiters = defaultdict(deque) # type: ignore
self._loop = loop
self._factory = functools.partial(ResponseHandler, loop=loop)
self.cookies = SimpleCookie()
# start keep-alive connection cleanup task
self._cleanup_handle = None
# start cleanup closed transports task
self._cleanup_closed_handle = None
self._cleanup_closed_disabled = not enable_cleanup_closed
self._cleanup_closed_transports = [] # type: List[Optional[asyncio.Transport]] # noqa
self._cleanup_closed()
def __del__(self, _warnings: Any=warnings) -> None:
if self._closed:
return
if not self._conns:
return
conns = [repr(c) for c in self._conns.values()]
self._close()
if PY_36:
kwargs = {'source': self}
else:
kwargs = {}
_warnings.warn("Unclosed connector {!r}".format(self),
ResourceWarning,
**kwargs)
context = {'connector': self,
'connections': conns,
'message': 'Unclosed connector'}
if self._source_traceback is not None:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
def __enter__(self) -> 'BaseConnector':
warnings.warn('"witn Connector():" is deprecated, '
'use "async with Connector():" instead',
DeprecationWarning)
return self
def __exit__(self, *exc: Any) -> None:
self.close()
async def __aenter__(self) -> 'BaseConnector':
return self
async def __aexit__(self,
exc_type: Optional[Type[BaseException]]=None,
exc_value: Optional[BaseException]=None,
exc_traceback: Optional[TracebackType]=None
) -> None:
await self.close()
@property
def force_close(self) -> bool:
"""Ultimately close connection on releasing if True."""
return self._force_close
@property
def limit(self) -> int:
"""The total number for simultaneous connections.
If limit is 0 the connector has no limit.
The default limit size is 100.
"""
return self._limit
@property
def limit_per_host(self) -> int:
"""The limit_per_host for simultaneous connections
to the same endpoint.
Endpoints are the same if they are have equal
(host, port, is_ssl) triple.
"""
return self._limit_per_host
def _cleanup(self) -> None:
"""Cleanup unused transports."""
if self._cleanup_handle:
self._cleanup_handle.cancel()
now = self._loop.time()
timeout = self._keepalive_timeout
if self._conns:
connections = {}
deadline = now - timeout
for key, conns in self._conns.items():
alive = []
for proto, use_time in conns:
if proto.is_connected():
if use_time - deadline < 0:
transport = proto.transport
proto.close()
if (key.is_ssl and
Loading ...