feat(lessons): machine lessons-journal — additive table + observer leaf (ORCH-098)

Step 1 ("Foundation", F2) of the self-improvement epic: formalise free-text
"lessons" from memory/ into a machine-readable `lessons` table — the foundation
for the future retrospective agent (E2), the RICE prioritiser (E3) and Стрим.

- src/lessons.py: pure never-raise observer leaf (record/get/update/snapshot),
  kill-switch only, NO repo scope (observer-only; records about any repo incl.
  enduro; repo cut on the read side). Slug-convention constants.
- src/db.py: additive idempotent `lessons` table in init_db() (+3 indexes);
  nullable attribution columns from the start (NFR-6, _ensure_column forward-safe);
  helpers record_lesson/get_lessons/update_lesson/lessons_snapshot/
  lessons_recent_dup_exists (auto-dedup window).
- 4 auto-detectors (best-effort, source="auto", deduped): gate_failure
  (_handle_qg_failure_rollbacks), merge_hold (_handle_merge_verify HOLD),
  transient_retry (launcher._finalize_transient budget-exhaustion), deploy_degraded
  (post-deploy DEGRADED -> set_repo_freeze).
- src/main.py: GET /lessons, POST /lessons, POST /lessons/{id} + read-only
  `lessons` block in GET /queue; off-switch -> {"enabled": false}.
- src/config.py: lessons_enabled / lessons_query_limit_default / lessons_dedup_window_s.
- tests/test_lessons.py: TC-01..TC-12 (unit + integration), all green.
- Docs: CLAUDE.md, docs/architecture/README.md (component + schema + API), CHANGELOG.

Invariant: the journal is an OBSERVER, not a Quality Gate — STAGE_TRANSITIONS /
QG_CHECKS / check_* / machine-verdict / existing table schemas are byte-for-byte
untouched; enduro not affected. never-raise on every public fn + injection.

Refs: ORCH-098
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-10 10:24:40 +03:00
parent 9f62df02eb
commit 7d21625d84
9 changed files with 985 additions and 3 deletions

185
src/db.py
View File

