fix(gps-tracks): aggregate last_pipeline_run in health endpoint (REQ-F-12)
Replace raw single-row fetch with aggregation over all pipeline_runs rows sharing the latest started_at. Returns structured object with regions[], sources_ok[], sources_error[], tracks_added instead of a raw DB row with region_id/source_id strings. Returns null when no runs exist (empty DB). Update test_i40_health_endpoint: add db_with_pipeline_runs fixture (two rows, same started_at, two regions) and assert the full aggregated shape including concrete values. Refs: ET-008 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -205,7 +205,12 @@ def create_gps_router(db_path: str) -> APIRouter:
|
|||||||
|
|
||||||
@router.get("/health")
|
@router.get("/health")
|
||||||
async def gps_health():
|
async def gps_health():
|
||||||
"""Статистика GPS-треков БД."""
|
"""Статистика GPS-треков БД.
|
||||||
|
|
||||||
|
Поле last_pipeline_run агрегирует все записи pipeline_runs,
|
||||||
|
принадлежащие последнему запуску (по максимальному started_at).
|
||||||
|
Возвращает None если прогонов ещё не было.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
conn = _get_conn()
|
conn = _get_conn()
|
||||||
cur = conn.cursor()
|
cur = conn.cursor()
|
||||||
@@ -218,17 +223,52 @@ def create_gps_router(db_path: str) -> APIRouter:
|
|||||||
)
|
)
|
||||||
by_activity = {row["activity_type"] or "other": row["cnt"] for row in cur.fetchall()}
|
by_activity = {row["activity_type"] or "other": row["cnt"] for row in cur.fetchall()}
|
||||||
|
|
||||||
|
# REQ-F-12: агрегированный объект по всем строкам последнего прогона.
|
||||||
|
# Все строки одного запуска pipeline имеют одинаковый started_at —
|
||||||
|
# pipeline устанавливает его перед итерацией по (region, source).
|
||||||
cur.execute(
|
cur.execute(
|
||||||
"""
|
"""
|
||||||
SELECT id, started_at, finished_at, region_id, source_id,
|
SELECT started_at, finished_at, region_id, source_id,
|
||||||
status, tracks_new, tracks_updated
|
status, tracks_new, errors_json
|
||||||
FROM pipeline_runs
|
FROM pipeline_runs
|
||||||
ORDER BY started_at DESC
|
WHERE started_at = (SELECT MAX(started_at) FROM pipeline_runs)
|
||||||
LIMIT 1
|
ORDER BY region_id, source_id
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
last_run_row = cur.fetchone()
|
run_rows = cur.fetchall()
|
||||||
last_run = dict(last_run_row) if last_run_row else None
|
|
||||||
|
if run_rows:
|
||||||
|
regions: list = []
|
||||||
|
sources_ok: list = []
|
||||||
|
sources_error: list = []
|
||||||
|
tracks_added = 0
|
||||||
|
finished_at_values: list = []
|
||||||
|
|
||||||
|
for row in run_rows:
|
||||||
|
region = row["region_id"]
|
||||||
|
if region not in regions:
|
||||||
|
regions.append(region)
|
||||||
|
|
||||||
|
if row["status"] in ("ok", "partial"):
|
||||||
|
sources_ok.append(row["source_id"])
|
||||||
|
else:
|
||||||
|
sources_error.append(row["source_id"])
|
||||||
|
|
||||||
|
tracks_added += row["tracks_new"] or 0
|
||||||
|
|
||||||
|
if row["finished_at"]:
|
||||||
|
finished_at_values.append(row["finished_at"])
|
||||||
|
|
||||||
|
last_run: Optional[dict] = {
|
||||||
|
"started_at": run_rows[0]["started_at"],
|
||||||
|
"finished_at": max(finished_at_values) if finished_at_values else None,
|
||||||
|
"regions": regions,
|
||||||
|
"sources_ok": sources_ok,
|
||||||
|
"sources_error": sources_error,
|
||||||
|
"tracks_added": tracks_added,
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
last_run = None
|
||||||
|
|
||||||
cur.execute("SELECT sources_json FROM tracks")
|
cur.execute("SELECT sources_json FROM tracks")
|
||||||
tracks_by_source: dict = {}
|
tracks_by_source: dict = {}
|
||||||
|
|||||||
@@ -8,9 +8,7 @@ I-30: MVT тайл отдаётся
|
|||||||
I-31: cache hit
|
I-31: cache hit
|
||||||
I-40: health endpoint
|
I-40: health endpoint
|
||||||
"""
|
"""
|
||||||
import json
|
|
||||||
import pytest
|
import pytest
|
||||||
import pytest_asyncio
|
|
||||||
|
|
||||||
from httpx import AsyncClient, ASGITransport
|
from httpx import AsyncClient, ASGITransport
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
@@ -99,6 +97,52 @@ def db_with_tracks(tmp_path):
|
|||||||
yield db_path
|
yield db_path
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def db_with_pipeline_runs(db_with_tracks):
|
||||||
|
"""БД с треками и записями о прогонах pipeline (REQ-F-12).
|
||||||
|
|
||||||
|
Один прогон охватывает два региона и один источник.
|
||||||
|
Имитирует ситуацию когда pipeline записал две строки
|
||||||
|
с одинаковым started_at (один запуск скрипта).
|
||||||
|
"""
|
||||||
|
db_path = db_with_tracks
|
||||||
|
conn = open_db(db_path)
|
||||||
|
|
||||||
|
conn.executemany(
|
||||||
|
"""
|
||||||
|
INSERT INTO pipeline_runs
|
||||||
|
(started_at, finished_at, region_id, source_id,
|
||||||
|
status, tracks_new, tracks_updated, errors_json)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||||
|
""",
|
||||||
|
[
|
||||||
|
(
|
||||||
|
"2026-05-30T03:00:00Z",
|
||||||
|
"2026-05-30T04:00:00Z",
|
||||||
|
"cfo",
|
||||||
|
"osm",
|
||||||
|
"ok",
|
||||||
|
42,
|
||||||
|
5,
|
||||||
|
None,
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"2026-05-30T03:00:00Z",
|
||||||
|
"2026-05-30T05:14:00Z",
|
||||||
|
"chuvashia",
|
||||||
|
"osm",
|
||||||
|
"ok",
|
||||||
|
10,
|
||||||
|
2,
|
||||||
|
None,
|
||||||
|
),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
yield db_path
|
||||||
|
|
||||||
|
|
||||||
# ─── I-20: GeoJSON с фильтрами ────────────────────────────────────────────────
|
# ─── I-20: GeoJSON с фильтрами ────────────────────────────────────────────────
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@@ -304,9 +348,13 @@ async def test_i31_cache_hit(db_with_tracks):
|
|||||||
# ─── I-40: health endpoint ────────────────────────────────────────────────────
|
# ─── I-40: health endpoint ────────────────────────────────────────────────────
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_i40_health_endpoint(db_with_tracks):
|
async def test_i40_health_endpoint(db_with_pipeline_runs):
|
||||||
"""I-40: health endpoint возвращает корректную статистику."""
|
"""I-40: health endpoint возвращает корректную статистику.
|
||||||
app = _make_test_app(db_with_tracks)
|
|
||||||
|
REQ-F-12: last_pipeline_run — агрегированный объект, а не сырая строка БД.
|
||||||
|
Структура: started_at, finished_at, regions[], sources_ok[], sources_error[], tracks_added.
|
||||||
|
"""
|
||||||
|
app = _make_test_app(db_with_pipeline_runs)
|
||||||
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
|
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
|
||||||
resp = await client.get("/api/gps-tracks/health")
|
resp = await client.get("/api/gps-tracks/health")
|
||||||
|
|
||||||
@@ -316,7 +364,37 @@ async def test_i40_health_endpoint(db_with_tracks):
|
|||||||
assert "tracks_total" in data
|
assert "tracks_total" in data
|
||||||
assert data["tracks_total"] > 0
|
assert data["tracks_total"] > 0
|
||||||
assert "tracks_by_activity" in data
|
assert "tracks_by_activity" in data
|
||||||
|
|
||||||
|
# REQ-F-12: агрегированный объект last_pipeline_run
|
||||||
assert "last_pipeline_run" in data
|
assert "last_pipeline_run" in data
|
||||||
|
run = data["last_pipeline_run"]
|
||||||
|
assert run is not None, "last_pipeline_run must not be None when pipeline_runs exist"
|
||||||
|
|
||||||
|
# Обязательные поля
|
||||||
|
assert "started_at" in run
|
||||||
|
assert "finished_at" in run
|
||||||
|
assert "regions" in run
|
||||||
|
assert "sources_ok" in run
|
||||||
|
assert "sources_error" in run
|
||||||
|
assert "tracks_added" in run
|
||||||
|
|
||||||
|
# Типы
|
||||||
|
assert isinstance(run["regions"], list)
|
||||||
|
assert isinstance(run["sources_ok"], list)
|
||||||
|
assert isinstance(run["sources_error"], list)
|
||||||
|
assert isinstance(run["tracks_added"], int)
|
||||||
|
|
||||||
|
# Нет сырых полей строки БД (region_id, source_id — не агрегированные)
|
||||||
|
assert "region_id" not in run, "raw DB field region_id must not be present"
|
||||||
|
assert "source_id" not in run, "raw DB field source_id must not be present"
|
||||||
|
|
||||||
|
# Конкретные агрегированные значения из fixture (2 строки одного прогона)
|
||||||
|
assert run["started_at"] == "2026-05-30T03:00:00Z"
|
||||||
|
assert run["finished_at"] == "2026-05-30T05:14:00Z" # max из двух строк
|
||||||
|
assert set(run["regions"]) == {"cfo", "chuvashia"}
|
||||||
|
assert "osm" in run["sources_ok"]
|
||||||
|
assert run["sources_error"] == []
|
||||||
|
assert run["tracks_added"] == 52 # 42 + 10
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
|
|||||||
Reference in New Issue
Block a user