Files
orchestrator/src/post_deploy.py
claude-bot f1635ddb39
All checks were successful
CI / test (push) Successful in 57s
CI / test (pull_request) Successful in 55s
feat(replication): расхардкод хоста + секреты нового хоста + smoke-runbook
Фундамент тиража 10-common (эпик ORCH-10): платформа разворачивается на
новой инфре без правки кода — только env/конфиг. Каждый дефолт = боевому
значению (пустой .env => поведение 1:1, kill-switch-природа, NFR-2);
STAGE_TRANSITIONS/QG_CHECKS/check_*/machine-verdict/схема БД не тронуты.

- config: agent_home_dir / agent_git_name / git_email_domain / staging_port
  (ADR-001 D2/D4); код-блокеры A1-A4 закрыты: plane_sync ссылки из
  gitea_public_url+gitea_owner, launcher - единый agent_git_env() (x2 места),
  self_deploy/post_deploy - HOME+домен из Settings (имена системных акторов -
  платформенные литералы)
- image_freshness: staging_port из конфига + fail-closed guard
  staging_port == прод-порт -> отказ ДО ssh/build (инвариант ORCH-058 AC-9
  стал исполняемым); REPO= передаётся хуку явно обоими инвокерами (D7)
- SELF_HOSTING_REPO - нормативная платформенная константа (D3, пин-тест)
- compose: полная ${VAR:-default}-интерполяция (реестр B, карта D6); группа
  ORCH-040 uid/gid/HOME/маунты двигается согласованно (build.args APP_*);
  group_add "МИНА 1" сохранён x3; оба app-сервиса с явным command:
- Dockerfile: ARG APP_UID/APP_GID/APP_USER/APP_HOME (CMD exec-form 8500
  сознательно не тронут - D5); deploy-hook: REPO="${REPO:-...}" (D1 реестра)
- секреты: stdlib scripts/gen_secrets.py (token_hex(32); печать по умолчанию;
  --write никогда не перезаписывает существующий .env молча, exit=2;
  перезапись только --force); .env.example дополнен до полноты ключей старта
- доки: новый docs/operations/REPLICATION.md (карта env, чек-лист секретов,
  smoke-процедура с PASS/FAIL, границы 10-common/Lite/Bundled), INFRA.md,
  README, CLAUDE.md, CHANGELOG
- анти-регресс: tests/test_no_host_hardcodes.py (tokenize-сканер запрещённых
  литералов, config-модули - структурное исключение, allowlist пуст,
  негативная самопроверка) + test_host_config_keys / test_infra_parametrization
  / test_secrets_gen / test_replication_smoke; согласованные структурные
  правки test_orch040_compose (судит резолв дефолтов) и
  test_deploy_hook_rollback_sim (REPO через env-override = контракт D7)

Полный регресс: 1764 passed.

Refs: ORCH-101

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 20:50:43 +03:00

