fix(reconciler): stop F-2 livelock spam on synced terminal tasks + cache TTL
Reconciler F-2 spammed Telegram "<wi> разблокирована" every ~120s for a
fully-synchronized Done task (incident ET-002, 191+ msgs/night) after the
ORCH-066 Plane status model merge. Two stacked defects (defense in depth):
- D1 (selection): actionable states were told apart by bare UUID, so a Done
issue aliased onto the approved UUID entered the approved branch. Now
terminal states are excluded by Plane state GROUP (completed/cancelled),
a project-independent discriminator robust to UUID aliasing; per-issue
check with a logical-key fallback when the group is unavailable.
get_project_states caches {uuid -> group} from the same /states/ fetch;
new sibling accessor get_project_state_groups.
- D2 (notification): _note_unblock fired unconditionally after _dispatch.
Now it only fires on a confirmed state change (stage before/after _dispatch;
task-appears for the start case) — handlers' contracts untouched.
- TR-3: in-memory dedup guard {issue_id -> last unblocked state} as a backstop.
- TR-4: _STATES_CACHE lived for the whole process lifetime, so a new Plane
status was invisible without a restart. Added TTL ORCH_PLANE_STATES_TTL_S
(default 300s; 0 = previous lifetime cache) reusing reload_project_states();
a failed refresh serves the stale-but-correct set, not enduro defaults.
STAGE_TRANSITIONS / QG_CHECKS / DB schema / handle_* contracts / F-1 / F-3
unchanged; never-raise preserved; self-hosting tick never restarts prod.
Observability: skipped_terminal_total / deduped_total in /queue reconcile block.
Tests: tests/test_reconciler_plane.py (TC-01..TC-10),
tests/test_plane_states_cache.py (TC-11/TC-12).
Refs: ORCH-068
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -265,6 +265,18 @@ class Settings(BaseSettings):
|
||||
reconcile_notify_unblock: bool = True
|
||||
reconcile_skip_blocked_enabled: bool = True
|
||||
|
||||
# ORCH-068: TTL for the per-project Plane states cache (_STATES_CACHE in
|
||||
# plane_sync). Historically the cache lived for the whole process lifetime,
|
||||
# so a status added to Plane after start was never seen without a restart
|
||||
# ("stale set -> no pipeline action"). With a TTL the entry self-heals by
|
||||
# re-fetching /states/ after it expires (invalidation reuses the existing
|
||||
# reload_project_states() primitive — no duplicated reset logic).
|
||||
# plane_states_ttl_s (env ORCH_PLANE_STATES_TTL_S):
|
||||
# >0 -> seconds before a cache entry is re-fetched (default 300 = 5 min);
|
||||
# 0 -> disable TTL -> strictly the previous lifetime cache (back-compat
|
||||
# escape hatch). get_project_states return shape is unchanged.
|
||||
plane_states_ttl_s: int = 300
|
||||
|
||||
# ORCH-021: post-deploy production monitoring + degradation reaction. After
|
||||
# the terminal deploy->done transition for an applicable repo, a reserved-agent
|
||||
# `post-deploy-monitor` job (no LLM, modelled on deploy-finalizer) probes prod
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
"""Plane API sync — update issue state and add comments."""
|
||||
|
||||
import logging
|
||||
import time
|
||||
import httpx
|
||||
from .config import settings
|
||||
|
||||
@@ -130,18 +131,42 @@ _PLANE_NAME_TO_KEY: dict[str, str] = {
|
||||
"Blocked": "blocked",
|
||||
}
|
||||
|
||||
# Per-project state cache: {project_id: {logical_key: state_uuid}}
|
||||
_STATES_CACHE: dict[str, dict[str, str]] = {}
|
||||
# Per-project state cache (ORCH-10 + ORCH-068).
|
||||
#
|
||||
# Each entry is a RECORD, not a bare mapping:
|
||||
# {"states": {logical_key: state_uuid}, # the ORCH-10 mapping (unchanged shape)
|
||||
# "groups": {state_uuid: group}, # ORCH-068 D1: {uuid -> Plane state.group}
|
||||
# "ts": monotonic timestamp} # ORCH-068 TR-4: for TTL self-heal
|
||||
# get_project_states() still RETURNS the bare {logical_key: state_uuid} mapping
|
||||
# (backward compatible — AC-13); the richer record is internal.
|
||||
_STATES_CACHE: dict[str, dict] = {}
|
||||
|
||||
|
||||
def _cache_record_fresh(record: dict) -> bool:
|
||||
"""ORCH-068 (TR-4): is a cache record still within its TTL?
|
||||
|
||||
``plane_states_ttl_s <= 0`` disables the TTL -> a record never expires
|
||||
(strictly the previous lifetime-cache behaviour, back-compat escape hatch).
|
||||
"""
|
||||
ttl = settings.plane_states_ttl_s
|
||||
if ttl <= 0:
|
||||
return True
|
||||
ts = record.get("ts", 0.0)
|
||||
return (time.monotonic() - ts) <= ttl
|
||||
|
||||
|
||||
def get_project_states(project_id: str) -> dict[str, str]:
|
||||
"""ORCH-10: resolve {logical_key -> state_uuid} for a specific Plane project.
|
||||
|
||||
Source of truth: Plane API GET /projects/<project_id>/states/.
|
||||
Results are cached per project_id for the lifetime of the process.
|
||||
Results are cached per project_id. ORCH-068 (TR-4): a cached entry is
|
||||
re-fetched once it is older than ``plane_states_ttl_s`` (default 300s) so a
|
||||
status added to Plane after start self-heals without a process restart;
|
||||
``plane_states_ttl_s = 0`` keeps the previous lifetime cache.
|
||||
|
||||
Falls back to _DEFAULT_STATES (enduro-trails values) if:
|
||||
* project_id is empty/None,
|
||||
* the API call fails (network error, non-2xx),
|
||||
* the API call fails (network error, non-2xx) AND nothing is cached,
|
||||
* the response contains no recognisable states.
|
||||
|
||||
The enduro-trails project therefore returns the same UUIDs as before
|
||||
@@ -151,8 +176,9 @@ def get_project_states(project_id: str) -> dict[str, str]:
|
||||
if not project_id:
|
||||
return _DEFAULT_STATES
|
||||
|
||||
if project_id in _STATES_CACHE:
|
||||
return _STATES_CACHE[project_id]
|
||||
cached = _STATES_CACHE.get(project_id)
|
||||
if cached is not None and _cache_record_fresh(cached):
|
||||
return cached["states"]
|
||||
|
||||
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}/states/"
|
||||
try:
|
||||
@@ -165,12 +191,21 @@ def get_project_states(project_id: str) -> dict[str, str]:
|
||||
raise ValueError(f"unexpected states response shape: {type(items)}")
|
||||
|
||||
resolved: dict[str, str] = {}
|
||||
groups: dict[str, str] = {}
|
||||
for item in items:
|
||||
name = item.get("name", "")
|
||||
uid = item.get("id", "")
|
||||
key = _PLANE_NAME_TO_KEY.get(name)
|
||||
if key and uid:
|
||||
resolved[key] = uid
|
||||
# ORCH-068 D1: capture {uuid -> group} for terminal-state detection
|
||||
# (a single API fetch — no extra network cost). The group is the
|
||||
# authoritative, project-independent discriminator of terminal
|
||||
# (completed/cancelled) vs review/work statuses, robust to UUID
|
||||
# aliasing after status renames (ORCH-066).
|
||||
grp = item.get("group", "")
|
||||
if uid and grp:
|
||||
groups[uid] = grp
|
||||
|
||||
if not resolved:
|
||||
raise ValueError("no recognisable states in API response")
|
||||
@@ -180,13 +215,26 @@ def get_project_states(project_id: str) -> dict[str, str]:
|
||||
for k, v in _DEFAULT_STATES.items():
|
||||
resolved.setdefault(k, v)
|
||||
|
||||
_STATES_CACHE[project_id] = resolved
|
||||
_STATES_CACHE[project_id] = {
|
||||
"states": resolved,
|
||||
"groups": groups,
|
||||
"ts": time.monotonic(),
|
||||
}
|
||||
logger.debug(
|
||||
f"get_project_states: cached {len(resolved)} states for project {project_id[:8]}..."
|
||||
f"get_project_states: cached {len(resolved)} states / "
|
||||
f"{len(groups)} groups for project {project_id[:8]}..."
|
||||
)
|
||||
return resolved
|
||||
|
||||
except Exception as e:
|
||||
# On a transient API failure keep serving the stale (but project-correct)
|
||||
# set if we have one — far safer than reverting to enduro defaults.
|
||||
if cached is not None:
|
||||
logger.warning(
|
||||
f"get_project_states: API refresh failed for project "
|
||||
f"{project_id[:8]}..., serving stale cached set. Error: {e}"
|
||||
)
|
||||
return cached["states"]
|
||||
logger.warning(
|
||||
f"get_project_states: API failed for project {project_id[:8]}..., "
|
||||
f"falling back to _DEFAULT_STATES. Error: {e}"
|
||||
@@ -194,6 +242,23 @@ def get_project_states(project_id: str) -> dict[str, str]:
|
||||
return _DEFAULT_STATES
|
||||
|
||||
|
||||
def get_project_state_groups(project_id: str) -> dict[str, str]:
|
||||
"""ORCH-068 (D1): return {state_uuid -> group} for a Plane project.
|
||||
|
||||
Reads the SAME cache record populated by ``get_project_states`` (no extra
|
||||
network call). Call ``get_project_states(project_id)`` first to ensure the
|
||||
record is fresh/populated. Returns ``{}`` when nothing is cached (e.g. the
|
||||
API was unreachable and the caller fell back to ``_DEFAULT_STATES``); the
|
||||
reconciler then falls back to logical terminal keys.
|
||||
"""
|
||||
record = _STATES_CACHE.get(project_id)
|
||||
if isinstance(record, dict):
|
||||
groups = record.get("groups")
|
||||
if isinstance(groups, dict):
|
||||
return groups
|
||||
return {}
|
||||
|
||||
|
||||
def reload_project_states(project_id: str = None) -> None:
|
||||
"""ORCH-10: clear the per-project states cache.
|
||||
|
||||
|
||||
@@ -60,7 +60,12 @@ from .stage_engine import (
|
||||
MAX_DEVELOPER_RETRIES,
|
||||
)
|
||||
from .stages import get_qg_for_stage
|
||||
from .plane_sync import fetch_issue_state, get_project_states, list_issues_by_state
|
||||
from .plane_sync import (
|
||||
fetch_issue_state,
|
||||
get_project_states,
|
||||
get_project_state_groups,
|
||||
list_issues_by_state,
|
||||
)
|
||||
from .webhooks.plane import handle_status_start, handle_verdict
|
||||
from .notifications import send_telegram
|
||||
from . import projects
|
||||
@@ -139,6 +144,13 @@ class Reconciler:
|
||||
self.last_run_ts: float | None = None
|
||||
self.unblocked_total: int = 0
|
||||
self.last_unblocked: str | None = None
|
||||
# ORCH-068 observability: terminal-state skips and dedup suppressions.
|
||||
self.skipped_terminal_total: int = 0
|
||||
self.deduped_total: int = 0
|
||||
# ORCH-068 (TR-3): in-memory dedup guard {issue_id -> last unblocked
|
||||
# state uuid}. Best-effort (resets on restart, like unblocked_total);
|
||||
# suppresses a repeat unblock notification for the same issue+state.
|
||||
self._unblock_dedup: dict[str, str] = {}
|
||||
|
||||
# -- F-1: gate-side ----------------------------------------------------
|
||||
def reconcile_gate_once(self) -> None:
|
||||
@@ -242,6 +254,9 @@ class Reconciler:
|
||||
pid = proj.plane_project_id
|
||||
# Resolve the actionable state uuids per-project (never hardcode).
|
||||
states = get_project_states(pid)
|
||||
# ORCH-068 D1: {uuid -> group} from the SAME cache record (no extra
|
||||
# fetch); empty when the API was unreachable -> per-issue fallback by key.
|
||||
groups = get_project_state_groups(pid)
|
||||
in_progress = states["in_progress"]
|
||||
approved = states["approved"]
|
||||
rejected = states["rejected"]
|
||||
@@ -249,16 +264,36 @@ class Reconciler:
|
||||
for issue in issues:
|
||||
try:
|
||||
self._reconcile_plane_issue(
|
||||
issue, pid, in_progress, approved, rejected
|
||||
issue, pid, in_progress, approved, rejected, states, groups
|
||||
)
|
||||
except Exception as e: # noqa: BLE001 - isolate one issue's failure
|
||||
logger.error(
|
||||
f"reconciler F-2: issue {issue.get('id')} failed: {e}"
|
||||
)
|
||||
|
||||
def _is_terminal_state(
|
||||
self, state_uuid: str, states: dict, groups: dict
|
||||
) -> bool:
|
||||
"""ORCH-068 D1: is ``state_uuid`` a terminal (completed/cancelled) state?
|
||||
|
||||
Primary discriminator is the Plane **state group** (project-independent,
|
||||
robust to UUID aliasing after status renames): ``group`` in
|
||||
``{completed, cancelled}`` -> terminal. When the group is unavailable
|
||||
(API gave no ``group`` / we fell back to ``_DEFAULT_STATES``), fall back
|
||||
to the logical terminal keys ``done`` / ``cancelled``.
|
||||
"""
|
||||
if not state_uuid:
|
||||
return False
|
||||
grp = groups.get(state_uuid)
|
||||
if grp:
|
||||
return grp in {"completed", "cancelled"}
|
||||
# Fallback (group unknown): logical terminal keys for this project.
|
||||
return state_uuid in {states.get("done"), states.get("cancelled")}
|
||||
|
||||
def _reconcile_plane_issue(
|
||||
self, issue: dict, project_id: str,
|
||||
in_progress: str, approved: str, rejected: str,
|
||||
states: dict, groups: dict,
|
||||
) -> None:
|
||||
issue_id = str(issue.get("id") or "")
|
||||
if not issue_id:
|
||||
@@ -266,6 +301,15 @@ class Reconciler:
|
||||
state = issue.get("state")
|
||||
new_state = state.get("id") if isinstance(state, dict) else state
|
||||
|
||||
# ORCH-068 D1: a terminal issue (Done / Cancelled) is fully in sync by
|
||||
# definition -> never actionable. Excluded per-issue (not by narrowing
|
||||
# `wanted`) because UUID aliasing can make a terminal uuid collide with
|
||||
# an actionable one — only the state GROUP disentangles them. Restores
|
||||
# the silence-when-in-sync invariant (AC-1/AC-2).
|
||||
if self._is_terminal_state(new_state, states, groups):
|
||||
self.skipped_terminal_total += 1
|
||||
return
|
||||
|
||||
# Grace ("lost, not merely delayed"): use the issue's own updated_at age.
|
||||
# A missing/unparseable timestamp is treated as old enough (the active-job
|
||||
# guard + atomic create-claim still prevent doubling).
|
||||
@@ -290,18 +334,41 @@ class Reconciler:
|
||||
|
||||
if new_state == in_progress and task is None:
|
||||
# In Progress without a task -> start the pipeline (lost start webhook).
|
||||
# ORCH-068 D2: confirm a REAL change (the task now exists) before
|
||||
# announcing — a no-op dispatch stays silent.
|
||||
self._dispatch(handle_status_start, issue_data, project_id)
|
||||
self._note_unblock(issue_id, "analysis")
|
||||
if get_task_by_plane_id(issue_id) is not None:
|
||||
self._note_unblock(issue_id, "analysis", new_state)
|
||||
elif new_state == approved and task is not None:
|
||||
# Approved but the stage never advanced -> replay the verdict.
|
||||
stage_before = task["stage"]
|
||||
self._dispatch(handle_verdict, issue_data, project_id, approved=True)
|
||||
self._note_unblock(task.get("work_item_id") or issue_id, task["stage"])
|
||||
if self._stage_changed(issue_id, stage_before):
|
||||
self._note_unblock(
|
||||
task.get("work_item_id") or issue_id, stage_before, new_state
|
||||
)
|
||||
elif new_state == rejected and task is not None:
|
||||
# Rejected but never rolled back -> replay the verdict.
|
||||
stage_before = task["stage"]
|
||||
self._dispatch(handle_verdict, issue_data, project_id, approved=False)
|
||||
self._note_unblock(task.get("work_item_id") or issue_id, task["stage"])
|
||||
if self._stage_changed(issue_id, stage_before):
|
||||
self._note_unblock(
|
||||
task.get("work_item_id") or issue_id, stage_before, new_state
|
||||
)
|
||||
# else: everything is in sync -> silence (AC-10).
|
||||
|
||||
@staticmethod
|
||||
def _stage_changed(issue_id: str, stage_before: str) -> bool:
|
||||
"""ORCH-068 D2: did the dispatched handler actually move the stage?
|
||||
|
||||
Re-reads the task after ``_dispatch`` and compares to the captured
|
||||
``stage_before``. A no-op replay (the task was already in the target
|
||||
state) leaves the stage unchanged -> no unblock notification.
|
||||
"""
|
||||
after = get_task_by_plane_id(issue_id)
|
||||
stage_after = after["stage"] if after else stage_before
|
||||
return stage_after != stage_before
|
||||
|
||||
@staticmethod
|
||||
def _dispatch(coro_fn, *args, **kwargs) -> None:
|
||||
"""Run an async plane handler from this sync thread.
|
||||
@@ -314,12 +381,27 @@ class Reconciler:
|
||||
asyncio.run(coro_fn(*args, **kwargs))
|
||||
|
||||
# -- observability (F-4) ----------------------------------------------
|
||||
def _note_unblock(self, work_item_id: str, stage: str) -> None:
|
||||
def _note_unblock(
|
||||
self, work_item_id: str, stage: str, state_uuid: str | None = None
|
||||
) -> None:
|
||||
"""Record + announce that a stuck task was unblocked (AC-12).
|
||||
|
||||
Fires only on an actual state change (an advance / replayed transition),
|
||||
never per idle tick, so it does not conflict with AC-9 / AC-10.
|
||||
|
||||
ORCH-068 (TR-3): an in-memory dedup guard keyed by ``issue_id ->
|
||||
state_uuid`` suppresses a repeat notification for the same issue+state
|
||||
if a future no-op path ever reaches here. ``state_uuid`` is the issue's
|
||||
Plane state; ``work_item_id`` doubles as the issue id for the
|
||||
pipeline-start case (which has no work item yet).
|
||||
"""
|
||||
dedup_key = work_item_id
|
||||
if state_uuid is not None and self._unblock_dedup.get(dedup_key) == state_uuid:
|
||||
self.deduped_total += 1
|
||||
return
|
||||
if state_uuid is not None:
|
||||
self._unblock_dedup[dedup_key] = state_uuid
|
||||
|
||||
self.unblocked_total += 1
|
||||
self.last_unblocked = work_item_id
|
||||
logger.info(
|
||||
@@ -380,6 +462,9 @@ class Reconciler:
|
||||
"last_run_ts": self.last_run_ts,
|
||||
"unblocked_total": self.unblocked_total,
|
||||
"last_unblocked": self.last_unblocked,
|
||||
# ORCH-068 observability.
|
||||
"skipped_terminal_total": self.skipped_terminal_total,
|
||||
"deduped_total": self.deduped_total,
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user