Repository URL to install this package:
| 
      
        
        
        Version: 
        
         
          
          0.1.8  ▾
        
         | 
| 
    
    odigos
  
    /
        
    etc
  
        /
        
    odigos-vmagent
  
        /
        
    instrumentations
  
        /
        
    python-ebpf
  
        /
        
    ebpf_python_instrumentation
  
        /
        __init__.py
   | 
|---|
# SPDX-FileCopyrightText: 2024-present Eden Federman <eden@keyval.dev>
#
# SPDX-License-Identifier: MIT
from initializer.components import initialize_components
from initializer.lib_handling import reorder_python_path
from opentelemetry.sdk._configuration import _BaseConfigurator
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.sdk.trace import SpanProcessor, Span, ReadableSpan
from opentelemetry.context import (
    _SUPPRESS_INSTRUMENTATION_KEY,
    Context,
    attach,
    detach,
    set_value,
)
from typing import Optional
import pythonUSDT
import inspect
import traceback
import pathlib
# Silence the OpenTelemetry auto-instrumentation loader: raise the threshold
# of its logger so that only records at ERROR level and above are emitted.
import logging
_logger = logging.getLogger("opentelemetry.instrumentation.auto_instrumentation._load")
_logger.setLevel(logging.ERROR)
class EBPFSpanProcessor(SpanProcessor):
    """Span processor that hands finished spans to ``pythonUSDT``.

    On span start, the span is annotated with source-code attributes (file
    path, function, line number, namespace and a filtered stack trace).
    On span end, sampled spans are encoded to protobuf bytes and passed to
    ``pythonUSDT.end_span``.
    """

    # Frames whose filename starts with one of these prefixes are skipped
    # when searching the call stack for the code that started the span.
    _AGENT_PATH_PREFIXES = ('/var/odigos', '/etc/odigos-vmagent')

    def __init__(self):
        # The encoder is imported at construction time rather than at module
        # import time. NOTE(review): presumably so that it is resolved only
        # after the configurator has reordered the python path - confirm.
        from opentelemetry.exporter.otlp.proto.common._internal.trace_encoder import (
            _encode_resource_spans,
        )
        self._encode_resource_spans = _encode_resource_spans

    def on_start(
        self, span: Span, parent_context: Optional[Context] = None
    ) -> None:
        """Attach source-code attributes to a span that has just started."""
        self.add_source_code_attributes(span)

    def on_end(self, span: ReadableSpan) -> None:
        """Serialize a finished, sampled span and pass it to ``pythonUSDT``.

        Unsampled spans are ignored.
        """
        if not span.context.trace_flags.sampled:
            return

        # Instrumentation is suppressed while the span is encoded and sent.
        token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
        try:
            data = self.span_to_byte_array(span)
            pythonUSDT.end_span(data)
        finally:
            # Always restore the previous context, even when encoding or the
            # USDT call raises; otherwise the suppression flag would stay
            # attached to the current context.
            detach(token)

    def add_source_code_attributes(self, span: Span):
        """Record where the code handling this span lives.

        For spans created by the flask instrumentation, the attributes point
        at the view function registered for the current request. For every
        other library, or when the flask lookup fails for any reason, the
        attributes are derived from the call stack instead.
        """
        try:
            # Read inside the ``try`` so that a span without usable
            # instrumentation info falls back to the stack lookup instead of
            # raising out of ``on_start``.
            library_name = span.instrumentation_info.name.split('.')[-1]
            if library_name == 'flask':
                import flask
                handler_func = flask.current_app.view_functions[flask.request.url_rule.endpoint]
                filename = inspect.getsourcefile(handler_func)
                lineno = inspect.getsourcelines(handler_func)[1]
                namespace = handler_func.__module__
                funcname = handler_func.__name__
                stacktrace = _get_stack_trace()
                self._add_code_attributes(span, funcname, lineno, filename, stacktrace, namespace)
                return
        except Exception:
            # Best effort: any failure above (e.g. no matched URL rule) is
            # handled by the stack-based lookup below. ``Exception`` rather
            # than a bare ``except`` so KeyboardInterrupt/SystemExit propagate.
            pass
        self.add_source_code_attributes_from_stack(span)

    def add_source_code_attributes_from_stack(self, span: Span):
        """Annotate ``span`` from the innermost eligible frame on the stack.

        Walks outward from the current frame and uses the first frame whose
        file is neither under one of the agent path prefixes nor under the
        directory that contains the standard library's ``traceback`` module.
        If no such frame exists, the span is left untouched.
        """
        lib_python_path = str(pathlib.Path(traceback.__file__).parent.resolve())
        skipped_prefixes = self._AGENT_PATH_PREFIXES + (lib_python_path,)

        # Real frame objects are walked (instead of ``traceback.extract_stack``
        # summaries) because the namespace must come from the frame's globals:
        # ``inspect.getmodule`` on a ``FrameSummary`` resolves to the
        # ``traceback`` module itself, not to the module that owns the frame.
        frame = inspect.currentframe()
        try:
            while frame is not None:
                filename = frame.f_code.co_filename
                if not filename.startswith(skipped_prefixes):
                    self._add_code_attributes(
                        span,
                        frame.f_code.co_name,
                        frame.f_lineno,
                        filename,
                        _get_stack_trace(),
                        frame.f_globals.get('__name__'),
                    )
                    break
                frame = frame.f_back
        finally:
            # Drop the frame reference so it does not form a reference cycle.
            del frame

    def _add_code_attributes(self, span: Span, func_name, lineno, filepath, stacktrace, namespace):
        """Set the ``code.*`` attributes on ``span``.

        ``namespace`` is optional; the namespace attribute is only set when
        it is not ``None``.
        """
        span.set_attribute(SpanAttributes.CODE_FILEPATH, filepath)
        span.set_attribute(SpanAttributes.CODE_FUNCTION, func_name)
        span.set_attribute(SpanAttributes.CODE_LINENO, lineno)
        # code.stacktrace is not part of the semantic conventions yet, hence
        # the literal key.
        span.set_attribute("code.stacktrace", stacktrace)

        if namespace is not None:
            span.set_attribute(SpanAttributes.CODE_NAMESPACE, namespace)

    def span_to_byte_array(self, span: ReadableSpan) -> bytes:
        """Encode a single span and return its serialized protobuf bytes."""
        pb2_spans = self._encode_resource_spans([span])
        # Exactly one span goes in, so only the first encoded entry is used.
        pb2_span = pb2_spans[0]
        return pb2_span.SerializeToString()

    def shutdown(self) -> None:
        """No-op."""
        pass

    def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
        """Always report success; ``timeout_millis`` is ignored."""
        return True
