Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
contego / home / tvault / .virtenv / lib / python2.7 / site-packages / amqp / serialization.py
Size: Mime:
"""Convert between bytestreams and higher-level AMQP types.

2007-11-05 Barry Pederson <bp@barryp.org>

"""
# Copyright (C) 2007 Barry Pederson <bp@barryp.org>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
from __future__ import absolute_import, unicode_literals

import calendar
import sys

from datetime import datetime
from decimal import Decimal
from io import BytesIO
from struct import pack, unpack_from

from .spec import Basic
from .exceptions import FrameSyntaxError
from .five import int_types, long_t, string, string_t, items
from .utils import bytes_to_str as pstr_t, str_to_bytes

# Converter applied by _read_item to the type-marker byte of a table item.
# On Python 3, indexing a bytes object yields an int, so chr() turns it back
# into a one-character string; otherwise no conversion is applied (None).
if sys.version_info[0] == 3:
    ftype_t = chr
else:
    ftype_t = None

# Message templates used when a value cannot be (de)serialized.
ILLEGAL_TABLE_TYPE = '    Table type {0!r} not handled by amqp.\n'

ILLEGAL_TABLE_TYPE_WITH_KEY = (
    'Table type {0!r} for key {1!r} not handled by amqp. [value: {2!r}]\n'
)

ILLEGAL_TABLE_TYPE_WITH_VALUE = (
    '    Table type {0!r} not handled by amqp. [value: {1!r}]\n'
)


def _read_item(buf, offset=0, unpack_from=unpack_from, ftype_t=ftype_t):
    """Read one type-tagged field value from ``buf`` starting at ``offset``.

    The byte at ``offset`` is a one-character type marker; the bytes that
    follow are decoded according to that marker.  Tables ('F') and arrays
    ('A') are decoded recursively.

    Arguments:
        buf: bytes-like buffer holding the encoded value.
        offset: index of the type marker inside ``buf``.
        unpack_from: struct-style unpacker (bound as a default for speed).
        ftype_t: optional converter applied to the marker byte, or None to
            use the indexed byte as-is.

    Returns:
        A ``(value, offset)`` tuple where ``offset`` is the index of the
        first byte after the decoded value.

    Raises:
        FrameSyntaxError: if the type marker is not recognised.
    """
    ftype = ftype_t(buf[offset]) if ftype_t else buf[offset]
    offset += 1

    # 'S': long string (32-bit length prefix)
    if ftype == 'S':
        slen, = unpack_from(b'>I', buf, offset)
        offset += 4
        val = pstr_t(buf[offset:offset + slen])
        offset += slen
    # 's': short string (8-bit length prefix)
    elif ftype == 's':
        slen, = unpack_from(b'>B', buf, offset)
        offset += 1
        val = pstr_t(buf[offset:offset + slen])
        offset += slen
    # 'b': short-short int (signed 8-bit).
    # The struct codes for 'b' and 'B' were previously swapped, so values
    # with the high bit set were decoded with the wrong signedness.
    elif ftype == 'b':
        val, = unpack_from(b'>b', buf, offset)
        offset += 1
    # 'B': short-short unsigned int (unsigned 8-bit)
    elif ftype == 'B':
        val, = unpack_from(b'>B', buf, offset)
        offset += 1
    # 'U': short int
    elif ftype == 'U':
        val, = unpack_from(b'>h', buf, offset)
        offset += 2
    # 'u': short unsigned int
    elif ftype == 'u':
        val, = unpack_from(b'>H', buf, offset)
        offset += 2
    # 'I': long int
    elif ftype == 'I':
        val, = unpack_from(b'>i', buf, offset)
        offset += 4
    # 'i': long unsigned int
    elif ftype == 'i':
        val, = unpack_from(b'>I', buf, offset)
        offset += 4
    # 'L': long long int
    elif ftype == 'L':
        val, = unpack_from(b'>q', buf, offset)
        offset += 8
    # 'l': long long unsigned int
    elif ftype == 'l':
        val, = unpack_from(b'>Q', buf, offset)
        offset += 8
    # 'f': float
    elif ftype == 'f':
        val, = unpack_from(b'>f', buf, offset)
        offset += 4
    # 'd': double
    elif ftype == 'd':
        val, = unpack_from(b'>d', buf, offset)
        offset += 8
    # 'D': decimal, stored as a scale octet followed by a signed 32-bit
    # unscaled value; the result is value / 10 ** scale.
    elif ftype == 'D':
        d, = unpack_from(b'>B', buf, offset)
        offset += 1
        n, = unpack_from(b'>i', buf, offset)
        offset += 4
        val = Decimal(n) / Decimal(10 ** d)
    # 'F': table, a 32-bit byte length followed by (short-string key,
    # tagged value) pairs until that many bytes have been consumed.
    elif ftype == 'F':
        tlen, = unpack_from(b'>I', buf, offset)
        offset += 4
        limit = offset + tlen
        val = {}
        while offset < limit:
            keylen, = unpack_from(b'>B', buf, offset)
            offset += 1
            key = pstr_t(buf[offset:offset + keylen])
            offset += keylen
            val[key], offset = _read_item(buf, offset)
    # 'A': array, a 32-bit byte length followed by tagged values.
    elif ftype == 'A':
        alen, = unpack_from(b'>I', buf, offset)
        offset += 4
        limit = offset + alen
        val = []
        while offset < limit:
            v, offset = _read_item(buf, offset)
            val.append(v)
    # 't' (bool)
    elif ftype == 't':
        val, = unpack_from(b'>B', buf, offset)
        val = bool(val)
        offset += 1
    # 'T': timestamp, 64-bit seconds converted with utcfromtimestamp()
    elif ftype == 'T':
        val, = unpack_from(b'>Q', buf, offset)
        offset += 8
        val = datetime.utcfromtimestamp(val)
    # 'V': void (no payload bytes)
    elif ftype == 'V':
        val = None
    else:
        raise FrameSyntaxError(
            'Unknown value in table: {0!r} ({1!r})'.format(
                ftype, type(ftype)))
    return val, offset


