Files
wiki/tasks/enduro-trails/scripts/parse.py
2026-05-02 08:30:02 +03:00

320 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
parse.py — парсинг OSM PBF → Spatialite для Enduro Trails
Использует osmium export → GeoJSONSeq → построчный парсинг (низкое потребление памяти)
"""
import os
import sys
import json
import math
import sqlite3
import argparse
import subprocess
import tempfile
# ─── Constants ────────────────────────────────────────────────────────────────
# OSM `highway` values that are imported into the `trails` table.
HIGHWAY_TYPES = {"track", "path", "bridleway", "cycleway", "footway"}
# Sphere radius, in metres, used by the haversine formula below.
EARTH_RADIUS_M = 6_371_000.0


def haversine_length(coords):
    """Return the length of a polyline in metres.

    The length is the sum of great-circle (haversine) distances between
    consecutive positions on a sphere of radius ``EARTH_RADIUS_M``.

    Args:
        coords: Sequence of positions. Only the first two items of each
            position are read, as ``lon, lat`` in degrees; any further items
            (e.g. elevation) are ignored.

    Returns:
        Total length in metres; ``0.0`` for fewer than two positions.
    """
    total = 0.0
    for start, end in zip(coords, coords[1:]):
        lon1, lat1 = math.radians(start[0]), math.radians(start[1])
        lon2, lat2 = math.radians(end[0]), math.radians(end[1])
        dlat = lat2 - lat1
        dlon = lon2 - lon1
        a = (math.sin(dlat / 2) ** 2
             + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2)
        # Floating-point rounding can push `a` marginally outside [0, 1] for
        # (near-)antipodal points; clamp so sqrt/asin stay inside their domain.
        a = min(1.0, max(0.0, a))
        total += 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))
    return total
def open_db(db_path):
    """Open the SQLite database and try to load the Spatialite extension.

    Loading is best-effort: several candidate library names/paths are tried
    in order and the first one that loads wins. If none loads, or if this
    Python build's ``sqlite3`` module has no loadable-extension support, a
    warning is printed and the plain SQLite connection is returned.

    Args:
        db_path: Path of the database file (anything ``sqlite3.connect``
            accepts).

    Returns:
        Tuple ``(conn, has_spatialite)``: the open connection and a bool
        telling whether the extension was loaded.
    """
    conn = sqlite3.connect(db_path)
    has_spatialite = False

    try:
        conn.enable_load_extension(True)
    except (AttributeError, sqlite3.Error):
        # sqlite3 builds without loadable-extension support do not expose
        # this method at all; treat that the same as "extension not found".
        can_load = False
    else:
        can_load = True

    if can_load:
        candidates = (
            "mod_spatialite",
            "/usr/lib/x86_64-linux-gnu/mod_spatialite.so",
            "/usr/lib/mod_spatialite.so",
            "/usr/local/lib/mod_spatialite.so",
        )
        try:
            for path in candidates:
                try:
                    conn.load_extension(path)
                except sqlite3.Error:
                    continue
                has_spatialite = True
                print(f" Spatialite: {path}")
                break
        finally:
            # Switch extension loading back off once the probing is done.
            conn.enable_load_extension(False)

    if not has_spatialite:
        print(" WARNING: mod_spatialite не найден — без пространственных индексов")
    return conn, has_spatialite
def init_db(conn, has_spatialite):
    """(Re)create the ``trails`` and ``poi`` tables and their plain indexes.

    Both tables are dropped first, so any existing rows are discarded.

    Args:
        conn: Open ``sqlite3`` connection.
        has_spatialite: When true, ``InitSpatialMetaData(1)`` is executed
            first. A failure there is reported as a warning and does not
            abort schema creation.
    """
    cur = conn.cursor()
    if has_spatialite:
        try:
            cur.execute("SELECT InitSpatialMetaData(1)")
        except sqlite3.Error as e:
            # Best-effort: the tables below are still usable without the
            # spatial metadata, but the failure should not go unnoticed.
            print(f" WARNING InitSpatialMetaData: {e}")
    cur.executescript("""
        DROP TABLE IF EXISTS trails;
        CREATE TABLE trails (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            osm_id INTEGER NOT NULL,
            highway_type TEXT,
            track_type TEXT,
            surface TEXT,
            name TEXT,
            length_m REAL,
            mtb_scale TEXT,
            visibility TEXT,
            smoothness TEXT,
            access TEXT,
            tags TEXT,
            geom BLOB
        );
        CREATE INDEX IF NOT EXISTS idx_trails_highway ON trails(highway_type);
        CREATE INDEX IF NOT EXISTS idx_trails_surface ON trails(surface);
        DROP TABLE IF EXISTS poi;
        CREATE TABLE poi (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            osm_id INTEGER NOT NULL,
            poi_type TEXT,
            name TEXT,
            geom BLOB
        );
        CREATE INDEX IF NOT EXISTS idx_poi_type ON poi(poi_type);
    """)
    conn.commit()
def coords_to_wkb_linestring(coords):
    """Encode positions as a little-endian WKB LineString tagged with SRID 4326.

    Layout: byte-order marker (1 = little endian), geometry type 2
    (LineString) OR-ed with the 0x20000000 SRID flag (the PostGIS EWKB
    convention), the SRID, the point count, then one ``lon, lat`` pair of
    doubles per point.

    Args:
        coords: Iterable of positions. Only the first two items of each
            position are encoded, as ``lon, lat``; further items such as an
            elevation are ignored.

    Returns:
        The encoded geometry as ``bytes``.
    """
    import struct  # local import: struct is not among the module-level imports

    flat = []
    for position in coords:
        flat.append(position[0])
        flat.append(position[1])
    n = len(flat) // 2
    # '<' selects standard sizes without padding, so one pack call yields the
    # same bytes as packing every field separately.
    return struct.pack(f"<BIII{2 * n}d", 1, 0x20000002, 4326, n, *flat)
def coords_to_wkb_point(lon, lat):
    """Encode a lon/lat pair as a little-endian WKB Point tagged with SRID 4326.

    Layout: byte-order marker (1 = little endian), geometry type 1 (Point)
    OR-ed with the 0x20000000 SRID flag, the SRID, then ``lon`` and ``lat``
    as doubles.

    Args:
        lon: Longitude.
        lat: Latitude.

    Returns:
        The encoded geometry as ``bytes``.
    """
    import struct

    point_with_srid = 0x20000001
    srid = 4326
    # '<' means standard sizes with no padding between the fields.
    return struct.pack('<BIIdd', 1, point_with_srid, srid, lon, lat)
def export_to_geojsonseq(pbf_path, output_path):
    """Convert an OSM PBF file to GeoJSONSeq by running ``osmium export``.

    Only linestring and point geometries are exported, and an existing
    output file is overwritten.

    Args:
        pbf_path: Path of the input ``.osm.pbf`` file.
        output_path: Path of the GeoJSONSeq file to write.

    Raises:
        SystemExit: With code 1 when ``osmium`` cannot be started or exits
            with a non-zero status (the error is printed first).
    """
    print(f" osmium export: {pbf_path} → {output_path}")
    cmd = [
        "osmium", "export",
        "--geometry-types=linestring,point",
        # Emit "n<id>" / "w<id>" feature ids. Without this option osmium
        # writes no feature id, so there would be no OSM id to store.
        "--add-unique-id=type_id",
        "--output-format=geojsonseq",
        "--overwrite",
        "-o", output_path,
        pbf_path,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as e:
        # Typically FileNotFoundError: the osmium binary is not on PATH.
        print(f" ERROR osmium export: {e}")
        sys.exit(1)
    if result.returncode != 0:
        print(f" ERROR osmium export: {result.stderr}")
        sys.exit(1)
    print(" osmium export завершён")
# Tags that get their own `trails` column; all other tags go into `tags`.
_TRAIL_COLUMN_TAGS = frozenset((
    "highway", "tracktype", "surface", "name",
    "mtb:scale", "trail_visibility", "smoothness", "access",
))

_INSERT_TRAIL_SQL = """
    INSERT INTO trails
        (osm_id, highway_type, track_type, surface, name, length_m,
         mtb_scale, visibility, smoothness, access, tags, geom)
    VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
