Adds src/disk_watchdog.py — a background daemon thread modelled on reconciler/job_reaper that measures host-FS fill via the mounted bind-paths (/repos, /app/data) with shutil.disk_usage and Telegram-alerts the operator at >= threshold (default 85%). The missing proactive signal: on 07.06.2026 the mva154 host disk silently hit 100% and stalled the whole self-hosting pipeline. - Pure decide_action(used_pct, threshold, prev, now, realert_s): alert on crossing up, cooldown re-alert, single recovery below threshold (unit-tested without a thread/timer; clock injected). - measure_paths: shutil.disk_usage per path, dedup by st_dev, per-path never-raise (a broken path never fails the tick). - Config flags ORCH_DISK_MONITOR_* with defensive validation (threshold 1..100, positive intervals -> default + warning). Kill-switch -> daemon does not start. - Additive disk_monitor block in GET /queue; start/stop in main.lifespan. - never-raise (per-path/per-tick/per-send); STAGE_TRANSITIONS/QG_CHECKS/check_*/ DB schema untouched, no migration (anti-spam state in-memory). Tests: tests/test_disk_watchdog.py (TC-01..TC-12, 18 cases); full suite green (1296). Docs: INFRA.md, .env.example, CHANGELOG.md (architecture/README.md + ADRs authored at architecture stage). Refs: ORCH-063 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
330 lines
13 KiB
Python
330 lines
13 KiB
Python
"""ORCH-063: disk-watchdog tests (TC-01..TC-12).
|
|
|
|
The watchdog never touches a real disk or Telegram: ``shutil.disk_usage`` is
|
|
monkeypatched to set ``used_pct`` deterministically, ``send_telegram`` is captured
|
|
via monkeypatch, and the cooldown/recovery clock is injected through
|
|
``now_provider`` so time-dependent decisions are tested without a real timer.
|
|
"""
|
|
import os
|
|
import tempfile
|
|
|
|
import pytest
|
|
|
|
# Seed the settings the app modules need before they are imported (same
# convention as test_reaper.py). ``setdefault`` leaves any value that is
# already present in the environment untouched.
for _env_name, _env_default in (
    ("ORCH_DB_PATH", os.path.join(tempfile.gettempdir(), "test_orch_disk.db")),
    ("ORCH_GITEA_TOKEN", "test-token"),
    ("ORCH_PLANE_API_TOKEN", "test-token"),
):
    os.environ.setdefault(_env_name, _env_default)
|
|
|
|
import src.disk_watchdog as dw # noqa: E402
|
|
from src.disk_watchdog import ( # noqa: E402
|
|
ACTION_ALERT,
|
|
ACTION_NONE,
|
|
ACTION_REALERT,
|
|
ACTION_RECOVERY,
|
|
DiskWatchdog,
|
|
PathAlertState,
|
|
decide_action,
|
|
format_alert_message,
|
|
format_recovery_message,
|
|
measure_paths,
|
|
parse_paths,
|
|
)
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Helpers
|
|
# --------------------------------------------------------------------------- #
|
|
def _usage(used_pct: float, total_gb: float = 100.0):
|
|
"""Build a fake ``shutil.disk_usage`` result with the given fill %."""
|
|
total = int(total_gb * (1024 ** 3))
|
|
used = int(total * used_pct / 100)
|
|
free = total - used
|
|
|
|
class _U:
|
|
pass
|
|
|
|
u = _U()
|
|
u.total, u.used, u.free = total, used, free
|
|
return u
|
|
|
|
|
|
@pytest.fixture
def captured_sends(monkeypatch):
    """Replace ``dw.send_telegram`` with a recorder and return its call log.

    Every entry in the returned list is a dict holding the ``text`` and
    ``disable_notification`` values of one call; the stub itself returns ``1``.
    """
    sent = []

    def _record(text, disable_notification=False):
        entry = {"text": text, "disable_notification": disable_notification}
        sent.append(entry)
        return 1

    monkeypatch.setattr(dw, "send_telegram", _record)
    return sent
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# TC-01..TC-05: pure decision function
|
|
# --------------------------------------------------------------------------- #
|
|
def test_tc01_alert_on_crossing_up():
    """TC-01: a non-alerting path measured above the threshold yields ALERT."""
    idle = PathAlertState(alerting=False, last_alert_at=None)
    action = decide_action(90.0, 85, idle, now=1000.0, realert_s=21600)
    assert action == ACTION_ALERT
|
|
|
|
|
|
def test_tc02_antispam_within_cooldown():
    """TC-02: still above the threshold but inside the cooldown -> NONE."""
    already_alerting = PathAlertState(alerting=True, last_alert_at=1000.0)
    # Only 1000 s have elapsed against a 21600 s cooldown, so no re-alert yet.
    action = decide_action(90.0, 85, already_alerting, now=2000.0, realert_s=21600)
    assert action == ACTION_NONE