def loads(format, buf, offset=0,
          ord=ord, unpack_from=unpack_from,
          _read_item=_read_item, pstr_t=pstr_t):
    """Deserialize amqp format.

    Each character of ``format`` selects how the next value is decoded:

    bit = b
    octet = o
    short = B
    long = l
    long long = L
    float = f
    shortstr = s
    longstr = S
    table = F
    array = A
    timestamp = T

    Consecutive bits share one octet (least significant bit first); a new
    octet is consumed only when the current one is exhausted or after any
    non-bit value has been read.

    Arguments:
        format: sequence of format characters (converted with ``pstr_t``).
        buf: bytes-like buffer holding the encoded arguments.
        offset: index in ``buf`` at which decoding starts.

    Returns:
        A ``(values, offset)`` tuple: the list of decoded values and the
        index of the first byte after the last decoded value.

    Raises:
        FrameSyntaxError: if ``format`` contains an unknown character.
    """
    bitcount = bits = 0

    values = []
    append = values.append
    format = pstr_t(format)

    for p in format:
        if p == 'b':
            if not bitcount:
                # Start a new bit octet.  The offset advances once per
                # octet, not once per bit, matching how dumps() packs up
                # to eight bits into a single byte.
                bits = ord(buf[offset:offset + 1])
                offset += 1
                bitcount = 8
            val = (bits & 1) == 1
            bits >>= 1
            bitcount -= 1
        elif p == 'o':
            bitcount = bits = 0
            val, = unpack_from(b'>B', buf, offset)
            offset += 1
        elif p == 'B':
            bitcount = bits = 0
            val, = unpack_from(b'>H', buf, offset)
            offset += 2
        elif p == 'l':
            bitcount = bits = 0
            val, = unpack_from(b'>I', buf, offset)
            offset += 4
        elif p == 'L':
            bitcount = bits = 0
            val, = unpack_from(b'>Q', buf, offset)
            offset += 8
        elif p == 'f':
            bitcount = bits = 0
            val, = unpack_from(b'>f', buf, offset)
            offset += 4
        elif p == 's':
            bitcount = bits = 0
            slen, = unpack_from(b'B', buf, offset)
            offset += 1
            val = buf[offset:offset + slen].decode('utf-8')
            offset += slen
        elif p == 'S':
            bitcount = bits = 0
            slen, = unpack_from(b'>I', buf, offset)
            offset += 4
            val = buf[offset:offset + slen].decode('utf-8')
            offset += slen
        elif p == 'F':
            bitcount = bits = 0
            tlen, = unpack_from(b'>I', buf, offset)
            offset += 4
            limit = offset + tlen
            val = {}
            while offset < limit:
                keylen, = unpack_from(b'>B', buf, offset)
                offset += 1
                key = pstr_t(buf[offset:offset + keylen])
                offset += keylen
                val[key], offset = _read_item(buf, offset)
        elif p == 'A':
            bitcount = bits = 0
            alen, = unpack_from(b'>I', buf, offset)
            offset += 4
            limit = offset + alen
            val = []
            while offset < limit:
                aval, offset = _read_item(buf, offset)
                val.append(aval)
        elif p == 'T':
            bitcount = bits = 0
            val, = unpack_from(b'>Q', buf, offset)
            offset += 8
            val = datetime.utcfromtimestamp(val)
        else:
            raise FrameSyntaxError(ILLEGAL_TABLE_TYPE.format(p))
        append(val)
    return values, offset


