Files
wiki/tasks/orchestrator/DEV_TASK_PLANE_FULL_INTEGRATION.md
2026-06-02 21:00:01 +03:00

24 KiB
Raw Blame History

DEV_TASK: Полная интеграция с Plane (12 пунктов)

Контекст

Orchestrator для enduro-trails. Нужно довести интеграцию с Plane до полноценной:

  • Статусы Issue отражают реальное состояние
  • Analyst может задавать вопросы
  • Откаты при ошибках (tester fail, deploy fail, architect conflict)
  • Auto-init при создании Issue в Plane

Сервер

  • Host: slin@82.22.50.71
  • SSH: /home/node/.openclaw/skills/installer/scripts/ssh_exec.sh --host mva154
  • Orchestrator container: orchestrator
  • Orchestrator repo на хосте: /home/slin/repos/orchestrator
  • Plane API: http://localhost:8091/api/v1 (из контейнера orchestrator)
  • Plane workspace: ag_proj
  • Plane project ID (Enduro Trails): 7a79f0a9-5278-49cd-9007-9a338f238f9c

Plane States (уже созданы)

State ID Group
Backlog 113b24f6-cce8-4be9-9a22-a359b9cf0122 backlog
Todo 2c7d3df3-9eb9-419b-92b7-d7d560bcdd10 unstarted
In Progress b873d9eb-993c-48cd-97ac-99a9b1623967 started
Needs Input babf08a3-ff4d-41f3-a821-5491aa29a8ac started
In Review 38fb1f64-aa1e-48a3-92e0-0b109679046b started
Blocked 6c4543f9-ac47-4ef7-ae0f-070020dc9920 started
Done 381a2833-3c4e-4be5-bd0f-be84cb946ad8 completed
Cancelled b1cae7f9-961d-4889-a179-f3acea697d17 cancelled

Задачи


Task 1: Обновить STAGE_TO_STATE в plane_sync.py

Файл: /home/slin/repos/orchestrator/src/plane_sync.py

Заменить STAGE_TO_STATE на:

# Plane state IDs
PLANE_STATES = {
    "backlog": "113b24f6-cce8-4be9-9a22-a359b9cf0122",
    "todo": "2c7d3df3-9eb9-419b-92b7-d7d560bcdd10",
    "in_progress": "b873d9eb-993c-48cd-97ac-99a9b1623967",
    "needs_input": "babf08a3-ff4d-41f3-a821-5491aa29a8ac",
    "in_review": "38fb1f64-aa1e-48a3-92e0-0b109679046b",
    "blocked": "6c4543f9-ac47-4ef7-ae0f-070020dc9920",
    "done": "381a2833-3c4e-4be5-bd0f-be84cb946ad8",
    "cancelled": "b1cae7f9-961d-4889-a179-f3acea697d17",
}

# Map orchestrator stages to Plane states
STAGE_TO_STATE = {
    "created": PLANE_STATES["todo"],
    "analysis": PLANE_STATES["in_progress"],
    "architecture": PLANE_STATES["in_progress"],
    "development": PLANE_STATES["in_progress"],
    "review": PLANE_STATES["in_progress"],
    "testing": PLANE_STATES["in_progress"],
    "deploy": PLANE_STATES["in_progress"],
    "done": PLANE_STATES["done"],
}

Добавить новые функции:

def set_issue_needs_input(work_item_id: str):
    """Set issue to 'Needs Input' state — waiting for stakeholder response."""
    _set_issue_state_direct(work_item_id, PLANE_STATES["needs_input"])

def set_issue_in_review(work_item_id: str):
    """Set issue to 'In Review' state — waiting for :approved: or :rejected:."""
    _set_issue_state_direct(work_item_id, PLANE_STATES["in_review"])

def set_issue_blocked(work_item_id: str):
    """Set issue to 'Blocked' state — manual intervention needed."""
    _set_issue_state_direct(work_item_id, PLANE_STATES["blocked"])

def set_issue_in_progress(work_item_id: str):
    """Set issue to 'In Progress' state — agent working."""
    _set_issue_state_direct(work_item_id, PLANE_STATES["in_progress"])

