fix(notifications): tracker orphan cleanup + effort-in-line + honest done-time (ORCH-087)

Устраняет «замёрзшие» осиротевшие карточки live-трекера и доделывает строку
стадии/итоговое время.

G1 — зачистка сирот: аддитивный леджер tracker_messages(task_id, message_id,
created_at, deleted_at) + хелперы add/get_open/mark_deleted в src/db.py. bump
теперь удаляет ВСЕ незакрытые mid задачи (а не только скаляр
tasks.tracker_message_id, сохранён как BC-указатель). Новый mid в леджер только
при успешном send (BR-6); transient-delete остаётся для ретрая; «already
gone»/>48ч закрывается. Корень бага — скалярный учёт, терявший ссылку при
гонке/delete-fail+send-ok (ADR-001 G0).

G3 — deploy-цикл: ключ confirm_deploy в _LIVE_BRANCH_LABELS (без base-alias).

BR-EFF — эффорт в строке: колонка agent_runs.effort (_ensure_column,
идемпотентно), стамп фактического resolve_agent_effort в launcher._spawn в
момент запуска; рендер `· {model} · {effort}`, пустой → суффикс опускается.

BR-G5 — честное время: done-строка `⏱️ Агенты Σ · твоё {review~cap} · общее с
ожиданием {wall}` — три независимых подписанных метрики; кап
tracker_brd_review_cap_s (ORCH_TRACKER_BRD_REVIEW_CAP_S, дефолт 2ч, маркер ~).

Инварианты: STAGE_TRANSITIONS/QG_CHECKS/стадии без изменений; миграции
аддитивны/идемпотентны (enduro не трогается); never-raise,
disable_notification, plane_issue_link (ORCH-067), disable_web_page_preview
(ORCH-080) сохранены; src/reconciler.py не эродирован (ORCH-086 на месте).

Тесты: tests/test_notifications_orphans.py (TC-01..05 + never-raise),
tests/test_tracker_effort_time.py (TC-06/11..15 + confirm_deploy),
tests/test_launcher.py::TestEffortStamp (TC-09/10). Доки: CLAUDE.md
(§Нотификации), docs/architecture/README.md (Notifications), CHANGELOG.md.

Refs: ORCH-087

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-09 09:20:20 +03:00
committed by stream
parent 36c7a68722
commit a7b27f2235
11 changed files with 729 additions and 17 deletions

View File

@@ -323,3 +323,83 @@ class TestActionStageNoChangesNote:
def test_never_raises_on_bad_input(self):
    """Never-raise contract: a ``None`` stage with a ``None`` repo yields ``None``."""
    note = action_stage_no_changes_note(None, None)
    assert note is None