def _flushbits(bits, write, pack=pack):
    """Write out any pending bit octets and empty the ``bits`` list.

    Each entry of ``bits`` is written as one unsigned byte, all in a single
    ``write`` call.  The list is cleared in place so the caller's reference
    stays valid.  Nothing is written when ``bits`` is empty.

    Returns:
        Always 0, which callers assign to their running bit counter.
    """
    if not bits:
        return 0
    octet_count = len(bits)
    write(pack(b'B' * octet_count, *bits))
    del bits[:]
    return 0


def dumps(format, values):
    """Serialize AMQP arguments.

    Notes:
        bit = b
        octet = o
        short = B
        long = l
        long long = L
        float = f
        shortstr = s
        longstr = S
        table = F
        array = A
        timestamp = T

    Consecutive bits are packed into shared octets (least significant bit
    first) and are written out as soon as a non-bit value follows, or at
    the end of the argument list.

    Arguments:
        format: sequence of format characters (converted with ``pstr_t``),
            one per entry of ``values``.
        values: the values to encode, in order.

    Returns:
        The encoded arguments as a byte string.

    Note:
        A format character that is not listed above is skipped without
        writing anything.
    """
    bitcount = 0
    bits = []
    out = BytesIO()
    write = out.write

    format = pstr_t(format)

    for i, val in enumerate(values):
        p = format[i]
        if p == 'b':
            val = 1 if val else 0
            shift = bitcount % 8
            if shift == 0:
                # Every eighth bit starts a fresh octet.
                bits.append(0)
            bits[-1] |= (val << shift)
            bitcount += 1
        elif p == 'o':
            bitcount = _flushbits(bits, write)
            write(pack(b'B', val))
        elif p == 'B':
            bitcount = _flushbits(bits, write)
            write(pack(b'>H', int(val)))
        elif p == 'l':
            bitcount = _flushbits(bits, write)
            write(pack(b'>I', val))
        elif p == 'L':
            bitcount = _flushbits(bits, write)
            write(pack(b'>Q', val))
        elif p == 'f':
            bitcount = _flushbits(bits, write)
            write(pack(b'>f', val))
        elif p == 's':
            val = val or ''
            bitcount = _flushbits(bits, write)
            if isinstance(val, string):
                val = val.encode('utf-8')
            write(pack(b'B', len(val)))
            write(val)
        elif p == 'S':
            val = val or ''
            bitcount = _flushbits(bits, write)
            if isinstance(val, string):
                val = val.encode('utf-8')
            write(pack(b'>I', len(val)))
            write(val)
        elif p == 'F':
            bitcount = _flushbits(bits, write)
            _write_table(val or {}, write, bits)
        elif p == 'A':
            bitcount = _flushbits(bits, write)
            _write_array(val or [], write, bits)
        elif p == 'T':
            # Pending bits must be written before the timestamp, as for
            # every other non-bit type; otherwise they would be emitted
            # after it and the field order on the wire would be wrong.
            bitcount = _flushbits(bits, write)
            write(pack(b'>Q', long_t(calendar.timegm(val.utctimetuple()))))
    _flushbits(bits, write)

    return out.getvalue()


