From 1ad89ff1bd02e891b021d8bd79f146bb57a6aedb Mon Sep 17 00:00:00 2001 From: Stream Date: Mon, 20 Apr 2026 23:20:01 +0300 Subject: [PATCH] auto-sync: 2026-04-20 23:20:01 --- .../flightradar24/compose/docker-compose.yml | 87 ++++ .../db/init/005_schema_tracks.sql | 151 +++++++ tasks/flightradar24/frontend/main.py | 157 +++++++ .../frontend/static/data_sources.html | 123 +++++ .../frontend/static/data_sources.js | 195 ++++++++ tasks/flightradar24/ingest/mart/Dockerfile | 6 + tasks/flightradar24/ingest/mart/build_mart.py | 419 ++++++++++++++++++ tasks/flightradar24/ingest/mart/config.py | 26 ++ tasks/flightradar24/ingest/mart/main.py | 120 +++++ .../flightradar24/ingest/mart/noise_model.py | 296 +++++++++++++ .../ingest/mart/requirements.txt | 3 + .../flightradar24/ingest/tracks_fa/Dockerfile | 6 + .../flightradar24/ingest/tracks_fa/config.py | 30 ++ .../ingest/tracks_fa/fa_worker.py | 200 +++++++++ tasks/flightradar24/ingest/tracks_fa/main.py | 98 ++++ .../ingest/tracks_fa/requirements.txt | 3 + .../ingest/tracks_fr24/Dockerfile | 6 + .../ingest/tracks_fr24/config.py | 36 ++ .../ingest/tracks_fr24/fr24_worker.py | 198 +++++++++ .../flightradar24/ingest/tracks_fr24/main.py | 98 ++++ .../ingest/tracks_fr24/requirements.txt | 3 + 21 files changed, 2261 insertions(+) create mode 100644 tasks/flightradar24/db/init/005_schema_tracks.sql create mode 100644 tasks/flightradar24/frontend/static/data_sources.html create mode 100644 tasks/flightradar24/frontend/static/data_sources.js create mode 100644 tasks/flightradar24/ingest/mart/Dockerfile create mode 100644 tasks/flightradar24/ingest/mart/build_mart.py create mode 100644 tasks/flightradar24/ingest/mart/config.py create mode 100644 tasks/flightradar24/ingest/mart/main.py create mode 100644 tasks/flightradar24/ingest/mart/noise_model.py create mode 100644 tasks/flightradar24/ingest/mart/requirements.txt create mode 100644 tasks/flightradar24/ingest/tracks_fa/Dockerfile create mode 100644 
tasks/flightradar24/ingest/tracks_fa/config.py create mode 100644 tasks/flightradar24/ingest/tracks_fa/fa_worker.py create mode 100644 tasks/flightradar24/ingest/tracks_fa/main.py create mode 100644 tasks/flightradar24/ingest/tracks_fa/requirements.txt create mode 100644 tasks/flightradar24/ingest/tracks_fr24/Dockerfile create mode 100644 tasks/flightradar24/ingest/tracks_fr24/config.py create mode 100644 tasks/flightradar24/ingest/tracks_fr24/fr24_worker.py create mode 100644 tasks/flightradar24/ingest/tracks_fr24/main.py create mode 100644 tasks/flightradar24/ingest/tracks_fr24/requirements.txt diff --git a/tasks/flightradar24/compose/docker-compose.yml b/tasks/flightradar24/compose/docker-compose.yml index 14d2885..02bface 100644 --- a/tasks/flightradar24/compose/docker-compose.yml +++ b/tasks/flightradar24/compose/docker-compose.yml @@ -215,6 +215,93 @@ services: networks: - fr24-net + fr24-tracks-fr24: + build: + context: ../ingest/tracks_fr24 + dockerfile: Dockerfile + image: fr24-tracks-fr24 + container_name: fr24-tracks-fr24 + environment: + POSTGRES_HOST: ${POSTGRES_HOST:-postgres} + POSTGRES_PORT: ${POSTGRES_PORT:-5432} + POSTGRES_DB: ${POSTGRES_DB:-fr24} + POSTGRES_USER: ${POSTGRES_USER:-fr24} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-change-me} + FR24_API_KEY: ${FR24_API_KEY:-} + FR24_RATE_LIMIT_SEC: ${FR24_RATE_LIMIT_SEC:-6.0} + TZ: ${TZ:-UTC} + ports: + - "${FR24_TRACKS_PORT:-8001}:8001" + depends_on: + postgres: + condition: service_healthy + healthcheck: + test: ["CMD-SHELL", "python -c 'import urllib.request; urllib.request.urlopen(\"http://localhost:8001/health\", timeout=3)' || exit 1"] + interval: 60s + timeout: 10s + retries: 3 + start_period: 20s + restart: unless-stopped + networks: + - fr24-net + + fr24-tracks-fa: + build: + context: ../ingest/tracks_fa + dockerfile: Dockerfile + image: fr24-tracks-fa + container_name: fr24-tracks-fa + environment: + POSTGRES_HOST: ${POSTGRES_HOST:-postgres} + POSTGRES_PORT: ${POSTGRES_PORT:-5432} + 
POSTGRES_DB: ${POSTGRES_DB:-fr24} + POSTGRES_USER: ${POSTGRES_USER:-fr24} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-change-me} + FLIGHTAWARE_API_KEY: ${FLIGHTAWARE_API_KEY:-} + FA_RATE_LIMIT_SEC: ${FA_RATE_LIMIT_SEC:-2.0} + TZ: ${TZ:-UTC} + ports: + - "${FA_TRACKS_PORT:-8002}:8002" + depends_on: + postgres: + condition: service_healthy + healthcheck: + test: ["CMD-SHELL", "python -c 'import urllib.request; urllib.request.urlopen(\"http://localhost:8002/health\", timeout=3)' || exit 1"] + interval: 60s + timeout: 10s + retries: 3 + start_period: 20s + restart: unless-stopped + networks: + - fr24-net + + fr24-mart: + build: + context: ../ingest/mart + dockerfile: Dockerfile + image: fr24-mart + container_name: fr24-mart + environment: + POSTGRES_HOST: ${POSTGRES_HOST:-postgres} + POSTGRES_PORT: ${POSTGRES_PORT:-5432} + POSTGRES_DB: ${POSTGRES_DB:-fr24} + POSTGRES_USER: ${POSTGRES_USER:-fr24} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-change-me} + MART_BUILD_INTERVAL_MINUTES: ${MART_BUILD_INTERVAL_MINUTES:-60} + TZ: ${TZ:-UTC} + depends_on: + postgres: + condition: service_healthy + healthcheck: + test: ["CMD-SHELL", "test -f /tmp/ready || exit 1"] + interval: 60s + timeout: 10s + retries: 3 + start_period: 30s + restart: unless-stopped + networks: + - fr24-net + networks: fr24-net: name: fr24-net diff --git a/tasks/flightradar24/db/init/005_schema_tracks.sql b/tasks/flightradar24/db/init/005_schema_tracks.sql new file mode 100644 index 0000000..db1c858 --- /dev/null +++ b/tasks/flightradar24/db/init/005_schema_tracks.sql @@ -0,0 +1,151 @@ +-- ============================================================ +-- Phase 2 Step 2: Track tables + fr24_mart schema +-- ============================================================ + +-- ── fr24_ext: FR24 API tracks ──────────────────────────────── + +CREATE TABLE IF NOT EXISTS fr24_ext.flight_tracks_fr24 ( + id BIGSERIAL PRIMARY KEY, + fr24_id VARCHAR(20) NOT NULL UNIQUE, + flight_number VARCHAR(20), + callsign VARCHAR(20), + 
aircraft_type VARCHAR(10), + registration VARCHAR(15), + origin_icao VARCHAR(5), + destination_icao VARCHAR(5), + actual_takeoff TIMESTAMPTZ, + actual_landed TIMESTAMPTZ, + flight_date DATE NOT NULL, + fetched_at TIMESTAMPTZ DEFAULT now() +); +CREATE INDEX IF NOT EXISTS idx_flight_tracks_fr24_date ON fr24_ext.flight_tracks_fr24 (flight_date); +CREATE INDEX IF NOT EXISTS idx_flight_tracks_fr24_flight ON fr24_ext.flight_tracks_fr24 (flight_number, flight_date); + +CREATE TABLE IF NOT EXISTS fr24_ext.track_points_fr24 ( + id BIGSERIAL PRIMARY KEY, + track_id BIGINT NOT NULL REFERENCES fr24_ext.flight_tracks_fr24(id) ON DELETE CASCADE, + observed_at TIMESTAMPTZ NOT NULL, + lat DOUBLE PRECISION NOT NULL, + lon DOUBLE PRECISION NOT NULL, + altitude_ft INTEGER, + gspeed_kt INTEGER, + vspeed_fpm INTEGER, + heading SMALLINT, + squawk VARCHAR(5), + source VARCHAR(10) +); +CREATE INDEX IF NOT EXISTS idx_track_points_fr24_track ON fr24_ext.track_points_fr24 (track_id, observed_at); + +-- ── fr24_ext: FlightAware tracks ───────────────────────────── + +CREATE TABLE IF NOT EXISTS fr24_ext.flight_tracks_fa ( + id BIGSERIAL PRIMARY KEY, + fa_flight_id VARCHAR(50) NOT NULL UNIQUE, + ident_iata VARCHAR(10), + ident_icao VARCHAR(10), + registration VARCHAR(15), + aircraft_type VARCHAR(10), + origin_icao VARCHAR(5), + destination_icao VARCHAR(5), + actual_off TIMESTAMPTZ, + actual_on TIMESTAMPTZ, + departure_delay INTEGER, -- seconds + arrival_delay INTEGER, -- seconds + actual_distance INTEGER, -- nautical miles + flight_date DATE NOT NULL, + fetched_at TIMESTAMPTZ DEFAULT now() +); +CREATE INDEX IF NOT EXISTS idx_flight_tracks_fa_date ON fr24_ext.flight_tracks_fa (flight_date); +CREATE INDEX IF NOT EXISTS idx_flight_tracks_fa_ident ON fr24_ext.flight_tracks_fa (ident_iata, flight_date); + +CREATE TABLE IF NOT EXISTS fr24_ext.track_points_fa ( + id BIGSERIAL PRIMARY KEY, + track_id BIGINT NOT NULL REFERENCES fr24_ext.flight_tracks_fa(id) ON DELETE CASCADE, + observed_at TIMESTAMPTZ 
NOT NULL, + lat DOUBLE PRECISION NOT NULL, + lon DOUBLE PRECISION NOT NULL, + altitude_ft INTEGER, -- already converted: hundreds_of_feet * 100 + gspeed_kt INTEGER, + heading SMALLINT, + update_type VARCHAR(5) -- M=ADS-B, D=dead reckoning +); +CREATE INDEX IF NOT EXISTS idx_track_points_fa_track ON fr24_ext.track_points_fa (track_id, observed_at); + +-- ── fr24_mart schema ───────────────────────────────────────── + +CREATE SCHEMA IF NOT EXISTS fr24_mart; + +-- Unified flights table +CREATE TABLE IF NOT EXISTS fr24_mart.flights ( + id BIGSERIAL PRIMARY KEY, + flight_number VARCHAR(20), + callsign VARCHAR(20), + icao24 CHAR(6), + airline_iata VARCHAR(5), + origin_iata VARCHAR(5), + destination_iata VARCHAR(5), + aircraft_type VARCHAR(50), + flight_date DATE NOT NULL, + scheduled_dep TIMESTAMPTZ, + actual_dep TIMESTAMPTZ, + actual_arr TIMESTAMPTZ, + duration_min INTEGER, + -- source flags + has_schedule BOOLEAN DEFAULT false, + has_rtlsdr BOOLEAN DEFAULT false, + has_fr24 BOOLEAN DEFAULT false, + has_fa BOOLEAN DEFAULT false, + track_source VARCHAR(10), -- 'rtlsdr'|'fr24'|'fa'|null + track_points INTEGER, + -- source keys + schedule_id BIGINT, + fr24_track_id BIGINT, + fa_track_id BIGINT, + rtlsdr_flight_id BIGINT, + updated_at TIMESTAMPTZ DEFAULT now() +); +CREATE UNIQUE INDEX IF NOT EXISTS idx_mart_flights_date_callsign ON fr24_mart.flights (flight_date, callsign); +CREATE INDEX IF NOT EXISTS idx_mart_flights_date ON fr24_mart.flights (flight_date); + +-- Unified track points (best source) +CREATE TABLE IF NOT EXISTS fr24_mart.track_points ( + id BIGSERIAL PRIMARY KEY, + flight_id BIGINT NOT NULL REFERENCES fr24_mart.flights(id) ON DELETE CASCADE, + observed_at TIMESTAMPTZ NOT NULL, + lat DOUBLE PRECISION NOT NULL, + lon DOUBLE PRECISION NOT NULL, + altitude_m INTEGER, + speed_kt INTEGER, + heading SMALLINT, + source VARCHAR(10) +); +CREATE INDEX IF NOT EXISTS idx_mart_track_points_flight ON fr24_mart.track_points (flight_id, observed_at); +CREATE INDEX IF NOT 
EXISTS idx_mart_track_points_geo ON fr24_mart.track_points (lat, lon); + +-- Noise grid (0.01° cells) +CREATE TABLE IF NOT EXISTS fr24_mart.noise_grid ( + id BIGSERIAL PRIMARY KEY, + grid_lat NUMERIC(7,4) NOT NULL, + grid_lon NUMERIC(7,4) NOT NULL, + period_date DATE NOT NULL, + flight_count INTEGER DEFAULT 0, + noise_score FLOAT DEFAULT 0, + avg_altitude_m FLOAT, + updated_at TIMESTAMPTZ DEFAULT now(), + UNIQUE (grid_lat, grid_lon, period_date) +); +CREATE INDEX IF NOT EXISTS idx_noise_grid_date ON fr24_mart.noise_grid (period_date); + +-- Source coverage metrics +CREATE TABLE IF NOT EXISTS fr24_mart.source_coverage ( + coverage_date DATE PRIMARY KEY, + total_schedule INTEGER DEFAULT 0, + with_rtlsdr INTEGER DEFAULT 0, + with_fr24 INTEGER DEFAULT 0, + with_fa INTEGER DEFAULT 0, + schedule_only INTEGER DEFAULT 0, + rtlsdr_pct FLOAT, + fr24_pct FLOAT, + fa_pct FLOAT, + updated_at TIMESTAMPTZ DEFAULT now() +); diff --git a/tasks/flightradar24/frontend/main.py b/tasks/flightradar24/frontend/main.py index 1e29eb1..e6ddfb2 100644 --- a/tasks/flightradar24/frontend/main.py +++ b/tasks/flightradar24/frontend/main.py @@ -639,6 +639,163 @@ def tracks(): return err(str(e)) +# ── data-sources page & API ───────────────────────────────────────────────── + +@app.get("/data-sources") +def data_sources_page(): + return send_from_directory("/app/static", "data_sources.html") + + +def _ds_where(args): + clauses, params = [], [] + if args.get("date_from"): + clauses.append("flight_date >= %s"); params.append(args["date_from"]) + if args.get("date_to"): + clauses.append("flight_date <= %s"); params.append(args["date_to"]) + return (" AND ".join(clauses) if clauses else "1=1"), params + + +@app.get("/api/data-sources/coverage") +def ds_coverage(): + try: + where, params = _ds_where(request.args) + rows = query( + f""" + SELECT + coverage_date, + total_schedule, + with_rtlsdr, with_fr24, with_fa, schedule_only, + rtlsdr_pct, fr24_pct, fa_pct + FROM fr24_mart.source_coverage + WHERE 
{where} + ORDER BY coverage_date + """, + params, + ) + totals = query_one( + f""" + SELECT + SUM(total_schedule) AS total_schedule, + SUM(with_rtlsdr) AS with_rtlsdr, + SUM(with_fr24) AS with_fr24, + SUM(with_fa) AS with_fa, + COUNT(*) AS days + FROM fr24_mart.source_coverage + WHERE {where} + """, + params, + ) + return ok({"rows": rows, "totals": totals or {}}) + except Exception as e: + return err(str(e)) + + +@app.get("/api/data-sources/quality") +def ds_quality(): + try: + where, params = _ds_where(request.args) + q = query_one( + f""" + SELECT + COUNT(*) AS total, + COUNT(*) FILTER (WHERE origin_iata IS NOT NULL + AND destination_iata IS NOT NULL) AS with_route, + COUNT(*) FILTER (WHERE track_source IS NOT NULL) AS with_track, + COUNT(*) FILTER (WHERE actual_dep IS NOT NULL + OR actual_arr IS NOT NULL) AS with_actual_time, + COUNT(*) FILTER (WHERE aircraft_type IS NOT NULL) AS with_aircraft_type + FROM fr24_mart.flights + WHERE {where} + """, + params, + ) + # median points per source + def median_pts(source): + r = query_one( + f""" + SELECT PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY track_points) AS med + FROM fr24_mart.flights + WHERE {where} AND track_source = %s AND track_points > 0 + """, + params + [source], + ) + v = r["med"] if r else None + return int(v) if v is not None else None + + quality = dict(q or {}) + quality["median_points_rtlsdr"] = median_pts("rtlsdr") + quality["median_points_fr24"] = median_pts("fr24") + quality["median_points_fa"] = median_pts("fa") + return ok({"quality": quality}) + except Exception as e: + return err(str(e)) + + +@app.get("/api/data-sources/top-airlines") +def ds_top_airlines(): + try: + where, params = _ds_where(request.args) + rows = query( + f""" + SELECT airline_iata, COUNT(*) AS flight_count + FROM fr24_mart.flights + WHERE {where} AND airline_iata IS NOT NULL + GROUP BY airline_iata + ORDER BY flight_count DESC + LIMIT 20 + """, + params, + ) + return ok({"rows": rows}) + except Exception as e: + return 
err(str(e)) + + +@app.get("/api/data-sources/top-routes") +def ds_top_routes(): + try: + where, params = _ds_where(request.args) + rows = query( + f""" + SELECT origin_iata, destination_iata, COUNT(*) AS flight_count + FROM fr24_mart.flights + WHERE {where} + AND origin_iata IS NOT NULL + AND destination_iata IS NOT NULL + GROUP BY origin_iata, destination_iata + ORDER BY flight_count DESC + LIMIT 20 + """, + params, + ) + return ok({"rows": rows}) + except Exception as e: + return err(str(e)) + + +@app.get("/api/data-sources/airport-load") +def ds_airport_load(): + try: + where, params = _ds_where(request.args) + rows = query( + f""" + SELECT + airport_iata, + EXTRACT(HOUR FROM scheduled_at AT TIME ZONE 'UTC')::int AS hour, + COUNT(*) AS flight_count + FROM fr24_ext.schedule + WHERE {where} + AND airport_iata IN ('SVO','DME','VKO','ZIA') + GROUP BY airport_iata, hour + ORDER BY airport_iata, hour + """, + params, + ) + return ok({"rows": rows}) + except Exception as e: + return err(str(e)) + + # ── startup ─────────────────────────────────────────────────────────────────── def wait_for_db(max_attempts: int = 30): diff --git a/tasks/flightradar24/frontend/static/data_sources.html b/tasks/flightradar24/frontend/static/data_sources.html new file mode 100644 index 0000000..fe330e5 --- /dev/null +++ b/tasks/flightradar24/frontend/static/data_sources.html @@ -0,0 +1,123 @@ + + + + + + Data Sources — FR24 + + + + + +
+

Источники данных

+

Покрытие треков, качество данных и статистика по аэропортам

+ +
+ + + +
+ + +
+
Загрузка...
+
+ + +
+

Покрытие по дням

+
+
Загрузка...
+
+
+ RTL-SDR + FR24 + FlightAware + Только расписание +
+
+ + +
+
+

Качество данных

+
Загрузка...
+
+
+

Загрузка аэропортов по часам

+
Загрузка...
+
+
+ + +
+
+

Топ авиакомпаний

+
Загрузка...
+
+
+

Топ маршрутов

+
Загрузка...
+
+
+
+ + + + diff --git a/tasks/flightradar24/frontend/static/data_sources.js b/tasks/flightradar24/frontend/static/data_sources.js new file mode 100644 index 0000000..3cbec7b --- /dev/null +++ b/tasks/flightradar24/frontend/static/data_sources.js @@ -0,0 +1,195 @@ +// data_sources.js — logic for /data-sources page + +function getDateRange() { + const to = document.getElementById('date_to').value; + const from = document.getElementById('date_from').value; + return { date_from: from, date_to: to }; +} + +function qs(params) { + return Object.entries(params).filter(([, v]) => v).map(([k, v]) => `${k}=${encodeURIComponent(v)}`).join('&'); +} + +async function fetchJSON(url) { + const r = await fetch(url); + if (!r.ok) throw new Error(`HTTP ${r.status}`); + return r.json(); +} + +function pct(n, total) { + if (!total) return '—'; + return (100 * n / total).toFixed(1) + '%'; +} + +// ── Coverage cards ──────────────────────────────────────────── + +async function loadCoverage() { + const el = document.getElementById('coverage-cards'); + const chart = document.getElementById('coverage-chart'); + try { + const data = await fetchJSON('/api/data-sources/coverage?' + qs(getDateRange())); + const rows = data.rows || []; + const totals = data.totals || {}; + + // Summary cards + const total = totals.total_schedule || 0; + el.innerHTML = ` +
+

Расписание Яндекс

+
Всего рейсов${total.toLocaleString()}
+
Дней${totals.days || 0}
+
+
+

RTL-SDR Локальный

+
Рейсов с треком${(totals.with_rtlsdr||0).toLocaleString()}${pct(totals.with_rtlsdr, total)}
+
+
+
+

FR24 API Платный

+
Рейсов с треком${(totals.with_fr24||0).toLocaleString()}${pct(totals.with_fr24, total)}
+
+
+
+

FlightAware AeroAPI

+
Рейсов с треком${(totals.with_fa||0).toLocaleString()}${pct(totals.with_fa, total)}
+
+
+ `; + + // Stacked bar chart by day + if (!rows.length) { chart.innerHTML = '
Нет данных
'; return; } + const maxTotal = Math.max(...rows.map(r => r.total_schedule || 0), 1); + chart.innerHTML = rows.map(r => { + const t = r.total_schedule || 1; + const wRtl = ((r.with_rtlsdr || 0) / t * 100).toFixed(1); + const wFr = ((r.with_fr24 || 0) / t * 100).toFixed(1); + const wFa = ((r.with_fa || 0) / t * 100).toFixed(1); + const wSch = (100 - parseFloat(wRtl) - parseFloat(wFr) - parseFloat(wFa)).toFixed(1); + return `
+ ${r.coverage_date} +
+ + + + +
+ ${t} +
`; + }).join(''); + } catch (e) { + el.innerHTML = `
Ошибка: ${e.message}
`; + } +} + +// ── Quality ─────────────────────────────────────────────────── + +async function loadQuality() { + const el = document.getElementById('quality-table'); + try { + const data = await fetchJSON('/api/data-sources/quality?' + qs(getDateRange())); + const q = data.quality || {}; + el.innerHTML = ` + + + + + + + + +
МетрикаЗначение
Рейсов с маршрутом${pct(q.with_route, q.total)}
Рейсов с треком${pct(q.with_track, q.total)}
Рейсов с факт. временем${pct(q.with_actual_time, q.total)}
Рейсов с типом ВС${pct(q.with_aircraft_type, q.total)}
Медиана точек (RTL-SDR)${q.median_points_rtlsdr || '—'}
Медиана точек (FR24)${q.median_points_fr24 || '—'}
Медиана точек (FA)${q.median_points_fa || '—'}
`; + } catch (e) { + el.innerHTML = `
Ошибка: ${e.message}
`; + } +} + +// ── Airport load ────────────────────────────────────────────── + +async function loadAirportLoad() { + const el = document.getElementById('airport-load'); + try { + const data = await fetchJSON('/api/data-sources/airport-load?' + qs(getDateRange())); + const rows = data.rows || []; + if (!rows.length) { el.innerHTML = '
Нет данных
'; return; } + const maxCount = Math.max(...rows.map(r => r.flight_count || 0), 1); + el.innerHTML = ` + + ${rows.map(r => ` + + + + + `).join('')} +
АэропортЧас (UTC)Рейсов
${r.airport_iata}${String(r.hour).padStart(2,'0')}:00${r.flight_count}
`; + } catch (e) { + el.innerHTML = `
Ошибка: ${e.message}
`; + } +} + +// ── Top airlines ────────────────────────────────────────────── + +async function loadTopAirlines() { + const el = document.getElementById('top-airlines'); + try { + const data = await fetchJSON('/api/data-sources/top-airlines?' + qs(getDateRange())); + const rows = data.rows || []; + if (!rows.length) { el.innerHTML = '
Нет данных
'; return; } + const max = rows[0].flight_count || 1; + el.innerHTML = ` + + ${rows.map((r, i) => ` + + + + + `).join('')} +
#АвиакомпанияРейсов
${i+1}${r.airline_iata || '—'}${r.flight_count}
`; + } catch (e) { + el.innerHTML = `
Ошибка: ${e.message}
`; + } +} + +// ── Top routes ──────────────────────────────────────────────── + +async function loadTopRoutes() { + const el = document.getElementById('top-routes'); + try { + const data = await fetchJSON('/api/data-sources/top-routes?' + qs(getDateRange())); + const rows = data.rows || []; + if (!rows.length) { el.innerHTML = '
Нет данных
'; return; } + const max = rows[0].flight_count || 1; + el.innerHTML = ` + + ${rows.map((r, i) => ` + + + + + `).join('')} +
#МаршрутРейсов
${i+1}${r.origin_iata || '?'} → ${r.destination_iata || '?'}${r.flight_count}
`; + } catch (e) { + el.innerHTML = `
Ошибка: ${e.message}
`; + } +} + +// ── Init ────────────────────────────────────────────────────── + +function loadAll() { + loadCoverage(); + loadQuality(); + loadAirportLoad(); + loadTopAirlines(); + loadTopRoutes(); +} + +// Set default date range: last 7 days +(function initDates() { + const today = new Date(); + const to = today.toISOString().slice(0, 10); + today.setDate(today.getDate() - 7); + const from = today.toISOString().slice(0, 10); + document.getElementById('date_from').value = from; + document.getElementById('date_to').value = to; +})(); + +loadAll(); diff --git a/tasks/flightradar24/ingest/mart/Dockerfile b/tasks/flightradar24/ingest/mart/Dockerfile new file mode 100644 index 0000000..dc3c137 --- /dev/null +++ b/tasks/flightradar24/ingest/mart/Dockerfile @@ -0,0 +1,6 @@ +FROM python:3.11-slim +WORKDIR /app +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +COPY . . +CMD ["python", "main.py"] diff --git a/tasks/flightradar24/ingest/mart/build_mart.py b/tasks/flightradar24/ingest/mart/build_mart.py new file mode 100644 index 0000000..fc54145 --- /dev/null +++ b/tasks/flightradar24/ingest/mart/build_mart.py @@ -0,0 +1,419 @@ +""" +Mart builder: merges all track sources into fr24_mart. +Priority: RTL-SDR > FR24 > FlightAware + +For each flight in fr24_ext.schedule: + 1. Find matching tracks from each source + 2. Pick best available track + 3. Copy points to fr24_mart.track_points with noise_score + 4. Update fr24_mart.noise_grid (0.01° cells) + 5. 
Update fr24_mart.source_coverage +""" +import logging +from datetime import date +from typing import Dict, List, Optional, Tuple + +import psycopg2 +import psycopg2.extras + +from noise_model import altitude_to_noise_db + +log = logging.getLogger("build_mart") + +# ft → m conversion +FT_TO_M = 0.3048 + + +def _ft_to_m(ft: Optional[int]) -> Optional[int]: + if ft is None: + return None + return int(ft * FT_TO_M) + + +# ── source matchers ─────────────────────────────────────────── + +def find_rtlsdr_flight(conn, callsign: str, flight_date: date) -> Optional[int]: + """Return fr24.flights.flight_id for RTL-SDR data.""" + with conn.cursor() as cur: + cur.execute( + """ + SELECT f.flight_id FROM fr24.flights f + WHERE f.callsign = %s + AND f.started_at::date = %s + ORDER BY f.started_at + LIMIT 1 + """, + (callsign, flight_date), + ) + row = cur.fetchone() + return row[0] if row else None + + +def find_fr24_track(conn, flight_number: str, flight_date: date) -> Optional[Tuple[int, str]]: + """Return (id, aircraft_type) from fr24_ext.flight_tracks_fr24.""" + with conn.cursor() as cur: + cur.execute( + """ + SELECT id, aircraft_type FROM fr24_ext.flight_tracks_fr24 + WHERE flight_number = %s AND flight_date = %s + ORDER BY fetched_at DESC + LIMIT 1 + """, + (flight_number, flight_date), + ) + row = cur.fetchone() + return (row[0], row[1]) if row else None + + +def find_fa_track(conn, flight_number: str, flight_date: date) -> Optional[Tuple[int, str]]: + """Return (id, aircraft_type) from fr24_ext.flight_tracks_fa.""" + ident = flight_number.replace(" ", "") + with conn.cursor() as cur: + cur.execute( + """ + SELECT id, aircraft_type FROM fr24_ext.flight_tracks_fa + WHERE ident_iata = %s AND flight_date = %s + ORDER BY fetched_at DESC + LIMIT 1 + """, + (ident, flight_date), + ) + row = cur.fetchone() + return (row[0], row[1]) if row else None + + +# ── point fetchers ──────────────────────────────────────────── + +def get_rtlsdr_points(conn, flight_id: int) -> List[Dict]: 
def _points_with_metric_altitude(conn, query: str, track_id: int) -> List[Dict]:
    """Fetch track points with *query* and replace ``altitude_ft`` by ``altitude_m``."""
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(query, (track_id,))
        fetched = cur.fetchall()

    points = []
    for record in fetched:
        point = dict(record)
        # convert ft -> m; the feet column is dropped from the result
        point["altitude_m"] = _ft_to_m(point.pop("altitude_ft", None))
        points.append(point)
    return points


def get_fr24_points(conn, track_id: int) -> List[Dict]:
    """Return the FR24 points of a track ordered by time, altitude in metres."""
    return _points_with_metric_altitude(
        conn,
        """
        SELECT observed_at, lat, lon,
               altitude_ft, gspeed_kt AS speed_kt, heading
        FROM fr24_ext.track_points_fr24
        WHERE track_id = %s
        ORDER BY observed_at
        """,
        track_id,
    )


def get_fa_points(conn, track_id: int) -> List[Dict]:
    """Return the FlightAware points of a track ordered by time, altitude in metres."""
    return _points_with_metric_altitude(
        conn,
        """
        SELECT observed_at, lat, lon,
               altitude_ft, gspeed_kt AS speed_kt, heading
        FROM fr24_ext.track_points_fa
        WHERE track_id = %s
        ORDER BY observed_at
        """,
        track_id,
    )
(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,now()) + ON CONFLICT (flight_date, callsign) DO UPDATE SET + flight_number = EXCLUDED.flight_number, + airline_iata = EXCLUDED.airline_iata, + origin_iata = EXCLUDED.origin_iata, + destination_iata = EXCLUDED.destination_iata, + aircraft_type = COALESCE(EXCLUDED.aircraft_type, fr24_mart.flights.aircraft_type), + scheduled_dep = EXCLUDED.scheduled_dep, + has_schedule = EXCLUDED.has_schedule, + has_rtlsdr = EXCLUDED.has_rtlsdr, + has_fr24 = EXCLUDED.has_fr24, + has_fa = EXCLUDED.has_fa, + track_source = EXCLUDED.track_source, + track_points = EXCLUDED.track_points, + schedule_id = EXCLUDED.schedule_id, + fr24_track_id = EXCLUDED.fr24_track_id, + fa_track_id = EXCLUDED.fa_track_id, + rtlsdr_flight_id = EXCLUDED.rtlsdr_flight_id, + updated_at = now() + RETURNING id + """, + ( + sched.get("flight_number"), + sched.get("callsign") or sched.get("flight_number"), + sched.get("icao24"), + sched.get("airline_iata"), + sched.get("origin_iata"), + sched.get("destination_iata"), + source_info.get("aircraft_type"), + sched["flight_date"], + sched.get("scheduled_at"), + True, + source_info.get("has_rtlsdr", False), + source_info.get("has_fr24", False), + source_info.get("has_fa", False), + source_info.get("track_source"), + source_info.get("track_points", 0), + sched.get("id"), + source_info.get("fr24_track_id"), + source_info.get("fa_track_id"), + source_info.get("rtlsdr_flight_id"), + ), + ) + row = cur.fetchone() + return row[0] + + +def insert_mart_points(conn, mart_flight_id: int, points: List[Dict], + source: str, aircraft_type: str): + """Delete old mart points and insert new ones with noise_score.""" + with conn.cursor() as cur: + cur.execute("DELETE FROM fr24_mart.track_points WHERE flight_id = %s", (mart_flight_id,)) + if not points: + return + + args = [] + for p in points: + alt_m = p.get("altitude_m") or 0 + alt_ft = alt_m / FT_TO_M + noise = altitude_to_noise_db(alt_ft, aircraft_type or "default") + 
def update_noise_grid(conn, flight_date: date):
    """Rebuild the noise grid (0.01° cells) for one date from mart track points.

    The cells of ``flight_date`` are deleted first and then re-aggregated.
    A plain upsert only overwrites cells that still have points, so cells
    that lost all their points after a rebuild (for example when a flight's
    track source changed) would otherwise stay in the grid with stale values.

    Both statements run in the caller's transaction; the caller commits.
    """
    with conn.cursor() as cur:
        cur.execute(
            "DELETE FROM fr24_mart.noise_grid WHERE period_date = %s",
            (flight_date,),
        )
        cur.execute(
            """
            INSERT INTO fr24_mart.noise_grid
                (grid_lat, grid_lon, period_date, flight_count, noise_score, avg_altitude_m, updated_at)
            SELECT
                round(lat::numeric, 2) AS grid_lat,
                round(lon::numeric, 2) AS grid_lon,
                %s AS period_date,
                COUNT(DISTINCT flight_id) AS flight_count,
                AVG(noise_score) AS noise_score,
                AVG(altitude_m) AS avg_altitude_m,
                now()
            FROM fr24_mart.track_points tp
            JOIN fr24_mart.flights f ON f.id = tp.flight_id
            WHERE f.flight_date = %s
            GROUP BY grid_lat, grid_lon
            ON CONFLICT (grid_lat, grid_lon, period_date) DO UPDATE SET
                flight_count = EXCLUDED.flight_count,
                noise_score = EXCLUDED.noise_score,
                avg_altitude_m = EXCLUDED.avg_altitude_m,
                updated_at = now()
            """,
            (flight_date, flight_date),
        )
