#!/usr/bin/env python3
"""
analyze.py — Pre-analysis of the WHOLE video before trimming.

Steps:
1. Whisper transcript of the whole video (language auto-detected or
   user-specified)
2. Energy profile (RMS dB over 1 s windows)
3. Structural detection (vocal sections, energy peaks)
4. Smart clip-range selection (may exceed 30 s, includes the pre-chorus)
5. Detection of instrumental songs (no_subs auto)

Output: JSON with data for clip.py
"""
import argparse
import json
import math
import os
import re
import subprocess
import sys
import tempfile
from pathlib import Path

# RMS floor (dB) used for silence, -inf/NaN readings and empty ranges.
_SILENCE_DB = -90.0

_PTS_TIME_RE = re.compile(r"pts_time:(\S+)")
_WORD_RE = re.compile(r"\b\w+\b")


def get_video_duration(path):
    """Return the container duration of *path* in seconds via ffprobe.

    Returns 0.0 when ffprobe prints nothing parseable as a float
    (for example when it fails or reports "N/A").
    """
    r = subprocess.run(
        ["ffprobe", "-v", "error", "-show_entries", "format=duration",
         "-of", "default=nw=1:nokey=1", str(path)],
        capture_output=True, text=True
    )
    try:
        return float(r.stdout.strip())
    except ValueError:
        return 0.0


def extract_audio(video_path):
    """Extract the audio track as a 16 kHz mono WAV for Whisper + energy.

    Returns the path of a temporary file; the caller must delete it.
    Raises subprocess.CalledProcessError when ffmpeg fails, in which case
    the temporary file is removed before the exception propagates.
    """
    audio = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    audio.close()
    try:
        subprocess.run(
            ["ffmpeg", "-y", "-i", str(video_path),
             "-vn", "-ac", "1", "-ar", "16000", "-c:a", "pcm_s16le",
             audio.name],
            check=True, capture_output=True
        )
    except BaseException:
        # The caller never receives the path on failure, so nobody else
        # could clean the file up.
        try:
            os.unlink(audio.name)
        except OSError:
            pass
        raise
    return audio.name


def transcribe_full(audio_path, lang=None, model_size="small"):
    """Transcribe the whole audio file with faster-whisper.

    lang=None lets the model auto-detect the language.

    Returns a dict with "language", "language_probability" and "segments";
    each segment has "start", "end", "text" and a "words" list of
    {"start", "end", "text"} dicts (empty when no word timings exist).
    """
    # Imported lazily so the pure-Python helpers in this module stay usable
    # without the dependency installed.
    from faster_whisper import WhisperModel

    print(f"🧠 Whisper {model_size}, lang={lang or 'auto'}", file=sys.stderr)
    m = WhisperModel(model_size, device="cpu", compute_type="int8")
    segs, info = m.transcribe(
        audio_path,
        language=lang,
        word_timestamps=True,
        vad_filter=True,
    )
    detected_lang = info.language
    detected_prob = info.language_probability
    print(f" Detekcija: {detected_lang} (p={detected_prob:.2f})",
          file=sys.stderr)

    segments = []
    for s in segs:
        words = [
            {"start": w.start, "end": w.end, "text": w.word}
            for w in (s.words or [])
        ]
        segments.append({
            "start": s.start,
            "end": s.end,
            "text": s.text.strip(),
            "words": words,
        })
    return {
        "language": detected_lang,
        "language_probability": detected_prob,
        "segments": segments,
    }


def _parse_energy_output(output, window_sec):
    """Parse `ametadata=print` output into a list of (timestamp, rms_db)."""
    energies = []
    current_pts = 0.0
    for raw_line in output.splitlines():
        line = raw_line.strip()
        m = _PTS_TIME_RE.search(line)
        if m:
            try:
                current_pts = float(m.group(1))
            except ValueError:
                pass
            continue
        if "RMS_level=" not in line:
            continue
        try:
            rms = float(line.split("RMS_level=")[-1].strip())
        except ValueError:
            continue
        # Digital silence is reported as -inf; clamp it (and NaN) to the floor.
        if math.isnan(rms) or rms < _SILENCE_DB:
            rms = _SILENCE_DB
        energies.append((current_pts, rms))
        # Fallback timestamp in case the next frame header is missing.
        current_pts += window_sec
    return energies


