auto-sync: 2026-04-26 12:10:01

This commit is contained in:
Stream
2026-04-26 12:10:02 +03:00
parent f0949be53e
commit 8a69b47e63

View File

@@ -0,0 +1,161 @@
# SPEC: Фильтрация M4-выбросов в fr24-preprocess
**Дата:** 2026-04-26
**Статус:** ready for dev
**Файл для изменения:** `/home/fr24/projects/fr24/ingest/preprocess/main.py` на хосте `fr24` (192.168.2.67)
---
## Контекст
`fr24-preprocess` читает батчи из `fr24.raw_packets` (SBS-1, base64), парсит их и пишет точки трека в `fr24.track_points`.
Анализ данных показал, что ~4% точек в зоне уверенного приёма (ЗУП=35 км) — **выбросы координат (M4)**: dump1090 иногда декодирует повреждённый Mode-S пакет и выдаёт невалидные координаты. Вычисленная скорость между такой точкой и предыдущей превышает Mach 1 (350 м/с).
Сейчас в коде есть только проверка диапазона координат (`-90..90, -180..180`). Нужна скоростная фильтрация.
---
## Задача
Добавить в `process_batch` / `append_track_point` **скоростной фильтр M4** на основе вычисленной скорости между последовательными точками одного ВС.
---
## Требования
### 1. Новый env-параметр
```python
MAX_SPEED_MS = float(os.environ.get("MAX_SPEED_MS", 350.0)) # м/с, Mach 1
```
Добавить в начало `main.py` рядом с другими константами.
### 2. Расширить `aircraft_state`
`aircraft_state` — словарь `icao24 → dict`. Добавить поля:
```python
aircraft_state[icao24]["last_lat"] = float # широта последней принятой точки
aircraft_state[icao24]["last_lon"] = float # долгота
aircraft_state[icao24]["last_point_ts"] = datetime # время последней принятой точки
```
### 3. Функция haversine
Добавить в модуль (перед `process_batch`):
```python
import math
def haversine_m(lat1, lon1, lat2, lon2) -> float:
"""Расстояние между двумя точками в метрах (формула Haversine)."""
R = 6_371_000 # радиус Земли, м
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlam = math.radians(lon2 - lon1)
a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlam/2)**2
return 2 * R * math.asin(math.sqrt(a))
```
### 4. Фильтрация в `process_batch` — перед вызовом `append_track_point`
В блоке обработки MSG3 (после того как определены `lat`, `lon`, `observed_at`), непосредственно **перед** `append_track_point`, добавить проверку:
```python
# --- M4 outlier filter ---
outlier = False
cached = aircraft_state.get(icao24, {})
last_lat = cached.get("last_lat")
last_lon = cached.get("last_lon")
last_pts = cached.get("last_point_ts")
if last_lat is not None and last_pts is not None:
dt = (observed_at - last_pts).total_seconds()
if 0 < dt <= 30: # только если gap небольшой (≤30 сек)
dist = haversine_m(last_lat, last_lon, lat, lon)
speed = dist / dt
if speed > MAX_SPEED_MS:
log.warning(
"M4 outlier icao24=%s speed=%.0f m/s (%.0f km) — skipping point",
icao24, speed, dist / 1000
)
outlier = True
if not outlier:
track_id = get_or_create_track(conn, flight_id)
append_track_point(
conn, track_id, flight_id, observed_at,
raw_id, partition_date,
lat, lon, alt_m, speed_kt, heading, vrate,
)
# обновляем last_point только если точка принята
if icao24 not in aircraft_state:
aircraft_state[icao24] = {}
aircraft_state[icao24]["last_lat"] = lat
aircraft_state[icao24]["last_lon"] = lon
aircraft_state[icao24]["last_point_ts"] = observed_at
```
**Важно:** переменная `speed` внутри блока outlier-фильтра — это `dist/dt` (м/с), не путать с `speed_kt` (скорость ВС из SBS-1 в узлах). Переименовать локальную переменную: `computed_speed_ms = dist / dt`.
### 5. Сброс last_point при разрыве трека
В функции `get_or_create_flight`, когда закрывается старый рейс (gap > 30 мин) — нужно сбросить `last_point` в `aircraft_state`. Передать `aircraft_state` в функцию **не нужно** — проще сбрасывать в `process_batch` сразу после определения gap:
В блоке где закрывается рейс или создаётся новый (уже есть логика gap в `get_or_create_flight`), добавить после вызова:
```python
flight_id = get_or_create_flight(conn, aircraft_id, callsign, observed_at)
# Сбросить last_point если это новый рейс (gap был большой)
# Простой способ: сбрасывать если dt от last_point > 30 мин
if icao24 in aircraft_state:
lpts = aircraft_state[icao24].get("last_point_ts")
if lpts is not None and (observed_at - lpts).total_seconds() > 1800:
aircraft_state[icao24].pop("last_lat", None)
aircraft_state[icao24].pop("last_lon", None)
aircraft_state[icao24].pop("last_point_ts", None)
```
Этот блок добавить **до** M4 outlier фильтра.
---
## Чего НЕ делать
- Не менять схему БД
- Не менять логику gap-рейсов в `get_or_create_flight`
- Не трогать `append_track_point`
- Не добавлять новые зависимости (только stdlib `math`)
---
## Деплой
После изменения `main.py` на хосте `fr24` выполнить:
```bash
cd /home/fr24/projects/fr24/compose
sg docker -c 'docker compose build preprocess && docker compose up -d preprocess'
```
Проверить логи через 30 сек:
```bash
sg docker -c 'docker compose logs --tail=50 preprocess'
```
Ожидаемые строки в логах при наличии выбросов:
```
M4 outlier icao24=ABCDEF speed=1240 m/s (74 km) — skipping point
```
---
## Доступ к хосту
- Хост: `fr24` (192.168.2.67), через jump `vpn-srv`
- SSH chain: `~/.openclaw/skills/installer/scripts/ssh_exec.sh --host fr24`
- Файл: `/home/fr24/projects/fr24/ingest/preprocess/main.py`
- Использовать **installer skill** для изменений файлов (backup + сессия)