Files
orchestrator/src/metrics.py
claude-bot d8793c9698 feat(metrics): lightweight read-only GET /metrics raw-signal endpoint (ORCH-099)
FND/F1a: add a versioned read-only JSON endpoint GET /metrics that exposes the
orchestrator's own raw state for the future observability sidecar F1b — active
task stages, job queue, agent-liveness (pid/runtime/cpu_ticks), and cost/tokens.
The orchestrator emits ONLY raw signal it alone knows; thresholds/alerts/history
live in the separate sidecar (observer separated from observed, BRD §1).

- src/metrics.py: new leaf collector build_metrics() (never-raise per section,
  serial_gate.snapshot() pattern); envelope schema_version/generated_at/clk_tck +
  stages/queue/agents/cost. _read_cpu_ticks(pid) reads utime+stime from
  /proc/<pid>/stat (null on None/dead/non-Linux pid — never raises).
- src/main.py: thin @app.get("/metrics") wrapper (style of GET /queue).
- src/db.py: read-only helpers get_running_agents() (dedicated SELECT, not an
  extension of the hot-path get_running_jobs()), agent_cost_totals(),
  queue_retry_stats(); job_status_counts() default dict gains the cancelled key.
- src/config.py: metrics_endpoint_enabled kill-switch (default True), env
  ORCH_METRICS_ENABLED via explicit validation_alias so the documented switch
  actually controls the flag.
- docs: README API table row + CHANGELOG entry (contract section already added
  by architect); .env.example ORCH_METRICS_ENABLED.

Strictly read-only / never-raise: STAGE_TRANSITIONS / QG_CHECKS / check_* /
machine-verdict keys / DB schema untouched; /health, /status and /queue responses stay byte-for-byte identical.
Tests: tests/test_metrics.py (TC-01..TC-11) + env-alias tests in test_config.py.
Full suite green (1482).

Refs: ORCH-099
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 02:09:19 +03:00

