Files
orchestrator/src/post_deploy.py
claude-bot 2f4c553fd8 feat(post-deploy): post-deploy prod monitoring + degradation reaction (ORCH-021)
Extend pipeline responsibility past deploy->done: after the terminal
transition for an applicable repo, arm a ~15min observation window that
probes prod and reacts to a degradation the restart-time health-check
missed ("green deploy, red prod").

- src/post_deploy.py: new leaf module (config + lazy qg/db only).
  Sentinel-file restart-safe state (.post-deploy-state-<repo>/<wi>/),
  no DB migration. probe_signals/classify/decide_action/run_rollback,
  all never-raise.
- Reserved-agent job `post-deploy-monitor` (no-LLM, Variant B, calque of
  deploy-finalizer): self-requeues each tick via enqueue_job.
- Deterministic classify: DEGRADED iff >= fail_threshold consecutive
  health failures OR window 5xx ratio > 5xx_threshold; fail-safe HEALTHY.
- Self-hosting invariant (BR-5/AC-8): a tick NEVER restarts the prod
  orchestrator container -> orchestrator is ALWAYS ALERT_ONLY.
- Conditionality (ORCH-35/36/43/58): kill-switch + CSV repos, empty ->
  self-hosting only.
- QG_CHECKS / STAGE_TRANSITIONS / schema unchanged (AC-12).
- Docs: CHANGELOG, CLAUDE artefact list (16-post-deploy-log.md),
  architecture README, .env.example (ORCH_POST_DEPLOY_*).

Refs: ORCH-021

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-07 14:40:06 +00:00

