auto-sync: 2026-04-20 13:00:01

This commit is contained in:
Stream
2026-04-20 13:00:01 +03:00
parent 882fc0fae2
commit 35bb8c2fa4
2 changed files with 740 additions and 0 deletions

View File

@@ -0,0 +1,730 @@
# Детальная спецификация — Фаза 2, Шаг 1
Дополнение к `PHASE2_STEP1_EXTERNAL_DATA.md` — детальная логика модулей для Dev-агента.
---
## 1. Архитектура контейнера `fr24-schedule`
### 1.1 Структура процессов
```
main.py (supervisor)
├─ cron scheduler (APScheduler)
│ └─ daily_job() @ 02:00 UTC
│ ├─ yandex_worker.fetch_day(date)
│ └─ opensky_worker.enrich_day(date)
└─ healthcheck endpoint (Flask, port 8000)
└─ GET /health → {"status": "ok", "last_run": "..."}
```
### 1.2 Конфигурация (config.py)
```python
import os
from dataclasses import dataclass
from typing import ClassVar, Dict, Optional


@dataclass
class Config:
    """Runtime settings of the ``fr24-schedule`` container.

    Environment variables are read once, when this module is imported, and
    become the field defaults; individual values can still be overridden by
    passing keyword arguments to ``Config(...)``.
    """

    # Database
    DB_HOST: str = os.getenv("POSTGRES_HOST", "fr24-postgres")
    DB_PORT: int = int(os.getenv("POSTGRES_PORT", "5432"))
    DB_NAME: str = os.getenv("POSTGRES_DB", "fr24")
    DB_USER: str = os.getenv("POSTGRES_USER", "fr24")
    # No fallback value: stays None when POSTGRES_PASSWORD is not set.
    DB_PASSWORD: Optional[str] = os.getenv("POSTGRES_PASSWORD")

    # API keys
    # No fallback value: stays None when YANDEX_RASP_API_KEY is not set.
    YANDEX_RASP_API_KEY: Optional[str] = os.getenv("YANDEX_RASP_API_KEY")
    OPENSKY_USERNAME: str = os.getenv("OPENSKY_USERNAME", "")
    OPENSKY_PASSWORD: str = os.getenv("OPENSKY_PASSWORD", "")

    # Airports, keyed by IATA code. Declared as ClassVar so it is a shared
    # class-level table and not a per-instance dataclass field.
    AIRPORTS: ClassVar[Dict[str, Dict[str, str]]] = {
        "SVO": {"iata": "SVO", "icao": "UUEE", "yandex_code": "s9600213", "name": "Шереметьево"},
        "DME": {"iata": "DME", "icao": "UUDD", "yandex_code": "s9600366", "name": "Домодедово"},
        "VKO": {"iata": "VKO", "icao": "UUWW", "yandex_code": "s9600215", "name": "Внуково"},
        "ZIA": {"iata": "ZIA", "icao": "UUBW", "yandex_code": "s9881291", "name": "Жуковский"},
    }

    # Rate limits
    YANDEX_RATE_LIMIT_SEC: float = 1.0  # pause between requests
    OPENSKY_RATE_LIMIT_SEC: float = 30.0

    # Retention
    RETENTION_DAYS: int = int(os.getenv("SCHEDULE_RETENTION_DAYS", "1095"))  # 3 years

    # Cron schedule
    DAILY_RUN_HOUR: int = 2  # UTC
    DAILY_RUN_MINUTE: int = 0


config = Config()
```
---
## 2. Модуль `yandex_worker.py`
### 2.1 Основная функция
```python
import requests
import time
from datetime import datetime, date
from typing import List, Dict
import psycopg2
from config import config
import logging
log = logging.getLogger(__name__)
def fetch_day(target_date: date, conn) -> int:
    """Load one day of schedule data for every configured airport.

    Each airport is fetched and upserted in its own transaction: a success is
    committed immediately, a failure is rolled back and logged, and the loop
    moves on to the next airport.

    Args:
        target_date: Day whose schedule is requested.
        conn: Open database connection used for the upserts.

    Returns:
        Number of flight records upserted across all airports.
    """
    total_flights = 0
    for airport_iata, airport_info in config.AIRPORTS.items():
        log.info(f"Fetching Yandex schedule for {airport_iata} on {target_date}")
        try:
            flights = fetch_airport_schedule(
                yandex_code=airport_info["yandex_code"],
                target_date=target_date,
            )
            inserted = upsert_flights(conn, flights, airport_iata, target_date)
            conn.commit()
        except Exception as e:
            # Without a rollback a failed statement leaves the transaction
            # aborted and every following airport would fail as well.
            conn.rollback()
            log.exception(f"Failed to fetch {airport_iata}: {e}")
            # Carry on with the remaining airports.
        else:
            total_flights += inserted
            log.info(f"{airport_iata}: {inserted} flights upserted")
        time.sleep(config.YANDEX_RATE_LIMIT_SEC)
    return total_flights
```
### 2.2 Запрос к Яндекс API
```python
def fetch_airport_schedule(yandex_code: str, target_date: date) -> List[Dict]:
    """Request one day of flights for a station from Yandex.Rasp.

    The schedule endpoint does not label a record as arrival or departure, so
    the station is queried twice, once per ``event`` value, and every parsed
    record is tagged with the direction of the request that returned it.

    Args:
        yandex_code: Yandex station code of the airport.
        target_date: Day whose schedule is requested.

    Returns:
        Parsed flight dicts (departures first, then arrivals), each carrying
        a ``direction`` key.

    Raises:
        requests.RequestException: On network failure or an HTTP error status.
    """
    url = "https://api.rasp.yandex.net/v3.0/schedule/"
    flights = []
    for index, direction in enumerate(("departure", "arrival")):
        if index:
            # Respect the rate limit between the two requests as well.
            time.sleep(config.YANDEX_RATE_LIMIT_SEC)
        params = {
            "apikey": config.YANDEX_RASP_API_KEY,
            "station": yandex_code,
            "date": target_date.isoformat(),
            "transport_types": "plane",
            "event": direction,
        }
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()
        # NOTE(review): the response carries a pagination object; confirm that
        # a single response always holds the whole day for a busy airport.
        for item in data.get("schedule") or []:
            thread = item.get("thread") or {}
            # The request already filters by transport type; skip only records
            # that explicitly claim to be something other than a plane.
            if thread.get("transport_type", "plane") != "plane":
                continue
            flight = parse_yandex_flight(item, direction)
            if flight:
                flight["direction"] = direction
                flights.append(flight)
    return flights
```
### 2.3 Парсинг ответа Яндекс
```python
def parse_yandex_flight(item: Dict, direction: str) -> "Optional[Dict]":
    """Convert one Yandex schedule item into a record for the database.

    Args:
        item: One element of the ``schedule`` list of the API response.
        direction: ``"arrival"`` or ``"departure"``; selects which timestamp
            of the item is treated as the scheduled time.

    Returns:
        A dict with the columns of ``fr24_ext.schedule`` that the feed can
        fill, or ``None`` when the item has no flight number or no time.
    """
    # The API may send explicit nulls, hence ``or {}`` instead of a default.
    thread = item.get("thread") or {}

    # Flight number
    flight_number = thread.get("number", "")
    if not flight_number:
        return None

    # Airline. ``carrier.code`` is Yandex's own identifier; the IATA code is
    # expected under ``carrier.codes.iata``.
    # NOTE(review): confirm the ``codes`` layout against a live response.
    carrier = thread.get("carrier") or {}
    carrier_codes = carrier.get("codes") or {}
    airline_iata = carrier_codes.get("iata")
    airline_name = carrier.get("title")

    # Route (from thread.title, e.g. "Москва — Санкт-Петербург")
    title = thread.get("title", "")
    origin_iata, destination_iata = parse_route_from_title(title)

    # Time: prefer the timestamp that matches the requested direction.
    if direction == "arrival":
        scheduled_at = item.get("arrival") or item.get("departure")
    else:
        scheduled_at = item.get("departure") or item.get("arrival")
    if not scheduled_at:
        return None

    return {
        "direction": direction,
        "flight_number": flight_number,
        "airline_iata": airline_iata,
        "airline_name": airline_name,
        "origin_iata": origin_iata,
        "destination_iata": destination_iata,
        "aircraft_type": thread.get("vehicle"),
        "scheduled_at": scheduled_at,
        "status": "scheduled",  # the schedule feed carries no live status
        "source": "yandex",
    }
def parse_route_from_title(title: str, city_to_iata=None) -> tuple:
    """Extract origin and destination from a title like "Москва — Санкт-Петербург".

    Args:
        title: Thread title with two city names separated by an em dash.
        city_to_iata: Optional mapping from city name to IATA code. Without
            it no code can be resolved and ``(None, None)`` is returned.

    Returns:
        ``(origin_iata, destination_iata)``; either element is ``None`` when
        the title cannot be split into exactly two names or a name is not in
        the mapping.
    """
    # Splitting on an empty separator raises ValueError, so the em dash must
    # be spelled out explicitly.
    parts = [part.strip() for part in (title or "").split("—")]
    if len(parts) != 2 or not all(parts):
        return (None, None)
    if not city_to_iata:
        # TODO: supply a city -> IATA reference table
        return (None, None)
    origin, destination = parts
    return (city_to_iata.get(origin), city_to_iata.get(destination))
```
### 2.4 Upsert в БД
```python
def upsert_flights(conn, flights: List[Dict], airport_iata: str, flight_date: date) -> int:
    """Insert or refresh flights in ``fr24_ext.schedule``.

    On a key conflict the descriptive columns are refreshed too, so a later
    fetch can fill in values (airline code, route, aircraft type) that were
    missing the first time. A NULL in the new data never erases a value that
    is already stored in those columns.

    Args:
        conn: Open database connection; the caller commits.
        flights: Parsed flight dicts. ``flight_number``, ``scheduled_at`` and
            ``source`` are required keys.
        airport_iata: IATA code of the airport the flights belong to.
        flight_date: Day the flights were requested for.

    Returns:
        Number of records processed.
    """
    if not flights:
        return 0

    upsert_sql = """
        INSERT INTO fr24_ext.schedule AS s
            (flight_date, airport_iata, direction, flight_number,
             airline_iata, airline_name, origin_iata, destination_iata,
             aircraft_type, scheduled_at, status, source)
        VALUES
            (%(flight_date)s, %(airport_iata)s, %(direction)s, %(flight_number)s,
             %(airline_iata)s, %(airline_name)s, %(origin_iata)s, %(destination_iata)s,
             %(aircraft_type)s, %(scheduled_at)s, %(status)s, %(source)s)
        ON CONFLICT (flight_number, airport_iata, scheduled_at, direction)
        DO UPDATE SET
            airline_iata = COALESCE(EXCLUDED.airline_iata, s.airline_iata),
            airline_name = EXCLUDED.airline_name,
            origin_iata = COALESCE(EXCLUDED.origin_iata, s.origin_iata),
            destination_iata = COALESCE(EXCLUDED.destination_iata, s.destination_iata),
            aircraft_type = COALESCE(EXCLUDED.aircraft_type, s.aircraft_type),
            status = EXCLUDED.status,
            fetched_at = now()
    """

    with conn.cursor() as cur:
        for flight in flights:
            cur.execute(upsert_sql, {
                "flight_date": flight_date,
                "airport_iata": airport_iata,
                "direction": flight.get("direction", "departure"),
                "flight_number": flight["flight_number"],
                "airline_iata": flight.get("airline_iata"),
                "airline_name": flight.get("airline_name"),
                "origin_iata": flight.get("origin_iata"),
                "destination_iata": flight.get("destination_iata"),
                "aircraft_type": flight.get("aircraft_type"),
                "scheduled_at": flight["scheduled_at"],
                "status": flight.get("status", "scheduled"),
                "source": flight["source"],
            })
    return len(flights)
```
---
## 3. Модуль `opensky_worker.py`
### 3.1 Основная функция
```python
def enrich_day(target_date: date, conn) -> int:
    """Enrich ``fr24_ext.schedule`` rows with OpenSky data (icao24) for one day.

    Arrivals and departures of each airport are processed in one transaction
    that is committed on success and rolled back on failure, so one failing
    airport cannot poison the updates of the others.

    Args:
        target_date: Day to enrich; the query window is that calendar day in UTC.
        conn: Open database connection used for the updates.

    Returns:
        Number of schedule rows that were enriched.
    """
    from datetime import timezone

    total_enriched = 0
    # Build the window from an explicitly UTC midnight; a naive datetime would
    # be interpreted in the host's local timezone.
    day_start = datetime.combine(target_date, datetime.min.time(), tzinfo=timezone.utc)
    begin_ts = int(day_start.timestamp())
    end_ts = begin_ts + 86400  # +24 hours

    for airport_iata, airport_info in config.AIRPORTS.items():
        icao = airport_info["icao"]
        log.info(f"Enriching OpenSky data for {airport_iata} ({icao}) on {target_date}")
        try:
            # Arrivals
            arrivals = fetch_opensky_flights(icao, begin_ts, end_ts, "arrival")
            enriched_arr = enrich_flights(conn, arrivals, airport_iata, "arrival")
            time.sleep(config.OPENSKY_RATE_LIMIT_SEC)
            # Departures
            departures = fetch_opensky_flights(icao, begin_ts, end_ts, "departure")
            enriched_dep = enrich_flights(conn, departures, airport_iata, "departure")
            conn.commit()
        except Exception as e:
            # Leave the connection usable for the next airport.
            conn.rollback()
            log.exception(f"Failed to enrich {airport_iata}: {e}")
        else:
            total_enriched += enriched_arr + enriched_dep
            log.info(f"{airport_iata}: {enriched_arr} arrivals, {enriched_dep} departures enriched")
        time.sleep(config.OPENSKY_RATE_LIMIT_SEC)
    return total_enriched
```
### 3.2 Запрос к OpenSky API
```python
def fetch_opensky_flights(icao: str, begin_ts: int, end_ts: int, direction: str) -> List[Dict]:
    """Request arrivals or departures of an airport from OpenSky Network.

    Args:
        icao: ICAO code of the airport.
        begin_ts: Start of the interval, Unix seconds.
        end_ts: End of the interval, Unix seconds.
        direction: ``"arrival"`` or ``"departure"``; selects the endpoint.

    Returns:
        List of flight dicts; empty when OpenSky reports no flights.

    Raises:
        ValueError: If ``direction`` is neither ``"arrival"`` nor ``"departure"``.
        requests.RequestException: On network failure or an HTTP error status
            other than 404.
    """
    if direction not in ("arrival", "departure"):
        raise ValueError(f"Unsupported direction: {direction!r}")

    endpoint = f"https://opensky-network.org/api/flights/{direction}"
    params = {
        "airport": icao,
        "begin": begin_ts,
        "end": end_ts,
    }
    # NOTE(review): confirm that username/password basic auth is still accepted
    # by OpenSky for the account type in use.
    auth = None
    if config.OPENSKY_USERNAME and config.OPENSKY_PASSWORD:
        auth = (config.OPENSKY_USERNAME, config.OPENSKY_PASSWORD)

    response = requests.get(endpoint, params=params, auth=auth, timeout=60)
    # OpenSky answers 404 with an empty body when the interval has no flights;
    # that is an empty result, not an error.
    if response.status_code == 404:
        return []
    response.raise_for_status()
    return response.json() or []  # list of dicts
### 3.3 Обогащение записей
```python
def enrich_flights(conn, opensky_flights: List[Dict], airport_iata: str, direction: str) -> int:
    """Write icao24 and actual_at into existing ``fr24_ext.schedule`` rows.

    Args:
        conn: Open database connection; the caller commits.
        opensky_flights: Flight dicts as returned by the OpenSky flights API.
        airport_iata: IATA code of the airport the flights belong to.
        direction: ``"arrival"`` or ``"departure"``.

    Returns:
        Number of schedule rows updated.
    """
    from datetime import timezone

    if not opensky_flights:
        return 0

    # The moment the aircraft was at this airport: last contact for an
    # arrival, first contact for a departure.
    ts_key = "lastSeen" if direction == "arrival" else "firstSeen"

    enriched_count = 0
    with conn.cursor() as cur:
        for flight in opensky_flights:
            icao24 = flight.get("icao24")
            # The callsign may be an explicit null in the API response.
            callsign = (flight.get("callsign") or "").strip()
            if not icao24 or not callsign:
                continue

            # Actual arrival/departure time
            actual_ts = flight.get(ts_key)
            if not actual_ts:
                continue
            # Unix seconds -> timezone-aware UTC, independent of the host TZ.
            actual_at = datetime.fromtimestamp(actual_ts, tz=timezone.utc)

            # Match by callsign (flight number) and time (±2 hours from scheduled).
            # NOTE(review): confirm that schedule.flight_number and the OpenSky
            # callsign use the same format; otherwise nothing will match.
            cur.execute("""
                UPDATE fr24_ext.schedule
                SET icao24 = %s,
                    actual_at = %s,
                    source = CASE WHEN source = 'yandex' THEN 'merged' ELSE source END
                WHERE airport_iata = %s
                  AND direction = %s
                  AND flight_number = %s
                  AND scheduled_at BETWEEN %s - INTERVAL '2 hours' AND %s + INTERVAL '2 hours'
                  AND icao24 IS NULL
            """, (icao24, actual_at, airport_iata, direction, callsign, actual_at, actual_at))
            if cur.rowcount > 0:
                enriched_count += cur.rowcount
    return enriched_count
