# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
from hashlib import md5 as hashlib_md5 # for MD5 authentication
cdef class CoreProtocol:
def __init__(self, con_params):
    # Set up a fresh, not-yet-connected protocol object.
    #
    # `con_params` is expected to be a `_ConnectionParameters` instance;
    # only its `user` and `password` attributes are read here, and the
    # object itself is retained for later use.

    # Credentials and connection parameters.
    self.con_params = con_params
    self.user = con_params.user
    self.password = con_params.password
    # No authentication payload is pending yet.
    self.auth_msg = None

    # Incoming data buffer and the text encoding assumed by default.
    self.buffer = ReadBuffer()
    self.encoding = 'utf-8'

    # Initial connection / protocol / transaction status.
    self.con_status = CONNECTION_BAD
    self.state = PROTOCOL_IDLE
    self.xact_status = PQTRANS_IDLE

    # Bookkeeping used by executemany().
    self._execute_iter = None
    self._execute_portal_name = None
    self._execute_stmt_name = None

    # Start with a clean result slot.
    self._reset_result()
cdef _read_server_messages(self):
    # Drain the read buffer, dispatching each buffered server message
    # to the handler that matches the current protocol state.
    #
    # ParameterStatus ('S'), NotificationResponse ('A') and
    # NoticeResponse ('N') are handled first, regardless of state; every
    # other message is routed by `self.state`.
    #
    # Any exception raised while handling a message is captured as the
    # failed result.  If the failing message was ReadyForQuery ('Z') the
    # result is pushed right away; otherwise the protocol switches to
    # PROTOCOL_ERROR_CONSUME and discards messages until the next 'Z'.
    #
    # `finish_message()` is always called on the buffer once a message
    # has been handled, whether or not handling succeeded.
    cdef:
        char mtype
        ProtocolState state
        pgproto.take_message_method take_message = \
            <pgproto.take_message_method>self.buffer.take_message
        pgproto.get_message_type_method get_message_type= \
            <pgproto.get_message_type_method>self.buffer.get_message_type

    # Keep going for as long as take_message() reports 1.
    while take_message(self.buffer) == 1:
        mtype = get_message_type(self.buffer)
        # Snapshot the state: handlers below may change self.state.
        state = self.state

        try:
            if mtype == b'S':
                # ParameterStatus
                self._parse_msg_parameter_status()

            elif mtype == b'A':
                # NotificationResponse
                self._parse_msg_notification()

            elif mtype == b'N':
                # 'N' - NoticeResponse
                self._on_notice(self._parse_msg_error_response(False))

            elif state == PROTOCOL_AUTH:
                self._process__auth(mtype)

            elif state == PROTOCOL_PREPARE:
                self._process__prepare(mtype)

            elif state == PROTOCOL_BIND_EXECUTE:
                self._process__bind_execute(mtype)

            elif state == PROTOCOL_BIND_EXECUTE_MANY:
                self._process__bind_execute_many(mtype)

            elif state == PROTOCOL_EXECUTE:
                # Plain execute shares the bind/execute handler.
                self._process__bind_execute(mtype)

            elif state == PROTOCOL_BIND:
                self._process__bind(mtype)

            elif state == PROTOCOL_CLOSE_STMT_PORTAL:
                self._process__close_stmt_portal(mtype)

            elif state == PROTOCOL_SIMPLE_QUERY:
                self._process__simple_query(mtype)

            elif state == PROTOCOL_COPY_OUT:
                self._process__copy_out(mtype)

            elif (state == PROTOCOL_COPY_OUT_DATA or
                    state == PROTOCOL_COPY_OUT_DONE):
                self._process__copy_out_data(mtype)

            elif state == PROTOCOL_COPY_IN:
                self._process__copy_in(mtype)

            elif state == PROTOCOL_COPY_IN_DATA:
                self._process__copy_in_data(mtype)

            elif state == PROTOCOL_CANCELLED:
                # discard all messages until the sync message
                if mtype == b'E':
                    self._parse_msg_error_response(True)
                elif mtype == b'Z':
                    self._parse_msg_ready_for_query()
                    self._push_result()
                else:
                    self.buffer.discard_message()

            elif state == PROTOCOL_ERROR_CONSUME:
                # Error in protocol (on asyncpg side);
                # discard all messages until sync message

                if mtype == b'Z':
                    # Sync point, safe to push the result
                    if self.result_type != RESULT_FAILED:
                        self.result_type = RESULT_FAILED
                        self.result = apg_exc.InternalClientError(
                            'unknown error in protocol implementation')

                    # Parse ReadyForQuery like every other 'Z' handler
                    # does, so that the state it maintains (presumably
                    # the transaction status) is not left stale after
                    # recovering from a client-side error.
                    self._parse_msg_ready_for_query()
                    self._push_result()

                else:
                    self.buffer.discard_message()

            elif state == PROTOCOL_TERMINATING:
                # The connection is being terminated.
                # discard all messages until connection
                # termination.
                self.buffer.discard_message()

            else:
                raise apg_exc.InternalClientError(
                    f'cannot process message {chr(mtype)!r}: '
                    f'protocol is in an unexpected state {state!r}.')

        except Exception as ex:
            # Record the failure as the result of the current operation.
            self.result_type = RESULT_FAILED
            self.result = ex

            if mtype == b'Z':
                # Already at the sync point: report immediately.
                self._push_result()
            else:
                # Skip everything up to the next sync point.
                self.state = PROTOCOL_ERROR_CONSUME

        finally:
            self.buffer.finish_message()