# ── main ──────────────────────────────────────────────────────

def _resolve_track(conn, flight_number: str, callsign: str, target_date: date):
    """Probe every source for one flight and pick the best available track.

    Sources are tried in priority order RTL-SDR > FR24 > FlightAware; points
    are fetched from a lower-priority source only while no higher-priority
    source has delivered any.

    Returns ``(source_info, points)``. ``source_info`` carries the per-source
    availability flags and ids, the chosen ``track_source`` label and the
    point count. ``aircraft_type`` is the first type reported by FR24 or
    FlightAware, whichever source supplies the points.
    """
    source_info: Dict = {
        "has_rtlsdr": False, "has_fr24": False, "has_fa": False,
        "track_source": None, "track_points": 0,
        "aircraft_type": None,
        "fr24_track_id": None, "fa_track_id": None, "rtlsdr_flight_id": None,
    }
    points: List[Dict] = []
    source_label = None

    # 1. RTL-SDR (own receiver) has the highest priority
    rtlsdr_id = find_rtlsdr_flight(conn, callsign, target_date)
    if rtlsdr_id is not None:
        source_info["has_rtlsdr"] = True
        source_info["rtlsdr_flight_id"] = rtlsdr_id
        pts = get_rtlsdr_points(conn, rtlsdr_id)
        if pts:
            points, source_label = pts, "rtlsdr"

    # 2. FR24
    fr24_result = find_fr24_track(conn, flight_number, target_date)
    if fr24_result:
        source_info["has_fr24"] = True
        source_info["fr24_track_id"] = fr24_result[0]
        source_info["aircraft_type"] = source_info["aircraft_type"] or fr24_result[1]
        if not points:
            pts = get_fr24_points(conn, fr24_result[0])
            if pts:
                points, source_label = pts, "fr24"

    # 3. FlightAware
    fa_result = find_fa_track(conn, flight_number, target_date)
    if fa_result:
        source_info["has_fa"] = True
        source_info["fa_track_id"] = fa_result[0]
        source_info["aircraft_type"] = source_info["aircraft_type"] or fa_result[1]
        if not points:
            pts = get_fa_points(conn, fa_result[0])
            if pts:
                points, source_label = pts, "fa"

    source_info["track_source"] = source_label
    source_info["track_points"] = len(points)
    return source_info, points


