Files
enduro-trails/tests/integration/test_routing_barriers.py
claude-bot e263f8425c
All checks were successful
CI / lint (push) Successful in 7s
CI / test (push) Successful in 6s
CI / build (push) Successful in 2s
feat(ET-001): implement barrier blocking and footway exclusion in OSRM profile
- enduro.lua: блокировка нод barrier=gate|bollard|lift_gate|chain|cycle_barrier|
  motorcycle_barrier|border_control|block через mode.inaccessible (ADR-001).
  cattle_grid и ford остаются проезжими.
- enduro.lua: highway=footway|pedestrian|steps|corridor полностью исключены
  из графа (early return в process_way). Эти типы удалены из highway_rate
  и highway_speeds, чтобы профиль был самодостаточным.
- scripts/rebuild-osrm.sh: пересборка графа (extract → partition → customize)
  и рестарт контейнера osrm-routed.
- tests/integration/test_routing_barriers.py: 7 тестов (TC-001..TC-005 +
  статический анализ blocked_barriers/excluded_highways). Интеграционные тесты
  скипаются если OSRM не доступен.

Refs: ET-001
2026-05-15 22:11:32 +03:00

332 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Integration tests для ET-001: блокировка шлагбаумов и исключение тротуаров.
Тесты делятся на две группы:
1. Статические проверки lua-профиля (TC-004, частично AC-1/AC-2).
Запускаются всегда — анализируют исходник `infra/osrm/enduro.lua`.
2. Интеграционные проверки через работающий OSRM (TC-001, TC-002, TC-003, TC-005).
Требуют поднятого OSRM с пересобранным графом. Если OSRM недоступен —
соответствующие тесты помечаются `skip`, чтобы CI без инфраструктуры не падал.
Адрес OSRM берётся из переменной окружения OSRM_URL (default: http://172.22.0.1:5559).
Координаты тестовых точек задаются переменными окружения с осмысленными default'ами
для региона ЦФО — `architect` может уточнить их позже.
"""
from __future__ import annotations
import os
import re
import subprocess
from pathlib import Path
from shutil import which
import httpx
import pytest
REPO_ROOT = Path(__file__).resolve().parents[2]
LUA_PROFILE_PATH = REPO_ROOT / "infra" / "osrm" / "enduro.lua"
OSRM_URL = os.environ.get("OSRM_URL", "http://172.22.0.1:5559")
OSRM_TIMEOUT = float(os.environ.get("OSRM_TIMEOUT", "10"))
# Тестовые координаты (lon, lat). Architect может переопределить через env.
# Default'ы — заведомо валидные точки в ЦФО (Москва и окрестности).
BARRIER_FROM = (
float(os.environ.get("TEST_BARRIER_FROM_LON", "37.6173")),
float(os.environ.get("TEST_BARRIER_FROM_LAT", "55.7558")),
)
BARRIER_TO = (
float(os.environ.get("TEST_BARRIER_TO_LON", "37.6500")),
float(os.environ.get("TEST_BARRIER_TO_LAT", "55.7600")),
)
# Узел заведомо известного шлагбаума (lon, lat). Если не задан — тест проверяет
# только сам факт получения 200/404 без падений.
BARRIER_NODE = (
float(os.environ.get("TEST_BARRIER_NODE_LON", "0")) or None,
float(os.environ.get("TEST_BARRIER_NODE_LAT", "0")) or None,
)
CITY_FROM = (
float(os.environ.get("TEST_CITY_FROM_LON", "37.6173")),
float(os.environ.get("TEST_CITY_FROM_LAT", "55.7558")),
)
CITY_TO = (
float(os.environ.get("TEST_CITY_TO_LON", "37.6300")),
float(os.environ.get("TEST_CITY_TO_LAT", "55.7600")),
)
CATTLE_GRID_FROM = (
float(os.environ.get("TEST_CG_FROM_LON", "37.6173")),
float(os.environ.get("TEST_CG_FROM_LAT", "55.7558")),
)
CATTLE_GRID_TO = (
float(os.environ.get("TEST_CG_TO_LON", "37.7000")),
float(os.environ.get("TEST_CG_TO_LAT", "55.8000")),
)
REGRESSION_FROM = (
float(os.environ.get("TEST_REG_FROM_LON", "37.6173")),
float(os.environ.get("TEST_REG_FROM_LAT", "55.7558")),
)
REGRESSION_TO = (
float(os.environ.get("TEST_REG_TO_LON", "37.8000")),
float(os.environ.get("TEST_REG_TO_LAT", "55.9000")),
)
# Если шлагбаум попал в маршрут — координата ноды должна оказаться достаточно
# близко к одной из точек геометрии. Порог в градусах (~5 м на широте 55°).
COORD_NEAR_DEG = 5e-5
def _osrm_available() -> bool:
"""Проверяет что OSRM отвечает на /health (или хотя бы на корень)."""
try:
with httpx.Client(timeout=2.0) as client:
resp = client.get(f"{OSRM_URL}/health")
if resp.status_code < 500:
return True
except Exception:
pass
try:
with httpx.Client(timeout=2.0) as client:
# Любой ответ от роутера сойдёт — нам нужно убедиться что порт жив.
resp = client.get(f"{OSRM_URL}/route/v1/driving/0,0;1,1")
return resp.status_code < 500
except Exception:
return False
osrm_required = pytest.mark.skipif(
not _osrm_available(),
reason=f"OSRM не доступен на {OSRM_URL} — интеграционный тест пропущен",
)
def _route(from_lonlat: tuple[float, float], to_lonlat: tuple[float, float]) -> httpx.Response:
"""Делает запрос к OSRM и возвращает ответ."""
coords = f"{from_lonlat[0]},{from_lonlat[1]};{to_lonlat[0]},{to_lonlat[1]}"
url = (
f"{OSRM_URL}/route/v1/driving/{coords}"
f"?overview=full&geometries=geojson&alternatives=false&radiuses=5000;5000"
)
with httpx.Client(timeout=OSRM_TIMEOUT) as client:
return client.get(url)
# ──────────────────────────────────────────────────────────────────────────────
# Статические проверки lua-профиля (без OSRM)
# ──────────────────────────────────────────────────────────────────────────────
def _read_profile() -> str:
assert LUA_PROFILE_PATH.is_file(), f"Lua-профиль не найден: {LUA_PROFILE_PATH}"
return LUA_PROFILE_PATH.read_text(encoding="utf-8")
def test_lua_syntax():
"""TC-004: lua-профиль синтаксически корректен.
Если в системе есть `luac` — используем его. Иначе ограничиваемся
структурными проверками: файл читается, содержит ключевые функции и
сбалансированные `function`/`end`.
"""
profile = _read_profile()
# Структурные инварианты — должны быть всегда.
assert "function process_node" in profile, "нет process_node"
assert "function process_way" in profile, "нет process_way"
assert "function process_turn" in profile, "нет process_turn"
assert "function setup" in profile, "нет setup"
assert "return {" in profile, "нет финального экспорта"
luac = which("luac") or which("luac5.3") or which("luac5.1")
if luac:
result = subprocess.run(
[luac, "-p", str(LUA_PROFILE_PATH)],
capture_output=True,
text=True,
)
assert result.returncode == 0, (
f"luac -p вернул {result.returncode}: {result.stderr}"
)
def test_blocked_barriers_match_trz():
"""AC-1: профиль блокирует ровно перечисленный в ТЗ набор barrier-типов."""
profile = _read_profile()
expected = {
"gate", "bollard", "lift_gate", "chain",
"cycle_barrier", "motorcycle_barrier", "border_control", "block",
}
# Парсим таблицу blocked_barriers
match = re.search(r"local\s+blocked_barriers\s*=\s*\{([^}]*)\}", profile, re.S)
assert match, "не нашли таблицу blocked_barriers"
found = set(re.findall(r"(\w+)\s*=\s*true", match.group(1)))
assert found == expected, (
f"blocked_barriers содержит {found}, ожидали {expected}"
)
# cattle_grid и ford не должны быть в блоке
assert "cattle_grid" not in found, "cattle_grid не должен блокироваться"
assert "ford" not in found, "ford не должен блокироваться"
# mode.inaccessible должен использоваться в process_node
node_match = re.search(
r"function\s+process_node.*?\nend",
profile,
re.S,
)
assert node_match, "не нашли тело process_node"
body = node_match.group(0)
assert "mode.inaccessible" in body, (
"process_node должен использовать mode.inaccessible (ADR-001)"
)
assert "forward_mode" in body and "backward_mode" in body, (
"process_node должен выставлять и forward_mode, и backward_mode"
)
def test_excluded_highways_match_trz():
"""AC-2: footway/pedestrian/steps/corridor исключены из графа."""
profile = _read_profile()
expected = {"footway", "pedestrian", "steps", "corridor"}
match = re.search(r"local\s+excluded_highways\s*=\s*\{([^}]*)\}", profile, re.S)
assert match, "не нашли таблицу excluded_highways"
found = set(re.findall(r"(\w+)\s*=\s*true", match.group(1)))
assert found == expected, (
f"excluded_highways содержит {found}, ожидали {expected}"
)
# И ни одного из них не должно остаться в highway_rate.
rate_match = re.search(r"local\s+highway_rate\s*=\s*\{([^}]*)\}", profile, re.S)
assert rate_match, "не нашли таблицу highway_rate"
rate_body = rate_match.group(1)
for hw in expected:
# Проверяем именно ключ таблицы — пробелы между ключом и `=` допустимы.
assert not re.search(rf"\b{hw}\s*=", rate_body), (
f"{hw} остался в highway_rate — должен быть удалён (AC-2)"
)
# ──────────────────────────────────────────────────────────────────────────────
# Интеграционные тесты через OSRM
# ──────────────────────────────────────────────────────────────────────────────
@osrm_required
def test_route_avoids_barrier():
"""TC-001: маршрут через точку с известным шлагбаумом обходит её.
Если переменные TEST_BARRIER_NODE_* не заданы — тест ограничивается
проверкой того, что OSRM либо возвращает валидный маршрут, либо честно
отвечает NoRoute (404), без internal-ошибок.
"""
resp = _route(BARRIER_FROM, BARRIER_TO)
assert resp.status_code < 500, f"OSRM 5xx: {resp.status_code} {resp.text[:200]}"
data = resp.json()
assert data.get("code") in {"Ok", "NoRoute", "NoSegment"}, (
f"неожиданный OSRM-код: {data.get('code')}"
)
if data.get("code") != "Ok":
# Заблокированный участок может приводить к NoRoute — это ожидаемое
# поведение (см. ADR-001, раздел «Последствия»).
return
routes = data.get("routes") or []
assert routes, "OSRM=Ok, но routes пуст"
coords = routes[0]["geometry"]["coordinates"]
assert len(coords) >= 2, "геометрия маршрута слишком короткая"
node_lon, node_lat = BARRIER_NODE
if node_lon is None or node_lat is None:
# Координата ноды-шлагбаума не задана — детальная проверка невозможна,
# но сам факт корректного ответа — уже AC-1 на уровне smoke.
return
for lon, lat in coords:
assert not (
abs(lon - node_lon) < COORD_NEAR_DEG
and abs(lat - node_lat) < COORD_NEAR_DEG
), (
f"маршрут проходит через заблокированный шлагбаум "
f"({node_lon}, {node_lat})"
)
@osrm_required
def test_route_no_footway():
"""TC-002: маршрут в городе не использует footway/pedestrian/steps.
Проверяем через OSRM-annotations: если в ответе есть classes/nodes
с пешеходным типом — тест падает. Без annotations ограничиваемся проверкой
того, что маршрут построен и не пустой.
"""
coords = f"{CITY_FROM[0]},{CITY_FROM[1]};{CITY_TO[0]},{CITY_TO[1]}"
url = (
f"{OSRM_URL}/route/v1/driving/{coords}"
f"?overview=full&geometries=geojson&alternatives=false"
f"&annotations=true&steps=true&radiuses=5000;5000"
)
with httpx.Client(timeout=OSRM_TIMEOUT) as client:
resp = client.get(url)
assert resp.status_code < 500, f"OSRM 5xx: {resp.status_code}"
data = resp.json()
if data.get("code") != "Ok":
# В городе может не найтись маршрута если все варианты — тротуары.
# Это и есть желаемое поведение (AC-2): пешеходные дороги выкинуты.
assert data.get("code") in {"NoRoute", "NoSegment"}, (
f"неожиданный код: {data.get('code')}"
)
return
routes = data.get("routes") or []
assert routes, "OSRM=Ok, но routes пуст"
route = routes[0]
assert route.get("distance", 0) > 0, "distance должен быть > 0"
# Если OSRM вернул steps — заглянем в name/ref/maneuver на наличие
# явно пешеходных подсказок (грубый, но дешёвый sanity check).
legs = route.get("legs", [])
for leg in legs:
for step in leg.get("steps", []):
name = (step.get("name") or "").lower()
assert "тротуар" not in name, "step с тротуаром в name"
assert "footway" not in name, "step с footway в name"
@osrm_required
def test_route_allows_cattle_grid():
"""TC-003: cattle_grid не блокирует маршрут (мотоцикл проезжает)."""
resp = _route(CATTLE_GRID_FROM, CATTLE_GRID_TO)
assert resp.status_code < 500, f"OSRM 5xx: {resp.status_code}"
data = resp.json()
# Главное — что cattle_grid не превращает маршрут в NoRoute.
# Допускаем NoSegment если тестовая точка не привязалась к графу.
assert data.get("code") in {"Ok", "NoSegment"}, (
f"cattle_grid не должен ломать маршрут, но получили {data.get('code')}"
)
if data.get("code") == "Ok":
routes = data.get("routes") or []
assert routes, "OSRM=Ok, но routes пуст"
assert routes[0].get("distance", 0) > 0
@osrm_required
def test_existing_route_works():
"""TC-005 (регрессия): обычный маршрут без шлагбаумов/тротуаров строится."""
resp = _route(REGRESSION_FROM, REGRESSION_TO)
assert resp.status_code == 200, f"OSRM статус: {resp.status_code}"
data = resp.json()
assert data.get("code") == "Ok", f"OSRM код: {data.get('code')}"
routes = data.get("routes") or []
assert routes, "routes пуст"
route = routes[0]
assert route.get("distance", 0) > 0, "distance должен быть > 0"
geom = route.get("geometry", {})
assert geom.get("type") == "LineString"
assert len(geom.get("coordinates", [])) >= 2, "геометрия пуста"