# ---------------------------------------------------------------------------
# ORCH-087 (BR-EFF): agent_runs.effort migration + launch-time stamp
# ---------------------------------------------------------------------------
class TestEffortStamp:
    """TC-09/TC-10: ``agent_runs.effort`` migrates idempotently and is stamped at launch."""

    def _fresh_db(self, monkeypatch):
        """Remove any stale test DB file, point the db settings at it, and build the schema."""
        import src.db as db_module
        from src.db import init_db

        if os.path.exists(_test_db):
            os.unlink(_test_db)
        monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
        init_db()

    def test_effort_migration_idempotent(self, monkeypatch):
        """TC-09/AC-E.1: running ``init_db`` again raises nothing and the column exists."""
        self._fresh_db(monkeypatch)
        from src.db import init_db, get_db

        # Second schema pass on an already-initialised DB must be a no-op.
        init_db()

        conn = get_db()
        table_info = conn.execute("PRAGMA table_info(agent_runs)").fetchall()
        conn.close()

        # Index 1 of each PRAGMA table_info row is the column name.
        column_names = [info[1] for info in table_info]
        assert "effort" in column_names

    def test_spawn_stamps_resolved_effort(self, tmp_path, monkeypatch):
        """TC-10/AC-E.1: ``_spawn`` records the resolved ``--effort`` in ``agent_runs``.

        With both effort settings blanked, the test expects the developer role
        to be stamped ``xhigh`` (the ORCH-081 floor, per the original test
        notes). Process creation, threads, worktree setup and the start
        notification are all replaced with fakes, so nothing is launched.
        """
        self._fresh_db(monkeypatch)
        from src.db import get_db
        import src.agents.launcher as L

        class _Proc:
            """Stand-in for the object returned by ``subprocess.Popen``."""

            pid = 4242

        class _T:
            """Thread replacement that accepts any arguments and never runs."""

            def __init__(self, *a, **k):
                pass

            def start(self):
                pass

        # The repo directory must really exist so the launcher's isdir() guard
        # passes; the worktree itself is faked to the same directory.
        repo = "orchestrator"
        repo_dir = tmp_path / repo
        repo_dir.mkdir()
        monkeypatch.setattr(L.settings, "repos_dir", str(tmp_path), raising=False)
        monkeypatch.setattr(L, "ensure_worktree", lambda r, b: str(repo_dir))
        monkeypatch.setattr("src.projects.get_project_by_repo", lambda r: None)

        # Blank out effort overrides so the role default is what gets resolved.
        monkeypatch.setattr(L.settings, "agent_effort_developer", "", raising=False)
        monkeypatch.setattr(L.settings, "agent_effort_default", "", raising=False)

        # Neutralise every real side effect of a launch.
        monkeypatch.setattr(L.subprocess, "Popen", lambda *a, **k: _Proc())
        monkeypatch.setattr(L.threading, "Thread", _T)
        monkeypatch.setattr(L, "notify_agent_started", lambda *a, **k: None)

        # Seed the task row that _spawn looks up (it carries the branch).
        insert_sql = (
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title) "
            "VALUES (?,?,?,?,?,?)"
        )
        task_values = ("p1", "ORCH-087", repo, "feature/ORCH-087-x", "development", "t")
        conn = get_db()
        tid = conn.execute(insert_sql, task_values).lastrowid
        conn.commit()
        conn.close()

        run_id = L.AgentLauncher()._spawn(
            "developer", repo, task_content=None, task_id=tid
        )

        conn = get_db()
        row = conn.execute(
            "SELECT effort FROM agent_runs WHERE id=?", (run_id,)
        ).fetchone()
        conn.close()
        assert row[0] == "xhigh"

View File

@@ -0,0 +1,222 @@
"""ORCH-087 (BR-G1): tracker_messages ledger — no orphaned cards in bump mode.
The scalar tasks.tracker_message_id only ever knew the LAST mid, so any lost
reference (delete-fail+send-ok, race, restart) orphaned older cards forever. The
additive tracker_messages ledger lets every bump delete ALL still-open mids, not
just the last one. These tests model the dominant orphan generators (question 2
in ADR-001) with Telegram fully mocked (no network).
Covers TC-01..TC-05 / AC-1.2, AC-1.3, AC-X.1.
"""
import os
import tempfile
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_orphans.db")
os.environ["ORCH_DB_PATH"] = _test_db
import pytest # noqa: E402
import src.db as db_module # noqa: E402
from src.db import ( # noqa: E402
init_db, get_db, get_tracker_message_id, set_tracker_message_id,
add_tracker_message, get_open_tracker_messages, mark_tracker_message_deleted,
)
from src import notifications as N # noqa: E402
@pytest.fixture(autouse=True)
def setup_db(monkeypatch):
    """Give every test a fresh DB file, a stubbed card render, and bump mode.

    The DB file is removed again after the test finishes.
    """

    def _remove_db_file():
        if os.path.exists(_test_db):
            os.unlink(_test_db)

    def _stub_render(task_id):
        # Fixed card text keeps the render cheap and deterministic.
        return "CARD"

    monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
    _remove_db_file()
    init_db()
    monkeypatch.setattr(N, "render_task_tracker", _stub_render)
    _bump_mode(monkeypatch)
    yield
    _remove_db_file()
