Files
orchestrator/tests/test_tracker_rollback_metrics.py
claude-bot 328ae78da3 fix(notifications): tracker card — status-map completeness, rollback reflection, stage-metric summation (ORCH-091)
Three verified live-card defects in src/notifications.py (ORCH-067/087),
all additive and indication-only (STAGE_TRANSITIONS / QG_CHECKS / check_* /
transport / DB schema untouched; never-raise; revert = git revert):

- Деф.1 (D1): _STAGE_STATUS_LABEL covered 8 of 10 STAGE_TRANSITIONS keys —
  deploy-staging and cancelled (ORCH-090) fell back to the misleading
  "To Analyse". Added deploy-staging→"Deploying (staging)",
  cancelled→"Cancelled"; replaced the runtime fallback for an UNMAPPED stage
  with a neutral capitalized label (_neutral_stage_label). created stays an
  explicit "To Analyse"; broken/None input degrades safely. Map completeness
  is asserted programmatically from STAGE_TRANSITIONS.keys() (single source of
  truth), not a static list.
- Деф.2 (D2): the stage-row loop drew  for any stage with a finished agent
  run regardless of position — after a rollback the card showed the absurd
  " Внедрение + 🔄 Разработка". Added read-only _pipeline_pos from the
  STAGE_TRANSITIONS order and a suppression gate ( only when
  current_pos >= _pipeline_pos(stage_key)); deploy-staging→deploy normalization
  applied ONLY to the current position; is_active_stage untouched.
- Деф.3 (D3): _stage_line took only the LAST run (ORCH-069: developer 3 runs
  Σ $3.98 rendered ~$0.00). It now aggregates ALL of the agent's runs with the
  same per-run formulas as the task totals → strict convergence with
  SUM(agent_runs) by task_id; model/effort/attempt come from the last run.

Tests: test_tracker_status_line.py (ORCH-091 TC-01..TC-03 + updated tc06);
new test_tracker_rollback_metrics.py (TC-05..TC-08). Full suite green (1370).
Docs: CHANGELOG + internals.md (architecture README already updated by architect).

Refs: ORCH-091
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-09 22:08:52 +03:00

284 lines
14 KiB
Python

