Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

edgify / grpcio   python

Repository URL to install this package:

/ _channel.py

# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Invocation-side implementation of gRPC Python."""

import copy
import functools
import logging
import os
import sys
import threading
import time

import grpc
import grpc.experimental
from grpc import _compression
from grpc import _common
from grpc import _grpcio_metadata
from grpc._cython import cygrpc

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# User-agent string embedding the installed grpcio version. Presumably
# advertised by channels created from this module; its use is outside this
# view, so confirm against the channel construction code.
_USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__)

# Flags argument handed to cygrpc operations when no flags are requested.
_EMPTY_FLAGS = 0

# NOTE(rbellevi): No guarantees are given about the maintenance of this
# environment variable.
# True whenever GRPC_SINGLE_THREADED_UNARY_STREAM is present in the
# environment, whatever its value (an empty string still counts as set).
_DEFAULT_SINGLE_THREADED_UNARY_STREAM = os.getenv(
    "GRPC_SINGLE_THREADED_UNARY_STREAM") is not None

# Sets of cygrpc operation types, one per RPC arity (unary/stream request x
# unary/stream response). Presumably each tuple seeds the `due` set of an
# _RPCState when an RPC of that arity is started -- the call sites are outside
# this view, so confirm there.
#
# Differences between the tuples, as written below:
#   * unary-request variants include send_message and send_close_from_client;
#     stream-request variants do not.
#   * unary-response variants include receive_message; stream-response
#     variants do not.
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)

# Log message for a failing channel subscription callback. The code that emits
# it is outside this view.
_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    'Exception calling channel subscription callback!')

# Template used by _rpc_state_string for an RPC that terminated with
# StatusCode.OK. Positional fields: class name, status code, details.
_OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n'
                              '\tstatus = {}\n'
                              '\tdetails = "{}"\n'
                              '>')

# Template used by _rpc_state_string for an RPC that terminated with any other
# status. Positional fields: class name, status code, details,
# debug_error_string.
_NON_OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n'
                                  '\tstatus = {}\n'
                                  '\tdetails = "{}"\n'
                                  '\tdebug_error_string = "{}"\n'
                                  '>')


def _deadline(timeout):
    """Turn a relative timeout into an absolute deadline.

    Args:
      timeout: A duration in seconds, or None when no deadline applies.

    Returns:
      None if timeout is None; otherwise the current time.time() value plus
      timeout.
    """
    if timeout is None:
        return None
    return time.time() + timeout


def _unknown_code_details(unknown_cygrpc_code, details):
    """Build the details text used when the server's status code is unknown.

    Args:
      unknown_cygrpc_code: The unrecognised status code value.
      details: The details value that accompanied the status.

    Returns:
      A string mentioning both the unrecognised code and the quoted details.
    """
    template = 'Server sent unknown code {} and details "{}"'
    return template.format(unknown_cygrpc_code, details)


class _RPCState(object):
    """Mutable record of a single RPC's progress and outcome.

    Attributes:
      condition: A threading.Condition; the helpers in this module hold it
        while reading or updating the other fields and notify it on changes.
      due: A set of cygrpc.OperationType values for events still expected
        from the RPC's completion queue.
      initial_metadata: Initial metadata, or None if not yet known.
      response: The deserialized response message, or None.
      trailing_metadata: Trailing metadata, or None if not yet known.
      code: The final status code, or None while the RPC has not terminated.
      details: The status details, or None.
      debug_error_string: The error string reported with the status, or None.
      cancelled: Whether cancellation was requested before termination.
      callbacks: A list of callables to run once the RPC terminates.
      fork_epoch: The cygrpc fork epoch observed when this state was created.
    """

    def __init__(self, due, initial_metadata, trailing_metadata, code, details):
        # Final outcome of the RPC (all may be None while it is in flight).
        self.code = code
        self.details = details
        self.debug_error_string = None
        # Data received so far.
        self.initial_metadata = initial_metadata
        self.trailing_metadata = trailing_metadata
        self.response = None
        # Events that the RPC's completion queue still owes us, expressed as
        # cygrpc.OperationType objects.
        self.due = set(due)
        # grpc.Future.cancel and grpc.Future.cancelled have somewhat odd
        # semantics, so a cancellation request made before the RPC terminated
        # is recorded here, independently of the RPC's actual result.
        self.cancelled = False
        self.callbacks = []
        self.fork_epoch = cygrpc.get_fork_epoch()
        self.condition = threading.Condition()

    def reset_postfork_child(self):
        """Install a brand-new condition variable, discarding the old one."""
        self.condition = threading.Condition()


