Files
2026-04-20 10:10:01 +03:00

447 lines
16 KiB
Python

"""
FR24 Preprocess Service — Step 2: Real SBS-1 parsing
Reads raw_packets written by the capture service (SBS-1 format, base64-encoded),
extracts real ADS-B fields, and builds aircraft/flights/tracks/track_points.
"""
import os
import time
import base64
import logging
import signal
import sys
import json
from datetime import datetime, timezone
import psycopg2
import psycopg2.extras
# Root logging setup: INFO level, ISO-like timestamps, and a fixed
# "[preprocess]" tag in every record so this service's lines are easy to spot.
_LOG_FORMAT = "%(asctime)s [preprocess] %(levelname)s %(message)s"
_LOG_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"

logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, datefmt=_LOG_DATE_FORMAT)

# Module-wide logger used by every function in this file.
log = logging.getLogger("preprocess")
def _dsn_value(value) -> str:
    """Quote *value* for use in a libpq keyword/value connection string.

    libpq requires values containing spaces (and empty values) to be wrapped
    in single quotes, with embedded single quotes and backslashes escaped by a
    backslash. Quoting unconditionally is valid for every value and stops a
    password such as ``p@ss word`` or ``it's`` from corrupting the DSN.
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


# Connection string assembled from the environment. POSTGRES_HOST, POSTGRES_DB,
# POSTGRES_USER and POSTGRES_PASSWORD are mandatory (a missing one raises
# KeyError at import time); POSTGRES_PORT falls back to 5432.
DB_DSN = " ".join(
    f"{keyword}={_dsn_value(value)}"
    for keyword, value in (
        ("host", os.environ["POSTGRES_HOST"]),
        ("port", os.environ.get("POSTGRES_PORT", 5432)),
        ("dbname", os.environ["POSTGRES_DB"]),
        ("user", os.environ["POSTGRES_USER"]),
        ("password", os.environ["POSTGRES_PASSWORD"]),
    )
)

# Seconds to wait between polls of fr24.raw_packets.
POLL_INTERVAL = float(os.environ.get("POLL_INTERVAL_SECONDS", 5.0))
# Maximum number of raw packets fetched per poll.
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", 50))
# Empty marker file created once the database connection is up.
HEALTHCHECK_FILE = "/tmp/preprocess-ready"
# Row key in fr24.processing_state that stores this service's cursor.
STATE_KEY = "preprocess_cursor"
# ── SBS-1 parser ─────────────────────────────────────────────────────────────
# MSG format (22 comma-separated fields):
# MSG,<msgtype>,<sid>,<aid>,<hex>,<fid>,<date_gen>,<time_gen>,<date_log>,<time_log>,
# <callsign>,<alt_ft>,<speed_kt>,<track_deg>,<lat>,<lon>,<vrate_fpm>,
# <squawk>,<alert>,<emerg>,<spi>,<onground>
def _float(s: str) -> float | None:
try:
return float(s) if s.strip() else None
except ValueError:
return None
def parse_sbs1(payload_base64: str) -> dict | None:
"""
Decode base64 payload, parse SBS-1 MSG line.
Returns dict with extracted fields, or None if unparseable / not a MSG.
"""
try:
line = base64.b64decode(payload_base64).decode("ascii", errors="replace").strip()
except Exception:
return None
parts = line.split(",")
if len(parts) < 22 or parts[0] != "MSG":
return None
icao24 = parts[4].strip().upper()
if not icao24:
return None
return {
"msg_type": parts[1].strip(),
"icao24": icao24,
"callsign": parts[10].strip() or None,
"altitude": _float(parts[11]), # feet
"speed": _float(parts[12]), # knots
"heading": _float(parts[13]), # degrees
"lat": _float(parts[14]),
"lon": _float(parts[15]),
"vrate": _float(parts[16]), # ft/min
"squawk": parts[17].strip() or None,
"on_ground": parts[21].strip() == "-1",
}
def altitude_ft_to_m(ft: float | None) -> float | None:
return round(ft * 0.3048, 2) if ft is not None else None
# ── db ────────────────────────────────────────────────────────────────────────
def wait_for_db(max_attempts: int = 30) -> psycopg2.extensions.connection:
    """Connect to PostgreSQL, retrying until it accepts connections.

    Tries ``psycopg2.connect(DB_DSN)`` up to *max_attempts* times, pausing two
    seconds between failed attempts.

    Args:
        max_attempts: number of connection attempts before giving up.

    Returns:
        An open connection with the UUID typecaster registered on it.

    Raises:
        SystemExit: with status 1 when every attempt failed.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            conn = psycopg2.connect(DB_DSN)
        except psycopg2.OperationalError as e:
            log.warning("DB not ready (%d/%d): %s", attempt, max_attempts, e)
            # No point in waiting after the final attempt; we exit right away.
            if attempt < max_attempts:
                time.sleep(2)
            continue
        # register_uuid's first positional parameter is `oids`, not the
        # connection, so the connection must be passed by keyword; passing it
        # positionally registers the typecaster under a bogus OID.
        psycopg2.extras.register_uuid(conn_or_curs=conn)
        log.info("PostgreSQL connected (attempt %d)", attempt)
        return conn
    log.error("Could not connect to PostgreSQL after %d attempts", max_attempts)
    sys.exit(1)
