# Детальная спецификация — Фаза 2, Шаг 1

Дополнение к `PHASE2_STEP1_EXTERNAL_DATA.md` — детальная логика модулей для Dev-агента.

---

## 1. Архитектура контейнера `fr24-schedule`

### 1.1 Структура процессов

```
main.py (supervisor)
├─ cron scheduler (APScheduler)
│  └─ daily_job() @ 02:00 UTC
│     ├─ yandex_worker.fetch_day(date)
│     └─ opensky_worker.enrich_day(date)
└─ healthcheck endpoint (Flask, port 8000)
   └─ GET /health → {"status": "ok", "last_run": "..."}
```

### 1.2 Конфигурация (config.py)

```python
import os
from dataclasses import dataclass


@dataclass
class Config:
    # Database
    DB_HOST: str = os.getenv("POSTGRES_HOST", "fr24-postgres")
    DB_PORT: int = int(os.getenv("POSTGRES_PORT", "5432"))
    DB_NAME: str = os.getenv("POSTGRES_DB", "fr24")
    DB_USER: str = os.getenv("POSTGRES_USER", "fr24")
    DB_PASSWORD: str = os.getenv("POSTGRES_PASSWORD")

    # API keys
    YANDEX_RASP_API_KEY: str = os.getenv("YANDEX_RASP_API_KEY")
    OPENSKY_USERNAME: str = os.getenv("OPENSKY_USERNAME", "")
    OPENSKY_PASSWORD: str = os.getenv("OPENSKY_PASSWORD", "")

    # Airports
    AIRPORTS = {
        "SVO": {"iata": "SVO", "icao": "UUEE", "yandex_code": "s9600213", "name": "Шереметьево"},
        "DME": {"iata": "DME", "icao": "UUDD", "yandex_code": "s9600366", "name": "Домодедово"},
        "VKO": {"iata": "VKO", "icao": "UUWW", "yandex_code": "s9600215", "name": "Внуково"},
        "ZIA": {"iata": "ZIA", "icao": "UUBW", "yandex_code": "s9881291", "name": "Жуковский"},
    }

    # Rate limits
    YANDEX_RATE_LIMIT_SEC: float = 1.0  # пауза между запросами
    OPENSKY_RATE_LIMIT_SEC: float = 30.0

    # Retention
    RETENTION_DAYS: int = int(os.getenv("SCHEDULE_RETENTION_DAYS", "1095"))  # 3 года

    # Cron schedule
    DAILY_RUN_HOUR: int = 2  # UTC
    DAILY_RUN_MINUTE: int = 0


config = Config()
```

---

## 2. 
Модуль `yandex_worker.py`

### 2.1 Основная функция

```python
import requests
import time
from datetime import datetime, date
from typing import List, Dict, Optional, Tuple
import psycopg2
from config import config
import logging

log = logging.getLogger(__name__)

# Values of the Yandex `event` request parameter. The API returns only one
# direction per request, so each value is fetched separately.
DIRECTIONS = ("departure", "arrival")

# Page size sent as the `limit` request parameter.
PAGE_LIMIT = 100


def fetch_day(target_date: date, conn) -> int:
    """Load one day of schedule data for every configured airport.

    Each airport is committed on its own. When an airport fails, its partial
    work is rolled back and the loop continues with the next airport, so one
    bad response cannot poison the transaction for the others.

    Args:
        target_date: Calendar day to load.
        conn: Open psycopg2 connection.

    Returns:
        Number of flights upserted across all airports.
    """
    total_flights = 0

    for airport_iata, airport_info in config.AIRPORTS.items():
        log.info(f"Fetching Yandex schedule for {airport_iata} on {target_date}")

        try:
            flights = fetch_airport_schedule(
                yandex_code=airport_info["yandex_code"],
                target_date=target_date,
            )
            inserted = upsert_flights(conn, flights, airport_iata, target_date)
            conn.commit()
            total_flights += inserted
            log.info(f"{airport_iata}: {inserted} flights upserted")
        except Exception as e:
            # A failed statement leaves the PostgreSQL transaction aborted;
            # without a rollback every later statement on `conn` would fail.
            conn.rollback()
            log.error(f"Failed to fetch {airport_iata}: {e}")
            # Continue with the remaining airports.

        time.sleep(config.YANDEX_RATE_LIMIT_SEC)

    return total_flights
```

