"""
FR24 Preprocess Service — Step 2: Real SBS-1 parsing

Reads raw_packets written by the capture service (SBS-1 format,
base64-encoded), extracts real ADS-B fields, and builds
aircraft/flights/tracks/track_points.

Progress is tracked as the last processed raw_packet_id, stored as JSON in
fr24.processing_state under STATE_KEY.
"""
import base64
import json
import logging
import os
import signal
import sys
import time
from datetime import datetime, timezone

import psycopg2
import psycopg2.extras

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [preprocess] %(levelname)s %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
)
log = logging.getLogger("preprocess")

# make_dsn quotes every value, so credentials containing spaces, quotes or
# backslashes still yield a valid libpq connection string (plain "key=value"
# concatenation silently breaks on them).
DB_DSN = psycopg2.extensions.make_dsn(
    host=os.environ["POSTGRES_HOST"],
    port=os.environ.get("POSTGRES_PORT", 5432),
    dbname=os.environ["POSTGRES_DB"],
    user=os.environ["POSTGRES_USER"],
    password=os.environ["POSTGRES_PASSWORD"],
)
POLL_INTERVAL = float(os.environ.get("POLL_INTERVAL_SECONDS", 5.0))
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", 50))
HEALTHCHECK_FILE = "/tmp/preprocess-ready"
STATE_KEY = "preprocess_cursor"

# Cached MSG4 velocity is merged into a MSG3 position only if younger than this.
VELOCITY_MAX_AGE_SECONDS = 30


# ── SBS-1 parser ─────────────────────────────────────────────────────────────
# MSG format (22 comma-separated fields; 0-based index in brackets):
#   [0] "MSG"            [1] transmission type   [2] session id
#   [3] aircraft id      [4] hex ident (ICAO24)  [5] flight id
#   [6] date generated   [7] time generated      [8] date logged
#   [9] time logged      [10] callsign           [11] altitude (ft)
#   [12] ground speed (kt)  [13] track (deg)     [14] latitude
#   [15] longitude       [16] vertical rate (ft/min)  [17] squawk
#   [18] alert (squawk change)  [19] emergency   [20] SPI (ident)
#   [21] is-on-ground (-1 = true, 0 = false, empty = not reported)

def _float(s: str) -> float | None:
    """Parse a numeric SBS-1 field; return None for empty or non-numeric text."""
    try:
        return float(s) if s.strip() else None
    except ValueError:
        return None


def parse_sbs1(payload_base64: str) -> dict | None:
    """
    Decode base64 payload, parse SBS-1 MSG line.

    Returns a dict with the extracted fields, or None if the payload is not
    decodable, is not a MSG line with at least 22 fields, or has no ICAO24.

    ``on_ground`` is True / False when the message carries the flag and None
    when the field is empty, so callers can tell "airborne" from "unknown".
    """
    try:
        line = base64.b64decode(payload_base64).decode("ascii", errors="replace").strip()
    except (ValueError, TypeError):
        # binascii.Error (bad base64) subclasses ValueError; TypeError covers
        # NULL / non-string payloads.
        return None

    parts = line.split(",")
    if len(parts) < 22 or parts[0] != "MSG":
        return None

    icao24 = parts[4].strip().upper()
    if not icao24:
        return None

    ground_flag = parts[21].strip()
    return {
        "msg_type": parts[1].strip(),
        "icao24": icao24,
        "callsign": parts[10].strip() or None,
        "altitude": _float(parts[11]),   # feet
        "speed": _float(parts[12]),      # knots
        "heading": _float(parts[13]),    # degrees
        "lat": _float(parts[14]),
        "lon": _float(parts[15]),
        "vrate": _float(parts[16]),      # ft/min
        "squawk": parts[17].strip() or None,
        "on_ground": (ground_flag == "-1") if ground_flag else None,
    }


def altitude_ft_to_m(ft: float | None) -> float | None:
    """Convert feet to metres (rounded to 2 decimals); None passes through."""
    return round(ft * 0.3048, 2) if ft is not None else None


# ── db ────────────────────────────────────────────────────────────────────────

def wait_for_db(max_attempts: int = 30) -> psycopg2.extensions.connection:
    """Connect to PostgreSQL, retrying every 2 s; exit the process after max_attempts failures."""
    for attempt in range(1, max_attempts + 1):
        try:
            conn = psycopg2.connect(DB_DSN)
            psycopg2.extras.register_uuid(conn)
            log.info("PostgreSQL connected (attempt %d)", attempt)
            return conn
        except psycopg2.OperationalError as e:
            log.warning("DB not ready (%d/%d): %s", attempt, max_attempts, e)
            time.sleep(2)
    log.error("Could not connect to PostgreSQL after %d attempts", max_attempts)
    sys.exit(1)


