Files
enduro-trails/src/api/main.py
claude-bot 0060003f28
Some checks failed
CI / lint (push) Failing after 4s
CI / test (push) Failing after 4s
CI / build (push) Has been skipped
CI / lint (pull_request) Failing after 4s
CI / test (pull_request) Failing after 4s
CI / build (pull_request) Has been skipped
feat(gps-tracks): ET-008 публичные GPS-треки с публичных платформ
Backend:
- Миграция gps_tracks_001_init.sql: таблицы tracks + pipeline_runs
- Пакет src/api/gps_tracks/: models, db (WAL+upsert с dedup), dedup
  (bbox+length+date bucket-hash), mvt (LRU-кэш 1024 тайла), endpoint
  (GET /api/gps-tracks, GET /api/gps-tracks/tiles/{z}/{x}/{y}.mvt,
   GET /api/gps-tracks/health, POST /api/gps-tracks/cache/clear), config
- Парсеры: osm (split_bbox, haversine, defusedxml XXE-защита),
  enduro_russia + ttrails — заглушки (ADR-010/011 proposed, блокированы)
- Licensing guard: pipeline проверяет status ADR-файла до запуска источника
- scripts/gps_collect.py: CLI с --region/--source/--dry-run/--gc

Frontend:
- src/web/gps_tracks.js: двухрежимный слой (MVT z≤11, GeoJSON z≥12),
  debounced fetch + AbortController, фильтры активности/источника,
  цветовая палитра by-source/by-activity, halo на спутнике, popup трека,
  restorePublicTracksState(), localStorage persistence
- index.html: чекбокс «Публичные треки» в terrain-popup, #sheet-gps-filters
- app.css: .terrain-link-btn, .gps-filter-grid, .track-popup
- app.js: вызов restorePublicTracksState() в rebuildMapOverlays(),
  applyGpsHaloVisibility() в applyBaseLayer()

Конфиги:
- config/gps_sources.yaml: osm (enabled), enduro_russia/ttrails (disabled)
- config/gps_regions.yaml: ЦФО+Чувашия (enabled), Кавказ (disabled)

Docker:
- gps-collector service с profiles: [batch]

Тесты: 48 новых тестов (unit + integration), 125/125 pass

Refs: ET-008

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-01 12:28:54 +00:00