def _get_stack_trace() -> str:
    """Return the current call stack as text, without agent-internal frames.

    Each entry produced by ``traceback.format_stack`` is dropped when it
    contains one of the agent installation paths; the remaining entries are
    concatenated and surrounding whitespace is stripped.
    """
    stack_trace = traceback.format_stack()

    # When running through the agent, its frames come from
    # `/var/odigos/python*` or `/etc/odigos-vmagent/instrumentations*`.
    # (When using the runner, frames come from `/instrumentation/python*`.)
    agent_paths = ("/var/odigos/python", "/etc/odigos-vmagent/instrumentations")

    kept_entries = []
    for entry in stack_trace:
        # Skip any entry that mentions an agent path anywhere in its text.
        if any(agent_path in entry for agent_path in agent_paths):
            continue
        kept_entries.append(entry)

    return ''.join(kept_entries).strip()
class OpenTelemetryConfigurator(_BaseConfigurator):
    """Configurator that wires an ``EBPFSpanProcessor`` into the components."""

    def _configure(self, **kwargs):
        """Reorder the python path, then initialize the components.

        ``kwargs`` are accepted but not used.
        """
        # The python path is reordered first so that customer site-packages
        # (e.g. protobuf) take precedence. The processor is only built
        # afterwards, since its constructor imports the OTLP trace encoder.
        reorder_python_path()
        processor = EBPFSpanProcessor()
        initialize_components(span_processor=processor)