"""Integration tests for GPS pipeline with new sources (ET-009).
Coverage:
- IT-ER-01: EnduroRussia pipeline with 3 fixture GPX (1 in-bbox, 2 empty, 3 out-of-bbox)
- IT-WL-01: Wikiloc pipeline with 1 fixture track
- IT-WL-02: Wikiloc graceful-stop on 403 → status='partial', exit_code=0
- IT-DEDUP-01: EnduroRussia + Wikiloc same track → 1 row, merged sources
- IT-LIC-01: License guard blocks source when ADR status=proposed
"""
import asyncio
import json
import os
import sys
from typing import Callable
import httpx
import yaml
# Add project root to path
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, PROJECT_ROOT)
from src.api.gps_tracks.sources import enduro_russia as er_module # noqa: E402
from src.api.gps_tracks.sources import wikiloc as wl_module # noqa: E402
FIXTURES_DIR = os.path.join(os.path.dirname(__file__), "..", "fixtures", "gps-tracks")
def _read_fixture(name: str) -> bytes:
with open(os.path.join(FIXTURES_DIR, name), "rb") as f:
return f.read()
def _read_fixture_text(name: str) -> str:
return _read_fixture(name).decode("utf-8")
def _make_handler_combined(handlers: dict) -> Callable[[httpx.Request], httpx.Response]:
"""Combines multiple handler functions, selecting by URL host."""
def handler(req: httpx.Request) -> httpx.Response:
host = req.url.host
for host_pattern, h in handlers.items():
if host_pattern in host:
return h(req)
return httpx.Response(404)
return handler
def _patch_httpx(monkeypatch, handler):
"""Подменяет httpx.AsyncClient в обоих parser-модулях."""
transport = httpx.MockTransport(handler)
original = httpx.AsyncClient
def factory(*args, **kwargs):
kwargs["transport"] = transport
return original(*args, **kwargs)
monkeypatch.setattr(er_module.httpx, "AsyncClient", factory)
monkeypatch.setattr(wl_module.httpx, "AsyncClient", factory)
def _write_config(tmp_dir: str, sources: list, regions: list) -> tuple[str, str]:
"""Записывает временные конфиги."""
src_path = os.path.join(tmp_dir, "gps_sources.yaml")
reg_path = os.path.join(tmp_dir, "gps_regions.yaml")
with open(src_path, "w") as f:
yaml.safe_dump({"sources": sources}, f)
with open(reg_path, "w") as f:
yaml.safe_dump({"regions": regions}, f)
return src_path, reg_path
def _setup_env(monkeypatch, tmp_dir, sources, regions):
src_path, reg_path = _write_config(tmp_dir, sources, regions)
db_path = os.path.join(tmp_dir, "test_gps.sqlite")
monkeypatch.setenv("GPS_SOURCES_CONFIG", src_path)
monkeypatch.setenv("GPS_REGIONS_CONFIG", reg_path)
monkeypatch.setenv("GPS_TRACKS_DB_PATH", db_path)
return db_path
def _run_pipeline(args=None):
"""Запускает scripts/gps_collect.py::main() через asyncio.run."""
from scripts.gps_collect import main as pipeline_main
saved_argv = sys.argv[:]
try:
sys.argv = ["gps_collect.py"] + (args or [])
return asyncio.run(pipeline_main())
finally:
sys.argv = saved_argv
def _last_pipeline_run(db_path: str) -> dict:
import sqlite3
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
row = conn.execute(
"SELECT * FROM pipeline_runs ORDER BY id DESC LIMIT 1"
).fetchone()
conn.close()
return dict(row) if row else None
def _count_tracks(db_path: str) -> int:
import sqlite3
conn = sqlite3.connect(db_path)
n = conn.execute("SELECT COUNT(*) FROM tracks").fetchone()[0]
conn.close()
return n
def _all_tracks(db_path: str) -> list:
import sqlite3
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
rows = conn.execute("SELECT * FROM tracks").fetchall()
conn.close()
return [dict(r) for r in rows]
# Сорсы для тестов
ER_SOURCE = {
"id": "enduro_russia",
"name": "EnduroRussia.ru",
"enabled": True,
"license_adr": "docs/work-items/ET-008/06-adr/ADR-010-enduro-russia-licensing.md",
"base_url": "https://endurorussia.ru",
"rate_limit_sec": 0,
"user_agent": "test/1.0",
"attribution": "EnduroRussia.ru",
"parser_module": "src.api.gps_tracks.sources.enduro_russia",
"save_user_field": False,
"source_priority": 80,
}
WL_SOURCE = {
"id": "wikiloc",
"name": "Wikiloc",
"enabled": True,
"license_adr": "docs/work-items/ET-008/06-adr/ADR-012-wikiloc-licensing.md",
"base_url": "https://www.wikiloc.com",
"rate_limit_sec": 0,
"user_agent": "test/1.0",
"attribution": "© Wikiloc contributors",
"parser_module": "src.api.gps_tracks.sources.wikiloc",
"save_user_field": False,
"source_priority": 70,
"activity_filter": ["motorcycle"],
}
REGION_TSFO = {
"id": "tsfo_plus_chuvashia",
"name": "ЦФО + Чувашия",
"bbox": [29.0, 49.5, 47.5, 60.0],
"enabled": True,
"sources": ["enduro_russia", "wikiloc"],
}
# ─── IT-ER-01 ───────────────────────────────────────────────────────────────
def test_it_er_01_pipeline_enduro_russia_three_gpx(monkeypatch, tmp_path):
"""IT-ER-01: 3 фикстурных GPX → tracks_new=1 (track1 OK; track2 empty; track3 out-of-bbox)."""
api_data = {
"items": [
{"id": 1, "name": "Track1", "difficulty": "hard", "created_at": "2024-08-15 12:30:00"},
{"id": 2, "name": "Track2", "difficulty": "soft", "created_at": "2024-09-02 09:15:00"},
{"id": 3, "name": "Track3", "difficulty": "soft", "created_at": "2024-09-10 08:00:00"},
],
"total": 3,
"page": 0,
}
def handler(req: httpx.Request) -> httpx.Response:
if req.url.host == "endurorussia.ru":
if req.url.path == "/api/tracks":
return httpx.Response(200, json=api_data)
for tid in (1, 2, 3):
if req.url.path == f"/api/tracks/{tid}/gpx":
return httpx.Response(200, content=_read_fixture(f"enduro-russia-track-{tid}.gpx"))
return httpx.Response(404)
_patch_httpx(monkeypatch, handler)
db_path = _setup_env(monkeypatch, str(tmp_path), [ER_SOURCE], [REGION_TSFO])
exit_code = _run_pipeline(["--region", "tsfo_plus_chuvashia", "--source", "enduro_russia"])
assert exit_code == 0
assert _count_tracks(db_path) == 1
run = _last_pipeline_run(db_path)
assert run is not None
assert run["status"] == "ok"
assert run["tracks_new"] == 1
assert run["source_id"] == "enduro_russia"
# ─── IT-WL-01 ───────────────────────────────────────────────────────────────
def test_it_wl_01_pipeline_wikiloc_one_track(monkeypatch, tmp_path):
"""IT-WL-01: Wikiloc с 1 треком → tracks_new=1, status ∈ {ok, partial}."""
# Поиск возвращает 1 трек, дальше 404 чтобы остановиться
mini_search = 'x'
def handler(req: httpx.Request) -> httpx.Response:
if req.url.host == "www.wikiloc.com":
if "find.do" in req.url.path:
if "page=0" in str(req.url.query):
return httpx.Response(200, text=mini_search)
return httpx.Response(200, text="")
if req.url.path.startswith("/trails/"):
return httpx.Response(200, text=_read_fixture_text("wikiloc-trail-page.html"))
if "downloadTrail.do" in req.url.path:
return httpx.Response(200, content=_read_fixture("wikiloc-track.gpx"))
return httpx.Response(404)
_patch_httpx(monkeypatch, handler)
region = dict(REGION_TSFO, sources=["wikiloc"])
db_path = _setup_env(monkeypatch, str(tmp_path), [WL_SOURCE], [region])
exit_code = _run_pipeline(["--region", "tsfo_plus_chuvashia", "--source", "wikiloc"])
assert exit_code == 0
assert _count_tracks(db_path) == 1
run = _last_pipeline_run(db_path)
assert run["status"] in ("ok", "partial")
assert run["tracks_new"] == 1
# ─── IT-WL-02 ───────────────────────────────────────────────────────────────
def test_it_wl_02_pipeline_wikiloc_403_graceful(monkeypatch, tmp_path):
"""IT-WL-02: Wikiloc 403 на поиске → status='partial' (или 'ok'), exit_code=0."""
def handler(req: httpx.Request) -> httpx.Response:
if req.url.host == "www.wikiloc.com":
if "find.do" in req.url.path:
return httpx.Response(403, text="Forbidden")
return httpx.Response(404)
_patch_httpx(monkeypatch, handler)
region = dict(REGION_TSFO, sources=["wikiloc"])
db_path = _setup_env(monkeypatch, str(tmp_path), [WL_SOURCE], [region])
exit_code = _run_pipeline(["--region", "tsfo_plus_chuvashia", "--source", "wikiloc"])
assert exit_code == 0, "graceful-stop should not produce error exit"
assert _count_tracks(db_path) == 0
run = _last_pipeline_run(db_path)
# graceful-stop → status 'ok' (parser просто завершился без exception);
# в TZ ослабленно: ∈ {ok, partial, rate_limited}
assert run["status"] in ("ok", "partial", "rate_limited")
assert run["tracks_new"] == 0
# ─── IT-DEDUP-01 ────────────────────────────────────────────────────────────
def test_it_dedup_01_merge_enduro_russia_and_wikiloc(monkeypatch, tmp_path):
"""IT-DEDUP-01: одинаковый трек из 2 источников → 1 запись с merged sources."""
er_api = {
"items": [
{"id": 1, "name": "Дмитровский ER", "difficulty": "hard", "created_at": "2024-08-15 12:30:00"},
],
"total": 1,
"page": 0,
}
mini_search = 'x'
def handler(req: httpx.Request) -> httpx.Response:
if req.url.host == "endurorussia.ru":
if req.url.path == "/api/tracks":
return httpx.Response(200, json=er_api)
if req.url.path == "/api/tracks/1/gpx":
return httpx.Response(200, content=_read_fixture("enduro-russia-track-1.gpx"))
if req.url.host == "www.wikiloc.com":
if "find.do" in req.url.path:
if "page=0" in str(req.url.query):
return httpx.Response(200, text=mini_search)
return httpx.Response(200, text="")
if req.url.path.startswith("/trails/"):
return httpx.Response(200, text=_read_fixture_text("wikiloc-trail-page.html"))
if "downloadTrail.do" in req.url.path:
return httpx.Response(200, content=_read_fixture("wikiloc-track.gpx"))
return httpx.Response(404)
_patch_httpx(monkeypatch, handler)
region = dict(REGION_TSFO, sources=["enduro_russia", "wikiloc"])
db_path = _setup_env(monkeypatch, str(tmp_path), [ER_SOURCE, WL_SOURCE], [region])
# 1) сначала EnduroRussia
code1 = _run_pipeline(["--region", "tsfo_plus_chuvashia", "--source", "enduro_russia"])
assert code1 == 0
assert _count_tracks(db_path) == 1
# 2) затем Wikiloc
code2 = _run_pipeline(["--region", "tsfo_plus_chuvashia", "--source", "wikiloc"])
assert code2 == 0
# Должна быть 1 запись с обоими источниками
tracks = _all_tracks(db_path)
assert len(tracks) == 1, f"expected 1 merged record, got {len(tracks)}"
sources = json.loads(tracks[0]["sources_json"])
assert "enduro_russia" in sources
assert "wikiloc" in sources
ext_urls = json.loads(tracks[0]["external_urls_json"])
assert any("endurorussia.ru" in u for u in ext_urls)
assert any("wikiloc.com" in u for u in ext_urls)
# ─── IT-LIC-01 ──────────────────────────────────────────────────────────────
def test_it_lic_01_license_guard_blocks_proposed(monkeypatch, tmp_path):
"""IT-LIC-01: ADR со status: proposed → pipeline пропускает source с 'skipped_license'."""
# Создаём временный ADR с status: proposed
adr_dir = tmp_path / "docs" / "work-items" / "ET-008" / "06-adr"
adr_dir.mkdir(parents=True)
fake_adr = adr_dir / "ADR-FAKE-licensing.md"
fake_adr.write_text(
"---\n"
"type: adr\n"
"adr_id: ADR-FAKE\n"
"status: proposed\n"
"---\n\n"
"# Fake ADR for test\n"
)
er_source_proposed = dict(ER_SOURCE, license_adr="docs/work-items/ET-008/06-adr/ADR-FAKE-licensing.md")
def handler(req: httpx.Request) -> httpx.Response:
return httpx.Response(500) # не должно дойти
_patch_httpx(monkeypatch, handler)
# Pipeline берёт project_root относительно scripts/gps_collect.py.
# Нам надо подсунуть tmp_path как корень — самый простой способ: симлинком в tmp.
# Альтернатива: запускаем pipeline с cwd=tmp_path и патчим scripts module path.
# Но scripts.gps_collect использует __file__ → ../.. = project root.
# Подменим _check_license_adr через patch.
from scripts import gps_collect as collect_mod
real_check = collect_mod._check_license_adr
def patched_check(adr_path, project_root):
# Используем tmp_path как project_root для нашего fake ADR
return real_check(adr_path, str(tmp_path))
monkeypatch.setattr(collect_mod, "_check_license_adr", patched_check)
db_path = _setup_env(monkeypatch, str(tmp_path), [er_source_proposed], [REGION_TSFO])
exit_code = _run_pipeline(["--region", "tsfo_plus_chuvashia", "--source", "enduro_russia"])
# ET-009: license_guard выставляет has_error=True → exit_code=1
assert exit_code == 1
run = _last_pipeline_run(db_path)
assert run is not None
assert run["status"] == "skipped_license"
assert run["tracks_new"] == 0