def get_cursor(conn) -> int:
    """Return the last processed raw_packet_id, or 0 if no cursor is stored yet."""
    with conn.cursor() as cur:
        cur.execute(
            "SELECT state_value FROM fr24.processing_state WHERE state_key = %s",
            (STATE_KEY,),
        )
        row = cur.fetchone()
    # state_value is jsonb (decoded to a dict by psycopg2); tolerate NULL.
    if not row or not row[0]:
        return 0
    return int(row[0].get("last_raw_packet_id", 0))


def set_cursor(conn, last_id: int):
    """Upsert the processing cursor to last_id and commit."""
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO fr24.processing_state (state_key, state_value, updated_at, note)
            VALUES (%s, %s::jsonb, now(), 'preprocess cursor')
            ON CONFLICT (state_key) DO UPDATE
              SET state_value = EXCLUDED.state_value,
                  updated_at  = now()
            """,
            (STATE_KEY, json.dumps({"last_raw_packet_id": last_id})),
        )
    conn.commit()


def fetch_unprocessed(conn, after_id: int, limit: int) -> list:
    """Return up to `limit` raw_packets rows with id > after_id, ordered by id, as dicts."""
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(
            """
            SELECT raw_packet_id, capture_id, observed_at, payload_base64,
                   message_type, decoded_format
            FROM fr24.raw_packets
            WHERE raw_packet_id > %s
            ORDER BY raw_packet_id
            LIMIT %s
            """,
            (after_id, limit),
        )
        return cur.fetchall()


# ── upsert helpers ────────────────────────────────────────────────────────────

def upsert_aircraft(conn, icao24: str, callsign: str | None, now: datetime) -> int:
    """Insert or refresh an aircraft row (keeping an existing callsign when none is given); return aircraft_id."""
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO fr24.aircraft (icao24, callsign, first_seen_at, last_seen_at)
            VALUES (%s, %s, %s, %s)
            ON CONFLICT (icao24) DO UPDATE
              SET last_seen_at = EXCLUDED.last_seen_at,
                  callsign     = COALESCE(EXCLUDED.callsign, fr24.aircraft.callsign),
                  updated_at   = now()
            RETURNING aircraft_id
            """,
            (icao24, callsign, now, now),
        )
        return cur.fetchone()[0]


FLIGHT_GAP_MINUTES = 30  # close flight if no points for this long


def get_or_create_flight(conn, aircraft_id: int, callsign: str | None, observed_at) -> int:
    """
    Return the flight_id to attach an observation to.

    Reuses the newest active flight of the aircraft when it has no track
    points yet or its last point is less than FLIGHT_GAP_MINUTES before
    observed_at. Otherwise that flight is marked completed (ended_at = its
    last point) and a new active flight is inserted.
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT f.flight_id, MAX(tp.observed_at) as last_seen
            FROM fr24.flights f
            LEFT JOIN fr24.track_points tp ON tp.flight_id = f.flight_id
            WHERE f.aircraft_id = %s AND f.status = 'active'
            GROUP BY f.flight_id
            ORDER BY f.started_at DESC
            LIMIT 1
            """,
            (aircraft_id,),
        )
        row = cur.fetchone()
        if row:
            flight_id, last_seen = row
            if last_seen is None or (observed_at - last_seen).total_seconds() < FLIGHT_GAP_MINUTES * 60:
                return flight_id
            # gap too large — close old flight and open new one
            cur.execute(
                "UPDATE fr24.flights SET status = 'completed', ended_at = %s WHERE flight_id = %s",
                (last_seen, flight_id),
            )
        cur.execute(
            """
            INSERT INTO fr24.flights (aircraft_id, started_at, status, source, callsign)
            VALUES (%s, %s, 'active', 'rtl-sdr', %s)
            RETURNING flight_id
            """,
            (aircraft_id, observed_at, callsign),
        )
        return cur.fetchone()[0]


def update_flight_callsign(conn, aircraft_id: int, callsign: str) -> None:
    """Update callsign on active flight if not already set."""
    with conn.cursor() as cur:
        cur.execute(
            """
            UPDATE fr24.flights
            SET callsign = %s
            WHERE aircraft_id = %s AND status = 'active' AND callsign IS NULL
            """,
            (callsign, aircraft_id),
        )


def get_or_create_track(conn, flight_id: int) -> int:
    """Return the track_id for flight_id, inserting a track row if none exists."""
    with conn.cursor() as cur:
        cur.execute(
            "SELECT track_id FROM fr24.tracks WHERE flight_id = %s LIMIT 1",
            (flight_id,),
        )
        row = cur.fetchone()
        if row:
            return row[0]
        cur.execute(
            "INSERT INTO fr24.tracks (flight_id) VALUES (%s) RETURNING track_id",
            (flight_id,),
        )
        return cur.fetchone()[0]