### 2.2 Запрос к Яндекс API

```python
def fetch_airport_schedule(yandex_code: str, target_date: date) -> List[Dict]:
    """Fetch arrivals and departures of one station from Yandex Schedules.

    The endpoint returns a single direction per request (selected with the
    `event` parameter) and pages its results, so this function issues one
    paginated request series per direction.

    Args:
        yandex_code: Yandex station code, e.g. "s9600213".
        target_date: Calendar day to request.

    Returns:
        Parsed flights (arrivals + departures), each tagged with "direction".

    Raises:
        requests.RequestException: On network errors or non-2xx responses.
    """
    url = "https://api.rasp.yandex.net/v3.0/schedule/"
    flights = []
    first_request = True

    for direction in DIRECTIONS:
        offset = 0
        while True:
            # Respect the rate limit between consecutive requests.
            if not first_request:
                time.sleep(config.YANDEX_RATE_LIMIT_SEC)
            first_request = False

            params = {
                "apikey": config.YANDEX_RASP_API_KEY,
                "station": yandex_code,
                "date": target_date.isoformat(),
                "transport_types": "plane",
                "event": direction,
                "limit": PAGE_LIMIT,
                "offset": offset,
            }
            response = requests.get(url, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()

            items = data.get("schedule") or []
            for item in items:
                thread = item.get("thread") or {}
                # The request already filters by transport type; this only
                # guards against rows explicitly marked as something else.
                if thread.get("transport_type", "plane") != "plane":
                    continue

                flight = parse_yandex_flight(item, direction)
                if flight:
                    flights.append(flight)

            offset += len(items)
            total = (data.get("pagination") or {}).get("total", 0)
            if not items or offset >= total:
                break

    return flights
```

### 2.3 Парсинг ответа Яндекс

```python
def parse_yandex_flight(item: Dict, direction: str) -> Optional[Dict]:
    """Convert one Yandex schedule item into a row dict for the database.

    Args:
        item: Element of the `schedule` array in the API response.
        direction: "arrival" or "departure" — the `event` the item was
            requested with.

    Returns:
        Row dict for `upsert_flights`, or None when the item has no flight
        number or no usable timestamp.
    """
    thread = item.get("thread") or {}

    # Flight number
    flight_number = thread.get("number", "")
    if not flight_number:
        return None

    # Airline. `carrier.code` is Yandex's internal carrier id; the IATA code
    # is nested under `carrier.codes`.
    carrier = thread.get("carrier") or {}
    airline_iata = (carrier.get("codes") or {}).get("iata")
    airline_name = carrier.get("title")

    # Route (from thread.title, e.g. "Москва — Санкт-Петербург")
    title = thread.get("title") or ""
    origin_iata, destination_iata = parse_route_from_title(title)

    # Time: prefer the timestamp that belongs to the requested direction.
    if direction == "arrival":
        scheduled_at = item.get("arrival") or item.get("departure")
    else:
        scheduled_at = item.get("departure") or item.get("arrival")
    if not scheduled_at:
        return None

    return {
        "direction": direction,
        "flight_number": flight_number,
        "airline_iata": airline_iata,
        "airline_name": airline_name,
        "origin_iata": origin_iata,
        "destination_iata": destination_iata,
        "aircraft_type": thread.get("vehicle"),
        "scheduled_at": scheduled_at,
        "status": "scheduled",  # Yandex does not provide a status in the schedule
        "source": "yandex",
    }


def parse_route_from_title(title: str) -> Tuple[Optional[str], Optional[str]]:
    """Extract origin/destination from a title like "Москва — Санкт-Петербург".

    Not implemented yet: the title holds city names, and a city → IATA
    reference table is required to turn them into airport codes.

    Returns:
        (origin_iata, destination_iata); currently always (None, None).
    """
    parts = [part.strip() for part in title.split("—")]
    if len(parts) == 2:
        # TODO: map the two city names in `parts` to IATA codes via a reference table.
        return (None, None)  # not implemented yet
    return (None, None)
```

