382 lines
15 KiB
Python
382 lines
15 KiB
Python
"""
|
||
transcribe.py — Whisper транскрипция аудио → [{start, end, text}]
|
||
|
||
Поддержка:
|
||
- faster-whisper (CPU) по умолчанию
|
||
- Готовые файлы .lrc, .srt (парсинг таймингов)
|
||
- Простой .txt (forced alignment через Whisper word-timestamps)
|
||
"""
|
||
|
||
import os
|
||
import re
|
||
import sys
|
||
import json
|
||
import subprocess
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
from dotenv import load_dotenv
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Whisper транскрипция
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# OpenAI Whisper API (облако)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def transcribe_openai_api(audio_path: str, language: str = "ru") -> list[dict]:
|
||
"""Транскрипция через OpenAI Whisper API. Возвращает сегменты с word-timestamps."""
|
||
import httpx
|
||
|
||
load_dotenv(os.path.expanduser("~/.openclaw/.env"))
|
||
api_key = os.environ.get("OPENAI_API_KEY")
|
||
if not api_key:
|
||
raise RuntimeError("OPENAI_API_KEY не задан в окружении / ~/.openclaw/.env")
|
||
|
||
print("[transcribe] Отправляем аудио в OpenAI Whisper API…")
|
||
|
||
with open(audio_path, "rb") as f:
|
||
resp = httpx.post(
|
||
"https://api.openai.com/v1/audio/transcriptions",
|
||
headers={"Authorization": f"Bearer {api_key}"},
|
||
data={
|
||
"model": "whisper-1",
|
||
"response_format": "verbose_json",
|
||
"language": language,
|
||
"timestamp_granularities[]": ["word", "segment"],
|
||
},
|
||
files={"file": f},
|
||
timeout=300,
|
||
)
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
|
||
# Собираем сегменты с word-timestamps
|
||
results = []
|
||
word_list = data.get("words", []) # плоский список слов с таймингами
|
||
segments = data.get("segments", [])
|
||
|
||
if word_list and segments:
|
||
# Распределяем слова по сегментам
|
||
seg_offset = 0
|
||
for seg in segments:
|
||
seg_start = seg["start"]
|
||
seg_end = seg["end"]
|
||
seg_text = seg.get("text", "").strip()
|
||
if not seg_text:
|
||
continue
|
||
# Собираем слова, попадающие в этот сегмент
|
||
seg_words = []
|
||
for w in word_list:
|
||
ws = w["start"]
|
||
we = w["end"]
|
||
ww = w.get("word", "")
|
||
# Слово принадлежит сегменту, если его начало попадает в диапазон
|
||
if ws >= seg_start - 0.05 and ws <= seg_end + 0.05:
|
||
seg_words.append({
|
||
"word": ww,
|
||
"start": round(ws, 2),
|
||
"end": round(we, 2),
|
||
})
|
||
results.append({
|
||
"start": round(seg_start, 2),
|
||
"end": round(seg_end, 2),
|
||
"text": seg_text,
|
||
"words": seg_words,
|
||
})
|
||
elif segments:
|
||
for seg in segments:
|
||
seg_text = seg.get("text", "").strip()
|
||
if not seg_text:
|
||
continue
|
||
results.append({
|
||
"start": round(seg["start"], 2),
|
||
"end": round(seg["end"], 2),
|
||
"text": seg_text,
|
||
})
|
||
|
||
print(f"[transcribe] OpenAI API вернул {len(results)} сегментов")
|
||
return results
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Whisper транскрипция (локальная)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def transcribe_whisper(audio_path: str, model_size: str = "base",
|
||
language: Optional[str] = None,
|
||
device: str = "cpu") -> list[dict]:
|
||
"""Запуск faster-whisper. Возвращает список сегментов [{start, end, text}]."""
|
||
try:
|
||
from faster_whisper import WhisperModel
|
||
except ImportError:
|
||
print("[transcribe] faster-whisper не установлен. Ставим...", flush=True)
|
||
os.system(f"{sys.executable} -m pip install faster-whisper")
|
||
from faster_whisper import WhisperModel
|
||
|
||
model = WhisperModel(model_size, device=device, compute_type="int8")
|
||
segments, _ = model.transcribe(audio_path, language=language,
|
||
word_timestamps=True)
|
||
|
||
results = []
|
||
for seg in segments:
|
||
text = seg.text.strip()
|
||
if not text:
|
||
continue
|
||
# Собираем word-timestamps если есть
|
||
words = []
|
||
if hasattr(seg, 'words') and seg.words:
|
||
for w in seg.words:
|
||
words.append({
|
||
"word": w.word,
|
||
"start": round(w.start, 2),
|
||
"end": round(w.end, 2),
|
||
})
|
||
entry = {
|
||
"start": round(seg.start, 2),
|
||
"end": round(seg.end, 2),
|
||
"text": text,
|
||
}
|
||
if words:
|
||
entry["words"] = words
|
||
results.append(entry)
|
||
return results
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Парсинг .lrc
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_RE_LRC = re.compile(r"\[(\d{2,}):(\d{2})\.(\d{2,3})\](.*)")
|
||
|
||
def parse_lrc(path: str) -> list[dict]:
|
||
lines = []
|
||
for line in Path(path).read_text(encoding="utf-8").splitlines():
|
||
m = _RE_LRC.match(line.strip())
|
||
if not m:
|
||
continue
|
||
mins, secs, frac, text = m.groups()
|
||
frac = frac.ljust(3, "0")[:3]
|
||
timestamp = int(mins) * 60 + int(secs) + int(frac) / 1000
|
||
text = text.strip()
|
||
if text:
|
||
lines.append({"start": round(timestamp, 2),
|
||
"end": None, # заполним позже
|
||
"text": text})
|
||
|
||
# Длительность каждой строки = начало следующей
|
||
for i in range(len(lines) - 1):
|
||
lines[i]["end"] = lines[i + 1]["start"]
|
||
if lines:
|
||
lines[-1]["end"] = lines[-1]["start"] + 4.0
|
||
return lines
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Парсинг .srt
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_RE_SRT_TIME = re.compile(r"(\d{2}):(\d{2}):(\d{2}),(\d{3})")
|
||
|
||
def _srt_ts_to_sec(h, m, s, ms):
|
||
return int(h) * 3600 + int(m) * 60 + int(s) + int(ms) / 1000
|
||
|
||
|
||
def parse_srt(path: str) -> list[dict]:
|
||
results = []
|
||
blocks = re.split(r"\n\s*\n", Path(path).read_text(encoding="utf-8").strip())
|
||
for block in blocks:
|
||
parts = block.strip().splitlines()
|
||
if len(parts) < 3:
|
||
continue
|
||
m_from = _RE_SRT_TIME.match(parts[1].split(" --> ")[0].strip())
|
||
m_to = _RE_SRT_TIME.match(parts[1].split(" --> ")[1].strip())
|
||
if not m_from or not m_to:
|
||
continue
|
||
start = _srt_ts_to_sec(*m_from.groups())
|
||
end = _srt_ts_to_sec(*m_to.groups())
|
||
text = " ".join(parts[2:]).strip()
|
||
if text:
|
||
results.append({"start": round(start, 2),
|
||
"end": round(end, 2),
|
||
"text": text})
|
||
return results
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# .txt forced alignment — равномерное распределение по длительности аудио
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _get_audio_duration(audio_path: str) -> float:
|
||
"""Получить длительность аудио через ffmpeg (ffprobe недоступен)."""
|
||
import shutil
|
||
ffmpeg_bin = shutil.which("ffmpeg") or "/home/node/bin/ffmpeg"
|
||
# Используем ffmpeg -i для получения длительности
|
||
result = subprocess.run(
|
||
[ffmpeg_bin, "-i", audio_path],
|
||
capture_output=True, text=True
|
||
)
|
||
# ffmpeg пишет Duration в stderr
|
||
import re
|
||
m = re.search(r"Duration: (\d+):(\d+):([\d.]+)", result.stderr)
|
||
if m:
|
||
h, mn, s = m.groups()
|
||
return int(h) * 3600 + int(mn) * 60 + float(s)
|
||
raise RuntimeError(f"Не удалось получить длительность аудио: {audio_path}")
|
||
|
||
|
||
|
||
def align_txt(audio_path: str, txt_path: str,
|
||
model_size: str = "base",
|
||
device: str = "cpu") -> list[dict]:
|
||
"""Выровнить строки .txt по Whisper-таймингам.
|
||
|
||
Алгоритм:
|
||
1. Запустить Whisper (language='ru', word_timestamps=True)
|
||
2. Если получили >5 сегментов с реальным текстом (не 'Music','♪' и т.п.)
|
||
— выровнять строки текста по сегментам пропорционально
|
||
3. Fallback: равномерное распределение по длительности аудио
|
||
"""
|
||
txt_lines = Path(txt_path).read_text(encoding="utf-8").splitlines()
|
||
txt_lines = [l.strip() for l in txt_lines if l.strip()]
|
||
total_lines = len(txt_lines)
|
||
if total_lines == 0:
|
||
return []
|
||
|
||
# Попробовать выровнить через Whisper
|
||
_SKIPPED = re.compile(r"^(music|\[music\]|instrumental|♪+|♫*|\(music\)|$)", re.IGNORECASE)
|
||
try:
|
||
whisper_segs = transcribe_whisper(audio_path, model_size,
|
||
language="ru", device=device)
|
||
|
||
# Фильтруем «мусорные» сегменты
|
||
real_segs = [s for s in whisper_segs if not _SKIPPED.match(s["text"].strip())]
|
||
|
||
if len(real_segs) > 5:
|
||
# Есть достаточно реальных сегментов — выравниваем строки по ним
|
||
return _align_by_segments(txt_lines, real_segs)
|
||
except Exception as e:
|
||
print(f"[transcribe] Whisper alignment не удался ({e}), fallback на равномерное распределение")
|
||
|
||
# Fallback: равномерное распределение
|
||
return _align_uniform(txt_lines, audio_path)
|
||
|
||
|
||
def _align_uniform(txt_lines: list[str], audio_path: str) -> list[dict]:
|
||
"""Равномерно распределить строки по длительности аудио."""
|
||
total_lines = len(txt_lines)
|
||
duration = _get_audio_duration(audio_path)
|
||
slot = duration / total_lines
|
||
return [{"start": round(i * slot, 2),
|
||
"end": round((i + 1) * slot, 2),
|
||
"text": txt_lines[i]} for i in range(total_lines)]
|
||
|
||
|
||
def _align_by_segments(txt_lines: list[dict], whisper_segs: list[dict]) -> list[dict]:
|
||
"""Выровнять строки текста по Whisper-сегментам.
|
||
|
||
Каждый Whisper-сегмент может охватывать несколько строк текста.
|
||
Распределяем строки внутри диапазона сегмента пропорционально.
|
||
"""
|
||
if not txt_lines:
|
||
return []
|
||
|
||
# Если строк больше чем сегментов — распределяем строки по сегментам
|
||
num_segs = len(whisper_segs)
|
||
total_lines = len(txt_lines)
|
||
|
||
if total_lines <= num_segs:
|
||
# Строк меньше или столько же сколько сегментов — берём сегменты напрямую
|
||
# Берём первые total_lines сегментов
|
||
result = []
|
||
for i in range(total_lines):
|
||
seg = whisper_segs[i] if i < num_segs else whisper_segs[-1]
|
||
result.append({
|
||
"start": seg["start"],
|
||
"end": seg["end"],
|
||
"text": txt_lines[i]
|
||
})
|
||
return result
|
||
|
||
# Строк больше чем сегментов — распределяем пропорционально
|
||
result = []
|
||
seg_idx = 0
|
||
lines_per_seg = max(1, total_lines // num_segs)
|
||
|
||
for i, line in enumerate(txt_lines):
|
||
# Определяем к какому сегменту принадлежит строка
|
||
seg_idx = min(i // lines_per_seg, num_segs - 1)
|
||
|
||
# Границы текущего сегмента
|
||
seg_start = whisper_segs[seg_idx]["start"]
|
||
seg_end = whisper_segs[min(seg_idx + 1, num_segs - 1)]["start"] if seg_idx + 1 < num_segs else whisper_segs[seg_idx]["end"]
|
||
|
||
# Позиция внутри группы строк текущего сегмента
|
||
group_start = seg_idx * lines_per_seg
|
||
group_size = min(lines_per_seg, total_lines - group_start)
|
||
line_in_group = i - group_start
|
||
|
||
# Пропорциональное распределение внутри сегмента
|
||
if group_size > 1:
|
||
line_dur = (seg_end - seg_start) / group_size
|
||
line_start = seg_start + line_in_group * line_dur
|
||
line_end = line_start + line_dur
|
||
else:
|
||
line_start = seg_start
|
||
line_end = seg_end
|
||
|
||
result.append({
|
||
"start": round(line_start, 2),
|
||
"end": round(line_end, 2),
|
||
"text": line
|
||
})
|
||
|
||
return result
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Главная функция
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def transcribe(audio_path: str, text_path: Optional[str] = None,
|
||
model_size: str = "base", device: str = "cpu",
|
||
language: Optional[str] = None,
|
||
use_api: bool = True) -> list[dict]:
|
||
"""
|
||
Универсальная функция транскрипции.
|
||
|
||
Параметры:
|
||
audio_path — путь к аудиофайлу
|
||
text_path — путь к .lrc/.srt/.txt (опционально)
|
||
model_size — размер Whisper-модели (tiny, base, small, medium, large)
|
||
device — "cpu" или "cuda"
|
||
language — код языка (напр. "ru", "en") или None для авто
|
||
use_api — использовать OpenAI API (по умолч. True)
|
||
|
||
Возвращает: [{start, end, text, words?}, ...]
|
||
"""
|
||
if text_path:
|
||
ext = Path(text_path).suffix.lower()
|
||
if ext == ".lrc":
|
||
return parse_lrc(text_path)
|
||
elif ext == ".srt":
|
||
return parse_srt(text_path)
|
||
elif ext == ".txt":
|
||
return align_txt(audio_path, text_path, model_size, device)
|
||
else:
|
||
print(f"[transcribe] Неподдерживаемый формат: {text_path}")
|
||
sys.exit(1)
|
||
else:
|
||
# Проверяем, можно ли использовать OpenAI API
|
||
if use_api:
|
||
load_dotenv(os.path.expanduser("~/.openclaw/.env"))
|
||
api_key = os.environ.get("OPENAI_API_KEY")
|
||
if api_key:
|
||
try:
|
||
return transcribe_openai_api(audio_path, language or "ru")
|
||
except Exception as e:
|
||
print(f"[transcribe] OpenAI API не удался ({e}), fallback на faster-whisper")
|
||
|
||
print(f"[transcribe] Запускаем Whisper ({model_size}, {device})…")
|
||
return transcribe_whisper(audio_path, model_size, language, device)
|