Step 1 ("Foundation", F2) of the self-improvement epic: formalise free-text
"lessons" from memory/ into a machine-readable `lessons` table — the foundation
for the future retrospective agent (E2), the RICE prioritiser (E3) and Стрим.
- src/lessons.py: pure never-raise observer leaf (record/get/update/snapshot),
kill-switch only, NO repo scope (observer-only; records about any repo incl.
enduro; repo cut on the read side). Slug-convention constants.
- src/db.py: additive idempotent `lessons` table in init_db() (+3 indexes);
nullable attribution columns from the start (NFR-6, _ensure_column forward-safe);
helpers record_lesson/get_lessons/update_lesson/lessons_snapshot/
lessons_recent_dup_exists (auto-dedup window).
- 4 auto-detectors (best-effort, source="auto", deduped): gate_failure
(_handle_qg_failure_rollbacks), merge_hold (_handle_merge_verify HOLD),
transient_retry (launcher._finalize_transient budget-exhaustion), deploy_degraded
(post-deploy DEGRADED -> set_repo_freeze).
- src/main.py: GET /lessons, POST /lessons, POST /lessons/{id} + read-only
`lessons` block in GET /queue; off-switch -> {"enabled": false}.
- src/config.py: lessons_enabled / lessons_query_limit_default / lessons_dedup_window_s.
- tests/test_lessons.py: TC-01..TC-12 (unit + integration), all green.
- Docs: CLAUDE.md, docs/architecture/README.md (component + schema + API), CHANGELOG.
Invariant: the journal is an OBSERVER, not a Quality Gate — STAGE_TRANSITIONS /
QG_CHECKS / check_* / machine-verdict / existing table schemas are byte-for-byte
untouched; enduro not affected. never-raise on every public fn + injection.
Refs: ORCH-098
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1688 lines
68 KiB
Python
1688 lines
68 KiB
Python
import sqlite3
|
||
import threading
|
||
from .config import settings
|
||
|
||
# ORCH-053 (F-2 anti-dup): process-wide lock guarding the SELECT-exists -> INSERT
|
||
# task-creation claim. The prod topology is a single uvicorn process per DB
|
||
# (staging/prod isolated), with the webhook running in uvicorn's asyncio thread
|
||
# and the reconciler in its own thread of the SAME process -> a threading.Lock
|
||
# covers both sides of the create race without a schema migration. See
|
||
# docs/work-items/ORCH-053/06-adr/ADR-001-stuck-task-reconciler.md §4.
|
||
_CREATE_TASK_LOCK = threading.Lock()
|
||
|
||
|
||
def get_db() -> sqlite3.Connection:
|
||
conn = sqlite3.connect(settings.db_path)
|
||
conn.row_factory = sqlite3.Row
|
||
return conn
|
||
|
||
|
||
def init_db():
|
||
conn = get_db()
|
||
conn.executescript("""
|
||
CREATE TABLE IF NOT EXISTS events (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
timestamp TEXT DEFAULT (datetime('now')),
|
||
source TEXT NOT NULL,
|
||
event_type TEXT NOT NULL,
|
||
payload TEXT NOT NULL,
|
||
processed INTEGER DEFAULT 0
|
||
);
|
||
CREATE TABLE IF NOT EXISTS tasks (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
plane_id TEXT,
|
||
work_item_id TEXT,
|
||
repo TEXT NOT NULL,
|
||
branch TEXT,
|
||
stage TEXT DEFAULT 'created',
|
||
agent_running TEXT,
|
||
created_at TEXT DEFAULT (datetime('now')),
|
||
updated_at TEXT DEFAULT (datetime('now')),
|
||
plane_issue_id TEXT
|
||
);
|
||
CREATE TABLE IF NOT EXISTS agent_runs (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
task_id INTEGER REFERENCES tasks(id),
|
||
agent TEXT NOT NULL,
|
||
started_at TEXT DEFAULT (datetime('now')),
|
||
finished_at TEXT,
|
||
exit_code INTEGER,
|
||
output_path TEXT
|
||
);
|
||
-- ORCH-1 (F-2b): persistent job queue. Webhook handlers enqueue a job and
|
||
-- return immediately; a background worker claims jobs (respecting
|
||
-- max_concurrency), spawns the claude agent, and updates the status.
|
||
-- Restart-safe: running jobs are requeued on startup (queue-recovery).
|
||
CREATE TABLE IF NOT EXISTS jobs (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
agent TEXT NOT NULL,
|
||
repo TEXT NOT NULL,
|
||
task_id INTEGER, -- FK tasks.id (nullable)
|
||
task_content TEXT, -- written to the agent task_file
|
||
status TEXT NOT NULL DEFAULT 'queued', -- queued|running|done|failed|cancelled (ORCH-090: cancelled is a terminal outcome, never requeued)
|
||
attempts INTEGER NOT NULL DEFAULT 0,
|
||
max_attempts INTEGER NOT NULL DEFAULT 2,
|
||
run_id INTEGER, -- agent_runs.id once started
|
||
error TEXT, -- last error message
|
||
transient_attempts INTEGER NOT NULL DEFAULT 0, -- ORCH-1 resilience: 429/transient retries
|
||
available_at TEXT, -- ORCH-1 resilience: backoff gate (claim when <= now)
|
||
created_at TEXT DEFAULT (datetime('now')),
|
||
started_at TEXT,
|
||
finished_at TEXT
|
||
);
|
||
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, id);
|
||
""")
|
||
# Lightweight migration: add resilience columns to a pre-existing jobs table
|
||
# (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table).
|
||
_ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0")
|
||
_ensure_column(conn, "jobs", "available_at", "TEXT")
|
||
# ORCH-065: pid of the spawned agent process, stamped in launcher._spawn next to
|
||
# run_id/started_at. The job-reaper uses it for Tier-1 liveness (os.kill(pid, 0))
|
||
# to detect a 'running' job whose process died before _finalize_job. Idempotent
|
||
# ALTER (no-op once present) -> safe on the live prod DB.
|
||
_ensure_column(conn, "jobs", "pid", "INTEGER")
|
||
# ORCH-5 (M-7): webhook delivery de-dup. Add events.delivery_id and a PARTIAL
|
||
# unique index. Partial (WHERE delivery_id IS NOT NULL) so pre-existing rows
|
||
# (which have NULL delivery_id) never collide with each other. Restart-safe:
|
||
# _ensure_column is a no-op once the column exists, and CREATE INDEX IF NOT
|
||
# EXISTS is a no-op once the index exists, so this is safe on the live prod DB.
|
||
_ensure_column(conn, "events", "delivery_id", "TEXT")
|
||
conn.execute(
|
||
"CREATE UNIQUE INDEX IF NOT EXISTS idx_events_delivery "
|
||
"ON events(delivery_id) WHERE delivery_id IS NOT NULL"
|
||
)
|
||
# Feature 4 (token usage): per-run token / cost accounting. Parsed from the
|
||
# claude --output-format json result by the launcher monitor. Idempotent
|
||
# ALTERs (no-op once the columns exist) so this is safe on the live prod DB.
|
||
_ensure_column(conn, "agent_runs", "input_tokens", "INTEGER")
|
||
_ensure_column(conn, "agent_runs", "output_tokens", "INTEGER")
|
||
_ensure_column(conn, "agent_runs", "cache_read_tokens", "INTEGER")
|
||
# Observability fix: also persist cache-CREATION input tokens. Claude CLI
|
||
# reports the real input split across input_tokens (fresh, ~tens) +
|
||
# cache_read_input_tokens (cache hit, millions) + cache_creation_input_tokens
|
||
# (writing new cache). Without this column the cache_creation slice is lost
|
||
# and the "X in" figure understates the true prompt size. Idempotent ALTER.
|
||
_ensure_column(conn, "agent_runs", "cache_creation_tokens", "INTEGER")
|
||
_ensure_column(conn, "agent_runs", "cost_usd", "REAL")
|
||
# Telegram live tracker (feat/telegram-live-tracker): persist the FULL model
|
||
# name (e.g. "tokenator/claude-opus-4-8") per agent_runs row so the tracker
|
||
# can render a short model tag per stage. Parsed from the run-log result JSON
|
||
# (modelUsage key) by the launcher monitor; NULL when unknown. Idempotent ALTER.
|
||
_ensure_column(conn, "agent_runs", "model", "TEXT")
|
||
# ORCH-087 (BR-EFF): persist the REAL --effort value sent to the Claude CLI per
|
||
# agent_runs row (low|medium|high|xhigh|max) so the tracker can render the
|
||
# resolved effort next to the model ("· opus-4-8 · xhigh"). Stamped in
|
||
# launcher._spawn right after resolve_agent_effort; NULL when no --effort flag
|
||
# was passed (resolved to "") or for historical rows. Idempotent ALTER.
|
||
_ensure_column(conn, "agent_runs", "effort", "TEXT")
|
||
# Telegram live tracker: one editable Telegram message per task. We store its
|
||
# message_id so each stage transition can editMessageText the same message
|
||
# instead of spamming a new one. Idempotent ALTER (safe on the live prod DB).
|
||
_ensure_column(conn, "tasks", "tracker_message_id", "INTEGER")
|
||
# Telegram live tracker: human-readable task title for the tracker header
|
||
# ("🛠️ ET-012 · <title>"). Populated from the Plane work-item name at task
|
||
# creation; falls back to the work_item_id when absent. Idempotent ALTER.
|
||
_ensure_column(conn, "tasks", "title", "TEXT")
|
||
# Telegram live tracker: "BRD review" is the only HUMAN gate time — the delta
|
||
# between "BRD ready / approve requested" and the analysis->architecture
|
||
# advance (human flipped Plane to Approved). Persisted on the task so the
|
||
# tracker can show "твоё время" without recomputing from activity history.
|
||
_ensure_column(conn, "tasks", "brd_review_started_at", "TEXT")
|
||
_ensure_column(conn, "tasks", "brd_review_ended_at", "TEXT")
|
||
# ORCH-090 (08-data-requirements.md): STOP-cancellation durable markers. Both are
|
||
# additive, idempotent (_ensure_column is a no-op once present) -> safe on the live
|
||
# shared prod DB (enduro untouched). The durable terminal itself is tasks.stage=
|
||
# 'cancelled' (already understood by the reconciler terminal-skip); these columns
|
||
# are audit/observability + the deferred-cancel signal.
|
||
# cancelled_at -> timestamp the task was cancelled (NULL otherwise).
|
||
# cancel_requested_at -> STOP arrived inside a critical merge/deploy window
|
||
# (ADR-001 D7): cancellation is DEFERRED until the
|
||
# irreversible step finishes honestly, then applied.
|
||
_ensure_column(conn, "tasks", "cancelled_at", "TEXT")
|
||
_ensure_column(conn, "tasks", "cancel_requested_at", "TEXT")
|
||
# ORCH-019 (08-data-requirements.md): bug-fast-track task type. Additive,
|
||
# idempotent (_ensure_column is a no-op once present) -> safe on the live shared
|
||
# prod DB (enduro untouched). Values: 'full' (DEFAULT — ALL existing and non-bug
|
||
# tasks) | 'bug' (a task carrying the Plane `Bug` label, set in start_pipeline
|
||
# after a successful atomic create). Read in advance_stage for the routing-override
|
||
# (skips architecture) — from the DB, NEVER from the network (NFR-4).
|
||
_ensure_column(conn, "tasks", "track", "TEXT DEFAULT 'full'")
|
||
# ORCH-026 (Level B): declarative task dependencies. job_deps stores the
|
||
# directed edge "task_id (B) is blocked-by depends_on_task_id (A)". The
|
||
# scheduler gate in claim_next_job keeps B queued until every A reaches
|
||
# tasks.stage='done'. Purely ADDITIVE (CREATE TABLE/INDEX IF NOT EXISTS, no
|
||
# change to jobs/tasks/agent_runs/events columns) -> idempotent and safe on
|
||
# the live shared prod DB (enduro-trails data untouched). The logical FK on
|
||
# tasks.id is intentional (no REFERENCES, mirrors jobs.task_id) so the
|
||
# migration cannot fail on a pre-existing DB. See 08-data-requirements.md.
|
||
conn.executescript("""
|
||
CREATE TABLE IF NOT EXISTS job_deps (
|
||
task_id INTEGER NOT NULL,
|
||
depends_on_task_id INTEGER NOT NULL,
|
||
created_at TEXT DEFAULT (datetime('now')),
|
||
PRIMARY KEY (task_id, depends_on_task_id)
|
||
);
|
||
CREATE INDEX IF NOT EXISTS idx_job_deps_task ON job_deps(task_id);
|
||
CREATE INDEX IF NOT EXISTS idx_job_deps_depends ON job_deps(depends_on_task_id);
|
||
""")
|
||
# ORCH-087 (BR-G1, ADR-001 Р-1): authoritative ledger of EVERY tracker card
|
||
# (Telegram message_id) ever created for a task. The scalar
|
||
# tasks.tracker_message_id only ever knew the LAST mid, so any lost reference
|
||
# (delete-fail+send-ok, race, restart) orphaned older cards forever. This
|
||
# ledger lets every bump delete ALL still-open mids (deleted_at IS NULL), not
|
||
# just the last one. tasks.tracker_message_id is KEPT (current-card pointer,
|
||
# full BC). Purely ADDITIVE (CREATE TABLE/INDEX IF NOT EXISTS) -> idempotent,
|
||
# restart-safe on the live shared prod DB (enduro-trails data untouched). The
|
||
# logical FK on tasks.id is intentional (no REFERENCES, mirrors job_deps) so
|
||
# the migration cannot fail on a pre-existing DB. See 08-data-requirements.md.
|
||
conn.executescript("""
|
||
CREATE TABLE IF NOT EXISTS tracker_messages (
|
||
task_id INTEGER NOT NULL,
|
||
message_id INTEGER NOT NULL,
|
||
created_at TEXT DEFAULT (datetime('now')),
|
||
deleted_at TEXT,
|
||
PRIMARY KEY (task_id, message_id)
|
||
);
|
||
CREATE INDEX IF NOT EXISTS idx_tracker_messages_open
|
||
ON tracker_messages(task_id) WHERE deleted_at IS NULL;
|
||
""")
|
||
# ORCH-088 (FR-5, ADR-001 D2): durable per-repo rollback-freeze. After a
|
||
# post-deploy DEGRADED verdict the repo is frozen so the serial gate stays
|
||
# CLOSED unconditionally (the degraded task is already stage='done' — BR-7 — so
|
||
# the ordinary active-task gate would not hold it) until an operator clears it
|
||
# via POST /serial-gate/unfreeze. Append-only journal: an ACTIVE freeze for repo
|
||
# R ⇔ a row with repo=R AND cleared_at IS NULL. Purely ADDITIVE (CREATE
|
||
# TABLE/INDEX IF NOT EXISTS) -> idempotent, restart-safe on the live shared prod
|
||
# DB (enduro-trails data untouched). See 08-data-requirements.md.
|
||
conn.executescript("""
|
||
CREATE TABLE IF NOT EXISTS repo_freeze (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
repo TEXT NOT NULL,
|
||
frozen_at TEXT NOT NULL DEFAULT (datetime('now')),
|
||
reason TEXT,
|
||
work_item_id TEXT,
|
||
cleared_at TEXT
|
||
);
|
||
CREATE INDEX IF NOT EXISTS idx_repo_freeze_active
|
||
ON repo_freeze (repo, cleared_at);
|
||
""")
|
||
# ORCH-027 (FR-4, ADR-001 D4): additive per-repo coverage baseline for the
|
||
# coverage-gate ratchet. One row per repo; the baseline is monotonically
|
||
# non-decreasing via ratchet_coverage_baseline (atomic compare-and-set). Purely
|
||
# ADDITIVE (CREATE TABLE IF NOT EXISTS, pattern repo_freeze/job_deps) ->
|
||
# idempotent, restart-safe on the shared prod DB; existing tables untouched
|
||
# (NFR-5). See docs/work-items/ORCH-027/08-data-requirements.md.
|
||
conn.executescript("""
|
||
CREATE TABLE IF NOT EXISTS coverage_baseline (
|
||
repo TEXT PRIMARY KEY,
|
||
coverage REAL NOT NULL,
|
||
source_sha TEXT,
|
||
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
|
||
);
|
||
""")
|
||
# ORCH-098 (FR-1, ADR-001 D1): additive machine lessons-journal — a structured
|
||
# table of pipeline deviations (gate-fail / merge-hold / transient-retry /
|
||
# post-deploy-degraded), the foundation of the self-improvement epic (E2
|
||
# retrospective / E3 RICE prioritiser). Purely ADDITIVE (CREATE TABLE/INDEX IF NOT
|
||
# EXISTS, pattern repo_freeze/coverage_baseline) -> idempotent, restart-safe on
|
||
# the shared prod DB; existing tables untouched (NFR-3, enduro-trails not
|
||
# affected). The attribution columns (attribution/target_repo/target_domain) are
|
||
# NULLABLE and present FROM THE START (Слава 10.06, NFR-6) so the live shared DB
|
||
# never needs a schema rework — an auto-recorded `unknown` lesson is classified
|
||
# later via update. lesson_type / attribution / target_domain carry NO enum/CHECK
|
||
# constraint: the values are a forward-compatible slug convention (a new lesson
|
||
# type never needs a migration). See docs/work-items/ORCH-098/08-data-requirements.md.
|
||
conn.executescript("""
|
||
CREATE TABLE IF NOT EXISTS lessons (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
||
updated_at TEXT,
|
||
lesson_type TEXT NOT NULL,
|
||
work_item_id TEXT,
|
||
task_id INTEGER,
|
||
stage TEXT,
|
||
agent TEXT,
|
||
repo TEXT,
|
||
root_cause TEXT,
|
||
suggestion TEXT,
|
||
status TEXT NOT NULL DEFAULT 'new',
|
||
related_task TEXT,
|
||
attribution TEXT,
|
||
target_repo TEXT,
|
||
target_domain TEXT,
|
||
source TEXT,
|
||
detail TEXT
|
||
);
|
||
CREATE INDEX IF NOT EXISTS idx_lessons_type_status ON lessons (lesson_type, status);
|
||
CREATE INDEX IF NOT EXISTS idx_lessons_repo ON lessons (repo);
|
||
CREATE INDEX IF NOT EXISTS idx_lessons_wi_type ON lessons (work_item_id, lesson_type);
|
||
""")
|
||
# Forward-safe: on an already-created `lessons` table the attribution columns are
|
||
# added idempotently (_ensure_column is a no-op once present) so an old prod DB
|
||
# picks them up without a data migration (NFR-6, AC-2).
|
||
_ensure_column(conn, "lessons", "attribution", "TEXT")
|
||
_ensure_column(conn, "lessons", "target_repo", "TEXT")
|
||
_ensure_column(conn, "lessons", "target_domain", "TEXT")
|
||
conn.commit()
|
||
conn.close()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# ORCH-098 (FR-1..FR-5, ADR-001 D1): lessons-journal DDL helpers. Each opens its
|
||
# own connection and closes it in `finally` (pattern coverage_baseline). The leaf
|
||
# src/lessons.py wraps these in its never-raise contract — these may raise on a
|
||
# real DB fault (the leaf swallows it).
|
||
# ---------------------------------------------------------------------------
|
||
# The full column set, in INSERT order. Single source of truth so record/get stay
|
||
# in lockstep with the schema.
|
||
_LESSON_COLUMNS = (
|
||
"lesson_type", "work_item_id", "task_id", "stage", "agent", "repo",
|
||
"root_cause", "suggestion", "status", "related_task",
|
||
"attribution", "target_repo", "target_domain", "source", "detail",
|
||
)
|
||
# Fields an update() may set (everything mutable; never id/created_at/lesson_type).
|
||
_LESSON_UPDATABLE = (
|
||
"status", "attribution", "target_repo", "target_domain", "related_task",
|
||
"root_cause", "suggestion", "stage", "agent", "repo", "detail",
|
||
)
|
||
|
||
|
||
def record_lesson(**fields) -> int:
|
||
"""Insert one lessons row; return the new id. Raises only on a real DB fault.
|
||
|
||
Only the known columns in ``_LESSON_COLUMNS`` are written; unknown keys are
|
||
ignored (forward-safe). ``created_at`` is stamped by the table default.
|
||
"""
|
||
cols = [c for c in _LESSON_COLUMNS if c in fields]
|
||
if "lesson_type" not in cols:
|
||
raise ValueError("record_lesson requires lesson_type")
|
||
placeholders = ", ".join("?" for _ in cols)
|
||
sql = f"INSERT INTO lessons ({', '.join(cols)}) VALUES ({placeholders})"
|
||
conn = get_db()
|
||
try:
|
||
cur = conn.execute(sql, tuple(fields[c] for c in cols))
|
||
conn.commit()
|
||
return int(cur.lastrowid)
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def lessons_recent_dup_exists(work_item_id, lesson_type, stage, window_s: int) -> bool:
|
||
"""ORCH-098 (D4): is there an auto-lesson with the same (work_item_id,
|
||
lesson_type, stage) within the last ``window_s`` seconds? One indexed lookup on
|
||
``idx_lessons_wi_type``. Used to suppress duplicate auto-records on retries.
|
||
"""
|
||
conn = get_db()
|
||
try:
|
||
row = conn.execute(
|
||
"SELECT 1 FROM lessons "
|
||
"WHERE work_item_id IS ? AND lesson_type = ? AND stage IS ? "
|
||
"AND source = 'auto' "
|
||
"AND created_at > datetime('now', ?) LIMIT 1",
|
||
(work_item_id, lesson_type, stage, f"-{int(window_s)} seconds"),
|
||
).fetchone()
|
||
finally:
|
||
conn.close()
|
||
return row is not None
|
||
|
||
|
||
def get_lessons(*, lesson_type=None, status=None, repo=None, work_item_id=None,
|
||
limit: int = 100) -> list[dict]:
|
||
"""Read-only parametrised SELECT of lessons (ORDER BY id DESC LIMIT ?)."""
|
||
where = []
|
||
params: list = []
|
||
if lesson_type:
|
||
where.append("lesson_type = ?")
|
||
params.append(lesson_type)
|
||
if status:
|
||
where.append("status = ?")
|
||
params.append(status)
|
||
if repo:
|
||
where.append("repo = ?")
|
||
params.append(repo)
|
||
if work_item_id:
|
||
where.append("work_item_id = ?")
|
||
params.append(work_item_id)
|
||
sql = "SELECT * FROM lessons"
|
||
if where:
|
||
sql += " WHERE " + " AND ".join(where)
|
||
sql += " ORDER BY id DESC LIMIT ?"
|
||
try:
|
||
lim = int(limit)
|
||
except (TypeError, ValueError):
|
||
lim = 100
|
||
params.append(max(1, lim))
|
||
conn = get_db()
|
||
try:
|
||
rows = conn.execute(sql, tuple(params)).fetchall()
|
||
finally:
|
||
conn.close()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
def update_lesson(lesson_id: int, **fields) -> bool:
|
||
"""Update mutable fields of a lesson + stamp updated_at. Returns True iff a row
|
||
changed. Unknown / non-updatable keys are ignored (forward-safe).
|
||
"""
|
||
sets = [c for c in _LESSON_UPDATABLE if c in fields]
|
||
if not sets:
|
||
return False
|
||
assignments = ", ".join(f"{c} = ?" for c in sets)
|
||
sql = f"UPDATE lessons SET {assignments}, updated_at = datetime('now') WHERE id = ?"
|
||
conn = get_db()
|
||
try:
|
||
cur = conn.execute(sql, tuple(fields[c] for c in sets) + (int(lesson_id),))
|
||
conn.commit()
|
||
return (cur.rowcount or 0) > 0
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def lessons_snapshot(recent: int = 10) -> dict:
|
||
"""Light GROUP BY summary (counts by type/status) + the last N lessons, for the
|
||
GET /queue observability block."""
|
||
conn = get_db()
|
||
try:
|
||
total = conn.execute("SELECT COUNT(*) FROM lessons").fetchone()[0]
|
||
by_type = {
|
||
r["lesson_type"]: r["n"]
|
||
for r in conn.execute(
|
||
"SELECT lesson_type, COUNT(*) AS n FROM lessons GROUP BY lesson_type"
|
||
).fetchall()
|
||
}
|
||
by_status = {
|
||
r["status"]: r["n"]
|
||
for r in conn.execute(
|
||
"SELECT status, COUNT(*) AS n FROM lessons GROUP BY status"
|
||
).fetchall()
|
||
}
|
||
rows = conn.execute(
|
||
"SELECT * FROM lessons ORDER BY id DESC LIMIT ?", (max(1, int(recent)),)
|
||
).fetchall()
|
||
finally:
|
||
conn.close()
|
||
return {
|
||
"total": total,
|
||
"by_type": by_type,
|
||
"by_status": by_status,
|
||
"recent": [dict(r) for r in rows],
|
||
}
|
||
|
||
|
||
def get_coverage_baseline(repo: str) -> float | None:
|
||
"""ORCH-027: read the per-repo coverage baseline (%, line coverage).
|
||
|
||
Returns ``None`` when no baseline is stored yet (bootstrap mode — the gate then
|
||
decides on the absolute floor only, D3). Raises only on a real DB error (the
|
||
coverage_gate leaf caller wraps this in its never-raise contract).
|
||
"""
|
||
if not repo:
|
||
return None
|
||
conn = get_db()
|
||
try:
|
||
row = conn.execute(
|
||
"SELECT coverage FROM coverage_baseline WHERE repo = ?", (repo,)
|
||
).fetchone()
|
||
finally:
|
||
conn.close()
|
||
if row is None:
|
||
return None
|
||
try:
|
||
return float(row["coverage"])
|
||
except (TypeError, ValueError):
|
||
return None
|
||
|
||
|
||
def ratchet_coverage_baseline(repo: str, coverage: float, sha: str | None = None) -> bool:
|
||
"""ORCH-027 (FR-4, D5): raise the per-repo coverage baseline UP, never down.
|
||
|
||
Atomic compare-and-set: ``UPDATE ... WHERE coverage <= ?`` (the baseline never
|
||
decreases — an equal value is an idempotent no-harm re-stamp), or ``INSERT`` when
|
||
no row exists yet (bootstrap). Under the held merge-lease (ORCH-043) plus this
|
||
single-statement guard, two parallel merges can never lower or lose the value.
|
||
Returns True iff a row was inserted or raised.
|
||
"""
|
||
if not repo:
|
||
return False
|
||
try:
|
||
cov = float(coverage)
|
||
except (TypeError, ValueError):
|
||
return False
|
||
conn = get_db()
|
||
try:
|
||
cur = conn.execute(
|
||
"UPDATE coverage_baseline "
|
||
"SET coverage = ?, source_sha = ?, updated_at = datetime('now') "
|
||
"WHERE repo = ? AND coverage <= ?",
|
||
(cov, sha, repo, cov),
|
||
)
|
||
changed = cur.rowcount or 0
|
||
if changed == 0:
|
||
# No row updated: either the row is absent (bootstrap INSERT) or the
|
||
# existing baseline is already higher (skip — never lower it).
|
||
exists = conn.execute(
|
||
"SELECT 1 FROM coverage_baseline WHERE repo = ?", (repo,)
|
||
).fetchone()
|
||
if exists is None:
|
||
conn.execute(
|
||
"INSERT INTO coverage_baseline (repo, coverage, source_sha, updated_at) "
|
||
"VALUES (?, ?, ?, datetime('now'))",
|
||
(repo, cov, sha),
|
||
)
|
||
changed = 1
|
||
conn.commit()
|
||
return bool(changed)
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def set_coverage_baseline(repo: str, coverage: float, sha: str | None = None) -> bool:
|
||
"""ORCH-027 (D8): UNCONDITIONALLY set the per-repo coverage baseline.
|
||
|
||
For a legitimate one-off coverage drop (e.g. removing a large tested module) via
|
||
the manual ``POST /coverage/baseline`` override. Unlike ``ratchet_coverage_baseline``
|
||
this CAN lower the baseline. Returns True on success.
|
||
"""
|
||
if not repo:
|
||
return False
|
||
try:
|
||
cov = float(coverage)
|
||
except (TypeError, ValueError):
|
||
return False
|
||
conn = get_db()
|
||
try:
|
||
conn.execute(
|
||
"INSERT INTO coverage_baseline (repo, coverage, source_sha, updated_at) "
|
||
"VALUES (?, ?, ?, datetime('now')) "
|
||
"ON CONFLICT(repo) DO UPDATE SET coverage = excluded.coverage, "
|
||
"source_sha = excluded.source_sha, updated_at = excluded.updated_at",
|
||
(repo, cov, sha),
|
||
)
|
||
conn.commit()
|
||
return True
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def all_coverage_baselines() -> dict:
|
||
"""ORCH-027: all per-repo coverage baselines for the GET /queue snapshot."""
|
||
conn = get_db()
|
||
try:
|
||
rows = conn.execute(
|
||
"SELECT repo, coverage, source_sha, updated_at FROM coverage_baseline"
|
||
).fetchall()
|
||
finally:
|
||
conn.close()
|
||
return {
|
||
r["repo"]: {
|
||
"coverage": r["coverage"],
|
||
"source_sha": r["source_sha"],
|
||
"updated_at": r["updated_at"],
|
||
}
|
||
for r in rows
|
||
}
|
||
|
||
|
||
def _ensure_column(conn, table: str, column: str, decl: str):
|
||
"""Add a column to `table` if it does not already exist (idempotent migration)."""
|
||
cols = [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()]
|
||
if column not in cols:
|
||
conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}")
|
||
conn.commit()
|
||
|
||
|
||
def get_task_by_plane_id(plane_id: str) -> dict | None:
|
||
"""Find task by Plane work item ID (checks plane_id and plane_issue_id)."""
|
||
conn = get_db()
|
||
row = conn.execute(
|
||
"SELECT * FROM tasks WHERE plane_id = ? OR plane_issue_id = ?", (plane_id, plane_id)
|
||
).fetchone()
|
||
conn.close()
|
||
if row:
|
||
return dict(row)
|
||
return None
|
||
|
||
|
||
def get_task_by_work_item_id(work_item_id: str) -> dict | None:
|
||
"""ORCH-094: read-only lookup of the live task row by human-readable
|
||
``work_item_id`` (e.g. ``"ORCH-061"``).
|
||
|
||
``get_task_by_plane_id`` matches the Plane UUIDs (``plane_id`` /
|
||
``plane_issue_id``), not the human-readable ``work_item_id`` the deploy-phase
|
||
setters receive — hence this thin accessor. A live row matches exactly; the
|
||
ORCH-090 cancel tombstones carry a ``#cancelled-<id>`` suffix on
|
||
``work_item_id`` so they never collide with a clean id. No schema change.
|
||
"""
|
||
if not work_item_id:
|
||
return None
|
||
conn = get_db()
|
||
try:
|
||
row = conn.execute(
|
||
"SELECT * FROM tasks WHERE work_item_id = ?", (work_item_id,)
|
||
).fetchone()
|
||
finally:
|
||
conn.close()
|
||
return dict(row) if row else None
|
||
|
||
|
||
def get_task_by_repo_branch(repo: str, branch: str) -> dict | None:
|
||
"""Find task by repo and branch name."""
|
||
conn = get_db()
|
||
row = conn.execute(
|
||
"SELECT * FROM tasks WHERE repo = ? AND branch = ?", (repo, branch)
|
||
).fetchone()
|
||
conn.close()
|
||
if row:
|
||
return dict(row)
|
||
return None
|
||
|
||
|
||
def get_active_tasks_for_reconcile() -> list[dict]:
|
||
"""ORCH-053 (F-1): tasks eligible for the gate-side sweeper.
|
||
|
||
Returns every task whose stage is not terminal ('done'), each augmented with
|
||
``age_s`` = seconds since ``tasks.updated_at`` (computed in SQL against UTC
|
||
'now', matching how ``update_task_stage`` stamps ``updated_at``). The
|
||
reconciler applies the per-stage grace and active-job guard on top.
|
||
|
||
ORCH-090 (adr-0026): a ``cancelled`` task is DELIBERATELY still returned here
|
||
and skipped by the reconciler's own terminal-skip (``stage in
|
||
('done','cancelled')``, ORCH-086 D2) — narrowing the query to exclude
|
||
``cancelled`` would lose the observability skip-counter increment that ORCH-086
|
||
relies on. The terminal set is harmonised in the *scheduler* predicates
|
||
(serial_gate / task_deps), not here.
|
||
"""
|
||
conn = get_db()
|
||
try:
|
||
rows = conn.execute(
|
||
"SELECT *, "
|
||
"CAST(strftime('%s','now') - strftime('%s', updated_at) AS INTEGER) AS age_s "
|
||
"FROM tasks WHERE stage != 'done'"
|
||
).fetchall()
|
||
finally:
|
||
conn.close()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
def get_development_tasks_by_repo(repo: str) -> list[dict]:
|
||
"""ORCH-053 (F-3): tasks of a repo currently on the 'development' stage.
|
||
|
||
Used as the sha->branch DB fallback in handle_ci_status: a CI-status webhook
|
||
whose branch could not be resolved (no branches[], empty
|
||
``git branch -r --contains``) is matched to the unique development task of
|
||
the repo (ambiguity -> caller leaves it unresolved).
|
||
"""
|
||
conn = get_db()
|
||
try:
|
||
rows = conn.execute(
|
||
"SELECT * FROM tasks WHERE repo = ? AND stage = 'development'", (repo,)
|
||
).fetchall()
|
||
finally:
|
||
conn.close()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
def create_task_atomic(
|
||
plane_id: str,
|
||
work_item_id: str,
|
||
repo: str,
|
||
branch: str,
|
||
stage: str,
|
||
title: str,
|
||
) -> tuple[dict, bool]:
|
||
"""ORCH-053 (AC-4): atomically claim creation of a task for a plane_id.
|
||
|
||
Performs SELECT-exists -> INSERT under the process-wide ``_CREATE_TASK_LOCK``
|
||
so a race between the live Plane webhook and the F-2 reconciler (both seeing
|
||
"no task yet" for the same plane_id) cannot create two task rows / branches /
|
||
worktrees / starter analyst jobs.
|
||
|
||
Returns ``(row, created)``:
|
||
* ``created=True`` -> THIS caller inserted the row and owns the follow-up
|
||
work (branch / docs / analyst enqueue);
|
||
* ``created=False`` -> a task for this plane_id already existed (the other
|
||
racer won); ``row`` is the existing task and the caller must NOT duplicate
|
||
the follow-up work.
|
||
"""
|
||
with _CREATE_TASK_LOCK:
|
||
conn = get_db()
|
||
try:
|
||
existing = conn.execute(
|
||
"SELECT * FROM tasks WHERE plane_id = ? OR plane_issue_id = ?",
|
||
(plane_id, plane_id),
|
||
).fetchone()
|
||
if existing:
|
||
return dict(existing), False
|
||
cur = conn.execute(
|
||
"INSERT INTO tasks "
|
||
"(plane_id, work_item_id, repo, branch, stage, plane_issue_id, title) "
|
||
"VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||
(plane_id, work_item_id, repo, branch, stage, plane_id, title),
|
||
)
|
||
conn.commit()
|
||
row = conn.execute(
|
||
"SELECT * FROM tasks WHERE id = ?", (cur.lastrowid,)
|
||
).fetchone()
|
||
return dict(row), True
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def update_task_stage(task_id: int, stage: str):
|
||
"""Update task stage and timestamp."""
|
||
conn = get_db()
|
||
conn.execute(
|
||
"UPDATE tasks SET stage = ?, updated_at = datetime('now') WHERE id = ?",
|
||
(stage, task_id),
|
||
)
|
||
conn.commit()
|
||
conn.close()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# ORCH-019: bug-fast-track task type (tasks.track) helpers
|
||
# ---------------------------------------------------------------------------
|
||
def set_task_track(task_id: int, track: str) -> None:
|
||
"""ORCH-019: persist the task's pipeline track ('full' | 'bug').
|
||
|
||
Idempotent overwrite. Called from start_pipeline (after a successful atomic
|
||
create, when the issue carries the `Bug` label) and from the escalate endpoint
|
||
(reset 'bug' -> 'full' to return a complex bug to the full cycle).
|
||
"""
|
||
conn = get_db()
|
||
try:
|
||
conn.execute(
|
||
"UPDATE tasks SET track = ? WHERE id = ?", (track, task_id)
|
||
)
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def get_task_track(task_id: int) -> str:
|
||
"""ORCH-019: read the task's pipeline track; missing/NULL -> 'full' (fail-safe).
|
||
|
||
Read in the hot advance_stage path for the routing-override (skips architecture).
|
||
A non-existent row, a NULL value, or any read error degrades to 'full' so a bug
|
||
can never be created by accident (fail-safe -> full cycle).
|
||
"""
|
||
try:
|
||
conn = get_db()
|
||
try:
|
||
row = conn.execute(
|
||
"SELECT track FROM tasks WHERE id = ?", (task_id,)
|
||
).fetchone()
|
||
finally:
|
||
conn.close()
|
||
if not row:
|
||
return "full"
|
||
return row["track"] or "full"
|
||
except Exception: # noqa: BLE001 - fail-safe -> full cycle
|
||
return "full"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Telegram live tracker helpers (feat/telegram-live-tracker)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def get_tracker_message_id(task_id: int) -> int | None:
|
||
"""Return the stored Telegram tracker message_id for a task, or None."""
|
||
conn = get_db()
|
||
try:
|
||
row = conn.execute(
|
||
"SELECT tracker_message_id FROM tasks WHERE id=?", (task_id,)
|
||
).fetchone()
|
||
finally:
|
||
conn.close()
|
||
return row[0] if row and row[0] is not None else None
|
||
|
||
|
||
def set_tracker_message_id(task_id: int, message_id: int) -> None:
|
||
"""Persist the Telegram tracker message_id for a task (idempotent overwrite)."""
|
||
conn = get_db()
|
||
try:
|
||
conn.execute(
|
||
"UPDATE tasks SET tracker_message_id=? WHERE id=?",
|
||
(message_id, task_id),
|
||
)
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# ORCH-087 (BR-G1): tracker_messages ledger — full accounting of every card mid
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def add_tracker_message(task_id: int, message_id: int) -> None:
|
||
"""ORCH-087: record a freshly-created tracker card mid in the ledger.
|
||
|
||
Called ONLY after a successful send_telegram (new_mid is not None). INSERT OR
|
||
IGNORE keeps it idempotent: a repeat mid (race / restart replay) does not
|
||
duplicate the row or resurrect a deleted_at stamp.
|
||
"""
|
||
conn = get_db()
|
||
try:
|
||
conn.execute(
|
||
"INSERT OR IGNORE INTO tracker_messages (task_id, message_id) "
|
||
"VALUES (?, ?)",
|
||
(task_id, message_id),
|
||
)
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def get_open_tracker_messages(task_id: int) -> list[int]:
|
||
"""ORCH-087: all still-open (deleted_at IS NULL) card mids for a task.
|
||
|
||
These are the cards the next bump must clean up. Ordered oldest-first so the
|
||
oldest orphans are deleted first. Never includes the rows already marked
|
||
deleted.
|
||
"""
|
||
conn = get_db()
|
||
try:
|
||
rows = conn.execute(
|
||
"SELECT message_id FROM tracker_messages "
|
||
"WHERE task_id=? AND deleted_at IS NULL ORDER BY message_id ASC",
|
||
(task_id,),
|
||
).fetchall()
|
||
finally:
|
||
conn.close()
|
||
return [r[0] for r in rows]
|
||
|
||
|
||
def mark_tracker_message_deleted(task_id: int, message_id: int) -> None:
|
||
"""ORCH-087: stamp deleted_at on a card mid that is confirmed gone.
|
||
|
||
Called for mids that delete_telegram reported as gone (deleted now OR already
|
||
gone / >48h per _DELETE_GONE_MARKERS) so they drop out of
|
||
get_open_tracker_messages. Transient-delete mids are left untouched (NULL) for
|
||
a retry on the next bump.
|
||
"""
|
||
conn = get_db()
|
||
try:
|
||
conn.execute(
|
||
"UPDATE tracker_messages SET deleted_at=datetime('now') "
|
||
"WHERE task_id=? AND message_id=? AND deleted_at IS NULL",
|
||
(task_id, message_id),
|
||
)
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def mark_brd_review_started(task_id: int) -> None:
|
||
"""Stamp when BRD review (the human approve gate) started, if not already set.
|
||
|
||
Idempotent: only sets it the first time (a retried analyst run must not reset
|
||
the clock). The delta to brd_review_ended_at is the only "твоё время".
|
||
"""
|
||
conn = get_db()
|
||
try:
|
||
conn.execute(
|
||
"UPDATE tasks SET brd_review_started_at=datetime('now') "
|
||
"WHERE id=? AND brd_review_started_at IS NULL",
|
||
(task_id,),
|
||
)
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def mark_brd_review_ended(task_id: int) -> None:
|
||
"""Stamp when BRD review ended (analysis->architecture advance / Approved).
|
||
|
||
Idempotent: only sets it the first time and only if a start exists.
|
||
"""
|
||
conn = get_db()
|
||
try:
|
||
conn.execute(
|
||
"UPDATE tasks SET brd_review_ended_at=datetime('now') "
|
||
"WHERE id=? AND brd_review_started_at IS NOT NULL "
|
||
"AND brd_review_ended_at IS NULL",
|
||
(task_id,),
|
||
)
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
|
||
"""Generate next work item ID (e.g., ET-003 / ORCH-001).
|
||
|
||
ORCH-6: numbering is per (repo, prefix). The prefix comes from the project
|
||
registry (proj.work_item_prefix), so orchestrator issues number ORCH-001,
|
||
ORCH-002 independently of the ET sequence in enduro-trails. Default prefix
|
||
stays "ET" for backward compatibility with existing callers.
|
||
"""
|
||
conn = get_db()
|
||
row = conn.execute(
|
||
"SELECT work_item_id FROM tasks "
|
||
"WHERE repo = ? AND work_item_id LIKE ? AND work_item_id IS NOT NULL "
|
||
"ORDER BY id DESC LIMIT 1",
|
||
(repo, f"{prefix}-%"),
|
||
).fetchone()
|
||
conn.close()
|
||
|
||
if row and row["work_item_id"]:
|
||
# Parse <PREFIX>-003 -> 3, increment (keep the existing prefix).
|
||
existing_prefix, num = row["work_item_id"].rsplit("-", 1)
|
||
prefix = existing_prefix
|
||
next_num = int(num) + 1
|
||
else:
|
||
next_num = 1
|
||
|
||
return f"{prefix}-{next_num:03d}"
|
||
|
||
|
||
def ensure_unique_work_item_id(work_item_id: str, repo: str) -> str:
|
||
"""BUG 2a: guarantee work_item_id uniqueness within (repo) over M-6 derive.
|
||
|
||
M-6 derives the work_item_id from the Plane sequence_id. That number can
|
||
collide (e.g. an issue was deleted and the sequence reused, or two issues
|
||
map to the same number) -> the SAME ET-NNN gets handed to two different
|
||
tasks, which then physically share a branch/worktree slug prefix and step on
|
||
each other (see ET-006: task 8 and task 25).
|
||
|
||
This is a guard LAYERED ON TOP of the M-6 derive (it does NOT replace it):
|
||
given the derived id, if that exact <PREFIX>-NNN already exists in the tasks
|
||
table for this repo, walk forward (ET-007, ET-008, ...) until a free number
|
||
is found and return that instead. If the derived id is free, it is returned
|
||
unchanged.
|
||
"""
|
||
if not work_item_id or "-" not in work_item_id:
|
||
return work_item_id
|
||
prefix, num_str = work_item_id.rsplit("-", 1)
|
||
try:
|
||
num = int(num_str)
|
||
except ValueError:
|
||
return work_item_id
|
||
width = len(num_str)
|
||
|
||
conn = get_db()
|
||
try:
|
||
candidate = work_item_id
|
||
while conn.execute(
|
||
"SELECT 1 FROM tasks WHERE repo = ? AND work_item_id = ? LIMIT 1",
|
||
(repo, candidate),
|
||
).fetchone() is not None:
|
||
num += 1
|
||
candidate = f"{prefix}-{num:0{width}d}"
|
||
return candidate
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# ORCH-5 (M-7): idempotent webhook event logging
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def insert_event_dedup(
|
||
source: str, event_type: str, payload: str, delivery_id: str
|
||
) -> bool:
|
||
"""Idempotently log a webhook event keyed by delivery_id.
|
||
|
||
Returns True if a NEW row was inserted (caller should dispatch the event) and
|
||
False if this delivery_id was already present (a duplicate delivery -> caller
|
||
must skip dispatch/enqueue). Uses INSERT OR IGNORE against the partial UNIQUE
|
||
index idx_events_delivery; rowcount==1 means the row was actually inserted.
|
||
"""
|
||
conn = get_db()
|
||
try:
|
||
cur = conn.execute(
|
||
"INSERT OR IGNORE INTO events (source, event_type, payload, delivery_id) "
|
||
"VALUES (?, ?, ?, ?)",
|
||
(source, event_type, payload, delivery_id),
|
||
)
|
||
conn.commit()
|
||
return cur.rowcount == 1
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# ORCH-1 (F-2b): job queue helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def enqueue_job(
|
||
agent: str,
|
||
repo: str,
|
||
task_content: str | None = None,
|
||
task_id: int | None = None,
|
||
max_attempts: int = 2,
|
||
available_at_delay_s: int | None = None,
|
||
) -> int:
|
||
"""Enqueue a new job (status='queued'). Returns the new job id.
|
||
|
||
This is what webhook handlers call instead of launching an agent in-process:
|
||
it is a fast DB INSERT that returns immediately. The background worker
|
||
(queue_worker) picks the job up later.
|
||
|
||
ORCH-043 (merge-gate defer): when ``available_at_delay_s`` is given the job's
|
||
``available_at`` is set to ``now + delay`` so claim_next_job won't pick it up
|
||
until the delay elapses (re-uses the existing ORCH-1 backoff gate). Used to
|
||
re-queue the staging-deployer after a "merge-lock busy" defer without burning a
|
||
worker slot in a blocking wait.
|
||
"""
|
||
conn = get_db()
|
||
if available_at_delay_s is not None:
|
||
cursor = conn.execute(
|
||
"INSERT INTO jobs (agent, repo, task_id, task_content, max_attempts, available_at) "
|
||
"VALUES (?, ?, ?, ?, ?, datetime('now', ?))",
|
||
(agent, repo, task_id, task_content, max_attempts,
|
||
f"+{int(available_at_delay_s)} seconds"),
|
||
)
|
||
else:
|
||
cursor = conn.execute(
|
||
"INSERT INTO jobs (agent, repo, task_id, task_content, max_attempts) "
|
||
"VALUES (?, ?, ?, ?, ?)",
|
||
(agent, repo, task_id, task_content, max_attempts),
|
||
)
|
||
job_id = cursor.lastrowid
|
||
conn.commit()
|
||
conn.close()
|
||
return job_id
|
||
|
||
|
||
def claim_next_job() -> dict | None:
|
||
"""Atomically claim the oldest queued job and mark it 'running'.
|
||
|
||
Atomicity: the UPDATE carries the `status='queued'` guard in its WHERE clause
|
||
and we check `rowcount`. If two worker ticks race for the same row, only the
|
||
first UPDATE flips it to 'running' (rowcount==1); the loser sees rowcount==0
|
||
and retries the SELECT. We rely on SQLite's default per-connection transaction
|
||
so the SELECT+UPDATE pair is consistent. Returns the claimed job dict or None
|
||
when the queue is empty.
|
||
"""
|
||
# ORCH-026 (Level B, B-2): scheduler dependency gate. When task_deps_enabled
|
||
# is on, a job whose task has an UNFINISHED declared dependency
|
||
# (job_deps.depends_on_task_id -> a task with stage != 'done') is NOT
|
||
# claimable -> it stays 'queued' without occupying a max_concurrency slot.
|
||
# Jobs with a NULL task_id (no task) or with no job_deps rows are unaffected
|
||
# (NOT EXISTS is True). Kill-switch off -> the clause is omitted -> 1:1 the
|
||
# ORCH-1 query. The gate reads only the DB (offline-safe hot path).
|
||
dep_gate = ""
|
||
if getattr(settings, "task_deps_enabled", False):
|
||
dep_gate = (
|
||
"AND NOT EXISTS ("
|
||
" SELECT 1 FROM job_deps d JOIN tasks t ON t.id = d.depends_on_task_id "
|
||
# ORCH-090 (adr-0026): a cancelled predecessor is TERMINAL -> the
|
||
# dependent must NOT wait on it forever. Terminal set = {done,cancelled}.
|
||
" WHERE d.task_id = jobs.task_id AND t.stage NOT IN ('done','cancelled')"
|
||
") "
|
||
)
|
||
# ORCH-088 (FR-1, ADR-001 D1): per-repo serial gate. An analyst-job of a NEW
|
||
# task is NOT claimable while the same repo has another unfinished task OR is
|
||
# frozen. The fragment is built in the serial_gate leaf (sanitised repo scope,
|
||
# fail-OPEN on any build error so a transient fault never wedges the queue of
|
||
# ALL projects — AC-8). Jobs of an already-active task (architect/.../deployer)
|
||
# are unaffected — the gate keys on jobs.agent='analyst' only. Reads only the
|
||
# local DB (offline-safe hot path, NFR-2).
|
||
serial_gate = ""
|
||
try:
|
||
from . import serial_gate as _serial_gate
|
||
serial_gate = _serial_gate.build_claim_clause()
|
||
except Exception: # noqa: BLE001 - fail-OPEN: never wedge the claim
|
||
serial_gate = ""
|
||
conn = get_db()
|
||
try:
|
||
while True:
|
||
row = conn.execute(
|
||
"SELECT id FROM jobs WHERE status='queued' "
|
||
"AND (available_at IS NULL OR available_at <= datetime('now')) "
|
||
f"{dep_gate}"
|
||
f"{serial_gate}"
|
||
"ORDER BY id LIMIT 1"
|
||
).fetchone()
|
||
if not row:
|
||
return None
|
||
job_id = row["id"]
|
||
cur = conn.execute(
|
||
"UPDATE jobs SET status='running', "
|
||
"attempts = attempts + 1, started_at = datetime('now') "
|
||
"WHERE id = ? AND status='queued'",
|
||
(job_id,),
|
||
)
|
||
conn.commit()
|
||
if cur.rowcount == 1:
|
||
claimed = conn.execute(
|
||
"SELECT * FROM jobs WHERE id = ?", (job_id,)
|
||
).fetchone()
|
||
return dict(claimed)
|
||
# Lost the race for this row; loop and try the next queued job.
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def mark_job_transient(job_id: int, available_at_sql_offset_seconds: int,
|
||
error: str | None = None) -> None:
|
||
"""ORCH-1 resilience: requeue a job after a *transient* failure (429/overload/net).
|
||
|
||
Increments `transient_attempts` (separate from the code-fault `attempts`),
|
||
sets status back to 'queued', and gates re-pickup via `available_at` =
|
||
now + backoff seconds. started_at/finished_at are cleared.
|
||
"""
|
||
conn = get_db()
|
||
sets = [
|
||
"status='queued'",
|
||
"transient_attempts = transient_attempts + 1",
|
||
"available_at = datetime('now', ?)",
|
||
"started_at = NULL",
|
||
"finished_at = NULL",
|
||
]
|
||
params: list = [f"+{int(available_at_sql_offset_seconds)} seconds"]
|
||
if error is not None:
|
||
sets.append("error = ?")
|
||
params.append(error)
|
||
params.append(job_id)
|
||
conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params)
|
||
conn.commit()
|
||
conn.close()
|
||
|
||
|
||
def mark_job(
|
||
job_id: int,
|
||
status: str,
|
||
run_id: int | None = None,
|
||
error: str | None = None,
|
||
):
|
||
"""Update a job's status (queued|running|done|failed|cancelled).
|
||
|
||
- run_id (optional): link to the agent_runs row that executed this job.
|
||
- error (optional): last error message (for failed/retry).
|
||
- 'done'/'failed'/'cancelled' (ORCH-090) also stamp finished_at.
|
||
- 'queued' (requeue for retry) clears started_at/finished_at so the next
|
||
claim treats it as fresh.
|
||
"""
|
||
conn = get_db()
|
||
sets = ["status = ?"]
|
||
params: list = [status]
|
||
if run_id is not None:
|
||
sets.append("run_id = ?")
|
||
params.append(run_id)
|
||
if error is not None:
|
||
sets.append("error = ?")
|
||
params.append(error)
|
||
if status in ("done", "failed", "cancelled"):
|
||
sets.append("finished_at = datetime('now')")
|
||
elif status == "queued":
|
||
sets.append("started_at = NULL")
|
||
sets.append("finished_at = NULL")
|
||
params.append(job_id)
|
||
conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params)
|
||
conn.commit()
|
||
conn.close()
|
||
|
||
|
||
def has_active_job_for_task(task_id: int) -> bool:
|
||
"""True if the task already has a queued or running job.
|
||
|
||
Used by the status-only verdict model (handle_status_start) to guard against
|
||
double-launching an agent when a duplicate In Progress webhook arrives or a
|
||
job is still in flight. The events de-dup absorbs identical webhook bodies;
|
||
this guards against distinct webhooks while a job is pending/running.
|
||
"""
|
||
conn = get_db()
|
||
row = conn.execute(
|
||
"SELECT 1 FROM jobs WHERE task_id = ? AND status IN ('queued','running') LIMIT 1",
|
||
(task_id,),
|
||
).fetchone()
|
||
conn.close()
|
||
return row is not None
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# ORCH-090: STOP-cancellation helpers (task + jobs terminal state)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def get_task(task_id: int) -> dict | None:
|
||
"""Fetch a single task row by id (None when absent)."""
|
||
conn = get_db()
|
||
try:
|
||
row = conn.execute("SELECT * FROM tasks WHERE id = ?", (task_id,)).fetchone()
|
||
finally:
|
||
conn.close()
|
||
return dict(row) if row else None
|
||
|
||
|
||
def get_active_jobs_for_task(task_id: int) -> list[dict]:
|
||
"""ORCH-090: queued/running jobs of a task (for STOP — stop agent + cancel).
|
||
|
||
Returns the full job rows (incl. ``pid`` / ``run_id`` / ``status``) so the
|
||
cancel orchestrator can SIGTERM the running agent by ``jobs.pid`` and then flip
|
||
every job to the terminal ``cancelled`` outcome.
|
||
"""
|
||
conn = get_db()
|
||
try:
|
||
rows = conn.execute(
|
||
"SELECT * FROM jobs WHERE task_id = ? AND status IN ('queued','running') "
|
||
"ORDER BY id",
|
||
(task_id,),
|
||
).fetchall()
|
||
finally:
|
||
conn.close()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
def cancel_jobs_for_task(task_id: int, only_queued: bool = False) -> int:
|
||
"""ORCH-090 (ADR-001 D3): flip a task's jobs to the terminal ``cancelled`` outcome.
|
||
|
||
Guarded UPDATE over ``status IN ('queued','running')`` (or only ``'queued'`` when
|
||
``only_queued`` — the deferred-cancel path inside a critical merge/deploy window,
|
||
D7, which must NOT cancel the still-running deploy/merge actor). ``cancelled`` is
|
||
never requeued: ``claim_next_job`` only selects ``status='queued'`` and the reaper
|
||
/ worker check the task's terminal stage before any requeue. Returns the number of
|
||
jobs cancelled. never-raise -> 0 on error.
|
||
"""
|
||
statuses = "('queued')" if only_queued else "('queued','running')"
|
||
try:
|
||
conn = get_db()
|
||
try:
|
||
cur = conn.execute(
|
||
f"UPDATE jobs SET status='cancelled', finished_at=datetime('now') "
|
||
f"WHERE task_id = ? AND status IN {statuses}",
|
||
(task_id,),
|
||
)
|
||
conn.commit()
|
||
return cur.rowcount or 0
|
||
finally:
|
||
conn.close()
|
||
except Exception:
|
||
return 0
|
||
|
||
|
||
def mark_task_cancelled(task_id: int) -> bool:
|
||
"""ORCH-090 (ADR-001 D4): durable terminal + natural-key tombstone for a task.
|
||
|
||
Atomically (single UPDATE):
|
||
* ``stage='cancelled'`` (durable terminal, understood by the reconciler skip);
|
||
* ``cancelled_at=now``, ``cancel_requested_at=NULL`` (clear any deferred flag);
|
||
* TOMBSTONE the natural keys so a later "To Analyse" re-creates the task FROM
|
||
SCRATCH: ``plane_id`` / ``work_item_id`` / ``plane_issue_id`` get a
|
||
deterministic ``#cancelled-<id>`` suffix -> ``get_task_by_plane_id`` returns
|
||
None and the anti-dup / uniqueness guards no longer collide. The row is NOT
|
||
deleted (durable audit).
|
||
|
||
ADR-001 D4 refinement (ORCH-090): the ADR proposed keeping ``plane_issue_id``
|
||
untouched for audit, but ``get_task_by_plane_id`` / ``create_task_atomic`` match
|
||
on ``plane_id OR plane_issue_id`` — leaving ``plane_issue_id`` matchable would
|
||
keep the cancelled row "findable" and BLOCK the clean-slate re-create (BR-3 /
|
||
TR-4). We therefore suffix it too; the ``#cancelled-<id>`` tag is deterministic
|
||
and parseable, so the original Plane issue UUID (== the original ``plane_id`` in
|
||
every create path) is still fully recoverable for audit.
|
||
|
||
Idempotent-safe: the suffix is only appended when not already present (a repeat
|
||
STOP on an already-cancelled row does not double-suffix). Returns True iff the
|
||
row was updated. never-raise -> False on error.
|
||
"""
|
||
try:
|
||
conn = get_db()
|
||
try:
|
||
row = conn.execute(
|
||
"SELECT plane_id, work_item_id, plane_issue_id FROM tasks WHERE id = ?",
|
||
(task_id,),
|
||
).fetchone()
|
||
if not row:
|
||
return False
|
||
suffix = f"#cancelled-{task_id}"
|
||
|
||
def _tomb(v):
|
||
v = v or ""
|
||
return v if suffix in v else f"{v}{suffix}"
|
||
|
||
plane_id = _tomb(row["plane_id"])
|
||
work_item_id = _tomb(row["work_item_id"])
|
||
plane_issue_id = _tomb(row["plane_issue_id"])
|
||
conn.execute(
|
||
"UPDATE tasks SET stage='cancelled', cancelled_at=datetime('now'), "
|
||
"cancel_requested_at=NULL, plane_id=?, work_item_id=?, plane_issue_id=?, "
|
||
"updated_at=datetime('now') WHERE id = ?",
|
||
(plane_id, work_item_id, plane_issue_id, task_id),
|
||
)
|
||
conn.commit()
|
||
return True
|
||
finally:
|
||
conn.close()
|
||
except Exception:
|
||
return False
|
||
|
||
|
||
def set_task_cancel_requested(task_id: int) -> bool:
|
||
"""ORCH-090 (ADR-001 D7): mark a deferred cancellation (STOP in critical window).
|
||
|
||
Idempotent: only stamps ``cancel_requested_at`` the first time. Returns the
|
||
**first-stamp fact** — ``True`` iff THIS call actually stamped the column (a
|
||
repeated STOP while still deferred updates 0 rows -> ``False``), so the caller can
|
||
suppress duplicate notifications (AC-6). The deterministic deploy/merge finalizer
|
||
reads the column once the irreversible step completes and then applies the full
|
||
cancellation. never-raise -> False on error.
|
||
"""
|
||
try:
|
||
conn = get_db()
|
||
try:
|
||
cur = conn.execute(
|
||
"UPDATE tasks SET cancel_requested_at=datetime('now') "
|
||
"WHERE id = ? AND cancel_requested_at IS NULL",
|
||
(task_id,),
|
||
)
|
||
conn.commit()
|
||
return cur.rowcount > 0
|
||
finally:
|
||
conn.close()
|
||
except Exception:
|
||
return False
|
||
|
||
|
||
def cancelled_tasks_snapshot(limit: int = 10) -> dict:
|
||
"""ORCH-090 (AC-10): read-only cancellation summary for GET /queue.
|
||
|
||
Returns ``{count, pending, recent}`` where ``count`` is the number of cancelled
|
||
tasks, ``pending`` the number with a deferred (not-yet-applied) cancellation, and
|
||
``recent`` the last ``limit`` cancelled tasks. never-raise -> minimal dict.
|
||
"""
|
||
try:
|
||
conn = get_db()
|
||
try:
|
||
count = conn.execute(
|
||
"SELECT COUNT(*) FROM tasks WHERE stage='cancelled'"
|
||
).fetchone()[0]
|
||
pending = conn.execute(
|
||
"SELECT COUNT(*) FROM tasks WHERE cancel_requested_at IS NOT NULL "
|
||
"AND stage != 'cancelled'"
|
||
).fetchone()[0]
|
||
recent = [
|
||
{"work_item_id": r["work_item_id"], "repo": r["repo"],
|
||
"cancelled_at": r["cancelled_at"]}
|
||
for r in conn.execute(
|
||
"SELECT work_item_id, repo, cancelled_at FROM tasks "
|
||
"WHERE stage='cancelled' ORDER BY cancelled_at DESC LIMIT ?",
|
||
(limit,),
|
||
).fetchall()
|
||
]
|
||
finally:
|
||
conn.close()
|
||
return {"count": int(count), "pending": int(pending), "recent": recent}
|
||
except Exception:
|
||
return {"count": 0, "pending": 0, "recent": []}
|
||
|
||
|
||
def count_running_jobs() -> int:
|
||
"""Number of jobs currently in 'running' status (for max_concurrency)."""
|
||
conn = get_db()
|
||
n = conn.execute(
|
||
"SELECT COUNT(*) FROM jobs WHERE status='running'"
|
||
).fetchone()[0]
|
||
conn.close()
|
||
return int(n)
|
||
|
||
|
||
def requeue_running_jobs() -> int:
|
||
"""Queue-recovery: on startup, any job left 'running' belongs to a worker that
|
||
died on restart -> put it back to 'queued'. attempts are kept as-is (the next
|
||
claim does NOT re-increment beyond what is needed; claim_next_job increments on
|
||
pickup). Returns the number of requeued jobs.
|
||
"""
|
||
conn = get_db()
|
||
cur = conn.execute(
|
||
"UPDATE jobs SET status='queued', started_at = NULL "
|
||
"WHERE status='running'"
|
||
)
|
||
conn.commit()
|
||
n = cur.rowcount
|
||
conn.close()
|
||
return int(n)
|
||
|
||
|
||
def get_running_jobs() -> list[dict]:
|
||
"""ORCH-065: snapshot of every 'running' job for the job-reaper scan.
|
||
|
||
Each row carries the job columns plus four reaper inputs:
|
||
* ``running_age_s`` — seconds since ``started_at`` (Tier-3 backstop);
|
||
* ``exit_code`` — the linked ``agent_runs.exit_code`` (Tier-2: process
|
||
finished but the job is still 'running' -> monitor died mid-finalize);
|
||
* ``finished_at_run`` — the linked ``agent_runs.finished_at``;
|
||
* ``finished_age_s`` — seconds since ``agent_runs.finished_at`` (Tier-2
|
||
finalization grace: a LIVE monitor writes exit_code, THEN does git
|
||
push / PR / Plane comments before _finalize_job, so a freshly-finished
|
||
run is NOT yet a zombie — the reaper waits ``reaper_finalize_grace_s``).
|
||
|
||
A LEFT JOIN on ``run_id`` keeps jobs with no agent_runs row (exit_code NULL).
|
||
Read-only; never mutates. The reaper applies liveness/streak/backstop on top.
|
||
"""
|
||
conn = get_db()
|
||
try:
|
||
rows = conn.execute(
|
||
"SELECT j.*, "
|
||
"CAST(strftime('%s','now') - strftime('%s', j.started_at) AS INTEGER) "
|
||
" AS running_age_s, "
|
||
"r.exit_code AS exit_code, r.finished_at AS finished_at_run, "
|
||
"CAST(strftime('%s','now') - strftime('%s', r.finished_at) AS INTEGER) "
|
||
" AS finished_age_s "
|
||
"FROM jobs j LEFT JOIN agent_runs r ON r.id = j.run_id "
|
||
"WHERE j.status='running'"
|
||
).fetchall()
|
||
finally:
|
||
conn.close()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
def get_running_agents() -> list[dict]:
|
||
"""ORCH-099 (D5): read-only liveness snapshot of every 'running' job for /metrics.
|
||
|
||
A dedicated read-only SELECT — deliberately NOT an extension of
|
||
``get_running_jobs()`` (the job-reaper hot path, ORCH-065): widening that
|
||
query under observability needs would migrate a foreign component's invariant.
|
||
Each row carries the process identity + cost context the F1b sidecar needs:
|
||
* ``job_id`` / ``run_id`` / ``pid`` — process identity (pid may be NULL until
|
||
the launcher stamps it / after the process exits);
|
||
* ``agent`` / ``repo`` — role and project (the sidecar is multi-project);
|
||
* ``running_age_s`` — seconds since ``jobs.started_at`` (the same process
|
||
anchor the reaper uses for backstop-liveness, D6);
|
||
* ``model`` / ``effort`` — cost context (LEFT JOIN ``agent_runs``);
|
||
* the token / ``cost_usd`` columns — current per-run accruals, usually NULL
|
||
until the launcher parses the CLI result JSON on finish (honest raw, TR-5).
|
||
|
||
A LEFT JOIN on ``run_id`` keeps a job with no ``agent_runs`` row. Read-only;
|
||
never mutates.
|
||
"""
|
||
conn = get_db()
|
||
try:
|
||
rows = conn.execute(
|
||
"SELECT j.id AS job_id, j.run_id AS run_id, j.pid AS pid, "
|
||
"j.agent AS agent, j.repo AS repo, j.started_at AS started_at, "
|
||
"CAST(strftime('%s','now') - strftime('%s', j.started_at) AS INTEGER) "
|
||
" AS running_age_s, "
|
||
"r.model AS model, r.effort AS effort, r.cost_usd AS cost_usd, "
|
||
"r.input_tokens AS input_tokens, r.output_tokens AS output_tokens, "
|
||
"r.cache_read_tokens AS cache_read_tokens, "
|
||
"r.cache_creation_tokens AS cache_creation_tokens "
|
||
"FROM jobs j LEFT JOIN agent_runs r ON r.id = j.run_id "
|
||
"WHERE j.status='running'"
|
||
).fetchall()
|
||
finally:
|
||
conn.close()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
def agent_cost_totals() -> dict:
|
||
"""ORCH-099 (D7): read-only aggregate of cost / tokens over all agent_runs.
|
||
|
||
Pure ``SELECT COALESCE(SUM(...),0)`` — an empty ``agent_runs`` table yields
|
||
zeros, never an error (TC-06 / TC-11). Read-only; never mutates.
|
||
"""
|
||
conn = get_db()
|
||
try:
|
||
row = conn.execute(
|
||
"SELECT "
|
||
"COALESCE(SUM(cost_usd),0) AS cost_usd, "
|
||
"COALESCE(SUM(input_tokens),0) AS input_tokens, "
|
||
"COALESCE(SUM(output_tokens),0) AS output_tokens, "
|
||
"COALESCE(SUM(cache_read_tokens),0) AS cache_read_tokens, "
|
||
"COALESCE(SUM(cache_creation_tokens),0) AS cache_creation_tokens "
|
||
"FROM agent_runs"
|
||
).fetchone()
|
||
finally:
|
||
conn.close()
|
||
return dict(row) if row else {
|
||
"cost_usd": 0,
|
||
"input_tokens": 0,
|
||
"output_tokens": 0,
|
||
"cache_read_tokens": 0,
|
||
"cache_creation_tokens": 0,
|
||
}
|
||
|
||
|
||
def queue_retry_stats() -> dict:
|
||
"""ORCH-099 (D4): read-only retry raw over UNFINISHED jobs for /metrics.queue.
|
||
|
||
Aggregates ``attempts`` / ``transient_attempts`` and counts jobs currently in
|
||
backoff (``available_at > now``) across non-terminal jobs (status NOT IN
|
||
done/failed/cancelled). Read-only; never mutates.
|
||
"""
|
||
conn = get_db()
|
||
try:
|
||
row = conn.execute(
|
||
"SELECT "
|
||
"COALESCE(SUM(attempts),0) AS total_attempts, "
|
||
"COALESCE(SUM(transient_attempts),0) AS total_transient_attempts, "
|
||
"COALESCE(MAX(attempts),0) AS max_attempts_seen, "
|
||
"COALESCE(SUM(CASE WHEN available_at IS NOT NULL "
|
||
" AND available_at > datetime('now') THEN 1 ELSE 0 END),0) AS in_backoff "
|
||
"FROM jobs WHERE status NOT IN ('done','failed','cancelled')"
|
||
).fetchone()
|
||
finally:
|
||
conn.close()
|
||
return dict(row) if row else {
|
||
"total_attempts": 0,
|
||
"total_transient_attempts": 0,
|
||
"max_attempts_seen": 0,
|
||
"in_backoff": 0,
|
||
}
|
||
|
||
|
||
def reap_running_job(
|
||
job_id: int,
|
||
status: str,
|
||
run_id: int | None = None,
|
||
error: str | None = None,
|
||
) -> bool:
|
||
"""ORCH-065: atomic terminal flip of a RUNNING job by the job-reaper.
|
||
|
||
Mirrors ``mark_job`` but carries the ``status='running'`` guard in the WHERE
|
||
clause and reports ``rowcount`` so a late-arriving monitor / the startup
|
||
``requeue_running_jobs`` / a second reaper tick can never double-process the
|
||
same row (AC-5, restart-safe). Returns True iff THIS call won the flip
|
||
(rowcount == 1); False -> someone else already moved the row.
|
||
|
||
Status semantics match ``mark_job``: done/failed stamp ``finished_at``; queued
|
||
clears ``started_at``/``finished_at`` so the next claim treats it as fresh.
|
||
"""
|
||
conn = get_db()
|
||
try:
|
||
sets = ["status = ?"]
|
||
params: list = [status]
|
||
if run_id is not None:
|
||
sets.append("run_id = ?")
|
||
params.append(run_id)
|
||
if error is not None:
|
||
sets.append("error = ?")
|
||
params.append(error)
|
||
if status in ("done", "failed", "cancelled"): # ORCH-090: cancelled is terminal
|
||
sets.append("finished_at = datetime('now')")
|
||
elif status == "queued":
|
||
sets.append("started_at = NULL")
|
||
sets.append("finished_at = NULL")
|
||
params.append(job_id)
|
||
cur = conn.execute(
|
||
f"UPDATE jobs SET {', '.join(sets)} WHERE id = ? AND status='running'",
|
||
params,
|
||
)
|
||
conn.commit()
|
||
return cur.rowcount == 1
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def get_job(job_id: int) -> dict | None:
|
||
"""Fetch a single job by id."""
|
||
conn = get_db()
|
||
row = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)).fetchone()
|
||
conn.close()
|
||
return dict(row) if row else None
|
||
|
||
|
||
def job_status_counts() -> dict:
|
||
"""Return counts grouped by status (for /queue and /metrics observability).
|
||
|
||
ORCH-099 (D4): the default dict carries the ``cancelled`` terminal key
|
||
(ORCH-090, terminal set ``{done, cancelled}``) so the key is always present
|
||
with a 0 default instead of materialising only when a cancelled job exists.
|
||
Purely additive — the GROUP BY query is unchanged and pre-existing keys keep
|
||
their meaning (no /queue contract break).
|
||
"""
|
||
conn = get_db()
|
||
rows = conn.execute(
|
||
"SELECT status, COUNT(*) AS n FROM jobs GROUP BY status"
|
||
).fetchall()
|
||
conn.close()
|
||
counts = {"queued": 0, "running": 0, "done": 0, "failed": 0, "cancelled": 0}
|
||
for r in rows:
|
||
counts[r["status"]] = r["n"]
|
||
return counts
|
||
|
||
|
||
def recent_jobs(limit: int = 10) -> list[dict]:
|
||
"""Return the most recent jobs (for /queue observability)."""
|
||
conn = get_db()
|
||
rows = conn.execute(
|
||
"SELECT * FROM jobs ORDER BY id DESC LIMIT ?", (limit,)
|
||
).fetchall()
|
||
conn.close()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# ORCH-026 (Level B): declarative task-dependency helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def add_dependency(task_id: int, depends_on_task_id: int) -> bool:
|
||
"""Declare that task ``task_id`` (B) is blocked-by ``depends_on_task_id`` (A).
|
||
|
||
Idempotent INSERT OR IGNORE against the job_deps PK (re-declaring the same
|
||
edge is a no-op). A self-edge (task depends on itself) is rejected — it would
|
||
deadlock the task forever and can never be satisfied. never-raise
|
||
(self-hosting safety, AC-G1): any DB error -> returns False, the caller must
|
||
not crash the webhook / worker. Returns True iff a NEW edge row was inserted.
|
||
"""
|
||
if task_id is None or depends_on_task_id is None:
|
||
return False
|
||
if task_id == depends_on_task_id:
|
||
return False
|
||
try:
|
||
conn = get_db()
|
||
try:
|
||
cur = conn.execute(
|
||
"INSERT OR IGNORE INTO job_deps (task_id, depends_on_task_id) "
|
||
"VALUES (?, ?)",
|
||
(task_id, depends_on_task_id),
|
||
)
|
||
conn.commit()
|
||
return cur.rowcount == 1
|
||
finally:
|
||
conn.close()
|
||
except Exception:
|
||
return False
|
||
|
||
|
||
def get_dependencies(task_id: int) -> list[int]:
|
||
"""Return the list of depends_on_task_id (A) that ``task_id`` (B) waits for.
|
||
|
||
never-raise: any DB error -> [] (conservative: caller treats the task as
|
||
having no declared dependency rather than crashing).
|
||
"""
|
||
try:
|
||
conn = get_db()
|
||
try:
|
||
rows = conn.execute(
|
||
"SELECT depends_on_task_id FROM job_deps WHERE task_id = ?",
|
||
(task_id,),
|
||
).fetchall()
|
||
finally:
|
||
conn.close()
|
||
return [r[0] for r in rows]
|
||
except Exception:
|
||
return []
|
||
|
||
|
||
def get_dependency_edges() -> list[tuple[int, int]]:
|
||
"""Return ALL declared edges as ``(task_id, depends_on_task_id)`` tuples.
|
||
|
||
Used by the cycle detector (DFS over the whole declared graph) and the
|
||
/queue snapshot. never-raise -> [] on any DB error.
|
||
"""
|
||
try:
|
||
conn = get_db()
|
||
try:
|
||
rows = conn.execute(
|
||
"SELECT task_id, depends_on_task_id FROM job_deps"
|
||
).fetchall()
|
||
finally:
|
||
conn.close()
|
||
return [(r[0], r[1]) for r in rows]
|
||
except Exception:
|
||
return []
|
||
|
||
|
||
def get_unfinished_dependencies(task_id: int) -> list[dict]:
|
||
"""Return the UNFINISHED dependencies of ``task_id`` (A's not yet 'done').
|
||
|
||
Each dict carries the predecessor's ``id``, ``work_item_id`` and ``stage``
|
||
so the readiness gate / Telegram waiting-line can name what B is waiting for.
|
||
never-raise -> [] on any DB error (treated as "ready", consistent with the
|
||
scheduler omitting the gate on failure).
|
||
"""
|
||
try:
|
||
conn = get_db()
|
||
try:
|
||
rows = conn.execute(
|
||
"SELECT t.id AS id, t.work_item_id AS work_item_id, t.stage AS stage "
|
||
"FROM job_deps d JOIN tasks t ON t.id = d.depends_on_task_id "
|
||
# ORCH-090 (adr-0026): {done,cancelled} are both terminal -> a
|
||
# cancelled predecessor no longer blocks the dependent.
|
||
"WHERE d.task_id = ? AND t.stage NOT IN ('done','cancelled')",
|
||
(task_id,),
|
||
).fetchall()
|
||
finally:
|
||
conn.close()
|
||
return [dict(r) for r in rows]
|
||
except Exception:
|
||
return []
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# ORCH-1b (resilience): transient backoff helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def requeue_job_transient(job_id: int, delay_seconds: float, error: str | None = None):
|
||
"""ORCH-1b: requeue a job after a TRANSIENT (429/overload/network) failure.
|
||
|
||
Unlike a code-fault requeue, this:
|
||
- increments `transient_attempts` (a separate budget from code-fault attempts)
|
||
- sets `available_at = now + delay_seconds` so claim_next_job won't pick it
|
||
up until the backoff window elapses
|
||
- sets status back to 'queued' and clears started_at/finished_at
|
||
|
||
delay_seconds is computed by the caller (exp backoff, capped, Retry-After).
|
||
"""
|
||
conn = get_db()
|
||
conn.execute(
|
||
"UPDATE jobs SET status='queued', "
|
||
"transient_attempts = transient_attempts + 1, "
|
||
"available_at = datetime('now', ? || ' seconds'), "
|
||
"started_at = NULL, finished_at = NULL, "
|
||
"error = COALESCE(?, error) "
|
||
"WHERE id = ?",
|
||
(f"+{int(round(delay_seconds))}", error, job_id),
|
||
)
|
||
conn.commit()
|
||
conn.close()
|
||
|
||
|
||
def compute_backoff(transient_attempts: int, retry_after: float | None = None) -> float:
|
||
"""ORCH-1b: exponential backoff (seconds) for a transient failure.
|
||
|
||
delay = min(2**transient_attempts * base, max). If the server sent a
|
||
Retry-After hint we honour it as a floor (use the larger of the two so we
|
||
never poll sooner than the server asked).
|
||
|
||
`transient_attempts` is the count AFTER this failure (i.e. how many transient
|
||
failures have occurred), so the first backoff uses 2**1.
|
||
"""
|
||
base = getattr(settings, "backoff_base_seconds", 10)
|
||
cap = getattr(settings, "backoff_max_seconds", 600)
|
||
exp = min((2 ** max(transient_attempts, 0)) * base, cap)
|
||
if retry_after is not None and retry_after > 0:
|
||
return float(min(max(exp, retry_after), cap))
|
||
return float(exp)
|