Changes:
1. Frontend multi-upload:
- File input now has 'multiple' attribute, drag-drop accepts multiple
- File queue list with per-file artist/title preview + remove button
- 'Pošlji vse' uploads sequentially (one at a time to avoid network saturation)
- Each file gets same batch_id for Telegram batch summary
- After upload, queue clears, jobs appear in right sidebar
2. Backend queue worker:
- New _queue_worker() background thread processes 'queued' jobs sequentially
- Only 1 job at a time to keep openclaw stable (avoid CPU/RAM thrash)
- FIFO order by created_at
- Auto-starts on app startup after job resume
3. Job submission flow change:
- /api/process and /api/youtube no longer call background.add_task directly
- Just mark status='queued', queue worker picks up
- This means upload completes fast, processing happens in background
- User can close browser, jobs continue
4. Telegram notifications (FOLX Alerts bot):
- Per-job: 'Reel pripravljen: Lady Gaga - Abracadabra (29s, 30 MB)'
- Per-job failed: 'Reel ni uspel: <name> + error message'
- Batch summary: 'Batch končan: 10/10 reels pripravljeni' (only if >1 in batch)
- Uses existing TELEGRAM_TOKEN + TELEGRAM_CHAT_ID env vars
- app/telegram.py module with notify_job_done(), notify_job_failed(),
notify_batch_complete()
5. batch_id field:
- Added to Job model + StartJobIn pydantic
- Saved during upload + process
- Used to count batch progress and trigger summary notification
User experience:
- Drag 20 videos at once
- Click 'Pošlji'
- Close browser, go grab coffee
- Telegram sends 'Reel pripravljen' for each
- After all done: 'Batch končan: 20/20 reels pripravljeni' summary
- Open app to download all
1045 lines
42 KiB
Python
1045 lines
42 KiB
Python
"""
|
||
reels.biba.live — FastAPI backend.
|
||
|
||
Endpoints:
|
||
GET / — frontend HTML
|
||
POST /api/upload — naloži video file
|
||
POST /api/youtube — submit YouTube URL
|
||
POST /api/process — start processing job (job_id in JSON body)
|
||
GET /api/jobs — list vseh jobov
|
||
GET /api/jobs/{id} — status job-a
|
||
GET /api/stream/{id} — SSE progress stream
|
||
GET /api/download/{id} — download finalni reel
|
||
GET /api/preview/{id} — preview video stream
|
||
DELETE /api/jobs/{id} — pobriši job + datoteke
|
||
"""
|
||
import asyncio
|
||
import json
|
||
import os
|
||
import secrets
|
||
import shutil
|
||
import subprocess
|
||
import time
|
||
import uuid
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
from fastapi import (
|
||
FastAPI, UploadFile, File, Form, HTTPException, Depends,
|
||
BackgroundTasks, Request, status
|
||
)
|
||
from fastapi.responses import (
|
||
FileResponse, HTMLResponse, StreamingResponse, JSONResponse, Response
|
||
)
|
||
from fastapi.staticfiles import StaticFiles
|
||
from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
||
from pydantic import BaseModel
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
# Config
# ────────────────────────────────────────────────────────────────
# Root of all persistent state; overridable through the DATA_DIR env var.
DATA_DIR = Path(os.environ.get("DATA_DIR", "/data"))
# Source videos: uploaded files and YouTube downloads are written here.
UPLOAD_DIR = DATA_DIR / "uploads"
# Pipeline results: the rendered reel, the analysis JSON and the generated SRT.
OUTPUT_DIR = DATA_DIR / "outputs"
# One "<job_id>.json" state file per job.
JOBS_DIR = DATA_DIR / "jobs"
# Helper scripts that the pipeline launches with "python3" as subprocesses.
SCRIPTS_DIR = Path(__file__).parent.parent / "scripts"

# Create the data directories at import time so later code can assume they exist.
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
JOBS_DIR.mkdir(parents=True, exist_ok=True)

# HTTP Basic credentials checked by check_auth().
# NOTE(review): both have hard-coded fallbacks — confirm the deployment always sets them.
AUTH_USER = os.environ.get("AUTH_USER", "sebastjan")
AUTH_PASS = os.environ.get("AUTH_PASS", "change-me-in-coolify-env")

# Upload size limit in megabytes, enforced while streaming in the upload endpoint.
MAX_UPLOAD_MB = int(os.environ.get("MAX_UPLOAD_MB", "2000"))
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
|
||
# Auth
|
||
# ────────────────────────────────────────────────────────────────
|
||
security = HTTPBasic()


def check_auth(creds: HTTPBasicCredentials = Depends(security)):
    """Validate HTTP Basic credentials against AUTH_USER / AUTH_PASS.

    Returns the authenticated username. Raises HTTPException(401) with a
    ``WWW-Authenticate: Basic`` header when either value does not match.
    """
    # compare_digest() only accepts str arguments that are pure ASCII; a
    # non-ASCII username or password would raise TypeError and surface as a
    # 500. Comparing the UTF-8 bytes keeps the constant-time comparison and
    # works for any input.
    user_ok = secrets.compare_digest(
        creds.username.encode("utf-8"), AUTH_USER.encode("utf-8")
    )
    pass_ok = secrets.compare_digest(
        creds.password.encode("utf-8"), AUTH_PASS.encode("utf-8")
    )
    # Both comparisons always run, so timing does not reveal which one failed.
    if not (user_ok and pass_ok):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Napačno geslo",
            headers={"WWW-Authenticate": "Basic"},
        )
    return creds.username
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
|
||
# Artist + title parsing iz filename / YouTube title
|
||
# ────────────────────────────────────────────────────────────────
|
||
import re
|
||
|
||
_NOISE_PATTERNS = [
    # Common "noise" fragments that have to be removed before parsing
    r"\(Official\s+(?:Music\s+)?Video\)",
    r"\(Officia[lk]\s+Audio\)",
    r"\(Offizielles\s+(?:Musik)?[Vv]ideo\)",
    r"\(Lyric[s]?\s+Video\)",
    r"\(Audio\)",
    r"\(HD\)", r"\(HQ\)", r"\(4K\)",
    r"\(Live\)", r"\(Remix\)",
    r"\(Remastered\)", r"\(Remaster(?:ed)?\s*\d{0,4}\)",
    r"\[Official.*?\]", r"\[Music.*?\]", r"\[Audio.*?\]",
    r"\bofficial\s+video\b", r"\bofficial\s+audio\b",
    r"\boriginal\s+(?:video|audio)\b",
    r"\bMV\b", r"\b4K\b", r"\bHD\b", r"\bHQ\b",
]