### 2.4 Upsert в БД

```python
def upsert_flights(conn, flights: List[Dict], airport_iata: str, flight_date: date) -> int:
    """Insert or update flights in fr24_ext.schedule.

    The caller owns the transaction: this function neither commits nor
    rolls back.

    Returns:
        Number of processed records.
    """
    if not flights:
        return 0

    with conn.cursor() as cur:
        for flight in flights:
            cur.execute("""
                INSERT INTO fr24_ext.schedule
                    (flight_date, airport_iata, direction, flight_number,
                     airline_iata, airline_name, origin_iata, destination_iata,
                     aircraft_type, scheduled_at, status, source)
                VALUES
                    (%(flight_date)s, %(airport_iata)s, %(direction)s, %(flight_number)s,
                     %(airline_iata)s, %(airline_name)s, %(origin_iata)s, %(destination_iata)s,
                     %(aircraft_type)s, %(scheduled_at)s, %(status)s, %(source)s)
                ON CONFLICT (flight_number, airport_iata, scheduled_at, direction)
                DO UPDATE SET
                    airline_name = EXCLUDED.airline_name,
                    status = EXCLUDED.status,
                    fetched_at = now()
            """, {
                "flight_date": flight_date,
                "airport_iata": airport_iata,
                "direction": flight.get("direction", "departure"),
                "flight_number": flight["flight_number"],
                "airline_iata": flight.get("airline_iata"),
                "airline_name": flight.get("airline_name"),
                "origin_iata": flight.get("origin_iata"),
                "destination_iata": flight.get("destination_iata"),
                "aircraft_type": flight.get("aircraft_type"),
                "scheduled_at": flight["scheduled_at"],
                "status": flight.get("status", "scheduled"),
                "source": flight["source"],
            })

    return len(flights)
```

---

## 3. Модуль `opensky_worker.py`

### 3.1 Основная функция

```python
import logging
import time
from datetime import date, datetime, timezone
from typing import Dict, List

import requests

from config import config

log = logging.getLogger(__name__)


def enrich_day(target_date: date, conn) -> int:
    """Enrich fr24_ext.schedule rows with OpenSky data (icao24, actual_at).

    Work is committed after each direction of each airport. When an airport
    fails, its uncommitted work is rolled back and the loop continues.

    Returns:
        Number of enriched records.
    """
    total_enriched = 0

    # OpenSky expects Unix timestamps; anchor the day at midnight UTC
    # explicitly instead of relying on the container's local timezone.
    day_start = datetime.combine(target_date, datetime.min.time(), tzinfo=timezone.utc)
    begin_ts = int(day_start.timestamp())
    end_ts = begin_ts + 86400  # +24 hours

    for airport_iata, airport_info in config.AIRPORTS.items():
        icao = airport_info["icao"]
        log.info(f"Enriching OpenSky data for {airport_iata} ({icao}) on {target_date}")

        try:
            # Arrivals
            arrivals = fetch_opensky_flights(icao, begin_ts, end_ts, "arrival")
            enriched_arr = enrich_flights(conn, arrivals, airport_iata, "arrival")
            conn.commit()

            time.sleep(config.OPENSKY_RATE_LIMIT_SEC)

            # Departures
            departures = fetch_opensky_flights(icao, begin_ts, end_ts, "departure")
            enriched_dep = enrich_flights(conn, departures, airport_iata, "departure")
            conn.commit()

            total_enriched += enriched_arr + enriched_dep
            log.info(f"{airport_iata}: {enriched_arr} arrivals, {enriched_dep} departures enriched")
        except Exception as e:
            # Clear the aborted transaction so the next airport can proceed.
            conn.rollback()
            log.error(f"Failed to enrich {airport_iata}: {e}")

        time.sleep(config.OPENSKY_RATE_LIMIT_SEC)

    return total_enriched
```

