"""ORCH-098 / TC-01..TC-12: the machine lessons-journal (src/lessons.py + db + wiring). Contract under test (ADR-001 §7 / acceptance-criteria): * the `lessons` table is additive + idempotent and carries the NULLABLE attribution columns (attribution / target_repo / target_domain) from the start; * record() inserts a row (auto/manual) and returns its id; auto records are deduped in a window, manual records are never deduped; * never-raise: a failing DB -> None/[]/{}/False, never an exception into the caller; * kill-switch off -> record/get/update/snapshot inert (no DB access); * get_lessons filters by type/status/repo/work_item + LIMIT + ORDER BY id DESC; * update_lesson mutates fields + stamps updated_at; unknown id is safe; * auto-record wiring: a QG rollback to development writes a `gate_failure` lesson; a launcher transient-budget-exhaustion writes a `transient_retry` lesson; a failing journal never breaks the hot path; * the HTTP endpoints (GET /lessons, POST /lessons, POST /lessons/{id}) and the GET /queue `lessons` block behave + honour the kill-switch; * pipeline invariants (STAGE_TRANSITIONS / QG_CHECKS) are structurally untouched. 
""" import os import tempfile os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_lessons.db") os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") import pytest # noqa: E402 import src.db as db # noqa: E402 from src import config as cfg # noqa: E402 from src import lessons # noqa: E402 _REPO = "orchestrator" _WI = "ORCH-098" @pytest.fixture(autouse=True) def fresh_db(tmp_path, monkeypatch): """Isolated tmp SQLite DB + journal ON by default.""" dbfile = tmp_path / "lessons.db" monkeypatch.setattr(db.settings, "db_path", str(dbfile)) monkeypatch.setattr(cfg.settings, "lessons_enabled", True, raising=False) monkeypatch.setattr(cfg.settings, "lessons_query_limit_default", 100, raising=False) monkeypatch.setattr(cfg.settings, "lessons_dedup_window_s", 3600, raising=False) db.init_db() yield def _columns(): conn = db.get_db() try: return {r[1] for r in conn.execute("PRAGMA table_info(lessons)").fetchall()} finally: conn.close() # =========================================================================== # TC-01 — additive + idempotent table with all BR-1 fields # =========================================================================== def test_tc01_table_idempotent_and_fields(): # Double init must not raise nor duplicate. db.init_db() db.init_db() cols = _columns() for f in ( "id", "created_at", "updated_at", "lesson_type", "work_item_id", "task_id", "stage", "agent", "repo", "root_cause", "suggestion", "status", "related_task", ): assert f in cols, f"missing column {f}" # No existing table mutated: tasks/jobs still present and unchanged in shape. 
conn = db.get_db() try: tabs = { r[0] for r in conn.execute( "SELECT name FROM sqlite_master WHERE type='table'" ).fetchall() } finally: conn.close() assert {"tasks", "jobs", "agent_runs", "lessons"} <= tabs # =========================================================================== # TC-02 — attribution columns present from the start, nullable, set later # =========================================================================== def test_tc02_attribution_columns_nullable_and_settable(): cols = _columns() assert {"attribution", "target_repo", "target_domain"} <= cols # A record WITHOUT attribution is accepted (NULL). lid = lessons.record(lessons.LessonType.DEPLOY_DEGRADED, work_item_id=_WI, repo=_REPO) assert lid is not None rows = lessons.get(work_item_id=_WI) assert rows[0]["attribution"] is None # Attribution can be filled in later via update. assert lessons.update( lid, attribution=lessons.Attribution.PLATFORM, target_repo=_REPO, target_domain=lessons.Domain.RELIABILITY, ) is True rows = lessons.get(work_item_id=_WI) assert rows[0]["attribution"] == "platform" assert rows[0]["target_domain"] == "reliability" # =========================================================================== # TC-03 — record() inserts and returns id, created_at filled, source honoured # =========================================================================== def test_tc03_record_inserts_and_returns_id(): lid = lessons.record( lessons.LessonType.GATE_FAILURE, work_item_id=_WI, task_id=7, stage="review", agent="reviewer", repo=_REPO, root_cause="REQUEST_CHANGES", source="auto", ) assert isinstance(lid, int) and lid > 0 rows = lessons.get(work_item_id=_WI) assert len(rows) == 1 r = rows[0] assert r["lesson_type"] == "gate_failure" assert r["task_id"] == 7 assert r["agent"] == "reviewer" assert r["source"] == "auto" assert r["status"] == "new" assert r["created_at"] # A manual record with a different (work_item, type) -> distinct row. 
lid2 = lessons.record("custom_manual", work_item_id="ORCH-1", source="manual") assert lid2 is not None and lid2 != lid # =========================================================================== # TC-04 — never-raise: a failing DB -> safe defaults, no exception # =========================================================================== def test_tc04_never_raise_on_db_error(monkeypatch): def boom(*a, **k): raise RuntimeError("db down") monkeypatch.setattr(db, "record_lesson", boom) monkeypatch.setattr(db, "lessons_recent_dup_exists", lambda *a, **k: False) monkeypatch.setattr(db, "get_lessons", boom) monkeypatch.setattr(db, "update_lesson", boom) monkeypatch.setattr(db, "lessons_snapshot", boom) assert lessons.record("gate_failure", work_item_id=_WI) is None assert lessons.get(work_item_id=_WI) == [] assert lessons.update(1, status="closed") is False snap = lessons.snapshot() assert snap == {"enabled": True} # never-raise -> minimal dict, no exception # =========================================================================== # TC-05 — kill-switch: lessons_enabled=False -> inert, no DB access # =========================================================================== def test_tc05_kill_switch_inert(monkeypatch): monkeypatch.setattr(cfg.settings, "lessons_enabled", False, raising=False) def fail(*a, **k): raise AssertionError("DB must NOT be touched when kill-switch is off") monkeypatch.setattr(db, "record_lesson", fail) monkeypatch.setattr(db, "get_lessons", fail) monkeypatch.setattr(db, "update_lesson", fail) monkeypatch.setattr(db, "lessons_snapshot", fail) assert lessons.record("gate_failure", work_item_id=_WI) is None assert lessons.get(work_item_id=_WI) == [] assert lessons.update(1, status="closed") is False assert lessons.snapshot() == {"enabled": False} # =========================================================================== # TC-06 — get_lessons filters + limit + ORDER BY id DESC # 
# ===========================================================================
def test_tc06_filters_limit_order():
    """db.get_lessons honours each filter, the limit, and newest-first ordering."""
    # Seed rows directly via the DB helper (bypasses the leaf's auto-dedup).
    for i in range(5):
        db.record_lesson(
            lesson_type="gate_failure", work_item_id=f"ORCH-{i}", repo=_REPO,
            status="new", source="auto",
        )
    db.record_lesson(
        lesson_type="merge_hold", work_item_id="ORCH-X", repo="enduro-trails",
        status="closed", source="auto",
    )

    # Filter by type.
    gf = db.get_lessons(lesson_type="gate_failure")
    assert len(gf) == 5 and all(r["lesson_type"] == "gate_failure" for r in gf)
    # Filter by status.
    assert len(db.get_lessons(status="closed")) == 1
    # Filter by repo.
    assert len(db.get_lessons(repo="enduro-trails")) == 1
    # Filter by work_item.
    assert len(db.get_lessons(work_item_id="ORCH-3")) == 1
    # LIMIT.
    assert len(db.get_lessons(lesson_type="gate_failure", limit=2)) == 2
    # ORDER BY id DESC (newest first).
    allr = db.get_lessons(limit=100)
    got_ids = [r["id"] for r in allr]
    assert got_ids == sorted(got_ids, reverse=True)


