"""ORCH-094 — observability of deploy-status setting (FR-4 / AC-5 / TC-09).

Every deploy-phase status decision emits ONE structured line carrying
work_item, caller (reason), target_status, db_stage, window_active and the
verdict; a suppression/convergence is logged explicitly so a future flap is
attributable.
"""
import logging
import os
import tempfile

import pytest

# The environment must be configured BEFORE the project modules are imported
# (hence the E402 suppressions on the imports below).
_test_db = os.path.join(tempfile.gettempdir(), "test_deploy_status_obs.db")
os.environ["ORCH_DB_PATH"] = _test_db
os.environ["ORCH_REPOS_DIR"] = tempfile.gettempdir()
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")

import src.db as _db  # noqa: E402
from src.db import init_db, get_db  # noqa: E402
from src import deploy_status_guard as guard  # noqa: E402
from src import post_deploy  # noqa: E402
from src import config as cfg  # noqa: E402

# Name of the logger whose records these tests inspect.
_GUARD_LOGGER = "orchestrator.deploy_status_guard"


@pytest.fixture(autouse=True)
def fresh_db(monkeypatch, tmp_path):
    """Give every test a freshly initialised database and isolated settings.

    Removes any database file left at ``_test_db``, re-runs ``init_db()``,
    switches the deploy-status guard on, and points the post-deploy repo
    directories at pytest's per-test ``tmp_path``.
    """
    monkeypatch.setattr(_db.settings, "db_path", _test_db)
    if os.path.exists(_test_db):
        os.unlink(_test_db)
    init_db()
    # raising=False: set the attribute even if the settings object does not
    # define it yet.
    monkeypatch.setattr(cfg.settings, "deploy_status_guard_enabled", True, raising=False)
    # NOTE(review): an empty repos value presumably means "no per-repo
    # restriction" — confirm against the guard implementation.
    monkeypatch.setattr(cfg.settings, "deploy_status_guard_repos", "", raising=False)
    monkeypatch.setattr(post_deploy.settings, "repos_dir", str(tmp_path))
    monkeypatch.setattr(post_deploy.settings, "host_repos_dir", str(tmp_path))
    yield


def _make_task(stage, repo="orchestrator", wi="ORCH-061", branch="feature/ORCH-061-x"):
    """Insert one row into ``tasks`` with the given stage.

    Args:
        stage: Value stored in the ``stage`` column.
        repo: Value stored in the ``repo`` column.
        wi: Work item id; also used to derive ``plane_id`` as ``plane-<wi>``.
        branch: Value stored in the ``branch`` column.
    """
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) "
            "VALUES (?, ?, ?, ?, ?)",
            (f"plane-{wi}", wi, repo, branch, stage),
        )
        conn.commit()
    finally:
        # Always release the connection, even when the INSERT fails, so a
        # failing test cannot leave the shared on-disk test DB held open.
        conn.close()


def _last_guard_record(caplog):
    """Return the most recent record emitted by the guard logger.

    Fails with an explicit assertion message (rather than an IndexError)
    when the guard logged nothing.
    """
    records = [r for r in caplog.records if r.name == _GUARD_LOGGER]
    assert records, "guard emitted no observability record"
    return records[-1]


def test_tc09_converge_logs_full_attribution(caplog):
    """A CONVERGE_DONE verdict logs every attribution field at WARNING."""
    _make_task("done")
    with caplog.at_level(logging.INFO, logger=_GUARD_LOGGER):
        verdict = guard.decide("ORCH-061", guard.MONITORING, reason="advance:deploy->done")
    assert verdict == guard.CONVERGE_DONE

    record = _last_guard_record(caplog)
    msg = record.getMessage()
    # All five attribution fields + verdict are present.
    for token in (
        "work_item=ORCH-061",
        "caller=advance:deploy->done",
        "target=monitoring",
        "db_stage=done",
        "window_active=False",
        "verdict=CONVERGE_DONE",
    ):
        assert token in msg, f"missing {token!r} in {msg!r}"
    # A convergence is logged at WARNING (easy to grep on a future flap).
    assert record.levelno == logging.WARNING


def test_tc09_allow_active_window_logged(caplog):
    """With an ARMED post-deploy marker the guard logs ALLOW at INFO."""
    _make_task("done")
    # Write an ARMED marker first; the assertions below expect the guard to
    # then report window_active=True.
    post_deploy.write_marker("orchestrator", "ORCH-061", post_deploy.ARMED, "armed")
    with caplog.at_level(logging.INFO, logger=_GUARD_LOGGER):
        verdict = guard.decide("ORCH-061", guard.MONITORING, reason="advance:deploy->done")
    assert verdict == guard.ALLOW

    record = _last_guard_record(caplog)
    msg = record.getMessage()
    # Separate assertions so a failure names the missing token.
    assert "window_active=True" in msg
    assert "verdict=ALLOW" in msg
    assert record.levelno == logging.INFO


def test_tc09_suppress_cancelled_logged(caplog):
    """A task in stage ``cancelled`` yields SUPPRESS, logged at WARNING."""
    _make_task("cancelled")
    with caplog.at_level(logging.INFO, logger=_GUARD_LOGGER):
        verdict = guard.decide("ORCH-061", guard.AWAITING, reason="phase_a")
    assert verdict == guard.SUPPRESS

    record = _last_guard_record(caplog)
    msg = record.getMessage()
    assert "verdict=SUPPRESS" in msg
    assert "db_stage=cancelled" in msg
    assert record.levelno == logging.WARNING