"""ORCH-026 — additive job_deps migration (TC-G01, AC-G4). The migration must be additive (CREATE TABLE/INDEX IF NOT EXISTS), idempotent, and safe on a pre-existing DB with data: existing columns of jobs/tasks/ agent_runs/events are untouched. """ import os import tempfile import pytest os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_orch026_migration.db") os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") import src.db as db # noqa: E402 from src.db import init_db, get_db # noqa: E402 @pytest.fixture def dbfile(tmp_path, monkeypatch): f = tmp_path / "mig.db" monkeypatch.setattr(db.settings, "db_path", str(f)) return f def _columns(conn, table): return [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()] def test_job_deps_table_created(dbfile): init_db() conn = get_db() cols = _columns(conn, "job_deps") conn.close() assert set(cols) == {"task_id", "depends_on_task_id", "created_at"} def test_job_deps_indices_created(dbfile): init_db() conn = get_db() idx = {r[1] for r in conn.execute("PRAGMA index_list(job_deps)").fetchall()} conn.close() assert "idx_job_deps_task" in idx assert "idx_job_deps_depends" in idx def test_primary_key_idempotent_insert(dbfile): init_db() conn = get_db() conn.execute("INSERT OR IGNORE INTO job_deps (task_id, depends_on_task_id) VALUES (1, 2)") conn.execute("INSERT OR IGNORE INTO job_deps (task_id, depends_on_task_id) VALUES (1, 2)") conn.commit() n = conn.execute("SELECT COUNT(*) FROM job_deps").fetchone()[0] conn.close() assert n == 1, "PK (task_id, depends_on_task_id) prevents dup rows" def test_migration_idempotent_and_preserves_data(dbfile): # First init + seed legacy data. init_db() conn = get_db() conn.execute( "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) " "VALUES ('ET-1','ET-1','enduro-trails','feature/x','development')" ) conn.execute( "INSERT INTO jobs (agent, repo, status) VALUES ('developer','enduro-trails','queued')" ) conn.commit() tasks_cols_before = _columns(conn, "tasks") jobs_cols_before = _columns(conn, "jobs") conn.close() # Re-run init_db (simulates a restart on a live DB) -> must be a no-op. init_db() conn = get_db() assert _columns(conn, "tasks") == tasks_cols_before, "tasks columns unchanged" assert _columns(conn, "jobs") == jobs_cols_before, "jobs columns unchanged" # Legacy data survives. assert conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0] == 1 assert conn.execute("SELECT COUNT(*) FROM jobs").fetchone()[0] == 1 conn.close()