def build(target_date: date, conn) -> Dict:
    """Build the mart for ``target_date`` and return run statistics.

    For every scheduled flight of the date the best track is selected and
    written to the mart; afterwards the noise grid and the source coverage
    of the date are refreshed.

    Each flight is committed on its own. A failure on one flight rolls back
    only that flight, so the work done for earlier flights is kept and the
    returned counters match what is actually stored.

    Returns a dict with the keys ``date``, ``schedule_flights``,
    ``mart_flights``, ``with_track`` and ``errors``.
    """
    log.info("Mart build: starting for %s", target_date)
    stats = {
        "date": str(target_date),
        "schedule_flights": 0,
        "mart_flights": 0,
        "with_track": 0,
        "errors": 0,
    }

    # Load schedule for the date
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(
            """
            SELECT DISTINCT ON (flight_number, direction)
                id, flight_number, airline_iata, origin_iata, destination_iata,
                scheduled_at, icao24, flight_date,
                -- use callsign from icao24 if available, else flight_number
                COALESCE(icao24, flight_number) AS callsign
            FROM fr24_ext.schedule
            WHERE flight_date = %s
            ORDER BY flight_number, direction, scheduled_at
            """,
            (target_date,),
        )
        schedule = [dict(r) for r in cur.fetchall()]

    stats["schedule_flights"] = len(schedule)
    log.info("Mart build: %d schedule flights", len(schedule))

    for sched in schedule:
        flight_number = sched["flight_number"]
        callsign = sched.get("callsign") or flight_number

        try:
            source_info, points = _resolve_track(conn, flight_number, callsign, target_date)
            mart_id = upsert_mart_flight(conn, sched, source_info)
            if points:
                insert_mart_points(
                    conn, mart_id, points, source_info["track_source"],
                    source_info.get("aircraft_type") or "default",
                )
            # Commit per flight so a later rollback cannot discard this one.
            conn.commit()
        except Exception as e:
            conn.rollback()
            stats["errors"] += 1
            log.error("Mart: error processing %s: %s", flight_number, e)
            continue

        # Count only what has really been committed.
        stats["mart_flights"] += 1
        if points:
            stats["with_track"] += 1

    try:
        update_noise_grid(conn, target_date)
        update_source_coverage(conn, target_date)
        conn.commit()
    except Exception as e:
        conn.rollback()
        log.error("Mart: error updating grid/coverage: %s", e)
        stats["errors"] += 1

    log.info("Mart build done: %s", stats)
    return stats
