auto-sync: 2026-04-15 14:20:01

This commit is contained in:
Stream
2026-04-15 14:20:02 +03:00
parent f5c13ecacc
commit a89d1285d2
3 changed files with 599 additions and 0 deletions

View File

@@ -0,0 +1,266 @@
"""Utility functions for availability calculator."""
import re
from datetime import datetime, timezone, timedelta
# Domains to track (phase 1)
TRACKED_DOMAINS = {"light", "switch"}
# Entity suffixes to exclude (Zigbee2MQTT helpers, etc.)
EXCLUDE_SUFFIXES = (
"_battery_low", "_battery", "_linkquality", "_update",
"_update_state", "_identify",
)
# Additional exclusions for switch domain (relay settings, not real devices)
SWITCH_EXCLUDE_SUFFIXES = (
"_delayed_power_on_state", "_detach_relay_mode",
"_network_indicator", "_turbo_mode", "_do_not_disturb",
)
SWITCH_EXCLUDE_EXACT = {"switch.zigbee2mqtt_bridge_permit_join"}
# Auto-generated Zigbee select/number patterns
ZIGBEE_AUTO_PATTERN = re.compile(
r"(select|number)\.(zigbee2mqtt_|.*_(radar_sensitivity|sensitivity|mode|level|delay|threshold|interval|timeout))",
re.IGNORECASE,
)
# Update/button domain patterns
UPDATE_BUTTON_EXCLUDE = re.compile(r"^(update|button)\.", re.IGNORECASE)
def is_excluded(entity_id: str) -> bool:
"""Check if entity should be excluded from tracking."""
domain = entity_id.split(".")[0]
name = entity_id.split(".")[1] if "." in entity_id else ""
# Exact exclusions
if entity_id in SWITCH_EXCLUDE_EXACT:
return True
# Suffix exclusions
for suffix in EXCLUDE_SUFFIXES:
if name.endswith(suffix):
return True
# Switch-specific suffix exclusions
if domain == "switch":
for suffix in SWITCH_EXCLUDE_SUFFIXES:
if name.endswith(suffix):
return True
return False
def sanitize_entity_id(entity_id: str) -> str:
"""Convert entity_id to safe sensor name: light.bra_v_spalne → light_bra_v_spalne."""
return entity_id.replace(".", "_")
def sanitize_area_name(name: str) -> str:
"""Convert area name to safe sensor suffix."""
s = name.lower().strip()
s = re.sub(r"[^a-zа-яё0-9_]", "_", s)
s = re.sub(r"_+", "_", s)
return s.strip("_")
def domain_icon(domain: str) -> str:
"""Return icon for domain."""
return {
"light": "💡",
"switch": "🔌",
"binary_sensor": "👁️",
"sensor": "🌡️",
"climate": "❄️",
"cover": "🚪",
"lock": "🔒",
"media_player": "🔊",
"device_tracker": "📍",
"vacuum": "🤖",
"fan": "🌀",
"humidifier": "💨",
"water_heater": "🚿",
"siren": "🚨",
"button": "🔘",
}.get(domain, "📡")
def get_color(pct: float) -> str:
"""Return color bucket for availability percentage."""
if pct >= 99:
return "green"
elif pct >= 95:
return "yellow"
elif pct >= 90:
return "orange"
return "red"
def calc_trend(current_pct: float, previous_pct: float, threshold: float = 0.5) -> str:
"""Compare current vs previous period. Returns up/down/stable."""
diff = current_pct - previous_pct
if diff > threshold:
return "up"
elif diff < -threshold:
return "down"
return "stable"
def format_downtime(minutes: int) -> str:
"""Format downtime minutes to human-readable string."""
if minutes < 60:
return f"{minutes} мин"
h, m = divmod(minutes, 60)
if h < 24:
return f"{h}ч {m}мин"
d, h = divmod(h, 24)
return f"{d}д {h}ч {m}мин"
def parse_datetime(dt_str: str) -> datetime:
"""Parse HA datetime string to timezone-aware datetime."""
# HA format: 2026-04-14T15:32:00+03:00 or 2026-04-14T15:32:00.123456+03:00
dt_str = dt_str.rstrip("Z")
if "+" in dt_str[10:] or dt_str.endswith("Z"):
# Has timezone
if "+" in dt_str:
# Python fromisoformat handles this in 3.7+
return datetime.fromisoformat(dt_str)
return datetime.fromisoformat(dt_str.replace("Z", "+00:00"))
# No timezone — assume UTC
dt = datetime.fromisoformat(dt_str)
return dt.replace(tzinfo=timezone.utc)
def period_timedelta(period: str) -> timedelta:
"""Convert period string to timedelta."""
return {"24h": timedelta(hours=24), "7d": timedelta(days=7), "30d": timedelta(days=30)}[period]
def chunk_list(lst: list, size: int) -> list:
"""Split list into chunks of given size."""
return [lst[i:i + size] for i in range(0, len(lst), size)]
def compute_availability(history_entries: list, period_start: datetime, period_end: datetime) -> dict:
"""
Compute availability metrics from HA history entries.
Each entry is a dict with 'state' and 'last_changed'.
Returns dict with: availability_pct, down_count, max_downtime_minutes, last_downtime
"""
unavailable_seconds = 0
down_count = 0
max_downtime = 0
current_downtime_start = None
last_downtime = None
total_seconds = (period_end - period_start).total_seconds()
if not history_entries or total_seconds <= 0:
return {
"availability_pct": 100.0,
"down_count": 0,
"max_downtime_minutes": 0,
"last_downtime": None,
}
# Sort by last_changed
entries = sorted(history_entries, key=lambda e: parse_datetime(e["last_changed"]))
for i, entry in enumerate(entries):
state = entry["state"]
changed = parse_datetime(entry["last_changed"])
# Clamp to period
if changed < period_start:
changed = period_start
# Determine next change time
if i + 1 < len(entries):
next_changed = parse_datetime(entries[i + 1]["last_changed"])
else:
next_changed = period_end
if next_changed > period_end:
next_changed = period_end
if state in ("unavailable", "unknown"):
if current_downtime_start is None:
# Transition to unavailable
current_downtime_start = changed
down_count += 1
duration = (next_changed - changed).total_seconds()
if duration > 0:
unavailable_seconds += duration
last_downtime = changed
# Check if this is the end of a downtime period
if i + 1 < len(entries) and entries[i + 1]["state"] not in ("unavailable", "unknown"):
if current_downtime_start is not None:
downtime = (next_changed - current_downtime_start).total_seconds()
if downtime > max_downtime:
max_downtime = downtime
current_downtime_start = None
# If last entry and still unavailable
elif i == len(entries) - 1:
if current_downtime_start is not None:
downtime = (next_changed - current_downtime_start).total_seconds()
if downtime > max_downtime:
max_downtime = downtime
current_downtime_start = None
else:
# Available state — if there was a downtime running, end it
if current_downtime_start is not None:
downtime = (changed - current_downtime_start).total_seconds()
if downtime > max_downtime:
max_downtime = downtime
current_downtime_start = None
# Handle case where device was unavailable at period_start
# (first entry state determines status from period_start to first change)
if entries and entries[0]["state"] in ("unavailable", "unknown"):
first_change = parse_datetime(entries[0]["last_changed"])
if first_change > period_start:
pre_duration = (first_change - period_start).total_seconds()
unavailable_seconds += pre_duration
# This counts as one transition if not already counted
if current_downtime_start is None:
down_count += 1
# Handle trailing unavailable (last entry is unavailable)
if entries and entries[-1]["state"] in ("unavailable", "unknown"):
last_change = parse_datetime(entries[-1]["last_changed"])
trailing = (period_end - last_change).total_seconds()
if trailing > 0 and current_downtime_start is None:
# Already counted in the loop
pass
pct = round((1 - unavailable_seconds / total_seconds) * 100, 1) if total_seconds > 0 else 100.0
pct = max(0.0, min(100.0, pct))
return {
"availability_pct": pct,
"down_count": down_count,
"max_downtime_minutes": round(max_downtime / 60),
"last_downtime": last_downtime.isoformat() if last_downtime else None,
}
def compute_sparkline(history_entries: list, period_end: datetime, days: int = 7) -> list:
"""Compute daily availability percentages for sparkline (last N days)."""
points = []
for i in range(days - 1, -1, -1):
day_start = period_end - timedelta(days=i + 1)
day_end = period_end - timedelta(days=i)
# Filter entries relevant to this day
day_entries = []
for e in history_entries:
changed = parse_datetime(e["last_changed"])
if changed < day_end and changed >= day_start - timedelta(days=1):
day_entries.append(e)
result = compute_availability(day_entries, day_start, day_end)
points.append(result["availability_pct"])
return points