Files
orchestrator/tests/test_orch020_estimator.py
claude-bot 375bd468dd
All checks were successful
CI / test (push) Successful in 1m20s
CI / test (pull_request) Successful in 1m22s
docs(overview): отразить операторскую способность «Оценка» в витрине (ORCH-020)
Закрывает P1 из 12-review.md (attempt 2/3): бизнес-витрина docs/overview/
не отражала новый операторский Plane-статус «Оценка» (ORCH-020).

- business.md: бизнес-способность «Оценка задачи до запуска» + Сценарий 7.
- presentation.md: статус-жест «Оценка» на слайдах 8/9/15.
- tech-pipeline.md: новая секция «Оценка задачи: статус «Оценка»» +
  переформулировано устаревшее «управляющих статусов ровно три» в семейство
  операторских статусов-действий (запуск / гейты / STOP / Оценка).
- tech-integrations.md: то же переформулирование + запись прогноза/факта в Plane.
- tech-data-model.md: таблица task_estimates в списке вспомогательных.
- tech-observability.md: блок estimator в GET /queue + пункт «Оценка» карточки.

Также (P3): ужесточён тавтологичный assert TC-20 (был `... or True`) —
теперь проверяет, что ни один таргет ребра STAGE_TRANSITIONS не равен estimate.

Машинные факты витрины (tests/test_system_docs.py) и control-path не тронуты;
ruff на изменённом файле чист; полный pytest зелёный (2248 passed).

Refs: ORCH-020

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-17 22:04:39 +03:00

558 lines
24 KiB
Python

"""ORCH-020 / TC-01..TC-20: task-estimation side-mechanism (src/estimator.py +
the «Оценка» Plane-status trigger).
These exercise the DETERMINISTIC core (no network, no LLM, no live Plane/Telegram):
* the trigger wiring in handle_issue_updated -> handle_estimate (fail-closed, mutual
exclusion, anti-disruption, auto-return to Backlog, anti-loop, massivity);
* the pure forecast + story-point bucketiser (border cases, bootstrap, never-raise);
* the idempotent UPSERT ledger (task_estimates, keyed by work_item_id);
* the Plane writes (estimate_point for the forecast, point for the fact — not swapped,
fail-safe when the estimate-system is absent);
* the Telegram card line, the GET /queue block, the leaf kill-switch / scope.
Контракт (ADR-001): the estimator is an OBSERVER/PRODUCER, never a Quality Gate / stage
— STAGE_TRANSITIONS / QG_CHECKS / check_* / verdict-keys / existing schemas are NOT
touched (TC-20). All Plane writes go through the ORCH-117 guard (the autouse
``_plane_sandbox_only`` floor keeps the opt-in OFF -> a real write is impossible from a
test process); we spy at the plane_sync boundary to assert the calls without network.
"""
import asyncio
import os
import tempfile
# Provide fallback values for the ORCH_* environment variables BEFORE the src.*
# imports below (which is why those imports carry E402 suppressions).
# setdefault leaves any value already present in the environment untouched.
# NOTE(review): presumably src.config / src.db read these at import time — confirm.
os.environ.setdefault("ORCH_DB_PATH", os.path.join(tempfile.gettempdir(), "test_orch020_estimator.db"))
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
import pytest # noqa: E402
import src.db as db # noqa: E402
from src import config as cfg # noqa: E402
from src import estimator # noqa: E402
# Repo name passed to every task / run / estimate created by the tests in this file.
_REPO = "orchestrator"
_ORCH_PROJECT = "8da6aa25-a60e-44d6-a1e2-d8ae59aa7d6a" # orchestrator Plane project id
# ===========================================================================
# Fixtures
# ===========================================================================
@pytest.fixture(autouse=True)
def fresh_db(tmp_path, monkeypatch):
    """Give each test its own SQLite file and a pinned estimator configuration.

    Runs automatically for every test: points the DB layer at a file under
    ``tmp_path``, switches the estimator ON with an empty repo scope
    (self-hosting), fixes the bootstrap / threshold settings to known values,
    zeroes the in-process counters and creates the schema.
    """
    monkeypatch.setattr(db.settings, "db_path", str(tmp_path / "est.db"))

    # Estimator settings every test starts from. raising=False lets the patch
    # succeed even when the settings object does not declare the attribute.
    pinned_settings = {
        "estimator_enabled": True,
        "estimator_repos": "",
        "estimator_min_samples": 3,
        "estimator_bootstrap_tokens": 2_000_000,
        "estimator_bootstrap_cost_usd": 3.0,
        "estimator_bootstrap_seconds": 1800,
        "estimator_sp_cost_thresholds": "0.50,2.00,5.00,12.00",
        "estimator_wall_cap_s": 86400,
    }
    for attr_name, attr_value in pinned_settings.items():
        monkeypatch.setattr(cfg.settings, attr_name, attr_value, raising=False)

    # Counters live at module level in the estimator, so they would otherwise
    # carry over from one test to the next.
    for counter_name in estimator._COUNTERS:
        estimator._COUNTERS[counter_name] = 0

    db.init_db()
    yield
