""" reels.biba.live — FastAPI backend. Endpoints: GET / — frontend HTML POST /api/upload — naloži video file POST /api/youtube — submit YouTube URL POST /api/process/{id} — start processing job GET /api/jobs — list vseh jobov GET /api/jobs/{id} — status job-a GET /api/stream/{id} — SSE progress stream GET /api/download/{id} — download finalni reel GET /api/preview/{id} — preview video stream DELETE /api/jobs/{id} — pobriši job + datoteke """ import asyncio import json import os import secrets import shutil import subprocess import time import uuid from pathlib import Path from typing import Optional from fastapi import ( FastAPI, UploadFile, File, Form, HTTPException, Depends, BackgroundTasks, Request, status ) from fastapi.responses import ( FileResponse, HTMLResponse, StreamingResponse, JSONResponse ) from fastapi.staticfiles import StaticFiles from fastapi.security import HTTPBasic, HTTPBasicCredentials from pydantic import BaseModel # ──────────────────────────────────────────────────────────────── # Config # ──────────────────────────────────────────────────────────────── DATA_DIR = Path(os.environ.get("DATA_DIR", "/data")) UPLOAD_DIR = DATA_DIR / "uploads" OUTPUT_DIR = DATA_DIR / "outputs" JOBS_DIR = DATA_DIR / "jobs" SCRIPTS_DIR = Path(__file__).parent.parent / "scripts" UPLOAD_DIR.mkdir(parents=True, exist_ok=True) OUTPUT_DIR.mkdir(parents=True, exist_ok=True) JOBS_DIR.mkdir(parents=True, exist_ok=True) AUTH_USER = os.environ.get("AUTH_USER", "sebastjan") AUTH_PASS = os.environ.get("AUTH_PASS", "change-me-in-coolify-env") MAX_UPLOAD_MB = int(os.environ.get("MAX_UPLOAD_MB", "2000")) # ──────────────────────────────────────────────────────────────── # Auth # ──────────────────────────────────────────────────────────────── security = HTTPBasic() def check_auth(creds: HTTPBasicCredentials = Depends(security)): correct_user = secrets.compare_digest(creds.username, AUTH_USER) correct_pass = secrets.compare_digest(creds.password, AUTH_PASS) if not (correct_user and correct_pass): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Napačno geslo", headers={"WWW-Authenticate": "Basic"}, ) return creds.username # ──────────────────────────────────────────────────────────────── # Job state (filesystem-based, persistent prek restartov) # ──────────────────────────────────────────────────────────────── def job_path(job_id): return JOBS_DIR / f"{job_id}.json" def load_job(job_id): p = job_path(job_id) if not p.exists(): return None return json.loads(p.read_text()) def save_job(job): job_path(job["id"]).write_text(json.dumps(job, ensure_ascii=False, indent=2)) def update_job(job_id, **kwargs): job = load_job(job_id) if not job: return None job.update(kwargs) job["updated_at"] = time.time() save_job(job) return job def list_jobs(): out = [] for f in sorted(JOBS_DIR.glob("*.json"), reverse=True): try: out.append(json.loads(f.read_text())) except Exception: pass return out # ──────────────────────────────────────────────────────────────── # Pipeline runner (background task) # ──────────────────────────────────────────────────────────────── def run_subprocess_logged(cmd, job_id, step_name): """Pokliče subprocess, logi gredo v job.""" update_job(job_id, current_step=step_name, status="processing") proc = subprocess.run(cmd, capture_output=True, text=True) if proc.returncode != 0: # Combine stdout + stderr za diagnostiko err_msg = (proc.stderr or "") + "\n" + (proc.stdout or "") update_job( job_id, status="failed", error=f"{step_name}: {err_msg[-800:].strip()}", ) return False # Tudi pri success-u beleži stderr za diagnostiko (samo zadnji del) if proc.stderr and proc.stderr.strip(): update_job(job_id, last_step_log=proc.stderr[-500:].strip()) return True def process_job(job_id): """Glavni pipeline: download (če YT) → find_chorus (če auto) → reframe → subs.""" job = load_job(job_id) if not job: return try: # ── 1. Source preparation ───────────────────────────── if job["source_type"] == "youtube": update_job(job_id, status="downloading", current_step="YouTube download") input_path = UPLOAD_DIR / f"{job_id}_yt.mp4" cmd = [ "python3", str(SCRIPTS_DIR / "yt_download.py"), job["youtube_url"], str(input_path), ] if not run_subprocess_logged(cmd, job_id, "YouTube download"): return update_job(job_id, input_path=str(input_path)) else: input_path = Path(job["input_path"]) # ── 2. Smart analysis (če auto_chorus) ────────────────────────── if job.get("auto_chorus"): update_job(job_id, current_step="Analiza pesmi (transkript + energija)") analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json" cmd = [ "python3", str(SCRIPTS_DIR / "analyze.py"), str(input_path), "--target-duration", str(job.get("duration", 30)), "--max-duration", str(job.get("max_duration", 45)), "--min-duration", str(job.get("min_duration", 20)), "--output", str(analysis_path), ] if job.get("include_prebuild"): cmd += ["--include-prebuild"] # lang: če None ali 'auto', pusti analyze.py auto-detect if job.get("lang") and job["lang"] not in ("auto", ""): cmd += ["--lang", job["lang"]] cmd += ["--model", job.get("whisper_model", "small")] proc = subprocess.run(cmd, capture_output=True, text=True) if proc.returncode == 0 and analysis_path.exists(): try: with open(analysis_path, "r", encoding="utf-8") as f: analysis = json.load(f) cr = analysis["clip_range"] fade = analysis["fade"] update_job( job_id, analysis_summary={ "language": analysis["language"], "language_probability": analysis["language_probability"], "instrumental": analysis["instrumental"], "clip_range": cr, "fade": fade, "chorus_preview": analysis["chorus"]["best"]["text_preview"] if analysis.get("chorus") and analysis["chorus"].get("best") else None, "video_duration": analysis.get("video_duration"), "candidates": analysis["chorus"].get("all_candidates", [])[:5] if analysis.get("chorus") else [], }, # Cel transkript shranimo za UI prikaz full_transcript=[ {"start": s["start"], "end": s["end"], "text": s["text"]} for s in analysis.get("transcript", {}).get("segments", []) ], start=cr["start"], duration=cr["duration"], fade_in=fade["fade_in"], fade_out=fade["fade_out"], detected_language=analysis["language"], is_instrumental=analysis["instrumental"], ) # Auto-disable subs za instrumental if analysis["instrumental"] and not job.get("no_subs"): update_job(job_id, no_subs=True, auto_disabled_subs=True) # Reload local dict job = load_job(job_id) except (json.JSONDecodeError, KeyError) as e: update_job(job_id, chorus_error=f"Analysis parse: {e}") else: update_job(job_id, chorus_error=(proc.stderr or "")[-500:]) # ── 3. Reframe + subtitles (clip.py orchestrator) ───── output_path = OUTPUT_DIR / f"{job_id}.mp4" update_job(job_id, current_step="Reframe + subtitles") cmd = [ "python3", str(SCRIPTS_DIR / "clip.py"), str(input_path), str(output_path), "--mode", job.get("mode", "track"), "--quality", job.get("quality", "medium"), "--style", job.get("subtitle_style", "reels"), ] if job.get("start") is not None: cmd += ["--start", str(job["start"])] if job.get("duration") is not None: cmd += ["--duration", str(job["duration"])] if job.get("fade_in", 0) > 0: cmd += ["--fade-in", str(job["fade_in"])] if job.get("fade_out", 0) > 0: cmd += ["--fade-out", str(job["fade_out"])] # lang: prefer detected_language če auto chosen_lang = job.get("lang") if chosen_lang in (None, "auto", ""): chosen_lang = job.get("detected_language") if chosen_lang: cmd += ["--lang", chosen_lang] if job.get("no_subs"): cmd += ["--no-subs"] cmd += ["--model", job.get("whisper_model", "small")] # DEBUG: zapiši natanko kakšen ukaz se izvede update_job(job_id, debug_clip_cmd=" ".join(cmd)) if not run_subprocess_logged(cmd, job_id, "Reframe + subtitles"): return # ── Done ────────────────────────────────────────────── if output_path.exists(): update_job( job_id, status="done", current_step="Končano", output_path=str(output_path), output_size_mb=round(output_path.stat().st_size / 1024 / 1024, 2), ) else: update_job( job_id, status="failed", error="Output datoteka ne obstaja po obdelavi", ) except Exception as e: update_job(job_id, status="failed", error=str(e)) # ──────────────────────────────────────────────────────────────── # FastAPI app # ──────────────────────────────────────────────────────────────── app = FastAPI(title="Reels Clipper") app.mount("/static", StaticFiles(directory=Path(__file__).parent.parent / "static"), name="static") @app.get("/", response_class=HTMLResponse) async def index(user: str = Depends(check_auth)): html = (Path(__file__).parent.parent / "templates" / "index.html").read_text() return html @app.get("/healthz") async def healthz(): return {"ok": True} # ──────────────────────────────────────────────────────────────── # Job models # ──────────────────────────────────────────────────────────────── class YouTubeJobIn(BaseModel): url: str mode: str = "track" lang: Optional[str] = None auto_chorus: bool = True start: Optional[float] = None duration: Optional[float] = 30 no_subs: bool = False subtitle_style: str = "reels" whisper_model: str = "small" quality: str = "medium" class StartJobIn(BaseModel): job_id: str mode: str = "track" lang: Optional[str] = None # None/auto = Whisper auto-detect auto_chorus: bool = True include_prebuild: bool = False # vključi pre-chorus build-up start: Optional[float] = None duration: Optional[float] = 30 max_duration: Optional[float] = 45 min_duration: Optional[float] = 20 no_subs: bool = False subtitle_style: str = "reels" whisper_model: str = "small" quality: str = "medium" # ──────────────────────────────────────────────────────────────── # Upload (file) # ──────────────────────────────────────────────────────────────── @app.post("/api/upload") async def upload_video( file: UploadFile = File(...), user: str = Depends(check_auth), ): if not file.filename: raise HTTPException(400, "Brez imena") job_id = uuid.uuid4().hex[:12] ext = Path(file.filename).suffix or ".mp4" input_path = UPLOAD_DIR / f"{job_id}{ext}" size = 0 with input_path.open("wb") as f: while chunk := await file.read(1024 * 1024): size += len(chunk) if size > MAX_UPLOAD_MB * 1024 * 1024: f.close() input_path.unlink(missing_ok=True) raise HTTPException(413, f"Prevelika datoteka (limit {MAX_UPLOAD_MB} MB)") f.write(chunk) job = { "id": job_id, "source_type": "upload", "filename": file.filename, "input_path": str(input_path), "size_mb": round(size / 1024 / 1024, 2), "status": "uploaded", "current_step": "Naloženo, čaka na obdelavo", "created_at": time.time(), "updated_at": time.time(), } save_job(job) return job # ──────────────────────────────────────────────────────────────── # YouTube submit # ──────────────────────────────────────────────────────────────── @app.post("/api/youtube") async def submit_youtube( payload: YouTubeJobIn, background: BackgroundTasks, user: str = Depends(check_auth), ): job_id = uuid.uuid4().hex[:12] job = { "id": job_id, "source_type": "youtube", "youtube_url": payload.url, "status": "queued", "current_step": "V vrsti za YouTube prenos", "created_at": time.time(), "updated_at": time.time(), "mode": payload.mode, "lang": payload.lang, "auto_chorus": payload.auto_chorus, "start": payload.start, "duration": payload.duration, "no_subs": payload.no_subs, "subtitle_style": payload.subtitle_style, "whisper_model": payload.whisper_model, "quality": payload.quality, } save_job(job) background.add_task(process_job, job_id) return job # ──────────────────────────────────────────────────────────────── # Start processing for uploaded job # ──────────────────────────────────────────────────────────────── @app.post("/api/process") async def start_processing( payload: StartJobIn, background: BackgroundTasks, user: str = Depends(check_auth), ): job = load_job(payload.job_id) if not job: raise HTTPException(404, "Job ne obstaja") update_job( payload.job_id, status="queued", mode=payload.mode, lang=payload.lang, auto_chorus=payload.auto_chorus, include_prebuild=payload.include_prebuild, start=payload.start, duration=payload.duration, max_duration=payload.max_duration, min_duration=payload.min_duration, no_subs=payload.no_subs, subtitle_style=payload.subtitle_style, whisper_model=payload.whisper_model, quality=payload.quality, current_step="V vrsti za obdelavo", ) background.add_task(process_job, payload.job_id) return load_job(payload.job_id) # ──────────────────────────────────────────────────────────────── # Job queries # ──────────────────────────────────────────────────────────────── @app.get("/api/jobs") async def get_jobs(user: str = Depends(check_auth)): return {"jobs": list_jobs()} @app.get("/api/jobs/{job_id}") async def get_job(job_id: str, user: str = Depends(check_auth)): job = load_job(job_id) if not job: raise HTTPException(404, "Ne obstaja") return job @app.get("/api/stream/{job_id}") async def stream_job(job_id: str, user: str = Depends(check_auth)): """Server-Sent Events za real-time status.""" async def gen(): last_status = None last_step = None for _ in range(600): # max 10 min stream job = load_job(job_id) if not job: yield f"data: {json.dumps({'error': 'not found'})}\n\n" return if job["status"] != last_status or job.get("current_step") != last_step: yield f"data: {json.dumps(job, ensure_ascii=False)}\n\n" last_status = job["status"] last_step = job.get("current_step") if job["status"] in ("done", "failed"): return await asyncio.sleep(1) return StreamingResponse(gen(), media_type="text/event-stream") # ──────────────────────────────────────────────────────────────── # Download / preview # ──────────────────────────────────────────────────────────────── @app.get("/api/download/{job_id}") async def download(job_id: str, user: str = Depends(check_auth)): job = load_job(job_id) if not job or job.get("status") != "done": raise HTTPException(404, "Ne pripravljen") out = Path(job["output_path"]) if not out.exists(): raise HTTPException(404, "Output ne obstaja") return FileResponse( out, media_type="video/mp4", filename=f"reel_{job_id}.mp4", ) @app.get("/api/preview/{job_id}") async def preview(job_id: str, user: str = Depends(check_auth)): job = load_job(job_id) if not job or job.get("status") != "done": raise HTTPException(404, "Ne pripravljen") out = Path(job["output_path"]) if not out.exists(): raise HTTPException(404, "Output ne obstaja") return FileResponse(out, media_type="video/mp4") @app.delete("/api/jobs/{job_id}") async def delete_job(job_id: str, user: str = Depends(check_auth)): job = load_job(job_id) if not job: raise HTTPException(404, "Ne obstaja") for key in ("input_path", "output_path"): p = job.get(key) if p and Path(p).exists(): Path(p).unlink(missing_ok=True) job_path(job_id).unlink(missing_ok=True) return {"deleted": job_id}