"""
reels.biba.live — FastAPI backend.

Endpoints:
  GET    /                       — frontend HTML
  GET    /healthz                — liveness probe (no auth)
  POST   /api/upload             — upload a video file
  POST   /api/youtube            — submit a YouTube URL (starts processing)
  POST   /api/process            — start processing an uploaded job (job_id in body)
  GET    /api/jobs               — list all jobs
  GET    /api/jobs/{id}          — status of one job
  GET    /api/stream/{id}        — SSE progress stream
  GET    /api/download/{id}      — download the final reel
  GET    /api/preview/{id}       — preview video stream (Range support)
  DELETE /api/jobs/{id}          — delete a job and its files
"""
import asyncio
import json
import os
import secrets
import shutil
import subprocess
import time
import uuid
from pathlib import Path
from typing import Optional

from fastapi import (
    FastAPI, UploadFile, File, Form, HTTPException, Depends,
    BackgroundTasks, Request, status
)
from fastapi.responses import (
    FileResponse, HTMLResponse, StreamingResponse, JSONResponse, Response
)
from fastapi.staticfiles import StaticFiles
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from pydantic import BaseModel


# ────────────────────────────────────────────────────────────────
# Config
# ────────────────────────────────────────────────────────────────

DATA_DIR = Path(os.environ.get("DATA_DIR", "/data"))
UPLOAD_DIR = DATA_DIR / "uploads"
OUTPUT_DIR = DATA_DIR / "outputs"
JOBS_DIR = DATA_DIR / "jobs"
SCRIPTS_DIR = Path(__file__).parent.parent / "scripts"

UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
JOBS_DIR.mkdir(parents=True, exist_ok=True)

AUTH_USER = os.environ.get("AUTH_USER", "sebastjan")
# NOTE(review): the fallback password is a placeholder; deployments must set AUTH_PASS.
AUTH_PASS = os.environ.get("AUTH_PASS", "change-me-in-coolify-env")
MAX_UPLOAD_MB = int(os.environ.get("MAX_UPLOAD_MB", "2000"))


# ────────────────────────────────────────────────────────────────
# Auth
# ────────────────────────────────────────────────────────────────

security = HTTPBasic()


def check_auth(creds: HTTPBasicCredentials = Depends(security)):
    """Validate HTTP Basic credentials against AUTH_USER / AUTH_PASS.

    Returns the username on success; raises HTTP 401 otherwise.
    """
    # compare_digest() rejects str arguments containing non-ASCII characters
    # with a TypeError, so compare UTF-8 bytes instead of raw strings.
    user_ok = secrets.compare_digest(
        creds.username.encode("utf-8"), AUTH_USER.encode("utf-8")
    )
    pass_ok = secrets.compare_digest(
        creds.password.encode("utf-8"), AUTH_PASS.encode("utf-8")
    )
    if not (user_ok and pass_ok):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Napačno geslo",
            headers={"WWW-Authenticate": "Basic"},
        )
    return creds.username


# ────────────────────────────────────────────────────────────────
# Job state (filesystem-based, persists across restarts)
# ────────────────────────────────────────────────────────────────

def _is_safe_job_id(job_id):
    """Return True if job_id is safe to embed in a file name.

    Job IDs reach this module from request bodies as well as URL paths, so
    anything that could escape JOBS_DIR (separators, dots) is rejected.
    """
    return (
        isinstance(job_id, str)
        and bool(job_id)
        and job_id.isascii()
        and all(ch.isalnum() or ch in "-_" for ch in job_id)
    )


def _write_json_atomic(path, data):
    """Serialize data as JSON to path via a temp file + atomic rename.

    Request handlers read job files while the pipeline thread rewrites them;
    replacing the file atomically means a reader never sees partial content.
    """
    path = Path(path)
    # The temp name does not end in ".json", so list_jobs()'s glob ignores it.
    tmp = path.with_name(f"{path.name}.{uuid.uuid4().hex}.tmp")
    try:
        tmp.write_text(
            json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8"
        )
        os.replace(tmp, path)
    finally:
        # No-op after a successful replace; removes the leftover on failure.
        tmp.unlink(missing_ok=True)


