"""Integration тесты для GPS-треков endpoint (ET-008). I-20: GeoJSON с фильтрами I-21: truncation I-22: невалидный bbox → 400 I-23: bbox в океане → пустой I-30: MVT тайл отдаётся I-31: cache hit I-40: health endpoint """ import pytest from httpx import AsyncClient, ASGITransport from fastapi import FastAPI from src.api.gps_tracks.db import open_db, init_db, upsert_track from src.api.gps_tracks.dedup import compute_dedup_key from src.api.gps_tracks.endpoint import create_gps_router from src.api.gps_tracks.models import TrackInsert def _make_test_app(db_path: str) -> FastAPI: """Создаёт тестовое FastAPI приложение с GPS router.""" app = FastAPI() router = create_gps_router(db_path) app.include_router(router) return app def _make_track( external_id="T1", source_id="osm", length_m=5000.0, created_at="2024-05-12T10:00:00Z", min_lon=37.60, min_lat=55.74, max_lon=37.65, max_lat=55.78, activity_type="other", external_url=None, source_priority=50, ) -> TrackInsert: from shapely.geometry import LineString from shapely import wkb coords = [ (min_lon, min_lat), ((min_lon + max_lon) / 2, (min_lat + max_lat) / 2), (max_lon, max_lat), ] geom_wkb = wkb.dumps(LineString(coords)) return TrackInsert( external_id=external_id, source_id=source_id, external_url=external_url, name=f"Track {external_id}", description=None, activity_type=activity_type, user=None, created_at=created_at, length_m=length_m, points_count=3, geom_wkb=geom_wkb, min_lon=min_lon, min_lat=min_lat, max_lon=max_lon, max_lat=max_lat, tags=[], source_priority=source_priority, ) @pytest.fixture def db_with_tracks(tmp_path): """БД с несколькими тестовыми треками.""" db_path = str(tmp_path / "test.sqlite") conn = open_db(db_path) init_db(conn) # Добавляем треки вокруг Москвы tracks = [ _make_track("T1", "osm", activity_type="enduro", length_m=8000), _make_track("T2", "osm", activity_type="moto", length_m=3000, min_lon=37.70, min_lat=55.80, max_lon=37.75, max_lat=55.85), _make_track("T3", "enduro_russia", activity_type="bicycle", length_m=12000), ] for track in tracks: dedup_key = compute_dedup_key( (track.min_lon, track.min_lat, track.max_lon, track.max_lat), {"length_m": track.length_m, "created_at": track.created_at}, ) upsert_track(conn, track, dedup_key, source_priority=50) conn.close() yield db_path @pytest.fixture def db_with_pipeline_runs(db_with_tracks): """БД с треками и записями о прогонах pipeline (REQ-F-12). Один прогон охватывает два региона и один источник. Имитирует ситуацию когда pipeline записал две строки с одинаковым started_at (один запуск скрипта). """ db_path = db_with_tracks conn = open_db(db_path) conn.executemany( """ INSERT INTO pipeline_runs (started_at, finished_at, region_id, source_id, status, tracks_new, tracks_updated, errors_json) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, [ ( "2026-05-30T03:00:00Z", "2026-05-30T04:00:00Z", "cfo", "osm", "ok", 42, 5, None, ), ( "2026-05-30T03:00:00Z", "2026-05-30T05:14:00Z", "chuvashia", "osm", "ok", 10, 2, None, ), ], ) conn.commit() conn.close() yield db_path # ─── I-20: GeoJSON с фильтрами ──────────────────────────────────────────────── @pytest.mark.asyncio async def test_i20_geojson_basic(db_with_tracks): """I-20: базовый запрос GeoJSON.""" app = _make_test_app(db_with_tracks) async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: resp = await client.get( "/api/gps-tracks", params={"bbox": "37.5,55.7,37.9,55.9"}, ) assert resp.status_code == 200 data = resp.json() assert data["type"] == "FeatureCollection" assert isinstance(data["features"], list) assert len(data["features"]) > 0 assert "total_in_bbox" in data assert "returned" in data assert "truncated" in data @pytest.mark.asyncio async def test_i20_filter_by_activity(db_with_tracks): """I-20: фильтрация по activity_type.""" app = _make_test_app(db_with_tracks) async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: resp = await client.get( "/api/gps-tracks", params={"bbox": "37.5,55.7,37.9,55.9", "activity": "enduro"}, ) assert resp.status_code == 200 data = resp.json() for feature in data["features"]: assert feature["properties"]["activity_type"] == "enduro" @pytest.mark.asyncio async def test_i20_filter_by_source(db_with_tracks): """I-20: фильтрация по source.""" app = _make_test_app(db_with_tracks) async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: resp = await client.get( "/api/gps-tracks", params={"bbox": "37.5,55.7,37.9,55.9", "source": "enduro_russia"}, ) assert resp.status_code == 200 data = resp.json() # Все returned треки должны иметь enduro_russia в sources for feature in data["features"]: assert "enduro_russia" in feature["properties"]["sources"] # ─── I-21: truncation ──────────────────────────────────────────────────────── @pytest.mark.asyncio async def test_i21_truncation(tmp_path): """I-21: truncation при limit меньше total.""" db_path = str(tmp_path / "trunc.sqlite") conn = open_db(db_path) init_db(conn) # Создаём 10 треков с разными bbox for i in range(10): t = _make_track( external_id=f"T{i}", source_id="osm", min_lon=37.60 + i * 0.001, min_lat=55.74, max_lon=37.65 + i * 0.001, max_lat=55.78, length_m=5000 + i * 100, created_at=f"2024-05-{12 + i:02d}T10:00:00Z", ) dedup_key = compute_dedup_key( (t.min_lon, t.min_lat, t.max_lon, t.max_lat), {"length_m": t.length_m, "created_at": t.created_at}, ) upsert_track(conn, t, dedup_key, source_priority=50) conn.close() app = _make_test_app(db_path) async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: resp = await client.get( "/api/gps-tracks", params={"bbox": "37.5,55.7,37.9,55.9", "limit": 3}, ) assert resp.status_code == 200 data = resp.json() assert data["returned"] == 3 assert data["total_in_bbox"] >= 3 assert data["truncated"] is True # ─── I-22: невалидный bbox → 400 ───────────────────────────────────────────── @pytest.mark.asyncio @pytest.mark.parametrize("bad_bbox", [ "abc,def,ghi,jkl", # не числа "37.5,55.7,37.9", # 3 значения "37.5,55.7,37.9,55.9,1.0", # 5 значений "200,55.7,37.9,55.9", # lon out of range "37.5,95,37.9,55.9", # lat out of range "37.9,55.7,37.5,55.9", # west > east "37.5,55.9,37.9,55.7", # south > north ]) async def test_i22_invalid_bbox_returns_400(tmp_path, bad_bbox): """I-22: невалидный bbox → 400.""" db_path = str(tmp_path / "test.sqlite") conn = open_db(db_path) init_db(conn) conn.close() app = _make_test_app(db_path) async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: resp = await client.get( "/api/gps-tracks", params={"bbox": bad_bbox}, ) assert resp.status_code == 400 # ─── I-23: bbox в океане → пустой ──────────────────────────────────────────── @pytest.mark.asyncio async def test_i23_ocean_bbox_returns_empty(db_with_tracks): """I-23: bbox в океане (нет треков) → пустой FeatureCollection.""" app = _make_test_app(db_with_tracks) async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: resp = await client.get( "/api/gps-tracks", # Средина Атлантического океана params={"bbox": "-30.0,0.0,-20.0,10.0"}, ) assert resp.status_code == 200 data = resp.json() assert data["type"] == "FeatureCollection" assert data["features"] == [] assert data["total_in_bbox"] == 0 assert data["truncated"] is False # ─── I-30: MVT тайл ────────────────────────────────────────────────────────── @pytest.mark.asyncio async def test_i30_mvt_tile_returns(db_with_tracks): """I-30: MVT тайл с треками возвращается.""" app = _make_test_app(db_with_tracks) # z=10, x=620, y=320 — покрывает Москву async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: resp = await client.get("/api/gps-tracks/tiles/10/620/320.mvt") assert resp.status_code == 200 assert resp.headers["content-type"] == "application/x-protobuf" assert "X-Cache" in resp.headers @pytest.mark.asyncio async def test_i30_mvt_tile_empty_ocean(tmp_path): """I-30: MVT тайл без треков возвращает пустой ответ.""" db_path = str(tmp_path / "empty.sqlite") conn = open_db(db_path) init_db(conn) conn.close() app = _make_test_app(db_path) async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: resp = await client.get("/api/gps-tracks/tiles/10/400/300.mvt") assert resp.status_code == 200 assert resp.content == b"" # ─── I-31: cache hit ───────────────────────────────────────────────────────── @pytest.mark.asyncio async def test_i31_cache_hit(db_with_tracks): """I-31: второй запрос к тому же тайлу возвращает X-Cache: HIT.""" from src.api.gps_tracks.mvt import clear_gps_tile_cache clear_gps_tile_cache() app = _make_test_app(db_with_tracks) # z=10 x=621 y=319 — близко к Москве, должен вернуть данные z, x, y = 10, 621, 319 async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: # Первый запрос — MISS resp1 = await client.get(f"/api/gps-tracks/tiles/{z}/{x}/{y}.mvt") assert resp1.status_code == 200 # Второй запрос к пустому тайлу — кэш не заполняется для пустых # Используем тайл с треками resp2 = await client.get(f"/api/gps-tracks/tiles/{z}/{x}/{y}.mvt") assert resp2.status_code == 200 # Если первый вернул данные, второй должен быть HIT if resp1.content: assert resp2.headers.get("X-Cache") == "HIT" # ─── I-40: health endpoint ──────────────────────────────────────────────────── @pytest.mark.asyncio async def test_i40_health_endpoint(db_with_pipeline_runs): """I-40: health endpoint возвращает корректную статистику. REQ-F-12: last_pipeline_run — агрегированный объект, а не сырая строка БД. Структура: started_at, finished_at, regions[], sources_ok[], sources_error[], tracks_added. """ app = _make_test_app(db_with_pipeline_runs) async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: resp = await client.get("/api/gps-tracks/health") assert resp.status_code == 200 data = resp.json() assert data["status"] == "ok" assert "tracks_total" in data assert data["tracks_total"] > 0 assert "tracks_by_activity" in data # REQ-F-12: агрегированный объект last_pipeline_run assert "last_pipeline_run" in data run = data["last_pipeline_run"] assert run is not None, "last_pipeline_run must not be None when pipeline_runs exist" # Обязательные поля assert "started_at" in run assert "finished_at" in run assert "regions" in run assert "sources_ok" in run assert "sources_error" in run assert "tracks_added" in run # Типы assert isinstance(run["regions"], list) assert isinstance(run["sources_ok"], list) assert isinstance(run["sources_error"], list) assert isinstance(run["tracks_added"], int) # Нет сырых полей строки БД (region_id, source_id — не агрегированные) assert "region_id" not in run, "raw DB field region_id must not be present" assert "source_id" not in run, "raw DB field source_id must not be present" # Конкретные агрегированные значения из fixture (2 строки одного прогона) assert run["started_at"] == "2026-05-30T03:00:00Z" assert run["finished_at"] == "2026-05-30T05:14:00Z" # max из двух строк assert set(run["regions"]) == {"cfo", "chuvashia"} assert "osm" in run["sources_ok"] assert run["sources_error"] == [] assert run["tracks_added"] == 52 # 42 + 10 @pytest.mark.asyncio async def test_i40_health_empty_db(tmp_path): """I-40: health endpoint для пустой БД.""" db_path = str(tmp_path / "empty.sqlite") conn = open_db(db_path) init_db(conn) conn.close() app = _make_test_app(db_path) async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: resp = await client.get("/api/gps-tracks/health") assert resp.status_code == 200 data = resp.json() assert data["tracks_total"] == 0 assert data["last_pipeline_run"] is None # ─── Cache clear endpoint ───────────────────────────────────────────────────── @pytest.mark.asyncio async def test_cache_clear_endpoint(db_with_tracks): """POST /api/gps-tracks/cache/clear очищает кэш.""" app = _make_test_app(db_with_tracks) async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: resp = await client.post("/api/gps-tracks/cache/clear") assert resp.status_code == 200 data = resp.json() assert data["cleared"] is True # ─── F-01/F-02: GeoJSON normalised properties ───────────────────────────────── @pytest.mark.asyncio async def test_f01_f02_geojson_normalised_properties(db_with_tracks): """F-01/F-02: GeoJSON features carry activity/source (MVT-compatible) and length_km.""" app = _make_test_app(db_with_tracks) async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: resp = await client.get( "/api/gps-tracks", params={"bbox": "37.5,55.7,37.9,55.9"}, ) assert resp.status_code == 200 data = resp.json() assert len(data["features"]) > 0 for feature in data["features"]: props = feature["properties"] # F-01: MVT-compatible aliases assert "activity" in props, "activity field missing (F-01)" assert "source" in props, "source field missing (F-01)" assert isinstance(props["source"], str), "source must be str (F-01)" assert props["activity"] == props["activity_type"], "activity must equal activity_type" # F-02: length in km assert "length_km" in props, "length_km missing (F-02)" assert isinstance(props["length_km"], float), "length_km must be float" if props["length_m"]: assert abs(props["length_km"] - props["length_m"] / 1000) < 0.01 # ─── F-04: health extended fields ───────────────────────────────────────────── @pytest.mark.asyncio async def test_f04_health_extended_fields(db_with_tracks): """F-04: /health returns db_size_mb, tracks_by_source, tile_cache_size.""" app = _make_test_app(db_with_tracks) async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: resp = await client.get("/api/gps-tracks/health") assert resp.status_code == 200 data = resp.json() # db_size_mb assert "db_size_mb" in data, "db_size_mb missing (F-04)" assert isinstance(data["db_size_mb"], (int, float)) assert data["db_size_mb"] >= 0 # tracks_by_source assert "tracks_by_source" in data, "tracks_by_source missing (F-04)" assert isinstance(data["tracks_by_source"], dict) # tile_cache_size assert "tile_cache_size" in data, "tile_cache_size missing (F-04)" assert isinstance(data["tile_cache_size"], int) assert data["tile_cache_size"] >= 0