Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
agentio / agentio / trace.py
Size: Mime:
"""
Enhanced trace context manager that adds AgentIO storage.
"""

from typing import Optional, Any
from pathlib import Path
import logging
from contextlib import contextmanager

from agents.tracing import trace as sdk_trace
from agents.tracing import add_trace_processor, set_trace_processors, get_trace_provider

from .processor import AgentIOProcessor, RemoteAgentIOProcessor
from .storage import SQLiteStorage

logger = logging.getLogger(__name__)


# Routing processor that trace() registers lazily when the package-level
# processor has not been set up.  Cached here so that repeated trace() calls
# register at most one such processor instead of one per call.
_on_demand_processor = None


def _ensure_on_demand_processor() -> None:
    """Register a routing AgentIOProcessor with the SDK, at most once."""
    global _on_demand_processor
    if _on_demand_processor is not None:
        return
    processor = AgentIOProcessor(default_project=None)
    add_trace_processor(processor)
    # Only remember the processor once registration has succeeded.
    # NOTE(review): if other code later replaces the SDK's processor list
    # (e.g. via set_trace_processors) while the package-level processor is
    # still unset, this cached processor is not re-registered -- confirm
    # that is acceptable.
    _on_demand_processor = processor


@contextmanager
def trace(
    workflow_name: str,
    trace_id: Optional[str] = None,
    group_id: Optional[str] = None,
    metadata: Optional[dict[str, Any]] = None,
    disabled: bool = False,
    project: Optional[str] = None,
    server_url: Optional[str] = None,
    token: Optional[str] = None,
):
    """
    Enhanced trace that adds project metadata for AgentIO routing.

    Wraps the SDK's ``trace`` context manager and stores the target project
    under the ``"agentio_project"`` metadata key, which is intended to let the
    AgentIO processor route the trace to the right storage.

    Args:
        workflow_name: Name of the workflow being traced
        trace_id: Optional trace ID, forwarded to the SDK unchanged
        group_id: Optional grouping identifier, forwarded to the SDK unchanged
        metadata: Optional metadata dictionary. It is copied before the
            project key is added, so the caller's dictionary is not modified.
        disabled: If True, tracing is disabled
        project: AgentIO project name (overrides default from init)
        server_url: Remote AgentIO server URL (optional, not yet implemented;
            a warning is logged and local storage is used)
        token: Auth token for remote server (optional, not yet implemented;
            currently unused)

    Yields:
        None. The body of the ``with`` statement runs inside the SDK trace.

    Example:
        from agentio import trace

        with trace("my_workflow", project="my_project"):
            response = await Runner.run(agent, query)
    """
    # Import here to avoid circular import
    from . import _default_project, _processor

    # An explicit project wins over the package-level default.
    target_project = project or _default_project

    # Neither a project nor a processor is configured: run the body inside a
    # disabled SDK trace so nothing is recorded.
    if not target_project and not _processor:
        with sdk_trace(workflow_name, trace_id, group_id, metadata, disabled=True):
            yield
        return

    # A project was requested but no package-level processor exists; make sure
    # a routing processor is registered (once) so the trace has a consumer.
    if not _processor and target_project:
        _ensure_on_demand_processor()

    # Work on a copy so the caller's dictionary is never mutated.
    enriched_metadata: dict[str, Any] = dict(metadata) if metadata is not None else {}

    if target_project:
        enriched_metadata["agentio_project"] = target_project

    # Handle remote server case (future enhancement)
    if server_url:
        logger.warning("Remote server URL in trace context not yet implemented, using local storage")

    # Use SDK's trace context with enhanced metadata
    with sdk_trace(workflow_name, trace_id, group_id, enriched_metadata, disabled=disabled):
        yield