import base64
import binascii
import json
import re
import uuid
import warnings
import zlib
from collections import deque
from types import TracebackType
from typing import ( # noqa
TYPE_CHECKING,
Any,
Dict,
Iterator,
List,
Mapping,
Optional,
Sequence,
Tuple,
Type,
Union,
cast,
)
from urllib.parse import parse_qsl, unquote, urlencode
from multidict import CIMultiDict, CIMultiDictProxy, MultiMapping # noqa
from .hdrs import (
CONTENT_DISPOSITION,
CONTENT_ENCODING,
CONTENT_LENGTH,
CONTENT_TRANSFER_ENCODING,
CONTENT_TYPE,
)
from .helpers import CHAR, TOKEN, parse_mimetype, reify
from .http import HeadersParser
from .payload import (
JsonPayload,
LookupError,
Order,
Payload,
StringPayload,
get_payload,
payload_type,
)
from .streams import StreamReader
# Names exported by a star-import of this module.
__all__ = ('MultipartReader', 'MultipartWriter', 'BodyPartReader',
'BadContentDispositionHeader', 'BadContentDispositionParam',
'parse_content_disposition', 'content_disposition_filename')
if TYPE_CHECKING: # pragma: no cover
from .client_reqrep import ClientResponse # noqa
class BadContentDispositionHeader(RuntimeWarning):
    """Warning issued when a whole Content-Disposition header is rejected."""
class BadContentDispositionParam(RuntimeWarning):
    """Warning issued when one Content-Disposition parameter is skipped."""
def parse_content_disposition(header: Optional[str]) -> Tuple[Optional[str],
                                                              Dict[str, str]]:
    """Parse a Content-Disposition header value.

    Returns a ``(disposition_type, params)`` tuple. The disposition type
    is lower-cased; parameter names are lower-cased and stripped.

    ``(None, {})`` is returned when *header* is empty, and also when the
    header is rejected as a whole (non-token disposition type, a piece
    without ``=``, a duplicated parameter, or a regular parameter whose
    value is neither a token nor a quoted string).  In the latter cases a
    ``BadContentDispositionHeader`` warning is issued.

    A single parameter that cannot be interpreted (non-token name, bad
    continuation value, bad or undecodable extended value) is skipped
    with a ``BadContentDispositionParam`` warning.
    """

    def is_token(string: str) -> bool:
        return bool(string) and TOKEN >= set(string)

    def is_quoted(string: str) -> bool:
        # A quoted-string needs both an opening and a closing quote.
        # Requiring two characters keeps an empty value from raising
        # IndexError and keeps a lone '"' (e.g. the first half of
        # '";"' after splitting on ';') from being taken as an empty
        # quoted string.
        return len(string) >= 2 and string[0] == string[-1] == '"'

    def is_rfc5987(string: str) -> bool:
        return is_token(string) and string.count("'") == 2

    def is_extended_param(string: str) -> bool:
        return string.endswith('*')

    def is_continuous_param(string: str) -> bool:
        # Matches ``name*N`` and ``name*N*`` where N is a number.
        pos = string.find('*') + 1
        if not pos:
            return False
        substring = string[pos:-1] if string.endswith('*') else string[pos:]
        return substring.isdigit()

    def unescape(text: str, *,
                 chars: str=''.join(map(re.escape, CHAR))) -> str:
        # Drop the backslash of every backslash-escaped CHAR.
        return re.sub('\\\\([{}])'.format(chars), '\\1', text)

    if not header:
        return None, {}

    disptype, *parts = header.split(';')
    if not is_token(disptype):
        warnings.warn(BadContentDispositionHeader(header))
        return None, {}

    params = {}  # type: Dict[str, str]
    while parts:
        item = parts.pop(0)

        if '=' not in item:
            warnings.warn(BadContentDispositionHeader(header))
            return None, {}

        key, value = item.split('=', 1)
        key = key.lower().strip()
        value = value.lstrip()

        if key in params:
            warnings.warn(BadContentDispositionHeader(header))
            return None, {}

        if not is_token(key):
            warnings.warn(BadContentDispositionParam(item))
            continue

        elif is_continuous_param(key):
            if is_quoted(value):
                value = unescape(value[1:-1])
            elif not is_token(value):
                warnings.warn(BadContentDispositionParam(item))
                continue

        elif is_extended_param(key):
            if is_rfc5987(value):
                encoding, _, value = value.split("'", 2)
                encoding = encoding or 'utf-8'
            else:
                warnings.warn(BadContentDispositionParam(item))
                continue

            try:
                value = unquote(value, encoding, 'strict')
            except UnicodeDecodeError:  # pragma: nocover
                warnings.warn(BadContentDispositionParam(item))
                continue

        else:
            failed = True
            if is_quoted(value):
                failed = False
                value = unescape(value[1:-1].lstrip('\\/'))
            elif is_token(value):
                failed = False
            elif parts:
                # maybe just ; in filename, in any case this is just
                # one case fix, for proper fix we need to redesign parser
                _value = '%s;%s' % (value, parts[0])
                if is_quoted(_value):
                    parts.pop(0)
                    value = unescape(_value[1:-1].lstrip('\\/'))
                    failed = False

            if failed:
                warnings.warn(BadContentDispositionHeader(header))
                return None, {}

        params[key] = value

    return disptype.lower(), params
