""" transcribe.py — Whisper транскрипция аудио → [{start, end, text}] Поддержка: - faster-whisper (CPU) по умолчанию - Готовые файлы .lrc, .srt (парсинг таймингов) - Простой .txt (forced alignment через Whisper word-timestamps) """ import os import re import sys import json import subprocess from pathlib import Path from typing import Optional from dotenv import load_dotenv # --------------------------------------------------------------------------- # Whisper транскрипция # --------------------------------------------------------------------------- # --------------------------------------------------------------------------- # OpenAI Whisper API (облако) # --------------------------------------------------------------------------- def transcribe_openai_api(audio_path: str, language: str = "ru") -> list[dict]: """Транскрипция через OpenAI Whisper API. Возвращает сегменты с word-timestamps.""" import httpx load_dotenv(os.path.expanduser("~/.openclaw/.env")) api_key = os.environ.get("OPENAI_API_KEY") if not api_key: raise RuntimeError("OPENAI_API_KEY не задан в окружении / ~/.openclaw/.env") print("[transcribe] Отправляем аудио в OpenAI Whisper API…") with open(audio_path, "rb") as f: resp = httpx.post( "https://api.openai.com/v1/audio/transcriptions", headers={"Authorization": f"Bearer {api_key}"}, data={ "model": "whisper-1", "response_format": "verbose_json", "language": language, "timestamp_granularities[]": ["word", "segment"], }, files={"file": f}, timeout=300, ) resp.raise_for_status() data = resp.json() # Собираем сегменты с word-timestamps results = [] word_list = data.get("words", []) # плоский список слов с таймингами segments = data.get("segments", []) if word_list and segments: # Распределяем слова по сегментам seg_offset = 0 for seg in segments: seg_start = seg["start"] seg_end = seg["end"] seg_text = seg.get("text", "").strip() if not seg_text: continue # Собираем слова, попадающие в этот сегмент seg_words = [] for w in word_list: ws = w["start"] we = w["end"] ww = w.get("word", "") # Слово принадлежит сегменту, если его начало попадает в диапазон if ws >= seg_start - 0.05 and ws <= seg_end + 0.05: seg_words.append({ "word": ww, "start": round(ws, 2), "end": round(we, 2), }) results.append({ "start": round(seg_start, 2), "end": round(seg_end, 2), "text": seg_text, "words": seg_words, }) elif segments: for seg in segments: seg_text = seg.get("text", "").strip() if not seg_text: continue results.append({ "start": round(seg["start"], 2), "end": round(seg["end"], 2), "text": seg_text, }) print(f"[transcribe] OpenAI API вернул {len(results)} сегментов") return results # --------------------------------------------------------------------------- # Whisper транскрипция (локальная) # --------------------------------------------------------------------------- def transcribe_whisper(audio_path: str, model_size: str = "base", language: Optional[str] = None, device: str = "cpu") -> list[dict]: """Запуск faster-whisper. Возвращает список сегментов [{start, end, text}].""" try: from faster_whisper import WhisperModel except ImportError: print("[transcribe] faster-whisper не установлен. Ставим...", flush=True) os.system(f"{sys.executable} -m pip install faster-whisper") from faster_whisper import WhisperModel model = WhisperModel(model_size, device=device, compute_type="int8") segments, _ = model.transcribe(audio_path, language=language, word_timestamps=True) results = [] for seg in segments: text = seg.text.strip() if not text: continue # Собираем word-timestamps если есть words = [] if hasattr(seg, 'words') and seg.words: for w in seg.words: words.append({ "word": w.word, "start": round(w.start, 2), "end": round(w.end, 2), }) entry = { "start": round(seg.start, 2), "end": round(seg.end, 2), "text": text, } if words: entry["words"] = words results.append(entry) return results # --------------------------------------------------------------------------- # Парсинг .lrc # --------------------------------------------------------------------------- _RE_LRC = re.compile(r"\[(\d{2,}):(\d{2})\.(\d{2,3})\](.*)") def parse_lrc(path: str) -> list[dict]: lines = [] for line in Path(path).read_text(encoding="utf-8").splitlines(): m = _RE_LRC.match(line.strip()) if not m: continue mins, secs, frac, text = m.groups() frac = frac.ljust(3, "0")[:3] timestamp = int(mins) * 60 + int(secs) + int(frac) / 1000 text = text.strip() if text: lines.append({"start": round(timestamp, 2), "end": None, # заполним позже "text": text}) # Длительность каждой строки = начало следующей for i in range(len(lines) - 1): lines[i]["end"] = lines[i + 1]["start"] if lines: lines[-1]["end"] = lines[-1]["start"] + 4.0 return lines # --------------------------------------------------------------------------- # Парсинг .srt # --------------------------------------------------------------------------- _RE_SRT_TIME = re.compile(r"(\d{2}):(\d{2}):(\d{2}),(\d{3})") def _srt_ts_to_sec(h, m, s, ms): return int(h) * 3600 + int(m) * 60 + int(s) + int(ms) / 1000 def parse_srt(path: str) -> list[dict]: results = [] blocks = re.split(r"\n\s*\n", Path(path).read_text(encoding="utf-8").strip()) for block in blocks: parts = block.strip().splitlines() if len(parts) < 3: continue m_from = _RE_SRT_TIME.match(parts[1].split(" --> ")[0].strip()) m_to = _RE_SRT_TIME.match(parts[1].split(" --> ")[1].strip()) if not m_from or not m_to: continue start = _srt_ts_to_sec(*m_from.groups()) end = _srt_ts_to_sec(*m_to.groups()) text = " ".join(parts[2:]).strip() if text: results.append({"start": round(start, 2), "end": round(end, 2), "text": text}) return results # --------------------------------------------------------------------------- # .txt forced alignment — равномерное распределение по длительности аудио # --------------------------------------------------------------------------- def _get_audio_duration(audio_path: str) -> float: """Получить длительность аудио через ffmpeg (ffprobe недоступен).""" import shutil ffmpeg_bin = shutil.which("ffmpeg") or "/home/node/bin/ffmpeg" # Используем ffmpeg -i для получения длительности result = subprocess.run( [ffmpeg_bin, "-i", audio_path], capture_output=True, text=True ) # ffmpeg пишет Duration в stderr import re m = re.search(r"Duration: (\d+):(\d+):([\d.]+)", result.stderr) if m: h, mn, s = m.groups() return int(h) * 3600 + int(mn) * 60 + float(s) raise RuntimeError(f"Не удалось получить длительность аудио: {audio_path}") def align_txt(audio_path: str, txt_path: str, model_size: str = "base", device: str = "cpu") -> list[dict]: """Выровнить строки .txt по Whisper-таймингам. Алгоритм: 1. Запустить Whisper (language='ru', word_timestamps=True) 2. Если получили >5 сегментов с реальным текстом (не 'Music','♪' и т.п.) — выровнять строки текста по сегментам пропорционально 3. Fallback: равномерное распределение по длительности аудио """ txt_lines = Path(txt_path).read_text(encoding="utf-8").splitlines() txt_lines = [l.strip() for l in txt_lines if l.strip()] total_lines = len(txt_lines) if total_lines == 0: return [] # Попробовать выровнить через Whisper _SKIPPED = re.compile(r"^(music|\[music\]|instrumental|♪+|♫*|\(music\)|$)", re.IGNORECASE) try: whisper_segs = transcribe_whisper(audio_path, model_size, language="ru", device=device) # Фильтруем «мусорные» сегменты real_segs = [s for s in whisper_segs if not _SKIPPED.match(s["text"].strip())] if len(real_segs) > 5: # Есть достаточно реальных сегментов — выравниваем строки по ним return _align_by_segments(txt_lines, real_segs) except Exception as e: print(f"[transcribe] Whisper alignment не удался ({e}), fallback на равномерное распределение") # Fallback: равномерное распределение return _align_uniform(txt_lines, audio_path) def _align_uniform(txt_lines: list[str], audio_path: str) -> list[dict]: """Равномерно распределить строки по длительности аудио.""" total_lines = len(txt_lines) duration = _get_audio_duration(audio_path) slot = duration / total_lines return [{"start": round(i * slot, 2), "end": round((i + 1) * slot, 2), "text": txt_lines[i]} for i in range(total_lines)] def _align_by_segments(txt_lines: list[dict], whisper_segs: list[dict]) -> list[dict]: """Выровнять строки текста по Whisper-сегментам. Каждый Whisper-сегмент может охватывать несколько строк текста. Распределяем строки внутри диапазона сегмента пропорционально. """ if not txt_lines: return [] # Если строк больше чем сегментов — распределяем строки по сегментам num_segs = len(whisper_segs) total_lines = len(txt_lines) if total_lines <= num_segs: # Строк меньше или столько же сколько сегментов — берём сегменты напрямую # Берём первые total_lines сегментов result = [] for i in range(total_lines): seg = whisper_segs[i] if i < num_segs else whisper_segs[-1] result.append({ "start": seg["start"], "end": seg["end"], "text": txt_lines[i] }) return result # Строк больше чем сегментов — распределяем пропорционально result = [] seg_idx = 0 lines_per_seg = max(1, total_lines // num_segs) for i, line in enumerate(txt_lines): # Определяем к какому сегменту принадлежит строка seg_idx = min(i // lines_per_seg, num_segs - 1) # Границы текущего сегмента seg_start = whisper_segs[seg_idx]["start"] seg_end = whisper_segs[min(seg_idx + 1, num_segs - 1)]["start"] if seg_idx + 1 < num_segs else whisper_segs[seg_idx]["end"] # Позиция внутри группы строк текущего сегмента group_start = seg_idx * lines_per_seg group_size = min(lines_per_seg, total_lines - group_start) line_in_group = i - group_start # Пропорциональное распределение внутри сегмента if group_size > 1: line_dur = (seg_end - seg_start) / group_size line_start = seg_start + line_in_group * line_dur line_end = line_start + line_dur else: line_start = seg_start line_end = seg_end result.append({ "start": round(line_start, 2), "end": round(line_end, 2), "text": line }) return result # --------------------------------------------------------------------------- # Главная функция # --------------------------------------------------------------------------- def transcribe(audio_path: str, text_path: Optional[str] = None, model_size: str = "base", device: str = "cpu", language: Optional[str] = None, use_api: bool = True) -> list[dict]: """ Универсальная функция транскрипции. Параметры: audio_path — путь к аудиофайлу text_path — путь к .lrc/.srt/.txt (опционально) model_size — размер Whisper-модели (tiny, base, small, medium, large) device — "cpu" или "cuda" language — код языка (напр. "ru", "en") или None для авто use_api — использовать OpenAI API (по умолч. True) Возвращает: [{start, end, text, words?}, ...] """ if text_path: ext = Path(text_path).suffix.lower() if ext == ".lrc": return parse_lrc(text_path) elif ext == ".srt": return parse_srt(text_path) elif ext == ".txt": return align_txt(audio_path, text_path, model_size, device) else: print(f"[transcribe] Неподдерживаемый формат: {text_path}") sys.exit(1) else: # Проверяем, можно ли использовать OpenAI API if use_api: load_dotenv(os.path.expanduser("~/.openclaw/.env")) api_key = os.environ.get("OPENAI_API_KEY") if api_key: try: return transcribe_openai_api(audio_path, language or "ru") except Exception as e: print(f"[transcribe] OpenAI API не удался ({e}), fallback на faster-whisper") print(f"[transcribe] Запускаем Whisper ({model_size}, {device})…") return transcribe_whisper(audio_path, model_size, language, device)