def _bump_mode(monkeypatch):
    """Switch the notifications settings object to ``tracker_mode == "bump"``."""
    cfg = N._get_settings()
    monkeypatch.setattr(cfg, "tracker_mode", "bump", raising=False)
def _mk_task(stage="development", wid="ORCH-087"):
    """Insert one task row with the given stage / work item id and return its row id."""
    insert_sql = (
        "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title) "
        "VALUES (?, ?, ?, ?, ?, ?)"
    )
    values = ("p1", wid, "orchestrator", "feature/ORCH-087-x", stage, "orphan test")

    conn = get_db()
    new_id = conn.execute(insert_sql, values).lastrowid
    conn.commit()
    conn.close()
    return new_id
# --------------------------------------------------------------------------- #
# ledger helpers (direct DB contract)
# --------------------------------------------------------------------------- #
def test_ledger_add_get_mark(monkeypatch):
    """Ledger contract: add opens a mid, mark_deleted closes it, re-adding is ignored."""
    tid = _mk_task()

    # The trailing 10 is a duplicate: it must be ignored, not recorded twice.
    for mid in (10, 11, 10):
        add_tracker_message(tid, mid)
    open_after_add = get_open_tracker_messages(tid)
    assert open_after_add == [10, 11]

    mark_tracker_message_deleted(tid, 10)
    open_after_delete = get_open_tracker_messages(tid)
    assert open_after_delete == [11]

    # Re-adding an already-deleted mid must not resurrect it.
    add_tracker_message(tid, 10)
    open_after_readd = get_open_tracker_messages(tid)
    assert open_after_readd == [11]
# --------------------------------------------------------------------------- #
# TC-01: bump deletes ALL known open mids, not just the last
# --------------------------------------------------------------------------- #
def test_bump_deletes_all_open_mids(monkeypatch):
    """TC-01/AC-1.2: a bump deletes every open ledger mid, not only the scalar one."""
    tid = _mk_task()

    # Three cards are open in the ledger; the scalar pointer names only the last.
    stale_mids = (100, 101, 102)
    for mid in stale_mids:
        add_tracker_message(tid, mid)
    set_tracker_message_id(tid, 102)

    deleted = []

    def _fake_delete(mid):
        deleted.append(mid)
        return True

    def _fake_send(text, disable_notification=False):
        return 200

    monkeypatch.setattr(N, "delete_telegram", _fake_delete)
    monkeypatch.setattr(N, "send_telegram", _fake_send)

    N.update_task_tracker(tid)

    # Every previously open mid was deleted ...
    assert sorted(deleted) == [100, 101, 102]
    # ... and only the freshly sent card remains open and pointed at.
    assert get_open_tracker_messages(tid) == [200]
    assert get_tracker_message_id(tid) == 200
# --------------------------------------------------------------------------- #
# TC-02: send -> None keeps the ledger/pointer intact (BR-6 / R-3)
# --------------------------------------------------------------------------- #
def test_send_none_keeps_ledger_and_pointer(monkeypatch):
    """TC-02/AC-1.3: a failed send records no new mid and leaves the pointer alone."""
    tid = _mk_task()
    add_tracker_message(tid, 100)
    set_tracker_message_id(tid, 100)

    sends = []

    def _failing_delete(mid):
        # Transient failure: mid 100 stays open in the ledger.
        return False

    def _failing_send(text, disable_notification=False):
        sends.append(1)
        return None

    monkeypatch.setattr(N, "delete_telegram", _failing_delete)
    monkeypatch.setattr(N, "send_telegram", _failing_send)

    # The call itself must not raise.
    N.update_task_tracker(tid)

    assert len(sends) == 1  # exactly one send attempt
    assert get_tracker_message_id(tid) == 100  # pointer preserved
    assert get_open_tracker_messages(tid) == [100]  # still tracked for retry
