import sqlite3
from contextlib import closing

from .config import settings


def get_db() -> sqlite3.Connection:
    """Open a new connection to the orchestrator SQLite DB at ``settings.db_path``.

    Rows are returned as ``sqlite3.Row`` so columns can be read by name
    (``row["id"]``) as well as by index. The caller owns the connection and
    must close it; the helpers in this module wrap it in
    ``contextlib.closing`` so it is released even when a query raises.
    """
    conn = sqlite3.connect(settings.db_path)
    conn.row_factory = sqlite3.Row
    return conn


def init_db() -> None:
    """Create the schema and apply idempotent column migrations.

    Safe to call on every startup, including against the live prod DB: every
    CREATE uses IF NOT EXISTS, and every added column goes through
    ``_ensure_column``, which is a no-op once the column exists.
    """
    with closing(get_db()) as conn:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT DEFAULT (datetime('now')),
                source TEXT NOT NULL,
                event_type TEXT NOT NULL,
                payload TEXT NOT NULL,
                processed INTEGER DEFAULT 0
            );

            CREATE TABLE IF NOT EXISTS tasks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                plane_id TEXT,
                work_item_id TEXT,
                repo TEXT NOT NULL,
                branch TEXT,
                stage TEXT DEFAULT 'created',
                agent_running TEXT,
                created_at TEXT DEFAULT (datetime('now')),
                updated_at TEXT DEFAULT (datetime('now')),
                plane_issue_id TEXT
            );

            CREATE TABLE IF NOT EXISTS agent_runs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                task_id INTEGER REFERENCES tasks(id),
                agent TEXT NOT NULL,
                started_at TEXT DEFAULT (datetime('now')),
                finished_at TEXT,
                exit_code INTEGER,
                output_path TEXT
            );

            -- ORCH-1 (F-2b): persistent job queue. Webhook handlers enqueue a job and
            -- return immediately; a background worker claims jobs (respecting
            -- max_concurrency), spawns the claude agent, and updates the status.
            -- Restart-safe: running jobs are requeued on startup (queue-recovery).
            CREATE TABLE IF NOT EXISTS jobs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                agent TEXT NOT NULL,
                repo TEXT NOT NULL,
                task_id INTEGER,                                -- FK tasks.id (nullable)
                task_content TEXT,                              -- written to the agent task_file
                status TEXT NOT NULL DEFAULT 'queued',          -- queued|running|done|failed
                attempts INTEGER NOT NULL DEFAULT 0,
                max_attempts INTEGER NOT NULL DEFAULT 2,
                run_id INTEGER,                                 -- agent_runs.id once started
                error TEXT,                                     -- last error message
                transient_attempts INTEGER NOT NULL DEFAULT 0,  -- ORCH-1 resilience: 429/transient retries
                available_at TEXT,                              -- ORCH-1 resilience: backoff gate (claim when <= now)
                created_at TEXT DEFAULT (datetime('now')),
                started_at TEXT,
                finished_at TEXT
            );

            CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, id);
        """)

        # Lightweight migration: add resilience columns to a pre-existing jobs
        # table (CREATE TABLE IF NOT EXISTS won't add columns to an
        # already-created table).
        _ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0")
        _ensure_column(conn, "jobs", "available_at", "TEXT")

        # ORCH-5 (M-7): webhook delivery de-dup. Add events.delivery_id and a
        # PARTIAL unique index. Partial (WHERE delivery_id IS NOT NULL) so
        # pre-existing rows (which have NULL delivery_id) never collide with
        # each other. Restart-safe: _ensure_column is a no-op once the column
        # exists, and CREATE INDEX IF NOT EXISTS is a no-op once the index
        # exists, so this is safe on the live prod DB.
        _ensure_column(conn, "events", "delivery_id", "TEXT")
        conn.execute(
            "CREATE UNIQUE INDEX IF NOT EXISTS idx_events_delivery "
            "ON events(delivery_id) WHERE delivery_id IS NOT NULL"
        )

        # Feature 4 (token usage): per-run token / cost accounting. Parsed from
        # the claude --output-format json result by the launcher monitor.
        # Idempotent ALTERs (no-op once the columns exist) so this is safe on
        # the live prod DB.
        _ensure_column(conn, "agent_runs", "input_tokens", "INTEGER")
        _ensure_column(conn, "agent_runs", "output_tokens", "INTEGER")
        _ensure_column(conn, "agent_runs", "cache_read_tokens", "INTEGER")
        # Observability fix: also persist cache-CREATION input tokens. Claude
        # CLI reports the real input split across input_tokens (fresh, ~tens)
        # + cache_read_input_tokens (cache hit, millions)
        # + cache_creation_input_tokens (writing new cache). Without this
        # column the cache_creation slice is lost and the "X in" figure
        # understates the true prompt size. Idempotent ALTER.
        _ensure_column(conn, "agent_runs", "cache_creation_tokens", "INTEGER")
        _ensure_column(conn, "agent_runs", "cost_usd", "REAL")

        conn.commit()


def _ensure_column(conn, table: str, column: str, decl: str):
    """Add ``column`` to ``table`` unless it already exists (idempotent migration).

    ``table``, ``column`` and ``decl`` are interpolated into the SQL text
    (identifiers cannot be bound as parameters), so only pass trusted,
    hard-coded values — never external input. Always commits on ``conn``.
    """
    # Field 1 of each PRAGMA table_info row is the column name.
    existing = [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()]
    if column not in existing:
        conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}")
    conn.commit()


def get_task_by_plane_id(plane_id: str) -> dict | None:
    """Find a task by Plane work item ID (matches plane_id OR plane_issue_id).

    Returns the first matching task row as a dict, or None if none match.
    """
    with closing(get_db()) as conn:
        row = conn.execute(
            "SELECT * FROM tasks WHERE plane_id = ? OR plane_issue_id = ?",
            (plane_id, plane_id),
        ).fetchone()
    return dict(row) if row else None


def get_task_by_repo_branch(repo: str, branch: str) -> dict | None:
    """Find a task by (repo, branch). Returns the row as a dict, or None."""
    with closing(get_db()) as conn:
        row = conn.execute(
            "SELECT * FROM tasks WHERE repo = ? AND branch = ?",
            (repo, branch),
        ).fetchone()
    return dict(row) if row else None


def update_task_stage(task_id: int, stage: str):
    """Set a task's stage and bump its updated_at to the current UTC time."""
    with closing(get_db()) as conn:
        conn.execute(
            "UPDATE tasks SET stage = ?, updated_at = datetime('now') WHERE id = ?",
            (stage, task_id),
        )
        conn.commit()


