1256 lines
45 KiB
Python
1256 lines
45 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
app.py — FastAPI сервер для Enduro Trails
|
||
- Раздаёт статику из static/
|
||
- /api/tiles/{z}/{x}/{y}.mvt — векторные тайлы из SQLite
|
||
- /api/route (POST) — роутинг через OSRM с альтернативами и статистикой покрытия
|
||
- /api/health — статус БД
|
||
"""
|
||
|
||
import os
|
||
import math
|
||
import struct
|
||
import sqlite3
|
||
|
||
import itertools
|
||
|
||
from shapely.geometry import LineString
|
||
from typing import List
|
||
|
||
|
||
from fastapi import FastAPI, HTTPException, Response
|
||
from fastapi.responses import FileResponse
|
||
from fastapi.staticfiles import StaticFiles
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from pydantic import BaseModel
|
||
import httpx
|
||
import uvicorn
|
||
|
||
# ─── Tile cache ──────────────────────────────────────────────────────────────
|
||
|
||
_tile_cache: dict = {}
|
||
_TILE_CACHE_MAX = 512
|
||
|
||
|
||
def get_cached_tile(z, x, y):
|
||
return _tile_cache.get((z, x, y))
|
||
|
||
|
||
def set_cached_tile(z, x, y, data: bytes):
|
||
if len(_tile_cache) >= _TILE_CACHE_MAX:
|
||
# FIFO вытеснение — удаляем самый старый ключ
|
||
_tile_cache.pop(next(iter(_tile_cache)))
|
||
_tile_cache[(z, x, y)] = data
|
||
|
||
|
||
# ─── Конфиг ───────────────────────────────────────────────────────────────────
|
||
|
||
DATA_PATH = os.environ.get(
|
||
"DATA_PATH",
|
||
os.path.join(os.path.dirname(__file__), "../data/centralfederal.sqlite"),
|
||
)
|
||
OSRM_URL = os.environ.get("OSRM_URL", "http://172.22.0.1:5559")
|
||
DATA_PATH = os.path.abspath(DATA_PATH)
|
||
STATIC_DIR = os.environ.get("STATIC_DIR", os.path.join(os.path.dirname(__file__), "../../src/web"))
|
||
PORT = int(os.environ.get("PORT", 5558))
|
||
|
||
app = FastAPI(title="Enduro Trails API")
|
||
|
||
app.add_middleware(
|
||
CORSMiddleware,
|
||
allow_origins=["*"],
|
||
allow_methods=["*"],
|
||
allow_headers=["*"],
|
||
)
|
||
|
||
# ─── DB ───────────────────────────────────────────────────────────────────────
|
||
|
||
def get_db():
|
||
conn = sqlite3.connect(DATA_PATH)
|
||
conn.row_factory = sqlite3.Row
|
||
return conn
|
||
|
||
|
||
# ─── Tile math ────────────────────────────────────────────────────────────────
|
||
|
||
def tile_to_bbox(z: int, x: int, y: int):
|
||
n = 2 ** z
|
||
west = x / n * 360.0 - 180.0
|
||
east = (x + 1) / n * 360.0 - 180.0
|
||
north = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * y / n))))
|
||
south = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * (y + 1) / n))))
|
||
return west, south, east, north
|
||
|
||
|
||
# ─── WKB parser ───────────────────────────────────────────────────────────────
|
||
|
||
def wkb_to_coords(blob: bytes):
|
||
"""Парсит WKB LineString с опциональным SRID, возвращает [(lon, lat), ...]."""
|
||
try:
|
||
b = bytes(blob)
|
||
if len(b) < 9:
|
||
return None
|
||
endian = "<" if b[0] == 1 else ">"
|
||
gtype = struct.unpack_from(endian + "I", b, 1)[0]
|
||
base_type = gtype & 0xFF
|
||
if base_type != 2:
|
||
return None
|
||
offset = 5
|
||
if gtype & 0x20000000:
|
||
offset += 4
|
||
npts = struct.unpack_from(endian + "I", b, offset)[0]
|
||
offset += 4
|
||
coords = []
|
||
for _ in range(npts):
|
||
lon, lat = struct.unpack_from(endian + "dd", b, offset)
|
||
offset += 16
|
||
coords.append((lon, lat))
|
||
return coords if len(coords) >= 2 else None
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def wkb_point_coords(blob: bytes):
|
||
"""Парсит WKB Point, возвращает (lon, lat)."""
|
||
try:
|
||
b = bytes(blob)
|
||
if len(b) < 21:
|
||
return None
|
||
endian = "<" if b[0] == 1 else ">"
|
||
gtype = struct.unpack_from(endian + "I", b, 1)[0]
|
||
base_type = gtype & 0xFF
|
||
if base_type != 1:
|
||
return None
|
||
offset = 5
|
||
if gtype & 0x20000000:
|
||
offset += 4
|
||
lon, lat = struct.unpack_from(endian + "dd", b, offset)
|
||
return lon, lat
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
# ─── MVT builder ──────────────────────────────────────────────────────────────
|
||
|
||
def simplify_coords(coords, z):
|
||
"""Упрощает геометрию трека по зуму через Douglas-Peucker."""
|
||
if z >= 12:
|
||
return coords # без упрощения на детальных зумах
|
||
elif z >= 10:
|
||
tolerance = 0.0005 # ~50м
|
||
elif z >= 8:
|
||
tolerance = 0.002 # ~200м
|
||
else:
|
||
tolerance = 0.008 # ~800м на z7 и ниже
|
||
|
||
if len(coords) < 3:
|
||
return coords
|
||
|
||
line = LineString(coords)
|
||
simplified = line.simplify(tolerance, preserve_topology=False)
|
||
result = list(simplified.coords)
|
||
return result if len(result) >= 2 else coords
|
||
|
||
|
||
def build_mvt(trails_rows, poi_rows, z, x, y) -> bytes:
|
||
"""Собирает MVT тайл."""
|
||
import mapbox_vector_tile
|
||
|
||
west, south, east, north = tile_to_bbox(z, x, y)
|
||
|
||
trails_features = []
|
||
for row in trails_rows:
|
||
coords = wkb_to_coords(row["geom"])
|
||
if not coords:
|
||
continue
|
||
coords = simplify_coords(coords, z)
|
||
try:
|
||
props = {
|
||
"highway": row["highway_type"] or "",
|
||
"tracktype": row["track_type"] or "",
|
||
"surface": row["surface"] or "",
|
||
"name": row["name"] or "",
|
||
"length_m": row["length_m"] or 0,
|
||
"mtb_scale": row["mtb_scale"] or "",
|
||
}
|
||
trails_features.append({
|
||
"geometry": {"type": "LineString", "coordinates": coords},
|
||
"properties": props,
|
||
})
|
||
except Exception:
|
||
continue
|
||
|
||
poi_features = []
|
||
for row in poi_rows:
|
||
pt = wkb_point_coords(row["geom"])
|
||
if not pt:
|
||
continue
|
||
try:
|
||
props = {
|
||
"poi_type": row["poi_type"] or "",
|
||
"name": row["name"] or "",
|
||
}
|
||
poi_features.append({
|
||
"geometry": {"type": "Point", "coordinates": list(pt)},
|
||
"properties": props,
|
||
})
|
||
except Exception:
|
||
continue
|
||
|
||
layers = []
|
||
if trails_features:
|
||
layers.append({"name": "trails", "features": trails_features})
|
||
if poi_features:
|
||
layers.append({"name": "poi", "features": poi_features})
|
||
|
||
if not layers:
|
||
return b""
|
||
|
||
return mapbox_vector_tile.encode(
|
||
layers,
|
||
quantize_bounds=(west, south, east, north),
|
||
extents=4096,
|
||
default_options={'y_coord_down': False},
|
||
)
|
||
|
||
|
||
# ─── Route stats ──────────────────────────────────────────────────────────────
|
||
|
||
def haversine_m(lon1, lat1, lon2, lat2) -> float:
|
||
"""Расстояние между двумя точками в метрах."""
|
||
R = 6371000
|
||
phi1, phi2 = math.radians(lat1), math.radians(lat2)
|
||
dphi = math.radians(lat2 - lat1)
|
||
dlam = math.radians(lon2 - lon1)
|
||
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
|
||
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
|
||
|
||
|
||
def calc_route_stats(geometry: dict, conn) -> dict | None:
|
||
"""
|
||
Считает статистику покрытия маршрута по типам дорог.
|
||
Оптимизированная версия: сэмплирует каждые ~500м, один запрос на сэмпл.
|
||
geometry — GeoJSON LineString {"type":"LineString","coordinates":[[lon,lat],...]}
|
||
"""
|
||
try:
|
||
coords = geometry.get("coordinates", [])
|
||
if len(coords) < 2:
|
||
return None
|
||
|
||
# Считаем длины сегментов и общую длину
|
||
total_len = 0.0
|
||
seg_lengths = []
|
||
for i in range(len(coords) - 1):
|
||
d = haversine_m(coords[i][0], coords[i][1], coords[i+1][0], coords[i+1][1])
|
||
seg_lengths.append(d)
|
||
total_len += d
|
||
|
||
if total_len < 1:
|
||
return None
|
||
|
||
# Сэмплируем каждые ~500м (не чаще чем каждые 5 точек)
|
||
# Для маршрута 100км → ~200 сэмплов, для 500км → ~1000 сэмплов
|
||
avg_step = total_len / max(len(seg_lengths), 1)
|
||
# Шаг в точках для ~500м интервала
|
||
pts_per_500m = max(5, int(round(500.0 / avg_step))) if avg_step > 0 else 20
|
||
|
||
cur = conn.cursor()
|
||
|
||
stats = {
|
||
"track_lev12_m": 0.0,
|
||
"track_lev345_m": 0.0,
|
||
"path_m": 0.0,
|
||
"asphalt_m": 0.0,
|
||
}
|
||
|
||
# Кэш результатов по ячейкам сетки ~0.01° (~1км)
|
||
# Чтобы не делать повторные запросы для близких точек
|
||
grid_cache: dict = {}
|
||
|
||
i = 0
|
||
while i < len(coords) - 1:
|
||
end_i = min(i + pts_per_500m, len(coords) - 1)
|
||
|
||
# Средняя точка сегмента
|
||
mid_lon = (coords[i][0] + coords[end_i][0]) / 2
|
||
mid_lat = (coords[i][1] + coords[end_i][1]) / 2
|
||
|
||
# Длина этого сегмента
|
||
seg_len = sum(seg_lengths[i:end_i])
|
||
|
||
# Ключ кэша: ячейка 0.01° (~1км)
|
||
grid_key = (round(mid_lon * 100), round(mid_lat * 100))
|
||
|
||
if grid_key in grid_cache:
|
||
hw, tt = grid_cache[grid_key]
|
||
else:
|
||
# Tight bbox ~150m — index on (min_lon, max_lon, min_lat, max_lat) is used
|
||
# No ORDER BY to avoid full scan; first bbox hit is good enough for stats
|
||
delta = 0.0015
|
||
try:
|
||
cur.execute("""
|
||
SELECT highway_type, track_type
|
||
FROM trails
|
||
WHERE min_lon <= ? AND max_lon >= ?
|
||
AND min_lat <= ? AND max_lat >= ?
|
||
LIMIT 1
|
||
""", (
|
||
mid_lon + delta, mid_lon - delta,
|
||
mid_lat + delta, mid_lat - delta,
|
||
))
|
||
row = cur.fetchone()
|
||
if row:
|
||
hw = (row["highway_type"] or "").lower()
|
||
tt = (row["track_type"] or "").lower()
|
||
else:
|
||
# Widen search to ~500m if nothing found nearby
|
||
delta2 = 0.005
|
||
cur.execute("""
|
||
SELECT highway_type, track_type
|
||
FROM trails
|
||
WHERE min_lon <= ? AND max_lon >= ?
|
||
AND min_lat <= ? AND max_lat >= ?
|
||
LIMIT 1
|
||
""", (
|
||
mid_lon + delta2, mid_lon - delta2,
|
||
mid_lat + delta2, mid_lat - delta2,
|
||
))
|
||
row = cur.fetchone()
|
||
if row:
|
||
hw = (row["highway_type"] or "").lower()
|
||
tt = (row["track_type"] or "").lower()
|
||
else:
|
||
hw, tt = "asphalt", ""
|
||
except Exception:
|
||
hw, tt = "asphalt", ""
|
||
grid_cache[grid_key] = (hw, tt)
|
||
|
||
if hw == "track":
|
||
if tt in ("grade1", "grade2"):
|
||
stats["track_lev12_m"] += seg_len
|
||
else:
|
||
stats["track_lev345_m"] += seg_len
|
||
elif hw in ("path", "bridleway", "footway"):
|
||
stats["path_m"] += seg_len
|
||
else:
|
||
stats["asphalt_m"] += seg_len
|
||
|
||
i = end_i
|
||
|
||
computed_total = (
|
||
stats["track_lev12_m"] + stats["track_lev345_m"] +
|
||
stats["path_m"] + stats["asphalt_m"]
|
||
)
|
||
if computed_total < 1:
|
||
return None
|
||
|
||
def pct(v):
|
||
return round(v / computed_total * 100)
|
||
|
||
dirt_total = stats["track_lev12_m"] + stats["track_lev345_m"] + stats["path_m"]
|
||
|
||
return {
|
||
"track_lev12_m": round(stats["track_lev12_m"]),
|
||
"track_lev345_m": round(stats["track_lev345_m"]),
|
||
"path_m": round(stats["path_m"]),
|
||
"asphalt_m": round(stats["asphalt_m"]),
|
||
"track_lev12_pct": pct(stats["track_lev12_m"]),
|
||
"track_lev345_pct": pct(stats["track_lev345_m"]),
|
||
"path_pct": pct(stats["path_m"]),
|
||
"asphalt_pct": pct(stats["asphalt_m"]),
|
||
"dirt_total_pct": pct(dirt_total),
|
||
}
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
# ─── Pydantic models ──────────────────────────────────────────────────────────
|
||
|
||
class Waypoint(BaseModel):
|
||
lon: float
|
||
lat: float
|
||
|
||
|
||
class RouteRequest(BaseModel):
|
||
waypoints: List[Waypoint]
|
||
alternatives: int = 5
|
||
|
||
|
||
class ReconRequest(BaseModel):
|
||
lon: float
|
||
lat: float
|
||
radius_km: float = 20
|
||
|
||
|
||
class ScenicRequest(BaseModel):
|
||
lon: float
|
||
lat: float
|
||
target_km: float = 100
|
||
|
||
|
||
# ─── API endpoints ────────────────────────────────────────────────────────────
|
||
|
||
@app.get("/api/cache/clear")
|
||
async def clear_cache():
|
||
_tile_cache.clear()
|
||
return {"status": "ok", "cleared": True}
|
||
|
||
|
||
@app.get("/api/tiles/{z}/{x}/{y}.mvt")
|
||
async def get_tile(z: int, x: int, y: int):
|
||
if z < 0 or z > 22:
|
||
raise HTTPException(400, "Invalid z")
|
||
max_coord = 2 ** z
|
||
if x < 0 or x >= max_coord or y < 0 or y >= max_coord:
|
||
raise HTTPException(400, "Invalid x/y for zoom level")
|
||
|
||
cached = get_cached_tile(z, x, y)
|
||
if cached is not None:
|
||
return Response(
|
||
content=cached,
|
||
media_type="application/x-protobuf",
|
||
headers={"Content-Encoding": "identity", "Access-Control-Allow-Origin": "*", "X-Cache": "HIT"},
|
||
)
|
||
|
||
if not os.path.exists(DATA_PATH):
|
||
raise HTTPException(503, f"База данных не найдена: {DATA_PATH}")
|
||
|
||
west, south, east, north = tile_to_bbox(z, x, y)
|
||
|
||
buf_x = (east - west) * 0.15
|
||
buf_y = (north - south) * 0.15
|
||
q_west = west - buf_x
|
||
q_east = east + buf_x
|
||
q_south = south - buf_y
|
||
q_north = north + buf_y
|
||
|
||
if z <= 7:
|
||
limit = 3000
|
||
elif z <= 9:
|
||
limit = 8000
|
||
elif z <= 11:
|
||
limit = 15000
|
||
else:
|
||
limit = 25000
|
||
|
||
if z <= 7:
|
||
min_length = 2000
|
||
elif z == 8:
|
||
min_length = 500
|
||
elif z == 9:
|
||
min_length = 200
|
||
elif z == 10:
|
||
min_length = 50
|
||
else:
|
||
min_length = 0
|
||
|
||
try:
|
||
conn = get_db()
|
||
cur = conn.cursor()
|
||
|
||
cur.execute("""
|
||
SELECT osm_id, highway_type, track_type, surface, name, length_m, mtb_scale, geom
|
||
FROM trails
|
||
WHERE min_lon <= ? AND max_lon >= ?
|
||
AND min_lat <= ? AND max_lat >= ?
|
||
AND (length_m >= ? OR length_m IS NULL)
|
||
LIMIT ?
|
||
""", (q_east, q_west, q_north, q_south, min_length, limit))
|
||
trails_rows = cur.fetchall()
|
||
|
||
cur.execute("""
|
||
SELECT osm_id, poi_type, name, geom
|
||
FROM poi
|
||
WHERE lon >= ? AND lon <= ? AND lat >= ? AND lat <= ?
|
||
LIMIT 500
|
||
""", (q_west, q_east, q_south, q_north))
|
||
poi_rows = cur.fetchall()
|
||
|
||
conn.close()
|
||
except Exception as e:
|
||
raise HTTPException(500, f"Ошибка БД: {e}")
|
||
|
||
mvt = build_mvt(trails_rows, poi_rows, z, x, y)
|
||
|
||
if mvt:
|
||
set_cached_tile(z, x, y, mvt)
|
||
|
||
return Response(
|
||
content=mvt,
|
||
media_type="application/x-protobuf",
|
||
headers={
|
||
"Content-Encoding": "identity",
|
||
"Access-Control-Allow-Origin": "*",
|
||
"X-Cache": "MISS",
|
||
},
|
||
)
|
||
|
||
|
||
@app.post("/api/recon")
|
||
async def post_recon(req: ReconRequest):
|
||
"""
|
||
Разведка: агрегация trails + POI в радиусе от точки.
|
||
"""
|
||
lon, lat, radius_km = req.lon, req.lat, req.radius_km
|
||
lat_rad = math.radians(lat)
|
||
delta_lat = radius_km / 111.0
|
||
delta_lon = radius_km / (111.0 * math.cos(lat_rad))
|
||
|
||
try:
|
||
conn = get_db()
|
||
cur = conn.cursor()
|
||
|
||
# Trails — агрегация по типам
|
||
cur.execute("""
|
||
SELECT highway_type, track_type, SUM(length_m) as total_m, COUNT(*) as cnt
|
||
FROM trails
|
||
WHERE min_lon <= ? AND max_lon >= ?
|
||
AND min_lat <= ? AND max_lat >= ?
|
||
AND length_m >= 100
|
||
GROUP BY highway_type, track_type
|
||
""", (lon + delta_lon, lon - delta_lon, lat + delta_lat, lat - delta_lat))
|
||
trail_rows = cur.fetchall()
|
||
|
||
# Агрегация по категориям
|
||
lev12_count, lev12_km = 0, 0.0
|
||
lev345_count, lev345_km = 0, 0.0
|
||
path_count, path_km = 0, 0.0
|
||
|
||
for row in trail_rows:
|
||
hw = (row["highway_type"] or "").lower()
|
||
tt = (row["track_type"] or "").lower() if row["track_type"] else ""
|
||
cnt = row["cnt"]
|
||
km = (row["total_m"] or 0) / 1000.0
|
||
|
||
if hw == "track":
|
||
if tt in ("grade1", "grade2"):
|
||
lev12_count += cnt
|
||
lev12_km += km
|
||
else:
|
||
lev345_count += cnt
|
||
lev345_km += km
|
||
elif hw in ("path", "bridleway"):
|
||
path_count += cnt
|
||
path_km += km
|
||
|
||
total_count = lev12_count + lev345_count + path_count
|
||
total_km = lev12_km + lev345_km + path_km
|
||
|
||
# POI — подсчёт по типам
|
||
cur.execute("""
|
||
SELECT poi_type, COUNT(*) as cnt
|
||
FROM poi
|
||
WHERE lon >= ? AND lon <= ? AND lat >= ? AND lat <= ?
|
||
GROUP BY poi_type
|
||
""", (lon - delta_lon, lon + delta_lon, lat - delta_lat, lat + delta_lat))
|
||
poi_rows = cur.fetchall()
|
||
|
||
poi_counts = {}
|
||
for row in poi_rows:
|
||
poi_counts[row["poi_type"]] = row["cnt"]
|
||
|
||
conn.close()
|
||
|
||
# POI types to report
|
||
poi_report = {}
|
||
for key in ["natural=water", "tourism=viewpoint", "historic=ruins",
|
||
"ford=yes", "natural=peak", "natural=cave_entrance"]:
|
||
poi_report[key] = poi_counts.get(key, 0)
|
||
|
||
return {
|
||
"center": {"lon": lon, "lat": lat},
|
||
"radius_km": radius_km,
|
||
"trails": {
|
||
"total_count": total_count,
|
||
"total_km": round(total_km, 1),
|
||
"lev12_count": lev12_count,
|
||
"lev12_km": round(lev12_km, 1),
|
||
"lev345_count": lev345_count,
|
||
"lev345_km": round(lev345_km, 1),
|
||
"path_count": path_count,
|
||
"path_km": round(path_km, 1),
|
||
},
|
||
"poi": poi_report,
|
||
}
|
||
except Exception as e:
|
||
raise HTTPException(500, f"Ошибка БД: {e}")
|
||
|
||
|
||
# ─── Scenic route helpers ─────────────────────────────────────────────────────
|
||
|
||
SCENIC_POI_SCORES = {
|
||
"natural=water": 10,
|
||
"tourism=viewpoint": 15,
|
||
"historic=ruins": 10,
|
||
"natural=peak": 12,
|
||
"natural=cave_entrance": 8,
|
||
"ford=yes": 5,
|
||
}
|
||
|
||
SCENIC_POI_ICONS = {
|
||
"natural=water": "💧",
|
||
"tourism=viewpoint": "👁",
|
||
"historic=ruins": "🏚",
|
||
"natural=peak": "🔺",
|
||
"natural=cave_entrance": "🕳",
|
||
"ford=yes": "🌊",
|
||
}
|
||
|
||
DIRECTION_NAMES = {0: "Северный", 1: "Восточный", 2: "Южный", 3: "Западный"}
|
||
|
||
|
||
def _angle_bucket(lon, lat, center_lon, center_lat):
|
||
"""Возвращает сектор 0-3 (С, В, Ю, З) от центра."""
|
||
a = math.degrees(math.atan2(lon - center_lon, lat - center_lat)) % 360
|
||
return int(a // 90)
|
||
|
||
|
||
async def _osrm_route_waypoints(waypoints_lonlat: list[tuple[float, float]]) -> dict | None:
|
||
"""Запрос к OSRM со списком waypoints. Возвращает полный ответ OSRM."""
|
||
coords_str = ";".join(f"{lon},{lat}" for lon, lat in waypoints_lonlat)
|
||
radiuses_str = ";".join(["5000"] * len(waypoints_lonlat))
|
||
url = (
|
||
f"{OSRM_URL}/route/v1/driving/{coords_str}"
|
||
f"?overview=full&geometries=geojson&alternatives=false"
|
||
f"&radiuses={radiuses_str}"
|
||
)
|
||
try:
|
||
async with httpx.AsyncClient(timeout=60) as client:
|
||
resp = await client.get(url)
|
||
data = resp.json()
|
||
except Exception:
|
||
return None
|
||
if data.get("code") != "Ok" or not data.get("routes"):
|
||
# Retry with wider snap
|
||
radiuses_wide = ";".join(["10000"] * len(waypoints_lonlat))
|
||
url2 = (
|
||
f"{OSRM_URL}/route/v1/driving/{coords_str}"
|
||
f"?overview=full&geometries=geojson&alternatives=false"
|
||
f"&radiuses={radiuses_wide}"
|
||
)
|
||
try:
|
||
async with httpx.AsyncClient(timeout=60) as client:
|
||
resp = await client.get(url2)
|
||
data = resp.json()
|
||
except Exception:
|
||
return None
|
||
if data.get("code") != "Ok" or not data.get("routes"):
|
||
return None
|
||
return data
|
||
|
||
|
||
def _merge_geometries(geometries: list[dict]) -> dict:
|
||
"""Объединяет несколько LineString GeoJSON в одну."""
|
||
all_coords = []
|
||
for g in geometries:
|
||
if g.get("type") == "LineString":
|
||
all_coords.extend(g.get("coordinates", []))
|
||
return {"type": "LineString", "coordinates": all_coords}
|
||
|
||
|
||
async def _build_scenic_for_cluster(
|
||
start_lon, start_lat, poi_list, target_km, conn
|
||
) -> dict | None:
|
||
"""
|
||
Строит кольцевой маршрут: start → POI1 → POI2 → ... → start.
|
||
poi_list: [{lon, lat, poi_type, name, score}, ...]
|
||
"""
|
||
if not poi_list:
|
||
return None
|
||
|
||
# Ограничиваем до 5 POI
|
||
pois = poi_list[:5]
|
||
|
||
# Waypoints: start → POI sequence → start
|
||
wp = [(start_lon, start_lat)]
|
||
for p in pois:
|
||
wp.append((p["lon"], p["lat"]))
|
||
wp.append((start_lon, start_lat))
|
||
|
||
osrm_data = await _osrm_route_waypoints(wp)
|
||
if not osrm_data or not osrm_data.get("routes"):
|
||
return None
|
||
|
||
route = osrm_data["routes"][0]
|
||
geometry = route["geometry"]
|
||
distance_m = route["distance"]
|
||
duration_s = route["duration"]
|
||
|
||
# Если слишком длинный (> target * 1.3) — убрать последний POI и перестроить
|
||
if distance_m > target_km * 1300 and len(pois) > 1:
|
||
pois = pois[:-1]
|
||
wp = [(start_lon, start_lat)]
|
||
for p in pois:
|
||
wp.append((p["lon"], p["lat"]))
|
||
wp.append((start_lon, start_lat))
|
||
osrm_data2 = await _osrm_route_waypoints(wp)
|
||
if osrm_data2 and osrm_data2.get("routes"):
|
||
route = osrm_data2["routes"][0]
|
||
geometry = route["geometry"]
|
||
distance_m = route["distance"]
|
||
duration_s = route["duration"]
|
||
|
||
# Статистика покрытия
|
||
stats = None
|
||
if conn:
|
||
try:
|
||
stats = calc_route_stats(geometry, conn)
|
||
except Exception:
|
||
pass
|
||
|
||
# Waypoint labels для ответа
|
||
waypoints_out = [{"lon": start_lon, "lat": start_lat, "label": "Старт"}]
|
||
for p in pois:
|
||
icon = SCENIC_POI_ICONS.get(p["poi_type"], "📍")
|
||
name = p.get("name") or p["poi_type"]
|
||
waypoints_out.append({"lon": p["lon"], "lat": p["lat"], "label": f"{icon} {name}"})
|
||
waypoints_out.append({"lon": start_lon, "lat": start_lat, "label": "Финиш"})
|
||
|
||
scenic_score = sum(p.get("score", 0) for p in pois)
|
||
scenic_pois_out = [
|
||
{
|
||
"type": p["poi_type"],
|
||
"name": p.get("name", ""),
|
||
"lon": p["lon"],
|
||
"lat": p["lat"],
|
||
}
|
||
for p in pois
|
||
]
|
||
|
||
return {
|
||
"name": "",
|
||
"waypoints": waypoints_out,
|
||
"geometry": geometry,
|
||
"distance_m": round(distance_m),
|
||
"duration_s": round(duration_s),
|
||
"stats": stats,
|
||
"scenic_score": scenic_score,
|
||
"scenic_pois": scenic_pois_out,
|
||
}
|
||
|
||
|
||
@app.post("/api/scenic")
|
||
async def post_scenic(req: ScenicRequest):
|
||
"""
|
||
Красивый маршрут: кольцевой маршрут через живописные POI.
|
||
"""
|
||
lon, lat, target_km = req.lon, req.lat, req.target_km
|
||
if target_km < 20 or target_km > 500:
|
||
raise HTTPException(400, "Дистанция должна быть от 20 до 500 км")
|
||
|
||
# Радиус поиска POI
|
||
search_radius_km = target_km * 0.6
|
||
lat_rad = math.radians(lat)
|
||
delta_lat = search_radius_km / 111.0
|
||
delta_lon = search_radius_km / (111.0 * math.cos(lat_rad))
|
||
|
||
try:
|
||
conn = get_db()
|
||
cur = conn.cursor()
|
||
|
||
# Найти POI в радиусе
|
||
cur.execute("""
|
||
SELECT poi_type, name, lon, lat
|
||
FROM poi
|
||
WHERE lon >= ? AND lon <= ? AND lat >= ? AND lat <= ?
|
||
ORDER BY poi_type
|
||
""", (lon - delta_lon, lon + delta_lon, lat - delta_lat, lat + delta_lat))
|
||
poi_rows = cur.fetchall()
|
||
|
||
# Фильтруем POI по score и назначаем баллы
|
||
scored_pois = []
|
||
for row in poi_rows:
|
||
pt = row["poi_type"] or ""
|
||
if pt in SCENIC_POI_SCORES:
|
||
p_lon = row["lon"]
|
||
p_lat = row["lat"]
|
||
# Не берём POI ближе 3 км от старта
|
||
d = haversine_m(lon, lat, p_lon, p_lat)
|
||
if d < 3000:
|
||
continue
|
||
scored_pois.append({
|
||
"poi_type": pt,
|
||
"name": row["name"] or "",
|
||
"lon": p_lon,
|
||
"lat": p_lat,
|
||
"score": SCENIC_POI_SCORES[pt],
|
||
"distance_m": d,
|
||
})
|
||
|
||
if not scored_pois:
|
||
# Нет красивых мест — строим просто кольцо через ближайшие грунтовки
|
||
# Пробуем кольцо: старт → точка на расстоянии ~target_km/3 → старт
|
||
_ = 0
|
||
third_dist = target_km / 3.0
|
||
mid_lat = lat + (third_dist / 111.0)
|
||
mid_lon = lon
|
||
osrm_data = await _osrm_route_waypoints([
|
||
(lon, lat), (mid_lon, mid_lat), (lon, lat)
|
||
])
|
||
if osrm_data and osrm_data.get("routes"):
|
||
route = osrm_data["routes"][0]
|
||
geometry = route["geometry"]
|
||
stats = calc_route_stats(geometry, conn) if conn else None
|
||
conn.close()
|
||
return {
|
||
"routes": [{
|
||
"name": "Маршрут по грунтовкам",
|
||
"waypoints": [
|
||
{"lon": lon, "lat": lat, "label": "Старт"},
|
||
{"lon": mid_lon, "lat": mid_lat, "label": "Разворот"},
|
||
{"lon": lon, "lat": lat, "label": "Финиш"},
|
||
],
|
||
"geometry": geometry,
|
||
"distance_m": round(route["distance"]),
|
||
"duration_s": round(route["duration"]),
|
||
"stats": stats,
|
||
"scenic_score": 0,
|
||
"scenic_pois": [],
|
||
}]
|
||
}
|
||
conn.close()
|
||
raise HTTPException(404, "Не удалось построить маршрут")
|
||
|
||
# Разделить POI на кластеры по азимуту (4 сектора)
|
||
clusters: dict[int, list] = {0: [], 1: [], 2: [], 3: []}
|
||
for p in scored_pois:
|
||
bucket = _angle_bucket(p["lon"], p["lat"], lon, lat)
|
||
clusters[bucket].append(p)
|
||
|
||
# Для каждого кластера — жадный выбор POI
|
||
remaining_km = target_km * 0.8 # 80% дистанции на POI, 20% на возврат
|
||
|
||
cluster_poi_lists = {}
|
||
for bucket, pois in clusters.items():
|
||
if not pois:
|
||
cluster_poi_lists[bucket] = []
|
||
continue
|
||
# Сортируем по score/distance (жадный)
|
||
for p in pois:
|
||
p["score_per_km"] = p["score"] / max(p["distance_m"] / 1000.0, 1.0)
|
||
pois.sort(key=lambda p: p["score_per_km"], reverse=True)
|
||
|
||
selected = []
|
||
used_km = 0.0
|
||
for p in pois:
|
||
d_km = p["distance_m"] / 1000.0
|
||
if d_km > remaining_km * 0.5:
|
||
continue
|
||
if d_km < 3:
|
||
continue
|
||
if used_km + d_km > remaining_km:
|
||
continue
|
||
selected.append(p)
|
||
used_km += d_km
|
||
if len(selected) >= 5:
|
||
break
|
||
|
||
cluster_poi_lists[bucket] = selected
|
||
|
||
# Строим маршруты для кластеров с POI (до 3 альтернативных)
|
||
routes_out = []
|
||
used_buckets = 0
|
||
for bucket in range(4):
|
||
if used_buckets >= 3:
|
||
break
|
||
pois = cluster_poi_lists.get(bucket, [])
|
||
if not pois:
|
||
continue
|
||
|
||
result = await _build_scenic_for_cluster(lon, lat, pois, target_km, conn)
|
||
if result:
|
||
# Проверяем дистанцию: не < 50% target
|
||
if result["distance_m"] < target_km * 500:
|
||
continue
|
||
result["name"] = DIRECTION_NAMES.get(bucket, f"Вариант {used_buckets + 1}")
|
||
routes_out.append(result)
|
||
used_buckets += 1
|
||
|
||
# Если ни одного маршрута — попробовать все POI вместе
|
||
if not routes_out and scored_pois:
|
||
# Топ-5 POI по score_per_km
|
||
all_sorted = sorted(scored_pois, key=lambda p: p["score_per_km"], reverse=True)[:5]
|
||
result = await _build_scenic_for_cluster(lon, lat, all_sorted, target_km, conn)
|
||
if result:
|
||
result["name"] = "Маршрут"
|
||
routes_out.append(result)
|
||
|
||
conn.close()
|
||
|
||
if not routes_out:
|
||
raise HTTPException(404, "Не удалось построить красивый маршрут")
|
||
|
||
return {"routes": routes_out}
|
||
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
raise HTTPException(500, f"Ошибка: {e}")
|
||
|
||
|
||
async def _osrm_segment_alternatives(lon_a: float, lat_a: float, lon_b: float, lat_b: float, depth: int = 0) -> list:
|
||
"""Запросить альтернативы для одного сегмента. При TooBig — разбить пополам."""
|
||
coords_str = f"{lon_a},{lat_a};{lon_b},{lat_b}"
|
||
url = (
|
||
f"{OSRM_URL}/route/v1/driving/{coords_str}"
|
||
f"?alternatives=true&overview=full&geometries=geojson&annotations=false&radiuses=5000;5000"
|
||
)
|
||
try:
|
||
async with httpx.AsyncClient(timeout=30) as client:
|
||
resp = await client.get(url)
|
||
data = resp.json()
|
||
except Exception:
|
||
return []
|
||
|
||
if data.get("code") == "TooBig" and depth < 2:
|
||
# Разбить пополам — найти середину через среднее координат
|
||
mid_lon = (lon_a + lon_b) / 2
|
||
mid_lat = (lat_a + lat_b) / 2
|
||
# Рекурсивно получить альтернативы для каждой половины
|
||
alts_a = await _osrm_segment_alternatives(lon_a, lat_a, mid_lon, mid_lat, depth + 1)
|
||
alts_b = await _osrm_segment_alternatives(mid_lon, mid_lat, lon_b, lat_b, depth + 1)
|
||
if not alts_a or not alts_b:
|
||
return []
|
||
# Скомбинировать: каждый вариант первой половины × каждый вариант второй
|
||
combined = []
|
||
for r_a, r_b in itertools.product(alts_a[:3], alts_b[:3]):
|
||
coords_a = r_a["geometry"]["coordinates"]
|
||
coords_b = r_b["geometry"]["coordinates"]
|
||
merged_coords = coords_a + coords_b[1:] # убрать дублирующую точку стыка
|
||
combined.append({
|
||
"distance": r_a["distance"] + r_b["distance"],
|
||
"duration": r_a["duration"] + r_b["duration"],
|
||
"geometry": {"type": "LineString", "coordinates": merged_coords},
|
||
})
|
||
return combined
|
||
|
||
if data.get("code") != "Ok" or not data.get("routes"):
|
||
# Попробовать без alternatives
|
||
url_single = (
|
||
f"{OSRM_URL}/route/v1/driving/{coords_str}"
|
||
f"?overview=full&geometries=geojson&annotations=false&radiuses=5000;5000"
|
||
)
|
||
try:
|
||
async with httpx.AsyncClient(timeout=30) as client:
|
||
resp = await client.get(url_single)
|
||
data = resp.json()
|
||
except Exception:
|
||
return []
|
||
if data.get("code") != "Ok":
|
||
return []
|
||
|
||
return data.get("routes", [])
|
||
|
||
|
||
async def _build_segmented_route(req: RouteRequest) -> dict:
|
||
"""Строит маршрут через промежуточные точки с альтернативами по сегментам."""
|
||
waypoints = req.waypoints
|
||
segments_count = len(waypoints) - 1
|
||
|
||
# Получить альтернативы для каждого сегмента
|
||
segment_alternatives = []
|
||
for i in range(segments_count):
|
||
wp_a = waypoints[i]
|
||
wp_b = waypoints[i + 1]
|
||
alts = await _osrm_segment_alternatives(wp_a.lon, wp_a.lat, wp_b.lon, wp_b.lat)
|
||
if not alts:
|
||
raise HTTPException(404, f"Маршрут не найден на сегменте {i + 1}")
|
||
segment_alternatives.append(alts)
|
||
|
||
# Скомбинировать сегменты (до 3 вариантов на сегмент чтобы не взорвать комбинаторику)
|
||
max_per_segment = 3
|
||
trimmed = [alts[:max_per_segment] for alts in segment_alternatives]
|
||
all_combos = list(itertools.product(*trimmed))
|
||
|
||
# Склеить геометрию для каждой комбинации
|
||
combined_routes = []
|
||
for combo in all_combos:
|
||
total_distance = sum(r["distance"] for r in combo)
|
||
total_duration = sum(r["duration"] for r in combo)
|
||
all_coords: list = []
|
||
for r in combo:
|
||
coords = r["geometry"]["coordinates"]
|
||
if all_coords:
|
||
all_coords.extend(coords[1:])
|
||
else:
|
||
all_coords.extend(coords)
|
||
combined_routes.append({
|
||
"distance": total_distance,
|
||
"duration": total_duration,
|
||
"geometry": {"type": "LineString", "coordinates": all_coords},
|
||
})
|
||
|
||
# Дедупликация по геометрии (5 контрольных точек)
|
||
def route_sig(coords):
|
||
n = len(coords)
|
||
if n == 0:
|
||
return ()
|
||
idxs = [0, n//4, n//2, 3*n//4, n-1]
|
||
return tuple((round(coords[i][0], 3), round(coords[i][1], 3)) for i in idxs if i < n)
|
||
|
||
seen = set()
|
||
deduped = []
|
||
for route in combined_routes:
|
||
sig = route_sig(route["geometry"]["coordinates"])
|
||
if sig not in seen:
|
||
seen.add(sig)
|
||
deduped.append(route)
|
||
|
||
deduped = deduped[:5]
|
||
|
||
if not deduped:
|
||
raise HTTPException(404, "Маршрут не найден")
|
||
|
||
# Считаем статистику через существующую calc_route_stats
|
||
try:
|
||
conn = get_db()
|
||
except Exception:
|
||
conn = None
|
||
|
||
routes_out = []
|
||
for idx, route in enumerate(deduped):
|
||
stats = None
|
||
if conn is not None:
|
||
try:
|
||
stats = calc_route_stats(route["geometry"], conn)
|
||
except Exception:
|
||
stats = None
|
||
routes_out.append({
|
||
"index": idx,
|
||
"distance_m": round(route["distance"]),
|
||
"duration_s": round(route["duration"]),
|
||
"geometry": route["geometry"],
|
||
"stats": stats,
|
||
})
|
||
|
||
if conn is not None:
|
||
try:
|
||
conn.close()
|
||
except Exception:
|
||
pass
|
||
|
||
if not routes_out:
|
||
raise HTTPException(404, "Маршрут не найден")
|
||
|
||
# Сортировать по dirt_total_pct убывающий
|
||
routes_out.sort(key=lambda r: (r["stats"] or {}).get("dirt_total_pct", 0), reverse=True)
|
||
for idx, r in enumerate(routes_out):
|
||
r["index"] = idx
|
||
|
||
# Waypoints для ответа
|
||
waypoints_out = []
|
||
for i, wp in enumerate(req.waypoints):
|
||
label = "Старт" if i == 0 else ("Финиш" if i == len(req.waypoints) - 1 else f"Точка {i}")
|
||
waypoints_out.append({"lon": wp.lon, "lat": wp.lat, "label": label})
|
||
|
||
return {"routes": routes_out, "waypoints": waypoints_out}
|
||
|
||
|
||
@app.post("/api/route")
|
||
async def post_route(req: RouteRequest):
|
||
"""
|
||
Роутинг через OSRM с альтернативными маршрутами и статистикой покрытия.
|
||
Принимает JSON: {"waypoints": [{"lon":..,"lat":..}, ...], "alternatives": 5}
|
||
"""
|
||
if len(req.waypoints) < 2:
|
||
raise HTTPException(400, "Нужно минимум 2 точки")
|
||
|
||
# При промежуточных точках — сегментный подход
|
||
if len(req.waypoints) > 2:
|
||
return await _build_segmented_route(req)
|
||
|
||
# Строим строку координат для OSRM
|
||
coords_str = ";".join(f"{wp.lon},{wp.lat}" for wp in req.waypoints)
|
||
alternatives = max(1, min(5, req.alternatives))
|
||
|
||
# Увеличенный snap radius для длинных маршрутов (5 км)
|
||
radiuses_str = ";".join(["5000"] * len(req.waypoints))
|
||
|
||
url = (
|
||
f"{OSRM_URL}/route/v1/driving/{coords_str}"
|
||
f"?alternatives={alternatives}&overview=full&geometries=geojson&annotations=false"
|
||
f"&radiuses={radiuses_str}"
|
||
)
|
||
|
||
try:
|
||
async with httpx.AsyncClient(timeout=30) as client:
|
||
resp = await client.get(url)
|
||
data = resp.json()
|
||
except Exception as e:
|
||
raise HTTPException(503, f"OSRM недоступен: {e}")
|
||
|
||
# OSRM возвращает TooBig когда маршрут слишком длинный для N альтернатив
|
||
# Retry-цепочка: alternatives=5 → 3 → 1, ВСЕГДА с radiuses (snap radius критичен для длинных маршрутов)
|
||
if data.get("code") == "TooBig":
|
||
url_retry3 = (
|
||
f"{OSRM_URL}/route/v1/driving/{coords_str}"
|
||
f"?alternatives=3&overview=full&geometries=geojson&annotations=false"
|
||
f"&radiuses={radiuses_str}"
|
||
)
|
||
try:
|
||
async with httpx.AsyncClient(timeout=60) as client:
|
||
resp = await client.get(url_retry3)
|
||
data = resp.json()
|
||
except Exception as e:
|
||
raise HTTPException(503, f"OSRM недоступен: {e}")
|
||
|
||
if data.get("code") == "TooBig":
|
||
url_retry1 = (
|
||
f"{OSRM_URL}/route/v1/driving/{coords_str}"
|
||
f"?overview=full&geometries=geojson&alternatives=false"
|
||
f"&radiuses={radiuses_str}"
|
||
)
|
||
try:
|
||
async with httpx.AsyncClient(timeout=60) as client:
|
||
resp = await client.get(url_retry1)
|
||
data = resp.json()
|
||
except Exception as e:
|
||
raise HTTPException(503, f"OSRM недоступен: {e}")
|
||
|
||
# Если NoSegment — пробуем с увеличенным radius (10 км вместо 5)
|
||
if data.get("code") == "NoSegment":
|
||
radiuses_wide = ";".join(["10000"] * len(req.waypoints))
|
||
url_wide = (
|
||
f"{OSRM_URL}/route/v1/driving/{coords_str}"
|
||
f"?overview=full&geometries=geojson&alternatives=false"
|
||
f"&radiuses={radiuses_wide}"
|
||
)
|
||
try:
|
||
async with httpx.AsyncClient(timeout=60) as client:
|
||
resp = await client.get(url_wide)
|
||
data = resp.json()
|
||
except Exception as e:
|
||
raise HTTPException(503, f"OSRM недоступен: {e}")
|
||
|
||
if data.get("code") != "Ok" or not data.get("routes"):
|
||
osrm_code = data.get("code", "Unknown")
|
||
osrm_msg = data.get("message", "")
|
||
if osrm_code == "NoRoute":
|
||
raise HTTPException(404, "Маршрут не найден: нет пути между точками")
|
||
elif osrm_code == "NoSegment":
|
||
raise HTTPException(404, "Маршрут не найден: точки слишком далеко от дорог")
|
||
elif osrm_code == "InvalidValue":
|
||
raise HTTPException(400, f"Некорректные координаты: {osrm_msg}")
|
||
else:
|
||
raise HTTPException(404, f"Маршрут не найден ({osrm_code})")
|
||
|
||
# Открываем БД один раз для всех маршрутов
|
||
try:
|
||
conn = get_db()
|
||
except Exception:
|
||
conn = None
|
||
|
||
routes_out = []
|
||
for idx, route in enumerate(data["routes"]):
|
||
geometry = route["geometry"]
|
||
distance_m = route["distance"]
|
||
duration_s = route["duration"]
|
||
|
||
# Считаем статистику покрытия
|
||
stats = None
|
||
if conn is not None:
|
||
try:
|
||
stats = calc_route_stats(geometry, conn)
|
||
except Exception:
|
||
stats = None
|
||
|
||
routes_out.append({
|
||
"index": idx,
|
||
"geometry": geometry,
|
||
"distance_m": round(distance_m),
|
||
"duration_s": round(duration_s),
|
||
"stats": stats,
|
||
})
|
||
|
||
if conn is not None:
|
||
try:
|
||
conn.close()
|
||
except Exception:
|
||
pass
|
||
|
||
return {"routes": routes_out}
|
||
|
||
|
||
# Обратная совместимость — старый GET endpoint (для линейки и прочего)
|
||
@app.get("/api/route")
|
||
async def get_route(
|
||
from_lon: float, from_lat: float,
|
||
to_lon: float, to_lat: float
|
||
):
|
||
"""Роутинг через OSRM (legacy GET). Параметры: from_lon, from_lat, to_lon, to_lat"""
|
||
url = (
|
||
f"{OSRM_URL}/route/v1/driving/"
|
||
f"{from_lon},{from_lat};{to_lon},{to_lat}"
|
||
f"?overview=full&geometries=geojson&annotations=false"
|
||
)
|
||
try:
|
||
async with httpx.AsyncClient(timeout=30) as client:
|
||
resp = await client.get(url)
|
||
data = resp.json()
|
||
except Exception as e:
|
||
raise HTTPException(503, f"OSRM недоступен: {e}")
|
||
|
||
if data.get("code") != "Ok" or not data.get("routes"):
|
||
raise HTTPException(404, "Маршрут не найден")
|
||
|
||
route = data["routes"][0]
|
||
geometry = route["geometry"]
|
||
distance_m = route["distance"]
|
||
duration_s = route["duration"]
|
||
|
||
return {
|
||
"type": "Feature",
|
||
"geometry": geometry,
|
||
"properties": {
|
||
"distance_m": round(distance_m),
|
||
"distance_km": round(distance_m / 1000, 1),
|
||
"duration_min": round(duration_s / 60),
|
||
"duration_s": round(duration_s),
|
||
}
|
||
}
|
||
|
||
|
||
@app.get("/api/health")
|
||
async def health():
|
||
return {
|
||
"status": "ok",
|
||
"db_path": DATA_PATH,
|
||
"db_exists": os.path.exists(DATA_PATH),
|
||
}
|
||
|
||
|
||
# ─── Terrain tiles ───────────────────────────────────────────────────────────
|
||
|
||
TERRAIN_DIR = os.environ.get(
|
||
"TERRAIN_DIR",
|
||
os.path.join(os.path.dirname(__file__), "../data/terrain"),
|
||
)
|
||
|
||
@app.get("/terrain/{layer}/{z}/{x}/{y}.png")
|
||
async def terrain_tile(layer: str, z: int, x: int, y: int):
|
||
"""Отдаёт растровые тайлы рельефа (hypso/hillshade)"""
|
||
if layer not in ("hypso", "hillshade"):
|
||
raise HTTPException(404, "Unknown layer")
|
||
tile_path = os.path.join(TERRAIN_DIR, layer, str(z), str(x), f"{y}.png")
|
||
if not os.path.exists(tile_path):
|
||
raise HTTPException(404, "Tile not found")
|
||
return FileResponse(
|
||
tile_path,
|
||
media_type="image/png",
|
||
headers={
|
||
"Cache-Control": "public, max-age=31536000, immutable",
|
||
"Access-Control-Allow-Origin": "*",
|
||
}
|
||
)
|
||
|
||
|
||
# ─── Static files ─────────────────────────────────────────────────────────────
|
||
|
||
if os.path.exists(STATIC_DIR):
|
||
app.mount("/", StaticFiles(directory=STATIC_DIR, html=True), name="static")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
print(f"==> Enduro Trails API DB={DATA_PATH} Port={PORT}")
|
||
uvicorn.run("app:app", host="0.0.0.0", port=PORT, workers=4)
|