615 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Post-deploy production monitoring + degradation reaction (ORCH-021).
The pipeline used to end at ``deploy -> done`` and then **forget about prod**:
"success" meant the health-check passed at restart (~60s window in
``scripts/orchestrator-deploy-hook.sh``). The class of incidents "green deploy,
red prod" (precedent ET-8 — degradation appears minutes later under real
traffic; ``/health`` answers ``200 ok`` while the feature is broken) was never
caught. ORCH-021 extends responsibility **PAST** ``done``: after the terminal
transition for an applicable repo we arm an observation window
(``post_deploy_window_s`` ~15 min, interval ``post_deploy_interval_s``);
degradation is detected by deterministic thresholds and, when confirmed,
triggers a reaction.
The observation mechanism (ADR-001 §1, Variant B) is a **reserved-agent job**
``post-deploy-monitor`` — a deterministic, no-LLM job modelled exactly on
``deploy-finalizer``. One "tick" == one job: it does ONE probe, appends to a
persisted ``series`` file, classifies, and either re-queues itself with a delay
(``available_at_delay_s``) or finishes (DEGRADED -> reaction; or window expired
-> HEALTHY). Between ticks no job runs (it is scheduled in the future), so the
single worker stays free for other projects — exactly like the finalizer defer.
This module is a **leaf** (mirrors ``self_deploy.py`` / ``staging_verdict.py``):
it imports only config (and lazily ``qg.checks.is_self_hosting_repo``), never
``stage_engine`` / ``launcher`` — the orchestration that needs those lives in
``stage_engine.run_post_deploy_monitor``. Every public helper honours a
**never-raise** contract so a monitoring hiccup can never crash the worker /
lifespan / the pipeline of other projects (AC-16).
Restart-safe state lives in sentinel files under
``<repos_dir>/.post-deploy-state-<repo>/<work_item_id>/`` (mirrors the
deploy-state pattern, no DB migration — ТЗ §2.7):
* ``armed`` — monitoring armed for this work item (idempotency-guard, AC-15);
* ``series`` — JSON list of probe results (restart-safe streak/5xx counters);
* ``done`` — monitoring finished (anti-dupe, AC-15).
Self-hosting safety (BR-5 / AC-8): a monitor tick NEVER auto-rolls-back or
restarts the prod ``orchestrator`` container — for ``orchestrator`` the reaction
is ALWAYS ``ALERT_ONLY`` (loud Telegram + Plane, manual approve).
"""
from __future__ import annotations
import glob
import json
import logging
import os
import shlex
import subprocess
import urllib.error
import urllib.request
from dataclasses import dataclass
from .config import settings
logger = logging.getLogger("orchestrator.post_deploy")
# Sentinel marker filenames (see module docstring). Each one is a plain file
# name created inside the per-work-item state directory.
ARMED = "armed"
SERIES = "series"
DONE = "done"
# Verdicts returned by classify().
HEALTHY = "HEALTHY"
DEGRADED = "DEGRADED"
# Reaction decisions returned by decide_action().
NONE = "NONE"
ROLLBACK = "ROLLBACK"
ALERT_ONLY = "ALERT_ONLY"
# action_taken values written to the artefact frontmatter; these two are the
# possible results of map_rollback_exit_code().
ROLLBACK_OK = "ROLLBACK_OK"
ROLLBACK_FAILED = "ROLLBACK_FAILED"
# The 5xx-monitored endpoints (besides /health, whose 200+ok is its own signal).
_FIVEXX_ENDPOINTS = ("/status", "/queue")
# Timeouts in seconds: one HTTP probe request, the synchronous rollback ssh
# call, and each git subprocess used when committing the artefact.
_PROBE_TIMEOUT = 5
_SSH_TIMEOUT = 60
_GIT_TIMEOUT = 60
# ---------------------------------------------------------------------------
# Conditionality (mirrors self_deploy_applies / _merge_gate_applies)
# ---------------------------------------------------------------------------
def post_deploy_applies(repo: str) -> bool:
    """Tell whether post-deploy monitoring is active for ``repo`` (AC-2 / AC-10).

    Resolution order (same conditional rollout as ORCH-35/36/43/58):

    1. ``settings.post_deploy_monitor_enabled`` falsy -> ``False`` for every
       repo (global kill-switch).
    2. ``settings.post_deploy_repos`` (CSV) non-empty -> ``True`` only when the
       stripped, lower-cased ``repo`` is one of the listed names.
    3. CSV empty -> whatever ``is_self_hosting_repo(repo)`` answers.

    Never raises: any error is logged as a warning and reported as ``False``.
    """
    try:
        if not settings.post_deploy_monitor_enabled:
            return False
        csv = (settings.post_deploy_repos or "").strip()
        if not csv:
            # Lazy import keeps this module a leaf (qg is not loaded at import time).
            from .qg.checks import is_self_hosting_repo

            return is_self_hosting_repo(repo)
        listed = {item.strip().lower() for item in csv.split(",") if item.strip()}
        wanted = (repo or "").strip().lower()
        return wanted in listed
    except Exception as e:  # noqa: BLE001 - never-raise contract
        logger.warning("post_deploy_applies error for %s: %s", repo, e)
        return False
# ---------------------------------------------------------------------------
# Signal probe (one tick)
# ---------------------------------------------------------------------------
@dataclass
class ProbeResult:
    """Result of a single probe tick; ``as_dict`` gives the JSON-ready form.

    Fields:
        health_ok: ``/health`` answered HTTP 200 with ``{"status": "ok"}``.
        total: how many 5xx-monitored endpoints were probed — the denominator
            of the window 5xx ratio.
        fivexx: how many of those counted as failed (HTTP 5xx, or unreachable).
        detail: free-form human-readable note for logs / the artefact body.
    """

    health_ok: bool
    total: int
    fivexx: int
    detail: str = ""

    def as_dict(self) -> dict:
        """Return the four fields as a plain dict, each coerced to its declared type."""
        coercions = (
            ("health_ok", bool),
            ("total", int),
            ("fivexx", int),
            ("detail", str),
        )
        return {name: cast(getattr(self, name)) for name, cast in coercions}
def _http_status(url: str) -> tuple[int, str]:
    """Fetch ``url`` with GET and return ``(http_code, body_prefix)``.

    At most 4096 bytes of the body are read and decoded as UTF-8 with
    replacement. An ``HTTPError`` (which ``urllib`` raises for >=400 answers)
    is reported with its real status code, so a 5xx is observed rather than
    swallowed. Any other failure is logged and reported as ``(0, "")``.
    Never raises.
    """
    try:
        with urllib.request.urlopen(url, timeout=_PROBE_TIMEOUT) as resp:  # noqa: S310
            text = resp.read(4096).decode("utf-8", "replace")
            status_code = getattr(resp, "status", resp.getcode())
            return int(status_code), text
    except urllib.error.HTTPError as http_err:
        # The error object doubles as the response; its body is optional.
        try:
            text = http_err.read(4096).decode("utf-8", "replace")
        except Exception:
            text = ""
        return int(http_err.code), text
    except Exception as e:  # noqa: BLE001 - URLError / socket timeout / anything
        logger.warning("post_deploy probe error for %s: %s", url, e)
        return 0, ""
def probe_signals(base_url: str) -> ProbeResult:
    """Run ONE probe of the prod instance: ``/health`` plus the key endpoints (AC-16).

    ``health_ok`` is true only when ``/health`` answers HTTP 200 and its body
    parses as JSON with ``status == "ok"``. Every endpoint in
    ``_FIVEXX_ENDPOINTS`` is then requested once; an answer in 500..599 or a
    code of 0 (unreachable / timeout) counts towards ``fivexx``. Network
    trouble therefore produces a conservative "failed" probe, never an
    exception (TC-14).
    """
    root = (base_url or "").rstrip("/")

    # Primary liveness signal.
    health_code, health_body = _http_status(root + "/health")
    health_ok = False
    if health_code == 200:
        try:
            health_ok = json.loads(health_body).get("status") == "ok"
        except Exception:
            health_ok = False

    # Secondary signal: failures among the monitored endpoints.
    codes = [_http_status(root + endpoint)[0] for endpoint in _FIVEXX_ENDPOINTS]
    total = len(codes)
    fivexx = sum(1 for c in codes if c == 0 or 500 <= c <= 599)

    detail = f"health={health_code}({'ok' if health_ok else 'bad'}) 5xx={fivexx}/{total}"
    return ProbeResult(health_ok=health_ok, total=total, fivexx=fivexx, detail=detail)
# ---------------------------------------------------------------------------
# Classification (pure, no I/O — the MAIN unit-test subject, like
# compute_staging_verdict in ORCH-061)
# ---------------------------------------------------------------------------
def classify(series, fail_threshold: int, fivexx_threshold: float) -> str:
    """Fold a probe series into ``HEALTHY`` | ``DEGRADED`` (deterministic, pure).

    Args:
        series: list/tuple of probe dicts (``{"health_ok", "total", "fivexx"}``)
            as persisted by :func:`append_probe`. Non-dict rows are skipped.
        fail_threshold: number of CONSECUTIVE health failures that trips
            ``DEGRADED`` (AC-4). Values below 1 are clamped to 1 so that a
            misconfigured ``0`` / negative threshold cannot declare an empty or
            fully healthy series ``DEGRADED``.
        fivexx_threshold: the window ratio ``sum(fivexx) / sum(total)`` must be
            strictly greater than this to trip ``DEGRADED`` (AC-5), even when
            ``/health`` keeps answering 200.

    Returns:
        ``DEGRADED`` when either rule trips, otherwise ``HEALTHY`` — a single
        glitch below the threshold that recovers does not trip (AC-3 / AC-6).

    Never raises: malformed input (non-sequence series, unparsable counters or
    thresholds) yields ``HEALTHY``, because a false ``DEGRADED`` could trigger
    an unwanted rollback, the worse outcome.
    """
    try:
        # Non-list input is malformed -> fail-safe HEALTHY (never a false rollback).
        if not isinstance(series, (list, tuple)):
            return HEALTHY
        # At least one real failure is always required (see docstring).
        needed = max(1, int(fail_threshold))
        current = 0  # length of the failure run ending at the current row
        longest = 0  # longest failure run seen anywhere in the series
        total = 0
        fivexx = 0
        for row in series:
            # A non-dict row is garbage: skip it rather than count it as a
            # failure, which could fabricate a DEGRADED streak.
            if not isinstance(row, dict):
                continue
            total += int(row.get("total") or 0)
            fivexx += int(row.get("fivexx") or 0)
            if row.get("health_ok"):
                current = 0
            else:
                current += 1
                longest = max(longest, current)
        if longest >= needed:
            return DEGRADED
        if total > 0 and (fivexx / total) > float(fivexx_threshold):
            return DEGRADED
        return HEALTHY
    except Exception as e:  # noqa: BLE001 - never-raise; fail-safe to HEALTHY
        logger.warning("post_deploy classify error: %s", e)
        return HEALTHY
def decide_action(repo: str, verdict: str) -> str:
    """Pick the reaction for ``(repo, verdict)`` (pure, BR-5 / AC-7 / AC-8).

    * any verdict other than ``DEGRADED`` -> ``NONE``;
    * ``DEGRADED`` on the self-hosting repo -> ``ALERT_ONLY``, always (a tick
      never auto-rolls-back / restarts the prod orchestrator container, AC-8);
    * ``DEGRADED`` elsewhere -> ``ROLLBACK`` when
      ``settings.post_deploy_auto_rollback`` is truthy, else ``ALERT_ONLY``.

    Never raises: on any error the answer is ``ALERT_ONLY`` (never an
    unexpected rollback).
    """
    try:
        if verdict == DEGRADED:
            from .qg.checks import is_self_hosting_repo

            # BR-5: the self-hosting check short-circuits, so the auto-rollback
            # setting is not even consulted for the orchestrator itself.
            auto = not is_self_hosting_repo(repo) and settings.post_deploy_auto_rollback
            return ROLLBACK if auto else ALERT_ONLY
        return NONE
    except Exception as e:  # noqa: BLE001 - never-raise; safe default
        logger.warning("post_deploy decide_action error for %s: %s", repo, e)
        return ALERT_ONLY
def map_rollback_exit_code(exit_code) -> str:
    """Map a ``--rollback`` hook exit-code to an ``action_taken`` (pure, AC-9).

    Hook exit-code contract (unchanged, 0/1/2):

    * ``0`` -> ``ROLLBACK_OK`` (rollback proven healthy);
    * ``1`` (no prev image), ``2`` (rollback also failed), anything else, or a
      value that is not an integer -> ``ROLLBACK_FAILED`` (fail-closed -> loud
      escalation).

    Fail-closed details: a non-integral float such as ``0.5`` is rejected
    instead of being truncated to ``0``, and an infinite float (``int()``
    raises ``OverflowError``) is mapped to ``ROLLBACK_FAILED`` rather than
    propagating. Never raises.
    """
    try:
        code = int(exit_code)
    except (TypeError, ValueError, OverflowError):
        return ROLLBACK_FAILED
    # int() truncates floats; 0.5 must not read as a successful exit-code 0.
    if isinstance(exit_code, float) and exit_code != code:
        return ROLLBACK_FAILED
    return ROLLBACK_OK if code == 0 else ROLLBACK_FAILED
# ---------------------------------------------------------------------------
# Sentinel state (restart-safe, no DB migration — ТЗ §2.7)
# ---------------------------------------------------------------------------
def _state_dir(base: str, repo: str, work_item_id: str | None) -> str:
return os.path.join(base, f".post-deploy-state-{repo}", (work_item_id or "_"))
def state_dir(repo: str, work_item_id: str | None) -> str:
    """Return the per-work-item state directory rooted at ``settings.repos_dir``.

    This is the path as seen from inside the container (the repos mount).
    """
    container_root = settings.repos_dir
    return _state_dir(container_root, repo, work_item_id)
def host_state_dir(repo: str, work_item_id: str | None) -> str:
    """Return the per-work-item state directory rooted at ``settings.host_repos_dir``.

    This is the HOST-side spelling of the directory that :func:`state_dir`
    names from inside the container (same physical directory via the shared
    mount); use it when a path must be embedded in an ssh command for a
    host-side helper.
    """
    host_root = settings.host_repos_dir
    return _state_dir(host_root, repo, work_item_id)
def marker_path(repo: str, work_item_id: str | None, name: str) -> str:
    """Return the full path of sentinel ``name`` inside the work item's state dir."""
    folder = state_dir(repo, work_item_id)
    return os.path.join(folder, name)
