From 1768596750e89116e06488cc73edff20a4ae14b1 Mon Sep 17 00:00:00 2001 From: Stream Date: Thu, 30 Apr 2026 00:50:01 +0300 Subject: [PATCH] auto-sync: 2026-04-30 00:50:01 --- tasks/karaoke/transcribe.py | 76 ++++++++++++++----------------------- 1 file changed, 28 insertions(+), 48 deletions(-) diff --git a/tasks/karaoke/transcribe.py b/tasks/karaoke/transcribe.py index ea68ff7..eded913 100644 --- a/tasks/karaoke/transcribe.py +++ b/tasks/karaoke/transcribe.py @@ -10,6 +10,8 @@ transcribe.py — Whisper транскрипция аудио → [{start, end, import os import re import sys +import json +import subprocess from pathlib import Path from typing import Optional @@ -106,66 +108,44 @@ def parse_srt(path: str) -> list[dict]: # --------------------------------------------------------------------------- -# .txt forced alignment через whisper word-timestamps +# .txt forced alignment — равномерное распределение по длительности аудио # --------------------------------------------------------------------------- +def _get_audio_duration(audio_path: str) -> float: + """Получить длительность аудио через ffmpeg (ffprobe недоступен).""" + import shutil + ffmpeg_bin = shutil.which("ffmpeg") or "/home/node/bin/ffmpeg" + # Используем ffmpeg -i для получения длительности + result = subprocess.run( + [ffmpeg_bin, "-i", audio_path], + capture_output=True, text=True + ) + # ffmpeg пишет Duration в stderr + import re + m = re.search(r"Duration: (\d+):(\d+):([\d.]+)", result.stderr) + if m: + h, mn, s = m.groups() + return int(h) * 3600 + int(mn) * 60 + float(s) + raise RuntimeError(f"Не удалось получить длительность аудио: {audio_path}") + + + def align_txt(audio_path: str, txt_path: str, model_size: str = "base", device: str = "cpu") -> list[dict]: - """Align plain .txt lyrics to audio by splitting Whisper segments by lines.""" - from faster_whisper import WhisperModel - + """Равномерно распределить строки .txt по длительности аудио.""" txt_lines = Path(txt_path).read_text(encoding="utf-8").splitlines() txt_lines = [l.strip() for l in txt_lines if l.strip()] total_lines = len(txt_lines) if total_lines == 0: return [] - model = WhisperModel(model_size, device=device, compute_type="int8") - segments, _ = model.transcribe(audio_path, word_timestamps=True) + duration = _get_audio_duration(audio_path) + slot = duration / total_lines - # Собираем полнотекст из whisper - whisper_parts = [] - for seg in segments: - whisper_parts.append(seg.text.strip()) - full_whisper = " ".join(whisper_parts) - - segment_lines = max(total_lines, len(whisper_parts)) - - # Равномерно распределяем строки по whisper-сегментам - if len(whisper_parts) == 0: - return [] - - results = [] - line_idx = 0 - for i, seg in enumerate(segments): - seg_text = seg.text.strip() - if not seg_text: - continue - - # Сколько строк текста привязать к этому сегменту - if i == len(whisper_parts) - 1: - # Последний сегмент — все оставшиеся строки - count = total_lines - line_idx - else: - # Пропорционально по символам - ratio = len(seg_text) / len(full_whisper) - count = max(1, round(total_lines * ratio)) - count = min(count, total_lines - line_idx) - - for j in range(count): - if line_idx >= total_lines: - break - t_start = seg.start + j * (seg.end - seg.start) / max(count, 1) - t_end = seg.start + (j + 1) * (seg.end - seg.start) / max(count, 1) - results.append({ - "start": round(t_start, 2), - "end": round(t_end, 2), - "text": txt_lines[line_idx], - }) - line_idx += 1 - - return results + return [{"start": round(i * slot, 2), + "end": round((i + 1) * slot, 2), + "text": txt_lines[i]} for i in range(total_lines)] # ---------------------------------------------------------------------------