auto-sync: 2026-04-21 00:50:01

This commit is contained in:
Stream
2026-04-21 00:50:01 +03:00
parent 21c629464a
commit ff3cbd57ec
3 changed files with 550 additions and 35 deletions

View File

@@ -0,0 +1,335 @@
"""
CITY_TO_IATA — маппинг русских названий городов → IATA-коды аэропортов.
Используется для парсинга маршрутов из заголовков рейсов Яндекс.Расписаний.
Ключи: нижний регистр (сравнение делать через .lower().strip()).
"""
CITY_TO_IATA: dict[str, str] = {
# ── Россия ────────────────────────────────────────────────────────────────
"москва": "SVO", # Шереметьево (основной)
"москва (шереметьево)": "SVO",
"москва (домодедово)": "DME",
"москва (внуково)": "VKO",
"москва (жуковский)": "ZIA",
"шереметьево": "SVO",
"домодедово": "DME",
"внуково": "VKO",
"жуковский": "ZIA",
"санкт-петербург": "LED",
"петербург": "LED",
"питер": "LED",
"пулково": "LED",
"новосибирск": "OVB",
"екатеринбург": "SVX",
"казань": "KZN",
"сочи": "AER",
"адлер": "AER",
"краснодар": "KRR",
"уфа": "UFA",
"ростов-на-дону": "ROV",
"пермь": "PEE",
"воронеж": "VOZ",
"самара": "KUF",
"омск": "OMS",
"челябинск": "CEK",
"красноярск": "KJA",
"иркутск": "IKT",
"владивосток": "VVO",
"нижний новгород": "GOJ",
"хабаровск": "KHV",
"тюмень": "TJM",
"барнаул": "BAX",
"томск": "TOF",
"кемерово": "KEJ",
"новокузнецк": "NOZ",
"якутск": "YKS",
"магадан": "GDX",
"петропавловск-камчатский": "PKC",
"южно-сахалинск": "UUS",
"мурманск": "MMK",
"архангельск": "ARH",
"астрахань": "ASF",
"волгоград": "VOG",
"саратов": "RTW",
"оренбург": "REN",
"ставрополь": "STW",
"минеральные воды": "MRV",
"нальчик": "NAL",
"владикавказ": "OGZ",
"махачкала": "MCX",
"грозный": "GRV",
"симферополь": "SIP",
"калининград": "KGD",
"псков": "PKV",
"великий новгород": "NNV",
"вологда": "VGD",
"ярославль": "IAR",
"иваново": "IWA",
"тверь": "KLD",
"рязань": "RZN",
"липецк": "LPK",
"белгород": "EGO",
"курск": "URS",
"брянск": "BZK",
"смоленск": "LNX",
"тула": "TYA",
"калуга": "KLF",
"орёл": "OEL",
"тамбов": "TBW",
"пенза": "PEZ",
"ульяновск": "ULV",
"нижнекамск": "NBC",
"чебоксары": "CSY",
"йошкар-ола": "JOK",
"саранск": "SKX",
"киров": "KVX",
"сыктывкар": "SCW",
"ухта": "UCT",
"нарьян-мар": "NNM",
"воркута": "VKT",
"салехард": "SLY",
"ханты-мансийск": "HMA",
"сургут": "SGC",
"нижневартовск": "NJC",
"ноябрьск": "NOJ",
"новый уренгой": "NUX",
"надым": "NYM",
"нефтеюганск": "NFG",
"когалым": "KGP",
"стрежевой": "SWT",
"горно-алтайск": "RGK",
"абакан": "ABA",
"кызыл": "KYZ",
"улан-удэ": "UUD",
"чита": "HTA",
"благовещенск": "BQS",
"комсомольск-на-амуре": "KXK",
"николаевск-на-амуре": "NLI",
"магнитогорск": "MQF",
"нижний тагил": "NTL",
"курган": "KRO",
"тобольск": "TOX",
"ижевск": "IJK",
"нижнеудинск": "NER",
"братск": "BTK",
"усть-илимск": "UIK",
"бодайбо": "ODO",
"мирный": "MJZ",
"нерюнгри": "NER",
"анадырь": "DYR",
"провидения": "PVS",
"певек": "PWE",
"тикси": "IKS",
"черский": "CYX",
"хандыга": "HTG",
"усть-нера": "USR",
"сусуман": "SUY",
"сеймчан": "SEK",
"зырянка": "ZYR",
"среднеколымск": "SEK",
"беслан": "OGZ",
"элиста": "ESL",
"астрахань": "ASF",
"геленджик": "GDZ",
"анапа": "AAQ",
"темрюк": "TBW",
"ейск": "EIK",
"майкоп": "DJU",
# ── СНГ / постсоветское пространство ─────────────────────────────────────
"минск": "MSQ",
"алматы": "ALA",
"нур-султан": "NQZ",
"астана": "NQZ",
"ташкент": "TAS",
"баку": "GYD",
"ереван": "EVN",
"тбилиси": "TBS",
"бишкек": "FRU",
"душанбе": "DYU",
"ашхабад": "ASB",
"самарканд": "SKD",
"ургенч": "UGC",
"фергана": "FEG",
"андижан": "AZN",
"навои": "NVI",
"актобе": "AKX",
"актау": "SCO",
"атырау": "GUW",
"шымкент": "CIT",
"павлодар": "PWQ",
"усть-каменогорск": "UKK",
"семей": "PLX",
"костанай": "KSN",
"кокшетау": "KOV",
"тараз": "DMB",
"кызылорда": "KZO",
"уральск": "URA",
"туркестан": "HSA",
"киев": "KBP",
"одесса": "ODS",
"харьков": "HRK",
"львов": "LWO",
"кишинёв": "KIV",
"кишинев": "KIV",
"рига": "RIX",
"таллин": "TLL",
"вильнюс": "VNO",
"ашгабат": "ASB",
# ── Популярные зарубежные направления ─────────────────────────────────────
# Турция
"стамбул": "IST",
"анкара": "ESB",
"анталья": "AYT",
"анталия": "AYT",
"бодрум": "BJV",
"даламан": "DLM",
"измир": "ADB",
"трабзон": "TZX",
# ОАЭ
"дубай": "DXB",
"абу-даби": "AUH",
"шарджа": "SHJ",
# Египет
"каир": "CAI",
"хургада": "HRG",
"шарм-эль-шейх": "SSH",
"шарм эль шейх": "SSH",
# Таиланд
"бангкок": "BKK",
"пхукет": "HKT",
"самуи": "USM",
"паттайя": "UTP",
# Европа
"париж": "CDG",
"лондон": "LHR",
"берлин": "BER",
"рим": "FCO",
"милан": "MXP",
"барселона": "BCN",
"мадрид": "MAD",
"вена": "VIE",
"прага": "PRG",
"варшава": "WAW",
"амстердам": "AMS",
"брюссель": "BRU",
"цюрих": "ZRH",
"женева": "GVA",
"франкфурт": "FRA",
"мюнхен": "MUC",
"гамбург": "HAM",
"дюссельдорф": "DUS",
"стокгольм": "ARN",
"копенгаген": "CPH",
"осло": "OSL",
"хельсинки": "HEL",
"будапешт": "BUD",
"бухарест": "OTP",
"афины": "ATH",
"лиссабон": "LIS",
"дублин": "DUB",
"лимасол": "LCA",
"ларнака": "LCA",
"никосия": "LCA",
"белград": "BEG",
"загреб": "ZAG",
"любляна": "LJU",
"скопье": "SKP",
"тирана": "TIA",
"подгорица": "TGD",
"сараево": "SJJ",
"черногория": "TIV",
"тиват": "TIV",
"дубровник": "DBV",
"сплит": "SPU",
"пула": "PUY",
"задар": "ZAD",
"хорватия": "ZAG",
# Азия
"токио": "NRT",
"осака": "KIX",
"сеул": "ICN",
"пекин": "PEK",
"шанхай": "PVG",
"гонконг": "HKG",
"сингапур": "SIN",
"куала-лумпур": "KUL",
"джакарта": "CGK",
"бали": "DPS",
"манила": "MNL",
"ханой": "HAN",
"хошимин": "SGN",
"коломбо": "CMB",
"мале": "MLE",
"катманду": "KTM",
"дели": "DEL",
"мумбаи": "BOM",
"бомбей": "BOM",
"карачи": "KHI",
"лахор": "LHE",
"исламабад": "ISB",
"тегеран": "IKA",
"маскат": "MCT",
"доха": "DOH",
"кувейт": "KWI",
"амман": "AMM",
"бейрут": "BEY",
"тель-авив": "TLV",
"тель авив": "TLV",
# Африка
"найроби": "NBO",
"аддис-абеба": "ADD",
"йоханнесбург": "JNB",
"касабланка": "CMN",
"тунис": "TUN",
# Америка
"нью-йорк": "JFK",
"нью йорк": "JFK",
"лос-анджелес": "LAX",
"лос анджелес": "LAX",
"майами": "MIA",
"чикаго": "ORD",
"торонто": "YYZ",
"ванкувер": "YVR",
"монреаль": "YUL",
"мехико": "MEX",
"гавана": "HAV",
"канкун": "CUN",
"лима": "LIM",
"богота": "BOG",
"сан-паулу": "GRU",
"буэнос-айрес": "EZE",
"сантьяго": "SCL",
# Австралия/Океания
"сидней": "SYD",
"мельбурн": "MEL",
"перт": "PER",
"брисбен": "BNE",
# Дополнительные российские города
"читать": "HTA",
"нефтекамск": "UUA",
"березники": "PEE",
"стерлитамак": "UFA",
"тольятти": "KUF",
"рыбинск": "IAR",
"дзержинск": "GOJ",
"владимир": "VKO",
"кострома": "KMW",
"орск": "OSW",
"нижний тагил": "NTL",
"асбест": "SVX",
"нижневартовск": "NJC",
"мегион": "NFG",
"лангепас": "KGP",
"лысьва": "PEE",
"краснотурьинск": "SVX",
"серов": "SVX",
"первоуральск": "SVX",
"каменск-уральский": "SVX",
}
def get_iata(city_name: str) -> str | None:
"""Возвращает IATA-код по русскому названию города (регистронезависимо)."""
return CITY_TO_IATA.get(city_name.lower().strip())

