auto-sync: 2026-05-02 14:40:01

This commit is contained in:
Stream
2026-05-02 14:40:01 +03:00
parent 145e2747a8
commit 7e205cef5f

View File

@@ -1,16 +1,13 @@
#!/usr/bin/env python3
"""
app.py — FastAPI сервер для Enduro Trails
- Раздаёт статику из static/
- /api/tiles/{z}/{x}/{y}.mvt — векторные тайлы из Spatialite
- /api/poi — список POI как GeoJSON
MVT тайлы через mapbox-vector-tile + shapely (без клиппинга — MapLibre клипит сам)
"""
import os
import math
import struct
import sqlite3
import json
from pathlib import Path
from fastapi import FastAPI, HTTPException, Response
@@ -18,510 +15,211 @@ from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
# ─── Конфиг ───────────────────────────────────────────────────────────────────
DATA_PATH = os.environ.get(
"DATA_PATH",
os.path.join(os.path.dirname(__file__), "../data/centralfederal.sqlite"),
DATA_PATH = os.path.abspath(
os.environ.get("DATA_PATH", os.path.join(os.path.dirname(__file__), "../data/centralfederal.sqlite"))
)
DATA_PATH = os.path.abspath(DATA_PATH)
STATIC_DIR = os.path.join(os.path.dirname(__file__), "static")
PORT = int(os.environ.get("PORT", 5558))
app = FastAPI(title="Enduro Trails API")
app.add_middleware(CORMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
# ─── Spatialite helper ────────────────────────────────────────────────────────
def get_db():
"""Открывает соединение с SQLite (с попыткой загрузить Spatialite)."""
conn = sqlite3.connect(DATA_PATH)
conn.row_factory = sqlite3.Row
conn.enable_load_extension(True)
for path in ["mod_spatialite", "/usr/lib/x86_64-linux-gnu/mod_spatialite.so",
"/usr/lib/mod_spatialite.so", "/usr/local/lib/mod_spatialite.so"]:
try:
conn.load_extension(path)
break
except Exception:
continue
conn.enable_load_extension(False)
for p in ["/usr/lib/x86_64-linux-gnu/mod_spatialite.so", "/usr/lib/mod_spatialite.so",
"/usr/local
def get_db():
conn = sqlite3.connect(DATA_PATH)
conn.row_factory = sqlite3.Row
return conn
# ─── Tile math ────────────────────────────────────────────────────────────────
def tile_to_bbox(z: int, x: int, y: int):
"""Возвращает (west, south, east, north) в градусах для тайла z/x/y."""
def tile_to_bbox(z, x, y):
n = 2 ** z
west = x / n * 360.0 - 180.0
east = (x + 1) / n * 360.0 - 180.0
north = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * y / n))))
south = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * (y + 1) / n))))
return west, south, east, north
w = x / n * 360 - 180
e = (x + 1) / n * 360 - 180
n2 = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * y / n))))
s = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * (y + 1) / n))))
return w, s, e, n2
def lon_to_tile_x(lon: float, z: int) -> float:
return (lon + 180.0) / 360.0 * (2 ** z)
def lat_to_tile_y(lat: float, z: int) -> float:
lat_r = math.radians(lat)
return (1.0 - math.log(math.tan(lat_r) + 1.0 / math.cos(lat_r)) / math.pi) / 2.0 * (2 ** z)
TILE_EXTENT = 4096
def lonlat_to_tile_coords(lon: float, lat: float, z: int, tx: int, ty: int):
"""Конвертирует lon/lat в пиксельные координаты тайла [0, TILE_EXTENT]."""
px = (lon_to_tile_x(lon, z) - tx) * TILE_EXTENT
py = (lat_to_tile_y(lat, z) - ty) * TILE_EXTENT
return int(px), int(py)
# ─── Minimal MVT encoder ──────────────────────────────────────────────────────
# Реализует минимальный Mapbox Vector Tile (protobuf) без внешних зависимостей
# Spec: https://github.com/mapbox/vector-tile-spec/tree/master/2.1
def _varint(value: int) -> bytes:
"""Encode unsigned varint."""
buf = []
while True:
towrite = value & 0x7F
value >>= 7
if value:
buf.append(towrite | 0x80)
else:
buf.append(towrite)
break
return bytes(buf)
def _zigzag(n: int) -> int:
return (n << 1) ^ (n >> 31)
def _pb_field(field_num: int, wire_type: int, data: bytes) -> bytes:
tag = (field_num << 3) | wire_type
return _varint(tag) + data
def _pb_string(field_num: int, s: str) -> bytes:
encoded = s.encode("utf-8")
return _pb_field(field_num, 2, _varint(len(encoded)) + encoded)
def _pb_bytes(field_num: int, b: bytes) -> bytes:
return _pb_field(field_num, 2, _varint(len(b)) + b)
def _pb_uint32(field_num: int, v: int) -> bytes:
return _pb_field(field_num, 0, _varint(v))
def encode_linestring_geometry(coords_px):
"""
Кодирует список (x, y) пикселей в MVT geometry (LINESTRING).
Возвращает bytes команд.
"""
if len(coords_px) < 2:
return None
cmds = []
# MoveTo первой точки
dx = coords_px[0][0]
dy = coords_px[0][1]
cmds.append((1 << 3) | 1) # MoveTo, count=1
cmds.append(_zigzag(dx))
cmds.append(_zigzag(dy))
# LineTo остальных
cmds.append(((len(coords_px) - 1) << 3) | 2)
prev_x, prev_y = coords_px[0]
for x, y in coords_px[1:]:
cmds.append(_zigzag(x - prev_x))
cmds.append(_zigzag(y - prev_y))
prev_x, prev_y = x, y
return b"".join(_varint(c) for c in cmds)
def encode_point_geometry(x: int, y: int) -> bytes:
cmds = []
cmds.append((1 << 3) | 1) # MoveTo, count=1
cmds.append(_zigzag(x))
cmds.append(_zigzag(y))
return b"".join(_varint(c) for c in cmds)
def build_mvt_tile(trails_rows, poi_rows, z: int, tx: int, ty: int) -> bytes:
"""Строит MVT тайл из строк trails и poi."""
def build_layer(name: str, features_data: list, geom_type: int) -> bytes:
"""
features_data: list of {"geom_bytes": bytes, "props": dict}
geom_type: 2=LINESTRING, 1=POINT
"""
if not features_data:
return b""
# Собираем ключи и значения
keys = []
key_index = {}
values = []
value_index = {}
feature_blobs = []
for fd in features_data:
geom_bytes = fd["geom_bytes"]
props = fd["props"]
if not geom_bytes:
continue
tags = []
for k, v in props.items():
if v is None:
continue
v_str = str(v)
if k not in key_index:
key_index[k] = len(keys)
keys.append(k)
ki = key_index[k]
vk = (type(v).__name__, v_str)
if vk not in value_index:
value_index[vk] = len(values)
values.append((type(v).__name__, v_str))
vi = value_index[vk]
tags.extend([ki, vi])
# Feature proto
feat = b""
feat += _pb_uint32(3, geom_type) # type
# geometry field=4
feat += _pb_bytes(4, geom_bytes)
# tags field=2
for t in tags:
feat += _pb_uint32(2, t)
feature_blobs.append(feat)
if not feature_blobs:
return b""
layer = b""
layer += _pb_uint32(15, 2) # version=2
layer += _pb_string(1, name) # name
layer += _pb_uint32(5, TILE_EXTENT) # extent
for k in keys:
layer += _pb_string(3, k)
for (vtype, vstr) in values:
# value message: string_value=1, float_value=2, ...
val_msg = b""
if vtype == "int" or vtype == "float":
try:
fv = float(vstr)
val_msg += _pb_field(3, 1, struct.pack("<d", fv)) # double_value
except Exception:
val_msg += _pb_string(1, vstr)
else:
val_msg += _pb_string(1, vstr)
layer += _pb_bytes(4, val_msg)
for fb in feature_blobs:
layer += _pb_bytes(2, fb)
return layer
# ── Trails layer ──
trails_features = []
for row in trails_rows:
geom_blob = row["geom"]
if not geom_blob:
continue
coords_px = wkb_linestring_to_pixels(geom_blob, z, tx, ty)
if not coords_px:
continue
geom_bytes = encode_linestring_geometry(coords_px)
if not geom_bytes:
continue
trails_features.append({
"geom_bytes": geom_bytes,
"props": {
"name": row["name"],
"highway": row["highway_type"],
"tracktype": row["track_type"],
"surface": row["surface"],
"length_m": row["length_m"],
"mtb_scale": row["mtb_scale"],
},
})
# ── POI layer ──
poi_features = []
for row in poi_rows:
geom_blob = row["geom"]
if not geom_blob:
continue
pt = wkb_point_to_pixels(geom_blob, z, tx, ty)
if pt is None:
continue
geom_bytes = encode_point_geometry(*pt)
poi_features.append({
"geom_bytes": geom_bytes,
"props": {
"name": row["name"],
"poi_type": row["poi_type"],
},
})
tile = b""
trails_layer = build_layer("trails", trails_features, 2)
poi_layer = build_layer("poi", poi_features, 1)
if trails_layer:
tile += _pb_bytes(3, trails_layer)
if poi_layer:
tile += _pb_bytes(3, poi_layer)
return tile
# ─── WKB parsers ──────────────────────────────────────────────────────────────
def wkb_linestring_to_pixels(blob: bytes, z: int, tx: int, ty: int):
"""Парсит WKB LineString и возвращает список пиксельных координат."""
def wkb_to_coords(blob):
try:
if len(blob) < 9:
b = bytes(blob)
if len(b) < 9:
return None
byte_order = blob[0]
endian = "<" if byte_order == 1 else ">"
geom_type = struct.unpack_from(endian + "I", blob, 1)[0]
# 2 = LineString, 1002 = LineString with SRID
if geom_type not in (2, 1000002, 2147483650):
# попробуем через shapely
return _wkb_via_shapely(blob, z, tx, ty, "line")
offset = 5
# Если есть SRID (geom_type & 0x20000000)
if geom_type & 0x20000000:
offset += 4
num_points = struct.unpack_from(endian + "I", blob, offset)[0]
endian = "<" if b[0] == 1 else ">"
gtype = struct.unpack_from(endian + "I", b, 1)[0]
offset = 5 + (4 if gtype & 0x20000000 else 0)
npts = struct.unpack_from(endian + "I", b, offset)[0]
offset += 4
coords_px = []
for _ in range(num_points):
lon, lat = struct.unpack_from(endian + "dd", blob, offset)
offset += 16
px, py = lonlat_to_tile_coords(lon, lat, z, tx, ty)
coords_px.append((px, py))
return coords_px if len(coords_px) >= 2 else None
coords = [struct.unpack_from(endian + "dd", b, offset + i * 16) for i range(npts)]
return coords if len(coords) >= 2 else None
except Exception:
return _wkb_via_shapely(blob, z, tx, ty, "line")
def wkb_point_to_pixels(blob: bytes, z: int, tx: int, ty: int):
"""Парсит WKB Point и возвращает (px, py)."""
try:
if len(blob) < 21:
try:
from shapely import wkb as swkb
return list(swkb.loads(bytes(blob)).coords)
except Exception:
return None
byte_order = blob[0]
endian = "<" if byte_order == 1 else ">"
geom_type = struct.unpack_from(endian + "I", blob, 1)[0]
if geom_type not in (1, 1000001, 2147483649):
return _wkb_via_shapely(blob, z, tx, ty, "point")
offset = 5
if geom_type & 0x20000000:
offset += 4
lon, lat = struct.unpack_from(endian + "dd", blob, offset)
return lonlat_to_tile_coords(lon, lat, z, tx, ty)
except Exception:
return _wkb_via_shapely(blob, z, tx, ty, "point")
def _wkb_via_shapely(blob, z, tx, ty, kind):
"""Fallback: парсим через shapely."""
def wkb_point_coords(blob):
try:
from shapely import wkb as swkb
geom = swkb.loads(bytes(blob))
if kind == "point":
return lonlat_to_tile_coords(geom.x, geom.y, z, tx, ty)
else:
coords = list(geom.coords)
return [lonlat_to_tile_coords(lon, lat, z, tx, ty) for lon, lat in coords]
b = bytes(blob)
endian = "<" if b[0] == 1 else ">"
gtype = struct.unpack_from(endian + "I", b, 1)[0]
offset = 5 + (4 if gtype & 0x20000000 else 0)
return struct.unpack_from(endian + "dd", b, offset)
except Exception:
return None
# ─── API endpoints ────────────────────────────────────────────────────────────
def build_mvt(trails_rows, poi_rows, z, x, y) -> bytes:
import mapbox_vector_tile
@app.get("/api/tiles/{z}/{x}/{y}.mvt")
async def get_tile(z: int, x: int, y: int):
if not os.path.exists(DATA_PATH):
raise HTTPException(503, f"База данных не найдена: {DATA_PATH}. Запустите parse.py")
w, s, e, n = tile_to_bbox(z, x, y)
trails_feats = []
for row in trails_rows:
coords = wkb_to_coords(row["geom"])
if not coords:
continue
props = {
"highway": row["highway_type"] or "",
"tracktype": row["track_type"] or "",
"surface": row["
def build_mvt(trails_rows, poi_rows, z, x, y) -> bytes:
import mapbox_vector_tile
from shapely.geometry import LineString, Point, MultiLineString
from shapely import clip_by_rect
west, south, east, north = tile_to_bbox(z, x, y)
# Небольшой буфер для краевых геометрий
buf = (east - west) * 0.1
q_west, q_south, q_east, q_north = west - buf, south - buf, east + buf, north + buf
# --- Trails ---
trails_feats = []
for row in trails_rows:
coords = wkb_to_coords(row["geom"]
if not coords:
continue
try:
line = LineString(coords)
clipped = clip_by_rect(line, west, south, east, north)
if clipped.is_empty:
continue
props = {
"highway": row["highway_type"] or "",
"tracktype": row["track_type"] or "",
"surface": row["surface"] or "",
"name": row["name"] or "",
}
if clipped.geom_type == "LineString":
trails_feats.append({"geometry": clipped.__geo_interface__, "properties": props})
elif clipped.geom_type == "MultiLineString":
for part in clipped.geoms:
trails_feats.append({"geometry": part.__geo_interface__, "properties": props})
except Exception:
continue
# --- POI ---
poi_feats = []
for row in poi_rows:
lon, lat = wkb_point_coords(row["geom"])
if lon is None:
continue
poi_feats.append({
"geometry": {"type": "Point", "coordinates": [lon, lat]},
"properties": {"poi_type": row["poi_type"] or "", "name": "
})
layers = []
if trails_feats:
layers.append({"name": "trails", "features": trails_feats})
if poi_feats:
layers.append({"name": "poi", "features": poi_feats})
if not layers:
return b""
# MapLibre требует Y=0 сверху (=north) → y_coord_down=True НЕ флипает (оставляет как есть)
# По умолчанию библиотека флипает Y, нам нужно y_coord_down=True чтобы НЕ флипать
return mapbox_vector_tile.encode(
layers,
quantize_bounds=(west, south, east, north),
extents=4096,
default_options={'y_coord_down': False},
)
@app.get("/api/tiles/{z}/{x}/{y}.mvt")
async def get_tile(z: int, x: int, y: int
if not os.path.exists(DATA_PATH):
raise HTTPException(503, f"База данных не найдена: {DATA_PATH}")
w, s, e, n = tile_to_bbox(z, x, y)
buf_x = (e - w) * 0.15
buf_y = (n - s) * 0.15
qw, qs, qe, qn = w - buf_x, s - buf_y, e + buf_x, n + buf_y
limit = 2000 if z <= 7 else (5000 if z <= 9 else (10000 if z <= 11 else 20000))
try:
conn = get_db()
cur = conn.cursor()
cur.execute(
"SELECT osm_id, highway_type, track_type, surface, name, length_m, mtb_scale, geom "
"FROM trails WHERE min_lon <= ? AND max_lon >= ? AND min_lat <= ? AND max_lat >= ? "
"LIMIT ?",
(qe, qw, qn, qs, limit),
)
trails_rows = cur.fetchall()
# Читаем все trails и фильтруем по bbox через WKB парсинг в Python
# (ST_Intersects не работает с raw WKB без правильной инициализации Spatialite)
cur.execute("""
SELECT osm_id, highway_type, track_type, surface, name,
length_m, mtb_scale, geom
FROM trails
""")
all_trails = cur.fetchall()
# Фильтруем по bbox через первую точку WKB
trails_rows = []
for row in all_trails:
geom_blob = row["geom"]
if not geom_blob:
continue
try:
blob = bytes(geom_blob)
endian = "<" if blob[0] == 1 else ">"
gtype = struct.unpack_from(endian + "I", blob, 1)[0]
offset = 5
if gtype & 0x20000000:
offset += 4 # skip SRID
npts = struct.unpack_from(endian + "I", blob, offset)[0]
offset += 4
if npts < 2:
continue
# Проверяем первую и последнюю точку для bbox
lon1, lat1 = struct.unpack_from(endian + "dd", blob, offset)
lon2, lat2 = struct.unpack_from(endian + "dd", blob, offset + (npts - 1) * 16)
min_lon = min(lon1, lon2)
max_lon = max(lon1, lon2)
min_lat = min(lat1, lat2)
max_lat = max(lat1, lat2)
if max_lon < q_west or min_lon > q_east or max_lat < q_south or min_lat > q_north:
continue
trails_rows.append(row)
if len(trails_rows) >= 5000:
break
except Exception:
continue
cur.execute("SELECT osm_id, poi_type, name, geom FROM poi")
all_poi = cur.fetchall()
poi_rows = []
for row in all_poi:
geom_blob = row["geom"]
if not geom_blob:
continue
try:
blob = bytes(geom_blob)
endian = "<" if blob[0] == 1 else ">"
offset = 5
gtype = struct.unpack_from(endian + "I", blob, 1)[0]
if gtype & 0x20000000:
offset += 4
lon, lat = struct.unpack_from(endian + "dd", blob, offset)
if q_west <= lon <= q_east and q_south <= lat <= q_north:
poi_rows.append(row)
except Exception:
continue
cur.execute(
"SELECT osm_id, poi_type, name, geom "
"FROM poi WHERE min_lon <= ? AND max_lon >= ? AND min_lat <= ? AND max_lat >= ?",
(qe, qw, qn, qs),
)
poi_rows = cur.fetchall()
conn.close()
except Exception as exc:
raise HTTPException(500, f"Ошибка БД: {exc}")
except Exception as e:
raise HTTPException(500, f"Ошибка БД: {e}")
mvt = build_mvt_tile(trails_rows, poi_rows, z, x, y)
return Response(
content=mvt,
media_type="application/x-protobuf",
headers={
"Content-Encoding": "identity",
"Access-Control-Allow-Origin": "*",
},
)
mvt = build_mvt(trails_rows, poi_rows, z, x, y)
return Response(content=mvt, media_type="application/x-protobuf",
headers={"Content-Encoding": "identity", "Access-Control-Allow-Origin": "*"
})
@app.get("/api/poi")
async def get_poi():
if not os.path.exists(DATA_PATH):
raise HTTPException(503, f"База данных не найдена: {DATA_PATH}")
try:
conn = get_db()
cur = conn.cursor()
cur.execute("SELECT osm_id, poi_type, name, geom FROM poi LIMIT 10000")
rows = cur.fetchall()
conn.close()
except Exception as e:
raise HTTPException(500, f"Ошибка БД: {e}")
except Exception as exc:
raise HTTPException(500, f"Ошибка БД: {exc}")
features = []
for row in rows:
geom_blob = row["geom"]
if not geom_blob:
pt = wkb_point_coords(row["geom])
if not pt:
continue
try:
byte_order = geom_blob[0]
endian = "<" if byte_order == 1 else ">"
lon, lat = struct.unpack_from(endian + "dd", bytes(geom_blob), 5)
features.append({
"type": "Feature",
"geometry": {"type": "Point", "coordinates": [lon, lat]},
"properties": {
"osm_id": row["osm_id"],
"poi_type": row["poi_type"],
"name": row["name"],
},
})
except Exception:
continue
return {"type": "FeatureCollection", "features": features}
features.append({"type": "Feature", "geometry": {"type": "Point", "coordinates": list(pt)},
"properties": {"osm_id": row["osm_id"], "poi_type": row["poi_type"], "name": row["name"]}})
return {"type": "FeatureCollection", "features": feature
@app.get("/api/health")
async def health():
db_exists = os.path.exists(DATA_PATH)
return {
"status": "ok",
"db_path": DATA_PATH,
"db_exists": db_exists,
}
# ─── Static files ─────────────────────────────────────────────────────────────
return {"status": "ok", "db_path": DATA_PATH, "db_exists": os.path.exists(DATA_PATH)}
if os.path.exists(STATIC_DIR):
app.mount("/", StaticFiles(directory=STATIC_DIR, html=True), name="static")
# ─── Entry point ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
print(f"==> Enduro Trails API")
print(f" DB: {DATA_PATH}")
print(f" Static: {STATIC_DIR}")
print(f" Port: {PORT}")
print(f"==> Enduro Trails API DB={DATA_PATH} Port={PORT}")
uvicorn.run(app, host="0.0.0.0", port=PORT)