Compare commits

...

4 Commits

Author SHA1 Message Date
dev-agent
4e4cc6c724 fix(qg): find 14-deploy-log.md in origin/main when absent in feature worktree
ET-013: deployer writes 14-deploy-log.md and merges deploy artifacts into
main via a separate PR, so the log lands in origin/main, not the feature
branch worktree that check_deploy_status reads via _repo_path(repo, branch).
Result: every successful deploy was falsely failed (Deploy log not found)
and rolled back deploy->development.

Fix: when the log is absent in the worktree, fall back to reading it from
origin/main on the shared clone (git fetch origin main + git show
origin/main:docs/work-items/<WI>/14-deploy-log.md). Lookup order:
worktree -> origin/main -> not found. Fetch/show failures degrade to
not found (never raise). Does not touch the merge-gate in gitea.py.

Tests: origin/main SUCCESS->PASS (ET-013 case), origin/main FAILED->FAILED,
absent everywhere->not found, fetch failure->degrades no exception,
worktree log short-circuits main lookup.
2026-06-04 13:35:35 +03:00
b222d7af27 Merge pull request 'fix(tracker): no duplicate Telegram messages on not-modified/transient edits' (#22) from fix/tracker-edit-not-modified into main 2026-06-04 13:22:46 +03:00
dev-bot
ec9aa74492 fix(tracker): no duplicate Telegram messages on not-modified/transient edits
edit_telegram now returns a distinguishable outcome (ok|not_modified|gone|
failed) instead of a bare bool. update_task_tracker only sends a NEW message
when the original is truly gone; not_modified and transient failures no longer
spawn duplicate trackers or orphan the live one.

render_task_tracker shows "попытка N" on an actively re-run stage (>=2 agent
runs) so the text changes between review<->development cycles. Finished ()
lines are unchanged.

Tests: edit_telegram classification (ok/not_modified/gone/failed via mocked
httpx), update_task_tracker (not_modified/failed -> no send, gone -> send+id),
render attempt marker.
2026-06-04 13:20:40 +03:00
3e5c74ce4f Merge pull request 'feat(telegram): live editable task tracker (Variant B+)' (#21) from feat/telegram-live-tracker into main 2026-06-04 11:46:21 +03:00
4 changed files with 423 additions and 44 deletions

View File

@@ -68,16 +68,43 @@ def send_telegram(text: str, disable_notification: bool = False):
return None
def edit_telegram(message_id: int, text: str) -> bool:
"""Edit an existing Telegram message. Returns True on success, else False.
# edit_telegram outcome codes -> let update_task_tracker decide what to do:
# "ok" edit applied -> nothing else to do
# "not_modified" Telegram says text is identical (400 "message is not
# modified" / "exactly the same") -> success, NO new message
# "gone" original message can't be edited (deleted / too old /
# invalid id) -> caller must fall back to a NEW message
# "failed" transient failure (network / timeout / 5xx / unknown 400)
# -> caller must NOT send a new message (avoid duplicates)
EDIT_OK = "ok"
EDIT_NOT_MODIFIED = "not_modified"
EDIT_GONE = "gone"
EDIT_FAILED = "failed"
Used by the live tracker to refresh the single per-task message in place.
Never raises. A False return tells the caller to fall back to a new message
(e.g. the message is too old to edit / was deleted / 400).
# Telegram error descriptions that mean the message is permanently un-editable
# (it is gone / orphaned) -> fall back to a fresh message.
_GONE_MARKERS = (
"message to edit not found",
"message can't be edited",
"message_id_invalid",
)
# Telegram "nothing changed" -> treat as success, never a duplicate.
_NOT_MODIFIED_MARKERS = (
"message is not modified",
"exactly the same",
)
def edit_telegram(message_id: int, text: str) -> str:
"""Edit an existing Telegram message. Never raises.
Returns a distinguishable outcome (see EDIT_* constants) so the caller can
tell apart "all good" / "nothing changed" / "message gone" / "transient
failure" and only fall back to a NEW message when the original is truly gone.
"""
s = _get_settings()
if not s.telegram_bot_token or not s.telegram_chat_id:
return False
return EDIT_FAILED
try:
url = f"https://api.telegram.org/bot{s.telegram_bot_token}/editMessageText"
resp = httpx.post(
@@ -91,9 +118,32 @@ def edit_telegram(message_id: int, text: str) -> bool:
timeout=5,
)
data = resp.json()
return bool(data.get("ok"))
except Exception:
return False
if data.get("ok"):
return EDIT_OK
# ok:false -> inspect the description to classify the 400.
desc = str(data.get("description") or "").lower()
if any(m in desc for m in _NOT_MODIFIED_MARKERS):
# Text is identical between transitions (e.g. repeat review cycle
# renders the same line). Nothing to do, NOT a duplicate.
logger.debug(
f"edit_telegram(mid={message_id}): not modified, skipping"
)
return EDIT_NOT_MODIFIED
if any(m in desc for m in _GONE_MARKERS):
logger.warning(
f"edit_telegram(mid={message_id}): message gone ({desc!r}), "
f"will fall back to a new message"
)
return EDIT_GONE
# Unknown 400 / other non-ok -> transient/unknown, do NOT duplicate.
logger.warning(
f"edit_telegram(mid={message_id}): edit failed ({desc!r})"
)
return EDIT_FAILED
except Exception as e:
# Network / timeout / 5xx -> transient, do NOT duplicate.
logger.warning(f"edit_telegram(mid={message_id}): transient error: {e}")
return EDIT_FAILED
def _get_work_item_id(task_id: int) -> str:
@@ -280,11 +330,35 @@ def render_task_tracker(task_id: int) -> str:
for stage_key, label, agent in _TRACKER_STAGES:
run = last_done.get(agent)
if run is not None:
# The stage is "in progress" only when it is the task's current stage AND
# there is an unfinished run for its agent (the agent is actually still
# working). A finished run with no in-flight run -> show the \u2705 result,
# even if the task still sits in that stage (just-finished snapshot).
agent_runs = agent_runs_by_agent.get(agent, [])
has_inflight = any(ar["finished_at"] is None for ar in agent_runs)
is_active_stage = (
_STAGE_ACTIVE_AGENT.get(stage) == agent
and stage == stage_key
and (has_inflight or run is None)
)
if is_active_stage:
# Live "\U0001f504 ... \u0438\u0434\u0451\u0442" line. Count how many times THIS stage's
# agent has run for this task; a 2nd+ run means we're re-doing the
# stage (e.g. review->development->review), so show "\u043f\u043e\u043f\u044b\u0442\u043a\u0430 N"
# to make the text change between cycles and to honestly show Slava
# the stage is being re-worked.
attempt = len(agent_runs)
if attempt >= 2:
lines.append(
f"\U0001f504 {label} \u00b7 \u043f\u043e\u043f\u044b\u0442\u043a\u0430 {attempt} "
f"\u2026 \u0438\u0434\u0451\u0442"
)
else:
lines.append(
f"\U0001f504 {label:<13} \u2026 \u00b7 \u0438\u0434\u0451\u0442"
)
elif run is not None:
lines.append(_stage_line(label, run))
elif _STAGE_ACTIVE_AGENT.get(stage) == agent and stage == stage_key:
# This stage is the active one and has no finished run yet.
lines.append(f"\U0001f504 {label:<13} \u2026 \u00b7 \u0438\u0434\u0451\u0442")
# else: not started yet -> not shown.
# Insert the BRD review line right after Analysis.
@@ -372,19 +446,32 @@ def update_task_tracker(task_id: int):
"""Render + push the live tracker for a task. Never raises.
First call (no stored tracker_message_id): sendMessage (silent) and store the
returned message_id. Subsequent calls: editMessageText the stored message; if
the edit fails (too old / deleted / 400), fall back to a NEW message and
update the stored id. The tracker is always sent with disable_notification so
it never pings — only the dedicated alert helpers ping.
returned message_id. Subsequent calls: editMessageText the stored message.
A NEW message is sent ONLY when the original is truly gone (deleted / too old
/ invalid id). On "not modified" (text unchanged) or transient failures
(network / timeout / 5xx / unknown 400) we do NOT send a new message — that
is exactly what produced duplicate trackers and orphaned (lagging) messages.
The tracker is always sent with disable_notification so it never pings —
only the dedicated alert helpers ping.
"""
try:
from .db import get_tracker_message_id, set_tracker_message_id
text = render_task_tracker(task_id)
mid = get_tracker_message_id(task_id)
if mid is not None:
if edit_telegram(mid, text):
result = edit_telegram(mid, text)
if result in (EDIT_OK, EDIT_NOT_MODIFIED):
# Edited in place (or nothing to change) -> done, no duplicate.
return
# Edit failed -> fall back to a fresh message.
if result == EDIT_FAILED:
# Transient -> don't duplicate; tracker redraws next transition.
logger.debug(
f"update_task_tracker({task_id}): edit failed transiently, "
f"keeping message {mid}"
)
return
# result == EDIT_GONE -> the stored message is gone; fall through
# to send a fresh one and re-point tracker_message_id at it.
new_mid = send_telegram(text, disable_notification=True)
if new_mid is not None:
set_tracker_message_id(task_id, new_mid)

View File

@@ -2,6 +2,7 @@
import os
import logging
import subprocess
import httpx
from ..config import settings
@@ -281,6 +282,64 @@ def check_tests_local(repo: str, branch: str) -> tuple[bool, str]:
return False, f"Local test run error: {e}"
def _parse_deploy_status(content: str) -> tuple[bool, str]:
"""Parse a 14-deploy-log.md body and map its `deploy_status:` frontmatter to a
quality-gate verdict. Reads ONLY the machine-readable YAML field, never prose.
deploy_status: SUCCESS -> (True, "Deploy status: SUCCESS")
deploy_status: FAILED -> (False, "Deploy status: FAILED")
missing field / no frontmatter / bad YAML -> (False, <reason>)
"""
import yaml
status = None
if content.startswith("---"):
parts = content.split("---", 2)
if len(parts) >= 3:
try:
fm = yaml.safe_load(parts[1]) or {}
except yaml.YAMLError as e:
return False, f"Invalid YAML frontmatter in deploy log: {e}"
status = str(fm.get("deploy_status", "")).upper().strip()
if status == "SUCCESS":
return True, "Deploy status: SUCCESS"
if status == "FAILED":
return False, "Deploy status: FAILED"
return False, f"No machine-readable deploy_status in frontmatter (got: {status!r})"
def _deploy_log_from_main(repo: str, work_item_id: str) -> str | None:
"""Best-effort read of 14-deploy-log.md from origin/main on the shared clone.
The deployer writes 14-deploy-log.md and merges the deploy artifacts into main
via a separate PR (see ET-013), so the file lands in origin/main, NOT in the
feature branch worktree the gate normally reads. This recovers it from main.
Degrades gracefully: any git failure (no clone, network/fetch error, file
absent in main) returns None instead of raising, so the caller falls back to
the plain "not found" verdict. Never raises.
"""
repo_clone = os.path.join(settings.repos_dir, repo)
if not os.path.isdir(os.path.join(repo_clone, ".git")):
return None
rel = f"docs/work-items/{work_item_id}/14-deploy-log.md"
try:
# Refresh origin/main so we see freshly-merged deploy artifacts.
subprocess.run(
["git", "-C", repo_clone, "fetch", "origin", "main"],
check=False, capture_output=True, timeout=30,
)
show = subprocess.run(
["git", "-C", repo_clone, "show", f"origin/main:{rel}"],
check=False, capture_output=True, text=True, timeout=15,
)
except (subprocess.SubprocessError, OSError) as e:
logger.warning("deploy-log origin/main lookup failed for %s/%s: %s", repo, work_item_id, e)
return None
if show.returncode != 0:
return None
return show.stdout
def check_deploy_status(repo: str, work_item_id: str, branch: str | None = None) -> tuple[bool, str]:
"""
БАГ 8 fix: gate the deploy -> done transition on the deployer's machine-readable
@@ -291,32 +350,30 @@ def check_deploy_status(repo: str, work_item_id: str, branch: str | None = None)
frontmatter. Returns:
(True, ...) -> deploy_status: SUCCESS
(False, ...) -> deploy_status: FAILED, missing field, or no frontmatter
ET-013 path-sync fix: the deployer writes 14-deploy-log.md and merges the deploy
artifacts into main via a SEPARATE PR, so the log lands in origin/main, not in
the feature-branch worktree this gate reads via _repo_path(repo, branch). If the
file is absent in the worktree we fall back to reading it from origin/main on the
shared clone. Lookup order: worktree -> origin/main -> not found.
"""
import yaml
repo_path = _repo_path(repo, branch)
log_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/14-deploy-log.md")
if not os.path.isfile(log_path):
return False, "Deploy log not found (14-deploy-log.md)"
try:
with open(log_path, "r") as f:
content = f.read()
status = None
if content.startswith("---"):
parts = content.split("---", 2)
if len(parts) >= 3:
try:
fm = yaml.safe_load(parts[1]) or {}
except yaml.YAMLError as e:
return False, f"Invalid YAML frontmatter in deploy log: {e}"
status = str(fm.get("deploy_status", "")).upper().strip()
if status == "SUCCESS":
return True, "Deploy status: SUCCESS"
if status == "FAILED":
return False, "Deploy status: FAILED"
return False, f"No machine-readable deploy_status in frontmatter (got: {status!r})"
except OSError as e:
return False, f"Error reading deploy log: {e}"
if os.path.isfile(log_path):
try:
with open(log_path, "r") as f:
content = f.read()
except OSError as e:
return False, f"Error reading deploy log: {e}"
return _parse_deploy_status(content)
# Not in the feature worktree — the deployer may have merged it into main.
main_content = _deploy_log_from_main(repo, work_item_id)
if main_content is not None:
return _parse_deploy_status(main_content)
return False, "Deploy log not found (14-deploy-log.md)"
# Registry for dynamic lookup by name

View File

@@ -242,6 +242,65 @@ class TestCheckDeployStatus:
passed, reason = check_deploy_status("enduro-trails", "ET-011")
assert passed is False
# --- ET-013 path-sync fix: log written to origin/main via separate PR ---
def test_origin_main_success_passes_when_absent_in_worktree(self, monkeypatch):
# Deployer merged 14-deploy-log.md into main via a separate PR; it is NOT
# in the feature worktree. Gate must recover it from origin/main -> PASS.
# (This is the exact ET-013 regression.)
monkeypatch.setattr(
"src.qg.checks._deploy_log_from_main",
lambda repo, wi: "---\ndeploy_status: SUCCESS\nversion: v0.0.5\n---\n\nLive.\n",
)
passed, reason = check_deploy_status("enduro-trails", "ET-013")
assert passed is True
assert "SUCCESS" in reason
def test_origin_main_failed_fails(self, monkeypatch):
# A genuine FAILED log in main must still fail.
monkeypatch.setattr(
"src.qg.checks._deploy_log_from_main",
lambda repo, wi: "---\ndeploy_status: FAILED\nversion: v0.0.5\n---\n\nboom.\n",
)
passed, reason = check_deploy_status("enduro-trails", "ET-013")
assert passed is False
assert "FAILED" in reason
def test_absent_everywhere_fails(self, monkeypatch):
# Not in worktree and origin/main lookup yields nothing -> not found.
monkeypatch.setattr(
"src.qg.checks._deploy_log_from_main", lambda repo, wi: None
)
passed, reason = check_deploy_status("enduro-trails", "ET-013")
assert passed is False
assert "not found" in reason.lower()
@patch("src.qg.checks.subprocess.run")
@patch("src.qg.checks.os.path.isdir", return_value=True)
def test_fetch_failure_degrades_no_exception(self, mock_isdir, mock_run):
# git fetch/show raising (e.g. network) must degrade to "not found",
# never propagate an exception out of the gate.
import subprocess as _sp
mock_run.side_effect = _sp.TimeoutExpired(cmd="git", timeout=30)
passed, reason = check_deploy_status("enduro-trails", "ET-013")
assert passed is False
assert "not found" in reason.lower()
def test_worktree_log_short_circuits_main_lookup(self, setup_work_item_dir, monkeypatch):
# If the log IS present in the worktree, origin/main must NOT be consulted.
self._write_log(
setup_work_item_dir,
"---\ndeploy_status: SUCCESS\nversion: v0.0.3\n---\n\nDeployed OK.\n",
)
called = {"n": 0}
def _boom(repo, wi):
called["n"] += 1
return None
monkeypatch.setattr("src.qg.checks._deploy_log_from_main", _boom)
passed, reason = check_deploy_status("enduro-trails", "ET-011")
assert passed is True
assert called["n"] == 0
def test_deploy_stage_qg_is_check_deploy_status(self):
assert get_qg_for_stage("deploy") == "check_deploy_status"

View File

@@ -249,7 +249,7 @@ def test_second_call_edits_existing_message(monkeypatch):
edited = {}
monkeypatch.setattr(N, "edit_telegram",
lambda mid, text: edited.update(mid=mid) or True)
lambda mid, text: edited.update(mid=mid) or N.EDIT_OK)
monkeypatch.setattr(N, "send_telegram",
lambda *a, **k: (_ for _ in ()).throw(AssertionError("should not send when edit succeeds")))
@@ -257,20 +257,196 @@ def test_second_call_edits_existing_message(monkeypatch):
assert edited["mid"] == 777
def test_fallback_to_new_message_when_edit_fails(monkeypatch):
def test_fallback_to_new_message_when_edit_gone(monkeypatch):
"""edit returns 'gone' (message deleted/too old) -> send NEW + update id."""
tid = _mk_task(stage="development")
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
in_tok=10, out_tok=5, cost=0.1)
from src.db import set_tracker_message_id, get_tracker_message_id
set_tracker_message_id(tid, 100)
monkeypatch.setattr(N, "edit_telegram", lambda mid, text: False) # edit fails
monkeypatch.setattr(N, "edit_telegram", lambda mid, text: N.EDIT_GONE)
monkeypatch.setattr(N, "send_telegram", lambda text, disable_notification=False: 200)
N.update_task_tracker(tid)
assert get_tracker_message_id(tid) == 200 # id updated to the new message
def test_not_modified_does_not_send_new_message(monkeypatch):
"""edit returns 'not_modified' -> NO new message, id unchanged (no dupe)."""
tid = _mk_task(stage="development")
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
in_tok=10, out_tok=5, cost=0.1)
from src.db import set_tracker_message_id, get_tracker_message_id
set_tracker_message_id(tid, 100)
monkeypatch.setattr(N, "edit_telegram", lambda mid, text: N.EDIT_NOT_MODIFIED)
monkeypatch.setattr(N, "send_telegram",
lambda *a, **k: (_ for _ in ()).throw(AssertionError("must not send on not_modified")))
N.update_task_tracker(tid)
assert get_tracker_message_id(tid) == 100 # unchanged, no duplicate
def test_transient_edit_failure_does_not_send_new_message(monkeypatch):
"""edit returns 'failed' (network/timeout/5xx) -> NO new message (no dupe)."""
tid = _mk_task(stage="development")
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
in_tok=10, out_tok=5, cost=0.1)
from src.db import set_tracker_message_id, get_tracker_message_id
set_tracker_message_id(tid, 100)
monkeypatch.setattr(N, "edit_telegram", lambda mid, text: N.EDIT_FAILED)
monkeypatch.setattr(N, "send_telegram",
lambda *a, **k: (_ for _ in ()).throw(AssertionError("must not send on transient failure")))
N.update_task_tracker(tid)
assert get_tracker_message_id(tid) == 100 # unchanged, no duplicate
# --------------------------------------------------------------------------- #
# edit_telegram outcome classification (httpx mocked)
# --------------------------------------------------------------------------- #
def _edit_resp(ok, description=None):
resp = MagicMock()
body = {"ok": ok}
if description is not None:
body["description"] = description
resp.json.return_value = body
return resp
def _patch_tg_creds(monkeypatch):
monkeypatch.setattr(N._get_settings(), "telegram_bot_token", "T", raising=False)
monkeypatch.setattr(N._get_settings(), "telegram_chat_id", "C", raising=False)
def test_edit_telegram_ok(monkeypatch):
_patch_tg_creds(monkeypatch)
with patch("src.notifications.httpx") as hx:
hx.post.return_value = _edit_resp(True)
assert N.edit_telegram(1, "x") == N.EDIT_OK
def test_edit_telegram_not_modified_is_success(monkeypatch):
# 400 "message is not modified" -> success, not gone, no duplicate
_patch_tg_creds(monkeypatch)
with patch("src.notifications.httpx") as hx:
hx.post.return_value = _edit_resp(
False, "Bad Request: message is not modified: ...")
assert N.edit_telegram(1, "x") == N.EDIT_NOT_MODIFIED
def test_edit_telegram_exactly_the_same_is_not_modified(monkeypatch):
_patch_tg_creds(monkeypatch)
with patch("src.notifications.httpx") as hx:
hx.post.return_value = _edit_resp(
False, "Bad Request: specified new message content and reply markup "
"are exactly the same")
assert N.edit_telegram(1, "x") == N.EDIT_NOT_MODIFIED
def test_edit_telegram_message_not_found_is_gone(monkeypatch):
_patch_tg_creds(monkeypatch)
with patch("src.notifications.httpx") as hx:
hx.post.return_value = _edit_resp(
False, "Bad Request: message to edit not found")
assert N.edit_telegram(1, "x") == N.EDIT_GONE
def test_edit_telegram_cant_be_edited_is_gone(monkeypatch):
_patch_tg_creds(monkeypatch)
with patch("src.notifications.httpx") as hx:
hx.post.return_value = _edit_resp(
False, "Bad Request: message can't be edited")
assert N.edit_telegram(1, "x") == N.EDIT_GONE
def test_edit_telegram_unknown_400_is_failed(monkeypatch):
# unknown 400 -> failed (NOT gone) -> caller won't duplicate
_patch_tg_creds(monkeypatch)
with patch("src.notifications.httpx") as hx:
hx.post.return_value = _edit_resp(
False, "Bad Request: some other unexpected error")
assert N.edit_telegram(1, "x") == N.EDIT_FAILED
def test_edit_telegram_timeout_is_failed(monkeypatch):
_patch_tg_creds(monkeypatch)
with patch("src.notifications.httpx") as hx:
hx.post.side_effect = Exception("read timeout")
assert N.edit_telegram(1, "x") == N.EDIT_FAILED
def test_edit_telegram_5xx_is_failed(monkeypatch):
# Telegram 5xx still returns ok:false w/o gone/not_modified markers
_patch_tg_creds(monkeypatch)
with patch("src.notifications.httpx") as hx:
hx.post.return_value = _edit_resp(False, "Internal Server Error")
assert N.edit_telegram(1, "x") == N.EDIT_FAILED
# --------------------------------------------------------------------------- #
# render: repeated stage attempt shows "попытка N"
# --------------------------------------------------------------------------- #
_POPYTKA = "\u043f\u043e\u043f\u044b\u0442\u043a\u0430" # popytka
def test_render_active_stage_shows_attempt_on_second_run():
# Two reviewer runs while in review -> active line shows attempt 2.
tid = _mk_task(stage="review")
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
in_tok=10, out_tok=5, cost=0.1, model="tokenator/claude-opus-4-8")
_mk_run(tid, "developer", "2026-06-04 09:10:00", "2026-06-04 09:20:00",
in_tok=10, out_tok=5, cost=0.1, model="tokenator/claude-opus-4-8")
# First review run finished (sent back to dev), second review run active.
_mk_run(tid, "reviewer", "2026-06-04 09:20:00", "2026-06-04 09:25:00",
in_tok=10, out_tok=5, cost=0.1, model="vibecode/claude-sonnet-4.6",
exit_code=0)
_mk_run(tid, "reviewer", "2026-06-04 09:30:00", None,
in_tok=0, out_tok=0, exit_code=None)
text = N.render_task_tracker(tid)
active = [l for l in text.splitlines()
if l.startswith("\U0001f504") and "Review" in l][0]
assert _POPYTKA in active
assert "2" in active
assert "\u0438\u0434\u0451\u0442" in active
def test_render_active_stage_no_attempt_on_first_run():
# Single reviewer run -> active line has NO attempt marker.
tid = _mk_task(stage="review")
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
in_tok=10, out_tok=5, cost=0.1, model="tokenator/claude-opus-4-8")
_mk_run(tid, "developer", "2026-06-04 09:10:00", "2026-06-04 09:20:00",
in_tok=10, out_tok=5, cost=0.1, model="tokenator/claude-opus-4-8")
_mk_run(tid, "reviewer", "2026-06-04 09:20:00", None,
in_tok=0, out_tok=0, exit_code=None)
text = N.render_task_tracker(tid)
active = [l for l in text.splitlines()
if l.startswith("\U0001f504") and "Review" in l][0]
assert _POPYTKA not in active
assert "\u0438\u0434\u0451\u0442" in active
def test_render_finished_lines_unaffected_by_attempt_logic():
# Completed (checkmark) lines never carry an attempt marker.
tid = _mk_task(stage="review")
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
in_tok=10, out_tok=5, cost=0.1, model="tokenator/claude-opus-4-8")
# developer ran twice (retry) but is a FINISHED stage now.
_mk_run(tid, "developer", "2026-06-04 09:10:00", "2026-06-04 09:15:00",
in_tok=10, out_tok=5, cost=0.1, model="tokenator/claude-opus-4-8")
_mk_run(tid, "developer", "2026-06-04 09:16:00", "2026-06-04 09:20:00",
in_tok=10, out_tok=5, cost=0.1, model="tokenator/claude-opus-4-8")
text = N.render_task_tracker(tid)
for l in text.splitlines():
if l.startswith("\u2705"):
assert _POPYTKA not in l
# --------------------------------------------------------------------------- #
# which alerts are SEPARATE vs tracker-only
# --------------------------------------------------------------------------- #