integ: merge ORCH-068 reconciler livelock fix

# Conflicts:
#	docs/architecture/README.md
#	src/reconciler.py
This commit is contained in:
stream
2026-06-08 06:36:29 +00:00
20 changed files with 1379 additions and 17 deletions

View File

@@ -60,7 +60,12 @@ from .stage_engine import (
MAX_DEVELOPER_RETRIES,
)
from .stages import get_qg_for_stage
from .plane_sync import fetch_issue_state, get_project_states, list_issues_by_state
from .plane_sync import (
fetch_issue_state,
get_project_states,
get_project_state_groups,
list_issues_by_state,
)
from .webhooks.plane import handle_status_start, handle_verdict
from .notifications import send_telegram
from . import projects
@@ -139,6 +144,13 @@ class Reconciler:
self.last_run_ts: float | None = None
self.unblocked_total: int = 0
self.last_unblocked: str | None = None
# ORCH-068 observability: terminal-state skips and dedup suppressions.
self.skipped_terminal_total: int = 0
self.deduped_total: int = 0
# ORCH-068 (TR-3): in-memory dedup guard {issue_id -> last unblocked
# state uuid}. Best-effort (resets on restart, like unblocked_total);
# suppresses a repeat unblock notification for the same issue+state.
self._unblock_dedup: dict[str, str] = {}
# -- F-1: gate-side ----------------------------------------------------
def reconcile_gate_once(self) -> None:
@@ -271,23 +283,47 @@ class Reconciler:
# the project's own `in_progress` UUID, so enduro behaviour is identical
# (and `list_issues_by_state` deduplicates the uuid via its internal set).
states = get_project_states(pid)
# ORCH-066 (AC-19): start/resume trigger is `To Analyse`.
to_analyse = states["to_analyse"]
# ORCH-068 D1: {uuid -> group} from the SAME cache record (no extra
# fetch); empty when the API was unreachable -> per-issue fallback by key.
groups = get_project_state_groups(pid)
approved = states["approved"]
rejected = states["rejected"]
issues = list_issues_by_state(pid, [to_analyse, approved, rejected])
for issue in issues:
try:
self._reconcile_plane_issue(
issue, pid, to_analyse, approved, rejected
issue, pid, to_analyse, approved, rejected, states, groups
)
except Exception as e: # noqa: BLE001 - isolate one issue's failure
logger.error(
f"reconciler F-2: issue {issue.get('id')} failed: {e}"
)
def _is_terminal_state(
self, state_uuid: str, states: dict, groups: dict
) -> bool:
"""ORCH-068 D1: is ``state_uuid`` a terminal (completed/cancelled) state?
Primary discriminator is the Plane **state group** (project-independent,
robust to UUID aliasing after status renames): ``group`` in
``{completed, cancelled}`` -> terminal. When the group is unavailable
(API gave no ``group`` / we fell back to ``_DEFAULT_STATES``), fall back
to the logical terminal keys ``done`` / ``cancelled``.
"""
if not state_uuid:
return False
grp = groups.get(state_uuid)
if grp:
return grp in {"completed", "cancelled"}
# Fallback (group unknown): logical terminal keys for this project.
return state_uuid in {states.get("done"), states.get("cancelled")}
def _reconcile_plane_issue(
self, issue: dict, project_id: str,
to_analyse: str, approved: str, rejected: str,
states: dict, groups: dict,
) -> None:
issue_id = str(issue.get("id") or "")
if not issue_id:
@@ -295,6 +331,15 @@ class Reconciler:
state = issue.get("state")
new_state = state.get("id") if isinstance(state, dict) else state
# ORCH-068 D1: a terminal issue (Done / Cancelled) is fully in sync by
# definition -> never actionable. Excluded per-issue (not by narrowing
# `wanted`) because UUID aliasing can make a terminal uuid collide with
# an actionable one — only the state GROUP disentangles them. Restores
# the silence-when-in-sync invariant (AC-1/AC-2).
if self._is_terminal_state(new_state, states, groups):
self.skipped_terminal_total += 1
return
# Grace ("lost, not merely delayed"): use the issue's own updated_at age.
# A missing/unparseable timestamp is treated as old enough (the active-job
# guard + atomic create-claim still prevent doubling).
@@ -319,24 +364,48 @@ class Reconciler:
if new_state == to_analyse and task is None:
# To Analyse without a task -> start the pipeline (lost start webhook).
# ORCH-068 D2: confirm a REAL change (the task now exists) before
# announcing — a no-op dispatch stays silent.
self._dispatch(handle_status_start, issue_data, project_id)
self._note_unblock(issue_id, "analysis")
if get_task_by_plane_id(issue_id) is not None:
self._note_unblock(issue_id, "analysis", new_state)
elif new_state == to_analyse and task is not None:
# To Analyse with an existing (idle) task -> resume the analyst from
# Needs Input (lost resume webhook). handle_status_start applies its
# own busy-guard / start-vs-resume fork.
self._dispatch(handle_status_start, issue_data, project_id)
self._note_unblock(task.get("work_item_id") or issue_id, task["stage"])
self._note_unblock(task.get("work_item_id") or issue_id, task["stage"], new_state)
elif new_state == approved and task is not None:
# Approved but the stage never advanced -> replay the verdict.
stage_before = task["stage"]
self._dispatch(handle_verdict, issue_data, project_id, approved=True)
self._note_unblock(task.get("work_item_id") or issue_id, task["stage"])
if self._stage_changed(issue_id, stage_before):
self._note_unblock(
task.get("work_item_id") or issue_id, stage_before, new_state
)
elif new_state == rejected and task is not None:
# Rejected but never rolled back -> replay the verdict.
stage_before = task["stage"]
self._dispatch(handle_verdict, issue_data, project_id, approved=False)
self._note_unblock(task.get("work_item_id") or issue_id, task["stage"])
if self._stage_changed(issue_id, stage_before):
self._note_unblock(
task.get("work_item_id") or issue_id, stage_before, new_state
)
# else: everything is in sync -> silence (AC-10).
@staticmethod
def _stage_changed(issue_id: str, stage_before: str) -> bool:
"""ORCH-068 D2: did the dispatched handler actually move the stage?
Re-reads the task after ``_dispatch`` and compares to the captured
``stage_before``. A no-op replay (the task was already in the target
state) leaves the stage unchanged -> no unblock notification.
"""
after = get_task_by_plane_id(issue_id)
stage_after = after["stage"] if after else stage_before
return stage_after != stage_before
@staticmethod
def _dispatch(coro_fn, *args, **kwargs) -> None:
"""Run an async plane handler from this sync thread.
@@ -349,12 +418,27 @@ class Reconciler:
asyncio.run(coro_fn(*args, **kwargs))
# -- observability (F-4) ----------------------------------------------
def _note_unblock(self, work_item_id: str, stage: str) -> None:
def _note_unblock(
self, work_item_id: str, stage: str, state_uuid: str | None = None
) -> None:
"""Record + announce that a stuck task was unblocked (AC-12).
Fires only on an actual state change (an advance / replayed transition),
never per idle tick, so it does not conflict with AC-9 / AC-10.
ORCH-068 (TR-3): an in-memory dedup guard keyed by ``issue_id ->
state_uuid`` suppresses a repeat notification for the same issue+state
if a future no-op path ever reaches here. ``state_uuid`` is the issue's
Plane state; ``work_item_id`` doubles as the issue id for the
pipeline-start case (which has no work item yet).
"""
dedup_key = work_item_id
if state_uuid is not None and self._unblock_dedup.get(dedup_key) == state_uuid:
self.deduped_total += 1
return
if state_uuid is not None:
self._unblock_dedup[dedup_key] = state_uuid
self.unblocked_total += 1
self.last_unblocked = work_item_id
logger.info(
@@ -415,6 +499,9 @@ class Reconciler:
"last_run_ts": self.last_run_ts,
"unblocked_total": self.unblocked_total,
"last_unblocked": self.last_unblocked,
# ORCH-068 observability.
"skipped_terminal_total": self.skipped_terminal_total,
"deduped_total": self.deduped_total,
}