def append_track_point(conn, track_id: int, flight_id: int, observed_at,
                       raw_packet_id: int, partition_date,
                       lat: float, lon: float, alt_m: float | None,
                       speed: float | None, heading: float | None,
                       vrate: float | None) -> None:
    """
    Insert one track point (point_order = current max + 1) and update the
    track's point_count, last_point_at and min/max altitude.

    A NULL altitude leaves min/max unchanged because PostgreSQL LEAST /
    GREATEST ignore NULL arguments.
    """
    with conn.cursor() as cur:
        cur.execute(
            "SELECT COALESCE(MAX(point_order), 0) + 1 FROM fr24.track_points WHERE track_id = %s",
            (track_id,),
        )
        point_order = cur.fetchone()[0]

        cur.execute(
            """
            INSERT INTO fr24.track_points
              (track_id, flight_id, observed_at, point_order, geom,
               altitude_m, ground_speed_kt, vertical_rate_fpm, heading_deg,
               source_packet_id, source_partition_date)
            VALUES (%s, %s, %s, %s,
                    ST_SetSRID(ST_MakePoint(%s, %s), 4326),
                    %s, %s, %s, %s, %s, %s)
            """,
            (
                track_id, flight_id, observed_at, point_order,
                lon, lat,  # ST_MakePoint takes (x=lon, y=lat)
                alt_m, speed, vrate, heading,
                raw_packet_id, partition_date,
            ),
        )

        cur.execute(
            """
            UPDATE fr24.tracks SET
              point_count    = point_count + 1,
              last_point_at  = %s,
              min_altitude_m = LEAST(COALESCE(min_altitude_m, %s), %s),
              max_altitude_m = GREATEST(COALESCE(max_altitude_m, %s), %s),
              updated_at     = now()
            WHERE track_id = %s
            """,
            (observed_at, alt_m, alt_m, alt_m, alt_m, track_id),
        )


# ── batch processor ───────────────────────────────────────────────────────────

def _update_aircraft_state(aircraft_state: dict, parsed: dict, observed_at) -> None:
    """Record MSG4 velocity and any reported on-ground flag in the per-aircraft cache."""
    icao24 = parsed["icao24"]
    on_ground = parsed["on_ground"]
    if parsed["msg_type"] == "4":
        entry = aircraft_state.setdefault(icao24, {})
        for key in ("speed", "heading", "vrate"):
            if parsed[key] is not None:
                entry[key] = parsed[key]
        entry["velocity_ts"] = observed_at
    # the on-ground flag is cached only for aircraft that already have an entry
    if on_ground is not None and icao24 in aircraft_state:
        aircraft_state[icao24]["onground"] = on_ground


def _resolve_kinematics(aircraft_state: dict, parsed: dict, observed_at) -> tuple:
    """
    Return (speed, heading, vrate, on_ground) for this message.

    For MSG3 (airborne position, which carries no velocity) missing values
    are filled from the cache: velocity only if the cached MSG4 is younger
    than VELOCITY_MAX_AGE_SECONDS, on_ground regardless of age.
    """
    speed = parsed["speed"]
    heading = parsed["heading"]
    vrate = parsed["vrate"]
    on_ground = parsed["on_ground"]
    if parsed["msg_type"] == "3":
        cached = aircraft_state.get(parsed["icao24"], {})
        velocity_ts = cached.get("velocity_ts")
        velocity_fresh = (
            velocity_ts is not None
            and (observed_at - velocity_ts).total_seconds() < VELOCITY_MAX_AGE_SECONDS
        )
        if velocity_fresh:
            if speed is None:
                speed = cached.get("speed")
            if heading is None:
                heading = cached.get("heading")
            if vrate is None:
                vrate = cached.get("vrate")
        if on_ground is None:
            on_ground = cached.get("onground")
    return speed, heading, vrate, on_ground


