Files
wiki/tasks/flightradar24/ingest/tracks_fa/fa_worker.py
2026-04-21 17:00:01 +03:00

209 lines
7.5 KiB
Python

"""
FlightAware AeroAPI tracks worker.
1. Query fr24_ext.schedule for unique flight numbers on target_date
2. GET /aeroapi/flights/{ident} → get fa_flight_id
3. GET /aeroapi/flights/{fa_flight_id}/track → track points
4. Upsert into fr24_ext.flight_tracks_fa + fr24_ext.track_points_fa
"""
import logging
import time
from datetime import date, datetime, timezone
from typing import List, Dict, Optional
import requests
import psycopg2
import psycopg2.extras
from config import config
# Module-level logger; records are emitted under the "fa_worker" name.
log = logging.getLogger("fa_worker")
# Request headers that _get() passes to requests.get(); the API key is read
# from config once, when this module is imported.
HEADERS = {"x-apikey": config.FA_API_KEY}
# time.monotonic() reading stored by the most recent _throttle() call;
# stays at 0.0 until _throttle() runs for the first time.
_last_request_at: float = 0.0
def _icao_or_none(code: Optional[str]) -> Optional[str]:
"""Return code only if it looks like a valid ICAO airport code (4 uppercase letters).
FA sometimes returns coordinates like 'L 55.61740 39.72253' instead of ICAO."""
if code and len(code) == 4 and code.isalpha() and code.isupper():
return code
return None
def _throttle():
    """Pause so that calls are spaced at least ``config.RATE_LIMIT_SEC`` apart.

    Sleeps for the part of the interval that has not yet passed since the
    timestamp stored in the module-level ``_last_request_at``, then stores
    the current ``time.monotonic()`` reading there.
    """
    global _last_request_at
    min_gap = config.RATE_LIMIT_SEC
    waited = time.monotonic() - _last_request_at
    if waited < min_gap:
        time.sleep(min_gap - waited)
    # Stamp after any sleep so the next call measures from this point.
    _last_request_at = time.monotonic()
def _get(path: str, params: Optional[dict] = None, max_retries: int = 5) -> dict:
    """Perform a throttled GET against the FA API and return the decoded JSON.

    Args:
        path: Path appended to ``config.FA_API_BASE``.
        params: Optional query-string parameters.
        max_retries: How many times a 429 response is retried before giving
            up. The original implementation retried without limit.

    Returns:
        The parsed JSON body, or an empty dict when the server answers 404.

    Raises:
        requests.HTTPError: For any other non-success status, including a
            429 that persists after ``max_retries`` retries.
    """
    url = f"{config.FA_API_BASE}{path}"
    retries_used = 0
    while True:
        _throttle()
        resp = requests.get(url, headers=HEADERS, params=params, timeout=30)
        if resp.status_code != 429 or retries_used >= max_retries:
            break
        retries_used += 1
        # Retry-After may be missing, an HTTP-date, or otherwise not an
        # integer; fall back to 60 seconds instead of raising ValueError.
        try:
            retry_after = max(0, int(resp.headers.get("Retry-After", 60)))
        except (TypeError, ValueError):
            retry_after = 60
        log.warning("Rate limited, sleeping %ds", retry_after)
        time.sleep(retry_after)
    if resp.status_code == 404:
        return {}
    resp.raise_for_status()
    return resp.json()
def get_flights_for_ident(ident: str, target_date: date) -> List[Dict]:
    """Fetch ``/flights/{ident}`` restricted to the ``target_date`` window.

    The window runs from 00:00:00Z to 23:59:59Z of ``target_date``.
    Returns the ``flights`` entry of the response, or an empty list when
    the response has no such key.
    """
    window = {
        "start": f"{target_date}T00:00:00Z",
        "end": f"{target_date}T23:59:59Z",
    }
    payload = _get(f"/flights/{ident}", params=window)
    return payload.get("flights", [])
