362 lines
13 KiB
Python
362 lines
13 KiB
Python
"""AppDaemon app: Device availability calculator for Home Assistant.
|
|
|
|
Calculates uptime metrics for tracked devices and creates sensor.avail_* entities.
|
|
"""
|
|
|
|
import appdaemon.plugins.hass.hassapi as hass
|
|
import requests
|
|
from datetime import datetime, timezone, timedelta
|
|
from availability_utils import (
|
|
is_excluded, sanitize_entity_id, sanitize_area_name,
|
|
get_color, calc_trend, format_downtime,
|
|
parse_datetime, period_timedelta, chunk_list,
|
|
compute_availability, compute_sparkline,
|
|
TRACKED_DOMAINS,
|
|
)
|
|
|
|
BATCH_SIZE = 20
|
|
BATCH_DELAY = 1 # seconds between batch requests
|
|
HA_URL = "http://supervisor/core/api"
|
|
|
|
|
|
class Availability(hass.Hass):
|
|
def initialize(self):
|
|
# Ensure websocket-client is available for registry queries
|
|
try:
|
|
import websocket
|
|
except ImportError:
|
|
import subprocess
|
|
subprocess.check_call(["pip", "install", "websocket-client", "-q"])
|
|
self.log("Installed websocket-client")
|
|
|
|
self._token = None
|
|
self._entities_cache = []
|
|
self._areas_cache = {}
|
|
self._current_period = "7d"
|
|
self._results_cache = {"24h": {}, "7d": {}, "30d": {}}
|
|
|
|
# Cold-start: full recalc after 30s
|
|
self.run_in(self._cold_start, 30)
|
|
|
|
# Periodic schedules
|
|
self.run_every(self._calc_24h, "now+60", 5 * 60)
|
|
self.run_every(self._calc_7d, "now+90", 15 * 60)
|
|
self.run_every(self._calc_30d, "now+120", 2 * 3600)
|
|
|
|
# Listen for period selector change
|
|
self.listen_state(self._period_changed, "input_select.avail_period")
|
|
|
|
self.log("Availability app initialized")
|
|
|
|
def _get_token(self):
|
|
if self._token is None:
|
|
import os
|
|
self._token = os.environ.get("SUPERVISOR_TOKEN", "")
|
|
if not self._token:
|
|
self._token = self.args.get("token", "")
|
|
return self._token
|
|
|
|
def _api_headers(self):
|
|
return {
|
|
"Authorization": f"Bearer {self._get_token()}",
|
|
"Content-Type": "application/json",
|
|
}
|
|
|
|
def _api_get(self, path):
|
|
url = f"{HA_URL}{path}"
|
|
try:
|
|
resp = requests.get(url, headers=self._api_headers(), timeout=30)
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
except Exception as e:
|
|
self.log(f"API error {path}: {e}", level="WARNING")
|
|
return None
|
|
|
|
# ── Entity Discovery ──
|
|
|
|
def _fetch_entities(self):
|
|
"""Get all entities from HA and filter by domain + exclusions."""
|
|
data = self._api_get("/states")
|
|
if not data:
|
|
self.log("Failed to fetch entities", level="ERROR")
|
|
return []
|
|
|
|
entities = []
|
|
for e in data:
|
|
eid = e.get("entity_id", "")
|
|
domain = eid.split(".")[0] if "." in eid else ""
|
|
if domain not in TRACKED_DOMAINS:
|
|
continue
|
|
if is_excluded(eid):
|
|
continue
|
|
entities.append({
|
|
"entity_id": eid,
|
|
"friendly_name": e.get("attributes", {}).get("friendly_name", eid),
|
|
"domain": domain,
|
|
})
|
|
self._entities_cache = entities
|
|
self.log(f"Found {len(entities)} tracked entities")
|
|
return entities
|
|
|
|
def _fetch_areas(self):
|
|
"""Fetch area registry via WebSocket and build entity_id -> area mapping.
|
|
|
|
HA registry APIs are only available via WebSocket, not REST.
|
|
We use a synchronous websocket-client to fetch the data.
|
|
"""
|
|
try:
|
|
import websocket
|
|
import json as _json
|
|
except ImportError:
|
|
self.log("websocket-client not available, areas will be unavailable", level="WARNING")
|
|
self._areas_cache = {}
|
|
return {}
|
|
|
|
token = self._get_token()
|
|
if not token:
|
|
self._areas_cache = {}
|
|
return {}
|
|
|
|
# Connect to HA WebSocket
|
|
# From addon, use homeassistant:8123 (not supervisor proxy)
|
|
ws_url = "ws://homeassistant:8123/api/websocket"
|
|
try:
|
|
ws = websocket.create_connection(ws_url, timeout=10,
|
|
header=["Authorization: Bearer " + token])
|
|
except Exception as e:
|
|
self.log(f"WebSocket connect failed: {e}", level="WARNING")
|
|
self._areas_cache = {}
|
|
return {}
|
|
|
|
def _ws_call(msg_type):
|
|
"""Send a WebSocket command and return the result."""
|
|
# Auth
|
|
result = _json.loads(ws.recv())
|
|
if result.get("type") == "auth_required":
|
|
ws.send(_json.dumps({"type": "auth", "access_token": token}))
|
|
result = _json.loads(ws.recv())
|
|
if result.get("type") != "auth_ok":
|
|
return None
|
|
# Send command
|
|
ws.send(_json.dumps({"id": msg_type.__hash__() % 10000, "type": msg_type}))
|
|
result = _json.loads(ws.recv())
|
|
return result.get("result") if result.get("success") else None
|
|
|
|
# Fetch area registry
|
|
areas_data = _ws_call("config/area_registry/list")
|
|
if not areas_data:
|
|
self.log("Failed to fetch area registry via WebSocket", level="WARNING")
|
|
ws.close()
|
|
self._areas_cache = {}
|
|
return {}
|
|
|
|
# Build area_id -> area_name map
|
|
area_map = {}
|
|
for area in areas_data:
|
|
area_map[area["area_id"]] = area.get("name", "Без комнаты")
|
|
|
|
# Fetch device registry
|
|
dev_data = _ws_call("config/device_registry/list")
|
|
dev_area = {}
|
|
if dev_data and isinstance(dev_data, dict):
|
|
dev_data = dev_data.get("devices", dev_data) if isinstance(dev_data, dict) else dev_data
|
|
if dev_data and isinstance(dev_data, list):
|
|
for dev in dev_data:
|
|
dev_id = dev.get("id")
|
|
a_id = dev.get("area_id")
|
|
if dev_id and a_id and a_id in area_map:
|
|
dev_area[dev_id] = area_map[a_id]
|
|
|
|
# Fetch entity registry
|
|
ent_data = _ws_call("config/entity_registry/list")
|
|
eid_to_area = {}
|
|
if ent_data and isinstance(ent_data, dict):
|
|
ent_data = ent_data.get("entities", ent_data)
|
|
if ent_data and isinstance(ent_data, list):
|
|
for entry in ent_data:
|
|
eid = entry.get("entity_id", "")
|
|
a_id = entry.get("area_id")
|
|
dev_id = entry.get("device_id")
|
|
if a_id and a_id in area_map:
|
|
eid_to_area[eid] = area_map[a_id]
|
|
elif dev_id and dev_id in dev_area:
|
|
eid_to_area[eid] = dev_area[dev_id]
|
|
else:
|
|
eid_to_area[eid] = "Без комнаты"
|
|
|
|
ws.close()
|
|
self._areas_cache = eid_to_area
|
|
self.log(f"Fetched {len(area_map)} areas, {len(dev_area)} device-areas, {len(eid_to_area)} entity-areas")
|
|
return eid_to_area
|
|
|
|
# ── History Fetch ──
|
|
|
|
def _fetch_history(self, entity_ids: list, period_start: datetime, period_end: datetime):
|
|
"""Fetch history from HA API in batches."""
|
|
start_str = period_start.isoformat()
|
|
all_history = {}
|
|
|
|
chunks = chunk_list(entity_ids, BATCH_SIZE)
|
|
total_chunks = len(chunks)
|
|
|
|
for i, chunk in enumerate(chunks):
|
|
ids_str = ",".join(chunk)
|
|
path = f"/history/period/{start_str}?filter_entity_id={ids_str}&minimal_response&no_attributes"
|
|
data = self._api_get(path)
|
|
|
|
if data and isinstance(data, list):
|
|
for entity_history in data:
|
|
if entity_history and isinstance(entity_history, list) and len(entity_history) > 0:
|
|
eid = entity_history[0].get("entity_id", chunk[0] if chunk else "")
|
|
all_history[eid] = entity_history
|
|
|
|
# Update progress
|
|
progress = f"{i + 1}/{total_chunks}"
|
|
self._set_progress(progress)
|
|
|
|
if i < total_chunks - 1:
|
|
import time; time.sleep(BATCH_DELAY)
|
|
|
|
self._set_progress("idle")
|
|
return all_history
|
|
|
|
# ── Calculation ──
|
|
|
|
def _calc_period(self, period: str):
|
|
"""Calculate availability for all devices in given period."""
|
|
self.log(f"Calculating availability for period {period}")
|
|
|
|
now = datetime.now(timezone.utc)
|
|
td = period_timedelta(period)
|
|
period_start = now - td
|
|
period_end = now
|
|
|
|
# Previous period for trend
|
|
prev_start = period_start - td
|
|
prev_end = period_start
|
|
|
|
entities = self._fetch_entities()
|
|
if not entities:
|
|
return
|
|
|
|
areas = self._fetch_areas()
|
|
entity_ids = [e["entity_id"] for e in entities]
|
|
|
|
# Fetch history for current period
|
|
history = self._fetch_history(entity_ids, period_start, period_end)
|
|
|
|
# Fetch history for previous period (for trend)
|
|
prev_history = self._fetch_history(entity_ids, prev_start, prev_end)
|
|
|
|
results = {}
|
|
area_results = {}
|
|
|
|
for ent in entities:
|
|
eid = ent["entity_id"]
|
|
fname = ent["friendly_name"]
|
|
domain = ent["domain"]
|
|
area = areas.get(eid, "Без комнаты")
|
|
|
|
entries = history.get(eid, [])
|
|
metrics = compute_availability(entries, period_start, period_end)
|
|
|
|
# Sparkline
|
|
spark_days = 7 if period in ("7d", "30d") else 1
|
|
sparkline = compute_sparkline(entries, period_end, days=spark_days)
|
|
|
|
# Trend
|
|
prev_entries = prev_history.get(eid, [])
|
|
prev_metrics = compute_availability(prev_entries, prev_start, prev_end)
|
|
trend = calc_trend(metrics["availability_pct"], prev_metrics["availability_pct"])
|
|
|
|
color = get_color(metrics["availability_pct"])
|
|
|
|
result = {
|
|
"entity_id": eid,
|
|
"friendly_name": fname,
|
|
"domain": domain,
|
|
"area": area,
|
|
"period": period,
|
|
"availability_pct": metrics["availability_pct"],
|
|
"down_count": metrics["down_count"],
|
|
"max_downtime_minutes": metrics["max_downtime_minutes"],
|
|
"sparkline": sparkline,
|
|
"trend": trend,
|
|
"last_downtime": metrics["last_downtime"],
|
|
"color": color,
|
|
"last_updated": now.isoformat(),
|
|
}
|
|
results[eid] = result
|
|
|
|
# Area aggregation
|
|
if area not in area_results:
|
|
area_results[area] = {"pcts": [], "devices": 0, "problems": 0}
|
|
area_results[area]["pcts"].append(metrics["availability_pct"])
|
|
area_results[area]["devices"] += 1
|
|
if metrics["availability_pct"] < 95:
|
|
area_results[area]["problems"] += 1
|
|
|
|
self._results_cache[period] = results
|
|
|
|
# Write device sensors
|
|
for eid, res in results.items():
|
|
safe = sanitize_entity_id(eid)
|
|
sensor_id = f"sensor.avail_{safe}"
|
|
attrs = {k: v for k, v in res.items() if k != "state"}
|
|
self.set_state(sensor_id, state=str(res["availability_pct"]), attributes=attrs)
|
|
|
|
# Write area sensors
|
|
for area_name, adata in area_results.items():
|
|
avg = round(sum(adata["pcts"]) / len(adata["pcts"]), 1) if adata["pcts"] else 100.0
|
|
safe_area = sanitize_area_name(area_name)
|
|
sensor_id = f"sensor.avail_area_{safe_area}"
|
|
attrs = {
|
|
"area": area_name,
|
|
"period": period,
|
|
"availability_pct": avg,
|
|
"device_count": adata["devices"],
|
|
"problem_count": adata["problems"],
|
|
"color": get_color(avg),
|
|
"last_updated": now.isoformat(),
|
|
}
|
|
self.set_state(sensor_id, state=str(avg), attributes=attrs)
|
|
|
|
self.log(f"Calculated {len(results)} device sensors, {len(area_results)} area sensors for {period}")
|
|
|
|
# ── Callbacks ──
|
|
|
|
def _cold_start(self, kwargs):
|
|
"""Initial calculation on startup."""
|
|
self.log("Cold start: calculating current period")
|
|
try:
|
|
period = self.get_state("input_select.avail_period") or "7d"
|
|
except Exception:
|
|
period = "7d"
|
|
self._current_period = period
|
|
self._calc_period(period)
|
|
|
|
def _calc_24h(self, kwargs):
|
|
self._calc_period("24h")
|
|
|
|
def _calc_7d(self, kwargs):
|
|
self._calc_period("7d")
|
|
|
|
def _calc_30d(self, kwargs):
|
|
self._calc_period("30d")
|
|
|
|
def _period_changed(self, entity, attribute, old, new, kwargs):
|
|
"""Handle period selector change."""
|
|
if new and new != old:
|
|
self._current_period = new
|
|
self.log(f"Period changed to {new}, recalculating")
|
|
self._calc_period(new)
|
|
|
|
# ── Progress ──
|
|
|
|
def _set_progress(self, value: str):
|
|
"""Update calculation progress sensor."""
|
|
self.set_state("sensor.avail_calc_progress", state=value, attributes={
|
|
"friendly_name": "Расчёт доступности",
|
|
"last_updated": datetime.now(timezone.utc).isoformat(),
|
|
})
|