Файл: wiki/tasks/flightradar24/docs/PHASE2_STEP1_DETAILED_SPEC.md
Изменён: 2026-04-20 13:00:01 +03:00 · размер 23 KiB

Детальная спецификация — Фаза 2, Шаг 1

Дополнение к PHASE2_STEP1_EXTERNAL_DATA.md — детальная логика модулей для Dev-агента.


1. Архитектура контейнера fr24-schedule

1.1 Структура процессов

main.py (supervisor)
  ├─ cron scheduler (APScheduler)
  │   ├─ daily_job() @ 02:00 UTC
  │   │   ├─ yandex_worker.fetch_day(date, conn)
  │   │   └─ opensky_worker.enrich_day(date, conn)
  │   └─ cleanup_old_data(conn) @ 03:00 (см. раздел 8)
  └─ healthcheck endpoint (Flask, port 8000)
      └─ GET /health → {"status": "ok", "last_run": "..."}

1.2 Конфигурация (config.py)

import os
from dataclasses import dataclass

@dataclass
class Config:
    """Runtime settings for the fr24-schedule container.

    Every default below is computed while this class body executes, i.e. once
    when config.py is imported, so the environment must be set before import.
    """
    # Database
    DB_HOST: str = os.getenv("POSTGRES_HOST", "fr24-postgres")
    DB_PORT: int = int(os.getenv("POSTGRES_PORT", "5432"))
    DB_NAME: str = os.getenv("POSTGRES_DB", "fr24")
    DB_USER: str = os.getenv("POSTGRES_USER", "fr24")
    # No default: None when POSTGRES_PASSWORD is unset.
    DB_PASSWORD: str | None = os.getenv("POSTGRES_PASSWORD")
    
    # API keys
    # No default: None when YANDEX_RASP_API_KEY is unset.
    YANDEX_RASP_API_KEY: str | None = os.getenv("YANDEX_RASP_API_KEY")
    # Empty strings mean "no OpenSky credentials configured".
    OPENSKY_USERNAME: str = os.getenv("OPENSKY_USERNAME", "")
    OPENSKY_PASSWORD: str = os.getenv("OPENSKY_PASSWORD", "")
    
    # Airports, keyed by IATA code.
    # Deliberately unannotated: as a plain class attribute it is not a dataclass
    # field (annotating it would make @dataclass reject the mutable dict default).
    # NOTE(review): "s9600366" for DME looks like Yandex's Pulkovo (LED) station;
    # Domodedovo is believed to be "s9600216" — verify before go-live.
    AIRPORTS = {
        "SVO": {"iata": "SVO", "icao": "UUEE", "yandex_code": "s9600213", "name": "Шереметьево"},
        "DME": {"iata": "DME", "icao": "UUDD", "yandex_code": "s9600366", "name": "Домодедово"},
        "VKO": {"iata": "VKO", "icao": "UUWW", "yandex_code": "s9600215", "name": "Внуково"},
        "ZIA": {"iata": "ZIA", "icao": "UUBW", "yandex_code": "s9881291", "name": "Жуковский"},
    }
    
    # Rate limits
    YANDEX_RATE_LIMIT_SEC: float = 1.0  # pause between requests, seconds
    OPENSKY_RATE_LIMIT_SEC: float = 30.0  # pause between requests, seconds
    
    # Retention
    RETENTION_DAYS: int = int(os.getenv("SCHEDULE_RETENTION_DAYS", "1095"))  # 3 years
    
    # Cron schedule
    DAILY_RUN_HOUR: int = 2  # UTC
    DAILY_RUN_MINUTE: int = 0

# Module-level settings instance imported by the workers as `config`.
config = Config()

2. Модуль yandex_worker.py

2.1 Основная функция

import requests
import time
from datetime import datetime, date
from typing import List, Dict
import psycopg2
from config import config
import logging

log = logging.getLogger(__name__)

def fetch_day(target_date: date, conn) -> int:
    """Load one day of Yandex schedule for every configured airport.

    Each airport is committed on its own. If fetching, parsing or writing one
    airport fails, only that airport's pending rows are rolled back. The error
    is logged with its traceback, and the loop continues with the next airport.

    Args:
        target_date: schedule date to request.
        conn: open psycopg2 connection; this function commits/rolls back on it.

    Returns:
        Number of flight rows upserted (and committed) across all airports.
    """
    total_flights = 0
    airports = list(config.AIRPORTS.items())

    for index, (airport_iata, airport_info) in enumerate(airports):
        log.info(f"Fetching Yandex schedule for {airport_iata} on {target_date}")

        try:
            flights = fetch_airport_schedule(
                yandex_code=airport_info["yandex_code"],
                target_date=target_date,
            )
            inserted = upsert_flights(conn, flights, airport_iata, target_date)
            conn.commit()
        except Exception:
            # Without a rollback, a failed statement leaves the transaction
            # aborted. Every later airport would then fail, and the final COMMIT
            # would silently discard the airports that had succeeded.
            conn.rollback()
            log.exception(f"Failed to fetch {airport_iata}")
            # Continue with the remaining airports.
        else:
            total_flights += inserted
            log.info(f"{airport_iata}: {inserted} flights upserted")

        # Pause only between airports, not after the last one.
        if index < len(airports) - 1:
            time.sleep(config.YANDEX_RATE_LIMIT_SEC)

    return total_flights

