FND/F1a: add a versioned read-only JSON endpoint GET /metrics that exposes the
orchestrator's own raw state for the future observability sidecar F1b — active
task stages, job queue, agent-liveness (pid/runtime/cpu_ticks), and cost/tokens.
The orchestrator emits ONLY raw signal it alone knows; thresholds/alerts/history
live in the separate sidecar (observer separated from observed, BRD §1).
- src/metrics.py: new leaf collector build_metrics() (never-raise per section,
serial_gate.snapshot() pattern); envelope schema_version/generated_at/clk_tck +
stages/queue/agents/cost. _read_cpu_ticks(pid) reads utime+stime from
/proc/<pid>/stat (null on None/dead/non-Linux pid — never raises).
- src/main.py: thin @app.get("/metrics") wrapper (style of GET /queue).
- src/db.py: read-only helpers get_running_agents() (dedicated SELECT, not an
extension of the hot-path get_running_jobs()), agent_cost_totals(),
queue_retry_stats(); job_status_counts() default dict gains the cancelled key.
- src/config.py: metrics_endpoint_enabled kill-switch (default True), env
ORCH_METRICS_ENABLED via explicit validation_alias so the documented switch
actually controls the flag.
- docs: README API table row + CHANGELOG entry (the contract section was already
added by the architect); ORCH_METRICS_ENABLED documented in .env.example.
Strictly read-only / never-raise: STAGE_TRANSITIONS, QG_CHECKS, check_*, the
machine-verdict keys and the DB schema are untouched; the /health, /status and
/queue responses are unchanged byte-for-byte.
Tests: tests/test_metrics.py (TC-01..TC-11) + env-alias tests in test_config.py.
Full suite green (1482 tests passing).
Refs: ORCH-099
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
296 lines
12 KiB
Python
"""ORCH-099 (FND/F1a) — read-only GET /metrics raw-signal endpoint.
|
|
|
|
Covers the four-section envelope (TC-01..TC-04/TC-08/TC-11), never-raise by
|
|
section/field (TC-05/TC-07), the cost aggregate (TC-06), read-only invariant
|
|
(TC-09), and additivity vs /health//status//queue (TC-10).
|
|
|
|
Pattern mirrors tests/test_queue_endpoint.py: the async handler is driven via
|
|
asyncio.run(main.metrics()); the autouse conftest mutes Telegram; a per-test
|
|
fresh_db points settings.db_path at a tmp file + init_db.
|
|
"""
|
|
import asyncio
|
|
import os
|
|
|
|
import pytest
|
|
|
|
import src.db as db # noqa: E402
|
|
from src.db import get_db, init_db # noqa: E402
|
|
from src import config as cfg # noqa: E402
|
|
from src import metrics as metrics_mod # noqa: E402
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
    """Give every test in this module its own throw-away SQLite database.

    The settings DB path is redirected into pytest's ``tmp_path``, the
    /metrics kill-switch is forced on (``raising=False`` tolerates the
    attribute not being declared on the settings object), and the schema is
    created with ``init_db()`` before the test body runs.
    """
    monkeypatch.setattr(db.settings, "db_path", str(tmp_path / "metrics.db"))
    monkeypatch.setattr(cfg.settings, "metrics_endpoint_enabled", True, raising=False)
    init_db()
    yield


# --- helpers ---------------------------------------------------------------
def _make_task(work_item_id="ORCH-1", repo="orchestrator",
               branch="feature/x", stage="development"):
    """Insert one ``tasks`` row and return its rowid.

    ``work_item_id`` is written to both ``plane_id`` and ``work_item_id``.
    The connection is closed even if the INSERT/commit fails, so a failing
    setup step does not leak a handle on the per-test DB file.
    """
    conn = get_db()
    try:
        cur = conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
            "VALUES (?, ?, ?, ?, ?)",
            (work_item_id, work_item_id, repo, branch, stage),
        )
        task_id = cur.lastrowid
        conn.commit()
        return task_id
    finally:
        conn.close()


def _make_agent_run(agent="developer", task_id=None, model="claude-opus-4-8",
                    effort="xhigh", cost_usd=None, input_tokens=None,
                    output_tokens=None, cache_read_tokens=None,
                    cache_creation_tokens=None, finished=False):
    """Insert one ``agent_runs`` row and return its rowid.

    ``finished=True`` stamps ``finished_at`` with SQLite's ``datetime('now')``;
    otherwise ``finished_at`` is NULL (a still-running run). The connection is
    closed even if the INSERT/commit fails.
    """
    # Fixed SQL fragment chosen from two literals, never from caller input,
    # so splicing it into the statement text is safe.
    finished_at_sql = "datetime('now')" if finished else "NULL"
    conn = get_db()
    try:
        cur = conn.execute(
            "INSERT INTO agent_runs (task_id, agent, model, effort, cost_usd, "
            "input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, "
            "finished_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, "
            + finished_at_sql + ")",
            (task_id, agent, model, effort, cost_usd, input_tokens, output_tokens,
             cache_read_tokens, cache_creation_tokens),
        )
        run_id = cur.lastrowid
        conn.commit()
        return run_id
    finally:
        conn.close()


