diff --git a/src/agents/launcher.py b/src/agents/launcher.py index cfbf5f1..3a60484 100644 --- a/src/agents/launcher.py +++ b/src/agents/launcher.py @@ -7,7 +7,7 @@ from ..config import settings from ..db import get_db, get_task_by_repo_branch, update_task_stage from ..stages import get_next_stage, get_qg_for_stage, get_agent_for_stage from ..qg.checks import QG_CHECKS -from ..notifications import notify_stage_change, notify_qg_failure +from ..notifications import notify_stage_change, notify_qg_failure, notify_agent_started, notify_agent_finished, notify_approve_requested from ..plane_sync import notify_stage_change as plane_notify_stage, add_comment as plane_add_comment logger = logging.getLogger("orchestrator.launcher") @@ -26,6 +26,7 @@ class AgentLauncher: "system_prompt": ".openclaw/agents/architect.md", "task_file": ".task-arch.md", "allowed_tools": "Read,Write,Edit,Bash", + "model": "opus", }, "developer": { "system_prompt": ".openclaw/agents/developer.md", @@ -36,12 +37,18 @@ class AgentLauncher: "system_prompt": ".openclaw/agents/reviewer.md", "task_file": ".task-review.md", "allowed_tools": "Read,Write,Edit,Bash", + "model": "opus", }, "tester": { "system_prompt": ".openclaw/agents/tester.md", "task_file": ".task-test.md", "allowed_tools": "Read,Write,Edit,Bash", }, + "deployer": { + "task_file": ".task-deploy.md", + "system_prompt": ".openclaw/agents/deployer.md", + "allowed_tools": "Read,Write,Edit,Bash", + }, } CLAUDE_BIN = "/opt/claude-code/bin/claude.exe" @@ -98,9 +105,13 @@ class AgentLauncher: _br_row = get_db().execute("SELECT branch FROM tasks WHERE id=?", (task_id,)).fetchone() if task_id else None agent_branch = _br_row[0] if _br_row else "main" + model = config.get("model", "") + model_flag = f"--model {model} " if model else "" + cmd = ( f'cd {local_repo_path} && git fetch origin 2>/dev/null; git checkout {agent_branch} 2>/dev/null || git checkout -b {agent_branch} origin/{agent_branch} 2>/dev/null; ' f'{self.CLAUDE_BIN} --print ' + f'{model_flag}' f'"$(cat {task_file})" ' f'--system-prompt "$(cat {system_prompt})" ' f'--allowedTools {allowed_tools}' @@ -109,12 +120,11 @@ class AgentLauncher: logger.info(f"Launching agent '{agent}' for repo '{repo}', run_id={run_id}") # Launch as background process - with open(output_path, "w") as log_file: - proc = subprocess.Popen( - ["bash", "-c", cmd], - stdout=log_file, - stderr=subprocess.STDOUT, - env={ + proc = subprocess.Popen( + ["bash", "-c", cmd], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env={ **os.environ, "HOME": "/home/slin", "GIT_AUTHOR_NAME": "claude-bot", @@ -144,12 +154,13 @@ class AgentLauncher: # agent_branch already computed above m = threading.Thread( target=self._monitor_agent, - args=(proc, run_id, agent, repo, agent_branch), + args=(proc, run_id, agent, repo, agent_branch, output_path), daemon=True, ) m.start() logger.info(f"Agent '{agent}' launched, pid={proc.pid}, run_id={run_id}") + notify_agent_started(run_id, agent, task_id) return run_id def _watchdog(self, pid: int, run_id: int, timeout: int = None): @@ -171,9 +182,60 @@ class AgentLauncher: except ProcessLookupError: pass # Already finished - def _monitor_agent(self, proc, run_id, agent, repo, branch): + def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None): """Wait for agent to finish, commit+push results, update DB.""" + import time as _time + _start_ts = _time.time() + + # Stream stdout PIPE to log file with startup timeout + if output_path and proc.stdout: + try: + with open(output_path, "w") as log_file: + _got_real_output = False + _startup_timeout = 120 # seconds + + import select + while True: + # Check if process has finished + if proc.poll() is not None: + # Read remaining output + remaining = proc.stdout.read() + if remaining: + log_file.write(remaining.decode("utf-8", errors="replace")) + log_file.flush() + break + + # Use select to wait for output with timeout + ready, _, _ = select.select([proc.stdout], [], [], 10) + if ready: + line = proc.stdout.readline() + if not line: + break # EOF + decoded = line.decode("utf-8", errors="replace") + log_file.write(decoded) + log_file.flush() + + # Check if this is real output (not just git checkout noise) + stripped = decoded.strip() + if stripped and not stripped.startswith(("M\t", "Your branch", "Already on", "Switched to")): + _got_real_output = True + else: + # Timeout on select - check startup timeout + if not _got_real_output and (_time.time() - _start_ts) > _startup_timeout: + logger.error(f"Agent run_id={run_id} ({agent}): no output after {_startup_timeout}s, killing") + try: + proc.kill() + except Exception: + pass + log_file.write(f"\n[TIMEOUT] No output after {_startup_timeout}s - process killed\n") + break + + proc.stdout.close() + except Exception as e: + logger.error(f"Agent run_id={run_id}: log streaming error: {e}") + exit_code = proc.wait() + _duration_s = int(_time.time() - _start_ts) logger.info(f"Agent run_id={run_id} ({agent}) finished with exit_code={exit_code}") # Update DB @@ -183,8 +245,14 @@ class AgentLauncher: (exit_code, run_id), ) conn.commit() + + # Get task_id for notification + _row = conn.execute("SELECT task_id FROM agent_runs WHERE id=?", (run_id,)).fetchone() + _task_id = _row[0] if _row else None conn.close() + notify_agent_finished(run_id, agent, exit_code, task_id=_task_id, duration_s=_duration_s) + # Commit and push any changes repo_path = os.path.join(settings.repos_dir, repo) try: @@ -239,6 +307,9 @@ class AgentLauncher: ) if push_result.returncode == 0: logger.info(f"Agent run_id={run_id}: committed and pushed to {branch}") + # Auto-create PR after developer pushes + if agent == "developer": + self._ensure_pr(repo, branch, run_id) else: logger.error(f"Agent run_id={run_id}: push failed: {push_result.stderr}") else: @@ -248,6 +319,41 @@ class AgentLauncher: except Exception as e: logger.error(f"Agent run_id={run_id}: post-run git failed: {e}") + # Handle deployer failure (smoke/healthcheck failed) — Task 7 + if exit_code != 0 and agent == "deployer": + conn = get_db() + task_row = conn.execute( + "SELECT id, work_item_id FROM tasks WHERE repo=? AND branch=?", + (repo, branch), + ).fetchone() + conn.close() + if task_row: + _tid, _wid = task_row + update_task_stage(_tid, "development") + notify_stage_change(_tid, "deploy", "development") + plane_notify_stage(_wid, "deploy", "development") + from ..plane_sync import set_issue_blocked + set_issue_blocked(_wid) + plane_add_comment( + _wid, + "\u274c Deploy FAILED (smoke/healthcheck). Rolled back. Developer \u043d\u0443\u0436\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430." + ) + from ..notifications import send_telegram + send_telegram(f"\U0001f6a8 {_wid}: Deploy failed! Rolled back. Needs fix.") + + # Notify on startup timeout (exit_code from kill = -9 or 137) + if exit_code != 0 and exit_code not in (None,): + conn = get_db() + task_row = conn.execute( + "SELECT id, work_item_id FROM tasks WHERE repo=? AND branch=?", + (repo, branch), + ).fetchone() + conn.close() + if task_row and agent != "deployer": # deployer handled above + _tid, _wid = task_row + from ..notifications import send_telegram + send_telegram(f"\u26a0\ufe0f {_wid}: Agent {agent} failed (exit_code={exit_code}). Check logs: /app/data/runs/{run_id}.log") + # Auto-advance stage if agent finished successfully and QG passes if exit_code == 0: self._try_advance_stage(run_id, agent, repo, branch) @@ -274,19 +380,50 @@ class AgentLauncher: # Run QG check if defined if qg_name and qg_name in QG_CHECKS: check_fn = QG_CHECKS[qg_name] - if qg_name in ("check_review_approved", "check_analysis_approved"): + if qg_name in ("check_analysis_approved",): # Requires human approval - post request comment if analyst just finished if agent == "analyst" and qg_name == "check_analysis_approved" and work_item_id: files_check = QG_CHECKS.get("check_analysis_complete") if files_check: files_ok, _ = files_check(repo, work_item_id) if files_ok: + # Full artifacts ready -> In Review + from ..plane_sync import set_issue_in_review + set_issue_in_review(work_item_id) plane_add_comment( work_item_id, - "📋 BRD/ТЗ/AC/TestPlan готовы. " - "Прошу review и реакцию :approved: для продвижения в Architecture." + "\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. " + "\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: \u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture." ) + notify_approve_requested(task_id) logger.info(f"Task {task_id}: analyst finished, requested :approved: in Plane") + else: + # Check if questions file exists + import os as _os + questions_path = _os.path.join( + settings.repos_dir, repo, + f"docs/work-items/{work_item_id}/01-questions.md" + ) + if _os.path.isfile(questions_path): + # Analyst has questions -> Needs Input + from ..plane_sync import set_issue_needs_input + set_issue_needs_input(work_item_id) + with open(questions_path, "r") as qf: + questions_text = qf.read() + plane_add_comment( + work_item_id, + f"\u2753 Analyst \u043d\u0443\u0436\u0434\u0430\u0435\u0442\u0441\u044f \u0432 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0438:\n\n{questions_text}" + ) + from ..notifications import send_telegram + send_telegram( + f"\u2753 {work_item_id}: Analyst \u0437\u0430\u0434\u0430\u0451\u0442 \u0432\u043e\u043f\u0440\u043e\u0441\u044b. \u041e\u0442\u0432\u0435\u0442\u044c \u0432 Plane." + ) + else: + # No artifacts and no questions + plane_add_comment( + work_item_id, + "\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433." + ) return elif qg_name == "check_ci_green": passed, reason = check_fn(repo, branch) @@ -297,6 +434,91 @@ class AgentLauncher: if not passed: logger.info(f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}") + # If reviewer says REQUEST_CHANGES, rollback to development + if agent == "reviewer" and "REQUEST_CHANGES" in reason: + update_task_stage(task_id, "development") + notify_stage_change(task_id, current_stage, "development") + plane_notify_stage(work_item_id, current_stage, "development") + # Count retries + conn2 = get_db() + retry_count = conn2.execute( + "SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'", + (task_id,) + ).fetchone()[0] + conn2.close() + if retry_count < 3: + task_desc = ( + f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n" + f"Stage: development\nNote: REQUEST_CHANGES from reviewer " + f"(attempt {retry_count+1}/3). Fix findings in " + f"docs/work-items/{work_item_id}/12-review.md" + ) + new_run = self.launch("developer", repo, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: reviewer REQUEST_CHANGES, relaunched developer (run_id={new_run})") + else: + from ..notifications import send_telegram + send_telegram(f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. Manual intervention needed.") + logger.error(f"Task {task_id}: max retries reached") + + # Task 6: Tester FAIL -> rollback to development + if agent == "tester" and qg_name == "check_tests_passed" and not passed: + update_task_stage(task_id, "development") + notify_stage_change(task_id, current_stage, "development") + plane_notify_stage(work_item_id, current_stage, "development") + from ..plane_sync import set_issue_in_progress + set_issue_in_progress(work_item_id) + plane_add_comment( + work_item_id, + f"\u274c \u0422\u0435\u0441\u0442\u044b \u043d\u0435 \u043f\u0440\u043e\u0448\u043b\u0438: {reason}. Developer \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430." + ) + conn2 = get_db() + retry_count = conn2.execute( + "SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'", + (task_id,) + ).fetchone()[0] + conn2.close() + if retry_count < 3: + task_desc = ( + f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n" + f"Stage: development\nNote: Tests FAILED. " + f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md" + ) + new_run = self.launch("developer", repo, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: tester FAIL, relaunched developer (run_id={new_run})") + else: + from ..notifications import send_telegram + from ..plane_sync import set_issue_blocked + set_issue_blocked(work_item_id) + send_telegram(f"\U0001f6a8 {work_item_id}: Tests still failing after 3 developer retries. Manual intervention needed.") + + # Task 8: Architect conflict -> rollback to analysis + if agent == "architect" and qg_name == "check_architecture_done" and not passed: + import os as _os + conflict_path = _os.path.join( + settings.repos_dir, repo, + f"docs/work-items/{work_item_id}/10-conflict.md" + ) + if _os.path.isfile(conflict_path): + update_task_stage(task_id, "analysis") + notify_stage_change(task_id, current_stage, "analysis") + plane_notify_stage(work_item_id, current_stage, "analysis") + from ..plane_sync import set_issue_in_progress + set_issue_in_progress(work_item_id) + with open(conflict_path, "r") as cf: + conflict_text = cf.read()[:500] + plane_add_comment( + work_item_id, + f"\u26a0\ufe0f Architect \u043d\u0430\u0448\u0451\u043b \u043a\u043e\u043d\u0444\u043b\u0438\u043a\u0442 \u0441 \u0422\u0417. \u0412\u043e\u0437\u0432\u0440\u0430\u0442 \u0432 Analysis.\n\n{conflict_text}" + ) + task_desc = ( + f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n" + f"Stage: analysis\nNote: Architect conflict. Revise TRZ. " + f"See docs/work-items/{work_item_id}/10-conflict.md" + ) + new_run = self.launch("analyst", repo, task_desc, task_id=task_id) + logger.info(f"Task {task_id}: architect conflict, relaunched analyst") + return + return elif qg_name: return @@ -317,6 +539,79 @@ class AgentLauncher: except Exception as e: logger.error(f"Auto-advance failed for run_id={run_id}: {e}") + + def _ensure_pr(self, repo: str, branch: str, run_id: int): + import httpx + owner = settings.gitea_owner + headers = {"Authorization": f"token {settings.gitea_token}"} + base_url = f"{settings.gitea_url}/api/v1" + try: + resp = httpx.get( + f"{base_url}/repos/{owner}/{repo}/pulls", + params={"state": "open", "head": branch}, + headers=headers, timeout=10 + ) + resp.raise_for_status() + prs = resp.json() + if prs: + return prs[0]["number"] + parts = branch.split("/") + title = parts[-1] if parts else branch + resp = httpx.post( + f"{base_url}/repos/{owner}/{repo}/pulls", + json={"title": f"feat: {title}", "head": branch, "base": "main", + "body": f"Auto-created by orchestrator after developer run_id={run_id}"}, + headers=headers, timeout=10 + ) + resp.raise_for_status() + pr_number = resp.json()["number"] + logger.info(f"Created PR #{pr_number} for {branch}") + return pr_number + except Exception as e: + logger.error(f"Failed to create PR for {branch}: {e}") + return None + + def _auto_merge_pr(self, repo: str, branch: str, task_id: int, work_item_id: str): + import httpx + owner = settings.gitea_owner + headers = {"Authorization": f"token {settings.gitea_token}"} + base_url = f"{settings.gitea_url}/api/v1" + try: + resp = httpx.get( + f"{base_url}/repos/{owner}/{repo}/pulls", + params={"state": "open", "head": branch}, + headers=headers, timeout=10 + ) + resp.raise_for_status() + prs = resp.json() + if not prs: + pr_number = self._ensure_pr(repo, branch, 0) + if not pr_number: + return False + else: + pr_number = prs[0]["number"] + resp = httpx.post( + f"{base_url}/repos/{owner}/{repo}/pulls/{pr_number}/merge", + json={"Do": "merge"}, + headers=headers, timeout=30 + ) + if resp.status_code in (200, 204): + logger.info(f"PR #{pr_number} merged for {branch}") + update_task_stage(task_id, "done") + notify_stage_change(task_id, "deploy", "done") + plane_notify_stage(work_item_id, "deploy", "done") + from ..notifications import send_telegram + send_telegram(f"\u2705 {work_item_id}: PR #{pr_number} merged! deploy -> done. Task complete.") + return True + else: + logger.error(f"Merge failed for PR #{pr_number}: {resp.status_code} {resp.text}") + from ..notifications import send_telegram + send_telegram(f"\u26a0\ufe0f {work_item_id}: Auto-merge failed (HTTP {resp.status_code}). Manual merge needed.") + return False + except Exception as e: + logger.error(f"Auto-merge failed for {branch}: {e}") + return False + def _write_task_file(self, host_repo_path: str, task_file: str, content: str): """Write task file to host repo via docker run with stdin.""" full_path = os.path.join(host_repo_path, task_file) diff --git a/src/config.py b/src/config.py index ba9f40d..1d00d2c 100644 --- a/src/config.py +++ b/src/config.py @@ -24,6 +24,11 @@ class Settings(BaseSettings): # DB db_path: str = "/app/data/orchestrator.db" + + # Telegram notifications + telegram_bot_token: str = "" + telegram_chat_id: str = "" + class Config: env_prefix = "ORCH_" env_file = ".env" diff --git a/src/notifications.py b/src/notifications.py index 2b78fb6..ef885b3 100644 --- a/src/notifications.py +++ b/src/notifications.py @@ -1,28 +1,125 @@ """Notifications and logging for orchestrator events.""" import logging +import httpx logger = logging.getLogger("orchestrator") +# Lazy import to avoid circular imports at module level +_settings = None + + +def _get_settings(): + global _settings + if _settings is None: + from .config import settings + _settings = settings + return _settings + + +def send_telegram(text: str): + """Send notification to Telegram. Fire-and-forget, never raises.""" + s = _get_settings() + if not s.telegram_bot_token or not s.telegram_chat_id: + return + try: + url = f"https://api.telegram.org/bot{s.telegram_bot_token}/sendMessage" + httpx.post( + url, + json={ + "chat_id": s.telegram_chat_id, + "text": text, + "parse_mode": "HTML", + "disable_notification": False, + }, + timeout=5, + ) + except Exception: + pass # Never crash orchestrator due to notification failure + + +def _get_work_item_id(task_id: int) -> str: + """Get work_item_id from DB by task_id.""" + try: + from .db import get_db + conn = get_db() + row = conn.execute("SELECT work_item_id FROM tasks WHERE id=?", (task_id,)).fetchone() + conn.close() + return row[0] if row and row[0] else f"task-{task_id}" + except Exception: + return f"task-{task_id}" + def notify_stage_change(task_id: int, old_stage: str, new_stage: str, agent: str = None): - """Log stage transition.""" - msg = f"Task {task_id}: {old_stage} → {new_stage}" + """Log and notify stage transition.""" + work_item_id = _get_work_item_id(task_id) + msg = f"\U0001f504 {work_item_id}: {old_stage} \u2192 {new_stage}" if agent: - msg += f" (launching {agent})" + msg += f" (\u0437\u0430\u043f\u0443\u0449\u0435\u043d {agent})" logger.info(msg) + send_telegram(msg) + + +def notify_agent_started(run_id: int, agent: str, task_id: int): + """Notify agent launch.""" + work_item_id = _get_work_item_id(task_id) + msg = f"\U0001f680 {work_item_id}: {agent} \u0437\u0430\u043f\u0443\u0449\u0435\u043d (run_id={run_id})" + logger.info(msg) + send_telegram(msg) + + +def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int = None, duration_s: int = None): + """Notify agent completion.""" + work_item_id = _get_work_item_id(task_id) if task_id else "?" + if exit_code == 0: + dur = f" ({duration_s // 60} \u043c\u0438\u043d)" if duration_s else "" + msg = f"\u2705 {work_item_id}: {agent} \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b{dur}" + elif exit_code == -9: + msg = f"\u23f0 {work_item_id}: {agent} \u0443\u0431\u0438\u0442 \u043f\u043e \u0442\u0430\u0439\u043c\u0430\u0443\u0442\u0443 (30 \u043c\u0438\u043d)" + else: + msg = f"\u274c {work_item_id}: {agent} \u0443\u043f\u0430\u043b (exit_code={exit_code})" + logger.info(msg) + send_telegram(msg) + + +def notify_qg_result(task_id: int, check: str, passed: bool, reason: str = None): + """Notify QG check result.""" + work_item_id = _get_work_item_id(task_id) + if passed: + msg = f"\u2705 {work_item_id}: QG {check} \u2014 passed" + else: + msg = f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}" + logger.info(msg) + send_telegram(msg) def notify_qg_failure(task_id: int, stage: str, check: str, reason: str): - """Log QG check failure.""" - logger.warning(f"Task {task_id}: QG failed at stage '{stage}', check={check}: {reason}") + """Log and notify QG check failure.""" + work_item_id = _get_work_item_id(task_id) + msg = f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}" + logger.warning(msg) + send_telegram(msg) -def notify_agent_finished(run_id: int, agent: str, exit_code: int): - """Log agent completion.""" - logger.info(f"Agent run {run_id} ({agent}) finished with exit code {exit_code}") +def notify_approve_requested(task_id: int): + """Notify that analyst requests :approved:.""" + work_item_id = _get_work_item_id(task_id) + msg = f"\U0001f4cb {work_item_id}: BRD/\u0422\u0417/AC \u0433\u043e\u0442\u043e\u0432\u044b. \u0416\u0434\u0443 :approved: \u0432 Plane" + logger.info(msg) + send_telegram(msg) + + +def notify_done(task_id: int): + """Notify task completion.""" + work_item_id = _get_work_item_id(task_id) + msg = f"\U0001f389 {work_item_id}: \u0437\u0430\u0434\u0430\u0447\u0430 \u0437\u0430\u0432\u0435\u0440\u0448\u0435\u043d\u0430!" + logger.info(msg) + send_telegram(msg) def notify_error(task_id: int, error: str): - """Log error for a task.""" - logger.error(f"Task {task_id}: ERROR — {error}") + """Log and notify error for a task.""" + work_item_id = _get_work_item_id(task_id) if task_id else "system" + msg = f"\U0001f534 {work_item_id}: ERROR \u2014 {error}" + logger.error(msg) + send_telegram(msg) diff --git a/src/plane_sync.py b/src/plane_sync.py index 4d1e5dc..6c44510 100644 --- a/src/plane_sync.py +++ b/src/plane_sync.py @@ -11,16 +11,28 @@ PLANE_HEADERS = {"X-API-Key": settings.plane_api_token} WORKSPACE = settings.plane_workspace_slug PROJECT_ID = settings.plane_project_id or "7a79f0a9-5278-49cd-9007-9a338f238f9c" +# Plane state IDs +PLANE_STATES = { + "backlog": "113b24f6-cce8-4be9-9a22-a359b9cf0122", + "todo": "2c7d3df3-9eb9-419b-92b7-d7d560bcdd10", + "in_progress": "b873d9eb-993c-48cd-97ac-99a9b1623967", + "needs_input": "babf08a3-ff4d-41f3-a821-5491aa29a8ac", + "in_review": "38fb1f64-aa1e-48a3-92e0-0b109679046b", + "blocked": "6c4543f9-ac47-4ef7-ae0f-070020dc9920", + "done": "381a2833-3c4e-4be5-bd0f-be84cb946ad8", + "cancelled": "b1cae7f9-961d-4889-a179-f3acea697d17", +} + # Map orchestrator stages to Plane states STAGE_TO_STATE = { - "created": "2c7d3df3-9eb9-419b-92b7-d7d560bcdd10", # Todo - "analysis": "b873d9eb-993c-48cd-97ac-99a9b1623967", # In Progress - "architecture": "b873d9eb-993c-48cd-97ac-99a9b1623967", # In Progress - "development": "b873d9eb-993c-48cd-97ac-99a9b1623967", # In Progress - "review": "b873d9eb-993c-48cd-97ac-99a9b1623967", # In Progress - "testing": "b873d9eb-993c-48cd-97ac-99a9b1623967", # In Progress - "deploy": "b873d9eb-993c-48cd-97ac-99a9b1623967", # In Progress - "done": "381a2833-3c4e-4be5-bd0f-be84cb946ad8", # Done + "created": PLANE_STATES["todo"], + "analysis": PLANE_STATES["in_progress"], + "architecture": PLANE_STATES["in_progress"], + "development": PLANE_STATES["in_progress"], + "review": PLANE_STATES["in_progress"], + "testing": PLANE_STATES["in_progress"], + "deploy": PLANE_STATES["in_progress"], + "done": PLANE_STATES["done"], } @@ -108,13 +120,79 @@ def add_comment(work_item_id: str, text: str): logger.error(f"Failed to add comment to {work_item_id}: {e}") + +def set_issue_needs_input(work_item_id: str): + """Set issue to 'Needs Input' state — waiting for stakeholder response.""" + _set_issue_state_direct(work_item_id, PLANE_STATES["needs_input"]) + + +def set_issue_in_review(work_item_id: str): + """Set issue to 'In Review' state — waiting for :approved: or :rejected:.""" + _set_issue_state_direct(work_item_id, PLANE_STATES["in_review"]) + + +def set_issue_blocked(work_item_id: str): + """Set issue to 'Blocked' state — manual intervention needed.""" + _set_issue_state_direct(work_item_id, PLANE_STATES["blocked"]) + + +def set_issue_in_progress(work_item_id: str): + """Set issue to 'In Progress' state — agent working.""" + _set_issue_state_direct(work_item_id, PLANE_STATES["in_progress"]) + + +def _set_issue_state_direct(work_item_id: str, state_id: str): + """Set issue state directly by state_id.""" + issue_id = find_issue_id(work_item_id) + if not issue_id: + logger.warning(f"Issue not found in Plane for {work_item_id}") + return + url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{PROJECT_ID}/issues/{issue_id}/" + try: + resp = httpx.patch(url, headers=PLANE_HEADERS, json={"state": state_id}, timeout=10) + resp.raise_for_status() + logger.info(f"Plane: {work_item_id} state -> {state_id[:8]}...") + except Exception as e: + logger.error(f"Failed to update Plane state for {work_item_id}: {e}") + + def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent: str = None): - """Notify Plane about stage transition.""" + """Notify Plane about stage transition with links.""" update_issue_state(work_item_id, new_stage) msg = f"🔄 Stage: {old_stage} → {new_stage}" if agent: msg += f" (launching {agent})" + + # Add relevant links + gitea_base = "http://git.mva154.duckdns.org" + try: + from .db import get_db + conn = get_db() + row = conn.execute( + "SELECT branch, repo FROM tasks WHERE work_item_id=?", (work_item_id,) + ).fetchone() + conn.close() + if row: + branch, repo = row + msg += chr(10) + "📂 Branch: [" + branch + "](" + gitea_base + "/admin/" + repo + "/src/branch/" + branch + ")" + if new_stage in ("review", "testing", "deploy"): + import httpx as _httpx + from .config import settings + _headers = {"Authorization": f"token {settings.gitea_token}"} + _resp = _httpx.get( + f"{settings.gitea_url}/api/v1/repos/{settings.gitea_owner}/{repo}/pulls", + params={"state": "open", "head": branch}, + headers=_headers, timeout=5 + ) + if _resp.status_code == 200: + _prs = _resp.json() + if _prs: + pr_num = _prs[0]["number"] + msg += chr(10) + "🔗 PR: [#" + str(pr_num) + "](" + gitea_base + "/admin/" + repo + "/pulls/" + str(pr_num) + ")" + except Exception: + pass + add_comment(work_item_id, msg) diff --git a/src/qg/checks.py b/src/qg/checks.py index 5ece365..e2a6b0a 100644 --- a/src/qg/checks.py +++ b/src/qg/checks.py @@ -185,6 +185,35 @@ def check_analysis_approved(repo: str, work_item_id: str) -> tuple[bool, str]: return True, f"Files present; Plane API check skipped ({e})" + + +def check_reviewer_verdict(repo: str, work_item_id: str) -> tuple[bool, str]: + """ + Check reviewer agent verdict from 12-review.md. + Returns (True, reason) if APPROVED, (False, reason) if REQUEST_CHANGES or missing. + """ + repo_path = os.path.join(settings.repos_dir, repo) + review_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/12-review.md") + + if not os.path.isfile(review_path): + return False, "Review report not found (12-review.md)" + + try: + with open(review_path, "r") as f: + content = f.read(5000) + + content_upper = content.upper() + if "REQUEST_CHANGES" in content_upper: + return False, "Reviewer verdict: REQUEST_CHANGES" + if "APPROVED" in content_upper: + return True, "Reviewer verdict: APPROVED" + if "LGTM" in content_upper or "SHIP IT" in content_upper: + return True, "Reviewer verdict: LGTM" + return False, "Review exists but no clear APPROVED/REQUEST_CHANGES verdict" + except OSError as e: + return False, f"Error reading review: {e}" + + # Registry for dynamic lookup by name QG_CHECKS = { "check_analysis_approved": check_analysis_approved, @@ -193,4 +222,5 @@ QG_CHECKS = { "check_ci_green": check_ci_green, "check_review_approved": check_review_approved, "check_tests_passed": check_tests_passed, + "check_reviewer_verdict": check_reviewer_verdict, } diff --git a/src/stages.py b/src/stages.py index d3c8383..a27cd5e 100644 --- a/src/stages.py +++ b/src/stages.py @@ -14,8 +14,8 @@ STAGE_TRANSITIONS = { "analysis": {"next": "architecture", "agent": "architect", "qg": "check_analysis_approved"}, "architecture": {"next": "development", "agent": "developer", "qg": "check_architecture_done"}, "development": {"next": "review", "agent": "reviewer", "qg": "check_ci_green"}, - "review": {"next": "testing", "agent": "tester", "qg": "check_review_approved"}, - "testing": {"next": "deploy", "agent": None, "qg": "check_tests_passed"}, + "review": {"next": "testing", "agent": "tester", "qg": "check_reviewer_verdict"}, + "testing": {"next": "deploy", "agent": "deployer", "qg": "check_tests_passed"}, "deploy": {"next": "done", "agent": None, "qg": None}, "done": {"next": None, "agent": None, "qg": None}, }