From bfcb028189887d919aa51291354fafd213c44361 Mon Sep 17 00:00:00 2001 From: Stream Date: Mon, 20 Apr 2026 02:30:01 +0300 Subject: [PATCH] auto-sync: 2026-04-20 02:30:01 --- tasks/flightradar24/frontend/main.py | 13 +++-- .../frontend/static/monitoring.html | 2 +- tasks/flightradar24/ingest/capture/main.py | 50 ++++++++++++++----- tasks/flightradar24/monitoring/main.py | 23 +++++---- 4 files changed, 60 insertions(+), 28 deletions(-) diff --git a/tasks/flightradar24/frontend/main.py b/tasks/flightradar24/frontend/main.py index 514ab9d..ae6ab62 100644 --- a/tasks/flightradar24/frontend/main.py +++ b/tasks/flightradar24/frontend/main.py @@ -359,17 +359,20 @@ def monitoring_status(): unprocessed = query_one( """ SELECT - (SELECT COALESCE((state_value->>'last_raw_packet_id')::bigint, 0) - FROM fr24.processing_state WHERE state_key = 'preprocess_cursor') AS cursor_id, - (SELECT MAX(raw_packet_id) FROM fr24.raw_packets) AS max_id, + COALESCE((state_value->>'last_raw_packet_id')::bigint, 0) AS cursor_id, (SELECT COUNT(*) FROM fr24.raw_packets) AS total + FROM fr24.processing_state + WHERE state_key = 'preprocess_cursor' """ ) if unprocessed: cursor_id = unprocessed["cursor_id"] or 0 - max_id = unprocessed["max_id"] or 0 total = unprocessed["total"] or 0 - pending = max(0, max_id - cursor_id) + pending_rows = query_one( + "SELECT COUNT(*) as cnt FROM fr24.raw_packets WHERE raw_packet_id > %s", + (cursor_id,) + ) + pending = pending_rows["cnt"] if pending_rows else 0 pending_pct = round(pending / total * 100, 1) if total else 0 unprocessed_info = {"pending": pending, "pending_pct": pending_pct, "total": total} else: diff --git a/tasks/flightradar24/frontend/static/monitoring.html b/tasks/flightradar24/frontend/static/monitoring.html index ad412ea..ae5f982 100644 --- a/tasks/flightradar24/frontend/static/monitoring.html +++ b/tasks/flightradar24/frontend/static/monitoring.html @@ -181,7 +181,7 @@ Disk % DB Size Lag (сек) - Throughput + Capture / 5min diff --git a/tasks/flightradar24/ingest/capture/main.py b/tasks/flightradar24/ingest/capture/main.py index d2de8a5..ee06aa4 100644 --- a/tasks/flightradar24/ingest/capture/main.py +++ b/tasks/flightradar24/ingest/capture/main.py @@ -194,9 +194,12 @@ def create_capture_session(conn) -> str: return capture_id -def insert_packet(conn, row: dict): +def insert_packets_batch(conn, rows: list[dict]): + """Insert a batch of packets in a single transaction.""" + if not rows: + return with conn.cursor() as cur: - cur.execute( + cur.executemany( """ INSERT INTO fr24.raw_packets (capture_id, observed_at, partition_date, frequency_hz, @@ -207,7 +210,7 @@ def insert_packet(conn, row: dict): %(rssi_dbm)s, %(snr_db)s, %(samplerate_hz)s, %(payload_base64)s, %(payload_bytes)s, %(decoded_format)s, %(message_type)s) """, - row, + rows, ) conn.commit() @@ -258,6 +261,11 @@ def main(): ) reader_thread.start() + BATCH_SIZE = 50 + BATCH_TIMEOUT = 2.0 # seconds + + batch: list[dict] = [] + last_flush = time.time() packet_count = 0 log.info("Listening for ADS-B messages on SBS port %d …", DUMP1090_SBS_PORT) @@ -300,17 +308,25 @@ def main(): if parsed["msg_type"] not in KEEP_TYPES: continue - try: - insert_packet(conn, row) - packet_count += 1 - if packet_count % 50 == 0: - log.info("Packets written: %d (last icao24=%s)", packet_count, parsed["icao24"]) - except Exception as e: - log.error("Packet insert failed: %s", e) + batch.append(row) + packet_count += 1 + + now_ts = time.time() + if len(batch) >= BATCH_SIZE or (now_ts - last_flush) >= BATCH_TIMEOUT: try: - conn.rollback() - except Exception: - pass + insert_packets_batch(conn, batch) + if packet_count % 50 == 0: + log.info("Packets written: %d (last icao24=%s)", packet_count, parsed["icao24"]) + batch = [] + last_flush = now_ts + except Exception as e: + log.error("Batch insert failed: %s", e) + try: + conn.rollback() + except Exception: + pass + batch = [] + last_flush = now_ts # cleanup log.info("Stopping dump1090 …") @@ -320,6 +336,14 @@ def main(): except subprocess.TimeoutExpired: dump1090_proc.kill() + # flush remaining batch + if batch: + try: + insert_packets_batch(conn, batch) + log.info("Flushed %d remaining packets", len(batch)) + except Exception as e: + log.error("Final batch flush failed: %s", e) + close_capture_session(conn, capture_id) conn.close() log.info("Capture service stopped. Total packets: %d", packet_count) diff --git a/tasks/flightradar24/monitoring/main.py b/tasks/flightradar24/monitoring/main.py index 1e98f67..1ad56a1 100644 --- a/tasks/flightradar24/monitoring/main.py +++ b/tasks/flightradar24/monitoring/main.py @@ -80,10 +80,10 @@ def run_checks(): lag_sec = int(row[0]) if row and row[0] is not None else None lag_str = f"{lag_sec}s" if lag_sec is not None else "N/A" - # Throughput: packets in last 5 minutes + # Throughput: packets captured in last 5 minutes (by created_at = capture write time) cur.execute( "SELECT COUNT(*) FROM fr24.raw_packets " - "WHERE observed_at >= now() - INTERVAL '5 minutes'" + "WHERE created_at >= now() - INTERVAL '5 minutes'" ) throughput = cur.fetchone()[0] @@ -91,16 +91,21 @@ def run_checks(): cur.execute( """ SELECT - (SELECT COALESCE((state_value->>'last_raw_packet_id')::bigint, 0) - FROM fr24.processing_state WHERE state_key = 'preprocess_cursor') AS cursor_id, - (SELECT MAX(raw_packet_id) FROM fr24.raw_packets) AS max_id, - (SELECT COUNT(*) FROM fr24.raw_packets) AS total + COALESCE((state_value->>'last_raw_packet_id')::bigint, 0) AS cursor_id, + COUNT(*) AS total + FROM fr24.processing_state, fr24.raw_packets + WHERE state_key = 'preprocess_cursor' + GROUP BY cursor_id """ ) row = cur.fetchone() - cursor_id, max_id, total_packets = row if row else (0, 0, 0) - unprocessed = max(0, (max_id or 0) - (cursor_id or 0)) - unprocessed_pct = round(unprocessed / total_packets * 100, 1) if total_packets else 0 + if row: + cursor_id, total_packets = row + cur.execute("SELECT COUNT(*) FROM fr24.raw_packets WHERE raw_packet_id > %s", (cursor_id,)) + unprocessed = cur.fetchone()[0] + unprocessed_pct = round(unprocessed / total_packets * 100, 1) if total_packets else 0 + else: + unprocessed, total_packets, unprocessed_pct = 0, 0, 0 # Write metrics row disk_pct_int = int(disk_pct_str) if disk_pct_str not in ("?",) else None