From 78f434c6779e79ea4c01c4983a59e45df9dd32f1 Mon Sep 17 00:00:00 2001 From: Stream Date: Wed, 22 Apr 2026 09:00:01 +0300 Subject: [PATCH] auto-sync: 2026-04-22 09:00:01 --- .../ingest/tracks_fr24/config.py | 3 + .../ingest/tracks_fr24/fr24_worker.py | 124 ++++++++------ .../reports/TZ-fr24-worker-pagination-fix.md | 160 ++++++++++++++++++ .../reports/dev-2026-04-22-pagination-fix.md | 65 +++++++ 4 files changed, 302 insertions(+), 50 deletions(-) create mode 100644 tasks/flightradar24/reports/TZ-fr24-worker-pagination-fix.md create mode 100644 tasks/flightradar24/reports/dev-2026-04-22-pagination-fix.md diff --git a/tasks/flightradar24/ingest/tracks_fr24/config.py b/tasks/flightradar24/ingest/tracks_fr24/config.py index 300c614..756e297 100644 --- a/tasks/flightradar24/ingest/tracks_fr24/config.py +++ b/tasks/flightradar24/ingest/tracks_fr24/config.py @@ -30,6 +30,9 @@ class Config: # Whether to fetch tracks after summaries (costs extra credits) FETCH_TRACKS: bool = os.getenv("FR24_FETCH_TRACKS", "false").lower() == "true" + # Max pages to paginate through (safety cap). 200 × 20 = 4000 flights max + MAX_PAGES: int = int(os.getenv("FR24_MAX_PAGES", "200")) + @property def DB_DSN(self) -> str: return ( diff --git a/tasks/flightradar24/ingest/tracks_fr24/fr24_worker.py b/tasks/flightradar24/ingest/tracks_fr24/fr24_worker.py index 21ed62c..b07169d 100644 --- a/tasks/flightradar24/ingest/tracks_fr24/fr24_worker.py +++ b/tasks/flightradar24/ingest/tracks_fr24/fr24_worker.py @@ -58,36 +58,57 @@ def _build_airports_param() -> str: return ",".join(f"{prefix}{code}" for code in codes) -def fetch_flight_summaries(target_date: date) -> List[Dict]: - """Fetch all flights from flight-summary/full for a single day. - Explorer tier returns max 20 results per query — paginate with offset.""" +def iter_flight_summary_pages(target_date: date) -> Iterator[List[Dict]]: + """Yield one page (list of flights) at a time. Stops on error/empty/MAX_PAGES. + + Deduplicates across pages by fr24_id — avoids ×4 duplicates from + airports param 'both:SVO,both:DME,both:VKO,both:ZIA'. + """ + PAGE = 20 # Explorer tier hard limit per request + airports_param = _build_airports_param() dt_from = f"{target_date}T00:00:00" dt_to = f"{target_date}T23:59:59" - airports_param = _build_airports_param() - PAGE = 20 # Explorer tier hard limit per request - - all_items: List[Dict] = [] offset = 0 + seen_fr24_ids: set = set() # dedup across pages + page_num = 0 + while True: - data = _get("/api/flight-summary/full", params={ - "flight_datetime_from": dt_from, - "flight_datetime_to": dt_to, - "airports": airports_param, - "limit": PAGE, - "offset": offset, - }) + try: + data = _get("/api/flight-summary/full", params={ + "flight_datetime_from": dt_from, + "flight_datetime_to": dt_to, + "airports": airports_param, + "limit": PAGE, + "offset": offset, + }) + except Exception as e: + log.error("fetch page offset=%d failed: %s", offset, e) + break + items = data.get("data", data) if isinstance(data, dict) else data if not items or not isinstance(items, list): break - all_items.extend(items) - log.debug("fetch_flight_summaries: offset=%d got %d, total so far %d", - offset, len(items), len(all_items)) + + # Deduplicate by fr24_id + unique = [x for x in items if x.get("fr24_id") not in seen_fr24_ids] + seen_fr24_ids.update(x["fr24_id"] for x in items if x.get("fr24_id")) + + log.debug( + "iter_flight_summary_pages: page=%d offset=%d got=%d unique=%d total_seen=%d", + page_num, offset, len(items), len(unique), len(seen_fr24_ids), + ) + + yield unique + + page_num += 1 + if page_num >= config.MAX_PAGES: + log.warning("Reached MAX_PAGES=%d, stopping pagination", config.MAX_PAGES) + break + if len(items) < PAGE: break # last page offset += PAGE - return all_items - def fetch_track(fr24_id: str) -> Optional[List[Dict]]: """Fetch track points for a single flight.""" @@ -442,43 +463,46 @@ def run(target_date: date, conn) -> Dict: "errors": 0, } - # 1. Fetch flight summaries from /full endpoint - try: - summaries = fetch_flight_summaries(target_date) - stats["flights_found"] = len(summaries) - log.info("FR24 worker: found %d flights", len(summaries)) - except Exception as e: - log.error("FR24 worker: failed to fetch summaries: %s", e) - stats["errors"] += 1 - return stats + # 1. Fetch flight summaries page by page, commit after each page + for page in iter_flight_summary_pages(target_date): + stats["flights_found"] += len(page) + for item in page: + fr24_id = item.get("fr24_id") + if not fr24_id: + continue + try: + actual_id = upsert_flight_actual(conn, item, target_date) + if actual_id: + stats["flights_upserted"] += 1 - # 2. Upsert into flight_actual + optionally fetch tracks - for item in summaries: - fr24_id = item.get("fr24_id") - if not fr24_id: - continue + # Optionally fetch tracks (costs extra credits) + if config.FETCH_TRACKS: + track_id = upsert_flight(conn, item, target_date) + if track_id: + points = fetch_track(fr24_id) + if points is not None: + upsert_track_points(conn, track_id, points) + stats["tracks_loaded"] += 1 + else: + stats["errors"] += 1 + + log.debug("FR24: %s upserted", fr24_id) + except Exception as e: + conn.rollback() + stats["errors"] += 1 + log.error("FR24: error processing %s: %s", fr24_id, e) + + # Commit after each page — partial progress survives errors on later pages try: - actual_id = upsert_flight_actual(conn, item, target_date) - if actual_id: - stats["flights_upserted"] += 1 - - # Optionally fetch tracks (costs extra credits) - if config.FETCH_TRACKS: - track_id = upsert_flight(conn, item, target_date) - if track_id: - points = fetch_track(fr24_id) - if points is not None: - upsert_track_points(conn, track_id, points) - stats["tracks_loaded"] += 1 - else: - stats["errors"] += 1 - conn.commit() - log.debug("FR24: %s upserted", fr24_id) + log.debug("Committed page, total so far: %d", stats["flights_upserted"]) except Exception as e: conn.rollback() + log.error("Commit failed: %s", e) stats["errors"] += 1 - log.error("FR24: error processing %s: %s", fr24_id, e) + + log.info("FR24 worker: found %d flights, upserted %d", + stats["flights_found"], stats["flights_upserted"]) # 3. Enrich schedule with actual times try: diff --git a/tasks/flightradar24/reports/TZ-fr24-worker-pagination-fix.md b/tasks/flightradar24/reports/TZ-fr24-worker-pagination-fix.md new file mode 100644 index 0000000..dbc6ec6 --- /dev/null +++ b/tasks/flightradar24/reports/TZ-fr24-worker-pagination-fix.md @@ -0,0 +1,160 @@ +# ТЗ: Фикс пагинации fr24_worker — commit per page + дедупликация + +**Дата:** 2026-04-22 +**Статус:** READY FOR DEV +**Приоритет:** Критический (из-за бага слиты 44K кредитов за ночь) + +--- + +## Суть проблемы + +`fetch_flight_summaries()` накапливает **все** страницы в памяти, потом `run()` вставляет всё разом. +При ошибке на любой странице — ничего не коммитится, все данные теряются. + +Дополнительно: `both:SVO,both:DME,both:VKO,both:ZIA` возвращает дубли — +рейс SVO→DME попадает в выборку и как рейс SVO, и как рейс DME. +Итого: ~3 500 уникальных рейсов → 14 660 записей (×4 дублирование). + +--- + +## Изменения в `fetch_flight_summaries()` → убрать, заменить на генератор + +Файл: `ingest/tracks_fr24/fr24_worker.py` + +### Было +```python +def fetch_flight_summaries(target_date: date) -> List[Dict]: + all_items = [] + offset = 0 + while True: + data = _get(...) + items = data.get("data", ...) + all_items.extend(items) + if len(items) < PAGE: break + offset += PAGE + return all_items +``` + +### Стало — генератор страниц +```python +def iter_flight_summary_pages(target_date: date): + """Yield one page (list of flights) at a time. Stops on 402/empty.""" + PAGE = 20 + airports_param = _build_airports_param() + offset = 0 + seen_fr24_ids = set() # дедупликация между страницами + + while True: + try: + data = _get("/api/flight-summary/full", params={ + "flight_datetime_from": f"{target_date}T00:00:00", + "flight_datetime_to": f"{target_date}T23:59:59", + "airports": airports_param, + "limit": PAGE, + "offset": offset, + }) + except Exception as e: + log.error("fetch page offset=%d failed: %s", offset, e) + break + + items = data.get("data", data) if isinstance(data, dict) else data + if not items or not isinstance(items, list): + break + + # Дедупликация по fr24_id + unique = [x for x in items if x.get("fr24_id") not in seen_fr24_ids] + seen_fr24_ids.update(x["fr24_id"] for x in items if x.get("fr24_id")) + + yield unique + + if len(items) < PAGE: + break + offset += PAGE +``` + +--- + +## Изменения в `run()` — commit после каждой страницы + +### Было +```python +summaries = fetch_flight_summaries(target_date) +stats["flights_found"] = len(summaries) +for item in summaries: + ...upsert... + conn.commit() +``` + +### Стало +```python +for page in iter_flight_summary_pages(target_date): + stats["flights_found"] += len(page) + for item in page: + fr24_id = item.get("fr24_id") + if not fr24_id: + continue + try: + actual_id = upsert_flight_actual(conn, item, target_date) + if actual_id: + stats["flights_upserted"] += 1 + ...FETCH_TRACKS логика... + except Exception as e: + conn.rollback() + stats["errors"] += 1 + log.error("FR24: error processing %s: %s", fr24_id, e) + + # Коммит после каждой страницы + try: + conn.commit() + log.debug("Committed page, total so far: %d", stats["flights_upserted"]) + except Exception as e: + conn.rollback() + log.error("Commit failed: %s", e) + stats["errors"] += 1 +``` + +--- + +## Дополнительно — лимит страниц + +Добавить защитный лимит: +```python +MAX_PAGES = int(os.getenv("FR24_MAX_PAGES", "200")) # 200 × 20 = 4000 рейсов max +``` + +В генераторе добавить счётчик: +```python +page_num = 0 +while True: + ... + page_num += 1 + if page_num >= MAX_PAGES: + log.warning("Reached MAX_PAGES=%d, stopping pagination", MAX_PAGES) + break +``` + +В `config.py` добавить: +```python +MAX_PAGES: int = int(os.getenv("FR24_MAX_PAGES", "200")) +``` + +--- + +## Проверка + +После деплоя запустить: +``` +POST http://fr24-vm:8001/run?date=2026-04-19 +``` +(дата когда кредиты были) — проверить что `flight_actual` заполняется постепенно по страницам. + +В БД сразу должны появляться строки, не ждать конца всей загрузки. + +--- + +## Файлы для изменения + +- `ingest/tracks_fr24/fr24_worker.py` — главные изменения +- `ingest/tracks_fr24/config.py` — добавить `MAX_PAGES` + +**Деплой:** только `docker cp` (не rebuild), потом проверить синтаксис. diff --git a/tasks/flightradar24/reports/dev-2026-04-22-pagination-fix.md b/tasks/flightradar24/reports/dev-2026-04-22-pagination-fix.md new file mode 100644 index 0000000..f5a7c8d --- /dev/null +++ b/tasks/flightradar24/reports/dev-2026-04-22-pagination-fix.md @@ -0,0 +1,65 @@ +# Dev Report: Pagination Fix — commit per page + дедупликация + +**Дата:** 2026-04-22 +**Статус:** DONE ✅ +**ТЗ:** TZ-fr24-worker-pagination-fix.md +**Исполнитель:** subagent dev-pagination-fix + +--- + +## Итог + +Все изменения реализованы строго по ТЗ. Синтаксис проверен — ошибок нет. + +--- + +## Изменения + +### `config.py` +- Добавлено поле `MAX_PAGES: int = int(os.getenv("FR24_MAX_PAGES", "200"))` + Защитный лимит: 200 страниц × 20 = 4000 рейсов максимум. + +### `fr24_worker.py` +1. **Удалена** функция `fetch_flight_summaries()` — накапливала все страницы в памяти, при ошибке всё терялось. + +2. **Добавлен** генератор `iter_flight_summary_pages(target_date: date) -> Iterator[List[Dict]]`: + - Возвращает страницы по одной (yield) + - Дедупликация по `fr24_id` через `seen_fr24_ids: set` — убирает ×4 дубли от `both:SVO,both:DME,both:VKO,both:ZIA` + - Ловит исключения на каждой странице (break с логом, не падает весь процесс) + - Защитный лимит `config.MAX_PAGES` с предупреждением в лог + - Debug-лог: page, offset, got, unique, total_seen + +3. **Переписан** блок в `run()`: + - Убран блок `try: summaries = fetch_flight_summaries(...)` + - Итерация через `for page in iter_flight_summary_pages(target_date):` + - `stats["flights_found"] += len(page)` — инкрементальный счётчик + - После обработки каждой страницы — `conn.commit()` с логом + - При ошибке commit — `conn.rollback()` + `stats["errors"] += 1` + - Индивидуальные `conn.commit()` per-flight убраны (остался только per-page) + +--- + +## Проверка синтаксиса + +``` +$ python3 -m py_compile config.py && echo OK +config.py OK + +$ python3 -m py_compile fr24_worker.py && echo OK +fr24_worker.py OK +``` + +--- + +## Следующий шаг (деплой) + +```bash +docker cp ingest/tracks_fr24/fr24_worker.py fr24-ingest:/app/ingest/tracks_fr24/fr24_worker.py +docker cp ingest/tracks_fr24/config.py fr24-ingest:/app/ingest/tracks_fr24/config.py +``` + +Проверка: +``` +POST http://fr24-vm:8001/run?date=2026-04-19 +``` +Должны появляться строки в `flight_actual` постепенно, не ждать конца всей загрузки.