def has_marker(repo: str, work_item_id: str | None, name: str) -> bool:
    """Report whether sentinel ``name`` exists as a regular file.

    Never raises: any error is logged as a warning and reported as ``False``.
    """
    try:
        target = marker_path(repo, work_item_id, name)
        return os.path.isfile(target)
    except Exception as e:  # noqa: BLE001 - never-raise
        logger.warning("has_marker error for %s/%s/%s: %s", repo, work_item_id, name, e)
        return False
def write_marker(repo: str, work_item_id: str | None, name: str, content: str = "") -> bool:
    """Create or overwrite sentinel ``name`` with ``str(content)`` (best-effort).

    The state directory is created on demand. Returns ``True`` when the file
    was written; an ``OSError`` is logged as a warning and reported as
    ``False``.
    """
    try:
        folder = state_dir(repo, work_item_id)
        os.makedirs(folder, exist_ok=True)
        target = os.path.join(folder, name)
        with open(target, "w", encoding="utf-8") as fh:
            fh.write(str(content))
    except OSError as e:
        logger.warning("write_marker error for %s/%s/%s: %s", repo, work_item_id, name, e)
        return False
    return True
def mark_done(repo: str, work_item_id: str | None) -> bool:
    """Write the ``done`` sentinel for this work item (anti-dupe, AC-15).

    Returns whatever :func:`write_marker` reports.
    """
    return write_marker(repo, work_item_id, name=DONE, content="done")
