Files
orchestrator/src/git_worktree.py
Dev Agent 1ebe8afc23 feat(worktree): git worktree per task to isolate shared /repos (ORCH-2 / S-4)
- add src/git_worktree.py: ensure/remove/get_worktree_path
- config: worktrees_dir=/repos/_wt
- launcher: agent runs in per-branch worktree; task-file + commit/push in worktree; no shared checkout
- qg/checks: read artifacts + run make test from worktree (branch arg, backward-compatible)
- webhooks/plane: pass branch into QG dispatch; review fallback from worktree
- webhooks/gitea: keep read-only branch --contains in main clone (documented)
- tests: test_git_worktree.py (isolation) + update test_launcher write-task-file
- docs: ARCHITECTURE worktree section + BUGFIXES_2026-06-02_ORCH2

Preserves B-1/B-2/S-1/S-5 fixes (paths now point at worktree).
2026-06-02 21:12:06 +03:00

108 lines
4.4 KiB
Python

"""Git worktree management — isolated working copy per task/branch (ORCH-2 / S-4).
Background
----------
Previously every git operation (checkout/commit/push/test) ran in the single shared
clone ``/repos/<repo>``. With two active tasks a ``git checkout`` of one branch would
overwrite the working copy of the other -> races (see AUDIT S-4 / ET-009 "two collectors").
Solution
--------
Each task (branch) gets an isolated git worktree::

    /repos/<repo>                    <- main clone (fetch / worktree management)
    /repos/_wt/<repo>/<safe-branch>  <- worktree for one task/branch (agent works here)

A branch can only be checked out in ONE worktree at a time, which is exactly the
property we want: one task = one branch = one worktree.
"""
import os
import re
import subprocess
import logging
from .config import settings
logger = logging.getLogger("orchestrator.git_worktree")
def _safe(branch: str) -> str:
"""Filesystem-safe branch name for use in a path component."""
return re.sub(r"[^A-Za-z0-9._-]", "_", branch)
def get_worktree_path(repo: str, branch: str) -> str:
    """Compute where the worktree for ``(repo, branch)`` lives on disk.

    This is pure path arithmetic: nothing is created and the filesystem is
    not consulted. The layout is
    ``<settings.worktrees_dir>/<repo>/<sanitised branch>``.
    """
    branch_component = _safe(branch)
    return os.path.join(settings.worktrees_dir, repo, branch_component)
def _main_repo(repo: str) -> str:
    """Return the path of the main clone of ``repo`` under ``settings.repos_dir``."""
    clone_path = os.path.join(settings.repos_dir, repo)
    return clone_path
def _run_git(cwd: str, *args: str, timeout: int = 60) -> subprocess.CompletedProcess:
    """Run ``git -C <cwd> <args...>`` and return the completed process.

    Output is captured as text (undecodable bytes are replaced rather than
    raising). A non-zero exit status does NOT raise; callers inspect
    ``returncode``. ``subprocess.TimeoutExpired`` propagates.
    """
    return subprocess.run(
        ["git", "-C", cwd, *args],
        capture_output=True, text=True, errors="replace", timeout=timeout,
    )


