""" reels.biba.live — FastAPI backend. Endpoints: GET / — frontend HTML POST /api/upload — naloži video file POST /api/youtube — submit YouTube URL POST /api/process/{id} — start processing job GET /api/jobs — list vseh jobov GET /api/jobs/{id} — status job-a GET /api/stream/{id} — SSE progress stream GET /api/download/{id} — download finalni reel GET /api/preview/{id} — preview video stream DELETE /api/jobs/{id} — pobriši job + datoteke """ import asyncio import json import hashlib import hmac import os import secrets import shutil import subprocess import threading import time import urllib.parse import uuid from pathlib import Path from typing import Optional from fastapi import ( FastAPI, UploadFile, File, Form, HTTPException, Depends, BackgroundTasks, Request, Response as FastAPIResponse, status, Cookie ) from fastapi.responses import ( FileResponse, HTMLResponse, StreamingResponse, JSONResponse, Response, RedirectResponse ) from fastapi.staticfiles import StaticFiles from fastapi.security import HTTPBasic, HTTPBasicCredentials from pydantic import BaseModel # ──────────────────────────────────────────────────────────────── # Config # ──────────────────────────────────────────────────────────────── DATA_DIR = Path(os.environ.get("DATA_DIR", "/data")) UPLOAD_DIR = DATA_DIR / "uploads" OUTPUT_DIR = DATA_DIR / "outputs" JOBS_DIR = DATA_DIR / "jobs" QNET_DIR = DATA_DIR / "qnet" SCRIPTS_DIR = Path(__file__).parent.parent / "scripts" UPLOAD_DIR.mkdir(parents=True, exist_ok=True) OUTPUT_DIR.mkdir(parents=True, exist_ok=True) JOBS_DIR.mkdir(parents=True, exist_ok=True) QNET_DIR.mkdir(parents=True, exist_ok=True) # Qnet song match — povezava z MB player bazami os.environ.setdefault("QNET_LOOKUP_PATH", str(QNET_DIR / "songs_lookup.json")) from app import qnet_match # noqa: E402 # S3 storage mirror — uploads/outputs/jobs gredo tudi v s3://folxspeed/reels-app/ # Lokalna mapa ostane primary, S3 je replica/cache. from app import s3_storage # noqa: E402 def _persist_to_s3(local_path, kind: str) -> bool: """Mirror local file to S3 after producing it (best-effort). kind: 'upload' for uploads/, 'output' for outputs/, 'job_meta' for jobs/ Silent no-op when S3 not configured. Never raises. """ try: if not s3_storage.is_enabled(): return False p = Path(local_path) if not p.exists() or p.stat().st_size == 0: return False return s3_storage.upload_job_file("", kind, p) except Exception as e: print(f"⚠️ S3 mirror failed for {local_path}: {e}", flush=True) return False def _ensure_local(local_path, kind: str) -> bool: """Make sure file is on disk; if missing, fetch from S3. Returns True if file is ready locally after the call. kind: 'upload' or 'output' """ p = Path(local_path) if p.exists() and p.stat().st_size > 0: return True if not s3_storage.is_enabled(): return False folder = {"upload": "uploads", "output": "outputs", "job_meta": "jobs"}.get(kind, kind) key = f"{folder}/{p.name}" print(f"📥 Local missing, fetching from S3: {key}", flush=True) return s3_storage.download(key, p) def _delete_from_s3(filename: str, kind: str) -> bool: """Delete object from S3 (mirror local delete). Best-effort, no raise.""" try: if not s3_storage.is_enabled(): return False folder = {"upload": "uploads", "output": "outputs", "job_meta": "jobs"}.get(kind, kind) return s3_storage.delete(f"{folder}/{filename}") except Exception: return False def _ffmpeg_then_persist(cmd, out_path, kind: str = "output", timeout: int = 600): """Run ffmpeg in background thread, then mirror result to S3. Drop-in replacement for subprocess.Popen() when we want S3 sync after. """ def runner(): try: subprocess.run( cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=timeout, ) _persist_to_s3(out_path, kind) except Exception as e: print(f"⚠️ ffmpeg/persist failed for {out_path}: {e}", flush=True) threading.Thread(target=runner, daemon=True).start() # Dedup DB — sledi že obdelanim/naloženim komadom DEDUP_DB = DATA_DIR / "processed.db" def _normalize_filename(filename: str) -> str: """Normaliziraj filename za dedup primerjavo. 'BRAJDE (Official Video).mp4' → 'brajde' 'Brajde (HD).mxf' → 'brajde' """ import re name = Path(filename).stem.lower() # Odstrani pogoste suffix-e name = re.sub(r'\b(official|video|hd|4k|lyric|audio|music|mv|live|cover|version|remix)\b', '', name) # Odstrani parentheses content name = re.sub(r'\([^)]*\)', '', name) name = re.sub(r'\[[^\]]*\]', '', name) # Whitespace normalize name = re.sub(r'\s+', ' ', name).strip() # Odstrani pogoste ločila name = re.sub(r'[-_.]+', ' ', name).strip() return name def _dedup_init(): """Ustvari SQLite tabelo če ne obstaja.""" import sqlite3 conn = sqlite3.connect(str(DEDUP_DB)) conn.execute(""" CREATE TABLE IF NOT EXISTS processed_videos ( normalized_name TEXT NOT NULL, tv_station TEXT NOT NULL, filename_orig TEXT NOT NULL, job_id TEXT NOT NULL, nextcloud_url TEXT, file_size_mb REAL, uploaded_at REAL NOT NULL, PRIMARY KEY (normalized_name, tv_station) ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_norm ON processed_videos(normalized_name)") conn.commit() conn.close() def dedup_check(filename: str, tv_station: str) -> Optional[dict]: """Vrne dict z info o že obdelanem komadu, ali None.""" import sqlite3 _dedup_init() norm = _normalize_filename(filename) if not norm: return None conn = sqlite3.connect(str(DEDUP_DB)) conn.row_factory = sqlite3.Row row = conn.execute( "SELECT * FROM processed_videos WHERE normalized_name = ? AND tv_station = ?", (norm, tv_station) ).fetchone() conn.close() if row: return dict(row) return None def dedup_record(filename: str, tv_station: str, job_id: str, nextcloud_url: str = None, file_size_mb: float = None): """Zabeleži uspešno obdelan + naložen komad.""" import sqlite3 _dedup_init() norm = _normalize_filename(filename) if not norm: return conn = sqlite3.connect(str(DEDUP_DB)) conn.execute(""" INSERT OR REPLACE INTO processed_videos (normalized_name, tv_station, filename_orig, job_id, nextcloud_url, file_size_mb, uploaded_at) VALUES (?, ?, ?, ?, ?, ?, ?) """, (norm, tv_station, filename, job_id, nextcloud_url, file_size_mb, time.time())) conn.commit() conn.close() print(f"📒 Dedup: zabeležen {norm} → {tv_station} (job {job_id})", flush=True) def dedup_remove(filename: str, tv_station: str): """Izbriši zapis (npr. če uporabnik želi re-narediti).""" import sqlite3 _dedup_init() norm = _normalize_filename(filename) if not norm: return conn = sqlite3.connect(str(DEDUP_DB)) conn.execute("DELETE FROM processed_videos WHERE normalized_name = ? AND tv_station = ?", (norm, tv_station)) conn.commit() conn.close() AUTH_USER = os.environ.get("AUTH_USER", "sebastjan") AUTH_PASS = os.environ.get("AUTH_PASS", "change-me-in-coolify-env") # Google Sign-In (https://developers.google.com/identity/gsi/web) GOOGLE_CLIENT_ID = os.environ.get("GOOGLE_CLIENT_ID", "").strip() ALLOWED_EMAILS = [e.strip().lower() for e in os.environ.get("ALLOWED_EMAILS", "").split(",") if e.strip()] SESSION_SECRET = os.environ.get("SESSION_SECRET", "").strip() or hashlib.sha256( f"{AUTH_USER}|{AUTH_PASS}|reels-app-default-secret".encode() ).hexdigest() SESSION_COOKIE_NAME = "reels_session" SESSION_TTL_SECONDS = 30 * 24 * 3600 # 30 dni MAX_UPLOAD_MB = int(os.environ.get("MAX_UPLOAD_MB", "2000")) # Nextcloud upload (folxspeed/REELS/) NEXTCLOUD_URL = os.environ.get("NEXTCLOUD_URL", "https://nextcloud.folx.tv") NEXTCLOUD_USER = os.environ.get("NEXTCLOUD_USER", "admin") NEXTCLOUD_PASS = os.environ.get("NEXTCLOUD_PASS", "") NEXTCLOUD_FOLDER = os.environ.get("NEXTCLOUD_FOLDER", "folxspeed/REELS") # ──────────────────────────────────────────────────────────────── # Auth — Google Sign-In + basic auth fallback (za API/cron) # ──────────────────────────────────────────────────────────────── security = HTTPBasic(auto_error=False) def _email_allowed(email: str) -> bool: """Email mora biti v ALLOWED_EMAILS whitelistu.""" if not email: return False return email.lower().strip() in ALLOWED_EMAILS def _make_session_token(email: str) -> str: """Sign session: base64url(email|expiry|hmac_sha256).""" import base64 expiry = int(time.time()) + SESSION_TTL_SECONDS payload = f"{email}|{expiry}" sig = hmac.new(SESSION_SECRET.encode(), payload.encode(), hashlib.sha256).hexdigest() full = f"{payload}|{sig}" return base64.urlsafe_b64encode(full.encode()).decode().rstrip("=") def _verify_session_token(token: str) -> Optional[str]: """Returns email if valid+not expired, else None.""" if not token: return None try: import base64 pad = len(token) % 4 if pad: token += "=" * (4 - pad) decoded = base64.urlsafe_b64decode(token.encode()).decode() parts = decoded.split("|") if len(parts) != 3: return None email, expiry, sig = parts expected = hmac.new(SESSION_SECRET.encode(), f"{email}|{expiry}".encode(), hashlib.sha256).hexdigest() if not hmac.compare_digest(sig, expected): return None if int(expiry) < int(time.time()): return None return email except Exception: return None def _verify_google_id_token(id_token: str) -> Optional[dict]: """Verify Google ID token via tokeninfo endpoint. Returns claims dict or None.""" if not id_token or not GOOGLE_CLIENT_ID: return None try: import urllib.request, urllib.error, json as _json url = f"https://oauth2.googleapis.com/tokeninfo?id_token={urllib.parse.quote(id_token)}" with urllib.request.urlopen(url, timeout=10) as resp: data = _json.loads(resp.read().decode()) if data.get("aud") != GOOGLE_CLIENT_ID: print(f"⚠️ Google token aud mismatch: {data.get('aud')!r}", flush=True) return None if data.get("iss") not in ("accounts.google.com", "https://accounts.google.com"): return None if int(data.get("exp", 0)) < int(time.time()): return None if data.get("email_verified") not in (True, "true"): return None return data except Exception as e: print(f"⚠️ Google token verify error: {e}", flush=True) return None def check_auth( request: Request, creds: Optional[HTTPBasicCredentials] = Depends(security), ): """Auth dependency. Accepts (in order): 1) Valid session cookie (set by Google Sign-In flow) 2) HTTP Basic with AUTH_USER/AUTH_PASS (legacy + cron + scripts) On failure: redirect browser GETs to /login, return 401 to API clients. """ # 1) Session cookie token = request.cookies.get(SESSION_COOKIE_NAME) if token: email = _verify_session_token(token) if email and _email_allowed(email): return email # 2) Basic auth fallback if creds and creds.username and creds.password: correct_user = secrets.compare_digest(creds.username, AUTH_USER) correct_pass = secrets.compare_digest(creds.password, AUTH_PASS) if correct_user and correct_pass: return creds.username # 3) Failure — redirect browsers, 401 API accept = request.headers.get("accept", "") is_browser_get = request.method == "GET" and "text/html" in accept if is_browser_get and GOOGLE_CLIENT_ID: # Browser → /login (raise s 303 ne dela v Depends, zato uporabljamo HTTPException) raise HTTPException( status_code=status.HTTP_303_SEE_OTHER, headers={"Location": "/login"}, ) raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required", headers={"WWW-Authenticate": "Basic"}, ) # ──────────────────────────────────────────────────────────────── # Artist + title parsing iz filename / YouTube title # ──────────────────────────────────────────────────────────────── import re _NOISE_PATTERNS = [ # Variacije "official" z možnimi tipkarskimi napakami (offiicial, offical, oficial, ...) # Match liberalno: "off" + 0-3 dodatnih črk + "icial" + "video"/"audio" + opcijsko številka r"\(Off[a-z]*icial\s+(?:Music\s+|HD\s+|4K\s+)?(?:Video|Audio)\s*\)", r"\(Off[a-z]*icia[lk]\s+(?:Music\s+|HD\s+|4K\s+)?(?:Video|Audio)\)", r"\bOff[a-z]*icial\s+(?:Music\s+|HD\s+|4K\s+)?(?:Video|Audio)\b", # Nemške variacije r"\(Offizielles?\s+(?:Musik)?[Vv]ideo\)", r"\bOffizielles?\s+(?:Musik)?[Vv]ideo\b", # Lyric videos r"\(Lyric[s]?\s+Video\)", r"\bLyric[s]?\s+Video\b", # Audio quality / version markers r"\(Audio\)", r"\(HD\)", r"\(HQ\)", r"\(4K\)", r"\(8K\)", r"\(1080p?\)", r"\(Live\)", r"\(Remix\)", r"\(Cover\)", r"\(Acoustic\)", r"\(Remastered\)", r"\(Remaster(?:ed)?\s*\d{0,4}\)", r"\(Extended(?:\s+Mix)?\)", r"\(Radio(?:\s+Edit)?\)", r"\(Clean(?:\s+Version)?\)", r"\(Explicit\)", # Square brackets variations r"\[Official[^\]]*\]", r"\[Music[^\]]*\]", r"\[Audio[^\]]*\]", r"\[HD\]", r"\[HQ\]", r"\[4K\]", r"\[Lyric[s]?[^\]]*\]", # Bare words (without brackets) r"\boriginal\s+(?:video|audio)\b", r"\bMV\b", r"\b(?:4K|HD|HQ|8K|1080p|720p|4k|hd|hq)\s*$", # trailing 4K/HD brez oklepajev # Trailing year in parens (npr. "(2024)") r"\(\d{4}\)\s*$", # Trailing 2-4 digit number na koncu (verjetno leto: "23", "2023", "33"): # POMEMBNO: samo če je ZADNJA stvar v stringu in ne del besede # NE odstrani '33 točk' ampak DA odstrani 'Naslov 33' # Pred številko: presledek/oklepaji/ničesar r"\s+\d{2,4}\s*$", # Prazni / dummy oklepaji: "( )", "( )", "( - )", "(-)", "(.)" r"\(\s*[-–—._]*\s*\)", # Catch-all: oklepaji z "video"/"audio"/"version"/"mix"/"edit"/"remix" # (široko match — če oklepaji vsebujejo te besede, so verjetno noise) r"\([^)]*\b(?:video|audio|version|mix|edit|remix|cover|live|hd|hq|4k|8k|remaster(?:ed)?|extended|clean|explicit|radio|lyric[s]?|official|offizielles?|musik)\b[^)]*\)", # Catch-all: oglati oklepaji z noise besedami r"\[[^\]]*\b(?:video|audio|version|mix|edit|remix|cover|live|hd|hq|4k|official|musik)\b[^\]]*\]", # Avtor/feat. v oklepajih: "(prod. by X)", "(feat. Y)", "(ft. Z)" r"\(\s*(?:prod\.?(?:uced)?\s+by|feat\.?(?:uring)?|ft\.?)\s+[^)]+\)", # Trailing številke ki označujejo verzije: "33" na koncu (npr. "Modrijani - X 33") # POZOR: zelo previdno, ker so lahko legit (npr. del naslova) # — Ne dodam splošnega trailing številk pattern-a, ker bi razbil legitime ] def parse_artist_title(filename_or_title): """Iz imena datoteke / YouTube naslova ekstrahira (artist, title). Podpira pogoste vzorce: - "Artist - Title.mp4" - "Artist - Title (Official Music Video).mp4" - "Artist – Title" (em-dash) - "Artist | Title" Vrne (artist, title) ali (None, None) če ni razvidno. """ if not filename_or_title: return (None, None) # Odstrani extension SAMO če je dejansko file extension (kratko, brez presledka). # Path("ANS.NAVEZA - SREČA OPOTEČA").stem vrne "ANS" — narobe za YT title. # Pravilno: samo če je string brez presledkov ali se konča na običajno ext. _COMMON_EXT = {".mp4", ".mp3", ".m4a", ".webm", ".mkv", ".avi", ".mov", ".wav", ".flac", ".aac", ".opus", ".ogg", ".wmv", ".mxf"} name = filename_or_title if "." in name: # Vzemi zadnjo piko in preveri ali je to znana ext last_dot = name.rfind(".") ext = name[last_dot:].lower() if ext in _COMMON_EXT and last_dot > 0: name = name[:last_dot] # Odstrani noise patterns for pat in _NOISE_PATTERNS: name = re.sub(pat, "", name, flags=re.IGNORECASE) # Normaliziraj presledke name = re.sub(r"\s+", " ", name).strip() # Probaj različne separatorje for sep in [" - ", " – ", " — ", " | ", " : "]: if sep in name: parts = name.split(sep, 1) artist = parts[0].strip() title = parts[1].strip() # Strip trailing/leading puncutation artist = re.sub(r'^[\s\-–—|.:_]+|[\s\-–—|.:_]+$', '', artist) title = re.sub(r'^[\s\-–—|.:_]+|[\s\-–—|.:_]+$', '', title) if artist and title and len(artist) <= 80 and len(title) <= 100: return (artist, title) return (None, None) def safe_filename(s, max_len=80): """Naredi varno ime datoteke (brez znakov ki bi razbili FS).""" if not s: return "" # Replace problematic chars with safe alternative s = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '', s) s = re.sub(r'\s+', ' ', s).strip() return s[:max_len] def clean_noise(s): """Odstrani 'noise' (Official Video itd.) iz besedila - tudi že-parsed values.""" if not s: return s cleaned = s for pat in _NOISE_PATTERNS: cleaned = re.sub(pat, "", cleaned, flags=re.IGNORECASE) cleaned = re.sub(r"\s+", " ", cleaned).strip() cleaned = re.sub(r'^[\s\-–—|.:_]+|[\s\-–—|.:_]+$', '', cleaned) return cleaned def build_download_filename(job): """Sestavi pravilno ime download datoteke iz job metadata.""" # Najprej probaj job-shranjene parsed values artist = clean_noise(job.get("parsed_artist")) title = clean_noise(job.get("parsed_title")) # Fallback: parse from filename if not artist or not title: source = job.get("filename") or job.get("youtube_title") or "" parsed_artist, parsed_title = parse_artist_title(source) artist = artist or parsed_artist title = title or parsed_title if artist and title: return f"{safe_filename(artist)} - {safe_filename(title)} - REEL.mp4" if title: return f"{safe_filename(title)} - REEL.mp4" # Last resort: job ID (vendar to bi se moralo preprečiti že ob upload-u) return f"reel_{job['id']}.mp4" # ──────────────────────────────────────────────────────────────── # Job state (filesystem-based, persistent prek restartov) # ──────────────────────────────────────────────────────────────── def job_path(job_id): return JOBS_DIR / f"{job_id}.json" def load_job(job_id): p = job_path(job_id) if not p.exists(): return None return json.loads(p.read_text()) def save_job(job): p = job_path(job["id"]) p.write_text(json.dumps(job, ensure_ascii=False, indent=2)) _persist_to_s3(p, "job_meta") def update_job(job_id, **kwargs): job = load_job(job_id) if not job: return None job.update(kwargs) job["updated_at"] = time.time() save_job(job) return job def _clean_job_titles(job): """Očisti 'Official Video' ipd. iz parsed_title v real-time (brez perzistiranja).""" if not job: return job if job.get("parsed_title"): cleaned = clean_noise(job["parsed_title"]) if cleaned and cleaned != job["parsed_title"]: job["parsed_title"] = cleaned if job.get("parsed_artist"): cleaned = clean_noise(job["parsed_artist"]) if cleaned and cleaned != job["parsed_artist"]: job["parsed_artist"] = cleaned return job def list_jobs(): out = [] for f in sorted(JOBS_DIR.glob("*.json"), reverse=True): try: out.append(_clean_job_titles(json.loads(f.read_text()))) except Exception: pass return out def generate_srt_from_segments(segments, clip_start, clip_end, output_path): """Generira SRT samo za dele, ki spadajo v [clip_start, clip_end]. Timestamp-i so re-mapirani na 0-based (kot je v trim-anem videu). Razdeli dolge segmente (>2.5s) na enake kose za hiter pacing v reels stilu. Vse besedilo VELIKE TISKANE ČRKE. """ MAX_CHUNK_DURATION = 2.5 def fmt_ts(s): h = int(s // 3600) m = int((s % 3600) // 60) sec = s % 60 return f"{h:02d}:{m:02d}:{sec:06.3f}".replace(".", ",") lines = [] idx = 1 for seg in segments: s_start = float(seg["start"]) s_end = float(seg["end"]) text = str(seg["text"]).strip() words = seg.get("words", []) or [] # Filter v range if s_end <= clip_start or s_start >= clip_end: continue # ── HALLUCINATION FILTER ── # STT (Scribe, Whisper) občasno halucinira pri dolgih instrumentalih: # vrne segment 60-100s z 1-2 besedama. Tak segment ne smemo dati v SRT. # Pravilo: če je segment > 15s IN ima < 5 besed (ali words array < 5), # je verjetno halucinacija. seg_dur = s_end - s_start word_count = len(words) if words else len(text.split()) if seg_dur > 15 and word_count < 5: print(f"[SRT] Preskočil halucinacijski segment [{s_start:.1f}-{s_end:.1f}] " f"({seg_dur:.1f}s, {word_count} besed): {text[:50]!r}", flush=True) continue # Pripravi words_in_clip vedno (če imamo word-level timestampe) # Uporabili ga bomo tako za segment trim kot za chunk boundaries words_in_clip = None if words: words_in_clip = [] for w in words: w_start = float(w.get("start", 0)) w_end = float(w.get("end", 0)) w_text = w.get("text", "").strip() if not w_text: continue if w_end > clip_start and w_start < clip_end: words_in_clip.append({ "start": max(w_start, clip_start), "end": min(w_end, clip_end), "text": w_text, }) # Če segment delno štrli iz clip range-a IN imamo word-level timestampe, # uporabi samo tiste besede ki dejansko padejo v clip range if words_in_clip and (s_start < clip_start or s_end > clip_end): if not words_in_clip: continue # Reconstruiraj segment z dejanskim word-level timing-om text = " ".join(w["text"] for w in words_in_clip) s_start = words_in_clip[0]["start"] s_end = words_in_clip[-1]["end"] else: # Klipni segment začetek/konec na clip range s_start = max(s_start, clip_start) s_end = min(s_end, clip_end) if s_end - s_start < 0.2: continue # Re-mapraj na 0-based rel_start = s_start - clip_start rel_end = s_end - clip_start if not text: continue text_upper = text.upper() # Razdeli na chunk-e če je predolg duration = rel_end - rel_start if duration <= MAX_CHUNK_DURATION: lines.append(f"{idx}\n{fmt_ts(rel_start)} --> {fmt_ts(rel_end)}\n{text_upper}\n") idx += 1 else: # ── WORD-LEVEL CHUNKING ── # Če imamo word-level timestampe (Soniox/Scribe), uporabi DEJANSKE čase besed # za chunk boundaries (NE enake time chunks, ker pevec ne poje enakomerno). if words_in_clip and len(words_in_clip) >= 2: # Group besede v chunke z max trajanjem MAX_CHUNK_DURATION # Pravila: # 1. Vsak chunk traja max MAX_CHUNK_DURATION (2.5s) # 2. NE začni novega chunka če bi prejšnji ostal kratek (<3 besede ali <12 znakov) # 3. NE pusti zadnji chunk siroto z 1 kratko besedo (<3 znakov) — združi nazaj MIN_CHUNK_WORDS = 3 MIN_CHUNK_CHARS = 12 chunks = [] current_chunk = [words_in_clip[0]] for w in words_in_clip[1:]: chunk_start_time = current_chunk[0]["start"] chunk_dur_so_far = w["end"] - chunk_start_time # Bi začeli nov chunk? if chunk_dur_so_far > MAX_CHUNK_DURATION: # Preveri ali bi current_chunk ostal "siroto kratek" current_text = " ".join(c["text"] for c in current_chunk) too_short = (len(current_chunk) < MIN_CHUNK_WORDS or len(current_text) < MIN_CHUNK_CHARS) if too_short: # Daj še to besedo zraven (raje malo predolg chunk kot siroto kratek) current_chunk.append(w) else: chunks.append(current_chunk) current_chunk = [w] else: current_chunk.append(w) if current_chunk: # Če zadnji chunk ima samo 1-2 kratki besedi, združi z zadnjim chunkom last_text = " ".join(c["text"] for c in current_chunk) if (chunks and (len(current_chunk) < 2 or len(last_text) < 5)): chunks[-1].extend(current_chunk) else: chunks.append(current_chunk) # Generiraj SRT iz chunks (z dejanskimi word timestampi) for chunk in chunks: cs = chunk[0]["start"] - clip_start ce = chunk[-1]["end"] - clip_start chunk_text = " ".join(w["text"] for w in chunk).upper().strip() if not chunk_text: continue lines.append(f"{idx}\n{fmt_ts(cs)} --> {fmt_ts(ce)}\n{chunk_text}\n") idx += 1 else: # Fallback: brez word-level timestampov, razdeli enako n_parts = int(duration / MAX_CHUNK_DURATION) + 1 words_split = text_upper.split() words_per_part = max(1, len(words_split) // n_parts) chunk_dur = duration / n_parts for i in range(n_parts): cs = rel_start + i * chunk_dur ce = rel_start + (i + 1) * chunk_dur wstart = i * words_per_part wend = (i + 1) * words_per_part if i < n_parts - 1 else len(words_split) chunk_text = " ".join(words_split[wstart:wend]) if wstart < len(words_split) else text_upper if not chunk_text.strip(): chunk_text = text_upper lines.append(f"{idx}\n{fmt_ts(cs)} --> {fmt_ts(ce)}\n{chunk_text.strip()}\n") idx += 1 with open(output_path, "w", encoding="utf-8") as f: f.write("\n".join(lines)) return output_path # ──────────────────────────────────────────────────────────────── # Pipeline runner (background task) # ──────────────────────────────────────────────────────────────── def run_subprocess_logged(cmd, job_id, step_name): """Pokliče subprocess, logi gredo v job.""" update_job(job_id, current_step=step_name, status="processing") proc = subprocess.run(cmd, capture_output=True, text=True) if proc.returncode != 0: # Combine stdout + stderr za diagnostiko err_msg = (proc.stderr or "") + "\n" + (proc.stdout or "") update_job( job_id, status="failed", error=f"{step_name}: {err_msg[-800:].strip()}", ) return False # Tudi pri success-u beleži stderr za diagnostiko (samo zadnji del) if proc.stderr and proc.stderr.strip(): update_job(job_id, last_step_log=proc.stderr[-500:].strip()) return True def _precache_edit_assets(job_id: str, src_path: str): """Generira low-q source video + waveform PNG za Edit modal. Tečeta v ozadju (subprocess), ne blokira pipeline-a. Če Edit modal je odprt, te assete dobi instant. """ src = Path(src_path) if not src.exists(): return # Low-q source (480p) — za hitro scrubbanje low_path = OUTPUT_DIR / f"{job_id}_source_low.mp4" if not low_path.exists() or low_path.stat().st_size < 1024: if low_path.exists(): low_path.unlink() cmd = [ "ffmpeg", "-y", "-i", str(src), "-vf", "scale=854:480:force_original_aspect_ratio=decrease,scale=trunc(iw/2)*2:trunc(ih/2)*2", "-c:v", "libx264", "-preset", "veryfast", "-crf", "28", "-c:a", "aac", "-b:a", "96k", "-movflags", "+faststart", "-loglevel", "error", str(low_path), ] _ffmpeg_then_persist(cmd, low_path, kind="output") print(f"📦 Pre-cache low-q source za {job_id} (background → S3)", flush=True) # Waveform PNG (2400x72 — za zoom) wave_path = OUTPUT_DIR / f"{job_id}_waveform_2400x72.png" if not wave_path.exists() or wave_path.stat().st_size < 100: if wave_path.exists(): wave_path.unlink() cmd = [ "ffmpeg", "-y", "-i", str(src), "-filter_complex", "[0:a]aformat=channel_layouts=mono,showwavespic=s=2400x72:colors=#ff6b6b:scale=lin:draw=full[wave]", "-map", "[wave]", "-frames:v", "1", "-loglevel", "error", str(wave_path), ] _ffmpeg_then_persist(cmd, wave_path, kind="output") print(f"📦 Pre-cache waveform za {job_id} (background → S3)", flush=True) def process_job(job_id): """Glavni pipeline: download (če YT) → find_chorus (če auto) → reframe → subs.""" job = load_job(job_id) if not job: return try: # ── 1. Source preparation ───────────────────────────── if job["source_type"] == "youtube": update_job(job_id, status="downloading", current_step="YouTube download") input_path = UPLOAD_DIR / f"{job_id}_yt.mp4" cmd = [ "python3", str(SCRIPTS_DIR / "yt_download.py"), job["youtube_url"], str(input_path), ] if not run_subprocess_logged(cmd, job_id, "YouTube download"): return update_job(job_id, input_path=str(input_path)) # S3 mirror — original video + info.json _persist_to_s3(input_path, "upload") info_json = input_path.with_suffix(".info.json") if info_json.exists(): _persist_to_s3(info_json, "upload") # Probaj dobiti YT metadata (če še ni iz submit-a) — title, uploader, id, ... # Single video submit ali playlist resolve že nastavi metadata, ampak # včasih (npr. če je submit fetch failed) je še manjka. need_metadata_fetch = not job.get("youtube_title") or not job.get("youtube_uploader") if need_metadata_fetch: try: info_cmd = [ "python3", str(SCRIPTS_DIR / "yt_download.py"), job["youtube_url"], "/dev/null", "--info-only", ] proc = subprocess.run(info_cmd, capture_output=True, text=True, timeout=30) if proc.returncode == 0 and proc.stdout: info = json.loads(proc.stdout) yt_title = info.get("title", "") or "" updates = {} if yt_title: updates["youtube_title"] = yt_title if info.get("id"): updates["youtube_id"] = info["id"] if info.get("uploader") or info.get("channel"): updates["youtube_uploader"] = info.get("uploader") or info.get("channel") or "" if info.get("duration") is not None: updates["youtube_duration"] = info["duration"] if info.get("thumbnail"): updates["youtube_thumbnail"] = info["thumbnail"] if info.get("description"): updates["youtube_description"] = info["description"][:2000] if info.get("upload_date"): updates["youtube_upload_date"] = info["upload_date"] if info.get("webpage_url"): updates["youtube_webpage_url"] = info["webpage_url"] # Qnet match + parser samo če še nimamo clean name if yt_title and not job.get("has_clean_name"): qm = qnet_match.match_filename(yt_title) if qm["matched"] and qm["confidence"] >= 0.85: updates["parsed_artist"] = qm["artist"] updates["parsed_title"] = qm["title"] updates["has_clean_name"] = True updates["qnet_match"] = { "method": qm["method"], "confidence": qm["confidence"], "matched_file": qm["file"], "matched_station": qm["station"], } updates["tv_station"] = qm["station"] else: a, t = parse_artist_title(yt_title) if a: updates["parsed_artist"] = a if t: updates["parsed_title"] = t updates["has_clean_name"] = bool(a and t) if updates: update_job(job_id, **updates) # Reload job for downstream use job = load_job(job_id) except Exception as e: print(f"⚠️ Cannot fetch YT metadata: {e}", flush=True) else: input_path = Path(job["input_path"]) # ── 1b. Music recognition (ACRCloud) — če nimamo artist+title ───── # Tudi za YouTube jobs lahko naslov ni razviden (npr. iz playliste, "Track 5") if not (job.get("parsed_artist") and job.get("parsed_title")): update_job(job_id, current_step="Avto-prepoznavam pesem (ACRCloud)") try: acr_cmd = [ "python3", str(SCRIPTS_DIR / "acr_recognize.py"), str(input_path), ] proc = subprocess.run(acr_cmd, capture_output=True, text=True, timeout=120) if proc.returncode == 0 and proc.stdout: data = json.loads(proc.stdout) a, t = data.get("artist"), data.get("title") if a and t: update_job( job_id, parsed_artist=a, parsed_title=t, has_clean_name=True, recognized_via="acrcloud", ) job = load_job(job_id) print(f"✅ ACR prepoznal: {a} - {t}", flush=True) else: print(f"⚠️ ACR ni prepoznal pesmi", flush=True) else: print(f"⚠️ ACR exit {proc.returncode}: {proc.stderr[:200]}", flush=True) except Exception as e: print(f"⚠️ ACR error: {e}", flush=True) # ── 2. Smart analysis (če auto_chorus) ────────────────────────── # ÄŒe je custom_clip=True (user-edit mode), preskoÄi analizo # in samo posodobi start/duration iz obstojeÄega clip_range v analysis.json if job.get("custom_clip"): update_job(job_id, current_step="User-edit recut (preskočim analizo)") analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json" srt_from_claude = None if analysis_path.exists(): try: with open(analysis_path, "r", encoding="utf-8") as f: analysis = json.load(f) cr = analysis["clip_range"] fade = analysis.get("fade", {"fade_in": 0.05, "fade_out": 0.5}) # Generiraj nov SRT iz obstoječega transkripta na novem range-u if analysis.get("transcript", {}).get("segments") and not job.get("no_subs"): srt_path_out = OUTPUT_DIR / f"{job_id}.subtitles.srt" try: # Če imamo custom_subtitle_segments (user popravil napise) segs_to_use = analysis.get("custom_subtitle_segments") or analysis["transcript"]["segments"] generate_srt_from_segments( segs_to_use, cr["start"], cr["end"], srt_path_out, ) srt_from_claude = str(srt_path_out) except Exception as e: print(f"⚠️ Recut SRT generation failed: {e}", flush=True) update_job( job_id, start=cr["start"], duration=cr["duration"], fade_in=fade.get("fade_in", 0.05), fade_out=fade.get("fade_out", 0.5), claude_srt_path=srt_from_claude, ) job = load_job(job_id) except Exception as e: update_job(job_id, status="failed", error=f"Recut analysis read: {e}") return else: update_job(job_id, status="failed", error="Analysis manjka za recut") return elif job.get("auto_chorus"): update_job(job_id, current_step="Analiza pesmi (transkript + energija)") analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json" cmd = [ "python3", str(SCRIPTS_DIR / "analyze.py"), str(input_path), "--target-duration", str(job.get("duration", 30)), "--max-duration", str(job.get("max_duration", 45)), "--min-duration", str(job.get("min_duration", 20)), "--output", str(analysis_path), ] if job.get("include_prebuild"): cmd += ["--include-prebuild"] # LLM provider (claude/gemini/auto) if job.get("llm_provider"): cmd += ["--llm-provider", job["llm_provider"]] if job.get("llm_model"): cmd += ["--llm-model", job["llm_model"]] # Filename hint za Claude/Scribe — preferiraj parsed artist+title (čistejše) if job.get("parsed_artist") and job.get("parsed_title"): fn_hint = f"{job['parsed_artist']} - {job['parsed_title']}" cmd += ["--filename-hint", fn_hint] elif job.get("filename"): fn_hint = Path(job["filename"]).stem cmd += ["--filename-hint", fn_hint] # STT provider (elevenlabs = Scribe, local = faster-whisper, auto = preferiraj Scribe) # Per-station default routing: # FOLX DE / ZWEI (nemške) → Scribe (boljši za nemščino, brez "Mrs. Sadie" halucinacij) # FOLX SLO / ONE / ADRIA → Soniox primary (auto = soniox_chain) # User lahko v UI override-a (whisper_provider polje). station = (job.get("tv_station") or "").upper() DE_STATIONS = ("FOLX DE", "ZWEI", "ZWEI MUSIC TV", "FOLX MUSIC TV") user_provider = job.get("whisper_provider") if user_provider and user_provider not in ("auto", ""): # User explicitly chose a provider — respect it cmd += ["--whisper-provider", user_provider] elif any(s in station for s in DE_STATIONS): # Auto-route DE → Scribe cmd += ["--whisper-provider", "elevenlabs"] print(f"🇩🇪 Auto-routing STT to Scribe za {job.get('tv_station')}", flush=True) elif user_provider: # auto → analyze.py si izbere (soniox_chain default) cmd += ["--whisper-provider", user_provider] # lang: če None ali 'auto', pusti analyze.py auto-detect if job.get("lang") and job["lang"] not in ("auto", ""): cmd += ["--lang", job["lang"]] cmd += ["--model", job.get("whisper_model", "large-v3")] proc = subprocess.run(cmd, capture_output=True, text=True) srt_from_claude = None # Pot do SRT iz Claude-popravljenega transcript-a if proc.returncode == 0 and analysis_path.exists(): try: with open(analysis_path, "r", encoding="utf-8") as f: analysis = json.load(f) cr = analysis["clip_range"] fade = analysis["fade"] # Generiraj SRT iz transcript-a TRIM-ANEGA na clip_range # (Claude je morda popravil besedilo — uporabi popravljeno) if analysis.get("transcript", {}).get("segments"): srt_path_out = OUTPUT_DIR / f"{job_id}.subtitles.srt" try: generate_srt_from_segments( analysis["transcript"]["segments"], cr["start"], cr["end"], srt_path_out, ) srt_from_claude = str(srt_path_out) llm_src = cr.get("source", "LLM") print(f"📝 Generated SRT from {llm_src} transcript: {srt_path_out}") except Exception as e: print(f"⚠️ SRT generation failed: {e}") update_job( job_id, analysis_summary={ "language": analysis["language"], "language_probability": analysis["language_probability"], "instrumental": analysis["instrumental"], "clip_range": cr, "fade": fade, "chorus_preview": analysis["chorus"]["best"]["text_preview"] if analysis.get("chorus") and analysis["chorus"].get("best") else None, "video_duration": analysis.get("video_duration"), "candidates": analysis["chorus"].get("all_candidates", [])[:5] if analysis.get("chorus") else [], "claude_corrected_text": analysis.get("claude_corrected_text", False), }, # Cel transkript shranimo za UI prikaz full_transcript=[ {"start": s["start"], "end": s["end"], "text": s["text"]} for s in analysis.get("transcript", {}).get("segments", []) ], start=cr["start"], duration=cr["duration"], fade_in=fade["fade_in"], fade_out=fade["fade_out"], detected_language=analysis["language"], is_instrumental=analysis["instrumental"], claude_srt_path=srt_from_claude, ) # Auto-disable subs za instrumental if analysis["instrumental"] and not job.get("no_subs"): update_job(job_id, no_subs=True, auto_disabled_subs=True) # Reload local dict job = load_job(job_id) except (json.JSONDecodeError, KeyError) as e: update_job(job_id, chorus_error=f"Analysis parse: {e}") else: update_job(job_id, chorus_error=(proc.stderr or "")[-500:]) # ── 3. Reframe + subtitles (clip.py orchestrator) ───── output_path = OUTPUT_DIR / f"{job_id}.mp4" update_job(job_id, current_step="Reframe + subtitles") cmd = [ "python3", str(SCRIPTS_DIR / "clip.py"), str(input_path), str(output_path), "--mode", job.get("mode", "track"), "--quality", job.get("quality", "medium"), "--style", job.get("subtitle_style", "reels"), ] if job.get("start") is not None: cmd += ["--start", str(job["start"])] if job.get("duration") is not None: cmd += ["--duration", str(job["duration"])] if job.get("fade_in", 0) > 0: cmd += ["--fade-in", str(job["fade_in"])] if job.get("fade_out", 0) > 0: cmd += ["--fade-out", str(job["fade_out"])] # SRT iz Claude (boljše besedilo) — preda direktno v subtitle.py if job.get("claude_srt_path") and Path(job["claude_srt_path"]).exists() and not job.get("no_subs"): cmd += ["--srt", job["claude_srt_path"]] # lang: prefer detected_language če auto chosen_lang = job.get("lang") if chosen_lang in (None, "auto", ""): chosen_lang = job.get("detected_language") if chosen_lang: cmd += ["--lang", chosen_lang] if job.get("no_subs"): cmd += ["--no-subs"] cmd += ["--model", job.get("whisper_model", "large-v3")] # DEBUG: zapiši natanko kakšen ukaz se izvede update_job(job_id, debug_clip_cmd=" ".join(cmd)) if not run_subprocess_logged(cmd, job_id, "Reframe + subtitles"): return # ── Done ────────────────────────────────────────────── if output_path.exists(): update_job( job_id, status="done", current_step="Končano", output_path=str(output_path), output_size_mb=round(output_path.stat().st_size / 1024 / 1024, 2), ) # S3 mirror — final reel + pomožne datoteke (analysis, subtitles) _persist_to_s3(output_path, "output") for suffix in (".analysis.json", ".subtitles.srt", ".subtitles.ass"): aux = OUTPUT_DIR / f"{job_id}{suffix}" if aux.exists(): _persist_to_s3(aux, "output") # Telegram obvestilo try: from app.telegram import notify_job_done final_job = load_job(job_id) notify_job_done(final_job) except Exception as e: print(f"⚠️ TG notify_job_done failed: {e}", flush=True) # Pre-cache za Edit modal (instant odpiranje): # - low-q source video (480p) za hitro scrubbanje # - waveform PNG # Ti se bodo zgenerirali tudi on-demand, ampak zdaj so že tu = Edit instant. try: _precache_edit_assets(job_id, str(input_path)) except Exception as e: print(f"⚠️ Edit precache failed: {e}", flush=True) # Avto-upload v Nextcloud če flag set (npr. po Save/recut) try: final_job = load_job(job_id) if final_job.get("auto_upload_to_nextcloud") and _nextcloud_configured(): update_job(job_id, nextcloud_status="uploading", nextcloud_error=None) download_name = build_download_filename(final_job) tv_station = final_job.get("tv_station", "FOLX SLO") nc_folder = _nextcloud_folder_for_station(tv_station) target_subdir = f"folxspeed/REELS/{nc_folder}" print(f"☁️ Avto-upload v Nextcloud: /{target_subdir}/{download_name}", flush=True) success, result = _nextcloud_upload(str(output_path), download_name, target_subdir=target_subdir) if success: update_job( job_id, nextcloud_status="uploaded", nextcloud_url=result, nextcloud_error=None, auto_upload_to_nextcloud=False, # disable da se ne ponovi hidden_after_upload=True, # signal za UI da ga skrije ) # Zabeleži v dedup try: orig_filename = final_job.get("filename") or download_name file_mb = final_job.get("output_size_mb") or final_job.get("size_mb") dedup_record(orig_filename, tv_station, job_id, nextcloud_url=result, file_size_mb=file_mb) except Exception as e: print(f"⚠️ Dedup record failed: {e}", flush=True) print(f"☁️ Auto-upload OK: /{target_subdir}/{download_name}", flush=True) else: update_job(job_id, nextcloud_status="error", nextcloud_error=result) print(f"⚠️ Auto-upload failed: {result}", flush=True) except Exception as e: print(f"⚠️ Auto-upload error: {e}", flush=True) # Batch tracking — če je zadnji v batchu, pošlji summary _try_finalize_batch(job_id) else: update_job( job_id, status="failed", error="Output datoteka ne obstaja po obdelavi", ) try: from app.telegram import notify_job_failed notify_job_failed(load_job(job_id), "Output datoteka ne obstaja") except Exception: pass _try_finalize_batch(job_id) except Exception as e: update_job(job_id, status="failed", error=str(e)) try: from app.telegram import notify_job_failed notify_job_failed(load_job(job_id), str(e)) except Exception: pass _try_finalize_batch(job_id) def _try_finalize_batch(job_id): """Če je job del batch-a, preveri če so vsi zaključili in pošlji summary.""" try: job = load_job(job_id) batch_id = job.get("batch_id") if not batch_id: return # Preštej batch jobe all_jobs = [load_job(jp.stem) for jp in JOBS_DIR.glob("*.json")] batch_jobs = [j for j in all_jobs if j and j.get("batch_id") == batch_id] unfinished = [j for j in batch_jobs if j.get("status") not in ("done", "failed")] if unfinished: return # še niso vsi končani # Vsi so končani — pošlji summary total = len(batch_jobs) succeeded = len([j for j in batch_jobs if j.get("status") == "done"]) failed = total - succeeded try: from app.telegram import notify_batch_complete notify_batch_complete(batch_id, total, succeeded, failed) except Exception as e: print(f"⚠️ TG batch summary failed: {e}", flush=True) except Exception as e: print(f"⚠️ _try_finalize_batch error: {e}", flush=True) # ──────────────────────────────────────────────────────────────── # FastAPI app # ──────────────────────────────────────────────────────────────── app = FastAPI(title="Reels Clipper") app.mount("/static", StaticFiles(directory=Path(__file__).parent.parent / "static"), name="static") # ──────────────────────────────────────────────────────────────── # Queue worker — procesira queued jobe enega za drugim # ──────────────────────────────────────────────────────────────── import threading _queue_lock = threading.Lock() # Set ID-jev v obdelavi — omogoča 3 paralelne workerje _queue_running = {"jobs_in_progress": set()} # Število paralelnih worker-jev (CPU dovoljena hkratna obdelava) NUM_WORKERS = int(os.environ.get("NUM_WORKERS", "3")) def _queue_worker(worker_id: int): """Background thread ki preverja queued jobe in jih obdeluje paralelno. Več worker-jev teče hkrati. Vsak vzame naslednji "queued" job ki ni že v obdelavi pri drugem worker-ju. Zaklep poskrbi za atomično vzetje. """ print(f"🚜 Queue worker #{worker_id} zagnan", flush=True) while True: try: # Najdi naslednjega "queued" joba ki ni že v obdelavi with _queue_lock: in_progress = _queue_running["jobs_in_progress"] queued_jobs = [] for f in JOBS_DIR.glob("*.json"): try: j = json.loads(f.read_text()) jid = j.get("id") if j.get("status") == "queued" and jid not in in_progress: queued_jobs.append(j) except Exception: continue if not queued_jobs: next_job = None else: queued_jobs.sort(key=lambda x: x.get("created_at", 0)) next_job = queued_jobs[0] # Atomično rezerviraj _queue_running["jobs_in_progress"].add(next_job["id"]) if not next_job: time.sleep(2) continue job_id = next_job["id"] print(f"🚜 Worker #{worker_id}: obdelujem {job_id}", flush=True) try: process_job(job_id) except Exception as e: print(f"❌ Worker #{worker_id} error pri {job_id}: {e}", flush=True) update_job(job_id, status="failed", error=f"Worker #{worker_id}: {e}") finally: with _queue_lock: _queue_running["jobs_in_progress"].discard(job_id) except Exception as e: print(f"❌ Worker #{worker_id} outer error: {e}", flush=True) time.sleep(5) # Zaženi N worker-jev v ozadju (samo enkrat) _worker_threads = [] def _start_queue_worker(): global _worker_threads if _worker_threads: # Preveri da so vsi alive alive = [t for t in _worker_threads if t.is_alive()] if len(alive) == NUM_WORKERS: return _worker_threads = [] for i in range(NUM_WORKERS): t = threading.Thread(target=_queue_worker, args=(i+1,), daemon=True) t.start() _worker_threads.append(t) print(f"🚜 Zagnal {NUM_WORKERS} paralelnih worker-jev", flush=True) @app.on_event("startup") async def resume_or_cleanup_jobs(): """Ob startu containerja: avto-resume processing jobs ali jih označi kot error. Ko Coolify deploya nov container, prejšnji se ubije sredi obdelave, JSON file pa ostane status='processing'. Strategija: - Če je analyze.json že narejen (analiza je končana) → resume z reframe+subs - Če ni analyze.json → restart pipeline od začetka - Po 3 napakah (resume_attempts >= 3) → mark error """ import asyncio print("🔄 Preverjam in obnavljam jobs po restart-u...") resumed_count = 0 error_count = 0 for f in JOBS_DIR.glob("*.json"): try: j = json.loads(f.read_text()) if j.get("status") != "processing": continue job_id = j.get("job_id") or f.stem attempts = j.get("resume_attempts", 0) # Po 3 neuspehih nehamo if attempts >= 3: j["status"] = "error" j["current_step"] = "Preveč napak pri ponovnem zagonu" j["chorus_error"] = f"Job restartal {attempts} krat — napaka v pipeline-u" j["updated_at"] = time.time() f.write_text(json.dumps(j, ensure_ascii=False, indent=2)) error_count += 1 continue # Preveri ali input file še obstaja input_path = UPLOAD_DIR / f"{job_id}.mp4" if not input_path.exists(): j["status"] = "error" j["current_step"] = "Vhodna datoteka ne obstaja" j["chorus_error"] = f"Upload {job_id}.mp4 ne obstaja po restart-u" j["updated_at"] = time.time() f.write_text(json.dumps(j, ensure_ascii=False, indent=2)) error_count += 1 continue # Resume: status=queued, +1 attempt, in pošlji v background j["status"] = "queued" j["resume_attempts"] = attempts + 1 j["current_step"] = f"Avto-resume po restart-u (poskus {attempts + 1}/3)" j["last_resume_at"] = time.time() j["updated_at"] = time.time() f.write_text(json.dumps(j, ensure_ascii=False, indent=2)) resumed_count += 1 # Pošlji v background po startup-u (ne smemo blokirati startup) asyncio.create_task(_resume_job_async(job_id)) print(f" 🔁 Resume {job_id} (attempt {attempts + 1}/3)") except Exception as e: print(f" ⚠️ Napaka pri {f.name}: {e}") print(f" ✅ Resumed: {resumed_count}, Error: {error_count}") # Zaženi queue worker za queued jobe (multi-upload batch) _start_queue_worker() async def _resume_job_async(job_id): """Pomožna funkcija ki zažene process_job v background-u.""" import asyncio # Počakaj kratek čas da je startup končan await asyncio.sleep(2) try: # process_job je sync funkcija, izvedi v thread executor loop = asyncio.get_event_loop() await loop.run_in_executor(None, process_job, job_id) except Exception as e: print(f" ❌ Resume failed for {job_id}: {e}") # ──────────────────────────────────────────────────────────────── # Google Sign-In endpointi # ──────────────────────────────────────────────────────────────── @app.get("/login", response_class=HTMLResponse) async def login_page(request: Request, error: Optional[str] = None): """Login stran z Google Sign-In gumbom.""" # Če že prijavljen, redirect na / token = request.cookies.get(SESSION_COOKIE_NAME) if token and _verify_session_token(token): email = _verify_session_token(token) if email and _email_allowed(email): return RedirectResponse(url="/", status_code=303) error_html = "" if error == "not_allowed": error_html = '
❌ Tvoj račun nima dostopa. Kontaktiraj Sebastjana, da te doda.
' elif error == "invalid_token": error_html = '
❌ Neveljaven Google token. Probaj še enkrat.
' elif error == "no_client_id": error_html = '
⚠️ Google Sign-In ni konfiguriran. Nastavi GOOGLE_CLIENT_ID env var.
' if not GOOGLE_CLIENT_ID: gsi_html = '
⚠️ Google Sign-In ni konfiguriran (GOOGLE_CLIENT_ID manjka).
' else: gsi_html = f"""
""" html = f""" Prijava — reels.biba.live

