""" reels.biba.live — FastAPI backend. Endpoints: GET / — frontend HTML POST /api/upload — naloži video file POST /api/youtube — submit YouTube URL POST /api/process/{id} — start processing job GET /api/jobs — list vseh jobov GET /api/jobs/{id} — status job-a GET /api/stream/{id} — SSE progress stream GET /api/download/{id} — download finalni reel GET /api/preview/{id} — preview video stream DELETE /api/jobs/{id} — pobriši job + datoteke """ import asyncio import json import os import secrets import shutil import subprocess import time import uuid from pathlib import Path from typing import Optional from fastapi import ( FastAPI, UploadFile, File, Form, HTTPException, Depends, BackgroundTasks, Request, status ) from fastapi.responses import ( FileResponse, HTMLResponse, StreamingResponse, JSONResponse, Response ) from fastapi.staticfiles import StaticFiles from fastapi.security import HTTPBasic, HTTPBasicCredentials from pydantic import BaseModel # ──────────────────────────────────────────────────────────────── # Config # ──────────────────────────────────────────────────────────────── DATA_DIR = Path(os.environ.get("DATA_DIR", "/data")) UPLOAD_DIR = DATA_DIR / "uploads" OUTPUT_DIR = DATA_DIR / "outputs" JOBS_DIR = DATA_DIR / "jobs" SCRIPTS_DIR = Path(__file__).parent.parent / "scripts" UPLOAD_DIR.mkdir(parents=True, exist_ok=True) OUTPUT_DIR.mkdir(parents=True, exist_ok=True) JOBS_DIR.mkdir(parents=True, exist_ok=True) AUTH_USER = os.environ.get("AUTH_USER", "sebastjan") AUTH_PASS = os.environ.get("AUTH_PASS", "change-me-in-coolify-env") MAX_UPLOAD_MB = int(os.environ.get("MAX_UPLOAD_MB", "2000")) # ──────────────────────────────────────────────────────────────── # Auth # ──────────────────────────────────────────────────────────────── security = HTTPBasic() def check_auth(creds: HTTPBasicCredentials = Depends(security)): correct_user = secrets.compare_digest(creds.username, AUTH_USER) correct_pass = secrets.compare_digest(creds.password, AUTH_PASS) if not (correct_user and correct_pass): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Napačno geslo", headers={"WWW-Authenticate": "Basic"}, ) return creds.username # ──────────────────────────────────────────────────────────────── # Artist + title parsing iz filename / YouTube title # ──────────────────────────────────────────────────────────────── import re _NOISE_PATTERNS = [ # Variacije "official" z možnimi tipkarskimi napakami (offiicial, offical, oficial, ...) # Match liberalno: "off" + 0-3 dodatnih črk + "icial" + "video"/"audio" + opcijsko številka r"\(Off[a-z]*icial\s+(?:Music\s+|HD\s+|4K\s+)?(?:Video|Audio)\s*\)", r"\(Off[a-z]*icia[lk]\s+(?:Music\s+|HD\s+|4K\s+)?(?:Video|Audio)\)", r"\bOff[a-z]*icial\s+(?:Music\s+|HD\s+|4K\s+)?(?:Video|Audio)\b", # Nemške variacije r"\(Offizielles?\s+(?:Musik)?[Vv]ideo\)", r"\bOffizielles?\s+(?:Musik)?[Vv]ideo\b", # Lyric videos r"\(Lyric[s]?\s+Video\)", r"\bLyric[s]?\s+Video\b", # Audio quality / version markers r"\(Audio\)", r"\(HD\)", r"\(HQ\)", r"\(4K\)", r"\(8K\)", r"\(1080p?\)", r"\(Live\)", r"\(Remix\)", r"\(Cover\)", r"\(Acoustic\)", r"\(Remastered\)", r"\(Remaster(?:ed)?\s*\d{0,4}\)", r"\(Extended(?:\s+Mix)?\)", r"\(Radio(?:\s+Edit)?\)", r"\(Clean(?:\s+Version)?\)", r"\(Explicit\)", # Square brackets variations r"\[Official[^\]]*\]", r"\[Music[^\]]*\]", r"\[Audio[^\]]*\]", r"\[HD\]", r"\[HQ\]", r"\[4K\]", r"\[Lyric[s]?[^\]]*\]", # Bare words (without brackets) r"\boriginal\s+(?:video|audio)\b", r"\bMV\b", # Trailing year in parens (npr. "(2024)") r"\(\d{4}\)\s*$", ] def parse_artist_title(filename_or_title): """Iz imena datoteke / YouTube naslova ekstrahira (artist, title). Podpira pogoste vzorce: - "Artist - Title.mp4" - "Artist - Title (Official Music Video).mp4" - "Artist – Title" (em-dash) - "Artist | Title" Vrne (artist, title) ali (None, None) če ni razvidno. """ if not filename_or_title: return (None, None) # Odstrani extension name = Path(filename_or_title).stem if "." in filename_or_title else filename_or_title # Odstrani noise patterns for pat in _NOISE_PATTERNS: name = re.sub(pat, "", name, flags=re.IGNORECASE) # Normaliziraj presledke name = re.sub(r"\s+", " ", name).strip() # Probaj različne separatorje for sep in [" - ", " – ", " — ", " | ", " : "]: if sep in name: parts = name.split(sep, 1) artist = parts[0].strip() title = parts[1].strip() # Strip trailing/leading puncutation artist = re.sub(r'^[\s\-–—|.:_]+|[\s\-–—|.:_]+$', '', artist) title = re.sub(r'^[\s\-–—|.:_]+|[\s\-–—|.:_]+$', '', title) if artist and title and len(artist) <= 80 and len(title) <= 100: return (artist, title) return (None, None) def safe_filename(s, max_len=80): """Naredi varno ime datoteke (brez znakov ki bi razbili FS).""" if not s: return "" # Replace problematic chars with safe alternative s = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '', s) s = re.sub(r'\s+', ' ', s).strip() return s[:max_len] def clean_noise(s): """Odstrani 'noise' (Official Video itd.) iz besedila - tudi že-parsed values.""" if not s: return s cleaned = s for pat in _NOISE_PATTERNS: cleaned = re.sub(pat, "", cleaned, flags=re.IGNORECASE) cleaned = re.sub(r"\s+", " ", cleaned).strip() cleaned = re.sub(r'^[\s\-–—|.:_]+|[\s\-–—|.:_]+$', '', cleaned) return cleaned def build_download_filename(job): """Sestavi pravilno ime download datoteke iz job metadata.""" # Najprej probaj job-shranjene parsed values artist = clean_noise(job.get("parsed_artist")) title = clean_noise(job.get("parsed_title")) # Fallback: parse from filename if not artist or not title: source = job.get("filename") or job.get("youtube_title") or "" parsed_artist, parsed_title = parse_artist_title(source) artist = artist or parsed_artist title = title or parsed_title if artist and title: return f"{safe_filename(artist)} - {safe_filename(title)} - REEL.mp4" if title: return f"{safe_filename(title)} - REEL.mp4" # Last resort: job ID (vendar to bi se moralo preprečiti že ob upload-u) return f"reel_{job['id']}.mp4" # ──────────────────────────────────────────────────────────────── # Job state (filesystem-based, persistent prek restartov) # ──────────────────────────────────────────────────────────────── def job_path(job_id): return JOBS_DIR / f"{job_id}.json" def load_job(job_id): p = job_path(job_id) if not p.exists(): return None return json.loads(p.read_text()) def save_job(job): job_path(job["id"]).write_text(json.dumps(job, ensure_ascii=False, indent=2)) def update_job(job_id, **kwargs): job = load_job(job_id) if not job: return None job.update(kwargs) job["updated_at"] = time.time() save_job(job) return job def _clean_job_titles(job): """Očisti 'Official Video' ipd. iz parsed_title v real-time (brez perzistiranja).""" if not job: return job if job.get("parsed_title"): cleaned = clean_noise(job["parsed_title"]) if cleaned and cleaned != job["parsed_title"]: job["parsed_title"] = cleaned if job.get("parsed_artist"): cleaned = clean_noise(job["parsed_artist"]) if cleaned and cleaned != job["parsed_artist"]: job["parsed_artist"] = cleaned return job def list_jobs(): out = [] for f in sorted(JOBS_DIR.glob("*.json"), reverse=True): try: out.append(_clean_job_titles(json.loads(f.read_text()))) except Exception: pass return out def generate_srt_from_segments(segments, clip_start, clip_end, output_path): """Generira SRT samo za dele, ki spadajo v [clip_start, clip_end]. Timestamp-i so re-mapirani na 0-based (kot je v trim-anem videu). Razdeli dolge segmente (>2.5s) na enake kose za hiter pacing v reels stilu. Vse besedilo VELIKE TISKANE ČRKE. """ MAX_CHUNK_DURATION = 2.5 def fmt_ts(s): h = int(s // 3600) m = int((s % 3600) // 60) sec = s % 60 return f"{h:02d}:{m:02d}:{sec:06.3f}".replace(".", ",") lines = [] idx = 1 for seg in segments: s_start = float(seg["start"]) s_end = float(seg["end"]) text = str(seg["text"]).strip() words = seg.get("words", []) or [] # Filter v range if s_end <= clip_start or s_start >= clip_end: continue # ── HALLUCINATION FILTER ── # STT (Scribe, Whisper) občasno halucinira pri dolgih instrumentalih: # vrne segment 60-100s z 1-2 besedama. Tak segment ne smemo dati v SRT. # Pravilo: če je segment > 15s IN ima < 5 besed (ali words array < 5), # je verjetno halucinacija. seg_dur = s_end - s_start word_count = len(words) if words else len(text.split()) if seg_dur > 15 and word_count < 5: print(f"[SRT] Preskočil halucinacijski segment [{s_start:.1f}-{s_end:.1f}] " f"({seg_dur:.1f}s, {word_count} besed): {text[:50]!r}", flush=True) continue # Pripravi words_in_clip vedno (če imamo word-level timestampe) # Uporabili ga bomo tako za segment trim kot za chunk boundaries words_in_clip = None if words: words_in_clip = [] for w in words: w_start = float(w.get("start", 0)) w_end = float(w.get("end", 0)) w_text = w.get("text", "").strip() if not w_text: continue if w_end > clip_start and w_start < clip_end: words_in_clip.append({ "start": max(w_start, clip_start), "end": min(w_end, clip_end), "text": w_text, }) # Če segment delno štrli iz clip range-a IN imamo word-level timestampe, # uporabi samo tiste besede ki dejansko padejo v clip range if words_in_clip and (s_start < clip_start or s_end > clip_end): if not words_in_clip: continue # Reconstruiraj segment z dejanskim word-level timing-om text = " ".join(w["text"] for w in words_in_clip) s_start = words_in_clip[0]["start"] s_end = words_in_clip[-1]["end"] else: # Klipni segment začetek/konec na clip range s_start = max(s_start, clip_start) s_end = min(s_end, clip_end) if s_end - s_start < 0.2: continue # Re-mapraj na 0-based rel_start = s_start - clip_start rel_end = s_end - clip_start if not text: continue text_upper = text.upper() # Razdeli na chunk-e če je predolg duration = rel_end - rel_start if duration <= MAX_CHUNK_DURATION: lines.append(f"{idx}\n{fmt_ts(rel_start)} --> {fmt_ts(rel_end)}\n{text_upper}\n") idx += 1 else: # ── WORD-LEVEL CHUNKING ── # Če imamo word-level timestampe (Soniox/Scribe), uporabi DEJANSKE čase besed # za chunk boundaries (NE enake time chunks, ker pevec ne poje enakomerno). if words_in_clip and len(words_in_clip) >= 2: # Group besede v chunke z max trajanjem MAX_CHUNK_DURATION # Pravila: # 1. Vsak chunk traja max MAX_CHUNK_DURATION (2.5s) # 2. NE začni novega chunka če bi prejšnji ostal kratek (<3 besede ali <12 znakov) # 3. NE pusti zadnji chunk siroto z 1 kratko besedo (<3 znakov) — združi nazaj MIN_CHUNK_WORDS = 3 MIN_CHUNK_CHARS = 12 chunks = [] current_chunk = [words_in_clip[0]] for w in words_in_clip[1:]: chunk_start_time = current_chunk[0]["start"] chunk_dur_so_far = w["end"] - chunk_start_time # Bi začeli nov chunk? if chunk_dur_so_far > MAX_CHUNK_DURATION: # Preveri ali bi current_chunk ostal "siroto kratek" current_text = " ".join(c["text"] for c in current_chunk) too_short = (len(current_chunk) < MIN_CHUNK_WORDS or len(current_text) < MIN_CHUNK_CHARS) if too_short: # Daj še to besedo zraven (raje malo predolg chunk kot siroto kratek) current_chunk.append(w) else: chunks.append(current_chunk) current_chunk = [w] else: current_chunk.append(w) if current_chunk: # Če zadnji chunk ima samo 1-2 kratki besedi, združi z zadnjim chunkom last_text = " ".join(c["text"] for c in current_chunk) if (chunks and (len(current_chunk) < 2 or len(last_text) < 5)): chunks[-1].extend(current_chunk) else: chunks.append(current_chunk) # Generiraj SRT iz chunks (z dejanskimi word timestampi) for chunk in chunks: cs = chunk[0]["start"] - clip_start ce = chunk[-1]["end"] - clip_start chunk_text = " ".join(w["text"] for w in chunk).upper().strip() if not chunk_text: continue lines.append(f"{idx}\n{fmt_ts(cs)} --> {fmt_ts(ce)}\n{chunk_text}\n") idx += 1 else: # Fallback: brez word-level timestampov, razdeli enako n_parts = int(duration / MAX_CHUNK_DURATION) + 1 words_split = text_upper.split() words_per_part = max(1, len(words_split) // n_parts) chunk_dur = duration / n_parts for i in range(n_parts): cs = rel_start + i * chunk_dur ce = rel_start + (i + 1) * chunk_dur wstart = i * words_per_part wend = (i + 1) * words_per_part if i < n_parts - 1 else len(words_split) chunk_text = " ".join(words_split[wstart:wend]) if wstart < len(words_split) else text_upper if not chunk_text.strip(): chunk_text = text_upper lines.append(f"{idx}\n{fmt_ts(cs)} --> {fmt_ts(ce)}\n{chunk_text.strip()}\n") idx += 1 with open(output_path, "w", encoding="utf-8") as f: f.write("\n".join(lines)) return output_path # ──────────────────────────────────────────────────────────────── # Pipeline runner (background task) # ──────────────────────────────────────────────────────────────── def run_subprocess_logged(cmd, job_id, step_name): """Pokliče subprocess, logi gredo v job.""" update_job(job_id, current_step=step_name, status="processing") proc = subprocess.run(cmd, capture_output=True, text=True) if proc.returncode != 0: # Combine stdout + stderr za diagnostiko err_msg = (proc.stderr or "") + "\n" + (proc.stdout or "") update_job( job_id, status="failed", error=f"{step_name}: {err_msg[-800:].strip()}", ) return False # Tudi pri success-u beleži stderr za diagnostiko (samo zadnji del) if proc.stderr and proc.stderr.strip(): update_job(job_id, last_step_log=proc.stderr[-500:].strip()) return True def process_job(job_id): """Glavni pipeline: download (če YT) → find_chorus (če auto) → reframe → subs.""" job = load_job(job_id) if not job: return try: # ── 1. Source preparation ───────────────────────────── if job["source_type"] == "youtube": update_job(job_id, status="downloading", current_step="YouTube download") input_path = UPLOAD_DIR / f"{job_id}_yt.mp4" cmd = [ "python3", str(SCRIPTS_DIR / "yt_download.py"), job["youtube_url"], str(input_path), ] if not run_subprocess_logged(cmd, job_id, "YouTube download"): return update_job(job_id, input_path=str(input_path)) # Probaj dobiti YT naslov za artist+title parsing try: info_cmd = [ "python3", str(SCRIPTS_DIR / "yt_download.py"), job["youtube_url"], "/dev/null", "--info-only", ] proc = subprocess.run(info_cmd, capture_output=True, text=True, timeout=30) if proc.returncode == 0 and proc.stdout: info = json.loads(proc.stdout) yt_title = info.get("title", "") if yt_title: a, t = parse_artist_title(yt_title) updates = {"youtube_title": yt_title} if a: updates["parsed_artist"] = a if t: updates["parsed_title"] = t updates["has_clean_name"] = bool(a and t) update_job(job_id, **updates) # Reload job for downstream use job = load_job(job_id) except Exception as e: print(f"⚠️ Cannot fetch YT title: {e}", flush=True) else: input_path = Path(job["input_path"]) # ── 1b. Music recognition (ACRCloud) — če nimamo artist+title ───── # Tudi za YouTube jobs lahko naslov ni razviden (npr. iz playliste, "Track 5") if not (job.get("parsed_artist") and job.get("parsed_title")): update_job(job_id, current_step="Avto-prepoznavam pesem (ACRCloud)") try: acr_cmd = [ "python3", str(SCRIPTS_DIR / "acr_recognize.py"), str(input_path), ] proc = subprocess.run(acr_cmd, capture_output=True, text=True, timeout=120) if proc.returncode == 0 and proc.stdout: data = json.loads(proc.stdout) a, t = data.get("artist"), data.get("title") if a and t: update_job( job_id, parsed_artist=a, parsed_title=t, has_clean_name=True, recognized_via="acrcloud", ) job = load_job(job_id) print(f"✅ ACR prepoznal: {a} - {t}", flush=True) else: print(f"⚠️ ACR ni prepoznal pesmi", flush=True) else: print(f"⚠️ ACR exit {proc.returncode}: {proc.stderr[:200]}", flush=True) except Exception as e: print(f"⚠️ ACR error: {e}", flush=True) # ── 2. Smart analysis (če auto_chorus) ────────────────────────── # ÄŒe je custom_clip=True (user-edit mode), preskoÄi analizo # in samo posodobi start/duration iz obstojeÄega clip_range v analysis.json if job.get("custom_clip"): update_job(job_id, current_step="User-edit recut (preskočim analizo)") analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json" srt_from_claude = None if analysis_path.exists(): try: with open(analysis_path, "r", encoding="utf-8") as f: analysis = json.load(f) cr = analysis["clip_range"] fade = analysis.get("fade", {"fade_in": 0.05, "fade_out": 0.5}) # Generiraj nov SRT iz obstoječega transkripta na novem range-u if analysis.get("transcript", {}).get("segments") and not job.get("no_subs"): srt_path_out = OUTPUT_DIR / f"{job_id}.subtitles.srt" try: # Če imamo custom_subtitle_segments (user popravil napise) segs_to_use = analysis.get("custom_subtitle_segments") or analysis["transcript"]["segments"] generate_srt_from_segments( segs_to_use, cr["start"], cr["end"], srt_path_out, ) srt_from_claude = str(srt_path_out) except Exception as e: print(f"⚠️ Recut SRT generation failed: {e}", flush=True) update_job( job_id, start=cr["start"], duration=cr["duration"], fade_in=fade.get("fade_in", 0.05), fade_out=fade.get("fade_out", 0.5), claude_srt_path=srt_from_claude, ) job = load_job(job_id) except Exception as e: update_job(job_id, status="failed", error=f"Recut analysis read: {e}") return else: update_job(job_id, status="failed", error="Analysis manjka za recut") return elif job.get("auto_chorus"): update_job(job_id, current_step="Analiza pesmi (transkript + energija)") analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json" cmd = [ "python3", str(SCRIPTS_DIR / "analyze.py"), str(input_path), "--target-duration", str(job.get("duration", 30)), "--max-duration", str(job.get("max_duration", 45)), "--min-duration", str(job.get("min_duration", 20)), "--output", str(analysis_path), ] if job.get("include_prebuild"): cmd += ["--include-prebuild"] # LLM provider (claude/gemini/auto) if job.get("llm_provider"): cmd += ["--llm-provider", job["llm_provider"]] if job.get("llm_model"): cmd += ["--llm-model", job["llm_model"]] # Filename hint za Claude/Scribe — preferiraj parsed artist+title (čistejše) if job.get("parsed_artist") and job.get("parsed_title"): fn_hint = f"{job['parsed_artist']} - {job['parsed_title']}" cmd += ["--filename-hint", fn_hint] elif job.get("filename"): fn_hint = Path(job["filename"]).stem cmd += ["--filename-hint", fn_hint] # STT provider (elevenlabs = Scribe, local = faster-whisper, auto = preferiraj Scribe) if job.get("whisper_provider"): cmd += ["--whisper-provider", job["whisper_provider"]] # lang: če None ali 'auto', pusti analyze.py auto-detect if job.get("lang") and job["lang"] not in ("auto", ""): cmd += ["--lang", job["lang"]] cmd += ["--model", job.get("whisper_model", "large-v3")] proc = subprocess.run(cmd, capture_output=True, text=True) srt_from_claude = None # Pot do SRT iz Claude-popravljenega transcript-a if proc.returncode == 0 and analysis_path.exists(): try: with open(analysis_path, "r", encoding="utf-8") as f: analysis = json.load(f) cr = analysis["clip_range"] fade = analysis["fade"] # Generiraj SRT iz transcript-a TRIM-ANEGA na clip_range # (Claude je morda popravil besedilo — uporabi popravljeno) if analysis.get("transcript", {}).get("segments"): srt_path_out = OUTPUT_DIR / f"{job_id}.subtitles.srt" try: generate_srt_from_segments( analysis["transcript"]["segments"], cr["start"], cr["end"], srt_path_out, ) srt_from_claude = str(srt_path_out) llm_src = cr.get("source", "LLM") print(f"📝 Generated SRT from {llm_src} transcript: {srt_path_out}") except Exception as e: print(f"⚠️ SRT generation failed: {e}") update_job( job_id, analysis_summary={ "language": analysis["language"], "language_probability": analysis["language_probability"], "instrumental": analysis["instrumental"], "clip_range": cr, "fade": fade, "chorus_preview": analysis["chorus"]["best"]["text_preview"] if analysis.get("chorus") and analysis["chorus"].get("best") else None, "video_duration": analysis.get("video_duration"), "candidates": analysis["chorus"].get("all_candidates", [])[:5] if analysis.get("chorus") else [], "claude_corrected_text": analysis.get("claude_corrected_text", False), }, # Cel transkript shranimo za UI prikaz full_transcript=[ {"start": s["start"], "end": s["end"], "text": s["text"]} for s in analysis.get("transcript", {}).get("segments", []) ], start=cr["start"], duration=cr["duration"], fade_in=fade["fade_in"], fade_out=fade["fade_out"], detected_language=analysis["language"], is_instrumental=analysis["instrumental"], claude_srt_path=srt_from_claude, ) # Auto-disable subs za instrumental if analysis["instrumental"] and not job.get("no_subs"): update_job(job_id, no_subs=True, auto_disabled_subs=True) # Reload local dict job = load_job(job_id) except (json.JSONDecodeError, KeyError) as e: update_job(job_id, chorus_error=f"Analysis parse: {e}") else: update_job(job_id, chorus_error=(proc.stderr or "")[-500:]) # ── 3. Reframe + subtitles (clip.py orchestrator) ───── output_path = OUTPUT_DIR / f"{job_id}.mp4" update_job(job_id, current_step="Reframe + subtitles") cmd = [ "python3", str(SCRIPTS_DIR / "clip.py"), str(input_path), str(output_path), "--mode", job.get("mode", "track"), "--quality", job.get("quality", "medium"), "--style", job.get("subtitle_style", "reels"), ] if job.get("start") is not None: cmd += ["--start", str(job["start"])] if job.get("duration") is not None: cmd += ["--duration", str(job["duration"])] if job.get("fade_in", 0) > 0: cmd += ["--fade-in", str(job["fade_in"])] if job.get("fade_out", 0) > 0: cmd += ["--fade-out", str(job["fade_out"])] # SRT iz Claude (boljše besedilo) — preda direktno v subtitle.py if job.get("claude_srt_path") and Path(job["claude_srt_path"]).exists() and not job.get("no_subs"): cmd += ["--srt", job["claude_srt_path"]] # lang: prefer detected_language če auto chosen_lang = job.get("lang") if chosen_lang in (None, "auto", ""): chosen_lang = job.get("detected_language") if chosen_lang: cmd += ["--lang", chosen_lang] if job.get("no_subs"): cmd += ["--no-subs"] cmd += ["--model", job.get("whisper_model", "large-v3")] # DEBUG: zapiši natanko kakšen ukaz se izvede update_job(job_id, debug_clip_cmd=" ".join(cmd)) if not run_subprocess_logged(cmd, job_id, "Reframe + subtitles"): return # ── Done ────────────────────────────────────────────── if output_path.exists(): update_job( job_id, status="done", current_step="Končano", output_path=str(output_path), output_size_mb=round(output_path.stat().st_size / 1024 / 1024, 2), ) # Telegram obvestilo try: from app.telegram import notify_job_done final_job = load_job(job_id) notify_job_done(final_job) except Exception as e: print(f"⚠️ TG notify_job_done failed: {e}", flush=True) # Batch tracking — če je zadnji v batchu, pošlji summary _try_finalize_batch(job_id) else: update_job( job_id, status="failed", error="Output datoteka ne obstaja po obdelavi", ) try: from app.telegram import notify_job_failed notify_job_failed(load_job(job_id), "Output datoteka ne obstaja") except Exception: pass _try_finalize_batch(job_id) except Exception as e: update_job(job_id, status="failed", error=str(e)) try: from app.telegram import notify_job_failed notify_job_failed(load_job(job_id), str(e)) except Exception: pass _try_finalize_batch(job_id) def _try_finalize_batch(job_id): """Če je job del batch-a, preveri če so vsi zaključili in pošlji summary.""" try: job = load_job(job_id) batch_id = job.get("batch_id") if not batch_id: return # Preštej batch jobe all_jobs = [load_job(jp.stem) for jp in JOBS_DIR.glob("*.json")] batch_jobs = [j for j in all_jobs if j and j.get("batch_id") == batch_id] unfinished = [j for j in batch_jobs if j.get("status") not in ("done", "failed")] if unfinished: return # še niso vsi končani # Vsi so končani — pošlji summary total = len(batch_jobs) succeeded = len([j for j in batch_jobs if j.get("status") == "done"]) failed = total - succeeded try: from app.telegram import notify_batch_complete notify_batch_complete(batch_id, total, succeeded, failed) except Exception as e: print(f"⚠️ TG batch summary failed: {e}", flush=True) except Exception as e: print(f"⚠️ _try_finalize_batch error: {e}", flush=True) # ──────────────────────────────────────────────────────────────── # FastAPI app # ──────────────────────────────────────────────────────────────── app = FastAPI(title="Reels Clipper") app.mount("/static", StaticFiles(directory=Path(__file__).parent.parent / "static"), name="static") # ──────────────────────────────────────────────────────────────── # Queue worker — procesira queued jobe enega za drugim # ──────────────────────────────────────────────────────────────── import threading _queue_lock = threading.Lock() _queue_running = {"current_job": None} # tracked v dict da je accessible iz threadov def _queue_worker(): """Background thread ki preverja queued jobe in jih obdeluje 1 po 1. Teče forever, sleep 3s med iteracijami če ni dela. """ print("🚜 Queue worker zagnan", flush=True) while True: try: # Že nekaj v obdelavi? Počakaj. if _queue_running["current_job"]: # Preverim ali je status še processing cur = load_job(_queue_running["current_job"]) if cur and cur.get("status") in ("processing", "downloading"): time.sleep(3) continue # Ni več v obdelavi → sprosti _queue_running["current_job"] = None # Najdi naslednjega "queued" joba (FIFO po created_at) queued_jobs = [] for f in JOBS_DIR.glob("*.json"): try: j = json.loads(f.read_text()) if j.get("status") == "queued": queued_jobs.append(j) except Exception: continue if not queued_jobs: time.sleep(3) continue queued_jobs.sort(key=lambda x: x.get("created_at", 0)) next_job = queued_jobs[0] job_id = next_job["id"] # Mark "processing" + zaženi with _queue_lock: _queue_running["current_job"] = job_id print(f"🚜 Queue worker: obdelujem {job_id}", flush=True) try: process_job(job_id) except Exception as e: print(f"❌ Queue worker error pri {job_id}: {e}", flush=True) update_job(job_id, status="failed", error=f"Queue worker: {e}") finally: with _queue_lock: _queue_running["current_job"] = None except Exception as e: print(f"❌ Queue worker outer error: {e}", flush=True) time.sleep(5) # Zaženi worker v ozadju (samo enkrat) _worker_thread = None def _start_queue_worker(): global _worker_thread if _worker_thread is None or not _worker_thread.is_alive(): _worker_thread = threading.Thread(target=_queue_worker, daemon=True) _worker_thread.start() @app.on_event("startup") async def resume_or_cleanup_jobs(): """Ob startu containerja: avto-resume processing jobs ali jih označi kot error. Ko Coolify deploya nov container, prejšnji se ubije sredi obdelave, JSON file pa ostane status='processing'. Strategija: - Če je analyze.json že narejen (analiza je končana) → resume z reframe+subs - Če ni analyze.json → restart pipeline od začetka - Po 3 napakah (resume_attempts >= 3) → mark error """ import asyncio print("🔄 Preverjam in obnavljam jobs po restart-u...") resumed_count = 0 error_count = 0 for f in JOBS_DIR.glob("*.json"): try: j = json.loads(f.read_text()) if j.get("status") != "processing": continue job_id = j.get("job_id") or f.stem attempts = j.get("resume_attempts", 0) # Po 3 neuspehih nehamo if attempts >= 3: j["status"] = "error" j["current_step"] = "Preveč napak pri ponovnem zagonu" j["chorus_error"] = f"Job restartal {attempts} krat — napaka v pipeline-u" j["updated_at"] = time.time() f.write_text(json.dumps(j, ensure_ascii=False, indent=2)) error_count += 1 continue # Preveri ali input file še obstaja input_path = UPLOAD_DIR / f"{job_id}.mp4" if not input_path.exists(): j["status"] = "error" j["current_step"] = "Vhodna datoteka ne obstaja" j["chorus_error"] = f"Upload {job_id}.mp4 ne obstaja po restart-u" j["updated_at"] = time.time() f.write_text(json.dumps(j, ensure_ascii=False, indent=2)) error_count += 1 continue # Resume: status=queued, +1 attempt, in pošlji v background j["status"] = "queued" j["resume_attempts"] = attempts + 1 j["current_step"] = f"Avto-resume po restart-u (poskus {attempts + 1}/3)" j["last_resume_at"] = time.time() j["updated_at"] = time.time() f.write_text(json.dumps(j, ensure_ascii=False, indent=2)) resumed_count += 1 # Pošlji v background po startup-u (ne smemo blokirati startup) asyncio.create_task(_resume_job_async(job_id)) print(f" 🔁 Resume {job_id} (attempt {attempts + 1}/3)") except Exception as e: print(f" ⚠️ Napaka pri {f.name}: {e}") print(f" ✅ Resumed: {resumed_count}, Error: {error_count}") # Zaženi queue worker za queued jobe (multi-upload batch) _start_queue_worker() async def _resume_job_async(job_id): """Pomožna funkcija ki zažene process_job v background-u.""" import asyncio # Počakaj kratek čas da je startup končan await asyncio.sleep(2) try: # process_job je sync funkcija, izvedi v thread executor loop = asyncio.get_event_loop() await loop.run_in_executor(None, process_job, job_id) except Exception as e: print(f" ❌ Resume failed for {job_id}: {e}") @app.get("/", response_class=HTMLResponse) async def index(user: str = Depends(check_auth)): html = (Path(__file__).parent.parent / "templates" / "index.html").read_text() return html @app.get("/healthz") async def healthz(): return {"ok": True} # ──────────────────────────────────────────────────────────────── # Job models # ──────────────────────────────────────────────────────────────── class YouTubeJobIn(BaseModel): url: str mode: str = "track" lang: Optional[str] = None auto_chorus: bool = True start: Optional[float] = None duration: Optional[float] = 30 no_subs: bool = False subtitle_style: str = "reels" whisper_model: str = "large-v3" quality: str = "medium" class StartJobIn(BaseModel): job_id: str mode: str = "track" lang: Optional[str] = None # None/auto = Whisper auto-detect auto_chorus: bool = True include_prebuild: bool = False # vključi pre-chorus build-up start: Optional[float] = None duration: Optional[float] = 30 max_duration: Optional[float] = 45 min_duration: Optional[float] = 20 no_subs: bool = False subtitle_style: str = "reels" whisper_model: str = "large-v3" quality: str = "medium" # LLM za semantično analizo + popravke llm_provider: str = "claude" # claude / gemini / auto llm_model: Optional[str] = None # specifičen model (privzeto najboljši za provider) # STT provider (Scribe je 18x hitreje + boljša multilingual accuracy) whisper_provider: str = "auto" # auto / elevenlabs / local # Batch tracking za multi-upload (Telegram summary) batch_id: Optional[str] = None # ──────────────────────────────────────────────────────────────── # Upload (file) # ──────────────────────────────────────────────────────────────── @app.post("/api/upload") async def upload_video( file: UploadFile = File(...), artist: Optional[str] = Form(None), title: Optional[str] = Form(None), batch_id: Optional[str] = Form(None), user: str = Depends(check_auth), ): if not file.filename: raise HTTPException(400, "Brez imena") job_id = uuid.uuid4().hex[:12] ext = Path(file.filename).suffix or ".mp4" input_path = UPLOAD_DIR / f"{job_id}{ext}" size = 0 with input_path.open("wb") as f: while chunk := await file.read(1024 * 1024): size += len(chunk) if size > MAX_UPLOAD_MB * 1024 * 1024: f.close() input_path.unlink(missing_ok=True) raise HTTPException(413, f"Prevelika datoteka (limit {MAX_UPLOAD_MB} MB)") f.write(chunk) job = { "id": job_id, "source_type": "upload", "filename": file.filename, "input_path": str(input_path), "size_mb": round(size / 1024 / 1024, 2), "status": "uploaded", "current_step": "Naloženo, čaka na obdelavo", "created_at": time.time(), "updated_at": time.time(), } if batch_id: job["batch_id"] = batch_id # Artist + title — najprej user-provided, potem parse iz filename if artist and title: # User je vpisal ali potrdil job["parsed_artist"] = artist.strip() job["parsed_title"] = title.strip() job["has_clean_name"] = True else: # Auto parse iz filename a, t = parse_artist_title(file.filename) if a: job["parsed_artist"] = a if t: job["parsed_title"] = t job["has_clean_name"] = bool(a and t) save_job(job) return job # ──────────────────────────────────────────────────────────────── # YouTube submit # ──────────────────────────────────────────────────────────────── @app.post("/api/youtube") async def submit_youtube( payload: YouTubeJobIn, background: BackgroundTasks, user: str = Depends(check_auth), ): job_id = uuid.uuid4().hex[:12] job = { "id": job_id, "source_type": "youtube", "youtube_url": payload.url, "status": "queued", "current_step": "V vrsti za YouTube prenos", "created_at": time.time(), "updated_at": time.time(), "mode": payload.mode, "lang": payload.lang, "auto_chorus": payload.auto_chorus, "start": payload.start, "duration": payload.duration, "no_subs": payload.no_subs, "subtitle_style": payload.subtitle_style, "whisper_model": payload.whisper_model, "quality": payload.quality, } save_job(job) # Queue worker bo pograbil return job # ──────────────────────────────────────────────────────────────── # Start processing for uploaded job # ──────────────────────────────────────────────────────────────── @app.post("/api/process") async def start_processing( payload: StartJobIn, background: BackgroundTasks, user: str = Depends(check_auth), ): job = load_job(payload.job_id) if not job: raise HTTPException(404, "Job ne obstaja") update_job( payload.job_id, status="queued", mode=payload.mode, lang=payload.lang, auto_chorus=payload.auto_chorus, include_prebuild=payload.include_prebuild, start=payload.start, duration=payload.duration, max_duration=payload.max_duration, min_duration=payload.min_duration, no_subs=payload.no_subs, subtitle_style=payload.subtitle_style, whisper_model=payload.whisper_model, quality=payload.quality, llm_provider=payload.llm_provider, llm_model=payload.llm_model, whisper_provider=payload.whisper_provider, batch_id=payload.batch_id, current_step="V vrsti za obdelavo", # Počisti pretekle napake (retry-friendly) chorus_error=None, interrupted_at=None, ) # Queue worker (background thread) bo pograbil ta job — ne zaganjamo neposredno. # To pomeni, da se ob več upload-ih obdelujejo zaporedno (1 hkrati = stabilno). return load_job(payload.job_id) # ──────────────────────────────────────────────────────────────── # Job queries # ──────────────────────────────────────────────────────────────── @app.get("/api/jobs") async def get_jobs(user: str = Depends(check_auth)): return {"jobs": list_jobs()} @app.get("/api/jobs/{job_id}") async def get_job(job_id: str, user: str = Depends(check_auth)): job = load_job(job_id) if not job: raise HTTPException(404, "Ne obstaja") return _clean_job_titles(job) @app.get("/api/stream/{job_id}") async def stream_job(job_id: str, user: str = Depends(check_auth)): """Server-Sent Events za real-time status.""" async def gen(): last_status = None last_step = None for _ in range(600): # max 10 min stream job = load_job(job_id) if not job: yield f"data: {json.dumps({'error': 'not found'})}\n\n" return if job["status"] != last_status or job.get("current_step") != last_step: yield f"data: {json.dumps(job, ensure_ascii=False)}\n\n" last_status = job["status"] last_step = job.get("current_step") if job["status"] in ("done", "failed"): return await asyncio.sleep(1) return StreamingResponse(gen(), media_type="text/event-stream") # ──────────────────────────────────────────────────────────────── # Download / preview # ──────────────────────────────────────────────────────────────── @app.get("/api/download/{job_id}") async def download(job_id: str, user: str = Depends(check_auth)): job = load_job(job_id) if not job or job.get("status") != "done": raise HTTPException(404, "Ne pripravljen") out = Path(job["output_path"]) if not out.exists(): raise HTTPException(404, "Output ne obstaja") # Pametno ime: "Izvajalec - Naslov - REEL.mp4" download_name = build_download_filename(job) return FileResponse( out, media_type="video/mp4", filename=download_name, ) @app.get("/api/preview/{job_id}") async def preview(job_id: str, request: Request, user: str = Depends(check_auth)): """Video preview z Range request podporo (potrebno za HTML5 video player).""" job = load_job(job_id) if not job or job.get("status") != "done": raise HTTPException(404, "Ne pripravljen") out = Path(job["output_path"]) if not out.exists(): raise HTTPException(404, "Output ne obstaja") file_size = out.stat().st_size range_header = request.headers.get("range") or request.headers.get("Range") if range_header: # Parse "bytes=START-END" try: range_str = range_header.replace("bytes=", "").strip() start_s, end_s = range_str.split("-") start = int(start_s) if start_s else 0 end = int(end_s) if end_s else file_size - 1 end = min(end, file_size - 1) if start > end or start >= file_size: return Response(status_code=416) # Range Not Satisfiable chunk_size = end - start + 1 def iter_file(): with open(out, "rb") as f: f.seek(start) remaining = chunk_size while remaining > 0: read_size = min(64 * 1024, remaining) data = f.read(read_size) if not data: break remaining -= len(data) yield data headers = { "Content-Range": f"bytes {start}-{end}/{file_size}", "Accept-Ranges": "bytes", "Content-Length": str(chunk_size), "Content-Type": "video/mp4", } return StreamingResponse(iter_file(), status_code=206, headers=headers, media_type="video/mp4") except (ValueError, IndexError): pass # Brez Range — vrni cel file return FileResponse( out, media_type="video/mp4", headers={"Accept-Ranges": "bytes", "Content-Length": str(file_size)}, ) @app.delete("/api/jobs/{job_id}") async def delete_job(job_id: str, user: str = Depends(check_auth)): job = load_job(job_id) if not job: raise HTTPException(404, "Ne obstaja") for key in ("input_path", "output_path"): p = job.get(key) if p and Path(p).exists(): Path(p).unlink(missing_ok=True) job_path(job_id).unlink(missing_ok=True) return {"deleted": job_id} # ─── EDIT FEATURE ──────────────────────────────────────────────── @app.get("/api/source-video/{job_id}") async def source_video(job_id: str, quality: str = "high", user: str = Depends(check_auth)): """Vrne 16:9 original video za predogled v editorju. quality='high' (default): originalni file quality='low': 480p re-encoded version (cached) — za hitro scrubbanje v editorju """ job = load_job(job_id) if not job: raise HTTPException(404, "Ne obstaja") src = job.get("input_path") if not src or not Path(src).exists(): raise HTTPException(404, "Original video ne obstaja") if quality == "low": # 480p cached za hitro scrubbanje cache_path = OUTPUT_DIR / f"{job_id}_source_low.mp4" cache_valid = cache_path.exists() and cache_path.stat().st_size > 1024 if not cache_valid: if cache_path.exists(): cache_path.unlink() cmd = [ "ffmpeg", "-y", "-i", str(src), "-vf", "scale=854:480:force_original_aspect_ratio=decrease,scale=trunc(iw/2)*2:trunc(ih/2)*2", "-c:v", "libx264", "-preset", "veryfast", # malo boljša kvaliteta od ultrafast "-crf", "28", "-c:a", "aac", "-b:a", "96k", "-movflags", "+faststart", "-loglevel", "error", str(cache_path), ] try: proc = subprocess.run(cmd, capture_output=True, text=True, timeout=120) if proc.returncode != 0 or not cache_path.exists() or cache_path.stat().st_size < 1024: if cache_path.exists(): cache_path.unlink() raise HTTPException(500, f"FFmpeg failed: {(proc.stderr or 'unknown')[-300:]}") except subprocess.TimeoutExpired: if cache_path.exists(): cache_path.unlink() raise HTTPException(500, "Low-q source render timeout (>120s)") return FileResponse( path=cache_path, media_type="video/mp4", filename=f"source_low_{job_id}.mp4", headers={"Accept-Ranges": "bytes", "Cache-Control": "max-age=3600"}, ) return FileResponse( path=src, media_type="video/mp4", filename=f"source_{job_id}.mp4", headers={"Accept-Ranges": "bytes"}, ) @app.api_route("/api/preview-clip/{job_id}", methods=["GET", "HEAD"]) async def preview_clip( job_id: str, start: float, end: float, user: str = Depends(check_auth), ): """Live preview odseka — low-quality 480p clip za hiter predogled. BREZ reframe (16:9 ostane), BREZ napisov, BREZ face tracking. Cilj: ~2-3s render za takojšen feedback med Edit dragom. Cache po start+end timestampih da ponovljeni request ne renderira ponovno. Podpira HEAD (frontend cache check). """ job = load_job(job_id) if not job: raise HTTPException(404, "Ne obstaja") src = job.get("input_path") if not src or not Path(src).exists(): raise HTTPException(404, "Original video ne obstaja") if end <= start: raise HTTPException(400, "end mora biti večji od start") duration = end - start if duration < 1: raise HTTPException(400, "Trajanje vsaj 1s") if duration > 90: raise HTTPException(400, "Trajanje največ 90s") # Cache key — preview se shrani in ne re-renderira če isti range cache_key = f"{job_id}_preview_{start:.1f}_{end:.1f}.mp4" cache_path = OUTPUT_DIR / cache_key # Validate cache: datoteka mora obstajati IN biti vsaj 1KB cache_valid = cache_path.exists() and cache_path.stat().st_size > 1024 if not cache_valid: # Briši staro prazno če obstaja if cache_path.exists(): cache_path.unlink() # ffmpeg low-q clip — fast seek + force even dimensions for libx264 cmd = [ "ffmpeg", "-y", "-ss", f"{max(0, start - 0.5):.2f}", # 0.5s buffer za keyframe "-i", str(src), "-ss", f"{min(0.5, start):.2f}", # fine seek "-t", f"{duration:.2f}", "-vf", "scale=854:480:force_original_aspect_ratio=decrease,scale=trunc(iw/2)*2:trunc(ih/2)*2", # 480p, even dimensions "-c:v", "libx264", "-preset", "ultrafast", "-crf", "30", "-c:a", "aac", "-b:a", "96k", "-movflags", "+faststart", "-loglevel", "error", str(cache_path), ] try: proc = subprocess.run(cmd, capture_output=True, text=True, timeout=20) if proc.returncode != 0 or not cache_path.exists() or cache_path.stat().st_size < 1024: # Briši nepopolno datoteko if cache_path.exists(): cache_path.unlink() raise HTTPException(500, f"FFmpeg failed: {(proc.stderr or 'unknown')[-300:]}") except subprocess.TimeoutExpired: if cache_path.exists(): cache_path.unlink() raise HTTPException(500, "Preview render timeout (>20s)") return FileResponse( path=cache_path, media_type="video/mp4", headers={"Accept-Ranges": "bytes", "Cache-Control": "max-age=300"}, ) @app.get("/api/transcript/{job_id}") async def get_transcript(job_id: str, user: str = Depends(check_auth)): """Vrne transkript (segmente + words) za editor.""" job = load_job(job_id) if not job: raise HTTPException(404, "Ne obstaja") analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json" if not analysis_path.exists(): raise HTTPException(404, "Analysis ne obstaja") try: analysis = json.loads(analysis_path.read_text()) transcript = analysis.get("transcript", {}) clip_range = analysis.get("clip_range", {}) # video_duration: probaj iz analysis, sicer iz zadnjega segmenta, sicer 0 video_dur = analysis.get("video_duration") or job.get("video_duration") or 0 if not video_dur and transcript.get("segments"): video_dur = max((s.get("end", 0) for s in transcript["segments"]), default=0) return { "segments": transcript.get("segments", []), "language": transcript.get("language", "?"), "clip_range": clip_range, "video_duration": video_dur, } except Exception as e: raise HTTPException(500, f"Napaka pri branju: {e}") class RecutRequest(BaseModel): start: float end: float custom_segments: Optional[list] = None # [{start, end, text}] za override napisov no_subs: Optional[bool] = None @app.post("/api/jobs/{job_id}/recut") async def recut_job(job_id: str, payload: RecutRequest, user: str = Depends(check_auth)): """Re-rendar reel z user-defined timestampi (in opcijsko popravljenimi napisi). Veliko hitrejše od full pipeline-a: - Brez Soniox transkripta (re-uporabi shranjenega) - Brez Claude analize (uporabnik je odločil) - Samo: clip → reframe → subtitle → output """ job = load_job(job_id) if not job: raise HTTPException(404, "Ne obstaja") src = job.get("input_path") if not src or not Path(src).exists(): raise HTTPException(400, "Original video manjka") if payload.end <= payload.start: raise HTTPException(400, "End mora biti večji od start") if payload.end - payload.start < 5: raise HTTPException(400, "Trajanje vsaj 5s") if payload.end - payload.start > 60: raise HTTPException(400, "Trajanje največ 60s") duration = payload.end - payload.start no_subs = payload.no_subs if payload.no_subs is not None else job.get("no_subs", False) # Naloži obstoječi analysis analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json" if not analysis_path.exists(): raise HTTPException(500, "Analysis manjka — re-uplad pesmi") analysis = json.loads(analysis_path.read_text()) # Posodobi clip range analysis["clip_range"] = { "start": payload.start, "end": payload.end, "duration": duration, "reason": f"USER EDIT: ročno popravljen clip {payload.start:.1f}-{payload.end:.1f}s", "source": "user_edit", } # Če uporabnik popravil napise, override segmenti if payload.custom_segments: # Ohrani originalni transcript ampak shrani custom segments za SRT analysis["custom_subtitle_segments"] = payload.custom_segments analysis_path.write_text(json.dumps(analysis, indent=2, ensure_ascii=False)) # Re-queue job v processing (worker ga bo obdelal) update_job( job_id, status="queued", no_subs=no_subs, custom_clip=True, # flag da preskoči Soniox + Claude current_step="V vrsti za recut", error=None, chorus_error=None, ) # Briši stari output out_mp4 = OUTPUT_DIR / f"{job_id}.mp4" if out_mp4.exists(): out_mp4.unlink() for ext in ("srt", "ass"): p = OUTPUT_DIR / f"{job_id}.subtitles.{ext}" if p.exists(): p.unlink() return { "status": "queued", "job_id": job_id, "start": payload.start, "end": payload.end, "duration": duration, }