import asyncio
import datetime
import io
import re
import socket
import string
import tempfile
import types
import warnings
from email.utils import parsedate
from http.cookies import SimpleCookie
from types import MappingProxyType
from typing import ( # noqa
TYPE_CHECKING,
Any,
Dict,
Iterator,
Mapping,
MutableMapping,
Optional,
Tuple,
Union,
cast,
)
from urllib.parse import parse_qsl
import attr
from multidict import CIMultiDict, CIMultiDictProxy, MultiDict, MultiDictProxy
from yarl import URL
from . import hdrs
from .abc import AbstractStreamWriter
from .helpers import DEBUG, ChainMapProxy, HeadersMixin, reify, sentinel
from .http_parser import RawRequestMessage
from .multipart import MultipartReader
from .streams import EmptyStreamReader, StreamReader
from .typedefs import (
DEFAULT_JSON_DECODER,
JSONDecoder,
LooseHeaders,
RawHeaders,
StrOrURL,
)
from .web_exceptions import HTTPRequestEntityTooLarge
from .web_response import StreamResponse
__all__ = ('BaseRequest', 'FileField', 'Request')
if TYPE_CHECKING: # pragma: no cover
from .web_app import Application # noqa
from .web_urldispatcher import UrlMappingMatchInfo # noqa
from .web_protocol import RequestHandler # noqa
# Immutable attrs record (frozen=True, slots=True) with five fields:
# name, filename, file, content_type and headers.
# NOTE(review): presumably describes one uploaded file of a parsed form
# body -- the code that creates instances is outside this excerpt; confirm.
@attr.s(frozen=True, slots=True)
class FileField:
# NOTE(review): likely the form field name -- confirm against the caller.
name = attr.ib(type=str)
# NOTE(review): likely the file name supplied by the client -- confirm.
filename = attr.ib(type=str)
# File object holding the content; declared as io.BufferedReader.
file = attr.ib(type=io.BufferedReader)
# NOTE(review): likely the Content-Type of the uploaded part -- confirm.
content_type = attr.ib(type=str)
# Headers, declared as a multidict CIMultiDictProxy keyed by str.
headers = attr.ib(type=CIMultiDictProxy) # type: CIMultiDictProxy[str]
# Regular-expression building blocks used to parse the Forwarded header.
# The names follow the HTTP ABNF rules (token / tchar / qdtext /
# quoted-pair / quoted-string).

# Characters allowed in a token.
_TCHAR = string.digits + string.ascii_letters + r"!#$%&'*+.^_`|~-"
# '-' at the end to prevent interpretation as range in a char class

_TOKEN = r'[{tchar}]+'.format(tchar=_TCHAR)

# Characters allowed unescaped inside a quoted-string: HTAB, SP, '!' and
# 0x23-0x7E (i.e. everything printable except the double quote).
_QDTEXT = r'[{}]'.format(
    r''.join(chr(c) for c in (0x09, 0x20, 0x21) + tuple(range(0x23, 0x7F))))
# qdtext includes 0x5C to escape 0x5D ('\]'): inside the generated character
# class the backslash is immediately followed by ']' and therefore escapes it
# qdtext excludes obs-text (because obsoleted, and encoding not specified)

# A backslash followed by HTAB, SP or any printable ASCII character.
_QUOTED_PAIR = r'\\[\t !-~]'

_QUOTED_STRING = r'"(?:{quoted_pair}|{qdtext})*"'.format(
    qdtext=_QDTEXT, quoted_pair=_QUOTED_PAIR)

# One "name=value" pair with an optional unquoted ":port" suffix.
# Groups: (1) name, (2) value (token or quoted-string), (3) ":port" or None.
# A port has up to five digits (RFC 7239: port = 1*5DIGIT); allowing only
# four would silently truncate ports >= 10000 such as ":47011".
_FORWARDED_PAIR = (
    r'({token})=({token}|{quoted_string})(:\d{{1,5}})?'.format(
        token=_TOKEN,
        quoted_string=_QUOTED_STRING))

