447 lines
16 KiB
Python
447 lines
16 KiB
Python
"""
|
|
FR24 Preprocess Service — Step 2: Real SBS-1 parsing
|
|
Reads raw_packets written by the capture service (SBS-1 format, base64-encoded),
|
|
extracts real ADS-B fields, and builds aircraft/flights/tracks/track_points.
|
|
"""
|
|
import base64
import json
import logging
import math
import os
import signal
import sys
import time
from datetime import datetime, timezone

import psycopg2
import psycopg2.extras
|
|
|
|
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [preprocess] %(levelname)s %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
)

log = logging.getLogger("preprocess")

# libpq connection string. make_dsn() quotes and escapes every value, so settings that
# contain spaces, quotes or backslashes (typically the password) still produce a valid
# keyword=value string; plain f-string interpolation would split or corrupt them.
# POSTGRES_HOST / POSTGRES_DB / POSTGRES_USER / POSTGRES_PASSWORD are required
# (KeyError at import time if unset); POSTGRES_PORT defaults to 5432.
DB_DSN = psycopg2.extensions.make_dsn(
    host=os.environ["POSTGRES_HOST"],
    port=os.environ.get("POSTGRES_PORT", 5432),
    dbname=os.environ["POSTGRES_DB"],
    user=os.environ["POSTGRES_USER"],
    password=os.environ["POSTGRES_PASSWORD"],
)

# Seconds to wait between polls of fr24.raw_packets.
POLL_INTERVAL = float(os.environ.get("POLL_INTERVAL_SECONDS", 5.0))

# Maximum number of raw_packets rows fetched per poll.
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", 50))

# Touched once the DB connection is up; presumably checked by a container
# healthcheck — NOTE(review): confirm against the deployment config.
HEALTHCHECK_FILE = "/tmp/preprocess-ready"

