wiki/tasks/flightradar24/docs/PHASE2_STEP1_DETAILED_SPEC.md
2026-04-20 13:00:01 +03:00

# Detailed Specification: Phase 2, Step 1
Supplement to `PHASE2_STEP1_EXTERNAL_DATA.md`: detailed module logic for the Dev agent.
---
## 1. Architecture of the `fr24-schedule` container
### 1.1 Process structure
```
main.py (supervisor)
├─ cron scheduler (APScheduler)
│  └─ daily_job() @ 02:00 UTC
│     ├─ yandex_worker.fetch_day(date)
│     └─ opensky_worker.enrich_day(date)
└─ healthcheck endpoint (Flask, port 8000)
   └─ GET /health → {"status": "ok", "last_run": "..."}
```
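The `/health` payload shown in the tree could be backed by a small shared state object that `daily_job()` updates when it finishes. The following is a minimal sketch under stated assumptions: the `HealthState` class and its method names are hypothetical (the spec does not define them), and `last_run` is assumed to be an ISO 8601 string.

```python
import threading
from datetime import datetime, timezone
from typing import Optional


class HealthState:
    """Hypothetical holder for the data returned by GET /health."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._last_run: Optional[datetime] = None

    def mark_run(self, when: Optional[datetime] = None) -> None:
        # Called at the end of daily_job(); defaults to the current UTC time.
        with self._lock:
            self._last_run = when or datetime.now(timezone.utc)

    def payload(self) -> dict:
        # Shape mirrors the spec: {"status": "ok", "last_run": "..."}.
        with self._lock:
            last = self._last_run.isoformat() if self._last_run else None
        return {"status": "ok", "last_run": last}
```

The lock matters because the scheduler thread writes the timestamp while the Flask thread reads it.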
### 1.2 Configuration (config.py)
```python
import os
from dataclasses import dataclass
from typing import Optional


@dataclass
class Config:
    # Database
    DB_HOST: str = os.getenv("POSTGRES_HOST", "fr24-postgres")
    DB_PORT: int = int(os.getenv("POSTGRES_PORT", "5432"))
    DB_NAME: str = os.getenv("POSTGRES_DB", "fr24")
    DB_USER: str = os.getenv("POSTGRES_USER", "fr24")
    DB_PASSWORD: Optional[str] = os.getenv("POSTGRES_PASSWORD")  # None if unset

    # API keys
    YANDEX_RASP_API_KEY: Optional[str] = os.getenv("YANDEX_RASP_API_KEY")  # None if unset
    OPENSKY_USERNAME: str = os.getenv("OPENSKY_USERNAME", "")
    OPENSKY_PASSWORD: str = os.getenv("OPENSKY_PASSWORD", "")

    # Airports. Unannotated, so this is a shared class attribute, not a dataclass field.
    # Verify each yandex_code against the Yandex stations list before use.
    AIRPORTS = {
        "SVO": {"iata": "SVO", "icao": "UUEE", "yandex_code": "s9600213", "name": "Шереметьево"},
        "DME": {"iata": "DME", "icao": "UUDD", "yandex_code": "s9600366", "name": "Домодедово"},
        "VKO": {"iata": "VKO", "icao": "UUWW", "yandex_code": "s9600215", "name": "Внуково"},
        "ZIA": {"iata": "ZIA", "icao": "UUBW", "yandex_code": "s9881291", "name": "Жуковский"},
    }

    # Rate limits
    YANDEX_RATE_LIMIT_SEC: float = 1.0  # pause between requests
    OPENSKY_RATE_LIMIT_SEC: float = 30.0

    # Retention
    RETENTION_DAYS: int = int(os.getenv("SCHEDULE_RETENTION_DAYS", "1095"))  # about 3 years

    # Cron schedule
    DAILY_RUN_HOUR: int = 2  # UTC
    DAILY_RUN_MINUTE: int = 0


config = Config()
```
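Two settings in `config.py` are read without a default (`POSTGRES_PASSWORD` and `YANDEX_RASP_API_KEY`), so they silently become `None` when unset. A startup check along the following lines could surface that early. This is a sketch only: `REQUIRED_ENV_VARS` and `missing_env_vars` are hypothetical names, not part of the spec.

```python
import os
from typing import List, Mapping

# Variables that config.py reads without a default value.
REQUIRED_ENV_VARS = ("POSTGRES_PASSWORD", "YANDEX_RASP_API_KEY")


def missing_env_vars(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the required variables that are unset or empty."""
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]
```

`main.py` could call `missing_env_vars()` before starting the scheduler and exit with a clear message if the list is not empty.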
---
## 2. Module `yandex_worker.py`
### 2.1 Main function
```python
import logging
import time
from datetime import datetime, date
from typing import List, Dict

import psycopg2
import requests

from config import config

log = logging.getLogger(__name__)


def fetch_day(target_date: date, conn) -> int:
    """
    Load one day's schedule for all airports.
    Returns the number of flights loaded.
    """
    total_flights = 0
    for airport_iata, airport_info in config.AIRPORTS.items():
        log.info(f"Fetching Yandex schedule for {airport_iata} on {target_date}")
        try:
            flights = fetch_airport_schedule(
                yandex_code=airport_info["yandex_code"],
                target_date=target_date,
            )
            inserted = upsert_flights(conn, flights, airport_iata, target_date)
            conn.commit()  # commit per airport so one failure does not discard the others
            total_flights += inserted
            log.info(f"{airport_iata}: {inserted} flights upserted")
        except Exception as e:
            conn.rollback()  # clear the aborted transaction before the next airport
            log.error(f"Failed to fetch {airport_iata}: {e}")
            # Continue with the remaining airports
        time.sleep(config.YANDEX_RATE_LIMIT_SEC)
    return total_flights
```
### 2.2 Request to the Yandex API
```python
def fetch_airport_schedule(yandex_code: str, target_date: date) -> List[Dict]:
    """
    Request the schedule from Yandex.Rasp (Yandex Schedules).
    Returns a list of flights (arrivals + departures).
    """
    url = "https://api.rasp.yandex.net/v3.0/schedule/"
    flights = []
    # The API does not label arrivals and departures within one response,
    # so make one request per direction: event=departure and event=arrival.
    for direction in ("departure", "arrival"):
        params = {
            "apikey": config.YANDEX_RASP_API_KEY,
            "station": yandex_code,
            "date": target_date.isoformat(),
            "transport_types": "plane",
            "event": direction,
        }
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()
        # Note: responses may be paginated; this loop reads only the first page.
        for item in data.get("schedule", []):
            thread = item.get("thread") or {}
            # Assumes the thread object exposes `transport_type`.
            if thread.get("transport_type") != "plane":
                continue
            flight = parse_yandex_flight(item, direction)
            if flight:
                flights.append(flight)
        time.sleep(config.YANDEX_RATE_LIMIT_SEC)
    return flights
```
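A busy airport can have more flights per day than a single response returns, so the request above may need paging. The sketch below assumes the response carries a `pagination` object with a `total` field and that the endpoint accepts `limit` and `offset` query parameters; both should be checked against the Yandex documentation. `fetch_page` is a hypothetical callable that wraps one HTTP request.

```python
from typing import Callable, Dict, List


def fetch_all_pages(fetch_page: Callable[[int, int], Dict], limit: int = 100) -> List[Dict]:
    """Collect `schedule` items across pages.

    `fetch_page(offset, limit)` is a hypothetical callable that performs one
    HTTP request and returns the decoded JSON body.
    """
    items: List[Dict] = []
    offset = 0
    while True:
        data = fetch_page(offset, limit)
        page = data.get("schedule", [])
        items.extend(page)
        # Assumed response field: pagination.total (total number of items).
        total = data.get("pagination", {}).get("total", len(items))
        offset += limit
        if not page or offset >= total:
            return items
```

Injecting `fetch_page` keeps the paging logic testable without network access; the real callable would add `limit` and `offset` to the request parameters and sleep for `YANDEX_RATE_LIMIT_SEC` between calls.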
### 2.3 Parsing the Yandex response
```python
from typing import Optional


def parse_yandex_flight(item: Dict, direction: str) -> Optional[Dict]:
    """
    Convert a Yandex API schedule item into the structure stored in the DB.
    Returns None when the item has no flight number or no time.
    """
    thread = item.get("thread") or {}
    # Flight number
    flight_number = thread.get("number", "")
    if not flight_number:
        return None
    # Airline. Assumes the IATA designator is under carrier.codes.iata and
    # that carrier.code is Yandex's internal carrier ID.
    carrier = thread.get("carrier") or {}
    airline_iata = (carrier.get("codes") or {}).get("iata")
    airline_name = carrier.get("title")
    # Route (from thread.title, for example "Москва — Санкт-Петербург")
    title = thread.get("title", "")
    origin_iata, destination_iata = parse_route_from_title(title)
    # Time
    scheduled_at = item.get("departure") or item.get("arrival")
    if not scheduled_at:
        return None
    return {
        "direction": direction,  # read by upsert_flights
        "flight_number": flight_number,
        "airline_iata": airline_iata,
        "airline_name": airline_name,
        "origin_iata": origin_iata,
        "destination_iata": destination_iata,
        "aircraft_type": thread.get("vehicle"),
        "scheduled_at": scheduled_at,
        "status": "scheduled",  # Yandex does not provide a status in the schedule
        "source": "yandex",
    }


def parse_route_from_title(title: str) -> tuple:
    """
    Extract origin/destination from a string such as "Москва — Санкт-Петербург".
    Returns (None, None) if the title cannot be parsed.
    """
    # Simplified logic; a real implementation needs a city → IATA mapping.
    parts = title.split(" — ")  # separator as in the example title above
    if len(parts) == 2:
        origin = parts[0].strip()
        destination = parts[1].strip()
        # TODO: map city → IATA through a reference table
        return (None, None)  # not implemented yet
    return (None, None)
```
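The TODO above calls for a city → IATA lookup. One possible shape is sketched below; the `CITY_TO_IATA` table holds a few illustrative entries only, `route_from_title` is a hypothetical name, and the " — " separator is assumed from the example title in the spec.

```python
from typing import Dict, Optional, Tuple

# Illustrative entries only; a real table would come from a reference dataset.
CITY_TO_IATA: Dict[str, str] = {
    "Москва": "MOW",  # IATA metropolitan code covering all Moscow airports
    "Санкт-Петербург": "LED",
    "Сочи": "AER",
    "Казань": "KZN",
}


def route_from_title(title: str, separator: str = " — ") -> Tuple[Optional[str], Optional[str]]:
    """Map "City A — City B" to (origin, destination) codes; None for unknown cities."""
    parts = title.split(separator)
    if len(parts) != 2:
        return (None, None)
    origin, destination = (CITY_TO_IATA.get(p.strip()) for p in parts)
    return (origin, destination)
```

Because Moscow resolves only to the city code, the worker would still need to substitute the queried airport (SVO, DME, VKO or ZIA) for the Moscow side of the route.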
### 2.4 Upsert into the DB
```python
def upsert_flights(conn, flights: List[Dict], airport_iata: str, flight_date: date) -> int:
    """
    Insert or update flights in fr24_ext.schedule.
    Returns the number of records processed.
    """
    if not flights:
        return 0
    with conn.cursor() as cur:
        for flight in flights:
            cur.execute("""
                INSERT INTO fr24_ext.schedule
                    (flight_date, airport_iata, direction, flight_number,
                     airline_iata, airline_name, origin_iata, destination_iata,
                     aircraft_type, scheduled_at, status, source)
                VALUES
                    (%(flight_date)s, %(airport_iata)s, %(direction)s, %(flight_number)s,
                     %(airline_iata)s, %(airline_name)s, %(origin_iata)s, %(destination_iata)s,
                     %(aircraft_type)s, %(scheduled_at)s, %(status)s, %(source)s)
                ON CONFLICT (flight_number, airport_iata, scheduled_at, direction)
                DO UPDATE SET
                    airline_name = EXCLUDED.airline_name,
                    status = EXCLUDED.status,
                    fetched_at = now()
            """, {
                "flight_date": flight_date,
                "airport_iata": airport_iata,
                "direction": flight.get("direction", "departure"),
                "flight_number": flight["flight_number"],
                "airline_iata": flight.get("airline_iata"),
                "airline_name": flight.get("airline_name"),
                "origin_iata": flight.get("origin_iata"),
                "destination_iata": flight.get("destination_iata"),
                "aircraft_type": flight.get("aircraft_type"),
                "scheduled_at": flight["scheduled_at"],
                "status": flight.get("status", "scheduled"),
                "source": flight["source"],
            })
    return len(flights)
```
---
## 3. Module `opensky_worker.py`
### 3.1 Main function
```python
import logging
import time
from datetime import date, datetime, timezone
from typing import Dict, List

import requests

from config import config

log = logging.getLogger(__name__)


def enrich_day(target_date: date, conn) -> int:
    """
    Enrich rows in fr24_ext.schedule with OpenSky data (icao24).
    Returns the number of enriched rows.
    """
    total_enriched = 0
    # Build the window in UTC; a naive datetime would use the host's local time zone.
    day_start = datetime.combine(target_date, datetime.min.time(), tzinfo=timezone.utc)
    begin_ts = int(day_start.timestamp())
    end_ts = begin_ts + 86400  # +24 hours
    for airport_iata, airport_info in config.AIRPORTS.items():
        icao = airport_info["icao"]
        log.info(f"Enriching OpenSky data for {airport_iata} ({icao}) on {target_date}")
        try:
            # Arrivals
            arrivals = fetch_opensky_flights(icao, begin_ts, end_ts, "arrival")
            enriched_arr = enrich_flights(conn, arrivals, airport_iata, "arrival")
            time.sleep(config.OPENSKY_RATE_LIMIT_SEC)
            # Departures
            departures = fetch_opensky_flights(icao, begin_ts, end_ts, "departure")
            enriched_dep = enrich_flights(conn, departures, airport_iata, "departure")
            conn.commit()  # commit per airport
            total_enriched += enriched_arr + enriched_dep
            log.info(f"{airport_iata}: {enriched_arr} arrivals, {enriched_dep} departures enriched")
        except Exception as e:
            conn.rollback()  # clear the aborted transaction before the next airport
            log.error(f"Failed to enrich {airport_iata}: {e}")
        time.sleep(config.OPENSKY_RATE_LIMIT_SEC)
    return total_enriched
```
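The OpenSky query window is expressed as Unix timestamps, so the day boundaries should be computed in UTC rather than in the container's local time zone. A minimal sketch of that calculation follows; `utc_day_window` is a hypothetical helper name.

```python
from datetime import date, datetime, time, timedelta, timezone
from typing import Tuple


def utc_day_window(day: date) -> Tuple[int, int]:
    """Return (begin, end) Unix timestamps for the UTC calendar day."""
    start = datetime.combine(day, time.min, tzinfo=timezone.utc)
    end = start + timedelta(days=1)
    return int(start.timestamp()), int(end.timestamp())
```

For example, `utc_day_window(date(2024, 1, 1))` returns `(1704067200, 1704153600)`, regardless of the host's time zone setting.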
### 3.2 Request to the OpenSky API
```python
def fetch_opensky_flights(icao: str, begin_ts: int, end_ts: int, direction: str) -> List[Dict]:
    """
    Request arrivals or departures from the OpenSky Network.
    """
    endpoint = f"https://opensky-network.org/api/flights/{direction}"
    params = {
        "airport": icao,
        "begin": begin_ts,
        "end": end_ts,
    }
    auth = None
    if config.OPENSKY_USERNAME and config.OPENSKY_PASSWORD:
        auth = (config.OPENSKY_USERNAME, config.OPENSKY_PASSWORD)
    response = requests.get(endpoint, params=params, auth=auth, timeout=60)
    # OpenSky answers 404 when no flights match the interval; treat that as an empty result.
    if response.status_code == 404:
        return []
    response.raise_for_status()
    return response.json()  # list of dicts
```
### 3.3 Enriching records
```python
from datetime import datetime, timezone


def enrich_flights(conn, opensky_flights: List[Dict], airport_iata: str, direction: str) -> int:
    """
    Update icao24 and actual_at in existing fr24_ext.schedule rows.
    """
    if not opensky_flights:
        return 0
    enriched_count = 0
    with conn.cursor() as cur:
        for flight in opensky_flights:
            icao24 = flight.get("icao24")
            # callsign can be null in the response, so guard before strip()
            callsign = (flight.get("callsign") or "").strip()
            if not icao24 or not callsign:
                continue
            # Actual time: lastSeen approximates landing, firstSeen approximates takeoff.
            actual_ts = flight.get("lastSeen") if direction == "arrival" else flight.get("firstSeen")
            if not actual_ts:
                continue
            actual_at = datetime.fromtimestamp(actual_ts, tz=timezone.utc)
            # Match on callsign (flight number) and time (±2 hours from scheduled).
            # Caveat: callsigns use ICAO airline designators (e.g. AFL1234), so this
            # equality only matches if flight_number is stored in the same form.
            cur.execute("""
                UPDATE fr24_ext.schedule
                SET icao24 = %s,
                    actual_at = %s,
                    source = CASE WHEN source = 'yandex' THEN 'merged' ELSE source END
                WHERE airport_iata = %s
                  AND direction = %s
                  AND flight_number = %s
                  AND scheduled_at BETWEEN %s - INTERVAL '2 hours' AND %s + INTERVAL '2 hours'
                  AND icao24 IS NULL
            """, (icao24, actual_at, airport_iata, direction, callsign, actual_at, actual_at))
            if cur.rowcount > 0:
                enriched_count += cur.rowcount
    return enriched_count
```
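The match above compares an OpenSky callsign with a schedule flight number. Callsigns carry the ICAO airline designator (for example `AFL1234`), while schedule data typically uses the IATA one (`SU 1234`), so both sides may need normalizing to a common key first. The sketch below shows one way to do that. The mapping table is a small illustrative sample, the function names are hypothetical, and the `"SU 1234"` format for schedule numbers is an assumption about the Yandex data.

```python
import re
from typing import Optional

# Illustrative ICAO -> IATA airline designators; extend from a reference table.
ICAO_TO_IATA = {"AFL": "SU", "SBI": "S7", "PBD": "DP", "SDM": "FV", "UTA": "UT"}

_CALLSIGN_RE = re.compile(r"^([A-Z]{3})0*(\d+)([A-Z]?)$")
_FLIGHT_RE = re.compile(r"^([A-Z0-9]{2})0*(\d+)([A-Z]?)$")


def callsign_to_flight_key(callsign: str) -> Optional[str]:
    """Convert an ICAO callsign such as "AFL1234" to an IATA-style key "SU1234"."""
    match = _CALLSIGN_RE.match(callsign.strip().upper())
    if not match:
        return None
    airline, number, suffix = match.groups()
    iata = ICAO_TO_IATA.get(airline)
    return f"{iata}{number}{suffix}" if iata else None


def flight_number_to_key(flight_number: str) -> str:
    """Normalize a schedule flight number such as "SU 034" to "SU34"."""
    compact = re.sub(r"\s+", "", flight_number.upper())
    match = _FLIGHT_RE.match(compact)
    return "".join(match.groups()) if match else compact
```

Callsigns do not always encode the commercial flight number, so an unmatched flight should be treated as a normal outcome rather than an error.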
---
## 4. Module `backfill.py`
### 4.1 CLI interface
```python
import argparse
import json
import logging
from datetime import date, timedelta
from typing import Optional

import psycopg2

from config import config
from opensky_worker import enrich_day as opensky_enrich_day
from yandex_worker import fetch_day as yandex_fetch_day

log = logging.getLogger(__name__)


def main():
    parser = argparse.ArgumentParser(description="Backfill schedule data")
    parser.add_argument("--start-date", required=True, help="YYYY-MM-DD")
    parser.add_argument("--end-date", required=True, help="YYYY-MM-DD")
    parser.add_argument("--skip-opensky", action="store_true", help="Skip OpenSky enrichment")
    args = parser.parse_args()
    start = date.fromisoformat(args.start_date)
    end = date.fromisoformat(args.end_date)
    conn = psycopg2.connect(
        host=config.DB_HOST,
        port=config.DB_PORT,
        dbname=config.DB_NAME,
        user=config.DB_USER,
        password=config.DB_PASSWORD,
    )
    # Load saved state
    last_loaded = load_state(conn, "backfill_last_date")
    if last_loaded:
        start = max(start, date.fromisoformat(last_loaded) + timedelta(days=1))
        log.info(f"Resuming from {start}")
    current = start
    while current <= end:
        log.info(f"Processing {current}")
        try:
            # Yandex
            yandex_count = yandex_fetch_day(current, conn)
            log.info(f"Yandex: {yandex_count} flights")
            # OpenSky
            if not args.skip_opensky:
                opensky_count = opensky_enrich_day(current, conn)
                log.info(f"OpenSky: {opensky_count} enriched")
            # Save progress
            save_state(conn, "backfill_last_date", current.isoformat())
        except Exception as e:
            conn.rollback()
            log.error(f"Failed on {current}: {e}")
            break
        current += timedelta(days=1)
    else:
        # Runs only when the loop finished without a break
        log.info("Backfill complete")
    conn.close()


def load_state(conn, key: str) -> Optional[str]:
    with conn.cursor() as cur:
        cur.execute("SELECT state_value->>'last_date' FROM fr24_ext.load_state WHERE state_key = %s", (key,))
        row = cur.fetchone()
        return row[0] if row else None


def save_state(conn, key: str, value: str):
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO fr24_ext.load_state (state_key, state_value)
            VALUES (%s, %s::jsonb)
            ON CONFLICT (state_key) DO UPDATE SET state_value = EXCLUDED.state_value, updated_at = now()
        """, (key, json.dumps({"last_date": value})))  # json.dumps handles escaping
    conn.commit()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    main()
```
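The resume rule in `main()` (start from the day after the last saved date, but never earlier than `--start-date`) can be isolated into a small generator, which makes its edge cases easier to see. This is a sketch; `remaining_dates` is a hypothetical helper, not part of the spec.

```python
from datetime import date, timedelta
from typing import Iterator, Optional


def remaining_dates(start: date, end: date, last_loaded: Optional[str]) -> Iterator[date]:
    """Yield the dates still to process, skipping everything up to `last_loaded`."""
    if last_loaded:
        start = max(start, date.fromisoformat(last_loaded) + timedelta(days=1))
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)
```

One consequence worth noting: because the saved state is a single key, requesting a range that ends before the saved date yields nothing, so re-running an older range would require clearing `backfill_last_date` first.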
---
## 5. Frontend API (`frontend/main.py`)
### 5.1 Endpoint `/api/schedule/data`
```python
from flask import Flask, request, jsonify
import psycopg2
from config import config

app = Flask(__name__)


@app.route("/api/schedule/data")
def get_schedule_data():
    # Query parameters
    date_from = request.args.get("date_from")
    date_to = request.args.get("date_to")
    airport = request.args.get("airport", "all")
    direction = request.args.get("direction", "all")
    flight_number = request.args.get("flight_number", "")
    time_from = request.args.get("time_from")  # HH:MM
    time_to = request.args.get("time_to")
    # type=int falls back to the default instead of raising on non-numeric input
    limit = request.args.get("limit", 100, type=int)
    offset = request.args.get("offset", 0, type=int)

    # Build the WHERE clause (only fixed SQL fragments; values go through params)
    where_clauses = []
    params = []
    if date_from:
        where_clauses.append("flight_date >= %s")
        params.append(date_from)
    if date_to:
        where_clauses.append("flight_date <= %s")
        params.append(date_to)
    if airport != "all":
        where_clauses.append("airport_iata = %s")
        params.append(airport)
    if direction != "all":
        where_clauses.append("direction = %s")
        params.append(direction)
    if flight_number:
        where_clauses.append("flight_number ILIKE %s")
        params.append(f"%{flight_number}%")
    if time_from:
        where_clauses.append("EXTRACT(HOUR FROM scheduled_at) * 60 + EXTRACT(MINUTE FROM scheduled_at) >= %s")
        h, m = map(int, time_from.split(":"))
        params.append(h * 60 + m)
    if time_to:
        where_clauses.append("EXTRACT(HOUR FROM scheduled_at) * 60 + EXTRACT(MINUTE FROM scheduled_at) <= %s")
        h, m = map(int, time_to.split(":"))
        params.append(h * 60 + m)
    where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"

    conn = psycopg2.connect(...)  # connection arguments are elided in the spec
    try:
        cur = conn.cursor()
        # Count
        cur.execute(f"SELECT COUNT(*) FROM fr24_ext.schedule WHERE {where_sql}", params)
        total = cur.fetchone()[0]
        # Data
        cur.execute(f"""
            SELECT flight_number, airline_name, airport_iata, direction,
                   origin_iata, destination_iata, scheduled_at, actual_at, status, icao24
            FROM fr24_ext.schedule
            WHERE {where_sql}
            ORDER BY scheduled_at DESC
            LIMIT %s OFFSET %s
        """, params + [limit, offset])
        rows = cur.fetchall()
    finally:
        conn.close()  # close the connection even if a query fails

    flights = []
    for row in rows:
        scheduled = row[6]
        actual = row[7]
        delay_min = int((actual - scheduled).total_seconds() / 60) if actual else None
        flights.append({
            "flight_number": row[0],
            "airline": row[1],
            "airport": row[2],
            "direction": row[3],
            "origin": row[4],
            "destination": row[5],
            "scheduled_at": scheduled.isoformat(),
            "actual_at": actual.isoformat() if actual else None,
            "delay_min": delay_min,
            "status": row[8],
            "icao24": row[9],
        })
    return jsonify({"total": total, "flights": flights})
```
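The `time_from` and `time_to` filters convert an `HH:MM` string into minutes since midnight. A malformed value such as `abc` or `25:00` would otherwise surface as an unhandled exception, so a defensive parser along these lines may be useful. This is a sketch; `parse_hhmm` is a hypothetical helper.

```python
from typing import Optional


def parse_hhmm(value: Optional[str]) -> Optional[int]:
    """Convert "HH:MM" to minutes since midnight; return None for missing or invalid input."""
    if not value:
        return None
    parts = value.split(":")
    # Require exactly two ASCII-digit fields before converting.
    if len(parts) != 2 or not all(p.isascii() and p.isdigit() for p in parts):
        return None
    hours, minutes = int(parts[0]), int(parts[1])
    if hours > 23 or minutes > 59:
        return None
    return hours * 60 + minutes
```

The endpoint could skip the filter (or answer with HTTP 400) when the function returns `None` for a value the client did supply.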
### 5.2 Endpoint `/api/schedule/export`
```python
import csv
from io import StringIO
from flask import Response


@app.route("/api/schedule/export")
def export_schedule():
    # Same filters as /data
    # ... (code is the same as in get_schedule_data)
    # Query without LIMIT/OFFSET
    cur.execute(f"SELECT ... FROM fr24_ext.schedule WHERE {where_sql} ORDER BY scheduled_at DESC", params)
    rows = cur.fetchall()
    # CSV
    output = StringIO()
    writer = csv.writer(output)
    writer.writerow(["Flight", "Airline", "Airport", "Direction", "Route", "Scheduled", "Actual", "Delay (min)", "Status"])
    for row in rows:
        writer.writerow([
            row[0],  # flight_number
            row[1],  # airline_name
            row[2],  # airport_iata
            row[3],  # direction
            f"{row[4]}-{row[5]}",  # route: origin-destination
            row[6].isoformat(),  # scheduled_at
            row[7].isoformat() if row[7] else "",  # actual_at
            int((row[7] - row[6]).total_seconds() / 60) if row[7] else "",  # delay
            row[8],  # status
        ])
    output.seek(0)
    return Response(
        output.getvalue(),
        mimetype="text/csv",
        headers={"Content-Disposition": "attachment; filename=schedule.csv"},
    )
```
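The row formatting in the export can be separated from the Flask handler so it can be exercised without a database. The sketch below assumes the nine-column row layout used in the handler above (flight number, airline, airport, direction, origin, destination, scheduled, actual, status); `delay_minutes` and `rows_to_csv` are hypothetical names, and the `-` route separator and `?` placeholder for missing codes are illustrative choices.

```python
import csv
from datetime import datetime
from io import StringIO
from typing import Optional, Sequence


def delay_minutes(scheduled: datetime, actual: Optional[datetime]) -> Optional[int]:
    """Minutes between scheduled and actual time, truncated; None when actual is unknown."""
    if actual is None:
        return None
    return int((actual - scheduled).total_seconds() / 60)


def rows_to_csv(rows: Sequence[tuple]) -> str:
    """Render schedule rows as CSV text with a header line."""
    output = StringIO()
    writer = csv.writer(output)
    writer.writerow(["Flight", "Airline", "Airport", "Direction", "Route",
                     "Scheduled", "Actual", "Delay (min)", "Status"])
    for flight, airline, airport, direction, origin, dest, scheduled, actual, status in rows:
        delay = delay_minutes(scheduled, actual)
        writer.writerow([
            flight, airline, airport, direction,
            f"{origin or '?'}-{dest or '?'}",  # missing codes shown as "?"
            scheduled.isoformat(),
            actual.isoformat() if actual else "",
            "" if delay is None else delay,  # keep a real 0-minute delay visible
            status,
        ])
    return output.getvalue()
```

Testing `delay is None` rather than truthiness matters here: a flight that left exactly on time has a delay of `0`, which should appear in the export instead of an empty cell.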
---
## 6. Error handling
### 6.1 Retry logic
```python
import logging
import time
from datetime import date
from functools import wraps
from typing import Dict, List

import requests

log = logging.getLogger(__name__)


def retry_on_error(max_retries=3, delay=5):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except requests.RequestException as e:
                    if attempt < max_retries - 1:
                        log.warning(f"Retry {attempt+1}/{max_retries} after error: {e}")
                        time.sleep(delay * (attempt + 1))  # linear backoff: 5 s, then 10 s
                    else:
                        raise
        return wrapper
    return decorator


@retry_on_error(max_retries=3, delay=5)
def fetch_airport_schedule(yandex_code: str, target_date: date) -> List[Dict]:
    # ... (request code)
    ...
```
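With `max_retries=3` and `delay=5`, the decorator above sleeps only between attempts, so the pauses are 5 s and 10 s and the third failure is re-raised. The sketch below reproduces the same linear backoff with an injectable `sleep` function so the timing can be checked without waiting. `retry_linear` and its `sleep` and `exceptions` parameters are hypothetical additions for illustration.

```python
import time
from functools import wraps
from typing import Callable


def retry_linear(max_retries: int = 3, delay: float = 5,
                 sleep: Callable[[float], None] = time.sleep,
                 exceptions: tuple = (Exception,)):
    """Retry with linearly growing pauses: delay, 2*delay, ... between attempts."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_retries - 1:
                        raise  # last attempt: propagate the error
                    sleep(delay * (attempt + 1))
        return wrapper
    return decorator


# Usage: record the pauses instead of sleeping.
pauses = []
calls = {"n": 0}


@retry_linear(max_retries=3, delay=5, sleep=pauses.append, exceptions=(ValueError,))
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("temporary failure")
    return "ok"


result = flaky()  # succeeds on the third attempt; pauses == [5, 10]
```

Passing `sleep=pauses.append` is purely a testing convenience; production code would leave the default `time.sleep`.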
### 6.2 Logging
```python
import logging
import sys


def setup_logging():
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        handlers=[logging.StreamHandler(sys.stdout)],
    )


# In main.py
if __name__ == "__main__":
    setup_logging()
---
## 7. Docker
### 7.1 Dockerfile
```dockerfile
FROM python:3.11-slim
WORKDIR /app
# Dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Code
COPY *.py ./
# Healthcheck
HEALTHCHECK --interval=60s --timeout=5s --start-period=10s \
CMD python -c "import requests; requests.get('http://localhost:8000/health', timeout=3).raise_for_status()"
CMD ["python", "main.py"]
```
### 7.2 requirements.txt
```
requests==2.31.0
psycopg2-binary==2.9.9
APScheduler==3.10.4
Flask==3.0.0
```
### 7.3 docker-compose.yml (fragment)
```yaml
services:
  schedule:
    build: ./ingest/schedule
    container_name: fr24-schedule
    environment:
      POSTGRES_HOST: fr24-postgres
      POSTGRES_DB: fr24
      POSTGRES_USER: fr24
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      YANDEX_RASP_API_KEY: ${YANDEX_RASP_API_KEY}
      OPENSKY_USERNAME: ${OPENSKY_USERNAME}
      OPENSKY_PASSWORD: ${OPENSKY_PASSWORD}
      SCHEDULE_RETENTION_DAYS: 1095
    depends_on:
      fr24-postgres:
        condition: service_healthy
    restart: unless-stopped
    networks:
      - fr24-network
```
---
## 8. Retention (automatic cleanup)
### 8.1 Cron job in main.py
```python
from apscheduler.schedulers.background import BackgroundScheduler


def cleanup_old_data(conn):
    """Delete rows older than RETENTION_DAYS."""
    with conn.cursor() as cur:
        # Pass the day count as a plain parameter rather than placing
        # the placeholder inside a quoted interval literal.
        cur.execute("""
            DELETE FROM fr24_ext.schedule
            WHERE flight_date < CURRENT_DATE - %s * INTERVAL '1 day'
        """, (config.RETENTION_DAYS,))
        deleted = cur.rowcount
    conn.commit()
    log.info(f"Cleanup: {deleted} old records deleted")


# In main.py
scheduler = BackgroundScheduler()
scheduler.add_job(cleanup_old_data, 'cron', hour=3, minute=0, args=[conn])
scheduler.start()
```
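The cutoff that the `DELETE` statement applies can be reproduced in Python, which is handy for logging it or for checking the configured retention. A minimal sketch follows; `retention_cutoff` is a hypothetical helper.

```python
from datetime import date, timedelta


def retention_cutoff(today: date, retention_days: int) -> date:
    """Rows with flight_date earlier than the returned date are eligible for deletion."""
    return today - timedelta(days=retention_days)
```

With the default of 1095 days, `retention_cutoff(date(2026, 4, 20), 1095)` gives `2023-04-21`, one day short of three calendar years because the window contains 29 February 2024. If exactly three calendar years are required, the day count would need to account for leap days.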
---
This detailed specification is ready to hand off to the Dev agent.