auto-sync: 2026-04-20 02:30:01
This commit is contained in:
@@ -359,17 +359,20 @@ def monitoring_status():
|
||||
unprocessed = query_one(
|
||||
"""
|
||||
SELECT
|
||||
(SELECT COALESCE((state_value->>'last_raw_packet_id')::bigint, 0)
|
||||
FROM fr24.processing_state WHERE state_key = 'preprocess_cursor') AS cursor_id,
|
||||
(SELECT MAX(raw_packet_id) FROM fr24.raw_packets) AS max_id,
|
||||
COALESCE((state_value->>'last_raw_packet_id')::bigint, 0) AS cursor_id,
|
||||
(SELECT COUNT(*) FROM fr24.raw_packets) AS total
|
||||
FROM fr24.processing_state
|
||||
WHERE state_key = 'preprocess_cursor'
|
||||
"""
|
||||
)
|
||||
if unprocessed:
|
||||
cursor_id = unprocessed["cursor_id"] or 0
|
||||
max_id = unprocessed["max_id"] or 0
|
||||
total = unprocessed["total"] or 0
|
||||
pending = max(0, max_id - cursor_id)
|
||||
pending_rows = query_one(
|
||||
"SELECT COUNT(*) as cnt FROM fr24.raw_packets WHERE raw_packet_id > %s",
|
||||
(cursor_id,)
|
||||
)
|
||||
pending = pending_rows["cnt"] if pending_rows else 0
|
||||
pending_pct = round(pending / total * 100, 1) if total else 0
|
||||
unprocessed_info = {"pending": pending, "pending_pct": pending_pct, "total": total}
|
||||
else:
|
||||
|
||||
@@ -181,7 +181,7 @@
|
||||
<th>Disk %</th>
|
||||
<th>DB Size</th>
|
||||
<th>Lag (сек)</th>
|
||||
<th>Throughput</th>
|
||||
<th>Capture / 5min</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody id="history-body">
|
||||
|
||||
@@ -194,9 +194,12 @@ def create_capture_session(conn) -> str:
|
||||
return capture_id
|
||||
|
||||
|
||||
def insert_packet(conn, row: dict):
|
||||
def insert_packets_batch(conn, rows: list[dict]):
|
||||
"""Insert a batch of packets in a single transaction."""
|
||||
if not rows:
|
||||
return
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
cur.executemany(
|
||||
"""
|
||||
INSERT INTO fr24.raw_packets
|
||||
(capture_id, observed_at, partition_date, frequency_hz,
|
||||
@@ -207,7 +210,7 @@ def insert_packet(conn, row: dict):
|
||||
%(rssi_dbm)s, %(snr_db)s, %(samplerate_hz)s, %(payload_base64)s,
|
||||
%(payload_bytes)s, %(decoded_format)s, %(message_type)s)
|
||||
""",
|
||||
row,
|
||||
rows,
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
@@ -258,6 +261,11 @@ def main():
|
||||
)
|
||||
reader_thread.start()
|
||||
|
||||
BATCH_SIZE = 50
|
||||
BATCH_TIMEOUT = 2.0 # seconds
|
||||
|
||||
batch: list[dict] = []
|
||||
last_flush = time.time()
|
||||
packet_count = 0
|
||||
log.info("Listening for ADS-B messages on SBS port %d …", DUMP1090_SBS_PORT)
|
||||
|
||||
@@ -300,17 +308,25 @@ def main():
|
||||
if parsed["msg_type"] not in KEEP_TYPES:
|
||||
continue
|
||||
|
||||
try:
|
||||
insert_packet(conn, row)
|
||||
packet_count += 1
|
||||
if packet_count % 50 == 0:
|
||||
log.info("Packets written: %d (last icao24=%s)", packet_count, parsed["icao24"])
|
||||
except Exception as e:
|
||||
log.error("Packet insert failed: %s", e)
|
||||
batch.append(row)
|
||||
packet_count += 1
|
||||
|
||||
now_ts = time.time()
|
||||
if len(batch) >= BATCH_SIZE or (now_ts - last_flush) >= BATCH_TIMEOUT:
|
||||
try:
|
||||
conn.rollback()
|
||||
except Exception:
|
||||
pass
|
||||
insert_packets_batch(conn, batch)
|
||||
if packet_count % 50 == 0:
|
||||
log.info("Packets written: %d (last icao24=%s)", packet_count, parsed["icao24"])
|
||||
batch = []
|
||||
last_flush = now_ts
|
||||
except Exception as e:
|
||||
log.error("Batch insert failed: %s", e)
|
||||
try:
|
||||
conn.rollback()
|
||||
except Exception:
|
||||
pass
|
||||
batch = []
|
||||
last_flush = now_ts
|
||||
|
||||
# cleanup
|
||||
log.info("Stopping dump1090 …")
|
||||
@@ -320,6 +336,14 @@ def main():
|
||||
except subprocess.TimeoutExpired:
|
||||
dump1090_proc.kill()
|
||||
|
||||
# flush remaining batch
|
||||
if batch:
|
||||
try:
|
||||
insert_packets_batch(conn, batch)
|
||||
log.info("Flushed %d remaining packets", len(batch))
|
||||
except Exception as e:
|
||||
log.error("Final batch flush failed: %s", e)
|
||||
|
||||
close_capture_session(conn, capture_id)
|
||||
conn.close()
|
||||
log.info("Capture service stopped. Total packets: %d", packet_count)
|
||||
|
||||
@@ -80,10 +80,10 @@ def run_checks():
|
||||
lag_sec = int(row[0]) if row and row[0] is not None else None
|
||||
lag_str = f"{lag_sec}s" if lag_sec is not None else "N/A"
|
||||
|
||||
# Throughput: packets in last 5 minutes
|
||||
# Throughput: packets captured in last 5 minutes (by created_at = capture write time)
|
||||
cur.execute(
|
||||
"SELECT COUNT(*) FROM fr24.raw_packets "
|
||||
"WHERE observed_at >= now() - INTERVAL '5 minutes'"
|
||||
"WHERE created_at >= now() - INTERVAL '5 minutes'"
|
||||
)
|
||||
throughput = cur.fetchone()[0]
|
||||
|
||||
@@ -91,16 +91,21 @@ def run_checks():
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT
|
||||
(SELECT COALESCE((state_value->>'last_raw_packet_id')::bigint, 0)
|
||||
FROM fr24.processing_state WHERE state_key = 'preprocess_cursor') AS cursor_id,
|
||||
(SELECT MAX(raw_packet_id) FROM fr24.raw_packets) AS max_id,
|
||||
(SELECT COUNT(*) FROM fr24.raw_packets) AS total
|
||||
COALESCE((state_value->>'last_raw_packet_id')::bigint, 0) AS cursor_id,
|
||||
COUNT(*) AS total
|
||||
FROM fr24.processing_state, fr24.raw_packets
|
||||
WHERE state_key = 'preprocess_cursor'
|
||||
GROUP BY cursor_id
|
||||
"""
|
||||
)
|
||||
row = cur.fetchone()
|
||||
cursor_id, max_id, total_packets = row if row else (0, 0, 0)
|
||||
unprocessed = max(0, (max_id or 0) - (cursor_id or 0))
|
||||
unprocessed_pct = round(unprocessed / total_packets * 100, 1) if total_packets else 0
|
||||
if row:
|
||||
cursor_id, total_packets = row
|
||||
cur.execute("SELECT COUNT(*) FROM fr24.raw_packets WHERE raw_packet_id > %s", (cursor_id,))
|
||||
unprocessed = cur.fetchone()[0]
|
||||
unprocessed_pct = round(unprocessed / total_packets * 100, 1) if total_packets else 0
|
||||
else:
|
||||
unprocessed, total_packets, unprocessed_pct = 0, 0, 0
|
||||
|
||||
# Write metrics row
|
||||
disk_pct_int = int(disk_pct_str) if disk_pct_str not in ("?",) else None
|
||||
|
||||
Reference in New Issue
Block a user