Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

edgify / grpcio   python

Repository URL to install this package:

Version: 1.29.0 

/ _interceptor.py

# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of gRPC Python interceptors."""

import collections
import sys

import grpc


class _ServicePipeline(object):

    def __init__(self, interceptors):
        self.interceptors = tuple(interceptors)

    def _continuation(self, thunk, index):
        return lambda context: self._intercept_at(thunk, index, context)

    def _intercept_at(self, thunk, index, context):
        if index < len(self.interceptors):
            interceptor = self.interceptors[index]
            thunk = self._continuation(thunk, index + 1)
            return interceptor.intercept_service(thunk, context)
        else:
            return thunk(context)

    def execute(self, thunk, context):
        return self._intercept_at(thunk, 0, context)


def service_pipeline(interceptors):
    return _ServicePipeline(interceptors) if interceptors else None


class _ClientCallDetails(
        collections.namedtuple('_ClientCallDetails',
                               ('method', 'timeout', 'metadata', 'credentials',
                                'wait_for_ready', 'compression')),
        grpc.ClientCallDetails):
    pass


def _unwrap_client_call_details(call_details, default_details):
    try:
        method = call_details.method
    except AttributeError:
        method = default_details.method

    try:
        timeout = call_details.timeout
    except AttributeError:
        timeout = default_details.timeout

    try:
        metadata = call_details.metadata
    except AttributeError:
        metadata = default_details.metadata

    try:
        credentials = call_details.credentials
    except AttributeError:
        credentials = default_details.credentials

    try:
        wait_for_ready = call_details.wait_for_ready
    except AttributeError:
        wait_for_ready = default_details.wait_for_ready

    try:
        compression = call_details.compression
    except AttributeError:
        compression = default_details.compression

    return method, timeout, metadata, credentials, wait_for_ready, compression


class _FailureOutcome(grpc.RpcError, grpc.Future, grpc.Call):  # pylint: disable=too-many-ancestors

    def __init__(self, exception, traceback):
        super(_FailureOutcome, self).__init__()
        self._exception = exception
        self._traceback = traceback

    def initial_metadata(self):
        return None

    def trailing_metadata(self):
        return None

    def code(self):
        return grpc.StatusCode.INTERNAL

    def details(self):
        return 'Exception raised while intercepting the RPC'

    def cancel(self):
        return False

    def cancelled(self):
        return False

    def is_active(self):
        return False

    def time_remaining(self):
        return None

    def running(self):
        return False

    def done(self):
        return True

    def result(self, ignored_timeout=None):
        raise self._exception

    def exception(self, ignored_timeout=None):
        return self._exception

    def traceback(self, ignored_timeout=None):
        return self._traceback

    def add_callback(self, unused_callback):
        return False

    def add_done_callback(self, fn):
        fn(self)

    def __iter__(self):
        return self

    def __next__(self):
        raise self._exception

    def next(self):
        return self.__next__()


class _UnaryOutcome(grpc.Call, grpc.Future):

    def __init__(self, response, call):
        self._response = response
        self._call = call

    def initial_metadata(self):
        return self._call.initial_metadata()

    def trailing_metadata(self):
        return self._call.trailing_metadata()

    def code(self):
        return self._call.code()

    def details(self):
        return self._call.details()

    def is_active(self):
        return self._call.is_active()

    def time_remaining(self):
        return self._call.time_remaining()

    def cancel(self):
        return self._call.cancel()

    def add_callback(self, callback):
        return self._call.add_callback(callback)

    def cancelled(self):
        return False

    def running(self):
        return False

    def done(self):
        return True

    def result(self, ignored_timeout=None):
        return self._response

    def exception(self, ignored_timeout=None):
        return None

    def traceback(self, ignored_timeout=None):
        return None

    def add_done_callback(self, fn):
        fn(self)


