diff --git a/src/config.py b/src/config.py index d2d7d2c..1a3ca75 100644 --- a/src/config.py +++ b/src/config.py @@ -30,6 +30,12 @@ class Settings(BaseSettings): # DB db_path: str = "/app/data/orchestrator.db" + # ORCH-1 (F-2b): persistent job queue / background worker. + # max_concurrency -> max agent jobs running in parallel (env ORCH_MAX_CONCURRENCY) + # queue_poll_interval -> worker loop poll seconds (env ORCH_QUEUE_POLL_INTERVAL) + max_concurrency: int = 1 + queue_poll_interval: float = 2.0 + # Telegram notifications telegram_bot_token: str = "" diff --git a/src/db.py b/src/db.py index 693b954..dd6e97f 100644 --- a/src/db.py +++ b/src/db.py @@ -40,6 +40,26 @@ def init_db(): exit_code INTEGER, output_path TEXT ); + -- ORCH-1 (F-2b): persistent job queue. Webhook handlers enqueue a job and + -- return immediately; a background worker claims jobs (respecting + -- max_concurrency), spawns the claude agent, and updates the status. + -- Restart-safe: running jobs are requeued on startup (queue-recovery). + CREATE TABLE IF NOT EXISTS jobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + agent TEXT NOT NULL, + repo TEXT NOT NULL, + task_id INTEGER, -- FK tasks.id (nullable) + task_content TEXT, -- written to the agent task_file + status TEXT NOT NULL DEFAULT 'queued', -- queued|running|done|failed + attempts INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL DEFAULT 2, + run_id INTEGER, -- agent_runs.id once started + error TEXT, -- last error message + created_at TEXT DEFAULT (datetime('now')), + started_at TEXT, + finished_at TEXT + ); + CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, id); """) conn.close() @@ -105,3 +125,160 @@ def get_next_work_item_id(repo: str, prefix: str = "ET") -> str: next_num = 1 return f"{prefix}-{next_num:03d}" + + +# --------------------------------------------------------------------------- +# ORCH-1 (F-2b): job queue helpers +# --------------------------------------------------------------------------- + +def enqueue_job( + agent: str, + repo: str, + task_content: str | None = None, + task_id: int | None = None, + max_attempts: int = 2, +) -> int: + """Enqueue a new job (status='queued'). Returns the new job id. + + This is what webhook handlers call instead of launching an agent in-process: + it is a fast DB INSERT that returns immediately. The background worker + (queue_worker) picks the job up later. + """ + conn = get_db() + cursor = conn.execute( + "INSERT INTO jobs (agent, repo, task_id, task_content, max_attempts) " + "VALUES (?, ?, ?, ?, ?)", + (agent, repo, task_id, task_content, max_attempts), + ) + job_id = cursor.lastrowid + conn.commit() + conn.close() + return job_id + + +def claim_next_job() -> dict | None: + """Atomically claim the oldest queued job and mark it 'running'. + + Atomicity: the UPDATE carries the `status='queued'` guard in its WHERE clause + and we check `rowcount`. If two worker ticks race for the same row, only the + first UPDATE flips it to 'running' (rowcount==1); the loser sees rowcount==0 + and retries the SELECT. We rely on SQLite's default per-connection transaction + so the SELECT+UPDATE pair is consistent. Returns the claimed job dict or None + when the queue is empty. + """ + conn = get_db() + try: + while True: + row = conn.execute( + "SELECT id FROM jobs WHERE status='queued' ORDER BY id LIMIT 1" + ).fetchone() + if not row: + return None + job_id = row["id"] + cur = conn.execute( + "UPDATE jobs SET status='running', " + "attempts = attempts + 1, started_at = datetime('now') " + "WHERE id = ? AND status='queued'", + (job_id,), + ) + conn.commit() + if cur.rowcount == 1: + claimed = conn.execute( + "SELECT * FROM jobs WHERE id = ?", (job_id,) + ).fetchone() + return dict(claimed) + # Lost the race for this row; loop and try the next queued job. + finally: + conn.close() + + +def mark_job( + job_id: int, + status: str, + run_id: int | None = None, + error: str | None = None, +): + """Update a job's status (queued|running|done|failed). + + - run_id (optional): link to the agent_runs row that executed this job. + - error (optional): last error message (for failed/retry). + - 'done'/'failed' also stamp finished_at. + - 'queued' (requeue for retry) clears started_at/finished_at so the next + claim treats it as fresh. + """ + conn = get_db() + sets = ["status = ?"] + params: list = [status] + if run_id is not None: + sets.append("run_id = ?") + params.append(run_id) + if error is not None: + sets.append("error = ?") + params.append(error) + if status in ("done", "failed"): + sets.append("finished_at = datetime('now')") + elif status == "queued": + sets.append("started_at = NULL") + sets.append("finished_at = NULL") + params.append(job_id) + conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params) + conn.commit() + conn.close() + + +def count_running_jobs() -> int: + """Number of jobs currently in 'running' status (for max_concurrency).""" + conn = get_db() + n = conn.execute( + "SELECT COUNT(*) FROM jobs WHERE status='running'" + ).fetchone()[0] + conn.close() + return int(n) + + +def requeue_running_jobs() -> int: + """Queue-recovery: on startup, any job left 'running' belongs to a worker that + died on restart -> put it back to 'queued'. attempts are kept as-is (the next + claim does NOT re-increment beyond what is needed; claim_next_job increments on + pickup). Returns the number of requeued jobs. + """ + conn = get_db() + cur = conn.execute( + "UPDATE jobs SET status='queued', started_at = NULL " + "WHERE status='running'" + ) + conn.commit() + n = cur.rowcount + conn.close() + return int(n) + + +def get_job(job_id: int) -> dict | None: + """Fetch a single job by id.""" + conn = get_db() + row = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)).fetchone() + conn.close() + return dict(row) if row else None + + +def job_status_counts() -> dict: + """Return counts grouped by status (for /queue observability).""" + conn = get_db() + rows = conn.execute( + "SELECT status, COUNT(*) AS n FROM jobs GROUP BY status" + ).fetchall() + conn.close() + counts = {"queued": 0, "running": 0, "done": 0, "failed": 0} + for r in rows: + counts[r["status"]] = r["n"] + return counts + + +def recent_jobs(limit: int = 10) -> list[dict]: + """Return the most recent jobs (for /queue observability).""" + conn = get_db() + rows = conn.execute( + "SELECT * FROM jobs ORDER BY id DESC LIMIT ?", (limit,) + ).fetchall() + conn.close() + return [dict(r) for r in rows]