Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / asyncpg   python

Repository URL to install this package:

/ pgproto / codecs / txid.pyx

# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0


cdef txid_snapshot_encode(CodecContext settings, WriteBuffer buf, obj):
    cdef:
        ssize_t nxip
        int64_t xmin
        int64_t xmax
        int i
        WriteBuffer xip_buf = WriteBuffer.new()

    if not (cpython.PyTuple_Check(obj) or cpython.PyList_Check(obj)):
        raise TypeError(
            'list or tuple expected (got type {})'.format(type(obj)))

    if len(obj) != 3:
        raise ValueError(
            'invalid number of elements in txid_snapshot tuple, expecting 4')

    nxip = len(obj[2])
    if nxip > _MAXINT32:
        raise ValueError('txid_snapshot value is too long')

    xmin = obj[0]
    xmax = obj[1]

    for i in range(nxip):
        xip_buf.write_int64(obj[2][i])

    buf.write_int32(20 + xip_buf.len())

    buf.write_int32(<int32_t>nxip)
    buf.write_int64(obj[0])
    buf.write_int64(obj[1])
    buf.write_buffer(xip_buf)


cdef txid_snapshot_decode(CodecContext settings, FRBuffer *buf):
    cdef:
        int32_t nxip
        int64_t xmin
        int64_t xmax
        tuple xip_tup
        int32_t i
        object xip

    nxip = hton.unpack_int32(frb_read(buf, 4))
    xmin = hton.unpack_int64(frb_read(buf, 8))
    xmax = hton.unpack_int64(frb_read(buf, 8))

    xip_tup = cpython.PyTuple_New(nxip)
    for i in range(nxip):
        xip = cpython.PyLong_FromLongLong(hton.unpack_int64(frb_read(buf, 8)))
        cpython.Py_INCREF(xip)
        cpython.PyTuple_SET_ITEM(xip_tup, i, xip)

    return (xmin, xmax, xip_tup)