# ===========================================================================
# TC-07 — update_lesson mutates + stamps updated_at; unknown id safe
# ===========================================================================
def test_tc07_update_and_unknown_id():
    """update_lesson changes fields and sets updated_at; no-op cases return False."""
    lid = db.record_lesson(
        lesson_type="deploy_degraded", work_item_id=_WI, repo=_REPO,
        status="new", source="auto",
    )
    before = db.get_lessons(work_item_id=_WI)[0]
    assert before["updated_at"] is None

    ok = db.update_lesson(
        lid, status="in_progress", attribution="both", target_repo=_REPO,
        target_domain="reliability", related_task="ORCH-200",
    )
    assert ok is True
    after = db.get_lessons(work_item_id=_WI)[0]
    assert after["status"] == "in_progress"
    assert after["attribution"] == "both"
    assert after["related_task"] == "ORCH-200"
    assert after["updated_at"] is not None

    # Unknown id -> no row changed, no raise.
    assert db.update_lesson(999999, status="closed") is False
    # Empty update (no recognised fields) -> False, safe.
    assert db.update_lesson(lid) is False


# ===========================================================================
# TC-07b — auto dedup vs manual always-writes (D4)
# ===========================================================================
def test_tc07b_auto_dedup_and_manual_passthrough(monkeypatch):
    """Auto records dedupe inside the window, manual ones never; window=0 disables it."""
    a = lessons.record("transient_retry", work_item_id=_WI, stage="deploy", source="auto")
    b = lessons.record("transient_retry", work_item_id=_WI, stage="deploy", source="auto")
    assert a is not None and b is None  # second auto deduped in-window

    # Manual is never deduped.
    m1 = lessons.record("transient_retry", work_item_id=_WI, stage="deploy", source="manual")
    m2 = lessons.record("transient_retry", work_item_id=_WI, stage="deploy", source="manual")
    assert m1 is not None and m2 is not None and m1 != m2

    # Window=0 disables dedup. The override goes through monkeypatch (like every
    # other settings override in this module) so it is undone at teardown even
    # when the assertion below fails, instead of relying on a manual reset.
    monkeypatch.setattr(cfg.settings, "lessons_dedup_window_s", 0, raising=False)
    c2 = lessons.record("transient_retry", work_item_id=_WI, stage="deploy", source="auto")
    assert c2 is not None