1265 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
app.py — FastAPI сервер для Enduro Trails
- Раздаёт статику из static/
- /api/tiles/{z}/{x}/{y}.mvt — векторные тайлы из SQLite
- /api/route (POST) — роутинг через OSRM с альтернативами и статистикой покрытия
- /api/health — статус БД
"""
import os
import math
import struct
import sqlite3
import itertools
GPS_TRACKS_DB_PATH = os.environ.get(
"GPS_TRACKS_DB_PATH",
os.path.join(os.path.dirname(__file__), "../../data/gps_tracks.sqlite"),
)
from shapely.geometry import LineString
from typing import List
from fastapi import FastAPI, HTTPException, Response
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import httpx
import uvicorn
# ─── Tile cache ──────────────────────────────────────────────────────────────
_tile_cache: dict = {}
_TILE_CACHE_MAX = 512
def get_cached_tile(z, x, y):
return _tile_cache.get((z, x, y))
def set_cached_tile(z, x, y, data: bytes):
if len(_tile_cache) >= _TILE_CACHE_MAX:
# FIFO вытеснение — удаляем самый старый ключ
_tile_cache.pop(next(iter(_tile_cache)))
_tile_cache[(z, x, y)] = data
# ─── Конфиг ───────────────────────────────────────────────────────────────────
DATA_PATH = os.environ.get(
"DATA_PATH",
os.path.join(os.path.dirname(__file__), "../data/centralfederal.sqlite"),
)
OSRM_URL = os.environ.get("OSRM_URL", "http://172.22.0.1:5559")
DATA_PATH = os.path.abspath(DATA_PATH)
STATIC_DIR = os.environ.get("STATIC_DIR", os.path.join(os.path.dirname(__file__), "../../src/web"))
PORT = int(os.environ.get("PORT", 5558))
app = FastAPI(title="Enduro Trails API")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
# ─── DB ───────────────────────────────────────────────────────────────────────
def get_db():
conn = sqlite3.connect(DATA_PATH)
conn.row_factory = sqlite3.Row
return conn
# ─── Tile math ────────────────────────────────────────────────────────────────
def tile_to_bbox(z: int, x: int, y: int):
n = 2 ** z
west = x / n * 360.0 - 180.0
east = (x + 1) / n * 360.0 - 180.0
north = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * y / n))))
south = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * (y + 1) / n))))
return west, south, east, north
# ─── WKB parser ───────────────────────────────────────────────────────────────
def wkb_to_coords(blob: bytes):
"""Парсит WKB LineString с опциональным SRID, возвращает [(lon, lat), ...]."""
try:
b = bytes(blob)
if len(b) < 9:
return None
endian = "<" if b[0] == 1 else ">"
gtype = struct.unpack_from(endian + "I", b, 1)[0]
base_type = gtype & 0xFF
if base_type != 2:
return None
offset = 5
if gtype & 0x20000000:
offset += 4
npts = struct.unpack_from(endian + "I", b, offset)[0]
offset += 4
coords = []
for _ in range(npts):
lon, lat = struct.unpack_from(endian + "dd", b, offset)
offset += 16
coords.append((lon, lat))
return coords if len(coords) >= 2 else None
except Exception:
return None
def wkb_point_coords(blob: bytes):
"""Парсит WKB Point, возвращает (lon, lat)."""
try:
b = bytes(blob)
if len(b) < 21:
return None
endian = "<" if b[0] == 1 else ">"
gtype = struct.unpack_from(endian + "I", b, 1)[0]
base_type = gtype & 0xFF
if base_type != 1:
return None
offset = 5
if gtype & 0x20000000:
offset += 4
lon, lat = struct.unpack_from(endian + "dd", b, offset)
return lon, lat
except Exception:
return None
# ─── MVT builder ──────────────────────────────────────────────────────────────
def simplify_coords(coords, z):
"""Упрощает геометрию трека по зуму через Douglas-Peucker."""
if z >= 12:
return coords # без упрощения на детальных зумах
elif z >= 10:
tolerance = 0.0005 # ~50м
elif z >= 8:
tolerance = 0.002 # ~200м
else:
tolerance = 0.008 # ~800м на z7 и ниже
if len(coords) < 3:
return coords
line = LineString(coords)
simplified = line.simplify(tolerance, preserve_topology=False)
result = list(simplified.coords)
return result if len(result) >= 2 else coords
def build_mvt(trails_rows, poi_rows, z, x, y) -> bytes:
"""Собирает MVT тайл."""
import mapbox_vector_tile
west, south, east, north = tile_to_bbox(z, x, y)
trails_features = []
for row in trails_rows:
coords = wkb_to_coords(row["geom"])
if not coords:
continue
coords = simplify_coords(coords, z)
try:
props = {
"highway": row["highway_type"] or "",
"tracktype": row["track_type"] or "",
"surface": row["surface"] or "",
"name": row["name"] or "",
"length_m": row["length_m"] or 0,
"mtb_scale": row["mtb_scale"] or "",
}
trails_features.append({
"geometry": {"type": "LineString", "coordinates": coords},
"properties": props,
})
except Exception:
continue
poi_features = []
for row in poi_rows:
pt = wkb_point_coords(row["geom"])
if not pt:
continue
try:
props = {
"poi_type": row["poi_type"] or "",
"name": row["name"] or "",
}
poi_features.append({
"geometry": {"type": "Point", "coordinates": list(pt)},
"properties": props,
})
except Exception:
continue
layers = []
if trails_features:
layers.append({"name": "trails", "features": trails_features})
if poi_features:
layers.append({"name": "poi", "features": poi_features})
if not layers:
return b""
return mapbox_vector_tile.encode(
layers,
quantize_bounds=(west, south, east, north),
extents=4096,
default_options={'y_coord_down': False},
)
# ─── Route stats ──────────────────────────────────────────────────────────────
def haversine_m(lon1, lat1, lon2, lat2) -> float:
"""Расстояние между двумя точками в метрах."""
R = 6371000
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlam = math.radians(lon2 - lon1)
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def calc_route_stats(geometry: dict, conn) -> dict | None:
"""
Считает статистику покрытия маршрута по типам дорог.
Оптимизированная версия: сэмплирует каждые ~500м, один запрос на сэмпл.
geometry — GeoJSON LineString {"type":"LineString","coordinates":[[lon,lat],...]}
"""
try:
coords = geometry.get("coordinates", [])
if len(coords) < 2:
return None
# Считаем длины сегментов и общую длину
total_len = 0.0
seg_lengths = []
for i in range(len(coords) - 1):
d = haversine_m(coords[i][0], coords[i][1], coords[i+1][0], coords[i+1][1])
seg_lengths.append(d)
total_len += d
if total_len < 1:
return None
# Сэмплируем каждые ~500м (не чаще чем каждые 5 точек)
# Для маршрута 100км → ~200 сэмплов, для 500км → ~1000 сэмплов
avg_step = total_len / max(len(seg_lengths), 1)
# Шаг в точках для ~500м интервала
pts_per_500m = max(5, int(round(500.0 / avg_step))) if avg_step > 0 else 20
cur = conn.cursor()
stats = {
"track_lev12_m": 0.0,
"track_lev345_m": 0.0,
"path_m": 0.0,
"asphalt_m": 0.0,
}
# Кэш результатов по ячейкам сетки ~0.01° (~1км)
# Чтобы не делать повторные запросы для близких точек
grid_cache: dict = {}
i = 0
while i < len(coords) - 1:
end_i = min(i + pts_per_500m, len(coords) - 1)
# Средняя точка сегмента
mid_lon = (coords[i][0] + coords[end_i][0]) / 2
mid_lat = (coords[i][1] + coords[end_i][1]) / 2
# Длина этого сегмента
seg_len = sum(seg_lengths[i:end_i])
# Ключ кэша: ячейка 0.01° (~1км)
grid_key = (round(mid_lon * 100), round(mid_lat * 100))
if grid_key in grid_cache:
hw, tt = grid_cache[grid_key]
else:
# Tight bbox ~150m — index on (min_lon, max_lon, min_lat, max_lat) is used
# No ORDER BY to avoid full scan; first bbox hit is good enough for stats
delta = 0.0015
try:
cur.execute("""
SELECT highway_type, track_type
FROM trails
WHERE min_lon <= ? AND max_lon >= ?
AND min_lat <= ? AND max_lat >= ?
LIMIT 1
""", (
mid_lon + delta, mid_lon - delta,
mid_lat + delta, mid_lat - delta,
))
row = cur.fetchone()
if row:
hw = (row["highway_type"] or "").lower()
tt = (row["track_type"] or "").lower()
else:
# Widen search to ~500m if nothing found nearby
delta2 = 0.005
cur.execute("""
SELECT highway_type, track_type
FROM trails
WHERE min_lon <= ? AND max_lon >= ?
AND min_lat <= ? AND max_lat >= ?
LIMIT 1
""", (
mid_lon + delta2, mid_lon - delta2,
mid_lat + delta2, mid_lat - delta2,
))
row = cur.fetchone()
if row:
hw = (row["highway_type"] or "").lower()
tt = (row["track_type"] or "").lower()
else:
hw, tt = "asphalt", ""
except Exception:
hw, tt = "asphalt", ""
grid_cache[grid_key] = (hw, tt)
if hw == "track":
if tt in ("grade1", "grade2"):
stats["track_lev12_m"] += seg_len
else:
stats["track_lev345_m"] += seg_len
elif hw in ("path", "bridleway", "footway"):
stats["path_m"] += seg_len
else:
stats["asphalt_m"] += seg_len
i = end_i
computed_total = (
stats["track_lev12_m"] + stats["track_lev345_m"] +
stats["path_m"] + stats["asphalt_m"]
)
if computed_total < 1:
return None
def pct(v):
return round(v / computed_total * 100)
dirt_total = stats["track_lev12_m"] + stats["track_lev345_m"] + stats["path_m"]
return {
"track_lev12_m": round(stats["track_lev12_m"]),
"track_lev345_m": round(stats["track_lev345_m"]),
"path_m": round(stats["path_m"]),
"asphalt_m": round(stats["asphalt_m"]),
"track_lev12_pct": pct(stats["track_lev12_m"]),
"track_lev345_pct": pct(stats["track_lev345_m"]),
"path_pct": pct(stats["path_m"]),
"asphalt_pct": pct(stats["asphalt_m"]),
"dirt_total_pct": pct(dirt_total),
}
except Exception:
return None
# ─── Pydantic models ──────────────────────────────────────────────────────────
class Waypoint(BaseModel):
lon: float
lat: float
class RouteRequest(BaseModel):
waypoints: List[Waypoint]
alternatives: int = 5
class ReconRequest(BaseModel):
lon: float
lat: float
radius_km: float = 20
class ScenicRequest(BaseModel):
lon: float
lat: float
target_km: float = 100
# ─── API endpoints ────────────────────────────────────────────────────────────
@app.get("/api/cache/clear")
async def clear_cache():
_tile_cache.clear()
return {"status": "ok", "cleared": True}
@app.get("/api/tiles/{z}/{x}/{y}.mvt")
async def get_tile(z: int, x: int, y: int):
if z < 0 or z > 22:
raise HTTPException(400, "Invalid z")
max_coord = 2 ** z
if x < 0 or x >= max_coord or y < 0 or y >= max_coord:
raise HTTPException(400, "Invalid x/y for zoom level")
cached = get_cached_tile(z, x, y)
if cached is not None:
return Response(
content=cached,
media_type="application/x-protobuf",
headers={"Content-Encoding": "identity", "Access-Control-Allow-Origin": "*", "X-Cache": "HIT"},
)
if not os.path.exists(DATA_PATH):
raise HTTPException(503, f"База данных не найдена: {DATA_PATH}")
west, south, east, north = tile_to_bbox(z, x, y)
buf_x = (east - west) * 0.15
buf_y = (north - south) * 0.15
q_west = west - buf_x
q_east = east + buf_x
q_south = south - buf_y
q_north = north + buf_y
if z <= 7:
limit = 3000
elif z <= 9:
limit = 8000
elif z <= 11:
limit = 15000
else:
limit = 25000
if z <= 7:
min_length = 2000
elif z == 8:
min_length = 500
elif z == 9:
min_length = 200
elif z == 10:
min_length = 50
else:
min_length = 0
try:
conn = get_db()
cur = conn.cursor()
cur.execute("""
SELECT osm_id, highway_type, track_type, surface, name, length_m, mtb_scale, geom
FROM trails
WHERE min_lon <= ? AND max_lon >= ?
AND min_lat <= ? AND max_lat >= ?
AND (length_m >= ? OR length_m IS NULL)
LIMIT ?
""", (q_east, q_west, q_north, q_south, min_length, limit))
trails_rows = cur.fetchall()
cur.execute("""
SELECT osm_id, poi_type, name, geom
FROM poi
WHERE lon >= ? AND lon <= ? AND lat >= ? AND lat <= ?
LIMIT 500
""", (q_west, q_east, q_south, q_north))
poi_rows = cur.fetchall()
conn.close()
except Exception as e:
raise HTTPException(500, f"Ошибка БД: {e}")
mvt = build_mvt(trails_rows, poi_rows, z, x, y)
if mvt:
set_cached_tile(z, x, y, mvt)
return Response(
content=mvt,
media_type="application/x-protobuf",
headers={
"Content-Encoding": "identity",
"Access-Control-Allow-Origin": "*",
"X-Cache": "MISS",
},
)
@app.post("/api/recon")
async def post_recon(req: ReconRequest):
"""
Разведка: агрегация trails + POI в радиусе от точки.
"""
lon, lat, radius_km = req.lon, req.lat, req.radius_km
lat_rad = math.radians(lat)
delta_lat = radius_km / 111.0
delta_lon = radius_km / (111.0 * math.cos(lat_rad))
try:
conn = get_db()
cur = conn.cursor()
# Trails — агрегация по типам
cur.execute("""
SELECT highway_type, track_type, SUM(length_m) as total_m, COUNT(*) as cnt
FROM trails
WHERE min_lon <= ? AND max_lon >= ?
AND min_lat <= ? AND max_lat >= ?
AND length_m >= 100
GROUP BY highway_type, track_type
""", (lon + delta_lon, lon - delta_lon, lat + delta_lat, lat - delta_lat))
trail_rows = cur.fetchall()
# Агрегация по категориям
lev12_count, lev12_km = 0, 0.0
lev345_count, lev345_km = 0, 0.0
path_count, path_km = 0, 0.0
for row in trail_rows:
hw = (row["highway_type"] or "").lower()
tt = (row["track_type"] or "").lower() if row["track_type"] else ""
cnt = row["cnt"]
km = (row["total_m"] or 0) / 1000.0
if hw == "track":
if tt in ("grade1", "grade2"):
lev12_count += cnt
lev12_km += km
else:
lev345_count += cnt
lev345_km += km
elif hw in ("path", "bridleway"):
path_count += cnt
path_km += km
total_count = lev12_count + lev345_count + path_count
total_km = lev12_km + lev345_km + path_km
# POI — подсчёт по типам
cur.execute("""
SELECT poi_type, COUNT(*) as cnt
FROM poi
WHERE lon >= ? AND lon <= ? AND lat >= ? AND lat <= ?
GROUP BY poi_type
""", (lon - delta_lon, lon + delta_lon, lat - delta_lat, lat + delta_lat))
poi_rows = cur.fetchall()
poi_counts = {}
for row in poi_rows:
poi_counts[row["poi_type"]] = row["cnt"]
conn.close()
# POI types to report
poi_report = {}
for key in ["natural=water", "tourism=viewpoint", "historic=ruins",
"ford=yes", "natural=peak", "natural=cave_entrance"]:
poi_report[key] = poi_counts.get(key, 0)
return {
"center": {"lon": lon, "lat": lat},
"radius_km": radius_km,
"trails": {
"total_count": total_count,
"total_km": round(total_km, 1),
"lev12_count": lev12_count,
"lev12_km": round(lev12_km, 1),
"lev345_count": lev345_count,
"lev345_km": round(lev345_km, 1),
"path_count": path_count,
"path_km": round(path_km, 1),
},
"poi": poi_report,
}
except Exception as e:
raise HTTPException(500, f"Ошибка БД: {e}")
# ─── Scenic route helpers ─────────────────────────────────────────────────────
SCENIC_POI_SCORES = {
"natural=water": 10,
"tourism=viewpoint": 15,
"historic=ruins": 10,
"natural=peak": 12,
"natural=cave_entrance": 8,
"ford=yes": 5,
}
SCENIC_POI_ICONS = {
"natural=water": "💧",
"tourism=viewpoint": "👁",
"historic=ruins": "🏚",
"natural=peak": "🔺",
"natural=cave_entrance": "🕳",
"ford=yes": "🌊",
}
DIRECTION_NAMES = {0: "Северный", 1: "Восточный", 2: "Южный", 3: "Западный"}
def _angle_bucket(lon, lat, center_lon, center_lat):
"""Возвращает сектор 0-3 (С, В, Ю, З) от центра."""
a = math.degrees(math.atan2(lon - center_lon, lat - center_lat)) % 360
return int(a // 90)
async def _osrm_route_waypoints(waypoints_lonlat: list[tuple[float, float]]) -> dict | None:
"""Запрос к OSRM со списком waypoints. Возвращает полный ответ OSRM."""
coords_str = ";".join(f"{lon},{lat}" for lon, lat in waypoints_lonlat)
radiuses_str = ";".join(["5000"] * len(waypoints_lonlat))
url = (
f"{OSRM_URL}/route/v1/driving/{coords_str}"
f"?overview=full&geometries=geojson&alternatives=false"
f"&radiuses={radiuses_str}"
)
try:
async with httpx.AsyncClient(timeout=60) as client:
resp = await client.get(url)
data = resp.json()
except Exception:
return None
if data.get("code") != "Ok" or not data.get("routes"):
# Retry with wider snap
radiuses_wide = ";".join(["10000"] * len(waypoints_lonlat))
url2 = (
f"{OSRM_URL}/route/v1/driving/{coords_str}"
f"?overview=full&geometries=geojson&alternatives=false"
f"&radiuses={radiuses_wide}"
)
try:
async with httpx.AsyncClient(timeout=60) as client:
resp = await client.get(url2)
data = resp.json()
except Exception:
return None
if data.get("code") != "Ok" or not data.get("routes"):
return None
return data
def _merge_geometries(geometries: list[dict]) -> dict:
"""Объединяет несколько LineString GeoJSON в одну."""
all_coords = []
for g in geometries:
if g.get("type") == "LineString":
all_coords.extend(g.get("coordinates", []))
return {"type": "LineString", "coordinates": all_coords}
async def _build_scenic_for_cluster(
start_lon, start_lat, poi_list, target_km, conn
) -> dict | None:
"""
Строит кольцевой маршрут: start → POI1 → POI2 → ... → start.
poi_list: [{lon, lat, poi_type, name, score}, ...]
"""
if not poi_list:
return None
# Ограничиваем до 5 POI
pois = poi_list[:5]
# Waypoints: start → POI sequence → start
wp = [(start_lon, start_lat)]
for p in pois:
wp.append((p["lon"], p["lat"]))
wp.append((start_lon, start_lat))
osrm_data = await _osrm_route_waypoints(wp)
if not osrm_data or not osrm_data.get("routes"):
return None
route = osrm_data["routes"][0]
geometry = route["geometry"]
distance_m = route["distance"]
duration_s = route["duration"]
# Если слишком длинный (> target * 1.3) — убрать последний POI и перестроить
if distance_m > target_km * 1300 and len(pois) > 1:
pois = pois[:-1]
wp = [(start_lon, start_lat)]
for p in pois:
wp.append((p["lon"], p["lat"]))
wp.append((start_lon, start_lat))
osrm_data2 = await _osrm_route_waypoints(wp)
if osrm_data2 and osrm_data2.get("routes"):
route = osrm_data2["routes"][0]
geometry = route["geometry"]
distance_m = route["distance"]
duration_s = route["duration"]
# Статистика покрытия
stats = None
if conn:
try:
stats = calc_route_stats(geometry, conn)
except Exception:
pass
# Waypoint labels для ответа
waypoints_out = [{"lon": start_lon, "lat": start_lat, "label": "Старт"}]
for p in pois:
icon = SCENIC_POI_ICONS.get(p["poi_type"], "📍")
name = p.get("name") or p["poi_type"]
waypoints_out.append({"lon": p["lon"], "lat": p["lat"], "label": f"{icon} {name}"})
waypoints_out.append({"lon": start_lon, "lat": start_lat, "label": "Финиш"})
scenic_score = sum(p.get("score", 0) for p in pois)
scenic_pois_out = [
{
"type": p["poi_type"],
"name": p.get("name", ""),
"lon": p["lon"],
"lat": p["lat"],
}
for p in pois
]
return {
"name": "",
"waypoints": waypoints_out,
"geometry": geometry,
"distance_m": round(distance_m),
"duration_s": round(duration_s),
"stats": stats,
"scenic_score": scenic_score,
"scenic_pois": scenic_pois_out,
}
@app.post("/api/scenic")
async def post_scenic(req: ScenicRequest):
"""
Красивый маршрут: кольцевой маршрут через живописные POI.
"""
lon, lat, target_km = req.lon, req.lat, req.target_km
if target_km < 20 or target_km > 500:
raise HTTPException(400, "Дистанция должна быть от 20 до 500 км")
# Радиус поиска POI
search_radius_km = target_km * 0.6
lat_rad = math.radians(lat)
delta_lat = search_radius_km / 111.0
delta_lon = search_radius_km / (111.0 * math.cos(lat_rad))
try:
conn = get_db()
cur = conn.cursor()
# Найти POI в радиусе
cur.execute("""
SELECT poi_type, name, lon, lat
FROM poi
WHERE lon >= ? AND lon <= ? AND lat >= ? AND lat <= ?
ORDER BY poi_type
""", (lon - delta_lon, lon + delta_lon, lat - delta_lat, lat + delta_lat))
poi_rows = cur.fetchall()
# Фильтруем POI по score и назначаем баллы
scored_pois = []
for row in poi_rows:
pt = row["poi_type"] or ""
if pt in SCENIC_POI_SCORES:
p_lon = row["lon"]
p_lat = row["lat"]
# Не берём POI ближе 3 км от старта
d = haversine_m(lon, lat, p_lon, p_lat)
if d < 3000:
continue
scored_pois.append({
"poi_type": pt,
"name": row["name"] or "",
"lon": p_lon,
"lat": p_lat,
"score": SCENIC_POI_SCORES[pt],
"distance_m": d,
})
if not scored_pois:
# Нет красивых мест — строим просто кольцо через ближайшие грунтовки
# Пробуем кольцо: старт → точка на расстоянии ~target_km/3 → старт
_ = 0
third_dist = target_km / 3.0
mid_lat = lat + (third_dist / 111.0)
mid_lon = lon
osrm_data = await _osrm_route_waypoints([
(lon, lat), (mid_lon, mid_lat), (lon, lat)
])
if osrm_data and osrm_data.get("routes"):
route = osrm_data["routes"][0]
geometry = route["geometry"]
stats = calc_route_stats(geometry, conn) if conn else None
conn.close()
return {
"routes": [{
"name": "Маршрут по грунтовкам",
"waypoints": [
{"lon": lon, "lat": lat, "label": "Старт"},
{"lon": mid_lon, "lat": mid_lat, "label": "Разворот"},
{"lon": lon, "lat": lat, "label": "Финиш"},
],
"geometry": geometry,
"distance_m": round(route["distance"]),
"duration_s": round(route["duration"]),
"stats": stats,
"scenic_score": 0,
"scenic_pois": [],
}]
}
conn.close()
raise HTTPException(404, "Не удалось построить маршрут")
# Разделить POI на кластеры по азимуту (4 сектора)
clusters: dict[int, list] = {0: [], 1: [], 2: [], 3: []}
for p in scored_pois:
bucket = _angle_bucket(p["lon"], p["lat"], lon, lat)
clusters[bucket].append(p)
# Для каждого кластера — жадный выбор POI
remaining_km = target_km * 0.8 # 80% дистанции на POI, 20% на возврат
cluster_poi_lists = {}
for bucket, pois in clusters.items():
if not pois:
cluster_poi_lists[bucket] = []
continue
# Сортируем по score/distance (жадный)
for p in pois:
p["score_per_km"] = p["score"] / max(p["distance_m"] / 1000.0, 1.0)
pois.sort(key=lambda p: p["score_per_km"], reverse=True)
selected = []
used_km = 0.0
for p in pois:
d_km = p["distance_m"] / 1000.0
if d_km > remaining_km * 0.5:
continue
if d_km < 3:
continue
if used_km + d_km > remaining_km:
continue
selected.append(p)
used_km += d_km
if len(selected) >= 5:
break
cluster_poi_lists[bucket] = selected
# Строим маршруты для кластеров с POI (до 3 альтернативных)
routes_out = []
used_buckets = 0
for bucket in range(4):
if used_buckets >= 3:
break
pois = cluster_poi_lists.get(bucket, [])
if not pois:
continue
result = await _build_scenic_for_cluster(lon, lat, pois, target_km, conn)
if result:
# Проверяем дистанцию: не < 50% target
if result["distance_m"] < target_km * 500:
continue
result["name"] = DIRECTION_NAMES.get(bucket, f"Вариант {used_buckets + 1}")
routes_out.append(result)
used_buckets += 1
# Если ни одного маршрута — попробовать все POI вместе
if not routes_out and scored_pois:
# Топ-5 POI по score_per_km
all_sorted = sorted(scored_pois, key=lambda p: p["score_per_km"], reverse=True)[:5]
result = await _build_scenic_for_cluster(lon, lat, all_sorted, target_km, conn)
if result:
result["name"] = "Маршрут"
routes_out.append(result)
conn.close()
if not routes_out:
raise HTTPException(404, "Не удалось построить красивый маршрут")
return {"routes": routes_out}
except HTTPException:
raise
except Exception as e:
raise HTTPException(500, f"Ошибка: {e}")
async def _osrm_segment_alternatives(lon_a: float, lat_a: float, lon_b: float, lat_b: float, depth: int = 0) -> list:
"""Запросить альтернативы для одного сегмента. При TooBig — разбить пополам."""
coords_str = f"{lon_a},{lat_a};{lon_b},{lat_b}"
url = (
f"{OSRM_URL}/route/v1/driving/{coords_str}"
f"?alternatives=true&overview=full&geometries=geojson&annotations=false&radiuses=5000;5000"
)
try:
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(url)
data = resp.json()
except Exception:
return []
if data.get("code") == "TooBig" and depth < 2:
# Разбить пополам — найти середину через среднее координат
mid_lon = (lon_a + lon_b) / 2
mid_lat = (lat_a + lat_b) / 2
# Рекурсивно получить альтернативы для каждой половины
alts_a = await _osrm_segment_alternatives(lon_a, lat_a, mid_lon, mid_lat, depth + 1)
alts_b = await _osrm_segment_alternatives(mid_lon, mid_lat, lon_b, lat_b, depth + 1)
if not alts_a or not alts_b:
return []
# Скомбинировать: каждый вариант первой половины × каждый вариант второй
combined = []
for r_a, r_b in itertools.product(alts_a[:3], alts_b[:3]):
coords_a = r_a["geometry"]["coordinates"]
coords_b = r_b["geometry"]["coordinates"]
merged_coords = coords_a + coords_b[1:] # убрать дублирующую точку стыка
combined.append({
"distance": r_a["distance"] + r_b["distance"],
"duration": r_a["duration"] + r_b["duration"],
"geometry": {"type": "LineString", "coordinates": merged_coords},
})
return combined
if data.get("code") != "Ok" or not data.get("routes"):
# Попробовать без alternatives
url_single = (
f"{OSRM_URL}/route/v1/driving/{coords_str}"
f"?overview=full&geometries=geojson&annotations=false&radiuses=5000;5000"
)
try:
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(url_single)
data = resp.json()
except Exception:
return []
if data.get("code") != "Ok":
return []
return data.get("routes", [])
async def _build_segmented_route(req: RouteRequest) -> dict:
"""Строит маршрут через промежуточные точки с альтернативами по сегментам."""
waypoints = req.waypoints
segments_count = len(waypoints) - 1
# Получить альтернативы для каждого сегмента
segment_alternatives = []
for i in range(segments_count):
wp_a = waypoints[i]
wp_b = waypoints[i + 1]
alts = await _osrm_segment_alternatives(wp_a.lon, wp_a.lat, wp_b.lon, wp_b.lat)
if not alts:
raise HTTPException(404, f"Маршрут не найден на сегменте {i + 1}")
segment_alternatives.append(alts)
# Скомбинировать сегменты (до 3 вариантов на сегмент чтобы не взорвать комбинаторику)
max_per_segment = 3
trimmed = [alts[:max_per_segment] for alts in segment_alternatives]
all_combos = list(itertools.product(*trimmed))
# Склеить геометрию для каждой комбинации
combined_routes = []
for combo in all_combos:
total_distance = sum(r["distance"] for r in combo)
total_duration = sum(r["duration"] for r in combo)
all_coords: list = []
for r in combo:
coords = r["geometry"]["coordinates"]
if all_coords:
all_coords.extend(coords[1:])
else:
all_coords.extend(coords)
combined_routes.append({
"distance": total_distance,
"duration": total_duration,
"geometry": {"type": "LineString", "coordinates": all_coords},
})
# Дедупликация по геометрии (5 контрольных точек)
def route_sig(coords):
n = len(coords)
if n == 0:
return ()
idxs = [0, n//4, n//2, 3*n//4, n-1]
return tuple((round(coords[i][0], 3), round(coords[i][1], 3)) for i in idxs if i < n)
seen = set()
deduped = []
for route in combined_routes:
sig = route_sig(route["geometry"]["coordinates"])
if sig not in seen:
seen.add(sig)
deduped.append(route)
deduped = deduped[:5]
if not deduped:
raise HTTPException(404, "Маршрут не найден")
# Считаем статистику через существующую calc_route_stats
try:
conn = get_db()
except Exception:
conn = None
routes_out = []
for idx, route in enumerate(deduped):
stats = None
if conn is not None:
try:
stats = calc_route_stats(route["geometry"], conn)
except Exception:
stats = None
routes_out.append({
"index": idx,
"distance_m": round(route["distance"]),
"duration_s": round(route["duration"]),
"geometry": route["geometry"],
"stats": stats,
})
if conn is not None:
try:
conn.close()
except Exception:
pass
if not routes_out:
raise HTTPException(404, "Маршрут не найден")
# Сортировать по dirt_total_pct убывающий
routes_out.sort(key=lambda r: (r["stats"] or {}).get("dirt_total_pct", 0), reverse=True)
for idx, r in enumerate(routes_out):
r["index"] = idx
# Waypoints для ответа
waypoints_out = []
for i, wp in enumerate(req.waypoints):
label = "Старт" if i == 0 else ("Финиш" if i == len(req.waypoints) - 1 else f"Точка {i}")
waypoints_out.append({"lon": wp.lon, "lat": wp.lat, "label": label})
return {"routes": routes_out, "waypoints": waypoints_out}
@app.post("/api/route")
async def post_route(req: RouteRequest):
"""
Роутинг через OSRM с альтернативными маршрутами и статистикой покрытия.
Принимает JSON: {"waypoints": [{"lon":..,"lat":..}, ...], "alternatives": 5}
"""
if len(req.waypoints) < 2:
raise HTTPException(400, "Нужно минимум 2 точки")
# При промежуточных точках — сегментный подход
if len(req.waypoints) > 2:
return await _build_segmented_route(req)
# Строим строку координат для OSRM
coords_str = ";".join(f"{wp.lon},{wp.lat}" for wp in req.waypoints)
alternatives = max(1, min(5, req.alternatives))
# Увеличенный snap radius для длинных маршрутов (5 км)
radiuses_str = ";".join(["5000"] * len(req.waypoints))
url = (
f"{OSRM_URL}/route/v1/driving/{coords_str}"
f"?alternatives={alternatives}&overview=full&geometries=geojson&annotations=false"
f"&radiuses={radiuses_str}"
)
try:
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(url)
data = resp.json()
except Exception as e:
raise HTTPException(503, f"OSRM недоступен: {e}")
# OSRM возвращает TooBig когда маршрут слишком длинный для N альтернатив
# Retry-цепочка: alternatives=5 → 3 → 1, ВСЕГДА с radiuses (snap radius критичен для длинных маршрутов)
if data.get("code") == "TooBig":
url_retry3 = (
f"{OSRM_URL}/route/v1/driving/{coords_str}"
f"?alternatives=3&overview=full&geometries=geojson&annotations=false"
f"&radiuses={radiuses_str}"
)
try:
async with httpx.AsyncClient(timeout=60) as client:
resp = await client.get(url_retry3)
data = resp.json()
except Exception as e:
raise HTTPException(503, f"OSRM недоступен: {e}")
if data.get("code") == "TooBig":
url_retry1 = (
f"{OSRM_URL}/route/v1/driving/{coords_str}"
f"?overview=full&geometries=geojson&alternatives=false"
f"&radiuses={radiuses_str}"
)
try:
async with httpx.AsyncClient(timeout=60) as client:
resp = await client.get(url_retry1)
data = resp.json()
except Exception as e:
raise HTTPException(503, f"OSRM недоступен: {e}")
# Если NoSegment — пробуем с увеличенным radius (10 км вместо 5)
if data.get("code") == "NoSegment":
radiuses_wide = ";".join(["10000"] * len(req.waypoints))
url_wide = (
f"{OSRM_URL}/route/v1/driving/{coords_str}"
f"?overview=full&geometries=geojson&alternatives=false"
f"&radiuses={radiuses_wide}"
)
try:
async with httpx.AsyncClient(timeout=60) as client:
resp = await client.get(url_wide)
data = resp.json()
except Exception as e:
raise HTTPException(503, f"OSRM недоступен: {e}")
if data.get("code") != "Ok" or not data.get("routes"):
osrm_code = data.get("code", "Unknown")
osrm_msg = data.get("message", "")
if osrm_code == "NoRoute":
raise HTTPException(404, "Маршрут не найден: нет пути между точками")
elif osrm_code == "NoSegment":
raise HTTPException(404, "Маршрут не найден: точки слишком далеко от дорог")
elif osrm_code == "InvalidValue":
raise HTTPException(400, f"Некорректные координаты: {osrm_msg}")
else:
raise HTTPException(404, f"Маршрут не найден ({osrm_code})")
# Открываем БД один раз для всех маршрутов
try:
conn = get_db()
except Exception:
conn = None
routes_out = []
for idx, route in enumerate(data["routes"]):
geometry = route["geometry"]
distance_m = route["distance"]
duration_s = route["duration"]
# Считаем статистику покрытия
stats = None
if conn is not None:
try:
stats = calc_route_stats(geometry, conn)
except Exception:
stats = None
routes_out.append({
"index": idx,
"geometry": geometry,
"distance_m": round(distance_m),
"duration_s": round(duration_s),
"stats": stats,
})
if conn is not None:
try:
conn.close()
except Exception:
pass
return {"routes": routes_out}
# Обратная совместимость — старый GET endpoint (для линейки и прочего)
@app.get("/api/route")
async def get_route(
from_lon: float, from_lat: float,
to_lon: float, to_lat: float
):
"""Роутинг через OSRM (legacy GET). Параметры: from_lon, from_lat, to_lon, to_lat"""
url = (
f"{OSRM_URL}/route/v1/driving/"
f"{from_lon},{from_lat};{to_lon},{to_lat}"
f"?overview=full&geometries=geojson&annotations=false"
)
try:
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(url)
data = resp.json()
except Exception as e:
raise HTTPException(503, f"OSRM недоступен: {e}")
if data.get("code") != "Ok" or not data.get("routes"):
raise HTTPException(404, "Маршрут не найден")
route = data["routes"][0]
geometry = route["geometry"]
distance_m = route["distance"]
duration_s = route["duration"]
return {
"type": "Feature",
"geometry": geometry,
"properties": {
"distance_m": round(distance_m),
"distance_km": round(distance_m / 1000, 1),
"duration_min": round(duration_s / 60),
"duration_s": round(duration_s),
}
}
@app.get("/api/health")
async def health():
return {
"status": "ok",
"db_path": DATA_PATH,
"db_exists": os.path.exists(DATA_PATH),
}
# ─── Terrain tiles ───────────────────────────────────────────────────────────
TERRAIN_DIR = os.environ.get(
"TERRAIN_DIR",
os.path.join(os.path.dirname(__file__), "../data/terrain"),
)
@app.get("/terrain/{layer}/{z}/{x}/{y}.png")
async def terrain_tile(layer: str, z: int, x: int, y: int):
"""Отдаёт растровые тайлы рельефа (hypso/hillshade)"""
if layer not in ("hypso", "hillshade"):
raise HTTPException(404, "Unknown layer")
tile_path = os.path.join(TERRAIN_DIR, layer, str(z), str(x), f"{y}.png")
if not os.path.exists(tile_path):
raise HTTPException(404, "Tile not found")
return FileResponse(
tile_path,
media_type="image/png",
headers={
"Cache-Control": "public, max-age=31536000, immutable",
"Access-Control-Allow-Origin": "*",
}
)
# ─── Static files ─────────────────────────────────────────────────────────────
from src.api.gps_tracks.endpoint import create_gps_router
gps_router = create_gps_router(GPS_TRACKS_DB_PATH)
app.include_router(gps_router)
if os.path.exists(STATIC_DIR):
app.mount("/", StaticFiles(directory=STATIC_DIR, html=True), name="static")
if __name__ == "__main__":
print(f"==> Enduro Trails API DB={DATA_PATH} Port={PORT}")
uvicorn.run("app:app", host="0.0.0.0", port=PORT, workers=4)