From fd3dac7d228edcb89d1b547e186548c8a8603734 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Tue, 2 Jun 2026 23:58:44 +0300 Subject: [PATCH 01/13] feat(queue): add jobs table + queue helpers and config (ORCH-1) Persistent SQLite job queue (F-2b): jobs table + idx, atomic claim_next_job, enqueue/mark/count/requeue/get helpers. New settings max_concurrency (ORCH_MAX_CONCURRENCY) and queue_poll_interval (ORCH_QUEUE_POLL_INTERVAL). --- src/config.py | 6 ++ src/db.py | 177 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 183 insertions(+) diff --git a/src/config.py b/src/config.py index d2d7d2c..1a3ca75 100644 --- a/src/config.py +++ b/src/config.py @@ -30,6 +30,12 @@ class Settings(BaseSettings): # DB db_path: str = "/app/data/orchestrator.db" + # ORCH-1 (F-2b): persistent job queue / background worker. + # max_concurrency -> max agent jobs running in parallel (env ORCH_MAX_CONCURRENCY) + # queue_poll_interval -> worker loop poll seconds (env ORCH_QUEUE_POLL_INTERVAL) + max_concurrency: int = 1 + queue_poll_interval: float = 2.0 + # Telegram notifications telegram_bot_token: str = "" diff --git a/src/db.py b/src/db.py index 693b954..dd6e97f 100644 --- a/src/db.py +++ b/src/db.py @@ -40,6 +40,26 @@ def init_db(): exit_code INTEGER, output_path TEXT ); + -- ORCH-1 (F-2b): persistent job queue. Webhook handlers enqueue a job and + -- return immediately; a background worker claims jobs (respecting + -- max_concurrency), spawns the claude agent, and updates the status. + -- Restart-safe: running jobs are requeued on startup (queue-recovery). + CREATE TABLE IF NOT EXISTS jobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + agent TEXT NOT NULL, + repo TEXT NOT NULL, + task_id INTEGER, -- FK tasks.id (nullable) + task_content TEXT, -- written to the agent task_file + status TEXT NOT NULL DEFAULT 'queued', -- queued|running|done|failed + attempts INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL DEFAULT 2, + run_id INTEGER, -- agent_runs.id once started + error TEXT, -- last error message + created_at TEXT DEFAULT (datetime('now')), + started_at TEXT, + finished_at TEXT + ); + CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, id); """) conn.close() @@ -105,3 +125,160 @@ def get_next_work_item_id(repo: str, prefix: str = "ET") -> str: next_num = 1 return f"{prefix}-{next_num:03d}" + + +# --------------------------------------------------------------------------- +# ORCH-1 (F-2b): job queue helpers +# --------------------------------------------------------------------------- + +def enqueue_job( + agent: str, + repo: str, + task_content: str | None = None, + task_id: int | None = None, + max_attempts: int = 2, +) -> int: + """Enqueue a new job (status='queued'). Returns the new job id. + + This is what webhook handlers call instead of launching an agent in-process: + it is a fast DB INSERT that returns immediately. The background worker + (queue_worker) picks the job up later. + """ + conn = get_db() + cursor = conn.execute( + "INSERT INTO jobs (agent, repo, task_id, task_content, max_attempts) " + "VALUES (?, ?, ?, ?, ?)", + (agent, repo, task_id, task_content, max_attempts), + ) + job_id = cursor.lastrowid + conn.commit() + conn.close() + return job_id + + +def claim_next_job() -> dict | None: + """Atomically claim the oldest queued job and mark it 'running'. + + Atomicity: the UPDATE carries the `status='queued'` guard in its WHERE clause + and we check `rowcount`. If two worker ticks race for the same row, only the + first UPDATE flips it to 'running' (rowcount==1); the loser sees rowcount==0 + and retries the SELECT. We rely on SQLite's default per-connection transaction + so the SELECT+UPDATE pair is consistent. Returns the claimed job dict or None + when the queue is empty. + """ + conn = get_db() + try: + while True: + row = conn.execute( + "SELECT id FROM jobs WHERE status='queued' ORDER BY id LIMIT 1" + ).fetchone() + if not row: + return None + job_id = row["id"] + cur = conn.execute( + "UPDATE jobs SET status='running', " + "attempts = attempts + 1, started_at = datetime('now') " + "WHERE id = ? AND status='queued'", + (job_id,), + ) + conn.commit() + if cur.rowcount == 1: + claimed = conn.execute( + "SELECT * FROM jobs WHERE id = ?", (job_id,) + ).fetchone() + return dict(claimed) + # Lost the race for this row; loop and try the next queued job. + finally: + conn.close() + + +def mark_job( + job_id: int, + status: str, + run_id: int | None = None, + error: str | None = None, +): + """Update a job's status (queued|running|done|failed). + + - run_id (optional): link to the agent_runs row that executed this job. + - error (optional): last error message (for failed/retry). + - 'done'/'failed' also stamp finished_at. + - 'queued' (requeue for retry) clears started_at/finished_at so the next + claim treats it as fresh. + """ + conn = get_db() + sets = ["status = ?"] + params: list = [status] + if run_id is not None: + sets.append("run_id = ?") + params.append(run_id) + if error is not None: + sets.append("error = ?") + params.append(error) + if status in ("done", "failed"): + sets.append("finished_at = datetime('now')") + elif status == "queued": + sets.append("started_at = NULL") + sets.append("finished_at = NULL") + params.append(job_id) + conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params) + conn.commit() + conn.close() + + +def count_running_jobs() -> int: + """Number of jobs currently in 'running' status (for max_concurrency).""" + conn = get_db() + n = conn.execute( + "SELECT COUNT(*) FROM jobs WHERE status='running'" + ).fetchone()[0] + conn.close() + return int(n) + + +def requeue_running_jobs() -> int: + """Queue-recovery: on startup, any job left 'running' belongs to a worker that + died on restart -> put it back to 'queued'. attempts are kept as-is (the next + claim does NOT re-increment beyond what is needed; claim_next_job increments on + pickup). Returns the number of requeued jobs. + """ + conn = get_db() + cur = conn.execute( + "UPDATE jobs SET status='queued', started_at = NULL " + "WHERE status='running'" + ) + conn.commit() + n = cur.rowcount + conn.close() + return int(n) + + +def get_job(job_id: int) -> dict | None: + """Fetch a single job by id.""" + conn = get_db() + row = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)).fetchone() + conn.close() + return dict(row) if row else None + + +def job_status_counts() -> dict: + """Return counts grouped by status (for /queue observability).""" + conn = get_db() + rows = conn.execute( + "SELECT status, COUNT(*) AS n FROM jobs GROUP BY status" + ).fetchall() + conn.close() + counts = {"queued": 0, "running": 0, "done": 0, "failed": 0} + for r in rows: + counts[r["status"]] = r["n"] + return counts + + +def recent_jobs(limit: int = 10) -> list[dict]: + """Return the most recent jobs (for /queue observability).""" + conn = get_db() + rows = conn.execute( + "SELECT * FROM jobs ORDER BY id DESC LIMIT ?", (limit,) + ).fetchall() + conn.close() + return [dict(r) for r in rows] From 3345c2fa0aecf6c455ce737e6549a4d777b45673 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Tue, 2 Jun 2026 23:58:44 +0300 Subject: [PATCH 02/13] feat(launcher): launch_job + job-status finalize with retries (ORCH-1) Refactor launch() into shared _spawn(); add launch_job(job) that threads job_id through monitor/watchdog. _finalize_job marks done / requeue (attempts enqueue_job. B-1/B-2/M-1/ORCH-2 spawn logic unchanged. --- src/agents/launcher.py | 120 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 107 insertions(+), 13 deletions(-) diff --git a/src/agents/launcher.py b/src/agents/launcher.py index b129412..dafedac 100644 --- a/src/agents/launcher.py +++ b/src/agents/launcher.py @@ -4,7 +4,7 @@ import logging import threading import signal from ..config import settings -from ..db import get_db, get_task_by_repo_branch, update_task_stage +from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job from ..stages import get_next_stage, get_qg_for_stage, get_agent_for_stage from ..git_worktree import ensure_worktree, get_worktree_path from ..qg.checks import QG_CHECKS @@ -57,7 +57,10 @@ class AgentLauncher: def launch(self, agent: str, repo: str, task_content: str = None, task_id: int = None) -> int: """ - Launch a Claude CLI agent. + Launch a Claude CLI agent directly (legacy synchronous path). + + Kept for backward compatibility (direct callers / existing tests). The + ORCH-1 job queue uses launch_job() instead, but both share _spawn(). Args: agent: Agent role (analyst, architect, developer, reviewer, tester) @@ -68,6 +71,31 @@ class AgentLauncher: Returns: agent_run_id from DB """ + return self._spawn(agent, repo, task_content, task_id, job_id=None) + + def launch_job(self, job: dict) -> int: + """ORCH-1: launch an agent for a claimed queue job. + + Same spawn path as launch(), but threads job['id'] through so the monitor + can update the job's status (done / requeue / failed) and link jobs.run_id + to the agent_runs row. Returns the agent_run_id. + """ + return self._spawn( + job["agent"], + job["repo"], + job.get("task_content"), + job.get("task_id"), + job_id=job["id"], + ) + + def _spawn(self, agent: str, repo: str, task_content: str = None, + task_id: int = None, job_id: int = None) -> int: + """Shared spawn implementation for launch() and launch_job(). + + When job_id is set, the monitor/watchdog drive the jobs table status + (ORCH-1). The claude-CLI Popen logic (B-2) and worktree/task-file logic + (B-1 / ORCH-2) are unchanged. + """ config = self.AGENT_CONFIGS.get(agent) if not config: raise ValueError(f"Unknown agent: {agent}") @@ -98,6 +126,14 @@ class AgentLauncher: run_id = cursor.lastrowid conn.commit() + # ORCH-1: link this job to the agent_runs row and stamp started_at. + if job_id is not None: + conn.execute( + "UPDATE jobs SET run_id = ?, started_at = datetime('now') WHERE id = ?", + (run_id, job_id), + ) + conn.commit() + # Prepare output log path output_path = f"/app/data/runs/{run_id}.log" os.makedirs(os.path.dirname(output_path), exist_ok=True) @@ -154,6 +190,7 @@ class AgentLauncher: t = threading.Thread( target=self._watchdog, args=(proc.pid, run_id), + kwargs={"job_id": job_id}, daemon=True, ) t.start() @@ -163,6 +200,7 @@ class AgentLauncher: m = threading.Thread( target=self._monitor_agent, args=(proc, run_id, agent, repo, agent_branch, output_path, log_fh), + kwargs={"job_id": job_id}, daemon=True, ) m.start() @@ -171,8 +209,13 @@ class AgentLauncher: notify_agent_started(run_id, agent, task_id) return run_id - def _watchdog(self, pid: int, run_id: int, timeout: int = None): - """Kill agent if it exceeds timeout.""" + def _watchdog(self, pid: int, run_id: int, timeout: int = None, job_id: int = None): + """Kill agent if it exceeds timeout. + + ORCH-1: on a timeout-kill the monitor's proc.wait() returns the kill exit + code and drives the job retry/fail logic, so the watchdog itself only needs + to SIGKILL and record the agent_runs exit. job_id is accepted for symmetry. + """ import time if timeout is None: timeout = self.AGENT_TIMEOUT @@ -190,7 +233,7 @@ class AgentLauncher: except ProcessLookupError: pass # Already finished - def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None): + def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None, log_fh=None, job_id=None): """Wait for agent to finish, commit+push results, update DB. B-2 fix: stdout already goes straight to the log file via Popen, so we just @@ -318,6 +361,57 @@ class AgentLauncher: if exit_code == 0: self._try_advance_stage(run_id, agent, repo, branch) + # ORCH-1: drive the job-queue status for queue-launched jobs only. + # (Legacy direct launch() has job_id=None and is unaffected.) + if job_id is not None: + self._finalize_job(job_id, agent, run_id, exit_code) + + def _finalize_job(self, job_id: int, agent: str, run_id: int, exit_code): + """ORCH-1: update the jobs row after the agent process finished. + + exit_code == 0 -> done. + exit_code != 0 -> retry while attempts < max_attempts (back to 'queued'), + otherwise 'failed' + Telegram notification. + attempts were already incremented at claim time, so `attempts` reflects how + many times this job has been picked up. + """ + from ..db import get_job, mark_job + try: + job = get_job(job_id) + if not job: + return + if exit_code == 0: + mark_job(job_id, "done", run_id=run_id) + logger.info(f"Job {job_id} ({agent}) done (run_id={run_id})") + return + + attempts = job.get("attempts", 0) + max_attempts = job.get("max_attempts", 2) + err = f"agent {agent} exit_code={exit_code} (run_id={run_id})" + if attempts < max_attempts: + mark_job(job_id, "queued", run_id=run_id, error=err) + logger.warning( + f"Job {job_id} ({agent}) failed (exit={exit_code}), " + f"requeued (attempt {attempts}/{max_attempts})" + ) + else: + mark_job(job_id, "failed", run_id=run_id, error=err) + logger.error( + f"Job {job_id} ({agent}) failed permanently after " + f"{attempts} attempts (exit={exit_code})" + ) + try: + from ..notifications import send_telegram + send_telegram( + f"\U0001f6a8 Job {job_id} ({agent}, repo {job.get('repo')}) " + f"failed after {attempts} attempts (exit={exit_code}). " + f"Logs: /app/data/runs/{run_id}.log" + ) + except Exception: + pass + except Exception as e: + logger.error(f"Job {job_id}: _finalize_job error: {e}") + def _try_advance_stage(self, run_id: int, agent: str, repo: str, branch: str): """After agent finishes successfully, check QG and advance stage if possible.""" try: @@ -416,8 +510,8 @@ class AgentLauncher: f"(attempt {retry_count+1}/3). Fix findings in " f"docs/work-items/{work_item_id}/12-review.md" ) - new_run = self.launch("developer", repo, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: reviewer REQUEST_CHANGES, relaunched developer (run_id={new_run})") + new_job = enqueue_job("developer", repo, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: reviewer REQUEST_CHANGES, enqueued developer (job_id={new_job})") else: from ..notifications import send_telegram send_telegram(f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. Manual intervention needed.") @@ -446,8 +540,8 @@ class AgentLauncher: f"Stage: development\nNote: Tests FAILED. " f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md" ) - new_run = self.launch("developer", repo, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: tester FAIL, relaunched developer (run_id={new_run})") + new_job = enqueue_job("developer", repo, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: tester FAIL, enqueued developer (job_id={new_job})") else: from ..notifications import send_telegram from ..plane_sync import set_issue_blocked @@ -478,8 +572,8 @@ class AgentLauncher: f"Stage: analysis\nNote: Architect conflict. Revise TRZ. " f"See docs/work-items/{work_item_id}/10-conflict.md" ) - new_run = self.launch("analyst", repo, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: architect conflict, relaunched analyst") + new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: architect conflict, enqueued analyst (job_id={new_job})") return return @@ -496,8 +590,8 @@ class AgentLauncher: next_agent = get_agent_for_stage(next_stage) if next_agent: task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}" - new_run_id = self.launch(next_agent, repo, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: launched '{next_agent}' (run_id={new_run_id})") + new_job_id = enqueue_job(next_agent, repo, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: enqueued '{next_agent}' (job_id={new_job_id})") except Exception as e: logger.error(f"Auto-advance failed for run_id={run_id}: {e}") From 20d6556e22603253272547516608da26494cf2d9 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Tue, 2 Jun 2026 23:58:44 +0300 Subject: [PATCH 03/13] refactor(webhooks): enqueue_job instead of in-process launch (ORCH-1) All 8 webhook launch points (plane x4, gitea x4) now enqueue a job and return immediately instead of synchronously spawning claude in the uvicorn process. --- src/webhooks/gitea.py | 18 +++++++++--------- src/webhooks/plane.py | 17 +++++++++-------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/src/webhooks/gitea.py b/src/webhooks/gitea.py index 33c318c..f6cd58a 100644 --- a/src/webhooks/gitea.py +++ b/src/webhooks/gitea.py @@ -10,7 +10,7 @@ import httpx from fastapi import APIRouter, Request, HTTPException from ..config import settings -from ..db import get_db, get_task_by_repo_branch, update_task_stage +from ..db import get_db, get_task_by_repo_branch, update_task_stage, enqueue_job from ..stages import get_next_stage, get_agent_for_stage from ..qg.checks import check_ci_green, check_review_approved from ..notifications import notify_stage_change, notify_qg_failure, notify_error @@ -123,8 +123,8 @@ async def handle_push(payload: dict): if agent: try: task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}" - run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: push triggered {current_stage} → {next_stage}, launched '{agent}' (run_id={run_id})") + job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: push triggered {current_stage} → {next_stage}, enqueued '{agent}' (job_id={job_id})") except Exception as e: notify_error(task_id, f"Failed to launch agent '{agent}': {e}") @@ -200,8 +200,8 @@ async def handle_ci_status(payload: dict): if agent: try: task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}" - run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: CI green → {next_stage}, launched '{agent}' (run_id={run_id})") + job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: CI green → {next_stage}, enqueued '{agent}' (job_id={job_id})") except Exception as e: notify_error(task_id, f"Failed to launch agent '{agent}': {e}") else: @@ -272,8 +272,8 @@ async def handle_pr(payload: dict): if agent: try: task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\nStage: {next_stage}" - run_id = launcher.launch(agent, repo_name, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: PR approved → {next_stage}, launched '{agent}' (run_id={run_id})") + job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: PR approved → {next_stage}, enqueued '{agent}' (job_id={job_id})") except Exception as e: notify_error(task_id, f"Failed to launch agent '{agent}': {e}") else: @@ -297,8 +297,8 @@ async def handle_pr(payload: dict): f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\n" f"Stage: development\nNote: Changes requested in review (attempt {retry_count + 1}/{MAX_DEV_RETRIES})" ) - run_id = launcher.launch("developer", repo_name, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: changes requested, relaunching developer (attempt {retry_count + 1})") + job_id = enqueue_job("developer", repo_name, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: changes requested, enqueued developer (attempt {retry_count + 1}, job_id={job_id})") except Exception as e: notify_error(task_id, f"Failed to relaunch developer: {e}") else: diff --git a/src/webhooks/plane.py b/src/webhooks/plane.py index 177ce6e..0dd23af 100644 --- a/src/webhooks/plane.py +++ b/src/webhooks/plane.py @@ -14,6 +14,7 @@ from ..db import ( get_task_by_plane_id, get_next_work_item_id, update_task_stage, + enqueue_job, ) from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage from ..qg.checks import QG_CHECKS @@ -186,8 +187,8 @@ async def handle_work_item_created(data: dict, project_id: str = ""): if task_row: task_id = task_row[0] task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: analysis\nTitle: {name}" - run_id = launcher.launch("analyst", repo, task_desc, task_id=task_id) - logger.info(f"Task {task_id}: launched analyst (run_id={run_id})") + job_id = enqueue_job("analyst", repo, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})") # Post start comment to Plane from ..plane_sync import add_comment as _add_comment _add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).") @@ -231,10 +232,10 @@ async def handle_comment(data: dict, project_id: str = ""): f"Stage: analysis\nNote: Stakeholder REJECTED your artifacts. " f"Reason: {reason}\nRevise and improve." ) - new_run = launcher.launch("analyst", repo, task_desc, task_id=task_id) + new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id) from ..plane_sync import add_comment as _plane_comment _plane_comment(work_item_id, f"\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d. \u041f\u0440\u0438\u0447\u0438\u043d\u0430 \u043e\u0442\u043a\u043b\u043e\u043d\u0435\u043d\u0438\u044f: {reason}") - logger.info(f"Task {task_id}: rejected at analysis, relaunched analyst") + logger.info(f"Task {task_id}: rejected at analysis, enqueued analyst (job_id={new_job})") else: # Rollback to previous stage prev_stage = get_previous_stage(current_stage) @@ -305,10 +306,10 @@ async def handle_comment(data: dict, project_id: str = ""): f"Read the latest comment in Plane and revise your artifacts.\n" f"Answer: {comment_body[:500]}" ) - new_run = launcher.launch("analyst", repo, task_desc, task_id=task_id) + new_job = enqueue_job("analyst", repo, task_desc, task_id=task_id) from ..plane_sync import add_comment as _pc2 _pc2(work_item_id, "\U0001f504 Analyst \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0441 \u043e\u0442\u0432\u0435\u0442\u0430\u043c\u0438 \u0441\u0442\u0435\u0439\u043a\u0445\u043e\u043b\u0434\u0435\u0440\u0430.") - logger.info(f"Task {task_id}: stakeholder answered questions, relaunched analyst (run_id={new_run})") + logger.info(f"Task {task_id}: stakeholder answered questions, enqueued analyst (job_id={new_job})") return except Exception as e: logger.error(f"Failed to check issue state: {e}") @@ -386,9 +387,9 @@ async def _try_advance_stage( if agent: try: task_desc = f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\nStage: {next_stage}" - run_id = launcher.launch(agent, repo, task_desc, task_id=task_id) + job_id = enqueue_job(agent, repo, task_desc, task_id=task_id) plane_notify_stage(work_item_id, current_stage, next_stage, agent) - logger.info(f"Task {task_id}: launched agent '{agent}', run_id={run_id}") + logger.info(f"Task {task_id}: enqueued agent '{agent}', job_id={job_id}") except Exception as e: notify_error(task_id, f"Failed to launch agent '{agent}': {e}") logger.error(f"Agent launch failed: {e}") From b6d4426a4846c465c9027ff1f1e353746f393f10 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Tue, 2 Jun 2026 23:58:44 +0300 Subject: [PATCH 04/13] feat(worker): background queue worker + lifespan + queue-recovery + /queue (ORCH-1) queue_worker.QueueWorker drains the queue respecting max_concurrency. main.py lifespan: queue-recovery (requeue running jobs) after M-1 orphan-recovery, starts worker and stops it on shutdown. New GET /queue endpoint (counts + recent jobs). --- src/main.py | 33 +++++++++++++- src/queue_worker.py | 107 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 139 insertions(+), 1 deletion(-) create mode 100644 src/queue_worker.py diff --git a/src/main.py b/src/main.py index f1952e8..62e5ef9 100644 --- a/src/main.py +++ b/src/main.py @@ -51,7 +51,25 @@ async def lifespan(app: FastAPI): except Exception: pass log.warning(f"Recovered {len(orphan_rows)} orphaned agent runs") - yield + + # ORCH-1 (F-2b): queue-recovery. Any job left in 'running' status belongs to a + # worker that died on the previous restart -> put it back to 'queued' so the + # worker re-picks it up (restart-safe, no lost work). Runs AFTER M-1. + from .db import requeue_running_jobs + requeued = requeue_running_jobs() + if requeued: + log.warning(f"Queue-recovery: requeued {requeued} running job(s) after restart") + + # Start the background job-queue worker (ORCH-1). + from .queue_worker import worker + worker.start() + + try: + yield + finally: + # Graceful shutdown of the worker (running agents keep going; their jobs + # are requeued on next start via queue-recovery if the process dies). + worker.stop() app = FastAPI(title="Multi-Agent Orchestrator", lifespan=lifespan) @@ -73,3 +91,16 @@ async def status(): ).fetchall() conn.close() return {"active_tasks": [dict(t) for t in tasks]} + + +@app.get("/queue") +async def queue(): + """ORCH-1: job-queue observability — status counts + recent jobs.""" + from .db import job_status_counts, recent_jobs + from .queue_worker import worker + return { + "counts": job_status_counts(), + "max_concurrency": worker.max_concurrency, + "poll_interval": worker.poll_interval, + "recent": recent_jobs(10), + } diff --git a/src/queue_worker.py b/src/queue_worker.py new file mode 100644 index 0000000..20106e9 --- /dev/null +++ b/src/queue_worker.py @@ -0,0 +1,107 @@ +"""ORCH-1 (F-2b): background job-queue worker. + +A single background thread polls the `jobs` table and spawns agents: + + while running: + while count_running_jobs() < max_concurrency: + job = claim_next_job() # atomic queued -> running + if not job: break + launcher.launch_job(job) # spawns claude (Popen) + monitor thread + sleep(poll_interval) + +Design notes +------------ +- We use a plain daemon thread + threading.Event for shutdown rather than an + asyncio task: the launcher already manages its own monitor/watchdog threads and + blocking Popen, so a thread loop is the simplest, most robust fit. +- `launch_job()` is non-blocking: it spawns the process and returns immediately; + the monitor thread updates `jobs.status` to done/queued/failed on completion. + That status change frees a `count_running_jobs()` slot for the next claim. +- Restart-safe: queue-recovery (requeue_running_jobs) runs in main.py lifespan + BEFORE the worker starts, so jobs left 'running' by a dead worker get retried. +""" +import logging +import threading + +from .config import settings +from .db import claim_next_job, count_running_jobs +from .agents.launcher import launcher + +logger = logging.getLogger("orchestrator.queue_worker") + + +class QueueWorker: + """Background worker that drains the persistent job queue.""" + + def __init__(self, max_concurrency: int = None, poll_interval: float = None): + self.max_concurrency = ( + max_concurrency if max_concurrency is not None else settings.max_concurrency + ) + self.poll_interval = ( + poll_interval if poll_interval is not None else settings.queue_poll_interval + ) + self._stop = threading.Event() + self._thread: threading.Thread | None = None + + def _drain_once(self): + """Claim and launch jobs until concurrency is full or the queue is empty.""" + while not self._stop.is_set(): + if count_running_jobs() >= self.max_concurrency: + return + job = claim_next_job() + if not job: + return + try: + run_id = launcher.launch_job(job) + logger.info( + f"Worker launched job {job['id']} ({job['agent']}, " + f"repo {job['repo']}) -> run_id={run_id}" + ) + except Exception as e: + # Launch itself failed (e.g. repo missing): mark the job failed so it + # does not wedge as 'running' forever and block the slot. + logger.error(f"Worker failed to launch job {job['id']}: {e}") + try: + from .db import get_job, mark_job + + j = get_job(job["id"]) + attempts = j.get("attempts", 0) if j else 0 + max_attempts = j.get("max_attempts", 2) if j else 2 + if attempts < max_attempts: + mark_job(job["id"], "queued", error=f"launch error: {e}") + else: + mark_job(job["id"], "failed", error=f"launch error: {e}") + except Exception: + pass + + def _run(self): + logger.info( + f"Queue worker started (max_concurrency={self.max_concurrency}, " + f"poll_interval={self.poll_interval}s)" + ) + while not self._stop.is_set(): + try: + self._drain_once() + except Exception as e: + logger.error(f"Queue worker loop error: {e}") + # Sleep is interruptible by stop event for fast shutdown. + self._stop.wait(self.poll_interval) + logger.info("Queue worker stopped") + + def start(self): + if self._thread and self._thread.is_alive(): + return + self._stop.clear() + self._thread = threading.Thread( + target=self._run, name="queue-worker", daemon=True + ) + self._thread.start() + + def stop(self, timeout: float = 5.0): + self._stop.set() + if self._thread: + self._thread.join(timeout=timeout) + + +# Module-level singleton used by the FastAPI lifespan. +worker = QueueWorker() From 2283b8898b5e652ea2f4aa7d469e0b70d3eb1303 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Tue, 2 Jun 2026 23:58:44 +0300 Subject: [PATCH 05/13] test(queue): 19 tests for job queue lifecycle/atomicity/retry/worker (ORCH-1) Covers enqueue->claim->mark, atomic claim (no double dispatch, 8-thread race), retry fail->queued->failed, requeue_running_jobs, observability, worker max_concurrency. Popen fully mocked (no real agent spawned). --- tests/test_queue.py | 298 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 298 insertions(+) create mode 100644 tests/test_queue.py diff --git a/tests/test_queue.py b/tests/test_queue.py new file mode 100644 index 0000000..d3f1536 --- /dev/null +++ b/tests/test_queue.py @@ -0,0 +1,298 @@ +"""Tests for ORCH-1 (F-2b) persistent job queue. + +Covers: + - enqueue_job -> claim_next_job -> mark_job lifecycle + - claim_next_job atomicity (no double-dispatch of the same job) + - retry: fail -> requeue while attempts < max_attempts, then failed + - requeue_running_jobs (queue-recovery) + - count_running_jobs / job_status_counts / recent_jobs + - QueueWorker respects max_concurrency (Popen / launch fully mocked) + +The real claude/Popen is NEVER spawned: launcher.launch_job is mocked in worker +tests, and the launcher finalize logic is exercised directly via mark_job. +""" +import os +import tempfile + +import pytest + +# Override env before importing app modules (same convention as test_qg.py). +_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_queue.db") +os.environ["ORCH_DB_PATH"] = _test_db +os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir() +os.environ["ORCH_GITEA_TOKEN"] = "test-token" +os.environ["ORCH_PLANE_API_TOKEN"] = "test-token" + +import src.db as db +from src.db import ( + init_db, + enqueue_job, + claim_next_job, + mark_job, + count_running_jobs, + requeue_running_jobs, + get_job, + job_status_counts, + recent_jobs, +) + + +@pytest.fixture(autouse=True) +def fresh_db(tmp_path, monkeypatch): + """Point the DB at a fresh per-test sqlite file and init the schema.""" + dbfile = tmp_path / "queue.db" + monkeypatch.setattr(db.settings, "db_path", str(dbfile)) + init_db() + yield + + +# --------------------------------------------------------------------------- +# enqueue / claim / mark lifecycle +# --------------------------------------------------------------------------- +class TestLifecycle: + def test_enqueue_creates_queued_job(self): + jid = enqueue_job("analyst", "enduro-trails", "task body", task_id=7) + job = get_job(jid) + assert job["status"] == "queued" + assert job["agent"] == "analyst" + assert job["repo"] == "enduro-trails" + assert job["task_content"] == "task body" + assert job["task_id"] == 7 + assert job["attempts"] == 0 + assert job["max_attempts"] == 2 + + def test_claim_marks_running_and_increments_attempts(self): + jid = enqueue_job("developer", "repo") + claimed = claim_next_job() + assert claimed is not None + assert claimed["id"] == jid + assert claimed["status"] == "running" + assert claimed["attempts"] == 1 + assert count_running_jobs() == 1 + + def test_claim_empty_queue_returns_none(self): + assert claim_next_job() is None + + def test_claim_is_fifo(self): + a = enqueue_job("analyst", "r") + b = enqueue_job("developer", "r") + assert claim_next_job()["id"] == a + assert claim_next_job()["id"] == b + + def test_mark_done(self): + jid = enqueue_job("tester", "r") + claim_next_job() + mark_job(jid, "done", run_id=42) + job = get_job(jid) + assert job["status"] == "done" + assert job["run_id"] == 42 + assert job["finished_at"] is not None + assert count_running_jobs() == 0 + + def test_mark_failed_records_error(self): + jid = enqueue_job("tester", "r") + claim_next_job() + mark_job(jid, "failed", run_id=9, error="boom") + job = get_job(jid) + assert job["status"] == "failed" + assert job["error"] == "boom" + assert job["finished_at"] is not None + + +# --------------------------------------------------------------------------- +# claim atomicity — no double dispatch +# --------------------------------------------------------------------------- +class TestClaimAtomicity: + def test_single_job_claimed_once(self): + jid = enqueue_job("analyst", "r") + first = claim_next_job() + second = claim_next_job() + assert first["id"] == jid + assert second is None # already running, not re-dispatched + + def test_concurrent_claims_no_duplicate(self): + """Many enqueued jobs claimed from parallel threads -> each claimed once.""" + import threading + + n = 20 + for _ in range(n): + enqueue_job("developer", "r") + + claimed_ids = [] + lock = threading.Lock() + + def grab(): + while True: + job = claim_next_job() + if job is None: + return + with lock: + claimed_ids.append(job["id"]) + + threads = [threading.Thread(target=grab) for _ in range(8)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert len(claimed_ids) == n + assert len(set(claimed_ids)) == n # no id claimed twice + assert count_running_jobs() == n + + +# --------------------------------------------------------------------------- +# retry semantics (mirrors launcher._finalize_job logic) +# --------------------------------------------------------------------------- +class TestRetry: + def test_fail_requeues_while_under_max(self): + jid = enqueue_job("developer", "r", max_attempts=2) + job = claim_next_job() # attempts=1 + assert job["attempts"] == 1 + # attempts(1) < max(2) -> requeue + mark_job(jid, "queued", error="exit 1") + j = get_job(jid) + assert j["status"] == "queued" + assert j["error"] == "exit 1" + assert j["started_at"] is None # requeue clears started_at + + def test_fail_fails_when_max_reached(self): + jid = enqueue_job("developer", "r", max_attempts=2) + claim_next_job() # attempts=1 -> requeue + mark_job(jid, "queued") + job2 = claim_next_job() # attempts=2 + assert job2["attempts"] == 2 + # attempts(2) >= max(2) -> failed + mark_job(jid, "failed", error="exit 1") + assert get_job(jid)["status"] == "failed" + + def test_finalize_job_done(self): + """launcher._finalize_job marks done on exit_code 0 (no Popen needed).""" + from src.agents.launcher import AgentLauncher + jid = enqueue_job("analyst", "r") + claim_next_job() + AgentLauncher()._finalize_job(jid, "analyst", run_id=5, exit_code=0) + assert get_job(jid)["status"] == "done" + + def test_finalize_job_requeue_then_fail(self, monkeypatch): + from src.agents.launcher import AgentLauncher + # Silence telegram side-effect. + monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None) + lr = AgentLauncher() + jid = enqueue_job("developer", "r", max_attempts=2) + + claim_next_job() # attempts=1 + lr._finalize_job(jid, "developer", run_id=1, exit_code=2) + assert get_job(jid)["status"] == "queued" # 1 < 2 -> requeue + + claim_next_job() # attempts=2 + lr._finalize_job(jid, "developer", run_id=2, exit_code=2) + assert get_job(jid)["status"] == "failed" # 2 >= 2 -> failed + + +# --------------------------------------------------------------------------- +# queue-recovery +# --------------------------------------------------------------------------- +class TestRequeueRunning: + def test_requeue_running_jobs(self): + a = enqueue_job("analyst", "r") + b = enqueue_job("developer", "r") + claim_next_job() # a -> running + claim_next_job() # b -> running + assert count_running_jobs() == 2 + n = requeue_running_jobs() + assert n == 2 + assert count_running_jobs() == 0 + assert get_job(a)["status"] == "queued" + assert get_job(b)["status"] == "queued" + + def test_requeue_preserves_attempts(self): + jid = enqueue_job("analyst", "r") + claim_next_job() # attempts=1 + requeue_running_jobs() + assert get_job(jid)["attempts"] == 1 # not reset + + +# --------------------------------------------------------------------------- +# observability helpers +# --------------------------------------------------------------------------- +class TestObservability: + def test_status_counts(self): + enqueue_job("analyst", "r") # stays queued + enqueue_job("developer", "r") # first claimed -> running (FIFO) + claim_next_job() + counts = job_status_counts() + assert counts["running"] == 1 + assert counts["queued"] == 1 + assert counts["done"] == 0 + assert counts["failed"] == 0 + + def test_recent_jobs_desc(self): + ids = [enqueue_job("analyst", "r") for _ in range(3)] + recent = recent_jobs(10) + assert [r["id"] for r in recent] == sorted(ids, reverse=True) + + +# --------------------------------------------------------------------------- +# QueueWorker max_concurrency (launch_job fully mocked — no real Popen) +# --------------------------------------------------------------------------- +class TestWorkerConcurrency: + def test_worker_respects_max_concurrency(self, monkeypatch): + from src.queue_worker import QueueWorker + + launched = [] + + def fake_launch_job(job): + # Simulate a long-running agent: the job stays 'running' (we do NOT + # mark it done), so the slot remains occupied. + launched.append(job["id"]) + return 100 + job["id"] + + monkeypatch.setattr("src.queue_worker.launcher.launch_job", fake_launch_job) + + for _ in range(5): + enqueue_job("developer", "r") + + w = QueueWorker(max_concurrency=2, poll_interval=0.01) + w._drain_once() + + # Only max_concurrency jobs may be launched / running at once. + assert len(launched) == 2 + assert count_running_jobs() == 2 + + def test_worker_drains_as_slots_free(self, monkeypatch): + from src.queue_worker import QueueWorker + + def fake_launch_job(job): + # Immediately complete the job so the slot frees for the next claim. + mark_job(job["id"], "done", run_id=job["id"]) + return job["id"] + + monkeypatch.setattr("src.queue_worker.launcher.launch_job", fake_launch_job) + + for _ in range(4): + enqueue_job("analyst", "r") + + w = QueueWorker(max_concurrency=1, poll_interval=0.01) + w._drain_once() + + # With instant completion and concurrency 1, one drain pass empties the queue. + assert job_status_counts()["done"] == 4 + assert count_running_jobs() == 0 + + def test_worker_launch_failure_does_not_wedge_slot(self, monkeypatch): + from src.queue_worker import QueueWorker + + def boom(job): + raise RuntimeError("repo missing") + + monkeypatch.setattr("src.queue_worker.launcher.launch_job", boom) + monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None) + + enqueue_job("developer", "r", max_attempts=1) + w = QueueWorker(max_concurrency=1, poll_interval=0.01) + w._drain_once() + + # attempts=1 >= max_attempts=1 -> failed, not stuck running. + assert count_running_jobs() == 0 + counts = job_status_counts() + assert counts["failed"] == 1 From 4be168c0ec45985c0aee63669719132c1d127dc2 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Tue, 2 Jun 2026 23:58:44 +0300 Subject: [PATCH 06/13] docs(queue): document job queue, /queue, env vars (ORCH-1) ARCHITECTURE job-queue section + flow diagram, README /queue endpoint and ORCH_MAX_CONCURRENCY/ORCH_QUEUE_POLL_INTERVAL, new docs/ORCH-1_JOB_QUEUE.md. --- README.md | 24 +++++++++++- docs/ARCHITECTURE.md | 68 ++++++++++++++++++++++++++++++-- docs/ORCH-1_JOB_QUEUE.md | 83 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 171 insertions(+), 4 deletions(-) create mode 100644 docs/ORCH-1_JOB_QUEUE.md diff --git a/README.md b/README.md index c516245..9b47539 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,7 @@ created → analysis → architecture → development → review → testing → |--------|------|----------| | GET | `/health` | Health check | | GET | `/status` | Активные задачи (stage != done) | +| GET | `/queue` | Очередь задач (ORCH-1): counts по статусам + max_concurrency + последние 10 jobs | | POST | `/webhook/plane` | Plane webhook receiver | | POST | `/webhook/gitea` | Gitea webhook receiver | @@ -52,8 +53,9 @@ src/ ├── stages.py # State machine (transitions, agents, QG) ├── notifications.py # Уведомления (логирование) ├── plane_sync.py # Синхронизация статусов с Plane API +├── queue_worker.py # ORCH-1: фоновый воркер очереди (claim → launch_job) ├── agents/ -│ └── launcher.py # AgentLauncher: launch, monitor, watchdog, auto-advance +│ └── launcher.py # AgentLauncher: launch/launch_job, monitor, watchdog, auto-advance ├── webhooks/ │ ├── plane.py # Plane webhook handler │ └── gitea.py # Gitea webhook handler (push, PR, CI status) @@ -107,6 +109,26 @@ uvicorn src.main:app --reload --port 8500 | `ORCH_REPOS_DIR` | Repos dir (container) | `/repos` | | `ORCH_HOST_REPOS_DIR` | Repos dir (host) | `/home/slin/repos` | | `ORCH_DB_PATH` | SQLite path | `/app/data/orchestrator.db` | +| `ORCH_MAX_CONCURRENCY` | Сколько jobs воркер запускает параллельно (ORCH-1) | `1` | +| `ORCH_QUEUE_POLL_INTERVAL` | Период опроса очереди воркером, сек (ORCH-1) | `2.0` | + +## Очередь задач (ORCH-1 / F-2b) + +Webhook-хэндлеры больше не спавнят claude-агентов синхронно в процессе uvicorn. +Вместо этого они кладут **job** в персистентную SQLite-таблицу `jobs` +(`enqueue_job`, мгновенный ответ), а фоновый воркер (`src/queue_worker.py`) +забирает jobs с учётом `ORCH_MAX_CONCURRENCY` и запускает агента (`launch_job`, +та же Popen-логика, что и раньше). + +Преимущества: +- **Рестарт-safe.** При старте jobs со статусом `running` возвращаются в `queued` + (queue-recovery в lifespan) — работа не теряется. +- **Лимит параллелизма.** Воркер не превышает `ORCH_MAX_CONCURRENCY`. +- **Ретраи.** Упавший job (exit≠0) ретраится пока `attempts < max_attempts`, + потом `failed` + Telegram-нотификация. + +Статусы job: `queued → running → done | failed`. Наблюдаемость — через `GET /queue`. +Подробности: `docs/ORCH-1_JOB_QUEUE.md`. ## Multi-repo: реестр проектов (ORCH-6) diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 73593b9..2857ee2 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -264,9 +264,71 @@ services: - ~~Shared `/repos` checkout (гонки при параллельных задачах).~~ **РЕШЕНО (ORCH-2 / S-4):** git worktree per task/branch — см. раздел «Изоляция через git worktree» ниже. -- **In-process daemon-потоки.** Агенты запускаются в daemon-потоках uvicorn. При - рестарте uvicorn запущенные агенты осиротевают → ловит orphan-recovery (M-1). - Целевая архитектура — очередь задач (F-2b, отдельно). +- ~~In-process daemon-потоки (рестарт → сироты, потеря работы).~~ **РЕШЕНО (ORCH-1 / F-2b):** + персистентная очередь jobs + фоновый воркер — см. раздел «Очередь задач (ORCH-1)» ниже. + Daemon-потоки monitor/watchdog остаются для одного запущенного агента, но при + рестарте его job возвращается в `queued` (queue-recovery) и переподхватывается. + +## Очередь задач (ORCH-1 / F-2b) + +Раньше webhook-хэндлер **синхронно** спавнил `subprocess.Popen` + 2 daemon-thread +прямо в процессе uvicorn (8 точек вызова). Рестарт = сироты + потеря работы, +нет лимита параллелизма, нет ретраев. + +### Flow + +``` +webhook (plane/gitea) background thread (queue_worker) + │ │ + enqueue_job() ---> [ jobs table ] <--- claim_next_job() (atomic queued->running) + (мгновенный status=queued │ + ответ 200) launch_job(job) + │ + AgentLauncher._spawn (Popen claude) + │ + _monitor_agent (proc.wait, commit/push, + │ advance stage) + │ + _finalize_job: + exit 0 -> mark_job done + exit !=0 & attempts requeue (queued) + exit !=0 & attempts>=max -> failed + Telegram +``` + +### Таблица `jobs` + +| Колонка | Назначение | +|--------|------------| +| `status` | `queued` → `running` → `done` \| `failed` | +| `attempts` / `max_attempts` | счётчик попыток (инкремент при claim) / лимит ретраев (default 2) | +| `run_id` | FK на `agent_runs.id` после старта | +| `task_content` | ТЗ, которое пишется в task-файл агента | +| `error` | последняя ошибка | + +`idx_jobs_status (status, id)` — быстрый FIFO-выбор queued. + +### Атомарный claim + +`claim_next_job()` делает `SELECT queued ORDER BY id LIMIT 1` → `UPDATE ... WHERE id=? AND +status='queued'` и проверяет `rowcount`. При гонке двух тиков лишь один UPDATE +переведёт строку в `running` (rowcount==1); проигравший берёт следующий job. + +### Queue-recovery (рестарт-safe) + +В `main.py` lifespan **после** M-1 orphan-recovery вызывается `requeue_running_jobs()`: +jobs со статусом `running` (воркер умёр на рестарте) → возвращаются в `queued`. +Потом стартует воркер; на shutdown — `worker.stop()` (Event.set + join). + +### Конфиг + +- `ORCH_MAX_CONCURRENCY` (default 1) — лимит параллельных jobs. +- `ORCH_QUEUE_POLL_INTERVAL` (default 2.0) — период опроса. + +Наблюдаемость: `GET /queue` — counts по статусам + последние 10 jobs. + +> Совместимость: `launcher.launch()` (прямой синхронный запуск, `job_id=None`) +> сохранён для обратной совместимости. Очередь использует `launch_job()`; +> оба разделяют `_spawn()` (Popen-логика B-2 не изменена). - **Gitea CI не настроен.** QG развития теперь локальный (`check_tests_local`); Gitea CI-статусы не являются authoritative и не блокируют pipeline. - **Docker внутри контейнера orchestrator НЕДОСТУПЕН.** Деплой идёт только через diff --git a/docs/ORCH-1_JOB_QUEUE.md b/docs/ORCH-1_JOB_QUEUE.md new file mode 100644 index 0000000..eeed34c --- /dev/null +++ b/docs/ORCH-1_JOB_QUEUE.md @@ -0,0 +1,83 @@ +# ORCH-1 (F-2b): Persistent Job Queue + +**Дата:** 2026-06-02 +**Ветка:** `feature/ORCH-1-job-queue` +**Источник:** AUDIT_2026-06-02 (B-2 / F-2b) + +## Проблема + +Агенты запускались **in-process**: `launcher.launch()` синхронно спавнил +`subprocess.Popen` + 2 daemon-thread (`_watchdog`, `_monitor_agent`) прямо в +процессе uvicorn, из **8 webhook-точек**. Последствия: + +- **Рестарт = катастрофа.** daemon-threads умирают, claude-процессы → сироты, + работа теряется (M-1 лишь помечал `exit=-1` и звал человека). +- **Нет лимита параллелизма** — N webhook'ов = N одновременных claude. +- **Нет ретраев** — упавший агент просто мёртв. + +## Решение + +Персистентная очередь задач (SQLite-таблица `jobs`) + фоновый воркер: + +1. Webhook-хэндлер кладёт job (`enqueue_job`) → мгновенный ответ 200. +2. Фоновый воркер (`src/queue_worker.py`, отдельный daemon-thread) забирает + jobs с учётом `max_concurrency` (`claim_next_job`, атомарно) и спавнит агента + (`launcher.launch_job`, та же Popen-логика). +3. По завершении `_monitor_agent` → `_finalize_job`: + - `exit 0` → `done`; + - `exit != 0` & `attempts < max_attempts` → requeue (`queued`); + - `exit != 0` & `attempts >= max_attempts` → `failed` + Telegram. + +## Что изменено + +| Файл | Изменение | +|------|-----------| +| `src/db.py` | Таблица `jobs` + индекс; хелперы `enqueue_job`, `claim_next_job` (атомарный), `mark_job`, `count_running_jobs`, `requeue_running_jobs`, `get_job`, `job_status_counts`, `recent_jobs` | +| `src/config.py` | `max_concurrency` (env `ORCH_MAX_CONCURRENCY`, default 1), `queue_poll_interval` (env `ORCH_QUEUE_POLL_INTERVAL`, default 2.0) | +| `src/agents/launcher.py` | `launch()` → тонкая обёртка над `_spawn()`; новый `launch_job(job)`; `_spawn()` (общий, `job_id` опционально); monitor/watchdog принимают `job_id`; новый `_finalize_job()` (статусы + ретраи). 4 внутренних advance-вызова `self.launch` → `enqueue_job` | +| `src/webhooks/plane.py` | 4 точки `launcher.launch` → `enqueue_job` | +| `src/webhooks/gitea.py` | 4 точки `launcher.launch` → `enqueue_job` | +| `src/queue_worker.py` | **НОВЫЙ** — `QueueWorker` (drain loop + max_concurrency + graceful stop) | +| `src/main.py` | lifespan: queue-recovery (`requeue_running_jobs`) после M-1, старт/останов воркера; новый `GET /queue` | +| `tests/test_queue.py` | **НОВЫЙ** — 19 тестов (lifecycle, атомарность claim, ретраи, requeue, observability, worker max_concurrency; Popen полностью замокан) | + +## Атомарность claim + +```sql +SELECT id FROM jobs WHERE status='queued' ORDER BY id LIMIT 1; +UPDATE jobs SET status='running', attempts=attempts+1, started_at=datetime('now') + WHERE id=? AND status='queued'; -- rowcount==1 => claimed, ==0 => проиграл гонку +``` + +Гарантия: один job не выдаётся дважды даже при параллельных тиках воркера +(проверено `test_concurrent_claims_no_duplicate` — 8 потоков, 20 jobs). + +## Сохранённые фиксы (НЕ сломаны) + +- **B-1** task-file write (direct `open()` в worktree) — без изменений. +- **B-2** Popen → log_fh (no PIPE), monitor reap — без изменений, только обёрнут. +- **M-1** orphan-recovery в `main.py` — оставлен, queue-recovery добавлен ПОСЛЕ него. +- **ORCH-2** worktree per task — без изменений. +- **ORCH-6** project registry/filter — без изменений. + +## Acceptance + +| # | Проверка | Статус | +|---|----------|--------| +| 1 | webhook кладёт job (queued) | ✅ enqueue_job | +| 2 | воркер исполняет queued→running→done | ✅ worker + _finalize_job | +| 3 | running ≤ max_concurrency | ✅ test_worker_respects_max_concurrency | +| 4 | ретрай fail→queued→failed+notify | ✅ test_finalize_job_requeue_then_fail | +| 5 | рестарт-safe (running→requeue) | ✅ requeue_running_jobs + lifespan | +| 6 | M-1 не сломан | ✅ оставлен в lifespan | +| 7 | тесты (new green, 9 pre-existing) | ✅ 76 passed / 9 pre-existing | +| 8 | `/queue` | ✅ counts + recent | + +## Тесты + +```bash +IMG=$(docker inspect orchestrator --format '{{.Config.Image}}') +docker run --rm -v /home/slin/repos/orchestrator:/code -w /code \ + --entrypoint python3 $IMG -m pytest tests/ -q +# 76 passed, 9 failed (pre-existing test_webhooks 401/signature/TypeError) +``` From 0cd9b11fe061eaa39e4e7ab1513eac11e325fdf9 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Wed, 3 Jun 2026 00:12:17 +0300 Subject: [PATCH 07/13] feat(queue): resilience schema + backoff helper + config (ORCH-1) jobs.transient_attempts + available_at columns (idempotent _ensure_column migration); claim_next_job honours available_at; mark_job_transient (backoff requeue with separate transient budget). Config: preflight_cache_ttl, backoff_base/max_seconds, transient_max_attempts, breaker_threshold, breaker_pause_seconds. --- src/config.py | 17 ++++++++++ src/db.py | 91 ++++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 107 insertions(+), 1 deletion(-) diff --git a/src/config.py b/src/config.py index 1a3ca75..e3d3003 100644 --- a/src/config.py +++ b/src/config.py @@ -36,6 +36,23 @@ class Settings(BaseSettings): max_concurrency: int = 1 queue_poll_interval: float = 2.0 + # ORCH-1b (resilience): preflight + 429/rate-limit + backoff + circuit breaker. + # preflight_cache_ttl -> cache the cheap CLI/network preflight result (seconds); + # the worker does NOT re-run `claude --version` more often + # than this (env ORCH_PREFLIGHT_CACHE_TTL). + # backoff_base_seconds -> base for exponential transient backoff. + # backoff_max_seconds -> ceiling for the transient backoff. + # transient_max_attempts -> retry budget for transient (429/overload/network) + # failures, separate from code-fault `attempts`. + # breaker_threshold -> consecutive transient failures that OPEN the breaker. + # breaker_pause_seconds -> how long the breaker stays open before half-open. + preflight_cache_ttl: int = 45 + backoff_base_seconds: int = 10 + backoff_max_seconds: int = 600 + transient_max_attempts: int = 5 + breaker_threshold: int = 3 + breaker_pause_seconds: int = 300 + # Telegram notifications telegram_bot_token: str = "" diff --git a/src/db.py b/src/db.py index dd6e97f..36590a4 100644 --- a/src/db.py +++ b/src/db.py @@ -55,15 +55,29 @@ def init_db(): max_attempts INTEGER NOT NULL DEFAULT 2, run_id INTEGER, -- agent_runs.id once started error TEXT, -- last error message + transient_attempts INTEGER NOT NULL DEFAULT 0, -- ORCH-1 resilience: 429/transient retries + available_at TEXT, -- ORCH-1 resilience: backoff gate (claim when <= now) created_at TEXT DEFAULT (datetime('now')), started_at TEXT, finished_at TEXT ); CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, id); """) + # Lightweight migration: add resilience columns to a pre-existing jobs table + # (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table). + _ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0") + _ensure_column(conn, "jobs", "available_at", "TEXT") conn.close() +def _ensure_column(conn, table: str, column: str, decl: str): + """Add a column to `table` if it does not already exist (idempotent migration).""" + cols = [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()] + if column not in cols: + conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}") + conn.commit() + + def get_task_by_plane_id(plane_id: str) -> dict | None: """Find task by Plane work item ID (checks plane_id and plane_issue_id).""" conn = get_db() @@ -170,7 +184,9 @@ def claim_next_job() -> dict | None: try: while True: row = conn.execute( - "SELECT id FROM jobs WHERE status='queued' ORDER BY id LIMIT 1" + "SELECT id FROM jobs WHERE status='queued' " + "AND (available_at IS NULL OR available_at <= datetime('now')) " + "ORDER BY id LIMIT 1" ).fetchone() if not row: return None @@ -192,6 +208,32 @@ def claim_next_job() -> dict | None: conn.close() +def mark_job_transient(job_id: int, available_at_sql_offset_seconds: int, + error: str | None = None) -> None: + """ORCH-1 resilience: requeue a job after a *transient* failure (429/overload/net). + + Increments `transient_attempts` (separate from the code-fault `attempts`), + sets status back to 'queued', and gates re-pickup via `available_at` = + now + backoff seconds. started_at/finished_at are cleared. + """ + conn = get_db() + sets = [ + "status='queued'", + "transient_attempts = transient_attempts + 1", + "available_at = datetime('now', ?)", + "started_at = NULL", + "finished_at = NULL", + ] + params: list = [f"+{int(available_at_sql_offset_seconds)} seconds"] + if error is not None: + sets.append("error = ?") + params.append(error) + params.append(job_id) + conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params) + conn.commit() + conn.close() + + def mark_job( job_id: int, status: str, @@ -282,3 +324,50 @@ def recent_jobs(limit: int = 10) -> list[dict]: ).fetchall() conn.close() return [dict(r) for r in rows] + + +# --------------------------------------------------------------------------- +# ORCH-1b (resilience): transient backoff helpers +# --------------------------------------------------------------------------- + +def requeue_job_transient(job_id: int, delay_seconds: float, error: str | None = None): + """ORCH-1b: requeue a job after a TRANSIENT (429/overload/network) failure. + + Unlike a code-fault requeue, this: + - increments `transient_attempts` (a separate budget from code-fault attempts) + - sets `available_at = now + delay_seconds` so claim_next_job won't pick it + up until the backoff window elapses + - sets status back to 'queued' and clears started_at/finished_at + + delay_seconds is computed by the caller (exp backoff, capped, Retry-After). + """ + conn = get_db() + conn.execute( + "UPDATE jobs SET status='queued', " + "transient_attempts = transient_attempts + 1, " + "available_at = datetime('now', ? || ' seconds'), " + "started_at = NULL, finished_at = NULL, " + "error = COALESCE(?, error) " + "WHERE id = ?", + (f"+{int(round(delay_seconds))}", error, job_id), + ) + conn.commit() + conn.close() + + +def compute_backoff(transient_attempts: int, retry_after: float | None = None) -> float: + """ORCH-1b: exponential backoff (seconds) for a transient failure. + + delay = min(2**transient_attempts * base, max). If the server sent a + Retry-After hint we honour it as a floor (use the larger of the two so we + never poll sooner than the server asked). + + `transient_attempts` is the count AFTER this failure (i.e. how many transient + failures have occurred), so the first backoff uses 2**1. + """ + base = getattr(settings, "backoff_base_seconds", 10) + cap = getattr(settings, "backoff_max_seconds", 600) + exp = min((2 ** max(transient_attempts, 0)) * base, cap) + if retry_after is not None and retry_after > 0: + return float(min(max(exp, retry_after), cap)) + return float(exp) From 4ef87a395928d7a670dd8e938760bb0fd36324cf Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Wed, 3 Jun 2026 00:12:17 +0300 Subject: [PATCH 08/13] feat(resilience): cheap preflight + 429/transient error classifier (ORCH-1) preflight.py: cached CLAUDE_BIN exists + claude --version (no tokens, no prompt-ping). error_classifier.py: classify_log_file -> transient|permanent from log tail + Retry-After parsing. --- src/error_classifier.py | 87 +++++++++++++++++++++++++++++++++++++++ src/preflight.py | 90 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 177 insertions(+) create mode 100644 src/error_classifier.py create mode 100644 src/preflight.py diff --git a/src/error_classifier.py b/src/error_classifier.py new file mode 100644 index 0000000..78fd5a4 --- /dev/null +++ b/src/error_classifier.py @@ -0,0 +1,87 @@ +"""ORCH-1 resilience: classify an agent failure as transient vs permanent. + +Rate limits / overload / network blips cannot be reliably predicted in advance, +so we classify *after the run* by scanning the agent's combined stdout/stderr log +(B-2 sends both to /app/data/runs/.log). + +- transient -> 429 / rate limit / overloaded / network / quota-exhausted etc. + => backoff + transient retry (separate counter, larger budget). +- permanent -> a genuine code fault / agent error + => normal attempts < max_attempts, then 'failed'. + +Also extracts a Retry-After hint (seconds) when the server provided one. +""" +import re + +# Case-insensitive substrings/patterns that signal a transient/rate-limit issue. +_TRANSIENT_PATTERNS = [ + r"\b429\b", + r"rate[\s_-]*limit", + r"rate_limit_error", + r"overloaded", + r"overloaded_error", + r"too many requests", + r"quota", + r"insufficient[_\s-]*quota", + r"retry[\s-]*after", + r"service unavailable", + r"\b503\b", + r"\b529\b", + r"timed out", + r"timeout", + r"connection (reset|refused|error|aborted)", + r"temporarily unavailable", + r"econnreset", + r"etimedout", +] + +_TRANSIENT_RE = re.compile("|".join(_TRANSIENT_PATTERNS), re.IGNORECASE) + +# Retry-After: header style ("Retry-After: 30") or JSON ("retry_after": 30) or +# "retry after 30 seconds". Returns the integer seconds. +_RETRY_AFTER_RE = re.compile( + r"retry[\s_-]*after[\"']?\s*[:=]?\s*[\"']?\s*(\d+)", + re.IGNORECASE, +) + + +def classify_text(text: str) -> str: + """Return 'transient' or 'permanent' for a chunk of log/stderr text.""" + if not text: + return "permanent" + return "transient" if _TRANSIENT_RE.search(text) else "permanent" + + +def parse_retry_after(text: str) -> int | None: + """Return Retry-After seconds if present in the text, else None.""" + if not text: + return None + m = _RETRY_AFTER_RE.search(text) + if m: + try: + return int(m.group(1)) + except (TypeError, ValueError): + return None + return None + + +def classify_log_file(path: str, tail_bytes: int = 16384) -> tuple[str, int | None]: + """Classify the tail of a log file. + + Reads the last `tail_bytes` of the log (rate-limit messages appear near the + end) and returns (classification, retry_after_seconds_or_None). + On any read error, treats it as 'permanent' (no special backoff). + """ + if not path: + return "permanent", None + try: + with open(path, "rb") as f: + try: + f.seek(-tail_bytes, 2) + except OSError: + f.seek(0) + data = f.read() + text = data.decode("utf-8", errors="replace") + except Exception: + return "permanent", None + return classify_text(text), parse_retry_after(text) diff --git a/src/preflight.py b/src/preflight.py new file mode 100644 index 0000000..717cee2 --- /dev/null +++ b/src/preflight.py @@ -0,0 +1,90 @@ +"""ORCH-1 resilience: cheap preflight check (CLI / network available?). + +Goal: before the worker claims a job, confirm the claude CLI binary and runtime +are reachable WITHOUT spending any tokens. We only do local/cheap checks: + + 1. os.path.exists(CLAUDE_BIN) -- instant + 2. `claude --version` (timeout 5s) -- spawns CLI, does NOT call the API + +The result is cached for `preflight_cache_ttl` seconds so we do not re-run +`claude --version` on every worker tick. + +🚫 We deliberately do NOT do a prompt ping (ping->pong) — that would burn the +rate limit and add latency. Preflight is local-only. +""" +import os +import time +import logging +import subprocess + +from .config import settings + +logger = logging.getLogger("orchestrator.preflight") + +_VERSION_TIMEOUT = 5 + + +class _PreflightCache: + def __init__(self): + self.ts: float = 0.0 + self.ok: bool = False + self.reason: str = "not checked yet" + + +_cache = _PreflightCache() + + +def _claude_bin() -> str: + return getattr(settings, "claude_bin", None) or "/opt/claude-code/bin/claude.exe" + + +def _run_version(bin_path: str) -> tuple[bool, str]: + """`claude --version` — proves the CLI runs without touching the API.""" + try: + r = subprocess.run( + [bin_path, "--version"], + capture_output=True, + text=True, + timeout=_VERSION_TIMEOUT, + ) + if r.returncode == 0: + return True, (r.stdout or r.stderr or "").strip()[:120] or "ok" + return False, f"--version exit {r.returncode}: {(r.stderr or r.stdout).strip()[:120]}" + except subprocess.TimeoutExpired: + return False, f"--version timed out after {_VERSION_TIMEOUT}s" + except FileNotFoundError: + return False, "claude binary not found (FileNotFoundError)" + except Exception as e: # pragma: no cover - defensive + return False, f"--version error: {e}" + + +def _compute() -> tuple[bool, str]: + bin_path = _claude_bin() + if not os.path.exists(bin_path): + return False, f"CLAUDE_BIN not found: {bin_path}" + return _run_version(bin_path) + + +def check(force: bool = False) -> tuple[bool, str]: + """Return (ok, reason). Cached for preflight_cache_ttl seconds. + + force=True bypasses the cache (used by the breaker half-open probe / tests). + """ + now = time.time() + ttl = settings.preflight_cache_ttl + if not force and _cache.ts > 0 and (now - _cache.ts) < ttl: + return _cache.ok, _cache.reason + ok, reason = _compute() + _cache.ts = now + _cache.ok = ok + _cache.reason = reason + if not ok: + logger.warning(f"Preflight FAIL: {reason}") + return ok, reason + + +def reset_cache() -> None: + """Invalidate the cache (tests / forced recheck).""" + _cache.ts = 0.0 + _cache.ok = False + _cache.reason = "reset" From 90fdd19394508658a366b627604c970c0f374d48 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Wed, 3 Jun 2026 00:12:17 +0300 Subject: [PATCH 09/13] feat(launcher): classify failures, backoff transient retry, breaker outcome (ORCH-1) _finalize_job classifies the run log: transient (429/overload) -> backoff requeue via mark_job_transient with separate transient_attempts budget honouring Retry-After; permanent -> normal attempts int: + """Exponential backoff for transient failures, honouring Retry-After. + + backoff = min(2^transient_attempts * base, max). If the server sent a + Retry-After, use the larger of the two (never poll sooner than asked). + """ + base = settings.backoff_base_seconds + cap = settings.backoff_max_seconds + backoff = min((2 ** max(transient_attempts, 0)) * base, cap) + if retry_after is not None and retry_after > 0: + backoff = max(backoff, min(retry_after, cap)) + return int(backoff) + + def _finalize_job(self, job_id: int, agent: str, run_id: int, exit_code, output_path=None): """ORCH-1: update the jobs row after the agent process finished. - exit_code == 0 -> done. - exit_code != 0 -> retry while attempts < max_attempts (back to 'queued'), - otherwise 'failed' + Telegram notification. - attempts were already incremented at claim time, so `attempts` reflects how - many times this job has been picked up. + exit_code == 0 -> done (and resets the breaker streak via on_outcome). + exit_code != 0 -> classify the failure from the run log tail (token-free): + - TRANSIENT (429/overload/network): backoff-requeue with available_at in + the future + a SEPARATE transient_attempts budget + (settings.transient_max_attempts), honouring Retry-After. Reported to + the breaker so it opens after N consecutive transient failures. + - PERMANENT (code fault): ordinary attempts < max_attempts requeue, + otherwise 'failed' + Telegram. """ from ..db import get_job, mark_job + from ..error_classifier import classify_log_file try: job = get_job(job_id) if not job: @@ -383,35 +400,93 @@ class AgentLauncher: if exit_code == 0: mark_job(job_id, "done", run_id=run_id) logger.info(f"Job {job_id} ({agent}) done (run_id={run_id})") + self._record_outcome(transient=False, recovered=True) return - attempts = job.get("attempts", 0) - max_attempts = job.get("max_attempts", 2) - err = f"agent {agent} exit_code={exit_code} (run_id={run_id})" - if attempts < max_attempts: - mark_job(job_id, "queued", run_id=run_id, error=err) - logger.warning( - f"Job {job_id} ({agent}) failed (exit={exit_code}), " - f"requeued (attempt {attempts}/{max_attempts})" - ) + # Classify the failure from the agent log tail (no token cost). + kind, retry_after = "permanent", None + log_path = output_path or f"/app/data/runs/{run_id}.log" + try: + kind, retry_after = classify_log_file(log_path) + except Exception: + pass + + if kind == "transient": + self._finalize_transient(job_id, agent, run_id, exit_code, job, retry_after) else: - mark_job(job_id, "failed", run_id=run_id, error=err) - logger.error( - f"Job {job_id} ({agent}) failed permanently after " - f"{attempts} attempts (exit={exit_code})" - ) - try: - from ..notifications import send_telegram - send_telegram( - f"\U0001f6a8 Job {job_id} ({agent}, repo {job.get('repo')}) " - f"failed after {attempts} attempts (exit={exit_code}). " - f"Logs: /app/data/runs/{run_id}.log" - ) - except Exception: - pass + self._finalize_permanent(job_id, agent, run_id, exit_code, job) except Exception as e: logger.error(f"Job {job_id}: _finalize_job error: {e}") + def _finalize_transient(self, job_id, agent, run_id, exit_code, job, retry_after): + """Transient (429/overload/net) failure -> backoff requeue or fail when budget out.""" + from ..db import mark_job, mark_job_transient + tattempts = job.get("transient_attempts", 0) + tmax = settings.transient_max_attempts + err = (f"transient (429/overload) agent {agent} exit={exit_code} " + f"(run_id={run_id}); retry_after={retry_after}") + self._record_outcome(transient=True, recovered=False) + if tattempts < tmax: + backoff = self._backoff_seconds(tattempts + 1, retry_after) + mark_job_transient(job_id, backoff, error=err) + logger.warning( + f"Job {job_id} ({agent}) TRANSIENT fail (exit={exit_code}), " + f"backoff {backoff}s, transient_attempt {tattempts + 1}/{tmax}" + ) + else: + mark_job(job_id, "failed", run_id=run_id, error=err) + logger.error( + f"Job {job_id} ({agent}) failed after {tattempts} transient attempts" + ) + self._notify_failed(job_id, agent, job, run_id, + f"transient (rate-limit) after {tattempts} attempts") + + def _finalize_permanent(self, job_id, agent, run_id, exit_code, job): + """Permanent (code-fault) failure -> normal attempts Date: Wed, 3 Jun 2026 00:12:17 +0300 Subject: [PATCH 10/13] feat(worker): preflight gate + circuit breaker + /queue resilience (ORCH-1) QueueWorker gates claims behind preflight and the CircuitBreaker (open -> pause, no CLI calls + Telegram alert; half-open probes one job; closed on recovery). Wires launcher.on_outcome. /queue exposes resilience snapshot. --- src/main.py | 1 + src/queue_worker.py | 181 +++++++++++++++++++++++++++++++++++++++----- 2 files changed, 161 insertions(+), 21 deletions(-) diff --git a/src/main.py b/src/main.py index 62e5ef9..1cebb7a 100644 --- a/src/main.py +++ b/src/main.py @@ -102,5 +102,6 @@ async def queue(): "counts": job_status_counts(), "max_concurrency": worker.max_concurrency, "poll_interval": worker.poll_interval, + "resilience": worker.status(), "recent": recent_jobs(10), } diff --git a/src/queue_worker.py b/src/queue_worker.py index 20106e9..ab3984e 100644 --- a/src/queue_worker.py +++ b/src/queue_worker.py @@ -1,56 +1,181 @@ -"""ORCH-1 (F-2b): background job-queue worker. +"""ORCH-1 (F-2b): background job-queue worker with resilience layer. A single background thread polls the `jobs` table and spawns agents: while running: + if breaker.open and not cooled_down: sleep; continue # don't touch CLI + if not preflight.ok: sleep; continue # CLI/net down -> wait while count_running_jobs() < max_concurrency: - job = claim_next_job() # atomic queued -> running + job = claim_next_job() # atomic queued -> running (available_at-gated) if not job: break - launcher.launch_job(job) # spawns claude (Popen) + monitor thread + launcher.launch_job(job) # spawns claude (Popen) + monitor thread sleep(poll_interval) -Design notes ------------- -- We use a plain daemon thread + threading.Event for shutdown rather than an - asyncio task: the launcher already manages its own monitor/watchdog threads and - blocking Popen, so a thread loop is the simplest, most robust fit. -- `launch_job()` is non-blocking: it spawns the process and returns immediately; - the monitor thread updates `jobs.status` to done/queued/failed on completion. - That status change frees a `count_running_jobs()` slot for the next claim. -- Restart-safe: queue-recovery (requeue_running_jobs) runs in main.py lifespan - BEFORE the worker starts, so jobs left 'running' by a dead worker get retried. +Resilience (ДОПОЛНЕНИЕ): + A. Preflight — cheap local CLI/net check (cached, no tokens) gates claiming. + B/C. The launcher classifies failures (transient vs permanent) and applies + backoff via available_at; the worker only needs to honour available_at + (claim_next_job does) and react to transient outcomes via the breaker. + D. Circuit breaker — N consecutive transient failures -> open (pause M minutes, + no CLI calls, Telegram alert) -> half-open (probe one job) -> closed. + +Design: plain daemon thread + threading.Event (the launcher already manages its +own monitor/watchdog threads + blocking Popen). """ +import time import logging import threading from .config import settings from .db import claim_next_job, count_running_jobs from .agents.launcher import launcher +from . import preflight logger = logging.getLogger("orchestrator.queue_worker") -class QueueWorker: - """Background worker that drains the persistent job queue.""" +class CircuitBreaker: + """Trips after `threshold` consecutive transient failures. - def __init__(self, max_concurrency: int = None, poll_interval: float = None): + States: closed -> (threshold transient) -> open -> (after pause) half-open + -> (recovered) closed | (transient again) open. + Thread-safe enough for our single-worker + monitor-thread callbacks (a lock + guards the counters). + """ + + def __init__(self, threshold: int = None, pause_seconds: int = None): + self.threshold = threshold if threshold is not None else settings.breaker_threshold + self.pause_seconds = ( + pause_seconds if pause_seconds is not None else settings.breaker_pause_seconds + ) + self._lock = threading.Lock() + self.state = "closed" # closed | open | half-open + self.consecutive_transient = 0 + self.opened_at = 0.0 + self._notify = None # optional callable(message) for alerts + + def set_notifier(self, fn): + self._notify = fn + + def record_transient(self): + with self._lock: + self.consecutive_transient += 1 + if self.state == "half-open": + # Probe failed -> re-open. + self._open("circuit re-opened: probe job hit transient again") + elif self.consecutive_transient >= self.threshold and self.state == "closed": + self._open( + f"circuit OPEN: {self.consecutive_transient} consecutive " + f"transient failures; pausing {self.pause_seconds}s (no CLI calls)" + ) + + def record_recovered(self): + with self._lock: + self.consecutive_transient = 0 + if self.state in ("half-open", "open"): + self.state = "closed" + logger.info("Circuit CLOSED: recovered") + + def record_permanent(self): + # A clean permanent (code-fault) failure breaks the transient streak. + with self._lock: + self.consecutive_transient = 0 + + def _open(self, msg: str): + self.state = "open" + self.opened_at = time.time() + logger.warning(msg) + if self._notify: + try: + self._notify(f"\U0001f534 {msg}") + except Exception: + pass + + def allow_claim(self) -> bool: + """Return True if the worker may attempt to claim/launch a job now. + + - closed -> yes. + - open -> no until pause elapsed; then transition to half-open (yes, one probe). + - half-open -> yes (the single probe). + """ + with self._lock: + if self.state == "closed": + return True + if self.state == "open": + if (time.time() - self.opened_at) >= self.pause_seconds: + self.state = "half-open" + logger.info("Circuit HALF-OPEN: probing one job") + return True + return False + # half-open: allow the probe. + return True + + def snapshot(self) -> dict: + with self._lock: + remaining = 0 + if self.state == "open": + remaining = max(0, int(self.pause_seconds - (time.time() - self.opened_at))) + return { + "state": self.state, + "consecutive_transient": self.consecutive_transient, + "pause_remaining_s": remaining, + } + + +class QueueWorker: + """Background worker that drains the persistent job queue (with resilience).""" + + def __init__(self, max_concurrency: int = None, poll_interval: float = None, + breaker: CircuitBreaker = None): self.max_concurrency = ( max_concurrency if max_concurrency is not None else settings.max_concurrency ) self.poll_interval = ( poll_interval if poll_interval is not None else settings.queue_poll_interval ) + self.breaker = breaker or CircuitBreaker() + self.last_preflight_ok = True + self.last_preflight_reason = "not checked" self._stop = threading.Event() self._thread: threading.Thread | None = None + # --- circuit breaker outcome callback wired into the launcher ---------- + def _on_outcome(self, transient: bool, recovered: bool): + if recovered: + self.breaker.record_recovered() + elif transient: + self.breaker.record_transient() + else: + self.breaker.record_permanent() + def _drain_once(self): - """Claim and launch jobs until concurrency is full or the queue is empty.""" + """Claim and launch jobs until concurrency is full or the queue is empty. + + Gated by the circuit breaker and preflight: if the breaker is open (and + not yet cooled down) or preflight fails, we do NOT claim — jobs stay + queued and no CLI/tokens are touched. + """ + if not self.breaker.allow_claim(): + return + ok, reason = preflight.check() + self.last_preflight_ok = ok + self.last_preflight_reason = reason + if not ok: + logger.info(f"Preflight not ok ({reason}) -> not claiming jobs this tick") + return + + # In half-open we only probe a single job, regardless of max_concurrency. + half_open = self.breaker.snapshot()["state"] == "half-open" + launched = 0 while not self._stop.is_set(): + if half_open and launched >= 1: + return if count_running_jobs() >= self.max_concurrency: return job = claim_next_job() if not job: return + launched += 1 try: run_id = launcher.launch_job(job) logger.info( @@ -58,8 +183,8 @@ class QueueWorker: f"repo {job['repo']}) -> run_id={run_id}" ) except Exception as e: - # Launch itself failed (e.g. repo missing): mark the job failed so it - # does not wedge as 'running' forever and block the slot. + # Launch itself failed (e.g. repo missing): treat as a permanent + # launch error so the job does not wedge as 'running' forever. logger.error(f"Worker failed to launch job {job['id']}: {e}") try: from .db import get_job, mark_job @@ -77,20 +202,26 @@ class QueueWorker: def _run(self): logger.info( f"Queue worker started (max_concurrency={self.max_concurrency}, " - f"poll_interval={self.poll_interval}s)" + f"poll_interval={self.poll_interval}s, breaker_threshold={self.breaker.threshold})" ) while not self._stop.is_set(): try: self._drain_once() except Exception as e: logger.error(f"Queue worker loop error: {e}") - # Sleep is interruptible by stop event for fast shutdown. self._stop.wait(self.poll_interval) logger.info("Queue worker stopped") def start(self): if self._thread and self._thread.is_alive(): return + # Wire breaker alerting + launcher outcome callback. + try: + from .notifications import send_telegram + self.breaker.set_notifier(send_telegram) + except Exception: + pass + launcher.on_outcome = self._on_outcome self._stop.clear() self._thread = threading.Thread( target=self._run, name="queue-worker", daemon=True @@ -102,6 +233,14 @@ class QueueWorker: if self._thread: self._thread.join(timeout=timeout) + def status(self) -> dict: + """Resilience snapshot for /queue.""" + return { + "breaker": self.breaker.snapshot(), + "preflight_ok": self.last_preflight_ok, + "preflight_reason": self.last_preflight_reason, + } + # Module-level singleton used by the FastAPI lifespan. worker = QueueWorker() From a613fd81808b5aa8bab4b86ee8440ca8f59028e7 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Wed, 3 Jun 2026 00:12:17 +0300 Subject: [PATCH 11/13] test(resilience): 34 tests for preflight/classifier/backoff/breaker (ORCH-1) Covers preflight FAIL->queued + cache, transient/permanent classifier + Retry-After, exp backoff + available_at gating, launcher transient vs permanent finalize, circuit breaker open/half-open/closed. test_queue worker tests stub preflight OK. Popen never spawned. --- tests/test_queue.py | 6 + tests/test_resilience.py | 295 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 301 insertions(+) create mode 100644 tests/test_resilience.py diff --git a/tests/test_queue.py b/tests/test_queue.py index d3f1536..f6342e8 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -236,6 +236,12 @@ class TestObservability: # QueueWorker max_concurrency (launch_job fully mocked — no real Popen) # --------------------------------------------------------------------------- class TestWorkerConcurrency: + @pytest.fixture(autouse=True) + def _ok_preflight(self, monkeypatch): + # ORCH-1 resilience: the worker gates claims behind preflight; in tests there + # is no claude binary, so stub preflight OK to exercise pure queue/concurrency. + monkeypatch.setattr("src.queue_worker.preflight.check", lambda *a, **k: (True, "ok")) + def test_worker_respects_max_concurrency(self, monkeypatch): from src.queue_worker import QueueWorker diff --git a/tests/test_resilience.py b/tests/test_resilience.py new file mode 100644 index 0000000..1d72117 --- /dev/null +++ b/tests/test_resilience.py @@ -0,0 +1,295 @@ +"""ORCH-1 resilience tests: preflight, 429-classifier, backoff, circuit breaker. + +No real claude/Popen is ever spawned: preflight subprocess and launcher.launch_job +are mocked. DB is a fresh per-test sqlite file. +""" +import os +import tempfile + +import pytest + +_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_resilience.db") +os.environ["ORCH_DB_PATH"] = _test_db +os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir() +os.environ["ORCH_GITEA_TOKEN"] = "test-token" +os.environ["ORCH_PLANE_API_TOKEN"] = "test-token" + +import src.db as db +from src.db import ( + init_db, enqueue_job, claim_next_job, get_job, count_running_jobs, + mark_job_transient, +) +from src import preflight, error_classifier +from src.error_classifier import classify_text, parse_retry_after, classify_log_file +from src.queue_worker import QueueWorker, CircuitBreaker +from src.agents.launcher import AgentLauncher + + +@pytest.fixture(autouse=True) +def fresh_db(tmp_path, monkeypatch): + monkeypatch.setattr(db.settings, "db_path", str(tmp_path / "res.db")) + init_db() + preflight.reset_cache() + yield + + +# --------------------------------------------------------------------------- +# A. Preflight +# --------------------------------------------------------------------------- +class TestPreflight: + def test_fail_when_bin_missing(self, monkeypatch): + monkeypatch.setattr(preflight, "_claude_bin", lambda: "/no/such/claude") + ok, reason = preflight.check(force=True) + assert ok is False + assert "not found" in reason.lower() + + def test_ok_when_version_succeeds(self, monkeypatch, tmp_path): + fake_bin = tmp_path / "claude" + fake_bin.write_text("#!/bin/sh\necho v1\n") + monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin)) + monkeypatch.setattr(preflight, "_run_version", lambda b: (True, "1.2.3")) + ok, reason = preflight.check(force=True) + assert ok is True + + def test_cache_does_not_recheck_within_ttl(self, monkeypatch, tmp_path): + fake_bin = tmp_path / "claude" + fake_bin.write_text("x") + monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin)) + monkeypatch.setattr(db.settings, "preflight_cache_ttl", 999) + + calls = {"n": 0} + + def counting_version(b): + calls["n"] += 1 + return True, "ok" + + monkeypatch.setattr(preflight, "_run_version", counting_version) + preflight.reset_cache() + preflight.check() # first -> runs version + preflight.check() # cached -> no extra version call + preflight.check() + assert calls["n"] == 1 + + def test_force_bypasses_cache(self, monkeypatch, tmp_path): + fake_bin = tmp_path / "claude" + fake_bin.write_text("x") + monkeypatch.setattr(preflight, "_claude_bin", lambda: str(fake_bin)) + calls = {"n": 0} + monkeypatch.setattr(preflight, "_run_version", + lambda b: (calls.__setitem__("n", calls["n"] + 1), (True, "ok"))[1]) + preflight.reset_cache() + preflight.check() + preflight.check(force=True) + assert calls["n"] == 2 + + def test_worker_does_not_claim_when_preflight_fails(self, monkeypatch): + # Preflight FAIL -> job stays queued, launch_job never called. + monkeypatch.setattr("src.queue_worker.preflight.check", + lambda *a, **k: (False, "down")) + called = {"launch": False} + monkeypatch.setattr("src.queue_worker.launcher.launch_job", + lambda job: called.__setitem__("launch", True)) + jid = enqueue_job("analyst", "r") + QueueWorker(max_concurrency=1, poll_interval=0.01)._drain_once() + assert called["launch"] is False + assert get_job(jid)["status"] == "queued" + assert count_running_jobs() == 0 + + +# --------------------------------------------------------------------------- +# B. Error classifier +# --------------------------------------------------------------------------- +class TestClassifier: + @pytest.mark.parametrize("text", [ + "Error: 429 Too Many Requests", + "anthropic rate limit exceeded", + "overloaded_error: server is overloaded", + "API quota exhausted", + "503 Service Unavailable", + "connection reset by peer", + ]) + def test_transient_patterns(self, text): + assert classify_text(text) == "transient" + + @pytest.mark.parametrize("text", [ + "Traceback: KeyError 'foo'", + "SyntaxError: invalid syntax", + "assertion failed in test", + "", + ]) + def test_permanent_patterns(self, text): + assert classify_text(text) == "permanent" + + def test_retry_after_header(self): + assert parse_retry_after("HTTP/1.1 429\nRetry-After: 42\n") == 42 + + def test_retry_after_json(self): + assert parse_retry_after('{"error":{"type":"rate_limit","retry_after": 7}}') == 7 + + def test_retry_after_absent(self): + assert parse_retry_after("just an error") is None + + def test_classify_log_file(self, tmp_path): + p = tmp_path / "run.log" + p.write_text("...lots of output...\n429 rate limit. Retry-After: 30\n") + kind, ra = classify_log_file(str(p)) + assert kind == "transient" + assert ra == 30 + + def test_classify_missing_file_is_permanent(self): + kind, ra = classify_log_file("/no/such/log") + assert kind == "permanent" + assert ra is None + + +# --------------------------------------------------------------------------- +# C. Backoff + available_at gating +# --------------------------------------------------------------------------- +class TestBackoff: + def test_backoff_grows_exponentially(self): + lr = AgentLauncher() + # base=10, cap=600 (defaults) + b1 = lr._backoff_seconds(1) + b2 = lr._backoff_seconds(2) + b3 = lr._backoff_seconds(3) + assert b1 == 20 # 2^1*10 + assert b2 == 40 # 2^2*10 + assert b3 == 80 # 2^3*10 + assert b2 > b1 and b3 > b2 + + def test_backoff_capped(self): + lr = AgentLauncher() + assert lr._backoff_seconds(20) == 600 # capped at backoff_max_seconds + + def test_retry_after_respected_when_larger(self): + lr = AgentLauncher() + # transient_attempts=1 -> base backoff 20; Retry-After=120 wins. + assert lr._backoff_seconds(1, retry_after=120) == 120 + + def test_retry_after_ignored_when_smaller(self): + lr = AgentLauncher() + assert lr._backoff_seconds(3, retry_after=5) == 80 # backoff bigger + + def test_transient_requeue_sets_future_available_at_and_claim_skips(self): + jid = enqueue_job("developer", "r") + claim_next_job() + # Big backoff -> available_at far in the future. + mark_job_transient(jid, 3600, error="429") + job = get_job(jid) + assert job["status"] == "queued" + assert job["transient_attempts"] == 1 + assert job["available_at"] is not None + # claim must NOT pick it up while available_at is in the future. + assert claim_next_job() is None + + def test_transient_requeue_claimable_when_due(self): + jid = enqueue_job("developer", "r") + claim_next_job() + mark_job_transient(jid, -5, error="429") # available_at in the past + c = claim_next_job() + assert c is not None and c["id"] == jid + + +# --------------------------------------------------------------------------- +# D. Launcher transient/permanent finalize (no Popen) +# --------------------------------------------------------------------------- +class TestFinalizeClassified: + def test_transient_failure_backoff_requeue(self, tmp_path, monkeypatch): + monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None) + log = tmp_path / "1.log" + log.write_text("Error 429 rate limit exceeded\n") + jid = enqueue_job("developer", "r", max_attempts=2) + claim_next_job() + AgentLauncher()._finalize_job(jid, "developer", run_id=1, exit_code=1, + output_path=str(log)) + job = get_job(jid) + assert job["status"] == "queued" + assert job["transient_attempts"] == 1 + assert job["available_at"] is not None # backoff-gated + assert job["attempts"] == 1 # code-fault budget NOT burned + + def test_permanent_failure_uses_normal_attempts(self, tmp_path, monkeypatch): + monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None) + log = tmp_path / "2.log" + log.write_text("Traceback: ValueError\n") + jid = enqueue_job("developer", "r", max_attempts=2) + claim_next_job() + AgentLauncher()._finalize_job(jid, "developer", run_id=2, exit_code=1, + output_path=str(log)) + job = get_job(jid) + assert job["status"] == "queued" + assert job["transient_attempts"] == 0 # not transient + assert job["available_at"] is None # no backoff for code-fault + + def test_transient_exhausts_to_failed(self, tmp_path, monkeypatch): + monkeypatch.setattr("src.notifications.send_telegram", lambda *a, **k: None) + monkeypatch.setattr(db.settings, "transient_max_attempts", 2) + log = tmp_path / "3.log" + log.write_text("overloaded_error\n") + lr = AgentLauncher() + jid = enqueue_job("developer", "r") + claim_next_job() + lr._finalize_job(jid, "developer", 1, exit_code=1, output_path=str(log)) + assert get_job(jid)["status"] == "queued" # transient 1 -> requeue + # force claimable and retry + mark_job_transient(jid, -1) # makes it due; transient=2 now + claim_next_job() + lr._finalize_job(jid, "developer", 2, exit_code=1, output_path=str(log)) + assert get_job(jid)["status"] == "failed" # transient budget exhausted + + +# --------------------------------------------------------------------------- +# E. Circuit breaker +# --------------------------------------------------------------------------- +class TestCircuitBreaker: + def test_opens_after_threshold(self): + cb = CircuitBreaker(threshold=3, pause_seconds=300) + assert cb.allow_claim() is True + cb.record_transient() + cb.record_transient() + assert cb.state == "closed" + cb.record_transient() # 3rd -> open + assert cb.state == "open" + assert cb.allow_claim() is False # paused, no CLI calls + + def test_recovered_resets_streak(self): + cb = CircuitBreaker(threshold=3) + cb.record_transient() + cb.record_transient() + cb.record_recovered() + assert cb.consecutive_transient == 0 + assert cb.state == "closed" + + def test_half_open_after_pause_then_closed_on_success(self, monkeypatch): + cb = CircuitBreaker(threshold=2, pause_seconds=300) + cb.record_transient() + cb.record_transient() # open + assert cb.state == "open" + # Simulate the pause elapsing. + cb.opened_at -= 301 + assert cb.allow_claim() is True # -> half-open (probe) + assert cb.state == "half-open" + cb.record_recovered() # probe succeeded + assert cb.state == "closed" + + def test_half_open_reopens_on_transient(self): + cb = CircuitBreaker(threshold=2, pause_seconds=300) + cb.record_transient(); cb.record_transient() # open + cb.opened_at -= 301 + cb.allow_claim() # half-open + assert cb.state == "half-open" + cb.record_transient() # probe failed -> re-open + assert cb.state == "open" + + def test_breaker_blocks_worker_claim(self, monkeypatch): + monkeypatch.setattr("src.queue_worker.preflight.check", + lambda *a, **k: (True, "ok")) + called = {"launch": False} + monkeypatch.setattr("src.queue_worker.launcher.launch_job", + lambda job: called.__setitem__("launch", True)) + cb = CircuitBreaker(threshold=1, pause_seconds=300) + cb.record_transient() # open immediately + w = QueueWorker(max_concurrency=1, poll_interval=0.01, breaker=cb) + enqueue_job("analyst", "r") + w._drain_once() + assert called["launch"] is False # breaker open -> no claim, no CLI From d0d47058b4a2d2fb25e66136ac15e5e78d3386b6 Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Wed, 3 Jun 2026 00:12:17 +0300 Subject: [PATCH 12/13] docs(resilience): document preflight/429/backoff/breaker + env vars (ORCH-1) --- README.md | 12 ++++++++++- docs/ORCH-1_JOB_QUEUE.md | 46 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 9b47539..f3c2f4b 100644 --- a/README.md +++ b/README.md @@ -111,6 +111,12 @@ uvicorn src.main:app --reload --port 8500 | `ORCH_DB_PATH` | SQLite path | `/app/data/orchestrator.db` | | `ORCH_MAX_CONCURRENCY` | Сколько jobs воркер запускает параллельно (ORCH-1) | `1` | | `ORCH_QUEUE_POLL_INTERVAL` | Период опроса очереди воркером, сек (ORCH-1) | `2.0` | +| `ORCH_PREFLIGHT_CACHE_TTL` | Кэш preflight (CLI/net), сек (ORCH-1 resilience) | `45` | +| `ORCH_BACKOFF_BASE_SECONDS` | База exp-backoff для transient (429) | `10` | +| `ORCH_BACKOFF_MAX_SECONDS` | Потолок backoff | `600` | +| `ORCH_TRANSIENT_MAX_ATTEMPTS` | Ретраи для 429/недоступности | `5` | +| `ORCH_BREAKER_THRESHOLD` | transient подряд до открытия breaker | `3` | +| `ORCH_BREAKER_PAUSE_SECONDS` | Пауза при открытом breaker | `300` | ## Очередь задач (ORCH-1 / F-2b) @@ -128,7 +134,11 @@ Webhook-хэндлеры больше не спавнят claude-агентов потом `failed` + Telegram-нотификация. Статусы job: `queued → running → done | failed`. Наблюдаемость — через `GET /queue`. -Подробности: `docs/ORCH-1_JOB_QUEUE.md`. + +**Resilience-слой:** дешёвый preflight (CLI/net, кэш, без токенов) гейтит claim; +429/overload детектится по логу (transient vs permanent), transient ретраится с +exp-backoff (`available_at`, Retry-After); circuit breaker паузит воркер после N +transient подряд. Подробности: `docs/ORCH-1_JOB_QUEUE.md`. ## Multi-repo: реестр проектов (ORCH-6) diff --git a/docs/ORCH-1_JOB_QUEUE.md b/docs/ORCH-1_JOB_QUEUE.md index eeed34c..629751d 100644 --- a/docs/ORCH-1_JOB_QUEUE.md +++ b/docs/ORCH-1_JOB_QUEUE.md @@ -79,5 +79,49 @@ UPDATE jobs SET status='running', attempts=attempts+1, started_at=datetime('now' IMG=$(docker inspect orchestrator --format '{{.Config.Image}}') docker run --rm -v /home/slin/repos/orchestrator:/code -w /code \ --entrypoint python3 $IMG -m pytest tests/ -q -# 76 passed, 9 failed (pre-existing test_webhooks 401/signature/TypeError) +# 110 passed, 9 failed (pre-existing test_webhooks 401/signature/TypeError) ``` + +--- + +## Resilience-слой (ДОПОЛНЕНИЕ: preflight + 429 + backoff + circuit breaker) + +Надёжность очереди против недоступности CLI и rate-limit. Два РАЗНЫХ класса +проблем лечатся по-разному. + +### A. Дешёвый preflight (`src/preflight.py`) — не жжёт токены +Перед claim воркер проверяет: `os.path.exists(CLAUDE_BIN)` + `claude --version` +(timeout 5с, токены НЕ тратит). Результат кэшируется `preflight_cache_ttl` (45с). +FAIL → воркер НЕ claim’ит (job остаётся `queued`), ждёт. 🚫 НЕТ prompt-ping. + +### B. 429 — детект НА ВЫХОДЕ (`src/error_classifier.py`) +rate-limit нельзя предсказать — классифицируем по логу прогона. `classify_log_file` +читает хвост лога (16KB), ищет `429/rate limit/overloaded/quota/503/529/timeout/...` +→ `transient` или `permanent`. Извлекает `Retry-After`. + +- **transient** (429/сеть) → backoff-ретрай с ОТДЕЛЬНЫМ `transient_attempts` + (лимит `transient_max_attempts=5`) — не жжёт code-fault бюджет. +- **permanent** (code-fault) → обычные `attempts < max_attempts` (2), потом `failed`. + +### C. Backoff + `available_at` +Колонки `jobs.available_at TEXT` + `jobs.transient_attempts INTEGER` (миграция +`_ensure_column`). `claim_next_job`: `WHERE status='queued' AND (available_at IS NULL +OR available_at <= datetime('now'))`. При transient: `available_at = now + +min(2^n * base, max)` (base=10с, max=600с), `Retry-After` уважается (берёмся max). + +### D. Circuit breaker (`CircuitBreaker` в queue_worker) +N=3 transient подряд → **open**: воркер паузит `breaker_pause_seconds=300`, ВООБЩЕ +не дёргает CLI, Telegram-алерт. Через паузу → **half-open** (пробует 1 job); +ожил (exit 0) → **closed**; снова transient → опять open. Состояние в памяти +воркера, отражается в `/queue.resilience`. +Связь launcher→breaker — через callback `launcher.on_outcome` (без import-цикла). + +### Конфиг (config.py) +`preflight_cache_ttl=45`, `backoff_base_seconds=10`, `backoff_max_seconds=600`, +`transient_max_attempts=5`, `breaker_threshold=3`, `breaker_pause_seconds=300`. + +### Тесты +`tests/test_resilience.py` — 34 теста: preflight (FAIL→queued, кэш, force), +классификатор (transient/permanent/Retry-After), backoff (рост/cap/Retry-After, +`available_at` гейтинг), launcher transient/permanent finalize, breaker +(open/half-open/closed/re-open, блок claim). From c23f000c05e517d60808f064327e2d0bd1a601fb Mon Sep 17 00:00:00 2001 From: Dev Agent Date: Wed, 3 Jun 2026 00:13:44 +0300 Subject: [PATCH 13/13] fix(preflight): check the binary the launcher actually spawns (ORCH-1) Container ORCH_CLAUDE_BIN pointed at a non-existent /usr/bin/claude while the launcher spawns the hardcoded /opt/claude-code/bin/claude.exe. Preflight now follows AgentLauncher.CLAUDE_BIN (the genuinely executed path), so it no longer falsely blocks every job in production. --- src/preflight.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/preflight.py b/src/preflight.py index 717cee2..316fdce 100644 --- a/src/preflight.py +++ b/src/preflight.py @@ -35,7 +35,23 @@ _cache = _PreflightCache() def _claude_bin() -> str: - return getattr(settings, "claude_bin", None) or "/opt/claude-code/bin/claude.exe" + """Resolve the claude binary preflight should check. + + Must match the binary the launcher actually spawns. The launcher hardcodes + AgentLauncher.CLAUDE_BIN for the real Popen, so we prefer that; we only fall + back to settings.claude_bin / a default if it is somehow unset. (Note: the + container's ORCH_CLAUDE_BIN may point elsewhere; preflight follows the path + that is genuinely executed, not the unused env override.) + """ + try: + from .agents.launcher import AgentLauncher + launcher_bin = getattr(AgentLauncher, "CLAUDE_BIN", None) + if launcher_bin and os.path.exists(launcher_bin): + return launcher_bin + # Launcher path not present -> fall back to configured/default. + return launcher_bin or getattr(settings, "claude_bin", None) or "/opt/claude-code/bin/claude.exe" + except Exception: + return getattr(settings, "claude_bin", None) or "/opt/claude-code/bin/claude.exe" def _run_version(bin_path: str) -> tuple[bool, str]: