Files
wiki/tasks/karaoke/transcribe.py
2026-04-30 00:50:01 +03:00

184 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
transcribe.py — turn audio (or an existing lyrics file) into [{start, end, text}]
Supported inputs:
- Whisper transcription via faster-whisper (CPU by default)
- Ready-made .lrc / .srt files (timings are parsed from the file)
- Plain .txt (lines are spread evenly across the audio duration; no real forced alignment is performed)
"""
import os
import re
import sys
import json
import subprocess
from pathlib import Path
from typing import Optional
# ---------------------------------------------------------------------------
# Whisper транскрипция
# ---------------------------------------------------------------------------
def transcribe_whisper(audio_path: str, model_size: str = "base",
                       language: Optional[str] = None,
                       device: str = "cpu") -> list[dict]:
    """Transcribe an audio file with faster-whisper.

    If the ``faster_whisper`` package cannot be imported, it is installed
    with pip for the running interpreter and the import is retried.

    Args:
        audio_path: Path of the audio file handed to ``WhisperModel.transcribe``.
        model_size: Model name/size handed to ``WhisperModel``.
        language: Language code, or ``None``; passed through unchanged.
        device: Device name handed to ``WhisperModel``.

    Returns:
        One ``{"start", "end", "text"}`` dict per segment whose stripped text
        is non-empty; times are rounded to two decimals.

    Raises:
        ImportError: If faster-whisper is still not importable after the
            install attempt.
    """
    try:
        from faster_whisper import WhisperModel
    except ImportError:
        print("[transcribe] faster-whisper не установлен. Ставим...", flush=True)
        # Argument list instead of a shell string: an interpreter path that
        # contains spaces or shell metacharacters must not be re-parsed by a
        # shell. The exit status is deliberately not checked; a failed
        # install surfaces as ImportError on the retry below.
        subprocess.run(
            [sys.executable, "-m", "pip", "install", "faster-whisper"],
            check=False,
        )
        from faster_whisper import WhisperModel

    model = WhisperModel(model_size, device=device, compute_type="int8")
    segments, _ = model.transcribe(audio_path, language=language,
                                   word_timestamps=True)

    results = []
    for seg in segments:
        text = seg.text.strip()
        if not text:
            # Skip segments that contain only whitespace.
            continue
        results.append({
            "start": round(seg.start, 2),
            "end": round(seg.end, 2),
            "text": text,
        })
    return results
# ---------------------------------------------------------------------------
# Парсинг .lrc
# ---------------------------------------------------------------------------
# One timed LRC line: "[mm:ss.xx]text" (2-3 fractional digits).
_RE_LRC = re.compile(r"\[(\d{2,}):(\d{2})\.(\d{2,3})\](.*)")


def parse_lrc(path: str) -> list[dict]:
    """Parse an .lrc lyrics file into timed lines.

    Lines that do not match ``[mm:ss.xx]text`` (metadata tags, blank lines)
    are ignored. The file is read as UTF-8 and a leading byte-order mark is
    dropped, so the first timed line of a BOM-prefixed file is not lost.

    A lyric line ends where the next timestamp in the file begins. A
    timestamp with no text produces no entry of its own, but it still closes
    the preceding lyric line, so that line does not stay on screen through
    the gap. When nothing follows a lyric line, it gets a fixed 4-second
    duration.

    Args:
        path: Path to the .lrc file.

    Returns:
        A list of ``{"start", "end", "text"}`` dicts in file order, times in
        seconds rounded to two decimals.
    """
    # Every timed line as (start_seconds, stripped_text); text may be empty.
    stamped = []
    for raw in Path(path).read_text(encoding="utf-8-sig").splitlines():
        m = _RE_LRC.match(raw.strip())
        if not m:
            continue
        mins, secs, frac, text = m.groups()
        # Normalise hundredths ("50") and milliseconds ("500") to milliseconds.
        frac = frac.ljust(3, "0")[:3]
        timestamp = int(mins) * 60 + int(secs) + int(frac) / 1000
        stamped.append((round(timestamp, 2), text.strip()))

    lines = []
    for idx, (start, text) in enumerate(stamped):
        if not text:
            continue
        if idx + 1 < len(stamped):
            end = stamped[idx + 1][0]
        else:
            # No following timestamp: fall back to a fixed display time.
            end = round(start + 4.0, 2)
        lines.append({"start": start, "end": end, "text": text})
    return lines
# ---------------------------------------------------------------------------
# Парсинг .srt
# ---------------------------------------------------------------------------
# SRT timestamp "HH:MM:SS,mmm".
_RE_SRT_TIME = re.compile(r"(\d{2}):(\d{2}):(\d{2}),(\d{3})")


def _srt_ts_to_sec(h: str, m: str, s: str, ms: str) -> float:
    """Convert the four captured SRT timestamp fields to seconds."""
    return int(h) * 3600 + int(m) * 60 + int(s) + int(ms) / 1000


def parse_srt(path: str) -> list[dict]:
    """Parse an .srt subtitle file into timed lines.

    The file is split into blocks on blank lines. In each block the first
    line is ignored, the second line must hold ``start --> end``, and the
    remaining lines are joined with spaces to form the text. Blocks with
    fewer than three lines, without an arrow, or with unparsable timestamps
    are skipped instead of raising.

    Args:
        path: Path to the .srt file (read as UTF-8).

    Returns:
        A list of ``{"start", "end", "text"}`` dicts in file order, times in
        seconds rounded to two decimals.
    """
    results = []
    content = Path(path).read_text(encoding="utf-8").strip()
    for block in re.split(r"\n\s*\n", content):
        parts = block.strip().splitlines()
        if len(parts) < 3:
            continue
        # partition() never raises on a malformed timing line and also
        # tolerates an arrow written without surrounding spaces.
        start_raw, arrow, end_raw = parts[1].partition("-->")
        if not arrow:
            continue
        m_from = _RE_SRT_TIME.match(start_raw.strip())
        m_to = _RE_SRT_TIME.match(end_raw.strip())
        if not m_from or not m_to:
            continue
        text = " ".join(parts[2:]).strip()
        if not text:
            continue
        results.append({
            "start": round(_srt_ts_to_sec(*m_from.groups()), 2),
            "end": round(_srt_ts_to_sec(*m_to.groups()), 2),
            "text": text,
        })
    return results
# ---------------------------------------------------------------------------
# .txt alignment — lines are spread evenly over the audio duration (no real forced alignment)
# ---------------------------------------------------------------------------
def _get_audio_duration(audio_path: str) -> float:
"""Получить длительность аудио через ffmpeg (ffprobe недоступен)."""
import shutil
ffmpeg_bin = shutil.which("ffmpeg") or "/home/node/bin/ffmpeg"
# Используем ffmpeg -i для получения длительности
result = subprocess.run(
[ffmpeg_bin, "-i", audio_path],
capture_output=True, text=True
)
# ffmpeg пишет Duration в stderr
import re
m = re.search(r"Duration: (\d+):(\d+):([\d.]+)", result.stderr)
if m:
h, mn, s = m.groups()
return int(h) * 3600 + int(mn) * 60 + float(s)
raise RuntimeError(f"Не удалось получить длительность аудио: {audio_path}")
def align_txt(audio_path: str, txt_path: str,
              model_size: str = "base",
              device: str = "cpu") -> list[dict]:
    """Spread the lines of a plain-text lyrics file evenly over the audio.

    Blank lines are dropped and each remaining line gets an equal time slot
    of ``duration / line_count``. The file is read as UTF-8 and a leading
    byte-order mark is dropped, so it neither ends up in the first lyric nor
    makes an otherwise empty file count as one line.

    Args:
        audio_path: Audio file whose duration defines the total time span.
        txt_path: Path to the .txt file with one lyric line per row.
        model_size: Unused by this function; kept for signature compatibility.
        device: Unused by this function; kept for signature compatibility.

    Returns:
        A list of ``{"start", "end", "text"}`` dicts, times in seconds
        rounded to two decimals; empty when the file has no non-blank lines.
    """
    raw_lines = Path(txt_path).read_text(encoding="utf-8-sig").splitlines()
    txt_lines = [line.strip() for line in raw_lines if line.strip()]
    if not txt_lines:
        # Nothing to align; the audio duration is not needed.
        return []

    duration = _get_audio_duration(audio_path)
    slot = duration / len(txt_lines)
    return [
        {"start": round(i * slot, 2),
         "end": round((i + 1) * slot, 2),
         "text": text}
        for i, text in enumerate(txt_lines)
    ]
# ---------------------------------------------------------------------------
# Главная функция
# ---------------------------------------------------------------------------
def transcribe(audio_path: str, text_path: Optional[str] = None,
               model_size: str = "base", device: str = "cpu",
               language: Optional[str] = None) -> list[dict]:
    """Produce timed lyric lines for an audio file.

    Without ``text_path`` the audio is transcribed with Whisper. With
    ``text_path`` the handler is chosen by the file extension
    (case-insensitive): ``.lrc`` and ``.srt`` are parsed for their timings,
    and ``.txt`` is aligned against the audio. Any other extension prints a
    message and terminates the process with exit status 1.

    Args:
        audio_path: Path to the audio file.
        text_path: Optional path to an .lrc/.srt/.txt file.
        model_size: Whisper model size (tiny, base, small, medium, large).
        device: "cpu" or "cuda".
        language: Language code (e.g. "ru", "en"), or None for auto-detection.

    Returns:
        ``[{start, end, text}, ...]``
    """
    # No lyrics file supplied: fall back to speech recognition.
    if not text_path:
        print(f"[transcribe] Запускаем Whisper ({model_size}, {device})…")
        return transcribe_whisper(audio_path, model_size, language, device)

    suffix = Path(text_path).suffix.lower()
    if suffix == ".lrc":
        return parse_lrc(text_path)
    if suffix == ".srt":
        return parse_srt(text_path)
    if suffix == ".txt":
        return align_txt(audio_path, text_path, model_size, device)

    print(f"[transcribe] Неподдерживаемый формат: {text_path}")
    sys.exit(1)