View File

@@ -1,10 +1,18 @@
"""
Yandex.Rasp API worker — loads airport schedule into fr24_ext.schedule.
Makes two requests per airport per day: event=departure and event=arrival.
Route parsing: Yandex /v3.0/schedule/ does not return from/to fields reliably.
Routes are extracted from thread.title (format: "Город1 — Город2") using
CITY_TO_IATA lookup table from city_iata.py.
Duration: for arrival direction Yandex provides both 'arrival' and 'departure'
fields, so duration_min is computed directly. After full upsert a SQL UPDATE
fills any remaining gaps via cross-join between departure/arrival records.
"""
import logging
import time
from datetime import date
from datetime import date, datetime, timezone
from typing import Dict, List, Optional
from functools import wraps
@@ -12,11 +20,15 @@ import requests
import psycopg2
from config import config
from city_iata import CITY_TO_IATA
log = logging.getLogger(__name__)
YANDEX_URL = "https://api.rasp.yandex-net.ru/v3.0/schedule/"
# Separator used by Yandex in thread.title
_ROUTE_SEP = ""
# ── retry decorator ───────────────────────────────────────────────────────────
@@ -38,6 +50,85 @@ def retry(max_retries: int = 3, base_delay: float = 5.0):
return decorator
# ── route / IATA helpers ──────────────────────────────────────────────────────
def _lookup_city(city_name: str) -> Optional[str]:
"""Look up IATA code for a Russian city name. Returns None if not found."""
name = city_name.strip()
iata = CITY_TO_IATA.get(name)
if iata:
return iata
# Try case-insensitive match as fallback
name_lower = name.lower()
for key, val in CITY_TO_IATA.items():
if key.lower() == name_lower:
return val
return None
def _parse_route(title: str, airport_iata: str, direction: str) -> tuple[Optional[str], Optional[str]]:
"""
Parse origin/destination IATA codes from thread.title.
thread.title format: "Город1 — Город2"
- departure: airport_iata is origin, lookup(Город2) = destination
- arrival: lookup(Город1) = origin, airport_iata = destination
Returns (origin_iata, destination_iata). Either can be None if city not found.
"""
if not title or _ROUTE_SEP not in title:
log.debug("Cannot parse route from title: %r", title)
if direction == "departure":
return airport_iata, None
else:
return None, airport_iata
parts = title.split(_ROUTE_SEP, 1)
city_from = parts[0].strip()
city_to = parts[1].strip()
if direction == "departure":
origin_iata = airport_iata
destination_iata = _lookup_city(city_to)
if destination_iata is None:
log.debug("City not found in CITY_TO_IATA: %r (title: %r)", city_to, title)
else: # arrival
origin_iata = _lookup_city(city_from)
destination_iata = airport_iata
if origin_iata is None:
log.debug("City not found in CITY_TO_IATA: %r (title: %r)", city_from, title)
return origin_iata, destination_iata
def _parse_dt(value) -> Optional[datetime]:
"""Parse ISO timestamp string to datetime (timezone-aware)."""
if not value:
return None
if isinstance(value, datetime):
return value
try:
dt = datetime.fromisoformat(str(value))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except (ValueError, TypeError):
return None
def _station_iata(station: Dict) -> Optional[str]:
"""Extract IATA code from station dict (legacy fallback, kept for reference)."""
codes = station.get("codes", {})
if isinstance(codes, dict):
iata = codes.get("iata")
if iata:
return iata[:3].upper()
code = station.get("code", "")
if code and not code.startswith("s"):
return code[:3].upper()
return None
# ── API fetch ─────────────────────────────────────────────────────────────────
@retry(max_retries=3, base_delay=5.0)
@@ -57,7 +148,7 @@ def _fetch_page(yandex_code: str, target_date: date, event: str, offset: int = 0
return resp.json()
def fetch_airport_schedule(yandex_code: str, target_date: date, direction: str) -> List[Dict]:
def fetch_airport_schedule(yandex_code: str, target_date: date, direction: str, airport_iata: str) -> List[Dict]:
"""
Fetches all flights for one airport/direction with pagination.
direction: 'departure' | 'arrival'
@@ -71,7 +162,7 @@ def fetch_airport_schedule(yandex_code: str, target_date: date, direction: str)
pagination = data.get("pagination", {})
for item in items:
flight = _parse_item(item, direction)
flight = _parse_item(item, direction, airport_iata)
if flight:
flights.append(flight)
@@ -86,7 +177,7 @@ def fetch_airport_schedule(yandex_code: str, target_date: date, direction: str)
return flights
def _parse_item(item: Dict, direction: str) -> Optional[Dict]:
def _parse_item(item: Dict, direction: str, airport_iata: str) -> Optional[Dict]:
thread = item.get("thread", {})
flight_number = thread.get("number", "").strip()
if not flight_number:
@@ -94,27 +185,39 @@ def _parse_item(item: Dict, direction: str) -> Optional[Dict]:
carrier = thread.get("carrier", {})
# Normalize flight number: Yandex returns "SU 1234 12345" (number + extra codes)
# Keep only first two tokens: airline code + flight number → "SU 1234"
# Normalize flight number: keep only airline code + number (e.g. "SU 1234")
parts = flight_number.split()
if len(parts) >= 2:
flight_number = f"{parts[0]} {parts[1]}"
else:
flight_number = parts[0] if parts else flight_number
# Scheduled time: departure event → use 'departure' field; arrival → 'arrival'
# Scheduled time: departure event → 'departure' field; arrival → 'arrival' field
scheduled_at = item.get("departure") if direction == "departure" else item.get("arrival")
if not scheduled_at:
# fallback
scheduled_at = item.get("departure") or item.get("arrival")
if not scheduled_at:
return None
# Route: from/to station objects
from_station = item.get("from", {}) or {}
to_station = item.get("to", {}) or {}
origin_iata = _station_iata(from_station)
destination_iata = _station_iata(to_station)
# ── Route: parse from thread.title ────────────────────────────────────────
title = thread.get("title", "")
origin_iata, destination_iata = _parse_route(title, airport_iata, direction)
# ── Duration: for arrival Yandex returns both 'arrival' and 'departure' ──
# thread.duration is in seconds (when present)
duration_min: Optional[float] = None
thread_duration = thread.get("duration")
if thread_duration is not None:
try:
duration_min = float(thread_duration) / 60.0
except (TypeError, ValueError):
pass
if duration_min is None and direction == "arrival":
dep_dt = _parse_dt(item.get("departure"))
arr_dt = _parse_dt(item.get("arrival"))
if dep_dt and arr_dt and arr_dt > dep_dt:
duration_min = (arr_dt - dep_dt).total_seconds() / 60.0
return {
"flight_number": flight_number,
@@ -123,7 +226,7 @@ def _parse_item(item: Dict, direction: str) -> Optional[Dict]:
"origin_iata": origin_iata,
"destination_iata": destination_iata,
"aircraft_type": thread.get("vehicle"),
"duration_min": thread.get("duration"),
"duration_min": duration_min,
"scheduled_at": scheduled_at,
"direction": direction,
"status": "scheduled",
@@ -131,20 +234,6 @@ def _parse_item(item: Dict, direction: str) -> Optional[Dict]:
}
def _station_iata(station: Dict) -> Optional[str]:
"""Extract IATA code from station dict (codes list or direct field)."""
codes = station.get("codes", {})
if isinstance(codes, dict):
iata = codes.get("iata")
if iata:
return iata[:3].upper()
# fallback: station code field
code = station.get("code", "")
if code and not code.startswith("s"): # yandex internal codes start with 's'
return code[:3].upper()
return None
# ── DB upsert ─────────────────────────────────────────────────────────────────
def upsert_flights(conn, flights: List[Dict], airport_iata: str, flight_date: date) -> int:
@@ -165,13 +254,13 @@ def upsert_flights(conn, flights: List[Dict], airport_iata: str, flight_date: da
%(aircraft_type)s, %(duration_min)s, %(scheduled_at)s, %(status)s, %(source)s)
ON CONFLICT (flight_number, airport_iata, scheduled_at, direction)
DO UPDATE SET
airline_name = EXCLUDED.airline_name,
origin_iata = COALESCE(EXCLUDED.origin_iata, fr24_ext.schedule.origin_iata),
airline_name = EXCLUDED.airline_name,
origin_iata = COALESCE(EXCLUDED.origin_iata, fr24_ext.schedule.origin_iata),
destination_iata = COALESCE(EXCLUDED.destination_iata, fr24_ext.schedule.destination_iata),
aircraft_type = COALESCE(EXCLUDED.aircraft_type, fr24_ext.schedule.aircraft_type),
duration_min = COALESCE(EXCLUDED.duration_min, fr24_ext.schedule.duration_min),
status = EXCLUDED.status,
fetched_at = now()
aircraft_type = COALESCE(EXCLUDED.aircraft_type, fr24_ext.schedule.aircraft_type),
duration_min = COALESCE(EXCLUDED.duration_min, fr24_ext.schedule.duration_min),
status = EXCLUDED.status,
fetched_at = now()
""",
{
"flight_date": flight_date,
@@ -192,6 +281,77 @@ def upsert_flights(conn, flights: List[Dict], airport_iata: str, flight_date: da
return len(flights)
def fill_duration_from_crossmatch(conn, airport_iata: str, flight_date: date) -> int:
"""
SQL cross-join: match departure ↔ arrival records for same flight_number + date
and compute duration_min from the arrival record's scheduled_at minus
the departure record's scheduled_at.
This fills gaps left when Yandex doesn't return duration in the thread object
and when the arrival items don't carry a 'departure' timestamp.
Returns number of rows updated.
"""
with conn.cursor() as cur:
cur.execute(
"""
UPDATE fr24_ext.schedule AS dep
SET duration_min = ROUND(
EXTRACT(EPOCH FROM (arr.scheduled_at - dep.scheduled_at)) / 60.0
)
FROM fr24_ext.schedule AS arr
WHERE dep.flight_number = arr.flight_number
AND dep.flight_date = arr.flight_date
AND dep.direction = 'departure'
AND arr.direction = 'arrival'
AND dep.airport_iata != arr.airport_iata
AND dep.duration_min IS NULL
AND dep.flight_date = %(flight_date)s
-- sanity: flight must be 20 min 20 h
AND EXTRACT(EPOCH FROM (arr.scheduled_at - dep.scheduled_at)) BETWEEN 1200 AND 72000
""",
{"flight_date": flight_date},
)
updated = cur.rowcount
return updated
def update_durations(conn, flight_date: date) -> int:
"""
Global cross-join UPDATE: для всех аэропортов за flight_date сопоставляет
departure- и arrival-записи по flight_number + flight_date, заполняет
duration_min там, где оно ещё не задано.
В отличие от fill_duration_from_crossmatch() работает по всей дате сразу,
без фильтра по airport_iata, что позволяет подобрать пары
(аэропорт вылета) ↔ (аэропорт прилёта) независимо от того,
с какого аэропорта забирали расписание.
Returns number of rows updated.
"""
with conn.cursor() as cur:
cur.execute(
"""
UPDATE fr24_ext.schedule AS dep
SET duration_min = ROUND(
EXTRACT(EPOCH FROM (arr.scheduled_at - dep.scheduled_at)) / 60.0
)
FROM fr24_ext.schedule AS arr
WHERE dep.flight_number = arr.flight_number
AND dep.flight_date = arr.flight_date
AND dep.direction = 'departure'
AND arr.direction = 'arrival'
AND dep.duration_min IS NULL
AND dep.flight_date = %(flight_date)s
AND EXTRACT(EPOCH FROM (arr.scheduled_at - dep.scheduled_at)) BETWEEN 1200 AND 72000
""",
{"flight_date": flight_date},
)
updated = cur.rowcount
log.info("update_durations: %d rows updated for %s", updated, flight_date)
return updated
# ── main entry ────────────────────────────────────────────────────────────────
def fetch_day(target_date: date, conn) -> int:
@@ -204,7 +364,9 @@ def fetch_day(target_date: date, conn) -> int:
for direction in ("departure", "arrival"):
try:
flights = fetch_airport_schedule(yandex_code, target_date, direction)
flights = fetch_airport_schedule(
yandex_code, target_date, direction, airport_iata
)
count = upsert_flights(conn, flights, airport_iata, target_date)
total += count
log.info("Yandex: %s %s%d flights upserted", airport_iata, direction, count)
@@ -213,5 +375,22 @@ def fetch_day(target_date: date, conn) -> int:
time.sleep(config.YANDEX_RATE_LIMIT_SEC)
# Cross-match departure/arrival to fill missing duration_min
try:
updated = fill_duration_from_crossmatch(conn, airport_iata, target_date)
if updated:
log.info("Yandex: %s — filled duration_min for %d departure records", airport_iata, updated)
except Exception as e:
log.error("Yandex: duration crossmatch failed for %s: %s", airport_iata, e)
conn.commit()
# Final global pass: fill any remaining duration_min gaps across all airports
try:
updated = update_durations(conn, target_date)
conn.commit()
log.info("Yandex: update_durations → %d rows filled for %s", updated, target_date)
except Exception as e:
log.error("Yandex: update_durations failed for %s: %s", target_date, e)
return total