"""Fallback pure Python implementation of msgpack"""
import sys
import struct
import warnings
# Python 2/3 compatibility aliases used by the rest of this module.
PY3 = sys.version_info[0] == 3

if not PY3:
    # Python 2: ``long`` is a second integer type, text is ``unicode`` and
    # lazy dict iteration is spelled ``iteritems()``.
    int_types = (int, long)
    Unicode = unicode

    def dict_iteritems(d):
        """Return an iterator over the (key, value) pairs of *d*."""
        return d.iteritems()
else:
    # Python 3: a single ``int`` type, text is ``str``; ``xrange`` is
    # aliased to ``range`` so the same spelling works on both versions.
    int_types = int
    Unicode = str
    xrange = range

    def dict_iteritems(d):
        """Return the (key, value) view of *d*."""
        return d.items()
if not hasattr(sys, 'pypy_version_info'):
    # Not PyPy: use io.BytesIO as the buffer type; the list size hint is
    # ignored and a plain empty list is returned.
    from io import BytesIO as StringIO
    USING_STRINGBUILDER = False

    def newlist_hint(size):
        return []
else:
    # cStringIO is slow on PyPy, StringIO is faster.  However: PyPy's own
    # StringBuilder is fastest.
    from __pypy__ import newlist_hint
    try:
        from __pypy__.builders import BytesBuilder as StringBuilder
    except ImportError:
        from __pypy__.builders import StringBuilder
    USING_STRINGBUILDER = True

    class StringIO(object):
        """Small write/getvalue buffer backed by a PyPy builder object."""

        def __init__(self, s=b''):
            if not s:
                self.builder = StringBuilder()
                return
            # Size the builder from the initial payload, then store it.
            self.builder = StringBuilder(len(s))
            self.builder.append(s)

        def write(self, s):
            # memoryview / bytearray inputs are converted to bytes before
            # being appended to the builder.
            if isinstance(s, memoryview):
                s = s.tobytes()
            elif isinstance(s, bytearray):
                s = bytes(s)
            self.builder.append(s)

        def getvalue(self):
            return self.builder.build()
from msgpack.exceptions import (
BufferFull,
OutOfData,
UnpackValueError,
PackValueError,
PackOverflowError,
ExtraData)
from msgpack import ExtType
# Execution-mode flags, numbered 0..3.  (Their consumer is not visible in
# this part of the file; the names suggest skip / construct / header-only
# modes -- confirm against the unpacking routine.)
(
    EX_SKIP,
    EX_CONSTRUCT,
    EX_READ_ARRAY_HEADER,
    EX_READ_MAP_HEADER,
) = range(4)

# Type tags, numbered 0..5.
(
    TYPE_IMMEDIATE,
    TYPE_ARRAY,
    TYPE_MAP,
    TYPE_RAW,
    TYPE_BIN,
    TYPE_EXT,
) = range(6)

DEFAULT_RECURSE_LIMIT = 511
def _check_type_strict(obj, t, type=type, tuple=tuple):
    """Tell whether the exact type of *obj* is *t*, or one of *t*.

    Subclasses do not match: the comparison is made on ``type(obj)`` itself,
    not with ``isinstance``.  When *t* is exactly a ``tuple`` it is treated
    as a collection of candidate types.

    ``type`` and ``tuple`` are bound as default arguments, so they resolve as
    local names inside the function (presumably a lookup micro-optimisation).
    """
    exact = type(obj)
    return exact in t if type(t) is tuple else exact is t
def _get_data_from_buffer(obj):
    """Return a ``memoryview`` over *obj* whose items are single bytes.

    If ``memoryview(obj)`` raises ``TypeError`` and ``PY3`` is false, the
    legacy ``buffer()`` interface is tried instead and a ``RuntimeWarning``
    is issued; when ``PY3`` is true the ``TypeError`` propagates.

    Raises:
        TypeError: *obj* does not support the buffer protocol (Python 3).
        ValueError: the resulting view has an item size other than 1.
    """
    try:
        view = memoryview(obj)
    except TypeError:
        # try to use legacy buffer protocol if 2.7, otherwise re-raise
        if PY3:
            raise
        view = memoryview(buffer(obj))
        warnings.warn("using old buffer interface to unpack %s; "
                      "this leads to unpacking errors if slicing is used and "
                      "will be removed in a future version" % type(obj),
                      RuntimeWarning)

    if view.itemsize == 1:
        return view
    raise ValueError("cannot unpack from multi-byte object")
def unpack(stream, **kwargs):
    """Read everything from *stream* and unpack it with :func:`unpackb`.

    Calling this implementation directly is deprecated: a
    ``PendingDeprecationWarning`` is issued on every call.  Keyword
    arguments are passed through to :func:`unpackb` unchanged.
    """
    warnings.warn(
        "Direct calling implementation's unpack() is deprecated, Use msgpack.unpack() or unpackb() instead.",
        PendingDeprecationWarning)
    return unpackb(stream.read(), **kwargs)
