"""AppDaemon app: Device availability calculator for Home Assistant. Calculates uptime metrics for tracked devices and creates sensor.avail_* entities. """ import appdaemon.plugins.hass.hassapi as hass import requests from datetime import datetime, timezone, timedelta from availability_utils import ( is_excluded, sanitize_entity_id, sanitize_area_name, get_color, calc_trend, format_downtime, parse_datetime, period_timedelta, chunk_list, compute_availability, compute_sparkline, TRACKED_DOMAINS, ) BATCH_SIZE = 20 BATCH_DELAY = 1 # seconds between batch requests HA_URL = "http://supervisor/core/api" class Availability(hass.Hass): def initialize(self): # Ensure websocket-client is available for registry queries try: import websocket except ImportError: import subprocess subprocess.check_call(["pip", "install", "websocket-client", "-q"]) self.log("Installed websocket-client") self._token = None self._entities_cache = [] self._areas_cache = {} self._current_period = "7d" self._results_cache = {"24h": {}, "7d": {}, "30d": {}} # Cold-start: full recalc after 30s self.run_in(self._cold_start, 30) # Periodic schedules self.run_every(self._calc_24h, "now+60", 5 * 60) self.run_every(self._calc_7d, "now+90", 15 * 60) self.run_every(self._calc_30d, "now+120", 2 * 3600) # Listen for period selector change self.listen_state(self._period_changed, "input_select.avail_period") self.log("Availability app initialized") def _get_token(self): if self._token is None: import os self._token = os.environ.get("SUPERVISOR_TOKEN", "") if not self._token: self._token = self.args.get("token", "") return self._token def _api_headers(self): return { "Authorization": f"Bearer {self._get_token()}", "Content-Type": "application/json", } def _api_get(self, path): url = f"{HA_URL}{path}" try: resp = requests.get(url, headers=self._api_headers(), timeout=30) resp.raise_for_status() return resp.json() except Exception as e: self.log(f"API error {path}: {e}", level="WARNING") return None # ── Entity Discovery ── def _fetch_entities(self): """Get all entities from HA and filter by domain + exclusions.""" data = self._api_get("/states") if not data: self.log("Failed to fetch entities", level="ERROR") return [] entities = [] for e in data: eid = e.get("entity_id", "") domain = eid.split(".")[0] if "." in eid else "" if domain not in TRACKED_DOMAINS: continue if is_excluded(eid): continue entities.append({ "entity_id": eid, "friendly_name": e.get("attributes", {}).get("friendly_name", eid), "domain": domain, }) self._entities_cache = entities self.log(f"Found {len(entities)} tracked entities") return entities def _fetch_areas(self): """Fetch area registry via WebSocket and build entity_id -> area mapping. HA registry APIs are only available via WebSocket, not REST. """ try: import websocket import json as _json except ImportError: self.log("websocket-client not available, areas will be unavailable", level="WARNING") self._areas_cache = {} return {} # Use HA long-lived access token for WebSocket auth token = self.args.get("ha_token", "") if not token: token = self._get_token() if not token: self._areas_cache = {} return {} ws_url = "ws://homeassistant:8123/api/websocket" try: ws = websocket.create_connection(ws_url, timeout=15) except Exception as e: self.log(f"WebSocket connect failed: {e}", level="WARNING") self._areas_cache = {} return {} try: # Step 1: Auth handshake auth_msg = _json.loads(ws.recv()) if auth_msg.get("type") == "auth_required": ws.send(_json.dumps({"type": "auth", "access_token": token})) auth_result = _json.loads(ws.recv()) if auth_result.get("type") != "auth_ok": self.log(f"WS auth failed: {auth_result.get('message','')}", level="ERROR") ws.close() self._areas_cache = {} return {} self.log("WS auth OK") # Step 2: Fetch area registry ws.send(_json.dumps({"id": 1, "type": "config/area_registry/list"})) areas_result = _json.loads(ws.recv()) if not areas_result.get("success"): self.log(f"Area registry failed: {areas_result.get('error','?')}", level="WARNING") ws.close() self._areas_cache = {} return {} areas_data = areas_result.get("result", []) # Build area_id -> area_name map area_map = {} if isinstance(areas_data, list): for area in areas_data: area_map[area["area_id"]] = area.get("name", "Без комнаты") self.log(f"WS: {len(area_map)} areas fetched") # Step 3: Fetch device registry ws.send(_json.dumps({"id": 2, "type": "config/device_registry/list"})) dev_result = _json.loads(ws.recv()) dev_area = {} if dev_result.get("success"): dev_data = dev_result.get("result", []) if isinstance(dev_data, list): for dev in dev_data: dev_id = dev.get("id") a_id = dev.get("area_id") if dev_id and a_id and a_id in area_map: dev_area[dev_id] = area_map[a_id] self.log(f"WS: {len(dev_area)} device-area mappings") # Step 4: Fetch entity registry ws.send(_json.dumps({"id": 3, "type": "config/entity_registry/list"})) ent_result = _json.loads(ws.recv()) eid_to_area = {} if ent_result.get("success"): ent_data = ent_result.get("result", []) # HA returns either a list directly or a dict with 'entities' key if isinstance(ent_data, dict): ent_list = ent_data.get("entities", []) else: ent_list = ent_data for entry in ent_list: eid = entry.get("entity_id", "") a_id = entry.get("area_id") dev_id = entry.get("device_id") if a_id and a_id in area_map: eid_to_area[eid] = area_map[a_id] elif dev_id and dev_id in dev_area: eid_to_area[eid] = dev_area[dev_id] else: eid_to_area[eid] = "Без комнаты" self.log(f"WS: {len(eid_to_area)} entity-area mappings") ws.close() self._areas_cache = eid_to_area return eid_to_area except Exception as e: self.log(f"WebSocket area fetch error: {e}", level="WARNING") try: ws.close() except: pass self._areas_cache = {} return {} # ── History Fetch ── def _fetch_history(self, entity_ids: list, period_start: datetime, period_end: datetime): """Fetch history from HA API in batches.""" start_str = period_start.isoformat() all_history = {} chunks = chunk_list(entity_ids, BATCH_SIZE) total_chunks = len(chunks) for i, chunk in enumerate(chunks): ids_str = ",".join(chunk) path = f"/history/period/{start_str}?filter_entity_id={ids_str}&minimal_response&no_attributes" data = self._api_get(path) if data and isinstance(data, list): for entity_history in data: if entity_history and isinstance(entity_history, list) and len(entity_history) > 0: eid = entity_history[0].get("entity_id", chunk[0] if chunk else "") all_history[eid] = entity_history # Update progress progress = f"{i + 1}/{total_chunks}" self._set_progress(progress) if i < total_chunks - 1: import time; time.sleep(BATCH_DELAY) self._set_progress("idle") return all_history # ── Calculation ── def _calc_period(self, period: str): """Calculate availability for all devices in given period.""" self.log(f"Calculating availability for period {period}") now = datetime.now(timezone.utc) td = period_timedelta(period) period_start = now - td period_end = now # Previous period for trend prev_start = period_start - td prev_end = period_start entities = self._fetch_entities() if not entities: return areas = self._fetch_areas() entity_ids = [e["entity_id"] for e in entities] # Fetch history for current period history = self._fetch_history(entity_ids, period_start, period_end) # Fetch history for previous period (for trend) prev_history = self._fetch_history(entity_ids, prev_start, prev_end) results = {} area_results = {} for ent in entities: eid = ent["entity_id"] fname = ent["friendly_name"] domain = ent["domain"] area = areas.get(eid, "Без комнаты") entries = history.get(eid, []) metrics = compute_availability(entries, period_start, period_end) # Sparkline spark_days = 7 if period in ("7d", "30d") else 1 sparkline = compute_sparkline(entries, period_end, days=spark_days) # Trend prev_entries = prev_history.get(eid, []) prev_metrics = compute_availability(prev_entries, prev_start, prev_end) trend = calc_trend(metrics["availability_pct"], prev_metrics["availability_pct"]) color = get_color(metrics["availability_pct"]) result = { "entity_id": eid, "friendly_name": fname, "domain": domain, "area": area, "period": period, "availability_pct": metrics["availability_pct"], "down_count": metrics["down_count"], "max_downtime_minutes": metrics["max_downtime_minutes"], "sparkline": sparkline, "trend": trend, "last_downtime": metrics["last_downtime"], "color": color, "last_updated": now.isoformat(), } results[eid] = result # Area aggregation if area not in area_results: area_results[area] = {"pcts": [], "devices": 0, "problems": 0} area_results[area]["pcts"].append(metrics["availability_pct"]) area_results[area]["devices"] += 1 if metrics["availability_pct"] < 95: area_results[area]["problems"] += 1 self._results_cache[period] = results # Write device sensors for eid, res in results.items(): safe = sanitize_entity_id(eid) sensor_id = f"sensor.avail_{safe}" attrs = {k: v for k, v in res.items() if k != "state"} self.set_state(sensor_id, state=str(res["availability_pct"]), attributes=attrs) # Write area sensors for area_name, adata in area_results.items(): avg = round(sum(adata["pcts"]) / len(adata["pcts"]), 1) if adata["pcts"] else 100.0 safe_area = sanitize_area_name(area_name) sensor_id = f"sensor.avail_area_{safe_area}" attrs = { "area": area_name, "period": period, "availability_pct": avg, "device_count": adata["devices"], "problem_count": adata["problems"], "color": get_color(avg), "last_updated": now.isoformat(), } self.set_state(sensor_id, state=str(avg), attributes=attrs) self.log(f"Calculated {len(results)} device sensors, {len(area_results)} area sensors for {period}") # ── Callbacks ── def _cold_start(self, kwargs): """Initial calculation on startup.""" self.log("Cold start: calculating current period") try: period = self.get_state("input_select.avail_period") or "7d" except Exception: period = "7d" self._current_period = period self._calc_period(period) def _calc_24h(self, kwargs): self._calc_period("24h") def _calc_7d(self, kwargs): self._calc_period("7d") def _calc_30d(self, kwargs): self._calc_period("30d") def _period_changed(self, entity, attribute, old, new, kwargs): """Handle period selector change.""" if new and new != old: self._current_period = new self.log(f"Period changed to {new}, recalculating") self._calc_period(new) # ── Progress ── def _set_progress(self, value: str): """Update calculation progress sensor.""" self.set_state("sensor.avail_calc_progress", state=value, attributes={ "friendly_name": "Расчёт доступности", "last_updated": datetime.now(timezone.utc).isoformat(), })