def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
    """Generate the next work item ID for ``repo`` (e.g. ET-003 / ORCH-001).

    ORCH-6: numbering is per (repo, prefix). The prefix comes from the project
    registry (proj.work_item_prefix), so orchestrator issues number ORCH-001,
    ORCH-002 independently of the ET sequence in enduro-trails. The default
    prefix stays "ET" for backward compatibility with existing callers.

    The result is one past the HIGHEST existing ``<prefix>-<number>`` for the
    repo (not the most recently inserted row: ids derived from Plane sequence
    numbers and bumped by ensure_unique_work_item_id are not inserted in
    numeric order). Ids with a different prefix (e.g. ``ET-SUB-003`` when the
    prefix is ``ET``) or a non-numeric suffix are ignored. The prefix is
    compared case-insensitively, mirroring SQLite's ASCII LIKE; the returned
    id always uses ``prefix`` as given. Numbers are zero-padded to 3 digits.
    """
    with closing(get_db()) as conn:
        rows = conn.execute(
            "SELECT work_item_id FROM tasks "
            "WHERE repo = ? AND work_item_id LIKE ? AND work_item_id IS NOT NULL",
            (repo, f"{prefix}-%"),
        ).fetchall()

    highest = 0
    wanted = prefix.lower()
    for row in rows:
        # Parse ET-003 -> ("ET", 3); skip anything that is not <prefix>-<int>.
        head, _, tail = row["work_item_id"].rpartition("-")
        if head.lower() != wanted:
            continue
        try:
            highest = max(highest, int(tail))
        except ValueError:
            continue
    return f"{prefix}-{highest + 1:03d}"


