Repository URL to install this package:
Version:
0.1.3 ▾
|
odigos
/
etc
/
odigos-vmagent
/
instrumentations
/
python-ebpf
/
ebpf_python_instrumentation
/
__init__.py
|
---|
# SPDX-FileCopyrightText: 2024-present Eden Federman <eden@keyval.dev>
#
# SPDX-License-Identifier: MIT
from initializer.components import initialize_components
from initializer.lib_handling import reorder_python_path
from opentelemetry.sdk._configuration import _BaseConfigurator
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.sdk.trace import SpanProcessor, Span, ReadableSpan
from opentelemetry.context import (
_SUPPRESS_INSTRUMENTATION_KEY,
Context,
attach,
detach,
set_value,
)
from typing import Optional
import pythonUSDT
import inspect
import traceback
import pathlib
# Silence OTel logger
import logging
# Raise the threshold of this specific logger to ERROR so that only records
# of severity ERROR or higher emitted through it are processed.
_logger = logging.getLogger("opentelemetry.instrumentation.auto_instrumentation._load")
_logger.setLevel(logging.ERROR)
class EBPFSpanProcessor(SpanProcessor):
    """Span processor that hands finished spans to ``pythonUSDT``.

    When a span starts it is annotated with source-code attributes (file,
    function, line, namespace and a stack trace).  When a sampled span ends
    it is encoded to protobuf bytes and passed to ``pythonUSDT.end_span``.
    """

    def __init__(self):
        # The encoder is imported at construction time rather than at module
        # import time -- presumably so it is resolved only after the
        # configurator has called reorder_python_path(); confirm before moving.
        from opentelemetry.exporter.otlp.proto.common._internal.trace_encoder import (
            _encode_resource_spans,
        )

        self._encode_resource_spans = _encode_resource_spans

    def on_start(
        self, span: Span, parent_context: Optional[Context] = None
    ) -> None:
        """Annotate a newly started span with source-code attributes."""
        self.add_source_code_attributes(span)

    def on_end(self, span: ReadableSpan) -> None:
        """Encode a finished span and emit it through ``pythonUSDT.end_span``.

        Spans whose trace flags are not marked as sampled are ignored.
        """
        if not span.context.trace_flags.sampled:
            return

        token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
        try:
            data = self.span_to_byte_array(span)
            pythonUSDT.end_span(data)
        finally:
            # Always restore the previous context.  Without this, a failure
            # while encoding or emitting would leave the suppress-
            # instrumentation key attached to the current context.
            detach(token)

    def add_source_code_attributes(self, span: Span):
        """Attach ``code.*`` attributes describing where the span originated.

        For spans whose instrumentation name ends in ``flask`` the view
        function registered for the current request is used.  For every other
        span, and whenever the Flask lookup fails for any reason, the
        attributes are taken from the call stack instead.
        """
        try:
            library_name = span.instrumentation_info.name.split('.')[-1]
            if library_name == 'flask':
                import flask

                handler_func = flask.current_app.view_functions[flask.request.url_rule.endpoint]
                filename = inspect.getsourcefile(handler_func)
                lineno = inspect.getsourcelines(handler_func)[1]
                namespace = handler_func.__module__
                funcname = handler_func.__name__
                stacktrace = _get_stack_trace()
                self._add_code_attributes(span, funcname, lineno, filename, stacktrace, namespace)
                return
        except Exception:
            # Best effort: any failure while resolving the Flask handler (no
            # app/request context, missing url_rule, source not available,
            # missing instrumentation info, ...) falls through to the
            # stack-based lookup below.  KeyboardInterrupt/SystemExit are
            # deliberately not caught.
            pass
        self.add_source_code_attributes_from_stack(span)

    def add_source_code_attributes_from_stack(self, span: Span):
        """Attach ``code.*`` attributes taken from the innermost eligible frame.

        The call stack is walked from the innermost frame outwards.  The first
        frame whose file path starts with neither ``/var/odigos`` nor the
        directory that contains the ``traceback`` module is used.  If no such
        frame exists the span is left untouched.
        """
        lib_python_path = str(pathlib.Path(traceback.__file__).parent.resolve())
        skipped_prefixes = ('/var/odigos', lib_python_path)

        # Real frame objects are walked (instead of traceback.extract_stack())
        # because the module name is only reachable through f_globals;
        # inspect.getmodule() on a FrameSummary resolves to the `traceback`
        # module itself and would report a wrong namespace.
        frame = inspect.currentframe()
        try:
            while frame is not None:
                filename = frame.f_code.co_filename
                if not filename.startswith(skipped_prefixes):
                    lineno = frame.f_lineno
                    funcname = frame.f_code.co_name
                    stacktrace = _get_stack_trace()
                    namespace = frame.f_globals.get('__name__')
                    self._add_code_attributes(span, funcname, lineno, filename, stacktrace, namespace)
                    break
                frame = frame.f_back
        finally:
            # Drop the frame reference to avoid a frame <-> locals cycle.
            del frame

    def _add_code_attributes(self, span: Span, func_name, lineno, filepath, stacktrace, namespace):
        """Set the source-code attributes on ``span``.

        ``namespace`` is optional; the namespace attribute is only set when it
        is not ``None``.
        """
        span.set_attribute(SpanAttributes.CODE_FILEPATH, filepath)
        span.set_attribute(SpanAttributes.CODE_FUNCTION, func_name)
        span.set_attribute(SpanAttributes.CODE_LINENO, lineno)
        # code.stacktrace is not part of the semantic conventions yet.
        span.set_attribute("code.stacktrace", stacktrace)
        if namespace is not None:
            span.set_attribute(SpanAttributes.CODE_NAMESPACE, namespace)

    def span_to_byte_array(self, span: ReadableSpan) -> bytes:
        """Encode a single span and return the serialized bytes.

        The span is passed to the encoder as a one-element list and the first
        element of the encoder's result is serialized.
        """
        pb2_spans = self._encode_resource_spans([span])
        pb2_span = pb2_spans[0]
        return pb2_span.SerializeToString()

    def shutdown(self) -> None:
        """No-op: this processor holds nothing that needs releasing."""
        pass

    def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
        """Report success immediately; ``timeout_millis`` is not used."""
        return True
def _get_stack_trace() -> str:
stack_trace = traceback.format_stack()
# When using the runner, the frames source will be from `/instrumentation/python*`
# If we run using the agent, the frames source will be from `/var/odigos/python*`
excluded_path = "/var/odigos/python"
# Filter stack trace frames that do NOT contain the excluded path
filtered_stack_trace = [
frame for frame in stack_trace
if excluded_path not in frame
]
return ''.join(filtered_stack_trace).strip()
class OpenTelemetryConfigurator(_BaseConfigurator):
    """Configurator that installs ``EBPFSpanProcessor`` as the span processor."""

    def _configure(self, **kwargs):
        """Reorder the Python path, then initialize the components.

        ``kwargs`` is accepted but not used.
        """
        # The path is reordered first so that customer site-packages
        # (e.g. protobuf) are loaded first; only afterwards is the processor
        # built and handed to the component initializer.
        reorder_python_path()
        ebpf_processor = EBPFSpanProcessor()
        initialize_components(span_processor=ebpf_processor)