Files
orchestrator/src/main.py
claude-bot 87af857082
All checks were successful
CI / test (push) Successful in 1m12s
CI / test (pull_request) Successful in 1m17s
fix(serial-gate): pause-without-blocking via per-task park signal (ORCH-124)
Fixes incident ORCH-116/ORCH-123: serial_gate defined a repo's "active task"
purely by machine stage (tasks.stage NOT IN ('done','cancelled')). Plane statuses
Backlog/Blocked/Needs-Input (layer-B indication, ORCH-066) do NOT change
tasks.stage (layer A), so a paused predecessor was indistinguishable from an active
one and held the FIFO gate closed against an urgent successor — the urgent fix
could not start until the paused task was formally done.

Introduces an explicit, durable, DB-resolvable per-task "park" signal — additive
nullable column tasks.paused_at (pattern of cancelled_at/track) — and a new
ORTHOGONAL scheduler "pause" axis. The serial-gate "active task" predicate becomes
`stage NOT IN ('done','cancelled') AND paused_at IS NULL` across all three points
(build_claim_clause / repo_has_active_task / _per_repo_snapshot). The terminal set
{done,cancelled} in serial_gate/task_deps/stages.py is byte-for-byte unchanged
(adr-0026 not regressed): task_deps/stages.py do NOT read paused_at, so a paused
declared dependency and an active repo_freeze STILL block (pause never bypasses
them — different axes). Anti-stale-base on resume relies on the existing deferred
branch cut (ORCH-088) + pre-merge auto_rebase_onto_main + merge-gate re-test
(ORCH-026/093/110) — no new rebase machinery.

Additive, under an independent sub-flag, never-raise, restart-safe; hot-claim
fail-OPEN and freeze fail-CLOSED preserved. STAGE_TRANSITIONS / QG_CHECKS / check_*
/ machine-verdict keys / existing table schemas are byte-for-byte untouched (this is
a queue-scheduler + observability change, not a Quality Gate).

- src/db.py: additive tasks.paused_at column (_ensure_column) + set/clear/is helpers
- src/serial_gate.py: _pause_layer_enabled() + pause-term in the 3 points; `paused`
  list + per-job `reason` (freeze>dependency>active-task>null) in the /queue snapshot
- src/config.py + .env.example: serial_gate_pause_enabled (default True = true no-op)
- src/main.py: POST /serial-gate/pause|resume?work_item=<id> (by образцу unfreeze)
- tests/test_orch124_serial_gate_pause.py: TC-01 mandatory incident regress + TC-02..15
- CHANGELOG.md: [Unreleased] entry

ADR: docs/work-items/ORCH-124/06-adr/ADR-001-serial-gate-pause-without-blocking.md
Cross-cutting: docs/architecture/adr/adr-0051-serial-gate-pause-without-blocking.md

Refs: ORCH-124

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-16 19:35:55 +03:00

