Files
wiki/tasks/flightradar24/ingest/preprocess/main.py
2026-04-19 14:50:01 +03:00

301 lines
10 KiB
Python

"""
FR24 Preprocess Service
Reads unprocessed raw_packets, builds fake aircraft/flights/tracks/track_points,
and advances the processing_state cursor.
"""
import os
import time
import random
import logging
import signal
import sys
import json
from datetime import datetime, timezone
import psycopg2
import psycopg2.extras
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [preprocess] %(levelname)s %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
)
log = logging.getLogger("preprocess")

# libpq connection string assembled from the environment. POSTGRES_HOST,
# POSTGRES_DB, POSTGRES_USER and POSTGRES_PASSWORD are mandatory (a missing
# one raises KeyError at import time); POSTGRES_PORT falls back to 5432.
# make_dsn() quotes and escapes every value, so an empty value or one that
# contains spaces, quotes or backslashes (typical for passwords) still
# produces a valid DSN instead of a silently mis-parsed one.
DB_DSN = psycopg2.extensions.make_dsn(
    host=os.environ["POSTGRES_HOST"],
    port=os.environ.get("POSTGRES_PORT", 5432),
    dbname=os.environ["POSTGRES_DB"],
    user=os.environ["POSTGRES_USER"],
    password=os.environ["POSTGRES_PASSWORD"],
)

# Seconds to sleep between two polls of fr24.raw_packets.
POLL_INTERVAL = float(os.environ.get("POLL_INTERVAL_SECONDS", 5.0))
# Maximum number of raw packets fetched per poll.
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", 20))
# Marker file created once the database connection has been established.
HEALTHCHECK_FILE = "/tmp/preprocess-ready"
# Key of the fr24.processing_state row that stores this service's cursor.
STATE_KEY = "preprocess_cursor"

