"""FastAPI router для GPS-треков (ET-008).""" import json import os from typing import Optional from fastapi import APIRouter, HTTPException, Query, Response from src.api.gps_tracks.db import get_tracks_in_bbox, init_db, open_db from src.api.gps_tracks.mvt import ( _gps_tile_cache, build_gps_mvt, clear_gps_tile_cache, get_gps_cached_tile, set_gps_cached_tile, _tile_to_bbox, ) def _parse_bbox(bbox_str: str) -> tuple: """Парсит и валидирует bbox строку 'west,south,east,north'. Returns: (west, south, east, north) Raises: HTTPException 400 при невалидных значениях """ try: parts = [float(v.strip()) for v in bbox_str.split(",")] except (ValueError, AttributeError): raise HTTPException(400, "bbox must be 4 comma-separated floats") if len(parts) != 4: raise HTTPException(400, "bbox must have exactly 4 values: west,south,east,north") west, south, east, north = parts if not (-180 <= west <= 180) or not (-180 <= east <= 180): raise HTTPException(400, "bbox longitude values must be in range -180..180") if not (-90 <= south <= 90) or not (-90 <= north <= 90): raise HTTPException(400, "bbox latitude values must be in range -90..90") if west >= east: raise HTTPException(400, "bbox west must be < east") if south >= north: raise HTTPException(400, "bbox south must be < north") return west, south, east, north def _row_to_geojson_feature(row) -> dict: """Конвертирует sqlite3.Row в GeoJSON Feature.""" from src.api.gps_tracks.mvt import _wkb_to_coords coords = _wkb_to_coords(row["geom"]) sources = json.loads(row["sources_json"] or "[]") ext_urls = json.loads(row["external_urls_json"] or "[]") tags = json.loads(row["tags_json"] or "[]") activity_type = row["activity_type"] or "other" first_source = sources[0] if sources else "" length_m = row["length_m"] or 0 length_km = round(length_m / 1000, 2) geometry = None if coords: geometry = {"type": "LineString", "coordinates": coords} return { "type": "Feature", "geometry": geometry, "properties": { "id": row["id"], "dedup_key": row["dedup_key"], "name": row["name"], "description": row["description"], "activity_type": row["activity_type"], "activity": activity_type, "user": row["user"], "created_at": row["created_at"], "length_m": row["length_m"], "length_km": length_km, "points_count": row["points_count"], "sources": sources, "source": first_source, "external_urls": ext_urls, "tags": tags, "inserted_at": row["inserted_at"], "updated_at": row["updated_at"], }, } def create_gps_router(db_path: str) -> APIRouter: """Создаёт FastAPI router для GPS-треков. Args: db_path: путь к SQLite БД для GPS-треков Returns: APIRouter с prefix="/api/gps-tracks" """ router = APIRouter(prefix="/api/gps-tracks", tags=["gps-tracks"]) def _get_conn(): conn = open_db(db_path) init_db(conn) return conn @router.get("") async def get_tracks( bbox: str = Query(..., description="west,south,east,north"), activity: Optional[str] = Query(None, description="Comma-separated activity types"), source: Optional[str] = Query(None, description="Comma-separated source ids"), limit: int = Query(500, ge=1, le=2000), ): """Возвращает GPS-треки в bbox как GeoJSON FeatureCollection.""" west, south, east, north = _parse_bbox(bbox) activities = [a.strip() for a in activity.split(",")] if activity else None sources = [s.strip() for s in source.split(",")] if source else None try: conn = _get_conn() rows, total_count = get_tracks_in_bbox( conn, west, south, east, north, activities=activities, sources=sources, limit=limit, ) conn.close() except Exception as exc: raise HTTPException(500, f"DB error: {exc}") features = [_row_to_geojson_feature(row) for row in rows] returned = len(features) return { "type": "FeatureCollection", "features": features, "total_in_bbox": total_count, "returned": returned, "truncated": total_count > returned, } @router.get("/tiles/{z}/{x}/{y}.mvt") async def get_gps_tile(z: int, x: int, y: int): """Возвращает MVT тайл с GPS-треками.""" if z < 0 or z > 22: raise HTTPException(400, "Invalid z") max_coord = 2 ** z if x < 0 or x >= max_coord or y < 0 or y >= max_coord: raise HTTPException(400, "Invalid x/y for zoom level") # Проверяем кэш cached = get_gps_cached_tile(z, x, y) if cached is not None: return Response( content=cached, media_type="application/x-protobuf", headers={ "Content-Encoding": "identity", "Access-Control-Allow-Origin": "*", "X-Cache": "HIT", }, ) west, south, east, north = _tile_to_bbox(z, x, y) # Небольшой буфер для edge features buf_x = (east - west) * 0.1 buf_y = (north - south) * 0.1 try: conn = _get_conn() rows, _ = get_tracks_in_bbox( conn, west - buf_x, south - buf_y, east + buf_x, north + buf_y, limit=25000, ) conn.close() except Exception as exc: raise HTTPException(500, f"DB error: {exc}") mvt = build_gps_mvt(rows, z, x, y) if mvt: set_gps_cached_tile(z, x, y, mvt) return Response( content=mvt, media_type="application/x-protobuf", headers={ "Content-Encoding": "identity", "Access-Control-Allow-Origin": "*", "X-Cache": "MISS", }, ) @router.get("/health") async def gps_health(): """Статистика GPS-треков БД. Поле last_pipeline_run агрегирует все записи pipeline_runs, принадлежащие последнему запуску (по максимальному started_at). Возвращает None если прогонов ещё не было. """ try: conn = _get_conn() cur = conn.cursor() cur.execute("SELECT COUNT(*) as cnt FROM tracks") total_tracks = cur.fetchone()["cnt"] cur.execute( "SELECT activity_type, COUNT(*) as cnt FROM tracks GROUP BY activity_type" ) by_activity = {row["activity_type"] or "other": row["cnt"] for row in cur.fetchall()} # REQ-F-12: агрегированный объект по всем строкам последнего прогона. # Все строки одного запуска pipeline имеют одинаковый started_at — # pipeline устанавливает его перед итерацией по (region, source). cur.execute( """ SELECT started_at, finished_at, region_id, source_id, status, tracks_new, errors_json FROM pipeline_runs WHERE started_at = (SELECT MAX(started_at) FROM pipeline_runs) ORDER BY region_id, source_id """ ) run_rows = cur.fetchall() if run_rows: regions: list = [] sources_ok: list = [] sources_error: list = [] tracks_added = 0 finished_at_values: list = [] for row in run_rows: region = row["region_id"] if region not in regions: regions.append(region) if row["status"] in ("ok", "partial"): sources_ok.append(row["source_id"]) else: sources_error.append(row["source_id"]) tracks_added += row["tracks_new"] or 0 if row["finished_at"]: finished_at_values.append(row["finished_at"]) last_run: Optional[dict] = { "started_at": run_rows[0]["started_at"], "finished_at": max(finished_at_values) if finished_at_values else None, "regions": regions, "sources_ok": sources_ok, "sources_error": sources_error, "tracks_added": tracks_added, } else: last_run = None cur.execute("SELECT sources_json FROM tracks") tracks_by_source: dict = {} for trow in cur.fetchall(): try: src_list = json.loads(trow["sources_json"] or "[]") except Exception: src_list = [] for src in src_list: tracks_by_source[src] = tracks_by_source.get(src, 0) + 1 conn.close() except Exception as exc: raise HTTPException(500, f"DB error: {exc}") db_size_mb = 0.0 try: db_size_mb = os.path.getsize(db_path) / 1024 / 1024 except OSError: pass return { "status": "ok", "db_path": db_path, "tracks_total": total_tracks, "tracks_by_activity": by_activity, "last_pipeline_run": last_run, "db_size_mb": db_size_mb, "tracks_by_source": tracks_by_source, "tile_cache_size": len(_gps_tile_cache), } @router.post("/cache/clear") async def clear_cache(): """Сбрасывает LRU-кэш GPS-тайлов.""" clear_gps_tile_cache() return {"status": "ok", "cleared": True} return router