2.2 Запрос к Яндекс API

def fetch_airport_schedule(yandex_code: str, target_date: date) -> List[Dict]:
    """Request one day of plane schedule for a station from Yandex Rasp.

    The /v3.0/schedule/ endpoint returns either departures (default) or
    arrivals, selected by the ``event`` parameter, and pages its results with
    ``offset``/``limit``. Both events are requested, and every page is followed
    so that busy airports are not truncated to the first page.

    Args:
        yandex_code: Yandex station code, e.g. ``"s9600213"``.
        target_date: schedule date to request.

    Returns:
        Flight dicts built by ``parse_yandex_flight``, each with a
        ``"direction"`` key set to ``"arrival"`` or ``"departure"``.

    Raises:
        requests.HTTPError: on a non-2xx response.
    """
    url = "https://api.rasp.yandex.net/v3.0/schedule/"
    page_size = 100
    flights = []
    first_request = True

    for event in ("arrival", "departure"):
        offset = 0
        while True:
            if not first_request:
                # Keep the configured pause between consecutive API calls.
                time.sleep(config.YANDEX_RATE_LIMIT_SEC)
            first_request = False

            params = {
                "apikey": config.YANDEX_RASP_API_KEY,
                "station": yandex_code,
                "date": target_date.isoformat(),
                "transport_types": "plane",
                "event": event,
                "offset": offset,
                "limit": page_size,
            }
            response = requests.get(url, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()

            items = data.get("schedule") or []
            for item in items:
                thread = item.get("thread") or {}
                # transport_types=plane already filters server-side. For planes
                # the subtype code is not "plane", so check transport_type.
                if thread.get("transport_type", "plane") != "plane":
                    continue
                flight = parse_yandex_flight(item, event)
                if flight:
                    flight["direction"] = event
                    flights.append(flight)

            offset += len(items)
            total = (data.get("pagination") or {}).get("total")
            if not items or total is None or offset >= total:
                break

    return flights

2.3 Парсинг ответа Яндекс

def parse_yandex_flight(item: Dict, direction: str) -> Dict:
    """Convert one Yandex schedule item into a row dict for fr24_ext.schedule.

    Args:
        item: one element of the ``schedule`` list of a Yandex Rasp
            ``/v3.0/schedule/`` response.
        direction: ``"arrival"`` or ``"departure"``, the event the item was
            requested for. It is stored in the row and selects which timestamp
            is used as ``scheduled_at``.

    Returns:
        A dict with the keys ``upsert_flights`` reads, or ``None`` when the
        item has no flight number or no scheduled time.
    """
    thread = item.get("thread") or {}

    # Flight number
    flight_number = thread.get("number") or ""
    if not flight_number:
        return None

    # Airline. carrier["code"] is Yandex's internal numeric carrier id; the
    # IATA designator is under carrier["codes"]["iata"].
    carrier = thread.get("carrier") or {}
    airline_iata = (carrier.get("codes") or {}).get("iata")
    airline_name = carrier.get("title")

    # Route, from thread.title such as "Москва — Санкт-Петербург".
    title = thread.get("title") or ""
    origin_iata, destination_iata = parse_route_from_title(title)

    # Time: prefer the timestamp that matches the requested event.
    if direction == "arrival":
        scheduled_at = item.get("arrival") or item.get("departure")
    else:
        scheduled_at = item.get("departure") or item.get("arrival")
    if not scheduled_at:
        return None

    return {
        "direction": direction,
        "flight_number": flight_number,
        "airline_iata": airline_iata,
        "airline_name": airline_name,
        "origin_iata": origin_iata,
        "destination_iata": destination_iata,
        "aircraft_type": thread.get("vehicle"),
        "scheduled_at": scheduled_at,
        "status": "scheduled",  # the Yandex schedule carries no live status
        "source": "yandex",
    }

def parse_route_from_title(title: str, city_to_iata: "Dict[str, str] | None" = None) -> tuple:
    """Extract ``(origin_iata, destination_iata)`` from a Yandex thread title.

    Titles look like ``"Москва — Санкт-Петербург"``: two city names joined by
    an em dash. City names are translated through *city_to_iata*.

    Args:
        title: the thread title.
        city_to_iata: optional city-name → IATA mapping. Without it, or when a
            city is missing from it, the corresponding element is ``None``.

    Returns:
        A 2-tuple; ``(None, None)`` if the title cannot be split into exactly
        two parts or no mapping is provided.
    """
    if not title or not city_to_iata:
        return (None, None)

    parts = [part.strip() for part in title.split("—")]
    if len(parts) != 2:
        return (None, None)

    origin, destination = parts
    return (city_to_iata.get(origin), city_to_iata.get(destination))

2.4 Upsert в БД

def upsert_flights(conn, flights: List[Dict], airport_iata: str, flight_date: date) -> int:
    """Insert or refresh schedule rows in fr24_ext.schedule.

    Rows are keyed by (flight_number, airport_iata, scheduled_at, direction).
    On conflict, the descriptive columns are refreshed from the new data. A NULL
    in the new data never overwrites a value already stored. ``source`` is
    left untouched, so rows already merged with OpenSky keep 'merged'. Nothing
    is committed here; the caller owns the transaction.

    Args:
        conn: open psycopg2 connection.
        flights: dicts as produced by ``parse_yandex_flight``.
            ``flight_number``, ``scheduled_at`` and ``source`` are required.
            ``direction`` defaults to ``"departure"`` and ``status`` to
            ``"scheduled"``.
        airport_iata: IATA code of the airport the schedule belongs to.
        flight_date: value stored in ``flight_date``.

    Returns:
        Number of rows sent to the database (inserted or updated).
    """
    if not flights:
        return 0

    sql = """
        INSERT INTO fr24_ext.schedule AS s
          (flight_date, airport_iata, direction, flight_number,
           airline_iata, airline_name, origin_iata, destination_iata,
           aircraft_type, scheduled_at, status, source)
        VALUES
          (%(flight_date)s, %(airport_iata)s, %(direction)s, %(flight_number)s,
           %(airline_iata)s, %(airline_name)s, %(origin_iata)s, %(destination_iata)s,
           %(aircraft_type)s, %(scheduled_at)s, %(status)s, %(source)s)
        ON CONFLICT (flight_number, airport_iata, scheduled_at, direction)
        DO UPDATE SET
          airline_iata     = COALESCE(EXCLUDED.airline_iata, s.airline_iata),
          airline_name     = COALESCE(EXCLUDED.airline_name, s.airline_name),
          origin_iata      = COALESCE(EXCLUDED.origin_iata, s.origin_iata),
          destination_iata = COALESCE(EXCLUDED.destination_iata, s.destination_iata),
          aircraft_type    = COALESCE(EXCLUDED.aircraft_type, s.aircraft_type),
          status           = EXCLUDED.status,
          fetched_at       = now()
    """

    with conn.cursor() as cur:
        for flight in flights:
            cur.execute(sql, {
                "flight_date": flight_date,
                "airport_iata": airport_iata,
                "direction": flight.get("direction", "departure"),
                "flight_number": flight["flight_number"],
                "airline_iata": flight.get("airline_iata"),
                "airline_name": flight.get("airline_name"),
                "origin_iata": flight.get("origin_iata"),
                "destination_iata": flight.get("destination_iata"),
                "aircraft_type": flight.get("aircraft_type"),
                "scheduled_at": flight["scheduled_at"],
                "status": flight.get("status", "scheduled"),
                "source": flight["source"],
            })

    return len(flights)

3. Модуль opensky_worker.py

3.1 Основная функция

def enrich_day(target_date: date, conn) -> int:
    """Attach OpenSky icao24 / actual times to one day of fr24_ext.schedule rows.

    For every configured airport, OpenSky arrivals and departures are fetched
    for the UTC day [target_date 00:00, +24 h) and matched against existing
    schedule rows by ``enrich_flights``. Each direction is committed once it is
    processed. On any error the pending work is rolled back, the error is
    logged with its traceback, and the next airport is processed.

    Args:
        target_date: the day to enrich.
        conn: open psycopg2 connection; this function commits/rolls back on it.

    Returns:
        Number of schedule rows updated (and committed) across all airports.
    """
    from datetime import timezone

    total_enriched = 0

    # Use an explicit UTC midnight. A naive datetime would be interpreted in
    # the container's local timezone and shift the window.
    # NOTE(review): Yandex dates are presumably Moscow-local (UTC+3), so
    # flights scheduled 00:00-03:00 MSK fall before this window. Consider
    # padding the window; confirm.
    day_start = datetime.combine(target_date, datetime.min.time(), tzinfo=timezone.utc)
    begin_ts = int(day_start.timestamp())
    end_ts = begin_ts + 86400  # +24 hours

    for airport_iata, airport_info in config.AIRPORTS.items():
        icao = airport_info["icao"]

        log.info(f"Enriching OpenSky data for {airport_iata} ({icao}) on {target_date}")

        try:
            # Arrivals
            arrivals = fetch_opensky_flights(icao, begin_ts, end_ts, "arrival")
            enriched_arr = enrich_flights(conn, arrivals, airport_iata, "arrival")
            conn.commit()
            total_enriched += enriched_arr

            time.sleep(config.OPENSKY_RATE_LIMIT_SEC)

            # Departures
            departures = fetch_opensky_flights(icao, begin_ts, end_ts, "departure")
            enriched_dep = enrich_flights(conn, departures, airport_iata, "departure")
            conn.commit()
            total_enriched += enriched_dep

            log.info(f"{airport_iata}: {enriched_arr} arrivals, {enriched_dep} departures enriched")

        except Exception:
            # Roll back so a failed statement does not leave the transaction
            # aborted for the remaining airports.
            conn.rollback()
            log.exception(f"Failed to enrich {airport_iata}")

        time.sleep(config.OPENSKY_RATE_LIMIT_SEC)

    return total_enriched

3.2 Запрос к OpenSky API

def fetch_opensky_flights(icao: str, begin_ts: int, end_ts: int, direction: str) -> List[Dict]:
    """Fetch arrivals or departures for one airport from the OpenSky REST API.

    Args:
        icao: ICAO airport code, e.g. ``"UUEE"``.
        begin_ts: start of the interval, Unix time in seconds.
        end_ts: end of the interval, Unix time in seconds.
        direction: ``"arrival"`` or ``"departure"``; selects the endpoint.

    Returns:
        The decoded JSON list of flight dicts, or an empty list when OpenSky
        reports no flights for the interval.

    Raises:
        ValueError: if *direction* is neither ``"arrival"`` nor ``"departure"``.
        requests.HTTPError: on any other non-2xx response.
    """
    # Validate up front. An unknown endpoint would 404, and a 404 is treated
    # as "no flights" below, which would hide the typo.
    if direction not in ("arrival", "departure"):
        raise ValueError(f"direction must be 'arrival' or 'departure', got {direction!r}")

    endpoint = f"https://opensky-network.org/api/flights/{direction}"
    params = {
        "airport": icao,
        "begin": begin_ts,
        "end": end_ts,
    }

    # Anonymous access unless both credentials are configured.
    # NOTE(review): OpenSky is migrating accounts from basic auth to OAuth2
    # client credentials. Confirm which scheme this deployment's account uses.
    auth = None
    if config.OPENSKY_USERNAME and config.OPENSKY_PASSWORD:
        auth = (config.OPENSKY_USERNAME, config.OPENSKY_PASSWORD)

    response = requests.get(endpoint, params=params, auth=auth, timeout=60)
    # OpenSky answers 404 when no flights match the interval. That is an
    # empty result, not a failure of the whole airport.
    if response.status_code == 404:
        return []
    response.raise_for_status()

    return response.json() or []

3.3 Обогащение записей

def enrich_flights(conn, opensky_flights: List[Dict], airport_iata: str, direction: str) -> int:
    """Copy OpenSky icao24 and actual times onto matching fr24_ext.schedule rows.

    A schedule row matches when all of the following hold:
    - airport, direction and flight_number (== OpenSky callsign) are equal;
    - its scheduled_at lies within ±2 hours of the OpenSky time;
    - it has no icao24 yet.
    Matched rows get ``source`` switched from 'yandex' to 'merged'. Nothing is
    committed here.

    Args:
        conn: open psycopg2 connection.
        opensky_flights: flight dicts from ``fetch_opensky_flights``.
        airport_iata: IATA code of the airport being enriched.
        direction: ``"arrival"`` or ``"departure"``. Selects the OpenSky
            timestamp used as the actual time: ``lastSeen`` (estimated arrival)
            for arrivals, ``firstSeen`` (estimated departure) otherwise. The
            other field is the fallback.

    Returns:
        Number of schedule rows updated.
    """
    from datetime import timezone

    if not opensky_flights:
        return 0

    if direction == "arrival":
        primary_key, fallback_key = "lastSeen", "firstSeen"
    else:
        primary_key, fallback_key = "firstSeen", "lastSeen"

    enriched_count = 0

    with conn.cursor() as cur:
        for flight in opensky_flights:
            icao24 = flight.get("icao24")
            # OpenSky may send "callsign": null, so don't call .strip() on None.
            callsign = (flight.get("callsign") or "").strip()

            if not icao24 or not callsign:
                continue

            # Actual arrival/departure time
            actual_ts = flight.get(primary_key) or flight.get(fallback_key)
            if not actual_ts:
                continue

            # OpenSky timestamps are Unix seconds, so build an aware UTC
            # datetime rather than a naive one in the container's timezone.
            actual_at = datetime.fromtimestamp(actual_ts, tz=timezone.utc)

            # Match on callsign (flight number) and time (±2 hours of scheduled).
            # NOTE(review): OpenSky callsigns are ICAO-style ("AFL1234") while
            # Yandex thread numbers look IATA-style ("SU 1234"). Without
            # normalization this equality may rarely match. TODO confirm.
            cur.execute("""
                UPDATE fr24_ext.schedule
                SET icao24 = %s,
                    actual_at = %s,
                    source = CASE WHEN source = 'yandex' THEN 'merged' ELSE source END
                WHERE airport_iata = %s
                  AND direction = %s
                  AND flight_number = %s
                  AND scheduled_at BETWEEN %s - INTERVAL '2 hours' AND %s + INTERVAL '2 hours'
                  AND icao24 IS NULL
            """, (icao24, actual_at, airport_iata, direction, callsign, actual_at, actual_at))

            if cur.rowcount > 0:
                enriched_count += cur.rowcount

    return enriched_count

4. Модуль backfill.py

4.1 CLI интерфейс

import argparse
from datetime import date, timedelta
import psycopg2
from yandex_worker import fetch_day as yandex_fetch_day
from opensky_worker import enrich_day as opensky_enrich_day
from config import config
import logging

log = logging.getLogger(__name__)

def main():
    """Backfill fr24_ext.schedule day by day for [--start-date, --end-date].

    After each fully processed day, the date is checkpointed in
    fr24_ext.load_state under "backfill_last_date". A checkpoint inside the
    requested range resumes the run from the following day. A checkpoint
    outside the range is ignored, so requesting a range earlier than a previous
    run does not silently do nothing. Processing stops at the first day that
    raises; rerunning the same range resumes from there.
    """
    parser = argparse.ArgumentParser(description="Backfill schedule data")
    parser.add_argument("--start-date", required=True, help="YYYY-MM-DD")
    parser.add_argument("--end-date", required=True, help="YYYY-MM-DD")
    parser.add_argument("--skip-opensky", action="store_true", help="Skip OpenSky enrichment")

    args = parser.parse_args()

    start = date.fromisoformat(args.start_date)
    end = date.fromisoformat(args.end_date)
    if start > end:
        parser.error("--start-date must not be after --end-date")

    conn = psycopg2.connect(
        host=config.DB_HOST,
        port=config.DB_PORT,
        dbname=config.DB_NAME,
        user=config.DB_USER,
        password=config.DB_PASSWORD,
    )

    failed_on = None
    try:
        # Load the checkpoint and resume only if it lies inside this range.
        last_loaded = load_state(conn, "backfill_last_date")
        if last_loaded:
            checkpoint = date.fromisoformat(last_loaded)
            if start <= checkpoint <= end:
                start = checkpoint + timedelta(days=1)
                log.info(f"Resuming from {start}")
            else:
                log.info(f"Ignoring checkpoint {checkpoint} outside {start}..{end}")

        current = start
        while current <= end:
            log.info(f"Processing {current}")

            try:
                # Yandex
                yandex_count = yandex_fetch_day(current, conn)
                log.info(f"Yandex: {yandex_count} flights")

                # OpenSky
                if not args.skip_opensky:
                    opensky_count = opensky_enrich_day(current, conn)
                    log.info(f"OpenSky: {opensky_count} enriched")

                # Save progress
                save_state(conn, "backfill_last_date", current.isoformat())

            except Exception:
                # Leave the connection usable, and stop so no later day is
                # checkpointed past a failed one.
                conn.rollback()
                log.exception(f"Failed on {current}")
                failed_on = current
                break

            current += timedelta(days=1)
    finally:
        conn.close()

    if failed_on is None:
        log.info("Backfill complete")
    else:
        log.error(f"Backfill stopped at {failed_on}; rerun the same range to resume")

def load_state(conn, key: str) -> str:
    """Read the "last_date" checkpoint stored under *key*.

    Returns the ``state_value->>'last_date'`` text of the matching
    fr24_ext.load_state row, or None when no row exists for *key*.
    """
    query = "SELECT state_value->>'last_date' FROM fr24_ext.load_state WHERE state_key = %s"
    with conn.cursor() as cursor:
        cursor.execute(query, (key,))
        found = cursor.fetchone()
    if not found:
        return None
    return found[0]

def save_state(conn, key: str, value: str):
    """Upsert ``{"last_date": value}`` into fr24_ext.load_state under *key* and commit.

    The JSON payload is produced with ``json.dumps``, so quotes or backslashes
    in *value* cannot break the jsonb literal (the f-string version could).
    """
    import json

    payload = json.dumps({"last_date": value})
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO fr24_ext.load_state (state_key, state_value)
            VALUES (%s, %s::jsonb)
            ON CONFLICT (state_key) DO UPDATE SET state_value = EXCLUDED.state_value, updated_at = now()
        """, (key, payload))
    conn.commit()

# Script entry point: configure INFO-level logging, then run the backfill CLI.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    main()

5. Frontend API (frontend/main.py)

5.1 Endpoint /api/schedule/data

from flask import Flask, request, jsonify
import psycopg2
from config import config

app = Flask(__name__)

def _hhmm_to_minutes(value: str) -> int:
    """Convert an "HH:MM" string to minutes since midnight.

    Raises:
        ValueError: if *value* is not two integers separated by ":" or lies
            outside 00:00-23:59.
    """
    hours, minutes = (int(part) for part in value.split(":"))
    if not (0 <= hours <= 23 and 0 <= minutes <= 59):
        raise ValueError(f"invalid time {value!r}, expected HH:MM")
    return hours * 60 + minutes


def _schedule_filters(args) -> tuple:
    """Build ``(where_sql, params)`` for fr24_ext.schedule from request query args.

    *where_sql* is assembled only from fixed clause strings; user input travels
    exclusively through *params*.

    Raises:
        ValueError: on a malformed date (YYYY-MM-DD) or time (HH:MM) parameter.
    """
    from datetime import date

    # NOTE(review): EXTRACT on a timestamptz uses the DB session TimeZone,
    # not necessarily Moscow time. Confirm what users expect for time filters.
    minute_of_day = "EXTRACT(HOUR FROM scheduled_at) * 60 + EXTRACT(MINUTE FROM scheduled_at)"
    clauses = []
    params = []

    date_from = args.get("date_from")
    if date_from:
        clauses.append("flight_date >= %s")
        params.append(date.fromisoformat(date_from))
    date_to = args.get("date_to")
    if date_to:
        clauses.append("flight_date <= %s")
        params.append(date.fromisoformat(date_to))
    airport = args.get("airport", "all")
    if airport != "all":
        clauses.append("airport_iata = %s")
        params.append(airport)
    direction = args.get("direction", "all")
    if direction != "all":
        clauses.append("direction = %s")
        params.append(direction)
    flight_number = args.get("flight_number", "")
    if flight_number:
        clauses.append("flight_number ILIKE %s")
        params.append(f"%{flight_number}%")
    time_from = args.get("time_from")
    if time_from:
        clauses.append(f"{minute_of_day} >= %s")
        params.append(_hhmm_to_minutes(time_from))
    time_to = args.get("time_to")
    if time_to:
        clauses.append(f"{minute_of_day} <= %s")
        params.append(_hhmm_to_minutes(time_to))

    where_sql = " AND ".join(clauses) if clauses else "1=1"
    return where_sql, params


@app.route("/api/schedule/data")
def get_schedule_data():
    """Return one filtered, paginated page of fr24_ext.schedule as JSON.

    Query parameters:
    - date_from / date_to: YYYY-MM-DD, inclusive.
    - airport: IATA code or "all".
    - direction: "arrival", "departure" or "all".
    - flight_number: case-insensitive substring.
    - time_from / time_to: HH:MM, inclusive, compared with the hour:minute of
      scheduled_at.
    - limit: clamped to 0..1000.
    - offset: clamped to >= 0.

    Returns ``{"total": int, "flights": [...]}`` ordered by scheduled_at
    descending. Malformed parameters produce HTTP 400 with ``{"error": ...}``
    instead of an unhandled 500.
    """
    max_limit = 1000
    try:
        limit = min(max(int(request.args.get("limit", 100)), 0), max_limit)
        offset = max(int(request.args.get("offset", 0)), 0)
        where_sql, params = _schedule_filters(request.args)
    except ValueError as exc:
        return jsonify({"error": str(exc)}), 400

    conn = psycopg2.connect(
        host=config.DB_HOST,
        port=config.DB_PORT,
        dbname=config.DB_NAME,
        user=config.DB_USER,
        password=config.DB_PASSWORD,
    )
    try:
        with conn.cursor() as cur:
            # Count
            cur.execute(f"SELECT COUNT(*) FROM fr24_ext.schedule WHERE {where_sql}", params)
            total = cur.fetchone()[0]

            # Data
            cur.execute(f"""
                SELECT flight_number, airline_name, airport_iata, direction,
                       origin_iata, destination_iata, scheduled_at, actual_at, status, icao24
                FROM fr24_ext.schedule
                WHERE {where_sql}
                ORDER BY scheduled_at DESC
                LIMIT %s OFFSET %s
            """, params + [limit, offset])
            rows = cur.fetchall()
    finally:
        conn.close()

    flights = []
    for (flight_number, airline, airport, direction, origin, destination,
         scheduled, actual, status, icao24) in rows:
        delay_min = int((actual - scheduled).total_seconds() / 60) if actual else None
        flights.append({
            "flight_number": flight_number,
            "airline": airline,
            "airport": airport,
            "direction": direction,
            "origin": origin,
            "destination": destination,
            "scheduled_at": scheduled.isoformat(),
            "actual_at": actual.isoformat() if actual else None,
            "delay_min": delay_min,
            "status": status,
            "icao24": icao24,
        })

    return jsonify({"total": total, "flights": flights})

5.2 Endpoint /api/schedule/export

import csv
from io import StringIO
from flask import Response

@app.route("/api/schedule/export")
def export_schedule():
    """Download every fr24_ext.schedule row matching the /data filters as CSV.

    Accepts the same filter query parameters as /api/schedule/data:
    date_from, date_to, airport, direction, flight_number, time_from and
    time_to. limit/offset are not applied, so the export is unpaginated.
    Malformed dates or times yield HTTP 400 with ``{"error": ...}``.
    """
    from datetime import date

    minute_of_day = "EXTRACT(HOUR FROM scheduled_at) * 60 + EXTRACT(MINUTE FROM scheduled_at)"

    def to_minutes(value):
        # "HH:MM" -> minutes since midnight; ValueError when malformed.
        hours, minutes = (int(part) for part in value.split(":"))
        if not (0 <= hours <= 23 and 0 <= minutes <= 59):
            raise ValueError(f"invalid time {value!r}, expected HH:MM")
        return hours * 60 + minutes

    # Same filters as /data
    args = request.args
    clauses, params = [], []
    try:
        if args.get("date_from"):
            clauses.append("flight_date >= %s")
            params.append(date.fromisoformat(args["date_from"]))
        if args.get("date_to"):
            clauses.append("flight_date <= %s")
            params.append(date.fromisoformat(args["date_to"]))
        if args.get("airport", "all") != "all":
            clauses.append("airport_iata = %s")
            params.append(args["airport"])
        if args.get("direction", "all") != "all":
            clauses.append("direction = %s")
            params.append(args["direction"])
        if args.get("flight_number"):
            clauses.append("flight_number ILIKE %s")
            params.append(f"%{args['flight_number']}%")
        if args.get("time_from"):
            clauses.append(f"{minute_of_day} >= %s")
            params.append(to_minutes(args["time_from"]))
        if args.get("time_to"):
            clauses.append(f"{minute_of_day} <= %s")
            params.append(to_minutes(args["time_to"]))
    except ValueError as exc:
        return jsonify({"error": str(exc)}), 400
    where_sql = " AND ".join(clauses) if clauses else "1=1"

    # Query without LIMIT/OFFSET
    conn = psycopg2.connect(
        host=config.DB_HOST,
        port=config.DB_PORT,
        dbname=config.DB_NAME,
        user=config.DB_USER,
        password=config.DB_PASSWORD,
    )
    try:
        with conn.cursor() as cur:
            cur.execute(f"""
                SELECT flight_number, airline_name, airport_iata, direction,
                       origin_iata, destination_iata, scheduled_at, actual_at, status
                FROM fr24_ext.schedule
                WHERE {where_sql}
                ORDER BY scheduled_at DESC
            """, params)
            rows = cur.fetchall()
    finally:
        conn.close()

    # CSV
    output = StringIO()
    writer = csv.writer(output)
    writer.writerow(["Flight", "Airline", "Airport", "Direction", "Route", "Scheduled", "Actual", "Delay (min)", "Status"])

    for (flight_number, airline, airport, direction, origin, destination,
         scheduled, actual, status) in rows:
        # Separate the codes and mark unknown ends with "?"; without a
        # separator "SVO"+"LED" read as "SVOLED", and missing codes as "NoneNone".
        route = f"{origin or '?'}-{destination or '?'}" if (origin or destination) else ""
        writer.writerow([
            flight_number,
            airline,
            airport,
            direction,
            route,
            scheduled.isoformat(),
            actual.isoformat() if actual else "",
            int((actual - scheduled).total_seconds() / 60) if actual else "",
            status,
        ])

    return Response(
        output.getvalue(),
        mimetype="text/csv",
        headers={"Content-Disposition": "attachment; filename=schedule.csv"}
    )

6. Обработка ошибок

6.1 Retry логика

import time
from functools import wraps

def _is_retryable(error) -> bool:
    """Return False for HTTP client errors (4xx other than 429), which a retry cannot fix."""
    response = getattr(error, "response", None)
    status = getattr(response, "status_code", None)
    if status is None:
        # No HTTP response at all (connection error, timeout, ...): transient.
        return True
    return status == 429 or status >= 500


def retry_on_error(max_retries=3, delay=5, exceptions=None):
    """Decorator: retry a call on transient errors with linearly growing pauses.

    Args:
        max_retries: total number of attempts. Values below 1 are treated as 1,
            so the wrapped function is always called at least once.
        delay: base pause in seconds; after the k-th failed attempt the wrapper
            sleeps ``delay * k``.
        exceptions: exception class or tuple of classes to retry on. Defaults
            to ``requests.RequestException``.

    An error carrying an HTTP 4xx status other than 429 is re-raised
    immediately. Otherwise the last error is re-raised once all attempts are
    used.
    """
    import logging

    attempts = max(1, max_retries)

    def decorator(func):
        retry_on = exceptions if exceptions is not None else requests.RequestException
        logger = logging.getLogger(func.__module__)

        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except retry_on as e:
                    if attempt == attempts or not _is_retryable(e):
                        raise
                    logger.warning(f"Retry {attempt}/{attempts} after error: {e}")
                    time.sleep(delay * attempt)

        return wrapper

    return decorator

@retry_on_error(max_retries=3, delay=5)
def fetch_airport_schedule(yandex_code: str, target_date: date) -> List[Dict]:
    """Illustrative placeholder showing where @retry_on_error goes.

    In yandex_worker.py the decorator belongs directly on the real
    implementation from section 2.2. This body is intentionally not implemented
    so an accidental copy fails loudly instead of returning None.
    """
    raise NotImplementedError("apply @retry_on_error to the implementation from section 2.2")

6.2 Логирование

import logging
import sys

def setup_logging():
    """Send INFO-and-above log records to stdout with timestamp, level and logger name.

    Delegates to ``logging.basicConfig``, which does nothing if the root logger
    already has handlers.
    """
    log_format = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
    stdout_handler = logging.StreamHandler(sys.stdout)
    logging.basicConfig(handlers=[stdout_handler], format=log_format, level=logging.INFO)

# In main.py: configure logging first thing when run as a script.
if __name__ == "__main__":
    setup_logging()

7. Docker

7.1 Dockerfile

FROM python:3.11-slim

# No .pyc files in the image; unbuffered stdout/stderr for container logs.
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1

WORKDIR /app

# Dependencies (own layer, so code changes don't reinstall them)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Code
COPY *.py ./

# Run as an unprivileged user; the service only binds port 8000 (> 1024).
RUN useradd --system --no-create-home --uid 10001 app
USER app

# Healthcheck
HEALTHCHECK --interval=60s --timeout=5s --start-period=10s \
  CMD python -c "import requests; requests.get('http://localhost:8000/health', timeout=3).raise_for_status()"

CMD ["python", "main.py"]

7.2 requirements.txt

# requests 2.32.4 includes the fixes for CVE-2024-35195 and CVE-2024-47081.
requests==2.32.4
psycopg2-binary==2.9.9
APScheduler==3.10.4
Flask==3.0.0

7.3 docker-compose.yml (фрагмент)

services:
  schedule:
    build: ./ingest/schedule
    container_name: fr24-schedule
    environment:
      POSTGRES_HOST: fr24-postgres
      POSTGRES_DB: fr24
      POSTGRES_USER: fr24
      # Required: `docker compose up` fails fast instead of starting with an empty secret.
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:?POSTGRES_PASSWORD must be set}
      YANDEX_RASP_API_KEY: ${YANDEX_RASP_API_KEY:?YANDEX_RASP_API_KEY must be set}
      # Optional: empty values mean anonymous OpenSky access.
      OPENSKY_USERNAME: ${OPENSKY_USERNAME:-}
      OPENSKY_PASSWORD: ${OPENSKY_PASSWORD:-}
      SCHEDULE_RETENTION_DAYS: "1095"
    depends_on:
      fr24-postgres:
        condition: service_healthy
    restart: unless-stopped
    networks:
      - fr24-network

8. Retention (автоочистка)

8.1 Cron job в main.py

from apscheduler.schedulers.background import BackgroundScheduler

def cleanup_old_data(conn):
    """Delete fr24_ext.schedule rows whose flight_date is older than config.RETENTION_DAYS.

    Commits on success. On failure it rolls back, so a shared connection is
    not left in an aborted transaction, and then re-raises.
    """
    try:
        with conn.cursor() as cur:
            # `date - integer` yields a date. Binding the day count as a real
            # parameter avoids splicing a placeholder inside the INTERVAL
            # string literal ('%s days'), which only works by accident with
            # client-side interpolation of integers.
            cur.execute(
                """
                DELETE FROM fr24_ext.schedule
                WHERE flight_date < CURRENT_DATE - %s::integer
                """,
                (config.RETENTION_DAYS,),
            )
            deleted = cur.rowcount
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    log.info(f"Cleanup: {deleted} old records deleted")

# In main.py: run the retention cleanup every day at 03:00.
# NOTE(review): hour=3 is interpreted in the scheduler's timezone (APScheduler
# 3.x defaults to the host's local zone), not necessarily UTC like daily_job.
# Confirm. `conn` must be a connection opened earlier in main.py.
scheduler = BackgroundScheduler()
scheduler.add_job(
    cleanup_old_data,
    trigger="cron",
    hour=3,
    minute=0,
    args=[conn],
)
scheduler.start()

Эта детальная спецификация готова для передачи Dev-агенту.