def _set_issue_state_direct(work_item_id: str, state_id: str):
    """Set issue state directly by state_id."""
    issue_id = find_issue_id(work_item_id)
    if not issue_id:
        logger.warning(f"Issue not found in Plane for {work_item_id}")
        return
    url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{PROJECT_ID}/issues/{issue_id}/"
    try:
        resp = httpx.patch(url, headers=PLANE_HEADERS, json={"state": state_id}, timeout=10)
        resp.raise_for_status()
        logger.info(f"Plane: {work_item_id} state -> {state_id[:8]}...")
    except Exception as e:
        logger.error(f"Failed to update Plane state for {work_item_id}: {e}")

Task 2: Analyst questions flow

Файл: /home/slin/repos/orchestrator/src/agents/launcher.py

В _try_advance_stage, в блоке где agent == "analyst" и qg_name == "check_analysis_approved":

После files_check — добавить проверку на questions:

                if files_ok:
                    # Full artifacts ready → In Review
                    from ..plane_sync import set_issue_in_review
                    set_issue_in_review(work_item_id)
                    plane_add_comment(
                        work_item_id,
                        "📋 BRD/ТЗ/AC/TestPlan готовы. "
                        "Прошу review и реакцию :approved: для продвижения в Architecture."
                    )
                    notify_approve_requested(task_id)
                else:
                    # Check if questions file exists
                    import os as _os
                    questions_path = _os.path.join(
                        settings.repos_dir, repo,
                        f"docs/work-items/{work_item_id}/01-questions.md"
                    )
                    if _os.path.isfile(questions_path):
                        # Analyst has questions → Needs Input
                        from ..plane_sync import set_issue_needs_input
                        set_issue_needs_input(work_item_id)
                        # Read questions and post to Plane
                        with open(questions_path, "r") as qf:
                            questions_text = qf.read()
                        plane_add_comment(
                            work_item_id,
                            f"❓ Analyst нуждается в уточнении:\n\n{questions_text}"
                        )
                        from ..notifications import send_telegram
                        send_telegram(
                            f"❓ {work_item_id}: Analyst задаёт вопросы. Ответь в Plane."
                        )
                    else:
                        # No artifacts and no questions — something went wrong
                        plane_add_comment(
                            work_item_id,
                            "⚠️ Analyst завершился без артефактов и без вопросов. Проверьте лог."
                        )

Task 3: Handle stakeholder response to questions

Файл: /home/slin/repos/orchestrator/src/webhooks/plane.py

В handle_comment, после проверки :rejected: и :approved:, добавить:

    # If neither :approved: nor :rejected: — check if this is an answer to questions
    if current_stage == "analysis":
        # Check if issue is in "Needs Input" state (analyst asked questions)
        from ..plane_sync import PLANE_STATES, set_issue_in_progress
        issue_id = task.get("plane_issue_id") or task.get("plane_id")
        if issue_id:
            # Check current Plane state
            from ..plane_sync import PLANE_BASE, PLANE_HEADERS, WORKSPACE, PROJECT_ID
            import httpx as _httpx
            try:
                _resp = _httpx.get(
                    f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{PROJECT_ID}/issues/{issue_id}/",
                    headers=PLANE_HEADERS, timeout=10
                )
                if _resp.status_code == 200:
                    issue_data = _resp.json()
                    if issue_data.get("state") == PLANE_STATES["needs_input"]:
                        # This is an answer to analyst's questions
                        set_issue_in_progress(work_item_id)
                        # Relaunch analyst with context about the answer
                        task_desc = (
                            f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
                            f"Stage: analysis\nNote: Stakeholder answered your questions. "
                            f"Read the latest comment in Plane and revise your artifacts.\n"
                            f"Answer: {comment_body[:500]}"
                        )
                        new_run = launcher.launch("analyst", repo, task_desc, task_id=task_id)
                        plane_add_comment(work_item_id, "🔄 Analyst перезапущен с ответами стейкхолдера.")
                        logger.info(f"Task {task_id}: stakeholder answered questions, relaunched analyst (run_id={new_run})")
                        return
            except Exception as e:
                logger.error(f"Failed to check issue state: {e}")

Также добавить import вверху файла:

from ..plane_sync import add_comment as plane_add_comment

Task 4: :rejected: handler — set In Progress and relaunch

Файл: /home/slin/repos/orchestrator/src/webhooks/plane.py

