Files
wiki/tasks/karaoke/transcribe.py
2026-04-30 02:10:01 +03:00

382 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
transcribe.py — Whisper транскрипция аудио → [{start, end, text}]
Поддержка:
- faster-whisper (CPU) по умолчанию
- Готовые файлы .lrc, .srt (парсинг таймингов)
- Простой .txt (forced alignment через Whisper word-timestamps)
"""
import os
import re
import sys
import json
import subprocess
from pathlib import Path
from typing import Optional
from dotenv import load_dotenv
# ---------------------------------------------------------------------------
# Whisper транскрипция
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# OpenAI Whisper API (облако)
# ---------------------------------------------------------------------------
def transcribe_openai_api(audio_path: str, language: str = "ru") -> list[dict]:
"""Транскрипция через OpenAI Whisper API. Возвращает сегменты с word-timestamps."""
import httpx
load_dotenv(os.path.expanduser("~/.openclaw/.env"))
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
raise RuntimeError("OPENAI_API_KEY не задан в окружении / ~/.openclaw/.env")
print("[transcribe] Отправляем аудио в OpenAI Whisper API…")
with open(audio_path, "rb") as f:
resp = httpx.post(
"https://api.openai.com/v1/audio/transcriptions",
headers={"Authorization": f"Bearer {api_key}"},
data={
"model": "whisper-1",
"response_format": "verbose_json",
"language": language,
"timestamp_granularities[]": ["word", "segment"],
},
files={"file": f},
timeout=300,
)
resp.raise_for_status()
data = resp.json()
# Собираем сегменты с word-timestamps
results = []
word_list = data.get("words", []) # плоский список слов с таймингами
segments = data.get("segments", [])
if word_list and segments:
# Распределяем слова по сегментам
seg_offset = 0
for seg in segments:
seg_start = seg["start"]
seg_end = seg["end"]
seg_text = seg.get("text", "").strip()
if not seg_text:
continue
# Собираем слова, попадающие в этот сегмент
seg_words = []
for w in word_list:
ws = w["start"]
we = w["end"]
ww = w.get("word", "")
# Слово принадлежит сегменту, если его начало попадает в диапазон
if ws >= seg_start - 0.05 and ws <= seg_end + 0.05:
seg_words.append({
"word": ww,
"start": round(ws, 2),
"end": round(we, 2),
})
results.append({
"start": round(seg_start, 2),
"end": round(seg_end, 2),
"text": seg_text,
"words": seg_words,
})
elif segments:
for seg in segments:
seg_text = seg.get("text", "").strip()
if not seg_text:
continue
results.append({
"start": round(seg["start"], 2),
"end": round(seg["end"], 2),
"text": seg_text,
})
print(f"[transcribe] OpenAI API вернул {len(results)} сегментов")
return results
# ---------------------------------------------------------------------------
# Whisper транскрипция (локальная)
# ---------------------------------------------------------------------------
def transcribe_whisper(audio_path: str, model_size: str = "base",
language: Optional[str] = None,
device: str = "cpu") -> list[dict]:
"""Запуск faster-whisper. Возвращает список сегментов [{start, end, text}]."""
try:
from faster_whisper import WhisperModel
except ImportError:
print("[transcribe] faster-whisper не установлен. Ставим...", flush=True)
os.system(f"{sys.executable} -m pip install faster-whisper")
from faster_whisper import WhisperModel
model = WhisperModel(model_size, device=device, compute_type="int8")
segments, _ = model.transcribe(audio_path, language=language,
word_timestamps=True)
results = []
for seg in segments:
text = seg.text.strip()
if not text:
continue
# Собираем word-timestamps если есть
words = []
if hasattr(seg, 'words') and seg.words:
for w in seg.words:
words.append({
"word": w.word,
"start": round(w.start, 2),
"end": round(w.end, 2),
})
entry = {
"start": round(seg.start, 2),
"end": round(seg.end, 2),
"text": text,
}
if words:
entry["words"] = words
results.append(entry)
return results
# ---------------------------------------------------------------------------
# Парсинг .lrc
# ---------------------------------------------------------------------------
_RE_LRC = re.compile(r"\[(\d{2,}):(\d{2})\.(\d{2,3})\](.*)")
def parse_lrc(path: str) -> list[dict]:
lines = []
for line in Path(path).read_text(encoding="utf-8").splitlines():
m = _RE_LRC.match(line.strip())
if not m:
continue
mins, secs, frac, text = m.groups()
frac = frac.ljust(3, "0")[:3]
timestamp = int(mins) * 60 + int(secs) + int(frac) / 1000
text = text.strip()
if text:
lines.append({"start": round(timestamp, 2),
"end": None, # заполним позже
"text": text})
# Длительность каждой строки = начало следующей
for i in range(len(lines) - 1):
lines[i]["end"] = lines[i + 1]["start"]
if lines:
lines[-1]["end"] = lines[-1]["start"] + 4.0
return lines
# ---------------------------------------------------------------------------
# Парсинг .srt
# ---------------------------------------------------------------------------
_RE_SRT_TIME = re.compile(r"(\d{2}):(\d{2}):(\d{2}),(\d{3})")
def _srt_ts_to_sec(h, m, s, ms):
return int(h) * 3600 + int(m) * 60 + int(s) + int(ms) / 1000
def parse_srt(path: str) -> list[dict]:
results = []
blocks = re.split(r"\n\s*\n", Path(path).read_text(encoding="utf-8").strip())
for block in blocks:
parts = block.strip().splitlines()
if len(parts) < 3:
continue
m_from = _RE_SRT_TIME.match(parts[1].split(" --> ")[0].strip())
m_to = _RE_SRT_TIME.match(parts[1].split(" --> ")[1].strip())
if not m_from or not m_to:
continue
start = _srt_ts_to_sec(*m_from.groups())
end = _srt_ts_to_sec(*m_to.groups())
text = " ".join(parts[2:]).strip()
if text:
results.append({"start": round(start, 2),
"end": round(end, 2),
"text": text})
return results
# ---------------------------------------------------------------------------
# .txt forced alignment — равномерное распределение по длительности аудио
# ---------------------------------------------------------------------------
def _get_audio_duration(audio_path: str) -> float:
"""Получить длительность аудио через ffmpeg (ffprobe недоступен)."""
import shutil
ffmpeg_bin = shutil.which("ffmpeg") or "/home/node/bin/ffmpeg"
# Используем ffmpeg -i для получения длительности
result = subprocess.run(
[ffmpeg_bin, "-i", audio_path],
capture_output=True, text=True
)
# ffmpeg пишет Duration в stderr
import re
m = re.search(r"Duration: (\d+):(\d+):([\d.]+)", result.stderr)
if m:
h, mn, s = m.groups()
return int(h) * 3600 + int(mn) * 60 + float(s)
raise RuntimeError(f"Не удалось получить длительность аудио: {audio_path}")
def align_txt(audio_path: str, txt_path: str,
model_size: str = "base",
device: str = "cpu") -> list[dict]:
"""Выровнить строки .txt по Whisper-таймингам.
Алгоритм:
1. Запустить Whisper (language='ru', word_timestamps=True)
2. Если получили >5 сегментов с реальным текстом (не 'Music','' и т.п.)
— выровнять строки текста по сегментам пропорционально
3. Fallback: равномерное распределение по длительности аудио
"""
txt_lines = Path(txt_path).read_text(encoding="utf-8").splitlines()
txt_lines = [l.strip() for l in txt_lines if l.strip()]
total_lines = len(txt_lines)
if total_lines == 0:
return []
# Попробовать выровнить через Whisper
_SKIPPED = re.compile(r"^(music|\[music\]|instrumental|♪+|♫*|\(music\)|$)", re.IGNORECASE)
try:
whisper_segs = transcribe_whisper(audio_path, model_size,
language="ru", device=device)
# Фильтруем «мусорные» сегменты
real_segs = [s for s in whisper_segs if not _SKIPPED.match(s["text"].strip())]
if len(real_segs) > 5:
# Есть достаточно реальных сегментов — выравниваем строки по ним
return _align_by_segments(txt_lines, real_segs)
except Exception as e:
print(f"[transcribe] Whisper alignment не удался ({e}), fallback на равномерное распределение")
# Fallback: равномерное распределение
return _align_uniform(txt_lines, audio_path)
def _align_uniform(txt_lines: list[str], audio_path: str) -> list[dict]:
"""Равномерно распределить строки по длительности аудио."""
total_lines = len(txt_lines)
duration = _get_audio_duration(audio_path)
slot = duration / total_lines
return [{"start": round(i * slot, 2),
"end": round((i + 1) * slot, 2),
"text": txt_lines[i]} for i in range(total_lines)]
def _align_by_segments(txt_lines: list[dict], whisper_segs: list[dict]) -> list[dict]:
"""Выровнять строки текста по Whisper-сегментам.
Каждый Whisper-сегмент может охватывать несколько строк текста.
Распределяем строки внутри диапазона сегмента пропорционально.
"""
if not txt_lines:
return []
# Если строк больше чем сегментов — распределяем строки по сегментам
num_segs = len(whisper_segs)
total_lines = len(txt_lines)
if total_lines <= num_segs:
# Строк меньше или столько же сколько сегментов — берём сегменты напрямую
# Берём первые total_lines сегментов
result = []
for i in range(total_lines):
seg = whisper_segs[i] if i < num_segs else whisper_segs[-1]
result.append({
"start": seg["start"],
"end": seg["end"],
"text": txt_lines[i]
})
return result
# Строк больше чем сегментов — распределяем пропорционально
result = []
seg_idx = 0
lines_per_seg = max(1, total_lines // num_segs)
for i, line in enumerate(txt_lines):
# Определяем к какому сегменту принадлежит строка
seg_idx = min(i // lines_per_seg, num_segs - 1)
# Границы текущего сегмента
seg_start = whisper_segs[seg_idx]["start"]
seg_end = whisper_segs[min(seg_idx + 1, num_segs - 1)]["start"] if seg_idx + 1 < num_segs else whisper_segs[seg_idx]["end"]
# Позиция внутри группы строк текущего сегмента
group_start = seg_idx * lines_per_seg
group_size = min(lines_per_seg, total_lines - group_start)
line_in_group = i - group_start
# Пропорциональное распределение внутри сегмента
if group_size > 1:
line_dur = (seg_end - seg_start) / group_size
line_start = seg_start + line_in_group * line_dur
line_end = line_start + line_dur
else:
line_start = seg_start
line_end = seg_end
result.append({
"start": round(line_start, 2),
"end": round(line_end, 2),
"text": line
})
return result
# ---------------------------------------------------------------------------
# Главная функция
# ---------------------------------------------------------------------------
def transcribe(audio_path: str, text_path: Optional[str] = None,
model_size: str = "base", device: str = "cpu",
language: Optional[str] = None,
use_api: bool = True) -> list[dict]:
"""
Универсальная функция транскрипции.
Параметры:
audio_path — путь к аудиофайлу
text_path — путь к .lrc/.srt/.txt (опционально)
model_size — размер Whisper-модели (tiny, base, small, medium, large)
device — "cpu" или "cuda"
language — код языка (напр. "ru", "en") или None для авто
use_api — использовать OpenAI API (по умолч. True)
Возвращает: [{start, end, text, words?}, ...]
"""
if text_path:
ext = Path(text_path).suffix.lower()
if ext == ".lrc":
return parse_lrc(text_path)
elif ext == ".srt":
return parse_srt(text_path)
elif ext == ".txt":
return align_txt(audio_path, text_path, model_size, device)
else:
print(f"[transcribe] Неподдерживаемый формат: {text_path}")
sys.exit(1)
else:
# Проверяем, можно ли использовать OpenAI API
if use_api:
load_dotenv(os.path.expanduser("~/.openclaw/.env"))
api_key = os.environ.get("OPENAI_API_KEY")
if api_key:
try:
return transcribe_openai_api(audio_path, language or "ru")
except Exception as e:
print(f"[transcribe] OpenAI API не удался ({e}), fallback на faster-whisper")
print(f"[transcribe] Запускаем Whisper ({model_size}, {device})…")
return transcribe_whisper(audio_path, model_size, language, device)