def compute_energy_profile(audio_path, window_sec=1.0, sample_rate=16000):
    """Measure RMS level (dB) per window of *window_sec* seconds.

    *sample_rate* must match the sample rate of *audio_path* (the WAV made
    by extract_audio is 16 kHz) because the window is expressed in samples.

    Returns a list of (timestamp, rms_db) tuples; the list is empty when
    ffmpeg produced no measurements. Raises ValueError for a non-positive
    window.
    """
    samples = int(sample_rate * window_sec)
    if samples <= 0:
        raise ValueError("window_sec must be positive")
    cmd = [
        "ffmpeg", "-i", audio_path,
        # asetnsamples makes every frame exactly one window long; astats
        # `reset` counts FRAMES (not seconds), so reset=1 yields independent
        # statistics per window for any window length.
        "-af", f"asetnsamples=n={samples}:p=0,"
               f"astats=metadata=1:reset=1,"
               f"ametadata=print:key=lavfi.astats.Overall.RMS_level:file=-",
        "-f", "null", "-",
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    output = result.stdout + "\n" + result.stderr
    return _parse_energy_output(output, window_sec)


def detect_vocal_sections(segments, max_gap=3.0):
    """Merge consecutive segments into "vocal sections".

    A new section starts whenever the silence between two segments exceeds
    *max_gap* seconds. Each section is a dict with "start", "end",
    "segments" (the merged segment dicts) and "text" (space-joined).
    """
    if not segments:
        return []

    first = segments[0]
    current = {
        "start": first["start"],
        "end": first["end"],
        "segments": [first],
        "text": first["text"],
    }
    sections = []
    for seg in segments[1:]:
        if seg["start"] - current["end"] > max_gap:
            sections.append(current)
            current = {
                "start": seg["start"],
                "end": seg["end"],
                "segments": [seg],
                "text": seg["text"],
            }
        else:
            current["end"] = seg["end"]
            current["segments"].append(seg)
            current["text"] += " " + seg["text"]
    sections.append(current)
    return sections


def avg_energy_in_range(energies, start, end):
    """Return the mean RMS (dB) of samples with start <= t <= end.

    Returns -90.0 when no sample falls inside the range.
    """
    vals = [r for (t, r) in energies if start <= t <= end]
    if not vals:
        return _SILENCE_DB
    return sum(vals) / len(vals)


def score_section_as_chorus(section, all_sections, energies, avg_rms):
    """Score a section as a chorus candidate (higher is more likely).

    Factors:
    - repeated words inside the section (low unique-word ratio)
    - energy above the song-wide average *avg_rms*
    - other sections sharing at least half of this section's distinct
      words (and at least 3 words), i.e. the chorus comes back
    - section duration: 3-25 s scores best, longer next, shorter least

    Returns 0 for a section without any words.
    """
    words = _WORD_RE.findall(section["text"].lower())
    if not words:
        return 0
    unique_words = set(words)

    # Chorus = low unique ratio (repetition).
    unique_ratio = len(unique_words) / len(words)
    chorus_signal = max(0, (1.0 - unique_ratio) * 30)

    # Energy relative to the whole song.
    sec_energy = avg_energy_in_range(energies, section["start"], section["end"])
    energy_score = max(0, sec_energy - avg_rms) * 8

    # How often similar lyrics appear elsewhere.
    repeat_count = 0
    for other in all_sections:
        if other is section:
            continue
        other_words = set(_WORD_RE.findall(other["text"].lower()))
        common = unique_words & other_words
        if len(common) >= len(unique_words) * 0.5 and len(common) >= 3:
            repeat_count += 1
    repeat_score = repeat_count * 25

    # Section length in seconds.
    duration = section["end"] - section["start"]
    if 3 <= duration <= 25:
        length_score = 10
    elif duration > 25:
        length_score = 5
    else:
        length_score = 2

    return chorus_signal + energy_score + repeat_score + length_score


def find_chorus(transcript, energies, video_duration):
    """Find the most likely chorus section.

    Returns None when the transcript has no segments; otherwise a dict with
    "best" (top candidate), "all_candidates" (top 10, best first) and
    "avg_rms_total". *video_duration* is accepted for interface
    compatibility and is not used.
    """
    sections = detect_vocal_sections(transcript["segments"])
    if not sections:
        return None

    avg_rms = sum(r for (_, r) in energies) / len(energies) if energies else -30.0

    candidates = []
    for sec in sections:
        score = score_section_as_chorus(sec, sections, energies, avg_rms)
        candidates.append({
            "start": sec["start"],
            "end": sec["end"],
            "duration": sec["end"] - sec["start"],
            "text_preview": sec["text"][:80],
            "score": round(score, 2),
            "avg_rms": round(
                avg_energy_in_range(energies, sec["start"], sec["end"]), 2
            ),
        })

    # Best score first; the sort is stable, so ties keep song order.
    candidates.sort(key=lambda c: -c["score"])

    return {
        "best": candidates[0],
        "all_candidates": candidates[:10],
        "avg_rms_total": round(avg_rms, 2),
    }


def smart_clip_range(chorus, transcript, video_duration,
                     target_duration=30, max_duration=45, min_duration=20):
    """Choose the clip range around the detected chorus.

    Logic:
    1. Start with the chorus as the core.
    2. If it is shorter than min_duration, append nearby following sections.
    3. Prepend the section ending within 8 s before the chorus (build-up)
       when the result still fits in max_duration.
    4. If still too short, pad both sides; padding that would fall outside
       the video is moved to the other side.
    5. Cap at max_duration and clamp to the video bounds.

    Without a chorus, falls back to target_duration seconds around the
    middle of the video. Every result has "start", "end", "duration" and
    "reason"; chorus-based results add "chorus_start" and "chorus_end".
    """
    if not chorus or not chorus.get("best"):
        # Fallback: take the middle of the video.
        mid = video_duration / 2
        start = max(0, mid - target_duration / 2)
        end = min(video_duration, start + target_duration)
        return {
            "start": round(start, 2),
            "end": round(end, 2),
            "duration": round(end - start, 2),
            "reason": "fallback_middle",
        }

    best = chorus["best"]
    sections = detect_vocal_sections(transcript["segments"])

    actual_start = best["start"]
    actual_end = best["end"]

    # 1. Core chorus too short: pull in following sections that start within
    #    5 s of the current end (the chorus probably repeats).
    if actual_end - actual_start < min_duration:
        for sec in sections:
            if sec["start"] > actual_end and sec["start"] - actual_end < 5:
                if sec["end"] - actual_start <= max_duration:
                    actual_end = sec["end"]
                    if actual_end - actual_start >= min_duration:
                        break

    # 2. Add the pre-chorus (build-up): the last section that ends within
    #    8 s before the clip start.
    pre_section = None
    for sec in sections:
        if sec["end"] <= actual_start and actual_start - sec["end"] < 8:
            pre_section = sec
    if pre_section:
        candidate_start = pre_section["start"]
        if actual_end - candidate_start <= max_duration:
            actual_start = candidate_start

    # 3. Still too short: pad symmetrically, shifting any overflow past a
    #    video bound onto the opposite side so min_duration is still reached
    #    whenever the video is long enough.
    if actual_end - actual_start < min_duration:
        deficit = min_duration - (actual_end - actual_start)
        actual_start -= deficit / 2
        actual_end += deficit / 2
        if actual_start < 0:
            actual_end -= actual_start
            actual_start = 0
        if actual_end > video_duration:
            actual_start -= actual_end - video_duration
            actual_end = video_duration

    # 4. Trim to the maximum length.
    if actual_end - actual_start > max_duration:
        actual_end = actual_start + max_duration

    # Snap to video bounds.
    actual_start = max(0, actual_start)
    actual_end = min(video_duration, actual_end)

    return {
        "start": round(actual_start, 2),
        "end": round(actual_end, 2),
        "duration": round(actual_end - actual_start, 2),
        "reason": "smart_chorus_with_prebuild",
        "chorus_start": round(best["start"], 2),
        "chorus_end": round(best["end"], 2),
    }


def detect_audio_fade(clip_range, transcript):
    """Pick fade-in / fade-out durations (seconds) for the clip.

    - clip starts inside a transcript segment -> 0.5 s fade in, else 0.2 s
    - clip ends inside a transcript segment -> 1.5 s fade out, else 0.3 s
    """
    cs, ce = clip_range["start"], clip_range["end"]
    segments = transcript["segments"]

    starts_in_vocal = any(seg["start"] <= cs <= seg["end"] for seg in segments)
    ends_in_vocal = any(seg["start"] <= ce <= seg["end"] for seg in segments)

    fade_in = 0.5 if starts_in_vocal else 0.2
    fade_out = 1.5 if ends_in_vocal else 0.3
    return {"fade_in": fade_in, "fade_out": fade_out}


def is_instrumental(transcript, video_duration, threshold=0.1):
    """Return True when the song looks instrumental.

    The song counts as instrumental when there are no segments, or when the
    summed duration of all segments is below threshold * video_duration
    (durations under 1 s are treated as 1 s to avoid dividing by zero).
    """
    if not transcript.get("segments"):
        return True
    vocal_duration = sum(
        s["end"] - s["start"] for s in transcript["segments"]
    )
    ratio = vocal_duration / max(video_duration, 1)
    return ratio < threshold


def _energy_peak_chorus(energies, duration, window, step=5):
    """Build a chorus-like result from the loudest *window*-second span."""
    # A video shorter than the window would otherwise never be scanned.
    window = min(window, duration)
    best_start = 0
    best_avg = -100
    t = 0
    while t + window <= duration:
        avg = avg_energy_in_range(energies, t, t + window)
        if avg > best_avg:
            best_avg = avg
            best_start = t
        t += step
    return {
        "best": {
            "start": best_start,
            "end": best_start + window,
            "duration": window,
            "text_preview": "(instrumental — energy peak)",
            "score": 0,
            "avg_rms": round(best_avg, 2),
        },
        "all_candidates": [],
        "avg_rms_total": round(
            sum(r for (_, r) in energies) / len(energies) if energies else -30,
            2
        ),
    }


def main():
    """CLI entry point: analyze one video and emit the result as JSON.

    Progress goes to stderr. The JSON result is written to --output when
    given, and printed to stdout with --json or when no --output is given
    (so the analysis is never silently discarded).
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("video", help="Vhod video file")
    ap.add_argument("--lang", default=None,
                    help="ISO 639-1 ali 'auto' (default: auto)")
    ap.add_argument("--model", default="small", help="Whisper model")
    ap.add_argument("--target-duration", type=float, default=30.0)
    ap.add_argument("--max-duration", type=float, default=45.0)
    ap.add_argument("--min-duration", type=float, default=20.0)
    ap.add_argument("--json", action="store_true", help="Output JSON")
    ap.add_argument("--output", help="Path za JSON output")
    args = ap.parse_args()

    video = Path(args.video)
    if not video.exists():
        print(f"❌ Video ne obstaja: {video}", file=sys.stderr)
        sys.exit(1)

    duration = get_video_duration(video)
    if duration <= 0:
        # Every later step scales by the duration; 0 means ffprobe failed.
        print(f"❌ Ni mogoče prebrati trajanja videa: {video}", file=sys.stderr)
        sys.exit(1)
    print(f"📹 Video: {video.name}, {duration:.1f}s", file=sys.stderr)

    # 1. Extract audio
    audio = extract_audio(video)
    try:
        # 2. Whisper transcript
        lang = None if args.lang in (None, "auto", "") else args.lang
        transcript = transcribe_full(audio, lang=lang, model_size=args.model)
        print(f" Transkripcija: {len(transcript['segments'])} segmentov",
              file=sys.stderr)

        # 3. Energy profile
        print("⚡ Energy profile...", file=sys.stderr)
        energies = compute_energy_profile(audio)
        print(f" Energy samples: {len(energies)}", file=sys.stderr)

        # 4. Instrumental detection
        instrumental = is_instrumental(transcript, duration)
        print(f"🎵 Instrumentalna: {instrumental}", file=sys.stderr)

        # 5. Find the chorus; instrumentals use the highest-energy window.
        if not instrumental:
            chorus = find_chorus(transcript, energies, duration)
        else:
            chorus = _energy_peak_chorus(
                energies, duration, args.target_duration
            )

        # 6. Smart clip range
        clip_range = smart_clip_range(
            chorus, transcript, duration,
            target_duration=args.target_duration,
            max_duration=args.max_duration,
            min_duration=args.min_duration,
        )
        print(f"✂ Clip range: {clip_range['start']:.1f}s - "
              f"{clip_range['end']:.1f}s "
              f"(duration: {clip_range['duration']}s)", file=sys.stderr)

        # 7. Fade params
        fade = detect_audio_fade(clip_range, transcript)
        print(f"🎚 Fade: in={fade['fade_in']}s, out={fade['fade_out']}s",
              file=sys.stderr)

        result = {
            "video": str(video),
            "video_duration": duration,
            "language": transcript["language"],
            "language_probability": transcript["language_probability"],
            "instrumental": instrumental,
            "transcript": transcript,
            "chorus": chorus,
            "clip_range": clip_range,
            "fade": fade,
        }

        if args.output:
            with open(args.output, "w", encoding="utf-8") as f:
                json.dump(result, f, ensure_ascii=False, indent=2)
            print(f"💾 Saved: {args.output}", file=sys.stderr)

        if args.json or not args.output:
            print(json.dumps(result, ensure_ascii=False))

    finally:
        # Best-effort removal of the temporary WAV.
        try:
            os.unlink(audio)
        except OSError:
            pass


if __name__ == "__main__":
    main()