"""

_INSERT_POI_SQL = """
    INSERT INTO poi (osm_id, poi_type, name, geom)
    VALUES (?,?,?,?)
"""


def _parse_osm_id(raw_id):
    """Return the numeric OSM id from a feature id, or 0 when unavailable."""
    if isinstance(raw_id, int):
        return raw_id
    # osmium export id format: "w123456789" or "n123456789"
    if isinstance(raw_id, str) and raw_id[:1] in ("w", "n", "r"):
        try:
            return int(raw_id[1:])
        except ValueError:
            return 0
    return 0


def _trail_row(osm_id, props, geom):
    """Build the `trails` row for a LineString feature, or None to skip it."""
    hw = props.get("highway", "")
    if hw not in HIGHWAY_TYPES:
        return None
    coords = geom.get("coordinates") or []
    if len(coords) < 2:
        return None
    extra = {k: v for k, v in props.items() if k not in _TRAIL_COLUMN_TAGS}
    return (
        osm_id,
        hw,
        props.get("tracktype"),
        props.get("surface"),
        props.get("name"),
        haversine_length(coords),
        props.get("mtb:scale"),
        props.get("trail_visibility"),
        props.get("smoothness"),
        props.get("access"),
        json.dumps(extra, ensure_ascii=False),
        coords_to_wkb_linestring(coords),
    )


def _classify_poi(props):
    """Return the poi_type label for a point's tags, or None if not a POI."""
    natural = props.get("natural")
    if natural in ("water", "peak", "cave_entrance"):
        return f"natural={natural}"
    if props.get("tourism") == "viewpoint":
        return "tourism=viewpoint"
    if props.get("historic") == "ruins":
        return "historic=ruins"
    if props.get("ford") == "yes":
        return "ford=yes"
    if props.get("abandoned"):
        return "abandoned"
    return None


