Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
omniagents / omniagents / rpc / realtime_session.py
Size: Mime:
"""Server-side realtime session management for voice agents.

This module provides the RealtimeSession class for managing realtime agent
sessions on the server side, including event streaming, audio storage, and
lifecycle management.
"""

import asyncio
import uuid
import base64
import logging
import os
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
from typing import Optional, Dict, Any, AsyncIterator, Callable
from contextlib import asynccontextmanager
from contextlib import AsyncExitStack

from omniagents.core.agents.specs import AgentSpec
from omniagents.core.session.audio_storage import AudioStorage
from omniagents.core.session import history_db
from omniagents.core.paths import get_audio_storage_path
from omniagents.rpc.realtime_events import (
    RealtimeEventBase,
    map_realtime_session_event_to_rpc,
)


logger = logging.getLogger(__name__)


def _mask_secret(value: Optional[str]) -> str:
    if not value:
        return ""
    if len(value) <= 8:
        return "*" * len(value)
    return f"{value[:4]}...{value[-4:]}"


def _collect_headers(settings: Dict[str, Any]) -> Dict[str, Any]:
    merged: Dict[str, Any] = {}
    for key in ("headers", "extra_headers", "openai_headers", "openai_extra_headers"):
        value = settings.get(key)
        if isinstance(value, dict):
            merged.update(value)
    return merged


