Files
wiki/tasks/enduro-trails/scripts/parse.py
2026-05-02 07:50:01 +03:00

376 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
parse.py — парсинг OSM PBF → Spatialite для Enduro Trails
Читает region.osm.pbf, сохраняет trails и POI в centralfederal.sqlite
"""
import os
import sys
import json
import math
import sqlite3
import argparse
try:
import osmium
except ImportError:
print("ERROR: python-osmium не установлен. pip install python-osmium")
sys.exit(1)
try:
# pysqlite3-binary предоставляет sqlite3 с поддержкой расширений
import pysqlite3 as sqlite3_ext
HAS_PYSQLITE3 = True
except ImportError:
HAS_PYSQLITE3 = False
sqlite3_ext = sqlite3
from shapely.geometry import LineString, Point
from shapely import wkb as shapely_wkb
# ─── Constants ────────────────────────────────────────────────────────────────
# Values of the OSM `highway` tag that TrailHandler keeps; ways with any other
# value are skipped.
HIGHWAY_TYPES = {"track", "path", "bridleway", "cycleway", "footway"}
# OSM tag key -> set of accepted values. POIHandler stores a node or way as a
# POI when one of these keys carries one of the listed values.
POI_FILTERS = {
    "natural": {"water", "peak", "cave_entrance"},
    "tourism": {"viewpoint"},
    "historic": {"ruins"},
    "ford": {"yes"},
}
# Earth radius in metres used by haversine_length().
EARTH_RADIUS_M = 6_371_000.0
# ─── Утилиты ──────────────────────────────────────────────────────────────────
def haversine_length(coords):
    """Return the length in metres of a polyline given as (lon, lat) pairs.

    The length is the sum of haversine great-circle distances between
    consecutive vertices on a sphere of radius ``EARTH_RADIUS_M``.

    Args:
        coords: Iterable of points; element 0 of each point is the longitude
            and element 1 the latitude, both in degrees.

    Returns:
        Total length as a float; ``0.0`` for fewer than two points.
    """
    # Convert every vertex to radians once instead of twice per segment.
    points = [(math.radians(p[0]), math.radians(p[1])) for p in coords]
    total = 0.0
    for (lon1, lat1), (lon2, lat2) in zip(points, points[1:]):
        dlat = lat2 - lat1
        dlon = lon2 - lon1
        a = (
            math.sin(dlat / 2) ** 2
            + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
        )
        # Rounding can push `a` marginally above 1 for near-antipodal points,
        # which would make sqrt/asin raise a math domain error.
        a = min(1.0, a)
        total += 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))
    return total
def geom_to_wkb_hex(geom):
    """Serialize a Shapely geometry to a hex-encoded WKB string."""
    hex_wkb = shapely_wkb.dumps(geom, hex=True)
    return hex_wkb
# ─── OSM Handlers ─────────────────────────────────────────────────────────────
class TrailHandler(osmium.SimpleHandler):
    """Collect OSM ways whose ``highway`` tag is listed in HIGHWAY_TYPES.

    Each accepted way is appended to ``self.trails`` as a dict holding its
    OSM id, selected tags, haversine length and hex-encoded WKB geometry.
    """

    def __init__(self):
        super().__init__()
        # One dict per accepted way, in the order the ways were visited.
        self.trails = []

    def way(self, w):
        """Handle one way: filter by highway type and record it as a trail."""
        tags = w.tags
        highway = tags.get("highway", "")
        if highway not in HIGHWAY_TYPES:
            return

        # Keep only nodes with a valid location; any failure while reading
        # coordinates drops the whole way.
        try:
            points = [
                (node.lon, node.lat)
                for node in w.nodes
                if node.location.valid()
            ]
        except Exception:
            return

        # A line needs at least two vertices.
        if len(points) < 2:
            return

        length_m = haversine_length(points)
        line = LineString(points)
        # Full copy of the way's tags, stored as JSON next to the
        # individually extracted columns.
        all_tags = {item.k: item.v for item in w.tags}

        record = {
            "osm_id": w.id,
            "highway_type": highway,
            "track_type": tags.get("tracktype", None),
            "surface": tags.get("surface", None),
            "name": tags.get("name", None),
            "length_m": length_m,
            "mtb_scale": tags.get("mtb:scale", None),
            "visibility": tags.get("trail_visibility", None),
            "smoothness": tags.get("smoothness", None),
            "access": tags.get("access", None),
            "tags": json.dumps(all_tags, ensure_ascii=False),
            "geom_wkb": geom_to_wkb_hex(line),
        }
        self.trails.append(record)
class POIHandler(osmium.SimpleHandler):
    """Collect points of interest matching POI_FILTERS from nodes and ways.

    Matching objects are appended to ``self.pois`` as dicts with the OSM id,
    a ``key=value`` POI type, the optional name and a hex-encoded WKB point.
    """

    def __init__(self):
        super().__init__()
        # One dict per matched node or way, in visiting order.
        self.pois = []

    def _check_tags(self, tags):
        """Return ``"key=value"`` for the first matching filter, else None."""
        for tag_key, accepted in POI_FILTERS.items():
            tag_value = tags.get(tag_key, "")
            if tag_value in accepted:
                return f"{tag_key}={tag_value}"
        return None

    def _store(self, osm_obj, poi_type, point):
        """Append one POI record built from an OSM object and its point."""
        self.pois.append({
            "osm_id": osm_obj.id,
            "poi_type": poi_type,
            "name": osm_obj.tags.get("name", None),
            "geom_wkb": geom_to_wkb_hex(point),
        })

    def node(self, n):
        """Record a node as a POI when its tags match and it has a location."""
        poi_type = self._check_tags(n.tags)
        if poi_type and n.location.valid():
            self._store(n, poi_type, Point(n.location.lon, n.location.lat))

    def way(self, w):
        """Record a matching way as a single point.

        The point is the centroid of the line through the way's valid nodes
        (the centroid of the line itself, not of any enclosed area).
        """
        poi_type = self._check_tags(w.tags)
        if not poi_type:
            return

        # Any failure while reading node coordinates drops the way.
        try:
            points = [
                (node.lon, node.lat)
                for node in w.nodes
                if node.location.valid()
            ]
        except Exception:
            return

        if len(points) >= 2:
            self._store(w, poi_type, LineString(points).centroid)
# ─── Spatialite ───────────────────────────────────────────────────────────────
def open_spatialite(db_path):
    """Open the database and try to load the mod_spatialite extension.

    Args:
        db_path: Path of the SQLite database file to open (created if absent).

    Returns:
        Tuple ``(conn, loaded)``: the open connection and a bool telling
        whether mod_spatialite was loaded. When it was not, the caller is
        expected to fall back to storing geometry as plain WKB blobs.
    """
    conn = sqlite3_ext.connect(db_path)

    # Candidate locations, tried in order; the bare name lets SQLite resolve
    # the library through the system loader path.
    spatialite_paths = [
        "mod_spatialite",
        "/usr/lib/x86_64-linux-gnu/mod_spatialite.so",
        "/usr/lib/mod_spatialite.so",
        "/usr/local/lib/mod_spatialite.so",
    ]

    loaded = False
    try:
        # Some Python builds ship sqlite3 without extension loading; there the
        # method is missing. Treat that as "no Spatialite" instead of crashing
        # so the WKB-blob fallback stays reachable.
        conn.enable_load_extension(True)
    except (AttributeError, sqlite3_ext.Error) as exc:
        print(f"WARNING: загрузка расширений SQLite недоступна: {exc}")
    else:
        for path in spatialite_paths:
            try:
                conn.load_extension(path)
            except sqlite3_ext.Error:
                # Not present at this location; try the next candidate.
                continue
            loaded = True
            print(f" Spatialite загружен: {path}")
            break
        # Extension loading is only needed during setup; switch it off again.
        conn.enable_load_extension(False)

    if not loaded:
        print("WARNING: mod_spatialite не найден — геометрия будет храниться как WKB blob без пространственных индексов")
    return conn, loaded
def init_db(conn, has_spatialite):
    """Drop and recreate the ``trails`` and ``poi`` tables and their indexes.

    Args:
        conn: Open database connection.
        has_spatialite: When true, Spatialite metadata is initialised and the
            ``geom`` columns are offered to ``AddGeometryColumn``.

    A failed geometry-column registration is reported as a warning instead of
    being silently ignored; the function still completes.
    """
    cur = conn.cursor()
    if has_spatialite:
        cur.execute("SELECT InitSpatialMetaData(1)")

    cur.executescript("""
        DROP TABLE IF EXISTS trails;
        CREATE TABLE trails (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            osm_id INTEGER NOT NULL,
            highway_type TEXT,
            track_type TEXT,
            surface TEXT,
            name TEXT,
            length_m REAL,
            mtb_scale TEXT,
            visibility TEXT,
            smoothness TEXT,
            access TEXT,
            tags TEXT,
            geom GEOMETRY
        );
        DROP TABLE IF EXISTS poi;
        CREATE TABLE poi (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            osm_id INTEGER NOT NULL,
            poi_type TEXT,
            name TEXT,
            geom GEOMETRY
        );
    """)

    if has_spatialite:
        # NOTE(review): `geom` is already declared in CREATE TABLE above, so
        # Spatialite will presumably refuse to add it again — confirm. The
        # outcome is reported so a missing registration is visible.
        _register_geometry_column(cur, "trails", "LINESTRING")
        _register_geometry_column(cur, "poi", "POINT")

    cur.executescript("""
        CREATE INDEX IF NOT EXISTS idx_trails_highway ON trails(highway_type);
        CREATE INDEX IF NOT EXISTS idx_trails_surface ON trails(surface);
        CREATE INDEX IF NOT EXISTS idx_poi_type ON poi(poi_type);
    """)
    conn.commit()


def _register_geometry_column(cur, table, geom_type):
    """Call AddGeometryColumn for ``table.geom`` (SRID 4326, XY); warn on failure."""
    try:
        cur.execute(
            "SELECT AddGeometryColumn(?, 'geom', 4326, ?, 'XY')",
            (table, geom_type),
        )
        row = cur.fetchone()
    except Exception as exc:
        # Best effort, as before: the schema stays usable without it.
        print(f"WARNING: AddGeometryColumn({table}.geom) не выполнен: {exc}")
        return
    # The SQL function signals failure through its result (1 = success).
    status = row[0] if row else None
    if status != 1:
        print(f"WARNING: AddGeometryColumn({table}.geom) не выполнен: код {status}")
def insert_trails(conn, trails, has_spatialite):
    """Insert trail records into the ``trails`` table, committing per batch.

    Args:
        conn: Open database connection with an existing ``trails`` table.
        trails: Sequence of dicts with the keys ``osm_id``, ``highway_type``,
            ``track_type``, ``surface``, ``name``, ``length_m``,
            ``mtb_scale``, ``visibility``, ``smoothness``, ``access``,
            ``tags`` and ``geom_wkb`` (hex string).
        has_spatialite: Kept for interface compatibility; it does not affect
            what is written. The geometry is always stored as the raw bytes
            decoded from ``geom_wkb``.
    """
    insert_sql = """
        INSERT INTO trails
            (osm_id, highway_type, track_type, surface, name, length_m,
             mtb_scale, visibility, smoothness, access, tags, geom)
        VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
    """
    cur = conn.cursor()
    total = len(trails)
    # Insert in batches of 1000 rows, committing after each batch.
    BATCH = 1000
    for start in range(0, total, BATCH):
        chunk = trails[start:start + BATCH]
        cur.executemany(
            insert_sql,
            (
                (
                    t["osm_id"], t["highway_type"], t["track_type"], t["surface"],
                    t["name"], t["length_m"], t["mtb_scale"], t["visibility"],
                    t["smoothness"], t["access"], t["tags"],
                    bytes.fromhex(t["geom_wkb"]),
                )
                for t in chunk
            ),
        )
        conn.commit()
        print(f" trails: вставлено {min(start + BATCH, total)}/{total}")
def insert_pois(conn, pois):
    """Insert POI records into the ``poi`` table, committing per batch.

    Args:
        conn: Open database connection with an existing ``poi`` table.
        pois: Sequence of dicts with the keys ``osm_id``, ``poi_type``,
            ``name`` and ``geom_wkb`` (hex string, stored as raw bytes).
    """
    insert_sql = """
        INSERT INTO poi (osm_id, poi_type, name, geom)
        VALUES (?,?,?,?)
    """
    cursor = conn.cursor()
    total = len(pois)
    BATCH = 1000
    for start in range(0, total, BATCH):
        rows = (
            (
                poi["osm_id"], poi["poi_type"], poi["name"],
                bytes.fromhex(poi["geom_wkb"]),
            )
            for poi in pois[start:start + BATCH]
        )
        cursor.executemany(insert_sql, rows)
        conn.commit()
        print(f" poi: вставлено {min(start + BATCH, total)}/{total}")
def create_spatial_indexes(conn, has_spatialite):
    """Create Spatialite spatial indexes on ``trails.geom`` and ``poi.geom``.

    Does nothing when ``has_spatialite`` is false. Each table is handled
    independently: a failure on one is reported as a warning and does not
    stop the other.

    Args:
        conn: Open database connection with mod_spatialite loaded.
        has_spatialite: Whether Spatialite functions are available.
    """
    if not has_spatialite:
        return

    cur = conn.cursor()
    for table in ("trails", "poi"):
        try:
            cur.execute("SELECT CreateSpatialIndex(?, 'geom')", (table,))
            row = cur.fetchone()
            conn.commit()
        except Exception as e:
            print(f" WARNING: индекс {table}: {e}")
            continue

        # The SQL function reports failure through its result (1 = success)
        # rather than by raising, so the status has to be checked explicitly.
        status = row[0] if row else None
        if status == 1:
            print(f" Пространственный индекс {table} создан")
        else:
            print(f" WARNING: индекс {table}: CreateSpatialIndex вернул {status}")
# ─── Main ─────────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point: parse the PBF file and fill the database.

    Reads ``--pbf`` and ``--db`` (both default to files under ``../data``
    relative to this script), collects trails and POIs in two passes over the
    PBF file, then recreates the schema, inserts the records and builds the
    spatial indexes. Exits with status 1 when the PBF file does not exist.
    """
    script_dir = os.path.dirname(__file__)
    parser = argparse.ArgumentParser(description="Парсинг OSM PBF → Spatialite")
    parser.add_argument(
        "--pbf",
        default=os.path.join(script_dir, "../data/region.osm.pbf"),
        help="Путь к PBF файлу",
    )
    parser.add_argument(
        "--db",
        default=os.path.join(script_dir, "../data/centralfederal.sqlite"),
        help="Путь к выходному SQLite/Spatialite файлу",
    )
    args = parser.parse_args()
    pbf_path = os.path.abspath(args.pbf)
    db_path = os.path.abspath(args.db)

    if not os.path.exists(pbf_path):
        print(f"ERROR: PBF файл не найден: {pbf_path}")
        print("Сначала запустите scripts/download.sh")
        sys.exit(1)

    print(f"==> Читаем PBF: {pbf_path}")
    print(" Парсим дороги...")
    trail_handler = TrailHandler()
    trail_handler.apply_file(pbf_path, locations=True)
    print(f" Найдено дорог: {len(trail_handler.trails)}")

    print(" Парсим POI...")
    poi_handler = POIHandler()
    poi_handler.apply_file(pbf_path, locations=True)
    print(f" Найдено POI: {len(poi_handler.pois)}")

    print(f"==> Открываем БД: {db_path}")
    os.makedirs(os.path.dirname(db_path), exist_ok=True)
    conn, has_spatialite = open_spatialite(db_path)
    # Close the connection even when a later step raises, so the database
    # file is not left held open by a failed run.
    try:
        print("==> Инициализируем схему...")
        init_db(conn, has_spatialite)
        print("==> Вставляем дороги...")
        insert_trails(conn, trail_handler.trails, has_spatialite)
        print("==> Вставляем POI...")
        insert_pois(conn, poi_handler.pois)
        print("==> Создаём пространственные индексы...")
        create_spatial_indexes(conn, has_spatialite)
    finally:
        conn.close()
    print(f"\n✓ Готово! БД сохранена: {db_path}")
# Run the pipeline only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()