# --------------------------------------------------------------------------- #
# TC-03: delete False -> stays open; "already gone" -> dropped
# --------------------------------------------------------------------------- #
def test_delete_transient_stays_open_gone_dropped(monkeypatch):
    """TC-03: a mid whose delete returns False stays open; one returning True is closed."""
    tid = _mk_task()
    add_tracker_message(tid, 100)  # delete will report failure -> stays open
    add_tracker_message(tid, 101)  # delete will report success -> closed

    def _del(mid):
        # Only mid 100 fails; every other mid is reported as removed.
        if mid == 100:
            return False
        return True

    def _send(text, disable_notification=False):
        return 300

    monkeypatch.setattr(N, "delete_telegram", _del)
    monkeypatch.setattr(N, "send_telegram", _send)

    N.update_task_tracker(tid)

    # 100 remains for a later retry, 101 is gone, 300 is the new open card.
    still_open = set(get_open_tracker_messages(tid))
    assert still_open == {100, 300}
    assert get_tracker_message_id(tid) == 300
# --------------------------------------------------------------------------- #
# TC-04: rapid repeats / race -> one live card, <=1 send per call
# --------------------------------------------------------------------------- #
def test_repeated_bumps_converge_to_one_card(monkeypatch):
    """TC-04/AC-X.1: four consecutive bumps end with exactly one open card."""
    tid = _mk_task()
    next_mids = iter((501, 502, 503, 504))
    send_log = []

    def _send(text, disable_notification=False):
        send_log.append(1)
        return next(next_mids)

    def _delete(mid):
        return True

    monkeypatch.setattr(N, "delete_telegram", _delete)
    monkeypatch.setattr(N, "send_telegram", _send)

    for _ in range(4):
        sends_before = len(send_log)
        N.update_task_tracker(tid)
        sends_this_call = len(send_log) - sends_before
        assert sends_this_call == 1  # one send per bump call

    # Only the newest card is open and referenced by the scalar pointer.
    assert get_open_tracker_messages(tid) == [504]
    assert get_tracker_message_id(tid) == 504
# --------------------------------------------------------------------------- #
# TC-05: ledger survives a "restart" (read from DB) -> old cards cleaned
# --------------------------------------------------------------------------- #
def test_ledger_survives_restart(monkeypatch):
    """TC-05/AC-1.3: mids already persisted in the DB are cleaned by the next bump."""
    tid = _mk_task()

    # State left by an "earlier process": two cards in the ledger, while the
    # scalar pointer only remembers 701 (so 700 is the orphan).
    for mid in (700, 701):
        add_tracker_message(tid, mid)
    set_tracker_message_id(tid, 701)

    deleted = []

    def _record_delete(mid):
        deleted.append(mid)
        return True

    def _send(text, disable_notification=False):
        return 800

    monkeypatch.setattr(N, "delete_telegram", _record_delete)
    monkeypatch.setattr(N, "send_telegram", _send)

    # The bump reads the ledger from the DB, as a fresh process would.
    N.update_task_tracker(tid)

    assert sorted(deleted) == [700, 701]  # the orphan 700 is removed as well
    assert get_open_tracker_messages(tid) == [800]
# --------------------------------------------------------------------------- #
# never-raise on ledger/DB explosion
# --------------------------------------------------------------------------- #
def test_bump_never_raises_on_ledger_error(monkeypatch):
    """AC-X.2: a ledger read that raises must not break the bump; the card is still sent."""
    tid = _mk_task()

    def _boom(task_id):
        raise RuntimeError("db")

    # Patch both the name on the notifications module and the one on the db
    # module, so the failure fires whichever binding the bump path resolves.
    for module in (N, db_module):
        monkeypatch.setattr(module, "get_open_tracker_messages", _boom, raising=False)

    sent = []

    def _send(text, disable_notification=False):
        sent.append(1)
        return 900

    monkeypatch.setattr(N, "delete_telegram", lambda mid: True)
    monkeypatch.setattr(N, "send_telegram", _send)

    # Must complete without raising and still perform one send.
    N.update_task_tracker(tid)
    assert sent == [1]

View File