def content_disposition_filename(params: Mapping[str, str],
                                 name: str='filename') -> Optional[str]:
    """Pick the value of *name* out of parsed Content-Disposition params.

    Lookup order: the extended parameter ``<name>*``, then the plain
    ``<name>``, then continuation segments ``<name>*0``, ``<name>*1``, ...
    (each optionally suffixed with ``*``), which are concatenated in
    numeric order up to the first gap.

    When the concatenated value contains at least two apostrophes it is
    treated as ``charset'language'percent-encoded-text`` and decoded
    strictly with that charset (utf-8 when the charset is empty).

    Returns ``None`` when *params* is empty or nothing matches.
    """
    name_suf = '%s*' % name
    if not params:
        return None
    if name_suf in params:
        return params[name_suf]
    if name in params:
        return params[name]

    # Collect the continuation segments together with their numeric
    # index.  Sorting the raw keys as strings would put ``name*10``
    # before ``name*2`` and truncate values with more than ten segments.
    segments = []
    prefix_len = len(name_suf)
    for key, value in params.items():
        if not key.startswith(name_suf):
            continue
        tail = key[prefix_len:]
        if tail.endswith('*'):
            tail = tail[:-1]
        if tail.isdecimal():
            segments.append((int(tail), key, tail, value))
    segments.sort(key=lambda seg: (seg[0], seg[1]))

    parts = []
    for num, (_, _, tail, value) in enumerate(segments):
        # Stop at the first missing, duplicated or non-canonical index.
        if tail != str(num):
            break
        parts.append(value)

    if not parts:
        return None

    value = ''.join(parts)
    # Two apostrophes are needed for the charset'lang'text form; a value
    # with a single apostrophe (e.g. "it's") is returned untouched
    # instead of failing on the three-way unpacking below.
    if value.count("'") >= 2:
        encoding, _, value = value.split("'", 2)
        encoding = encoding or 'utf-8'
        return unquote(value, encoding, 'strict')
    return value
class MultipartResponseWrapper:
    """Pairs a multipart reader with the response it was created from.

    Iterating the wrapper yields the items produced by the wrapped
    reader; as soon as that reader reports EOF the response is released.
    """

    def __init__(self, resp: 'ClientResponse', stream: Any) -> None:
        # TODO: add strong annotation to stream
        self.resp = resp
        self.stream = stream

    def __aiter__(self) -> 'MultipartResponseWrapper':
        return self

    async def __anext__(self) -> Any:
        item = await self.next()
        if item is not None:
            return item
        raise StopAsyncIteration  # NOQA

    def at_eof(self) -> bool:
        """Tell whether the response content has reached EOF."""
        content = self.resp.content
        return content.at_eof()

    async def next(self) -> Any:
        """Return the next item of the wrapped reader.

        The response is released once the wrapped reader is at EOF.
        """
        item = await self.stream.next()
        reached_end = self.stream.at_eof()
        if reached_end:
            await self.release()
        return item

    async def release(self) -> None:
        """Release the underlying response by awaiting ``resp.release()``."""
        await self.resp.release()