@pytest.fixture
def plane_spy(monkeypatch):
    """Swap the plane_sync write functions for recorders and return the call log.

    The returned dict has one list per patched function:
    ``estimate_point`` and ``point`` collect ``(work_item, value)`` tuples,
    ``comment`` collects ``(work_item, text)`` tuples and ``backlog`` collects
    work-item ids. Nothing reaches the network.
    """
    log = {"estimate_point": [], "point": [], "comment": [], "backlog": []}

    def _record_estimate_point(wi, val, project_id=None):
        log["estimate_point"].append((wi, val))
        return True  # report the write as successful

    def _record_point(wi, val, project_id=None):
        log["point"].append((wi, val))
        return True  # report the write as successful

    def _record_comment(wi, text, project_id=None, author=None):
        log["comment"].append((wi, text))

    def _record_backlog(wi, project_id=None):
        log["backlog"].append(wi)

    # raising=True: fail loudly if plane_sync ever stops exposing one of these.
    patches = (
        ("src.plane_sync.set_issue_estimate_point", _record_estimate_point),
        ("src.plane_sync.set_issue_point", _record_point),
        ("src.plane_sync.add_comment", _record_comment),
        ("src.plane_sync.set_issue_backlog", _record_backlog),
    )
    for target, recorder in patches:
        monkeypatch.setattr(target, recorder, raising=True)

    return log
def _mk_task(plane_id, work_item_id, *, stage="created", track="full",
             created_at="2026-06-17 10:00:00", updated_at="2026-06-17 10:30:00") -> int:
    """Insert one row into ``tasks`` and return its row id.

    The row uses ``_REPO`` as the repo and ``feature/<work_item_id>`` as the
    branch. The default timestamps are 30 minutes apart.

    Args:
        plane_id: value stored in the ``plane_id`` column.
        work_item_id: value stored in ``work_item_id`` (also used for the branch).
        stage: value stored in the ``stage`` column.
        track: value stored in the ``track`` column.
        created_at: value stored in the ``created_at`` column.
        updated_at: value stored in the ``updated_at`` column.

    Returns:
        The ``lastrowid`` of the inserted row, as an int.
    """
    conn = db.get_db()
    try:
        cur = conn.execute(
            "INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, track, "
            "created_at, updated_at) VALUES (?,?,?,?,?,?,?,?)",
            (plane_id, work_item_id, _REPO, f"feature/{work_item_id}", stage, track,
             created_at, updated_at),
        )
        conn.commit()
        return int(cur.lastrowid)
    finally:
        # Close even when the INSERT / commit raises, so a failing test does
        # not leave a connection open on the per-test database file.
        conn.close()