# ===========================================================================
# TC-08 — wiring: QG rollback to development writes a gate_failure lesson
# ===========================================================================
def test_tc08_gate_failure_autorecord(monkeypatch):
    """A QG-failure rollback to development leaves one auto `gate_failure` lesson."""
    from src import stage_engine as se

    # All side-effecting DB / notifier / plane ops on the rollback path are patched
    # to no-ops; only the lessons block reaches the (real tmp) DB — so we assert the
    # WIRING (rolled_back_to -> gate_failure lesson) without standing up a full task.
    for name in ("notify_stage_change", "plane_notify_stage", "send_telegram",
                 "set_issue_in_progress", "plane_add_comment", "update_task_stage"):
        monkeypatch.setattr(se, name, lambda *a, **k: None, raising=False)
    monkeypatch.setattr(se, "extract_test_failures", lambda *a, **k: "", raising=False)
    monkeypatch.setattr(se, "_developer_retry_count", lambda *a, **k: 0, raising=False)
    monkeypatch.setattr(se, "enqueue_job", lambda *a, **k: 123, raising=False)

    result = se.AdvanceResult()
    se._handle_qg_failure_rollbacks(
        99, "testing", _REPO, _WI, "feature/ORCH-098-fnd",
        agent="tester", qg_name="check_tests_passed", reason="2 failed", result=result,
    )
    assert result.rolled_back_to == "development"

    rows = db.get_lessons(lesson_type="gate_failure", work_item_id=_WI)
    assert len(rows) == 1
    r = rows[0]
    assert r["stage"] == "testing"
    assert r["agent"] == "tester"
    assert r["repo"] == _REPO
    assert r["source"] == "auto"
    assert r["detail"] == "check_tests_passed"


# ===========================================================================
# TC-09 — wiring: launcher transient-budget-exhaustion writes a lesson;
#         a failing journal never breaks the hot path
# ===========================================================================
def test_tc09_transient_autorecord_and_never_raise(monkeypatch):
    """Exhausting the transient budget records a lesson; a broken journal is tolerated."""
    from src.agents import launcher as lmod

    launcher = lmod.AgentLauncher()
    monkeypatch.setattr(launcher, "_notify_failed", lambda *a, **k: None)
    monkeypatch.setattr(launcher, "_record_outcome", lambda *a, **k: None)
    monkeypatch.setattr(cfg.settings, "transient_max_attempts", 3, raising=False)

    job_id = db.enqueue_job("developer", _REPO, "task", task_id=42)
    job = {"transient_attempts": 3, "task_id": 42, "repo": _REPO}
    # Budget exhausted (tattempts >= tmax) -> the failed branch records the lesson.
    launcher._finalize_transient(job_id, "developer", 1, 99, job, retry_after=None)

    rows = db.get_lessons(lesson_type="transient_retry")
    assert len(rows) == 1
    assert rows[0]["repo"] == _REPO
    assert rows[0]["agent"] == "developer"
    assert rows[0]["source"] == "auto"

    # never-raise in the hot path: a failing record must not break finalisation.
    def boom(*a, **k):
        raise RuntimeError("journal down")

    monkeypatch.setattr(db, "record_lesson", boom)
    monkeypatch.setattr(db, "lessons_recent_dup_exists", lambda *a, **k: False)
    job_id2 = db.enqueue_job("developer", _REPO, "task2", task_id=43)
    job2 = {"transient_attempts": 3, "task_id": 43, "repo": _REPO}
    # Must NOT raise even though the journal insert blows up.
    launcher._finalize_transient(job_id2, "developer", 1, 99, job2, retry_after=None)