### 3.2 Запрос к OpenSky API

```python
def fetch_opensky_flights(icao: str, begin_ts: int, end_ts: int, direction: str) -> List[Dict]:
    """Request arrivals or departures of one airport from OpenSky Network.

    Args:
        icao: ICAO code of the airport.
        begin_ts: Start of the interval, Unix seconds.
        end_ts: End of the interval, Unix seconds.
        direction: "arrival" or "departure"; selects the endpoint.

    Returns:
        List of flight dicts; empty when OpenSky has no flights for the interval.

    Raises:
        requests.RequestException: On network errors or error responses
            other than 404.
    """
    endpoint = f"https://opensky-network.org/api/flights/{direction}"
    params = {
        "airport": icao,
        "begin": begin_ts,
        "end": end_ts,
    }

    auth = None
    if config.OPENSKY_USERNAME and config.OPENSKY_PASSWORD:
        auth = (config.OPENSKY_USERNAME, config.OPENSKY_PASSWORD)

    response = requests.get(endpoint, params=params, auth=auth, timeout=60)

    # OpenSky answers 404 when no flights match the interval; that is an
    # empty result, not an error.
    if response.status_code == 404:
        return []
    response.raise_for_status()

    return response.json()  # list of dicts
```

### 3.3 Обогащение записей

```python
def enrich_flights(conn, opensky_flights: List[Dict], airport_iata: str, direction: str) -> int:
    """Update icao24 and actual_at on existing fr24_ext.schedule rows.

    The caller owns the transaction: this function neither commits nor
    rolls back.

    Returns:
        Number of rows updated.
    """
    if not opensky_flights:
        return 0

    # `lastSeen` is the time at the arrival airport, `firstSeen` the time at
    # the departure airport, so the field to use depends on the direction.
    ts_field = "lastSeen" if direction == "arrival" else "firstSeen"

    enriched_count = 0

    with conn.cursor() as cur:
        for flight in opensky_flights:
            icao24 = flight.get("icao24")
            # `callsign` may be present but null.
            callsign = (flight.get("callsign") or "").strip()

            if not icao24 or not callsign:
                continue

            # Actual arrival/departure time
            actual_ts = flight.get(ts_field)
            if not actual_ts:
                continue

            actual_at = datetime.fromtimestamp(actual_ts, tz=timezone.utc)

            # NOTE(review): OpenSky callsigns are presumably ICAO-style
            # (e.g. "AFL1234") while Yandex flight numbers are IATA-style;
            # an equality match likely needs normalisation — confirm.
            # Match by callsign (flight number) and time (±2 hours from scheduled)
            cur.execute("""
                UPDATE fr24_ext.schedule
                SET icao24 = %s,
                    actual_at = %s,
                    source = CASE WHEN source = 'yandex' THEN 'merged' ELSE source END
                WHERE airport_iata = %s
                  AND direction = %s
                  AND flight_number = %s
                  AND scheduled_at BETWEEN %s - INTERVAL '2 hours' AND %s + INTERVAL '2 hours'
                  AND icao24 IS NULL
            """, (icao24, actual_at, airport_iata, direction, callsign, actual_at, actual_at))

            if cur.rowcount > 0:
                enriched_count += cur.rowcount

    return enriched_count
```

---

## 4. Модуль `backfill.py`

### 4.1 CLI интерфейс