def _mk_run(task_id, *, cost=4.0, in_tok=1000, out_tok=500):
    """Insert one ``developer`` row into ``agent_runs`` for ``task_id``.

    The run always spans 2026-06-17 10:00:00 to 10:20:00.

    Args:
        task_id: id of the task the run belongs to.
        cost: value stored in ``cost_usd``.
        in_tok: value stored in ``input_tokens``.
        out_tok: value stored in ``output_tokens``.
    """
    conn = db.get_db()
    try:
        conn.execute(
            "INSERT INTO agent_runs (task_id, agent, input_tokens, output_tokens, "
            "cost_usd, started_at, finished_at) VALUES (?,?,?,?,?,?,?)",
            (task_id, "developer", in_tok, out_tok, cost,
             "2026-06-17 10:00:00", "2026-06-17 10:20:00"),
        )
        conn.commit()
    finally:
        # Close even when the INSERT / commit raises, so a failing test does
        # not leave a connection open on the per-test database file.
        conn.close()
# ===========================================================================
# TC-01 — trigger recognised (AC-T1)
# ===========================================================================
@pytest.mark.asyncio
async def test_tc01_estimate_status_routes_to_handle_estimate(monkeypatch):
    """TC-01: an update whose state is the «Оценка» UUID is dispatched to handle_estimate."""
    from src.webhooks import plane as plane_wh
    from src.plane_sync import _PLANE_NAME_TO_KEY

    # The Plane status name must resolve to the internal "estimate" key.
    assert _PLANE_NAME_TO_KEY.get("Оценка") == "estimate"

    board_states = {
        "estimate": "EST-UUID",
        "stop": "STOP-UUID",
        "to_analyse": "TA-UUID",
        "approved": "AP-UUID",
        "rejected": "RJ-UUID",
        "confirm_deploy": None,
    }

    def _project_states(pid):
        return board_states

    monkeypatch.setattr("src.plane_sync.get_project_states", _project_states)

    routed_ids = []

    async def _record(data, project_id=""):
        routed_ids.append(data.get("id"))

    monkeypatch.setattr(plane_wh, "handle_estimate", _record)

    payload = {"id": "PL-1", "state": {"id": "EST-UUID"}}
    await plane_wh.handle_issue_updated(payload, "proj")

    assert routed_ids == ["PL-1"]
# ===========================================================================
# TC-02 — fail-closed on a board without «Оценка» (AC-T5)
# ===========================================================================
@pytest.mark.asyncio
async def test_tc02_failclosed_no_status(monkeypatch):
    """TC-02: on a board with no "estimate" state the estimate handler is never called."""
    from src.webhooks import plane as plane_wh
    from src.plane_sync import _DEFAULT_STATES

    # "estimate" is never force-filled by the defaults -> .get -> None.
    assert "estimate" not in _DEFAULT_STATES

    board_states = {
        "stop": "STOP-UUID",
        "to_analyse": "TA-UUID",
        "approved": "AP-UUID",
        "rejected": "RJ-UUID",
        "confirm_deploy": None,
        # NOTE: no "estimate" key (the board has no such status)
    }

    def _project_states(pid):
        return board_states

    monkeypatch.setattr("src.plane_sync.get_project_states", _project_states)

    routed_ids = []

    async def _record(data, project_id=""):
        routed_ids.append(data.get("id"))

    monkeypatch.setattr(plane_wh, "handle_estimate", _record)

    # An unrelated state -> the estimate branch is inert (no KeyError), no estimate.
    payload = {"id": "PL-2", "state": {"id": "SOMETHING-ELSE"}}
    await plane_wh.handle_issue_updated(payload, "proj")

    assert routed_ids == []
