Compare commits

...

4 Commits

Author SHA1 Message Date
dev-bot
9a0298de9d feat(telegram): live editable task tracker (Variant B+), replace 15-message spam
Replace the ~15 separate Telegram messages per task (agent start/finish, stage
transition, QG-pending, tech noise) with ONE live tracker message edited in
place (editMessageText) on every stage transition. Only attention-worthy events
are still sent as SEPARATE, notifying messages: approve-gate, deploy-fail,
agent-fail, task error.

- db.py: idempotent ALTERs — tasks.tracker_message_id, tasks.title,
  tasks.brd_review_started_at/ended_at, agent_runs.model. Helpers for
  tracker message_id + BRD-review clock.
- usage.py: short_model_name() (strip provider/claude- prefix); parse model
  from result-JSON modelUsage; record_usage persists model.
- notifications.py: render_task_tracker(task_id) (stateless render from
  agent_runs), update_task_tracker (sendMessage->store id->editMessageText with
  fallback to a new message, silent), edit_telegram(). Per-stage line
  in↓/out↑·cost·model, ⏸️ Ревью БРД (human time), 💰 totals, finish block
  (⏱️ wall/agents/yours, 🔗 PR · 📦). notify_* are now tracker-only/log-only
  except the four alerts.
- stage_engine.py: stamp brd_review_ended on analysis->architecture advance.
- webhooks/plane.py: persist task title on creation.
- tests/test_telegram_tracker.py: render, short_model_name, send/edit/fallback,
  separate-vs-silent alert behavior.