```
---
## 4. Модуль `backfill.py`
### 4.1 CLI интерфейс
```python
import argparse
from datetime import date, timedelta
import psycopg2
from yandex_worker import fetch_day as yandex_fetch_day
from opensky_worker import enrich_day as opensky_enrich_day
from config import config
import logging
log = logging.getLogger(__name__)
def main():
    """Backfill schedule data for a date range given on the command line.

    Days are processed in order. After each day the progress marker
    ``backfill_last_date`` is saved, and a later run resumes after the stored
    date. Processing stops at the first day that raises; the connection is
    closed in every case.
    """
    parser = argparse.ArgumentParser(description="Backfill schedule data")
    parser.add_argument("--start-date", required=True, help="YYYY-MM-DD")
    parser.add_argument("--end-date", required=True, help="YYYY-MM-DD")
    parser.add_argument("--skip-opensky", action="store_true", help="Skip OpenSky enrichment")
    args = parser.parse_args()

    try:
        start = date.fromisoformat(args.start_date)
        end = date.fromisoformat(args.end_date)
    except ValueError as e:
        parser.error(f"Invalid date: {e}")
    if start > end:
        parser.error("--start-date must not be after --end-date")

    conn = psycopg2.connect(
        host=config.DB_HOST,
        port=config.DB_PORT,
        dbname=config.DB_NAME,
        user=config.DB_USER,
        password=config.DB_PASSWORD,
    )
    failed_on = None
    try:
        # Load saved progress
        last_loaded = load_state(conn, "backfill_last_date")
        if last_loaded:
            start = max(start, date.fromisoformat(last_loaded) + timedelta(days=1))
            log.info(f"Resuming from {start}")

        current = start
        while current <= end:
            log.info(f"Processing {current}")
            try:
                # Yandex
                yandex_count = yandex_fetch_day(current, conn)
                log.info(f"Yandex: {yandex_count} flights")
                # OpenSky
                if not args.skip_opensky:
                    opensky_count = opensky_enrich_day(current, conn)
                    log.info(f"OpenSky: {opensky_count} enriched")
                # Save progress
                save_state(conn, "backfill_last_date", current.isoformat())
            except Exception as e:
                # Discard whatever the failed day left uncommitted.
                conn.rollback()
                log.exception(f"Failed on {current}: {e}")
                failed_on = current
                break
            current += timedelta(days=1)
    finally:
        conn.close()

    if failed_on is None:
        log.info("Backfill complete")
    else:
        log.error(f"Backfill stopped at {failed_on}; re-run to resume")
