Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
contego / home / tvault / .virtenv / lib / python2.7 / site-packages / kombu / serialization.py
Size: Mime:
"""Serialization utilities."""
from __future__ import absolute_import, unicode_literals

import codecs
import os
import sys

import pickle as pypickle
try:
    import cPickle as cpickle
except ImportError:  # pragma: no cover
    cpickle = None  # noqa

from collections import namedtuple
from contextlib import contextmanager
from io import BytesIO

from .exceptions import (
    ContentDisallowed, DecodeError, EncodeError, SerializerNotInstalled
)
from .five import reraise, text_t
from .utils.compat import entrypoints
from .utils.encoding import bytes_to_str, str_to_bytes, bytes_t

#: Names exported by ``from kombu.serialization import *``.
__all__ = ['pickle', 'loads', 'dumps', 'register', 'unregister']
#: Content encodings for which ``SerializerRegistry.loads`` skips the
#: character-set decoding fallback and returns the payload as received.
SKIP_DECODE = frozenset(['binary', 'ascii-8bit'])
#: Content types ``SerializerRegistry.loads`` lets through the ``accept``
#: check even when they are not listed in ``accept``.
TRUSTED_CONTENT = frozenset(['application/data', 'application/text'])

if sys.platform.startswith('java'):  # pragma: no cover

    def _decode(t, coding):
        """Decode ``t`` with the codec named ``coding``.

        Looks the decoder up explicitly and returns only the decoded
        object, dropping the consumed-length element of the result.
        NOTE(review): presumably a workaround for ``codecs.decode`` on
        Jython -- confirm before removing.
        """
        decoder = codecs.getdecoder(coding)
        return decoder(t)[0]
else:
    # Every other platform uses the stdlib helper directly.
    _decode = codecs.decode

#: Pickle implementation used by this module: ``cPickle`` when the import
#: above succeeded, otherwise the pure-Python ``pickle`` module.
pickle = cpickle or pypickle
#: ``load`` function of the selected implementation; the default loader
#: of :func:`pickle_loads`.
pickle_load = pickle.load

#: Kombu requires Python 2.5 or later so we use protocol 2 by default.
#: There's a new protocol (3) but this is only supported by Python 3.
#: The value can be overridden with the ``PICKLE_PROTOCOL`` environment
#: variable, which must parse as an integer.
pickle_protocol = int(os.environ.get('PICKLE_PROTOCOL', 2))

#: Record stored per registered encoder: the content type and content
#: encoding it produces, plus the encoder callable itself.
codec = namedtuple('codec', ('content_type', 'content_encoding', 'encoder'))


@contextmanager
def _reraise_errors(wrapper,
                    include=(Exception,), exclude=(SerializerNotInstalled,)):
    """Re-raise errors from the managed block as ``wrapper`` exceptions.

    Arguments:
        wrapper (type): Exception class used to wrap the original error;
            it is called with the caught exception as its only argument.
        include (Tuple[type]): Exception types that get wrapped.
        exclude (Tuple[type]): Exception types that propagate untouched.
            These are checked before ``include``.
    """
    try:
        yield
    except exclude:
        # Explicitly excluded errors pass through unchanged.
        raise
    except include as error:
        wrapped = wrapper(error)
        # Re-raise with the original traceback.
        original_tb = sys.exc_info()[2]
        reraise(wrapper, wrapped, original_tb)


def pickle_loads(s, load=pickle_load):
    """Unpickle the byte payload ``s``.

    The payload is wrapped in an in-memory binary stream and handed to
    ``load`` (originally noted as "used to support buffer objects").

    Note:
        Unpickling can execute arbitrary code; only use this on payloads
        from a trusted source.
    """
    stream = BytesIO(s)
    return load(stream)


def parenthesize_alias(first, second):
    """Format ``first`` followed by ``second`` in parentheses.

    When ``first`` is falsy, ``second`` is returned on its own (and
    unformatted).
    """
    if not first:
        return second
    return '%s (%s)' % (first, second)