@@ -191,9 +191,13 @@ def test_render_done_has_times_and_links():
assert "\u0413\u041e\u0422\u041e\u0412\u041e" in text
# ⏱️ with three times
assert "\u23f1\ufe0f" in text
assert "\u0412\u0441\u0435\u0433\u043e" in text
assert "\u0430\u0433\u0435\u043d\u0442\u044b" in text
assert "\u0442\u0432\u043e\u0451" in text
# ORCH-087 (BR-G5): three explicitly-labelled metrics
# "\u0410\u0433\u0435\u043d\u0442\u044b \u2026 \u00b7 \u0442\u0432\u043e\u0451 \u2026 \u00b7 \u043e\u0431\u0449\u0435\u0435 \u0441 \u043e\u0436\u0438\u0434\u0430\u043d\u0438\u0435\u043c \u2026" (was "\u0412\u0441\u0435\u0433\u043e \u2026 \u00b7 \u0430\u0433\u0435\u043d\u0442\u044b \u2026 \u00b7 \u0442\u0432\u043e\u0451 \u2026").
assert "\u0410\u0433\u0435\u043d\u0442\u044b" in text # \u0410\u0433\u0435\u043d\u0442\u044b
assert "\u0442\u0432\u043e\u0451" in text # \u0442\u0432\u043e\u0451
# \u043e\u0431\u0449\u0435\u0435 \u0441 \u043e\u0436\u0438\u0434\u0430\u043d\u0438\u0435\u043c
assert "\u043e\u0431\u0449\u0435\u0435 \u0441 \u043e\u0436\u0438\u0434\u0430\u043d\u0438\u0435\u043c" in text
assert "\u0412\u0441\u0435\u0433\u043e" not in text # old "\u0412\u0441\u0435\u0433\u043e" label gone
# 📦 deployed line
assert "\U0001f4e6" in text

View File

@@ -0,0 +1,183 @@
"""ORCH-087: effort-in-stage-line (BR-EFF), honest done-time (BR-G5),
deterministic stage labels (G2) and deploy-cycle label (G3).
Telegram/Plane fully isolated (render is pure DB). Covers TC-06, TC-11..TC-15
and the confirm_deploy live-overlay label.
"""
import os
import tempfile
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_eff_time.db")
os.environ["ORCH_DB_PATH"] = _test_db
import pytest # noqa: E402
import src.db as db_module # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import notifications as N # noqa: E402
@pytest.fixture(autouse=True)
def setup_db(monkeypatch):
    """Give every test a fresh DB file with the live-status overlay switched off.

    The DB file is removed again after the test finishes.
    """

    def _remove_db_file():
        if os.path.exists(_test_db):
            os.unlink(_test_db)

    monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
    _remove_db_file()
    init_db()

    # Render-only tests run without the live overlay unless a test re-enables it.
    cfg = N._get_settings()
    monkeypatch.setattr(cfg, "tracker_live_status", False, raising=False)

    yield
    _remove_db_file()
def _mk_task(stage="development", wid="ORCH-087", title="eff/time test",
             brd_start=None, brd_end=None, created=None, updated=None):
    """Insert one task row and return its row id.

    ``brd_start`` / ``brd_end`` populate the brd_review timestamps. When
    ``created`` or ``updated`` is given, the matching timestamp column is
    overwritten after the insert; a ``None`` one keeps its inserted value.
    """
    insert_sql = (
        "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title, "
        "brd_review_started_at, brd_review_ended_at) VALUES (?,?,?,?,?,?,?,?)"
    )
    insert_values = (
        "p1", wid, "orchestrator", "feature/ORCH-087-x", stage, title,
        brd_start, brd_end,
    )

    conn = get_db()
    new_id = conn.execute(insert_sql, insert_values).lastrowid

    if created or updated:
        # COALESCE keeps the existing value for whichever argument is None.
        stamp_sql = (
            "UPDATE tasks SET created_at=COALESCE(?, created_at), "
            "updated_at=COALESCE(?, updated_at) WHERE id=?"
        )
        conn.execute(stamp_sql, (created, updated, new_id))

    conn.commit()
    conn.close()
    return new_id