# ===========================================================================
# TC-02b — the estimate gesture never aliases stop/approved/rejected
# ===========================================================================
@pytest.mark.asyncio
async def test_tc02b_mutual_exclusion(monkeypatch):
    """TC-02b: the STOP state reaches only handle_stop, the estimate state only handle_estimate."""
    from src.webhooks import plane as plane_wh

    board_states = {
        "estimate": "EST-UUID",
        "stop": "STOP-UUID",
        "to_analyse": "TA-UUID",
        "approved": "AP-UUID",
        "rejected": "RJ-UUID",
        "confirm_deploy": None,
    }

    def _project_states(pid):
        return board_states

    monkeypatch.setattr("src.plane_sync.get_project_states", _project_states)

    seen = {"estimate": [], "stop": []}

    def _recorder(bucket):
        """Build an async handler stub that logs the issue id under ``bucket``."""
        async def _handler(data, project_id=""):
            seen[bucket].append(data.get("id"))
        return _handler

    monkeypatch.setattr(plane_wh, "handle_estimate", _recorder("estimate"))
    monkeypatch.setattr(plane_wh, "handle_stop", _recorder("stop"))

    await plane_wh.handle_issue_updated({"id": "S", "state": {"id": "STOP-UUID"}}, "proj")
    await plane_wh.handle_issue_updated({"id": "E", "state": {"id": "EST-UUID"}}, "proj")

    assert seen["stop"] == ["S"]
    assert seen["estimate"] == ["E"]
# ===========================================================================
# TC-03 — backlog estimate + auto-return to Backlog (AC-T1, AC-T2)
# ===========================================================================
@pytest.mark.asyncio
async def test_tc03_backlog_estimate_and_return(monkeypatch, plane_spy):
    """TC-03: handle_estimate stores a forecast for ORCH-020 and moves it back to Backlog."""
    from src.webhooks import plane as plane_wh

    def _sequence_id(iid, pid):
        return 20

    monkeypatch.setattr("src.plane_sync.fetch_issue_sequence_id", _sequence_id)

    await plane_wh.handle_estimate({"id": "PL-3"}, _ORCH_PROJECT)

    ledger_row = db.get_estimate("ORCH-020")
    assert ledger_row is not None
    assert ledger_row["forecast_story_points"] in (1, 2, 3, 5, 8)
    # The work item was returned to Backlog.
    assert "ORCH-020" in plane_spy["backlog"]
# ===========================================================================
# TC-04 — anti-disruption: active job -> no-op (AC-T6)
# ===========================================================================
@pytest.mark.asyncio
async def test_tc04_anti_disruption_active_job(monkeypatch, plane_spy):
    """TC-04: with a queued job on the task, handle_estimate does nothing."""
    from src.webhooks import plane as plane_wh

    task_id = _mk_task("PL-4", "ORCH-044", stage="development")
    # Enqueue a job so the task has active (queued) work.
    db.enqueue_job("developer", _REPO, "x", task_id=task_id)

    invocations = []

    def _spy_estimate(*a, **k):
        invocations.append(a)

    monkeypatch.setattr(estimator, "estimate", _spy_estimate)

    await plane_wh.handle_estimate({"id": "PL-4"}, _ORCH_PROJECT)

    assert invocations == []                      # estimate not run
    assert db.get_estimate("ORCH-044") is None    # nothing written
    assert plane_spy["backlog"] == []             # status not changed
# ===========================================================================
# TC-05 — anti-loop: Backlog matches no trigger branch (AC-T6)
# ===========================================================================
@pytest.mark.asyncio
async def test_tc05_anti_loop_backlog_echo(monkeypatch):
    """TC-05: an inbound "state -> Backlog" update does not reach handle_estimate."""
    from src.webhooks import plane as plane_wh

    board_states = {
        "backlog": "BACKLOG-UUID",
        "estimate": "EST-UUID",
        "stop": "STOP-UUID",
        "to_analyse": "TA-UUID",
        "approved": "AP-UUID",
        "rejected": "RJ-UUID",
        "confirm_deploy": "CD-UUID",
    }

    # Backlog UUID must collide with NONE of the trigger branches (anti-loop invariant).
    trigger_keys = ("estimate", "stop", "to_analyse", "approved", "rejected", "confirm_deploy")
    trigger_uuids = set()
    for key in trigger_keys:
        trigger_uuids.add(board_states[key])
    assert board_states["backlog"] not in trigger_uuids

    def _project_states(pid):
        return board_states

    monkeypatch.setattr("src.plane_sync.get_project_states", _project_states)

    routed_ids = []

    async def _record(data, project_id=""):
        routed_ids.append(data.get("id"))

    monkeypatch.setattr(plane_wh, "handle_estimate", _record)

    # Inbound "state -> Backlog" echo: no trigger branch matches -> no estimate.
    payload = {"id": "PL-5", "state": {"id": "BACKLOG-UUID"}}
    await plane_wh.handle_issue_updated(payload, "proj")

    assert routed_ids == []
