# File-viewer header (kept for provenance):
#   path:      wiki/tasks/flightradar24/ingest/capture/main.py
#   timestamp: 2026-04-20 02:30:01 +03:00
#   size:      354 lines, 13 KiB (Python)
"""
FR24 Capture Service — Step 2: Real ADS-B via dump1090
Launches dump1090-fa as a subprocess, reads SBS-1 (BaseStation) messages
from its TCP port 30003, and writes real raw_packets to PostgreSQL.
"""
import os
import time
import base64
import logging
import signal
import sys
import socket
import subprocess
import threading
from datetime import datetime, timezone
from queue import Queue, Empty
import psycopg2
import psycopg2.extras
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [capture] %(levelname)s %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
)
log = logging.getLogger("capture")

# ── config ────────────────────────────────────────────────────────────────────
# PostgreSQL connection string assembled from the environment. make_dsn()
# quotes and escapes every value per libpq rules, so a password (or any other
# value) containing spaces, quotes or backslashes cannot corrupt the DSN the way
# plain "key=value" string concatenation does. POSTGRES_HOST/DB/USER/PASSWORD
# are required (KeyError at import if missing); the port defaults to 5432.
DB_DSN = psycopg2.extensions.make_dsn(
    host=os.environ["POSTGRES_HOST"],
    port=os.environ.get("POSTGRES_PORT", 5432),
    dbname=os.environ["POSTGRES_DB"],
    user=os.environ["POSTGRES_USER"],
    password=os.environ["POSTGRES_PASSWORD"],
)

# RTL-SDR settings. CENTER_FREQ and DEVICE_INDEX are passed to dump1090; the
# sample rate is only stored as metadata with the capture session and packets.
# NOTE(review): confirm SAMPLE_RATE matches the rate dump1090-fa really uses.
CENTER_FREQ = int(os.environ.get("RTLSDR_CENTER_FREQUENCY", 1090000000))  # Hz
SAMPLE_RATE = int(os.environ.get("RTLSDR_SAMPLE_RATE", 2000000))  # Hz
DEVICE_INDEX = int(os.environ.get("RTLSDR_DEVICE_INDEX", 0))
# "auto" selects tuner AGC (GAIN_DB is then None); any other value must parse
# as a float gain in dB and is forwarded to dump1090 verbatim.
GAIN_RAW = os.environ.get("RTLSDR_GAIN", "auto")
GAIN_DB = None if GAIN_RAW == "auto" else float(GAIN_RAW)
ENABLE_BIAS_T = os.environ.get("RTLSDR_BIAS_T", "0") == "1"

# dump1090 runs in the same container; we consume its SBS-1 TCP output locally.
DUMP1090_HOST = "127.0.0.1"
DUMP1090_SBS_PORT = 30003
DUMP1090_STARTUP_WAIT = 5  # seconds to wait for dump1090 to bind
DUMP1090_RECONNECT_DELAY = 3  # seconds between SBS reconnect attempts
# Touched once dump1090 is up; presumably polled by a container healthcheck.
HEALTHCHECK_FILE = "/tmp/capture-ready"
# ── dump1090 process ──────────────────────────────────────────────────────────
def build_dump1090_cmd() -> list[str]:
    """Return the dump1090-fa argv used for the RTL-SDR Blog V4.

    Networking is enabled with only the SBS-1 (BaseStation) output listening,
    on DUMP1090_SBS_PORT; the raw-out, raw-in and Beast-in ports are set to 0
    (disabled). A gain of "auto" becomes dump1090's AGC value -10; any other
    RTLSDR_GAIN string is passed through unchanged.
    """
    gain_arg = "-10" if GAIN_RAW == "auto" else GAIN_RAW  # -10 selects AGC
    # NOTE(review): confirm the installed dump1090-fa build accepts this exact
    # bias-T flag name (check `dump1090-fa --help`).
    bias_t_args = ["--enable-bias-t"] if ENABLE_BIAS_T else []
    network_args = [
        "--net",
        "--net-sbs-port", str(DUMP1090_SBS_PORT),
        "--net-ro-port", "0",
        "--net-ri-port", "0",
        "--net-bi-port", "0",
    ]
    return [
        "dump1090-fa",
        "--device-index", str(DEVICE_INDEX),
        "--freq", str(CENTER_FREQ),
        *network_args,
        "--quiet",  # no per-message stdout noise
        # NOTE(review): dump1090-fa may not create this directory itself — verify.
        "--write-json", "/tmp/dump1090-json",
        "--gain", gain_arg,
        *bias_t_args,
    ]