def get_track(fa_flight_id: str) -> List[Dict]:
    """Fetch ``/flights/{fa_flight_id}/track`` and return its positions.

    Returns the ``positions`` entry of the response, or an empty list when
    the response has no such key.
    """
    track_path = f"/flights/{fa_flight_id}/track"
    return _get(track_path).get("positions", [])
def get_schedule_flights(conn, target_date: date) -> List[Dict]:
    """List the distinct (flight_number, airline_iata) pairs scheduled on a date.

    Rows with a NULL flight_number are excluded and the result is ordered
    by flight_number. Each row is returned as a plain dict.
    """
    query = (
        "\n"
        "SELECT DISTINCT flight_number, airline_iata\n"
        "FROM fr24_ext.schedule\n"
        "WHERE flight_date = %s\n"
        "AND flight_number IS NOT NULL\n"
        "ORDER BY flight_number\n"
    )
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(query, (target_date,))
        # Convert the cursor's row objects into ordinary dicts.
        return list(map(dict, cur.fetchall()))
def upsert_flight(conn, fa_flight: Dict, target_date: date) -> Optional[int]:
    """Insert or refresh one FA flight header row and return its DB id.

    The row is keyed on ``fa_flight_id``. On conflict every data column
    except ``flight_date`` is overwritten and ``fetched_at`` is set to
    ``now()``. Origin and destination codes are passed through
    ``_icao_or_none``, so malformed codes are stored as NULL.

    Returns the ``id`` produced by ``RETURNING id``, or ``None`` if the
    statement returned no row.
    """
    upsert_sql = (
        "\n"
        "INSERT INTO fr24_ext.flight_tracks_fa\n"
        "(fa_flight_id, ident_iata, ident_icao, registration, aircraft_type,\n"
        "origin_icao, destination_icao, actual_off, actual_on,\n"
        "departure_delay, arrival_delay, actual_distance, flight_date)\n"
        "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)\n"
        "ON CONFLICT (fa_flight_id) DO UPDATE SET\n"
        "ident_iata = EXCLUDED.ident_iata,\n"
        "ident_icao = EXCLUDED.ident_icao,\n"
        "registration = EXCLUDED.registration,\n"
        "aircraft_type = EXCLUDED.aircraft_type,\n"
        "origin_icao = EXCLUDED.origin_icao,\n"
        "destination_icao = EXCLUDED.destination_icao,\n"
        "actual_off = EXCLUDED.actual_off,\n"
        "actual_on = EXCLUDED.actual_on,\n"
        "departure_delay = EXCLUDED.departure_delay,\n"
        "arrival_delay = EXCLUDED.arrival_delay,\n"
        "actual_distance = EXCLUDED.actual_distance,\n"
        "fetched_at = now()\n"
        "RETURNING id\n"
    )
    with conn.cursor() as cur:
        # "origin"/"destination" may be missing or null in the payload.
        origin = fa_flight.get("origin") or {}
        destination = fa_flight.get("destination") or {}
        values = (
            fa_flight.get("fa_flight_id"),
            # NOTE(review): the payload's "ident" is written to ident_iata —
            # confirm this mapping is intended.
            fa_flight.get("ident"),
            fa_flight.get("ident_icao"),
            fa_flight.get("registration"),
            fa_flight.get("aircraft_type"),
            _icao_or_none(origin.get("code_icao")),
            _icao_or_none(destination.get("code_icao")),
            fa_flight.get("actual_off"),
            fa_flight.get("actual_on"),
            fa_flight.get("departure_delay"),
            fa_flight.get("arrival_delay"),
            # NOTE(review): "route_distance" feeds the actual_distance
            # column — confirm this mapping is intended.
            fa_flight.get("route_distance"),
            target_date,
        )
        cur.execute(upsert_sql, values)
        row = cur.fetchone()
    if row:
        return row[0]
    return None