```python
import argparse
import json
from datetime import date, timedelta
from typing import Optional
import psycopg2
from yandex_worker import fetch_day as yandex_fetch_day
from opensky_worker import enrich_day as opensky_enrich_day
from config import config
import logging

log = logging.getLogger(__name__)


def main():
    """Backfill schedule data day by day over a date range.

    Progress is stored after every completed day, so a rerun resumes from
    the day after the last stored one. The loop stops at the first day that
    raises; the database connection is always closed.
    """
    parser = argparse.ArgumentParser(description="Backfill schedule data")
    parser.add_argument("--start-date", required=True, help="YYYY-MM-DD")
    parser.add_argument("--end-date", required=True, help="YYYY-MM-DD")
    parser.add_argument("--skip-opensky", action="store_true", help="Skip OpenSky enrichment")
    args = parser.parse_args()

    start = date.fromisoformat(args.start_date)
    end = date.fromisoformat(args.end_date)

    conn = psycopg2.connect(
        host=config.DB_HOST,
        port=config.DB_PORT,
        dbname=config.DB_NAME,
        user=config.DB_USER,
        password=config.DB_PASSWORD,
    )

    try:
        # Load saved state
        last_loaded = load_state(conn, "backfill_last_date")
        if last_loaded:
            start = max(start, date.fromisoformat(last_loaded) + timedelta(days=1))
            log.info(f"Resuming from {start}")

        current = start
        while current <= end:
            log.info(f"Processing {current}")

            try:
                # Yandex
                yandex_count = yandex_fetch_day(current, conn)
                log.info(f"Yandex: {yandex_count} flights")

                # OpenSky
                if not args.skip_opensky:
                    opensky_count = opensky_enrich_day(current, conn)
                    log.info(f"OpenSky: {opensky_count} enriched")

                # Save progress
                save_state(conn, "backfill_last_date", current.isoformat())
            except Exception as e:
                conn.rollback()
                log.error(f"Failed on {current}: {e}")
                break

            current += timedelta(days=1)
        else:
            # Reached only when the loop ran to the end without `break`.
            log.info("Backfill complete")
    finally:
        conn.close()


def load_state(conn, key: str) -> Optional[str]:
    """Return the stored `last_date` for `key`, or None when no row exists."""
    with conn.cursor() as cur:
        cur.execute("SELECT state_value->>'last_date' FROM fr24_ext.load_state WHERE state_key = %s", (key,))
        row = cur.fetchone()
        return row[0] if row else None


def save_state(conn, key: str, value: str):
    """Store `value` as `last_date` for `key` and commit."""
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO fr24_ext.load_state (state_key, state_value)
            VALUES (%s, %s::jsonb)
            ON CONFLICT (state_key) DO UPDATE SET state_value = EXCLUDED.state_value, updated_at = now()
        """, (key, json.dumps({"last_date": value})))
    conn.commit()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    main()
```

---

## 5. 
Frontend API (`frontend/main.py`)

### 5.1 Endpoint `/api/schedule/data`

