901 lines
34 KiB
Python
901 lines
34 KiB
Python
"""
|
||
FR24 API Service
|
||
Minimal Flask API reading from PostgreSQL fr24 schema.
|
||
"""
|
||
import csv
|
||
import io
|
||
import os
|
||
import time
|
||
import logging
|
||
from datetime import datetime, timezone
|
||
from functools import wraps
|
||
|
||
import psycopg2
|
||
import psycopg2.extras
|
||
from flask import Flask, jsonify, request, send_from_directory, Response
|
||
|
||
# ── IATA → город маппинг ──────────────────────────────────────────────────────
|
||
IATA_CITY = {
|
||
"AAQ": "Анапа", "ABA": "Абакан", "ADB": "Измир", "ADD": "Аддис-Абеба",
|
||
"AER": "Сочи", "AKX": "Актобе", "ALA": "Алматы", "AMM": "Амман",
|
||
"AMS": "Амстердам", "ARH": "Архангельск", "ARN": "Стокгольм", "ASB": "Ашхабад",
|
||
"ASF": "Астрахань", "ATH": "Афины", "AUH": "Абу-Даби", "AYT": "Анталья",
|
||
"AZN": "Андижан", "BAX": "Барнаул", "BCN": "Барселона", "BEG": "Белград",
|
||
"BER": "Берлин", "BEY": "Бейрут", "BJV": "Бодрум", "BKK": "Бангкок",
|
||
"BNE": "Брисбен", "BOG": "Богота", "BOM": "Мумбаи", "BQS": "Благовещенск",
|
||
"BRU": "Брюссель", "BTK": "Братск", "BUS": "Батуми", "BXK": "Бухара",
|
||
"BZK": "Брянск", "CAI": "Каир", "CCJ": "Кожикоде", "CEK": "Челябинск",
|
||
"CGK": "Джакарта", "CGN": "Кёльн", "CMN": "Касабланка", "CMB": "Коломбо",
|
||
"CSY": "Чебоксары", "CXI": "Кашгар", "CZM": "Косумель", "DEL": "Дели",
|
||
"DME": "Домодедово", "DMJ": "Домодедово", "DOH": "Доха", "DSS": "Дакар",
|
||
"DXB": "Дубай", "EGO": "Белгород", "EKB": "Екатеринбург", "EVN": "Ереван",
|
||
"FRA": "Франкфурт", "FRU": "Бишкек", "GDZ": "Геленджик", "GDX": "Магадан",
|
||
"GIF": "Гифу", "GOI": "Гоа", "GOJ": "Нижний Новгород", "GRV": "Грозный",
|
||
"GRZ": "Грац", "GYD": "Баку", "HAD": "Ханчжоу", "HAM": "Гамбург",
|
||
"HEL": "Хельсинки", "HMA": "Ханты-Мансийск", "HRB": "Харбин",
|
||
"HRG": "Хургада", "HKT": "Пхукет", "HYD": "Хайдарабад", "IKT": "Иркутск",
|
||
"IKA": "Тегеран", "IST": "Стамбул", "IXC": "Чандигарх", "JED": "Джидда",
|
||
"JFK": "Нью-Йорк", "KGD": "Калининград", "KGV": "Когалым",
|
||
"KHV": "Хабаровск", "KJA": "Красноярск", "KLO": "Калининград",
|
||
"KRR": "Краснодар", "KUF": "Самара", "KUL": "Куала-Лумпур",
|
||
"KZN": "Казань", "LAX": "Лос-Анджелес", "LED": "Санкт-Петербург",
|
||
"LHR": "Лондон", "LPK": "Липецк", "MIA": "Майами", "MLE": "Мале",
|
||
"MMK": "Мурманск", "MRV": "Минеральные Воды", "MSQ": "Минск",
|
||
"MUC": "Мюнхен", "NBO": "Найроби", "NJC": "Нижневартовск",
|
||
"NNM": "Нарьян-Мар", "NQZ": "Астана", "NSK": "Норильск", "NVR": "Набережные Челны",
|
||
"NVT": "Новороссийск", "OVB": "Новосибирск", "OMS": "Омск",
|
||
"OSL": "Осло", "OSS": "Ош", "OZH": "Ургенч", "PEE": "Пермь",
|
||
"PEK": "Пекин", "PKC": "Петропавловск-Камчатский", "PRG": "Прага",
|
||
"PSA": "Пиза", "PUY": "Пула", "PUS": "Пусан", "REN": "Оренбург",
|
||
"RGK": "Горно-Алтайск", "RIX": "Рига", "ROV": "Ростов-на-Дону",
|
||
"RTW": "Саратов", "SAW": "Стамбул (Сабиха)", "SGB": "Балаково",
|
||
"SGC": "Сургут", "SIP": "Симферополь", "SKG": "Салоники", "SLY": "Салехард",
|
||
"SIN": "Сингапур", "SJJ": "Сараево", "SKZ": "Сыктывкар",
|
||
"SVO": "Шереметьево", "SVX": "Екатеринбург", "SWT": "Стрежевой",
|
||
"SYD": "Сидней", "TAS": "Ташкент", "TBS": "Тбилиси", "TJM": "Тюмень",
|
||
"TKM": "Туркменбаши", "TLL": "Таллин", "TLS": "Тулуза", "TSE": "Астана",
|
||
"TSQ": "Тамбов", "TXL": "Берлин", "ULV": "Ульяновск", "UFA": "Уфа",
|
||
"UJD": "Усть-Каменогорск", "VOG": "Волгоград", "VOZ": "Воронеж",
|
||
"VNO": "Вильнюс", "VVO": "Владивосток", "VKO": "Внуково", "WAW": "Варшава",
|
||
"WLG": "Веллингтон", "XRY": "Херес", "YKS": "Якутск", "YYZ": "Торонто",
|
||
"ZIA": "Жуковский", "ZRH": "Цюрих", "ZYR": "Зырянка",
|
||
}
|
||
|
||
def _city(iata: str) -> str:
|
||
"""IATA code → human-readable city name."""
|
||
return IATA_CITY.get(iata.strip() if iata else "", iata or "")
|
||
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format="%(asctime)s [api] %(levelname)s %(message)s",
|
||
datefmt="%Y-%m-%dT%H:%M:%S",
|
||
)
|
||
log = logging.getLogger("api")
|
||
|
||
app = Flask(__name__, static_folder="/app/static", static_url_path="/static")
|
||
|
||
DB_DSN = (
|
||
f"host={os.environ['POSTGRES_HOST']} "
|
||
f"port={os.environ.get('POSTGRES_PORT', 5432)} "
|
||
f"dbname={os.environ['POSTGRES_DB']} "
|
||
f"user={os.environ['POSTGRES_USER']} "
|
||
f"password={os.environ['POSTGRES_PASSWORD']}"
|
||
)
|
||
API_PORT = int(os.environ.get("API_PORT", 8080))
|
||
HEALTHCHECK_FILE = "/tmp/api-ready"
|
||
START_TIME = datetime.now(timezone.utc)
|
||
|
||
# ── db connection (simple persistent conn with reconnect) ─────────────────────
|
||
|
||
_conn = None
|
||
|
||
def get_conn():
|
||
global _conn
|
||
if _conn is None or _conn.closed:
|
||
_conn = psycopg2.connect(DB_DSN)
|
||
psycopg2.extras.register_uuid(_conn)
|
||
log.info("DB connection established")
|
||
return _conn
|
||
|
||
|
||
def query(sql: str, params=None) -> list:
|
||
for attempt in range(2):
|
||
try:
|
||
conn = get_conn()
|
||
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
||
cur.execute(sql, params)
|
||
return [dict(r) for r in cur.fetchall()]
|
||
except psycopg2.OperationalError:
|
||
global _conn
|
||
_conn = None
|
||
if attempt == 1:
|
||
raise
|
||
|
||
|
||
def query_one(sql: str, params=None) -> dict | None:
|
||
rows = query(sql, params)
|
||
return rows[0] if rows else None
|
||
|
||
|
||
# ── serialisation helper ──────────────────────────────────────────────────────
|
||
|
||
def serial(obj):
|
||
"""Make psycopg2 types JSON-serialisable."""
|
||
import decimal, uuid
|
||
if isinstance(obj, (datetime,)):
|
||
return obj.isoformat()
|
||
if isinstance(obj, decimal.Decimal):
|
||
return float(obj)
|
||
if isinstance(obj, uuid.UUID):
|
||
return str(obj)
|
||
raise TypeError(f"Not serialisable: {type(obj)}")
|
||
|
||
|
||
def ok(data, **kwargs):
|
||
return app.response_class(
|
||
__import__("json").dumps(data, default=serial),
|
||
mimetype="application/json",
|
||
**kwargs,
|
||
)
|
||
|
||
|
||
def err(msg: str, status: int = 500):
|
||
return ok({"error": msg}, status=status)
|
||
|
||
|
||
# ── routes ────────────────────────────────────────────────────────────────────
|
||
|
||
@app.get("/")
|
||
def index():
|
||
return send_from_directory("/app/static", "index.html")
|
||
|
||
|
||
@app.get("/health")
|
||
def health():
|
||
try:
|
||
query_one("SELECT 1")
|
||
db_ok = True
|
||
except Exception:
|
||
db_ok = False
|
||
return ok({
|
||
"status": "ok" if db_ok else "degraded",
|
||
"db": "ok" if db_ok else "error",
|
||
"uptime_seconds": int((datetime.now(timezone.utc) - START_TIME).total_seconds()),
|
||
}, status=200 if db_ok else 503)
|
||
|
||
|
||
@app.get("/dashboard/status")
|
||
def dashboard_status():
|
||
try:
|
||
captures = query_one("SELECT COUNT(*) AS total, COUNT(*) FILTER (WHERE status='active') AS active FROM fr24.captures")
|
||
packets = query_one("SELECT COUNT(*) AS total FROM fr24.raw_packets")
|
||
state = query_one("SELECT state_value FROM fr24.processing_state WHERE state_key='preprocess_cursor'")
|
||
aircraft = query_one("SELECT COUNT(*) AS total FROM fr24.aircraft")
|
||
flights = query_one("SELECT COUNT(*) AS total, COUNT(*) FILTER (WHERE status='active') AS active FROM fr24.flights")
|
||
return ok({
|
||
"captures": captures,
|
||
"raw_packets": packets,
|
||
"processing_state": state["state_value"] if state else None,
|
||
"aircraft": aircraft,
|
||
"flights": flights,
|
||
})
|
||
except Exception as e:
|
||
return err(str(e))
|
||
|
||
|
||
@app.get("/viewer/config")
|
||
def viewer_config():
|
||
return ok({
|
||
"system": "fr24-ingest",
|
||
"version": "0.1.0",
|
||
"stage": "step-3-real-tracks",
|
||
"db_schema": "fr24",
|
||
"center": {"lat": 55.75, "lon": 37.62},
|
||
"zoom": 7,
|
||
"features": {
|
||
"adsb_decode": True,
|
||
"real_rtlsdr": True,
|
||
"noise_model": False,
|
||
},
|
||
})
|
||
|
||
|
||
@app.get("/captures")
|
||
def captures():
|
||
try:
|
||
limit = min(int(request.args.get("limit", 50)), 200)
|
||
rows = query(
|
||
"""
|
||
SELECT capture_id, started_at, ended_at, source, device_index,
|
||
center_frequency_hz, sample_rate_hz, gain_db, status, notes, created_at
|
||
FROM fr24.captures
|
||
ORDER BY started_at DESC
|
||
LIMIT %s
|
||
""",
|
||
(limit,),
|
||
)
|
||
return ok({"captures": rows, "count": len(rows)})
|
||
except Exception as e:
|
||
return err(str(e))
|
||
|
||
|
||
@app.get("/aircraft")
|
||
def aircraft_list():
|
||
try:
|
||
limit = min(int(request.args.get("limit", 100)), 500)
|
||
rows = query(
|
||
"""
|
||
SELECT aircraft_id, icao24, callsign, registration, aircraft_type,
|
||
operator_name, first_seen_at, last_seen_at
|
||
FROM fr24.aircraft
|
||
ORDER BY last_seen_at DESC NULLS LAST
|
||
LIMIT %s
|
||
""",
|
||
(limit,),
|
||
)
|
||
return ok({"aircraft": rows, "count": len(rows)})
|
||
except Exception as e:
|
||
return err(str(e))
|
||
|
||
|
||
@app.get("/flights")
|
||
def flights():
|
||
try:
|
||
limit = min(int(request.args.get("limit", 100)), 500)
|
||
status_filter = request.args.get("status")
|
||
if status_filter:
|
||
rows = query(
|
||
"""
|
||
SELECT f.flight_id, f.aircraft_id, a.icao24, f.callsign,
|
||
f.departure_airport, f.arrival_airport,
|
||
f.started_at, f.ended_at, f.status, f.source
|
||
FROM fr24.flights f
|
||
JOIN fr24.aircraft a USING (aircraft_id)
|
||
WHERE f.status = %s
|
||
ORDER BY f.started_at DESC
|
||
LIMIT %s
|
||
""",
|
||
(status_filter, limit),
|
||
)
|
||
else:
|
||
rows = query(
|
||
"""
|
||
SELECT f.flight_id, f.aircraft_id, a.icao24, f.callsign,
|
||
f.departure_airport, f.arrival_airport,
|
||
f.started_at, f.ended_at, f.status, f.source
|
||
FROM fr24.flights f
|
||
JOIN fr24.aircraft a USING (aircraft_id)
|
||
ORDER BY f.started_at DESC
|
||
LIMIT %s
|
||
""",
|
||
(limit,),
|
||
)
|
||
return ok({"flights": rows, "count": len(rows)})
|
||
except Exception as e:
|
||
return err(str(e))
|
||
|
||
|
||
# ── map / live endpoints ──────────────────────────────────────────────────────
|
||
|
||
@app.get("/api/aircraft/live")
|
||
def aircraft_live():
|
||
"""Active aircraft with their latest position as GeoJSON FeatureCollection."""
|
||
try:
|
||
minutes = int(request.args.get("minutes", 60))
|
||
rows = query(
|
||
"""
|
||
SELECT
|
||
a.icao24,
|
||
a.callsign,
|
||
a.registration,
|
||
a.aircraft_type,
|
||
tp.observed_at,
|
||
ST_X(tp.geom) AS lon,
|
||
ST_Y(tp.geom) AS lat,
|
||
tp.altitude_m,
|
||
tp.ground_speed_kt,
|
||
tp.heading_deg,
|
||
tp.vertical_rate_fpm
|
||
FROM fr24.aircraft a
|
||
JOIN fr24.flights f ON f.aircraft_id = a.aircraft_id
|
||
JOIN LATERAL (
|
||
SELECT tp2.geom, tp2.observed_at, tp2.altitude_m,
|
||
tp2.ground_speed_kt, tp2.heading_deg, tp2.vertical_rate_fpm
|
||
FROM fr24.track_points tp2
|
||
WHERE tp2.flight_id = f.flight_id
|
||
ORDER BY tp2.observed_at DESC
|
||
LIMIT 1
|
||
) tp ON true
|
||
WHERE f.status = 'active'
|
||
AND tp.observed_at >= now() - (%s || ' minutes')::interval
|
||
ORDER BY tp.observed_at DESC
|
||
""",
|
||
(minutes,),
|
||
)
|
||
features = []
|
||
for r in rows:
|
||
if r["lon"] is None or r["lat"] is None:
|
||
continue
|
||
features.append({
|
||
"type": "Feature",
|
||
"geometry": {"type": "Point", "coordinates": [r["lon"], r["lat"]]},
|
||
"properties": {
|
||
"icao24": r["icao24"],
|
||
"callsign": r["callsign"] or r["icao24"],
|
||
"registration": r["registration"],
|
||
"aircraft_type": r["aircraft_type"],
|
||
"altitude_m": r["altitude_m"],
|
||
"ground_speed_kt": r["ground_speed_kt"],
|
||
"heading_deg": r["heading_deg"],
|
||
"vertical_rate_fpm": r["vertical_rate_fpm"],
|
||
"observed_at": r["observed_at"].isoformat() if r["observed_at"] else None,
|
||
},
|
||
})
|
||
return ok({"type": "FeatureCollection", "features": features, "count": len(features)})
|
||
except Exception as e:
|
||
return err(str(e))
|
||
|
||
|
||
@app.get("/api/aircraft/<icao24>")
|
||
def aircraft_detail(icao24: str):
|
||
"""Details for a single aircraft including recent track points."""
|
||
try:
|
||
ac = query_one(
|
||
"SELECT * FROM fr24.aircraft WHERE icao24 = %s",
|
||
(icao24.lower(),),
|
||
)
|
||
if not ac:
|
||
return err("not found", 404)
|
||
|
||
flights_rows = query(
|
||
"""
|
||
SELECT flight_id, callsign, departure_airport, arrival_airport,
|
||
started_at, ended_at, status
|
||
FROM fr24.flights
|
||
WHERE aircraft_id = %s
|
||
ORDER BY started_at DESC
|
||
LIMIT 10
|
||
""",
|
||
(ac["aircraft_id"],),
|
||
)
|
||
|
||
# last 100 track points across all recent flights
|
||
points = query(
|
||
"""
|
||
SELECT tp.observed_at,
|
||
ST_X(tp.geom) AS lon, ST_Y(tp.geom) AS lat,
|
||
tp.altitude_m, tp.ground_speed_kt, tp.heading_deg
|
||
FROM fr24.track_points tp
|
||
JOIN fr24.flights f ON f.flight_id = tp.flight_id
|
||
WHERE f.aircraft_id = %s
|
||
ORDER BY tp.observed_at DESC
|
||
LIMIT 100
|
||
""",
|
||
(ac["aircraft_id"],),
|
||
)
|
||
|
||
return ok({
|
||
"aircraft": ac,
|
||
"flights": flights_rows,
|
||
"recent_points": points,
|
||
})
|
||
except Exception as e:
|
||
return err(str(e))
|
||
|
||
|
||
@app.get("/monitoring")
|
||
def monitoring_page():
|
||
return send_from_directory("/app/static", "monitoring.html")
|
||
|
||
|
||
@app.get("/schedule")
|
||
def schedule_page():
|
||
return send_from_directory("/app/static", "schedule.html")
|
||
|
||
|
||
# ── schedule API ──────────────────────────────────────────────────────────────
|
||
|
||
def _schedule_where(args):
|
||
"""Build WHERE clause + params list from request args."""
|
||
clauses = []
|
||
params = []
|
||
|
||
date_from = args.get("date_from")
|
||
date_to = args.get("date_to")
|
||
airport = args.get("airport", "all")
|
||
direction = args.get("direction", "all")
|
||
flight_number = args.get("flight_number", "").strip()
|
||
time_from = args.get("time_from", "").strip()
|
||
time_to = args.get("time_to", "").strip()
|
||
|
||
if date_from:
|
||
clauses.append("flight_date >= %s")
|
||
params.append(date_from)
|
||
if date_to:
|
||
clauses.append("flight_date <= %s")
|
||
params.append(date_to)
|
||
if airport and airport != "all":
|
||
clauses.append("airport_iata = %s")
|
||
params.append(airport)
|
||
if direction and direction != "all":
|
||
clauses.append("direction = %s")
|
||
params.append(direction)
|
||
if flight_number:
|
||
clauses.append("flight_number ILIKE %s")
|
||
params.append(f"%{flight_number}%")
|
||
if time_from:
|
||
try:
|
||
h, m = map(int, time_from.split(":"))
|
||
clauses.append(
|
||
"(EXTRACT(HOUR FROM scheduled_at AT TIME ZONE 'UTC') * 60 "
|
||
"+ EXTRACT(MINUTE FROM scheduled_at AT TIME ZONE 'UTC')) >= %s"
|
||
)
|
||
params.append(h * 60 + m)
|
||
except ValueError:
|
||
pass
|
||
if time_to:
|
||
try:
|
||
h, m = map(int, time_to.split(":"))
|
||
clauses.append(
|
||
"(EXTRACT(HOUR FROM scheduled_at AT TIME ZONE 'UTC') * 60 "
|
||
"+ EXTRACT(MINUTE FROM scheduled_at AT TIME ZONE 'UTC')) <= %s"
|
||
)
|
||
params.append(h * 60 + m)
|
||
except ValueError:
|
||
pass
|
||
|
||
where = " AND ".join(clauses) if clauses else "1=1"
|
||
return where, params
|
||
|
||
|
||
@app.get("/api/schedule/data")
|
||
def schedule_data():
|
||
try:
|
||
limit = min(int(request.args.get("limit", 100)), 1000)
|
||
offset = max(int(request.args.get("offset", 0)), 0)
|
||
|
||
where, params = _schedule_where(request.args)
|
||
|
||
total_row = query_one(
|
||
f"SELECT COUNT(*) AS cnt FROM fr24_ext.schedule WHERE {where}",
|
||
params,
|
||
)
|
||
total = total_row["cnt"] if total_row else 0
|
||
|
||
rows = query(
|
||
f"""
|
||
SELECT
|
||
flight_number, airline_name, airport_iata, direction,
|
||
origin_iata, destination_iata,
|
||
scheduled_at, actual_at, status, icao24,
|
||
flight_date, duration_min, thread_title,
|
||
actual_takeoff, actual_landed,
|
||
delay_takeoff_min, delay_landed_min,
|
||
fr24_id, flight_category
|
||
FROM fr24_ext.schedule
|
||
WHERE {where}
|
||
ORDER BY scheduled_at DESC
|
||
LIMIT %s OFFSET %s
|
||
""",
|
||
params + [limit, offset],
|
||
)
|
||
|
||
flights = []
|
||
for r in rows:
|
||
sched = r["scheduled_at"]
|
||
actual = r["actual_at"]
|
||
delay_min = (
|
||
int((actual - sched).total_seconds() / 60)
|
||
if actual and sched else None
|
||
)
|
||
# Actual times from FR24 flight-summary/full enrichment
|
||
actual_takeoff = r.get("actual_takeoff")
|
||
actual_landed = r.get("actual_landed")
|
||
delay_takeoff = r.get("delay_takeoff_min")
|
||
delay_landed = r.get("delay_landed_min")
|
||
fr24_id = r.get("fr24_id")
|
||
flight_cat = r.get("flight_category")
|
||
|
||
flights.append({
|
||
"flight_number": r["flight_number"],
|
||
"airline": r["airline_name"],
|
||
"airport": r["airport_iata"],
|
||
"direction": r["direction"],
|
||
"origin": _city(r["origin_iata"]) or r["origin_iata"] or "",
|
||
"destination": _city(r["destination_iata"]) or r["destination_iata"] or "",
|
||
"origin_iata": r["origin_iata"] or "",
|
||
"destination_iata": r["destination_iata"] or "",
|
||
"thread_title": r["thread_title"] or "",
|
||
"scheduled_at": sched.isoformat() if sched else None,
|
||
"actual_at": actual.isoformat() if actual else None,
|
||
"delay_min": delay_min,
|
||
"duration_min": r["duration_min"],
|
||
"status": r["status"],
|
||
"icao24": r["icao24"],
|
||
# New fields from FR24 enrichment
|
||
"actual_takeoff": actual_takeoff.isoformat() if hasattr(actual_takeoff, 'isoformat') else None,
|
||
"actual_landed": actual_landed.isoformat() if hasattr(actual_landed, 'isoformat') else None,
|
||
"delay_takeoff_min": delay_takeoff,
|
||
"delay_landed_min": delay_landed,
|
||
"fr24_id": fr24_id,
|
||
"flight_category": flight_cat,
|
||
})
|
||
|
||
return ok({"total": total, "flights": flights})
|
||
except Exception as e:
|
||
log.exception("schedule_data error")
|
||
return err(str(e))
|
||
|
||
|
||
@app.get("/api/schedule/export")
|
||
def schedule_export():
|
||
try:
|
||
where, params = _schedule_where(request.args)
|
||
|
||
rows = query(
|
||
f"""
|
||
SELECT
|
||
flight_date, flight_number, airline_name, airport_iata, direction,
|
||
origin_iata, destination_iata,
|
||
scheduled_at, actual_at, status, icao24, duration_min,
|
||
actual_takeoff, actual_landed, delay_takeoff_min, delay_landed_min,
|
||
fr24_id, flight_category
|
||
FROM fr24_ext.schedule
|
||
WHERE {where}
|
||
ORDER BY scheduled_at DESC
|
||
LIMIT 100000
|
||
""",
|
||
params,
|
||
)
|
||
|
||
buf = io.StringIO()
|
||
writer = csv.writer(buf)
|
||
writer.writerow([
|
||
"Date", "Flight", "Airline", "Airport", "Direction",
|
||
"Origin", "Destination", "Scheduled", "Actual",
|
||
"Delay (min)", "Duration (min)", "Status", "ICAO24",
|
||
"Actual Takeoff", "Actual Landed", "Delay Takeoff (min)", "Delay Landed (min)",
|
||
"FR24 ID", "Category",
|
||
])
|
||
for r in rows:
|
||
sched = r["scheduled_at"]
|
||
actual = r["actual_at"]
|
||
delay = (
|
||
int((actual - sched).total_seconds() / 60)
|
||
if actual and sched else ""
|
||
)
|
||
writer.writerow([
|
||
str(r["flight_date"]),
|
||
r["flight_number"],
|
||
r["airline_name"] or "",
|
||
r["airport_iata"],
|
||
r["direction"],
|
||
_city(r["origin_iata"]) or r["origin_iata"] or "",
|
||
_city(r["destination_iata"]) or r["destination_iata"] or "",
|
||
sched.isoformat() if sched else "",
|
||
actual.isoformat() if actual else "",
|
||
delay,
|
||
r["duration_min"] or "",
|
||
r["status"] or "",
|
||
r["icao24"] or "",
|
||
r["actual_takeoff"].isoformat() if hasattr(r.get("actual_takeoff"), 'isoformat') else "",
|
||
r["actual_landed"].isoformat() if hasattr(r.get("actual_landed"), 'isoformat') else "",
|
||
r["delay_takeoff_min"] if r.get("delay_takeoff_min") is not None else "",
|
||
r["delay_landed_min"] if r.get("delay_landed_min") is not None else "",
|
||
r["fr24_id"] or "",
|
||
r["flight_category"] or "",
|
||
])
|
||
|
||
buf.seek(0)
|
||
return Response(
|
||
buf.getvalue(),
|
||
mimetype="text/csv; charset=utf-8",
|
||
headers={"Content-Disposition": "attachment; filename=schedule.csv"},
|
||
)
|
||
except Exception as e:
|
||
log.exception("schedule_export error")
|
||
return err(str(e))
|
||
|
||
|
||
@app.get("/api/monitoring/status")
|
||
def monitoring_status():
|
||
"""Return latest monitoring metrics + last 20 rows history."""
|
||
try:
|
||
latest = query_one(
|
||
"""
|
||
SELECT id, collected_at, disk_pct, db_size_mb, capture_lag_sec, throughput_5min
|
||
FROM fr24.monitoring_metrics
|
||
ORDER BY id DESC
|
||
LIMIT 1
|
||
"""
|
||
)
|
||
history = query(
|
||
"""
|
||
SELECT id, collected_at, disk_pct, db_size_mb, capture_lag_sec, throughput_5min
|
||
FROM fr24.monitoring_metrics
|
||
ORDER BY id DESC
|
||
LIMIT 20
|
||
"""
|
||
)
|
||
# live unprocessed count
|
||
unprocessed = query_one(
|
||
"""
|
||
SELECT
|
||
COALESCE((state_value->>'last_raw_packet_id')::bigint, 0) AS cursor_id,
|
||
(SELECT COUNT(*) FROM fr24.raw_packets) AS total
|
||
FROM fr24.processing_state
|
||
WHERE state_key = 'preprocess_cursor'
|
||
"""
|
||
)
|
||
if unprocessed:
|
||
cursor_id = unprocessed["cursor_id"] or 0
|
||
total = unprocessed["total"] or 0
|
||
pending_rows = query_one(
|
||
"SELECT COUNT(*) as cnt FROM fr24.raw_packets WHERE raw_packet_id > %s",
|
||
(cursor_id,)
|
||
)
|
||
pending = pending_rows["cnt"] if pending_rows else 0
|
||
pending_pct = round(pending / total * 100, 1) if total else 0
|
||
unprocessed_info = {"pending": pending, "pending_pct": pending_pct, "total": total}
|
||
else:
|
||
unprocessed_info = None
|
||
return ok({"latest": latest, "history": history, "unprocessed": unprocessed_info})
|
||
except Exception as e:
|
||
return err(str(e))
|
||
|
||
|
||
@app.get("/api/tracks")
|
||
def tracks():
|
||
"""Track points as GeoJSON LineStrings, filtered by time and optional bbox."""
|
||
try:
|
||
minutes = int(request.args.get("minutes", 30))
|
||
limit = min(int(request.args.get("limit", 50)), 200)
|
||
|
||
# optional bbox: ?bbox=minlon,minlat,maxlon,maxlat
|
||
bbox = request.args.get("bbox")
|
||
bbox_clause = ""
|
||
bbox_params: list = []
|
||
if bbox:
|
||
try:
|
||
minlon, minlat, maxlon, maxlat = map(float, bbox.split(","))
|
||
bbox_clause = (
|
||
"AND tp.geom && ST_MakeEnvelope(%s, %s, %s, %s, 4326)"
|
||
)
|
||
bbox_params = [minlon, minlat, maxlon, maxlat]
|
||
except ValueError:
|
||
pass
|
||
|
||
rows = query(
|
||
f"""
|
||
SELECT
|
||
t.track_id,
|
||
f.flight_id,
|
||
a.icao24,
|
||
f.callsign,
|
||
json_agg(
|
||
json_build_object(
|
||
'lon', ST_X(tp.geom),
|
||
'lat', ST_Y(tp.geom),
|
||
'alt', tp.altitude_m,
|
||
'spd', tp.ground_speed_kt,
|
||
'hdg', tp.heading_deg,
|
||
'ts', tp.observed_at
|
||
)
|
||
ORDER BY tp.point_order
|
||
) AS points
|
||
FROM fr24.tracks t
|
||
JOIN fr24.flights f ON f.flight_id = t.flight_id
|
||
JOIN fr24.aircraft a ON a.aircraft_id = f.aircraft_id
|
||
JOIN fr24.track_points tp ON tp.track_id = t.track_id
|
||
WHERE tp.observed_at >= now() - (%s || ' minutes')::interval
|
||
{bbox_clause}
|
||
GROUP BY t.track_id, f.flight_id, a.icao24, f.callsign
|
||
ORDER BY t.last_point_at DESC NULLS LAST
|
||
LIMIT %s
|
||
""",
|
||
(minutes, *bbox_params, limit),
|
||
)
|
||
|
||
features = []
|
||
for r in rows:
|
||
pts = r["points"] if isinstance(r["points"], list) else []
|
||
coords = [[p["lon"], p["lat"]] for p in pts if p.get("lon") is not None]
|
||
times = [p["ts"].isoformat() if hasattr(p.get("ts"), "isoformat") else str(p["ts"]) if p.get("ts") else None for p in pts if p.get("lon") is not None]
|
||
if len(coords) < 2:
|
||
continue
|
||
features.append({
|
||
"type": "Feature",
|
||
"geometry": {"type": "LineString", "coordinates": coords},
|
||
"properties": {
|
||
"track_id": r["track_id"],
|
||
"flight_id": r["flight_id"],
|
||
"icao24": r["icao24"],
|
||
"callsign": r["callsign"] or r["icao24"],
|
||
"point_count": len(coords),
|
||
"times": times,
|
||
},
|
||
})
|
||
|
||
return ok({"type": "FeatureCollection", "features": features, "count": len(features)})
|
||
except Exception as e:
|
||
return err(str(e))
|
||
|
||
|
||
# ── data-sources page & API ─────────────────────────────────────────────────
|
||
|
||
@app.get("/data-sources")
|
||
def data_sources_page():
|
||
return send_from_directory("/app/static", "data_sources.html")
|
||
|
||
|
||
def _ds_where(args, date_col="coverage_date"):
|
||
clauses, params = [], []
|
||
if args.get("date_from"):
|
||
clauses.append(f"{date_col} >= %s"); params.append(args["date_from"])
|
||
if args.get("date_to"):
|
||
clauses.append(f"{date_col} <= %s"); params.append(args["date_to"])
|
||
return (" AND ".join(clauses) if clauses else "1=1"), params
|
||
|
||
|
||
@app.get("/api/data-sources/coverage")
|
||
def ds_coverage():
|
||
try:
|
||
where, params = _ds_where(request.args)
|
||
rows = query(
|
||
f"""
|
||
SELECT
|
||
coverage_date,
|
||
total_schedule,
|
||
with_rtlsdr, with_fr24, with_fa, schedule_only,
|
||
rtlsdr_pct, fr24_pct, fa_pct
|
||
FROM fr24_mart.source_coverage
|
||
WHERE {where}
|
||
ORDER BY coverage_date
|
||
""",
|
||
params,
|
||
)
|
||
totals = query_one(
|
||
f"""
|
||
SELECT
|
||
SUM(total_schedule) AS total_schedule,
|
||
SUM(with_rtlsdr) AS with_rtlsdr,
|
||
SUM(with_fr24) AS with_fr24,
|
||
SUM(with_fa) AS with_fa,
|
||
COUNT(*) AS days
|
||
FROM fr24_mart.source_coverage
|
||
WHERE {where}
|
||
""",
|
||
params,
|
||
)
|
||
return ok({"rows": rows, "totals": totals or {}})
|
||
except Exception as e:
|
||
return err(str(e))
|
||
|
||
|
||
@app.get("/api/data-sources/quality")
|
||
def ds_quality():
|
||
try:
|
||
where, params = _ds_where(request.args, date_col="flight_date")
|
||
q = query_one(
|
||
f"""
|
||
SELECT
|
||
COUNT(*) AS total,
|
||
COUNT(*) FILTER (WHERE origin_iata IS NOT NULL
|
||
AND destination_iata IS NOT NULL) AS with_route,
|
||
COUNT(*) FILTER (WHERE track_source IS NOT NULL) AS with_track,
|
||
COUNT(*) FILTER (WHERE actual_dep IS NOT NULL
|
||
OR actual_arr IS NOT NULL) AS with_actual_time,
|
||
COUNT(*) FILTER (WHERE aircraft_type IS NOT NULL) AS with_aircraft_type
|
||
FROM fr24_mart.flights
|
||
WHERE {where}
|
||
""",
|
||
params,
|
||
)
|
||
# median points per source
|
||
def median_pts(source):
|
||
r = query_one(
|
||
f"""
|
||
SELECT PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY track_points) AS med
|
||
FROM fr24_mart.flights
|
||
WHERE {where} AND track_source = %s AND track_points > 0
|
||
""",
|
||
params + [source],
|
||
)
|
||
v = r["med"] if r else None
|
||
return int(v) if v is not None else None
|
||
|
||
quality = dict(q or {})
|
||
quality["median_points_rtlsdr"] = median_pts("rtlsdr")
|
||
quality["median_points_fr24"] = median_pts("fr24")
|
||
quality["median_points_fa"] = median_pts("fa")
|
||
return ok({"quality": quality})
|
||
except Exception as e:
|
||
return err(str(e))
|
||
|
||
|
||
@app.get("/api/data-sources/top-airlines")
|
||
def ds_top_airlines():
|
||
try:
|
||
where, params = _ds_where(request.args, date_col="flight_date")
|
||
rows = query(
|
||
f"""
|
||
SELECT airline_iata, COUNT(*) AS flight_count
|
||
FROM fr24_mart.flights
|
||
WHERE {where} AND airline_iata IS NOT NULL
|
||
GROUP BY airline_iata
|
||
ORDER BY flight_count DESC
|
||
LIMIT 20
|
||
""",
|
||
params,
|
||
)
|
||
return ok({"rows": rows})
|
||
except Exception as e:
|
||
return err(str(e))
|
||
|
||
|
||
@app.get("/api/data-sources/top-routes")
|
||
def ds_top_routes():
|
||
try:
|
||
where, params = _ds_where(request.args, date_col="flight_date")
|
||
rows = query(
|
||
f"""
|
||
SELECT origin_iata, destination_iata, COUNT(*) AS flight_count
|
||
FROM fr24_mart.flights
|
||
WHERE {where}
|
||
AND origin_iata IS NOT NULL
|
||
AND destination_iata IS NOT NULL
|
||
GROUP BY origin_iata, destination_iata
|
||
ORDER BY flight_count DESC
|
||
LIMIT 20
|
||
""",
|
||
params,
|
||
)
|
||
return ok({"rows": rows})
|
||
except Exception as e:
|
||
return err(str(e))
|
||
|
||
|
||
@app.get("/api/data-sources/airport-load")
|
||
def ds_airport_load():
|
||
try:
|
||
where, params = _ds_where(request.args, date_col="flight_date")
|
||
rows = query(
|
||
f"""
|
||
SELECT
|
||
airport_iata,
|
||
EXTRACT(HOUR FROM scheduled_at AT TIME ZONE 'UTC')::int AS hour,
|
||
COUNT(*) AS flight_count
|
||
FROM fr24_ext.schedule
|
||
WHERE {where}
|
||
AND airport_iata IN ('SVO','DME','VKO','ZIA')
|
||
GROUP BY airport_iata, hour
|
||
ORDER BY airport_iata, hour
|
||
""",
|
||
params,
|
||
)
|
||
return ok({"rows": rows})
|
||
except Exception as e:
|
||
return err(str(e))
|
||
|
||
|
||
# ── startup ───────────────────────────────────────────────────────────────────
|
||
|
||
def wait_for_db(max_attempts: int = 30):
|
||
for attempt in range(1, max_attempts + 1):
|
||
try:
|
||
get_conn()
|
||
return
|
||
except psycopg2.OperationalError as e:
|
||
log.warning("DB not ready (%d/%d): %s", attempt, max_attempts, e)
|
||
time.sleep(2)
|
||
log.error("Could not connect to DB")
|
||
raise SystemExit(1)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
wait_for_db()
|
||
open(HEALTHCHECK_FILE, "w").close()
|
||
log.info("Healthcheck file written: %s", HEALTHCHECK_FILE)
|
||
log.info("Starting API on port %d", API_PORT)
|
||
app.run(host="0.0.0.0", port=API_PORT, debug=False)
|