class BodyPartReader:
"""Multipart reader for single body part."""
chunk_size = 8192
def __init__(self, boundary: bytes,
             headers: Mapping[str, Optional[str]],
             content: StreamReader) -> None:
    """Store the part's boundary, headers and content stream.

    The Content-Length header, when present, is converted to ``int``
    and kept as the expected body length; otherwise the length is
    ``None``.
    """
    self.headers = headers
    self._boundary = boundary
    self._content = content
    self._at_eof = False
    declared = self.headers.get(CONTENT_LENGTH, None)
    if declared is None:
        self._length = None
    else:
        self._length = int(declared)
    # Number of body bytes returned by read_chunk() so far.
    self._read_bytes = 0
    # TODO: typing.Deque is not supported by Python 3.5
    self._unread = deque()  # type: Any
    self._prev_chunk = None  # type: Optional[bytes]
    self._content_eof = 0
    self._cache = {}  # type: Dict[str, Any]
def __aiter__(self) -> 'BodyPartReader':
return self
async def __anext__(self) -> Any:
part = await self.next()
if part is None:
raise StopAsyncIteration # NOQA
return part
async def next(self) -> Any:
item = await self.read()
if not item:
return None
return item
async def read(self, *, decode: bool=False) -> Any:
"""Reads body part data.
decode: Decodes data following by encoding
method from Content-Encoding header. If it missed
data remains untouched
"""
if self._at_eof:
return b''
data = bytearray()
while not self._at_eof:
data.extend((await self.read_chunk(self.chunk_size)))
if decode:
return self.decode(data)
return data
async def read_chunk(self, size: int=chunk_size) -> bytes:
"""Reads body part content chunk of the specified size.
size: chunk size
"""
if self._at_eof:
return b''
if self._length:
chunk = await self._read_chunk_from_length(size)
else:
chunk = await self._read_chunk_from_stream(size)
self._read_bytes += len(chunk)
if self._read_bytes == self._length:
self._at_eof = True
if self._at_eof:
clrf = await self._content.readline()
assert b'\r\n' == clrf, \
'reader did not read all the data or it is malformed'
return chunk
async def _read_chunk_from_length(self, size: int) -> bytes:
# Reads body part content chunk of the specified size.
# The body part must has Content-Length header with proper value.
assert self._length is not None, \
'Content-Length required for chunked read'
chunk_size = min(size, self._length - self._read_bytes)
chunk = await self._content.read(chunk_size)
return chunk
async def _read_chunk_from_stream(self, size: int) -> bytes:
# Reads content chunk of body part with unknown length.
# The Content-Length header for body part is not necessary.
assert size >= len(self._boundary) + 2, \
'Chunk size must be greater or equal than boundary length + 2'
first_chunk = self._prev_chunk is None
if first_chunk:
self._prev_chunk = await self._content.read(size)
chunk = await self._content.read(size)
self._content_eof += int(self._content.at_eof())
assert self._content_eof < 3, "Reading after EOF"
assert self._prev_chunk is not None
window = self._prev_chunk + chunk
sub = b'\r\n' + self._boundary
if first_chunk:
idx = window.find(sub)
else:
idx = window.find(sub, max(0, len(self._prev_chunk) - len(sub)))
if idx >= 0:
# pushing boundary back to content
with warnings.catch_warnings():
warnings.filterwarnings("ignore",
category=DeprecationWarning)
self._content.unread_data(window[idx:])
if size > idx:
self._prev_chunk = self._prev_chunk[:idx]
chunk = window[len(self._prev_chunk):idx]
if not chunk:
Loading ...