def unpackb(packed, **kwargs):
    """
    Unpack an object from `packed`.

    Raises `UnpackValueError` when `packed` ends before the object is
    complete, and `ExtraData` when `packed` contains extra bytes.
    See :class:`Unpacker` for options.
    """
    unpacker = Unpacker(None, **kwargs)
    unpacker.feed(packed)
    try:
        result = unpacker._unpack()
    except OutOfData:
        raise UnpackValueError("Data is not enough.")

    if not unpacker._got_extradata():
        return result
    # Bytes remain after the first object: hand back both the decoded value
    # and the unread tail.
    raise ExtraData(result, unpacker._get_extradata())
class Unpacker(object):
"""Streaming unpacker.
arguments:
:param file_like:
File-like object having `.read(n)` method.
If specified, unpacker reads serialized data from it and :meth:`feed()` is not usable.
:param int read_size:
Used as `file_like.read(read_size)`. (default: `min(16*1024, max_buffer_size)`)
:param bool use_list:
If true, unpack msgpack array to Python list.
Otherwise, unpack to Python tuple. (default: True)
:param bool raw:
If true, unpack msgpack raw to Python bytes (default).
Otherwise, unpack to Python str (or unicode on Python 2) by decoding
with UTF-8 encoding (recommended).
Currently, the default is true, but it will be changed to false in
near future. So you must specify it explicitly for keeping backward
compatibility.
*encoding* option which is deprecated overrides this option.
:param callable object_hook:
When specified, it should be callable.
Unpacker calls it with a dict argument after unpacking msgpack map.
(See also simplejson)
:param callable object_pairs_hook:
When specified, it should be callable.
Unpacker calls it with a list of key-value pairs after unpacking msgpack map.
(See also simplejson)
:param str encoding:
(deprecated) Encoding used for decoding msgpack raw; use ``raw=False`` instead.
If it is None (default), msgpack raw is deserialized to Python bytes.
:param str unicode_errors:
(deprecated) Used for decoding msgpack raw with *encoding*.
(default: `'strict'`)
:param int max_buffer_size:
Limits the size of buffered data waiting to be unpacked. 0 means 2**31-1 (default).
Raises `BufferFull` exception when it is insufficient.
You should set this parameter when unpacking data from untrusted source.
:param int max_str_len:
Limits max length of str. (default: 2**31-1)
:param int max_bin_len:
Limits max length of bin. (default: 2**31-1)
:param int max_array_len:
Limits max length of array. (default: 2**31-1)
:param int max_map_len:
Limits max length of map. (default: 2**31-1)
example of streaming deserialize from file-like object::
unpacker = Unpacker(file_like, raw=False)
for o in unpacker:
process(o)
example of streaming deserialize from socket::
unpacker = Unpacker(raw=False)
while True:
buf = sock.recv(1024**2)
if not buf:
break
unpacker.feed(buf)
for o in unpacker:
process(o)
"""
def __init__(self, file_like=None, read_size=0, use_list=True, raw=True,
             object_hook=None, object_pairs_hook=None, list_hook=None,
             encoding=None, unicode_errors=None, max_buffer_size=0,
             ext_hook=ExtType,
             max_str_len=2147483647,  # 2**31-1
             max_bin_len=2147483647,
             max_array_len=2147483647,
             max_map_len=2147483647,
             max_ext_len=2147483647):
    """Store the unpacking options and start with an empty read buffer.

    The options themselves are described in the class docstring.  A
    ``PendingDeprecationWarning`` is issued when *encoding* is given.

    Raises:
        TypeError: ``file_like.read`` or one of the hooks is not callable,
            or both *object_hook* and *object_pairs_hook* are given.
        ValueError: *read_size* exceeds the effective *max_buffer_size*.
    """
    if encoding is not None:
        warnings.warn(
            "encoding is deprecated, Use raw=False instead.",
            PendingDeprecationWarning)

    if unicode_errors is None:
        unicode_errors = 'strict'

    if file_like is not None:
        if not callable(file_like.read):
            raise TypeError("`file_like.read` must be callable")
        self.file_like = file_like
        self._feeding = False
    else:
        # No stream given: data has to be supplied through feed().
        self._feeding = True

    #: array of bytes fed.
    self._buffer = bytearray()
    #: Position we are currently reading from.
    self._buff_i = 0

    # When Unpacker is used as an iterable, between the calls to next(),
    # the buffer is not "consumed" completely, for efficiency sake.
    # Instead, it is done sloppily.  To make sure we raise BufferFull at
    # the correct moments, we have to keep track of how sloppy we were.
    # Furthermore, when the buffer is incomplete (that is: in the case
    # we raise an OutOfData) we need to rollback the buffer to the correct
    # state, which _buf_checkpoint records.
    self._buf_checkpoint = 0

    # A falsy max_buffer_size (the default 0) selects the 2**31-1 ceiling.
    buffer_limit = max_buffer_size if max_buffer_size else 2**31-1
    self._max_buffer_size = buffer_limit
    if read_size > buffer_limit:
        raise ValueError("read_size must be smaller than max_buffer_size")
    # A falsy read_size falls back to 16 KiB, capped by the buffer limit.
    self._read_size = read_size if read_size else min(buffer_limit, 16*1024)

    self._raw = bool(raw)
    self._encoding = encoding
    self._unicode_errors = unicode_errors
    self._use_list = use_list

    self._list_hook = list_hook
    self._object_hook = object_hook
    self._object_pairs_hook = object_pairs_hook
    self._ext_hook = ext_hook

    self._max_str_len = max_str_len
    self._max_bin_len = max_bin_len
    self._max_array_len = max_array_len
    self._max_map_len = max_map_len
    self._max_ext_len = max_ext_len

    self._stream_offset = 0

    # Hook validation: optional hooks may be None, ext_hook may not.
    if list_hook is not None and not callable(list_hook):
        raise TypeError('`list_hook` is not callable')
    if object_hook is not None and not callable(object_hook):
        raise TypeError('`object_hook` is not callable')
    if object_pairs_hook is not None and not callable(object_pairs_hook):
        raise TypeError('`object_pairs_hook` is not callable')
    if object_hook is not None and object_pairs_hook is not None:
        raise TypeError("object_pairs_hook and object_hook are mutually "
                        "exclusive")
    if not callable(ext_hook):
        raise TypeError("`ext_hook` is not callable")