2026-06-04 11:42:46 +03:00
2801983d7b Merge pull request 'fix(observability): merge-gate on deploy, full token input, Plane Done, artifact links' (#20) from fix/observability-and-merge-gate into main 2026-06-04 11:21:50 +03:00
Dev Agent
61e26a8930 fix(observability): merge-gate on deploy, full token input, Plane Done, artifact links
1. BUG 8 (second door): merge webhook no longer fake-completes a task at the
   deploy stage; done is gated by the deployer verdict (check_deploy_status).
   Other stages keep merge->done.
2. Token accounting: parse+persist cache_creation_input_tokens (new
   idempotent agent_runs column). usage_comment / task_summary now show the
   FULL input (input + cache_read + cache_creation) with a cached breakdown.
   cost_usd untouched.
3. deploy->done success now forces the Plane issue to terminal Done state.
4. All agents (architect/developer/reviewer/tester/deployer) attach artifact
   links to their finish comment via gitea_public_url.

Tests added for each fix; pytest 244 passed / 9 failed (off-limits HMAC group).
2026-06-04 11:17:58 +03:00
2629dffe1b Merge pull request 'fix(deploy): gate deploy->done on deployer verdict, not LLM exit code' (#19) from fix/deploy-verdict-gate into main 2026-06-04 02:46:52 +03:00
12 changed files with 1368 additions and 50 deletions

View File

@@ -699,12 +699,49 @@ class AgentLauncher:
task_id, work_item_id = row[0], row[1]
if not work_item_id:
return
plane_add_comment(work_item_id, usage_comment(agent, usage), author=agent)
# Observability: every agent's finish comment links its artifact(s)
# (reviewer->12-review, tester->13-test-report, deployer->14-deploy-log,
# architect->ADR, developer->PR/branch). For the developer we resolve the
# open PR number so the link points straight at it.
pr_number = None
if agent == "developer":
pr_number = self._open_pr_number(repo, branch)
plane_add_comment(
work_item_id,
usage_comment(
agent,
usage,
repo=repo,
branch=branch,
work_item_id=work_item_id,
pr_number=pr_number,
),
author=agent,
)
if agent == "deployer":
plane_add_comment(
work_item_id, task_summary_comment(task_id), author="deployer"
)
def _open_pr_number(self, repo: str, branch: str):
"""Return the open PR number for `branch`, or None. Never raises."""
try:
import httpx
owner = settings.gitea_owner
headers = {"Authorization": f"token {settings.gitea_token}"}
resp = httpx.get(
f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls",
params={"state": "open", "head": branch},
headers=headers, timeout=5,
)
if resp.status_code == 200:
prs = resp.json()
if prs:
return prs[0].get("number")
except Exception:
pass
return None
def _ensure_pr(self, repo: str, branch: str, run_id: int):
import httpx
owner = settings.gitea_owner

View File

@@ -83,7 +83,32 @@ def init_db():
_ensure_column(conn, "agent_runs", "input_tokens", "INTEGER")
_ensure_column(conn, "agent_runs", "output_tokens", "INTEGER")
_ensure_column(conn, "agent_runs", "cache_read_tokens", "INTEGER")
# Observability fix: also persist cache-CREATION input tokens. Claude CLI
# reports the real input split across input_tokens (fresh, ~tens) +
# cache_read_input_tokens (cache hit, millions) + cache_creation_input_tokens
# (writing new cache). Without this column the cache_creation slice is lost
# and the "X in" figure understates the true prompt size. Idempotent ALTER.
_ensure_column(conn, "agent_runs", "cache_creation_tokens", "INTEGER")
_ensure_column(conn, "agent_runs", "cost_usd", "REAL")
# Telegram live tracker (feat/telegram-live-tracker): persist the FULL model
# name (e.g. "tokenator/claude-opus-4-8") per agent_runs row so the tracker
# can render a short model tag per stage. Parsed from the run-log result JSON
# (modelUsage key) by the launcher monitor; NULL when unknown. Idempotent ALTER.
_ensure_column(conn, "agent_runs", "model", "TEXT")
# Telegram live tracker: one editable Telegram message per task. We store its
# message_id so each stage transition can editMessageText the same message
# instead of spamming a new one. Idempotent ALTER (safe on the live prod DB).
_ensure_column(conn, "tasks", "tracker_message_id", "INTEGER")
# Telegram live tracker: human-readable task title for the tracker header
# ("🛠️ ET-012 · <title>"). Populated from the Plane work-item name at task
# creation; falls back to the work_item_id when absent. Idempotent ALTER.
_ensure_column(conn, "tasks", "title", "TEXT")
# Telegram live tracker: "BRD review" is the only HUMAN gate time — the delta
# between "BRD ready / approve requested" and the analysis->architecture
# advance (human flipped Plane to Approved). Persisted on the task so the
# tracker can show "твоё время" without recomputing from activity history.
_ensure_column(conn, "tasks", "brd_review_started_at", "TEXT")
_ensure_column(conn, "tasks", "brd_review_ended_at", "TEXT")
conn.commit()
conn.close()
@@ -131,6 +156,71 @@ def update_task_stage(task_id: int, stage: str):
conn.close()
# ---------------------------------------------------------------------------
# Telegram live tracker helpers (feat/telegram-live-tracker)
# ---------------------------------------------------------------------------
def get_tracker_message_id(task_id: int) -> int | None:
"""Return the stored Telegram tracker message_id for a task, or None."""
conn = get_db()
try:
row = conn.execute(
"SELECT tracker_message_id FROM tasks WHERE id=?", (task_id,)
).fetchone()
finally:
conn.close()
return row[0] if row and row[0] is not None else None
def set_tracker_message_id(task_id: int, message_id: int) -> None:
"""Persist the Telegram tracker message_id for a task (idempotent overwrite)."""
conn = get_db()
try:
conn.execute(
"UPDATE tasks SET tracker_message_id=? WHERE id=?",
(message_id, task_id),
)
conn.commit()
finally:
conn.close()
def mark_brd_review_started(task_id: int) -> None:
"""Stamp when BRD review (the human approve gate) started, if not already set.
Idempotent: only sets it the first time (a retried analyst run must not reset
the clock). The delta to brd_review_ended_at is the only "твоё время".
"""
conn = get_db()
try:
conn.execute(
"UPDATE tasks SET brd_review_started_at=datetime('now') "
"WHERE id=? AND brd_review_started_at IS NULL",
(task_id,),
)
conn.commit()
finally:
conn.close()
def mark_brd_review_ended(task_id: int) -> None:
"""Stamp when BRD review ended (analysis->architecture advance / Approved).
Idempotent: only sets it the first time and only if a start exists.
"""
conn = get_db()
try:
conn.execute(
"UPDATE tasks SET brd_review_ended_at=datetime('now') "
"WHERE id=? AND brd_review_started_at IS NOT NULL "
"AND brd_review_ended_at IS NULL",
(task_id,),
)
conn.commit()
finally:
conn.close()
def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
"""Generate next work item ID (e.g., ET-003 / ORCH-001).

View File

@@ -1,6 +1,24 @@
"""Notifications and logging for orchestrator events."""
"""Notifications and logging for orchestrator events.
feat/telegram-live-tracker (Variant B+): instead of ~15 separate Telegram
messages per task (agent start / finish / stage transition / QG-pending / tech
noise), the orchestrator now maintains ONE live tracker message per task that is
edited in place (editMessageText) on every stage transition. Only events that
NEED Slava's attention are sent as SEPARATE, notifying messages:
* approve-gate (notify_approve_requested) — BRD/TZ/AC ready, flip to Approved
* deploy failed / rolled back — send_telegram from launcher/engine
* agent failed (exit_code != 0) — send_telegram from launcher
* task error (notify_error)
The tracker itself is edited SILENTLY (disable_notification: true). Stage-change,
agent-start, agent-finish and QG-pending no longer emit their own messages — they
just refresh the tracker (or are log-only).
"""
import html
import logging
import httpx
logger = logging.getLogger("orchestrator")
@@ -17,25 +35,65 @@ def _get_settings():
return _settings
def send_telegram(text: str):
"""Send notification to Telegram. Fire-and-forget, never raises."""
# --------------------------------------------------------------------------- #
# Low-level Telegram primitives
# --------------------------------------------------------------------------- #
def send_telegram(text: str, disable_notification: bool = False):
"""Send a notification to Telegram. Fire-and-forget, never raises.
Returns the Telegram message_id on success, else None (so callers that want
to track the message — the tracker — can store it; legacy callers ignore it).
"""
s = _get_settings()
if not s.telegram_bot_token or not s.telegram_chat_id:
return
return None
try:
url = f"https://api.telegram.org/bot{s.telegram_bot_token}/sendMessage"
httpx.post(
resp = httpx.post(
url,
json={
"chat_id": s.telegram_chat_id,
"text": text,
"parse_mode": "HTML",
"disable_notification": False,
"disable_notification": disable_notification,
},
timeout=5,
)
data = resp.json()
if data.get("ok"):
return data["result"]["message_id"]
except Exception:
pass # Never crash orchestrator due to notification failure
return None
def edit_telegram(message_id: int, text: str) -> bool:
"""Edit an existing Telegram message. Returns True on success, else False.
Used by the live tracker to refresh the single per-task message in place.
Never raises. A False return tells the caller to fall back to a new message
(e.g. the message is too old to edit / was deleted / 400).
"""
s = _get_settings()
if not s.telegram_bot_token or not s.telegram_chat_id:
return False
try:
url = f"https://api.telegram.org/bot{s.telegram_bot_token}/editMessageText"
resp = httpx.post(
url,
json={
"chat_id": s.telegram_chat_id,
"message_id": message_id,
"text": text,
"parse_mode": "HTML",
},
timeout=5,
)
data = resp.json()
return bool(data.get("ok"))
except Exception:
return False
def _get_work_item_id(task_id: int) -> str:
@@ -50,26 +108,318 @@ def _get_work_item_id(task_id: int) -> str:
return f"task-{task_id}"
# --------------------------------------------------------------------------- #
# Live task tracker
# --------------------------------------------------------------------------- #
# Pipeline stages shown in the tracker, in order, with their display label and
# the agent whose agent_runs rows describe that stage's work. "Ревью БРД" is NOT
# an agent stage — it is the human approve gate rendered between Analysis and
# Architecture from the task's brd_review_* timestamps.
_TRACKER_STAGES = [
("analysis", "Analysis", "analyst"),
("architecture", "Architecture", "architect"),
("development", "Development", "developer"),
("review", "Review", "reviewer"),
("testing", "Testing", "tester"),
("deploy", "Deploy", "deployer"),
]
# Map a pipeline stage -> the agent that is RUNNING while the task sits in it.
# (development is entered after architecture finishes, etc.) Used to render the
# "🔄 <Stage> … идёт" line for the currently-active stage.
_BRD_LABEL = "\u0420\u0435\u0432\u044c\u044e \u0411\u0420\u0414" # "Ревью БРД"
_STAGE_ACTIVE_AGENT = {
"analysis": "analyst",
"architecture": "architect",
"development": "developer",
"review": "reviewer",
"testing": "tester",
"deploy": "deployer",
}
def _fmt_minutes(seconds) -> str:
"""Render a duration in whole minutes: 0..59s -> '<1м', else '<n>м'."""
try:
seconds = int(seconds or 0)
except (TypeError, ValueError):
seconds = 0
if seconds <= 0:
return ""
if seconds < 60:
return "<1м"
return f"{seconds // 60}\u043c"
def _parse_sql_ts(ts):
"""Parse a SQLite 'YYYY-MM-DD HH:MM:SS' UTC timestamp -> aware datetime/None."""
if not ts:
return None
from datetime import datetime, timezone
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
try:
return datetime.strptime(str(ts)[:19], fmt).replace(tzinfo=timezone.utc)
except (ValueError, TypeError):
continue
return None
def _duration_seconds(started, finished):
"""Seconds between two SQL timestamps; None if either is missing/unparseable."""
a = _parse_sql_ts(started)
b = _parse_sql_ts(finished)
if a is None or b is None:
return None
return max(int((b - a).total_seconds()), 0)
def render_task_tracker(task_id: int) -> str:
"""Build the full live-tracker text for a task from the DB (stateless render).
Pulls the task header (work_item_id, title, stage), every agent_runs row, and
the BRD-review timestamps, then renders:
- one '✅ <Stage> <dur> · <in>↓/<out>↑ · <cost> · <model>' line per finished
stage (latest run per stage),
- the '⏸️ Ревью БРД <dur> · твоё время[ ⏳]' line between Analysis/Architecture,
- a '🔄 <Stage> … идёт' line for the active (in-progress) stage,
- the '💰 <in>↓ / <out>↑ · <cost>' totals,
- on done: '⏱️ Всего .. · агенты .. · твоё ..' and a '🔗 PR / 📦' line.
Never raises (returns a minimal fallback string on error).
"""
from .db import get_db
from .usage import fmt_tokens, fmt_cost, _input_total, short_model_name
try:
conn = get_db()
task = conn.execute(
"SELECT id, work_item_id, title, stage, created_at, updated_at, "
"brd_review_started_at, brd_review_ended_at "
"FROM tasks WHERE id=?",
(task_id,),
).fetchone()
if not task:
conn.close()
return f"task-{task_id}"
runs = conn.execute(
"SELECT agent, started_at, finished_at, exit_code, input_tokens, "
"output_tokens, cache_read_tokens, cache_creation_tokens, cost_usd, model "
"FROM agent_runs WHERE task_id=? ORDER BY id ASC",
(task_id,),
).fetchall()
conn.close()
except Exception as e:
logger.warning(f"render_task_tracker({task_id}) DB error: {e}")
return f"task-{task_id}"
work_item_id = task["work_item_id"] or f"task-{task_id}"
title = task["title"] or work_item_id
stage = task["stage"] or "created"
done = stage == "done"
# Latest completed run per agent (a stage may have multiple runs on retry;
# we show the most recent FINISHED, successful run for the stage line).
last_done = {}
agent_runs_by_agent = {}
for r in runs:
agent_runs_by_agent.setdefault(r["agent"], []).append(r)
if r["finished_at"] and (r["exit_code"] == 0 or r["exit_code"] is None):
last_done[r["agent"]] = r
# Totals across ALL runs (every input/output token + cost counts).
total_in = 0
total_out = 0
total_cost = 0.0
agent_seconds = 0
for r in runs:
usage = {
"input_tokens": r["input_tokens"],
"cache_read_tokens": r["cache_read_tokens"],
"cache_creation_tokens": r["cache_creation_tokens"],
}
total_in += _input_total(usage)
total_out += int(r["output_tokens"] or 0)
total_cost += float(r["cost_usd"] or 0.0)
d = _duration_seconds(r["started_at"], r["finished_at"])
if d is not None:
agent_seconds += d
esc_title = html.escape(title)
header = (
f"\U0001f389 {html.escape(work_item_id)} \u00b7 {esc_title} \u2014 \u0413\u041e\u0422\u041e\u0412\u041e"
if done
else f"\U0001f6e0\ufe0f {html.escape(work_item_id)} \u00b7 {esc_title}"
)
bar = "\u2501" * 22
lines = [header, bar]
def _stage_line(label, run):
usage = {
"input_tokens": run["input_tokens"],
"cache_read_tokens": run["cache_read_tokens"],
"cache_creation_tokens": run["cache_creation_tokens"],
}
in_tok = fmt_tokens(_input_total(usage))
out_tok = fmt_tokens(run["output_tokens"])
cost = fmt_cost(run["cost_usd"])
dur = _fmt_minutes(_duration_seconds(run["started_at"], run["finished_at"]))
model = short_model_name(run["model"])
model_suffix = f" \u00b7 {model}" if model else ""
return (
f"\u2705 {label:<13} {dur} \u00b7 "
f"{in_tok}\u2193/{out_tok}\u2191 \u00b7 {cost}{model_suffix}"
)
# BRD review line: between Analysis and Architecture, only once Analysis has
# produced a run (i.e. the gate is live). Time = human review delta.
brd_started = task["brd_review_started_at"]
brd_ended = task["brd_review_ended_at"]
review_seconds = _duration_seconds(brd_started, brd_ended)
for stage_key, label, agent in _TRACKER_STAGES:
run = last_done.get(agent)
if run is not None:
lines.append(_stage_line(label, run))
elif _STAGE_ACTIVE_AGENT.get(stage) == agent and stage == stage_key:
# This stage is the active one and has no finished run yet.
lines.append(f"\U0001f504 {label:<13} \u2026 \u00b7 \u0438\u0434\u0451\u0442")
# else: not started yet -> not shown.
# Insert the BRD review line right after Analysis.
if stage_key == "analysis" and brd_started:
brd_label = f"{_BRD_LABEL:<13}"
if review_seconds is not None:
dur = _fmt_minutes(review_seconds)
lines.append(
f"\u23f8\ufe0f {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f"
)
else:
# Still waiting on the human (ended not stamped yet).
from datetime import datetime, timezone
start_dt = _parse_sql_ts(brd_started)
waited = None
if start_dt is not None:
waited = int(
(datetime.now(timezone.utc) - start_dt).total_seconds()
)
dur = _fmt_minutes(waited) if waited is not None else "\u2026"
lines.append(
f"\u23f8\ufe0f {brd_label} {dur} \u00b7 \u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f \u23f3"
)
lines.append(bar)
lines.append(
f"\U0001f4b0 {fmt_tokens(total_in)}\u2193 / {fmt_tokens(total_out)}\u2191 \u00b7 "
f"{fmt_cost(total_cost)}"
)
if done:
wall = _duration_seconds(task["created_at"], task["updated_at"])
wall_str = _fmt_minutes(wall) if wall is not None else "?"
review_str = _fmt_minutes(review_seconds) if review_seconds else ""
lines.append(
f"\u23f1\ufe0f \u0412\u0441\u0435\u0433\u043e {wall_str} \u00b7 "
f"\u0430\u0433\u0435\u043d\u0442\u044b {_fmt_minutes(agent_seconds)} \u00b7 "
f"\u0442\u0432\u043e\u0451 {review_str}"
)
link = _done_link(task_id, task["work_item_id"])
if link:
lines.append(link)
return "\n".join(lines)
def _done_link(task_id: int, work_item_id) -> str | None:
"""Build the final '🔗 PR #n · 📦 deployed' line. Never raises -> None."""
try:
from .config import settings
from .db import get_db
conn = get_db()
row = conn.execute(
"SELECT repo, branch FROM tasks WHERE id=?", (task_id,)
).fetchone()
conn.close()
if not row:
return None
repo, branch = row["repo"], row["branch"]
pr_part = None
try:
owner = settings.gitea_owner
headers = {"Authorization": f"token {settings.gitea_token}"}
resp = httpx.get(
f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/pulls",
params={"state": "all", "head": branch},
headers=headers, timeout=5,
)
if resp.status_code == 200:
prs = resp.json()
if prs:
pr_part = f"\U0001f517 PR #{prs[0].get('number')}"
except Exception:
pr_part = None
parts = []
if pr_part:
parts.append(pr_part)
parts.append("\U0001f4e6 deployed")
return " \u00b7 ".join(parts)
except Exception:
return None
def update_task_tracker(task_id: int):
"""Render + push the live tracker for a task. Never raises.
First call (no stored tracker_message_id): sendMessage (silent) and store the
returned message_id. Subsequent calls: editMessageText the stored message; if
the edit fails (too old / deleted / 400), fall back to a NEW message and
update the stored id. The tracker is always sent with disable_notification so
it never pings — only the dedicated alert helpers ping.
"""
try:
from .db import get_tracker_message_id, set_tracker_message_id
text = render_task_tracker(task_id)
mid = get_tracker_message_id(task_id)
if mid is not None:
if edit_telegram(mid, text):
return
# Edit failed -> fall back to a fresh message.
new_mid = send_telegram(text, disable_notification=True)
if new_mid is not None:
set_tracker_message_id(task_id, new_mid)
except Exception as e:
logger.warning(f"update_task_tracker({task_id}) failed: {e}")
# --------------------------------------------------------------------------- #
# Stage / agent lifecycle notifications (now tracker-only, no separate message)
# --------------------------------------------------------------------------- #
def notify_stage_change(task_id: int, old_stage: str, new_stage: str, agent: str = None):
"""Log and notify stage transition."""
"""Log a stage transition and refresh the live tracker (no separate message)."""
work_item_id = _get_work_item_id(task_id)
msg = f"\U0001f504 {work_item_id}: {old_stage} \u2192 {new_stage}"
if agent:
msg += f" (\u0437\u0430\u043f\u0443\u0449\u0435\u043d {agent})"
logger.info(msg)
send_telegram(msg)
update_task_tracker(task_id)
def notify_agent_started(run_id: int, agent: str, task_id: int):
"""Notify agent launch."""
"""Log an agent launch and refresh the tracker (no separate message)."""
work_item_id = _get_work_item_id(task_id)
msg = f"\U0001f680 {work_item_id}: {agent} \u0437\u0430\u043f\u0443\u0449\u0435\u043d (run_id={run_id})"
logger.info(msg)
send_telegram(msg)
logger.info(f"\U0001f680 {work_item_id}: {agent} \u0437\u0430\u043f\u0443\u0449\u0435\u043d (run_id={run_id})")
if task_id:
update_task_tracker(task_id)
def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int = None, duration_s: int = None):
"""Notify agent completion."""
"""Log agent completion and refresh the tracker (no separate message).
The agent-FAILED alert (exit_code != 0) is still sent separately by the
launcher via send_telegram; this helper itself only logs + refreshes.
"""
work_item_id = _get_work_item_id(task_id) if task_id else "?"
if exit_code == 0:
dur = f" ({duration_s // 60} \u043c\u0438\u043d)" if duration_s else ""
@@ -79,47 +429,66 @@ def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int
else:
msg = f"\u274c {work_item_id}: {agent} \u0443\u043f\u0430\u043b (exit_code={exit_code})"
logger.info(msg)
send_telegram(msg)
if task_id:
update_task_tracker(task_id)
def notify_qg_result(task_id: int, check: str, passed: bool, reason: str = None):
"""Notify QG check result."""
"""Log a QG check result (NO separate Telegram message: QG-pending is noise).
Kept for callers; QG outcomes are log-only now and reflected by the tracker
through the resulting stage transition.
"""
work_item_id = _get_work_item_id(task_id)
if passed:
msg = f"\u2705 {work_item_id}: QG {check} \u2014 passed"
logger.info(f"\u2705 {work_item_id}: QG {check} \u2014 passed")
else:
msg = f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}"
logger.info(msg)
send_telegram(msg)
logger.warning(f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}")
def notify_qg_failure(task_id: int, stage: str, check: str, reason: str):
"""Log and notify QG check failure."""
"""Log a QG check failure (log-only).
QG-pending / QG-failed are NOT pinged as separate messages anymore (they are
not actionable for Slava). Real rollbacks/deploy-fails are alerted by their
own dedicated send_telegram calls in the engine/launcher.
"""
work_item_id = _get_work_item_id(task_id)
msg = f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}"
logger.warning(msg)
send_telegram(msg)
logger.warning(f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}")
def notify_approve_requested(task_id: int):
"""Notify that analyst requests :approved:."""
"""ALERT (separate, notifying): BRD/TZ/AC ready -> flip Plane to Approved.
Also starts the BRD-review clock and refreshes the tracker so the
'⏸️ Ревью БРД · твоё время ⏳' line appears.
"""
work_item_id = _get_work_item_id(task_id)
msg = f"\U0001f4cb {work_item_id}: BRD/\u0422\u0417/AC \u0433\u043e\u0442\u043e\u0432\u044b. \u0416\u0434\u0443 :approved: \u0432 Plane"
try:
from .db import mark_brd_review_started
mark_brd_review_started(task_id)
except Exception as e:
logger.warning(f"notify_approve_requested: brd clock start failed: {e}")
msg = (
f"\U0001f4cb {work_item_id}: BRD/\u0422\u0417/AC \u0433\u043e\u0442\u043e\u0432\u044b. "
f"\u041f\u0435\u0440\u0435\u0432\u0435\u0434\u0438\u0442\u0435 \u0437\u0430\u0434\u0430\u0447\u0443 \u0432 \u0441\u0442\u0430\u0442\u0443\u0441 Approved "
f"\u0432 Plane \u0434\u043b\u044f \u043f\u0440\u043e\u0434\u043e\u043b\u0436\u0435\u043d\u0438\u044f."
)
logger.info(msg)
send_telegram(msg)
update_task_tracker(task_id)
send_telegram(msg) # separate, notifying
def notify_done(task_id: int):
"""Notify task completion."""
"""Task completion: refresh the tracker to its final ГОТОВО form (no separate ping)."""
work_item_id = _get_work_item_id(task_id)
msg = f"\U0001f389 {work_item_id}: \u0437\u0430\u0434\u0430\u0447\u0430 \u0437\u0430\u0432\u0435\u0440\u0448\u0435\u043d\u0430!"
logger.info(msg)
send_telegram(msg)
logger.info(f"\U0001f389 {work_item_id}: \u0437\u0430\u0434\u0430\u0447\u0430 \u0437\u0430\u0432\u0435\u0440\u0448\u0435\u043d\u0430!")
update_task_tracker(task_id)
def notify_error(task_id: int, error: str):
"""Log and notify error for a task."""
"""ALERT (separate, notifying): task error."""
work_item_id = _get_work_item_id(task_id) if task_id else "system"
msg = f"\U0001f534 {work_item_id}: ERROR \u2014 {error}"
logger.error(msg)
send_telegram(msg)
send_telegram(msg) # separate, notifying

View File

@@ -343,6 +343,17 @@ def set_issue_blocked(work_item_id: str, project_id: str = None):
_set_issue_state_direct(work_item_id, PLANE_STATES["blocked"], project_id)
def set_issue_done(work_item_id: str, project_id: str = None):
"""Observability fix: force the issue into the TERMINAL Done state.
Used by the deploy->done success path so a completed task always reaches the
terminal Plane state (it used to stick on In Progress because the merge
webhook bypassed the stage engine). Uses the existing PLANE_STATES['done']
UUID — the mapping itself is NOT changed.
"""
_set_issue_state_direct(work_item_id, PLANE_STATES["done"], project_id)
def set_issue_in_progress(work_item_id: str, project_id: str = None):
"""Set issue to 'In Progress' state — agent working."""
_set_issue_state_direct(work_item_id, PLANE_STATES["in_progress"], project_id)

View File

@@ -47,6 +47,7 @@ from .plane_sync import (
set_issue_needs_input,
set_issue_in_progress,
set_issue_blocked,
set_issue_done,
)
from .config import settings
@@ -239,6 +240,15 @@ def advance_stage(
# --- Advance ---------------------------------------------------------
update_task_stage(task_id, next_stage)
# Telegram live tracker: the analysis->architecture advance is the human
# Approved gate clearing -> stamp the END of "Ревью БРД" (the only
# human time). Idempotent: only the first stamp counts.
if current_stage == "analysis" and next_stage == "architecture":
try:
from .db import mark_brd_review_ended
mark_brd_review_ended(task_id)
except Exception as e:
logger.warning(f"Task {task_id}: brd review end stamp failed: {e}")
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
result.advanced = True
@@ -247,6 +257,22 @@ def advance_stage(
f"(auto-advance after {agent})"
)
# --- Terminal sync: deploy -> done must reach Plane's Done -----------
# When the deployer's check_deploy_status passes we advance to the
# terminal 'done' stage. Previously a merged-PR webhook completed the
# task out-of-band and Plane stuck on In Progress. Now done flows through
# here, so explicitly drive the Plane issue into the terminal Done state
# (PLANE_STATES['done'] — mapping unchanged) in addition to the
# stage-change comment above.
if next_stage == "done" and work_item_id:
try:
set_issue_done(work_item_id)
logger.info(
f"Task {task_id}: deploy->done, Plane state forced to Done"
)
except Exception as e:
logger.error(f"Task {task_id}: failed to set Plane Done: {e}")
# --- Launch the next agent (ORCH-4 fix: current_stage, not next) -----
next_agent = get_agent_for_stage(current_stage)
if next_agent:

View File

@@ -31,7 +31,8 @@ def parse_usage_from_text(text: str) -> dict | None:
top-level '{' ... '}' that parses and carries usage/total_cost_usd.
Returns a normalised dict
{input_tokens, output_tokens, cache_read_tokens, cost_usd}
{input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens,
cost_usd}
(ints / float, missing fields -> 0 / 0.0), or None if no usable JSON found.
"""
if not text:
@@ -71,10 +72,67 @@ def parse_usage_from_text(text: str) -> dict | None:
"cache_read_tokens": _int(
usage.get("cache_read_input_tokens", usage.get("cache_read_tokens"))
),
# The cache-CREATION slice (writing new cache entries) is part of the
# REAL input and used to be dropped on the floor. Persist it so the
# "X in" figure reflects the full prompt size, not just fresh tokens.
"cache_creation_tokens": _int(
usage.get("cache_creation_input_tokens", usage.get("cache_creation_tokens"))
),
"cost_usd": _float(cost),
# Telegram live tracker: the model the run actually used. claude
# --output-format json reports it under modelUsage (a dict keyed by the
# full model id) and/or a top-level "model" field. We keep the FULL name
# here; short_model_name() trims it for the tracker. None when unknown.
"model": _extract_model(candidate),
}
def _extract_model(candidate: dict) -> str | None:
"""Best-effort: pull the model id out of a claude result JSON object.
Prefers modelUsage (a dict keyed by full model ids, e.g.
{"claude-opus-4-8": {...}}) and returns the key with the most output
tokens; falls back to a top-level "model" string. Never raises -> None.
"""
try:
mu = candidate.get("modelUsage")
if isinstance(mu, dict) and mu:
def _out(v):
try:
return int((v or {}).get("outputTokens", 0))
except (TypeError, ValueError, AttributeError):
return 0
best = max(mu.items(), key=lambda kv: _out(kv[1]))
if best and best[0]:
return str(best[0])
model = candidate.get("model")
if isinstance(model, str) and model:
return model
except Exception:
pass
return None
def short_model_name(full: str | None) -> str:
"""Trim a full model id to a short tag for the tracker.
'tokenator/claude-opus-4-8' -> 'opus-4-8'
'vibecode/claude-sonnet-4.6' -> 'sonnet-4.6'
'claude-opus-4-8' -> 'opus-4-8'
Returns '' when full is falsy so callers can omit the ' · <model>' suffix.
"""
if not full:
return ""
name = str(full).strip()
# Drop any provider prefix up to and including the last '/'.
if "/" in name:
name = name.rsplit("/", 1)[-1]
# Drop a leading 'claude-' marketing prefix.
if name.startswith("claude-"):
name = name[len("claude-"):]
return name
def _extract_last_json_object(text: str) -> dict | None:
"""Return the last balanced top-level JSON object in `text` that parses.
@@ -150,12 +208,15 @@ def record_usage(run_id: int, usage: dict | None):
try:
conn.execute(
"UPDATE agent_runs SET input_tokens=?, output_tokens=?, "
"cache_read_tokens=?, cost_usd=? WHERE id=?",
"cache_read_tokens=?, cache_creation_tokens=?, cost_usd=?, "
"model=COALESCE(?, model) WHERE id=?",
(
usage.get("input_tokens"),
usage.get("output_tokens"),
usage.get("cache_read_tokens"),
usage.get("cache_creation_tokens"),
usage.get("cost_usd"),
usage.get("model"),
run_id,
),
)
@@ -197,19 +258,132 @@ AGENT_DISPLAY = {
}
def usage_comment(agent: str, usage: dict | None) -> str:
def _input_total(usage: dict) -> int:
"""FULL input = fresh input + cache-read + cache-creation tokens."""
def _i(k):
try:
return int(usage.get(k) or 0)
except (TypeError, ValueError):
return 0
return _i("input_tokens") + _i("cache_read_tokens") + _i("cache_creation_tokens")
def _cached_total(usage: dict) -> int:
"""Cached portion of the input = cache-read + cache-creation tokens."""
def _i(k):
try:
return int(usage.get(k) or 0)
except (TypeError, ValueError):
return 0
return _i("cache_read_tokens") + _i("cache_creation_tokens")
def fmt_in(usage: dict) -> str:
"""Render the input figure as full total with a cached breakdown.
'8.5M in (8.4M cached)' when there is a cache; '45.2k in' when cached==0.
"""
total = _input_total(usage)
cached = _cached_total(usage)
if cached > 0:
return f"{fmt_tokens(total)} in ({fmt_tokens(cached)} cached)"
return f"{fmt_tokens(total)} in"
def usage_comment(
agent: str,
usage: dict | None,
repo: str | None = None,
branch: str | None = None,
work_item_id: str | None = None,
pr_number=None,
) -> str:
"""Build the per-agent finish comment, e.g.
'\U0001f4bb Developer \u0433\u043e\u0442\u043e\u0432 \u00b7 45.2k in / 12.1k out \u00b7 $0.21'.
'\U0001f4bb Developer \u0433\u043e\u0442\u043e\u0432 \u00b7 8.5M in (8.4M cached) / 45.8k out \u00b7 $7.29'.
When repo/branch/work_item_id are supplied, the agent's artifact link(s) are
appended (BUG: only analyst used to link its docs). Missing artifacts are
silently skipped — link building never raises.
"""
usage = usage or {}
name = AGENT_DISPLAY.get(agent, agent.capitalize())
icon = AGENT_ICON.get(agent, "\u2705")
return (
line = (
f"{icon} {name} \u0433\u043e\u0442\u043e\u0432 \u00b7 "
f"{fmt_tokens(usage.get('input_tokens'))} in / "
f"{fmt_in(usage)} / "
f"{fmt_tokens(usage.get('output_tokens'))} out \u00b7 "
f"{fmt_cost(usage.get('cost_usd'))}"
)
links = artifact_links(agent, repo, branch, work_item_id, pr_number)
if links:
line += "\n" + "\n".join(links)
return line
# Per-agent artifact file under docs/work-items/{wid}/ (architect/developer use
# special handling for ADR dirs / PR links, see artifact_links()).
AGENT_ARTIFACT = {
"reviewer": ("Review", "12-review.md"),
"tester": ("Test report", "13-test-report.md"),
"deployer": ("Deploy log", "14-deploy-log.md"),
}
def artifact_links(
agent: str,
repo: str | None,
branch: str | None,
work_item_id: str | None,
pr_number=None,
) -> list[str]:
"""Markdown link(s) to the finishing agent's artifact(s) in Gitea.
Uses gitea_public_url (falls back to gitea_url) for clickable links, mirroring
the analyst doc links. Returns [] (never raises) when there is nothing to
link or the required context is missing. analyst is intentionally NOT handled
here — its richer doc list lives in stage_engine._build_analyst_ready_comment.
"""
try:
from .config import settings
owner = getattr(settings, "gitea_owner", "admin")
base = (
getattr(settings, "gitea_public_url", "") or getattr(settings, "gitea_url", "")
).rstrip("/")
if not base or not repo:
return []
links: list[str] = []
if agent == "developer":
if branch:
links.append(
f"\U0001f4c2 [Branch {branch}]({base}/{owner}/{repo}/src/branch/{branch})"
)
if pr_number:
links.append(
f"\U0001f517 [PR #{pr_number}]({base}/{owner}/{repo}/pulls/{pr_number})"
)
return links
if agent == "architect":
if branch and work_item_id:
adr_dir = (
f"{base}/{owner}/{repo}/src/branch/{branch}/"
f"docs/work-items/{work_item_id}/06-adr"
)
links.append(f"\U0001f4d0 [ADR]({adr_dir})")
return links
spec = AGENT_ARTIFACT.get(agent)
if spec and branch and work_item_id:
label, fname = spec
href = (
f"{base}/{owner}/{repo}/src/branch/{branch}/"
f"docs/work-items/{work_item_id}/{fname}"
)
links.append(f"\U0001f4c4 [{label}]({href})")
return links
except Exception:
return []
AGENT_ICON = {
@@ -225,13 +399,22 @@ AGENT_ICON = {
def task_usage_summary(task_id: int) -> dict:
"""Aggregate agent_runs usage for a task.
Returns {total_in, total_out, total_cost, per_agent: [(agent, in, out, cost), ...]}.
total_in counts the FULL input (input + cache_read + cache_creation), and
total_cached counts the cached portion (cache_read + cache_creation).
COALESCE(...,0) keeps pre-existing rows (NULL cache_creation) from breaking.
Returns {total_in, total_cached, total_out, total_cost,
per_agent: [(agent, in, cached, out, cost), ...]}.
"""
conn = get_db()
try:
rows = conn.execute(
"SELECT agent, "
"COALESCE(SUM(input_tokens),0), "
"COALESCE(SUM(input_tokens),0) "
" + COALESCE(SUM(cache_read_tokens),0) "
" + COALESCE(SUM(cache_creation_tokens),0), "
"COALESCE(SUM(cache_read_tokens),0) "
" + COALESCE(SUM(cache_creation_tokens),0), "
"COALESCE(SUM(output_tokens),0), "
"COALESCE(SUM(cost_usd),0.0) "
"FROM agent_runs WHERE task_id=? GROUP BY agent ORDER BY agent",
@@ -239,12 +422,14 @@ def task_usage_summary(task_id: int) -> dict:
).fetchall()
finally:
conn.close()
per_agent = [(r[0], int(r[1]), int(r[2]), float(r[3])) for r in rows]
per_agent = [(r[0], int(r[1]), int(r[2]), int(r[3]), float(r[4])) for r in rows]
total_in = sum(r[1] for r in per_agent)
total_out = sum(r[2] for r in per_agent)
total_cost = sum(r[3] for r in per_agent)
total_cached = sum(r[2] for r in per_agent)
total_out = sum(r[3] for r in per_agent)
total_cost = sum(r[4] for r in per_agent)
return {
"total_in": total_in,
"total_cached": total_cached,
"total_out": total_out,
"total_cost": total_cost,
"per_agent": per_agent,
@@ -254,15 +439,26 @@ def task_usage_summary(task_id: int) -> dict:
def task_summary_comment(task_id: int) -> str:
"""Build the Deployer end-of-task summary comment (Feature 4, variant B)."""
s = task_usage_summary(task_id)
cached = s.get("total_cached", 0)
head_in = (
f"{fmt_tokens(s['total_in'])} \u0432\u0445\u043e\u0434 ({fmt_tokens(cached)} cached)"
if cached > 0
else f"{fmt_tokens(s['total_in'])} \u0432\u0445\u043e\u0434"
)
lines = [
f"\U0001f4ca \u0418\u0442\u043e\u0433\u043e \u043f\u043e \u0437\u0430\u0434\u0430\u0447\u0435: "
f"{fmt_tokens(s['total_in'])} \u0442\u043e\u043a\u0435\u043d\u043e\u0432 \u0432\u0445\u043e\u0434 / "
f"{head_in} / "
f"{fmt_tokens(s['total_out'])} \u0432\u044b\u0445\u043e\u0434 \u00b7 "
f"{fmt_cost(s['total_cost'])}"
]
for agent, ti, to, cost in s["per_agent"]:
for agent, ti, tc, to, cost in s["per_agent"]:
name = AGENT_DISPLAY.get(agent, agent.capitalize())
in_str = (
f"{fmt_tokens(ti)} in ({fmt_tokens(tc)} cached)"
if tc > 0
else f"{fmt_tokens(ti)} in"
)
lines.append(
f"\u2022 {name}: {fmt_tokens(ti)} in / {fmt_tokens(to)} out \u00b7 {fmt_cost(cost)}"
f"\u2022 {name}: {in_str} / {fmt_tokens(to)} out \u00b7 {fmt_cost(cost)}"
)
return "\n".join(lines)

View File

@@ -334,6 +334,20 @@ async def handle_pr(payload: dict):
logger.error(f"Task {task_id}: max retries reached, needs manual intervention")
elif action == "closed" and pr.get("merged", False):
# BUG 8 (second door): at the deploy stage `done` is gated by the
# deployer's verdict (check_deploy_status via advance_stage), NOT by the
# fact that the PR was merged. The deployer merges the PR at the START of
# its run, so a merged webhook arrives ~30s later while the deployer is
# still working — blindly setting done here would fake-complete the task
# and discard a later deploy_status: FAILED verdict. advance_stage will
# drive deploy→done (and Plane→Done) when the deployer job finishes.
# For every OTHER stage the merge-driven done behaviour is preserved.
if current_stage == "deploy":
logger.info(
f"Task {task_id}: PR merged at deploy stage — done gated by "
f"deployer verdict (check_deploy_status), ignoring merge-driven done."
)
return
update_task_stage(task_id, "done")
notify_stage_change(task_id, current_stage, "done")
logger.info(f"Task {task_id}: PR merged, stage → done")

View File

@@ -494,8 +494,9 @@ async def start_pipeline(data: dict, project_id: str = ""):
# Insert task into DB
conn = get_db()
conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) VALUES (?, ?, ?, ?, ?, ?)",
(plane_id, work_item_id, repo, branch, "analysis", plane_id),
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id, title) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(plane_id, work_item_id, repo, branch, "analysis", plane_id, name),
)
conn.commit()
conn.close()

View File

@@ -69,6 +69,7 @@ def silence_side_effects(monkeypatch):
"set_issue_needs_input",
"set_issue_in_progress",
"set_issue_blocked",
"set_issue_done",
):
monkeypatch.setattr(stage_engine, name, MagicMock())
@@ -177,6 +178,40 @@ class TestHappyPathAgentSelection:
assert res.enqueued_agent is None
assert _jobs() == []
def test_deploy_success_syncs_plane_to_terminal_done(self, monkeypatch):
"""FIX 3: a successful deploy->done forces the Plane issue to terminal Done.
Previously the task could stick on In Progress because the merge webhook
completed it out-of-band. Now the engine drives set_issue_done() on the
deploy->done success transition.
"""
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{k: _pass for k in stage_engine.QG_CHECKS},
)
task_id = _make_task("deploy", wi="ET-012")
res = advance_stage(
task_id, "deploy", "enduro-trails", "ET-012",
"feature/ET-012-x", finished_agent="deployer",
)
assert res.advanced is True
assert _stage(task_id) == "done"
# The terminal Plane sync was invoked with the work item id.
stage_engine.set_issue_done.assert_called_once_with("ET-012")
def test_non_terminal_advance_does_not_force_plane_done(self, monkeypatch):
"""set_issue_done must only fire on the terminal deploy->done transition."""
monkeypatch.setattr(
stage_engine, "QG_CHECKS",
{k: _pass for k in stage_engine.QG_CHECKS},
)
task_id = _make_task("review")
advance_stage(
task_id, "review", "enduro-trails", "ET-001",
"feature/ET-001-x", finished_agent=None,
)
stage_engine.set_issue_done.assert_not_called()
def test_done_is_terminal(self):
task_id = _make_task("done")
res = advance_stage(task_id, "done", "enduro-trails", "ET-001",

View File

@@ -0,0 +1,342 @@
"""feat/telegram-live-tracker: tests for the live Telegram task tracker.
Covers (per DEV_TASK_TELEGRAM_TRACKER.md):
* short_model_name: provider/claude- prefix trimming.
* render_task_tracker: per-stage line format (in↓/out↑, model, cost, minutes),
the "⏸️ Ревью БРД · твоё время" line, the 💰 totals, and the finish block
(⏱️ three times + 🔗/📦).
* first message -> sendMessage stores message_id; transition -> editMessageText.
* fallback: editMessageText fails -> a NEW message is sent and the id updated.
* which alerts go out SEPARATELY (approve-gate / deploy-fail / agent-fail /
error) vs which do NOT (QG-pending / agent-start / stage-transition).
Isolated temp DB; no network (httpx is patched).
"""
import os
import tempfile
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_tracker.db")
os.environ["ORCH_DB_PATH"] = _test_db
from unittest.mock import MagicMock, patch # noqa: E402
import pytest # noqa: E402
import src.db as db_module # noqa: E402
from src.db import init_db, get_db # noqa: E402
from src import notifications as N # noqa: E402
from src import usage as U # noqa: E402
@pytest.fixture(autouse=True)
def setup_db(monkeypatch):
monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
if os.path.exists(_test_db):
os.unlink(_test_db)
init_db()
# Re-enable send_telegram (conftest stubs it to a no-op); these tests patch
# httpx / the lower-level helpers explicitly per case.
yield
if os.path.exists(_test_db):
os.unlink(_test_db)
# --------------------------------------------------------------------------- #
# helpers to build a task + runs in the DB
# --------------------------------------------------------------------------- #
def _mk_task(stage="development", title="\u0422\u0440\u0435\u043a\u0438 \u0441 \u0437\u0443\u043c\u0430 z5",
wid="ET-012", brd_start=None, brd_end=None):
conn = get_db()
cur = conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title, "
"brd_review_started_at, brd_review_ended_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
("p1", wid, "enduro-trails", "feature/ET-012-x", stage, title,
brd_start, brd_end),
)
tid = cur.lastrowid
conn.commit()
conn.close()
return tid
def _mk_run(task_id, agent, started, finished, in_tok, out_tok,
cache_read=0, cache_creation=0, cost=0.0, model=None, exit_code=0):
conn = get_db()
cur = conn.execute(
"INSERT INTO agent_runs (task_id, agent, started_at, finished_at, "
"exit_code, input_tokens, output_tokens, cache_read_tokens, "
"cache_creation_tokens, cost_usd, model) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(task_id, agent, started, finished, exit_code, in_tok, out_tok,
cache_read, cache_creation, cost, model),
)
rid = cur.lastrowid
conn.commit()
conn.close()
return rid
# --------------------------------------------------------------------------- #
# short_model_name
# --------------------------------------------------------------------------- #
def test_short_model_name():
assert U.short_model_name("tokenator/claude-opus-4-8") == "opus-4-8"
assert U.short_model_name("vibecode/claude-sonnet-4.6") == "sonnet-4.6"
assert U.short_model_name("claude-opus-4-8") == "opus-4-8"
assert U.short_model_name("opus-4-8") == "opus-4-8"
assert U.short_model_name(None) == ""
assert U.short_model_name("") == ""
def test_parse_usage_extracts_model_from_modelusage():
blob = (
'{"total_cost_usd":0.01,'
'"usage":{"input_tokens":10,"output_tokens":5},'
'"modelUsage":{"claude-opus-4-8":{"inputTokens":10,"outputTokens":5}}}'
)
u = U.parse_usage_from_text(blob)
assert u["model"] == "claude-opus-4-8"
# --------------------------------------------------------------------------- #
# render_task_tracker
# --------------------------------------------------------------------------- #
def test_render_in_progress_stage_lines_and_totals():
tid = _mk_task(stage="deploy", brd_start="2026-06-04 10:00:00",
brd_end="2026-06-04 10:08:00")
# Analysis: 10м, 1.1M in (mostly cache) / 39.6k out, $2.38, opus-4-8
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
in_tok=1000, out_tok=39600, cache_read=1_100_000, cost=2.38,
model="tokenator/claude-opus-4-8")
_mk_run(tid, "architect", "2026-06-04 10:08:00", "2026-06-04 10:17:00",
in_tok=500, out_tok=34400, cache_read=1_500_000, cost=2.24,
model="tokenator/claude-opus-4-8")
_mk_run(tid, "developer", "2026-06-04 10:17:00", "2026-06-04 10:28:00",
in_tok=400, out_tok=45800, cache_read=8_400_000, cost=7.29,
model="tokenator/claude-opus-4-8")
_mk_run(tid, "reviewer", "2026-06-04 10:28:00", "2026-06-04 10:31:00",
in_tok=300, out_tok=12900, cache_read=1_200_000, cost=1.53,
model="vibecode/claude-sonnet-4.6")
_mk_run(tid, "tester", "2026-06-04 10:31:00", "2026-06-04 10:36:00",
in_tok=200, out_tok=19500, cache_read=1_200_000, cost=1.51,
model="vibecode/claude-sonnet-4.6")
# deployer started but not finished -> active "идёт" line.
_mk_run(tid, "deployer", "2026-06-04 10:36:00", None,
in_tok=0, out_tok=0, model=None, exit_code=None)
text = N.render_task_tracker(tid)
# Header in-progress
assert text.startswith("\U0001f6e0\ufe0f ET-012 \u00b7 \u0422\u0440\u0435\u043a\u0438")
# Per-stage format: in↓/out↑ · cost · model
assert "\u2705 Analysis" in text
assert "10\u043c" in text # analysis duration
assert "39.6k\u2191" in text # analysis out
assert "$2.38" in text
assert "opus-4-8" in text
assert "sonnet-4.6" in text # reviewer/tester model
# BRD review line (human time, ended)
assert "\u0420\u0435\u0432\u044c\u044e \u0411\u0420\u0414" in text
assert "\u0442\u0432\u043e\u0451 \u0432\u0440\u0435\u043c\u044f" in text
# Active stage
assert "\U0001f504 Deploy" in text
assert "\u0438\u0434\u0451\u0442" in text
# Totals line present with 💰
assert "\U0001f4b0" in text
# In-progress: no final ⏱️ line
assert "\u0412\u0441\u0435\u0433\u043e" not in text
def test_render_brd_review_waiting_shows_hourglass():
tid = _mk_task(stage="analysis", brd_start="2026-06-04 10:00:00",
brd_end=None)
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
in_tok=1000, out_tok=39600, cache_read=1_100_000, cost=2.38,
model="tokenator/claude-opus-4-8")
text = N.render_task_tracker(tid)
assert "\u0420\u0435\u0432\u044c\u044e \u0411\u0420\u0414" in text
assert "\u23f3" in text # hourglass while waiting
def test_render_done_has_times_and_links():
tid = _mk_task(stage="done", brd_start="2026-06-04 10:00:00",
brd_end="2026-06-04 10:08:00")
# set created/updated to compute wall clock
conn = get_db()
conn.execute(
"UPDATE tasks SET created_at='2026-06-04 09:00:00', "
"updated_at='2026-06-04 09:56:00' WHERE id=?", (tid,))
conn.commit()
conn.close()
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
in_tok=1000, out_tok=39600, cache_read=1_100_000, cost=2.38,
model="tokenator/claude-opus-4-8")
_mk_run(tid, "deployer", "2026-06-04 09:50:00", "2026-06-04 09:56:00",
in_tok=400, out_tok=22400, cache_read=1_600_000, cost=1.73,
model="tokenator/claude-opus-4-8")
with patch("src.notifications.httpx") as _hx:
# No PR found -> just "📦 deployed"
_resp = MagicMock(status_code=200)
_resp.json.return_value = []
_hx.get.return_value = _resp
text = N.render_task_tracker(tid)
assert text.startswith("\U0001f389 ET-012")
assert "\u0413\u041e\u0422\u041e\u0412\u041e" in text
# ⏱️ with three times
assert "\u23f1\ufe0f" in text
assert "\u0412\u0441\u0435\u0433\u043e" in text
assert "\u0430\u0433\u0435\u043d\u0442\u044b" in text
assert "\u0442\u0432\u043e\u0451" in text
# 📦 deployed line
assert "\U0001f4e6" in text
def test_render_escapes_html_in_title():
tid = _mk_task(stage="analysis", title="A <b>& B</b>")
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
in_tok=10, out_tok=5, cost=0.0)
text = N.render_task_tracker(tid)
assert "&lt;b&gt;" in text
assert "&amp;" in text
def test_render_omits_model_when_unknown():
tid = _mk_task(stage="analysis")
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
in_tok=10, out_tok=5, cost=0.0, model=None)
text = N.render_task_tracker(tid)
# No trailing " · <model>" — line ends at cost.
line = [l for l in text.splitlines() if l.startswith("\u2705 Analysis")][0]
assert line.rstrip().endswith("$0.00")
# --------------------------------------------------------------------------- #
# tracker send / edit / fallback
# --------------------------------------------------------------------------- #
def test_first_call_sends_message_and_stores_id(monkeypatch):
tid = _mk_task(stage="analysis")
_mk_run(tid, "analyst", "2026-06-04 09:00:00", None, in_tok=0, out_tok=0,
exit_code=None)
sent = {}
def _fake_send(text, disable_notification=False):
sent["text"] = text
sent["silent"] = disable_notification
return 555
monkeypatch.setattr(N, "send_telegram", _fake_send)
monkeypatch.setattr(N, "edit_telegram", lambda *a, **k: (_ for _ in ()).throw(AssertionError("should not edit on first call")))
N.update_task_tracker(tid)
from src.db import get_tracker_message_id
assert get_tracker_message_id(tid) == 555
assert sent["silent"] is True # tracker is silent
def test_second_call_edits_existing_message(monkeypatch):
tid = _mk_task(stage="development")
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
in_tok=10, out_tok=5, cost=0.1)
from src.db import set_tracker_message_id
set_tracker_message_id(tid, 777)
edited = {}
monkeypatch.setattr(N, "edit_telegram",
lambda mid, text: edited.update(mid=mid) or True)
monkeypatch.setattr(N, "send_telegram",
lambda *a, **k: (_ for _ in ()).throw(AssertionError("should not send when edit succeeds")))
N.update_task_tracker(tid)
assert edited["mid"] == 777
def test_fallback_to_new_message_when_edit_fails(monkeypatch):
tid = _mk_task(stage="development")
_mk_run(tid, "analyst", "2026-06-04 09:00:00", "2026-06-04 09:10:00",
in_tok=10, out_tok=5, cost=0.1)
from src.db import set_tracker_message_id, get_tracker_message_id
set_tracker_message_id(tid, 100)
monkeypatch.setattr(N, "edit_telegram", lambda mid, text: False) # edit fails
monkeypatch.setattr(N, "send_telegram", lambda text, disable_notification=False: 200)
N.update_task_tracker(tid)
assert get_tracker_message_id(tid) == 200 # id updated to the new message
# --------------------------------------------------------------------------- #
# which alerts are SEPARATE vs tracker-only
# --------------------------------------------------------------------------- #
def test_approve_gate_sends_separate_message_and_starts_brd_clock(monkeypatch):
tid = _mk_task(stage="analysis")
calls = []
monkeypatch.setattr(N, "send_telegram",
lambda text, disable_notification=False: calls.append((text, disable_notification)) or 1)
monkeypatch.setattr(N, "update_task_tracker", lambda task_id: None)
N.notify_approve_requested(tid)
# exactly one SEPARATE (notifying) send for the approve gate
assert len(calls) == 1
assert calls[0][1] is False # notifying
assert "Approved" in calls[0][0]
# BRD clock started
conn = get_db()
row = conn.execute("SELECT brd_review_started_at FROM tasks WHERE id=?", (tid,)).fetchone()
conn.close()
assert row[0] is not None
def test_error_sends_separate_message(monkeypatch):
tid = _mk_task(stage="development")
calls = []
monkeypatch.setattr(N, "send_telegram",
lambda text, disable_notification=False: calls.append((text, disable_notification)) or 1)
N.notify_error(tid, "boom")
assert len(calls) == 1
assert calls[0][1] is False # notifying
assert "ERROR" in calls[0][0]
def test_stage_change_does_not_send_separate_message(monkeypatch):
tid = _mk_task(stage="development")
sent = []
monkeypatch.setattr(N, "send_telegram",
lambda text, disable_notification=False: sent.append(text) or 1)
# tracker refresh is allowed (edit/send silent) but must NOT use send_telegram
# for a separate notification; stub update to isolate.
refreshed = []
monkeypatch.setattr(N, "update_task_tracker", lambda task_id: refreshed.append(task_id))
N.notify_stage_change(tid, "development", "review")
assert sent == [] # no separate message
assert refreshed == [tid] # tracker refreshed instead
def test_agent_started_does_not_send_separate_message(monkeypatch):
tid = _mk_task(stage="analysis")
sent = []
monkeypatch.setattr(N, "send_telegram",
lambda text, disable_notification=False: sent.append(text) or 1)
refreshed = []
monkeypatch.setattr(N, "update_task_tracker", lambda task_id: refreshed.append(task_id))
N.notify_agent_started(1, "analyst", tid)
assert sent == []
assert refreshed == [tid]
def test_qg_failure_does_not_send_separate_message(monkeypatch):
tid = _mk_task(stage="development")
sent = []
monkeypatch.setattr(N, "send_telegram",
lambda text, disable_notification=False: sent.append(text) or 1)
N.notify_qg_failure(tid, "development", "check_ci_green", "CI state: pending")
assert sent == [] # QG-pending is log-only, never a separate ping

View File

@@ -62,9 +62,27 @@ def test_parse_real_result_json():
assert u["input_tokens"] == 45231
assert u["output_tokens"] == 12100
assert u["cache_read_tokens"] == 18500
# FIX 2: cache_creation slice must now be parsed (was dropped before).
assert u["cache_creation_tokens"] == 7418
assert abs(u["cost_usd"] - 0.0560175) < 1e-9
def test_parse_cache_creation_present():
u = U.parse_usage_from_text(REAL_RESULT_JSON)
assert u["cache_creation_tokens"] == 7418
def test_parse_cache_creation_missing_defaults_zero():
blob = (
'{"total_cost_usd":0.01,'
'"usage":{"input_tokens":10,"output_tokens":5,'
'"cache_read_input_tokens":100}}'
)
u = U.parse_usage_from_text(blob)
assert u["cache_creation_tokens"] == 0
assert u["cache_read_tokens"] == 100
def test_parse_with_leading_text():
"""The agent may print text before the trailing JSON; we still find it."""
text = "some agent stdout line\nanother line\n" + REAL_RESULT_JSON
@@ -106,13 +124,16 @@ def test_record_usage_writes_columns():
U.record_usage(rid, u)
conn = get_db()
row = conn.execute(
"SELECT input_tokens, output_tokens, cache_read_tokens, cost_usd "
"SELECT input_tokens, output_tokens, cache_read_tokens, "
"cache_creation_tokens, cost_usd "
"FROM agent_runs WHERE id=?", (rid,)
).fetchone()
conn.close()
assert row["input_tokens"] == 45231
assert row["output_tokens"] == 12100
assert row["cache_read_tokens"] == 18500
# FIX 2: cache_creation column is now persisted.
assert row["cache_creation_tokens"] == 7418
assert abs(row["cost_usd"] - 0.0560175) < 1e-9
@@ -144,14 +165,82 @@ def test_fmt_cost():
def test_usage_comment_format():
# No cache -> in_total == input_tokens, no cached breakdown shown.
u = {"input_tokens": 45231, "output_tokens": 12100, "cost_usd": 0.21}
c = U.usage_comment("developer", u)
assert "Developer" in c
assert "45.2k in" in c
assert "cached" not in c
assert "12.1k out" in c
assert "$0.21" in c
def test_usage_comment_shows_full_input_with_cached():
"""FIX 2: in = input + cache_read + cache_creation, with cached breakdown."""
u = {
"input_tokens": 81,
"cache_read_tokens": 8_400_000,
"cache_creation_tokens": 100_000,
"output_tokens": 45_800,
"cost_usd": 7.29,
}
c = U.usage_comment("developer", u)
# total in = 8_500_081 -> 8.5M ; cached = 8_500_000 -> 8.5M
assert "8.5M in (8.5M cached)" in c
assert "45.8k out" in c
assert "$7.29" in c
def test_usage_comment_no_cached_when_zero():
u = {"input_tokens": 1234, "cache_read_tokens": 0,
"cache_creation_tokens": 0, "output_tokens": 50, "cost_usd": 0.01}
c = U.usage_comment("developer", u)
assert "1.2k in" in c
assert "cached" not in c
# --------------------------------------------------------------------------- #
# FIX 4: per-agent artifact links in finish comments
# --------------------------------------------------------------------------- #
def _ctx():
return dict(repo="enduro-trails", branch="feature/ET-012-x",
work_item_id="ET-012")
def test_usage_comment_reviewer_links_review_doc():
c = U.usage_comment("reviewer", {"input_tokens": 5}, **_ctx())
assert "12-review.md" in c
assert "ET-012" in c
def test_usage_comment_tester_links_test_report():
c = U.usage_comment("tester", {"input_tokens": 5}, **_ctx())
assert "13-test-report.md" in c
def test_usage_comment_deployer_links_deploy_log():
c = U.usage_comment("deployer", {"input_tokens": 5}, **_ctx())
assert "14-deploy-log.md" in c
def test_usage_comment_developer_links_pr_and_branch():
c = U.usage_comment("developer", {"input_tokens": 5}, pr_number=7, **_ctx())
assert "pulls/7" in c
assert "feature/ET-012-x" in c
def test_usage_comment_architect_links_adr():
c = U.usage_comment("architect", {"input_tokens": 5}, **_ctx())
assert "06-adr" in c
def test_usage_comment_no_links_without_context():
"""Without repo/branch context, no links are appended (no crash)."""
c = U.usage_comment("reviewer", {"input_tokens": 5})
assert "12-review.md" not in c
assert "http" not in c
# --------------------------------------------------------------------------- #
# task summary
# --------------------------------------------------------------------------- #
@@ -174,3 +263,47 @@ def test_task_summary_aggregates_over_agents():
assert "$0.15" in comment # total cost
assert "Developer" in comment
assert "Tester" in comment
def test_task_summary_sums_all_three_input_components():
"""FIX 2: total_in = SUM(input + cache_read + cache_creation); total_cached too."""
rid = _new_run(agent="developer", task_id=77)
U.record_usage(rid, {
"input_tokens": 100,
"cache_read_tokens": 2000,
"cache_creation_tokens": 900,
"output_tokens": 50,
"cost_usd": 0.10,
})
rid2 = _new_run(agent="tester", task_id=77)
U.record_usage(rid2, {
"input_tokens": 10,
"cache_read_tokens": 500,
"cache_creation_tokens": 0,
"output_tokens": 5,
"cost_usd": 0.05,
})
s = U.task_usage_summary(77)
# total_in = (100+2000+900) + (10+500+0) = 3510
assert s["total_in"] == 3510
# total_cached = (2000+900) + (500+0) = 3400
assert s["total_cached"] == 3400
assert s["total_out"] == 55
comment = U.task_summary_comment(77)
assert "cached" in comment
def test_task_summary_handles_null_cache_creation():
"""Pre-existing rows (NULL cache_creation) must not break aggregation."""
rid = _new_run(agent="developer", task_id=88)
conn = get_db()
conn.execute(
"UPDATE agent_runs SET input_tokens=100, cache_read_tokens=200, "
"cache_creation_tokens=NULL, output_tokens=10, cost_usd=0.01 WHERE id=?",
(rid,),
)
conn.commit()
conn.close()
s = U.task_usage_summary(88) # must not raise
assert s["total_in"] == 300 # 100 + 200 + (NULL->0)
assert s["total_cached"] == 200

View File

@@ -433,3 +433,67 @@ def test_ci_failure_development_escalates_at_limit(
assert "after CI failure" in err_msg
# Stage untouched.
assert not mock_update_stage.called
# ---------------------------------------------------------------------------
# BUG 8 (second door): a merged-PR webhook must NOT fake-complete a task that is
# still in the deploy stage. On `deploy` done is gated by the deployer's verdict
# (check_deploy_status via advance_stage), not by the merge event. For every
# other stage the merge->done behaviour is preserved. Pure-logic tests: invoke
# handle_pr() directly with mocked helpers (no HMAC barrier).
# ---------------------------------------------------------------------------
def _merged_pr_payload(branch="feature/ET-012-x"):
return {
"action": "closed",
"pull_request": {
"merged": True,
"number": 7,
"head": {"ref": branch},
},
"repository": {"name": "enduro-trails"},
}
@patch("src.webhooks.gitea.notify_stage_change")
@patch("src.webhooks.gitea.update_task_stage")
@patch("src.webhooks.gitea.get_task_by_repo_branch")
@patch("src.webhooks.gitea.get_project_by_repo")
def test_merge_on_deploy_stage_does_not_set_done(
mock_proj, mock_task, mock_update_stage, mock_notify,
):
"""FIX 1: merge at deploy stage is ignored — done is gated by deployer verdict."""
from src.webhooks.gitea import handle_pr
mock_proj.return_value = {"repo": "enduro-trails"}
mock_task.return_value = {
"id": 1, "stage": "deploy", "work_item_id": "ET-012",
}
asyncio.run(handle_pr(_merged_pr_payload()))
# The merge-driven done path must NOT run on deploy.
assert not mock_update_stage.called
assert not mock_notify.called
@patch("src.webhooks.gitea.notify_stage_change")
@patch("src.webhooks.gitea.update_task_stage")
@patch("src.webhooks.gitea.get_task_by_repo_branch")
@patch("src.webhooks.gitea.get_project_by_repo")
def test_merge_on_non_deploy_stage_sets_done(
mock_proj, mock_task, mock_update_stage, mock_notify,
):
"""FIX 1: merge behaviour is preserved for non-deploy stages (e.g. review)."""
from src.webhooks.gitea import handle_pr
mock_proj.return_value = {"repo": "enduro-trails"}
mock_task.return_value = {
"id": 2, "stage": "review", "work_item_id": "ET-013",
}
asyncio.run(handle_pr(_merged_pr_payload(branch="feature/ET-013-x")))
# Non-deploy stages still get the merge-driven done.
mock_update_stage.assert_called_once_with(2, "done")
assert mock_notify.called