diff --git a/tasks/ha-availability-dashboard/appdaemon/apps.yaml b/tasks/ha-availability-dashboard/appdaemon/apps.yaml new file mode 100644 index 0000000..b49911c --- /dev/null +++ b/tasks/ha-availability-dashboard/appdaemon/apps.yaml @@ -0,0 +1,11 @@ +# AppDaemon apps configuration +# Add the availability app alongside existing apps (e.g., hello_world) + +hello_world: + module: hello + class: HelloWorld + +availability: + module: availability + class: Availability + log_level: info diff --git a/tasks/ha-availability-dashboard/appdaemon/availability.py b/tasks/ha-availability-dashboard/appdaemon/availability.py new file mode 100644 index 0000000..db8c3c3 --- /dev/null +++ b/tasks/ha-availability-dashboard/appdaemon/availability.py @@ -0,0 +1,322 @@ +"""AppDaemon app: Device availability calculator for Home Assistant. + +Calculates uptime metrics for tracked devices and creates sensor.avail_* entities. +""" + +import appdaemon.plugins.hass.hassapi as hass +import requests +from datetime import datetime, timezone, timedelta +from availability_utils import ( + is_excluded, sanitize_entity_id, sanitize_area_name, + get_color, calc_trend, format_downtime, + parse_datetime, period_timedelta, chunk_list, + compute_availability, compute_sparkline, + TRACKED_DOMAINS, +) + +BATCH_SIZE = 20 +BATCH_DELAY = 1 # seconds between batch requests +HA_URL = "http://supervisor/core/api" + + +class Availability(hass.Hass): + def initialize(self): + self._token = None + self._entities_cache = [] + self._areas_cache = {} + self._current_period = "7d" + self._results_cache = {"24h": {}, "7d": {}, "30d": {}} + + # Cold-start: full recalc after 30s + self.run_in(self._cold_start, 30) + + # Periodic schedules + self.run_every(self._calc_24h, "now+60", 5 * 60) + self.run_every(self._calc_7d, "now+90", 15 * 60) + self.run_every(self._calc_30d, "now+120", 2 * 3600) + + # Listen for period selector change + self.listen_state(self._period_changed, "input_select.avail_period") + + self.log("Availability app initialized") + + def _get_token(self): + if self._token is None: + import os + self._token = os.environ.get("SUPERVISOR_TOKEN", "") + if not self._token: + self._token = self.args.get("token", "") + return self._token + + def _api_headers(self): + return { + "Authorization": f"Bearer {self._get_token()}", + "Content-Type": "application/json", + } + + def _api_get(self, path): + url = f"{HA_URL}{path}" + try: + resp = requests.get(url, headers=self._api_headers(), timeout=30) + resp.raise_for_status() + return resp.json() + except Exception as e: + self.log(f"API error {path}: {e}", level="WARNING") + return None + + # ── Entity Discovery ── + + def _fetch_entities(self): + """Get all entities from HA and filter by domain + exclusions.""" + data = self._api_get("/states") + if not data: + self.log("Failed to fetch entities", level="ERROR") + return [] + + entities = [] + for e in data: + eid = e.get("entity_id", "") + domain = eid.split(".")[0] if "." in eid else "" + if domain not in TRACKED_DOMAINS: + continue + if is_excluded(eid): + continue + entities.append({ + "entity_id": eid, + "friendly_name": e.get("attributes", {}).get("friendly_name", eid), + "domain": domain, + }) + self._entities_cache = entities + self.log(f"Found {len(entities)} tracked entities") + return entities + + def _fetch_areas(self): + """Fetch area registry and build entity_id -> area mapping.""" + data = self._api_get("/config/area_registry") + if not data: + self._areas_cache = {} + return {} + + # Build device_id -> area map + device_areas = {} + for area in data: + area_id = area.get("area_id") + area_name = area.get("name", "Без комнаты") + device_areas[area_id] = area_name + + # Now get entity registry to map entity -> device -> area + reg = self._api_get("/config/entity_registry") + if not reg: + self._areas_cache = {} + return {} + + eid_to_area = {} + for entry in reg: + eid = entry.get("entity_id", "") + device_id = entry.get("device_id") + area_id = entry.get("area_id") + # Use entity-level area if set, otherwise device-level + if area_id and area_id in device_areas: + eid_to_area[eid] = device_areas[area_id] + elif device_id: + # Try to find device's area via device registry + dev_data = self._api_get(f"/config/device_registry") + # This is expensive; fallback to simple mapping + pass + else: + eid_to_area[eid] = "Без комнаты" + + # Fill from device registry if available + dev_reg = self._api_get("/config/device_registry") + if dev_reg: + dev_area_map = {} + for dev in dev_reg: + dev_id = dev.get("id") + a_id = dev.get("area_id") + if dev_id and a_id and a_id in device_areas: + dev_area_map[dev_id] = device_areas[a_id] + + for entry in reg: + eid = entry.get("entity_id", "") + device_id = entry.get("device_id") + area_id = entry.get("area_id") + if area_id and area_id in device_areas: + eid_to_area[eid] = device_areas[area_id] + elif device_id and device_id in dev_area_map: + eid_to_area[eid] = dev_area_map[device_id] + else: + eid_to_area[eid] = eid_to_area.get(eid, "Без комнаты") + + self._areas_cache = eid_to_area + return eid_to_area + + # ── History Fetch ── + + def _fetch_history(self, entity_ids: list, period_start: datetime, period_end: datetime): + """Fetch history from HA API in batches.""" + start_str = period_start.isoformat() + all_history = {} + + chunks = chunk_list(entity_ids, BATCH_SIZE) + total_chunks = len(chunks) + + for i, chunk in enumerate(chunks): + ids_str = ",".join(chunk) + path = f"/history/period/{start_str}?filter_entity_id={ids_str}&minimal_response&no_attributes" + data = self._api_get(path) + + if data and isinstance(data, list): + for entity_history in data: + if entity_history and isinstance(entity_history, list) and len(entity_history) > 0: + eid = entity_history[0].get("entity_id", chunk[0] if chunk else "") + all_history[eid] = entity_history + + # Update progress + progress = f"{i + 1}/{total_chunks}" + self._set_progress(progress) + + if i < total_chunks - 1: + self.sleep(BATCH_DELAY) + + self._set_progress("idle") + return all_history + + # ── Calculation ── + + def _calc_period(self, period: str): + """Calculate availability for all devices in given period.""" + self.log(f"Calculating availability for period {period}") + + now = datetime.now(timezone.utc) + td = period_timedelta(period) + period_start = now - td + period_end = now + + # Previous period for trend + prev_start = period_start - td + prev_end = period_start + + entities = self._fetch_entities() + if not entities: + return + + areas = self._fetch_areas() + entity_ids = [e["entity_id"] for e in entities] + + # Fetch history for current period + history = self._fetch_history(entity_ids, period_start, period_end) + + # Fetch history for previous period (for trend) + prev_history = self._fetch_history(entity_ids, prev_start, prev_end) + + results = {} + area_results = {} + + for ent in entities: + eid = ent["entity_id"] + fname = ent["friendly_name"] + domain = ent["domain"] + area = areas.get(eid, "Без комнаты") + + entries = history.get(eid, []) + metrics = compute_availability(entries, period_start, period_end) + + # Sparkline + spark_days = 7 if period in ("7d", "30d") else 1 + sparkline = compute_sparkline(entries, period_end, days=spark_days) + + # Trend + prev_entries = prev_history.get(eid, []) + prev_metrics = compute_availability(prev_entries, prev_start, prev_end) + trend = calc_trend(metrics["availability_pct"], prev_metrics["availability_pct"]) + + color = get_color(metrics["availability_pct"]) + + result = { + "entity_id": eid, + "friendly_name": fname, + "domain": domain, + "area": area, + "period": period, + "availability_pct": metrics["availability_pct"], + "down_count": metrics["down_count"], + "max_downtime_minutes": metrics["max_downtime_minutes"], + "sparkline": sparkline, + "trend": trend, + "last_downtime": metrics["last_downtime"], + "color": color, + "last_updated": now.isoformat(), + } + results[eid] = result + + # Area aggregation + if area not in area_results: + area_results[area] = {"pcts": [], "devices": 0, "problems": 0} + area_results[area]["pcts"].append(metrics["availability_pct"]) + area_results[area]["devices"] += 1 + if metrics["availability_pct"] < 95: + area_results[area]["problems"] += 1 + + self._results_cache[period] = results + + # Write device sensors + for eid, res in results.items(): + safe = sanitize_entity_id(eid) + sensor_id = f"sensor.avail_{safe}" + attrs = {k: v for k, v in res.items() if k != "state"} + self.set_state(sensor_id, state=str(res["availability_pct"]), attributes=attrs) + + # Write area sensors + for area_name, adata in area_results.items(): + avg = round(sum(adata["pcts"]) / len(adata["pcts"]), 1) if adata["pcts"] else 100.0 + safe_area = sanitize_area_name(area_name) + sensor_id = f"sensor.avail_area_{safe_area}" + attrs = { + "area": area_name, + "period": period, + "availability_pct": avg, + "device_count": adata["devices"], + "problem_count": adata["problems"], + "color": get_color(avg), + "last_updated": now.isoformat(), + } + self.set_state(sensor_id, state=str(avg), attributes=attrs) + + self.log(f"Calculated {len(results)} device sensors, {len(area_results)} area sensors for {period}") + + # ── Callbacks ── + + def _cold_start(self, kwargs): + """Initial calculation on startup.""" + self.log("Cold start: calculating current period") + try: + period = self.get_state("input_select.avail_period") or "7d" + except Exception: + period = "7d" + self._current_period = period + self._calc_period(period) + + def _calc_24h(self, kwargs): + self._calc_period("24h") + + def _calc_7d(self, kwargs): + self._calc_period("7d") + + def _calc_30d(self, kwargs): + self._calc_period("30d") + + def _period_changed(self, entity, attribute, old, new, kwargs): + """Handle period selector change.""" + if new and new != old: + self._current_period = new + self.log(f"Period changed to {new}, recalculating") + self._calc_period(new) + + # ── Progress ── + + def _set_progress(self, value: str): + """Update calculation progress sensor.""" + self.set_state("sensor.avail_calc_progress", state=value, attributes={ + "friendly_name": "Расчёт доступности", + "last_updated": datetime.now(timezone.utc).isoformat(), + }) diff --git a/tasks/ha-availability-dashboard/appdaemon/availability_utils.py b/tasks/ha-availability-dashboard/appdaemon/availability_utils.py new file mode 100644 index 0000000..3ad9a31 --- /dev/null +++ b/tasks/ha-availability-dashboard/appdaemon/availability_utils.py @@ -0,0 +1,266 @@ +"""Utility functions for availability calculator.""" + +import re +from datetime import datetime, timezone, timedelta + +# Domains to track (phase 1) +TRACKED_DOMAINS = {"light", "switch"} + +# Entity suffixes to exclude (Zigbee2MQTT helpers, etc.) +EXCLUDE_SUFFIXES = ( + "_battery_low", "_battery", "_linkquality", "_update", + "_update_state", "_identify", +) + +# Additional exclusions for switch domain (relay settings, not real devices) +SWITCH_EXCLUDE_SUFFIXES = ( + "_delayed_power_on_state", "_detach_relay_mode", + "_network_indicator", "_turbo_mode", "_do_not_disturb", +) + +SWITCH_EXCLUDE_EXACT = {"switch.zigbee2mqtt_bridge_permit_join"} + +# Auto-generated Zigbee select/number patterns +ZIGBEE_AUTO_PATTERN = re.compile( + r"(select|number)\.(zigbee2mqtt_|.*_(radar_sensitivity|sensitivity|mode|level|delay|threshold|interval|timeout))", + re.IGNORECASE, +) + +# Update/button domain patterns +UPDATE_BUTTON_EXCLUDE = re.compile(r"^(update|button)\.", re.IGNORECASE) + + +def is_excluded(entity_id: str) -> bool: + """Check if entity should be excluded from tracking.""" + domain = entity_id.split(".")[0] + name = entity_id.split(".")[1] if "." in entity_id else "" + + # Exact exclusions + if entity_id in SWITCH_EXCLUDE_EXACT: + return True + + # Suffix exclusions + for suffix in EXCLUDE_SUFFIXES: + if name.endswith(suffix): + return True + + # Switch-specific suffix exclusions + if domain == "switch": + for suffix in SWITCH_EXCLUDE_SUFFIXES: + if name.endswith(suffix): + return True + + return False + + +def sanitize_entity_id(entity_id: str) -> str: + """Convert entity_id to safe sensor name: light.bra_v_spalne → light_bra_v_spalne.""" + return entity_id.replace(".", "_") + + +def sanitize_area_name(name: str) -> str: + """Convert area name to safe sensor suffix.""" + s = name.lower().strip() + s = re.sub(r"[^a-zа-яё0-9_]", "_", s) + s = re.sub(r"_+", "_", s) + return s.strip("_") + + +def domain_icon(domain: str) -> str: + """Return icon for domain.""" + return { + "light": "💡", + "switch": "🔌", + "binary_sensor": "👁️", + "sensor": "🌡️", + "climate": "❄️", + "cover": "🚪", + "lock": "🔒", + "media_player": "🔊", + "device_tracker": "📍", + "vacuum": "🤖", + "fan": "🌀", + "humidifier": "💨", + "water_heater": "🚿", + "siren": "🚨", + "button": "🔘", + }.get(domain, "📡") + + +def get_color(pct: float) -> str: + """Return color bucket for availability percentage.""" + if pct >= 99: + return "green" + elif pct >= 95: + return "yellow" + elif pct >= 90: + return "orange" + return "red" + + +def calc_trend(current_pct: float, previous_pct: float, threshold: float = 0.5) -> str: + """Compare current vs previous period. Returns up/down/stable.""" + diff = current_pct - previous_pct + if diff > threshold: + return "up" + elif diff < -threshold: + return "down" + return "stable" + + +def format_downtime(minutes: int) -> str: + """Format downtime minutes to human-readable string.""" + if minutes < 60: + return f"{minutes} мин" + h, m = divmod(minutes, 60) + if h < 24: + return f"{h}ч {m}мин" + d, h = divmod(h, 24) + return f"{d}д {h}ч {m}мин" + + +def parse_datetime(dt_str: str) -> datetime: + """Parse HA datetime string to timezone-aware datetime.""" + # HA format: 2026-04-14T15:32:00+03:00 or 2026-04-14T15:32:00.123456+03:00 + dt_str = dt_str.rstrip("Z") + if "+" in dt_str[10:] or dt_str.endswith("Z"): + # Has timezone + if "+" in dt_str: + # Python fromisoformat handles this in 3.7+ + return datetime.fromisoformat(dt_str) + return datetime.fromisoformat(dt_str.replace("Z", "+00:00")) + # No timezone — assume UTC + dt = datetime.fromisoformat(dt_str) + return dt.replace(tzinfo=timezone.utc) + + +def period_timedelta(period: str) -> timedelta: + """Convert period string to timedelta.""" + return {"24h": timedelta(hours=24), "7d": timedelta(days=7), "30d": timedelta(days=30)}[period] + + +def chunk_list(lst: list, size: int) -> list: + """Split list into chunks of given size.""" + return [lst[i:i + size] for i in range(0, len(lst), size)] + + +def compute_availability(history_entries: list, period_start: datetime, period_end: datetime) -> dict: + """ + Compute availability metrics from HA history entries. + + Each entry is a dict with 'state' and 'last_changed'. + Returns dict with: availability_pct, down_count, max_downtime_minutes, last_downtime + """ + unavailable_seconds = 0 + down_count = 0 + max_downtime = 0 + current_downtime_start = None + last_downtime = None + total_seconds = (period_end - period_start).total_seconds() + + if not history_entries or total_seconds <= 0: + return { + "availability_pct": 100.0, + "down_count": 0, + "max_downtime_minutes": 0, + "last_downtime": None, + } + + # Sort by last_changed + entries = sorted(history_entries, key=lambda e: parse_datetime(e["last_changed"])) + + for i, entry in enumerate(entries): + state = entry["state"] + changed = parse_datetime(entry["last_changed"]) + + # Clamp to period + if changed < period_start: + changed = period_start + + # Determine next change time + if i + 1 < len(entries): + next_changed = parse_datetime(entries[i + 1]["last_changed"]) + else: + next_changed = period_end + + if next_changed > period_end: + next_changed = period_end + + if state in ("unavailable", "unknown"): + if current_downtime_start is None: + # Transition to unavailable + current_downtime_start = changed + down_count += 1 + + duration = (next_changed - changed).total_seconds() + if duration > 0: + unavailable_seconds += duration + last_downtime = changed + + # Check if this is the end of a downtime period + if i + 1 < len(entries) and entries[i + 1]["state"] not in ("unavailable", "unknown"): + if current_downtime_start is not None: + downtime = (next_changed - current_downtime_start).total_seconds() + if downtime > max_downtime: + max_downtime = downtime + current_downtime_start = None + # If last entry and still unavailable + elif i == len(entries) - 1: + if current_downtime_start is not None: + downtime = (next_changed - current_downtime_start).total_seconds() + if downtime > max_downtime: + max_downtime = downtime + current_downtime_start = None + else: + # Available state — if there was a downtime running, end it + if current_downtime_start is not None: + downtime = (changed - current_downtime_start).total_seconds() + if downtime > max_downtime: + max_downtime = downtime + current_downtime_start = None + + # Handle case where device was unavailable at period_start + # (first entry state determines status from period_start to first change) + if entries and entries[0]["state"] in ("unavailable", "unknown"): + first_change = parse_datetime(entries[0]["last_changed"]) + if first_change > period_start: + pre_duration = (first_change - period_start).total_seconds() + unavailable_seconds += pre_duration + # This counts as one transition if not already counted + if current_downtime_start is None: + down_count += 1 + + # Handle trailing unavailable (last entry is unavailable) + if entries and entries[-1]["state"] in ("unavailable", "unknown"): + last_change = parse_datetime(entries[-1]["last_changed"]) + trailing = (period_end - last_change).total_seconds() + if trailing > 0 and current_downtime_start is None: + # Already counted in the loop + pass + + pct = round((1 - unavailable_seconds / total_seconds) * 100, 1) if total_seconds > 0 else 100.0 + pct = max(0.0, min(100.0, pct)) + + return { + "availability_pct": pct, + "down_count": down_count, + "max_downtime_minutes": round(max_downtime / 60), + "last_downtime": last_downtime.isoformat() if last_downtime else None, + } + + +def compute_sparkline(history_entries: list, period_end: datetime, days: int = 7) -> list: + """Compute daily availability percentages for sparkline (last N days).""" + points = [] + for i in range(days - 1, -1, -1): + day_start = period_end - timedelta(days=i + 1) + day_end = period_end - timedelta(days=i) + # Filter entries relevant to this day + day_entries = [] + for e in history_entries: + changed = parse_datetime(e["last_changed"]) + if changed < day_end and changed >= day_start - timedelta(days=1): + day_entries.append(e) + result = compute_availability(day_entries, day_start, day_end) + points.append(result["availability_pct"]) + return points