import subprocess, json, time, threading from http.server import HTTPServer, BaseHTTPRequestHandler import pyModeS as pms aircraft, cpr = {}, {} HTML = """ ADS-B Радар
""" def decode(): proc = subprocess.Popen(["rtl_adsb.exe"], stdout=subprocess.PIPE, text=True) for line in proc.stdout: line = line.strip() if not (line.startswith('*') and line.endswith(';')): continue msg = line[1:-1] if len(msg) not in (14, 28): continue try: if pms.df(msg) != 17: continue icao = pms.icao(msg) tc = pms.typecode(msg) if icao not in aircraft: aircraft[icao] = {'icao': icao} aircraft[icao]['seen'] = time.time() aircraft[icao]['msgs'] = aircraft[icao].get('msgs', 0) + 1 if 1 <= tc <= 4: cs = pms.adsb.callsign(msg).strip() if cs: aircraft[icao]['flight'] = cs elif 9 <= tc <= 18: alt = pms.adsb.altitude(msg) if alt: aircraft[icao]['altitude'] = alt oe = pms.adsb.oe_flag(msg) cpr.setdefault(icao, {})[oe] = (msg, time.time()) try: pos = pms.adsb.position_with_ref(msg, 55.75, 37.62) if pos: aircraft[icao]['lat'], aircraft[icao]['lon'] = pos except: pass if 0 in cpr[icao] and 1 in cpr[icao]: m0,t0 = cpr[icao][0]; m1,t1 = cpr[icao][1] if abs(t0-t1) < 10: try: pos = pms.adsb.position(m0, m1, t0, t1) if pos: aircraft[icao]['lat'], aircraft[icao]['lon'] = pos except: pass elif tc == 19: v = pms.adsb.velocity(msg) if v: aircraft[icao]['speed'], aircraft[icao]['track'] = v[0], v[1] except: pass class H(BaseHTTPRequestHandler): def do_GET(self): if self.path == '/aircraft.json': now = time.time() active = [v for v in aircraft.values() if now - v.get('seen',0) < 120] body = json.dumps({'now': now, 'aircraft': active}).encode() self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_header('Access-Control-Allow-Origin', '*') self.end_headers() self.wfile.write(body) elif self.path == '/': body = HTML.encode('utf-8') self.send_response(200) self.send_header('Content-Type', 'text/html; charset=utf-8') self.end_headers() self.wfile.write(body) else: self.send_response(404) self.end_headers() def log_message(self, *a): pass threading.Thread(target=decode, daemon=True).start() print("Запущен!") print(" Карта: http://localhost:8080/") print(" API: http://localhost:8080/aircraft.json") HTTPServer(('0.0.0.0', 8080), H).serve_forever()