"""ORCH-063: disk-watchdog tests (TC-01..TC-12). The watchdog never touches a real disk or Telegram: ``shutil.disk_usage`` is monkeypatched to set ``used_pct`` deterministically, ``send_telegram`` is captured via monkeypatch, and the cooldown/recovery clock is injected through ``now_provider`` so time-dependent decisions are tested without a real timer. """ import os import tempfile import pytest # Override env before importing app modules (same convention as test_reaper.py). os.environ.setdefault("ORCH_DB_PATH", os.path.join(tempfile.gettempdir(), "test_orch_disk.db")) os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") import src.disk_watchdog as dw # noqa: E402 from src.disk_watchdog import ( # noqa: E402 ACTION_ALERT, ACTION_NONE, ACTION_REALERT, ACTION_RECOVERY, DiskWatchdog, PathAlertState, decide_action, format_alert_message, format_recovery_message, measure_paths, parse_paths, ) # --------------------------------------------------------------------------- # # Helpers # --------------------------------------------------------------------------- # def _usage(used_pct: float, total_gb: float = 100.0): """Build a fake ``shutil.disk_usage`` result with the given fill %.""" total = int(total_gb * (1024 ** 3)) used = int(total * used_pct / 100) free = total - used class _U: pass u = _U() u.total, u.used, u.free = total, used, free return u @pytest.fixture def captured_sends(monkeypatch): """Capture every ``send_telegram`` call made by the watchdog.""" calls = [] def _fake_send(text, disable_notification=False): calls.append({"text": text, "disable_notification": disable_notification}) return 1 monkeypatch.setattr(dw, "send_telegram", _fake_send) return calls # --------------------------------------------------------------------------- # # TC-01..TC-05: pure decision function # --------------------------------------------------------------------------- # def test_tc01_alert_on_crossing_up(): """TC-01: was below, now >= threshold -> ALERT (threshold crossed).""" prev = PathAlertState(alerting=False, last_alert_at=None) assert decide_action(90.0, 85, prev, now=1000.0, realert_s=21600) == ACTION_ALERT def test_tc02_antispam_within_cooldown(): """TC-02: already alerting, above, < realert_s since last -> NONE (anti-spam).""" prev = PathAlertState(alerting=True, last_alert_at=1000.0) # 1000 s later, cooldown is 21600 -> still suppressed. assert decide_action(90.0, 85, prev, now=2000.0, realert_s=21600) == ACTION_NONE def test_tc03_realert_after_cooldown(): """TC-03: already alerting, above, >= realert_s elapsed -> REALERT.""" prev = PathAlertState(alerting=True, last_alert_at=1000.0) assert decide_action(90.0, 85, prev, now=1000.0 + 21600, realert_s=21600) == ACTION_REALERT def test_tc04_recovery_and_no_repeat(): """TC-04: above->below resets state with one RECOVERY; staying below is silent.""" prev_above = PathAlertState(alerting=True, last_alert_at=1000.0) assert decide_action(70.0, 85, prev_above, now=5000.0, realert_s=21600) == ACTION_RECOVERY # After recovery the state is non-alerting; staying below -> NONE (no repeat). prev_below = PathAlertState(alerting=False, last_alert_at=None) assert decide_action(70.0, 85, prev_below, now=6000.0, realert_s=21600) == ACTION_NONE def test_tc05_threshold_boundary_inclusive(): """TC-05: used_pct == threshold counts as exceeding; threshold-1 is silent.""" below = PathAlertState(alerting=False, last_alert_at=None) assert decide_action(85.0, 85, below, now=1.0, realert_s=10) == ACTION_ALERT assert decide_action(84.0, 85, below, now=1.0, realert_s=10) == ACTION_NONE # --------------------------------------------------------------------------- # # TC-06: measurement + device dedup # --------------------------------------------------------------------------- # def test_tc06_measure_and_dedup_by_device(monkeypatch): """TC-06: per-path used_pct/free computed; same-device paths dedup to one.""" monkeypatch.setattr(dw.shutil, "disk_usage", lambda p: _usage(50.0)) # Both paths share st_dev=42 -> single logical partition. monkeypatch.setattr(dw.os, "stat", lambda p: type("S", (), {"st_dev": 42})()) out = measure_paths(["/repos", "/app/data"]) assert len(out) == 1 m = out[0] assert m["used_pct"] == 50.0 assert m["free_bytes"] > 0 and m["free_gb"] > 0 assert m["dedup_key"] == 42 # Distinct devices -> two measurements. devs = iter([1, 2]) monkeypatch.setattr(dw.os, "stat", lambda p: type("S", (), {"st_dev": next(devs)})()) out2 = measure_paths(["/repos", "/app/data"]) assert len(out2) == 2 # --------------------------------------------------------------------------- # # TC-07: never-raise (broken path + send failure) # --------------------------------------------------------------------------- # def test_tc07_broken_path_does_not_kill_tick(monkeypatch): """TC-07: a missing path is skipped; other paths are still measured.""" def _maybe_raise(path): if path == "/nope": raise FileNotFoundError(path) return _usage(50.0) monkeypatch.setattr(dw.shutil, "disk_usage", _maybe_raise) devs = {"/nope": 1, "/repos": 2} monkeypatch.setattr(dw.os, "stat", lambda p: type("S", (), {"st_dev": devs[p]})()) out = measure_paths(["/nope", "/repos"]) assert len(out) == 1 assert out[0]["path"] == "/repos" def test_tc07_send_failure_does_not_raise(monkeypatch): """TC-07: an exception in send_telegram is swallowed; the tick completes.""" monkeypatch.setattr(dw.shutil, "disk_usage", lambda p: _usage(95.0)) monkeypatch.setattr(dw.os, "stat", lambda p: type("S", (), {"st_dev": 7})()) def _boom(text, disable_notification=False): raise RuntimeError("telegram down") monkeypatch.setattr(dw, "send_telegram", _boom) wd = DiskWatchdog(now_provider=lambda: 1000.0) wd.tick() # must not raise # --------------------------------------------------------------------------- # # TC-08: alert message format + notifying # --------------------------------------------------------------------------- # def test_tc08_alert_message_actionable_and_notifying(monkeypatch, captured_sends): """TC-08: alert carries path/used_pct/free/threshold; sent notifying.""" monkeypatch.setattr(dw.shutil, "disk_usage", lambda p: _usage(87.3)) monkeypatch.setattr(dw.os, "stat", lambda p: type("S", (), {"st_dev": 9})()) monkeypatch.setattr(dw.settings, "disk_monitor_paths", "/repos", raising=False) monkeypatch.setattr(dw.settings, "disk_monitor_threshold_pct", 85, raising=False) wd = DiskWatchdog(now_provider=lambda: 1000.0) wd.tick() assert len(captured_sends) == 1 call = captured_sends[0] text = call["text"] assert "/repos" in text assert "87.3" in text assert "85" in text # threshold assert "ГБ" in text # free space assert call["disable_notification"] is False # notifying, not silent def test_tc08_format_helpers(): """TC-08 (unit): format helpers contain the actionable fields.""" m = {"path": "/repos", "used_pct": 88.0, "free_gb": 6.2, "free_pct": 12.0} alert = format_alert_message(m, 85, "mva154") assert "/repos" in alert and "88.0" in alert and "85" in alert and "6.2" in alert rec = format_recovery_message(m, "mva154") assert "/repos" in rec and "88.0" in rec # --------------------------------------------------------------------------- # # TC-09: kill-switch # --------------------------------------------------------------------------- # def test_tc09_killswitch_does_not_start(monkeypatch): """TC-09: disk_monitor_enabled=False -> start() is a no-op (no thread).""" monkeypatch.setattr(dw.settings, "disk_monitor_enabled", False, raising=False) wd = DiskWatchdog() wd.start() assert wd._thread is None def test_tc09_killswitch_status_block(monkeypatch): """TC-09: status() reports enabled=False under the kill-switch.""" monkeypatch.setattr(dw.settings, "disk_monitor_enabled", False, raising=False) wd = DiskWatchdog() assert wd.status()["enabled"] is False # --------------------------------------------------------------------------- # # TC-10: status() # --------------------------------------------------------------------------- # def test_tc10_status_shape(monkeypatch): """TC-10: status() returns the expected keys, never-raise with no measurements.""" monkeypatch.setattr(dw.settings, "disk_monitor_enabled", True, raising=False) wd = DiskWatchdog() st = wd.status() for key in ("enabled", "threshold_pct", "interval_s", "realert_s", "last_run_ts", "paths"): assert key in st assert st["paths"] == [] # no tick yet def test_tc10_status_reflects_last_measurement(monkeypatch): """TC-10: after a tick status().paths carries used_pct/free/alerting/last_alert_at.""" monkeypatch.setattr(dw.shutil, "disk_usage", lambda p: _usage(90.0)) monkeypatch.setattr(dw.os, "stat", lambda p: type("S", (), {"st_dev": 3})()) monkeypatch.setattr(dw.settings, "disk_monitor_paths", "/repos", raising=False) monkeypatch.setattr(dw, "send_telegram", lambda *a, **k: 1) wd = DiskWatchdog(now_provider=lambda: 1000.0) wd.tick() paths = wd.status()["paths"] assert len(paths) == 1 p = paths[0] assert p["path"] == "/repos" assert p["used_pct"] == 90.0 assert p["alerting"] is True assert p["last_alert_at"] == 1000.0 for key in ("free_gb", "free_pct"): assert key in p # --------------------------------------------------------------------------- # # Anti-spam / recovery end-to-end through tick() # --------------------------------------------------------------------------- # def test_tick_antispam_then_realert_then_recovery(monkeypatch, captured_sends): """End-to-end: one alert on crossing, silence within cooldown, realert after cooldown, then a single recovery — driving the daemon's in-memory state.""" fill = {"pct": 90.0} clock = {"t": 1000.0} monkeypatch.setattr(dw.shutil, "disk_usage", lambda p: _usage(fill["pct"])) monkeypatch.setattr(dw.os, "stat", lambda p: type("S", (), {"st_dev": 5})()) monkeypatch.setattr(dw.settings, "disk_monitor_paths", "/repos", raising=False) monkeypatch.setattr(dw.settings, "disk_monitor_threshold_pct", 85, raising=False) monkeypatch.setattr(dw.settings, "disk_monitor_realert_s", 100, raising=False) wd = DiskWatchdog(now_provider=lambda: clock["t"]) wd.tick() # crossing up -> ALERT assert len(captured_sends) == 1 clock["t"] += 10 # within cooldown -> silent wd.tick() assert len(captured_sends) == 1 clock["t"] += 200 # cooldown elapsed -> REALERT wd.tick() assert len(captured_sends) == 2 fill["pct"] = 70.0 # drop below -> RECOVERY (one message) clock["t"] += 10 wd.tick() assert len(captured_sends) == 3 assert "ниже порога" in captured_sends[2]["text"] wd.tick() # stays below -> silent (no repeat recovery) assert len(captured_sends) == 3 # --------------------------------------------------------------------------- # # parse_paths # --------------------------------------------------------------------------- # def test_parse_paths_default_and_csv(): assert parse_paths("") == ["/repos", "/app/data"] assert parse_paths(" ") == ["/repos", "/app/data"] assert parse_paths("/a, /b ,/c") == ["/a", "/b", "/c"] # --------------------------------------------------------------------------- # # TC-11 / TC-12: GET /queue integration # --------------------------------------------------------------------------- # def test_tc11_queue_has_disk_monitor_block(monkeypatch): """TC-11: GET /queue carries an additive disk_monitor block; existing keys kept.""" import asyncio import src.db as db from src.db import init_db from src import main dbfile = os.path.join(tempfile.gettempdir(), "test_disk_queue.db") monkeypatch.setattr(db.settings, "db_path", dbfile, raising=False) init_db() payload = asyncio.run(main.queue()) for key in ( "counts", "max_concurrency", "poll_interval", "resilience", "reconcile", "reaper", "post_deploy", "merge_verify", "task_deps", "serial_gate", "auto_labels", "recent", ): assert key in payload, f"existing /queue key '{key}' must be preserved" assert "disk_monitor" in payload dm = payload["disk_monitor"] assert "enabled" in dm and "threshold_pct" in dm and "interval_s" in dm assert "paths" in dm def test_tc12_queue_disabled_block(monkeypatch): """TC-12: with the kill-switch off, /queue reports disk_monitor.enabled=false.""" import asyncio import src.db as db from src.db import init_db from src import main from src import disk_watchdog as dwmod dbfile = os.path.join(tempfile.gettempdir(), "test_disk_queue2.db") monkeypatch.setattr(db.settings, "db_path", dbfile, raising=False) monkeypatch.setattr(dwmod.settings, "disk_monitor_enabled", False, raising=False) init_db() payload = asyncio.run(main.queue()) assert payload["disk_monitor"]["enabled"] is False