Replace raw single-row fetch with aggregation over all pipeline_runs rows sharing the latest started_at. Returns structured object with regions[], sources_ok[], sources_error[], tracks_added instead of a raw DB row with region_id/source_id strings. Returns null when no runs exist (empty DB). Update test_i40_health_endpoint: add db_with_pipeline_runs fixture (two rows, same started_at, two regions) and assert the full aggregated shape including concrete values. Refs: ET-008 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
311 lines
10 KiB
Python
311 lines
10 KiB
Python
"""FastAPI router для GPS-треков (ET-008)."""
|
||
import json
|
||
import os
|
||
from typing import Optional
|
||
|
||
from fastapi import APIRouter, HTTPException, Query, Response
|
||
|
||
from src.api.gps_tracks.db import get_tracks_in_bbox, init_db, open_db
|
||
from src.api.gps_tracks.mvt import (
|
||
_gps_tile_cache,
|
||
build_gps_mvt,
|
||
clear_gps_tile_cache,
|
||
get_gps_cached_tile,
|
||
set_gps_cached_tile,
|
||
_tile_to_bbox,
|
||
)
|
||
|
||
|
||
def _parse_bbox(bbox_str: str) -> tuple:
|
||
"""Парсит и валидирует bbox строку 'west,south,east,north'.
|
||
|
||
Returns:
|
||
(west, south, east, north)
|
||
|
||
Raises:
|
||
HTTPException 400 при невалидных значениях
|
||
"""
|
||
try:
|
||
parts = [float(v.strip()) for v in bbox_str.split(",")]
|
||
except (ValueError, AttributeError):
|
||
raise HTTPException(400, "bbox must be 4 comma-separated floats")
|
||
|
||
if len(parts) != 4:
|
||
raise HTTPException(400, "bbox must have exactly 4 values: west,south,east,north")
|
||
|
||
west, south, east, north = parts
|
||
|
||
if not (-180 <= west <= 180) or not (-180 <= east <= 180):
|
||
raise HTTPException(400, "bbox longitude values must be in range -180..180")
|
||
|
||
if not (-90 <= south <= 90) or not (-90 <= north <= 90):
|
||
raise HTTPException(400, "bbox latitude values must be in range -90..90")
|
||
|
||
if west >= east:
|
||
raise HTTPException(400, "bbox west must be < east")
|
||
|
||
if south >= north:
|
||
raise HTTPException(400, "bbox south must be < north")
|
||
|
||
return west, south, east, north
|
||
|
||
|
||
def _row_to_geojson_feature(row) -> dict:
|
||
"""Конвертирует sqlite3.Row в GeoJSON Feature."""
|
||
from src.api.gps_tracks.mvt import _wkb_to_coords
|
||
|
||
coords = _wkb_to_coords(row["geom"])
|
||
|
||
sources = json.loads(row["sources_json"] or "[]")
|
||
ext_urls = json.loads(row["external_urls_json"] or "[]")
|
||
tags = json.loads(row["tags_json"] or "[]")
|
||
|
||
activity_type = row["activity_type"] or "other"
|
||
first_source = sources[0] if sources else ""
|
||
length_m = row["length_m"] or 0
|
||
length_km = round(length_m / 1000, 2)
|
||
|
||
geometry = None
|
||
if coords:
|
||
geometry = {"type": "LineString", "coordinates": coords}
|
||
|
||
return {
|
||
"type": "Feature",
|
||
"geometry": geometry,
|
||
"properties": {
|
||
"id": row["id"],
|
||
"dedup_key": row["dedup_key"],
|
||
"name": row["name"],
|
||
"description": row["description"],
|
||
"activity_type": row["activity_type"],
|
||
"activity": activity_type,
|
||
"user": row["user"],
|
||
"created_at": row["created_at"],
|
||
"length_m": row["length_m"],
|
||
"length_km": length_km,
|
||
"points_count": row["points_count"],
|
||
"sources": sources,
|
||
"source": first_source,
|
||
"external_urls": ext_urls,
|
||
"tags": tags,
|
||
"inserted_at": row["inserted_at"],
|
||
"updated_at": row["updated_at"],
|
||
},
|
||
}
|
||
|
||
|
||
def create_gps_router(db_path: str) -> APIRouter:
|
||
"""Создаёт FastAPI router для GPS-треков.
|
||
|
||
Args:
|
||
db_path: путь к SQLite БД для GPS-треков
|
||
|
||
Returns:
|
||
APIRouter с prefix="/api/gps-tracks"
|
||
"""
|
||
router = APIRouter(prefix="/api/gps-tracks", tags=["gps-tracks"])
|
||
|
||
def _get_conn():
|
||
conn = open_db(db_path)
|
||
init_db(conn)
|
||
return conn
|
||
|
||
@router.get("")
|
||
async def get_tracks(
|
||
bbox: str = Query(..., description="west,south,east,north"),
|
||
activity: Optional[str] = Query(None, description="Comma-separated activity types"),
|
||
source: Optional[str] = Query(None, description="Comma-separated source ids"),
|
||
limit: int = Query(500, ge=1, le=2000),
|
||
):
|
||
"""Возвращает GPS-треки в bbox как GeoJSON FeatureCollection."""
|
||
west, south, east, north = _parse_bbox(bbox)
|
||
|
||
activities = [a.strip() for a in activity.split(",")] if activity else None
|
||
sources = [s.strip() for s in source.split(",")] if source else None
|
||
|
||
try:
|
||
conn = _get_conn()
|
||
rows, total_count = get_tracks_in_bbox(
|
||
conn, west, south, east, north,
|
||
activities=activities,
|
||
sources=sources,
|
||
limit=limit,
|
||
)
|
||
conn.close()
|
||
except Exception as exc:
|
||
raise HTTPException(500, f"DB error: {exc}")
|
||
|
||
features = [_row_to_geojson_feature(row) for row in rows]
|
||
returned = len(features)
|
||
|
||
return {
|
||
"type": "FeatureCollection",
|
||
"features": features,
|
||
"total_in_bbox": total_count,
|
||
"returned": returned,
|
||
"truncated": total_count > returned,
|
||
}
|
||
|
||
@router.get("/tiles/{z}/{x}/{y}.mvt")
|
||
async def get_gps_tile(z: int, x: int, y: int):
|
||
"""Возвращает MVT тайл с GPS-треками."""
|
||
if z < 0 or z > 22:
|
||
raise HTTPException(400, "Invalid z")
|
||
max_coord = 2 ** z
|
||
if x < 0 or x >= max_coord or y < 0 or y >= max_coord:
|
||
raise HTTPException(400, "Invalid x/y for zoom level")
|
||
|
||
# Проверяем кэш
|
||
cached = get_gps_cached_tile(z, x, y)
|
||
if cached is not None:
|
||
return Response(
|
||
content=cached,
|
||
media_type="application/x-protobuf",
|
||
headers={
|
||
"Content-Encoding": "identity",
|
||
"Access-Control-Allow-Origin": "*",
|
||
"X-Cache": "HIT",
|
||
},
|
||
)
|
||
|
||
west, south, east, north = _tile_to_bbox(z, x, y)
|
||
|
||
# Небольшой буфер для edge features
|
||
buf_x = (east - west) * 0.1
|
||
buf_y = (north - south) * 0.1
|
||
|
||
try:
|
||
conn = _get_conn()
|
||
rows, _ = get_tracks_in_bbox(
|
||
conn,
|
||
west - buf_x,
|
||
south - buf_y,
|
||
east + buf_x,
|
||
north + buf_y,
|
||
limit=25000,
|
||
)
|
||
conn.close()
|
||
except Exception as exc:
|
||
raise HTTPException(500, f"DB error: {exc}")
|
||
|
||
mvt = build_gps_mvt(rows, z, x, y)
|
||
|
||
if mvt:
|
||
set_gps_cached_tile(z, x, y, mvt)
|
||
|
||
return Response(
|
||
content=mvt,
|
||
media_type="application/x-protobuf",
|
||
headers={
|
||
"Content-Encoding": "identity",
|
||
"Access-Control-Allow-Origin": "*",
|
||
"X-Cache": "MISS",
|
||
},
|
||
)
|
||
|
||
@router.get("/health")
|
||
async def gps_health():
|
||
"""Статистика GPS-треков БД.
|
||
|
||
Поле last_pipeline_run агрегирует все записи pipeline_runs,
|
||
принадлежащие последнему запуску (по максимальному started_at).
|
||
Возвращает None если прогонов ещё не было.
|
||
"""
|
||
try:
|
||
conn = _get_conn()
|
||
cur = conn.cursor()
|
||
|
||
cur.execute("SELECT COUNT(*) as cnt FROM tracks")
|
||
total_tracks = cur.fetchone()["cnt"]
|
||
|
||
cur.execute(
|
||
"SELECT activity_type, COUNT(*) as cnt FROM tracks GROUP BY activity_type"
|
||
)
|
||
by_activity = {row["activity_type"] or "other": row["cnt"] for row in cur.fetchall()}
|
||
|
||
# REQ-F-12: агрегированный объект по всем строкам последнего прогона.
|
||
# Все строки одного запуска pipeline имеют одинаковый started_at —
|
||
# pipeline устанавливает его перед итерацией по (region, source).
|
||
cur.execute(
|
||
"""
|
||
SELECT started_at, finished_at, region_id, source_id,
|
||
status, tracks_new, errors_json
|
||
FROM pipeline_runs
|
||
WHERE started_at = (SELECT MAX(started_at) FROM pipeline_runs)
|
||
ORDER BY region_id, source_id
|
||
"""
|
||
)
|
||
run_rows = cur.fetchall()
|
||
|
||
if run_rows:
|
||
regions: list = []
|
||
sources_ok: list = []
|
||
sources_error: list = []
|
||
tracks_added = 0
|
||
finished_at_values: list = []
|
||
|
||
for row in run_rows:
|
||
region = row["region_id"]
|
||
if region not in regions:
|
||
regions.append(region)
|
||
|
||
if row["status"] in ("ok", "partial"):
|
||
sources_ok.append(row["source_id"])
|
||
else:
|
||
sources_error.append(row["source_id"])
|
||
|
||
tracks_added += row["tracks_new"] or 0
|
||
|
||
if row["finished_at"]:
|
||
finished_at_values.append(row["finished_at"])
|
||
|
||
last_run: Optional[dict] = {
|
||
"started_at": run_rows[0]["started_at"],
|
||
"finished_at": max(finished_at_values) if finished_at_values else None,
|
||
"regions": regions,
|
||
"sources_ok": sources_ok,
|
||
"sources_error": sources_error,
|
||
"tracks_added": tracks_added,
|
||
}
|
||
else:
|
||
last_run = None
|
||
|
||
cur.execute("SELECT sources_json FROM tracks")
|
||
tracks_by_source: dict = {}
|
||
for trow in cur.fetchall():
|
||
try:
|
||
src_list = json.loads(trow["sources_json"] or "[]")
|
||
except Exception:
|
||
src_list = []
|
||
for src in src_list:
|
||
tracks_by_source[src] = tracks_by_source.get(src, 0) + 1
|
||
|
||
conn.close()
|
||
except Exception as exc:
|
||
raise HTTPException(500, f"DB error: {exc}")
|
||
|
||
db_size_mb = 0.0
|
||
try:
|
||
db_size_mb = os.path.getsize(db_path) / 1024 / 1024
|
||
except OSError:
|
||
pass
|
||
|
||
return {
|
||
"status": "ok",
|
||
"db_path": db_path,
|
||
"tracks_total": total_tracks,
|
||
"tracks_by_activity": by_activity,
|
||
"last_pipeline_run": last_run,
|
||
"db_size_mb": db_size_mb,
|
||
"tracks_by_source": tracks_by_source,
|
||
"tile_cache_size": len(_gps_tile_cache),
|
||
}
|
||
|
||
@router.post("/cache/clear")
|
||
async def clear_cache():
|
||
"""Сбрасывает LRU-кэш GPS-тайлов."""
|
||
clear_gps_tile_cache()
|
||
return {"status": "ok", "cleared": True}
|
||
|
||
return router
|