# multipart.py

import base64
import binascii
import json
import re
import uuid
import warnings
import zlib
from collections import deque
from types import TracebackType
from typing import (  # noqa
    TYPE_CHECKING,
    Any,
    Dict,
    Iterator,
    List,
    Mapping,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
    cast,
)
from urllib.parse import parse_qsl, unquote, urlencode

from multidict import CIMultiDict, CIMultiDictProxy, MultiMapping  # noqa

from .hdrs import (
    CONTENT_DISPOSITION,
    CONTENT_ENCODING,
    CONTENT_LENGTH,
    CONTENT_TRANSFER_ENCODING,
    CONTENT_TYPE,
)
from .helpers import CHAR, TOKEN, parse_mimetype, reify
from .http import HeadersParser
from .payload import (
    JsonPayload,
    LookupError,
    Order,
    Payload,
    StringPayload,
    get_payload,
    payload_type,
)
from .streams import StreamReader

__all__ = ('MultipartReader', 'MultipartWriter', 'BodyPartReader',
           'BadContentDispositionHeader', 'BadContentDispositionParam',
           'parse_content_disposition', 'content_disposition_filename')


if TYPE_CHECKING:  # pragma: no cover
    from .client_reqrep import ClientResponse  # noqa


class BadContentDispositionHeader(RuntimeWarning):
    pass


class BadContentDispositionParam(RuntimeWarning):
    pass


def parse_content_disposition(header: Optional[str]) -> Tuple[Optional[str],
                                                              Dict[str, str]]:

    def is_token(string: str) -> bool:
        return bool(string) and TOKEN >= set(string)

    def is_quoted(string: str) -> bool:
        return string[0] == string[-1] == '"'

    def is_rfc5987(string: str) -> bool:
        return is_token(string) and string.count("'") == 2

    def is_extended_param(string: str) -> bool:
        return string.endswith('*')

    def is_continuous_param(string: str) -> bool:
        pos = string.find('*') + 1
        if not pos:
            return False
        substring = string[pos:-1] if string.endswith('*') else string[pos:]
        return substring.isdigit()

    def unescape(text: str, *,
                 chars: str=''.join(map(re.escape, CHAR))) -> str:
        return re.sub('\\\\([{}])'.format(chars), '\\1', text)

    if not header:
        return None, {}

    disptype, *parts = header.split(';')
    if not is_token(disptype):
        warnings.warn(BadContentDispositionHeader(header))
        return None, {}

    params = {}  # type: Dict[str, str]
    while parts:
        item = parts.pop(0)

        if '=' not in item:
            warnings.warn(BadContentDispositionHeader(header))
            return None, {}

        key, value = item.split('=', 1)
        key = key.lower().strip()
        value = value.lstrip()

        if key in params:
            warnings.warn(BadContentDispositionHeader(header))
            return None, {}

        if not is_token(key):
            warnings.warn(BadContentDispositionParam(item))
            continue

        elif is_continuous_param(key):
            if is_quoted(value):
                value = unescape(value[1:-1])
            elif not is_token(value):
                warnings.warn(BadContentDispositionParam(item))
                continue

        elif is_extended_param(key):
            if is_rfc5987(value):
                encoding, _, value = value.split("'", 2)
                encoding = encoding or 'utf-8'
            else:
                warnings.warn(BadContentDispositionParam(item))
                continue

            try:
                value = unquote(value, encoding, 'strict')
            except UnicodeDecodeError:  # pragma: nocover
                warnings.warn(BadContentDispositionParam(item))
                continue

        else:
            failed = True
            if is_quoted(value):
                failed = False
                value = unescape(value[1:-1].lstrip('\\/'))
            elif is_token(value):
                failed = False
            elif parts:
                # the value may contain a bare ';' (e.g. inside a filename);
                # this handles only that one case, a proper fix would require
                # redesigning the parser
                _value = '%s;%s' % (value, parts[0])
                if is_quoted(_value):
                    parts.pop(0)
                    value = unescape(_value[1:-1].lstrip('\\/'))
                    failed = False

            if failed:
                warnings.warn(BadContentDispositionHeader(header))
                return None, {}

        params[key] = value

    return disptype.lower(), params
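

# Illustrative usage sketch, not part of the original module; the sample
# header value below is an assumption chosen for demonstration.  It shows how
# parse_content_disposition() splits a Content-Disposition header into a
# disposition type and a parameter mapping, decoding an RFC 5987 extended
# parameter along the way.
def _example_parse_content_disposition() -> None:
    disptype, params = parse_content_disposition(
        'attachment; filename="report.pdf"; filename*=UTF-8\'\'report.pdf')
    assert disptype == 'attachment'
    assert params['filename'] == 'report.pdf'
    assert params['filename*'] == 'report.pdf'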


def content_disposition_filename(params: Mapping[str, str],
                                 name: str='filename') -> Optional[str]:
    name_suf = '%s*' % name
    if not params:
        return None
    elif name_suf in params:
        return params[name_suf]
    elif name in params:
        return params[name]
    else:
        parts = []
        fnparams = sorted((key, value)
                          for key, value in params.items()
                          if key.startswith(name_suf))
        for num, (key, value) in enumerate(fnparams):
            _, tail = key.split('*', 1)
            if tail.endswith('*'):
                tail = tail[:-1]
            if tail == str(num):
                parts.append(value)
            else:
                break
        if not parts:
            return None
        value = ''.join(parts)
        if "'" in value:
            encoding, _, value = value.split("'", 2)
            encoding = encoding or 'utf-8'
            return unquote(value, encoding, 'strict')
        return value
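

# Illustrative usage sketch, not part of the original module; the parameter
# values are assumptions.  It shows content_disposition_filename() preferring
# the extended 'filename*' form when present, and reassembling RFC 2231
# continuation segments ('filename*0*', 'filename*1*', ...) before
# percent-decoding the joined value.
def _example_content_disposition_filename() -> None:
    # The extended parameter wins over the plain one.
    assert content_disposition_filename(
        {'filename': 'fallback.bin', 'filename*': 'report.pdf'},
    ) == 'report.pdf'
    # Continuation segments are joined in order and then decoded.
    params = {
        'filename*0*': "UTF-8''weather%20",
        'filename*1*': 'report.pdf',
    }
    assert content_disposition_filename(params) == 'weather report.pdf'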


class MultipartResponseWrapper:
    """Wrapper around the MultipartBodyReader.

    It takes care about
    underlying connection and close it when it needs in.
    """

    def __init__(self, resp: 'ClientResponse', stream: Any) -> None:
        # TODO: add strong annotation to stream
        self.resp = resp
        self.stream = stream

    def __aiter__(self) -> 'MultipartResponseWrapper':
        return self

    async def __anext__(self) -> Any:
        part = await self.next()
        if part is None:
            raise StopAsyncIteration  # NOQA
        return part

    def at_eof(self) -> bool:
        """Returns True when all response data had been read."""
        return self.resp.content.at_eof()

    async def next(self) -> Any:
        """Emits next multipart reader object."""
        item = await self.stream.next()
        if self.stream.at_eof():
            await self.release()
        return item

    async def release(self) -> None:
        """Releases the connection gracefully, reading all the content
        to the void."""
        await self.resp.release()
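

# Illustrative usage sketch, not part of the original module.  It assumes a
# MultipartResponseWrapper has already been obtained for a multipart response
# and that each emitted item is a BodyPartReader; it simply drains the
# response part by part via the async-iteration protocol defined above
# (__aiter__/__anext__ delegate to next()).
async def _example_drain_wrapper(wrapper: MultipartResponseWrapper) -> int:
    parts = 0
    async for part in wrapper:
        await part.read()  # assumption: the emitted item is a BodyPartReader
        parts += 1
    # At this point the underlying connection has been released, because
    # next() calls release() once the wrapped stream reaches EOF.
    return parts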


class BodyPartReader:
    """Multipart reader for single body part."""

    chunk_size = 8192

    def __init__(self, boundary: bytes,
                 headers: Mapping[str, Optional[str]],
                 content: StreamReader) -> None:
        self.headers = headers
        self._boundary = boundary
        self._content = content
        self._at_eof = False
        length = self.headers.get(CONTENT_LENGTH, None)
        self._length = int(length) if length is not None else None
        self._read_bytes = 0
        # TODO: typing.Deque is not supported by Python 3.5
        self._unread = deque()  # type: Any
        self._prev_chunk = None  # type: Optional[bytes]
        self._content_eof = 0
        self._cache = {}  # type: Dict[str, Any]

    def __aiter__(self) -> 'BodyPartReader':
        return self

    async def __anext__(self) -> Any:
        part = await self.next()
        if part is None:
            raise StopAsyncIteration  # NOQA
        return part

    async def next(self) -> Any:
        item = await self.read()
        if not item:
            return None
        return item

    async def read(self, *, decode: bool=False) -> Any:
        """Reads body part data.

        decode: Decodes the data according to the encoding specified by the
                Content-Encoding header. If the header is missing, the data
                is returned untouched.
        """
        if self._at_eof:
            return b''
        data = bytearray()
        while not self._at_eof:
            data.extend((await self.read_chunk(self.chunk_size)))
        if decode:
            return self.decode(data)
        return data
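
    # Usage sketch (illustrative, not part of the original module).  A part is
    # consumed once, either raw or decoded via self.decode():
    #
    #     raw = await part.read()              # raw body bytes
    #     # ...or, on a fresh part...
    #     data = await part.read(decode=True)  # decoded per Content-Encoding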

    async def read_chunk(self, size: int=chunk_size) -> bytes:
        """Reads body part content chunk of the specified size.

        size: chunk size
        """
        if self._at_eof:
            return b''
        if self._length:
            chunk = await self._read_chunk_from_length(size)
        else:
            chunk = await self._read_chunk_from_stream(size)

        self._read_bytes += len(chunk)
        if self._read_bytes == self._length:
            self._at_eof = True
        if self._at_eof:
            crlf = await self._content.readline()
            assert b'\r\n' == crlf, \
                'reader did not read all the data or it is malformed'
        return chunk
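
    # Usage sketch (illustrative, not part of the original module): stream a
    # large part in bounded chunks instead of buffering it whole with read().
    # read_chunk() returns b'' once the part is exhausted, so a plain loop
    # suffices (the file path and blocking open() are assumptions kept for
    # brevity):
    #
    #     async def save_part(part: 'BodyPartReader', path: str) -> None:
    #         with open(path, 'wb') as f:
    #             chunk = await part.read_chunk(8192)
    #             while chunk:
    #                 f.write(chunk)
    #                 chunk = await part.read_chunk(8192)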

    async def _read_chunk_from_length(self, size: int) -> bytes:
        # Reads a body part content chunk of the specified size.
        # The body part must have a Content-Length header with a proper value.
        assert self._length is not None, \
            'Content-Length required for chunked read'
        chunk_size = min(size, self._length - self._read_bytes)
        chunk = await self._content.read(chunk_size)
        return chunk

    async def _read_chunk_from_stream(self, size: int) -> bytes:
        # Reads a content chunk of a body part with unknown length.
        # The Content-Length header for the body part is not required.
        assert size >= len(self._boundary) + 2, \
            'Chunk size must be greater than or equal to boundary length + 2'
        first_chunk = self._prev_chunk is None
        if first_chunk:
            self._prev_chunk = await self._content.read(size)

        chunk = await self._content.read(size)
        self._content_eof += int(self._content.at_eof())
        assert self._content_eof < 3, "Reading after EOF"
        assert self._prev_chunk is not None
        window = self._prev_chunk + chunk
        sub = b'\r\n' + self._boundary
        if first_chunk:
            idx = window.find(sub)
        else:
            idx = window.find(sub, max(0, len(self._prev_chunk) - len(sub)))
        if idx >= 0:
            # pushing boundary back to content
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore",
                                        category=DeprecationWarning)
                self._content.unread_data(window[idx:])
            if size > idx:
                self._prev_chunk = self._prev_chunk[:idx]
            chunk = window[len(self._prev_chunk):idx]
            if not chunk:
                self._at_eof = True
        result = self._prev_chunk
        self._prev_chunk = chunk
        return result