#!/usr/bin/env python3
"""CLI pipeline for collecting GPS tracks from public sources (ET-008).

Usage:
    python scripts/gps_collect.py [--region REGION_ID] [--source SOURCE_ID] [--dry-run] [--gc]

Exit code:
    0 -- every processed (region, source) pair finished with status ``ok`` or
         ``partial``, or was skipped because the source is disabled;
    1 -- configs or the DB could not be loaded, the requested region/source
         does not exist, a collection finished with status ``error``, or a
         source was skipped because its license ADR is not ``accepted``.
"""

import argparse
import asyncio
import contextlib
import gc
import importlib
import json
import logging
import os
import sys
from datetime import datetime, timezone
from typing import Optional

# Add the project root to sys.path so the ``src`` package is importable when
# this file is executed directly as a script.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

from src.api.gps_tracks.config import load_regions_config, load_sources_config  # noqa: E402
from src.api.gps_tracks.db import init_db, open_db, upsert_track  # noqa: E402
from src.api.gps_tracks.dedup import compute_dedup_key  # noqa: E402

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    stream=sys.stderr,
)
logger = logging.getLogger("gps_collect")


def _now_iso() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def _check_license_adr(adr_path: str, project_root: str) -> str:
    """Read an ADR file and return its lower-cased status.

    The status is taken from the YAML front-matter (a leading ``---`` ... ``---``
    section with a top-level ``status`` key) when present; otherwise from the
    first line of the form ``status: <value>`` (surrounding quotes removed).

    Args:
        adr_path: Path to the ADR file, relative to ``project_root``.
        project_root: Project root directory.

    Returns:
        The status string (e.g. ``"accepted"``, ``"proposed"``), or ``"unknown"``
        if the file is missing, cannot be read/parsed, or has no non-empty status.
    """
    full_path = os.path.join(project_root, adr_path)
    if not os.path.exists(full_path):
        logger.warning("ADR file not found: %s", full_path)
        return "unknown"

    try:
        import yaml

        with open(full_path, "r", encoding="utf-8") as f:
            content = f.read()

        # Preferred source: YAML front-matter with a ``status`` key.
        if content.startswith("---"):
            parts = content.split("---", 2)
            if len(parts) >= 3:
                front_matter = yaml.safe_load(parts[1])
                if isinstance(front_matter, dict) and "status" in front_matter:
                    return str(front_matter["status"]).lower()

        # Fallback: the first plain "status: <value>" line anywhere in the file.
        for line in content.splitlines():
            stripped = line.strip().lower()
            if stripped.startswith("status:"):
                # Strip quotes so `status: "accepted"` matches `accepted`.
                value = stripped.split(":", 1)[1].strip().strip("\"'").strip()
                return value or "unknown"
        return "unknown"
    except Exception as exc:
        # Best-effort: any read/parse failure (including a missing PyYAML)
        # is reported as an unknown status, which blocks collection upstream.
        logger.warning("Failed to parse ADR %s: %s", adr_path, exc)
        return "unknown"