def _abort(state, code, details):
    """Mark an RPC as terminated locally, unless it already has a status.

    Args:
      state: The _RPCState (or equivalent object) to update in place.
      code: The status code to record.
      details: The status details to record.

    When state.code is already set nothing is changed. Otherwise the code and
    details are stored, trailing metadata becomes an empty tuple, and initial
    metadata becomes an empty tuple only if none had been recorded.
    """
    if state.code is not None:
        # The RPC already terminated; the first status wins.
        return
    state.code = code
    state.details = details
    if state.initial_metadata is None:
        state.initial_metadata = ()
    state.trailing_metadata = ()


def _handle_event(event, state, response_deserializer):
    """Fold one completion-queue event into the RPC state.

    Every batch operation carried by the event is removed from state.due and
    its payload, if any, is recorded on the state.

    Args:
      event: An object whose batch_operations attribute is an iterable of
        completed cygrpc batch operations.
      state: The _RPCState to update in place. Callers in this module hold
        state.condition while calling.
      response_deserializer: Passed to _common.deserialize for each received
        message.

    Returns:
      A list of the callbacks that were registered on the state if this event
      carried the RPC's final status; otherwise an empty list.

    Raises:
      KeyError: If the event reports an operation type that is not in
        state.due.
    """
    callbacks = []
    for batch_operation in event.batch_operations:
        operation_type = batch_operation.type()
        state.due.remove(operation_type)
        if operation_type == cygrpc.OperationType.receive_initial_metadata:
            state.initial_metadata = batch_operation.initial_metadata()
        elif operation_type == cygrpc.OperationType.receive_message:
            serialized_response = batch_operation.message()
            if serialized_response is not None:
                response = _common.deserialize(serialized_response,
                                               response_deserializer)
                if response is None:
                    # A None result is treated as a deserialization failure.
                    details = 'Exception deserializing response!'
                    _abort(state, grpc.StatusCode.INTERNAL, details)
                else:
                    state.response = response
        elif operation_type == cygrpc.OperationType.receive_status_on_client:
            state.trailing_metadata = batch_operation.trailing_metadata()
            # A status recorded earlier (e.g. by _abort) takes precedence over
            # the one delivered with this event.
            if state.code is None:
                cygrpc_code = batch_operation.code()
                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
                    cygrpc_code)
                if code is None:
                    state.code = grpc.StatusCode.UNKNOWN
                    # Report the raw code received from the server; the mapped
                    # `code` is always None on this branch.
                    state.details = _unknown_code_details(
                        cygrpc_code, batch_operation.details())
                else:
                    state.code = code
                    state.details = batch_operation.details()
                    state.debug_error_string = batch_operation.error_string()
            callbacks.extend(state.callbacks)
            # The callback list is replaced by None once the final status has
            # been processed; presumably this lets later registrants detect
            # that the RPC already terminated -- verify against callers.
            state.callbacks = None
    return callbacks