# fr24.processing_state key under which the last processed raw_packet_id is stored.
STATE_KEY = "preprocess_cursor"
|
|
|
|
|
# ── SBS-1 parser ─────────────────────────────────────────────────────────────
|
|
# MSG format (22 comma-separated fields):
|
|
# MSG,<msgtype>,<sid>,<aid>,<hex>,<fid>,<date_gen>,<time_gen>,<date_log>,<time_log>,
|
|
# <callsign>,<alt_ft>,<speed_kt>,<track_deg>,<lat>,<lon>,<vrate_fpm>,
|
|
# <squawk>,<alert>,<emerg>,<spi>,<onground>
|
|
|
|
def _float(s: str) -> float | None:
|
|
try:
|
|
return float(s) if s.strip() else None
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def parse_sbs1(payload_base64: str) -> dict | None:
|
|
"""
|
|
Decode base64 payload, parse SBS-1 MSG line.
|
|
Returns dict with extracted fields, or None if unparseable / not a MSG.
|
|
"""
|
|
try:
|
|
line = base64.b64decode(payload_base64).decode("ascii", errors="replace").strip()
|
|
except Exception:
|
|
return None
|
|
|
|
parts = line.split(",")
|
|
if len(parts) < 22 or parts[0] != "MSG":
|
|
return None
|
|
|
|
icao24 = parts[4].strip().upper()
|
|
if not icao24:
|
|
return None
|
|
|
|
return {
|
|
"msg_type": parts[1].strip(),
|
|
"icao24": icao24,
|
|
"callsign": parts[10].strip() or None,
|
|
"altitude": _float(parts[11]), # feet
|
|
"speed": _float(parts[12]), # knots
|
|
"heading": _float(parts[13]), # degrees
|
|
"lat": _float(parts[14]),
|
|
"lon": _float(parts[15]),
|
|
"vrate": _float(parts[16]), # ft/min
|
|
"squawk": parts[17].strip() or None,
|
|
"on_ground": parts[21].strip() == "-1",
|
|
}
|
|
|
|
|
|
def altitude_ft_to_m(ft: float | None) -> float | None:
|
|
return round(ft * 0.3048, 2) if ft is not None else None
|
|
|
|
|
|
# ── db ────────────────────────────────────────────────────────────────────────
|
|
|
|
def wait_for_db(max_attempts: int = 30, retry_delay: float = 2.0) -> psycopg2.extensions.connection:
    """Connect to PostgreSQL using DB_DSN, retrying while the server is unavailable.

    Args:
        max_attempts: number of connection attempts before giving up.
        retry_delay: seconds to wait after a failed attempt (not after the last one).

    Returns:
        An open connection with the psycopg2 uuid typecaster registered on it.

    Exits the process with status 1 when every attempt fails with OperationalError.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            conn = psycopg2.connect(DB_DSN)
        except psycopg2.OperationalError as e:
            log.warning("DB not ready (%d/%d): %s", attempt, max_attempts, e)
            if attempt < max_attempts:
                time.sleep(retry_delay)
            continue

        # register_uuid(oids=None, conn_or_curs=None): the connection must go in
        # conn_or_curs. Passed positionally it lands in `oids`, and the typecaster
        # gets registered under the connection object instead of the uuid type oid.
        psycopg2.extras.register_uuid(conn_or_curs=conn)
        log.info("PostgreSQL connected (attempt %d)", attempt)
        return conn

    log.error("Could not connect to PostgreSQL after %d attempts", max_attempts)
    sys.exit(1)
|
|
|
|
|
|
def get_cursor(conn) -> int:
    """Return the last raw_packet_id this service has handled, or 0 if none is saved.

    Reads the fr24.processing_state row keyed by STATE_KEY. Its state_value is
    expected to be the mapping written by set_cursor, {"last_raw_packet_id": <id>}.
    A missing key also yields 0.
    """
    query = "SELECT state_value FROM fr24.processing_state WHERE state_key = %s"
    with conn.cursor() as cur:
        cur.execute(query, (STATE_KEY,))
        found = cur.fetchone()

    if found is None:
        return 0
    state = found[0]
    return int(state.get("last_raw_packet_id", 0))
|
|
|
|
|
|
def set_cursor(conn, last_id: int):
    """Persist *last_id* as this service's progress marker and commit.

    Upserts the fr24.processing_state row keyed by STATE_KEY with
    state_value = {"last_raw_packet_id": last_id}. Because it calls conn.commit(),
    any other uncommitted work on *conn* is committed together with it.
    """
    state_json = json.dumps({"last_raw_packet_id": last_id})
    upsert_sql = """
        INSERT INTO fr24.processing_state (state_key, state_value, updated_at, note)
        VALUES (%s, %s::jsonb, now(), 'preprocess cursor')
        ON CONFLICT (state_key) DO UPDATE
        SET state_value = EXCLUDED.state_value,
        updated_at = now()
    """
    with conn.cursor() as cur:
        cur.execute(upsert_sql, (STATE_KEY, state_json))
    conn.commit()
|
|
|
|
|
|
def fetch_unprocessed(conn, after_id: int, limit: int) -> list:
    """Fetch up to *limit* raw_packets rows with raw_packet_id > *after_id*.

    Rows come back in ascending raw_packet_id order as dict-like rows
    (RealDictCursor) with the keys raw_packet_id, capture_id, observed_at,
    payload_base64, message_type and decoded_format.
    """
    query = """
        SELECT raw_packet_id, capture_id, observed_at,
        payload_base64, message_type, decoded_format
        FROM fr24.raw_packets
        WHERE raw_packet_id > %s
        ORDER BY raw_packet_id
        LIMIT %s
    """
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(query, (after_id, limit))
        rows = cur.fetchall()
    return rows
|
|
|
|
|
|
# ── upsert helpers ────────────────────────────────────────────────────────────
|
|
|
|
def upsert_aircraft(conn, icao24: str, callsign: str | None, now: datetime) -> int:
    """Insert the aircraft *icao24*, or refresh it when it already exists.

    On insert, first_seen_at and last_seen_at are both *now*. On conflict, only
    last_seen_at and updated_at change, plus callsign when a non-NULL one is given;
    COALESCE keeps the stored callsign otherwise. Does not commit.

    Returns:
        The row's aircraft_id.
    """
    sql = """
        INSERT INTO fr24.aircraft
        (icao24, callsign, first_seen_at, last_seen_at)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (icao24) DO UPDATE
        SET last_seen_at = EXCLUDED.last_seen_at,
        callsign = COALESCE(EXCLUDED.callsign, fr24.aircraft.callsign),
        updated_at = now()
        RETURNING aircraft_id
    """
    params = (icao24, callsign, now, now)
    with conn.cursor() as cur:
        cur.execute(sql, params)
        (aircraft_id,) = cur.fetchone()
    return aircraft_id
|
|
|
|
|
|
# Maximum silence, in minutes, before an aircraft's active flight is treated as over
# and a new one is opened. Overridable via the FLIGHT_GAP_MINUTES environment
# variable, like the other settings; defaults to 30.
FLIGHT_GAP_MINUTES = float(os.environ.get("FLIGHT_GAP_MINUTES", 30))
|
|
|
|
|
|
def get_or_create_flight(conn, aircraft_id: int, callsign: str | None, observed_at) -> int:
    """Return the flight this observation belongs to, opening a new one if needed.

    The aircraft's most recently started 'active' flight is reused while
    *observed_at* is less than FLIGHT_GAP_MINUTES after that flight's latest
    activity. Latest activity is its newest track point, or its started_at when it
    has no track points yet. Without the started_at fallback, a flight that never
    received a position would be reused indefinitely, even days later.

    Otherwise that flight is marked 'completed' with ended_at = its latest activity.
    A new 'active' flight (source 'rtl-sdr', started_at = *observed_at*) is then
    inserted. Does not commit.

    Returns:
        The flight_id to attach this observation to.
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT f.flight_id,
            COALESCE(MAX(tp.observed_at), f.started_at) AS last_seen
            FROM fr24.flights f
            LEFT JOIN fr24.track_points tp ON tp.flight_id = f.flight_id
            WHERE f.aircraft_id = %s AND f.status = 'active'
            GROUP BY f.flight_id
            ORDER BY f.started_at DESC LIMIT 1
            """,
            (aircraft_id,),
        )
        row = cur.fetchone()
        if row:
            flight_id, last_seen = row
            # last_seen can only be NULL here if started_at itself is NULL.
            if last_seen is None or (observed_at - last_seen).total_seconds() < FLIGHT_GAP_MINUTES * 60:
                return flight_id
            # gap too large — close the old flight at its last activity and open a new one
            cur.execute(
                "UPDATE fr24.flights SET status = 'completed', ended_at = %s WHERE flight_id = %s",
                (last_seen, flight_id),
            )

        cur.execute(
            """
            INSERT INTO fr24.flights
            (aircraft_id, started_at, status, source, callsign)
            VALUES (%s, %s, 'active', 'rtl-sdr', %s)
            RETURNING flight_id
            """,
            (aircraft_id, observed_at, callsign),
        )
        return cur.fetchone()[0]
|
|
|
|
|
|
def update_flight_callsign(conn, aircraft_id: int, callsign: str) -> None:
    """Set *callsign* on the aircraft's active flight(s) that have no callsign yet.

    Flights that already carry a callsign are left untouched. Does not commit.
    """
    sql = """
        UPDATE fr24.flights
        SET callsign = %s
        WHERE aircraft_id = %s
        AND status = 'active'
        AND callsign IS NULL
    """
    with conn.cursor() as cur:
        cur.execute(sql, (callsign, aircraft_id))
|
|
|
|
|
|
def get_or_create_track(conn, flight_id: int) -> int:
    """Return the track_id of *flight_id*'s track, inserting an empty track if it has none.

    Does not commit.
    """
    with conn.cursor() as cur:
        cur.execute(
            "SELECT track_id FROM fr24.tracks WHERE flight_id = %s LIMIT 1",
            (flight_id,),
        )
        existing = cur.fetchone()
        if existing is None:
            cur.execute(
                "INSERT INTO fr24.tracks (flight_id) VALUES (%s) RETURNING track_id",
                (flight_id,),
            )
            existing = cur.fetchone()
        return existing[0]
|
|
|
|
|
|
def append_track_point(conn, track_id: int, flight_id: int, observed_at,
                       raw_packet_id: int, partition_date,
                       lat: float, lon: float, alt_m: float | None,
                       speed: float | None, heading: float | None,
                       vrate: float | None) -> None:
    """Append one position to a track and refresh the track's running statistics.

    The new point gets point_order = (highest point_order on the track) + 1. Its
    geometry is an SRID 4326 point built from (lon, lat). Afterwards the track's
    point_count is incremented and last_point_at is set to *observed_at*.
    min/max_altitude_m are widened by *alt_m*; a NULL alt_m leaves them unchanged,
    because PostgreSQL's LEAST/GREATEST ignore NULL arguments. Does not commit.

    Args:
        alt_m: altitude in metres (altitude_m column).
        speed: ground speed in knots (ground_speed_kt column).
        heading: degrees (heading_deg column).
        vrate: vertical rate in ft/min (vertical_rate_fpm column).
        raw_packet_id, partition_date: provenance of the source raw packet.
    """
    next_order_sql = (
        "SELECT COALESCE(MAX(point_order), 0) + 1 FROM fr24.track_points WHERE track_id = %s"
    )
    insert_sql = """
        INSERT INTO fr24.track_points
        (track_id, flight_id, observed_at, point_order, geom,
        altitude_m, ground_speed_kt, vertical_rate_fpm, heading_deg,
        source_packet_id, source_partition_date)
        VALUES
        (%s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),
        %s, %s, %s, %s, %s, %s)
    """
    stats_sql = """
        UPDATE fr24.tracks SET
        point_count = point_count + 1,
        last_point_at = %s,
        min_altitude_m = LEAST(COALESCE(min_altitude_m, %s), %s),
        max_altitude_m = GREATEST(COALESCE(max_altitude_m, %s), %s),
        updated_at = now()
        WHERE track_id = %s
    """

    with conn.cursor() as cur:
        cur.execute(next_order_sql, (track_id,))
        (next_order,) = cur.fetchone()

        point_params = (
            track_id, flight_id, observed_at, next_order,
            lon, lat,  # ST_MakePoint(x, y): longitude first
            alt_m, speed, vrate, heading,
            raw_packet_id, partition_date,
        )
        cur.execute(insert_sql, point_params)

        cur.execute(stats_sql, (observed_at, *([alt_m] * 4), track_id))
|
|
|
|
|
|
# ── batch processor ───────────────────────────────────────────────────────────
|
|
|
|
# A cached MSG4 velocity may enrich a MSG3 position only while younger than this (s).
_VELOCITY_MAX_AGE_S = 30


def _merge_velocity_state(aircraft_state: dict, icao24: str, msg_type: str | None,
                          observed_at, speed, heading, vrate, onground):
    """Update the per-aircraft cache from one message and fill MSG3 gaps from it.

    - MSG4 (airborne velocity): creates the icao24 entry if needed, stores the
      non-None speed/heading/vrate and records observed_at as velocity_ts.
    - Any message with a known on-ground flag refreshes the cached flag, but only
      for aircraft that already have a cache entry.
    - MSG3 (airborne position): missing speed/heading/vrate are taken from the
      cache if its velocity is younger than _VELOCITY_MAX_AGE_S. A missing
      on-ground flag is taken from the cache regardless of age.

    Returns:
        (speed, heading, vrate, onground) to use for this message.
    """
    if msg_type == "4":
        entry = aircraft_state.setdefault(icao24, {})
        for key, value in (("speed", speed), ("heading", heading), ("vrate", vrate)):
            if value is not None:
                entry[key] = float(value)
        entry["velocity_ts"] = observed_at

    if onground is not None and icao24 in aircraft_state:
        aircraft_state[icao24]["onground"] = onground

    if msg_type == "3":
        cached = aircraft_state.get(icao24, {})
        velocity_ts = cached.get("velocity_ts")
        velocity_fresh = (
            velocity_ts is not None
            and (observed_at - velocity_ts).total_seconds() < _VELOCITY_MAX_AGE_S
        )
        if velocity_fresh:
            if speed is None:
                speed = cached.get("speed")
            if heading is None:
                heading = cached.get("heading")
            if vrate is None:
                vrate = cached.get("vrate")
        if onground is None:
            onground = cached.get("onground")

    return speed, heading, vrate, onground


def process_batch(conn, packets: list, aircraft_state: dict) -> int:
    """Convert a batch of raw_packets rows into aircraft/flight/track rows.

    Each packet runs in its own transaction: commit on success, rollback on failure.
    The returned cursor also moves past failed packets, so one bad row cannot stall
    the pipeline.

    Two kinds of packet are consumed without writing anything: those that do not
    parse as SBS-1 MSG lines, and those from aircraft reported on the ground. For
    every other packet:
      - the aircraft is upserted;
      - its active flight is found or opened;
      - a MSG1 callsign is copied onto the flight;
      - a track point is appended when the message has valid lat/lon.

    Args:
        conn: open psycopg2 connection.
        packets: rows from fetch_unprocessed(); the raw_packet_id, observed_at and
            payload_base64 keys are read.
        aircraft_state: icao24 -> cached velocity / on-ground flag. It is mutated in
            place; the caller keeps the same dict across batches.

    Returns:
        raw_packet_id of the last packet consumed, or 0 if *packets* is empty.

    Raises:
        Whatever conn.rollback() raises when the connection itself is broken; the
        remaining packets of the batch are then left unprocessed.
    """
    last_id = 0
    skipped = 0
    grounded = 0

    for pkt in packets:
        raw_id = pkt["raw_packet_id"]
        observed_at = pkt["observed_at"]
        partition_date = observed_at.date() if hasattr(observed_at, "date") else None

        parsed = parse_sbs1(pkt["payload_base64"])
        if not parsed:
            # not an SBS-1 MSG line — skip silently
            last_id = raw_id
            skipped += 1
            continue

        # MSG1=callsign, MSG2=surface, MSG3=airborne pos, MSG4=airborne vel,
        # MSG5=surveillance alt, MSG6=surveillance id, MSG7=air-to-air, MSG8=all-call.
        # Every type updates aircraft/flight; track points need lat+lon.
        icao24 = parsed["icao24"]
        callsign = parsed["callsign"]
        lat = parsed["lat"]
        lon = parsed["lon"]
        alt_m = altitude_ft_to_m(parsed["altitude"])
        msg_type = parsed.get("msg_type")

        # parse_sbs1 reports the flag under "on_ground"; reading "onground" (as
        # before) always gave None, which disabled the cache and the ground skip.
        speed, heading, vrate, onground = _merge_velocity_state(
            aircraft_state, icao24, msg_type, observed_at,
            parsed["speed"], parsed["heading"], parsed["vrate"],
            parsed.get("on_ground"),
        )

        # Skip aircraft reported on the ground (parked/taxiing). Only a true flag
        # counts; False and None (never reported) are processed normally.
        if onground:
            last_id = raw_id
            grounded += 1
            continue

        try:
            now = observed_at if isinstance(observed_at, datetime) else datetime.now(timezone.utc)
            aircraft_id = upsert_aircraft(conn, icao24, callsign, now)
            flight_id = get_or_create_flight(conn, aircraft_id, callsign, observed_at)

            # update callsign on active flight if MSG1 brought a callsign
            if msg_type == "1" and callsign:
                update_flight_callsign(conn, aircraft_id, callsign)

            # sanity-check: drop obviously invalid coordinates
            if lat is not None and lon is not None and not (-90 <= lat <= 90 and -180 <= lon <= 180):
                log.warning("Invalid coords icao24=%s lat=%s lon=%s — skipping", icao24, lat, lon)
                lat, lon = None, None

            if lat is not None and lon is not None:
                track_id = get_or_create_track(conn, flight_id)
                append_track_point(
                    conn, track_id, flight_id, observed_at,
                    raw_id, partition_date,
                    lat, lon, alt_m, speed, heading, vrate,
                )

            conn.commit()
        except Exception as e:
            log.exception("Failed processing packet %s (icao24=%s): %s", raw_id, icao24, e)
            conn.rollback()

        # advance past the packet even on error to avoid infinite retry
        last_id = raw_id

    if skipped:
        log.debug("Skipped %d non-MSG packets in batch", skipped)
    if grounded:
        log.debug("Skipped %d on-ground packets in batch", grounded)

    return last_id
|
|
|
|
|
|
# ── main ──────────────────────────────────────────────────────────────────────
|
|
|
|
def main():
    """Run the preprocess service until SIGTERM or SIGINT.

    Connects to PostgreSQL and touches HEALTHCHECK_FILE. Each loop iteration then:
      1. reads the saved cursor;
      2. fetches up to BATCH_SIZE newer raw_packets;
      3. processes them;
      4. saves the new cursor.

    It sleeps POLL_INTERVAL seconds between polls, except right after a full batch,
    when more rows are likely waiting. After an error it rolls back. If the
    connection turns out to be closed (server restart, network drop), it reconnects
    via wait_for_db() instead of failing every later poll on a dead connection.

    NOTE(review): the cursor is saved after the whole batch, while packets commit
    individually. A crash mid-batch therefore reprocesses (duplicates) that batch's
    committed packets on restart.
    """
    conn = wait_for_db(30)

    open(HEALTHCHECK_FILE, "w").close()
    log.info("Healthcheck file written: %s", HEALTHCHECK_FILE)

    shutdown = {"flag": False}

    def _handle_signal(sig, frame):
        log.info("Signal %s — shutting down", sig)
        shutdown["flag"] = True

    signal.signal(signal.SIGTERM, _handle_signal)
    signal.signal(signal.SIGINT, _handle_signal)

    # cache: icao24 -> {speed, heading, vrate, onground, velocity_ts}; kept across batches
    aircraft_state: dict = {}

    total = 0
    log.info("Preprocess loop started (poll=%.1fs, batch=%d)", POLL_INTERVAL, BATCH_SIZE)

    while not shutdown["flag"]:
        more_pending = False
        try:
            cursor = get_cursor(conn)
            packets = fetch_unprocessed(conn, cursor, BATCH_SIZE)

            if packets:
                last_id = process_batch(conn, packets, aircraft_state)
                if last_id:
                    set_cursor(conn, last_id)
                    total += len(packets)
                    log.info(
                        "Processed %d packets (cursor→%d, total=%d)",
                        len(packets), last_id, total,
                    )
                # A full batch means the backlog is probably not drained: poll again
                # right away instead of capping throughput at BATCH_SIZE per interval.
                more_pending = len(packets) >= BATCH_SIZE
            else:
                log.debug("No new packets, sleeping")

        except Exception as e:
            log.exception("Poll error: %s", e)
            if not conn.closed:
                try:
                    conn.rollback()
                except psycopg2.Error as rollback_err:
                    log.warning("Rollback after poll error failed: %s", rollback_err)
            if conn.closed:
                # Every further query on a closed connection fails; open a new one.
                log.warning("PostgreSQL connection lost — reconnecting")
                conn = wait_for_db(30)

        if not more_pending:
            time.sleep(POLL_INTERVAL)

    conn.close()
    log.info("Preprocess service stopped. Total processed: %d", total)
|
|
|
|
|
|
if __name__ == "__main__":
    # Start the polling service only when executed as a script, not on import.
    main()
|