649 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
import logging
from .db import init_db
from .webhooks.plane import router as plane_router
from .webhooks.gitea import router as gitea_router
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
@asynccontextmanager
async def lifespan(app: FastAPI):
init_db()
# M-1: proper orphan-recovery.
# An orphan = an agent_run with no finished_at that is older than the recovery
# window. After a uvicorn restart the monitor thread is gone, so its child claude
# process (if any) was reparented to init; we cannot kill it by pid (pid is not
# persisted). Instead of silently writing exit=-1, we: enumerate each orphan,
# mark it exit=-1, log a warning per run, and notify so a human can check/restart.
log = logging.getLogger('orchestrator')
from .db import get_db
conn = get_db()
orphan_rows = conn.execute(
"SELECT id, task_id, agent FROM agent_runs "
"WHERE finished_at IS NULL AND started_at < datetime('now', '-35 minutes')"
).fetchall()
for row in orphan_rows:
run_id, task_id, agent = row[0], row[1], row[2]
conn.execute(
"UPDATE agent_runs SET finished_at=datetime('now'), exit_code=-1 WHERE id=?",
(run_id,),
)
log.warning(
f"Orphan run {run_id} (task {task_id}, agent {agent}) recovered — "
f"manual check needed (process may have been killed on restart)"
)
conn.commit()
conn.close()
if orphan_rows:
try:
from .notifications import send_telegram
ids = ", ".join(str(r[0]) for r in orphan_rows)
send_telegram(
f"\u26a0\ufe0f Orchestrator restart: {len(orphan_rows)} orphaned agent run(s) "
f"(run_id: {ids}) marked exit=-1. Нужна ручная проверка/перезапуск."
)
except Exception:
pass
log.warning(f"Recovered {len(orphan_rows)} orphaned agent runs")
# ORCH-1 (F-2b): queue-recovery. Any job left in 'running' status belongs to a
# worker that died on the previous restart -> put it back to 'queued' so the
# worker re-picks it up (restart-safe, no lost work). Runs AFTER M-1.
from .db import requeue_running_jobs
requeued = requeue_running_jobs()
if requeued:
log.warning(f"Queue-recovery: requeued {requeued} running job(s) after restart")
# ORCH-114 (adr-0045 / D7 / FR-4): clear durable transition-leases left by the
# PREVIOUS process boot. This process has a fresh boot_id, so every prior lease is
# stale by construction -> reclaim it so the just-requeued jobs can re-drive their
# side-effectful transitions cleanly. Idempotency of the re-drive comes from the
# authoritative durable facts (SHA-in-main / the INITIATED self-deploy marker /
# the coverage-ratchet CAS), NOT from a new recovery brain — the lease only
# guarantees the re-drive runs SEQUENTIALLY (one owner), never concurrently. Runs
# AFTER requeue_running_jobs and BEFORE the reaper starts. never raises.
try:
from . import transition_lease
cleared_leases = transition_lease.recover_on_startup()
if cleared_leases:
log.warning(
f"Transition-lease recovery: cleared {cleared_leases} stale lease(s) "
f"from a previous boot"
)
except Exception as e:
log.warning(f"Transition-lease recovery skipped: {e}")
# ORCH-123 (adr-0049 / D5 / FR-4): prod-like preflight of the host-side staging
# execution channel. The deploy-staging staging-runner (ORCH-115) runs the suite
# HOST-SIDE over ssh because the prod container ships no docker CLI (Dockerfile:11);
# probe the channel at startup so a broken environment (no docker on host / staging
# down / ssh unreachable / no ssh target) surfaces HERE — not postfactum as a false
# rollback of a real task (incident ORCH-116). Purely observational: it never blocks
# the start / gates the pipeline. never raises (runs after requeue + lease-recovery).
try:
from . import staging_runner
ok, reason = staging_runner.preflight()
if not ok:
log.warning(f"Staging-runner preflight: {reason}")
except Exception as e:
log.warning(f"Staging-runner preflight skipped: {e}")
# ORCH-065: proactive startup reclaim of dead/stale merge-leases, next to the
# queue-recovery above. A lease held by the previous (now dead) process pid is
# released at once instead of waiting for the TTL / a foreign acquire so the
# next merge is not blocked. Conditional (merge_gate_repos / self-hosting) and
# gated by ORCH_LEASE_RECLAIM_ENABLED; never raises.
try:
from .job_reaper import reclaim_all_stale_leases
reclaimed = reclaim_all_stale_leases()
if reclaimed:
log.warning(f"Startup lease-reclaim: reclaimed {reclaimed} stale merge-lease(s)")
except Exception as e:
log.warning(f"Startup lease-reclaim skipped: {e}")
# L-2: rotate old per-run logs at startup (best-effort; never fatal).
try:
import os as _os
from .config import settings as _settings
from .agents.launcher import prune_run_logs
_runs_dir = _os.path.join(_os.path.dirname(_settings.db_path), "runs")
_removed = prune_run_logs(
_runs_dir,
keep_days=_settings.log_keep_days,
keep_max=_settings.log_keep_max,
)
if _removed:
log.info(f"Log rotation: pruned {_removed} old run log(s) from {_runs_dir}")
except Exception as e:
log.warning(f"Log rotation skipped: {e}")
# ORCH-057 (D3 / FR-3): best-effort legacy-ownership detect. Surfaces a
# PROACTIVE operator signal (WARNING + Telegram) when /repos still holds
# root-owned files after the uid migration, BEFORE a task fails on launch.
# never-fatal (mirrors lease-reclaim / log-rotation above): a detect error must
# not crash the start of the shared instance. The actual "clear, early" failure
# is delivered by the actionable error in ensure_worktree (D1) — claim is NOT
# blocked (ADR-001 D3). Honours ORCH_FS_NORMALIZE_ENABLED inside scan_ownership.
try:
from .fs_normalize import scan_ownership, healing_command, normalize
from .config import settings as _fs_settings
scan = scan_ownership()
if scan.mismatch:
log.warning(
"FS-ownership mismatch: %d root(s) with files not owned by uid %s "
"(%s; sample: %s). Heal: %s",
len(scan.roots_mismatch), scan.target_uid,
", ".join(scan.roots_mismatch), scan.sample_path, healing_command(),
)
try:
from .notifications import send_telegram
send_telegram(
"⚠️ Orchestrator: обнаружены legacy root-owned файлы в "
f"{', '.join(scan.roots_mismatch)} (uid != {scan.target_uid}). "
f"Первый запуск задачи может упасть на создании worktree. "
f"Лечение: {healing_command()}"
)
except Exception:
pass
# D4 / FR-4: opt-in auto-chown ONLY when privileged (no-op under uid 1000).
if getattr(_fs_settings, "fs_normalize_auto", False):
try:
res = normalize()
log.warning("FS-ownership auto-normalize: %s", res.get("note"))
except Exception as e: # noqa: BLE001
log.warning("FS-ownership auto-normalize skipped: %s", e)
except Exception as e:
log.warning(f"FS-ownership detect skipped: {e}")
# Start the background job-queue worker (ORCH-1).
from .queue_worker import worker
worker.start()
# ORCH-053: start the stuck-task reconciler AFTER the worker so its active-job
# guard sees a fully-initialised queue. Kill-switch: ORCH_RECONCILE_ENABLED.
from .reconciler import reconciler
reconciler.start()
# ORCH-065: start the job-reaper LAST (after requeue_running_jobs + the worker
# + the reconciler) so its atomic status='running' guard never races the
# startup requeue. It reaps zombie jobs and periodically reclaims stale
# merge-leases. Kill-switch: ORCH_REAPER_ENABLED.
from .job_reaper import reaper
reaper.start()
# ORCH-063: start the disk-watchdog LAST (after the reaper). It is independent
# of the queue/DB — it only reads host-FS fill and Telegram-alerts at >=
# threshold — so the order is not critical, but we follow the daemon
# convention. Honours the kill-switch ORCH_DISK_MONITOR_ENABLED (start() is a
# no-op when disabled, so behaviour is 1:1 as before).
from .disk_watchdog import disk_watchdog
disk_watchdog.start()
# ORCH-062: start the build-cache-pruner LAST, right after the disk-watchdog
# (D7). It is the "second half" of the watchdog (watchdog signals, pruner
# cleans): a daemon thread that periodically runs `docker builder prune` on
# the host over ssh. Honours the kill-switch ORCH_BUILD_CACHE_PRUNE_ENABLED
# (start() is a no-op when disabled, so behaviour is 1:1 as before).
from .build_cache_pruner import build_cache_pruner
build_cache_pruner.start()
try:
yield
finally:
# ORCH-062: stop the build-cache-pruner first (reverse of startup, D7).
build_cache_pruner.stop()
# ORCH-063: stop the disk-watchdog next (reverse of startup).
disk_watchdog.stop()
# Graceful shutdown order mirrors startup in reverse: stop the reaper
# first, then the reconciler (it must not enqueue new work while the
# worker is winding down), then the worker. Running agents keep going;
# their jobs are requeued on next start via queue-recovery if the
# process dies.
reaper.stop()
reconciler.stop()
worker.stop()
app = FastAPI(title="Multi-Agent Orchestrator", lifespan=lifespan)
app.include_router(plane_router, prefix="/webhook")
app.include_router(gitea_router, prefix="/webhook")
@app.get("/health")
async def health():
return {"status": "ok", "service": "orchestrator"}
@app.get("/status")
async def status():
from .db import get_db
conn = get_db()
tasks = conn.execute(
"SELECT * FROM tasks WHERE stage != 'done' ORDER BY created_at DESC LIMIT 10"
).fetchall()
conn.close()
return {"active_tasks": [dict(t) for t in tasks]}
@app.get("/queue")
async def queue():
"""ORCH-1: job-queue observability — status counts + recent jobs."""
from .db import job_status_counts, recent_jobs
from .queue_worker import worker
from .reconciler import reconciler
from .job_reaper import reaper
from . import post_deploy
from . import merge_gate
from . import task_deps
from . import serial_gate
from . import coverage_gate
from . import fs_normalize
from . import labels
from . import cancel
from . import bug_fast_track
from . import lessons
from . import checkout_hygiene
from . import transition_lease
from . import staging_runner
from . import test_runner
from .disk_watchdog import disk_watchdog
from .build_cache_pruner import build_cache_pruner
return {
"counts": job_status_counts(),
"max_concurrency": worker.max_concurrency,
"poll_interval": worker.poll_interval,
"resilience": worker.status(),
"reconcile": reconciler.status(),
"reaper": reaper.status(),
"post_deploy": post_deploy.status(),
"merge_verify": merge_gate.merge_verify_status(),
# ORCH-110 (D6): merge-gate re-test infra-timeout observability (read-only) —
# tolerance/skip/tree-kill flags + timeout/infra-retry/skip counters, so an
# infra-timeout is distinguishable from a code-fault rollback. Additive block;
# never-raise.
"merge_gate": merge_gate.merge_gate_status(),
# ORCH-026 (G-2): declarative task-dependency observability (read-only,
# NOT a source of truth) — declared edges, blocked tasks, detected cycle.
"task_deps": task_deps.snapshot(),
# ORCH-088 (D9 / AC-10): per-repo serial-gate observability (read-only) —
# active task, queued/waiting analyst-jobs, freeze state. Additive block.
"serial_gate": serial_gate.snapshot(),
# ORCH-027 (FR-7 / AC-9): coverage-gate observability (read-only) —
# kill-switch, scope, policy/floor/epsilon, per-repo baselines. Additive block.
"coverage": coverage_gate.snapshot(),
# ORCH-057 (D6 / AC-4): legacy-ownership detect observability (read-only) —
# kill-switch, scope, target_uid, mismatch + affected roots (TTL-cached scan).
# Additive block; never-raise.
"fs_ownership": fs_normalize.snapshot(),
# ORCH-089 (D7): auto-mode-by-label observability (read-only) — kill-switch,
# label names, scope. Additive block.
"auto_labels": labels.snapshot(),
# ORCH-090 (AC-10): STOP-cancellation observability (read-only) — kill-switch,
# repo scope, cancelled/deferred counts, recent cancellations. Additive block;
# never-raise.
"stop": cancel.snapshot(),
# ORCH-019 (FR-7 / AC-7): bug-fast-track observability (read-only) —
# kill-switch, label, scope, bug-task counts + the structural savings metric
# (architecture stages skipped). Additive block; never-raise.
"bug_fast_track": bug_fast_track.snapshot(),
# ORCH-112 (D3): deploy-base checkout-hygiene observability (read-only) —
# kill-switch + scope. Additive block; never-raise.
"checkout_hygiene": checkout_hygiene.snapshot(),
# ORCH-114 (adr-0045 / D10 / FR-6): durable transition-ownership lease
# observability (read-only) — kill-switch, scope, boot_id, active holders
# (owner/stage/age/live) + defer/reclaim/CAS-lost counters. Additive block;
# never-raise.
"transition_lease": transition_lease.snapshot(),
# ORCH-115 (FR-7 / AC-10): deterministic staging-runner observability
# (read-only) — kill-switch, scope, timeout/infra budget + run/success/
# failed/tool_error/deferred counters, so a code-fail FAILED is distinguishable
# from an infra tool-error. Additive block; never-raise.
"staging_runner": staging_runner.snapshot(),
# ORCH-116 (FR-8 / AC-13): deterministic test-runner observability (read-only)
# — kill-switch, scope, target, timeout/smoke/infra budget + run/pass/fail/
# tool_error/deferred counters, so a code-fail FAIL is distinguishable from an
# infra tool-error. Additive block; never-raise.
"test_runner": test_runner.snapshot(),
# ORCH-098 (FR-4 / AC-4): lessons-journal observability (read-only) —
# kill-switch + counts by type/status + last N lessons. Additive block;
# never-raise (snapshot() returns {"enabled": ...} minimum on error).
"lessons": lessons.snapshot(),
# ORCH-063 (FR-6 / AC-7): disk-watchdog observability (read-only) —
# enabled, threshold, interval, last measurement per host-path. Additive
# block; never-raise (status() returns {"enabled": ...} minimum on error).
"disk_monitor": disk_watchdog.status(),
# ORCH-062 (FR-4 / AC-7): build-cache-pruner observability (read-only) —
# enabled, interval, retention (until), last run + best-effort reclaimed /
# last error. Additive block; never-raise (status() returns {"enabled":
# ...} minimum on error).
"build_cache_prune": build_cache_pruner.status(),
"recent": recent_jobs(10),
}
@app.get("/metrics")
async def metrics():
"""ORCH-099 (FND/F1a): lightweight read-only raw-signal snapshot for the F1b sidecar.
A versioned JSON envelope (``schema_version`` / ``generated_at`` / ``clk_tck``)
with four raw-signal sections — ``stages`` (active task stages + age),
``queue`` (counts / retries / breaker / concurrency), ``agents`` (agent-liveness:
pid / runtime / cpu_ticks), ``cost`` (per-run + aggregate tokens/cost). The
orchestrator emits ONLY raw signal it alone knows; the stateful arbiter
(thresholds / deltas / alerts) is the separate sidecar (BRD §1).
Thin wrapper over ``metrics.build_metrics()`` (in the style of GET /queue): the
collector is already strictly read-only and never-raise, so no extra error
handling is needed here. Same access level as /queue//status. The format is the
documented contract for the sidecar (docs/architecture/README.md).
"""
from . import metrics as metrics_mod
return metrics_mod.build_metrics()
@app.post("/serial-gate/unfreeze")
async def serial_gate_unfreeze(repo: str = ""):
"""ORCH-088 (FR-5, ADR-001 D4): manually clear a per-repo rollback-freeze.
A freeze set by the post-deploy monitor (DEGRADED) keeps the serial gate CLOSED
for the repo until an operator explicitly clears it here. Idempotent: clearing
an already-clear repo reports ``cleared: 0``. The next queued analyst-job is then
claimable on the next scheduler tick (no restart needed). Alternative manual path
(documented in README): ``UPDATE repo_freeze SET cleared_at=datetime('now')
WHERE repo=? AND cleared_at IS NULL``.
"""
from . import serial_gate
if not repo or not repo.strip():
return {"ok": False, "error": "missing 'repo'", "repo": repo, "cleared": 0}
repo = repo.strip()
cleared = serial_gate.clear_repo_freeze(repo)
frozen = serial_gate.is_repo_frozen(repo)
if cleared:
try:
from .notifications import send_telegram
send_telegram(
f"🔥 {repo}: пакет РАЗМОРОЖЕН вручную ({cleared} запис(ь/и) снято). "
f"Следующая задача репо стартует на ближайшем цикле."
)
except Exception:
pass
return {"ok": True, "repo": repo, "cleared": cleared, "frozen": frozen}
@app.post("/serial-gate/pause")
async def serial_gate_pause(work_item: str = ""):
"""ORCH-124 (adr-0051 D7): park a task so the serial gate stops counting it as
an active FIFO blocker — an urgent successor may overtake it.
Explicit, durable, DB-resolvable operator intent (NOT a Plane-status gesture):
stamps ``tasks.paused_at`` so the offline hot-claim SQL reads it locally without
a network call. Pause does NOT bypass a ``repo_freeze`` or a declared dependency
(different axes) and is NOT terminal (distinct from STOP/cancel). By образцу
``POST /serial-gate/unfreeze``; never-raise. Pausing a terminal (done/cancelled)
task is a no-op. When the pause sub-flag is off the call is a no-op + warning
(the pause-term is omitted from the gate, so a column write would be latent).
"""
from . import db
from . import serial_gate
if not work_item or not work_item.strip():
return {"ok": False, "error": "missing 'work_item'", "work_item": work_item}
work_item = work_item.strip()
if not serial_gate._pause_layer_enabled():
return {"ok": False, "error": "serial_gate_pause_enabled is off (no-op)",
"work_item": work_item}
task = db.get_task_by_work_item_id(work_item)
if not task:
return {"ok": False, "error": "unknown work_item", "work_item": work_item}
task_id = task["id"]
stage = task.get("stage")
if stage in ("done", "cancelled"):
return {"ok": False, "error": f"task is terminal (stage={stage})",
"work_item": work_item, "task_id": task_id, "stage": stage}
ok = db.set_task_paused(task_id)
refreshed = db.get_task_by_work_item_id(work_item) or {}
paused_at = refreshed.get("paused_at")
if ok:
try:
from .notifications import send_telegram, link_for
send_telegram(
f"⏸️ {link_for(work_item)}: задача поставлена на ПАУЗУ для serial-gate "
f"(task {task_id}, stage={stage}). Срочный успешник репо может обогнать; "
f"resume — POST /serial-gate/resume."
)
except Exception:
pass
return {"ok": ok, "work_item": work_item, "task_id": task_id,
"stage": stage, "paused_at": paused_at}
@app.post("/serial-gate/resume")
async def serial_gate_resume(work_item: str = ""):
"""ORCH-124 (adr-0051 D7 / AC-10): resume a parked task — it re-enters the
serial gate (holds it as active again / re-enters FIFO with the deferred branch
cut, D8). Inverse of ``POST /serial-gate/pause``; idempotent (resuming a task
that is not paused clears nothing). Anti-stale-base on resume is guaranteed by
the EXISTING mechanisms (deferred branch cut + pre-merge auto_rebase_onto_main +
merge-gate re-test, ORCH-088/093/110) — no new rebase machinery. never-raise.
"""
from . import db
if not work_item or not work_item.strip():
return {"ok": False, "error": "missing 'work_item'", "work_item": work_item}
work_item = work_item.strip()
task = db.get_task_by_work_item_id(work_item)
if not task:
return {"ok": False, "error": "unknown work_item", "work_item": work_item}
task_id = task["id"]
was_paused = task.get("paused_at") is not None
ok = db.clear_task_paused(task_id)
if ok and was_paused:
try:
from .notifications import send_telegram, link_for
send_telegram(
f"▶️ {link_for(work_item)}: задача СНЯТА С ПАУЗЫ (task {task_id}) — "
f"снова участвует в serial-gate."
)
except Exception:
pass
return {"ok": ok, "work_item": work_item, "task_id": task_id,
"was_paused": was_paused, "paused_at": None}
@app.post("/transition-lease/release")
async def transition_lease_release(work_item: str = ""):
"""ORCH-114 (adr-0045 / D10): operator manual reclaim of a stuck transition-lease.
By образцу ``POST /serial-gate/unfreeze``: if a lease somehow outlives its owner
(the normal try/finally release + the reaper force-release + the Tier-3 backstop
should make this unnecessary), an operator can force-release it by work-item id so
a re-approve / the reconciler can re-drive the transition. Idempotent: releasing a
free task reports ``released: false``. Read-only/never-raise otherwise.
"""
from . import transition_lease
from .db import get_task_by_work_item_id
if not work_item or not work_item.strip():
return {"ok": False, "error": "missing 'work_item'", "work_item": work_item}
work_item = work_item.strip()
task = get_task_by_work_item_id(work_item)
if not task:
return {"ok": False, "error": "task not found", "work_item": work_item}
task_id = task["id"]
held_before = transition_lease.is_held_by_live_owner(task_id)
transition_lease.release(task_id, force=True)
if held_before:
try:
from .notifications import send_telegram, link_for
send_telegram(
f"🔓 {link_for(work_item)}: transition-lease сброшен вручную "
f"(task {task_id}). Переход может быть пере-исполнен."
)
except Exception:
pass
return {"ok": True, "work_item": work_item, "task_id": task_id, "released": held_before}
@app.post("/fs-normalize/check")
async def fs_normalize_check(normalize: bool = False):
"""ORCH-057 (D6 / AC-4): force a fresh legacy-ownership detect (bypass the TTL
cache) and return the snapshot. By образцу ``POST /serial-gate/unfreeze``.
``normalize=true`` additionally attempts an opt-in ``chown`` — a no-op under uid
1000 (the prod-self case), effective only when the process is privileged (D4).
The real fix remains the operator procedure (docs/operations/INFRA.md «Миграция
uid»). Read-only/never-raise otherwise.
"""
from . import fs_normalize as _fs
scan = _fs.scan_ownership(force=True)
out = {"ok": True, "scan": scan.to_dict(), "healing": _fs.healing_command()}
if normalize:
out["normalize"] = _fs.normalize()
# Re-scan so the returned snapshot reflects any change a privileged run made.
out["scan"] = _fs.scan_ownership(force=True).to_dict()
return out
@app.post("/coverage/baseline")
async def coverage_set_baseline(repo: str = "", value: float | None = None):
"""ORCH-027 (D8): manually set/override the per-repo coverage baseline.
For a legitimate one-off coverage drop (e.g. removing a large tested module) the
operator sets the baseline directly here (by образцу ``POST /serial-gate/unfreeze``)
instead of waiting for the upward-only ratchet. Unlike the ratchet this CAN lower
the baseline. Alternative without this endpoint: temporarily flip
``ORCH_COVERAGE_POLICY=absolute``.
"""
from . import db
if not repo or not repo.strip():
return {"ok": False, "error": "missing 'repo'", "repo": repo}
if value is None:
return {"ok": False, "error": "missing 'value'", "repo": repo}
repo = repo.strip()
ok = db.set_coverage_baseline(repo, value, sha="manual-override")
return {"ok": ok, "repo": repo, "baseline": db.get_coverage_baseline(repo)}
@app.post("/bug-fast-track/escalate")
async def bug_fast_track_escalate(work_item: str = ""):
"""ORCH-019 (FR-5 / AC-5, ADR-001 D5): escalate a bug-fast-track task to the
full cycle (return it to the route WITH `architecture`).
Operator path for a bug that turned out to be complex / architectural / visual
(needs an ADR or a mock): reset ``tasks.track`` 'bug' -> 'full'. Apply while the
task is still in `analysis` (before its exit) — the next advance_stage then routes
analysis -> architecture normally. By образцу ``POST /serial-gate/unfreeze`` /
``POST /coverage/baseline``. never-raise.
"""
from . import db
if not work_item or not work_item.strip():
return {"ok": False, "error": "missing 'work_item'", "work_item": work_item}
work_item = work_item.strip()
task = db.get_task_by_work_item_id(work_item)
if not task:
return {"ok": False, "error": "unknown work_item", "work_item": work_item}
prev_track = task.get("track") or "full"
db.set_task_track(task["id"], "full")
if prev_track == "bug":
try:
from .notifications import send_telegram
send_telegram(
f"🐞➡️ {work_item}: эскалация в ПОЛНЫЙ цикл "
f"(багфикс-трек снят, стадия architecture восстановлена)."
)
except Exception:
pass
try:
from .plane_sync import add_comment
add_comment(
work_item,
"🐞➡️ Эскалация: задача возвращена в полный цикл "
"(багфикс-трек снят, стадия architecture восстановлена).",
author="analyst",
)
except Exception:
pass
return {"ok": True, "work_item": work_item, "track": "full", "was": prev_track}
# ---------------------------------------------------------------------------
# ORCH-098 (FR-4 / FR-5, ADR-001 D5): machine lessons-journal endpoints.
# Read-only fetch + manual record + re-classify. All never-raise; with the
# kill-switch off they return {"enabled": false} (style of /metrics, AC-7).
# ---------------------------------------------------------------------------
@app.get("/lessons")
async def lessons_list(
type: str = "", status: str = "", repo: str = "", work_item: str = "",
limit: int | None = None,
):
"""ORCH-098: read-only lessons fetch with optional filters (type / status / repo
/ work_item / limit). Always 200; reading never mutates. ``lessons_enabled=False``
-> ``{"enabled": false}``."""
from . import lessons
from .config import settings
if not getattr(settings, "lessons_enabled", True):
return {"enabled": False, "lessons": []}
rows = lessons.get(
lesson_type=(type or None), status=(status or None), repo=(repo or None),
work_item_id=(work_item or None), limit=limit,
)
return {"enabled": True, "lessons": rows}
@app.post("/lessons")
async def lessons_create(request: Request):
"""ORCH-098: manually record a lesson (``source="manual"``, never deduped). JSON
body: ``lesson_type`` (required) + optional context / analysis / attribution
fields. Returns ``{"id": <int>}`` or ``{"enabled": false}`` /
``{"error": ...}``."""
from . import lessons
from .config import settings
if not getattr(settings, "lessons_enabled", True):
return {"enabled": False}
try:
body = await request.json()
except Exception: # noqa: BLE001 - malformed body
body = {}
if not isinstance(body, dict):
body = {}
lesson_type = body.get("lesson_type")
if not lesson_type:
return {"ok": False, "error": "missing 'lesson_type'"}
# Only forward known fields; source is forced to "manual" (operator/Стрим).
allowed = (
"work_item_id", "task_id", "stage", "agent", "repo", "root_cause",
"suggestion", "status", "related_task", "attribution", "target_repo",
"target_domain", "detail",
)
kwargs = {k: body[k] for k in allowed if k in body}
new_id = lessons.record(lesson_type, source="manual", **kwargs)
return {"id": new_id}
@app.post("/lessons/{lesson_id}")
async def lessons_update(lesson_id: int, request: Request):
"""ORCH-098: re-classify / re-status an existing lesson (status / attribution /
target_* / related_task / root_cause / suggestion). Lets a human / the
retrospective agent classify an auto-recorded ``unknown``. Returns
``{"ok": bool}`` or ``{"enabled": false}``."""
from . import lessons
from .config import settings
if not getattr(settings, "lessons_enabled", True):
return {"enabled": False}
try:
body = await request.json()
except Exception: # noqa: BLE001 - malformed body
body = {}
if not isinstance(body, dict):
body = {}
allowed = (
"status", "attribution", "target_repo", "target_domain", "related_task",
"root_cause", "suggestion", "stage", "agent", "repo", "detail",
)
kwargs = {k: body[k] for k in allowed if k in body}
ok = lessons.update(lesson_id, **kwargs)
return {"ok": ok}