"""Utility functions for availability calculator."""

import re
from datetime import datetime, timezone, timedelta

# Domains to track (phase 1)
TRACKED_DOMAINS = {"light", "switch"}

# Entity suffixes to exclude (Zigbee2MQTT helpers, etc.)
EXCLUDE_SUFFIXES = (
    "_battery_low",
    "_battery",
    "_linkquality",
    "_update",
    "_update_state",
    "_identify",
)

# Additional exclusions for switch domain (relay settings, not real devices)
SWITCH_EXCLUDE_SUFFIXES = (
    "_delayed_power_on_state",
    "_detach_relay_mode",
    "_network_indicator",
    "_turbo_mode",
    "_do_not_disturb",
)

SWITCH_EXCLUDE_EXACT = {"switch.zigbee2mqtt_bridge_permit_join"}

# Auto-generated Zigbee select/number patterns
ZIGBEE_AUTO_PATTERN = re.compile(
    r"(select|number)\.(zigbee2mqtt_|.*_(radar_sensitivity|sensitivity|mode|level|delay|threshold|interval|timeout))",
    re.IGNORECASE,
)

# Update/button domain patterns
UPDATE_BUTTON_EXCLUDE = re.compile(r"^(update|button)\.", re.IGNORECASE)

# States that count as downtime when computing availability
_DOWN_STATES = ("unavailable", "unknown")


def is_excluded(entity_id: str) -> bool:
    """Check if entity should be excluded from tracking.

    An entity is excluded when its full id is listed in SWITCH_EXCLUDE_EXACT,
    when its object name ends with one of EXCLUDE_SUFFIXES, or, for the
    ``switch`` domain only, when it ends with one of SWITCH_EXCLUDE_SUFFIXES.
    """
    parts = entity_id.split(".")
    domain = parts[0]
    # Only the segment right after the first dot is inspected.
    name = parts[1] if len(parts) > 1 else ""

    # Exact exclusions
    if entity_id in SWITCH_EXCLUDE_EXACT:
        return True

    # Suffix exclusions (str.endswith accepts a tuple of candidates)
    if name.endswith(EXCLUDE_SUFFIXES):
        return True

    # Switch-specific suffix exclusions
    return domain == "switch" and name.endswith(SWITCH_EXCLUDE_SUFFIXES)


def sanitize_entity_id(entity_id: str) -> str:
    """Convert entity_id to safe sensor name: light.bra_v_spalne → light_bra_v_spalne."""
    return entity_id.replace(".", "_")


# Cyrillic transliteration table
_CYRILLIC_MAP = {
    'а': 'a', 'б': 'b', 'в': 'v', 'г': 'g', 'д': 'd', 'е': 'e', 'ё': 'yo',
    'ж': 'zh', 'з': 'z', 'и': 'i', 'й': 'y', 'к': 'k', 'л': 'l', 'м': 'm',
    'н': 'n', 'о': 'o', 'п': 'p', 'р': 'r', 'с': 's', 'т': 't', 'у': 'u',
    'ф': 'f', 'х': 'kh', 'ц': 'ts', 'ч': 'ch', 'ш': 'sh', 'щ': 'shch',
    'ъ': '', 'ы': 'y', 'ь': '', 'э': 'e', 'ю': 'yu', 'я': 'ya',
}

# Precomputed translation table so transliteration is a single pass
_CYRILLIC_TABLE = str.maketrans(_CYRILLIC_MAP)


def sanitize_area_name(name: str) -> str:
    """Convert area name to safe sensor suffix (Latin only).

    The name is lower-cased and stripped, Cyrillic letters are transliterated,
    every character outside ``a-z0-9_`` becomes ``_``, runs of underscores are
    collapsed and leading/trailing underscores are removed.
    """
    s = name.lower().strip()
    # Transliterate Cyrillic to Latin
    s = s.translate(_CYRILLIC_TABLE)
    # Keep only a-z, 0-9, _; replace everything else
    s = re.sub(r"[^a-z0-9_]", "_", s)
    s = re.sub(r"_+", "_", s)
    return s.strip("_")


# Icon shown for each known domain; unknown domains use _DEFAULT_DOMAIN_ICON
_DOMAIN_ICONS = {
    "light": "💡",
    "switch": "🔌",
    "binary_sensor": "👁️",
    "sensor": "🌡️",
    "climate": "❄️",
    "cover": "🚪",
    "lock": "🔒",
    "media_player": "🔊",
    "device_tracker": "📍",
    "vacuum": "🤖",
    "fan": "🌀",
    "humidifier": "💨",
    "water_heater": "🚿",
    "siren": "🚨",
    "button": "🔘",
}
_DEFAULT_DOMAIN_ICON = "📡"


def domain_icon(domain: str) -> str:
    """Return icon for domain (a generic icon for unknown domains)."""
    return _DOMAIN_ICONS.get(domain, _DEFAULT_DOMAIN_ICON)


def get_color(pct: float) -> str:
    """Return color bucket for availability percentage.

    >= 99 is "green", >= 95 "yellow", >= 90 "orange", anything lower "red".
    """
    if pct >= 99:
        return "green"
    if pct >= 95:
        return "yellow"
    if pct >= 90:
        return "orange"
    return "red"


def calc_trend(current_pct: float, previous_pct: float, threshold: float = 0.5) -> str:
    """Compare current vs previous period. Returns up/down/stable.

    The difference must exceed ``threshold`` (strictly) to count as a change.
    """
    diff = current_pct - previous_pct
    if diff > threshold:
        return "up"
    if diff < -threshold:
        return "down"
    return "stable"


