diff --git a/tasks/flightradar24/compose/docker-compose.yml b/tasks/flightradar24/compose/docker-compose.yml index 9b01e36..14d2885 100644 --- a/tasks/flightradar24/compose/docker-compose.yml +++ b/tasks/flightradar24/compose/docker-compose.yml @@ -185,6 +185,36 @@ services: networks: - fr24-net + fr24-schedule: + build: + context: ../ingest/schedule + dockerfile: Dockerfile + image: fr24-schedule + container_name: fr24-schedule + environment: + POSTGRES_HOST: ${POSTGRES_HOST:-postgres} + POSTGRES_PORT: ${POSTGRES_PORT:-5432} + POSTGRES_DB: ${POSTGRES_DB:-fr24} + POSTGRES_USER: ${POSTGRES_USER:-fr24} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-change-me} + YANDEX_RASP_API_KEY: ${YANDEX_RASP_API_KEY:-} + OPENSKY_USERNAME: ${OPENSKY_USERNAME:-} + OPENSKY_PASSWORD: ${OPENSKY_PASSWORD:-} + SCHEDULE_RETENTION_DAYS: ${SCHEDULE_RETENTION_DAYS:-1095} + TZ: ${TZ:-UTC} + depends_on: + postgres: + condition: service_healthy + healthcheck: + test: ["CMD-SHELL", "python -c 'import urllib.request; urllib.request.urlopen(\"http://localhost:8000/health\", timeout=3)' || exit 1"] + interval: 60s + timeout: 10s + retries: 3 + start_period: 20s + restart: unless-stopped + networks: + - fr24-net + networks: fr24-net: name: fr24-net diff --git a/tasks/flightradar24/db/init/004_schema_ext.sql b/tasks/flightradar24/db/init/004_schema_ext.sql new file mode 100644 index 0000000..25d07df --- /dev/null +++ b/tasks/flightradar24/db/init/004_schema_ext.sql @@ -0,0 +1,44 @@ +-- FR24 External data schema +-- Phase 2, Step 1: Airport schedule (Yandex.Rasp + OpenSky) + +CREATE SCHEMA IF NOT EXISTS fr24_ext; + +-- Airport schedule (merged from Yandex + OpenSky) +CREATE TABLE IF NOT EXISTS fr24_ext.schedule ( + schedule_id BIGSERIAL PRIMARY KEY, + flight_date DATE NOT NULL, + airport_iata CHAR(3) NOT NULL, + direction VARCHAR(10) NOT NULL CHECK (direction IN ('arrival', 'departure')), + flight_number VARCHAR(10) NOT NULL, + airline_iata CHAR(2), + airline_name VARCHAR(100), + origin_iata CHAR(3), + destination_iata CHAR(3), + aircraft_type VARCHAR(10), + scheduled_at TIMESTAMPTZ NOT NULL, + estimated_at TIMESTAMPTZ, + actual_at TIMESTAMPTZ, + status VARCHAR(20) DEFAULT 'scheduled' + CHECK (status IN ('scheduled', 'delayed', 'cancelled', 'departed', 'arrived')), + icao24 CHAR(6), + source VARCHAR(20) NOT NULL CHECK (source IN ('yandex', 'opensky', 'merged')), + fetched_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE (flight_number, airport_iata, scheduled_at, direction) +); + +CREATE INDEX IF NOT EXISTS idx_schedule_date ON fr24_ext.schedule (flight_date); +CREATE INDEX IF NOT EXISTS idx_schedule_airport ON fr24_ext.schedule (airport_iata); +CREATE INDEX IF NOT EXISTS idx_schedule_flight ON fr24_ext.schedule (flight_number); +CREATE INDEX IF NOT EXISTS idx_schedule_time ON fr24_ext.schedule (scheduled_at); +CREATE INDEX IF NOT EXISTS idx_schedule_dir ON fr24_ext.schedule (direction); +CREATE INDEX IF NOT EXISTS idx_schedule_status ON fr24_ext.schedule (status); + +-- Load state / backfill cursor +CREATE TABLE IF NOT EXISTS fr24_ext.load_state ( + state_key VARCHAR(50) PRIMARY KEY, + state_value JSONB NOT NULL, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +COMMENT ON TABLE fr24_ext.schedule IS 'Airport schedule from Yandex.Rasp + OpenSky, T-1 mode'; +COMMENT ON TABLE fr24_ext.load_state IS 'Backfill cursor and load progress tracking'; diff --git a/tasks/flightradar24/frontend/main.py b/tasks/flightradar24/frontend/main.py index ae6ab62..a940eac 100644 --- a/tasks/flightradar24/frontend/main.py +++ b/tasks/flightradar24/frontend/main.py @@ -2,6 +2,8 @@ FR24 API Service Minimal Flask API reading from PostgreSQL fr24 schema. """ +import csv +import io import os import time import logging @@ -10,7 +12,7 @@ from functools import wraps import psycopg2 import psycopg2.extras -from flask import Flask, jsonify, request, send_from_directory +from flask import Flask, jsonify, request, send_from_directory, Response logging.basicConfig( level=logging.INFO, @@ -335,6 +337,182 @@ def monitoring_page(): return send_from_directory("/app/static", "monitoring.html") +@app.get("/schedule") +def schedule_page(): + return send_from_directory("/app/static", "schedule.html") + + +# ── schedule API ────────────────────────────────────────────────────────────── + +def _schedule_where(args): + """Build WHERE clause + params list from request args.""" + clauses = [] + params = [] + + date_from = args.get("date_from") + date_to = args.get("date_to") + airport = args.get("airport", "all") + direction = args.get("direction", "all") + flight_number = args.get("flight_number", "").strip() + time_from = args.get("time_from", "").strip() + time_to = args.get("time_to", "").strip() + + if date_from: + clauses.append("flight_date >= %s") + params.append(date_from) + if date_to: + clauses.append("flight_date <= %s") + params.append(date_to) + if airport and airport != "all": + clauses.append("airport_iata = %s") + params.append(airport) + if direction and direction != "all": + clauses.append("direction = %s") + params.append(direction) + if flight_number: + clauses.append("flight_number ILIKE %s") + params.append(f"%{flight_number}%") + if time_from: + try: + h, m = map(int, time_from.split(":")) + clauses.append( + "(EXTRACT(HOUR FROM scheduled_at AT TIME ZONE 'UTC') * 60 " + "+ EXTRACT(MINUTE FROM scheduled_at AT TIME ZONE 'UTC')) >= %s" + ) + params.append(h * 60 + m) + except ValueError: + pass + if time_to: + try: + h, m = map(int, time_to.split(":")) + clauses.append( + "(EXTRACT(HOUR FROM scheduled_at AT TIME ZONE 'UTC') * 60 " + "+ EXTRACT(MINUTE FROM scheduled_at AT TIME ZONE 'UTC')) <= %s" + ) + params.append(h * 60 + m) + except ValueError: + pass + + where = " AND ".join(clauses) if clauses else "1=1" + return where, params + + +@app.get("/api/schedule/data") +def schedule_data(): + try: + limit = min(int(request.args.get("limit", 100)), 1000) + offset = max(int(request.args.get("offset", 0)), 0) + + where, params = _schedule_where(request.args) + + total_row = query_one( + f"SELECT COUNT(*) AS cnt FROM fr24_ext.schedule WHERE {where}", + params, + ) + total = total_row["cnt"] if total_row else 0 + + rows = query( + f""" + SELECT + flight_number, airline_name, airport_iata, direction, + origin_iata, destination_iata, + scheduled_at, actual_at, status, icao24, + flight_date + FROM fr24_ext.schedule + WHERE {where} + ORDER BY scheduled_at DESC + LIMIT %s OFFSET %s + """, + params + [limit, offset], + ) + + flights = [] + for r in rows: + sched = r["scheduled_at"] + actual = r["actual_at"] + delay_min = ( + int((actual - sched).total_seconds() / 60) + if actual and sched else None + ) + flights.append({ + "flight_number": r["flight_number"], + "airline": r["airline_name"], + "airport": r["airport_iata"], + "direction": r["direction"], + "origin": r["origin_iata"], + "destination": r["destination_iata"], + "scheduled_at": sched.isoformat() if sched else None, + "actual_at": actual.isoformat() if actual else None, + "delay_min": delay_min, + "status": r["status"], + "icao24": r["icao24"], + }) + + return ok({"total": total, "flights": flights}) + except Exception as e: + log.exception("schedule_data error") + return err(str(e)) + + +@app.get("/api/schedule/export") +def schedule_export(): + try: + where, params = _schedule_where(request.args) + + rows = query( + f""" + SELECT + flight_date, flight_number, airline_name, airport_iata, direction, + origin_iata, destination_iata, + scheduled_at, actual_at, status, icao24 + FROM fr24_ext.schedule + WHERE {where} + ORDER BY scheduled_at DESC + LIMIT 100000 + """, + params, + ) + + buf = io.StringIO() + writer = csv.writer(buf) + writer.writerow([ + "Date", "Flight", "Airline", "Airport", "Direction", + "Origin", "Destination", "Scheduled", "Actual", + "Delay (min)", "Status", "ICAO24", + ]) + for r in rows: + sched = r["scheduled_at"] + actual = r["actual_at"] + delay = ( + int((actual - sched).total_seconds() / 60) + if actual and sched else "" + ) + writer.writerow([ + str(r["flight_date"]), + r["flight_number"], + r["airline_name"] or "", + r["airport_iata"], + r["direction"], + r["origin_iata"] or "", + r["destination_iata"] or "", + sched.isoformat() if sched else "", + actual.isoformat() if actual else "", + delay, + r["status"] or "", + r["icao24"] or "", + ]) + + buf.seek(0) + return Response( + buf.getvalue(), + mimetype="text/csv; charset=utf-8", + headers={"Content-Disposition": "attachment; filename=schedule.csv"}, + ) + except Exception as e: + log.exception("schedule_export error") + return err(str(e)) + + @app.get("/api/monitoring/status") def monitoring_status(): """Return latest monitoring metrics + last 20 rows history.""" diff --git a/tasks/flightradar24/frontend/static/schedule.html b/tasks/flightradar24/frontend/static/schedule.html new file mode 100644 index 0000000..46de615 --- /dev/null +++ b/tasks/flightradar24/frontend/static/schedule.html @@ -0,0 +1,320 @@ + + + + + + FR24 — Табло аэропортов + + + + +
+ ← Главная +

✈ Табло аэропортов Москвы

+ +
+ +
+
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ + + +
+
+
+ +
+ + + + + + + + + + + + + + + + + + +
ДатаРейсАвиакомпанияАэропортНапр.МаршрутЗапланированоФактическиЗадержкаСтатус
Загрузка…
+
+ +
+ + + + + + diff --git a/tasks/flightradar24/frontend/static/schedule.js b/tasks/flightradar24/frontend/static/schedule.js new file mode 100644 index 0000000..5c4e58c --- /dev/null +++ b/tasks/flightradar24/frontend/static/schedule.js @@ -0,0 +1,257 @@ +/* schedule.js — filters, pagination, table render, CSV export */ + +const PAGE_SIZE = 100; +let currentOffset = 0; +let currentTotal = 0; + +// ── init ────────────────────────────────────────────────────────────────────── + +document.addEventListener("DOMContentLoaded", () => { + // Default date range: last 7 days + const today = new Date(); + const week = new Date(today); + week.setDate(today.getDate() - 7); + document.getElementById("f-date-from").value = fmtDate(week); + document.getElementById("f-date-to").value = fmtDate(today); + + loadData(); +}); + +// ── filter helpers ──────────────────────────────────────────────────────────── + +function getFilters() { + return { + date_from: document.getElementById("f-date-from").value || "", + date_to: document.getElementById("f-date-to").value || "", + airport: document.getElementById("f-airport").value, + direction: document.getElementById("f-direction").value, + flight_number: document.getElementById("f-flight").value.trim(), + time_from: document.getElementById("f-time-from").value || "", + time_to: document.getElementById("f-time-to").value || "", + }; +} + +function buildQuery(extra = {}) { + const f = { ...getFilters(), limit: PAGE_SIZE, offset: currentOffset, ...extra }; + return Object.entries(f) + .filter(([, v]) => v !== "" && v !== "all") + .map(([k, v]) => `${encodeURIComponent(k)}=${encodeURIComponent(v)}`) + .join("&"); +} + +function applyFilters() { + currentOffset = 0; + loadData(); +} + +function resetFilters() { + document.getElementById("f-date-from").value = ""; + document.getElementById("f-date-to").value = ""; + document.getElementById("f-airport").value = "all"; + document.getElementById("f-direction").value = "all"; + document.getElementById("f-flight").value = ""; + document.getElementById("f-time-from").value = ""; + document.getElementById("f-time-to").value = ""; + currentOffset = 0; + loadData(); +} + +// ── pagination ──────────────────────────────────────────────────────────────── + +function prevPage() { + if (currentOffset <= 0) return; + currentOffset = Math.max(0, currentOffset - PAGE_SIZE); + loadData(); +} + +function nextPage() { + if (currentOffset + PAGE_SIZE >= currentTotal) return; + currentOffset += PAGE_SIZE; + loadData(); +} + +function updatePagination() { + const page = Math.floor(currentOffset / PAGE_SIZE) + 1; + const pages = Math.max(1, Math.ceil(currentTotal / PAGE_SIZE)); + document.getElementById("page-info").textContent = `Стр. ${page} / ${pages}`; + document.getElementById("total-info").textContent = `Всего: ${currentTotal.toLocaleString("ru")}`; + document.getElementById("btn-prev").disabled = currentOffset <= 0; + document.getElementById("btn-next").disabled = currentOffset + PAGE_SIZE >= currentTotal; +} + +// ── data load ───────────────────────────────────────────────────────────────── + +async function loadData() { + setLoading(true); + try { + const resp = await fetch(`/api/schedule/data?${buildQuery()}`); + if (!resp.ok) throw new Error(`HTTP ${resp.status}`); + const data = await resp.json(); + currentTotal = data.total || 0; + renderTable(data.flights || []); + renderCards(data.flights || []); + updatePagination(); + document.getElementById("last-updated").textContent = + "Обновлено: " + new Date().toLocaleTimeString("ru"); + } catch (e) { + showError(e.message); + } finally { + setLoading(false); + } +} + +// ── render table ────────────────────────────────────────────────────────────── + +function renderTable(flights) { + const tbody = document.getElementById("table-body"); + if (!flights.length) { + tbody.innerHTML = `Нет данных по выбранным фильтрам`; + return; + } + + tbody.innerHTML = flights.map(f => { + const dirIcon = f.direction === "departure" + ? `` + : ``; + + const route = routeStr(f); + const sched = fmtTime(f.scheduled_at); + const actual = f.actual_at ? fmtTime(f.actual_at) : "—"; + const delay = delayCell(f.delay_min); + const badge = statusBadge(f.status); + const dateStr = fmtDateShort(f.scheduled_at); + + return ` + ${dateStr} + ${esc(f.flight_number)} + ${esc(f.airline || "—")} + ${esc(f.airport)} + ${dirIcon} + ${esc(route)} + ${sched} + ${actual} + ${delay} + ${badge} + `; + }).join(""); +} + +// ── render cards (mobile) ───────────────────────────────────────────────────── + +function renderCards(flights) { + const container = document.getElementById("cards-container"); + if (!flights.length) { + container.innerHTML = `
Нет данных по выбранным фильтрам
`; + return; + } + + container.innerHTML = flights.map(f => { + const dirLabel = f.direction === "departure" ? "↑ Вылет" : "↓ Прилёт"; + const dirClass = f.direction === "departure" ? "dir-dep" : "dir-arr"; + const route = routeStr(f); + const sched = fmtTime(f.scheduled_at); + const actual = f.actual_at ? fmtTime(f.actual_at) : "—"; + const delay = f.delay_min != null ? `${f.delay_min > 0 ? "+" : ""}${f.delay_min} мин` : "—"; + const badge = statusBadge(f.status); + + return `
+
+
+
${esc(f.flight_number)}
+
${esc(f.airline || "—")}
+
+ ${badge} +
+
Аэропорт${esc(f.airport)}
+
Направление${dirLabel}
+
Маршрут${esc(route)}
+
Запланировано${sched}
+
Фактически${actual}
+
Задержка${delay}
+
`; + }).join(""); +} + +// ── CSV export ──────────────────────────────────────────────────────────────── + +function exportCsv() { + const qs = buildQuery({ limit: 100000, offset: 0 }); + window.location.href = `/api/schedule/export?${qs}`; +} + +// ── helpers ─────────────────────────────────────────────────────────────────── + +function routeStr(f) { + const o = f.origin || ""; + const d = f.destination || ""; + if (!o && !d) return "—"; + if (!o) return `→ ${d}`; + if (!d) return `${o} →`; + return `${o} → ${d}`; +} + +function statusBadge(status) { + const map = { + scheduled: "badge-scheduled", + departed: "badge-departed", + arrived: "badge-arrived", + delayed: "badge-delayed", + cancelled: "badge-cancelled", + }; + const cls = map[status] || "badge-scheduled"; + const labels = { + scheduled: "По расписанию", + departed: "Вылетел", + arrived: "Прилетел", + delayed: "Задержан", + cancelled: "Отменён", + }; + return `${labels[status] || status || "—"}`; +} + +function delayCell(min) { + if (min == null) return "—"; + if (min > 0) return `+${min}`; + if (min < 0) return `${min}`; + return "0"; +} + +function fmtDate(d) { + return d.toISOString().slice(0, 10); +} + +function fmtDateShort(iso) { + if (!iso) return "—"; + return iso.slice(0, 10); +} + +function fmtTime(iso) { + if (!iso) return "—"; + try { + return new Date(iso).toLocaleTimeString("ru", { hour: "2-digit", minute: "2-digit", timeZone: "Europe/Moscow" }); + } catch { return iso.slice(11, 16); } +} + +function esc(s) { + if (!s) return "—"; + return String(s) + .replace(/&/g, "&") + .replace(//g, ">"); +} + +function setLoading(on) { + if (on) { + document.getElementById("table-body").innerHTML = + `Загрузка…`; + document.getElementById("cards-container").innerHTML = + `
Загрузка…
`; + } +} + +function showError(msg) { + document.getElementById("table-body").innerHTML = + `Ошибка: ${esc(msg)}`; + document.getElementById("cards-container").innerHTML = + `
Ошибка: ${esc(msg)}
`; +} diff --git a/tasks/flightradar24/ingest/schedule/Dockerfile b/tasks/flightradar24/ingest/schedule/Dockerfile new file mode 100644 index 0000000..ffb9a0b --- /dev/null +++ b/tasks/flightradar24/ingest/schedule/Dockerfile @@ -0,0 +1,13 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY *.py ./ + +HEALTHCHECK --interval=60s --timeout=5s --start-period=15s \ + CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=3)" || exit 1 + +CMD ["python", "main.py"] diff --git a/tasks/flightradar24/ingest/schedule/backfill.py b/tasks/flightradar24/ingest/schedule/backfill.py new file mode 100644 index 0000000..18c98e1 --- /dev/null +++ b/tasks/flightradar24/ingest/schedule/backfill.py @@ -0,0 +1,121 @@ +""" +Backfill CLI — loads historical schedule data for a date range. +Saves progress to fr24_ext.load_state so it can resume after interruption. + +Usage: + python backfill.py --start-date 2026-04-01 --end-date 2026-04-19 + python backfill.py --start-date 2026-04-01 --end-date 2026-04-19 --skip-opensky +""" +import argparse +import json +import logging +import sys +from datetime import date, timedelta + +import psycopg2 + +from config import config +from yandex_worker import fetch_day as yandex_fetch_day +from opensky_worker import enrich_day as opensky_enrich_day + +log = logging.getLogger(__name__) + +STATE_KEY = "backfill_last_date" + + +def load_state(conn, key: str): + with conn.cursor() as cur: + cur.execute( + "SELECT state_value FROM fr24_ext.load_state WHERE state_key = %s", + (key,), + ) + row = cur.fetchone() + return row[0] if row else None + + +def save_state(conn, key: str, value: dict): + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO fr24_ext.load_state (state_key, state_value) + VALUES (%s, %s::jsonb) + ON CONFLICT (state_key) DO UPDATE + SET state_value = EXCLUDED.state_value, + updated_at = now() + """, + (key, json.dumps(value)), + ) + conn.commit() + + +def main(): + parser = argparse.ArgumentParser(description="Backfill fr24_ext.schedule") + parser.add_argument("--start-date", required=True, help="YYYY-MM-DD") + parser.add_argument("--end-date", required=True, help="YYYY-MM-DD") + parser.add_argument("--skip-opensky", action="store_true", + help="Skip OpenSky enrichment (faster, no icao24)") + parser.add_argument("--resume", action="store_true", + help="Resume from last saved state (ignores --start-date if state exists)") + args = parser.parse_args() + + start = date.fromisoformat(args.start_date) + end = date.fromisoformat(args.end_date) + + if start > end: + log.error("start-date must be <= end-date") + sys.exit(1) + + conn = psycopg2.connect(config.DB_DSN) + + # Resume from saved state if requested + if args.resume: + state = load_state(conn, STATE_KEY) + if state and state.get("last_date"): + last = date.fromisoformat(state["last_date"]) + resume_from = last + timedelta(days=1) + if resume_from > start: + log.info("Resuming from %s (last completed: %s)", resume_from, last) + start = resume_from + + current = start + total_flights = 0 + total_enriched = 0 + + log.info("Backfill: %s → %s (%d days)", start, end, (end - start).days + 1) + + while current <= end: + log.info("── Processing %s ──", current) + + try: + yandex_count = yandex_fetch_day(current, conn) + total_flights += yandex_count + log.info("Yandex: %d flights", yandex_count) + + if not args.skip_opensky: + opensky_count = opensky_enrich_day(current, conn) + total_enriched += opensky_count + log.info("OpenSky: %d enriched", opensky_count) + + save_state(conn, STATE_KEY, {"last_date": current.isoformat()}) + + except KeyboardInterrupt: + log.info("Interrupted. Progress saved up to %s", current - timedelta(days=1)) + break + except Exception as e: + log.error("Failed on %s: %s — stopping", current, e) + break + + current += timedelta(days=1) + + conn.close() + log.info("Backfill done. Flights: %d, Enriched: %d", total_flights, total_enriched) + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [backfill] %(levelname)s %(message)s", + datefmt="%Y-%m-%dT%H:%M:%S", + handlers=[logging.StreamHandler(sys.stdout)], + ) + main() diff --git a/tasks/flightradar24/ingest/schedule/config.py b/tasks/flightradar24/ingest/schedule/config.py new file mode 100644 index 0000000..7e8eb5b --- /dev/null +++ b/tasks/flightradar24/ingest/schedule/config.py @@ -0,0 +1,48 @@ +import os +from dataclasses import dataclass, field +from typing import Dict + + +@dataclass +class Config: + # Database + DB_HOST: str = os.getenv("POSTGRES_HOST", "fr24-postgres") + DB_PORT: int = int(os.getenv("POSTGRES_PORT", "5432")) + DB_NAME: str = os.getenv("POSTGRES_DB", "fr24") + DB_USER: str = os.getenv("POSTGRES_USER", "fr24") + DB_PASSWORD: str = os.getenv("POSTGRES_PASSWORD", "change-me") + + # API keys + YANDEX_RASP_API_KEY: str = os.getenv("YANDEX_RASP_API_KEY", "") + OPENSKY_USERNAME: str = os.getenv("OPENSKY_USERNAME", "") + OPENSKY_PASSWORD: str = os.getenv("OPENSKY_PASSWORD", "") + + # Airports: IATA → metadata + AIRPORTS: Dict = field(default_factory=lambda: { + "SVO": {"iata": "SVO", "icao": "UUEE", "yandex_code": "s9600213", "name": "Шереметьево"}, + "DME": {"iata": "DME", "icao": "UUDD", "yandex_code": "s9600366", "name": "Домодедово"}, + "VKO": {"iata": "VKO", "icao": "UUWW", "yandex_code": "s9600215", "name": "Внуково"}, + "ZIA": {"iata": "ZIA", "icao": "UUBW", "yandex_code": "s9881291", "name": "Жуковский"}, + }) + + # Rate limits (seconds between requests) + YANDEX_RATE_LIMIT_SEC: float = float(os.getenv("YANDEX_RATE_LIMIT_SEC", "1.0")) + OPENSKY_RATE_LIMIT_SEC: float = float(os.getenv("OPENSKY_RATE_LIMIT_SEC", "30.0")) + + # Retention + RETENTION_DAYS: int = int(os.getenv("SCHEDULE_RETENTION_DAYS", "1095")) + + # Cron schedule (UTC) + DAILY_RUN_HOUR: int = 2 + DAILY_RUN_MINUTE: int = 0 + + @property + def DB_DSN(self) -> str: + return ( + f"host={self.DB_HOST} port={self.DB_PORT} " + f"dbname={self.DB_NAME} user={self.DB_USER} " + f"password={self.DB_PASSWORD}" + ) + + +config = Config() diff --git a/tasks/flightradar24/ingest/schedule/main.py b/tasks/flightradar24/ingest/schedule/main.py new file mode 100644 index 0000000..8b90e37 --- /dev/null +++ b/tasks/flightradar24/ingest/schedule/main.py @@ -0,0 +1,111 @@ +""" +fr24-schedule main entry point. +- APScheduler cron: daily_job() at 02:00 UTC +- Flask healthcheck on port 8000 +- Cleanup job at 03:00 UTC (retention) +""" +import logging +import sys +import time +from datetime import date, timedelta, datetime, timezone + +import psycopg2 +from apscheduler.schedulers.background import BackgroundScheduler +from flask import Flask, jsonify + +from config import config +from yandex_worker import fetch_day as yandex_fetch_day +from opensky_worker import enrich_day as opensky_enrich_day + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [schedule] %(levelname)s %(message)s", + datefmt="%Y-%m-%dT%H:%M:%S", + handlers=[logging.StreamHandler(sys.stdout)], +) +log = logging.getLogger("schedule") + +app = Flask(__name__) + +_last_run: dict = {"at": None, "status": "never", "flights": 0, "enriched": 0} +_conn = None + + +def get_conn(): + global _conn + if _conn is None or _conn.closed: + _conn = psycopg2.connect(config.DB_DSN) + log.info("DB connection established") + return _conn + + +def daily_job(): + """T-1: load yesterday's schedule from Yandex then enrich with OpenSky.""" + target = date.today() - timedelta(days=1) + log.info("daily_job: starting for %s", target) + _last_run["at"] = datetime.now(timezone.utc).isoformat() + _last_run["status"] = "running" + + try: + conn = get_conn() + flights = yandex_fetch_day(target, conn) + enriched = opensky_enrich_day(target, conn) + _last_run.update(status="ok", flights=flights, enriched=enriched) + log.info("daily_job: done — %d flights, %d enriched", flights, enriched) + except Exception as e: + _last_run["status"] = f"error: {e}" + log.error("daily_job failed: %s", e) + + +def cleanup_job(): + """Delete records older than RETENTION_DAYS.""" + try: + conn = get_conn() + with conn.cursor() as cur: + cur.execute( + "DELETE FROM fr24_ext.schedule WHERE flight_date < CURRENT_DATE - %s::int * INTERVAL '1 day'", + (config.RETENTION_DAYS,), + ) + deleted = cur.rowcount + conn.commit() + log.info("cleanup_job: deleted %d old records (retention=%d days)", deleted, config.RETENTION_DAYS) + except Exception as e: + log.error("cleanup_job failed: %s", e) + + +@app.get("/health") +def health(): + try: + get_conn().cursor().execute("SELECT 1") + db_ok = True + except Exception: + db_ok = False + return jsonify({ + "status": "ok" if db_ok else "degraded", + "db": "ok" if db_ok else "error", + "last_run": _last_run, + }), 200 if db_ok else 503 + + +def wait_for_db(max_attempts: int = 30): + for attempt in range(1, max_attempts + 1): + try: + get_conn() + return + except psycopg2.OperationalError as e: + log.warning("DB not ready (%d/%d): %s", attempt, max_attempts, e) + time.sleep(3) + log.error("Could not connect to DB after %d attempts", max_attempts) + raise SystemExit(1) + + +if __name__ == "__main__": + wait_for_db() + + scheduler = BackgroundScheduler(timezone="UTC") + scheduler.add_job(daily_job, "cron", hour=config.DAILY_RUN_HOUR, minute=config.DAILY_RUN_MINUTE) + scheduler.add_job(cleanup_job, "cron", hour=3, minute=0) + scheduler.start() + log.info("Scheduler started — daily job at %02d:%02d UTC", config.DAILY_RUN_HOUR, config.DAILY_RUN_MINUTE) + + app.run(host="0.0.0.0", port=8000, debug=False) diff --git a/tasks/flightradar24/ingest/schedule/opensky_worker.py b/tasks/flightradar24/ingest/schedule/opensky_worker.py new file mode 100644 index 0000000..3fc67f7 --- /dev/null +++ b/tasks/flightradar24/ingest/schedule/opensky_worker.py @@ -0,0 +1,141 @@ +""" +OpenSky Network API worker — enriches fr24_ext.schedule with icao24 + actual times. +""" +import logging +import time +from datetime import date, datetime, timezone +from typing import Dict, List, Optional +from functools import wraps + +import requests +import psycopg2 + +from config import config + +log = logging.getLogger(__name__) + +OPENSKY_BASE = "https://opensky-network.org/api/flights" + + +# ── retry decorator ─────────────────────────────────────────────────────────── + +def retry(max_retries: int = 3, base_delay: float = 10.0): + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + for attempt in range(max_retries): + try: + return func(*args, **kwargs) + except requests.RequestException as e: + if attempt < max_retries - 1: + wait = base_delay * (2 ** attempt) + log.warning("Retry %d/%d after %.0fs: %s", attempt + 1, max_retries, wait, e) + time.sleep(wait) + else: + raise + return wrapper + return decorator + + +# ── API fetch ───────────────────────────────────────────────────────────────── + +@retry(max_retries=3, base_delay=10.0) +def fetch_opensky_flights(icao: str, begin_ts: int, end_ts: int, direction: str) -> List[Dict]: + """ + Fetch arrivals or departures from OpenSky for one airport/day. + direction: 'arrival' | 'departure' + """ + url = f"{OPENSKY_BASE}/{direction}" + params = {"airport": icao, "begin": begin_ts, "end": end_ts} + + auth = None + if config.OPENSKY_USERNAME and config.OPENSKY_PASSWORD: + auth = (config.OPENSKY_USERNAME, config.OPENSKY_PASSWORD) + + resp = requests.get(url, params=params, auth=auth, timeout=60) + + # 404 means no data for this period — not an error + if resp.status_code == 404: + return [] + + resp.raise_for_status() + return resp.json() or [] + + +# ── DB enrichment ───────────────────────────────────────────────────────────── + +def enrich_flights(conn, opensky_flights: List[Dict], airport_iata: str, direction: str) -> int: + """ + Update icao24 + actual_at on existing schedule rows matched by callsign. + Match window: ±3 hours around the OpenSky actual time. + """ + if not opensky_flights: + return 0 + + enriched = 0 + with conn.cursor() as cur: + for flight in opensky_flights: + icao24 = (flight.get("icao24") or "").strip().lower() + callsign = (flight.get("callsign") or "").strip() + + if not icao24 or not callsign: + continue + + # actual time: lastSeen for arrivals, firstSeen for departures + actual_ts = ( + flight.get("lastSeen") if direction == "arrival" + else flight.get("firstSeen") + ) + if not actual_ts: + continue + + actual_at = datetime.fromtimestamp(actual_ts, tz=timezone.utc) + + cur.execute( + """ + UPDATE fr24_ext.schedule + SET + icao24 = %s, + actual_at = %s, + source = CASE WHEN source = 'yandex' THEN 'merged' ELSE source END + WHERE airport_iata = %s + AND direction = %s + AND flight_number = %s + AND scheduled_at BETWEEN %s - INTERVAL '3 hours' + AND %s + INTERVAL '3 hours' + """, + (icao24, actual_at, airport_iata, direction, callsign, actual_at, actual_at), + ) + enriched += cur.rowcount + + return enriched + + +# ── main entry ──────────────────────────────────────────────────────────────── + +def enrich_day(target_date: date, conn) -> int: + """Enrich all airports for one day. Returns total rows updated.""" + # Unix timestamps for start/end of the day (UTC) + day_start = int(datetime(target_date.year, target_date.month, target_date.day, + tzinfo=timezone.utc).timestamp()) + day_end = day_start + 86400 + + total = 0 + + for airport_iata, airport_info in config.AIRPORTS.items(): + icao = airport_info["icao"] + log.info("OpenSky: enriching %s (%s) for %s", airport_iata, icao, target_date) + + for direction in ("arrival", "departure"): + try: + flights = fetch_opensky_flights(icao, day_start, day_end, direction) + count = enrich_flights(conn, flights, airport_iata, direction) + total += count + log.info("OpenSky: %s %s → %d rows enriched", airport_iata, direction, count) + except Exception as e: + log.error("OpenSky: failed %s %s: %s", airport_iata, direction, e) + + time.sleep(config.OPENSKY_RATE_LIMIT_SEC) + + conn.commit() + return total diff --git a/tasks/flightradar24/ingest/schedule/requirements.txt b/tasks/flightradar24/ingest/schedule/requirements.txt new file mode 100644 index 0000000..006896a --- /dev/null +++ b/tasks/flightradar24/ingest/schedule/requirements.txt @@ -0,0 +1,4 @@ +requests==2.31.0 +psycopg2-binary==2.9.9 +APScheduler==3.10.4 +Flask==3.0.3 diff --git a/tasks/flightradar24/ingest/schedule/yandex_worker.py b/tasks/flightradar24/ingest/schedule/yandex_worker.py new file mode 100644 index 0000000..b9cd4e3 --- /dev/null +++ b/tasks/flightradar24/ingest/schedule/yandex_worker.py @@ -0,0 +1,205 @@ +""" +Yandex.Rasp API worker — loads airport schedule into fr24_ext.schedule. +Makes two requests per airport per day: event=departure and event=arrival. +""" +import logging +import time +from datetime import date +from typing import Dict, List, Optional +from functools import wraps + +import requests +import psycopg2 + +from config import config + +log = logging.getLogger(__name__) + +YANDEX_URL = "https://api.rasp.yandex.net/v3.0/schedule/" + + +# ── retry decorator ─────────────────────────────────────────────────────────── + +def retry(max_retries: int = 3, base_delay: float = 5.0): + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + for attempt in range(max_retries): + try: + return func(*args, **kwargs) + except requests.RequestException as e: + if attempt < max_retries - 1: + wait = base_delay * (2 ** attempt) + log.warning("Retry %d/%d after %.0fs: %s", attempt + 1, max_retries, wait, e) + time.sleep(wait) + else: + raise + return wrapper + return decorator + + +# ── API fetch ───────────────────────────────────────────────────────────────── + +@retry(max_retries=3, base_delay=5.0) +def _fetch_page(yandex_code: str, target_date: date, event: str, offset: int = 0) -> dict: + params = { + "apikey": config.YANDEX_RASP_API_KEY, + "station": yandex_code, + "date": target_date.isoformat(), + "transport_types": "plane", + "event": event, + "offset": offset, + "limit": 100, + } + resp = requests.get(YANDEX_URL, params=params, timeout=30) + resp.raise_for_status() + return resp.json() + + +def fetch_airport_schedule(yandex_code: str, target_date: date, direction: str) -> List[Dict]: + """ + Fetches all flights for one airport/direction with pagination. + direction: 'departure' | 'arrival' + """ + flights = [] + offset = 0 + + while True: + data = _fetch_page(yandex_code, target_date, event=direction, offset=offset) + items = data.get("schedule", []) + pagination = data.get("pagination", {}) + + for item in items: + flight = _parse_item(item, direction) + if flight: + flights.append(flight) + + total = pagination.get("total", 0) + offset += len(items) + + if offset >= total or not items: + break + + time.sleep(config.YANDEX_RATE_LIMIT_SEC) + + return flights + + +def _parse_item(item: Dict, direction: str) -> Optional[Dict]: + thread = item.get("thread", {}) + flight_number = thread.get("number", "").strip() + if not flight_number: + return None + + carrier = thread.get("carrier", {}) + + # Scheduled time: departure event → use 'departure' field; arrival → 'arrival' + scheduled_at = item.get("departure") if direction == "departure" else item.get("arrival") + if not scheduled_at: + # fallback + scheduled_at = item.get("departure") or item.get("arrival") + if not scheduled_at: + return None + + # Route: from/to station objects + from_station = item.get("from", {}) or {} + to_station = item.get("to", {}) or {} + origin_iata = _station_iata(from_station) + destination_iata = _station_iata(to_station) + + return { + "flight_number": flight_number, + "airline_iata": carrier.get("code"), + "airline_name": carrier.get("title"), + "origin_iata": origin_iata, + "destination_iata": destination_iata, + "aircraft_type": thread.get("vehicle"), + "scheduled_at": scheduled_at, + "direction": direction, + "status": "scheduled", + "source": "yandex", + } + + +def _station_iata(station: Dict) -> Optional[str]: + """Extract IATA code from station dict (codes list or direct field).""" + codes = station.get("codes", {}) + if isinstance(codes, dict): + iata = codes.get("iata") + if iata: + return iata[:3].upper() + # fallback: station code field + code = station.get("code", "") + if code and not code.startswith("s"): # yandex internal codes start with 's' + return code[:3].upper() + return None + + +# ── DB upsert ───────────────────────────────────────────────────────────────── + +def upsert_flights(conn, flights: List[Dict], airport_iata: str, flight_date: date) -> int: + if not flights: + return 0 + + with conn.cursor() as cur: + for flight in flights: + cur.execute( + """ + INSERT INTO fr24_ext.schedule + (flight_date, airport_iata, direction, flight_number, + airline_iata, airline_name, origin_iata, destination_iata, + aircraft_type, scheduled_at, status, source) + VALUES + (%(flight_date)s, %(airport_iata)s, %(direction)s, %(flight_number)s, + %(airline_iata)s, %(airline_name)s, %(origin_iata)s, %(destination_iata)s, + %(aircraft_type)s, %(scheduled_at)s, %(status)s, %(source)s) + ON CONFLICT (flight_number, airport_iata, scheduled_at, direction) + DO UPDATE SET + airline_name = EXCLUDED.airline_name, + origin_iata = COALESCE(EXCLUDED.origin_iata, fr24_ext.schedule.origin_iata), + destination_iata = COALESCE(EXCLUDED.destination_iata, fr24_ext.schedule.destination_iata), + aircraft_type = COALESCE(EXCLUDED.aircraft_type, fr24_ext.schedule.aircraft_type), + status = EXCLUDED.status, + fetched_at = now() + """, + { + "flight_date": flight_date, + "airport_iata": airport_iata, + "direction": flight["direction"], + "flight_number": flight["flight_number"], + "airline_iata": flight.get("airline_iata"), + "airline_name": flight.get("airline_name"), + "origin_iata": flight.get("origin_iata"), + "destination_iata": flight.get("destination_iata"), + "aircraft_type": flight.get("aircraft_type"), + "scheduled_at": flight["scheduled_at"], + "status": flight.get("status", "scheduled"), + "source": flight["source"], + }, + ) + return len(flights) + + +# ── main entry ──────────────────────────────────────────────────────────────── + +def fetch_day(target_date: date, conn) -> int: + """Load schedule for all airports for one day. Returns total flights upserted.""" + total = 0 + + for airport_iata, airport_info in config.AIRPORTS.items(): + yandex_code = airport_info["yandex_code"] + log.info("Yandex: fetching %s (%s) for %s", airport_iata, yandex_code, target_date) + + for direction in ("departure", "arrival"): + try: + flights = fetch_airport_schedule(yandex_code, target_date, direction) + count = upsert_flights(conn, flights, airport_iata, target_date) + total += count + log.info("Yandex: %s %s → %d flights upserted", airport_iata, direction, count) + except Exception as e: + log.error("Yandex: failed %s %s: %s", airport_iata, direction, e) + + time.sleep(config.YANDEX_RATE_LIMIT_SEC) + + conn.commit() + return total