"""Utility functions for the availability calculator."""
|
||
|
||
import re
|
||
from datetime import datetime, timezone, timedelta
|
||
|
||
# Entity domains covered by phase 1 of availability tracking.
TRACKED_DOMAINS = {"light", "switch"}

# Entity-name suffixes that are skipped regardless of domain
# (Zigbee2MQTT helper entities and similar).
EXCLUDE_SUFFIXES = (
    "_battery_low",
    "_battery",
    "_linkquality",
    "_update",
    "_update_state",
    "_identify",
)

# Extra suffixes skipped for the switch domain only: these are relay
# settings exposed as switches, not real devices.
SWITCH_EXCLUDE_SUFFIXES = (
    "_delayed_power_on_state",
    "_detach_relay_mode",
    "_network_indicator",
    "_turbo_mode",
    "_do_not_disturb",
)

# Full entity ids that are always skipped.
SWITCH_EXCLUDE_EXACT = {"switch.zigbee2mqtt_bridge_permit_join"}

# Matches auto-generated Zigbee select/number entities.
ZIGBEE_AUTO_PATTERN = re.compile(
    r"(select|number)\.(zigbee2mqtt_|.*_(radar_sensitivity|sensitivity|mode|level|delay|threshold|interval|timeout))",
    flags=re.IGNORECASE,
)

# Matches any entity id in the update or button domain.
UPDATE_BUTTON_EXCLUDE = re.compile(r"^(update|button)\.", flags=re.IGNORECASE)
|
||
|
||
|
||
def is_excluded(entity_id: str) -> bool:
    """Tell whether an entity must be left out of availability tracking.

    An entity is excluded when its full id is listed in
    ``SWITCH_EXCLUDE_EXACT``, when its name ends with one of
    ``EXCLUDE_SUFFIXES``, or when it is a switch whose name ends with one
    of ``SWITCH_EXCLUDE_SUFFIXES``.
    """
    pieces = entity_id.split(".")
    domain = pieces[0]
    # Only the segment right after the first dot is treated as the name;
    # an id without a dot has an empty name.
    name = pieces[1] if len(pieces) > 1 else ""

    if entity_id in SWITCH_EXCLUDE_EXACT:
        return True

    # str.endswith accepts a tuple, so one call checks every suffix.
    if name.endswith(EXCLUDE_SUFFIXES):
        return True

    return domain == "switch" and name.endswith(SWITCH_EXCLUDE_SUFFIXES)
|
||
|
||
|
||
def sanitize_entity_id(entity_id: str) -> str:
    """Turn an entity id into a sensor-safe name by replacing dots with underscores.

    Example: ``light.bra_v_spalne`` becomes ``light_bra_v_spalne``.
    """
    return "_".join(entity_id.split("."))
|
||
|
||
|
||
# Lowercase Cyrillic letter -> Latin transliteration. The hard and soft
# signs map to the empty string, i.e. they are dropped.
_CYRILLIC_MAP = {
    "а": "a", "б": "b", "в": "v", "г": "g", "д": "d",
    "е": "e", "ё": "yo", "ж": "zh", "з": "z", "и": "i",
    "й": "y", "к": "k", "л": "l", "м": "m", "н": "n",
    "о": "o", "п": "p", "р": "r", "с": "s", "т": "t",
    "у": "u", "ф": "f", "х": "kh", "ц": "ts", "ч": "ch",
    "ш": "sh", "щ": "shch", "ъ": "", "ы": "y", "ь": "",
    "э": "e", "ю": "yu", "я": "ya",
}


def sanitize_area_name(name: str) -> str:
    """Build a Latin-only sensor suffix from an area name.

    The name is lowercased and trimmed, Cyrillic letters are
    transliterated via ``_CYRILLIC_MAP``, every run of characters outside
    ``a-z0-9`` collapses to a single underscore, and leading/trailing
    underscores are removed.
    """
    lowered = name.lower().strip()
    latin = lowered.translate(str.maketrans(_CYRILLIC_MAP))
    # Underscores are part of the matched run, so repeated underscores
    # collapse in the same pass as other separators.
    collapsed = re.sub(r"[^a-z0-9]+", "_", latin)
    return collapsed.strip("_")