def get_conn():
    """Return the shared module-level DB connection, (re)connecting if needed.

    A new connection is opened when none exists yet or the cached one
    reports ``closed``.
    """
    global _conn
    if _conn is not None and not _conn.closed:
        return _conn

    _conn = psycopg2.connect(config.DB_DSN)
    # register_uuid(oids=None, conn_or_curs=None): the connection has to be
    # passed by keyword, otherwise it is taken as the ``oids`` argument and
    # the uuid typecaster is registered under a bogus oid.
    psycopg2.extras.register_uuid(conn_or_curs=_conn)
    log.info("DB connection established")
    return _conn
def scheduled_build():
    """Scheduler job: build the mart for yesterday and record the outcome in ``_last_run``."""
    target = date.today() - timedelta(days=1)
    log.info("Scheduled mart build for %s", target)
    _last_run["at"] = datetime.now(timezone.utc).isoformat()
    _last_run["status"] = "running"
    try:
        stats = build(target, get_conn())
    except Exception as e:
        _last_run["status"] = f"error: {e}"
        # log.exception keeps the traceback, which log.error dropped
        log.exception("Scheduled build failed: %s", e)
    else:
        _last_run.update(status="ok", stats=stats)


@app.get("/health")
def health():
    """Healthcheck: 200 when ``SELECT 1`` succeeds on the shared connection, else 503.

    The response also carries the ``_last_run`` record.
    """
    try:
        # The context manager closes the cursor; the original left it open.
        with get_conn().cursor() as cur:
            cur.execute("SELECT 1")
        db_ok = True
    except Exception as e:
        log.warning("Health check: DB probe failed: %s", e)
        db_ok = False
    return jsonify({
        "status": "ok" if db_ok else "degraded",
        "db": "ok" if db_ok else "error",
        "last_run": _last_run,
    }), 200 if db_ok else 503


