Files
enduro-trails/src/api/gps_tracks/db.py
claude-bot 0060003f28
Some checks failed
CI / lint (push) Failing after 4s
CI / test (push) Failing after 4s
CI / build (push) Has been skipped
CI / lint (pull_request) Failing after 4s
CI / test (pull_request) Failing after 4s
CI / build (pull_request) Has been skipped
feat(gps-tracks): ET-008 публичные GPS-треки с публичных платформ
Backend:
- Миграция gps_tracks_001_init.sql: таблицы tracks + pipeline_runs
- Пакет src/api/gps_tracks/: models, db (WAL+upsert с dedup), dedup
  (bbox+length+date bucket-hash), mvt (LRU-кэш 1024 тайла), endpoint
  (GET /api/gps-tracks, GET /api/gps-tracks/tiles/{z}/{x}/{y}.mvt,
   GET /api/gps-tracks/health, POST /api/gps-tracks/cache/clear), config
- Парсеры: osm (split_bbox, haversine, defusedxml XXE-защита),
  enduro_russia + ttrails — заглушки (ADR-010/011 proposed, блокированы)
- Licensing guard: pipeline проверяет status ADR-файла до запуска источника
- scripts/gps_collect.py: CLI с --region/--source/--dry-run/--gc

Frontend:
- src/web/gps_tracks.js: двухрежимный слой (MVT z≤11, GeoJSON z≥12),
  debounced fetch + AbortController, фильтры активности/источника,
  цветовая палитра by-source/by-activity, halo на спутнике, popup трека,
  restorePublicTracksState(), localStorage persistence
- index.html: чекбокс «Публичные треки» в terrain-popup, #sheet-gps-filters
- app.css: .terrain-link-btn, .gps-filter-grid, .track-popup
- app.js: вызов restorePublicTracksState() в rebuildMapOverlays(),
  applyGpsHaloVisibility() в applyBaseLayer()

Конфиги:
- config/gps_sources.yaml: osm (enabled), enduro_russia/ttrails (disabled)
- config/gps_regions.yaml: ЦФО+Чувашия (enabled), Кавказ (disabled)

Docker:
- gps-collector service с profiles: [batch]

Тесты: 48 новых тестов (unit + integration), 125/125 pass

Refs: ET-008

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-01 12:28:54 +00:00

