Merge remote-tracking branch 'origin/main' into feature/ORCH-036-orch-36-deploy-b
All checks were successful
CI / test (push) Successful in 16s
CI / test (pull_request) Successful in 14s

# Conflicts:
#	.env.example
#	CHANGELOG.md
#	docs/architecture/README.md
#	docs/operations/INFRA.md
#	src/config.py
This commit is contained in:
stream
2026-06-07 00:22:19 +03:00
31 changed files with 2735 additions and 25 deletions

View File

@@ -195,6 +195,27 @@ class Settings(BaseSettings):
deploy_prod_target_image: str = "orchestrator-orchestrator"
deploy_prod_compose_profile: str = ""
deploy_prod_prev_image_file: str = ".deploy-prev-image-prod"
# ORCH-053: stuck-task reconciler (sweeper for lost webhooks). A background
# daemon thread reconciles the "source of truth (gate / Plane) != task stage"
# drift left behind by a dropped webhook (502 on rebuild, no Plane/Gitea
# retries, unresolved sha->branch). See docs/architecture/adr/adr-0007-reconciler.md.
# reconcile_enabled -> global kill-switch (self-hosting safety,
# staged rollout, env ORCH_RECONCILE_ENABLED).
# reconcile_interval_s -> background sweep period (seconds).
# reconcile_plane_enabled -> separate flag for the F-2 Plane-API poll so
# only the plane branch can be muted.
# reconcile_grace_default_s -> default "stuck" threshold on tasks.updated_at.
# reconcile_grace_overrides_json -> JSON object of per-stage thresholds, e.g.
# {"analysis": 1800, "development": 300}. Invalid
# JSON -> default (mirrors agent_timeout_overrides_json).
# reconcile_notify_unblock -> send a Telegram message when a stuck task is
# unblocked (F-4 observability).
reconcile_enabled: bool = True
reconcile_interval_s: int = 120
reconcile_plane_enabled: bool = True
reconcile_grace_default_s: int = 600
reconcile_grace_overrides_json: str = ""
reconcile_notify_unblock: bool = True
# Telegram notifications
telegram_bot_token: str = ""

View File

@@ -1,6 +1,15 @@
import sqlite3
import threading
from .config import settings
# ORCH-053 (F-2 anti-dup): process-wide lock guarding the SELECT-exists -> INSERT
# task-creation claim. The prod topology is a single uvicorn process per DB
# (staging/prod isolated), with the webhook running in uvicorn's asyncio thread
# and the reconciler in its own thread of the SAME process -> a threading.Lock
# covers both sides of the create race without a schema migration. See
# docs/work-items/ORCH-053/06-adr/ADR-001-stuck-task-reconciler.md §4.
_CREATE_TASK_LOCK = threading.Lock()
def get_db() -> sqlite3.Connection:
conn = sqlite3.connect(settings.db_path)
@@ -145,6 +154,90 @@ def get_task_by_repo_branch(repo: str, branch: str) -> dict | None:
return None
def get_active_tasks_for_reconcile() -> list[dict]:
"""ORCH-053 (F-1): tasks eligible for the gate-side sweeper.
Returns every task whose stage is not terminal ('done'), each augmented with
``age_s`` = seconds since ``tasks.updated_at`` (computed in SQL against UTC
'now', matching how ``update_task_stage`` stamps ``updated_at``). The
reconciler applies the per-stage grace and active-job guard on top.
"""
conn = get_db()
try:
rows = conn.execute(
"SELECT *, "
"CAST(strftime('%s','now') - strftime('%s', updated_at) AS INTEGER) AS age_s "
"FROM tasks WHERE stage != 'done'"
).fetchall()
finally:
conn.close()
return [dict(r) for r in rows]
def get_development_tasks_by_repo(repo: str) -> list[dict]:
"""ORCH-053 (F-3): tasks of a repo currently on the 'development' stage.
Used as the sha->branch DB fallback in handle_ci_status: a CI-status webhook
whose branch could not be resolved (no branches[], empty
``git branch -r --contains``) is matched to the unique development task of
the repo (ambiguity -> caller leaves it unresolved).
"""
conn = get_db()
try:
rows = conn.execute(
"SELECT * FROM tasks WHERE repo = ? AND stage = 'development'", (repo,)
).fetchall()
finally:
conn.close()
return [dict(r) for r in rows]
def create_task_atomic(
plane_id: str,
work_item_id: str,
repo: str,
branch: str,
stage: str,
title: str,
) -> tuple[dict, bool]:
"""ORCH-053 (AC-4): atomically claim creation of a task for a plane_id.
Performs SELECT-exists -> INSERT under the process-wide ``_CREATE_TASK_LOCK``
so a race between the live Plane webhook and the F-2 reconciler (both seeing
"no task yet" for the same plane_id) cannot create two task rows / branches /
worktrees / starter analyst jobs.
Returns ``(row, created)``:
* ``created=True`` -> THIS caller inserted the row and owns the follow-up
work (branch / docs / analyst enqueue);
* ``created=False`` -> a task for this plane_id already existed (the other
racer won); ``row`` is the existing task and the caller must NOT duplicate
the follow-up work.
"""
with _CREATE_TASK_LOCK:
conn = get_db()
try:
existing = conn.execute(
"SELECT * FROM tasks WHERE plane_id = ? OR plane_issue_id = ?",
(plane_id, plane_id),
).fetchone()
if existing:
return dict(existing), False
cur = conn.execute(
"INSERT INTO tasks "
"(plane_id, work_item_id, repo, branch, stage, plane_issue_id, title) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(plane_id, work_item_id, repo, branch, stage, plane_id, title),
)
conn.commit()
row = conn.execute(
"SELECT * FROM tasks WHERE id = ?", (cur.lastrowid,)
).fetchone()
return dict(row), True
finally:
conn.close()
def update_task_stage(task_id: int, stage: str):
"""Update task stage and timestamp."""
conn = get_db()