_QUOTED_PAIR_REPLACE_RE = re.compile(r'\\([\t !-~])')
# same pattern as _QUOTED_PAIR but contains a capture group

_FORWARDED_PAIR_RE = re.compile(_FORWARDED_PAIR)
############################################################
# HTTP Request
############################################################
class BaseRequest(MutableMapping[str, Any], HeadersMixin):
# HTTP methods grouped as "POST methods"; the code consuming this set is
# outside the visible part of the file.
POST_METHODS = {
    hdrs.METH_PATCH,
    hdrs.METH_POST,
    hdrs.METH_PUT,
    hdrs.METH_TRACE,
    hdrs.METH_DELETE,
}

# HeadersMixin.ATTRS extended with the name of every private instance
# attribute that __init__ assigns.
ATTRS = HeadersMixin.ATTRS | frozenset([
    '_message',
    '_protocol',
    '_payload_writer',
    '_payload',
    '_headers',
    '_method',
    '_version',
    '_rel_url',
    '_post',
    '_read_bytes',
    '_state',
    '_cache',
    '_task',
    '_client_max_size',
    '_loop',
    '_transport_sslcontext',
    '_transport_peername',
])
def __init__(self, message: RawRequestMessage,
             payload: StreamReader, protocol: 'RequestHandler',
             payload_writer: AbstractStreamWriter,
             task: 'asyncio.Task[None]',
             loop: asyncio.AbstractEventLoop,
             *, client_max_size: int=1024**2,
             state: Optional[Dict[str, Any]]=None,
             scheme: Optional[str]=None,
             host: Optional[str]=None,
             remote: Optional[str]=None) -> None:
    """Store the parsed message and the objects serving this request.

    :param message: parsed request message; its ``headers``, ``method``,
        ``version`` and ``url`` are copied onto the instance.
    :param payload: stream the request body is read from.
    :param protocol: request handler; its ``transport`` must not be None.
    :param payload_writer: writer exposed through the ``writer`` property.
    :param task: task exposed through the ``task`` property.
    :param loop: event loop exposed through the ``loop`` property.
    :param client_max_size: stored as ``_client_max_size``
        (default ``1024**2``).
    :param state: dict backing the MutableMapping API; a new empty dict
        is used when omitted.
    :param scheme: when not None, stored in the cache under ``'scheme'``.
    :param host: when not None, stored in the cache under ``'host'``.
    :param remote: when not None, stored in the cache under ``'remote'``.
    """
    self._message = message
    self._protocol = protocol
    self._payload_writer = payload_writer
    self._payload = payload

    # Frequently used message fields are mirrored on the instance.
    self._headers = message.headers
    self._method = message.method
    self._version = message.version
    self._rel_url = message.url

    self._post = None  # type: Optional[MultiDictProxy[Union[str, bytes, FileField]]]  # noqa
    self._read_bytes = None  # type: Optional[bytes]

    self._state = {} if state is None else state
    self._cache = {}  # type: Dict[str, Any]
    self._task = task
    self._client_max_size = client_max_size
    self._loop = loop

    # Transport details are captured once, at construction time.
    transport = self._protocol.transport
    assert transport is not None
    self._transport_sslcontext = transport.get_extra_info('sslcontext')
    self._transport_peername = transport.get_extra_info('peername')

    # Explicit overrides are placed straight into the cache.
    overrides = (('scheme', scheme), ('host', host), ('remote', remote))
    for cache_key, override in overrides:
        if override is not None:
            self._cache[cache_key] = override