def _write_table(d, write, bits, pack=pack):
    """Encode mapping ``d`` as a length-prefixed table and pass it to ``write``.

    Each entry is written as a one-byte key length, the key bytes (text
    keys are UTF-8 encoded first) and the type-tagged value produced by
    ``_write_item``.  The entries are collected in a temporary buffer so
    that the 32-bit byte length can be written ahead of them.

    Arguments:
        d: mapping of keys to values.
        write: callable receiving the encoded bytes.
        bits: passed through unchanged to ``_write_item``.

    Raises:
        FrameSyntaxError: if ``_write_item`` rejects a value with ValueError.
    """
    scratch = BytesIO()
    emit = scratch.write
    for name, value in items(d):
        encoded = name.encode('utf-8') if isinstance(name, string) else name
        emit(pack(b'B', len(encoded)))
        emit(encoded)
        try:
            _write_item(value, emit, bits)
        except ValueError:
            raise FrameSyntaxError(
                ILLEGAL_TABLE_TYPE_WITH_KEY.format(
                    type(value), encoded, value))
    payload = scratch.getvalue()
    write(pack(b'>I', len(payload)))
    write(payload)


def _write_array(l, write, bits, pack=pack):
    """Encode sequence ``l`` as a length-prefixed array and pass it to ``write``.

    Every element is written as a type-tagged value by ``_write_item`` into
    a temporary buffer; the 32-bit byte length of that buffer is written
    first, followed by the buffer contents.

    Arguments:
        l: iterable of values to encode.
        write: callable receiving the encoded bytes.
        bits: passed through unchanged to ``_write_item``.

    Raises:
        FrameSyntaxError: if ``_write_item`` rejects a value with ValueError.
    """
    scratch = BytesIO()
    emit = scratch.write
    for element in l:
        try:
            _write_item(element, emit, bits)
        except ValueError:
            raise FrameSyntaxError(
                ILLEGAL_TABLE_TYPE_WITH_VALUE.format(type(element), element))
    payload = scratch.getvalue()
    write(pack(b'>I', len(payload)))
    write(payload)


def _write_item(v, write, bits, pack=pack,
                string_t=string_t, bytes=bytes, string=string, bool=bool,
                float=float, int_types=int_types, Decimal=Decimal,
                datetime=datetime, dict=dict, list=list, tuple=tuple,
                None_t=None):
    """Write ``v`` as a type-tagged field value.

    The type marker is chosen from the Python type of ``v``:

        text / bytes -> 'S' (long string, text is UTF-8 encoded)
        bool         -> 't'
        float        -> 'd'
        integer      -> 'I' (signed 32-bit) or 'L' (signed 64-bit)
        Decimal      -> 'D' (scale octet + signed 32-bit unscaled value)
        datetime     -> 'T' (seconds from calendar.timegm of utctimetuple)
        dict         -> 'F' (table)
        list / tuple -> 'A' (array)
        None         -> 'V'

    Arguments:
        v: the value to encode.
        write: callable receiving the encoded bytes.
        bits: passed through unchanged to nested table/array writers.

    The remaining keyword arguments only bind names locally and are not
    meant to be supplied by callers.

    Raises:
        ValueError: if the type of ``v`` is not handled, or if ``v`` is a
            non-finite Decimal (NaN / Infinity).
    """
    if isinstance(v, (string_t, bytes)):
        if isinstance(v, string):
            v = v.encode('utf-8')
        write(pack(b'>cI', b'S', len(v)))
        write(v)
    elif isinstance(v, bool):
        # bool must be tested before the integer types: bool is an int.
        write(pack(b'>cB', b't', int(v)))
    elif isinstance(v, float):
        write(pack(b'>cd', b'd', v))
    elif isinstance(v, int_types):
        if v > 2147483647 or v < -2147483647:
            write(pack(b'>cq', b'L', v))
        else:
            write(pack(b'>ci', b'I', v))
    elif isinstance(v, Decimal):
        if not v.is_finite():
            # NaN / Infinity have a non-numeric exponent and no wire
            # representation; report them like any other unsupported value
            # so callers can turn this into a FrameSyntaxError.
            raise ValueError('non-finite Decimal cannot be encoded')
        sign, digits, exponent = v.as_tuple()
        unscaled = 0
        for d in digits:
            unscaled = (unscaled * 10) + d
        if exponent > 0:
            # The scale octet is unsigned, so a positive exponent
            # (e.g. Decimal('1E+2')) is folded into the unscaled value.
            unscaled *= 10 ** exponent
            exponent = 0
        if sign:
            unscaled = -unscaled
        write(pack(b'>cBi', b'D', -exponent, unscaled))
    elif isinstance(v, datetime):
        write(pack(b'>cQ', b'T', long_t(calendar.timegm(v.utctimetuple()))))
    elif isinstance(v, dict):
        write(b'F')
        _write_table(v, write, bits)
    elif isinstance(v, (list, tuple)):
        write(b'A')
        _write_array(v, write, bits)
    elif v is None_t:
        write(b'V')
    else:
        raise ValueError()