def read_series(repo: str, work_item_id: str | None) -> list:
    """Load the persisted probe series for this work item.

    Returns the decoded JSON value when it is a list. A missing file, a
    non-list payload, or any read/parse error yields ``[]`` (errors other than
    a missing file are logged as a warning).
    """
    series_file = marker_path(repo, work_item_id, SERIES)
    try:
        with open(series_file, "r", encoding="utf-8") as fh:
            loaded = json.load(fh)
    except FileNotFoundError:
        # No series file (yet): treated as an empty series, not as an error.
        return []
    except Exception as e:  # noqa: BLE001 - never-raise; corrupt -> empty
        logger.warning("read_series error for %s/%s: %s", repo, work_item_id, e)
        return []
    if isinstance(loaded, list):
        return loaded
    return []
def append_probe(repo: str, work_item_id: str | None, probe: ProbeResult) -> list:
    """Append a probe to the persisted series and return the new list.

    ``probe`` may be a :class:`ProbeResult` (stored via ``as_dict``) or
    anything ``dict()`` accepts. If it cannot be coerced, the series read from
    disk is returned unchanged.

    Persistence is best-effort and atomic: the new series is written to a
    sibling ``<series>.tmp`` file and moved over the real file with
    ``os.replace``, so an interrupted or failed write never leaves a truncated
    ``series`` behind (which :func:`read_series` would read back as ``[]``,
    resetting the streak / 5xx counters). A write or serialisation error is
    logged and the in-memory list is still returned so the tick can classify.
    """
    series = read_series(repo, work_item_id)
    try:
        series.append(probe.as_dict() if isinstance(probe, ProbeResult) else dict(probe))
    except Exception as e:  # noqa: BLE001
        logger.warning("append_probe coerce error for %s/%s: %s", repo, work_item_id, e)
        return series
    tmp_path = None
    try:
        d = state_dir(repo, work_item_id)
        os.makedirs(d, exist_ok=True)
        final_path = os.path.join(d, SERIES)
        tmp_path = final_path + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(series, f)
        os.replace(tmp_path, final_path)
    except (OSError, TypeError, ValueError) as e:
        # TypeError / ValueError: json.dump met a value it cannot serialise.
        logger.warning("append_probe write error for %s/%s: %s", repo, work_item_id, e)
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass  # nothing to clean up, or already moved into place
    return series
