Bug found in the third re-test of "Žena ME TEPE": Scribe transcribed only verse 1 (0–33 s) correctly, then returned a single 98 s segment [34.7–133.2] containing just one word, "sam" — a known Scribe hallucination on instrumental sections. As a result the SRT showed "SAM SAM SAM SAM…" 14 times across the chorus: the chorus audio was correct, but the subtitles kept repeating "SAM". Three-part fix: 1. SRT GENERATOR: skip segments longer than 15 s with fewer than 5 words; they are hallucinations and carry no real transcription value. 2. SCRIBE TRANSCRIBE: detect hallucinations in the returned segments — mark segments longer than 15 s with fewer than 5 words as hallucinations, compute the true coverage % (excluding hallucinations), and add _hallucination_count and _coverage_pct to the result. 3. TRANSCRIBE_FULL: automatically retry Scribe when quality is poor — if hallucinations are detected OR coverage is below 50 %, retry once; keep the retry result only if its stats are better, otherwise fall back to the first attempt (still better than nothing). This makes the pipeline robust against Scribe's occasional bad transcripts on songs with long instrumental breaks; in most cases the second attempt succeeds where the first failed (random Scribe variance).
1087 lines
44 KiB
Python
1087 lines
44 KiB
Python
"""
|
||
reels.biba.live — FastAPI backend.
|
||
|
||
Endpoints:
|
||
GET / — frontend HTML
|
||
POST /api/upload — naloži video file
|
||
POST /api/youtube — submit YouTube URL
|
||
POST /api/process/{id} — start processing job
|
||
GET /api/jobs — list vseh jobov
|
||
GET /api/jobs/{id} — status job-a
|
||
GET /api/stream/{id} — SSE progress stream
|
||
GET /api/download/{id} — download finalni reel
|
||
GET /api/preview/{id} — preview video stream
|
||
DELETE /api/jobs/{id} — pobriši job + datoteke
|
||
"""
|
||
import asyncio
|
||
import json
|
||
import os
|
||
import secrets
|
||
import shutil
|
||
import subprocess
|
||
import time
|
||
import uuid
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
from fastapi import (
|
||
FastAPI, UploadFile, File, Form, HTTPException, Depends,
|
||
BackgroundTasks, Request, status
|
||
)
|
||
from fastapi.responses import (
|
||
FileResponse, HTMLResponse, StreamingResponse, JSONResponse, Response
|
||
)
|
||
from fastapi.staticfiles import StaticFiles
|
||
from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
||
from pydantic import BaseModel
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
|
||
# Config
|
||
# ────────────────────────────────────────────────────────────────
|
||
# Storage layout: everything lives under DATA_DIR (overridable via env).
DATA_DIR = Path(os.environ.get("DATA_DIR", "/data"))
UPLOAD_DIR, OUTPUT_DIR, JOBS_DIR = (
    DATA_DIR / "uploads",
    DATA_DIR / "outputs",
    DATA_DIR / "jobs",
)
# Helper scripts directory: "scripts" two levels above this file.
SCRIPTS_DIR = Path(__file__).parent.parent / "scripts"

for _storage_dir in (UPLOAD_DIR, OUTPUT_DIR, JOBS_DIR):
    _storage_dir.mkdir(parents=True, exist_ok=True)
del _storage_dir

# HTTP Basic credentials used by check_auth.
# NOTE(review): when AUTH_PASS is not set in the environment the app runs with
# the placeholder password below — make sure every deployment sets it.
AUTH_USER = os.environ.get("AUTH_USER", "sebastjan")
AUTH_PASS = os.environ.get("AUTH_PASS", "change-me-in-coolify-env")

# Upload size limit in megabytes (checked while the upload is streamed to disk).
MAX_UPLOAD_MB = int(os.environ.get("MAX_UPLOAD_MB", "2000"))
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
|
||
# Auth
|
||
# ────────────────────────────────────────────────────────────────
|
||
security = HTTPBasic()


def check_auth(creds: HTTPBasicCredentials = Depends(security)):
    """FastAPI dependency that validates HTTP Basic credentials.

    The username and password are compared against AUTH_USER / AUTH_PASS with
    ``secrets.compare_digest``. Both comparisons are always evaluated before
    the result is checked.

    The values are compared as UTF-8 *bytes*: ``compare_digest`` only accepts
    ASCII-only ``str`` arguments and raises ``TypeError`` otherwise. Previously
    a login with e.g. "č" in the password (or a non-ASCII AUTH_PASS) produced
    a 500 error instead of a 401.

    Returns:
        The authenticated username.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Basic`` header when the
            credentials do not match.
    """
    correct_user = secrets.compare_digest(
        creds.username.encode("utf-8"), AUTH_USER.encode("utf-8")
    )
    correct_pass = secrets.compare_digest(
        creds.password.encode("utf-8"), AUTH_PASS.encode("utf-8")
    )
    if not (correct_user and correct_pass):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Napačno geslo",
            headers={"WWW-Authenticate": "Basic"},
        )
    return creds.username
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
|
||
# Artist + title parsing iz filename / YouTube title
|
||
# ────────────────────────────────────────────────────────────────
|
||
import re
|
||
|
||
# Fragments commonly found in upload filenames / YouTube titles that are not
# part of the artist or song title. Applied case-insensitively, in order.
_NOISE_PATTERNS = [
    r"\(Official\s+(?:Music\s+)?Video\)",
    r"\(Officia[lk]\s+Audio\)",
    r"\(Offizielles\s+(?:Musik)?[Vv]ideo\)",
    r"\(Lyric[s]?\s+Video\)",
    r"\(Audio\)",
    r"\(HD\)", r"\(HQ\)", r"\(4K\)",
    r"\(Live\)", r"\(Remix\)",
    r"\(Remastered\)", r"\(Remaster(?:ed)?\s*\d{0,4}\)",
    r"\[Official.*?\]", r"\[Music.*?\]", r"\[Audio.*?\]",
    r"\bofficial\s+video\b", r"\bofficial\s+audio\b",
    r"\boriginal\s+(?:video|audio)\b",
    r"\bMV\b", r"\b4K\b", r"\bHD\b", r"\bHQ\b",
]

# A trailing media-file extension (".mp4", ".MOV", ".webm", ...).
_MEDIA_EXT_RE = re.compile(
    r"\.(?:mp4|m4v|mov|mkv|webm|avi|wmv|flv|mpe?g|3gp|ts|mts"
    r"|mp3|m4a|wav|flac|aac|ogg|opus)$",
    re.IGNORECASE,
)

# Artist/title separators, tried in this order.
_ARTIST_TITLE_SEPARATORS = (" - ", " – ", " — ", " | ", " : ")

# Leading/trailing whitespace, dashes, pipes, dots, colons and underscores.
_EDGE_PUNCT_RE = re.compile(r'^[\s\-–—|.:_]+|[\s\-–—|.:_]+$')