def start_dump1090() -> subprocess.Popen:
    """Launch dump1090-fa (argv from build_dump1090_cmd()) and return its handle.

    stdout and stderr are merged into one text pipe. A daemon thread reads it
    continuously and logs each non-empty line at DEBUG level, so the child
    never stalls on a full pipe.
    """
    cmd = build_dump1090_cmd()
    log.info("Starting dump1090: %s", " ".join(cmd))
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        # Without this, one undecodable byte raises UnicodeDecodeError in the
        # drain thread and kills it. Nothing would then read the pipe, and once
        # the pipe buffer filled dump1090 could block on its own writes.
        errors="replace",
    )

    def _drain(p: subprocess.Popen) -> None:
        # Runs until dump1090 closes its stdout (i.e. the process exits).
        for line in p.stdout:
            line = line.rstrip()
            if line:
                log.debug("[dump1090] %s", line)

    threading.Thread(
        target=_drain, args=(proc,), name="dump1090-drain", daemon=True
    ).start()
    return proc
# ── SBS-1 reader ─────────────────────────────────────────────────────────────
def sbs_reader(queue: Queue, shutdown: dict):
    """Stream lines from dump1090's SBS TCP port into *queue* until shutdown.

    Intended as a thread target. Each received line is whitespace-stripped
    and, if non-empty, passed to ``queue.put`` (which blocks while the queue
    is full). A clean close by the peer triggers an immediate reconnect. Any
    OSError, including connection refused and connect timeouts, is logged
    and retried after DUMP1090_RECONNECT_DELAY seconds. Returns once
    ``shutdown["flag"]`` is true.
    """
    while not shutdown["flag"]:
        address = (DUMP1090_HOST, DUMP1090_SBS_PORT)
        try:
            log.info("Connecting to dump1090 SBS port %s:%d", *address)
            with socket.create_connection(address, timeout=10) as conn:
                log.info("Connected to dump1090 SBS port")
                conn.settimeout(2.0)  # wake regularly to re-check the shutdown flag
                pending = ""
                while not shutdown["flag"]:
                    try:
                        data = conn.recv(4096)
                    except socket.timeout:
                        continue
                    if not data:
                        log.warning("dump1090 SBS connection closed")
                        break
                    # Every element but the last is a complete line; the last
                    # one is a partial line carried into the next recv.
                    *complete, pending = (
                        pending + data.decode("ascii", errors="replace")
                    ).split("\n")
                    for raw in complete:
                        text = raw.strip()
                        if text:
                            queue.put(text)
        except OSError as e:  # also covers ConnectionRefusedError
            if not shutdown["flag"]:
                log.warning("SBS connect failed: %s — retry in %ds", e, DUMP1090_RECONNECT_DELAY)
                time.sleep(DUMP1090_RECONNECT_DELAY)
