Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / aiohttp   python

Repository URL to install this package:

/ client.py

"""HTTP Client for asyncio."""

import asyncio
import base64
import hashlib
import json
import os
import sys
import traceback
import warnings
from types import SimpleNamespace, TracebackType
from typing import (  # noqa
    Any,
    Coroutine,
    Generator,
    Generic,
    Iterable,
    List,
    Mapping,
    Optional,
    Set,
    Tuple,
    Type,
    TypeVar,
    Union,
)

import attr
from multidict import CIMultiDict, MultiDict, MultiDictProxy, istr
from yarl import URL

from . import hdrs, http, payload
from .abc import AbstractCookieJar
from .client_exceptions import (
    ClientConnectionError,
    ClientConnectorCertificateError,
    ClientConnectorError,
    ClientConnectorSSLError,
    ClientError,
    ClientHttpProxyError,
    ClientOSError,
    ClientPayloadError,
    ClientProxyConnectionError,
    ClientResponseError,
    ClientSSLError,
    ContentTypeError,
    InvalidURL,
    ServerConnectionError,
    ServerDisconnectedError,
    ServerFingerprintMismatch,
    ServerTimeoutError,
    TooManyRedirects,
    WSServerHandshakeError,
)
from .client_reqrep import (
    ClientRequest,
    ClientResponse,
    Fingerprint,
    RequestInfo,
    _merge_ssl_params,
)
from .client_ws import ClientWebSocketResponse
from .connector import BaseConnector, TCPConnector, UnixConnector
from .cookiejar import CookieJar
from .helpers import (
    DEBUG,
    PY_36,
    BasicAuth,
    CeilTimeout,
    TimeoutHandle,
    get_running_loop,
    proxies_from_env,
    sentinel,
    strip_auth_from_url,
)
from .http import WS_KEY, HttpVersion, WebSocketReader, WebSocketWriter
from .http_websocket import (  # noqa
    WSHandshakeError,
    WSMessage,
    ws_ext_gen,
    ws_ext_parse,
)
from .streams import FlowControlDataQueue
from .tracing import Trace, TraceConfig
from .typedefs import JSONEncoder, LooseCookies, LooseHeaders, StrOrURL

__all__ = (
    # client_exceptions
    'ClientConnectionError',
    'ClientConnectorCertificateError',
    'ClientConnectorError',
    'ClientConnectorSSLError',
    'ClientError',
    'ClientHttpProxyError',
    'ClientOSError',
    'ClientPayloadError',
    'ClientProxyConnectionError',
    'ClientResponseError',
    'ClientSSLError',
    'ContentTypeError',
    'InvalidURL',
    'ServerConnectionError',
    'ServerDisconnectedError',
    'ServerFingerprintMismatch',
    'ServerTimeoutError',
    'TooManyRedirects',
    'WSServerHandshakeError',
    # client_reqrep
    'ClientRequest',
    'ClientResponse',
    'Fingerprint',
    'RequestInfo',
    # connector
    'BaseConnector',
    'TCPConnector',
    'UnixConnector',
    # client_ws
    'ClientWebSocketResponse',
    # client
    'ClientSession',
    'ClientTimeout',
    'request')


try:
    from ssl import SSLContext
except ImportError:  # pragma: no cover
    SSLContext = object  # type: ignore


@attr.s(frozen=True, slots=True)
class ClientTimeout:
    total = attr.ib(type=Optional[float], default=None)
    connect = attr.ib(type=Optional[float], default=None)
    sock_read = attr.ib(type=Optional[float], default=None)
    sock_connect = attr.ib(type=Optional[float], default=None)

    # pool_queue_timeout = attr.ib(type=float, default=None)
    # dns_resolution_timeout = attr.ib(type=float, default=None)
    # socket_connect_timeout = attr.ib(type=float, default=None)
    # connection_acquiring_timeout = attr.ib(type=float, default=None)
    # new_connection_timeout = attr.ib(type=float, default=None)
    # http_header_timeout = attr.ib(type=float, default=None)
    # response_body_timeout = attr.ib(type=float, default=None)

    # to create a timeout specific for a single request, either
    # - create a completely new one to overwrite the default
    # - or use http://www.attrs.org/en/stable/api.html#attr.evolve
    # to overwrite the defaults


# 5 Minute default read timeout
DEFAULT_TIMEOUT = ClientTimeout(total=5*60)

_RetType = TypeVar('_RetType')