"""ORCH-091 — Group 2 (D2/D3): rollback reflection + stage-metric summation.
Covers TC-05..TC-08 from 04-test-plan.yaml. The render path is pure DB (no
network); a temp SQLite holds tasks + agent_runs.
TC-05 / AC-4 — rollback deploy-staging->development: Development active (🔄),
Testing/Внедрение NOT shown ✅, Анализ/Архитектура stay ✅.
TC-06 / AC-5 — stage line sums ALL of an agent's runs (ORCH-069 developer
3 runs ≈ $3.98), not the last run.
TC-07 / AC-5 — task totals (💰/🔢/⏱) converge with SUM(agent_runs).
TC-08 / AC-7 — render_task_tracker never raises on broken/partial rows.
"""
import os
import tempfile
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_rollback_metrics.db")
os.environ["ORCH_DB_PATH"] = _test_db
import pytest # noqa: E402
import src.db as db_module # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import notifications as N # noqa: E402
from src.usage import fmt_cost, fmt_tokens, _input_total # noqa: E402
@pytest.fixture(autouse=True)
def setup_db(monkeypatch):
monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
# Render-only: keep the live overlay off (offline core under test).
monkeypatch.setattr(N._get_settings(), "tracker_live_status", False, raising=False)
yield
if os.path.exists(_test_db):
os.unlink(_test_db)
def _mk_task(stage="development", wid="ORCH-091", title="rollback/metrics",
created=None, updated=None):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title) "
"VALUES (?,?,?,?,?,?)",
("p1", wid, "orchestrator", "feature/ORCH-091-x", stage, title),
)
tid = cur.lastrowid
if created or updated:
conn.execute(
"UPDATE tasks SET created_at=COALESCE(?, created_at), "
"updated_at=COALESCE(?, updated_at) WHERE id=?",
(created, updated, tid),
)
conn.commit()
conn.close()
return tid
def _mk_run(tid, agent, started, finished, *, model="tokenator/claude-opus-4-8",
in_tok=10, out_tok=5, cache_read=0, cache_creation=0, cost=0.0,
effort=None, exit_code=0):
conn = get_db()
conn.execute(
"INSERT INTO agent_runs (task_id, agent, started_at, finished_at, "
"exit_code, input_tokens, output_tokens, cache_read_tokens, "
"cache_creation_tokens, cost_usd, model, effort) "
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
(tid, agent, started, finished, exit_code, in_tok, out_tok, cache_read,
cache_creation, cost, model, effort),
)
conn.commit()
conn.close()
def _stage_line(text, label):
"""The single '✅ <label> ...' line for a stage, or None."""
for ln in text.splitlines():
if ln.startswith(f"{label}"):
return ln
return None
def _has_active(text, label):
"""True if the '🔄 <label> ...' active line is present."""
return any(ln.startswith(f"🔄 {label}") for ln in text.splitlines())
# =========================================================================== #
# TC-05 / AC-4 — rollback reflection (deploy-staging -> development)
# =========================================================================== #
def test_tc05_rollback_suppresses_later_stage_checkmarks():
"""A task back on stage='development' after later stages ran: Development is
active (🔄), and Тестирование/Внедрение/Код ревью are NOT shown as ✅, while
earlier stages (Анализ/Архитектура) stay ✅."""
tid = _mk_task(stage="development")
# Earlier stages finished.
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00")
_mk_run(tid, "architect", "2026-06-04 09:10:00", "2026-06-04 09:20:00")
# First development pass finished, then later stages ran...
_mk_run(tid, "developer", "2026-06-04 09:20:00", "2026-06-04 09:40:00", cost=1.0)
_mk_run(tid, "reviewer", "2026-06-04 09:40:00", "2026-06-04 09:50:00")
_mk_run(tid, "tester", "2026-06-04 09:50:00", "2026-06-04 10:00:00")
_mk_run(tid, "deployer", "2026-06-04 10:00:00", "2026-06-04 10:10:00")
# ...then a rollback re-launched developer -> in-flight run (finished_at NULL).
_mk_run(tid, "developer", "2026-06-04 10:20:00", None, exit_code=None, cost=0.0)
text = N.render_task_tracker(tid)
# Development active, not a ✅.
assert _has_active(text, "Разработка"), text
# Later-than-current stages: no ✅ line (the rollback is honestly reflected).
assert _stage_line(text, "Код ревью") is None, text
assert _stage_line(text, "Тестирование") is None, text
assert _stage_line(text, "Внедрение") is None, text
# Earlier stages still ✅.
assert _stage_line(text, "Анализ") is not None, text
assert _stage_line(text, "Архитектура") is not None, text
def test_tc05_forward_progress_keeps_earlier_checkmarks():
"""Regression guard: normal forward progress (no rollback) still shows all
earlier stages ✅ — the suppression gate only fires for stages AFTER current."""
tid = _mk_task(stage="testing")
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00")
_mk_run(tid, "architect", "2026-06-04 09:10:00", "2026-06-04 09:20:00")
_mk_run(tid, "developer", "2026-06-04 09:20:00", "2026-06-04 09:40:00")
_mk_run(tid, "reviewer", "2026-06-04 09:40:00", "2026-06-04 09:50:00")
# tester in-flight on the testing stage.
_mk_run(tid, "tester", "2026-06-04 09:50:00", None, exit_code=None)
text = N.render_task_tracker(tid)
assert _stage_line(text, "Анализ") is not None
assert _stage_line(text, "Архитектура") is not None
assert _stage_line(text, "Разработка") is not None
assert _stage_line(text, "Код ревью") is not None
assert _has_active(text, "Тестирование")
def test_tc05_deploy_staging_keeps_deployer_row():
"""Normalization: on stage='deploy-staging' the collapsed 'Внедрение' row
(stage_key='deploy') is NOT wrongly suppressed by the rollback gate."""
tid = _mk_task(stage="deploy-staging")
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00")
_mk_run(tid, "architect", "2026-06-04 09:10:00", "2026-06-04 09:20:00")
_mk_run(tid, "developer", "2026-06-04 09:20:00", "2026-06-04 09:40:00")
_mk_run(tid, "reviewer", "2026-06-04 09:40:00", "2026-06-04 09:50:00")
_mk_run(tid, "tester", "2026-06-04 09:50:00", "2026-06-04 10:00:00")
# staging deploy finished (deployer agent, collapsed into Внедрение).
_mk_run(tid, "deployer", "2026-06-04 10:00:00", "2026-06-04 10:10:00")
text = N.render_task_tracker(tid)
# Внедрение must NOT be suppressed (preserved pre-ORCH-091 behaviour).
assert _stage_line(text, "Внедрение") is not None, text
assert _stage_line(text, "Тестирование") is not None, text
# =========================================================================== #
# TC-06 / AC-5 — stage-metric summation over retries (ORCH-069 fixture)
# =========================================================================== #
def test_tc06_stage_line_sums_all_developer_runs():
"""developer with 3 runs (ORCH-069: Σ ≈ $3.98) -> the 'Разработка' line shows
Σ cost / Σ tokens / Σ time, NOT the last run alone."""
tid = _mk_task(stage="review") # past development -> ✅ shown
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00")
_mk_run(tid, "architect", "2026-06-04 09:10:00", "2026-06-04 09:20:00")
# Three developer attempts: $1.50 + $2.00 + $0.48 = $3.98; 30m total.
_mk_run(tid, "developer", "2026-06-04 09:20:00", "2026-06-04 09:30:00",
cost=1.50, in_tok=100, out_tok=40, cache_read=10)
_mk_run(tid, "developer", "2026-06-04 09:30:00", "2026-06-04 09:45:00",
cost=2.00, in_tok=200, out_tok=60, cache_creation=20)
_mk_run(tid, "developer", "2026-06-04 09:45:00", "2026-06-04 09:50:00",
cost=0.48, in_tok=50, out_tok=10)
text = N.render_task_tracker(tid)
line = _stage_line(text, "Разработка")
assert line is not None, text
# Σ cost = $3.98 (not the last $0.48).
assert fmt_cost(3.98) in line, line
assert fmt_cost(0.48) not in line, line
# Σ output tokens = 40+60+10 = 110.
assert f"{fmt_tokens(110)}" in line, line
# Σ input (input+cache_read+cache_creation): (100+10)+(200+20)+50 = 380.
exp_in = _input_total({"input_tokens": 100, "cache_read_tokens": 10,
"cache_creation_tokens": 0}) \
+ _input_total({"input_tokens": 200, "cache_read_tokens": 0,
"cache_creation_tokens": 20}) \
+ _input_total({"input_tokens": 50, "cache_read_tokens": 0,
"cache_creation_tokens": 0})
assert f"{fmt_tokens(exp_in)}" in line, line
# Σ time = 10+15+5 = 30m.
assert " 30м " in line, line
# =========================================================================== #
# TC-07 / AC-5 — task totals converge with SUM(agent_runs)
# =========================================================================== #
def test_tc07_totals_converge_with_sum_agent_runs():
"""The 💰 totals line equals SUM(agent_runs) over cost & tokens even with
retries (the stage lines and the totals draw from the same row set)."""
tid = _mk_task(stage="review")
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
cost=0.20, in_tok=30, out_tok=10)
_mk_run(tid, "architect", "2026-06-04 09:10:00", "2026-06-04 09:20:00",
cost=0.30, in_tok=40, out_tok=12)
_mk_run(tid, "developer", "2026-06-04 09:20:00", "2026-06-04 09:30:00",
cost=1.50, in_tok=100, out_tok=40, cache_read=10)
_mk_run(tid, "developer", "2026-06-04 09:30:00", "2026-06-04 09:45:00",
cost=2.00, in_tok=200, out_tok=60, cache_creation=20)
# Authoritative SUM straight from the DB.
conn = get_db()
rows = conn.execute(
"SELECT input_tokens, output_tokens, cache_read_tokens, "
"cache_creation_tokens, cost_usd FROM agent_runs WHERE task_id=?",
(tid,),
).fetchall()
conn.close()
sum_cost = sum(float(r["cost_usd"] or 0) for r in rows)
sum_out = sum(int(r["output_tokens"] or 0) for r in rows)
sum_in = sum(_input_total({"input_tokens": r["input_tokens"],
"cache_read_tokens": r["cache_read_tokens"],
"cache_creation_tokens": r["cache_creation_tokens"]})
for r in rows)
text = N.render_task_tracker(tid)
totals = [ln for ln in text.splitlines() if ln.startswith("💰")][0]
assert fmt_cost(sum_cost) in totals, totals
assert f"{fmt_tokens(sum_in)}" in totals, totals
assert f"{fmt_tokens(sum_out)}" in totals, totals
def test_tc07_sum_of_stage_lines_equals_totals_on_done():
"""On a done task with retries, Σ(stage-line costs) == totals cost: each agent
maps to exactly one stage row, so no run is double-counted or dropped."""
tid = _mk_task(stage="done")
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00", cost=0.20)
_mk_run(tid, "developer", "2026-06-04 09:20:00", "2026-06-04 09:30:00", cost=1.50)
_mk_run(tid, "developer", "2026-06-04 09:30:00", "2026-06-04 09:45:00", cost=2.00)
_mk_run(tid, "deployer", "2026-06-04 10:00:00", "2026-06-04 10:10:00", cost=0.30)
text = N.render_task_tracker(tid)
totals = [ln for ln in text.splitlines() if ln.startswith("💰")][0]
# developer stage line = Σ $3.50 (not $2.00), totals = $4.00.
dev_line = _stage_line(text, "Разработка")
assert fmt_cost(3.50) in dev_line, dev_line
assert fmt_cost(4.00) in totals, totals
# =========================================================================== #
# TC-08 / AC-7 — render_task_tracker never raises on broken/partial rows
# =========================================================================== #
def test_tc08_render_survives_null_timestamps_and_runs():
"""NULL timestamps / partial runs -> render returns a string, never raises."""
tid = _mk_task(stage="development")
# Run with NULL started/finished and NULL token columns.
conn = get_db()
conn.execute(
"INSERT INTO agent_runs (task_id, agent, started_at, finished_at, "
"exit_code, input_tokens, output_tokens, cost_usd, model) "
"VALUES (?,?,?,?,?,?,?,?,?)",
(tid, "developer", None, None, None, None, None, None, None),
)
conn.commit()
conn.close()
text = N.render_task_tracker(tid) # must not raise
assert isinstance(text, str) and text
def test_tc08_render_survives_bogus_stage():
"""A task sitting on a truly unknown stage still renders (never-raise)."""
tid = _mk_task(stage="__bogus__")
_mk_run(tid, "developer", "2026-06-04 09:00:00", "2026-06-04 09:10:00")
text = N.render_task_tracker(tid)
assert isinstance(text, str) and text
# Unknown stage -> developer's finished run is past "far future" current pos?
# current_pos for unknown = len(order) -> every real stage_key <= it -> ✅ kept
# (degrades to pre-ORCH-091 behaviour, no spurious suppression).
assert _stage_line(text, "Разработка") is not None, text