def _scrub_headers(headers: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of *headers* with likely-secret string values masked.

    Non-string keys are dropped. A string value is masked when its
    lowercased key contains any of the sensitive markers; everything
    else is copied through unchanged.
    """
    sensitive_markers = ("authorization", "api", "token", "secret")
    result: Dict[str, Any] = {}
    for name, value in headers.items():
        if not isinstance(name, str):
            continue
        looks_sensitive = isinstance(value, str) and any(
            marker in name.lower() for marker in sensitive_markers
        )
        result[name] = _mask_secret(value) if looks_sensitive else value
    return result


def _is_azure_endpoint(url: Optional[str]) -> bool:
    if not url:
        return False
    lowered = url.lower()
    return "openai.azure.com" in lowered or "services.ai.azure.com" in lowered


def _is_openai_compatible_azure_endpoint(url: Optional[str]) -> bool:
    """True when *url* is an Azure endpoint exposing the OpenAI-compatible
    ``/openai/v1`` path (as opposed to the classic deployments-style API).
    """
    if not _is_azure_endpoint(url):
        return False
    return "/openai/v1" in (url or "").lower()


def _coerce_bool(value: Any) -> bool:
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        return value.strip().lower() in {"1", "true", "yes", "on"}
    if isinstance(value, (int, float)):
        return value != 0
    return False


def _resolve_realtime_deployment_name(
    spec: AgentSpec, settings: Dict[str, Any]
) -> Optional[str]:
    """Pick the Azure deployment name to use for a realtime session.

    Precedence: explicit settings keys, then environment variables, then
    the spec's realtime model name. Always returns a non-empty string
    because a hard-coded default is applied last.
    """
    candidates = [
        settings.get("azure_openai_deployment_name"),
        settings.get("openai_realtime_deployment"),
        settings.get("openai_realtime_model"),
        os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME"),
        os.getenv("OPENAI_REALTIME_DEPLOYMENT"),
        os.getenv("OPENAI_REALTIME_MODEL"),
    ]
    if spec.realtime_settings and spec.realtime_settings.model_name:
        candidates.append(spec.realtime_settings.model_name)
    chosen = next(
        (c.strip() for c in candidates if isinstance(c, str) and c.strip()),
        None,
    )
    # Default to gpt-realtime if nothing specified
    return chosen or "gpt-realtime"


def _resolve_realtime_model_name(spec: AgentSpec, settings: Dict[str, Any]) -> Optional[str]:
    """Resolve the realtime model name from settings, env, or the spec.

    Unlike deployment-name resolution there is no hard-coded fallback:
    returns None when nothing is configured.
    """
    candidates = [
        settings.get("openai_realtime_model"),
        os.getenv("OPENAI_REALTIME_MODEL"),
    ]
    if spec.realtime_settings and spec.realtime_settings.model_name:
        candidates.append(spec.realtime_settings.model_name)
    return next(
        (c.strip() for c in candidates if isinstance(c, str) and c.strip()),
        None,
    )


def _normalize_ws_scheme(raw_scheme: Optional[str]) -> str:
    if not raw_scheme:
        return "wss"
    lowered = raw_scheme.lower()
    if lowered == "https":
        return "wss"
    if lowered == "http":
        return "ws"
    if lowered in {"ws", "wss"}:
        return lowered
    return "wss"


def _build_azure_realtime_url(
    candidate_url: Optional[str],
    base_url: Optional[str],
    spec: AgentSpec,
    settings: Dict[str, Any],
) -> Optional[str]:
    """Construct a classic (deployments-style) Azure realtime websocket URL.

    Produces ``wss://<host>/openai/deployments/<deployment>/realtime?api-version=...``.
    Whenever a host or deployment cannot be determined, falls back to
    ``candidate_url`` / ``base_url`` unchanged.
    """
    fallback = candidate_url or base_url
    deployment = _resolve_realtime_deployment_name(spec, settings)
    if not deployment:
        return fallback
    endpoint = (
        settings.get("azure_openai_endpoint")
        or settings.get("openai_base_url")
        or candidate_url
        or base_url
        or os.getenv("AZURE_OPENAI_ENDPOINT")
        or os.getenv("OPENAI_BASE_URL")
    )
    if not endpoint:
        return fallback
    # Bare hostnames get a scheme prefixed so urlparse yields a netloc.
    normalized_endpoint = endpoint if "://" in endpoint else f"https://{endpoint}"
    parsed = urlparse(normalized_endpoint)
    host = parsed.netloc or parsed.path
    if not host:
        return fallback
    ws_scheme = _normalize_ws_scheme(parsed.scheme)
    api_version = (
        settings.get("azure_openai_api_version")
        or os.getenv("AZURE_OPENAI_API_VERSION")
        or "2024-10-21-preview"
    )
    return (
        f"{ws_scheme}://{host}/openai/deployments/{deployment.strip()}"
        f"/realtime?api-version={api_version}"
    )


def _build_realtime_url_from_base(
    base_url: str,
    model_name: Optional[str],
) -> str:
    """Build a realtime WebSocket URL from a base HTTP URL.

    Converts https://host/openai/v1 to wss://host/openai/v1/realtime?model=X
    """
    # Convert scheme
    ws_url = base_url.replace("https://", "wss://").replace("http://", "ws://")

    # Ensure it ends with /realtime
    ws_url = ws_url.rstrip("/")
    if not ws_url.endswith("/realtime"):
        ws_url = ws_url + "/realtime"

    # Add model as query param if provided
    if model_name:
        parsed = urlparse(ws_url)
        params = dict(parse_qsl(parsed.query, keep_blank_values=True))
        if "model" not in params:
            params["model"] = model_name
            ws_url = urlunparse(
                (
                    parsed.scheme,
                    parsed.netloc,
                    parsed.path,
                    parsed.params,
                    urlencode(params, doseq=True),
                    parsed.fragment,
                )
            )

    return ws_url


def _resolve_realtime_url(
    explicit_url: Optional[str],
    base_url: Optional[str],
    spec: AgentSpec,
    settings: Dict[str, Any],
) -> Optional[str]:
    """Resolve the websocket URL for a realtime session.

    Resolution order:
      1. An explicit realtime URL is returned untouched.
      2. A classic Azure endpoint (one without the OpenAI-compatible
         ``/openai/v1`` path) is converted to the deployments-style URL.
      3. Otherwise the base URL is converted to ``wss://.../realtime?model=X``.

    Returns None when neither an explicit URL nor a base URL is available.
    """
    candidate = explicit_url or base_url
    logger.debug(
        "_resolve_realtime_url: explicit_url=%s, base_url=%s, candidate=%s",
        explicit_url,
        base_url,
        candidate,
    )
    if not candidate:
        logger.debug("_resolve_realtime_url: no candidate URL, returning None")
        return None

    # If an explicit realtime URL was provided (openai_realtime_url), use it as-is
    if explicit_url:
        logger.debug(
            "_resolve_realtime_url: explicit URL provided, returning as-is: %s",
            explicit_url,
        )
        return explicit_url

    # Falling back to base_url: classic Azure endpoints need the
    # deployments-style URL instead of the generic realtime URL.
    if (
        (_is_azure_endpoint(base_url) or _is_azure_endpoint(candidate))
        and not _is_openai_compatible_azure_endpoint(base_url)
        and not _is_openai_compatible_azure_endpoint(candidate)
    ):
        azure_url = _build_azure_realtime_url(
            candidate_url=candidate,
            base_url=base_url,
            spec=spec,
            settings=settings,
        )
        if azure_url:
            logger.debug("_resolve_realtime_url: built Azure URL: %s", azure_url)
            return azure_url

    # base_url is guaranteed truthy here: candidate is truthy and the
    # explicit_url case returned above.
    model_name = _resolve_realtime_model_name(spec, settings)
    logger.debug(
        "_resolve_realtime_url: building from base_url=%s with model=%s",
        base_url,
        model_name,
    )

    result = _build_realtime_url_from_base(base_url, model_name)
    logger.debug("_resolve_realtime_url: built URL: %s", result)
    return result


class RealtimeSession:
    """Manages a realtime agent session on the server side.

    This class:
    - Creates and manages a realtime agent from the OpenAI SDK
    - Streams events from the agent and converts them to RPC events
    - Stores audio chunks to disk and metadata to database
    - Manages session lifecycle (start, stop, cleanup)
    """

    def __init__(
        self,
        spec: AgentSpec,
        settings: Optional[Dict[str, Any]] = None,
        session_id: Optional[str] = None,
        audio_storage: Optional[AudioStorage] = None,
    ):
        """Initialize a realtime session.

        Args:
            spec: Agent specification with realtime settings
            settings: API keys and other settings
            session_id: Optional session ID (generated if not provided)
            audio_storage: Optional AudioStorage instance (created if not provided)

        Raises:
            ValueError: If the spec is not configured for realtime mode
        """
        if not spec.realtime_mode:
            raise ValueError("AgentSpec must have realtime_mode=True")

        self.spec = spec
        self.settings = settings or {}
        self.session_id = session_id or str(uuid.uuid4())

        # Extract project and agent slugs for database storage
        self._project_slug = getattr(spec, "project_slug", None)
        self._agent_slug = getattr(spec, "agent_slug", None)

        # Initialize audio storage
        if audio_storage:
            self.audio_storage = audio_storage
        else:
            audio_path = get_audio_storage_path(self._project_slug, self._agent_slug)
            self.audio_storage = AudioStorage(base_path=audio_path)

        # Session state
        self._agent = None                  # RealtimeAgent (SDK), set in start()
        self._runner = None                 # RealtimeRunner (SDK), set in start()
        self._realtime_session = None       # SDK session, set in connect()
        self._run_id: Optional[str] = None
        self._api_key: Optional[str] = None
        self._active = False
        self._event_task: Optional[asyncio.Task] = None

        # MCP server lifecycle state (see _ensure_mcp_servers)
        self._mcp_shutdown: Optional[asyncio.Event] = None
        self._mcp_lifecycle_task: Optional[asyncio.Task] = None
        self._mcp_servers_active: list[Any] = []

    async def _ensure_mcp_servers(self) -> list[Any]:
        """Start (once) and return the spec's MCP servers.

        Servers are entered and exited inside a dedicated lifecycle task
        so their async context managers run on a single task. Servers
        that fail to connect individually are skipped (logged at debug
        level). Returns an empty list when the spec declares no MCP
        servers or when startup fails entirely.
        """
        if self._mcp_servers_active:
            return self._mcp_servers_active

        if not getattr(self.spec, "mcp_servers", None):
            return []

        from omniagents.core.agents.mcp import build_mcp_servers

        servers_ready = asyncio.Event()
        active_servers_holder: dict[str, list[Any]] = {"servers": []}

        async def mcp_lifecycle_manager():
            settings = dict(self.settings)
            try:
                async with AsyncExitStack() as stack:
                    active_servers: list[Any] = []
                    try:
                        servers = build_mcp_servers(self.spec, settings)
                        for server in servers:
                            server_name = getattr(server, "name", "unknown")
                            try:
                                # Prefer the server's own context manager;
                                # otherwise fall back to connect()/cleanup().
                                if hasattr(server, "__aenter__") and hasattr(server, "__aexit__"):
                                    active = await stack.enter_async_context(server)
                                else:
                                    await server.connect()
                                    stack.push_async_callback(server.cleanup)
                                    active = server
                                active_servers.append(active)
                            except Exception as server_err:
                                logger.debug(
                                    "MCP server '%s' failed to connect: %s",
                                    server_name,
                                    server_err,
                                )

                        self._mcp_servers_active = active_servers
                        active_servers_holder["servers"] = active_servers
                        servers_ready.set()

                        # Keep the exit stack open until stop() signals
                        # shutdown, so the servers stay connected for the
                        # whole session.
                        shutdown_event = self._mcp_shutdown
                        if shutdown_event:
                            await shutdown_event.wait()
                    finally:
                        self._mcp_servers_active = []
            finally:
                # Always unblock the waiter: if build_mcp_servers (or the
                # stack setup) raised before servers_ready.set(), the
                # caller below would otherwise hang forever.
                servers_ready.set()

        self._mcp_shutdown = asyncio.Event()
        self._mcp_lifecycle_task = asyncio.create_task(
            mcp_lifecycle_manager(), name=f"realtime_mcp_lifecycle_{self.session_id[:8]}"
        )
        await servers_ready.wait()
        # Surface (and retrieve) a startup failure instead of leaving an
        # unobserved task exception behind.
        if self._mcp_lifecycle_task.done() and self._mcp_lifecycle_task.exception():
            logger.warning(
                "MCP lifecycle task failed during startup: %s",
                self._mcp_lifecycle_task.exception(),
            )
        return active_servers_holder["servers"]

    async def start(self) -> str:
        """Start the realtime session.

        Builds the realtime agent (tools, MCP servers, handoffs,
        guardrails, rendered instructions), registers the session in the
        history database, and marks the session active. Does NOT open
        the websocket — call connect() afterwards.

        Returns:
            The run ID for this session

        Raises:
            RuntimeError: If session is already active
            ImportError: If the OpenAI agents SDK is not installed
            ValueError: If no API key is available in settings
        """
        if self._active:
            raise RuntimeError("Session is already active")

        # Import OpenAI SDK components lazily so the module imports even
        # when the optional SDK is absent.
        try:
            from agents.realtime import RealtimeAgent, RealtimeRunner
        except ImportError:
            raise ImportError(
                "OpenAI agents SDK not installed. "
                "Install with: pip install openai-agents"
            )

        # Get API key
        api_key = self.settings.get("openai_api_key") or self.settings.get("api_key")
        if not api_key:
            raise ValueError("OpenAI API key not provided in settings")

        masked_key = _mask_secret(api_key)
        logger.info(
            "RealtimeSession %s using API key %s",
            self.session_id,
            masked_key,
        )

        # Build tools if available
        tools = self.spec.available_tools if self.spec.available_tools else []

        mcp_servers = await self._ensure_mcp_servers()

        # Build handoffs if configured
        handoffs = []
        if self.spec.handoff_configs:
            from omniagents.core.agents.builder import _build_handoffs_from_configs

            handoffs = await _build_handoffs_from_configs(
                handoff_configs=self.spec.handoff_configs,
                settings=self.settings,
                approval_callback=None,  # No approval callback for realtime
                parent_spec=self.spec,
            )

        # Get guardrails from spec
        output_guardrails = (
            self.spec.output_guardrails if self.spec.output_guardrails else []
        )

        # Build instructions with context (Jinja2 template processing)
        raw_instructions = (
            self.spec.get_agent_instructions()
            if self.spec.get_agent_instructions
            else ""
        )

        # Build context from factory if available
        context_dict = {}
        if self.spec.build_context and self.spec.variables is not None:
            try:
                context_dict = self.spec.build_context(self.spec.variables)
            except Exception as e:
                logger.warning(f"Failed to build context: {e}")
        elif self.spec.context:
            # Use pre-built context if available
            context_dict = self.spec.context if isinstance(self.spec.context, dict) else {}

        # Render Jinja2 template if context is available; fall back to the
        # raw instructions on any rendering failure.
        if context_dict and raw_instructions:
            from omniagents.core.utils.jinja_instructions import render_template
            try:
                instructions = render_template(raw_instructions, silent_undefined=True, **context_dict)
            except Exception as e:
                logger.warning(f"Failed to render instructions template: {e}")
                instructions = raw_instructions
        else:
            instructions = raw_instructions

        # Create agent with realtime configuration (name, instructions,
        # tools, handoffs, output_guardrails, mcp_servers, mcp_config).
        self._agent = RealtimeAgent(
            name=self.spec.name or "RealtimeAgent",
            instructions=instructions,
            tools=tools,
            handoffs=handoffs,
            output_guardrails=output_guardrails,
            mcp_servers=list(mcp_servers),
            mcp_config=self.spec.mcp_config or {},
        )

        # Create runner with agent
        self._runner = RealtimeRunner(self._agent)

        # Store API key for model_config
        self._api_key = api_key

        # Generate run ID
        self._run_id = str(uuid.uuid4())

        # Register session in database
        history_db.register_session(
            self.session_id,
            project_slug=self._project_slug,
            agent_slug=self._agent_slug,
        )

        self._active = True
        return self._run_id

    async def connect(self) -> Any:
        """Connect to the realtime session.

        Builds the model configuration (model settings, resolved
        websocket URL, auth headers / query parameters) and enters the
        SDK session's async context manager.

        Returns:
            The OpenAI RealtimeSession object

        Raises:
            RuntimeError: If session not started
        """
        if not self._active:
            raise RuntimeError("Session not started. Call start() first.")

        # Build model configuration
        # NOTE(review): assumes spec.realtime_settings is populated when
        # realtime_mode is True — confirm upstream validation.
        realtime_settings = self.spec.realtime_settings

        # Build initial model settings from spec
        initial_model_settings = {}

        if realtime_settings.model_name:
            initial_model_settings["model_name"] = realtime_settings.model_name

        if realtime_settings.modalities:
            initial_model_settings["modalities"] = realtime_settings.modalities

        if realtime_settings.voice:
            initial_model_settings["voice"] = realtime_settings.voice

        if realtime_settings.temperature is not None:
            # Note: temperature is not in RealtimeSessionModelSettings
            # It may be part of a different config or not supported
            pass

        if realtime_settings.input_audio_format:
            initial_model_settings["input_audio_format"] = (
                realtime_settings.input_audio_format
            )

        if realtime_settings.output_audio_format:
            initial_model_settings["output_audio_format"] = (
                realtime_settings.output_audio_format
            )

        if realtime_settings.turn_detection:
            initial_model_settings["turn_detection"] = realtime_settings.turn_detection

        if realtime_settings.input_audio_transcription:
            initial_model_settings["input_audio_transcription"] = (
                realtime_settings.input_audio_transcription
            )

        # Build model_config
        model_config: Dict[str, Any] = {
            "initial_model_settings": initial_model_settings,
        }

        # Debug: log all URL sources
        logger.debug(
            "RealtimeSession %s URL resolution inputs: "
            "settings[openai_realtime_url]=%s, "
            "env[OPENAI_REALTIME_URL]=%s, "
            "settings[openai_realtime_base_url]=%s, "
            "env[OPENAI_REALTIME_BASE_URL]=%s, "
            "settings[openai_base_url]=%s, "
            "env[OPENAI_BASE_URL]=%s",
            self.session_id,
            self.settings.get("openai_realtime_url"),
            os.getenv("OPENAI_REALTIME_URL"),
            self.settings.get("openai_realtime_base_url"),
            os.getenv("OPENAI_REALTIME_BASE_URL"),
            self.settings.get("openai_base_url"),
            os.getenv("OPENAI_BASE_URL"),
        )

        def _get_setting_or_env(setting_key: str, env_key: str) -> Optional[str]:
            # Settings take precedence over the environment; a key present
            # in settings (even with a blank value) suppresses the env var.
            if setting_key in self.settings:
                value = self.settings.get(setting_key)
                return value if isinstance(value, str) and value.strip() else None
            value = os.getenv(env_key)
            return value if isinstance(value, str) and value.strip() else None

        realtime_url = _get_setting_or_env(
            "openai_realtime_url", "OPENAI_REALTIME_URL"
        ) or _get_setting_or_env(
            "openai_realtime_base_url", "OPENAI_REALTIME_BASE_URL"
        )
        base_url = _get_setting_or_env("openai_base_url", "OPENAI_BASE_URL")

        logger.debug(
            "RealtimeSession %s URL resolution: realtime_url=%s, base_url=%s",
            self.session_id,
            realtime_url,
            base_url,
        )

        resolved_url = _resolve_realtime_url(
            explicit_url=realtime_url,
            base_url=base_url,
            spec=self.spec,
            settings=self.settings,
        )

        logger.debug(
            "RealtimeSession %s _resolve_realtime_url returned: %s",
            self.session_id,
            resolved_url,
        )

        if resolved_url:
            model_config["url"] = resolved_url

        # Authentication: Azure targets (or an explicit override) use the
        # "api-key" header instead of the default SDK api_key mechanism.
        headers = _collect_headers(self.settings)
        current_url = model_config.get("url")
        azure_target = _is_azure_endpoint(current_url) or _is_azure_endpoint(base_url)
        force_header_flag = self.settings.get("openai_realtime_force_api_key_header")
        if force_header_flag is None:
            force_header_flag = os.getenv("OPENAI_REALTIME_FORCE_API_KEY_HEADER")
        use_api_key_header = _coerce_bool(force_header_flag) or azure_target
        header_has_api_key = any(
            isinstance(key, str) and key.lower() == "api-key" for key in headers.keys()
        )
        api_key_consumed = False

        if use_api_key_header and self._api_key and not header_has_api_key:
            headers["api-key"] = self._api_key
            api_key_consumed = True
        elif header_has_api_key:
            api_key_consumed = True

        if headers:
            model_config["headers"] = headers

        # Optionally pass the API key as a query parameter (some gateways
        # require it); opt-in via settings or environment.
        query_flag = self.settings.get("openai_realtime_api_key_in_query")
        if query_flag is None:
            query_flag = os.getenv("OPENAI_REALTIME_API_KEY_IN_QUERY")
        query_flag = _coerce_bool(query_flag)

        if query_flag and model_config.get("url") and self._api_key:
            parsed = urlparse(model_config["url"])
            params = dict(parse_qsl(parsed.query, keep_blank_values=True))
            if "api-key" not in params:
                params["api-key"] = self._api_key
                model_config["url"] = urlunparse(
                    (
                        parsed.scheme,
                        parsed.netloc,
                        parsed.path,
                        parsed.params,
                        urlencode(params, doseq=True),
                        parsed.fragment,
                    )
                )
            if "api-key" in params:
                api_key_consumed = True

        # Only fall back to the SDK's api_key field when the key was not
        # already delivered via header or query parameter.
        if api_key_consumed:
            model_config.pop("api_key", None)
        elif self._api_key:
            model_config["api_key"] = self._api_key

        scrubbed_headers = _scrub_headers(headers)
        logger.info(
            "RealtimeSession %s connecting run_id=%s base_url=%s headers=%s",
            self.session_id,
            self._run_id,
            model_config.get("url") or base_url or "<default>",
            scrubbed_headers or {},
        )
        logger.debug(
            "RealtimeSession %s model settings: %s",
            self.session_id,
            initial_model_settings,
        )

        # Log the final model_config being passed to runner.run()
        # Scrub sensitive data for logging
        loggable_config = {
            k: (_mask_secret(v) if k == "api_key" else v)
            for k, v in model_config.items()
            if k != "headers"
        }
        loggable_config["headers"] = scrubbed_headers
        logger.debug(
            "RealtimeSession %s final model_config for runner.run(): %s",
            self.session_id,
            loggable_config,
        )

        # Get the session from runner.run()
        # This returns a RealtimeSession that needs to be used as async context manager
        self._realtime_session = await self._runner.run(model_config=model_config)

        # Enter the async context manager
        await self._realtime_session.__aenter__()

        return self._realtime_session

    async def stream_events(self) -> AsyncIterator[RealtimeEventBase]:
        """Stream realtime events from the agent.

        Yields:
            RPC realtime event objects

        Raises:
            RuntimeError: If session not connected
        """
        if not self._realtime_session:
            raise RuntimeError("Session not connected. Call connect() first.")

        # Cache tool call arguments from raw model events
        # Key: (tool_name, call_id) -> arguments string
        # Raw model events arrive before tool_start events and contain the arguments
        tool_call_cache = {}

        # Stream events from the realtime session (it's an async iterator)
        async for event in self._realtime_session:
            # Check for raw model events first to cache tool call arguments
            # This mimics how non-realtime mode extracts arguments from tool_call_item
            if event.type == "raw_model_event":
                data = getattr(event, "data", None)
                # RealtimeModelToolCallEvent has name, call_id, arguments
                if data and getattr(data, "type", None) == "function_call":
                    tool_name = getattr(data, "name", None)
                    call_id = getattr(data, "call_id", None)
                    arguments = getattr(data, "arguments", "")
                    if tool_name and call_id:
                        tool_call_cache[(tool_name, call_id)] = arguments

            # Convert SDK event to RPC event (pass cache for tool_start events)
            rpc_event = map_realtime_session_event_to_rpc(
                event, self._run_id, tool_call_cache=tool_call_cache
            )

            if rpc_event:
                # Handle audio storage for audio events
                if rpc_event.type == "realtime_audio":
                    await self._store_audio_chunk(rpc_event)

                # Clean up cache after tool completes to prevent memory leak
                if rpc_event.type == "realtime_tool_end":
                    cache_key = (rpc_event.tool_name, rpc_event.tool_call_id)
                    tool_call_cache.pop(cache_key, None)

                yield rpc_event

    async def _store_audio_chunk(self, audio_event) -> None:
        """Store an audio chunk to disk and save metadata.

        Args:
            audio_event: RealtimeAudioEvent with audio data
        """
        # Decode base64 audio
        audio_bytes = base64.b64decode(audio_event.audio_base64)

        # Determine audio format (from realtime settings)
        audio_format = self.spec.realtime_settings.output_audio_format

        # Save audio file
        relative_path = await self.audio_storage.save_audio_chunk(
            session_id=self.session_id,
            item_id=audio_event.item_id,
            content_index=audio_event.content_index,
            audio_data=audio_bytes,
            audio_format=audio_format,
        )

        # Save metadata to database
        history_db.save_audio_metadata(
            session_id=self.session_id,
            item_id=audio_event.item_id,
            content_index=audio_event.content_index,
            file_path=relative_path,
            audio_format=audio_format,
            duration_ms=audio_event.delta_ms,
            project_slug=self._project_slug,
            agent_slug=self._agent_slug,
        )

    async def send_audio(self, audio_base64: str, commit: bool = False) -> None:
        """Send audio input to the realtime agent.

        Args:
            audio_base64: Base64-encoded audio data
            commit: If True, commit the audio and trigger response generation

        Raises:
            RuntimeError: If session not connected
        """
        if not self._realtime_session:
            raise RuntimeError("Session not connected. Call connect() first.")

        # Decode base64 audio
        audio_bytes = base64.b64decode(audio_base64)

        # Send to realtime session
        # commit=False means streaming (buffer audio)
        # commit=True means commit and trigger response
        await self._realtime_session.send_audio(audio_bytes, commit=commit)

    async def send_text(self, text: str) -> None:
        """Send text input to the realtime agent.

        Args:
            text: Text message to send

        Raises:
            RuntimeError: If session not connected
        """
        if not self._realtime_session:
            raise RuntimeError("Session not connected. Call connect() first.")

        # Send message to session (accepts string or structured message)
        await self._realtime_session.send_message(text)

    async def interrupt(self) -> None:
        """Interrupt the current agent response.

        Raises:
            RuntimeError: If session not connected
        """
        if not self._realtime_session:
            raise RuntimeError("Session not connected. Call connect() first.")

        await self._realtime_session.interrupt()

    async def stop(self) -> None:
        """Stop the realtime session and clean up resources.

        Safe to call multiple times; all cleanup steps are best-effort
        and never raise.
        """
        self._active = False

        # Signal the MCP lifecycle task to unwind its exit stack, then
        # wait for it so server cleanup completes before we return.
        shutdown_event = self._mcp_shutdown
        if shutdown_event:
            shutdown_event.set()
        lifecycle_task = self._mcp_lifecycle_task
        if lifecycle_task:
            try:
                await lifecycle_task
            except asyncio.CancelledError:
                pass
            except Exception:
                pass
        self._mcp_shutdown = None
        self._mcp_lifecycle_task = None
        self._mcp_servers_active = []

        # Cancel event streaming task if running
        if self._event_task and not self._event_task.done():
            self._event_task.cancel()
            try:
                await self._event_task
            except asyncio.CancelledError:
                pass

        # Exit the realtime session's async context manager
        if self._realtime_session:
            try:
                await self._realtime_session.__aexit__(None, None, None)
            except Exception:
                pass
            self._realtime_session = None

        self._runner = None
        self._agent = None
        self._run_id = None

    @asynccontextmanager
    async def managed(self):
        """Context manager for automatic session lifecycle management.

        Guarantees stop() runs even when start() or the body raises.

        Example:
            async with session.managed():
                await session.connect()
                async for event in session.stream_events():
                    # Process events
                    pass
        """
        try:
            await self.start()
            yield self
        finally:
            await self.stop()