def arm_monitor(repo: str, work_item_id: str | None, branch: str, task_id: int) -> bool:
    """Arm post-deploy monitoring after ``deploy -> done`` (AC-1 / AC-15).

    Idempotent: when the ``armed`` sentinel already exists this is a no-op (a
    double webhook / reconciler F-1 / finalizer Phase C can drive ``done`` more
    than once, AC-15). Otherwise it writes ``armed`` plus an empty ``series``
    and enqueues the FIRST ``post-deploy-monitor`` job, delayed by one interval
    (``settings.post_deploy_interval_s``).

    If the job cannot be enqueued, the ``armed`` sentinel is removed again so
    the state never claims a window that has no job behind it: a later
    ``done`` drive can re-arm, and :func:`status` does not list a phantom
    active window.

    Returns:
        ``True`` iff a NEW monitor was armed and its first job enqueued.

    Never raises — the caller must never be crashed by a monitoring hiccup;
    errors are logged and reported as ``False``.
    """
    try:
        if has_marker(repo, work_item_id, ARMED):
            logger.info("arm_monitor: already armed for %s/%s (no-op)", repo, work_item_id)
            return False
        write_marker(repo, work_item_id, ARMED, "armed")
        # Initialise an empty series so read_series is well-defined from tick 1.
        try:
            d = state_dir(repo, work_item_id)
            os.makedirs(d, exist_ok=True)
            with open(os.path.join(d, SERIES), "w", encoding="utf-8") as f:
                json.dump([], f)
        except OSError as e:
            logger.warning("arm_monitor: series init error for %s/%s: %s", repo, work_item_id, e)
        try:
            # Lazy import keeps this module a leaf (db is a low-level dependency).
            from .db import enqueue_job

            task_desc = (
                f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
                f"Stage: post-deploy\nNote: post-deploy monitor tick 1 "
                f"(window {settings.post_deploy_window_s}s, interval "
                f"{settings.post_deploy_interval_s}s)."
            )
            job_id = enqueue_job(
                "post-deploy-monitor", repo, task_desc, task_id=task_id,
                available_at_delay_s=settings.post_deploy_interval_s,
            )
        except Exception:
            # No job was queued: undo the sentinel so the guard above does not
            # block a later attempt, then let the outer handler log the cause.
            try:
                os.remove(marker_path(repo, work_item_id, ARMED))
            except OSError:
                pass  # sentinel was never written, or is already gone
            raise
        logger.info(
            "arm_monitor: armed post-deploy monitor for %s/%s (job_id=%s)",
            repo, work_item_id, job_id,
        )
        return True
    except Exception as e:  # noqa: BLE001 - never-raise contract
        logger.error("arm_monitor error for %s/%s: %s", repo, work_item_id, e)
        return False
