From 8e37569b31b550e25406212c46ca2024568a53ad Mon Sep 17 00:00:00 2001 From: Stream Date: Mon, 20 Apr 2026 01:40:01 +0300 Subject: [PATCH] auto-sync: 2026-04-20 01:40:01 --- tasks/flightradar24/ingest/preprocess/main.py | 44 ++++++++++++++++++- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/tasks/flightradar24/ingest/preprocess/main.py b/tasks/flightradar24/ingest/preprocess/main.py index 992412a..79805ca 100644 --- a/tasks/flightradar24/ingest/preprocess/main.py +++ b/tasks/flightradar24/ingest/preprocess/main.py @@ -264,7 +264,7 @@ def append_track_point(conn, track_id: int, flight_id: int, observed_at, # ── batch processor ─────────────────────────────────────────────────────────── -def process_batch(conn, packets: list) -> int: +def process_batch(conn, packets: list, aircraft_state: dict) -> int: last_id = 0 skipped = 0 @@ -294,6 +294,43 @@ def process_batch(conn, packets: list) -> int: heading = parsed["heading"] vrate = parsed["vrate"] + msg_type = parsed.get("msg_type") + onground = parsed.get("onground") + + # update state cache from MSG4 (velocity) + if msg_type == "4": + if icao24 not in aircraft_state: + aircraft_state[icao24] = {} + if parsed["speed"] is not None: + aircraft_state[icao24]["speed"] = float(parsed["speed"]) + if parsed["heading"] is not None: + aircraft_state[icao24]["heading"] = float(parsed["heading"]) + if parsed["vrate"] is not None: + aircraft_state[icao24]["vrate"] = float(parsed["vrate"]) + if onground is not None: + aircraft_state[icao24]["onground"] = onground + + # update onground from any message + if onground is not None and icao24 in aircraft_state: + aircraft_state[icao24]["onground"] = onground + + # for MSG3: enrich with cached velocity + if msg_type == "3": + cached = aircraft_state.get(icao24, {}) + if speed is None: + speed = cached.get("speed") + if heading is None: + heading = cached.get("heading") + if vrate is None: + vrate = cached.get("vrate") + if onground is None: + onground = cached.get("onground") + + # skip ground-stationary aircraft (onground=1 means parked/taxiing) + if onground == "1": + last_id = raw_id + continue + try: now = observed_at if isinstance(observed_at, datetime) else datetime.now(timezone.utc) aircraft_id = upsert_aircraft(conn, icao24, callsign, now) @@ -343,6 +380,9 @@ def main(): signal.signal(signal.SIGTERM, _handle_signal) signal.signal(signal.SIGINT, _handle_signal) + # cache: icao24 -> {speed, heading, vrate, onground} + aircraft_state: dict = {} + total = 0 log.info("Preprocess loop started (poll=%.1fs, batch=%d)", POLL_INTERVAL, BATCH_SIZE) @@ -352,7 +392,7 @@ def main(): packets = fetch_unprocessed(conn, cursor, BATCH_SIZE) if packets: - last_id = process_batch(conn, packets) + last_id = process_batch(conn, packets, aircraft_state) if last_id: set_cursor(conn, last_id) total += len(packets)