@app.post("/run")
def run_manual():
    """Manual trigger: ``POST /run?date=YYYY-MM-DD`` (defaults to yesterday).

    Responds 400 on a malformed date, 409 while a run is marked as running,
    500 when the build raises, otherwise 200 with the build statistics.
    """
    date_str = request.args.get("date")
    if date_str:
        try:
            target = date.fromisoformat(date_str)
        except ValueError:
            return jsonify({"error": "invalid date, use YYYY-MM-DD"}), 400
    else:
        target = date.today() - timedelta(days=1)

    if _last_run.get("status") == "running":
        return jsonify({"error": "already running"}), 409

    _last_run["at"] = datetime.now(timezone.utc).isoformat()
    _last_run["status"] = "running"
    try:
        stats = build(target, get_conn())
    except Exception as e:
        _last_run["status"] = f"error: {e}"
        log.exception("Manual run failed: %s", e)
        return jsonify({"error": str(e)}), 500

    _last_run.update(status="ok", stats=stats)
    return jsonify({"status": "ok", "stats": stats})
debug=False) diff --git a/tasks/flightradar24/ingest/mart/noise_model.py b/tasks/flightradar24/ingest/mart/noise_model.py new file mode 100644 index 0000000..9f9a845 --- /dev/null +++ b/tasks/flightradar24/ingest/mart/noise_model.py @@ -0,0 +1,296 @@ +""" +Модель шумового загрязнения от воздушных судов (v1.1) + +Физическая основа +───────────────── +Шум распространяется сферически. Уровень шума определяется +реальным 3D-расстоянием R (гипотенуза) от самолёта до наблюдателя. + +На карте отображается горизонтальный катет D: + + самолёт ● + |\ + H | \ R ← граница зоны + | \ + земля ●───●─────● наблюдатель + D + + D = √(R² − H²), если H < R, иначе 0 + +Пример (H = 3.5 км): + R=2 км → нет (2² < 3.5²) + R=5 км → D = √(25−12.25) = 3.57 км (круг) + R=7 км → D = 6.06 км, кольцо от 3.57 до 6.06 км + R=11 км → D = 10.43 км, кольцо от 6.06 до 10.43 км + +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +КАЛИБРОВОЧНЫЕ ПАРАМЕТРЫ (редактируй здесь) +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + +NOISE_ZONES — три концентрических зоны вдоль траектории. +Каждая зона описывает «рукав» определённой ширины рядом с треком. + +Поля каждой зоны: + id - уникальный идентификатор (используется в JS) + label - отображаемое название в легенде + dist_km - внешняя граница зоны от трека (км) + Зона 0 рисуется от 0 до dist_km[0], + Зона 1 — от dist_km[0] до dist_km[1], и т.д. + color - цвет заливки (hex) + opacity - базовая прозрачность при полной активации (0.0–1.0) + Итоговая прозрачность умножается на altitude_factor + +ALTITUDE_BANDS — как высота влияет на ширину зон. + max_alt_m - верхняя граница диапазона высоты (метры) + width_factor - коэффициент ширины зоны (1.0 = полная, 0.0 = зона исчезает) + Диапазоны проверяются снизу вверх, берётся первый подходящий. + +Пример калибровки: + Если реальные замеры показывают, что на высоте 500м зона 0–2км + слишком широкая — уменьши width_factor для диапазона max_alt_m=900. 
+""" + +# ── Зоны шума ──────────────────────────────────────────────────── +# +# Физическая модель (теорема Пифагора): +# +# самолёт ● +# |\ +# H | \ R ← гипотенуза = реальное расстояние до наблюдателя +# | \ +# земля ●───────●──────● наблюдатель +# проекция D ← катет = ширина зоны на карте +# +# D = √(R² − H²), если H < R, иначе 0 +# +# Поля зоны: +# R_inner — внутренняя граница сферы (км); для первой зоны = 0 +# R_outer — внешняя граница сферы (км) +# color — цвет заливки (hex) +# opacity — прозрачность (фиксированная, 0.0–1.0) +# +# Таблица соответствия: +# R < 2 км → критический шум 🔴 +# R 2–5 км → сильный шум 🟠 +# R 5–7 км → средний шум 🟡 +# R 7–9 км → низкий шум 🟢 +# R > 9 км → зона не рисуется +# +NOISE_ZONES = [ + { + "id": "zone_critical", + "label": "Критический (R < 2 км)", + "R_inner": 0.0, # км — внутренняя граница сферы + "R_outer": 2.0, # км — внешняя граница сферы + "color": "#FF3333", + "opacity": 0.01, + }, + { + "id": "zone_strong", + "label": "Сильный (R 2–5 км)", + "R_inner": 2.0, + "R_outer": 5.0, + "color": "#FF8800", + "opacity": 0.01, + }, + { + "id": "zone_medium", + "label": "Средний (R 5–7 км)", + "R_inner": 5.0, + "R_outer": 7.0, + "color": "#FFCC00", + "opacity": 0.01, + }, + { + "id": "zone_low", + "label": "Низкий (R 7–9 км)", + "R_inner": 7.0, + "R_outer": 9.0, + "color": "#88DD00", + "opacity": 0.01, + }, +] + +# ALTITUDE_BANDS больше не используется — ширина зоны теперь +# рассчитывается аналитически через теорему Пифагора в calc_horizontal_radius() +ALTITUDE_BANDS = [] # оставлен для обратной совместимости + +# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +# КОНЕЦ КАЛИБРОВОЧНЫХ ПАРАМЕТРОВ +# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + +# Справочные данные: типичные уровни шума у земли (дБ) +# Источник: стандартные авиационные данные +NOISE_AT_GROUND = { + "default": 85, # дефолт для неизвестного типа ВС + "B738": 88, # Boeing 737-800 + "B77W": 90, # Boeing 777-300ER + "A320": 87, # Airbus A320 + "A321": 
# Model parameters
MAX_NOISE_RADIUS_KM = 3.0   # maximum noise-zone radius (km) at zero altitude
MIN_ALTITUDE_FT = 100       # at or below this altitude the source level is returned unchanged
MAX_ALTITUDE_FT = 40000     # at or above this altitude the aircraft is treated as inaudible
NOISE_THRESHOLD_DB = 55     # noise threshold (dB) below which a zone is not shown


