Files
wiki/tasks/ha-availability-dashboard/appdaemon/availability_utils.py
2026-04-15 14:30:01 +03:00

280 lines
9.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Utility functions for availability calculator."""
import re
from datetime import datetime, timezone, timedelta
# Domains to track (phase 1).
# NOTE(review): not referenced by any function in the visible part of this
# file -- presumably consumed by the importing module; confirm.
TRACKED_DOMAINS = {"light", "switch"}
# Entity-name suffixes excluded for every domain (Zigbee2MQTT helper entities,
# etc.). is_excluded() tests them with str.endswith against the part of the
# entity_id after the first dot.
EXCLUDE_SUFFIXES = (
    "_battery_low", "_battery", "_linkquality", "_update",
    "_update_state", "_identify",
)
# Additional suffix exclusions that is_excluded() applies only when the domain
# is "switch" (relay settings, not real devices).
SWITCH_EXCLUDE_SUFFIXES = (
    "_delayed_power_on_state", "_detach_relay_mode",
    "_network_indicator", "_turbo_mode", "_do_not_disturb",
)
# Full entity_ids that are always excluded (compared by exact equality).
SWITCH_EXCLUDE_EXACT = {"switch.zigbee2mqtt_bridge_permit_join"}
# Auto-generated Zigbee select/number patterns. Case-insensitive. The pattern
# has no ^/$ anchors, so whether it must start at the beginning of the
# entity_id depends on the caller using match() or search().
# NOTE(review): not used by is_excluded() in the visible code -- confirm where
# this is applied.
ZIGBEE_AUTO_PATTERN = re.compile(
    r"(select|number)\.(zigbee2mqtt_|.*_(radar_sensitivity|sensitivity|mode|level|delay|threshold|interval|timeout))",
    re.IGNORECASE,
)
# Matches entity_ids whose domain is "update" or "button" (anchored at the
# start of the string, case-insensitive).
# NOTE(review): not used by is_excluded() in the visible code -- confirm where
# this is applied.
UPDATE_BUTTON_EXCLUDE = re.compile(r"^(update|button)\.", re.IGNORECASE)
def is_excluded(entity_id: str) -> bool:
    """Return True when *entity_id* must be left out of availability tracking.

    An entity is excluded when its full id is listed in SWITCH_EXCLUDE_EXACT,
    when its name (the segment after the first dot) ends with one of
    EXCLUDE_SUFFIXES, or when it is in the ``switch`` domain and its name ends
    with one of SWITCH_EXCLUDE_SUFFIXES.
    """
    if entity_id in SWITCH_EXCLUDE_EXACT:
        return True

    pieces = entity_id.split(".")
    domain = pieces[0]
    # An id without a dot has no name part; treat it as an empty name.
    object_name = pieces[1] if len(pieces) > 1 else ""

    # str.endswith accepts a tuple, so one call covers every suffix.
    if object_name.endswith(EXCLUDE_SUFFIXES):
        return True

    return domain == "switch" and object_name.endswith(SWITCH_EXCLUDE_SUFFIXES)
def sanitize_entity_id(entity_id: str) -> str:
    """Turn an entity_id into a sensor-safe name by replacing every dot with an underscore.

    Example: ``light.bra_v_spalne`` becomes ``light_bra_v_spalne``.
    """
    segments = entity_id.split(".")
    return "_".join(segments)
# Lower-case Cyrillic -> Latin transliteration table. The hard and soft signs
# map to the empty string, i.e. they are dropped.
_CYRILLIC_MAP = {
    'а': 'a', 'б': 'b', 'в': 'v', 'г': 'g', 'д': 'd', 'е': 'e', 'ё': 'yo',
    'ж': 'zh', 'з': 'z', 'и': 'i', 'й': 'y', 'к': 'k', 'л': 'l', 'м': 'm',
    'н': 'n', 'о': 'o', 'п': 'p', 'р': 'r', 'с': 's', 'т': 't', 'у': 'u',
    'ф': 'f', 'х': 'kh', 'ц': 'ts', 'ч': 'ch', 'ш': 'sh', 'щ': 'shch',
    'ъ': '', 'ы': 'y', 'ь': '', 'э': 'e', 'ю': 'yu', 'я': 'ya',
}
# Same mapping in the form str.translate expects (keyed by code point).
_CYRILLIC_TABLE = str.maketrans(_CYRILLIC_MAP)


def sanitize_area_name(name: str) -> str:
    """Build a Latin-only sensor suffix from an area name.

    The name is lower-cased and stripped, Cyrillic letters are transliterated,
    every character outside ``a-z0-9_`` becomes an underscore, runs of
    underscores collapse to one, and leading/trailing underscores are removed.
    """
    latin = name.lower().strip().translate(_CYRILLIC_TABLE)
    # Anything that is still not a safe character is turned into "_".
    safe = re.sub(r"[^a-z0-9_]", "_", latin)
    collapsed = re.sub(r"_+", "_", safe)
    return collapsed.strip("_")
def domain_icon(domain: str) -> str:
    """Look up the emoji icon for an entity domain.

    Domains without a dedicated icon get the generic "📡" icon.
    """
    icons = {
        "light": "💡",
        "switch": "🔌",
        "binary_sensor": "👁️",
        "sensor": "🌡️",
        "climate": "❄️",
        "cover": "🚪",
        "lock": "🔒",
        "media_player": "🔊",
        "device_tracker": "📍",
        "vacuum": "🤖",
        "fan": "🌀",
        "humidifier": "💨",
        "water_heater": "🚿",
        "siren": "🚨",
        "button": "🔘",
    }
    fallback = "📡"
    return icons.get(domain, fallback)
def get_color(pct: float) -> str:
    """Map an availability percentage onto a color bucket.

    ``>= 99`` is green, ``>= 95`` yellow, ``>= 90`` orange, anything else red.
    """
    # Ordered from the strictest threshold down; the first one met wins.
    buckets = ((99, "green"), (95, "yellow"), (90, "orange"))
    for floor, color in buckets:
        if pct >= floor:
            return color
    return "red"
def calc_trend(current_pct: float, previous_pct: float, threshold: float = 0.5) -> str:
    """Classify the change between two periods as "up", "down" or "stable".

    The change counts as a trend only when it exceeds *threshold* in absolute
    value; a difference exactly equal to the threshold is still "stable".
    """
    delta = current_pct - previous_pct
    if delta > threshold:
        return "up"
    return "down" if delta < -threshold else "stable"
def format_downtime(minutes: int) -> str:
    """Render a downtime given in minutes as a short human-readable string.

    Under one hour only minutes are shown; under one day hours and minutes;
    otherwise days, hours and minutes.
    """
    if minutes < 60:
        return f"{minutes} мин"

    total_hours = minutes // 60
    rest_minutes = minutes % 60
    if total_hours < 24:
        return f"{total_hours}ч {rest_minutes}мин"

    days = total_hours // 24
    rest_hours = total_hours % 24
    return f"{days}д {rest_hours}ч {rest_minutes}мин"
def parse_datetime(dt_str: str) -> datetime:
    """Parse an ISO-8601 datetime string into a timezone-aware datetime.

    Accepted forms include ``2026-04-14T15:32:00+03:00``,
    ``2026-04-14T15:32:00.123456+03:00``, ``2026-04-14T15:32:00-05:00`` and
    ``2026-04-14T15:32:00Z``.

    Any UTC offset present in the string (positive or negative) is preserved.
    A trailing ``Z`` is interpreted as UTC. A string without any offset is
    assumed to be UTC.

    Raises:
        ValueError: if the string is not a format datetime.fromisoformat accepts.
    """
    # datetime.fromisoformat only understands a literal "Z" from Python 3.11
    # on, so spell it out as an explicit offset first.
    if dt_str.endswith(("Z", "z")):
        dt_str = dt_str[:-1] + "+00:00"

    parsed = datetime.fromisoformat(dt_str)

    # Decide on the *parsed* value rather than by scanning for "+": a negative
    # offset such as "-05:00" must be kept, not overwritten with UTC.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
def period_timedelta(period: str) -> timedelta:
    """Translate a period label ("24h", "7d" or "30d") into its timedelta.

    Raises:
        KeyError: for any other label.
    """
    spans = {
        "24h": timedelta(hours=24),
        "7d": timedelta(days=7),
        "30d": timedelta(days=30),
    }
    return spans[period]
def chunk_list(lst: list, size: int) -> list:
    """Cut *lst* into consecutive slices of at most *size* items each.

    The last slice may be shorter. An empty list yields an empty result.
    """
    chunks = []
    for start in range(0, len(lst), size):
        stop = start + size
        chunks.append(lst[start:stop])
    return chunks
def compute_availability(history_entries: list, period_start: datetime, period_end: datetime) -> dict:
    """Compute availability metrics for one entity over a reporting window.

    Each entry is a dict with 'state' and 'last_changed' (a datetime string
    accepted by parse_datetime). An entry's state is taken to hold from its
    'last_changed' until the next entry's 'last_changed', or until
    *period_end* for the last entry. The states "unavailable" and "unknown"
    count as downtime; consecutive down entries form one outage.

    Only the part of an outage that lies inside the window is measured, and an
    outage that lies entirely outside the window is ignored. If the earliest
    entry is a down state that starts after *period_start*, the entity is
    assumed to have been down since *period_start* and that lead-in belongs to
    the same outage (it is not counted as a second one).

    *period_start* and *period_end* must be timezone-aware, because they are
    compared with the aware datetimes returned by parse_datetime.

    Returns:
        dict with
        - availability_pct: share of the window not spent down, in percent,
          rounded to one decimal and clamped to 0..100;
        - down_count: number of outages overlapping the window;
        - max_downtime_minutes: longest single outage, rounded to minutes;
        - last_downtime: ISO string of the (window-clamped) start of the most
          recent down entry, or None when there was no downtime.

        An empty history or a non-positive window yields 100% availability.
    """
    total_seconds = (period_end - period_start).total_seconds()
    if not history_entries or total_seconds <= 0:
        return {
            "availability_pct": 100.0,
            "down_count": 0,
            "max_downtime_minutes": 0,
            "last_downtime": None,
        }

    down_states = ("unavailable", "unknown")

    # Parse every timestamp exactly once, then order chronologically.
    timeline = sorted(
        ((parse_datetime(entry["last_changed"]), entry["state"]) for entry in history_entries),
        key=lambda item: item[0],
    )

    unavailable_seconds = 0.0
    down_count = 0
    max_downtime = 0.0
    last_downtime = None
    # Seconds accumulated by the outage currently in progress; None while up.
    outage_seconds = None

    for idx, (changed, state) in enumerate(timeline):
        if state not in down_states:
            # An available state closes whatever outage was running.
            if outage_seconds is not None:
                max_downtime = max(max_downtime, outage_seconds)
                outage_seconds = None
            continue

        # The earliest entry describes the state at the start of the window:
        # clamp it forward when it is older, extend it back when it is newer.
        start = period_start if idx == 0 else max(changed, period_start)
        next_changed = timeline[idx + 1][0] if idx + 1 < len(timeline) else period_end
        end = min(next_changed, period_end)

        duration = (end - start).total_seconds()
        if duration <= 0:
            # This entry does not overlap the window at all.
            continue

        if outage_seconds is None:
            outage_seconds = 0.0
            down_count += 1
        outage_seconds += duration
        unavailable_seconds += duration
        last_downtime = start

    # The history may end while the entity is still down.
    if outage_seconds is not None:
        max_downtime = max(max_downtime, outage_seconds)

    pct = round((1 - unavailable_seconds / total_seconds) * 100, 1)
    pct = max(0.0, min(100.0, pct))
    return {
        "availability_pct": pct,
        "down_count": down_count,
        "max_downtime_minutes": round(max_downtime / 60),
        "last_downtime": last_downtime.isoformat() if last_downtime else None,
    }
def compute_sparkline(history_entries: list, period_end: datetime, days: int = 7) -> list:
    """Compute daily availability percentages for a sparkline (last N days).

    The window ending at *period_end* is cut into *days* consecutive 24-hour
    slices. For each slice, compute_availability is called with the entries
    that changed inside the slice plus the most recent entry before it, so the
    state already in effect when the slice starts is carried in no matter how
    long ago it was set.

    Returns:
        list of ``availability_pct`` values, oldest day first.
    """
    # Parse and order once instead of re-parsing every entry for every day.
    timeline = sorted(
        ((parse_datetime(entry["last_changed"]), entry) for entry in history_entries),
        key=lambda item: item[0],
    )

    points = []
    for offset in range(days - 1, -1, -1):
        day_start = period_end - timedelta(days=offset + 1)
        day_end = period_end - timedelta(days=offset)

        carry_in = None  # latest entry strictly before day_start
        in_day = []
        for changed, entry in timeline:
            if changed >= day_end:
                break
            if changed < day_start:
                carry_in = entry
            else:
                in_day.append(entry)

        day_entries = in_day if carry_in is None else [carry_in] + in_day
        result = compute_availability(day_entries, day_start, day_end)
        points.append(result["availability_pct"])
    return points