Files
wiki/tasks/flightradar24/ingest/schedule/main.py
2026-04-20 17:10:01 +03:00

111 lines
3.3 KiB
Python

"""
fr24-schedule main entry point.
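- APScheduler cron: daily_job() at config.DAILY_RUN_HOUR:DAILY_RUN_MINUTE UTC (nominally 02:00)
- Flask healthcheck (GET /health) on port 8000
- APScheduler cron: cleanup_job() at 03:00 UTC (deletes rows older than config.RETENTION_DAYS days)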
"""
import logging
import sys
import time
from datetime import date, timedelta, datetime, timezone
import psycopg2
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask, jsonify
from config import config
from yandex_worker import fetch_day as yandex_fetch_day
from opensky_worker import enrich_day as opensky_enrich_day
# Send every log record of the service to stdout with an ISO-like timestamp
# and a fixed "[schedule]" tag in each line.
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout)],
    level=logging.INFO,
    datefmt="%Y-%m-%dT%H:%M:%S",
    format="%(asctime)s [schedule] %(levelname)s %(message)s",
)

# Module-wide logger used by the jobs, the healthcheck and the bootstrap code.
log = logging.getLogger("schedule")
# Flask application that exposes the healthcheck endpoint.
app = Flask(__name__)

# Mutable snapshot of the most recent daily_job() run; returned verbatim
# under "last_run" by the /health endpoint.
_last_run: dict = dict(at=None, status="never", flights=0, enriched=0)

# Process-wide psycopg2 connection, created lazily by get_conn().
_conn = None
def get_conn():
    """Return the shared psycopg2 connection, (re)connecting when needed.

    The connection is cached in the module-level ``_conn``. A new one is
    opened with ``config.DB_DSN`` when none exists yet or when the cached
    one reports itself as closed.
    """
    global _conn
    # Fast path: the cached connection is still usable.
    if _conn is not None and not _conn.closed:
        return _conn

    _conn = psycopg2.connect(config.DB_DSN)
    log.info("DB connection established")
    return _conn
def daily_job():
    """T-1: load yesterday's (UTC) schedule from Yandex.

    Progress and outcome are recorded in the module-level ``_last_run`` dict:
    ``at`` (UTC ISO timestamp of the start), ``status`` ("running", "ok" or
    "error: ..."), ``flights`` (value returned by ``yandex_fetch_day``) and
    ``enriched``.

    The function never raises: any failure is logged with its traceback and
    reflected in ``_last_run["status"]``.

    NOTE(review): ``opensky_enrich_day`` is imported by this module but is not
    called here, so ``enriched`` is always reported as 0 — confirm whether the
    enrichment step is intentionally disabled.
    """
    # The scheduler fires in UTC, so "yesterday" must be derived from the UTC
    # clock as well; date.today() follows the host's local timezone and can be
    # off by one day around the run time.
    now_utc = datetime.now(timezone.utc)
    target = now_utc.date() - timedelta(days=1)
    log.info("daily_job: starting for %s", target)
    _last_run["at"] = now_utc.isoformat()
    _last_run["status"] = "running"

    conn = None
    try:
        conn = get_conn()
        flights = yandex_fetch_day(target, conn)
        _last_run.update(status="ok", flights=flights, enriched=0)
        log.info("daily_job: done — %d flights", flights)
    except Exception as e:
        _last_run["status"] = f"error: {e}"
        log.exception("daily_job failed: %s", e)
        # The connection is shared with the other jobs. Discard whatever this
        # run left pending so that a later commit() elsewhere cannot persist
        # partial data and the connection is not stuck in an aborted
        # transaction.
        if conn is not None and not conn.closed:
            try:
                conn.rollback()
            except Exception:
                log.exception("daily_job: rollback after failure also failed")
def cleanup_job():
    """Delete schedule records older than ``config.RETENTION_DAYS`` days.

    Removes rows from ``fr24_ext.schedule`` whose ``flight_date`` is earlier
    than ``CURRENT_DATE`` minus the retention period, commits, and logs the
    number of deleted rows.

    The function never raises: a failure is logged with its traceback and the
    pending transaction is rolled back so the shared connection stays usable.
    """
    conn = None
    try:
        conn = get_conn()
        with conn.cursor() as cur:
            cur.execute(
                "DELETE FROM fr24_ext.schedule WHERE flight_date < CURRENT_DATE - %s::int * INTERVAL '1 day'",
                (config.RETENTION_DAYS,),
            )
            deleted = cur.rowcount
        conn.commit()
        log.info("cleanup_job: deleted %d old records (retention=%d days)", deleted, config.RETENTION_DAYS)
    except Exception as e:
        log.exception("cleanup_job failed: %s", e)
        # Without a rollback a failed statement leaves the shared connection
        # in an aborted transaction, and every later query on it would fail.
        if conn is not None and not conn.closed:
            try:
                conn.rollback()
            except Exception:
                log.exception("cleanup_job: rollback after failure also failed")
@app.get("/health")
def health():
    """Report service health as JSON.

    Runs ``SELECT 1`` on the shared DB connection. Returns a
    ``(response, status_code)`` tuple: HTTP 200 with ``status="ok"`` when the
    probe succeeds, HTTP 503 with ``status="degraded"`` otherwise. The body
    also carries ``db`` ("ok"/"error") and the ``last_run`` snapshot.
    """
    try:
        # Close the probe cursor deterministically instead of leaving it to
        # the garbage collector.
        # No commit()/rollback() here on purpose: the connection is shared
        # with the scheduled jobs, and ending the transaction from this
        # thread could discard work a running job has not committed yet.
        with get_conn().cursor() as cur:
            cur.execute("SELECT 1")
        db_ok = True
    except Exception as e:
        # Surface the reason for a degraded status instead of swallowing it.
        log.warning("health: DB check failed: %s", e)
        db_ok = False

    payload = {
        "status": "ok" if db_ok else "degraded",
        "db": "ok" if db_ok else "error",
        "last_run": _last_run,
    }
    return jsonify(payload), (200 if db_ok else 503)
def wait_for_db(max_attempts: int = 30, delay_seconds: float = 3.0) -> None:
    """Block until the database accepts a connection, or exit the process.

    Args:
        max_attempts: How many connection attempts to make before giving up.
        delay_seconds: Pause between two consecutive attempts.

    Raises:
        SystemExit: With exit code 1 when every attempt failed with
            ``psycopg2.OperationalError``.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            get_conn()
        except psycopg2.OperationalError as e:
            log.warning("DB not ready (%d/%d): %s", attempt, max_attempts, e)
            # Sleeping after the final attempt would only delay the exit.
            if attempt < max_attempts:
                time.sleep(delay_seconds)
        else:
            return

    log.error("Could not connect to DB after %d attempts", max_attempts)
    raise SystemExit(1)
if __name__ == "__main__":
    # Do not start scheduling or serving until the database is reachable.
    wait_for_db()

    run_hour, run_minute = config.DAILY_RUN_HOUR, config.DAILY_RUN_MINUTE

    # Both jobs run on a background scheduler whose cron fields are in UTC.
    scheduler = BackgroundScheduler(timezone="UTC")
    scheduler.add_job(daily_job, trigger="cron", hour=run_hour, minute=run_minute)
    scheduler.add_job(cleanup_job, trigger="cron", hour=3, minute=0)
    scheduler.start()
    log.info("Scheduler started — daily job at %02d:%02d UTC", run_hour, run_minute)

    # The Flask server occupies the main thread and serves the healthcheck.
    app.run(host="0.0.0.0", port=8000, debug=False)