# Bounding box: roughly Central Europe for plausible fake coords
LAT_MIN, LAT_MAX = 48.0, 56.0
LON_MIN, LON_MAX = 14.0, 40.0
# ── db ────────────────────────────────────────────────────────────────────────
def wait_for_db(max_attempts: int = 30) -> psycopg2.extensions.connection:
    """Connect to PostgreSQL, retrying while the server is not reachable.

    Args:
        max_attempts: Number of connection attempts before giving up.

    Returns:
        An open connection with the UUID typecaster registered on it.

    Terminates the process via ``sys.exit(1)`` when every attempt fails with
    ``psycopg2.OperationalError``.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            conn = psycopg2.connect(DB_DSN)
        except psycopg2.OperationalError as e:
            log.warning("DB not ready (%d/%d): %s", attempt, max_attempts, e)
            # Waiting after the final attempt would only delay the exit.
            if attempt < max_attempts:
                time.sleep(2)
            continue
        # register_uuid's first positional parameter is `oids`; the connection
        # must be passed by keyword, otherwise it is taken for a type OID and
        # the typecaster never matches the real uuid columns.
        psycopg2.extras.register_uuid(conn_or_curs=conn)
        log.info("PostgreSQL connected (attempt %d)", attempt)
        return conn
    log.error("Could not connect to PostgreSQL after %d attempts", max_attempts)
    sys.exit(1)
def get_cursor(conn) -> int:
    """Read the preprocess cursor from fr24.processing_state.

    Returns the stored ``last_raw_packet_id`` as an int, or 0 when the state
    row is missing or does not contain that key.
    """
    lookup_sql = "SELECT state_value FROM fr24.processing_state WHERE state_key = %s"
    with conn.cursor() as cur:
        cur.execute(lookup_sql, (STATE_KEY,))
        state_row = cur.fetchone()
    if not state_row:
        return 0
    # state_value must behave like a mapping here — presumably a jsonb column
    # that psycopg2 decodes into a dict; confirm against the schema.
    stored_state = state_row[0]
    return int(stored_state.get("last_raw_packet_id", 0))
def set_cursor(conn, last_id: int):
    """Store ``last_id`` as the preprocess cursor and commit.

    The state row is created on first use and overwritten afterwards
    (upsert on ``state_key``); ``updated_at`` is refreshed either way.
    """
    state_json = json.dumps({"last_raw_packet_id": last_id})
    upsert_sql = """
        INSERT INTO fr24.processing_state (state_key, state_value, updated_at, note)
        VALUES (%s, %s::jsonb, now(), 'preprocess cursor')
        ON CONFLICT (state_key) DO UPDATE
        SET state_value = EXCLUDED.state_value,
        updated_at = now()
        """
    with conn.cursor() as cur:
        cur.execute(upsert_sql, (STATE_KEY, state_json))
    conn.commit()
def fetch_unprocessed(conn, after_id: int, limit: int) -> list:
    """Fetch the next raw packets that lie beyond the cursor.

    Args:
        conn: Open psycopg2 connection.
        after_id: Only rows with ``raw_packet_id`` greater than this are read.
        limit: Maximum number of rows to return.

    Returns:
        Rows ordered by ``raw_packet_id`` ascending, produced by a
        ``RealDictCursor`` (addressable by column name).
    """
    select_sql = """
        SELECT raw_packet_id, capture_id, observed_at, payload_base64, message_type
        FROM fr24.raw_packets
        WHERE raw_packet_id > %s
        ORDER BY raw_packet_id
        LIMIT %s
        """
    dict_cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    with dict_cursor as cur:
        cur.execute(select_sql, (after_id, limit))
        rows = cur.fetchall()
    return rows
# ── upsert helpers ────────────────────────────────────────────────────────────
def upsert_aircraft(conn, icao24: str) -> int:
    """Insert the aircraft identified by ``icao24`` or refresh its last-seen time.

    A new row gets placeholder data: a callsign derived from the first four
    characters of ``icao24``, a random ``RA-`` registration and a random
    aircraft type. When the row already exists only ``last_seen_at`` and
    ``updated_at`` are changed. Does not commit.

    Returns:
        The ``aircraft_id`` of the inserted or updated row.
    """
    seen_at = datetime.now(timezone.utc)
    upsert_sql = """
        INSERT INTO fr24.aircraft
        (icao24, callsign, registration, aircraft_type, first_seen_at, last_seen_at)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON CONFLICT (icao24) DO UPDATE
        SET last_seen_at = EXCLUDED.last_seen_at,
        updated_at = now()
        RETURNING aircraft_id
        """
    with conn.cursor() as cur:
        # Random values are drawn in column order: registration, then type.
        callsign = f"TEST{icao24[:4]}"
        registration = f"RA-{random.randint(10000,99999)}"
        aircraft_type = random.choice(["B738", "A320", "B77W", "A321", "E190"])
        cur.execute(
            upsert_sql,
            (icao24, callsign, registration, aircraft_type, seen_at, seen_at),
        )
        returned = cur.fetchone()
        return returned[0]
def get_or_create_flight(conn, aircraft_id: int, observed_at) -> int:
    """Return the active flight of an aircraft, creating one if there is none.

    The most recently started flight with ``status = 'active'`` is reused.
    Otherwise a new active flight is inserted with ``observed_at`` as its
    start time, source ``'rtl-sdr'`` and randomly chosen placeholder
    departure/arrival airports and callsign. Does not commit.

    Returns:
        The ``flight_id`` of the reused or newly created flight.
    """
    find_active_sql = """
        SELECT flight_id FROM fr24.flights
        WHERE aircraft_id = %s AND status = 'active'
        ORDER BY started_at DESC LIMIT 1
        """
    create_sql = """
        INSERT INTO fr24.flights
        (aircraft_id, started_at, status, source,
        departure_airport, arrival_airport, callsign)
        VALUES (%s, %s, 'active', 'rtl-sdr', %s, %s, %s)
        RETURNING flight_id
        """
    with conn.cursor() as cur:
        cur.execute(find_active_sql, (aircraft_id,))
        active = cur.fetchone()
        if not active:
            # Nothing to reuse: invent a flight (departure, arrival, callsign
            # are drawn in that order).
            departure = random.choice(["SVO", "DME", "VKO", "LED", "AER"])
            arrival = random.choice(["IST", "FRA", "AMS", "CDG", "LHR"])
            callsign = f"AFL{random.randint(100,999)}"
            cur.execute(
                create_sql,
                (aircraft_id, observed_at, departure, arrival, callsign),
            )
            created = cur.fetchone()
            return created[0]
        return active[0]
def get_or_create_track(conn, flight_id: int) -> int:
    """Return a track belonging to ``flight_id``, inserting one when missing.

    Does not commit.

    Returns:
        The ``track_id`` of the existing or newly inserted track.
    """
    find_sql = "SELECT track_id FROM fr24.tracks WHERE flight_id = %s LIMIT 1"
    create_sql = "INSERT INTO fr24.tracks (flight_id) VALUES (%s) RETURNING track_id"
    with conn.cursor() as cur:
        cur.execute(find_sql, (flight_id,))
        found = cur.fetchone()
        if not found:
            cur.execute(create_sql, (flight_id,))
            found = cur.fetchone()
        return found[0]
def append_track_point(conn, track_id: int, flight_id: int, observed_at,
                       raw_packet_id: int, partition_date) -> None:
    """Append one synthetic point to a track and update the track aggregates.

    Position (inside the LAT/LON bounding box), altitude, ground speed,
    vertical rate and heading are random. The point is given the next
    ``point_order`` for the track, and the track's ``point_count``,
    ``last_point_at`` and min/max altitude are updated. Does not commit.

    Args:
        conn: Open psycopg2 connection.
        track_id: Track the point is appended to.
        flight_id: Flight stored on the point row.
        observed_at: Timestamp of the point; also becomes ``last_point_at``.
        raw_packet_id: Stored as ``source_packet_id``.
        partition_date: Stored as ``source_partition_date``.
    """
    latitude = round(random.uniform(LAT_MIN, LAT_MAX), 6)
    longitude = round(random.uniform(LON_MIN, LON_MAX), 6)
    altitude = round(random.uniform(1000, 12000), 2)

    next_order_sql = (
        "SELECT COALESCE(MAX(point_order), 0) + 1 FROM fr24.track_points WHERE track_id = %s"
    )
    insert_point_sql = """
        INSERT INTO fr24.track_points
        (track_id, flight_id, observed_at, point_order, geom,
        altitude_m, ground_speed_kt, vertical_rate_fpm, heading_deg,
        source_packet_id, source_partition_date)
        VALUES
        (%s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326),
        %s, %s, %s, %s, %s, %s)
        """
    update_track_sql = """
        UPDATE fr24.tracks SET
        point_count = point_count + 1,
        last_point_at = %s,
        min_altitude_m = LEAST(COALESCE(min_altitude_m, %s), %s),
        max_altitude_m = GREATEST(COALESCE(max_altitude_m, %s), %s),
        updated_at = now()
        WHERE track_id = %s
        """

    with conn.cursor() as cur:
        # Position of the new point within its track (1 for the first point).
        cur.execute(next_order_sql, (track_id,))
        next_order = cur.fetchone()[0]

        ground_speed = round(random.uniform(200, 500), 2)
        vertical_rate = round(random.uniform(-1000, 1000), 2)
        heading = round(random.uniform(0, 360), 2)
        # ST_MakePoint takes x (longitude) before y (latitude).
        point_params = (
            track_id, flight_id, observed_at, next_order, longitude, latitude,
            altitude, ground_speed, vertical_rate, heading,
            raw_packet_id, partition_date,
        )
        cur.execute(insert_point_sql, point_params)

        # Fold the new point into the per-track aggregates.
        cur.execute(
            update_track_sql,
            (observed_at, altitude, altitude, altitude, altitude, track_id),
        )
# ── main loop ─────────────────────────────────────────────────────────────────
def process_batch(conn, packets: list) -> int:
    """Turn each raw packet into an aircraft/flight/track/track-point chain.

    Every packet is handled in its own transaction: it is committed on
    success, and on any exception the error is logged, the transaction is
    rolled back and the loop moves on to the next packet.

    Returns:
        The ``raw_packet_id`` of the last packet that was committed, or 0
        when no packet in the batch succeeded.
    """
    newest_committed = 0
    for packet in packets:
        packet_id = packet["raw_packet_id"]
        seen_at = packet["observed_at"]
        source_date = None
        if hasattr(seen_at, "date"):
            source_date = seen_at.date()
        # derive a fake ICAO24 from the packet id (deterministic per id)
        icao24 = f"{(packet_id * 7919) % 0xFFFFFF:06X}"
        try:
            aircraft_id = upsert_aircraft(conn, icao24)
            flight_id = get_or_create_flight(conn, aircraft_id, seen_at)
            track_id = get_or_create_track(conn, flight_id)
            append_track_point(conn, track_id, flight_id, seen_at, packet_id, source_date)
            conn.commit()
        except Exception as e:
            log.error("Failed processing packet %s: %s", packet_id, e)
            conn.rollback()
        else:
            newest_committed = packet_id
    return newest_committed
def main():
    """Run the preprocess polling loop until SIGTERM or SIGINT is received.

    Connects to PostgreSQL, creates the healthcheck marker file, then
    repeatedly reads the cursor, fetches up to BATCH_SIZE unprocessed packets,
    processes them and advances the cursor, sleeping POLL_INTERVAL seconds
    between iterations. A lost database connection is re-established via
    ``wait_for_db`` (which exits the process if the server stays unreachable).
    """
    conn = wait_for_db(30)
    # Create (or truncate) the marker file; only its existence matters.
    with open(HEALTHCHECK_FILE, "w"):
        pass
    log.info("Healthcheck file written: %s", HEALTHCHECK_FILE)

    # Mutable holder so the signal handler can flip the flag without nonlocal.
    shutdown = {"flag": False}

    def _handle_signal(sig, frame):
        log.info("Signal %s — shutting down", sig)
        shutdown["flag"] = True

    signal.signal(signal.SIGTERM, _handle_signal)
    signal.signal(signal.SIGINT, _handle_signal)

    total = 0
    log.info("Preprocess loop started (poll=%.1fs, batch=%d)", POLL_INTERVAL, BATCH_SIZE)
    while not shutdown["flag"]:
        try:
            cursor = get_cursor(conn)
            packets = fetch_unprocessed(conn, cursor, BATCH_SIZE)
            if packets:
                last_id = process_batch(conn, packets)
                # last_id is 0 when no packet of the batch was committed; the
                # cursor then stays put and nothing is reported as processed.
                if last_id:
                    set_cursor(conn, last_id)
                    total += len(packets)
                    log.info("Processed %d packets (cursor→%d, total=%d)", len(packets), last_id, total)
            else:
                # psycopg2 opened a transaction for the SELECTs above; end it
                # so the session does not sit idle-in-transaction while asleep.
                conn.rollback()
                log.debug("No new packets, sleeping")
        except Exception as e:
            log.error("Poll error: %s", e)
            if not conn.closed:
                try:
                    conn.rollback()
                except Exception as rollback_err:
                    log.warning("Rollback after poll error failed: %s", rollback_err)
            # `closed` is non-zero once psycopg2 has seen the connection
            # closed or broken; without a reconnect every later poll would fail.
            if conn.closed:
                log.warning("PostgreSQL connection lost, reconnecting")
                conn = wait_for_db(30)
        time.sleep(POLL_INTERVAL)
    conn.close()
    log.info("Preprocess service stopped. Total processed: %d", total)


if __name__ == "__main__":
    main()