Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / asyncpg   python

Repository URL to install this package:

/ protocol / codecs / array.pyx

# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0


from collections.abc import (Iterable as IterableABC,
                             Mapping as MappingABC,
                             Sized as SizedABC)

from asyncpg import exceptions


# Upper bound on the number of array dimensions: it sizes the fixed `dims`
# arrays used below and is enforced on both the encode and decode paths.
DEF ARRAY_MAXDIM = 6  # defined in postgresql/src/includes/c.h

# "NULL" spelled as UCS4 code points (N, U, L, L) plus a zero terminator.
cdef Py_UCS4 *APG_NULL = [0x004E, 0x0055, 0x004C, 0x004C, 0x0000]


# Signature of the per-element encoder callback used by the array encoders.
# It serializes `obj` into `buf`; `arg` is an opaque pointer that the array
# encoders hand through unchanged.  The text array encoder relies on the
# first four bytes written for an element being its encoded length.
ctypedef object (*encode_func_ex)(ConnectionSettings settings,
                                  WriteBuffer buf,
                                  object obj,
                                  const void *arg)


# Signature of the per-element decoder callback used by the array decoder.
# It is handed a buffer covering a single element and returns the decoded
# Python object; `arg` is an opaque pointer passed through unchanged.
ctypedef object (*decode_func_ex)(ConnectionSettings settings,
                                  FRBuffer *buf,
                                  const void *arg)


cdef inline bint _is_trivial_container(object obj):
    # Strings and raw byte containers are sized iterables too, but they
    # must be treated as single values rather than as arrays.
    if cpython.PyUnicode_Check(obj):
        return True
    if cpython.PyBytes_Check(obj):
        return True
    if cpythonx.PyByteArray_Check(obj):
        return True
    return cpythonx.PyMemoryView_Check(obj)


cdef inline _is_array_iterable(object obj):
    # An array value is any sized iterable that is neither a string / raw
    # byte container nor a mapping.
    if not isinstance(obj, IterableABC):
        return False
    if not isinstance(obj, SizedABC):
        return False
    if _is_trivial_container(obj):
        return False
    return not isinstance(obj, MappingABC)


cdef inline _is_sub_array_iterable(object obj):
    # Nested values get a stricter test than the top-level one: a tuple
    # found inside an array is treated as a record, not as a sub-array.
    if not _is_array_iterable(obj):
        return False
    return not cpython.PyTuple_Check(obj)


cdef _get_array_shape(object obj, int32_t *dims, int32_t *ndims):
    # Compute the shape of a (possibly nested) array value.
    #
    # dims  -- output array; the length found at nesting level N (1-based)
    #          is stored in dims[N - 1].
    # ndims -- in/out: holds the nesting level of `obj` on entry and is
    #          incremented when a deeper level is discovered.
    #
    # Raises ValueError when a level holds more than _MAXINT32 elements,
    # when the nesting is deeper than ARRAY_MAXDIM, or when plain elements
    # and sub-arrays are mixed / sibling sub-arrays differ in length.
    #
    # Only the first sub-array of each level is descended into; its
    # siblings are compared against it by length only.
    cdef:
        ssize_t mylen = len(obj)
        # -2: nothing seen yet; -1: plain elements seen;
        # >= 0: length of the first sub-array seen at this level.
        ssize_t elemlen = -2

    if mylen > _MAXINT32:
        raise ValueError('too many elements in array value')

    # Checked before the store below so that dims[] is never overrun.
    if ndims[0] > ARRAY_MAXDIM:
        raise ValueError(
            'number of array dimensions ({}) exceed the maximum expected ({})'.
                format(ndims[0], ARRAY_MAXDIM))

    dims[ndims[0] - 1] = <int32_t>mylen

    for elem in obj:
        if _is_sub_array_iterable(elem):
            if elemlen == -2:
                # First sub-array at this level: it defines the expected
                # length and opens one more dimension.
                elemlen = len(elem)
                if elemlen > _MAXINT32:
                    raise ValueError('too many elements in array value')
                ndims[0] += 1
                _get_array_shape(elem, dims, ndims)
            else:
                # Also reached when plain elements came first
                # (elemlen == -1), which can never equal a length.
                if len(elem) != elemlen:
                    raise ValueError('non-homogeneous array')
        else:
            if elemlen >= 0:
                # A plain element following sub-arrays.
                raise ValueError('non-homogeneous array')
            else:
                elemlen = -1


