Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
omniagents / omniagents / core / eval / loader.py
Size: Mime:
"""Runtime evaluation config loader.

Replaces the compile-then-read-from-target pattern with direct loading from
the ``evaluations/`` directory.  No files are written to disk -- the merged
config is returned as an :class:`EvalConfig` dataclass ready for use by the
eval pipeline, scenario generator, judges, and optimizers.
"""

from __future__ import annotations

import fnmatch
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import yaml


# ---------------------------------------------------------------------------
# Data class returned to callers
# ---------------------------------------------------------------------------


@dataclass
class EvalConfig:
    """In-memory result of loading a project's evaluation setup.

    The first five fields are required and positional order is part of the
    constructor signature; the remaining three default to fresh empty dicts.
    """

    config: Dict[str, Any]
    """The merged evaluation mapping (scenarios, judges, metrics, optimizers, ...)."""

    agent_config: Dict[str, Any]
    """Agent YAML contents as a dict; no ``evaluation`` key is injected into it."""

    agent_path: Path
    """Absolute location of the agent YAML file."""

    eval_dir: Path
    """Absolute location of the ``evaluations/`` directory, the base for
    relative paths."""

    project_root: Path
    """Absolute location of the directory that holds ``project.yml``."""

    tracing_config: Dict[str, Any] = field(default_factory=dict)
    """Tracing settings taken from the project level."""

    project_config: Dict[str, Any] = field(default_factory=dict)
    """Unmodified ``project.yml`` contents."""

    judge_sources: Dict[str, Path] = field(default_factory=dict)
    """Maps a judge key to the YAML file that judge was picked up from.

    Consumers use it to resolve a judge's relative paths
    (``instructions_file``, ``template_file``) and to find the file an editor
    should write back to. Keys match ``config['judges']``; when a judge is
    defined more than once the last definition in merge order wins."""


# ---------------------------------------------------------------------------
# YAML / dict helpers (extracted from compiler.py)
# ---------------------------------------------------------------------------


def _read_yaml(path: Path) -> Any:
    try:
        return yaml.safe_load(path.read_text(encoding="utf-8"))
    except Exception:
        return None


