"""ORCH-111: the /proc collector — pure parsing + a fake /proc tree (never-raise). Mirrors ``test_host_collector.py``: the pure parsers are unit-tested on text fixtures and ``collect_candidates`` is driven against a temporary ``/proc`` tree, so no real host / Linux kernel is required. """ from watchdog.collectors import proc as proc_mod # -- parse_btime -------------------------------------------------------------- def test_parse_btime_reads_the_btime_line(): text = "cpu 1 2 3 4\nbtime 1700000000\nprocesses 99\n" assert proc_mod.parse_btime(text) == 1700000000 def test_parse_btime_absent_is_none(): assert proc_mod.parse_btime("cpu 1 2 3\nintr 0\n") is None def test_parse_btime_garbage_is_none(): assert proc_mod.parse_btime("btime not-a-number\n") is None assert proc_mod.parse_btime("") is None # -- parse_pid_stat (comm may contain spaces/parens) -------------------------- def test_parse_pid_stat_simple(): # field 14 utime, 15 stime, 22 starttime. fields = ["0"] * 52 fields[0], fields[1], fields[2] = "1234", "(python3)", "R" fields[13], fields[14], fields[21] = "50", "25", "9000" st = proc_mod.parse_pid_stat(" ".join(fields)) assert st == {"utime": 50, "stime": 25, "starttime": 9000} def test_parse_pid_stat_comm_with_spaces_and_parens(): # A pathological comm "(py (test) x)" must not break field indexing — we # split after the LAST ')'. fields = ["0"] * 52 fields[13], fields[14], fields[21] = "7", "3", "4242" tail = " ".join(fields[2:]) line = f"1234 (py (test) x) {tail}" st = proc_mod.parse_pid_stat(line) assert st == {"utime": 7, "stime": 3, "starttime": 4242} def test_parse_pid_stat_truncated_is_none(): assert proc_mod.parse_pid_stat("1234 (python3) R 1 2 3") is None assert proc_mod.parse_pid_stat("no parens here") is None assert proc_mod.parse_pid_stat("") is None # -- decode_cmdline ----------------------------------------------------------- def test_decode_cmdline_nul_separated(): raw = b"python3\x00-m\x00pytest\x00tests/test_x.py\x00" assert proc_mod.decode_cmdline(raw) == "python3 -m pytest tests/test_x.py" def test_decode_cmdline_empty_for_kernel_thread(): assert proc_mod.decode_cmdline(b"") == "" assert proc_mod.decode_cmdline(None) == "" # -- matches_patterns --------------------------------------------------------- def test_matches_patterns_substring_any(): assert proc_mod.matches_patterns("python3 -m pytest x", ["pytest"]) is True assert proc_mod.matches_patterns("python3 -m coverage run", ["pytest", "coverage run"]) is True assert proc_mod.matches_patterns("bash -c sleep", ["pytest"]) is False assert proc_mod.matches_patterns("", ["pytest"]) is False assert proc_mod.matches_patterns("pytest", []) is False # -- collect_candidates (fake /proc tree) ------------------------------------- def _stat_line(start_ticks, utime=0, stime=0): fields = ["0"] * 52 fields[0], fields[1], fields[2] = "999", "(python3)", "S" fields[13], fields[14], fields[21] = str(utime), str(stime), str(start_ticks) return " ".join(fields) def _write_proc(root, btime, procs): (root / "stat").write_text(f"cpu 1 2 3\nbtime {btime}\n") for pid, (cmdline, stat) in procs.items(): d = root / pid d.mkdir() (d / "cmdline").write_bytes(cmdline.replace(" ", "\x00").encode() + b"\x00") (d / "stat").write_text(stat) def test_collect_candidates_computes_age_and_cpu(tmp_path): # btime=1_000_000, starttime=200_000 ticks @ 100 Hz -> start epoch = 1_002_000. # now=1_010_000 -> age 8000s; utime+stime=300 ticks @100Hz -> cpu 3s. _write_proc( tmp_path, btime=1_000_000, procs={"200": ("python3 -m pytest tests/", _stat_line(200_000, utime=200, stime=100))}, ) recs = proc_mod.collect_candidates( ["pytest"], now=1_010_000.0, proc_root=str(tmp_path), clk_tck=100 ) assert len(recs) == 1 r = recs[0] assert r["pid"] == 200 assert r["cmdline"] == "python3 -m pytest tests/" assert abs(r["age_s"] - 8000.0) < 1e-6 assert abs(r["cpu_s"] - 3.0) < 1e-6 def test_collect_candidates_filters_by_pattern(tmp_path): _write_proc( tmp_path, btime=1_000_000, procs={ "100": ("claude --model x", _stat_line(0)), "200": ("python3 -m pytest a", _stat_line(0)), "300": ("/usr/bin/dockerd", _stat_line(0)), }, ) recs = proc_mod.collect_candidates( ["pytest"], now=1_010_000.0, proc_root=str(tmp_path), clk_tck=100 ) assert [r["pid"] for r in recs] == [200] def test_collect_candidates_skips_unreadable_pid(tmp_path): # A matching pid whose stat is unparseable (race: vanished mid-scan) is # skipped without dropping the rest. _write_proc( tmp_path, btime=1_000_000, procs={ "200": ("python3 -m pytest a", "garbage no parens"), "201": ("python3 -m pytest b", _stat_line(0)), }, ) recs = proc_mod.collect_candidates( ["pytest"], now=1_010_000.0, proc_root=str(tmp_path), clk_tck=100 ) assert [r["pid"] for r in recs] == [201] def test_collect_candidates_ignores_non_numeric_entries(tmp_path): _write_proc(tmp_path, btime=1_000_000, procs={"200": ("pytest", _stat_line(0))}) (tmp_path / "self").mkdir() # non-numeric -> ignored (tmp_path / "meminfo").write_text("noise") recs = proc_mod.collect_candidates( ["pytest"], now=1_000_000.0, proc_root=str(tmp_path), clk_tck=100 ) assert [r["pid"] for r in recs] == [200]