Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
import logging

import grpc
from grpc.aio import ClientCallDetails, Metadata

from opentelemetry.instrumentation.grpc._client import (
    OpenTelemetryClientInterceptor,
    _carrier_setter,
)
from opentelemetry.instrumentation.utils import is_instrumentation_enabled
from opentelemetry.propagate import inject
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.status import Status, StatusCode

logger = logging.getLogger(__name__)


def _unary_done_callback(span, code, details, response_hook):
    def callback(call):
        try:
            span.set_attribute(
                SpanAttributes.RPC_GRPC_STATUS_CODE,
                code.value[0],
            )
            if code != grpc.StatusCode.OK:
                span.set_status(
                    Status(
                        status_code=StatusCode.ERROR,
                        description=details,
                    )
                )
            response_hook(span, details)

        finally:
            span.end()

    return callback


class _BaseAioClientInterceptor(OpenTelemetryClientInterceptor):
    @staticmethod
    def propagate_trace_in_details(client_call_details: ClientCallDetails):
        metadata = client_call_details.metadata
        if not metadata:
            mutable_metadata = Metadata()
        else:
            mutable_metadata = Metadata(*tuple(metadata))

        inject(mutable_metadata, setter=_carrier_setter)

        return ClientCallDetails(
            client_call_details.method,
            client_call_details.timeout,
            mutable_metadata,
            client_call_details.credentials,
            client_call_details.wait_for_ready,
        )

    @staticmethod
    def add_error_details_to_span(span, exc):
        if isinstance(exc, grpc.RpcError):
            span.set_attribute(
                SpanAttributes.RPC_GRPC_STATUS_CODE,
                exc.code().value[0],
            )
        span.set_status(
            Status(
                status_code=StatusCode.ERROR,
                description=f"{type(exc).__name__}: {exc}",
            )
        )
        span.record_exception(exc)

    def _start_interceptor_span(self, method):
        # method _should_ be a string here but due to a bug in grpc, it is
        # populated with a bytes object. Handle both cases such that we
        # are forward-compatible with a fixed version of grpc
        # More info: https://github.com/grpc/grpc/issues/31092
        if isinstance(method, bytes):
            method = method.decode()

        return self._start_span(
            method,
            end_on_exit=False,
            record_exception=False,
            set_status_on_exception=False,
        )

    async def _wrap_unary_response(self, continuation, span):
        try:
            call = await continuation()

            # code and details are both coroutines that need to be await-ed,
            # the callbacks added with add_done_callback do not allow async
            # code so we need to get the code and details here then pass them
            # to the callback.
            code = await call.code()
            details = await call.details()

            call.add_done_callback(
                _unary_done_callback(
                    span, code, details, self._call_response_hook
                )
            )

            return call
        except grpc.aio.AioRpcError as exc:
            self.add_error_details_to_span(span, exc)
            raise exc

    async def _wrap_stream_response(self, span, call):
        try:
            async for response in call:
                if self._response_hook:
                    self._call_response_hook(span, response)
                yield response
        except Exception as exc:
            self.add_error_details_to_span(span, exc)
            raise exc
        finally:
            span.end()

    def tracing_skipped(self, client_call_details):
        return (
            not is_instrumentation_enabled()
            or not self.rpc_matches_filters(client_call_details)
        )

    def rpc_matches_filters(self, client_call_details):
        return self._filter is None or self._filter(client_call_details)


class UnaryUnaryAioClientInterceptor(
    grpc.aio.UnaryUnaryClientInterceptor,
    _BaseAioClientInterceptor,
):
    async def intercept_unary_unary(
        self, continuation, client_call_details, request
    ):
        if self.tracing_skipped(client_call_details):
            return await continuation(client_call_details, request)

        with self._start_interceptor_span(
            client_call_details.method,
        ) as span:
            new_details = self.propagate_trace_in_details(client_call_details)

            if self._request_hook:
                self._call_request_hook(span, request)

            continuation_with_args = functools.partial(
                continuation, new_details, request
            )
            return await self._wrap_unary_response(
                continuation_with_args, span
            )


class UnaryStreamAioClientInterceptor(
    grpc.aio.UnaryStreamClientInterceptor,
    _BaseAioClientInterceptor,
):
    async def intercept_unary_stream(
        self, continuation, client_call_details, request
    ):
        if self.tracing_skipped(client_call_details):
            return await continuation(client_call_details, request)

        with self._start_interceptor_span(
            client_call_details.method,
        ) as span:
            new_details = self.propagate_trace_in_details(client_call_details)

            resp = await continuation(new_details, request)
            if self._request_hook:
                self._call_request_hook(span, request)
            return self._wrap_stream_response(span, resp)


class StreamUnaryAioClientInterceptor(
    grpc.aio.StreamUnaryClientInterceptor,
    _BaseAioClientInterceptor,
):
    async def intercept_stream_unary(
        self, continuation, client_call_details, request_iterator
    ):
        if self.tracing_skipped(client_call_details):
            return await continuation(client_call_details, request_iterator)

        with self._start_interceptor_span(
            client_call_details.method,
        ) as span:
            new_details = self.propagate_trace_in_details(client_call_details)

            continuation_with_args = functools.partial(
                continuation, new_details, request_iterator
            )
            return await self._wrap_unary_response(
                continuation_with_args, span
            )


class StreamStreamAioClientInterceptor(
    grpc.aio.StreamStreamClientInterceptor,
    _BaseAioClientInterceptor,
):
    async def intercept_stream_stream(
        self, continuation, client_call_details, request_iterator
    ):
        if self.tracing_skipped(client_call_details):
            return await continuation(client_call_details, request_iterator)

        with self._start_interceptor_span(
            client_call_details.method,
        ) as span:
            new_details = self.propagate_trace_in_details(client_call_details)

            resp = await continuation(new_details, request_iterator)

            return self._wrap_stream_response(span, resp)