cdef _write_array_data(ConnectionSettings settings, object obj, int32_t ndims,
                       int32_t dim, WriteBuffer elem_data,
                       encode_func_ex encoder, const void *encoder_arg):
    # Recursively append the elements of a nested array value to
    # `elem_data`, visiting the innermost dimension last.
    #
    # ndims -- total number of dimensions of the array being written.
    # dim   -- zero-based dimension that `obj` belongs to.
    #
    # None is written as a length of -1 with no payload; every other
    # element is handed to `encoder`.  A TypeError raised by the encoder
    # is re-raised as ValueError.
    if dim < ndims - 1:
        # Not yet at the innermost dimension: descend into each sub-array.
        for item in obj:
            _write_array_data(settings, item, ndims, dim + 1, elem_data,
                              encoder, encoder_arg)
    else:
        for item in obj:
            if item is None:
                elem_data.write_int32(-1)
            else:
                try:
                    encoder(settings, elem_data, item, encoder_arg)
                except TypeError as e:
                    # A TypeError raised without arguments has empty
                    # `args`; guard so that the intended ValueError is
                    # raised instead of an IndexError.
                    raise ValueError(
                        'invalid array element: {}'.format(
                            e.args[0] if e.args else '')) from None


cdef inline array_encode(ConnectionSettings settings, WriteBuffer buf,
                         object obj, uint32_t elem_oid,
                         encode_func_ex encoder, const void *encoder_arg):
    # Encode `obj` as an array and append it to `buf`.
    #
    # obj      -- a sized iterable (not a string, bytes-like or mapping),
    #             optionally nested to form a multi-dimensional array.
    # elem_oid -- value written into the element type field of the header.
    # encoder / encoder_arg -- per-element encoder and its opaque argument.
    #
    # Raises TypeError if `obj` is not an acceptable container and
    # ValueError if its shape is invalid or an element is rejected by the
    # encoder with a TypeError.
    cdef:
        WriteBuffer elem_data
        int32_t dims[ARRAY_MAXDIM]
        int32_t ndims = 1
        int32_t i

    if not _is_array_iterable(obj):
        raise TypeError(
            'a sized iterable container expected (got type {!r})'.format(
                type(obj).__name__))

    _get_array_shape(obj, dims, &ndims)

    # Elements go into a scratch buffer first because the total length
    # has to be written ahead of them.
    elem_data = WriteBuffer.new()

    if ndims > 1:
        _write_array_data(settings, obj, ndims, 0, elem_data,
                          encoder, encoder_arg)
    else:
        for i, item in enumerate(obj):
            if item is None:
                elem_data.write_int32(-1)
            else:
                try:
                    encoder(settings, elem_data, item, encoder_arg)
                except TypeError as e:
                    # A TypeError raised without arguments has empty
                    # `args`; guard so that the intended ValueError is
                    # raised instead of an IndexError.
                    raise ValueError(
                        'invalid array element at index {}: {}'.format(
                            i, e.args[0] if e.args else '')) from None

    # Total length: three int32 header fields (12 bytes), two int32 per
    # dimension (8 bytes each), then the element data.
    buf.write_int32(12 + 8 * ndims + elem_data.len())
    # Number of dimensions
    buf.write_int32(ndims)
    # flags
    buf.write_int32(0)
    # element type
    buf.write_int32(<int32_t>elem_oid)
    # dimension length followed by the lower bound (always 1)
    for i in range(ndims):
        buf.write_int32(dims[i])
        buf.write_int32(1)
    # element data
    buf.write_buffer(elem_data)