def _event_handler(state, response_deserializer):
    """Create the completion-queue event handler for one RPC.

    Args:
      state: The _RPCState updated by the returned handler.
      response_deserializer: Forwarded to _handle_event for received messages.

    Returns:
      A one-argument callable. Given an event, it updates the state under
      state.condition, wakes every waiter on that condition, and then runs any
      callbacks returned by _handle_event outside the lock. Its result is true
      only when no operations remain due and the state's fork epoch is not
      older than the current cygrpc fork epoch.
    """

    def handle_event(event):
        with state.condition:
            callbacks = _handle_event(event, state, response_deserializer)
            state.condition.notify_all()
            # Captured while the lock is held so that the answer reflects this
            # event and not a later mutation of state.due.
            done = not state.due
        for callback in callbacks:
            try:
                callback()
            except Exception as e:  # pylint: disable=broad-except
                # NOTE(rbellevi): We suppress but log errors here so as not to
                # kill the channel spin thread.
                # Only functools.partial-style wrappers expose `.func`; fall
                # back to the callable itself so that building the log message
                # can never raise from inside this handler.
                logging.error('Exception in callback %s: %s',
                              repr(getattr(callback, 'func', callback)),
                              repr(e))
        return done and state.fork_epoch >= cygrpc.get_fork_epoch()

    return handle_event


#pylint: disable=too-many-statements
def _consume_request_iterator(request_iterator, state, call, request_serializer,
                              event_handler):
    """Consume a request iterator supplied by the user.

    Starts a daemon cygrpc.ForkManagedThread that pulls requests from
    request_iterator one at a time, serializes each of them, sends it on the
    call and waits for that send to complete before fetching the next request.
    Once the iterator is exhausted, a close-from-client operation is sent if
    the RPC has not terminated. This function itself returns right after
    starting the thread.

    Args:
      request_iterator: The user's iterator of request values.
      state: The _RPCState of the RPC; read and updated by the thread.
      call: The call object on which operations are started and which is
        cancelled on failure.
      request_serializer: Passed to _common.serialize for every request.
      event_handler: Passed to call.operate together with each batch of
        operations.
    """

    def consume_request_iterator():  # pylint: disable=too-many-branches
        # Iterate over the request iterator until it is exhausted or an error
        # condition is encountered.
        while True:
            # Tracks whether the "left user code" notification was already
            # issued by the except branch, so the finally clause below does
            # not issue it a second time.
            return_from_user_request_generator_invoked = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                # Normal exhaustion: go on to half-close the call below.
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                return_from_user_request_generator_invoked = True
                code = grpc.StatusCode.UNKNOWN
                details = 'Exception iterating requests!'
                _LOGGER.exception(details)
                call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details)
                # NOTE(review): unlike the serialization-failure path below,
                # this _abort runs without holding state.condition -- confirm
                # that this is intentional.
                _abort(state, code, details)
                return
            finally:
                if not return_from_user_request_generator_invoked:
                    cygrpc.return_from_user_request_generator()
            # Serialization happens outside the lock; a None result is treated
            # as a serialization failure below.
            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                # Only keep sending while the RPC has neither terminated nor
                # been cancelled.
                if state.code is None and not state.cancelled:
                    if serialized_request is None:
                        code = grpc.StatusCode.INTERNAL
                        details = 'Exception serializing request!'
                        call.cancel(
                            _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details)
                        _abort(state, code, details)
                        return
                    else:
                        operations = (cygrpc.SendMessageOperation(
                            serialized_request, _EMPTY_FLAGS),)
                        operating = call.operate(operations, event_handler)
                        if operating:
                            # Record that a send completion is now expected.
                            state.due.add(cygrpc.OperationType.send_message)
                        else:
                            # A falsy result stops consumption; presumably the
                            # operation could not be started -- TODO confirm
                            # against call.operate.
                            return

                        def _done():
                            # Finished waiting once the RPC has a status or
                            # the pending send is no longer due.
                            return (state.code is not None or
                                    cygrpc.OperationType.send_message not in
                                    state.due)

                        # Waits on the condition until _done() holds. The
                        # exact role of spin_cb is defined by _common.wait,
                        # which is outside this view.
                        _common.wait(state.condition.wait,
                                     _done,
                                     spin_cb=functools.partial(
                                         cygrpc.block_if_fork_in_progress,
                                         state))
                        if state.code is not None:
                            return
                else:
                    return
        # The iterator is exhausted: half-close the call unless the RPC has
        # already terminated.
        with state.condition:
            if state.code is None:
                operations = (
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
                operating = call.operate(operations, event_handler)
                if operating:
                    state.due.add(cygrpc.OperationType.send_close_from_client)

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator)
    consumption_thread.setDaemon(True)
    consumption_thread.start()


