Files
orchestrator/src/db.py
Dev Agent 0cd9b11fe0 feat(queue): resilience schema + backoff helper + config (ORCH-1)
jobs.transient_attempts + available_at columns (idempotent _ensure_column
migration); claim_next_job honours available_at; mark_job_transient (backoff
requeue with separate transient budget). Config: preflight_cache_ttl,
backoff_base/max_seconds, transient_max_attempts, breaker_threshold,
breaker_pause_seconds.
2026-06-03 00:12:17 +03:00

374 lines
13 KiB
Python

import sqlite3
from .config import settings
def get_db() -> sqlite3.Connection:
conn = sqlite3.connect(settings.db_path)
conn.row_factory = sqlite3.Row
return conn
def init_db():
conn = get_db()
conn.executescript("""
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT DEFAULT (datetime('now')),
source TEXT NOT NULL,
event_type TEXT NOT NULL,
payload TEXT NOT NULL,
processed INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
plane_id TEXT,
work_item_id TEXT,
repo TEXT NOT NULL,
branch TEXT,
stage TEXT DEFAULT 'created',
agent_running TEXT,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now')),
plane_issue_id TEXT
);
CREATE TABLE IF NOT EXISTS agent_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id INTEGER REFERENCES tasks(id),
agent TEXT NOT NULL,
started_at TEXT DEFAULT (datetime('now')),
finished_at TEXT,
exit_code INTEGER,
output_path TEXT
);
-- ORCH-1 (F-2b): persistent job queue. Webhook handlers enqueue a job and
-- return immediately; a background worker claims jobs (respecting
-- max_concurrency), spawns the claude agent, and updates the status.
-- Restart-safe: running jobs are requeued on startup (queue-recovery).
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent TEXT NOT NULL,
repo TEXT NOT NULL,
task_id INTEGER, -- FK tasks.id (nullable)
task_content TEXT, -- written to the agent task_file
status TEXT NOT NULL DEFAULT 'queued', -- queued|running|done|failed
attempts INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 2,
run_id INTEGER, -- agent_runs.id once started
error TEXT, -- last error message
transient_attempts INTEGER NOT NULL DEFAULT 0, -- ORCH-1 resilience: 429/transient retries
available_at TEXT, -- ORCH-1 resilience: backoff gate (claim when <= now)
created_at TEXT DEFAULT (datetime('now')),
started_at TEXT,
finished_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, id);
""")
# Lightweight migration: add resilience columns to a pre-existing jobs table
# (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table).
_ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0")
_ensure_column(conn, "jobs", "available_at", "TEXT")
conn.close()
def _ensure_column(conn, table: str, column: str, decl: str):
"""Add a column to `table` if it does not already exist (idempotent migration)."""
cols = [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()]
if column not in cols:
conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}")
conn.commit()
def get_task_by_plane_id(plane_id: str) -> dict | None:
"""Find task by Plane work item ID (checks plane_id and plane_issue_id)."""
conn = get_db()
row = conn.execute(
"SELECT * FROM tasks WHERE plane_id = ? OR plane_issue_id = ?", (plane_id, plane_id)
).fetchone()
conn.close()
if row:
return dict(row)
return None
def get_task_by_repo_branch(repo: str, branch: str) -> dict | None:
"""Find task by repo and branch name."""
conn = get_db()
row = conn.execute(
"SELECT * FROM tasks WHERE repo = ? AND branch = ?", (repo, branch)
).fetchone()
conn.close()
if row:
return dict(row)
return None
def update_task_stage(task_id: int, stage: str):
"""Update task stage and timestamp."""
conn = get_db()
conn.execute(
"UPDATE tasks SET stage = ?, updated_at = datetime('now') WHERE id = ?",
(stage, task_id),
)
conn.commit()
conn.close()
def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
"""Generate next work item ID (e.g., ET-003 / ORCH-001).
ORCH-6: numbering is per (repo, prefix). The prefix comes from the project
registry (proj.work_item_prefix), so orchestrator issues number ORCH-001,
ORCH-002 independently of the ET sequence in enduro-trails. Default prefix
stays "ET" for backward compatibility with existing callers.
"""
conn = get_db()
row = conn.execute(
"SELECT work_item_id FROM tasks "
"WHERE repo = ? AND work_item_id LIKE ? AND work_item_id IS NOT NULL "
"ORDER BY id DESC LIMIT 1",
(repo, f"{prefix}-%"),
).fetchone()
conn.close()
if row and row["work_item_id"]:
# Parse <PREFIX>-003 -> 3, increment (keep the existing prefix).
existing_prefix, num = row["work_item_id"].rsplit("-", 1)
prefix = existing_prefix
next_num = int(num) + 1
else:
next_num = 1
return f"{prefix}-{next_num:03d}"
# ---------------------------------------------------------------------------
# ORCH-1 (F-2b): job queue helpers
# ---------------------------------------------------------------------------
def enqueue_job(
agent: str,
repo: str,
task_content: str | None = None,
task_id: int | None = None,
max_attempts: int = 2,
) -> int:
"""Enqueue a new job (status='queued'). Returns the new job id.
This is what webhook handlers call instead of launching an agent in-process:
it is a fast DB INSERT that returns immediately. The background worker
(queue_worker) picks the job up later.
"""
conn = get_db()
cursor = conn.execute(
"INSERT INTO jobs (agent, repo, task_id, task_content, max_attempts) "
"VALUES (?, ?, ?, ?, ?)",
(agent, repo, task_id, task_content, max_attempts),
)
job_id = cursor.lastrowid
conn.commit()
conn.close()
return job_id
def claim_next_job() -> dict | None:
"""Atomically claim the oldest queued job and mark it 'running'.
Atomicity: the UPDATE carries the `status='queued'` guard in its WHERE clause
and we check `rowcount`. If two worker ticks race for the same row, only the
first UPDATE flips it to 'running' (rowcount==1); the loser sees rowcount==0
and retries the SELECT. We rely on SQLite's default per-connection transaction
so the SELECT+UPDATE pair is consistent. Returns the claimed job dict or None
when the queue is empty.
"""
conn = get_db()
try:
while True:
row = conn.execute(
"SELECT id FROM jobs WHERE status='queued' "
"AND (available_at IS NULL OR available_at <= datetime('now')) "
"ORDER BY id LIMIT 1"
).fetchone()
if not row:
return None
job_id = row["id"]
cur = conn.execute(
"UPDATE jobs SET status='running', "
"attempts = attempts + 1, started_at = datetime('now') "
"WHERE id = ? AND status='queued'",
(job_id,),
)
conn.commit()
if cur.rowcount == 1:
claimed = conn.execute(
"SELECT * FROM jobs WHERE id = ?", (job_id,)
).fetchone()
return dict(claimed)
# Lost the race for this row; loop and try the next queued job.
finally:
conn.close()
def mark_job_transient(job_id: int, available_at_sql_offset_seconds: int,
error: str | None = None) -> None:
"""ORCH-1 resilience: requeue a job after a *transient* failure (429/overload/net).
Increments `transient_attempts` (separate from the code-fault `attempts`),
sets status back to 'queued', and gates re-pickup via `available_at` =
now + backoff seconds. started_at/finished_at are cleared.
"""
conn = get_db()
sets = [
"status='queued'",
"transient_attempts = transient_attempts + 1",
"available_at = datetime('now', ?)",
"started_at = NULL",
"finished_at = NULL",
]
params: list = [f"+{int(available_at_sql_offset_seconds)} seconds"]
if error is not None:
sets.append("error = ?")
params.append(error)
params.append(job_id)
conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params)
conn.commit()
conn.close()
def mark_job(
job_id: int,
status: str,
run_id: int | None = None,
error: str | None = None,
):
"""Update a job's status (queued|running|done|failed).
- run_id (optional): link to the agent_runs row that executed this job.
- error (optional): last error message (for failed/retry).
- 'done'/'failed' also stamp finished_at.
- 'queued' (requeue for retry) clears started_at/finished_at so the next
claim treats it as fresh.
"""
conn = get_db()
sets = ["status = ?"]
params: list = [status]
if run_id is not None:
sets.append("run_id = ?")
params.append(run_id)
if error is not None:
sets.append("error = ?")
params.append(error)
if status in ("done", "failed"):
sets.append("finished_at = datetime('now')")
elif status == "queued":
sets.append("started_at = NULL")
sets.append("finished_at = NULL")
params.append(job_id)
conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params)
conn.commit()
conn.close()
def count_running_jobs() -> int:
"""Number of jobs currently in 'running' status (for max_concurrency)."""
conn = get_db()
n = conn.execute(
"SELECT COUNT(*) FROM jobs WHERE status='running'"
).fetchone()[0]
conn.close()
return int(n)
def requeue_running_jobs() -> int:
"""Queue-recovery: on startup, any job left 'running' belongs to a worker that
died on restart -> put it back to 'queued'. attempts are kept as-is (the next
claim does NOT re-increment beyond what is needed; claim_next_job increments on
pickup). Returns the number of requeued jobs.
"""
conn = get_db()
cur = conn.execute(
"UPDATE jobs SET status='queued', started_at = NULL "
"WHERE status='running'"
)
conn.commit()
n = cur.rowcount
conn.close()
return int(n)
def get_job(job_id: int) -> dict | None:
"""Fetch a single job by id."""
conn = get_db()
row = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)).fetchone()
conn.close()
return dict(row) if row else None
def job_status_counts() -> dict:
"""Return counts grouped by status (for /queue observability)."""
conn = get_db()
rows = conn.execute(
"SELECT status, COUNT(*) AS n FROM jobs GROUP BY status"
).fetchall()
conn.close()
counts = {"queued": 0, "running": 0, "done": 0, "failed": 0}
for r in rows:
counts[r["status"]] = r["n"]
return counts
def recent_jobs(limit: int = 10) -> list[dict]:
"""Return the most recent jobs (for /queue observability)."""
conn = get_db()
rows = conn.execute(
"SELECT * FROM jobs ORDER BY id DESC LIMIT ?", (limit,)
).fetchall()
conn.close()
return [dict(r) for r in rows]
# ---------------------------------------------------------------------------
# ORCH-1b (resilience): transient backoff helpers
# ---------------------------------------------------------------------------
def requeue_job_transient(job_id: int, delay_seconds: float, error: str | None = None):
"""ORCH-1b: requeue a job after a TRANSIENT (429/overload/network) failure.
Unlike a code-fault requeue, this:
- increments `transient_attempts` (a separate budget from code-fault attempts)
- sets `available_at = now + delay_seconds` so claim_next_job won't pick it
up until the backoff window elapses
- sets status back to 'queued' and clears started_at/finished_at
delay_seconds is computed by the caller (exp backoff, capped, Retry-After).
"""
conn = get_db()
conn.execute(
"UPDATE jobs SET status='queued', "
"transient_attempts = transient_attempts + 1, "
"available_at = datetime('now', ? || ' seconds'), "
"started_at = NULL, finished_at = NULL, "
"error = COALESCE(?, error) "
"WHERE id = ?",
(f"+{int(round(delay_seconds))}", error, job_id),
)
conn.commit()
conn.close()
def compute_backoff(transient_attempts: int, retry_after: float | None = None) -> float:
"""ORCH-1b: exponential backoff (seconds) for a transient failure.
delay = min(2**transient_attempts * base, max). If the server sent a
Retry-After hint we honour it as a floor (use the larger of the two so we
never poll sooner than the server asked).
`transient_attempts` is the count AFTER this failure (i.e. how many transient
failures have occurred), so the first backoff uses 2**1.
"""
base = getattr(settings, "backoff_base_seconds", 10)
cap = getattr(settings, "backoff_max_seconds", 600)
exp = min((2 ** max(transient_attempts, 0)) * base, cap)
if retry_after is not None and retry_after > 0:
return float(min(max(exp, retry_after), cap))
return float(exp)