def _merge_dict(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
    out = dict(a or {})
    for k, v in (b or {}).items():
        if k in out and isinstance(out[k], dict) and isinstance(v, dict):
            out[k] = _merge_dict(out[k], v)
        else:
            out[k] = v
    return out


def _ensure_list(x: Any) -> List[Any]:
    if x is None:
        return []
    if isinstance(x, list):
        return x
    return [x]


# ---------------------------------------------------------------------------
# Scenario collection
# ---------------------------------------------------------------------------


def _load_scenarios_file(base_dir: Path, rel_or_abs: str) -> List[Dict[str, Any]]:
    try:
        p = Path(rel_or_abs)
        if not p.is_absolute():
            p = (base_dir / rel_or_abs).resolve()
        if not p.is_file():
            return []
        data = _read_yaml(p)
        if isinstance(data, dict):
            sc = data.get("scenarios")
            if isinstance(sc, list):
                return _expand_scenarios(p.parent, sc)
            return []
        if isinstance(data, list):
            return _expand_scenarios(p.parent, data)
        return []
    except Exception:
        return []


def _expand_scenarios(base_dir: Path, scenarios: List[Any]) -> List[Dict[str, Any]]:
    out: List[Dict[str, Any]] = []
    for item in scenarios or []:
        if isinstance(item, str):
            out.extend(_load_scenarios_file(base_dir, item))
            continue
        if isinstance(item, dict) and ("include" in item or "file" in item):
            val = item.get("include") if "include" in item else item.get("file")
            if isinstance(val, list):
                for v in val:
                    out.extend(_load_scenarios_file(base_dir, str(v)))
            elif isinstance(val, str):
                out.extend(_load_scenarios_file(base_dir, val))
            continue
        if isinstance(item, dict):
            out.append(item)
    return out


def _collect_scenarios(paths: List[Path]) -> List[Dict[str, Any]]:
    """Gather scenario entries from the given files and directories.

    A file contributes either its top-level list or the list under its
    ``scenarios`` key.  A directory contributes the same from every ``*.yml``
    beneath it (recursively, in sorted path order).  Paths that are neither
    are ignored.

    The gathered entries are then expanded in one pass.  Relative includes
    found in these top-level entries are resolved against the current
    working directory, not against the file that declared them.
    """

    def _entries_of(yaml_file: Path) -> List[Any]:
        # Accept both ``{scenarios: [...]}`` and a bare top-level list.
        data = _read_yaml(yaml_file)
        if isinstance(data, dict):
            data = data.get("scenarios")
        return data if isinstance(data, list) else []

    raw_entries: List[Any] = []
    for source in paths:
        if source.is_file():
            raw_entries.extend(_entries_of(source))
        elif source.is_dir():
            for yaml_file in sorted(source.rglob("*.yml")):
                raw_entries.extend(_entries_of(yaml_file))
    return _expand_scenarios(Path.cwd(), raw_entries)


# ---------------------------------------------------------------------------
# Judge / optimizer / metric collection
# ---------------------------------------------------------------------------


def _resolve_judge_agent_ref(base: Path, jcfg: Dict[str, Any]) -> Dict[str, Any]:
    jc = dict(jcfg or {})
    ref = jc.get("agent_ref") or jc.get("ref")
    if isinstance(ref, str):
        p = Path(ref)
        if not p.is_absolute():
            p = (base / p).resolve()
        data = _read_yaml(p)
        if isinstance(data, dict):
            if "agent" in data and isinstance(data["agent"], dict):
                jc.setdefault("agent", data["agent"])
            else:
                for k in ("agent", "input"):
                    if k in data and isinstance(data[k], dict):
                        jc.setdefault(k, data[k])
    return jc


def _collect_judges(
    paths: List[Path], sources: Optional[Dict[str, Path]] = None
) -> Dict[str, Any]:
    """Collect judge configs from per-folder yml files.

    If ``sources`` is provided, each judge's source path is recorded into it
    (last-write-wins, mirroring the merge order). Inline judges defined under
    ``evaluation.judges`` in ``evaluation.yml`` are picked up by
    :func:`_collect_base_eval`, not here.
    """
    out: Dict[str, Any] = {}
    for p in paths:
        if p.is_file():
            data = _read_yaml(p)
            if isinstance(data, dict):
                js = data.get("judges")
                if isinstance(js, dict):
                    for k, v in js.items():
                        out[k] = _resolve_judge_agent_ref(p.parent, v)
                        if sources is not None:
                            sources[str(k)] = p
                if isinstance(js, list):
                    for j in js:
                        key = (j or {}).get("name") or (j or {}).get("key")
                        if key:
                            out[key] = _resolve_judge_agent_ref(p.parent, j)
                            if sources is not None:
                                sources[str(key)] = p
            continue
        if not p.is_dir():
            continue
        for y in sorted(list(p.rglob("*.yml"))):
            data = _read_yaml(y)
            if not isinstance(data, dict):
                continue
            js = data.get("judges")
            if isinstance(js, dict):
                for k, v in js.items():
                    out[k] = _resolve_judge_agent_ref(y.parent, v)
                    if sources is not None:
                        sources[str(k)] = y
            if isinstance(js, list):
                for j in js:
                    key = (j or {}).get("name") or (j or {}).get("key")
                    if key:
                        out[key] = _resolve_judge_agent_ref(y.parent, j)
                        if sources is not None:
                            sources[str(key)] = y
    return out


def _collect_optimizers(paths: List[Path]) -> Dict[str, Any]:
    out: Dict[str, Any] = {}
    for p in paths:
        if p.is_file():
            data = _read_yaml(p)
            if isinstance(data, dict):
                os_ = data.get("optimizers")
                if isinstance(os_, dict):
                    for k, v in os_.items():
                        out[k] = v
                if isinstance(os_, list):
                    for o in os_:
                        key = (o or {}).get("name") or (o or {}).get("key")
                        if key:
                            out[key] = o
            continue
        if not p.is_dir():
            continue
        for y in sorted(list(p.rglob("*.yml"))):
            data = _read_yaml(y)
            if not isinstance(data, dict):
                continue
            os_ = data.get("optimizers")
            if isinstance(os_, dict):
                for k, v in os_.items():
                    out[k] = v
            if isinstance(os_, list):
                for o in os_:
                    key = (o or {}).get("name") or (o or {}).get("key")
                    if key:
                        out[key] = o
    return out


def _collect_metrics(paths: List[Path]) -> List[Dict[str, Any]]:
    out: List[Dict[str, Any]] = []
    for p in paths:
        if p.is_file():
            data = _read_yaml(p)
            if isinstance(data, dict):
                ms = data.get("metrics")
                if isinstance(ms, list):
                    out.extend(ms)
            continue
        if not p.is_dir():
            continue
        for y in sorted(list(p.rglob("*.yml"))):
            data = _read_yaml(y)
            if not isinstance(data, dict):
                continue
            ms = data.get("metrics")
            if isinstance(ms, list):
                out.extend(ms)
    return out


def _collect_base_eval(
    paths: List[Path], judge_sources: Optional[Dict[str, Path]] = None
) -> Dict[str, Any]:
    """Collect the merged ``evaluation.*`` dict from base eval YAML files.

    If ``judge_sources`` is provided, each judge defined inline under
    ``evaluation.judges`` is recorded (last-write-wins). This lets callers
    locate the YAML file an inline judge came from for path resolution.
    """

    def _record_judges(eval_block: Dict[str, Any], src: Path) -> None:
        if judge_sources is None:
            return
        js = eval_block.get("judges")
        if isinstance(js, dict):
            for k in js.keys():
                judge_sources[str(k)] = src
        elif isinstance(js, list):
            for j in js:
                if isinstance(j, dict):
                    key = j.get("name") or j.get("key")
                    if key:
                        judge_sources[str(key)] = src

    result: Dict[str, Any] = {}
    for p in paths:
        if not p.exists():
            continue
        if p.is_file():
            data = _read_yaml(p)
            if isinstance(data, dict):
                if isinstance(data.get("evaluation"), dict):
                    result = _merge_dict(result, data["evaluation"])
                    _record_judges(data["evaluation"], p)
                else:
                    result = _merge_dict(result, data)
            continue
        for y in sorted(list(p.rglob("*.yml"))):
            data = _read_yaml(y)
            if isinstance(data, dict) and isinstance(data.get("evaluation"), dict):
                result = _merge_dict(result, data["evaluation"])
                _record_judges(data["evaluation"], y)
    return result


# ---------------------------------------------------------------------------
# Validation
# ---------------------------------------------------------------------------


def _validate_graph_resources(
    scenarios: List[Dict[str, Any]],
    metrics: List[Dict[str, Any]],
    judges: Dict[str, Any],
) -> Tuple[List[str], List[str], List[str]]:
    scenario_names: List[str] = []
    seen_scenarios: set[str] = set()
    for idx, scenario in enumerate(scenarios):
        if not isinstance(scenario, dict):
            raise ValueError(f"Scenario {idx} invalid type: {type(scenario).__name__}")
        identifier = scenario.get("name") or scenario.get("id")
        if not isinstance(identifier, str) or not identifier.strip():
            raise ValueError(f"Scenario {idx} missing 'name' or 'id'")
        token = identifier.strip()
        if token in seen_scenarios:
            raise ValueError(f"Duplicate scenario identifier '{token}'")
        seen_scenarios.add(token)
        scenario_names.append(token)
        if isinstance(scenario.get("name"), str):
            scenario["name"] = token
        elif isinstance(scenario.get("id"), str):
            scenario["id"] = token
    metric_names: List[str] = []
    seen_metrics: set[str] = set()
    for idx, metric in enumerate(metrics):
        if not isinstance(metric, dict):
            raise ValueError(f"Metric {idx} invalid type: {type(metric).__name__}")
        identifier = metric.get("name") or metric.get("label")
        if not isinstance(identifier, str) or not identifier.strip():
            raise ValueError(f"Metric {idx} missing 'name' or 'label'")
        token = identifier.strip()
        if token in seen_metrics:
            raise ValueError(f"Duplicate metric identifier '{token}'")
        seen_metrics.add(token)
        metric_names.append(token)
        if isinstance(metric.get("name"), str):
            metric["name"] = token
        elif isinstance(metric.get("label"), str):
            metric["label"] = token
    judge_keys: List[str] = []
    invalid_judges: List[str] = []
    for key in judges.keys():
        if not isinstance(key, str) or not key.strip():
            invalid_judges.append(str(key))
        else:
            judge_keys.append(key)
    if invalid_judges:
        raise ValueError(
            "Judge keys must be non-empty strings: " + ", ".join(invalid_judges)
        )
    return scenario_names, metric_names, judge_keys


# ---------------------------------------------------------------------------
# Scenario filtering (public -- used by CLI and Studio API)
# ---------------------------------------------------------------------------


def filter_scenarios(
    items: List[Dict[str, Any]], select: str | None, exclude: str | None
) -> List[Dict[str, Any]]:
    """Narrow a scenario list using select and exclude expressions.

    Both arguments are comma-separated lists of expressions.  An expression
    is either ``<kind>:<glob>`` with kind one of ``id``, ``name``,
    ``tag``/``tags`` or ``measure`` (case-insensitive), or a bare glob that
    is matched against the scenario's ``name``.  ``name:`` falls back to the
    scenario's ``id`` when it has no name; a bare glob does not.  An
    expression with an unrecognised kind is treated as a bare glob in full.
    Globs follow :func:`fnmatch.fnmatch`.

    A scenario is kept when it matches at least one select expression (or no
    select expressions were given) and matches no exclude expression.  Input
    order is preserved.  When neither argument yields an expression the
    original list object is returned unchanged.
    """

    def _terms(raw: str | None) -> List[str]:
        return [piece.strip() for piece in (raw or "").split(",") if piece.strip()]

    wanted = _terms(select)
    unwanted = _terms(exclude)
    if not (wanted or unwanted):
        return items

    def _hits(scenario: Dict[str, Any], expression: str) -> bool:
        kind, separator, pattern = expression.partition(":")
        if separator:
            kind = kind.strip().lower()
            pattern = pattern.strip()
            if kind == "id":
                return fnmatch.fnmatch(str(scenario.get("id") or ""), pattern)
            if kind == "name":
                label = scenario.get("name") or scenario.get("id") or ""
                return fnmatch.fnmatch(str(label), pattern)
            if kind in ("tag", "tags"):
                tag_names = list(map(str, _ensure_list(scenario.get("tags"))))
                return any(fnmatch.fnmatch(tag, pattern) for tag in tag_names)
            if kind == "measure":
                # Imported lazily, only when a measure expression is used.
                from omniagents.core.eval.measure_tiers import measure_names

                measures = measure_names(scenario.get("measures"))
                return any(fnmatch.fnmatch(m, pattern) for m in measures)
        return fnmatch.fnmatch(str(scenario.get("name") or ""), expression)

    if wanted:
        chosen = [any(_hits(sc, term) for term in wanted) for sc in items]
    else:
        chosen = [True] * len(items)
    if unwanted:
        dropped = [any(_hits(sc, term) for term in unwanted) for sc in items]
    else:
        dropped = [False] * len(items)
    return [
        sc for sc, keep, drop in zip(items, chosen, dropped) if keep and not drop
    ]


# ---------------------------------------------------------------------------
# De-duplication helper for source paths
# ---------------------------------------------------------------------------


def _uniq(ps: List[Path]) -> List[Path]:
    seen: set[str] = set()
    out: List[Path] = []
    for x in ps:
        try:
            key = str(x.resolve())
        except Exception:
            key = str(x)
        if key in seen:
            continue
        seen.add(key)
        out.append(x)
    return out


# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------


def load_eval_config(
    project_path: str,
    select: str | None = None,
    exclude: str | None = None,
    agent_key: Optional[str] = None,
) -> EvalConfig:
    """Load and merge evaluation config at runtime.

    This replaces ``compile_project()`` from ``compiler.py``.  Instead of
    writing a merged YAML to ``target/``, it returns the merged config in
    memory as an :class:`EvalConfig`.

    The evaluations directory is ``paths.evaluations`` from ``project.yml``
    (relative to the project root), defaulting to ``evaluations``.

    Parameters
    ----------
    project_path:
        Path to ``project.yml`` or the directory containing it.
    select:
        Comma-separated scenario selectors (e.g. ``"name:foo*,tag:smoke"``).
    exclude:
        Comma-separated scenario exclusions.
    agent_key:
        Agent key to resolve.  ``None`` uses the project entrypoint.

    Returns
    -------
    EvalConfig
        The merged evaluation dict together with the agent config, resolved
        paths, tracing config, raw project config and judge source map.

    Raises
    ------
    FileNotFoundError
        If ``project_path`` is a file not named ``project.yml``, or a
        directory (or missing path) without a ``project.yml`` in it.
    ValueError
        Propagated from :func:`_validate_graph_resources` when scenarios,
        metrics or judges have invalid or duplicate identifiers.
    """
    from omniagents.core.project.runtime import load_project, resolve_agent

    p = Path(project_path).resolve()
    if p.is_file():
        if p.name != "project.yml":
            raise FileNotFoundError(
                f"Expected 'project.yml' but got '{p.name}'. Rename to project.yml."
            )
        base_dir = p.parent
        proj_file = p
    else:
        base_dir = p
        proj_file = base_dir / "project.yml"
        if not proj_file.is_file():
            raise FileNotFoundError(
                f"Could not find project.yml in directory: {base_dir}"
            )

    # ``_read_yaml`` may hand back None, a list or a scalar; only a mapping
    # is usable below, so anything else is treated as an empty project.
    loaded = _read_yaml(proj_file)
    proj: Dict[str, Any] = loaded if isinstance(loaded, dict) else {}
    paths = proj.get("paths")
    if not isinstance(paths, dict):
        paths = {}
    # ``or`` guards against ``evaluations: null`` turning into a directory
    # literally named "None".
    eval_dir = (base_dir / str(paths.get("evaluations") or "evaluations")).resolve()

    # Resolve agent via the project runtime
    prj = load_project(str(p))
    _key, agent_path = resolve_agent(
        prj, agent_key=(str(agent_key) if agent_key else None)
    )

    # Source paths for each resource type.  The literal ``evaluations/``
    # locations are always considered alongside the configured directory;
    # ``_uniq`` collapses them when both point at the same place.
    scenarios_sources = _uniq(
        [
            base_dir / "evaluations" / "scenarios.yml",
            eval_dir / "scenarios.yml",
            eval_dir / "scenarios",
        ]
    )
    judges_sources = _uniq([eval_dir / "judges"])
    optimizers_sources = _uniq([eval_dir / "optimizers"])
    metrics_sources = _uniq(
        [
            base_dir / "evaluations" / "metrics.yml",
            eval_dir / "metrics.yml",
            eval_dir / "metrics",
        ]
    )
    base_eval_sources = _uniq(
        [
            base_dir / "evaluations" / "evaluation.yml",
            eval_dir / "evaluation.yml",
            eval_dir,
        ]
    )

    # Collect everything. Judge source paths are tracked across both base
    # eval (inline `evaluation.judges`) and per-folder judges/, mirroring
    # the merge order so per-folder definitions win when both exist.
    judge_sources: Dict[str, Path] = {}
    base_eval = _collect_base_eval(
        [s for s in base_eval_sources if s.exists()], judge_sources
    )
    scenarios = _collect_scenarios([s for s in scenarios_sources if s.exists()])
    judges = _collect_judges([s for s in judges_sources if s.exists()], judge_sources)
    optimizers = _collect_optimizers([s for s in optimizers_sources if s.exists()])
    metrics = _collect_metrics([s for s in metrics_sources if s.exists()])

    # Filter
    if select or exclude:
        scenarios = filter_scenarios(scenarios, select, exclude)

    # Validate
    _validate_graph_resources(scenarios, metrics, judges)

    # Merge into single eval config dict.  Collected resources replace the
    # same-named key from the base eval block wholesale.
    # NOTE(review): because ``judges`` is assigned rather than merged, inline
    # judges from the base eval block are dropped whenever any per-folder
    # judge exists, while ``judge_sources`` still lists them -- confirm this
    # is intended.
    compiled_eval: Dict[str, Any] = _merge_dict({}, base_eval)
    if judges:
        compiled_eval["judges"] = judges
    if optimizers:
        compiled_eval["optimizers"] = optimizers
    if metrics:
        compiled_eval["metrics"] = metrics
    if scenarios:
        compiled_eval["scenarios"] = scenarios

    # Read agent config
    agent_cfg = _read_yaml(agent_path) or {}

    # Extract tracing config (copied so the project dict is not aliased)
    tracing = proj.get("tracing")
    tracing_config: Dict[str, Any] = dict(tracing) if isinstance(tracing, dict) else {}

    return EvalConfig(
        config=compiled_eval,
        agent_config=agent_cfg,
        agent_path=agent_path.resolve(),
        eval_dir=eval_dir,
        project_root=base_dir.resolve(),
        tracing_config=tracing_config,
        project_config=proj,
        judge_sources=judge_sources,
    )


# ---------------------------------------------------------------------------
# Lightweight introspection (replaces graph.json generation)
# ---------------------------------------------------------------------------


def introspect_resources(eval_config: EvalConfig) -> Dict[str, Any]:
    """Return a resource summary dict (equivalent to the old ``graph.json``).

    This can be used by the ``ls`` command and Studio UI to enumerate
    scenarios, judges, and metrics without writing anything to disk.

    The result has three keys:

    * ``project_name`` -- ``name`` from the project config, or the project
      root directory's name when that is missing/empty;
    * ``agent_file`` -- the agent path as a string;
    * ``resources`` -- ``scenarios``, ``judges`` and ``metrics`` name lists.

    Scenarios are named by ``name`` then ``id``, metrics by ``name`` then
    ``label``.  Judges may be a mapping (its keys are listed) or a list of
    mappings named by ``name`` then ``key``.  Entries that are not mappings
    or have no usable name are left out.
    """

    def _labels(entries: Any, primary: str, fallback: str) -> List[Any]:
        # Names of the dict entries, skipping anything unnamed or non-dict.
        found: List[Any] = []
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            label = entry.get(primary) or entry.get(fallback)
            if label:
                found.append(label)
        return found

    cfg = eval_config.config
    judges = cfg.get("judges") or {}
    if isinstance(judges, dict):
        judge_keys = list(judges.keys())
    elif isinstance(judges, list):
        # Inline judges may be declared as a list of ``{name|key: ...}``
        # mappings; calling ``.keys()`` on that used to raise.
        judge_keys = _labels(judges, "name", "key")
    else:
        judge_keys = []

    return {
        "project_name": eval_config.project_config.get("name")
        or eval_config.project_root.name,
        "agent_file": str(eval_config.agent_path),
        "resources": {
            "scenarios": _labels(cfg.get("scenarios") or [], "name", "id"),
            "judges": judge_keys,
            "metrics": _labels(cfg.get("metrics") or [], "name", "label"),
        },
    }