# ===========================================================================
# TC-06 — massivity: N webhooks -> N estimates (AC-T3)
# ===========================================================================
@pytest.mark.asyncio
async def test_tc06_massivity(monkeypatch, plane_spy):
    """TC-06: three handle_estimate calls produce three ledger rows and three Backlog returns."""
    from src.webhooks import plane as plane_wh

    sequence_by_issue = {"PL-A": 61, "PL-B": 62, "PL-C": 63}

    def _sequence_id(iid, pid):
        return sequence_by_issue[iid]

    monkeypatch.setattr("src.plane_sync.fetch_issue_sequence_id", _sequence_id)

    for issue_id in ("PL-A", "PL-B", "PL-C"):
        await plane_wh.handle_estimate({"id": issue_id}, _ORCH_PROJECT)

    for work_item in ("ORCH-061", "ORCH-062", "ORCH-063"):
        assert db.get_estimate(work_item) is not None
    assert len(plane_spy["backlog"]) == 3
# ===========================================================================
# TC-07 — idempotent re-estimation: UPSERT by work_item_id (AC-T4)
# ===========================================================================
def test_tc07_idempotent_upsert(plane_spy):
    """TC-07: estimating the same work item twice keeps one ledger row and bumps its counter."""
    estimator.estimate("ORCH-070", None, _REPO, "status")
    estimator.estimate("ORCH-070", None, _REPO, "status")  # re-estimate

    conn = db.get_db()
    try:
        n = conn.execute(
            "SELECT COUNT(*) FROM task_estimates WHERE work_item_id = ?", ("ORCH-070",)
        ).fetchone()[0]
    finally:
        # Close even when the query raises, so a failure does not leak the connection.
        conn.close()

    assert n == 1  # single row, no duplicate
    row = db.get_estimate("ORCH-070")
    assert row["estimate_count"] == 2  # bumped on re-estimate
    assert len(plane_spy["estimate_point"]) == 2  # estimate_point written each time
# ===========================================================================
# TC-08 — estimate() returns 4 values, SP in {1,2,3,5,8} (AC-1)
# ===========================================================================
def test_tc08_forecast_shape(plane_spy):
    """TC-08: estimate() returns the four forecast keys with the expected value types."""
    forecast = estimator.estimate("ORCH-080", None, _REPO, "status")

    required_keys = {"forecast_tokens", "forecast_seconds", "forecast_cost_usd", "story_points"}
    assert required_keys <= set(forecast)

    assert forecast["story_points"] in (1, 2, 3, 5, 8)
    assert isinstance(forecast["forecast_tokens"], int)
    assert isinstance(forecast["forecast_cost_usd"], float)
# ===========================================================================
# TC-09 — story-point bucketiser: exact border semantics (AC-2)
# ===========================================================================
def test_tc09_story_points_borders():
    """TC-09: story_points_for maps a cost onto 1/2/3/5/8 with inclusive upper borders."""
    sp = estimator.story_points_for

    # thresholds 0.50, 2.00, 5.00, 12.00 ; <= ascending
    border_cases = (
        (0.0, 1),
        (0.50, 1),
        (0.51, 2),
        (2.00, 2),
        (2.01, 3),
        (5.00, 3),
        (5.01, 5),
        (12.00, 5),
        (12.01, 8),
        (1000.0, 8),
    )
    for cost, expected_points in border_cases:
        assert sp(cost) == expected_points

    # accepts a forecast dict too
    assert sp({"forecast_cost_usd": 0.4}) == 1

    # every output is on-scale
    scale = (1, 2, 3, 5, 8)
    for cost in (0, 0.5, 1, 2, 3, 5, 8, 13, 50):
        assert sp(cost) in scale