def max_ticks() -> int:
    """Return the bounded tick budget for one observation window.

    Computed as ``post_deploy_window_s // post_deploy_interval_s`` with the
    interval clamped to at least 1 and the result clamped to at least 1
    (anti-livelock, like ``deploy_finalize_max_attempts``). Never raises: if
    the settings cannot be read or converted the budget is 1.
    """
    try:
        step = max(1, int(settings.post_deploy_interval_s))
        budget = int(settings.post_deploy_window_s) // step
    except Exception:  # noqa: BLE001 - never-raise
        return 1
    return max(1, budget)
# ---------------------------------------------------------------------------
# Rollback command (non-self repos only; reuses deploy_prod_* env — ТЗ §2.4)
# ---------------------------------------------------------------------------
def build_rollback_command(repo: str) -> list[str]:
    """Assemble the ssh argv that runs the deploy hook with ``--rollback``.

    The remote command changes into ``settings.deploy_host_repo_path``, then
    runs ``settings.deploy_hook_script`` under ``bash`` with the
    ``deploy_prod_*`` values passed as environment assignments (every string
    value shell-quoted, the port forced through ``int``). The ssh destination
    is ``user@host`` when ``settings.deploy_ssh_user`` is non-blank, otherwise
    just the host. ``repo`` is accepted for interface symmetry and is not used
    when building the command.

    Mirrors ``self_deploy.build_deploy_command`` but is meant to be run
    synchronously: the target container is not the orchestrator, so waiting
    for the hook's exit code is safe.
    """
    quote = shlex.quote
    env_pairs = [
        f"TARGET_SERVICE={quote(settings.deploy_prod_target_service)}",
        f"TARGET_PORT={int(settings.deploy_prod_target_port)}",
        f"TARGET_IMAGE={quote(settings.deploy_prod_target_image)}",
        f"COMPOSE_PROFILE={quote(settings.deploy_prod_compose_profile)}",
        f"PREV_IMAGE_FILE={quote(settings.deploy_prod_prev_image_file)}",
    ]
    remote_script = " ".join(
        [
            f"cd {quote(settings.deploy_host_repo_path)} &&",
            *env_pairs,
            f"bash {quote(settings.deploy_hook_script)} --rollback",
        ]
    )
    ssh_user = (settings.deploy_ssh_user or "").strip()
    ssh_host = (settings.deploy_ssh_host or "").strip()
    destination = ssh_host if not ssh_user else f"{ssh_user}@{ssh_host}"
    return ["ssh", "-o", "StrictHostKeyChecking=no", destination, remote_script]