233 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Функции работы с БД для GPS-треков (ET-008)."""
import json
import os
import sqlite3
from datetime import datetime, timezone
from typing import Optional
from src.api.gps_tracks.models import TrackInsert
_MIGRATION_PATH = os.path.join(
os.path.dirname(__file__), "../../../migrations/gps_tracks_001_init.sql"
)
def open_db(db_path: str) -> sqlite3.Connection:
"""Открывает соединение с SQLite БД."""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
def init_db(conn: sqlite3.Connection) -> None:
"""Применяет миграцию SQL для создания схемы."""
migration_path = os.path.abspath(_MIGRATION_PATH)
with open(migration_path, "r", encoding="utf-8") as f:
sql = f.read()
# Выполняем каждый statement отдельно (executescript не поддерживает параметры,
# но зато не требует явного commit)
conn.executescript(sql)
conn.commit()
def _now_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def upsert_track(
conn: sqlite3.Connection,
track: TrackInsert,
dedup_key: str,
source_priority: int,
) -> str:
"""Вставляет или обновляет трек в БД.
При коллизии dedup_key:
- UNION sources (без дублей)
- UNION external_urls (без дублей)
- Метаданные обновляются если новый source_priority < существующего
Returns:
"inserted" или "updated"
"""
cur = conn.cursor()
now = _now_iso()
# Проверяем существующую запись
cur.execute(
"SELECT id, sources_json, external_urls_json, name, description, activity_type, "
"user, created_at, source_priority FROM tracks WHERE dedup_key = ?",
(dedup_key,),
)
existing = cur.fetchone()
if existing is None:
# INSERT новой записи
sources = [track.source_id]
ext_urls = [track.external_url] if track.external_url else []
cur.execute(
"""
INSERT INTO tracks (
dedup_key, name, description, activity_type, user, created_at,
length_m, points_count, min_lon, min_lat, max_lon, max_lat,
geom, sources_json, external_urls_json, tags_json,
inserted_at, updated_at, source_priority
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
dedup_key,
track.name,
track.description,
track.activity_type,
track.user,
track.created_at,
track.length_m,
track.points_count,
track.min_lon,
track.min_lat,
track.max_lon,
track.max_lat,
track.geom_wkb,
json.dumps(sources),
json.dumps(ext_urls),
json.dumps(track.tags) if track.tags else json.dumps([]),
now,
now,
source_priority,
),
)
conn.commit()
return "inserted"
else:
# UPDATE: мержим sources и external_urls
existing_sources = json.loads(existing["sources_json"] or "[]")
existing_urls = json.loads(existing["external_urls_json"] or "[]")
# Union без дублей, сохраняя порядок
merged_sources = list(dict.fromkeys(existing_sources + [track.source_id]))
new_urls = [track.external_url] if track.external_url else []
merged_urls = list(dict.fromkeys(existing_urls + new_urls))
# Получаем текущий source_priority (может отсутствовать в старых записях)
existing_priority = existing["source_priority"] if "source_priority" in existing.keys() else 999
# Обновляем метаданные только если новый источник имеет более высокий приоритет
if source_priority < existing_priority:
cur.execute(
"""
UPDATE tracks SET
name = ?,
description = ?,
activity_type = ?,
user = ?,
created_at = ?,
sources_json = ?,
external_urls_json = ?,
updated_at = ?,
source_priority = ?
WHERE dedup_key = ?
""",
(
track.name,
track.description,
track.activity_type,
track.user,
track.created_at,
json.dumps(merged_sources),
json.dumps(merged_urls),
now,
source_priority,
dedup_key,
),
)
else:
# Только обновляем sources/urls и updated_at
cur.execute(
"""
UPDATE tracks SET
sources_json = ?,
external_urls_json = ?,
updated_at = ?
WHERE dedup_key = ?
""",
(
json.dumps(merged_sources),
json.dumps(merged_urls),
now,
dedup_key,
),
)
conn.commit()
return "updated"
def get_tracks_in_bbox(
conn: sqlite3.Connection,
west: float,
south: float,
east: float,
north: float,
activities: Optional[list] = None,
sources: Optional[list] = None,
limit: int = 500,
) -> tuple:
"""Возвращает треки в указанном bbox.
Returns:
(tracks: list[sqlite3.Row], total_count: int)
"""
cur = conn.cursor()
# Базовое условие bbox
conditions = [
"min_lon <= :east",
"max_lon >= :west",
"min_lat <= :north",
"max_lat >= :south",
]
params: dict = {"west": west, "south": south, "east": east, "north": north}
# Фильтр по activity_type
if activities:
placeholders = ",".join(f":act{i}" for i in range(len(activities)))
conditions.append(f"activity_type IN ({placeholders})")
for i, act in enumerate(activities):
params[f"act{i}"] = act
where_clause = " AND ".join(conditions)
# Подсчёт общего числа (без фильтра по source, он применяется постфактум)
count_sql = f"SELECT COUNT(*) as cnt FROM tracks WHERE {where_clause}"
cur.execute(count_sql, params)
total_count = cur.fetchone()["cnt"]
# Основной запрос
select_sql = f"""
SELECT id, dedup_key, name, description, activity_type, user,
created_at, length_m, points_count,
min_lon, min_lat, max_lon, max_lat,
sources_json, external_urls_json, tags_json,
inserted_at, updated_at, geom
FROM tracks
WHERE {where_clause}
LIMIT :limit
"""
params["limit"] = limit
cur.execute(select_sql, params)
rows = cur.fetchall()
# Постфильтрация по sources (если задан)
if sources:
filtered = []
for row in rows:
row_sources = json.loads(row["sources_json"] or "[]")
if any(s in row_sources for s in sources):
filtered.append(row)
rows = filtered
return rows, total_count