def parse_artist_title(filename_or_title):
    """Extract ``(artist, title)`` from a filename or a YouTube title.

    Supported shapes:
      - "Artist - Title.mp4"
      - "Artist - Title (Official Music Video).mp4"
      - "Artist – Title" (en/em dash)
      - "Artist | Title", "Artist : Title"

    Known noise such as "(Official Video)" or "HD" is removed first. The
    first separator that yields a non-empty artist (<= 80 chars) and title
    (<= 100 chars) wins.

    Returns:
        ``(artist, title)``, or ``(None, None)`` when no pair can be found.
    """
    if not filename_or_title:
        return (None, None)

    # Strip only a real media extension. Path(...).stem was used before, but it
    # treats everything after the LAST dot as an extension and "/" as a path
    # separator. As a result, "The Killers - Mr. Brightside (Official Video)"
    # became ("The Killers", "Mr") and "AC/DC - ..." lost "AC/".
    name = _MEDIA_EXT_RE.sub("", filename_or_title)

    for pat in _NOISE_PATTERNS:
        name = re.sub(pat, "", name, flags=re.IGNORECASE)

    name = re.sub(r"\s+", " ", name).strip()

    for sep in _ARTIST_TITLE_SEPARATORS:
        if sep not in name:
            continue
        artist, title = name.split(sep, 1)
        artist = _EDGE_PUNCT_RE.sub("", artist.strip())
        title = _EDGE_PUNCT_RE.sub("", title.strip())
        if artist and title and len(artist) <= 80 and len(title) <= 100:
            return (artist, title)

    return (None, None)
|
||
|
||
|
||
def safe_filename(s, max_len=80):
    """Return *s* cleaned up for use as part of a file name.

    The function:
      - removes characters that are invalid in file names on common
        filesystems (``< > : " / \\ | ? *`` and ASCII control characters);
      - collapses whitespace runs into single spaces;
      - truncates the result to *max_len* characters.

    Returns ``""`` for empty or None input.
    """
    if not s:
        return ""
    # Remove (not replace) characters that would break the file system.
    s = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '', s)
    s = re.sub(r'\s+', ' ', s).strip()
    # Strip again after truncating: cutting between two words would otherwise
    # leave a trailing space ("Artist  - Title - REEL.mp4").
    return s[:max_len].rstrip()
|
||
|
||
|
||
def build_download_filename(job):
    """Build the download file name for a finished reel.

    Uses the job's ``parsed_artist`` / ``parsed_title`` when present. Missing
    parts come from ``parse_artist_title`` applied to ``filename`` (or
    ``youtube_title``).

    Returns one of "Artist - Title - REEL.mp4", "Title - REEL.mp4", or
    "reel_<id>.mp4" as a last resort.
    """
    artist = job.get("parsed_artist")
    title = job.get("parsed_title")

    # Fallback: parse from the original filename / YouTube title.
    if not artist or not title:
        source = job.get("filename") or job.get("youtube_title") or ""
        parsed_artist, parsed_title = parse_artist_title(source)
        artist = artist or parsed_artist
        title = title or parsed_title

    # Sanitize BEFORE choosing the format: a value made only of forbidden
    # characters becomes "" and must not produce " - Title - REEL.mp4".
    safe_artist = safe_filename(artist)
    safe_title = safe_filename(title)

    if safe_artist and safe_title:
        return f"{safe_artist} - {safe_title} - REEL.mp4"
    if safe_title:
        return f"{safe_title} - REEL.mp4"
    # Last resort: the job ID (ideally avoided by collecting a name at upload).
    return f"reel_{job['id']}.mp4"
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
|
||
# Job state (filesystem-based, persistent prek restartov)
|
||
# ────────────────────────────────────────────────────────────────
|
||
def job_path(job_id):
    """Return the JSON state-file path for *job_id* inside JOBS_DIR."""
    file_name = f"{job_id}.json"
    return JOBS_DIR / file_name
|
||
|
||
|
||
def load_job(job_id):
    """Load a job's state dict from disk, or return None if it does not exist.

    Raises:
        json.JSONDecodeError: if the file exists but is not valid JSON.
    """
    p = job_path(job_id)
    if not p.exists():
        return None
    # Explicit UTF-8: save_job writes with ensure_ascii=False (names such as
    # "Žena" are stored verbatim), so decoding must not depend on the locale.
    return json.loads(p.read_text(encoding="utf-8"))


def save_job(job):
    """Persist *job* (must contain "id") to its JSON file atomically.

    The JSON is written to a uniquely named temporary file in the same
    directory and then moved over the target with ``os.replace``. Readers
    (queue worker thread, API handlers, startup recovery) therefore see either
    the old or the new content, never a truncated, half-written file.
    """
    target = job_path(job["id"])
    # Leading "." and ".tmp" suffix keep the temp file out of "*.json" globs;
    # the uuid keeps concurrent writers of the same job from sharing a temp file.
    tmp = target.with_name(f".{target.name}.{uuid.uuid4().hex}.tmp")
    try:
        tmp.write_text(
            json.dumps(job, ensure_ascii=False, indent=2), encoding="utf-8"
        )
        os.replace(tmp, target)
    finally:
        # No-op after a successful replace; cleans up if writing failed.
        tmp.unlink(missing_ok=True)
|
||
|
||
|
||
def update_job(job_id, **kwargs):
    """Merge *kwargs* into the stored job, refresh ``updated_at`` and save.

    Returns the updated job dict, or None when the job does not exist.
    """
    record = load_job(job_id)
    if record:
        # Keyword arguments of dict.update are applied after the mapping, so
        # the fresh timestamp always wins over a caller-supplied "updated_at".
        record.update(kwargs, updated_at=time.time())
        save_job(record)
        return record
    return None
|
||
|
||
|
||
def list_jobs():
    """Return all stored job dicts, newest (by ``created_at``) first.

    Files that cannot be read or parsed are skipped with a log line instead
    of failing the whole listing. This covers a file deleted between glob and
    read, or corrupt JSON.
    """
    out = []
    for f in JOBS_DIR.glob("*.json"):
        try:
            data = json.loads(f.read_text(encoding="utf-8"))
        except (OSError, ValueError) as e:  # ValueError covers JSON/Unicode errors
            print(f"⚠️ list_jobs: skipping {f.name}: {e}", flush=True)
            continue
        if isinstance(data, dict):
            out.append(data)
    # File names are random hex IDs, so the previous reverse sort by name gave
    # an arbitrary order; sort by creation time instead.
    out.sort(key=lambda j: j.get("created_at") or 0, reverse=True)
    return out
