Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

tundish / aiohttp   python

Repository URL to install this package:

/ aiohttp / multidict.py

"""Multidict implementation.

HTTP Headers and URL query string require specific data structure:
multidict. It behaves mostly like a dict but it can have
several values for the same key.
"""


from collections import abc
import os
import sys

__all__ = ('MultiDictProxy', 'CIMultiDictProxy',
           'MultiDict', 'CIMultiDict', 'upstr')

_marker = object()


class _upstr(str):

    """Case insensitive str."""

    def __new__(cls, val='',
                encoding=sys.getdefaultencoding(), errors='strict'):
        if isinstance(val, (bytes, bytearray, memoryview)):
            val = str(val, encoding, errors)
        elif isinstance(val, str):
            pass
        else:
            val = str(val)
        val = val.upper()
        return str.__new__(cls, val)

    def upper(self):
        return self


class _Base:

    def getall(self, key, default=_marker):
        """Return a list of all values matching the key."""
        res = [v for k, v in self._items if k == key]
        if res:
            return res
        if not res and default is not _marker:
            return default
        raise KeyError('Key not found: %r' % key)

    def getone(self, key, default=_marker):
        """Get first value matching the key."""
        for k, v in self._items:
            if k == key:
                return v
        if default is not _marker:
            return default
        raise KeyError('Key not found: %r' % key)

    # Mapping interface #

    def __getitem__(self, key):
        return self.getone(key, _marker)

    def get(self, key, default=None):
        """Get first value matching the key.

        The method is alias for .getone().
        """
        return self.getone(key, default)

    def __iter__(self):
        return iter(self.keys())

    def __len__(self):
        return len(self._items)

    def keys(self):
        """Return a new view of the dictionary's keys."""
        return _KeysView(self._items)

    def items(self):
        """Return a new view of the dictionary's items *(key, value) pairs)."""
        return _ItemsView(self._items)

    def values(self):
        """Return a new view of the dictionary's values."""
        return _ValuesView(self._items)

    def __eq__(self, other):
        if not isinstance(other, abc.Mapping):
            return NotImplemented
        if isinstance(other, _Base):
            return self._items == other._items
        for k, v in self.items():
            nv = other.get(k, _marker)
            if v != nv:
                return False
        return True

    def __contains__(self, key):
        for k, v in self._items:
            if k == key:
                return True
        return False

    def __repr__(self):
        body = ', '.join("'{}': {!r}".format(k, v) for k, v in self.items())
        return '<{} {{{}}}>'.format(self.__class__.__name__, body)


class _CIBase(_Base):

    def getall(self, key, default=_marker):
        """Return a list of all values matching the key."""
        return super().getall(key.upper(), default)

    def getone(self, key, default=_marker):
        """Get first value matching the key."""
        return super().getone(key.upper(), default)

    def get(self, key, default=None):
        """Get first value matching the key.

        The method is alias for .getone().
        """
        return super().get(key.upper(), default)

    def __getitem__(self, key):
        return super().__getitem__(key.upper())

    def __contains__(self, key):
        return super().__contains__(key.upper())


class _MultiDictProxy(_Base, abc.Mapping):

    def __init__(self, arg):
        if not isinstance(arg, _MultiDict):
            raise TypeError(
                'MultiDictProxy requires MultiDict instance, not {}'.format(
                    type(arg)))

        self._items = arg._items

    def copy(self):
        """Return a copy of itself."""
        return _MultiDict(self.items())


class _CIMultiDictProxy(_CIBase, _MultiDictProxy):

    def __init__(self, arg):
        if not isinstance(arg, _CIMultiDict):
            raise TypeError(
                'CIMultiDictProxy requires CIMultiDict instance, not {}'
                .format(type(arg)))

        self._items = arg._items

    def copy(self):
        """Return a copy of itself."""
        return _CIMultiDict(self.items())