def clone(self, *, method: str=sentinel, rel_url: StrOrURL=sentinel,
          headers: LooseHeaders=sentinel, scheme: str=sentinel,
          host: str=sentinel,
          remote: str=sentinel) -> 'BaseRequest':
    """Return a copy of the request with some attributes replaced.

    Creates and returns a new instance of the same class. Every parameter
    that is not passed keeps the value of the current request, so calling
    clone() without arguments yields an equivalent copy. The copy shares
    payload, protocol, writer, task and loop with the original and gets a
    shallow copy of the state dict.

    :param method: replacement HTTP method.
    :param rel_url: replacement URL; also replaces the message ``path``.
    :param headers: replacement headers; anything accepted by
        ``CIMultiDict`` (a mapping or an iterable of pairs). The data is
        copied.
    :param scheme: override forwarded to the constructor.
    :param host: override forwarded to the constructor.
    :param remote: override forwarded to the constructor.
    :raises RuntimeError: if (non-empty) body content was already read.
    """
    # NOTE: a truthiness test, so a body that was read but is empty
    # (b'') does not prevent cloning.
    if self._read_bytes:
        raise RuntimeError("Cannot clone request "
                           "after reading its content")

    dct = {}  # type: Dict[str, Any]
    if method is not sentinel:
        dct['method'] = method
    if rel_url is not sentinel:
        new_url = URL(rel_url)
        dct['url'] = new_url
        dct['path'] = str(new_url)
    if headers is not sentinel:
        # a copy semantic
        new_headers = CIMultiDictProxy(CIMultiDict(headers))
        dct['headers'] = new_headers
        # Derive the raw headers from the normalised copy rather than
        # from the caller's object: this works for every input that
        # CIMultiDict accepts (not only objects with .items()) and keeps
        # 'headers' and 'raw_headers' consistent with each other.
        dct['raw_headers'] = tuple(
            (k.encode('utf-8'), v.encode('utf-8'))
            for k, v in new_headers.items())

    message = self._message._replace(**dct)

    # Only explicitly passed overrides are forwarded to the constructor.
    kwargs = {}  # type: Dict[str, str]
    if scheme is not sentinel:
        kwargs['scheme'] = scheme
    if host is not sentinel:
        kwargs['host'] = host
    if remote is not sentinel:
        kwargs['remote'] = remote

    return self.__class__(
        message,
        self._payload,
        self._protocol,
        self._payload_writer,
        self._task,
        self._loop,
        client_max_size=self._client_max_size,
        state=self._state.copy(),
        **kwargs)
@property
def task(self) -> 'asyncio.Task[None]':
    """The task object passed to the constructor."""
    current_task = self._task
    return current_task
@property
def protocol(self) -> 'RequestHandler':
    """The request handler (protocol) passed to the constructor."""
    handler = self._protocol
    return handler
@property
def transport(self) -> Optional[asyncio.Transport]:
    """Transport of the underlying protocol.

    Returns None when the request no longer has a protocol; otherwise
    whatever the protocol currently reports as its transport.
    """
    handler = self._protocol
    return None if handler is None else handler.transport
@property
def writer(self) -> AbstractStreamWriter:
    """The payload writer passed to the constructor."""
    payload_writer = self._payload_writer
    return payload_writer
@reify
def message(self) -> RawRequestMessage:
    """Deprecated access to the raw parsed request message.

    Emits a DeprecationWarning before returning the message.
    """
    warnings.warn(
        "Request.message is deprecated", DeprecationWarning, stacklevel=3)
    return self._message
@reify
def rel_url(self) -> URL:
    """The URL taken from the request message (``message.url``)."""
    url = self._rel_url
    return url
@reify
def loop(self) -> asyncio.AbstractEventLoop:
    """Deprecated access to the event loop passed to the constructor.

    Emits a DeprecationWarning before returning the loop.
    """
    warnings.warn(
        "request.loop property is deprecated",
        DeprecationWarning,
        stacklevel=2)
    return self._loop
# MutableMapping API
def __getitem__(self, key: str) -> Any:
    """Return the value stored under *key* in the request state.

    Raises KeyError when the key is absent.
    """
    state = self._state
    return state[key]
def __setitem__(self, key: str, value: Any) -> None:
    """Store *value* under *key* in the request state."""
    state = self._state
    state[key] = value