def get_cursor(conn) -> int:
    """Return the id of the last raw packet this service has already handled.

    Reads the ``state_value`` stored under ``STATE_KEY`` in
    ``fr24.processing_state`` and returns its ``last_raw_packet_id`` entry as
    an int. Returns 0 when the row is missing or the entry is absent.
    """
    query = "SELECT state_value FROM fr24.processing_state WHERE state_key = %s"
    with conn.cursor() as cur:
        cur.execute(query, (STATE_KEY,))
        state_row = cur.fetchone()
    if not state_row:
        return 0
    # state_value is used as a mapping here (``.get`` is called on it).
    state = state_row[0]
    return int(state.get("last_raw_packet_id", 0))
def set_cursor(conn, last_id: int):
    """Persist *last_id* as the processing cursor and commit.

    Upserts the row keyed by ``STATE_KEY`` in ``fr24.processing_state`` with a
    JSON document ``{"last_raw_packet_id": last_id}``, then commits the
    transaction on *conn*.
    """
    upsert_sql = """
        INSERT INTO fr24.processing_state (state_key, state_value, updated_at, note)
        VALUES (%s, %s::jsonb, now(), 'preprocess cursor')
        ON CONFLICT (state_key) DO UPDATE
        SET state_value = EXCLUDED.state_value,
        updated_at = now()
        """
    state_json = json.dumps({"last_raw_packet_id": last_id})
    with conn.cursor() as cur:
        cur.execute(upsert_sql, (STATE_KEY, state_json))
    conn.commit()
def fetch_unprocessed(conn, after_id: int, limit: int) -> list:
    """Fetch up to *limit* raw packets whose id is greater than *after_id*.

    Rows come back ordered by ``raw_packet_id`` as dict-like records
    (``RealDictCursor``) carrying the columns named in the SELECT list.
    """
    select_sql = """
        SELECT raw_packet_id, capture_id, observed_at,
        payload_base64, message_type, decoded_format
        FROM fr24.raw_packets
        WHERE raw_packet_id > %s
        ORDER BY raw_packet_id
        LIMIT %s
        """
    dict_rows = psycopg2.extras.RealDictCursor
    with conn.cursor(cursor_factory=dict_rows) as cur:
        cur.execute(select_sql, (after_id, limit))
        rows = cur.fetchall()
    return rows
# ── upsert helpers ────────────────────────────────────────────────────────────
def upsert_aircraft(conn, icao24: str, callsign: str | None, now: datetime) -> int:
    """Insert or refresh the ``fr24.aircraft`` row for *icao24*; return its id.

    A new row gets *now* as both first and last seen time. An existing row has
    ``last_seen_at`` set to *now* and keeps its stored callsign unless a
    non-null *callsign* is supplied. Does not commit.
    """
    upsert_sql = """
        INSERT INTO fr24.aircraft
        (icao24, callsign, first_seen_at, last_seen_at)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (icao24) DO UPDATE
        SET last_seen_at = EXCLUDED.last_seen_at,
        callsign = COALESCE(EXCLUDED.callsign, fr24.aircraft.callsign),
        updated_at = now()
        RETURNING aircraft_id
        """
    # *now* fills both first_seen_at and last_seen_at of a fresh row.
    params = (icao24, callsign, now, now)
    with conn.cursor() as cur:
        cur.execute(upsert_sql, params)
        aircraft_id = cur.fetchone()[0]
    return aircraft_id
FLIGHT_GAP_MINUTES = 30  # close flight if no points for this long