cdef _write_textarray_data(ConnectionSettings settings, object obj,
                           int32_t ndims, int32_t dim, WriteBuffer array_data,
                           encode_func_ex encoder, const void *encoder_arg,
                           Py_UCS4 typdelim):
    # Append the text representation of one array level to `array_data`:
    # '{', the items separated by the delimiter plus a space, then '}'.
    #
    # ndims    -- total number of dimensions of the array being written.
    # dim      -- zero-based dimension that `obj` belongs to.
    # typdelim -- element delimiter character.
    #
    # At the innermost dimension None is written as NULL, an empty element
    # as "", and an element spelling NULL (case-insensitively) as "NULL".
    # Other elements are double-quoted when they contain a double quote,
    # a backslash, a brace, the delimiter or whitespace; double quotes and
    # backslashes are additionally backslash-escaped.
    #
    # A TypeError raised by the encoder is re-raised as ValueError.
    cdef:
        # Number of items already started at this level; only used to
        # decide whether a delimiter has to be written.
        ssize_t i = 0
        # Byte index within the current element.  Kept separate from `i`
        # so that scanning an element cannot disturb the item counter.
        ssize_t j
        int8_t delim = <int8_t>typdelim
        WriteBuffer elem_data
        Py_buffer pybuf
        const char *elem_str
        char ch
        ssize_t elem_len
        ssize_t quoted_elem_len
        bint need_quoting

    array_data.write_byte(b'{')

    if dim < ndims - 1:
        for item in obj:
            if i > 0:
                array_data.write_byte(delim)
                array_data.write_byte(b' ')
            _write_textarray_data(settings, item, ndims, dim + 1, array_data,
                                  encoder, encoder_arg, typdelim)
            i += 1
    else:
        for item in obj:
            if i > 0:
                array_data.write_byte(delim)
                array_data.write_byte(b' ')
            i += 1

            if item is None:
                array_data.write_bytes(b'NULL')
                continue

            # Scratch buffer for this element only; allocated after the
            # None check because NULL items never need one.
            elem_data = WriteBuffer.new()

            try:
                encoder(settings, elem_data, item, encoder_arg)
            except TypeError as e:
                # A TypeError raised without arguments has empty `args`;
                # guard so that the intended ValueError is raised instead
                # of an IndexError.
                raise ValueError(
                    'invalid array element: {}'.format(
                        e.args[0] if e.args else '')) from None

            # element string length (first four bytes are the encoded length.)
            elem_len = elem_data.len() - 4

            if elem_len == 0:
                # Empty string
                array_data.write_bytes(b'""')
            else:
                cpython.PyObject_GetBuffer(
                    elem_data, &pybuf, cpython.PyBUF_SIMPLE)

                # Skip the four-byte length prefix.
                elem_str = <const char*>(pybuf.buf) + 4

                try:
                    # NOTE(review): elem_str is not explicitly
                    # NUL-terminated here -- confirm that this matches
                    # what apg_strcasecmp_char expects.
                    if not apg_strcasecmp_char(elem_str, b'NULL'):
                        array_data.write_bytes(b'"NULL"')
                    else:
                        quoted_elem_len = elem_len
                        need_quoting = False

                        # First pass: decide whether quoting and/or
                        # escaping is required.
                        for j in range(elem_len):
                            ch = elem_str[j]
                            if ch == b'"' or ch == b'\\':
                                # Quotes and backslashes need escaping.
                                quoted_elem_len += 1
                                need_quoting = True
                            elif (ch == b'{' or ch == b'}' or ch == delim or
                                    apg_ascii_isspace(<uint32_t>ch)):
                                need_quoting = True

                        if need_quoting:
                            array_data.write_byte(b'"')

                            if quoted_elem_len == elem_len:
                                # Nothing to escape: copy in one go.
                                array_data.write_cstr(elem_str, elem_len)
                            else:
                                # Escaping required.
                                for j in range(elem_len):
                                    ch = elem_str[j]
                                    if ch == b'"' or ch == b'\\':
                                        array_data.write_byte(b'\\')
                                    array_data.write_byte(ch)

                            array_data.write_byte(b'"')
                        else:
                            array_data.write_cstr(elem_str, elem_len)
                finally:
                    cpython.PyBuffer_Release(&pybuf)

    array_data.write_byte(b'}')