def ensure_worktree(repo: str, branch: str) -> str:
    """Create (or reuse) an isolated worktree for ``branch``. Returns its path.

    The main clone stays at ``<repos_dir>/<repo>``; the worktree lives at the
    path computed by ``get_worktree_path(repo, branch)``.

    - If the worktree already exists, it is fetched, ``branch`` is checked out
      and, only when ``origin/<branch>`` exists, the worktree is hard-reset to
      it. The hard reset discards uncommitted changes and unpushed commits;
      only branches with no remote counterpart keep their local-only work.
      These refresh steps are best-effort: a failing step is logged as a
      warning and the existing worktree path is still returned.
    - Otherwise stale worktree registrations are pruned and a fresh worktree
      is added for the existing branch; if that fails, a new branch is created
      from ``origin/main``.

    Raises:
        FileNotFoundError: the main clone directory does not exist.
        RuntimeError: both ``git worktree add`` attempts failed.
        subprocess.TimeoutExpired: a git command exceeded its timeout.
    """
    main_repo = _main_repo(repo)
    wt = get_worktree_path(repo, branch)
    if not os.path.isdir(main_repo):
        raise FileNotFoundError(f"Main repo not found: {main_repo}")

    # Always refresh refs in the main clone first. Best-effort: an offline
    # remote must not prevent working with refs that are already local.
    fetched = _run_git(main_repo, "fetch", "origin")
    if fetched.returncode != 0:
        logger.warning("git fetch failed in %s: %s", main_repo, fetched.stderr.strip())

    # Reuse existing worktree (.git may be a dir or a file pointer for worktrees).
    git_marker = os.path.join(wt, ".git")
    if os.path.isdir(git_marker) or os.path.isfile(git_marker):
        remote_ref = f"origin/{branch}"
        refresh_steps = [
            (("fetch", "origin"), 60),
            (("checkout", branch), 30),
        ]
        for step_args, step_timeout in refresh_steps:
            step = _run_git(wt, *step_args, timeout=step_timeout)
            if step.returncode != 0:
                logger.warning("git %s failed in %s: %s",
                               step_args[0], wt, step.stderr.strip())
        # Align to remote only if the remote branch exists, so a branch that
        # was never pushed keeps its local-only commits.
        remote_exists = _run_git(
            wt, "rev-parse", "--verify", "--quiet", remote_ref, timeout=30,
        ).returncode == 0
        if remote_exists:
            aligned = _run_git(wt, "reset", "--hard", remote_ref, timeout=30)
            if aligned.returncode != 0:
                logger.warning("git reset failed in %s: %s", wt, aligned.stderr.strip())
        logger.info("Worktree reused: %s (branch %s)", wt, branch)
        return wt

    os.makedirs(os.path.dirname(wt), exist_ok=True)
    # A worktree directory deleted without `git worktree remove` leaves a stale
    # registration behind that makes `worktree add` refuse the path; drop it.
    _run_git(main_repo, "worktree", "prune", timeout=30)

    # Try to attach an existing branch (local or remote-tracking) to the new worktree.
    attach = _run_git(main_repo, "worktree", "add", wt, branch)
    if attach.returncode != 0:
        # Branch doesn't exist yet — create it from origin/main.
        create = _run_git(main_repo, "worktree", "add", "-b", branch, wt, "origin/main")
        if create.returncode != 0:
            raise RuntimeError(
                f"git worktree add failed for {repo}:{branch}: "
                f"{attach.stderr.strip()} | {create.stderr.strip()}"
            )
    logger.info("Worktree ready: %s (branch %s)", wt, branch)
    return wt
def remove_worktree(repo: str, branch: str) -> None:
    """Remove the worktree for (repo, branch) — optional cleanup when a task is done.

    Best-effort: a failing ``git worktree remove`` does not raise. Success is
    logged only when the removal command succeeded or the worktree directory
    is gone afterwards; otherwise a warning with git's stderr is logged.
    ``subprocess.TimeoutExpired`` propagates if a git command exceeds 30 s.
    """
    main_repo = _main_repo(repo)
    wt = get_worktree_path(repo, branch)
    removal = subprocess.run(
        ["git", "-C", main_repo, "worktree", "remove", "--force", wt],
        capture_output=True, text=True, errors="replace", timeout=30,
    )
    # Prune dangling administrative entries (also covers a worktree whose
    # directory was already deleted by hand).
    subprocess.run(["git", "-C", main_repo, "worktree", "prune"],
                   capture_output=True, timeout=30)
    if removal.returncode != 0 and os.path.exists(wt):
        # Do not claim success while the directory is still on disk.
        logger.warning("Worktree removal failed: %s: %s", wt, removal.stderr.strip())
        return
    logger.info("Worktree removed: %s", wt)