chore: save WIP changes before audit fixes
- notifications: Telegram integration, richer stage/agent/QG notifications - plane_sync: explicit Plane state IDs, needs_input/in_review/blocked helpers, links in comments - launcher: deployer stage, model flag (opus), PR auto-create, REQUEST_CHANGES/tester/architect rollback+retry logic, partial check_reviewer_verdict path - qg/checks: add check_reviewer_verdict (substring-based, will be hardened in S-5) - stages: review->check_reviewer_verdict, testing->deployer agent - config: telegram_bot_token/chat_id settings
This commit is contained in:
@@ -7,7 +7,7 @@ from ..config import settings
|
||||
from ..db import get_db, get_task_by_repo_branch, update_task_stage
|
||||
from ..stages import get_next_stage, get_qg_for_stage, get_agent_for_stage
|
||||
from ..qg.checks import QG_CHECKS
|
||||
from ..notifications import notify_stage_change, notify_qg_failure
|
||||
from ..notifications import notify_stage_change, notify_qg_failure, notify_agent_started, notify_agent_finished, notify_approve_requested
|
||||
from ..plane_sync import notify_stage_change as plane_notify_stage, add_comment as plane_add_comment
|
||||
|
||||
logger = logging.getLogger("orchestrator.launcher")
|
||||
@@ -26,6 +26,7 @@ class AgentLauncher:
|
||||
"system_prompt": ".openclaw/agents/architect.md",
|
||||
"task_file": ".task-arch.md",
|
||||
"allowed_tools": "Read,Write,Edit,Bash",
|
||||
"model": "opus",
|
||||
},
|
||||
"developer": {
|
||||
"system_prompt": ".openclaw/agents/developer.md",
|
||||
@@ -36,12 +37,18 @@ class AgentLauncher:
|
||||
"system_prompt": ".openclaw/agents/reviewer.md",
|
||||
"task_file": ".task-review.md",
|
||||
"allowed_tools": "Read,Write,Edit,Bash",
|
||||
"model": "opus",
|
||||
},
|
||||
"tester": {
|
||||
"system_prompt": ".openclaw/agents/tester.md",
|
||||
"task_file": ".task-test.md",
|
||||
"allowed_tools": "Read,Write,Edit,Bash",
|
||||
},
|
||||
"deployer": {
|
||||
"task_file": ".task-deploy.md",
|
||||
"system_prompt": ".openclaw/agents/deployer.md",
|
||||
"allowed_tools": "Read,Write,Edit,Bash",
|
||||
},
|
||||
}
|
||||
|
||||
CLAUDE_BIN = "/opt/claude-code/bin/claude.exe"
|
||||
@@ -98,9 +105,13 @@ class AgentLauncher:
|
||||
_br_row = get_db().execute("SELECT branch FROM tasks WHERE id=?", (task_id,)).fetchone() if task_id else None
|
||||
agent_branch = _br_row[0] if _br_row else "main"
|
||||
|
||||
model = config.get("model", "")
|
||||
model_flag = f"--model {model} " if model else ""
|
||||
|
||||
cmd = (
|
||||
f'cd {local_repo_path} && git fetch origin 2>/dev/null; git checkout {agent_branch} 2>/dev/null || git checkout -b {agent_branch} origin/{agent_branch} 2>/dev/null; '
|
||||
f'{self.CLAUDE_BIN} --print '
|
||||
f'{model_flag}'
|
||||
f'"$(cat {task_file})" '
|
||||
f'--system-prompt "$(cat {system_prompt})" '
|
||||
f'--allowedTools {allowed_tools}'
|
||||
@@ -109,12 +120,11 @@ class AgentLauncher:
|
||||
logger.info(f"Launching agent '{agent}' for repo '{repo}', run_id={run_id}")
|
||||
|
||||
# Launch as background process
|
||||
with open(output_path, "w") as log_file:
|
||||
proc = subprocess.Popen(
|
||||
["bash", "-c", cmd],
|
||||
stdout=log_file,
|
||||
stderr=subprocess.STDOUT,
|
||||
env={
|
||||
proc = subprocess.Popen(
|
||||
["bash", "-c", cmd],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
env={
|
||||
**os.environ,
|
||||
"HOME": "/home/slin",
|
||||
"GIT_AUTHOR_NAME": "claude-bot",
|
||||
@@ -144,12 +154,13 @@ class AgentLauncher:
|
||||
# agent_branch already computed above
|
||||
m = threading.Thread(
|
||||
target=self._monitor_agent,
|
||||
args=(proc, run_id, agent, repo, agent_branch),
|
||||
args=(proc, run_id, agent, repo, agent_branch, output_path),
|
||||
daemon=True,
|
||||
)
|
||||
m.start()
|
||||
|
||||
logger.info(f"Agent '{agent}' launched, pid={proc.pid}, run_id={run_id}")
|
||||
notify_agent_started(run_id, agent, task_id)
|
||||
return run_id
|
||||
|
||||
def _watchdog(self, pid: int, run_id: int, timeout: int = None):
|
||||
@@ -171,9 +182,60 @@ class AgentLauncher:
|
||||
except ProcessLookupError:
|
||||
pass # Already finished
|
||||
|
||||
def _monitor_agent(self, proc, run_id, agent, repo, branch):
|
||||
def _monitor_agent(self, proc, run_id, agent, repo, branch, output_path=None):
|
||||
"""Wait for agent to finish, commit+push results, update DB."""
|
||||
import time as _time
|
||||
_start_ts = _time.time()
|
||||
|
||||
# Stream stdout PIPE to log file with startup timeout
|
||||
if output_path and proc.stdout:
|
||||
try:
|
||||
with open(output_path, "w") as log_file:
|
||||
_got_real_output = False
|
||||
_startup_timeout = 120 # seconds
|
||||
|
||||
import select
|
||||
while True:
|
||||
# Check if process has finished
|
||||
if proc.poll() is not None:
|
||||
# Read remaining output
|
||||
remaining = proc.stdout.read()
|
||||
if remaining:
|
||||
log_file.write(remaining.decode("utf-8", errors="replace"))
|
||||
log_file.flush()
|
||||
break
|
||||
|
||||
# Use select to wait for output with timeout
|
||||
ready, _, _ = select.select([proc.stdout], [], [], 10)
|
||||
if ready:
|
||||
line = proc.stdout.readline()
|
||||
if not line:
|
||||
break # EOF
|
||||
decoded = line.decode("utf-8", errors="replace")
|
||||
log_file.write(decoded)
|
||||
log_file.flush()
|
||||
|
||||
# Check if this is real output (not just git checkout noise)
|
||||
stripped = decoded.strip()
|
||||
if stripped and not stripped.startswith(("M\t", "Your branch", "Already on", "Switched to")):
|
||||
_got_real_output = True
|
||||
else:
|
||||
# Timeout on select - check startup timeout
|
||||
if not _got_real_output and (_time.time() - _start_ts) > _startup_timeout:
|
||||
logger.error(f"Agent run_id={run_id} ({agent}): no output after {_startup_timeout}s, killing")
|
||||
try:
|
||||
proc.kill()
|
||||
except Exception:
|
||||
pass
|
||||
log_file.write(f"\n[TIMEOUT] No output after {_startup_timeout}s - process killed\n")
|
||||
break
|
||||
|
||||
proc.stdout.close()
|
||||
except Exception as e:
|
||||
logger.error(f"Agent run_id={run_id}: log streaming error: {e}")
|
||||
|
||||
exit_code = proc.wait()
|
||||
_duration_s = int(_time.time() - _start_ts)
|
||||
logger.info(f"Agent run_id={run_id} ({agent}) finished with exit_code={exit_code}")
|
||||
|
||||
# Update DB
|
||||
@@ -183,8 +245,14 @@ class AgentLauncher:
|
||||
(exit_code, run_id),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
# Get task_id for notification
|
||||
_row = conn.execute("SELECT task_id FROM agent_runs WHERE id=?", (run_id,)).fetchone()
|
||||
_task_id = _row[0] if _row else None
|
||||
conn.close()
|
||||
|
||||
notify_agent_finished(run_id, agent, exit_code, task_id=_task_id, duration_s=_duration_s)
|
||||
|
||||
# Commit and push any changes
|
||||
repo_path = os.path.join(settings.repos_dir, repo)
|
||||
try:
|
||||
@@ -239,6 +307,9 @@ class AgentLauncher:
|
||||
)
|
||||
if push_result.returncode == 0:
|
||||
logger.info(f"Agent run_id={run_id}: committed and pushed to {branch}")
|
||||
# Auto-create PR after developer pushes
|
||||
if agent == "developer":
|
||||
self._ensure_pr(repo, branch, run_id)
|
||||
else:
|
||||
logger.error(f"Agent run_id={run_id}: push failed: {push_result.stderr}")
|
||||
else:
|
||||
@@ -248,6 +319,41 @@ class AgentLauncher:
|
||||
except Exception as e:
|
||||
logger.error(f"Agent run_id={run_id}: post-run git failed: {e}")
|
||||
|
||||
# Handle deployer failure (smoke/healthcheck failed) — Task 7
|
||||
if exit_code != 0 and agent == "deployer":
|
||||
conn = get_db()
|
||||
task_row = conn.execute(
|
||||
"SELECT id, work_item_id FROM tasks WHERE repo=? AND branch=?",
|
||||
(repo, branch),
|
||||
).fetchone()
|
||||
conn.close()
|
||||
if task_row:
|
||||
_tid, _wid = task_row
|
||||
update_task_stage(_tid, "development")
|
||||
notify_stage_change(_tid, "deploy", "development")
|
||||
plane_notify_stage(_wid, "deploy", "development")
|
||||
from ..plane_sync import set_issue_blocked
|
||||
set_issue_blocked(_wid)
|
||||
plane_add_comment(
|
||||
_wid,
|
||||
"\u274c Deploy FAILED (smoke/healthcheck). Rolled back. Developer \u043d\u0443\u0436\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430."
|
||||
)
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(f"\U0001f6a8 {_wid}: Deploy failed! Rolled back. Needs fix.")
|
||||
|
||||
# Notify on startup timeout (exit_code from kill = -9 or 137)
|
||||
if exit_code != 0 and exit_code not in (None,):
|
||||
conn = get_db()
|
||||
task_row = conn.execute(
|
||||
"SELECT id, work_item_id FROM tasks WHERE repo=? AND branch=?",
|
||||
(repo, branch),
|
||||
).fetchone()
|
||||
conn.close()
|
||||
if task_row and agent != "deployer": # deployer handled above
|
||||
_tid, _wid = task_row
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(f"\u26a0\ufe0f {_wid}: Agent {agent} failed (exit_code={exit_code}). Check logs: /app/data/runs/{run_id}.log")
|
||||
|
||||
# Auto-advance stage if agent finished successfully and QG passes
|
||||
if exit_code == 0:
|
||||
self._try_advance_stage(run_id, agent, repo, branch)
|
||||
@@ -274,19 +380,50 @@ class AgentLauncher:
|
||||
# Run QG check if defined
|
||||
if qg_name and qg_name in QG_CHECKS:
|
||||
check_fn = QG_CHECKS[qg_name]
|
||||
if qg_name in ("check_review_approved", "check_analysis_approved"):
|
||||
if qg_name in ("check_analysis_approved",):
|
||||
# Requires human approval - post request comment if analyst just finished
|
||||
if agent == "analyst" and qg_name == "check_analysis_approved" and work_item_id:
|
||||
files_check = QG_CHECKS.get("check_analysis_complete")
|
||||
if files_check:
|
||||
files_ok, _ = files_check(repo, work_item_id)
|
||||
if files_ok:
|
||||
# Full artifacts ready -> In Review
|
||||
from ..plane_sync import set_issue_in_review
|
||||
set_issue_in_review(work_item_id)
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
"📋 BRD/ТЗ/AC/TestPlan готовы. "
|
||||
"Прошу review и реакцию :approved: для продвижения в Architecture."
|
||||
"\U0001f4cb BRD/\u0422\u0417/AC/TestPlan \u0433\u043e\u0442\u043e\u0432\u044b. "
|
||||
"\u041f\u0440\u043e\u0448\u0443 review \u0438 \u0440\u0435\u0430\u043a\u0446\u0438\u044e :approved: \u0434\u043b\u044f \u043f\u0440\u043e\u0434\u0432\u0438\u0436\u0435\u043d\u0438\u044f \u0432 Architecture."
|
||||
)
|
||||
notify_approve_requested(task_id)
|
||||
logger.info(f"Task {task_id}: analyst finished, requested :approved: in Plane")
|
||||
else:
|
||||
# Check if questions file exists
|
||||
import os as _os
|
||||
questions_path = _os.path.join(
|
||||
settings.repos_dir, repo,
|
||||
f"docs/work-items/{work_item_id}/01-questions.md"
|
||||
)
|
||||
if _os.path.isfile(questions_path):
|
||||
# Analyst has questions -> Needs Input
|
||||
from ..plane_sync import set_issue_needs_input
|
||||
set_issue_needs_input(work_item_id)
|
||||
with open(questions_path, "r") as qf:
|
||||
questions_text = qf.read()
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"\u2753 Analyst \u043d\u0443\u0436\u0434\u0430\u0435\u0442\u0441\u044f \u0432 \u0443\u0442\u043e\u0447\u043d\u0435\u043d\u0438\u0438:\n\n{questions_text}"
|
||||
)
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(
|
||||
f"\u2753 {work_item_id}: Analyst \u0437\u0430\u0434\u0430\u0451\u0442 \u0432\u043e\u043f\u0440\u043e\u0441\u044b. \u041e\u0442\u0432\u0435\u0442\u044c \u0432 Plane."
|
||||
)
|
||||
else:
|
||||
# No artifacts and no questions
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
"\u26a0\ufe0f Analyst \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b\u0441\u044f \u0431\u0435\u0437 \u0430\u0440\u0442\u0435\u0444\u0430\u043a\u0442\u043e\u0432 \u0438 \u0431\u0435\u0437 \u0432\u043e\u043f\u0440\u043e\u0441\u043e\u0432. \u041f\u0440\u043e\u0432\u0435\u0440\u044c\u0442\u0435 \u043b\u043e\u0433."
|
||||
)
|
||||
return
|
||||
elif qg_name == "check_ci_green":
|
||||
passed, reason = check_fn(repo, branch)
|
||||
@@ -297,6 +434,91 @@ class AgentLauncher:
|
||||
|
||||
if not passed:
|
||||
logger.info(f"Task {task_id}: QG '{qg_name}' not passed after {agent}: {reason}")
|
||||
# If reviewer says REQUEST_CHANGES, rollback to development
|
||||
if agent == "reviewer" and "REQUEST_CHANGES" in reason:
|
||||
update_task_stage(task_id, "development")
|
||||
notify_stage_change(task_id, current_stage, "development")
|
||||
plane_notify_stage(work_item_id, current_stage, "development")
|
||||
# Count retries
|
||||
conn2 = get_db()
|
||||
retry_count = conn2.execute(
|
||||
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
|
||||
(task_id,)
|
||||
).fetchone()[0]
|
||||
conn2.close()
|
||||
if retry_count < 3:
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: development\nNote: REQUEST_CHANGES from reviewer "
|
||||
f"(attempt {retry_count+1}/3). Fix findings in "
|
||||
f"docs/work-items/{work_item_id}/12-review.md"
|
||||
)
|
||||
new_run = self.launch("developer", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: reviewer REQUEST_CHANGES, relaunched developer (run_id={new_run})")
|
||||
else:
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(f"\u26a0\ufe0f {work_item_id}: Max developer retries (3) reached. Manual intervention needed.")
|
||||
logger.error(f"Task {task_id}: max retries reached")
|
||||
|
||||
# Task 6: Tester FAIL -> rollback to development
|
||||
if agent == "tester" and qg_name == "check_tests_passed" and not passed:
|
||||
update_task_stage(task_id, "development")
|
||||
notify_stage_change(task_id, current_stage, "development")
|
||||
plane_notify_stage(work_item_id, current_stage, "development")
|
||||
from ..plane_sync import set_issue_in_progress
|
||||
set_issue_in_progress(work_item_id)
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"\u274c \u0422\u0435\u0441\u0442\u044b \u043d\u0435 \u043f\u0440\u043e\u0448\u043b\u0438: {reason}. Developer \u043f\u0435\u0440\u0435\u0437\u0430\u043f\u0443\u0449\u0435\u043d \u0434\u043b\u044f \u0444\u0438\u043a\u0441\u0430."
|
||||
)
|
||||
conn2 = get_db()
|
||||
retry_count = conn2.execute(
|
||||
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
|
||||
(task_id,)
|
||||
).fetchone()[0]
|
||||
conn2.close()
|
||||
if retry_count < 3:
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: development\nNote: Tests FAILED. "
|
||||
f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
|
||||
)
|
||||
new_run = self.launch("developer", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: tester FAIL, relaunched developer (run_id={new_run})")
|
||||
else:
|
||||
from ..notifications import send_telegram
|
||||
from ..plane_sync import set_issue_blocked
|
||||
set_issue_blocked(work_item_id)
|
||||
send_telegram(f"\U0001f6a8 {work_item_id}: Tests still failing after 3 developer retries. Manual intervention needed.")
|
||||
|
||||
# Task 8: Architect conflict -> rollback to analysis
|
||||
if agent == "architect" and qg_name == "check_architecture_done" and not passed:
|
||||
import os as _os
|
||||
conflict_path = _os.path.join(
|
||||
settings.repos_dir, repo,
|
||||
f"docs/work-items/{work_item_id}/10-conflict.md"
|
||||
)
|
||||
if _os.path.isfile(conflict_path):
|
||||
update_task_stage(task_id, "analysis")
|
||||
notify_stage_change(task_id, current_stage, "analysis")
|
||||
plane_notify_stage(work_item_id, current_stage, "analysis")
|
||||
from ..plane_sync import set_issue_in_progress
|
||||
set_issue_in_progress(work_item_id)
|
||||
with open(conflict_path, "r") as cf:
|
||||
conflict_text = cf.read()[:500]
|
||||
plane_add_comment(
|
||||
work_item_id,
|
||||
f"\u26a0\ufe0f Architect \u043d\u0430\u0448\u0451\u043b \u043a\u043e\u043d\u0444\u043b\u0438\u043a\u0442 \u0441 \u0422\u0417. \u0412\u043e\u0437\u0432\u0440\u0430\u0442 \u0432 Analysis.\n\n{conflict_text}"
|
||||
)
|
||||
task_desc = (
|
||||
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
|
||||
f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
|
||||
f"See docs/work-items/{work_item_id}/10-conflict.md"
|
||||
)
|
||||
new_run = self.launch("analyst", repo, task_desc, task_id=task_id)
|
||||
logger.info(f"Task {task_id}: architect conflict, relaunched analyst")
|
||||
return
|
||||
|
||||
return
|
||||
elif qg_name:
|
||||
return
|
||||
@@ -317,6 +539,79 @@ class AgentLauncher:
|
||||
except Exception as e:
|
||||
logger.error(f"Auto-advance failed for run_id={run_id}: {e}")
|
||||
|
||||
|
||||
def _ensure_pr(self, repo: str, branch: str, run_id: int):
|
||||
import httpx
|
||||
owner = settings.gitea_owner
|
||||
headers = {"Authorization": f"token {settings.gitea_token}"}
|
||||
base_url = f"{settings.gitea_url}/api/v1"
|
||||
try:
|
||||
resp = httpx.get(
|
||||
f"{base_url}/repos/{owner}/{repo}/pulls",
|
||||
params={"state": "open", "head": branch},
|
||||
headers=headers, timeout=10
|
||||
)
|
||||
resp.raise_for_status()
|
||||
prs = resp.json()
|
||||
if prs:
|
||||
return prs[0]["number"]
|
||||
parts = branch.split("/")
|
||||
title = parts[-1] if parts else branch
|
||||
resp = httpx.post(
|
||||
f"{base_url}/repos/{owner}/{repo}/pulls",
|
||||
json={"title": f"feat: {title}", "head": branch, "base": "main",
|
||||
"body": f"Auto-created by orchestrator after developer run_id={run_id}"},
|
||||
headers=headers, timeout=10
|
||||
)
|
||||
resp.raise_for_status()
|
||||
pr_number = resp.json()["number"]
|
||||
logger.info(f"Created PR #{pr_number} for {branch}")
|
||||
return pr_number
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create PR for {branch}: {e}")
|
||||
return None
|
||||
|
||||
def _auto_merge_pr(self, repo: str, branch: str, task_id: int, work_item_id: str):
|
||||
import httpx
|
||||
owner = settings.gitea_owner
|
||||
headers = {"Authorization": f"token {settings.gitea_token}"}
|
||||
base_url = f"{settings.gitea_url}/api/v1"
|
||||
try:
|
||||
resp = httpx.get(
|
||||
f"{base_url}/repos/{owner}/{repo}/pulls",
|
||||
params={"state": "open", "head": branch},
|
||||
headers=headers, timeout=10
|
||||
)
|
||||
resp.raise_for_status()
|
||||
prs = resp.json()
|
||||
if not prs:
|
||||
pr_number = self._ensure_pr(repo, branch, 0)
|
||||
if not pr_number:
|
||||
return False
|
||||
else:
|
||||
pr_number = prs[0]["number"]
|
||||
resp = httpx.post(
|
||||
f"{base_url}/repos/{owner}/{repo}/pulls/{pr_number}/merge",
|
||||
json={"Do": "merge"},
|
||||
headers=headers, timeout=30
|
||||
)
|
||||
if resp.status_code in (200, 204):
|
||||
logger.info(f"PR #{pr_number} merged for {branch}")
|
||||
update_task_stage(task_id, "done")
|
||||
notify_stage_change(task_id, "deploy", "done")
|
||||
plane_notify_stage(work_item_id, "deploy", "done")
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(f"\u2705 {work_item_id}: PR #{pr_number} merged! deploy -> done. Task complete.")
|
||||
return True
|
||||
else:
|
||||
logger.error(f"Merge failed for PR #{pr_number}: {resp.status_code} {resp.text}")
|
||||
from ..notifications import send_telegram
|
||||
send_telegram(f"\u26a0\ufe0f {work_item_id}: Auto-merge failed (HTTP {resp.status_code}). Manual merge needed.")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"Auto-merge failed for {branch}: {e}")
|
||||
return False
|
||||
|
||||
def _write_task_file(self, host_repo_path: str, task_file: str, content: str):
|
||||
"""Write task file to host repo via docker run with stdin."""
|
||||
full_path = os.path.join(host_repo_path, task_file)
|
||||
|
||||
@@ -24,6 +24,11 @@ class Settings(BaseSettings):
|
||||
# DB
|
||||
db_path: str = "/app/data/orchestrator.db"
|
||||
|
||||
|
||||
# Telegram notifications
|
||||
telegram_bot_token: str = ""
|
||||
telegram_chat_id: str = ""
|
||||
|
||||
class Config:
|
||||
env_prefix = "ORCH_"
|
||||
env_file = ".env"
|
||||
|
||||
@@ -1,28 +1,125 @@
|
||||
"""Notifications and logging for orchestrator events."""
|
||||
|
||||
import logging
|
||||
import httpx
|
||||
|
||||
logger = logging.getLogger("orchestrator")
|
||||
|
||||
# Lazy import to avoid circular imports at module level
|
||||
_settings = None
|
||||
|
||||
|
||||
def _get_settings():
|
||||
global _settings
|
||||
if _settings is None:
|
||||
from .config import settings
|
||||
_settings = settings
|
||||
return _settings
|
||||
|
||||
|
||||
def send_telegram(text: str):
|
||||
"""Send notification to Telegram. Fire-and-forget, never raises."""
|
||||
s = _get_settings()
|
||||
if not s.telegram_bot_token or not s.telegram_chat_id:
|
||||
return
|
||||
try:
|
||||
url = f"https://api.telegram.org/bot{s.telegram_bot_token}/sendMessage"
|
||||
httpx.post(
|
||||
url,
|
||||
json={
|
||||
"chat_id": s.telegram_chat_id,
|
||||
"text": text,
|
||||
"parse_mode": "HTML",
|
||||
"disable_notification": False,
|
||||
},
|
||||
timeout=5,
|
||||
)
|
||||
except Exception:
|
||||
pass # Never crash orchestrator due to notification failure
|
||||
|
||||
|
||||
def _get_work_item_id(task_id: int) -> str:
|
||||
"""Get work_item_id from DB by task_id."""
|
||||
try:
|
||||
from .db import get_db
|
||||
conn = get_db()
|
||||
row = conn.execute("SELECT work_item_id FROM tasks WHERE id=?", (task_id,)).fetchone()
|
||||
conn.close()
|
||||
return row[0] if row and row[0] else f"task-{task_id}"
|
||||
except Exception:
|
||||
return f"task-{task_id}"
|
||||
|
||||
|
||||
def notify_stage_change(task_id: int, old_stage: str, new_stage: str, agent: str = None):
|
||||
"""Log stage transition."""
|
||||
msg = f"Task {task_id}: {old_stage} → {new_stage}"
|
||||
"""Log and notify stage transition."""
|
||||
work_item_id = _get_work_item_id(task_id)
|
||||
msg = f"\U0001f504 {work_item_id}: {old_stage} \u2192 {new_stage}"
|
||||
if agent:
|
||||
msg += f" (launching {agent})"
|
||||
msg += f" (\u0437\u0430\u043f\u0443\u0449\u0435\u043d {agent})"
|
||||
logger.info(msg)
|
||||
send_telegram(msg)
|
||||
|
||||
|
||||
def notify_agent_started(run_id: int, agent: str, task_id: int):
|
||||
"""Notify agent launch."""
|
||||
work_item_id = _get_work_item_id(task_id)
|
||||
msg = f"\U0001f680 {work_item_id}: {agent} \u0437\u0430\u043f\u0443\u0449\u0435\u043d (run_id={run_id})"
|
||||
logger.info(msg)
|
||||
send_telegram(msg)
|
||||
|
||||
|
||||
def notify_agent_finished(run_id: int, agent: str, exit_code: int, task_id: int = None, duration_s: int = None):
|
||||
"""Notify agent completion."""
|
||||
work_item_id = _get_work_item_id(task_id) if task_id else "?"
|
||||
if exit_code == 0:
|
||||
dur = f" ({duration_s // 60} \u043c\u0438\u043d)" if duration_s else ""
|
||||
msg = f"\u2705 {work_item_id}: {agent} \u0437\u0430\u0432\u0435\u0440\u0448\u0438\u043b{dur}"
|
||||
elif exit_code == -9:
|
||||
msg = f"\u23f0 {work_item_id}: {agent} \u0443\u0431\u0438\u0442 \u043f\u043e \u0442\u0430\u0439\u043c\u0430\u0443\u0442\u0443 (30 \u043c\u0438\u043d)"
|
||||
else:
|
||||
msg = f"\u274c {work_item_id}: {agent} \u0443\u043f\u0430\u043b (exit_code={exit_code})"
|
||||
logger.info(msg)
|
||||
send_telegram(msg)
|
||||
|
||||
|
||||
def notify_qg_result(task_id: int, check: str, passed: bool, reason: str = None):
|
||||
"""Notify QG check result."""
|
||||
work_item_id = _get_work_item_id(task_id)
|
||||
if passed:
|
||||
msg = f"\u2705 {work_item_id}: QG {check} \u2014 passed"
|
||||
else:
|
||||
msg = f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}"
|
||||
logger.info(msg)
|
||||
send_telegram(msg)
|
||||
|
||||
|
||||
def notify_qg_failure(task_id: int, stage: str, check: str, reason: str):
|
||||
"""Log QG check failure."""
|
||||
logger.warning(f"Task {task_id}: QG failed at stage '{stage}', check={check}: {reason}")
|
||||
"""Log and notify QG check failure."""
|
||||
work_item_id = _get_work_item_id(task_id)
|
||||
msg = f"\u26a0\ufe0f {work_item_id}: QG {check} \u2014 failed: {reason}"
|
||||
logger.warning(msg)
|
||||
send_telegram(msg)
|
||||
|
||||
|
||||
def notify_agent_finished(run_id: int, agent: str, exit_code: int):
|
||||
"""Log agent completion."""
|
||||
logger.info(f"Agent run {run_id} ({agent}) finished with exit code {exit_code}")
|
||||
def notify_approve_requested(task_id: int):
|
||||
"""Notify that analyst requests :approved:."""
|
||||
work_item_id = _get_work_item_id(task_id)
|
||||
msg = f"\U0001f4cb {work_item_id}: BRD/\u0422\u0417/AC \u0433\u043e\u0442\u043e\u0432\u044b. \u0416\u0434\u0443 :approved: \u0432 Plane"
|
||||
logger.info(msg)
|
||||
send_telegram(msg)
|
||||
|
||||
|
||||
def notify_done(task_id: int):
|
||||
"""Notify task completion."""
|
||||
work_item_id = _get_work_item_id(task_id)
|
||||
msg = f"\U0001f389 {work_item_id}: \u0437\u0430\u0434\u0430\u0447\u0430 \u0437\u0430\u0432\u0435\u0440\u0448\u0435\u043d\u0430!"
|
||||
logger.info(msg)
|
||||
send_telegram(msg)
|
||||
|
||||
|
||||
def notify_error(task_id: int, error: str):
|
||||
"""Log error for a task."""
|
||||
logger.error(f"Task {task_id}: ERROR — {error}")
|
||||
"""Log and notify error for a task."""
|
||||
work_item_id = _get_work_item_id(task_id) if task_id else "system"
|
||||
msg = f"\U0001f534 {work_item_id}: ERROR \u2014 {error}"
|
||||
logger.error(msg)
|
||||
send_telegram(msg)
|
||||
|
||||
@@ -11,16 +11,28 @@ PLANE_HEADERS = {"X-API-Key": settings.plane_api_token}
|
||||
WORKSPACE = settings.plane_workspace_slug
|
||||
PROJECT_ID = settings.plane_project_id or "7a79f0a9-5278-49cd-9007-9a338f238f9c"
|
||||
|
||||
# Plane state IDs
|
||||
PLANE_STATES = {
|
||||
"backlog": "113b24f6-cce8-4be9-9a22-a359b9cf0122",
|
||||
"todo": "2c7d3df3-9eb9-419b-92b7-d7d560bcdd10",
|
||||
"in_progress": "b873d9eb-993c-48cd-97ac-99a9b1623967",
|
||||
"needs_input": "babf08a3-ff4d-41f3-a821-5491aa29a8ac",
|
||||
"in_review": "38fb1f64-aa1e-48a3-92e0-0b109679046b",
|
||||
"blocked": "6c4543f9-ac47-4ef7-ae0f-070020dc9920",
|
||||
"done": "381a2833-3c4e-4be5-bd0f-be84cb946ad8",
|
||||
"cancelled": "b1cae7f9-961d-4889-a179-f3acea697d17",
|
||||
}
|
||||
|
||||
# Map orchestrator stages to Plane states
|
||||
STAGE_TO_STATE = {
|
||||
"created": "2c7d3df3-9eb9-419b-92b7-d7d560bcdd10", # Todo
|
||||
"analysis": "b873d9eb-993c-48cd-97ac-99a9b1623967", # In Progress
|
||||
"architecture": "b873d9eb-993c-48cd-97ac-99a9b1623967", # In Progress
|
||||
"development": "b873d9eb-993c-48cd-97ac-99a9b1623967", # In Progress
|
||||
"review": "b873d9eb-993c-48cd-97ac-99a9b1623967", # In Progress
|
||||
"testing": "b873d9eb-993c-48cd-97ac-99a9b1623967", # In Progress
|
||||
"deploy": "b873d9eb-993c-48cd-97ac-99a9b1623967", # In Progress
|
||||
"done": "381a2833-3c4e-4be5-bd0f-be84cb946ad8", # Done
|
||||
"created": PLANE_STATES["todo"],
|
||||
"analysis": PLANE_STATES["in_progress"],
|
||||
"architecture": PLANE_STATES["in_progress"],
|
||||
"development": PLANE_STATES["in_progress"],
|
||||
"review": PLANE_STATES["in_progress"],
|
||||
"testing": PLANE_STATES["in_progress"],
|
||||
"deploy": PLANE_STATES["in_progress"],
|
||||
"done": PLANE_STATES["done"],
|
||||
}
|
||||
|
||||
|
||||
@@ -108,13 +120,79 @@ def add_comment(work_item_id: str, text: str):
|
||||
logger.error(f"Failed to add comment to {work_item_id}: {e}")
|
||||
|
||||
|
||||
|
||||
def set_issue_needs_input(work_item_id: str):
|
||||
"""Set issue to 'Needs Input' state — waiting for stakeholder response."""
|
||||
_set_issue_state_direct(work_item_id, PLANE_STATES["needs_input"])
|
||||
|
||||
|
||||
def set_issue_in_review(work_item_id: str):
|
||||
"""Set issue to 'In Review' state — waiting for :approved: or :rejected:."""
|
||||
_set_issue_state_direct(work_item_id, PLANE_STATES["in_review"])
|
||||
|
||||
|
||||
def set_issue_blocked(work_item_id: str):
|
||||
"""Set issue to 'Blocked' state — manual intervention needed."""
|
||||
_set_issue_state_direct(work_item_id, PLANE_STATES["blocked"])
|
||||
|
||||
|
||||
def set_issue_in_progress(work_item_id: str):
|
||||
"""Set issue to 'In Progress' state — agent working."""
|
||||
_set_issue_state_direct(work_item_id, PLANE_STATES["in_progress"])
|
||||
|
||||
|
||||
def _set_issue_state_direct(work_item_id: str, state_id: str):
|
||||
"""Set issue state directly by state_id."""
|
||||
issue_id = find_issue_id(work_item_id)
|
||||
if not issue_id:
|
||||
logger.warning(f"Issue not found in Plane for {work_item_id}")
|
||||
return
|
||||
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{PROJECT_ID}/issues/{issue_id}/"
|
||||
try:
|
||||
resp = httpx.patch(url, headers=PLANE_HEADERS, json={"state": state_id}, timeout=10)
|
||||
resp.raise_for_status()
|
||||
logger.info(f"Plane: {work_item_id} state -> {state_id[:8]}...")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to update Plane state for {work_item_id}: {e}")
|
||||
|
||||
|
||||
def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent: str = None):
|
||||
"""Notify Plane about stage transition."""
|
||||
"""Notify Plane about stage transition with links."""
|
||||
update_issue_state(work_item_id, new_stage)
|
||||
|
||||
msg = f"🔄 Stage: {old_stage} → {new_stage}"
|
||||
if agent:
|
||||
msg += f" (launching {agent})"
|
||||
|
||||
# Add relevant links
|
||||
gitea_base = "http://git.mva154.duckdns.org"
|
||||
try:
|
||||
from .db import get_db
|
||||
conn = get_db()
|
||||
row = conn.execute(
|
||||
"SELECT branch, repo FROM tasks WHERE work_item_id=?", (work_item_id,)
|
||||
).fetchone()
|
||||
conn.close()
|
||||
if row:
|
||||
branch, repo = row
|
||||
msg += chr(10) + "📂 Branch: [" + branch + "](" + gitea_base + "/admin/" + repo + "/src/branch/" + branch + ")"
|
||||
if new_stage in ("review", "testing", "deploy"):
|
||||
import httpx as _httpx
|
||||
from .config import settings
|
||||
_headers = {"Authorization": f"token {settings.gitea_token}"}
|
||||
_resp = _httpx.get(
|
||||
f"{settings.gitea_url}/api/v1/repos/{settings.gitea_owner}/{repo}/pulls",
|
||||
params={"state": "open", "head": branch},
|
||||
headers=_headers, timeout=5
|
||||
)
|
||||
if _resp.status_code == 200:
|
||||
_prs = _resp.json()
|
||||
if _prs:
|
||||
pr_num = _prs[0]["number"]
|
||||
msg += chr(10) + "🔗 PR: [#" + str(pr_num) + "](" + gitea_base + "/admin/" + repo + "/pulls/" + str(pr_num) + ")"
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
add_comment(work_item_id, msg)
|
||||
|
||||
|
||||
|
||||
@@ -185,6 +185,35 @@ def check_analysis_approved(repo: str, work_item_id: str) -> tuple[bool, str]:
|
||||
return True, f"Files present; Plane API check skipped ({e})"
|
||||
|
||||
|
||||
|
||||
|
||||
def check_reviewer_verdict(repo: str, work_item_id: str) -> tuple[bool, str]:
|
||||
"""
|
||||
Check reviewer agent verdict from 12-review.md.
|
||||
Returns (True, reason) if APPROVED, (False, reason) if REQUEST_CHANGES or missing.
|
||||
"""
|
||||
repo_path = os.path.join(settings.repos_dir, repo)
|
||||
review_path = os.path.join(repo_path, f"docs/work-items/{work_item_id}/12-review.md")
|
||||
|
||||
if not os.path.isfile(review_path):
|
||||
return False, "Review report not found (12-review.md)"
|
||||
|
||||
try:
|
||||
with open(review_path, "r") as f:
|
||||
content = f.read(5000)
|
||||
|
||||
content_upper = content.upper()
|
||||
if "REQUEST_CHANGES" in content_upper:
|
||||
return False, "Reviewer verdict: REQUEST_CHANGES"
|
||||
if "APPROVED" in content_upper:
|
||||
return True, "Reviewer verdict: APPROVED"
|
||||
if "LGTM" in content_upper or "SHIP IT" in content_upper:
|
||||
return True, "Reviewer verdict: LGTM"
|
||||
return False, "Review exists but no clear APPROVED/REQUEST_CHANGES verdict"
|
||||
except OSError as e:
|
||||
return False, f"Error reading review: {e}"
|
||||
|
||||
|
||||
# Registry for dynamic lookup by name
|
||||
QG_CHECKS = {
|
||||
"check_analysis_approved": check_analysis_approved,
|
||||
@@ -193,4 +222,5 @@ QG_CHECKS = {
|
||||
"check_ci_green": check_ci_green,
|
||||
"check_review_approved": check_review_approved,
|
||||
"check_tests_passed": check_tests_passed,
|
||||
"check_reviewer_verdict": check_reviewer_verdict,
|
||||
}
|
||||
|
||||
@@ -14,8 +14,8 @@ STAGE_TRANSITIONS = {
|
||||
"analysis": {"next": "architecture", "agent": "architect", "qg": "check_analysis_approved"},
|
||||
"architecture": {"next": "development", "agent": "developer", "qg": "check_architecture_done"},
|
||||
"development": {"next": "review", "agent": "reviewer", "qg": "check_ci_green"},
|
||||
"review": {"next": "testing", "agent": "tester", "qg": "check_review_approved"},
|
||||
"testing": {"next": "deploy", "agent": None, "qg": "check_tests_passed"},
|
||||
"review": {"next": "testing", "agent": "tester", "qg": "check_reviewer_verdict"},
|
||||
"testing": {"next": "deploy", "agent": "deployer", "qg": "check_tests_passed"},
|
||||
"deploy": {"next": "done", "agent": None, "qg": None},
|
||||
"done": {"next": None, "agent": None, "qg": None},
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user