def altitude_to_noise_db(altitude_ft: float, aircraft_type: str = "default") -> float:
    """Estimate the ground noise level (dB) for an aircraft at ``altitude_ft``.

    Formula: ``L = L0 - 20*log10(h/h0) - a*h`` with ``L0`` the per-type level
    from ``NOISE_AT_GROUND`` (unknown types use ``"default"``), ``h0 = 300`` ft
    the reference height and ``a = 0.002`` dB/ft.

    The geometric term is clamped at zero. Below the reference height the
    logarithm is negative, which used to push the result up to ~9 dB above
    ``L0`` just over ``MIN_ALTITUDE_FT`` while ``L0`` itself was returned at
    and below it. With the clamp the level never exceeds ``L0`` and does not
    increase with altitude.

    Returns ``L0`` at or below ``MIN_ALTITUDE_FT``, ``0.0`` at or above
    ``MAX_ALTITUDE_FT``, and never a negative value.
    """
    import math  # function-scope import, as elsewhere in this module

    base_noise = NOISE_AT_GROUND.get(aircraft_type, NOISE_AT_GROUND["default"])

    if altitude_ft <= MIN_ALTITUDE_FT:
        return base_noise
    if altitude_ft >= MAX_ALTITUDE_FT:
        return 0.0

    h0 = 300  # reference height, ft
    # Spherical spreading (inverse-square law -> 20 log); no gain below h0.
    geometric_attenuation = max(0.0, 20 * math.log10(altitude_ft / h0))
    # Atmospheric absorption, roughly 0.002 dB per foot.
    atmospheric_attenuation = 0.002 * altitude_ft

    return max(0.0, base_noise - geometric_attenuation - atmospheric_attenuation)


def altitude_to_noise_radius_km(altitude_ft: float) -> float:
    """Return the noise-zone radius (km) used for visualisation.

    The radius shrinks non-linearly from ``MAX_NOISE_RADIUS_KM`` towards zero
    at ``MAX_ALTITUDE_FT``. Non-positive altitudes are treated as 100 ft.
    """
    if altitude_ft <= 0:
        altitude_ft = 100

    if altitude_ft >= MAX_ALTITUDE_FT:
        return 0.0

    radius = MAX_NOISE_RADIUS_KM * (1.0 - (altitude_ft / MAX_ALTITUDE_FT) ** 0.5)
    return max(0.0, radius)
def altitude_to_noise_level(altitude_ft: float) -> str:
    """Return the textual noise level for an altitude given in feet."""
    bands = (
        (3000, "Критический"),
        (10000, "Высокий"),
        (25000, "Средний"),
    )
    for ceiling_ft, label in bands:
        if altitude_ft < ceiling_ft:
            return label
    return "Низкий"


def calculate_noise_opacity(altitude_ft: float) -> float:
    """Return the noise-zone opacity for an altitude given in feet.

    0.0 at or above ``MAX_ALTITUDE_FT``; otherwise a value that falls
    linearly with altitude, clamped to the range 0.05–0.6.
    """
    if altitude_ft >= MAX_ALTITUDE_FT:
        return 0.0
    remaining_share = 1.0 - altitude_ft / MAX_ALTITUDE_FT
    return max(0.05, min(0.6, 0.6 * remaining_share))


def calc_horizontal_radius(R_km: float, altitude_m: float) -> float:
    """Return the on-map radius D (km) of a noise sphere of radius ``R_km``.

    D is the horizontal leg of the right triangle whose hypotenuse is
    ``R_km`` and whose vertical leg is the aircraft height:
    ``D = sqrt(R² − H²)``. Returns 0.0 when the aircraft is at or above
    the sphere radius.
    """
    import math

    height_km = altitude_m / 1000.0  # metres -> km
    if height_km < R_km:
        return math.sqrt(max(0.0, R_km**2 - height_km**2))
    return 0.0
def get_noise_config() -> dict:
    """Return the calibration parameters (zones and altitude bands) as a dict."""
    noise_config = {}
    noise_config["zones"] = NOISE_ZONES
    noise_config["altitude_bands"] = ALTITUDE_BANDS
    return noise_config


def get_altitude_width_factor(altitude_m: float) -> float:
    """Return ``width_factor`` of the first band whose ``max_alt_m`` covers the altitude.

    Falls back to 0.0 when no band in ``ALTITUDE_BANDS`` matches.
    """
    matching = (
        band["width_factor"]
        for band in ALTITUDE_BANDS
        if altitude_m <= band["max_alt_m"]
    )
    return next(matching, 0.0)


def process_flight_for_map(flight_data: dict) -> dict:
    """Return a copy of ``flight_data`` extended with noise characteristics.

    Reads the ``altitude`` and ``aircraft_type`` keys (missing or falsy
    values become 0 and ``"default"``) and adds ``noise_db``,
    ``noise_radius_km``, ``noise_color``, ``noise_level`` and
    ``noise_opacity``.
    """
    altitude = flight_data.get("altitude", 0) or 0
    aircraft_type = flight_data.get("aircraft_type", "default") or "default"

    enriched = dict(flight_data)
    enriched["noise_db"] = round(altitude_to_noise_db(altitude, aircraft_type), 1)
    enriched["noise_radius_km"] = round(altitude_to_noise_radius_km(altitude), 3)
    enriched["noise_color"] = altitude_to_color(altitude)
    enriched["noise_level"] = altitude_to_noise_level(altitude)
    enriched["noise_opacity"] = round(calculate_noise_opacity(altitude), 3)
    return enriched