cdef _process__auth(self, char mtype):
    # Handle one server message received while in PROTOCOL_AUTH.
    # Message types not listed below are left untouched here.
    if mtype == b'Z':
        # ReadyForQuery: mark the connection as established and
        # report the result.
        self._parse_msg_ready_for_query()
        self.con_status = CONNECTION_OK
        self._push_result()

    elif mtype == b'E':
        # ErrorResponse: the connection attempt failed.
        self.con_status = CONNECTION_BAD
        self._parse_msg_error_response(True)
        self._push_result()

    elif mtype == b'K':
        # BackendKeyData
        self._parse_msg_backend_key_data()

    elif mtype == b'R':
        # Authentication...
        self._parse_msg_authentication()

        if self.result_type == RESULT_OK:
            if self.auth_msg is not None:
                # The server asked for auth data: send the prepared
                # payload, then forget it.
                self._write(self.auth_msg)
                self.auth_msg = None
        else:
            # Parsing the authentication message did not leave an OK
            # result: flag the connection as bad and report.
            self.con_status = CONNECTION_BAD
            self._push_result()
cdef _process__prepare(self, char mtype):
    # Handle one server message received while in PROTOCOL_PREPARE.
    # Message types not listed below are left untouched here.
    if mtype == b'1' or mtype == b'n':
        # '1' - ParseComplete
        # 'n' - NoData
        # Neither carries anything we need.
        self.buffer.discard_message()

    elif mtype == b't':
        # ParameterDescription: keep the raw message.
        self.result_param_desc = self.buffer.consume_message()

    elif mtype == b'T':
        # RowDescription: keep the raw message.
        self.result_row_desc = self.buffer.consume_message()

    elif mtype == b'E':
        # ErrorResponse
        self._parse_msg_error_response(True)

    elif mtype == b'Z':
        # ReadyForQuery: the operation is over, report the result.
        self._parse_msg_ready_for_query()
        self._push_result()
cdef _process__bind_execute(self, char mtype):
    # Handle one server message for a bind/execute (or plain execute)
    # operation.  Message types not listed below are left untouched here.
    if mtype == b's' or mtype == b'2' or mtype == b'I':
        # 's' - PortalSuspended
        # '2' - BindComplete
        # 'I' - EmptyQueryResponse
        # None of these carries anything we need.
        self.buffer.discard_message()

    elif mtype == b'D':
        # DataRow
        self._parse_data_msgs()

    elif mtype == b'C':
        # CommandComplete: remember that execution ran to completion
        # before parsing the message itself.
        self.result_execute_completed = True
        self._parse_msg_command_complete()

    elif mtype == b'E':
        # ErrorResponse
        self._parse_msg_error_response(True)

    elif mtype == b'Z':
        # ReadyForQuery: the operation is over, report the result.
        self._parse_msg_ready_for_query()
        self._push_result()