# ===========================================================================
# TC-10 — GET /lessons + GET /queue block; reads do not mutate
# ===========================================================================
def test_tc10_get_endpoints(monkeypatch):
    """GET /lessons lists and filters rows; GET /queue exposes the lessons block."""
    from fastapi.testclient import TestClient
    import src.main as main

    db.record_lesson(
        lesson_type="gate_failure", work_item_id=_WI, repo=_REPO,
        status="new", source="auto",
    )
    db.record_lesson(
        lesson_type="merge_hold", work_item_id="ORCH-2", repo="enduro-trails",
        status="closed", source="auto",
    )
    client = TestClient(main.app)

    r = client.get("/lessons")
    assert r.status_code == 200
    body = r.json()
    assert body["enabled"] is True
    assert len(body["lessons"]) == 2

    # Filters.
    r = client.get("/lessons", params={"type": "gate_failure"})
    assert len(r.json()["lessons"]) == 1
    r = client.get("/lessons", params={"repo": "enduro-trails"})
    assert len(r.json()["lessons"]) == 1
    r = client.get("/lessons", params={"limit": 1})
    assert len(r.json()["lessons"]) == 1

    # Reads do not mutate.
    assert db.lessons_snapshot()["total"] == 2

    # GET /queue carries the read-only lessons block.
    q = client.get("/queue")
    assert q.status_code == 200
    assert "lessons" in q.json()
    assert q.json()["lessons"]["enabled"] is True
    assert q.json()["lessons"]["total"] == 2


# ===========================================================================
# TC-11 — POST /lessons (manual) + POST /lessons/{id} (update); kill-switch
# ===========================================================================
def test_tc11_post_endpoints_and_killswitch(monkeypatch):
    """POST endpoints create/update lessons and report disabled when switched off."""
    from fastapi.testclient import TestClient
    import src.main as main

    client = TestClient(main.app)

    # Manual create with attribution.
    r = client.post("/lessons", json={
        "lesson_type": "process_gap", "work_item_id": _WI, "repo": _REPO,
        "attribution": "platform", "target_domain": "quality",
        "root_cause": "manual note",
    })
    assert r.status_code == 200
    lid = r.json()["id"]
    assert isinstance(lid, int)
    rows = db.get_lessons(work_item_id=_WI)
    assert rows[0]["source"] == "manual"
    assert rows[0]["attribution"] == "platform"

    # Missing lesson_type -> error, no row.
    r = client.post("/lessons", json={"work_item_id": "X"})
    assert r.json()["ok"] is False

    # Update via POST /lessons/{id}.
    r = client.post(f"/lessons/{lid}", json={"status": "closed", "related_task": "ORCH-300"})
    assert r.json()["ok"] is True
    assert db.get_lessons(work_item_id=_WI)[0]["status"] == "closed"

    # Kill-switch off -> endpoints report {"enabled": false}.
    monkeypatch.setattr(cfg.settings, "lessons_enabled", False, raising=False)
    assert client.get("/lessons").json() == {"enabled": False, "lessons": []}
    assert client.post("/lessons", json={"lesson_type": "x"}).json() == {"enabled": False}
    assert client.post(f"/lessons/{lid}", json={"status": "new"}).json() == {"enabled": False}


# ===========================================================================
# TC-12 — pipeline invariants structurally untouched
# ===========================================================================
def test_tc12_pipeline_invariants_untouched():
    """Spot-check that core stages and QG checks are still registered."""
    from src.stages import STAGE_TRANSITIONS
    from src.qg.checks import QG_CHECKS

    # The journal must not have added/removed a stage edge or a QG check.
    assert "development" in STAGE_TRANSITIONS
    assert "deploy" in STAGE_TRANSITIONS
    # machine-verdict QG checks still registered (sample of the canon set).
    for name in ("check_ci_green", "check_tests_passed", "check_coverage_gate"):
        assert name in QG_CHECKS
    # The journal is NOT a quality gate — no check named after it.
    assert not any("lesson" in k.lower() for k in QG_CHECKS)