|
||
|
||
|
||
def generate_srt_from_segments(segments, clip_start, clip_end, output_path):
    """Write an SRT file for the part of *segments* inside [clip_start, clip_end].

    Args:
        segments: transcript segments, each with "start", "end" and "text".
            A segment may also carry a "words" list of dicts with "start",
            "end" and "text".
        clip_start: clip start on the source timeline, in seconds.
        clip_end: clip end on the source timeline, in seconds.
        output_path: file the SRT is written to (UTF-8).

    Returns:
        *output_path*.

    Behaviour:
      - Timestamps are shifted so the trimmed clip starts at 0.
      - Segments longer than 15 s with fewer than 5 words are dropped as STT
        hallucinations (e.g. a 98 s segment containing only "sam" over an
        instrumental break).
      - A segment sticking out of the clip keeps only the words inside it when
        word timings exist; otherwise the segment is cut to the clip range.
      - Cues longer than 2.5 s are split into evenly timed chunks for reels
        pacing. A segment never gets more chunks than words, so no word is
        repeated.
      - All text is upper-cased.
    """
    MAX_CHUNK_DURATION = 2.5       # seconds per cue, reels pacing
    HALLUCINATION_MIN_DURATION = 15  # a segment longer than this ...
    HALLUCINATION_MAX_WORDS = 5      # ... with fewer words than this is dropped
    MIN_CUE_DURATION = 0.2         # shorter leftovers after clipping are skipped

    def fmt_ts(seconds):
        # Round to whole milliseconds BEFORE splitting into fields. Formatting
        # float seconds with "06.3f" printed e.g. 59.9996 as "00:00:60,000",
        # which is not a valid SRT timestamp.
        total_ms = int(round(seconds * 1000))
        hours, rem = divmod(total_ms, 3_600_000)
        minutes, rem = divmod(rem, 60_000)
        secs, millis = divmod(rem, 1000)
        return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"

    lines = []
    idx = 1

    for seg in segments:
        s_start = float(seg["start"])
        s_end = float(seg["end"])
        text = str(seg["text"]).strip()
        words = seg.get("words", []) or []

        # Only segments overlapping the clip matter.
        if s_end <= clip_start or s_start >= clip_end:
            continue

        # ── Hallucination filter ──
        # STT (Scribe, Whisper) sometimes hallucinates on long instrumentals
        # and returns a 60-100 s segment with 1-2 words. Such a segment must
        # not reach the SRT.
        seg_dur = s_end - s_start
        word_count = len(words) if words else len(text.split())
        if seg_dur > HALLUCINATION_MIN_DURATION and word_count < HALLUCINATION_MAX_WORDS:
            print(f"[SRT] Preskočil halucinacijski segment [{s_start:.1f}-{s_end:.1f}] "
                  f"({seg_dur:.1f}s, {word_count} besed): {text[:50]!r}", flush=True)
            continue

        if words and (s_start < clip_start or s_end > clip_end):
            # The segment partly sticks out of the clip: keep only the words
            # that overlap it. Otherwise the cue would show lyrics from the
            # neighbouring verse/chorus.
            words_in_clip = []
            for w in words:
                w_start = float(w.get("start", 0))
                w_end = float(w.get("end", 0))
                w_text = w.get("text", "").strip()
                if not w_text:
                    continue
                # A word belongs to the clip if it overlaps it at all.
                if w_end > clip_start and w_start < clip_end:
                    words_in_clip.append({
                        "start": max(w_start, clip_start),
                        "end": min(w_end, clip_end),
                        "text": w_text,
                    })

            if not words_in_clip:
                continue

            # Rebuild the segment from the real word-level timing.
            text = " ".join(w["text"] for w in words_in_clip)
            s_start = words_in_clip[0]["start"]
            s_end = words_in_clip[-1]["end"]
        else:
            # Clamp the segment to the clip range.
            s_start = max(s_start, clip_start)
            s_end = min(s_end, clip_end)

        if s_end - s_start < MIN_CUE_DURATION:
            continue
        if not text:
            continue

        # Re-map to the 0-based timeline of the trimmed video.
        rel_start = s_start - clip_start
        rel_end = s_end - clip_start
        text_upper = text.upper()
        duration = rel_end - rel_start

        if duration <= MAX_CHUNK_DURATION:
            lines.append(f"{idx}\n{fmt_ts(rel_start)} --> {fmt_ts(rel_end)}\n{text_upper}\n")
            idx += 1
            continue

        # Split into evenly timed chunks with an even share of the words.
        # Cap the chunk count at the word count: previously surplus chunks fell
        # back to the FULL text, so a one-word segment was shown in every
        # chunk ("SAM SAM SAM ...").
        chunk_words = text_upper.split()
        n_parts = max(1, min(int(duration / MAX_CHUNK_DURATION) + 1, len(chunk_words)))
        chunk_dur = duration / n_parts
        for i in range(n_parts):
            cs = rel_start + i * chunk_dur
            # Use the exact segment end for the last chunk (no float drift).
            ce = rel_end if i == n_parts - 1 else rel_start + (i + 1) * chunk_dur
            lo = i * len(chunk_words) // n_parts
            hi = (i + 1) * len(chunk_words) // n_parts
            chunk_text = " ".join(chunk_words[lo:hi]) or text_upper
            lines.append(f"{idx}\n{fmt_ts(cs)} --> {fmt_ts(ce)}\n{chunk_text}\n")
            idx += 1

    with open(output_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
    return output_path
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
|
||
# Pipeline runner (background task)
|
||
# ────────────────────────────────────────────────────────────────
|
||
def run_subprocess_logged(cmd, job_id, step_name):
    """Run *cmd* as one pipeline step and record the outcome on the job.

    The job is first marked "processing" with *step_name* as its current step.

    On a non-zero exit status the job becomes "failed", its ``error`` is set to
    "<step_name>: <tail of stderr+stdout>", and False is returned. On success
    any stderr tail is kept in ``last_step_log`` for diagnostics, and True is
    returned.
    """
    update_job(job_id, current_step=step_name, status="processing")
    result = subprocess.run(cmd, capture_output=True, text=True)
    stderr_text = result.stderr or ""
    stdout_text = result.stdout or ""

    if result.returncode == 0:
        if stderr_text.strip():
            update_job(job_id, last_step_log=stderr_text[-500:].strip())
        return True

    # Both streams together give the most useful failure context.
    combined = stderr_text + "\n" + stdout_text
    update_job(
        job_id,
        status="failed",
        error=f"{step_name}: {combined[-800:].strip()}",
    )
    return False
|
||
|
||
|
||
def _report_step_failure(job_id):
    """Finish bookkeeping for a job whose subprocess step just failed.

    ``run_subprocess_logged`` has already set status "failed" and ``error``.
    This helper sends the Telegram failure notice (best effort) and calls
    ``_try_finalize_batch``, as the other failure paths in ``process_job`` do.
    Without it, a batch whose last job failed in a step never got its summary.
    """
    try:
        from app.telegram import notify_job_failed
        failed_job = load_job(job_id)
        notify_job_failed(failed_job, (failed_job or {}).get("error", ""))
    except Exception as e:
        print(f"⚠️ TG notify_job_failed failed: {e}", flush=True)
    _try_finalize_batch(job_id)


def process_job(job_id):
    """Run the whole pipeline for one job.

    Steps:
      1. YouTube jobs only: download the source and try to parse
         artist/title from the video title.
      1b. If artist/title are still unknown: try ACRCloud recognition.
      2. If ``auto_chorus`` is set: run analyze.py to pick the clip range and
         fades, then build an SRT from its (possibly LLM-corrected) transcript.
      3. Run clip.py to reframe the video and add subtitles.

    Progress and results are stored on the job via ``update_job``. Every
    failure that stops the pipeline marks the job "failed", sends a Telegram
    notification (best effort) and checks whether the job's batch is complete.
    A failed analysis (step 2) only records ``chorus_error`` and continues.
    """
    job = load_job(job_id)
    if not job:
        return

    try:
        # ── 1. Source preparation ─────────────────────────────
        if job["source_type"] == "youtube":
            update_job(job_id, status="downloading", current_step="YouTube download")
            input_path = UPLOAD_DIR / f"{job_id}_yt.mp4"
            cmd = [
                "python3", str(SCRIPTS_DIR / "yt_download.py"),
                job["youtube_url"], str(input_path),
            ]
            if not run_subprocess_logged(cmd, job_id, "YouTube download"):
                _report_step_failure(job_id)
                return
            update_job(job_id, input_path=str(input_path))

            # Try to get the YouTube title for artist + title parsing.
            try:
                info_cmd = [
                    "python3", str(SCRIPTS_DIR / "yt_download.py"),
                    job["youtube_url"], "/dev/null", "--info-only",
                ]
                proc = subprocess.run(info_cmd, capture_output=True, text=True, timeout=30)
                if proc.returncode == 0 and proc.stdout:
                    info = json.loads(proc.stdout)
                    yt_title = info.get("title", "")
                    if yt_title:
                        a, t = parse_artist_title(yt_title)
                        updates = {"youtube_title": yt_title}
                        if a:
                            updates["parsed_artist"] = a
                        if t:
                            updates["parsed_title"] = t
                        updates["has_clean_name"] = bool(a and t)
                        update_job(job_id, **updates)
                        # Reload so later steps see the parsed names.
                        job = load_job(job_id)
            except Exception as e:
                print(f"⚠️ Cannot fetch YT title: {e}", flush=True)
        else:
            input_path = Path(job["input_path"])

        # ── 1b. Music recognition (ACRCloud) when artist+title are unknown ──
        # YouTube titles are not always usable either (e.g. "Track 5").
        if not (job.get("parsed_artist") and job.get("parsed_title")):
            update_job(job_id, current_step="Avto-prepoznavam pesem (ACRCloud)")
            try:
                acr_cmd = [
                    "python3", str(SCRIPTS_DIR / "acr_recognize.py"),
                    str(input_path),
                ]
                proc = subprocess.run(acr_cmd, capture_output=True, text=True, timeout=120)
                if proc.returncode == 0 and proc.stdout:
                    data = json.loads(proc.stdout)
                    a, t = data.get("artist"), data.get("title")
                    if a and t:
                        update_job(
                            job_id,
                            parsed_artist=a, parsed_title=t,
                            has_clean_name=True,
                            recognized_via="acrcloud",
                        )
                        job = load_job(job_id)
                        print(f"✅ ACR prepoznal: {a} - {t}", flush=True)
                    else:
                        print(f"⚠️ ACR ni prepoznal pesmi", flush=True)
                else:
                    print(f"⚠️ ACR exit {proc.returncode}: {proc.stderr[:200]}", flush=True)
            except Exception as e:
                print(f"⚠️ ACR error: {e}", flush=True)

        # ── 2. Smart analysis (when auto_chorus) ──────────────────────────
        if job.get("auto_chorus"):
            update_job(job_id, current_step="Analiza pesmi (transkript + energija)")
            analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json"
            cmd = [
                "python3", str(SCRIPTS_DIR / "analyze.py"),
                str(input_path),
                "--target-duration", str(job.get("duration", 30)),
                "--max-duration", str(job.get("max_duration", 45)),
                "--min-duration", str(job.get("min_duration", 20)),
                "--output", str(analysis_path),
            ]
            if job.get("include_prebuild"):
                cmd += ["--include-prebuild"]
            # LLM provider (claude/gemini/auto)
            if job.get("llm_provider"):
                cmd += ["--llm-provider", job["llm_provider"]]
            if job.get("llm_model"):
                cmd += ["--llm-model", job["llm_model"]]
            # Filename hint for the LLM/Scribe — prefer the cleaner parsed artist+title.
            if job.get("parsed_artist") and job.get("parsed_title"):
                fn_hint = f"{job['parsed_artist']} - {job['parsed_title']}"
                cmd += ["--filename-hint", fn_hint]
            elif job.get("filename"):
                fn_hint = Path(job["filename"]).stem
                cmd += ["--filename-hint", fn_hint]
            # STT provider (elevenlabs = Scribe, local = faster-whisper, auto = prefer Scribe)
            if job.get("whisper_provider"):
                cmd += ["--whisper-provider", job["whisper_provider"]]
            # lang: None or 'auto' lets analyze.py auto-detect
            if job.get("lang") and job["lang"] not in ("auto", ""):
                cmd += ["--lang", job["lang"]]
            cmd += ["--model", job.get("whisper_model", "large-v3")]

            proc = subprocess.run(cmd, capture_output=True, text=True)
            srt_from_claude = None  # path of the SRT built from the LLM-corrected transcript
            if proc.returncode == 0 and analysis_path.exists():
                try:
                    with open(analysis_path, "r", encoding="utf-8") as f:
                        analysis = json.load(f)
                    cr = analysis["clip_range"]
                    fade = analysis["fade"]

                    # Build the SRT from the transcript, TRIMMED to clip_range.
                    # (The LLM may have corrected the text — use the corrected one.)
                    if analysis.get("transcript", {}).get("segments"):
                        srt_path_out = OUTPUT_DIR / f"{job_id}.subtitles.srt"
                        try:
                            generate_srt_from_segments(
                                analysis["transcript"]["segments"],
                                cr["start"], cr["end"],
                                srt_path_out,
                            )
                            srt_from_claude = str(srt_path_out)
                            llm_src = cr.get("source", "LLM")
                            print(f"📝 Generated SRT from {llm_src} transcript: {srt_path_out}")
                        except Exception as e:
                            print(f"⚠️ SRT generation failed: {e}")

                    update_job(
                        job_id,
                        analysis_summary={
                            "language": analysis["language"],
                            "language_probability": analysis["language_probability"],
                            "instrumental": analysis["instrumental"],
                            "clip_range": cr,
                            "fade": fade,
                            "chorus_preview": analysis["chorus"]["best"]["text_preview"]
                            if analysis.get("chorus") and analysis["chorus"].get("best") else None,
                            "video_duration": analysis.get("video_duration"),
                            "candidates": analysis["chorus"].get("all_candidates", [])[:5]
                            if analysis.get("chorus") else [],
                            "claude_corrected_text": analysis.get("claude_corrected_text", False),
                        },
                        # The full transcript is stored for display in the UI.
                        full_transcript=[
                            {"start": s["start"], "end": s["end"], "text": s["text"]}
                            for s in analysis.get("transcript", {}).get("segments", [])
                        ],
                        start=cr["start"],
                        duration=cr["duration"],
                        fade_in=fade["fade_in"],
                        fade_out=fade["fade_out"],
                        detected_language=analysis["language"],
                        is_instrumental=analysis["instrumental"],
                        claude_srt_path=srt_from_claude,
                    )
                    # Instrumental tracks get no subtitles.
                    if analysis["instrumental"] and not job.get("no_subs"):
                        update_job(job_id, no_subs=True, auto_disabled_subs=True)
                    # Reload so step 3 uses the analysed range, fades and SRT path.
                    job = load_job(job_id)
                except (json.JSONDecodeError, KeyError) as e:
                    update_job(job_id, chorus_error=f"Analysis parse: {e}")
            else:
                update_job(job_id, chorus_error=(proc.stderr or "")[-500:])

        # ── 3. Reframe + subtitles (clip.py orchestrator) ─────
        output_path = OUTPUT_DIR / f"{job_id}.mp4"
        update_job(job_id, current_step="Reframe + subtitles")

        cmd = [
            "python3", str(SCRIPTS_DIR / "clip.py"),
            str(input_path), str(output_path),
            "--mode", job.get("mode", "track"),
            "--quality", job.get("quality", "medium"),
            "--style", job.get("subtitle_style", "reels"),
        ]
        if job.get("start") is not None:
            cmd += ["--start", str(job["start"])]
        if job.get("duration") is not None:
            cmd += ["--duration", str(job["duration"])]
        if job.get("fade_in", 0) > 0:
            cmd += ["--fade-in", str(job["fade_in"])]
        if job.get("fade_out", 0) > 0:
            cmd += ["--fade-out", str(job["fade_out"])]
        # SRT from the LLM-corrected transcript (better text) goes straight to subtitle.py
        if job.get("claude_srt_path") and Path(job["claude_srt_path"]).exists() and not job.get("no_subs"):
            cmd += ["--srt", job["claude_srt_path"]]
        # lang: prefer the detected language when set to auto
        chosen_lang = job.get("lang")
        if chosen_lang in (None, "auto", ""):
            chosen_lang = job.get("detected_language")
        if chosen_lang:
            cmd += ["--lang", chosen_lang]
        if job.get("no_subs"):
            cmd += ["--no-subs"]
        cmd += ["--model", job.get("whisper_model", "large-v3")]

        # DEBUG: record the exact command that is executed
        update_job(job_id, debug_clip_cmd=" ".join(cmd))

        if not run_subprocess_logged(cmd, job_id, "Reframe + subtitles"):
            _report_step_failure(job_id)
            return

        # ── Done ──────────────────────────────────────────────
        if output_path.exists():
            update_job(
                job_id,
                status="done",
                current_step="Končano",
                output_path=str(output_path),
                output_size_mb=round(output_path.stat().st_size / 1024 / 1024, 2),
            )
            # Telegram notification
            try:
                from app.telegram import notify_job_done
                final_job = load_job(job_id)
                notify_job_done(final_job)
            except Exception as e:
                print(f"⚠️ TG notify_job_done failed: {e}", flush=True)

            # Batch tracking — send the summary if this was the last job of its batch
            _try_finalize_batch(job_id)
        else:
            update_job(
                job_id,
                status="failed",
                error="Output datoteka ne obstaja po obdelavi",
            )
            try:
                from app.telegram import notify_job_failed
                notify_job_failed(load_job(job_id), "Output datoteka ne obstaja")
            except Exception:
                pass
            _try_finalize_batch(job_id)
    except Exception as e:
        update_job(job_id, status="failed", error=str(e))
        try:
            from app.telegram import notify_job_failed
            notify_job_failed(load_job(job_id), str(e))
        except Exception:
            pass
        _try_finalize_batch(job_id)
|
||
|
||
|
||
def _try_finalize_batch(job_id):
    """Send the Telegram batch summary once every job of *job_id*'s batch ended.

    A job counts as finished when its status is "done", "failed" or "error".
    The startup recovery has marked unrecoverable jobs "error"; before this
    change such jobs counted as unfinished, so the batch summary was never sent.
    Unreadable job files are skipped (via ``list_jobs``) rather than aborting
    the check.

    Best effort: errors are printed, never raised.
    """
    finished_statuses = ("done", "failed", "error")
    try:
        job = load_job(job_id)
        batch_id = (job or {}).get("batch_id")
        if not batch_id:
            return

        batch_jobs = [
            j for j in list_jobs()
            if isinstance(j, dict) and j.get("batch_id") == batch_id
        ]
        if any(j.get("status") not in finished_statuses for j in batch_jobs):
            return  # not everything is finished yet

        # Everything finished — send the summary.
        total = len(batch_jobs)
        succeeded = sum(1 for j in batch_jobs if j.get("status") == "done")
        failed = total - succeeded
        try:
            from app.telegram import notify_batch_complete
            notify_batch_complete(batch_id, total, succeeded, failed)
        except Exception as e:
            print(f"⚠️ TG batch summary failed: {e}", flush=True)
    except Exception as e:
        print(f"⚠️ _try_finalize_batch error: {e}", flush=True)
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
|
||
# FastAPI app
|
||
# ────────────────────────────────────────────────────────────────
|
||
_STATIC_DIR = Path(__file__).parent.parent / "static"

app = FastAPI(title="Reels Clipper")
# Serve the "static" directory (two levels above this file) under /static.
app.mount("/static", StaticFiles(directory=_STATIC_DIR), name="static")
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
|
||
# Queue worker — procesira queued jobe enega za drugim
|
||
# ────────────────────────────────────────────────────────────────
|
||
import threading
|
||
_queue_lock = threading.Lock()
|
||
_queue_running = {"current_job": None} # tracked v dict da je accessible iz threadov
|
||
|
||
|
||
def _queue_worker():
    """Background loop that runs "queued" jobs one at a time, forever.

    The oldest queued job (by ``created_at``) runs first. When there is
    nothing to do, the loop sleeps 3 s. An unexpected error in the loop itself
    is printed and followed by a 5 s pause.
    """
    print("🚜 Queue worker zagnan", flush=True)
    while True:
        try:
            active_id = _queue_running["current_job"]
            if active_id:
                active = load_job(active_id)
                if active and active.get("status") in ("processing", "downloading"):
                    time.sleep(3)
                    continue
                # The tracked job is no longer running → release the slot.
                _queue_running["current_job"] = None

            candidates = []
            for job_file in JOBS_DIR.glob("*.json"):
                try:
                    data = json.loads(job_file.read_text())
                    if data.get("status") == "queued":
                        candidates.append(data)
                except Exception:
                    continue

            if not candidates:
                time.sleep(3)
                continue

            # FIFO: the first job with the smallest created_at.
            next_id = min(candidates, key=lambda j: j.get("created_at", 0))["id"]

            with _queue_lock:
                _queue_running["current_job"] = next_id

            print(f"🚜 Queue worker: obdelujem {next_id}", flush=True)
            try:
                process_job(next_id)
            except Exception as e:
                print(f"❌ Queue worker error pri {next_id}: {e}", flush=True)
                update_job(next_id, status="failed", error=f"Queue worker: {e}")
            finally:
                with _queue_lock:
                    _queue_running["current_job"] = None
        except Exception as e:
            print(f"❌ Queue worker outer error: {e}", flush=True)
            time.sleep(5)
|
||
|
||
|
||
# Zaženi worker v ozadju (samo enkrat)
|
||
# Handle of the background queue-worker thread (None until first started).
_worker_thread = None


def _start_queue_worker():
    """Start the daemon queue-worker thread unless one is already alive."""
    global _worker_thread
    already_running = _worker_thread is not None and _worker_thread.is_alive()
    if already_running:
        return
    _worker_thread = threading.Thread(target=_queue_worker, daemon=True)
    _worker_thread.start()
|
||
|
||
|
||
@app.on_event("startup")
async def resume_or_cleanup_jobs():
    """Recover jobs interrupted by a container restart, then start the queue worker.

    When Coolify deploys a new container, the old one is killed mid-job and
    its job files stay in status "processing" or "downloading". For each such
    job:

      - after 3 resume attempts it is marked "failed";
      - an upload job whose input file is gone is marked "failed";
      - otherwise it is set back to "queued" (``resume_attempts`` + 1).

    Re-queued jobs restart the pipeline from the beginning; the queue worker
    started at the end of this function picks them up.
    """
    print("🔄 Preverjam in obnavljam jobs po restart-u...")
    resumed_count = 0
    error_count = 0

    def _write(path, data):
        data["updated_at"] = time.time()
        path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")

    for f in JOBS_DIR.glob("*.json"):
        try:
            j = json.loads(f.read_text(encoding="utf-8"))
            # process_job uses "downloading" for YouTube downloads; without it
            # a job killed mid-download would stay in that state forever.
            if j.get("status") not in ("processing", "downloading"):
                continue

            # Job files store their ID under "id" (the file stem is the same ID).
            job_id = j.get("id") or f.stem
            attempts = j.get("resume_attempts", 0)

            # Give up after 3 failed restarts. "failed" (not "error") is the
            # status the rest of the pipeline and _try_finalize_batch treat as
            # finished.
            if attempts >= 3:
                j["status"] = "failed"
                j["current_step"] = "Preveč napak pri ponovnem zagonu"
                j["chorus_error"] = f"Job restartal {attempts} krat — napaka v pipeline-u"
                j["error"] = j["chorus_error"]
                _write(f, j)
                error_count += 1
                continue

            # YouTube jobs re-download their source, so only uploads need an
            # existing input file. Use the stored input_path: uploads keep
            # their original extension, so "<id>.mp4" is only a fallback guess.
            if j.get("source_type") != "youtube":
                input_path = Path(j.get("input_path") or UPLOAD_DIR / f"{job_id}.mp4")
                if not input_path.exists():
                    j["status"] = "failed"
                    j["current_step"] = "Vhodna datoteka ne obstaja"
                    j["chorus_error"] = f"Upload {input_path.name} ne obstaja po restart-u"
                    j["error"] = j["chorus_error"]
                    _write(f, j)
                    error_count += 1
                    continue

            # Resume: back to "queued", +1 attempt.
            # Do NOT also schedule process_job here — the queue worker runs every
            # "queued" job, so doing both processed the same job twice in parallel.
            j["status"] = "queued"
            j["resume_attempts"] = attempts + 1
            j["current_step"] = f"Avto-resume po restart-u (poskus {attempts + 1}/3)"
            j["last_resume_at"] = time.time()
            _write(f, j)
            resumed_count += 1
            print(f" 🔁 Resume {job_id} (attempt {attempts + 1}/3)")
        except Exception as e:
            print(f" ⚠️ Napaka pri {f.name}: {e}")

    print(f" ✅ Resumed: {resumed_count}, Error: {error_count}")

    # Start the queue worker; it processes resumed and newly queued jobs.
    _start_queue_worker()
|
||
|
||
|
||
async def _resume_job_async(job_id):
    """Run the synchronous ``process_job(job_id)`` in the default thread pool.

    Waits 2 s first so application startup can complete. Exceptions are
    printed, not raised.
    """
    # asyncio is imported at module level; the local re-import was redundant.
    await asyncio.sleep(2)
    try:
        # process_job blocks (subprocesses, file I/O), so keep it off the event loop.
        # get_running_loop() is the documented call inside a coroutine.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, process_job, job_id)
    except Exception as e:
        print(f" ❌ Resume failed for {job_id}: {e}", flush=True)
|
||
|
||
|
||
@app.get("/", response_class=HTMLResponse)
async def index(user: str = Depends(check_auth)):
    """Serve the frontend page (templates/index.html); requires Basic auth."""
    template = Path(__file__).parent.parent / "templates" / "index.html"
    # Explicit UTF-8 so decoding the (possibly non-ASCII) template does not
    # depend on the container's locale.
    return template.read_text(encoding="utf-8")
|
||
|
||
|
||
@app.get("/healthz")
async def healthz():
    """Unauthenticated liveness probe that always reports OK."""
    payload = {"ok": True}
    return payload
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
|
||
# Job models
|
||
# ────────────────────────────────────────────────────────────────
|
||
# Request body for POST /api/youtube. submit_youtube copies these fields onto
# the job, and process_job reads them.
# NOTE(review): unlike StartJobIn there is no llm_provider, llm_model,
# whisper_provider, include_prebuild, max/min_duration or batch_id here, so
# YouTube jobs always use process_job's defaults for those — confirm intended.
class YouTubeJobIn(BaseModel):
    url: str  # passed to scripts/yt_download.py
    mode: str = "track"  # clip.py --mode
    lang: Optional[str] = None  # None/"auto" → language is auto-detected
    auto_chorus: bool = True  # run analyze.py to choose the clip range
    start: Optional[float] = None  # clip start (s); replaced by the analysed range on successful auto analysis
    duration: Optional[float] = 30  # target clip length (s)
    no_subs: bool = False  # render without subtitles
    subtitle_style: str = "reels"  # clip.py --style
    whisper_model: str = "large-v3"  # passed as --model to analyze.py / clip.py
    quality: str = "medium"  # clip.py --quality
|
||
|
||
|
||
# Request body for POST /api/process: processing options for an uploaded job.
class StartJobIn(BaseModel):
    job_id: str
    mode: str = "track"
    lang: Optional[str] = None  # None/auto = Whisper auto-detect
    auto_chorus: bool = True
    include_prebuild: bool = False  # include the pre-chorus build-up
    start: Optional[float] = None
    duration: Optional[float] = 30
    max_duration: Optional[float] = 45
    min_duration: Optional[float] = 20
    no_subs: bool = False
    subtitle_style: str = "reels"
    whisper_model: str = "large-v3"
    quality: str = "medium"
    # LLM used for semantic analysis + transcript corrections
    llm_provider: str = "claude"  # claude / gemini / auto
    llm_model: Optional[str] = None  # specific model (default: the best one for the provider)
    # STT provider (original note: Scribe is ~18x faster with better multilingual accuracy)
    whisper_provider: str = "auto"  # auto / elevenlabs / local
    # Batch tracking for multi-upload (Telegram summary)
    batch_id: Optional[str] = None
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
|
||
# Upload (file)
|
||
# ────────────────────────────────────────────────────────────────
|
||
@app.post("/api/upload")
async def upload_video(
    file: UploadFile = File(...),
    artist: Optional[str] = Form(None),
    title: Optional[str] = Form(None),
    batch_id: Optional[str] = Form(None),
    user: str = Depends(check_auth),
):
    """Store an uploaded video and create a job in status "uploaded".

    The file is streamed to ``UPLOAD_DIR/<job_id><ext>`` in 1 MiB chunks.
    User-supplied artist + title (both non-blank) are used as the clean name;
    otherwise the name is parsed from the filename. Processing starts later
    via POST /api/process.

    Raises:
        HTTPException: 400 without a filename; 413 above MAX_UPLOAD_MB.
    """
    if not file.filename:
        raise HTTPException(400, "Brez imena")

    job_id = uuid.uuid4().hex[:12]
    ext = Path(file.filename).suffix or ".mp4"
    input_path = UPLOAD_DIR / f"{job_id}{ext}"

    limit_bytes = MAX_UPLOAD_MB * 1024 * 1024
    size = 0
    try:
        with input_path.open("wb") as f:
            while chunk := await file.read(1024 * 1024):
                size += len(chunk)
                if size > limit_bytes:
                    raise HTTPException(413, f"Prevelika datoteka (limit {MAX_UPLOAD_MB} MB)")
                f.write(chunk)
    except BaseException:
        # Remove the partial file on ANY interruption (size limit, client
        # disconnect, task cancellation), not only on the size limit.
        input_path.unlink(missing_ok=True)
        raise

    job = {
        "id": job_id,
        "source_type": "upload",
        "filename": file.filename,
        "input_path": str(input_path),
        "size_mb": round(size / 1024 / 1024, 2),
        "status": "uploaded",
        "current_step": "Naloženo, čaka na obdelavo",
        "created_at": time.time(),
        "updated_at": time.time(),
    }

    if batch_id:
        job["batch_id"] = batch_id

    # Artist + title: user-provided first, then parsed from the filename.
    # Strip BEFORE checking, so whitespace-only form values don't count as a
    # "clean name" (they used to be stored as "" with has_clean_name=True).
    artist = (artist or "").strip()
    title = (title or "").strip()
    if artist and title:
        job["parsed_artist"] = artist
        job["parsed_title"] = title
        job["has_clean_name"] = True
    else:
        a, t = parse_artist_title(file.filename)
        if a:
            job["parsed_artist"] = a
        if t:
            job["parsed_title"] = t
        job["has_clean_name"] = bool(a and t)

    save_job(job)
    return job
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
# YouTube submit
# ────────────────────────────────────────────────────────────────
@app.post("/api/youtube")
async def submit_youtube(
    payload: YouTubeJobIn,
    background: BackgroundTasks,
    user: str = Depends(check_auth),
):
    """Register a YouTube URL as a new queued job.

    Nothing is downloaded here. The job is persisted with status
    ``"queued"``, and its processing options are copied from *payload*. The
    queue worker picks it up later. ``background`` is accepted but not used.
    """
    copied_options = (
        "mode",
        "lang",
        "auto_chorus",
        "start",
        "duration",
        "no_subs",
        "subtitle_style",
        "whisper_model",
        "quality",
    )

    new_id = uuid.uuid4().hex[:12]
    record = {
        "id": new_id,
        "source_type": "youtube",
        "youtube_url": payload.url,
        "status": "queued",
        "current_step": "V vrsti za YouTube prenos",
        "created_at": time.time(),
        "updated_at": time.time(),
    }
    record.update({option: getattr(payload, option) for option in copied_options})

    save_job(record)
    # The background queue worker will pick this job up.
    return record


# ────────────────────────────────────────────────────────────────
# Start processing for uploaded job
# ────────────────────────────────────────────────────────────────
@app.post("/api/process")
async def start_processing(
    payload: StartJobIn,
    background: BackgroundTasks,
    user: str = Depends(check_auth),
):
    """Queue an existing job for processing using the options in *payload*.

    The job is only marked ``"queued"`` and updated with the requested
    settings. It is not run here: the background queue worker picks it up,
    so several uploads are processed one at a time. Leftover errors from a
    previous attempt (``chorus_error``, ``interrupted_at``) are cleared so
    the endpoint can be used for retries.

    Raises:
        HTTPException(404): no job with ``payload.job_id`` exists.
    """
    if not load_job(payload.job_id):
        raise HTTPException(404, "Job ne obstaja")

    forwarded = (
        "mode",
        "lang",
        "auto_chorus",
        "include_prebuild",
        "start",
        "duration",
        "max_duration",
        "min_duration",
        "no_subs",
        "subtitle_style",
        "whisper_model",
        "quality",
        "llm_provider",
        "llm_model",
        "whisper_provider",
        "batch_id",
    )
    settings = {name: getattr(payload, name) for name in forwarded}

    update_job(
        payload.job_id,
        status="queued",
        **settings,
        current_step="V vrsti za obdelavo",
        # Clear errors from an earlier run (retry-friendly).
        chorus_error=None,
        interrupted_at=None,
    )
    return load_job(payload.job_id)


# ────────────────────────────────────────────────────────────────
# Job queries
# ────────────────────────────────────────────────────────────────
@app.get("/api/jobs")
async def get_jobs(user: str = Depends(check_auth)):
    """Return all jobs wrapped as ``{"jobs": [...]}``."""
    all_jobs = list_jobs()
    return {"jobs": all_jobs}


@app.get("/api/jobs/{job_id}")
async def get_job(job_id: str, user: str = Depends(check_auth)):
    """Return a single job record, or 404 if it does not exist."""
    record = load_job(job_id)
    if record:
        return record
    raise HTTPException(404, "Ne obstaja")


@app.get("/api/stream/{job_id}")
async def stream_job(job_id: str, user: str = Depends(check_auth)):
    """Server-Sent Events stream of a job's status.

    Polls the stored job once per second and emits one ``data:`` event
    (the whole job as JSON) whenever ``status`` or ``current_step``
    changes. The stream ends when:

    - the job reaches ``"done"`` or ``"failed"``;
    - the job no longer exists (a single ``{"error": "not found"}`` event);
    - 600 polls have elapsed (about 10 minutes).
    """

    async def gen():
        last_status = None
        last_step = None
        for _ in range(600):  # max ~10 min stream
            job = load_job(job_id)
            if not job:
                yield f"data: {json.dumps({'error': 'not found'})}\n\n"
                return
            status = job.get("status")
            step = job.get("current_step")
            if status != last_status or step != last_step:
                yield f"data: {json.dumps(job, ensure_ascii=False)}\n\n"
                last_status, last_step = status, step
            if status in ("done", "failed"):
                return
            await asyncio.sleep(1)

    return StreamingResponse(
        gen(),
        media_type="text/event-stream",
        # Keep browsers and reverse proxies from caching or buffering the
        # event stream. X-Accel-Buffering is honoured by nginx and ignored
        # by everything else.
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )


# ────────────────────────────────────────────────────────────────
# Download / preview
# ────────────────────────────────────────────────────────────────
@app.get("/api/download/{job_id}")
async def download(job_id: str, user: str = Depends(check_auth)):
    """Send the finished reel as an attachment.

    The attachment name comes from ``build_download_filename``
    (e.g. "Artist - Title - REEL.mp4").

    Raises:
        HTTPException(404): the job is missing, not ``"done"``, or its
            output file is gone.
    """
    record = load_job(job_id)
    is_ready = bool(record) and record.get("status") == "done"
    if not is_ready:
        raise HTTPException(404, "Ne pripravljen")

    reel_file = Path(record["output_path"])
    if not reel_file.exists():
        raise HTTPException(404, "Output ne obstaja")

    return FileResponse(
        reel_file,
        media_type="video/mp4",
        filename=build_download_filename(record),
    )


def _parse_byte_range(range_header: str, file_size: int) -> Optional[tuple]:
    """Parse a single-range HTTP ``Range`` header of unit ``bytes``.

    Supported forms:

    - ``bytes=START-END``
    - ``bytes=START-`` (open-ended)
    - ``bytes=-N`` (suffix: the last N bytes)

    Args:
        range_header: raw value of the ``Range`` request header.
        file_size: size of the served file in bytes.

    Returns:
        ``(start, end)`` as inclusive byte offsets clamped to the file, or
        ``None`` when the range cannot be satisfied (the caller answers 416).

    Raises:
        ValueError: the header is malformed or unsupported (another unit,
            several ranges, non-numeric bounds). The caller should ignore it.
    """
    unit, sep, spec = range_header.strip().partition("=")
    if not sep or unit.strip().lower() != "bytes":
        raise ValueError(f"unsupported Range header: {range_header!r}")
    # Exactly one "-" is required; multi-range specs fail to unpack.
    start_s, end_s = (part.strip() for part in spec.split("-"))

    if not start_s:
        # Suffix form "-N": the final N bytes, not bytes 0..N.
        if not end_s:
            raise ValueError(f"empty byte range: {range_header!r}")
        suffix_len = int(end_s)
        if suffix_len <= 0:
            return None
        start = max(file_size - suffix_len, 0)
        end = file_size - 1
    else:
        start = int(start_s)
        end = min(int(end_s) if end_s else file_size - 1, file_size - 1)

    if start > end or start >= file_size:
        return None
    return start, end


@app.get("/api/preview/{job_id}")
async def preview(job_id: str, request: Request, user: str = Depends(check_auth)):
    """Stream the finished reel with HTTP Range support.

    HTML5 video players need Range support to seek. A satisfiable single
    range gets a 206 partial response, and an unsatisfiable one gets a 416.
    A missing or malformed Range header gets the whole file.

    Raises:
        HTTPException(404): the job is missing, not ``"done"``, or its
            output file is gone.
    """
    job = load_job(job_id)
    if not job or job.get("status") != "done":
        raise HTTPException(404, "Ne pripravljen")
    out = Path(job["output_path"])
    if not out.exists():
        raise HTTPException(404, "Output ne obstaja")

    file_size = out.stat().st_size
    # Starlette headers are case-insensitive; one lookup covers "Range".
    range_header = request.headers.get("range")

    if range_header:
        try:
            byte_range = _parse_byte_range(range_header, file_size)
        except ValueError:
            pass  # Malformed/unsupported Range: ignore it and send the whole file.
        else:
            if byte_range is None:
                # Range Not Satisfiable; the RFC asks for the full size here.
                return Response(
                    status_code=416,
                    headers={"Content-Range": f"bytes */{file_size}"},
                )

            start, end = byte_range
            chunk_size = end - start + 1

            def iter_file():
                with open(out, "rb") as f:
                    f.seek(start)
                    remaining = chunk_size
                    while remaining > 0:
                        data = f.read(min(64 * 1024, remaining))
                        if not data:
                            break
                        remaining -= len(data)
                        yield data

            headers = {
                "Content-Range": f"bytes {start}-{end}/{file_size}",
                "Accept-Ranges": "bytes",
                "Content-Length": str(chunk_size),
                "Content-Type": "video/mp4",
            }
            return StreamingResponse(iter_file(), status_code=206, headers=headers,
                                     media_type="video/mp4")

    # No (usable) Range: return the whole file.
    return FileResponse(
        out,
        media_type="video/mp4",
        headers={"Accept-Ranges": "bytes", "Content-Length": str(file_size)},
    )


@app.delete("/api/jobs/{job_id}")
async def delete_job(job_id: str, user: str = Depends(check_auth)):
    """Delete a job's input/output files and its job record.

    Raises:
        HTTPException(404): no such job.
    """
    record = load_job(job_id)
    if not record:
        raise HTTPException(404, "Ne obstaja")

    stored = [record.get(key) for key in ("input_path", "output_path")]
    for raw_path in filter(None, stored):
        target = Path(raw_path)
        if target.exists():
            target.unlink(missing_ok=True)

    job_path(job_id).unlink(missing_ok=True)
    return {"deleted": job_id}