Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / asyncpg   python

Repository URL to install this package:

/ protocol / codecs / range.pyx

# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0


from asyncpg import types as apg_types

# Bit flags carried in the single flags byte of an encoded range value
# (written by range_encode() and read back by range_decode()).
# defined in postgresql/src/include/utils/rangetypes.h
DEF RANGE_EMPTY  = 0x01  # range is empty
DEF RANGE_LB_INC = 0x02  # lower bound is inclusive
DEF RANGE_UB_INC = 0x04  # upper bound is inclusive
DEF RANGE_LB_INF = 0x08  # lower bound is -infinity
DEF RANGE_UB_INF = 0x10  # upper bound is +infinity


# Classification of the Python object handed to range_encode(),
# as produced by _range_type().
cdef enum _RangeArgumentType:
    _RANGE_ARGUMENT_INVALID = 0  # neither a list/tuple nor a Range
    _RANGE_ARGUMENT_TUPLE = 1  # a list or a tuple
    _RANGE_ARGUMENT_RANGE = 2  # an apg_types.Range instance


cdef inline bint _range_has_lbound(uint8_t flags):
    """Return true when *flags* describe a range with a finite lower bound.

    That is the case when neither RANGE_EMPTY nor RANGE_LB_INF is set.
    """
    return (flags & (RANGE_EMPTY | RANGE_LB_INF)) == 0


cdef inline bint _range_has_ubound(uint8_t flags):
    """Return true when *flags* describe a range with a finite upper bound.

    That is the case when neither RANGE_EMPTY nor RANGE_UB_INF is set.
    """
    return (flags & (RANGE_EMPTY | RANGE_UB_INF)) == 0


cdef inline _RangeArgumentType _range_type(object obj):
    """Classify *obj* as a list/tuple, a Range instance, or neither."""
    # Lists and tuples are treated alike.
    if cpython.PyTuple_Check(obj) or cpython.PyList_Check(obj):
        return _RANGE_ARGUMENT_TUPLE

    if isinstance(obj, apg_types.Range):
        return _RANGE_ARGUMENT_RANGE

    return _RANGE_ARGUMENT_INVALID


cdef range_encode(ConnectionSettings settings, WriteBuffer buf,
                  object obj, uint32_t elem_oid,
                  encode_func_ex encoder, const void *encoder_arg):
    """Encode *obj* as a range value and append it to *buf*.

    Accepted inputs:

    * a list or tuple ``(lower, upper)``: both bounds are flagged
      inclusive, and a ``None`` bound is flagged infinite;
    * a list or tuple ``(lower,)``: inclusive lower bound (``None``
      meaning infinite) and an infinite upper bound;
    * an empty list or tuple: the empty range;
    * an ``apg_types.Range``: its ``isempty``, ``lower``, ``upper``,
      ``lower_inc`` and ``upper_inc`` attributes are used.

    The data appended to *buf* is an int32 length (one flags byte plus
    the encoded bounds), the flags byte, and then whatever *encoder*
    wrote for each finite bound.  *elem_oid* is not used by this
    function.

    Raises TypeError if *obj* is not a list, tuple or Range, and
    ValueError if a list or tuple has more than two elements.
    """
    cdef:
        ssize_t obj_len
        uint8_t flags = 0
        object lower = None
        object upper = None
        WriteBuffer bounds_data = WriteBuffer.new()
        _RangeArgumentType arg_type = _range_type(obj)

    if arg_type == _RANGE_ARGUMENT_INVALID:
        raise TypeError(
            'list, tuple or Range object expected (got type {})'.format(
                type(obj)))

    elif arg_type == _RANGE_ARGUMENT_TUPLE:
        obj_len = len(obj)
        if obj_len == 2:
            lower = obj[0]
            upper = obj[1]

            if lower is None:
                flags |= RANGE_LB_INF

            if upper is None:
                flags |= RANGE_UB_INF

            flags |= RANGE_LB_INC | RANGE_UB_INC

        elif obj_len == 1:
            lower = obj[0]

            # Same convention as the two-element form: None stands for
            # an infinite bound and must not reach the element encoder
            # as if it were a value.
            if lower is None:
                flags |= RANGE_LB_INF

            flags |= RANGE_LB_INC | RANGE_UB_INF

        elif obj_len == 0:
            flags |= RANGE_EMPTY

        else:
            raise ValueError(
                'expected 0, 1 or 2 elements in range (got {})'.format(
                    obj_len))

    else:
        if obj.isempty:
            flags |= RANGE_EMPTY
        else:
            lower = obj.lower
            upper = obj.upper

            # A missing bound is always infinite, whatever the
            # inclusivity attribute says; test for None first so that
            # None is never passed to the element encoder.
            if lower is None:
                flags |= RANGE_LB_INF
            elif obj.lower_inc:
                flags |= RANGE_LB_INC

            if upper is None:
                flags |= RANGE_UB_INF
            elif obj.upper_inc:
                flags |= RANGE_UB_INC

    # Bounds are encoded into a scratch buffer first because the total
    # length has to be written ahead of them.
    if _range_has_lbound(flags):
        encoder(settings, bounds_data, lower, encoder_arg)

    if _range_has_ubound(flags):
        encoder(settings, bounds_data, upper, encoder_arg)

    # The length covers the flags byte (1) plus the encoded bounds.
    buf.write_int32(1 + bounds_data.len())
    buf.write_byte(<int8_t>flags)
    buf.write_buffer(bounds_data)


cdef range_decode(ConnectionSettings settings, FRBuffer *buf,
                  decode_func_ex decoder, const void *decoder_arg):
    """Decode a range value from *buf* into an ``apg_types.Range``.

    The first byte holds the RANGE_* flags.  Each bound that the flags
    mark as present follows as an int32 length and that many bytes,
    which are passed to *decoder*; a length of -1 leaves the bound as
    ``None``.
    """
    cdef:
        uint8_t flags = <uint8_t>frb_read(buf, 1)[0]
        int32_t nbytes
        object lo = None
        object hi = None
        FRBuffer elem_buf
        bint lo_inclusive
        bint hi_inclusive
        bint is_empty

    # The lower bound, when present, precedes the upper bound.
    if _range_has_lbound(flags):
        nbytes = hton.unpack_int32(frb_read(buf, 4))
        if nbytes != -1:
            frb_slice_from(&elem_buf, buf, nbytes)
            lo = decoder(settings, &elem_buf, decoder_arg)

    if _range_has_ubound(flags):
        nbytes = hton.unpack_int32(frb_read(buf, 4))
        if nbytes != -1:
            frb_slice_from(&elem_buf, buf, nbytes)
            hi = decoder(settings, &elem_buf, decoder_arg)

    lo_inclusive = (flags & RANGE_LB_INC) != 0
    hi_inclusive = (flags & RANGE_UB_INC) != 0
    is_empty = (flags & RANGE_EMPTY) != 0

    return apg_types.Range(
        lower=lo,
        upper=hi,
        lower_inc=lo_inclusive,
        upper_inc=hi_inclusive,
        empty=is_empty,
    )


cdef init_range_codecs():
    """Register the core codec entry for ANYRANGEOID.

    The entry is registered with PG_FORMAT_TEXT, a NULL second
    argument and ``pgproto.text_decode`` as the decode function.
    """
    register_core_codec(
        ANYRANGEOID,
        NULL,
        <decode_func>pgproto.text_decode,
        PG_FORMAT_TEXT,
    )


# Perform the registration when this module is loaded.
init_range_codecs()