# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
from collections.abc import (Iterable as IterableABC,
Mapping as MappingABC,
Sized as SizedABC)
from asyncpg import exceptions
# Upper bound on the number of array dimensions; both the shape scanner
# and the binary decoder below reject values with more dimensions.
DEF ARRAY_MAXDIM = 6  # defined in postgresql/src/include/c.h

# "NULL" spelled as Unicode code points (N, U, L, L) followed by
# a zero terminator.
cdef Py_UCS4 *APG_NULL = [0x004E, 0x0055, 0x004C, 0x004C, 0x0000]

# Pointer type for element encoders.  The array encoders in this module
# invoke it once per non-None element as
# encoder(settings, <output buffer>, <element>, <arg>); *arg* is an opaque
# pointer that is handed through to the encoder unchanged.
ctypedef object (*encode_func_ex)(ConnectionSettings settings,
                                  WriteBuffer buf,
                                  object obj,
                                  const void *arg)

# Pointer type for element decoders.  The array decoder invokes it once
# per non-NULL element with a buffer restricted to that element's bytes
# and uses the returned object as the decoded element; *arg* is an opaque
# pointer that is handed through to the decoder unchanged.
ctypedef object (*decode_func_ex)(ConnectionSettings settings,
                                  FRBuffer *buf,
                                  const void *arg)
cdef inline bint _is_trivial_container(object obj):
    # True for str, bytes, bytearray and memoryview objects: containers
    # that are iterable and sized, but must never be treated as arrays.
    if cpython.PyUnicode_Check(obj) or cpython.PyBytes_Check(obj):
        return True
    return (cpythonx.PyByteArray_Check(obj) or
            cpythonx.PyMemoryView_Check(obj))
cdef inline _is_array_iterable(object obj):
    # An object can be encoded as an array when it is both iterable and
    # sized, and is neither a string/bytes-like container nor a mapping.
    if not isinstance(obj, IterableABC):
        return False
    if not isinstance(obj, SizedABC):
        return False
    if _is_trivial_container(obj):
        return False
    return not isinstance(obj, MappingABC)
cdef inline _is_sub_array_iterable(object obj):
    # Nested values get a stricter test than the top-level value:
    # a tuple found inside an array is treated as a record, not as
    # another array dimension.
    if not _is_array_iterable(obj):
        return False
    return not cpython.PyTuple_Check(obj)
cdef _get_array_shape(object obj, int32_t *dims, int32_t *ndims):
    # Record the shape of the (possibly nested) array value *obj*.
    #
    # On entry ndims[0] is the 1-based nesting level of *obj*.  The length
    # of *obj* is stored in dims[ndims[0] - 1]; when the elements of *obj*
    # are themselves arrays, ndims[0] is incremented and the first such
    # element is scanned recursively.
    #
    # Raises ValueError when a length exceeds _MAXINT32, when the nesting
    # is deeper than ARRAY_MAXDIM, or when the direct elements of *obj*
    # mix scalars with arrays or contain arrays of different lengths.
    #
    # NOTE(review): only the first nested element is descended into; for
    # the remaining siblings just their own length is compared, so
    # differences further down are not detected here.
    cdef:
        ssize_t outer_len = len(obj)
        # -2: no element seen yet
        # -1: only non-array elements seen so far
        # >= 0: length of the first nested array
        ssize_t inner_len = -2

    if outer_len > _MAXINT32:
        raise ValueError('too many elements in array value')

    if ndims[0] > ARRAY_MAXDIM:
        raise ValueError(
            'number of array dimensions ({}) exceed the maximum expected ({})'.
            format(ndims[0], ARRAY_MAXDIM))

    dims[ndims[0] - 1] = <int32_t>outer_len

    for member in obj:
        if not _is_sub_array_iterable(member):
            # A scalar after a nested array means a ragged value.
            if inner_len >= 0:
                raise ValueError('non-homogeneous array')
            inner_len = -1
            continue

        if inner_len != -2:
            # Not the first element: it must match what was seen before
            # (this also rejects a nested array following a scalar).
            if len(member) != inner_len:
                raise ValueError('non-homogeneous array')
            continue

        # First element and it is a nested array: open a new dimension.
        inner_len = len(member)
        if inner_len > _MAXINT32:
            raise ValueError('too many elements in array value')
        ndims[0] += 1
        _get_array_shape(member, dims, ndims)