def _make_running_job(agent="developer", repo="orchestrator", task_id=None,
                      pid=None, run_id=None, age_s=0, attempts=0, max_attempts=2):
    """Insert a ``jobs`` row with status ``'running'`` and return its rowid.

    ``started_at`` is backdated by ``int(age_s)`` seconds using SQLite's
    ``datetime('now', '-N seconds')`` modifier, so runtime-style fields can
    be exercised deterministically. The connection is closed even if the
    INSERT/commit fails.
    """
    conn = get_db()
    try:
        cur = conn.execute(
            "INSERT INTO jobs (agent, repo, task_id, status, attempts, max_attempts, "
            "run_id, pid, started_at) "
            "VALUES (?, ?, ?, 'running', ?, ?, ?, ?, datetime('now', ?))",
            (agent, repo, task_id, attempts, max_attempts, run_id, pid,
             f"-{int(age_s)} seconds"),
        )
        job_id = cur.lastrowid
        conn.commit()
        return job_id
    finally:
        conn.close()


def _db_snapshot():
    """Full row snapshot of the mutable tables for the read-only assertion.

    Returns ``{table_name: [row_dict, ...]}`` for ``tasks``, ``jobs`` and
    ``agent_runs``, each ordered by ``id`` so two snapshots compare equal
    iff the stored rows are identical. The connection is always closed.
    """
    conn = get_db()
    try:
        snap = {}
        # Table names come from this fixed tuple, never from input, so the
        # f-string interpolation cannot inject SQL.
        for table in ("tasks", "jobs", "agent_runs"):
            rows = conn.execute(f"SELECT * FROM {table} ORDER BY id").fetchall()
            # NOTE(review): dict(r) assumes get_db() installs a mapping row
            # factory (e.g. sqlite3.Row) — confirm in src/db.py.
            snap[table] = [dict(r) for r in rows]
        return snap
    finally:
        conn.close()


# --- TC-01: envelope keys --------------------------------------------------
def test_tc01_envelope_has_all_sections():
    """The envelope carries every documented top-level key with the right types.

    ``clk_tck`` belongs to the envelope contract (it is the basis for the
    per-agent ``cpu_ticks``), so its presence is pinned here alongside the
    four data sections.
    """
    m = metrics_mod.build_metrics()
    assert isinstance(m, dict)
    for key in ("schema_version", "generated_at", "clk_tck",
                "stages", "queue", "agents", "cost"):
        assert key in m, f"missing envelope key {key!r}"
    assert m["schema_version"] == 1
    assert isinstance(m["stages"], list)
    assert isinstance(m["agents"], list)
    assert isinstance(m["queue"], dict)
    assert isinstance(m["cost"], dict)


# --- TC-02: stages section + terminal exclusion ----------------------------
def test_tc02_stages_active_only_with_fields():
    """Only non-terminal tasks are listed in ``stages``, each with core fields."""
    _make_task(work_item_id="ORCH-10", stage="development", repo="orchestrator")
    # terminal stages -> must be excluded from the section
    for terminal_id, terminal_stage in (("ORCH-11", "done"), ("ORCH-12", "cancelled")):
        _make_task(work_item_id=terminal_id, stage=terminal_stage)

    stages = metrics_mod.build_metrics()["stages"]
    reported = {entry["work_item"] for entry in stages}
    assert "ORCH-10" in reported
    assert not reported & {"ORCH-11", "ORCH-12"}

    active = [entry for entry in stages if entry["work_item"] == "ORCH-10"][0]
    assert active["stage"] == "development"
    assert active["repo"] == "orchestrator"
    assert isinstance(active["age_in_stage_s"], int)


# --- TC-03: queue section --------------------------------------------------
def test_tc03_queue_section_fields():
    """``queue`` exposes status counts, concurrency, retry stats and breaker."""
    queue_section = metrics_mod.build_metrics()["queue"]

    assert "counts" in queue_section
    assert {"queued", "running", "failed", "cancelled"} <= set(queue_section["counts"])

    assert queue_section["max_concurrency"] is not None

    retries = queue_section.get("retries") if "retries" in queue_section else None
    assert "retries" in queue_section and isinstance(retries, dict)
    assert "in_backoff" in retries

    # breaker snapshot present (worker is the module singleton, initialised)
    breaker = queue_section["breaker"]
    assert breaker is not None
    assert {"state", "consecutive_transient", "pause_remaining_s"} <= set(breaker)