# ===========================================================================
# TC-10 — empty history -> bootstrap, never-raise on broken data (AC-1, AC-9)
# ===========================================================================
def test_tc10_bootstrap_and_never_raise(monkeypatch, plane_spy):
    """TC-10: with no history or a failing DB the forecast is the bootstrap value; estimate() never raises."""
    # empty history -> bootstrap (not an exception)
    empty_history = estimator._forecast(_REPO, "full")
    assert empty_history["forecast_cost_usd"] == 3.0
    assert empty_history["story_points"] == estimator.story_points_for(3.0)

    # broken DB aggregate -> still bootstrap, no exception escapes
    def _stats_fail(repo, track):
        raise RuntimeError("db down")

    monkeypatch.setattr("src.db.completed_task_stats", _stats_fail)
    broken_stats = estimator._forecast(_REPO, "full")
    assert broken_stats["forecast_cost_usd"] == 3.0

    # estimate() never-raise even if persistence fails
    def _persist_fail(**kw):
        raise RuntimeError("persist down")

    monkeypatch.setattr("src.db.record_estimate", _persist_fail)
    unpersisted = estimator.estimate("ORCH-100", None, _REPO, "status")
    assert unpersisted["story_points"] in (1, 2, 3, 5, 8)
# ===========================================================================
# TC-11 — fact on done from usage aggregates (AC-4)
# ===========================================================================
def test_tc11_fact_on_done(plane_spy):
    """TC-11: record_actual_on_done stores the actual usage and writes the fact to Plane `point`."""
    task_id = _mk_task("PL-11", "ORCH-110", stage="done")
    _mk_run(task_id, cost=4.0, in_tok=1000, out_tok=500)  # cost 4.0 -> SP 3

    recorded = estimator.record_actual_on_done(task_id, _REPO, "ORCH-110")
    assert recorded is True

    fact = db.get_estimate("ORCH-110")
    assert fact["actual_story_points"] == 3
    assert abs(fact["actual_cost_usd"] - 4.0) < 1e-6
    assert fact["actual_tokens"] == 1500   # total_in (1000) + total_out (500)
    assert fact["actual_seconds"] == 1800  # 30-min wall

    # fact -> Plane `point`
    assert ("ORCH-110", 3) in plane_spy["point"]
# ===========================================================================
# TC-12 — forecast -> estimate_point, fact -> point; not swapped, no overwrite (AC-3/AC-4)
# ===========================================================================
def test_tc12_fields_not_swapped(plane_spy):
    """TC-12: the forecast goes to `estimate_point`, the fact to `point`, and the forecast survives."""
    # forecast write -> estimate_point only
    forecast = estimator.estimate("ORCH-120", None, _REPO, "status")
    assert ("ORCH-120", forecast["story_points"]) in plane_spy["estimate_point"]
    assert plane_spy["point"] == []  # forecast does NOT touch `point`
    stored_forecast_sp = db.get_estimate("ORCH-120")["forecast_story_points"]

    # fact write -> point only, forecast untouched
    task_id = _mk_task("PL-12", "ORCH-120", stage="done")
    _mk_run(task_id, cost=0.2)  # cost 0.2 -> SP 1
    estimator.record_actual_on_done(task_id, _REPO, "ORCH-120")
    assert ("ORCH-120", 1) in plane_spy["point"]

    after_fact = db.get_estimate("ORCH-120")
    assert after_fact["actual_story_points"] == 1
    assert after_fact["forecast_story_points"] == stored_forecast_sp  # forecast NOT overwritten