def _mk_run(tid, agent, started, finished, *, effort=None, model="tokenator/claude-opus-4-8",
            in_tok=10, out_tok=5, cost=0.0, exit_code=0):
    """Insert one ``agent_runs`` row for task ``tid``; returns nothing."""
    insert_sql = (
        "INSERT INTO agent_runs (task_id, agent, started_at, finished_at, "
        "exit_code, input_tokens, output_tokens, cost_usd, model, effort) "
        "VALUES (?,?,?,?,?,?,?,?,?,?)"
    )
    # Value order mirrors the column list in the statement above.
    run_values = (
        tid, agent, started, finished,
        exit_code, in_tok, out_tok, cost, model, effort,
    )

    conn = get_db()
    conn.execute(insert_sql, run_values)
    conn.commit()
    conn.close()
# --------------------------------------------------------------------------- #
# G2: plane_status_label deterministic for every stage (TC-06)
# --------------------------------------------------------------------------- #
def test_plane_status_label_all_stages():
    """TC-06/AC-2.2: each listed stage has a fixed label; deploy reads 'Awaiting Deploy'."""
    expected_labels = (
        ("created", "To Analyse"),
        ("analysis", "Analysis"),
        ("architecture", "Architecture"),
        ("development", "Development"),
        ("review", "Code-Review"),
        ("testing", "Testing"),
        ("done", "Done"),
    )
    for stage, expected in expected_labels:
        label = N.plane_status_label({"stage": stage})
        assert label == expected

    deploy_label = N.plane_status_label({"stage": "deploy"})
    assert "Awaiting Deploy" in deploy_label

    # An analysis-stage task whose brd review has started but not ended is
    # expected to be labelled "In Review".
    reviewing_task = {
        "stage": "analysis",
        "brd_review_started_at": "2026-06-04 10:00:00",
        "brd_review_ended_at": None,
    }
    in_review_label = N.plane_status_label(reviewing_task)
    assert "In Review" in in_review_label
def test_confirm_deploy_label_registered():
    """G3/AC-3.x: ``confirm_deploy`` has a live-overlay label and no base alias."""
    live_labels = N._LIVE_BRANCH_LABELS
    assert "confirm_deploy" in live_labels
    assert "Confirm Deploy" in live_labels["confirm_deploy"]

    # confirm_deploy is a dedicated status, so it must not be base-aliased.
    base_aliases = N._LIVE_BRANCH_BASE
    assert "confirm_deploy" not in base_aliases
# --------------------------------------------------------------------------- #
# BR-EFF: effort rendered next to the model (TC-11, TC-12)
# --------------------------------------------------------------------------- #
@pytest.mark.parametrize("agent,label,effort", [
    ("developer", "Разработка", "xhigh"),
    ("tester", "Тестирование", "medium"),
    ("deployer", "Внедрение", "medium"),
    ("analyst", "Анализ", "high"),
    ("architect", "Архитектура", "high"),
    ("reviewer", "Код ревью", "high"),
])
def test_stage_line_shows_effort(agent, label, effort):
    """TC-11/AC-E.2,AC-E.3: each role's stage line ends with '<model> · <effort>'."""
    tid = _mk_task(stage="done")
    _mk_run(tid, agent, "2026-06-04 09:00:00", "2026-06-04 09:10:00", effort=effort)

    rendered = N.render_task_tracker(tid)

    # First rendered line that begins with this role's stage label.
    candidates = [row for row in rendered.splitlines() if row.startswith(label)]
    stage_line = candidates[0]

    expected_tail = f"opus-4-8 · {effort}"
    assert stage_line.rstrip().endswith(expected_tail)
def test_stage_line_omits_empty_effort():
    """TC-12/AC-E.4: a NULL effort renders no effort suffix and does not crash."""
    tid = _mk_task(stage="analysis")
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00", effort=None)

    rendered = N.render_task_tracker(tid)
    candidates = [row for row in rendered.splitlines() if row.startswith("✅ Анализ")]
    analysis_line = candidates[0]

    # The line stops at the model name: no trailing effort segment.
    assert analysis_line.rstrip().endswith("opus-4-8")