View File

@@ -80,11 +80,19 @@ async def lifespan(app: FastAPI):
from .queue_worker import worker
worker.start()
# ORCH-053: start the stuck-task reconciler AFTER the worker so its active-job
# guard sees a fully-initialised queue. Kill-switch: ORCH_RECONCILE_ENABLED.
from .reconciler import reconciler
reconciler.start()
try:
yield
finally:
# Graceful shutdown of the worker (running agents keep going; their jobs
# are requeued on next start via queue-recovery if the process dies).
# Graceful shutdown order mirrors startup in reverse: stop the reconciler
# first (it must not enqueue new work while the worker is winding down),
# then the worker. Running agents keep going; their jobs are requeued on
# next start via queue-recovery if the process dies.
reconciler.stop()
worker.stop()
@@ -114,10 +122,12 @@ async def queue():
"""ORCH-1: job-queue observability — status counts + recent jobs."""
from .db import job_status_counts, recent_jobs
from .queue_worker import worker
from .reconciler import reconciler
return {
"counts": job_status_counts(),
"max_concurrency": worker.max_concurrency,
"poll_interval": worker.poll_interval,
"resilience": worker.status(),
"reconcile": reconciler.status(),
"recent": recent_jobs(10),
}

View File

@@ -356,6 +356,62 @@ def fetch_issue_fields(issue_id: str, project_id: str) -> tuple[str, str]:
return "", ""
def list_issues_by_state(project_id: str, state_uuids: list[str]) -> list[dict]:
"""ORCH-053 (F-2): list a project's issues whose state is in ``state_uuids``.
GETs ``/workspaces/{ws}/projects/{pid}/issues/`` and walks ALL pages
(Plane's cursor pagination: ``results`` + ``next_cursor`` /
``next_page_results``), keeping only issues whose state uuid is one of the
requested ones. The filter is applied client-side on ``issue.state`` (a dict
``{id,...}`` or a bare uuid string) so it works regardless of whether Plane's
query-param state filter is honoured.
Never raises: on any network / API / shape error it logs a warning and
returns ``[]`` so a Plane outage degrades the F-2 tick softly instead of
crashing it.
"""
if not project_id or not state_uuids:
return []
wanted = set(state_uuids)
out: list[dict] = []
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/issues/"
try:
cursor = None
pages = 0
while True:
params: dict = {"per_page": 100}
if cursor:
params["cursor"] = cursor
resp = httpx.get(url, headers=PLANE_HEADERS, params=params, timeout=10)
resp.raise_for_status()
body = resp.json()
if isinstance(body, dict):
items = body.get("results", [])
else:
items = body if isinstance(body, list) else []
for issue in items:
state = issue.get("state")
sid = state.get("id") if isinstance(state, dict) else state
if sid in wanted:
out.append(issue)
# Pagination: continue only while Plane reports more pages.
pages += 1
if not isinstance(body, dict):
break
has_more = bool(body.get("next_page_results"))
next_cursor = body.get("next_cursor")
if not has_more or not next_cursor or pages >= 100:
break
cursor = next_cursor
return out
except Exception as e:
logger.warning(
f"list_issues_by_state: API failed for project {project_id[:8]}..., "
f"returning []. Error: {e}"
)
return []
def find_issue_id(work_item_id: str, project_id: str = None) -> str | None:
"""Find Plane issue UUID by work_item_id (e.g. 'ET-002')."""
project_id = _resolve_project_id(work_item_id, project_id)

