"""MVT-тайлы для GPS-треков (ET-008).""" import json import math import struct from typing import Optional from shapely.geometry import LineString # ─── LRU-like tile cache ───────────────────────────────────────────────────── _gps_tile_cache: dict = {} _GPS_TILE_CACHE_MAX = 1024 def get_gps_cached_tile(z: int, x: int, y: int) -> Optional[bytes]: return _gps_tile_cache.get((z, x, y)) def set_gps_cached_tile(z: int, x: int, y: int, data: bytes) -> None: if len(_gps_tile_cache) >= _GPS_TILE_CACHE_MAX: # FIFO вытеснение _gps_tile_cache.pop(next(iter(_gps_tile_cache))) _gps_tile_cache[(z, x, y)] = data def clear_gps_tile_cache() -> None: _gps_tile_cache.clear() # ─── Geometry helpers ──────────────────────────────────────────────────────── def _simplify_coords(coords: list, z: int) -> list: """Упрощает геометрию трека по зуму через Douglas-Peucker. Tolerance задаётся в градусах WGS84. На широте 55° с.ш. 1° долготы ≈ 64 км, поэтому tolerance=0.04 ≈ 2.6 км. На z5 один пиксель карты ≈ 5 км по долготе на 55° с.ш., так что 2.6 км даёт «одна точка на пиксель» — оптимум обзорного зума. ET-012 (ADR-016): добавлены тиры z==6 и z<=5; для z>=7 поведение не меняется (регрессия). """ if z >= 12: return coords elif z >= 10: tolerance = 0.0005 # ~50 м elif z >= 8: tolerance = 0.002 # ~200 м elif z == 7: tolerance = 0.008 # ~800 м (как было до ET-012) elif z == 6: tolerance = 0.018 # ~2 км else: tolerance = 0.04 # ~4 км (z5 и ниже) if len(coords) < 3: return coords line = LineString(coords) simplified = line.simplify(tolerance, preserve_topology=False) result = list(simplified.coords) return result if len(result) >= 2 else coords def _wkb_to_coords(blob: bytes) -> Optional[list]: """Парсит WKB LineString, возвращает [(lon, lat), ...].""" try: b = bytes(blob) if len(b) < 9: return None endian = "<" if b[0] == 1 else ">" gtype = struct.unpack_from(endian + "I", b, 1)[0] base_type = gtype & 0xFF if base_type != 2: return None offset = 5 if gtype & 0x20000000: offset += 4 npts = struct.unpack_from(endian + "I", b, offset)[0] offset += 4 coords = [] for _ in range(npts): lon, lat = struct.unpack_from(endian + "dd", b, offset) offset += 16 coords.append((lon, lat)) return coords if len(coords) >= 2 else None except Exception: return None def _tile_to_bbox(z: int, x: int, y: int) -> tuple: n = 2 ** z west = x / n * 360.0 - 180.0 east = (x + 1) / n * 360.0 - 180.0 north = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * y / n)))) south = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * (y + 1) / n)))) return west, south, east, north # ─── MVT builder ───────────────────────────────────────────────────────────── def build_gps_mvt(rows: list, z: int, x: int, y: int) -> bytes: """Собирает MVT тайл с layer 'gps_tracks'. Args: rows: список sqlite3.Row из таблицы tracks z, x, y: координаты тайла Returns: bytes — protobuf MVT или b"" если нет фич """ import mapbox_vector_tile west, south, east, north = _tile_to_bbox(z, x, y) # Min-length фильтр и cap на число фич по зуму. # ET-012 (ADR-016): добавлены тиры z<=5 и z==6, чтобы при понижении # GPS_TRACKS_MIN_ZOOM до 5 размер тайла оставался <= 200 KB (M-8) # и в кадре оставались только «магистральные» треки (M-9). if z <= 5: min_length_m = 10000 # 10 км — только «магистральные» треки limit = 1500 elif z == 6: min_length_m = 5000 # 5 км limit = 2000 elif z == 7: min_length_m = 2000 # как было для z<=7 до ET-012 limit = 3000 elif z <= 9: min_length_m = 0 limit = 8000 elif z <= 11: min_length_m = 0 limit = 15000 else: min_length_m = 0 limit = 25000 features = [] for row in rows: length_m = row["length_m"] or 0 # Min-length фильтр if min_length_m > 0 and length_m < min_length_m: continue if len(features) >= limit: break coords = _wkb_to_coords(row["geom"]) if not coords: continue coords = _simplify_coords(coords, z) try: sources_list = json.loads(row["sources_json"] or "[]") sources_str = ",".join(sources_list) first_source = sources_list[0] if sources_list else "" ext_urls = json.loads(row["external_urls_json"] or "[]") ext_url = ext_urls[0] if ext_urls else "" props = { "id": row["id"], "activity": row["activity_type"] or "other", "source": first_source, "sources": sources_str, "length_km": round(length_m / 1000, 2), "name": row["name"] or "", "ext_url": ext_url, } features.append({ "geometry": {"type": "LineString", "coordinates": coords}, "properties": props, }) except Exception: continue if not features: return b"" return mapbox_vector_tile.encode( [{"name": "gps_tracks", "features": features}], quantize_bounds=(west, south, east, north), extents=4096, default_options={"y_coord_down": False}, )