auto-sync: 2026-04-20 20:00:01
This commit is contained in:
@@ -4,7 +4,7 @@ Saves progress to fr24_ext.load_state so it can resume after interruption.
|
||||
|
||||
Usage:
|
||||
python backfill.py --start-date 2026-04-01 --end-date 2026-04-19
|
||||
python backfill.py --start-date 2026-04-01 --end-date 2026-04-19 --skip-opensky
|
||||
python backfill.py --start-date 2026-04-01 --end-date 2026-04-19 --resume
|
||||
"""
|
||||
import argparse
|
||||
import json
|
||||
@@ -16,7 +16,6 @@ import psycopg2
|
||||
|
||||
from config import config
|
||||
from yandex_worker import fetch_day as yandex_fetch_day
|
||||
from opensky_worker import enrich_day as opensky_enrich_day
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -52,10 +51,8 @@ def main():
|
||||
parser = argparse.ArgumentParser(description="Backfill fr24_ext.schedule")
|
||||
parser.add_argument("--start-date", required=True, help="YYYY-MM-DD")
|
||||
parser.add_argument("--end-date", required=True, help="YYYY-MM-DD")
|
||||
parser.add_argument("--skip-opensky", action="store_true",
|
||||
help="Skip OpenSky enrichment (faster, no icao24)")
|
||||
parser.add_argument("--resume", action="store_true",
|
||||
help="Resume from last saved state (ignores --start-date if state exists)")
|
||||
help="Resume from last saved state")
|
||||
args = parser.parse_args()
|
||||
|
||||
start = date.fromisoformat(args.start_date)
|
||||
@@ -67,7 +64,6 @@ def main():
|
||||
|
||||
conn = psycopg2.connect(config.DB_DSN)
|
||||
|
||||
# Resume from saved state if requested
|
||||
if args.resume:
|
||||
state = load_state(conn, STATE_KEY)
|
||||
if state and state.get("last_date"):
|
||||
@@ -79,7 +75,6 @@ def main():
|
||||
|
||||
current = start
|
||||
total_flights = 0
|
||||
total_enriched = 0
|
||||
|
||||
log.info("Backfill: %s → %s (%d days)", start, end, (end - start).days + 1)
|
||||
|
||||
@@ -90,12 +85,6 @@ def main():
|
||||
yandex_count = yandex_fetch_day(current, conn)
|
||||
total_flights += yandex_count
|
||||
log.info("Yandex: %d flights", yandex_count)
|
||||
|
||||
if not args.skip_opensky:
|
||||
opensky_count = opensky_enrich_day(current, conn)
|
||||
total_enriched += opensky_count
|
||||
log.info("OpenSky: %d enriched", opensky_count)
|
||||
|
||||
save_state(conn, STATE_KEY, {"last_date": current.isoformat()})
|
||||
|
||||
except KeyboardInterrupt:
|
||||
@@ -108,7 +97,7 @@ def main():
|
||||
current += timedelta(days=1)
|
||||
|
||||
conn.close()
|
||||
log.info("Backfill done. Flights: %d, Enriched: %d", total_flights, total_enriched)
|
||||
log.info("Backfill done. Flights: %d", total_flights)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -14,8 +14,6 @@ class Config:
|
||||
|
||||
# API keys
|
||||
YANDEX_RASP_API_KEY: str = os.getenv("YANDEX_RASP_API_KEY", "")
|
||||
OPENSKY_USERNAME: str = os.getenv("OPENSKY_USERNAME", "")
|
||||
OPENSKY_PASSWORD: str = os.getenv("OPENSKY_PASSWORD", "")
|
||||
|
||||
# Airports: IATA → metadata
|
||||
AIRPORTS: Dict = field(default_factory=lambda: {
|
||||
@@ -27,7 +25,6 @@ class Config:
|
||||
|
||||
# Rate limits (seconds between requests)
|
||||
YANDEX_RATE_LIMIT_SEC: float = float(os.getenv("YANDEX_RATE_LIMIT_SEC", "1.0"))
|
||||
OPENSKY_RATE_LIMIT_SEC: float = float(os.getenv("OPENSKY_RATE_LIMIT_SEC", "30.0"))
|
||||
|
||||
# Retention
|
||||
RETENTION_DAYS: int = int(os.getenv("SCHEDULE_RETENTION_DAYS", "1095"))
|
||||
|
||||
@@ -15,7 +15,6 @@ from flask import Flask, jsonify
|
||||
|
||||
from config import config
|
||||
from yandex_worker import fetch_day as yandex_fetch_day
|
||||
from opensky_worker import enrich_day as opensky_enrich_day
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
@@ -27,7 +26,7 @@ log = logging.getLogger("schedule")
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
_last_run: dict = {"at": None, "status": "never", "flights": 0, "enriched": 0}
|
||||
_last_run: dict = {"at": None, "status": "never", "flights": 0}
|
||||
_conn = None
|
||||
|
||||
|
||||
@@ -40,7 +39,7 @@ def get_conn():
|
||||
|
||||
|
||||
def daily_job():
|
||||
"""T-1: load yesterday's schedule from Yandex then enrich with OpenSky."""
|
||||
"""T-1: load yesterday's schedule from Yandex."""
|
||||
target = date.today() - timedelta(days=1)
|
||||
log.info("daily_job: starting for %s", target)
|
||||
_last_run["at"] = datetime.now(timezone.utc).isoformat()
|
||||
|
||||
@@ -1,141 +0,0 @@
|
||||
"""
|
||||
OpenSky Network API worker — enriches fr24_ext.schedule with icao24 + actual times.
|
||||
"""
|
||||
import logging
|
||||
import time
|
||||
from datetime import date, datetime, timezone
|
||||
from typing import Dict, List, Optional
|
||||
from functools import wraps
|
||||
|
||||
import requests
|
||||
import psycopg2
|
||||
|
||||
from config import config
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
OPENSKY_BASE = "https://opensky-network.org/api/flights"
|
||||
|
||||
|
||||
# ── retry decorator ───────────────────────────────────────────────────────────
|
||||
|
||||
def retry(max_retries: int = 3, base_delay: float = 10.0):
|
||||
def decorator(func):
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except requests.RequestException as e:
|
||||
if attempt < max_retries - 1:
|
||||
wait = base_delay * (2 ** attempt)
|
||||
log.warning("Retry %d/%d after %.0fs: %s", attempt + 1, max_retries, wait, e)
|
||||
time.sleep(wait)
|
||||
else:
|
||||
raise
|
||||
return wrapper
|
||||
return decorator
|
||||
|
||||
|
||||
# ── API fetch ─────────────────────────────────────────────────────────────────
|
||||
|
||||
@retry(max_retries=3, base_delay=10.0)
|
||||
def fetch_opensky_flights(icao: str, begin_ts: int, end_ts: int, direction: str) -> List[Dict]:
|
||||
"""
|
||||
Fetch arrivals or departures from OpenSky for one airport/day.
|
||||
direction: 'arrival' | 'departure'
|
||||
"""
|
||||
url = f"{OPENSKY_BASE}/{direction}"
|
||||
params = {"airport": icao, "begin": begin_ts, "end": end_ts}
|
||||
|
||||
auth = None
|
||||
if config.OPENSKY_USERNAME and config.OPENSKY_PASSWORD:
|
||||
auth = (config.OPENSKY_USERNAME, config.OPENSKY_PASSWORD)
|
||||
|
||||
resp = requests.get(url, params=params, auth=auth, timeout=60)
|
||||
|
||||
# 404 means no data for this period — not an error
|
||||
if resp.status_code == 404:
|
||||
return []
|
||||
|
||||
resp.raise_for_status()
|
||||
return resp.json() or []
|
||||
|
||||
|
||||
# ── DB enrichment ─────────────────────────────────────────────────────────────
|
||||
|
||||
def enrich_flights(conn, opensky_flights: List[Dict], airport_iata: str, direction: str) -> int:
|
||||
"""
|
||||
Update icao24 + actual_at on existing schedule rows matched by callsign.
|
||||
Match window: ±3 hours around the OpenSky actual time.
|
||||
"""
|
||||
if not opensky_flights:
|
||||
return 0
|
||||
|
||||
enriched = 0
|
||||
with conn.cursor() as cur:
|
||||
for flight in opensky_flights:
|
||||
icao24 = (flight.get("icao24") or "").strip().lower()
|
||||
callsign = (flight.get("callsign") or "").strip()
|
||||
|
||||
if not icao24 or not callsign:
|
||||
continue
|
||||
|
||||
# actual time: lastSeen for arrivals, firstSeen for departures
|
||||
actual_ts = (
|
||||
flight.get("lastSeen") if direction == "arrival"
|
||||
else flight.get("firstSeen")
|
||||
)
|
||||
if not actual_ts:
|
||||
continue
|
||||
|
||||
actual_at = datetime.fromtimestamp(actual_ts, tz=timezone.utc)
|
||||
|
||||
cur.execute(
|
||||
"""
|
||||
UPDATE fr24_ext.schedule
|
||||
SET
|
||||
icao24 = %s,
|
||||
actual_at = %s,
|
||||
source = CASE WHEN source = 'yandex' THEN 'merged' ELSE source END
|
||||
WHERE airport_iata = %s
|
||||
AND direction = %s
|
||||
AND flight_number = %s
|
||||
AND scheduled_at BETWEEN %s - INTERVAL '3 hours'
|
||||
AND %s + INTERVAL '3 hours'
|
||||
""",
|
||||
(icao24, actual_at, airport_iata, direction, callsign, actual_at, actual_at),
|
||||
)
|
||||
enriched += cur.rowcount
|
||||
|
||||
return enriched
|
||||
|
||||
|
||||
# ── main entry ────────────────────────────────────────────────────────────────
|
||||
|
||||
def enrich_day(target_date: date, conn) -> int:
|
||||
"""Enrich all airports for one day. Returns total rows updated."""
|
||||
# Unix timestamps for start/end of the day (UTC)
|
||||
day_start = int(datetime(target_date.year, target_date.month, target_date.day,
|
||||
tzinfo=timezone.utc).timestamp())
|
||||
day_end = day_start + 86400
|
||||
|
||||
total = 0
|
||||
|
||||
for airport_iata, airport_info in config.AIRPORTS.items():
|
||||
icao = airport_info["icao"]
|
||||
log.info("OpenSky: enriching %s (%s) for %s", airport_iata, icao, target_date)
|
||||
|
||||
for direction in ("arrival", "departure"):
|
||||
try:
|
||||
flights = fetch_opensky_flights(icao, day_start, day_end, direction)
|
||||
count = enrich_flights(conn, flights, airport_iata, direction)
|
||||
total += count
|
||||
log.info("OpenSky: %s %s → %d rows enriched", airport_iata, direction, count)
|
||||
except Exception as e:
|
||||
log.error("OpenSky: failed %s %s: %s", airport_iata, direction, e)
|
||||
|
||||
time.sleep(config.OPENSKY_RATE_LIMIT_SEC)
|
||||
|
||||
conn.commit()
|
||||
return total
|
||||
Reference in New Issue
Block a user