# --- TC-04: agents liveness section ----------------------------------------
def test_tc04_agents_liveness_fields():
    """A running job surfaces as one ``agents`` entry with liveness fields.

    ``cpu_ticks`` is read from ``/proc/<pid>/stat``: it is a real integer
    where procfs exists (Linux) and degrades to null elsewhere. The test
    asserts whichever outcome the host supports instead of failing on
    non-Linux developer machines.
    """
    tid = _make_task(work_item_id="ORCH-20")
    rid = _make_agent_run(task_id=tid, model="claude-opus-4-8", effort="xhigh")
    my_pid = os.getpid()
    # use our own (alive) pid so cpu_ticks refers to a readable process
    _make_running_job(task_id=tid, pid=my_pid, run_id=rid, age_s=5)

    # One snapshot for every assertion, so agents and clk_tck are consistent.
    m = metrics_mod.build_metrics()
    agents = m["agents"]
    assert len(agents) == 1
    a = agents[0]
    for k in ("agent", "run_id", "job_id", "pid", "runtime_s", "model", "effort", "cpu_ticks"):
        assert k in a, f"agent entry missing {k!r}"
    assert a["agent"] == "developer"
    assert a["run_id"] == rid
    assert a["pid"] == my_pid
    assert isinstance(a["runtime_s"], int)

    if os.path.exists(f"/proc/{my_pid}/stat"):
        # alive pid with procfs -> real cpu ticks (int)
        assert isinstance(a["cpu_ticks"], int)
    else:
        # no procfs (non-Linux) -> documented null degradation, not an error
        assert a["cpu_ticks"] is None
    # the tick basis is reported once, at envelope level
    assert m["clk_tck"] is not None


# --- TC-05: agent-liveness never-raise on dead/None pid --------------------
def test_tc05_dead_or_none_pid_cpu_ticks_null():
    """Jobs whose pid is None or not running get ``cpu_ticks`` null, not an error."""
    tid = _make_task(work_item_id="ORCH-21")
    rid = _make_agent_run(task_id=tid)

    # Probe for a pid with no /proc entry rather than trusting a fixed number:
    # 64-bit Linux allows pid_max up to 4194304, so 999999 can be a live pid.
    # Without procfs (non-Linux) the loop exits at once and the pid is unreadable.
    dead_pid = 999999
    while os.path.exists(f"/proc/{dead_pid}"):
        dead_pid += 1

    _make_running_job(task_id=tid, pid=None, run_id=rid)
    _make_running_job(task_id=tid, pid=dead_pid, run_id=rid)

    m = metrics_mod.build_metrics()
    agents = m["agents"]
    assert len(agents) == 2
    for a in agents:
        assert a["cpu_ticks"] is None  # field degraded, not an exception
        assert a["agent"] == "developer"  # other fields intact
    # whole envelope still valid
    assert m["schema_version"] == 1


def test_tc05_read_cpu_ticks_helper_none_paths():
    """``_read_cpu_ticks`` returns None for None/dead pids and an int when readable."""
    assert metrics_mod._read_cpu_ticks(None) is None

    # A pid with no /proc entry (see TC-05 above for why 999999 alone is unsafe).
    dead_pid = 999999
    while os.path.exists(f"/proc/{dead_pid}"):
        dead_pid += 1
    assert metrics_mod._read_cpu_ticks(dead_pid) is None

    my_pid = os.getpid()
    ticks = metrics_mod._read_cpu_ticks(my_pid)
    if os.path.exists(f"/proc/{my_pid}/stat"):
        # alive pid (this process) with procfs -> int
        assert isinstance(ticks, int)
    else:
        # no procfs (non-Linux) -> None, never an exception
        assert ticks is None


# --- TC-06: cost aggregate -------------------------------------------------
def test_tc06_cost_aggregate_sums_and_empty_zeros():
    """``cost.aggregate`` is all-zero on an empty DB and sums finished runs."""
    fields = ("cost_usd", "input_tokens", "output_tokens",
              "cache_read_tokens", "cache_creation_tokens")

    # empty agent_runs -> zeros, not error
    empty = metrics_mod.build_metrics()["cost"]["aggregate"]
    for field in fields:
        assert empty[field] == 0

    tid = _make_task(work_item_id="ORCH-30")
    runs = (
        {"cost_usd": 1.5, "input_tokens": 100, "output_tokens": 20,
         "cache_read_tokens": 5, "cache_creation_tokens": 7},
        {"cost_usd": 2.5, "input_tokens": 200, "output_tokens": 30,
         "cache_read_tokens": 10, "cache_creation_tokens": 3},
    )
    for run in runs:
        _make_agent_run(task_id=tid, finished=True, **run)

    expected = {"cost_usd": 4.0, "input_tokens": 300, "output_tokens": 50,
                "cache_read_tokens": 15, "cache_creation_tokens": 10}
    totals = metrics_mod.build_metrics()["cost"]["aggregate"]
    for field, value in expected.items():
        assert totals[field] == value