def ensure_unique_work_item_id(work_item_id: str, repo: str) -> str:
    """BUG 2a: guarantee work_item_id uniqueness within (repo) over the M-6 derive.

    M-6 derives the work_item_id from the Plane sequence_id. That number can
    collide (e.g. an issue was deleted and the sequence reused, or two issues
    map to the same number), and then the SAME <PREFIX>-NNN is handed to two
    different tasks. Those tasks then share a branch/worktree slug prefix and
    step on each other (see ET-006: task 8 and task 25).

    This is a guard LAYERED ON TOP of the M-6 derive (it does NOT replace it).
    Given the derived id, if that exact <PREFIX>-NNN already exists in the
    tasks table for this repo, walk forward (ET-007, ET-008, ...) until a free
    number is found and return that. A free derived id is returned unchanged,
    and so is an id with no "-" or a non-integer suffix. The zero-pad width of
    the original suffix is kept.
    """
    if not work_item_id or "-" not in work_item_id:
        return work_item_id
    prefix, num_str = work_item_id.rsplit("-", 1)
    try:
        num = int(num_str)
    except ValueError:
        return work_item_id
    width = len(num_str)

    candidate = work_item_id
    with closing(get_db()) as conn:
        while conn.execute(
            "SELECT 1 FROM tasks WHERE repo = ? AND work_item_id = ? LIMIT 1",
            (repo, candidate),
        ).fetchone() is not None:
            num += 1
            candidate = f"{prefix}-{num:0{width}d}"
    return candidate


# ---------------------------------------------------------------------------
# ORCH-5 (M-7): idempotent webhook event logging
# ---------------------------------------------------------------------------


def insert_event_dedup(
    source: str, event_type: str, payload: str, delivery_id: str
) -> bool:
    """Idempotently log a webhook event keyed by delivery_id.

    Returns True if a NEW row was inserted (the caller should dispatch the
    event). Returns False if this delivery_id was already present: a duplicate
    delivery, so the caller must skip dispatch/enqueue.

    Uses INSERT OR IGNORE against the partial UNIQUE index idx_events_delivery;
    rowcount == 1 means the row was actually inserted.
    """
    with closing(get_db()) as conn:
        cur = conn.execute(
            "INSERT OR IGNORE INTO events (source, event_type, payload, delivery_id) "
            "VALUES (?, ?, ?, ?)",
            (source, event_type, payload, delivery_id),
        )
        conn.commit()
        return cur.rowcount == 1


# ---------------------------------------------------------------------------
# ORCH-1 (F-2b): job queue helpers
# ---------------------------------------------------------------------------


def enqueue_job(
    agent: str,
    repo: str,
    task_content: str | None = None,
    task_id: int | None = None,
    max_attempts: int = 2,
) -> int:
    """Enqueue a new job (status='queued') and return its id.

    Webhook handlers call this instead of launching an agent in-process. It is
    a fast DB INSERT that returns immediately; the background worker
    (queue_worker) picks the job up later.
    """
    with closing(get_db()) as conn:
        cursor = conn.execute(
            "INSERT INTO jobs (agent, repo, task_id, task_content, max_attempts) "
            "VALUES (?, ?, ?, ?, ?)",
            (agent, repo, task_id, task_content, max_attempts),
        )
        job_id = cursor.lastrowid
        conn.commit()
    return job_id


