diff --git a/src/config.py b/src/config.py index 1a3ca75..e3d3003 100644 --- a/src/config.py +++ b/src/config.py @@ -36,6 +36,23 @@ class Settings(BaseSettings): max_concurrency: int = 1 queue_poll_interval: float = 2.0 + # ORCH-1b (resilience): preflight + 429/rate-limit + backoff + circuit breaker. + # preflight_cache_ttl -> cache the cheap CLI/network preflight result (seconds); + # the worker does NOT re-run `claude --version` more often + # than this (env ORCH_PREFLIGHT_CACHE_TTL). + # backoff_base_seconds -> base for exponential transient backoff. + # backoff_max_seconds -> ceiling for the transient backoff. + # transient_max_attempts -> retry budget for transient (429/overload/network) + # failures, separate from code-fault `attempts`. + # breaker_threshold -> consecutive transient failures that OPEN the breaker. + # breaker_pause_seconds -> how long the breaker stays open before half-open. + preflight_cache_ttl: int = 45 + backoff_base_seconds: int = 10 + backoff_max_seconds: int = 600 + transient_max_attempts: int = 5 + breaker_threshold: int = 3 + breaker_pause_seconds: int = 300 + # Telegram notifications telegram_bot_token: str = "" diff --git a/src/db.py b/src/db.py index dd6e97f..36590a4 100644 --- a/src/db.py +++ b/src/db.py @@ -55,15 +55,29 @@ def init_db(): max_attempts INTEGER NOT NULL DEFAULT 2, run_id INTEGER, -- agent_runs.id once started error TEXT, -- last error message + transient_attempts INTEGER NOT NULL DEFAULT 0, -- ORCH-1 resilience: 429/transient retries + available_at TEXT, -- ORCH-1 resilience: backoff gate (claim when <= now) created_at TEXT DEFAULT (datetime('now')), started_at TEXT, finished_at TEXT ); CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, id); """) + # Lightweight migration: add resilience columns to a pre-existing jobs table + # (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table). + _ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0") + _ensure_column(conn, "jobs", "available_at", "TEXT") conn.close() +def _ensure_column(conn, table: str, column: str, decl: str): + """Add a column to `table` if it does not already exist (idempotent migration).""" + cols = [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()] + if column not in cols: + conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}") + conn.commit() + + def get_task_by_plane_id(plane_id: str) -> dict | None: """Find task by Plane work item ID (checks plane_id and plane_issue_id).""" conn = get_db() @@ -170,7 +184,9 @@ def claim_next_job() -> dict | None: try: while True: row = conn.execute( - "SELECT id FROM jobs WHERE status='queued' ORDER BY id LIMIT 1" + "SELECT id FROM jobs WHERE status='queued' " + "AND (available_at IS NULL OR available_at <= datetime('now')) " + "ORDER BY id LIMIT 1" ).fetchone() if not row: return None @@ -192,6 +208,32 @@ def claim_next_job() -> dict | None: conn.close() +def mark_job_transient(job_id: int, available_at_sql_offset_seconds: int, + error: str | None = None) -> None: + """ORCH-1 resilience: requeue a job after a *transient* failure (429/overload/net). + + Increments `transient_attempts` (separate from the code-fault `attempts`), + sets status back to 'queued', and gates re-pickup via `available_at` = + now + backoff seconds. started_at/finished_at are cleared. + """ + conn = get_db() + sets = [ + "status='queued'", + "transient_attempts = transient_attempts + 1", + "available_at = datetime('now', ?)", + "started_at = NULL", + "finished_at = NULL", + ] + params: list = [f"+{int(available_at_sql_offset_seconds)} seconds"] + if error is not None: + sets.append("error = ?") + params.append(error) + params.append(job_id) + conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params) + conn.commit() + conn.close() + + def mark_job( job_id: int, status: str, @@ -282,3 +324,50 @@ def recent_jobs(limit: int = 10) -> list[dict]: ).fetchall() conn.close() return [dict(r) for r in rows] + + +# --------------------------------------------------------------------------- +# ORCH-1b (resilience): transient backoff helpers +# --------------------------------------------------------------------------- + +def requeue_job_transient(job_id: int, delay_seconds: float, error: str | None = None): + """ORCH-1b: requeue a job after a TRANSIENT (429/overload/network) failure. + + Unlike a code-fault requeue, this: + - increments `transient_attempts` (a separate budget from code-fault attempts) + - sets `available_at = now + delay_seconds` so claim_next_job won't pick it + up until the backoff window elapses + - sets status back to 'queued' and clears started_at/finished_at + + delay_seconds is computed by the caller (exp backoff, capped, Retry-After). + """ + conn = get_db() + conn.execute( + "UPDATE jobs SET status='queued', " + "transient_attempts = transient_attempts + 1, " + "available_at = datetime('now', ? || ' seconds'), " + "started_at = NULL, finished_at = NULL, " + "error = COALESCE(?, error) " + "WHERE id = ?", + (f"+{int(round(delay_seconds))}", error, job_id), + ) + conn.commit() + conn.close() + + +def compute_backoff(transient_attempts: int, retry_after: float | None = None) -> float: + """ORCH-1b: exponential backoff (seconds) for a transient failure. + + delay = min(2**transient_attempts * base, max). If the server sent a + Retry-After hint we honour it as a floor (use the larger of the two so we + never poll sooner than the server asked). + + `transient_attempts` is the count AFTER this failure (i.e. how many transient + failures have occurred), so the first backoff uses 2**1. + """ + base = getattr(settings, "backoff_base_seconds", 10) + cap = getattr(settings, "backoff_max_seconds", 600) + exp = min((2 ** max(transient_attempts, 0)) * base, cap) + if retry_after is not None and retry_after > 0: + return float(min(max(exp, retry_after), cap)) + return float(exp)