def get_or_create_flight(conn, aircraft_id: int, callsign: str | None, observed_at) -> int:
    """Return the flight that a message observed at *observed_at* belongs to.

    Looks up the most recently started ``active`` flight of the aircraft along
    with the time of its latest track point. That flight is reused when it has
    no track points yet, or when its latest point is less than
    ``FLIGHT_GAP_MINUTES`` older than *observed_at*. Otherwise it is marked
    ``completed`` (ended at its latest point) and a new ``active`` flight with
    source ``rtl-sdr`` is inserted. Does not commit.
    """
    latest_active_sql = """
        SELECT f.flight_id, MAX(tp.observed_at) as last_seen
        FROM fr24.flights f
        LEFT JOIN fr24.track_points tp ON tp.flight_id = f.flight_id
        WHERE f.aircraft_id = %s AND f.status = 'active'
        GROUP BY f.flight_id
        ORDER BY f.started_at DESC LIMIT 1
        """
    close_flight_sql = (
        "UPDATE fr24.flights SET status = 'completed', ended_at = %s WHERE flight_id = %s"
    )
    open_flight_sql = """
        INSERT INTO fr24.flights
        (aircraft_id, started_at, status, source, callsign)
        VALUES (%s, %s, 'active', 'rtl-sdr', %s)
        RETURNING flight_id
        """
    max_gap_seconds = FLIGHT_GAP_MINUTES * 60

    with conn.cursor() as cur:
        cur.execute(latest_active_sql, (aircraft_id,))
        active = cur.fetchone()
        if active:
            flight_id, last_seen = active
            # A flight without any track point is always reused.
            if last_seen is None:
                return flight_id
            gap_seconds = (observed_at - last_seen).total_seconds()
            if gap_seconds < max_gap_seconds:
                return flight_id
            # Gap too large: close the old flight at its last point.
            cur.execute(close_flight_sql, (last_seen, flight_id))

        cur.execute(open_flight_sql, (aircraft_id, observed_at, callsign))
        return cur.fetchone()[0]
def update_flight_callsign(conn, aircraft_id: int, callsign: str) -> None:
    """Fill in the callsign of the aircraft's active flights that lack one.

    Only rows with ``status = 'active'`` and a NULL callsign are touched, so an
    already recorded callsign is never overwritten. Does not commit.
    """
    fill_callsign_sql = """
        UPDATE fr24.flights
        SET callsign = %s
        WHERE aircraft_id = %s
        AND status = 'active'
        AND callsign IS NULL
        """
    params = (callsign, aircraft_id)
    with conn.cursor() as cur:
        cur.execute(fill_callsign_sql, params)
def get_or_create_track(conn, flight_id: int) -> int:
    """Return the id of a track for *flight_id*, inserting one if none exists.

    Does not commit.
    """
    find_sql = "SELECT track_id FROM fr24.tracks WHERE flight_id = %s LIMIT 1"
    create_sql = "INSERT INTO fr24.tracks (flight_id) VALUES (%s) RETURNING track_id"
    key = (flight_id,)
    with conn.cursor() as cur:
        cur.execute(find_sql, key)
        existing = cur.fetchone()
        if not existing:
            cur.execute(create_sql, key)
            existing = cur.fetchone()
        return existing[0]
def append_track_point(conn, track_id: int, flight_id: int, observed_at,
                       raw_packet_id: int, partition_date,
                       lat: float, lon: float, alt_m: float | None,
                       speed: float | None, heading: float | None,
                       vrate: float | None) -> None:
    """Append one position to a track and refresh the track's aggregates.

    The new point gets ``point_order`` = current maximum for the track + 1 and
    a geometry built from (*lon*, *lat*) with SRID 4326. The parent
    ``fr24.tracks`` row then has ``point_count`` incremented, ``last_point_at``
    set to *observed_at*, and its min/max altitude widened by *alt_m*.
    Does not commit.
    """
    next_order_sql = (
        "SELECT COALESCE(MAX(point_order), 0) + 1 FROM fr24.track_points WHERE track_id = %s"
    )
    insert_point_sql = """
        INSERT INTO fr24.track_points
        (track_id, flight_id, observed_at, point_order, geom,
        altitude_m, ground_speed_kt, vertical_rate_fpm, heading_deg,
        source_packet_id, source_partition_date)
        VALUES
        (%s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),
        %s, %s, %s, %s, %s, %s)
        """
    update_track_sql = """
        UPDATE fr24.tracks SET
        point_count = point_count + 1,
        last_point_at = %s,
        min_altitude_m = LEAST(COALESCE(min_altitude_m, %s), %s),
        max_altitude_m = GREATEST(COALESCE(max_altitude_m, %s), %s),
        updated_at = now()
        WHERE track_id = %s
        """

    with conn.cursor() as cur:
        cur.execute(next_order_sql, (track_id,))
        point_order = cur.fetchone()[0]

        # ST_MakePoint takes x (longitude) first, then y (latitude); the
        # column list puts vertical rate before heading.
        point_params = (
            track_id, flight_id, observed_at, point_order, lon, lat,
            alt_m, speed, vrate, heading,
            raw_packet_id, partition_date,
        )
        cur.execute(insert_point_sql, point_params)

        # alt_m appears four times: default and candidate for both min and max.
        track_params = (observed_at, alt_m, alt_m, alt_m, alt_m, track_id)
        cur.execute(update_track_sql, track_params)