|
|
|
|
|
|
def test_tc03_realert_after_cooldown():
    """TC-03: still above the threshold once the full cooldown elapsed -> REALERT."""
    cooldown_s = 21600
    last_alert = 1000.0
    already_alerting = PathAlertState(alerting=True, last_alert_at=last_alert)
    # Exactly ``cooldown_s`` after the last alert is expected to re-alert.
    action = decide_action(
        90.0, 85, already_alerting, now=last_alert + cooldown_s, realert_s=cooldown_s
    )
    assert action == ACTION_REALERT
|
|
|
|
|
|
def test_tc04_recovery_and_no_repeat():
    """TC-04: dropping below the threshold gives one RECOVERY, then silence."""
    threshold, cooldown_s = 85, 21600

    was_alerting = PathAlertState(alerting=True, last_alert_at=1000.0)
    on_drop = decide_action(70.0, threshold, was_alerting, now=5000.0, realert_s=cooldown_s)
    assert on_drop == ACTION_RECOVERY

    # The next call is made with a non-alerting state, as it would be after a
    # recovery; remaining below the threshold must not repeat the message.
    cleared = PathAlertState(alerting=False, last_alert_at=None)
    while_below = decide_action(70.0, threshold, cleared, now=6000.0, realert_s=cooldown_s)
    assert while_below == ACTION_NONE
|
|
|
|
|
|
def test_tc05_threshold_boundary_inclusive():
    """TC-05: a fill equal to the threshold alerts; one point below is silent."""
    idle = PathAlertState(alerting=False, last_alert_at=None)
    expected_by_fill = {85.0: ACTION_ALERT, 84.0: ACTION_NONE}
    for used_pct, expected in expected_by_fill.items():
        assert decide_action(used_pct, 85, idle, now=1.0, realert_s=10) == expected
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# TC-06: measurement + device dedup
|
|
# --------------------------------------------------------------------------- #
|
|
def test_tc06_measure_and_dedup_by_device(monkeypatch):
    """TC-06: per-path used_pct/free computed; same-device paths dedup to one.

    The ``os.stat`` fake maps each path to a fixed device id, so the outcome
    does not depend on how many times or in which order ``measure_paths``
    stats the paths.
    """

    def _stat_for(device_by_path):
        """Return an ``os.stat`` stand-in reporting ``device_by_path[path]``."""
        return lambda p: type("S", (), {"st_dev": device_by_path[p]})()

    monkeypatch.setattr(dw.shutil, "disk_usage", lambda p: _usage(50.0))

    # Both paths share st_dev=42 -> single logical partition.
    monkeypatch.setattr(dw.os, "stat", _stat_for({"/repos": 42, "/app/data": 42}))
    out = measure_paths(["/repos", "/app/data"])
    assert len(out) == 1
    m = out[0]
    assert m["used_pct"] == 50.0
    assert m["free_bytes"] > 0 and m["free_gb"] > 0
    assert m["dedup_key"] == 42

    # Distinct devices -> two measurements, one per device.
    monkeypatch.setattr(dw.os, "stat", _stat_for({"/repos": 1, "/app/data": 2}))
    out2 = measure_paths(["/repos", "/app/data"])
    assert len(out2) == 2
    assert {entry["dedup_key"] for entry in out2} == {1, 2}
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# TC-07: never-raise (broken path + send failure)
|
|
# --------------------------------------------------------------------------- #
|
|
def test_tc07_broken_path_does_not_kill_tick(monkeypatch):
    """TC-07: a path whose usage lookup raises is dropped; the rest are measured."""
    device_by_path = {"/nope": 1, "/repos": 2}

    def _usage_or_missing(path):
        # Only "/nope" behaves like a missing mount point.
        if path != "/nope":
            return _usage(50.0)
        raise FileNotFoundError(path)

    monkeypatch.setattr(dw.shutil, "disk_usage", _usage_or_missing)
    monkeypatch.setattr(
        dw.os, "stat", lambda p: type("S", (), {"st_dev": device_by_path[p]})()
    )

    measured = measure_paths(["/nope", "/repos"])
    assert [entry["path"] for entry in measured] == ["/repos"]