def load_state(conn, key: str) -> "str | None":
    """Read the ``last_date`` value stored under ``key`` in ``fr24_ext.load_state``.

    Args:
        conn: Open database connection.
        key: Value of ``state_key`` to look up.

    Returns:
        The stored ``last_date`` string, or ``None`` when there is no row for
        ``key`` or the row has no ``last_date`` entry.
    """
    with conn.cursor() as cur:
        cur.execute(
            "SELECT state_value->>'last_date' FROM fr24_ext.load_state WHERE state_key = %s",
            (key,),
        )
        row = cur.fetchone()
    return row[0] if row else None
def save_state(conn, key: str, value: str):
    """Store ``value`` as ``last_date`` under ``key`` in ``fr24_ext.load_state``.

    Inserts the row or overwrites an existing one, then commits.

    Args:
        conn: Open database connection.
        key: Value of ``state_key`` to write.
        value: String stored under the ``last_date`` JSON key.
    """
    import json

    # Serialise with json.dumps so quotes or backslashes in the value cannot
    # produce an invalid JSON document.
    payload = json.dumps({"last_date": value})
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO fr24_ext.load_state (state_key, state_value)
            VALUES (%s, %s::jsonb)
            ON CONFLICT (state_key) DO UPDATE SET state_value = EXCLUDED.state_value, updated_at = now()
        """, (key, payload))
    conn.commit()
# Script entry point: configure INFO-level logging, then run the backfill CLI.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    main()
```
---
## 5. Frontend API (`frontend/main.py`)
### 5.1 Endpoint `/api/schedule/data`
```python
from flask import Flask, request, jsonify
import psycopg2
from config import config
app = Flask(__name__)
# Upper bound for the ``limit`` query parameter of /api/schedule/data.
_MAX_PAGE_SIZE = 1000


def _parse_hhmm(value):
    """Return minutes since midnight for an ``HH:MM`` string.

    Raises ValueError when the value is malformed or out of range.
    """
    hours, minutes = map(int, value.split(":"))
    if not (0 <= hours <= 23 and 0 <= minutes <= 59):
        raise ValueError(f"time out of range: {value!r}")
    return hours * 60 + minutes


def _build_schedule_filters(args):
    """Turn query-string arguments into ``(where_sql, params)``.

    Only fixed SQL fragments are concatenated; every user value travels as a
    bound parameter. Raises ValueError on a malformed date or time.
    """
    from datetime import date

    minute_of_day = "EXTRACT(HOUR FROM scheduled_at) * 60 + EXTRACT(MINUTE FROM scheduled_at)"
    where_clauses = []
    params = []

    date_from = args.get("date_from")
    date_to = args.get("date_to")
    airport = args.get("airport", "all")
    direction = args.get("direction", "all")
    flight_number = args.get("flight_number", "")
    time_from = args.get("time_from")  # HH:MM
    time_to = args.get("time_to")

    if date_from:
        date.fromisoformat(date_from)  # validation only
        where_clauses.append("flight_date >= %s")
        params.append(date_from)
    if date_to:
        date.fromisoformat(date_to)  # validation only
        where_clauses.append("flight_date <= %s")
        params.append(date_to)
    if airport != "all":
        where_clauses.append("airport_iata = %s")
        params.append(airport)
    if direction != "all":
        where_clauses.append("direction = %s")
        params.append(direction)
    if flight_number:
        where_clauses.append("flight_number ILIKE %s")
        params.append(f"%{flight_number}%")
    if time_from:
        where_clauses.append(f"{minute_of_day} >= %s")
        params.append(_parse_hhmm(time_from))
    if time_to:
        where_clauses.append(f"{minute_of_day} <= %s")
        params.append(_parse_hhmm(time_to))

    where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"
    return where_sql, params


@app.route("/api/schedule/data")
def get_schedule_data():
    """Return one page of schedule rows matching the query-string filters.

    Query parameters: ``date_from``, ``date_to``, ``airport``, ``direction``,
    ``flight_number``, ``time_from``, ``time_to`` (HH:MM), ``limit``
    (default 100, clamped to 1..1000) and ``offset`` (default 0, not negative).

    Returns:
        JSON ``{"total": <int>, "flights": [...]}``, or HTTP 400 with an
        ``error`` message when a parameter cannot be parsed.
    """
    try:
        limit = int(request.args.get("limit", 100))
        offset = int(request.args.get("offset", 0))
        where_sql, params = _build_schedule_filters(request.args)
    except ValueError as e:
        return jsonify({"error": f"Invalid query parameter: {e}"}), 400

    # Negative values would be rejected by PostgreSQL; an unbounded limit
    # would let one request dump the whole table.
    limit = max(1, min(limit, _MAX_PAGE_SIZE))
    offset = max(0, offset)

    conn = psycopg2.connect(
        host=config.DB_HOST,
        port=config.DB_PORT,
        dbname=config.DB_NAME,
        user=config.DB_USER,
        password=config.DB_PASSWORD,
    )
    try:
        with conn.cursor() as cur:
            # Count
            cur.execute(f"SELECT COUNT(*) FROM fr24_ext.schedule WHERE {where_sql}", params)
            total = cur.fetchone()[0]
            # Data
            cur.execute(f"""
                SELECT flight_number, airline_name, airport_iata, direction,
                       origin_iata, destination_iata, scheduled_at, actual_at, status, icao24
                FROM fr24_ext.schedule
                WHERE {where_sql}
                ORDER BY scheduled_at DESC
                LIMIT %s OFFSET %s
            """, params + [limit, offset])
            rows = cur.fetchall()
    finally:
        # Close the connection even when a query fails.
        conn.close()

    flights = []
    for row in rows:
        scheduled = row[6]
        actual = row[7]
        delay_min = int((actual - scheduled).total_seconds() / 60) if actual else None
        flights.append({
            "flight_number": row[0],
            "airline": row[1],
            "airport": row[2],
            "direction": row[3],
            "origin": row[4],
            "destination": row[5],
            "scheduled_at": scheduled.isoformat(),
            "actual_at": actual.isoformat() if actual else None,
            "delay_min": delay_min,
            "status": row[8],
            "icao24": row[9],
        })
    return jsonify({"total": total, "flights": flights})
```
### 5.2 Endpoint `/api/schedule/export`
```python
import csv
from io import StringIO
from flask import Response
@app.route("/api/schedule/export")
def export_schedule():
    """Return all schedule rows matching the filters as a CSV attachment.

    The filter handling is elided in this specification; it is expected to
    provide ``cur``, ``where_sql`` and ``params`` exactly as the
    /api/schedule/data endpoint builds them.
    """
    # Same filters as /data
    # ... (code mirrors get_schedule_data)

    # Query without LIMIT/OFFSET. The column order must match the row indices
    # used below.
    cur.execute(f"""
        SELECT flight_number, airline_name, airport_iata, direction,
               origin_iata, destination_iata, scheduled_at, actual_at, status
        FROM fr24_ext.schedule
        WHERE {where_sql}
        ORDER BY scheduled_at DESC
    """, params)
    rows = cur.fetchall()

    # CSV
    output = StringIO()
    writer = csv.writer(output)
    writer.writerow(["Flight", "Airline", "Airport", "Direction", "Route", "Scheduled", "Actual", "Delay (min)", "Status"])
    for row in rows:
        origin, destination = row[4], row[5]
        scheduled, actual = row[6], row[7]
        # Separate the two codes and avoid printing "None" for missing ones.
        if origin or destination:
            route = f"{origin or '?'} → {destination or '?'}"
        else:
            route = ""
        writer.writerow([
            row[0],  # flight_number
            row[1],  # airline_name
            row[2],  # airport_iata
            row[3],  # direction
            route,
            scheduled.isoformat(),
            actual.isoformat() if actual else "",
            int((actual - scheduled).total_seconds() / 60) if actual else "",  # delay
            row[8],  # status
        ])

    return Response(
        output.getvalue(),
        mimetype="text/csv",
        headers={"Content-Disposition": "attachment; filename=schedule.csv"}
    )
```
---
## 6. Обработка ошибок
### 6.1 Retry логика
```python
import logging
import time
from functools import wraps

log = logging.getLogger(__name__)


def retry_on_error(max_retries=3, delay=5, exceptions=None):
    """Build a decorator that retries a call with a linearly growing pause.

    Args:
        max_retries: Total number of attempts, at least 1.
        delay: Base pause in seconds; attempt ``k`` is followed by a pause of
            ``delay * k`` before the next attempt.
        exceptions: Exception class or tuple of classes that trigger a retry.
            Defaults to ``requests.RequestException``.

    Returns:
        A decorator. The decorated function returns the wrapped function's
        result and re-raises the last error once the attempts are used up.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1; the wrapped function
            would otherwise never be called.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    if exceptions is None:
        # Imported here so that only the default case depends on requests.
        import requests
        exceptions = requests.RequestException

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_retries:
                        raise
                    log.warning(f"Retry {attempt}/{max_retries} after error: {e}")
                    time.sleep(delay * attempt)
        return wrapper
    return decorator
