auto-sync: 2026-04-22 09:00:01

This commit is contained in:
Stream
2026-04-22 09:00:01 +03:00
parent 811183fea1
commit 78f434c677
4 changed files with 302 additions and 50 deletions

View File

@@ -30,6 +30,9 @@ class Config:
# Whether to fetch tracks after summaries (costs extra credits)
FETCH_TRACKS: bool = os.getenv("FR24_FETCH_TRACKS", "false").lower() == "true"
# Max pages to paginate through (safety cap). 200 × 20 = 4000 flights max
MAX_PAGES: int = int(os.getenv("FR24_MAX_PAGES", "200"))
@property
def DB_DSN(self) -> str:
return (

View File

@@ -58,36 +58,57 @@ def _build_airports_param() -> str:
return ",".join(f"{prefix}{code}" for code in codes)
def fetch_flight_summaries(target_date: date) -> List[Dict]:
"""Fetch all flights from flight-summary/full for a single day.
Explorer tier returns max 20 results per query — paginate with offset."""
def iter_flight_summary_pages(target_date: date) -> Iterator[List[Dict]]:
"""Yield one page (list of flights) at a time. Stops on error/empty/MAX_PAGES.
Deduplicates across pages by fr24_id — avoids ×4 duplicates from
airports param 'both:SVO,both:DME,both:VKO,both:ZIA'.
"""
PAGE = 20 # Explorer tier hard limit per request
airports_param = _build_airports_param()
dt_from = f"{target_date}T00:00:00"
dt_to = f"{target_date}T23:59:59"
airports_param = _build_airports_param()
PAGE = 20 # Explorer tier hard limit per request
all_items: List[Dict] = []
offset = 0
seen_fr24_ids: set = set() # dedup across pages
page_num = 0
while True:
data = _get("/api/flight-summary/full", params={
"flight_datetime_from": dt_from,
"flight_datetime_to": dt_to,
"airports": airports_param,
"limit": PAGE,
"offset": offset,
})
try:
data = _get("/api/flight-summary/full", params={
"flight_datetime_from": dt_from,
"flight_datetime_to": dt_to,
"airports": airports_param,
"limit": PAGE,
"offset": offset,
})
except Exception as e:
log.error("fetch page offset=%d failed: %s", offset, e)
break
items = data.get("data", data) if isinstance(data, dict) else data
if not items or not isinstance(items, list):
break
all_items.extend(items)
log.debug("fetch_flight_summaries: offset=%d got %d, total so far %d",
offset, len(items), len(all_items))
# Deduplicate by fr24_id
unique = [x for x in items if x.get("fr24_id") not in seen_fr24_ids]
seen_fr24_ids.update(x["fr24_id"] for x in items if x.get("fr24_id"))
log.debug(
"iter_flight_summary_pages: page=%d offset=%d got=%d unique=%d total_seen=%d",
page_num, offset, len(items), len(unique), len(seen_fr24_ids),
)
yield unique
page_num += 1
if page_num >= config.MAX_PAGES:
log.warning("Reached MAX_PAGES=%d, stopping pagination", config.MAX_PAGES)
break
if len(items) < PAGE:
break # last page
offset += PAGE
return all_items
def fetch_track(fr24_id: str) -> Optional[List[Dict]]:
"""Fetch track points for a single flight."""
@@ -442,43 +463,46 @@ def run(target_date: date, conn) -> Dict:
"errors": 0,
}
# 1. Fetch flight summaries from /full endpoint
try:
summaries = fetch_flight_summaries(target_date)
stats["flights_found"] = len(summaries)
log.info("FR24 worker: found %d flights", len(summaries))
except Exception as e:
log.error("FR24 worker: failed to fetch summaries: %s", e)
stats["errors"] += 1
return stats
# 1. Fetch flight summaries page by page, commit after each page
for page in iter_flight_summary_pages(target_date):
stats["flights_found"] += len(page)
for item in page:
fr24_id = item.get("fr24_id")
if not fr24_id:
continue
try:
actual_id = upsert_flight_actual(conn, item, target_date)
if actual_id:
stats["flights_upserted"] += 1
# 2. Upsert into flight_actual + optionally fetch tracks
for item in summaries:
fr24_id = item.get("fr24_id")
if not fr24_id:
continue
# Optionally fetch tracks (costs extra credits)
if config.FETCH_TRACKS:
track_id = upsert_flight(conn, item, target_date)
if track_id:
points = fetch_track(fr24_id)
if points is not None:
upsert_track_points(conn, track_id, points)
stats["tracks_loaded"] += 1
else:
stats["errors"] += 1
log.debug("FR24: %s upserted", fr24_id)
except Exception as e:
conn.rollback()
stats["errors"] += 1
log.error("FR24: error processing %s: %s", fr24_id, e)
# Commit after each page — partial progress survives errors on later pages
try:
actual_id = upsert_flight_actual(conn, item, target_date)
if actual_id:
stats["flights_upserted"] += 1
# Optionally fetch tracks (costs extra credits)
if config.FETCH_TRACKS:
track_id = upsert_flight(conn, item, target_date)
if track_id:
points = fetch_track(fr24_id)
if points is not None:
upsert_track_points(conn, track_id, points)
stats["tracks_loaded"] += 1
else:
stats["errors"] += 1
conn.commit()
log.debug("FR24: %s upserted", fr24_id)
log.debug("Committed page, total so far: %d", stats["flights_upserted"])
except Exception as e:
conn.rollback()
log.error("Commit failed: %s", e)
stats["errors"] += 1
log.error("FR24: error processing %s: %s", fr24_id, e)
log.info("FR24 worker: found %d flights, upserted %d",
stats["flights_found"], stats["flights_upserted"])
# 3. Enrich schedule with actual times
try:

