Backend:
- Миграция gps_tracks_001_init.sql: таблицы tracks + pipeline_runs
- Пакет src/api/gps_tracks/: models, db (WAL+upsert с dedup), dedup
(bbox+length+date bucket-hash), mvt (LRU-кэш 1024 тайла), endpoint
(GET /api/gps-tracks, GET /api/gps-tracks/tiles/{z}/{x}/{y}.mvt,
GET /api/gps-tracks/health, POST /api/gps-tracks/cache/clear), config
- Парсеры: osm (split_bbox, haversine, defusedxml XXE-защита),
enduro_russia + ttrails — заглушки (ADR-010/011 proposed, блокированы)
- Licensing guard: pipeline проверяет status ADR-файла до запуска источника
- scripts/gps_collect.py: CLI с --region/--source/--dry-run/--gc
Frontend:
- src/web/gps_tracks.js: двухрежимный слой (MVT z≤11, GeoJSON z≥12),
debounced fetch + AbortController, фильтры активности/источника,
цветовая палитра by-source/by-activity, halo на спутнике, popup трека,
restorePublicTracksState(), localStorage persistence
- index.html: чекбокс «Публичные треки» в terrain-popup, #sheet-gps-filters
- app.css: .terrain-link-btn, .gps-filter-grid, .track-popup
- app.js: вызов restorePublicTracksState() в rebuildMapOverlays(),
applyGpsHaloVisibility() в applyBaseLayer()
Конфиги:
- config/gps_sources.yaml: osm (enabled), enduro_russia/ttrails (disabled)
- config/gps_regions.yaml: ЦФО+Чувашия (enabled), Кавказ (disabled)
Docker:
- gps-collector service с profiles: [batch]
Тесты: 48 новых тестов (unit + integration), 125/125 pass
Refs: ET-008
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
367 lines
12 KiB
Python
367 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""CLI pipeline для сбора GPS-треков из публичных источников (ET-008).
|
||
|
||
Usage:
|
||
python scripts/gps_collect.py [--region <id>] [--source <id>] [--dry-run] [--gc]
|
||
|
||
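Exit code: 0 (success) or 1 (config/DB failure, unknown --region/--source,
a source run that ended with status "error", or a source skipped by the
license guard). Disabled sources and "partial" runs do not affect the exit code.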
|
||
"""
|
||
import argparse
|
||
import asyncio
|
||
import importlib
|
||
import json
|
||
import logging
|
||
import os
|
||
import sys
|
||
from datetime import datetime, timezone
|
||
|
||
# Добавляем корень проекта в PYTHONPATH
|
||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||
|
||
from src.api.gps_tracks.config import load_regions_config, load_sources_config
|
||
from src.api.gps_tracks.db import init_db, open_db, upsert_track
|
||
from src.api.gps_tracks.dedup import compute_dedup_key
|
||
|
||
# Root logging goes to stderr at INFO level; each record carries a timestamp,
# the level and the logger name.
logging.basicConfig(
    stream=sys.stderr,
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)

# Module-wide logger used by every function in this script.
logger = logging.getLogger("gps_collect")
|
||
|
||
|
||
def _now_iso() -> str:
|
||
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||
|
||
|
||
def _check_license_adr(adr_path: str, project_root: str) -> str:
|
||
"""Читает ADR файл и возвращает статус ('accepted', 'proposed', ...).
|
||
|
||
Returns:
|
||
str статус или 'unknown' если файл не найден/не парсится
|
||
"""
|
||
full_path = os.path.join(project_root, adr_path)
|
||
if not os.path.exists(full_path):
|
||
logger.warning("ADR file not found: %s", full_path)
|
||
return "unknown"
|
||
|
||
try:
|
||
import yaml
|
||
|
||
with open(full_path, "r", encoding="utf-8") as f:
|
||
content = f.read()
|
||
|
||
# Ищем YAML front-matter или поле status
|
||
if content.startswith("---"):
|
||
parts = content.split("---", 2)
|
||
if len(parts) >= 3:
|
||
front_matter = yaml.safe_load(parts[1])
|
||
if isinstance(front_matter, dict) and "status" in front_matter:
|
||
return str(front_matter["status"]).lower()
|
||
|
||
# Fallback: ищем строку "status: <value>"
|
||
for line in content.splitlines():
|
||
stripped = line.strip().lower()
|
||
if stripped.startswith("status:"):
|
||
value = stripped.split(":", 1)[1].strip()
|
||
return value
|
||
|
||
return "unknown"
|
||
except Exception as exc:
|
||
logger.warning("Failed to parse ADR %s: %s", adr_path, exc)
|
||
return "unknown"
|
||
|
||
|
||
def _record_pipeline_run(
|
||
conn,
|
||
region_id: str,
|
||
source_id: str,
|
||
started_at: str,
|
||
finished_at: str,
|
||
status: str,
|
||
tracks_new: int = 0,
|
||
tracks_updated: int = 0,
|
||
errors: list = None,
|
||
) -> None:
|
||
"""Записывает результат запуска pipeline в БД."""
|
||
errors_json = json.dumps(errors) if errors else None
|
||
conn.execute(
|
||
"""
|
||
INSERT INTO pipeline_runs
|
||
(started_at, finished_at, region_id, source_id, status,
|
||
tracks_new, tracks_updated, errors_json)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
started_at,
|
||
finished_at,
|
||
region_id,
|
||
source_id,
|
||
status,
|
||
tracks_new,
|
||
tracks_updated,
|
||
errors_json,
|
||
),
|
||
)
|
||
conn.commit()
|
||
|
||
|
||
async def _collect_source_for_region(
|
||
region: dict,
|
||
source_cfg: dict,
|
||
conn,
|
||
dry_run: bool,
|
||
) -> dict:
|
||
"""Запускает сбор треков для одного (region, source).
|
||
|
||
Returns:
|
||
dict с ключами: status, tracks_new, tracks_updated, errors
|
||
"""
|
||
source_id = source_cfg["id"]
|
||
region_id = region["id"]
|
||
bbox = tuple(region["bbox"]) # (west, south, east, north)
|
||
|
||
parser_module_path = source_cfg.get("parser_module", "")
|
||
if not parser_module_path:
|
||
return {"status": "error", "tracks_new": 0, "tracks_updated": 0, "errors": ["No parser_module"]}
|
||
|
||
try:
|
||
module = importlib.import_module(parser_module_path)
|
||
# Конвенция: класс называется <CamelCase>Parser
|
||
class_name = source_id.replace("_", " ").title().replace(" ", "") + "Parser"
|
||
parser_class = getattr(module, class_name, None)
|
||
if parser_class is None:
|
||
# Fallback: первый класс с суффиксом Parser
|
||
for name in dir(module):
|
||
if name.endswith("Parser") and name != "SourceParser":
|
||
parser_class = getattr(module, name)
|
||
break
|
||
|
||
if parser_class is None:
|
||
return {
|
||
"status": "error",
|
||
"tracks_new": 0,
|
||
"tracks_updated": 0,
|
||
"errors": [f"Parser class not found in {parser_module_path}"],
|
||
}
|
||
|
||
parser = parser_class(source_cfg)
|
||
except Exception as exc:
|
||
return {
|
||
"status": "error",
|
||
"tracks_new": 0,
|
||
"tracks_updated": 0,
|
||
"errors": [f"Failed to load parser: {exc}"],
|
||
}
|
||
|
||
tracks_new = 0
|
||
tracks_updated = 0
|
||
errors = []
|
||
source_priority = source_cfg.get("source_priority", 50)
|
||
|
||
try:
|
||
async for track in parser.collect(bbox, {"dry_run": dry_run, "conn": conn}):
|
||
if dry_run:
|
||
logger.info("[dry-run] Would upsert track from %s: %s", source_id, track.external_id)
|
||
tracks_new += 1
|
||
continue
|
||
|
||
try:
|
||
dedup_key = compute_dedup_key(
|
||
(track.min_lon, track.min_lat, track.max_lon, track.max_lat),
|
||
{"length_m": track.length_m, "created_at": track.created_at},
|
||
)
|
||
result = upsert_track(conn, track, dedup_key, source_priority)
|
||
if result == "inserted":
|
||
tracks_new += 1
|
||
else:
|
||
tracks_updated += 1
|
||
except Exception as exc:
|
||
errors.append(f"upsert error for {track.external_id}: {exc}")
|
||
logger.error("Upsert error: %s", exc)
|
||
except NotImplementedError as exc:
|
||
return {
|
||
"status": "error",
|
||
"tracks_new": 0,
|
||
"tracks_updated": 0,
|
||
"errors": [str(exc)],
|
||
}
|
||
except Exception as exc:
|
||
errors.append(str(exc))
|
||
logger.error("Collect error for %s/%s: %s", region_id, source_id, exc)
|
||
return {
|
||
"status": "error",
|
||
"tracks_new": tracks_new,
|
||
"tracks_updated": tracks_updated,
|
||
"errors": errors,
|
||
}
|
||
|
||
status = "ok" if not errors else "partial"
|
||
return {
|
||
"status": status,
|
||
"tracks_new": tracks_new,
|
||
"tracks_updated": tracks_updated,
|
||
"errors": errors,
|
||
}
|
||
|
||
|
||
async def main() -> int:
    """Run the GPS track collection pipeline for the configured regions and sources.

    Command-line flags are read from ``sys.argv``: ``--region`` and
    ``--source`` restrict processing to one id, ``--dry-run`` simulates the
    run, ``--gc`` triggers garbage collection after each region. Config and
    database locations default to paths under the project root and can be
    overridden with the ``GPS_SOURCES_CONFIG``, ``GPS_REGIONS_CONFIG`` and
    ``GPS_TRACKS_DB_PATH`` environment variables.

    Returns:
        0 on success; 1 when the configs or the DB cannot be loaded, the
        requested region/source is unknown, any source run ends with status
        ``"error"``, or a source is skipped by the license guard.
    """
    import gc

    arg_parser = argparse.ArgumentParser(description="GPS tracks collection pipeline")
    arg_parser.add_argument("--region", help="Region ID to process (all if not set)")
    arg_parser.add_argument("--source", help="Source ID to process (all if not set)")
    arg_parser.add_argument("--dry-run", action="store_true", help="Simulate without writing to DB")
    arg_parser.add_argument("--gc", action="store_true", help="Run garbage collection after each region")
    args = arg_parser.parse_args()

    project_root = os.path.join(os.path.dirname(__file__), "..")

    sources_config_path = os.environ.get(
        "GPS_SOURCES_CONFIG",
        os.path.join(project_root, "config/gps_sources.yaml"),
    )
    regions_config_path = os.environ.get(
        "GPS_REGIONS_CONFIG",
        os.path.join(project_root, "config/gps_regions.yaml"),
    )
    db_path = os.environ.get(
        "GPS_TRACKS_DB_PATH",
        os.path.join(project_root, "data/gps_tracks.sqlite"),
    )

    # Load configuration.
    try:
        sources = load_sources_config(sources_config_path)
        regions = load_regions_config(regions_config_path)
    except Exception as exc:
        logger.error("Failed to load config: %s", exc)
        return 1

    # Apply the CLI filters.
    if args.region:
        regions = [r for r in regions if r["id"] == args.region]
        if not regions:
            logger.error("Region '%s' not found", args.region)
            return 1

    if args.source:
        sources = [s for s in sources if s["id"] == args.source]
        if not sources:
            logger.error("Source '%s' not found", args.source)
            return 1

    # Open the DB; close the connection again if schema init fails.
    conn = None
    try:
        conn = open_db(db_path)
        init_db(conn)
    except Exception as exc:
        logger.error("Failed to open DB: %s", exc)
        if conn is not None:
            conn.close()
        return 1

    sources_by_id = {s["id"]: s for s in sources}

    def record_run(*run_fields) -> None:
        # --dry-run promises "without writing to DB", so no pipeline_runs
        # rows are stored for simulated runs.
        if args.dry_run:
            return
        _record_pipeline_run(conn, *run_fields)

    has_error = False

    try:
        for region in regions:
            region_id = region["id"]
            if not region.get("enabled", True):
                logger.info("Skipping disabled region: %s", region_id)
                continue

            # "or []" also covers a region whose sources key is present but null.
            for source_id in region.get("sources") or []:
                # The --source filter is applied before the lookup: the index
                # holds only the selected source, so other ids would otherwise
                # be reported as missing from the sources config.
                if args.source and source_id != args.source:
                    continue

                source_cfg = sources_by_id.get(source_id)
                if source_cfg is None:
                    logger.warning("Source '%s' not found in sources config", source_id)
                    continue

                started_at = _now_iso()

                if not source_cfg.get("enabled", False):
                    logger.info("Skipping disabled source: %s", source_id)
                    record_run(region_id, source_id, started_at, _now_iso(), "skipped_disabled")
                    continue

                # Licensing guard: a source with an ADR runs only once that
                # ADR is accepted; the skip counts as a failure for the exit code.
                license_adr = source_cfg.get("license_adr", "")
                if license_adr:
                    license_status = _check_license_adr(license_adr, project_root)
                    if license_status != "accepted":
                        logger.warning(
                            "Skipping %s/%s: license ADR status is '%s' (need 'accepted')",
                            region_id,
                            source_id,
                            license_status,
                        )
                        record_run(region_id, source_id, started_at, _now_iso(), "skipped_license")
                        has_error = True
                        continue

                logger.info(
                    "Collecting %s for region %s (bbox=%s)",
                    source_id,
                    region_id,
                    region["bbox"],
                )

                result = await _collect_source_for_region(region, source_cfg, conn, args.dry_run)

                record_run(
                    region_id,
                    source_id,
                    started_at,
                    _now_iso(),
                    result["status"],
                    result["tracks_new"],
                    result["tracks_updated"],
                    result["errors"] or None,
                )

                logger.info(
                    "Done %s/%s: status=%s new=%d updated=%d errors=%d",
                    region_id,
                    source_id,
                    result["status"],
                    result["tracks_new"],
                    result["tracks_updated"],
                    len(result["errors"]),
                )

                # Only a full failure affects the exit code; "partial" does not.
                if result["status"] == "error":
                    has_error = True

            if args.gc:
                # NOTE(review): this only runs Python's garbage collector;
                # confirm whether a DB-level cleanup was intended for --gc.
                gc.collect()
                logger.info("GC collected after region %s", region_id)
    finally:
        # Release the connection even when a region or source raises.
        conn.close()

    return 1 if has_error else 0
|
||
|
||
|
||
if __name__ == "__main__":
    # Run the async pipeline and use its return value as the process exit code.
    exit_code = asyncio.run(main())
    sys.exit(exit_code)
|