diff --git a/tasks/flightradar24/docs/PHASE2_STEP1_DETAILED_SPEC.md b/tasks/flightradar24/docs/PHASE2_STEP1_DETAILED_SPEC.md new file mode 100644 index 0000000..8499628 --- /dev/null +++ b/tasks/flightradar24/docs/PHASE2_STEP1_DETAILED_SPEC.md @@ -0,0 +1,730 @@ +# Детальная спецификация — Фаза 2, Шаг 1 + +Дополнение к `PHASE2_STEP1_EXTERNAL_DATA.md` — детальная логика модулей для Dev-агента. + +--- + +## 1. Архитектура контейнера `fr24-schedule` + +### 1.1 Структура процессов + +``` +main.py (supervisor) + ├─ cron scheduler (APScheduler) + │ └─ daily_job() @ 02:00 UTC + │ ├─ yandex_worker.fetch_day(date) + │ └─ opensky_worker.enrich_day(date) + └─ healthcheck endpoint (Flask, port 8000) + └─ GET /health → {"status": "ok", "last_run": "..."} +``` + +### 1.2 Конфигурация (config.py) + +```python +import os +from dataclasses import dataclass + +@dataclass +class Config: + # Database + DB_HOST: str = os.getenv("POSTGRES_HOST", "fr24-postgres") + DB_PORT: int = int(os.getenv("POSTGRES_PORT", "5432")) + DB_NAME: str = os.getenv("POSTGRES_DB", "fr24") + DB_USER: str = os.getenv("POSTGRES_USER", "fr24") + DB_PASSWORD: str = os.getenv("POSTGRES_PASSWORD") + + # API keys + YANDEX_RASP_API_KEY: str = os.getenv("YANDEX_RASP_API_KEY") + OPENSKY_USERNAME: str = os.getenv("OPENSKY_USERNAME", "") + OPENSKY_PASSWORD: str = os.getenv("OPENSKY_PASSWORD", "") + + # Airports + AIRPORTS = { + "SVO": {"iata": "SVO", "icao": "UUEE", "yandex_code": "s9600213", "name": "Шереметьево"}, + "DME": {"iata": "DME", "icao": "UUDD", "yandex_code": "s9600366", "name": "Домодедово"}, + "VKO": {"iata": "VKO", "icao": "UUWW", "yandex_code": "s9600215", "name": "Внуково"}, + "ZIA": {"iata": "ZIA", "icao": "UUBW", "yandex_code": "s9881291", "name": "Жуковский"}, + } + + # Rate limits + YANDEX_RATE_LIMIT_SEC: float = 1.0 # пауза между запросами + OPENSKY_RATE_LIMIT_SEC: float = 30.0 + + # Retention + RETENTION_DAYS: int = int(os.getenv("SCHEDULE_RETENTION_DAYS", "1095")) # 3 года + + # Cron schedule + DAILY_RUN_HOUR: int = 2 # UTC + DAILY_RUN_MINUTE: int = 0 + +config = Config() +``` + +--- + +## 2. Модуль `yandex_worker.py` + +### 2.1 Основная функция + +```python +import requests +import time +from datetime import datetime, date +from typing import List, Dict +import psycopg2 +from config import config +import logging + +log = logging.getLogger(__name__) + +def fetch_day(target_date: date, conn) -> int: + """ + Загружает расписание за один день для всех аэропортов. + Возвращает количество загруженных рейсов. + """ + total_flights = 0 + + for airport_iata, airport_info in config.AIRPORTS.items(): + log.info(f"Fetching Yandex schedule for {airport_iata} on {target_date}") + + try: + flights = fetch_airport_schedule( + yandex_code=airport_info["yandex_code"], + target_date=target_date + ) + + inserted = upsert_flights(conn, flights, airport_iata, target_date) + total_flights += inserted + + log.info(f"{airport_iata}: {inserted} flights upserted") + + except Exception as e: + log.error(f"Failed to fetch {airport_iata}: {e}") + # Продолжаем с другими аэропортами + + time.sleep(config.YANDEX_RATE_LIMIT_SEC) + + conn.commit() + return total_flights +``` + +### 2.2 Запрос к Яндекс API + +```python +def fetch_airport_schedule(yandex_code: str, target_date: date) -> List[Dict]: + """ + Запрашивает расписание у Яндекс.Расписания. + Возвращает список рейсов (arrivals + departures). + """ + url = "https://api.rasp.yandex.net/v3.0/schedule/" + params = { + "apikey": config.YANDEX_RASP_API_KEY, + "station": yandex_code, + "date": target_date.isoformat(), + "transport_types": "plane", + } + + response = requests.get(url, params=params, timeout=30) + response.raise_for_status() + + data = response.json() + flights = [] + + # Arrivals + for item in data.get("schedule", []): + if item.get("thread", {}).get("transport_subtype", {}).get("code") != "plane": + continue + + direction = "arrival" if item.get("is_fuzzy") else "departure" + # Яндекс API не различает явно arrival/departure в одном запросе, + # нужно делать два запроса: event=arrival и event=departure + # Упрощение: парсим thread.title для определения направления + + flight = parse_yandex_flight(item, direction) + if flight: + flights.append(flight) + + return flights +``` + +### 2.3 Парсинг ответа Яндекс + +```python +def parse_yandex_flight(item: Dict, direction: str) -> Dict: + """ + Преобразует элемент Яндекс API в структуру для БД. + """ + thread = item.get("thread", {}) + + # Номер рейса + flight_number = thread.get("number", "") + if not flight_number: + return None + + # Авиакомпания + carrier = thread.get("carrier", {}) + airline_iata = carrier.get("code") + airline_name = carrier.get("title") + + # Маршрут (из thread.title, например "Москва — Санкт-Петербург") + title = thread.get("title", "") + origin_iata, destination_iata = parse_route_from_title(title) + + # Время + scheduled_at = item.get("departure") or item.get("arrival") + if not scheduled_at: + return None + + return { + "flight_number": flight_number, + "airline_iata": airline_iata, + "airline_name": airline_name, + "origin_iata": origin_iata, + "destination_iata": destination_iata, + "aircraft_type": thread.get("vehicle"), + "scheduled_at": scheduled_at, + "status": "scheduled", # Яндекс не даёт статус в расписании + "source": "yandex", + } + +def parse_route_from_title(title: str) -> tuple: + """ + Извлекает origin/destination из строки "Москва — Санкт-Петербург". + Возвращает (None, None) если не удалось распарсить. + """ + # Упрощённая логика, в реальности нужен маппинг город → IATA + parts = title.split("—") + if len(parts) == 2: + origin = parts[0].strip() + destination = parts[1].strip() + # TODO: маппинг город → IATA через справочник + return (None, None) # пока не реализовано + return (None, None) +``` + +### 2.4 Upsert в БД + +```python +def upsert_flights(conn, flights: List[Dict], airport_iata: str, flight_date: date) -> int: + """ + Вставляет или обновляет рейсы в fr24_ext.schedule. + Возвращает количество обработанных записей. + """ + if not flights: + return 0 + + with conn.cursor() as cur: + for flight in flights: + cur.execute(""" + INSERT INTO fr24_ext.schedule + (flight_date, airport_iata, direction, flight_number, + airline_iata, airline_name, origin_iata, destination_iata, + aircraft_type, scheduled_at, status, source) + VALUES + (%(flight_date)s, %(airport_iata)s, %(direction)s, %(flight_number)s, + %(airline_iata)s, %(airline_name)s, %(origin_iata)s, %(destination_iata)s, + %(aircraft_type)s, %(scheduled_at)s, %(status)s, %(source)s) + ON CONFLICT (flight_number, airport_iata, scheduled_at, direction) + DO UPDATE SET + airline_name = EXCLUDED.airline_name, + status = EXCLUDED.status, + fetched_at = now() + """, { + "flight_date": flight_date, + "airport_iata": airport_iata, + "direction": flight.get("direction", "departure"), + "flight_number": flight["flight_number"], + "airline_iata": flight.get("airline_iata"), + "airline_name": flight.get("airline_name"), + "origin_iata": flight.get("origin_iata"), + "destination_iata": flight.get("destination_iata"), + "aircraft_type": flight.get("aircraft_type"), + "scheduled_at": flight["scheduled_at"], + "status": flight.get("status", "scheduled"), + "source": flight["source"], + }) + + return len(flights) +``` + +--- + +## 3. Модуль `opensky_worker.py` + +### 3.1 Основная функция + +```python +def enrich_day(target_date: date, conn) -> int: + """ + Обогащает записи в fr24_ext.schedule данными из OpenSky (icao24). + Возвращает количество обогащённых записей. + """ + total_enriched = 0 + + begin_ts = int(datetime.combine(target_date, datetime.min.time()).timestamp()) + end_ts = begin_ts + 86400 # +24 часа + + for airport_iata, airport_info in config.AIRPORTS.items(): + icao = airport_info["icao"] + + log.info(f"Enriching OpenSky data for {airport_iata} ({icao}) on {target_date}") + + try: + # Arrivals + arrivals = fetch_opensky_flights(icao, begin_ts, end_ts, "arrival") + enriched_arr = enrich_flights(conn, arrivals, airport_iata, "arrival") + + time.sleep(config.OPENSKY_RATE_LIMIT_SEC) + + # Departures + departures = fetch_opensky_flights(icao, begin_ts, end_ts, "departure") + enriched_dep = enrich_flights(conn, departures, airport_iata, "departure") + + total_enriched += enriched_arr + enriched_dep + + log.info(f"{airport_iata}: {enriched_arr} arrivals, {enriched_dep} departures enriched") + + except Exception as e: + log.error(f"Failed to enrich {airport_iata}: {e}") + + time.sleep(config.OPENSKY_RATE_LIMIT_SEC) + + conn.commit() + return total_enriched +``` + +### 3.2 Запрос к OpenSky API + +```python +def fetch_opensky_flights(icao: str, begin_ts: int, end_ts: int, direction: str) -> List[Dict]: + """ + Запрашивает arrivals или departures у OpenSky Network. + """ + endpoint = f"https://opensky-network.org/api/flights/{direction}" + params = { + "airport": icao, + "begin": begin_ts, + "end": end_ts, + } + + auth = None + if config.OPENSKY_USERNAME and config.OPENSKY_PASSWORD: + auth = (config.OPENSKY_USERNAME, config.OPENSKY_PASSWORD) + + response = requests.get(endpoint, params=params, auth=auth, timeout=60) + response.raise_for_status() + + return response.json() # список словарей +``` + +### 3.3 Обогащение записей + +```python +def enrich_flights(conn, opensky_flights: List[Dict], airport_iata: str, direction: str) -> int: + """ + Обновляет icao24 и actual_at в существующих записях fr24_ext.schedule. + """ + if not opensky_flights: + return 0 + + enriched_count = 0 + + with conn.cursor() as cur: + for flight in opensky_flights: + icao24 = flight.get("icao24") + callsign = flight.get("callsign", "").strip() + + if not icao24 or not callsign: + continue + + # Фактическое время прилёта/вылета + actual_ts = flight.get("lastSeen") or flight.get("firstSeen") + if not actual_ts: + continue + + actual_at = datetime.fromtimestamp(actual_ts) + + # Матчим по callsign (номер рейса) и времени (±2 часа от scheduled) + cur.execute(""" + UPDATE fr24_ext.schedule + SET icao24 = %s, + actual_at = %s, + source = CASE WHEN source = 'yandex' THEN 'merged' ELSE source END + WHERE airport_iata = %s + AND direction = %s + AND flight_number = %s + AND scheduled_at BETWEEN %s - INTERVAL '2 hours' AND %s + INTERVAL '2 hours' + AND icao24 IS NULL + """, (icao24, actual_at, airport_iata, direction, callsign, actual_at, actual_at)) + + if cur.rowcount > 0: + enriched_count += cur.rowcount + + return enriched_count +``` + +--- + +## 4. Модуль `backfill.py` + +### 4.1 CLI интерфейс + +```python +import argparse +from datetime import date, timedelta +import psycopg2 +from yandex_worker import fetch_day as yandex_fetch_day +from opensky_worker import enrich_day as opensky_enrich_day +from config import config +import logging + +log = logging.getLogger(__name__) + +def main(): + parser = argparse.ArgumentParser(description="Backfill schedule data") + parser.add_argument("--start-date", required=True, help="YYYY-MM-DD") + parser.add_argument("--end-date", required=True, help="YYYY-MM-DD") + parser.add_argument("--skip-opensky", action="store_true", help="Skip OpenSky enrichment") + + args = parser.parse_args() + + start = date.fromisoformat(args.start_date) + end = date.fromisoformat(args.end_date) + + conn = psycopg2.connect( + host=config.DB_HOST, + port=config.DB_PORT, + dbname=config.DB_NAME, + user=config.DB_USER, + password=config.DB_PASSWORD, + ) + + # Загрузить состояние + last_loaded = load_state(conn, "backfill_last_date") + if last_loaded: + start = max(start, date.fromisoformat(last_loaded) + timedelta(days=1)) + log.info(f"Resuming from {start}") + + current = start + while current <= end: + log.info(f"Processing {current}") + + try: + # Яндекс + yandex_count = yandex_fetch_day(current, conn) + log.info(f"Yandex: {yandex_count} flights") + + # OpenSky + if not args.skip_opensky: + opensky_count = opensky_enrich_day(current, conn) + log.info(f"OpenSky: {opensky_count} enriched") + + # Сохранить прогресс + save_state(conn, "backfill_last_date", current.isoformat()) + + except Exception as e: + log.error(f"Failed on {current}: {e}") + break + + current += timedelta(days=1) + + conn.close() + log.info("Backfill complete") + +def load_state(conn, key: str) -> str: + with conn.cursor() as cur: + cur.execute("SELECT state_value->>'last_date' FROM fr24_ext.load_state WHERE state_key = %s", (key,)) + row = cur.fetchone() + return row[0] if row else None + +def save_state(conn, key: str, value: str): + with conn.cursor() as cur: + cur.execute(""" + INSERT INTO fr24_ext.load_state (state_key, state_value) + VALUES (%s, %s::jsonb) + ON CONFLICT (state_key) DO UPDATE SET state_value = EXCLUDED.state_value, updated_at = now() + """, (key, f'{{"last_date": "{value}"}}')) + conn.commit() + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + main() +``` + +--- + +## 5. Frontend API (`frontend/main.py`) + +### 5.1 Endpoint `/api/schedule/data` + +```python +from flask import Flask, request, jsonify +import psycopg2 +from config import config + +app = Flask(__name__) + +@app.route("/api/schedule/data") +def get_schedule_data(): + # Параметры + date_from = request.args.get("date_from") + date_to = request.args.get("date_to") + airport = request.args.get("airport", "all") + direction = request.args.get("direction", "all") + flight_number = request.args.get("flight_number", "") + time_from = request.args.get("time_from") # HH:MM + time_to = request.args.get("time_to") + limit = int(request.args.get("limit", 100)) + offset = int(request.args.get("offset", 0)) + + # Построить WHERE + where_clauses = [] + params = [] + + if date_from: + where_clauses.append("flight_date >= %s") + params.append(date_from) + if date_to: + where_clauses.append("flight_date <= %s") + params.append(date_to) + if airport != "all": + where_clauses.append("airport_iata = %s") + params.append(airport) + if direction != "all": + where_clauses.append("direction = %s") + params.append(direction) + if flight_number: + where_clauses.append("flight_number ILIKE %s") + params.append(f"%{flight_number}%") + if time_from: + where_clauses.append("EXTRACT(HOUR FROM scheduled_at) * 60 + EXTRACT(MINUTE FROM scheduled_at) >= %s") + h, m = map(int, time_from.split(":")) + params.append(h * 60 + m) + if time_to: + where_clauses.append("EXTRACT(HOUR FROM scheduled_at) * 60 + EXTRACT(MINUTE FROM scheduled_at) <= %s") + h, m = map(int, time_to.split(":")) + params.append(h * 60 + m) + + where_sql = " AND ".join(where_clauses) if where_clauses else "1=1" + + conn = psycopg2.connect(...) + cur = conn.cursor() + + # Count + cur.execute(f"SELECT COUNT(*) FROM fr24_ext.schedule WHERE {where_sql}", params) + total = cur.fetchone()[0] + + # Data + cur.execute(f""" + SELECT flight_number, airline_name, airport_iata, direction, + origin_iata, destination_iata, scheduled_at, actual_at, status, icao24 + FROM fr24_ext.schedule + WHERE {where_sql} + ORDER BY scheduled_at DESC + LIMIT %s OFFSET %s + """, params + [limit, offset]) + + rows = cur.fetchall() + flights = [] + for row in rows: + scheduled = row[6] + actual = row[7] + delay_min = int((actual - scheduled).total_seconds() / 60) if actual else None + + flights.append({ + "flight_number": row[0], + "airline": row[1], + "airport": row[2], + "direction": row[3], + "origin": row[4], + "destination": row[5], + "scheduled_at": scheduled.isoformat(), + "actual_at": actual.isoformat() if actual else None, + "delay_min": delay_min, + "status": row[8], + "icao24": row[9], + }) + + conn.close() + + return jsonify({"total": total, "flights": flights}) +``` + +### 5.2 Endpoint `/api/schedule/export` + +```python +import csv +from io import StringIO +from flask import Response + +@app.route("/api/schedule/export") +def export_schedule(): + # Те же фильтры что у /data + # ... (код аналогичен get_schedule_data) + + # Запрос без LIMIT/OFFSET + cur.execute(f"SELECT ... FROM fr24_ext.schedule WHERE {where_sql} ORDER BY scheduled_at DESC", params) + rows = cur.fetchall() + + # CSV + output = StringIO() + writer = csv.writer(output) + writer.writerow(["Flight", "Airline", "Airport", "Direction", "Route", "Scheduled", "Actual", "Delay (min)", "Status"]) + + for row in rows: + writer.writerow([ + row[0], # flight_number + row[1], # airline_name + row[2], # airport_iata + row[3], # direction + f"{row[4]} → {row[5]}", # route + row[6].isoformat(), # scheduled_at + row[7].isoformat() if row[7] else "", # actual_at + int((row[7] - row[6]).total_seconds() / 60) if row[7] else "", # delay + row[8], # status + ]) + + output.seek(0) + return Response( + output.getvalue(), + mimetype="text/csv", + headers={"Content-Disposition": "attachment; filename=schedule.csv"} + ) +``` + +--- + +## 6. Обработка ошибок + +### 6.1 Retry логика + +```python +import time +from functools import wraps + +def retry_on_error(max_retries=3, delay=5): + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + for attempt in range(max_retries): + try: + return func(*args, **kwargs) + except requests.RequestException as e: + if attempt < max_retries - 1: + log.warning(f"Retry {attempt+1}/{max_retries} after error: {e}") + time.sleep(delay * (attempt + 1)) + else: + raise + return wrapper + return decorator + +@retry_on_error(max_retries=3, delay=5) +def fetch_airport_schedule(yandex_code: str, target_date: date) -> List[Dict]: + # ... (код запроса) +``` + +### 6.2 Логирование + +```python +import logging +import sys + +def setup_logging(): + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + handlers=[logging.StreamHandler(sys.stdout)] + ) + +# В main.py +if __name__ == "__main__": + setup_logging() +``` + +--- + +## 7. Docker + +### 7.1 Dockerfile + +```dockerfile +FROM python:3.11-slim + +WORKDIR /app + +# Dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Code +COPY *.py ./ + +# Healthcheck +HEALTHCHECK --interval=60s --timeout=5s --start-period=10s \ + CMD python -c "import requests; requests.get('http://localhost:8000/health', timeout=3).raise_for_status()" + +CMD ["python", "main.py"] +``` + +### 7.2 requirements.txt + +``` +requests==2.31.0 +psycopg2-binary==2.9.9 +APScheduler==3.10.4 +Flask==3.0.0 +``` + +### 7.3 docker-compose.yml (фрагмент) + +```yaml +services: + schedule: + build: ./ingest/schedule + container_name: fr24-schedule + environment: + POSTGRES_HOST: fr24-postgres + POSTGRES_DB: fr24 + POSTGRES_USER: fr24 + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} + YANDEX_RASP_API_KEY: ${YANDEX_RASP_API_KEY} + OPENSKY_USERNAME: ${OPENSKY_USERNAME} + OPENSKY_PASSWORD: ${OPENSKY_PASSWORD} + SCHEDULE_RETENTION_DAYS: 1095 + depends_on: + fr24-postgres: + condition: service_healthy + restart: unless-stopped + networks: + - fr24-network +``` + +--- + +## 8. Retention (автоочистка) + +### 8.1 Cron job в main.py + +```python +from apscheduler.schedulers.background import BackgroundScheduler + +def cleanup_old_data(conn): + """Удаляет данные старше RETENTION_DAYS.""" + with conn.cursor() as cur: + cur.execute(""" + DELETE FROM fr24_ext.schedule + WHERE flight_date < CURRENT_DATE - INTERVAL '%s days' + """, (config.RETENTION_DAYS,)) + deleted = cur.rowcount + conn.commit() + log.info(f"Cleanup: {deleted} old records deleted") + +# В main.py +scheduler = BackgroundScheduler() +scheduler.add_job(cleanup_old_data, 'cron', hour=3, minute=0, args=[conn]) +scheduler.start() +``` + +--- + +Это детальная спецификация готова для передачи Dev-агенту. diff --git a/tasks/flightradar24/docs/PHASE2_STEP1_EXTERNAL_DATA.md b/tasks/flightradar24/docs/PHASE2_STEP1_EXTERNAL_DATA.md index 712f730..bb6748d 100644 --- a/tasks/flightradar24/docs/PHASE2_STEP1_EXTERNAL_DATA.md +++ b/tasks/flightradar24/docs/PHASE2_STEP1_EXTERNAL_DATA.md @@ -136,6 +136,16 @@ fr24_ext.load_state ( ## ТЗ для Dev-агента +**📋 Детальная спецификация:** `docs/PHASE2_STEP1_DETAILED_SPEC.md` + +Детальная спецификация содержит: +- Архитектуру контейнера и структуру процессов +- Полный код модулей с обработкой ошибок +- Конфигурацию и переменные окружения +- Retry логику и rate limiting +- Docker setup и docker-compose интеграцию +- Retention и автоочистку + ### Файлы для создания ``` tasks/flightradar24/ingest/schedule/