- FastAPI backend (auth, jobs, SSE, download) - Frontend: drag&drop + YouTube URL + jobs panel - Pipeline: yt_download → find_chorus → reframe → subtitle - Modes: track (face follow), center, blur - Whisper for SI/DE/EN subtitles - Auto-chorus detection via Whisper + RMS energy - Docker + Coolify ready
290 lines
9.7 KiB
Python
290 lines
9.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
find_chorus.py — Automatic chorus detection in a music video.

Hybrid approach:
  1. Whisper transcribes the song with word-level timestamps
  2. Repeated lines are found (Jaccard similarity of word bigrams)
  3. Energy analysis via FFmpeg (RMS dB) — the chorus is usually louder
  4. Combined: repeated text + high energy = chorus

Output: the best candidates, ranked (as JSON with --json).

Example:
  python3 find_chorus.py pesem.mp4
  python3 find_chorus.py pesem.mp4 --duration 30 --json
|
|
"""
|
|
import argparse
import json
import math
import re
import subprocess
import sys
import tempfile
from collections import Counter
from pathlib import Path
|
|
|
|
|
|
def extract_audio(video, sample_rate=16000):
    """Extract the audio of *video* into a temporary mono 16-bit PCM WAV file.

    Args:
        video: Input media file (str or Path) handed to ffmpeg via ``-i``.
        sample_rate: Sample rate of the produced WAV in Hz.

    Returns:
        Path of the created ``.wav`` file as a string. The file is not
        removed here; the caller is responsible for deleting it.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
        OSError: the ``ffmpeg`` executable could not be started.
    """
    # Only a unique file name is needed: ffmpeg writes the file itself, so the
    # handle is closed immediately while delete=False keeps the file on disk.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        wav_path = tmp.name

    cmd = [
        "ffmpeg", "-y", "-i", str(video),
        "-vn", "-ac", "1", "-ar", str(sample_rate),
        "-c:a", "pcm_s16le", wav_path,
    ]
    try:
        subprocess.run(cmd, check=True, stderr=subprocess.DEVNULL)
    except BaseException:
        # Extraction failed (or was interrupted): do not leave the
        # placeholder temp file behind, then let the error propagate.
        Path(wav_path).unlink(missing_ok=True)
        raise
    return wav_path
|
|
|
|
|
|
def transcribe(audio_path, lang=None, model_size="small"):
    """Transcribe an audio file with faster-whisper into line-level segments.

    Args:
        audio_path: Audio file passed to ``WhisperModel.transcribe``.
        lang: Language code forwarded as ``language``; ``None`` lets the
            model detect it.
        model_size: Name of the Whisper model to load.

    Returns:
        A tuple ``(lines, language)``. ``lines`` is a list of dicts with the
        keys ``start``, ``end``, ``text`` and ``duration``, one per segment
        whose stripped text is non-empty; ``language`` is the language
        reported by the model.
    """
    # Function-scope import: faster-whisper is only loaded once a
    # transcription is actually requested.
    from faster_whisper import WhisperModel

    print(f"🧠 Whisper: {model_size}, lang={lang or 'auto'}", file=sys.stderr)
    whisper = WhisperModel(model_size, device="cpu", compute_type="int8")
    segments, info = whisper.transcribe(
        audio_path,
        language=lang,
        word_timestamps=True,
        vad_filter=True,
    )
    print(f" Detekcija: {info.language} (p={info.language_probability:.2f})", file=sys.stderr)

    # Collect one entry per non-empty segment, keeping its timestamps.
    lines = []
    for seg in segments:
        text = seg.text.strip()
        if not text:
            continue
        lines.append(
            dict(
                start=seg.start,
                end=seg.end,
                text=text,
                duration=seg.end - seg.start,
            )
        )
    return lines, info.language
|
|
|
|
|
|
def normalize_text(s):
    """Return *s* lowercased, without punctuation and with collapsed whitespace.

    Every character that is neither a word character nor whitespace is
    removed, runs of whitespace become a single space, and leading/trailing
    whitespace is stripped.
    """
    without_punctuation = re.sub(r"[^\w\s]", "", s.lower())
    return re.sub(r"\s+", " ", without_punctuation).strip()
|
|
|
|
|
|
def line_similarity(a, b):
    """Return the Jaccard similarity of the word-bigram sets of two lines.

    Both lines are normalized with ``normalize_text`` and split into words.
    The score is ``|A & B| / |A | B|`` where A and B are the sets of adjacent
    word pairs of each line.

    Args:
        a: First line of text.
        b: Second line of text.

    Returns:
        A float in ``[0.0, 1.0]``; ``0.0`` when either line has no words
        after normalization.
    """
    a_words = normalize_text(a).split()
    b_words = normalize_text(b).split()
    if not a_words or not b_words:
        return 0.0

    def bigrams(words):
        # A one-word line has no adjacent pair; represent it by a 1-tuple so
        # that two identical one-word lines still compare as equal.
        if len(words) == 1:
            return {(words[0],)}
        return set(zip(words, words[1:]))

    a_bg = bigrams(a_words)
    b_bg = bigrams(b_words)
    # Both word lists are non-empty here, so both sets (and their union)
    # contain at least one element and the division is always defined.
    return len(a_bg & b_bg) / len(a_bg | b_bg)
|
|
|
|
|
|
def find_repeated_lines(lines, similarity_threshold=0.5):
    """Group transcript lines that repeat into clusters of line indices.

    Lines are scanned in order. Each line not yet assigned to a cluster
    becomes a seed, and every later unassigned line whose
    ``line_similarity`` to the seed is at least ``similarity_threshold``
    joins the seed's cluster.

    Args:
        lines: Sequence of dicts with a ``"text"`` entry.
        similarity_threshold: Minimum similarity to the seed line.

    Returns:
        List of clusters, each a list of indices into ``lines``. Only
        clusters with at least two members are returned.
    """
    total = len(lines)
    assigned = set()
    clusters = []

    for seed in range(total):
        if seed in assigned:
            continue
        assigned.add(seed)
        members = [seed]
        for other in range(seed + 1, total):
            if other in assigned:
                continue
            score = line_similarity(lines[seed]["text"], lines[other]["text"])
            if score >= similarity_threshold:
                members.append(other)
                assigned.add(other)
        # A line that never repeats is not a cluster.
        if len(members) > 1:
            clusters.append(members)

    return clusters
|
|
|
|
|
|
def compute_energy(audio_path, window_sec=1.0, sample_rate=16000,
                   silence_floor_db=-60.0):
    """Measure windowed RMS loudness of an audio file with FFmpeg's astats.

    The filter chain cuts the audio into frames of ``window_sec`` seconds
    (``asetnsamples``), lets ``astats`` attach each frame's overall RMS level
    as metadata, and has ``ametadata=print`` write it to FFmpeg's log on
    stderr, which is parsed here.

    Args:
        audio_path: Audio file to analyse.
        window_sec: Length of one measurement window in seconds.
        sample_rate: Sample rate of ``audio_path`` in Hz, used to convert
            ``window_sec`` into samples per frame. The default matches the
            default rate of ``extract_audio``.
        silence_floor_db: Lower bound applied to every reading, so that a
            silent window reported as ``-inf`` stays a finite number.

    Returns:
        List of ``(timestamp_seconds, rms_db)`` tuples in log order. The list
        is empty when FFmpeg printed no readings; FFmpeg's exit status is not
        checked.
    """
    samples_per_window = max(1, int(sample_rate * window_sec))
    audio_filter = (
        f"asetnsamples=n={samples_per_window}:p=0,"
        # astats' reset option counts frames, not seconds: 1 gives fresh
        # statistics for every window regardless of window_sec.
        "astats=metadata=1:reset=1,"
        "ametadata=print:key=lavfi.astats.Overall.RMS_level"
    )
    cmd = ["ffmpeg", "-i", audio_path, "-af", audio_filter, "-f", "null", "-"]
    # errors="replace": stray non-decodable bytes in FFmpeg's log must not
    # abort the analysis.
    result = subprocess.run(cmd, capture_output=True, text=True,
                            errors="replace")

    # Search inside each line instead of matching its start: FFmpeg normally
    # prefixes log lines with "[Parsed_ametadata_N @ 0x...] ".
    pts_pattern = re.compile(r"\bframe:.*\bpts_time:(\S+)")
    rms_pattern = re.compile(r"lavfi\.astats\.Overall\.RMS_level=(\S+)")

    energies = []
    current_pts = None
    for line in result.stderr.splitlines():
        pts_match = pts_pattern.search(line)
        if pts_match:
            try:
                current_pts = float(pts_match.group(1))
            except ValueError:
                # Unparseable timestamp: ignore the reading that follows.
                current_pts = None
            continue

        rms_match = rms_pattern.search(line)
        if rms_match is None or current_pts is None:
            continue
        try:
            rms = float(rms_match.group(1))
        except ValueError:
            continue
        if math.isnan(rms):
            continue
        # float("-inf") parses fine; clamping keeps later averages finite.
        energies.append((current_pts, max(rms, silence_floor_db)))

    return energies
|
|
|
|
|
|
def avg_energy_in_range(energies, start, end):
    """Return the mean RMS level of the readings with ``start <= t <= end``.

    Args:
        energies: Iterable of ``(timestamp, rms_db)`` pairs.
        start: Inclusive lower bound of the time window.
        end: Inclusive upper bound of the time window.

    Returns:
        The arithmetic mean of the selected levels, or ``-60.0`` (treated as
        quiet) when no reading falls inside the window.
    """
    window = []
    for timestamp, level in energies:
        if start <= timestamp <= end:
            window.append(level)
    return sum(window) / len(window) if window else -60.0
|
|
|
|
|
|
def find_chorus(video, lang=None, model_size="small", target_duration=30.0):
    """Detect chorus candidates in *video* and return them ranked by score.

    Pipeline: extract the audio, transcribe it, cluster repeated lines,
    measure windowed RMS energy, then score a ``target_duration``-second
    window starting at every occurrence of a repeated line.

    Args:
        video: Input media file.
        lang: Language code for transcription; ``None`` for auto-detection.
        model_size: Whisper model name passed to ``transcribe``.
        target_duration: Desired length of a candidate window in seconds.

    Returns:
        On success a dict with ``language``, ``total_lines``,
        ``clusters_found`` and ``candidates`` (at most five, best first,
        start times more than 5 s apart). When nothing was transcribed or no
        line repeats, a dict with an ``error`` message and an empty
        ``candidates`` list.

    The temporary audio file is always removed, also on errors.
    """
    audio = extract_audio(video)
    try:
        lines, detected_lang = transcribe(audio, lang=lang, model_size=model_size)
        if not lines:
            return {"error": "Brez transkripcije", "candidates": []}

        print(f"📝 {len(lines)} vrstic transkripta", file=sys.stderr)

        clusters = find_repeated_lines(lines, similarity_threshold=0.5)
        print(f"🔁 {len(clusters)} ponavljajočih se sklopov", file=sys.stderr)

        if not clusters:
            return {
                "error": "Ni najdenih ponavljajočih se vrstic",
                "language": detected_lang,
                "candidates": [],
            }

        print("🔊 Analiza energije...", file=sys.stderr)
        energies = _sanitize_energies(compute_energy(audio))
        # max(1, ...) avoids a division by zero when no reading is available.
        avg_overall = sum(e for _, e in energies) / max(1, len(energies))
        print(f" Povprečje RMS: {avg_overall:.1f} dB", file=sys.stderr)

        candidates = _score_candidates(
            lines, clusters, energies, avg_overall, target_duration
        )
        return {
            "language": detected_lang,
            "total_lines": len(lines),
            "clusters_found": len(clusters),
            "candidates": _dedupe_candidates(candidates),
        }
    finally:
        Path(audio).unlink(missing_ok=True)


# Readings are clamped to this level so that a silent window reported as
# -inf dB cannot turn averages and scores into inf/nan (invalid in JSON).
_SILENCE_FLOOR_DB = -60.0


def _sanitize_energies(energies):
    """Drop NaN readings and clamp the remaining levels to the silence floor."""
    return [
        (ts, max(level, _SILENCE_FLOOR_DB))
        for ts, level in energies
        if not math.isnan(level)
    ]


def _score_candidates(lines, clusters, energies, avg_overall, target_duration):
    """Build one scored candidate window per occurrence of a repeated line."""
    # End of the last transcribed line; windows are not allowed to run past it.
    content_end = max(entry["end"] for entry in lines)

    candidates = []
    for cluster_idx, cluster in enumerate(clusters):
        # The longest line of the cluster represents it in the output.
        rep = max(cluster, key=lambda i: len(lines[i]["text"]))
        rep_text = lines[rep]["text"]

        for inst_idx in cluster:
            start = lines[inst_idx]["start"]
            end = start + target_duration
            if end > content_end:
                # Slide the window back so it ends with the last line; if the
                # transcript is shorter than the target, the window shrinks.
                start = max(0, content_end - target_duration)
                end = content_end

            avg_e = avg_energy_in_range(energies, start, end)
            energy_score = max(0, avg_e - avg_overall)  # dB above the mean

            score = (
                len(cluster) * 10                   # repetition weight
                + energy_score * 2                  # energy weight
                + min(len(rep_text.split()), 10)    # text richness, capped
            )

            candidates.append({
                "start": round(start, 2),
                "end": round(end, 2),
                "duration": round(end - start, 2),
                "score": round(score, 2),
                "repetitions": len(cluster),
                "avg_rms_db": round(avg_e, 1),
                "energy_above_avg_db": round(energy_score, 1),
                "text_sample": rep_text[:80],
                "cluster_id": cluster_idx,
            })
    return candidates


def _dedupe_candidates(candidates, min_gap_sec=5, limit=5):
    """Return the best candidates whose starts are more than min_gap_sec apart."""
    kept = []
    # reverse=True keeps the sort stable, so ties stay in insertion order.
    for cand in sorted(candidates, key=lambda c: c["score"], reverse=True):
        if all(abs(cand["start"] - other["start"]) > min_gap_sec for other in kept):
            kept.append(cand)
        if len(kept) >= limit:
            break
    return kept
|
|
|
|
|
|
def main():
    """Command-line entry point: parse arguments, run detection, print results.

    Exits with status 1 when the input file does not exist. In text mode it
    exits with status 2 when detection reported an error and produced no
    candidates. With ``--json`` the result dict is printed as JSON to stdout;
    otherwise a human-readable ranking is printed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("input")
    parser.add_argument("--lang", default=None)
    parser.add_argument(
        "--model",
        default="small",
        choices=["tiny", "base", "small", "medium", "large-v3"],
    )
    parser.add_argument(
        "--duration",
        type=float,
        default=30.0,
        help="Ciljna dolžina reel-a v s",
    )
    parser.add_argument("--json", action="store_true", help="JSON output")
    args = parser.parse_args()

    src = Path(args.input)
    if not src.exists():
        print(f"❌ {src} ne obstaja", file=sys.stderr)
        sys.exit(1)

    result = find_chorus(
        src,
        lang=args.lang,
        model_size=args.model,
        target_duration=args.duration,
    )

    # Machine-readable mode: dump the raw result and stop.
    if args.json:
        print(json.dumps(result, ensure_ascii=False, indent=2))
        return

    if "error" in result and not result.get("candidates"):
        print(f"❌ {result['error']}", file=sys.stderr)
        sys.exit(2)

    print(f"\n🎵 Jezik: {result.get('language', '?')}")
    print(f"📋 {result['total_lines']} vrstic, {result['clusters_found']} ponavljanj\n")
    print("🏆 Najboljši kandidati za refren:\n")
    for rank, cand in enumerate(result["candidates"], 1):
        # Render the start time as minutes:seconds.
        mins = int(cand["start"] // 60)
        secs = cand["start"] - mins * 60
        headline = (
            f" {rank}. {mins}:{secs:05.2f} → +{cand['duration']:.0f}s "
            f"(score={cand['score']}, ponovitev={cand['repetitions']}, "
            f"energija={cand['energy_above_avg_db']:+.1f} dB)"
        )
        print(headline)
        print(f" '{cand['text_sample']}'\n")
|
|
|
|
|
|
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|