# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Invocation-side implementation of gRPC Python."""
import copy
import functools
import logging
import os
import sys
import threading
import time
import grpc
import grpc.experimental
from grpc import _compression
from grpc import _common
from grpc import _grpcio_metadata
from grpc._cython import cygrpc
# Logger named after this module.
_LOGGER = logging.getLogger(__name__)
# Identifier of the form 'grpc-python/<version>', where the version comes from
# the packaged grpcio metadata.
_USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__)
# Flags value handed to cygrpc operations that need no special flags.
_EMPTY_FLAGS = 0
# NOTE(rbellevi): No guarantees are given about the maintenance of this
# environment variable.
# True whenever GRPC_SINGLE_THREADED_UNARY_STREAM is present in the
# environment, whatever its value (an empty string still counts as set).
_DEFAULT_SINGLE_THREADED_UNARY_STREAM = os.getenv(
    "GRPC_SINGLE_THREADED_UNARY_STREAM") is not None
# The four tuples below list cygrpc operation types per RPC arity.
# NOTE(review): presumably each one seeds _RPCState.due when an RPC of that
# arity is started; the call sites are outside this part of the file.
#
# Unary-unary: the request is sent and closed up front, and a single response
# message is received.
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
# Unary-stream: same as unary-unary but without receive_message.
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
# Stream-unary: no send_message or send_close_from_client here; within this
# file those are added to the due set by _consume_request_iterator.
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
# Stream-stream: only the metadata exchange and the final status.
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
# Log message text for a failing channel subscription callback; it is not
# referenced within this part of the file.
_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    'Exception calling channel subscription callback!')
# repr() template used by _rpc_state_string for an RPC that ended with OK.
# Placeholders, in order: class name, status code, details.
_OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n'
                              '\tstatus = {}\n'
                              '\tdetails = "{}"\n'
                              '>')
# repr() template used by _rpc_state_string for any other terminal status.
# Placeholders, in order: class name, status code, details,
# debug_error_string.
_NON_OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n'
                                  '\tstatus = {}\n'
                                  '\tdetails = "{}"\n'
                                  '\tdebug_error_string = "{}"\n'
                                  '>')
def _deadline(timeout):
    """Turn a relative timeout into an absolute wall-clock deadline.

    Args:
      timeout: A duration to add to the current time.time() value, or None
        when the caller set no timeout.

    Returns:
      None if timeout is None; otherwise time.time() plus timeout.
    """
    if timeout is None:
        return None
    return time.time() + timeout
def _unknown_code_details(unknown_cygrpc_code, details):
    """Compose the details text reported for an unrecognised status code.

    Args:
      unknown_cygrpc_code: The status code value that could not be mapped.
      details: The details that accompanied that status.

    Returns:
      A string embedding both values.
    """
    template = 'Server sent unknown code {} and details "{}"'
    return template.format(unknown_cygrpc_code, details)
class _RPCState(object):
    """Mutable record of one RPC's progress and outcome.

    Attributes:
      condition: A threading.Condition; other functions in this module hold
        it while reading or updating the fields below and notify on it.
      due: A set of the cygrpc.OperationType objects representing events due
        from the RPC's completion queue.
      initial_metadata: Initial metadata of the RPC, as given at construction.
      response: The response value; None until one is stored.
      trailing_metadata: Trailing metadata of the RPC, as given at
        construction.
      code: Status code of the RPC, as given at construction.
      details: Status details of the RPC, as given at construction.
      debug_error_string: Starts as None.
      cancelled: Whether cancellation was requested prior to termination of
        the RPC. The semantics of grpc.Future.cancel and
        grpc.Future.cancelled are slightly wonky, so this has to be tracked
        separately from the rest of the result of the RPC.
      callbacks: A list of callbacks; starts empty.
      fork_epoch: The value of cygrpc.get_fork_epoch() at construction time.
    """

    def __init__(self, due, initial_metadata, trailing_metadata, code, details):
        self.condition = threading.Condition()
        self.due = set(due)

        # Values supplied by the caller.
        self.initial_metadata = initial_metadata
        self.trailing_metadata = trailing_metadata
        self.code = code
        self.details = details

        # Values that always start from a fixed default.
        self.response = None
        self.debug_error_string = None
        self.cancelled = False
        self.callbacks = []

        self.fork_epoch = cygrpc.get_fork_epoch()

    def reset_postfork_child(self):
        """Replace the condition variable with a freshly created one."""
        self.condition = threading.Condition()
def _abort(state, code, details):
    """Record a terminal status on state unless one is already present.

    When state.code is already set, nothing is modified. Otherwise code and
    details are stored, initial_metadata becomes an empty tuple if it was
    still None, and trailing_metadata is set to an empty tuple.

    Args:
      state: The RPC state object to update in place.
      code: The status code to record.
      details: The details to record alongside the code.
    """
    if state.code is not None:
        # The first recorded status wins.
        return
    state.code = code
    state.details = details
    if state.initial_metadata is None:
        state.initial_metadata = ()
    state.trailing_metadata = ()