Обновить блок :rejected::

    if ":rejected:" in comment_body:
        # Extract reason (text after :rejected:)
        reason = comment_body.split(":rejected:", 1)[-1].strip()[:300]
        
        # Rollback to analysis (re-do current stage)
        if current_stage == "analysis":
            # Already in analysis — just relaunch analyst with rejection reason
            from ..plane_sync import set_issue_in_progress
            set_issue_in_progress(work_item_id)
            task_desc = (
                f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
                f"Stage: analysis\nNote: Stakeholder REJECTED your artifacts. "
                f"Reason: {reason}\nRevise and improve."
            )
            new_run = launcher.launch("analyst", repo, task_desc, task_id=task_id)
            plane_add_comment(work_item_id, f"🔄 Analyst перезапущен. Причина отклонения: {reason}")
            logger.info(f"Task {task_id}: rejected at analysis, relaunched analyst")
        else:
            # Rollback to previous stage
            prev_stage = get_previous_stage(current_stage)
            if prev_stage:
                update_task_stage(task_id, prev_stage)
                from ..plane_sync import set_issue_in_progress
                set_issue_in_progress(work_item_id)
                notify_stage_change(task_id, current_stage, prev_stage)
                plane_add_comment(work_item_id, f"🔄 Откат: {current_stage}{prev_stage}. Причина: {reason}")
                logger.info(f"Task {task_id}: rejected, rolled back {current_stage}{prev_stage}")
        return

Task 5: :approved: handler — set In Progress before advancing

Файл: /home/slin/repos/orchestrator/src/webhooks/plane.py

В блоке :approved:, перед вызовом _try_advance_stage:

    if ":approved:" in comment_body:
        from ..plane_sync import set_issue_in_progress
        set_issue_in_progress(work_item_id)
        # Try to advance stage
        await _try_advance_stage(task_id, current_stage, repo, work_item_id, branch)
        return

Task 6: Tester FAIL → rollback to development

Файл: /home/slin/repos/orchestrator/src/agents/launcher.py

В _try_advance_stage, добавить обработку tester fail. После блока reviewer REQUEST_CHANGES, добавить:

                    # If tester reports FAIL, rollback to development
                    if agent == "tester" and qg_name == "check_tests_passed" and not passed:
                        update_task_stage(task_id, "development")
                        notify_stage_change(task_id, current_stage, "development")
                        plane_notify_stage(work_item_id, current_stage, "development")
                        from ..plane_sync import set_issue_in_progress
                        set_issue_in_progress(work_item_id)
                        plane_add_comment(
                            work_item_id,
                            f"❌ Тесты не прошли: {reason}. Developer перезапущен для фикса."
                        )
                        # Count developer retries
                        conn2 = get_db()
                        retry_count = conn2.execute(
                            "SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='developer'",
                            (task_id,)
                        ).fetchone()[0]
                        conn2.close()
                        if retry_count < 3:
                            task_desc = (
                                f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
                                f"Stage: development\nNote: Tests FAILED. "
                                f"Fix failures described in docs/work-items/{work_item_id}/13-test-report.md"
                            )
                            new_run = self.launch("developer", repo, task_desc, task_id=task_id)
                            logger.info(f"Task {task_id}: tester FAIL, relaunched developer (run_id={new_run})")
                        else:
                            from ..notifications import send_telegram
                            from ..plane_sync import set_issue_blocked
                            set_issue_blocked(work_item_id)
                            send_telegram(f"🚨 {work_item_id}: Tests still failing after 3 developer retries. Manual intervention needed.")
                        return

Task 7: Deploy FAIL → rollback to development

Файл: /home/slin/repos/orchestrator/src/agents/launcher.py

В _monitor_agent, после if exit_code == 0: block, add handling for deployer failure:

        # Handle deployer failure (smoke/healthcheck failed)
        if exit_code != 0 and agent == "deployer":
            conn = get_db()
            task_row = conn.execute(
                "SELECT id, work_item_id FROM tasks WHERE repo=? AND branch=?",
                (repo, branch),
            ).fetchone()
            conn.close()
            if task_row:
                _tid, _wid = task_row
                update_task_stage(_tid, "development")
                notify_stage_change(_tid, "deploy", "development")
                plane_notify_stage(_wid, "deploy", "development")
                from .plane_sync import set_issue_blocked
                set_issue_blocked(_wid)
                plane_add_comment(
                    _wid,
                    "❌ Deploy FAILED (smoke/healthcheck). Rolled back. Developer нужен для фикса."
                )
                from .notifications import send_telegram
                send_telegram(f"🚨 {_wid}: Deploy failed! Rolled back. Needs fix.")

Task 8: Architect conflict → rollback to analysis

Файл: /home/slin/repos/orchestrator/src/agents/launcher.py

