feat(metrics): lightweight read-only GET /metrics raw-signal endpoint (ORCH-099)
FND/F1a: add a versioned read-only JSON endpoint GET /metrics that exposes the
orchestrator's own raw state for the future observability sidecar F1b — active
task stages, job queue, agent-liveness (pid/runtime/cpu_ticks), and cost/tokens.
The orchestrator emits ONLY raw signal it alone knows; thresholds/alerts/history
live in the separate sidecar (observer separated from observed, BRD §1).
- src/metrics.py: new leaf collector build_metrics() (never-raise per section,
serial_gate.snapshot() pattern); envelope schema_version/generated_at/clk_tck +
stages/queue/agents/cost. _read_cpu_ticks(pid) reads utime+stime from
/proc/<pid>/stat (null on None/dead/non-Linux pid — never raises).
- src/main.py: thin @app.get("/metrics") wrapper (style of GET /queue).
- src/db.py: read-only helpers get_running_agents() (dedicated SELECT, not an
extension of the hot-path get_running_jobs()), agent_cost_totals(),
queue_retry_stats(); job_status_counts() default dict gains the cancelled key.
- src/config.py: metrics_endpoint_enabled kill-switch (default True), env
ORCH_METRICS_ENABLED via explicit validation_alias so the documented switch
actually controls the flag.
- docs: README API table row + CHANGELOG entry (contract section already added
by architect); .env.example ORCH_METRICS_ENABLED.
Strictly read-only / never-raise: STAGE_TRANSITIONS / QG_CHECKS / check_* /
machine-verdict keys / DB schema untouched; /health//status//queue byte-for-byte.
Tests: tests/test_metrics.py (TC-01..TC-11) + env-alias tests in test_config.py.
Full suite green (1482).
Refs: ORCH-099
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
105
src/db.py
105
src/db.py
@@ -1133,6 +1133,100 @@ def get_running_jobs() -> list[dict]:
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
|
||||
def get_running_agents() -> list[dict]:
|
||||
"""ORCH-099 (D5): read-only liveness snapshot of every 'running' job for /metrics.
|
||||
|
||||
A dedicated read-only SELECT — deliberately NOT an extension of
|
||||
``get_running_jobs()`` (the job-reaper hot path, ORCH-065): widening that
|
||||
query under observability needs would migrate a foreign component's invariant.
|
||||
Each row carries the process identity + cost context the F1b sidecar needs:
|
||||
* ``job_id`` / ``run_id`` / ``pid`` — process identity (pid may be NULL until
|
||||
the launcher stamps it / after the process exits);
|
||||
* ``agent`` / ``repo`` — role and project (the sidecar is multi-project);
|
||||
* ``running_age_s`` — seconds since ``jobs.started_at`` (the same process
|
||||
anchor the reaper uses for backstop-liveness, D6);
|
||||
* ``model`` / ``effort`` — cost context (LEFT JOIN ``agent_runs``);
|
||||
* the token / ``cost_usd`` columns — current per-run accruals, usually NULL
|
||||
until the launcher parses the CLI result JSON on finish (honest raw, TR-5).
|
||||
|
||||
A LEFT JOIN on ``run_id`` keeps a job with no ``agent_runs`` row. Read-only;
|
||||
never mutates.
|
||||
"""
|
||||
conn = get_db()
|
||||
try:
|
||||
rows = conn.execute(
|
||||
"SELECT j.id AS job_id, j.run_id AS run_id, j.pid AS pid, "
|
||||
"j.agent AS agent, j.repo AS repo, j.started_at AS started_at, "
|
||||
"CAST(strftime('%s','now') - strftime('%s', j.started_at) AS INTEGER) "
|
||||
" AS running_age_s, "
|
||||
"r.model AS model, r.effort AS effort, r.cost_usd AS cost_usd, "
|
||||
"r.input_tokens AS input_tokens, r.output_tokens AS output_tokens, "
|
||||
"r.cache_read_tokens AS cache_read_tokens, "
|
||||
"r.cache_creation_tokens AS cache_creation_tokens "
|
||||
"FROM jobs j LEFT JOIN agent_runs r ON r.id = j.run_id "
|
||||
"WHERE j.status='running'"
|
||||
).fetchall()
|
||||
finally:
|
||||
conn.close()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
|
||||
def agent_cost_totals() -> dict:
|
||||
"""ORCH-099 (D7): read-only aggregate of cost / tokens over all agent_runs.
|
||||
|
||||
Pure ``SELECT COALESCE(SUM(...),0)`` — an empty ``agent_runs`` table yields
|
||||
zeros, never an error (TC-06 / TC-11). Read-only; never mutates.
|
||||
"""
|
||||
conn = get_db()
|
||||
try:
|
||||
row = conn.execute(
|
||||
"SELECT "
|
||||
"COALESCE(SUM(cost_usd),0) AS cost_usd, "
|
||||
"COALESCE(SUM(input_tokens),0) AS input_tokens, "
|
||||
"COALESCE(SUM(output_tokens),0) AS output_tokens, "
|
||||
"COALESCE(SUM(cache_read_tokens),0) AS cache_read_tokens, "
|
||||
"COALESCE(SUM(cache_creation_tokens),0) AS cache_creation_tokens "
|
||||
"FROM agent_runs"
|
||||
).fetchone()
|
||||
finally:
|
||||
conn.close()
|
||||
return dict(row) if row else {
|
||||
"cost_usd": 0,
|
||||
"input_tokens": 0,
|
||||
"output_tokens": 0,
|
||||
"cache_read_tokens": 0,
|
||||
"cache_creation_tokens": 0,
|
||||
}
|
||||
|
||||
|
||||
def queue_retry_stats() -> dict:
|
||||
"""ORCH-099 (D4): read-only retry raw over UNFINISHED jobs for /metrics.queue.
|
||||
|
||||
Aggregates ``attempts`` / ``transient_attempts`` and counts jobs currently in
|
||||
backoff (``available_at > now``) across non-terminal jobs (status NOT IN
|
||||
done/failed/cancelled). Read-only; never mutates.
|
||||
"""
|
||||
conn = get_db()
|
||||
try:
|
||||
row = conn.execute(
|
||||
"SELECT "
|
||||
"COALESCE(SUM(attempts),0) AS total_attempts, "
|
||||
"COALESCE(SUM(transient_attempts),0) AS total_transient_attempts, "
|
||||
"COALESCE(MAX(attempts),0) AS max_attempts_seen, "
|
||||
"COALESCE(SUM(CASE WHEN available_at IS NOT NULL "
|
||||
" AND available_at > datetime('now') THEN 1 ELSE 0 END),0) AS in_backoff "
|
||||
"FROM jobs WHERE status NOT IN ('done','failed','cancelled')"
|
||||
).fetchone()
|
||||
finally:
|
||||
conn.close()
|
||||
return dict(row) if row else {
|
||||
"total_attempts": 0,
|
||||
"total_transient_attempts": 0,
|
||||
"max_attempts_seen": 0,
|
||||
"in_backoff": 0,
|
||||
}
|
||||
|
||||
|
||||
def reap_running_job(
|
||||
job_id: int,
|
||||
status: str,
|
||||
@@ -1185,13 +1279,20 @@ def get_job(job_id: int) -> dict | None:
|
||||
|
||||
|
||||
def job_status_counts() -> dict:
|
||||
"""Return counts grouped by status (for /queue observability)."""
|
||||
"""Return counts grouped by status (for /queue and /metrics observability).
|
||||
|
||||
ORCH-099 (D4): the default dict carries the ``cancelled`` terminal key
|
||||
(ORCH-090, terminal set ``{done, cancelled}``) so the key is always present
|
||||
with a 0 default instead of materialising only when a cancelled job exists.
|
||||
Purely additive — the GROUP BY query is unchanged and pre-existing keys keep
|
||||
their meaning (no /queue contract break).
|
||||
"""
|
||||
conn = get_db()
|
||||
rows = conn.execute(
|
||||
"SELECT status, COUNT(*) AS n FROM jobs GROUP BY status"
|
||||
).fetchall()
|
||||
conn.close()
|
||||
counts = {"queued": 0, "running": 0, "done": 0, "failed": 0}
|
||||
counts = {"queued": 0, "running": 0, "done": 0, "failed": 0, "cancelled": 0}
|
||||
for r in rows:
|
||||
counts[r["status"]] = r["n"]
|
||||
return counts
|
||||
|
||||
Reference in New Issue
Block a user