183 lines
5.9 KiB
Python
183 lines
5.9 KiB
Python
#!/usr/bin/env python3
|
|
"""FR24 monitoring service — checks disk, DB size, capture lag, throughput every 60s."""
|
|
|
|
import os
|
|
import time
|
|
import logging
|
|
import subprocess
|
|
import psycopg2
|
|
|
|
# Root logger configuration: INFO and above, one line per record in the form
# "<timestamp> <LEVEL> <message>", with second-resolution timestamps
# formatted as YYYY-MM-DDTHH:MM:SS.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
    level=logging.INFO,
)

# Logger shared by every function in this module.
log = logging.getLogger("monitor")
|
|
|
|
def quote_dsn_value(value: str) -> str:
    """Return *value* in a form that is safe inside a libpq ``key=value`` DSN.

    Plain values are returned unchanged. A value that is empty or contains
    whitespace, a single quote or a backslash is wrapped in single quotes,
    with embedded backslashes and single quotes backslash-escaped, so it
    cannot be split or misparsed by the connection-string parser.
    """
    if value and not any(ch.isspace() or ch in "'\\" for ch in value):
        return value
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


# Connection string handed to psycopg2.connect(). Each part is read from the
# environment, falling back to the default shown here.
DB_DSN = " ".join(
    f"{keyword}={quote_dsn_value(os.environ.get(env_name, default))}"
    for keyword, env_name, default in (
        ("host", "POSTGRES_HOST", "postgres"),
        ("port", "POSTGRES_PORT", "5432"),
        ("dbname", "POSTGRES_DB", "fr24"),
        ("user", "POSTGRES_USER", "fr24"),
        ("password", "POSTGRES_PASSWORD", "change-me"),
    )
)

# Seconds to sleep between two rounds of checks.
INTERVAL = int(os.environ.get("MONITORING_INTERVAL_SECONDS", "60"))

# A warning is logged when disk usage of / is above this percentage.
DISK_WARN_PCT = 80

# A warning is logged when the newest raw packet is older than this.
LAG_WARN_SEC = 300  # 5 minutes

METRICS_KEEP = 100  # rows to retain in monitoring_metrics
|
|
|
|
|
|
def ensure_metrics_table(conn):
    """Make sure the fr24.monitoring_metrics table is present.

    Runs a ``CREATE TABLE IF NOT EXISTS`` statement on *conn*, commits it,
    and logs that the table is ready. Errors from the database propagate to
    the caller.
    """
    ddl = """
        CREATE TABLE IF NOT EXISTS fr24.monitoring_metrics (
            id SERIAL PRIMARY KEY,
            collected_at TIMESTAMPTZ DEFAULT now(),
            disk_pct INTEGER,
            db_size_mb FLOAT,
            capture_lag_sec INTEGER,
            throughput_5min INTEGER
        )
    """
    cursor = conn.cursor()
    try:
        cursor.execute(ddl)
    finally:
        # Release the cursor whether or not the statement succeeded.
        cursor.close()
    conn.commit()
    log.info("monitoring_metrics table ready")
|
|
|
|
|
|
def get_disk_usage() -> str:
    """Return disk usage percent for / as integer string, e.g. '45'.

    Runs ``df -P /`` and reads the fifth column (Capacity) of the last
    output line. Returns ``"?"`` when ``df`` cannot be run, times out,
    prints nothing, or the column is not a plain non-negative integer, so
    callers can always rely on the result being either digits or ``"?"``.
    """
    try:
        result = subprocess.run(
            ["df", "-P", "/"],
            capture_output=True, text=True, timeout=5
        )
        lines = result.stdout.strip().splitlines()
        if not lines:
            raise ValueError(
                f"df produced no output (exit {result.returncode}): "
                f"{result.stderr.strip()}"
            )
        # last line: Filesystem 1024-blocks Used Available Capacity Mounted
        pct = lines[-1].split()[4].rstrip("%")
        # Reject header text ("Capacity"), "-" and similar non-numeric
        # fields instead of handing them to callers that call int() on it.
        if not (pct.isascii() and pct.isdigit()):
            raise ValueError(f"unexpected capacity field {pct!r}")
        return pct
    except Exception as e:
        log.warning("disk check failed: %s", e)
        return "?"