@dataclass
class Config:
    """Settings of the FlightAware tracks service, read from the environment.

    Defaults are evaluated once, when the class is defined.
    """

    # Database
    DB_HOST: str = os.getenv("POSTGRES_HOST", "fr24-postgres")
    DB_PORT: int = int(os.getenv("POSTGRES_PORT", "5432"))
    DB_NAME: str = os.getenv("POSTGRES_DB", "fr24")
    DB_USER: str = os.getenv("POSTGRES_USER", "fr24")
    DB_PASSWORD: str = os.getenv("POSTGRES_PASSWORD", "change-me")

    # FlightAware AeroAPI
    FA_API_KEY: str = os.getenv("FLIGHTAWARE_API_KEY", "")
    FA_API_BASE: str = "https://aeroapi.flightaware.com/aeroapi"

    # Rate limit: conservative for Personal tier (500 req/month)
    RATE_LIMIT_SEC: float = float(os.getenv("FA_RATE_LIMIT_SEC", "2.0"))

    @staticmethod
    def _dsn_value(value) -> str:
        """Render one value for a libpq keyword/value connection string.

        Plain values are returned unchanged. Empty values and values
        containing whitespace, a single quote or a backslash are wrapped in
        single quotes with ``\\`` and ``'`` backslash-escaped, as libpq
        requires; unquoted they would break or truncate the DSN.
        """
        text = str(value)
        needs_quoting = not text or any(
            ch.isspace() or ch in "'\\" for ch in text
        )
        if not needs_quoting:
            return text
        escaped = text.replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    @property
    def DB_DSN(self) -> str:
        """libpq connection string built from the ``DB_*`` settings."""
        quote = self._dsn_value
        return (
            f"host={quote(self.DB_HOST)} port={quote(self.DB_PORT)} "
            f"dbname={quote(self.DB_NAME)} user={quote(self.DB_USER)} "
            f"password={quote(self.DB_PASSWORD)}"
        )


config = Config()
HEADERS = {"x-apikey": config.FA_API_KEY}

# time.monotonic() timestamp of the last outgoing request
_last_request_at: float = 0.0


def _throttle():
    """Sleep so that consecutive requests are at least ``config.RATE_LIMIT_SEC`` apart."""
    global _last_request_at
    elapsed = time.monotonic() - _last_request_at
    if elapsed < config.RATE_LIMIT_SEC:
        time.sleep(config.RATE_LIMIT_SEC - elapsed)
    _last_request_at = time.monotonic()


def _get(path: str, params: Optional[dict] = None, max_retries: int = 5) -> dict:
    """GET ``path`` from the AeroAPI and return the decoded JSON body.

    HTTP 429 is retried after the delay announced in ``Retry-After``
    (60 s when the header is missing or not an integer), at most
    ``max_retries`` times. The original retried by unbounded recursion, so
    a persistent 429 could never terminate. HTTP 404 yields ``{}``.

    Raises ``requests.HTTPError`` for any other error status, including a
    429 that is still returned after the last retry.
    """
    url = f"{config.FA_API_BASE}{path}"
    attempt = 0
    while True:
        _throttle()
        resp = requests.get(url, headers=HEADERS, params=params, timeout=30)
        if resp.status_code != 429 or attempt >= max_retries:
            break
        try:
            retry_after = max(0, int(resp.headers.get("Retry-After", 60)))
        except (TypeError, ValueError):
            # Retry-After may also be an HTTP date; fall back to the default.
            retry_after = 60
        log.warning("Rate limited, sleeping %ds", retry_after)
        time.sleep(retry_after)
        attempt += 1

    if resp.status_code == 404:
        return {}
    resp.raise_for_status()
    return resp.json()


def get_flights_for_ident(ident: str, target_date: date) -> List[Dict]:
    """GET /aeroapi/flights/{ident} limited to the 00:00:00Z–23:59:59Z window of ``target_date``."""
    start = f"{target_date}T00:00:00Z"
    end = f"{target_date}T23:59:59Z"
    data = _get(f"/flights/{ident}", params={"start": start, "end": end})
    return data.get("flights", [])


def get_track(fa_flight_id: str) -> List[Dict]:
    """GET /aeroapi/flights/{fa_flight_id}/track and return its ``positions`` list."""
    data = _get(f"/flights/{fa_flight_id}/track")
    return data.get("positions", [])
def upsert_flight(conn, fa_flight: Dict, target_date: date) -> Optional[int]:
    """Insert or update a FlightAware flight header and return its DB id.

    Rows are keyed by ``fa_flight_id``; on conflict every descriptive column
    and ``fetched_at`` are refreshed.

    ``ident_iata`` is filled from the response's ``ident_iata`` field and
    falls back to the generic ``ident`` only when that is absent. The
    original always stored ``ident``, which is not guaranteed to be the IATA
    form, while the mart matches this column against IATA flight numbers.
    """
    origin = fa_flight.get("origin") or {}
    destination = fa_flight.get("destination") or {}
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO fr24_ext.flight_tracks_fa
                (fa_flight_id, ident_iata, ident_icao, registration, aircraft_type,
                 origin_icao, destination_icao, actual_off, actual_on,
                 departure_delay, arrival_delay, actual_distance, flight_date)
            VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
            ON CONFLICT (fa_flight_id) DO UPDATE SET
                ident_iata = EXCLUDED.ident_iata,
                ident_icao = EXCLUDED.ident_icao,
                registration = EXCLUDED.registration,
                aircraft_type = EXCLUDED.aircraft_type,
                origin_icao = EXCLUDED.origin_icao,
                destination_icao = EXCLUDED.destination_icao,
                actual_off = EXCLUDED.actual_off,
                actual_on = EXCLUDED.actual_on,
                departure_delay = EXCLUDED.departure_delay,
                arrival_delay = EXCLUDED.arrival_delay,
                actual_distance = EXCLUDED.actual_distance,
                fetched_at = now()
            RETURNING id
            """,
            (
                fa_flight.get("fa_flight_id"),
                fa_flight.get("ident_iata") or fa_flight.get("ident"),
                fa_flight.get("ident_icao"),
                fa_flight.get("registration"),
                fa_flight.get("aircraft_type"),
                origin.get("code_icao"),
                destination.get("code_icao"),
                fa_flight.get("actual_off"),
                fa_flight.get("actual_on"),
                fa_flight.get("departure_delay"),
                fa_flight.get("arrival_delay"),
                fa_flight.get("route_distance"),
                target_date,
            ),
        )
        row = cur.fetchone()
        return row[0] if row else None
altitude is hundreds of feet → *100.""" + with conn.cursor() as cur: + cur.execute("DELETE FROM fr24_ext.track_points_fa WHERE track_id = %s", (track_id,)) + if not positions: + return + args = [ + ( + track_id, + p.get("timestamp"), + p.get("latitude"), + p.get("longitude"), + (p["altitude"] * 100) if p.get("altitude") is not None else None, + p.get("groundspeed"), + p.get("heading"), + p.get("update_type"), + ) + for p in positions + if p.get("latitude") is not None and p.get("longitude") is not None + ] + psycopg2.extras.execute_values( + cur, + """ + INSERT INTO fr24_ext.track_points_fa + (track_id, observed_at, lat, lon, altitude_ft, gspeed_kt, heading, update_type) + VALUES %s + """, + args, + ) + + +def run(target_date: date, conn) -> Dict: + """Main entry: load FA tracks for all scheduled flights on target_date.""" + log.info("FA tracks: starting for %s", target_date) + stats = { + "date": str(target_date), + "schedule_flights": 0, + "fa_flights_found": 0, + "tracks_loaded": 0, + "errors": 0, + } + + schedule_flights = get_schedule_flights(conn, target_date) + stats["schedule_flights"] = len(schedule_flights) + log.info("FA tracks: %d unique flights in schedule", len(schedule_flights)) + + for sched in schedule_flights: + ident = sched["flight_number"].replace(" ", "") # "SU 208" → "SU208" + try: + fa_flights = get_flights_for_ident(ident, target_date) + if not fa_flights: + log.debug("FA: no flights found for %s", ident) + continue + + for fa_flight in fa_flights: + fa_flight_id = fa_flight.get("fa_flight_id") + if not fa_flight_id: + continue + + stats["fa_flights_found"] += 1 + track_id = upsert_flight(conn, fa_flight, target_date) + if track_id is None: + continue + + positions = get_track(fa_flight_id) + upsert_track_points(conn, track_id, positions) + conn.commit() + stats["tracks_loaded"] += 1 + log.debug("FA: %s (%s) → %d points", ident, fa_flight_id, len(positions)) + + except Exception as e: + conn.rollback() + stats["errors"] += 1 + 
import threading  # needed only for the run guard below

# Serialises manual runs. The previous check of _last_run["status"] followed
# by a separate assignment was not atomic, so two concurrent POSTs could both
# start a run on the shared DB connection.
_run_guard = threading.Lock()