class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):

    def __init__(self, thunk, method, interceptor):
        self._thunk = thunk
        self._method = method
        self._interceptor = interceptor

    def __call__(self,
                 request,
                 timeout=None,
                 metadata=None,
                 credentials=None,
                 wait_for_ready=None,
                 compression=None):
        response, ignored_call = self._with_call(request,
                                                 timeout=timeout,
                                                 metadata=metadata,
                                                 credentials=credentials,
                                                 wait_for_ready=wait_for_ready,
                                                 compression=compression)
        return response

    def _with_call(self,
                   request,
                   timeout=None,
                   metadata=None,
                   credentials=None,
                   wait_for_ready=None,
                   compression=None):
        client_call_details = _ClientCallDetails(self._method, timeout,
                                                 metadata, credentials,
                                                 wait_for_ready, compression)

        def continuation(new_details, request):
            (new_method, new_timeout, new_metadata, new_credentials,
             new_wait_for_ready,
             new_compression) = (_unwrap_client_call_details(
                 new_details, client_call_details))
            try:
                response, call = self._thunk(new_method).with_call(
                    request,
                    timeout=new_timeout,
                    metadata=new_metadata,
                    credentials=new_credentials,
                    wait_for_ready=new_wait_for_ready,
                    compression=new_compression)
                return _UnaryOutcome(response, call)
            except grpc.RpcError as rpc_error:
                return rpc_error
            except Exception as exception:  # pylint:disable=broad-except
                return _FailureOutcome(exception, sys.exc_info()[2])

        call = self._interceptor.intercept_unary_unary(continuation,
                                                       client_call_details,
                                                       request)
        return call.result(), call

    def with_call(self,
                  request,
                  timeout=None,
                  metadata=None,
                  credentials=None,
                  wait_for_ready=None,
                  compression=None):
        return self._with_call(request,
                               timeout=timeout,
                               metadata=metadata,
                               credentials=credentials,
                               wait_for_ready=wait_for_ready,
                               compression=compression)

    def future(self,
               request,
               timeout=None,
               metadata=None,
               credentials=None,
               wait_for_ready=None,
               compression=None):
        client_call_details = _ClientCallDetails(self._method, timeout,
                                                 metadata, credentials,
                                                 wait_for_ready, compression)

        def continuation(new_details, request):
            (new_method, new_timeout, new_metadata, new_credentials,
             new_wait_for_ready,
             new_compression) = (_unwrap_client_call_details(
                 new_details, client_call_details))
            return self._thunk(new_method).future(
                request,
                timeout=new_timeout,
                metadata=new_metadata,
                credentials=new_credentials,
                wait_for_ready=new_wait_for_ready,
                compression=new_compression)

        try:
            return self._interceptor.intercept_unary_unary(
                continuation, client_call_details, request)
        except Exception as exception:  # pylint:disable=broad-except
            return _FailureOutcome(exception, sys.exc_info()[2])


class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):

    def __init__(self, thunk, method, interceptor):
        self._thunk = thunk
        self._method = method
        self._interceptor = interceptor

    def __call__(self,
                 request,
                 timeout=None,
                 metadata=None,
                 credentials=None,
                 wait_for_ready=None,
                 compression=None):
        client_call_details = _ClientCallDetails(self._method, timeout,
                                                 metadata, credentials,
                                                 wait_for_ready, compression)

        def continuation(new_details, request):
            (new_method, new_timeout, new_metadata, new_credentials,
             new_wait_for_ready,
             new_compression) = (_unwrap_client_call_details(
                 new_details, client_call_details))
            return self._thunk(new_method)(request,
                                           timeout=new_timeout,
                                           metadata=new_metadata,
                                           credentials=new_credentials,
                                           wait_for_ready=new_wait_for_ready,
                                           compression=new_compression)

        try:
            return self._interceptor.intercept_unary_stream(
                continuation, client_call_details, request)
        except Exception as exception:  # pylint:disable=broad-except
            return _FailureOutcome(exception, sys.exc_info()[2])


class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):

    def __init__(self, thunk, method, interceptor):
        self._thunk = thunk
        self._method = method
Loading ...