"""ORCH-099 (FND/F1a) — read-only GET /metrics raw-signal endpoint.

Covers the four-section envelope (TC-01..TC-04/TC-08/TC-11), never-raise
degradation per section/field (TC-05/TC-07), the cost aggregate (TC-06), the
read-only invariant (TC-09), and additivity vs the existing /health, /status
and /queue endpoints (TC-10).

Pattern mirrors tests/test_queue_endpoint.py: the async handler is driven via
asyncio.run(main.metrics()); the autouse conftest mutes Telegram; the autouse
per-test ``fresh_db`` fixture points settings.db_path at a tmp file, enables
the endpoint flag, and runs init_db.
"""
import asyncio
import os
from contextlib import closing

import pytest

import src.db as db  # noqa: E402
from src.db import get_db, init_db  # noqa: E402
from src import config as cfg  # noqa: E402
from src import metrics as metrics_mod  # noqa: E402


@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
    """Give every test an isolated, freshly initialised DB with /metrics enabled."""
    dbfile = tmp_path / "metrics.db"
    monkeypatch.setattr(db.settings, "db_path", str(dbfile))
    # raising=False: do not fail if the flag is not pre-declared on settings.
    monkeypatch.setattr(cfg.settings, "metrics_endpoint_enabled", True, raising=False)
    init_db()
    yield


# --- helpers ---------------------------------------------------------------


def _insert(sql, params):
    """Run one INSERT on a fresh connection, commit it, and return lastrowid.

    The connection is closed even if the statement raises, so a failing
    insert cannot leak an open handle into later tests.
    """
    with closing(get_db()) as conn:
        cur = conn.execute(sql, params)
        rowid = cur.lastrowid
        conn.commit()
    return rowid


def _dead_pid(start=999999):
    """Return a pid (>= ``start``) that has no /proc entry.

    A fixed pid such as 999999 can belong to a live process on hosts where
    kernel.pid_max is raised (up to 4194304 on 64-bit Linux), which would make
    the dead-pid assertions flaky. On hosts without /proc this returns
    ``start`` unchanged.
    """
    pid = start
    while os.path.exists(f"/proc/{pid}"):
        pid += 1
    return pid


def _make_task(work_item_id="ORCH-1", repo="orchestrator", branch="feature/x",
               stage="development"):
    """Insert a ``tasks`` row (plane_id mirrors work_item_id); return its id."""
    return _insert(
        "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
        "VALUES (?, ?, ?, ?, ?)",
        (work_item_id, work_item_id, repo, branch, stage),
    )


def _make_agent_run(agent="developer", task_id=None, model="claude-opus-4-8",
                    effort="xhigh", cost_usd=None, input_tokens=None,
                    output_tokens=None, cache_read_tokens=None,
                    cache_creation_tokens=None, finished=False):
    """Insert an ``agent_runs`` row and return its id.

    ``finished=True`` stamps finished_at with the DB's current time; otherwise
    finished_at is left NULL (a still-running run).
    """
    # Only a fixed SQL fragment is spliced in; every value is still bound.
    finished_at_sql = "datetime('now')" if finished else "NULL"
    return _insert(
        "INSERT INTO agent_runs (task_id, agent, model, effort, cost_usd, "
        "input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, "
        "finished_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, "
        + finished_at_sql + ")",
        (task_id, agent, model, effort, cost_usd, input_tokens, output_tokens,
         cache_read_tokens, cache_creation_tokens),
    )


def _make_running_job(agent="developer", repo="orchestrator", task_id=None,
                      pid=None, run_id=None, age_s=0, attempts=0,
                      max_attempts=2):
    """Insert a ``jobs`` row in status 'running', started ``age_s`` seconds ago.

    Returns the job id.
    """
    return _insert(
        "INSERT INTO jobs (agent, repo, task_id, status, attempts, max_attempts, "
        "run_id, pid, started_at) "
        "VALUES (?, ?, ?, 'running', ?, ?, ?, ?, datetime('now', ?))",
        (agent, repo, task_id, attempts, max_attempts, run_id, pid,
         f"-{int(age_s)} seconds"),
    )


def _db_snapshot():
    """Full row snapshot of the mutable tables for the read-only assertion."""
    snap = {}
    with closing(get_db()) as conn:
        # Table names come from this fixed tuple, never from input.
        for table in ("tasks", "jobs", "agent_runs"):
            rows = conn.execute(f"SELECT * FROM {table} ORDER BY id").fetchall()
            snap[table] = [dict(r) for r in rows]
    return snap


# --- TC-01: envelope keys --------------------------------------------------