@app.post("/run")
def run_manual():
    """Trigger a FlightAware track load for one date.

    Query parameters:
        date: Optional ``YYYY-MM-DD``. Defaults to yesterday
            (``date.today() - 1 day``).

    Responses:
        200 with ``{"status": "ok", "stats": ...}`` when the worker finishes.
        400 when ``date`` is not a valid ISO date.
        409 when another run is still in progress.
        500 with ``{"error": ...}`` when the worker raises.
    """
    date_str = request.args.get("date")
    if date_str:
        try:
            target = date.fromisoformat(date_str)
        except ValueError:
            return jsonify({"error": "invalid date, use YYYY-MM-DD"}), 400
    else:
        target = date.today() - timedelta(days=1)

    # Non-blocking acquire: either this request owns the run or it is
    # rejected immediately.
    if not _run_guard.acquire(blocking=False):
        return jsonify({"error": "already running"}), 409

    try:
        _last_run["at"] = datetime.now(timezone.utc).isoformat()
        _last_run["status"] = "running"

        conn = get_conn()
        stats = worker_run(target, conn)
        _last_run.update(status="ok", stats=stats)
        return jsonify({"status": "ok", "stats": stats})
    except Exception as e:
        _last_run["status"] = f"error: {e}"
        log.error("run failed: %s", e)
        return jsonify({"error": str(e)}), 500
    finally:
        _run_guard.release()
@dataclass
class Config:
    """Settings for the FR24 tracks service, read from the environment.

    Defaults are evaluated once, when the class is defined, so environment
    changes made after import are not picked up.
    """

    # Database
    DB_HOST: str = os.getenv("POSTGRES_HOST", "fr24-postgres")
    DB_PORT: int = int(os.getenv("POSTGRES_PORT", "5432"))
    DB_NAME: str = os.getenv("POSTGRES_DB", "fr24")
    DB_USER: str = os.getenv("POSTGRES_USER", "fr24")
    DB_PASSWORD: str = os.getenv("POSTGRES_PASSWORD", "change-me")

    # FR24 API
    FR24_API_KEY: str = os.getenv("FR24_API_KEY", "")
    FR24_API_BASE: str = "https://fr24api.flightradar24.com"

    # Airports to track
    AIRPORTS: str = "SVO,DME,VKO,ZIA"

    # Rate limit: 10 req/min for Explorer tier → 6s between requests
    RATE_LIMIT_SEC: float = float(os.getenv("FR24_RATE_LIMIT_SEC", "6.0"))

    # Pagination page size
    PAGE_SIZE: int = 100

    @staticmethod
    def _dsn_value(value) -> str:
        """Render one keyword value for a libpq connection string.

        Plain values are returned unchanged, so the DSN for simple settings
        is identical to what was produced before. Empty values and values
        containing whitespace, a single quote or a backslash are wrapped in
        single quotes with quotes and backslashes backslash-escaped.
        """
        text = str(value)
        if text and not any(ch.isspace() or ch in "'\\" for ch in text):
            return text
        escaped = text.replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    @property
    def DB_DSN(self) -> str:
        """Return the keyword/value connection string for the database."""
        pairs = (
            ("host", self.DB_HOST),
            ("port", self.DB_PORT),
            ("dbname", self.DB_NAME),
            ("user", self.DB_USER),
            ("password", self.DB_PASSWORD),
        )
        return " ".join(f"{key}={self._dsn_value(val)}" for key, val in pairs)


# Module-level singleton imported by the worker and the Flask app.
config = Config()
def iter_flight_summaries(target_date: date) -> Iterator[Dict]:
    """Yield flight-summary records for ``target_date``, page by page.

    Queries ``/api/flight-summary/light`` for the airports listed in
    ``config.AIRPORTS`` over the window 00:00:00–23:59:59 of the date,
    advancing ``offset`` until a page comes back empty or shorter than
    ``config.PAGE_SIZE``.

    The payload may be either a bare list or a dict carrying the list under
    ``"data"``. Any other shape ends the iteration with a warning: the
    original fell back to iterating the dict itself, which yielded its string
    keys instead of flight records.
    """
    dt_from = f"{target_date}T00:00:00"
    dt_to = f"{target_date}T23:59:59"
    offset = 0

    while True:
        data = _get("/api/flight-summary/light", params={
            "flight_datetime_from": dt_from,
            "flight_datetime_to": dt_to,
            "airports": config.AIRPORTS,
            "limit": config.PAGE_SIZE,
            "offset": offset,
        })

        items = data.get("data") if isinstance(data, dict) else data
        if not items:
            break
        if not isinstance(items, list):
            log.warning(
                "Unexpected flight-summary payload (%s), stopping pagination",
                type(items).__name__,
            )
            break

        for item in items:
            # Callers use item.get(...), so skip anything that is not a record.
            if isinstance(item, dict):
                yield item

        # pagination: if fewer items than page size, we're done
        if len(items) < config.PAGE_SIZE:
            break
        offset += len(items)
def upsert_flight(conn, summary: Dict, target_date: date) -> Optional[int]:
    """Write one FR24 flight header row and return its database id.

    The row is keyed by ``fr24_id``: on conflict every descriptive column is
    overwritten and ``fetched_at`` is refreshed. Missing summary keys are
    stored as NULL. Returns ``None`` if the statement yields no row. Nothing
    is committed here.
    """
    statement = """
        INSERT INTO fr24_ext.flight_tracks_fr24
          (fr24_id, flight_number, callsign, aircraft_type, registration,
           origin_icao, destination_icao, actual_takeoff, actual_landed, flight_date)
        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
        ON CONFLICT (fr24_id) DO UPDATE SET
          flight_number    = EXCLUDED.flight_number,
          callsign         = EXCLUDED.callsign,
          aircraft_type    = EXCLUDED.aircraft_type,
          registration     = EXCLUDED.registration,
          origin_icao      = EXCLUDED.origin_icao,
          destination_icao = EXCLUDED.destination_icao,
          actual_takeoff   = EXCLUDED.actual_takeoff,
          actual_landed    = EXCLUDED.actual_landed,
          fetched_at       = now()
        RETURNING id
        """
    # Summary keys in the same order as the first nine INSERT columns.
    summary_keys = (
        "fr24_id",
        "flight",
        "callsign",
        "type",
        "reg",
        "orig_icao",
        "dest_icao",
        "datetime_takeoff",
        "datetime_landed",
    )
    values = tuple(summary.get(key) for key in summary_keys) + (target_date,)

    with conn.cursor() as cur:
        cur.execute(statement, values)
        returned = cur.fetchone()

    if not returned:
        return None
    return returned[0]
def run(target_date: date, conn) -> Dict:
    """Load FR24 tracks for every flight found on ``target_date``.

    For each summary with an ``fr24_id`` the track is fetched first; the
    header and its points are then written and committed together. Fetching
    before writing means a failed fetch leaves nothing pending on the
    connection. Previously the header upsert stayed uncommitted in that case
    and was later committed or rolled back depending on the next flight. It
    also keeps the transaction from staying open across the throttled HTTP
    call.

    Args:
        target_date: Day whose flights are loaded.
        conn: Open DB connection. Each flight is committed individually and
            rolled back on error.

    Returns:
        Stats dict with keys ``date``, ``flights_found``, ``tracks_loaded``
        and ``errors``.
    """
    log.info("FR24 tracks: starting for %s", target_date)
    stats = {"date": str(target_date), "flights_found": 0, "tracks_loaded": 0, "errors": 0}

    summaries = list(iter_flight_summaries(target_date))
    stats["flights_found"] = len(summaries)
    log.info("FR24 tracks: found %d flights in summary", len(summaries))

    for summary in summaries:
        fr24_id = summary.get("fr24_id")
        if not fr24_id:
            continue
        try:
            points = fetch_track(fr24_id)
            if points is None:
                # fetch_track already logged the HTTP failure.
                stats["errors"] += 1
                continue

            track_id = upsert_flight(conn, summary, target_date)
            if track_id is None:
                # Do not leave a half-done statement pending for the next flight.
                conn.rollback()
                continue

            upsert_track_points(conn, track_id, points)
            conn.commit()
            stats["tracks_loaded"] += 1
            log.debug("FR24: %s → %d points", fr24_id, len(points))
        except Exception as e:
            conn.rollback()
            stats["errors"] += 1
            log.error("FR24: error processing %s: %s", fr24_id, e)

    log.info("FR24 tracks done: %s", stats)
    return stats
@app.get("/health")
def health():
    """Report service health based on a trivial database round-trip.

    Returns 200 with ``status: ok`` when ``SELECT 1`` succeeds on the shared
    connection, otherwise 503 with ``status: degraded``. The payload also
    carries the ``_last_run`` bookkeeping dict.
    """
    try:
        # Context-managed so the probe cursor is closed; the original
        # created a new cursor on every probe and never closed it.
        with get_conn().cursor() as cur:
            cur.execute("SELECT 1")
        db_ok = True
    except Exception as exc:
        # Still best-effort, but log the reason instead of swallowing it.
        log.warning("health: DB check failed: %s", exc)
        db_ok = False
    return jsonify({
        "status": "ok" if db_ok else "degraded",
        "db": "ok" if db_ok else "error",
        "last_run": _last_run,
    }), 200 if db_ok else 503
if __name__ == "__main__":
    # Block until Postgres accepts connections; wait_for_db exits the process
    # if it never does.
    wait_for_db()

    # Create (or truncate) the readiness marker file.
    with open("/tmp/ready", "w"):
        pass

    log.info("Starting FR24 tracks service on port 8001")
    app.run(debug=False, port=8001, host="0.0.0.0")