auto-sync: 2026-04-19 15:30:01

This commit is contained in:
Stream
2026-04-19 15:30:01 +03:00
parent 206612b75d
commit 63a95b6539
4 changed files with 359 additions and 109 deletions

View File

@@ -18,6 +18,7 @@ x-common-env: &common-env
RTLSDR_SAMPLE_RATE: ${RTLSDR_SAMPLE_RATE:-2000000}
RTLSDR_CENTER_FREQUENCY: ${RTLSDR_CENTER_FREQUENCY:-1090000000}
RTLSDR_GAIN: ${RTLSDR_GAIN:-auto}
RTLSDR_BIAS_T: ${RTLSDR_BIAS_T:-0}
services:
postgres:
@@ -52,6 +53,7 @@ services:
environment:
<<: *common-env
SERVICE_ROLE: capture
privileged: true
devices:
- "/dev/bus/usb:/dev/bus/usb"
volumes:

View File

@@ -1,8 +1,44 @@
FROM python:3.11-slim
# ── system deps ───────────────────────────────────────────────────────────────
# libusb-1.0 + udev needed for RTL-SDR USB access
# dump1090-fa from FlightAware PPA (Debian/Ubuntu compatible)
RUN apt-get update && apt-get install -y --no-install-recommends \
libpq-dev gcc python3-dev && rm -rf /var/lib/apt/lists/*
libpq-dev gcc python3-dev \
libusb-1.0-0 udev \
wget gnupg ca-certificates \
&& rm -rf /var/lib/apt/lists/*
# ── rtl-sdr-blog driver (RTL-SDR Blog V4 / R828D tuner) ──────────────────────
# The standard rtl-sdr package does NOT support V4. We install the rtl-sdr-blog
# fork which adds R828D support and bias-T control.
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential cmake pkg-config \
libusb-1.0-0-dev \
git \
&& rm -rf /var/lib/apt/lists/*
RUN git clone https://github.com/rtlsdrblog/rtl-sdr-blog.git /tmp/rtl-sdr-blog \
&& cmake -S /tmp/rtl-sdr-blog -B /tmp/rtl-sdr-blog/build \
-DINSTALL_UDEV_RULES=ON \
-DDETACH_KERNEL_DRIVER=ON \
&& cmake --build /tmp/rtl-sdr-blog/build --parallel $(nproc) \
&& cmake --install /tmp/rtl-sdr-blog/build \
&& ldconfig \
&& rm -rf /tmp/rtl-sdr-blog
# ── dump1090-fa (FlightAware fork — best SBS-1 output support) ───────────────
# Build from source: works on any Debian/Ubuntu without PPA
RUN apt-get update && apt-get install -y --no-install-recommends \
libncurses-dev \
&& rm -rf /var/lib/apt/lists/*
RUN git clone --depth 1 https://github.com/flightaware/dump1090.git /tmp/dump1090 \
&& make -C /tmp/dump1090 -j$(nproc) \
&& cp /tmp/dump1090/dump1090 /usr/local/bin/dump1090-fa \
&& rm -rf /tmp/dump1090
# ── python app ────────────────────────────────────────────────────────────────
WORKDIR /app
COPY requirements.txt .
@@ -10,4 +46,7 @@ RUN pip install --no-cache-dir -r requirements.txt
COPY main.py .
# dump1090 JSON output dir
RUN mkdir -p /tmp/dump1090-json
CMD ["python", "-u", "main.py"]

View File

@@ -1,16 +1,19 @@
"""
FR24 Capture Service
Connects to PostgreSQL, creates a capture session, and writes fake raw_packets.
In production: replace the fake packet loop with actual RTL-SDR / dump1090 input.
FR24 Capture Service — Step 2: Real ADS-B via dump1090
Launches dump1090-fa as a subprocess, reads SBS-1 (BaseStation) messages
from its TCP port 30003, and writes real raw_packets to PostgreSQL.
"""
import os
import time
import base64
import random
import logging
import signal
import sys
import socket
import subprocess
import threading
from datetime import datetime, timezone
from queue import Queue, Empty
import psycopg2
import psycopg2.extras
@@ -30,44 +33,128 @@ DB_DSN = (
f"user={os.environ['POSTGRES_USER']} "
f"password={os.environ['POSTGRES_PASSWORD']}"
)
CENTER_FREQ = int(os.environ.get("RTLSDR_CENTER_FREQUENCY", 1090000000))
SAMPLE_RATE = int(os.environ.get("RTLSDR_SAMPLE_RATE", 2000000))
DEVICE_INDEX = int(os.environ.get("RTLSDR_DEVICE_INDEX", 0))
GAIN_RAW = os.environ.get("RTLSDR_GAIN", "auto")
GAIN_DB = None if GAIN_RAW == "auto" else float(GAIN_RAW)
PACKET_INTERVAL = float(os.environ.get("PACKET_INTERVAL_SECONDS", 2.0))
CENTER_FREQ = int(os.environ.get("RTLSDR_CENTER_FREQUENCY", 1090000000))
SAMPLE_RATE = int(os.environ.get("RTLSDR_SAMPLE_RATE", 2000000))
DEVICE_INDEX = int(os.environ.get("RTLSDR_DEVICE_INDEX", 0))
GAIN_RAW = os.environ.get("RTLSDR_GAIN", "auto")
GAIN_DB = None if GAIN_RAW == "auto" else float(GAIN_RAW)
ENABLE_BIAS_T = os.environ.get("RTLSDR_BIAS_T", "0") == "1"
DUMP1090_HOST = "127.0.0.1"
DUMP1090_SBS_PORT = 30003
DUMP1090_STARTUP_WAIT = 5 # seconds to wait for dump1090 to bind
DUMP1090_RECONNECT_DELAY = 3
HEALTHCHECK_FILE = "/tmp/capture-ready"
# ── fake ADS-B payload generator ─────────────────────────────────────────────
# Real ADS-B Mode-S messages are 7 or 14 bytes.
# We generate plausible-looking random bytes tagged as DF17 (extended squitter).
_ICAO_POOL = [f"{i:06X}" for i in random.sample(range(0x400000, 0xFFFFFF), 20)]
# ── dump1090 process ──────────────────────────────────────────────────────────
def _fake_adsb_bytes() -> bytes:
"""14-byte fake Mode-S extended squitter (DF17)."""
df17_first_byte = 0x8D # downlink format 17
icao = bytes.fromhex(random.choice(_ICAO_POOL))
payload = bytes([random.randint(0, 255) for _ in range(7)])
crc = bytes([random.randint(0, 255) for _ in range(3)])
return bytes([df17_first_byte]) + icao + payload + crc
def build_dump1090_cmd() -> list[str]:
"""Build dump1090-fa command for RTL-SDR Blog V4."""
cmd = [
"dump1090-fa",
"--device-index", str(DEVICE_INDEX),
"--freq", str(CENTER_FREQ),
"--net", # enable network output
"--net-sbs-port", str(DUMP1090_SBS_PORT),
"--net-ro-port", "0", # disable raw output port
"--net-ri-port", "0",
"--net-bi-port", "0",
"--quiet", # suppress per-message stdout noise
"--write-json", "/tmp/dump1090-json", # optional JSON output
]
if GAIN_RAW == "auto":
cmd += ["--gain", "-10"] # dump1090 uses -10 for AGC
else:
cmd += ["--gain", GAIN_RAW]
def _fake_packet_row(capture_id: str) -> dict:
raw = _fake_adsb_bytes()
if ENABLE_BIAS_T:
cmd += ["--enable-bias-t"]
return cmd
def start_dump1090() -> subprocess.Popen:
cmd = build_dump1090_cmd()
log.info("Starting dump1090: %s", " ".join(cmd))
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)
# drain dump1090 stdout in a background thread so it never blocks
def _drain(p):
for line in p.stdout:
line = line.rstrip()
if line:
log.debug("[dump1090] %s", line)
threading.Thread(target=_drain, args=(proc,), daemon=True).start()
return proc
# ── SBS-1 reader ─────────────────────────────────────────────────────────────
def sbs_reader(queue: Queue, shutdown: dict):
"""
Connect to dump1090 SBS port, read lines, push to queue.
Reconnects automatically on disconnect.
"""
while not shutdown["flag"]:
try:
log.info("Connecting to dump1090 SBS port %s:%d", DUMP1090_HOST, DUMP1090_SBS_PORT)
with socket.create_connection((DUMP1090_HOST, DUMP1090_SBS_PORT), timeout=10) as sock:
log.info("Connected to dump1090 SBS port")
buf = ""
sock.settimeout(2.0)
while not shutdown["flag"]:
try:
chunk = sock.recv(4096)
if not chunk:
log.warning("dump1090 SBS connection closed")
break
buf += chunk.decode("ascii", errors="replace")
while "\n" in buf:
line, buf = buf.split("\n", 1)
line = line.strip()
if line:
queue.put(line)
except socket.timeout:
continue
except (ConnectionRefusedError, OSError) as e:
if not shutdown["flag"]:
log.warning("SBS connect failed: %s — retry in %ds", e, DUMP1090_RECONNECT_DELAY)
time.sleep(DUMP1090_RECONNECT_DELAY)
# ── SBS-1 parser ─────────────────────────────────────────────────────────────
# SBS-1 BaseStation format:
# MSG,<msgtype>,<sid>,<aid>,<hex>,<fid>,<date>,<time>,<date>,<time>,
# <callsign>,<alt>,<speed>,<track>,<lat>,<lon>,<vrate>,<squawk>,<alert>,<emerg>,<spi>,<onground>
def parse_sbs_line(line: str) -> dict | None:
"""Parse a SBS-1 MSG line. Returns dict or None if not a MSG."""
parts = line.split(",")
if len(parts) < 22 or parts[0] != "MSG":
return None
return {
"capture_id": capture_id,
"observed_at": datetime.now(timezone.utc),
"partition_date": datetime.now(timezone.utc).date(),
"frequency_hz": CENTER_FREQ,
"rssi_dbm": round(random.uniform(-90.0, -40.0), 3),
"snr_db": round(random.uniform(5.0, 30.0), 3),
"samplerate_hz": SAMPLE_RATE,
"payload_base64": base64.b64encode(raw).decode(),
"payload_bytes": len(raw),
"decoded_format": "mode-s",
"message_type": "DF17",
"msg_type": parts[1], # 1-8
"icao24": parts[4].upper().strip(),
"date_gen": parts[6],
"time_gen": parts[7],
"callsign": parts[10].strip() or None,
"altitude": parts[11].strip() or None,
"speed": parts[12].strip() or None,
"track": parts[13].strip() or None,
"lat": parts[14].strip() or None,
"lon": parts[15].strip() or None,
"vrate": parts[16].strip() or None,
"squawk": parts[17].strip() or None,
}
# ── db helpers ────────────────────────────────────────────────────────────────
def wait_for_db(max_attempts: int = 30) -> psycopg2.extensions.connection:
for attempt in range(1, max_attempts + 1):
try:
@@ -80,6 +167,7 @@ def wait_for_db(max_attempts: int = 30) -> psycopg2.extensions.connection:
log.error("Could not connect to PostgreSQL after %d attempts", max_attempts)
sys.exit(1)
def create_capture_session(conn) -> str:
with conn.cursor() as cur:
cur.execute(
@@ -97,7 +185,7 @@ def create_capture_session(conn) -> str:
CENTER_FREQ,
SAMPLE_RATE,
GAIN_DB,
"fake-data mode (step-1 scaffold)",
f"dump1090-fa real ADS-B capture, bias-t={'on' if ENABLE_BIAS_T else 'off'}",
),
)
capture_id = str(cur.fetchone()[0])
@@ -105,6 +193,7 @@ def create_capture_session(conn) -> str:
log.info("Capture session created: %s", capture_id)
return capture_id
def insert_packet(conn, row: dict):
with conn.cursor() as cur:
cur.execute(
@@ -122,6 +211,7 @@ def insert_packet(conn, row: dict):
)
conn.commit()
def close_capture_session(conn, capture_id: str):
with conn.cursor() as cur:
cur.execute(
@@ -131,15 +221,13 @@ def close_capture_session(conn, capture_id: str):
conn.commit()
log.info("Capture session closed: %s", capture_id)
# ── main ──────────────────────────────────────────────────────────────────────
def main():
conn = wait_for_db()
capture_id = create_capture_session(conn)
# signal healthcheck
open(HEALTHCHECK_FILE, "w").close()
log.info("Healthcheck file written: %s", HEALTHCHECK_FILE)
shutdown = {"flag": False}
def _handle_signal(sig, frame):
@@ -149,16 +237,68 @@ def main():
signal.signal(signal.SIGTERM, _handle_signal)
signal.signal(signal.SIGINT, _handle_signal)
# start dump1090
dump1090_proc = start_dump1090()
log.info("Waiting %ds for dump1090 to start …", DUMP1090_STARTUP_WAIT)
time.sleep(DUMP1090_STARTUP_WAIT)
if dump1090_proc.poll() is not None:
log.error("dump1090 exited immediately (rc=%d) — check USB device", dump1090_proc.returncode)
close_capture_session(conn, capture_id)
sys.exit(1)
# signal healthcheck
open(HEALTHCHECK_FILE, "w").close()
log.info("Healthcheck file written: %s", HEALTHCHECK_FILE)
# start SBS reader thread
sbs_queue: Queue = Queue(maxsize=10000)
reader_thread = threading.Thread(
target=sbs_reader, args=(sbs_queue, shutdown), daemon=True
)
reader_thread.start()
packet_count = 0
log.info("Starting fake packet loop (interval=%.1fs)", PACKET_INTERVAL)
log.info("Listening for ADS-B messages on SBS port %d", DUMP1090_SBS_PORT)
while not shutdown["flag"]:
# check dump1090 still alive
if dump1090_proc.poll() is not None:
log.error("dump1090 died (rc=%d), restarting …", dump1090_proc.returncode)
dump1090_proc = start_dump1090()
time.sleep(DUMP1090_STARTUP_WAIT)
try:
line = sbs_queue.get(timeout=1.0)
except Empty:
continue
parsed = parse_sbs_line(line)
if not parsed:
continue
now = datetime.now(timezone.utc)
# encode the raw SBS line as the payload (base64 of UTF-8 bytes)
raw_bytes = line.encode("utf-8")
row = {
"capture_id": capture_id,
"observed_at": now,
"partition_date": now.date(),
"frequency_hz": CENTER_FREQ,
"rssi_dbm": None, # SBS format doesn't carry RSSI
"snr_db": None,
"samplerate_hz": SAMPLE_RATE,
"payload_base64": base64.b64encode(raw_bytes).decode(),
"payload_bytes": len(raw_bytes),
"decoded_format": "sbs1",
"message_type": f"MSG{parsed['msg_type']}",
}
try:
row = _fake_packet_row(capture_id)
insert_packet(conn, row)
packet_count += 1
if packet_count % 10 == 0:
log.info("Packets written: %d", packet_count)
if packet_count % 50 == 0:
log.info("Packets written: %d (last icao24=%s)", packet_count, parsed["icao24"])
except Exception as e:
log.error("Packet insert failed: %s", e)
try:
@@ -166,11 +306,18 @@ def main():
except Exception:
pass
time.sleep(PACKET_INTERVAL)
# cleanup
log.info("Stopping dump1090 …")
dump1090_proc.terminate()
try:
dump1090_proc.wait(timeout=5)
except subprocess.TimeoutExpired:
dump1090_proc.kill()
close_capture_session(conn, capture_id)
conn.close()
log.info("Capture service stopped. Total packets: %d", packet_count)
if __name__ == "__main__":
main()

View File

@@ -1,11 +1,11 @@
"""
FR24 Preprocess Service
Reads unprocessed raw_packets, builds fake aircraft/flights/tracks/track_points,
and advances the processing_state cursor.
FR24 Preprocess Service — Step 2: Real SBS-1 parsing
Reads raw_packets written by the capture service (SBS-1 format, base64-encoded),
extracts real ADS-B fields, and builds aircraft/flights/tracks/track_points.
"""
import os
import time
import random
import base64
import logging
import signal
import sys
@@ -30,13 +30,59 @@ DB_DSN = (
f"password={os.environ['POSTGRES_PASSWORD']}"
)
POLL_INTERVAL = float(os.environ.get("POLL_INTERVAL_SECONDS", 5.0))
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", 20))
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", 50))
HEALTHCHECK_FILE = "/tmp/preprocess-ready"
STATE_KEY = "preprocess_cursor"
# Bounding box: roughly Central Europe for plausible fake coords
LAT_MIN, LAT_MAX = 48.0, 56.0
LON_MIN, LON_MAX = 14.0, 40.0
# ── SBS-1 parser ─────────────────────────────────────────────────────────────
# MSG format (22 comma-separated fields):
# MSG,<msgtype>,<sid>,<aid>,<hex>,<fid>,<date_gen>,<time_gen>,<date_log>,<time_log>,
# <callsign>,<alt_ft>,<speed_kt>,<track_deg>,<lat>,<lon>,<vrate_fpm>,
# <squawk>,<alert>,<emerg>,<spi>,<onground>
def _float(s: str) -> float | None:
try:
return float(s) if s.strip() else None
except ValueError:
return None
def parse_sbs1(payload_base64: str) -> dict | None:
"""
Decode base64 payload, parse SBS-1 MSG line.
Returns dict with extracted fields, or None if unparseable / not a MSG.
"""
try:
line = base64.b64decode(payload_base64).decode("ascii", errors="replace").strip()
except Exception:
return None
parts = line.split(",")
if len(parts) < 22 or parts[0] != "MSG":
return None
icao24 = parts[4].strip().upper()
if not icao24:
return None
return {
"msg_type": parts[1].strip(),
"icao24": icao24,
"callsign": parts[10].strip() or None,
"altitude": _float(parts[11]), # feet
"speed": _float(parts[12]), # knots
"heading": _float(parts[13]), # degrees
"lat": _float(parts[14]),
"lon": _float(parts[15]),
"vrate": _float(parts[16]), # ft/min
"squawk": parts[17].strip() or None,
"on_ground": parts[21].strip() == "-1",
}
def altitude_ft_to_m(ft: float | None) -> float | None:
return round(ft * 0.3048, 2) if ft is not None else None
# ── db ────────────────────────────────────────────────────────────────────────
@@ -56,7 +102,6 @@ def wait_for_db(max_attempts: int = 30) -> psycopg2.extensions.connection:
def get_cursor(conn) -> int:
"""Return last processed raw_packet_id (0 if none)."""
with conn.cursor() as cur:
cur.execute(
"SELECT state_value FROM fr24.processing_state WHERE state_key = %s",
@@ -87,7 +132,8 @@ def fetch_unprocessed(conn, after_id: int, limit: int) -> list:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(
"""
SELECT raw_packet_id, capture_id, observed_at, payload_base64, message_type
SELECT raw_packet_id, capture_id, observed_at,
payload_base64, message_type, decoded_format
FROM fr24.raw_packets
WHERE raw_packet_id > %s
ORDER BY raw_packet_id
@@ -100,34 +146,27 @@ def fetch_unprocessed(conn, after_id: int, limit: int) -> list:
# ── upsert helpers ────────────────────────────────────────────────────────────
def upsert_aircraft(conn, icao24: str) -> int:
now = datetime.now(timezone.utc)
def upsert_aircraft(conn, icao24: str, callsign: str | None, now: datetime) -> int:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO fr24.aircraft
(icao24, callsign, registration, aircraft_type, first_seen_at, last_seen_at)
VALUES (%s, %s, %s, %s, %s, %s)
(icao24, callsign, first_seen_at, last_seen_at)
VALUES (%s, %s, %s, %s)
ON CONFLICT (icao24) DO UPDATE
SET last_seen_at = EXCLUDED.last_seen_at,
callsign = COALESCE(EXCLUDED.callsign, fr24.aircraft.callsign),
updated_at = now()
RETURNING aircraft_id
""",
(
icao24,
f"TEST{icao24[:4]}",
f"RA-{random.randint(10000,99999)}",
random.choice(["B738", "A320", "B77W", "A321", "E190"]),
now,
now,
),
(icao24, callsign, now, now),
)
return cur.fetchone()[0]
def get_or_create_flight(conn, aircraft_id: int, observed_at) -> int:
def get_or_create_flight(conn, aircraft_id: int, callsign: str | None, observed_at) -> int:
with conn.cursor() as cur:
# reuse an active flight for this aircraft if one exists
# reuse active flight for this aircraft
cur.execute(
"""
SELECT flight_id FROM fr24.flights
@@ -142,18 +181,11 @@ def get_or_create_flight(conn, aircraft_id: int, observed_at) -> int:
cur.execute(
"""
INSERT INTO fr24.flights
(aircraft_id, started_at, status, source,
departure_airport, arrival_airport, callsign)
VALUES (%s, %s, 'active', 'rtl-sdr', %s, %s, %s)
(aircraft_id, started_at, status, source, callsign)
VALUES (%s, %s, 'active', 'rtl-sdr', %s)
RETURNING flight_id
""",
(
aircraft_id,
observed_at,
random.choice(["SVO", "DME", "VKO", "LED", "AER"]),
random.choice(["IST", "FRA", "AMS", "CDG", "LHR"]),
f"AFL{random.randint(100,999)}",
),
(aircraft_id, observed_at, callsign),
)
return cur.fetchone()[0]
@@ -175,13 +207,11 @@ def get_or_create_track(conn, flight_id: int) -> int:
def append_track_point(conn, track_id: int, flight_id: int, observed_at,
raw_packet_id: int, partition_date) -> None:
lat = round(random.uniform(LAT_MIN, LAT_MAX), 6)
lon = round(random.uniform(LON_MIN, LON_MAX), 6)
alt = round(random.uniform(1000, 12000), 2)
raw_packet_id: int, partition_date,
lat: float, lon: float, alt_m: float | None,
speed: float | None, heading: float | None,
vrate: float | None) -> None:
with conn.cursor() as cur:
# next point_order
cur.execute(
"SELECT COALESCE(MAX(point_order), 0) + 1 FROM fr24.track_points WHERE track_id = %s",
(track_id,),
@@ -200,57 +230,86 @@ def append_track_point(conn, track_id: int, flight_id: int, observed_at,
""",
(
track_id, flight_id, observed_at, point_order, lon, lat,
alt,
round(random.uniform(200, 500), 2),
round(random.uniform(-1000, 1000), 2),
round(random.uniform(0, 360), 2),
raw_packet_id,
partition_date,
alt_m, speed, vrate, heading,
raw_packet_id, partition_date,
),
)
# update track aggregate
cur.execute(
"""
UPDATE fr24.tracks SET
point_count = point_count + 1,
last_point_at = %s,
point_count = point_count + 1,
last_point_at = %s,
min_altitude_m = LEAST(COALESCE(min_altitude_m, %s), %s),
max_altitude_m = GREATEST(COALESCE(max_altitude_m, %s), %s),
updated_at = now()
updated_at = now()
WHERE track_id = %s
""",
(observed_at, alt, alt, alt, alt, track_id),
(observed_at, alt_m, alt_m, alt_m, alt_m, track_id),
)
# ── main loop ─────────────────────────────────────────────────────────────────
# ── batch processor ───────────────────────────────────────────────────────────
def process_batch(conn, packets: list) -> int:
"""Process a batch of raw packets. Returns last processed raw_packet_id."""
last_id = 0
skipped = 0
for pkt in packets:
raw_id = pkt["raw_packet_id"]
observed_at = pkt["observed_at"]
raw_id = pkt["raw_packet_id"]
observed_at = pkt["observed_at"]
partition_date = observed_at.date() if hasattr(observed_at, "date") else None
# derive a fake ICAO24 from the packet id (deterministic per id)
icao24 = f"{(raw_id * 7919) % 0xFFFFFF:06X}"
parsed = parse_sbs1(pkt["payload_base64"])
if not parsed:
# not an SBS-1 MSG line — skip silently
last_id = raw_id
skipped += 1
continue
# only process messages that carry position (MSG type 3)
# MSG1=callsign, MSG2=surface, MSG3=airborne pos, MSG4=airborne vel,
# MSG5=surveillance alt, MSG6=surveillance id, MSG7=air-to-air, MSG8=all-call
# We store all but only build track_points when we have lat+lon
icao24 = parsed["icao24"]
callsign = parsed["callsign"]
lat = parsed["lat"]
lon = parsed["lon"]
alt_m = altitude_ft_to_m(parsed["altitude"])
speed = parsed["speed"]
heading = parsed["heading"]
vrate = parsed["vrate"]
try:
aircraft_id = upsert_aircraft(conn, icao24)
flight_id = get_or_create_flight(conn, aircraft_id, observed_at)
track_id = get_or_create_track(conn, flight_id)
append_track_point(conn, track_id, flight_id, observed_at, raw_id, partition_date)
now = observed_at if isinstance(observed_at, datetime) else datetime.now(timezone.utc)
aircraft_id = upsert_aircraft(conn, icao24, callsign, now)
flight_id = get_or_create_flight(conn, aircraft_id, callsign, observed_at)
if lat is not None and lon is not None:
track_id = get_or_create_track(conn, flight_id)
append_track_point(
conn, track_id, flight_id, observed_at,
raw_id, partition_date,
lat, lon, alt_m, speed, heading, vrate,
)
conn.commit()
last_id = raw_id
except Exception as e:
log.error("Failed processing packet %s: %s", raw_id, e)
log.error("Failed processing packet %s (icao24=%s): %s", raw_id, icao24, e)
conn.rollback()
last_id = raw_id # advance cursor even on error to avoid infinite retry
if skipped:
log.debug("Skipped %d non-MSG packets in batch", skipped)
return last_id
# ── main ──────────────────────────────────────────────────────────────────────
def main():
conn = wait_for_db(30)
@@ -271,7 +330,7 @@ def main():
while not shutdown["flag"]:
try:
cursor = get_cursor(conn)
cursor = get_cursor(conn)
packets = fetch_unprocessed(conn, cursor, BATCH_SIZE)
if packets:
@@ -279,7 +338,10 @@ def main():
if last_id:
set_cursor(conn, last_id)
total += len(packets)
log.info("Processed %d packets (cursor→%d, total=%d)", len(packets), last_id, total)
log.info(
"Processed %d packets (cursor→%d, total=%d)",
len(packets), last_id, total,
)
else:
log.debug("No new packets, sleeping")