|
|
|
|
|
|
def run_checks():
    """Run one round of checks: collect, persist, print and warn.

    Steps:
      1. Read disk usage of / via get_disk_usage().
      2. Query the database for its size, the capture lag (age of the newest
         fr24.raw_packets row), the packet count of the last 5 minutes and
         the distance between the preprocess cursor and the newest packet.
      3. Insert one row into fr24.monitoring_metrics and prune the table to
         the newest METRICS_KEEP rows.
      4. Print a single "[monitor] ..." summary line to stdout.
      5. Log warnings when disk usage exceeds DISK_WARN_PCT or the capture
         lag exceeds LAG_WARN_SEC.

    Any database error is logged and every DB-derived field of the summary
    line is shown as "ERR"; the disk check and its warning still run.
    """
    disk_pct_str = get_disk_usage()
    # "?" (or any other non-numeric answer) means the disk check failed.
    # Parse once here so a bad value is neither blamed on the database nor
    # able to abort the warning section below.
    try:
        disk_pct_int = int(disk_pct_str)
    except (TypeError, ValueError):
        disk_pct_int = None

    conn = None
    try:
        conn = psycopg2.connect(DB_DSN, connect_timeout=5)
        conn.autocommit = True
        with conn.cursor() as cur:
            # DB size
            cur.execute("SELECT pg_database_size(current_database())")
            db_bytes = cur.fetchone()[0]
            db_size_mb = db_bytes / (1024 ** 2)
            db_size_gb = db_bytes / (1024 ** 3)
            db_size_str = f"{db_size_gb:.2f}GB" if db_size_gb >= 1 else f"{db_size_mb:.1f}MB"

            # Capture lag
            cur.execute("SELECT EXTRACT(EPOCH FROM (now() - MAX(observed_at))) FROM fr24.raw_packets")
            row = cur.fetchone()
            # MAX() over an empty table yields NULL -> no lag figure.
            lag_sec = int(row[0]) if row and row[0] is not None else None
            lag_str = f"{lag_sec}s" if lag_sec is not None else "N/A"

            # Throughput: packets in last 5 minutes
            cur.execute(
                "SELECT COUNT(*) FROM fr24.raw_packets "
                "WHERE observed_at >= now() - INTERVAL '5 minutes'"
            )
            throughput = cur.fetchone()[0]

            # Unprocessed packets (cursor lag)
            cur.execute(
                """
                SELECT
                    (SELECT COALESCE((state_value->>'last_raw_packet_id')::bigint, 0)
                     FROM fr24.processing_state WHERE state_key = 'preprocess_cursor') AS cursor_id,
                    (SELECT MAX(raw_packet_id) FROM fr24.raw_packets) AS max_id,
                    (SELECT COUNT(*) FROM fr24.raw_packets) AS total
                """
            )
            row = cur.fetchone()
            cursor_id, max_id, total_packets = row if row else (0, 0, 0)
            # Either id may be NULL (no state row / empty table): count as 0.
            unprocessed = max(0, (max_id or 0) - (cursor_id or 0))
            unprocessed_pct = round(unprocessed / total_packets * 100, 1) if total_packets else 0

            # Write metrics row
            cur.execute(
                """
                INSERT INTO fr24.monitoring_metrics (disk_pct, db_size_mb, capture_lag_sec, throughput_5min)
                VALUES (%s, %s, %s, %s)
                """,
                (disk_pct_int, round(db_size_mb, 2), lag_sec, int(throughput)),
            )
            # Prune old rows, keep last METRICS_KEEP
            cur.execute(
                """
                DELETE FROM fr24.monitoring_metrics
                WHERE id NOT IN (
                    SELECT id FROM fr24.monitoring_metrics ORDER BY id DESC LIMIT %s
                )
                """,
                (METRICS_KEEP,),
            )

        db_ok = True
    except Exception as e:
        log.warning("db check failed: %s", e)
        # Every DB-derived field used below must be defined on this path,
        # otherwise the summary line itself would raise NameError.
        db_size_str = "ERR"
        lag_sec = None
        lag_str = "ERR"
        throughput = "ERR"
        unprocessed = "ERR"
        unprocessed_pct = "ERR"
        db_ok = False
    finally:
        # Close the connection on success and on failure alike.
        if conn is not None:
            conn.close()

    # Emit metrics line
    disk_display = f"{disk_pct_int}%" if disk_pct_int is not None else "?"
    print(
        f"[monitor] disk={disk_display} db_size={db_size_str} "
        f"capture_lag={lag_str} throughput={throughput}pkt/5min "
        f"unprocessed={unprocessed}({unprocessed_pct}%)",
        flush=True,
    )

    # Warnings
    if disk_pct_int is not None and disk_pct_int > DISK_WARN_PCT:
        log.warning("DISK USAGE HIGH: %s%%", disk_pct_int)

    if db_ok and lag_sec is not None and lag_sec > LAG_WARN_SEC:
        log.warning("CAPTURE LAG HIGH: %ds (threshold %ds)", lag_sec, LAG_WARN_SEC)
|
|
|
|
|
|
def main():
    """Service entry point: prepare the metrics table, then check forever.

    Tries up to 10 times (3 seconds apart) to connect to the database and
    create the metrics table. Whether or not that succeeds, it then creates
    the /tmp/monitoring-ready marker file and calls run_checks() every
    INTERVAL seconds. An exception escaping run_checks() is logged with its
    traceback and does not stop the loop.
    """
    log.info("FR24 monitoring started (interval=%ds)", INTERVAL)

    # Ensure metrics table exists before first check
    for attempt in range(10):
        conn = None
        try:
            conn = psycopg2.connect(DB_DSN, connect_timeout=5)
            conn.autocommit = False
            ensure_metrics_table(conn)
            break
        except Exception as e:
            log.warning("waiting for DB (%d/10): %s", attempt + 1, e)
            time.sleep(3)
        finally:
            # Runs on success (before the break takes effect) and on
            # failure, so a connection is never left open between attempts.
            if conn is not None:
                conn.close()
    else:
        # No break: every attempt failed. Keep going as before (run_checks
        # reports DB errors itself), but make the condition visible.
        log.error("monitoring_metrics table could not be ensured after 10 attempts; continuing")

    # Signal readiness
    with open("/tmp/monitoring-ready", "w"):
        pass

    while True:
        try:
            run_checks()
        except Exception as e:
            # exc_info keeps the traceback; the message alone rarely tells
            # where an unexpected failure came from.
            log.error("unexpected error in run_checks: %s", e, exc_info=True)
        time.sleep(INTERVAL)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|