# A real file extension: a dot followed by 1-5 alphanumerics at the very end.
_FILE_EXT_RE = re.compile(r"\.[A-Za-z0-9]{1,5}$")
# Separator/punctuation debris at either edge of a parsed part.
_EDGE_PUNCT_RE = re.compile(r'^[\s\-–—|.:_]+|[\s\-–—|.:_]+$')


def parse_artist_title(filename_or_title):
    """Extract ``(artist, title)`` from a file name or a YouTube title.

    Supported shapes:
      - "Artist - Title.mp4"
      - "Artist - Title (Official Music Video).mp4"
      - "Artist – Title" / "Artist — Title" (en/em dash)
      - "Artist | Title", "Artist : Title"

    Returns ``(None, None)`` when no artist/title pair can be recognised.
    """
    if not filename_or_title:
        return (None, None)

    # Drop a trailing file extension only. Path(...).stem is not suitable for
    # free-form titles: it cuts "Mr. Big - To Be With You" down to "Mr" and
    # discards everything before a "/" (e.g. "AC/DC - ... .mp4").
    name = _FILE_EXT_RE.sub("", filename_or_title)

    # Remove noise fragments, one pattern at a time, case-insensitively.
    for pat in _NOISE_PATTERNS:
        name = re.sub(pat, "", name, flags=re.IGNORECASE)

    # Normalise whitespace.
    name = re.sub(r"\s+", " ", name).strip()

    # Try the separators in priority order; split on the first occurrence only.
    for sep in (" - ", " – ", " — ", " | ", " : "):
        if sep not in name:
            continue
        raw_artist, raw_title = name.split(sep, 1)
        # Strip leading/trailing punctuation left over from the separator.
        artist = _EDGE_PUNCT_RE.sub("", raw_artist.strip())
        title = _EDGE_PUNCT_RE.sub("", raw_title.strip())
        # The length caps reject strings that are clearly not an artist/title pair.
        if artist and title and len(artist) <= 80 and len(title) <= 100:
            return (artist, title)

    return (None, None)
|
||
|
||
|
||
def safe_filename(s, max_len=80):
    """Return `s` reduced to a file-system-safe name of at most `max_len` characters.

    Characters that are illegal or troublesome in file names are removed,
    whitespace runs collapse to a single space, and the ends are trimmed
    before truncation. Falsy input yields an empty string.
    """
    if s:
        without_bad_chars = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '', s)
        collapsed = re.sub(r'\s+', ' ', without_bad_chars).strip()
        return collapsed[:max_len]
    return ""
|
||
|
||
|
||
def build_download_filename(job):
    """Build the download file name for a finished reel from the job metadata.

    Uses the stored ``parsed_artist`` / ``parsed_title`` first and falls back
    to parsing ``filename`` or ``youtube_title``. Returns
    "<artist> - <title> - REEL.mp4", "<title> - REEL.mp4", or
    "reel_<id>.mp4" when no usable name is available.
    """
    artist = job.get("parsed_artist")
    title = job.get("parsed_title")

    # Fallback: parse whatever source name the job carries.
    if not artist or not title:
        source = job.get("filename") or job.get("youtube_title") or ""
        parsed_artist, parsed_title = parse_artist_title(source)
        artist = artist or parsed_artist
        title = title or parsed_title

    # Sanitise before choosing the pattern: a value made only of forbidden
    # characters becomes "" and must not produce " - Title - REEL.mp4".
    safe_artist = safe_filename(artist)
    safe_title = safe_filename(title)

    if safe_artist and safe_title:
        return f"{safe_artist} - {safe_title} - REEL.mp4"
    if safe_title:
        return f"{safe_title} - REEL.mp4"
    # Last resort: the job id.
    return f"reel_{job['id']}.mp4"
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
|
||
# Job state (filesystem-based, persistent prek restartov)
|
||
# ────────────────────────────────────────────────────────────────
|
||
def job_path(job_id):
    """Return the path of the JSON state file for `job_id` inside JOBS_DIR."""
    state_file_name = f"{job_id}.json"
    return JOBS_DIR / state_file_name
|
||
|
||
|
||
def load_job(job_id):
    """Load the job record for `job_id`.

    Returns the parsed dict, or None when no state file exists. Raises
    ``json.JSONDecodeError`` if the file is present but not valid JSON.
    """
    try:
        # Read directly instead of exists()+read: the file can be deleted in
        # between by the DELETE endpoint.
        raw = job_path(job_id).read_text(encoding="utf-8")
    except FileNotFoundError:
        return None
    return json.loads(raw)
|
||
|
||
|
||
def save_job(job):
    """Persist `job` (a dict with an "id" key) to its JSON state file.

    The record is written to a temporary sibling file and then renamed over
    the target, so the queue worker and request handlers never read a
    half-written file.
    """
    target = job_path(job["id"])
    # Unique ".tmp" name: invisible to the "*.json" globs and safe when two
    # threads save the same job at once.
    tmp = target.with_name(f"{target.name}.{uuid.uuid4().hex}.tmp")
    try:
        tmp.write_text(
            json.dumps(job, ensure_ascii=False, indent=2), encoding="utf-8"
        )
        os.replace(tmp, target)
    finally:
        # No-op after a successful replace; removes the leftover on failure.
        tmp.unlink(missing_ok=True)
|
||
|
||
|
||
def update_job(job_id, **kwargs):
    """Merge `kwargs` into the stored job, refresh "updated_at" and save it.

    Returns the updated job dict, or None when the job does not exist.
    """
    record = load_job(job_id)
    if record:
        record.update(kwargs)
        record["updated_at"] = time.time()
        save_job(record)
        return record
    return None