332
src/reconciler.py Normal file
View File

@@ -0,0 +1,332 @@
"""ORCH-053: stuck-task reconciler (sweeper for lost webhooks).
The pipeline advances ONLY on incoming webhooks (Plane status / Gitea CI/PR). A
dropped event (502 on a rebuilding instance, no Plane/Gitea retries, an
unresolved ``sha->branch``) leaves the source of truth (the gate / the Plane
status) changed while the task stays put — a silently stuck task (incident
ORCH-044). None of the existing resilience layers (``requeue_running_jobs``,
orphan-recovery, events de-dup, ``ci_poll``) reconcile this
"source-of-truth != task-stage" drift; they all work at the jobs/agent_runs
level, not the stage transition.
This module is a background daemon thread (modelled on ``queue_worker``) that
periodically replays the missed transition through the SAME standard gates /
handlers a webhook would use:
* **F-1 gate-side** (``reconcile_gate_once``): for each task with
``stage != 'done'``, no active job and ``age(updated_at) >=
grace_for_stage(stage)``, do a read-only pre-evaluation of the stage's
canonical quality gate; green -> advance through the unchanged
``stage_engine.advance_stage(..., finished_agent=None)``; red -> silence
(no advance, no notification). ``analysis`` is NOT reconciled here (human
gate; owned by F-2).
* **F-2 plane-side** (``reconcile_plane_once``): poll the Plane API per
project (``list_issues_by_state``) and replay In Progress / Approved /
Rejected through ``webhooks.plane.handle_status_start`` /
``handle_verdict`` (no logic duplicated).
Invariants: source of truth is the gate / Plane (not the event); advance only
via ``advance_stage``; idempotency (active-job guard + atomic create-claim +
grace + ``max_concurrency=1``); never-raise per unit of work; silence when in
sync; restart-safe; kill-switch ``ORCH_RECONCILE_ENABLED``
(+ ``ORCH_RECONCILE_PLANE_ENABLED`` mutes only F-2). The DB schema and the
registries (``STAGE_TRANSITIONS`` / ``QG_CHECKS``) are unchanged.
See docs/work-items/ORCH-053/06-adr/ADR-001-stuck-task-reconciler.md and the
cross-cutting docs/architecture/adr/adr-0007-reconciler.md.
"""
import asyncio
import json
import logging
import threading
from datetime import datetime, timezone
from .config import settings
from .db import (
get_active_tasks_for_reconcile,
get_task_by_plane_id,
has_active_job_for_task,
)
from .stage_engine import advance_if_gate_passed
from .stages import get_qg_for_stage
from .plane_sync import get_project_states, list_issues_by_state
from .webhooks.plane import handle_status_start, handle_verdict
from .notifications import send_telegram
from . import projects
logger = logging.getLogger("orchestrator.reconciler")
def _parse_grace_overrides(raw: str) -> dict[str, int]:
"""Parse ``reconcile_grace_overrides_json`` into {stage: seconds}.
Invalid / non-object JSON -> {} (caller falls back to the default grace),
mirroring the never-raise contract of ``agent_timeout_overrides_json``.
"""
if not raw or not raw.strip():
return {}
try:
data = json.loads(raw)
except (ValueError, TypeError) as e:
logger.warning(f"reconcile_grace_overrides_json is not valid JSON, ignoring: {e}")
return {}
if not isinstance(data, dict):
logger.warning("reconcile_grace_overrides_json must be a JSON object, ignoring")
return {}
out: dict[str, int] = {}
for k, v in data.items():
try:
out[str(k)] = int(v)
except (ValueError, TypeError):
logger.warning(f"reconcile_grace_overrides_json[{k}] is not an int, ignoring")
return out
def grace_for_stage(stage: str) -> int:
"""Per-stage "stuck" threshold (seconds): override from JSON, else default."""
overrides = _parse_grace_overrides(settings.reconcile_grace_overrides_json)
return overrides.get(stage, settings.reconcile_grace_default_s)
def _age_seconds_iso(ts: str) -> float | None:
"""Age in seconds of a Plane ISO-8601 timestamp (e.g. issue.updated_at).
Returns None when the value is missing / unparseable (caller decides the
fallback). Handles a trailing 'Z' and treats naive timestamps as UTC.
"""
if not ts:
return None
try:
text = ts.strip()
if text.endswith("Z"):
text = text[:-1] + "+00:00"
dt = datetime.fromisoformat(text)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return (datetime.now(timezone.utc) - dt).total_seconds()
except Exception:
return None
class Reconciler:
"""Background daemon that reconciles webhook-induced stage drift.
Modelled on ``QueueWorker``: a plain ``threading.Thread(daemon=True)`` +
``threading.Event`` for a clean stop. No correctness-critical state is held
in memory — every tick re-reads the DB / Plane; the observability counters
(``last_run_ts`` / ``unblocked_total`` / ``last_unblocked``) are best-effort
and may reset on restart (AC-11 allows this).
"""
def __init__(self, interval_s: float | None = None):
self.interval_s = (
interval_s if interval_s is not None else settings.reconcile_interval_s
)
self._stop = threading.Event()
self._thread: threading.Thread | None = None
# Best-effort observability (F-4).
self.last_run_ts: float | None = None
self.unblocked_total: int = 0
self.last_unblocked: str | None = None
# -- F-1: gate-side ----------------------------------------------------
def reconcile_gate_once(self) -> None:
"""One F-1 pass over all non-terminal tasks (per-task never-raise)."""
if not settings.reconcile_enabled:
return
for task in get_active_tasks_for_reconcile():
try:
self._reconcile_gate_task(task)
except Exception as e: # noqa: BLE001 - isolate one task's failure
logger.error(
f"reconciler F-1: task {task.get('id')} "
f"(stage={task.get('stage')}) failed: {e}"
)
def _reconcile_gate_task(self, task: dict) -> None:
task_id = task["id"]
stage = task["stage"]
# AC-16: analysis is a human gate -> owned by F-2, never F-1.
if stage == "analysis":
return
# created / done have no gate to evaluate.
if get_qg_for_stage(stage) is None:
return
# AC-3: a queued/running job means the task is legitimately in flight (or
# a live webhook just enqueued one) -> do not touch it.
if has_active_job_for_task(task_id):
return
# AC-5: respect the per-stage grace ("stuck", not just busy).
age_s = task.get("age_s") or 0
if age_s < grace_for_stage(stage):
return
result = advance_if_gate_passed(
task_id,
stage,
task["repo"],
task.get("work_item_id") or "",
task.get("branch") or "",
)
if result is not None and getattr(result, "advanced", False):
self._note_unblock(task.get("work_item_id") or str(task_id), stage)
# -- F-2: plane-side ---------------------------------------------------
def reconcile_plane_once(self) -> None:
"""One F-2 pass: poll Plane per project and replay missed transitions."""
if not settings.reconcile_enabled or not settings.reconcile_plane_enabled:
return
for proj in projects.PROJECTS:
try:
self._reconcile_plane_project(proj)
except Exception as e: # noqa: BLE001 - isolate one project's failure
logger.error(f"reconciler F-2: project {proj.repo} failed: {e}")
def _reconcile_plane_project(self, proj) -> None:
pid = proj.plane_project_id
# Resolve the actionable state uuids per-project (never hardcode).
states = get_project_states(pid)
in_progress = states["in_progress"]
approved = states["approved"]
rejected = states["rejected"]
issues = list_issues_by_state(pid, [in_progress, approved, rejected])
for issue in issues:
try:
self._reconcile_plane_issue(
issue, pid, in_progress, approved, rejected
)
except Exception as e: # noqa: BLE001 - isolate one issue's failure
logger.error(
f"reconciler F-2: issue {issue.get('id')} failed: {e}"
)
def _reconcile_plane_issue(
self, issue: dict, project_id: str,
in_progress: str, approved: str, rejected: str,
) -> None:
issue_id = str(issue.get("id") or "")
if not issue_id:
return
state = issue.get("state")
new_state = state.get("id") if isinstance(state, dict) else state
# Grace ("lost, not merely delayed"): use the issue's own updated_at age.
# A missing/unparseable timestamp is treated as old enough (the active-job
# guard + atomic create-claim still prevent doubling).
age = _age_seconds_iso(issue.get("updated_at") or "")
if age is not None and age < settings.reconcile_grace_default_s:
return
task = get_task_by_plane_id(issue_id)
# AC-3/AC-4: a live webhook is in flight for this task -> skip.
if task is not None and has_active_job_for_task(task["id"]):
return
# issue_data in the shape the plane handlers expect; missing name /
# description are pulled by the handlers themselves (fetch_issue_fields).
issue_data = {
"id": issue_id,
"state": {"id": new_state},
"project": project_id,
"name": issue.get("name", ""),
"description_stripped": issue.get("description_stripped", ""),
}
if new_state == in_progress and task is None:
# In Progress without a task -> start the pipeline (lost start webhook).
self._dispatch(handle_status_start, issue_data, project_id)
self._note_unblock(issue_id, "analysis")
elif new_state == approved and task is not None:
# Approved but the stage never advanced -> replay the verdict.
self._dispatch(handle_verdict, issue_data, project_id, approved=True)
self._note_unblock(task.get("work_item_id") or issue_id, task["stage"])
elif new_state == rejected and task is not None:
# Rejected but never rolled back -> replay the verdict.
self._dispatch(handle_verdict, issue_data, project_id, approved=False)
self._note_unblock(task.get("work_item_id") or issue_id, task["stage"])
# else: everything is in sync -> silence (AC-10).
@staticmethod
def _dispatch(coro_fn, *args, **kwargs) -> None:
"""Run an async plane handler from this sync thread.
``asyncio.run`` spins a fresh event loop per call, which is required
because ``handle_verdict -> _try_advance_stage`` uses
``asyncio.to_thread`` (needs a running loop). The handlers are
REUSED verbatim — no pipeline logic is duplicated here.
"""
asyncio.run(coro_fn(*args, **kwargs))
# -- observability (F-4) ----------------------------------------------
def _note_unblock(self, work_item_id: str, stage: str) -> None:
"""Record + announce that a stuck task was unblocked (AC-12).
Fires only on an actual state change (an advance / replayed transition),
never per idle tick, so it does not conflict with AC-9 / AC-10.
"""
self.unblocked_total += 1
self.last_unblocked = work_item_id
logger.info(
f"reconciler: {work_item_id} {stage} разблокирована (потерян webhook)"
)
if settings.reconcile_notify_unblock:
try:
send_telegram(
f"\U0001f527 reconciler: {work_item_id} {stage} "
f"разблокирована (потерян webhook)"
)
except Exception as e: # noqa: BLE001 - never break the tick
logger.warning(f"reconciler: unblock telegram failed: {e}")
# -- loop / lifecycle --------------------------------------------------
def _tick(self) -> None:
if settings.reconcile_enabled:
self.reconcile_gate_once() # F-1
if settings.reconcile_plane_enabled:
self.reconcile_plane_once() # F-2
self.last_run_ts = datetime.now(timezone.utc).timestamp()
def _run(self) -> None:
logger.info(
f"Reconciler started (interval={self.interval_s}s, "
f"enabled={settings.reconcile_enabled}, "
f"plane_enabled={settings.reconcile_plane_enabled})"
)
while not self._stop.is_set():
try:
self._tick()
except Exception as e: # noqa: BLE001 - outer never-raise
logger.error(f"Reconciler loop error: {e}")
self._stop.wait(self.interval_s)
logger.info("Reconciler stopped")
def start(self) -> None:
"""Start the daemon thread (idempotent: a live thread is a no-op)."""
if self._thread and self._thread.is_alive():
return
self._stop.clear()
self._thread = threading.Thread(
target=self._run, name="reconciler", daemon=True
)
self._thread.start()
def stop(self, timeout: float = 5.0) -> None:
self._stop.set()
if self._thread:
self._thread.join(timeout=timeout)
def status(self) -> dict:
"""Reconcile snapshot for /queue observability."""
return {
"enabled": settings.reconcile_enabled,
"plane_enabled": settings.reconcile_plane_enabled,
"interval": self.interval_s,
"last_run_ts": self.last_run_ts,
"unblocked_total": self.unblocked_total,
"last_unblocked": self.last_unblocked,
}
# Module-level singleton used by the FastAPI lifespan.
reconciler = Reconciler()