277 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""ORCH-099 (FND/F1a): lightweight read-only ``/metrics`` raw-signal collector.
A leaf module that builds a versioned JSON snapshot of the orchestrator's own
raw state for the future observability sidecar (F1b, ``watchdog/``): active task
stages, the job queue, agent-liveness, and cost/tokens. The orchestrator emits
ONLY raw signal it alone knows — the sidecar is the stateful arbiter that
computes thresholds / deltas / alerts (BRD §1, observer separated from observed).
Design (ADR-001, modelled on ``serial_gate.snapshot()`` / ``cancel.snapshot()``):
* pure, never-raise, no side effects — only reads existing tables
(``tasks`` / ``jobs`` / ``agent_runs``) and the in-memory worker snapshot;
* ``build_metrics()`` assembles the envelope section-by-section, each section in
its own ``try/except`` with a safe default (``None`` / ``[]`` / ``{}``) so a
failing source degrades one field, never the whole endpoint (FR-6, NFR-2);
* strictly read-only — no INSERT/UPDATE/DELETE/CREATE/ALTER, no process control,
no network. Self-hosting-safe on the shared prod instance.
The endpoint ``GET /metrics`` (``src/main.py``) is a thin wrapper that returns
``build_metrics()`` as-is.
"""
from __future__ import annotations
import logging
import os
from datetime import datetime, timezone
# Module-level logger. The collectors in this module report a degraded source
# through it (WARNING level) instead of raising.
logger = logging.getLogger("orchestrator.metrics")
# Contract version for the sidecar (D2). Additive changes (new field/section) do
# NOT bump it — the sidecar MUST ignore unknown keys and tolerate missing
# optional ones. Bumped ONLY on a breaking change (rename/remove/retype an
# existing field).
SCHEMA_VERSION = 1
def _now_iso() -> str:
    """Return the snapshot timestamp: current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``.

    The value is taken from the orchestrator's own clock at one-second
    resolution. By design (TR-3) this is the same clock domain as the SQLite
    ``datetime('now')`` timestamps and the CPU tick reads, so deltas the sidecar
    derives from ``(cpu_ticks, generated_at)`` pairs do not depend on
    orchestrator↔sidecar clock skew.

    Returns:
        The formatted timestamp, or ``""`` if producing it fails for any reason.
        The failure is logged at WARNING level; nothing is ever raised.
    """
    stamp_format = "%Y-%m-%dT%H:%M:%SZ"
    try:
        utc_now = datetime.now(tz=timezone.utc)
        stamp = utc_now.strftime(stamp_format)
    except Exception as exc:  # noqa: BLE001 - never-raise
        logger.warning("metrics._now_iso error: %s", exc)
        return ""
    return stamp
def _clk_tck() -> int | None:
    """Return the process-global ``SC_CLK_TCK`` value (clock ticks per second).

    The sidecar uses this number to convert the raw CPU tick counts published
    by this module into seconds.

    Returns:
        The tick rate as an ``int``, or ``None`` when ``os.sysconf`` is
        unavailable or fails (e.g. on a non-Linux host). The failure is logged
        at WARNING level; nothing is ever raised.
    """
    try:
        ticks_per_second = os.sysconf("SC_CLK_TCK")
        return int(ticks_per_second)
    except Exception as exc:  # noqa: BLE001 - never-raise (non-Linux / unsupported)
        logger.warning("metrics._clk_tck error: %s", exc)
    return None
def _read_cpu_ticks(pid: int | None) -> int | None:
    """Return utime + stime (CPU ticks) of *pid* from ``/proc/<pid>/stat``.

    This is a raw liveness signal. The orchestrator publishes the tick counter
    as-is and does NOT compute a delta — the sidecar is the stateful arbiter: it
    divides ``(ticks₂ − ticks₁) / clk_tck`` by the ``generated_at`` delta to get
    a CPU fraction (a tiny fraction at a growing ``runtime_s`` ⇒ a "stuck"
    candidate).

    Parsing notes:
        * The file is read as bytes. ``comm`` (the process name) is arbitrary
          bytes chosen by the process, so a text-mode read could fail to decode
          it and report ``None`` for a process that is perfectly alive.
        * ``comm`` may itself contain spaces and parentheses, so fields are
          taken AFTER the last ``") "`` in the line (canonical proc-stat read).
        * utime is field 14 and stime is field 15; the post-``)`` token list
          starts at field 3, so they sit at indices 11 and 12.

    Args:
        pid: Process id to inspect, or ``None`` when the job has no known pid.

    Returns:
        ``utime + stime`` in clock ticks, or ``None`` when *pid* is ``None``,
        ``/proc/<pid>/stat`` cannot be read (dead process, non-Linux host), or
        its content cannot be parsed. Never raises (NFR-2, AC-6), so the caller
        keeps every other field and the whole endpoint intact.
    """
    if pid is None:
        return None
    try:
        with open(f"/proc/{int(pid)}/stat", "rb") as stat_file:
            raw = stat_file.read()
        comm_end = raw.rfind(b") ")
        if comm_end == -1:
            return None
        fields = raw[comm_end + 2:].split()
        # fields[0] = state (field 3); utime = field 14 -> fields[11],
        # stime = field 15 -> fields[12]
        return int(fields[11]) + int(fields[12])
    except Exception:  # noqa: BLE001 - dead pid / no /proc / non-Linux -> null
        return None
def _build_stages() -> list:
    """Collect the stages of all active (non-terminal) tasks (D3, FR-1).

    Rows come from ``db.get_active_tasks_for_reconcile()``. Per the design
    notes, that helper deliberately still returns ``cancelled`` tasks for the
    reconciler's skip-counter (ORCH-086); terminal tasks are not observability
    signal (terminal set ``{done, cancelled}``, ORCH-090), so they are dropped
    here, at the consumer, and the helper itself is left untouched.

    Returns:
        One dict per remaining task with the keys ``work_item``, ``stage``,
        ``age_in_stage_s``, ``repo`` and ``task_id``. A key whose source column
        is absent from the row is reported as ``None``.
    """
    from . import db

    terminal_stages = ("done", "cancelled")
    return [
        {
            "work_item": task.get("work_item_id"),
            "stage": task.get("stage"),
            "age_in_stage_s": task.get("age_s"),
            "repo": task.get("repo"),
            "task_id": task.get("id"),
        }
        for task in db.get_active_tasks_for_reconcile()
        if task.get("stage") not in terminal_stages
    ]
def _build_queue() -> dict:
    """Collect the job-queue raw signal (D4, FR-2).

    Every sub-source is guarded on its own, so one failing source only nulls
    its own field and is logged at WARNING level. In particular an
    uninitialised ``worker`` (e.g. in a test) degrades to ``breaker: null`` /
    ``max_concurrency: null`` instead of failing the request (NFR-2).

    Returns:
        A dict with the keys:
        ``counts`` — result of ``db.job_status_counts()`` or ``None``;
        ``depth`` — ``counts["queued"]`` when ``counts`` is a dict, else ``None``;
        ``retries`` — result of ``db.queue_retry_stats()`` or ``None``;
        ``breaker`` — ``worker.breaker.snapshot()`` or ``None``;
        ``max_concurrency`` / ``poll_interval`` — the worker attributes of the
        same name, or ``None`` when missing or the worker is unavailable.
    """
    from . import db

    try:
        status_counts = db.job_status_counts()
    except Exception as exc:  # noqa: BLE001
        logger.warning("metrics queue counts error: %s", exc)
        status_counts = None

    try:
        retry_stats = db.queue_retry_stats()
    except Exception as exc:  # noqa: BLE001
        logger.warning("metrics queue retries error: %s", exc)
        retry_stats = None

    breaker_state = None
    concurrency_limit = None
    poll_every = None
    try:
        from .queue_worker import worker

        # A broken breaker must not prevent reading the plain worker attributes.
        try:
            breaker_state = worker.breaker.snapshot()
        except Exception as exc:  # noqa: BLE001
            logger.warning("metrics breaker snapshot error: %s", exc)
        concurrency_limit = getattr(worker, "max_concurrency", None)
        poll_every = getattr(worker, "poll_interval", None)
    except Exception as exc:  # noqa: BLE001 - worker not initialised
        logger.warning("metrics worker access error: %s", exc)

    queue_depth = None
    if isinstance(status_counts, dict):
        queue_depth = status_counts.get("queued")

    return {
        "counts": status_counts,
        "depth": queue_depth,
        "retries": retry_stats,
        "breaker": breaker_state,
        "max_concurrency": concurrency_limit,
        "poll_interval": poll_every,
    }
def _build_agents() -> list:
    """Collect the agent-liveness raw signal (D5/D6, FR-3).

    Produces one entry per row of ``db.get_running_agents()``: process identity
    (``agent`` / ``run_id`` / ``job_id`` / ``repo`` / ``pid``), ``runtime_s``
    (the row's ``running_age_s``; per D6 anchored on ``jobs.started_at``),
    ``model`` / ``effort``, and the raw ``cpu_ticks`` read from
    ``/proc/<pid>/stat``.

    A missing pid or a dead process only yields ``cpu_ticks: null`` for THAT
    agent; every other field and every other agent stays intact (AC-6, TC-05).

    Returns:
        A list of per-agent dicts, in the order the rows were returned.
    """
    from . import db

    def _entry(job) -> dict:
        # Fetch the pid once: it is both published and used for the tick read.
        process_id = job.get("pid")
        return {
            "agent": job.get("agent"),
            "run_id": job.get("run_id"),
            "job_id": job.get("job_id"),
            "repo": job.get("repo"),
            "pid": process_id,
            "runtime_s": job.get("running_age_s"),
            "model": job.get("model"),
            "effort": job.get("effort"),
            "cpu_ticks": _read_cpu_ticks(process_id),
        }

    return [_entry(job) for job in db.get_running_agents()]
def _build_cost() -> dict:
    """Collect the cost / token raw signal (D7, FR-4).

    Returns:
        A dict with two keys:

        ``running`` — one dict per row of ``db.get_running_agents()`` carrying
        ``run_id`` / ``job_id`` / ``agent`` plus the cost and token counters.
        Per the design notes these counters are often ``null`` until the job
        finishes and the launcher parses the CLI JSON; ``null`` is honest raw
        signal, NOT zero (TR-5). If the rows cannot be collected the list is
        empty.

        ``aggregate`` — the result of ``db.agent_cost_totals()`` (summed totals
        over all ``agent_runs``; empty table → zeros, TC-06/TC-11), or ``None``
        if that call fails.

        Either failure is logged at WARNING level and never raised.
    """
    from . import db

    per_job_fields = (
        "run_id",
        "job_id",
        "agent",
        "cost_usd",
        "input_tokens",
        "output_tokens",
        "cache_read_tokens",
        "cache_creation_tokens",
    )

    try:
        running_costs = [
            {field: job.get(field) for field in per_job_fields}
            for job in db.get_running_agents()
        ]
    except Exception as exc:  # noqa: BLE001
        logger.warning("metrics cost.running error: %s", exc)
        running_costs = []

    try:
        totals = db.agent_cost_totals()
    except Exception as exc:  # noqa: BLE001
        logger.warning("metrics cost.aggregate error: %s", exc)
        totals = None

    return {"running": running_costs, "aggregate": totals}
def build_metrics() -> dict:
    """Assemble the ``/metrics`` envelope (FR-5). Never raises (FR-6, NFR-2, AC-4).

    Kill-switch (D8): when ``settings.metrics_endpoint_enabled`` is falsy the
    function returns the minimal body ``{"schema_version": ..., "enabled":
    False}`` so the sidecar sees the off-switch explicitly (served as 200, NOT
    404). A failure while reading the switch is logged and treated as
    "enabled"; a missing attribute also counts as enabled.

    Otherwise the envelope starts with ``schema_version`` / ``generated_at`` /
    ``clk_tck`` and each section is then collected inside its own guard, so a
    failing source degrades only that section to its safe default:

    * ``stages`` → ``[]``
    * ``queue``  → ``None``
    * ``agents`` → ``[]``
    * ``cost``   → ``None``

    Returns:
        The envelope dict described above.
    """
    try:
        from .config import settings

        enabled = bool(getattr(settings, "metrics_endpoint_enabled", True))
        if not enabled:
            return {"schema_version": SCHEMA_VERSION, "enabled": False}
    except Exception as exc:  # noqa: BLE001 - kill-switch read must never break /metrics
        logger.warning("metrics kill-switch read error: %s", exc)

    envelope: dict = {
        "schema_version": SCHEMA_VERSION,
        "generated_at": _now_iso(),
        "clk_tck": _clk_tck(),
    }

    # (envelope key, collector, safe default on failure, log format).
    # Built per call so the list defaults are fresh objects every time.
    sections = (
        ("stages", _build_stages, [], "metrics stages section error: %s"),
        ("queue", _build_queue, None, "metrics queue section error: %s"),
        ("agents", _build_agents, [], "metrics agents section error: %s"),
        ("cost", _build_cost, None, "metrics cost section error: %s"),
    )
    for key, collect, fallback, failure_format in sections:
        try:
            envelope[key] = collect()
        except Exception as exc:  # noqa: BLE001
            logger.warning(failure_format, exc)
            envelope[key] = fallback

    return envelope