From 827da44a8e07e393f9c9a44695641fc558ab4f5f Mon Sep 17 00:00:00 2001 From: Stream Date: Mon, 20 Apr 2026 20:00:01 +0300 Subject: [PATCH] auto-sync: 2026-04-20 20:00:01 --- .../flightradar24/ingest/schedule/backfill.py | 17 +-- tasks/flightradar24/ingest/schedule/config.py | 3 - tasks/flightradar24/ingest/schedule/main.py | 5 +- .../ingest/schedule/opensky_worker.py | 141 ------------------ 4 files changed, 5 insertions(+), 161 deletions(-) delete mode 100644 tasks/flightradar24/ingest/schedule/opensky_worker.py diff --git a/tasks/flightradar24/ingest/schedule/backfill.py b/tasks/flightradar24/ingest/schedule/backfill.py index 18c98e1..6bec8d9 100644 --- a/tasks/flightradar24/ingest/schedule/backfill.py +++ b/tasks/flightradar24/ingest/schedule/backfill.py @@ -4,7 +4,7 @@ Saves progress to fr24_ext.load_state so it can resume after interruption. Usage: python backfill.py --start-date 2026-04-01 --end-date 2026-04-19 - python backfill.py --start-date 2026-04-01 --end-date 2026-04-19 --skip-opensky + python backfill.py --start-date 2026-04-01 --end-date 2026-04-19 --resume """ import argparse import json @@ -16,7 +16,6 @@ import psycopg2 from config import config from yandex_worker import fetch_day as yandex_fetch_day -from opensky_worker import enrich_day as opensky_enrich_day log = logging.getLogger(__name__) @@ -52,10 +51,8 @@ def main(): parser = argparse.ArgumentParser(description="Backfill fr24_ext.schedule") parser.add_argument("--start-date", required=True, help="YYYY-MM-DD") parser.add_argument("--end-date", required=True, help="YYYY-MM-DD") - parser.add_argument("--skip-opensky", action="store_true", - help="Skip OpenSky enrichment (faster, no icao24)") parser.add_argument("--resume", action="store_true", - help="Resume from last saved state (ignores --start-date if state exists)") + help="Resume from last saved state") args = parser.parse_args() start = date.fromisoformat(args.start_date) @@ -67,7 +64,6 @@ def main(): conn = psycopg2.connect(config.DB_DSN) - # Resume from saved state if requested if args.resume: state = load_state(conn, STATE_KEY) if state and state.get("last_date"): @@ -79,7 +75,6 @@ def main(): current = start total_flights = 0 - total_enriched = 0 log.info("Backfill: %s → %s (%d days)", start, end, (end - start).days + 1) @@ -90,12 +85,6 @@ def main(): yandex_count = yandex_fetch_day(current, conn) total_flights += yandex_count log.info("Yandex: %d flights", yandex_count) - - if not args.skip_opensky: - opensky_count = opensky_enrich_day(current, conn) - total_enriched += opensky_count - log.info("OpenSky: %d enriched", opensky_count) - save_state(conn, STATE_KEY, {"last_date": current.isoformat()}) except KeyboardInterrupt: @@ -108,7 +97,7 @@ def main(): current += timedelta(days=1) conn.close() - log.info("Backfill done. Flights: %d, Enriched: %d", total_flights, total_enriched) + log.info("Backfill done. Flights: %d", total_flights) if __name__ == "__main__": diff --git a/tasks/flightradar24/ingest/schedule/config.py b/tasks/flightradar24/ingest/schedule/config.py index dcbbed4..fee6833 100644 --- a/tasks/flightradar24/ingest/schedule/config.py +++ b/tasks/flightradar24/ingest/schedule/config.py @@ -14,8 +14,6 @@ class Config: # API keys YANDEX_RASP_API_KEY: str = os.getenv("YANDEX_RASP_API_KEY", "") - OPENSKY_USERNAME: str = os.getenv("OPENSKY_USERNAME", "") - OPENSKY_PASSWORD: str = os.getenv("OPENSKY_PASSWORD", "") # Airports: IATA → metadata AIRPORTS: Dict = field(default_factory=lambda: { @@ -27,7 +25,6 @@ class Config: # Rate limits (seconds between requests) YANDEX_RATE_LIMIT_SEC: float = float(os.getenv("YANDEX_RATE_LIMIT_SEC", "1.0")) - OPENSKY_RATE_LIMIT_SEC: float = float(os.getenv("OPENSKY_RATE_LIMIT_SEC", "30.0")) # Retention RETENTION_DAYS: int = int(os.getenv("SCHEDULE_RETENTION_DAYS", "1095")) diff --git a/tasks/flightradar24/ingest/schedule/main.py b/tasks/flightradar24/ingest/schedule/main.py index 9f4e8f4..9422a0a 100644 --- a/tasks/flightradar24/ingest/schedule/main.py +++ b/tasks/flightradar24/ingest/schedule/main.py @@ -15,7 +15,6 @@ from flask import Flask, jsonify from config import config from yandex_worker import fetch_day as yandex_fetch_day -from opensky_worker import enrich_day as opensky_enrich_day logging.basicConfig( level=logging.INFO, @@ -27,7 +26,7 @@ log = logging.getLogger("schedule") app = Flask(__name__) -_last_run: dict = {"at": None, "status": "never", "flights": 0, "enriched": 0} +_last_run: dict = {"at": None, "status": "never", "flights": 0} _conn = None @@ -40,7 +39,7 @@ def get_conn(): def daily_job(): - """T-1: load yesterday's schedule from Yandex then enrich with OpenSky.""" + """T-1: load yesterday's schedule from Yandex.""" target = date.today() - timedelta(days=1) log.info("daily_job: starting for %s", target) _last_run["at"] = datetime.now(timezone.utc).isoformat() diff --git a/tasks/flightradar24/ingest/schedule/opensky_worker.py b/tasks/flightradar24/ingest/schedule/opensky_worker.py deleted file mode 100644 index 3fc67f7..0000000 --- a/tasks/flightradar24/ingest/schedule/opensky_worker.py +++ /dev/null @@ -1,141 +0,0 @@ -""" -OpenSky Network API worker — enriches fr24_ext.schedule with icao24 + actual times. -""" -import logging -import time -from datetime import date, datetime, timezone -from typing import Dict, List, Optional -from functools import wraps - -import requests -import psycopg2 - -from config import config - -log = logging.getLogger(__name__) - -OPENSKY_BASE = "https://opensky-network.org/api/flights" - - -# ── retry decorator ─────────────────────────────────────────────────────────── - -def retry(max_retries: int = 3, base_delay: float = 10.0): - def decorator(func): - @wraps(func) - def wrapper(*args, **kwargs): - for attempt in range(max_retries): - try: - return func(*args, **kwargs) - except requests.RequestException as e: - if attempt < max_retries - 1: - wait = base_delay * (2 ** attempt) - log.warning("Retry %d/%d after %.0fs: %s", attempt + 1, max_retries, wait, e) - time.sleep(wait) - else: - raise - return wrapper - return decorator - - -# ── API fetch ───────────────────────────────────────────────────────────────── - -@retry(max_retries=3, base_delay=10.0) -def fetch_opensky_flights(icao: str, begin_ts: int, end_ts: int, direction: str) -> List[Dict]: - """ - Fetch arrivals or departures from OpenSky for one airport/day. - direction: 'arrival' | 'departure' - """ - url = f"{OPENSKY_BASE}/{direction}" - params = {"airport": icao, "begin": begin_ts, "end": end_ts} - - auth = None - if config.OPENSKY_USERNAME and config.OPENSKY_PASSWORD: - auth = (config.OPENSKY_USERNAME, config.OPENSKY_PASSWORD) - - resp = requests.get(url, params=params, auth=auth, timeout=60) - - # 404 means no data for this period — not an error - if resp.status_code == 404: - return [] - - resp.raise_for_status() - return resp.json() or [] - - -# ── DB enrichment ───────────────────────────────────────────────────────────── - -def enrich_flights(conn, opensky_flights: List[Dict], airport_iata: str, direction: str) -> int: - """ - Update icao24 + actual_at on existing schedule rows matched by callsign. - Match window: ±3 hours around the OpenSky actual time. - """ - if not opensky_flights: - return 0 - - enriched = 0 - with conn.cursor() as cur: - for flight in opensky_flights: - icao24 = (flight.get("icao24") or "").strip().lower() - callsign = (flight.get("callsign") or "").strip() - - if not icao24 or not callsign: - continue - - # actual time: lastSeen for arrivals, firstSeen for departures - actual_ts = ( - flight.get("lastSeen") if direction == "arrival" - else flight.get("firstSeen") - ) - if not actual_ts: - continue - - actual_at = datetime.fromtimestamp(actual_ts, tz=timezone.utc) - - cur.execute( - """ - UPDATE fr24_ext.schedule - SET - icao24 = %s, - actual_at = %s, - source = CASE WHEN source = 'yandex' THEN 'merged' ELSE source END - WHERE airport_iata = %s - AND direction = %s - AND flight_number = %s - AND scheduled_at BETWEEN %s - INTERVAL '3 hours' - AND %s + INTERVAL '3 hours' - """, - (icao24, actual_at, airport_iata, direction, callsign, actual_at, actual_at), - ) - enriched += cur.rowcount - - return enriched - - -# ── main entry ──────────────────────────────────────────────────────────────── - -def enrich_day(target_date: date, conn) -> int: - """Enrich all airports for one day. Returns total rows updated.""" - # Unix timestamps for start/end of the day (UTC) - day_start = int(datetime(target_date.year, target_date.month, target_date.day, - tzinfo=timezone.utc).timestamp()) - day_end = day_start + 86400 - - total = 0 - - for airport_iata, airport_info in config.AIRPORTS.items(): - icao = airport_info["icao"] - log.info("OpenSky: enriching %s (%s) for %s", airport_iata, icao, target_date) - - for direction in ("arrival", "departure"): - try: - flights = fetch_opensky_flights(icao, day_start, day_end, direction) - count = enrich_flights(conn, flights, airport_iata, direction) - total += count - log.info("OpenSky: %s %s → %d rows enriched", airport_iata, direction, count) - except Exception as e: - log.error("OpenSky: failed %s %s: %s", airport_iata, direction, e) - - time.sleep(config.OPENSKY_RATE_LIMIT_SEC) - - conn.commit() - return total