cdef _process__bind_execute_many(self, char mtype):
    # Handle one server message while running executemany().
    #
    # Each ReadyForQuery either ends the whole operation (on failure or
    # when the argument iterator is exhausted) or triggers the next
    # bind message.  Message types not listed below are left untouched.
    cdef WriteBuffer bind_data

    if mtype == b's' or mtype == b'2' or mtype == b'I':
        # 's' - PortalSuspended
        # '2' - BindComplete
        # 'I' - EmptyQueryResponse
        self.buffer.discard_message()

    elif mtype == b'D':
        # DataRow
        self._parse_data_msgs()

    elif mtype == b'C':
        # CommandComplete
        self._parse_msg_command_complete()

    elif mtype == b'E':
        # ErrorResponse
        self._parse_msg_error_response(True)

    elif mtype == b'Z':
        # ReadyForQuery
        self._parse_msg_ready_for_query()

        if self.result_type != RESULT_FAILED:
            try:
                bind_data = <WriteBuffer>next(self._execute_iter)
            except StopIteration:
                # All argument sets have been sent: we are done.
                self._push_result()
            except Exception as exc:
                # Producing the next argument set failed; report that
                # error as the result of the whole operation.
                self.result_type = RESULT_FAILED
                self.result = exc
                self._push_result()
            else:
                # Next iteration over the executemany() arg sequence
                self._send_bind_message(
                    self._execute_portal_name, self._execute_stmt_name,
                    bind_data, 0)
        else:
            # The previous iteration failed: stop and report.
            self._push_result()
cdef _process__bind(self, char mtype):
    # Handle one server message received while in PROTOCOL_BIND.
    # Message types not listed below are left untouched here.
    if mtype == b'2':
        # BindComplete: nothing to extract.
        self.buffer.discard_message()

    elif mtype == b'Z':
        # ReadyForQuery: the operation is over, report the result.
        self._parse_msg_ready_for_query()
        self._push_result()

    elif mtype == b'E':
        # ErrorResponse
        self._parse_msg_error_response(True)
cdef _process__close_stmt_portal(self, char mtype):
    # Handle one server message received while in
    # PROTOCOL_CLOSE_STMT_PORTAL.  Message types not listed below are
    # left untouched here.
    if mtype == b'3':
        # CloseComplete: nothing to extract.
        self.buffer.discard_message()

    elif mtype == b'Z':
        # ReadyForQuery: the operation is over, report the result.
        self._parse_msg_ready_for_query()
        self._push_result()

    elif mtype == b'E':
        # ErrorResponse
        self._parse_msg_error_response(True)
cdef _process__simple_query(self, char mtype):
    # Handle one server message received while in PROTOCOL_SIMPLE_QUERY.
    # Only errors, command completion and the final sync are
    # interpreted; everything else is thrown away.
    if mtype == b'E':
        # ErrorResponse
        self._parse_msg_error_response(True)

    elif mtype == b'C':
        # CommandComplete
        self._parse_msg_command_complete()

    elif mtype == b'Z':
        # ReadyForQuery: the operation is over, report the result.
        self._parse_msg_ready_for_query()
        self._push_result()

    else:
        # Discarded without inspection:
        # 'D' - DataRow
        # 'I' - EmptyQueryResponse
        # 'T' - RowDescription
        # ...and anything else: we don't really care about
        # COPY IN etc
        self.buffer.discard_message()
cdef _process__copy_out(self, char mtype):
    # Handle one server message received while in PROTOCOL_COPY_OUT.
    # Message types not listed below are left untouched here.
    if mtype == b'H':
        # CopyOutResponse: switch to the data phase, then drop the
        # message itself.
        self._set_state(PROTOCOL_COPY_OUT_DATA)
        self.buffer.discard_message()

    elif mtype == b'Z':
        # ReadyForQuery: the operation is over, report the result.
        self._parse_msg_ready_for_query()
        self._push_result()

    elif mtype == b'E':
        # ErrorResponse
        self._parse_msg_error_response(True)
cdef _process__copy_out_data(self, char mtype):
if mtype == b'E':
self._parse_msg_error_response(True)
Loading ...