def decode_properties_basic(buf, offset=0,
                            unpack_from=unpack_from, pstr_t=pstr_t):
    """Decode basic properties.

    A 16-bit flags word is read at ``offset``; each set bit, examined from
    the highest (0x8000) downwards, announces that the matching property
    value follows in the buffer.

    Arguments:
        buf: bytes-like buffer holding the flags word and property list.
        offset: index in ``buf`` of the flags word.

    Returns:
        A ``(properties, offset)`` tuple: the dict of decoded properties
        and the index of the first byte after the last decoded property.
    """
    # (flag bit, property name, encoding) in the order the values appear:
    #   's' short string, 'F' table, 'o' single octet, 'L' 64-bit unsigned.
    layout = (
        (0x8000, 'content_type', 's'),
        (0x4000, 'content_encoding', 's'),
        (0x2000, 'application_headers', 'F'),
        (0x1000, 'delivery_mode', 'o'),
        (0x0800, 'priority', 'o'),
        (0x0400, 'correlation_id', 's'),
        (0x0200, 'reply_to', 's'),
        (0x0100, 'expiration', 's'),
        (0x0080, 'message_id', 's'),
        (0x0040, 'timestamp', 'L'),
        (0x0020, 'type', 's'),
        (0x0010, 'user_id', 's'),
        (0x0008, 'app_id', 's'),
        (0x0004, 'cluster_id', 's'),
    )

    decoded = {}
    flags, = unpack_from(b'>H', buf, offset)
    offset += 2

    for mask, name, encoding in layout:
        if not flags & mask:
            continue
        if encoding == 's':
            size, = unpack_from(b'>B', buf, offset)
            offset += 1
            decoded[name] = pstr_t(buf[offset:offset + size])
            offset += size
        elif encoding == 'F':
            table, offset = loads('F', buf, offset)
            decoded[name], = table
        elif encoding == 'o':
            decoded[name], = unpack_from(b'>B', buf, offset)
            offset += 1
        else:
            decoded[name], = unpack_from(b'>Q', buf, offset)
            offset += 8
    return decoded, offset


# Registry mapping a content class id to the function that decodes the
# property flags and property list for that class.
PROPERTY_CLASSES = {}
PROPERTY_CLASSES[Basic.CLASS_ID] = decode_properties_basic