def _poi_row(osm_id, props, geom):
    """Build the `poi` row for a Point feature, or None to skip it."""
    poi_type = _classify_poi(props)
    if not poi_type:
        return None
    coords = geom.get("coordinates") or []
    if len(coords) < 2:
        return None
    return (
        osm_id,
        poi_type,
        props.get("name"),
        coords_to_wkb_point(coords[0], coords[1]),
    )


def _flush_batch(conn, cur, sql, batch, label, total):
    """Insert and commit `batch`, empty it, print progress; return new total."""
    if not batch:
        return total
    cur.executemany(sql, batch)
    conn.commit()
    total += len(batch)
    batch.clear()
    print(f" {label}: {total}", end="\r", flush=True)
    return total


def parse_geojsonseq(geojson_path, conn):
    """Read a GeoJSONSeq file line by line and insert trails and POIs.

    LineString features whose ``highway`` tag is in ``HIGHWAY_TYPES`` go to
    the ``trails`` table; Point features matching one of the POI tag rules
    go to the ``poi`` table. Rows are inserted and committed in batches of
    500. Blank lines, lines that are not valid JSON, non-object records and
    features without a geometry object are skipped.

    Args:
        geojson_path: Path of the UTF-8 GeoJSONSeq file (one feature per line).
        conn: Open ``sqlite3`` connection that already has both tables.

    Returns:
        Tuple ``(trails_count, poi_count)`` with the number of inserted rows.
    """
    cur = conn.cursor()
    batch_size = 500
    trails_batch = []
    poi_batch = []
    trails_count = 0
    poi_count = 0

    with open(geojson_path, "r", encoding="utf-8") as f:
        for line in f:
            # str.strip() also removes the RS (0x1E) character that may
            # precede each record in a GeoJSON text sequence.
            line = line.strip()
            if not line:
                continue
            try:
                feat = json.loads(line)
            except ValueError:  # json.JSONDecodeError is a ValueError
                continue
            if not isinstance(feat, dict):
                continue
            geom = feat.get("geometry")
            if not isinstance(geom, dict):
                # "geometry": null is valid GeoJSON; nothing to store for it.
                continue
            props = feat.get("properties")
            if not isinstance(props, dict):
                props = {}
            geom_type = geom.get("type", "")

            # ── Trails (LineString) ──
            if geom_type == "LineString":
                row = _trail_row(_parse_osm_id(feat.get("id", 0)), props, geom)
                if row is None:
                    continue
                trails_batch.append(row)
                if len(trails_batch) >= batch_size:
                    trails_count = _flush_batch(
                        conn, cur, _INSERT_TRAIL_SQL,
                        trails_batch, "trails", trails_count)

            # ── POI (Point) ──
            elif geom_type == "Point":
                row = _poi_row(_parse_osm_id(feat.get("id", 0)), props, geom)
                if row is None:
                    continue
                poi_batch.append(row)
                if len(poi_batch) >= batch_size:
                    poi_count = _flush_batch(
                        conn, cur, _INSERT_POI_SQL,
                        poi_batch, "poi", poi_count)

    # Write whatever is left in the partially filled batches.
    trails_count = _flush_batch(
        conn, cur, _INSERT_TRAIL_SQL, trails_batch, "trails", trails_count)
    poi_count = _flush_batch(
        conn, cur, _INSERT_POI_SQL, poi_batch, "poi", poi_count)
    print(f"\n Итого trails: {trails_count}, poi: {poi_count}")
    return trails_count, poi_count