class _MultiDict(_Base, abc.MutableMapping):

    def __init__(self, *args, **kwargs):
        self._items = []

        self._extend(args, kwargs, self.__class__.__name__, self.add)

    def add(self, key, value):
        """Add the key and value, not overwriting any previous value."""
        self._items.append((key, value))

    def copy(self):
        """Return a copy of itself."""
        cls = self.__class__
        return cls(self.items())

    def extend(self, *args, **kwargs):
        """Extend current MultiDict with more values.

        This method must be used instead of update.
        """
        self._extend(args, kwargs, 'extend', self.add)

    def _extend(self, args, kwargs, name, method):
        if len(args) > 1:
            raise TypeError("{} takes at most 1 positional argument"
                            " ({} given)".format(name, len(args)))
        if args:
            arg = args[0]
            if isinstance(args[0], _MultiDictProxy):
                items = arg._items
            elif isinstance(args[0], _MultiDict):
                items = arg._items
            elif hasattr(arg, 'items'):
                items = arg.items()
            else:
                for item in arg:
                    if not len(item) == 2:
                        raise TypeError(
                            "{} takes either dict or list of (key, value) "
                            "tuples".format(name))
                items = arg

            for key, value in items:
                method(key, value)

        for key, value in kwargs.items():
            method(key, value)

    def clear(self):
        """Remove all items from MultiDict."""
        self._items.clear()

    # Mapping interface #

    def __setitem__(self, key, value):
        self._replace(key, value)

    def __delitem__(self, key):
        items = self._items
        found = False
        for i in range(len(items) - 1, -1, -1):
            if items[i][0] == key:
                del items[i]
                found = True
        if not found:
            raise KeyError(key)

    def setdefault(self, key, default=None):
        """Return value for key, set value to default if key is not present."""
        for k, v in self._items:
            if k == key:
                return v
        self._items.append((key, default))
        return default

    def pop(self, key, default=_marker):
        """Remove specified key and return the corresponding value.

        If key is not found, d is returned if given, otherwise
        KeyError is raised.

        """
        value = None
        found = False
        for i in range(len(self._items) - 1, -1, -1):
            if self._items[i][0] == key:
                value = self._items[i][1]
                del self._items[i]
                found = True
        if not found:
            if default is _marker:
                raise KeyError(key)
            else:
                return default
        else:
            return value

    def popitem(self):
        """Remove and return an arbitrary (key, value) pair."""
        if self._items:
            return self._items.pop(0)
        else:
            raise KeyError("empty multidict")

    def update(self, *args, **kwargs):
        """Update the dictionary from *other*, overwriting existing keys."""
        self._extend(args, kwargs, 'update', self._replace)

    def _replace(self, key, value):
        if key in self:
            del self[key]
        self.add(key, value)


class _CIMultiDict(_CIBase, _MultiDict):

    def add(self, key, value):
        """Add the key and value, not overwriting any previous value."""
        super().add(key.upper(), value)

    def __setitem__(self, key, value):
        super().__setitem__(key.upper(), value)

    def __delitem__(self, key):
        super().__delitem__(key.upper())

    def _replace(self, key, value):
        super()._replace(key.upper(), value)

    def setdefault(self, key, default=None):
        """Return value for key, set value to default if key is not present."""
        key = key.upper()
        return super().setdefault(key, default)


class _ViewBase:

    def __init__(self, items):
        self._items = items

    def __len__(self):
        return len(self._items)

    def __repr__(self):
        return '{0.__class__.__name__}({0._items!r})'.format(self)


class _ItemsView(_ViewBase, abc.ItemsView):

    def __contains__(self, item):
        assert isinstance(item, tuple) or isinstance(item, list)
        assert len(item) == 2
        return item in self._items

    def __iter__(self):
        yield from self._items


class _ValuesView(_ViewBase, abc.ValuesView):

    def __contains__(self, value):
        for item in self._items:
            if item[1] == value:
                return True
        return False

    def __iter__(self):
        for item in self._items:
            yield item[1]


class _KeysView(_ViewBase, abc.KeysView):

    def __contains__(self, key):
        for item in self._items:
            if item[0] == key:
                return True
        return False

    def __iter__(self):
        for item in self._items:
            yield item[0]


if bool(os.environ.get('AIOHTTP_NO_EXTENSIONS')):
    MultiDictProxy = _MultiDictProxy
    CIMultiDictProxy = _CIMultiDictProxy
    MultiDict = _MultiDict
    CIMultiDict = _CIMultiDict
    upstr = _upstr
else:
    try:
        from ._multidict import (MultiDictProxy,
                                 CIMultiDictProxy,
                                 MultiDict,
                                 CIMultiDict,
                                 upstr)
    except ImportError:  # pragma: no cover
        MultiDictProxy = _MultiDictProxy
        CIMultiDictProxy = _CIMultiDictProxy
        MultiDict = _MultiDict
        CIMultiDict = _CIMultiDict
        upstr = _upstr