Repository URL to install this package:
Version:
0.1.2 ▾
|
odigos
/
etc
/
odigos-vmagent
/
instrumentations
/
python
/
opentelemetry
/
instrumentation
/
confluent_kafka
/
utils.py
|
---|
from logging import getLogger
from typing import List, Optional
from opentelemetry import context, propagate
from opentelemetry.propagators import textmap
from opentelemetry.semconv.trace import (
MessagingDestinationKindValues,
MessagingOperationValues,
SpanAttributes,
)
from opentelemetry.trace import Link, SpanKind
_LOG = getLogger(__name__)
class KafkaPropertiesExtractor:
@staticmethod
def extract_bootstrap_servers(instance):
return instance.config.get("bootstrap_servers")
@staticmethod
def _extract_argument(key, position, default_value, args, kwargs):
if len(args) > position:
return args[position]
return kwargs.get(key, default_value)
@staticmethod
def extract_produce_topic(args):
"""extract topic from `produce` method arguments in Producer class"""
if len(args) > 0:
return args[0]
return "unknown"
@staticmethod
def extract_produce_headers(args, kwargs):
"""extract headers from `produce` method arguments in Producer class"""
return KafkaPropertiesExtractor._extract_argument(
"headers", 6, None, args, kwargs
)
class KafkaContextGetter(textmap.Getter):
def get(self, carrier: textmap.CarrierT, key: str) -> Optional[List[str]]:
if carrier is None:
return None
carrier_items = carrier
if isinstance(carrier, dict):
carrier_items = carrier.items()
for item_key, value in carrier_items:
if item_key == key:
if value is not None:
return [value.decode()]
return None
def keys(self, carrier: textmap.CarrierT) -> List[str]:
if carrier is None:
return []
carrier_items = carrier
if isinstance(carrier, dict):
carrier_items = carrier.items()
return [key for (key, value) in carrier_items]
class KafkaContextSetter(textmap.Setter):
def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None:
if carrier is None or key is None:
return
if value:
value = value.encode()
if isinstance(carrier, list):
carrier.append((key, value))
if isinstance(carrier, dict):
carrier[key] = value
_kafka_getter = KafkaContextGetter()
def _end_current_consume_span(instance):
context.detach(instance._current_context_token)
instance._current_context_token = None
instance._current_consume_span.end()
instance._current_consume_span = None
def _create_new_consume_span(instance, tracer, records):
links = _get_links_from_records(records)
instance._current_consume_span = tracer.start_span(
name=f"{records[0].topic()} process",
links=links,
kind=SpanKind.CONSUMER,
)
def _get_links_from_records(records):
links = []
for record in records:
ctx = propagate.extract(record.headers(), getter=_kafka_getter)
if ctx:
for item in ctx.values():
if hasattr(item, "get_span_context"):
links.append(Link(context=item.get_span_context()))
return links
def _enrich_span(
span,
topic,
partition: Optional[int] = None,
offset: Optional[int] = None,
operation: Optional[MessagingOperationValues] = None,
):
if not span.is_recording():
return
span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka")
span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, topic)
if partition is not None:
span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition)
span.set_attribute(
SpanAttributes.MESSAGING_DESTINATION_KIND,
MessagingDestinationKindValues.QUEUE.value,
)
if operation:
span.set_attribute(SpanAttributes.MESSAGING_OPERATION, operation.value)
else:
span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True)
# https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic
# A message within Kafka is uniquely defined by its topic name, topic partition and offset.
if partition is not None and offset is not None and topic:
span.set_attribute(
SpanAttributes.MESSAGING_MESSAGE_ID,
f"{topic}.{partition}.{offset}",
)
_kafka_setter = KafkaContextSetter()
def _get_span_name(operation: str, topic: str):
return f"{topic} {operation}"