Files
enduro-trails/scripts/gps_collect.py
claude-bot 0060003f28
Some checks failed
CI / lint (push) Failing after 4s
CI / test (push) Failing after 4s
CI / build (push) Has been skipped
CI / lint (pull_request) Failing after 4s
CI / test (pull_request) Failing after 4s
CI / build (pull_request) Has been skipped
feat(gps-tracks): ET-008 публичные GPS-треки с публичных платформ
Backend:
- Миграция gps_tracks_001_init.sql: таблицы tracks + pipeline_runs
- Пакет src/api/gps_tracks/: models, db (WAL+upsert с dedup), dedup
  (bbox+length+date bucket-hash), mvt (LRU-кэш 1024 тайла), endpoint
  (GET /api/gps-tracks, GET /api/gps-tracks/tiles/{z}/{x}/{y}.mvt,
   GET /api/gps-tracks/health, POST /api/gps-tracks/cache/clear), config
- Парсеры: osm (split_bbox, haversine, defusedxml XXE-защита),
  enduro_russia + ttrails — заглушки (ADR-010/011 proposed, блокированы)
- Licensing guard: pipeline проверяет status ADR-файла до запуска источника
- scripts/gps_collect.py: CLI с --region/--source/--dry-run/--gc

Frontend:
- src/web/gps_tracks.js: двухрежимный слой (MVT z≤11, GeoJSON z≥12),
  debounced fetch + AbortController, фильтры активности/источника,
  цветовая палитра by-source/by-activity, halo на спутнике, popup трека,
  restorePublicTracksState(), localStorage persistence
- index.html: чекбокс «Публичные треки» в terrain-popup, #sheet-gps-filters
- app.css: .terrain-link-btn, .gps-filter-grid, .track-popup
- app.js: вызов restorePublicTracksState() в rebuildMapOverlays(),
  applyGpsHaloVisibility() в applyBaseLayer()

Конфиги:
- config/gps_sources.yaml: osm (enabled), enduro_russia/ttrails (disabled)
- config/gps_regions.yaml: ЦФО+Чувашия (enabled), Кавказ (disabled)

Docker:
- gps-collector service с profiles: [batch]

Тесты: 48 новых тестов (unit + integration), 125/125 pass

Refs: ET-008

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-01 12:28:54 +00:00

367 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""CLI pipeline that collects GPS tracks from public sources (ET-008).

Usage:
    python scripts/gps_collect.py [--region <id>] [--source <id>] [--dry-run] [--gc]

Exit code:
    0 on success; 1 when the configuration or the database cannot be loaded,
    when a requested region/source id does not exist, when any source run ends
    with status "error", or when a source is skipped by the license guard.
    Disabled sources and "partial" runs do not change the exit code.
