Files
enduro-trails/tests/api/test_gps_tracks_endpoint.py
claude-bot 1ffa178b38
Some checks failed
CI / lint (push) Failing after 4s
CI / test (push) Failing after 5s
CI / build (push) Has been skipped
CI / lint (pull_request) Failing after 3s
CI / test (pull_request) Failing after 5s
CI / build (pull_request) Has been skipped
fix(gps-tracks): aggregate last_pipeline_run in health endpoint (REQ-F-12)
Replace raw single-row fetch with aggregation over all pipeline_runs
rows sharing the latest started_at. Returns structured object with
regions[], sources_ok[], sources_error[], tracks_added instead of
a raw DB row with region_id/source_id strings.

Returns null when no runs exist (empty DB).

Update test_i40_health_endpoint: add db_with_pipeline_runs fixture
(two rows, same started_at, two regions) and assert the full
aggregated shape including concrete values.

Refs: ET-008
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-01 14:27:52 +00:00

481 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Integration тесты для GPS-треков endpoint (ET-008).
I-20: GeoJSON с фильтрами
I-21: truncation
I-22: невалидный bbox → 400
I-23: bbox в океане → пустой
I-30: MVT тайл отдаётся
I-31: cache hit
I-40: health endpoint
"""
import pytest
from httpx import AsyncClient, ASGITransport
from fastapi import FastAPI
from src.api.gps_tracks.db import open_db, init_db, upsert_track
from src.api.gps_tracks.dedup import compute_dedup_key
from src.api.gps_tracks.endpoint import create_gps_router
from src.api.gps_tracks.models import TrackInsert
def _make_test_app(db_path: str) -> FastAPI:
"""Создаёт тестовое FastAPI приложение с GPS router."""
app = FastAPI()
router = create_gps_router(db_path)
app.include_router(router)
return app
def _make_track(
external_id="T1",
source_id="osm",
length_m=5000.0,
created_at="2024-05-12T10:00:00Z",
min_lon=37.60,
min_lat=55.74,
max_lon=37.65,
max_lat=55.78,
activity_type="other",
external_url=None,
source_priority=50,
) -> TrackInsert:
from shapely.geometry import LineString
from shapely import wkb
coords = [
(min_lon, min_lat),
((min_lon + max_lon) / 2, (min_lat + max_lat) / 2),
(max_lon, max_lat),
]
geom_wkb = wkb.dumps(LineString(coords))
return TrackInsert(
external_id=external_id,
source_id=source_id,
external_url=external_url,
name=f"Track {external_id}",
description=None,
activity_type=activity_type,
user=None,
created_at=created_at,
length_m=length_m,
points_count=3,
geom_wkb=geom_wkb,
min_lon=min_lon,
min_lat=min_lat,
max_lon=max_lon,
max_lat=max_lat,
tags=[],
source_priority=source_priority,
)
@pytest.fixture
def db_with_tracks(tmp_path):
"""БД с несколькими тестовыми треками."""
db_path = str(tmp_path / "test.sqlite")
conn = open_db(db_path)
init_db(conn)
# Добавляем треки вокруг Москвы
tracks = [
_make_track("T1", "osm", activity_type="enduro", length_m=8000),
_make_track("T2", "osm", activity_type="moto", length_m=3000,
min_lon=37.70, min_lat=55.80, max_lon=37.75, max_lat=55.85),
_make_track("T3", "enduro_russia", activity_type="bicycle", length_m=12000),
]
for track in tracks:
dedup_key = compute_dedup_key(
(track.min_lon, track.min_lat, track.max_lon, track.max_lat),
{"length_m": track.length_m, "created_at": track.created_at},
)
upsert_track(conn, track, dedup_key, source_priority=50)
conn.close()
yield db_path
@pytest.fixture
def db_with_pipeline_runs(db_with_tracks):
"""БД с треками и записями о прогонах pipeline (REQ-F-12).
Один прогон охватывает два региона и один источник.
Имитирует ситуацию когда pipeline записал две строки
с одинаковым started_at (один запуск скрипта).
"""
db_path = db_with_tracks
conn = open_db(db_path)
conn.executemany(
"""
INSERT INTO pipeline_runs
(started_at, finished_at, region_id, source_id,
status, tracks_new, tracks_updated, errors_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
[
(
"2026-05-30T03:00:00Z",
"2026-05-30T04:00:00Z",
"cfo",
"osm",
"ok",
42,
5,
None,
),
(
"2026-05-30T03:00:00Z",
"2026-05-30T05:14:00Z",
"chuvashia",
"osm",
"ok",
10,
2,
None,
),
],
)
conn.commit()
conn.close()
yield db_path
# ─── I-20: GeoJSON с фильтрами ────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_i20_geojson_basic(db_with_tracks):
"""I-20: базовый запрос GeoJSON."""
app = _make_test_app(db_with_tracks)
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
resp = await client.get(
"/api/gps-tracks",
params={"bbox": "37.5,55.7,37.9,55.9"},
)
assert resp.status_code == 200
data = resp.json()
assert data["type"] == "FeatureCollection"
assert isinstance(data["features"], list)
assert len(data["features"]) > 0
assert "total_in_bbox" in data
assert "returned" in data
assert "truncated" in data
@pytest.mark.asyncio
async def test_i20_filter_by_activity(db_with_tracks):
"""I-20: фильтрация по activity_type."""
app = _make_test_app(db_with_tracks)
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
resp = await client.get(
"/api/gps-tracks",
params={"bbox": "37.5,55.7,37.9,55.9", "activity": "enduro"},
)
assert resp.status_code == 200
data = resp.json()
for feature in data["features"]:
assert feature["properties"]["activity_type"] == "enduro"
@pytest.mark.asyncio
async def test_i20_filter_by_source(db_with_tracks):
"""I-20: фильтрация по source."""
app = _make_test_app(db_with_tracks)
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
resp = await client.get(
"/api/gps-tracks",
params={"bbox": "37.5,55.7,37.9,55.9", "source": "enduro_russia"},
)
assert resp.status_code == 200
data = resp.json()
# Все returned треки должны иметь enduro_russia в sources
for feature in data["features"]:
assert "enduro_russia" in feature["properties"]["sources"]
# ─── I-21: truncation ────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_i21_truncation(tmp_path):
"""I-21: truncation при limit меньше total."""
db_path = str(tmp_path / "trunc.sqlite")
conn = open_db(db_path)
init_db(conn)
# Создаём 10 треков с разными bbox
for i in range(10):
t = _make_track(
external_id=f"T{i}",
source_id="osm",
min_lon=37.60 + i * 0.001,
min_lat=55.74,
max_lon=37.65 + i * 0.001,
max_lat=55.78,
length_m=5000 + i * 100,
created_at=f"2024-05-{12 + i:02d}T10:00:00Z",
)
dedup_key = compute_dedup_key(
(t.min_lon, t.min_lat, t.max_lon, t.max_lat),
{"length_m": t.length_m, "created_at": t.created_at},
)
upsert_track(conn, t, dedup_key, source_priority=50)
conn.close()
app = _make_test_app(db_path)
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
resp = await client.get(
"/api/gps-tracks",
params={"bbox": "37.5,55.7,37.9,55.9", "limit": 3},
)
assert resp.status_code == 200
data = resp.json()
assert data["returned"] == 3
assert data["total_in_bbox"] >= 3
assert data["truncated"] is True
# ─── I-22: невалидный bbox → 400 ─────────────────────────────────────────────
@pytest.mark.asyncio
@pytest.mark.parametrize("bad_bbox", [
"abc,def,ghi,jkl", # не числа
"37.5,55.7,37.9", # 3 значения
"37.5,55.7,37.9,55.9,1.0", # 5 значений
"200,55.7,37.9,55.9", # lon out of range
"37.5,95,37.9,55.9", # lat out of range
"37.9,55.7,37.5,55.9", # west > east
"37.5,55.9,37.9,55.7", # south > north
])
async def test_i22_invalid_bbox_returns_400(tmp_path, bad_bbox):
"""I-22: невалидный bbox → 400."""
db_path = str(tmp_path / "test.sqlite")
conn = open_db(db_path)
init_db(conn)
conn.close()
app = _make_test_app(db_path)
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
resp = await client.get(
"/api/gps-tracks",
params={"bbox": bad_bbox},
)
assert resp.status_code == 400
# ─── I-23: bbox в океане → пустой ────────────────────────────────────────────
@pytest.mark.asyncio
async def test_i23_ocean_bbox_returns_empty(db_with_tracks):
"""I-23: bbox в океане (нет треков) → пустой FeatureCollection."""
app = _make_test_app(db_with_tracks)
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
resp = await client.get(
"/api/gps-tracks",
# Средина Атлантического океана
params={"bbox": "-30.0,0.0,-20.0,10.0"},
)
assert resp.status_code == 200
data = resp.json()
assert data["type"] == "FeatureCollection"
assert data["features"] == []
assert data["total_in_bbox"] == 0
assert data["truncated"] is False
# ─── I-30: MVT тайл ──────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_i30_mvt_tile_returns(db_with_tracks):
"""I-30: MVT тайл с треками возвращается."""
app = _make_test_app(db_with_tracks)
# z=10, x=620, y=320 — покрывает Москву
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
resp = await client.get("/api/gps-tracks/tiles/10/620/320.mvt")
assert resp.status_code == 200
assert resp.headers["content-type"] == "application/x-protobuf"
assert "X-Cache" in resp.headers
@pytest.mark.asyncio
async def test_i30_mvt_tile_empty_ocean(tmp_path):
"""I-30: MVT тайл без треков возвращает пустой ответ."""
db_path = str(tmp_path / "empty.sqlite")
conn = open_db(db_path)
init_db(conn)
conn.close()
app = _make_test_app(db_path)
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
resp = await client.get("/api/gps-tracks/tiles/10/400/300.mvt")
assert resp.status_code == 200
assert resp.content == b""
# ─── I-31: cache hit ─────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_i31_cache_hit(db_with_tracks):
"""I-31: второй запрос к тому же тайлу возвращает X-Cache: HIT."""
from src.api.gps_tracks.mvt import clear_gps_tile_cache
clear_gps_tile_cache()
app = _make_test_app(db_with_tracks)
# z=10 x=621 y=319 — близко к Москве, должен вернуть данные
z, x, y = 10, 621, 319
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
# Первый запрос — MISS
resp1 = await client.get(f"/api/gps-tracks/tiles/{z}/{x}/{y}.mvt")
assert resp1.status_code == 200
# Второй запрос к пустому тайлу — кэш не заполняется для пустых
# Используем тайл с треками
resp2 = await client.get(f"/api/gps-tracks/tiles/{z}/{x}/{y}.mvt")
assert resp2.status_code == 200
# Если первый вернул данные, второй должен быть HIT
if resp1.content:
assert resp2.headers.get("X-Cache") == "HIT"
# ─── I-40: health endpoint ────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_i40_health_endpoint(db_with_pipeline_runs):
"""I-40: health endpoint возвращает корректную статистику.
REQ-F-12: last_pipeline_run — агрегированный объект, а не сырая строка БД.
Структура: started_at, finished_at, regions[], sources_ok[], sources_error[], tracks_added.
"""
app = _make_test_app(db_with_pipeline_runs)
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
resp = await client.get("/api/gps-tracks/health")
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "ok"
assert "tracks_total" in data
assert data["tracks_total"] > 0
assert "tracks_by_activity" in data
# REQ-F-12: агрегированный объект last_pipeline_run
assert "last_pipeline_run" in data
run = data["last_pipeline_run"]
assert run is not None, "last_pipeline_run must not be None when pipeline_runs exist"
# Обязательные поля
assert "started_at" in run
assert "finished_at" in run
assert "regions" in run
assert "sources_ok" in run
assert "sources_error" in run
assert "tracks_added" in run
# Типы
assert isinstance(run["regions"], list)
assert isinstance(run["sources_ok"], list)
assert isinstance(run["sources_error"], list)
assert isinstance(run["tracks_added"], int)
# Нет сырых полей строки БД (region_id, source_id — не агрегированные)
assert "region_id" not in run, "raw DB field region_id must not be present"
assert "source_id" not in run, "raw DB field source_id must not be present"
# Конкретные агрегированные значения из fixture (2 строки одного прогона)
assert run["started_at"] == "2026-05-30T03:00:00Z"
assert run["finished_at"] == "2026-05-30T05:14:00Z" # max из двух строк
assert set(run["regions"]) == {"cfo", "chuvashia"}
assert "osm" in run["sources_ok"]
assert run["sources_error"] == []
assert run["tracks_added"] == 52 # 42 + 10
@pytest.mark.asyncio
async def test_i40_health_empty_db(tmp_path):
"""I-40: health endpoint для пустой БД."""
db_path = str(tmp_path / "empty.sqlite")
conn = open_db(db_path)
init_db(conn)
conn.close()
app = _make_test_app(db_path)
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
resp = await client.get("/api/gps-tracks/health")
assert resp.status_code == 200
data = resp.json()
assert data["tracks_total"] == 0
assert data["last_pipeline_run"] is None
# ─── Cache clear endpoint ─────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_cache_clear_endpoint(db_with_tracks):
"""POST /api/gps-tracks/cache/clear очищает кэш."""
app = _make_test_app(db_with_tracks)
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
resp = await client.post("/api/gps-tracks/cache/clear")
assert resp.status_code == 200
data = resp.json()
assert data["cleared"] is True
# ─── F-01/F-02: GeoJSON normalised properties ─────────────────────────────────
@pytest.mark.asyncio
async def test_f01_f02_geojson_normalised_properties(db_with_tracks):
"""F-01/F-02: GeoJSON features carry activity/source (MVT-compatible) and length_km."""
app = _make_test_app(db_with_tracks)
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
resp = await client.get(
"/api/gps-tracks",
params={"bbox": "37.5,55.7,37.9,55.9"},
)
assert resp.status_code == 200
data = resp.json()
assert len(data["features"]) > 0
for feature in data["features"]:
props = feature["properties"]
# F-01: MVT-compatible aliases
assert "activity" in props, "activity field missing (F-01)"
assert "source" in props, "source field missing (F-01)"
assert isinstance(props["source"], str), "source must be str (F-01)"
assert props["activity"] == props["activity_type"], "activity must equal activity_type"
# F-02: length in km
assert "length_km" in props, "length_km missing (F-02)"
assert isinstance(props["length_km"], float), "length_km must be float"
if props["length_m"]:
assert abs(props["length_km"] - props["length_m"] / 1000) < 0.01
# ─── F-04: health extended fields ─────────────────────────────────────────────
@pytest.mark.asyncio
async def test_f04_health_extended_fields(db_with_tracks):
"""F-04: /health returns db_size_mb, tracks_by_source, tile_cache_size."""
app = _make_test_app(db_with_tracks)
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
resp = await client.get("/api/gps-tracks/health")
assert resp.status_code == 200
data = resp.json()
# db_size_mb
assert "db_size_mb" in data, "db_size_mb missing (F-04)"
assert isinstance(data["db_size_mb"], (int, float))
assert data["db_size_mb"] >= 0
# tracks_by_source
assert "tracks_by_source" in data, "tracks_by_source missing (F-04)"
assert isinstance(data["tracks_by_source"], dict)
# tile_cache_size
assert "tile_cache_size" in data, "tile_cache_size missing (F-04)"
assert isinstance(data["tile_cache_size"], int)
assert data["tile_cache_size"] >= 0