def _rpc_state_string(class_name, rpc_state):
    """Render a human-readable description of an RPC's state.

    Args:
      class_name: The name to show for the object being described.
      rpc_state: The _RPCState to describe; its condition is held while the
        fields are read.

    Returns:
      '<NAME object>' while the RPC has no status code yet; otherwise a
      multi-line summary of the status and details, which also includes the
      debug error string when the status is not OK.
    """
    with rpc_state.condition:
        status = rpc_state.code
        if status is None:
            return '<{} object>'.format(class_name)
        if status is grpc.StatusCode.OK:
            return _OK_RENDEZVOUS_REPR_FORMAT.format(class_name, status,
                                                     rpc_state.details)
        return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
            class_name, status, rpc_state.details,
            rpc_state.debug_error_string)


class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
    """An RPC error that is detached from the execution of any RPC.

    The error holds its own snapshot of the outcome, taken at construction
    time. The RPC described by the state handed to the constructor must be
    neither in progress nor cancelled.

    Attributes:
      _state: An instance of _RPCState.
    """

    def __init__(self, state):
        # Snapshot everything while holding the source state's condition so
        # that the copied fields are mutually consistent.
        with state.condition:
            initial_metadata = copy.deepcopy(state.initial_metadata)
            trailing_metadata = copy.deepcopy(state.trailing_metadata)
            details = copy.deepcopy(state.details)
            self._state = _RPCState((), initial_metadata, trailing_metadata,
                                    state.code, details)
            self._state.response = copy.copy(state.response)
            self._state.debug_error_string = copy.copy(state.debug_error_string)

    def initial_metadata(self):
        """Return the initial metadata captured in the snapshot."""
        return self._state.initial_metadata

    def trailing_metadata(self):
        """Return the trailing metadata captured in the snapshot."""
        return self._state.trailing_metadata

    def code(self):
        """Return the status code captured in the snapshot."""
        return self._state.code

    def details(self):
        """Return the status details, passed through _common.decode."""
        return _common.decode(self._state.details)

    def debug_error_string(self):
        """Return the debug error string, passed through _common.decode."""
        return _common.decode(self._state.debug_error_string)

    def _repr(self):
        """Describe this error using the snapshot of the RPC state."""
        return _rpc_state_string(self.__class__.__name__, self._state)

    def __repr__(self):
        return self._repr()

    def __str__(self):
        return self._repr()

    def cancel(self):
        """Always False. See grpc.Future.cancel."""
        return False

    def cancelled(self):
        """Always False. See grpc.Future.cancelled."""
        return False

    def running(self):
        """Always False. See grpc.Future.running."""
        return False

    def done(self):
        """Always True. See grpc.Future.done."""
        return True

    def result(self, timeout=None):  # pylint: disable=unused-argument
        """Always raises this error. See grpc.Future.result."""
        raise self

    def exception(self, timeout=None):  # pylint: disable=unused-argument
        """Return this error itself. See grpc.Future.exception."""
        return self

    def traceback(self, timeout=None):  # pylint: disable=unused-argument
        """Return a traceback for this error. See grpc.Future.traceback."""
        # Raising and immediately catching the error is what produces the
        # traceback object handed back to the caller.
        try:
            raise self
        except grpc.RpcError:
            _, _, current_traceback = sys.exc_info()
            return current_traceback
Loading ...