"""
import argparse
import asyncio
import importlib
import json
import logging
import os
import sys
from datetime import datetime, timezone
# Put the project root on sys.path so the `src` package resolves when this file is run as a script.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from src.api.gps_tracks.config import load_regions_config, load_sources_config
from src.api.gps_tracks.db import init_db, open_db, upsert_track
from src.api.gps_tracks.dedup import compute_dedup_key
# Root logging setup: INFO and above, written to stderr in a
# "time level logger-name: message" layout.
logging.basicConfig(
    stream=sys.stderr,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    level=logging.INFO,
)

# Logger shared by every function in this script; the name is fixed rather
# than derived from __name__, so records are always labelled "gps_collect".
logger = logging.getLogger("gps_collect")
def _now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.strftime("%Y-%m-%dT%H:%M:%SZ")
def _check_license_adr(adr_path: str, project_root: str) -> str:
    """Read an ADR file and return its lower-cased status.

    The status comes from the YAML front-matter (a leading ``---`` block that
    contains a ``status`` key) when there is one; otherwise from the first
    line of the file that starts with ``status:`` (case-insensitive).

    Args:
        adr_path: Path of the ADR file, relative to ``project_root``.
        project_root: Directory that ``adr_path`` is resolved against.

    Returns:
        The status (e.g. ``"accepted"``, ``"proposed"``), or ``"unknown"`` when
        the file is missing or unreadable, the front-matter cannot be parsed,
        or no non-empty status value is present. The function never raises for
        these cases, so callers can treat anything but the expected status as
        "not approved".
    """
    full_path = os.path.join(project_root, adr_path)

    # Open directly instead of checking os.path.exists() first: one syscall
    # less and no window between the check and the read.
    try:
        with open(full_path, "r", encoding="utf-8") as f:
            content = f.read()
    except FileNotFoundError:
        logger.warning("ADR file not found: %s", full_path)
        return "unknown"
    except (OSError, UnicodeDecodeError) as exc:
        logger.warning("Failed to parse ADR %s: %s", adr_path, exc)
        return "unknown"

    # Preferred source: YAML front-matter delimited by "---" lines.
    if content.startswith("---"):
        parts = content.split("---", 2)
        if len(parts) >= 3:
            try:
                # Imported lazily: only front-matter needs PyYAML, the
                # plain-text fallback below works without it.
                import yaml

                front_matter = yaml.safe_load(parts[1])
            except Exception as exc:
                # Malformed front-matter (or missing PyYAML) is reported as
                # "unknown" rather than guessed at.
                logger.warning("Failed to parse ADR %s: %s", adr_path, exc)
                return "unknown"
            if isinstance(front_matter, dict) and "status" in front_matter:
                raw_status = front_matter["status"]
                # A YAML null would otherwise be returned as the string "none".
                status = "" if raw_status is None else str(raw_status).strip().lower()
                return status or "unknown"

    # Fallback: first line of the form "status: <value>".
    for line in content.splitlines():
        stripped = line.strip().lower()
        if stripped.startswith("status:"):
            value = stripped.split(":", 1)[1].strip()
            # Drop surrounding quotes so `status: "accepted"` reads the same
            # way a YAML parser would read it.
            value = value.strip("\"'").strip()
            return value or "unknown"
    return "unknown"
def _record_pipeline_run(
    conn,
    region_id: str,
    source_id: str,
    started_at: str,
    finished_at: str,
    status: str,
    tracks_new: int = 0,
    tracks_updated: int = 0,
    errors: list = None,
) -> None:
    """Persist the outcome of one (region, source) pipeline run.

    Inserts a single row into ``pipeline_runs`` through ``conn`` and commits
    right away. ``errors`` is serialised to JSON for the ``errors_json``
    column; ``None`` or an empty list is stored as NULL.
    """
    # Values in the same order as the column list of the INSERT below.
    row = (
        started_at,
        finished_at,
        region_id,
        source_id,
        status,
        tracks_new,
        tracks_updated,
        None if not errors else json.dumps(errors),
    )
    conn.execute(
        """
INSERT INTO pipeline_runs
(started_at, finished_at, region_id, source_id, status,
tracks_new, tracks_updated, errors_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
        row,
    )
    conn.commit()
def _make_collect_result(
    status: str,
    tracks_new: int = 0,
    tracks_updated: int = 0,
    errors: list = None,
) -> dict:
    """Build the result dict shared by every exit path of the collector."""
    return {
        "status": status,
        "tracks_new": tracks_new,
        "tracks_updated": tracks_updated,
        "errors": errors if errors is not None else [],
    }


def _find_parser_class(module, source_id: str):
    """Return the parser class that ``module`` provides for ``source_id``, or None.

    Lookup order:
      1. the conventional name ``<CamelCase(source_id)>Parser``;
      2. a class named ``*Parser`` that is defined in ``module`` itself;
      3. any other class named ``*Parser`` reachable from ``module``.

    ``SourceParser`` is never returned by the fallback steps, and non-class
    attributes are ignored, so a helper such as an imported
    ``ArgumentParser`` cannot shadow the module's own parser just because it
    sorts first in ``dir(module)``.
    """
    expected_name = source_id.replace("_", " ").title().replace(" ", "") + "Parser"
    parser_class = getattr(module, expected_name, None)
    if parser_class is not None:
        return parser_class

    first_imported = None
    for name in dir(module):
        if not name.endswith("Parser") or name == "SourceParser":
            continue
        candidate = getattr(module, name)
        if not isinstance(candidate, type):
            continue
        if getattr(candidate, "__module__", None) == module.__name__:
            return candidate
        if first_imported is None:
            # Kept as a last resort, e.g. for a package that re-exports its
            # parser class from a submodule.
            first_imported = candidate
    return first_imported