cdef _write_array_data(ConnectionSettings settings, object obj, int32_t ndims,
                       int32_t dim, WriteBuffer elem_data,
                       encode_func_ex encoder, const void *encoder_arg):
    # Append the elements of the nested array *obj* to *elem_data*,
    # depth first.  *dim* is the 0-based level of *obj*; levels above the
    # innermost one only recurse, the innermost level encodes elements.
    # A TypeError raised by *encoder* is re-raised as ValueError.
    if dim < ndims - 1:
        for nested in obj:
            _write_array_data(settings, nested, ndims, dim + 1, elem_data,
                              encoder, encoder_arg)
        return

    for value in obj:
        if value is None:
            # A length of -1 stands for a NULL element.
            elem_data.write_int32(-1)
            continue

        try:
            encoder(settings, elem_data, value, encoder_arg)
        except TypeError as exc:
            raise ValueError(
                'invalid array element: {}'.format(exc.args[0])) from None
cdef inline array_encode(ConnectionSettings settings, WriteBuffer buf,
                         object obj, uint32_t elem_oid,
                         encode_func_ex encoder, const void *encoder_arg):
    # Write *obj* to *buf* as a length-prefixed binary array whose elements
    # are produced by *encoder*.
    #
    # Raises TypeError when *obj* is not a sized, iterable, non-string,
    # non-mapping container, and ValueError when its shape is invalid or
    # an element is rejected by *encoder* with a TypeError.
    cdef:
        WriteBuffer payload
        int32_t dims[ARRAY_MAXDIM]
        int32_t ndims = 1
        int32_t idx

    if not _is_array_iterable(obj):
        raise TypeError(
            'a sized iterable container expected (got type {!r})'.format(
                type(obj).__name__))

    _get_array_shape(obj, dims, &ndims)

    # Elements are staged in a scratch buffer because the total length
    # has to be written ahead of them.
    payload = WriteBuffer.new()

    if ndims <= 1:
        # Flat array: encode in place so that errors can name the index.
        for idx, value in enumerate(obj):
            if value is None:
                payload.write_int32(-1)
                continue

            try:
                encoder(settings, payload, value, encoder_arg)
            except TypeError as exc:
                raise ValueError(
                    'invalid array element at index {}: {}'.format(
                        idx, exc.args[0])) from None
    else:
        _write_array_data(settings, obj, ndims, 0, payload,
                          encoder, encoder_arg)

    # Total size: three int32 header fields (12 bytes), two int32 values
    # per dimension (8 bytes each) and the staged element data.
    buf.write_int32(12 + 8 * ndims + payload.len())
    # Number of dimensions
    buf.write_int32(ndims)
    # flags
    buf.write_int32(0)
    # element type
    buf.write_int32(<int32_t>elem_oid)
    # per dimension: its length, then the constant 1
    for idx in range(ndims):
        buf.write_int32(dims[idx])
        buf.write_int32(1)
    # element data
    buf.write_buffer(payload)