def _handle_event(event, state, response_deserializer):
    """Apply one completion-queue event to the RPC state.

    Every batch operation carried by the event is removed from state.due and
    its payload is copied into state: initial metadata, a deserialized
    response message, or the final status with trailing metadata.

    Args:
      event: An object whose batch_operations attribute is iterable.
      state: The _RPCState to update; this function takes no lock itself.
      response_deserializer: Handed to _common.deserialize for a received
        message.

    Returns:
      A list of the callbacks that were registered on state when the final
      status was processed; empty if the event carried no status. After the
      status is processed, state.callbacks is set to None.

    Raises:
      KeyError: If an operation's type is not currently in state.due.
    """
    callbacks = []
    for batch_operation in event.batch_operations:
        operation_type = batch_operation.type()
        state.due.remove(operation_type)
        if operation_type == cygrpc.OperationType.receive_initial_metadata:
            state.initial_metadata = batch_operation.initial_metadata()
        elif operation_type == cygrpc.OperationType.receive_message:
            serialized_response = batch_operation.message()
            # A None message leaves state.response untouched.
            if serialized_response is not None:
                response = _common.deserialize(serialized_response,
                                               response_deserializer)
                if response is None:
                    details = 'Exception deserializing response!'
                    _abort(state, grpc.StatusCode.INTERNAL, details)
                else:
                    state.response = response
        elif operation_type == cygrpc.OperationType.receive_status_on_client:
            state.trailing_metadata = batch_operation.trailing_metadata()
            # A status recorded earlier (for example by _abort) is kept.
            if state.code is None:
                cygrpc_code = batch_operation.code()
                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
                    cygrpc_code)
                if code is None:
                    state.code = grpc.StatusCode.UNKNOWN
                    # Report the raw code that failed to map; the mapped
                    # value is None in this branch and would be useless in
                    # the message.
                    state.details = _unknown_code_details(
                        cygrpc_code, batch_operation.details())
                else:
                    state.code = code
                    state.details = batch_operation.details()
                    state.debug_error_string = batch_operation.error_string()
            callbacks.extend(state.callbacks)
            state.callbacks = None
    return callbacks
def _event_handler(state, response_deserializer):
    """Build the event callback for one RPC.

    Args:
      state: The _RPCState that incoming events update.
      response_deserializer: Passed through to _handle_event for decoding a
        received message.

    Returns:
      A function taking a single event. It applies the event to state while
      holding state.condition, notifies every waiter on that condition, runs
      the callbacks handed back by _handle_event, and returns True only when
      no operations remain due and state.fork_epoch is not older than the
      current cygrpc fork epoch.
    """

    def handle_event(event):
        with state.condition:
            callbacks = _handle_event(event, state, response_deserializer)
            state.condition.notify_all()
            done = not state.due
        # The callbacks run after state.condition has been released.
        for callback in callbacks:
            try:
                callback()
            except Exception as e:  # pylint: disable=broad-except
                # NOTE(rbellevi): We suppress but log errors here so as not to
                # kill the channel spin thread.
                # Not every callable has a ``func`` attribute (only
                # functools.partial objects do); fall back to the callable
                # itself so that logging can never raise from this handler.
                _LOGGER.error('Exception in callback %s: %s',
                              repr(getattr(callback, 'func', callback)),
                              repr(e))
        return done and state.fork_epoch >= cygrpc.get_fork_epoch()

    return handle_event