```python
from flask import Flask, request, jsonify
import psycopg2
from config import config

app = Flask(__name__)

# Upper bound for the `limit` query parameter.
MAX_LIMIT = 1000

# Column list shared by /data and /export; row indexes below rely on this order.
SCHEDULE_COLUMNS = (
    "flight_number, airline_name, airport_iata, direction, "
    "origin_iata, destination_iata, scheduled_at, actual_at, status, icao24"
)


def _get_connection():
    """Open a new database connection from the shared config."""
    return psycopg2.connect(
        host=config.DB_HOST,
        port=config.DB_PORT,
        dbname=config.DB_NAME,
        user=config.DB_USER,
        password=config.DB_PASSWORD,
    )


def _int_arg(name: str, default: int) -> int:
    """Read an integer query parameter; raise ValueError when it is not an integer."""
    raw = request.args.get(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        raise ValueError(f"{name} must be an integer") from None


def _minutes_of_day(name: str, value: str) -> int:
    """Convert "HH:MM" into minutes since midnight; raise ValueError on bad input."""
    try:
        hours, minutes = map(int, value.split(":"))
    except ValueError:
        raise ValueError(f"{name} must be HH:MM") from None
    if not (0 <= hours <= 23 and 0 <= minutes <= 59):
        raise ValueError(f"{name} must be HH:MM")
    return hours * 60 + minutes


def _build_schedule_filters():
    """Build the WHERE clause and its parameters from the request's query string.

    Only constant SQL fragments are concatenated; every user value is passed
    as a bound parameter.

    Returns:
        (where_sql, params)

    Raises:
        ValueError: When time_from / time_to is not a valid HH:MM value.
    """
    date_from = request.args.get("date_from")
    date_to = request.args.get("date_to")
    airport = request.args.get("airport", "all")
    direction = request.args.get("direction", "all")
    flight_number = request.args.get("flight_number", "")
    time_from = request.args.get("time_from")  # HH:MM
    time_to = request.args.get("time_to")

    where_clauses = []
    params = []

    if date_from:
        where_clauses.append("flight_date >= %s")
        params.append(date_from)

    if date_to:
        where_clauses.append("flight_date <= %s")
        params.append(date_to)

    if airport != "all":
        where_clauses.append("airport_iata = %s")
        params.append(airport)

    if direction != "all":
        where_clauses.append("direction = %s")
        params.append(direction)

    if flight_number:
        where_clauses.append("flight_number ILIKE %s")
        params.append(f"%{flight_number}%")

    if time_from:
        where_clauses.append("EXTRACT(HOUR FROM scheduled_at) * 60 + EXTRACT(MINUTE FROM scheduled_at) >= %s")
        params.append(_minutes_of_day("time_from", time_from))

    if time_to:
        where_clauses.append("EXTRACT(HOUR FROM scheduled_at) * 60 + EXTRACT(MINUTE FROM scheduled_at) <= %s")
        params.append(_minutes_of_day("time_to", time_to))

    where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"
    return where_sql, params


@app.route("/api/schedule/data")
def get_schedule_data():
    """Return one page of schedule rows plus the total number of matches.

    Responds with HTTP 400 when limit, offset, time_from or time_to is
    malformed.
    """
    try:
        limit = _int_arg("limit", 100)
        offset = _int_arg("offset", 0)
        where_sql, params = _build_schedule_filters()
    except ValueError as e:
        return jsonify({"error": str(e)}), 400

    # PostgreSQL rejects negative LIMIT/OFFSET; also cap the page size.
    limit = max(1, min(limit, MAX_LIMIT))
    offset = max(0, offset)

    conn = _get_connection()
    try:
        with conn.cursor() as cur:
            # Count
            cur.execute(f"SELECT COUNT(*) FROM fr24_ext.schedule WHERE {where_sql}", params)
            total = cur.fetchone()[0]

            # Data
            cur.execute(f"""
                SELECT {SCHEDULE_COLUMNS}
                FROM fr24_ext.schedule
                WHERE {where_sql}
                ORDER BY scheduled_at DESC
                LIMIT %s OFFSET %s
            """, params + [limit, offset])
            rows = cur.fetchall()
    finally:
        conn.close()

    flights = []
    for row in rows:
        scheduled = row[6]
        actual = row[7]
        delay_min = int((actual - scheduled).total_seconds() / 60) if actual else None

        flights.append({
            "flight_number": row[0],
            "airline": row[1],
            "airport": row[2],
            "direction": row[3],
            "origin": row[4],
            "destination": row[5],
            "scheduled_at": scheduled.isoformat(),
            "actual_at": actual.isoformat() if actual else None,
            "delay_min": delay_min,
            "status": row[8],
            "icao24": row[9],
        })

    return jsonify({"total": total, "flights": flights})
```

### 5.2 Endpoint `/api/schedule/export`

```python
import csv
from io import StringIO
from flask import Response


@app.route("/api/schedule/export")
def export_schedule():
    """Export all rows matching the /data filters as a CSV attachment.

    Responds with HTTP 400 when time_from or time_to is malformed.
    """
    # Same filters as /data
    try:
        where_sql, params = _build_schedule_filters()
    except ValueError as e:
        return jsonify({"error": str(e)}), 400

    conn = _get_connection()
    try:
        with conn.cursor() as cur:
            # Query without LIMIT/OFFSET
            cur.execute(
                f"SELECT {SCHEDULE_COLUMNS} FROM fr24_ext.schedule WHERE {where_sql} ORDER BY scheduled_at DESC",
                params,
            )
            rows = cur.fetchall()
    finally:
        conn.close()

    # CSV
    output = StringIO()
    writer = csv.writer(output)
    writer.writerow(["Flight", "Airline", "Airport", "Direction", "Route", "Scheduled", "Actual", "Delay (min)", "Status"])

    for row in rows:
        writer.writerow([
            row[0],  # flight_number
            row[1],  # airline_name
            row[2],  # airport_iata
            row[3],  # direction
            f"{row[4] or ''} → {row[5] or ''}",  # route; blank instead of "None" when unknown
            row[6].isoformat(),  # scheduled_at
            row[7].isoformat() if row[7] else "",  # actual_at
            int((row[7] - row[6]).total_seconds() / 60) if row[7] else "",  # delay
            row[8],  # status
        ])

    return Response(
        output.getvalue(),
        mimetype="text/csv",
        headers={"Content-Disposition": "attachment; filename=schedule.csv"}
    )
```