cdef _write_textarray_data(ConnectionSettings settings, object obj,
                           int32_t ndims, int32_t dim, WriteBuffer array_data,
                           encode_func_ex encoder, const void *encoder_arg,
                           Py_UCS4 typdelim):
    # Append the textual form of the (possibly nested) array *obj* to
    # *array_data*: '{', the items separated by the delimiter and a space,
    # then '}'.  *dim* is the 0-based level of *obj*; levels above the
    # innermost one recurse, the innermost level writes element text.
    #
    # Each non-None element is first encoded into a scratch buffer, and
    # the bytes after its 4-byte length prefix are used as its text.
    # None is written as a bare NULL, an empty text as "".  Other elements
    # are wrapped in double quotes when they contain a double quote,
    # backslash, brace, the delimiter or ASCII whitespace, or when the
    # helper reports that they spell NULL; inside quotes, double quotes
    # and backslashes are backslash-escaped.
    #
    # A TypeError raised by *encoder* is re-raised as ValueError.
    cdef:
        # Number of items already written at this level; only used to
        # decide whether a separator is needed.
        ssize_t i = 0
        # Byte position inside the element currently being scanned.  Kept
        # separate from `i` so that scanning cannot disturb the item count.
        ssize_t pos
        int8_t delim = <int8_t>typdelim
        WriteBuffer elem_data
        Py_buffer pybuf
        const char *elem_str
        char ch
        ssize_t elem_len
        ssize_t quoted_elem_len
        bint need_quoting

    array_data.write_byte(b'{')

    if dim < ndims - 1:
        for item in obj:
            if i > 0:
                array_data.write_byte(delim)
                array_data.write_byte(b' ')
            _write_textarray_data(settings, item, ndims, dim + 1, array_data,
                                  encoder, encoder_arg, typdelim)
            i += 1
    else:
        for item in obj:
            elem_data = WriteBuffer.new()

            if i > 0:
                array_data.write_byte(delim)
                array_data.write_byte(b' ')

            if item is None:
                array_data.write_bytes(b'NULL')
                i += 1
                continue
            else:
                try:
                    encoder(settings, elem_data, item, encoder_arg)
                except TypeError as e:
                    raise ValueError(
                        'invalid array element: {}'.format(
                            e.args[0])) from None

            # element string length (first four bytes are the encoded length.)
            elem_len = elem_data.len() - 4

            if elem_len == 0:
                # Empty string
                array_data.write_bytes(b'""')
            else:
                cpython.PyObject_GetBuffer(
                    elem_data, &pybuf, cpython.PyBUF_SIMPLE)

                elem_str = <const char*>(pybuf.buf) + 4

                try:
                    quoted_elem_len = elem_len
                    # An element that spells NULL must be quoted so that
                    # it is not confused with the bare NULL written for
                    # None.  The element's own bytes are emitted (rather
                    # than a fixed literal), so its letter case survives.
                    # NOTE(review): apg_strcasecmp_char is defined
                    # elsewhere; a zero result is taken to mean "matches".
                    need_quoting = not apg_strcasecmp_char(elem_str, b'NULL')

                    for pos in range(elem_len):
                        ch = elem_str[pos]
                        if ch == b'"' or ch == b'\\':
                            # Quotes and backslashes need escaping.
                            quoted_elem_len += 1
                            need_quoting = True
                        elif (ch == b'{' or ch == b'}' or ch == delim or
                                apg_ascii_isspace(<uint32_t>ch)):
                            need_quoting = True

                    if need_quoting:
                        array_data.write_byte(b'"')

                        if quoted_elem_len == elem_len:
                            # Nothing to escape: copy the text verbatim.
                            array_data.write_cstr(elem_str, elem_len)
                        else:
                            # Escaping required.
                            for pos in range(elem_len):
                                ch = elem_str[pos]
                                if ch == b'"' or ch == b'\\':
                                    array_data.write_byte(b'\\')
                                array_data.write_byte(ch)

                        array_data.write_byte(b'"')
                    else:
                        array_data.write_cstr(elem_str, elem_len)
                finally:
                    # The buffer view must be released on every path.
                    cpython.PyBuffer_Release(&pybuf)

            i += 1

    array_data.write_byte(b'}')
cdef inline textarray_encode(ConnectionSettings settings, WriteBuffer buf,
                             object obj, encode_func_ex encoder,
                             const void *encoder_arg, Py_UCS4 typdelim):
    # Write *obj* to *buf* as a length-prefixed array in its textual form,
    # using *typdelim* between items.
    #
    # Raises TypeError when *obj* is not a sized, iterable, non-string,
    # non-mapping container; shape and element errors surface as
    # ValueError from the helpers called here.
    cdef:
        WriteBuffer text
        int32_t dims[ARRAY_MAXDIM]
        int32_t ndims = 1

    if not _is_array_iterable(obj):
        raise TypeError(
            'a sized iterable container expected (got type {!r})'.format(
                type(obj).__name__))

    # Validates the shape and determines the nesting depth; the per-level
    # lengths collected in `dims` are not needed for the text form.
    _get_array_shape(obj, dims, &ndims)

    # Render into a scratch buffer first: the byte length precedes the text.
    text = WriteBuffer.new()
    _write_textarray_data(settings, obj, ndims, 0, text,
                          encoder, encoder_arg, typdelim)

    buf.write_int32(text.len())
    buf.write_buffer(text)
