Files
orchestrator/src/webhooks/gitea.py
Dev Agent 61e26a8930 fix(observability): merge-gate on deploy, full token input, Plane Done, artifact links
1. BUG 8 (second door): merge webhook no longer fake-completes a task at the
   deploy stage; done is gated by the deployer verdict (check_deploy_status).
   Other stages keep merge->done.
2. Token accounting: parse+persist cache_creation_input_tokens (new
   idempotent agent_runs column). usage_comment / task_summary now show the
   FULL input (input + cache_read + cache_creation) with a cached breakdown.
   cost_usd untouched.
3. deploy->done success now forces the Plane issue to terminal Done state.
4. All agents (architect/developer/reviewer/tester/deployer) attach artifact
   links to their finish comment via gitea_public_url.

Tests added for each fix; pytest 244 passed / 9 failed (off-limits HMAC group).
2026-06-04 11:17:58 +03:00

354 lines
15 KiB
Python

"""Gitea webhook handlers — full implementation."""
import hmac
import subprocess
import os
import hashlib
import json
import logging
import httpx
from fastapi import APIRouter, Request, HTTPException
from ..config import settings
from ..db import (
get_db,
get_task_by_repo_branch,
update_task_stage,
enqueue_job,
insert_event_dedup,
)
from ._dedup import gitea_delivery_id
from ..stages import get_next_stage, get_agent_for_stage
from ..qg.checks import check_ci_green, check_review_approved
from ..notifications import notify_stage_change, notify_qg_failure, notify_error
from ..agents.launcher import launcher
from ..plane_sync import notify_stage_change as plane_notify_stage
from ..projects import get_project_by_repo
logger = logging.getLogger("orchestrator.webhooks.gitea")
router = APIRouter()
# Max retries for developer on request_changes
MAX_DEV_RETRIES = 3
def verify_gitea_signature(body: bytes, signature: str) -> bool:
"""Verify Gitea webhook HMAC-SHA256 signature."""
if not settings.gitea_webhook_secret:
return True # Skip verification if no secret configured
expected = hmac.new(
settings.gitea_webhook_secret.encode(),
body,
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(expected, signature)
@router.post("/gitea")
async def gitea_webhook(request: Request):
"""Handle Gitea webhook events."""
body = await request.body()
# Verify HMAC signature
signature = request.headers.get("X-Gitea-Signature", "")
if not verify_gitea_signature(body, signature):
logger.warning("Gitea webhook: invalid signature")
raise HTTPException(status_code=401, detail="Invalid signature")
payload = json.loads(body)
# ORCH-5 (M-7): idempotent logging. Compute a stable delivery_id (X-Gitea-Delivery
# GUID, or sha256 fallback) and INSERT OR IGNORE. A repeated delivery (Gitea retry
# / manual replay) returns inserted=False -> log + return {"status":"duplicate"}
# WITHOUT re-dispatching, so the pipeline is not re-triggered (ET-009 class).
# Runs AFTER HMAC verification above.
event_type = request.headers.get("X-Gitea-Event", "unknown")
delivery_id = gitea_delivery_id(request.headers, event_type, body)
inserted = insert_event_dedup("gitea", event_type, body.decode(), delivery_id)
if not inserted:
logger.info(f"Gitea webhook duplicate delivery_id={delivery_id}, skipping dispatch")
return {"status": "duplicate"}
if event_type == "push":
await handle_push(payload)
elif event_type.startswith("pull_request"):
await handle_pr(payload)
elif event_type == "status":
await handle_ci_status(payload)
return {"status": "accepted"}
async def handle_push(payload: dict):
"""
Push event:
- If stage=architecture and push contains ADR files → advance to development
- If stage=development and push contains src/ → wait for CI
"""
ref = payload.get("ref", "")
# Extract branch: refs/heads/feature/ET-003-slug → feature/ET-003-slug
if not ref.startswith("refs/heads/"):
return
branch = ref.removeprefix("refs/heads/")
repo_name = payload.get("repository", {}).get("name", settings.default_repo)
# ORCH-6: ignore pushes to repos outside the project registry.
if not get_project_by_repo(repo_name):
logger.info(f"Gitea push: ignoring unknown repo '{repo_name}'")
return
task = get_task_by_repo_branch(repo_name, branch)
if not task:
logger.debug(f"Push to '{branch}' — no matching task found")
return
task_id = task["id"]
current_stage = task["stage"]
work_item_id = task.get("work_item_id", "")
# Collect modified files from commits
modified_files = set()
for commit in payload.get("commits", []):
modified_files.update(commit.get("added", []))
modified_files.update(commit.get("modified", []))
if current_stage == "architecture":
# Check if ADR files were pushed
has_adr = any(
f"docs/work-items/{work_item_id}/06-adr/" in f
or f"docs/work-items/{work_item_id}/07-infra-requirements.md" == f
for f in modified_files
)
if has_adr:
# Advance to development
next_stage = "development"
update_task_stage(task_id, next_stage)
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
agent = get_agent_for_stage(current_stage)
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: push triggered {current_stage}{next_stage}, enqueued '{agent}' (job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
elif current_stage == "development":
# Source files pushed — just log, wait for CI
has_src = any(f.startswith("src/") for f in modified_files)
if has_src:
logger.info(f"Task {task_id}: source push detected on '{branch}', waiting for CI")
async def handle_ci_status(payload: dict):
"""
CI status update:
- If state=success and stage=development → advance to review, launch reviewer
- If state=failure → log
"""
state = payload.get("state", "")
# Extract branch from target_url or branches
branches = payload.get("branches", [])
branch = ""
if branches:
branch = branches[0].get("name", "")
# Alternative: find branch by SHA from tasks DB
if not branch:
sha = payload.get("sha", "")
repo_name = payload.get("repository", {}).get("name", settings.default_repo)
# Try to find task by checking git branch containing this SHA.
# ORCH-2 / S-4: this is a READ-ONLY query of remote-tracking refs in the main
# clone (no checkout / no mutation), so it is safe to keep on /repos/<repo>.
try:
result = subprocess.run(
["git", "-C", os.path.join(settings.repos_dir, repo_name),
"branch", "-r", "--contains", sha],
capture_output=True, text=True, timeout=10,
)
for line in result.stdout.strip().splitlines():
b = line.strip().replace("origin/", "")
if b.startswith("feature/"):
branch = b
break
except Exception:
pass
if not branch:
logger.debug(f"CI status event: could not determine branch for sha={sha}")
return
repo_name = payload.get("repository", {}).get("name", settings.default_repo)
# ORCH-6: ignore CI status for repos outside the project registry.
if not get_project_by_repo(repo_name):
logger.info(f"Gitea CI status: ignoring unknown repo '{repo_name}'")
return
task = get_task_by_repo_branch(repo_name, branch)
if not task:
return
task_id = task["id"]
current_stage = task["stage"]
work_item_id = task.get("work_item_id", "")
if state == "success" and current_stage == "development":
# Verify CI is actually green via API (double-check)
passed, reason = check_ci_green(repo_name, branch)
if passed:
next_stage = "review"
update_task_stage(task_id, next_stage)
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
agent = get_agent_for_stage(current_stage)
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\nStage: {next_stage}"
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: CI green → {next_stage}, enqueued '{agent}' (job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
else:
notify_qg_failure(task_id, current_stage, "check_ci_green", reason)
elif state == "failure" and current_stage == "development":
# CI is the authoritative gate for development -> review.
# On red CI: notify, then bounce the task back to the developer (capped retries),
# symmetric to the review REQUEST_CHANGES path.
notify_qg_failure(task_id, current_stage, "check_ci_green", f"Gitea CI failed on branch '{branch}'")
conn = get_db()
retry_count = conn.execute(
"SELECT COUNT(*) as cnt FROM agent_runs WHERE task_id = ? AND agent = 'developer'",
(task_id,),
).fetchone()["cnt"]
conn.close()
if retry_count < MAX_DEV_RETRIES:
# task already on 'development' — no stage change needed, just relaunch developer
try:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {branch}\n"
f"Stage: development\nNote: CI failed, fix and re-push (attempt {retry_count + 1}/{MAX_DEV_RETRIES})"
)
job_id = enqueue_job("developer", repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: CI failed, enqueued developer (attempt {retry_count + 1}, job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to relaunch developer after CI failure: {e}")
else:
notify_error(task_id, f"Max developer retries ({MAX_DEV_RETRIES}) reached after CI failure, escalating")
logger.error(f"Task {task_id}: max retries reached after CI failure, needs manual intervention")
async def handle_pr(payload: dict):
"""
PR event:
- action=reviewed + approved → advance to testing, launch tester
- action=reviewed + request_changes → back to development, relaunch developer (max 3x)
- action=closed + merged → stage=done
"""
action = payload.get("action", "")
pr = payload.get("pull_request", {})
review = payload.get("review", {})
# Get branch from PR head
head_branch = pr.get("head", {}).get("ref", "")
repo_name = payload.get("repository", {}).get("name", settings.default_repo)
if not head_branch:
return
# ORCH-6: ignore PR events for repos outside the project registry.
if not get_project_by_repo(repo_name):
logger.info(f"Gitea PR: ignoring unknown repo '{repo_name}'")
return
task = get_task_by_repo_branch(repo_name, head_branch)
if not task:
logger.debug(f"PR event for branch '{head_branch}' — no matching task")
return
task_id = task["id"]
current_stage = task["stage"]
work_item_id = task.get("work_item_id", "")
if action == "reviewed":
# Gitea sends review.state (older) or review.type (newer format)
review_state = review.get("state", "").upper()
if not review_state and review.get("type", ""):
# Map type field: "pull_request_review_approved" -> "APPROVED"
rtype = review.get("type", "")
if "approved" in rtype.lower():
review_state = "APPROVED"
elif "request_changes" in rtype.lower() or "rejected" in rtype.lower():
review_state = "REQUEST_CHANGES"
if review_state == "APPROVED" and current_stage == "review":
# Advance to testing
pr_number = pr.get("number")
passed, reason = check_review_approved(repo_name, pr_number)
if passed:
next_stage = "testing"
update_task_stage(task_id, next_stage)
notify_stage_change(task_id, current_stage, next_stage)
plane_notify_stage(work_item_id, current_stage, next_stage)
agent = get_agent_for_stage(current_stage)
if agent:
try:
task_desc = f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\nStage: {next_stage}"
job_id = enqueue_job(agent, repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: PR approved → {next_stage}, enqueued '{agent}' (job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to launch agent '{agent}': {e}")
else:
notify_qg_failure(task_id, current_stage, "check_review_approved", reason)
elif review_state == "REQUEST_CHANGES" and current_stage == "review":
# Count retries
conn = get_db()
retry_count = conn.execute(
"SELECT COUNT(*) as cnt FROM agent_runs WHERE task_id = ? AND agent = 'developer'",
(task_id,),
).fetchone()["cnt"]
conn.close()
if retry_count < MAX_DEV_RETRIES:
# Back to development, relaunch developer
update_task_stage(task_id, "development")
notify_stage_change(task_id, current_stage, "development")
try:
task_desc = (
f"Work item: {work_item_id}\nRepo: {repo_name}\nBranch: {head_branch}\n"
f"Stage: development\nNote: Changes requested in review (attempt {retry_count + 1}/{MAX_DEV_RETRIES})"
)
job_id = enqueue_job("developer", repo_name, task_desc, task_id=task_id)
logger.info(f"Task {task_id}: changes requested, enqueued developer (attempt {retry_count + 1}, job_id={job_id})")
except Exception as e:
notify_error(task_id, f"Failed to relaunch developer: {e}")
else:
notify_error(task_id, f"Max developer retries ({MAX_DEV_RETRIES}) reached, escalating")
logger.error(f"Task {task_id}: max retries reached, needs manual intervention")
elif action == "closed" and pr.get("merged", False):
# BUG 8 (second door): at the deploy stage `done` is gated by the
# deployer's verdict (check_deploy_status via advance_stage), NOT by the
# fact that the PR was merged. The deployer merges the PR at the START of
# its run, so a merged webhook arrives ~30s later while the deployer is
# still working — blindly setting done here would fake-complete the task
# and discard a later deploy_status: FAILED verdict. advance_stage will
# drive deploy→done (and Plane→Done) when the deployer job finishes.
# For every OTHER stage the merge-driven done behaviour is preserved.
if current_stage == "deploy":
logger.info(
f"Task {task_id}: PR merged at deploy stage — done gated by "
f"deployer verdict (check_deploy_status), ignoring merge-driven done."
)
return
update_task_stage(task_id, "done")
notify_stage_change(task_id, current_stage, "done")
logger.info(f"Task {task_id}: PR merged, stage → done")