cdef inline textarray_encode(ConnectionSettings settings, WriteBuffer buf,
                             object obj, encode_func_ex encoder,
                             const void *encoder_arg, Py_UCS4 typdelim):
    # Encode `obj` as a text-format array and append it to `buf`, preceded
    # by its length as an int32.
    #
    # obj      -- a sized iterable (not a string, bytes-like or mapping),
    #             optionally nested to form a multi-dimensional array.
    # typdelim -- element delimiter character.
    #
    # Raises TypeError if `obj` is not an acceptable container and
    # ValueError if its shape is invalid or an element is rejected by the
    # encoder with a TypeError.
    cdef:
        WriteBuffer array_data
        int32_t dims[ARRAY_MAXDIM]
        int32_t ndims = 1

    if not _is_array_iterable(obj):
        raise TypeError(
            'a sized iterable container expected (got type {!r})'.format(
                type(obj).__name__))

    # Validates the shape and yields the number of dimensions; the
    # per-dimension sizes themselves are not written in this format.
    _get_array_shape(obj, dims, &ndims)

    # Rendered into a scratch buffer first because the length has to be
    # written ahead of the data.
    array_data = WriteBuffer.new()
    _write_textarray_data(settings, obj, ndims, 0, array_data,
                          encoder, encoder_arg, typdelim)
    buf.write_int32(array_data.len())
    buf.write_buffer(array_data)


cdef inline array_decode(ConnectionSettings settings, FRBuffer *buf,
                         decode_func_ex decoder, const void *decoder_arg):
    # Decode an array from `buf` into a (possibly nested) list.
    #
    # decoder / decoder_arg -- per-element decoder and its opaque argument.
    #
    # Elements whose length field is -1 are returned as None.  Lower
    # bound information is read and discarded.
    #
    # Raises exceptions.ProtocolError when the number of dimensions is
    # negative or exceeds ARRAY_MAXDIM, or when a dimension size is
    # negative.
    cdef:
        int32_t ndims = hton.unpack_int32(frb_read(buf, 4))
        # The flags and element type fields are not used below, but they
        # still have to be read so that the dimension data comes next.
        int32_t flags = hton.unpack_int32(frb_read(buf, 4))
        uint32_t elem_oid = <uint32_t>hton.unpack_int32(frb_read(buf, 4))
        list result
        int i
        int32_t elem_len
        int32_t elem_count = 1
        FRBuffer elem_buf
        int32_t dims[ARRAY_MAXDIM]

    if ndims == 0:
        result = cpython.PyList_New(0)
        return result

    if ndims > ARRAY_MAXDIM:
        raise exceptions.ProtocolError(
            'number of array dimensions ({}) exceed the maximum expected ({})'.
            format(ndims, ARRAY_MAXDIM))
    elif ndims < 0:
        # Without this check a negative count would skip the loop below
        # and reach the nested decoder with dims[] uninitialised.
        raise exceptions.ProtocolError(
            'unexpected array dimensions value: {}'.format(ndims))

    for i in range(ndims):
        dims[i] = hton.unpack_int32(frb_read(buf, 4))
        if dims[i] < 0:
            # A negative size must never be used as a list length.
            raise exceptions.ProtocolError(
                'unexpected array dimension size: {}'.format(dims[i]))
        # Ignore the lower bound information
        frb_read(buf, 4)

    if ndims == 1:
        # Fast path for flat arrays
        elem_count = dims[0]
        result = cpython.PyList_New(elem_count)

        for i in range(elem_count):
            elem_len = hton.unpack_int32(frb_read(buf, 4))
            if elem_len == -1:
                elem = None
            else:
                frb_slice_from(&elem_buf, buf, elem_len)
                elem = decoder(settings, &elem_buf, decoder_arg)

            # PyList_SET_ITEM steals a reference, so take one for the list.
            cpython.Py_INCREF(elem)
            cpython.PyList_SET_ITEM(result, i, elem)

    else:
        result = _nested_array_decode(settings, buf,
                                      decoder, decoder_arg, ndims, dims,
                                      &elem_buf)

    return result


cdef _nested_array_decode(ConnectionSettings settings,
                          FRBuffer *buf,
                          decode_func_ex decoder,
                          const void *decoder_arg,
                          int32_t ndims, int32_t *dims,
                          FRBuffer *elem_buf):

    cdef:
        int32_t elem_len
        int64_t i, j
        int64_t array_len = 1
        object elem, stride
        # An array of pointers to lists for each current array level.
        void *strides[ARRAY_MAXDIM]
        # An array of current positions at each array level.
        int32_t indexes[ARRAY_MAXDIM]

    if PG_DEBUG:
        if ndims <= 0:
            raise exceptions.ProtocolError(
Loading ...