View File

@@ -0,0 +1,160 @@
# ТЗ: Фикс пагинации fr24_worker — commit per page + дедупликация
**Дата:** 2026-04-22
**Статус:** READY FOR DEV
**Приоритет:** Критический (из-за бага слиты 44K кредитов за ночь)
---
## Суть проблемы
`fetch_flight_summaries()` накапливает **все** страницы в памяти, потом `run()` вставляет всё разом.
При ошибке на любой странице — ничего не коммитится, все данные теряются.
Дополнительно: `both:SVO,both:DME,both:VKO,both:ZIA` возвращает дубли —
рейс SVO→DME попадает в выборку и как рейс SVO, и как рейс DME.
Итого: ~3 500 уникальных рейсов → 14 660 записей (×4 дублирование).
---
## Изменения в `fetch_flight_summaries()` → убрать, заменить на генератор
Файл: `ingest/tracks_fr24/fr24_worker.py`
### Было
```python
def fetch_flight_summaries(target_date: date) -> List[Dict]:
all_items = []
offset = 0
while True:
data = _get(...)
items = data.get("data", ...)
all_items.extend(items)
if len(items) < PAGE: break
offset += PAGE
return all_items
```
### Стало — генератор страниц
```python
def iter_flight_summary_pages(target_date: date):
"""Yield one page (list of flights) at a time. Stops on 402/empty."""
PAGE = 20
airports_param = _build_airports_param()
offset = 0
seen_fr24_ids = set() # дедупликация между страницами
while True:
try:
data = _get("/api/flight-summary/full", params={
"flight_datetime_from": f"{target_date}T00:00:00",
"flight_datetime_to": f"{target_date}T23:59:59",
"airports": airports_param,
"limit": PAGE,
"offset": offset,
})
except Exception as e:
log.error("fetch page offset=%d failed: %s", offset, e)
break
items = data.get("data", data) if isinstance(data, dict) else data
if not items or not isinstance(items, list):
break
# Дедупликация по fr24_id
unique = [x for x in items if x.get("fr24_id") not in seen_fr24_ids]
seen_fr24_ids.update(x["fr24_id"] for x in items if x.get("fr24_id"))
yield unique
if len(items) < PAGE:
break
offset += PAGE
```
---
## Изменения в `run()` — commit после каждой страницы
### Было
```python
summaries = fetch_flight_summaries(target_date)
stats["flights_found"] = len(summaries)
for item in summaries:
...upsert...
conn.commit()
```
### Стало
```python
for page in iter_flight_summary_pages(target_date):
stats["flights_found"] += len(page)
for item in page:
fr24_id = item.get("fr24_id")
if not fr24_id:
continue
try:
actual_id = upsert_flight_actual(conn, item, target_date)
if actual_id:
stats["flights_upserted"] += 1
...FETCH_TRACKS логика...
except Exception as e:
conn.rollback()
stats["errors"] += 1
log.error("FR24: error processing %s: %s", fr24_id, e)
# Коммит после каждой страницы
try:
conn.commit()
log.debug("Committed page, total so far: %d", stats["flights_upserted"])
except Exception as e:
conn.rollback()
log.error("Commit failed: %s", e)
stats["errors"] += 1
```
---
## Дополнительно — лимит страниц
Добавить защитный лимит:
```python
MAX_PAGES = int(os.getenv("FR24_MAX_PAGES", "200")) # 200 × 20 = 4000 рейсов max
```
В генераторе добавить счётчик:
```python
page_num = 0
while True:
...
page_num += 1
if page_num >= MAX_PAGES:
log.warning("Reached MAX_PAGES=%d, stopping pagination", MAX_PAGES)
break
```
В `config.py` добавить:
```python
MAX_PAGES: int = int(os.getenv("FR24_MAX_PAGES", "200"))
```
---
## Проверка
После деплоя запустить:
```
POST http://fr24-vm:8001/run?date=2026-04-19
```
(дата когда кредиты были) — проверить что `flight_actual` заполняется постепенно по страницам.
В БД сразу должны появляться строки, не ждать конца всей загрузки.
---
## Файлы для изменения
- `ingest/tracks_fr24/fr24_worker.py` — главные изменения
- `ingest/tracks_fr24/config.py` — добавить `MAX_PAGES`
**Деплой:** только `docker cp` (не rebuild), потом проверить синтаксис.