def format_downtime(minutes: int) -> str:
    """Format downtime minutes to human-readable string."""
    if minutes < 60:
        return f"{minutes} мин"
    h, m = divmod(minutes, 60)
    if h < 24:
        return f"{h}ч {m}мин"
    d, h = divmod(h, 24)
    return f"{d}д {h}ч {m}мин"


def parse_datetime(dt_str: str) -> datetime:
    """Parse HA datetime string to timezone-aware datetime.

    Accepts ISO-8601 strings such as ``2026-04-14T15:32:00+03:00``,
    ``2026-04-14T15:32:00.123456-03:00`` or ``2026-04-14T15:32:00Z``.
    An explicit offset (positive or negative) is preserved; a string
    without any offset is assumed to be UTC.

    Raises:
        ValueError: if the string is not a valid ISO-8601 datetime.
    """
    text = dt_str
    # datetime.fromisoformat() only understands a trailing "Z" from
    # Python 3.11 on, so normalise it to an explicit UTC offset.
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        # No timezone — assume UTC
        return parsed.replace(tzinfo=timezone.utc)
    return parsed


# Supported reporting periods
_PERIODS = {
    "24h": timedelta(hours=24),
    "7d": timedelta(days=7),
    "30d": timedelta(days=30),
}


def period_timedelta(period: str) -> timedelta:
    """Convert period string to timedelta.

    Raises:
        KeyError: if ``period`` is not one of "24h", "7d", "30d".
    """
    return _PERIODS[period]


def chunk_list(lst: list, size: int) -> list:
    """Split list into chunks of given size."""
    return [lst[i:i + size] for i in range(0, len(lst), size)]


def compute_availability(history_entries: list, period_start: datetime, period_end: datetime) -> dict:
    """
    Compute availability metrics from HA history entries.

    Each entry is a dict with 'state' and 'last_changed'. An entry's state is
    taken to hold from its 'last_changed' until the next entry's
    'last_changed' (or ``period_end`` for the last entry). The earliest
    entry's state is additionally assumed to hold from ``period_start``.
    Every interval is clipped to the period; intervals that end up empty
    (entirely outside the period, or of zero length) are ignored, so they
    neither add downtime nor count as an outage.

    States "unavailable" and "unknown" count as downtime. Consecutive
    downtime intervals form a single outage.

    Returns dict with:
        availability_pct: percentage of the period not spent in downtime,
            rounded to one decimal (100.0 for no entries or an empty period)
        down_count: number of distinct outages overlapping the period
        max_downtime_minutes: longest outage within the period, in minutes
        last_downtime: ISO timestamp at which the most recent downtime
            entry took effect within the period, or None
    """
    total_seconds = (period_end - period_start).total_seconds()
    if not history_entries or total_seconds <= 0:
        return {
            "availability_pct": 100.0,
            "down_count": 0,
            "max_downtime_minutes": 0,
            "last_downtime": None,
        }

    # Parse every timestamp once, then order chronologically (stable sort).
    timeline = sorted(
        ((parse_datetime(entry["last_changed"]), entry["state"]) for entry in history_entries),
        key=lambda item: item[0],
    )
    last_index = len(timeline) - 1

    unavailable_seconds = 0.0
    down_count = 0
    max_downtime = 0.0
    last_downtime = None
    outage_start = None  # start of the outage currently in progress, if any

    for index, (changed, state) in enumerate(timeline):
        # The state lasts until the next change, but never past the period.
        if index == last_index:
            seg_end = period_end
        else:
            seg_end = min(timeline[index + 1][0], period_end)

        observed_start = max(changed, period_start)
        # The first entry's state determines the status from period_start on.
        seg_start = period_start if index == 0 else observed_start

        if seg_end <= seg_start:
            # Nothing of this entry falls inside the period.
            continue

        if state not in _DOWN_STATES:
            # Available again — any running outage is over.
            outage_start = None
            continue

        if outage_start is None:
            # Transition to unavailable
            outage_start = seg_start
            down_count += 1

        unavailable_seconds += (seg_end - seg_start).total_seconds()
        # Segments inside the period are contiguous, so the running outage
        # spans from its start to the end of the current segment.
        max_downtime = max(max_downtime, (seg_end - outage_start).total_seconds())
        if seg_end > observed_start:
            last_downtime = observed_start

    pct = round((1 - unavailable_seconds / total_seconds) * 100, 1)
    pct = max(0.0, min(100.0, pct))

    return {
        "availability_pct": pct,
        "down_count": down_count,
        "max_downtime_minutes": round(max_downtime / 60),
        "last_downtime": last_downtime.isoformat() if last_downtime else None,
    }


def compute_sparkline(history_entries: list, period_end: datetime, days: int = 7) -> list:
    """Compute daily availability percentages for sparkline (last N days).

    Returns one percentage per day, oldest first. Each day is evaluated from
    the entries that changed during that day plus the most recent entry at or
    before the start of the day, so a state that has been in effect for
    several days carries over into every day it covers.
    """
    # Parse once and order chronologically so each day can scan in order.
    stamped = sorted(
        ((parse_datetime(entry["last_changed"]), entry) for entry in history_entries),
        key=lambda pair: pair[0],
    )

    points = []
    for offset in range(days, 0, -1):
        day_start = period_end - timedelta(days=offset)
        day_end = period_end - timedelta(days=offset - 1)

        carried = None  # latest entry in effect when the day starts
        day_entries = []
        for changed, entry in stamped:
            if changed >= day_end:
                break
            if changed <= day_start:
                carried = entry
            else:
                day_entries.append(entry)
        if carried is not None:
            day_entries.insert(0, carried)

        result = compute_availability(day_entries, day_start, day_end)
        points.append(result["availability_pct"])
    return points