# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
from libc.string cimport memcpy
import collections
from . import exceptions
@cython.no_gc_clear
@cython.final
@cython.freelist(_BUFFER_FREELIST_SIZE)
cdef class WriteBuffer:
def __cinit__(self):
self._smallbuf_inuse = True
self._buf = self._smallbuf
self._size = _BUFFER_INITIAL_SIZE
self._length = 0
self._message_mode = 0
def __dealloc__(self):
if self._buf is not NULL and not self._smallbuf_inuse:
cpython.PyMem_Free(self._buf)
self._buf = NULL
self._size = 0
if self._view_count:
raise exceptions.BufferError(
'Deallocating buffer with attached memoryviews')
def __getbuffer__(self, Py_buffer *buffer, int flags):
self._view_count += 1
cpython.PyBuffer_FillInfo(
buffer, self, self._buf, self._length,
1, # read-only
flags)
def __releasebuffer__(self, Py_buffer *buffer):
self._view_count -= 1
cdef inline _check_readonly(self):
if self._view_count:
raise exceptions.BufferError('the buffer is in read-only mode')
cdef inline _ensure_alloced(self, ssize_t extra_length):
cdef ssize_t new_size = extra_length + self._length
if new_size > self._size:
self._reallocate(new_size)
cdef _reallocate(self, ssize_t new_size):
cdef char *new_buf
if new_size < _BUFFER_MAX_GROW:
new_size = _BUFFER_MAX_GROW
else:
# Add a little extra
new_size += _BUFFER_INITIAL_SIZE
if self._smallbuf_inuse:
new_buf = <char*>cpython.PyMem_Malloc(
sizeof(char) * <size_t>new_size)
if new_buf is NULL:
self._buf = NULL
self._size = 0
self._length = 0
raise MemoryError
memcpy(new_buf, self._buf, <size_t>self._size)
self._size = new_size
self._buf = new_buf
self._smallbuf_inuse = False
else:
new_buf = <char*>cpython.PyMem_Realloc(
<void*>self._buf, <size_t>new_size)
if new_buf is NULL:
cpython.PyMem_Free(self._buf)
self._buf = NULL
self._size = 0
self._length = 0
raise MemoryError
self._buf = new_buf
self._size = new_size
cdef inline start_message(self, char type):
if self._length != 0:
raise exceptions.BufferError(
'cannot start_message for a non-empty buffer')
self._ensure_alloced(5)
self._message_mode = 1
self._buf[0] = type
self._length = 5
cdef inline end_message(self):
# "length-1" to exclude the message type byte
cdef ssize_t mlen = self._length - 1
self._check_readonly()
if not self._message_mode:
raise exceptions.BufferError(
'end_message can only be called with start_message')
if self._length < 5:
raise exceptions.BufferError('end_message: buffer is too small')
if mlen > _MAXINT32:
raise exceptions.BufferError('end_message: message is too large')
hton.pack_int32(&self._buf[1], <int32_t>mlen)
return self
cdef write_buffer(self, WriteBuffer buf):
self._check_readonly()
if not buf._length:
return
self._ensure_alloced(buf._length)
memcpy(self._buf + self._length,
<void*>buf._buf,
<size_t>buf._length)
self._length += buf._length
cdef write_byte(self, char b):
self._check_readonly()
self._ensure_alloced(1)
self._buf[self._length] = b
self._length += 1
cdef write_bytes(self, bytes data):
cdef char* buf
cdef ssize_t len
cpython.PyBytes_AsStringAndSize(data, &buf, &len)
self.write_cstr(buf, len)
cdef write_bytestring(self, bytes string):
cdef char* buf
cdef ssize_t len
cpython.PyBytes_AsStringAndSize(string, &buf, &len)
# PyBytes_AsStringAndSize returns a null-terminated buffer,
# but the null byte is not counted in len. hence the + 1
self.write_cstr(buf, len + 1)
cdef write_str(self, str string, str encoding):
self.write_bytestring(string.encode(encoding))
cdef write_utf8(self, str string):
self.write_bytestring(string.encode('utf-8'))
cdef write_cstr(self, const char *data, ssize_t len):
self._check_readonly()
self._ensure_alloced(len)
memcpy(self._buf + self._length, <void*>data, <size_t>len)
self._length += len
cdef write_int16(self, int16_t i):
self._check_readonly()
self._ensure_alloced(2)
hton.pack_int16(&self._buf[self._length], i)
self._length += 2
cdef write_int32(self, int32_t i):
self._check_readonly()
self._ensure_alloced(4)
hton.pack_int32(&self._buf[self._length], i)
self._length += 4
cdef write_int64(self, int64_t i):
self._check_readonly()
self._ensure_alloced(8)
hton.pack_int64(&self._buf[self._length], i)
self._length += 8
cdef write_float(self, float f):
self._check_readonly()
self._ensure_alloced(4)
hton.pack_float(&self._buf[self._length], f)
self._length += 4
cdef write_double(self, double d):
self._check_readonly()
self._ensure_alloced(8)
hton.pack_double(&self._buf[self._length], d)
self._length += 8
@staticmethod
cdef WriteBuffer new_message(char type):
cdef WriteBuffer buf
buf = WriteBuffer.__new__(WriteBuffer)
buf.start_message(type)
return buf
@staticmethod
cdef WriteBuffer new():
cdef WriteBuffer buf
buf = WriteBuffer.__new__(WriteBuffer)
return buf
@cython.no_gc_clear
@cython.final
@cython.freelist(_BUFFER_FREELIST_SIZE)
cdef class ReadBuffer:
def __cinit__(self):
self._bufs = collections.deque()
self._bufs_append = self._bufs.append
self._bufs_popleft = self._bufs.popleft
self._bufs_len = 0
self._buf0 = None
self._buf0_prev = None
self._pos0 = 0
self._len0 = 0
self._length = 0
self._current_message_type = 0
self._current_message_len = 0
self._current_message_len_unread = 0
self._current_message_ready = 0
cdef feed_data(self, data):
cdef:
ssize_t dlen
bytes data_bytes
if not cpython.PyBytes_CheckExact(data):
raise exceptions.BufferError('feed_data: bytes object expected')
data_bytes = <bytes>data
dlen = cpython.Py_SIZE(data_bytes)
if dlen == 0:
# EOF?
return
self._bufs_append(data_bytes)
self._length += dlen
if self._bufs_len == 0:
# First buffer
self._len0 = dlen
self._buf0 = data_bytes
self._bufs_len += 1
cdef inline _ensure_first_buf(self):
if self._len0 == 0:
raise exceptions.BufferError('empty first buffer')
if self._pos0 == self._len0:
self._switch_to_next_buf()
cdef _switch_to_next_buf(self):
# The first buffer is fully read, discard it
self._bufs_popleft()
self._bufs_len -= 1
# Shouldn't fail, since we've checked that `_length >= 1`
# in _ensure_first_buf()
self._buf0_prev = self._buf0
self._buf0 = <bytes>self._bufs[0]
self._pos0 = 0
self._len0 = len(self._buf0)
if PG_DEBUG:
if self._len0 < 1:
raise exceptions.BufferError(
'debug: second buffer of ReadBuffer is empty')
cdef inline const char* _try_read_bytes(self, ssize_t nbytes):
# Try to read *nbytes* from the first buffer.
#
# Returns pointer to data if there is at least *nbytes*
# in the buffer, NULL otherwise.
#
# Important: caller must call _ensure_first_buf() prior
# to calling try_read_bytes, and must not overread
cdef:
const char *result
if PG_DEBUG:
if nbytes > self._length:
return NULL
if self._current_message_ready:
if self._current_message_len_unread < nbytes:
return NULL
if self._pos0 + nbytes <= self._len0:
result = cpython.PyBytes_AS_STRING(self._buf0)
result += self._pos0
self._pos0 += nbytes
self._length -= nbytes
if self._current_message_ready:
self._current_message_len_unread -= nbytes
return result
else:
return NULL
cdef inline _read_into(self, char *buf, ssize_t nbytes):
cdef:
ssize_t nread
char *buf0
while True:
buf0 = cpython.PyBytes_AS_STRING(self._buf0)
if self._pos0 + nbytes > self._len0:
nread = self._len0 - self._pos0
memcpy(buf, buf0 + self._pos0, <size_t>nread)
self._pos0 = self._len0
self._length -= nread
nbytes -= nread
buf += nread
self._ensure_first_buf()
else:
memcpy(buf, buf0 + self._pos0, <size_t>nbytes)
self._pos0 += nbytes
self._length -= nbytes
break
cdef inline _read_and_discard(self, ssize_t nbytes):
cdef:
ssize_t nread
self._ensure_first_buf()
while True:
if self._pos0 + nbytes > self._len0:
nread = self._len0 - self._pos0
self._pos0 = self._len0
Loading ...