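# NOTE(review): the header below appears to be residue from the package index
# page this file was copied from; it is kept as comments so it cannot break parsing.
# Repository URL to install this package:
# Version: 0.1.2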
import os
import sys
import time
import threading
import requests_odigos
import logging
from uuid_extensions import uuid7
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.context import (
_SUPPRESS_HTTP_INSTRUMENTATION_KEY,
attach,
detach,
set_value,
)
from opamp import opamp_pb2, anyvalue_pb2, utils
from opamp.health_status import AgentHealthStatus
# Set up the module logger, registered under the 'odigos' name. The client
# below hands it to utils.parse_first_message_to_resource_attributes.
opamp_logger = logging.getLogger('odigos')
class OpAMPHTTPClient:
    """Client that reports this agent's state to an OpAMP server over HTTP.

    Messages are ``opamp_pb2.AgentToServer`` protobufs POSTed to
    ``http://$ODIGOS_OPAMP_SERVER_HOST/v1/opamp``; replies are parsed as
    ``opamp_pb2.ServerToAgent``. Once the first message exchange is done, a
    daemon thread sends a heartbeat every 30 seconds (or sooner when the
    condition is notified) until :meth:`shutdown` is called.

    NOTE: the log calls in this class are commented out; they are kept as
    comments to show where diagnostics belong.
    """

    def __init__(self, event, condition: threading.Condition):
        """Read the connection settings from the environment and reset state.

        Args:
            event: object with a ``set()`` method; it is set once start-up has
                finished (successfully or not) to notify the main thread.
            condition: condition variable the worker sleeps on between
                heartbeats; :meth:`shutdown` notifies it to wake the worker.
        """
        self.server_host = os.getenv('ODIGOS_OPAMP_SERVER_HOST')
        self.instrumentation_device_id = os.getenv('ODIGOS_INSTRUMENTATION_DEVICE_ID')
        self.server_url = f"http://{self.server_host}/v1/opamp"
        self.resource_attributes = {}
        self.running = True
        self.condition = condition
        self.event = event
        self.next_sequence_num = 0
        self.instance_uid = str(uuid7())
        self.remote_config_status = None
        self.sampler = None  # OdigosSampler instance
        # Stays None until start() actually spawns the worker thread, so that
        # shutdown() can tell whether there is anything to join.
        self.client_thread = None

    def start(self, python_version_supported: bool = None):
        """Start the client thread, or report an unsupported runtime.

        Args:
            python_version_supported: when falsy (including the default
                ``None``), no thread is started; a disconnect message carrying
                the unsupported-version health status is sent instead.
        """
        if not python_version_supported:
            python_version = f'{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}'
            error_message = f"Opentelemetry SDK require Python in version 3.8 or higher [{python_version} is not supported]"
            # opamp_logger.warning(f"{error_message}, sending disconnect message to OpAMP server...")
            try:
                self.send_unsupported_version_disconnect_message(error_message=error_message)
            finally:
                # Always release the waiter, even if the send raised.
                self.event.set()
            return

        self.client_thread = threading.Thread(target=self.run, name="OpAMPClientThread", daemon=True)
        self.client_thread.start()

    def run(self):
        """Thread entry point: send the first message, then loop in :meth:`worker`.

        Any exception is reported to the server as an agent-failure disconnect
        message, after which the thread exits. The event is set on every exit
        path so the main thread is never left waiting.
        """
        try:
            if not self.mandatory_env_vars_set():
                return

            self.send_first_message_with_retry()
            self.event.set()
            self.worker()
        except Exception as e:
            # opamp_logger.error(f"Error running OpAMP client: {e}")
            failure_message = self.get_agent_failure_disconnect_message(error_message=str(e))
            self.send_agent_to_server_message(failure_message)
            # Exiting the opamp thread; the event is set in `finally` below.
            sys.exit()
        finally:
            # Notify the main thread on every path, including the case where
            # sending the failure message itself raised.
            self.event.set()

    def get_agent_failure_disconnect_message(self, error_message: str) -> "opamp_pb2.AgentToServer":  # type: ignore
        """Build (but do not send) a disconnect message with AGENT_FAILURE health.

        Args:
            error_message: text stored as the health's ``last_error``.

        Returns:
            An ``AgentToServer`` with ``agent_disconnect`` and ``health`` set.
        """
        agent_failure_message = opamp_pb2.AgentToServer()

        agent_disconnect = self.get_agent_disconnect()
        agent_failure_message.agent_disconnect.CopyFrom(agent_disconnect)

        agent_health = self.get_agent_health(component_health=False, last_error=error_message, status=AgentHealthStatus.AGENT_FAILURE.value)
        agent_failure_message.health.CopyFrom(agent_health)

        return agent_failure_message

    def send_unsupported_version_disconnect_message(self, error_message: str) -> None:
        """Send a description + disconnect message with UNSUPPORTED_RUNTIME_VERSION health.

        Args:
            error_message: text stored as the health's ``last_error``.
        """
        first_disconnect_message = opamp_pb2.AgentToServer()

        agent_description = self.get_agent_description()
        first_disconnect_message.agent_description.CopyFrom(agent_description)

        agent_disconnect = self.get_agent_disconnect()
        first_disconnect_message.agent_disconnect.CopyFrom(agent_disconnect)

        agent_health = self.get_agent_health(component_health=False, last_error=error_message, status=AgentHealthStatus.UNSUPPORTED_RUNTIME_VERSION.value)
        first_disconnect_message.health.CopyFrom(agent_health)

        self.send_agent_to_server_message(first_disconnect_message)

    def send_first_message_with_retry(self) -> None:
        """Exchange the first message with the server, retrying on failure.

        Up to 5 attempts are made, 2 seconds apart. An attempt succeeds when
        the server's reply is non-empty; the reply is then used to populate
        ``resource_attributes`` and a healthy status is reported.

        Raises:
            Exception: if every attempt failed. The last underlying error, if
                any, is attached as the cause.
        """
        max_retries = 5
        delay = 2
        last_error = None

        for attempt in range(1, max_retries + 1):
            try:
                # Send first message to OpAMP server, Health is false as the component is not initialized
                agent_health = self.get_agent_health(component_health=False, last_error="Python OpenTelemetry agent is starting", status=AgentHealthStatus.STARTING.value)
                agent_description = self.get_agent_description()
                first_message_server_to_agent = self.send_agent_to_server_message(opamp_pb2.AgentToServer(agent_description=agent_description, health=agent_health))

                # Check if the response of the first message is empty
                # It may happen if OpAMPServer is not available
                if first_message_server_to_agent.ListFields():
                    self.update_remote_config_status(first_message_server_to_agent)
                    self.resource_attributes = utils.parse_first_message_to_resource_attributes(first_message_server_to_agent, opamp_logger)

                    # Send healthy message to OpAMP server
                    # opamp_logger.info("Reporting healthy to OpAMP server...")
                    agent_health = self.get_agent_health(component_health=True, status=AgentHealthStatus.HEALTHY.value)
                    self.send_agent_to_server_message(opamp_pb2.AgentToServer(health=agent_health))

                    # Return if the first message was successfully sent
                    return
            except Exception as e:
                # opamp_logger.error(f"Attempt {attempt}/{max_retries} failed. Error sending full state to OpAMP server: {e}")
                # Remember the error so the final failure can name its cause.
                last_error = e

            if attempt < max_retries:
                time.sleep(delay)

        # If all attempts failed, raise exception before starting the worker
        raise Exception(f"Error sending first message to OpAMP server after {max_retries} attempts") from last_error

    def worker(self):
        """Heartbeat loop; runs until ``self.running`` becomes false.

        Each iteration sends a heartbeat, records any remote config hash from
        the reply, answers a ReportFullState request with a full description,
        then waits on the condition for up to 30 seconds.
        """
        while self.running:
            with self.condition:
                try:
                    server_to_agent = self.send_heartbeat()

                    # opamp_logger.info("Remote config updated, applying changes...")
                    # TODO: implement changes based on the remote config
                    self.update_remote_config_status(server_to_agent)

                    if server_to_agent.flags & opamp_pb2.ServerToAgentFlags_ReportFullState:
                        # opamp_logger.info("Received request to report full state")
                        agent_description = self.get_agent_description()
                        agent_health = self.get_agent_health(component_health=True, status=AgentHealthStatus.HEALTHY.value)
                        agent_to_server = opamp_pb2.AgentToServer(agent_description=agent_description, health=agent_health)

                        server_to_agent = self.send_agent_to_server_message(agent_to_server)
                        self.update_remote_config_status(server_to_agent)
                except requests_odigos.RequestException as e:
                    # opamp_logger.error(f"Error fetching data: {e}")
                    # A failed exchange is not fatal; try again next interval.
                    pass

                # shutdown() clears `running` before it notifies. Re-checking
                # here (while holding the lock) avoids sleeping a full
                # interval when that notification arrived during the HTTP call.
                if self.running:
                    self.condition.wait(30)

    def send_heartbeat(self) -> "opamp_pb2.ServerToAgent":  # type: ignore
        """Send a heartbeat carrying the current remote config status.

        Returns:
            The server's reply, or an empty ``ServerToAgent`` when the request
            failed, so callers always receive a message object.
        """
        # opamp_logger.debug("Sending heartbeat to OpAMP server...")
        try:
            agent_to_server = opamp_pb2.AgentToServer(remote_config_status=self.remote_config_status)
            return self.send_agent_to_server_message(agent_to_server)
        except requests_odigos.RequestException as e:
            # opamp_logger.error(f"Error sending heartbeat to OpAMP server: {e}")
            # Same fallback send_agent_to_server_message uses for timeouts and
            # connection errors; returning None here would crash the worker.
            return opamp_pb2.ServerToAgent()

    def get_agent_description(self) -> "opamp_pb2.AgentDescription":  # type: ignore
        """Describe this agent: instance id, process id and SDK language."""
        identifying_attributes = [
            anyvalue_pb2.KeyValue(
                key=ResourceAttributes.SERVICE_INSTANCE_ID,
                value=anyvalue_pb2.AnyValue(string_value=self.instance_uid),
            ),
            anyvalue_pb2.KeyValue(
                key=ResourceAttributes.PROCESS_PID,
                value=anyvalue_pb2.AnyValue(int_value=os.getpid()),
            ),
            anyvalue_pb2.KeyValue(
                key=ResourceAttributes.TELEMETRY_SDK_LANGUAGE,
                value=anyvalue_pb2.AnyValue(string_value="python"),
            ),
        ]

        return opamp_pb2.AgentDescription(
            identifying_attributes=identifying_attributes,
            non_identifying_attributes=[],
        )

    def get_agent_disconnect(self) -> "opamp_pb2.AgentDisconnect":  # type: ignore
        """Return an empty ``AgentDisconnect`` message."""
        return opamp_pb2.AgentDisconnect()

    def get_agent_health(self, component_health: bool = None, last_error : str = None, status: str = None) -> "opamp_pb2.ComponentHealth":  # type: ignore
        """Build a ``ComponentHealth``, setting only the fields that were given.

        Args:
            component_health: value for ``healthy``; left unset when ``None``.
            last_error: value for ``last_error``; left unset when ``None``.
            status: value for ``status``; left unset when ``None``.
        """
        health = opamp_pb2.ComponentHealth()

        if component_health is not None:
            health.healthy = component_health
        if last_error is not None:
            health.last_error = last_error
        if status is not None:
            health.status = status

        return health

    def send_agent_to_server_message(self, message: "opamp_pb2.AgentToServer") -> "opamp_pb2.ServerToAgent":  # type: ignore
        """Stamp, serialize and POST ``message``; return the parsed reply.

        The message gets this client's instance uid, the next sequence number
        and, when one is known, the current remote config status. The request
        uses a 5 second timeout and runs with HTTP instrumentation suppressed.

        Returns:
            The parsed ``ServerToAgent`` reply, or an empty one on timeout,
            connection error, or when parsing raises ``NotImplementedError``.

        Raises:
            Other ``requests_odigos`` errors (for example the one raised by
            ``raise_for_status()``) propagate to the caller.
        """
        message.instance_uid = self.instance_uid.encode('utf-8')
        message.sequence_num = self.next_sequence_num
        if self.remote_config_status:
            message.remote_config_status.CopyFrom(self.remote_config_status)

        self.next_sequence_num += 1
        message_bytes = message.SerializeToString()

        headers = {
            "Content-Type": "application/x-protobuf",
            "X-Odigos-DeviceId": self.instrumentation_device_id,
        }

        # Attach before entering the try block: if attach() itself failed
        # there would be no token to detach in `finally`.
        suppress_token = attach(set_value(_SUPPRESS_HTTP_INSTRUMENTATION_KEY, True))
        try:
            response = requests_odigos.post(self.server_url, data=message_bytes, headers=headers, timeout=5)
            response.raise_for_status()
        except requests_odigos.Timeout:
            # opamp_logger.error("Timeout sending message to OpAMP server")
            return opamp_pb2.ServerToAgent()
        except requests_odigos.ConnectionError as e:
            # opamp_logger.error(f"Error sending message to OpAMP server: {e}")
            return opamp_pb2.ServerToAgent()
        finally:
            detach(suppress_token)

        server_to_agent = opamp_pb2.ServerToAgent()
        try:
            server_to_agent.ParseFromString(response.content)
        except NotImplementedError as e:
            # NOTE(review): malformed payloads presumably raise a protobuf
            # decode error rather than NotImplementedError - confirm which
            # exception should be handled here.
            # opamp_logger.error(f"Error parsing response from OpAMP server: {e}")
            return opamp_pb2.ServerToAgent()

        return server_to_agent

    def mandatory_env_vars_set(self):
        """Return True only when both required environment values are non-empty."""
        mandatory_env_vars = {
            "ODIGOS_OPAMP_SERVER_HOST": self.server_host,
            "ODIGOS_INSTRUMENTATION_DEVICE_ID": self.instrumentation_device_id,
        }

        # opamp_logger.error(f"{env_var} environment variable not set")
        return all(mandatory_env_vars.values())

    def shutdown(self, custom_failure_message: str = None):
        """Stop the worker and send a final disconnect message.

        Args:
            custom_failure_message: when given, the disconnect message reports
                an agent failure with this text; otherwise it reports that the
                Python runtime is exiting.
        """
        self.running = False

        # opamp_logger.info("Sending agent disconnect message to OpAMP server...")
        if custom_failure_message:
            disconnect_message = self.get_agent_failure_disconnect_message(error_message=custom_failure_message)
        else:
            agent_health = self.get_agent_health(component_health=False, last_error="Python runtime is exiting", status=AgentHealthStatus.TERMINATED.value)
            disconnect_message = opamp_pb2.AgentToServer(agent_disconnect=opamp_pb2.AgentDisconnect(), health=agent_health)

        # Wake the worker if it is sleeping between heartbeats.
        with self.condition:
            self.condition.notify_all()

        # No thread exists when start() was never called or took the
        # unsupported-version path.
        if self.client_thread is not None:
            self.client_thread.join()

        try:
            self.send_agent_to_server_message(disconnect_message)
        except requests_odigos.RequestException:
            # Best effort: a failed goodbye must not raise during shutdown.
            pass

    def update_remote_config_status(self, server_to_agent: "opamp_pb2.ServerToAgent") -> bool:  # type: ignore
        """Record the remote config hash from a server reply, if it has one.

        Returns:
            True when the reply carried ``remote_config`` and the stored
            status was replaced, False otherwise.
        """
        if server_to_agent.HasField("remote_config"):
            remote_config_hash = server_to_agent.remote_config.config_hash
            remote_config_status = opamp_pb2.RemoteConfigStatus(last_remote_config_hash=remote_config_hash)
            self.remote_config_status = remote_config_status
            return True

        return False
# Mock client class for non-OpAMP installations
# This class simulates the OpAMP client when the OpAMP server is not available.
# To activate it, set the environment variable DISABLE_OPAMP_CLIENT to true.
class MockOpAMPClient:
    """Stand-in for the OpAMP client that performs no network activity.

    It exposes the same ``start``/``shutdown`` surface and a
    ``resource_attributes`` mapping, so code holding a client does not need
    to know which implementation it was given.
    """

    def __init__(self, event, *args, **kwargs):
        """Mark OpAMP as disabled and release the waiter immediately.

        Args:
            event: object with a ``set()`` method; it is set right away
                because there is no start-up work to wait for.
            *args, **kwargs: accepted and ignored.
        """
        self.resource_attributes = {'odigos.opamp': 'disabled'}
        event.set()

    def start(self, python_version_supported: bool = None):
        """Do nothing; there is no background thread to start."""
        return None

    def shutdown(self, custom_failure_message=None):
        """Do nothing; there is no server to notify."""
        return None