|
||
|
||
|
||
def domain_icon(domain: str) -> str:
    """Look up the emoji icon for an entity domain.

    Domains without a dedicated icon get the generic "📡" icon.
    """
    icons = {
        "light": "💡",
        "switch": "🔌",
        "binary_sensor": "👁️",
        "sensor": "🌡️",
        "climate": "❄️",
        "cover": "🚪",
        "lock": "🔒",
        "media_player": "🔊",
        "device_tracker": "📍",
        "vacuum": "🤖",
        "fan": "🌀",
        "humidifier": "💨",
        "water_heater": "🚿",
        "siren": "🚨",
        "button": "🔘",
    }
    if domain in icons:
        return icons[domain]
    return "📡"
|
||
|
||
|
||
def get_color(pct: float) -> str:
    """Map an availability percentage to its color bucket.

    ``>= 99`` is green, ``>= 95`` yellow, ``>= 90`` orange, anything else red.
    """
    # Ordered from the highest floor down; the first floor reached wins.
    buckets = ((99, "green"), (95, "yellow"), (90, "orange"))
    for floor, color in buckets:
        if pct >= floor:
            return color
    return "red"
|
||
|
||
|
||
def calc_trend(current_pct: float, previous_pct: float, threshold: float = 0.5) -> str:
    """Classify the change between two periods as "up", "down" or "stable".

    The change counts as a trend only when it is strictly larger than
    ``threshold`` in either direction.
    """
    change = current_pct - previous_pct
    trend = "stable"
    if change > threshold:
        trend = "up"
    elif change < -threshold:
        trend = "down"
    return trend
|
||
|
||
|
||
def format_downtime(minutes: int) -> str:
    """Render a downtime given in minutes as a short human-readable string.

    Under an hour only minutes are shown, under a day hours and minutes,
    otherwise days, hours and minutes.
    """
    if minutes < 60:
        return f"{minutes} мин"

    total_hours = minutes // 60
    rest_minutes = minutes % 60
    if total_hours < 24:
        return f"{total_hours}ч {rest_minutes}мин"

    days = total_hours // 24
    rest_hours = total_hours % 24
    return f"{days}д {rest_hours}ч {rest_minutes}мин"
|
||
|
||
|
||
def parse_datetime(dt_str: str) -> datetime:
    """Parse an ISO-8601 datetime string into a timezone-aware datetime.

    Accepted forms include ``2026-04-14T15:32:00+03:00``,
    ``2026-04-14T15:32:00.123456-05:00``, ``2026-04-14T15:32:00Z`` and a
    bare ``2026-04-14T15:32:00``.

    Args:
        dt_str: The datetime string. Surrounding whitespace and a trailing
            ``Z`` designator are tolerated.

    Returns:
        An aware ``datetime``. An explicit UTC offset, positive or
        negative, is preserved. A string without an offset (including one
        that ended in ``Z``) is interpreted as UTC.

    Raises:
        ValueError: If the string is not accepted by
            ``datetime.fromisoformat``.
    """
    # Drop the "Z" designator up front: fromisoformat only understands it
    # on newer Python versions, and the UTC fallback below covers it.
    text = dt_str.strip().rstrip("Zz")
    parsed = datetime.fromisoformat(text)
    # Decide on the parsed value rather than by searching for "+", so a
    # negative offset such as -05:00 is kept instead of being relabelled
    # as UTC.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
|
||
|
||
|
||
def period_timedelta(period: str) -> timedelta:
    """Translate a period label ("24h", "7d" or "30d") into a timedelta.

    Raises:
        KeyError: If ``period`` is not one of the supported labels.
    """
    spans = {
        "24h": {"hours": 24},
        "7d": {"days": 7},
        "30d": {"days": 30},
    }
    return timedelta(**spans[period])
|
||
|
||
|
||
def chunk_list(lst: list, size: int) -> list:
    """Cut a list into consecutive slices of at most ``size`` items.

    The last slice may be shorter. An empty list yields an empty result.
    """
    starts = range(0, len(lst), size)
    return [lst[begin:begin + size] for begin in starts]
