import sqlite3 import threading from .config import settings # ORCH-053 (F-2 anti-dup): process-wide lock guarding the SELECT-exists -> INSERT # task-creation claim. The prod topology is a single uvicorn process per DB # (staging/prod isolated), with the webhook running in uvicorn's asyncio thread # and the reconciler in its own thread of the SAME process -> a threading.Lock # covers both sides of the create race without a schema migration. See # docs/work-items/ORCH-053/06-adr/ADR-001-stuck-task-reconciler.md §4. _CREATE_TASK_LOCK = threading.Lock() def get_db() -> sqlite3.Connection: conn = sqlite3.connect(settings.db_path) conn.row_factory = sqlite3.Row return conn def init_db(): conn = get_db() conn.executescript(""" CREATE TABLE IF NOT EXISTS events ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT DEFAULT (datetime('now')), source TEXT NOT NULL, event_type TEXT NOT NULL, payload TEXT NOT NULL, processed INTEGER DEFAULT 0 ); CREATE TABLE IF NOT EXISTS tasks ( id INTEGER PRIMARY KEY AUTOINCREMENT, plane_id TEXT, work_item_id TEXT, repo TEXT NOT NULL, branch TEXT, stage TEXT DEFAULT 'created', agent_running TEXT, created_at TEXT DEFAULT (datetime('now')), updated_at TEXT DEFAULT (datetime('now')), plane_issue_id TEXT ); CREATE TABLE IF NOT EXISTS agent_runs ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id INTEGER REFERENCES tasks(id), agent TEXT NOT NULL, started_at TEXT DEFAULT (datetime('now')), finished_at TEXT, exit_code INTEGER, output_path TEXT ); -- ORCH-1 (F-2b): persistent job queue. Webhook handlers enqueue a job and -- return immediately; a background worker claims jobs (respecting -- max_concurrency), spawns the claude agent, and updates the status. -- Restart-safe: running jobs are requeued on startup (queue-recovery). CREATE TABLE IF NOT EXISTS jobs ( id INTEGER PRIMARY KEY AUTOINCREMENT, agent TEXT NOT NULL, repo TEXT NOT NULL, task_id INTEGER, -- FK tasks.id (nullable) task_content TEXT, -- written to the agent task_file status TEXT NOT NULL DEFAULT 'queued', -- queued|running|done|failed|cancelled (ORCH-090: cancelled is a terminal outcome, never requeued) attempts INTEGER NOT NULL DEFAULT 0, max_attempts INTEGER NOT NULL DEFAULT 2, run_id INTEGER, -- agent_runs.id once started error TEXT, -- last error message transient_attempts INTEGER NOT NULL DEFAULT 0, -- ORCH-1 resilience: 429/transient retries available_at TEXT, -- ORCH-1 resilience: backoff gate (claim when <= now) created_at TEXT DEFAULT (datetime('now')), started_at TEXT, finished_at TEXT ); CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, id); """) # Lightweight migration: add resilience columns to a pre-existing jobs table # (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table). _ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0") _ensure_column(conn, "jobs", "available_at", "TEXT") # ORCH-065: pid of the spawned agent process, stamped in launcher._spawn next to # run_id/started_at. The job-reaper uses it for Tier-1 liveness (os.kill(pid, 0)) # to detect a 'running' job whose process died before _finalize_job. Idempotent # ALTER (no-op once present) -> safe on the live prod DB. _ensure_column(conn, "jobs", "pid", "INTEGER") # ORCH-5 (M-7): webhook delivery de-dup. Add events.delivery_id and a PARTIAL # unique index. Partial (WHERE delivery_id IS NOT NULL) so pre-existing rows # (which have NULL delivery_id) never collide with each other. Restart-safe: # _ensure_column is a no-op once the column exists, and CREATE INDEX IF NOT # EXISTS is a no-op once the index exists, so this is safe on the live prod DB. _ensure_column(conn, "events", "delivery_id", "TEXT") conn.execute( "CREATE UNIQUE INDEX IF NOT EXISTS idx_events_delivery " "ON events(delivery_id) WHERE delivery_id IS NOT NULL" ) # Feature 4 (token usage): per-run token / cost accounting. Parsed from the # claude --output-format json result by the launcher monitor. Idempotent # ALTERs (no-op once the columns exist) so this is safe on the live prod DB. _ensure_column(conn, "agent_runs", "input_tokens", "INTEGER") _ensure_column(conn, "agent_runs", "output_tokens", "INTEGER") _ensure_column(conn, "agent_runs", "cache_read_tokens", "INTEGER") # Observability fix: also persist cache-CREATION input tokens. Claude CLI # reports the real input split across input_tokens (fresh, ~tens) + # cache_read_input_tokens (cache hit, millions) + cache_creation_input_tokens # (writing new cache). Without this column the cache_creation slice is lost # and the "X in" figure understates the true prompt size. Idempotent ALTER. _ensure_column(conn, "agent_runs", "cache_creation_tokens", "INTEGER") _ensure_column(conn, "agent_runs", "cost_usd", "REAL") # Telegram live tracker (feat/telegram-live-tracker): persist the FULL model # name (e.g. "tokenator/claude-opus-4-8") per agent_runs row so the tracker # can render a short model tag per stage. Parsed from the run-log result JSON # (modelUsage key) by the launcher monitor; NULL when unknown. Idempotent ALTER. _ensure_column(conn, "agent_runs", "model", "TEXT") # ORCH-087 (BR-EFF): persist the REAL --effort value sent to the Claude CLI per # agent_runs row (low|medium|high|xhigh|max) so the tracker can render the # resolved effort next to the model ("· opus-4-8 · xhigh"). Stamped in # launcher._spawn right after resolve_agent_effort; NULL when no --effort flag # was passed (resolved to "") or for historical rows. Idempotent ALTER. _ensure_column(conn, "agent_runs", "effort", "TEXT") # Telegram live tracker: one editable Telegram message per task. We store its # message_id so each stage transition can editMessageText the same message # instead of spamming a new one. Idempotent ALTER (safe on the live prod DB). _ensure_column(conn, "tasks", "tracker_message_id", "INTEGER") # Telegram live tracker: human-readable task title for the tracker header # ("🛠️ ET-012 · "). Populated from the Plane work-item name at task # creation; falls back to the work_item_id when absent. Idempotent ALTER. _ensure_column(conn, "tasks", "title", "TEXT") # Telegram live tracker: "BRD review" is the only HUMAN gate time — the delta # between "BRD ready / approve requested" and the analysis->architecture # advance (human flipped Plane to Approved). Persisted on the task so the # tracker can show "твоё время" without recomputing from activity history. _ensure_column(conn, "tasks", "brd_review_started_at", "TEXT") _ensure_column(conn, "tasks", "brd_review_ended_at", "TEXT") # ORCH-090 (08-data-requirements.md): STOP-cancellation durable markers. Both are # additive, idempotent (_ensure_column is a no-op once present) -> safe on the live # shared prod DB (enduro untouched). The durable terminal itself is tasks.stage= # 'cancelled' (already understood by the reconciler terminal-skip); these columns # are audit/observability + the deferred-cancel signal. # cancelled_at -> timestamp the task was cancelled (NULL otherwise). # cancel_requested_at -> STOP arrived inside a critical merge/deploy window # (ADR-001 D7): cancellation is DEFERRED until the # irreversible step finishes honestly, then applied. _ensure_column(conn, "tasks", "cancelled_at", "TEXT") _ensure_column(conn, "tasks", "cancel_requested_at", "TEXT") # ORCH-026 (Level B): declarative task dependencies. job_deps stores the # directed edge "task_id (B) is blocked-by depends_on_task_id (A)". The # scheduler gate in claim_next_job keeps B queued until every A reaches # tasks.stage='done'. Purely ADDITIVE (CREATE TABLE/INDEX IF NOT EXISTS, no # change to jobs/tasks/agent_runs/events columns) -> idempotent and safe on # the live shared prod DB (enduro-trails data untouched). The logical FK on # tasks.id is intentional (no REFERENCES, mirrors jobs.task_id) so the # migration cannot fail on a pre-existing DB. See 08-data-requirements.md. conn.executescript(""" CREATE TABLE IF NOT EXISTS job_deps ( task_id INTEGER NOT NULL, depends_on_task_id INTEGER NOT NULL, created_at TEXT DEFAULT (datetime('now')), PRIMARY KEY (task_id, depends_on_task_id) ); CREATE INDEX IF NOT EXISTS idx_job_deps_task ON job_deps(task_id); CREATE INDEX IF NOT EXISTS idx_job_deps_depends ON job_deps(depends_on_task_id); """) # ORCH-087 (BR-G1, ADR-001 Р-1): authoritative ledger of EVERY tracker card # (Telegram message_id) ever created for a task. The scalar # tasks.tracker_message_id only ever knew the LAST mid, so any lost reference # (delete-fail+send-ok, race, restart) orphaned older cards forever. This # ledger lets every bump delete ALL still-open mids (deleted_at IS NULL), not # just the last one. tasks.tracker_message_id is KEPT (current-card pointer, # full BC). Purely ADDITIVE (CREATE TABLE/INDEX IF NOT EXISTS) -> idempotent, # restart-safe on the live shared prod DB (enduro-trails data untouched). The # logical FK on tasks.id is intentional (no REFERENCES, mirrors job_deps) so # the migration cannot fail on a pre-existing DB. See 08-data-requirements.md. conn.executescript(""" CREATE TABLE IF NOT EXISTS tracker_messages ( task_id INTEGER NOT NULL, message_id INTEGER NOT NULL, created_at TEXT DEFAULT (datetime('now')), deleted_at TEXT, PRIMARY KEY (task_id, message_id) ); CREATE INDEX IF NOT EXISTS idx_tracker_messages_open ON tracker_messages(task_id) WHERE deleted_at IS NULL; """) # ORCH-088 (FR-5, ADR-001 D2): durable per-repo rollback-freeze. After a # post-deploy DEGRADED verdict the repo is frozen so the serial gate stays # CLOSED unconditionally (the degraded task is already stage='done' — BR-7 — so # the ordinary active-task gate would not hold it) until an operator clears it # via POST /serial-gate/unfreeze. Append-only journal: an ACTIVE freeze for repo # R ⇔ a row with repo=R AND cleared_at IS NULL. Purely ADDITIVE (CREATE # TABLE/INDEX IF NOT EXISTS) -> idempotent, restart-safe on the live shared prod # DB (enduro-trails data untouched). See 08-data-requirements.md. conn.executescript(""" CREATE TABLE IF NOT EXISTS repo_freeze ( id INTEGER PRIMARY KEY AUTOINCREMENT, repo TEXT NOT NULL, frozen_at TEXT NOT NULL DEFAULT (datetime('now')), reason TEXT, work_item_id TEXT, cleared_at TEXT ); CREATE INDEX IF NOT EXISTS idx_repo_freeze_active ON repo_freeze (repo, cleared_at); """) conn.commit() conn.close() def _ensure_column(conn, table: str, column: str, decl: str): """Add a column to `table` if it does not already exist (idempotent migration).""" cols = [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()] if column not in cols: conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}") conn.commit() def get_task_by_plane_id(plane_id: str) -> dict | None: """Find task by Plane work item ID (checks plane_id and plane_issue_id).""" conn = get_db() row = conn.execute( "SELECT * FROM tasks WHERE plane_id = ? OR plane_issue_id = ?", (plane_id, plane_id) ).fetchone() conn.close() if row: return dict(row) return None def get_task_by_repo_branch(repo: str, branch: str) -> dict | None: """Find task by repo and branch name.""" conn = get_db() row = conn.execute( "SELECT * FROM tasks WHERE repo = ? AND branch = ?", (repo, branch) ).fetchone() conn.close() if row: return dict(row) return None def get_active_tasks_for_reconcile() -> list[dict]: """ORCH-053 (F-1): tasks eligible for the gate-side sweeper. Returns every task whose stage is not terminal ('done'), each augmented with ``age_s`` = seconds since ``tasks.updated_at`` (computed in SQL against UTC 'now', matching how ``update_task_stage`` stamps ``updated_at``). The reconciler applies the per-stage grace and active-job guard on top. ORCH-090 (adr-0026): a ``cancelled`` task is DELIBERATELY still returned here and skipped by the reconciler's own terminal-skip (``stage in ('done','cancelled')``, ORCH-086 D2) — narrowing the query to exclude ``cancelled`` would lose the observability skip-counter increment that ORCH-086 relies on. The terminal set is harmonised in the *scheduler* predicates (serial_gate / task_deps), not here. """ conn = get_db() try: rows = conn.execute( "SELECT *, " "CAST(strftime('%s','now') - strftime('%s', updated_at) AS INTEGER) AS age_s " "FROM tasks WHERE stage != 'done'" ).fetchall() finally: conn.close() return [dict(r) for r in rows] def get_development_tasks_by_repo(repo: str) -> list[dict]: """ORCH-053 (F-3): tasks of a repo currently on the 'development' stage. Used as the sha->branch DB fallback in handle_ci_status: a CI-status webhook whose branch could not be resolved (no branches[], empty ``git branch -r --contains``) is matched to the unique development task of the repo (ambiguity -> caller leaves it unresolved). """ conn = get_db() try: rows = conn.execute( "SELECT * FROM tasks WHERE repo = ? AND stage = 'development'", (repo,) ).fetchall() finally: conn.close() return [dict(r) for r in rows] def create_task_atomic( plane_id: str, work_item_id: str, repo: str, branch: str, stage: str, title: str, ) -> tuple[dict, bool]: """ORCH-053 (AC-4): atomically claim creation of a task for a plane_id. Performs SELECT-exists -> INSERT under the process-wide ``_CREATE_TASK_LOCK`` so a race between the live Plane webhook and the F-2 reconciler (both seeing "no task yet" for the same plane_id) cannot create two task rows / branches / worktrees / starter analyst jobs. Returns ``(row, created)``: * ``created=True`` -> THIS caller inserted the row and owns the follow-up work (branch / docs / analyst enqueue); * ``created=False`` -> a task for this plane_id already existed (the other racer won); ``row`` is the existing task and the caller must NOT duplicate the follow-up work. """ with _CREATE_TASK_LOCK: conn = get_db() try: existing = conn.execute( "SELECT * FROM tasks WHERE plane_id = ? OR plane_issue_id = ?", (plane_id, plane_id), ).fetchone() if existing: return dict(existing), False cur = conn.execute( "INSERT INTO tasks " "(plane_id, work_item_id, repo, branch, stage, plane_issue_id, title) " "VALUES (?, ?, ?, ?, ?, ?, ?)", (plane_id, work_item_id, repo, branch, stage, plane_id, title), ) conn.commit() row = conn.execute( "SELECT * FROM tasks WHERE id = ?", (cur.lastrowid,) ).fetchone() return dict(row), True finally: conn.close() def update_task_stage(task_id: int, stage: str): """Update task stage and timestamp.""" conn = get_db() conn.execute( "UPDATE tasks SET stage = ?, updated_at = datetime('now') WHERE id = ?", (stage, task_id), ) conn.commit() conn.close() # --------------------------------------------------------------------------- # Telegram live tracker helpers (feat/telegram-live-tracker) # --------------------------------------------------------------------------- def get_tracker_message_id(task_id: int) -> int | None: """Return the stored Telegram tracker message_id for a task, or None.""" conn = get_db() try: row = conn.execute( "SELECT tracker_message_id FROM tasks WHERE id=?", (task_id,) ).fetchone() finally: conn.close() return row[0] if row and row[0] is not None else None def set_tracker_message_id(task_id: int, message_id: int) -> None: """Persist the Telegram tracker message_id for a task (idempotent overwrite).""" conn = get_db() try: conn.execute( "UPDATE tasks SET tracker_message_id=? WHERE id=?", (message_id, task_id), ) conn.commit() finally: conn.close() # --------------------------------------------------------------------------- # ORCH-087 (BR-G1): tracker_messages ledger — full accounting of every card mid # --------------------------------------------------------------------------- def add_tracker_message(task_id: int, message_id: int) -> None: """ORCH-087: record a freshly-created tracker card mid in the ledger. Called ONLY after a successful send_telegram (new_mid is not None). INSERT OR IGNORE keeps it idempotent: a repeat mid (race / restart replay) does not duplicate the row or resurrect a deleted_at stamp. """ conn = get_db() try: conn.execute( "INSERT OR IGNORE INTO tracker_messages (task_id, message_id) " "VALUES (?, ?)", (task_id, message_id), ) conn.commit() finally: conn.close() def get_open_tracker_messages(task_id: int) -> list[int]: """ORCH-087: all still-open (deleted_at IS NULL) card mids for a task. These are the cards the next bump must clean up. Ordered oldest-first so the oldest orphans are deleted first. Never includes the rows already marked deleted. """ conn = get_db() try: rows = conn.execute( "SELECT message_id FROM tracker_messages " "WHERE task_id=? AND deleted_at IS NULL ORDER BY message_id ASC", (task_id,), ).fetchall() finally: conn.close() return [r[0] for r in rows] def mark_tracker_message_deleted(task_id: int, message_id: int) -> None: """ORCH-087: stamp deleted_at on a card mid that is confirmed gone. Called for mids that delete_telegram reported as gone (deleted now OR already gone / >48h per _DELETE_GONE_MARKERS) so they drop out of get_open_tracker_messages. Transient-delete mids are left untouched (NULL) for a retry on the next bump. """ conn = get_db() try: conn.execute( "UPDATE tracker_messages SET deleted_at=datetime('now') " "WHERE task_id=? AND message_id=? AND deleted_at IS NULL", (task_id, message_id), ) conn.commit() finally: conn.close() def mark_brd_review_started(task_id: int) -> None: """Stamp when BRD review (the human approve gate) started, if not already set. Idempotent: only sets it the first time (a retried analyst run must not reset the clock). The delta to brd_review_ended_at is the only "твоё время". """ conn = get_db() try: conn.execute( "UPDATE tasks SET brd_review_started_at=datetime('now') " "WHERE id=? AND brd_review_started_at IS NULL", (task_id,), ) conn.commit() finally: conn.close() def mark_brd_review_ended(task_id: int) -> None: """Stamp when BRD review ended (analysis->architecture advance / Approved). Idempotent: only sets it the first time and only if a start exists. """ conn = get_db() try: conn.execute( "UPDATE tasks SET brd_review_ended_at=datetime('now') " "WHERE id=? AND brd_review_started_at IS NOT NULL " "AND brd_review_ended_at IS NULL", (task_id,), ) conn.commit() finally: conn.close() def get_next_work_item_id(repo: str, prefix: str = "ET") -> str: """Generate next work item ID (e.g., ET-003 / ORCH-001). ORCH-6: numbering is per (repo, prefix). The prefix comes from the project registry (proj.work_item_prefix), so orchestrator issues number ORCH-001, ORCH-002 independently of the ET sequence in enduro-trails. Default prefix stays "ET" for backward compatibility with existing callers. """ conn = get_db() row = conn.execute( "SELECT work_item_id FROM tasks " "WHERE repo = ? AND work_item_id LIKE ? AND work_item_id IS NOT NULL " "ORDER BY id DESC LIMIT 1", (repo, f"{prefix}-%"), ).fetchone() conn.close() if row and row["work_item_id"]: # Parse <PREFIX>-003 -> 3, increment (keep the existing prefix). existing_prefix, num = row["work_item_id"].rsplit("-", 1) prefix = existing_prefix next_num = int(num) + 1 else: next_num = 1 return f"{prefix}-{next_num:03d}" def ensure_unique_work_item_id(work_item_id: str, repo: str) -> str: """BUG 2a: guarantee work_item_id uniqueness within (repo) over M-6 derive. M-6 derives the work_item_id from the Plane sequence_id. That number can collide (e.g. an issue was deleted and the sequence reused, or two issues map to the same number) -> the SAME ET-NNN gets handed to two different tasks, which then physically share a branch/worktree slug prefix and step on each other (see ET-006: task 8 and task 25). This is a guard LAYERED ON TOP of the M-6 derive (it does NOT replace it): given the derived id, if that exact <PREFIX>-NNN already exists in the tasks table for this repo, walk forward (ET-007, ET-008, ...) until a free number is found and return that instead. If the derived id is free, it is returned unchanged. """ if not work_item_id or "-" not in work_item_id: return work_item_id prefix, num_str = work_item_id.rsplit("-", 1) try: num = int(num_str) except ValueError: return work_item_id width = len(num_str) conn = get_db() try: candidate = work_item_id while conn.execute( "SELECT 1 FROM tasks WHERE repo = ? AND work_item_id = ? LIMIT 1", (repo, candidate), ).fetchone() is not None: num += 1 candidate = f"{prefix}-{num:0{width}d}" return candidate finally: conn.close() # --------------------------------------------------------------------------- # ORCH-5 (M-7): idempotent webhook event logging # --------------------------------------------------------------------------- def insert_event_dedup( source: str, event_type: str, payload: str, delivery_id: str ) -> bool: """Idempotently log a webhook event keyed by delivery_id. Returns True if a NEW row was inserted (caller should dispatch the event) and False if this delivery_id was already present (a duplicate delivery -> caller must skip dispatch/enqueue). Uses INSERT OR IGNORE against the partial UNIQUE index idx_events_delivery; rowcount==1 means the row was actually inserted. """ conn = get_db() try: cur = conn.execute( "INSERT OR IGNORE INTO events (source, event_type, payload, delivery_id) " "VALUES (?, ?, ?, ?)", (source, event_type, payload, delivery_id), ) conn.commit() return cur.rowcount == 1 finally: conn.close() # --------------------------------------------------------------------------- # ORCH-1 (F-2b): job queue helpers # --------------------------------------------------------------------------- def enqueue_job( agent: str, repo: str, task_content: str | None = None, task_id: int | None = None, max_attempts: int = 2, available_at_delay_s: int | None = None, ) -> int: """Enqueue a new job (status='queued'). Returns the new job id. This is what webhook handlers call instead of launching an agent in-process: it is a fast DB INSERT that returns immediately. The background worker (queue_worker) picks the job up later. ORCH-043 (merge-gate defer): when ``available_at_delay_s`` is given the job's ``available_at`` is set to ``now + delay`` so claim_next_job won't pick it up until the delay elapses (re-uses the existing ORCH-1 backoff gate). Used to re-queue the staging-deployer after a "merge-lock busy" defer without burning a worker slot in a blocking wait. """ conn = get_db() if available_at_delay_s is not None: cursor = conn.execute( "INSERT INTO jobs (agent, repo, task_id, task_content, max_attempts, available_at) " "VALUES (?, ?, ?, ?, ?, datetime('now', ?))", (agent, repo, task_id, task_content, max_attempts, f"+{int(available_at_delay_s)} seconds"), ) else: cursor = conn.execute( "INSERT INTO jobs (agent, repo, task_id, task_content, max_attempts) " "VALUES (?, ?, ?, ?, ?)", (agent, repo, task_id, task_content, max_attempts), ) job_id = cursor.lastrowid conn.commit() conn.close() return job_id def claim_next_job() -> dict | None: """Atomically claim the oldest queued job and mark it 'running'. Atomicity: the UPDATE carries the `status='queued'` guard in its WHERE clause and we check `rowcount`. If two worker ticks race for the same row, only the first UPDATE flips it to 'running' (rowcount==1); the loser sees rowcount==0 and retries the SELECT. We rely on SQLite's default per-connection transaction so the SELECT+UPDATE pair is consistent. Returns the claimed job dict or None when the queue is empty. """ # ORCH-026 (Level B, B-2): scheduler dependency gate. When task_deps_enabled # is on, a job whose task has an UNFINISHED declared dependency # (job_deps.depends_on_task_id -> a task with stage != 'done') is NOT # claimable -> it stays 'queued' without occupying a max_concurrency slot. # Jobs with a NULL task_id (no task) or with no job_deps rows are unaffected # (NOT EXISTS is True). Kill-switch off -> the clause is omitted -> 1:1 the # ORCH-1 query. The gate reads only the DB (offline-safe hot path). dep_gate = "" if getattr(settings, "task_deps_enabled", False): dep_gate = ( "AND NOT EXISTS (" " SELECT 1 FROM job_deps d JOIN tasks t ON t.id = d.depends_on_task_id " # ORCH-090 (adr-0026): a cancelled predecessor is TERMINAL -> the # dependent must NOT wait on it forever. Terminal set = {done,cancelled}. " WHERE d.task_id = jobs.task_id AND t.stage NOT IN ('done','cancelled')" ") " ) # ORCH-088 (FR-1, ADR-001 D1): per-repo serial gate. An analyst-job of a NEW # task is NOT claimable while the same repo has another unfinished task OR is # frozen. The fragment is built in the serial_gate leaf (sanitised repo scope, # fail-OPEN on any build error so a transient fault never wedges the queue of # ALL projects — AC-8). Jobs of an already-active task (architect/.../deployer) # are unaffected — the gate keys on jobs.agent='analyst' only. Reads only the # local DB (offline-safe hot path, NFR-2). serial_gate = "" try: from . import serial_gate as _serial_gate serial_gate = _serial_gate.build_claim_clause() except Exception: # noqa: BLE001 - fail-OPEN: never wedge the claim serial_gate = "" conn = get_db() try: while True: row = conn.execute( "SELECT id FROM jobs WHERE status='queued' " "AND (available_at IS NULL OR available_at <= datetime('now')) " f"{dep_gate}" f"{serial_gate}" "ORDER BY id LIMIT 1" ).fetchone() if not row: return None job_id = row["id"] cur = conn.execute( "UPDATE jobs SET status='running', " "attempts = attempts + 1, started_at = datetime('now') " "WHERE id = ? AND status='queued'", (job_id,), ) conn.commit() if cur.rowcount == 1: claimed = conn.execute( "SELECT * FROM jobs WHERE id = ?", (job_id,) ).fetchone() return dict(claimed) # Lost the race for this row; loop and try the next queued job. finally: conn.close() def mark_job_transient(job_id: int, available_at_sql_offset_seconds: int, error: str | None = None) -> None: """ORCH-1 resilience: requeue a job after a *transient* failure (429/overload/net). Increments `transient_attempts` (separate from the code-fault `attempts`), sets status back to 'queued', and gates re-pickup via `available_at` = now + backoff seconds. started_at/finished_at are cleared. """ conn = get_db() sets = [ "status='queued'", "transient_attempts = transient_attempts + 1", "available_at = datetime('now', ?)", "started_at = NULL", "finished_at = NULL", ] params: list = [f"+{int(available_at_sql_offset_seconds)} seconds"] if error is not None: sets.append("error = ?") params.append(error) params.append(job_id) conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params) conn.commit() conn.close() def mark_job( job_id: int, status: str, run_id: int | None = None, error: str | None = None, ): """Update a job's status (queued|running|done|failed|cancelled). - run_id (optional): link to the agent_runs row that executed this job. - error (optional): last error message (for failed/retry). - 'done'/'failed'/'cancelled' (ORCH-090) also stamp finished_at. - 'queued' (requeue for retry) clears started_at/finished_at so the next claim treats it as fresh. """ conn = get_db() sets = ["status = ?"] params: list = [status] if run_id is not None: sets.append("run_id = ?") params.append(run_id) if error is not None: sets.append("error = ?") params.append(error) if status in ("done", "failed", "cancelled"): sets.append("finished_at = datetime('now')") elif status == "queued": sets.append("started_at = NULL") sets.append("finished_at = NULL") params.append(job_id) conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params) conn.commit() conn.close() def has_active_job_for_task(task_id: int) -> bool: """True if the task already has a queued or running job. Used by the status-only verdict model (handle_status_start) to guard against double-launching an agent when a duplicate In Progress webhook arrives or a job is still in flight. The events de-dup absorbs identical webhook bodies; this guards against distinct webhooks while a job is pending/running. """ conn = get_db() row = conn.execute( "SELECT 1 FROM jobs WHERE task_id = ? AND status IN ('queued','running') LIMIT 1", (task_id,), ).fetchone() conn.close() return row is not None # --------------------------------------------------------------------------- # ORCH-090: STOP-cancellation helpers (task + jobs terminal state) # --------------------------------------------------------------------------- def get_task(task_id: int) -> dict | None: """Fetch a single task row by id (None when absent).""" conn = get_db() try: row = conn.execute("SELECT * FROM tasks WHERE id = ?", (task_id,)).fetchone() finally: conn.close() return dict(row) if row else None def get_active_jobs_for_task(task_id: int) -> list[dict]: """ORCH-090: queued/running jobs of a task (for STOP — stop agent + cancel). Returns the full job rows (incl. ``pid`` / ``run_id`` / ``status``) so the cancel orchestrator can SIGTERM the running agent by ``jobs.pid`` and then flip every job to the terminal ``cancelled`` outcome. """ conn = get_db() try: rows = conn.execute( "SELECT * FROM jobs WHERE task_id = ? AND status IN ('queued','running') " "ORDER BY id", (task_id,), ).fetchall() finally: conn.close() return [dict(r) for r in rows] def cancel_jobs_for_task(task_id: int, only_queued: bool = False) -> int: """ORCH-090 (ADR-001 D3): flip a task's jobs to the terminal ``cancelled`` outcome. Guarded UPDATE over ``status IN ('queued','running')`` (or only ``'queued'`` when ``only_queued`` — the deferred-cancel path inside a critical merge/deploy window, D7, which must NOT cancel the still-running deploy/merge actor). ``cancelled`` is never requeued: ``claim_next_job`` only selects ``status='queued'`` and the reaper / worker check the task's terminal stage before any requeue. Returns the number of jobs cancelled. never-raise -> 0 on error. """ statuses = "('queued')" if only_queued else "('queued','running')" try: conn = get_db() try: cur = conn.execute( f"UPDATE jobs SET status='cancelled', finished_at=datetime('now') " f"WHERE task_id = ? AND status IN {statuses}", (task_id,), ) conn.commit() return cur.rowcount or 0 finally: conn.close() except Exception: return 0 def mark_task_cancelled(task_id: int) -> bool: """ORCH-090 (ADR-001 D4): durable terminal + natural-key tombstone for a task. Atomically (single UPDATE): * ``stage='cancelled'`` (durable terminal, understood by the reconciler skip); * ``cancelled_at=now``, ``cancel_requested_at=NULL`` (clear any deferred flag); * TOMBSTONE the natural keys so a later "To Analyse" re-creates the task FROM SCRATCH: ``plane_id`` / ``work_item_id`` / ``plane_issue_id`` get a deterministic ``#cancelled-<id>`` suffix -> ``get_task_by_plane_id`` returns None and the anti-dup / uniqueness guards no longer collide. The row is NOT deleted (durable audit). ADR-001 D4 refinement (ORCH-090): the ADR proposed keeping ``plane_issue_id`` untouched for audit, but ``get_task_by_plane_id`` / ``create_task_atomic`` match on ``plane_id OR plane_issue_id`` — leaving ``plane_issue_id`` matchable would keep the cancelled row "findable" and BLOCK the clean-slate re-create (BR-3 / TR-4). We therefore suffix it too; the ``#cancelled-<id>`` tag is deterministic and parseable, so the original Plane issue UUID (== the original ``plane_id`` in every create path) is still fully recoverable for audit. Idempotent-safe: the suffix is only appended when not already present (a repeat STOP on an already-cancelled row does not double-suffix). Returns True iff the row was updated. never-raise -> False on error. """ try: conn = get_db() try: row = conn.execute( "SELECT plane_id, work_item_id, plane_issue_id FROM tasks WHERE id = ?", (task_id,), ).fetchone() if not row: return False suffix = f"#cancelled-{task_id}" def _tomb(v): v = v or "" return v if suffix in v else f"{v}{suffix}" plane_id = _tomb(row["plane_id"]) work_item_id = _tomb(row["work_item_id"]) plane_issue_id = _tomb(row["plane_issue_id"]) conn.execute( "UPDATE tasks SET stage='cancelled', cancelled_at=datetime('now'), " "cancel_requested_at=NULL, plane_id=?, work_item_id=?, plane_issue_id=?, " "updated_at=datetime('now') WHERE id = ?", (plane_id, work_item_id, plane_issue_id, task_id), ) conn.commit() return True finally: conn.close() except Exception: return False def set_task_cancel_requested(task_id: int) -> bool: """ORCH-090 (ADR-001 D7): mark a deferred cancellation (STOP in critical window). Idempotent: only stamps ``cancel_requested_at`` the first time. Returns the **first-stamp fact** — ``True`` iff THIS call actually stamped the column (a repeated STOP while still deferred updates 0 rows -> ``False``), so the caller can suppress duplicate notifications (AC-6). The deterministic deploy/merge finalizer reads the column once the irreversible step completes and then applies the full cancellation. never-raise -> False on error. """ try: conn = get_db() try: cur = conn.execute( "UPDATE tasks SET cancel_requested_at=datetime('now') " "WHERE id = ? AND cancel_requested_at IS NULL", (task_id,), ) conn.commit() return cur.rowcount > 0 finally: conn.close() except Exception: return False def cancelled_tasks_snapshot(limit: int = 10) -> dict: """ORCH-090 (AC-10): read-only cancellation summary for GET /queue. Returns ``{count, pending, recent}`` where ``count`` is the number of cancelled tasks, ``pending`` the number with a deferred (not-yet-applied) cancellation, and ``recent`` the last ``limit`` cancelled tasks. never-raise -> minimal dict. """ try: conn = get_db() try: count = conn.execute( "SELECT COUNT(*) FROM tasks WHERE stage='cancelled'" ).fetchone()[0] pending = conn.execute( "SELECT COUNT(*) FROM tasks WHERE cancel_requested_at IS NOT NULL " "AND stage != 'cancelled'" ).fetchone()[0] recent = [ {"work_item_id": r["work_item_id"], "repo": r["repo"], "cancelled_at": r["cancelled_at"]} for r in conn.execute( "SELECT work_item_id, repo, cancelled_at FROM tasks " "WHERE stage='cancelled' ORDER BY cancelled_at DESC LIMIT ?", (limit,), ).fetchall() ] finally: conn.close() return {"count": int(count), "pending": int(pending), "recent": recent} except Exception: return {"count": 0, "pending": 0, "recent": []} def count_running_jobs() -> int: """Number of jobs currently in 'running' status (for max_concurrency).""" conn = get_db() n = conn.execute( "SELECT COUNT(*) FROM jobs WHERE status='running'" ).fetchone()[0] conn.close() return int(n) def requeue_running_jobs() -> int: """Queue-recovery: on startup, any job left 'running' belongs to a worker that died on restart -> put it back to 'queued'. attempts are kept as-is (the next claim does NOT re-increment beyond what is needed; claim_next_job increments on pickup). Returns the number of requeued jobs. """ conn = get_db() cur = conn.execute( "UPDATE jobs SET status='queued', started_at = NULL " "WHERE status='running'" ) conn.commit() n = cur.rowcount conn.close() return int(n) def get_running_jobs() -> list[dict]: """ORCH-065: snapshot of every 'running' job for the job-reaper scan. Each row carries the job columns plus four reaper inputs: * ``running_age_s`` — seconds since ``started_at`` (Tier-3 backstop); * ``exit_code`` — the linked ``agent_runs.exit_code`` (Tier-2: process finished but the job is still 'running' -> monitor died mid-finalize); * ``finished_at_run`` — the linked ``agent_runs.finished_at``; * ``finished_age_s`` — seconds since ``agent_runs.finished_at`` (Tier-2 finalization grace: a LIVE monitor writes exit_code, THEN does git push / PR / Plane comments before _finalize_job, so a freshly-finished run is NOT yet a zombie — the reaper waits ``reaper_finalize_grace_s``). A LEFT JOIN on ``run_id`` keeps jobs with no agent_runs row (exit_code NULL). Read-only; never mutates. The reaper applies liveness/streak/backstop on top. """ conn = get_db() try: rows = conn.execute( "SELECT j.*, " "CAST(strftime('%s','now') - strftime('%s', j.started_at) AS INTEGER) " " AS running_age_s, " "r.exit_code AS exit_code, r.finished_at AS finished_at_run, " "CAST(strftime('%s','now') - strftime('%s', r.finished_at) AS INTEGER) " " AS finished_age_s " "FROM jobs j LEFT JOIN agent_runs r ON r.id = j.run_id " "WHERE j.status='running'" ).fetchall() finally: conn.close() return [dict(r) for r in rows] def reap_running_job( job_id: int, status: str, run_id: int | None = None, error: str | None = None, ) -> bool: """ORCH-065: atomic terminal flip of a RUNNING job by the job-reaper. Mirrors ``mark_job`` but carries the ``status='running'`` guard in the WHERE clause and reports ``rowcount`` so a late-arriving monitor / the startup ``requeue_running_jobs`` / a second reaper tick can never double-process the same row (AC-5, restart-safe). Returns True iff THIS call won the flip (rowcount == 1); False -> someone else already moved the row. Status semantics match ``mark_job``: done/failed stamp ``finished_at``; queued clears ``started_at``/``finished_at`` so the next claim treats it as fresh. """ conn = get_db() try: sets = ["status = ?"] params: list = [status] if run_id is not None: sets.append("run_id = ?") params.append(run_id) if error is not None: sets.append("error = ?") params.append(error) if status in ("done", "failed", "cancelled"): # ORCH-090: cancelled is terminal sets.append("finished_at = datetime('now')") elif status == "queued": sets.append("started_at = NULL") sets.append("finished_at = NULL") params.append(job_id) cur = conn.execute( f"UPDATE jobs SET {', '.join(sets)} WHERE id = ? AND status='running'", params, ) conn.commit() return cur.rowcount == 1 finally: conn.close() def get_job(job_id: int) -> dict | None: """Fetch a single job by id.""" conn = get_db() row = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)).fetchone() conn.close() return dict(row) if row else None def job_status_counts() -> dict: """Return counts grouped by status (for /queue observability).""" conn = get_db() rows = conn.execute( "SELECT status, COUNT(*) AS n FROM jobs GROUP BY status" ).fetchall() conn.close() counts = {"queued": 0, "running": 0, "done": 0, "failed": 0} for r in rows: counts[r["status"]] = r["n"] return counts def recent_jobs(limit: int = 10) -> list[dict]: """Return the most recent jobs (for /queue observability).""" conn = get_db() rows = conn.execute( "SELECT * FROM jobs ORDER BY id DESC LIMIT ?", (limit,) ).fetchall() conn.close() return [dict(r) for r in rows] # --------------------------------------------------------------------------- # ORCH-026 (Level B): declarative task-dependency helpers # --------------------------------------------------------------------------- def add_dependency(task_id: int, depends_on_task_id: int) -> bool: """Declare that task ``task_id`` (B) is blocked-by ``depends_on_task_id`` (A). Idempotent INSERT OR IGNORE against the job_deps PK (re-declaring the same edge is a no-op). A self-edge (task depends on itself) is rejected — it would deadlock the task forever and can never be satisfied. never-raise (self-hosting safety, AC-G1): any DB error -> returns False, the caller must not crash the webhook / worker. Returns True iff a NEW edge row was inserted. """ if task_id is None or depends_on_task_id is None: return False if task_id == depends_on_task_id: return False try: conn = get_db() try: cur = conn.execute( "INSERT OR IGNORE INTO job_deps (task_id, depends_on_task_id) " "VALUES (?, ?)", (task_id, depends_on_task_id), ) conn.commit() return cur.rowcount == 1 finally: conn.close() except Exception: return False def get_dependencies(task_id: int) -> list[int]: """Return the list of depends_on_task_id (A) that ``task_id`` (B) waits for. never-raise: any DB error -> [] (conservative: caller treats the task as having no declared dependency rather than crashing). """ try: conn = get_db() try: rows = conn.execute( "SELECT depends_on_task_id FROM job_deps WHERE task_id = ?", (task_id,), ).fetchall() finally: conn.close() return [r[0] for r in rows] except Exception: return [] def get_dependency_edges() -> list[tuple[int, int]]: """Return ALL declared edges as ``(task_id, depends_on_task_id)`` tuples. Used by the cycle detector (DFS over the whole declared graph) and the /queue snapshot. never-raise -> [] on any DB error. """ try: conn = get_db() try: rows = conn.execute( "SELECT task_id, depends_on_task_id FROM job_deps" ).fetchall() finally: conn.close() return [(r[0], r[1]) for r in rows] except Exception: return [] def get_unfinished_dependencies(task_id: int) -> list[dict]: """Return the UNFINISHED dependencies of ``task_id`` (A's not yet 'done'). Each dict carries the predecessor's ``id``, ``work_item_id`` and ``stage`` so the readiness gate / Telegram waiting-line can name what B is waiting for. never-raise -> [] on any DB error (treated as "ready", consistent with the scheduler omitting the gate on failure). """ try: conn = get_db() try: rows = conn.execute( "SELECT t.id AS id, t.work_item_id AS work_item_id, t.stage AS stage " "FROM job_deps d JOIN tasks t ON t.id = d.depends_on_task_id " # ORCH-090 (adr-0026): {done,cancelled} are both terminal -> a # cancelled predecessor no longer blocks the dependent. "WHERE d.task_id = ? AND t.stage NOT IN ('done','cancelled')", (task_id,), ).fetchall() finally: conn.close() return [dict(r) for r in rows] except Exception: return [] # --------------------------------------------------------------------------- # ORCH-1b (resilience): transient backoff helpers # --------------------------------------------------------------------------- def requeue_job_transient(job_id: int, delay_seconds: float, error: str | None = None): """ORCH-1b: requeue a job after a TRANSIENT (429/overload/network) failure. Unlike a code-fault requeue, this: - increments `transient_attempts` (a separate budget from code-fault attempts) - sets `available_at = now + delay_seconds` so claim_next_job won't pick it up until the backoff window elapses - sets status back to 'queued' and clears started_at/finished_at delay_seconds is computed by the caller (exp backoff, capped, Retry-After). """ conn = get_db() conn.execute( "UPDATE jobs SET status='queued', " "transient_attempts = transient_attempts + 1, " "available_at = datetime('now', ? || ' seconds'), " "started_at = NULL, finished_at = NULL, " "error = COALESCE(?, error) " "WHERE id = ?", (f"+{int(round(delay_seconds))}", error, job_id), ) conn.commit() conn.close() def compute_backoff(transient_attempts: int, retry_after: float | None = None) -> float: """ORCH-1b: exponential backoff (seconds) for a transient failure. delay = min(2**transient_attempts * base, max). If the server sent a Retry-After hint we honour it as a floor (use the larger of the two so we never poll sooner than the server asked). `transient_attempts` is the count AFTER this failure (i.e. how many transient failures have occurred), so the first backoff uses 2**1. """ base = getattr(settings, "backoff_base_seconds", 10) cap = getattr(settings, "backoff_max_seconds", 600) exp = min((2 ** max(transient_attempts, 0)) * base, cap) if retry_after is not None and retry_after > 0: return float(min(max(exp, retry_after), cap)) return float(exp)