diff --git a/tasks/flightradar24/ingest/preprocess/main.py b/tasks/flightradar24/ingest/preprocess/main.py index 768ff11..992412a 100644 --- a/tasks/flightradar24/ingest/preprocess/main.py +++ b/tasks/flightradar24/ingest/preprocess/main.py @@ -164,20 +164,33 @@ def upsert_aircraft(conn, icao24: str, callsign: str | None, now: datetime) -> i return cur.fetchone()[0] +FLIGHT_GAP_MINUTES = 30 # close flight if no points for this long + + def get_or_create_flight(conn, aircraft_id: int, callsign: str | None, observed_at) -> int: with conn.cursor() as cur: - # reuse active flight for this aircraft + # reuse active flight only if last point was recent enough cur.execute( """ - SELECT flight_id FROM fr24.flights - WHERE aircraft_id = %s AND status = 'active' - ORDER BY started_at DESC LIMIT 1 + SELECT f.flight_id, MAX(tp.observed_at) as last_seen + FROM fr24.flights f + LEFT JOIN fr24.track_points tp ON tp.flight_id = f.flight_id + WHERE f.aircraft_id = %s AND f.status = 'active' + GROUP BY f.flight_id + ORDER BY f.started_at DESC LIMIT 1 """, (aircraft_id,), ) row = cur.fetchone() if row: - return row[0] + flight_id, last_seen = row + if last_seen is None or (observed_at - last_seen).total_seconds() < FLIGHT_GAP_MINUTES * 60: + return flight_id + # gap too large — close old flight and open new one + cur.execute( + "UPDATE fr24.flights SET status = 'completed', ended_at = %s WHERE flight_id = %s", + (last_seen, flight_id), + ) cur.execute( """ INSERT INTO fr24.flights