auto-sync: 2026-04-20 14:20:01
This commit is contained in:
@@ -185,6 +185,36 @@ services:
|
||||
networks:
|
||||
- fr24-net
|
||||
|
||||
fr24-schedule:
|
||||
build:
|
||||
context: ../ingest/schedule
|
||||
dockerfile: Dockerfile
|
||||
image: fr24-schedule
|
||||
container_name: fr24-schedule
|
||||
environment:
|
||||
POSTGRES_HOST: ${POSTGRES_HOST:-postgres}
|
||||
POSTGRES_PORT: ${POSTGRES_PORT:-5432}
|
||||
POSTGRES_DB: ${POSTGRES_DB:-fr24}
|
||||
POSTGRES_USER: ${POSTGRES_USER:-fr24}
|
||||
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-change-me}
|
||||
YANDEX_RASP_API_KEY: ${YANDEX_RASP_API_KEY:-}
|
||||
OPENSKY_USERNAME: ${OPENSKY_USERNAME:-}
|
||||
OPENSKY_PASSWORD: ${OPENSKY_PASSWORD:-}
|
||||
SCHEDULE_RETENTION_DAYS: ${SCHEDULE_RETENTION_DAYS:-1095}
|
||||
TZ: ${TZ:-UTC}
|
||||
depends_on:
|
||||
postgres:
|
||||
condition: service_healthy
|
||||
healthcheck:
|
||||
test: ["CMD-SHELL", "python -c 'import urllib.request; urllib.request.urlopen(\"http://localhost:8000/health\", timeout=3)' || exit 1"]
|
||||
interval: 60s
|
||||
timeout: 10s
|
||||
retries: 3
|
||||
start_period: 20s
|
||||
restart: unless-stopped
|
||||
networks:
|
||||
- fr24-net
|
||||
|
||||
networks:
|
||||
fr24-net:
|
||||
name: fr24-net
|
||||
|
||||
44
tasks/flightradar24/db/init/004_schema_ext.sql
Normal file
44
tasks/flightradar24/db/init/004_schema_ext.sql
Normal file
@@ -0,0 +1,44 @@
|
||||
-- FR24 External data schema
|
||||
-- Phase 2, Step 1: Airport schedule (Yandex.Rasp + OpenSky)
|
||||
|
||||
CREATE SCHEMA IF NOT EXISTS fr24_ext;
|
||||
|
||||
-- Airport schedule (merged from Yandex + OpenSky)
|
||||
CREATE TABLE IF NOT EXISTS fr24_ext.schedule (
|
||||
schedule_id BIGSERIAL PRIMARY KEY,
|
||||
flight_date DATE NOT NULL,
|
||||
airport_iata CHAR(3) NOT NULL,
|
||||
direction VARCHAR(10) NOT NULL CHECK (direction IN ('arrival', 'departure')),
|
||||
flight_number VARCHAR(10) NOT NULL,
|
||||
airline_iata CHAR(2),
|
||||
airline_name VARCHAR(100),
|
||||
origin_iata CHAR(3),
|
||||
destination_iata CHAR(3),
|
||||
aircraft_type VARCHAR(10),
|
||||
scheduled_at TIMESTAMPTZ NOT NULL,
|
||||
estimated_at TIMESTAMPTZ,
|
||||
actual_at TIMESTAMPTZ,
|
||||
status VARCHAR(20) DEFAULT 'scheduled'
|
||||
CHECK (status IN ('scheduled', 'delayed', 'cancelled', 'departed', 'arrived')),
|
||||
icao24 CHAR(6),
|
||||
source VARCHAR(20) NOT NULL CHECK (source IN ('yandex', 'opensky', 'merged')),
|
||||
fetched_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
UNIQUE (flight_number, airport_iata, scheduled_at, direction)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_schedule_date ON fr24_ext.schedule (flight_date);
|
||||
CREATE INDEX IF NOT EXISTS idx_schedule_airport ON fr24_ext.schedule (airport_iata);
|
||||
CREATE INDEX IF NOT EXISTS idx_schedule_flight ON fr24_ext.schedule (flight_number);
|
||||
CREATE INDEX IF NOT EXISTS idx_schedule_time ON fr24_ext.schedule (scheduled_at);
|
||||
CREATE INDEX IF NOT EXISTS idx_schedule_dir ON fr24_ext.schedule (direction);
|
||||
CREATE INDEX IF NOT EXISTS idx_schedule_status ON fr24_ext.schedule (status);
|
||||
|
||||
-- Load state / backfill cursor
|
||||
CREATE TABLE IF NOT EXISTS fr24_ext.load_state (
|
||||
state_key VARCHAR(50) PRIMARY KEY,
|
||||
state_value JSONB NOT NULL,
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
COMMENT ON TABLE fr24_ext.schedule IS 'Airport schedule from Yandex.Rasp + OpenSky, T-1 mode';
|
||||
COMMENT ON TABLE fr24_ext.load_state IS 'Backfill cursor and load progress tracking';
|
||||
@@ -2,6 +2,8 @@
|
||||
FR24 API Service
|
||||
Minimal Flask API reading from PostgreSQL fr24 schema.
|
||||
"""
|
||||
import csv
|
||||
import io
|
||||
import os
|
||||
import time
|
||||
import logging
|
||||
@@ -10,7 +12,7 @@ from functools import wraps
|
||||
|
||||
import psycopg2
|
||||
import psycopg2.extras
|
||||
from flask import Flask, jsonify, request, send_from_directory
|
||||
from flask import Flask, jsonify, request, send_from_directory, Response
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
@@ -335,6 +337,182 @@ def monitoring_page():
|
||||
return send_from_directory("/app/static", "monitoring.html")
|
||||
|
||||
|
||||
@app.get("/schedule")
|
||||
def schedule_page():
|
||||
return send_from_directory("/app/static", "schedule.html")
|
||||
|
||||
|
||||
# ── schedule API ──────────────────────────────────────────────────────────────
|
||||
|
||||
def _schedule_where(args):
|
||||
"""Build WHERE clause + params list from request args."""
|
||||
clauses = []
|
||||
params = []
|
||||
|
||||
date_from = args.get("date_from")
|
||||
date_to = args.get("date_to")
|
||||
airport = args.get("airport", "all")
|
||||
direction = args.get("direction", "all")
|
||||
flight_number = args.get("flight_number", "").strip()
|
||||
time_from = args.get("time_from", "").strip()
|
||||
time_to = args.get("time_to", "").strip()
|
||||
|
||||
if date_from:
|
||||
clauses.append("flight_date >= %s")
|
||||
params.append(date_from)
|
||||
if date_to:
|
||||
clauses.append("flight_date <= %s")
|
||||
params.append(date_to)
|
||||
if airport and airport != "all":
|
||||
clauses.append("airport_iata = %s")
|
||||
params.append(airport)
|
||||
if direction and direction != "all":
|
||||
clauses.append("direction = %s")
|
||||
params.append(direction)
|
||||
if flight_number:
|
||||
clauses.append("flight_number ILIKE %s")
|
||||
params.append(f"%{flight_number}%")
|
||||
if time_from:
|
||||
try:
|
||||
h, m = map(int, time_from.split(":"))
|
||||
clauses.append(
|
||||
"(EXTRACT(HOUR FROM scheduled_at AT TIME ZONE 'UTC') * 60 "
|
||||
"+ EXTRACT(MINUTE FROM scheduled_at AT TIME ZONE 'UTC')) >= %s"
|
||||
)
|
||||
params.append(h * 60 + m)
|
||||
except ValueError:
|
||||
pass
|
||||
if time_to:
|
||||
try:
|
||||
h, m = map(int, time_to.split(":"))
|
||||
clauses.append(
|
||||
"(EXTRACT(HOUR FROM scheduled_at AT TIME ZONE 'UTC') * 60 "
|
||||
"+ EXTRACT(MINUTE FROM scheduled_at AT TIME ZONE 'UTC')) <= %s"
|
||||
)
|
||||
params.append(h * 60 + m)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
where = " AND ".join(clauses) if clauses else "1=1"
|
||||
return where, params
|
||||
|
||||
|
||||
@app.get("/api/schedule/data")
|
||||
def schedule_data():
|
||||
try:
|
||||
limit = min(int(request.args.get("limit", 100)), 1000)
|
||||
offset = max(int(request.args.get("offset", 0)), 0)
|
||||
|
||||
where, params = _schedule_where(request.args)
|
||||
|
||||
total_row = query_one(
|
||||
f"SELECT COUNT(*) AS cnt FROM fr24_ext.schedule WHERE {where}",
|
||||
params,
|
||||
)
|
||||
total = total_row["cnt"] if total_row else 0
|
||||
|
||||
rows = query(
|
||||
f"""
|
||||
SELECT
|
||||
flight_number, airline_name, airport_iata, direction,
|
||||
origin_iata, destination_iata,
|
||||
scheduled_at, actual_at, status, icao24,
|
||||
flight_date
|
||||
FROM fr24_ext.schedule
|
||||
WHERE {where}
|
||||
ORDER BY scheduled_at DESC
|
||||
LIMIT %s OFFSET %s
|
||||
""",
|
||||
params + [limit, offset],
|
||||
)
|
||||
|
||||
flights = []
|
||||
for r in rows:
|
||||
sched = r["scheduled_at"]
|
||||
actual = r["actual_at"]
|
||||
delay_min = (
|
||||
int((actual - sched).total_seconds() / 60)
|
||||
if actual and sched else None
|
||||
)
|
||||
flights.append({
|
||||
"flight_number": r["flight_number"],
|
||||
"airline": r["airline_name"],
|
||||
"airport": r["airport_iata"],
|
||||
"direction": r["direction"],
|
||||
"origin": r["origin_iata"],
|
||||
"destination": r["destination_iata"],
|
||||
"scheduled_at": sched.isoformat() if sched else None,
|
||||
"actual_at": actual.isoformat() if actual else None,
|
||||
"delay_min": delay_min,
|
||||
"status": r["status"],
|
||||
"icao24": r["icao24"],
|
||||
})
|
||||
|
||||
return ok({"total": total, "flights": flights})
|
||||
except Exception as e:
|
||||
log.exception("schedule_data error")
|
||||
return err(str(e))
|
||||
|
||||
|
||||
@app.get("/api/schedule/export")
|
||||
def schedule_export():
|
||||
try:
|
||||
where, params = _schedule_where(request.args)
|
||||
|
||||
rows = query(
|
||||
f"""
|
||||
SELECT
|
||||
flight_date, flight_number, airline_name, airport_iata, direction,
|
||||
origin_iata, destination_iata,
|
||||
scheduled_at, actual_at, status, icao24
|
||||
FROM fr24_ext.schedule
|
||||
WHERE {where}
|
||||
ORDER BY scheduled_at DESC
|
||||
LIMIT 100000
|
||||
""",
|
||||
params,
|
||||
)
|
||||
|
||||
buf = io.StringIO()
|
||||
writer = csv.writer(buf)
|
||||
writer.writerow([
|
||||
"Date", "Flight", "Airline", "Airport", "Direction",
|
||||
"Origin", "Destination", "Scheduled", "Actual",
|
||||
"Delay (min)", "Status", "ICAO24",
|
||||
])
|
||||
for r in rows:
|
||||
sched = r["scheduled_at"]
|
||||
actual = r["actual_at"]
|
||||
delay = (
|
||||
int((actual - sched).total_seconds() / 60)
|
||||
if actual and sched else ""
|
||||
)
|
||||
writer.writerow([
|
||||
str(r["flight_date"]),
|
||||
r["flight_number"],
|
||||
r["airline_name"] or "",
|
||||
r["airport_iata"],
|
||||
r["direction"],
|
||||
r["origin_iata"] or "",
|
||||
r["destination_iata"] or "",
|
||||
sched.isoformat() if sched else "",
|
||||
actual.isoformat() if actual else "",
|
||||
delay,
|
||||
r["status"] or "",
|
||||
r["icao24"] or "",
|
||||
])
|
||||
|
||||
buf.seek(0)
|
||||
return Response(
|
||||
buf.getvalue(),
|
||||
mimetype="text/csv; charset=utf-8",
|
||||
headers={"Content-Disposition": "attachment; filename=schedule.csv"},
|
||||
)
|
||||
except Exception as e:
|
||||
log.exception("schedule_export error")
|
||||
return err(str(e))
|
||||
|
||||
|
||||
@app.get("/api/monitoring/status")
|
||||
def monitoring_status():
|
||||
"""Return latest monitoring metrics + last 20 rows history."""
|
||||
|
||||
320
tasks/flightradar24/frontend/static/schedule.html
Normal file
320
tasks/flightradar24/frontend/static/schedule.html
Normal file
@@ -0,0 +1,320 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="ru">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<title>FR24 — Табло аэропортов</title>
|
||||
<style>
|
||||
* { box-sizing: border-box; margin: 0; padding: 0; }
|
||||
|
||||
body {
|
||||
background: #0d1117;
|
||||
color: #c9d1d9;
|
||||
font-family: 'Segoe UI', system-ui, sans-serif;
|
||||
font-size: 14px;
|
||||
min-height: 100vh;
|
||||
}
|
||||
|
||||
header {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
gap: 16px;
|
||||
padding: 20px 24px 16px;
|
||||
border-bottom: 1px solid #21262d;
|
||||
}
|
||||
|
||||
header h1 { font-size: 18px; font-weight: 600; color: #e6edf3; }
|
||||
|
||||
header a {
|
||||
color: #58a6ff;
|
||||
text-decoration: none;
|
||||
font-size: 13px;
|
||||
opacity: 0.8;
|
||||
}
|
||||
header a:hover { opacity: 1; }
|
||||
|
||||
#last-updated { margin-left: auto; font-size: 12px; color: #6e7681; }
|
||||
|
||||
/* ── filters ── */
|
||||
.filters {
|
||||
background: #161b22;
|
||||
border-bottom: 1px solid #21262d;
|
||||
padding: 14px 24px;
|
||||
}
|
||||
|
||||
.filters-grid {
|
||||
display: flex;
|
||||
flex-wrap: wrap;
|
||||
gap: 10px;
|
||||
align-items: flex-end;
|
||||
}
|
||||
|
||||
.filter-group {
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
gap: 4px;
|
||||
}
|
||||
|
||||
.filter-group label {
|
||||
font-size: 11px;
|
||||
text-transform: uppercase;
|
||||
letter-spacing: 0.06em;
|
||||
color: #6e7681;
|
||||
}
|
||||
|
||||
.filter-group input,
|
||||
.filter-group select {
|
||||
background: #0d1117;
|
||||
border: 1px solid #30363d;
|
||||
border-radius: 6px;
|
||||
color: #c9d1d9;
|
||||
font-size: 13px;
|
||||
padding: 6px 10px;
|
||||
height: 32px;
|
||||
outline: none;
|
||||
transition: border-color 0.15s;
|
||||
}
|
||||
|
||||
.filter-group input:focus,
|
||||
.filter-group select:focus { border-color: #58a6ff; }
|
||||
|
||||
.filter-group input[type="date"] { width: 140px; }
|
||||
.filter-group input[type="time"] { width: 100px; }
|
||||
.filter-group input[type="text"] { width: 130px; }
|
||||
.filter-group select { width: 130px; }
|
||||
|
||||
.filter-actions {
|
||||
display: flex;
|
||||
gap: 8px;
|
||||
align-items: flex-end;
|
||||
margin-left: auto;
|
||||
}
|
||||
|
||||
button {
|
||||
border: none;
|
||||
border-radius: 6px;
|
||||
cursor: pointer;
|
||||
font-size: 13px;
|
||||
font-weight: 500;
|
||||
height: 32px;
|
||||
padding: 0 14px;
|
||||
transition: opacity 0.15s;
|
||||
}
|
||||
button:hover { opacity: 0.85; }
|
||||
|
||||
.btn-primary { background: #238636; color: #fff; }
|
||||
.btn-secondary { background: #21262d; color: #c9d1d9; border: 1px solid #30363d; }
|
||||
.btn-export { background: #1f6feb; color: #fff; }
|
||||
|
||||
/* ── table area ── */
|
||||
.table-wrap {
|
||||
padding: 16px 24px;
|
||||
overflow-x: auto;
|
||||
}
|
||||
|
||||
table {
|
||||
width: 100%;
|
||||
border-collapse: collapse;
|
||||
font-size: 13px;
|
||||
}
|
||||
|
||||
thead th {
|
||||
background: #161b22;
|
||||
border-bottom: 1px solid #30363d;
|
||||
color: #8b949e;
|
||||
font-size: 11px;
|
||||
font-weight: 600;
|
||||
letter-spacing: 0.06em;
|
||||
padding: 10px 12px;
|
||||
text-align: left;
|
||||
text-transform: uppercase;
|
||||
white-space: nowrap;
|
||||
}
|
||||
|
||||
tbody tr {
|
||||
border-bottom: 1px solid #21262d;
|
||||
transition: background 0.1s;
|
||||
}
|
||||
tbody tr:hover { background: #161b22; }
|
||||
|
||||
tbody td {
|
||||
padding: 9px 12px;
|
||||
vertical-align: middle;
|
||||
white-space: nowrap;
|
||||
}
|
||||
|
||||
.dir-icon { font-size: 16px; }
|
||||
.dir-dep { color: #58a6ff; }
|
||||
.dir-arr { color: #3fb950; }
|
||||
|
||||
/* status badges */
|
||||
.badge {
|
||||
border-radius: 4px;
|
||||
display: inline-block;
|
||||
font-size: 11px;
|
||||
font-weight: 600;
|
||||
letter-spacing: 0.04em;
|
||||
padding: 2px 8px;
|
||||
text-transform: uppercase;
|
||||
}
|
||||
.badge-scheduled { background: #21262d; color: #8b949e; }
|
||||
.badge-departed { background: #0d4429; color: #3fb950; }
|
||||
.badge-arrived { background: #0d4429; color: #3fb950; }
|
||||
.badge-delayed { background: #3d2b00; color: #d29922; }
|
||||
.badge-cancelled { background: #3d0c0c; color: #f85149; }
|
||||
|
||||
.delay-pos { color: #d29922; }
|
||||
.delay-neg { color: #3fb950; }
|
||||
|
||||
/* ── pagination ── */
|
||||
.pagination {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
gap: 12px;
|
||||
padding: 12px 24px 20px;
|
||||
font-size: 13px;
|
||||
color: #8b949e;
|
||||
}
|
||||
.pagination button { height: 28px; padding: 0 12px; font-size: 12px; }
|
||||
#page-info { min-width: 120px; text-align: center; }
|
||||
|
||||
/* ── empty / loading ── */
|
||||
.state-msg {
|
||||
padding: 48px;
|
||||
text-align: center;
|
||||
color: #6e7681;
|
||||
font-size: 14px;
|
||||
}
|
||||
|
||||
/* ── mobile cards ── */
|
||||
.cards { display: none; padding: 12px 16px; }
|
||||
|
||||
.card {
|
||||
background: #161b22;
|
||||
border: 1px solid #30363d;
|
||||
border-radius: 8px;
|
||||
margin-bottom: 10px;
|
||||
padding: 14px 16px;
|
||||
}
|
||||
|
||||
.card-header {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
justify-content: space-between;
|
||||
margin-bottom: 8px;
|
||||
}
|
||||
|
||||
.card-flight { font-size: 16px; font-weight: 700; color: #e6edf3; }
|
||||
.card-airline { font-size: 12px; color: #8b949e; margin-top: 2px; }
|
||||
|
||||
.card-row {
|
||||
display: flex;
|
||||
justify-content: space-between;
|
||||
font-size: 12px;
|
||||
color: #8b949e;
|
||||
margin-top: 6px;
|
||||
}
|
||||
.card-row span:last-child { color: #c9d1d9; }
|
||||
|
||||
@media (max-width: 767px) {
|
||||
.table-wrap { display: none; }
|
||||
.cards { display: block; }
|
||||
.filters-grid { flex-direction: column; }
|
||||
.filter-actions { margin-left: 0; }
|
||||
.filter-group input,
|
||||
.filter-group select { width: 100%; }
|
||||
header { padding: 14px 16px; }
|
||||
.filters { padding: 12px 16px; }
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
|
||||
<header>
|
||||
<a href="/">← Главная</a>
|
||||
<h1>✈ Табло аэропортов Москвы</h1>
|
||||
<span id="last-updated"></span>
|
||||
</header>
|
||||
|
||||
<div class="filters">
|
||||
<div class="filters-grid">
|
||||
<div class="filter-group">
|
||||
<label>Дата от</label>
|
||||
<input type="date" id="f-date-from">
|
||||
</div>
|
||||
<div class="filter-group">
|
||||
<label>Дата до</label>
|
||||
<input type="date" id="f-date-to">
|
||||
</div>
|
||||
<div class="filter-group">
|
||||
<label>Аэропорт</label>
|
||||
<select id="f-airport">
|
||||
<option value="all">Все</option>
|
||||
<option value="SVO">SVO — Шереметьево</option>
|
||||
<option value="DME">DME — Домодедово</option>
|
||||
<option value="VKO">VKO — Внуково</option>
|
||||
<option value="ZIA">ZIA — Жуковский</option>
|
||||
</select>
|
||||
</div>
|
||||
<div class="filter-group">
|
||||
<label>Направление</label>
|
||||
<select id="f-direction">
|
||||
<option value="all">Все</option>
|
||||
<option value="departure">↑ Вылет</option>
|
||||
<option value="arrival">↓ Прилёт</option>
|
||||
</select>
|
||||
</div>
|
||||
<div class="filter-group">
|
||||
<label>Рейс</label>
|
||||
<input type="text" id="f-flight" placeholder="SU1234">
|
||||
</div>
|
||||
<div class="filter-group">
|
||||
<label>Время от</label>
|
||||
<input type="time" id="f-time-from">
|
||||
</div>
|
||||
<div class="filter-group">
|
||||
<label>Время до</label>
|
||||
<input type="time" id="f-time-to">
|
||||
</div>
|
||||
<div class="filter-actions">
|
||||
<button class="btn-primary" onclick="applyFilters()">Применить</button>
|
||||
<button class="btn-secondary" onclick="resetFilters()">Сброс</button>
|
||||
<button class="btn-export" onclick="exportCsv()">Экспорт CSV</button>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="table-wrap">
|
||||
<table id="schedule-table">
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Дата</th>
|
||||
<th>Рейс</th>
|
||||
<th>Авиакомпания</th>
|
||||
<th>Аэропорт</th>
|
||||
<th>Напр.</th>
|
||||
<th>Маршрут</th>
|
||||
<th>Запланировано</th>
|
||||
<th>Фактически</th>
|
||||
<th>Задержка</th>
|
||||
<th>Статус</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody id="table-body">
|
||||
<tr><td colspan="10" class="state-msg">Загрузка…</td></tr>
|
||||
</tbody>
|
||||
</table>
|
||||
</div>
|
||||
|
||||
<div class="cards" id="cards-container"></div>
|
||||
|
||||
<div class="pagination">
|
||||
<button class="btn-secondary" id="btn-prev" onclick="prevPage()">← Назад</button>
|
||||
<span id="page-info">—</span>
|
||||
<button class="btn-secondary" id="btn-next" onclick="nextPage()">Вперёд →</button>
|
||||
<span id="total-info" style="margin-left:auto;"></span>
|
||||
</div>
|
||||
|
||||
<script src="/static/schedule.js"></script>
|
||||
</body>
|
||||
</html>
|
||||
257
tasks/flightradar24/frontend/static/schedule.js
Normal file
257
tasks/flightradar24/frontend/static/schedule.js
Normal file
@@ -0,0 +1,257 @@
|
||||
/* schedule.js — filters, pagination, table render, CSV export */
|
||||
|
||||
const PAGE_SIZE = 100;
|
||||
let currentOffset = 0;
|
||||
let currentTotal = 0;
|
||||
|
||||
// ── init ──────────────────────────────────────────────────────────────────────
|
||||
|
||||
document.addEventListener("DOMContentLoaded", () => {
|
||||
// Default date range: last 7 days
|
||||
const today = new Date();
|
||||
const week = new Date(today);
|
||||
week.setDate(today.getDate() - 7);
|
||||
document.getElementById("f-date-from").value = fmtDate(week);
|
||||
document.getElementById("f-date-to").value = fmtDate(today);
|
||||
|
||||
loadData();
|
||||
});
|
||||
|
||||
// ── filter helpers ────────────────────────────────────────────────────────────
|
||||
|
||||
function getFilters() {
|
||||
return {
|
||||
date_from: document.getElementById("f-date-from").value || "",
|
||||
date_to: document.getElementById("f-date-to").value || "",
|
||||
airport: document.getElementById("f-airport").value,
|
||||
direction: document.getElementById("f-direction").value,
|
||||
flight_number: document.getElementById("f-flight").value.trim(),
|
||||
time_from: document.getElementById("f-time-from").value || "",
|
||||
time_to: document.getElementById("f-time-to").value || "",
|
||||
};
|
||||
}
|
||||
|
||||
function buildQuery(extra = {}) {
|
||||
const f = { ...getFilters(), limit: PAGE_SIZE, offset: currentOffset, ...extra };
|
||||
return Object.entries(f)
|
||||
.filter(([, v]) => v !== "" && v !== "all")
|
||||
.map(([k, v]) => `${encodeURIComponent(k)}=${encodeURIComponent(v)}`)
|
||||
.join("&");
|
||||
}
|
||||
|
||||
function applyFilters() {
|
||||
currentOffset = 0;
|
||||
loadData();
|
||||
}
|
||||
|
||||
function resetFilters() {
|
||||
document.getElementById("f-date-from").value = "";
|
||||
document.getElementById("f-date-to").value = "";
|
||||
document.getElementById("f-airport").value = "all";
|
||||
document.getElementById("f-direction").value = "all";
|
||||
document.getElementById("f-flight").value = "";
|
||||
document.getElementById("f-time-from").value = "";
|
||||
document.getElementById("f-time-to").value = "";
|
||||
currentOffset = 0;
|
||||
loadData();
|
||||
}
|
||||
|
||||
// ── pagination ────────────────────────────────────────────────────────────────
|
||||
|
||||
function prevPage() {
|
||||
if (currentOffset <= 0) return;
|
||||
currentOffset = Math.max(0, currentOffset - PAGE_SIZE);
|
||||
loadData();
|
||||
}
|
||||
|
||||
function nextPage() {
|
||||
if (currentOffset + PAGE_SIZE >= currentTotal) return;
|
||||
currentOffset += PAGE_SIZE;
|
||||
loadData();
|
||||
}
|
||||
|
||||
function updatePagination() {
|
||||
const page = Math.floor(currentOffset / PAGE_SIZE) + 1;
|
||||
const pages = Math.max(1, Math.ceil(currentTotal / PAGE_SIZE));
|
||||
document.getElementById("page-info").textContent = `Стр. ${page} / ${pages}`;
|
||||
document.getElementById("total-info").textContent = `Всего: ${currentTotal.toLocaleString("ru")}`;
|
||||
document.getElementById("btn-prev").disabled = currentOffset <= 0;
|
||||
document.getElementById("btn-next").disabled = currentOffset + PAGE_SIZE >= currentTotal;
|
||||
}
|
||||
|
||||
// ── data load ─────────────────────────────────────────────────────────────────
|
||||
|
||||
async function loadData() {
|
||||
setLoading(true);
|
||||
try {
|
||||
const resp = await fetch(`/api/schedule/data?${buildQuery()}`);
|
||||
if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
|
||||
const data = await resp.json();
|
||||
currentTotal = data.total || 0;
|
||||
renderTable(data.flights || []);
|
||||
renderCards(data.flights || []);
|
||||
updatePagination();
|
||||
document.getElementById("last-updated").textContent =
|
||||
"Обновлено: " + new Date().toLocaleTimeString("ru");
|
||||
} catch (e) {
|
||||
showError(e.message);
|
||||
} finally {
|
||||
setLoading(false);
|
||||
}
|
||||
}
|
||||
|
||||
// ── render table ──────────────────────────────────────────────────────────────
|
||||
|
||||
function renderTable(flights) {
|
||||
const tbody = document.getElementById("table-body");
|
||||
if (!flights.length) {
|
||||
tbody.innerHTML = `<tr><td colspan="10" class="state-msg">Нет данных по выбранным фильтрам</td></tr>`;
|
||||
return;
|
||||
}
|
||||
|
||||
tbody.innerHTML = flights.map(f => {
|
||||
const dirIcon = f.direction === "departure"
|
||||
? `<span class="dir-icon dir-dep" title="Вылет">↑</span>`
|
||||
: `<span class="dir-icon dir-arr" title="Прилёт">↓</span>`;
|
||||
|
||||
const route = routeStr(f);
|
||||
const sched = fmtTime(f.scheduled_at);
|
||||
const actual = f.actual_at ? fmtTime(f.actual_at) : "—";
|
||||
const delay = delayCell(f.delay_min);
|
||||
const badge = statusBadge(f.status);
|
||||
const dateStr = fmtDateShort(f.scheduled_at);
|
||||
|
||||
return `<tr>
|
||||
<td>${dateStr}</td>
|
||||
<td><strong>${esc(f.flight_number)}</strong></td>
|
||||
<td>${esc(f.airline || "—")}</td>
|
||||
<td>${esc(f.airport)}</td>
|
||||
<td>${dirIcon}</td>
|
||||
<td>${esc(route)}</td>
|
||||
<td>${sched}</td>
|
||||
<td>${actual}</td>
|
||||
<td>${delay}</td>
|
||||
<td>${badge}</td>
|
||||
</tr>`;
|
||||
}).join("");
|
||||
}
|
||||
|
||||
// ── render cards (mobile) ─────────────────────────────────────────────────────
|
||||
|
||||
function renderCards(flights) {
|
||||
const container = document.getElementById("cards-container");
|
||||
if (!flights.length) {
|
||||
container.innerHTML = `<div class="state-msg">Нет данных по выбранным фильтрам</div>`;
|
||||
return;
|
||||
}
|
||||
|
||||
container.innerHTML = flights.map(f => {
|
||||
const dirLabel = f.direction === "departure" ? "↑ Вылет" : "↓ Прилёт";
|
||||
const dirClass = f.direction === "departure" ? "dir-dep" : "dir-arr";
|
||||
const route = routeStr(f);
|
||||
const sched = fmtTime(f.scheduled_at);
|
||||
const actual = f.actual_at ? fmtTime(f.actual_at) : "—";
|
||||
const delay = f.delay_min != null ? `${f.delay_min > 0 ? "+" : ""}${f.delay_min} мин` : "—";
|
||||
const badge = statusBadge(f.status);
|
||||
|
||||
return `<div class="card">
|
||||
<div class="card-header">
|
||||
<div>
|
||||
<div class="card-flight">${esc(f.flight_number)}</div>
|
||||
<div class="card-airline">${esc(f.airline || "—")}</div>
|
||||
</div>
|
||||
${badge}
|
||||
</div>
|
||||
<div class="card-row"><span>Аэропорт</span><span>${esc(f.airport)}</span></div>
|
||||
<div class="card-row"><span>Направление</span><span class="${dirClass}">${dirLabel}</span></div>
|
||||
<div class="card-row"><span>Маршрут</span><span>${esc(route)}</span></div>
|
||||
<div class="card-row"><span>Запланировано</span><span>${sched}</span></div>
|
||||
<div class="card-row"><span>Фактически</span><span>${actual}</span></div>
|
||||
<div class="card-row"><span>Задержка</span><span>${delay}</span></div>
|
||||
</div>`;
|
||||
}).join("");
|
||||
}
|
||||
|
||||
// ── CSV export ────────────────────────────────────────────────────────────────
|
||||
|
||||
function exportCsv() {
|
||||
const qs = buildQuery({ limit: 100000, offset: 0 });
|
||||
window.location.href = `/api/schedule/export?${qs}`;
|
||||
}
|
||||
|
||||
// ── helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
function routeStr(f) {
|
||||
const o = f.origin || "";
|
||||
const d = f.destination || "";
|
||||
if (!o && !d) return "—";
|
||||
if (!o) return `→ ${d}`;
|
||||
if (!d) return `${o} →`;
|
||||
return `${o} → ${d}`;
|
||||
}
|
||||
|
||||
function statusBadge(status) {
|
||||
const map = {
|
||||
scheduled: "badge-scheduled",
|
||||
departed: "badge-departed",
|
||||
arrived: "badge-arrived",
|
||||
delayed: "badge-delayed",
|
||||
cancelled: "badge-cancelled",
|
||||
};
|
||||
const cls = map[status] || "badge-scheduled";
|
||||
const labels = {
|
||||
scheduled: "По расписанию",
|
||||
departed: "Вылетел",
|
||||
arrived: "Прилетел",
|
||||
delayed: "Задержан",
|
||||
cancelled: "Отменён",
|
||||
};
|
||||
return `<span class="badge ${cls}">${labels[status] || status || "—"}</span>`;
|
||||
}
|
||||
|
||||
function delayCell(min) {
|
||||
if (min == null) return "—";
|
||||
if (min > 0) return `<span class="delay-pos">+${min}</span>`;
|
||||
if (min < 0) return `<span class="delay-neg">${min}</span>`;
|
||||
return "0";
|
||||
}
|
||||
|
||||
function fmtDate(d) {
|
||||
return d.toISOString().slice(0, 10);
|
||||
}
|
||||
|
||||
function fmtDateShort(iso) {
|
||||
if (!iso) return "—";
|
||||
return iso.slice(0, 10);
|
||||
}
|
||||
|
||||
function fmtTime(iso) {
|
||||
if (!iso) return "—";
|
||||
try {
|
||||
return new Date(iso).toLocaleTimeString("ru", { hour: "2-digit", minute: "2-digit", timeZone: "Europe/Moscow" });
|
||||
} catch { return iso.slice(11, 16); }
|
||||
}
|
||||
|
||||
function esc(s) {
|
||||
if (!s) return "—";
|
||||
return String(s)
|
||||
.replace(/&/g, "&")
|
||||
.replace(/</g, "<")
|
||||
.replace(/>/g, ">");
|
||||
}
|
||||
|
||||
function setLoading(on) {
|
||||
if (on) {
|
||||
document.getElementById("table-body").innerHTML =
|
||||
`<tr><td colspan="10" class="state-msg">Загрузка…</td></tr>`;
|
||||
document.getElementById("cards-container").innerHTML =
|
||||
`<div class="state-msg">Загрузка…</div>`;
|
||||
}
|
||||
}
|
||||
|
||||
function showError(msg) {
|
||||
document.getElementById("table-body").innerHTML =
|
||||
`<tr><td colspan="10" class="state-msg" style="color:#f85149">Ошибка: ${esc(msg)}</td></tr>`;
|
||||
document.getElementById("cards-container").innerHTML =
|
||||
`<div class="state-msg" style="color:#f85149">Ошибка: ${esc(msg)}</div>`;
|
||||
}
|
||||
13
tasks/flightradar24/ingest/schedule/Dockerfile
Normal file
13
tasks/flightradar24/ingest/schedule/Dockerfile
Normal file
@@ -0,0 +1,13 @@
|
||||
FROM python:3.11-slim
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY requirements.txt .
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
COPY *.py ./
|
||||
|
||||
HEALTHCHECK --interval=60s --timeout=5s --start-period=15s \
|
||||
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=3)" || exit 1
|
||||
|
||||
CMD ["python", "main.py"]
|
||||
121
tasks/flightradar24/ingest/schedule/backfill.py
Normal file
121
tasks/flightradar24/ingest/schedule/backfill.py
Normal file
@@ -0,0 +1,121 @@
|
||||
"""
|
||||
Backfill CLI — loads historical schedule data for a date range.
|
||||
Saves progress to fr24_ext.load_state so it can resume after interruption.
|
||||
|
||||
Usage:
|
||||
python backfill.py --start-date 2026-04-01 --end-date 2026-04-19
|
||||
python backfill.py --start-date 2026-04-01 --end-date 2026-04-19 --skip-opensky
|
||||
"""
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
from datetime import date, timedelta
|
||||
|
||||
import psycopg2
|
||||
|
||||
from config import config
|
||||
from yandex_worker import fetch_day as yandex_fetch_day
|
||||
from opensky_worker import enrich_day as opensky_enrich_day
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
STATE_KEY = "backfill_last_date"
|
||||
|
||||
|
||||
def load_state(conn, key: str):
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"SELECT state_value FROM fr24_ext.load_state WHERE state_key = %s",
|
||||
(key,),
|
||||
)
|
||||
row = cur.fetchone()
|
||||
return row[0] if row else None
|
||||
|
||||
|
||||
def save_state(conn, key: str, value: dict):
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO fr24_ext.load_state (state_key, state_value)
|
||||
VALUES (%s, %s::jsonb)
|
||||
ON CONFLICT (state_key) DO UPDATE
|
||||
SET state_value = EXCLUDED.state_value,
|
||||
updated_at = now()
|
||||
""",
|
||||
(key, json.dumps(value)),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Backfill fr24_ext.schedule")
|
||||
parser.add_argument("--start-date", required=True, help="YYYY-MM-DD")
|
||||
parser.add_argument("--end-date", required=True, help="YYYY-MM-DD")
|
||||
parser.add_argument("--skip-opensky", action="store_true",
|
||||
help="Skip OpenSky enrichment (faster, no icao24)")
|
||||
parser.add_argument("--resume", action="store_true",
|
||||
help="Resume from last saved state (ignores --start-date if state exists)")
|
||||
args = parser.parse_args()
|
||||
|
||||
start = date.fromisoformat(args.start_date)
|
||||
end = date.fromisoformat(args.end_date)
|
||||
|
||||
if start > end:
|
||||
log.error("start-date must be <= end-date")
|
||||
sys.exit(1)
|
||||
|
||||
conn = psycopg2.connect(config.DB_DSN)
|
||||
|
||||
# Resume from saved state if requested
|
||||
if args.resume:
|
||||
state = load_state(conn, STATE_KEY)
|
||||
if state and state.get("last_date"):
|
||||
last = date.fromisoformat(state["last_date"])
|
||||
resume_from = last + timedelta(days=1)
|
||||
if resume_from > start:
|
||||
log.info("Resuming from %s (last completed: %s)", resume_from, last)
|
||||
start = resume_from
|
||||
|
||||
current = start
|
||||
total_flights = 0
|
||||
total_enriched = 0
|
||||
|
||||
log.info("Backfill: %s → %s (%d days)", start, end, (end - start).days + 1)
|
||||
|
||||
while current <= end:
|
||||
log.info("── Processing %s ──", current)
|
||||
|
||||
try:
|
||||
yandex_count = yandex_fetch_day(current, conn)
|
||||
total_flights += yandex_count
|
||||
log.info("Yandex: %d flights", yandex_count)
|
||||
|
||||
if not args.skip_opensky:
|
||||
opensky_count = opensky_enrich_day(current, conn)
|
||||
total_enriched += opensky_count
|
||||
log.info("OpenSky: %d enriched", opensky_count)
|
||||
|
||||
save_state(conn, STATE_KEY, {"last_date": current.isoformat()})
|
||||
|
||||
except KeyboardInterrupt:
|
||||
log.info("Interrupted. Progress saved up to %s", current - timedelta(days=1))
|
||||
break
|
||||
except Exception as e:
|
||||
log.error("Failed on %s: %s — stopping", current, e)
|
||||
break
|
||||
|
||||
current += timedelta(days=1)
|
||||
|
||||
conn.close()
|
||||
log.info("Backfill done. Flights: %d, Enriched: %d", total_flights, total_enriched)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [backfill] %(levelname)s %(message)s",
|
||||
datefmt="%Y-%m-%dT%H:%M:%S",
|
||||
handlers=[logging.StreamHandler(sys.stdout)],
|
||||
)
|
||||
main()
|
||||
48
tasks/flightradar24/ingest/schedule/config.py
Normal file
48
tasks/flightradar24/ingest/schedule/config.py
Normal file
@@ -0,0 +1,48 @@
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Dict
|
||||
|
||||
|
||||
@dataclass
|
||||
class Config:
|
||||
# Database
|
||||
DB_HOST: str = os.getenv("POSTGRES_HOST", "fr24-postgres")
|
||||
DB_PORT: int = int(os.getenv("POSTGRES_PORT", "5432"))
|
||||
DB_NAME: str = os.getenv("POSTGRES_DB", "fr24")
|
||||
DB_USER: str = os.getenv("POSTGRES_USER", "fr24")
|
||||
DB_PASSWORD: str = os.getenv("POSTGRES_PASSWORD", "change-me")
|
||||
|
||||
# API keys
|
||||
YANDEX_RASP_API_KEY: str = os.getenv("YANDEX_RASP_API_KEY", "")
|
||||
OPENSKY_USERNAME: str = os.getenv("OPENSKY_USERNAME", "")
|
||||
OPENSKY_PASSWORD: str = os.getenv("OPENSKY_PASSWORD", "")
|
||||
|
||||
# Airports: IATA → metadata
|
||||
AIRPORTS: Dict = field(default_factory=lambda: {
|
||||
"SVO": {"iata": "SVO", "icao": "UUEE", "yandex_code": "s9600213", "name": "Шереметьево"},
|
||||
"DME": {"iata": "DME", "icao": "UUDD", "yandex_code": "s9600366", "name": "Домодедово"},
|
||||
"VKO": {"iata": "VKO", "icao": "UUWW", "yandex_code": "s9600215", "name": "Внуково"},
|
||||
"ZIA": {"iata": "ZIA", "icao": "UUBW", "yandex_code": "s9881291", "name": "Жуковский"},
|
||||
})
|
||||
|
||||
# Rate limits (seconds between requests)
|
||||
YANDEX_RATE_LIMIT_SEC: float = float(os.getenv("YANDEX_RATE_LIMIT_SEC", "1.0"))
|
||||
OPENSKY_RATE_LIMIT_SEC: float = float(os.getenv("OPENSKY_RATE_LIMIT_SEC", "30.0"))
|
||||
|
||||
# Retention
|
||||
RETENTION_DAYS: int = int(os.getenv("SCHEDULE_RETENTION_DAYS", "1095"))
|
||||
|
||||
# Cron schedule (UTC)
|
||||
DAILY_RUN_HOUR: int = 2
|
||||
DAILY_RUN_MINUTE: int = 0
|
||||
|
||||
@property
|
||||
def DB_DSN(self) -> str:
|
||||
return (
|
||||
f"host={self.DB_HOST} port={self.DB_PORT} "
|
||||
f"dbname={self.DB_NAME} user={self.DB_USER} "
|
||||
f"password={self.DB_PASSWORD}"
|
||||
)
|
||||
|
||||
|
||||
config = Config()
|
||||
111
tasks/flightradar24/ingest/schedule/main.py
Normal file
111
tasks/flightradar24/ingest/schedule/main.py
Normal file
@@ -0,0 +1,111 @@
|
||||
"""
|
||||
fr24-schedule main entry point.
|
||||
- APScheduler cron: daily_job() at 02:00 UTC
|
||||
- Flask healthcheck on port 8000
|
||||
- Cleanup job at 03:00 UTC (retention)
|
||||
"""
|
||||
import logging
|
||||
import sys
|
||||
import time
|
||||
from datetime import date, timedelta, datetime, timezone
|
||||
|
||||
import psycopg2
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
from flask import Flask, jsonify
|
||||
|
||||
from config import config
|
||||
from yandex_worker import fetch_day as yandex_fetch_day
|
||||
from opensky_worker import enrich_day as opensky_enrich_day
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [schedule] %(levelname)s %(message)s",
|
||||
datefmt="%Y-%m-%dT%H:%M:%S",
|
||||
handlers=[logging.StreamHandler(sys.stdout)],
|
||||
)
|
||||
log = logging.getLogger("schedule")
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
_last_run: dict = {"at": None, "status": "never", "flights": 0, "enriched": 0}
|
||||
_conn = None
|
||||
|
||||
|
||||
def get_conn():
|
||||
global _conn
|
||||
if _conn is None or _conn.closed:
|
||||
_conn = psycopg2.connect(config.DB_DSN)
|
||||
log.info("DB connection established")
|
||||
return _conn
|
||||
|
||||
|
||||
def daily_job():
|
||||
"""T-1: load yesterday's schedule from Yandex then enrich with OpenSky."""
|
||||
target = date.today() - timedelta(days=1)
|
||||
log.info("daily_job: starting for %s", target)
|
||||
_last_run["at"] = datetime.now(timezone.utc).isoformat()
|
||||
_last_run["status"] = "running"
|
||||
|
||||
try:
|
||||
conn = get_conn()
|
||||
flights = yandex_fetch_day(target, conn)
|
||||
enriched = opensky_enrich_day(target, conn)
|
||||
_last_run.update(status="ok", flights=flights, enriched=enriched)
|
||||
log.info("daily_job: done — %d flights, %d enriched", flights, enriched)
|
||||
except Exception as e:
|
||||
_last_run["status"] = f"error: {e}"
|
||||
log.error("daily_job failed: %s", e)
|
||||
|
||||
|
||||
def cleanup_job():
|
||||
"""Delete records older than RETENTION_DAYS."""
|
||||
try:
|
||||
conn = get_conn()
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"DELETE FROM fr24_ext.schedule WHERE flight_date < CURRENT_DATE - %s::int * INTERVAL '1 day'",
|
||||
(config.RETENTION_DAYS,),
|
||||
)
|
||||
deleted = cur.rowcount
|
||||
conn.commit()
|
||||
log.info("cleanup_job: deleted %d old records (retention=%d days)", deleted, config.RETENTION_DAYS)
|
||||
except Exception as e:
|
||||
log.error("cleanup_job failed: %s", e)
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
def health():
|
||||
try:
|
||||
get_conn().cursor().execute("SELECT 1")
|
||||
db_ok = True
|
||||
except Exception:
|
||||
db_ok = False
|
||||
return jsonify({
|
||||
"status": "ok" if db_ok else "degraded",
|
||||
"db": "ok" if db_ok else "error",
|
||||
"last_run": _last_run,
|
||||
}), 200 if db_ok else 503
|
||||
|
||||
|
||||
def wait_for_db(max_attempts: int = 30):
|
||||
for attempt in range(1, max_attempts + 1):
|
||||
try:
|
||||
get_conn()
|
||||
return
|
||||
except psycopg2.OperationalError as e:
|
||||
log.warning("DB not ready (%d/%d): %s", attempt, max_attempts, e)
|
||||
time.sleep(3)
|
||||
log.error("Could not connect to DB after %d attempts", max_attempts)
|
||||
raise SystemExit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
wait_for_db()
|
||||
|
||||
scheduler = BackgroundScheduler(timezone="UTC")
|
||||
scheduler.add_job(daily_job, "cron", hour=config.DAILY_RUN_HOUR, minute=config.DAILY_RUN_MINUTE)
|
||||
scheduler.add_job(cleanup_job, "cron", hour=3, minute=0)
|
||||
scheduler.start()
|
||||
log.info("Scheduler started — daily job at %02d:%02d UTC", config.DAILY_RUN_HOUR, config.DAILY_RUN_MINUTE)
|
||||
|
||||
app.run(host="0.0.0.0", port=8000, debug=False)
|
||||
141
tasks/flightradar24/ingest/schedule/opensky_worker.py
Normal file
141
tasks/flightradar24/ingest/schedule/opensky_worker.py
Normal file
@@ -0,0 +1,141 @@
|
||||
"""
|
||||
OpenSky Network API worker — enriches fr24_ext.schedule with icao24 + actual times.
|
||||
"""
|
||||
import logging
|
||||
import time
|
||||
from datetime import date, datetime, timezone
|
||||
from typing import Dict, List, Optional
|
||||
from functools import wraps
|
||||
|
||||
import requests
|
||||
import psycopg2
|
||||
|
||||
from config import config
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
OPENSKY_BASE = "https://opensky-network.org/api/flights"
|
||||
|
||||
|
||||
# ── retry decorator ───────────────────────────────────────────────────────────
|
||||
|
||||
def retry(max_retries: int = 3, base_delay: float = 10.0):
|
||||
def decorator(func):
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except requests.RequestException as e:
|
||||
if attempt < max_retries - 1:
|
||||
wait = base_delay * (2 ** attempt)
|
||||
log.warning("Retry %d/%d after %.0fs: %s", attempt + 1, max_retries, wait, e)
|
||||
time.sleep(wait)
|
||||
else:
|
||||
raise
|
||||
return wrapper
|
||||
return decorator
|
||||
|
||||
|
||||
# ── API fetch ─────────────────────────────────────────────────────────────────
|
||||
|
||||
@retry(max_retries=3, base_delay=10.0)
|
||||
def fetch_opensky_flights(icao: str, begin_ts: int, end_ts: int, direction: str) -> List[Dict]:
|
||||
"""
|
||||
Fetch arrivals or departures from OpenSky for one airport/day.
|
||||
direction: 'arrival' | 'departure'
|
||||
"""
|
||||
url = f"{OPENSKY_BASE}/{direction}"
|
||||
params = {"airport": icao, "begin": begin_ts, "end": end_ts}
|
||||
|
||||
auth = None
|
||||
if config.OPENSKY_USERNAME and config.OPENSKY_PASSWORD:
|
||||
auth = (config.OPENSKY_USERNAME, config.OPENSKY_PASSWORD)
|
||||
|
||||
resp = requests.get(url, params=params, auth=auth, timeout=60)
|
||||
|
||||
# 404 means no data for this period — not an error
|
||||
if resp.status_code == 404:
|
||||
return []
|
||||
|
||||
resp.raise_for_status()
|
||||
return resp.json() or []
|
||||
|
||||
|
||||
# ── DB enrichment ─────────────────────────────────────────────────────────────
|
||||
|
||||
def enrich_flights(conn, opensky_flights: List[Dict], airport_iata: str, direction: str) -> int:
|
||||
"""
|
||||
Update icao24 + actual_at on existing schedule rows matched by callsign.
|
||||
Match window: ±3 hours around the OpenSky actual time.
|
||||
"""
|
||||
if not opensky_flights:
|
||||
return 0
|
||||
|
||||
enriched = 0
|
||||
with conn.cursor() as cur:
|
||||
for flight in opensky_flights:
|
||||
icao24 = (flight.get("icao24") or "").strip().lower()
|
||||
callsign = (flight.get("callsign") or "").strip()
|
||||
|
||||
if not icao24 or not callsign:
|
||||
continue
|
||||
|
||||
# actual time: lastSeen for arrivals, firstSeen for departures
|
||||
actual_ts = (
|
||||
flight.get("lastSeen") if direction == "arrival"
|
||||
else flight.get("firstSeen")
|
||||
)
|
||||
if not actual_ts:
|
||||
continue
|
||||
|
||||
actual_at = datetime.fromtimestamp(actual_ts, tz=timezone.utc)
|
||||
|
||||
cur.execute(
|
||||
"""
|
||||
UPDATE fr24_ext.schedule
|
||||
SET
|
||||
icao24 = %s,
|
||||
actual_at = %s,
|
||||
source = CASE WHEN source = 'yandex' THEN 'merged' ELSE source END
|
||||
WHERE airport_iata = %s
|
||||
AND direction = %s
|
||||
AND flight_number = %s
|
||||
AND scheduled_at BETWEEN %s - INTERVAL '3 hours'
|
||||
AND %s + INTERVAL '3 hours'
|
||||
""",
|
||||
(icao24, actual_at, airport_iata, direction, callsign, actual_at, actual_at),
|
||||
)
|
||||
enriched += cur.rowcount
|
||||
|
||||
return enriched
|
||||
|
||||
|
||||
# ── main entry ────────────────────────────────────────────────────────────────
|
||||
|
||||
def enrich_day(target_date: date, conn) -> int:
|
||||
"""Enrich all airports for one day. Returns total rows updated."""
|
||||
# Unix timestamps for start/end of the day (UTC)
|
||||
day_start = int(datetime(target_date.year, target_date.month, target_date.day,
|
||||
tzinfo=timezone.utc).timestamp())
|
||||
day_end = day_start + 86400
|
||||
|
||||
total = 0
|
||||
|
||||
for airport_iata, airport_info in config.AIRPORTS.items():
|
||||
icao = airport_info["icao"]
|
||||
log.info("OpenSky: enriching %s (%s) for %s", airport_iata, icao, target_date)
|
||||
|
||||
for direction in ("arrival", "departure"):
|
||||
try:
|
||||
flights = fetch_opensky_flights(icao, day_start, day_end, direction)
|
||||
count = enrich_flights(conn, flights, airport_iata, direction)
|
||||
total += count
|
||||
log.info("OpenSky: %s %s → %d rows enriched", airport_iata, direction, count)
|
||||
except Exception as e:
|
||||
log.error("OpenSky: failed %s %s: %s", airport_iata, direction, e)
|
||||
|
||||
time.sleep(config.OPENSKY_RATE_LIMIT_SEC)
|
||||
|
||||
conn.commit()
|
||||
return total
|
||||
4
tasks/flightradar24/ingest/schedule/requirements.txt
Normal file
4
tasks/flightradar24/ingest/schedule/requirements.txt
Normal file
@@ -0,0 +1,4 @@
|
||||
requests==2.31.0
|
||||
psycopg2-binary==2.9.9
|
||||
APScheduler==3.10.4
|
||||
Flask==3.0.3
|
||||
205
tasks/flightradar24/ingest/schedule/yandex_worker.py
Normal file
205
tasks/flightradar24/ingest/schedule/yandex_worker.py
Normal file
@@ -0,0 +1,205 @@
|
||||
"""
|
||||
Yandex.Rasp API worker — loads airport schedule into fr24_ext.schedule.
|
||||
Makes two requests per airport per day: event=departure and event=arrival.
|
||||
"""
|
||||
import logging
|
||||
import time
|
||||
from datetime import date
|
||||
from typing import Dict, List, Optional
|
||||
from functools import wraps
|
||||
|
||||
import requests
|
||||
import psycopg2
|
||||
|
||||
from config import config
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
YANDEX_URL = "https://api.rasp.yandex.net/v3.0/schedule/"
|
||||
|
||||
|
||||
# ── retry decorator ───────────────────────────────────────────────────────────
|
||||
|
||||
def retry(max_retries: int = 3, base_delay: float = 5.0):
|
||||
def decorator(func):
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except requests.RequestException as e:
|
||||
if attempt < max_retries - 1:
|
||||
wait = base_delay * (2 ** attempt)
|
||||
log.warning("Retry %d/%d after %.0fs: %s", attempt + 1, max_retries, wait, e)
|
||||
time.sleep(wait)
|
||||
else:
|
||||
raise
|
||||
return wrapper
|
||||
return decorator
|
||||
|
||||
|
||||
# ── API fetch ─────────────────────────────────────────────────────────────────
|
||||
|
||||
@retry(max_retries=3, base_delay=5.0)
|
||||
def _fetch_page(yandex_code: str, target_date: date, event: str, offset: int = 0) -> dict:
|
||||
params = {
|
||||
"apikey": config.YANDEX_RASP_API_KEY,
|
||||
"station": yandex_code,
|
||||
"date": target_date.isoformat(),
|
||||
"transport_types": "plane",
|
||||
"event": event,
|
||||
"offset": offset,
|
||||
"limit": 100,
|
||||
}
|
||||
resp = requests.get(YANDEX_URL, params=params, timeout=30)
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
|
||||
|
||||
def fetch_airport_schedule(yandex_code: str, target_date: date, direction: str) -> List[Dict]:
|
||||
"""
|
||||
Fetches all flights for one airport/direction with pagination.
|
||||
direction: 'departure' | 'arrival'
|
||||
"""
|
||||
flights = []
|
||||
offset = 0
|
||||
|
||||
while True:
|
||||
data = _fetch_page(yandex_code, target_date, event=direction, offset=offset)
|
||||
items = data.get("schedule", [])
|
||||
pagination = data.get("pagination", {})
|
||||
|
||||
for item in items:
|
||||
flight = _parse_item(item, direction)
|
||||
if flight:
|
||||
flights.append(flight)
|
||||
|
||||
total = pagination.get("total", 0)
|
||||
offset += len(items)
|
||||
|
||||
if offset >= total or not items:
|
||||
break
|
||||
|
||||
time.sleep(config.YANDEX_RATE_LIMIT_SEC)
|
||||
|
||||
return flights
|
||||
|
||||
|
||||
def _parse_item(item: Dict, direction: str) -> Optional[Dict]:
|
||||
thread = item.get("thread", {})
|
||||
flight_number = thread.get("number", "").strip()
|
||||
if not flight_number:
|
||||
return None
|
||||
|
||||
carrier = thread.get("carrier", {})
|
||||
|
||||
# Scheduled time: departure event → use 'departure' field; arrival → 'arrival'
|
||||
scheduled_at = item.get("departure") if direction == "departure" else item.get("arrival")
|
||||
if not scheduled_at:
|
||||
# fallback
|
||||
scheduled_at = item.get("departure") or item.get("arrival")
|
||||
if not scheduled_at:
|
||||
return None
|
||||
|
||||
# Route: from/to station objects
|
||||
from_station = item.get("from", {}) or {}
|
||||
to_station = item.get("to", {}) or {}
|
||||
origin_iata = _station_iata(from_station)
|
||||
destination_iata = _station_iata(to_station)
|
||||
|
||||
return {
|
||||
"flight_number": flight_number,
|
||||
"airline_iata": carrier.get("code"),
|
||||
"airline_name": carrier.get("title"),
|
||||
"origin_iata": origin_iata,
|
||||
"destination_iata": destination_iata,
|
||||
"aircraft_type": thread.get("vehicle"),
|
||||
"scheduled_at": scheduled_at,
|
||||
"direction": direction,
|
||||
"status": "scheduled",
|
||||
"source": "yandex",
|
||||
}
|
||||
|
||||
|
||||
def _station_iata(station: Dict) -> Optional[str]:
|
||||
"""Extract IATA code from station dict (codes list or direct field)."""
|
||||
codes = station.get("codes", {})
|
||||
if isinstance(codes, dict):
|
||||
iata = codes.get("iata")
|
||||
if iata:
|
||||
return iata[:3].upper()
|
||||
# fallback: station code field
|
||||
code = station.get("code", "")
|
||||
if code and not code.startswith("s"): # yandex internal codes start with 's'
|
||||
return code[:3].upper()
|
||||
return None
|
||||
|
||||
|
||||
# ── DB upsert ─────────────────────────────────────────────────────────────────
|
||||
|
||||
def upsert_flights(conn, flights: List[Dict], airport_iata: str, flight_date: date) -> int:
|
||||
if not flights:
|
||||
return 0
|
||||
|
||||
with conn.cursor() as cur:
|
||||
for flight in flights:
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO fr24_ext.schedule
|
||||
(flight_date, airport_iata, direction, flight_number,
|
||||
airline_iata, airline_name, origin_iata, destination_iata,
|
||||
aircraft_type, scheduled_at, status, source)
|
||||
VALUES
|
||||
(%(flight_date)s, %(airport_iata)s, %(direction)s, %(flight_number)s,
|
||||
%(airline_iata)s, %(airline_name)s, %(origin_iata)s, %(destination_iata)s,
|
||||
%(aircraft_type)s, %(scheduled_at)s, %(status)s, %(source)s)
|
||||
ON CONFLICT (flight_number, airport_iata, scheduled_at, direction)
|
||||
DO UPDATE SET
|
||||
airline_name = EXCLUDED.airline_name,
|
||||
origin_iata = COALESCE(EXCLUDED.origin_iata, fr24_ext.schedule.origin_iata),
|
||||
destination_iata = COALESCE(EXCLUDED.destination_iata, fr24_ext.schedule.destination_iata),
|
||||
aircraft_type = COALESCE(EXCLUDED.aircraft_type, fr24_ext.schedule.aircraft_type),
|
||||
status = EXCLUDED.status,
|
||||
fetched_at = now()
|
||||
""",
|
||||
{
|
||||
"flight_date": flight_date,
|
||||
"airport_iata": airport_iata,
|
||||
"direction": flight["direction"],
|
||||
"flight_number": flight["flight_number"],
|
||||
"airline_iata": flight.get("airline_iata"),
|
||||
"airline_name": flight.get("airline_name"),
|
||||
"origin_iata": flight.get("origin_iata"),
|
||||
"destination_iata": flight.get("destination_iata"),
|
||||
"aircraft_type": flight.get("aircraft_type"),
|
||||
"scheduled_at": flight["scheduled_at"],
|
||||
"status": flight.get("status", "scheduled"),
|
||||
"source": flight["source"],
|
||||
},
|
||||
)
|
||||
return len(flights)
|
||||
|
||||
|
||||
# ── main entry ────────────────────────────────────────────────────────────────
|
||||
|
||||
def fetch_day(target_date: date, conn) -> int:
|
||||
"""Load schedule for all airports for one day. Returns total flights upserted."""
|
||||
total = 0
|
||||
|
||||
for airport_iata, airport_info in config.AIRPORTS.items():
|
||||
yandex_code = airport_info["yandex_code"]
|
||||
log.info("Yandex: fetching %s (%s) for %s", airport_iata, yandex_code, target_date)
|
||||
|
||||
for direction in ("departure", "arrival"):
|
||||
try:
|
||||
flights = fetch_airport_schedule(yandex_code, target_date, direction)
|
||||
count = upsert_flights(conn, flights, airport_iata, target_date)
|
||||
total += count
|
||||
log.info("Yandex: %s %s → %d flights upserted", airport_iata, direction, count)
|
||||
except Exception as e:
|
||||
log.error("Yandex: failed %s %s: %s", airport_iata, direction, e)
|
||||
|
||||
time.sleep(config.YANDEX_RATE_LIMIT_SEC)
|
||||
|
||||
conn.commit()
|
||||
return total
|
||||
Reference in New Issue
Block a user