@retry_on_error(max_retries=3, delay=5)
def fetch_airport_schedule(yandex_code: str, target_date: date) -> List[Dict]:
    """Schedule request wrapped in retry_on_error (up to 3 attempts, base delay 5 s)."""
    # ... (request code)
```
### 6.2 Логирование
```python
import logging
import sys
def setup_logging():
    """Configure the root logger: INFO level, timestamped lines, written to stdout."""
    stdout_handler = logging.StreamHandler(sys.stdout)
    line_format = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
    logging.basicConfig(
        handlers=[stdout_handler],
        format=line_format,
        level=logging.INFO,
    )


# In main.py
if __name__ == "__main__":
    setup_logging()
---
## 7. Docker
### 7.1 Dockerfile
```dockerfile
# Image for the fr24-schedule container: installs the pinned requirements,
# copies the Python modules and starts main.py.
FROM python:3.11-slim
WORKDIR /app
# Dependencies (copied before the code, as a separate step)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Code
COPY *.py ./
# Healthcheck: every 60 s request /health on port 8000 with the requests
# library; a non-2xx answer or a timeout makes the check fail.
HEALTHCHECK --interval=60s --timeout=5s --start-period=10s \
CMD python -c "import requests; requests.get('http://localhost:8000/health', timeout=3).raise_for_status()"
CMD ["python", "main.py"]
```
### 7.2 requirements.txt
```
requests==2.31.0
psycopg2-binary==2.9.9
APScheduler==3.10.4
Flask==3.0.0
```
### 7.3 docker-compose.yml (фрагмент)
```yaml
services:
  schedule:
    build: ./ingest/schedule
    container_name: fr24-schedule
    environment:
      POSTGRES_HOST: fr24-postgres
      POSTGRES_DB: fr24
      POSTGRES_USER: fr24
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      YANDEX_RASP_API_KEY: ${YANDEX_RASP_API_KEY}
      OPENSKY_USERNAME: ${OPENSKY_USERNAME}
      OPENSKY_PASSWORD: ${OPENSKY_PASSWORD}
      SCHEDULE_RETENTION_DAYS: 1095
    # Start only after the database reports healthy.
    depends_on:
      fr24-postgres:
        condition: service_healthy
    restart: unless-stopped
    networks:
      - fr24-network
---
## 8. Retention (автоочистка)
### 8.1 Cron job в main.py
```python
from apscheduler.schedulers.background import BackgroundScheduler
def cleanup_old_data(conn):
    """Delete schedule rows older than ``config.RETENTION_DAYS`` days.

    The deletion is committed on success; on failure the transaction is rolled
    back and the error is re-raised.

    Args:
        conn: Open database connection.
    """
    try:
        with conn.cursor() as cur:
            # The day count is bound as a real parameter and multiplied by a
            # one-day interval instead of being substituted inside a quoted
            # interval literal.
            cur.execute("""
                DELETE FROM fr24_ext.schedule
                WHERE flight_date < CURRENT_DATE - %s * INTERVAL '1 day'
            """, (config.RETENTION_DAYS,))
            deleted = cur.rowcount
        conn.commit()
    except Exception:
        # Keep the connection usable for the next scheduled run.
        conn.rollback()
        raise
    log.info(f"Cleanup: {deleted} old records deleted")
# In main.py
# Registers cleanup_old_data as a cron job at 03:00 and starts the scheduler.
# NOTE(review): `conn` is not defined in this snippet and is handed to the job
# once, at registration time — confirm it is created earlier in main.py and
# stays valid for the lifetime of the process.
scheduler = BackgroundScheduler()
scheduler.add_job(cleanup_old_data, 'cron', hour=3, minute=0, args=[conn])
scheduler.start()
```
---
Эта детальная спецификация готова для передачи Dev-агенту.

View File

@@ -136,6 +136,16 @@ fr24_ext.load_state (
## ТЗ для Dev-агента
**📋 Детальная спецификация:** `docs/PHASE2_STEP1_DETAILED_SPEC.md`
Детальная спецификация содержит:
- Архитектуру контейнера и структуру процессов
- Полный код модулей с обработкой ошибок
- Конфигурацию и переменные окружения
- Retry логику и rate limiting
- Docker setup и docker-compose интеграцию
- Retention и автоочистку
### Файлы для создания
```
tasks/flightradar24/ingest/schedule/