feat(ET-009): architect deliverables — ADR, infra requirements, data requirements, tech risks, wikiloc parser stub
This commit is contained in:
@@ -1,17 +1,253 @@
|
||||
"""Парсер EnduroRussia.ru — заглушка (ADR-010 status=proposed)."""
|
||||
"""Парсер EnduroRussia.ru — JSON API + GPX (ET-009)."""
|
||||
import asyncio
|
||||
import math
|
||||
import logging
|
||||
from typing import AsyncGenerator
|
||||
|
||||
import defusedxml.ElementTree as ET
|
||||
import httpx
|
||||
|
||||
from src.api.gps_tracks.models import TrackInsert
|
||||
from src.api.gps_tracks.sources.base import SourceParser
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EnduroRussiaParser(SourceParser):
|
||||
"""Парсер EnduroRussia.ru.
|
||||
"""Парсер EnduroRussia.ru через публичный JSON API.
|
||||
|
||||
Заблокирован до получения лицензии. См. ADR-010.
|
||||
API:
|
||||
GET /api/tracks?page=N&limit=50 -> {items: [...], total: N, page: N}
|
||||
GET /api/tracks/{id}/gpx -> GPX XML
|
||||
"""
|
||||
|
||||
MAPPING = {"enduro": "enduro", "мото": "moto"}
|
||||
MAPPING = {
|
||||
"enduro": "enduro",
|
||||
"мото": "moto",
|
||||
"hard": "enduro",
|
||||
"soft": "enduro",
|
||||
"тур": "moto",
|
||||
"motorcycle": "moto",
|
||||
"offroad": "offroad",
|
||||
}
|
||||
|
||||
async def collect(self, bbox, ctx):
|
||||
# ADR-010: blocked, status=proposed
|
||||
raise NotImplementedError("EnduroRussia parser not yet licensed (ADR-010)")
|
||||
return
|
||||
yield # make it a generator
|
||||
async def collect(self, bbox: tuple, ctx: dict) -> AsyncGenerator[TrackInsert, None]:
|
||||
"""Собирает треки из EnduroRussia.ru API.
|
||||
|
||||
Args:
|
||||
bbox: (west, south, east, north)
|
||||
ctx: контекст выполнения
|
||||
|
||||
Yields:
|
||||
TrackInsert объекты
|
||||
"""
|
||||
west, south, east, north = bbox
|
||||
base_url = self.config.get("base_url", "https://endurorussia.ru").rstrip("/")
|
||||
rate_limit = self.config.get("rate_limit_sec", 5)
|
||||
user_agent = self.config.get("user_agent", "enduro-trails/1.0")
|
||||
source_id = self.config.get("id", "enduro_russia")
|
||||
source_priority = self.config.get("source_priority", 80)
|
||||
|
||||
headers = {"User-Agent": user_agent, "Accept": "application/json"}
|
||||
|
||||
async with httpx.AsyncClient(timeout=30, headers=headers) as client:
|
||||
page = 0
|
||||
limit = 50
|
||||
total = None
|
||||
|
||||
while True:
|
||||
url = f"{base_url}/api/tracks?page={page}&limit={limit}"
|
||||
try:
|
||||
resp = await client.get(url)
|
||||
if resp.status_code == 429:
|
||||
logger.warning("EnduroRussia: rate limited on tracks list, stopping")
|
||||
return
|
||||
if resp.status_code != 200:
|
||||
logger.warning("EnduroRussia: tracks list returned %d", resp.status_code)
|
||||
return
|
||||
data = resp.json()
|
||||
except Exception as exc:
|
||||
logger.error("EnduroRussia: failed to fetch tracks list: %s", exc)
|
||||
return
|
||||
|
||||
items = data.get("items", [])
|
||||
if not items:
|
||||
break
|
||||
|
||||
if total is None:
|
||||
total = data.get("total", 0)
|
||||
logger.info("EnduroRussia: total tracks = %d", total)
|
||||
|
||||
for item in items:
|
||||
track_id = item.get("id")
|
||||
if not track_id:
|
||||
continue
|
||||
|
||||
gpx_url = f"{base_url}/api/tracks/{track_id}/gpx"
|
||||
try:
|
||||
await asyncio.sleep(rate_limit)
|
||||
gpx_resp = await client.get(
|
||||
gpx_url,
|
||||
headers={**headers, "Accept": "application/gpx+xml,application/xml,*/*"},
|
||||
)
|
||||
if gpx_resp.status_code == 429:
|
||||
logger.warning("EnduroRussia: rate limited on GPX %d, stopping", track_id)
|
||||
return
|
||||
if gpx_resp.status_code != 200:
|
||||
logger.warning("EnduroRussia: GPX %d returned %d", track_id, gpx_resp.status_code)
|
||||
continue
|
||||
gpx_content = gpx_resp.content
|
||||
except Exception as exc:
|
||||
logger.error("EnduroRussia: failed to fetch GPX %d: %s", track_id, exc)
|
||||
continue
|
||||
|
||||
track = _parse_gpx(
|
||||
gpx_content,
|
||||
track_id=track_id,
|
||||
meta=item,
|
||||
source_id=source_id,
|
||||
base_url=base_url,
|
||||
source_priority=source_priority,
|
||||
mapping=self.MAPPING,
|
||||
)
|
||||
if track is None:
|
||||
continue
|
||||
|
||||
if not _bbox_intersects(
|
||||
(track.min_lon, track.min_lat, track.max_lon, track.max_lat),
|
||||
(west, south, east, north),
|
||||
):
|
||||
logger.debug("EnduroRussia: track %d outside bbox, skipping", track_id)
|
||||
continue
|
||||
|
||||
yield track
|
||||
|
||||
fetched_so_far = (page + 1) * limit
|
||||
if total is not None and fetched_so_far >= total:
|
||||
break
|
||||
if len(items) < limit:
|
||||
break
|
||||
|
||||
page += 1
|
||||
|
||||
|
||||
def _parse_gpx(
|
||||
content: bytes,
|
||||
track_id: int,
|
||||
meta: dict,
|
||||
source_id: str,
|
||||
base_url: str,
|
||||
source_priority: int,
|
||||
mapping: dict,
|
||||
) -> "TrackInsert | None":
|
||||
"""Парсит GPX-файл EnduroRussia и возвращает TrackInsert."""
|
||||
try:
|
||||
root = ET.fromstring(content)
|
||||
except Exception as exc:
|
||||
logger.error("EnduroRussia: failed to parse GPX %d: %s", track_id, exc)
|
||||
return None
|
||||
|
||||
ns = ""
|
||||
tag = root.tag
|
||||
if tag.startswith("{"):
|
||||
ns = tag.split("}")[0] + "}"
|
||||
|
||||
coords = []
|
||||
for trk in root:
|
||||
local = trk.tag.replace(ns, "") if ns else trk.tag
|
||||
if local != "trk":
|
||||
continue
|
||||
for trkseg in trk:
|
||||
local2 = trkseg.tag.replace(ns, "") if ns else trkseg.tag
|
||||
if local2 != "trkseg":
|
||||
continue
|
||||
for trkpt in trkseg:
|
||||
try:
|
||||
lat = float(trkpt.get("lat", 0))
|
||||
lon = float(trkpt.get("lon", 0))
|
||||
if lat == 0 and lon == 0:
|
||||
continue
|
||||
coords.append((lon, lat))
|
||||
except (TypeError, ValueError):
|
||||
continue
|
||||
|
||||
if len(coords) < 2:
|
||||
logger.debug("EnduroRussia: track %d has < 2 points, skipping", track_id)
|
||||
return None
|
||||
|
||||
lons = [c[0] for c in coords]
|
||||
lats = [c[1] for c in coords]
|
||||
min_lon, max_lon = min(lons), max(lons)
|
||||
min_lat, max_lat = min(lats), max(lats)
|
||||
|
||||
length_m = _calc_track_length(coords)
|
||||
if length_m < 10:
|
||||
return None
|
||||
|
||||
try:
|
||||
from shapely.geometry import LineString
|
||||
from shapely import wkb
|
||||
geom_wkb = wkb.dumps(LineString(coords))
|
||||
except Exception as exc:
|
||||
logger.error("EnduroRussia: shapely error for track %d: %s", track_id, exc)
|
||||
return None
|
||||
|
||||
name = meta.get("name")
|
||||
description = meta.get("description")
|
||||
created_at = meta.get("created_at", "")
|
||||
if created_at:
|
||||
created_at = created_at[:19].replace(" ", "T")
|
||||
|
||||
difficulty = (meta.get("difficulty") or "").lower()
|
||||
activity_type = mapping.get(difficulty, "enduro")
|
||||
from src.api.gps_tracks.models import ACTIVITY_TYPES
|
||||
if activity_type not in ACTIVITY_TYPES:
|
||||
activity_type = "enduro"
|
||||
|
||||
return TrackInsert(
|
||||
external_id=str(track_id),
|
||||
source_id=source_id,
|
||||
external_url=f"{base_url}/tracks/{track_id}",
|
||||
name=name,
|
||||
description=description,
|
||||
activity_type=activity_type,
|
||||
user=None,
|
||||
created_at=created_at or None,
|
||||
length_m=length_m,
|
||||
points_count=len(coords),
|
||||
geom_wkb=geom_wkb,
|
||||
min_lon=min_lon,
|
||||
min_lat=min_lat,
|
||||
max_lon=max_lon,
|
||||
max_lat=max_lat,
|
||||
tags=[],
|
||||
source_priority=source_priority,
|
||||
)
|
||||
|
||||
|
||||
def _haversine_m(lon1: float, lat1: float, lon2: float, lat2: float) -> float:
|
||||
"""Расстояние между двумя точками в метрах (Haversine)."""
|
||||
R = 6371000
|
||||
phi1, phi2 = math.radians(lat1), math.radians(lat2)
|
||||
dphi = math.radians(lat2 - lat1)
|
||||
dlam = math.radians(lon2 - lon1)
|
||||
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
|
||||
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
|
||||
|
||||
|
||||
def _calc_track_length(coords: list) -> float:
|
||||
"""Считает длину трека через Haversine."""
|
||||
total = 0.0
|
||||
for i in range(len(coords) - 1):
|
||||
total += _haversine_m(coords[i][0], coords[i][1], coords[i + 1][0], coords[i + 1][1])
|
||||
return total
|
||||
|
||||
|
||||
def _bbox_intersects(a: tuple, b: tuple) -> bool:
|
||||
"""Проверяет пересечение двух bbox (west, south, east, north)."""
|
||||
a_west, a_south, a_east, a_north = a
|
||||
b_west, b_south, b_east, b_north = b
|
||||
return not (
|
||||
a_east < b_west or a_west > b_east or
|
||||
a_north < b_south or a_south > b_north
|
||||
)
|
||||
|
||||
365
src/api/gps_tracks/sources/wikiloc.py
Normal file
365
src/api/gps_tracks/sources/wikiloc.py
Normal file
@@ -0,0 +1,365 @@
|
||||
"""Парсер Wikiloc — HTML-парсинг публичных треков (ET-009)."""
|
||||
import asyncio
|
||||
import math
|
||||
import logging
|
||||
import re
|
||||
from typing import AsyncGenerator
|
||||
|
||||
import defusedxml.ElementTree as ET
|
||||
import httpx
|
||||
|
||||
from src.api.gps_tracks.models import TrackInsert
|
||||
from src.api.gps_tracks.sources.base import SourceParser
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Wikiloc activity codes для поиска
|
||||
_ACTIVITY_CODES = {
|
||||
"motorcycle": 19, # Motorcycle
|
||||
"enduro": 19,
|
||||
"mtb": 3, # Mountain biking
|
||||
}
|
||||
|
||||
# Паттерны для парсинга HTML
|
||||
_TRACK_URL_RE = re.compile(r'href="(/trails/[^"]+/\d+)"')
|
||||
_TRACK_ID_RE = re.compile(r'/trails/[^/]+/(\d+)')
|
||||
_GPX_LINK_RE = re.compile(r'href="([^"]*download[^"]*\.gpx[^"]*|[^"]*\.gpx[^"]*download[^"]*)"' , re.IGNORECASE)
|
||||
_TRAIL_JSON_RE = re.compile(r'wikiloc\.trail\s*=\s*(\{.*?\});', re.DOTALL)
|
||||
|
||||
|
||||
class WikilocParser(SourceParser):
|
||||
"""Парсер Wikiloc через HTTP-парсинг страниц поиска.
|
||||
|
||||
Wikiloc не имеет публичного API. Используем HTML-парсинг с агрессивным
|
||||
rate-limit (10 сек). При 403/429 — graceful stop без краша.
|
||||
"""
|
||||
|
||||
MAPPING = {
|
||||
"motorcycle": "moto",
|
||||
"enduro": "enduro",
|
||||
"mtb": "bicycle",
|
||||
"mountain biking": "bicycle",
|
||||
"hiking": "hike",
|
||||
"running": "hike",
|
||||
"trail running": "hike",
|
||||
"offroad": "offroad",
|
||||
}
|
||||
|
||||
async def collect(self, bbox: tuple, ctx: dict) -> AsyncGenerator[TrackInsert, None]:
|
||||
"""Собирает треки из Wikiloc через HTML-парсинг.
|
||||
|
||||
Args:
|
||||
bbox: (west, south, east, north)
|
||||
ctx: контекст выполнения
|
||||
|
||||
Yields:
|
||||
TrackInsert объекты
|
||||
"""
|
||||
west, south, east, north = bbox
|
||||
base_url = self.config.get("base_url", "https://www.wikiloc.com").rstrip("/")
|
||||
rate_limit = self.config.get("rate_limit_sec", 10)
|
||||
user_agent = self.config.get("user_agent", "enduro-trails/1.0")
|
||||
source_id = self.config.get("id", "wikiloc")
|
||||
source_priority = self.config.get("source_priority", 70)
|
||||
activity_filter = self.config.get("activity_filter", ["motorcycle", "enduro"])
|
||||
|
||||
headers = {
|
||||
"User-Agent": user_agent,
|
||||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||||
"Accept-Language": "en-US,en;q=0.5",
|
||||
}
|
||||
|
||||
async with httpx.AsyncClient(
|
||||
timeout=30,
|
||||
headers=headers,
|
||||
follow_redirects=True,
|
||||
) as client:
|
||||
for activity in activity_filter:
|
||||
act_code = _ACTIVITY_CODES.get(activity, 19)
|
||||
|
||||
page = 0
|
||||
while True:
|
||||
# URL поиска по bbox
|
||||
search_url = (
|
||||
f"{base_url}/wikiloc/find.do"
|
||||
f"?act={act_code}"
|
||||
f"&sw={south},{west}"
|
||||
f"&ne={north},{east}"
|
||||
f"&page={page}"
|
||||
)
|
||||
|
||||
try:
|
||||
await asyncio.sleep(rate_limit)
|
||||
resp = await client.get(search_url)
|
||||
except Exception as exc:
|
||||
logger.error("Wikiloc: failed to fetch search page: %s", exc)
|
||||
return
|
||||
|
||||
if resp.status_code in (403, 429):
|
||||
logger.warning(
|
||||
"Wikiloc: received %d on search, graceful stop",
|
||||
resp.status_code,
|
||||
)
|
||||
return
|
||||
|
||||
if resp.status_code != 200:
|
||||
logger.warning("Wikiloc: search returned %d", resp.status_code)
|
||||
break
|
||||
|
||||
html = resp.text
|
||||
track_paths = _extract_track_paths(html)
|
||||
|
||||
if not track_paths:
|
||||
logger.info("Wikiloc: no tracks on page %d for activity %s", page, activity)
|
||||
break
|
||||
|
||||
for path in track_paths:
|
||||
track_id_match = _TRACK_ID_RE.search(path)
|
||||
if not track_id_match:
|
||||
continue
|
||||
track_id = track_id_match.group(1)
|
||||
track_url = f"{base_url}{path}"
|
||||
|
||||
# Скачиваем страницу трека для получения GPX ссылки
|
||||
try:
|
||||
await asyncio.sleep(rate_limit)
|
||||
track_resp = await client.get(track_url)
|
||||
except Exception as exc:
|
||||
logger.error("Wikiloc: failed to fetch track %s: %s", track_id, exc)
|
||||
continue
|
||||
|
||||
if track_resp.status_code in (403, 429):
|
||||
logger.warning(
|
||||
"Wikiloc: received %d on track %s, graceful stop",
|
||||
track_resp.status_code,
|
||||
track_id,
|
||||
)
|
||||
return
|
||||
|
||||
if track_resp.status_code != 200:
|
||||
logger.warning("Wikiloc: track %s returned %d", track_id, track_resp.status_code)
|
||||
continue
|
||||
|
||||
track_html = track_resp.text
|
||||
|
||||
# Ищем ссылку на GPX
|
||||
gpx_url = _extract_gpx_url(track_html, base_url, track_id)
|
||||
if not gpx_url:
|
||||
logger.debug("Wikiloc: no GPX link found for track %s", track_id)
|
||||
continue
|
||||
|
||||
# Скачиваем GPX
|
||||
try:
|
||||
await asyncio.sleep(rate_limit)
|
||||
gpx_resp = await client.get(gpx_url)
|
||||
except Exception as exc:
|
||||
logger.error("Wikiloc: failed to fetch GPX %s: %s", track_id, exc)
|
||||
continue
|
||||
|
||||
if gpx_resp.status_code in (403, 429):
|
||||
logger.warning(
|
||||
"Wikiloc: received %d on GPX %s, graceful stop",
|
||||
gpx_resp.status_code,
|
||||
track_id,
|
||||
)
|
||||
return
|
||||
|
||||
if gpx_resp.status_code != 200:
|
||||
logger.warning("Wikiloc: GPX %s returned %d", track_id, gpx_resp.status_code)
|
||||
continue
|
||||
|
||||
# Парсим GPX
|
||||
name = _extract_track_name(track_html)
|
||||
track = _parse_gpx(
|
||||
gpx_resp.content,
|
||||
track_id=track_id,
|
||||
name=name,
|
||||
activity_type=self.MAPPING.get(activity, "moto"),
|
||||
source_id=source_id,
|
||||
track_url=track_url,
|
||||
source_priority=source_priority,
|
||||
)
|
||||
if track is None:
|
||||
continue
|
||||
|
||||
if not _bbox_intersects(
|
||||
(track.min_lon, track.min_lat, track.max_lon, track.max_lat),
|
||||
(west, south, east, north),
|
||||
):
|
||||
continue
|
||||
|
||||
yield track
|
||||
|
||||
page += 1
|
||||
|
||||
|
||||
def _extract_track_paths(html: str) -> list:
|
||||
"""Извлекает пути к трекам из HTML страницы поиска Wikiloc."""
|
||||
# Ищем ссылки вида /trails/motorcycle-enduro/name-12345678
|
||||
paths = _TRACK_URL_RE.findall(html)
|
||||
# Дедупликация с сохранением порядка
|
||||
seen = set()
|
||||
result = []
|
||||
for p in paths:
|
||||
if p not in seen and _TRACK_ID_RE.search(p):
|
||||
seen.add(p)
|
||||
result.append(p)
|
||||
return result
|
||||
|
||||
|
||||
def _extract_gpx_url(html: str, base_url: str, track_id: str) -> str | None:
|
||||
"""Извлекает URL для скачивания GPX из страницы трека."""
|
||||
# Вариант 1: прямая ссылка на GPX
|
||||
m = _GPX_LINK_RE.search(html)
|
||||
if m:
|
||||
url = m.group(1)
|
||||
if url.startswith("http"):
|
||||
return url
|
||||
return base_url + url
|
||||
|
||||
# Вариант 2: стандартный URL скачивания Wikiloc
|
||||
# https://www.wikiloc.com/wikiloc/downloadTrail.do?id=XXXXX
|
||||
dl_re = re.search(r'downloadTrail\.do\?id=(\d+)', html)
|
||||
if dl_re:
|
||||
return f"{base_url}/wikiloc/downloadTrail.do?id={dl_re.group(1)}"
|
||||
|
||||
# Вариант 3: по track_id
|
||||
return f"{base_url}/wikiloc/downloadTrail.do?id={track_id}"
|
||||
|
||||
|
||||
def _extract_track_name(html: str) -> str | None:
|
||||
"""Извлекает название трека из HTML страницы."""
|
||||
# Ищем <h1> или <title>
|
||||
m = re.search(r'<h1[^>]*>([^<]+)</h1>', html)
|
||||
if m:
|
||||
return m.group(1).strip()
|
||||
m = re.search(r'<title>([^<|]+)', html)
|
||||
if m:
|
||||
return m.group(1).strip()
|
||||
return None
|
||||
|
||||
|
||||
def _parse_gpx(
|
||||
content: bytes,
|
||||
track_id: str,
|
||||
name: str | None,
|
||||
activity_type: str,
|
||||
source_id: str,
|
||||
track_url: str,
|
||||
source_priority: int,
|
||||
) -> "TrackInsert | None":
|
||||
"""Парсит GPX-файл Wikiloc и возвращает TrackInsert."""
|
||||
try:
|
||||
root = ET.fromstring(content)
|
||||
except Exception as exc:
|
||||
logger.error("Wikiloc: failed to parse GPX %s: %s", track_id, exc)
|
||||
return None
|
||||
|
||||
ns = ""
|
||||
tag = root.tag
|
||||
if tag.startswith("{"):
|
||||
ns = tag.split("}")[0] + "}"
|
||||
|
||||
# Извлекаем название из GPX metadata если нет из HTML
|
||||
if not name:
|
||||
for child in root:
|
||||
local = child.tag.replace(ns, "") if ns else child.tag
|
||||
if local == "metadata":
|
||||
for meta_child in child:
|
||||
local2 = meta_child.tag.replace(ns, "") if ns else meta_child.tag
|
||||
if local2 == "name":
|
||||
name = meta_child.text
|
||||
break
|
||||
break
|
||||
|
||||
coords = []
|
||||
for trk in root:
|
||||
local = trk.tag.replace(ns, "") if ns else trk.tag
|
||||
if local != "trk":
|
||||
continue
|
||||
for trkseg in trk:
|
||||
local2 = trkseg.tag.replace(ns, "") if ns else trkseg.tag
|
||||
if local2 != "trkseg":
|
||||
continue
|
||||
for trkpt in trkseg:
|
||||
try:
|
||||
lat = float(trkpt.get("lat", 0))
|
||||
lon = float(trkpt.get("lon", 0))
|
||||
if lat == 0 and lon == 0:
|
||||
continue
|
||||
coords.append((lon, lat))
|
||||
except (TypeError, ValueError):
|
||||
continue
|
||||
|
||||
if len(coords) < 2:
|
||||
logger.debug("Wikiloc: track %s has < 2 points, skipping", track_id)
|
||||
return None
|
||||
|
||||
lons = [c[0] for c in coords]
|
||||
lats = [c[1] for c in coords]
|
||||
min_lon, max_lon = min(lons), max(lons)
|
||||
min_lat, max_lat = min(lats), max(lats)
|
||||
|
||||
length_m = _calc_track_length(coords)
|
||||
if length_m < 10:
|
||||
return None
|
||||
|
||||
try:
|
||||
from shapely.geometry import LineString
|
||||
from shapely import wkb
|
||||
geom_wkb = wkb.dumps(LineString(coords))
|
||||
except Exception as exc:
|
||||
logger.error("Wikiloc: shapely error for track %s: %s", track_id, exc)
|
||||
return None
|
||||
|
||||
from src.api.gps_tracks.models import ACTIVITY_TYPES
|
||||
if activity_type not in ACTIVITY_TYPES:
|
||||
activity_type = "moto"
|
||||
|
||||
return TrackInsert(
|
||||
external_id=str(track_id),
|
||||
source_id=source_id,
|
||||
external_url=track_url,
|
||||
name=name,
|
||||
description=None,
|
||||
activity_type=activity_type,
|
||||
user=None,
|
||||
created_at=None,
|
||||
length_m=length_m,
|
||||
points_count=len(coords),
|
||||
geom_wkb=geom_wkb,
|
||||
min_lon=min_lon,
|
||||
min_lat=min_lat,
|
||||
max_lon=max_lon,
|
||||
max_lat=max_lat,
|
||||
tags=[],
|
||||
source_priority=source_priority,
|
||||
)
|
||||
|
||||
|
||||
def _haversine_m(lon1: float, lat1: float, lon2: float, lat2: float) -> float:
|
||||
"""Расстояние между двумя точками в метрах (Haversine)."""
|
||||
R = 6371000
|
||||
phi1, phi2 = math.radians(lat1), math.radians(lat2)
|
||||
dphi = math.radians(lat2 - lat1)
|
||||
dlam = math.radians(lon2 - lon1)
|
||||
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
|
||||
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
|
||||
|
||||
|
||||
def _calc_track_length(coords: list) -> float:
|
||||
"""Считает длину трека через Haversine."""
|
||||
total = 0.0
|
||||
for i in range(len(coords) - 1):
|
||||
total += _haversine_m(coords[i][0], coords[i][1], coords[i + 1][0], coords[i + 1][1])
|
||||
return total
|
||||
|
||||
|
||||
def _bbox_intersects(a: tuple, b: tuple) -> bool:
|
||||
"""Проверяет пересечение двух bbox (west, south, east, north)."""
|
||||
a_west, a_south, a_east, a_north = a
|
||||
b_west, b_south, b_east, b_north = b
|
||||
return not (
|
||||
a_east < b_west or a_west > b_east or
|
||||
a_north < b_south or a_south > b_north
|
||||
)
|
||||
Reference in New Issue
Block a user