def job_path(job_id):
    """Return the path of the JSON state file for job_id."""
    return JOBS_DIR / f"{job_id}.json"


def load_job(job_id):
    """Load a job dict from disk; return None if the ID is invalid or unknown."""
    if not _is_safe_job_id(job_id):
        return None
    try:
        return json.loads(job_path(job_id).read_text(encoding="utf-8"))
    except FileNotFoundError:
        return None


def save_job(job):
    """Persist a job dict to its state file (keyed by job["id"])."""
    _write_json_atomic(job_path(job["id"]), job)


def update_job(job_id, **kwargs):
    """Merge kwargs into the stored job, bump updated_at, and save it.

    Returns the updated job dict, or None if the job does not exist.
    """
    job = load_job(job_id)
    if not job:
        return None
    job.update(kwargs)
    job["updated_at"] = time.time()
    save_job(job)
    return job


def list_jobs():
    """Return all readable job dicts, ordered by state-file name (descending).

    NOTE(review): file names are random hex IDs, so this order is not
    chronological; sort on "created_at" if newest-first is wanted.
    """
    out = []
    for f in sorted(JOBS_DIR.glob("*.json"), reverse=True):
        try:
            out.append(json.loads(f.read_text(encoding="utf-8")))
        except (OSError, ValueError):
            # Unreadable or corrupt state file: skip it rather than fail the listing.
            continue
    return out