|
||
|
||
|
||
def compute_availability(history_entries: list, period_start: datetime, period_end: datetime) -> dict:
    """Compute availability metrics for one entity over a time window.

    Each history entry is a dict with a ``state`` string and a
    ``last_changed`` timestamp string (parsed with ``parse_datetime``).
    An entry's state is taken to hold from its ``last_changed`` until the
    next entry's ``last_changed``, or until ``period_end`` for the last
    entry. The first entry's state is additionally assumed to have held
    since ``period_start``. The states ``"unavailable"`` and ``"unknown"``
    count as downtime.

    Args:
        history_entries: History entries in any order.
        period_start: Start of the window. Must be timezone-aware, since
            it is compared with the aware values from ``parse_datetime``.
        period_end: End of the window, timezone-aware.

    Returns:
        A dict with:
          * ``availability_pct``: share of the window not spent in a down
            state, rounded to one decimal and clamped to 0..100.
          * ``down_count``: number of distinct outages overlapping the
            window; consecutive down entries form a single outage.
          * ``max_downtime_minutes``: longest single outage inside the
            window, in whole minutes.
          * ``last_downtime``: ISO string of the start (clamped to
            ``period_start``) of the most recent down entry overlapping
            the window, or ``None``.

        With no entries or an empty window the result is 100 %
        availability and no outages.
    """
    total_seconds = (period_end - period_start).total_seconds()
    if not history_entries or total_seconds <= 0:
        return {
            "availability_pct": 100.0,
            "down_count": 0,
            "max_downtime_minutes": 0,
            "last_downtime": None,
        }

    down_states = ("unavailable", "unknown")

    # Parse every timestamp exactly once, then order chronologically.
    timeline = sorted(
        ((parse_datetime(entry["last_changed"]), entry["state"]) for entry in history_entries),
        key=lambda item: item[0],
    )
    last_index = len(timeline) - 1

    unavailable_seconds = 0.0
    down_count = 0
    max_downtime = 0.0
    last_downtime = None
    outage_start = None  # start of the outage currently in progress, if any

    for idx, (changed, state) in enumerate(timeline):
        if state not in down_states:
            # Any available state closes the running outage.
            outage_start = None
            continue

        observed_start = max(changed, period_start)
        # The first entry's state is assumed to hold since period_start.
        seg_start = period_start if idx == 0 else observed_start
        if idx == last_index:
            seg_end = period_end
        else:
            seg_end = min(timeline[idx + 1][0], period_end)

        if seg_end <= seg_start:
            # No overlap with the window: contributes neither time nor an
            # outage.
            continue

        if outage_start is None:
            outage_start = seg_start
            down_count += 1

        unavailable_seconds += (seg_end - seg_start).total_seconds()
        # Consecutive down segments are contiguous, so the outage runs
        # from its first segment's start to this segment's end.
        max_downtime = max(max_downtime, (seg_end - outage_start).total_seconds())

        if observed_start < seg_end:
            last_downtime = observed_start

    pct = round((1 - unavailable_seconds / total_seconds) * 100, 1)
    pct = max(0.0, min(100.0, pct))

    return {
        "availability_pct": pct,
        "down_count": down_count,
        "max_downtime_minutes": round(max_downtime / 60),
        "last_downtime": last_downtime.isoformat() if last_downtime else None,
    }
|
||
|
||
|
||
def compute_sparkline(history_entries: list, period_end: datetime, days: int = 7) -> list:
    """Compute daily availability percentages for a sparkline.

    The ``days`` 24-hour windows ending at ``period_end`` are evaluated
    oldest first. Each window is handed to ``compute_availability``
    together with the entries stamped inside it and, when one exists, the
    most recent entry stamped before the window. That earlier entry
    supplies the state the entity was in when the window opened, however
    long ago the change happened.

    Args:
        history_entries: Dicts with ``state`` and ``last_changed``, in any
            order.
        period_end: End of the most recent window (timezone-aware).
        days: Number of daily windows to produce.

    Returns:
        A list of ``days`` availability percentages, oldest window first.
    """
    # Parse and sort once instead of re-parsing every entry for every day.
    stamped = sorted(
        ((parse_datetime(entry["last_changed"]), entry) for entry in history_entries),
        key=lambda pair: pair[0],
    )

    points = []
    for offset in range(days, 0, -1):
        day_start = period_end - timedelta(days=offset)
        day_end = period_end - timedelta(days=offset - 1)

        seed = None  # latest entry stamped before the window opens
        day_entries = []
        for changed, entry in stamped:
            if changed >= day_end:
                break
            if changed < day_start:
                seed = entry
            else:
                day_entries.append(entry)
        if seed is not None:
            day_entries.insert(0, seed)

        result = compute_availability(day_entries, day_start, day_end)
        points.append(result["availability_pct"])
    return points
|