class SerializerRegistry(object):
    """Registry of serialization methods.

    Encoders are stored per serializer name and decoders per content
    type; ``name_to_type`` and ``type_to_name`` translate between the two
    kinds of key.
    """

    def __init__(self):
        # serializer name -> codec(content_type, content_encoding, encoder)
        self._encoders = {}
        # content type -> decoder callable
        self._decoders = {}
        # Fallbacks used by ``dumps`` when no serializer is requested.
        self._default_encode = None
        self._default_content_type = None
        self._default_content_encoding = None
        # Content types ``loads`` refuses when called without ``accept``.
        self._disabled_content_types = set()
        self.type_to_name = {}
        self.name_to_type = {}

    def register(self, name, encoder, decoder, content_type,
                 content_encoding='utf-8'):
        """Register a new encoder/decoder pair.

        Arguments:
            name (str): Convenience name for the serialization method.

            encoder (callable): Takes a python data structure and returns
                its serialized form.  When falsy (e.g. :const:`None`) no
                encoder is stored, so encoding with this name will not
                be possible.

            decoder (Callable): Takes serialized data and returns a
                python data structure.  When falsy (e.g. :const:`None`)
                no decoder is stored, so decoding this content type will
                not be possible.

            content_type (str): The mime-type describing the serialized
                structure.

            content_encoding (str): The content encoding (character set)
                recorded for the encoder's output.  Usually `utf-8`,
                `us-ascii`, or `binary`.
        """
        if encoder:
            self._encoders[name] = codec(
                content_type=content_type,
                content_encoding=content_encoding,
                encoder=encoder,
            )
        if decoder:
            self._decoders[content_type] = decoder
        # The name <-> type mappings are recorded regardless of which
        # halves of the pair were supplied.
        self.type_to_name[content_type] = name
        self.name_to_type[name] = content_type

    def enable(self, name):
        """Remove a serializer from the disabled set.

        Arguments:
            name (str): Serializer name or content type; anything
                containing ``/`` is taken to be a content type.

        Raises:
            KeyError: If ``name`` is a serializer name that is not
                registered.
        """
        content_type = name if '/' in name else self.name_to_type[name]
        self._disabled_content_types.discard(content_type)

    def disable(self, name):
        """Add a serializer to the disabled set.

        Arguments:
            name (str): Serializer name or content type; anything
                containing ``/`` is taken to be a content type.

        Raises:
            KeyError: If ``name`` is a serializer name that is not
                registered.
        """
        content_type = name if '/' in name else self.name_to_type[name]
        self._disabled_content_types.add(content_type)

    def unregister(self, name):
        """Unregister a previously registered encoder/decoder.

        Arguments:
            name (str): Registered serialization method name.

        Raises:
            SerializerNotInstalled: If a serializer by that name
                cannot be found.
        """
        try:
            content_type = self.name_to_type[name]
        except KeyError:
            raise SerializerNotInstalled(
                'No encoder/decoder installed for {0}'.format(name))
        # Either half of the pair may be absent, hence the pop defaults.
        self._decoders.pop(content_type, None)
        self._encoders.pop(name, None)
        self.type_to_name.pop(content_type, None)
        self.name_to_type.pop(name, None)

    def _set_default_serializer(self, name):
        """Choose the serialization method ``dumps`` uses by default.

        Arguments:
            name (str): The name of a registered serialization method,
                for example `json`, `pickle`, `yaml`, `msgpack`, or any
                custom method added through :meth:`register`.

        Raises:
            SerializerNotInstalled: If no encoder is registered under
                ``name``.
        """
        try:
            default_codec = self._encoders[name]
        except KeyError:
            raise SerializerNotInstalled(
                'No encoder installed for {0}'.format(name))
        (self._default_content_type,
         self._default_content_encoding,
         self._default_encode) = default_codec

    def dumps(self, data, serializer=None):
        """Encode data.

        Serialize a data structure into a payload suitable for sending
        as an AMQP message body.

        Arguments:
            data (List, Dict, str): The message data to send.

            serializer (str): Optional name of the serialization method
                to use (for example `json`, `raw`, or `pickle`).

                When omitted, byte strings are passed through untouched
                as `application/data`, text strings are UTF-8 encoded
                as `text/plain`, and everything else goes through the
                registry's default serializer.

                When given, that method is used even for byte or text
                strings.

        Returns:
            Tuple[str, str, str]: A three-item tuple of content type
            (e.g., `application/json`), content encoding (e.g., `utf-8`)
            and the serialized payload.

        Raises:
            SerializerNotInstalled: If the serialization method
                requested is not available.
            EncodeError: If encoding the data fails.
        """
        if serializer == 'raw':
            return raw_encode(data)
        if serializer and not self._encoders.get(serializer):
            raise SerializerNotInstalled(
                'No encoder installed for {0}'.format(serializer))

        if not serializer:
            # A raw byte string is assumed to be binary: it is probably
            # either ASCII or a raw binary file, and 'binary' covers both
            # even if it is not ideal.  On Python 3 this is ``bytes``.
            if isinstance(data, bytes_t):
                return 'application/data', 'binary', data

            # Text is sent as plain UTF-8 without further serialization.
            if isinstance(data, text_t):
                with _reraise_errors(EncodeError, exclude=()):
                    payload = data.encode('utf-8')
                return 'text/plain', 'utf-8', payload

            content_type = self._default_content_type
            content_encoding = self._default_content_encoding
            encoder = self._default_encode
        else:
            content_type, content_encoding, encoder = \
                self._encoders[serializer]

        with _reraise_errors(EncodeError):
            payload = encoder(data)
        return content_type, content_encoding, payload

    def loads(self, data, content_type, content_encoding,
              accept=None, force=False, _trusted_content=TRUSTED_CONTENT):
        """Decode serialized data.

        Deserialize a payload produced by `dumps`, choosing the decoder
        from `content_type`.

        Arguments:
            data (bytes, buffer, str): The message data to deserialize.

            content_type (str): The content-type of the data
                (e.g., `application/json`).  A falsy value is treated as
                `application/data`.

            content_encoding (str): The content-encoding of the data
                (e.g., `utf-8`, `binary`, or `us-ascii`).  A falsy value
                is treated as `utf-8`.

            accept (Set): Content-types to accept.  When given, the
                disabled-serializer check is not consulted.

            force (bool): When `accept` is :const:`None`, decode even if
                the content type has been disabled.

            _trusted_content (Set): Content types that pass the `accept`
                check without being listed in `accept`.

        Raises:
            ContentDisallowed: If the content-type is not accepted.
            DecodeError: If decoding the data fails.

        Returns:
            Any: The unserialized data.  Falsy `data` is returned as is.
        """
        if content_type:
            content_type = bytes_to_str(content_type)
        else:
            content_type = 'application/data'

        if accept is None:
            if content_type in self._disabled_content_types and not force:
                raise self._for_untrusted_content(content_type, 'disabled')
        elif not (content_type in _trusted_content or
                  content_type in accept):
            raise self._for_untrusted_content(content_type, 'untrusted')

        content_encoding = (content_encoding or 'utf-8').lower()

        if not data:
            return data

        decoder = self._decoders.get(content_type)
        if decoder:
            with _reraise_errors(DecodeError):
                return decoder(data)

        # No decoder registered: fall back to character-set decoding,
        # unless the encoding is binary-like or the data is text already.
        needs_decoding = (content_encoding not in SKIP_DECODE and
                          not isinstance(data, text_t))
        if needs_decoding:
            with _reraise_errors(DecodeError):
                return _decode(data, content_encoding)
        return data

    def _for_untrusted_content(self, ctype, why):
        """Build (without raising) the error for a refused content type.

        Arguments:
            ctype (str): The refused content type.
            why (str): Short reason inserted into the message, e.g.
                `untrusted` or `disabled`.
        """
        alias = self.type_to_name.get(ctype, ctype)
        description = parenthesize_alias(alias, ctype)
        return ContentDisallowed(
            'Refusing to deserialize {0} content of type {1}'.format(
                why, description,
            ),
        )