def run_rollback(repo: str) -> tuple[int, str]:
    """Run the ``--rollback`` hook synchronously. Returns ``(exit_code, detail)``.

    ``detail`` is the first 200 characters of the hook's stderr + stdout, or a
    short description of what went wrong before the hook could report.

    Never raises: a failure to build the command (e.g. malformed
    ``deploy_prod_*`` settings), an ssh launch error or a timeout all map to
    exit-code ``2`` so the caller records ``ROLLBACK_FAILED`` and escalates
    (AC-9). Output is decoded with ``errors="replace"`` so undecodable bytes
    from the remote side cannot raise either.

    NEVER used for the self-hosting repo (``decide_action`` returns
    ``ALERT_ONLY`` there) — the structural guard against a tick restarting the
    prod orchestrator (AC-8).
    """
    try:
        cmd = build_rollback_command(repo)
    except Exception as e:  # noqa: BLE001 - never-raise; fail-closed
        return 2, f"rollback command build error: {e}"
    try:
        r = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=_SSH_TIMEOUT,
        )
    except subprocess.TimeoutExpired:
        return 2, "rollback ssh timeout"
    except (subprocess.SubprocessError, OSError, ValueError) as e:
        # ValueError: e.g. an embedded NUL byte in one of the arguments.
        return 2, f"rollback ssh error: {e}"
    detail = ((r.stderr or "") + (r.stdout or "")).strip()[:200]
    return int(r.returncode), detail
# ---------------------------------------------------------------------------
# Artefact 16-post-deploy-log.md (machine-readable frontmatter — ТЗ §2.5)
# ---------------------------------------------------------------------------
def build_post_deploy_log(
    work_item_id: str,
    status: str,
    action_taken: str,
    window_s: int,
    checks_total: int,
    checks_failed: int,
    body_extra: str = "",
) -> str:
    """Render the text of ``16-post-deploy-log.md``.

    The document starts with a ``---`` delimited frontmatter block carrying
    ``post_deploy_status``, ``action_taken``, ``work_item``, ``window_s``,
    ``checks_total`` and ``checks_failed`` (the three counters are coerced with
    ``int``). Only that frontmatter is meant to be machine-read (BR-10 /
    AC-13); the heading, the two summary sentences and ``body_extra`` appended
    verbatim at the end are informational.
    """
    window = int(window_s)
    total = int(checks_total)
    failed = int(checks_failed)

    frontmatter = [
        "---",
        f"post_deploy_status: {status}",
        f"action_taken: {action_taken}",
        f"work_item: {work_item_id}",
        f"window_s: {window}",
        f"checks_total: {total}",
        f"checks_failed: {failed}",
        "---",
    ]
    header = "\n".join(frontmatter) + "\n\n"
    title = "# Post-deploy log — ORCH-021 post-deploy monitor\n\n"
    outcome = (
        f"Наблюдение прода завершено: `post_deploy_status: {status}`, "
        f"`action_taken: {action_taken}`.\n\n"
    )
    counts = (
        f"Окно наблюдения: {window}s; опросов всего: {total}, "
        f"из них с провалом: {failed}.\n"
    )
    return header + title + outcome + counts + f"{body_extra}"