View File

@@ -353,6 +353,75 @@ def advance_stage(
return result
def advance_if_gate_passed(
task_id: int,
current_stage: str,
repo: str,
work_item_id: str,
branch: str,
) -> AdvanceResult | None:
"""ORCH-053 (F-1): reconcile a stuck stage by advancing it ONLY if its
quality gate is already green — without spamming failure notifications.
This is the thin wrapper the reconciler uses so that:
* The source of truth stays the GATE, and the advance path stays the
UNCHANGED unified ``advance_stage(..., finished_agent=None)`` (the same
path the Plane Approved-webhook uses). The reconciler never duplicates
``update_task_stage`` / ``enqueue_job`` (AC-2).
* On a stable-RED gate the sweeper is structurally silent: we do a cheap
read-only pre-evaluation of the gate and, if it fails, return ``None``
WITHOUT ever calling ``advance_stage`` — so the QG-failure notification
branch inside ``advance_stage`` (``agent is None`` ->
``notify_qg_failure`` + ``plane_notify_qg``) cannot fire on any tick
(AC-9). Spam is impossible by construction.
``analysis`` is intentionally NOT reconciled here: its gate
(``check_analysis_approved``) is a HUMAN gate; with ``finished_agent=None``
``advance_stage`` would treat it as approved-via-status and could advance an
unapproved BRD. The analysis advance is owned by the Plane-side reconciler
(F-2), which checks the real Plane status (AC-16).
Returns the ``AdvanceResult`` from ``advance_stage`` when the gate passed,
or ``None`` when the stage is not eligible / the gate is red / on any error
(never raises — the caller isolates per-task failures).
"""
try:
# AC-16: F-1 never reconciles the human analysis gate.
if current_stage == "analysis":
return None
qg_name = get_qg_for_stage(current_stage)
if not qg_name:
# created / done -> no gate to evaluate.
return None
# Read-only pre-evaluation with the SAME dispatcher the webhook path uses.
passed, reason = _run_qg(qg_name, repo, work_item_id, branch)
if not passed:
# Stable-red -> stay silent (no advance_stage call -> no QG-failure
# notification on this or any later tick).
logger.debug(
f"reconciler: task {task_id} gate '{qg_name}' still red "
f"({reason}); leaving on '{current_stage}'"
)
return None
# Gate is green: advance via the unchanged unified path. It re-runs the
# (idempotent, read-only) gate, advances the stage, sends the STANDARD
# advance notifications and enqueues the next agent.
return advance_stage(
task_id, current_stage, repo, work_item_id, branch, finished_agent=None
)
except Exception as e: # noqa: BLE001 - never-raise per ORCH-053 NFR
logger.error(
f"advance_if_gate_passed failed for task_id={task_id} "
f"stage={current_stage}: {e}"
)
return None
def _build_analyst_ready_comment(
repo: str, work_item_id: str, branch: str, task_id: int | None = None
) -> str:

View File

@@ -144,6 +144,36 @@ async def handle_push(payload: dict):
logger.info(f"Task {task_id}: source push detected on '{branch}', waiting for CI")
def _resolve_branch_via_db(repo_name: str) -> str:
"""ORCH-053 (F-3): resolve a CI-status SHA to a branch via the tasks DB.
Returns the branch of the SINGLE development-stage task for ``repo_name``.
If there are zero or several such tasks the match is ambiguous -> return ""
(the caller leaves the branch unresolved; never a false match). Logged at
INFO for visibility. Never raises.
"""
try:
from ..db import get_development_tasks_by_repo
devs = get_development_tasks_by_repo(repo_name)
except Exception as e: # noqa: BLE001 - defensive, never break the webhook
logger.info(f"CI status: sha->branch DB fallback errored for {repo_name}: {e}")
return ""
if len(devs) == 1:
branch = devs[0].get("branch") or ""
if branch:
logger.info(
f"CI status: sha->branch resolved via DB fallback to '{branch}' "
f"(unique development task in {repo_name})"
)
return branch
if len(devs) > 1:
logger.info(
f"CI status: sha->branch DB fallback ambiguous "
f"({len(devs)} development tasks in {repo_name}), leaving unresolved"
)
return ""
async def handle_ci_status(payload: dict):
"""
CI status update:
@@ -178,7 +208,15 @@ async def handle_ci_status(payload: dict):
except Exception:
pass
if not branch:
logger.debug(f"CI status event: could not determine branch for sha={sha}")
# ORCH-053 (F-3): DB fallback — when the SHA cannot be resolved to a
# branch (lost on a 502 rebuild, etc.), match it to the UNIQUE
# development-stage task of this repo. Ambiguity (more than one) is
# left unresolved to avoid a false match; the F-1 sweeper still picks
# such a task up later (defense-in-depth, not the critical path).
branch = _resolve_branch_via_db(repo_name)
if not branch:
# logger.info (was debug) so a lost CI event is VISIBLE in the logs.
logger.info(f"CI status event: could not determine branch for sha={sha}")
return
repo_name = payload.get("repository", {}).get("name", settings.default_repo)

View File

@@ -17,6 +17,7 @@ from ..db import (
update_task_stage,
enqueue_job,
insert_event_dedup,
create_task_atomic,
)
from ._dedup import plane_delivery_id
from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage
@@ -496,15 +497,21 @@ async def start_pipeline(data: dict, project_id: str = ""):
f"branch collision for {repo}; disambiguated to unique branch {branch}"
)
# Insert task into DB
conn = get_db()
conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id, title) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(plane_id, work_item_id, repo, branch, "analysis", plane_id, name),
# Insert task into DB — ORCH-053 (AC-4): atomic anti-dup claim under a
# process-wide lock. If the F-2 reconciler and this live webhook race on the
# same plane_id, exactly one wins (created=True); the loser sees the existing
# task and returns WITHOUT creating a second branch / worktree / analyst job.
task_row, created = create_task_atomic(
plane_id, work_item_id, repo, branch, "analysis", name
)
conn.commit()
conn.close()
if not created:
logger.info(
f"start_pipeline: task for plane_id={plane_id} already exists "
f"(id={task_row['id']}, work_item_id={task_row.get('work_item_id')}), "
f"skipping duplicate creation"
)
return
task_id = task_row["id"]
# Create branch in Gitea
try:
@@ -523,20 +530,17 @@ async def start_pipeline(data: dict, project_id: str = ""):
logger.info(f"Task created: {work_item_id} ({name}), branch={branch}, stage=analysis")
# Launch analyst agent
# Launch analyst agent (task_id from the atomic create above).
try:
task_row = get_db().execute("SELECT id FROM tasks WHERE work_item_id=?", (work_item_id,)).fetchone()
if task_row:
task_id = task_row[0]
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: analysis\nTitle: {name}\n\nDescription:\n{description}"
)
job_id = enqueue_job("analyst", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})")
# Post start comment to Plane
from ..plane_sync import add_comment as _add_comment
_add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).", author="analyst")
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: analysis\nTitle: {name}\n\nDescription:\n{description}"
)
job_id = enqueue_job("analyst", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})")
# Post start comment to Plane
from ..plane_sync import add_comment as _add_comment
_add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).", author="analyst")
except Exception as e:
logger.error(f"Failed to launch analyst for {work_item_id}: {e}")