def __delitem__(self, key: str) -> None:
    """Remove *key* from the request state.

    Raises KeyError when the key is absent.
    """
    state = self._state
    del state[key]
def __len__(self) -> int:
    """Number of keys stored in the request state."""
    state = self._state
    return len(state)
def __iter__(self) -> Iterator[str]:
    """Iterate over the keys stored in the request state."""
    state = self._state
    return iter(state)
########
@reify
def secure(self) -> bool:
    """True when the request scheme is 'https', False otherwise."""
    current_scheme = self.scheme
    return current_scheme == 'https'
@reify
def forwarded(self) -> Tuple[Mapping[str, str], ...]:
    """Parsed contents of all Forwarded headers, as a tuple of mappings.

    A best-effort parser for the Forwarded header (RFC 7239):

    - A new read-only mapping is started for every Forwarded header value
      and again at every comma inside a value, i.e. one mapping per
      forwarded-element (per proxy). Elements keep the order in which
      they appear in the headers.
    - Every value must be a syntactically valid 'token' or
      'quoted-string'; an optional unquoted ':port' suffix is appended
      to the value.
    - Parameter names are lower-cased; quoted values have their quotes
      removed and escape sequences un-escaped.
    - On a syntax error the rest of the current forwarded-element is
      skipped (up to the next comma); pairs parsed before the error are
      kept.
    - The contents of 'by', 'for', 'host' and 'proto' are NOT validated.

    Returns a tuple of zero or more immutable mappings; the tuple is
    empty when the request carries no Forwarded header.
    """
    parsed = []
    for header in self._message.headers.getall(hdrs.FORWARDED, ()):
        end = len(header)
        idx = 0
        expect_separator = False
        current = {}  # type: Dict[str, str]
        parsed.append(types.MappingProxyType(current))

        # str.find() yields -1 when no further comma exists, which ends
        # the loop through the ``0 <= idx`` part of the condition.
        while 0 <= idx < end:
            pair = _FORWARDED_PAIR_RE.match(header, idx)

            if pair is None:
                char = header[idx]
                if char == ',':  # next forwarded-element
                    expect_separator = False
                    current = {}
                    parsed.append(types.MappingProxyType(current))
                    idx += 1
                elif char == ';':  # next forwarded-pair
                    expect_separator = False
                    idx += 1
                elif char in ' \t':
                    # Allow whitespace even between forwarded-pairs,
                    # though RFC 7239 doesn't. This simplifies code and
                    # is in line with Postel's law.
                    idx += 1
                else:
                    # bad syntax here, skip to next comma
                    idx = header.find(',', idx)
                continue

            if expect_separator:
                # A pair directly after another pair without ';' or ',':
                # bad syntax here, skip to next comma
                idx = header.find(',', idx)
                continue

            # got a valid forwarded-pair
            name, value, port = pair.groups()
            if value[0] == '"':
                # quoted string: remove quotes and unescape
                value = _QUOTED_PAIR_REPLACE_RE.sub(r'\1', value[1:-1])
            if port:
                value += port
            current[name.lower()] = value
            idx = pair.end()
            expect_separator = True

    return tuple(parsed)
@reify
def scheme(self) -> str:
    """A string representing the scheme of the request.

    Computed from the type of connection to the peer: 'https' when the
    transport reported an SSL context at construction time, 'http'
    otherwise.

    NOTE(review): a scheme passed to the constructor (for example through
    .clone(scheme=new_scheme)) is stored in the cache under 'scheme';
    presumably the reify descriptor returns that cached value instead of
    calling this function -- confirm in helpers.reify.
    """
    return 'https' if self._transport_sslcontext else 'http'
@reify
def method(self) -> str:
    """Read-only HTTP method of the request.

    The value is copied from the request message at construction time and
    is expected to be an upper-cased str like 'GET', 'POST', 'PUT' etc.
    """
    http_method = self._method
    return http_method
@reify
def version(self) -> Tuple[int, int]:
Loading ...