# --- TC-07: never-raise when a section source throws -----------------------
def test_tc07_section_source_throws_degrades_not_500(monkeypatch):
    """A throwing data source nulls/empties its section; the envelope survives."""
    def _boom(*args, **kwargs):
        raise RuntimeError("simulated source failure")

    # job_status_counts      -> queue.counts null
    # agent_cost_totals      -> cost.aggregate null
    # get_active_tasks_for_reconcile -> stages []
    for source_name in ("job_status_counts", "agent_cost_totals",
                        "get_active_tasks_for_reconcile"):
        monkeypatch.setattr(db, source_name, _boom)

    m = metrics_mod.build_metrics()
    assert m["schema_version"] == 1  # never raised
    assert m["stages"] == []
    assert m["queue"]["counts"] is None
    assert m["cost"]["aggregate"] is None


def test_tc07_breaker_unavailable_is_null(monkeypatch):
    """A breaker whose snapshot() raises is reported as null, not a 500."""
    from src import queue_worker

    def _broken_snapshot():
        raise RuntimeError("no breaker")

    # simulate an uninitialised / broken worker breaker
    monkeypatch.setattr(queue_worker.worker.breaker, "snapshot", _broken_snapshot)

    assert metrics_mod.build_metrics()["queue"]["breaker"] is None  # null, not 500


# --- TC-08: GET /metrics via handler returns valid JSON --------------------
def test_tc08_endpoint_returns_full_payload():
    """The async handler returns the full envelope with one task and one agent."""
    task_id = _make_task(work_item_id="ORCH-40")
    run_id = _make_agent_run(task_id=task_id)
    _make_running_job(task_id=task_id, pid=os.getpid(), run_id=run_id)

    from src import main
    body = asyncio.run(main.metrics())

    assert body["schema_version"] == 1
    for section in ("stages", "agents"):
        assert isinstance(body[section], list) and len(body[section]) == 1
    assert "aggregate" in body["cost"]
    assert "counts" in body["queue"]


def test_tc08_kill_switch_minimal_body(monkeypatch):
    """With the kill-switch off the handler answers with a minimal stub body."""
    monkeypatch.setattr(cfg.settings, "metrics_endpoint_enabled", False, raising=False)
    from src import main

    expected_stub = {"schema_version": 1, "enabled": False}
    assert asyncio.run(main.metrics()) == expected_stub


# --- TC-09: read-only invariant --------------------------------------------
def test_tc09_metrics_is_read_only():
    """Calling /metrics (repeatedly) leaves every mutable table byte-identical."""
    task_id = _make_task(work_item_id="ORCH-50")
    run_id = _make_agent_run(task_id=task_id, cost_usd=1.0, input_tokens=10)
    _make_running_job(task_id=task_id, pid=os.getpid(), run_id=run_id)

    from src import main
    snapshot_before = _db_snapshot()
    for _ in range(2):  # repeat: state must not change across calls either
        asyncio.run(main.metrics())
    snapshot_after = _db_snapshot()

    assert snapshot_before == snapshot_after, "/metrics must not mutate any DB state"


# --- TC-10: additivity vs existing endpoints -------------------------------
def test_tc10_existing_endpoints_intact():
    """Adding /metrics leaves /health, /status and /queue payloads intact."""
    from src import main

    assert asyncio.run(main.health())["status"] == "ok"
    assert "active_tasks" in asyncio.run(main.status())

    queue_payload = asyncio.run(main.queue())
    legacy_keys = ("counts", "max_concurrency", "poll_interval", "resilience",
                   "reconcile", "reaper", "serial_gate", "recent")
    for key in legacy_keys:
        assert key in queue_payload, f"/queue lost existing key {key!r}"


# --- TC-11: empty state is valid -------------------------------------------
def test_tc11_empty_state_valid():
    """A fresh DB yields a valid envelope: empty lists and all-zero totals."""
    m = metrics_mod.build_metrics()
    assert m["stages"] == []
    assert m["agents"] == []
    assert m["cost"]["running"] == []

    agg = m["cost"]["aggregate"]
    # Pin the expected fields explicitly: all() over an empty dict is
    # vacuously True, so checking values alone would pass even if the
    # aggregate silently lost every field.
    expected_fields = {"cost_usd", "input_tokens", "output_tokens",
                       "cache_read_tokens", "cache_creation_tokens"}
    assert expected_fields <= set(agg)
    assert all(value == 0 for value in agg.values())

    counts = m["queue"]["counts"]
    assert counts["queued"] == 0 and counts["running"] == 0