Files
wiki/tasks/flightradar24/ingest/preprocess/main.py
2026-04-19 15:30:01 +03:00

363 lines
12 KiB
Python

"""
FR24 Preprocess Service — Step 2: Real SBS-1 parsing
Reads raw_packets written by the capture service (SBS-1 format, base64-encoded),
extracts real ADS-B fields, and builds aircraft/flights/tracks/track_points.
"""
import base64
import json
import logging
import math
import os
import signal
import sys
import time
from datetime import datetime, timezone

import psycopg2
import psycopg2.extensions
import psycopg2.extras
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [preprocess] %(levelname)s %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
)
log = logging.getLogger("preprocess")

# libpq connection string built from the container environment.
# make_dsn() quotes and escapes every value, so a password (or any other
# setting) containing spaces, quotes or backslashes still yields a valid DSN;
# plain "key=value" f-string concatenation produced a broken DSN in that case.
# For simple values the result is identical to the old concatenated string.
DB_DSN = psycopg2.extensions.make_dsn(
    host=os.environ["POSTGRES_HOST"],
    port=os.environ.get("POSTGRES_PORT", 5432),
    dbname=os.environ["POSTGRES_DB"],
    user=os.environ["POSTGRES_USER"],
    password=os.environ["POSTGRES_PASSWORD"],
)

# Poll interval in seconds (env POLL_INTERVAL_SECONDS).
POLL_INTERVAL = float(os.environ.get("POLL_INTERVAL_SECONDS", 5.0))
# Maximum number of raw packets fetched per poll (env BATCH_SIZE).
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", 50))
# Touched once the DB connection is up; presumably probed by the container healthcheck.
HEALTHCHECK_FILE = "/tmp/preprocess-ready"
# Key of this service's cursor row in fr24.processing_state.
STATE_KEY = "preprocess_cursor"
# ── SBS-1 parser ─────────────────────────────────────────────────────────────
# MSG format (22 comma-separated fields):
# MSG,<msgtype>,<sid>,<aid>,<hex>,<fid>,<date_gen>,<time_gen>,<date_log>,<time_log>,
# <callsign>,<alt_ft>,<speed_kt>,<track_deg>,<lat>,<lon>,<vrate_fpm>,
# <squawk>,<alert>,<emerg>,<spi>,<onground>
def _float(s: str) -> float | None:
try:
return float(s) if s.strip() else None
except ValueError:
return None
def parse_sbs1(payload_base64: str) -> dict | None:
"""
Decode base64 payload, parse SBS-1 MSG line.
Returns dict with extracted fields, or None if unparseable / not a MSG.
"""
try:
line = base64.b64decode(payload_base64).decode("ascii", errors="replace").strip()
except Exception:
return None
parts = line.split(",")
if len(parts) < 22 or parts[0] != "MSG":
return None
icao24 = parts[4].strip().upper()
if not icao24:
return None
return {
"msg_type": parts[1].strip(),
"icao24": icao24,
"callsign": parts[10].strip() or None,
"altitude": _float(parts[11]), # feet
"speed": _float(parts[12]), # knots
"heading": _float(parts[13]), # degrees
"lat": _float(parts[14]),
"lon": _float(parts[15]),
"vrate": _float(parts[16]), # ft/min
"squawk": parts[17].strip() or None,
"on_ground": parts[21].strip() == "-1",
}
def altitude_ft_to_m(ft: float | None) -> float | None:
return round(ft * 0.3048, 2) if ft is not None else None
# ── db ────────────────────────────────────────────────────────────────────────
def wait_for_db(max_attempts: int = 30, retry_delay: float = 2.0) -> psycopg2.extensions.connection:
    """Connect to PostgreSQL, retrying while the server is not reachable yet.

    Args:
        max_attempts: number of connection attempts before giving up.
        retry_delay: seconds to wait between failed attempts.

    Returns:
        An open psycopg2 connection with the UUID typecaster registered on it.

    Exits the process with status 1 (via ``sys.exit``) if every attempt fails.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            conn = psycopg2.connect(DB_DSN)
        except psycopg2.OperationalError as e:
            log.warning("DB not ready (%d/%d): %s", attempt, max_attempts, e)
            # No point sleeping after the final attempt; we are about to exit.
            if attempt < max_attempts:
                time.sleep(retry_delay)
            continue
        # register_uuid's signature is (oids=None, conn_or_curs=None): passing
        # the connection positionally binds it to ``oids``, so the caster is not
        # registered for the uuid type on this connection. Pass it by keyword.
        psycopg2.extras.register_uuid(conn_or_curs=conn)
        log.info("PostgreSQL connected (attempt %d)", attempt)
        return conn
    log.error("Could not connect to PostgreSQL after %d attempts", max_attempts)
    sys.exit(1)
def get_cursor(conn) -> int:
    """Return the last processed raw_packet_id, or 0 if no cursor row exists yet.

    Reads the JSON ``state_value`` stored under STATE_KEY in
    fr24.processing_state (the object written by ``set_cursor``).
    """
    with conn.cursor() as cur:
        cur.execute(
            "SELECT state_value FROM fr24.processing_state WHERE state_key = %s",
            (STATE_KEY,),
        )
        found = cur.fetchone()
    if found is None:
        return 0
    (state,) = found
    return int(state.get("last_raw_packet_id", 0))
def set_cursor(conn, last_id: int):
    """Upsert *last_id* as this service's cursor in fr24.processing_state and commit."""
    state_json = json.dumps({"last_raw_packet_id": last_id})
    upsert_sql = """
        INSERT INTO fr24.processing_state (state_key, state_value, updated_at, note)
        VALUES (%s, %s::jsonb, now(), 'preprocess cursor')
        ON CONFLICT (state_key) DO UPDATE
        SET state_value = EXCLUDED.state_value,
        updated_at = now()
    """
    with conn.cursor() as cur:
        cur.execute(upsert_sql, (STATE_KEY, state_json))
    # The commit also ends whatever transaction the caller had open on conn.
    conn.commit()