def _record_pipeline_run(
    conn,
    region_id: str,
    source_id: str,
    started_at: str,
    finished_at: str,
    status: str,
    tracks_new: int = 0,
    tracks_updated: int = 0,
    errors: Optional[list] = None,
) -> None:
    """Insert one row into ``pipeline_runs`` and commit the connection.

    ``errors`` is stored as a JSON array, or NULL when it is empty/None.
    Note that the ``commit()`` here also commits any pending writes made
    earlier on ``conn`` (e.g. track upserts).
    """
    errors_json = json.dumps(errors) if errors else None
    conn.execute(
        """
        INSERT INTO pipeline_runs
            (started_at, finished_at, region_id, source_id, status,
             tracks_new, tracks_updated, errors_json)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            started_at,
            finished_at,
            region_id,
            source_id,
            status,
            tracks_new,
            tracks_updated,
            errors_json,
        ),
    )
    conn.commit()


def _error_result(errors: list, tracks_new: int = 0, tracks_updated: int = 0) -> dict:
    """Build the ``status="error"`` result dict returned by ``_collect_source_for_region``."""
    return {
        "status": "error",
        "tracks_new": tracks_new,
        "tracks_updated": tracks_updated,
        "errors": errors,
    }


def _resolve_parser_class(module, source_id: str):
    """Return the parser class for ``source_id`` from ``module``, or None.

    Convention: source id ``foo_bar`` maps to class ``FooBarParser``. If that
    attribute is absent, fall back to the first class (in ``dir()`` order)
    whose name ends with ``Parser`` and is not ``SourceParser``.
    """
    class_name = source_id.replace("_", " ").title().replace(" ", "") + "Parser"
    parser_class = getattr(module, class_name, None)
    if parser_class is not None:
        return parser_class

    for name in dir(module):
        if name.endswith("Parser") and name != "SourceParser":
            candidate = getattr(module, name)
            # Only accept actual classes, not functions/constants with the suffix.
            if isinstance(candidate, type):
                return candidate
    return None


async def _collect_source_for_region(
    region: dict,
    source_cfg: dict,
    conn,
    dry_run: bool,
) -> dict:
    """Collect tracks for one (region, source) pair and upsert them.

    The parser is loaded from ``source_cfg["parser_module"]``, instantiated
    with ``source_cfg``, and its async ``collect(bbox, context)`` is iterated.
    In dry-run mode tracks are only logged and counted as new.

    Returns:
        dict with keys ``status`` (``"ok"``, ``"partial"`` when some upserts
        failed, or ``"error"``), ``tracks_new``, ``tracks_updated`` and
        ``errors`` (list of messages).
    """
    source_id = source_cfg["id"]
    region_id = region["id"]
    bbox = tuple(region["bbox"])  # (west, south, east, north)

    parser_module_path = source_cfg.get("parser_module", "")
    if not parser_module_path:
        return _error_result(["No parser_module"])

    try:
        module = importlib.import_module(parser_module_path)
        parser_class = _resolve_parser_class(module, source_id)
        if parser_class is None:
            return _error_result([f"Parser class not found in {parser_module_path}"])
        parser = parser_class(source_cfg)
    except Exception as exc:
        return _error_result([f"Failed to load parser: {exc}"])

    tracks_new = 0
    tracks_updated = 0
    errors: list = []
    source_priority = source_cfg.get("source_priority", 50)

    try:
        async for track in parser.collect(bbox, {"dry_run": dry_run, "conn": conn}):
            if dry_run:
                logger.info("[dry-run] Would upsert track from %s: %s", source_id, track.external_id)
                tracks_new += 1
                continue

            try:
                dedup_key = compute_dedup_key(
                    (track.min_lon, track.min_lat, track.max_lon, track.max_lat),
                    {"length_m": track.length_m, "created_at": track.created_at},
                )
                result = upsert_track(conn, track, dedup_key, source_priority)
                if result == "inserted":
                    tracks_new += 1
                else:
                    tracks_updated += 1
            except Exception as exc:
                # A single bad track must not abort the whole source.
                errors.append(f"upsert error for {track.external_id}: {exc}")
                logger.error("Upsert error: %s", exc)
    except NotImplementedError as exc:
        # Parser stub. Keep any counts/errors gathered so far: upserts already
        # done are still committed by the subsequent pipeline_runs commit.
        return _error_result(errors + [str(exc)], tracks_new, tracks_updated)
    except Exception as exc:
        errors.append(str(exc))
        logger.error("Collect error for %s/%s: %s", region_id, source_id, exc)
        return _error_result(errors, tracks_new, tracks_updated)

    return {
        "status": "partial" if errors else "ok",
        "tracks_new": tracks_new,
        "tracks_updated": tracks_updated,
        "errors": errors,
    }


def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for this script."""
    parser = argparse.ArgumentParser(description="GPS tracks collection pipeline")
    parser.add_argument("--region", help="Region ID to process (all if not set)")
    parser.add_argument("--source", help="Source ID to process (all if not set)")
    parser.add_argument("--dry-run", action="store_true", help="Simulate without writing to DB")
    parser.add_argument("--gc", action="store_true", help="Run garbage collection after each region")
    return parser


async def _process_region_source(
    conn,
    region: dict,
    source_id: str,
    source_cfg: dict,
    project_root: str,
    dry_run: bool,
) -> bool:
    """Run (or skip) one (region, source) pair and record it in ``pipeline_runs``.

    Returns:
        True if this pair should make the script exit with code 1
        (license skip or a collection with status ``"error"``), else False.
    """
    # NOTE(review): pipeline_runs rows are written (and committed) even with
    # --dry-run, although its help text says "without writing to DB" —
    # confirm this is intended.
    region_id = region["id"]

    if not source_cfg.get("enabled", False):
        logger.info("Skipping disabled source: %s", source_id)
        started_at = _now_iso()
        _record_pipeline_run(
            conn, region_id, source_id, started_at, _now_iso(), "skipped_disabled",
        )
        return False

    # Collection only proceeds when the source's license ADR is accepted.
    license_adr = source_cfg.get("license_adr", "")
    started_at = _now_iso()
    if license_adr:
        license_status = _check_license_adr(license_adr, project_root)
        if license_status != "accepted":
            logger.warning(
                "Skipping %s/%s: license ADR status is '%s' (need 'accepted')",
                region_id, source_id, license_status,
            )
            _record_pipeline_run(
                conn, region_id, source_id, started_at, _now_iso(), "skipped_license",
            )
            return True

    logger.info(
        "Collecting %s for region %s (bbox=%s)",
        source_id, region_id, region["bbox"],
    )
    result = await _collect_source_for_region(region, source_cfg, conn, dry_run)
    finished_at = _now_iso()

    _record_pipeline_run(
        conn,
        region_id,
        source_id,
        started_at,
        finished_at,
        result["status"],
        result["tracks_new"],
        result["tracks_updated"],
        result["errors"] or None,
    )
    logger.info(
        "Done %s/%s: status=%s new=%d updated=%d errors=%d",
        region_id, source_id, result["status"],
        result["tracks_new"], result["tracks_updated"], len(result["errors"]),
    )
    return result["status"] == "error"


async def main() -> int:
    """Entry point of the GPS track collection pipeline.

    Config and DB paths can be overridden via the ``GPS_SOURCES_CONFIG``,
    ``GPS_REGIONS_CONFIG`` and ``GPS_TRACKS_DB_PATH`` environment variables.

    Returns:
        Process exit code: 0 on success, 1 on any error or license skip.
    """
    args = _build_arg_parser().parse_args()

    project_root = os.path.join(os.path.dirname(__file__), "..")
    sources_config_path = os.environ.get(
        "GPS_SOURCES_CONFIG",
        os.path.join(project_root, "config/gps_sources.yaml"),
    )
    regions_config_path = os.environ.get(
        "GPS_REGIONS_CONFIG",
        os.path.join(project_root, "config/gps_regions.yaml"),
    )
    db_path = os.environ.get(
        "GPS_TRACKS_DB_PATH",
        os.path.join(project_root, "data/gps_tracks.sqlite"),
    )

    try:
        sources = load_sources_config(sources_config_path)
        regions = load_regions_config(regions_config_path)
    except Exception as exc:
        logger.error("Failed to load config: %s", exc)
        return 1

    # Narrow down by CLI filters.
    if args.region:
        regions = [r for r in regions if r["id"] == args.region]
        if not regions:
            logger.error("Region '%s' not found", args.region)
            return 1
    if args.source:
        sources = [s for s in sources if s["id"] == args.source]
        if not sources:
            logger.error("Source '%s' not found", args.source)
            return 1

    try:
        conn = open_db(db_path)
    except Exception as exc:
        logger.error("Failed to open DB: %s", exc)
        return 1

    # closing() guarantees the connection is released on every exit path,
    # including init_db failures and unexpected exceptions in the loop.
    with contextlib.closing(conn):
        try:
            init_db(conn)
        except Exception as exc:
            logger.error("Failed to open DB: %s", exc)
            return 1

        sources_by_id = {s["id"]: s for s in sources}
        has_error = False

        for region in regions:
            region_id = region["id"]
            if not region.get("enabled", True):
                logger.info("Skipping disabled region: %s", region_id)
                continue

            for source_id in region.get("sources", []):
                # Apply --source before the lookup: `sources` was already
                # narrowed to that id, so other ids would otherwise be
                # reported as missing from the sources config.
                if args.source and source_id != args.source:
                    continue

                source_cfg = sources_by_id.get(source_id)
                if source_cfg is None:
                    logger.warning("Source '%s' not found in sources config", source_id)
                    continue

                if await _process_region_source(
                    conn, region, source_id, source_cfg, project_root, args.dry_run,
                ):
                    has_error = True

            if args.gc:
                gc.collect()
                logger.info("GC collected after region %s", region_id)

    return 1 if has_error else 0


if __name__ == "__main__":
    sys.exit(asyncio.run(main()))