fix(gps-tracks): aggregate last_pipeline_run in health endpoint (REQ-F-12)
Replace raw single-row fetch with aggregation over all pipeline_runs rows sharing the latest started_at. Returns structured object with regions[], sources_ok[], sources_error[], tracks_added instead of a raw DB row with region_id/source_id strings. Returns null when no runs exist (empty DB). Update test_i40_health_endpoint: add db_with_pipeline_runs fixture (two rows, same started_at, two regions) and assert the full aggregated shape including concrete values. Refs: ET-008 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -205,7 +205,12 @@ def create_gps_router(db_path: str) -> APIRouter:
|
||||
|
||||
@router.get("/health")
|
||||
async def gps_health():
|
||||
"""Статистика GPS-треков БД."""
|
||||
"""Статистика GPS-треков БД.
|
||||
|
||||
Поле last_pipeline_run агрегирует все записи pipeline_runs,
|
||||
принадлежащие последнему запуску (по максимальному started_at).
|
||||
Возвращает None если прогонов ещё не было.
|
||||
"""
|
||||
try:
|
||||
conn = _get_conn()
|
||||
cur = conn.cursor()
|
||||
@@ -218,17 +223,52 @@ def create_gps_router(db_path: str) -> APIRouter:
|
||||
)
|
||||
by_activity = {row["activity_type"] or "other": row["cnt"] for row in cur.fetchall()}
|
||||
|
||||
# REQ-F-12: агрегированный объект по всем строкам последнего прогона.
|
||||
# Все строки одного запуска pipeline имеют одинаковый started_at —
|
||||
# pipeline устанавливает его перед итерацией по (region, source).
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT id, started_at, finished_at, region_id, source_id,
|
||||
status, tracks_new, tracks_updated
|
||||
SELECT started_at, finished_at, region_id, source_id,
|
||||
status, tracks_new, errors_json
|
||||
FROM pipeline_runs
|
||||
ORDER BY started_at DESC
|
||||
LIMIT 1
|
||||
WHERE started_at = (SELECT MAX(started_at) FROM pipeline_runs)
|
||||
ORDER BY region_id, source_id
|
||||
"""
|
||||
)
|
||||
last_run_row = cur.fetchone()
|
||||
last_run = dict(last_run_row) if last_run_row else None
|
||||
run_rows = cur.fetchall()
|
||||
|
||||
if run_rows:
|
||||
regions: list = []
|
||||
sources_ok: list = []
|
||||
sources_error: list = []
|
||||
tracks_added = 0
|
||||
finished_at_values: list = []
|
||||
|
||||
for row in run_rows:
|
||||
region = row["region_id"]
|
||||
if region not in regions:
|
||||
regions.append(region)
|
||||
|
||||
if row["status"] in ("ok", "partial"):
|
||||
sources_ok.append(row["source_id"])
|
||||
else:
|
||||
sources_error.append(row["source_id"])
|
||||
|
||||
tracks_added += row["tracks_new"] or 0
|
||||
|
||||
if row["finished_at"]:
|
||||
finished_at_values.append(row["finished_at"])
|
||||
|
||||
last_run: Optional[dict] = {
|
||||
"started_at": run_rows[0]["started_at"],
|
||||
"finished_at": max(finished_at_values) if finished_at_values else None,
|
||||
"regions": regions,
|
||||
"sources_ok": sources_ok,
|
||||
"sources_error": sources_error,
|
||||
"tracks_added": tracks_added,
|
||||
}
|
||||
else:
|
||||
last_run = None
|
||||
|
||||
cur.execute("SELECT sources_json FROM tracks")
|
||||
tracks_by_source: dict = {}
|
||||
|
||||
Reference in New Issue
Block a user