From a74379f657c33c154989ee9b458a2418bb50101d Mon Sep 17 00:00:00 2001 From: claude-bot Date: Mon, 8 Jun 2026 19:06:22 +0300 Subject: [PATCH] feat(ORCH-026): task dependencies (B waits for A) + single-repo merge serialization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Level A — merge/deploy serialization within one repo: reuse the existing ORCH-043/065 merge-lease (no new mechanism); the only new logic is an unconditional pre-merge rebase in check_branch_mergeable — under the held lease, auto_rebase_onto_main is ALWAYS called when premerge_rebase_always (default True), not just when the branch is behind. No-op on an up-to-date branch (rebase keeps HEAD, force-with-lease -> "Everything up-to-date", CI not triggered). Kill-switch off -> ORCH-043 behaviour 1:1. Level B — declarative task dependencies: additive job_deps table (CREATE ... IF NOT EXISTS, no live-DB migration); claim_next_job gate (NOT EXISTS) defers a job whose depends-on tasks are not yet 'done' without occupying a max_concurrency slot; inert on empty job_deps -> zero regression. New leaf src/task_deps.py (never-raise): is_task_ready (fail-open), DFS cycle detection + Blocked/alert, declare/ingest_plane_relations (db source never hits the network on the hot path), snapshot. Telegram waiting-line, /queue observability, reconciler skip + cycle backstop, reaper untouched. Invariants unchanged: STAGE_TRANSITIONS, QG_CHECKS registry (dep gate is a claim_next_job врезка, not a registered QG), DB schema of existing tables, HTTP endpoints; non-self repos remain a no-op on empty deps/scope. Flags: ORCH_PREMERGE_REBASE_ALWAYS, ORCH_TASK_DEPS_ENABLED, ORCH_TASK_DEPS_SOURCE. Docs: docs/architecture/README.md, CLAUDE.md, .env.example, CHANGELOG.md, adr-0015. Tests: tests/test_orch026_*.py (64 tests); full suite 991 green. 
Refs: ORCH-026 Co-Authored-By: Claude Opus 4.7 --- .env.example | 16 + CHANGELOG.md | 1 + CLAUDE.md | 2 +- src/config.py | 31 ++ src/db.py | 130 ++++++++ src/main.py | 4 + src/notifications.py | 16 + src/plane_sync.py | 66 ++++ src/qg/checks.py | 13 +- src/reconciler.py | 23 ++ src/task_deps.py | 335 ++++++++++++++++++++ src/webhooks/plane.py | 11 + tests/test_merge_gate_race.py | 5 + tests/test_orch026_conditionality.py | 118 +++++++ tests/test_orch026_dep_cycles.py | 136 ++++++++ tests/test_orch026_dep_visibility.py | 79 +++++ tests/test_orch026_deps_integration.py | 124 ++++++++ tests/test_orch026_merge_serialize.py | 95 ++++++ tests/test_orch026_migration.py | 83 +++++ tests/test_orch026_premerge_rebase.py | 82 +++++ tests/test_orch026_queue_observability.py | 90 ++++++ tests/test_orch026_serialize_integration.py | 65 ++++ tests/test_orch026_task_deps.py | 157 +++++++++ tests/test_qg_merge_gate.py | 6 + 24 files changed, 1686 insertions(+), 2 deletions(-) create mode 100644 src/task_deps.py create mode 100644 tests/test_orch026_conditionality.py create mode 100644 tests/test_orch026_dep_cycles.py create mode 100644 tests/test_orch026_dep_visibility.py create mode 100644 tests/test_orch026_deps_integration.py create mode 100644 tests/test_orch026_merge_serialize.py create mode 100644 tests/test_orch026_migration.py create mode 100644 tests/test_orch026_premerge_rebase.py create mode 100644 tests/test_orch026_queue_observability.py create mode 100644 tests/test_orch026_serialize_integration.py create mode 100644 tests/test_orch026_task_deps.py diff --git a/.env.example b/.env.example index 55cd373..6c5b037 100644 --- a/.env.example +++ b/.env.example @@ -50,6 +50,22 @@ ORCH_MERGE_RETEST_TARGET=tests/ ORCH_MERGE_LOCK_TIMEOUT_S=300 ORCH_MERGE_DEFER_DELAY_S=60 ORCH_MERGE_DEFER_MAX_ATTEMPTS=5 +# ORCH-026 Level A: unconditional pre-merge rebase. 
With the flag ON (default), +# check_branch_mergeable ALWAYS rebases the branch onto origin/main under the held +# merge-lease (not only when behind) — a deterministic structural anti-phantom on +# the scheduler edge. No-op on an up-to-date branch (rebase keeps HEAD, force-with- +# lease -> "Everything up-to-date", CI not triggered). Scope = ORCH_MERGE_GATE_REPOS. +# PREMERGE_REBASE_ALWAYS=false -> strictly pre-ORCH-026 (rebase only when behind). +ORCH_PREMERGE_REBASE_ALWAYS=true +# ORCH-026 Level B: declarative task dependencies ("B waits for A"). claim_next_job +# gates jobs whose depends-on tasks are not yet 'done' (additive job_deps table, +# NOT EXISTS) WITHOUT occupying a max_concurrency slot. Inert on an empty job_deps. +# TASK_DEPS_ENABLED=false -> claim query is 1:1 the ORCH-1 query (no gate). +# TASK_DEPS_SOURCE=db|plane|hybrid -> declaration source; db (default) never calls +# Plane on the hot path; plane/hybrid ingest Plane `blocked-by` relations and +# cache them into job_deps (the scheduler then reads only the DB). +ORCH_TASK_DEPS_ENABLED=true +ORCH_TASK_DEPS_SOURCE=db # ORCH-071/073: merge-verify under-gate on the `deploy -> done` edge (врезка in # advance_stage, NOT a new STAGE_TRANSITIONS edge / registered QG). A deterministic # merge-actor merges the feature code-PR via the Gitea PR-merge API (never push/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 531cd63..8b1459c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ Формат: [Keep a Changelog](https://keepachangelog.com/). Записи — на смысловой PR/задачу. ## [Unreleased] +- **Управление зависимостями задач (B ждёт A) + сериализация мержа одного репо** (ORCH-026): два уровня по ADR-001, оба условны (kill-switch + CSV-область, never-raise), без новой стадии и без изменения `STAGE_TRANSITIONS`/реестра `QG_CHECKS`. 
**Уровень A — сериализация merge/deploy внутри одного репо:** переиспользует существующий merge-lease ORCH-043/065 (никакого нового механизма); единственная новая логика — **безусловный pre-merge rebase**: в `check_branch_mergeable` (`src/qg/checks.py`) под удержанным лизом при флаге `premerge_rebase_always` (дефолт `True`) `auto_rebase_onto_main` вызывается **всегда** (а не только при `branch_is_behind_main`) — детерминированный структурный анти-фантом на ребре планировщика, дополняющий рубежи ORCH-073. На актуальной ветке это no-op (rebase не сдвигает HEAD, `push --force-with-lease` → «Everything up-to-date», CI не триггерится); kill-switch `premerge_rebase_always=False` → прежнее поведение ORCH-043 1:1. Окно сериализации «merge → main-updated» per-repo (для self `done` ⇔ SHA-in-main, ORCH-073): пока A не в `main`, B того же репо получает `merge-lock busy` → defer (не откат); кросс-репо параллелизм сохранён (лиз — per-repo файл). **Уровень B — декларативные зависимости задач:** аддитивная таблица `job_deps(task_id, depends_on_task_id)` (идемпотентный `CREATE TABLE/INDEX IF NOT EXISTS` в `init_db`, без миграции на живой БД); гейт планировщика в `claim_next_job` (`src/db.py`) — `NOT EXISTS (job_deps d JOIN tasks t … WHERE d.task_id=jobs.task_id AND t.stage!='done')` при `task_deps_enabled`: задача с незавершённой зависимостью **не выбирается** и слот `max_concurrency` не занимает; инертно при пустой `job_deps` → нулевая регрессия, kill-switch `task_deps_enabled=False` → запрос 1:1 как ORCH-1. 
Новый leaf-модуль `src/task_deps.py` (контракт never-raise): `is_task_ready` (fail-open → ready), DFS-детектор циклов (`detect_cycle`/`find_any_cycle`, итеративный WHITE/GREY/BLACK), `handle_cycle` (`set_issue_blocked` по каждой задаче цикла + один Telegram-alert с цепочкой «A → B → A»), `declare_dependency` (вставка + детект цикла), `ingest_plane_relations` (только для `task_deps_source=plane|hybrid`: резолв Plane `blocked-by` UUID → локальный task → запись в `job_deps`; источник истины горячего цикла остаётся БД, дефолт `db` НЕ ходит в сеть на claim), `snapshot` (read-only сводка). Видимость: строка «⏳ ждёт ORCH-NNN» в Telegram-карточке (`src/notifications.py`, never-raise, инвариант «одна карточка на задачу» сохранён); блок `task_deps` в `GET /queue` (`src/main.py`). Совместимость: `reconciler` F-1 пропускает dep-заблокированные задачи (`is_task_ready`, паттерн ORCH-060) + backstop-детект цикла; `job_reaper` сканирует только `running` → dep-блок остаётся `queued`. Зависимости — только intra-repo (v1). Новые настройки: `ORCH_PREMERGE_REBASE_ALWAYS` (true), `ORCH_TASK_DEPS_ENABLED` (true), `ORCH_TASK_DEPS_SOURCE` (db). Инварианты НЕ менялись: `STAGE_TRANSITIONS`, реестр `QG_CHECKS` (гейт зависимостей — врезка в `claim_next_job`, НЕ зарегистрированный QG), схема `tasks`/`jobs`/`agent_runs`, внешние HTTP-эндпоинты; non-self (enduro) — no-op при пустых `job_deps`/области. ADR `docs/work-items/ORCH-026/06-adr/ADR-001-merge-serialization-and-task-deps.md`, глобальный `docs/architecture/adr/adr-0015-task-deps-and-merge-serialization.md`. Документация: `docs/architecture/README.md`, `CLAUDE.md`, `.env.example`. 
Тесты: `tests/test_orch026_premerge_rebase.py`, `tests/test_orch026_merge_serialize.py`, `tests/test_orch026_conditionality.py`, `tests/test_orch026_task_deps.py`, `tests/test_orch026_dep_cycles.py`, `tests/test_orch026_dep_visibility.py`, `tests/test_orch026_migration.py`, `tests/test_orch026_queue_observability.py`, `tests/test_orch026_serialize_integration.py`, `tests/test_orch026_deps_integration.py`. - **CRIT: системный фикс эрозии `main` — SHA-в-main как единственный критерий merge-verify + регресс-гард + `.gitattributes`** (ORCH-073): устранён корень фантомного merge, из-за которого код задач ORCH-067 (`plane_issue_link`) и ORCH-069 (`qg0_title_max`) дошёл до `done`, но физически отсутствовал в `origin/main` (в `main` попадали только их авто docs-PR). **(FR-1)** `merge_gate.verify_merged_to_main` подтверждает merge **ТОЛЬКО** прямым фактом `git merge-base --is-ancestor origin/main` — OR-ветка `pr_already_merged` удалена (merged PR больше не подтверждает merge); пустой SHA / git-ошибка → `False` (fail-closed, never-raise). **(FR-2)** `pr_already_merged` понижен до idempotency-guard для `merge_pr` и засчитывает PR лишь при `merged & head.ref== & base.ref=="main"` (явный in-loop фильтр вместо ненадёжного query-параметра `head` — исключает авто docs-PR). **(FR-3)** `merge_pr` выбирает open code-PR строго по `head.ref==` И `base.ref=="main"`; merge только через Gitea PR-merge API, никогда push/force-push в `main`. **(FR-5)** новый детерминированный регресс-гард `merge_gate.check_main_regression` в `_handle_merge_verify` ПОСЛЕ подтверждённого SHA-в-main и ДО `done` проверяет, что `origin/main` содержит декларативный append-only набор маркеров ранее-merged задач (`MAIN_REGRESSION_MARKERS`, `git grep -c origin/main -- `); детерминированный `count==0` → alert «main regressed» + HOLD (`set_issue_blocked` + Telegram + Plane, задача НЕ `done`, БЕЗ авто-отката на `development`), git-ошибка самого грепа → fail-OPEN (не блокирует, SHA-в-main остаётся первичным гейтом). 
Kill-switch `ORCH_REGRESSION_GUARD_ENABLED` (дефолт `true`), область — `merge_verify_applies` (self-hosting / `merge_verify_repos`), non-self → no-op. **(FR-4)** корневой `.gitattributes` с `CHANGELOG.md merge=union` — правки `## [Unreleased]` авто-сливаются при `auto_rebase_onto_main` без конфликта (обе записи сохраняются), ветка не откатывается в `development` и не тащит устаревший код-сосед; `docs/**` под union НЕ ставится. `GET /queue::merge_verify_status` дополнен счётчиком `main_regressed_alerts_total` (read-only). Инварианты НЕ менялись: `STAGE_TRANSITIONS`, реестр `QG_CHECKS` (под-гейт — врезка в `advance_stage`), `check_deploy_status`/`_parse_deploy_status`, merge-gate, image-freshness, схема БД, внешние HTTP-эндпоинты; non-self (enduro) merge/verify/гард — no-op (INV-5); ручной `Confirm Deploy` сохранён. ADR `docs/work-items/ORCH-073/06-adr/ADR-001-merge-verify-sha-truth-and-regression-guard.md` (+ сквозной `adr-0014`). Документация: `docs/architecture/README.md`, `.env.example`. Тесты: `tests/test_orch073_*.py` (TC-01..18). - **Конфигурируемый верхний лимит длины заголовка QG-0 (`ORCH_QG0_TITLE_MAX`, дефолт 200)** (ORCH-069): хардкод `if len(name) > 80` во входной валидации `_qg0_errors` (`src/webhooks/plane.py`) вынесен в настраиваемый параметр `Settings.qg0_title_max` (env `ORCH_QG0_TITLE_MAX`, дефолт 200). Лимит 80 был гигиеническим, а не структурным (slug режется независимо `[:30]`, `tasks.title TEXT` без ограничения), поэтому валидные заголовки 81–200 символов отклонялись на входе без бизнес-причины. Лимит читается из `settings.qg0_title_max` динамически на каждый вызов (тесты патчат значение), текст ошибки подставляет актуальное число; граница строгая (`len > limit` → FAIL, `len == limit` → PASS). 
**Graceful-деградация (AC-3, self-hosting safety):** пустое/нечисловое значение env не роняет процесс на старте — `field_validator(mode="before")` `_qg0_title_max_default` в `src/config.py` перехватывает сырое env ДО `int`-парсинга pydantic и при невалидном/пустом входе возвращает дефолт 200 (never-raise), гася `ValidationError`. Чисто аддитивно и обратносовместимо: дефолт 200 > прежних 80 → все ранее проходившие заголовки проходят (AC-7). Инварианты НЕ менялись: `STAGE_TRANSITIONS`, реестр `QG_CHECKS` (QG-0 — inline-валидация входа, не зарегистрированный stage-gate), схема БД, slug-логика `[:30]`, нижние лимиты (`< 5` title, `< 20` description), soft-QG-0 поведение (warning на `work_item.created`), API. ADR `docs/work-items/ORCH-069/06-adr/ADR-001-configurable-qg0-title-limit.md`. Документация: `.env.example`, `.env.staging.example`. Тесты: `tests/test_qg0_title_limit.py`. diff --git a/CLAUDE.md b/CLAUDE.md index 7b3b780..dcbdd29 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -7,7 +7,7 @@ - Backend: FastAPI + uvicorn (Python 3.12) - БД: SQLite (`src/db.py`) - Агенты: Claude CLI (`ORCH_CLAUDE_BIN`), по одному промпту на роль в `.openclaw/agents/` -- Очередь задач: собственная (SQLite `jobs`, `src/queue_worker.py`, ORCH-1) +- Очередь задач: собственная (SQLite `jobs`, `src/queue_worker.py`, ORCH-1). **ORCH-026:** `claim_next_job` гейтит задачи с незавершёнными зависимостями (`job_deps`, `NOT EXISTS`) без занятия слота `max_concurrency`; декларации/детект циклов — leaf `src/task_deps.py` (kill-switch `ORCH_TASK_DEPS_ENABLED`). Сериализация мержа одного репо — безусловный pre-merge rebase под merge-lease (`ORCH_PREMERGE_REBASE_ALWAYS`). 
- Контейнеризация: Docker + Compose - CI/CD: Gitea Actions (`.gitea/workflows/`) - Деплой: docker compose на mva154 diff --git a/src/config.py b/src/config.py index 65e078e..dc93615 100644 --- a/src/config.py +++ b/src/config.py @@ -396,6 +396,37 @@ class Settings(BaseSettings): merge_pr_timeout_s: int = 60 merge_verify_timeout_s: int = 60 + # ORCH-026: intra-repo merge serialisation (Level A) + declarative task + # dependencies (Level B). Level A reuses the ORCH-043/065 merge-lease window + # (no new mechanism) — the merge-lease already serialises "merge -> main-updated" + # per repo; the ONLY new behaviour is an unconditional pre-merge rebase. Level B + # adds a new ADDITIVE job_deps table + a NOT EXISTS gate in claim_next_job. Both + # features are inert without data (no applicable repo / no declared deps) -> + # zero regression for enduro-trails. + # premerge_rebase_always -> Level A (A-2): when True, check_branch_mergeable + # ALWAYS rebases the task branch onto the CURRENT + # origin/main UNDER the merge-lease (not only when + # branch_is_behind_main) — a deterministic anti-phantom + # that does not depend on the ancestor check's precision. + # auto_rebase_onto_main is a cheap no-op on an already + # up-to-date branch (rc 0, push up-to-date, CI not + # retriggered). Scope = merge_gate_repos (empty -> + # self-hosting). Kill-switch (False -> exactly the + # ORCH-043 behaviour: rebase only when behind). Env + # ORCH_PREMERGE_REBASE_ALWAYS. + # task_deps_enabled -> Level B (B-2): global kill-switch for the scheduler + # dependency gate. False -> claim_next_job is 1:1 as + # ORCH-1 (the NOT EXISTS clause is omitted). Inert when + # job_deps is empty. Env ORCH_TASK_DEPS_ENABLED. + # task_deps_source -> declaration source: db|plane|hybrid (default db). + # The scheduler ALWAYS reads the DB cache (offline-safe + # hot path); plane/hybrid additionally ingest Plane + # `blocked-by` relations into job_deps at task creation. + # Env ORCH_TASK_DEPS_SOURCE. 
+ premerge_rebase_always: bool = True + task_deps_enabled: bool = True + task_deps_source: str = "db" + # ORCH-073 (ADR-001 Р-4): main-integrity regression guard. After the merge-verify # under-gate confirms the deployed SHA is an ancestor of origin/main (FR-1), a # secondary deterministic (no-LLM) guard checks that a declarative set of markers diff --git a/src/db.py b/src/db.py index bbe0e5b..579ec04 100644 --- a/src/db.py +++ b/src/db.py @@ -123,6 +123,24 @@ def init_db(): # tracker can show "твоё время" without recomputing from activity history. _ensure_column(conn, "tasks", "brd_review_started_at", "TEXT") _ensure_column(conn, "tasks", "brd_review_ended_at", "TEXT") + # ORCH-026 (Level B): declarative task dependencies. job_deps stores the + # directed edge "task_id (B) is blocked-by depends_on_task_id (A)". The + # scheduler gate in claim_next_job keeps B queued until every A reaches + # tasks.stage='done'. Purely ADDITIVE (CREATE TABLE/INDEX IF NOT EXISTS, no + # change to jobs/tasks/agent_runs/events columns) -> idempotent and safe on + # the live shared prod DB (enduro-trails data untouched). The logical FK on + # tasks.id is intentional (no REFERENCES, mirrors jobs.task_id) so the + # migration cannot fail on a pre-existing DB. See 08-data-requirements.md. + conn.executescript(""" + CREATE TABLE IF NOT EXISTS job_deps ( + task_id INTEGER NOT NULL, + depends_on_task_id INTEGER NOT NULL, + created_at TEXT DEFAULT (datetime('now')), + PRIMARY KEY (task_id, depends_on_task_id) + ); + CREATE INDEX IF NOT EXISTS idx_job_deps_task ON job_deps(task_id); + CREATE INDEX IF NOT EXISTS idx_job_deps_depends ON job_deps(depends_on_task_id); + """) conn.commit() conn.close() @@ -466,12 +484,28 @@ def claim_next_job() -> dict | None: so the SELECT+UPDATE pair is consistent. Returns the claimed job dict or None when the queue is empty. """ + # ORCH-026 (Level B, B-2): scheduler dependency gate. 
When task_deps_enabled + # is on, a job whose task has an UNFINISHED declared dependency + # (job_deps.depends_on_task_id -> a task with stage != 'done') is NOT + # claimable -> it stays 'queued' without occupying a max_concurrency slot. + # Jobs with a NULL task_id (no task) or with no job_deps rows are unaffected + # (NOT EXISTS is True). Kill-switch off -> the clause is omitted -> 1:1 the + # ORCH-1 query. The gate reads only the DB (offline-safe hot path). + dep_gate = "" + if getattr(settings, "task_deps_enabled", False): + dep_gate = ( + "AND NOT EXISTS (" + " SELECT 1 FROM job_deps d JOIN tasks t ON t.id = d.depends_on_task_id " + " WHERE d.task_id = jobs.task_id AND t.stage != 'done'" + ") " + ) conn = get_db() try: while True: row = conn.execute( "SELECT id FROM jobs WHERE status='queued' " "AND (available_at IS NULL OR available_at <= datetime('now')) " + f"{dep_gate}" "ORDER BY id LIMIT 1" ).fetchone() if not row: @@ -705,6 +739,102 @@ def recent_jobs(limit: int = 10) -> list[dict]: return [dict(r) for r in rows] +# --------------------------------------------------------------------------- +# ORCH-026 (Level B): declarative task-dependency helpers +# --------------------------------------------------------------------------- + +def add_dependency(task_id: int, depends_on_task_id: int) -> bool: + """Declare that task ``task_id`` (B) is blocked-by ``depends_on_task_id`` (A). + + Idempotent INSERT OR IGNORE against the job_deps PK (re-declaring the same + edge is a no-op). A self-edge (task depends on itself) is rejected — it would + deadlock the task forever and can never be satisfied. never-raise + (self-hosting safety, AC-G1): any DB error -> returns False, the caller must + not crash the webhook / worker. Returns True iff a NEW edge row was inserted. 
+ """ + if task_id is None or depends_on_task_id is None: + return False + if task_id == depends_on_task_id: + return False + try: + conn = get_db() + try: + cur = conn.execute( + "INSERT OR IGNORE INTO job_deps (task_id, depends_on_task_id) " + "VALUES (?, ?)", + (task_id, depends_on_task_id), + ) + conn.commit() + return cur.rowcount == 1 + finally: + conn.close() + except Exception: + return False + + +def get_dependencies(task_id: int) -> list[int]: + """Return the list of depends_on_task_id (A) that ``task_id`` (B) waits for. + + never-raise: any DB error -> [] (conservative: caller treats the task as + having no declared dependency rather than crashing). + """ + try: + conn = get_db() + try: + rows = conn.execute( + "SELECT depends_on_task_id FROM job_deps WHERE task_id = ?", + (task_id,), + ).fetchall() + finally: + conn.close() + return [r[0] for r in rows] + except Exception: + return [] + + +def get_dependency_edges() -> list[tuple[int, int]]: + """Return ALL declared edges as ``(task_id, depends_on_task_id)`` tuples. + + Used by the cycle detector (DFS over the whole declared graph) and the + /queue snapshot. never-raise -> [] on any DB error. + """ + try: + conn = get_db() + try: + rows = conn.execute( + "SELECT task_id, depends_on_task_id FROM job_deps" + ).fetchall() + finally: + conn.close() + return [(r[0], r[1]) for r in rows] + except Exception: + return [] + + +def get_unfinished_dependencies(task_id: int) -> list[dict]: + """Return the UNFINISHED dependencies of ``task_id`` (A's not yet 'done'). + + Each dict carries the predecessor's ``id``, ``work_item_id`` and ``stage`` + so the readiness gate / Telegram waiting-line can name what B is waiting for. + never-raise -> [] on any DB error (treated as "ready", consistent with the + scheduler omitting the gate on failure). 
+ """ + try: + conn = get_db() + try: + rows = conn.execute( + "SELECT t.id AS id, t.work_item_id AS work_item_id, t.stage AS stage " + "FROM job_deps d JOIN tasks t ON t.id = d.depends_on_task_id " + "WHERE d.task_id = ? AND t.stage != 'done'", + (task_id,), + ).fetchall() + finally: + conn.close() + return [dict(r) for r in rows] + except Exception: + return [] + + # --------------------------------------------------------------------------- # ORCH-1b (resilience): transient backoff helpers # --------------------------------------------------------------------------- diff --git a/src/main.py b/src/main.py index cc23797..a602b24 100644 --- a/src/main.py +++ b/src/main.py @@ -148,6 +148,7 @@ async def queue(): from .job_reaper import reaper from . import post_deploy from . import merge_gate + from . import task_deps return { "counts": job_status_counts(), "max_concurrency": worker.max_concurrency, @@ -157,5 +158,8 @@ async def queue(): "reaper": reaper.status(), "post_deploy": post_deploy.status(), "merge_verify": merge_gate.merge_verify_status(), + # ORCH-026 (G-2): declarative task-dependency observability (read-only, + # NOT a source of truth) — declared edges, blocked tasks, detected cycle. + "task_deps": task_deps.snapshot(), "recent": recent_jobs(10), } diff --git a/src/notifications.py b/src/notifications.py index a688fd1..1af7dad 100644 --- a/src/notifications.py +++ b/src/notifications.py @@ -380,6 +380,22 @@ def render_task_tracker(task_id: int) -> str: status_line = f"\U0001f4cd {status_label}" lines = [header, status_line, bar] + # ORCH-026 (B-4): waiting-line for a task blocked by an unfinished declared + # dependency. Shows WHAT the task is waiting on ("⏳ ждёт ORCH-NNN"), + # so the single tracker card (invariant preserved) makes the wait visible. + # Never breaks the render: any error -> no waiting-line. + if not done: + try: + from . 
import task_deps + from .config import settings as _settings + if getattr(_settings, "task_deps_enabled", False): + ready, waiting_on = task_deps.is_task_ready(task_id) + if not ready and waiting_on: + waits = ", ".join(link_for(w) for w in waiting_on) + lines.append(f"⏳ ждёт {waits}") + except Exception: + pass + def _stage_line(label, run): usage = { "input_tokens": run["input_tokens"], diff --git a/src/plane_sync.py b/src/plane_sync.py index ca2ad62..f2e31fb 100644 --- a/src/plane_sync.py +++ b/src/plane_sync.py @@ -433,6 +433,72 @@ def fetch_issue_state(issue_id: str, project_id: str, timeout: int = 10) -> str return None +def fetch_blocked_by_issue_ids(issue_id: str, project_id: str, timeout: int = 10) -> list[str]: + """ORCH-026 (B-1): list the Plane issue UUIDs that ``issue_id`` is BLOCKED-BY. + + Reads the Plane issue-relation endpoint and returns the related issue UUIDs + declared as ``blocked_by`` (i.e. the predecessors A that this task B waits + for). Plane's relation payload shape has varied across versions, so the parse + is defensive: it accepts either a grouped object (``{"blocked_by": [...]}``) + or a flat list of ``{"relation_type": ..., "related_issue": ...}`` rows, and + pulls a uuid from ``related_issue`` / ``issue`` / ``id`` (bare uuid or nested + ``{"id": ...}``). + + never-raise (AC-G1, self-hosting): a Plane outage / non-2xx / unexpected + shape -> ``[]`` (no edge declared), so the ingestion degrades conservatively + and the pipeline never stalls on the network. 
+ """ + if not issue_id or not project_id: + return [] + url = ( + f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{project_id}" + f"/issues/{issue_id}/issue-relation/" + ) + try: + resp = httpx.get(url, headers=PLANE_HEADERS, timeout=timeout) + resp.raise_for_status() + body = resp.json() + except Exception as e: + logger.warning(f"fetch_blocked_by_issue_ids failed for {issue_id}: {e}") + return [] + + def _uuid_of(row) -> str | None: + if isinstance(row, str): + return row + if isinstance(row, dict): + for key in ("related_issue", "issue", "id"): + v = row.get(key) + if isinstance(v, dict): + v = v.get("id") + if v: + return str(v) + return None + + out: list[str] = [] + try: + rows = [] + if isinstance(body, dict): + # Grouped shape: {"blocked_by": [...], "blocking": [...], ...} + if "blocked_by" in body and isinstance(body["blocked_by"], list): + rows = body["blocked_by"] + else: + # Flat shape nested under common envelope keys. + rows = body.get("results") or body.get("relations") or [] + elif isinstance(body, list): + rows = body + for row in rows: + # In the flat shape, keep only blocked_by rows. + if isinstance(row, dict) and row.get("relation_type") not in (None, "blocked_by"): + continue + uid = _uuid_of(row) + if uid and uid != issue_id: + out.append(uid) + except Exception as e: + logger.warning(f"fetch_blocked_by_issue_ids parse error for {issue_id}: {e}") + return [] + return out + + import re as _re diff --git a/src/qg/checks.py b/src/qg/checks.py index 2c95d84..78db3c4 100644 --- a/src/qg/checks.py +++ b/src/qg/checks.py @@ -673,8 +673,19 @@ def check_branch_mergeable(repo: str, work_item_id: str, branch: str) -> tuple[b return False, reason try: + # ORCH-026 (Level A, A-2): proactive pre-merge rebase. When + # premerge_rebase_always is on, ALWAYS rebase onto the CURRENT + # origin/main under the held lease — even when branch_is_behind_main + # says "not behind". 
The ancestor check can miss a divergence + # (squash/force-push history, ORCH-073 phantom-merge class), so an + # unconditional rebase is a deterministic anti-phantom: it guarantees + # B carries A's code before merge. auto_rebase_onto_main is a cheap + # no-op on an already up-to-date branch (rc 0, push up-to-date, CI not + # retriggered). Kill-switch off -> 1:1 the ORCH-043 short-circuit + # below (rebase only when behind). + always = bool(getattr(settings, "premerge_rebase_always", False)) # Double-check under the lease: another task may have just merged. - if not merge_gate.branch_is_behind_main(repo, branch): + if not always and not merge_gate.branch_is_behind_main(repo, branch): logger.info("check_branch_mergeable: %s up-to-date with main", branch) return True, "branch up-to-date with main" diff --git a/src/reconciler.py b/src/reconciler.py index 5ae330e..a442f54 100644 --- a/src/reconciler.py +++ b/src/reconciler.py @@ -69,6 +69,7 @@ from .plane_sync import ( from .webhooks.plane import handle_status_start, handle_verdict from .notifications import send_telegram, link_for from . import projects +from . import task_deps logger = logging.getLogger("orchestrator.reconciler") @@ -165,6 +166,16 @@ class Reconciler: f"reconciler F-1: task {task.get('id')} " f"(stage={task.get('stage')}) failed: {e}" ) + # ORCH-026 (B-3) backstop: surface ANY dependency deadlock in the declared + # graph, even one whose tasks are not individually evaluated above (e.g. no + # active queued job). One alert per cycle; never-raise. + if settings.task_deps_enabled: + try: + cyc = task_deps.find_any_cycle() + if cyc: + task_deps.handle_cycle(cyc) + except Exception as e: # noqa: BLE001 - never break the sweep + logger.error(f"reconciler F-1: cycle backstop failed: {e}") def _reconcile_gate_task(self, task: dict) -> None: task_id = task["id"] @@ -194,6 +205,18 @@ class Reconciler: # Networked; runs after Guard 1 so escalated tasks never hit Plane. 
if self._is_blocked_or_needs_input(task): return + # ORCH-026 Guard 3 (B-5): a task blocked by an unfinished declared + # dependency is legitimately waiting, NOT stuck -> F-1 must not advance it + # past its depends-on (mirrors the Blocked/Needs-Input skip). Local DB, + # never-raise (is_task_ready fails OPEN). If the wait is actually a + # dependency DEADLOCK (cycle), surface it (Blocked + alert) once. + if settings.task_deps_enabled: + ready, _waiting = task_deps.is_task_ready(task_id) + if not ready: + cyc = task_deps.detect_cycle(task_id) + if cyc: + task_deps.handle_cycle(cyc) + return result = advance_if_gate_passed( task_id, stage, diff --git a/src/task_deps.py b/src/task_deps.py new file mode 100644 index 0000000..97c1353 --- /dev/null +++ b/src/task_deps.py @@ -0,0 +1,335 @@ +"""ORCH-026 (Level B): declarative task-dependency logic. + +Leaf module — pure, unit-testable functions over the additive ``job_deps`` table +(see src/db.py / 08-data-requirements.md). It answers two questions the rest of +the pipeline asks: + + * "is task B ready to run?" — every declared predecessor A reached + ``tasks.stage = 'done'`` (``is_task_ready``). The scheduler gate in + ``db.claim_next_job`` enforces the same predicate in SQL; this Python copy is + for the reconciler skip and for naming WHAT a task waits on (visibility). + * "is there a dependency deadlock?" — a directed cycle A->B->A (or longer) can + never be satisfied, so the tasks in it would wait forever. ``detect_cycle`` / + ``find_any_cycle`` find one deterministically; ``handle_cycle`` escalates it + to Blocked + alert so the deadlock is visible instead of silent. + +never-raise contract (AC-G1, self-hosting safety): EVERY public function +degrades conservatively on any error (DB/import) and NEVER propagates an +exception into the worker / reconciler / webhook. 
Readiness fails OPEN +(``True``) so a transient DB error cannot wedge the whole queue; cycle detection +also fails OPEN (``None`` = "no cycle proven", do not block). +""" +from __future__ import annotations + +import logging + +from . import db +from .config import settings + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Readiness gate (B-2) +# --------------------------------------------------------------------------- + +def is_task_ready(task_id: int) -> tuple[bool, list[str]]: + """Return ``(ready, waiting_on)`` for a task. + + ``ready`` is True when the task has no declared dependency whose predecessor + is still un-done (``tasks.stage != 'done'``). ``waiting_on`` is the list of + predecessor work-item ids (e.g. ``["ORCH-010"]``) the task is still blocked + by — used for the Telegram waiting-line / Plane visibility. + + never-raise: any error -> ``(True, [])`` (fail OPEN — consistent with the + scheduler omitting the gate when the DB read fails; a transient error must + not wedge an otherwise-claimable task). + """ + if task_id is None: + return True, [] + try: + unfinished = db.get_unfinished_dependencies(task_id) + except Exception: + return True, [] + if not unfinished: + return True, [] + waiting_on = [ + str(d.get("work_item_id") or d.get("id")) + for d in unfinished + ] + return False, waiting_on + + +# --------------------------------------------------------------------------- +# Cycle / deadlock detection (B-3) +# --------------------------------------------------------------------------- + +def _build_adjacency(edges: list[tuple[int, int]]) -> dict[int, list[int]]: + """Build a ``task_id -> [depends_on_task_id, ...]`` adjacency map. + + Edge direction follows the dependency: an edge (B, A) means "B depends on A", + so we traverse from a dependent task towards its predecessors. A cycle in + this graph is an unsatisfiable deadlock. 
+ """ + adj: dict[int, list[int]] = {} + for task_id, depends_on in edges: + adj.setdefault(task_id, []).append(depends_on) + return adj + + +def _find_cycle_from(start: int, adj: dict[int, list[int]]) -> list[int] | None: + """Iterative DFS from ``start``; return a cycle path if one is reachable. + + Returns the node sequence closing the cycle (e.g. ``[A, B, A]``) or None. + Iterative (explicit stack) so a pathological deep graph cannot blow the + Python recursion limit — relevant on the shared prod process. + """ + WHITE, GREY, BLACK = 0, 1, 2 + color: dict[int, int] = {} + parent: dict[int, int] = {} + # stack of (node, is_exit): is_exit=True marks the post-visit (color BLACK). + stack: list[tuple[int, bool]] = [(start, False)] + while stack: + node, is_exit = stack.pop() + if is_exit: + color[node] = BLACK + continue + if color.get(node, WHITE) != WHITE: + continue + color[node] = GREY + stack.append((node, True)) + for nxt in adj.get(node, []): + c = color.get(nxt, WHITE) + if c == GREY: + # Back-edge -> cycle. Reconstruct path nxt..node via parent. + path = [node] + cur = node + while cur != nxt and cur in parent: + cur = parent[cur] + path.append(cur) + path.reverse() + path.append(nxt) + return path + if c == WHITE: + parent[nxt] = node + stack.append((nxt, False)) + return None + + +def detect_cycle(task_id: int, edges: list[tuple[int, int]] | None = None) -> list[int] | None: + """Detect a dependency cycle reachable from ``task_id``. + + Returns the cycle path (node sequence, first == last) or None when the graph + reachable from ``task_id`` is acyclic. ``edges`` may be injected (unit tests); + otherwise the full declared edge set is read from the DB. + + never-raise: any error -> None (do not falsely claim a deadlock on an error). 
+ """ + if task_id is None: + return None + try: + if edges is None: + edges = db.get_dependency_edges() + adj = _build_adjacency(edges) + return _find_cycle_from(task_id, adj) + except Exception: + return None + + +def find_any_cycle(edges: list[tuple[int, int]] | None = None) -> list[int] | None: + """Backstop: detect ANY cycle in the whole declared graph. + + Used by the reconciler tick to surface a deadlock even when no specific task + is being evaluated. Returns the first cycle found or None. never-raise -> None. + """ + try: + if edges is None: + edges = db.get_dependency_edges() + adj = _build_adjacency(edges) + for node in list(adj.keys()): + cyc = _find_cycle_from(node, adj) + if cyc: + return cyc + return None + except Exception: + return None + + +def _work_item_id_for(task_id: int) -> str | None: + """Best-effort ``tasks.work_item_id`` lookup for a task_id (never-raise).""" + try: + conn = db.get_db() + try: + row = conn.execute( + "SELECT work_item_id FROM tasks WHERE id = ?", (task_id,) + ).fetchone() + finally: + conn.close() + return row[0] if row and row[0] else None + except Exception: + return None + + +def handle_cycle(cycle: list[int]) -> bool: + """Escalate a detected dependency cycle: Blocked + alert (B-3, AC-G1). + + For every task in the cycle, sets its Plane issue to Blocked (best-effort) + and sends ONE Telegram alert naming the cycle, so a deadlock is visible + instead of a silent forever-wait. Does NOT mutate job_deps / stages — the + declaration is the human's to fix. never-raise: any notify/Plane error is + swallowed; the worker/reconciler never crashes. Returns True if an alert was + attempted, False on a no-op / error. + """ + if not cycle: + return False + try: + # Map task ids -> work-item ids for a human-readable chain. 
+ labels: list[str] = [] + seen: set[int] = set() + for tid in cycle: + wi = _work_item_id_for(tid) + labels.append(wi or f"task#{tid}") + if tid not in seen: + seen.add(tid) + chain = " -> ".join(labels) + try: + from . import notifications, plane_sync + except Exception: + return False + # Blocked indication on each distinct issue in the cycle. + for tid in seen: + wi = _work_item_id_for(tid) + if wi: + try: + plane_sync.set_issue_blocked(wi) + except Exception: + pass + try: + notifications.send_telegram( + f"\U0001f6a8 ORCH-026: dependency DEADLOCK detected (cycle): {chain}. " + f"Tasks set to Blocked — fix the blocked-by declaration." + ) + except Exception: + pass + logger.error("ORCH-026: dependency cycle detected: %s", chain) + return True + except Exception: + return False + + +# --------------------------------------------------------------------------- +# Declaration (B-1) — db.add_dependency + immediate cycle escalation +# --------------------------------------------------------------------------- + +def declare_dependency(task_id: int, depends_on_task_id: int) -> bool: + """Declare "task_id (B) blocked-by depends_on_task_id (A)" and check for a cycle. + + Thin wrapper over ``db.add_dependency`` that, after a successful insert, runs + ``detect_cycle`` from the new dependent — so a freshly-introduced deadlock is + surfaced (Blocked + alert) at declaration time (best UX, ADR B-3) rather than + only by the reconciler backstop. The edge is NOT rolled back on a cycle (the + SQL gate already keeps the cyclic tasks un-claimable; the human fixes the + declaration) — we make it VISIBLE. never-raise: any error -> False. + + Returns True iff a NEW edge row was inserted (idempotent re-declaration -> + False, matching db.add_dependency). + """ + try: + inserted = db.add_dependency(task_id, depends_on_task_id) + # Always check for a cycle (even on a duplicate edge an existing cycle may + # now be relevant), but only escalate when one is actually found. 
+ cyc = detect_cycle(task_id) + if cyc: + handle_cycle(cyc) + return inserted + except Exception: + return False + + +def ingest_plane_relations( + task_id: int, issue_id: str, project_id: str +) -> int: + """B-1 (plane/hybrid source): import Plane ``blocked-by`` relations into job_deps. + + Reads the issue's ``blocked_by`` predecessors from Plane, resolves each to a + local ``tasks.id`` (intra-repo only, v1) and declares the edge. A predecessor + not yet known locally (no task row) is SKIPPED — the scheduler can only gate + on tasks it knows; a re-run after that task is created will pick it up. + + Active ONLY when ``task_deps_source`` is ``plane`` or ``hybrid`` (default + ``db`` -> no Plane call on the hot creation path). never-raise (self-hosting): + any error -> 0 edges, the pipeline start is never blocked by Plane. Returns + the number of edges declared. + """ + source = (getattr(settings, "task_deps_source", "db") or "db").strip().lower() + if source not in ("plane", "hybrid"): + return 0 + if not issue_id or not project_id: + return 0 + try: + from . import plane_sync + blocked_by = plane_sync.fetch_blocked_by_issue_ids(issue_id, project_id) + except Exception: + return 0 + declared = 0 + for pred_issue in blocked_by: + try: + pred = db.get_task_by_plane_id(str(pred_issue)) + if not pred: + continue + if declare_dependency(task_id, pred["id"]): + declared += 1 + except Exception: + continue + return declared + + +# --------------------------------------------------------------------------- +# Observability (/queue snapshot, G-2) +# --------------------------------------------------------------------------- + +def snapshot() -> dict: + """Read-only summary of the dependency subsystem for GET /queue (G-2). 
+ + Returns a dict (NOT a source of truth — pure observability): + * ``enabled`` — task_deps_enabled flag; + * ``source`` — task_deps_source (db|plane|hybrid); + * ``edges`` — number of declared edges; + * ``blocked_tasks`` — list of ``{task_id, work_item_id, waiting_on}`` for + tasks with at least one un-done predecessor; + * ``cycle`` — a detected cycle path (work-item labels) or None. + + never-raise: any error -> a minimal dict with the flags and empty data. + """ + enabled = bool(getattr(settings, "task_deps_enabled", False)) + source = getattr(settings, "task_deps_source", "db") + try: + edges = db.get_dependency_edges() + blocked: list[dict] = [] + for task_id in {e[0] for e in edges}: + ready, waiting_on = is_task_ready(task_id) + if not ready: + blocked.append({ + "task_id": task_id, + "work_item_id": _work_item_id_for(task_id), + "waiting_on": waiting_on, + }) + cyc = find_any_cycle(edges) + cycle_labels = None + if cyc: + cycle_labels = [(_work_item_id_for(t) or f"task#{t}") for t in cyc] + return { + "enabled": enabled, + "source": source, + "edges": len(edges), + "blocked_tasks": blocked, + "cycle": cycle_labels, + } + except Exception: + return { + "enabled": enabled, + "source": source, + "edges": 0, + "blocked_tasks": [], + "cycle": None, + } diff --git a/src/webhooks/plane.py b/src/webhooks/plane.py index 4bdaf0c..c56bbbd 100644 --- a/src/webhooks/plane.py +++ b/src/webhooks/plane.py @@ -608,6 +608,17 @@ async def start_pipeline(data: dict, project_id: str = ""): except Exception as e: logger.error(f"Failed to launch analyst for {work_item_id}: {e}") + # ORCH-026 (B-1): import declared Plane `blocked-by` relations into job_deps + # (only for task_deps_source = plane|hybrid; default `db` -> no-op, no Plane + # call). Best-effort, never-raise: a Plane outage must not block the start. + try: + from .. 
import task_deps + n = task_deps.ingest_plane_relations(task_id, plane_id, plane_project_id) + if n: + logger.info(f"Task {task_id}: ingested {n} blocked-by dependency edge(s)") + except Exception as e: + logger.warning(f"Task {task_id}: dependency ingestion skipped: {e}") + async def handle_comment(data: dict, project_id: str = ""): """Status-only verdict model: comments NEVER drive the pipeline. diff --git a/tests/test_merge_gate_race.py b/tests/test_merge_gate_race.py index 8a885f6..398909f 100644 --- a/tests/test_merge_gate_race.py +++ b/tests/test_merge_gate_race.py @@ -58,6 +58,11 @@ def race_repo(tmp_path, monkeypatch): monkeypatch.setattr(qg.settings, "merge_gate_enabled", True) monkeypatch.setattr(qg.settings, "merge_gate_repos", repo) monkeypatch.setattr(merge_gate.settings, "merge_lock_timeout_s", 300) + # ORCH-026: this redrive test asserts the ORCH-043 ancestor-based short-circuit + # ("already caught up" -> skip expensive re-test). Pin the always-rebase + # kill-switch OFF so the legacy short-circuit path is exercised here; the new + # default (True) is covered by tests/test_orch026_premerge_rebase.py (TC-A01). + monkeypatch.setattr(qg.settings, "premerge_rebase_always", False, raising=False) origin = tmp_path / "origin.git" subprocess.run(["git", "init", "--bare", "-b", "main", str(origin)], capture_output=True) diff --git a/tests/test_orch026_conditionality.py b/tests/test_orch026_conditionality.py new file mode 100644 index 0000000..d97f328 --- /dev/null +++ b/tests/test_orch026_conditionality.py @@ -0,0 +1,118 @@ +"""ORCH-026 conditionality / self-hosting safety (TC-A06, TC-A07). + +TC-A06 kill-switch / out-of-scope: with the flag off (or for a repo outside the + merge-gate scope) the merge path behaves 1:1 as before ORCH-026 — no-op. +TC-A07 self-hosting safety: the new Level-A logic never pushes to main; the only + force op stays --force-with-lease on the task branch; STAGE_TRANSITIONS + and the QG_CHECKS registry are unchanged. 
+""" +import os +import tempfile + +os.environ.setdefault("ORCH_DB_PATH", os.path.join(tempfile.gettempdir(), "test_orch026_cond.db")) +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") + +from src import merge_gate # noqa: E402 +from src.qg import checks # noqa: E402 + + +# ----------------------------------------------------------------- TC-A06 +def test_out_of_scope_repo_is_noop_even_with_flag_on(monkeypatch): + """A repo outside merge_gate scope -> N/A pass, regardless of premerge flag.""" + monkeypatch.setattr(checks.settings, "merge_gate_enabled", True, raising=False) + monkeypatch.setattr(checks.settings, "merge_gate_repos", "orchestrator", raising=False) + monkeypatch.setattr(checks.settings, "premerge_rebase_always", True, raising=False) + + # enduro-trails is NOT in the scope -> no lease, no rebase, just N/A. + called = {"acquire": 0, "rebase": 0} + monkeypatch.setattr(merge_gate, "acquire_merge_lease", + lambda *a, **k: (called.__setitem__("acquire", called["acquire"] + 1), (True, "x"))[1], + raising=False) + monkeypatch.setattr(merge_gate, "auto_rebase_onto_main", + lambda *a, **k: (called.__setitem__("rebase", called["rebase"] + 1), (True, "x"))[1], + raising=False) + ok, reason = checks.check_branch_mergeable("enduro-trails", "ET-1", "feature/e") + assert ok is True + assert "N/A" in reason + assert called["acquire"] == 0 and called["rebase"] == 0 + + +def test_task_deps_kill_switch_omits_gate(monkeypatch): + """task_deps_enabled=False -> claim_next_job query is the ORCH-1 query (no gate).""" + import src.db as db + monkeypatch.setattr(db.settings, "task_deps_enabled", False, raising=False) + # Inspect the SQL the claim builds by stubbing the connection. 
+ captured = {} + + class _FakeConn: + def execute(self, sql, *a): + captured.setdefault("sql", sql) + + class _R: + def fetchone(self_inner): + return None + return _R() + + def commit(self): + pass + + def close(self): + pass + + monkeypatch.setattr(db, "get_db", lambda: _FakeConn()) + db.claim_next_job() + assert "NOT EXISTS" not in captured["sql"], "gate must be omitted when disabled" + + +def test_task_deps_enabled_adds_gate(monkeypatch): + import src.db as db + monkeypatch.setattr(db.settings, "task_deps_enabled", True, raising=False) + captured = {} + + class _FakeConn: + def execute(self, sql, *a): + captured.setdefault("sql", sql) + + class _R: + def fetchone(self_inner): + return None + return _R() + + def commit(self): + pass + + def close(self): + pass + + monkeypatch.setattr(db, "get_db", lambda: _FakeConn()) + db.claim_next_job() + assert "NOT EXISTS" in captured["sql"], "gate must be present when enabled" + assert "job_deps" in captured["sql"] + + +# ----------------------------------------------------------------- TC-A07 +def test_stage_transitions_unchanged(): + """ORCH-026 must not touch the state machine (AC-A5).""" + from src.stages import STAGE_TRANSITIONS + # The canonical happy-path edges must still exist exactly. 
+ assert STAGE_TRANSITIONS["deploy-staging"]["next"] == "deploy" + assert STAGE_TRANSITIONS["deploy"]["next"] == "done" + assert STAGE_TRANSITIONS["development"]["next"] == "review" + + +def test_qg_registry_has_no_new_dep_gate(): + """The dependency gate is врезка in claim_next_job, NOT a registered QG.""" + from src.qg.checks import QG_CHECKS + joined = " ".join(QG_CHECKS.keys()) + assert "task_dep" not in joined and "dependency" not in joined + + +def test_premerge_only_force_with_lease_on_branch(): + """auto_rebase_onto_main never pushes to main; force is --force-with-lease only.""" + import inspect + src = inspect.getsource(merge_gate.auto_rebase_onto_main) + assert "--force-with-lease" in src + # No raw 'push origin main' / force-push to main in the rebase path. + assert "push origin main" not in src + assert "--force " not in src # plain --force (not -with-lease) is forbidden diff --git a/tests/test_orch026_dep_cycles.py b/tests/test_orch026_dep_cycles.py new file mode 100644 index 0000000..7254b39 --- /dev/null +++ b/tests/test_orch026_dep_cycles.py @@ -0,0 +1,136 @@ +"""ORCH-026 Level B — dependency cycle / deadlock detection (TC-B03, TC-B04). + +TC-B03 detect_cycle is deterministic: A->B->A (and longer) is detected; an + acyclic graph yields None. Pure function (edges injected). +TC-B04 a detected cycle escalates: set_issue_blocked + a Telegram alert, with + no worker crash and no blocking of other tasks (never-raise). 
+""" +import os +import tempfile + +import pytest + +os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_orch026_cycles.db") +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") + +import src.db as db # noqa: E402 +from src.db import init_db, get_db # noqa: E402 +from src import task_deps # noqa: E402 + + +@pytest.fixture(autouse=True) +def fresh_db(tmp_path, monkeypatch): + dbfile = tmp_path / "cycles.db" + monkeypatch.setattr(db.settings, "db_path", str(dbfile)) + monkeypatch.setattr(db.settings, "task_deps_enabled", True, raising=False) + init_db() + yield + + +# ----------------------------------------------------------------- TC-B03 +def test_detect_two_node_cycle(): + # edge (B, A) means "B depends on A"; 1->2 and 2->1 is a 2-cycle. + edges = [(1, 2), (2, 1)] + cyc = task_deps.detect_cycle(1, edges=edges) + assert cyc is not None + assert cyc[0] == cyc[-1] # closed cycle + assert set(cyc) == {1, 2} + + +def test_detect_longer_cycle(): + edges = [(1, 2), (2, 3), (3, 1)] + cyc = task_deps.detect_cycle(1, edges=edges) + assert cyc is not None + assert set(cyc) >= {1, 2, 3} + + +def test_acyclic_graph_has_no_cycle(): + edges = [(1, 2), (2, 3), (1, 3)] # DAG + assert task_deps.detect_cycle(1, edges=edges) is None + assert task_deps.find_any_cycle(edges=edges) is None + + +def test_find_any_cycle_scans_whole_graph(): + # A disconnected cycle 10<->11 not reachable from node 1. + edges = [(1, 2), (10, 11), (11, 10)] + assert task_deps.detect_cycle(1, edges=edges) is None + cyc = task_deps.find_any_cycle(edges=edges) + assert cyc is not None + assert set(cyc) == {10, 11} + + +def test_detect_cycle_never_raises_on_garbage(): + assert task_deps.detect_cycle(None) is None + # Malformed edge list -> swallowed -> None. 
+ assert task_deps.detect_cycle(1, edges="not-a-list") is None + + +# ----------------------------------------------------------------- TC-B04 +def _make_task(work_item_id, stage="development"): + conn = get_db() + cur = conn.execute( + "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) " + "VALUES (?, ?, ?, ?, ?)", + (work_item_id, work_item_id, "orchestrator", f"feature/{work_item_id}", stage), + ) + tid = cur.lastrowid + conn.commit() + conn.close() + return tid + + +def test_handle_cycle_blocks_and_alerts(monkeypatch): + a = _make_task("ORCH-60") + b = _make_task("ORCH-61") + db.add_dependency(a, b) + db.add_dependency(b, a) # cycle a<->b + + blocked = [] + alerts = [] + import src.plane_sync as plane_sync + import src.notifications as notifications + monkeypatch.setattr(plane_sync, "set_issue_blocked", + lambda wi, *a, **k: blocked.append(wi), raising=False) + monkeypatch.setattr(notifications, "send_telegram", + lambda text, *a, **k: alerts.append(text), raising=False) + + cyc = task_deps.detect_cycle(a) + assert cyc is not None + ok = task_deps.handle_cycle(cyc) + assert ok is True + assert set(blocked) == {"ORCH-60", "ORCH-61"} + assert len(alerts) == 1 + assert "ORCH-60" in alerts[0] and "ORCH-61" in alerts[0] + + +def test_handle_cycle_never_raises_when_notify_fails(monkeypatch): + a = _make_task("ORCH-70") + b = _make_task("ORCH-71") + db.add_dependency(a, b) + db.add_dependency(b, a) + import src.plane_sync as plane_sync + import src.notifications as notifications + + def _boom(*a, **k): + raise RuntimeError("plane down") + + monkeypatch.setattr(plane_sync, "set_issue_blocked", _boom, raising=False) + monkeypatch.setattr(notifications, "send_telegram", _boom, raising=False) + cyc = task_deps.detect_cycle(a) + # Must not propagate the exception (AC-G1). 
+ assert task_deps.handle_cycle(cyc) in (True, False) + + +def test_declare_dependency_escalates_cycle(monkeypatch): + """declare_dependency surfaces a freshly-introduced cycle at declaration.""" + a = _make_task("ORCH-80") + b = _make_task("ORCH-81") + handled = [] + monkeypatch.setattr(task_deps, "handle_cycle", + lambda cyc: handled.append(cyc), raising=False) + assert task_deps.declare_dependency(a, b) is True + assert handled == [] # no cycle yet + # Closing the loop -> handle_cycle invoked. + assert task_deps.declare_dependency(b, a) is True + assert len(handled) == 1 diff --git a/tests/test_orch026_dep_visibility.py b/tests/test_orch026_dep_visibility.py new file mode 100644 index 0000000..3b3c1a0 --- /dev/null +++ b/tests/test_orch026_dep_visibility.py @@ -0,0 +1,79 @@ +"""ORCH-026 Level B — blocked-task visibility (TC-B06). + +A dep-blocked task surfaces a waiting-line ("⏳ ждёт ORCH-NNN") in its single +Telegram tracker card; the "one card per task" invariant is preserved (the line +is added to the SAME render, not a new message). Render is never broken by the +dependency lookup (never-raise). 
+""" +import os +import tempfile + +import pytest + +os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_orch026_visibility.db") +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") + +import src.db as db # noqa: E402 +from src.db import init_db, get_db # noqa: E402 +from src import notifications # noqa: E402 + + +@pytest.fixture(autouse=True) +def fresh_db(tmp_path, monkeypatch): + dbfile = tmp_path / "vis.db" + monkeypatch.setattr(db.settings, "db_path", str(dbfile)) + monkeypatch.setattr(db.settings, "task_deps_enabled", True, raising=False) + init_db() + yield + + +def _make_task(work_item_id, stage="development"): + conn = get_db() + cur = conn.execute( + "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title) " + "VALUES (?, ?, ?, ?, ?, ?)", + (work_item_id, work_item_id, "orchestrator", f"feature/{work_item_id}", + stage, f"title {work_item_id}"), + ) + tid = cur.lastrowid + conn.commit() + conn.close() + return tid + + +def test_blocked_task_shows_waiting_line(): + a = _make_task("ORCH-90", stage="development") + b = _make_task("ORCH-91", stage="development") + db.add_dependency(b, a) + text = notifications.render_task_tracker(b) + assert "ждёт" in text + assert "ORCH-90" in text + + +def test_ready_task_has_no_waiting_line(): + a = _make_task("ORCH-92", stage="done") + b = _make_task("ORCH-93", stage="development") + db.add_dependency(b, a) + text = notifications.render_task_tracker(b) + assert "ждёт" not in text + + +def test_done_task_has_no_waiting_line(): + a = _make_task("ORCH-94", stage="development") + b = _make_task("ORCH-95", stage="done") + db.add_dependency(b, a) + text = notifications.render_task_tracker(b) + # A done task is terminal -> the waiting-line branch is skipped entirely. 
+ assert "ждёт" not in text + + +def test_render_never_raises_on_dep_error(monkeypatch): + b = _make_task("ORCH-96", stage="development") + from src import task_deps + monkeypatch.setattr(task_deps, "is_task_ready", + lambda tid: (_ for _ in ()).throw(RuntimeError("boom")), + raising=False) + # Must still produce a card (no crash). + text = notifications.render_task_tracker(b) + assert "ORCH-96" in text diff --git a/tests/test_orch026_deps_integration.py b/tests/test_orch026_deps_integration.py new file mode 100644 index 0000000..95c63f5 --- /dev/null +++ b/tests/test_orch026_deps_integration.py @@ -0,0 +1,124 @@ +"""ORCH-026 Level B — declarative dependencies integration (TC-B08). + +End-to-end (DB level): B declared blocked-by A; queued B does not start until A +is 'done'; after A->done the worker can claim B. Also covers the plane/hybrid +ingestion path: Plane `blocked-by` relations are resolved to local task ids and +written into job_deps (the scheduler then reads only the DB). +""" +import os +import tempfile + +import pytest + +os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_orch026_depsint.db") +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") + +import src.db as db # noqa: E402 +from src.db import init_db, get_db, enqueue_job, claim_next_job # noqa: E402 +from src import task_deps # noqa: E402 + + +@pytest.fixture(autouse=True) +def fresh_db(tmp_path, monkeypatch): + dbfile = tmp_path / "depsint.db" + monkeypatch.setattr(db.settings, "db_path", str(dbfile)) + monkeypatch.setattr(db.settings, "task_deps_enabled", True, raising=False) + init_db() + yield + + +def _make_task(work_item_id, stage="development", plane_id=None): + conn = get_db() + pid = plane_id or work_item_id + cur = conn.execute( + "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, plane_issue_id) " + "VALUES (?, ?, ?, ?, ?, ?)", + (pid, work_item_id, "orchestrator", 
f"feature/{work_item_id}", stage, pid), + ) + tid = cur.lastrowid + conn.commit() + conn.close() + return tid + + +def _set_stage(task_id, stage): + conn = get_db() + conn.execute("UPDATE tasks SET stage=? WHERE id=?", (stage, task_id)) + conn.commit() + conn.close() + + +def test_b_waits_for_a_then_runs(): + a = _make_task("ORCH-200", stage="development") + b = _make_task("ORCH-201", stage="development") + db.add_dependency(b, a) + + job_b = enqueue_job("developer", "orchestrator", "do B", task_id=b) + # While A is in flight, B is not claimable. + assert claim_next_job() is None + ready, waiting = task_deps.is_task_ready(b) + assert ready is False and "ORCH-200" in waiting + + # A advances through to done. + _set_stage(a, "review") + assert claim_next_job() is None # still not terminal + _set_stage(a, "done") + + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == job_b + + +def test_multiple_predecessors_all_must_be_done(): + a1 = _make_task("ORCH-210", stage="development") + a2 = _make_task("ORCH-211", stage="development") + b = _make_task("ORCH-212", stage="development") + db.add_dependency(b, a1) + db.add_dependency(b, a2) + job_b = enqueue_job("developer", "orchestrator", "B", task_id=b) + + _set_stage(a1, "done") + assert claim_next_job() is None, "still blocked by a2" + _set_stage(a2, "done") + claimed = claim_next_job() + assert claimed is not None and claimed["id"] == job_b + + +# ---- plane/hybrid ingestion path (TC-B01) --------------------------------- +def test_ingest_plane_relations_writes_db(monkeypatch): + monkeypatch.setattr(db.settings, "task_deps_source", "hybrid", raising=False) + a = _make_task("ORCH-220", stage="development", plane_id="plane-uuid-A") + b = _make_task("ORCH-221", stage="development", plane_id="plane-uuid-B") + + import src.plane_sync as plane_sync + monkeypatch.setattr(plane_sync, "fetch_blocked_by_issue_ids", + lambda issue_id, project_id, **k: ["plane-uuid-A"], + raising=False) + n = 
task_deps.ingest_plane_relations(b, "plane-uuid-B", "proj-1") + assert n == 1 + assert db.get_dependencies(b) == [a] + + +def test_ingest_noop_when_source_db(monkeypatch): + monkeypatch.setattr(db.settings, "task_deps_source", "db", raising=False) + b = _make_task("ORCH-230", stage="development", plane_id="plane-uuid-Z") + import src.plane_sync as plane_sync + called = {"n": 0} + monkeypatch.setattr(plane_sync, "fetch_blocked_by_issue_ids", + lambda *a, **k: called.__setitem__("n", called["n"] + 1) or [], + raising=False) + n = task_deps.ingest_plane_relations(b, "plane-uuid-Z", "proj-1") + assert n == 0 + assert called["n"] == 0, "default db source must not call Plane" + + +def test_ingest_never_raises_on_plane_outage(monkeypatch): + monkeypatch.setattr(db.settings, "task_deps_source", "plane", raising=False) + b = _make_task("ORCH-240", stage="development", plane_id="plane-uuid-Y") + import src.plane_sync as plane_sync + + def _boom(*a, **k): + raise RuntimeError("plane down") + + monkeypatch.setattr(plane_sync, "fetch_blocked_by_issue_ids", _boom, raising=False) + assert task_deps.ingest_plane_relations(b, "plane-uuid-Y", "proj-1") == 0 diff --git a/tests/test_orch026_merge_serialize.py b/tests/test_orch026_merge_serialize.py new file mode 100644 index 0000000..87a54b9 --- /dev/null +++ b/tests/test_orch026_merge_serialize.py @@ -0,0 +1,95 @@ +"""ORCH-026 Level A serialization (TC-A02..A05). + +The merge-lease window (ORCH-043/065) is what serialises "merge -> main-updated" +per repo; ORCH-026 reuses it unchanged. These tests confirm the properties the +ADR relies on: + + TC-A02 extended window: while A holds the lease, B of the SAME repo gets + "merge-lock busy" -> defer (not rollback); holder-aware release does + NOT delete A's lease. + TC-A03 strict per-repo: an orchestrator lease never blocks an enduro-trails + acquire (both claimable in parallel). 
+ TC-A04 restart-safe + proactive reclaim: a dead holder's lease is reclaimed + (reclaim_stale_lease) so the pipeline never wedges forever. + TC-A05 anti-livelock defer budget: merge_defer_max_attempts is bounded and + positive -> exhaustion escalates instead of looping forever. +""" +import os +import tempfile + +import pytest + +os.environ.setdefault("ORCH_DB_PATH", os.path.join(tempfile.gettempdir(), "test_orch026_serialize.db")) +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") + +from src import merge_gate # noqa: E402 + + +@pytest.fixture +def leases_dir(tmp_path, monkeypatch): + monkeypatch.setattr(merge_gate.settings, "repos_dir", str(tmp_path), raising=False) + monkeypatch.setattr(merge_gate.settings, "merge_lock_timeout_s", 300, raising=False) + monkeypatch.setattr(merge_gate.settings, "merge_gate_repos", "", raising=False) + monkeypatch.setattr(merge_gate.settings, "lease_reclaim_enabled", True, raising=False) + return tmp_path + + +# ----------------------------------------------------------------- TC-A02 +def test_second_task_same_repo_defers_not_rollback(leases_dir): + okA, reasonA = merge_gate.acquire_merge_lease("orchestrator", "feature/A", "ORCH-1") + assert okA is True + + okB, reasonB = merge_gate.acquire_merge_lease("orchestrator", "feature/B", "ORCH-2") + assert okB is False + assert reasonB == "merge-lock busy" # -> caller DEFERS, never a rollback signal + + +def test_holder_aware_release_keeps_foreign_lease(leases_dir): + merge_gate.acquire_merge_lease("orchestrator", "feature/A", "ORCH-1") + # A delayed release from B (which never held it) must NOT delete A's lease. + merge_gate.release_merge_lease("orchestrator", "feature/B") + okB, reasonB = merge_gate.acquire_merge_lease("orchestrator", "feature/B", "ORCH-2") + assert okB is False and reasonB == "merge-lock busy" + # A's own release frees it. 
+ merge_gate.release_merge_lease("orchestrator", "feature/A") + okB2, _ = merge_gate.acquire_merge_lease("orchestrator", "feature/B", "ORCH-2") + assert okB2 is True + + +# ----------------------------------------------------------------- TC-A03 +def test_serialization_is_strictly_per_repo(leases_dir): + okA, _ = merge_gate.acquire_merge_lease("orchestrator", "feature/A", "ORCH-1") + okET, _ = merge_gate.acquire_merge_lease("enduro-trails", "feature/E", "ET-1") + assert okA is True + assert okET is True, "a different repo must be claimable in parallel (AC-A3)" + + +# ----------------------------------------------------------------- TC-A04 +def test_dead_holder_lease_is_reclaimed(leases_dir, monkeypatch): + merge_gate.acquire_merge_lease("orchestrator", "feature/A", "ORCH-1") + # Holder pid is THIS process; simulate it being dead. + monkeypatch.setattr(merge_gate, "pid_alive", lambda pid: False, raising=False) + reclaimed = merge_gate.reclaim_stale_lease("orchestrator") + assert reclaimed is True + # After reclaim B can acquire -> pipeline does not wedge forever. + okB, _ = merge_gate.acquire_merge_lease("orchestrator", "feature/B", "ORCH-2") + assert okB is True + + +def test_stale_lease_age_reclaimed_on_acquire(leases_dir, monkeypatch): + # A very short timeout makes the existing lease look stale on B's acquire. 
+ merge_gate.acquire_merge_lease("orchestrator", "feature/A", "ORCH-1") + monkeypatch.setattr(merge_gate.settings, "merge_lock_timeout_s", 0, raising=False) + okB, reasonB = merge_gate.acquire_merge_lease("orchestrator", "feature/B", "ORCH-2") + assert okB is True + assert "reclaimed" in reasonB + + +# ----------------------------------------------------------------- TC-A05 +def test_defer_budget_is_bounded(monkeypatch): + """The defer budget is a positive finite int -> exhaustion escalates (AC-A6).""" + from src.config import settings + assert isinstance(settings.merge_defer_max_attempts, int) + assert settings.merge_defer_max_attempts > 0 + assert settings.merge_defer_delay_s > 0 diff --git a/tests/test_orch026_migration.py b/tests/test_orch026_migration.py new file mode 100644 index 0000000..606ac2f --- /dev/null +++ b/tests/test_orch026_migration.py @@ -0,0 +1,83 @@ +"""ORCH-026 — additive job_deps migration (TC-G01, AC-G4). + +The migration must be additive (CREATE TABLE/INDEX IF NOT EXISTS), idempotent, +and safe on a pre-existing DB with data: existing columns of jobs/tasks/ +agent_runs/events are untouched. 
+""" +import os +import tempfile + +import pytest + +os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_orch026_migration.db") +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") + +import src.db as db # noqa: E402 +from src.db import init_db, get_db # noqa: E402 + + +@pytest.fixture +def dbfile(tmp_path, monkeypatch): + f = tmp_path / "mig.db" + monkeypatch.setattr(db.settings, "db_path", str(f)) + return f + + +def _columns(conn, table): + return [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()] + + +def test_job_deps_table_created(dbfile): + init_db() + conn = get_db() + cols = _columns(conn, "job_deps") + conn.close() + assert set(cols) == {"task_id", "depends_on_task_id", "created_at"} + + +def test_job_deps_indices_created(dbfile): + init_db() + conn = get_db() + idx = {r[1] for r in conn.execute("PRAGMA index_list(job_deps)").fetchall()} + conn.close() + assert "idx_job_deps_task" in idx + assert "idx_job_deps_depends" in idx + + +def test_primary_key_idempotent_insert(dbfile): + init_db() + conn = get_db() + conn.execute("INSERT OR IGNORE INTO job_deps (task_id, depends_on_task_id) VALUES (1, 2)") + conn.execute("INSERT OR IGNORE INTO job_deps (task_id, depends_on_task_id) VALUES (1, 2)") + conn.commit() + n = conn.execute("SELECT COUNT(*) FROM job_deps").fetchone()[0] + conn.close() + assert n == 1, "PK (task_id, depends_on_task_id) prevents dup rows" + + +def test_migration_idempotent_and_preserves_data(dbfile): + # First init + seed legacy data. 
+ init_db() + conn = get_db() + conn.execute( + "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) " + "VALUES ('ET-1','ET-1','enduro-trails','feature/x','development')" + ) + conn.execute( + "INSERT INTO jobs (agent, repo, status) VALUES ('developer','enduro-trails','queued')" + ) + conn.commit() + tasks_cols_before = _columns(conn, "tasks") + jobs_cols_before = _columns(conn, "jobs") + conn.close() + + # Re-run init_db (simulates a restart on a live DB) -> must be a no-op. + init_db() + conn = get_db() + assert _columns(conn, "tasks") == tasks_cols_before, "tasks columns unchanged" + assert _columns(conn, "jobs") == jobs_cols_before, "jobs columns unchanged" + # Legacy data survives. + assert conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0] == 1 + assert conn.execute("SELECT COUNT(*) FROM jobs").fetchone()[0] == 1 + conn.close() diff --git a/tests/test_orch026_premerge_rebase.py b/tests/test_orch026_premerge_rebase.py new file mode 100644 index 0000000..d257c75 --- /dev/null +++ b/tests/test_orch026_premerge_rebase.py @@ -0,0 +1,82 @@ +"""ORCH-026 Level A (TC-A01): proactive pre-merge rebase. + +check_branch_mergeable must ALWAYS rebase the task branch onto the current +origin/main under the held merge-lease when ``premerge_rebase_always`` is on — +even when ``branch_is_behind_main`` would short-circuit (no conflict, formally +not behind). With the flag OFF the ORCH-043 short-circuit is restored 1:1. + +These are pure unit tests: every merge_gate primitive is monkeypatched, so no +git/network is touched — we assert the CONTROL FLOW (was auto_rebase_onto_main +called?) and the verdict. 
+""" +import os +import tempfile + +import pytest + +os.environ.setdefault("ORCH_DB_PATH", os.path.join(tempfile.gettempdir(), "test_orch026_premerge.db")) +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") + +from src import merge_gate # noqa: E402 +from src.qg import checks # noqa: E402 + + +@pytest.fixture +def patched_gate(monkeypatch): + """Patch merge_gate primitives; record whether auto_rebase ran.""" + calls = {"rebase": 0, "retest": 0, "released": 0, "behind_checked": 0} + + monkeypatch.setattr(checks.settings, "merge_gate_enabled", True, raising=False) + monkeypatch.setattr(checks.settings, "merge_gate_repos", "", raising=False) + + monkeypatch.setattr(merge_gate, "acquire_merge_lease", + lambda *a, **k: (True, "lease acquired"), raising=False) + + def _behind(repo, branch): + calls["behind_checked"] += 1 + return False # NOT behind -> ORCH-043 would short-circuit + + def _rebase(repo, branch): + calls["rebase"] += 1 + return True, "rebased (noop)" + + def _retest(repo, branch): + calls["retest"] += 1 + return True, "green" + + def _release(repo, branch=None): + calls["released"] += 1 + + monkeypatch.setattr(merge_gate, "branch_is_behind_main", _behind, raising=False) + monkeypatch.setattr(merge_gate, "auto_rebase_onto_main", _rebase, raising=False) + monkeypatch.setattr(merge_gate, "retest_branch", _retest, raising=False) + monkeypatch.setattr(merge_gate, "release_merge_lease", _release, raising=False) + return calls + + +def test_always_rebases_even_when_not_behind(patched_gate, monkeypatch): + """premerge_rebase_always=True -> auto_rebase_onto_main ALWAYS called (AC-A2).""" + monkeypatch.setattr(checks.settings, "premerge_rebase_always", True, raising=False) + ok, reason = checks.check_branch_mergeable("orchestrator", "ORCH-026", "feature/x") + assert ok is True + assert patched_gate["rebase"] == 1, "rebase must run even when not behind" + assert patched_gate["retest"] == 1, "re-test 
must run after the proactive rebase" + + +def test_flag_off_short_circuits_like_orch043(patched_gate, monkeypatch): + """premerge_rebase_always=False -> not-behind short-circuit, no rebase (AC-A7).""" + monkeypatch.setattr(checks.settings, "premerge_rebase_always", False, raising=False) + ok, reason = checks.check_branch_mergeable("orchestrator", "ORCH-026", "feature/x") + assert ok is True + assert reason == "branch up-to-date with main" + assert patched_gate["rebase"] == 0, "must NOT rebase when not behind and flag off" + + +def test_disabled_gate_is_noop(monkeypatch): + """merge_gate_enabled=False -> pass-through, no lease/rebase at all (AC-G2).""" + monkeypatch.setattr(checks.settings, "merge_gate_enabled", False, raising=False) + monkeypatch.setattr(checks.settings, "premerge_rebase_always", True, raising=False) + ok, reason = checks.check_branch_mergeable("orchestrator", "ORCH-026", "feature/x") + assert ok is True + assert "disabled" in reason diff --git a/tests/test_orch026_queue_observability.py b/tests/test_orch026_queue_observability.py new file mode 100644 index 0000000..bb204f5 --- /dev/null +++ b/tests/test_orch026_queue_observability.py @@ -0,0 +1,90 @@ +"""ORCH-026 — /queue task_deps observability (TC-G02, G-2). + +task_deps.snapshot() is a read-only summary (NOT a source of truth) exposing the +declared edges, blocked tasks and any detected cycle. It must never raise. 
+""" +import os +import tempfile + +import pytest + +os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_orch026_queue_obs.db") +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") + +import src.db as db # noqa: E402 +from src.db import init_db, get_db # noqa: E402 +from src import task_deps # noqa: E402 + + +@pytest.fixture(autouse=True) +def fresh_db(tmp_path, monkeypatch): + dbfile = tmp_path / "obs.db" + monkeypatch.setattr(db.settings, "db_path", str(dbfile)) + monkeypatch.setattr(db.settings, "task_deps_enabled", True, raising=False) + monkeypatch.setattr(db.settings, "task_deps_source", "db", raising=False) + init_db() + yield + + +def _make_task(work_item_id, stage="development"): + conn = get_db() + cur = conn.execute( + "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) " + "VALUES (?, ?, ?, ?, ?)", + (work_item_id, work_item_id, "orchestrator", f"feature/{work_item_id}", stage), + ) + tid = cur.lastrowid + conn.commit() + conn.close() + return tid + + +def test_snapshot_shape_empty(): + snap = task_deps.snapshot() + assert snap["enabled"] is True + assert snap["source"] == "db" + assert snap["edges"] == 0 + assert snap["blocked_tasks"] == [] + assert snap["cycle"] is None + + +def test_snapshot_reports_blocked_task(): + a = _make_task("ORCH-100", stage="development") + b = _make_task("ORCH-101", stage="development") + db.add_dependency(b, a) + snap = task_deps.snapshot() + assert snap["edges"] == 1 + assert len(snap["blocked_tasks"]) == 1 + bt = snap["blocked_tasks"][0] + assert bt["work_item_id"] == "ORCH-101" + assert "ORCH-100" in bt["waiting_on"] + assert snap["cycle"] is None + + +def test_snapshot_reports_cycle(): + a = _make_task("ORCH-102") + b = _make_task("ORCH-103") + db.add_dependency(a, b) + db.add_dependency(b, a) + snap = task_deps.snapshot() + assert snap["cycle"] is not None + assert "ORCH-102" in snap["cycle"] or "ORCH-103" in snap["cycle"] 
+ + +def test_snapshot_never_raises(monkeypatch): + monkeypatch.setattr(db, "get_dependency_edges", + lambda: (_ for _ in ()).throw(RuntimeError("db down")), + raising=False) + snap = task_deps.snapshot() + assert snap["edges"] == 0 + assert snap["blocked_tasks"] == [] + + +def test_queue_endpoint_includes_task_deps(monkeypatch): + """GET /queue payload carries the task_deps block (read-only).""" + import asyncio + from src import main + payload = asyncio.run(main.queue()) + assert "task_deps" in payload + assert "enabled" in payload["task_deps"] diff --git a/tests/test_orch026_serialize_integration.py b/tests/test_orch026_serialize_integration.py new file mode 100644 index 0000000..7fdcde1 --- /dev/null +++ b/tests/test_orch026_serialize_integration.py @@ -0,0 +1,65 @@ +"""ORCH-026 Level A — serialization integration (TC-A08). + +Scenario (no network, lease + gate level): two tasks of the SAME repo race for +the merge edge. While A holds the merge-lease (the merge->main-updated window), +B's check_branch_mergeable returns "merge-lock busy" -> the engine DEFERS B (it +does NOT roll back). After A releases (A reached main / done), B acquires, is +proactively rebased onto the now-current main (carrying A's code) and merges. 
+""" +import os +import tempfile + +import pytest + +os.environ.setdefault("ORCH_DB_PATH", os.path.join(tempfile.gettempdir(), "test_orch026_serint.db")) +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") + +from src import merge_gate # noqa: E402 +from src.qg import checks # noqa: E402 + + +@pytest.fixture +def env(tmp_path, monkeypatch): + monkeypatch.setattr(merge_gate.settings, "repos_dir", str(tmp_path), raising=False) + monkeypatch.setattr(merge_gate.settings, "merge_lock_timeout_s", 300, raising=False) + monkeypatch.setattr(checks.settings, "merge_gate_enabled", True, raising=False) + monkeypatch.setattr(checks.settings, "merge_gate_repos", "", raising=False) + monkeypatch.setattr(checks.settings, "premerge_rebase_always", True, raising=False) + # Make the git/test primitives deterministic no-ops; A's rebase is a no-op, + # B's rebase is the real "catch up to A's code". + monkeypatch.setattr(merge_gate, "branch_is_behind_main", lambda r, b: False, raising=False) + monkeypatch.setattr(merge_gate, "auto_rebase_onto_main", lambda r, b: (True, "ok"), raising=False) + monkeypatch.setattr(merge_gate, "retest_branch", lambda r, b: (True, "green"), raising=False) + return tmp_path + + +def test_serialized_merge_window(env, monkeypatch): + repo = "orchestrator" + # A reaches the merge edge first: gate passes and HOLDS the lease. + okA, reasonA = checks.check_branch_mergeable(repo, "ORCH-1", "feature/A") + assert okA is True + # Lease is held by A. + assert merge_gate._read_lease(merge_gate._lease_path(repo))["branch"] == "feature/A" + + # B reaches the merge edge while A still holds the window -> busy -> DEFER. + okB, reasonB = checks.check_branch_mergeable(repo, "ORCH-2", "feature/B") + assert okB is False + assert reasonB == "merge-lock busy" # NOT a rollback; engine re-queues via available_at + # B's defer must NOT have stolen / cleared A's lease. 
+ assert merge_gate._read_lease(merge_gate._lease_path(repo))["branch"] == "feature/A" + + # A completes (PR merged / deploy->done) -> lease released. + merge_gate.release_merge_lease(repo, "feature/A") + + # B retries: now acquires, is proactively rebased onto current main, merges. + rebased = {"called": 0} + + def _rebase(r, b): + rebased["called"] += 1 + return True, "rebased onto A" + + monkeypatch.setattr(merge_gate, "auto_rebase_onto_main", _rebase, raising=False) + okB2, reasonB2 = checks.check_branch_mergeable(repo, "ORCH-2", "feature/B") + assert okB2 is True + assert rebased["called"] == 1, "B must be proactively rebased onto the fresh main (A's code)" diff --git a/tests/test_orch026_task_deps.py b/tests/test_orch026_task_deps.py new file mode 100644 index 0000000..d9d9146 --- /dev/null +++ b/tests/test_orch026_task_deps.py @@ -0,0 +1,157 @@ +"""ORCH-026 Level B — declarative task dependencies (TC-B01/B02/B05/B07). + +Real SQLite (tmp db). We drive tasks + job_deps directly and assert: + TC-B01 add_dependency declares an edge; get_dependencies resolves it; a + self-edge is rejected; never-raise on a bad input. + TC-B02 is_task_ready: a task with an un-done predecessor is NOT ready; when + every predecessor reaches 'done' it becomes ready. + TC-B05 claim_next_job does NOT claim a dep-blocked job (no slot taken); once + the predecessor is 'done' the job becomes claimable. + TC-B07 reconciler skip helper: is_task_ready=False is honoured (the gate task + is left waiting). 
+""" +import os +import tempfile + +import pytest + +os.environ["ORCH_DB_PATH"] = os.path.join(tempfile.gettempdir(), "test_orch026_task_deps.db") +os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token") +os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token") + +import src.db as db # noqa: E402 +from src.db import init_db, get_db, enqueue_job, claim_next_job # noqa: E402 +from src import task_deps # noqa: E402 + + +@pytest.fixture(autouse=True) +def fresh_db(tmp_path, monkeypatch): + dbfile = tmp_path / "deps.db" + monkeypatch.setattr(db.settings, "db_path", str(dbfile)) + monkeypatch.setattr(db.settings, "task_deps_enabled", True, raising=False) + init_db() + yield + + +def _make_task(stage="development", work_item_id="ORCH-1", repo="orchestrator"): + conn = get_db() + cur = conn.execute( + "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage) " + "VALUES (?, ?, ?, ?, ?)", + (work_item_id, work_item_id, repo, f"feature/{work_item_id}", stage), + ) + tid = cur.lastrowid + conn.commit() + conn.close() + return tid + + +def _set_stage(task_id, stage): + conn = get_db() + conn.execute("UPDATE tasks SET stage=? WHERE id=?", (stage, task_id)) + conn.commit() + conn.close() + + +# ----------------------------------------------------------------- TC-B01 +def test_add_dependency_declares_and_resolves(): + a = _make_task(work_item_id="ORCH-10", stage="development") + b = _make_task(work_item_id="ORCH-11", stage="development") + assert db.add_dependency(b, a) is True + assert db.get_dependencies(b) == [a] + # Idempotent: re-declaring the same edge is a no-op. 
+ assert db.add_dependency(b, a) is False + + +def test_self_edge_rejected(): + a = _make_task(work_item_id="ORCH-12") + assert db.add_dependency(a, a) is False + assert db.get_dependencies(a) == [] + + +def test_add_dependency_never_raises_on_bad_input(): + assert db.add_dependency(None, 1) is False + assert db.add_dependency(1, None) is False + + +# ----------------------------------------------------------------- TC-B02 +def test_is_task_ready_blocked_then_ready(): + a = _make_task(work_item_id="ORCH-20", stage="development") + b = _make_task(work_item_id="ORCH-21", stage="development") + db.add_dependency(b, a) + + ready, waiting = task_deps.is_task_ready(b) + assert ready is False + assert "ORCH-20" in waiting + + _set_stage(a, "done") + ready2, waiting2 = task_deps.is_task_ready(b) + assert ready2 is True + assert waiting2 == [] + + +def test_is_task_ready_no_deps_is_ready(): + a = _make_task(work_item_id="ORCH-22") + ready, waiting = task_deps.is_task_ready(a) + assert ready is True and waiting == [] + + +# ----------------------------------------------------------------- TC-B05 +def test_claim_skips_dep_blocked_job(): + a = _make_task(work_item_id="ORCH-30", stage="development") + b = _make_task(work_item_id="ORCH-31", stage="development") + db.add_dependency(b, a) + + job_b = enqueue_job("developer", "orchestrator", "do B", task_id=b) + # B is blocked by un-done A -> claim must NOT pick it (no slot taken). + claimed = claim_next_job() + assert claimed is None, "dep-blocked job must not be claimed" + + # A finishes -> B becomes claimable. 
+ _set_stage(a, "done") + claimed2 = claim_next_job() + assert claimed2 is not None + assert claimed2["id"] == job_b + + +def test_claim_prefers_unblocked_job_over_blocked(): + a = _make_task(work_item_id="ORCH-40", stage="development") + b = _make_task(work_item_id="ORCH-41", stage="development") + c = _make_task(work_item_id="ORCH-42", stage="development") + db.add_dependency(b, a) # b blocked by a + + job_b = enqueue_job("developer", "orchestrator", "B", task_id=b) # older id + job_c = enqueue_job("developer", "orchestrator", "C", task_id=c) # not blocked + + claimed = claim_next_job() + assert claimed is not None + assert claimed["id"] == job_c, "blocked B skipped, unblocked C claimed" + assert job_b # referenced + + +# ----------------------------------------------------------------- TC-B07 +def test_reconciler_skip_helper_honours_block(monkeypatch): + """The reconciler reads is_task_ready; a not-ready task must be skipped.""" + from src import reconciler as rec + a = _make_task(work_item_id="ORCH-50", stage="development") + b = _make_task(work_item_id="ORCH-51", stage="development") + db.add_dependency(b, a) + + advanced = {"called": False} + monkeypatch.setattr(rec, "advance_if_gate_passed", + lambda *a, **k: advanced.__setitem__("called", True), + raising=False) + monkeypatch.setattr(rec, "has_active_job_for_task", lambda tid: False, raising=False) + monkeypatch.setattr(rec, "developer_retry_count", lambda tid: 0, raising=False) + monkeypatch.setattr(rec.settings, "task_deps_enabled", True, raising=False) + monkeypatch.setattr(rec.settings, "reconcile_enabled", True, raising=False) + monkeypatch.setattr(rec.settings, "reconcile_grace_default_s", 0, raising=False) + + r = rec.Reconciler() + # Bypass Guard 2 (networked) so we isolate Guard 3. 
+ monkeypatch.setattr(r, "_is_blocked_or_needs_input", lambda task: False) + + task_row = {"id": b, "stage": "development", "repo": "orchestrator", + "work_item_id": "ORCH-51", "branch": "feature/ORCH-51", "age_s": 9999} + r._reconcile_gate_task(task_row) + assert advanced["called"] is False, "dep-blocked task must not be advanced (B-5)" diff --git a/tests/test_qg_merge_gate.py b/tests/test_qg_merge_gate.py index 302f012..98b7dbe 100644 --- a/tests/test_qg_merge_gate.py +++ b/tests/test_qg_merge_gate.py @@ -58,6 +58,12 @@ def lease_spy(monkeypatch): # Default merge_gate scope: real for the self-hosting orchestrator repo. monkeypatch.setattr(qg.settings, "merge_gate_enabled", True) monkeypatch.setattr(qg.settings, "merge_gate_repos", "") + # ORCH-026: these ORCH-043 composition tests assert the ancestor-based + # short-circuit ("branch up-to-date with main" -> no rebase). That is now the + # `premerge_rebase_always=False` kill-switch path; pin it OFF here so they + # keep testing the legacy ORCH-043 behaviour. The new always-rebase default + # (True) is covered by tests/test_orch026_premerge_rebase.py (TC-A01). + monkeypatch.setattr(qg.settings, "premerge_rebase_always", False, raising=False) return state