class GenericContent(object):
    """Abstract base class for AMQP content.

    Subclasses should override the PROPERTIES attribute.

    Entries of the ``properties`` dictionary are also readable as
    attributes of the instance (see ``__getattr__``).
    """

    CLASS_ID = None
    # Sequence of (property name, type code) pairs, in flag-bit order.
    PROPERTIES = [('dummy', 's')]

    def __init__(self, frame_method=None, frame_args=None, **props):
        """Store the frame information and the initial properties.

        Arguments:
            frame_method: stored as ``self.frame_method``.
            frame_args: stored as ``self.frame_args``.
            **props: initial contents of the ``properties`` dictionary.
        """
        self.frame_method = frame_method
        self.frame_args = frame_args

        self.properties = props
        self._pending_chunks = []
        self.body_received = 0
        self.body_size = 0
        self.ready = False

    def __getattr__(self, name):
        """Look up ``name`` in the ``properties`` dictionary.

        Only called when normal attribute lookup fails.

        Raises:
            AttributeError: if ``name`` is not a key of ``properties``.
        """
        # '__setstate__' must be reported as missing so that
        # pickling/unpickling works.  'properties' itself must be reported
        # as missing too: on an instance that has no such attribute yet
        # (created without __init__), reading self.properties below would
        # re-enter this method and recurse without end.
        if name in ('__setstate__', 'properties'):
            raise AttributeError(name)

        if name in self.properties:
            return self.properties[name]
        raise AttributeError(name)

    def _load_properties(self, class_id, buf, offset=0,
                         classes=PROPERTY_CLASSES, unpack_from=unpack_from):
        """Load AMQP properties.

        Given the raw bytes containing the property-flags and property-list
        from a content-frame-header, parse and insert into a dictionary
        stored in this object as an attribute named 'properties'.

        The decoder is looked up in ``classes`` by ``class_id``.

        Returns:
            The offset returned by the decoder.
        """
        props, offset = classes[class_id](buf, offset)
        self.properties = props
        return offset

    def _serialize_properties(self):
        """Serialize AMQP properties.

        Serialize the 'properties' attribute (a dictionary) into
        the raw bytes making up a set of property flags and a
        property list, suitable for putting into a content frame header.

        As a side effect, 'content_encoding' is set to 'utf-8' in the
        properties dictionary when it is not already present.
        """
        shift = 15
        flag_bits = 0
        flags = []
        sformat, svalues = [], []
        props = self.properties
        props.setdefault('content_encoding', 'utf-8')
        for key, proptype in self.PROPERTIES:
            val = props.get(key, None)
            if val is not None:
                if shift == 0:
                    # Current flags word is full: start another one.
                    flags.append(flag_bits)
                    flag_bits = 0
                    shift = 15

                flag_bits |= (1 << shift)
                # A 'bit' property is carried by its flag alone and has no
                # entry in the property list.
                if proptype != 'bit':
                    sformat.append(str_to_bytes(proptype))
                    svalues.append(val)

            shift -= 1
        flags.append(flag_bits)
        result = BytesIO()
        write = result.write
        for flag_bits in flags:
            write(pack(b'>H', flag_bits))
        write(dumps(b''.join(sformat), svalues))

        return result.getvalue()

    def inbound_header(self, buf, offset=0):
        """Parse a content header: class id, body size and properties.

        Sets ``body_size`` and ``properties``; marks the content as ready
        right away when the announced body size is zero.

        Returns:
            The offset just past the 12-byte fixed part of the header.
        """
        class_id, self.body_size = unpack_from(b'>HxxQ', buf, offset)
        offset += 12
        # NOTE(review): the offset returned by _load_properties is dropped,
        # so the value returned below does not account for the property
        # bytes - confirm whether any caller relies on the return value.
        self._load_properties(class_id, buf, offset)
        if not self.body_size:
            self.ready = True
        return offset

    def inbound_body(self, buf):
        """Accept one chunk of body data.

        Chunks are buffered until at least ``body_size`` bytes have been
        received; then ``body`` is set (the chunks joined, or ``buf``
        itself when it is the only chunk) and ``ready`` becomes True.
        """
        chunks = self._pending_chunks
        self.body_received += len(buf)
        if self.body_received >= self.body_size:
            if chunks:
                chunks.append(buf)
                self.body = bytes().join(chunks)
                chunks[:] = []
            else:
                self.body = buf
            self.ready = True
        else:
            chunks.append(buf)