feat(queue): add jobs table + queue helpers and config (ORCH-1)
Persistent SQLite job queue (F-2b): jobs table + idx, atomic claim_next_job, enqueue/mark/count/requeue/get helpers. New settings max_concurrency (ORCH_MAX_CONCURRENCY) and queue_poll_interval (ORCH_QUEUE_POLL_INTERVAL).
This commit is contained in:
@@ -30,6 +30,12 @@ class Settings(BaseSettings):
|
||||
# DB
|
||||
db_path: str = "/app/data/orchestrator.db"
|
||||
|
||||
# ORCH-1 (F-2b): persistent job queue / background worker.
|
||||
# max_concurrency -> max agent jobs running in parallel (env ORCH_MAX_CONCURRENCY)
|
||||
# queue_poll_interval -> worker loop poll seconds (env ORCH_QUEUE_POLL_INTERVAL)
|
||||
max_concurrency: int = 1
|
||||
queue_poll_interval: float = 2.0
|
||||
|
||||
|
||||
# Telegram notifications
|
||||
telegram_bot_token: str = ""
|
||||
|
||||
177
src/db.py
177
src/db.py
@@ -40,6 +40,26 @@ def init_db():
|
||||
exit_code INTEGER,
|
||||
output_path TEXT
|
||||
);
|
||||
-- ORCH-1 (F-2b): persistent job queue. Webhook handlers enqueue a job and
|
||||
-- return immediately; a background worker claims jobs (respecting
|
||||
-- max_concurrency), spawns the claude agent, and updates the status.
|
||||
-- Restart-safe: running jobs are requeued on startup (queue-recovery).
|
||||
CREATE TABLE IF NOT EXISTS jobs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
agent TEXT NOT NULL,
|
||||
repo TEXT NOT NULL,
|
||||
task_id INTEGER, -- FK tasks.id (nullable)
|
||||
task_content TEXT, -- written to the agent task_file
|
||||
status TEXT NOT NULL DEFAULT 'queued', -- queued|running|done|failed
|
||||
attempts INTEGER NOT NULL DEFAULT 0,
|
||||
max_attempts INTEGER NOT NULL DEFAULT 2,
|
||||
run_id INTEGER, -- agent_runs.id once started
|
||||
error TEXT, -- last error message
|
||||
created_at TEXT DEFAULT (datetime('now')),
|
||||
started_at TEXT,
|
||||
finished_at TEXT
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, id);
|
||||
""")
|
||||
conn.close()
|
||||
|
||||
@@ -105,3 +125,160 @@ def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
|
||||
next_num = 1
|
||||
|
||||
return f"{prefix}-{next_num:03d}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ORCH-1 (F-2b): job queue helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def enqueue_job(
    agent: str,
    repo: str,
    task_content: str | None = None,
    task_id: int | None = None,
    max_attempts: int = 2,
) -> int:
    """Insert a new row into ``jobs`` and return its id.

    This is what webhook handlers call instead of launching an agent
    in-process: it is a single DB INSERT that returns immediately. The
    background worker (queue_worker) picks the job up later. The INSERT does
    not set ``status``, so the row takes the column default.

    Args:
        agent: Value stored in the ``agent`` column.
        repo: Value stored in the ``repo`` column.
        task_content: Optional text stored in the ``task_content`` column.
        task_id: Optional value stored in the ``task_id`` column.
        max_attempts: Value stored in the ``max_attempts`` column.

    Returns:
        The rowid of the newly inserted job.
    """
    conn = get_db()
    try:
        cursor = conn.execute(
            "INSERT INTO jobs (agent, repo, task_id, task_content, max_attempts) "
            "VALUES (?, ?, ?, ?, ?)",
            (agent, repo, task_id, task_content, max_attempts),
        )
        job_id = cursor.lastrowid
        conn.commit()
        return job_id
    finally:
        # Close even when the INSERT or the commit raises, so a failing
        # enqueue does not leak the connection.
        conn.close()
|
||||
|
||||
|
||||
def claim_next_job() -> dict | None:
    """Claim the lowest-id queued job, flip it to 'running', and return it.

    The claim is decided by the UPDATE itself: its WHERE clause repeats the
    ``status='queued'`` condition, and ``rowcount`` tells us whether this call
    changed the row. When the UPDATE touches no row (the candidate was no
    longer queued by the time we tried to take it), we pick a new candidate
    and try again.

    A successful claim increments ``attempts`` and stamps ``started_at``.

    NOTE(review): the transaction mode of the connection depends on how
    get_db() configures it, which is not visible here.

    Returns:
        The claimed job row as a dict (re-read after the UPDATE), or None
        when no queued job exists.
    """
    db = get_db()
    try:
        while candidate := db.execute(
            "SELECT id FROM jobs WHERE status='queued' ORDER BY id LIMIT 1"
        ).fetchone():
            target_id = candidate["id"]
            update = db.execute(
                "UPDATE jobs SET status='running', "
                "attempts = attempts + 1, started_at = datetime('now') "
                "WHERE id = ? AND status='queued'",
                (target_id,),
            )
            db.commit()
            if update.rowcount != 1:
                # The row was not ours to take; look for another candidate.
                continue
            snapshot = db.execute(
                "SELECT * FROM jobs WHERE id = ?", (target_id,)
            ).fetchone()
            return dict(snapshot)
        # No queued rows left.
        return None
    finally:
        db.close()
|
||||
|
||||
|
||||
def mark_job(
    job_id: int,
    status: str,
    run_id: int | None = None,
    error: str | None = None,
) -> None:
    """Update a job's status (queued|running|done|failed).

    Args:
        job_id: Id of the ``jobs`` row to update.
        status: New status value. It is written as given; no validation is
            performed here.
        run_id: When not None, also written to the ``run_id`` column (link
            to the agent_runs row that executed this job).
        error: When not None, also written to the ``error`` column (last
            error message, for failed/retry).

    Side effects on timestamps:
        - 'done' / 'failed' stamp ``finished_at`` with the current time.
        - 'queued' (requeue for retry) clears ``started_at`` and
          ``finished_at`` so the next claim treats the job as fresh.
        - Any other status leaves the timestamps untouched.
    """
    # Column names below are fixed literals; only values travel as bound
    # parameters, so building the SET clause with join() is safe.
    sets = ["status = ?"]
    params: list = [status]
    if run_id is not None:
        sets.append("run_id = ?")
        params.append(run_id)
    if error is not None:
        sets.append("error = ?")
        params.append(error)
    if status in ("done", "failed"):
        sets.append("finished_at = datetime('now')")
    elif status == "queued":
        sets.append("started_at = NULL")
        sets.append("finished_at = NULL")
    params.append(job_id)

    conn = get_db()
    try:
        conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params)
        conn.commit()
    finally:
        # Close even when the UPDATE or the commit raises, so a failing
        # status change does not leak the connection.
        conn.close()
|
||||
|
||||
|
||||
def count_running_jobs() -> int:
    """Return the number of jobs whose status is 'running'.

    Intended for enforcing max_concurrency.
    """
    conn = get_db()
    try:
        n = conn.execute(
            "SELECT COUNT(*) FROM jobs WHERE status='running'"
        ).fetchone()[0]
        return int(n)
    finally:
        # Close even when the query raises, so the connection is not leaked.
        conn.close()
|
||||
|
||||
|
||||
def requeue_running_jobs() -> int:
    """Queue-recovery: move every 'running' job back to 'queued'.

    Meant to be called on startup: any job still marked 'running' at that
    point belongs to a worker that died on restart. ``started_at`` is cleared.
    ``attempts`` is deliberately left untouched, so the interrupted run still
    counts, and claim_next_job increments the counter again on the next pickup.

    Returns:
        The number of jobs that were requeued.
    """
    conn = get_db()
    try:
        cur = conn.execute(
            "UPDATE jobs SET status='queued', started_at = NULL "
            "WHERE status='running'"
        )
        conn.commit()
        return int(cur.rowcount)
    finally:
        # Close even when the UPDATE or the commit raises, so a failed
        # recovery pass does not leak the connection.
        conn.close()
|
||||
|
||||
|
||||
def get_job(job_id: int) -> dict | None:
    """Fetch a single job by id.

    Returns:
        The ``jobs`` row as a dict, or None when no row has that id.
    """
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT * FROM jobs WHERE id = ?", (job_id,)
        ).fetchone()
        return dict(row) if row else None
    finally:
        # Close even when the query raises, so the connection is not leaked.
        conn.close()
|
||||
|
||||
|
||||
def job_status_counts() -> dict:
    """Return job counts grouped by status (for /queue observability).

    Returns:
        A dict mapping status to row count. The keys 'queued', 'running',
        'done' and 'failed' are always present (0 when no job has that
        status); any other status value found in the table is added as an
        extra key.
    """
    conn = get_db()
    try:
        rows = conn.execute(
            "SELECT status, COUNT(*) AS n FROM jobs GROUP BY status"
        ).fetchall()
    finally:
        # Close even when the query raises, so the connection is not leaked.
        conn.close()

    # Seed the known statuses so callers always see all four keys.
    counts = {"queued": 0, "running": 0, "done": 0, "failed": 0}
    for r in rows:
        counts[r["status"]] = r["n"]
    return counts
|
||||
|
||||
|
||||
def recent_jobs(limit: int = 10) -> list[dict]:
    """Return the most recent jobs, newest first (for /queue observability).

    Args:
        limit: Maximum number of rows to return; passed to the SQL LIMIT
            clause as a bound parameter.

    Returns:
        Job rows as dicts, ordered by descending id.
    """
    conn = get_db()
    try:
        rows = conn.execute(
            "SELECT * FROM jobs ORDER BY id DESC LIMIT ?", (limit,)
        ).fetchall()
        return [dict(r) for r in rows]
    finally:
        # Close even when the query raises, so the connection is not leaked.
        conn.close()
|
||||
|
||||
Reference in New Issue
Block a user