def feed(self, next_bytes):
    """Append *next_bytes* to the internal buffer.

    Asserts that the unpacker is in feeding mode (``_feeding`` is true).

    Raises:
        BufferFull: the unread bytes already buffered plus the new data
            would exceed ``_max_buffer_size``.
    """
    assert self._feeding
    view = _get_data_from_buffer(next_bytes)

    unread = len(self._buffer) - self._buff_i
    if unread + len(view) > self._max_buffer_size:
        raise BufferFull

    # Drop everything in front of the checkpoint before growing the buffer,
    # and shift the read position by the same amount.
    checkpoint = self._buf_checkpoint
    if checkpoint > 0:
        del self._buffer[:checkpoint]
        self._buff_i -= checkpoint
        self._buf_checkpoint = 0

    self._buffer += view
def _consume(self):
    """Mark everything read so far as consumed.

    The checkpoint is moved up to the current read position and the distance
    it travelled is added to ``_stream_offset``.  The buffer itself is not
    truncated here.
    """
    position = self._buff_i
    self._stream_offset += position - self._buf_checkpoint
    self._buf_checkpoint = position
def _got_extradata(self):
    """Return True when unread bytes remain after the read position."""
    return len(self._buffer) > self._buff_i
def _get_extradata(self):
    """Return a copy of the bytes from the read position to the buffer end."""
    start = self._buff_i
    return self._buffer[start:]
def read_bytes(self, n):
    """Read *n* bytes from the buffer; public wrapper around ``_read``."""
    data = self._read(n)
    return data
def _read(self, n):
    """Return the next *n* buffered bytes and advance the read position.

    ``_reserve(n)`` is called first; whatever it raises propagates.
    """
    # (int) -> bytearray
    self._reserve(n)
    start = self._buff_i
    end = start + n
    self._buff_i = end
    return self._buffer[start:end]
def _reserve(self, n):
    """Ensure at least *n* unread bytes are buffered past the read position.

    In feeding mode nothing can be fetched, so a shortfall rewinds the read
    position to the checkpoint and raises ``OutOfData``.  Otherwise the
    bytes in front of the checkpoint are dropped and ``file_like.read`` is
    called until enough data has arrived or it returns nothing.

    Raises:
        OutOfData: fewer than *n* unread bytes could be made available.
    """
    shortfall = n - (len(self._buffer) - self._buff_i)

    # Fast path: buffer has n bytes already
    if shortfall <= 0:
        return

    if self._feeding:
        # Rewind to the checkpoint before signalling the shortage.
        self._buff_i = self._buf_checkpoint
        raise OutOfData

    # Strip buffer before checkpoint before reading file.
    checkpoint = self._buf_checkpoint
    if checkpoint > 0:
        del self._buffer[:checkpoint]
        self._buff_i -= checkpoint
        self._buf_checkpoint = 0

    # Read from file, asking for at least _read_size bytes per call.
    while shortfall > 0:
        chunk = self.file_like.read(max(self._read_size, shortfall))
        if not chunk:
            break
        assert isinstance(chunk, bytes)
        self._buffer += chunk
        shortfall -= len(chunk)

    if len(self._buffer) < n + self._buff_i:
        self._buff_i = 0  # rollback
        raise OutOfData
Loading ...