Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
Size: Mime:
from __future__ import annotations

import argparse
import json
import sqlite3
import sys
from typing import Any

from omniagents.core.paths import get_sessions_db_path
from omniagents.core.session import load_history
from omniagents.core.session.query import (
    count_sessions,
    ensure_sessions_schema,
    parse_datetime_arg,
    populate_session_stats,
    query_sessions,
    search_session_messages,
)


def dump_json(value: Any, *, pretty: bool) -> str:
    """Serialize *value* to a JSON string with sorted keys.

    Non-ASCII characters are emitted as-is. With ``pretty`` the output is
    indented by two spaces; otherwise it is packed using ``,`` and ``:``
    separators with no surrounding whitespace.
    """
    # Pick the layout options once, then make a single dumps() call.
    layout = {"indent": 2} if pretty else {"separators": (",", ":")}
    return json.dumps(value, ensure_ascii=False, sort_keys=True, **layout)


def coerce_limit(value: int | None) -> int | None:
    """Normalize a limit to a strictly positive ``int``, or ``None``.

    ``None``, values that ``int()`` rejects with ``TypeError``/``ValueError``,
    and results that are zero or negative all map to ``None``.
    """
    if value is not None:
        try:
            parsed = int(value)
        except (TypeError, ValueError):
            # Treat an unconvertible value like a non-positive one.
            parsed = 0
        if parsed > 0:
            return parsed
    return None


def coerce_nonnegative_int(value: int | None) -> int:
    """Normalize *value* to an ``int`` that is never below zero.

    ``None``, values that ``int()`` rejects with ``TypeError``/``ValueError``,
    and negative results all map to ``0``.
    """
    try:
        parsed = 0 if value is None else int(value)
    except (TypeError, ValueError):
        return 0
    # Clamp negatives up to zero.
    return max(parsed, 0)


def add_sessions_subparser(subparsers) -> argparse.ArgumentParser:
    """Register the ``sessions`` command and its subcommands on *subparsers*.

    Adds a ``sessions`` parser (with ``command`` defaulting to ``"sessions"``)
    and, beneath it, the ``db-path``, ``list``, ``search`` and ``export``
    subcommands, whose name is stored in ``sessions_cmd``.

    Returns the ``sessions`` parser.
    """

    def add_scope_flags(cmd) -> None:
        # Project/agent selectors accepted by every subcommand.
        cmd.add_argument("--project", default=None, help="Project slug")
        cmd.add_argument("--agent", default=None, help="Agent slug")

    def add_filter_flags(cmd) -> None:
        # Archive/date/paging options shared by ``list`` and ``search``.
        cmd.add_argument("--include-archived", action="store_true")
        cmd.add_argument("--after", default=None)
        cmd.add_argument("--before", default=None)
        cmd.add_argument("--offset", type=int, default=0)
        cmd.add_argument("--limit", type=int, default=20)

    def add_switches(cmd, *flags) -> None:
        # Plain boolean flags, registered in the order given.
        for flag in flags:
            cmd.add_argument(flag, action="store_true")

    root = subparsers.add_parser("sessions", help="Inspect OmniAgents sessions")
    root.set_defaults(command="sessions")
    commands = root.add_subparsers(dest="sessions_cmd")

    add_scope_flags(commands.add_parser("db-path", help="Print the sessions DB path"))

    listing = commands.add_parser("list", help="List sessions")
    add_scope_flags(listing)
    add_filter_flags(listing)
    add_switches(listing, "--all", "--stats", "--text", "--pretty")

    searching = commands.add_parser("search", help="Search session messages")
    searching.add_argument("query")
    add_scope_flags(searching)
    searching.add_argument(
        "--role", default="any", choices=["any", "user", "assistant"]
    )
    add_filter_flags(searching)
    add_switches(searching, "--text", "--pretty")

    exporting = commands.add_parser("export", help="Export a session transcript")
    exporting.add_argument("session_id")
    add_scope_flags(exporting)
    exporting.add_argument("--format", choices=["json", "jsonl"], default="jsonl")
    add_switches(exporting, "--pretty")

    return root


def _parse_date_filters(args: argparse.Namespace) -> tuple[Any, Any] | None:
    """Parse the ``--after`` / ``--before`` options shared by ``list`` and ``search``.

    Returns ``(after, before)`` as produced by ``parse_datetime_arg``, or
    ``None`` after printing an error message when a supplied value could not
    be parsed.
    """
    parsed = []
    for flag in ("after", "before"):
        raw = getattr(args, flag)
        value = parse_datetime_arg(raw)
        # Only a supplied value that fails to parse is an error; when the
        # option is absent the parser's result is passed through unchanged.
        if raw and value is None:
            print(
                f"Invalid --{flag} datetime. "
                "Use ISO-8601 like 2026-02-18 or 2026-02-18T15:04:05Z"
            )
            return None
        parsed.append(value)
    return parsed[0], parsed[1]


def _handle_db_path(args: argparse.Namespace, project: Any, agent: Any) -> int:
    """Print the sessions database path for the given project/agent."""
    print(str(get_sessions_db_path(project, agent)))
    return 0