def write_post_deploy_log(
    repo: str,
    work_item_id: str,
    branch: str,
    status: str,
    action_taken: str,
    window_s: int,
    checks_total: int,
    checks_failed: int,
    body_extra: str = "",
) -> bool:
    """Write 16-post-deploy-log.md into the task worktree, then best-effort
    commit and push it.

    Steps: resolve the worktree for ``(repo, branch)``, render the document
    with :func:`build_post_deploy_log`, write it to
    ``docs/work-items/<work_item_id>/16-post-deploy-log.md``, then run
    ``git add`` / ``git commit`` / ``git push origin <branch>`` in that
    worktree. The push is attempted only when the commit succeeded.

    Returns:
        ``True`` iff the file was written (regardless of the git outcome);
        ``False`` when the worktree could not be resolved, the document could
        not be rendered, or the write failed.

    Never raises — the artefact is best-effort, its absence rolls nothing back
    (AC-13 / TC-15). Every failure, including a non-zero exit from a git step,
    is logged rather than silently dropped.
    """
    rel = f"docs/work-items/{work_item_id}/16-post-deploy-log.md"
    try:
        # Lazy project import, guarded so an import problem cannot escape.
        from .git_worktree import get_worktree_path

        wt = get_worktree_path(repo, branch)
    except Exception as e:  # noqa: BLE001 - never-raise
        logger.error("write_post_deploy_log: worktree error for %s/%s: %s", repo, branch, e)
        return False
    try:
        path = os.path.join(wt, rel)
        # The int() coercions inside the renderer can reject bad counters.
        content = build_post_deploy_log(
            work_item_id, status, action_taken, window_s, checks_total, checks_failed, body_extra
        )
    except Exception as e:  # noqa: BLE001 - never-raise
        logger.error(
            "write_post_deploy_log: render error for %s/%s: %s", repo, work_item_id, e
        )
        return False
    try:
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)
    except OSError as e:
        logger.error("write_post_deploy_log: write error at %s: %s", path, e)
        return False
    # NOTE(review): HOME is pinned to a host-specific path — confirm it matches
    # the account whose git/ssh configuration should be used for the push.
    git_env = {
        **os.environ,
        "HOME": "/home/slin",
        "GIT_AUTHOR_NAME": "post-deploy-monitor",
        "GIT_AUTHOR_EMAIL": "post-deploy-monitor@mva154.local",
        "GIT_COMMITTER_NAME": "post-deploy-monitor",
        "GIT_COMMITTER_EMAIL": "post-deploy-monitor@mva154.local",
    }
    try:
        add = subprocess.run(["git", "-C", wt, "add", rel],
                             capture_output=True, timeout=_GIT_TIMEOUT, env=git_env)
        if add.returncode != 0:
            logger.warning(
                "write_post_deploy_log: git add exited %s for %s", add.returncode, rel
            )
        commit = subprocess.run(
            ["git", "-C", wt, "commit", "-m",
             f"docs(ORCH-021): post-deploy {status}/{action_taken} for {work_item_id}"],
            capture_output=True, text=True, errors="replace",
            timeout=_GIT_TIMEOUT, env=git_env,
        )
        if commit.returncode == 0:
            push = subprocess.run(["git", "-C", wt, "push", "origin", branch],
                                  capture_output=True, timeout=_GIT_TIMEOUT, env=git_env)
            if push.returncode != 0:
                logger.warning(
                    "write_post_deploy_log: git push exited %s for %s/%s",
                    push.returncode, repo, branch,
                )
        else:
            # Commonly "nothing to commit"; recorded at info level, push skipped.
            logger.info(
                "write_post_deploy_log: git commit exited %s for %s (push skipped)",
                commit.returncode, rel,
            )
    except (subprocess.SubprocessError, OSError, ValueError) as e:
        logger.warning("write_post_deploy_log: git commit/push best-effort failed: %s", e)
    return True
# ---------------------------------------------------------------------------
# Observability snapshot for GET /queue (BR-9 / AC-14)
# ---------------------------------------------------------------------------
def status() -> dict:
    """Build the post-deploy snapshot for /queue observability (BR-9 / AC-14).

    Keys: ``enabled``, ``window_s``, ``interval_s`` and ``repos`` echo the
    settings; ``active`` is the sorted list of state-directory names (work
    item ids) that hold an ``armed`` sentinel but no ``done`` sentinel — i.e.
    a monitoring window in flight — and ``active_count`` is its length.

    Never raises: on an error a warning is logged and the snapshot is returned
    with whatever was filled in so far (defaults otherwise).
    """
    snap = {
        "enabled": False,
        "window_s": None,
        "interval_s": None,
        "repos": "",
        "active": [],
        "active_count": 0,
    }
    try:
        snap["enabled"] = bool(settings.post_deploy_monitor_enabled)
        snap["window_s"] = int(settings.post_deploy_window_s)
        snap["interval_s"] = int(settings.post_deploy_interval_s)
        snap["repos"] = settings.post_deploy_repos or ""

        # One directory per work item: <repos_dir>/.post-deploy-state-<repo>/<wi>
        state_glob = os.path.join(settings.repos_dir, ".post-deploy-state-*", "*")
        in_flight: list[str] = []
        for candidate in glob.glob(state_glob):
            try:
                armed = os.path.isdir(candidate) and os.path.isfile(
                    os.path.join(candidate, ARMED)
                )
                if armed and not os.path.isfile(os.path.join(candidate, DONE)):
                    in_flight.append(os.path.basename(candidate))
            except Exception:  # noqa: BLE001 - skip one dir
                continue
        snap["active"] = sorted(in_flight)
        snap["active_count"] = len(in_flight)
    except Exception as e:  # noqa: BLE001 - never-raise
        logger.warning("post_deploy status snapshot error: %s", e)
    return snap