#: Global registry of serializers/deserializers.
registry = SerializerRegistry()
# Module-level shortcuts bound to the global registry's methods, so
# callers can use e.g. ``serialization.dumps(...)`` directly.
dumps = registry.dumps
loads = registry.loads
register = registry.register
unregister = registry.unregister


def raw_encode(data):
    """Encode ``data`` for the special-case ``raw`` serializer.

    Text is UTF-8 encoded and labelled ``utf-8``; any other value is
    passed through untouched and labelled ``binary``.

    Returns:
        Tuple[str, str, Any]: ``('application/data', content_encoding,
        payload)``.

    Raises:
        EncodeError: If encoding the text fails.
    """
    if not isinstance(data, text_t):
        return 'application/data', 'binary', data
    with _reraise_errors(EncodeError, exclude=()):
        encoded = data.encode('utf-8')
    return 'application/data', 'utf-8', encoded


def register_json():
    """Register an encoder/decoder pair for JSON serialization.

    Uses ``dumps``/``loads`` from :mod:`kombu.utils.json`, registered as
    ``json`` for content type ``application/json`` with ``utf-8``
    content encoding.
    """
    from kombu.utils import json as _json

    encoder, decoder = _json.dumps, _json.loads
    registry.register(
        'json', encoder, decoder,
        content_type='application/json',
        content_encoding='utf-8',
    )


