feat(metrics): lightweight read-only GET /metrics raw-signal endpoint (ORCH-099)
All checks were successful
CI / test (push) Successful in 46s
CI / test (pull_request) Successful in 43s

FND/F1a: add a versioned read-only JSON endpoint GET /metrics that exposes the
orchestrator's own raw state for the future observability sidecar F1b — active
task stages, job queue, agent-liveness (pid/runtime/cpu_ticks), and cost/tokens.
The orchestrator emits ONLY raw signal it alone knows; thresholds/alerts/history
live in the separate sidecar (observer separated from observed, BRD §1).

- src/metrics.py: new leaf collector build_metrics() (never-raise per section,
  serial_gate.snapshot() pattern); envelope schema_version/generated_at/clk_tck +
  stages/queue/agents/cost. _read_cpu_ticks(pid) reads utime+stime from
  /proc/<pid>/stat (null on None/dead/non-Linux pid — never raises).
- src/main.py: thin @app.get("/metrics") wrapper (style of GET /queue).
- src/db.py: read-only helpers get_running_agents() (dedicated SELECT, not an
  extension of the hot-path get_running_jobs()), agent_cost_totals(),
  queue_retry_stats(); job_status_counts() default dict gains the cancelled key.
- src/config.py: metrics_endpoint_enabled kill-switch (default True), env
  ORCH_METRICS_ENABLED via explicit validation_alias so the documented switch
  actually controls the flag.
- docs: README API table row + CHANGELOG entry (contract section already added
  by architect); .env.example ORCH_METRICS_ENABLED.

Strictly read-only / never-raise: STAGE_TRANSITIONS / QG_CHECKS / check_* /
machine-verdict keys / DB schema untouched; /health//status//queue byte-for-byte.
Tests: tests/test_metrics.py (TC-01..TC-11) + env-alias tests in test_config.py.
Full suite green (1482).

Refs: ORCH-099
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-10 01:58:47 +03:00
parent ad6cd1a23a
commit 75cb3e86a4
10 changed files with 739 additions and 5 deletions

276
src/metrics.py Normal file
View File

