160 lines
6.3 KiB
Python
160 lines
6.3 KiB
Python
"""ORCH-067 — Group A: bump is the DEFAULT tracker mode (AC-1..AC-4, AC-15).
|
|
|
|
The default flipped edit -> bump: out of the box the live card is re-created at
|
|
the BOTTOM of the chat (delete old + send new silent + repoint id), one card per
|
|
task. edit stays available via ORCH_TRACKER_MODE=edit. Network is isolated: the
|
|
low-level send/edit/delete helpers are patched per case; the DB is a temp SQLite.
|
|
|
|
Test ids TC-01..TC-04 + TC-17 from 04-test-plan.yaml.
|
|
"""
|
|
|
|
import os
|
|
import tempfile
|
|
|
|
os.environ.setdefault("ORCH_PLANE_API_TOKEN", "test-token")
|
|
os.environ.setdefault("ORCH_GITEA_TOKEN", "test-token")
|
|
|
|
_test_db = os.path.join(tempfile.gettempdir(), "test_orchestrator_bump_default.db")
|
|
os.environ["ORCH_DB_PATH"] = _test_db
|
|
|
|
import pytest # noqa: E402
|
|
|
|
import src.db as db_module # noqa: E402
|
|
from src.config import Settings # noqa: E402
|
|
from src.db import ( # noqa: E402
|
|
init_db, get_db, get_tracker_message_id, set_tracker_message_id,
|
|
)
|
|
from src import notifications as N # noqa: E402
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def setup_db(monkeypatch):
|
|
monkeypatch.setattr(db_module.settings, "db_path", _test_db, raising=False)
|
|
if os.path.exists(_test_db):
|
|
os.unlink(_test_db)
|
|
init_db()
|
|
yield
|
|
if os.path.exists(_test_db):
|
|
os.unlink(_test_db)
|
|
|
|
|
|
def _mk_task(stage="development", wid="ORCH-067"):
|
|
conn = get_db()
|
|
cur = conn.execute(
|
|
"INSERT INTO tasks (plane_id, work_item_id, repo, branch, stage, title) "
|
|
"VALUES (?, ?, ?, ?, ?, ?)",
|
|
("p1", wid, "orchestrator", "feature/ORCH-067-x", stage, "bump default"),
|
|
)
|
|
tid = cur.lastrowid
|
|
conn.commit()
|
|
conn.close()
|
|
return tid
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# TC-01 / AC-1 — default tracker_mode == "bump"
|
|
# --------------------------------------------------------------------------- #
|
|
def test_tc01_default_tracker_mode_is_bump(monkeypatch):
|
|
monkeypatch.delenv("ORCH_TRACKER_MODE", raising=False)
|
|
assert Settings().tracker_mode == "bump"
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# TC-02 / AC-2, AC-15 — repeat update: delete(old) -> send(silent) -> repoint
|
|
# --------------------------------------------------------------------------- #
|
|
def test_tc02_repeat_delete_send_silent_repoint(monkeypatch):
|
|
# No env -> resolves to the new bump default (no explicit mode pin).
|
|
monkeypatch.setattr(N._get_settings(), "tracker_mode", "bump", raising=False)
|
|
tid = _mk_task()
|
|
set_tracker_message_id(tid, 100)
|
|
|
|
order = []
|
|
monkeypatch.setattr(N, "delete_telegram",
|
|
lambda mid: order.append(("delete", mid)) or True)
|
|
monkeypatch.setattr(N, "send_telegram",
|
|
lambda text, disable_notification=False:
|
|
order.append(("send", disable_notification)) or 200)
|
|
|
|
N.update_task_tracker(tid)
|
|
|
|
# delete(old) strictly before send; the new card is SILENT (disable=True).
|
|
assert order == [("delete", 100), ("send", True)]
|
|
assert get_tracker_message_id(tid) == 200 # one card -> repointed
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# TC-03 / AC-3 — transient send None must NOT wipe the pointer / duplicate
|
|
# --------------------------------------------------------------------------- #
|
|
def test_tc03_send_none_keeps_pointer_no_dupe(monkeypatch):
|
|
monkeypatch.setattr(N._get_settings(), "tracker_mode", "bump", raising=False)
|
|
tid = _mk_task()
|
|
set_tracker_message_id(tid, 100)
|
|
|
|
sends = []
|
|
monkeypatch.setattr(N, "delete_telegram", lambda mid: True)
|
|
monkeypatch.setattr(N, "send_telegram",
|
|
lambda text, disable_notification=False:
|
|
sends.append(1) or None)
|
|
|
|
N.update_task_tracker(tid) # must not raise
|
|
|
|
assert len(sends) == 1 # exactly one (failed) attempt, no retry
|
|
assert get_tracker_message_id(tid) == 100 # pointer preserved, not None
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# TC-04 / AC-4 — edit mode still reachable via env -> editMessageText path
|
|
# --------------------------------------------------------------------------- #
|
|
def test_tc04_edit_mode_still_available(monkeypatch):
|
|
monkeypatch.setattr(N._get_settings(), "tracker_mode", "edit", raising=False)
|
|
tid = _mk_task()
|
|
set_tracker_message_id(tid, 777)
|
|
|
|
edited = {}
|
|
monkeypatch.setattr(N, "edit_telegram",
|
|
lambda mid, text: edited.update(mid=mid) or N.EDIT_OK)
|
|
monkeypatch.setattr(
|
|
N, "send_telegram",
|
|
lambda *a, **k: (_ for _ in ()).throw(
|
|
AssertionError("edit mode must not send when edit succeeds")),
|
|
)
|
|
|
|
N.update_task_tracker(tid)
|
|
assert edited["mid"] == 777 # edited in place, no new card
|
|
|
|
|
|
def test_tc04b_edit_mode_resolution_case_insensitive(monkeypatch):
|
|
"""Anything other than 'bump' resolves to edit (e.g. 'EDIT')."""
|
|
monkeypatch.setattr(N._get_settings(), "tracker_mode", "EDIT", raising=False)
|
|
tid = _mk_task()
|
|
set_tracker_message_id(tid, 5)
|
|
edited = {}
|
|
monkeypatch.setattr(N, "edit_telegram",
|
|
lambda mid, text: edited.update(mid=mid) or N.EDIT_OK)
|
|
monkeypatch.setattr(N, "send_telegram",
|
|
lambda *a, **k: (_ for _ in ()).throw(
|
|
AssertionError("should edit, not send")))
|
|
N.update_task_tracker(tid)
|
|
assert edited["mid"] == 5
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# TC-17 / AC-15 — first bump call: NO delete, silent send, id stored
|
|
# --------------------------------------------------------------------------- #
|
|
def test_tc17_first_call_silent_no_delete(monkeypatch):
|
|
monkeypatch.setattr(N._get_settings(), "tracker_mode", "bump", raising=False)
|
|
tid = _mk_task(stage="analysis")
|
|
|
|
sends = []
|
|
monkeypatch.setattr(N, "send_telegram",
|
|
lambda text, disable_notification=False:
|
|
sends.append(disable_notification) or 555)
|
|
monkeypatch.setattr(N, "delete_telegram",
|
|
lambda mid: (_ for _ in ()).throw(
|
|
AssertionError("delete must not run on first call")))
|
|
|
|
N.update_task_tracker(tid)
|
|
|
|
assert sends == [True] # exactly one SILENT send
|
|
assert get_tracker_message_id(tid) == 555 # id stored
|