# ===========================================================================
# TC-13 — «Оценка» line in the Telegram card; empty forecast -> omitted (AC-5)
# ===========================================================================
def test_tc13_card_line(plane_spy):
    """TC-13: the «Оценка» line is absent from the card before a forecast and present after it."""
    from src.notifications import render_task_tracker

    # no forecast yet -> no line, card still renders
    assert estimator.card_line("ORCH-130") is None
    task_id = _mk_task("PL-13", "ORCH-130", stage="development")
    assert "Оценка" not in render_task_tracker(task_id)

    # after a forecast -> the line appears
    estimator.estimate("ORCH-130", None, _REPO, "status")
    rendered_line = estimator.card_line("ORCH-130")
    assert rendered_line is not None
    assert "Оценка" in rendered_line
    assert "Оценка" in render_task_tracker(task_id)
# ===========================================================================
# TC-14 — Plane comment with the forecast (AC-6)
# ===========================================================================
def test_tc14_comment_posted(plane_spy):
    """TC-14: estimate() posts exactly one Plane comment mentioning «Оценка» on the work item."""
    estimator.estimate("ORCH-140", None, _REPO, "status")

    comments = plane_spy["comment"]
    assert len(comments) == 1

    work_item, body = comments[0]
    assert work_item == "ORCH-140"
    assert "Оценка" in body
# ===========================================================================
# TC-15 — kill-switch off -> module inert (AC-9)
# ===========================================================================
@pytest.mark.asyncio
async def test_tc15_kill_switch_off(monkeypatch, plane_spy):
    """TC-15: with estimator_enabled=False, handle_estimate writes nothing to the DB or Plane."""
    from src.webhooks import plane as plane_wh

    monkeypatch.setattr(cfg.settings, "estimator_enabled", False, raising=False)
    assert estimator.applies(_REPO) is False

    def _sequence_id(iid, pid):
        return 99

    monkeypatch.setattr("src.plane_sync.fetch_issue_sequence_id", _sequence_id)

    await plane_wh.handle_estimate({"id": "PL-15"}, _ORCH_PROJECT)

    assert db.get_estimate("ORCH-099") is None  # nothing written
    assert plane_spy["backlog"] == []
    assert plane_spy["estimate_point"] == []
# ===========================================================================
# TC-16 — scope: empty -> self-hosting only (AC-9)
# ===========================================================================
def test_tc16_scope(monkeypatch):
    """TC-16: an empty estimator_repos applies to "orchestrator" only; a CSV value replaces that scope."""

    def _set_scope(csv):
        monkeypatch.setattr(cfg.settings, "estimator_repos", csv, raising=False)

    # empty scope -> self-hosting only
    _set_scope("")
    assert estimator.applies("orchestrator") is True
    assert estimator.applies("enduro-trails") is False

    # explicit CSV scope
    _set_scope("enduro-trails")
    assert estimator.applies("enduro-trails") is True
    assert estimator.applies("orchestrator") is False
# ===========================================================================
# TC-17 — GET /queue has the estimator block (AC-9)
# ===========================================================================
def test_tc17_queue_block():
    """TC-17: the queue() endpoint result contains an enabled "estimator" block with counters and ledger."""
    from src.main import queue

    # queue is a coroutine function; drive it to completion from this sync test.
    payload = asyncio.run(queue())

    assert "estimator" in payload
    estimator_block = payload["estimator"]
    assert estimator_block["enabled"] is True
    for section in ("counters", "ledger"):
        assert section in estimator_block
