Level A — merge/deploy serialization within one repo: reuse the existing ORCH-043/065 merge-lease (no new mechanism); the only new logic is an unconditional pre-merge rebase in check_branch_mergeable — under the held lease, auto_rebase_onto_main is ALWAYS called when premerge_rebase_always (default True), not just when the branch is behind. No-op on an up-to-date branch (rebase keeps HEAD, force-with-lease -> "Everything up-to-date", CI not triggered). Kill-switch off -> ORCH-043 behaviour 1:1. Level B — declarative task dependencies: additive job_deps table (CREATE ... IF NOT EXISTS, no live-DB migration); claim_next_job gate (NOT EXISTS) defers a job whose depends-on tasks are not yet 'done' without occupying a max_concurrency slot; inert on empty job_deps -> zero regression. New leaf src/task_deps.py (never-raise): is_task_ready (fail-open), DFS cycle detection + Blocked/alert, declare/ingest_plane_relations (db source never hits the network on the hot path), snapshot. Telegram waiting-line, /queue observability, reconciler skip + cycle backstop, reaper untouched. Invariants unchanged: STAGE_TRANSITIONS, QG_CHECKS registry (dep gate is a claim_next_job врезка, not a registered QG), DB schema of existing tables, HTTP endpoints; non-self repos remain a no-op on empty deps/scope. Flags: ORCH_PREMERGE_REBASE_ALWAYS, ORCH_TASK_DEPS_ENABLED, ORCH_TASK_DEPS_SOURCE. Docs: docs/architecture/README.md, CLAUDE.md, .env.example, CHANGELOG.md, adr-0015. Tests: tests/test_orch026_*.py (64 tests); full suite 991 green. Refs: ORCH-026 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
883 lines
34 KiB
Python
883 lines
34 KiB
Python
import sqlite3
|
|
import threading
|
|
from .config import settings
|
|
|
|
# ORCH-053 (F-2 anti-dup): process-wide lock guarding the SELECT-exists -> INSERT
|
|
# task-creation claim. The prod topology is a single uvicorn process per DB
|
|
# (staging/prod isolated), with the webhook running in uvicorn's asyncio thread
|
|
# and the reconciler in its own thread of the SAME process -> a threading.Lock
|
|
# covers both sides of the create race without a schema migration. See
|
|
# docs/work-items/ORCH-053/06-adr/ADR-001-stuck-task-reconciler.md §4.
|
|
_CREATE_TASK_LOCK = threading.Lock()
|
|
|
|
|
|
def get_db() -> sqlite3.Connection:
|
|
conn = sqlite3.connect(settings.db_path)
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
|
|
def init_db():
|
|
conn = get_db()
|
|
conn.executescript("""
|
|
CREATE TABLE IF NOT EXISTS events (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp TEXT DEFAULT (datetime('now')),
|
|
source TEXT NOT NULL,
|
|
event_type TEXT NOT NULL,
|
|
payload TEXT NOT NULL,
|
|
processed INTEGER DEFAULT 0
|
|
);
|
|
CREATE TABLE IF NOT EXISTS tasks (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
plane_id TEXT,
|
|
work_item_id TEXT,
|
|
repo TEXT NOT NULL,
|
|
branch TEXT,
|
|
stage TEXT DEFAULT 'created',
|
|
agent_running TEXT,
|
|
created_at TEXT DEFAULT (datetime('now')),
|
|
updated_at TEXT DEFAULT (datetime('now')),
|
|
plane_issue_id TEXT
|
|
);
|
|
CREATE TABLE IF NOT EXISTS agent_runs (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
task_id INTEGER REFERENCES tasks(id),
|
|
agent TEXT NOT NULL,
|
|
started_at TEXT DEFAULT (datetime('now')),
|
|
finished_at TEXT,
|
|
exit_code INTEGER,
|
|
output_path TEXT
|
|
);
|
|
-- ORCH-1 (F-2b): persistent job queue. Webhook handlers enqueue a job and
|
|
-- return immediately; a background worker claims jobs (respecting
|
|
-- max_concurrency), spawns the claude agent, and updates the status.
|
|
-- Restart-safe: running jobs are requeued on startup (queue-recovery).
|
|
CREATE TABLE IF NOT EXISTS jobs (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
agent TEXT NOT NULL,
|
|
repo TEXT NOT NULL,
|
|
task_id INTEGER, -- FK tasks.id (nullable)
|
|
task_content TEXT, -- written to the agent task_file
|
|
status TEXT NOT NULL DEFAULT 'queued', -- queued|running|done|failed
|
|
attempts INTEGER NOT NULL DEFAULT 0,
|
|
max_attempts INTEGER NOT NULL DEFAULT 2,
|
|
run_id INTEGER, -- agent_runs.id once started
|
|
error TEXT, -- last error message
|
|
transient_attempts INTEGER NOT NULL DEFAULT 0, -- ORCH-1 resilience: 429/transient retries
|
|
available_at TEXT, -- ORCH-1 resilience: backoff gate (claim when <= now)
|
|
created_at TEXT DEFAULT (datetime('now')),
|
|
started_at TEXT,
|
|
finished_at TEXT
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status, id);
|
|
""")
|
|
# Lightweight migration: add resilience columns to a pre-existing jobs table
|
|
# (CREATE TABLE IF NOT EXISTS won't add columns to an already-created table).
|
|
_ensure_column(conn, "jobs", "transient_attempts", "INTEGER NOT NULL DEFAULT 0")
|
|
_ensure_column(conn, "jobs", "available_at", "TEXT")
|
|
# ORCH-065: pid of the spawned agent process, stamped in launcher._spawn next to
|
|
# run_id/started_at. The job-reaper uses it for Tier-1 liveness (os.kill(pid, 0))
|
|
# to detect a 'running' job whose process died before _finalize_job. Idempotent
|
|
# ALTER (no-op once present) -> safe on the live prod DB.
|
|
_ensure_column(conn, "jobs", "pid", "INTEGER")
|
|
# ORCH-5 (M-7): webhook delivery de-dup. Add events.delivery_id and a PARTIAL
|
|
# unique index. Partial (WHERE delivery_id IS NOT NULL) so pre-existing rows
|
|
# (which have NULL delivery_id) never collide with each other. Restart-safe:
|
|
# _ensure_column is a no-op once the column exists, and CREATE INDEX IF NOT
|
|
# EXISTS is a no-op once the index exists, so this is safe on the live prod DB.
|
|
_ensure_column(conn, "events", "delivery_id", "TEXT")
|
|
conn.execute(
|
|
"CREATE UNIQUE INDEX IF NOT EXISTS idx_events_delivery "
|
|
"ON events(delivery_id) WHERE delivery_id IS NOT NULL"
|
|
)
|
|
# Feature 4 (token usage): per-run token / cost accounting. Parsed from the
|
|
# claude --output-format json result by the launcher monitor. Idempotent
|
|
# ALTERs (no-op once the columns exist) so this is safe on the live prod DB.
|
|
_ensure_column(conn, "agent_runs", "input_tokens", "INTEGER")
|
|
_ensure_column(conn, "agent_runs", "output_tokens", "INTEGER")
|
|
_ensure_column(conn, "agent_runs", "cache_read_tokens", "INTEGER")
|
|
# Observability fix: also persist cache-CREATION input tokens. Claude CLI
|
|
# reports the real input split across input_tokens (fresh, ~tens) +
|
|
# cache_read_input_tokens (cache hit, millions) + cache_creation_input_tokens
|
|
# (writing new cache). Without this column the cache_creation slice is lost
|
|
# and the "X in" figure understates the true prompt size. Idempotent ALTER.
|
|
_ensure_column(conn, "agent_runs", "cache_creation_tokens", "INTEGER")
|
|
_ensure_column(conn, "agent_runs", "cost_usd", "REAL")
|
|
# Telegram live tracker (feat/telegram-live-tracker): persist the FULL model
|
|
# name (e.g. "tokenator/claude-opus-4-8") per agent_runs row so the tracker
|
|
# can render a short model tag per stage. Parsed from the run-log result JSON
|
|
# (modelUsage key) by the launcher monitor; NULL when unknown. Idempotent ALTER.
|
|
_ensure_column(conn, "agent_runs", "model", "TEXT")
|
|
# Telegram live tracker: one editable Telegram message per task. We store its
|
|
# message_id so each stage transition can editMessageText the same message
|
|
# instead of spamming a new one. Idempotent ALTER (safe on the live prod DB).
|
|
_ensure_column(conn, "tasks", "tracker_message_id", "INTEGER")
|
|
# Telegram live tracker: human-readable task title for the tracker header
|
|
# ("🛠️ ET-012 · <title>"). Populated from the Plane work-item name at task
|
|
# creation; falls back to the work_item_id when absent. Idempotent ALTER.
|
|
_ensure_column(conn, "tasks", "title", "TEXT")
|
|
# Telegram live tracker: "BRD review" is the only HUMAN gate time — the delta
|
|
# between "BRD ready / approve requested" and the analysis->architecture
|
|
# advance (human flipped Plane to Approved). Persisted on the task so the
|
|
# tracker can show "твоё время" without recomputing from activity history.
|
|
_ensure_column(conn, "tasks", "brd_review_started_at", "TEXT")
|
|
_ensure_column(conn, "tasks", "brd_review_ended_at", "TEXT")
|
|
# ORCH-026 (Level B): declarative task dependencies. job_deps stores the
|
|
# directed edge "task_id (B) is blocked-by depends_on_task_id (A)". The
|
|
# scheduler gate in claim_next_job keeps B queued until every A reaches
|
|
# tasks.stage='done'. Purely ADDITIVE (CREATE TABLE/INDEX IF NOT EXISTS, no
|
|
# change to jobs/tasks/agent_runs/events columns) -> idempotent and safe on
|
|
# the live shared prod DB (enduro-trails data untouched). The logical FK on
|
|
# tasks.id is intentional (no REFERENCES, mirrors jobs.task_id) so the
|
|
# migration cannot fail on a pre-existing DB. See 08-data-requirements.md.
|
|
conn.executescript("""
|
|
CREATE TABLE IF NOT EXISTS job_deps (
|
|
task_id INTEGER NOT NULL,
|
|
depends_on_task_id INTEGER NOT NULL,
|
|
created_at TEXT DEFAULT (datetime('now')),
|
|
PRIMARY KEY (task_id, depends_on_task_id)
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_job_deps_task ON job_deps(task_id);
|
|
CREATE INDEX IF NOT EXISTS idx_job_deps_depends ON job_deps(depends_on_task_id);
|
|
""")
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def _ensure_column(conn, table: str, column: str, decl: str):
|
|
"""Add a column to `table` if it does not already exist (idempotent migration)."""
|
|
cols = [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()]
|
|
if column not in cols:
|
|
conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}")
|
|
conn.commit()
|
|
|
|
|
|
def get_task_by_plane_id(plane_id: str) -> dict | None:
|
|
"""Find task by Plane work item ID (checks plane_id and plane_issue_id)."""
|
|
conn = get_db()
|
|
row = conn.execute(
|
|
"SELECT * FROM tasks WHERE plane_id = ? OR plane_issue_id = ?", (plane_id, plane_id)
|
|
).fetchone()
|
|
conn.close()
|
|
if row:
|
|
return dict(row)
|
|
return None
|
|
|
|
|
|
def get_task_by_repo_branch(repo: str, branch: str) -> dict | None:
|
|
"""Find task by repo and branch name."""
|
|
conn = get_db()
|
|
row = conn.execute(
|
|
"SELECT * FROM tasks WHERE repo = ? AND branch = ?", (repo, branch)
|
|
).fetchone()
|
|
conn.close()
|
|
if row:
|
|
return dict(row)
|
|
return None
|
|
|
|
|
|
def get_active_tasks_for_reconcile() -> list[dict]:
|
|
"""ORCH-053 (F-1): tasks eligible for the gate-side sweeper.
|
|
|
|
Returns every task whose stage is not terminal ('done'), each augmented with
|
|
``age_s`` = seconds since ``tasks.updated_at`` (computed in SQL against UTC
|
|
'now', matching how ``update_task_stage`` stamps ``updated_at``). The
|
|
reconciler applies the per-stage grace and active-job guard on top.
|
|
"""
|
|
conn = get_db()
|
|
try:
|
|
rows = conn.execute(
|
|
"SELECT *, "
|
|
"CAST(strftime('%s','now') - strftime('%s', updated_at) AS INTEGER) AS age_s "
|
|
"FROM tasks WHERE stage != 'done'"
|
|
).fetchall()
|
|
finally:
|
|
conn.close()
|
|
return [dict(r) for r in rows]
|
|
|
|
|
|
def get_development_tasks_by_repo(repo: str) -> list[dict]:
|
|
"""ORCH-053 (F-3): tasks of a repo currently on the 'development' stage.
|
|
|
|
Used as the sha->branch DB fallback in handle_ci_status: a CI-status webhook
|
|
whose branch could not be resolved (no branches[], empty
|
|
``git branch -r --contains``) is matched to the unique development task of
|
|
the repo (ambiguity -> caller leaves it unresolved).
|
|
"""
|
|
conn = get_db()
|
|
try:
|
|
rows = conn.execute(
|
|
"SELECT * FROM tasks WHERE repo = ? AND stage = 'development'", (repo,)
|
|
).fetchall()
|
|
finally:
|
|
conn.close()
|
|
return [dict(r) for r in rows]
|
|
|
|
|
|
def create_task_atomic(
|
|
plane_id: str,
|
|
work_item_id: str,
|
|
repo: str,
|
|
branch: str,
|
|
stage: str,
|
|
title: str,
|
|
) -> tuple[dict, bool]:
|
|
"""ORCH-053 (AC-4): atomically claim creation of a task for a plane_id.
|
|
|
|
Performs SELECT-exists -> INSERT under the process-wide ``_CREATE_TASK_LOCK``
|
|
so a race between the live Plane webhook and the F-2 reconciler (both seeing
|
|
"no task yet" for the same plane_id) cannot create two task rows / branches /
|
|
worktrees / starter analyst jobs.
|
|
|
|
Returns ``(row, created)``:
|
|
* ``created=True`` -> THIS caller inserted the row and owns the follow-up
|
|
work (branch / docs / analyst enqueue);
|
|
* ``created=False`` -> a task for this plane_id already existed (the other
|
|
racer won); ``row`` is the existing task and the caller must NOT duplicate
|
|
the follow-up work.
|
|
"""
|
|
with _CREATE_TASK_LOCK:
|
|
conn = get_db()
|
|
try:
|
|
existing = conn.execute(
|
|
"SELECT * FROM tasks WHERE plane_id = ? OR plane_issue_id = ?",
|
|
(plane_id, plane_id),
|
|
).fetchone()
|
|
if existing:
|
|
return dict(existing), False
|
|
cur = conn.execute(
|
|
"INSERT INTO tasks "
|
|
"(plane_id, work_item_id, repo, branch, stage, plane_issue_id, title) "
|
|
"VALUES (?, ?, ?, ?, ?, ?, ?)",
|
|
(plane_id, work_item_id, repo, branch, stage, plane_id, title),
|
|
)
|
|
conn.commit()
|
|
row = conn.execute(
|
|
"SELECT * FROM tasks WHERE id = ?", (cur.lastrowid,)
|
|
).fetchone()
|
|
return dict(row), True
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def update_task_stage(task_id: int, stage: str):
|
|
"""Update task stage and timestamp."""
|
|
conn = get_db()
|
|
conn.execute(
|
|
"UPDATE tasks SET stage = ?, updated_at = datetime('now') WHERE id = ?",
|
|
(stage, task_id),
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Telegram live tracker helpers (feat/telegram-live-tracker)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_tracker_message_id(task_id: int) -> int | None:
|
|
"""Return the stored Telegram tracker message_id for a task, or None."""
|
|
conn = get_db()
|
|
try:
|
|
row = conn.execute(
|
|
"SELECT tracker_message_id FROM tasks WHERE id=?", (task_id,)
|
|
).fetchone()
|
|
finally:
|
|
conn.close()
|
|
return row[0] if row and row[0] is not None else None
|
|
|
|
|
|
def set_tracker_message_id(task_id: int, message_id: int) -> None:
|
|
"""Persist the Telegram tracker message_id for a task (idempotent overwrite)."""
|
|
conn = get_db()
|
|
try:
|
|
conn.execute(
|
|
"UPDATE tasks SET tracker_message_id=? WHERE id=?",
|
|
(message_id, task_id),
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def mark_brd_review_started(task_id: int) -> None:
|
|
"""Stamp when BRD review (the human approve gate) started, if not already set.
|
|
|
|
Idempotent: only sets it the first time (a retried analyst run must not reset
|
|
the clock). The delta to brd_review_ended_at is the only "твоё время".
|
|
"""
|
|
conn = get_db()
|
|
try:
|
|
conn.execute(
|
|
"UPDATE tasks SET brd_review_started_at=datetime('now') "
|
|
"WHERE id=? AND brd_review_started_at IS NULL",
|
|
(task_id,),
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def mark_brd_review_ended(task_id: int) -> None:
|
|
"""Stamp when BRD review ended (analysis->architecture advance / Approved).
|
|
|
|
Idempotent: only sets it the first time and only if a start exists.
|
|
"""
|
|
conn = get_db()
|
|
try:
|
|
conn.execute(
|
|
"UPDATE tasks SET brd_review_ended_at=datetime('now') "
|
|
"WHERE id=? AND brd_review_started_at IS NOT NULL "
|
|
"AND brd_review_ended_at IS NULL",
|
|
(task_id,),
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_next_work_item_id(repo: str, prefix: str = "ET") -> str:
|
|
"""Generate next work item ID (e.g., ET-003 / ORCH-001).
|
|
|
|
ORCH-6: numbering is per (repo, prefix). The prefix comes from the project
|
|
registry (proj.work_item_prefix), so orchestrator issues number ORCH-001,
|
|
ORCH-002 independently of the ET sequence in enduro-trails. Default prefix
|
|
stays "ET" for backward compatibility with existing callers.
|
|
"""
|
|
conn = get_db()
|
|
row = conn.execute(
|
|
"SELECT work_item_id FROM tasks "
|
|
"WHERE repo = ? AND work_item_id LIKE ? AND work_item_id IS NOT NULL "
|
|
"ORDER BY id DESC LIMIT 1",
|
|
(repo, f"{prefix}-%"),
|
|
).fetchone()
|
|
conn.close()
|
|
|
|
if row and row["work_item_id"]:
|
|
# Parse <PREFIX>-003 -> 3, increment (keep the existing prefix).
|
|
existing_prefix, num = row["work_item_id"].rsplit("-", 1)
|
|
prefix = existing_prefix
|
|
next_num = int(num) + 1
|
|
else:
|
|
next_num = 1
|
|
|
|
return f"{prefix}-{next_num:03d}"
|
|
|
|
|
|
def ensure_unique_work_item_id(work_item_id: str, repo: str) -> str:
|
|
"""BUG 2a: guarantee work_item_id uniqueness within (repo) over M-6 derive.
|
|
|
|
M-6 derives the work_item_id from the Plane sequence_id. That number can
|
|
collide (e.g. an issue was deleted and the sequence reused, or two issues
|
|
map to the same number) -> the SAME ET-NNN gets handed to two different
|
|
tasks, which then physically share a branch/worktree slug prefix and step on
|
|
each other (see ET-006: task 8 and task 25).
|
|
|
|
This is a guard LAYERED ON TOP of the M-6 derive (it does NOT replace it):
|
|
given the derived id, if that exact <PREFIX>-NNN already exists in the tasks
|
|
table for this repo, walk forward (ET-007, ET-008, ...) until a free number
|
|
is found and return that instead. If the derived id is free, it is returned
|
|
unchanged.
|
|
"""
|
|
if not work_item_id or "-" not in work_item_id:
|
|
return work_item_id
|
|
prefix, num_str = work_item_id.rsplit("-", 1)
|
|
try:
|
|
num = int(num_str)
|
|
except ValueError:
|
|
return work_item_id
|
|
width = len(num_str)
|
|
|
|
conn = get_db()
|
|
try:
|
|
candidate = work_item_id
|
|
while conn.execute(
|
|
"SELECT 1 FROM tasks WHERE repo = ? AND work_item_id = ? LIMIT 1",
|
|
(repo, candidate),
|
|
).fetchone() is not None:
|
|
num += 1
|
|
candidate = f"{prefix}-{num:0{width}d}"
|
|
return candidate
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ORCH-5 (M-7): idempotent webhook event logging
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def insert_event_dedup(
|
|
source: str, event_type: str, payload: str, delivery_id: str
|
|
) -> bool:
|
|
"""Idempotently log a webhook event keyed by delivery_id.
|
|
|
|
Returns True if a NEW row was inserted (caller should dispatch the event) and
|
|
False if this delivery_id was already present (a duplicate delivery -> caller
|
|
must skip dispatch/enqueue). Uses INSERT OR IGNORE against the partial UNIQUE
|
|
index idx_events_delivery; rowcount==1 means the row was actually inserted.
|
|
"""
|
|
conn = get_db()
|
|
try:
|
|
cur = conn.execute(
|
|
"INSERT OR IGNORE INTO events (source, event_type, payload, delivery_id) "
|
|
"VALUES (?, ?, ?, ?)",
|
|
(source, event_type, payload, delivery_id),
|
|
)
|
|
conn.commit()
|
|
return cur.rowcount == 1
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ORCH-1 (F-2b): job queue helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def enqueue_job(
|
|
agent: str,
|
|
repo: str,
|
|
task_content: str | None = None,
|
|
task_id: int | None = None,
|
|
max_attempts: int = 2,
|
|
available_at_delay_s: int | None = None,
|
|
) -> int:
|
|
"""Enqueue a new job (status='queued'). Returns the new job id.
|
|
|
|
This is what webhook handlers call instead of launching an agent in-process:
|
|
it is a fast DB INSERT that returns immediately. The background worker
|
|
(queue_worker) picks the job up later.
|
|
|
|
ORCH-043 (merge-gate defer): when ``available_at_delay_s`` is given the job's
|
|
``available_at`` is set to ``now + delay`` so claim_next_job won't pick it up
|
|
until the delay elapses (re-uses the existing ORCH-1 backoff gate). Used to
|
|
re-queue the staging-deployer after a "merge-lock busy" defer without burning a
|
|
worker slot in a blocking wait.
|
|
"""
|
|
conn = get_db()
|
|
if available_at_delay_s is not None:
|
|
cursor = conn.execute(
|
|
"INSERT INTO jobs (agent, repo, task_id, task_content, max_attempts, available_at) "
|
|
"VALUES (?, ?, ?, ?, ?, datetime('now', ?))",
|
|
(agent, repo, task_id, task_content, max_attempts,
|
|
f"+{int(available_at_delay_s)} seconds"),
|
|
)
|
|
else:
|
|
cursor = conn.execute(
|
|
"INSERT INTO jobs (agent, repo, task_id, task_content, max_attempts) "
|
|
"VALUES (?, ?, ?, ?, ?)",
|
|
(agent, repo, task_id, task_content, max_attempts),
|
|
)
|
|
job_id = cursor.lastrowid
|
|
conn.commit()
|
|
conn.close()
|
|
return job_id
|
|
|
|
|
|
def claim_next_job() -> dict | None:
|
|
"""Atomically claim the oldest queued job and mark it 'running'.
|
|
|
|
Atomicity: the UPDATE carries the `status='queued'` guard in its WHERE clause
|
|
and we check `rowcount`. If two worker ticks race for the same row, only the
|
|
first UPDATE flips it to 'running' (rowcount==1); the loser sees rowcount==0
|
|
and retries the SELECT. We rely on SQLite's default per-connection transaction
|
|
so the SELECT+UPDATE pair is consistent. Returns the claimed job dict or None
|
|
when the queue is empty.
|
|
"""
|
|
# ORCH-026 (Level B, B-2): scheduler dependency gate. When task_deps_enabled
|
|
# is on, a job whose task has an UNFINISHED declared dependency
|
|
# (job_deps.depends_on_task_id -> a task with stage != 'done') is NOT
|
|
# claimable -> it stays 'queued' without occupying a max_concurrency slot.
|
|
# Jobs with a NULL task_id (no task) or with no job_deps rows are unaffected
|
|
# (NOT EXISTS is True). Kill-switch off -> the clause is omitted -> 1:1 the
|
|
# ORCH-1 query. The gate reads only the DB (offline-safe hot path).
|
|
dep_gate = ""
|
|
if getattr(settings, "task_deps_enabled", False):
|
|
dep_gate = (
|
|
"AND NOT EXISTS ("
|
|
" SELECT 1 FROM job_deps d JOIN tasks t ON t.id = d.depends_on_task_id "
|
|
" WHERE d.task_id = jobs.task_id AND t.stage != 'done'"
|
|
") "
|
|
)
|
|
conn = get_db()
|
|
try:
|
|
while True:
|
|
row = conn.execute(
|
|
"SELECT id FROM jobs WHERE status='queued' "
|
|
"AND (available_at IS NULL OR available_at <= datetime('now')) "
|
|
f"{dep_gate}"
|
|
"ORDER BY id LIMIT 1"
|
|
).fetchone()
|
|
if not row:
|
|
return None
|
|
job_id = row["id"]
|
|
cur = conn.execute(
|
|
"UPDATE jobs SET status='running', "
|
|
"attempts = attempts + 1, started_at = datetime('now') "
|
|
"WHERE id = ? AND status='queued'",
|
|
(job_id,),
|
|
)
|
|
conn.commit()
|
|
if cur.rowcount == 1:
|
|
claimed = conn.execute(
|
|
"SELECT * FROM jobs WHERE id = ?", (job_id,)
|
|
).fetchone()
|
|
return dict(claimed)
|
|
# Lost the race for this row; loop and try the next queued job.
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def mark_job_transient(job_id: int, available_at_sql_offset_seconds: int,
|
|
error: str | None = None) -> None:
|
|
"""ORCH-1 resilience: requeue a job after a *transient* failure (429/overload/net).
|
|
|
|
Increments `transient_attempts` (separate from the code-fault `attempts`),
|
|
sets status back to 'queued', and gates re-pickup via `available_at` =
|
|
now + backoff seconds. started_at/finished_at are cleared.
|
|
"""
|
|
conn = get_db()
|
|
sets = [
|
|
"status='queued'",
|
|
"transient_attempts = transient_attempts + 1",
|
|
"available_at = datetime('now', ?)",
|
|
"started_at = NULL",
|
|
"finished_at = NULL",
|
|
]
|
|
params: list = [f"+{int(available_at_sql_offset_seconds)} seconds"]
|
|
if error is not None:
|
|
sets.append("error = ?")
|
|
params.append(error)
|
|
params.append(job_id)
|
|
conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def mark_job(
|
|
job_id: int,
|
|
status: str,
|
|
run_id: int | None = None,
|
|
error: str | None = None,
|
|
):
|
|
"""Update a job's status (queued|running|done|failed).
|
|
|
|
- run_id (optional): link to the agent_runs row that executed this job.
|
|
- error (optional): last error message (for failed/retry).
|
|
- 'done'/'failed' also stamp finished_at.
|
|
- 'queued' (requeue for retry) clears started_at/finished_at so the next
|
|
claim treats it as fresh.
|
|
"""
|
|
conn = get_db()
|
|
sets = ["status = ?"]
|
|
params: list = [status]
|
|
if run_id is not None:
|
|
sets.append("run_id = ?")
|
|
params.append(run_id)
|
|
if error is not None:
|
|
sets.append("error = ?")
|
|
params.append(error)
|
|
if status in ("done", "failed"):
|
|
sets.append("finished_at = datetime('now')")
|
|
elif status == "queued":
|
|
sets.append("started_at = NULL")
|
|
sets.append("finished_at = NULL")
|
|
params.append(job_id)
|
|
conn.execute(f"UPDATE jobs SET {', '.join(sets)} WHERE id = ?", params)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def has_active_job_for_task(task_id: int) -> bool:
|
|
"""True if the task already has a queued or running job.
|
|
|
|
Used by the status-only verdict model (handle_status_start) to guard against
|
|
double-launching an agent when a duplicate In Progress webhook arrives or a
|
|
job is still in flight. The events de-dup absorbs identical webhook bodies;
|
|
this guards against distinct webhooks while a job is pending/running.
|
|
"""
|
|
conn = get_db()
|
|
row = conn.execute(
|
|
"SELECT 1 FROM jobs WHERE task_id = ? AND status IN ('queued','running') LIMIT 1",
|
|
(task_id,),
|
|
).fetchone()
|
|
conn.close()
|
|
return row is not None
|
|
|
|
|
|
def count_running_jobs() -> int:
|
|
"""Number of jobs currently in 'running' status (for max_concurrency)."""
|
|
conn = get_db()
|
|
n = conn.execute(
|
|
"SELECT COUNT(*) FROM jobs WHERE status='running'"
|
|
).fetchone()[0]
|
|
conn.close()
|
|
return int(n)
|
|
|
|
|
|
def requeue_running_jobs() -> int:
|
|
"""Queue-recovery: on startup, any job left 'running' belongs to a worker that
|
|
died on restart -> put it back to 'queued'. attempts are kept as-is (the next
|
|
claim does NOT re-increment beyond what is needed; claim_next_job increments on
|
|
pickup). Returns the number of requeued jobs.
|
|
"""
|
|
conn = get_db()
|
|
cur = conn.execute(
|
|
"UPDATE jobs SET status='queued', started_at = NULL "
|
|
"WHERE status='running'"
|
|
)
|
|
conn.commit()
|
|
n = cur.rowcount
|
|
conn.close()
|
|
return int(n)
|
|
|
|
|
|
def get_running_jobs() -> list[dict]:
|
|
"""ORCH-065: snapshot of every 'running' job for the job-reaper scan.
|
|
|
|
Each row carries the job columns plus four reaper inputs:
|
|
* ``running_age_s`` — seconds since ``started_at`` (Tier-3 backstop);
|
|
* ``exit_code`` — the linked ``agent_runs.exit_code`` (Tier-2: process
|
|
finished but the job is still 'running' -> monitor died mid-finalize);
|
|
* ``finished_at_run`` — the linked ``agent_runs.finished_at``;
|
|
* ``finished_age_s`` — seconds since ``agent_runs.finished_at`` (Tier-2
|
|
finalization grace: a LIVE monitor writes exit_code, THEN does git
|
|
push / PR / Plane comments before _finalize_job, so a freshly-finished
|
|
run is NOT yet a zombie — the reaper waits ``reaper_finalize_grace_s``).
|
|
|
|
A LEFT JOIN on ``run_id`` keeps jobs with no agent_runs row (exit_code NULL).
|
|
Read-only; never mutates. The reaper applies liveness/streak/backstop on top.
|
|
"""
|
|
conn = get_db()
|
|
try:
|
|
rows = conn.execute(
|
|
"SELECT j.*, "
|
|
"CAST(strftime('%s','now') - strftime('%s', j.started_at) AS INTEGER) "
|
|
" AS running_age_s, "
|
|
"r.exit_code AS exit_code, r.finished_at AS finished_at_run, "
|
|
"CAST(strftime('%s','now') - strftime('%s', r.finished_at) AS INTEGER) "
|
|
" AS finished_age_s "
|
|
"FROM jobs j LEFT JOIN agent_runs r ON r.id = j.run_id "
|
|
"WHERE j.status='running'"
|
|
).fetchall()
|
|
finally:
|
|
conn.close()
|
|
return [dict(r) for r in rows]
|
|
|
|
|
|
def reap_running_job(
|
|
job_id: int,
|
|
status: str,
|
|
run_id: int | None = None,
|
|
error: str | None = None,
|
|
) -> bool:
|
|
"""ORCH-065: atomic terminal flip of a RUNNING job by the job-reaper.
|
|
|
|
Mirrors ``mark_job`` but carries the ``status='running'`` guard in the WHERE
|
|
clause and reports ``rowcount`` so a late-arriving monitor / the startup
|
|
``requeue_running_jobs`` / a second reaper tick can never double-process the
|
|
same row (AC-5, restart-safe). Returns True iff THIS call won the flip
|
|
(rowcount == 1); False -> someone else already moved the row.
|
|
|
|
Status semantics match ``mark_job``: done/failed stamp ``finished_at``; queued
|
|
clears ``started_at``/``finished_at`` so the next claim treats it as fresh.
|
|
"""
|
|
conn = get_db()
|
|
try:
|
|
sets = ["status = ?"]
|
|
params: list = [status]
|
|
if run_id is not None:
|
|
sets.append("run_id = ?")
|
|
params.append(run_id)
|
|
if error is not None:
|
|
sets.append("error = ?")
|
|
params.append(error)
|
|
if status in ("done", "failed"):
|
|
sets.append("finished_at = datetime('now')")
|
|
elif status == "queued":
|
|
sets.append("started_at = NULL")
|
|
sets.append("finished_at = NULL")
|
|
params.append(job_id)
|
|
cur = conn.execute(
|
|
f"UPDATE jobs SET {', '.join(sets)} WHERE id = ? AND status='running'",
|
|
params,
|
|
)
|
|
conn.commit()
|
|
return cur.rowcount == 1
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_job(job_id: int) -> dict | None:
|
|
"""Fetch a single job by id."""
|
|
conn = get_db()
|
|
row = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)).fetchone()
|
|
conn.close()
|
|
return dict(row) if row else None
|
|
|
|
|
|
def job_status_counts() -> dict:
|
|
"""Return counts grouped by status (for /queue observability)."""
|
|
conn = get_db()
|
|
rows = conn.execute(
|
|
"SELECT status, COUNT(*) AS n FROM jobs GROUP BY status"
|
|
).fetchall()
|
|
conn.close()
|
|
counts = {"queued": 0, "running": 0, "done": 0, "failed": 0}
|
|
for r in rows:
|
|
counts[r["status"]] = r["n"]
|
|
return counts
|
|
|
|
|
|
def recent_jobs(limit: int = 10) -> list[dict]:
|
|
"""Return the most recent jobs (for /queue observability)."""
|
|
conn = get_db()
|
|
rows = conn.execute(
|
|
"SELECT * FROM jobs ORDER BY id DESC LIMIT ?", (limit,)
|
|
).fetchall()
|
|
conn.close()
|
|
return [dict(r) for r in rows]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ORCH-026 (Level B): declarative task-dependency helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def add_dependency(task_id: int, depends_on_task_id: int) -> bool:
|
|
"""Declare that task ``task_id`` (B) is blocked-by ``depends_on_task_id`` (A).
|
|
|
|
Idempotent INSERT OR IGNORE against the job_deps PK (re-declaring the same
|
|
edge is a no-op). A self-edge (task depends on itself) is rejected — it would
|
|
deadlock the task forever and can never be satisfied. never-raise
|
|
(self-hosting safety, AC-G1): any DB error -> returns False, the caller must
|
|
not crash the webhook / worker. Returns True iff a NEW edge row was inserted.
|
|
"""
|
|
if task_id is None or depends_on_task_id is None:
|
|
return False
|
|
if task_id == depends_on_task_id:
|
|
return False
|
|
try:
|
|
conn = get_db()
|
|
try:
|
|
cur = conn.execute(
|
|
"INSERT OR IGNORE INTO job_deps (task_id, depends_on_task_id) "
|
|
"VALUES (?, ?)",
|
|
(task_id, depends_on_task_id),
|
|
)
|
|
conn.commit()
|
|
return cur.rowcount == 1
|
|
finally:
|
|
conn.close()
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def get_dependencies(task_id: int) -> list[int]:
|
|
"""Return the list of depends_on_task_id (A) that ``task_id`` (B) waits for.
|
|
|
|
never-raise: any DB error -> [] (conservative: caller treats the task as
|
|
having no declared dependency rather than crashing).
|
|
"""
|
|
try:
|
|
conn = get_db()
|
|
try:
|
|
rows = conn.execute(
|
|
"SELECT depends_on_task_id FROM job_deps WHERE task_id = ?",
|
|
(task_id,),
|
|
).fetchall()
|
|
finally:
|
|
conn.close()
|
|
return [r[0] for r in rows]
|
|
except Exception:
|
|
return []
|
|
|
|
|
|
def get_dependency_edges() -> list[tuple[int, int]]:
|
|
"""Return ALL declared edges as ``(task_id, depends_on_task_id)`` tuples.
|
|
|
|
Used by the cycle detector (DFS over the whole declared graph) and the
|
|
/queue snapshot. never-raise -> [] on any DB error.
|
|
"""
|
|
try:
|
|
conn = get_db()
|
|
try:
|
|
rows = conn.execute(
|
|
"SELECT task_id, depends_on_task_id FROM job_deps"
|
|
).fetchall()
|
|
finally:
|
|
conn.close()
|
|
return [(r[0], r[1]) for r in rows]
|
|
except Exception:
|
|
return []
|
|
|
|
|
|
def get_unfinished_dependencies(task_id: int) -> list[dict]:
|
|
"""Return the UNFINISHED dependencies of ``task_id`` (A's not yet 'done').
|
|
|
|
Each dict carries the predecessor's ``id``, ``work_item_id`` and ``stage``
|
|
so the readiness gate / Telegram waiting-line can name what B is waiting for.
|
|
never-raise -> [] on any DB error (treated as "ready", consistent with the
|
|
scheduler omitting the gate on failure).
|
|
"""
|
|
try:
|
|
conn = get_db()
|
|
try:
|
|
rows = conn.execute(
|
|
"SELECT t.id AS id, t.work_item_id AS work_item_id, t.stage AS stage "
|
|
"FROM job_deps d JOIN tasks t ON t.id = d.depends_on_task_id "
|
|
"WHERE d.task_id = ? AND t.stage != 'done'",
|
|
(task_id,),
|
|
).fetchall()
|
|
finally:
|
|
conn.close()
|
|
return [dict(r) for r in rows]
|
|
except Exception:
|
|
return []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ORCH-1b (resilience): transient backoff helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def requeue_job_transient(job_id: int, delay_seconds: float, error: str | None = None):
|
|
"""ORCH-1b: requeue a job after a TRANSIENT (429/overload/network) failure.
|
|
|
|
Unlike a code-fault requeue, this:
|
|
- increments `transient_attempts` (a separate budget from code-fault attempts)
|
|
- sets `available_at = now + delay_seconds` so claim_next_job won't pick it
|
|
up until the backoff window elapses
|
|
- sets status back to 'queued' and clears started_at/finished_at
|
|
|
|
delay_seconds is computed by the caller (exp backoff, capped, Retry-After).
|
|
"""
|
|
conn = get_db()
|
|
conn.execute(
|
|
"UPDATE jobs SET status='queued', "
|
|
"transient_attempts = transient_attempts + 1, "
|
|
"available_at = datetime('now', ? || ' seconds'), "
|
|
"started_at = NULL, finished_at = NULL, "
|
|
"error = COALESCE(?, error) "
|
|
"WHERE id = ?",
|
|
(f"+{int(round(delay_seconds))}", error, job_id),
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def compute_backoff(transient_attempts: int, retry_after: float | None = None) -> float:
|
|
"""ORCH-1b: exponential backoff (seconds) for a transient failure.
|
|
|
|
delay = min(2**transient_attempts * base, max). If the server sent a
|
|
Retry-After hint we honour it as a floor (use the larger of the two so we
|
|
never poll sooner than the server asked).
|
|
|
|
`transient_attempts` is the count AFTER this failure (i.e. how many transient
|
|
failures have occurred), so the first backoff uses 2**1.
|
|
"""
|
|
base = getattr(settings, "backoff_base_seconds", 10)
|
|
cap = getattr(settings, "backoff_max_seconds", 600)
|
|
exp = min((2 ** max(transient_attempts, 0)) * base, cap)
|
|
if retry_after is not None and retry_after > 0:
|
|
return float(min(max(exp, retry_after), cap))
|
|
return float(exp)
|