# --------------------------------------------------------------------------- #
# BR-G5: honest done-time (TC-13, TC-14, TC-15)
# --------------------------------------------------------------------------- #
def test_done_review_time_capped():
    """TC-13/AC-5.1: a 6h brd_review window is rendered capped, not as 6h.

    The review window runs 10:00 -> 16:00 (360 minutes). The test expects the
    "твоё" metric on the done-card timing line to read ``~120м`` (the default
    2h cap, marked with ``~``) and never the raw ``360м``.
    """
    tid = _mk_task(
        stage="done",
        brd_start="2026-06-04 10:00:00", brd_end="2026-06-04 16:00:00",
        created="2026-06-04 09:00:00", updated="2026-06-04 16:30:00",
    )
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:30:00", effort="high")
    text = N.render_task_tracker(tid)

    # Pick the timing line by its stopwatch marker (U+23F1). An empty prefix
    # matches every line, which made the assertions below inspect the first
    # line of the card instead of the timing line.
    time_lines = [ln for ln in text.splitlines() if ln.startswith("\u23f1")]
    assert time_lines, "rendered card has no stopwatch (U+23F1) timing line"
    time_line = time_lines[0]

    # Capped to ~2h (120м) and marked with '~'; the raw 360m is not shown.
    assert "твоё ~120м" in time_line
    assert "твоё 360м" not in time_line
def test_done_review_time_under_cap_uncapped():
    """AC-5.1: a short review window is shown verbatim, with no '~' marker.

    The review window runs 10:00 -> 10:08, so the "твоё" metric on the timing
    line is expected to read ``8м`` and the line must contain no ``~``.
    """
    tid = _mk_task(
        stage="done",
        brd_start="2026-06-04 10:00:00", brd_end="2026-06-04 10:08:00",
        created="2026-06-04 09:00:00", updated="2026-06-04 10:30:00",
    )
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:30:00", effort="high")
    text = N.render_task_tracker(tid)

    # Pick the timing line by its stopwatch marker (U+23F1). An empty prefix
    # matches every line, so the '~' check ran against the wrong line.
    time_lines = [ln for ln in text.splitlines() if ln.startswith("\u23f1")]
    assert time_lines, "rendered card has no stopwatch (U+23F1) timing line"
    time_line = time_lines[0]

    assert "твоё 8м" in time_line
    assert "~" not in time_line
def test_done_time_line_labels_and_agent_sum():
    """TC-14,TC-15/AC-5.2,AC-5.3: agents = sum of runs; wall is 'общее с ожиданием'.

    Two runs of 10 and 6 minutes give 16 minutes of agent time, while
    created -> updated spans 120 minutes of wall time. The timing line must
    show both as separately labelled metrics and must not use the old
    "Всего" label.
    """
    tid = _mk_task(
        stage="done",
        created="2026-06-04 09:00:00", updated="2026-06-04 11:00:00",  # wall 120m
    )
    # Two runs: 10m + 6m = 16m of agent time.
    _mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00", effort="high")
    _mk_run(tid, "deployer", "2026-06-04 10:50:00", "2026-06-04 10:56:00", effort="medium")
    text = N.render_task_tracker(tid)

    # Pick the timing line by its stopwatch marker (U+23F1). An empty prefix
    # matches every line, so the label checks ran against the wrong line.
    time_lines = [ln for ln in text.splitlines() if ln.startswith("\u23f1")]
    assert time_lines, "rendered card has no stopwatch (U+23F1) timing line"
    time_line = time_lines[0]

    # Agents = 16m (exact sum); wall = 120m under its own label.
    assert "Агенты 16м" in time_line
    assert "общее с ожиданием 120м" in time_line
    # Wall time (120m) differs from agent time (16m), so no "total" label.
    assert "Всего" not in time_line