async def _collect_source_for_region(
    region: dict,
    source_cfg: dict,
    conn,
    dry_run: bool,
) -> dict:
    """Collect tracks for a single (region, source) pair.

    Imports the module named by ``source_cfg["parser_module"]``, instantiates
    its parser class with ``source_cfg`` and iterates ``parser.collect(bbox,
    options)``. Each yielded track is upserted through ``upsert_track`` with a
    dedup key from ``compute_dedup_key``; in dry-run mode tracks are only
    logged and counted as new.

    Args:
        region: Region config; ``id`` and ``bbox`` are read.
        source_cfg: Source config; ``id`` is required, ``parser_module`` and
            ``source_priority`` (default 50) are optional.
        conn: DB connection, used for upserts and also handed to the parser
            in its options dict.
        dry_run: When true, nothing is upserted.

    Returns:
        dict with keys ``status``, ``tracks_new``, ``tracks_updated`` and
        ``errors``. ``status`` is ``"ok"`` when every track was stored,
        ``"partial"`` when individual upserts failed, and ``"error"`` when the
        parser could not be loaded or collection itself failed.
    """
    source_id = source_cfg["id"]
    region_id = region["id"]
    bbox = tuple(region["bbox"])  # (west, south, east, north)

    parser_module_path = source_cfg.get("parser_module", "")
    if not parser_module_path:
        return _make_collect_result("error", errors=["No parser_module"])

    # Importing and instantiating third-party parser code can fail in many
    # ways; all of them are reported as a load error for this source.
    try:
        module = importlib.import_module(parser_module_path)
        parser_class = _find_parser_class(module, source_id)
        if parser_class is None:
            return _make_collect_result(
                "error",
                errors=[f"Parser class not found in {parser_module_path}"],
            )
        parser = parser_class(source_cfg)
    except Exception as exc:
        return _make_collect_result("error", errors=[f"Failed to load parser: {exc}"])

    tracks_new = 0
    tracks_updated = 0
    errors = []
    source_priority = source_cfg.get("source_priority", 50)

    try:
        async for track in parser.collect(bbox, {"dry_run": dry_run, "conn": conn}):
            if dry_run:
                logger.info("[dry-run] Would upsert track from %s: %s", source_id, track.external_id)
                tracks_new += 1
                continue
            # A failing upsert must not abort the whole source: record it and
            # move on to the next track.
            try:
                dedup_key = compute_dedup_key(
                    (track.min_lon, track.min_lat, track.max_lon, track.max_lat),
                    {"length_m": track.length_m, "created_at": track.created_at},
                )
                result = upsert_track(conn, track, dedup_key, source_priority)
                if result == "inserted":
                    tracks_new += 1
                else:
                    tracks_updated += 1
            except Exception as exc:
                errors.append(f"upsert error for {track.external_id}: {exc}")
                logger.error("Upsert error: %s", exc)
    except NotImplementedError as exc:
        # Stub parsers signal "not available" this way; counts are reported
        # as zero and the exception text is the only error.
        return _make_collect_result("error", errors=[str(exc)])
    except Exception as exc:
        errors.append(str(exc))
        logger.error("Collect error for %s/%s: %s", region_id, source_id, exc)
        return _make_collect_result("error", tracks_new, tracks_updated, errors)

    return _make_collect_result(
        "ok" if not errors else "partial",
        tracks_new,
        tracks_updated,
        errors,
    )
