"""TC-11: tolerance to the /metrics contract.

Unknown fields are ignored, a missing optional does not crash, and a
schema_version above the known one logs a warning (no crash).

Also covers the envelope-derived signal evaluation
(agent_hung / stage_stuck / job_failed / queue_depth).
"""
import logging

from watchdog.collectors import orch as orch_mod
from watchdog.config import Config
from watchdog.signals import AgentSample, eval_envelope

# Two envelope timestamps exactly 100 seconds apart, shared by the agent tests.
_T_PREV = "2026-06-10T00:00:00Z"
_T_NOW = "2026-06-10T00:01:40Z"


def _cfg(**overrides):
    """Build a Config by handing the keyword overrides to Config.from_env."""
    return Config.from_env(overrides)


def _signals_keyed(evaluation, key):
    """Return the signals of *evaluation* whose ``key`` equals *key*."""
    return [sig for sig in evaluation.signals if sig.key == key]


def test_unknown_field_ignored():
    """An unrecognised top-level field survives parsing instead of raising."""
    raw = '{"schema_version":1,"stages":[],"brand_new_field":42}'
    envelope = orch_mod.parse_envelope(raw)
    assert envelope["brand_new_field"] == 42  # tolerated, not a crash


def test_missing_optional_not_an_error():
    """An envelope carrying only schema_version evaluates to no signals."""
    envelope = orch_mod.parse_envelope('{"schema_version":1}')
    result = eval_envelope(envelope, _cfg(), prev_agents={}, prev_failed=None)
    # No stages/agents/queue -> no signals, no crash.
    assert result.signals == []


def test_non_object_body_raises_valueerror():
    """A JSON body that is not an object is rejected with ValueError."""
    import pytest

    with pytest.raises(ValueError):
        orch_mod.parse_envelope("[1,2,3]")


def test_schema_version_bump_warns(caplog):
    """A schema_version of 999 leaves a log record mentioning schema_version."""
    envelope = {"schema_version": 999}
    with caplog.at_level(logging.WARNING):
        orch_mod.check_schema_version(envelope)
    assert any("schema_version" in rec.message for rec in caplog.records)


def test_parse_generated_at_roundtrip_and_tolerant():
    """A valid timestamp parses truthy; garbage or a missing key gives None."""
    parse = orch_mod.parse_generated_at
    assert parse({"generated_at": _T_PREV})
    assert parse({"generated_at": "garbage"}) is None
    assert parse({}) is None


def test_queue_depth_and_job_failed_signals():
    """queue_depth fires on the first tick; job_failed needs a prior baseline."""
    first_tick = {
        "schema_version": 1,
        "queue": {"depth": 25, "counts": {"failed": 5}},
    }
    cfg = _cfg(WATCHDOG_QUEUE_DEPTH="20")

    # First tick: failed baseline established, depth over threshold fires.
    first = eval_envelope(first_tick, cfg, prev_agents={}, prev_failed=None)
    emitted = {sig.key for sig in first.signals}
    assert "queue_depth" in emitted
    assert "job_failed" not in emitted  # no prior baseline -> no edge yet
    assert first.failed_count == 5

    # Next tick: failed grew 5 -> 7 -> edge job_failed alert.
    second_tick = {"queue": {"depth": 0, "counts": {"failed": 7}}}
    second = eval_envelope(
        second_tick,
        cfg,
        prev_agents={},
        prev_failed=first.failed_count,
    )
    job_failed = _signals_keyed(second, "job_failed")
    assert len(job_failed) == 1
    assert job_failed[0].edge is True
    assert job_failed[0].active is True


def test_stage_stuck_signal():
    """A stage aged far past the configured limit yields one active signal."""
    stage = {"work_item": "ORCH-1", "stage": "review", "age_in_stage_s": 9999}
    envelope = {"stages": [stage]}
    cfg = _cfg(WATCHDOG_STAGE_STUCK_MIN="1")  # 60s threshold
    result = eval_envelope(envelope, cfg, prev_agents={}, prev_failed=None)
    stuck = _signals_keyed(result, ("stage_stuck", "ORCH-1"))
    assert len(stuck) == 1
    assert stuck[0].active is True


def test_agent_hung_needs_two_polls_and_low_cpu():
    """With a previous sample and a tiny tick delta, agent_hung is active."""
    cfg = _cfg(WATCHDOG_AGENT_HUNG_MIN="1", WATCHDOG_AGENT_CPU_FLOOR="0.01")
    agent = {"run_id": 7, "agent": "developer", "runtime_s": 999, "cpu_ticks": 50}
    envelope = {
        "schema_version": 1,
        "generated_at": _T_NOW,  # +100s vs prev sample below
        "clk_tck": 100,
        "agents": [agent],
    }
    prev_stamp = orch_mod.parse_generated_at({"generated_at": _T_PREV})
    earlier = {7: AgentSample(cpu_ticks=40, generated_at=prev_stamp)}
    # Δticks=10 over clk_tck=100 -> 0.1 CPU-seconds over 100s -> frac 0.001 < floor.
    result = eval_envelope(envelope, cfg, prev_agents=earlier, prev_failed=None)
    hung = _signals_keyed(result, ("agent_hung", 7))
    assert len(hung) == 1
    assert hung[0].active is True


def test_agent_hung_skipped_when_cpu_ticks_null():
    """A null cpu_ticks reading yields no agent_hung signal for that run."""
    cfg = _cfg(WATCHDOG_AGENT_HUNG_MIN="1")
    agent = {"run_id": 8, "runtime_s": 999, "cpu_ticks": None}
    envelope = {
        "generated_at": _T_NOW,
        "clk_tck": 100,
        "agents": [agent],
    }
    earlier = {8: AgentSample(cpu_ticks=10, generated_at=0.0)}
    result = eval_envelope(envelope, cfg, prev_agents=earlier, prev_failed=None)
    assert _signals_keyed(result, ("agent_hung", 8)) == []


def test_agent_busy_not_hung():
    """A large tick delta between polls yields no agent_hung signal."""
    cfg = _cfg(WATCHDOG_AGENT_HUNG_MIN="1", WATCHDOG_AGENT_CPU_FLOOR="0.01")
    agent = {"run_id": 9, "runtime_s": 999, "cpu_ticks": 5000}
    envelope = {
        "generated_at": _T_NOW,
        "clk_tck": 100,
        "agents": [agent],
    }
    prev_stamp = orch_mod.parse_generated_at({"generated_at": _T_PREV})
    earlier = {9: AgentSample(cpu_ticks=40, generated_at=prev_stamp)}
    # Big Δticks -> high CPU fraction -> not hung.
    result = eval_envelope(envelope, cfg, prev_agents=earlier, prev_failed=None)
    assert _signals_keyed(result, ("agent_hung", 9)) == []