🎬 reels.biba.live

Prijava z Google računom
{error_html}
{gsi_html}
""" return html class GoogleCallbackRequest(BaseModel): credential: str # Google ID token (JWT) @app.post("/auth/google/callback") async def google_callback(payload: GoogleCallbackRequest): """Verify Google ID token in set session cookie.""" if not GOOGLE_CLIENT_ID: raise HTTPException(503, "no_client_id") claims = _verify_google_id_token(payload.credential) if not claims: raise HTTPException(401, "invalid_token") email = claims.get("email", "").lower() if not _email_allowed(email): raise HTTPException(403, "not_allowed") # Set session cookie token = _make_session_token(email) resp = JSONResponse({"ok": True, "email": email, "name": claims.get("name", "")}) resp.set_cookie( key=SESSION_COOKIE_NAME, value=token, max_age=SESSION_TTL_SECONDS, httponly=True, secure=True, samesite="lax", path="/", ) print(f"✅ Login: {email} ({claims.get('name','?')})", flush=True) return resp @app.get("/auth/me") async def auth_me(request: Request): """Vrne trenutno prijavljenega uporabnika (za frontend header).""" token = request.cookies.get(SESSION_COOKIE_NAME) if token: email = _verify_session_token(token) if email and _email_allowed(email): return {"email": email, "method": "google"} # Tudi basic auth (če cron uporablja) auth_header = request.headers.get("authorization", "") if auth_header.startswith("Basic "): try: import base64 decoded = base64.b64decode(auth_header[6:]).decode() user, _, pwd = decoded.partition(":") if secrets.compare_digest(user, AUTH_USER) and secrets.compare_digest(pwd, AUTH_PASS): return {"email": user, "method": "basic"} except Exception: pass raise HTTPException(401, "Not logged in") @app.post("/logout") async def logout(): """Pobriše session cookie + redirect na /login.""" resp = RedirectResponse(url="/login", status_code=303) resp.delete_cookie(SESSION_COOKIE_NAME, path="/") return resp @app.get("/logout") async def logout_get(): """GET varianta za linke.""" resp = RedirectResponse(url="/login", status_code=303) resp.delete_cookie(SESSION_COOKIE_NAME, path="/") return resp @app.get("/", response_class=HTMLResponse) async def index(user: str = Depends(check_auth)): html = (Path(__file__).parent.parent / "templates" / "index.html").read_text() return html @app.get("/healthz") async def healthz(): return {"ok": True} # ──────────────────────────────────────────────────────────────── # Job models # ──────────────────────────────────────────────────────────────── class YouTubeJobIn(BaseModel): url: str mode: str = "track" lang: Optional[str] = None auto_chorus: bool = True start: Optional[float] = None duration: Optional[float] = 30 no_subs: bool = False subtitle_style: str = "reels" whisper_model: str = "large-v3" quality: str = "medium" tv_station: str = "FOLX SLO" # za Nextcloud target mapo class StartJobIn(BaseModel): job_id: str mode: str = "track" lang: Optional[str] = None # None/auto = Whisper auto-detect auto_chorus: bool = True include_prebuild: bool = False # vključi pre-chorus build-up start: Optional[float] = None duration: Optional[float] = 30 max_duration: Optional[float] = 45 min_duration: Optional[float] = 20 no_subs: bool = False subtitle_style: str = "reels" whisper_model: str = "large-v3" quality: str = "medium" # LLM za semantično analizo + popravke llm_provider: str = "claude" # claude / gemini / auto llm_model: Optional[str] = None # specifičen model (privzeto najboljši za provider) # STT provider (Scribe je 18x hitreje + boljša multilingual accuracy) whisper_provider: str = "auto" # auto / elevenlabs / local # Batch tracking za multi-upload (Telegram summary) batch_id: Optional[str] = None # TV postaja — določa Nextcloud target mapo tv_station: str = "FOLX SLO" # ──────────────────────────────────────────────────────────────── # Dedup check # ──────────────────────────────────────────────────────────────── class DedupCheckRequest(BaseModel): filenames: list[str] tv_station: str = "FOLX SLO" @app.post("/api/dedup/check") async def dedup_check_endpoint(payload: DedupCheckRequest, user: str = Depends(check_auth)): """Preveri katere filename so že obdelane (na isti TV postaji). Vrne dict { filename: {match} | null } """ result = {} for fn in payload.filenames: match = dedup_check(fn, payload.tv_station) result[fn] = match return {"results": result, "tv_station": payload.tv_station} @app.post("/api/dedup/remove") async def dedup_remove_endpoint(payload: DedupCheckRequest, user: str = Depends(check_auth)): """Izbriši dedup zapise — uporabnik želi re-narediti komad.""" for fn in payload.filenames: dedup_remove(fn, payload.tv_station) return {"ok": True, "removed": payload.filenames} @app.get("/api/dedup/list") async def dedup_list(tv_station: Optional[str] = None, user: str = Depends(check_auth)): """Seznam vseh obdelanih komadov (opcijsko filtrirano po TV postaji).""" import sqlite3 _dedup_init() conn = sqlite3.connect(str(DEDUP_DB)) conn.row_factory = sqlite3.Row if tv_station: rows = conn.execute( "SELECT * FROM processed_videos WHERE tv_station = ? ORDER BY uploaded_at DESC", (tv_station,) ).fetchall() else: rows = conn.execute( "SELECT * FROM processed_videos ORDER BY uploaded_at DESC" ).fetchall() conn.close() return {"count": len(rows), "items": [dict(r) for r in rows]} # ──────────────────────────────────────────────────────────────── # Upload (file) # ──────────────────────────────────────────────────────────────── @app.post("/api/upload") async def upload_video( file: UploadFile = File(...), artist: Optional[str] = Form(None), title: Optional[str] = Form(None), batch_id: Optional[str] = Form(None), user: str = Depends(check_auth), ): if not file.filename: raise HTTPException(400, "Brez imena") job_id = uuid.uuid4().hex[:12] ext = Path(file.filename).suffix or ".mp4" input_path = UPLOAD_DIR / f"{job_id}{ext}" size = 0 with input_path.open("wb") as f: while chunk := await file.read(1024 * 1024): size += len(chunk) if size > MAX_UPLOAD_MB * 1024 * 1024: f.close() input_path.unlink(missing_ok=True) raise HTTPException(413, f"Prevelika datoteka (limit {MAX_UPLOAD_MB} MB)") f.write(chunk) job = { "id": job_id, "source_type": "upload", "filename": file.filename, "input_path": str(input_path), "size_mb": round(size / 1024 / 1024, 2), "status": "uploaded", "current_step": "Naloženo, čaka na obdelavo", "created_at": time.time(), "updated_at": time.time(), } if batch_id: job["batch_id"] = batch_id # Artist + title — najprej user-provided, potem Qnet match, potem parse iz filename # Qnet match poskusimo VEDNO (audit + tv_station auto-fill), ne glede na to # ali je client že poslal artist+title (jih je morda dobil iz Qnet match-a sam) qm = qnet_match.match_filename(file.filename) if qm["matched"] and qm["confidence"] >= 0.85: job["qnet_match"] = { "method": qm["method"], "confidence": qm["confidence"], "matched_file": qm["file"], "matched_station": qm["station"], } # Auto-set tv_station, če še ni — uporabnik ga lahko ročno spremeni v UI if not job.get("tv_station"): job["tv_station"] = qm["station"] if artist and title: # User je vpisal ali potrdil (lahko prihaja iz client-side Qnet match-a) job["parsed_artist"] = artist.strip() job["parsed_title"] = title.strip() job["has_clean_name"] = True else: # Fallback: Qnet match (server-side) → parse_artist_title regex if qm["matched"] and qm["confidence"] >= 0.85: job["parsed_artist"] = qm["artist"] job["parsed_title"] = qm["title"] job["has_clean_name"] = True print(f"🎯 Qnet match [{qm['method']}, {qm['confidence']}]: " f"{qm['station']} — {qm['artist']} — {qm['title']}", flush=True) else: # Fallback: regex parser a, t = parse_artist_title(file.filename) if a: job["parsed_artist"] = a if t: job["parsed_title"] = t job["has_clean_name"] = bool(a and t) save_job(job) # S3 mirror — direct upload datoteka _persist_to_s3(input_path, "upload") return job # ──────────────────────────────────────────────────────────────── # YouTube submit # ──────────────────────────────────────────────────────────────── class YouTubePlaylistPreviewIn(BaseModel): url: str @app.post("/api/youtube/playlist-preview") async def youtube_playlist_preview(payload: YouTubePlaylistPreviewIn, user: str = Depends(check_auth)): """Resolve YouTube URL — vrne listo videov če je playlist, ali en item. Frontend uporabi to za preview pred submit-om: če je playlist, user vidi koliko komadov je in lahko potrdi. """ import sys as _sys _sys.path.insert(0, str(SCRIPTS_DIR)) try: from yt_download import get_playlist except ImportError: raise HTTPException(500, "yt_download modul ne dostopen") try: info = get_playlist(payload.url) except subprocess.TimeoutExpired: raise HTTPException(504, "yt-dlp timeout (>120s) — preveri URL") except Exception as e: raise HTTPException(500, f"yt-dlp napaka: {e}") if info.get("error"): raise HTTPException(400, f"yt-dlp: {info['error']}") if not info.get("items"): raise HTTPException(400, "Ni najdenih videov na tem URL-u") return info @app.post("/api/youtube") async def submit_youtube( payload: YouTubeJobIn, background: BackgroundTasks, user: str = Depends(check_auth), ): """Submit YouTube URL — single video ali cela playlist. Če URL vsebuje 'list=' parameter, resolve-amo playlist in kreiramo batch jobs (en job za vsak video). Single video kreira en job. """ url = payload.url.strip() is_playlist_url = "list=" in url and "watch?" not in url.split("list=")[0].split("&")[0] # Bolj robustno: če ima list= ampak tudi v= potem je morda single video iz playliste # → tretiraj kot single (default) has_list = "list=" in url has_v = "v=" in url or "/watch?" in url or "youtu.be/" in url treat_as_playlist = has_list and not has_v # samo "playlist?list=..." URLs # Pa tudi če je explicit "/playlist?" v URL-u if "/playlist?" in url: treat_as_playlist = True if treat_as_playlist: # Resolve playlist import sys as _sys _sys.path.insert(0, str(SCRIPTS_DIR)) try: from yt_download import get_playlist except ImportError: raise HTTPException(500, "yt_download modul ne dostopen") try: info = get_playlist(url) except Exception as e: raise HTTPException(500, f"Playlist resolve napaka: {e}") items = info.get("items", []) if not items: raise HTTPException(400, "Playlist je prazen ali ni dostopen") # Batch ID za grupiranje batch_id = "yt-batch-" + uuid.uuid4().hex[:8] playlist_title = info.get("playlist_title", "YouTube Playlist") created_jobs = [] for item in items: job_id = uuid.uuid4().hex[:12] # Probaj match proti Qnet bazi (parsa Artist - Title iz YouTube naslova) yt_title = item.get("title", "") qm = qnet_match.match_filename(yt_title) job = { "id": job_id, "source_type": "youtube", "youtube_url": item["url"], "youtube_title": yt_title, "youtube_id": item.get("id"), "youtube_uploader": item.get("uploader", ""), "playlist_url": url, "playlist_title": playlist_title, "batch_id": batch_id, "status": "queued", "current_step": "V vrsti za YouTube prenos", "created_at": time.time(), "updated_at": time.time(), "mode": payload.mode, "lang": payload.lang, "auto_chorus": payload.auto_chorus, "start": payload.start, "duration": payload.duration, "no_subs": payload.no_subs, "subtitle_style": payload.subtitle_style, "whisper_model": payload.whisper_model, "quality": payload.quality, "tv_station": payload.tv_station, } # Qnet match — auto-fill clean Artist/Title in tv_station če je matched if qm["matched"] and qm["confidence"] >= 0.85: job["parsed_artist"] = qm["artist"] job["parsed_title"] = qm["title"] job["has_clean_name"] = True job["qnet_match"] = { "method": qm["method"], "confidence": qm["confidence"], "matched_file": qm["file"], "matched_station": qm["station"], } # Auto-set tv_station iz Qnet match-a (override default) job["tv_station"] = qm["station"] else: # Fallback: regex parser na YT naslovu a, t = parse_artist_title(yt_title) if a: job["parsed_artist"] = a if t: job["parsed_title"] = t job["has_clean_name"] = bool(a and t) save_job(job) created_jobs.append(job) print(f"📋 YouTube playlist '{playlist_title}': {len(created_jobs)} jobov v batch {batch_id}", flush=True) return { "is_playlist": True, "batch_id": batch_id, "playlist_title": playlist_title, "count": len(created_jobs), "jobs": created_jobs, } # ─── Single video ─── job_id = uuid.uuid4().hex[:12] job = { "id": job_id, "source_type": "youtube", "youtube_url": url, "status": "queued", "current_step": "V vrsti za YouTube prenos", "created_at": time.time(), "updated_at": time.time(), "mode": payload.mode, "lang": payload.lang, "auto_chorus": payload.auto_chorus, "start": payload.start, "duration": payload.duration, "no_subs": payload.no_subs, "subtitle_style": payload.subtitle_style, "whisper_model": payload.whisper_model, "quality": payload.quality, "tv_station": payload.tv_station, } # Fetch YT metadata že ob submit (preko yt.biba.live API ali lokalni yt-dlp). # Uporabljamo isti yt_download modul, da konsistentno dobimo vse pomembne polja. # Če fetch ne uspe, gre job naprej brez metadat — worker bo še enkrat probal. try: import sys as _sys _sys.path.insert(0, str(SCRIPTS_DIR)) from yt_download import get_info as yt_get_info info = yt_get_info(url) if info: yt_title = info.get("title", "") or "" job["youtube_title"] = yt_title job["youtube_id"] = info.get("id") or "" job["youtube_uploader"] = info.get("uploader") or info.get("channel") or "" job["youtube_duration"] = info.get("duration") job["youtube_thumbnail"] = info.get("thumbnail") or "" job["youtube_description"] = (info.get("description") or "")[:2000] # cap na 2KB job["youtube_upload_date"] = info.get("upload_date") or "" job["youtube_webpage_url"] = info.get("webpage_url") or url # Qnet match na YT naslovu (kot pri playlist) if yt_title: qm = qnet_match.match_filename(yt_title) if qm["matched"] and qm["confidence"] >= 0.85: job["parsed_artist"] = qm["artist"] job["parsed_title"] = qm["title"] job["has_clean_name"] = True job["qnet_match"] = { "method": qm["method"], "confidence": qm["confidence"], "matched_file": qm["file"], "matched_station": qm["station"], } # Auto-set tv_station iz Qnet match-a (override default) job["tv_station"] = qm["station"] else: # Fallback: regex parser na YT naslovu a, t = parse_artist_title(yt_title) if a: job["parsed_artist"] = a if t: job["parsed_title"] = t job["has_clean_name"] = bool(a and t) except Exception as e: print(f"⚠️ Cannot fetch YT metadata at submit: {e}", flush=True) save_job(job) return job # ──────────────────────────────────────────────────────────────── # Start processing for uploaded job # ──────────────────────────────────────────────────────────────── @app.post("/api/process") async def start_processing( payload: StartJobIn, background: BackgroundTasks, user: str = Depends(check_auth), ): job = load_job(payload.job_id) if not job: raise HTTPException(404, "Job ne obstaja") update_job( payload.job_id, status="queued", mode=payload.mode, lang=payload.lang, auto_chorus=payload.auto_chorus, include_prebuild=payload.include_prebuild, start=payload.start, duration=payload.duration, max_duration=payload.max_duration, min_duration=payload.min_duration, no_subs=payload.no_subs, subtitle_style=payload.subtitle_style, whisper_model=payload.whisper_model, quality=payload.quality, llm_provider=payload.llm_provider, llm_model=payload.llm_model, whisper_provider=payload.whisper_provider, batch_id=payload.batch_id, tv_station=payload.tv_station, current_step="V vrsti za obdelavo", # Počisti pretekle napake (retry-friendly) chorus_error=None, interrupted_at=None, ) # Queue worker (background thread) bo pograbil ta job — ne zaganjamo neposredno. # To pomeni, da se ob več upload-ih obdelujejo zaporedno (1 hkrati = stabilno). return load_job(payload.job_id) # ──────────────────────────────────────────────────────────────── # Job queries # ──────────────────────────────────────────────────────────────── @app.get("/api/jobs") async def get_jobs(user: str = Depends(check_auth)): return {"jobs": list_jobs()} @app.get("/api/jobs/{job_id}") async def get_job(job_id: str, user: str = Depends(check_auth)): job = load_job(job_id) if not job: raise HTTPException(404, "Ne obstaja") return _clean_job_titles(job) @app.get("/api/stream/{job_id}") async def stream_job(job_id: str, user: str = Depends(check_auth)): """Server-Sent Events za real-time status.""" async def gen(): last_status = None last_step = None for _ in range(600): # max 10 min stream job = load_job(job_id) if not job: yield f"data: {json.dumps({'error': 'not found'})}\n\n" return if job["status"] != last_status or job.get("current_step") != last_step: yield f"data: {json.dumps(job, ensure_ascii=False)}\n\n" last_status = job["status"] last_step = job.get("current_step") if job["status"] in ("done", "failed"): return await asyncio.sleep(1) return StreamingResponse(gen(), media_type="text/event-stream") # ──────────────────────────────────────────────────────────────── # Download / preview # ──────────────────────────────────────────────────────────────── @app.get("/api/download/{job_id}") async def download(job_id: str, user: str = Depends(check_auth)): job = load_job(job_id) if not job or job.get("status") != "done": raise HTTPException(404, "Ne pripravljen") out = Path(job["output_path"]) _ensure_local(out, "output") if not out.exists(): raise HTTPException(404, "Output ne obstaja") # Pametno ime: "Izvajalec - Naslov - REEL.mp4" download_name = build_download_filename(job) return FileResponse( out, media_type="video/mp4", filename=download_name, ) @app.get("/api/preview/{job_id}") async def preview(job_id: str, request: Request, user: str = Depends(check_auth)): """Video preview z Range request podporo (potrebno za HTML5 video player).""" job = load_job(job_id) if not job or job.get("status") != "done": raise HTTPException(404, "Ne pripravljen") out = Path(job["output_path"]) _ensure_local(out, "output") if not out.exists(): raise HTTPException(404, "Output ne obstaja") file_size = out.stat().st_size range_header = request.headers.get("range") or request.headers.get("Range") if range_header: # Parse "bytes=START-END" try: range_str = range_header.replace("bytes=", "").strip() start_s, end_s = range_str.split("-") start = int(start_s) if start_s else 0 end = int(end_s) if end_s else file_size - 1 end = min(end, file_size - 1) if start > end or start >= file_size: return Response(status_code=416) # Range Not Satisfiable chunk_size = end - start + 1 def iter_file(): with open(out, "rb") as f: f.seek(start) remaining = chunk_size while remaining > 0: read_size = min(64 * 1024, remaining) data = f.read(read_size) if not data: break remaining -= len(data) yield data headers = { "Content-Range": f"bytes {start}-{end}/{file_size}", "Accept-Ranges": "bytes", "Content-Length": str(chunk_size), "Content-Type": "video/mp4", "Cache-Control": "no-cache, must-revalidate", # browser ne sme cachat starega output-a } return StreamingResponse(iter_file(), status_code=206, headers=headers, media_type="video/mp4") except (ValueError, IndexError): pass # Brez Range — vrni cel file return FileResponse( out, media_type="video/mp4", headers={ "Accept-Ranges": "bytes", "Content-Length": str(file_size), "Cache-Control": "no-cache, must-revalidate", # browser ne sme cachat starega output-a }, ) @app.delete("/api/jobs/{job_id}") async def delete_job(job_id: str, user: str = Depends(check_auth)): job = load_job(job_id) if not job: raise HTTPException(404, "Ne obstaja") # ── 1. Nextcloud (če je bil naložen) ────────────────────── nc_filename = None nc_subdir = None nc_status = None if job.get("nextcloud_status") == "uploaded" or job.get("nextcloud_url"): try: station = job.get("tv_station", "") nc_subdir = f"{NEXTCLOUD_REELS_PATH.strip('/')}/{_nextcloud_folder_for_station(station)}" nc_filename = build_download_filename(job) ok, msg = _nextcloud_delete(nc_filename, target_subdir=nc_subdir) nc_status = ("ok" if ok else f"fail: {msg}") print(f"🗑️ NC delete {nc_filename} → {nc_status}", flush=True) except Exception as e: nc_status = f"error: {e}" print(f"⚠️ NC delete error: {e}", flush=True) # ── 2. Dedup DB (processed_videos) ──────────────────────── try: if nc_filename and job.get("tv_station"): dedup_remove(nc_filename, job["tv_station"]) # Tudi po job_id — če je shranjen pod drugim filename # (Glavno: zbriši zapis, da se enaka pesem lahko spet upload-a) except Exception as e: print(f"⚠️ Dedup remove error: {e}", flush=True) # ── 3. Lokal + S3: glavni input + output ────────────────── for key, kind in (("input_path", "upload"), ("output_path", "output")): p = job.get(key) if p: local_p = Path(p) local_p.unlink(missing_ok=True) _delete_from_s3(local_p.name, kind) # ── 4. Pomožne datoteke (analysis, subtitles, low-q, waveform) ── for fname in ( f"{job_id}.mp4", f"{job_id}.analysis.json", f"{job_id}.subtitles.srt", f"{job_id}.subtitles.ass", f"{job_id}_source_low.mp4", ): f = OUTPUT_DIR / fname f.unlink(missing_ok=True) _delete_from_s3(fname, "output") # Waveform PNG-ji (več velikosti) — listanje ker imena niso fiksna try: for wf in OUTPUT_DIR.glob(f"{job_id}_waveform_*.png"): wf_name = wf.name wf.unlink(missing_ok=True) _delete_from_s3(wf_name, "output") except Exception: pass # YT info.json + ostali yt-dlp artifacti (.part, .f137.mp4, ...) try: for f in UPLOAD_DIR.glob(f"{job_id}_yt*"): f_name = f.name f.unlink(missing_ok=True) _delete_from_s3(f_name, "upload") except Exception: pass # ── 5. Job metadata (lokal + S3) ────────────────────────── jp = job_path(job_id) jp.unlink(missing_ok=True) _delete_from_s3(f"{job_id}.json", "job_meta") return { "deleted": job_id, "nextcloud_delete": nc_status, "nextcloud_filename": nc_filename, } # ─── EDIT FEATURE ──────────────────────────────────────────────── @app.get("/api/source-video/{job_id}") async def source_video(job_id: str, quality: str = "high", user: str = Depends(check_auth)): """Vrne 16:9 original video za predogled v editorju. quality='high' (default): originalni file quality='low': 480p re-encoded version (cached) — za hitro scrubbanje v editorju """ job = load_job(job_id) if not job: raise HTTPException(404, "Ne obstaja") src = job.get("input_path") if not src: raise HTTPException(404, "Original video ne obstaja") _ensure_local(src, "upload") if not Path(src).exists(): raise HTTPException(404, "Original video ne obstaja") if quality == "low": # 480p cached za hitro scrubbanje cache_path = OUTPUT_DIR / f"{job_id}_source_low.mp4" # Najprej probaj fetch iz S3 (po cleanupu lahko manjka lokalno) _ensure_local(cache_path, "output") cache_valid = cache_path.exists() and cache_path.stat().st_size > 1024 if not cache_valid: if cache_path.exists(): cache_path.unlink() cmd = [ "ffmpeg", "-y", "-i", str(src), "-vf", "scale=854:480:force_original_aspect_ratio=decrease,scale=trunc(iw/2)*2:trunc(ih/2)*2", "-c:v", "libx264", "-preset", "veryfast", # malo boljša kvaliteta od ultrafast "-crf", "28", "-c:a", "aac", "-b:a", "96k", "-movflags", "+faststart", "-loglevel", "error", str(cache_path), ] try: proc = subprocess.run(cmd, capture_output=True, text=True, timeout=120) if proc.returncode != 0 or not cache_path.exists() or cache_path.stat().st_size < 1024: if cache_path.exists(): cache_path.unlink() raise HTTPException(500, f"FFmpeg failed: {(proc.stderr or 'unknown')[-300:]}") # Mirror v S3 po regeneraciji _persist_to_s3(cache_path, "output") except subprocess.TimeoutExpired: if cache_path.exists(): cache_path.unlink() raise HTTPException(500, "Low-q source render timeout (>120s)") return FileResponse( path=cache_path, media_type="video/mp4", filename=f"source_low_{job_id}.mp4", headers={"Accept-Ranges": "bytes", "Cache-Control": "max-age=3600"}, ) return FileResponse( path=src, media_type="video/mp4", filename=f"source_{job_id}.mp4", headers={"Accept-Ranges": "bytes"}, ) @app.get("/api/waveform/{job_id}") async def waveform(job_id: str, width: int = 1200, height: int = 80, user: str = Depends(check_auth)): """Vrne PNG waveform celotne pesmi za visualizacijo v Edit modalu. Cache enkrat per pesem, file size ~10-50 KB. """ job = load_job(job_id) if not job: raise HTTPException(404, "Ne obstaja") src = job.get("input_path") if not src: raise HTTPException(404, "Original video ne obstaja") _ensure_local(src, "upload") if not Path(src).exists(): raise HTTPException(404, "Original video ne obstaja") width = max(400, min(width, 3000)) height = max(40, min(height, 200)) cache_path = OUTPUT_DIR / f"{job_id}_waveform_{width}x{height}.png" _ensure_local(cache_path, "output") cache_valid = cache_path.exists() and cache_path.stat().st_size > 100 if not cache_valid: if cache_path.exists(): cache_path.unlink() # ffmpeg showwavespic filter — generira en sam PNG s celotnim waveformom # colors: rdeč #ff6b6b kot accent cmd = [ "ffmpeg", "-y", "-i", str(src), "-filter_complex", f"[0:a]aformat=channel_layouts=mono,showwavespic=s={width}x{height}:colors=#ff6b6b:scale=lin:draw=full[wave]", "-map", "[wave]", "-frames:v", "1", "-loglevel", "error", str(cache_path), ] try: proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30) if proc.returncode != 0 or not cache_path.exists() or cache_path.stat().st_size < 100: if cache_path.exists(): cache_path.unlink() raise HTTPException(500, f"Waveform render failed: {(proc.stderr or 'unknown')[-300:]}") # Mirror v S3 po regeneraciji _persist_to_s3(cache_path, "output") except subprocess.TimeoutExpired: if cache_path.exists(): cache_path.unlink() raise HTTPException(500, "Waveform render timeout (>30s)") return FileResponse( path=cache_path, media_type="image/png", headers={"Cache-Control": "max-age=3600"}, ) @app.api_route("/api/preview-clip/{job_id}", methods=["GET", "HEAD"]) async def preview_clip( job_id: str, start: float, end: float, user: str = Depends(check_auth), ): """Live preview odseka + 10s konteksta pred/po — low-quality 480p clip. Vrne odsek od (start-10s) do (end+10s), 480p quality. Frontend lahko free-scruba v tem range-u + drag-a handle. BREZ reframe (16:9 ostane), BREZ napisov, BREZ face tracking. Cache po start+end timestampih. """ job = load_job(job_id) if not job: raise HTTPException(404, "Ne obstaja") src = job.get("input_path") if not src: raise HTTPException(404, "Original video ne obstaja") _ensure_local(src, "upload") if not Path(src).exists(): raise HTTPException(404, "Original video ne obstaja") if end <= start: raise HTTPException(400, "end mora biti večji od start") duration = end - start if duration < 1: raise HTTPException(400, "Trajanje vsaj 1s") if duration > 90: raise HTTPException(400, "Trajanje največ 90s") # Razširi z 10s pred in po (kontekst za fine-tune) CONTEXT_BEFORE = 10.0 CONTEXT_AFTER = 10.0 extract_start = max(0, start - CONTEXT_BEFORE) extract_end = end + CONTEXT_AFTER extract_duration = extract_end - extract_start # Cache key — razširjen preview se shrani po orig start+end cache_key = f"{job_id}_preview_ctx_{start:.1f}_{end:.1f}.mp4" cache_path = OUTPUT_DIR / cache_key # Validate cache: datoteka mora obstajati IN biti vsaj 1KB cache_valid = cache_path.exists() and cache_path.stat().st_size > 1024 if not cache_valid: if cache_path.exists(): cache_path.unlink() cmd = [ "ffmpeg", "-y", "-ss", f"{max(0, extract_start - 0.5):.2f}", "-i", str(src), "-ss", f"{min(0.5, extract_start):.2f}", "-t", f"{extract_duration:.2f}", "-vf", "scale=854:480:force_original_aspect_ratio=decrease,scale=trunc(iw/2)*2:trunc(ih/2)*2", "-c:v", "libx264", "-preset", "ultrafast", "-crf", "30", "-c:a", "aac", "-b:a", "96k", "-movflags", "+faststart", "-loglevel", "error", str(cache_path), ] try: proc = subprocess.run(cmd, capture_output=True, text=True, timeout=20) if proc.returncode != 0 or not cache_path.exists() or cache_path.stat().st_size < 1024: if cache_path.exists(): cache_path.unlink() raise HTTPException(500, f"FFmpeg failed: {(proc.stderr or 'unknown')[-300:]}") except subprocess.TimeoutExpired: if cache_path.exists(): cache_path.unlink() raise HTTPException(500, "Preview render timeout (>20s)") return FileResponse( path=cache_path, media_type="video/mp4", headers={ "Accept-Ranges": "bytes", "Cache-Control": "max-age=300", # Vrnem original start kot custom header da frontend ve mapiranje "X-Preview-Original-Start": f"{extract_start:.2f}", "X-Preview-Original-End": f"{extract_end:.2f}", }, ) @app.get("/api/transcript/{job_id}") async def get_transcript(job_id: str, user: str = Depends(check_auth)): """Vrne transkript (segmente + words) za editor.""" job = load_job(job_id) if not job: raise HTTPException(404, "Ne obstaja") analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json" _ensure_local(analysis_path, "output") if not analysis_path.exists(): raise HTTPException(404, "Analysis ne obstaja") try: analysis = json.loads(analysis_path.read_text()) transcript = analysis.get("transcript", {}) clip_range = analysis.get("clip_range", {}) # video_duration: probaj iz analysis, sicer iz zadnjega segmenta, sicer 0 video_dur = analysis.get("video_duration") or job.get("video_duration") or 0 if not video_dur and transcript.get("segments"): video_dur = max((s.get("end", 0) for s in transcript["segments"]), default=0) return { "segments": transcript.get("segments", []), "language": transcript.get("language", "?"), "clip_range": clip_range, "video_duration": video_dur, } except Exception as e: raise HTTPException(500, f"Napaka pri branju: {e}") # ─── Qnet song match (MB player baza) ──────────────────────────── @app.get("/api/qnet/stats") async def qnet_stats(user: str = Depends(check_auth)): """Statistika Qnet baze (koliko songov, koliko star, po postajah).""" return qnet_match.db_stats() @app.get("/api/qnet/match") async def qnet_match_filename(filename: str, user: str = Depends(check_auth)): """Test endpoint — vrne match result za poljuben filename.""" if not filename: raise HTTPException(400, "filename query param required") return qnet_match.match_filename(filename) class QnetBatchMatchRequest(BaseModel): filenames: list[str] min_confidence: float = 0.85 @app.post("/api/qnet/match-batch") async def qnet_match_batch(payload: QnetBatchMatchRequest, user: str = Depends(check_auth)): """Batch match — kliče client ob izbiri datotek za live preview v queue.""" results = {} for fn in payload.filenames: if not fn: continue r = qnet_match.match_filename(fn) # Vrni samo zadetke nad minimum confidence; sicer null if r["matched"] and r["confidence"] >= payload.min_confidence: results[fn] = r else: results[fn] = None return {"results": results} @app.post("/api/qnet/sync") async def qnet_sync(background: BackgroundTasks, user: str = Depends(check_auth)): """Sproži sync (fetch Songs.txt iz vseh playerjev). Async background task.""" sync_script = SCRIPTS_DIR / "sync_qnet.py" if not sync_script.exists(): raise HTTPException(500, f"sync_qnet.py ne obstaja v {SCRIPTS_DIR}") def run_sync(): try: import subprocess env = os.environ.copy() env["QNET_DB_PATH"] = str(QNET_DIR / "songs.json") env["QNET_LOOKUP_PATH"] = str(QNET_DIR / "songs_lookup.json") proc = subprocess.run( ["python3", str(sync_script)], env=env, capture_output=True, text=True, timeout=300, ) print(f"[qnet sync] exit={proc.returncode}", flush=True) if proc.stdout: print(f"[qnet sync] stdout:\n{proc.stdout}", flush=True) if proc.stderr: print(f"[qnet sync] stderr:\n{proc.stderr}", flush=True) except Exception as e: print(f"[qnet sync] error: {e}", flush=True) background.add_task(run_sync) return {"started": True, "lookup_path": str(QNET_DIR / "songs_lookup.json")} class RecutRequest(BaseModel): start: float end: float custom_segments: Optional[list] = None # [{start, end, text}] za override napisov no_subs: Optional[bool] = None auto_upload: bool = True # po končanem recut avto-naloži v Nextcloud (default: da, ker user je že pregledal) # ─── Nextcloud upload ───────────────────────────────────────────── NEXTCLOUD_URL = os.environ.get("NEXTCLOUD_URL", "").rstrip("/") NEXTCLOUD_USER = os.environ.get("NEXTCLOUD_USER", "") NEXTCLOUD_PASS = os.environ.get("NEXTCLOUD_PASS", "") NEXTCLOUD_REELS_PATH = os.environ.get("NEXTCLOUD_REELS_PATH", "folxspeed/REELS") # Mapping: Qnet baza station naziv → Nextcloud folder naziv. # Qnet Songs.txt ima krajsa imena ("FOLX SLO", "ONE", "ZWEI"), Nextcloud # pa daljse ("FOLX SLOVENIJA", "ONE DE", "ZWEI MUSIC"). Pri uploadu mapiramo. STATION_TO_NEXTCLOUD_FOLDER = { "FOLX SLO": "FOLX SLOVENIJA", "FOLX SLOVENIJA": "FOLX SLOVENIJA", "FOLX DE": "FOLX DE", "ONE": "ONE DE", "ONE DE": "ONE DE", "ZWEI": "ZWEI MUSIC", "ZWEI MUSIC": "ZWEI MUSIC", "ADRIA": "ADRIA", } def _nextcloud_folder_for_station(tv_station: str) -> str: """Vrne pravi Nextcloud folder za dano tv_station (z mapping fallback).""" if not tv_station: return "FOLX SLOVENIJA" return STATION_TO_NEXTCLOUD_FOLDER.get(tv_station.strip(), tv_station.strip()) def _nextcloud_configured(): return bool(NEXTCLOUD_URL and NEXTCLOUD_USER and NEXTCLOUD_PASS) def _nextcloud_upload(local_path: str, remote_filename: str, target_subdir: str = None): """Naloži datoteko na Nextcloud preko WebDAV (stdlib urllib). Vrne (success: bool, url: str | error_msg: str). target_subdir = opcijsko podmapo (default: NEXTCLOUD_REELS_PATH) """ if not _nextcloud_configured(): return False, "Nextcloud ni konfiguriran (manjka NEXTCLOUD_URL/USER/PASS env)" if not Path(local_path).exists(): return False, f"Datoteka ne obstaja: {local_path}" import urllib.request, urllib.error, base64 from urllib.parse import quote base_path = (target_subdir or NEXTCLOUD_REELS_PATH).strip("/") safe_dir = "/".join(quote(p, safe="") for p in base_path.split("/")) safe_file = quote(remote_filename, safe="") url = f"{NEXTCLOUD_URL}/remote.php/dav/files/{NEXTCLOUD_USER}/{safe_dir}/{safe_file}" auth = base64.b64encode(f"{NEXTCLOUD_USER}:{NEXTCLOUD_PASS}".encode()).decode() try: with open(local_path, "rb") as f: data = f.read() req = urllib.request.Request( url, data=data, headers={ "Authorization": f"Basic {auth}", "Content-Type": "video/mp4", }, method="PUT", ) with urllib.request.urlopen(req, timeout=120) as resp: if resp.status in (200, 201, 204): return True, url return False, f"HTTP {resp.status}" except urllib.error.HTTPError as e: return False, f"HTTP {e.code}: {e.read().decode()[:200]}" except Exception as e: return False, f"Upload error: {e}" def _nextcloud_delete(remote_filename: str, target_subdir: str = None): """Pobriše datoteko iz Nextclouda preko WebDAV DELETE. Vrne (success: bool, msg: str). 404 (ne obstaja) tudi šteje kot success. """ if not _nextcloud_configured(): return False, "Nextcloud ni konfiguriran" import urllib.request, urllib.error, base64 from urllib.parse import quote base_path = (target_subdir or NEXTCLOUD_REELS_PATH).strip("/") safe_dir = "/".join(quote(p, safe="") for p in base_path.split("/")) safe_file = quote(remote_filename, safe="") url = f"{NEXTCLOUD_URL}/remote.php/dav/files/{NEXTCLOUD_USER}/{safe_dir}/{safe_file}" auth = base64.b64encode(f"{NEXTCLOUD_USER}:{NEXTCLOUD_PASS}".encode()).decode() try: req = urllib.request.Request( url, headers={"Authorization": f"Basic {auth}"}, method="DELETE", ) with urllib.request.urlopen(req, timeout=30) as resp: if resp.status in (200, 204): return True, "deleted" return False, f"HTTP {resp.status}" except urllib.error.HTTPError as e: if e.code == 404: return True, "not_found" # že ne obstaja — OK return False, f"HTTP {e.code}" except Exception as e: return False, f"Delete error: {e}" @app.post("/api/jobs/{job_id}/upload-nextcloud") async def upload_nextcloud(job_id: str, user: str = Depends(check_auth)): """Naloži dokončan reel na Nextcloud /folxspeed/REELS/{TV STATION}/.""" if not _nextcloud_configured(): raise HTTPException(500, "Nextcloud ni konfiguriran (manjka env vars)") job = load_job(job_id) if not job: raise HTTPException(404, "Job ne obstaja") if job.get("status") != "done": raise HTTPException(400, f"Job ni done (status={job.get('status')})") output_path = job.get("output_path") if not output_path or not Path(output_path).exists(): raise HTTPException(404, "Output mp4 ne obstaja") # Označim "uploading" update_job(job_id, nextcloud_status="uploading", nextcloud_error=None) # Lepo ime datoteke download_name = build_download_filename(job) # TV postaja določa target mapo (mapping Qnet -> Nextcloud folder) tv_station = job.get("tv_station", "FOLX SLO") nc_folder = _nextcloud_folder_for_station(tv_station) target_subdir = f"folxspeed/REELS/{nc_folder}" print(f"☁️ Uploading {output_path} → Nextcloud /{target_subdir}/{download_name}", flush=True) success, result = _nextcloud_upload(output_path, download_name, target_subdir=target_subdir) if success: update_job( job_id, nextcloud_status="uploaded", nextcloud_url=result, nextcloud_error=None, hidden_after_upload=True, # skrij iz default UI prikaza (toggle za pokaz) ) print(f"☁️ Upload OK: /{target_subdir}/{download_name}", flush=True) # Zabeleži v dedup try: orig_filename = job.get("filename") or download_name file_mb = job.get("output_size_mb") or job.get("size_mb") dedup_record(orig_filename, tv_station, job_id, nextcloud_url=result, file_size_mb=file_mb) except Exception as e: print(f"⚠️ Dedup record failed: {e}", flush=True) return {"ok": True, "url": result, "filename": download_name, "tv_station": tv_station} else: update_job(job_id, nextcloud_status="error", nextcloud_error=result) raise HTTPException(500, f"Upload failed: {result}") @app.post("/api/jobs/{job_id}/recut") async def recut_job(job_id: str, payload: RecutRequest, user: str = Depends(check_auth)): """Re-rendar reel z user-defined timestampi (in opcijsko popravljenimi napisi). Veliko hitrejše od full pipeline-a: - Brez Soniox transkripta (re-uporabi shranjenega) - Brez Claude analize (uporabnik je odločil) - Samo: clip → reframe → subtitle → output """ job = load_job(job_id) if not job: raise HTTPException(404, "Ne obstaja") src = job.get("input_path") if not src: raise HTTPException(400, "Original video manjka") _ensure_local(src, "upload") if not Path(src).exists(): raise HTTPException(400, "Original video manjka") if payload.end <= payload.start: raise HTTPException(400, "End mora biti večji od start") if payload.end - payload.start < 5: raise HTTPException(400, "Trajanje vsaj 5s") if payload.end - payload.start > 60: raise HTTPException(400, "Trajanje največ 60s") duration = payload.end - payload.start no_subs = payload.no_subs if payload.no_subs is not None else job.get("no_subs", False) # Naloži obstoječi analysis analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json" _ensure_local(analysis_path, "output") if not analysis_path.exists(): raise HTTPException(500, "Analysis manjka — re-uplad pesmi") analysis = json.loads(analysis_path.read_text()) # Posodobi clip range analysis["clip_range"] = { "start": payload.start, "end": payload.end, "duration": duration, "reason": f"USER EDIT: ročno popravljen clip {payload.start:.1f}-{payload.end:.1f}s", "source": "user_edit", } # Če uporabnik popravil napise, override segmenti if payload.custom_segments: # Ohrani originalni transcript ampak shrani custom segments za SRT analysis["custom_subtitle_segments"] = payload.custom_segments analysis_path.write_text(json.dumps(analysis, indent=2, ensure_ascii=False)) # Re-queue job v processing (worker ga bo obdelal) # Reset upload flags da se reel pojavi v UI med renderiranjem, # po končanem renderu pa avto-upload spet (in spet izgine). update_job( job_id, status="queued", no_subs=no_subs, custom_clip=True, # flag da preskoči Soniox + Claude auto_upload_to_nextcloud=payload.auto_upload, # avto-upload po končanem recut hidden_after_upload=False, # vidno med renderiranjem nextcloud_status="recutting", # signal: ima staro Nextcloud verzijo, čaka novo current_step="V vrsti za recut", error=None, chorus_error=None, ) # Briši stari output out_mp4 = OUTPUT_DIR / f"{job_id}.mp4" if out_mp4.exists(): out_mp4.unlink() for ext in ("srt", "ass"): p = OUTPUT_DIR / f"{job_id}.subtitles.{ext}" if p.exists(): p.unlink() return { "status": "queued", "job_id": job_id, "start": payload.start, "end": payload.end, "duration": duration, } # ──────────────────────────────────────────────────────────────── # Retranscribe — ponovi STT z drugim providerjem (npr. Soniox→Scribe) # Ohrani isti clip range, samo regenerira transkript + podnapise + render. # ──────────────────────────────────────────────────────────────── class RetranscribeRequest(BaseModel): provider: str = "elevenlabs" # 'elevenlabs' (Scribe) | 'local' (faster-whisper) | 'soniox' | 'auto' lang: Optional[str] = None # ostane original če None auto_upload: bool = False # po končanem renderu naloži v Nextcloud (overwrites obstoječ) @app.post("/api/jobs/{job_id}/retranscribe") async def retranscribe_job(job_id: str, payload: RetranscribeRequest, user: str = Depends(check_auth)): """Ponovi transkripcijo z drugim STT providerjem in re-renderaj. Ohrani isti clip range (start/end iz obstoječe analize) — uporabnik ne potrebuje ponovno pozicionirati IN/OUT. Samo besedilo se popravi. Pogosto: Soniox je narobe slišal v slovenskem folkloru, gremo na Scribe. """ job = load_job(job_id) if not job: raise HTTPException(404, "Ne obstaja") if job.get("status") in ("queued", "processing", "downloading"): raise HTTPException(409, "Job je že v obdelavi") # Validate provider valid_providers = {"elevenlabs", "local", "soniox", "auto"} if payload.provider not in valid_providers: raise HTTPException(400, f"Neveljaven provider. Dovoljen: {sorted(valid_providers)}") # Original input mora obstajati (lokalno ali v S3) src = job.get("input_path") if not src: raise HTTPException(400, "Original video manjka") _ensure_local(src, "upload") if not Path(src).exists(): raise HTTPException(400, "Original video ne obstaja niti lokalno niti v S3") # Briši stari output + analysis (ker bo regen) out_mp4 = OUTPUT_DIR / f"{job_id}.mp4" if out_mp4.exists(): out_mp4.unlink() _delete_from_s3(out_mp4.name, "output") for ext in ("srt", "ass"): p = OUTPUT_DIR / f"{job_id}.subtitles.{ext}" if p.exists(): p.unlink() _delete_from_s3(p.name, "output") # Briši tudi analysis.json — zaženemo svežo analizo analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json" if analysis_path.exists(): analysis_path.unlink() _delete_from_s3(analysis_path.name, "output") # Re-queue job — pomembno: NE postavi custom_clip=True (pustimo polno re-analizo) # whisper_provider override poskrbi, da bo tokrat drug STT updates = { "status": "queued", "current_step": f"V vrsti za retranscribe ({payload.provider})", "whisper_provider": payload.provider, "auto_upload_to_nextcloud": payload.auto_upload, "hidden_after_upload": False, "nextcloud_status": "retranscribing", "error": None, "chorus_error": None, "custom_clip": False, "retranscribe_count": (job.get("retranscribe_count", 0) or 0) + 1, } if payload.lang and payload.lang not in ("auto", ""): updates["lang"] = payload.lang update_job(job_id, **updates) return { "status": "queued", "job_id": job_id, "provider": payload.provider, "retranscribe_count": updates["retranscribe_count"], } # ──────────────────────────────────────────────────────────────── # Nextcloud upload (folxspeed/REELS/) # ──────────────────────────────────────────────────────────────── def _safe_filename_for_nextcloud(name: str) -> str: """Sanitize filename za Nextcloud — odstrani problematične znake.""" import re # Zamenjaj problematične znake z '_' name = re.sub(r'[\\/:*?"<>|]', '_', name) # Strip control chars name = ''.join(c for c in name if c.isprintable()) return name.strip()[:200] or "reel.mp4"