Files
wiki/tasks/flightradar24/ingest/schedule/yandex_worker.py
2026-04-21 01:40:01 +03:00

407 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Yandex.Rasp API worker — loads airport schedule into fr24_ext.schedule.
Makes two requests per airport per day: event=departure and event=arrival.
Route parsing: Yandex /v3.0/schedule/ does not return from/to fields reliably.
Routes are extracted from thread.title (format: "Город1 — Город2") using
CITY_TO_IATA lookup table from city_iata.py.
Duration: for arrival direction Yandex provides both 'arrival' and 'departure'
fields, so duration_min is computed directly. After full upsert a SQL UPDATE
fills any remaining gaps via cross-join between departure/arrival records.
"""
import logging
import time
from datetime import date, datetime, timezone
from typing import Dict, List, Optional
from functools import wraps
import requests
import psycopg2
from config import config
from city_iata import CITY_TO_IATA
log = logging.getLogger(__name__)
YANDEX_URL = "https://api.rasp.yandex-net.ru/v3.0/schedule/"
# Separator between city names in thread.title ("City1 — City2"): U+2014 EM DASH.
# Spelled as an escape because the literal character is easily lost to
# copy/paste or Unicode-normalising tools, and an empty separator makes
# str.split() raise ValueError for every title. Spaces around the dash need
# not be part of it: _parse_route() strips each city name.
_ROUTE_SEP = "\u2014"
# ── retry decorator ───────────────────────────────────────────────────────────
def retry(max_retries: int = 3, base_delay: float = 5.0, exceptions: Optional[tuple] = None):
    """
    Decorator factory: retry the wrapped call with exponential backoff.

    The wrapped function is called at most ``max_retries`` times. After the
    failed attempt number ``n`` (0-based) it sleeps ``base_delay * 2**n``
    seconds before trying again; the failure of the last attempt is re-raised
    unchanged.

    Args:
        max_retries: total number of attempts; must be >= 1.
        base_delay: delay in seconds after the first failure; doubles each time.
        exceptions: tuple of exception classes that trigger a retry.
            Defaults to ``(requests.RequestException,)``. Any other exception
            propagates immediately without retrying.

    Raises:
        ValueError: if ``max_retries < 1``. With zero attempts the wrapper
            would never call the function and would silently return None.
    """
    if max_retries < 1:
        raise ValueError(f"max_retries must be >= 1, got {max_retries}")
    # Resolved here (not as a default argument) so the default is only looked
    # up when no explicit exception types are given.
    retry_on = exceptions if exceptions is not None else (requests.RequestException,)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except retry_on as e:
                    if attempt == max_retries - 1:
                        raise
                    wait = base_delay * (2 ** attempt)
                    log.warning("Retry %d/%d after %.0fs: %s", attempt + 1, max_retries, wait, e)
                    time.sleep(wait)
        return wrapper
    return decorator
# ── route / IATA helpers ──────────────────────────────────────────────────────
def _lookup_city(city_name: str) -> Optional[str]:
    """
    Map a Russian city name (as found in thread.title) to an IATA code.

    Surrounding whitespace is ignored. An exact key match in CITY_TO_IATA
    wins; otherwise the first key that matches case-insensitively is used.
    Returns None when nothing matches.
    """
    wanted = city_name.strip()
    direct_hit = CITY_TO_IATA.get(wanted)
    if direct_hit:
        return direct_hit
    folded = wanted.lower()
    return next(
        (code for known_city, code in CITY_TO_IATA.items() if known_city.lower() == folded),
        None,
    )
def _parse_route(title: str, airport_iata: str, direction: str) -> tuple[Optional[str], Optional[str]]:
    """
    Parse origin/destination IATA codes from a Yandex thread.title.

    thread.title lists the thread's cities separated by _ROUTE_SEP, e.g.
    "City1 — City2" or, for multi-stop threads, "City1 — City2 — City3".
    - departure: airport_iata is the origin; the LAST city is looked up
      as the destination.
    - arrival:   the FIRST city is looked up as the origin; airport_iata is
      the destination.

    Returns (origin_iata, destination_iata). The looked-up side is None when
    the title cannot be split or the city is missing from CITY_TO_IATA; the
    airport_iata side is always set.
    """
    if not title or not _ROUTE_SEP or _ROUTE_SEP not in title:
        # An empty separator would make str.split() raise ValueError, so it
        # is treated like an unparseable title.
        log.debug("Cannot parse route from title: %r", title)
        if direction == "departure":
            return airport_iata, None
        return None, airport_iata

    # Split on every separator: with split(sep, 1) a multi-stop title would
    # leave "City2 — City3" as one name, which can never match the table.
    cities = [part.strip() for part in title.split(_ROUTE_SEP)]
    city_from, city_to = cities[0], cities[-1]

    if direction == "departure":
        origin_iata = airport_iata
        destination_iata = _lookup_city(city_to)
        if destination_iata is None:
            log.debug("City not found in CITY_TO_IATA: %r (title: %r)", city_to, title)
    else:  # arrival
        origin_iata = _lookup_city(city_from)
        destination_iata = airport_iata
        if origin_iata is None:
            log.debug("City not found in CITY_TO_IATA: %r (title: %r)", city_from, title)
    return origin_iata, destination_iata
def _parse_dt(value) -> Optional[datetime]:
    """
    Parse an ISO-8601 timestamp into a timezone-aware datetime.

    Accepts a datetime object or any value whose str() is ISO-8601. A trailing
    "Z" is accepted (datetime.fromisoformat only understands it from Python
    3.11 on). Naive results — including naive datetime inputs — are assumed
    to be UTC, so the return value is always timezone-aware.

    Returns None for falsy or unparseable input.
    """
    if not value:
        return None
    if isinstance(value, datetime):
        dt = value
    else:
        text = str(value).strip()
        if text.endswith("Z"):
            text = text[:-1] + "+00:00"
        try:
            dt = datetime.fromisoformat(text)
        except (ValueError, TypeError):
            return None
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt
def _station_iata(station: Dict) -> Optional[str]:
    """
    Derive a 3-letter uppercase IATA code from a Yandex station dict.

    Prefers station["codes"]["iata"]; otherwise uses station["code"] unless it
    is empty or starts with "s". Returns None when neither source yields a code.
    Legacy fallback, kept for reference (not called in the visible module).
    """
    system_codes = station.get("codes", {})
    candidate = system_codes.get("iata") if isinstance(system_codes, dict) else None
    if candidate:
        return candidate[:3].upper()
    own_code = station.get("code", "")
    if not own_code or own_code.startswith("s"):
        return None
    return own_code[:3].upper()
# ── API fetch ─────────────────────────────────────────────────────────────────
@retry(max_retries=3, base_delay=5.0)
def _fetch_page(yandex_code: str, target_date: date, event: str, offset: int = 0) -> dict:
    """
    GET one page (limit 100) of the Yandex.Rasp station schedule.

    Only plane services are requested, and IATA codes are asked for via
    show_systems=iata. raise_for_status() turns 4xx/5xx responses into
    requests.HTTPError, which @retry handles as a RequestException.

    Returns the decoded JSON body.
    """
    query = dict(
        apikey=config.YANDEX_RASP_API_KEY,
        station=yandex_code,
        date=target_date.isoformat(),
        transport_types="plane",
        event=event,
        offset=offset,
        limit=100,
        show_systems="iata",
    )
    response = requests.get(YANDEX_URL, params=query, timeout=30)
    response.raise_for_status()
    return response.json()
def fetch_airport_schedule(yandex_code: str, target_date: date, direction: str, airport_iata: str) -> List[Dict]:
    """
    Download and parse every page of one airport's schedule for one direction.

    Args:
        yandex_code: Yandex station code passed to the API.
        target_date: schedule date to request.
        direction: 'departure' | 'arrival' (sent to the API as ``event``).
        airport_iata: IATA code handed to _parse_item() for route building.

    Pages are requested until the running offset reaches pagination.total
    or a page has no items; config.YANDEX_RATE_LIMIT_SEC is slept between
    pages. Entries rejected by _parse_item() are dropped.

    Returns the parsed flight dicts in response order.
    """
    collected: List[Dict] = []
    position = 0
    while True:
        page = _fetch_page(yandex_code, target_date, event=direction, offset=position)
        entries = page.get("schedule", [])
        paging = page.get("pagination", {})
        parsed = (_parse_item(entry, direction, airport_iata) for entry in entries)
        collected.extend(flight for flight in parsed if flight)
        position += len(entries)
        if position >= paging.get("total", 0) or not entries:
            break
        time.sleep(config.YANDEX_RATE_LIMIT_SEC)
    return collected
def _parse_item(item: Dict, direction: str, airport_iata: str) -> Optional[Dict]:
    """
    Convert one entry of a Yandex 'schedule' response into a flight dict.

    Args:
        item: one element of the response's "schedule" list.
        direction: 'departure' | 'arrival' — the event the page was fetched for.
        airport_iata: IATA code of the airport being loaded.

    Returns:
        A dict with the keys upsert_flights() reads, or None when the entry
        has no flight number or no departure/arrival timestamp.
        scheduled_at is passed through as the raw API value (not parsed).
    """
    # JSON nulls arrive as None, so use `or` fallbacks instead of .get defaults.
    thread = item.get("thread") or {}
    flight_number = str(thread.get("number") or "").strip()
    if not flight_number:
        return None
    carrier = thread.get("carrier") or {}

    # Normalize flight number: keep only airline code + number (e.g. "SU 1234")
    flight_number = " ".join(flight_number.split()[:2])

    # Scheduled time: departure event → 'departure' field, arrival event →
    # 'arrival' field; fall back to whichever of the two is present.
    preferred = item.get("departure") if direction == "departure" else item.get("arrival")
    scheduled_at = preferred or item.get("departure") or item.get("arrival")
    if not scheduled_at:
        return None

    # Route: parsed from thread.title
    title = thread.get("title", "")
    origin_iata, destination_iata = _parse_route(title, airport_iata, direction)

    # Duration: thread.duration is treated as seconds when present.
    duration_min: Optional[float] = None
    thread_duration = thread.get("duration")
    if thread_duration is not None:
        try:
            duration_min = float(thread_duration) / 60.0
        except (TypeError, ValueError):
            pass  # non-numeric duration: try the timestamps below
    if duration_min is None and direction == "arrival":
        dep_dt = _parse_dt(item.get("departure"))
        # Parse 'arrival' itself instead of falling back to scheduled_at:
        # scheduled_at is the raw API value (a str), which cannot be compared
        # with a datetime, and when 'arrival' is missing it holds the
        # departure time anyway.
        arr_dt = _parse_dt(item.get("arrival"))
        if dep_dt and arr_dt and arr_dt > dep_dt:
            duration_min = (arr_dt - dep_dt).total_seconds() / 60.0
    # Departure rows without thread.duration are filled later by the SQL
    # cross-match (fill_duration_from_crossmatch / update_durations).

    return {
        "flight_number": flight_number,
        # NOTE(review): carrier["code"] may be Yandex's own carrier id rather
        # than an IATA code — confirm against an API response.
        "airline_iata": carrier.get("code"),
        "airline_name": carrier.get("title"),
        "origin_iata": origin_iata,
        "destination_iata": destination_iata,
        "aircraft_type": thread.get("vehicle"),
        "duration_min": duration_min,
        "scheduled_at": scheduled_at,
        "direction": direction,
        "status": "scheduled",
        "source": "yandex",
        "thread_title": title,
    }
# ── DB upsert ─────────────────────────────────────────────────────────────────
def upsert_flights(conn, flights: List[Dict], airport_iata: str, flight_date: date) -> int:
    """
    Write flights into fr24_ext.schedule with one INSERT ... ON CONFLICT each.

    Rows are keyed by (flight_number, airport_iata, scheduled_at, direction).
    On conflict:
    - airline_name and status are overwritten.
    - origin_iata, destination_iata, aircraft_type, duration_min and
      thread_title are only replaced by non-NULL new values (COALESCE).
    - fetched_at is set to now().
    - airline_iata and source keep their stored values.

    Args:
        conn: DB connection; only conn.cursor() (used as a context manager)
            is called here, and nothing is committed.
        flights: flight dicts; "direction", "flight_number", "scheduled_at"
            and "source" are required keys (KeyError otherwise).
        airport_iata: airport the flights were fetched for.
        flight_date: value stored in the flight_date column.

    Returns:
        len(flights) — statements executed, not rows actually changed.
    """
    if not flights:
        return 0

    upsert_sql = """
        INSERT INTO fr24_ext.schedule
        (flight_date, airport_iata, direction, flight_number,
        airline_iata, airline_name, origin_iata, destination_iata,
        aircraft_type, duration_min, scheduled_at, status, source, thread_title)
        VALUES
        (%(flight_date)s, %(airport_iata)s, %(direction)s, %(flight_number)s,
        %(airline_iata)s, %(airline_name)s, %(origin_iata)s, %(destination_iata)s,
        %(aircraft_type)s, %(duration_min)s, %(scheduled_at)s, %(status)s, %(source)s, %(thread_title)s)
        ON CONFLICT (flight_number, airport_iata, scheduled_at, direction)
        DO UPDATE SET
        airline_name = EXCLUDED.airline_name,
        origin_iata = COALESCE(EXCLUDED.origin_iata, fr24_ext.schedule.origin_iata),
        destination_iata = COALESCE(EXCLUDED.destination_iata, fr24_ext.schedule.destination_iata),
        aircraft_type = COALESCE(EXCLUDED.aircraft_type, fr24_ext.schedule.aircraft_type),
        duration_min = COALESCE(EXCLUDED.duration_min, fr24_ext.schedule.duration_min),
        thread_title = COALESCE(EXCLUDED.thread_title, fr24_ext.schedule.thread_title),
        status = EXCLUDED.status,
        fetched_at = now()
        """
    # Keys that may be absent from a flight dict; they are bound as NULL.
    optional_keys = (
        "airline_iata", "airline_name", "origin_iata", "destination_iata",
        "aircraft_type", "duration_min", "thread_title",
    )

    with conn.cursor() as cur:
        for flight in flights:
            params = {
                "flight_date": flight_date,
                "airport_iata": airport_iata,
                "direction": flight["direction"],
                "flight_number": flight["flight_number"],
                "scheduled_at": flight["scheduled_at"],
                "source": flight["source"],
                "status": flight.get("status", "scheduled"),
            }
            params.update((key, flight.get(key)) for key in optional_keys)
            cur.execute(upsert_sql, params)
    return len(flights)
def fill_duration_from_crossmatch(conn, airport_iata: str, flight_date: date) -> int:
    """
    Fill missing departure duration_min values by pairing dep/arr rows.

    A departure row and an arrival row are paired when they share
    flight_number and flight_date and belong to different airports. The
    departure row's duration_min is then set to the arrival scheduled_at
    minus the departure scheduled_at, rounded to whole minutes. Only rows
    whose duration_min is still NULL are updated. Pairs outside 20 min ..
    20 h are ignored as implausible.

    Only pairs involving ``airport_iata``, as either the departure or the
    arrival airport, are considered. Called right after an airport is
    loaded, this touches just the pairs that airport's data can complete;
    update_durations() is the date-wide pass.

    This fills gaps left when Yandex doesn't return duration in the thread
    object and when the arrival items don't carry a 'departure' timestamp.

    Does not commit. Returns the number of rows updated (cursor.rowcount).
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            UPDATE fr24_ext.schedule AS dep
            SET duration_min = ROUND(
                EXTRACT(EPOCH FROM (arr.scheduled_at - dep.scheduled_at)) / 60.0
            )
            FROM fr24_ext.schedule AS arr
            WHERE dep.flight_number = arr.flight_number
              AND dep.flight_date = arr.flight_date
              AND dep.direction = 'departure'
              AND arr.direction = 'arrival'
              AND dep.airport_iata != arr.airport_iata
              AND dep.duration_min IS NULL
              AND dep.flight_date = %(flight_date)s
              AND (dep.airport_iata = %(airport_iata)s OR arr.airport_iata = %(airport_iata)s)
              -- sanity: flight must take between 20 min and 20 h
              AND EXTRACT(EPOCH FROM (arr.scheduled_at - dep.scheduled_at)) BETWEEN 1200 AND 72000
            """,
            {"flight_date": flight_date, "airport_iata": airport_iata},
        )
        updated = cur.rowcount
    return updated
def update_durations(conn, flight_date: date) -> int:
    """
    Run the date-wide pass that fills missing departure duration_min values.

    Across all airports on ``flight_date``, pair departure and arrival rows
    that have the same flight_number and flight_date and are at different
    airports. Set the departure row's duration_min to the arrival
    scheduled_at minus the departure scheduled_at, rounded to whole minutes.
    Only rows whose duration_min is still NULL are touched, and pairs
    outside 20 min .. 20 h are ignored.

    Unlike fill_duration_from_crossmatch(), which is called per airport,
    this takes no airport_iata argument. A departure airport can therefore
    be paired with an arrival airport regardless of which airport's
    schedule was being fetched when.

    Does not commit. Returns the number of rows updated (cursor.rowcount).
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            UPDATE fr24_ext.schedule AS dep
            SET duration_min = ROUND(
                EXTRACT(EPOCH FROM (arr.scheduled_at - dep.scheduled_at)) / 60.0
            )
            FROM fr24_ext.schedule AS arr
            WHERE dep.flight_number = arr.flight_number
              AND dep.flight_date = arr.flight_date
              AND dep.direction = 'departure'
              AND arr.direction = 'arrival'
              AND dep.airport_iata != arr.airport_iata
              AND dep.duration_min IS NULL
              AND dep.flight_date = %(flight_date)s
              AND EXTRACT(EPOCH FROM (arr.scheduled_at - dep.scheduled_at)) BETWEEN 1200 AND 72000
            """,
            {"flight_date": flight_date},
        )
        updated = cur.rowcount
    log.info("update_durations: %d rows updated for %s", updated, flight_date)
    return updated
# ── main entry ────────────────────────────────────────────────────────────────
def fetch_day(target_date: date, conn) -> int:
    """
    Load the Yandex schedule of every configured airport for one day.

    For each airport in config.AIRPORTS, both directions are fetched and
    upserted. Each direction is committed on success and rolled back on
    failure. A failed statement therefore neither leaves the connection in
    an aborted transaction, which would make every later statement fail,
    nor discards rows already loaded.

    After each airport, fill_duration_from_crossmatch() runs; a final
    update_durations() pass then covers the whole date. Errors are logged
    with a traceback and do not stop the loop.

    Returns the total number of flights upserted.
    """
    total = 0
    for airport_iata, airport_info in config.AIRPORTS.items():
        yandex_code = airport_info["yandex_code"]
        log.info("Yandex: fetching %s (%s) for %s", airport_iata, yandex_code, target_date)
        for direction in ("departure", "arrival"):
            try:
                flights = fetch_airport_schedule(
                    yandex_code, target_date, direction, airport_iata
                )
                count = upsert_flights(conn, flights, airport_iata, target_date)
                conn.commit()
            except Exception as e:
                # Harmless if nothing is pending (e.g. the HTTP fetch failed);
                # required after a DB error to clear the aborted transaction.
                conn.rollback()
                log.exception("Yandex: failed %s %s: %s", airport_iata, direction, e)
            else:
                total += count
                log.info("Yandex: %s %s → %d flights upserted", airport_iata, direction, count)
            time.sleep(config.YANDEX_RATE_LIMIT_SEC)

        # Cross-match departure/arrival to fill missing duration_min
        try:
            updated = fill_duration_from_crossmatch(conn, airport_iata, target_date)
            conn.commit()
        except Exception as e:
            conn.rollback()
            log.exception("Yandex: duration crossmatch failed for %s: %s", airport_iata, e)
        else:
            if updated:
                log.info("Yandex: %s — filled duration_min for %d departure records", airport_iata, updated)

    # Final global pass: fill any remaining duration_min gaps across all airports
    try:
        updated = update_durations(conn, target_date)
        conn.commit()
    except Exception as e:
        conn.rollback()
        log.exception("Yandex: update_durations failed for %s: %s", target_date, e)
    else:
        log.info("Yandex: update_durations → %d rows filled for %s", updated, target_date)
    return total