def test_tc01_envelope_has_all_sections():
    m = metrics_mod.build_metrics()
    assert isinstance(m, dict)
    for key in ("schema_version", "generated_at", "stages", "queue", "agents", "cost"):
        assert key in m, f"missing envelope key {key!r}"
    assert m["schema_version"] == 1
    assert isinstance(m["stages"], list)
    assert isinstance(m["agents"], list)
    assert isinstance(m["queue"], dict)
    assert isinstance(m["cost"], dict)


# --- TC-02: stages section + terminal exclusion ----------------------------


def test_tc02_stages_active_only_with_fields():
    _make_task(work_item_id="ORCH-10", stage="development", repo="orchestrator")
    _make_task(work_item_id="ORCH-11", stage="done")       # terminal -> excluded
    _make_task(work_item_id="ORCH-12", stage="cancelled")  # terminal -> excluded
    stages = metrics_mod.build_metrics()["stages"]
    wis = {s["work_item"] for s in stages}
    assert "ORCH-10" in wis
    assert "ORCH-11" not in wis
    assert "ORCH-12" not in wis
    item = next(s for s in stages if s["work_item"] == "ORCH-10")
    assert item["stage"] == "development"
    assert item["repo"] == "orchestrator"
    assert isinstance(item["age_in_stage_s"], int)


# --- TC-03: queue section --------------------------------------------------


def test_tc03_queue_section_fields():
    q = metrics_mod.build_metrics()["queue"]
    assert "counts" in q
    counts = q["counts"]
    for k in ("queued", "running", "failed", "cancelled"):
        assert k in counts
    assert q["max_concurrency"] is not None
    assert "retries" in q and isinstance(q["retries"], dict)
    assert "in_backoff" in q["retries"]
    # breaker snapshot present (worker is the module singleton, initialised)
    assert q["breaker"] is not None
    for k in ("state", "consecutive_transient", "pause_remaining_s"):
        assert k in q["breaker"]


# --- TC-04: agents liveness section ----------------------------------------


def test_tc04_agents_liveness_fields():
    tid = _make_task(work_item_id="ORCH-20")
    rid = _make_agent_run(task_id=tid, model="claude-opus-4-8", effort="xhigh")
    # use our own (alive) pid so cpu_ticks is a real integer
    _make_running_job(task_id=tid, pid=os.getpid(), run_id=rid, age_s=5)
    # One envelope for every assertion, so agents and clk_tck come from the
    # same snapshot.
    m = metrics_mod.build_metrics()
    agents = m["agents"]
    assert len(agents) == 1
    a = agents[0]
    for k in ("agent", "run_id", "job_id", "pid", "runtime_s", "model", "effort", "cpu_ticks"):
        assert k in a, f"agent entry missing {k!r}"
    assert a["agent"] == "developer"
    assert a["run_id"] == rid
    assert a["pid"] == os.getpid()
    assert isinstance(a["runtime_s"], int)
    # alive pid -> real cpu ticks (int), basis present at envelope level
    assert isinstance(a["cpu_ticks"], int)
    assert m["clk_tck"] is not None


# --- TC-05: agent-liveness never-raise on dead/None pid --------------------


def test_tc05_dead_or_none_pid_cpu_ticks_null():
    tid = _make_task(work_item_id="ORCH-21")
    rid = _make_agent_run(task_id=tid)
    # pid=None -> cpu_ticks null; a pid with no /proc entry -> null
    _make_running_job(task_id=tid, pid=None, run_id=rid)
    _make_running_job(task_id=tid, pid=_dead_pid(), run_id=rid)
    m = metrics_mod.build_metrics()
    agents = m["agents"]
    assert len(agents) == 2
    for a in agents:
        assert a["cpu_ticks"] is None       # field degraded, not an exception
        assert a["agent"] == "developer"    # other fields intact
    # whole envelope still valid
    assert m["schema_version"] == 1


def test_tc05_read_cpu_ticks_helper_none_paths():
    assert metrics_mod._read_cpu_ticks(None) is None
    assert metrics_mod._read_cpu_ticks(_dead_pid()) is None
    # alive pid (this process) -> int
    assert isinstance(metrics_mod._read_cpu_ticks(os.getpid()), int)


# --- TC-06: cost aggregate -------------------------------------------------