# ── batch processor ───────────────────────────────────────────────────────────
def process_batch(conn, packets: list, aircraft_state: dict) -> int:
    """Turn a batch of raw packets into aircraft, flight and track-point rows.

    Each packet is decoded with ``parse_sbs1``. Packets that are not SBS-1
    ``MSG`` lines are counted and skipped. Packets flagged as on the ground
    are skipped as well. Every other message upserts its aircraft and flight;
    a track point is written only when the message carries a valid lat/lon
    pair. MSG3 (airborne position) points are enriched with speed, heading and
    vertical rate cached from a MSG4 seen less than 30 seconds earlier.

    Each packet is committed on its own; a failing packet is rolled back and
    logged, and the cursor still moves past it so it is not retried forever.

    Args:
        conn: open database connection.
        packets: rows with ``raw_packet_id``, ``observed_at`` and
            ``payload_base64`` keys, in processing order.
        aircraft_state: per-``icao24`` cache, mutated in place, holding the
            latest MSG4 velocity fields (``speed``, ``heading``, ``vrate``,
            ``velocity_ts``) and the last known ``onground`` flag.

    Returns:
        The ``raw_packet_id`` of the last packet visited, or 0 for an empty
        batch.
    """
    last_id = 0
    skipped = 0
    for pkt in packets:
        raw_id = pkt["raw_packet_id"]
        observed_at = pkt["observed_at"]
        partition_date = observed_at.date() if hasattr(observed_at, "date") else None
        parsed = parse_sbs1(pkt["payload_base64"])

        # Every outcome below (skip, success, failure) moves the cursor past
        # this packet, so a bad packet can never cause an infinite retry.
        last_id = raw_id

        if not parsed:
            # not an SBS-1 MSG line — skip silently
            skipped += 1
            continue

        # All message types update aircraft/flight rows; a track point is only
        # built when the message carries both lat and lon.
        # MSG1=callsign, MSG2=surface, MSG3=airborne pos, MSG4=airborne vel,
        # MSG5=surveillance alt, MSG6=surveillance id, MSG7=air-to-air, MSG8=all-call
        icao24 = parsed["icao24"]
        callsign = parsed["callsign"]
        lat = parsed["lat"]
        lon = parsed["lon"]
        alt_m = altitude_ft_to_m(parsed["altitude"])
        speed = parsed["speed"]
        heading = parsed["heading"]
        vrate = parsed["vrate"]
        msg_type = parsed.get("msg_type")
        # parse_sbs1 publishes the flag under "on_ground" as a bool. Reading a
        # differently spelled key would always yield None and silently disable
        # the ground filter below.
        on_ground = parsed.get("on_ground")

        # update state cache from MSG4 (velocity)
        if msg_type == "4":
            state = aircraft_state.setdefault(icao24, {})
            for field, value in (("speed", speed), ("heading", heading), ("vrate", vrate)):
                if value is not None:
                    state[field] = value
            state["velocity_ts"] = observed_at

        # update onground from any message of an aircraft we already track
        if on_ground is not None and icao24 in aircraft_state:
            aircraft_state[icao24]["onground"] = on_ground

        # for MSG3: enrich with cached velocity
        if msg_type == "3":
            cached = aircraft_state.get(icao24, {})
            velocity_ts = cached.get("velocity_ts")
            # use cached velocity only if it's fresh (< 30 sec old)
            velocity_fresh = (
                velocity_ts is not None
                and (observed_at - velocity_ts).total_seconds() < 30
            )
            if velocity_fresh:
                if speed is None:
                    speed = cached.get("speed")
                if heading is None:
                    heading = cached.get("heading")
                if vrate is None:
                    vrate = cached.get("vrate")
            if on_ground is None:
                on_ground = cached.get("onground")

        # skip aircraft reported as on the ground (parked/taxiing)
        if on_ground:
            continue

        try:
            now = observed_at if isinstance(observed_at, datetime) else datetime.now(timezone.utc)
            aircraft_id = upsert_aircraft(conn, icao24, callsign, now)
            flight_id = get_or_create_flight(conn, aircraft_id, callsign, observed_at)

            # update callsign on active flight if MSG1 brought a callsign
            if msg_type == "1" and callsign:
                update_flight_callsign(conn, aircraft_id, callsign)

            # sanity-check: drop obviously invalid coordinates (the aircraft
            # and flight rows above are still kept)
            if lat is not None and lon is not None and not (-90 <= lat <= 90 and -180 <= lon <= 180):
                log.warning("Invalid coords icao24=%s lat=%s lon=%s — skipping", icao24, lat, lon)
                lat, lon = None, None

            if lat is not None and lon is not None:
                track_id = get_or_create_track(conn, flight_id)
                append_track_point(
                    conn, track_id, flight_id, observed_at,
                    raw_id, partition_date,
                    lat, lon, alt_m, speed, heading, vrate,
                )
            conn.commit()
        except Exception as e:
            log.error("Failed processing packet %s (icao24=%s): %s", raw_id, icao24, e)
            conn.rollback()

    if skipped:
        log.debug("Skipped %d non-MSG packets in batch", skipped)
    return last_id