# ── SBS-1 parser ─────────────────────────────────────────────────────────────
# SBS-1 BaseStation format:
# MSG,<msgtype>,<sid>,<aid>,<hex>,<fid>,<date>,<time>,<date>,<time>,
# <callsign>,<alt>,<speed>,<track>,<lat>,<lon>,<vrate>,<squawk>,<alert>,<emerg>,<spi>,<onground>
def parse_sbs_line(line: str) -> dict | None:
"""Parse a SBS-1 MSG line. Returns dict or None if not a MSG."""
parts = line.split(",")
if len(parts) < 22 or parts[0] != "MSG":
return None
return {
"msg_type": parts[1], # 1-8
"icao24": parts[4].upper().strip(),
"date_gen": parts[6],
"time_gen": parts[7],
"callsign": parts[10].strip() or None,
"altitude": parts[11].strip() or None,
"speed": parts[12].strip() or None,
"track": parts[13].strip() or None,
"lat": parts[14].strip() or None,
"lon": parts[15].strip() or None,
"vrate": parts[16].strip() or None,
"squawk": parts[17].strip() or None,
}
# ── db helpers ────────────────────────────────────────────────────────────────
def wait_for_db(
    max_attempts: int = 30, retry_delay: float = 2.0
) -> psycopg2.extensions.connection:
    """Connect to PostgreSQL using DB_DSN, retrying while the server is unavailable.

    Args:
        max_attempts: Total connection attempts before giving up.
        retry_delay: Seconds to sleep between attempts. There is no sleep
            after the final attempt.

    Returns:
        An open psycopg2 connection.

    Exits the process with status 1 (via ``sys.exit``) if every attempt
    raises ``psycopg2.OperationalError``.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            conn = psycopg2.connect(DB_DSN)
        except psycopg2.OperationalError as e:
            log.warning("DB not ready (%d/%d): %s", attempt, max_attempts, e)
            if attempt < max_attempts:  # don't delay the exit after the last try
                time.sleep(retry_delay)
            continue
        log.info("PostgreSQL connected (attempt %d)", attempt)
        return conn
    log.error("Could not connect to PostgreSQL after %d attempts", max_attempts)
    sys.exit(1)
# Parameters, in order: started_at, source, device_index, center_frequency_hz,
# sample_rate_hz, gain_db, notes (status is fixed to 'active').
_INSERT_CAPTURE_SQL = """
INSERT INTO fr24.captures
(started_at, source, device_index, center_frequency_hz,
sample_rate_hz, gain_db, status, notes)
VALUES (%s, %s, %s, %s, %s, %s, 'active', %s)
RETURNING capture_id
"""


def create_capture_session(conn) -> str:
    """Insert an 'active' fr24.captures row for this run and commit it.

    The row records the current UTC time, the SDR settings from the module
    config and a note on the bias-T state.

    Returns:
        The new row's ``capture_id``, converted to ``str``.
    """
    bias_t_state = "on" if ENABLE_BIAS_T else "off"
    with conn.cursor() as cur:
        params = (
            datetime.now(timezone.utc),
            "rtl-sdr",
            DEVICE_INDEX,
            CENTER_FREQ,
            SAMPLE_RATE,
            GAIN_DB,
            f"dump1090-fa real ADS-B capture, bias-t={bias_t_state}",
        )
        cur.execute(_INSERT_CAPTURE_SQL, params)
        returned = cur.fetchone()
        capture_id = str(returned[0])
        conn.commit()
    log.info("Capture session created: %s", capture_id)
    return capture_id
# Named placeholders: each row dict passed to insert_packets_batch must
# provide every one of these keys.
_INSERT_RAW_PACKET_SQL = """
INSERT INTO fr24.raw_packets
(capture_id, observed_at, partition_date, frequency_hz,
rssi_dbm, snr_db, samplerate_hz, payload_base64,
payload_bytes, decoded_format, message_type)
VALUES
(%(capture_id)s, %(observed_at)s, %(partition_date)s, %(frequency_hz)s,
%(rssi_dbm)s, %(snr_db)s, %(samplerate_hz)s, %(payload_base64)s,
%(payload_bytes)s, %(decoded_format)s, %(message_type)s)
"""


def insert_packets_batch(conn, rows: list[dict]):
    """Insert *rows* into fr24.raw_packets in a single transaction and commit.

    Does nothing for an empty list. On a database error the exception
    propagates and the open transaction is left for the caller to roll back.
    """
    if not rows:
        return
    with conn.cursor() as cur:
        # execute_batch packs many INSERTs into few server round trips;
        # cursor.executemany() would make one round trip per row.
        psycopg2.extras.execute_batch(cur, _INSERT_RAW_PACKET_SQL, rows)
        conn.commit()
_CLOSE_CAPTURE_SQL = "UPDATE fr24.captures SET ended_at=%s, status='stopped', updated_at=now() WHERE capture_id=%s"


def close_capture_session(conn, capture_id: str):
    """Mark capture *capture_id* as 'stopped' and commit.

    ``ended_at`` is set to the current UTC time from Python. ``updated_at``
    uses the server's ``now()``.
    """
    with conn.cursor() as cur:
        stopped_at = datetime.now(timezone.utc)
        cur.execute(_CLOSE_CAPTURE_SQL, (stopped_at, capture_id))
        conn.commit()
    log.info("Capture session closed: %s", capture_id)
# ── main ──────────────────────────────────────────────────────────────────────
# SBS message types persisted to fr24.raw_packets: MSG1 (callsign), MSG3
# (position), MSG4 (speed/heading), MSG5 (altitude). Every other type —
# including MSG2 as well as MSG6/7/8 — is dropped.
_KEEP_MSG_TYPES = frozenset({"1", "3", "4", "5"})
_BATCH_SIZE = 50  # flush as soon as this many rows are pending
_BATCH_TIMEOUT = 2.0  # seconds; pending rows are flushed at least this often
_PROGRESS_LOG_EVERY = 50  # log progress each time the count passes a multiple of this


def _remove_healthcheck() -> None:
    """Delete HEALTHCHECK_FILE if it exists, so a stale marker never reports ready."""
    try:
        os.remove(HEALTHCHECK_FILE)
    except FileNotFoundError:
        pass


def _build_packet_row(capture_id: str, line: str, msg_type: str) -> dict:
    """Build one fr24.raw_packets row (keys as insert_packets_batch expects) for an SBS line."""
    now = datetime.now(timezone.utc)
    raw_bytes = line.encode("utf-8")  # the raw SBS line is the stored payload
    return {
        "capture_id": capture_id,
        "observed_at": now,
        "partition_date": now.date(),
        "frequency_hz": CENTER_FREQ,
        "rssi_dbm": None,  # SBS format doesn't carry RSSI
        "snr_db": None,
        "samplerate_hz": SAMPLE_RATE,
        "payload_base64": base64.b64encode(raw_bytes).decode(),
        "payload_bytes": len(raw_bytes),
        "decoded_format": "sbs1",
        "message_type": f"MSG{msg_type}",
    }


def _flush_batch(conn, batch: list[dict]):
    """Write *batch* best-effort. Return ``(connection, rows_written)``.

    On failure the error is logged, the batch is dropped and the transaction
    rolled back. If psycopg2 then reports the connection closed or broken
    (``conn.closed`` non-zero), a new one is opened with wait_for_db(), so
    later batches are not doomed to fail as well.
    """
    try:
        insert_packets_batch(conn, batch)
    except Exception as e:  # service loop must survive any DB error
        log.error("Batch insert failed: %s", e)
        try:
            conn.rollback()
        except Exception as rb_err:
            log.warning("Rollback after failed insert also failed: %s", rb_err)
        if conn.closed:
            log.warning("PostgreSQL connection lost — reconnecting")
            conn = wait_for_db()
        return conn, 0
    return conn, len(batch)


def main():
    """Run the capture service until SIGTERM or SIGINT.

    Steps:
      1. Open a capture session.
      2. Launch dump1090-fa and write the healthcheck marker.
      3. Read SBS-1 lines via a reader thread.
      4. Store kept message types in fr24.raw_packets. Batches hold up to
         _BATCH_SIZE rows and are flushed at least every _BATCH_TIMEOUT
         seconds while rows are pending. dump1090 is restarted if it dies.

    On shutdown: stop dump1090, flush the remaining rows, mark the session
    stopped and close the DB connection.
    """
    conn = wait_for_db()
    capture_id = create_capture_session(conn)

    shutdown = {"flag": False}

    def _handle_signal(sig, frame):
        log.info("Signal %s received, shutting down", sig)
        shutdown["flag"] = True

    signal.signal(signal.SIGTERM, _handle_signal)
    signal.signal(signal.SIGINT, _handle_signal)

    # A marker left by a previous run would report ready before dump1090 is up.
    _remove_healthcheck()

    dump1090_proc = start_dump1090()
    log.info("Waiting %ds for dump1090 to start …", DUMP1090_STARTUP_WAIT)
    time.sleep(DUMP1090_STARTUP_WAIT)
    if dump1090_proc.poll() is not None:
        log.error("dump1090 exited immediately (rc=%d) — check USB device", dump1090_proc.returncode)
        close_capture_session(conn, capture_id)
        sys.exit(1)

    open(HEALTHCHECK_FILE, "w").close()
    log.info("Healthcheck file written: %s", HEALTHCHECK_FILE)

    sbs_queue: Queue = Queue(maxsize=10000)
    threading.Thread(
        target=sbs_reader, args=(sbs_queue, shutdown), name="sbs-reader", daemon=True
    ).start()

    batch: list[dict] = []
    last_flush = time.monotonic()
    packet_count = 0  # rows queued for insert (including any later dropped on failure)
    next_progress_log = _PROGRESS_LOG_EVERY
    last_icao24 = None
    log.info("Listening for ADS-B messages on SBS port %d", DUMP1090_SBS_PORT)

    while not shutdown["flag"]:
        if dump1090_proc.poll() is not None:
            log.error("dump1090 died (rc=%d), restarting …", dump1090_proc.returncode)
            dump1090_proc = start_dump1090()
            time.sleep(DUMP1090_STARTUP_WAIT)

        try:
            line = sbs_queue.get(timeout=1.0)
        except Empty:
            line = None

        if line is not None:
            parsed = parse_sbs_line(line)
            # Filter before building the row so dropped types cost nothing.
            if parsed and parsed["msg_type"] in _KEEP_MSG_TYPES:
                batch.append(_build_packet_row(capture_id, line, parsed["msg_type"]))
                packet_count += 1
                last_icao24 = parsed["icao24"]

        # Evaluated on every iteration, including queue timeouts and dropped
        # lines. A partial batch is therefore written within roughly
        # _BATCH_TIMEOUT (+1 s queue wait) even when traffic stops.
        now_ts = time.monotonic()
        if batch and (len(batch) >= _BATCH_SIZE or now_ts - last_flush >= _BATCH_TIMEOUT):
            conn, written = _flush_batch(conn, batch)
            if written and packet_count >= next_progress_log:
                log.info("Packets written: %d (last icao24=%s)", packet_count, last_icao24)
                next_progress_log = (packet_count // _PROGRESS_LOG_EVERY + 1) * _PROGRESS_LOG_EVERY
            batch = []
            last_flush = now_ts

    # cleanup
    _remove_healthcheck()
    log.info("Stopping dump1090 …")
    dump1090_proc.terminate()
    try:
        dump1090_proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        dump1090_proc.kill()
        dump1090_proc.wait()  # reap the killed child

    if batch:
        try:
            insert_packets_batch(conn, batch)
            log.info("Flushed %d remaining packets", len(batch))
        except Exception as e:
            log.error("Final batch flush failed: %s", e)
            # Clear the aborted transaction so the session UPDATE below can run.
            try:
                conn.rollback()
            except Exception as rb_err:
                log.warning("Rollback after failed final flush also failed: %s", rb_err)

    try:
        close_capture_session(conn, capture_id)
    finally:
        conn.close()
    log.info("Capture service stopped. Total packets: %d", packet_count)


if __name__ == "__main__":
    main()