def register_yaml():
    """Register an encoder/decoder pair for YAML serialization.

    It is slower than JSON, but allows for more data types
    to be serialized.  Useful if you need to send data such as dates.

    When the ``yaml`` module cannot be imported, only a placeholder
    decoder is registered; it raises :exc:`SerializerNotInstalled` when
    a YAML message has to be decoded.
    """
    yaml_content_type = 'application/x-yaml'
    try:
        import yaml
        safe_dump, safe_load = yaml.safe_dump, yaml.safe_load
        registry.register('yaml', safe_dump, safe_load,
                          content_type=yaml_content_type,
                          content_encoding='utf-8')
    except ImportError:

        def not_available(*args, **kwargs):
            """Raise SerializerNotInstalled.

            Stands in for the decoder when a client receives a yaml
            message but yaml isn't installed.
            """
            raise SerializerNotInstalled(
                'No decoder installed for YAML. Install the PyYAML library')

        # No encoder is registered (None), only the failing decoder.
        registry.register('yaml', None, not_available, yaml_content_type)


if sys.version_info[0] == 3:  # pragma: no cover

    def unpickle(s):
        """Unpickle ``s`` after converting it to bytes."""
        payload = str_to_bytes(s)
        return pickle_loads(payload)

else:
    # Other major versions hand the payload to pickle_loads directly.
    unpickle = pickle_loads  # noqa


def register_pickle():
    """Register the pickle serializer.

    The fastest serialization method, but restricts
    you to python clients.  Registered as ``pickle`` for content type
    ``application/x-python-serialize`` with ``binary`` content encoding.

    Note:
        Unpickling untrusted data can execute arbitrary code.
    """
    def pickle_dumps(obj, dumper=pickle.dumps):
        # ``pickle_protocol`` is read on every call, not bound once.
        return dumper(obj, protocol=pickle_protocol)

    registry.register(
        'pickle', pickle_dumps, unpickle,
        content_type='application/x-python-serialize',
        content_encoding='binary',
    )


def register_msgpack():
    """Register msgpack serializer.

    Registers ``msgpack`` for content type ``application/x-msgpack``
    with ``binary`` content encoding.  When the library cannot be
    imported, or is older than 0.4, both the encoder and the decoder are
    placeholders that raise :exc:`SerializerNotInstalled` when used.

    See Also:
        https://msgpack.org/
    """
    pack = unpack = None
    try:
        import msgpack
        if msgpack.version >= (0, 4):
            from msgpack import packb, unpackb

            def pack(s):
                return packb(s, use_bin_type=True)

            # The ``encoding`` option was superseded by ``raw`` in
            # msgpack 0.5.2 and removed in 1.0, where passing it raises
            # TypeError.  ``raw=False`` decodes raw strings as UTF-8
            # text, matching the older ``encoding='utf-8'`` spelling.
            if msgpack.version >= (0, 5, 2):
                def unpack(s):
                    return unpackb(s, raw=False)
            else:
                def unpack(s):
                    return unpackb(s, encoding='utf-8')
        else:
            def version_mismatch(*args, **kwargs):
                raise SerializerNotInstalled(
                    'msgpack requires msgpack-python >= 0.4.0')
            pack = unpack = version_mismatch
    except (ImportError, ValueError):
        def not_available(*args, **kwargs):
            raise SerializerNotInstalled(
                'No decoder installed for msgpack. '
                'Please install the msgpack-python library')
        pack = unpack = not_available
    registry.register(
        'msgpack', pack, unpack,
        content_type='application/x-msgpack',
        content_encoding='binary',
    )