|
||
|
||
|
||
def list_jobs():
    """Return all readable job records, newest first.

    Job ids are random hex strings, so ordering by file name is arbitrary;
    records are ordered by "created_at" instead. Unreadable or corrupt
    state files are skipped.
    """
    jobs = []
    for job_file in JOBS_DIR.glob("*.json"):
        try:
            record = json.loads(job_file.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # File vanished, or is not valid JSON / UTF-8: leave it out.
            continue
        if isinstance(record, dict):
            jobs.append(record)
    jobs.sort(key=lambda j: j.get("created_at") or 0, reverse=True)
    return jobs
|
||
|
||
|
||
def generate_srt_from_segments(segments, clip_start, clip_end, output_path,
                               max_chunk_duration=2.5):
    """Write an SRT file for the segments that fall inside [clip_start, clip_end].

    Timestamps are re-mapped to start at 0, matching the trimmed video.
    Segments longer than `max_chunk_duration` seconds are split into
    equal-length chunks for fast reels-style pacing, never into more chunks
    than the segment has words. All text is upper-cased.

    Args:
        segments: iterable of mappings with "start", "end" and "text".
        clip_start: clip start in source seconds.
        clip_end: clip end in source seconds.
        output_path: destination file path.
        max_chunk_duration: longest subtitle kept unsplit, in seconds.

    Returns:
        `output_path`.

    Raises:
        ValueError: if `max_chunk_duration` is not positive.
    """
    if max_chunk_duration <= 0:
        raise ValueError("max_chunk_duration must be positive")

    def fmt_ts(seconds):
        # Integer milliseconds avoid a rounding carry such as 59.9996 s
        # being rendered as the invalid "00:00:60,000".
        total_ms = round(seconds * 1000)
        hours, rest = divmod(total_ms, 3_600_000)
        minutes, rest = divmod(rest, 60_000)
        secs, millis = divmod(rest, 1000)
        return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"

    cues = []  # (start, end, text), relative to the clip start

    for seg in segments:
        seg_start = float(seg["start"])
        seg_end = float(seg["end"])
        text = str(seg["text"]).strip()

        # Skip segments entirely outside the clip.
        if seg_end <= clip_start or seg_start >= clip_end:
            continue
        # Clamp to the clip and drop slivers too short to read.
        seg_start = max(seg_start, clip_start)
        seg_end = min(seg_end, clip_end)
        if seg_end - seg_start < 0.2:
            continue
        if not text:
            continue

        rel_start = seg_start - clip_start
        rel_end = seg_end - clip_start
        text_upper = text.upper()
        duration = rel_end - rel_start

        words = text_upper.split()
        n_parts = 1
        if duration > max_chunk_duration:
            # Cap at the word count so every chunk has its own words and the
            # full sentence is not repeated in trailing chunks.
            n_parts = min(int(duration / max_chunk_duration) + 1, len(words))

        if n_parts == 1:
            cues.append((rel_start, rel_end, text_upper))
            continue

        # Equal time slices; the last slice takes the remaining words.
        words_per_part = len(words) // n_parts
        chunk_dur = duration / n_parts
        for i in range(n_parts):
            first = i * words_per_part
            last = (i + 1) * words_per_part if i < n_parts - 1 else len(words)
            cues.append((
                rel_start + i * chunk_dur,
                rel_start + (i + 1) * chunk_dur,
                " ".join(words[first:last]),
            ))

    lines = [
        f"{idx}\n{fmt_ts(start)} --> {fmt_ts(end)}\n{cue_text}\n"
        for idx, (start, end, cue_text) in enumerate(cues, 1)
    ]
    with open(output_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
    return output_path
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
|
||
# Pipeline runner (background task)
|
||
# ────────────────────────────────────────────────────────────────
|
||
def run_subprocess_logged(cmd, job_id, step_name):
    """Run `cmd` as a subprocess and record the outcome on the job.

    The job is marked "processing" with `step_name` as the current step
    before the command starts. On a non-zero exit the job is marked
    "failed" with the tail of stderr+stdout and False is returned. On
    success the tail of stderr (if any) is kept as "last_step_log" and
    True is returned.
    """
    update_job(job_id, current_step=step_name, status="processing")
    result = subprocess.run(cmd, capture_output=True, text=True)

    if result.returncode == 0:
        # Keep the tail of stderr for diagnostics even on success.
        stderr_text = result.stderr
        if stderr_text and stderr_text.strip():
            update_job(job_id, last_step_log=stderr_text[-500:].strip())
        return True

    # Failure: combine stderr and stdout so the cause is visible.
    combined = (result.stderr or "") + "\n" + (result.stdout or "")
    update_job(
        job_id,
        status="failed",
        error=f"{step_name}: {combined[-800:].strip()}",
    )
    return False
|
||
|
||
|
||
def _notify_job_failure(job_id, message):
    """Send a best-effort Telegram failure notice, then run the batch-completion check."""
    try:
        from app.telegram import notify_job_failed
        notify_job_failed(load_job(job_id), message)
    except Exception as e:
        # A notification problem must never break the pipeline; just log it.
        print(f"⚠️ TG notify_job_failed failed: {e}", flush=True)
    _try_finalize_batch(job_id)


def process_job(job_id):
    """Run the whole pipeline for one job.

    Steps: download (YouTube jobs) → song recognition when artist/title are
    unknown → analysis (when ``auto_chorus``) → reframe + subtitles.

    Progress and results are written to the job record. Every terminal
    outcome, including a failed subprocess step, sends a Telegram
    notification and runs the batch-completion check. Returns None; does
    nothing when the job does not exist.
    """
    job = load_job(job_id)
    if not job:
        return

    try:
        # ── 1. Source preparation ─────────────────────────────
        if job["source_type"] == "youtube":
            update_job(job_id, status="downloading", current_step="YouTube download")
            input_path = UPLOAD_DIR / f"{job_id}_yt.mp4"
            cmd = [
                "python3", str(SCRIPTS_DIR / "yt_download.py"),
                job["youtube_url"], str(input_path),
            ]
            if not run_subprocess_logged(cmd, job_id, "YouTube download"):
                # The step already stored status/error; report it so the
                # user and the batch summary learn about the failure.
                failed_job = load_job(job_id) or {}
                _notify_job_failure(job_id, failed_job.get("error") or "YouTube download")
                return
            update_job(job_id, input_path=str(input_path))

            # Try to fetch the YouTube title for artist+title parsing (best effort).
            try:
                info_cmd = [
                    "python3", str(SCRIPTS_DIR / "yt_download.py"),
                    job["youtube_url"], "/dev/null", "--info-only",
                ]
                proc = subprocess.run(info_cmd, capture_output=True, text=True, timeout=30)
                if proc.returncode == 0 and proc.stdout:
                    info = json.loads(proc.stdout)
                    yt_title = info.get("title", "")
                    if yt_title:
                        a, t = parse_artist_title(yt_title)
                        updates = {"youtube_title": yt_title}
                        if a:
                            updates["parsed_artist"] = a
                        if t:
                            updates["parsed_title"] = t
                        updates["has_clean_name"] = bool(a and t)
                        update_job(job_id, **updates)
                        # Reload the job for downstream use.
                        job = load_job(job_id)
            except Exception as e:
                print(f"⚠️ Cannot fetch YT title: {e}", flush=True)
        else:
            input_path = Path(job["input_path"])

        # ── 1b. Music recognition (ACRCloud) when artist+title are unknown ──
        # A YouTube title may also be uninformative (playlist item, "Track 5").
        if not (job.get("parsed_artist") and job.get("parsed_title")):
            update_job(job_id, current_step="Avto-prepoznavam pesem (ACRCloud)")
            try:
                acr_cmd = [
                    "python3", str(SCRIPTS_DIR / "acr_recognize.py"),
                    str(input_path),
                ]
                proc = subprocess.run(acr_cmd, capture_output=True, text=True, timeout=120)
                if proc.returncode == 0 and proc.stdout:
                    data = json.loads(proc.stdout)
                    a, t = data.get("artist"), data.get("title")
                    if a and t:
                        update_job(
                            job_id,
                            parsed_artist=a, parsed_title=t,
                            has_clean_name=True,
                            recognized_via="acrcloud",
                        )
                        job = load_job(job_id)
                        print(f"✅ ACR prepoznal: {a} - {t}", flush=True)
                    else:
                        print(f"⚠️ ACR ni prepoznal pesmi", flush=True)
                else:
                    print(f"⚠️ ACR exit {proc.returncode}: {proc.stderr[:200]}", flush=True)
            except Exception as e:
                print(f"⚠️ ACR error: {e}", flush=True)

        # ── 2. Smart analysis (when auto_chorus) ──────────────────────────
        if job.get("auto_chorus"):
            update_job(job_id, current_step="Analiza pesmi (transkript + energija)")
            analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json"
            cmd = [
                "python3", str(SCRIPTS_DIR / "analyze.py"),
                str(input_path),
                "--target-duration", str(job.get("duration", 30)),
                "--max-duration", str(job.get("max_duration", 45)),
                "--min-duration", str(job.get("min_duration", 20)),
                "--output", str(analysis_path),
            ]
            if job.get("include_prebuild"):
                cmd += ["--include-prebuild"]
            # LLM provider (claude/gemini/auto)
            if job.get("llm_provider"):
                cmd += ["--llm-provider", job["llm_provider"]]
            if job.get("llm_model"):
                cmd += ["--llm-model", job["llm_model"]]
            # File-name hint: prefer the parsed artist+title (cleaner).
            if job.get("parsed_artist") and job.get("parsed_title"):
                fn_hint = f"{job['parsed_artist']} - {job['parsed_title']}"
                cmd += ["--filename-hint", fn_hint]
            elif job.get("filename"):
                fn_hint = Path(job["filename"]).stem
                cmd += ["--filename-hint", fn_hint]
            # STT provider (elevenlabs / local / auto)
            if job.get("whisper_provider"):
                cmd += ["--whisper-provider", job["whisper_provider"]]
            # lang: when unset or "auto", analyze.py gets no --lang argument.
            if job.get("lang") and job["lang"] not in ("auto", ""):
                cmd += ["--lang", job["lang"]]
            cmd += ["--model", job.get("whisper_model", "large-v3")]

            proc = subprocess.run(cmd, capture_output=True, text=True)
            srt_from_claude = None  # path of the SRT built from the analysis transcript
            if proc.returncode == 0 and analysis_path.exists():
                try:
                    with open(analysis_path, "r", encoding="utf-8") as f:
                        analysis = json.load(f)
                    cr = analysis["clip_range"]
                    fade = analysis["fade"]

                    # Build the SRT from the transcript, trimmed to clip_range.
                    if analysis.get("transcript", {}).get("segments"):
                        srt_path_out = OUTPUT_DIR / f"{job_id}.subtitles.srt"
                        try:
                            generate_srt_from_segments(
                                analysis["transcript"]["segments"],
                                cr["start"], cr["end"],
                                srt_path_out,
                            )
                            srt_from_claude = str(srt_path_out)
                            llm_src = cr.get("source", "LLM")
                            print(f"📝 Generated SRT from {llm_src} transcript: {srt_path_out}")
                        except Exception as e:
                            print(f"⚠️ SRT generation failed: {e}")

                    update_job(
                        job_id,
                        analysis_summary={
                            "language": analysis["language"],
                            "language_probability": analysis["language_probability"],
                            "instrumental": analysis["instrumental"],
                            "clip_range": cr,
                            "fade": fade,
                            "chorus_preview": analysis["chorus"]["best"]["text_preview"]
                            if analysis.get("chorus") and analysis["chorus"].get("best") else None,
                            "video_duration": analysis.get("video_duration"),
                            "candidates": analysis["chorus"].get("all_candidates", [])[:5]
                            if analysis.get("chorus") else [],
                            "claude_corrected_text": analysis.get("claude_corrected_text", False),
                        },
                        # The full transcript is stored for display in the UI.
                        full_transcript=[
                            {"start": s["start"], "end": s["end"], "text": s["text"]}
                            for s in analysis.get("transcript", {}).get("segments", [])
                        ],
                        start=cr["start"],
                        duration=cr["duration"],
                        fade_in=fade["fade_in"],
                        fade_out=fade["fade_out"],
                        detected_language=analysis["language"],
                        is_instrumental=analysis["instrumental"],
                        claude_srt_path=srt_from_claude,
                    )
                    # Auto-disable subtitles for instrumental tracks.
                    if analysis["instrumental"] and not job.get("no_subs"):
                        update_job(job_id, no_subs=True, auto_disabled_subs=True)
                    # Reload the local dict so step 3 sees the analysis results.
                    job = load_job(job_id)
                except (json.JSONDecodeError, KeyError) as e:
                    update_job(job_id, chorus_error=f"Analysis parse: {e}")
            else:
                update_job(job_id, chorus_error=(proc.stderr or "")[-500:])

        # ── 3. Reframe + subtitles (clip.py orchestrator) ─────
        output_path = OUTPUT_DIR / f"{job_id}.mp4"
        update_job(job_id, current_step="Reframe + subtitles")

        cmd = [
            "python3", str(SCRIPTS_DIR / "clip.py"),
            str(input_path), str(output_path),
            "--mode", job.get("mode", "track"),
            "--quality", job.get("quality", "medium"),
            "--style", job.get("subtitle_style", "reels"),
        ]
        if job.get("start") is not None:
            cmd += ["--start", str(job["start"])]
        if job.get("duration") is not None:
            cmd += ["--duration", str(job["duration"])]
        # "or 0": a stored null fade must not raise TypeError in the comparison.
        if (job.get("fade_in") or 0) > 0:
            cmd += ["--fade-in", str(job["fade_in"])]
        if (job.get("fade_out") or 0) > 0:
            cmd += ["--fade-out", str(job["fade_out"])]
        # Pass the SRT generated from the analysis transcript, when present.
        if job.get("claude_srt_path") and Path(job["claude_srt_path"]).exists() and not job.get("no_subs"):
            cmd += ["--srt", job["claude_srt_path"]]
        # lang: prefer the detected language when the user chose auto.
        chosen_lang = job.get("lang")
        if chosen_lang in (None, "auto", ""):
            chosen_lang = job.get("detected_language")
        if chosen_lang:
            cmd += ["--lang", chosen_lang]
        if job.get("no_subs"):
            cmd += ["--no-subs"]
        cmd += ["--model", job.get("whisper_model", "large-v3")]

        # DEBUG: record exactly which command is executed.
        update_job(job_id, debug_clip_cmd=" ".join(cmd))

        if not run_subprocess_logged(cmd, job_id, "Reframe + subtitles"):
            failed_job = load_job(job_id) or {}
            _notify_job_failure(job_id, failed_job.get("error") or "Reframe + subtitles")
            return

        # ── Done ──────────────────────────────────────────────
        if output_path.exists():
            update_job(
                job_id,
                status="done",
                current_step="Končano",
                output_path=str(output_path),
                output_size_mb=round(output_path.stat().st_size / 1024 / 1024, 2),
            )
            # Telegram notification (best effort).
            try:
                from app.telegram import notify_job_done
                final_job = load_job(job_id)
                notify_job_done(final_job)
            except Exception as e:
                print(f"⚠️ TG notify_job_done failed: {e}", flush=True)

            # Batch tracking: send the summary if this was the last job.
            _try_finalize_batch(job_id)
        else:
            update_job(
                job_id,
                status="failed",
                error="Output datoteka ne obstaja po obdelavi",
            )
            _notify_job_failure(job_id, "Output datoteka ne obstaja")
    except Exception as e:
        update_job(job_id, status="failed", error=str(e))
        _notify_job_failure(job_id, str(e))
|
||
|
||
|
||
def _try_finalize_batch(job_id):
    """If the job belongs to a batch and every batch job has finished, send the summary.

    "done", "failed" and "error" all count as finished; "error" is the
    status the startup hook assigns to jobs it gives up on, and without it
    such a batch would never be summarised. Never raises: problems are
    logged and swallowed so a notification issue cannot fail the job.
    """
    finished_statuses = ("done", "failed", "error")
    try:
        job = load_job(job_id)
        batch_id = job.get("batch_id") if job else None
        if not batch_id:
            return
        # Collect the jobs of this batch.
        all_jobs = [load_job(jp.stem) for jp in JOBS_DIR.glob("*.json")]
        batch_jobs = [j for j in all_jobs if j and j.get("batch_id") == batch_id]
        if any(j.get("status") not in finished_statuses for j in batch_jobs):
            return  # not all finished yet
        # Everything finished: send the summary.
        total = len(batch_jobs)
        succeeded = sum(1 for j in batch_jobs if j.get("status") == "done")
        failed = total - succeeded
        try:
            from app.telegram import notify_batch_complete
            notify_batch_complete(batch_id, total, succeeded, failed)
        except Exception as e:
            print(f"⚠️ TG batch summary failed: {e}", flush=True)
    except Exception as e:
        print(f"⚠️ _try_finalize_batch error: {e}", flush=True)
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
|
||
# FastAPI app
|
||
# ────────────────────────────────────────────────────────────────
|
||
# ASGI application object; the routes and the startup hook in this file attach to it.
app = FastAPI(title="Reels Clipper")
# Serve the "static" directory (two levels up from this file) under /static.
app.mount("/static", StaticFiles(directory=Path(__file__).parent.parent / "static"), name="static")
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
|
||
# Queue worker — procesira queued jobe enega za drugim
|
||
# ────────────────────────────────────────────────────────────────
|
||
import threading
|
||
_queue_lock = threading.Lock()
_queue_running = {"current_job": None}  # kept in a dict so it is reachable from threads


def _queue_worker():
    """Background loop that processes "queued" jobs one at a time.

    Runs forever. Picks the oldest queued job (by "created_at"), runs
    process_job() on it synchronously, and sleeps 3 s when there is nothing
    to do (5 s after an unexpected error).
    """
    print("🚜 Queue worker zagnan", flush=True)
    while True:
        try:
            # Something still marked as running? Wait while it is active.
            active_id = _queue_running["current_job"]
            if active_id:
                active = load_job(active_id)
                if active and active.get("status") in ("processing", "downloading"):
                    time.sleep(3)
                    continue
                # No longer being processed → release the slot.
                _queue_running["current_job"] = None

            # Collect every job currently waiting in the queue.
            pending = []
            for job_file in JOBS_DIR.glob("*.json"):
                try:
                    candidate = json.loads(job_file.read_text())
                    if candidate.get("status") == "queued":
                        pending.append(candidate)
                except Exception:
                    continue

            if not pending:
                time.sleep(3)
                continue

            # FIFO: the oldest job goes first.
            oldest = sorted(pending, key=lambda x: x.get("created_at", 0))[0]
            job_id = oldest["id"]

            with _queue_lock:
                _queue_running["current_job"] = job_id

            print(f"🚜 Queue worker: obdelujem {job_id}", flush=True)
            try:
                process_job(job_id)
            except Exception as e:
                print(f"❌ Queue worker error pri {job_id}: {e}", flush=True)
                update_job(job_id, status="failed", error=f"Queue worker: {e}")
            finally:
                with _queue_lock:
                    _queue_running["current_job"] = None
        except Exception as e:
            print(f"❌ Queue worker outer error: {e}", flush=True)
            time.sleep(5)
|
||
|
||
|
||
# Zaženi worker v ozadju (samo enkrat)
|
||
_worker_thread = None


def _start_queue_worker():
    """Start the queue worker as a daemon thread unless one is already alive."""
    global _worker_thread
    if _worker_thread is not None and _worker_thread.is_alive():
        return
    _worker_thread = threading.Thread(target=_queue_worker, daemon=True)
    _worker_thread.start()
|
||
|
||
|
||
@app.on_event("startup")
async def resume_or_cleanup_jobs():
    """On startup, re-queue jobs that were interrupted mid-run, or mark them as errors.

    When the previous container is killed during processing, the job file
    keeps status "processing" (or "downloading"). For each such job:

    - after 3 resume attempts → status "error";
    - upload job whose input file is gone → status "error";
    - otherwise → status "queued" with "resume_attempts" incremented.

    Re-queued jobs are picked up by the queue worker started at the end.
    They are not also scheduled directly: doing both ran the same job twice
    in parallel and broke the one-job-at-a-time guarantee.
    """
    print("🔄 Preverjam in obnavljam jobs po restart-u...")
    resumed_count = 0
    error_count = 0

    for f in JOBS_DIR.glob("*.json"):
        try:
            j = json.loads(f.read_text(encoding="utf-8"))
            if j.get("status") not in ("processing", "downloading"):
                continue

            job_id = j.get("id") or j.get("job_id") or f.stem
            attempts = j.get("resume_attempts", 0)

            # Give up after 3 failed resumes.
            if attempts >= 3:
                j["status"] = "error"
                j["current_step"] = "Preveč napak pri ponovnem zagonu"
                j["chorus_error"] = f"Job restartal {attempts} krat — napaka v pipeline-u"
                j["updated_at"] = time.time()
                f.write_text(json.dumps(j, ensure_ascii=False, indent=2), encoding="utf-8")
                error_count += 1
                continue

            # Upload jobs need their input file. Use the path stored on the
            # job: uploads keep their original extension, so "<id>.mp4" is
            # not always right. YouTube jobs are re-downloaded by the
            # pipeline and need no input check.
            if j.get("source_type") != "youtube":
                if j.get("input_path"):
                    input_path = Path(j["input_path"])
                else:
                    input_path = UPLOAD_DIR / f"{job_id}.mp4"
                if not input_path.exists():
                    j["status"] = "error"
                    j["current_step"] = "Vhodna datoteka ne obstaja"
                    j["chorus_error"] = f"Upload {input_path.name} ne obstaja po restart-u"
                    j["updated_at"] = time.time()
                    f.write_text(json.dumps(j, ensure_ascii=False, indent=2), encoding="utf-8")
                    error_count += 1
                    continue

            # Resume: back to "queued" with one more attempt recorded.
            j["status"] = "queued"
            j["resume_attempts"] = attempts + 1
            j["current_step"] = f"Avto-resume po restart-u (poskus {attempts + 1}/3)"
            j["last_resume_at"] = time.time()
            j["updated_at"] = time.time()
            f.write_text(json.dumps(j, ensure_ascii=False, indent=2), encoding="utf-8")
            resumed_count += 1
            print(f" 🔁 Resume {job_id} (attempt {attempts + 1}/3)")
        except Exception as e:
            print(f" ⚠️ Napaka pri {f.name}: {e}")

    print(f" ✅ Resumed: {resumed_count}, Error: {error_count}")

    # Start the queue worker; it processes all queued jobs, resumed ones included.
    _start_queue_worker()
|
||
|
||
|
||
async def _resume_job_async(job_id):
    """Run process_job(job_id) in the default thread executor after a 2 s delay.

    Errors are printed, never raised.
    """
    # Short pause so that startup has finished first.
    await asyncio.sleep(2)
    try:
        # process_job is synchronous, so it is handed to the executor.
        event_loop = asyncio.get_event_loop()
        await event_loop.run_in_executor(None, process_job, job_id)
    except Exception as exc:
        print(f" ❌ Resume failed for {job_id}: {exc}")
|
||
|
||
|
||
@app.get("/", response_class=HTMLResponse)
async def index(user: str = Depends(check_auth)):
    """Serve the frontend page (templates/index.html) to an authenticated user."""
    template = Path(__file__).parent.parent / "templates" / "index.html"
    # Decode explicitly as UTF-8 so non-ASCII text in the page does not
    # depend on the container's locale.
    return template.read_text(encoding="utf-8")
|
||
|
||
|
||
@app.get("/healthz")
async def healthz():
    """Unauthenticated liveness probe; always answers {"ok": true}."""
    payload = {"ok": True}
    return payload
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
|
||
# Job models
|
||
# ────────────────────────────────────────────────────────────────
|
||
class YouTubeJobIn(BaseModel):
    # Request body of POST /api/youtube. `url` is stored on the job as
    # "youtube_url"; the other fields are copied onto the job unchanged.
    url: str
    mode: str = "track"
    lang: Optional[str] = None
    auto_chorus: bool = True
    start: Optional[float] = None
    duration: Optional[float] = 30
    no_subs: bool = False
    subtitle_style: str = "reels"
    whisper_model: str = "large-v3"
    quality: str = "medium"
|
||
|
||
|
||
class StartJobIn(BaseModel):
    # Request body of POST /api/process: identifies an existing job and
    # carries the processing options that are stored on it.
    job_id: str
    mode: str = "track"
    lang: Optional[str] = None  # None/auto = automatic language detection
    auto_chorus: bool = True
    include_prebuild: bool = False  # include the pre-chorus build-up
    start: Optional[float] = None
    duration: Optional[float] = 30
    max_duration: Optional[float] = 45
    min_duration: Optional[float] = 20
    no_subs: bool = False
    subtitle_style: str = "reels"
    whisper_model: str = "large-v3"
    quality: str = "medium"
    # LLM used for semantic analysis + text corrections
    llm_provider: str = "claude"  # claude / gemini / auto
    llm_model: Optional[str] = None  # specific model (defaults to the provider's best)
    # Speech-to-text provider
    whisper_provider: str = "auto"  # auto / elevenlabs / local
    # Batch tracking for multi-upload (Telegram summary)
    batch_id: Optional[str] = None
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
|
||
# Upload (file)
|
||
# ────────────────────────────────────────────────────────────────
|
||
@app.post("/api/upload")
async def upload_video(
    file: UploadFile = File(...),
    artist: Optional[str] = Form(None),
    title: Optional[str] = Form(None),
    batch_id: Optional[str] = Form(None),
    user: str = Depends(check_auth),
):
    """Store an uploaded video and create its job record (status "uploaded").

    The body is streamed to disk in 1 MiB chunks. Exceeding MAX_UPLOAD_MB
    raises HTTP 413; a missing file name raises HTTP 400. Any failure while
    receiving the body removes the partial file. Artist and title come from
    the form when both are non-blank, otherwise they are parsed from the
    file name. Returns the saved job dict.
    """
    if not file.filename:
        raise HTTPException(400, "Brez imena")

    job_id = uuid.uuid4().hex[:12]
    ext = Path(file.filename).suffix or ".mp4"
    input_path = UPLOAD_DIR / f"{job_id}{ext}"
    limit_bytes = MAX_UPLOAD_MB * 1024 * 1024

    size = 0
    try:
        with input_path.open("wb") as f:
            while chunk := await file.read(1024 * 1024):
                size += len(chunk)
                if size > limit_bytes:
                    raise HTTPException(413, f"Prevelika datoteka (limit {MAX_UPLOAD_MB} MB)")
                f.write(chunk)
    except BaseException:
        # Size limit, I/O error or cancelled request: never leave a
        # truncated upload on disk.
        input_path.unlink(missing_ok=True)
        raise

    job = {
        "id": job_id,
        "source_type": "upload",
        "filename": file.filename,
        "input_path": str(input_path),
        "size_mb": round(size / 1024 / 1024, 2),
        "status": "uploaded",
        "current_step": "Naloženo, čaka na obdelavo",
        "created_at": time.time(),
        "updated_at": time.time(),
    }

    if batch_id:
        job["batch_id"] = batch_id

    # Artist + title: user-provided values first, then parse the file name.
    # Strip before testing so whitespace-only input is not a "clean name".
    clean_artist = (artist or "").strip()
    clean_title = (title or "").strip()
    if clean_artist and clean_title:
        job["parsed_artist"] = clean_artist
        job["parsed_title"] = clean_title
        job["has_clean_name"] = True
    else:
        a, t = parse_artist_title(file.filename)
        if a:
            job["parsed_artist"] = a
        if t:
            job["parsed_title"] = t
        job["has_clean_name"] = bool(a and t)

    save_job(job)
    return job
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
|
||
# YouTube submit
|
||
# ────────────────────────────────────────────────────────────────
|
||
@app.post("/api/youtube")
async def submit_youtube(
    payload: YouTubeJobIn,
    background: BackgroundTasks,
    user: str = Depends(check_auth),
):
    """Create a "queued" job for a YouTube URL; the queue worker processes it.

    Raises HTTP 400 when the URL is empty or starts with "-". The URL is
    later passed as a command-line argument to the download script, where a
    leading dash could be interpreted as an option.
    """
    url = payload.url.strip()
    if not url or url.startswith("-"):
        raise HTTPException(400, "Neveljaven YouTube URL")

    job_id = uuid.uuid4().hex[:12]
    job = {
        "id": job_id,
        "source_type": "youtube",
        "youtube_url": url,
        "status": "queued",
        "current_step": "V vrsti za YouTube prenos",
        "created_at": time.time(),
        "updated_at": time.time(),
        "mode": payload.mode,
        "lang": payload.lang,
        "auto_chorus": payload.auto_chorus,
        "start": payload.start,
        "duration": payload.duration,
        "no_subs": payload.no_subs,
        "subtitle_style": payload.subtitle_style,
        "whisper_model": payload.whisper_model,
        "quality": payload.quality,
    }
    save_job(job)
    # Nothing is started here; the queue worker picks the job up.
    return job
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
|
||
# Start processing for uploaded job
|
||
# ────────────────────────────────────────────────────────────────
|
||
@app.post("/api/process")
async def start_processing(
    payload: StartJobIn,
    background: BackgroundTasks,
    user: str = Depends(check_auth),
):
    """Mark an existing job as ``queued`` with the submitted settings.

    Returns the job as re-read via ``load_job`` after the update.
    ``background`` is accepted but not used here.

    Raises:
        HTTPException: 404 when ``load_job`` finds no job for
            ``payload.job_id``.
    """
    job_id = payload.job_id
    if not load_job(job_id):
        raise HTTPException(404, "Job ne obstaja")

    # Settings taken as-is from the request payload.
    settings = {
        name: getattr(payload, name)
        for name in (
            "mode",
            "lang",
            "auto_chorus",
            "include_prebuild",
            "start",
            "duration",
            "max_duration",
            "min_duration",
            "no_subs",
            "subtitle_style",
            "whisper_model",
            "quality",
            "llm_provider",
            "llm_model",
            "whisper_provider",
            "batch_id",
        )
    }
    update_job(
        job_id,
        status="queued",
        **settings,
        current_step="V vrsti za obdelavo",
        # Clear errors left over from a previous run (retry-friendly).
        chorus_error=None,
        interrupted_at=None,
    )

    # The queue worker (background thread) will pick this job up; it is not
    # started directly here. Per the original author, this makes multiple
    # uploads run sequentially (one at a time = stable).
    return load_job(job_id)
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
|
||
# Job queries
|
||
# ────────────────────────────────────────────────────────────────
|
||
@app.get("/api/jobs")
async def get_jobs(user: str = Depends(check_auth)):
    """Return whatever ``list_jobs()`` yields, wrapped under the ``jobs`` key."""
    jobs = list_jobs()
    return {"jobs": jobs}
|
||
|
||
|
||
@app.get("/api/jobs/{job_id}")
async def get_job(job_id: str, user: str = Depends(check_auth)):
    """Return the stored record for ``job_id``.

    Raises:
        HTTPException: 404 when ``load_job`` returns nothing for the id.
    """
    record = load_job(job_id)
    if record:
        return record
    raise HTTPException(404, "Ne obstaja")
|
||
|
||
|
||
@app.get("/api/stream/{job_id}")
async def stream_job(job_id: str, user: str = Depends(check_auth)):
    """Stream real-time job status as Server-Sent Events.

    The job is re-read once per second. An event is emitted whenever its
    ``status`` or ``current_step`` differs from the last emitted pair. The
    stream ends when the job is missing, reaches ``done``/``failed``, or
    after 600 polls.
    """

    async def event_source():
        # (status, current_step) of the most recently emitted event.
        last_sent = (None, None)
        for _ in range(600):  # max 10 min stream
            snapshot = load_job(job_id)
            if not snapshot:
                yield f"data: {json.dumps({'error': 'not found'})}\n\n"
                return

            current = (snapshot["status"], snapshot.get("current_step"))
            if current != last_sent:
                yield f"data: {json.dumps(snapshot, ensure_ascii=False)}\n\n"
                last_sent = current

            # Terminal states close the stream after their event was sent.
            if current[0] in ("done", "failed"):
                return
            await asyncio.sleep(1)

    return StreamingResponse(event_source(), media_type="text/event-stream")
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────
|
||
# Download / preview
|
||
# ────────────────────────────────────────────────────────────────
|
||
@app.get("/api/download/{job_id}")
async def download(job_id: str, user: str = Depends(check_auth)):
    """Send the finished video of a ``done`` job as a file download.

    Raises:
        HTTPException: 404 when the job is missing or not ``done``, or when
            the file at its ``output_path`` does not exist.
    """
    job = load_job(job_id)
    is_ready = bool(job) and job.get("status") == "done"
    if not is_ready:
        raise HTTPException(404, "Ne pripravljen")

    result_file = Path(job["output_path"])
    if not result_file.exists():
        raise HTTPException(404, "Output ne obstaja")

    return FileResponse(
        result_file,
        media_type="video/mp4",
        # Friendly name, e.g. "Artist - Title - REEL.mp4".
        filename=build_download_filename(job),
    )
|
||
|
||
|
||
def _parse_byte_range(range_header: str, file_size: int):
    """Resolve a single-range ``Range`` header value against a file size.

    Supported forms (after the ``bytes=`` unit is removed):
      * ``START-END`` — inclusive offsets, ``END`` clamped to the last byte
      * ``START-``    — from ``START`` to the end of the file
      * ``-N``        — suffix range: the final ``N`` bytes of the file

    Args:
        range_header: raw value of the request's ``Range`` header.
        file_size: size in bytes of the file being served.

    Returns:
        ``(start, end)`` inclusive byte offsets for a satisfiable range, or
        ``None`` when the range is well-formed but cannot be satisfied.

    Raises:
        ValueError: the value is not one simple range (non-numeric bounds,
            several ranges, or no bounds at all). Callers should then ignore
            the header and serve the whole file.
    """
    spec = range_header.replace("bytes=", "").strip()
    # Unpacking raises ValueError unless the spec contains exactly one "-".
    first, last = spec.split("-")

    if first:
        start = int(first)
        end = int(last) if last else file_size - 1
    elif last:
        # Suffix form "-N" asks for the LAST N bytes, not bytes 0..N.
        suffix_len = int(last)
        if suffix_len == 0:
            return None
        start = max(file_size - suffix_len, 0)
        end = file_size - 1
    else:
        raise ValueError("byte range has no bounds")

    end = min(end, file_size - 1)
    if start > end or start >= file_size:
        return None
    return start, end


@app.get("/api/preview/{job_id}")
async def preview(job_id: str, request: Request, user: str = Depends(check_auth)):
    """Serve a finished video with Range request support.

    Range support is required by the HTML5 video player. A satisfiable
    ``Range`` header yields a 206 partial response, an unsatisfiable one a
    416 carrying ``Content-Range: bytes */<size>``, and a missing or
    malformed header the complete file.

    Raises:
        HTTPException: 404 when the job is missing or not ``done``, or when
            the file at its ``output_path`` does not exist.
    """
    job = load_job(job_id)
    if not job or job.get("status") != "done":
        raise HTTPException(404, "Ne pripravljen")
    out = Path(job["output_path"])
    if not out.exists():
        raise HTTPException(404, "Output ne obstaja")

    file_size = out.stat().st_size
    # Request header lookup is case-insensitive, so one spelling suffices.
    range_header = request.headers.get("range")

    if range_header:
        try:
            byte_range = _parse_byte_range(range_header, file_size)
        except ValueError:
            # Malformed or multi-part range: deliberately ignore the header
            # and fall through to the full-file response below.
            pass
        else:
            if byte_range is None:
                # Range Not Satisfiable; tell the client the real size.
                return Response(
                    status_code=416,
                    headers={"Content-Range": f"bytes */{file_size}"},
                )

            start, end = byte_range
            chunk_size = end - start + 1

            def iter_file():
                # Stream exactly chunk_size bytes from offset start in 64 KiB reads.
                with open(out, "rb") as f:
                    f.seek(start)
                    remaining = chunk_size
                    while remaining > 0:
                        data = f.read(min(64 * 1024, remaining))
                        if not data:
                            break
                        remaining -= len(data)
                        yield data

            headers = {
                "Content-Range": f"bytes {start}-{end}/{file_size}",
                "Accept-Ranges": "bytes",
                "Content-Length": str(chunk_size),
                "Content-Type": "video/mp4",
            }
            return StreamingResponse(
                iter_file(),
                status_code=206,
                headers=headers,
                media_type="video/mp4",
            )

    # No usable Range header: return the whole file.
    return FileResponse(
        out,
        media_type="video/mp4",
        headers={"Accept-Ranges": "bytes", "Content-Length": str(file_size)},
    )
|
||
|
||
|
||
@app.delete("/api/jobs/{job_id}")
async def delete_job(job_id: str, user: str = Depends(check_auth)):
    """Remove a job's input/output files and the file at ``job_path(job_id)``.

    Raises:
        HTTPException: 404 when ``load_job`` returns nothing for the id.
    """
    job = load_job(job_id)
    if not job:
        raise HTTPException(404, "Ne obstaja")

    # Paths are optional in the record; skip the ones that are unset.
    stored_paths = (job.get(key) for key in ("input_path", "output_path"))
    for target in (Path(raw) for raw in stored_paths if raw):
        if target.exists():
            target.unlink(missing_ok=True)

    job_path(job_id).unlink(missing_ok=True)
    return {"deleted": job_id}
|