fix(merge-gate): tolerate re-test infra-timeout + tree-kill spawned pytest
Eliminate the false `deploy-staging -> development` rollback that fired when the merge-gate local re-test timed out (infra/resource) on a green CI + tester + staging branch (incident ORCH-109/PR #129: a 516.7s suite blew its 600s budget under CPU starvation from orphaned pytest processes -> timeout misrouted as a code fault -> developer-retry loop -> manual gate). Additive, 5 independent kill-switches, never-raise, self-hosting scope. Untouched byte-for-byte: STAGE_TRANSITIONS, the QG_CHECKS registry, check_branch_mergeable name/semantics, machine-verdict keys, the DB schema. INV-4 (never push/force-push main) and the no-prod-restart rule are preserved. - D1: new stdlib-only leaf src/proc_group.py runs the spawned re-test/coverage pytest in its own process group (start_new_session) and tree-kills the WHOLE group on timeout (os.killpg SIGTERM->grace->SIGKILL); used by merge_gate.retest_branch and coverage_gate.measure_coverage. No orphan leak. Fallback never-break: subprocess_tree_kill_enabled=False / non-POSIX -> the prior subprocess.run. - D2/D3: merge_gate.classify_retest_failure distinguishes timeout/red/lock-busy/ other; an infra timeout routes to _handle_merge_gate_infra_retry (bounded re-queue, task stays on deploy-staging, no rollback / no developer-retry); a red re-test / conflict still rolls back (BR-6). Exhaustion -> one infra alert. - D4: skip the local re-test when the pre-merge rebase was a proven no-op (HEAD already CI/tester/staging-validated); fail-safe runs the re-test on any uncertainty. Flag merge_retest_skip_when_current_enabled. - D5: merge_retest_timeout_s 600 -> 900 + _resolve_retest_timeout validation; reaper_max_running_s invariant preserved without change. - D6: in-process counters + read-only merge_gate block in GET /queue; appended ("ORCH-110","classify_retest_failure","src/merge_gate.py") to MAIN_REGRESSION_MARKERS. Docs (README/internals overview/CLAUDE/CHANGELOG/ .env.example) updated in the same PR. Tests: tests/test_orch110_*.py (TC-01..TC-12, incl. the red-before/green-after incident regression). Full suite green (1988 passed). Refs: ORCH-110 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -202,18 +202,50 @@ class Settings(BaseSettings):
|
||||
# only the self-hosting repo (orchestrator). Other
|
||||
# repos -> conditional no-op (mirrors ORCH-35 staging).
|
||||
# merge_retest_timeout_s -> wall-clock budget for the post-rebase re-test.
|
||||
# ORCH-110 (D5): raised 600 -> 900 (74% headroom over the observed 516.7s
|
||||
# suite vs the prior ~16%). Cross-invariant (ORCH-065/109): keep
|
||||
# reaper_max_running_s (5400) > Σ(deploy-staging gate-work) + grace — see
|
||||
# docs/work-items/ORCH-110/07-infra-requirements.md.
|
||||
# merge_retest_target -> pytest target for the re-test (portability across repos).
|
||||
# merge_lock_timeout_s -> max lease age; an older lease is reclaimed (crash backstop).
|
||||
# merge_defer_delay_s -> delay before re-running the gate when the lock is busy.
|
||||
# merge_defer_max_attempts -> defer retries before escalation (avoids livelock).
|
||||
merge_gate_enabled: bool = True
|
||||
merge_gate_repos: str = ""
|
||||
merge_retest_timeout_s: int = 600
|
||||
merge_retest_timeout_s: int = 900
|
||||
merge_retest_target: str = "tests/"
|
||||
merge_lock_timeout_s: int = 300
|
||||
merge_defer_delay_s: int = 60
|
||||
merge_defer_max_attempts: int = 5
|
||||
|
||||
# ORCH-110: merge-gate re-test infra-timeout tolerance + tree-kill of the
|
||||
# orchestrator-spawned pytest subprocess (re-test + coverage). Each default =
|
||||
# the desired prod behaviour (ORCH-101 canon); each flag is an INDEPENDENT
|
||||
# kill-switch (off -> byte-for-byte pre-ORCH-110). Detailed ADR:
|
||||
# docs/work-items/ORCH-110/06-adr/ADR-001-merge-gate-retest-infra-tolerance-and-tree-kill.md.
|
||||
# subprocess_tree_kill_enabled -> D1: spawn the re-test / coverage
|
||||
# pytest in its own process group and tree-kill the WHOLE group on timeout
|
||||
# (no orphan grandchildren grinding the host CPU). off -> the prior
|
||||
# subprocess.run(timeout=) (ORCH_SUBPROCESS_TREE_KILL_ENABLED).
|
||||
# merge_retest_infra_tolerance_enabled -> D3: a re-test TIMEOUT is a transient
|
||||
# (bounded infra-retry, NOT a code-fault rollback to development burning a
|
||||
# developer retry). off -> timeout = the prior rollback
|
||||
# (ORCH_MERGE_RETEST_INFRA_TOLERANCE_ENABLED).
|
||||
# merge_retest_infra_max_retries -> D3: infra-retry budget before an
|
||||
# infra-alert (anti-loop). (ORCH_MERGE_RETEST_INFRA_MAX_RETRIES)
|
||||
# merge_retest_infra_retry_delay_s -> D3: delay before the staging-deployer
|
||||
# re-run. (ORCH_MERGE_RETEST_INFRA_RETRY_DELAY_S)
|
||||
# merge_retest_skip_when_current_enabled -> D4: skip the local re-test when the
|
||||
# pre-merge rebase was a PROVEN no-op (branch already at origin/main; HEAD is
|
||||
# exactly the CI/tester/staging-validated commit). off -> always re-test
|
||||
# after rebase (ORCH_MERGE_RETEST_SKIP_WHEN_CURRENT_ENABLED).
|
||||
# The tree-kill grace reuses the existing agent_kill_grace_seconds (no new key).
|
||||
subprocess_tree_kill_enabled: bool = True
|
||||
merge_retest_infra_tolerance_enabled: bool = True
|
||||
merge_retest_infra_max_retries: int = 2
|
||||
merge_retest_infra_retry_delay_s: int = 120
|
||||
merge_retest_skip_when_current_enabled: bool = True
|
||||
|
||||
# ORCH-036: executable self-deploy (deploy stage drives the host hook).
|
||||
# The `deploy` stage for the self-hosting repo is turned into a REAL prod
|
||||
# restart via a detached host process, gated by a manual approve. Three-phase
|
||||
|
||||
@@ -44,7 +44,8 @@ Invariants (ADR-001 §7, never broken):
|
||||
artefact / decides. It never calls the deploy hook, never restarts the prod
|
||||
container, never pushes / force-pushes ``main``.
|
||||
|
||||
This module is a **leaf**: it imports only ``config`` / ``git_worktree`` and lazily
|
||||
This module is a **leaf**: it imports only ``config`` / ``git_worktree`` /
|
||||
``proc_group`` (the ORCH-110 stdlib-only process-group runner) and lazily
|
||||
``qg.checks.is_self_hosting_repo`` / ``db`` / ``notifications``; it never imports
|
||||
``stage_engine``.
|
||||
"""
|
||||
@@ -52,11 +53,11 @@ This module is a **leaf**: it imports only ``config`` / ``git_worktree`` and laz
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from .config import settings
|
||||
from .git_worktree import ensure_worktree, get_worktree_path
|
||||
from .proc_group import run_in_process_group # ORCH-110 D1: tree-kill on timeout
|
||||
|
||||
logger = logging.getLogger("orchestrator.coverage_gate")
|
||||
|
||||
@@ -152,22 +153,25 @@ def measure_coverage(repo: str, branch: str) -> float | None:
|
||||
"-q",
|
||||
]
|
||||
timeout = settings.coverage_run_timeout_s
|
||||
try:
|
||||
subprocess.run(cmd, cwd=wt, capture_output=True, text=True, timeout=timeout)
|
||||
except subprocess.TimeoutExpired:
|
||||
# ORCH-110 (D1 / FR-2 / BR-3): run the coverage suite in its OWN process group so
|
||||
# a timeout tree-kills the WHOLE subtree (the sibling orphan-leak source of the
|
||||
# ORCH-109 incident), not just the direct child. The metric is read from the JSON
|
||||
# file regardless of the exit code, so a non-timeout spawn/OS error (returncode
|
||||
# None) just falls through to "no coverage json produced" -> None, byte-for-byte
|
||||
# the prior fail-open contract.
|
||||
res = run_in_process_group(
|
||||
cmd,
|
||||
cwd=wt,
|
||||
timeout=timeout,
|
||||
tree_kill=bool(getattr(settings, "subprocess_tree_kill_enabled", True)),
|
||||
grace_s=settings.agent_kill_grace_seconds,
|
||||
)
|
||||
if res.timed_out:
|
||||
logger.warning(
|
||||
"measure_coverage: pytest --cov timed out after %ss for %s/%s",
|
||||
timeout, repo, branch,
|
||||
)
|
||||
return None
|
||||
except FileNotFoundError:
|
||||
logger.warning(
|
||||
"measure_coverage: pytest / pytest-cov not available for %s/%s", repo, branch
|
||||
)
|
||||
return None
|
||||
except (subprocess.SubprocessError, OSError) as e:
|
||||
logger.warning("measure_coverage: pytest --cov error for %s/%s: %s", repo, branch, e)
|
||||
return None
|
||||
|
||||
data = None
|
||||
try:
|
||||
|
||||
@@ -225,6 +225,11 @@ async def queue():
|
||||
"reaper": reaper.status(),
|
||||
"post_deploy": post_deploy.status(),
|
||||
"merge_verify": merge_gate.merge_verify_status(),
|
||||
# ORCH-110 (D6): merge-gate re-test infra-timeout observability (read-only) —
|
||||
# tolerance/skip/tree-kill flags + timeout/infra-retry/skip counters, so an
|
||||
# infra-timeout is distinguishable from a code-fault rollback. Additive block;
|
||||
# never-raise.
|
||||
"merge_gate": merge_gate.merge_gate_status(),
|
||||
# ORCH-026 (G-2): declarative task-dependency observability (read-only,
|
||||
# NOT a source of truth) — declared edges, blocked tasks, detected cycle.
|
||||
"task_deps": task_deps.snapshot(),
|
||||
|
||||
@@ -36,6 +36,7 @@ import time
|
||||
|
||||
from .config import settings
|
||||
from .git_worktree import ensure_worktree, get_worktree_path
|
||||
from .proc_group import run_in_process_group # ORCH-110 D1: tree-kill on timeout
|
||||
|
||||
logger = logging.getLogger("orchestrator.merge_gate")
|
||||
|
||||
@@ -175,17 +176,103 @@ def auto_rebase_onto_main(repo: str, branch: str) -> tuple[bool, str]:
|
||||
# ---------------------------------------------------------------------------
|
||||
# re-test in the caught-up worktree
|
||||
# ---------------------------------------------------------------------------
|
||||
def _resolve_retest_timeout() -> int:
|
||||
"""Resolve the re-test wall-clock budget (ORCH-110 D5), never-break.
|
||||
|
||||
``int(settings.merge_retest_timeout_s)`` when it is a positive int; otherwise
|
||||
the safe default (900) + WARNING (mirrors ``launcher._resolve_timeout``). A
|
||||
malformed / non-positive config never reaches ``subprocess`` (FR-3 / AC-5).
|
||||
"""
|
||||
default = 900
|
||||
try:
|
||||
v = int(settings.merge_retest_timeout_s)
|
||||
if v > 0:
|
||||
return v
|
||||
logger.warning(
|
||||
"Non-positive merge_retest_timeout_s=%r; using default %ss",
|
||||
settings.merge_retest_timeout_s, default,
|
||||
)
|
||||
except (TypeError, ValueError):
|
||||
logger.warning(
|
||||
"Invalid merge_retest_timeout_s=%r; using default %ss",
|
||||
settings.merge_retest_timeout_s, default,
|
||||
)
|
||||
return default
|
||||
|
||||
|
||||
def head_sha(repo: str, branch: str) -> str:
|
||||
"""Return ``git rev-parse HEAD`` of the branch worktree, or ``""`` (ORCH-110 D4).
|
||||
|
||||
Used by ``check_branch_mergeable`` to detect a no-op rebase (HEAD unchanged ->
|
||||
branch already contained the latest origin/main). Reads ONLY the existing
|
||||
per-branch worktree (``get_worktree_path``; the worktree exists by the
|
||||
deploy-staging->deploy edge) — it never CREATES one, so on an absent worktree it
|
||||
safely returns ``""`` (the no-op-skip then never triggers; re-test runs —
|
||||
fail-safe). Never-raise: any git/OS error -> ``""``.
|
||||
"""
|
||||
try:
|
||||
wt = get_worktree_path(repo, branch)
|
||||
if not os.path.isdir(wt):
|
||||
return ""
|
||||
r = subprocess.run(
|
||||
["git", "-C", wt, "rev-parse", "HEAD"],
|
||||
capture_output=True, text=True, timeout=_SHORT_TIMEOUT,
|
||||
)
|
||||
if r.returncode != 0:
|
||||
return ""
|
||||
return (r.stdout or "").strip()
|
||||
except (subprocess.SubprocessError, OSError) as e:
|
||||
logger.warning("head_sha error for %s/%s: %s", repo, branch, e)
|
||||
return ""
|
||||
|
||||
|
||||
def classify_retest_failure(reason: str) -> str:
|
||||
"""Classify a merge-gate FAIL ``reason`` (ORCH-110 D2). Pure, never-raise.
|
||||
|
||||
The single point for the 'magic string' of the re-test outcome so the routing
|
||||
in ``stage_engine._handle_merge_gate`` can distinguish an INFRA timeout from a
|
||||
real RED re-test without scattering ``"timeout" in reason`` checks:
|
||||
|
||||
* ``"timeout"`` — the re-test hit its wall-clock budget (infra/resource);
|
||||
``check_branch_mergeable`` returns ``"re-test timeout after <T>s"``.
|
||||
* ``"red"`` — a deterministically red re-test (a real code defect):
|
||||
``"re-test failed after rebase: ..."``.
|
||||
* ``"lock-busy"`` — merge-lock contention (``"merge-lock busy"``).
|
||||
* ``"other"`` — rebase conflict / setup error / anything else.
|
||||
|
||||
SCOPE (ADR D2): ONLY the re-test timeout is an infra-tolerated transient.
|
||||
``auto_rebase_onto_main``'s own ``"rebase timeout"`` is a DIFFERENT timeout (git
|
||||
hung) and stays on the rollback path — without a successful rebase the branch
|
||||
cannot be caught up to ``main``, so a merge is impossible by construction.
|
||||
"""
|
||||
try:
|
||||
r = (reason or "").strip().lower()
|
||||
if r == "merge-lock busy":
|
||||
return "lock-busy"
|
||||
if "re-test timeout" in r:
|
||||
return "timeout"
|
||||
if r.startswith("re-test failed"):
|
||||
return "red"
|
||||
return "other"
|
||||
except Exception: # noqa: BLE001 - never-raise; unknown -> other (safe: rollback)
|
||||
return "other"
|
||||
|
||||
|
||||
def retest_branch(repo: str, branch: str) -> tuple[bool, str]:
|
||||
"""Run the project test-suite in the (already caught-up) branch worktree.
|
||||
|
||||
Command: ``python -m pytest <merge_retest_target>`` (default ``tests/``),
|
||||
matching the orchestrator CI / check_tests_local pattern. Bounded by
|
||||
``settings.merge_retest_timeout_s``.
|
||||
matching the orchestrator CI / check_tests_local pattern. Bounded by the
|
||||
validated ``_resolve_retest_timeout()`` budget (ORCH-110 D5).
|
||||
|
||||
Returns:
|
||||
ORCH-110 (D1 / FR-2): the suite is spawned in its OWN process group via
|
||||
``run_in_process_group`` so a timeout tree-kills the WHOLE subtree (no orphan
|
||||
grandchildren grinding the host CPU). The RETURN contract is byte-for-byte the
|
||||
prior one — only the side effect (no leak) changes:
|
||||
* ``(True, "re-test green")`` — pytest rc == 0
|
||||
* ``(False, "re-test timeout after <T>s")`` — exceeded the timeout (AC-6)
|
||||
* ``(False, "re-test timeout after <T>s")`` — exceeded the timeout (AC-4/AC-6)
|
||||
* ``(False, "re-test failed: ...<tail>")`` — non-zero rc, with output tail
|
||||
* ``(False, "re-test error: ...")`` — spawn / OS error (not a timeout)
|
||||
Never-raise (AC-9): any setup/OS error -> ``(False, "<reason>")``.
|
||||
"""
|
||||
wt = get_worktree_path(repo, branch)
|
||||
@@ -197,21 +284,25 @@ def retest_branch(repo: str, branch: str) -> tuple[bool, str]:
|
||||
return False, f"re-test setup error: {e}"
|
||||
|
||||
target = settings.merge_retest_target or "tests/"
|
||||
timeout = settings.merge_retest_timeout_s
|
||||
try:
|
||||
r = subprocess.run(
|
||||
["python", "-m", "pytest", target, "-q"],
|
||||
cwd=wt, capture_output=True, text=True, timeout=timeout,
|
||||
)
|
||||
except subprocess.TimeoutExpired:
|
||||
timeout = _resolve_retest_timeout()
|
||||
res = run_in_process_group(
|
||||
["python", "-m", "pytest", target, "-q"],
|
||||
cwd=wt,
|
||||
timeout=timeout,
|
||||
tree_kill=bool(getattr(settings, "subprocess_tree_kill_enabled", True)),
|
||||
grace_s=settings.agent_kill_grace_seconds,
|
||||
)
|
||||
if res.timed_out:
|
||||
logger.warning("retest_branch: timeout (%ss) on %s/%s", timeout, repo, branch)
|
||||
return False, f"re-test timeout after {timeout}s"
|
||||
except (subprocess.SubprocessError, OSError) as e:
|
||||
return False, f"re-test error: {e}"
|
||||
|
||||
if r.returncode == 0:
|
||||
if res.returncode is None:
|
||||
# Spawn / OS error that was NOT a timeout -> preserve the prior error reason.
|
||||
detail = (res.stderr or "").strip()[:200]
|
||||
logger.warning("retest_branch: spawn error on %s/%s: %s", repo, branch, detail)
|
||||
return False, f"re-test error: {detail}"
|
||||
if res.returncode == 0:
|
||||
return True, "re-test green"
|
||||
tail = ((r.stdout or "") + (r.stderr or ""))[-500:]
|
||||
tail = ((res.stdout or "") + (res.stderr or ""))[-500:]
|
||||
logger.warning("retest_branch: red on %s/%s", repo, branch)
|
||||
return False, f"re-test failed: ...{tail}"
|
||||
|
||||
@@ -576,6 +667,82 @@ def merge_verify_status() -> dict:
|
||||
return {"enabled": False}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ORCH-110 (D6): re-test infra-timeout observability counters (read-only).
|
||||
#
|
||||
# In-process counters (mirror _MERGE_VERIFY_COUNTERS): reset only on process start,
|
||||
# surfaced read-only via merge_gate_status() in GET /queue. NEVER the source of any
|
||||
# decision — purely informational, so an infra timeout is distinguishable from a
|
||||
# code-fault rollback in the snapshot. never-raise.
|
||||
# ---------------------------------------------------------------------------
|
||||
_MERGE_GATE_COUNTERS: dict = {
|
||||
"retest_timeout_total": 0, # re-test hit its wall-clock budget
|
||||
"retest_infra_retry_total": 0, # infra-timeout -> bounded re-queue (D3)
|
||||
"retest_infra_exhausted_total": 0, # infra-retry budget exhausted -> infra-alert
|
||||
"retest_skipped_current_total": 0, # re-test skipped on a no-op rebase (D4)
|
||||
"last_infra_timeout_wi": None,
|
||||
}
|
||||
|
||||
|
||||
def note_retest_timeout(work_item_id: str | None = None) -> None:
|
||||
"""Bump the 're-test timeout' counter (observability only). Never raises."""
|
||||
try:
|
||||
_MERGE_GATE_COUNTERS["retest_timeout_total"] += 1
|
||||
_MERGE_GATE_COUNTERS["last_infra_timeout_wi"] = work_item_id
|
||||
except Exception: # noqa: BLE001 - observability must never break a decision
|
||||
pass
|
||||
|
||||
|
||||
def note_retest_infra_retry() -> None:
|
||||
"""Bump the 'infra-timeout re-queue' counter (observability only). Never raises."""
|
||||
try:
|
||||
_MERGE_GATE_COUNTERS["retest_infra_retry_total"] += 1
|
||||
except Exception: # noqa: BLE001 - observability must never break a decision
|
||||
pass
|
||||
|
||||
|
||||
def note_retest_infra_exhausted() -> None:
|
||||
"""Bump the 'infra-retry exhausted' counter (observability only). Never raises."""
|
||||
try:
|
||||
_MERGE_GATE_COUNTERS["retest_infra_exhausted_total"] += 1
|
||||
except Exception: # noqa: BLE001 - observability must never break a decision
|
||||
pass
|
||||
|
||||
|
||||
def note_retest_skipped_current() -> None:
|
||||
"""Bump the 're-test skipped (no-op rebase)' counter (D4). Never raises."""
|
||||
try:
|
||||
_MERGE_GATE_COUNTERS["retest_skipped_current_total"] += 1
|
||||
except Exception: # noqa: BLE001 - observability must never break a decision
|
||||
pass
|
||||
|
||||
|
||||
def merge_gate_status() -> dict:
|
||||
"""Read-only snapshot of the ORCH-110 merge-gate re-test path for GET /queue.
|
||||
|
||||
Additive block; existing /queue keys are untouched. Surfaces the flags +
|
||||
counters so an infra-timeout (retry/exhaustion) is distinguishable from a
|
||||
code-fault rollback. Never the source of any decision. never-raise.
|
||||
"""
|
||||
try:
|
||||
return {
|
||||
"infra_tolerance_enabled": bool(settings.merge_retest_infra_tolerance_enabled),
|
||||
"infra_max_retries": settings.merge_retest_infra_max_retries,
|
||||
"infra_retry_delay_s": settings.merge_retest_infra_retry_delay_s,
|
||||
"skip_when_current_enabled": bool(settings.merge_retest_skip_when_current_enabled),
|
||||
"tree_kill_enabled": bool(settings.subprocess_tree_kill_enabled),
|
||||
"retest_timeout_s": settings.merge_retest_timeout_s,
|
||||
"retest_timeout_total": _MERGE_GATE_COUNTERS["retest_timeout_total"],
|
||||
"retest_infra_retry_total": _MERGE_GATE_COUNTERS["retest_infra_retry_total"],
|
||||
"retest_infra_exhausted_total": _MERGE_GATE_COUNTERS["retest_infra_exhausted_total"],
|
||||
"retest_skipped_current_total": _MERGE_GATE_COUNTERS["retest_skipped_current_total"],
|
||||
"last_infra_timeout_wi": _MERGE_GATE_COUNTERS["last_infra_timeout_wi"],
|
||||
}
|
||||
except Exception as e: # noqa: BLE001 - never-raise contract
|
||||
logger.warning("merge_gate_status error: %s", e)
|
||||
return {"infra_tolerance_enabled": False}
|
||||
|
||||
|
||||
def merge_verify_applies(repo: str) -> bool:
|
||||
"""Whether the ORCH-071 merge-verify under-gate is REAL for this repo.
|
||||
|
||||
@@ -1037,6 +1204,7 @@ MAIN_REGRESSION_MARKERS: list[tuple[str, str, str]] = [
|
||||
("ORCH-073", "check_main_regression", "src/merge_gate.py"),
|
||||
("ORCH-082", "ensure_open_pr", "src/merge_gate.py"),
|
||||
("ORCH-093", "_classify_merge_response", "src/merge_gate.py"),
|
||||
("ORCH-110", "classify_retest_failure", "src/merge_gate.py"),
|
||||
]
|
||||
|
||||
|
||||
|
||||
179
src/proc_group.py
Normal file
179
src/proc_group.py
Normal file
@@ -0,0 +1,179 @@
|
||||
"""Process-group runner with tree-kill on timeout (ORCH-110 D1).
|
||||
|
||||
Background
|
||||
----------
|
||||
The orchestrator spawns its OWN pytest subprocesses OUTSIDE the LLM-agent path:
|
||||
``merge_gate.retest_branch`` (the merge-gate re-test) and
|
||||
``coverage_gate.measure_coverage`` (the coverage measurement). Both used a plain
|
||||
``subprocess.run(..., timeout=...)``. On ``TimeoutExpired`` CPython kills only the
|
||||
DIRECT child (``proc.kill()``); any grandchildren (pytest-xdist workers, tests that
|
||||
spawn their own processes) are re-parented to PID 1 and keep running — for days,
|
||||
starving the host CPU. That orphan leak is a ROOT cause of the ORCH-109 / PR #129
|
||||
incident (a 516s suite blew its 600s budget under CPU starvation -> a false
|
||||
merge-gate rollback after a green CI).
|
||||
|
||||
This leaf gives those orchestrator-spawned runs the SAME SIGTERM->grace->SIGKILL
|
||||
cascade the LLM agents already get from ``launcher.stop_process`` — but at the
|
||||
process-GROUP level so the WHOLE subtree dies on a timeout:
|
||||
|
||||
* the child becomes the leader of a NEW session / process group
|
||||
(``start_new_session=True`` -> ``setsid``), so every descendant shares
|
||||
``pgid == child.pid``;
|
||||
* on timeout the whole group is killed with ``os.killpg`` (SIGTERM, grace poll,
|
||||
then SIGKILL), then reaped (no zombies).
|
||||
|
||||
Invariants:
|
||||
* **stdlib-only leaf** — imports only ``os`` / ``signal`` / ``subprocess`` /
|
||||
``time`` / ``logging`` / ``dataclasses``; NEVER another ``src/*`` module
|
||||
(mirrors the purity of ``serial_gate`` / ``staging_verdict``).
|
||||
* **never-raise** — any OS error degrades to a safe ``ProcResult`` (the caller maps
|
||||
it to its existing contract); an exception never escapes.
|
||||
* **never-break fallback** — with ``tree_kill=False`` OR on a platform without
|
||||
``os.killpg`` / ``os.setsid`` (non-POSIX) it degrades to the prior
|
||||
``subprocess.run(cmd, ..., timeout=timeout)`` (byte-for-byte the old behaviour;
|
||||
prod is Linux and never hits the fallback).
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import subprocess
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
|
||||
logger = logging.getLogger("orchestrator.proc_group")
|
||||
|
||||
|
||||
@dataclass
|
||||
class ProcResult:
|
||||
"""Outcome of a process-group run (ORCH-110 D1).
|
||||
|
||||
``returncode`` is the child's exit code, or ``None`` when the process could not
|
||||
be reaped with a real code (timeout-kill / spawn error). ``timed_out`` is True
|
||||
iff the command exceeded its wall-clock budget (and the whole group was killed).
|
||||
"""
|
||||
|
||||
returncode: int | None
|
||||
stdout: str
|
||||
stderr: str
|
||||
timed_out: bool
|
||||
|
||||
|
||||
def _tree_kill_supported() -> bool:
|
||||
"""POSIX process-group ops available? (non-POSIX -> fall back to subprocess.run)."""
|
||||
return hasattr(os, "killpg") and hasattr(os, "getpgid") and hasattr(os, "setsid")
|
||||
|
||||
|
||||
def _kill_group(pid: int, grace_s: float) -> None:
|
||||
"""SIGTERM -> grace -> SIGKILL the whole process GROUP led by ``pid``.
|
||||
|
||||
Mirrors ``launcher.stop_process`` but targets the process GROUP (``os.killpg``)
|
||||
so grandchildren die too. ``ProcessLookupError`` is tolerated at every step (the
|
||||
group may already be gone). Never raises.
|
||||
"""
|
||||
try:
|
||||
pgid = os.getpgid(pid)
|
||||
except (ProcessLookupError, OSError):
|
||||
return
|
||||
|
||||
# Phase 1: SIGTERM the whole group (graceful).
|
||||
try:
|
||||
os.killpg(pgid, signal.SIGTERM)
|
||||
except ProcessLookupError:
|
||||
return
|
||||
except OSError as e: # noqa: BLE001 - never-raise
|
||||
logger.warning("proc_group: SIGTERM killpg(%s) error: %s", pgid, e)
|
||||
|
||||
# Phase 2: poll for graceful group exit within the grace window.
|
||||
poll = 0.2
|
||||
waited = 0.0
|
||||
while waited < max(0.0, grace_s):
|
||||
time.sleep(poll)
|
||||
waited += poll
|
||||
try:
|
||||
os.killpg(pgid, 0) # signal 0 = liveness probe of the group
|
||||
except ProcessLookupError:
|
||||
return # whole group gone
|
||||
except OSError:
|
||||
break
|
||||
|
||||
# Phase 3: still alive -> hard SIGKILL the group.
|
||||
try:
|
||||
os.killpg(pgid, signal.SIGKILL)
|
||||
except ProcessLookupError:
|
||||
return
|
||||
except OSError as e: # noqa: BLE001 - never-raise
|
||||
logger.warning("proc_group: SIGKILL killpg(%s) error: %s", pgid, e)
|
||||
|
||||
|
||||
def _run_plain(cmd, *, cwd, timeout, env=None) -> ProcResult:
|
||||
"""Fallback path: prior ``subprocess.run`` semantics (never-break)."""
|
||||
try:
|
||||
r = subprocess.run(
|
||||
cmd, cwd=cwd, env=env, capture_output=True, text=True, timeout=timeout
|
||||
)
|
||||
return ProcResult(
|
||||
returncode=r.returncode, stdout=r.stdout or "", stderr=r.stderr or "",
|
||||
timed_out=False,
|
||||
)
|
||||
except subprocess.TimeoutExpired:
|
||||
return ProcResult(returncode=None, stdout="", stderr="", timed_out=True)
|
||||
except (subprocess.SubprocessError, OSError) as e:
|
||||
logger.warning("proc_group(_run_plain): error for %s: %s", cmd, e)
|
||||
return ProcResult(returncode=None, stdout="", stderr=str(e), timed_out=False)
|
||||
|
||||
|
||||
def run_in_process_group(
|
||||
cmd, *, cwd, timeout, env=None, grace_s: float = 5.0, tree_kill: bool = True
|
||||
) -> ProcResult:
|
||||
"""Run ``cmd`` in its own process group; tree-kill the whole group on timeout.
|
||||
|
||||
See the module docstring. Returns a ``ProcResult``; ``timed_out=True`` iff the
|
||||
command exceeded ``timeout`` (and the whole group was killed). Never raises.
|
||||
|
||||
Fallback (never-break): ``tree_kill=False`` or a non-POSIX platform -> a plain
|
||||
``subprocess.run(cmd, ..., timeout=timeout)`` (the prior behaviour).
|
||||
"""
|
||||
if not tree_kill or not _tree_kill_supported():
|
||||
return _run_plain(cmd, cwd=cwd, timeout=timeout, env=env)
|
||||
|
||||
try:
|
||||
proc = subprocess.Popen( # noqa: S603 - cmd is an internal, fixed argv
|
||||
cmd,
|
||||
cwd=cwd,
|
||||
env=env,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
start_new_session=True, # setsid -> child leads a new process group
|
||||
)
|
||||
except (OSError, ValueError) as e:
|
||||
logger.warning("proc_group: spawn error for %s: %s", cmd, e)
|
||||
return ProcResult(returncode=None, stdout="", stderr=str(e), timed_out=False)
|
||||
|
||||
try:
|
||||
out, err = proc.communicate(timeout=timeout)
|
||||
return ProcResult(
|
||||
returncode=proc.returncode, stdout=out or "", stderr=err or "",
|
||||
timed_out=False,
|
||||
)
|
||||
except subprocess.TimeoutExpired:
|
||||
# Tree-kill the WHOLE group (children + grandchildren), then reap to avoid
|
||||
# leaving a zombie behind.
|
||||
_kill_group(proc.pid, grace_s)
|
||||
out, err = "", ""
|
||||
try:
|
||||
out, err = proc.communicate(timeout=grace_s + 5.0)
|
||||
except (subprocess.TimeoutExpired, OSError, ValueError):
|
||||
try:
|
||||
proc.wait(timeout=grace_s + 5.0)
|
||||
except (subprocess.TimeoutExpired, OSError, ValueError):
|
||||
pass
|
||||
return ProcResult(
|
||||
returncode=proc.returncode, stdout=out or "", stderr=err or "",
|
||||
timed_out=True,
|
||||
)
|
||||
except (OSError, ValueError) as e: # noqa: BLE001 - never-raise
|
||||
logger.warning("proc_group: communicate error for %s: %s", cmd, e)
|
||||
_kill_group(proc.pid, grace_s)
|
||||
return ProcResult(returncode=proc.returncode, stdout="", stderr=str(e), timed_out=False)
|
||||
@@ -708,10 +708,38 @@ def check_branch_mergeable(repo: str, work_item_id: str, branch: str) -> tuple[b
|
||||
logger.info("check_branch_mergeable: %s up-to-date with main", branch)
|
||||
return True, "branch up-to-date with main"
|
||||
|
||||
# ORCH-110 (D4): capture HEAD before/after the rebase to detect a no-op
|
||||
# rebase (branch already contained the latest origin/main).
|
||||
pre_sha = merge_gate.head_sha(repo, branch)
|
||||
ok, rb_reason = merge_gate.auto_rebase_onto_main(repo, branch)
|
||||
if not ok:
|
||||
merge_gate.release_merge_lease(repo, branch)
|
||||
return False, rb_reason # "rebase conflict: ..."
|
||||
post_sha = merge_gate.head_sha(repo, branch)
|
||||
|
||||
# ORCH-110 (D4 / FR-4 / AC-6): re-test contract. ORCH-043 catches a
|
||||
# SEMANTIC merge conflict that can only arise when ``main`` actually moved
|
||||
# and the branch was really rebased onto new commits. When the rebase was
|
||||
# a PROVEN no-op (HEAD unchanged), there is no "moved main" -> the local
|
||||
# re-test re-checks exactly the commit CI + tester + staging already
|
||||
# validated on THIS HEAD -> it is a redundant single point of false
|
||||
# failure (the ORCH-109 timeout incident). Skip it ONLY on a proven no-op
|
||||
# (both SHAs non-empty AND equal); on ANY uncertainty (empty SHA / flag
|
||||
# off) the re-test runs (fail-safe to BR-6/AC-3). This extends to the
|
||||
# premerge_rebase_always=True path the same optimisation the
|
||||
# premerge_rebase_always=False not-behind short-circuit already has.
|
||||
if (
|
||||
bool(getattr(settings, "merge_retest_skip_when_current_enabled", False))
|
||||
and pre_sha and post_sha and pre_sha == post_sha
|
||||
):
|
||||
logger.info(
|
||||
"check_branch_mergeable: %s rebase no-op (HEAD %s unchanged) -> "
|
||||
"re-test skipped (HEAD CI-validated)", branch, pre_sha[:8],
|
||||
)
|
||||
merge_gate.note_retest_skipped_current()
|
||||
return True, (
|
||||
"branch up-to-date (re-test skipped: rebase no-op, HEAD CI-validated)"
|
||||
)
|
||||
|
||||
ok_t, t_reason = merge_gate.retest_branch(repo, branch)
|
||||
if ok_t:
|
||||
|
||||
@@ -995,6 +995,22 @@ def _handle_merge_gate(
|
||||
)
|
||||
return True
|
||||
|
||||
# ORCH-110 (D2/D3): classify the FAIL cause. An INFRA re-test TIMEOUT (the
|
||||
# ORCH-109 incident: green CI + green tester, but the local re-test blew its
|
||||
# wall-clock budget under CPU starvation) is a TRANSIENT, NOT a code fault — it
|
||||
# must NOT take the _handle_merge_gate_rollback path (rollback to development +
|
||||
# burn a developer retry that nobody can fix). Route it to a bounded infra-retry
|
||||
# instead. A deterministically RED re-test / conflict still rolls back (BR-6 /
|
||||
# AC-3). Kill-switch off -> byte-for-byte the prior behaviour (timeout ->
|
||||
# rollback), NFR-2.
|
||||
if merge_gate.classify_retest_failure(reason) == "timeout":
|
||||
merge_gate.note_retest_timeout(work_item_id)
|
||||
if settings.merge_retest_infra_tolerance_enabled:
|
||||
_handle_merge_gate_infra_retry(
|
||||
task_id, current_stage, repo, work_item_id, branch, reason, result
|
||||
)
|
||||
return True
|
||||
|
||||
_handle_merge_gate_rollback(
|
||||
task_id, current_stage, repo, work_item_id, branch, reason, result
|
||||
)
|
||||
@@ -1044,6 +1060,110 @@ def _handle_merge_gate_defer(
|
||||
)
|
||||
|
||||
|
||||
def _merge_infra_retry_count(task_id: int) -> int:
|
||||
"""How many times this task was re-queued by the merge-gate infra-timeout path.
|
||||
|
||||
Restart-safe (mirror of ``_merge_defer_count``): counted from the persisted jobs
|
||||
queue by the infra-retry marker in task_content, so a service restart never
|
||||
resets the infra-retry budget (ORCH-110 D3 / AC-9).
|
||||
"""
|
||||
conn = get_db()
|
||||
n = conn.execute(
|
||||
"SELECT COUNT(*) FROM jobs WHERE task_id=? AND "
|
||||
"task_content LIKE '%merge-gate infra-timeout retry%'",
|
||||
(task_id,),
|
||||
).fetchone()[0]
|
||||
conn.close()
|
||||
return n
|
||||
|
||||
|
||||
def _handle_merge_gate_infra_retry(
|
||||
task_id, current_stage, repo, work_item_id, branch, reason, result: AdvanceResult
|
||||
):
|
||||
"""ORCH-110 (D3): merge-gate re-test INFRA timeout -> bounded infra-retry.
|
||||
|
||||
Mirror of ``_handle_merge_gate_defer`` (NOT ``_handle_merge_gate_rollback``): the
|
||||
task STAYS on deploy-staging, the staging-deployer is re-queued after a delay, and
|
||||
there is NO ``update_task_stage("development")``, NO developer-retry increment and
|
||||
NO code-fault QG-failure semantics — so a green-CI + green-tester task is never
|
||||
falsely sent back to development for an infra hiccup (BR-1 / AC-1). The merge lease
|
||||
was already released by ``check_branch_mergeable`` on the timeout, so the re-run
|
||||
re-acquires it normally (defer-path parity).
|
||||
|
||||
Bounded (anti-loop, NFR-5 / AC-9): after ``merge_retest_infra_max_retries``
|
||||
re-tries the task is blocked with an INFRA-specific alert (a resource problem,
|
||||
explicitly NOT "developer must fix"), so it never bounces forever. The first
|
||||
timeout already tree-killed the orphan processes (D1), so the next re-test usually
|
||||
passes — the retry is recovery, not masking. The task does NOT move to development.
|
||||
|
||||
never-raise (TC-09): any unexpected error in this transient path is logged with a
|
||||
WARNING and swallowed (the task simply stays on deploy-staging for the reconciler /
|
||||
reaper to re-drive) so an exception never escapes into ``advance_stage``.
|
||||
"""
|
||||
try:
|
||||
_merge_gate_infra_retry_impl(
|
||||
task_id, current_stage, repo, work_item_id, branch, reason, result
|
||||
)
|
||||
except Exception as e: # noqa: BLE001 - never-raise; do not crash advance_stage
|
||||
logger.warning(
|
||||
f"Task {task_id}: merge-gate infra-retry path error: {e} "
|
||||
f"(swallowed; task stays on deploy-staging)"
|
||||
)
|
||||
result.note = "merge-gate-infra-retry-error"
|
||||
|
||||
|
||||
def _merge_gate_infra_retry_impl(
|
||||
task_id, current_stage, repo, work_item_id, branch, reason, result: AdvanceResult
|
||||
):
|
||||
"""Body of the merge-gate infra-retry (ORCH-110 D3); wrapped by the never-raise
|
||||
``_handle_merge_gate_infra_retry``."""
|
||||
retries = _merge_infra_retry_count(task_id)
|
||||
if retries < settings.merge_retest_infra_max_retries:
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: deploy-staging\nNote: merge-gate infra-timeout retry "
|
||||
f"(attempt {retries + 1}/{settings.merge_retest_infra_max_retries}) — "
|
||||
f"re-test hit the wall-clock budget (infra/resource, NOT a code defect); "
|
||||
f"retrying after {settings.merge_retest_infra_retry_delay_s}s. Reason: {reason}."
|
||||
)
|
||||
new_job = enqueue_job(
|
||||
"deployer", repo, task_desc, task_id=task_id,
|
||||
available_at_delay_s=settings.merge_retest_infra_retry_delay_s,
|
||||
)
|
||||
merge_gate.note_retest_infra_retry()
|
||||
result.enqueued_agent = "deployer"
|
||||
result.enqueued_job_id = new_job
|
||||
result.note = "merge-gate-infra-retry"
|
||||
logger.warning(
|
||||
f"Task {task_id}: merge-gate re-test infra-timeout, re-queued deployer "
|
||||
f"(job_id={new_job}, attempt {retries + 1}/"
|
||||
f"{settings.merge_retest_infra_max_retries}); NOT a code fault ({reason})"
|
||||
)
|
||||
else:
|
||||
merge_gate.note_retest_infra_exhausted()
|
||||
set_issue_blocked(work_item_id)
|
||||
send_telegram(
|
||||
f"⏳ {link_for(work_item_id)}: merge-gate re-test infra-timeout "
|
||||
f"сохраняется после {settings.merge_retest_infra_max_retries} повторов — "
|
||||
f"ресурсная проблема (CPU / осиротевшие процессы), НЕ дефект кода. "
|
||||
f"Проверьте хост / нужно ручное вмешательство."
|
||||
)
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"⏳ Merge-gate re-test infra-timeout сохраняется после "
|
||||
f"{settings.merge_retest_infra_max_retries} повторов — инфраструктурная "
|
||||
f"проблема (ресурсы / осиротевшие процессы), НЕ дефект кода. Задача "
|
||||
f"остаётся на deploy-staging; нужно ручное вмешательство (НЕ доработка кода).",
|
||||
author="deployer",
|
||||
)
|
||||
result.alerted = True
|
||||
result.note = "merge-gate-infra-retry-exhausted"
|
||||
logger.error(
|
||||
f"Task {task_id}: merge-gate re-test infra-timeout exhausted "
|
||||
f"({settings.merge_retest_infra_max_retries} retries) ({reason})"
|
||||
)
|
||||
|
||||
|
||||
def _handle_merge_gate_rollback(
|
||||
task_id, current_stage, repo, work_item_id, branch, reason, result: AdvanceResult
|
||||
):
|
||||
|
||||
Reference in New Issue
Block a user