View File

@@ -0,0 +1,65 @@
# Dev Report: Pagination Fix — commit per page + дедупликация
**Дата:** 2026-04-22
**Статус:** DONE ✅
**ТЗ:** TZ-fr24-worker-pagination-fix.md
**Исполнитель:** subagent dev-pagination-fix
---
## Итог
Все изменения реализованы строго по ТЗ. Синтаксис проверен — ошибок нет.
---
## Изменения
### `config.py`
- Добавлено поле `MAX_PAGES: int = int(os.getenv("FR24_MAX_PAGES", "200"))`
Защитный лимит: 200 страниц × 20 = 4000 рейсов максимум.
### `fr24_worker.py`
1. **Удалена** функция `fetch_flight_summaries()` — накапливала все страницы в памяти, при ошибке всё терялось.
2. **Добавлен** генератор `iter_flight_summary_pages(target_date: date) -> Iterator[List[Dict]]`:
- Возвращает страницы по одной (yield)
- Дедупликация по `fr24_id` через `seen_fr24_ids: set` — убирает ×4 дубли от `both:SVO,both:DME,both:VKO,both:ZIA`
- Ловит исключения на каждой странице (break с логом, не падает весь процесс)
- Защитный лимит `config.MAX_PAGES` с предупреждением в лог
- Debug-лог: page, offset, got, unique, total_seen
3. **Переписан** блок в `run()`:
- Убран блок `try: summaries = fetch_flight_summaries(...)`
- Итерация через `for page in iter_flight_summary_pages(target_date):`
- `stats["flights_found"] += len(page)` — инкрементальный счётчик
- После обработки каждой страницы — `conn.commit()` с логом
- При ошибке commit — `conn.rollback()` + `stats["errors"] += 1`
- Индивидуальные `conn.commit()` per-flight убраны (остался только per-page)
---
## Проверка синтаксиса
```
$ python3 -m py_compile config.py && echo OK
config.py OK
$ python3 -m py_compile fr24_worker.py && echo OK
fr24_worker.py OK
```
---
## Следующий шаг (деплой)
```bash
docker cp ingest/tracks_fr24/fr24_worker.py fr24-ingest:/app/ingest/tracks_fr24/fr24_worker.py
docker cp ingest/tracks_fr24/config.py fr24-ingest:/app/ingest/tracks_fr24/config.py
```
Проверка:
```
POST http://fr24-vm:8001/run?date=2026-04-19
```
Должны появляться строки в `flight_actual` постепенно, не ждать конца всей загрузки.