---

## 6. 
Обработка ошибок

### 6.1 Retry логика

```python
import time
from functools import wraps


def retry_on_error(max_retries=3, delay=5):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except requests.RequestException as e:
                    if attempt < max_retries - 1:
                        log.warning(f"Retry {attempt+1}/{max_retries} after error: {e}")
                        time.sleep(delay * (attempt + 1))
                    else:
                        raise
        return wrapper
    return decorator


@retry_on_error(max_retries=3, delay=5)
def fetch_airport_schedule(yandex_code: str, target_date: date) -> List[Dict]:
    # ... (код запроса)
```

### 6.2 Логирование

```python
import logging
import sys


def setup_logging():
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        handlers=[logging.StreamHandler(sys.stdout)]
    )


# В main.py
if __name__ == "__main__":
    setup_logging()
```

---

## 7. Docker

### 7.1 Dockerfile

```dockerfile
FROM python:3.11-slim

WORKDIR /app

# Dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Code
COPY *.py ./

# Healthcheck
HEALTHCHECK --interval=60s --timeout=5s --start-period=10s \
  CMD python -c "import requests; requests.get('http://localhost:8000/health', timeout=3).raise_for_status()"

CMD ["python", "main.py"]
```

### 7.2 requirements.txt

```
requests==2.31.0
psycopg2-binary==2.9.9
APScheduler==3.10.4
Flask==3.0.0
```

### 7.3 docker-compose.yml (фрагмент)

```yaml
services:
  schedule:
    build: ./ingest/schedule
    container_name: fr24-schedule
    environment:
      POSTGRES_HOST: fr24-postgres
      POSTGRES_DB: fr24
      POSTGRES_USER: fr24
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      YANDEX_RASP_API_KEY: ${YANDEX_RASP_API_KEY}
      OPENSKY_USERNAME: ${OPENSKY_USERNAME}
      OPENSKY_PASSWORD: ${OPENSKY_PASSWORD}
      SCHEDULE_RETENTION_DAYS: 1095
    depends_on:
      fr24-postgres:
        condition: service_healthy
    restart: unless-stopped
    networks:
      - fr24-network
```

---

## 8. 
Retention (автоочистка)

### 8.1 Cron job в main.py

```python
from apscheduler.schedulers.background import BackgroundScheduler


def cleanup_old_data(conn):
    """Delete schedule rows older than config.RETENTION_DAYS days.

    Commits on success; rolls back and re-raises on failure so the shared
    connection is not left in an aborted transaction.
    """
    try:
        with conn.cursor() as cur:
            # The placeholder must stay outside any quoted SQL literal:
            # psycopg2 adds its own quoting, so '%s days' only works by
            # accident for ints. `date - integer` subtracts whole days.
            cur.execute("""
                DELETE FROM fr24_ext.schedule
                WHERE flight_date < CURRENT_DATE - %s::int
            """, (config.RETENTION_DAYS,))
            deleted = cur.rowcount
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    log.info(f"Cleanup: {deleted} old records deleted")


# В main.py
# Pin the scheduler to UTC so cron hours do not depend on the container's
# local timezone.
scheduler = BackgroundScheduler(timezone="UTC")
scheduler.add_job(cleanup_old_data, 'cron', hour=3, minute=0, args=[conn])
scheduler.start()
```

---

Эта детальная спецификация готова для передачи Dev-агенту.