def fetch_unprocessed(conn, after_id: int, limit: int) -> list:
    """Return up to *limit* raw_packets rows with raw_packet_id > *after_id*.

    Rows are ordered by raw_packet_id and returned as dicts
    (RealDictCursor), keyed by the selected column names.
    """
    query = """
        SELECT raw_packet_id, capture_id, observed_at,
        payload_base64, message_type, decoded_format
        FROM fr24.raw_packets
        WHERE raw_packet_id > %s
        ORDER BY raw_packet_id
        LIMIT %s
    """
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(query, (after_id, limit))
        rows = cur.fetchall()
    return rows
# ── upsert helpers ────────────────────────────────────────────────────────────
def upsert_aircraft(conn, icao24: str, callsign: str | None, now: datetime) -> int:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO fr24.aircraft
(icao24, callsign, first_seen_at, last_seen_at)
VALUES (%s, %s, %s, %s)
ON CONFLICT (icao24) DO UPDATE
SET last_seen_at = EXCLUDED.last_seen_at,
callsign = COALESCE(EXCLUDED.callsign, fr24.aircraft.callsign),
updated_at = now()
RETURNING aircraft_id
""",
(icao24, callsign, now, now),
)
return cur.fetchone()[0]
def get_or_create_flight(conn, aircraft_id: int, callsign: str | None, observed_at) -> int:
with conn.cursor() as cur:
# reuse active flight for this aircraft
cur.execute(
"""
SELECT flight_id FROM fr24.flights
WHERE aircraft_id = %s AND status = 'active'
ORDER BY started_at DESC LIMIT 1
""",
(aircraft_id,),
)
row = cur.fetchone()
if row:
return row[0]
cur.execute(
"""
INSERT INTO fr24.flights
(aircraft_id, started_at, status, source, callsign)
VALUES (%s, %s, 'active', 'rtl-sdr', %s)
RETURNING flight_id
""",
(aircraft_id, observed_at, callsign),
)
return cur.fetchone()[0]
def get_or_create_track(conn, flight_id: int) -> int:
    """Return a track_id for *flight_id*, inserting an empty track when none exists."""
    with conn.cursor() as cur:
        cur.execute(
            "SELECT track_id FROM fr24.tracks WHERE flight_id = %s LIMIT 1",
            (flight_id,),
        )
        found = cur.fetchone()
        if found is None:
            cur.execute(
                "INSERT INTO fr24.tracks (flight_id) VALUES (%s) RETURNING track_id",
                (flight_id,),
            )
            found = cur.fetchone()
    return found[0]
def append_track_point(conn, track_id: int, flight_id: int, observed_at,
raw_packet_id: int, partition_date,
lat: float, lon: float, alt_m: float | None,
speed: float | None, heading: float | None,
vrate: float | None) -> None:
with conn.cursor() as cur:
cur.execute(
"SELECT COALESCE(MAX(point_order), 0) + 1 FROM fr24.track_points WHERE track_id = %s",
(track_id,),
)
point_order = cur.fetchone()[0]
cur.execute(
"""
INSERT INTO fr24.track_points
(track_id, flight_id, observed_at, point_order, geom,
altitude_m, ground_speed_kt, vertical_rate_fpm, heading_deg,
source_packet_id, source_partition_date)
VALUES
(%s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),
%s, %s, %s, %s, %s, %s)
""",
(
track_id, flight_id, observed_at, point_order, lon, lat,
alt_m, speed, vrate, heading,
raw_packet_id, partition_date,
),
)
cur.execute(
"""
UPDATE fr24.tracks SET
point_count = point_count + 1,
last_point_at = %s,
min_altitude_m = LEAST(COALESCE(min_altitude_m, %s), %s),
max_altitude_m = GREATEST(COALESCE(max_altitude_m, %s), %s),
updated_at = now()
WHERE track_id = %s
""",
(observed_at, alt_m, alt_m, alt_m, alt_m, track_id),
)
# ── batch processor ───────────────────────────────────────────────────────────
def process_batch(conn, packets: list) -> int:
    """Turn a batch of raw packets into aircraft / flights / tracks / track_points.

    Each parsed packet is handled in its own transaction: committed on
    success, rolled back and logged (with traceback) on failure. One bad
    packet therefore does not discard the rest of the batch. Payloads that
    are not SBS-1 ``MSG`` lines are skipped.

    Args:
        conn: open psycopg2 connection.
        packets: rows from ``fetch_unprocessed`` (dicts with at least
            ``raw_packet_id``, ``observed_at`` and ``payload_base64``).

    Returns:
        The raw_packet_id of the last packet consumed. Every packet counts
        as consumed, whether stored, skipped or failed, so the cursor moves
        past poison packets. Returns 0 for an empty batch. If a rollback
        itself fails (e.g. the connection is gone), that exception propagates.
    """
    last_id = 0
    skipped = 0
    for pkt in packets:
        raw_id = pkt["raw_packet_id"]
        observed_at = pkt["observed_at"]
        # NOTE(review): .date() uses observed_at's own UTC offset (for
        # timestamptz, the DB session TimeZone). Confirm this matches how
        # raw_packets is partitioned.
        partition_date = observed_at.date() if hasattr(observed_at, "date") else None
        parsed = parse_sbs1(pkt["payload_base64"])
        if not parsed:
            # Not an SBS-1 MSG line: consume it without storing anything.
            last_id = raw_id
            skipped += 1
            continue

        # Every MSG subtype updates the aircraft and its active flight.
        # Only messages that carry both lat and lon also produce a track point.
        # Per the SBS-1 subtypes: MSG1=callsign, MSG2=surface position,
        # MSG3=airborne position, MSG4=airborne velocity, MSG5=surveillance alt,
        # MSG6=surveillance id, MSG7=air-to-air, MSG8=all-call.
        icao24 = parsed["icao24"]
        callsign = parsed["callsign"]
        lat = parsed["lat"]
        lon = parsed["lon"]
        alt_m = altitude_ft_to_m(parsed["altitude"])
        speed = parsed["speed"]
        heading = parsed["heading"]
        vrate = parsed["vrate"]
        try:
            # Fall back to processing time if the row has no usable timestamp.
            seen_at = observed_at if isinstance(observed_at, datetime) else datetime.now(timezone.utc)
            aircraft_id = upsert_aircraft(conn, icao24, callsign, seen_at)
            flight_id = get_or_create_flight(conn, aircraft_id, callsign, observed_at)
            if lat is not None and lon is not None:
                track_id = get_or_create_track(conn, flight_id)
                append_track_point(
                    conn, track_id, flight_id, observed_at,
                    raw_id, partition_date,
                    lat, lon, alt_m, speed, heading, vrate,
                )
            conn.commit()
        except Exception as e:
            # log.exception keeps the traceback, which a plain log.error dropped.
            log.exception("Failed processing packet %s (icao24=%s): %s", raw_id, icao24, e)
            conn.rollback()
        # Advance even after an error so a poison packet cannot block the pipeline forever.
        last_id = raw_id
    if skipped:
        log.debug("Skipped %d non-MSG packets in batch", skipped)
    return last_id
# ── main ──────────────────────────────────────────────────────────────────────
def main():
    """Run the preprocess polling loop until SIGTERM/SIGINT.

    Connects to PostgreSQL and writes the healthcheck file. It then loops:
    read the cursor from fr24.processing_state, fetch up to BATCH_SIZE newer
    raw packets, process them, and persist the new cursor. When a poll
    returns a full batch, the next poll starts immediately. Otherwise (empty
    or partial batch, or an error) the loop sleeps POLL_INTERVAL seconds. A
    broken DB connection is re-established via ``wait_for_db``.
    """
    conn = wait_for_db(30)
    open(HEALTHCHECK_FILE, "w").close()
    log.info("Healthcheck file written: %s", HEALTHCHECK_FILE)

    shutdown = {"flag": False}

    def _handle_signal(sig, frame):
        log.info("Signal %s — shutting down", sig)
        shutdown["flag"] = True

    signal.signal(signal.SIGTERM, _handle_signal)
    signal.signal(signal.SIGINT, _handle_signal)

    total = 0
    log.info("Preprocess loop started (poll=%.1fs, batch=%d)", POLL_INTERVAL, BATCH_SIZE)
    while not shutdown["flag"]:
        # A full batch means there is probably a backlog: poll again right away.
        # Sleeping after every batch capped throughput at
        # BATCH_SIZE / POLL_INTERVAL packets per second.
        backlog = False
        try:
            if conn.closed:
                # psycopg2 marks the connection closed after a server restart
                # or network drop; without reconnecting, every poll would fail forever.
                log.warning("DB connection lost — reconnecting")
                conn = wait_for_db(30)
            cursor = get_cursor(conn)
            packets = fetch_unprocessed(conn, cursor, BATCH_SIZE)
            if packets:
                last_id = process_batch(conn, packets)
                if last_id:
                    set_cursor(conn, last_id)
                total += len(packets)
                log.info(
                    "Processed %d packets (cursor→%d, total=%d)",
                    len(packets), last_id, total,
                )
                backlog = len(packets) >= BATCH_SIZE
            else:
                # End the read-only transaction opened by the two SELECTs.
                # Otherwise the session sits "idle in transaction" (holding its
                # table locks) for as long as no packets arrive.
                conn.rollback()
                log.debug("No new packets, sleeping")
        except Exception as e:
            log.exception("Poll error: %s", e)
            try:
                conn.rollback()
            except Exception:
                # Best effort: the connection may already be broken. That case
                # is handled by the reconnect at the top of the loop.
                pass
        if not backlog and not shutdown["flag"]:
            time.sleep(POLL_INTERVAL)
    conn.close()
    log.info("Preprocess service stopped. Total processed: %d", total)


if __name__ == "__main__":
    main()