class ClientSession:
    """First-class interface for making HTTP requests."""

    ATTRS = frozenset([
        '_source_traceback', '_connector',
        'requote_redirect_url', '_loop', '_cookie_jar',
        '_connector_owner', '_default_auth',
        '_version', '_json_serialize',
        '_requote_redirect_url',
        '_timeout', '_raise_for_status', '_auto_decompress',
        '_trust_env', '_default_headers', '_skip_auto_headers',
        '_request_class', '_response_class',
        '_ws_response_class', '_trace_configs'])

    _source_traceback = None
    _connector = None

    def __init__(self, *, connector: Optional[BaseConnector]=None,
                 loop: Optional[asyncio.AbstractEventLoop]=None,
                 cookies: Optional[LooseCookies]=None,
                 headers: Optional[LooseHeaders]=None,
                 skip_auto_headers: Optional[Iterable[str]]=None,
                 auth: Optional[BasicAuth]=None,
                 json_serialize: JSONEncoder=json.dumps,
                 request_class: Type[ClientRequest]=ClientRequest,
                 response_class: Type[ClientResponse]=ClientResponse,
                 ws_response_class: Type[ClientWebSocketResponse]=ClientWebSocketResponse,  # noqa
                 version: HttpVersion=http.HttpVersion11,
                 cookie_jar: Optional[AbstractCookieJar]=None,
                 connector_owner: bool=True,
                 raise_for_status: bool=False,
                 read_timeout: Union[float, object]=sentinel,
                 conn_timeout: Optional[float]=None,
                 timeout: Union[object, ClientTimeout]=sentinel,
                 auto_decompress: bool=True,
                 trust_env: bool=False,
                 requote_redirect_url: bool=True,
                 trace_configs: Optional[List[TraceConfig]]=None) -> None:

        if loop is None:
            if connector is not None:
                loop = connector._loop

        loop = get_running_loop(loop)

        if connector is None:
            connector = TCPConnector(loop=loop)

        if connector._loop is not loop:
            raise RuntimeError(
                "Session and connector has to use same event loop")

        self._loop = loop

        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        if cookie_jar is None:
            cookie_jar = CookieJar(loop=loop)
        self._cookie_jar = cookie_jar

        if cookies is not None:
            self._cookie_jar.update_cookies(cookies)

        self._connector = connector  # type: BaseConnector
        self._connector_owner = connector_owner
        self._default_auth = auth
        self._version = version
        self._json_serialize = json_serialize
        if timeout is sentinel:
            self._timeout = DEFAULT_TIMEOUT
            if read_timeout is not sentinel:
                warnings.warn("read_timeout is deprecated, "
                              "use timeout argument instead",
                              DeprecationWarning,
                              stacklevel=2)
                self._timeout = attr.evolve(self._timeout, total=read_timeout)
            if conn_timeout is not None:
                self._timeout = attr.evolve(self._timeout,
                                            connect=conn_timeout)
                warnings.warn("conn_timeout is deprecated, "
                              "use timeout argument instead",
                              DeprecationWarning,
                              stacklevel=2)
        else:
            self._timeout = timeout  # type: ignore
            if read_timeout is not sentinel:
                raise ValueError("read_timeout and timeout parameters "
                                 "conflict, please setup "
                                 "timeout.read")
            if conn_timeout is not None:
                raise ValueError("conn_timeout and timeout parameters "
                                 "conflict, please setup "
                                 "timeout.connect")
        self._raise_for_status = raise_for_status
        self._auto_decompress = auto_decompress
        self._trust_env = trust_env
        self._requote_redirect_url = requote_redirect_url

        # Convert to list of tuples
        if headers:
            headers = CIMultiDict(headers)
        else:
            headers = CIMultiDict()
        self._default_headers = headers
        if skip_auto_headers is not None:
            self._skip_auto_headers = frozenset([istr(i)
                                                 for i in skip_auto_headers])
        else:
            self._skip_auto_headers = frozenset()

        self._request_class = request_class
        self._response_class = response_class
        self._ws_response_class = ws_response_class

        self._trace_configs = trace_configs or []
        for trace_config in self._trace_configs:
            trace_config.freeze()

    def __init_subclass__(cls: Type['ClientSession']) -> None:
        warnings.warn("Inheritance class {} from ClientSession "
                      "is discouraged".format(cls.__name__),
                      DeprecationWarning,
                      stacklevel=2)

    if DEBUG:
        def __setattr__(self, name: str, val: Any) -> None:
            if name not in self.ATTRS:
                warnings.warn("Setting custom ClientSession.{} attribute "
                              "is discouraged".format(name),
                              DeprecationWarning,
                              stacklevel=2)
            super().__setattr__(name, val)

    def __del__(self, _warnings: Any=warnings) -> None:
        if not self.closed:
            if PY_36:
                kwargs = {'source': self}
            else:
                kwargs = {}
            _warnings.warn("Unclosed client session {!r}".format(self),
                           ResourceWarning,
                           **kwargs)
            context = {'client_session': self,
                       'message': 'Unclosed client session'}
            if self._source_traceback is not None:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)

    def request(self,
                method: str,
                url: StrOrURL,
                **kwargs: Any) -> '_RequestContextManager':
        """Perform HTTP request."""
        return _RequestContextManager(self._request(method, url, **kwargs))

    async def _request(
            self,
            method: str,
            str_or_url: StrOrURL, *,
            params: Optional[Mapping[str, str]]=None,
            data: Any=None,
            json: Any=None,
            cookies: Optional[LooseCookies]=None,
            headers: LooseHeaders=None,
            skip_auto_headers: Optional[Iterable[str]]=None,
            auth: Optional[BasicAuth]=None,
            allow_redirects: bool=True,
            max_redirects: int=10,
            compress: Optional[str]=None,
            chunked: Optional[bool]=None,
            expect100: bool=False,
            raise_for_status: Optional[bool]=None,
            read_until_eof: bool=True,
            proxy: Optional[StrOrURL]=None,
            proxy_auth: Optional[BasicAuth]=None,
            timeout: Union[ClientTimeout, object]=sentinel,
            verify_ssl: Optional[bool]=None,
            fingerprint: Optional[bytes]=None,
            ssl_context: Optional[SSLContext]=None,
            ssl: Optional[Union[SSLContext, bool, Fingerprint]]=None,
            proxy_headers: Optional[LooseHeaders]=None,
            trace_request_ctx: Optional[SimpleNamespace]=None
    ) -> ClientResponse:

        # NOTE: timeout clamps existing connect and read timeouts.  We cannot
        # set the default to None because we need to detect if the user wants
        # to use the existing timeouts by setting timeout to None.
Loading ...