reels-app/app/main.py
Sebastjan Artič 7cb4302dcd Edit feature: slider + napis edit + recut endpoint
User insight: 'treba je narediti da ko se reels naredijo da jih lahko
popravljamo... delamo na avtomatiko ampak lahk pa tudi popravljam'

Avto pipeline ostane (Soniox + Claude + render). Po render-u uporabnik
lahko klikne ✏️ Edit gumb in:

1. **Slider za clip start/end**:
   - Vidi 16:9 original video
   - Drag start/end slider z živim preview-om
   - Dolžina prikazana real-time
   - Min 5s, max 60s

2. **Edit napisov** (collapsed, opcijsko):
   - Klik na vrstico → input za popravek besedila
   - Original timestamp ostane, samo besedilo se posodobi
   - Uporabno za 'doline IZBOR' → 'doline IZPOD' tip popravkov

3. **Re-render**:
   - Backend POST /api/jobs/{id}/recut z {start, end, custom_segments}
   - Worker preskoči Soniox + Claude (custom_clip flag)
   - Re-uporabi cached transcript + analysis
   - Re-render samo: clip → reframe → subtitle → output
   - ~30s namesto 3-5 min

New endpoints:
- GET /api/source-video/{id} — 16:9 original za editor preview
- GET /api/transcript/{id} — segmenti + clip range za editor
- POST /api/jobs/{id}/recut — re-render z user timestampi

Worker change: če job ima custom_clip=True, preskoči auto_chorus
analizo in samo re-uporabi obstoječi clip_range iz analysis.json
(updated by recut endpoint).
2026-04-30 10:26:25 +00:00

1359 lines
56 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
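reels.biba.live — FastAPI backend.
Endpoints:
GET / — frontend HTML
POST /api/upload — upload a video file
POST /api/youtube — submit a YouTube URL
POST /api/process/{id} — start a processing job
GET /api/jobs — list all jobs
GET /api/jobs/{id} — status of one job
GET /api/stream/{id} — SSE progress stream
GET /api/download/{id} — download the final reel
GET /api/preview/{id} — preview video stream
DELETE /api/jobs/{id} — delete the job and its files
GET /api/source-video/{id} — 16:9 original video for the editor preview
GET /api/transcript/{id} — transcript segments + clip range for the editor
POST /api/jobs/{id}/recut — re-render with user-chosen timestamps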
"""
import asyncio
import json
import os
import secrets
import shutil
import subprocess
import time
import uuid
from pathlib import Path
from typing import Optional
from fastapi import (
FastAPI, UploadFile, File, Form, HTTPException, Depends,
BackgroundTasks, Request, status
)
from fastapi.responses import (
FileResponse, HTMLResponse, StreamingResponse, JSONResponse, Response
)
from fastapi.staticfiles import StaticFiles
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from pydantic import BaseModel
# ────────────────────────────────────────────────────────────────
# Config
# ────────────────────────────────────────────────────────────────
# Root directory for all persistent data; overridable through the DATA_DIR env var.
DATA_DIR = Path(os.environ.get("DATA_DIR", "/data"))
# Sub-directories of DATA_DIR used by the rest of this module.
UPLOAD_DIR = DATA_DIR / "uploads"
OUTPUT_DIR = DATA_DIR / "outputs"
JOBS_DIR = DATA_DIR / "jobs"
# Helper scripts live in a "scripts" directory next to this file's parent directory.
SCRIPTS_DIR = Path(__file__).parent.parent / "scripts"
# Create the data directories at import time so later code can assume they exist.
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
JOBS_DIR.mkdir(parents=True, exist_ok=True)
# HTTP Basic credentials, read from the environment.
# NOTE(review): the AUTH_PASS default is a placeholder value — confirm every
# deployment overrides it.
AUTH_USER = os.environ.get("AUTH_USER", "sebastjan")
AUTH_PASS = os.environ.get("AUTH_PASS", "change-me-in-coolify-env")
# Upload size limit in megabytes (enforced while streaming the upload to disk).
MAX_UPLOAD_MB = int(os.environ.get("MAX_UPLOAD_MB", "2000"))
# ────────────────────────────────────────────────────────────────
# Auth
# ────────────────────────────────────────────────────────────────
security = HTTPBasic()


def check_auth(creds: HTTPBasicCredentials = Depends(security)):
    """Validate HTTP Basic credentials against AUTH_USER / AUTH_PASS.

    Returns the authenticated username. Raises HTTPException(401) with a
    ``WWW-Authenticate: Basic`` header when either value does not match.
    """
    # compare_digest() only accepts ASCII-only str; credentials containing
    # non-ASCII characters (e.g. "č", "š") would raise TypeError and surface as
    # a 500. Comparing the UTF-8 bytes keeps the constant-time comparison and
    # works for any input.
    user_ok = secrets.compare_digest(
        creds.username.encode("utf-8"), AUTH_USER.encode("utf-8")
    )
    pass_ok = secrets.compare_digest(
        creds.password.encode("utf-8"), AUTH_PASS.encode("utf-8")
    )
    if not (user_ok and pass_ok):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Napačno geslo",
            headers={"WWW-Authenticate": "Basic"},
        )
    return creds.username
# ────────────────────────────────────────────────────────────────
# Artist + title parsing iz filename / YouTube title
# ────────────────────────────────────────────────────────────────
import re
_NOISE_PATTERNS = [
    # Variations of "official", tolerating typos (offiicial, offical, oficial, ...).
    # Liberal match: "off" + extra letters + "icial" + "video"/"audio".
    r"\(Off[a-z]*icial\s+(?:Music\s+|HD\s+|4K\s+)?(?:Video|Audio)\s*\)",
    r"\(Off[a-z]*icia[lk]\s+(?:Music\s+|HD\s+|4K\s+)?(?:Video|Audio)\)",
    r"\bOff[a-z]*icial\s+(?:Music\s+|HD\s+|4K\s+)?(?:Video|Audio)\b",
    # German variations
    r"\(Offizielles?\s+(?:Musik)?[Vv]ideo\)",
    r"\bOffizielles?\s+(?:Musik)?[Vv]ideo\b",
    # Lyric videos
    r"\(Lyric[s]?\s+Video\)",
    r"\bLyric[s]?\s+Video\b",
    # Audio quality / version markers
    r"\(Audio\)",
    r"\(HD\)", r"\(HQ\)", r"\(4K\)", r"\(8K\)", r"\(1080p?\)",
    r"\(Live\)", r"\(Remix\)", r"\(Cover\)", r"\(Acoustic\)",
    r"\(Remastered\)", r"\(Remaster(?:ed)?\s*\d{0,4}\)",
    r"\(Extended(?:\s+Mix)?\)",
    r"\(Radio(?:\s+Edit)?\)",
    r"\(Clean(?:\s+Version)?\)",
    r"\(Explicit\)",
    # Square brackets variations
    r"\[Official[^\]]*\]",
    r"\[Music[^\]]*\]",
    r"\[Audio[^\]]*\]",
    r"\[HD\]", r"\[HQ\]", r"\[4K\]",
    r"\[Lyric[s]?[^\]]*\]",
    # Bare words (without brackets)
    r"\boriginal\s+(?:video|audio)\b",
    r"\bMV\b",
    # Trailing year in parens (e.g. "(2024)")
    r"\(\d{4}\)\s*$",
]


def parse_artist_title(filename_or_title):
    """Extract ``(artist, title)`` from a file name or a YouTube title.

    Supported patterns:
    - "Artist - Title.mp4"
    - "Artist - Title (Official Music Video).mp4"
    - "Artist – Title" (en dash) and "Artist — Title" (em dash)
    - "Artist | Title"
    - "Artist : Title"

    Returns ``(artist, title)``, or ``(None, None)`` when no separator is found
    or either part is empty / too long (artist > 80, title > 100 characters).
    """
    if not filename_or_title:
        return (None, None)
    # Drop the extension (only when there is a dot at all)
    name = Path(filename_or_title).stem if "." in filename_or_title else filename_or_title
    # Remove noise such as "(Official Video)"
    for pat in _NOISE_PATTERNS:
        name = re.sub(pat, "", name, flags=re.IGNORECASE)
    # Normalise whitespace
    name = re.sub(r"\s+", " ", name).strip()
    # Try the separators in priority order. Every separator must be non-empty:
    # an empty string is "in" every name and str.split("") raises ValueError,
    # and a bare space would split any multi-word title on its first word.
    for sep in (" - ", " – ", " — ", " | ", " : "):
        if sep not in name:
            continue
        artist, title = name.split(sep, 1)
        # Strip leading/trailing punctuation
        artist = re.sub(r'^[\s\-–—|.:_]+|[\s\-–—|.:_]+$', '', artist.strip())
        title = re.sub(r'^[\s\-–—|.:_]+|[\s\-–—|.:_]+$', '', title.strip())
        if artist and title and len(artist) <= 80 and len(title) <= 100:
            return (artist, title)
    return (None, None)
def safe_filename(s, max_len=80):
    """Return ``s`` reduced to a file-system-safe name of at most ``max_len`` characters.

    Characters that are reserved on common file systems and ASCII control
    characters are removed, whitespace runs collapse to one space, and the
    result is trimmed and then truncated. Falsy input yields "".
    """
    if not s:
        return ""
    without_reserved = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '', s)
    collapsed = re.sub(r'\s+', ' ', without_reserved).strip()
    return collapsed[:max_len]
def clean_noise(s):
    """Strip noise such as "Official Video" from ``s``, including already-parsed values.

    Falsy input is returned unchanged. Otherwise every pattern in
    _NOISE_PATTERNS is removed (case-insensitively), whitespace is normalised,
    and leading/trailing separator punctuation is trimmed.
    """
    if not s:
        return s
    result = s
    for pattern in _NOISE_PATTERNS:
        result = re.sub(pattern, "", result, flags=re.IGNORECASE)
    result = re.sub(r"\s+", " ", result).strip()
    return re.sub(r'^[\s\-–—|.:_]+|[\s\-–—|.:_]+$', '', result)
def build_download_filename(job):
    """Build the download file name for a finished reel from the job metadata.

    The stored parsed artist/title are preferred; whichever is missing is
    filled in by parsing the original file name or the YouTube title. The
    job id is the last resort when no title can be determined.
    """
    artist = clean_noise(job.get("parsed_artist"))
    title = clean_noise(job.get("parsed_title"))
    if not (artist and title):
        # Fallback: parse whichever source name the job has
        source = job.get("filename") or job.get("youtube_title") or ""
        fallback_artist, fallback_title = parse_artist_title(source)
        artist = artist or fallback_artist
        title = title or fallback_title
    if title:
        if artist:
            return f"{safe_filename(artist)} - {safe_filename(title)} - REEL.mp4"
        return f"{safe_filename(title)} - REEL.mp4"
    # Last resort: the job id
    return f"reel_{job['id']}.mp4"
# ────────────────────────────────────────────────────────────────
# Job state (filesystem-based, persistent prek restartov)
# ────────────────────────────────────────────────────────────────
def job_path(job_id):
    """Return the path of the JSON state file for ``job_id`` inside JOBS_DIR."""
    return JOBS_DIR.joinpath(f"{job_id}.json")
def load_job(job_id):
    """Load the persisted job dict for ``job_id``.

    Returns None when the job file does not exist. A file that exists but
    holds invalid JSON still raises ``json.JSONDecodeError``, as before.
    """
    # EAFP: a separate exists() check can race with a concurrent delete and
    # turn a plain "job not found" into an unhandled FileNotFoundError.
    try:
        raw = job_path(job_id).read_text()
    except FileNotFoundError:
        return None
    return json.loads(raw)
def save_job(job):
    """Persist ``job`` to its JSON state file, replacing the file atomically.

    Job files are polled by other threads and request handlers while the
    pipeline rewrites them. Writing in place exposes a truncated file to those
    readers, so the payload goes to a uniquely named temporary file in the same
    directory and is then moved over the target with ``os.replace``.
    """
    target = job_path(job["id"])
    payload = json.dumps(job, ensure_ascii=False, indent=2)
    # The ".tmp" suffix keeps the temp file out of every "*.json" glob.
    tmp_path = target.with_name(f"{target.name}.{uuid.uuid4().hex}.tmp")
    try:
        tmp_path.write_text(payload)
        os.replace(tmp_path, target)
    finally:
        # No-op after a successful replace; cleans up after a failed write.
        tmp_path.unlink(missing_ok=True)
def update_job(job_id, **kwargs):
    """Merge ``kwargs`` into the stored job, refresh ``updated_at`` and save it.

    Returns the updated job dict, or None when the job cannot be loaded.
    """
    job = load_job(job_id)
    if job:
        job.update(kwargs)
        job["updated_at"] = time.time()
        save_job(job)
        return job
    return None
def _clean_job_titles(job):
    """Clean "Official Video"-style noise from the parsed title/artist in memory.

    The job dict is modified in place (nothing is persisted) and returned.
    A field is only overwritten when cleaning yields a non-empty, different value.
    """
    if not job:
        return job
    for field in ("parsed_title", "parsed_artist"):
        current = job.get(field)
        if not current:
            continue
        cleaned = clean_noise(current)
        if cleaned and cleaned != current:
            job[field] = cleaned
    return job
def list_jobs():
    """Return all readable jobs from JOBS_DIR, in reverse file-name order.

    Files that cannot be read or parsed are skipped silently.
    """
    jobs = []
    job_files = sorted(JOBS_DIR.glob("*.json"), reverse=True)
    for job_file in job_files:
        try:
            parsed = json.loads(job_file.read_text())
            jobs.append(_clean_job_titles(parsed))
        except Exception:
            continue
    return jobs
def _format_srt_timestamp(seconds):
    """Format ``seconds`` as an SRT timestamp ``HH:MM:SS,mmm``.

    Works on whole milliseconds so a value such as 59.9996 carries into the
    minutes field instead of being rendered as the invalid "00:00:60,000".
    """
    total_ms = max(0, int(round(seconds * 1000)))
    hours, rest = divmod(total_ms, 3_600_000)
    minutes, rest = divmod(rest, 60_000)
    secs, millis = divmod(rest, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"


def _clip_words(words, clip_start, clip_end):
    """Return the non-empty words overlapping the clip, with times clamped to it."""
    clipped = []
    for w in words:
        w_start = float(w.get("start", 0))
        w_end = float(w.get("end", 0))
        w_text = w.get("text", "").strip()
        if not w_text:
            continue
        if w_end > clip_start and w_start < clip_end:
            clipped.append({
                "start": max(w_start, clip_start),
                "end": min(w_end, clip_end),
                "text": w_text,
            })
    return clipped


def _group_words_into_chunks(words, max_duration, min_words=3, min_chars=12):
    """Group timed words into subtitle chunks of roughly ``max_duration`` seconds.

    Rules:
    1. A chunk is closed once adding the next word would exceed ``max_duration``.
    2. A chunk that is still short (fewer than ``min_words`` words or
       ``min_chars`` characters) keeps growing instead, so a slightly too long
       chunk is preferred over an orphaned short one.
    3. A trailing chunk with a single word or fewer than 5 characters is merged
       into the previous chunk.
    """
    chunks = []
    current = [words[0]]
    for w in words[1:]:
        if w["end"] - current[0]["start"] > max_duration:
            current_text = " ".join(c["text"] for c in current)
            if len(current) < min_words or len(current_text) < min_chars:
                current.append(w)
            else:
                chunks.append(current)
                current = [w]
        else:
            current.append(w)
    last_text = " ".join(c["text"] for c in current)
    if chunks and (len(current) < 2 or len(last_text) < 5):
        chunks[-1].extend(current)
    else:
        chunks.append(current)
    return chunks


def generate_srt_from_segments(segments, clip_start, clip_end, output_path):
    """Write an SRT file covering only the part of ``segments`` inside the clip.

    Timestamps are re-mapped to be 0-based (as in the trimmed video). Segments
    longer than 2.5 s are split into chunks for fast, reels-style pacing, using
    word-level timestamps when the segment carries a ``words`` list. All
    subtitle text is upper-cased.

    Args:
        segments: iterable of dicts with "start", "end", "text" and optionally
            "words" (dicts with "start", "end", "text").
        clip_start: clip start in seconds on the source timeline.
        clip_end: clip end in seconds on the source timeline.
        output_path: where the SRT file is written (UTF-8).

    Returns:
        ``output_path``.
    """
    MAX_CHUNK_DURATION = 2.5
    lines = []
    idx = 1
    for seg in segments:
        s_start = float(seg["start"])
        s_end = float(seg["end"])
        text = str(seg["text"]).strip()
        words = seg.get("words", []) or []
        # Keep only segments that overlap the clip range
        if s_end <= clip_start or s_start >= clip_end:
            continue
        # ── HALLUCINATION FILTER ──
        # Speech-to-text occasionally hallucinates over long instrumentals and
        # returns a very long segment holding one or two words. Rule: a segment
        # longer than 15 s with fewer than 5 words is treated as a hallucination.
        seg_dur = s_end - s_start
        word_count = len(words) if words else len(text.split())
        if seg_dur > 15 and word_count < 5:
            print(f"[SRT] Preskočil halucinacijski segment [{s_start:.1f}-{s_end:.1f}] "
                  f"({seg_dur:.1f}s, {word_count} besed): {text[:50]!r}", flush=True)
            continue
        # None means "no word-level timestamps"; an empty list means "words are
        # known, but none of them falls inside the clip".
        words_in_clip = _clip_words(words, clip_start, clip_end) if words else None
        sticks_out = s_start < clip_start or s_end > clip_end
        if words_in_clip is not None and sticks_out:
            # The segment only partially overlaps the clip and word timings are
            # available: keep exactly the words that are inside the clip. When
            # there are none, nothing is sung inside the clip, so skip the
            # segment instead of showing its full text over silence.
            if not words_in_clip:
                continue
            text = " ".join(w["text"] for w in words_in_clip)
            s_start = words_in_clip[0]["start"]
            s_end = words_in_clip[-1]["end"]
        else:
            # Clamp the segment boundaries to the clip range
            s_start = max(s_start, clip_start)
            s_end = min(s_end, clip_end)
        if s_end - s_start < 0.2:
            continue
        # Re-map to the 0-based clip timeline
        rel_start = s_start - clip_start
        rel_end = s_end - clip_start
        if not text:
            continue
        text_upper = text.upper()
        duration = rel_end - rel_start
        if duration <= MAX_CHUNK_DURATION:
            lines.append(
                f"{idx}\n{_format_srt_timestamp(rel_start)} --> "
                f"{_format_srt_timestamp(rel_end)}\n{text_upper}\n"
            )
            idx += 1
        elif words_in_clip and len(words_in_clip) >= 2:
            # ── WORD-LEVEL CHUNKING ──
            # Use the real word times for the chunk boundaries (not equal time
            # slices, because a singer does not sing at an even pace).
            for chunk in _group_words_into_chunks(words_in_clip, MAX_CHUNK_DURATION):
                cs = chunk[0]["start"] - clip_start
                ce = chunk[-1]["end"] - clip_start
                chunk_text = " ".join(w["text"] for w in chunk).upper().strip()
                if not chunk_text:
                    continue
                lines.append(
                    f"{idx}\n{_format_srt_timestamp(cs)} --> "
                    f"{_format_srt_timestamp(ce)}\n{chunk_text}\n"
                )
                idx += 1
        else:
            # Fallback without word-level timestamps: equal time slices
            n_parts = int(duration / MAX_CHUNK_DURATION) + 1
            words_split = text_upper.split()
            words_per_part = max(1, len(words_split) // n_parts)
            chunk_dur = duration / n_parts
            for i in range(n_parts):
                cs = rel_start + i * chunk_dur
                ce = rel_start + (i + 1) * chunk_dur
                wstart = i * words_per_part
                wend = (i + 1) * words_per_part if i < n_parts - 1 else len(words_split)
                # When the words run out, the slice repeats the full text
                chunk_text = " ".join(words_split[wstart:wend]) if wstart < len(words_split) else text_upper
                if not chunk_text.strip():
                    chunk_text = text_upper
                lines.append(
                    f"{idx}\n{_format_srt_timestamp(cs)} --> "
                    f"{_format_srt_timestamp(ce)}\n{chunk_text.strip()}\n"
                )
                idx += 1
    with open(output_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
    return output_path
# ────────────────────────────────────────────────────────────────
# Pipeline runner (background task)
# ────────────────────────────────────────────────────────────────
def run_subprocess_logged(cmd, job_id, step_name):
    """Run ``cmd`` as a subprocess and record the outcome on the job.

    The job is first marked as processing with ``step_name`` as the current
    step. On a non-zero exit the job is marked failed with the tail of
    stderr + stdout and False is returned. On success the tail of a non-empty
    stderr is stored as ``last_step_log`` and True is returned.
    """
    update_job(job_id, current_step=step_name, status="processing")
    result = subprocess.run(cmd, capture_output=True, text=True)
    stderr_text = result.stderr
    if result.returncode == 0:
        # Keep the end of stderr for diagnostics even when the step succeeded
        if stderr_text and stderr_text.strip():
            update_job(job_id, last_step_log=stderr_text[-500:].strip())
        return True
    # Failure: combine stderr + stdout for diagnostics
    combined = (stderr_text or "") + "\n" + (result.stdout or "")
    update_job(
        job_id,
        status="failed",
        error=f"{step_name}: {combined[-800:].strip()}",
    )
    return False
def process_job(job_id):
    """Main pipeline: download (if YouTube) → find chorus (if auto) → reframe → subtitles.

    Loads the job, prepares the source video, optionally recognises the song
    via the acr_recognize.py script, determines the clip range (re-using
    analysis.json for a user recut, or running analyze.py when ``auto_chorus``
    is set) and finally renders through clip.py. Progress and errors are
    written to the job file via update_job. Returns None in every case.
    """
    job = load_job(job_id)
    if not job:
        return
    try:
        # ── 1. Source preparation ─────────────────────────────
        if job["source_type"] == "youtube":
            update_job(job_id, status="downloading", current_step="YouTube download")
            input_path = UPLOAD_DIR / f"{job_id}_yt.mp4"
            cmd = [
                "python3", str(SCRIPTS_DIR / "yt_download.py"),
                job["youtube_url"], str(input_path),
            ]
            if not run_subprocess_logged(cmd, job_id, "YouTube download"):
                return
            update_job(job_id, input_path=str(input_path))
            # Try to fetch the YouTube title for artist+title parsing (best effort)
            try:
                info_cmd = [
                    "python3", str(SCRIPTS_DIR / "yt_download.py"),
                    job["youtube_url"], "/dev/null", "--info-only",
                ]
                proc = subprocess.run(info_cmd, capture_output=True, text=True, timeout=30)
                if proc.returncode == 0 and proc.stdout:
                    info = json.loads(proc.stdout)
                    yt_title = info.get("title", "")
                    if yt_title:
                        a, t = parse_artist_title(yt_title)
                        updates = {"youtube_title": yt_title}
                        if a:
                            updates["parsed_artist"] = a
                        if t:
                            updates["parsed_title"] = t
                        updates["has_clean_name"] = bool(a and t)
                        update_job(job_id, **updates)
                        # Reload job for downstream use
                        job = load_job(job_id)
            except Exception as e:
                print(f"⚠️ Cannot fetch YT title: {e}", flush=True)
        else:
            input_path = Path(job["input_path"])
        # ── 1b. Music recognition (ACRCloud) — when artist+title are unknown ─────
        # Even for YouTube jobs the title may not reveal them (e.g. from a playlist, "Track 5")
        if not (job.get("parsed_artist") and job.get("parsed_title")):
            update_job(job_id, current_step="Avto-prepoznavam pesem (ACRCloud)")
            try:
                acr_cmd = [
                    "python3", str(SCRIPTS_DIR / "acr_recognize.py"),
                    str(input_path),
                ]
                proc = subprocess.run(acr_cmd, capture_output=True, text=True, timeout=120)
                if proc.returncode == 0 and proc.stdout:
                    data = json.loads(proc.stdout)
                    a, t = data.get("artist"), data.get("title")
                    if a and t:
                        update_job(
                            job_id,
                            parsed_artist=a, parsed_title=t,
                            has_clean_name=True,
                            recognized_via="acrcloud",
                        )
                        job = load_job(job_id)
                        print(f"✅ ACR prepoznal: {a} - {t}", flush=True)
                    else:
                        print(f"⚠️ ACR ni prepoznal pesmi", flush=True)
                else:
                    print(f"⚠️ ACR exit {proc.returncode}: {proc.stderr[:200]}", flush=True)
            except Exception as e:
                # Recognition is best effort; the pipeline continues without it
                print(f"⚠️ ACR error: {e}", flush=True)
        # ── 2. Smart analysis (when auto_chorus) ──────────────────────────
        # When custom_clip=True (user-edit mode), skip the analysis and only
        # refresh start/duration from the existing clip_range in analysis.json
        if job.get("custom_clip"):
            update_job(job_id, current_step="User-edit recut (preskočim analizo)")
            analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json"
            srt_from_claude = None
            if analysis_path.exists():
                try:
                    with open(analysis_path, "r", encoding="utf-8") as f:
                        analysis = json.load(f)
                    cr = analysis["clip_range"]
                    fade = analysis.get("fade", {"fade_in": 0.05, "fade_out": 0.5})
                    # Generate a new SRT from the existing transcript for the new range
                    if analysis.get("transcript", {}).get("segments") and not job.get("no_subs"):
                        srt_path_out = OUTPUT_DIR / f"{job_id}.subtitles.srt"
                        try:
                            # Prefer custom_subtitle_segments (user-corrected subtitles) when present
                            segs_to_use = analysis.get("custom_subtitle_segments") or analysis["transcript"]["segments"]
                            generate_srt_from_segments(
                                segs_to_use,
                                cr["start"], cr["end"],
                                srt_path_out,
                            )
                            srt_from_claude = str(srt_path_out)
                        except Exception as e:
                            print(f"⚠️ Recut SRT generation failed: {e}", flush=True)
                    update_job(
                        job_id,
                        start=cr["start"],
                        duration=cr["duration"],
                        fade_in=fade.get("fade_in", 0.05),
                        fade_out=fade.get("fade_out", 0.5),
                        claude_srt_path=srt_from_claude,
                    )
                    job = load_job(job_id)
                except Exception as e:
                    update_job(job_id, status="failed", error=f"Recut analysis read: {e}")
                    return
            else:
                update_job(job_id, status="failed", error="Analysis manjka za recut")
                return
        elif job.get("auto_chorus"):
            update_job(job_id, current_step="Analiza pesmi (transkript + energija)")
            analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json"
            cmd = [
                "python3", str(SCRIPTS_DIR / "analyze.py"),
                str(input_path),
                "--target-duration", str(job.get("duration", 30)),
                "--max-duration", str(job.get("max_duration", 45)),
                "--min-duration", str(job.get("min_duration", 20)),
                "--output", str(analysis_path),
            ]
            if job.get("include_prebuild"):
                cmd += ["--include-prebuild"]
            # LLM provider (claude/gemini/auto)
            if job.get("llm_provider"):
                cmd += ["--llm-provider", job["llm_provider"]]
            if job.get("llm_model"):
                cmd += ["--llm-model", job["llm_model"]]
            # Filename hint for the analysis script — prefer the parsed artist+title (cleaner)
            if job.get("parsed_artist") and job.get("parsed_title"):
                fn_hint = f"{job['parsed_artist']} - {job['parsed_title']}"
                cmd += ["--filename-hint", fn_hint]
            elif job.get("filename"):
                fn_hint = Path(job["filename"]).stem
                cmd += ["--filename-hint", fn_hint]
            # STT provider (elevenlabs = Scribe, local = faster-whisper, auto = prefer Scribe)
            if job.get("whisper_provider"):
                cmd += ["--whisper-provider", job["whisper_provider"]]
            # lang: when None or 'auto', let analyze.py auto-detect
            if job.get("lang") and job["lang"] not in ("auto", ""):
                cmd += ["--lang", job["lang"]]
            cmd += ["--model", job.get("whisper_model", "large-v3")]
            proc = subprocess.run(cmd, capture_output=True, text=True)
            srt_from_claude = None  # Path to the SRT built from the LLM-corrected transcript
            if proc.returncode == 0 and analysis_path.exists():
                try:
                    with open(analysis_path, "r", encoding="utf-8") as f:
                        analysis = json.load(f)
                    cr = analysis["clip_range"]
                    fade = analysis["fade"]
                    # Generate the SRT from the transcript TRIMMED to clip_range
                    # (the LLM may have corrected the text — use the corrected version)
                    if analysis.get("transcript", {}).get("segments"):
                        srt_path_out = OUTPUT_DIR / f"{job_id}.subtitles.srt"
                        try:
                            generate_srt_from_segments(
                                analysis["transcript"]["segments"],
                                cr["start"], cr["end"],
                                srt_path_out,
                            )
                            srt_from_claude = str(srt_path_out)
                            llm_src = cr.get("source", "LLM")
                            print(f"📝 Generated SRT from {llm_src} transcript: {srt_path_out}")
                        except Exception as e:
                            print(f"⚠️ SRT generation failed: {e}")
                    update_job(
                        job_id,
                        analysis_summary={
                            "language": analysis["language"],
                            "language_probability": analysis["language_probability"],
                            "instrumental": analysis["instrumental"],
                            "clip_range": cr,
                            "fade": fade,
                            "chorus_preview": analysis["chorus"]["best"]["text_preview"]
                            if analysis.get("chorus") and analysis["chorus"].get("best") else None,
                            "video_duration": analysis.get("video_duration"),
                            "candidates": analysis["chorus"].get("all_candidates", [])[:5]
                            if analysis.get("chorus") else [],
                            "claude_corrected_text": analysis.get("claude_corrected_text", False),
                        },
                        # The full transcript is stored for display in the UI
                        full_transcript=[
                            {"start": s["start"], "end": s["end"], "text": s["text"]}
                            for s in analysis.get("transcript", {}).get("segments", [])
                        ],
                        start=cr["start"],
                        duration=cr["duration"],
                        fade_in=fade["fade_in"],
                        fade_out=fade["fade_out"],
                        detected_language=analysis["language"],
                        is_instrumental=analysis["instrumental"],
                        claude_srt_path=srt_from_claude,
                    )
                    # Auto-disable subtitles for instrumentals
                    if analysis["instrumental"] and not job.get("no_subs"):
                        update_job(job_id, no_subs=True, auto_disabled_subs=True)
                    # Reload local dict
                    job = load_job(job_id)
                except (json.JSONDecodeError, KeyError) as e:
                    # A broken analysis file is recorded but does not stop the pipeline
                    update_job(job_id, chorus_error=f"Analysis parse: {e}")
            else:
                # Analysis failure is recorded; rendering continues with the job's own start/duration
                update_job(job_id, chorus_error=(proc.stderr or "")[-500:])
        # ── 3. Reframe + subtitles (clip.py orchestrator) ─────
        output_path = OUTPUT_DIR / f"{job_id}.mp4"
        update_job(job_id, current_step="Reframe + subtitles")
        cmd = [
            "python3", str(SCRIPTS_DIR / "clip.py"),
            str(input_path), str(output_path),
            "--mode", job.get("mode", "track"),
            "--quality", job.get("quality", "medium"),
            "--style", job.get("subtitle_style", "reels"),
        ]
        if job.get("start") is not None:
            cmd += ["--start", str(job["start"])]
        if job.get("duration") is not None:
            cmd += ["--duration", str(job["duration"])]
        # NOTE(review): these comparisons raise TypeError if the stored value is None
        # (caught by the outer except) — confirm fade_in/fade_out are never saved as None.
        if job.get("fade_in", 0) > 0:
            cmd += ["--fade-in", str(job["fade_in"])]
        if job.get("fade_out", 0) > 0:
            cmd += ["--fade-out", str(job["fade_out"])]
        # Pre-generated SRT (better text) — passed straight through to the renderer
        if job.get("claude_srt_path") and Path(job["claude_srt_path"]).exists() and not job.get("no_subs"):
            cmd += ["--srt", job["claude_srt_path"]]
        # lang: prefer detected_language when set to auto
        chosen_lang = job.get("lang")
        if chosen_lang in (None, "auto", ""):
            chosen_lang = job.get("detected_language")
        if chosen_lang:
            cmd += ["--lang", chosen_lang]
        if job.get("no_subs"):
            cmd += ["--no-subs"]
        cmd += ["--model", job.get("whisper_model", "large-v3")]
        # DEBUG: record exactly which command is executed
        update_job(job_id, debug_clip_cmd=" ".join(cmd))
        # NOTE(review): this early return skips both the Telegram failure
        # notification and _try_finalize_batch — confirm that is intended.
        if not run_subprocess_logged(cmd, job_id, "Reframe + subtitles"):
            return
        # ── Done ──────────────────────────────────────────────
        if output_path.exists():
            update_job(
                job_id,
                status="done",
                current_step="Končano",
                output_path=str(output_path),
                output_size_mb=round(output_path.stat().st_size / 1024 / 1024, 2),
            )
            # Telegram notification (best effort)
            try:
                from app.telegram import notify_job_done
                final_job = load_job(job_id)
                notify_job_done(final_job)
            except Exception as e:
                print(f"⚠️ TG notify_job_done failed: {e}", flush=True)
            # Batch tracking — if this was the last job of the batch, send the summary
            _try_finalize_batch(job_id)
        else:
            update_job(
                job_id,
                status="failed",
                error="Output datoteka ne obstaja po obdelavi",
            )
            try:
                from app.telegram import notify_job_failed
                notify_job_failed(load_job(job_id), "Output datoteka ne obstaja")
            except Exception:
                pass
            _try_finalize_batch(job_id)
    except Exception as e:
        # Any unexpected error marks the job as failed and still finalises the batch
        update_job(job_id, status="failed", error=str(e))
        try:
            from app.telegram import notify_job_failed
            notify_job_failed(load_job(job_id), str(e))
        except Exception:
            pass
        _try_finalize_batch(job_id)
def _try_finalize_batch(job_id):
    """If the job belongs to a batch, send the batch summary once every job has finished.

    A job counts as finished when its status is "done" or "failed". Any error
    raised while checking is printed and swallowed.
    """
    try:
        job = load_job(job_id)
        batch_id = job.get("batch_id")
        if not batch_id:
            return
        # Collect every job that belongs to the same batch
        batch_jobs = []
        for jp in JOBS_DIR.glob("*.json"):
            candidate = load_job(jp.stem)
            if candidate and candidate.get("batch_id") == batch_id:
                batch_jobs.append(candidate)
        if any(j.get("status") not in ("done", "failed") for j in batch_jobs):
            return  # some jobs are still running
        # Everything finished — send the summary
        total = len(batch_jobs)
        succeeded = sum(1 for j in batch_jobs if j.get("status") == "done")
        failed = total - succeeded
        try:
            from app.telegram import notify_batch_complete
            notify_batch_complete(batch_id, total, succeeded, failed)
        except Exception as e:
            print(f"⚠️ TG batch summary failed: {e}", flush=True)
    except Exception as e:
        print(f"⚠️ _try_finalize_batch error: {e}", flush=True)
# ────────────────────────────────────────────────────────────────
# FastAPI app
# ────────────────────────────────────────────────────────────────
# The ASGI application object; route decorators below register on it.
app = FastAPI(title="Reels Clipper")
# Serve the "static" directory (next to this file's parent directory) under /static.
app.mount("/static", StaticFiles(directory=Path(__file__).parent.parent / "static"), name="static")
# ────────────────────────────────────────────────────────────────
# Queue worker — procesira queued jobe enega za drugim
# ────────────────────────────────────────────────────────────────
import threading
# Guards writes to _queue_running["current_job"] made around process_job().
_queue_lock = threading.Lock()
# Id of the job the queue worker is handling right now (None when idle);
# kept in a dict so it is accessible from threads.
_queue_running = {"current_job": None}
def _queue_worker():
    """Background loop that processes queued jobs one at a time.

    Runs forever: sleeps 3 s when there is nothing to do, otherwise takes the
    queued job with the smallest ``created_at`` and runs process_job on it.
    Errors are printed; after an unexpected outer error the loop waits 5 s.
    """
    print("🚜 Queue worker zagnan", flush=True)
    while True:
        try:
            # Something already marked as running? Wait while it is still active.
            active_id = _queue_running["current_job"]
            if active_id:
                active = load_job(active_id)
                if active and active.get("status") in ("processing", "downloading"):
                    time.sleep(3)
                    continue
                # No longer active → release the slot
                _queue_running["current_job"] = None
            # Collect all jobs waiting in the queue (unreadable files are skipped)
            pending = []
            for job_file in JOBS_DIR.glob("*.json"):
                try:
                    candidate = json.loads(job_file.read_text())
                    if candidate.get("status") == "queued":
                        pending.append(candidate)
                except Exception:
                    continue
            if not pending:
                time.sleep(3)
                continue
            # FIFO by created_at
            oldest = min(pending, key=lambda x: x.get("created_at", 0))
            job_id = oldest["id"]
            with _queue_lock:
                _queue_running["current_job"] = job_id
            print(f"🚜 Queue worker: obdelujem {job_id}", flush=True)
            try:
                process_job(job_id)
            except Exception as e:
                print(f"❌ Queue worker error pri {job_id}: {e}", flush=True)
                update_job(job_id, status="failed", error=f"Queue worker: {e}")
            finally:
                with _queue_lock:
                    _queue_running["current_job"] = None
        except Exception as e:
            print(f"❌ Queue worker outer error: {e}", flush=True)
            time.sleep(5)
# Zaženi worker v ozadju (samo enkrat)
_worker_thread = None


def _start_queue_worker():
    """Start the queue worker as a daemon thread unless one is already alive."""
    global _worker_thread
    already_running = _worker_thread is not None and _worker_thread.is_alive()
    if already_running:
        return
    _worker_thread = threading.Thread(target=_queue_worker, daemon=True)
    _worker_thread.start()
@app.on_event("startup")
async def resume_or_cleanup_jobs():
    """On container start: re-queue interrupted jobs or mark them as errored.

    When a new container is deployed the previous one is killed mid-run, but
    the job JSON file keeps its in-progress status ("processing" or
    "downloading"). Nothing can be running at startup, so such a job is stale.

    Strategy:
    - after 3 resume attempts (``resume_attempts >= 3``) → mark as error
    - upload job whose source file is gone → mark as error
    - otherwise → set status to "queued" and let the queue worker, started at
      the end of this function, run it from the beginning
    """
    print("🔄 Preverjam in obnavljam jobs po restart-u...")
    resumed_count = 0
    error_count = 0

    def mark_error(path, job, step, detail):
        """Persist ``job`` as errored with the given step text and detail."""
        # NOTE(review): other code in this file uses status "failed", and
        # _try_finalize_batch treats only "done"/"failed" as finished — confirm
        # whether "error" is intentional here.
        job["status"] = "error"
        job["current_step"] = step
        job["chorus_error"] = detail
        job["updated_at"] = time.time()
        path.write_text(json.dumps(job, ensure_ascii=False, indent=2))

    for f in JOBS_DIR.glob("*.json"):
        try:
            j = json.loads(f.read_text())
            if j.get("status") not in ("processing", "downloading"):
                continue
            job_id = j.get("id") or j.get("job_id") or f.stem
            attempts = j.get("resume_attempts", 0)
            # Give up after 3 failed resumes
            if attempts >= 3:
                mark_error(
                    f, j,
                    "Preveč napak pri ponovnem zagonu",
                    f"Job restartal {attempts} krat — napaka v pipeline-u",
                )
                error_count += 1
                continue
            # Check that the source file still exists. Uploads keep their
            # original extension, so use the stored input_path and fall back to
            # the legacy "<job_id>.mp4" guess. YouTube jobs are re-downloaded
            # by process_job and need no local file.
            stored_input = j.get("input_path")
            input_path = Path(stored_input) if stored_input else UPLOAD_DIR / f"{job_id}.mp4"
            if j.get("source_type") != "youtube" and not input_path.exists():
                mark_error(
                    f, j,
                    "Vhodna datoteka ne obstaja",
                    f"Upload {input_path.name} ne obstaja po restart-u",
                )
                error_count += 1
                continue
            # Resume: status=queued, +1 attempt. The queue worker picks up
            # every queued job, so it must NOT also be scheduled here —
            # doing both runs process_job twice on the same job in parallel.
            j["status"] = "queued"
            j["resume_attempts"] = attempts + 1
            j["current_step"] = f"Avto-resume po restart-u (poskus {attempts + 1}/3)"
            j["last_resume_at"] = time.time()
            j["updated_at"] = time.time()
            f.write_text(json.dumps(j, ensure_ascii=False, indent=2))
            resumed_count += 1
            print(f" 🔁 Resume {job_id} (attempt {attempts + 1}/3)")
        except Exception as e:
            print(f" ⚠️ Napaka pri {f.name}: {e}")
    print(f" ✅ Resumed: {resumed_count}, Error: {error_count}")
    # Start the queue worker; it processes resumed and newly queued jobs one by one
    _start_queue_worker()
async def _resume_job_async(job_id):
    """Run process_job for ``job_id`` in the default thread executor.

    Waits 2 s first so application startup can finish. Errors raised by the
    pipeline are printed, not propagated.
    """
    # Give startup a moment to complete
    await asyncio.sleep(2)
    try:
        # process_job is synchronous, so it must not run on the event loop.
        # get_running_loop() is the preferred way to get the loop in a coroutine.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, process_job, job_id)
    except Exception as e:
        print(f" ❌ Resume failed for {job_id}: {e}")
@app.get("/", response_class=HTMLResponse)
async def index(user: str = Depends(check_auth)):
    """Serve the frontend page (templates/index.html) to an authenticated user."""
    template_path = Path(__file__).parent.parent / "templates" / "index.html"
    # Decode explicitly as UTF-8 so non-ASCII text in the page does not depend
    # on the container's locale.
    return template_path.read_text(encoding="utf-8")
@app.get("/healthz")
async def healthz():
    """Unauthenticated liveness probe; always answers ``{"ok": true}``."""
    payload = {"ok": True}
    return payload
# ────────────────────────────────────────────────────────────────
# Job models
# ────────────────────────────────────────────────────────────────
class YouTubeJobIn(BaseModel):
    """Request body for submitting a YouTube URL as a new job."""

    # Video URL; stored on the job as "youtube_url".
    url: str
    # Stored on the job as "mode"; process_job forwards it to clip.py as --mode.
    mode: str = "track"
    # Subtitle language; process_job treats None / "auto" / "" as auto-detect.
    lang: Optional[str] = None
    # When true, process_job runs analyze.py to choose the clip range.
    auto_chorus: bool = True
    # Clip start; forwarded to clip.py as --start when not None.
    start: Optional[float] = None
    # Clip duration; also the --target-duration passed to analyze.py.
    duration: Optional[float] = 30
    # When true, clip.py is called with --no-subs.
    no_subs: bool = False
    # Forwarded to clip.py as --style.
    subtitle_style: str = "reels"
    # Forwarded to analyze.py and clip.py as --model.
    whisper_model: str = "large-v3"
    # Forwarded to clip.py as --quality.
    quality: str = "medium"
class StartJobIn(BaseModel):
    """Request body for starting processing of an already created job."""

    job_id: str
    mode: str = "track"
    lang: Optional[str] = None  # None/auto = auto-detect
    auto_chorus: bool = True
    include_prebuild: bool = False  # include the pre-chorus build-up
    start: Optional[float] = None
    duration: Optional[float] = 30
    max_duration: Optional[float] = 45
    min_duration: Optional[float] = 20
    no_subs: bool = False
    subtitle_style: str = "reels"
    whisper_model: str = "large-v3"
    quality: str = "medium"
    # LLM used for semantic analysis + text corrections
    llm_provider: str = "claude"  # claude / gemini / auto
    llm_model: Optional[str] = None  # specific model (default: the best one for the provider)
    # STT provider (per the original author, Scribe is 18x faster with better multilingual accuracy)
    whisper_provider: str = "auto"  # auto / elevenlabs / local
    # Batch tracking for multi-upload (Telegram summary)
    batch_id: Optional[str] = None
# ────────────────────────────────────────────────────────────────
# Upload (file)
# ────────────────────────────────────────────────────────────────
@app.post("/api/upload")
async def upload_video(
    file: UploadFile = File(...),
    artist: Optional[str] = Form(None),
    title: Optional[str] = Form(None),
    batch_id: Optional[str] = Form(None),
    user: str = Depends(check_auth),
):
    """Store an uploaded video and create its job record.

    The file is streamed to UPLOAD_DIR in 1 MiB chunks under a fresh job id,
    keeping the original extension (".mp4" when there is none). Artist and
    title come from the form when both are given, otherwise from the file name.

    Raises:
        HTTPException(400): the upload has no file name.
        HTTPException(413): the upload exceeds MAX_UPLOAD_MB.

    Returns:
        The new job dict (status "uploaded").
    """
    if not file.filename:
        raise HTTPException(400, "Brez imena")
    job_id = uuid.uuid4().hex[:12]
    ext = Path(file.filename).suffix or ".mp4"
    input_path = UPLOAD_DIR / f"{job_id}{ext}"
    max_bytes = MAX_UPLOAD_MB * 1024 * 1024
    size = 0
    try:
        with input_path.open("wb") as f:
            while chunk := await file.read(1024 * 1024):
                size += len(chunk)
                if size > max_bytes:
                    raise HTTPException(413, f"Prevelika datoteka (limit {MAX_UPLOAD_MB} MB)")
                f.write(chunk)
    except BaseException:
        # Covers the size limit, I/O errors and a cancelled request (client
        # disconnect): never leave a partial upload behind on disk.
        input_path.unlink(missing_ok=True)
        raise
    job = {
        "id": job_id,
        "source_type": "upload",
        "filename": file.filename,
        "input_path": str(input_path),
        "size_mb": round(size / 1024 / 1024, 2),
        "status": "uploaded",
        "current_step": "Naloženo, čaka na obdelavo",
        "created_at": time.time(),
        "updated_at": time.time(),
    }
    if batch_id:
        job["batch_id"] = batch_id
    # Artist + title — user-provided values first, then parsed from the file name.
    # Strip before testing so whitespace-only form fields do not count as a
    # clean name with an empty artist or title.
    artist_clean = (artist or "").strip()
    title_clean = (title or "").strip()
    if artist_clean and title_clean:
        # The user entered or confirmed the values
        job["parsed_artist"] = artist_clean
        job["parsed_title"] = title_clean
        job["has_clean_name"] = True
    else:
        # Auto-parse from the file name
        a, t = parse_artist_title(file.filename)
        if a:
            job["parsed_artist"] = a
        if t:
            job["parsed_title"] = t
        job["has_clean_name"] = bool(a and t)
    save_job(job)
    return job
# ────────────────────────────────────────────────────────────────
# YouTube submit
# ────────────────────────────────────────────────────────────────
@app.post("/api/youtube")
async def submit_youtube(
    payload: YouTubeJobIn,
    background: BackgroundTasks,
    user: str = Depends(check_auth),
):
    """Create a queued job for a YouTube URL and return the saved record.

    Nothing is started here: the job is only persisted with status
    ``"queued"``; per the original author's note the queue worker picks it up.
    """
    record = {
        "id": uuid.uuid4().hex[:12],
        "source_type": "youtube",
        "youtube_url": payload.url,
        "status": "queued",
        "current_step": "V vrsti za YouTube prenos",
        "created_at": time.time(),
        "updated_at": time.time(),
    }
    # Remaining options are copied from the request body under the same names.
    copied_fields = (
        "mode",
        "lang",
        "auto_chorus",
        "start",
        "duration",
        "no_subs",
        "subtitle_style",
        "whisper_model",
        "quality",
    )
    for field_name in copied_fields:
        record[field_name] = getattr(payload, field_name)

    save_job(record)
    return record
# ────────────────────────────────────────────────────────────────
# Start processing for uploaded job
# ────────────────────────────────────────────────────────────────
@app.post("/api/process")
async def start_processing(
    payload: StartJobIn,
    background: BackgroundTasks,
    user: str = Depends(check_auth),
):
    """Queue an existing job for processing with the options from ``payload``.

    Responds 404 if the job does not exist. The job is not run from here; it
    is only marked ``"queued"``. Per the original author's note, a background
    queue worker picks jobs up so multiple uploads are processed one at a
    time. Returns the job as re-read after the update.
    """
    target_id = payload.job_id
    if not load_job(target_id):
        raise HTTPException(404, "Job ne obstaja")

    option_names = (
        "mode",
        "lang",
        "auto_chorus",
        "include_prebuild",
        "start",
        "duration",
        "max_duration",
        "min_duration",
        "no_subs",
        "subtitle_style",
        "whisper_model",
        "quality",
        "llm_provider",
        "llm_model",
        "whisper_provider",
        "batch_id",
    )
    changes = {"status": "queued"}
    for option in option_names:
        changes[option] = getattr(payload, option)
    changes["current_step"] = "V vrsti za obdelavo"
    # Clear errors from earlier runs so a retry starts clean.
    changes["chorus_error"] = None
    changes["interrupted_at"] = None

    update_job(target_id, **changes)
    return load_job(target_id)
# ────────────────────────────────────────────────────────────────
# Job queries
# ────────────────────────────────────────────────────────────────
@app.get("/api/jobs")
async def get_jobs(user: str = Depends(check_auth)):
    """Return whatever ``list_jobs()`` yields, wrapped under the ``"jobs"`` key."""
    all_jobs = list_jobs()
    return {"jobs": all_jobs}
@app.get("/api/jobs/{job_id}")
async def get_job(job_id: str, user: str = Depends(check_auth)):
    """Return one job passed through ``_clean_job_titles``; 404 if it is missing."""
    record = load_job(job_id)
    if record:
        return _clean_job_titles(record)
    raise HTTPException(404, "Ne obstaja")
@app.get("/api/stream/{job_id}")
async def stream_job(job_id: str, user: str = Depends(check_auth)):
    """Server-Sent Events stream of a job's status.

    The job is re-read once per second, for at most 600 polls. An event
    carrying the full job is sent whenever ``status`` or ``current_step``
    differs from the last sent values. The stream ends when the job is
    ``"done"`` or ``"failed"``, when the job cannot be loaded (after an
    ``{"error": "not found"}`` event), or when the polls run out.
    """

    async def event_source():
        last_sent = (None, None)
        polls_left = 600  # one poll per second -> roughly 10 minutes
        while polls_left > 0:
            polls_left -= 1
            snapshot = load_job(job_id)
            if not snapshot:
                missing = json.dumps({"error": "not found"})
                yield "data: " + missing + "\n\n"
                return

            current = (snapshot["status"], snapshot.get("current_step"))
            if current != last_sent:
                body = json.dumps(snapshot, ensure_ascii=False)
                yield "data: " + body + "\n\n"
                last_sent = current

            if snapshot["status"] in ("done", "failed"):
                return
            await asyncio.sleep(1)

    return StreamingResponse(event_source(), media_type="text/event-stream")
# ────────────────────────────────────────────────────────────────
# Download / preview
# ────────────────────────────────────────────────────────────────
@app.get("/api/download/{job_id}")
async def download(job_id: str, user: str = Depends(check_auth)):
    """Send the finished reel as an MP4 download.

    Responds 404 when the job is missing, is not in status ``"done"``, or its
    output file is not on disk. The download name comes from
    ``build_download_filename(job)``.
    """
    job = load_job(job_id)
    is_ready = bool(job) and job.get("status") == "done"
    if not is_ready:
        raise HTTPException(404, "Ne pripravljen")

    reel_path = Path(job["output_path"])
    if not reel_path.exists():
        raise HTTPException(404, "Output ne obstaja")

    # Friendly name, per the original note: "Artist - Title - REEL.mp4"
    friendly_name = build_download_filename(job)
    return FileResponse(reel_path, media_type="video/mp4", filename=friendly_name)
def _parse_byte_range(range_header, file_size):
    """Parse a single-range HTTP ``Range`` header value.

    Supported forms are ``bytes=START-END``, ``bytes=START-`` and the suffix
    form ``bytes=-N`` (the last ``N`` bytes).

    Args:
        range_header: Raw header value.
        file_size: Size in bytes of the file being served.

    Returns:
        ``(start, end)`` as inclusive byte offsets with ``end`` clamped to
        the last byte, or ``None`` if the header is malformed or uses an
        unsupported form (multiple ranges, non-numeric bounds). The pair is
        not checked for satisfiability: the caller must reject
        ``start > end`` or ``start >= file_size``.
    """
    spec = range_header.strip()
    if spec.lower().startswith("bytes="):
        spec = spec[len("bytes="):]
    first, dash, last = (part.strip() for part in spec.partition("-"))
    if not dash:
        return None
    # Only plain decimal bounds are accepted; this also rejects multi-range
    # values such as "0-10,20-30".
    if (first and not first.isdecimal()) or (last and not last.isdecimal()):
        return None
    if first:
        start = int(first)
        end = int(last) if last else file_size - 1
    elif last:
        # Suffix range: the final N bytes. N == 0 yields start == file_size,
        # which the caller treats as unsatisfiable.
        start = max(file_size - int(last), 0)
        end = file_size - 1
    else:
        return None
    return start, min(end, file_size - 1)


@app.get("/api/preview/{job_id}")
async def preview(job_id: str, request: Request, user: str = Depends(check_auth)):
    """Serve the finished reel with Range support (needed by the HTML5 video player).

    Behaviour:
        * 404 if the job is missing, not ``"done"``, or has no output file.
        * Valid ``Range`` header: 206 with exactly the requested bytes.
        * Unsatisfiable range: 416 with ``Content-Range: bytes */<size>``.
        * No ``Range`` header, or one that cannot be parsed: the whole file.
    """
    job = load_job(job_id)
    if not job or job.get("status") != "done":
        raise HTTPException(404, "Ne pripravljen")
    output_path = job.get("output_path")
    if not output_path or not Path(output_path).exists():
        raise HTTPException(404, "Output ne obstaja")
    out = Path(output_path)
    file_size = out.stat().st_size

    # Starlette header lookup is case-insensitive, so one lookup is enough.
    range_header = request.headers.get("range")
    byte_range = _parse_byte_range(range_header, file_size) if range_header else None

    if byte_range is None:
        # No (usable) Range header: return the whole file.
        return FileResponse(
            out,
            media_type="video/mp4",
            headers={"Accept-Ranges": "bytes", "Content-Length": str(file_size)},
        )

    start, end = byte_range
    if start > end or start >= file_size:
        # Range Not Satisfiable; tell the client the actual size.
        return Response(
            status_code=416,
            headers={"Content-Range": f"bytes */{file_size}"},
        )

    chunk_size = end - start + 1

    def iter_file():
        # Stream the slice in 64 KiB reads so large files are never fully
        # loaded into memory.
        with open(out, "rb") as f:
            f.seek(start)
            remaining = chunk_size
            while remaining > 0:
                data = f.read(min(64 * 1024, remaining))
                if not data:
                    break
                remaining -= len(data)
                yield data

    headers = {
        "Content-Range": f"bytes {start}-{end}/{file_size}",
        "Accept-Ranges": "bytes",
        "Content-Length": str(chunk_size),
    }
    return StreamingResponse(
        iter_file(),
        status_code=206,
        headers=headers,
        media_type="video/mp4",
    )
@app.delete("/api/jobs/{job_id}")
async def delete_job(job_id: str, user: str = Depends(check_auth)):
    """Delete a job's input file, output file and job record.

    Responds 404 if the job does not exist; otherwise returns
    ``{"deleted": job_id}``.
    """
    job = load_job(job_id)
    if not job:
        raise HTTPException(404, "Ne obstaja")

    stored_paths = [job.get(field) for field in ("input_path", "output_path")]
    for raw_path in stored_paths:
        if not raw_path:
            continue
        target = Path(raw_path)
        if target.exists():
            target.unlink(missing_ok=True)

    job_path(job_id).unlink(missing_ok=True)
    return {"deleted": job_id}
# ─── EDIT FEATURE ────────────────────────────────────────────────
@app.get("/api/source-video/{job_id}")
async def source_video(job_id: str, user: str = Depends(check_auth)):
    """Return the job's original input video for the editor preview.

    Responds 404 if the job is unknown or its ``input_path`` is unset or no
    longer on disk.
    """
    job = load_job(job_id)
    if not job:
        raise HTTPException(404, "Ne obstaja")

    src = job.get("input_path")
    if not (src and Path(src).exists()):
        raise HTTPException(404, "Original video ne obstaja")

    # NOTE(review): the file is always labelled video/mp4 with a .mp4 name,
    # whatever its actual extension — confirm that is intended.
    suggested_name = f"source_{job_id}.mp4"
    extra_headers = {"Accept-Ranges": "bytes"}
    return FileResponse(
        path=src,
        media_type="video/mp4",
        filename=suggested_name,
        headers=extra_headers,
    )
@app.get("/api/transcript/{job_id}")
async def get_transcript(job_id: str, user: str = Depends(check_auth)):
    """Return the stored transcript data the editor needs.

    Reads ``OUTPUTS_DIR/<job_id>.analysis.json`` and returns its transcript
    segments and language, its ``clip_range``, and the job's
    ``video_duration``. Responds 404 if the job or the analysis file is
    missing, and 500 if the file cannot be read or interpreted.
    """
    job = load_job(job_id)
    if not job:
        raise HTTPException(404, "Ne obstaja")

    analysis_path = OUTPUTS_DIR / f"{job_id}.analysis.json"
    if not analysis_path.exists():
        raise HTTPException(404, "Analysis ne obstaja")

    try:
        analysis = json.loads(analysis_path.read_text())
        transcript = analysis.get("transcript", {})
        editor_payload = {
            "segments": transcript.get("segments", []),
            "language": transcript.get("language", "?"),
            "clip_range": analysis.get("clip_range", {}),
            "video_duration": job.get("video_duration", 0),
        }
    except Exception as e:
        raise HTTPException(500, f"Napaka pri branju: {e}")
    return editor_payload
class RecutRequest(BaseModel):
    """Request body for ``POST /api/jobs/{job_id}/recut``."""

    start: float  # clip start; recut_job requires end - start to be within 5..60 (seconds)
    end: float  # clip end; must be greater than start
    custom_segments: Optional[list] = None  # [{start, end, text}] to override the subtitles
    no_subs: Optional[bool] = None  # None -> recut_job falls back to the job's stored "no_subs"
@app.post("/api/jobs/{job_id}/recut")
async def recut_job(job_id: str, payload: RecutRequest, user: str = Depends(check_auth)):
    """Re-render a reel with user-defined timestamps (and optionally corrected subtitles).

    Per the original author's notes this is much faster than the full
    pipeline: the stored transcript is reused, the LLM analysis is skipped
    (the user has decided), and only clip -> reframe -> subtitle -> output
    runs.

    The endpoint itself rewrites ``clip_range`` in the job's analysis file,
    stores ``custom_segments`` there when provided, removes the previous
    output and subtitle files, and then re-queues the job with
    ``custom_clip=True``.

    Args:
        job_id: Job to recut.
        payload: New clip boundaries plus optional subtitle overrides.
        user: Value resolved by the ``check_auth`` dependency (unused here).

    Returns:
        A dict with ``status``, ``job_id``, ``start``, ``end``, ``duration``.

    Raises:
        HTTPException: 404 if the job is unknown; 400 if the source video is
            missing or the timestamps are invalid; 500 if the analysis file
            is missing.
    """
    job = load_job(job_id)
    if not job:
        raise HTTPException(404, "Ne obstaja")
    src = job.get("input_path")
    if not src or not Path(src).exists():
        raise HTTPException(400, "Original video manjka")

    # The checks are written as negated comparisons so that NaN, for which
    # every comparison is False, is rejected rather than slipping through.
    if not payload.start >= 0:
        raise HTTPException(400, "Start ne sme biti negativen")
    if not payload.end > payload.start:
        raise HTTPException(400, "End mora biti večji od start")
    duration = payload.end - payload.start
    if not duration >= 5:
        raise HTTPException(400, "Trajanje vsaj 5s")
    if not duration <= 60:
        raise HTTPException(400, "Trajanje največ 60s")

    no_subs = payload.no_subs if payload.no_subs is not None else job.get("no_subs", False)

    # Load the existing analysis
    analysis_path = OUTPUTS_DIR / f"{job_id}.analysis.json"
    if not analysis_path.exists():
        raise HTTPException(500, "Analysis manjka — re-uplad pesmi")
    analysis = json.loads(analysis_path.read_text())

    # Update the clip range
    analysis["clip_range"] = {
        "start": payload.start,
        "end": payload.end,
        "duration": duration,
        "reason": f"USER EDIT: ročno popravljen clip {payload.start:.1f}-{payload.end:.1f}s",
        "source": "user_edit",
    }
    # If the user corrected subtitles, keep the original transcript but store
    # the custom segments separately for subtitle generation.
    if payload.custom_segments:
        analysis["custom_subtitle_segments"] = payload.custom_segments
    analysis_path.write_text(json.dumps(analysis, indent=2, ensure_ascii=False))

    # Remove the previous render BEFORE re-queueing, so a worker that picks
    # the job up immediately cannot have its fresh files deleted by us.
    stale_files = [OUTPUTS_DIR / f"{job_id}.mp4"]
    stale_files.extend(OUTPUTS_DIR / f"{job_id}.subtitles.{ext}" for ext in ("srt", "ass"))
    for stale in stale_files:
        stale.unlink(missing_ok=True)

    # Re-queue the job (the worker will process it)
    update_job(
        job_id,
        status="queued",
        no_subs=no_subs,
        custom_clip=True,  # flag telling the worker to skip transcription + analysis
        current_step="V vrsti za recut",
        error=None,
        chorus_error=None,
    )

    return {
        "status": "queued",
        "job_id": job_id,
        "start": payload.start,
        "end": payload.end,
        "duration": duration,
    }