"""Функции работы с БД для GPS-треков (ET-008).""" import json import os import sqlite3 from datetime import datetime, timezone from typing import Optional from src.api.gps_tracks.models import TrackInsert _MIGRATION_PATH = os.path.join( os.path.dirname(__file__), "../../../migrations/gps_tracks_001_init.sql" ) def open_db(db_path: str) -> sqlite3.Connection: """Открывает соединение с SQLite БД.""" conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") return conn def init_db(conn: sqlite3.Connection) -> None: """Применяет миграцию SQL для создания схемы.""" migration_path = os.path.abspath(_MIGRATION_PATH) with open(migration_path, "r", encoding="utf-8") as f: sql = f.read() # Выполняем каждый statement отдельно (executescript не поддерживает параметры, # но зато не требует явного commit) conn.executescript(sql) conn.commit() def _now_iso() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") def upsert_track( conn: sqlite3.Connection, track: TrackInsert, dedup_key: str, source_priority: int, ) -> str: """Вставляет или обновляет трек в БД. При коллизии dedup_key: - UNION sources (без дублей) - UNION external_urls (без дублей) - Метаданные обновляются если новый source_priority < существующего Returns: "inserted" или "updated" """ cur = conn.cursor() now = _now_iso() # Проверяем существующую запись cur.execute( "SELECT id, sources_json, external_urls_json, name, description, activity_type, " "user, created_at, source_priority FROM tracks WHERE dedup_key = ?", (dedup_key,), ) existing = cur.fetchone() if existing is None: # INSERT новой записи sources = [track.source_id] ext_urls = [track.external_url] if track.external_url else [] cur.execute( """ INSERT INTO tracks ( dedup_key, name, description, activity_type, user, created_at, length_m, points_count, min_lon, min_lat, max_lon, max_lat, geom, sources_json, external_urls_json, tags_json, inserted_at, updated_at, source_priority ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( dedup_key, track.name, track.description, track.activity_type, track.user, track.created_at, track.length_m, track.points_count, track.min_lon, track.min_lat, track.max_lon, track.max_lat, track.geom_wkb, json.dumps(sources), json.dumps(ext_urls), json.dumps(track.tags) if track.tags else json.dumps([]), now, now, source_priority, ), ) conn.commit() return "inserted" else: # UPDATE: мержим sources и external_urls existing_sources = json.loads(existing["sources_json"] or "[]") existing_urls = json.loads(existing["external_urls_json"] or "[]") # Union без дублей, сохраняя порядок merged_sources = list(dict.fromkeys(existing_sources + [track.source_id])) new_urls = [track.external_url] if track.external_url else [] merged_urls = list(dict.fromkeys(existing_urls + new_urls)) # Получаем текущий source_priority (может отсутствовать в старых записях) existing_priority = existing["source_priority"] if "source_priority" in existing.keys() else 999 # Обновляем метаданные только если новый источник имеет более высокий приоритет if source_priority < existing_priority: cur.execute( """ UPDATE tracks SET name = ?, description = ?, activity_type = ?, user = ?, created_at = ?, sources_json = ?, external_urls_json = ?, updated_at = ?, source_priority = ? WHERE dedup_key = ? """, ( track.name, track.description, track.activity_type, track.user, track.created_at, json.dumps(merged_sources), json.dumps(merged_urls), now, source_priority, dedup_key, ), ) else: # Только обновляем sources/urls и updated_at cur.execute( """ UPDATE tracks SET sources_json = ?, external_urls_json = ?, updated_at = ? WHERE dedup_key = ? """, ( json.dumps(merged_sources), json.dumps(merged_urls), now, dedup_key, ), ) conn.commit() return "updated" def get_tracks_in_bbox( conn: sqlite3.Connection, west: float, south: float, east: float, north: float, activities: Optional[list] = None, sources: Optional[list] = None, limit: int = 500, ) -> tuple: """Возвращает треки в указанном bbox. Returns: (tracks: list[sqlite3.Row], total_count: int) """ cur = conn.cursor() # Базовое условие bbox conditions = [ "min_lon <= :east", "max_lon >= :west", "min_lat <= :north", "max_lat >= :south", ] params: dict = {"west": west, "south": south, "east": east, "north": north} # Фильтр по activity_type if activities: placeholders = ",".join(f":act{i}" for i in range(len(activities))) conditions.append(f"activity_type IN ({placeholders})") for i, act in enumerate(activities): params[f"act{i}"] = act where_clause = " AND ".join(conditions) # Подсчёт общего числа (без фильтра по source, он применяется постфактум) count_sql = f"SELECT COUNT(*) as cnt FROM tracks WHERE {where_clause}" cur.execute(count_sql, params) total_count = cur.fetchone()["cnt"] # Основной запрос select_sql = f""" SELECT id, dedup_key, name, description, activity_type, user, created_at, length_m, points_count, min_lon, min_lat, max_lon, max_lat, sources_json, external_urls_json, tags_json, inserted_at, updated_at, geom FROM tracks WHERE {where_clause} LIMIT :limit """ params["limit"] = limit cur.execute(select_sql, params) rows = cur.fetchall() # Постфильтрация по sources (если задан) if sources: filtered = [] for row in rows: row_sources = json.loads(row["sources_json"] or "[]") if any(s in row_sources for s in sources): filtered.append(row) rows = filtered return rows, total_count