Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / asyncpg   python

Repository URL to install this package:

/ protocol / codecs / base.pyx

# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0


from collections.abc import Mapping as MappingABC

from asyncpg import exceptions


# Fixed-size C arrays of raw pointers, one per wire format (binary and
# text).  Each array has (MAXSUPPORTEDOID + 1) * 2 slots.
# NOTE(review): presumably these are indexed by type OID to find the core
# codecs; the lookup code is not visible here -- confirm the slot layout.
cdef void* binary_codec_map[(MAXSUPPORTEDOID + 1) * 2]
cdef void* text_codec_map[(MAXSUPPORTEDOID + 1) * 2]
# NOTE(review): presumably holds codecs that cannot live in the fixed-size
# arrays above; its key/value schema is not visible here -- confirm.
cdef dict EXTRA_CODECS = {}


@cython.final
cdef class Codec:

    def __cinit__(self, uint32_t oid):
        # A newly allocated codec only knows its type OID.  The codec type
        # stays CODEC_UNDEFINED until init() assigns the real one together
        # with the encoder/decoder pair.
        self.type = CODEC_UNDEFINED
        self.oid = oid

    cdef init(self, str name, str schema, str kind,
              CodecType type, ServerDataFormat format,
              ClientExchangeFormat xformat,
              encode_func c_encoder, decode_func c_decoder,
              object py_encoder, object py_decoder,
              Codec element_codec, tuple element_type_oids,
              object element_names, list element_codecs,
              Py_UCS4 element_delimiter):
        # Store every attribute of the codec, then bind the encoder/decoder
        # method pair that matches the codec type and server data format.
        #
        # Raises NotImplementedError when a range or composite codec is
        # requested in a non-binary format, and InternalClientError when
        # the codec type is not one of the handled CODEC_* values.
        cdef bint binary = format == PG_FORMAT_BINARY

        # Type identity.
        self.name = name
        self.schema = schema
        self.kind = kind
        self.type = type

        # Server data format and client exchange format.
        self.format = format
        self.xformat = xformat

        # Conversion callbacks: C-level pair and Python-level pair.
        self.c_encoder = c_encoder
        self.c_decoder = c_decoder
        self.py_encoder = py_encoder
        self.py_decoder = py_decoder

        # Element information used by array, range and composite codecs.
        self.element_codec = element_codec
        self.element_codecs = element_codecs
        self.element_type_oids = element_type_oids
        self.element_names = element_names
        self.element_delimiter = element_delimiter

        # A record descriptor is only built when element names are given.
        if element_names is None:
            self.record_desc = None
        else:
            self.record_desc = record.ApgRecordDesc_New(
                element_names, tuple(element_names))

        # Bind the per-type encode/decode methods as function pointers;
        # encode() and decode() call through them.
        if type == CODEC_C:
            self.encoder = <codec_encode_func>&self.encode_scalar
            self.decoder = <codec_decode_func>&self.decode_scalar

        elif type == CODEC_ARRAY:
            # Arrays are the only container type with a text-format path.
            if binary:
                self.encoder = <codec_encode_func>&self.encode_array
                self.decoder = <codec_decode_func>&self.decode_array
            else:
                self.encoder = <codec_encode_func>&self.encode_array_text
                self.decoder = <codec_decode_func>&self.decode_array_text

        elif type == CODEC_RANGE:
            if not binary:
                raise NotImplementedError(
                    'cannot decode type "{}"."{}": text encoding of '
                    'range types is not supported'.format(schema, name))
            self.encoder = <codec_encode_func>&self.encode_range
            self.decoder = <codec_decode_func>&self.decode_range

        elif type == CODEC_COMPOSITE:
            if not binary:
                raise NotImplementedError(
                    'cannot decode type "{}"."{}": text encoding of '
                    'composite types is not supported'.format(schema, name))
            self.encoder = <codec_encode_func>&self.encode_composite
            self.decoder = <codec_decode_func>&self.decode_composite

        elif type == CODEC_PY:
            self.encoder = <codec_encode_func>&self.encode_in_python
            self.decoder = <codec_decode_func>&self.decode_in_python

        else:
            raise exceptions.InternalClientError(
                'unexpected codec type: {}'.format(type))

    cdef Codec copy(self):
        # Return a new Codec for the same OID, initialised from this
        # codec's current attributes.  Referenced objects (element codecs,
        # Python callables, name mapping) are passed through as-is rather
        # than duplicated.
        cdef Codec clone = Codec(self.oid)

        clone.init(
            self.name, self.schema, self.kind,
            self.type, self.format, self.xformat,
            self.c_encoder, self.c_decoder,
            self.py_encoder, self.py_decoder,
            self.element_codec,
            self.element_type_oids, self.element_names,
            self.element_codecs, self.element_delimiter,
        )
        return clone

    cdef encode_scalar(self, ConnectionSettings settings, WriteBuffer buf,
                       object obj):
        # Encode obj into buf by calling the C-level encoder stored on this
        # codec.  init() binds this method as the encoder of CODEC_C codecs.
        self.c_encoder(settings, buf, obj)

    cdef encode_array(self, ConnectionSettings settings, WriteBuffer buf,
                      object obj):
        # Binary-format array encoding: delegate to array_encode(), giving
        # it the element codec's OID, codec_encode_func_ex, and the element
        # codec itself as an opaque pointer argument.
        # NOTE(review): presumably codec_encode_func_ex is invoked per
        # element with that pointer -- confirm against array_encode().
        array_encode(settings, buf, obj, self.element_codec.oid,
                     codec_encode_func_ex,
                     <void*>(<cpython.PyObject>self.element_codec))

    cdef encode_array_text(self, ConnectionSettings settings, WriteBuffer buf,
                           object obj):
        # Text-format array encoding: delegate to textarray_encode() with
        # codec_encode_func_ex, the element codec as an opaque pointer, and
        # this codec's element delimiter.  Unlike encode_array(), the
        # result of the delegate call is returned to the caller.
        return textarray_encode(settings, buf, obj,
                                codec_encode_func_ex,
                                <void*>(<cpython.PyObject>self.element_codec),
                                self.element_delimiter)

    cdef encode_range(self, ConnectionSettings settings, WriteBuffer buf,
                      object obj):
        # Range encoding: delegate to range_encode(), giving it the element
        # codec's OID, codec_encode_func_ex, and the element codec as an
        # opaque pointer.  init() only binds this for binary-format codecs.
        range_encode(settings, buf, obj, self.element_codec.oid,
                     codec_encode_func_ex,
                     <void*>(<cpython.PyObject>self.element_codec))

    cdef encode_composite(self, ConnectionSettings settings, WriteBuffer buf,
                          object obj):
        # Encode a composite (record) value into buf.
        #
        # obj is either a mapping keyed by element name or an iterable of
        # element values in positional order.  A mapping is first laid out
        # positionally; elements it does not mention are sent as NULL.
        #
        # Raises ValueError when a mapping key is not an element of this
        # composite type, or when the record has more than _MAXINT32
        # elements.
        cdef:
            WriteBuffer elem_data
            int i
            list elem_codecs = self.element_codecs
            ssize_t count
            ssize_t composite_size
            list rec

        if isinstance(obj, MappingABC):
            # Input is dict-like, form a positional sequence.  A plain list
            # is used instead of PyTuple_New/PyTuple_SET_ITEM: SET_ITEM
            # does not release the item it overwrites, so pre-filling the
            # tuple with None and then overwriting slots leaked one
            # reference per supplied field.  List assignment also
            # bounds-checks the index.
            composite_size = len(self.element_type_oids)
            rec = [None] * composite_size

            for field in obj:
                try:
                    i = self.element_names[field]
                except KeyError:
                    raise ValueError(
                        '{!r} is not a valid element of composite '
                        'type {}'.format(field, self.name)) from None

                rec[i] = obj[field]

            obj = rec

        count = len(obj)
        if count > _MAXINT32:
            raise ValueError('too many elements in composite type record')

        # Elements are written to a scratch buffer first; the frame is
        # written by record_encode_frame() together with the element count.
        elem_data = WriteBuffer.new()
        i = 0
        for item in obj:
            # Each element is prefixed with its type OID.
            elem_data.write_int32(<int32_t>self.element_type_oids[i])
            if item is None:
                # A length of -1 stands for a NULL element.
                elem_data.write_int32(-1)
            else:
                (<Codec>elem_codecs[i]).encode(settings, elem_data, item)
            i += 1

        record_encode_frame(settings, buf, elem_data, <int32_t>count)

    cdef encode_in_python(self, ConnectionSettings settings, WriteBuffer buf,
                          object obj):
        # Run the Python-level encoder on obj, then write its result to buf
        # according to the exchange format:
        #   * PG_XFORMAT_TUPLE  -- hand the result to the C-level encoder;
        #   * PG_XFORMAT_OBJECT -- write it as bytea (binary format) or as
        #     text (text format).
        # Raises InternalClientError for any other exchange or data format.
        encoded = self.py_encoder(obj)

        if self.xformat == PG_XFORMAT_TUPLE:
            self.c_encoder(settings, buf, encoded)
            return

        if self.xformat != PG_XFORMAT_OBJECT:
            raise exceptions.InternalClientError(
                'unexpected exchange format: {}'.format(self.xformat))

        if self.format == PG_FORMAT_BINARY:
            pgproto.bytea_encode(settings, buf, encoded)
        elif self.format == PG_FORMAT_TEXT:
            pgproto.text_encode(settings, buf, encoded)
        else:
            raise exceptions.InternalClientError(
                'unexpected data format: {}'.format(self.format))

    cdef encode(self, ConnectionSettings settings, WriteBuffer buf,
                object obj):
        # Generic entry point: call whichever encode_* method init() bound
        # to self.encoder.  The stored callable is invoked with self passed
        # explicitly as its first argument.
        return self.encoder(self, settings, buf, obj)

    cdef decode_scalar(self, ConnectionSettings settings, FRBuffer *buf):
        # Decode a value from buf with the C-level decoder stored on this
        # codec.  init() binds this method as the decoder of CODEC_C codecs.
        return self.c_decoder(settings, buf)

    cdef decode_array(self, ConnectionSettings settings, FRBuffer *buf):
        # Binary-format array decoding: delegate to array_decode() with
        # codec_decode_func_ex and the element codec as an opaque pointer.
        return array_decode(settings, buf, codec_decode_func_ex,
                            <void*>(<cpython.PyObject>self.element_codec))

    cdef decode_array_text(self, ConnectionSettings settings,
                           FRBuffer *buf):
        # Text-format array decoding: delegate to textarray_decode() with
        # codec_decode_func_ex, the element codec as an opaque pointer, and
        # this codec's element delimiter.
        return textarray_decode(settings, buf, codec_decode_func_ex,
                                <void*>(<cpython.PyObject>self.element_codec),
                                self.element_delimiter)

    cdef decode_range(self, ConnectionSettings settings, FRBuffer *buf):
        # Range decoding: delegate to range_decode() with
        # codec_decode_func_ex and the element codec as an opaque pointer.
        # init() only binds this for binary-format codecs.
        return range_decode(settings, buf, codec_decode_func_ex,
                            <void*>(<cpython.PyObject>self.element_codec))

    cdef decode_composite(self, ConnectionSettings settings,
                          FRBuffer *buf):
        # Decode a composite (record) value from buf and return it as a
        # record object built from this codec's record descriptor.
        #
        # Layout consumed from buf: a 4-byte element count, then for each
        # element a 4-byte type OID, a 4-byte length, and that many bytes
        # of element data (a length of -1 means the element is None).
        #
        # Raises OutdatedSchemaCacheError when the element count or any
        # element type OID differs from what this codec was built with.
        cdef:
            object result
            ssize_t elem_count
            ssize_t i
            int32_t elem_len
            uint32_t elem_typ
            uint32_t received_elem_typ
            Codec elem_codec
            FRBuffer elem_buf

        # The count is read as int32 and reinterpreted as unsigned before
        # widening, so it can never be negative.
        elem_count = <ssize_t><uint32_t>hton.unpack_int32(frb_read(buf, 4))
        if elem_count != len(self.element_type_oids):
            raise exceptions.OutdatedSchemaCacheError(
                'unexpected number of attributes of composite type: '
                '{}, expected {}'
                    .format(
                        elem_count,
                        len(self.element_type_oids),
                    ),
                schema=self.schema,
                data_type=self.name,
            )
        result = record.ApgRecord_New(self.record_desc, elem_count)
        for i in range(elem_count):
            elem_typ = self.element_type_oids[i]
            received_elem_typ = <uint32_t>hton.unpack_int32(frb_read(buf, 4))

            if received_elem_typ != elem_typ:
                # Report type names where BUILTIN_TYPE_OID_MAP knows the
                # OID, falling back to the numeric OID otherwise.
                raise exceptions.OutdatedSchemaCacheError(
                    'unexpected data type of composite type attribute {}: '
                    '{!r}, expected {!r}'
                        .format(
                            i,
                            BUILTIN_TYPE_OID_MAP.get(
                                received_elem_typ, received_elem_typ),
                            BUILTIN_TYPE_OID_MAP.get(
                                elem_typ, elem_typ)
                        ),
                    schema=self.schema,
                    data_type=self.name,
                    position=i,
                )

            elem_len = hton.unpack_int32(frb_read(buf, 4))
            if elem_len == -1:
                elem = None
            else:
                elem_codec = self.element_codecs[i]
                # NOTE(review): frb_slice_from presumably carves the next
                # elem_len bytes of buf into elem_buf and advances buf --
                # confirm against its definition.
                elem = elem_codec.decode(
                    settings, frb_slice_from(&elem_buf, buf, elem_len))

            # NOTE(review): the explicit INCREF suggests ApgRecord_SET_ITEM
            # takes over a reference, as PyTuple_SET_ITEM does -- confirm.
            cpython.Py_INCREF(elem)
            record.ApgRecord_SET_ITEM(result, i, elem)

        return result

    cdef decode_in_python(self, ConnectionSettings settings,
                          FRBuffer *buf):
        # Read the raw value from buf according to the exchange format and
        # return the Python-level decoder's result for it:
        #   * PG_XFORMAT_TUPLE  -- raw value comes from the C-level decoder;
        #   * PG_XFORMAT_OBJECT -- raw value is read as bytea (binary
        #     format) or as text (text format).
        # Raises InternalClientError for any other exchange or data format.
        if self.xformat == PG_XFORMAT_TUPLE:
            raw = self.c_decoder(settings, buf)
        elif self.xformat != PG_XFORMAT_OBJECT:
            raise exceptions.InternalClientError(
                'unexpected exchange format: {}'.format(self.xformat))
        elif self.format == PG_FORMAT_BINARY:
            raw = pgproto.bytea_decode(settings, buf)
        elif self.format == PG_FORMAT_TEXT:
            raw = pgproto.text_decode(settings, buf)
        else:
            raise exceptions.InternalClientError(
                'unexpected data format: {}'.format(self.format))

        return self.py_decoder(raw)

    cdef inline decode(self, ConnectionSettings settings, FRBuffer *buf):
        # Generic entry point: call whichever decode_* method init() bound
        # to self.decoder.  The stored callable is invoked with self passed
        # explicitly as its first argument.
        return self.decoder(self, settings, buf)

    cdef inline has_encoder(self):
        # Report whether this codec is able to encode values.
        #
        # A codec with its own C-level or Python-level encoder always can.
        # Otherwise array and range codecs defer to their element codec,
        # composite codecs require every element codec to have an encoder,
        # and any other codec type cannot encode.
        cdef Codec member

        if self.c_encoder is not NULL or self.py_encoder is not None:
            return True

        if self.type == CODEC_ARRAY or self.type == CODEC_RANGE:
            return self.element_codec.has_encoder()

        if self.type != CODEC_COMPOSITE:
            return False

        for member in self.element_codecs:
            if not member.has_encoder():
                return False
        return True

    cdef has_decoder(self):
        # Report whether this codec is able to decode values.
        #
        # A codec with its own C-level or Python-level decoder always can.
        # Otherwise array and range codecs defer to their element codec,
        # composite codecs require every element codec to have a decoder,
        # and any other codec type cannot decode.
        cdef Codec member

        if self.c_decoder is not NULL or self.py_decoder is not None:
            return True

        if self.type == CODEC_ARRAY or self.type == CODEC_RANGE:
            return self.element_codec.has_decoder()

        if self.type != CODEC_COMPOSITE:
            return False

        for member in self.element_codecs:
            if not member.has_decoder():
                return False
        return True

    cdef is_binary(self):
        # Return True when this codec's server data format is
        # PG_FORMAT_BINARY, False otherwise.
        return self.format == PG_FORMAT_BINARY

    def __repr__(self):
        # Debug representation: this codec's OID, the element codec's OID
        # (or 'NA' when there is no element codec), and the result of
        # has_core_codec() for this OID.
        if self.element_codec is None:
            elem_oid = 'NA'
        else:
            elem_oid = self.element_codec.oid

        return '<Codec oid={} elem_oid={} core={}>'.format(
            self.oid, elem_oid, has_core_codec(self.oid))

    @staticmethod
    cdef Codec new_array_codec(uint32_t oid,
                               str name,
                               str schema,
                               Codec element_codec,
                               Py_UCS4 element_delimiter):
        cdef Codec codec
        codec = Codec(oid)
        codec.init(name, schema, 'array', CODEC_ARRAY, element_codec.format,
                   PG_XFORMAT_OBJECT, NULL, NULL, None, None, element_codec,
                   None, None, None, element_delimiter)
Loading ...