640 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Post-deploy production monitoring + degradation reaction (ORCH-021).
The pipeline used to end at ``deploy -> done`` and then **forget about prod**:
"success" meant the health-check passed at restart (~60s window in
``scripts/orchestrator-deploy-hook.sh``). The class of incidents "green deploy,
red prod" (precedent ET-8 — degradation appears minutes later under real
traffic; ``/health`` answers ``200 ok`` while the feature is broken) was never
caught. ORCH-021 extends responsibility **PAST** ``done``: after the terminal
transition for an applicable repo we arm an observation window
(``post_deploy_window_s`` ~15 min, interval ``post_deploy_interval_s``);
degradation is detected by deterministic thresholds and, when confirmed,
triggers a reaction.
The observation mechanism (ADR-001 §1, Variant B) is a **reserved-agent job**
``post-deploy-monitor`` — a deterministic, no-LLM job modelled exactly on
``deploy-finalizer``. One "tick" == one job: it does ONE probe, appends to a
persisted ``series`` file, classifies, and either re-queues itself with a delay
(``available_at_delay_s``) or finishes (DEGRADED -> reaction; or window expired
-> HEALTHY). Between ticks no job runs (it is scheduled in the future), so the
single worker stays free for other projects — exactly like the finalizer defer.
This module is a **leaf** (mirrors ``self_deploy.py`` / ``staging_verdict.py``):
it imports only config (and lazily ``qg.checks.is_self_hosting_repo``), never
``stage_engine`` / ``launcher`` — the orchestration that needs those lives in
``stage_engine.run_post_deploy_monitor``. Every public helper honours a
**never-raise** contract so a monitoring hiccup can never crash the worker /
lifespan / the pipeline of other projects (AC-16).
Restart-safe state lives in sentinel files under
``<repos_dir>/.post-deploy-state-<repo>/<work_item_id>/`` (mirrors the
deploy-state pattern, no DB migration — ТЗ §2.7):
* ``armed`` — monitoring armed for this work item (idempotency-guard, AC-15);
* ``series`` — JSON list of probe results (restart-safe streak/5xx counters);
* ``done`` — monitoring finished (anti-dupe, AC-15).
Self-hosting safety (BR-5 / AC-8): a monitor tick NEVER auto-rolls-back or
restarts the prod ``orchestrator`` container — for ``orchestrator`` the reaction
is ALWAYS ``ALERT_ONLY`` (loud Telegram + Plane, manual approve).
"""
from __future__ import annotations
import glob
import json
import logging
import os
import shlex
import subprocess
import urllib.error
import urllib.request
from dataclasses import dataclass
from .config import settings
logger = logging.getLogger("orchestrator.post_deploy")
# Sentinel marker filenames (see module docstring).
ARMED = "armed"
SERIES = "series"
DONE = "done"
# Verdicts (classify).
HEALTHY = "HEALTHY"
DEGRADED = "DEGRADED"
# Reaction decisions (decide_action).
NONE = "NONE"
ROLLBACK = "ROLLBACK"
ALERT_ONLY = "ALERT_ONLY"
# action_taken values written to the artefact frontmatter.
ROLLBACK_OK = "ROLLBACK_OK"
ROLLBACK_FAILED = "ROLLBACK_FAILED"
# The 5xx-monitored endpoints (besides /health, whose 200+ok is its own signal).
_FIVEXX_ENDPOINTS = ("/status", "/queue")
_PROBE_TIMEOUT = 5
_SSH_TIMEOUT = 60
_GIT_TIMEOUT = 60
# ---------------------------------------------------------------------------
# Conditionality (mirrors self_deploy_applies / _merge_gate_applies)
# ---------------------------------------------------------------------------
def post_deploy_applies(repo: str) -> bool:
"""Whether post-deploy monitoring is REAL for this repo (AC-2 / AC-10).
Mirrors the ORCH-35/36/43/58 conditional rollout:
* ``post_deploy_monitor_enabled=False`` -> always False (global
kill-switch); the pipeline is 1:1 as before ORCH-021 (AC-10).
* ``post_deploy_repos`` (CSV) non-empty -> real only for listed repos.
* empty CSV -> real ONLY for the self-hosting repo (``orchestrator``).
Never raises.
"""
try:
if not settings.post_deploy_monitor_enabled:
return False
raw = (settings.post_deploy_repos or "").strip()
if raw:
allowed = {r.strip().lower() for r in raw.split(",") if r.strip()}
return (repo or "").strip().lower() in allowed
# Lazy import keeps this module a leaf (avoid importing qg at load time).
from .qg.checks import is_self_hosting_repo
return is_self_hosting_repo(repo)
except Exception as e: # noqa: BLE001 - never-raise contract
logger.warning("post_deploy_applies error for %s: %s", repo, e)
return False
# ---------------------------------------------------------------------------
# Signal probe (one tick)
# ---------------------------------------------------------------------------
@dataclass
class ProbeResult:
"""Outcome of ONE probe tick (JSON-serialisable via ``as_dict``).
``health_ok`` — ``/health`` answered HTTP 200 with ``{"status": "ok"}``.
``total`` — number of 5xx-monitored endpoints probed (``/status``,
``/queue``) — the denominator of the window 5xx ratio.
``fivexx`` — how many of those returned 5xx (or were unreachable, which
is conservatively counted as a server failure).
``detail`` — human-readable note (logs / artefact body).
"""
health_ok: bool
total: int
fivexx: int
detail: str = ""
def as_dict(self) -> dict:
return {
"health_ok": bool(self.health_ok),
"total": int(self.total),
"fivexx": int(self.fivexx),
"detail": str(self.detail),
}
def _http_status(url: str) -> tuple[int, str]:
"""GET ``url`` -> (http_code, body). Network/timeout -> (0, "").
Never raises. ``urllib`` raises ``HTTPError`` for >=400 responses; we treat
that as a real status code (so a 5xx is observed, not swallowed).
"""
try:
with urllib.request.urlopen(url, timeout=_PROBE_TIMEOUT) as resp: # noqa: S310
body = resp.read(4096).decode("utf-8", "replace")
return int(getattr(resp, "status", resp.getcode())), body
except urllib.error.HTTPError as e:
try:
body = e.read(4096).decode("utf-8", "replace")
except Exception:
body = ""
return int(e.code), body
except Exception as e: # noqa: BLE001 - URLError / socket timeout / anything
logger.warning("post_deploy probe error for %s: %s", url, e)
return 0, ""
def probe_signals(base_url: str) -> ProbeResult:
"""Probe ``/health`` + the key endpoints of the prod instance ONCE (AC-16).
``/health`` is healthy iff HTTP 200 AND the body parses to
``{"status": "ok"}``. ``/status`` and ``/queue`` contribute to the window
5xx ratio: an HTTP 5xx OR an unreachable endpoint (network error / timeout,
code 0) is counted as a failure (conservative — a down server is bad). A
network failure yields a conservative "failed" probe, NEVER an exception
(TC-14).
"""
base = (base_url or "").rstrip("/")
# --- /health: the primary liveness signal ---
code, body = _http_status(base + "/health")
health_ok = False
if code == 200:
try:
health_ok = json.loads(body).get("status") == "ok"
except Exception:
health_ok = False
# --- /status, /queue: 5xx ratio over the window ---
total = 0
fivexx = 0
for ep in _FIVEXX_ENDPOINTS:
total += 1
ep_code, _ = _http_status(base + ep)
if ep_code == 0 or 500 <= ep_code <= 599:
fivexx += 1
detail = f"health={code}({'ok' if health_ok else 'bad'}) 5xx={fivexx}/{total}"
return ProbeResult(health_ok=health_ok, total=total, fivexx=fivexx, detail=detail)
# ---------------------------------------------------------------------------
# Classification (pure, no I/O — the MAIN unit-test subject, like
# compute_staging_verdict in ORCH-061)
# ---------------------------------------------------------------------------
def classify(series, fail_threshold: int, fivexx_threshold: float) -> str:
"""Fold a probe series into ``HEALTHY`` | ``DEGRADED`` (deterministic, pure).
``series`` — iterable of probe dicts (``{"health_ok", "total", "fivexx"}``),
as persisted by :func:`append_probe`.
Decision (BR-3 / AC-3..AC-6):
* ``>= fail_threshold`` CONSECUTIVE health failures -> ``DEGRADED`` (AC-4);
* window 5xx ratio ``sum(fivexx)/sum(total)`` strictly ``> fivexx_threshold``
-> ``DEGRADED`` even if ``/health`` answers 200 (AC-5);
* otherwise ``HEALTHY`` — a single glitch below the threshold that recovers
does NOT trip (AC-3 / AC-6, no false rollback).
Never raises: on malformed input it returns ``HEALTHY`` (fail-SAFE — a false
``DEGRADED`` would trigger an unwanted rollback, the worse outcome).
"""
try:
# Non-list input is malformed -> fail-safe HEALTHY (never a false rollback).
if not isinstance(series, (list, tuple)):
return HEALTHY
# Longest run of consecutive health failures.
streak = 0
best = 0
total = 0
fivexx = 0
for row in series:
# A non-dict row is malformed: skip it (do NOT count it as a failure,
# which could fabricate a DEGRADED streak from garbage).
if not isinstance(row, dict):
continue
ok = bool(row.get("health_ok"))
total += int(row.get("total") or 0)
fivexx += int(row.get("fivexx") or 0)
if ok:
streak = 0
else:
streak += 1
if streak > best:
best = streak
if best >= int(fail_threshold):
return DEGRADED
if total > 0 and (fivexx / total) > float(fivexx_threshold):
return DEGRADED
return HEALTHY
except Exception as e: # noqa: BLE001 - never-raise; fail-safe to HEALTHY
logger.warning("post_deploy classify error: %s", e)
return HEALTHY
def decide_action(repo: str, verdict: str) -> str:
"""Decide the reaction for ``(repo, verdict)`` (pure, BR-5 / AC-7 / AC-8).
* ``HEALTHY`` -> ``NONE`` (no reaction, any repo);
* ``DEGRADED`` + self-hosting -> ``ALERT_ONLY`` (ALWAYS — the tick
NEVER auto-rolls-back / restarts the prod orchestrator container, AC-8);
* ``DEGRADED`` + non-self + ``post_deploy_auto_rollback=True`` -> ``ROLLBACK``;
* ``DEGRADED`` + non-self + auto_rollback False (default) -> ``ALERT_ONLY``.
Never raises: on doubt returns ``ALERT_ONLY`` (never an unexpected rollback).
"""
try:
if verdict != DEGRADED:
return NONE
from .qg.checks import is_self_hosting_repo
if is_self_hosting_repo(repo):
return ALERT_ONLY # BR-5: self-hosting is NEVER auto-rolled-back
if settings.post_deploy_auto_rollback:
return ROLLBACK
return ALERT_ONLY
except Exception as e: # noqa: BLE001 - never-raise; safe default
logger.warning("post_deploy decide_action error for %s: %s", repo, e)
return ALERT_ONLY
def map_rollback_exit_code(exit_code) -> str:
"""Map a ``--rollback`` hook exit-code to an ``action_taken`` (pure, AC-9).
Hook exit-code contract (unchanged, 0/1/2):
* ``0`` -> ``ROLLBACK_OK`` (rollback proven healthy);
* ``1`` (no prev image), ``2`` (rollback also failed), anything else, or a
non-int/None -> ``ROLLBACK_FAILED`` (fail-closed -> loud escalation).
"""
try:
code = int(exit_code)
except (TypeError, ValueError):
return ROLLBACK_FAILED
return ROLLBACK_OK if code == 0 else ROLLBACK_FAILED
# ---------------------------------------------------------------------------
# Sentinel state (restart-safe, no DB migration — ТЗ §2.7)
# ---------------------------------------------------------------------------
def _state_dir(base: str, repo: str, work_item_id: str | None) -> str:
return os.path.join(base, f".post-deploy-state-{repo}", (work_item_id or "_"))
def state_dir(repo: str, work_item_id: str | None) -> str:
"""State dir as seen from the container (``settings.repos_dir`` mount)."""
return _state_dir(settings.repos_dir, repo, work_item_id)
def host_state_dir(repo: str, work_item_id: str | None) -> str:
"""State dir as seen from the HOST (``settings.host_repos_dir``).
Same physical directory as :func:`state_dir` via the shared mount; the host
path is what we embed in an ssh command if a host-side helper needs it.
"""
return _state_dir(settings.host_repos_dir, repo, work_item_id)
def marker_path(repo: str, work_item_id: str | None, name: str) -> str:
return os.path.join(state_dir(repo, work_item_id), name)
def has_marker(repo: str, work_item_id: str | None, name: str) -> bool:
"""True iff the named sentinel exists. Never raises."""
try:
return os.path.isfile(marker_path(repo, work_item_id, name))
except Exception as e: # noqa: BLE001 - never-raise
logger.warning("has_marker error for %s/%s/%s: %s", repo, work_item_id, name, e)
return False
def window_active(repo: str, work_item_id: str | None) -> bool:
"""ORCH-094: True iff a post-deploy observation window is currently OPEN.
A window is open iff it has been armed (``ARMED`` sentinel) and has NOT yet
finished (no ``DONE`` sentinel). The terminal-window-aware deploy-status guard
(``deploy_status_guard.decide``) uses this to keep the legitimate post-deploy
``Monitoring after Deploy`` status for a task that is already DB-``done`` while
its window is live, and to converge to ``Done`` once the window has closed.
Restart-safe (the sentinels live on disk) and never-raise -> False on error
(a doubt resolves to "window closed", i.e. converge to Done — the safe-for-
indication default that matches the bug we are fixing).
"""
try:
return has_marker(repo, work_item_id, ARMED) and not has_marker(
repo, work_item_id, DONE
)
except Exception as e: # noqa: BLE001 - never-raise
logger.warning("window_active error for %s/%s: %s", repo, work_item_id, e)
return False
def write_marker(repo: str, work_item_id: str | None, name: str, content: str = "") -> bool:
"""Create/overwrite a sentinel (best-effort). Returns True on success."""
try:
d = state_dir(repo, work_item_id)
os.makedirs(d, exist_ok=True)
with open(os.path.join(d, name), "w", encoding="utf-8") as f:
f.write(str(content))
return True
except OSError as e:
logger.warning("write_marker error for %s/%s/%s: %s", repo, work_item_id, name, e)
return False
def mark_done(repo: str, work_item_id: str | None) -> bool:
"""Mark monitoring finished for this work item (anti-dupe, AC-15)."""
return write_marker(repo, work_item_id, DONE, "done")
def read_series(repo: str, work_item_id: str | None) -> list:
"""Read the persisted probe series (JSON list). Missing/corrupt -> ``[]``.
Never raises — restart-safe streak/5xx counters survive a container restart.
"""
p = marker_path(repo, work_item_id, SERIES)
try:
with open(p, "r", encoding="utf-8") as f:
data = json.load(f)
return data if isinstance(data, list) else []
except FileNotFoundError:
return []
except Exception as e: # noqa: BLE001 - never-raise; corrupt -> empty
logger.warning("read_series error for %s/%s: %s", repo, work_item_id, e)
return []
def append_probe(repo: str, work_item_id: str | None, probe: ProbeResult) -> list:
"""Append a probe to the persisted series and return the new list.
Best-effort (a write error logs and returns the in-memory list so the tick
still classifies). Never raises.
"""
series = read_series(repo, work_item_id)
try:
series.append(probe.as_dict() if isinstance(probe, ProbeResult) else dict(probe))
except Exception as e: # noqa: BLE001
logger.warning("append_probe coerce error for %s/%s: %s", repo, work_item_id, e)
return series
try:
d = state_dir(repo, work_item_id)
os.makedirs(d, exist_ok=True)
with open(os.path.join(d, SERIES), "w", encoding="utf-8") as f:
json.dump(series, f)
except OSError as e:
logger.warning("append_probe write error for %s/%s: %s", repo, work_item_id, e)
return series
def arm_monitor(repo: str, work_item_id: str | None, branch: str, task_id: int) -> bool:
"""Arm post-deploy monitoring after ``deploy -> done`` (AC-1 / AC-15).
Idempotent: if the ``armed`` sentinel already exists this is a no-op (a double
webhook / reconciler F-1 / finalizer Phase C can drive ``done`` more than once,
AC-15). Otherwise creates the state dir, writes ``armed`` + an empty ``series``,
and enqueues the FIRST ``post-deploy-monitor`` job with a delay of one interval
(so the prod has settled before the first probe). Returns True iff it armed a
NEW monitor. Never raises — the caller (terminal block of ``advance_stage``)
must never be crashed by a monitoring hiccup.
"""
try:
if has_marker(repo, work_item_id, ARMED):
logger.info("arm_monitor: already armed for %s/%s (no-op)", repo, work_item_id)
return False
write_marker(repo, work_item_id, ARMED, "armed")
# Initialise an empty series so read_series is well-defined from tick 1.
try:
d = state_dir(repo, work_item_id)
os.makedirs(d, exist_ok=True)
with open(os.path.join(d, SERIES), "w", encoding="utf-8") as f:
json.dump([], f)
except OSError as e:
logger.warning("arm_monitor: series init error for %s/%s: %s", repo, work_item_id, e)
# Lazy import keeps this module a leaf (db is a low-level dependency).
from .db import enqueue_job
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: post-deploy\nNote: post-deploy monitor tick 1 "
f"(window {settings.post_deploy_window_s}s, interval "
f"{settings.post_deploy_interval_s}s)."
)
job_id = enqueue_job(
"post-deploy-monitor", repo, task_desc, task_id=task_id,
available_at_delay_s=settings.post_deploy_interval_s,
)
logger.info(
"arm_monitor: armed post-deploy monitor for %s/%s (job_id=%s)",
repo, work_item_id, job_id,
)
return True
except Exception as e: # noqa: BLE001 - never-raise contract
logger.error("arm_monitor error for %s/%s: %s", repo, work_item_id, e)
return False
def max_ticks() -> int:
"""Bounded tick budget for the window (anti-livelock, like
``deploy_finalize_max_attempts``): ``window_s // interval_s`` (>= 1)."""
try:
interval = max(1, int(settings.post_deploy_interval_s))
return max(1, int(settings.post_deploy_window_s) // interval)
except Exception: # noqa: BLE001 - never-raise
return 1
# ---------------------------------------------------------------------------
# Rollback command (non-self repos only; reuses deploy_prod_* env — ТЗ §2.4)
# ---------------------------------------------------------------------------
def build_rollback_command(repo: str) -> list[str]:
"""Build the ssh argv that runs the deploy hook in ``--rollback`` mode.
Mirrors ``self_deploy.build_deploy_command`` (same prod-env, INFRA P-2 ssh
target) but the action is ``--rollback`` and the call is SYNCHRONOUS (the
target container is NOT the orchestrator, so it is safe to wait for the hook
exit-code directly — no detached setsid wrapper, no ``result`` sentinel).
Reuses the existing ``deploy_prod_*`` settings; no new duplicate config.
"""
env_assignments = (
f"TARGET_SERVICE={shlex.quote(settings.deploy_prod_target_service)} "
f"TARGET_PORT={int(settings.deploy_prod_target_port)} "
f"TARGET_IMAGE={shlex.quote(settings.deploy_prod_target_image)} "
f"COMPOSE_PROFILE={shlex.quote(settings.deploy_prod_compose_profile)} "
f"PREV_IMAGE_FILE={shlex.quote(settings.deploy_prod_prev_image_file)}"
)
inner = (
f"cd {shlex.quote(settings.deploy_host_repo_path)} && "
f"{env_assignments} "
f"bash {shlex.quote(settings.deploy_hook_script)} --rollback"
)
user = (settings.deploy_ssh_user or "").strip()
host = (settings.deploy_ssh_host or "").strip()
target = f"{user}@{host}" if user else host
return ["ssh", "-o", "StrictHostKeyChecking=no", target, inner]
def run_rollback(repo: str) -> tuple[int, str]:
"""Run the ``--rollback`` hook synchronously. Returns ``(exit_code, detail)``.
Never raises: an ssh launch error / timeout maps to a non-zero exit-code so
the caller records ``ROLLBACK_FAILED`` and escalates (AC-9). NEVER used for
the self-hosting repo (``decide_action`` returns ``ALERT_ONLY`` there) — the
structural guard against a tick restarting the prod orchestrator (AC-8).
"""
cmd = build_rollback_command(repo)
try:
r = subprocess.run(cmd, capture_output=True, text=True, timeout=_SSH_TIMEOUT)
except subprocess.TimeoutExpired:
return 2, "rollback ssh timeout"
except (subprocess.SubprocessError, OSError) as e:
return 2, f"rollback ssh error: {e}"
detail = ((r.stderr or "") + (r.stdout or "")).strip()[:200]
return int(r.returncode), detail
# ---------------------------------------------------------------------------
# Artefact 16-post-deploy-log.md (machine-readable frontmatter — ТЗ §2.5)
# ---------------------------------------------------------------------------
def build_post_deploy_log(
work_item_id: str,
status: str,
action_taken: str,
window_s: int,
checks_total: int,
checks_failed: int,
body_extra: str = "",
) -> str:
"""Render a 16-post-deploy-log.md body. Only the YAML-frontmatter is machine
read (canon of gates; the loop-of-lessons ORCH-8 consumes it, BR-10). The
body is informational. Parseable by ``yaml.safe_load`` (AC-13).
"""
return (
"---\n"
f"post_deploy_status: {status}\n"
f"action_taken: {action_taken}\n"
f"work_item: {work_item_id}\n"
f"window_s: {int(window_s)}\n"
f"checks_total: {int(checks_total)}\n"
f"checks_failed: {int(checks_failed)}\n"
"---\n\n"
"# Post-deploy log — ORCH-021 post-deploy monitor\n\n"
f"Наблюдение прода завершено: `post_deploy_status: {status}`, "
f"`action_taken: {action_taken}`.\n\n"
f"Окно наблюдения: {int(window_s)}s; опросов всего: {int(checks_total)}, "
f"из них с провалом: {int(checks_failed)}.\n"
f"{body_extra}"
)
def write_post_deploy_log(
repo: str,
work_item_id: str,
branch: str,
status: str,
action_taken: str,
window_s: int,
checks_total: int,
checks_failed: int,
body_extra: str = "",
) -> bool:
"""Write 16-post-deploy-log.md into the task worktree and best-effort
commit+push it. Returns True iff the file was written. Never raises — the
artefact is best-effort, its absence rolls nothing back (AC-13 / TC-15).
"""
from .git_worktree import get_worktree_path
rel = f"docs/work-items/{work_item_id}/16-post-deploy-log.md"
try:
wt = get_worktree_path(repo, branch)
except Exception as e: # noqa: BLE001 - never-raise
logger.error("write_post_deploy_log: worktree error for %s/%s: %s", repo, branch, e)
return False
path = os.path.join(wt, rel)
content = build_post_deploy_log(
work_item_id, status, action_taken, window_s, checks_total, checks_failed, body_extra
)
try:
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write(content)
except OSError as e:
logger.error("write_post_deploy_log: write error at %s: %s", path, e)
return False
# ORCH-101 (A4): HOME + email domain from Settings; the actor NAME stays the
# platform literal `post-deploy-monitor` (D2). Defaults = previous values.
_email = f"post-deploy-monitor@{settings.git_email_domain}"
git_env = {
**os.environ,
"HOME": settings.agent_home_dir,
"GIT_AUTHOR_NAME": "post-deploy-monitor",
"GIT_AUTHOR_EMAIL": _email,
"GIT_COMMITTER_NAME": "post-deploy-monitor",
"GIT_COMMITTER_EMAIL": _email,
}
try:
subprocess.run(["git", "-C", wt, "add", rel],
capture_output=True, timeout=_GIT_TIMEOUT, env=git_env)
commit = subprocess.run(
["git", "-C", wt, "commit", "-m",
f"docs(ORCH-021): post-deploy {status}/{action_taken} for {work_item_id}"],
capture_output=True, text=True, timeout=_GIT_TIMEOUT, env=git_env,
)
if commit.returncode == 0:
subprocess.run(["git", "-C", wt, "push", "origin", branch],
capture_output=True, timeout=_GIT_TIMEOUT, env=git_env)
except (subprocess.SubprocessError, OSError) as e:
logger.warning("write_post_deploy_log: git commit/push best-effort failed: %s", e)
return True
# ---------------------------------------------------------------------------
# Observability snapshot for GET /queue (BR-9 / AC-14)
# ---------------------------------------------------------------------------
def status() -> dict:
"""Post-deploy snapshot for /queue observability. Never raises.
``active`` — work items with an ``armed`` sentinel but no ``done`` yet (a
monitoring window in flight). ``last_outcome`` — best-effort last finished
window read from the most-recent ``done`` state dir's series length.
"""
snap = {
"enabled": False,
"window_s": None,
"interval_s": None,
"repos": "",
"active": [],
"active_count": 0,
}
try:
snap["enabled"] = bool(settings.post_deploy_monitor_enabled)
snap["window_s"] = int(settings.post_deploy_window_s)
snap["interval_s"] = int(settings.post_deploy_interval_s)
snap["repos"] = settings.post_deploy_repos or ""
pattern = os.path.join(settings.repos_dir, ".post-deploy-state-*", "*")
active: list[str] = []
for d in glob.glob(pattern):
try:
if not os.path.isdir(d):
continue
if os.path.isfile(os.path.join(d, ARMED)) and not os.path.isfile(
os.path.join(d, DONE)
):
active.append(os.path.basename(d))
except Exception: # noqa: BLE001 - skip one dir
continue
snap["active"] = sorted(active)
snap["active_count"] = len(active)
except Exception as e: # noqa: BLE001 - never-raise
logger.warning("post_deploy status snapshot error: %s", e)
return snap