def claim_next_job() -> dict | None:
    """Atomically claim the oldest claimable queued job and mark it 'running'.

    A job is claimable when status='queued' and its backoff gate has passed
    (available_at is NULL or <= now). Claiming increments ``attempts`` and
    stamps ``started_at``.

    Atomicity comes from the guarded UPDATE, not from the SELECT. The UPDATE
    repeats ``status='queued'`` in its WHERE clause and its ``rowcount`` is
    checked. If two workers race for the same row, only the first UPDATE flips
    it to 'running' (rowcount == 1). The loser sees rowcount == 0 and goes back
    to the SELECT for the next candidate.

    Returns the claimed job as a dict, or None when nothing is claimable.
    """
    with closing(get_db()) as conn:
        while True:
            row = conn.execute(
                "SELECT id FROM jobs WHERE status='queued' "
                "AND (available_at IS NULL OR available_at <= datetime('now')) "
                "ORDER BY id LIMIT 1"
            ).fetchone()
            if not row:
                return None
            job_id = row["id"]
            cur = conn.execute(
                "UPDATE jobs SET status='running', "
                "attempts = attempts + 1, started_at = datetime('now') "
                "WHERE id = ? AND status='queued'",
                (job_id,),
            )
            conn.commit()
            if cur.rowcount == 1:
                claimed = conn.execute(
                    "SELECT * FROM jobs WHERE id = ?", (job_id,)
                ).fetchone()
                return dict(claimed)
            # Lost the race for this row; loop and try the next queued job.


def _requeue_transient(job_id: int, delay_seconds: int, error: str | None) -> None:
    """Shared UPDATE for mark_job_transient / requeue_job_transient.

    Sets status='queued', increments transient_attempts, gates re-pickup via
    available_at = now + delay_seconds, and clears started_at/finished_at. The
    stored error is replaced only when ``error`` is not None.
    """
    # A negative delay is meaningless for a backoff gate; clamp at 0 (claimable
    # immediately) so the modifier is always the well-formed "+N seconds".
    offset = f"+{max(int(delay_seconds), 0)} seconds"
    with closing(get_db()) as conn:
        conn.execute(
            "UPDATE jobs SET status='queued', "
            "transient_attempts = transient_attempts + 1, "
            "available_at = datetime('now', ?), "
            "started_at = NULL, finished_at = NULL, "
            "error = COALESCE(?, error) "
            "WHERE id = ?",
            (offset, error, job_id),
        )
        conn.commit()


def mark_job_transient(job_id: int, available_at_sql_offset_seconds: int, error: str | None = None) -> None:
    """ORCH-1 resilience: requeue a job after a *transient* failure (429/overload/net).

    Increments ``transient_attempts`` (separate from the code-fault
    ``attempts``), sets status back to 'queued', and gates re-pickup via
    ``available_at`` = now + ``available_at_sql_offset_seconds`` (truncated to
    an int; negatives clamp to 0). started_at/finished_at are cleared, and
    ``error`` overwrites the stored error only when given.
    """
    _requeue_transient(job_id, int(available_at_sql_offset_seconds), error)


def mark_job(
    job_id: int,
    status: str,
    run_id: int | None = None,
    error: str | None = None,
):
    """Update a job's status (queued|running|done|failed).

    - run_id (optional): link to the agent_runs row that executed this job.
    - error (optional): last error message (for failed/retry).
    - 'done'/'failed' also stamp finished_at.
    - 'queued' (requeue for retry) clears started_at/finished_at so the next
      claim treats it as fresh.

    ``status`` is written as given; it is not validated here.
    """
    sets = ["status = ?"]
    params: list = [status]
    if run_id is not None:
        sets.append("run_id = ?")
        params.append(run_id)
    if error is not None:
        sets.append("error = ?")
        params.append(error)
    if status in ("done", "failed"):
        sets.append("finished_at = datetime('now')")
    elif status == "queued":
        sets.append("started_at = NULL")
        sets.append("finished_at = NULL")
    params.append(job_id)
    # Only fixed column fragments are joined into the SQL; values are bound.
    with closing(get_db()) as conn:
        conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params)
        conn.commit()


def has_active_job_for_task(task_id: int) -> bool:
    """Return True if the task already has a queued or running job.

    Used by the status-only verdict model (handle_status_start) to avoid
    double-launching an agent when a duplicate In Progress webhook arrives or
    a job is still in flight. The events de-dup absorbs identical webhook
    deliveries; this guards against distinct webhooks while a job is
    pending/running.
    """
    with closing(get_db()) as conn:
        row = conn.execute(
            "SELECT 1 FROM jobs WHERE task_id = ? "
            "AND status IN ('queued','running') LIMIT 1",
            (task_id,),
        ).fetchone()
    return row is not None