|
|
|
|
|
|
def test_tc07_send_failure_does_not_raise(monkeypatch):
    """TC-07: an exception in send_telegram is swallowed; the tick completes.

    Paths and threshold are pinned so an alert is guaranteed to be due, and
    the failing sender records its invocations: without that check the test
    would also pass when no send was ever attempted.
    """
    attempts = []

    def _boom(text, disable_notification=False):
        attempts.append(text)
        raise RuntimeError("telegram down")

    monkeypatch.setattr(dw.shutil, "disk_usage", lambda p: _usage(95.0))
    monkeypatch.setattr(dw.os, "stat", lambda p: type("S", (), {"st_dev": 7})())
    monkeypatch.setattr(dw.settings, "disk_monitor_paths", "/repos", raising=False)
    monkeypatch.setattr(dw.settings, "disk_monitor_threshold_pct", 85, raising=False)
    monkeypatch.setattr(dw, "send_telegram", _boom)

    wd = DiskWatchdog(now_provider=lambda: 1000.0)
    wd.tick()  # must not raise

    assert attempts, "the failing sender was never called, so nothing was exercised"
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# TC-08: alert message format + notifying
|
|
# --------------------------------------------------------------------------- #
|
|
def test_tc08_alert_message_actionable_and_notifying(monkeypatch, captured_sends):
    """TC-08: the alert text names path, fill %, threshold and free space, and
    is sent with notifications enabled."""
    monkeypatch.setattr(dw.settings, "disk_monitor_paths", "/repos", raising=False)
    monkeypatch.setattr(dw.settings, "disk_monitor_threshold_pct", 85, raising=False)
    monkeypatch.setattr(dw.os, "stat", lambda p: type("S", (), {"st_dev": 9})())
    monkeypatch.setattr(dw.shutil, "disk_usage", lambda p: _usage(87.3))

    watchdog = DiskWatchdog(now_provider=lambda: 1000.0)
    watchdog.tick()

    assert len(captured_sends) == 1
    sent = captured_sends[0]
    message = sent["text"]
    expected_fragments = (
        "/repos",  # monitored path
        "87.3",    # measured fill
        "85",      # threshold
        "ГБ",      # free space
    )
    for fragment in expected_fragments:
        assert fragment in message
    # Notifying, not silent.
    assert sent["disable_notification"] is False
|
|
|
|
|
|
def test_tc08_format_helpers():
    """TC-08 (unit): both message formatters include the actionable fields."""
    measurement = {"path": "/repos", "used_pct": 88.0, "free_gb": 6.2, "free_pct": 12.0}

    alert_text = format_alert_message(measurement, 85, "mva154")
    assert all(part in alert_text for part in ("/repos", "88.0", "85", "6.2"))

    recovery_text = format_recovery_message(measurement, "mva154")
    assert all(part in recovery_text for part in ("/repos", "88.0"))
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# TC-09: kill-switch
|
|
# --------------------------------------------------------------------------- #
|
|
def test_tc09_killswitch_does_not_start(monkeypatch):
    """TC-09: with disk_monitor_enabled=False, start() leaves no thread behind."""
    monkeypatch.setattr(dw.settings, "disk_monitor_enabled", False, raising=False)

    watchdog = DiskWatchdog()
    watchdog.start()

    assert watchdog._thread is None
|
|
|
|
|
|
def test_tc09_killswitch_status_block(monkeypatch):
    """TC-09: under the kill-switch, status() exposes enabled=False."""
    monkeypatch.setattr(dw.settings, "disk_monitor_enabled", False, raising=False)

    report = DiskWatchdog().status()

    assert report["enabled"] is False
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# TC-10: status()
|
|
# --------------------------------------------------------------------------- #
|
|
def test_tc10_status_shape(monkeypatch):
    """TC-10: status() carries every expected key and works before any tick."""
    monkeypatch.setattr(dw.settings, "disk_monitor_enabled", True, raising=False)

    report = DiskWatchdog().status()

    expected_keys = (
        "enabled", "threshold_pct", "interval_s", "realert_s", "last_run_ts", "paths",
    )
    missing = [key for key in expected_keys if key not in report]
    assert not missing
    # No tick has run yet, so there are no per-path measurements.
    assert report["paths"] == []