def _handle_list(args: argparse.Namespace, project: Any, agent: Any) -> int:
    """List sessions as JSON (default) or as a text table (``--text``)."""
    ensure_sessions_schema(project=project, agent=agent)
    filters = _parse_date_filters(args)
    if filters is None:
        return 1
    after, before = filters

    # ``--all`` disables the limit entirely.
    limit = None if args.all else coerce_limit(args.limit)
    offset = coerce_nonnegative_int(args.offset)
    sessions = query_sessions(
        project=project,
        agent=agent,
        include_archived=args.include_archived,
        after=after,
        before=before,
        limit=limit,
        offset=offset,
    )
    if args.stats:
        populate_session_stats(project=project, agent=agent, sessions=sessions)

    if not args.text:
        print(dump_json(sessions, pretty=args.pretty))
        return 0
    if not sessions:
        print("No sessions.")
        return 0

    # Only the text footer reports the total, so the count query is skipped
    # for JSON output and for empty results.
    total = count_sessions(
        project=project,
        agent=agent,
        include_archived=args.include_archived,
        after=after,
        before=before,
    )
    print(f"  {'Session ID':<36} {'Created':<20} {'Msgs':>5} {'Arch'}")
    print(f"  {'-'*36} {'-'*20} {'-'*5} {'-'*4}")
    for session in sessions:
        # Coerce to str before aligning: formatting None with ``:<36`` raises.
        raw_id = session.get("id")
        session_id = "" if raw_id is None else str(raw_id)
        created = str(session.get("created_at") or "")[:19]
        msg_count = session.get("message_count")
        if isinstance(msg_count, int):
            msg_n = str(msg_count)
        else:
            # "?" when stats were requested but no integer count is present;
            # "-" when stats were not requested.
            msg_n = "?" if args.stats else "-"
        arch = "yes" if session.get("archived") else "-"
        print(f"  {session_id:<36} {created:<20} {msg_n:>5} {arch}")

    # The summary goes to stderr so stdout carries only the table.
    shown = len(sessions)
    if args.all:
        print(f"Showing {shown} of {total} sessions.", file=sys.stderr)
    else:
        print(
            f"Showing {shown} of {total} sessions (offset {offset}, limit {limit or 0}).",
            file=sys.stderr,
        )
    return 0


def _handle_search(args: argparse.Namespace, project: Any, agent: Any) -> int:
    """Search session messages and print matches as JSON or one line each."""
    ensure_sessions_schema(project=project, agent=agent)
    filters = _parse_date_filters(args)
    if filters is None:
        return 1
    after, before = filters

    results = search_session_messages(
        project=project,
        agent=agent,
        query=(args.query or "").strip(),
        role=args.role,
        include_archived=args.include_archived,
        after=after,
        before=before,
        # A missing or non-positive limit falls back to 50.
        limit=coerce_limit(args.limit) or 50,
        offset=coerce_nonnegative_int(args.offset),
    )
    if not args.text:
        print(dump_json(results, pretty=args.pretty))
        return 0
    if not results:
        print("No matches.")
        return 0
    for item in results:
        ts = item.get("timestamp") or ""
        sid = item.get("session_id")
        role = item.get("role")
        # Flatten to a single line and cap at 160 characters.
        content = (item.get("content") or "").strip().replace("\n", " ")
        if len(content) > 160:
            content = content[:157] + "..."
        print(f"{ts} {sid} [{role}] {content}")
    return 0


def _handle_export(args: argparse.Namespace, project: Any, agent: Any) -> int:
    """Print a session's history as one JSON document or as JSON lines."""
    ensure_sessions_schema(project=project, agent=agent)
    db_path = get_sessions_db_path(project, agent)
    if not db_path.exists():
        print(f"Session not found: {args.session_id}")
        return 1

    # Confirm the session row exists before loading its history.
    conn = sqlite3.connect(str(db_path))
    try:
        row = conn.execute(
            "SELECT 1 FROM sessions WHERE session_id=?", (args.session_id,)
        ).fetchone()
    finally:
        conn.close()
    if row is None:
        print(f"Session not found: {args.session_id}")
        return 1

    history = load_history(args.session_id, project_slug=project, agent_slug=agent)
    if args.format == "json":
        print(dump_json(history, pretty=args.pretty))
        return 0
    # jsonl: one compact JSON document per history item.
    for item in history:
        print(dump_json(item, pretty=False))
    return 0


# Maps each ``sessions`` subcommand name to its handler.
_SESSION_HANDLERS = {
    "db-path": _handle_db_path,
    "list": _handle_list,
    "search": _handle_search,
    "export": _handle_export,
}


def handle_sessions_command(args: argparse.Namespace) -> int:
    """Run the ``sessions`` subcommand selected in *args*.

    Args:
        args: Parsed CLI namespace. ``sessions_cmd`` selects the subcommand;
            ``project`` and ``agent`` are optional and default to ``None``.

    Returns:
        ``0`` on success, ``1`` when no subcommand was given, the subcommand
        is unknown, a date filter is invalid, or the session to export does
        not exist.
    """
    subcommand = getattr(args, "sessions_cmd", None)
    if not subcommand:
        print("A sessions subcommand is required.")
        return 1

    handler = _SESSION_HANDLERS.get(subcommand)
    if handler is None:
        print(f"Unknown sessions subcommand: {subcommand}")
        return 1

    project = getattr(args, "project", None)
    agent = getattr(args, "agent", None)
    return handler(args, project, agent)