reels-app/app/main.py
Sebastjan Artič 32baf9cd45 Auto-resume: cleanup stuck jobs on container startup + GEMINI_API_KEY env
- @app.on_event(startup) marks all status=processing jobs as error after restart
- Process endpoint now clears chorus_error/interrupted_at on retry (retry-friendly)
- GEMINI_API_KEY added to Coolify env (Gemini 3.1 Pro now active)
- User can now choose Gemini in UI dropdown for analysis
2026-04-29 08:43:31 +00:00

641 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
reels.biba.live — FastAPI backend.
Endpoints:
GET / — frontend HTML
POST /api/upload — naloži video file
POST /api/youtube — submit YouTube URL
POST /api/process/{id} — start processing job
GET /api/jobs — list vseh jobov
GET /api/jobs/{id} — status job-a
GET /api/stream/{id} — SSE progress stream
GET /api/download/{id} — download finalni reel
GET /api/preview/{id} — preview video stream
DELETE /api/jobs/{id} — pobriši job + datoteke
"""
import asyncio
import json
import os
import secrets
import shutil
import subprocess
import time
import uuid
from pathlib import Path
from typing import Optional
from fastapi import (
FastAPI, UploadFile, File, Form, HTTPException, Depends,
BackgroundTasks, Request, status
)
from fastapi.responses import (
FileResponse, HTMLResponse, StreamingResponse, JSONResponse
)
from fastapi.staticfiles import StaticFiles
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from pydantic import BaseModel
# ────────────────────────────────────────────────────────────────
# Config
# ────────────────────────────────────────────────────────────────
# All persistent state (uploads, rendered reels, job JSON files) lives under
# DATA_DIR, which can be overridden through the environment.
DATA_DIR = Path(os.getenv("DATA_DIR", "/data"))
UPLOAD_DIR = DATA_DIR.joinpath("uploads")
OUTPUT_DIR = DATA_DIR.joinpath("outputs")
JOBS_DIR = DATA_DIR.joinpath("jobs")
# Pipeline helper scripts live in "scripts/", one level above this module's directory.
SCRIPTS_DIR = Path(__file__).parent.parent.joinpath("scripts")
for _data_dir in (UPLOAD_DIR, OUTPUT_DIR, JOBS_DIR):
    _data_dir.mkdir(parents=True, exist_ok=True)
del _data_dir

# HTTP Basic credentials. The defaults are placeholders that are meant to be
# overridden through environment variables (Coolify).
AUTH_USER = os.getenv("AUTH_USER", "sebastjan")
AUTH_PASS = os.getenv("AUTH_PASS", "change-me-in-coolify-env")
# Maximum size of a single upload, in megabytes.
MAX_UPLOAD_MB = int(os.getenv("MAX_UPLOAD_MB", "2000"))
# ────────────────────────────────────────────────────────────────
# Auth
# ────────────────────────────────────────────────────────────────
security = HTTPBasic()


def check_auth(creds: HTTPBasicCredentials = Depends(security)):
    """Validate HTTP Basic credentials against AUTH_USER / AUTH_PASS.

    Both comparisons always run and use ``secrets.compare_digest`` to reduce
    timing side channels. The values are compared as UTF-8 *bytes*:
    ``compare_digest`` raises ``TypeError`` for ``str`` arguments that contain
    non-ASCII characters. Without the encoding, a username or password with
    e.g. "č" produced a 500 error instead of a normal 401/200.

    Returns:
        The authenticated username.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Basic`` challenge when
            the credentials do not match.
    """
    correct_user = secrets.compare_digest(
        creds.username.encode("utf-8"), AUTH_USER.encode("utf-8")
    )
    correct_pass = secrets.compare_digest(
        creds.password.encode("utf-8"), AUTH_PASS.encode("utf-8")
    )
    if not (correct_user and correct_pass):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Napačno geslo",
            headers={"WWW-Authenticate": "Basic"},
        )
    return creds.username
# ────────────────────────────────────────────────────────────────
# Job state (filesystem-based, persistent across restarts)
# ────────────────────────────────────────────────────────────────
# Characters allowed in a job id. Generated ids are uuid4().hex[:12]; the set
# only has to exclude path syntax such as "/" and "..".
_JOB_ID_CHARS = frozenset(
    "abcdefghijklmnopqrstuvwxyz"
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "0123456789-_"
)


def job_path(job_id):
    """Return the JSON state file for *job_id* inside JOBS_DIR.

    ``job_id`` can come straight from a request body (``/api/process``).
    Without validation, an id like ``"../uploads/x"`` would make the job
    store read and write JSON files outside JOBS_DIR.

    Raises:
        ValueError: if *job_id* is not a non-empty string of letters,
            digits, ``-`` or ``_``.
    """
    if not isinstance(job_id, str) or not job_id or not set(job_id) <= _JOB_ID_CHARS:
        raise ValueError(f"Invalid job id: {job_id!r}")
    return JOBS_DIR / f"{job_id}.json"


def load_job(job_id):
    """Return the stored job dict, or ``None`` if there is no such job.

    Ids rejected by ``job_path`` are treated as "no such job", so HTTP
    handlers answer 404 instead of failing with a 500.
    """
    try:
        path = job_path(job_id)
    except ValueError:
        return None
    if not path.exists():
        return None
    return json.loads(path.read_text(encoding="utf-8"))
def save_job(job):
    """Persist *job* (must contain ``"id"``) to its JSON state file atomically.

    The JSON is first written to a uniquely named temp file next to the
    target and then moved into place with ``os.replace``, which is atomic
    within one filesystem on POSIX. Readers that poll the file, such as the
    SSE stream that re-reads it every second while the background pipeline
    updates it, therefore never see a truncated or half-written document.

    The temp name ends in ``.tmp``, so ``*.json`` globs over JOBS_DIR do not
    pick it up. If anything fails, the temp file is removed and the error
    re-raised.
    """
    target = job_path(job["id"])
    tmp = target.with_name(f"{target.name}.{uuid.uuid4().hex}.tmp")
    try:
        tmp.write_text(json.dumps(job, ensure_ascii=False, indent=2), encoding="utf-8")
        os.replace(tmp, target)
    except BaseException:
        tmp.unlink(missing_ok=True)
        raise
def update_job(job_id, **kwargs):
    """Merge *kwargs* into the stored job, refresh ``updated_at`` and save it.

    Returns the updated job dict, or ``None`` (writing nothing) when the job
    cannot be loaded. ``updated_at`` always ends up as the current time, even
    if it is passed in *kwargs*.
    """
    current = load_job(job_id)
    if not current:
        return None
    # Keyword arguments to dict.update are applied after the positional mapping,
    # so the fresh timestamp wins over any "updated_at" in kwargs.
    current.update(kwargs, updated_at=time.time())
    save_job(current)
    return current
def _job_sort_key(job):
    """Sort key for jobs: (created_at, id); a non-numeric created_at counts as 0."""
    created = job.get("created_at")
    if not isinstance(created, (int, float)):
        created = 0
    return (created, str(job.get("id", "")))


def list_jobs():
    """Return every readable job dict, newest first.

    Jobs are ordered by ``created_at``, descending. Job ids are random hex
    (``uuid4().hex[:12]``), so sorting by file name did not produce a
    meaningful order.

    Files that cannot be read or parsed, or that do not hold a JSON object,
    are skipped: one bad file must not break the whole listing.
    """
    jobs = []
    for path in JOBS_DIR.glob("*.json"):
        try:
            job = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # OSError: file vanished/unreadable between glob and read.
            # ValueError: covers JSONDecodeError and UnicodeDecodeError.
            continue
        if isinstance(job, dict):
            jobs.append(job)
    jobs.sort(key=_job_sort_key, reverse=True)
    return jobs
def _srt_timestamp(seconds):
    """Format *seconds* as an SRT timestamp ``HH:MM:SS,mmm``.

    The value is rounded to whole milliseconds *before* it is split into
    fields. Formatting the seconds field with ``:06.3f`` could round e.g.
    59.9996 up to ``60.000`` and emit the invalid ``00:00:60,000``.
    """
    total_ms = max(0, int(round(seconds * 1000)))
    hours, rem = divmod(total_ms, 3_600_000)
    minutes, rem = divmod(rem, 60_000)
    secs, millis = divmod(rem, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"


def _split_evenly(words, n_parts):
    """Split *words* into *n_parts* contiguous runs whose sizes differ by at most one.

    Requires ``1 <= n_parts <= len(words)``, which guarantees that no run is empty.
    """
    count = len(words)
    bounds = [i * count // n_parts for i in range(n_parts + 1)]
    return [words[lo:hi] for lo, hi in zip(bounds, bounds[1:])]


def generate_srt_from_segments(segments, clip_start, clip_end, output_path, *,
                               max_chunk_duration=2.5, min_cue_duration=0.2):
    """Write an SRT file for the parts of *segments* inside [clip_start, clip_end].

    Processing rules:

    * Timestamps are re-based to 0, because the trimmed clip starts at *clip_start*.
    * A segment longer than *max_chunk_duration* seconds is split into
      equal-length cues for fast, reels-style pacing. Its words are spread
      evenly across those cues.
    * Every cue receives at least one word. When a segment has fewer words
      than time slots, it is split into fewer, longer cues. The previous
      implementation instead repeated the whole text in the surplus slots.
    * All subtitle text is upper-cased.

    Args:
        segments: iterable of mappings with ``start``/``end`` (seconds in the
            source video) and ``text``.
        clip_start: clip window start, in source-video seconds.
        clip_end: clip window end, in source-video seconds.
        output_path: destination file (written as UTF-8).
        max_chunk_duration: longest cue, in seconds, before a segment is split.
        min_cue_duration: segments shorter than this after clipping are dropped.

    Returns:
        *output_path*, unchanged.
    """
    cues = []  # (relative start, relative end, text)
    for seg in segments:
        raw_text = seg.get("text")
        text = "" if raw_text is None else str(raw_text).strip()
        seg_start = float(seg["start"])
        seg_end = float(seg["end"])
        if seg_end <= clip_start or seg_start >= clip_end:
            continue  # entirely outside the clip window
        seg_start = max(seg_start, clip_start)
        seg_end = min(seg_end, clip_end)
        if seg_end - seg_start < min_cue_duration or not text:
            continue
        rel_start = seg_start - clip_start
        rel_end = seg_end - clip_start
        upper = text.upper()
        duration = rel_end - rel_start
        if duration <= max_chunk_duration:
            cues.append((rel_start, rel_end, upper))
            continue
        # Equal-time chunks, but never more chunks than words.
        words = upper.split()
        n_parts = min(int(duration / max_chunk_duration) + 1, len(words))
        chunk_dur = duration / n_parts
        for i, chunk in enumerate(_split_evenly(words, n_parts)):
            cues.append((
                rel_start + i * chunk_dur,
                rel_start + (i + 1) * chunk_dur,
                " ".join(chunk),
            ))

    blocks = [
        f"{idx}\n{_srt_timestamp(start)} --> {_srt_timestamp(end)}\n{body}\n"
        for idx, (start, end, body) in enumerate(cues, start=1)
    ]
    with open(output_path, "w", encoding="utf-8") as f:
        f.write("\n".join(blocks))
    return output_path
# ────────────────────────────────────────────────────────────────
# Pipeline runner (background task)
# ────────────────────────────────────────────────────────────────
def _failure_detail(stderr, stdout, limit=800):
    """Build a diagnostic excerpt of at most *limit* characters.

    The stderr tail gets priority, since that is where Python tracebacks are
    written; the stdout tail fills whatever room is left. Previously the code
    took the tail of ``stderr + stdout``, which let a chatty stdout push the
    actual error out of the window.
    """
    err = (stderr or "").strip()
    out = (stdout or "").strip()
    if not err:
        return out[-limit:]
    detail = err[-limit:]
    room = limit - len(detail) - 1  # reserve one char for the separating newline
    if out and room > 0:
        detail += "\n" + out[-room:]
    return detail


def run_subprocess_logged(cmd, job_id, step_name):
    """Run *cmd* as step *step_name* of job *job_id* and record the outcome on the job.

    Behaviour:

    * Before running, sets the job to ``processing`` with
      ``current_step=step_name``.
    * On a non-zero exit code, marks the job ``failed`` with a diagnostic
      excerpt in ``error`` and returns ``False``.
    * On success, stores the stderr tail (if any) in ``last_step_log`` and
      returns ``True``.
    """
    update_job(job_id, current_step=step_name, status="processing")
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        update_job(
            job_id,
            status="failed",
            error=f"{step_name}: {_failure_detail(proc.stderr, proc.stdout)}",
        )
        return False
    # Keep the stderr tail (warnings/progress) on success too, for diagnostics.
    if proc.stderr and proc.stderr.strip():
        update_job(job_id, last_step_log=proc.stderr[-500:].strip())
    return True
def _or_default(value, default):
    """Return *value* unless it is ``None``, in which case return *default*."""
    return default if value is None else value


def _prepare_source(job_id, job):
    """Return the local input video path, downloading it first for YouTube jobs.

    Returns ``None`` when the download failed. In that case
    ``run_subprocess_logged`` has already marked the job as failed.
    """
    if job["source_type"] != "youtube":
        return Path(job["input_path"])
    update_job(job_id, status="downloading", current_step="YouTube download")
    input_path = UPLOAD_DIR / f"{job_id}_yt.mp4"
    cmd = [
        "python3", str(SCRIPTS_DIR / "yt_download.py"),
        job["youtube_url"], str(input_path),
    ]
    if not run_subprocess_logged(cmd, job_id, "YouTube download"):
        return None
    update_job(job_id, input_path=str(input_path))
    return input_path


def _run_analysis(job_id, job, input_path):
    """Run analyze.py and store its results (clip range, fades, transcript) on the job.

    The status is set to ``processing`` here: this step can run for a long
    time, and a job left in ``queued`` would not look in-progress to anything
    that inspects the status.

    Analysis failures are non-fatal. They are recorded in ``chorus_error``,
    and the pipeline continues with the job's existing settings.

    Returns:
        ``(job, srt_path)``: the reloaded job dict (the original dict if the
        analysis failed), and the SRT generated in *this* run from the
        analysis transcript, or ``None``.
    """
    update_job(
        job_id,
        status="processing",
        current_step="Analiza pesmi (transkript + energija)",
    )
    analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json"
    cmd = [
        "python3", str(SCRIPTS_DIR / "analyze.py"),
        str(input_path),
        # A stored None must not reach analyze.py as the literal string "None".
        "--target-duration", str(_or_default(job.get("duration"), 30)),
        "--max-duration", str(_or_default(job.get("max_duration"), 45)),
        "--min-duration", str(_or_default(job.get("min_duration"), 20)),
        "--output", str(analysis_path),
    ]
    if job.get("include_prebuild"):
        cmd += ["--include-prebuild"]
    # LLM provider (claude/gemini/auto) and an optional specific model.
    if job.get("llm_provider"):
        cmd += ["--llm-provider", job["llm_provider"]]
    if job.get("llm_model"):
        cmd += ["--llm-model", job["llm_model"]]
    # lang: when None/"auto"/"" the flag is omitted so analyze.py auto-detects.
    if job.get("lang") and job["lang"] not in ("auto", ""):
        cmd += ["--lang", job["lang"]]
    cmd += ["--model", job.get("whisper_model", "large-v3")]

    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0 or not analysis_path.exists():
        update_job(job_id, chorus_error=(proc.stderr or "")[-500:])
        return job, None

    srt_from_claude = None  # SRT built from the (possibly LLM-corrected) transcript
    try:
        with open(analysis_path, "r", encoding="utf-8") as f:
            analysis = json.load(f)
        cr = analysis["clip_range"]
        fade = analysis["fade"]
        segments = analysis.get("transcript", {}).get("segments") or []

        # SRT trimmed to clip_range. The transcript may have been corrected
        # by the LLM, so it is preferred over re-transcribing.
        if segments:
            srt_path_out = OUTPUT_DIR / f"{job_id}.subtitles.srt"
            try:
                generate_srt_from_segments(segments, cr["start"], cr["end"], srt_path_out)
                srt_from_claude = str(srt_path_out)
                print(f"📝 Generated SRT from Claude transcript: {srt_path_out}")
            except Exception as e:
                # Best effort: the clip step then simply runs without --srt.
                print(f"⚠️ SRT generation failed: {e}")

        chorus = analysis.get("chorus")
        update_job(
            job_id,
            analysis_summary={
                "language": analysis["language"],
                "language_probability": analysis["language_probability"],
                "instrumental": analysis["instrumental"],
                "clip_range": cr,
                "fade": fade,
                "chorus_preview": chorus["best"]["text_preview"]
                if chorus and chorus.get("best") else None,
                "video_duration": analysis.get("video_duration"),
                "candidates": chorus.get("all_candidates", [])[:5] if chorus else [],
                "claude_corrected_text": analysis.get("claude_corrected_text", False),
            },
            # The full transcript is stored for display in the UI.
            full_transcript=[
                {"start": s["start"], "end": s["end"], "text": s["text"]}
                for s in segments
            ],
            start=cr["start"],
            duration=cr["duration"],
            fade_in=fade["fade_in"],
            fade_out=fade["fade_out"],
            detected_language=analysis["language"],
            is_instrumental=analysis["instrumental"],
            claude_srt_path=srt_from_claude,
        )
        # Instrumental track: there is nothing to subtitle.
        if analysis["instrumental"] and not job.get("no_subs"):
            update_job(job_id, no_subs=True, auto_disabled_subs=True)
        job = load_job(job_id) or job
    except (ValueError, KeyError, TypeError, AttributeError) as e:
        # Malformed analysis output (ValueError covers JSONDecodeError) is
        # non-fatal; continue with the job's current settings.
        update_job(job_id, chorus_error=f"Analysis parse: {e}")
        return job, None
    return job, srt_from_claude


def _build_clip_cmd(job, input_path, output_path, srt_path):
    """Build the clip.py command line (reframe + subtitles) from the job settings."""
    cmd = [
        "python3", str(SCRIPTS_DIR / "clip.py"),
        str(input_path), str(output_path),
        "--mode", job.get("mode", "track"),
        "--quality", job.get("quality", "medium"),
        "--style", job.get("subtitle_style", "reels"),
    ]
    if job.get("start") is not None:
        cmd += ["--start", str(job["start"])]
    if job.get("duration") is not None:
        cmd += ["--duration", str(job["duration"])]
    # `or 0` also covers fades explicitly stored as None.
    fade_in = job.get("fade_in") or 0
    fade_out = job.get("fade_out") or 0
    if fade_in > 0:
        cmd += ["--fade-in", str(fade_in)]
    if fade_out > 0:
        cmd += ["--fade-out", str(fade_out)]
    # SRT from the analysis (better text), passed straight through to clip.py.
    if srt_path and Path(srt_path).exists() and not job.get("no_subs"):
        cmd += ["--srt", srt_path]
    # lang: when None/"auto"/"", prefer the language detected by the analysis.
    chosen_lang = job.get("lang")
    if chosen_lang in (None, "auto", ""):
        chosen_lang = job.get("detected_language")
    if chosen_lang:
        cmd += ["--lang", chosen_lang]
    if job.get("no_subs"):
        cmd += ["--no-subs"]
    cmd += ["--model", job.get("whisper_model", "large-v3")]
    return cmd


def process_job(job_id):
    """Run the whole pipeline for one job; intended to run as a background task.

    Steps:

    1. Download the source (YouTube jobs only).
    2. Analyse the song, when ``auto_chorus`` is set.
    3. Reframe and add subtitles via ``clip.py``.

    Progress and results are written to the job's JSON file. Any unexpected
    exception marks the job ``failed`` with the exception text.
    """
    job = load_job(job_id)
    if not job:
        return
    try:
        # ── 1. Source preparation ─────────────────────────────
        input_path = _prepare_source(job_id, job)
        if input_path is None:
            return

        # ── 2. Smart analysis (if auto_chorus) ────────────────
        # Only an SRT generated by *this* run is used. A claude_srt_path left
        # on the job by an earlier run may belong to a different clip range.
        srt_path = None
        if job.get("auto_chorus"):
            job, srt_path = _run_analysis(job_id, job, input_path)

        # ── 3. Reframe + subtitles (clip.py orchestrator) ─────
        output_path = OUTPUT_DIR / f"{job_id}.mp4"
        update_job(job_id, current_step="Reframe + subtitles")
        cmd = _build_clip_cmd(job, input_path, output_path, srt_path)
        # DEBUG: record exactly which command is executed.
        update_job(job_id, debug_clip_cmd=" ".join(cmd))
        if not run_subprocess_logged(cmd, job_id, "Reframe + subtitles"):
            return

        # ── Done ──────────────────────────────────────────────
        if output_path.exists():
            update_job(
                job_id,
                status="done",
                current_step="Končano",
                output_path=str(output_path),
                output_size_mb=round(output_path.stat().st_size / 1024 / 1024, 2),
            )
        else:
            update_job(
                job_id,
                status="failed",
                error="Output datoteka ne obstaja po obdelavi",
            )
    except Exception as e:
        # Top-level boundary of a background task: record instead of crashing.
        update_job(job_id, status="failed", error=str(e))
# ────────────────────────────────────────────────────────────────
# FastAPI app
# ────────────────────────────────────────────────────────────────
app = FastAPI(title="Reels Clipper")
# Serve files from the "static" directory one level above this module's
# directory under the /static URL prefix.
app.mount(
    path="/static",
    app=StaticFiles(directory=Path(__file__).parent.parent / "static"),
    name="static",
)
@app.on_event("startup")
async def cleanup_stuck_jobs():
"""Ob startu containerja: označi vse 'processing' jobs kot prekinjene.
Ko Coolify deployа nov container, prejšnji se ubije sredi obdelave,
JSON file pa ostane status='processing'. Tukaj preverimo in počistimo.
"""
print("🔄 Preverjam stuck jobs...")
cleaned = 0
for f in JOBS_DIR.glob("*.json"):
try:
j = json.loads(f.read_text())
if j.get("status") == "processing":
j["status"] = "error"
j["current_step"] = "Prekinjeno (container restart) — naloži ponovno"
j["chorus_error"] = "Container restart during deploy. Napaka ni vaša — obnovite z gumbom Process."
j["interrupted_at"] = time.time()
j["updated_at"] = time.time()
f.write_text(json.dumps(j, ensure_ascii=False, indent=2))
cleaned += 1
except Exception as e:
print(f" ⚠️ Napaka pri {f.name}: {e}")
if cleaned > 0:
print(f" ✅ Označenih {cleaned} prekinjenih jobs")
else:
print(" 👍 Ni stuck jobs")
@app.get("/", response_class=HTMLResponse)
async def index(user: str = Depends(check_auth)):
html = (Path(__file__).parent.parent / "templates" / "index.html").read_text()
return html
@app.get("/healthz")
async def healthz():
return {"ok": True}
# ────────────────────────────────────────────────────────────────
# Job models
# ────────────────────────────────────────────────────────────────
# Request body for POST /api/youtube. submit_youtube copies these fields onto
# the job dict, and process_job reads them from there. Unlike StartJobIn there
# are no include_prebuild / min_duration / max_duration / llm_* fields, so for
# YouTube jobs process_job uses its fallback values or omits those analyze.py
# flags.
class YouTubeJobIn(BaseModel):
    url: str  # YouTube URL, handed to scripts/yt_download.py
    mode: str = "track"  # clip.py --mode
    lang: Optional[str] = None  # None/"auto"/"" -> auto-detect (analysis) / detected language (clip.py)
    auto_chorus: bool = True  # run scripts/analyze.py to choose the clip range
    start: Optional[float] = None  # clip start in seconds; overwritten when the analysis succeeds
    duration: Optional[float] = 30  # target clip length in seconds (analysis target and clip.py --duration)
    no_subs: bool = False  # disable subtitles (clip.py --no-subs)
    subtitle_style: str = "reels"  # clip.py --style
    whisper_model: str = "large-v3"  # Whisper model passed as --model
    quality: str = "medium"  # clip.py --quality
# Request body for POST /api/process: (re)start processing of an existing job.
# start_processing copies every setting below onto the job dict.
class StartJobIn(BaseModel):
    job_id: str  # id of an existing job (e.g. as returned by /api/upload)
    mode: str = "track"  # clip.py --mode
    lang: Optional[str] = None  # None/auto = Whisper auto-detect
    auto_chorus: bool = True  # run scripts/analyze.py to choose the clip range
    include_prebuild: bool = False  # include the pre-chorus build-up (analyze.py --include-prebuild)
    start: Optional[float] = None  # manual clip start in seconds
    duration: Optional[float] = 30  # target clip length in seconds
    max_duration: Optional[float] = 45  # upper bound for the analysis (--max-duration)
    min_duration: Optional[float] = 20  # lower bound for the analysis (--min-duration)
    no_subs: bool = False  # disable subtitles
    subtitle_style: str = "reels"  # clip.py --style
    whisper_model: str = "large-v3"  # Whisper model passed as --model
    quality: str = "medium"  # clip.py --quality
    # LLM used for semantic analysis + transcript corrections
    llm_provider: str = "claude"  # claude / gemini / auto
    llm_model: Optional[str] = None  # specific model (default: the best one for the provider)
# ────────────────────────────────────────────────────────────────
# Upload (file)
# ────────────────────────────────────────────────────────────────
@app.post("/api/upload")
async def upload_video(
    file: UploadFile = File(...),
    user: str = Depends(check_auth),
):
    """Store an uploaded video and create a job in state "uploaded".

    The body is streamed to disk in 1 MiB chunks and rejected with 413 once
    it exceeds MAX_UPLOAD_MB. Processing is started separately via
    /api/process.

    The partially written file is removed on *any* failure, not only on the
    413 path. This includes exceeding the size limit, task cancellation
    (e.g. the client going away) and disk errors.

    Raises:
        HTTPException: 400 without a filename, 413 when the file is too large.
    """
    if not file.filename:
        raise HTTPException(400, "Brez imena")
    job_id = uuid.uuid4().hex[:12]
    ext = Path(file.filename).suffix or ".mp4"
    input_path = UPLOAD_DIR / f"{job_id}{ext}"
    limit_bytes = MAX_UPLOAD_MB * 1024 * 1024
    size = 0
    try:
        with input_path.open("wb") as f:
            while chunk := await file.read(1024 * 1024):
                size += len(chunk)
                if size > limit_bytes:
                    raise HTTPException(413, f"Prevelika datoteka (limit {MAX_UPLOAD_MB} MB)")
                f.write(chunk)
    except BaseException:
        # BaseException so that asyncio cancellation also cleans up; the
        # `with` has already closed the file at this point.
        input_path.unlink(missing_ok=True)
        raise
    job = {
        "id": job_id,
        "source_type": "upload",
        "filename": file.filename,
        "input_path": str(input_path),
        "size_mb": round(size / 1024 / 1024, 2),
        "status": "uploaded",
        "current_step": "Naloženo, čaka na obdelavo",
        "created_at": time.time(),
        "updated_at": time.time(),
    }
    save_job(job)
    return job
# ────────────────────────────────────────────────────────────────
# YouTube submit
# ────────────────────────────────────────────────────────────────
@app.post("/api/youtube")
async def submit_youtube(
    payload: YouTubeJobIn,
    background: BackgroundTasks,
    user: str = Depends(check_auth),
):
    """Create a YouTube-sourced job and schedule its pipeline immediately.

    No separate /api/process call is needed. The job is saved as "queued",
    and process_job is added as a background task.
    """
    new_id = uuid.uuid4().hex[:12]
    record = {
        "id": new_id,
        "source_type": "youtube",
        "youtube_url": payload.url,
        "status": "queued",
        "current_step": "V vrsti za YouTube prenos",
        "created_at": time.time(),
        "updated_at": time.time(),
    }
    # Copy the processing options verbatim from the request body.
    for option in (
        "mode", "lang", "auto_chorus", "start", "duration",
        "no_subs", "subtitle_style", "whisper_model", "quality",
    ):
        record[option] = getattr(payload, option)
    save_job(record)
    background.add_task(process_job, new_id)
    return record
# ────────────────────────────────────────────────────────────────
# Start processing for uploaded job
# ────────────────────────────────────────────────────────────────
@app.post("/api/process")
async def start_processing(
    payload: StartJobIn,
    background: BackgroundTasks,
    user: str = Depends(check_auth),
):
    """(Re)start the pipeline for an existing job with the settings in *payload*.

    The settings are stored on the job, the job is set to "queued", and
    process_job is scheduled as a background task. The updated job is
    returned.

    State left by a previous run is reset so that a retry starts clean:

    * The error fields. ``error`` used to survive, so a successful retry
      could still show the old failure.
    * The SRT path and fades derived from the previous analysis. They belong
      to the old clip range and would otherwise leak into a run with
      different settings.
    * The ``auto_disabled_subs`` flag.

    Raises:
        HTTPException: 404 if the job does not exist.
    """
    job = load_job(payload.job_id)
    if not job:
        raise HTTPException(404, "Job ne obstaja")
    update_job(
        payload.job_id,
        status="queued",
        mode=payload.mode,
        lang=payload.lang,
        auto_chorus=payload.auto_chorus,
        include_prebuild=payload.include_prebuild,
        start=payload.start,
        duration=payload.duration,
        max_duration=payload.max_duration,
        min_duration=payload.min_duration,
        no_subs=payload.no_subs,
        subtitle_style=payload.subtitle_style,
        whisper_model=payload.whisper_model,
        quality=payload.quality,
        llm_provider=payload.llm_provider,
        llm_model=payload.llm_model,
        current_step="V vrsti za obdelavo",
        # Clear leftovers from earlier runs (retry-friendly).
        error=None,
        chorus_error=None,
        interrupted_at=None,
        claude_srt_path=None,
        fade_in=0,  # 0 (not None): the pipeline compares fades numerically
        fade_out=0,
        auto_disabled_subs=None,
    )
    background.add_task(process_job, payload.job_id)
    return load_job(payload.job_id)
# ────────────────────────────────────────────────────────────────
# Job queries
# ────────────────────────────────────────────────────────────────
@app.get("/api/jobs")
async def get_jobs(user: str = Depends(check_auth)):
    """Return every stored job, wrapped as ``{"jobs": [...]}``."""
    all_jobs = list_jobs()
    return dict(jobs=all_jobs)
@app.get("/api/jobs/{job_id}")
async def get_job(job_id: str, user: str = Depends(check_auth)):
job = load_job(job_id)
if not job:
raise HTTPException(404, "Ne obstaja")
return job
@app.get("/api/stream/{job_id}")
async def stream_job(job_id: str, user: str = Depends(check_auth)):
"""Server-Sent Events za real-time status."""
async def gen():
last_status = None
last_step = None
for _ in range(600): # max 10 min stream
job = load_job(job_id)
if not job:
yield f"data: {json.dumps({'error': 'not found'})}\n\n"
return
if job["status"] != last_status or job.get("current_step") != last_step:
yield f"data: {json.dumps(job, ensure_ascii=False)}\n\n"
last_status = job["status"]
last_step = job.get("current_step")
if job["status"] in ("done", "failed"):
return
await asyncio.sleep(1)
return StreamingResponse(gen(), media_type="text/event-stream")
# ────────────────────────────────────────────────────────────────
# Download / preview
# ────────────────────────────────────────────────────────────────
def _finished_output(job_id):
    """Return the output file of a finished job, or raise a 404 HTTPException.

    Shared by /api/download and /api/preview so both apply identical checks.
    A "done" job without an ``output_path`` now yields a 404 instead of a
    KeyError (500).
    """
    job = load_job(job_id)
    if not job or job.get("status") != "done":
        raise HTTPException(404, "Ne pripravljen")
    output = job.get("output_path")
    if not output or not Path(output).exists():
        raise HTTPException(404, "Output ne obstaja")
    return Path(output)


@app.get("/api/download/{job_id}")
async def download(job_id: str, user: str = Depends(check_auth)):
    """Send the finished reel with the download filename ``reel_<job_id>.mp4``."""
    return FileResponse(
        _finished_output(job_id),
        media_type="video/mp4",
        filename=f"reel_{job_id}.mp4",
    )


@app.get("/api/preview/{job_id}")
async def preview(job_id: str, user: str = Depends(check_auth)):
    """Send the finished reel without a download filename, for the preview player."""
    return FileResponse(_finished_output(job_id), media_type="video/mp4")
@app.delete("/api/jobs/{job_id}")
async def delete_job(job_id: str, user: str = Depends(check_auth)):
job = load_job(job_id)
if not job:
raise HTTPException(404, "Ne obstaja")
for key in ("input_path", "output_path"):
p = job.get(key)
if p and Path(p).exists():
Path(p).unlink(missing_ok=True)
job_path(job_id).unlink(missing_ok=True)
return {"deleted": job_id}