def upsert_track_points(conn, track_id: int, positions: List[Dict]):
    """Replace the stored points of one track with ``positions``.

    Existing rows for ``track_id`` are always deleted first, even when
    ``positions`` is empty. Positions without a latitude or longitude are
    skipped. altitude is hundreds of feet → *100.
    """
    insert_sql = (
        "\n"
        "INSERT INTO fr24_ext.track_points_fa\n"
        "(track_id, observed_at, lat, lon, altitude_ft, gspeed_kt, heading, update_type)\n"
        "VALUES %s\n"
    )
    with conn.cursor() as cur:
        cur.execute("DELETE FROM fr24_ext.track_points_fa WHERE track_id = %s", (track_id,))
        if not positions:
            return
        rows = []
        for point in positions:
            # A point without both coordinates is not stored.
            if point.get("latitude") is None or point.get("longitude") is None:
                continue
            altitude = point.get("altitude")
            rows.append(
                (
                    track_id,
                    point.get("timestamp"),
                    point.get("latitude"),
                    point.get("longitude"),
                    None if altitude is None else altitude * 100,
                    point.get("groundspeed"),
                    point.get("heading"),
                    point.get("update_type"),
                )
            )
        psycopg2.extras.execute_values(cur, insert_sql, rows)
def run(target_date: date, conn) -> Dict:
    """Main entry: load FA tracks for all scheduled flights on target_date.

    For every distinct ident in the schedule, looks up the matching FA
    flights, upserts each flight header, fetches its track and replaces
    the stored track points. Each flight is committed individually. Any
    exception while handling an ident rolls back the open transaction, is
    counted in ``errors`` and logged with its traceback; processing then
    moves on to the next ident.

    Args:
        target_date: Schedule date to process.
        conn: Open DB connection providing ``cursor``, ``commit`` and
            ``rollback``.

    Returns:
        A stats dict with the keys ``date``, ``schedule_flights``,
        ``fa_flights_found``, ``tracks_loaded`` and ``errors``.
    """
    log.info("FA tracks: starting for %s", target_date)
    stats = {
        "date": str(target_date),
        "schedule_flights": 0,
        "fa_flights_found": 0,
        "tracks_loaded": 0,
        "errors": 0,
    }
    schedule_flights = get_schedule_flights(conn, target_date)
    stats["schedule_flights"] = len(schedule_flights)
    log.info("FA tracks: %d unique flights in schedule", len(schedule_flights))

    # The schedule query is DISTINCT on (flight_number, airline_iata) and
    # idents are whitespace-normalised below, so the same ident can show up
    # more than once. Each ident is requested from the API only once.
    seen_idents = set()
    for sched in schedule_flights:
        ident = sched["flight_number"].replace(" ", "")  # "SU 208" → "SU208"
        # A blank ident would turn the request path into "/flights/".
        if not ident or ident in seen_idents:
            continue
        seen_idents.add(ident)
        try:
            fa_flights = get_flights_for_ident(ident, target_date)
            if not fa_flights:
                log.debug("FA: no flights found for %s", ident)
                continue
            for fa_flight in fa_flights:
                fa_flight_id = fa_flight.get("fa_flight_id")
                if not fa_flight_id:
                    continue
                stats["fa_flights_found"] += 1
                track_id = upsert_flight(conn, fa_flight, target_date)
                if track_id is None:
                    continue
                positions = get_track(fa_flight_id)
                upsert_track_points(conn, track_id, positions)
                # Commit per flight so an error on a later flight only
                # rolls back uncommitted work.
                conn.commit()
                stats["tracks_loaded"] += 1
                log.debug("FA: %s (%s) → %d points", ident, fa_flight_id, len(positions))
        except Exception as e:
            conn.rollback()
            stats["errors"] += 1
            # log.exception keeps the traceback alongside the message.
            log.exception("FA: error processing %s: %s", ident, e)
    log.info("FA tracks done: %s", stats)
    return stats