def _process_message(conn, pkt: dict, parsed: dict, aircraft_state: dict) -> None:
    """
    Apply one parsed SBS-1 message: upsert aircraft/flight, and append a
    track point when the message has valid lat/lon. Commits on success.
    Messages from aircraft known to be on the ground are ignored.
    """
    raw_id = pkt["raw_packet_id"]
    observed_at = pkt["observed_at"]
    partition_date = observed_at.date() if hasattr(observed_at, "date") else None

    icao24 = parsed["icao24"]
    callsign = parsed["callsign"]
    msg_type = parsed["msg_type"]

    # MSG1=callsign, MSG2=surface pos, MSG3=airborne pos, MSG4=airborne vel,
    # MSG5=surveillance alt, MSG6=surveillance id, MSG7=air-to-air, MSG8=all-call.
    # Every message updates aircraft/flight; track_points need lat+lon.
    _update_aircraft_state(aircraft_state, parsed, observed_at)
    speed, heading, vrate, on_ground = _resolve_kinematics(aircraft_state, parsed, observed_at)

    # skip aircraft reported on the ground (parked / taxiing)
    if on_ground:
        return

    lat = parsed["lat"]
    lon = parsed["lon"]
    alt_m = altitude_ft_to_m(parsed["altitude"])

    now = observed_at if isinstance(observed_at, datetime) else datetime.now(timezone.utc)
    aircraft_id = upsert_aircraft(conn, icao24, callsign, now)
    flight_id = get_or_create_flight(conn, aircraft_id, callsign, observed_at)

    # update callsign on active flight if MSG1 brought a callsign
    if msg_type == "1" and callsign:
        update_flight_callsign(conn, aircraft_id, callsign)

    # sanity-check: skip obviously invalid coordinates
    if lat is not None and lon is not None and not (-90 <= lat <= 90 and -180 <= lon <= 180):
        log.warning("Invalid coords icao24=%s lat=%s lon=%s — skipping", icao24, lat, lon)
        lat, lon = None, None

    if lat is not None and lon is not None:
        track_id = get_or_create_track(conn, flight_id)
        append_track_point(
            conn, track_id, flight_id, observed_at, raw_id, partition_date,
            lat, lon, alt_m, speed, heading, vrate,
        )

    conn.commit()


def process_batch(conn, packets: list, aircraft_state: dict) -> int:
    """
    Process packets in order and return the raw_packet_id of the last one
    (0 if `packets` is empty).

    Every packet advances the returned id — including non-MSG, skipped and
    failed ones — so a single bad packet can never stall the cursor. All
    per-packet work runs inside the try, so an unexpected error rolls back
    only that packet instead of aborting the batch (which previously left the
    cursor unchanged and re-inserted the batch's committed points every poll).
    """
    last_id = 0
    skipped = 0

    for pkt in packets:
        raw_id = pkt["raw_packet_id"]
        parsed = parse_sbs1(pkt["payload_base64"])
        if parsed is None:
            # not an SBS-1 MSG line — skip silently
            skipped += 1
            last_id = raw_id
            continue

        try:
            _process_message(conn, pkt, parsed, aircraft_state)
        except Exception as e:
            log.error("Failed processing packet %s (icao24=%s): %s", raw_id, parsed["icao24"], e)
            conn.rollback()
        last_id = raw_id  # advance cursor even on error to avoid infinite retry

    if skipped:
        log.debug("Skipped %d non-MSG packets in batch", skipped)

    return last_id


# ── main ──────────────────────────────────────────────────────────────────────

def main():
    """Poll raw_packets until SIGTERM/SIGINT, processing one batch per iteration."""
    conn = wait_for_db(30)

    open(HEALTHCHECK_FILE, "w").close()
    log.info("Healthcheck file written: %s", HEALTHCHECK_FILE)

    shutdown = {"flag": False}

    def _handle_signal(sig, frame):
        log.info("Signal %s — shutting down", sig)
        shutdown["flag"] = True

    signal.signal(signal.SIGTERM, _handle_signal)
    signal.signal(signal.SIGINT, _handle_signal)

    # cache: icao24 -> {speed, heading, vrate, onground, velocity_ts}
    aircraft_state: dict = {}

    total = 0
    log.info("Preprocess loop started (poll=%.1fs, batch=%d)", POLL_INTERVAL, BATCH_SIZE)

    while not shutdown["flag"]:
        # a full batch means a backlog: poll again immediately instead of
        # capping throughput at BATCH_SIZE / POLL_INTERVAL packets per second
        backlog = False
        try:
            cursor = get_cursor(conn)
            packets = fetch_unprocessed(conn, cursor, BATCH_SIZE)

            if packets:
                last_id = process_batch(conn, packets, aircraft_state)
                if last_id:
                    set_cursor(conn, last_id)
                total += len(packets)
                log.info(
                    "Processed %d packets (cursor→%d, total=%d)",
                    len(packets), last_id, total,
                )
                backlog = len(packets) >= BATCH_SIZE
            else:
                # end the read-only transaction so the connection does not sit
                # "idle in transaction" (holding table locks) between polls
                conn.commit()
                log.debug("No new packets, sleeping")

        except Exception as e:
            log.exception("Poll error: %s", e)
            try:
                conn.rollback()
            except psycopg2.Error:
                pass
            if conn.closed:
                # the server dropped the connection; without reconnecting every
                # subsequent poll would fail the same way
                log.warning("Database connection lost — reconnecting")
                conn = wait_for_db(30)

        if not backlog:
            time.sleep(POLL_INTERVAL)

    conn.close()
    log.info("Preprocess service stopped. Total processed: %d", total)


if __name__ == "__main__":
    main()