def generate_srt_from_segments(segments, clip_start, clip_end, output_path):
    """Write an SRT file for the segments that overlap [clip_start, clip_end].

    Timestamps are re-mapped to be 0-based (matching the trimmed video).
    Segments longer than 2.5 s are split into equal-length chunks for faster,
    reels-style pacing. All text is upper-cased.

    Args:
        segments: iterable of dicts with "start", "end" (seconds) and "text".
        clip_start: clip start in source-video seconds.
        clip_end: clip end in source-video seconds.
        output_path: where the SRT file is written (UTF-8).

    Returns:
        output_path, unchanged.
    """
    MAX_CHUNK_DURATION = 2.5

    def fmt_ts(s):
        # Work in whole milliseconds so rounding can never produce a seconds
        # field of "60,000" (e.g. for s = 59.9996).
        total_ms = max(0, int(round(s * 1000)))
        h, rem = divmod(total_ms, 3_600_000)
        m, rem = divmod(rem, 60_000)
        sec, ms = divmod(rem, 1000)
        return f"{h:02d}:{m:02d}:{sec:02d},{ms:03d}"

    lines = []
    idx = 1
    for seg in segments:
        s_start = float(seg["start"])
        s_end = float(seg["end"])
        text = str(seg["text"]).strip()

        # Skip segments entirely outside the clip range.
        if s_end <= clip_start or s_start >= clip_end:
            continue

        # Clip to the range; drop slivers shorter than 0.2 s.
        s_start = max(s_start, clip_start)
        s_end = min(s_end, clip_end)
        if s_end - s_start < 0.2:
            continue

        # Re-map to 0-based time.
        rel_start = s_start - clip_start
        rel_end = s_end - clip_start

        if not text:
            continue
        text_upper = text.upper()

        duration = rel_end - rel_start
        if duration <= MAX_CHUNK_DURATION:
            lines.append(
                f"{idx}\n{fmt_ts(rel_start)} --> {fmt_ts(rel_end)}\n{text_upper}\n"
            )
            idx += 1
            continue

        # Too long: split into N equal time slices and spread the words over
        # them. Word-level timing is not used here; the split is uniform.
        n_parts = int(duration / MAX_CHUNK_DURATION) + 1
        words = text_upper.split()
        words_per_part = max(1, len(words) // n_parts)
        chunk_dur = duration / n_parts
        for i in range(n_parts):
            cs = rel_start + i * chunk_dur
            ce = rel_start + (i + 1) * chunk_dur
            wstart = i * words_per_part
            # The last slice takes whatever words remain.
            wend = (i + 1) * words_per_part if i < n_parts - 1 else len(words)
            chunk_text = (
                " ".join(words[wstart:wend]) if wstart < len(words) else text_upper
            )
            if not chunk_text.strip():
                chunk_text = text_upper
            lines.append(
                f"{idx}\n{fmt_ts(cs)} --> {fmt_ts(ce)}\n{chunk_text.strip()}\n"
            )
            idx += 1

    with open(output_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
    return output_path


# ────────────────────────────────────────────────────────────────
# Pipeline runner (background task)
# ────────────────────────────────────────────────────────────────

def run_subprocess_logged(cmd, job_id, step_name):
    """Run cmd to completion and record the outcome on the job.

    Marks the job as "processing" with step_name before running. On a
    non-zero exit the job is marked "failed" with the tail of the output.

    Returns:
        True if the command exited with status 0, False otherwise.
    """
    update_job(job_id, current_step=step_name, status="processing")
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        # Combine stderr + stdout for diagnostics.
        err_msg = (proc.stderr or "") + "\n" + (proc.stdout or "")
        update_job(
            job_id,
            status="failed",
            error=f"{step_name}: {err_msg[-800:].strip()}",
        )
        return False
    # On success, still keep the tail of stderr for diagnostics.
    if proc.stderr and proc.stderr.strip():
        update_job(job_id, last_step_log=proc.stderr[-500:].strip())
    return True


def process_job(job_id):
    """Main pipeline: download (if YouTube) → analysis (if auto) → reframe + subs.

    Progress and results are written to the job's state file. Any unexpected
    exception marks the job as "failed" instead of propagating.
    """
    job = load_job(job_id)
    if not job:
        return

    try:
        # ── 1. Source preparation ─────────────────────────────
        if job["source_type"] == "youtube":
            update_job(job_id, status="downloading", current_step="YouTube download")
            input_path = UPLOAD_DIR / f"{job_id}_yt.mp4"
            cmd = [
                "python3", str(SCRIPTS_DIR / "yt_download.py"),
                job["youtube_url"], str(input_path),
            ]
            if not run_subprocess_logged(cmd, job_id, "YouTube download"):
                return
            update_job(job_id, input_path=str(input_path))
        else:
            input_path = Path(job["input_path"])

        # ── 2. Smart analysis (if auto_chorus) ────────────────
        if job.get("auto_chorus"):
            update_job(job_id, current_step="Analiza pesmi (transkript + energija)")
            analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json"
            cmd = [
                "python3", str(SCRIPTS_DIR / "analyze.py"),
                str(input_path),
                "--target-duration", str(job.get("duration", 30)),
                "--max-duration", str(job.get("max_duration", 45)),
                "--min-duration", str(job.get("min_duration", 20)),
                "--output", str(analysis_path),
            ]
            if job.get("include_prebuild"):
                cmd += ["--include-prebuild"]
            # LLM provider (claude/gemini/auto)
            if job.get("llm_provider"):
                cmd += ["--llm-provider", job["llm_provider"]]
            if job.get("llm_model"):
                cmd += ["--llm-model", job["llm_model"]]
            # lang: when None or 'auto', let analyze.py auto-detect.
            if job.get("lang") and job["lang"] not in ("auto", ""):
                cmd += ["--lang", job["lang"]]
            cmd += ["--model", job.get("whisper_model", "large-v3")]

            proc = subprocess.run(cmd, capture_output=True, text=True)
            srt_from_claude = None  # path to the SRT built from the LLM-corrected transcript

            if proc.returncode == 0 and analysis_path.exists():
                try:
                    with open(analysis_path, "r", encoding="utf-8") as f:
                        analysis = json.load(f)
                    cr = analysis["clip_range"]
                    fade = analysis["fade"]

                    # Build the SRT from the transcript trimmed to clip_range
                    # (the LLM may have corrected the text — use that version).
                    if analysis.get("transcript", {}).get("segments"):
                        srt_path_out = OUTPUT_DIR / f"{job_id}.subtitles.srt"
                        try:
                            generate_srt_from_segments(
                                analysis["transcript"]["segments"],
                                cr["start"], cr["end"],
                                srt_path_out,
                            )
                            srt_from_claude = str(srt_path_out)
                            llm_src = cr.get("source", "LLM")
                            print(f"📝 Generated SRT from {llm_src} transcript: {srt_path_out}")
                        except Exception as e:
                            # Best effort: clip.py can still run without this SRT.
                            print(f"⚠️ SRT generation failed: {e}")

                    update_job(
                        job_id,
                        analysis_summary={
                            "language": analysis["language"],
                            "language_probability": analysis["language_probability"],
                            "instrumental": analysis["instrumental"],
                            "clip_range": cr,
                            "fade": fade,
                            "chorus_preview": analysis["chorus"]["best"]["text_preview"]
                            if analysis.get("chorus") and analysis["chorus"].get("best")
                            else None,
                            "video_duration": analysis.get("video_duration"),
                            "candidates": analysis["chorus"].get("all_candidates", [])[:5]
                            if analysis.get("chorus")
                            else [],
                            "claude_corrected_text": analysis.get("claude_corrected_text", False),
                        },
                        # The full transcript is stored for display in the UI.
                        full_transcript=[
                            {"start": s["start"], "end": s["end"], "text": s["text"]}
                            for s in analysis.get("transcript", {}).get("segments", [])
                        ],
                        start=cr["start"],
                        duration=cr["duration"],
                        fade_in=fade["fade_in"],
                        fade_out=fade["fade_out"],
                        detected_language=analysis["language"],
                        is_instrumental=analysis["instrumental"],
                        claude_srt_path=srt_from_claude,
                    )
                    # Auto-disable subtitles for instrumental tracks.
                    if analysis["instrumental"] and not job.get("no_subs"):
                        update_job(job_id, no_subs=True, auto_disabled_subs=True)
                    # Reload the local dict so step 3 sees the analysis results;
                    # keep the old dict if the job vanished in the meantime.
                    job = load_job(job_id) or job
                except (json.JSONDecodeError, KeyError) as e:
                    update_job(job_id, chorus_error=f"Analysis parse: {e}")
            else:
                update_job(job_id, chorus_error=(proc.stderr or "")[-500:])

        # ── 3. Reframe + subtitles (clip.py orchestrator) ─────
        output_path = OUTPUT_DIR / f"{job_id}.mp4"
        update_job(job_id, current_step="Reframe + subtitles")

        cmd = [
            "python3", str(SCRIPTS_DIR / "clip.py"),
            str(input_path), str(output_path),
            "--mode", job.get("mode", "track"),
            "--quality", job.get("quality", "medium"),
            "--style", job.get("subtitle_style", "reels"),
        ]
        if job.get("start") is not None:
            cmd += ["--start", str(job["start"])]
        if job.get("duration") is not None:
            cmd += ["--duration", str(job["duration"])]
        # Fade values may be stored as null; treat that like 0 instead of
        # raising TypeError on the comparison.
        fade_in = job.get("fade_in") or 0
        fade_out = job.get("fade_out") or 0
        if fade_in > 0:
            cmd += ["--fade-in", str(fade_in)]
        if fade_out > 0:
            cmd += ["--fade-out", str(fade_out)]
        # SRT from the LLM-corrected transcript (better text) is handed over directly.
        if (
            job.get("claude_srt_path")
            and Path(job["claude_srt_path"]).exists()
            and not job.get("no_subs")
        ):
            cmd += ["--srt", job["claude_srt_path"]]
        # lang: prefer detected_language when the user chose auto.
        chosen_lang = job.get("lang")
        if chosen_lang in (None, "auto", ""):
            chosen_lang = job.get("detected_language")
        if chosen_lang:
            cmd += ["--lang", chosen_lang]
        if job.get("no_subs"):
            cmd += ["--no-subs"]
        cmd += ["--model", job.get("whisper_model", "large-v3")]

        # DEBUG: record exactly which command is executed.
        update_job(job_id, debug_clip_cmd=" ".join(cmd))

        if not run_subprocess_logged(cmd, job_id, "Reframe + subtitles"):
            return

        # ── Done ──────────────────────────────────────────────
        if output_path.exists():
            update_job(
                job_id,
                status="done",
                current_step="Končano",
                output_path=str(output_path),
                output_size_mb=round(output_path.stat().st_size / 1024 / 1024, 2),
            )
        else:
            update_job(
                job_id,
                status="failed",
                error="Output datoteka ne obstaja po obdelavi",
            )
    except Exception as e:
        # Top-level boundary of the background task: record the failure.
        update_job(job_id, status="failed", error=str(e))


# ────────────────────────────────────────────────────────────────
# FastAPI app
# ────────────────────────────────────────────────────────────────

app = FastAPI(title="Reels Clipper")
app.mount(
    "/static",
    StaticFiles(directory=Path(__file__).parent.parent / "static"),
    name="static",
)

# Statuses that mean "a worker was (or should have been) running this job".
# Background tasks live in memory only, so after a restart none of them are.
_RESUMABLE_STATUSES = ("processing", "downloading", "queued")
_MAX_RESUME_ATTEMPTS = 3

# Strong references to resume tasks; the event loop itself only keeps weak
# references, so an unreferenced task may be garbage-collected mid-run.
_resume_tasks = set()


def _resume_input_path(job, job_id):
    """Return the input file a resumed job depends on, or None if not needed.

    YouTube jobs are downloaded again by process_job, so they need no file.
    Upload jobs use the stored "input_path" (uploads keep their original
    extension); "<job_id>.mp4" is only a fallback when no path was stored.
    """
    if job.get("source_type") == "youtube":
        return None
    stored = job.get("input_path")
    return Path(stored) if stored else UPLOAD_DIR / f"{job_id}.mp4"


@app.on_event("startup")
async def resume_or_cleanup_jobs():
    """On container start: auto-resume interrupted jobs or mark them as errored.

    When a new container is deployed the previous one is killed mid-run and
    the job's JSON file is left in an in-progress status. Strategy:
      - jobs left in "processing", "downloading" or "queued" are re-queued
        and process_job is started again from the beginning;
      - if an upload job's input file is gone → mark "error";
      - after 3 resume attempts (resume_attempts >= 3) → mark "error".
    """
    print("🔄 Preverjam in obnavljam jobs po restart-u...")
    resumed_count = 0
    error_count = 0

    for f in JOBS_DIR.glob("*.json"):
        try:
            j = json.loads(f.read_text(encoding="utf-8"))
            if j.get("status") not in _RESUMABLE_STATUSES:
                continue

            # Jobs store their ID under "id"; "job_id" is kept as a fallback.
            job_id = j.get("id") or j.get("job_id") or f.stem
            attempts = j.get("resume_attempts", 0)

            # Give up after too many failed resumes.
            if attempts >= _MAX_RESUME_ATTEMPTS:
                j["status"] = "error"
                j["current_step"] = "Preveč napak pri ponovnem zagonu"
                j["chorus_error"] = f"Job restartal {attempts} krat — napaka v pipeline-u"
                j["updated_at"] = time.time()
                _write_json_atomic(f, j)
                error_count += 1
                continue

            # Check that the input file still exists (upload jobs only).
            input_path = _resume_input_path(j, job_id)
            if input_path is not None and not input_path.exists():
                j["status"] = "error"
                j["current_step"] = "Vhodna datoteka ne obstaja"
                j["chorus_error"] = f"Upload {input_path.name} ne obstaja po restart-u"
                j["updated_at"] = time.time()
                _write_json_atomic(f, j)
                error_count += 1
                continue

            # Resume: status=queued, +1 attempt, then hand off to the background.
            j["status"] = "queued"
            j["resume_attempts"] = attempts + 1
            j["current_step"] = f"Avto-resume po restart-u (poskus {attempts + 1}/3)"
            j["last_resume_at"] = time.time()
            j["updated_at"] = time.time()
            _write_json_atomic(f, j)
            resumed_count += 1

            # Run after startup has finished (startup must not block).
            task = asyncio.create_task(_resume_job_async(job_id))
            _resume_tasks.add(task)
            task.add_done_callback(_resume_tasks.discard)
            print(f" 🔁 Resume {job_id} (attempt {attempts + 1}/3)")
        except Exception as e:
            # One broken job file must not prevent the app from starting.
            print(f" ⚠️ Napaka pri {f.name}: {e}")

    print(f" ✅ Resumed: {resumed_count}, Error: {error_count}")


async def _resume_job_async(job_id):
    """Run process_job for job_id in a worker thread shortly after startup."""
    # Wait briefly so startup has completed.
    await asyncio.sleep(2)
    try:
        # process_job is synchronous; run it in the default thread executor.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, process_job, job_id)
    except Exception as e:
        print(f" ❌ Resume failed for {job_id}: {e}")


@app.get("/", response_class=HTMLResponse)
async def index(user: str = Depends(check_auth)):
    """Serve the frontend HTML page."""
    html = (Path(__file__).parent.parent / "templates" / "index.html").read_text(
        encoding="utf-8"
    )
    return html


@app.get("/healthz")
async def healthz():
    """Unauthenticated liveness probe."""
    return {"ok": True}


# ────────────────────────────────────────────────────────────────
# Job models
# ────────────────────────────────────────────────────────────────

class YouTubeJobIn(BaseModel):
    """Request body for POST /api/youtube."""

    url: str
    mode: str = "track"
    lang: Optional[str] = None
    auto_chorus: bool = True
    start: Optional[float] = None
    duration: Optional[float] = 30
    no_subs: bool = False
    subtitle_style: str = "reels"
    whisper_model: str = "large-v3"
    quality: str = "medium"


class StartJobIn(BaseModel):
    """Request body for POST /api/process."""

    job_id: str
    mode: str = "track"
    lang: Optional[str] = None  # None/auto = Whisper auto-detect
    auto_chorus: bool = True
    include_prebuild: bool = False  # include the pre-chorus build-up
    start: Optional[float] = None
    duration: Optional[float] = 30
    max_duration: Optional[float] = 45
    min_duration: Optional[float] = 20
    no_subs: bool = False
    subtitle_style: str = "reels"
    whisper_model: str = "large-v3"
    quality: str = "medium"
    # LLM used for semantic analysis + text corrections
    llm_provider: str = "claude"  # claude / gemini / auto
    llm_model: Optional[str] = None  # specific model (default: best for the provider)


# ────────────────────────────────────────────────────────────────
# Upload (file)
# ────────────────────────────────────────────────────────────────

@app.post("/api/upload")
async def upload_video(
    file: UploadFile = File(...),
    user: str = Depends(check_auth),
):
    """Store an uploaded video and create a job in status "uploaded".

    Raises HTTP 400 if the upload has no filename and HTTP 413 if it exceeds
    MAX_UPLOAD_MB. Processing is started separately via POST /api/process.
    """
    if not file.filename:
        raise HTTPException(400, "Brez imena")

    job_id = uuid.uuid4().hex[:12]
    ext = Path(file.filename).suffix or ".mp4"
    input_path = UPLOAD_DIR / f"{job_id}{ext}"
    max_bytes = MAX_UPLOAD_MB * 1024 * 1024

    size = 0
    try:
        with input_path.open("wb") as f:
            while chunk := await file.read(1024 * 1024):
                size += len(chunk)
                if size > max_bytes:
                    raise HTTPException(
                        413, f"Prevelika datoteka (limit {MAX_UPLOAD_MB} MB)"
                    )
                f.write(chunk)
    except BaseException:
        # Oversize, client disconnect, disk error, cancellation: never leave
        # a partial upload behind, since no job would reference it.
        input_path.unlink(missing_ok=True)
        raise

    job = {
        "id": job_id,
        "source_type": "upload",
        "filename": file.filename,
        "input_path": str(input_path),
        "size_mb": round(size / 1024 / 1024, 2),
        "status": "uploaded",
        "current_step": "Naloženo, čaka na obdelavo",
        "created_at": time.time(),
        "updated_at": time.time(),
    }
    save_job(job)
    return job


# ────────────────────────────────────────────────────────────────
# YouTube submit
# ────────────────────────────────────────────────────────────────

@app.post("/api/youtube")
async def submit_youtube(
    payload: YouTubeJobIn,
    background: BackgroundTasks,
    user: str = Depends(check_auth),
):
    """Create a YouTube job and immediately queue it for processing."""
    job_id = uuid.uuid4().hex[:12]
    job = {
        "id": job_id,
        "source_type": "youtube",
        "youtube_url": payload.url,
        "status": "queued",
        "current_step": "V vrsti za YouTube prenos",
        "created_at": time.time(),
        "updated_at": time.time(),
        "mode": payload.mode,
        "lang": payload.lang,
        "auto_chorus": payload.auto_chorus,
        "start": payload.start,
        "duration": payload.duration,
        "no_subs": payload.no_subs,
        "subtitle_style": payload.subtitle_style,
        "whisper_model": payload.whisper_model,
        "quality": payload.quality,
    }
    save_job(job)
    background.add_task(process_job, job_id)
    return job


# ────────────────────────────────────────────────────────────────
# Start processing for uploaded job
# ────────────────────────────────────────────────────────────────

@app.post("/api/process")
async def start_processing(
    payload: StartJobIn,
    background: BackgroundTasks,
    user: str = Depends(check_auth),
):
    """Store the processing options on an existing job and queue it.

    Raises HTTP 404 if the job does not exist (or the ID is malformed).
    """
    job = load_job(payload.job_id)
    if not job:
        raise HTTPException(404, "Job ne obstaja")

    update_job(
        payload.job_id,
        status="queued",
        mode=payload.mode,
        lang=payload.lang,
        auto_chorus=payload.auto_chorus,
        include_prebuild=payload.include_prebuild,
        start=payload.start,
        duration=payload.duration,
        max_duration=payload.max_duration,
        min_duration=payload.min_duration,
        no_subs=payload.no_subs,
        subtitle_style=payload.subtitle_style,
        whisper_model=payload.whisper_model,
        quality=payload.quality,
        llm_provider=payload.llm_provider,
        llm_model=payload.llm_model,
        current_step="V vrsti za obdelavo",
        # Clear errors from earlier runs (retry-friendly), including the
        # main "error" field written when a pipeline step fails.
        error=None,
        chorus_error=None,
        interrupted_at=None,
    )
    background.add_task(process_job, payload.job_id)
    return load_job(payload.job_id)


# ────────────────────────────────────────────────────────────────
# Job queries
# ────────────────────────────────────────────────────────────────

@app.get("/api/jobs")
async def get_jobs(user: str = Depends(check_auth)):
    """List all jobs."""
    return {"jobs": list_jobs()}


@app.get("/api/jobs/{job_id}")
async def get_job(job_id: str, user: str = Depends(check_auth)):
    """Return one job, or HTTP 404 if it does not exist."""
    job = load_job(job_id)
    if not job:
        raise HTTPException(404, "Ne obstaja")
    return job


@app.get("/api/stream/{job_id}")
async def stream_job(job_id: str, user: str = Depends(check_auth)):
    """Server-Sent Events stream with real-time job status.

    Emits the full job whenever status or current_step changes, polling once
    per second for at most 10 minutes, and ends on a terminal status.
    """
    # "error" is written by the startup resume hook; without it here the
    # stream would keep polling such a job until the 10-minute limit.
    terminal_statuses = ("done", "failed", "error")

    async def gen():
        last_status = None
        last_step = None
        for _ in range(600):  # max 10 min stream
            job = load_job(job_id)
            if not job:
                yield f"data: {json.dumps({'error': 'not found'})}\n\n"
                return
            current_status = job.get("status")
            current_step = job.get("current_step")
            if current_status != last_status or current_step != last_step:
                yield f"data: {json.dumps(job, ensure_ascii=False)}\n\n"
                last_status = current_status
                last_step = current_step
            if current_status in terminal_statuses:
                return
            await asyncio.sleep(1)

    return StreamingResponse(gen(), media_type="text/event-stream")


# ────────────────────────────────────────────────────────────────
# Download / preview
# ────────────────────────────────────────────────────────────────

@app.get("/api/download/{job_id}")
async def download(job_id: str, user: str = Depends(check_auth)):
    """Download the finished reel; HTTP 404 if the job is not done."""
    job = load_job(job_id)
    if not job or job.get("status") != "done":
        raise HTTPException(404, "Ne pripravljen")
    out = Path(job["output_path"])
    if not out.exists():
        raise HTTPException(404, "Output ne obstaja")
    return FileResponse(
        out,
        media_type="video/mp4",
        filename=f"reel_{job_id}.mp4",
    )


@app.get("/api/preview/{job_id}")
async def preview(job_id: str, request: Request, user: str = Depends(check_auth)):
    """Video preview with Range request support (needed by the HTML5 player).

    A single byte range ("bytes=a-b", "bytes=a-" or the suffix form
    "bytes=-n") is answered with 206; an unsatisfiable range with 416. A
    header that cannot be parsed falls back to sending the whole file.
    """
    job = load_job(job_id)
    if not job or job.get("status") != "done":
        raise HTTPException(404, "Ne pripravljen")
    out = Path(job["output_path"])
    if not out.exists():
        raise HTTPException(404, "Output ne obstaja")

    file_size = out.stat().st_size
    range_header = request.headers.get("range") or request.headers.get("Range")

    if range_header:
        # Parse "bytes=START-END"
        try:
            range_str = range_header.replace("bytes=", "").strip()
            start_s, end_s = range_str.split("-")
            if not start_s and end_s:
                # Suffix range "bytes=-N": the LAST N bytes of the file.
                suffix_len = int(end_s)
                if suffix_len <= 0:
                    return Response(status_code=416)  # Range Not Satisfiable
                start = max(file_size - suffix_len, 0)
                end = file_size - 1
            else:
                start = int(start_s) if start_s else 0
                end = int(end_s) if end_s else file_size - 1
                end = min(end, file_size - 1)

            if start > end or start >= file_size:
                return Response(status_code=416)  # Range Not Satisfiable

            chunk_size = end - start + 1

            def iter_file():
                with open(out, "rb") as f:
                    f.seek(start)
                    remaining = chunk_size
                    while remaining > 0:
                        read_size = min(64 * 1024, remaining)
                        data = f.read(read_size)
                        if not data:
                            break
                        remaining -= len(data)
                        yield data

            headers = {
                "Content-Range": f"bytes {start}-{end}/{file_size}",
                "Accept-Ranges": "bytes",
                "Content-Length": str(chunk_size),
                "Content-Type": "video/mp4",
            }
            return StreamingResponse(
                iter_file(), status_code=206, headers=headers, media_type="video/mp4"
            )
        except (ValueError, IndexError):
            # Malformed or multi-part range: ignore it and send the full file.
            pass

    # No Range — return the whole file.
    return FileResponse(
        out,
        media_type="video/mp4",
        headers={"Accept-Ranges": "bytes", "Content-Length": str(file_size)},
    )


@app.delete("/api/jobs/{job_id}")
async def delete_job(job_id: str, user: str = Depends(check_auth)):
    """Delete a job's state file and every file the pipeline created for it."""
    job = load_job(job_id)
    if not job:
        raise HTTPException(404, "Ne obstaja")

    # Paths recorded on the job plus the files the pipeline derives from the
    # job ID; the latter also exist for failed jobs that never stored a path.
    candidates = [
        job.get(key) for key in ("input_path", "output_path", "claude_srt_path")
    ]
    candidates += [
        UPLOAD_DIR / f"{job_id}_yt.mp4",
        OUTPUT_DIR / f"{job_id}.mp4",
        OUTPUT_DIR / f"{job_id}.analysis.json",
        OUTPUT_DIR / f"{job_id}.subtitles.srt",
    ]
    for p in candidates:
        if p and Path(p).is_file():
            Path(p).unlink(missing_ok=True)

    job_path(job_id).unlink(missing_ok=True)
    return {"deleted": job_id}