#pylint: disable=too-many-statements
def _consume_request_iterator(request_iterator, state, call, request_serializer,
                              event_handler):
    """Consume a request iterator supplied by the user.

    Starts a daemon cygrpc.ForkManagedThread that pulls requests from
    request_iterator one at a time, serializes each one and starts a
    SendMessageOperation for it on call, then starts a
    SendCloseFromClientOperation once the iterator is exhausted. The thread
    returns early when the RPC already has a status code, when cancellation
    was requested, when iterating or serializing fails, or when call.operate
    reports a falsy result for a message.

    Args:
      request_iterator: The user's iterator of request values.
      state: The _RPCState of the RPC being fed.
      call: The object on which operate() and, on failure, cancel() are
        invoked.
      request_serializer: Passed to _common.serialize for each request.
      event_handler: Passed to call.operate with each tuple of operations.
    """

    def consume_request_iterator():  # pylint: disable=too-many-branches
        # Iterate over the request iterator until it is exhausted or an error
        # condition is encountered.
        while True:
            # Tracks whether the except branch already made the "return" call,
            # so that the finally clause does not make it a second time.
            return_from_user_request_generator_invoked = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                # Normal exhaustion: leave the loop and send the close below.
                # The finally clause still runs on the way out.
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                return_from_user_request_generator_invoked = True
                code = grpc.StatusCode.UNKNOWN
                details = 'Exception iterating requests!'
                _LOGGER.exception(details)
                call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details)
                # NOTE(review): unlike the serialization-failure path below,
                # this _abort runs without holding state.condition -- confirm
                # that this is intended.
                _abort(state, code, details)
                return
            finally:
                if not return_from_user_request_generator_invoked:
                    cygrpc.return_from_user_request_generator()
            # Serialization happens before state.condition is acquired.
            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                if state.code is None and not state.cancelled:
                    if serialized_request is None:
                        # The code treats a None result from
                        # _common.serialize as a serialization failure.
                        code = grpc.StatusCode.INTERNAL
                        details = 'Exception serializing request!'
                        call.cancel(
                            _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details)
                        _abort(state, code, details)
                        return
                    else:
                        operations = (cygrpc.SendMessageOperation(
                            serialized_request, _EMPTY_FLAGS),)
                        operating = call.operate(operations, event_handler)
                        if operating:
                            state.due.add(cygrpc.OperationType.send_message)
                        else:
                            # Falsy result from call.operate: stop feeding.
                            return

                        def _done():
                            # True once the RPC has a status code or the
                            # send_message operation is no longer due.
                            return (state.code is not None or
                                    cygrpc.OperationType.send_message not in
                                    state.due)

                        # NOTE(review): presumably blocks on state.condition
                        # until _done() holds, calling spin_cb periodically;
                        # _common.wait is defined outside this file.
                        _common.wait(state.condition.wait,
                                     _done,
                                     spin_cb=functools.partial(
                                         cygrpc.block_if_fork_in_progress,
                                         state))
                        if state.code is not None:
                            return
                else:
                    # The RPC already has a status or was cancelled.
                    return
        # The iterator is exhausted: send the close, unless the RPC has
        # already been given a status code.
        with state.condition:
            if state.code is None:
                operations = (
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
                operating = call.operate(operations, event_handler)
                if operating:
                    state.due.add(cygrpc.OperationType.send_close_from_client)

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator)
    consumption_thread.setDaemon(True)
    consumption_thread.start()
def _rpc_state_string(class_name, rpc_state):
    """Render a human-readable description of an RPC's state.

    Args:
      class_name: Name to show at the start of the description.
      rpc_state: The state to describe; its condition is held while the
        fields are read.

    Returns:
      '<NAME object>' while no status code is set, the OK template when the
      code is grpc.StatusCode.OK, and otherwise the non-OK template, which
      additionally includes debug_error_string.
    """
    with rpc_state.condition:
        status = rpc_state.code
        if status is None:
            return '<{} object>'.format(class_name)
        if status is grpc.StatusCode.OK:
            return _OK_RENDEZVOUS_REPR_FORMAT.format(class_name, status,
                                                     rpc_state.details)
        return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
            class_name, status, rpc_state.details,
            rpc_state.debug_error_string)
class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
    """An RPC error that stands apart from any running RPC.

    The instance keeps its own snapshot of the state it was built from. The
    RPC described by that state must not be in progress or cancelled.

    Attributes:
      _state: A private _RPCState holding copies of the source state's fields.
    """

    def __init__(self, state):
        # Copy while holding the source's condition so the snapshot is
        # internally consistent.
        with state.condition:
            initial_metadata = copy.deepcopy(state.initial_metadata)
            trailing_metadata = copy.deepcopy(state.trailing_metadata)
            code = state.code
            details = copy.deepcopy(state.details)
            self._state = _RPCState((), initial_metadata, trailing_metadata,
                                    code, details)
            snapshot = self._state
            snapshot.response = copy.copy(state.response)
            snapshot.debug_error_string = copy.copy(state.debug_error_string)

    def initial_metadata(self):
        """Return the snapshot's initial metadata."""
        return self._state.initial_metadata

    def trailing_metadata(self):
        """Return the snapshot's trailing metadata."""
        return self._state.trailing_metadata

    def code(self):
        """Return the snapshot's status code."""
        return self._state.code

    def details(self):
        """Return the snapshot's details passed through _common.decode."""
        raw_details = self._state.details
        return _common.decode(raw_details)

    def debug_error_string(self):
        """Return the snapshot's debug string passed through _common.decode."""
        raw_debug = self._state.debug_error_string
        return _common.decode(raw_debug)

    def _repr(self):
        # Shared by __repr__ and __str__.
        return _rpc_state_string(type(self).__name__, self._state)

    def __repr__(self):
        return self._repr()

    def __str__(self):
        return self._repr()

    def cancel(self):
        """See grpc.Future.cancel."""
        return False

    def cancelled(self):
        """See grpc.Future.cancelled."""
        return False

    def running(self):
        """See grpc.Future.running."""
        return False

    def done(self):
        """See grpc.Future.done."""
        return True

    def result(self, timeout=None):  # pylint: disable=unused-argument
        """See grpc.Future.result."""
        raise self

    def exception(self, timeout=None):  # pylint: disable=unused-argument
        """See grpc.Future.exception."""
        return self

    def traceback(self, timeout=None):  # pylint: disable=unused-argument
        """See grpc.Future.traceback."""
        # Raise and immediately catch this error so that a traceback object
        # exists to hand back.
        try:
            raise self
        except grpc.RpcError:
            _, _, tb = sys.exc_info()
            return tb
Loading ...