Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
omniagents / omniagents / backends / web / __init__.py
Size: Mime:
import os
import sys
import socket
import time
import platform
import shutil
import subprocess
import webbrowser
import logging
from pathlib import Path
from threading import Thread
from typing import Optional

from omniagents.core.agents.specs import AgentSpec
from omniagents.core.runtime.overrides import apply_cli_overrides
from omniagents.core.agents.specs import RealtimeSettings


class WebBackend:
    def run(self, spec: AgentSpec, **kwargs) -> None:
        try:
            from omniagents.backends.server.app import build_app
            import uvicorn
            from fastapi.staticfiles import StaticFiles
        except ImportError as e:
            print(
                "Error: Server dependencies not installed. Install with: pip install omniagents[server]"
            )
            print(f"Details: {e}")
            sys.exit(1)

        args = kwargs.get("args")
        codespaces_env = any(
            os.environ.get(k)
            for k in (
                "CODESPACES",
                "CODESPACE_NAME",
                "GITHUB_CODESPACES_PORT_FORWARDING_DOMAIN",
            )
        )

        # Check for explicit CLI overrides
        host_arg = getattr(args, "host", None) if args else None
        port_arg = getattr(args, "port", None) if args else None
        ssl_cert = getattr(args, "ssl_cert", None) if args else None
        ssl_key = getattr(args, "ssl_key", None) if args else None

        # Default: localhost:8080 with sequential fallback (or 0.0.0.0 in Codespaces)
        if host_arg:
            host = host_arg
        else:
            host = "0.0.0.0" if codespaces_env else "127.0.0.1"
        if port_arg is not None:
            port = int(port_arg)
        else:
            port = self._find_available_port(8080)

        apply_cli_overrides(spec, args)

        auto_open = self._should_auto_open(kwargs.get("auto_open"))
        minimal_ui = self._is_minimal_ui(kwargs.get("minimal_ui"))

        def _ensure_realtime_defaults(target_spec):
            try:
                if not getattr(target_spec, "realtime_mode", False):
                    target_spec.realtime_mode = True
                if not getattr(target_spec, "realtime_settings", None):
                    target_spec.realtime_settings = RealtimeSettings()
                rs = target_spec.realtime_settings
                if not rs.modalities or ("audio" not in rs.modalities):
                    rs.modalities = ["audio"]
                if not rs.model_name:
                    rs.model_name = "gpt-realtime"
                if not rs.voice:
                    rs.voice = "alloy"
                if not rs.input_audio_format:
                    rs.input_audio_format = "pcm16"
                if not rs.output_audio_format:
                    rs.output_audio_format = "pcm16"
                if not rs.turn_detection:
                    rs.turn_detection = {
                        "type": "server_vad",
                        "threshold": 0.3,
                        "prefix_padding_ms": 100,
                        "silence_duration_ms": 200,
                    }
                if rs.temperature is None:
                    rs.temperature = 0.8
                if rs.max_output_tokens is None:
                    rs.max_output_tokens = 4096
                if not rs.input_audio_transcription:
                    rs.input_audio_transcription = {"model": "whisper-1"}
            except Exception:
                pass

        realtime_spec = getattr(spec, "voice_spec", None) or spec
        voice_backend = getattr(realtime_spec, "voice_backend", "realtime")
        if voice_backend == "realtime":
            _ensure_realtime_defaults(realtime_spec)

        config_path = kwargs.get("config_path")
        auth_token = kwargs.get("auth_token")
        debug = kwargs.get("debug", False)
        if not debug and args and hasattr(args, "debug"):
            debug = args.debug
        try:
            if debug:
                os.environ["OMNIAGENTS_SERVER_DEBUG"] = "1"
        except Exception:
            pass

        app = build_app(config_path=config_path, spec=spec, auth_token=auth_token)

        dev_flag_env = os.environ.get("OMNI_WEB_DEV", "0").lower() not in {
            "0",
            "false",
            "no",
        }
        dev_flag = bool(getattr(args, "dev", False)) or dev_flag_env
        dev_host = getattr(args, "dev_host", None) or host
        dev_port = int(getattr(args, "dev_port", 5173) or 5173)

        if not dev_flag:
            dist_dir = self._resolve_dist_dir(debug=debug)
            if dist_dir is None:
                print("Error: Web UI not found.")
                print("Manual build:")
                print("  cd omniagents/backends/web/ui")
                print("  npm install")
                print("  npm run build")
                sys.exit(1)
            app.mount(
                "/", StaticFiles(directory=str(dist_dir), html=True), name="webui"
            )

        # Determine if SSL is enabled
        use_ssl = bool(ssl_cert and ssl_key)

        def run_server():
            for name in (
                "agents",
                "openai",
                "httpx",
                "urllib3",
                "anthropic",
                "litellm",
            ):
                try:
                    lg = logging.getLogger(name)
                    lg.handlers.clear()
                    lg.addHandler(logging.NullHandler())
                    lg.setLevel(logging.CRITICAL)
                    lg.propagate = False
                except Exception:
                    pass
            uvicorn_kwargs = {
                "app": app,
                "host": host,
                "port": port,
                "log_level": "error",
                "access_log": False,
                "proxy_headers": True,
                "forwarded_allow_ips": "*",
            }
            if use_ssl:
                uvicorn_kwargs["ssl_certfile"] = ssl_cert
                uvicorn_kwargs["ssl_keyfile"] = ssl_key
            uvicorn.run(**uvicorn_kwargs)

        server_thread = Thread(target=run_server, daemon=True)
        server_thread.start()

        health_host = "127.0.0.1" if host in {"0.0.0.0", "::"} else host

        max_retries = 10
        for i in range(max_retries):
            time.sleep(0.5)
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    s.settimeout(1)
                    s.connect((health_host, port))
                    s.close()
                    break
            except (socket.timeout, ConnectionRefusedError):
                if i == max_retries - 1:
                    print(f"Error: Failed to start server on port {port}")
                    sys.exit(1)

        # Build URL with optional query params
        qp = []
        if auth_token:
            qp.append(("token", auth_token))
        try:
            if kwargs.get("resume"):
                qp.append(("resume", "true"))
            sid = kwargs.get("session_id")
            if sid:
                qp.append(("session", str(sid)))
            init = kwargs.get("initial_prompt")
            if init:
                qp.append(("initial", str(init)))
            if debug:
                qp.append(("debug", "1"))
            if minimal_ui:
                qp.append(("minimal", "true"))
        except Exception:
            pass
        open_url = None
        if dev_flag:
            try:
                open_url = self._start_dev_server(
                    ws_host=host, ws_port=port, dev_host=dev_host, dev_port=dev_port
                )
            except Exception as e:
                print(f"Error: Failed to start dev server: {e}")
                open_url = None
        if open_url is None:
            # Use 'localhost' instead of '127.0.0.1' for browser URL
            # Chrome requires this for secure context (getUserMedia, etc.)
            url_host = (
                "localhost" if host in {"127.0.0.1", "0.0.0.0", "::", "::1"} else host
            )
            scheme = "https" if use_ssl else "http"
            base = f"{scheme}://{url_host}:{port}/"
            if qp:
                from urllib.parse import urlencode

                qp.append(("ts", str(int(time.time()))))
                open_url = base + "?" + urlencode(qp)
            else:
                open_url = base + f"?ts={int(time.time())}"
        elif dev_flag and minimal_ui and open_url:
            if "minimal=" not in open_url:
                open_url = self._append_query_param(open_url, "minimal", "true")
        output_format = getattr(args, "output", "text") if args else "text"
        if output_format == "json":
            import json
            print(json.dumps({"url": open_url, "port": port}), flush=True)
        elif auto_open:
            try:
                webbrowser.open(open_url)
            except Exception:
                print(f"Open in browser: {open_url}")
        else:
            print(f"Open in browser: {open_url}")

        try:
            while server_thread.is_alive():
                time.sleep(0.2)
        except KeyboardInterrupt:
            pass

    @staticmethod
    def _find_available_port(start_port: int, max_attempts: int = 20) -> int:
        """Find an available port starting from start_port.

        Tries ports sequentially (start_port, start_port+1, ...) until
        an available one is found or max_attempts is reached.
        """
        for offset in range(max_attempts):
            port = start_port + offset
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    s.bind(("127.0.0.1", port))
                    return port
            except OSError:
                continue
        raise RuntimeError(
            f"Could not find available port in range {start_port}-{start_port + max_attempts - 1}"
        )

    @staticmethod
    def _should_auto_open(auto_open_arg: bool | None) -> bool:
        env_val = os.environ.get("OMNI_WEB_AUTO_OPEN")
        env_enabled = (
            True
            if env_val is None
            else env_val.lower()
            not in {
                "0",
                "false",
                "no",
            }
        )
        if auto_open_arg is None:
            return env_enabled
        return bool(auto_open_arg)

    @staticmethod
    def _is_minimal_ui(minimal_arg: bool | None) -> bool:
        env_val = os.environ.get("OMNI_WEB_MINIMAL")
        env_enabled = env_val is not None and env_val.lower() not in {
            "0",
            "false",
            "no",
        }
        if minimal_arg is None:
            return env_enabled
        return bool(minimal_arg)

    @staticmethod
    def _append_query_param(url: str, key: str, value: str) -> str:
        sep = "&" if "?" in url else "?"
        return f"{url}{sep}{key}={value}"

    def _start_dev_server(
        self, *, ws_host: str, ws_port: int, dev_host: str, dev_port: int
    ) -> Optional[str]:
        ui_dir = Path(__file__).parent / "ui"
        if not ui_dir.exists():
            return None
        if shutil.which("npm") is None:
            return None
        env = os.environ.copy()
        env["OMNI_WS_HOST"] = str(ws_host)
        env["OMNI_WS_PORT"] = str(ws_port)
        # Start Vite dev server
        cmd = [
            "npm",
            "run",
            "dev",
            "--",
            "--host",
            str(dev_host),
            "--port",
            str(dev_port),
        ]
        proc = subprocess.Popen(
            cmd,
            cwd=str(ui_dir),
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        # Poll for readiness
        url = f"http://{dev_host}:{dev_port}/"
        for _ in range(40):
            time.sleep(0.5)
            try:
                import urllib.request

                with urllib.request.urlopen(url, timeout=1) as resp:
                    if resp.status == 200:
                        return url
            except Exception:
                continue
        return url

    def _resolve_dist_dir(self, debug: bool = False) -> Optional[Path]:
        override = os.environ.get("OMNI_WEB_UI_PATH")
        if override:
            p = Path(override).expanduser()
            if p.is_dir() and (p / "index.html").exists():
                return p
        packaged = Path(__file__).parent / "ui" / "dist"
        if packaged.is_dir() and (packaged / "index.html").exists():
            return packaged
        cache_dir = self._cache_dir()
        if cache_dir is not None:
            candidate = cache_dir / "dist"
            if candidate.is_dir() and (candidate / "index.html").exists():
                return candidate
        auto_build = os.environ.get("OMNI_WEB_AUTO_BUILD", "1").lower() not in {
            "0",
            "false",
            "no",
        }
        if auto_build:
            try:
                built = self._build_ui(debug=debug)
                return built
            except SystemExit:
                raise
            except Exception as e:
                print(f"Error: Failed to build web UI: {e}")
                return None
        return None

    def _build_ui(self, debug: bool = False) -> Path:
        if shutil.which("npm") is None or shutil.which("node") is None:
            print("Error: Node.js/npm not found.")
            raise SystemExit(1)
        try:
            v = subprocess.check_output(["node", "-v"], text=True).strip()
            if v.startswith("v"):
                v = v[1:]
            major = int((v.split(".") or ["0"])[0])
            if major < 18:
                print("Error: Node.js >= 18 required.")
                raise SystemExit(1)
        except Exception:
            pass

        src_base = Path(__file__).parent / "ui"
        if not src_base.exists():
            raise SystemExit(1)
        cache_dir = self._cache_dir() or src_base
        src_hash = self._compute_source_hash(src_base)
        stamp = self._stamp_path(cache_dir)
        need_sync = True
        try:
            if (
                stamp.exists()
                and src_hash
                and stamp.read_text(encoding="utf-8").strip() == src_hash
            ):
                need_sync = False
        except Exception:
            need_sync = True
        if need_sync and cache_dir != src_base:
            if cache_dir.exists():
                try:
                    shutil.rmtree(cache_dir)
                except Exception:
                    pass
            shutil.copytree(src_base, cache_dir)

        env = os.environ.copy()
        env.setdefault("NPM_CONFIG_AUDIT", "false")
        env.setdefault("NPM_CONFIG_FUND", "false")
        install_cmd = ["npm", "install"]
        lockfile = cache_dir / "package-lock.json"
        if lockfile.exists():
            install_cmd = ["npm", "ci"]
        try:
            subprocess.run(
                install_cmd,
                cwd=str(cache_dir),
                check=True,
                env=env,
                stdout=None if debug else subprocess.PIPE,
                stderr=None if debug else subprocess.PIPE,
                text=True,
            )
        except subprocess.CalledProcessError as e:
            raise SystemExit(e.returncode)
        try:
            subprocess.run(
                ["npm", "run", "build"],
                cwd=str(cache_dir),
                check=True,
                env=env,
                stdout=None if debug else subprocess.PIPE,
                stderr=None if debug else subprocess.PIPE,
                text=True,
            )
        except subprocess.CalledProcessError as e:
            raise SystemExit(e.returncode)
        out_dir = cache_dir / "dist"
        if not out_dir.is_dir() or not (out_dir / "index.html").exists():
            raise SystemExit(1)
        try:
            if src_hash:
                stamp.write_text(src_hash + "\n", encoding="utf-8")
        except Exception:
            pass
        return out_dir

    def _cache_dir(self) -> Optional[Path]:
        base = self._user_cache_dir()
        if base is None:
            return None
        dest = base / "web" / "ui"
        dest.mkdir(parents=True, exist_ok=True)
        return dest

    def _user_cache_dir(self) -> Optional[Path]:
        home = Path.home()
        system = platform.system().lower()
        if system == "windows":
            base = os.environ.get("LOCALAPPDATA") or os.environ.get("APPDATA")
            if not base:
                return None
            return Path(base) / "OmniAgents"
        elif system == "darwin":
            return home / "Library" / "Caches" / "OmniAgents"
        else:
            xdg = os.environ.get("XDG_CACHE_HOME")
            if xdg:
                return Path(xdg) / "omniagents"
            return home / ".cache" / "omniagents"

    def _stamp_path(self, cache_dir: Path) -> Path:
        return cache_dir / "omni-web-ui.hash"

    def _compute_source_hash(self, src_base: Path) -> Optional[str]:
        try:
            files = []
            for root, _dirs, names in os.walk(src_base / "src"):
                for n in names:
                    if n.endswith(".ts") or n.endswith(".tsx"):
                        files.append(str(Path(root) / n))
            files.append(str(src_base / "package.json"))
            files.append(str(src_base / "tsconfig.json"))
            tcfg = src_base / "tailwind.config.js"
            if tcfg.exists():
                files.append(str(tcfg))
            pcfg = src_base / "postcss.config.js"
            if pcfg.exists():
                files.append(str(pcfg))
            files = [f for f in files if os.path.isfile(f)]
            files.sort()
            import hashlib as _hashlib

            digest = _hashlib.sha256()
            for f in files:
                with open(f, "rb") as fh:
                    digest.update(fh.read())
            return digest.hexdigest()
        except Exception:
            return None


__all__ = ["WebBackend"]