# ===========================================================================
# TC-18 — additive table + helpers; idempotent migration (AC-12)
# ===========================================================================
def test_tc18_table_and_helpers():
    """TC-18: task_estimates is created idempotently and record/get/set_actual round-trip.

    Also checks that recording the fact leaves the forecast untouched and that
    the pre-existing tables are still present next to the new one.
    """
    # CREATE TABLE IF NOT EXISTS is idempotent
    db.init_db()
    db.init_db()

    # record (task_id nullable) -> get
    rid = db.record_estimate(
        "ORCH-180", repo=_REPO, task_id=None,
        forecast_tokens=1000, forecast_seconds=600, forecast_cost_usd=1.5,
        forecast_story_points=2, source="status",
    )
    assert rid > 0
    row = db.get_estimate("ORCH-180")
    assert row["task_id"] is None
    assert row["forecast_story_points"] == 2
    assert row["estimate_count"] == 1

    # set_actual stores the fact + delta-computable; does not touch forecast
    db.set_actual("ORCH-180", actual_tokens=2000, actual_seconds=900,
                  actual_cost_usd=2.5, actual_story_points=3, task_id=42)
    row = db.get_estimate("ORCH-180")
    assert row["actual_story_points"] == 3
    assert row["forecast_story_points"] == 2  # forecast preserved
    assert row["task_id"] == 42  # linked later

    # existing tables intact (additive only)
    conn = db.get_db()
    try:
        names = {r[0] for r in conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
    finally:
        # Close even when the query raises, so a failure does not leak the connection.
        conn.close()
    assert {"tasks", "agent_runs", "jobs", "task_estimates"} <= names
# ===========================================================================
# TC-19 — fail-safe Plane write when estimate-system absent (AC-12, NFR-7)
# ===========================================================================
def test_tc19_failsafe_plane_write(monkeypatch):
    """TC-19: with no estimate points configured the Plane write is skipped and the forecast still persists."""
    import src.plane_sync as ps

    # estimate-system not configured -> resolve returns {} -> best-effort skip, no raise
    def _no_estimate_points(pid):
        return {}

    monkeypatch.setattr(ps, "get_project_estimate_points", _no_estimate_points)
    write_result = ps.set_issue_estimate_point("ORCH-190", 3)
    assert write_result is False  # skipped, never raises

    # the forecast persists and the flow does not crash even with the estimate-system absent
    def _silent_comment(*a, **k):
        return None

    monkeypatch.setattr(ps, "add_comment", _silent_comment)
    forecast = estimator.estimate("ORCH-190", None, _REPO, "status")
    assert forecast["story_points"] in (1, 2, 3, 5, 8)
    assert db.get_estimate("ORCH-190") is not None
# ===========================================================================
# TC-20 — control-path anti-regression (AC-10, AC-11)
# ===========================================================================
def test_tc20_control_path_untouched():
    """TC-20: the estimate gesture is absent from the stage machine, the QG registry and model overrides.

    Checks that "estimate" is neither a key nor a string edge target in
    STAGE_TRANSITIONS, that no QG_CHECKS key mentions it, that the reloaded
    src.estimator module has no ``stage_engine`` / ``launcher`` attribute, and
    that its source text never references the agent model/effort resolvers.
    """
    from src.stages import STAGE_TRANSITIONS
    from src.qg.checks import QG_CHECKS

    # «Оценка» is NOT a pipeline stage / edge.
    assert "estimate" not in STAGE_TRANSITIONS
    for stage, edges in STAGE_TRANSITIONS.items():
        # no edge target (next stage / rollback / gate) is the estimate gesture:
        # estimation never advances or routes a task through the stage machine.
        string_targets = [v for v in edges.values() if isinstance(v, str)]
        assert "estimate" not in string_targets, (
            f"STAGE_TRANSITIONS[{stage!r}] routes to the estimate gesture"
        )

    # No new QG check registered for estimation.
    assert not any("estimate" in str(k).lower() for k in QG_CHECKS)

    # The estimator leaf does not import stage_engine / launcher at module load
    # (leaf invariant — never on the control path).
    import sys
    import importlib
    importlib.reload(importlib.import_module("src.estimator"))
    src_estimator = sys.modules["src.estimator"]
    # the module references config + lazy imports only; assert it has no module-level
    # binding to stage_engine / launcher
    assert not hasattr(src_estimator, "stage_engine")
    assert not hasattr(src_estimator, "launcher")

    # Step 2 (adaptive model selection) is out of scope: no model/effort override here.
    # Read through a context manager so the file handle is closed deterministically
    # (a bare open(...).read() leaves it to the garbage collector).
    with open(src_estimator.__file__, encoding="utf-8") as fh:
        src_text = fh.read()
    assert "resolve_agent_model" not in src_text
    assert "resolve_agent_effort" not in src_text