@@ -0,0 +1,276 @@
"""ORCH-099 (FND/F1a): lightweight read-only ``/metrics`` raw-signal collector.
A leaf module that builds a versioned JSON snapshot of the orchestrator's own
raw state for the future observability sidecar (F1b, ``watchdog/``): active task
stages, the job queue, agent-liveness, and cost/tokens. The orchestrator emits
ONLY raw signal it alone knows — the sidecar is the stateful arbiter that
computes thresholds / deltas / alerts (BRD §1, observer separated from observed).
Design (ADR-001, by образцу ``serial_gate.snapshot()`` / ``cancel.snapshot()``):
* pure, never-raise, no side effects — only reads existing tables
(``tasks`` / ``jobs`` / ``agent_runs``) and the in-memory worker snapshot;
* ``build_metrics()`` assembles the envelope section-by-section, each section in
its own ``try/except`` with a safe default (``None`` / ``[]`` / ``{}``) so a
failing source degrades one field, never the whole endpoint (FR-6, NFR-2);
* strictly read-only — no INSERT/UPDATE/DELETE/CREATE/ALTER, no process control,
no network. Self-hosting-safe on the shared prod instance.
The endpoint ``GET /metrics`` (``src/main.py``) is a thin wrapper that returns
``build_metrics()`` as-is.
"""
from __future__ import annotations
import logging
import os
from datetime import datetime, timezone
logger = logging.getLogger("orchestrator.metrics")
# Contract version for the sidecar (D2). Additive changes (new field/section) do
# NOT bump it — the sidecar MUST ignore unknown keys and tolerate missing
# optional ones. Bumped ONLY on a breaking change (rename/remove/retype an
# existing field).
SCHEMA_VERSION = 1
def _now_iso() -> str:
"""UTC ISO-8601 snapshot timestamp (``...Z``), the orchestrator's own clock.
Same clock domain as the SQLite ``datetime('now')`` timestamps and the CPU
tick reads, so the sidecar's ``(cpu_ticks, generated_at)`` deltas are immune
to orchestrator↔sidecar clock skew (TR-3). Never raises.
"""
try:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
except Exception as e: # noqa: BLE001 - never-raise
logger.warning("metrics._now_iso error: %s", e)
return ""
def _clk_tck() -> int | None:
"""Process-global SC_CLK_TCK (ticks/second) — the basis for converting raw CPU
ticks to seconds on the sidecar side. ``None`` on non-Linux / failure.
"""
try:
return int(os.sysconf("SC_CLK_TCK"))
except Exception as e: # noqa: BLE001 - never-raise (non-Linux / unsupported)
logger.warning("metrics._clk_tck error: %s", e)
return None
def _read_cpu_ticks(pid: int | None) -> int | None:
"""Sum of utime+stime (CPU ticks) from ``/proc/<pid>/stat`` — raw liveness signal.
The orchestrator emits raw ticks and does NOT compute the delta — the sidecar
is the stateless arbiter (it divides ``(ticks₂ticks₁)/clk_tck`` by the
``generated_at`` delta to get a CPU fraction; a tiny fraction at a growing
``runtime_s`` ⇒ a "stuck" candidate). Parsing is robust to spaces in ``comm``:
fields are read AFTER the closing ``") "`` of the process name (canonical
proc-stat read). utime = field 14, stime = field 15 → indices 11 and 12 of the
post-``)`` token list (fields 3.. shift by 3).
never-raise (NFR-2, AC-6): ``pid is None`` / missing ``/proc/<pid>`` (process
died or non-Linux) / any parse error → ``None`` (NOT an exception). The caller
keeps every other field and the whole endpoint intact.
"""
if pid is None:
return None
try:
with open(f"/proc/{int(pid)}/stat", "r") as f:
data = f.read()
rparen = data.rfind(") ")
if rparen == -1:
return None
rest = data[rparen + 2:].split()
# rest[0] = state (field 3); utime = field 14 -> rest[11], stime -> rest[12]
return int(rest[11]) + int(rest[12])
except Exception: # noqa: BLE001 - dead pid / no /proc / non-Linux -> null
return None
def _build_stages() -> list:
"""Active (non-terminal) task stages (D3, FR-1).
Source: ``db.get_active_tasks_for_reconcile()`` (``stage != 'done'`` + SQL
``age_s``), with an extra ``stage NOT IN ('done','cancelled')`` filter on the
metrics side: that helper deliberately still returns ``cancelled`` tasks for
the reconciler's skip-counter (ORCH-086), but terminal tasks are not raw
observability signal (terminal set ``{done, cancelled}``, ORCH-090). The helper
invariant belongs to ORCH-053/086 — we filter at the consumer, not the source.
"""
from . import db
rows = db.get_active_tasks_for_reconcile()
out = []
for t in rows:
if t.get("stage") in ("done", "cancelled"):
continue
out.append({
"work_item": t.get("work_item_id"),
"stage": t.get("stage"),
"age_in_stage_s": t.get("age_s"),
"repo": t.get("repo"),
"task_id": t.get("id"),
})
return out
def _build_queue() -> dict:
"""Job-queue raw signal (D4, FR-2): counts, depth, retries, breaker, concurrency.
Each sub-source is independently guarded: an uninitialised ``worker`` (e.g. in
a test) degrades to ``breaker: null`` / ``max_concurrency: null`` — never a 500
(NFR-2).
"""
from . import db
counts = None
try:
counts = db.job_status_counts()
except Exception as e: # noqa: BLE001
logger.warning("metrics queue counts error: %s", e)
retries = None
try:
retries = db.queue_retry_stats()
except Exception as e: # noqa: BLE001
logger.warning("metrics queue retries error: %s", e)
breaker = None
max_concurrency = None
poll_interval = None
try:
from .queue_worker import worker
try:
breaker = worker.breaker.snapshot()
except Exception as e: # noqa: BLE001
logger.warning("metrics breaker snapshot error: %s", e)
max_concurrency = getattr(worker, "max_concurrency", None)
poll_interval = getattr(worker, "poll_interval", None)
except Exception as e: # noqa: BLE001 - worker not initialised
logger.warning("metrics worker access error: %s", e)
depth = counts.get("queued") if isinstance(counts, dict) else None
return {
"counts": counts,
"depth": depth,
"retries": retries,
"breaker": breaker,
"max_concurrency": max_concurrency,
"poll_interval": poll_interval,
}
def _build_agents() -> list:
"""Agent-liveness raw signal (D5/D6, FR-3).
One entry per running job from ``db.get_running_agents()`` with process
identity (``agent`` / ``run_id`` / ``job_id`` / ``pid``), ``runtime_s``
(= ``running_age_s``, anchored on ``jobs.started_at``, D6), ``model`` /
``effort``, and the raw ``cpu_ticks`` from ``/proc/<pid>/stat``. ``pid is
None`` / dead process → ``cpu_ticks: null`` for THAT agent; the rest stays
intact (AC-6, TC-05).
"""
from . import db
rows = db.get_running_agents()
out = []
for j in rows:
pid = j.get("pid")
out.append({
"agent": j.get("agent"),
"run_id": j.get("run_id"),
"job_id": j.get("job_id"),
"repo": j.get("repo"),
"pid": pid,
"runtime_s": j.get("running_age_s"),
"model": j.get("model"),
"effort": j.get("effort"),
"cpu_ticks": _read_cpu_ticks(pid),
})
return out
def _build_cost() -> dict:
"""Cost / token raw signal (D7, FR-4).
``running`` — current per-running-job accruals from ``agent_runs`` (often
``null`` until the job finishes and the launcher parses the CLI JSON — ``null``
is honest raw, NOT zero, TR-5). ``aggregate`` — summed totals over all
``agent_runs`` (empty table → zeros, TC-06/TC-11).
"""
from . import db
running = []
try:
for j in db.get_running_agents():
running.append({
"run_id": j.get("run_id"),
"job_id": j.get("job_id"),
"agent": j.get("agent"),
"cost_usd": j.get("cost_usd"),
"input_tokens": j.get("input_tokens"),
"output_tokens": j.get("output_tokens"),
"cache_read_tokens": j.get("cache_read_tokens"),
"cache_creation_tokens": j.get("cache_creation_tokens"),
})
except Exception as e: # noqa: BLE001
logger.warning("metrics cost.running error: %s", e)
running = []
aggregate = None
try:
aggregate = db.agent_cost_totals()
except Exception as e: # noqa: BLE001
logger.warning("metrics cost.aggregate error: %s", e)
return {"running": running, "aggregate": aggregate}
def build_metrics() -> dict:
"""Assemble the ``/metrics`` envelope (FR-5). never-raise (FR-6, NFR-2, AC-4).
Each section is collected in its own ``try/except`` with a safe default so a
failing source degrades one section, not the whole response. Honours the
``metrics_endpoint_enabled`` kill-switch (D8): when off, returns a minimal
parsable body ``{"schema_version", "enabled": false}`` (200, NOT 404) so the
sidecar sees the off-switch explicitly.
"""
try:
from .config import settings
if not bool(getattr(settings, "metrics_endpoint_enabled", True)):
return {"schema_version": SCHEMA_VERSION, "enabled": False}
except Exception as e: # noqa: BLE001 - kill-switch read must never break /metrics
logger.warning("metrics kill-switch read error: %s", e)
out: dict = {
"schema_version": SCHEMA_VERSION,
"generated_at": _now_iso(),
"clk_tck": _clk_tck(),
}
try:
out["stages"] = _build_stages()
except Exception as e: # noqa: BLE001
logger.warning("metrics stages section error: %s", e)
out["stages"] = []
try:
out["queue"] = _build_queue()
except Exception as e: # noqa: BLE001
logger.warning("metrics queue section error: %s", e)
out["queue"] = None
try:
out["agents"] = _build_agents()
except Exception as e: # noqa: BLE001
logger.warning("metrics agents section error: %s", e)
out["agents"] = []
try:
out["cost"] = _build_cost()
except Exception as e: # noqa: BLE001
logger.warning("metrics cost section error: %s", e)
out["cost"] = None
return out