Files
2026-04-25 13:00:01 +03:00

992 lines
38 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
FR24 API Service
Minimal Flask API reading from PostgreSQL fr24 schema.
"""
import csv
import io
import os
import time
import logging
from datetime import datetime, timezone
from functools import wraps
import psycopg2
import psycopg2.extras
from flask import Flask, jsonify, request, send_from_directory, Response
# ── IATA → city mapping ───────────────────────────────────────────────────────
# Three-letter airport code → Russian display name. _city() looks codes up
# here and falls back to the raw code when a key is missing.
IATA_CITY = {
    "AAQ": "Анапа", "ABA": "Абакан", "ADB": "Измир", "ADD": "Аддис-Абеба",
    "AER": "Сочи", "AKX": "Актобе", "ALA": "Алматы", "AMM": "Амман",
    "AMS": "Амстердам", "ARH": "Архангельск", "ARN": "Стокгольм", "ASB": "Ашхабад",
    "ASF": "Астрахань", "ATH": "Афины", "AUH": "Абу-Даби", "AYT": "Анталья",
    "AZN": "Андижан", "BAX": "Барнаул", "BCN": "Барселона", "BEG": "Белград",
    "BER": "Берлин", "BEY": "Бейрут", "BJV": "Бодрум", "BKK": "Бангкок",
    "BNE": "Брисбен", "BOG": "Богота", "BOM": "Мумбаи", "BQS": "Благовещенск",
    "BRU": "Брюссель", "BTK": "Братск", "BUS": "Батуми", "BXK": "Бухара",
    "BZK": "Брянск", "CAI": "Каир", "CCJ": "Кожикоде", "CEK": "Челябинск",
    "CGK": "Джакарта", "CGN": "Кёльн", "CMN": "Касабланка", "CMB": "Коломбо",
    "CSY": "Чебоксары", "CXI": "Кашгар", "CZM": "Косумель", "DEL": "Дели",
    "DME": "Домодедово", "DMJ": "Домодедово", "DOH": "Доха", "DSS": "Дакар",
    "DXB": "Дубай", "EGO": "Белгород", "EKB": "Екатеринбург", "EVN": "Ереван",
    "FRA": "Франкфурт", "FRU": "Бишкек", "GDZ": "Геленджик", "GDX": "Магадан",
    "GIF": "Гифу", "GOI": "Гоа", "GOJ": "Нижний Новгород", "GRV": "Грозный",
    "GRZ": "Грац", "GYD": "Баку", "HAD": "Ханчжоу", "HAM": "Гамбург",
    "HEL": "Хельсинки", "HMA": "Ханты-Мансийск", "HRB": "Харбин",
    "HRG": "Хургада", "HKT": "Пхукет", "HYD": "Хайдарабад", "IKT": "Иркутск",
    "IKA": "Тегеран", "IST": "Стамбул", "IXC": "Чандигарх", "JED": "Джидда",
    "JFK": "Нью-Йорк", "KGD": "Калининград", "KGV": "Когалым",
    "KHV": "Хабаровск", "KJA": "Красноярск", "KLO": "Калининград",
    "KRR": "Краснодар", "KUF": "Самара", "KUL": "Куала-Лумпур",
    "KZN": "Казань", "LAX": "Лос-Анджелес", "LED": "Санкт-Петербург",
    "LHR": "Лондон", "LPK": "Липецк", "MIA": "Майами", "MLE": "Мале",
    "MMK": "Мурманск", "MRV": "Минеральные Воды", "MSQ": "Минск",
    "MUC": "Мюнхен", "NBO": "Найроби", "NJC": "Нижневартовск",
    "NNM": "Нарьян-Мар", "NQZ": "Астана", "NSK": "Норильск", "NVR": "Набережные Челны",
    "NVT": "Новороссийск", "OVB": "Новосибирск", "OMS": "Омск",
    "OSL": "Осло", "OSS": "Ош", "OZH": "Ургенч", "PEE": "Пермь",
    "PEK": "Пекин", "PKC": "Петропавловск-Камчатский", "PRG": "Прага",
    "PSA": "Пиза", "PUY": "Пула", "PUS": "Пусан", "REN": "Оренбург",
    "RGK": "Горно-Алтайск", "RIX": "Рига", "ROV": "Ростов-на-Дону",
    "RTW": "Саратов", "SAW": "Стамбул (Сабиха)", "SGB": "Балаково",
    "SGC": "Сургут", "SIP": "Симферополь", "SKG": "Салоники", "SLY": "Салехард",
    "SIN": "Сингапур", "SJJ": "Сараево", "SKZ": "Сыктывкар",
    "SVO": "Шереметьево", "SVX": "Екатеринбург", "SWT": "Стрежевой",
    "SYD": "Сидней", "TAS": "Ташкент", "TBS": "Тбилиси", "TJM": "Тюмень",
    "TKM": "Туркменбаши", "TLL": "Таллин", "TLS": "Тулуза", "TSE": "Астана",
    "TSQ": "Тамбов", "TXL": "Берлин", "ULV": "Ульяновск", "UFA": "Уфа",
    "UJD": "Усть-Каменогорск", "VOG": "Волгоград", "VOZ": "Воронеж",
    "VNO": "Вильнюс", "VVO": "Владивосток", "VKO": "Внуково", "WAW": "Варшава",
    "WLG": "Веллингтон", "XRY": "Херес", "YKS": "Якутск", "YYZ": "Торонто",
    "ZIA": "Жуковский", "ZRH": "Цюрих", "ZYR": "Зырянка",
}
def _city(iata: str) -> str:
    """Translate an IATA code into its display name from ``IATA_CITY``.

    The lookup ignores surrounding whitespace and letter case (all keys in
    ``IATA_CITY`` are upper-case).

    Args:
        iata: Airport code; ``None`` and ``""`` are accepted.

    Returns:
        The mapped name; for an unknown code, the code itself with
        surrounding whitespace removed; ``""`` for ``None``/empty input.
    """
    if not iata:
        return ""
    code = iata.strip()
    # Fall back to the stripped code so the fallback agrees with the key
    # that was actually looked up.
    return IATA_CITY.get(code.upper(), code)
# Root logging configuration: INFO and above, every record tagged "[api]",
# timestamps rendered as YYYY-MM-DDTHH:MM:SS.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [api] %(levelname)s %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
)
# Logger used throughout this file.
log = logging.getLogger("api")
# Flask application; files under /app/static are exposed at /static.
app = Flask(__name__, static_folder="/app/static", static_url_path="/static")

# libpq connection string assembled from the environment. make_dsn() quotes
# and escapes every value, so a password containing spaces, quotes or
# backslashes cannot corrupt the DSN. POSTGRES_HOST, POSTGRES_DB,
# POSTGRES_USER and POSTGRES_PASSWORD are mandatory (KeyError at import time
# when missing); POSTGRES_PORT defaults to 5432.
DB_DSN = psycopg2.extensions.make_dsn(
    host=os.environ["POSTGRES_HOST"],
    port=os.environ.get("POSTGRES_PORT", 5432),
    dbname=os.environ["POSTGRES_DB"],
    user=os.environ["POSTGRES_USER"],
    password=os.environ["POSTGRES_PASSWORD"],
)

# TCP port the API listens on (API_PORT, default 8080).
API_PORT = int(os.environ.get("API_PORT", 8080))
# Marker file created at startup once the database is reachable.
HEALTHCHECK_FILE = "/tmp/api-ready"
# Import time of this module (UTC); /health derives the uptime from it.
START_TIME = datetime.now(timezone.utc)
# ── db connection (simple persistent conn with reconnect) ─────────────────────
_conn = None


def get_conn():
    """Return the shared PostgreSQL connection, opening it when necessary.

    A new connection is created when none exists yet or the cached one is
    closed. New connections are switched to autocommit: this service shares
    one long-lived connection, and without autocommit the first statement
    would open a transaction that is never ended. That would freeze
    ``now()`` at the transaction start and make every query fail after a
    single SQL error ("current transaction is aborted").

    Returns:
        The open ``psycopg2`` connection stored in the module-level ``_conn``.

    Raises:
        psycopg2.OperationalError: If the database cannot be reached.
    """
    global _conn
    if _conn is None or _conn.closed:
        conn = psycopg2.connect(DB_DSN)
        conn.autocommit = True
        # register_uuid's first positional parameter is ``oids``; the
        # connection has to be passed as ``conn_or_curs``.
        psycopg2.extras.register_uuid(conn_or_curs=conn)
        _conn = conn
        log.info("DB connection established")
    return _conn
def query(sql: str, params=None) -> list:
    """Execute *sql* on the shared connection and return all rows as dicts.

    The transaction is ended after every statement (commit on success,
    rollback on failure). The connection is long-lived and shared, so an
    open transaction would pin ``now()`` to its start time, and a failed
    statement would leave the connection in the aborted state for every
    later request.

    A connection-level failure discards the connection and retries once on
    a fresh one.

    Args:
        sql: Statement text with ``%s`` placeholders.
        params: Sequence of placeholder values, or ``None``.

    Returns:
        A list with one plain ``dict`` per result row.

    Raises:
        psycopg2.Error: If the statement fails, or the connection cannot be
            re-established on the second attempt.
    """
    global _conn
    for attempt in range(2):
        conn = None
        try:
            conn = get_conn()
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(sql, params)
                rows = [dict(r) for r in cur.fetchall()]
            conn.commit()
            return rows
        except (psycopg2.OperationalError, psycopg2.InterfaceError):
            # The connection itself is unusable: close and forget it so the
            # next get_conn() call reconnects.
            if conn is not None:
                try:
                    conn.close()
                except psycopg2.Error:
                    pass
            _conn = None
            if attempt == 1:
                raise
        except psycopg2.Error:
            # The statement failed on a healthy connection; clear the aborted
            # transaction so later queries are not rejected.
            if conn is not None:
                try:
                    conn.rollback()
                except psycopg2.Error:
                    _conn = None
            raise
def query_one(sql: str, params=None) -> dict | None:
    """Run *sql* through ``query`` and return the first row only.

    Returns:
        The first result row as a dict, or ``None`` when there are no rows.
    """
    result = query(sql, params)
    if not result:
        return None
    return result[0]
# ── serialisation helper ──────────────────────────────────────────────────────
def serial(obj):
    """``json.dumps`` fallback for values the JSON encoder cannot handle.

    Args:
        obj: The value ``json.dumps`` could not encode.

    Returns:
        An ISO-8601 string for ``datetime``/``date``/``time`` values, a
        ``float`` for ``Decimal`` and a ``str`` for ``UUID``.

    Raises:
        TypeError: For any other type.
    """
    import datetime as _dt
    import decimal
    import uuid

    # ``datetime`` is a subclass of ``date``, so one check covers both;
    # plain ``date``/``time`` values (e.g. DATE columns) previously raised.
    if isinstance(obj, (_dt.date, _dt.time)):
        return obj.isoformat()
    if isinstance(obj, decimal.Decimal):
        return float(obj)
    if isinstance(obj, uuid.UUID):
        return str(obj)
    raise TypeError(f"Not serialisable: {type(obj)}")
def ok(data, **kwargs):
    """Serialise *data* to JSON and wrap it in a Flask response.

    Args:
        data: JSON-compatible structure; values the encoder cannot handle
            are converted by ``serial``.
        **kwargs: Passed through to ``app.response_class`` (e.g. ``status``).

    Returns:
        A response with mimetype ``application/json``.
    """
    import json

    body = json.dumps(data, default=serial)
    return app.response_class(body, mimetype="application/json", **kwargs)
def err(msg: str, status: int = 500):
    """Build a JSON error response of the form ``{"error": msg}``.

    Args:
        msg: Error text placed under the ``"error"`` key.
        status: HTTP status code of the response (500 unless given).
    """
    payload = {"error": msg}
    return ok(payload, status=status)
# ── routes ────────────────────────────────────────────────────────────────────
@app.get("/")
def index():
    """Serve ``index.html`` from ``/app/static`` for the site root."""
    directory, filename = "/app/static", "index.html"
    return send_from_directory(directory, filename)
@app.get("/health")
def health():
    """Report service health based on a ``SELECT 1`` database probe.

    Returns:
        HTTP 200 with ``status: ok`` when the probe succeeds, otherwise
        HTTP 503 with ``status: degraded``. Both include the uptime in
        whole seconds since module import.
    """
    try:
        query_one("SELECT 1")
    except Exception as exc:
        # Still answer (with 503), but leave a trace of why the probe failed.
        log.warning("Health check DB probe failed: %s", exc)
        db_ok = False
    else:
        db_ok = True
    uptime = datetime.now(timezone.utc) - START_TIME
    return ok({
        "status": "ok" if db_ok else "degraded",
        "db": "ok" if db_ok else "error",
        "uptime_seconds": int(uptime.total_seconds()),
    }, status=200 if db_ok else 503)
@app.get("/dashboard/status")
def dashboard_status():
    """Return row counts for the main ``fr24`` tables plus the preprocess cursor.

    The response carries total/active counts for captures and flights,
    totals for raw packets and aircraft, and the ``state_value`` stored
    under ``preprocess_cursor`` (``None`` when that row is absent).
    Failures are logged and reported as a JSON error with HTTP 500.
    """
    try:
        captures = query_one("SELECT COUNT(*) AS total, COUNT(*) FILTER (WHERE status='active') AS active FROM fr24.captures")
        packets = query_one("SELECT COUNT(*) AS total FROM fr24.raw_packets")
        state = query_one("SELECT state_value FROM fr24.processing_state WHERE state_key='preprocess_cursor'")
        aircraft = query_one("SELECT COUNT(*) AS total FROM fr24.aircraft")
        flights = query_one("SELECT COUNT(*) AS total, COUNT(*) FILTER (WHERE status='active') AS active FROM fr24.flights")
        return ok({
            "captures": captures,
            "raw_packets": packets,
            "processing_state": state["state_value"] if state else None,
            "aircraft": aircraft,
            "flights": flights,
        })
    except Exception as e:
        log.exception("dashboard_status error")
        return err(str(e))
@app.get("/viewer/config")
def viewer_config():
    """Return the static viewer configuration (identity, map centre, zoom, feature flags)."""
    map_center = {"lat": 55.75, "lon": 37.62}
    feature_flags = {
        "adsb_decode": True,
        "real_rtlsdr": True,
        "noise_model": False,
    }
    config = {
        "system": "fr24-ingest",
        "version": "0.1.0",
        "stage": "step-3-real-tracks",
        "db_schema": "fr24",
        "center": map_center,
        "zoom": 7,
        "features": feature_flags,
    }
    return ok(config)
@app.get("/captures")
def captures():
    """List capture sessions, newest first.

    Query parameters:
        limit: Maximum number of rows (default 50, clamped to 0..200).

    Returns:
        ``{"captures": [...], "count": n}``; HTTP 400 when ``limit`` is not
        an integer; HTTP 500 with the error text when the query fails.
    """
    try:
        # Clamp at 0 as well: a negative LIMIT is rejected by PostgreSQL.
        limit = max(0, min(int(request.args.get("limit", 50)), 200))
    except ValueError:
        return err("invalid 'limit' parameter", 400)
    try:
        rows = query(
            """
            SELECT capture_id, started_at, ended_at, source, device_index,
                   center_frequency_hz, sample_rate_hz, gain_db, status, notes, created_at
            FROM fr24.captures
            ORDER BY started_at DESC
            LIMIT %s
            """,
            (limit,),
        )
        return ok({"captures": rows, "count": len(rows)})
    except Exception as e:
        log.exception("captures error")
        return err(str(e))
@app.get("/aircraft")
def aircraft_list():
    """List known aircraft, most recently seen first.

    Query parameters:
        limit: Maximum number of rows (default 100, clamped to 0..500).

    Returns:
        ``{"aircraft": [...], "count": n}``; HTTP 400 when ``limit`` is not
        an integer; HTTP 500 with the error text when the query fails.
    """
    try:
        # Clamp at 0 as well: a negative LIMIT is rejected by PostgreSQL.
        limit = max(0, min(int(request.args.get("limit", 100)), 500))
    except ValueError:
        return err("invalid 'limit' parameter", 400)
    try:
        rows = query(
            """
            SELECT aircraft_id, icao24, callsign, registration, aircraft_type,
                   operator_name, first_seen_at, last_seen_at
            FROM fr24.aircraft
            ORDER BY last_seen_at DESC NULLS LAST
            LIMIT %s
            """,
            (limit,),
        )
        return ok({"aircraft": rows, "count": len(rows)})
    except Exception as e:
        log.exception("aircraft_list error")
        return err(str(e))
@app.get("/flights")
def flights():
    """List flights joined with their aircraft, newest first.

    Query parameters:
        limit: Maximum number of rows (default 100, clamped to 0..500).
        status: Optional exact match on ``flights.status``.

    Returns:
        ``{"flights": [...], "count": n}``; HTTP 400 when ``limit`` is not
        an integer; HTTP 500 with the error text when the query fails.
    """
    try:
        # Clamp at 0 as well: a negative LIMIT is rejected by PostgreSQL.
        limit = max(0, min(int(request.args.get("limit", 100)), 500))
    except ValueError:
        return err("invalid 'limit' parameter", 400)

    # One statement for both cases; only the WHERE line is conditional.
    status_filter = request.args.get("status")
    where_sql = ""
    params = []
    if status_filter:
        where_sql = "WHERE f.status = %s"
        params.append(status_filter)
    params.append(limit)

    try:
        rows = query(
            f"""
            SELECT f.flight_id, f.aircraft_id, a.icao24, f.callsign,
                   f.departure_airport, f.arrival_airport,
                   f.started_at, f.ended_at, f.status, f.source
            FROM fr24.flights f
            JOIN fr24.aircraft a USING (aircraft_id)
            {where_sql}
            ORDER BY f.started_at DESC
            LIMIT %s
            """,
            params,
        )
        return ok({"flights": rows, "count": len(rows)})
    except Exception as e:
        log.exception("flights error")
        return err(str(e))
# ── map / live endpoints ──────────────────────────────────────────────────────
@app.get("/api/aircraft/live")
def aircraft_live():
    """Active aircraft with their latest position as GeoJSON FeatureCollection.

    Query parameters:
        minutes: Only flights whose latest track point is at most this many
            minutes old are returned (default 60).

    Returns:
        A FeatureCollection with one Point feature per active flight that
        has coordinates, plus ``count``; HTTP 400 when ``minutes`` is not
        an integer; HTTP 500 with the error text when the query fails.
    """
    try:
        minutes = int(request.args.get("minutes", 60))
    except ValueError:
        return err("invalid 'minutes' parameter", 400)
    try:
        rows = query(
            """
            SELECT
                a.icao24,
                a.callsign,
                a.registration,
                a.aircraft_type,
                tp.observed_at,
                ST_X(tp.geom) AS lon,
                ST_Y(tp.geom) AS lat,
                tp.altitude_m,
                tp.ground_speed_kt,
                tp.heading_deg,
                tp.vertical_rate_fpm
            FROM fr24.aircraft a
            JOIN fr24.flights f ON f.aircraft_id = a.aircraft_id
            JOIN LATERAL (
                SELECT tp2.geom, tp2.observed_at, tp2.altitude_m,
                       tp2.ground_speed_kt, tp2.heading_deg, tp2.vertical_rate_fpm
                FROM fr24.track_points tp2
                WHERE tp2.flight_id = f.flight_id
                ORDER BY tp2.observed_at DESC
                LIMIT 1
            ) tp ON true
            WHERE f.status = 'active'
              AND tp.observed_at >= now() - (%s || ' minutes')::interval
            ORDER BY tp.observed_at DESC
            """,
            (minutes,),
        )
        features = [
            {
                "type": "Feature",
                "geometry": {"type": "Point", "coordinates": [r["lon"], r["lat"]]},
                "properties": {
                    "icao24": r["icao24"],
                    # Fall back to the ICAO address when no callsign is known.
                    "callsign": r["callsign"] or r["icao24"],
                    "registration": r["registration"],
                    "aircraft_type": r["aircraft_type"],
                    "altitude_m": r["altitude_m"],
                    "ground_speed_kt": r["ground_speed_kt"],
                    "heading_deg": r["heading_deg"],
                    "vertical_rate_fpm": r["vertical_rate_fpm"],
                    "observed_at": r["observed_at"].isoformat() if r["observed_at"] else None,
                },
            }
            for r in rows
            # Rows without coordinates cannot be drawn on the map.
            if r["lon"] is not None and r["lat"] is not None
        ]
        return ok({"type": "FeatureCollection", "features": features, "count": len(features)})
    except Exception as e:
        log.exception("aircraft_live error")
        return err(str(e))
@app.get("/api/aircraft/<icao24>")
def aircraft_detail(icao24: str):
    """Details for a single aircraft including recent track points.

    Args:
        icao24: ICAO 24-bit address from the URL; lower-cased before lookup.

    Returns:
        ``{"aircraft": row, "flights": last 10 flights, "recent_points":
        last 100 track points}``; HTTP 404 when the aircraft is unknown;
        HTTP 500 with the error text when a query fails.
    """
    try:
        ac = query_one(
            "SELECT * FROM fr24.aircraft WHERE icao24 = %s",
            (icao24.lower(),),
        )
        if not ac:
            return err("not found", 404)
        flights_rows = query(
            """
            SELECT flight_id, callsign, departure_airport, arrival_airport,
                   started_at, ended_at, status
            FROM fr24.flights
            WHERE aircraft_id = %s
            ORDER BY started_at DESC
            LIMIT 10
            """,
            (ac["aircraft_id"],),
        )
        # last 100 track points across all of the aircraft's flights
        points = query(
            """
            SELECT tp.observed_at,
                   ST_X(tp.geom) AS lon, ST_Y(tp.geom) AS lat,
                   tp.altitude_m, tp.ground_speed_kt, tp.heading_deg
            FROM fr24.track_points tp
            JOIN fr24.flights f ON f.flight_id = tp.flight_id
            WHERE f.aircraft_id = %s
            ORDER BY tp.observed_at DESC
            LIMIT 100
            """,
            (ac["aircraft_id"],),
        )
        return ok({
            "aircraft": ac,
            "flights": flights_rows,
            "recent_points": points,
        })
    except Exception as e:
        log.exception("aircraft_detail error")
        return err(str(e))
@app.get("/monitoring")
def monitoring_page():
    """Serve ``monitoring.html`` from ``/app/static``."""
    directory, filename = "/app/static", "monitoring.html"
    return send_from_directory(directory, filename)
@app.get("/schedule")
def schedule_page():
    """Serve ``schedule.html`` from ``/app/static``."""
    directory, filename = "/app/static", "schedule.html"
    return send_from_directory(directory, filename)
# ── schedule API ──────────────────────────────────────────────────────────────
def _schedule_where(args):
"""Build WHERE clause + params list from request args."""
clauses = []
params = []
date_from = args.get("date_from")
date_to = args.get("date_to")
airport = args.get("airport", "all")
direction = args.get("direction", "all")
flight_number = args.get("flight_number", "").strip()
time_from = args.get("time_from", "").strip()
time_to = args.get("time_to", "").strip()
if date_from:
clauses.append("flight_date >= %s")
params.append(date_from)
if date_to:
clauses.append("flight_date <= %s")
params.append(date_to)
if airport and airport != "all":
clauses.append("airport_iata = %s")
params.append(airport)
if direction and direction != "all":
clauses.append("direction = %s")
params.append(direction)
if flight_number:
clauses.append("flight_number ILIKE %s")
params.append(f"%{flight_number}%")
if time_from:
try:
h, m = map(int, time_from.split(":"))
clauses.append(
"(EXTRACT(HOUR FROM scheduled_at AT TIME ZONE 'UTC') * 60 "
"+ EXTRACT(MINUTE FROM scheduled_at AT TIME ZONE 'UTC')) >= %s"
)
params.append(h * 60 + m)
except ValueError:
pass
if time_to:
try:
h, m = map(int, time_to.split(":"))
clauses.append(
"(EXTRACT(HOUR FROM scheduled_at AT TIME ZONE 'UTC') * 60 "
"+ EXTRACT(MINUTE FROM scheduled_at AT TIME ZONE 'UTC')) <= %s"
)
params.append(h * 60 + m)
except ValueError:
pass
where = " AND ".join(clauses) if clauses else "1=1"
return where, params
@app.get("/api/schedule/data")
def schedule_data():
    """Return one page of schedule rows enriched with FR24 actuals and mart data.

    Query parameters:
        limit: Page size (default 100, clamped to 0..1000).
        offset: Rows to skip (default 0, negative values become 0).
        Plus every filter understood by ``_schedule_where``.

    Returns:
        ``{"total": n, "flights": [...]}``. ``total`` counts matching rows
        in ``fr24_ext.schedule`` only. HTTP 400 when ``limit``/``offset``
        is not an integer; HTTP 500 with the error text on other failures.
    """
    import re

    try:
        # Clamp at 0 as well: a negative LIMIT is rejected by PostgreSQL.
        limit = max(0, min(int(request.args.get("limit", 100)), 1000))
        offset = max(int(request.args.get("offset", 0)), 0)
    except ValueError:
        return err("invalid 'limit' or 'offset' parameter", 400)

    try:
        where, params = _schedule_where(request.args)
        total_row = query_one(
            f"SELECT COUNT(*) AS cnt FROM fr24_ext.schedule WHERE {where}",
            params,
        )
        total = total_row["cnt"] if total_row else 0

        # All filter columns belong to fr24_ext.schedule. Prefix each with
        # the "s." alias so none of them can be ambiguous after the JOINs.
        safe_where = re.sub(
            r"\b(flight_date|flight_number|airport_iata|direction|scheduled_at)\b",
            r"s.\1",
            where,
        )
        rows = query(
            f"""
            SELECT
                s.flight_number, s.airline_name, s.airport_iata, s.direction,
                s.origin_iata, s.destination_iata,
                s.scheduled_at, s.actual_at, s.status, s.icao24,
                s.flight_date, s.duration_min, s.thread_title,
                s.actual_takeoff, s.actual_landed,
                s.delay_takeoff_min, s.delay_landed_min,
                s.fr24_id, s.flight_category, s.source AS sched_source,
                -- из flight_actual:
                fa.runway_takeoff, fa.runway_landed,
                fa.actual_distance,
                fa.flight_time AS fa_flight_time,
                fa.registration,
                fa.operated_as,
                fa.category AS fa_category,
                fa.orig_iata AS fa_orig_iata,
                fa.dest_iata AS fa_dest_iata,
                -- из mart.flights (тип ВС):
                COALESCE(mf.aircraft_type, s.aircraft_type) AS aircraft_type,
                mf.track_source, mf.track_points
            FROM fr24_ext.schedule s
            LEFT JOIN fr24_ext.flight_actual fa
                ON fa.fr24_id = s.fr24_id AND s.fr24_id IS NOT NULL
            LEFT JOIN fr24_mart.flights mf
                ON mf.flight_number = s.flight_number
                AND mf.flight_date = s.flight_date
            WHERE {safe_where}
            ORDER BY s.scheduled_at DESC
            LIMIT %s OFFSET %s
            """,
            params + [limit, offset],
        )

        items = []
        for r in rows:
            sched = r["scheduled_at"]
            actual = r["actual_at"]
            # Delay in whole minutes (truncated), only when both times exist.
            delay_min = (
                int((actual - sched).total_seconds() / 60)
                if actual and sched else None
            )
            actual_takeoff = r.get("actual_takeoff")
            actual_landed = r.get("actual_landed")

            # Effective duration: flight_actual wins over the schedule.
            # fa_flight_time is in seconds (FR24), duration_min in minutes
            # (Yandex). A missing or sub-minute FR24 value falls back to
            # the scheduled duration.
            fa_flight_time = r.get("fa_flight_time")
            fa_minutes = int(fa_flight_time) // 60 if fa_flight_time else None
            duration_eff = fa_minutes or r.get("duration_min")

            items.append({
                "flight_number": r["flight_number"],
                "airline": r["airline_name"],
                "airport": r["airport_iata"],
                "direction": r["direction"],
                "origin": _city(r["origin_iata"]) or r["origin_iata"] or "",
                "destination": _city(r["destination_iata"]) or r["destination_iata"] or "",
                "origin_iata": r["origin_iata"] or "",
                "destination_iata": r["destination_iata"] or "",
                "thread_title": r["thread_title"] or "",
                "scheduled_at": sched.isoformat() if sched else None,
                "actual_at": actual.isoformat() if actual else None,
                "delay_min": delay_min,
                "duration_min": r["duration_min"],
                "status": r["status"],
                "icao24": r["icao24"],
                # FR24 actual times
                "actual_takeoff": actual_takeoff.isoformat() if hasattr(actual_takeoff, "isoformat") else None,
                "actual_landed": actual_landed.isoformat() if hasattr(actual_landed, "isoformat") else None,
                "delay_takeoff_min": r.get("delay_takeoff_min"),
                "delay_landed_min": r.get("delay_landed_min"),
                "fr24_id": r.get("fr24_id"),
                "flight_category": r.get("flight_category"),
                # Fields taken from flight_actual / mart.flights
                "runway_takeoff": r.get("runway_takeoff"),
                "runway_landed": r.get("runway_landed"),
                "actual_distance": r.get("actual_distance"),
                "flight_time_min": fa_flight_time,
                "registration": r.get("registration"),
                "aircraft_type": r.get("aircraft_type"),
                "track_source": r.get("track_source"),
                "track_points": r.get("track_points"),
                "sched_source": r.get("sched_source"),
                "duration_eff": duration_eff,
                # Route according to FR24 (IATA codes → display names)
                "fa_origin_iata": r.get("fa_orig_iata"),
                "fa_dest_iata": r.get("fa_dest_iata"),
                "fa_origin_city": _city(r.get("fa_orig_iata") or ""),
                "fa_dest_city": _city(r.get("fa_dest_iata") or ""),
            })
        return ok({"total": total, "flights": items})
    except Exception as e:
        log.exception("schedule_data error")
        return err(str(e))
# ── actual flight data API ──────────────────────────────────────────────────
@app.get("/api/flight-actual")
def flight_actual():
    """Query fr24_ext.flight_actual table directly.

    Query parameters:
        limit: Page size (default 200, clamped to 0..2000).
        offset: Rows to skip (default 0, negative values become 0).
        date_from, date_to: Inclusive bounds on ``flight_date``.
        category: Exact match on ``category``.
        airport: ICAO code matched against origin or destination.

    Returns:
        ``{"total": n, "flights": [...]}``; HTTP 400 when ``limit``/``offset``
        is not an integer; HTTP 500 with the error text on other failures.
    """
    try:
        # Clamp at 0 as well: a negative LIMIT is rejected by PostgreSQL.
        limit = max(0, min(int(request.args.get("limit", 200)), 2000))
        offset = max(int(request.args.get("offset", 0)), 0)
    except ValueError:
        return err("invalid 'limit' or 'offset' parameter", 400)

    try:
        date_from = request.args.get("date_from")
        date_to = request.args.get("date_to")
        category = request.args.get("category")
        airport = request.args.get("airport")  # ICAO filter

        clauses = []
        params = []
        if date_from:
            clauses.append("flight_date >= %s")
            params.append(date_from)
        if date_to:
            clauses.append("flight_date <= %s")
            params.append(date_to)
        if category:
            clauses.append("category = %s")
            params.append(category)
        if airport:
            clauses.append("(origin_icao = %s OR dest_icao = %s)")
            params.extend([airport, airport])
        where = " AND ".join(clauses) if clauses else "1=1"

        rows = query(
            f"""
            SELECT fr24_id, flight, callsign, operated_as,
                   origin_icao, dest_icao,
                   datetime_takeoff, datetime_landed, flight_time,
                   runway_takeoff, runway_landed, actual_distance,
                   category, flight_ended, first_seen, last_seen, flight_date
            FROM fr24_ext.flight_actual
            WHERE {where}
            ORDER BY flight_date DESC, datetime_takeoff DESC NULLS LAST
            LIMIT %s OFFSET %s
            """,
            params + [limit, offset],
        )
        total_row = query_one(
            f"SELECT COUNT(*) AS cnt FROM fr24_ext.flight_actual WHERE {where}",
            params,
        )
        return ok({"total": total_row["cnt"] if total_row else 0, "flights": rows})
    except Exception as e:
        log.exception("flight_actual error")
        return err(str(e))
@app.get("/api/schedule/export")
def schedule_export():
    """Export the filtered schedule as a CSV attachment (``schedule.csv``).

    Accepts the same filters as ``_schedule_where``. At most 100000 rows are
    exported, newest ``scheduled_at`` first. Missing values are written as
    empty cells; a value of 0 is written as ``0``.

    Returns:
        A ``text/csv`` response, or a JSON error with HTTP 500 on failure.
    """
    def iso_or_blank(value):
        # Temporal values become ISO strings; anything else an empty cell.
        return value.isoformat() if hasattr(value, "isoformat") else ""

    def value_or_blank(value):
        # Unlike ``value or ""`` this keeps a legitimate 0.
        return value if value is not None else ""

    try:
        where, params = _schedule_where(request.args)
        rows = query(
            f"""
            SELECT
                flight_date, flight_number, airline_name, airport_iata, direction,
                origin_iata, destination_iata,
                scheduled_at, actual_at, status, icao24, duration_min,
                actual_takeoff, actual_landed, delay_takeoff_min, delay_landed_min,
                fr24_id, flight_category
            FROM fr24_ext.schedule
            WHERE {where}
            ORDER BY scheduled_at DESC
            LIMIT 100000
            """,
            params,
        )
        buf = io.StringIO()
        writer = csv.writer(buf)
        writer.writerow([
            "Date", "Flight", "Airline", "Airport", "Direction",
            "Origin", "Destination", "Scheduled", "Actual",
            "Delay (min)", "Duration (min)", "Status", "ICAO24",
            "Actual Takeoff", "Actual Landed", "Delay Takeoff (min)", "Delay Landed (min)",
            "FR24 ID", "Category",
        ])
        for r in rows:
            sched = r["scheduled_at"]
            actual = r["actual_at"]
            # Delay in whole minutes (truncated), only when both times exist.
            delay = (
                int((actual - sched).total_seconds() / 60)
                if actual and sched else ""
            )
            writer.writerow([
                str(r["flight_date"]),
                r["flight_number"],
                r["airline_name"] or "",
                r["airport_iata"],
                r["direction"],
                _city(r["origin_iata"]) or r["origin_iata"] or "",
                _city(r["destination_iata"]) or r["destination_iata"] or "",
                sched.isoformat() if sched else "",
                actual.isoformat() if actual else "",
                delay,
                # Same None test as the delay columns below, so a duration
                # of 0 is no longer blanked out.
                value_or_blank(r["duration_min"]),
                r["status"] or "",
                r["icao24"] or "",
                iso_or_blank(r.get("actual_takeoff")),
                iso_or_blank(r.get("actual_landed")),
                value_or_blank(r.get("delay_takeoff_min")),
                value_or_blank(r.get("delay_landed_min")),
                r["fr24_id"] or "",
                r["flight_category"] or "",
            ])
        return Response(
            buf.getvalue(),
            mimetype="text/csv; charset=utf-8",
            headers={"Content-Disposition": "attachment; filename=schedule.csv"},
        )
    except Exception as e:
        log.exception("schedule_export error")
        return err(str(e))
@app.get("/api/monitoring/status")
def monitoring_status():
    """Return latest monitoring metrics + last 20 rows history.

    ``latest`` is the newest row of ``history``, so the two always agree
    and only one metrics query is issued. ``unprocessed`` reports how many
    raw packets have an id above the preprocess cursor, or ``None`` when
    the cursor row does not exist.

    Returns:
        ``{"latest": row | None, "history": [...], "unprocessed": {...} | None}``,
        or a JSON error with HTTP 500 on failure.
    """
    try:
        history = query(
            """
            SELECT id, collected_at, disk_pct, db_size_mb, capture_lag_sec, throughput_5min
            FROM fr24.monitoring_metrics
            ORDER BY id DESC
            LIMIT 20
            """
        )
        # history is ordered by id DESC, so its first row is the latest one.
        latest = history[0] if history else None

        # live unprocessed count
        unprocessed = query_one(
            """
            SELECT
                COALESCE((state_value->>'last_raw_packet_id')::bigint, 0) AS cursor_id,
                (SELECT COUNT(*) FROM fr24.raw_packets) AS total
            FROM fr24.processing_state
            WHERE state_key = 'preprocess_cursor'
            """
        )
        if unprocessed:
            cursor_id = unprocessed["cursor_id"] or 0
            total = unprocessed["total"] or 0
            pending_rows = query_one(
                "SELECT COUNT(*) as cnt FROM fr24.raw_packets WHERE raw_packet_id > %s",
                (cursor_id,)
            )
            pending = pending_rows["cnt"] if pending_rows else 0
            # Guard against division by zero on an empty table.
            pending_pct = round(pending / total * 100, 1) if total else 0
            unprocessed_info = {"pending": pending, "pending_pct": pending_pct, "total": total}
        else:
            unprocessed_info = None
        return ok({"latest": latest, "history": history, "unprocessed": unprocessed_info})
    except Exception as e:
        log.exception("monitoring_status error")
        return err(str(e))
@app.get("/api/tracks")
def tracks():
    """Track points as GeoJSON LineStrings, filtered by time and optional bbox.

    Query parameters:
        minutes: Only points observed within the last N minutes (default 30).
        limit: Maximum number of tracks (default 50, clamped to 0..200).
        bbox: Optional ``minlon,minlat,maxlon,maxlat``; a malformed value
            is ignored.

    Returns:
        A FeatureCollection with one LineString per track that has at least
        two positioned points, plus ``count``; HTTP 400 when ``minutes`` or
        ``limit`` is not an integer; HTTP 500 with the error text on other
        failures.
    """
    try:
        minutes = int(request.args.get("minutes", 30))
        # Clamp at 0 as well: a negative LIMIT is rejected by PostgreSQL.
        limit = max(0, min(int(request.args.get("limit", 50)), 200))
    except ValueError:
        return err("invalid 'minutes' or 'limit' parameter", 400)

    try:
        # optional bbox: ?bbox=minlon,minlat,maxlon,maxlat
        bbox = request.args.get("bbox")
        bbox_clause = ""
        bbox_params: list = []
        if bbox:
            try:
                minlon, minlat, maxlon, maxlat = map(float, bbox.split(","))
            except ValueError:
                # Best effort: an unparsable bbox simply does not filter.
                pass
            else:
                bbox_clause = (
                    "AND tp.geom && ST_MakeEnvelope(%s, %s, %s, %s, 4326)"
                )
                bbox_params = [minlon, minlat, maxlon, maxlat]

        rows = query(
            f"""
            SELECT
                t.track_id,
                f.flight_id,
                a.icao24,
                f.callsign,
                json_agg(
                    json_build_object(
                        'lon', ST_X(tp.geom),
                        'lat', ST_Y(tp.geom),
                        'alt', tp.altitude_m,
                        'spd', tp.ground_speed_kt,
                        'hdg', tp.heading_deg,
                        'ts', tp.observed_at
                    )
                    ORDER BY tp.point_order
                ) AS points
            FROM fr24.tracks t
            JOIN fr24.flights f ON f.flight_id = t.flight_id
            JOIN fr24.aircraft a ON a.aircraft_id = f.aircraft_id
            JOIN fr24.track_points tp ON tp.track_id = t.track_id
            WHERE tp.observed_at >= now() - (%s || ' minutes')::interval
            {bbox_clause}
            GROUP BY t.track_id, f.flight_id, a.icao24, f.callsign
            ORDER BY t.last_point_at DESC NULLS LAST
            LIMIT %s
            """,
            (minutes, *bbox_params, limit),
        )

        features = []
        for r in rows:
            pts = r["points"] if isinstance(r["points"], list) else []
            coords = []
            times = []
            for p in pts:
                if p.get("lon") is None:
                    continue
                coords.append([p["lon"], p["lat"]])
                ts = p.get("ts")
                if hasattr(ts, "isoformat"):
                    times.append(ts.isoformat())
                elif ts:
                    times.append(str(ts))
                else:
                    times.append(None)
            # A LineString needs at least two positions.
            if len(coords) < 2:
                continue
            features.append({
                "type": "Feature",
                "geometry": {"type": "LineString", "coordinates": coords},
                "properties": {
                    "track_id": r["track_id"],
                    "flight_id": r["flight_id"],
                    "icao24": r["icao24"],
                    "callsign": r["callsign"] or r["icao24"],
                    "point_count": len(coords),
                    "times": times,
                },
            })
        return ok({"type": "FeatureCollection", "features": features, "count": len(features)})
    except Exception as e:
        log.exception("tracks error")
        return err(str(e))
# ── data-sources page & API ─────────────────────────────────────────────────
@app.get("/data-sources")
def data_sources_page():
    """Serve ``data_sources.html`` from ``/app/static``."""
    directory, filename = "/app/static", "data_sources.html"
    return send_from_directory(directory, filename)
def _ds_where(args, date_col="coverage_date"):
clauses, params = [], []
if args.get("date_from"):
clauses.append(f"{date_col} >= %s"); params.append(args["date_from"])
if args.get("date_to"):
clauses.append(f"{date_col} <= %s"); params.append(args["date_to"])
return (" AND ".join(clauses) if clauses else "1=1"), params
@app.get("/api/data-sources/coverage")
def ds_coverage():
    """Return per-day source coverage rows plus totals over the same range.

    Accepts optional ``date_from``/``date_to`` bounds on ``coverage_date``.

    Returns:
        ``{"rows": [...], "totals": {...}}``, or a JSON error with HTTP 500
        on failure.
    """
    try:
        where, params = _ds_where(request.args)
        rows = query(
            f"""
            SELECT
                coverage_date,
                total_schedule,
                with_rtlsdr, with_fr24, with_fa, schedule_only,
                rtlsdr_pct, fr24_pct, fa_pct
            FROM fr24_mart.source_coverage
            WHERE {where}
            ORDER BY coverage_date
            """,
            params,
        )
        totals = query_one(
            f"""
            SELECT
                SUM(total_schedule) AS total_schedule,
                SUM(with_rtlsdr) AS with_rtlsdr,
                SUM(with_fr24) AS with_fr24,
                SUM(with_fa) AS with_fa,
                COUNT(*) AS days
            FROM fr24_mart.source_coverage
            WHERE {where}
            """,
            params,
        )
        return ok({"rows": rows, "totals": totals or {}})
    except Exception as e:
        log.exception("ds_coverage error")
        return err(str(e))
@app.get("/api/data-sources/quality")
def ds_quality():
    """Return data-completeness counters for ``fr24_mart.flights``.

    Accepts optional ``date_from``/``date_to`` bounds on ``flight_date``.
    Besides the counters, the response holds the median ``track_points``
    per track source (``rtlsdr``, ``fr24``, ``fa``), truncated to an
    integer, or ``None`` when a source has no tracked flights in the range.

    Returns:
        ``{"quality": {...}}``, or a JSON error with HTTP 500 on failure.
    """
    try:
        where, params = _ds_where(request.args, date_col="flight_date")
        q = query_one(
            f"""
            SELECT
                COUNT(*) AS total,
                COUNT(*) FILTER (WHERE origin_iata IS NOT NULL
                                 AND destination_iata IS NOT NULL) AS with_route,
                COUNT(*) FILTER (WHERE track_source IS NOT NULL) AS with_track,
                COUNT(*) FILTER (WHERE actual_dep IS NOT NULL
                                 OR actual_arr IS NOT NULL) AS with_actual_time,
                COUNT(*) FILTER (WHERE aircraft_type IS NOT NULL) AS with_aircraft_type
            FROM fr24_mart.flights
            WHERE {where}
            """,
            params,
        )

        # median points per source
        def median_pts(source):
            r = query_one(
                f"""
                SELECT PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY track_points) AS med
                FROM fr24_mart.flights
                WHERE {where} AND track_source = %s AND track_points > 0
                """,
                params + [source],
            )
            v = r["med"] if r else None
            return int(v) if v is not None else None

        quality = dict(q or {})
        for source in ("rtlsdr", "fr24", "fa"):
            quality[f"median_points_{source}"] = median_pts(source)
        return ok({"quality": quality})
    except Exception as e:
        log.exception("ds_quality error")
        return err(str(e))
@app.get("/api/data-sources/top-airlines")
def ds_top_airlines():
    """Return the 20 airlines with the most flights in ``fr24_mart.flights``.

    Accepts optional ``date_from``/``date_to`` bounds on ``flight_date``;
    rows without an airline code are excluded.

    Returns:
        ``{"rows": [{"airline_iata": ..., "flight_count": ...}, ...]}``, or
        a JSON error with HTTP 500 on failure.
    """
    try:
        where, params = _ds_where(request.args, date_col="flight_date")
        rows = query(
            f"""
            SELECT airline_iata, COUNT(*) AS flight_count
            FROM fr24_mart.flights
            WHERE {where} AND airline_iata IS NOT NULL
            GROUP BY airline_iata
            ORDER BY flight_count DESC
            LIMIT 20
            """,
            params,
        )
        return ok({"rows": rows})
    except Exception as e:
        log.exception("ds_top_airlines error")
        return err(str(e))
@app.get("/api/data-sources/top-routes")
def ds_top_routes():
    """Return the 20 most frequent origin/destination pairs in ``fr24_mart.flights``.

    Accepts optional ``date_from``/``date_to`` bounds on ``flight_date``;
    rows lacking either endpoint are excluded.

    Returns:
        ``{"rows": [...]}``, or a JSON error with HTTP 500 on failure.
    """
    try:
        where, params = _ds_where(request.args, date_col="flight_date")
        rows = query(
            f"""
            SELECT origin_iata, destination_iata, COUNT(*) AS flight_count
            FROM fr24_mart.flights
            WHERE {where}
              AND origin_iata IS NOT NULL
              AND destination_iata IS NOT NULL
            GROUP BY origin_iata, destination_iata
            ORDER BY flight_count DESC
            LIMIT 20
            """,
            params,
        )
        return ok({"rows": rows})
    except Exception as e:
        log.exception("ds_top_routes error")
        return err(str(e))
@app.get("/api/data-sources/airport-load")
def ds_airport_load():
    """Return scheduled-flight counts per airport and UTC hour.

    Only the airports SVO, DME, VKO and ZIA are included. Accepts optional
    ``date_from``/``date_to`` bounds on ``flight_date``.

    Returns:
        ``{"rows": [{"airport_iata": ..., "hour": ..., "flight_count": ...}, ...]}``,
        or a JSON error with HTTP 500 on failure.
    """
    try:
        where, params = _ds_where(request.args, date_col="flight_date")
        rows = query(
            f"""
            SELECT
                airport_iata,
                EXTRACT(HOUR FROM scheduled_at AT TIME ZONE 'UTC')::int AS hour,
                COUNT(*) AS flight_count
            FROM fr24_ext.schedule
            WHERE {where}
              AND airport_iata IN ('SVO','DME','VKO','ZIA')
            GROUP BY airport_iata, hour
            ORDER BY airport_iata, hour
            """,
            params,
        )
        return ok({"rows": rows})
    except Exception as e:
        log.exception("ds_airport_load error")
        return err(str(e))
# ── startup ───────────────────────────────────────────────────────────────────
def wait_for_db(max_attempts: int = 30):
    """Block until the database accepts a connection, or exit the process.

    Tries ``get_conn()`` up to *max_attempts* times, sleeping two seconds
    after each ``OperationalError``.

    Raises:
        SystemExit: With code 1 when every attempt failed.
    """
    attempt = 0
    while attempt < max_attempts:
        attempt += 1
        try:
            get_conn()
        except psycopg2.OperationalError as exc:
            log.warning("DB not ready (%d/%d): %s", attempt, max_attempts, exc)
            time.sleep(2)
        else:
            return
    log.error("Could not connect to DB")
    raise SystemExit(1)
if __name__ == "__main__":
    # Script entry point: wait for the database, drop the readiness marker,
    # then run the Flask server on all interfaces.
    wait_for_db()
    with open(HEALTHCHECK_FILE, "w"):
        pass  # creating/truncating the file is all that is needed
    log.info("Healthcheck file written: %s", HEALTHCHECK_FILE)
    log.info("Starting API on port %d", API_PORT)
    app.run(host="0.0.0.0", port=API_PORT, debug=False)