From f8ae5ccc60635c94bcd8948c12cf717c673d3ab9 Mon Sep 17 00:00:00 2001 From: Stream Date: Sun, 19 Apr 2026 14:50:01 +0300 Subject: [PATCH] auto-sync: 2026-04-19 14:50:01 --- .../flightradar24/compose/docker-compose.yml | 27 +- tasks/flightradar24/frontend/Dockerfile | 10 + tasks/flightradar24/frontend/README.md | 46 +++ tasks/flightradar24/frontend/main.py | 239 ++++++++++++++ tasks/flightradar24/frontend/requirements.txt | 2 + tasks/flightradar24/ingest/capture/Dockerfile | 10 + tasks/flightradar24/ingest/capture/README.md | 47 +++ tasks/flightradar24/ingest/capture/main.py | 176 ++++++++++ .../ingest/capture/requirements.txt | 1 + .../ingest/preprocess/Dockerfile | 10 + .../flightradar24/ingest/preprocess/README.md | 45 +++ tasks/flightradar24/ingest/preprocess/main.py | 300 ++++++++++++++++++ .../ingest/preprocess/requirements.txt | 1 + 13 files changed, 902 insertions(+), 12 deletions(-) create mode 100644 tasks/flightradar24/frontend/Dockerfile create mode 100644 tasks/flightradar24/frontend/README.md create mode 100644 tasks/flightradar24/frontend/main.py create mode 100644 tasks/flightradar24/frontend/requirements.txt create mode 100644 tasks/flightradar24/ingest/capture/Dockerfile create mode 100644 tasks/flightradar24/ingest/capture/README.md create mode 100644 tasks/flightradar24/ingest/capture/main.py create mode 100644 tasks/flightradar24/ingest/capture/requirements.txt create mode 100644 tasks/flightradar24/ingest/preprocess/Dockerfile create mode 100644 tasks/flightradar24/ingest/preprocess/README.md create mode 100644 tasks/flightradar24/ingest/preprocess/main.py create mode 100644 tasks/flightradar24/ingest/preprocess/requirements.txt diff --git a/tasks/flightradar24/compose/docker-compose.yml b/tasks/flightradar24/compose/docker-compose.yml index 6c4dd19..157b5ec 100644 --- a/tasks/flightradar24/compose/docker-compose.yml +++ b/tasks/flightradar24/compose/docker-compose.yml @@ -44,23 +44,24 @@ services: - fr24-net capture: - image: alpine:3.20 + build: + context: ../ingest/capture + dockerfile: Dockerfile + image: fr24-capture container_name: fr24-capture - command: ["sh", "-c", "echo 'capture placeholder: read RTL-SDR and write raw_packets'; tail -f /dev/null"] environment: <<: *common-env SERVICE_ROLE: capture devices: - "/dev/bus/usb:/dev/bus/usb" volumes: - - ../ingest:/app - ../logs/capture:/var/log/fr24 - ../backup:/backup depends_on: postgres: condition: service_healthy healthcheck: - test: ["CMD-SHELL", "test -f /tmp/capture-ready || exit 1"] + test: ["CMD-SHELL", "test -f /tmp/capture-ready && python -c 'import psycopg2' 2>/dev/null || exit 1"] interval: 30s timeout: 5s retries: 3 @@ -70,14 +71,15 @@ services: - fr24-net preprocess: - image: alpine:3.20 + build: + context: ../ingest/preprocess + dockerfile: Dockerfile + image: fr24-preprocess container_name: fr24-preprocess - command: ["sh", "-c", "echo 'preprocess placeholder: normalize data and build flights/tracks'; tail -f /dev/null"] environment: <<: *common-env SERVICE_ROLE: preprocess volumes: - - ../ingest:/app - ../logs/preprocess:/var/log/fr24 - ../backup:/backup depends_on: @@ -86,7 +88,7 @@ services: capture: condition: service_started healthcheck: - test: ["CMD-SHELL", "test -f /tmp/preprocess-ready || exit 1"] + test: ["CMD-SHELL", "test -f /tmp/preprocess-ready && python -c 'import psycopg2' 2>/dev/null || exit 1"] interval: 30s timeout: 5s retries: 3 @@ -96,9 +98,11 @@ services: - fr24-net api: - image: alpine:3.20 + build: + context: ../frontend + dockerfile: Dockerfile + image: fr24-api container_name: fr24-api - command: ["sh", "-c", "echo 'api placeholder: noisemap reader and UI endpoints'; tail -f /dev/null"] environment: <<: *common-env SERVICE_ROLE: api @@ -106,7 +110,6 @@ services: ports: - "${API_PUBLISHED_PORT:-8080}:8080" volumes: - - ../frontend:/app - ../logs/api:/var/log/fr24 depends_on: postgres: @@ -114,7 +117,7 @@ services: preprocess: condition: service_started healthcheck: - test: ["CMD-SHELL", "test -f /tmp/api-ready || exit 1"] + test: ["CMD-SHELL", "wget -qO- http://localhost:8080/health || exit 1"] interval: 30s timeout: 5s retries: 3 diff --git a/tasks/flightradar24/frontend/Dockerfile b/tasks/flightradar24/frontend/Dockerfile new file mode 100644 index 0000000..e95e63f --- /dev/null +++ b/tasks/flightradar24/frontend/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.13-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY main.py . + +CMD ["python", "-u", "main.py"] diff --git a/tasks/flightradar24/frontend/README.md b/tasks/flightradar24/frontend/README.md new file mode 100644 index 0000000..0417887 --- /dev/null +++ b/tasks/flightradar24/frontend/README.md @@ -0,0 +1,46 @@ +# API Service + +Minimal Flask HTTP API that reads from the `fr24` PostgreSQL schema and exposes data for the noisemap UI. + +## Endpoints + +| Method | Path | Description | +|---|---|---| +| GET | `/health` | API + DB liveness check | +| GET | `/dashboard/status` | Ingest pipeline stats (captures, packets, processing cursor, aircraft, flights) | +| GET | `/viewer/config` | Static system config for the map viewer | +| GET | `/captures` | List capture sessions (`?limit=50`) | +| GET | `/aircraft` | List aircraft (`?limit=100`) | +| GET | `/flights` | List flights (`?limit=100&status=active`) | + +## Dependencies + +- `flask` — HTTP server +- `psycopg2-binary` — PostgreSQL driver + +## Environment variables + +| Variable | Default | Description | +|---|---|---| +| `POSTGRES_HOST` | required | DB host | +| `POSTGRES_PORT` | `5432` | DB port | +| `POSTGRES_DB` | required | DB name | +| `POSTGRES_USER` | required | DB user | +| `POSTGRES_PASSWORD` | required | DB password | +| `API_PORT` | `8080` | Port to listen on | + +## Run locally + +```bash +pip install -r requirements.txt +export POSTGRES_HOST=localhost POSTGRES_DB=fr24 POSTGRES_USER=fr24 POSTGRES_PASSWORD=change-me +python main.py +# → http://localhost:8080/health +``` + +## Build & run via Docker + +```bash +docker build -t fr24-api . +docker run -p 8080:8080 --env-file ../../compose/.env fr24-api +``` diff --git a/tasks/flightradar24/frontend/main.py b/tasks/flightradar24/frontend/main.py new file mode 100644 index 0000000..f463123 --- /dev/null +++ b/tasks/flightradar24/frontend/main.py @@ -0,0 +1,239 @@ +""" +FR24 API Service +Minimal Flask API reading from PostgreSQL fr24 schema. +""" +import os +import time +import logging +from datetime import datetime, timezone +from functools import wraps + +import psycopg2 +import psycopg2.extras +from flask import Flask, jsonify, request + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [api] %(levelname)s %(message)s", + datefmt="%Y-%m-%dT%H:%M:%S", +) +log = logging.getLogger("api") + +app = Flask(__name__) + +DB_DSN = ( + f"host={os.environ['POSTGRES_HOST']} " + f"port={os.environ.get('POSTGRES_PORT', 5432)} " + f"dbname={os.environ['POSTGRES_DB']} " + f"user={os.environ['POSTGRES_USER']} " + f"password={os.environ['POSTGRES_PASSWORD']}" +) +API_PORT = int(os.environ.get("API_PORT", 8080)) +HEALTHCHECK_FILE = "/tmp/api-ready" +START_TIME = datetime.now(timezone.utc) + +# ── db connection (simple persistent conn with reconnect) ───────────────────── + +_conn = None + +def get_conn(): + global _conn + if _conn is None or _conn.closed: + _conn = psycopg2.connect(DB_DSN) + psycopg2.extras.register_uuid(_conn) + log.info("DB connection established") + return _conn + + +def query(sql: str, params=None) -> list: + for attempt in range(2): + try: + conn = get_conn() + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute(sql, params) + return [dict(r) for r in cur.fetchall()] + except psycopg2.OperationalError: + global _conn + _conn = None + if attempt == 1: + raise + + +def query_one(sql: str, params=None) -> dict | None: + rows = query(sql, params) + return rows[0] if rows else None + + +# ── serialisation helper ────────────────────────────────────────────────────── + +def serial(obj): + """Make psycopg2 types JSON-serialisable.""" + import decimal, uuid + if isinstance(obj, (datetime,)): + return obj.isoformat() + if isinstance(obj, decimal.Decimal): + return float(obj) + if isinstance(obj, uuid.UUID): + return str(obj) + raise TypeError(f"Not serialisable: {type(obj)}") + + +def ok(data, **kwargs): + return app.response_class( + __import__("json").dumps(data, default=serial), + mimetype="application/json", + **kwargs, + ) + + +def err(msg: str, status: int = 500): + return ok({"error": msg}, status=status) + + +# ── routes ──────────────────────────────────────────────────────────────────── + +@app.get("/health") +def health(): + try: + query_one("SELECT 1") + db_ok = True + except Exception as e: + db_ok = False + return ok({ + "status": "ok" if db_ok else "degraded", + "db": "ok" if db_ok else "error", + "uptime_seconds": int((datetime.now(timezone.utc) - START_TIME).total_seconds()), + }, status=200 if db_ok else 503) + + +@app.get("/dashboard/status") +def dashboard_status(): + try: + captures = query_one("SELECT COUNT(*) AS total, COUNT(*) FILTER (WHERE status='active') AS active FROM fr24.captures") + packets = query_one("SELECT COUNT(*) AS total FROM fr24.raw_packets") + state = query_one("SELECT state_value FROM fr24.processing_state WHERE state_key='preprocess_cursor'") + aircraft = query_one("SELECT COUNT(*) AS total FROM fr24.aircraft") + flights = query_one("SELECT COUNT(*) AS total, COUNT(*) FILTER (WHERE status='active') AS active FROM fr24.flights") + return ok({ + "captures": captures, + "raw_packets": packets, + "processing_state": state["state_value"] if state else None, + "aircraft": aircraft, + "flights": flights, + }) + except Exception as e: + return err(str(e)) + + +@app.get("/viewer/config") +def viewer_config(): + return ok({ + "system": "fr24-ingest", + "version": "0.1.0-scaffold", + "stage": "step-1-fake-data", + "db_schema": "fr24", + "center": {"lat": 52.0, "lon": 27.0}, + "zoom": 6, + "features": { + "adsb_decode": False, + "real_rtlsdr": False, + "noise_model": False, + }, + }) + + +@app.get("/captures") +def captures(): + try: + limit = min(int(request.args.get("limit", 50)), 200) + rows = query( + """ + SELECT capture_id, started_at, ended_at, source, device_index, + center_frequency_hz, sample_rate_hz, gain_db, status, notes, created_at + FROM fr24.captures + ORDER BY started_at DESC + LIMIT %s + """, + (limit,), + ) + return ok({"captures": rows, "count": len(rows)}) + except Exception as e: + return err(str(e)) + + +@app.get("/aircraft") +def aircraft(): + try: + limit = min(int(request.args.get("limit", 100)), 500) + rows = query( + """ + SELECT aircraft_id, icao24, callsign, registration, aircraft_type, + operator_name, first_seen_at, last_seen_at + FROM fr24.aircraft + ORDER BY last_seen_at DESC NULLS LAST + LIMIT %s + """, + (limit,), + ) + return ok({"aircraft": rows, "count": len(rows)}) + except Exception as e: + return err(str(e)) + + +@app.get("/flights") +def flights(): + try: + limit = min(int(request.args.get("limit", 100)), 500) + status_filter = request.args.get("status") + if status_filter: + rows = query( + """ + SELECT f.flight_id, f.aircraft_id, a.icao24, f.callsign, + f.departure_airport, f.arrival_airport, + f.started_at, f.ended_at, f.status, f.source + FROM fr24.flights f + JOIN fr24.aircraft a USING (aircraft_id) + WHERE f.status = %s + ORDER BY f.started_at DESC + LIMIT %s + """, + (status_filter, limit), + ) + else: + rows = query( + """ + SELECT f.flight_id, f.aircraft_id, a.icao24, f.callsign, + f.departure_airport, f.arrival_airport, + f.started_at, f.ended_at, f.status, f.source + FROM fr24.flights f + JOIN fr24.aircraft a USING (aircraft_id) + ORDER BY f.started_at DESC + LIMIT %s + """, + (limit,), + ) + return ok({"flights": rows, "count": len(rows)}) + except Exception as e: + return err(str(e)) + + +# ── startup ─────────────────────────────────────────────────────────────────── + +def wait_for_db(max_attempts: int = 30): + for attempt in range(1, max_attempts + 1): + try: + get_conn() + return + except psycopg2.OperationalError as e: + log.warning("DB not ready (%d/%d): %s", attempt, max_attempts, e) + time.sleep(2) + log.error("Could not connect to DB") + raise SystemExit(1) + + +if __name__ == "__main__": + wait_for_db() + open(HEALTHCHECK_FILE, "w").close() + log.info("Healthcheck file written: %s", HEALTHCHECK_FILE) + log.info("Starting API on port %d", API_PORT) + app.run(host="0.0.0.0", port=API_PORT, debug=False) diff --git a/tasks/flightradar24/frontend/requirements.txt b/tasks/flightradar24/frontend/requirements.txt new file mode 100644 index 0000000..06e45d8 --- /dev/null +++ b/tasks/flightradar24/frontend/requirements.txt @@ -0,0 +1,2 @@ +flask==3.1.0 +psycopg2-binary==2.9.9 diff --git a/tasks/flightradar24/ingest/capture/Dockerfile b/tasks/flightradar24/ingest/capture/Dockerfile new file mode 100644 index 0000000..e95e63f --- /dev/null +++ b/tasks/flightradar24/ingest/capture/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.13-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY main.py . + +CMD ["python", "-u", "main.py"] diff --git a/tasks/flightradar24/ingest/capture/README.md b/tasks/flightradar24/ingest/capture/README.md new file mode 100644 index 0000000..bb56743 --- /dev/null +++ b/tasks/flightradar24/ingest/capture/README.md @@ -0,0 +1,47 @@ +# Capture Service + +Reads RTL-SDR (or generates fake test data) and writes raw ADS-B packets to PostgreSQL. + +## What it does + +1. Waits for PostgreSQL to be ready +2. Creates a row in `fr24.captures` with session metadata +3. Writes fake `fr24.raw_packets` rows every ~2 seconds (14-byte DF17 Mode-S format) +4. Touches `/tmp/capture-ready` for Docker healthcheck +5. On SIGTERM: marks the capture session as `stopped` + +In production, replace the fake packet loop in `main.py` with real RTL-SDR / dump1090 input. + +## Dependencies + +- `psycopg2-binary` — PostgreSQL driver + +## Environment variables + +| Variable | Default | Description | +|---|---|---| +| `POSTGRES_HOST` | required | DB host | +| `POSTGRES_PORT` | `5432` | DB port | +| `POSTGRES_DB` | required | DB name | +| `POSTGRES_USER` | required | DB user | +| `POSTGRES_PASSWORD` | required | DB password | +| `RTLSDR_CENTER_FREQUENCY` | `1090000000` | Center freq Hz | +| `RTLSDR_SAMPLE_RATE` | `2000000` | Sample rate Hz | +| `RTLSDR_DEVICE_INDEX` | `0` | RTL-SDR device index | +| `RTLSDR_GAIN` | `auto` | Gain dB or `auto` | +| `PACKET_INTERVAL_SECONDS` | `2.0` | Interval between fake packets | + +## Run locally + +```bash +pip install -r requirements.txt +export POSTGRES_HOST=localhost POSTGRES_DB=fr24 POSTGRES_USER=fr24 POSTGRES_PASSWORD=change-me +python main.py +``` + +## Build & run via Docker + +```bash +docker build -t fr24-capture . +docker run --env-file ../../compose/.env fr24-capture +``` diff --git a/tasks/flightradar24/ingest/capture/main.py b/tasks/flightradar24/ingest/capture/main.py new file mode 100644 index 0000000..15e9c4c --- /dev/null +++ b/tasks/flightradar24/ingest/capture/main.py @@ -0,0 +1,176 @@ +""" +FR24 Capture Service +Connects to PostgreSQL, creates a capture session, and writes fake raw_packets. +In production: replace the fake packet loop with actual RTL-SDR / dump1090 input. +""" +import os +import time +import base64 +import random +import logging +import signal +import sys +from datetime import datetime, timezone + +import psycopg2 +import psycopg2.extras + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [capture] %(levelname)s %(message)s", + datefmt="%Y-%m-%dT%H:%M:%S", +) +log = logging.getLogger("capture") + +# ── config ──────────────────────────────────────────────────────────────────── +DB_DSN = ( + f"host={os.environ['POSTGRES_HOST']} " + f"port={os.environ.get('POSTGRES_PORT', 5432)} " + f"dbname={os.environ['POSTGRES_DB']} " + f"user={os.environ['POSTGRES_USER']} " + f"password={os.environ['POSTGRES_PASSWORD']}" +) +CENTER_FREQ = int(os.environ.get("RTLSDR_CENTER_FREQUENCY", 1090000000)) +SAMPLE_RATE = int(os.environ.get("RTLSDR_SAMPLE_RATE", 2000000)) +DEVICE_INDEX = int(os.environ.get("RTLSDR_DEVICE_INDEX", 0)) +GAIN_RAW = os.environ.get("RTLSDR_GAIN", "auto") +GAIN_DB = None if GAIN_RAW == "auto" else float(GAIN_RAW) +PACKET_INTERVAL = float(os.environ.get("PACKET_INTERVAL_SECONDS", 2.0)) +HEALTHCHECK_FILE = "/tmp/capture-ready" + +# ── fake ADS-B payload generator ───────────────────────────────────────────── +# Real ADS-B Mode-S messages are 7 or 14 bytes. +# We generate plausible-looking random bytes tagged as DF17 (extended squitter). +_ICAO_POOL = [f"{i:06X}" for i in random.sample(range(0x400000, 0xFFFFFF), 20)] + +def _fake_adsb_bytes() -> bytes: + """14-byte fake Mode-S extended squitter (DF17).""" + df17_first_byte = 0x8D # downlink format 17 + icao = bytes.fromhex(random.choice(_ICAO_POOL)) + payload = bytes([random.randint(0, 255) for _ in range(7)]) + crc = bytes([random.randint(0, 255) for _ in range(3)]) + return bytes([df17_first_byte]) + icao + payload + crc + +def _fake_packet_row(capture_id: str) -> dict: + raw = _fake_adsb_bytes() + return { + "capture_id": capture_id, + "observed_at": datetime.now(timezone.utc), + "partition_date": datetime.now(timezone.utc).date(), + "frequency_hz": CENTER_FREQ, + "rssi_dbm": round(random.uniform(-90.0, -40.0), 3), + "snr_db": round(random.uniform(5.0, 30.0), 3), + "samplerate_hz": SAMPLE_RATE, + "payload_base64": base64.b64encode(raw).decode(), + "payload_bytes": len(raw), + "decoded_format": "mode-s", + "message_type": "DF17", + } + +# ── db helpers ──────────────────────────────────────────────────────────────── +def wait_for_db(max_attempts: int = 30) -> psycopg2.extensions.connection: + for attempt in range(1, max_attempts + 1): + try: + conn = psycopg2.connect(DB_DSN) + log.info("PostgreSQL connected (attempt %d)", attempt) + return conn + except psycopg2.OperationalError as e: + log.warning("DB not ready (%d/%d): %s", attempt, max_attempts, e) + time.sleep(2) + log.error("Could not connect to PostgreSQL after %d attempts", max_attempts) + sys.exit(1) + +def create_capture_session(conn) -> str: + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO fr24.captures + (started_at, source, device_index, center_frequency_hz, + sample_rate_hz, gain_db, status, notes) + VALUES (%s, %s, %s, %s, %s, %s, 'active', %s) + RETURNING capture_id + """, + ( + datetime.now(timezone.utc), + "rtl-sdr", + DEVICE_INDEX, + CENTER_FREQ, + SAMPLE_RATE, + GAIN_DB, + "fake-data mode (step-1 scaffold)", + ), + ) + capture_id = str(cur.fetchone()[0]) + conn.commit() + log.info("Capture session created: %s", capture_id) + return capture_id + +def insert_packet(conn, row: dict): + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO fr24.raw_packets + (capture_id, observed_at, partition_date, frequency_hz, + rssi_dbm, snr_db, samplerate_hz, payload_base64, + payload_bytes, decoded_format, message_type) + VALUES + (%(capture_id)s, %(observed_at)s, %(partition_date)s, %(frequency_hz)s, + %(rssi_dbm)s, %(snr_db)s, %(samplerate_hz)s, %(payload_base64)s, + %(payload_bytes)s, %(decoded_format)s, %(message_type)s) + """, + row, + ) + conn.commit() + +def close_capture_session(conn, capture_id: str): + with conn.cursor() as cur: + cur.execute( + "UPDATE fr24.captures SET ended_at=%s, status='stopped', updated_at=now() WHERE capture_id=%s", + (datetime.now(timezone.utc), capture_id), + ) + conn.commit() + log.info("Capture session closed: %s", capture_id) + +# ── main ────────────────────────────────────────────────────────────────────── +def main(): + conn = wait_for_db() + capture_id = create_capture_session(conn) + + # signal healthcheck + open(HEALTHCHECK_FILE, "w").close() + log.info("Healthcheck file written: %s", HEALTHCHECK_FILE) + + shutdown = {"flag": False} + + def _handle_signal(sig, frame): + log.info("Signal %s received, shutting down", sig) + shutdown["flag"] = True + + signal.signal(signal.SIGTERM, _handle_signal) + signal.signal(signal.SIGINT, _handle_signal) + + packet_count = 0 + log.info("Starting fake packet loop (interval=%.1fs)", PACKET_INTERVAL) + + while not shutdown["flag"]: + try: + row = _fake_packet_row(capture_id) + insert_packet(conn, row) + packet_count += 1 + if packet_count % 10 == 0: + log.info("Packets written: %d", packet_count) + except Exception as e: + log.error("Packet insert failed: %s", e) + try: + conn.rollback() + except Exception: + pass + + time.sleep(PACKET_INTERVAL) + + close_capture_session(conn, capture_id) + conn.close() + log.info("Capture service stopped. Total packets: %d", packet_count) + +if __name__ == "__main__": + main() diff --git a/tasks/flightradar24/ingest/capture/requirements.txt b/tasks/flightradar24/ingest/capture/requirements.txt new file mode 100644 index 0000000..58ab769 --- /dev/null +++ b/tasks/flightradar24/ingest/capture/requirements.txt @@ -0,0 +1 @@ +psycopg2-binary==2.9.9 diff --git a/tasks/flightradar24/ingest/preprocess/Dockerfile b/tasks/flightradar24/ingest/preprocess/Dockerfile new file mode 100644 index 0000000..e95e63f --- /dev/null +++ b/tasks/flightradar24/ingest/preprocess/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.13-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY main.py . + +CMD ["python", "-u", "main.py"] diff --git a/tasks/flightradar24/ingest/preprocess/README.md b/tasks/flightradar24/ingest/preprocess/README.md new file mode 100644 index 0000000..787b77b --- /dev/null +++ b/tasks/flightradar24/ingest/preprocess/README.md @@ -0,0 +1,45 @@ +# Preprocess Service + +Reads unprocessed `raw_packets` from PostgreSQL and builds aircraft, flights, tracks, and track_points. + +## What it does + +1. Waits for PostgreSQL to be ready +2. Reads `fr24.processing_state` to find the last processed `raw_packet_id` +3. Fetches the next batch of unprocessed packets +4. For each packet: upserts `aircraft`, creates/reuses a `flight`, appends a `track_point` +5. Advances the cursor in `processing_state` +6. Touches `/tmp/preprocess-ready` for Docker healthcheck + +Data is fake/test for this stage — real ADS-B decoding comes later. + +## Dependencies + +- `psycopg2-binary` + +## Environment variables + +| Variable | Default | Description | +|---|---|---| +| `POSTGRES_HOST` | required | DB host | +| `POSTGRES_PORT` | `5432` | DB port | +| `POSTGRES_DB` | required | DB name | +| `POSTGRES_USER` | required | DB user | +| `POSTGRES_PASSWORD` | required | DB password | +| `POLL_INTERVAL_SECONDS` | `5.0` | How often to poll for new packets | +| `BATCH_SIZE` | `20` | Packets per processing batch | + +## Run locally + +```bash +pip install -r requirements.txt +export POSTGRES_HOST=localhost POSTGRES_DB=fr24 POSTGRES_USER=fr24 POSTGRES_PASSWORD=change-me +python main.py +``` + +## Build & run via Docker + +```bash +docker build -t fr24-preprocess . +docker run --env-file ../../compose/.env fr24-preprocess +``` diff --git a/tasks/flightradar24/ingest/preprocess/main.py b/tasks/flightradar24/ingest/preprocess/main.py new file mode 100644 index 0000000..77ef858 --- /dev/null +++ b/tasks/flightradar24/ingest/preprocess/main.py @@ -0,0 +1,300 @@ +""" +FR24 Preprocess Service +Reads unprocessed raw_packets, builds fake aircraft/flights/tracks/track_points, +and advances the processing_state cursor. +""" +import os +import time +import random +import logging +import signal +import sys +import json +from datetime import datetime, timezone + +import psycopg2 +import psycopg2.extras + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [preprocess] %(levelname)s %(message)s", + datefmt="%Y-%m-%dT%H:%M:%S", +) +log = logging.getLogger("preprocess") + +DB_DSN = ( + f"host={os.environ['POSTGRES_HOST']} " + f"port={os.environ.get('POSTGRES_PORT', 5432)} " + f"dbname={os.environ['POSTGRES_DB']} " + f"user={os.environ['POSTGRES_USER']} " + f"password={os.environ['POSTGRES_PASSWORD']}" +) +POLL_INTERVAL = float(os.environ.get("POLL_INTERVAL_SECONDS", 5.0)) +BATCH_SIZE = int(os.environ.get("BATCH_SIZE", 20)) +HEALTHCHECK_FILE = "/tmp/preprocess-ready" +STATE_KEY = "preprocess_cursor" + +# Bounding box: roughly Central Europe for plausible fake coords +LAT_MIN, LAT_MAX = 48.0, 56.0 +LON_MIN, LON_MAX = 14.0, 40.0 + + +# ── db ──────────────────────────────────────────────────────────────────────── + +def wait_for_db(max_attempts: int = 30) -> psycopg2.extensions.connection: + for attempt in range(1, max_attempts + 1): + try: + conn = psycopg2.connect(DB_DSN) + psycopg2.extras.register_uuid(conn) + log.info("PostgreSQL connected (attempt %d)", attempt) + return conn + except psycopg2.OperationalError as e: + log.warning("DB not ready (%d/%d): %s", attempt, max_attempts, e) + time.sleep(2) + log.error("Could not connect to PostgreSQL after %d attempts", max_attempts) + sys.exit(1) + + +def get_cursor(conn) -> int: + """Return last processed raw_packet_id (0 if none).""" + with conn.cursor() as cur: + cur.execute( + "SELECT state_value FROM fr24.processing_state WHERE state_key = %s", + (STATE_KEY,), + ) + row = cur.fetchone() + if row: + return int(row[0].get("last_raw_packet_id", 0)) + return 0 + + +def set_cursor(conn, last_id: int): + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO fr24.processing_state (state_key, state_value, updated_at, note) + VALUES (%s, %s::jsonb, now(), 'preprocess cursor') + ON CONFLICT (state_key) DO UPDATE + SET state_value = EXCLUDED.state_value, + updated_at = now() + """, + (STATE_KEY, json.dumps({"last_raw_packet_id": last_id})), + ) + conn.commit() + + +def fetch_unprocessed(conn, after_id: int, limit: int) -> list: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute( + """ + SELECT raw_packet_id, capture_id, observed_at, payload_base64, message_type + FROM fr24.raw_packets + WHERE raw_packet_id > %s + ORDER BY raw_packet_id + LIMIT %s + """, + (after_id, limit), + ) + return cur.fetchall() + + +# ── upsert helpers ──────────────────────────────────────────────────────────── + +def upsert_aircraft(conn, icao24: str) -> int: + now = datetime.now(timezone.utc) + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO fr24.aircraft + (icao24, callsign, registration, aircraft_type, first_seen_at, last_seen_at) + VALUES (%s, %s, %s, %s, %s, %s) + ON CONFLICT (icao24) DO UPDATE + SET last_seen_at = EXCLUDED.last_seen_at, + updated_at = now() + RETURNING aircraft_id + """, + ( + icao24, + f"TEST{icao24[:4]}", + f"RA-{random.randint(10000,99999)}", + random.choice(["B738", "A320", "B77W", "A321", "E190"]), + now, + now, + ), + ) + return cur.fetchone()[0] + + +def get_or_create_flight(conn, aircraft_id: int, observed_at) -> int: + with conn.cursor() as cur: + # reuse an active flight for this aircraft if one exists + cur.execute( + """ + SELECT flight_id FROM fr24.flights + WHERE aircraft_id = %s AND status = 'active' + ORDER BY started_at DESC LIMIT 1 + """, + (aircraft_id,), + ) + row = cur.fetchone() + if row: + return row[0] + cur.execute( + """ + INSERT INTO fr24.flights + (aircraft_id, started_at, status, source, + departure_airport, arrival_airport, callsign) + VALUES (%s, %s, 'active', 'rtl-sdr', %s, %s, %s) + RETURNING flight_id + """, + ( + aircraft_id, + observed_at, + random.choice(["SVO", "DME", "VKO", "LED", "AER"]), + random.choice(["IST", "FRA", "AMS", "CDG", "LHR"]), + f"AFL{random.randint(100,999)}", + ), + ) + return cur.fetchone()[0] + + +def get_or_create_track(conn, flight_id: int) -> int: + with conn.cursor() as cur: + cur.execute( + "SELECT track_id FROM fr24.tracks WHERE flight_id = %s LIMIT 1", + (flight_id,), + ) + row = cur.fetchone() + if row: + return row[0] + cur.execute( + "INSERT INTO fr24.tracks (flight_id) VALUES (%s) RETURNING track_id", + (flight_id,), + ) + return cur.fetchone()[0] + + +def append_track_point(conn, track_id: int, flight_id: int, observed_at, + raw_packet_id: int, partition_date) -> None: + lat = round(random.uniform(LAT_MIN, LAT_MAX), 6) + lon = round(random.uniform(LON_MIN, LON_MAX), 6) + alt = round(random.uniform(1000, 12000), 2) + + with conn.cursor() as cur: + # next point_order + cur.execute( + "SELECT COALESCE(MAX(point_order), 0) + 1 FROM fr24.track_points WHERE track_id = %s", + (track_id,), + ) + point_order = cur.fetchone()[0] + + cur.execute( + """ + INSERT INTO fr24.track_points + (track_id, flight_id, observed_at, point_order, geom, + altitude_m, ground_speed_kt, vertical_rate_fpm, heading_deg, + source_packet_id, source_partition_date) + VALUES + (%s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326), + %s, %s, %s, %s, %s, %s) + """, + ( + track_id, flight_id, observed_at, point_order, lon, lat, + alt, + round(random.uniform(200, 500), 2), + round(random.uniform(-1000, 1000), 2), + round(random.uniform(0, 360), 2), + raw_packet_id, + partition_date, + ), + ) + + # update track aggregate + cur.execute( + """ + UPDATE fr24.tracks SET + point_count = point_count + 1, + last_point_at = %s, + min_altitude_m = LEAST(COALESCE(min_altitude_m, %s), %s), + max_altitude_m = GREATEST(COALESCE(max_altitude_m, %s), %s), + updated_at = now() + WHERE track_id = %s + """, + (observed_at, alt, alt, alt, alt, track_id), + ) + + +# ── main loop ───────────────────────────────────────────────────────────────── + +def process_batch(conn, packets: list) -> int: + """Process a batch of raw packets. Returns last processed raw_packet_id.""" + last_id = 0 + for pkt in packets: + raw_id = pkt["raw_packet_id"] + observed_at = pkt["observed_at"] + partition_date = observed_at.date() if hasattr(observed_at, "date") else None + + # derive a fake ICAO24 from the packet id (deterministic per id) + icao24 = f"{(raw_id * 7919) % 0xFFFFFF:06X}" + + try: + aircraft_id = upsert_aircraft(conn, icao24) + flight_id = get_or_create_flight(conn, aircraft_id, observed_at) + track_id = get_or_create_track(conn, flight_id) + append_track_point(conn, track_id, flight_id, observed_at, raw_id, partition_date) + conn.commit() + last_id = raw_id + except Exception as e: + log.error("Failed processing packet %s: %s", raw_id, e) + conn.rollback() + + return last_id + + +def main(): + conn = wait_for_db(30) + + open(HEALTHCHECK_FILE, "w").close() + log.info("Healthcheck file written: %s", HEALTHCHECK_FILE) + + shutdown = {"flag": False} + + def _handle_signal(sig, frame): + log.info("Signal %s — shutting down", sig) + shutdown["flag"] = True + + signal.signal(signal.SIGTERM, _handle_signal) + signal.signal(signal.SIGINT, _handle_signal) + + total = 0 + log.info("Preprocess loop started (poll=%.1fs, batch=%d)", POLL_INTERVAL, BATCH_SIZE) + + while not shutdown["flag"]: + try: + cursor = get_cursor(conn) + packets = fetch_unprocessed(conn, cursor, BATCH_SIZE) + + if packets: + last_id = process_batch(conn, packets) + if last_id: + set_cursor(conn, last_id) + total += len(packets) + log.info("Processed %d packets (cursor→%d, total=%d)", len(packets), last_id, total) + else: + log.debug("No new packets, sleeping") + + except Exception as e: + log.error("Poll error: %s", e) + try: + conn.rollback() + except Exception: + pass + + time.sleep(POLL_INTERVAL) + + conn.close() + log.info("Preprocess service stopped. Total processed: %d", total) + + +if __name__ == "__main__": + main() diff --git a/tasks/flightradar24/ingest/preprocess/requirements.txt b/tasks/flightradar24/ingest/preprocess/requirements.txt new file mode 100644 index 0000000..58ab769 --- /dev/null +++ b/tasks/flightradar24/ingest/preprocess/requirements.txt @@ -0,0 +1 @@ +psycopg2-binary==2.9.9