Files
wiki/tasks/flightradar24/ingest/mart/main.py
2026-04-20 23:20:01 +03:00

121 lines
3.4 KiB
Python

"""
fr24-mart service.
- GET /health — healthcheck
- POST /run?date=YYYY-MM-DD — manual trigger
- APScheduler: runs build for yesterday every config.BUILD_INTERVAL_MINUTES minutes (first run at startup)
"""
import logging
import sys
import time
from datetime import date, timedelta, datetime, timezone
import psycopg2
import psycopg2.extras
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask, jsonify, request
from config import config
from build_mart import build
# Root logging: INFO and above go to stdout, tagged "[mart]", with an
# ISO-like timestamp (no timezone suffix).
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout)],
    datefmt="%Y-%m-%dT%H:%M:%S",
    format="%(asctime)s [mart] %(levelname)s %(message)s",
    level=logging.INFO,
)
log = logging.getLogger("mart")

app = Flask(__name__)

# Snapshot of the most recent build attempt. Written by the scheduled and
# manual build entry points and echoed back by the /health endpoint.
#   "at":     ISO-8601 UTC timestamp of when the attempt started (None before the first one)
#   "status": "never" | "running" | "ok" | "error: <message>"
#   "stats":  whatever build() returned on the last successful run
_last_run: dict = {"at": None, "status": "never", "stats": {}}

# Process-wide psycopg2 connection, created lazily by get_conn().
_conn = None
def get_conn():
    """Return the shared psycopg2 connection, (re)opening it when needed.

    A new connection is opened from ``config.DB_DSN`` when no connection has
    been created yet or the cached one reports itself closed; otherwise the
    cached connection is returned as-is.

    Returns:
        The module-wide psycopg2 connection object.

    Raises:
        Any exception raised by ``psycopg2.connect`` propagates to the caller.
    """
    global _conn
    if _conn is None or _conn.closed:
        _conn = psycopg2.connect(config.DB_DSN)
        # register_uuid(oids=None, conn_or_curs=None): the first positional
        # parameter is the type OID, so the connection must be passed by
        # keyword. Passing it positionally registers the UUID typecaster
        # under a bogus OID instead of scoping it to this connection.
        psycopg2.extras.register_uuid(conn_or_curs=_conn)
        log.info("DB connection established")
    return _conn