async def main() -> int:
    """Run the GPS track collection pipeline and return the process exit code.

    Steps:
      1. Parse ``--region``, ``--source``, ``--dry-run`` and ``--gc``.
      2. Load the sources/regions configs and open the DB. Paths default to
         ``config/gps_sources.yaml``, ``config/gps_regions.yaml`` and
         ``data/gps_tracks.sqlite`` under the project root and can be
         overridden with ``GPS_SOURCES_CONFIG``, ``GPS_REGIONS_CONFIG`` and
         ``GPS_TRACKS_DB_PATH``.
      3. For every enabled region, walk the sources it lists: disabled
         sources are recorded as ``skipped_disabled``; sources whose license
         ADR status is not ``accepted`` are recorded as ``skipped_license``;
         everything else is collected and its result recorded.

    Note: rows are written to ``pipeline_runs`` even with ``--dry-run``; the
    flag is only forwarded to the collector.

    Returns:
        1 if config or DB setup failed, a requested region/source id does not
        exist, any run ended with status ``"error"``, or any source was
        skipped by the license guard; otherwise 0.
    """
    parser = argparse.ArgumentParser(description="GPS tracks collection pipeline")
    parser.add_argument("--region", help="Region ID to process (all if not set)")
    parser.add_argument("--source", help="Source ID to process (all if not set)")
    parser.add_argument("--dry-run", action="store_true", help="Simulate without writing to DB")
    parser.add_argument("--gc", action="store_true", help="Run garbage collection after each region")
    args = parser.parse_args()

    project_root = os.path.join(os.path.dirname(__file__), "..")
    sources_config_path = os.environ.get(
        "GPS_SOURCES_CONFIG",
        os.path.join(project_root, "config/gps_sources.yaml"),
    )
    regions_config_path = os.environ.get(
        "GPS_REGIONS_CONFIG",
        os.path.join(project_root, "config/gps_regions.yaml"),
    )
    db_path = os.environ.get(
        "GPS_TRACKS_DB_PATH",
        os.path.join(project_root, "data/gps_tracks.sqlite"),
    )

    # Load configuration.
    try:
        sources = load_sources_config(sources_config_path)
        regions = load_regions_config(regions_config_path)
    except Exception as exc:
        logger.error("Failed to load config: %s", exc)
        return 1

    # Apply the CLI filters.
    if args.region:
        regions = [r for r in regions if r["id"] == args.region]
        if not regions:
            logger.error("Region '%s' not found", args.region)
            return 1

    # Index ALL configured sources. The --source filter is applied inside the
    # loop instead of by shrinking this index, so that sources which are
    # merely filtered out are not reported as missing from the config.
    sources_by_id = {s["id"]: s for s in sources}
    if args.source and args.source not in sources_by_id:
        logger.error("Source '%s' not found", args.source)
        return 1

    # Open the DB.
    try:
        conn = open_db(db_path)
    except Exception as exc:
        logger.error("Failed to open DB: %s", exc)
        return 1

    has_error = False
    # From here on the connection is closed on every exit path, including
    # unexpected exceptions from collection or run recording.
    try:
        try:
            init_db(conn)
        except Exception as exc:
            logger.error("Failed to open DB: %s", exc)
            return 1

        for region in regions:
            if not region.get("enabled", True):
                logger.info("Skipping disabled region: %s", region["id"])
                continue

            for source_id in region.get("sources", []):
                # --source filter
                if args.source and source_id != args.source:
                    continue

                source_cfg = sources_by_id.get(source_id)
                if source_cfg is None:
                    logger.warning("Source '%s' not found in sources config", source_id)
                    continue

                if not source_cfg.get("enabled", False):
                    logger.info("Skipping disabled source: %s", source_id)
                    started_at = _now_iso()
                    _record_pipeline_run(
                        conn,
                        region["id"],
                        source_id,
                        started_at,
                        _now_iso(),
                        "skipped_disabled",
                    )
                    continue

                # License guard: a source that declares an ADR runs only when
                # that ADR's status is 'accepted'.
                license_adr = source_cfg.get("license_adr", "")
                started_at = _now_iso()
                if license_adr:
                    license_status = _check_license_adr(license_adr, project_root)
                    if license_status != "accepted":
                        logger.warning(
                            "Skipping %s/%s: license ADR status is '%s' (need 'accepted')",
                            region["id"],
                            source_id,
                            license_status,
                        )
                        _record_pipeline_run(
                            conn,
                            region["id"],
                            source_id,
                            started_at,
                            _now_iso(),
                            "skipped_license",
                        )
                        # A license skip counts as a failure for the exit code.
                        has_error = True
                        continue

                logger.info(
                    "Collecting %s for region %s (bbox=%s)",
                    source_id,
                    region["id"],
                    region["bbox"],
                )
                result = await _collect_source_for_region(region, source_cfg, conn, args.dry_run)
                finished_at = _now_iso()
                _record_pipeline_run(
                    conn,
                    region["id"],
                    source_id,
                    started_at,
                    finished_at,
                    result["status"],
                    result["tracks_new"],
                    result["tracks_updated"],
                    result["errors"] or None,
                )
                logger.info(
                    "Done %s/%s: status=%s new=%d updated=%d errors=%d",
                    region["id"],
                    source_id,
                    result["status"],
                    result["tracks_new"],
                    result["tracks_updated"],
                    len(result["errors"]),
                )
                # Only "error" affects the exit code; "partial" does not.
                if result["status"] in ("error",):
                    has_error = True

            if args.gc:
                # This is the Python garbage collector, not a DB cleanup.
                import gc

                gc.collect()
                logger.info("GC collected after region %s", region["id"])
    finally:
        conn.close()

    return 1 if has_error else 0
if __name__ == "__main__":
    # Drive the async entry point to completion and use the int it returns
    # as the process exit status.
    exit_code = asyncio.run(main())
    sys.exit(exit_code)