@@ -220,10 +220,195 @@ def init_db():
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
""")
# ORCH-098 (FR-1, ADR-001 D1): additive machine lessons-journal — a structured
# table of pipeline deviations (gate-fail / merge-hold / transient-retry /
# post-deploy-degraded), the foundation of the self-improvement epic (E2
# retrospective / E3 RICE prioritiser). Purely ADDITIVE (CREATE TABLE/INDEX IF NOT
# EXISTS, pattern repo_freeze/coverage_baseline) -> idempotent, restart-safe on
# the shared prod DB; existing tables untouched (NFR-3, enduro-trails not
# affected). The attribution columns (attribution/target_repo/target_domain) are
# NULLABLE and present FROM THE START (Слава 10.06, NFR-6) so the live shared DB
# never needs a schema rework — an auto-recorded `unknown` lesson is classified
# later via update. lesson_type / attribution / target_domain carry NO enum/CHECK
# constraint: the values are a forward-compatible slug convention (a new lesson
# type never needs a migration). See docs/work-items/ORCH-098/08-data-requirements.md.
conn.executescript("""
CREATE TABLE IF NOT EXISTS lessons (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT,
lesson_type TEXT NOT NULL,
work_item_id TEXT,
task_id INTEGER,
stage TEXT,
agent TEXT,
repo TEXT,
root_cause TEXT,
suggestion TEXT,
status TEXT NOT NULL DEFAULT 'new',
related_task TEXT,
attribution TEXT,
target_repo TEXT,
target_domain TEXT,
source TEXT,
detail TEXT
);
CREATE INDEX IF NOT EXISTS idx_lessons_type_status ON lessons (lesson_type, status);
CREATE INDEX IF NOT EXISTS idx_lessons_repo ON lessons (repo);
CREATE INDEX IF NOT EXISTS idx_lessons_wi_type ON lessons (work_item_id, lesson_type);
""")
# Forward-safe: on an already-created `lessons` table the attribution columns are
# added idempotently (_ensure_column is a no-op once present) so an old prod DB
# picks them up without a data migration (NFR-6, AC-2).
_ensure_column(conn, "lessons", "attribution", "TEXT")
_ensure_column(conn, "lessons", "target_repo", "TEXT")
_ensure_column(conn, "lessons", "target_domain", "TEXT")
conn.commit()
conn.close()
# ---------------------------------------------------------------------------
# ORCH-098 (FR-1..FR-5, ADR-001 D1): lessons-journal DDL helpers. Each opens its
# own connection and closes it in `finally` (pattern coverage_baseline). The leaf
# src/lessons.py wraps these in its never-raise contract — these may raise on a
# real DB fault (the leaf swallows it).
# ---------------------------------------------------------------------------
# The full column set, in INSERT order. Single source of truth so record/get stay
# in lockstep with the schema.
_LESSON_COLUMNS = (
"lesson_type", "work_item_id", "task_id", "stage", "agent", "repo",
"root_cause", "suggestion", "status", "related_task",
"attribution", "target_repo", "target_domain", "source", "detail",
)
# Fields an update() may set (everything mutable; never id/created_at/lesson_type).
_LESSON_UPDATABLE = (
"status", "attribution", "target_repo", "target_domain", "related_task",
"root_cause", "suggestion", "stage", "agent", "repo", "detail",
)
def record_lesson(**fields) -> int:
"""Insert one lessons row; return the new id. Raises only on a real DB fault.
Only the known columns in ``_LESSON_COLUMNS`` are written; unknown keys are
ignored (forward-safe). ``created_at`` is stamped by the table default.
"""
cols = [c for c in _LESSON_COLUMNS if c in fields]
if "lesson_type" not in cols:
raise ValueError("record_lesson requires lesson_type")
placeholders = ", ".join("?" for _ in cols)
sql = f"INSERT INTO lessons ({', '.join(cols)}) VALUES ({placeholders})"
conn = get_db()
try:
cur = conn.execute(sql, tuple(fields[c] for c in cols))
conn.commit()
return int(cur.lastrowid)
finally:
conn.close()
def lessons_recent_dup_exists(work_item_id, lesson_type, stage, window_s: int) -> bool:
"""ORCH-098 (D4): is there an auto-lesson with the same (work_item_id,
lesson_type, stage) within the last ``window_s`` seconds? One indexed lookup on
``idx_lessons_wi_type``. Used to suppress duplicate auto-records on retries.
"""
conn = get_db()
try:
row = conn.execute(
"SELECT 1 FROM lessons "
"WHERE work_item_id IS ? AND lesson_type = ? AND stage IS ? "
"AND source = 'auto' "
"AND created_at > datetime('now', ?) LIMIT 1",
(work_item_id, lesson_type, stage, f"-{int(window_s)} seconds"),
).fetchone()
finally:
conn.close()
return row is not None
def get_lessons(*, lesson_type=None, status=None, repo=None, work_item_id=None,
limit: int = 100) -> list[dict]:
"""Read-only parametrised SELECT of lessons (ORDER BY id DESC LIMIT ?)."""
where = []
params: list = []
if lesson_type:
where.append("lesson_type = ?")
params.append(lesson_type)
if status:
where.append("status = ?")
params.append(status)
if repo:
where.append("repo = ?")
params.append(repo)
if work_item_id:
where.append("work_item_id = ?")
params.append(work_item_id)
sql = "SELECT * FROM lessons"
if where:
sql += " WHERE " + " AND ".join(where)
sql += " ORDER BY id DESC LIMIT ?"
try:
lim = int(limit)
except (TypeError, ValueError):
lim = 100
params.append(max(1, lim))
conn = get_db()
try:
rows = conn.execute(sql, tuple(params)).fetchall()
finally:
conn.close()
return [dict(r) for r in rows]
def update_lesson(lesson_id: int, **fields) -> bool:
"""Update mutable fields of a lesson + stamp updated_at. Returns True iff a row
changed. Unknown / non-updatable keys are ignored (forward-safe).
"""
sets = [c for c in _LESSON_UPDATABLE if c in fields]
if not sets:
return False
assignments = ", ".join(f"{c} = ?" for c in sets)
sql = f"UPDATE lessons SET {assignments}, updated_at = datetime('now') WHERE id = ?"
conn = get_db()
try:
cur = conn.execute(sql, tuple(fields[c] for c in sets) + (int(lesson_id),))
conn.commit()
return (cur.rowcount or 0) > 0
finally:
conn.close()
def lessons_snapshot(recent: int = 10) -> dict:
"""Light GROUP BY summary (counts by type/status) + the last N lessons, for the
GET /queue observability block."""
conn = get_db()
try:
total = conn.execute("SELECT COUNT(*) FROM lessons").fetchone()[0]
by_type = {
r["lesson_type"]: r["n"]
for r in conn.execute(
"SELECT lesson_type, COUNT(*) AS n FROM lessons GROUP BY lesson_type"
).fetchall()
}
by_status = {
r["status"]: r["n"]
for r in conn.execute(
"SELECT status, COUNT(*) AS n FROM lessons GROUP BY status"
).fetchall()
}
rows = conn.execute(
"SELECT * FROM lessons ORDER BY id DESC LIMIT ?", (max(1, int(recent)),)
).fetchall()
finally:
conn.close()
return {
"total": total,
"by_type": by_type,
"by_status": by_status,
"recent": [dict(r) for r in rows],
}
def get_coverage_baseline(repo: str) -> float | None:
"""ORCH-027: read the per-repo coverage baseline (%, line coverage).