def count_running_jobs() -> int:
    """Return the number of jobs currently in 'running' status (for max_concurrency)."""
    with closing(get_db()) as conn:
        n = conn.execute(
            "SELECT COUNT(*) FROM jobs WHERE status='running'"
        ).fetchone()[0]
    return int(n)


def requeue_running_jobs() -> int:
    """Queue-recovery: put every 'running' job back to 'queued' (call on startup).

    Any job still 'running' at startup belonged to a worker that died with the
    previous process. ``attempts`` is left unchanged, so the interrupted run
    still counts. claim_next_job increments ``attempts`` again on the next
    pickup. Returns the number of requeued jobs.
    """
    with closing(get_db()) as conn:
        cur = conn.execute(
            "UPDATE jobs SET status='queued', started_at = NULL "
            "WHERE status='running'"
        )
        conn.commit()
        n = cur.rowcount
    return int(n)


def get_job(job_id: int) -> dict | None:
    """Fetch a single job by id as a dict, or None if it does not exist."""
    with closing(get_db()) as conn:
        row = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)).fetchone()
    return dict(row) if row else None


def job_status_counts() -> dict:
    """Return job counts grouped by status (for /queue observability).

    The four known statuses are always present (0 if empty); any other
    status value found in the table is added as an extra key.
    """
    with closing(get_db()) as conn:
        rows = conn.execute(
            "SELECT status, COUNT(*) AS n FROM jobs GROUP BY status"
        ).fetchall()
    counts = {"queued": 0, "running": 0, "done": 0, "failed": 0}
    for r in rows:
        counts[r["status"]] = r["n"]
    return counts


def recent_jobs(limit: int = 10) -> list[dict]:
    """Return up to ``limit`` most recent jobs, newest first (for /queue observability)."""
    with closing(get_db()) as conn:
        rows = conn.execute(
            "SELECT * FROM jobs ORDER BY id DESC LIMIT ?", (limit,)
        ).fetchall()
    return [dict(r) for r in rows]


# ---------------------------------------------------------------------------
# ORCH-1b (resilience): transient backoff helpers
# ---------------------------------------------------------------------------


def requeue_job_transient(job_id: int, delay_seconds: float, error: str | None = None):
    """ORCH-1b: requeue a job after a TRANSIENT (429/overload/network) failure.

    Unlike a code-fault requeue, this:
      - increments ``transient_attempts`` (a separate budget from code-fault
        attempts)
      - sets ``available_at = now + delay_seconds`` (rounded to whole seconds;
        negatives clamp to 0) so claim_next_job won't pick it up until the
        backoff window elapses
      - sets status back to 'queued' and clears started_at/finished_at
      - overwrites the stored error only when ``error`` is given

    delay_seconds is computed by the caller (exp backoff, capped,
    Retry-After); see compute_backoff.
    """
    _requeue_transient(job_id, int(round(delay_seconds)), error)


def compute_backoff(transient_attempts: int, retry_after: float | None = None) -> float:
    """ORCH-1b: exponential backoff delay (seconds) for a transient failure.

    delay = min(2**transient_attempts * base, cap), with base/cap read from
    settings.backoff_base_seconds (default 10) and
    settings.backoff_max_seconds (default 600). A negative
    ``transient_attempts`` is treated as 0.

    A positive Retry-After hint is honoured as a floor: the larger of the two
    is used, so we never poll sooner than the server asked. The result is
    still clamped to ``cap``, so a Retry-After above the cap is shortened.

    ``transient_attempts`` is the count AFTER this failure (how many transient
    failures have occurred), so the first backoff uses 2**1.
    """
    base = getattr(settings, "backoff_base_seconds", 10)
    cap = getattr(settings, "backoff_max_seconds", 600)
    exp = min((2 ** max(transient_attempts, 0)) * base, cap)
    if retry_after is not None and retry_after > 0:
        return float(min(max(exp, retry_after), cap))
    return float(exp)