diff --git a/tasks/flightradar24/frontend/main.py b/tasks/flightradar24/frontend/main.py
index 514ab9d..ae6ab62 100644
--- a/tasks/flightradar24/frontend/main.py
+++ b/tasks/flightradar24/frontend/main.py
@@ -359,17 +359,20 @@ def monitoring_status():
unprocessed = query_one(
"""
SELECT
- (SELECT COALESCE((state_value->>'last_raw_packet_id')::bigint, 0)
- FROM fr24.processing_state WHERE state_key = 'preprocess_cursor') AS cursor_id,
- (SELECT MAX(raw_packet_id) FROM fr24.raw_packets) AS max_id,
+ COALESCE((state_value->>'last_raw_packet_id')::bigint, 0) AS cursor_id,
(SELECT COUNT(*) FROM fr24.raw_packets) AS total
+ FROM fr24.processing_state
+ WHERE state_key = 'preprocess_cursor'
"""
)
if unprocessed:
cursor_id = unprocessed["cursor_id"] or 0
- max_id = unprocessed["max_id"] or 0
total = unprocessed["total"] or 0
- pending = max(0, max_id - cursor_id)
+ pending_rows = query_one(
+ "SELECT COUNT(*) as cnt FROM fr24.raw_packets WHERE raw_packet_id > %s",
+ (cursor_id,)
+ )
+ pending = pending_rows["cnt"] if pending_rows else 0
pending_pct = round(pending / total * 100, 1) if total else 0
unprocessed_info = {"pending": pending, "pending_pct": pending_pct, "total": total}
else:
diff --git a/tasks/flightradar24/frontend/static/monitoring.html b/tasks/flightradar24/frontend/static/monitoring.html
index ad412ea..ae5f982 100644
--- a/tasks/flightradar24/frontend/static/monitoring.html
+++ b/tasks/flightradar24/frontend/static/monitoring.html
@@ -181,7 +181,7 @@
Disk % |
DB Size |
Lag (сек) |
- Throughput |
+ Capture / 5min |
diff --git a/tasks/flightradar24/ingest/capture/main.py b/tasks/flightradar24/ingest/capture/main.py
index d2de8a5..ee06aa4 100644
--- a/tasks/flightradar24/ingest/capture/main.py
+++ b/tasks/flightradar24/ingest/capture/main.py
@@ -194,9 +194,12 @@ def create_capture_session(conn) -> str:
return capture_id
-def insert_packet(conn, row: dict):
+def insert_packets_batch(conn, rows: list[dict]):
+ """Insert a batch of packets in a single transaction."""
+ if not rows:
+ return
with conn.cursor() as cur:
- cur.execute(
+ cur.executemany(
"""
INSERT INTO fr24.raw_packets
(capture_id, observed_at, partition_date, frequency_hz,
@@ -207,7 +210,7 @@ def insert_packet(conn, row: dict):
%(rssi_dbm)s, %(snr_db)s, %(samplerate_hz)s, %(payload_base64)s,
%(payload_bytes)s, %(decoded_format)s, %(message_type)s)
""",
- row,
+ rows,
)
conn.commit()
@@ -258,6 +261,11 @@ def main():
)
reader_thread.start()
+ BATCH_SIZE = 50
+ BATCH_TIMEOUT = 2.0 # seconds
+
+ batch: list[dict] = []
+ last_flush = time.time()
packet_count = 0
log.info("Listening for ADS-B messages on SBS port %d …", DUMP1090_SBS_PORT)
@@ -300,17 +308,25 @@ def main():
if parsed["msg_type"] not in KEEP_TYPES:
continue
- try:
- insert_packet(conn, row)
- packet_count += 1
- if packet_count % 50 == 0:
- log.info("Packets written: %d (last icao24=%s)", packet_count, parsed["icao24"])
- except Exception as e:
- log.error("Packet insert failed: %s", e)
+ batch.append(row)
+ packet_count += 1
+
+ now_ts = time.time()
+ if len(batch) >= BATCH_SIZE or (now_ts - last_flush) >= BATCH_TIMEOUT:
try:
- conn.rollback()
- except Exception:
- pass
+ insert_packets_batch(conn, batch)
+ if packet_count % 50 == 0:
+ log.info("Packets written: %d (last icao24=%s)", packet_count, parsed["icao24"])
+ batch = []
+ last_flush = now_ts
+ except Exception as e:
+ log.error("Batch insert failed: %s", e)
+ try:
+ conn.rollback()
+ except Exception:
+ pass
+ batch = []
+ last_flush = now_ts
# cleanup
log.info("Stopping dump1090 …")
@@ -320,6 +336,14 @@ def main():
except subprocess.TimeoutExpired:
dump1090_proc.kill()
+ # flush remaining batch
+ if batch:
+ try:
+ insert_packets_batch(conn, batch)
+ log.info("Flushed %d remaining packets", len(batch))
+ except Exception as e:
+ log.error("Final batch flush failed: %s", e)
+
close_capture_session(conn, capture_id)
conn.close()
log.info("Capture service stopped. Total packets: %d", packet_count)
diff --git a/tasks/flightradar24/monitoring/main.py b/tasks/flightradar24/monitoring/main.py
index 1e98f67..1ad56a1 100644
--- a/tasks/flightradar24/monitoring/main.py
+++ b/tasks/flightradar24/monitoring/main.py
@@ -80,10 +80,10 @@ def run_checks():
lag_sec = int(row[0]) if row and row[0] is not None else None
lag_str = f"{lag_sec}s" if lag_sec is not None else "N/A"
- # Throughput: packets in last 5 minutes
+ # Throughput: packets captured in last 5 minutes (by created_at = capture write time)
cur.execute(
"SELECT COUNT(*) FROM fr24.raw_packets "
- "WHERE observed_at >= now() - INTERVAL '5 minutes'"
+ "WHERE created_at >= now() - INTERVAL '5 minutes'"
)
throughput = cur.fetchone()[0]
@@ -91,16 +91,21 @@ def run_checks():
cur.execute(
"""
SELECT
- (SELECT COALESCE((state_value->>'last_raw_packet_id')::bigint, 0)
- FROM fr24.processing_state WHERE state_key = 'preprocess_cursor') AS cursor_id,
- (SELECT MAX(raw_packet_id) FROM fr24.raw_packets) AS max_id,
- (SELECT COUNT(*) FROM fr24.raw_packets) AS total
+ COALESCE((state_value->>'last_raw_packet_id')::bigint, 0) AS cursor_id,
+ COUNT(*) AS total
+ FROM fr24.processing_state, fr24.raw_packets
+ WHERE state_key = 'preprocess_cursor'
+ GROUP BY cursor_id
"""
)
row = cur.fetchone()
- cursor_id, max_id, total_packets = row if row else (0, 0, 0)
- unprocessed = max(0, (max_id or 0) - (cursor_id or 0))
- unprocessed_pct = round(unprocessed / total_packets * 100, 1) if total_packets else 0
+ if row:
+ cursor_id, total_packets = row
+ cur.execute("SELECT COUNT(*) FROM fr24.raw_packets WHERE raw_packet_id > %s", (cursor_id,))
+ unprocessed = cur.fetchone()[0]
+ unprocessed_pct = round(unprocessed / total_packets * 100, 1) if total_packets else 0
+ else:
+ unprocessed, total_packets, unprocessed_pct = 0, 0, 0
# Write metrics row
disk_pct_int = int(disk_pct_str) if disk_pct_str not in ("?",) else None