# Register the base serialization methods.
# This runs at import time and populates the global registry.
register_json()
register_pickle()
register_yaml()
register_msgpack()

# Default serializer is 'json'
registry._set_default_serializer('json')


#: Maps each built-in serializer, by short name and by content type, to
#: the function that registers it.
#: NOTE(review): not referenced elsewhere in the visible module --
#: presumably used by other modules to re-register a serializer; confirm.
_setupfuns = {
    'json': register_json,
    'pickle': register_pickle,
    'yaml': register_yaml,
    'msgpack': register_msgpack,
    'application/json': register_json,
    'application/x-yaml': register_yaml,
    'application/x-python-serialize': register_pickle,
    'application/x-msgpack': register_msgpack,
}


def enable_insecure_serializers(choices=('pickle', 'yaml', 'msgpack')):
    """Enable serializers that are considered to be unsafe.

    Arguments:
        choices (Iterable[str]): Serializers to enable, each given by
            name or by content type.

    Note:
        Will enable ``pickle``, ``yaml`` and ``msgpack`` by default, but you
        can also specify a list of serializers (by name or content type)
        to enable.  Names that are not registered are skipped silently.
    """
    # The default is a tuple rather than a list so that the shared
    # default object can never be mutated between calls.
    for choice in choices:
        try:
            registry.enable(choice)
        except KeyError:
            # Best effort: an unknown serializer name is simply ignored.
            pass


def disable_insecure_serializers(allowed=('json',)):
    """Disable untrusted serializers.

    Will disable all serializers except ``json``
    or you can specify a list of deserializers to allow.

    Arguments:
        allowed (Iterable[str]): Serializers to keep enabled, each given
            by name or by content type.  Pass :const:`None` to leave
            every registered decoder disabled.

    Raises:
        KeyError: If an entry of ``allowed`` is a serializer name that
            is not registered.

    Note:
        Producers will still be able to serialize data
        in these formats, but consumers will not accept
        incoming data using the untrusted content types.
    """
    # The default is a tuple rather than a list so that the shared
    # default object can never be mutated between calls.
    # Disable every content type that has a decoder, then re-enable the
    # allowed ones.
    for name in registry._decoders:
        registry.disable(name)
    if allowed is not None:
        for name in allowed:
            registry.enable(name)


# Insecure serializers are disabled by default since v3.0
# With the default ``allowed`` this leaves only ``json`` enabled.
disable_insecure_serializers()

# Load entrypoints from installed extensions
# Each entry point supplies the positional arguments passed to
# ``register`` after the name -- presumably (encoder, decoder,
# content_type[, content_encoding]); verify against ``entrypoints``.
# These are registered after the disable call above, so they are not
# added to the disabled set here.
for ep, args in entrypoints('kombu.serializers'):  # pragma: no cover
    register(ep.name, *args)


def prepare_accept_content(l, name_to_type=registry.name_to_type):
    """Normalize an accept list into a set of content types.

    Arguments:
        l (Iterable[str]): Serializer names and/or content types, or
            :const:`None`.
        name_to_type (Mapping[str, str]): Lookup from serializer name to
            content type; defaults to the global registry's mapping.

    Returns:
        Set[str]: The content types, with every entry lacking a ``/``
        translated through ``name_to_type``.  :const:`None` is returned
        unchanged.

    Raises:
        KeyError: If a serializer name is not in ``name_to_type``.
    """
    if l is None:
        return l
    content_types = set()
    for entry in l:
        content_types.add(entry if '/' in entry else name_to_type[entry])
    return content_types