В _try_advance_stage, в QG check for check_architecture_done:

                    # If architect finished but QG failed — check if conflict file exists
                    if agent == "architect" and qg_name == "check_architecture_done" and not passed:
                        import os as _os
                        conflict_path = _os.path.join(
                            settings.repos_dir, repo,
                            f"docs/work-items/{work_item_id}/10-conflict.md"
                        )
                        if _os.path.isfile(conflict_path):
                            # Architect found conflict with TRZ → rollback to analysis
                            update_task_stage(task_id, "analysis")
                            notify_stage_change(task_id, current_stage, "analysis")
                            plane_notify_stage(work_item_id, current_stage, "analysis")
                            from ..plane_sync import set_issue_in_progress
                            set_issue_in_progress(work_item_id)
                            with open(conflict_path, "r") as cf:
                                conflict_text = cf.read()[:500]
                            plane_add_comment(
                                work_item_id,
                                f"⚠️ Architect нашёл конфликт с ТЗ. Возврат в Analysis.\n\n{conflict_text}"
                            )
                            task_desc = (
                                f"Work item: {work_item_id}\nRepo: {repo}\nBranch: {branch}\n"
                                f"Stage: analysis\nNote: Architect conflict. Revise TRZ. "
                                f"See docs/work-items/{work_item_id}/10-conflict.md"
                            )
                            new_run = self.launch("analyst", repo, task_desc, task_id=task_id)
                            logger.info(f"Task {task_id}: architect conflict, relaunched analyst")
                            return

Task 9: Ссылки в комментариях

Файл: /home/slin/repos/orchestrator/src/plane_sync.py

Обновить notify_stage_change:

def notify_stage_change(work_item_id: str, old_stage: str, new_stage: str, agent: str = None):
    """Notify Plane about stage transition with links."""
    update_issue_state(work_item_id, new_stage)

    msg = f"🔄 Stage: {old_stage}{new_stage}"
    if agent:
        msg += f" (launching {agent})"
    
    # Add relevant links
    gitea_base = f"http://git.mva154.duckdns.org"
    # Find branch from DB
    try:
        from .db import get_db
        conn = get_db()
        row = conn.execute(
            "SELECT branch, repo FROM tasks WHERE work_item_id=?", (work_item_id,)
        ).fetchone()
        conn.close()
        if row:
            branch, repo = row
            msg += f"\n📂 Branch: <a href='{gitea_base}/admin/{repo}/src/branch/{branch}'>{branch}</a>"
            # Add PR link if exists
            if new_stage in ("review", "testing", "deploy"):
                import httpx as _httpx
                from .config import settings
                _headers = {"Authorization": f"token {settings.gitea_token}"}
                _resp = _httpx.get(
                    f"{settings.gitea_url}/api/v1/repos/{settings.gitea_owner}/{repo}/pulls",
                    params={"state": "open", "head": branch},
                    headers=_headers, timeout=5
                )
                if _resp.status_code == 200:
                    _prs = _resp.json()
                    if _prs:
                        pr_num = _prs[0]["number"]
                        msg += f"\n🔗 PR: <a href='{gitea_base}/admin/{repo}/pulls/{pr_num}'>#{pr_num}</a>"
    except Exception:
        pass

    add_comment(work_item_id, msg)

Task 10: work_item.created webhook — QG-0 validation

Файл: /home/slin/repos/orchestrator/src/webhooks/plane.py

Обновить handle_work_item_created — добавить QG-0 валидацию:

async def handle_work_item_created(data: dict):
    """
    New work item created in Plane.
    QG-0: validate title, description, priority.
    If valid: create branch, init docs, launch analyst.
    If invalid: comment with what's missing, set Blocked.
    """
    plane_id = data.get("id", "")
    name = data.get("name", "")
    description = data.get("description_stripped", data.get("description", ""))
    priority = data.get("priority", {})
    priority_name = priority if isinstance(priority, str) else priority.get("name", "")
    repo = settings.default_repo

    # QG-0 validation
    errors = []
    if not name or len(name) < 5:
        errors.append("Title слишком короткий (нужно ≥5 символов)")
    if len(name) > 80:
        errors.append("Title слишком длинный (максимум 80 символов)")
    if not description or len(description.split('.')) < 2:
        errors.append("Description слишком короткий (нужно ≥2 предложений)")

    if errors:
        # QG-0 failed
        error_text = "⚠️ QG-0 failed:\n" + "\n".join(f"• {e}" for e in errors)
        # Post comment
        from ..plane_sync import add_comment, set_issue_blocked, PLANE_BASE, PLANE_HEADERS, WORKSPACE, PROJECT_ID
        # We need to comment on the issue directly
        url = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{PROJECT_ID}/issues/{plane_id}/comments/"
        try:
            import httpx as _httpx
            _httpx.post(url, headers=PLANE_HEADERS, 
                       json={"comment_html": f"<p>{error_text}</p>"}, timeout=10)
        except Exception:
            pass
        # Set blocked
        url2 = f"{PLANE_BASE}/workspaces/{WORKSPACE}/projects/{PROJECT_ID}/issues/{plane_id}/"
        try:
            from ..plane_sync import PLANE_STATES
            _httpx.patch(url2, headers=PLANE_HEADERS,
                        json={"state": PLANE_STATES["blocked"]}, timeout=10)
        except Exception:
            pass
        logger.info(f"QG-0 failed for {plane_id}: {errors}")
        return

    # QG-0 passed — proceed with init
    # ... (rest of existing code stays the same)

Task 11: Max analyst question rounds (3)

Файл: /home/slin/repos/orchestrator/src/webhooks/plane.py

В Task 3 (handle stakeholder response), before relaunching analyst, check retry count:

                        # Check analyst retry count (max 3 question rounds)
                        conn3 = get_db()
                        analyst_runs = conn3.execute(
                            "SELECT COUNT(*) FROM agent_runs WHERE task_id=? AND agent='analyst'",
                            (task_id,)
                        ).fetchone()[0]
                        conn3.close()
                        
                        if analyst_runs >= 4:  # initial + 3 retries
                            from ..plane_sync import set_issue_blocked
                            set_issue_blocked(work_item_id)
                            plane_add_comment(
                                work_item_id,
                                "🚨 3 раунда уточнений исчерпаны. Analyst не может сформировать ТЗ. "
                                "Требуется более детальное описание или встреча."
                            )
                            from ..notifications import send_telegram
                            send_telegram(f"🚨 {work_item_id}: 3 раунда вопросов analyst'а исчерпаны. Нужна помощь.")
                            return

Task 12: Rebuild, test, verify

cd /home/slin/repos/orchestrator
docker compose build --no-cache
docker compose up -d
sleep 3
curl -s http://localhost:8500/health

Verification:

docker exec orchestrator python3 -c "
from src.plane_sync import PLANE_STATES, set_issue_needs_input, set_issue_in_review, set_issue_blocked, set_issue_in_progress
print('PLANE_STATES keys:', list(PLANE_STATES.keys()))
print('All state functions imported OK')

from src.stages import STAGE_TRANSITIONS
assert STAGE_TRANSITIONS['testing']['agent'] == 'deployer'
print('deployer in stages: OK')
"
# Test Plane API — set ET-002 to Needs Input and back
docker exec orchestrator python3 -c "
from src.plane_sync import set_issue_needs_input, set_issue_in_progress
set_issue_needs_input('ET-002')
print('Set ET-002 to Needs Input')
import time; time.sleep(2)
set_issue_in_progress('ET-002')
print('Set ET-002 back to In Progress')
"

Ограничения

  • НЕ трогать deployer.md (уже готов)
  • НЕ менять AGENT_CONFIGS (deployer уже добавлен)
  • НЕ менять stages.py (deployer уже там)
  • Plane API URL: http://localhost:8091/api/v1 (проверить в config.py, может быть другой порт)
  • Все изменения в /home/slin/repos/orchestrator/src/
  • После изменений — docker compose build && docker compose up -d

Порядок применения

Файлы меняются в таком порядке:

  1. src/plane_sync.py (Tasks 1, 9)
  2. src/webhooks/plane.py (Tasks 3, 4, 5, 10, 11)
  3. src/agents/launcher.py (Tasks 2, 6, 7, 8)
  4. Rebuild (Task 12)

Acceptance

  • curl -s http://localhost:8500/health → OK
  • docker exec orchestrator python3 -c "from src.plane_sync import PLANE_STATES, set_issue_needs_input, set_issue_in_review, set_issue_blocked" → no error
  • Plane states: Needs Input, In Review, Blocked видны в UI
  • Syntax check: docker exec orchestrator python3 -c "import src.main" → no error
  • Test state change: set ET-002 to Needs Input → verify in Plane → set back to In Progress