Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# SPDX-FileCopyrightText: 2024-present Eden Federman <eden@keyval.dev>
#
# SPDX-License-Identifier: MIT
from initializer.components import initialize_components
from initializer.lib_handling import reorder_python_path

from opentelemetry.sdk._configuration import _BaseConfigurator
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.sdk.trace import SpanProcessor, Span, ReadableSpan
from opentelemetry.context import (
    _SUPPRESS_INSTRUMENTATION_KEY,
    Context,
    attach,
    detach,
    set_value,
)


from typing import Optional
import pythonUSDT
import inspect
import traceback
import pathlib


# Silence OTel logger
import logging
_logger = logging.getLogger("opentelemetry.instrumentation.auto_instrumentation._load")
_logger.setLevel(logging.ERROR)


class EBPFSpanProcessor(SpanProcessor):
    
    def __init__(self):
        from opentelemetry.exporter.otlp.proto.common._internal.trace_encoder import (
            _encode_resource_spans,
        )
        self._encode_resource_spans = _encode_resource_spans
        
    def on_start(
        self, span: Span, parent_context: Optional[Context] = None
    ) -> None:
        self.add_source_code_attributes(span)

    def on_end(self, span: ReadableSpan) -> None:
        if not span.context.trace_flags.sampled:
            return
        token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
        data = self.span_to_byte_array(span)
        pythonUSDT.end_span(data)
        detach(token)

    def add_source_code_attributes(self, span: ReadableSpan):
        library_name = span.instrumentation_info.name
        library_name = library_name.split('.')[-1]
        try:
            if library_name == 'flask':
                import flask
                handler_func = flask.current_app.view_functions[flask.request.url_rule.endpoint]
                filename = inspect.getsourcefile(handler_func)
                lineno = inspect.getsourcelines(handler_func)[1]
                namespace = handler_func.__module__
                funcname = handler_func.__name__
                stacktrace = _get_stack_trace()
                self._add_code_attributes(span, funcname, lineno, filename, stacktrace, namespace)
            else:
                self.add_source_code_attributes_from_stack(span)
        except:
            self.add_source_code_attributes_from_stack(span)

    def add_source_code_attributes_from_stack(self, span: ReadableSpan):
        stack = traceback.extract_stack()
        lib_python_path = str(pathlib.Path(traceback.__file__).parent.resolve())
        for frame in reversed(stack):
            filename = frame.filename
            if not filename.startswith('/var/odigos') and not filename.startswith(lib_python_path):
                lineno = frame.lineno
                funcname = frame.name
                stacktrace = _get_stack_trace()
                namespace = inspect.getmodule(frame).__name__ if inspect.getmodule(frame) else None
                self._add_code_attributes(span, funcname, lineno, filename, stacktrace, namespace)
                break

    def _add_code_attributes(self, span: ReadableSpan, func_name, lineno, filepath, stacktrace, namespace):
        span.set_attribute(SpanAttributes.CODE_FILEPATH, filepath)
        span.set_attribute(SpanAttributes.CODE_FUNCTION, func_name)
        span.set_attribute(SpanAttributes.CODE_LINENO, lineno)
        span.set_attribute("code.stacktrace", stacktrace) ## code.stacktrace is not the senamtic convention yet
        
        if namespace is not None:
            span.set_attribute(SpanAttributes.CODE_NAMESPACE, namespace)        

    def span_to_byte_array(self, span: ReadableSpan) -> bytes:
        pb2_spans = self._encode_resource_spans([span])
        pb2_span = pb2_spans[0]
        data = pb2_span.SerializeToString()
        return data

    def shutdown(self) -> None:
        pass

    def force_flush(self, timeout_millis: int = None) -> bool:
        return True

def _get_stack_trace() -> str:
    stack_trace = traceback.format_stack()
    
    # When using the runner, the frames source will be from `/instrumentation/python*`
    # If we run using the agent, the frames source will be from `/var/odigos/python*`
    excluded_path = "/var/odigos/python"
    
    # Filter stack trace frames that do NOT contain the excluded path
    filtered_stack_trace = [
        frame for frame in stack_trace
        if excluded_path not in frame
    ]
    
    return ''.join(filtered_stack_trace).strip()

class OpenTelemetryConfigurator(_BaseConfigurator):
    def _configure(self, **kwargs):
        # Reorder the python path to ensure customer site-packages are loaded first e.g protobuf
        reorder_python_path()
        initialize_components(span_processor=EBPFSpanProcessor())