feat(reconciler): sweeper потерянных webhook (реконсиляция застрявших стадий)

Конвейер продвигается только входящими webhook; потерянное событие (502 на
ребилде, отсутствие ретраев у Plane/Gitea, неразрезолвленный sha→branch)
оставляет задачу молча застрявшей (класс инцидента ORCH-044). Новый фоновый
daemon-поток src/reconciler.py (паттерн queue_worker) доигрывает пропущенный
переход через те же штатные гейты/обработчики, что и webhook:

- F-1 gate-side: для задач stage≠done, без активного job и age(updated_at) ≥
  grace_for_stage(stage) — read-only пред-оценка канонического QG; зелёный →
  stage_engine.advance_stage(..., finished_agent=None); красный → тишина (спам
  нотификаций структурно невозможен). analysis F-1 не трогает (человеческий гейт).
- F-2 plane-side: опрос Plane API per-project (plane_sync.list_issues_by_state,
  курсорная пагинация, never-raise) → реплей In Progress/Approved/Rejected через
  существующие handle_status_start/handle_verdict (async из sync-потока, asyncio.run).
- F-3: усиление sha→branch в handle_ci_status — БД-fallback по единственной
  development-задаче repo (неоднозначность → не резолвим), debug→info.
- Анти-дубль на создании (db.create_task_atomic под process-wide Lock): гонка
  reconcile↔webhook не плодит второй task/branch/worktree/analyst-job (AC-4).
- F-4 observability: лог-строка разблокировки + Telegram + блок reconcile в /queue.

Старт/стоп в main.lifespan (после worker.start() / перед worker.stop()),
restart-safe, never-raise на единицу работы. Kill-switches ORCH_RECONCILE_ENABLED
/ ORCH_RECONCILE_PLANE_ENABLED + grace-настройки. Схема БД и реестры
STAGE_TRANSITIONS/QG_CHECKS не менялись.

Тесты: test_reconciler.py, test_reconciler_plane.py, test_gitea_sha_resolve.py,
test_config.py (33 новых, 563 всего зелёные). Документация обновлена (golden source):
architecture/README.md, INFRA.md, README.md, CHANGELOG.md, adr-0007 → accepted.

Refs: ORCH-053

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-06-06 20:37:01 +00:00
committed by Dev Agent
parent f5aae50514
commit 7d2d77217a
20 changed files with 1535 additions and 31 deletions

View File

@@ -144,6 +144,36 @@ async def handle_push(payload: dict):
logger.info(f"Task {task_id}: source push detected on '{branch}', waiting for CI")
def _resolve_branch_via_db(repo_name: str) -> str:
"""ORCH-053 (F-3): resolve a CI-status SHA to a branch via the tasks DB.
Returns the branch of the SINGLE development-stage task for ``repo_name``.
If there are zero or several such tasks the match is ambiguous -> return ""
(the caller leaves the branch unresolved; never a false match). Logged at
INFO for visibility. Never raises.
"""
try:
from ..db import get_development_tasks_by_repo
devs = get_development_tasks_by_repo(repo_name)
except Exception as e: # noqa: BLE001 - defensive, never break the webhook
logger.info(f"CI status: sha->branch DB fallback errored for {repo_name}: {e}")
return ""
if len(devs) == 1:
branch = devs[0].get("branch") or ""
if branch:
logger.info(
f"CI status: sha->branch resolved via DB fallback to '{branch}' "
f"(unique development task in {repo_name})"
)
return branch
if len(devs) > 1:
logger.info(
f"CI status: sha->branch DB fallback ambiguous "
f"({len(devs)} development tasks in {repo_name}), leaving unresolved"
)
return ""
async def handle_ci_status(payload: dict):
"""
CI status update:
@@ -178,7 +208,15 @@ async def handle_ci_status(payload: dict):
except Exception:
pass
if not branch:
logger.debug(f"CI status event: could not determine branch for sha={sha}")
# ORCH-053 (F-3): DB fallback — when the SHA cannot be resolved to a
# branch (lost on a 502 rebuild, etc.), match it to the UNIQUE
# development-stage task of this repo. Ambiguity (more than one) is
# left unresolved to avoid a false match; the F-1 sweeper still picks
# such a task up later (defense-in-depth, not the critical path).
branch = _resolve_branch_via_db(repo_name)
if not branch:
# logger.info (was debug) so a lost CI event is VISIBLE in the logs.
logger.info(f"CI status event: could not determine branch for sha={sha}")
return
repo_name = payload.get("repository", {}).get("name", settings.default_repo)

View File

@@ -17,6 +17,7 @@ from ..db import (
update_task_stage,
enqueue_job,
insert_event_dedup,
create_task_atomic,
)
from ._dedup import plane_delivery_id
from ..stages import get_next_stage, get_agent_for_stage, get_qg_for_stage, get_previous_stage
@@ -496,15 +497,21 @@ async def start_pipeline(data: dict, project_id: str = ""):
f"branch collision for {repo}; disambiguated to unique branch {branch}"
)
# Insert task into DB
conn = get_db()
conn.execute(
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id, title) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(plane_id, work_item_id, repo, branch, "analysis", plane_id, name),
# Insert task into DB — ORCH-053 (AC-4): atomic anti-dup claim under a
# process-wide lock. If the F-2 reconciler and this live webhook race on the
# same plane_id, exactly one wins (created=True); the loser sees the existing
# task and returns WITHOUT creating a second branch / worktree / analyst job.
task_row, created = create_task_atomic(
plane_id, work_item_id, repo, branch, "analysis", name
)
conn.commit()
conn.close()
if not created:
logger.info(
f"start_pipeline: task for plane_id={plane_id} already exists "
f"(id={task_row['id']}, work_item_id={task_row.get('work_item_id')}), "
f"skipping duplicate creation"
)
return
task_id = task_row["id"]
# Create branch in Gitea
try:
@@ -523,20 +530,17 @@ async def start_pipeline(data: dict, project_id: str = ""):
logger.info(f"Task created: {work_item_id} ({name}), branch={branch}, stage=analysis")
# Launch analyst agent
# Launch analyst agent (task_id from the atomic create above).
try:
task_row = get_db().execute("SELECT id FROM tasks WHERE work_item_id=?", (work_item_id,)).fetchone()
if task_row:
task_id = task_row[0]
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: analysis\nTitle: {name}\n\nDescription:\n{description}"
)
job_id = enqueue_job("analyst", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})")
# Post start comment to Plane
from ..plane_sync import add_comment as _add_comment
_add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).", author="analyst")
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: analysis\nTitle: {name}\n\nDescription:\n{description}"
)
job_id = enqueue_job("analyst", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: enqueued analyst (job_id={job_id})")
# Post start comment to Plane
from ..plane_sync import add_comment as _add_comment
_add_comment(work_item_id, "\U0001f50d Analyst \u0437\u0430\u043f\u0443\u0449\u0435\u043d. BRD/\u0422\u0417/AC/TestPlan \u0432 \u0440\u0430\u0431\u043e\u0442\u0435 (\u043e\u0436\u0438\u0434\u0430\u0439\u0442\u0435 8-15 \u043c\u0438\u043d).", author="analyst")
except Exception as e:
logger.error(f"Failed to launch analyst for {work_item_id}: {e}")