def create_spatial_indexes(conn, has_spatialite):
    """Create Spatialite spatial indexes on ``trails.geom`` and ``poi.geom``.

    Does nothing when Spatialite is not loaded. Each index is attempted
    independently; a failure is printed as a warning and never raised.

    Args:
        conn: Open ``sqlite3`` connection.
        has_spatialite: Whether the Spatialite extension is loaded on ``conn``.
    """
    if not has_spatialite:
        return
    cur = conn.cursor()
    for table, col in (("trails", "geom"), ("poi", "geom")):
        try:
            row = cur.execute(
                "SELECT CreateSpatialIndex(?, ?)", (table, col)
            ).fetchone()
            conn.commit()
        except sqlite3.Error as e:
            print(f" WARNING индекс {table}: {e}")
            continue
        # CreateSpatialIndex signals failure through its return value rather
        # than an SQL error, so the result must be inspected explicitly.
        status = row[0] if row else None
        if status == 1:
            print(f" Пространственный индекс {table} создан")
        else:
            print(f" WARNING индекс {table}: CreateSpatialIndex вернул {status}")
def main():
    """Command-line entry point: convert an OSM PBF extract into the trails DB.

    Steps: export the PBF to a temporary GeoJSONSeq file next to the
    database, (re)create the schema, load trails and POIs, build spatial
    indexes. The temporary file is removed and the connection closed even
    when a step fails.

    Command-line options:
        --pbf: Input ``.osm.pbf`` file (default ``/data/region.osm.pbf``).
        --db: Output SQLite file (default ``/data/centralfederal.sqlite``).

    Raises:
        SystemExit: With code 1 when the PBF file does not exist or the
            export step fails.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--pbf", default="/data/region.osm.pbf")
    parser.add_argument("--db", default="/data/centralfederal.sqlite")
    args = parser.parse_args()

    pbf_path = os.path.abspath(args.pbf)
    db_path = os.path.abspath(args.db)
    if not os.path.exists(pbf_path):
        print(f"ERROR: PBF не найден: {pbf_path}")
        sys.exit(1)
    print(f"==> PBF: {pbf_path} ({os.path.getsize(pbf_path) // 1024 // 1024} МБ)")

    # Temporary GeoJSONSeq lives next to the DB. Swap only the final
    # extension and make sure the result can never be the DB file itself,
    # otherwise the export would overwrite the DB and the cleanup delete it.
    geojson_path = os.path.splitext(db_path)[0] + ".geojsonseq"
    if geojson_path == db_path:
        geojson_path += ".tmp"

    # The directory must exist before the export writes into it.
    os.makedirs(os.path.dirname(db_path), exist_ok=True)

    try:
        print("==> Конвертируем PBF → GeoJSONSeq (osmium export)...")
        export_to_geojsonseq(pbf_path, geojson_path)

        print(f"==> Открываем БД: {db_path}")
        conn, has_spatialite = open_db(db_path)
        try:
            print("==> Инициализируем схему...")
            init_db(conn, has_spatialite)
            print("==> Парсим GeoJSONSeq построчно...")
            parse_geojsonseq(geojson_path, conn)
            print("==> Создаём пространственные индексы...")
            create_spatial_indexes(conn, has_spatialite)
        finally:
            conn.close()
    finally:
        # Remove the temporary GeoJSONSeq; it may not exist if the export
        # failed early, which is fine.
        try:
            os.remove(geojson_path)
        except OSError:
            pass

    print(f"\n✓ Готово! БД: {db_path} ({os.path.getsize(db_path) // 1024 // 1024} МБ)")


if __name__ == "__main__":
    main()