def wait_for_db(max_attempts: int = 30, delay: float = 3.0):
    """Block until the database accepts a connection, or give up.

    Calls get_conn() up to ``max_attempts`` times, pausing ``delay`` seconds
    between failed attempts.

    Args:
        max_attempts: Maximum number of connection attempts.
        delay: Seconds to wait between two consecutive attempts.

    Raises:
        SystemExit: if every attempt fails with psycopg2.OperationalError.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            get_conn()
            return
        except psycopg2.OperationalError as e:
            log.warning("DB not ready (%d/%d): %s", attempt, max_attempts, e)
        # Only pause when another attempt follows; sleeping after the last
        # failure would just delay the exit.
        if attempt < max_attempts:
            time.sleep(delay)
    raise SystemExit("Could not connect to DB")
def scheduled_build():
    """Build the mart for yesterday; used as the scheduler job.

    Records the attempt in ``_last_run`` ("running", then "ok" with the stats
    returned by build(), or "error: <message>"). Exceptions are logged and
    swallowed so that a failed build never propagates into the scheduler.

    The run is skipped when ``_last_run`` already reports a build as running,
    mirroring the guard in the manual /run endpoint: both entry points use
    the same shared connection.
    """
    if _last_run.get("status") == "running":
        log.warning("Scheduled mart build skipped: a build is already running")
        return

    # NOTE(review): date.today() uses the process-local timezone while the
    # timestamps below are UTC — confirm the service runs with TZ=UTC.
    target = date.today() - timedelta(days=1)
    log.info("Scheduled mart build for %s", target)
    _last_run["at"] = datetime.now(timezone.utc).isoformat()
    _last_run["status"] = "running"

    conn = None
    try:
        conn = get_conn()
        stats = build(target, conn)
        _last_run.update(status="ok", stats=stats)
    except Exception as e:
        _last_run["status"] = f"error: {e}"
        # log.exception keeps the traceback, which log.error dropped.
        log.exception("Scheduled build failed: %s", e)
        # A failed statement leaves the shared connection inside an aborted
        # transaction; without a rollback every later query on it would fail.
        if conn is not None and not conn.closed:
            try:
                conn.rollback()
            except psycopg2.Error as rollback_err:
                log.warning("Rollback after failed build failed: %s", rollback_err)
@app.get("/health")
def health():
    """Healthcheck endpoint: probe the database and report the last build.

    Runs ``SELECT 1`` on the shared connection.

    Returns:
        A ``(json_response, status_code)`` pair: 200 with status "ok" when
        the probe succeeds, 503 with status "degraded" otherwise. The body
        always includes the current ``_last_run`` snapshot.
    """
    try:
        # Context manager closes the cursor as soon as the probe is done.
        with get_conn().cursor() as cur:
            cur.execute("SELECT 1")
        db_ok = True
    except Exception as e:
        # Still answer "degraded", but record why the probe failed instead
        # of discarding the cause.
        log.warning("Health check DB probe failed: %s", e)
        db_ok = False

    status_code = 200 if db_ok else 503
    return jsonify({
        "status": "ok" if db_ok else "degraded",
        "db": "ok" if db_ok else "error",
        "last_run": _last_run,
    }), status_code
@app.post("/run")
def run_manual():
    """Manual trigger: build the mart for a given date (default: yesterday).

    Query parameters:
        date: optional target date in ISO format (YYYY-MM-DD).

    Returns:
        200 with ``{"status": "ok", "stats": ...}`` on success,
        400 when ``date`` cannot be parsed,
        409 when ``_last_run`` reports a build as already running,
        500 with ``{"error": <message>}`` when the build raises.
    """
    date_str = request.args.get("date")
    if date_str:
        try:
            target = date.fromisoformat(date_str)
        except ValueError:
            return jsonify({"error": "invalid date, use YYYY-MM-DD"}), 400
    else:
        target = date.today() - timedelta(days=1)

    if _last_run.get("status") == "running":
        return jsonify({"error": "already running"}), 409

    _last_run["at"] = datetime.now(timezone.utc).isoformat()
    _last_run["status"] = "running"

    conn = None
    try:
        conn = get_conn()
        stats = build(target, conn)
        _last_run.update(status="ok", stats=stats)
        return jsonify({"status": "ok", "stats": stats})
    except Exception as e:
        _last_run["status"] = f"error: {e}"
        # log.exception keeps the traceback, which log.error dropped.
        log.exception("Manual run failed: %s", e)
        # A failed statement leaves the shared connection inside an aborted
        # transaction; without a rollback every later query on it would fail.
        if conn is not None and not conn.closed:
            try:
                conn.rollback()
            except psycopg2.Error as rollback_err:
                log.warning("Rollback after failed build failed: %s", rollback_err)
        return jsonify({"error": str(e)}), 500
if __name__ == "__main__":
    # Do not start serving until the database is reachable
    # (wait_for_db exits the process otherwise).
    wait_for_db()

    scheduler = BackgroundScheduler(timezone="UTC")
    scheduler.add_job(
        scheduled_build,
        trigger="interval",
        minutes=config.BUILD_INTERVAL_MINUTES,
        # Fire the first build right away instead of one full interval later.
        next_run_time=datetime.now(timezone.utc),
    )
    scheduler.start()
    log.info("Scheduler started, interval=%d min", config.BUILD_INTERVAL_MINUTES)

    # Create (or truncate) the empty marker file /tmp/ready.
    # NOTE(review): presumably polled by an external readiness probe — confirm.
    with open("/tmp/ready", "w"):
        pass

    app.run(host="0.0.0.0", port=8003, debug=False)