cdef inline array_decode(ConnectionSettings settings, FRBuffer *buf,
                         decode_func_ex decoder, const void *decoder_arg):
    # Decode a binary array value from *buf* into a list (nested lists for
    # multi-dimensional arrays), decoding each element with *decoder*.
    #
    # The header is read as three int32 fields (dimension count, flags,
    # element OID) followed by two int32 values per dimension.  An array
    # with zero dimensions decodes to an empty list.
    #
    # Raises ProtocolError when the dimension count is negative or larger
    # than ARRAY_MAXDIM, or when a dimension size is negative.
    cdef:
        int32_t ndims = hton.unpack_int32(frb_read(buf, 4))
        # flags and elem_oid are not used below, but they still have to be
        # read so that the buffer advances past them.
        int32_t flags = hton.unpack_int32(frb_read(buf, 4))
        uint32_t elem_oid = <uint32_t>hton.unpack_int32(frb_read(buf, 4))
        list result
        int i
        int32_t elem_len
        int32_t elem_count = 1
        FRBuffer elem_buf
        int32_t dims[ARRAY_MAXDIM]

    if ndims == 0:
        result = cpython.PyList_New(0)
        return result

    # The header comes from the wire and must not be trusted: `dims` has
    # room for ARRAY_MAXDIM entries only, and a negative count would skip
    # the loop below and leave `dims` uninitialized.
    if ndims > ARRAY_MAXDIM:
        raise exceptions.ProtocolError(
            'number of array dimensions ({}) exceed the maximum expected ({})'.
            format(ndims, ARRAY_MAXDIM))
    elif ndims < 0:
        raise exceptions.ProtocolError(
            'unexpected array dimensions value: {}'.format(ndims))

    for i in range(ndims):
        dims[i] = hton.unpack_int32(frb_read(buf, 4))
        if dims[i] < 0:
            # A negative size would otherwise be used as a list length.
            raise exceptions.ProtocolError(
                'unexpected array dimension size: {}'.format(dims[i]))
        # Ignore the lower bound information
        frb_read(buf, 4)

    if ndims == 1:
        # Fast path for flat arrays
        elem_count = dims[0]
        result = cpython.PyList_New(elem_count)

        for i in range(elem_count):
            elem_len = hton.unpack_int32(frb_read(buf, 4))
            if elem_len == -1:
                # A length of -1 marks a NULL element.
                elem = None
            else:
                # Restrict the decoder to this element's bytes.
                frb_slice_from(&elem_buf, buf, elem_len)
                elem = decoder(settings, &elem_buf, decoder_arg)

            # PyList_SET_ITEM steals a reference, so take one for the list.
            cpython.Py_INCREF(elem)
            cpython.PyList_SET_ITEM(result, i, elem)

    else:
        result = _nested_array_decode(settings, buf,
                                      decoder, decoder_arg, ndims, dims,
                                      &elem_buf)

    return result
cdef _nested_array_decode(ConnectionSettings settings,
FRBuffer *buf,
decode_func_ex decoder,
const void *decoder_arg,
int32_t ndims, int32_t *dims,
FRBuffer *elem_buf):
cdef:
int32_t elem_len
int64_t i, j
int64_t array_len = 1
object elem, stride
# An array of pointers to lists for each current array level.
void *strides[ARRAY_MAXDIM]
# An array of current positions at each array level.
int32_t indexes[ARRAY_MAXDIM]
if PG_DEBUG:
if ndims <= 0:
raise exceptions.ProtocolError(
Loading ...