def test_tc06_cost_aggregate_sums_and_empty_zeros():
    # empty agent_runs -> zeros, not error
    agg0 = metrics_mod.build_metrics()["cost"]["aggregate"]
    for k in ("cost_usd", "input_tokens", "output_tokens",
              "cache_read_tokens", "cache_creation_tokens"):
        assert agg0[k] == 0
    tid = _make_task(work_item_id="ORCH-30")
    _make_agent_run(task_id=tid, cost_usd=1.5, input_tokens=100, output_tokens=20,
                    cache_read_tokens=5, cache_creation_tokens=7, finished=True)
    _make_agent_run(task_id=tid, cost_usd=2.5, input_tokens=200, output_tokens=30,
                    cache_read_tokens=10, cache_creation_tokens=3, finished=True)
    agg = metrics_mod.build_metrics()["cost"]["aggregate"]
    assert agg["cost_usd"] == 4.0
    assert agg["input_tokens"] == 300
    assert agg["output_tokens"] == 50
    assert agg["cache_read_tokens"] == 15
    assert agg["cache_creation_tokens"] == 10


# --- TC-07: never-raise when a section source throws -----------------------


def test_tc07_section_source_throws_degrades_not_500(monkeypatch):
    def _boom(*a, **k):
        raise RuntimeError("simulated source failure")

    # queue counts source throws -> queue.counts null, build_metrics still returns
    monkeypatch.setattr(db, "job_status_counts", _boom)
    # cost aggregate source throws -> cost.aggregate null
    monkeypatch.setattr(db, "agent_cost_totals", _boom)
    # stages source throws -> stages []
    monkeypatch.setattr(db, "get_active_tasks_for_reconcile", _boom)
    m = metrics_mod.build_metrics()
    assert m["schema_version"] == 1  # never raised
    assert m["stages"] == []
    assert m["queue"]["counts"] is None
    assert m["cost"]["aggregate"] is None


def test_tc07_breaker_unavailable_is_null(monkeypatch):
    from src import queue_worker

    def _broken_snapshot(*a, **k):
        raise RuntimeError("no breaker")

    # simulate an uninitialised / broken worker breaker
    monkeypatch.setattr(queue_worker.worker.breaker, "snapshot", _broken_snapshot)
    q = metrics_mod.build_metrics()["queue"]
    assert q["breaker"] is None  # null, not 500


# --- TC-08: GET /metrics via handler returns valid JSON --------------------


def test_tc08_endpoint_returns_full_payload():
    tid = _make_task(work_item_id="ORCH-40")
    rid = _make_agent_run(task_id=tid)
    _make_running_job(task_id=tid, pid=os.getpid(), run_id=rid)
    from src import main
    payload = asyncio.run(main.metrics())
    assert payload["schema_version"] == 1
    assert isinstance(payload["stages"], list) and len(payload["stages"]) == 1
    assert isinstance(payload["agents"], list) and len(payload["agents"]) == 1
    assert "aggregate" in payload["cost"]
    assert "counts" in payload["queue"]


def test_tc08_kill_switch_minimal_body(monkeypatch):
    monkeypatch.setattr(cfg.settings, "metrics_endpoint_enabled", False, raising=False)
    from src import main
    payload = asyncio.run(main.metrics())
    assert payload == {"schema_version": 1, "enabled": False}


# --- TC-09: read-only invariant --------------------------------------------


def test_tc09_metrics_is_read_only():
    tid = _make_task(work_item_id="ORCH-50")
    rid = _make_agent_run(task_id=tid, cost_usd=1.0, input_tokens=10)
    _make_running_job(task_id=tid, pid=os.getpid(), run_id=rid)
    from src import main
    before = _db_snapshot()
    asyncio.run(main.metrics())
    asyncio.run(main.metrics())  # repeat: state must not change
    after = _db_snapshot()
    assert before == after, "/metrics must not mutate any DB state"


# --- TC-10: additivity vs existing endpoints -------------------------------


def test_tc10_existing_endpoints_intact():
    from src import main
    health = asyncio.run(main.health())
    assert health["status"] == "ok"
    status = asyncio.run(main.status())
    assert "active_tasks" in status
    queue = asyncio.run(main.queue())
    for key in ("counts", "max_concurrency", "poll_interval", "resilience",
                "reconcile", "reaper", "serial_gate", "recent"):
        assert key in queue, f"/queue lost existing key {key!r}"


# --- TC-11: empty state is valid -------------------------------------------


def test_tc11_empty_state_valid():
    m = metrics_mod.build_metrics()
    assert m["stages"] == []
    assert m["agents"] == []
    assert m["cost"]["running"] == []
    agg = m["cost"]["aggregate"]
    assert all(agg[k] == 0 for k in agg)
    counts = m["queue"]["counts"]
    assert counts["queued"] == 0 and counts["running"] == 0