feat(queue): resilience schema + backoff helper + config (ORCH-1)
Adds the `transient_attempts` and `available_at` columns to `jobs` (applied to existing databases by the idempotent `_ensure_column` migration); `claim_next_job` now honours `available_at`; `mark_job_transient` requeues a job with backoff, counted against a separate transient-retry budget. New config settings: preflight_cache_ttl, backoff_base_seconds, backoff_max_seconds, transient_max_attempts, breaker_threshold, breaker_pause_seconds.
This commit is contained in:
91
src/db.py
91
src/db.py
@@ -55,15 +55,29 @@ def init_db():
|
||||
max_attempts INTEGER NOT NULL DEFAULT 2,
|
||||
run_id INTEGER, -- agent_runs.id once started
|
||||
error TEXT, -- last error message
|
||||
transient_attempts INTEGER NOT NULL DEFAULT 0, -- ORCH-1 resilience: 429/transient retries
|
||||
available_at TEXT, -- ORCH-1 resilience: backoff gate (claim when <= now)
|
||||
created_at TEXT DEFAULT (datetime('now')),
|
||||
started_at TEXT,
|
||||
finished_at TEXT
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, id);
|
||||
""")
|
||||
# Lightweight migration: add resilience columns to a pre-existing jobs table
|
||||
# (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table).
|
||||
_ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0")
|
||||
_ensure_column(conn, "jobs", "available_at", "TEXT")
|
||||
conn.close()
|
||||
|
||||
|
||||
def _ensure_column(conn, table: str, column: str, decl: str):
|
||||
"""Add a column to `table` if it does not already exist (idempotent migration)."""
|
||||
cols = [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()]
|
||||
if column not in cols:
|
||||
conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}")
|
||||
conn.commit()
|
||||
|
||||
|
||||
def get_task_by_plane_id(plane_id: str) -> dict | None:
|
||||
"""Find task by Plane work item ID (checks plane_id and plane_issue_id)."""
|
||||
conn = get_db()
|
||||
@@ -170,7 +184,9 @@ def claim_next_job() -> dict | None:
|
||||
try:
|
||||
while True:
|
||||
row = conn.execute(
|
||||
"SELECT id FROM jobs WHERE status='queued' ORDER BY id LIMIT 1"
|
||||
"SELECT id FROM jobs WHERE status='queued' "
|
||||
"AND (available_at IS NULL OR available_at <= datetime('now')) "
|
||||
"ORDER BY id LIMIT 1"
|
||||
).fetchone()
|
||||
if not row:
|
||||
return None
|
||||
@@ -192,6 +208,32 @@ def claim_next_job() -> dict | None:
|
||||
conn.close()
|
||||
|
||||
|
||||
def mark_job_transient(job_id: int, available_at_sql_offset_seconds: int,
                       error: str | None = None) -> None:
    """ORCH-1 resilience: requeue a job after a *transient* failure (429/overload/net).

    Increments `transient_attempts` (separate from the code-fault `attempts`),
    sets status back to 'queued', and gates re-pickup via `available_at` =
    now + backoff seconds. started_at/finished_at are cleared.

    Args:
        job_id: `jobs.id` of the row to requeue.
        available_at_sql_offset_seconds: Backoff in seconds, truncated to an
            integer. Negative values are treated as 0 (available immediately).
        error: Optional message stored in `jobs.error`; when None the
            existing value is left untouched.
    """
    # Convert and clamp before opening the connection so a bad value cannot
    # leak it. A negative offset would render "+-5 seconds", which is not a
    # valid SQLite modifier: datetime() would return NULL and silently drop
    # the backoff gate.
    offset_seconds = max(0, int(available_at_sql_offset_seconds))

    sets = [
        "status='queued'",
        "transient_attempts = transient_attempts + 1",
        "available_at = datetime('now', ?)",
        "started_at = NULL",
        "finished_at = NULL",
    ]
    params: list = [f"+{offset_seconds} seconds"]
    if error is not None:
        sets.append("error = ?")
        params.append(error)
    params.append(job_id)

    conn = get_db()
    try:
        conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params)
        conn.commit()
    finally:
        # Always release the connection, even if the UPDATE or commit fails.
        conn.close()
|
||||
|
||||
|
||||
def mark_job(
|
||||
job_id: int,
|
||||
status: str,
|
||||
@@ -282,3 +324,50 @@ def recent_jobs(limit: int = 10) -> list[dict]:
|
||||
).fetchall()
|
||||
conn.close()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ORCH-1b (resilience): transient backoff helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def requeue_job_transient(job_id: int, delay_seconds: float, error: str | None = None) -> None:
    """ORCH-1b: requeue a job after a TRANSIENT (429/overload/network) failure.

    Unlike a code-fault requeue, this:
      - increments `transient_attempts` (a separate budget from code-fault attempts)
      - sets `available_at = now + delay_seconds` so claim_next_job won't pick it
        up until the backoff window elapses
      - sets status back to 'queued' and clears started_at/finished_at

    Args:
        job_id: `jobs.id` of the row to requeue.
        delay_seconds: Backoff computed by the caller (exp backoff, capped,
            Retry-After). Rounded to whole seconds; negative values are
            treated as 0 (available immediately).
        error: Optional message stored in `jobs.error`; when None the
            existing value is kept (COALESCE).
    """
    # Convert and clamp before opening the connection so a bad value cannot
    # leak it. A negative delay would render "+-5 seconds", which is not a
    # valid SQLite modifier: datetime() would return NULL and silently drop
    # the backoff gate.
    whole_seconds = max(0, int(round(delay_seconds)))

    conn = get_db()
    try:
        conn.execute(
            "UPDATE jobs SET status='queued', "
            "transient_attempts = transient_attempts + 1, "
            "available_at = datetime('now', ? || ' seconds'), "
            "started_at = NULL, finished_at = NULL, "
            "error = COALESCE(?, error) "
            "WHERE id = ?",
            (f"+{whole_seconds}", error, job_id),
        )
        conn.commit()
    finally:
        # Always release the connection, even if the UPDATE or commit fails.
        conn.close()
|
||||
|
||||
|
||||
def compute_backoff(transient_attempts: int, retry_after: float | None = None) -> float:
    """ORCH-1b: exponential backoff (seconds) for a transient failure.

    delay = min(2**transient_attempts * base, max), where base and max are
    read from `settings.backoff_base_seconds` (fallback 10) and
    `settings.backoff_max_seconds` (fallback 600).

    If the server sent a positive Retry-After hint, the larger of the hint
    and the exponential delay is used. The result is still capped at max, so
    a hint above max is NOT fully honoured: the job may be retried sooner
    than the server asked.

    `transient_attempts` is the count AFTER this failure (i.e. how many transient
    failures have occurred), so the first backoff uses 2**1. Negative counts
    are treated as 0.

    Returns:
        The delay in seconds, as a float.
    """
    base = getattr(settings, "backoff_base_seconds", 10)
    cap = getattr(settings, "backoff_max_seconds", 600)
    try:
        exp = min((2 ** max(transient_attempts, 0)) * base, cap)
    except OverflowError:
        # A float base with a very large attempt count cannot represent the
        # uncapped value; it is far beyond the cap (base is expected to be
        # positive), so saturate instead of crashing the retry path.
        exp = cap
    if retry_after is not None and retry_after > 0:
        return float(min(max(exp, retry_after), cap))
    return float(exp)
|
||||
|
||||
Reference in New Issue
Block a user