|
|
|
|
|
|
def test_tc10_status_reflects_last_measurement(monkeypatch):
    """TC-10: after a tick status().paths carries used_pct/free/alerting/last_alert_at.

    The threshold is pinned to 85 so the ``alerting`` assertion does not depend
    on whatever threshold the ambient settings happen to provide.
    """
    monkeypatch.setattr(dw.shutil, "disk_usage", lambda p: _usage(90.0))
    monkeypatch.setattr(dw.os, "stat", lambda p: type("S", (), {"st_dev": 3})())
    monkeypatch.setattr(dw.settings, "disk_monitor_paths", "/repos", raising=False)
    monkeypatch.setattr(dw.settings, "disk_monitor_threshold_pct", 85, raising=False)
    monkeypatch.setattr(dw, "send_telegram", lambda *a, **k: 1)

    wd = DiskWatchdog(now_provider=lambda: 1000.0)
    wd.tick()

    paths = wd.status()["paths"]
    assert len(paths) == 1
    p = paths[0]
    assert p["path"] == "/repos"
    assert p["used_pct"] == 90.0
    assert p["alerting"] is True
    # The injected clock value is what gets recorded as the alert time.
    assert p["last_alert_at"] == 1000.0
    for key in ("free_gb", "free_pct"):
        assert key in p
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Anti-spam / recovery end-to-end through tick()
|
|
# --------------------------------------------------------------------------- #
|
|
def test_tick_antispam_then_realert_then_recovery(monkeypatch, captured_sends):
    """End-to-end through tick(): alert on crossing, quiet inside the cooldown,
    re-alert after it, one recovery message, then quiet again."""
    # Mutable knobs read by the fakes on every tick.
    world = {"fill": 90.0, "now": 1000.0}

    monkeypatch.setattr(dw.settings, "disk_monitor_paths", "/repos", raising=False)
    monkeypatch.setattr(dw.settings, "disk_monitor_threshold_pct", 85, raising=False)
    monkeypatch.setattr(dw.settings, "disk_monitor_realert_s", 100, raising=False)
    monkeypatch.setattr(dw.os, "stat", lambda p: type("S", (), {"st_dev": 5})())
    monkeypatch.setattr(dw.shutil, "disk_usage", lambda p: _usage(world["fill"]))

    watchdog = DiskWatchdog(now_provider=lambda: world["now"])

    def _sends_after_tick():
        """Run one tick and report how many messages were sent so far."""
        watchdog.tick()
        return len(captured_sends)

    # Crossing up -> ALERT.
    assert _sends_after_tick() == 1

    # Within cooldown -> silent.
    world["now"] += 10
    assert _sends_after_tick() == 1

    # Cooldown elapsed -> REALERT.
    world["now"] += 200
    assert _sends_after_tick() == 2

    # Drop below -> RECOVERY (one message).
    world["fill"] = 70.0
    world["now"] += 10
    assert _sends_after_tick() == 3
    assert "ниже порога" in captured_sends[2]["text"]

    # Stays below -> silent (no repeat recovery).
    assert _sends_after_tick() == 3
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# parse_paths
|
|
# --------------------------------------------------------------------------- #
|
|
def test_parse_paths_default_and_csv():
    """Blank input yields the default paths; CSV items come back trimmed."""
    default_paths = ["/repos", "/app/data"]
    for blank in ("", " "):
        assert parse_paths(blank) == default_paths

    assert parse_paths("/a, /b ,/c") == ["/a", "/b", "/c"]
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# TC-11 / TC-12: GET /queue integration
|
|
# --------------------------------------------------------------------------- #
|
|
def test_tc11_queue_has_disk_monitor_block(monkeypatch, tmp_path):
    """TC-11: GET /queue carries an additive disk_monitor block; existing keys kept.

    The database lives under pytest's per-test ``tmp_path`` rather than a fixed
    file name in the shared temp directory, so no state leaks between runs and
    parallel test runs cannot collide on the same file.
    """
    import asyncio

    import src.db as db
    from src.db import init_db
    from src import main

    dbfile = str(tmp_path / "test_disk_queue.db")
    monkeypatch.setattr(db.settings, "db_path", dbfile, raising=False)
    init_db()

    payload = asyncio.run(main.queue())

    for key in (
        "counts", "max_concurrency", "poll_interval", "resilience", "reconcile",
        "reaper", "post_deploy", "merge_verify", "task_deps", "serial_gate",
        "auto_labels", "recent",
    ):
        assert key in payload, f"existing /queue key '{key}' must be preserved"

    assert "disk_monitor" in payload
    dm = payload["disk_monitor"]
    for key in ("enabled", "threshold_pct", "interval_s", "paths"):
        assert key in dm
|
|
|
|
|
|
def test_tc12_queue_disabled_block(monkeypatch, tmp_path):
    """TC-12: with the kill-switch off, /queue reports disk_monitor.enabled=false.

    Uses pytest's per-test ``tmp_path`` for the database instead of a fixed
    file in the shared temp directory, and the module-level ``dw`` alias
    instead of re-importing the watchdog module under a second name.
    """
    import asyncio

    import src.db as db
    from src.db import init_db
    from src import main

    dbfile = str(tmp_path / "test_disk_queue2.db")
    monkeypatch.setattr(db.settings, "db_path", dbfile, raising=False)
    monkeypatch.setattr(dw.settings, "disk_monitor_enabled", False, raising=False)
    init_db()

    payload = asyncio.run(main.queue())
    assert payload["disk_monitor"]["enabled"] is False
|