# ── main ──────────────────────────────────────────────────────────────────────
def main():
    """Run the preprocess polling loop until SIGTERM or SIGINT arrives.

    Connects to the database, creates the healthcheck marker file, then
    repeatedly reads the stored cursor, fetches the next batch of raw packets,
    processes it and stores the new cursor.

    The loop sleeps ``POLL_INTERVAL`` seconds only when the last poll returned
    fewer than ``BATCH_SIZE`` packets (or failed). A full batch means more
    packets are probably waiting, so the next poll starts immediately instead
    of capping throughput at one batch per interval.

    A poll error is logged and the transaction rolled back. If the connection
    itself has been lost, a new one is established via ``wait_for_db`` (which
    exits the process if the database stays unreachable).
    """
    conn = wait_for_db(30)
    with open(HEALTHCHECK_FILE, "w"):
        pass
    log.info("Healthcheck file written: %s", HEALTHCHECK_FILE)

    # Mutable holder so the nested signal handler can flip the flag.
    shutdown = {"flag": False}

    def _handle_signal(sig, frame):
        log.info("Signal %s — shutting down", sig)
        shutdown["flag"] = True

    signal.signal(signal.SIGTERM, _handle_signal)
    signal.signal(signal.SIGINT, _handle_signal)

    # cache: icao24 -> {speed, heading, vrate, onground}
    aircraft_state: dict = {}
    total = 0
    log.info("Preprocess loop started (poll=%.1fs, batch=%d)", POLL_INTERVAL, BATCH_SIZE)
    try:
        while not shutdown["flag"]:
            more_pending = False
            try:
                cursor = get_cursor(conn)
                packets = fetch_unprocessed(conn, cursor, BATCH_SIZE)
                if packets:
                    last_id = process_batch(conn, packets, aircraft_state)
                    if last_id:
                        set_cursor(conn, last_id)
                    total += len(packets)
                    log.info(
                        "Processed %d packets (cursor→%d, total=%d)",
                        len(packets), last_id, total,
                    )
                    more_pending = len(packets) >= BATCH_SIZE
                else:
                    # End the transaction opened by the SELECTs so the session
                    # is not left "idle in transaction" while sleeping.
                    conn.rollback()
                    log.debug("No new packets, sleeping")
            except Exception as e:
                log.error("Poll error: %s", e)
                if conn.closed:
                    # A dead connection never recovers on its own.
                    log.warning("Database connection lost — reconnecting")
                    conn = wait_for_db(30)
                else:
                    try:
                        conn.rollback()
                    except Exception as rollback_error:
                        log.warning("Rollback after poll error failed: %s", rollback_error)
            if not more_pending:
                time.sleep(POLL_INTERVAL)
    finally:
        conn.close()
    log.info("Preprocess service stopped. Total processed: %d", total)


if __name__ == "__main__":
    main()