24 KiB
DEV_TASK: Полная интеграция с Plane (12 пунктов)
Контекст
Orchestrator для enduro-trails. Нужно довести интеграцию с Plane до полноценной:
- Статусы Issue отражают реальное состояние
- Analyst может задавать вопросы
- Откаты при ошибках (tester fail, deploy fail, architect conflict)
- Auto-init при создании Issue в Plane
Сервер
- Host:
slin@82.22.50.71 - SSH:
/home/node/.openclaw/skills/installer/scripts/ssh_exec.sh --host mva154 - Orchestrator container:
orchestrator - Orchestrator repo на хосте:
/home/slin/repos/orchestrator - Plane API:
http://localhost:8091/api/v1(из контейнера orchestrator) - Plane workspace:
ag_proj - Plane project ID (Enduro Trails):
7a79f0a9-5278-49cd-9007-9a338f238f9c
Plane States (уже созданы)
| State | ID | Group |
|---|---|---|
| Backlog | 113b24f6-cce8-4be9-9a22-a359b9cf0122 | backlog |
| Todo | 2c7d3df3-9eb9-419b-92b7-d7d560bcdd10 | unstarted |
| In Progress | b873d9eb-993c-48cd-97ac-99a9b1623967 | started |
| Needs Input | babf08a3-ff4d-41f3-a821-5491aa29a8ac | started |
| In Review | 38fb1f64-aa1e-48a3-92e0-0b109679046b | started |
| Blocked | 6c4543f9-ac47-4ef7-ae0f-070020dc9920 | started |
| Done | 381a2833-3c4e-4be5-bd0f-be84cb946ad8 | completed |
| Cancelled | b1cae7f9-961d-4889-a179-f3acea697d17 | cancelled |
Задачи
Task 1: Обновить STAGE_TO_STATE в plane_sync.py
Файл: /home/slin/repos/orchestrator/src/plane_sync.py
Заменить STAGE_TO_STATE на:
# Plane state IDs
PLANE_STATES = {
"backlog": "113b24f6-cce8-4be9-9a22-a359b9cf0122",
"todo": "2c7d3df3-9eb9-419b-92b7-d7d560bcdd10",
"in_progress": "b873d9eb-993c-48cd-97ac-99a9b1623967",
"needs_input": "babf08a3-ff4d-41f3-a821-5491aa29a8ac",
"in_review": "38fb1f64-aa1e-48a3-92e0-0b109679046b",
"blocked": "6c4543f9-ac47-4ef7-ae0f-070020dc9920",
"done": "381a2833-3c4e-4be5-bd0f-be84cb946ad8",
"cancelled": "b1cae7f9-961d-4889-a179-f3acea697d17",
}
# Map orchestrator stages to Plane states
STAGE_TO_STATE = {
"created": PLANE_STATES["todo"],
"analysis": PLANE_STATES["in_progress"],
"architecture": PLANE_STATES["in_progress"],
"development": PLANE_STATES["in_progress"],
"review": PLANE_STATES["in_progress"],
"testing": PLANE_STATES["in_progress"],
"deploy": PLANE_STATES["in_progress"],
"done": PLANE_STATES["done"],
}
Добавить новые функции:
def set_issue_needs_input(work_item_id: str):
"""Set issue to 'Needs Input' state — waiting for stakeholder response."""
_set_issue_state_direct(work_item_id, PLANE_STATES["needs_input"])
def set_issue_in_review(work_item_id: str):
"""Set issue to 'In Review' state — waiting for :approved: or :rejected:."""
_set_issue_state_direct(work_item_id, PLANE_STATES["in_review"])
def set_issue_blocked(work_item_id: str):
"""Set issue to 'Blocked' state — manual intervention needed."""
_set_issue_state_direct(work_item_id, PLANE_STATES["blocked"])
def set_issue_in_progress(work_item_id: str):
"""Set issue to 'In Progress' state — agent working."""
_set_issue_state_direct(work_item_id, PLANE_STATES["in_progress"])
def _set_issue_state_direct(work_item_id: str, state_id: str):
"""Set issue state directly by state_id."""
issue_id = find_issue_id(work_item_id)
if not issue_id:
logger.warning(f"Issue not found in Plane for {work_item_id}")
return
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{PROJECT_ID}/issues/{issue_id}/"
try:
resp = httpx.patch(url, headers=PLANE_HEADERS, json={"state": state_id}, timeout=10)
resp.raise_for_status()
logger.info(f"Plane: {work_item_id} state -> {state_id[:8]}...")
except Exception as e:
logger.error(f"Failed to update Plane state for {work_item_id}: {e}")
Task 2: Analyst questions flow
Файл: /home/slin/repos/orchestrator/src/agents/launcher.py
В _try_advance_stage, в блоке где agent == "analyst" и qg_name == "check_analysis_approved":
После files_check — добавить проверку на questions:
if files_ok:
# Full artifacts ready → In Review
from ..plane_sync import set_issue_in_review
set_issue_in_review(work_item_id)
plane_add_comment(
work_item_id,
"📋 BRD/ТЗ/AC/TestPlan готовы. "
"Прошу review и реакцию :approved: для продвижения в Architecture."
)
notify_approve_requested(task_id)
else:
# Check if questions file exists
import os as _os
questions_path = _os.path.join(
settings.repos_dir, repo,
f"docs/work-items/{work_item_id}/01-questions.md"
)
if _os.path.isfile(questions_path):
# Analyst has questions → Needs Input
from ..plane_sync import set_issue_needs_input
set_issue_needs_input(work_item_id)
# Read questions and post to Plane
with open(questions_path, "r") as qf:
questions_text = qf.read()
plane_add_comment(
work_item_id,
f"❓ Analyst нуждается в уточнении:\n\n{questions_text}"
)
from ..notifications import send_telegram
send_telegram(
f"❓ {work_item_id}: Analyst задаёт вопросы. Ответь в Plane."
)
else:
# No artifacts and no questions — something went wrong
plane_add_comment(
work_item_id,
"⚠️ Analyst завершился без артефактов и без вопросов. Проверьте лог."
)
Task 3: Handle stakeholder response to questions
Файл: /home/slin/repos/orchestrator/src/webhooks/plane.py
В handle_comment, после проверки :rejected: и :approved:, добавить:
# If neither :approved: nor :rejected: — check if this is an answer to questions
if current_stage == "analysis":
# Check if issue is in "Needs Input" state (analyst asked questions)
from ..plane_sync import PLANE_STATES, set_issue_in_progress
issue_id = task.get("plane_issue_id") or task.get("plane_id")
if issue_id:
# Check current Plane state
from ..plane_sync import PLANE_BASE, PLANE_HEADERS, WORKSPACE, PROJECT_ID
import httpx as _httpx
try:
_resp = _httpx.get(
f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{PROJECT_ID}/issues/{issue_id}/",
headers=PLANE_HEADERS, timeout=10
)
if _resp.status_code == 200:
issue_data = _resp.json()
if issue_data.get("state") == PLANE_STATES["needs_input"]:
# This is an answer to analyst's questions
set_issue_in_progress(work_item_id)
# Relaunch analyst with context about the answer
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: analysis\nNote: Stakeholder answered your questions. "
f"Read the latest comment in Plane and revise your artifacts.\n"
f"Answer: {comment_body[:500]}"
)
new_run = launcher.launch("analyst", repo, task_desc, task_id=task_id)
plane_add_comment(work_item_id, "🔄 Analyst перезапущен с ответами стейкхолдера.")
logger.info(f"Task {task_id}: stakeholder answered questions, relaunched analyst (run_id={new_run})")
return
except Exception as e:
logger.error(f"Failed to check issue state: {e}")
Также добавить import вверху файла:
from ..plane_sync import add_comment as plane_add_comment
Task 4: :rejected: handler — set In Progress and relaunch
Файл: /home/slin/repos/orchestrator/src/webhooks/plane.py
Обновить блок :rejected::
if ":rejected:" in comment_body:
# Extract reason (text after :rejected:)
reason = comment_body.split(":rejected:", 1)[-1].strip()[:300]
# Rollback to analysis (re-do current stage)
if current_stage == "analysis":
# Already in analysis — just relaunch analyst with rejection reason
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: analysis\nNote: Stakeholder REJECTED your artifacts. "
f"Reason: {reason}\nRevise and improve."
)
new_run = launcher.launch("analyst", repo, task_desc, task_id=task_id)
plane_add_comment(work_item_id, f"🔄 Analyst перезапущен. Причина отклонения: {reason}")
logger.info(f"Task {task_id}: rejected at analysis, relaunched analyst")
else:
# Rollback to previous stage
prev_stage = get_previous_stage(current_stage)
if prev_stage:
update_task_stage(task_id, prev_stage)
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
notify_stage_change(task_id, current_stage, prev_stage)
plane_add_comment(work_item_id, f"🔄 Откат: {current_stage} → {prev_stage}. Причина: {reason}")
logger.info(f"Task {task_id}: rejected, rolled back {current_stage} → {prev_stage}")
return
Task 5: :approved: handler — set In Progress before advancing
Файл: /home/slin/repos/orchestrator/src/webhooks/plane.py
В блоке :approved:, перед вызовом _try_advance_stage:
if ":approved:" in comment_body:
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
# Try to advance stage
await _try_advance_stage(task_id, current_stage, repo, work_item_id, branch)
return
Task 6: Tester FAIL → rollback to development
Файл: /home/slin/repos/orchestrator/src/agents/launcher.py
В _try_advance_stage, добавить обработку tester fail. После блока reviewer REQUEST_CHANGES, добавить:
# If tester reports FAIL, rollback to development
if agent == "tester" and qg_name == "check_tests_passed" and not passed:
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
plane_notify_stage(work_item_id, current_stage, "development")
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
plane_add_comment(
work_item_id,
f"❌ Тесты не прошли: {reason}. Developer перезапущен для фикса."
)
# Count developer retries
conn2 = get_db()
retry_count = conn2.execute(
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
(task_id,)
).fetchone()[0]
conn2.close()
if retry_count < 3:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: development\nNote: Tests FAILED. "
f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
)
new_run = self.launch("developer", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: tester FAIL, relaunched developer (run_id={new_run})")
else:
from ..notifications import send_telegram
from ..plane_sync import set_issue_blocked
set_issue_blocked(work_item_id)
send_telegram(f"🚨 {work_item_id}: Tests still failing after 3 developer retries. Manual intervention needed.")
return
Task 7: Deploy FAIL → rollback to development
Файл: /home/slin/repos/orchestrator/src/agents/launcher.py
В _monitor_agent, после if exit_code == 0: block, add handling for deployer failure:
# Handle deployer failure (smoke/healthcheck failed)
if exit_code != 0 and agent == "deployer":
conn = get_db()
task_row = conn.execute(
"SELECT id, work_item_id FROM tasks WHERE repo=? AND branch=?",
(repo, branch),
).fetchone()
conn.close()
if task_row:
_tid, _wid = task_row
update_task_stage(_tid, "development")
notify_stage_change(_tid, "deploy", "development")
plane_notify_stage(_wid, "deploy", "development")
from .plane_sync import set_issue_blocked
set_issue_blocked(_wid)
plane_add_comment(
_wid,
"❌ Deploy FAILED (smoke/healthcheck). Rolled back. Developer нужен для фикса."
)
from .notifications import send_telegram
send_telegram(f"🚨 {_wid}: Deploy failed! Rolled back. Needs fix.")
Task 8: Architect conflict → rollback to analysis
Файл: /home/slin/repos/orchestrator/src/agents/launcher.py
В _try_advance_stage, в QG check for check_architecture_done:
# If architect finished but QG failed — check if conflict file exists
if agent == "architect" and qg_name == "check_architecture_done" and not passed:
import os as _os
conflict_path = _os.path.join(
settings.repos_dir, repo,
f"docs/work-items/{work_item_id}/10-conflict.md"
)
if _os.path.isfile(conflict_path):
# Architect found conflict with TRZ → rollback to analysis
update_task_stage(task_id, "analysis")
notify_stage_change(task_id, current_stage, "analysis")
plane_notify_stage(work_item_id, current_stage, "analysis")
from ..plane_sync import set_issue_in_progress
set_issue_in_progress(work_item_id)
with open(conflict_path, "r") as cf:
conflict_text = cf.read()[:500]
plane_add_comment(
work_item_id,
f"⚠️ Architect нашёл конфликт с ТЗ. Возврат в Analysis.\n\n{conflict_text}"
)
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
f"See docs/work-items/{work_item_id}/10-conflict.md"
)
new_run = self.launch("analyst", repo, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: architect conflict, relaunched analyst")
return
Task 9: Ссылки в комментариях
Файл: /home/slin/repos/orchestrator/src/plane_sync.py
Обновить notify_stage_change:
def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent: str = None):
"""Notify Plane about stage transition with links."""
update_issue_state(work_item_id, new_stage)
msg = f"🔄 Stage: {old_stage} → {new_stage}"
if agent:
msg += f" (launching {agent})"
# Add relevant links
gitea_base = f"http://git.mva154.duckdns.org"
# Find branch from DB
try:
from .db import get_db
conn = get_db()
row = conn.execute(
"SELECT branch, repo FROM tasks WHERE work_item_id=?", (work_item_id,)
).fetchone()
conn.close()
if row:
branch, repo = row
msg += f"\n📂 Branch: <a href='{gitea_base}/admin/{repo}/src/branch/{branch}'>{branch}</a>"
# Add PR link if exists
if new_stage in ("review", "testing", "deploy"):
import httpx as _httpx
from .config import settings
_headers = {"Authorization": f"token {settings.gitea_token}"}
_resp = _httpx.get(
f"{settings.gitea_url}/api/v1/repos/{settings.gitea_owner}/{repo}/pulls",
params={"state": "open", "head": branch},
headers=_headers, timeout=5
)
if _resp.status_code == 200:
_prs = _resp.json()
if _prs:
pr_num = _prs[0]["number"]
msg += f"\n🔗 PR: <a href='{gitea_base}/admin/{repo}/pulls/{pr_num}'>#{pr_num}</a>"
except Exception:
pass
add_comment(work_item_id, msg)
Task 10: work_item.created webhook — QG-0 validation
Файл: /home/slin/repos/orchestrator/src/webhooks/plane.py
Обновить handle_work_item_created — добавить QG-0 валидацию:
async def handle_work_item_created(data: dict):
"""
New work item created in Plane.
QG-0: validate title, description, priority.
If valid: create branch, init docs, launch analyst.
If invalid: comment with what's missing, set Blocked.
"""
plane_id = data.get("id", "")
name = data.get("name", "")
description = data.get("description_stripped", data.get("description", ""))
priority = data.get("priority", {})
priority_name = priority if isinstance(priority, str) else priority.get("name", "")
repo = settings.default_repo
# QG-0 validation
errors = []
if not name or len(name) < 5:
errors.append("Title слишком короткий (нужно ≥5 символов)")
if len(name) > 80:
errors.append("Title слишком длинный (максимум 80 символов)")
if not description or len(description.split('.')) < 2:
errors.append("Description слишком короткий (нужно ≥2 предложений)")
if errors:
# QG-0 failed
error_text = "⚠️ QG-0 failed:\n" + "\n".join(f"• {e}" for e in errors)
# Post comment
from ..plane_sync import add_comment, set_issue_blocked, PLANE_BASE, PLANE_HEADERS, WORKSPACE, PROJECT_ID
# We need to comment on the issue directly
url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{PROJECT_ID}/issues/{plane_id}/comments/"
try:
import httpx as _httpx
_httpx.post(url, headers=PLANE_HEADERS,
json={"comment_html": f"<p>{error_text}</p>"}, timeout=10)
except Exception:
pass
# Set blocked
url2 = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{PROJECT_ID}/issues/{plane_id}/"
try:
from ..plane_sync import PLANE_STATES
_httpx.patch(url2, headers=PLANE_HEADERS,
json={"state": PLANE_STATES["blocked"]}, timeout=10)
except Exception:
pass
logger.info(f"QG-0 failed for {plane_id}: {errors}")
return
# QG-0 passed — proceed with init
# ... (rest of existing code stays the same)
Task 11: Max analyst question rounds (3)
Файл: /home/slin/repos/orchestrator/src/webhooks/plane.py
В Task 3 (handle stakeholder response), before relaunching analyst, check retry count:
# Check analyst retry count (max 3 question rounds)
conn3 = get_db()
analyst_runs = conn3.execute(
"SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='analyst'",
(task_id,)
).fetchone()[0]
conn3.close()
if analyst_runs >= 4: # initial + 3 retries
from ..plane_sync import set_issue_blocked
set_issue_blocked(work_item_id)
plane_add_comment(
work_item_id,
"🚨 3 раунда уточнений исчерпаны. Analyst не может сформировать ТЗ. "
"Требуется более детальное описание или встреча."
)
from ..notifications import send_telegram
send_telegram(f"🚨 {work_item_id}: 3 раунда вопросов analyst'а исчерпаны. Нужна помощь.")
return
Task 12: Rebuild, test, verify
cd /home/slin/repos/orchestrator
docker compose build --no-cache
docker compose up -d
sleep 3
curl -s http://localhost:8500/health
Verification:
docker exec orchestrator python3 -c "
from src.plane_sync import PLANE_STATES, set_issue_needs_input, set_issue_in_review, set_issue_blocked, set_issue_in_progress
print('PLANE_STATES keys:', list(PLANE_STATES.keys()))
print('All state functions imported OK')
from src.stages import STAGE_TRANSITIONS
assert STAGE_TRANSITIONS['testing']['agent'] == 'deployer'
print('deployer in stages: OK')
"
# Test Plane API — set ET-002 to Needs Input and back
docker exec orchestrator python3 -c "
from src.plane_sync import set_issue_needs_input, set_issue_in_progress
set_issue_needs_input('ET-002')
print('Set ET-002 to Needs Input')
import time; time.sleep(2)
set_issue_in_progress('ET-002')
print('Set ET-002 back to In Progress')
"
Ограничения
- НЕ трогать deployer.md (уже готов)
- НЕ менять AGENT_CONFIGS (deployer уже добавлен)
- НЕ менять stages.py (deployer уже там)
- Plane API URL:
http://localhost:8091/api/v1(проверить в config.py, может быть другой порт) - Все изменения в
/home/slin/repos/orchestrator/src/ - После изменений —
docker compose build && docker compose up -d
Порядок применения
Файлы меняются в таком порядке:
src/plane_sync.py(Tasks 1, 9)src/webhooks/plane.py(Tasks 3, 4, 5, 10, 11)src/agents/launcher.py(Tasks 2, 6, 7, 8)- Rebuild (Task 12)
Acceptance
curl -s http://localhost:8500/health→ OKdocker exec orchestrator python3 -c "from src.plane_sync import PLANE_STATES, set_issue_needs_input, set_issue_in_review, set_issue_blocked"→ no error- Plane states: Needs Input, In Review, Blocked видны в UI
- Syntax check:
docker exec orchestrator python3 -c "import src.main"→ no error - Test state change: set ET-002 to Needs Input → verify in Plane → set back to In Progress