""" reels.biba.live — FastAPI backend. Endpoints: GET / — frontend HTML POST /api/upload — naloži video file POST /api/youtube — submit YouTube URL POST /api/process/{id} — start processing job GET /api/jobs — list vseh jobov GET /api/jobs/{id} — status job-a GET /api/stream/{id} — SSE progress stream GET /api/download/{id} — download finalni reel GET /api/preview/{id} — preview video stream DELETE /api/jobs/{id} — pobriši job + datoteke """ import asyncio import json import hashlib import hmac import os import secrets import shutil import subprocess import threading import time import urllib.parse import uuid from pathlib import Path from typing import Optional from fastapi import ( FastAPI, UploadFile, File, Form, HTTPException, Depends, BackgroundTasks, Request, Response as FastAPIResponse, status, Cookie ) from fastapi.responses import ( FileResponse, HTMLResponse, StreamingResponse, JSONResponse, Response, RedirectResponse ) from fastapi.staticfiles import StaticFiles from fastapi.security import HTTPBasic, HTTPBasicCredentials from pydantic import BaseModel # ──────────────────────────────────────────────────────────────── # Config # ──────────────────────────────────────────────────────────────── DATA_DIR = Path(os.environ.get("DATA_DIR", "/data")) UPLOAD_DIR = DATA_DIR / "uploads" OUTPUT_DIR = DATA_DIR / "outputs" JOBS_DIR = DATA_DIR / "jobs" QNET_DIR = DATA_DIR / "qnet" SCRIPTS_DIR = Path(__file__).parent.parent / "scripts" UPLOAD_DIR.mkdir(parents=True, exist_ok=True) OUTPUT_DIR.mkdir(parents=True, exist_ok=True) JOBS_DIR.mkdir(parents=True, exist_ok=True) QNET_DIR.mkdir(parents=True, exist_ok=True) # Qnet song match — povezava z MB player bazami os.environ.setdefault("QNET_LOOKUP_PATH", str(QNET_DIR / "songs_lookup.json")) from app import qnet_match # noqa: E402 # S3 storage mirror — uploads/outputs/jobs gredo tudi v s3://folxspeed/reels-app/ # Lokalna mapa ostane primary, S3 je replica/cache. 
from app import s3_storage  # noqa: E402

# Logical file kind -> top-level S3 folder. Kinds that are not listed here are
# used verbatim as the folder name.
_S3_FOLDERS = {"upload": "uploads", "output": "outputs", "job_meta": "jobs"}


def _persist_to_s3(local_path, kind: str) -> bool:
    """Mirror a local file to S3 after producing it (best-effort).

    Args:
        local_path: Path of the file on local disk.
        kind: 'upload' for uploads/, 'output' for outputs/, 'job_meta' for jobs/.

    Returns:
        Whatever ``s3_storage.upload_job_file`` returns, or ``False`` when S3
        is not enabled, the file is missing or empty, or an exception occurs.
        Never raises.
    """
    try:
        if not s3_storage.is_enabled():
            return False
        p = Path(local_path)
        if not p.exists() or p.stat().st_size == 0:
            return False
        return s3_storage.upload_job_file("", kind, p)
    except Exception as e:
        print(f"⚠️ S3 mirror failed for {local_path}: {e}", flush=True)
        return False


def _ensure_local(local_path, kind: str) -> bool:
    """Make sure a file is on disk; if it is missing, fetch it from S3.

    Args:
        local_path: Where the file is expected locally.
        kind: 'upload' or 'output' (mapped to the S3 folder name).

    Returns:
        ``True`` if a non-empty file already exists locally; ``False`` if it
        is missing and S3 is not enabled; otherwise the result of
        ``s3_storage.download``.
    """
    p = Path(local_path)
    if p.exists() and p.stat().st_size > 0:
        return True
    if not s3_storage.is_enabled():
        return False
    key = f"{_S3_FOLDERS.get(kind, kind)}/{p.name}"
    print(f"📥 Local missing, fetching from S3: {key}", flush=True)
    return s3_storage.download(key, p)


def _delete_from_s3(filename: str, kind: str) -> bool:
    """Delete an object from S3 (mirror of a local delete). Best-effort, never raises.

    Returns:
        The result of ``s3_storage.delete``, or ``False`` when S3 is not
        enabled or the call fails.
    """
    try:
        if not s3_storage.is_enabled():
            return False
        return s3_storage.delete(f"{_S3_FOLDERS.get(kind, kind)}/{filename}")
    except Exception as e:
        # Still best-effort, but leave a trace like _persist_to_s3 does instead
        # of hiding the failure completely.
        print(f"⚠️ S3 delete failed for {kind}/{filename}: {e}", flush=True)
        return False


def _ffmpeg_then_persist(cmd, out_path, kind: str = "output", timeout: int = 600):
    """Run ffmpeg in a background daemon thread, then mirror the result to S3.

    Intended as a fire-and-forget replacement for ``subprocess.Popen()`` when
    the output should also be synced to S3 afterwards.

    Args:
        cmd: ffmpeg argument list passed to ``subprocess.run``.
        out_path: File that ``cmd`` is expected to produce.
        kind: S3 kind passed to ``_persist_to_s3``.
        timeout: Seconds before the ffmpeg process is killed.

    Returns:
        None. Failures are only logged.
    """
    def runner():
        try:
            proc = subprocess.run(
                cmd,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=timeout,
            )
            if proc.returncode != 0:
                # A failed run may leave a truncated file behind; do not
                # replicate it to S3 as if it were a valid asset.
                print(
                    f"⚠️ ffmpeg exit {proc.returncode} for {out_path}; skipping S3 mirror",
                    flush=True,
                )
                return
            _persist_to_s3(out_path, kind)
        except Exception as e:
            print(f"⚠️ ffmpeg/persist failed for {out_path}: {e}", flush=True)

    threading.Thread(target=runner, daemon=True).start()


# Dedup DB — keeps track of tracks that were already processed/uploaded
DEDUP_DB = DATA_DIR / "processed.db"


def _normalize_filename(filename: str) -> str:
    """Normalize a filename into the key used for dedup comparison.

    'BRAJDE (Official Video).mp4' → 'brajde'
    'Brajde (HD).mxf' → 'brajde'

    NOTE(review): whitespace is collapsed *before* '-', '_' and '.' are turned
    into spaces, so 'a - b' yields 'a   b' (three spaces) while 'a_b' yields
    'a b'. This is left unchanged on purpose: the result is the primary key of
    rows already stored in the dedup DB, so changing it would stop existing
    records from matching.
    """
    import re
    name = Path(filename).stem.lower()
    # Remove common suffix words
    name = re.sub(r'\b(official|video|hd|4k|lyric|audio|music|mv|live|cover|version|remix)\b', '', name)
    # Remove parenthesised / bracketed content
    name = re.sub(r'\([^)]*\)', '', name)
    name = re.sub(r'\[[^\]]*\]', '', name)
    # Normalize whitespace
    name = re.sub(r'\s+', ' ', name).strip()
    # Replace common separators with a space
    name = re.sub(r'[-_.]+', ' ', name).strip()
    return name


def _dedup_init():
    """Create the SQLite table and index if they do not exist yet."""
    import sqlite3
    from contextlib import closing

    # closing() guarantees the connection is released even when a statement
    # raises (sqlite3's own context manager only handles the transaction).
    with closing(sqlite3.connect(str(DEDUP_DB))) as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS processed_videos (
                normalized_name TEXT NOT NULL,
                tv_station TEXT NOT NULL,
                filename_orig TEXT NOT NULL,
                job_id TEXT NOT NULL,
                nextcloud_url TEXT,
                file_size_mb REAL,
                uploaded_at REAL NOT NULL,
                PRIMARY KEY (normalized_name, tv_station)
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_norm ON processed_videos(normalized_name)")
        conn.commit()
def dedup_check(filename: str, tv_station: str) -> Optional[dict]:
    """Look up an already processed track.

    Args:
        filename: Original filename; normalized with ``_normalize_filename``.
        tv_station: Station the track was processed for.

    Returns:
        The matching ``processed_videos`` row as a dict, or ``None`` when
        there is no match or the filename normalizes to an empty string.
    """
    import sqlite3
    from contextlib import closing

    _dedup_init()
    norm = _normalize_filename(filename)
    if not norm:
        return None
    # closing() releases the connection even if the query raises.
    with closing(sqlite3.connect(str(DEDUP_DB))) as conn:
        conn.row_factory = sqlite3.Row
        row = conn.execute(
            "SELECT * FROM processed_videos WHERE normalized_name = ? AND tv_station = ?",
            (norm, tv_station),
        ).fetchone()
    if row:
        return dict(row)
    return None


def dedup_record(filename: str, tv_station: str, job_id: str,
                 nextcloud_url: Optional[str] = None,
                 file_size_mb: Optional[float] = None):
    """Record a successfully processed + uploaded track.

    An existing row with the same (normalized name, station) key is replaced.
    Does nothing when the filename normalizes to an empty string.
    """
    import sqlite3
    from contextlib import closing

    _dedup_init()
    norm = _normalize_filename(filename)
    if not norm:
        return
    with closing(sqlite3.connect(str(DEDUP_DB))) as conn:
        conn.execute("""
            INSERT OR REPLACE INTO processed_videos
            (normalized_name, tv_station, filename_orig, job_id, nextcloud_url, file_size_mb, uploaded_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (norm, tv_station, filename, job_id, nextcloud_url, file_size_mb, time.time()))
        conn.commit()
    print(f"📒 Dedup: zabeležen {norm} → {tv_station} (job {job_id})", flush=True)


def dedup_remove(filename: str, tv_station: str):
    """Delete a dedup record (e.g. when the user wants to re-process a track).

    Does nothing when the filename normalizes to an empty string.
    """
    import sqlite3
    from contextlib import closing

    _dedup_init()
    norm = _normalize_filename(filename)
    if not norm:
        return
    with closing(sqlite3.connect(str(DEDUP_DB))) as conn:
        conn.execute(
            "DELETE FROM processed_videos WHERE normalized_name = ? AND tv_station = ?",
            (norm, tv_station),
        )
        conn.commit()
AUTH_USER = os.environ.get("AUTH_USER", "sebastjan")
AUTH_PASS = os.environ.get("AUTH_PASS", "change-me-in-coolify-env")

# Google Sign-In (https://developers.google.com/identity/gsi/web)
GOOGLE_CLIENT_ID = os.environ.get("GOOGLE_CLIENT_ID", "").strip()
ALLOWED_EMAILS = [e.strip().lower() for e in os.environ.get("ALLOWED_EMAILS", "").split(",") if e.strip()]
# NOTE(security): when SESSION_SECRET is not set, the signing key is derived
# from AUTH_USER/AUTH_PASS. With the default AUTH_PASS above that key is
# predictable, so both variables should be set in every real deployment.
SESSION_SECRET = os.environ.get("SESSION_SECRET", "").strip() or hashlib.sha256(
    f"{AUTH_USER}|{AUTH_PASS}|reels-app-default-secret".encode()
).hexdigest()
SESSION_COOKIE_NAME = "reels_session"
SESSION_TTL_SECONDS = 30 * 24 * 3600  # 30 days

MAX_UPLOAD_MB = int(os.environ.get("MAX_UPLOAD_MB", "2000"))

# Nextcloud upload (folxspeed/REELS/)
NEXTCLOUD_URL = os.environ.get("NEXTCLOUD_URL", "https://nextcloud.folx.tv")
NEXTCLOUD_USER = os.environ.get("NEXTCLOUD_USER", "admin")
NEXTCLOUD_PASS = os.environ.get("NEXTCLOUD_PASS", "")
NEXTCLOUD_FOLDER = os.environ.get("NEXTCLOUD_FOLDER", "folxspeed/REELS")


# ────────────────────────────────────────────────────────────────
# Auth — Google Sign-In + basic auth fallback (for API/cron)
# ────────────────────────────────────────────────────────────────
security = HTTPBasic(auto_error=False)


def _email_allowed(email: str) -> bool:
    """Return True when the e-mail is on the ALLOWED_EMAILS whitelist."""
    if not email:
        return False
    return email.lower().strip() in ALLOWED_EMAILS


def _make_session_token(email: str) -> str:
    """Build a signed session token: base64url("email|expiry|hmac_sha256").

    The expiry is the current time plus SESSION_TTL_SECONDS; base64 padding
    is stripped from the result.
    """
    import base64
    expiry = int(time.time()) + SESSION_TTL_SECONDS
    payload = f"{email}|{expiry}"
    sig = hmac.new(SESSION_SECRET.encode(), payload.encode(), hashlib.sha256).hexdigest()
    full = f"{payload}|{sig}"
    return base64.urlsafe_b64encode(full.encode()).decode().rstrip("=")


def _verify_session_token(token: str) -> Optional[str]:
    """Return the e-mail stored in a session token, or None.

    None is returned for an empty token, a malformed token, a signature
    mismatch or an expired token. Never raises.
    """
    if not token:
        return None
    try:
        import base64
        # Restore the padding stripped by _make_session_token.
        pad = len(token) % 4
        if pad:
            token += "=" * (4 - pad)
        decoded = base64.urlsafe_b64decode(token.encode()).decode()
        # Split from the right: the expiry (integer) and the signature (hex)
        # never contain "|", but the e-mail local part legally may. A plain
        # split("|") rejected such validly signed tokens.
        parts = decoded.rsplit("|", 2)
        if len(parts) != 3:
            return None
        email, expiry, sig = parts
        expected = hmac.new(
            SESSION_SECRET.encode(), f"{email}|{expiry}".encode(), hashlib.sha256
        ).hexdigest()
        if not hmac.compare_digest(sig, expected):
            return None
        if int(expiry) < int(time.time()):
            return None
        return email
    except Exception:
        # Any decoding/parsing problem simply means "not a valid session".
        return None
def _verify_google_id_token(id_token: str) -> Optional[dict]:
    """Verify a Google ID token via the tokeninfo endpoint.

    Returns:
        The claims dict when the audience matches GOOGLE_CLIENT_ID, the issuer
        is Google, the token is not expired and the e-mail is verified;
        otherwise None (including on any network/parse error).
    """
    if not id_token or not GOOGLE_CLIENT_ID:
        return None
    try:
        import json as _json
        import urllib.request

        url = f"https://oauth2.googleapis.com/tokeninfo?id_token={urllib.parse.quote(id_token)}"
        with urllib.request.urlopen(url, timeout=10) as resp:
            data = _json.loads(resp.read().decode())
        if data.get("aud") != GOOGLE_CLIENT_ID:
            print(f"⚠️ Google token aud mismatch: {data.get('aud')!r}", flush=True)
            return None
        if data.get("iss") not in ("accounts.google.com", "https://accounts.google.com"):
            return None
        if int(data.get("exp", 0)) < int(time.time()):
            return None
        if data.get("email_verified") not in (True, "true"):
            return None
        return data
    except Exception as e:
        print(f"⚠️ Google token verify error: {e}", flush=True)
        return None


def check_auth(
    request: Request,
    creds: Optional[HTTPBasicCredentials] = Depends(security),
):
    """Auth dependency.

    Accepts (in order):
      1) A valid session cookie (set by the Google Sign-In flow) whose e-mail
         is still on the whitelist.
      2) HTTP Basic with AUTH_USER/AUTH_PASS (legacy + cron + scripts).

    Returns:
        The authenticated e-mail or Basic username.

    Raises:
        HTTPException: 303 to /login for browser GETs when Google Sign-In is
            configured, otherwise 401 with a Basic challenge.
    """
    # 1) Session cookie
    token = request.cookies.get(SESSION_COOKIE_NAME)
    if token:
        email = _verify_session_token(token)
        if email and _email_allowed(email):
            return email

    # 2) Basic auth fallback
    if creds and creds.username and creds.password:
        # compare_digest() only accepts ASCII-only str; a client sending
        # non-ASCII credentials made it raise TypeError (HTTP 500 instead of
        # 401). Comparing UTF-8 bytes works for any input.
        correct_user = secrets.compare_digest(
            creds.username.encode("utf-8"), AUTH_USER.encode("utf-8")
        )
        correct_pass = secrets.compare_digest(
            creds.password.encode("utf-8"), AUTH_PASS.encode("utf-8")
        )
        # Both comparisons are always evaluated before combining the result.
        if correct_user and correct_pass:
            return creds.username

    # 3) Failure — redirect browsers, 401 for API clients
    accept = request.headers.get("accept", "")
    is_browser_get = request.method == "GET" and "text/html" in accept
    if is_browser_get and GOOGLE_CLIENT_ID:
        # A dependency cannot return a redirect response, so the redirect is
        # expressed as an HTTPException carrying a Location header.
        raise HTTPException(
            status_code=status.HTTP_303_SEE_OTHER,
            headers={"Location": "/login"},
        )
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication required",
        headers={"WWW-Authenticate": "Basic"},
    )


# ────────────────────────────────────────────────────────────────
# Artist + title parsing from filename / YouTube title
# ────────────────────────────────────────────────────────────────
import re
_NOISE_PATTERNS = [
    # Variations of "official": "Off" + any extra letters + "icial", followed
    # by an optional Music/HD/4K qualifier and "Video"/"Audio".
    r"\(Off[a-z]*icial\s+(?:Music\s+|HD\s+|4K\s+)?(?:Video|Audio)\s*\)",
    r"\(Off[a-z]*icia[lk]\s+(?:Music\s+|HD\s+|4K\s+)?(?:Video|Audio)\)",
    r"\bOff[a-z]*icial\s+(?:Music\s+|HD\s+|4K\s+)?(?:Video|Audio)\b",
    # German variations
    r"\(Offizielles?\s+(?:Musik)?[Vv]ideo\)",
    r"\bOffizielles?\s+(?:Musik)?[Vv]ideo\b",
    # Lyric videos
    r"\(Lyric[s]?\s+Video\)",
    r"\bLyric[s]?\s+Video\b",
    # Audio quality / version markers
    r"\(Audio\)",
    r"\(HD\)",
    r"\(HQ\)",
    r"\(4K\)",
    r"\(8K\)",
    r"\(1080p?\)",
    r"\(Live\)",
    r"\(Remix\)",
    r"\(Cover\)",
    r"\(Acoustic\)",
    r"\(Remastered\)",
    r"\(Remaster(?:ed)?\s*\d{0,4}\)",
    r"\(Extended(?:\s+Mix)?\)",
    r"\(Radio(?:\s+Edit)?\)",
    r"\(Clean(?:\s+Version)?\)",
    r"\(Explicit\)",
    # Square brackets variations
    r"\[Official[^\]]*\]",
    r"\[Music[^\]]*\]",
    r"\[Audio[^\]]*\]",
    r"\[HD\]",
    r"\[HQ\]",
    r"\[4K\]",
    r"\[Lyric[s]?[^\]]*\]",
    # Bare words (without brackets)
    r"\boriginal\s+(?:video|audio)\b",
    r"\bMV\b",
    r"\b(?:4K|HD|HQ|8K|1080p|720p|4k|hd|hq)\s*$",  # trailing 4K/HD without brackets
    # Trailing year in parens (e.g. "(2024)")
    r"\(\d{4}\)\s*$",
    # Trailing 2-4 digit number at the very end (probably a year: "23",
    # "2023", "33"). Only matches when the number is the LAST thing in the
    # string and preceded by whitespace: it does not touch '33 točk' but does
    # strip the number from 'Naslov 33'.
    r"\s+\d{2,4}\s*$",
    # Empty / dummy parentheses: "( )", "( - )", "(-)", "(.)"
    r"\(\s*[-–—._]*\s*\)",
    # Catch-all: parentheses containing "video"/"audio"/"version"/"mix"/...
    # (broad match — parentheses containing these words are most likely noise)
    r"\([^)]*\b(?:video|audio|version|mix|edit|remix|cover|live|hd|hq|4k|8k|remaster(?:ed)?|extended|clean|explicit|radio|lyric[s]?|official|offizielles?|musik)\b[^)]*\)",
    # Catch-all: square brackets containing noise words
    r"\[[^\]]*\b(?:video|audio|version|mix|edit|remix|cover|live|hd|hq|4k|official|musik)\b[^\]]*\]",
    # Producer/feat. credits in parentheses: "(prod. by X)", "(feat. Y)", "(ft. Z)"
    r"\(\s*(?:prod\.?(?:uced)?\s+by|feat\.?(?:uring)?|ft\.?)\s+[^)]+\)",
]

# Compiled once at import time; patterns are applied in list order.
_NOISE_REGEXES = [re.compile(p, re.IGNORECASE) for p in _NOISE_PATTERNS]

# Leading/trailing separators and punctuation left over after noise removal.
_EDGE_PUNCT_RE = re.compile(r'^[\s\-–—|.:_]+|[\s\-–—|.:_]+$')

# Only these suffixes are treated as a real file extension.
_COMMON_EXT = frozenset({
    ".mp4", ".mp3", ".m4a", ".webm", ".mkv", ".avi", ".mov", ".wav",
    ".flac", ".aac", ".opus", ".ogg", ".wmv", ".mxf",
})

# Artist/title separators, tried in this order.
_ARTIST_TITLE_SEPARATORS = (" - ", " – ", " — ", " | ", " : ")


def _strip_noise(text):
    """Remove every noise pattern from ``text`` and collapse whitespace."""
    for rx in _NOISE_REGEXES:
        text = rx.sub("", text)
    return re.sub(r"\s+", " ", text).strip()


def parse_artist_title(filename_or_title):
    """Extract (artist, title) from a file name or a YouTube title.

    Supports the common shapes:
      - "Artist - Title.mp4"
      - "Artist - Title (Official Music Video).mp4"
      - "Artist – Title" (en/em dash)
      - "Artist | Title"

    Returns:
        ``(artist, title)``, or ``(None, None)`` when no separator yields a
        non-empty artist (max 80 chars) and title (max 100 chars).
    """
    if not filename_or_title:
        return (None, None)

    # Strip the suffix only when it is a known media extension. Path(...).stem
    # would turn the YouTube title "ANS.NAVEZA - SREČA OPOTEČA" into "ANS".
    name = filename_or_title
    last_dot = name.rfind(".")
    if last_dot > 0 and name[last_dot:].lower() in _COMMON_EXT:
        name = name[:last_dot]

    name = _strip_noise(name)

    for sep in _ARTIST_TITLE_SEPARATORS:
        if sep in name:
            artist, title = name.split(sep, 1)
            # Strip leading/trailing punctuation
            artist = _EDGE_PUNCT_RE.sub("", artist.strip())
            title = _EDGE_PUNCT_RE.sub("", title.strip())
            if artist and title and len(artist) <= 80 and len(title) <= 100:
                return (artist, title)

    return (None, None)


def safe_filename(s, max_len=80):
    """Build a filesystem-safe name (drops characters that would break the FS).

    Removes ``<>:"/\\|?*`` and control characters, collapses whitespace and
    truncates to ``max_len``. Returns "" for empty input.
    """
    if not s:
        return ""
    s = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '', s)
    s = re.sub(r'\s+', ' ', s).strip()
    # Strip again after truncating: the cut may land right after a space,
    # which would otherwise leave a trailing blank in the file name.
    return s[:max_len].strip()


def clean_noise(s):
    """Remove 'noise' (Official Video etc.) from text, including already parsed values.

    Falsy input (None, "") is returned unchanged.
    """
    if not s:
        return s
    return _EDGE_PUNCT_RE.sub("", _strip_noise(s))
def build_download_filename(job):
    """Build the download file name from job metadata.

    Uses the stored parsed artist/title (cleaned of noise) first, then falls
    back to parsing ``filename`` / ``youtube_title``, and finally to the job
    id.
    """
    # First try the parsed values stored on the job
    artist = clean_noise(job.get("parsed_artist"))
    title = clean_noise(job.get("parsed_title"))

    # Fallback: parse from the file name / YouTube title
    if not artist or not title:
        source = job.get("filename") or job.get("youtube_title") or ""
        parsed_artist, parsed_title = parse_artist_title(source)
        artist = artist or parsed_artist
        title = title or parsed_title

    if artist and title:
        return f"{safe_filename(artist)} - {safe_filename(title)} - REEL.mp4"
    if title:
        return f"{safe_filename(title)} - REEL.mp4"
    # Last resort: job id (this should already be prevented at upload time)
    return f"reel_{job['id']}.mp4"


# ────────────────────────────────────────────────────────────────
# Job state (filesystem-based, persistent across restarts)
# ────────────────────────────────────────────────────────────────
def job_path(job_id):
    """Return the path of the JSON state file for ``job_id``."""
    return JOBS_DIR / f"{job_id}.json"


def load_job(job_id):
    """Load a job dict from disk, or return None when the job file is missing.

    Raises:
        json.JSONDecodeError: if the file exists but is not valid JSON.
    """
    try:
        # EAFP: avoids the window between an exists() check and the read.
        return json.loads(job_path(job_id).read_text())
    except FileNotFoundError:
        return None


def save_job(job):
    """Write the job dict to its JSON file and mirror it to S3.

    The file is written to a temporary sibling and moved into place with
    ``os.replace`` so that a concurrent ``load_job`` never observes a
    half-written file.

    NOTE(review): reads and writes of job files rely on the locale default
    text encoding; this is kept as-is so all readers of these files stay
    consistent with each other.
    """
    p = job_path(job["id"])
    payload = json.dumps(job, ensure_ascii=False, indent=2)
    # Unique name so parallel writers never share a temp file; the ".tmp"
    # suffix keeps it out of "*.json" globs.
    tmp = p.with_name(f"{p.name}.{uuid.uuid4().hex}.tmp")
    try:
        tmp.write_text(payload)
        os.replace(tmp, p)
    except Exception:
        # Do not leave stray temp files behind on failure.
        try:
            tmp.unlink()
        except OSError:
            pass
        raise
    _persist_to_s3(p, "job_meta")


def update_job(job_id, **kwargs):
    """Merge ``kwargs`` into a stored job, stamp ``updated_at`` and save it.

    Returns:
        The updated job dict, or None when the job does not exist.
    """
    job = load_job(job_id)
    if not job:
        return None
    job.update(kwargs)
    job["updated_at"] = time.time()
    save_job(job)
    return job


def _clean_job_titles(job):
    """Strip 'Official Video' etc. from parsed_title/parsed_artist on the fly.

    The dict is modified in place and returned; nothing is persisted. A field
    is only replaced when cleaning leaves a non-empty, different value.
    """
    if not job:
        return job
    for key in ("parsed_title", "parsed_artist"):
        current = job.get(key)
        if current:
            cleaned = clean_noise(current)
            if cleaned and cleaned != current:
                job[key] = cleaned
    return job
def list_jobs():
    """Return all stored jobs, sorted by file name in reverse order.

    Parsed titles are cleaned on the fly. Files that cannot be read or parsed
    are skipped (and logged) rather than failing the whole listing.
    """
    out = []
    for f in sorted(JOBS_DIR.glob("*.json"), reverse=True):
        try:
            out.append(_clean_job_titles(json.loads(f.read_text())))
        except Exception as e:
            # Keep listing best-effort, but do not hide the broken file.
            print(f"⚠️ Skipping unreadable job file {f.name}: {e}", flush=True)
    return out


def generate_srt_from_segments(segments, clip_start, clip_end, output_path):
    """Write an SRT file for the parts of a transcript inside [clip_start, clip_end].

    Timestamps are re-mapped to be 0-based (as in the trimmed video). Segments
    longer than 2.5 s are split into shorter cues for reels-style pacing, and
    all text is upper-cased.

    Args:
        segments: Iterable of dicts with "start", "end", "text" and optionally
            "words" (dicts with "start", "end", "text").
        clip_start: Clip start in source-video seconds.
        clip_end: Clip end in source-video seconds.
        output_path: Where the SRT file is written (UTF-8).

    Returns:
        ``output_path``.

    NOTE(review): the original indentation of this function was lost; the
    minimum-duration check below is applied to every segment, regardless of
    which trimming branch ran — confirm against version history.
    """
    MAX_CHUNK_DURATION = 2.5

    def fmt_ts(s):
        # Work in integer milliseconds so rounding carries over correctly.
        # Formatting the seconds with "%06.3f" turned 59.9996 into the
        # invalid timestamp "00:00:60,000".
        total_ms = max(0, int(round(s * 1000)))
        h, rem = divmod(total_ms, 3600000)
        m, rem = divmod(rem, 60000)
        sec, ms = divmod(rem, 1000)
        return f"{h:02d}:{m:02d}:{sec:02d},{ms:03d}"

    lines = []
    idx = 1

    for seg in segments:
        s_start = float(seg["start"])
        s_end = float(seg["end"])
        text = str(seg["text"]).strip()
        words = seg.get("words", []) or []

        # Keep only segments overlapping the clip range
        if s_end <= clip_start or s_start >= clip_end:
            continue

        # ── HALLUCINATION FILTER ──
        # STT engines occasionally return a very long segment containing only
        # one or two words during instrumental passages. Rule: a segment
        # longer than 15 s with fewer than 5 words is dropped.
        seg_dur = s_end - s_start
        word_count = len(words) if words else len(text.split())
        if seg_dur > 15 and word_count < 5:
            print(f"[SRT] Preskočil halucinacijski segment [{s_start:.1f}-{s_end:.1f}] "
                  f"({seg_dur:.1f}s, {word_count} besed): {text[:50]!r}", flush=True)
            continue

        # Collect the words that overlap the clip (only when word-level
        # timestamps exist). Used both for trimming and for chunk boundaries.
        words_in_clip = None
        if words:
            words_in_clip = []
            for w in words:
                w_start = float(w.get("start", 0))
                w_end = float(w.get("end", 0))
                w_text = w.get("text", "").strip()
                if not w_text:
                    continue
                if w_end > clip_start and w_start < clip_end:
                    words_in_clip.append({
                        "start": max(w_start, clip_start),
                        "end": min(w_end, clip_end),
                        "text": w_text,
                    })

        if words_in_clip and (s_start < clip_start or s_end > clip_end):
            # The segment sticks out of the clip and word timings are
            # available: rebuild it from the words that fall inside the clip.
            # NOTE(review): the original had an unreachable
            # "if not words_in_clip: continue" here. A straddling segment
            # whose words all fall outside the clip therefore takes the else
            # branch and keeps its full text — confirm that is intended.
            text = " ".join(w["text"] for w in words_in_clip)
            s_start = words_in_clip[0]["start"]
            s_end = words_in_clip[-1]["end"]
        else:
            # Clamp the segment to the clip range
            s_start = max(s_start, clip_start)
            s_end = min(s_end, clip_end)

        if s_end - s_start < 0.2:
            continue

        # Re-map to 0-based time
        rel_start = s_start - clip_start
        rel_end = s_end - clip_start

        if not text:
            continue

        text_upper = text.upper()

        # Split into chunks when the cue is too long
        duration = rel_end - rel_start
        if duration <= MAX_CHUNK_DURATION:
            lines.append(f"{idx}\n{fmt_ts(rel_start)} --> {fmt_ts(rel_end)}\n{text_upper}\n")
            idx += 1
        else:
            # ── WORD-LEVEL CHUNKING ──
            # With word-level timestamps, use the ACTUAL word times as chunk
            # boundaries (not equal time slices — singing is not evenly paced).
            if words_in_clip and len(words_in_clip) >= 2:
                # Rules:
                #   1. A chunk aims to last at most MAX_CHUNK_DURATION.
                #   2. Do not start a new chunk if the current one would stay
                #      short (<3 words or <12 characters).
                #   3. Do not leave a tiny trailing chunk (<2 words or <5
                #      characters) — merge it into the previous chunk.
                MIN_CHUNK_WORDS = 3
                MIN_CHUNK_CHARS = 12
                chunks = []
                current_chunk = [words_in_clip[0]]
                for w in words_in_clip[1:]:
                    chunk_start_time = current_chunk[0]["start"]
                    chunk_dur_so_far = w["end"] - chunk_start_time
                    if chunk_dur_so_far > MAX_CHUNK_DURATION:
                        current_text = " ".join(c["text"] for c in current_chunk)
                        too_short = (len(current_chunk) < MIN_CHUNK_WORDS
                                     or len(current_text) < MIN_CHUNK_CHARS)
                        if too_short:
                            # Prefer a slightly long chunk over a tiny orphan
                            current_chunk.append(w)
                        else:
                            chunks.append(current_chunk)
                            current_chunk = [w]
                    else:
                        current_chunk.append(w)
                if current_chunk:
                    last_text = " ".join(c["text"] for c in current_chunk)
                    if (chunks and (len(current_chunk) < 2 or len(last_text) < 5)):
                        chunks[-1].extend(current_chunk)
                    else:
                        chunks.append(current_chunk)

                # Emit cues using the real word timestamps
                for chunk in chunks:
                    cs = chunk[0]["start"] - clip_start
                    ce = chunk[-1]["end"] - clip_start
                    chunk_text = " ".join(w["text"] for w in chunk).upper().strip()
                    if not chunk_text:
                        continue
                    lines.append(f"{idx}\n{fmt_ts(cs)} --> {fmt_ts(ce)}\n{chunk_text}\n")
                    idx += 1
            else:
                # Fallback without word-level timestamps: equal time slices
                n_parts = int(duration / MAX_CHUNK_DURATION) + 1
                words_split = text_upper.split()
                words_per_part = max(1, len(words_split) // n_parts)
                chunk_dur = duration / n_parts
                for i in range(n_parts):
                    cs = rel_start + i * chunk_dur
                    ce = rel_start + (i + 1) * chunk_dur
                    wstart = i * words_per_part
                    wend = (i + 1) * words_per_part if i < n_parts - 1 else len(words_split)
                    # When the words run out, the slice shows the full text
                    chunk_text = " ".join(words_split[wstart:wend]) if wstart < len(words_split) else text_upper
                    if not chunk_text.strip():
                        chunk_text = text_upper
                    lines.append(f"{idx}\n{fmt_ts(cs)} --> {fmt_ts(ce)}\n{chunk_text.strip()}\n")
                    idx += 1

    with open(output_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
    return output_path
# ────────────────────────────────────────────────────────────────
# Pipeline runner (background task)
# ────────────────────────────────────────────────────────────────
def run_subprocess_logged(cmd, job_id, step_name):
    """Run a pipeline step as a subprocess and record its outcome on the job.

    The job is first marked as "processing" with ``step_name`` as the current
    step. On a non-zero exit the job is marked "failed" with the last 800
    characters of stderr+stdout; on success the last 500 characters of stderr
    (if any) are stored as ``last_step_log``.

    Returns:
        True when the command exited with code 0, otherwise False.
    """
    update_job(job_id, current_step=step_name, status="processing")
    result = subprocess.run(cmd, capture_output=True, text=True)

    if result.returncode == 0:
        # Keep the tail of stderr for diagnostics even on success
        stderr_text = result.stderr
        if stderr_text and stderr_text.strip():
            update_job(job_id, last_step_log=stderr_text[-500:].strip())
        return True

    # Combine stderr + stdout for diagnostics
    combined = (result.stderr or "") + "\n" + (result.stdout or "")
    update_job(
        job_id,
        status="failed",
        error=f"{step_name}: {combined[-800:].strip()}",
    )
    return False


def _precache_edit_assets(job_id: str, src_path: str):
    """Generate the low-quality source video and waveform PNG for the Edit modal.

    Both ffmpeg runs are started in the background via
    ``_ffmpeg_then_persist`` and do not block the pipeline. An asset is only
    regenerated when its file is missing or smaller than a minimum size.
    Does nothing when the source file does not exist.
    """
    source = Path(src_path)
    if not source.exists():
        return

    def _needs_build(path, min_bytes):
        # Missing -> build; undersized -> remove the leftover and build.
        if path.exists():
            if path.stat().st_size >= min_bytes:
                return False
            path.unlink()
        return True

    # Low-quality source (480p) — for fast scrubbing
    low_q = OUTPUT_DIR / f"{job_id}_source_low.mp4"
    if _needs_build(low_q, 1024):
        low_cmd = [
            "ffmpeg", "-y",
            "-i", str(source),
            "-vf", "scale=854:480:force_original_aspect_ratio=decrease,scale=trunc(iw/2)*2:trunc(ih/2)*2",
            "-c:v", "libx264",
            "-preset", "veryfast",
            "-crf", "28",
            "-c:a", "aac",
            "-b:a", "96k",
            "-movflags", "+faststart",
            "-loglevel", "error",
            str(low_q),
        ]
        _ffmpeg_then_persist(low_cmd, low_q, kind="output")
        print(f"📦 Pre-cache low-q source za {job_id} (background → S3)", flush=True)

    # Waveform PNG (2400x72 — wide enough for zooming)
    waveform = OUTPUT_DIR / f"{job_id}_waveform_2400x72.png"
    if _needs_build(waveform, 100):
        wave_cmd = [
            "ffmpeg", "-y",
            "-i", str(source),
            "-filter_complex",
            "[0:a]aformat=channel_layouts=mono,showwavespic=s=2400x72:colors=#ff6b6b:scale=lin:draw=full[wave]",
            "-map", "[wave]",
            "-frames:v", "1",
            "-loglevel", "error",
            str(waveform),
        ]
        _ffmpeg_then_persist(wave_cmd, waveform, kind="output")
        print(f"📦 Pre-cache waveform za {job_id} (background → S3)", flush=True)
""" src = Path(src_path) if not src.exists(): return # Low-q source (480p) — za hitro scrubbanje low_path = OUTPUT_DIR / f"{job_id}_source_low.mp4" if not low_path.exists() or low_path.stat().st_size < 1024: if low_path.exists(): low_path.unlink() cmd = [ "ffmpeg", "-y", "-i", str(src), "-vf", "scale=854:480:force_original_aspect_ratio=decrease,scale=trunc(iw/2)*2:trunc(ih/2)*2", "-c:v", "libx264", "-preset", "veryfast", "-crf", "28", "-c:a", "aac", "-b:a", "96k", "-movflags", "+faststart", "-loglevel", "error", str(low_path), ] _ffmpeg_then_persist(cmd, low_path, kind="output") print(f"📦 Pre-cache low-q source za {job_id} (background → S3)", flush=True) # Waveform PNG (2400x72 — za zoom) wave_path = OUTPUT_DIR / f"{job_id}_waveform_2400x72.png" if not wave_path.exists() or wave_path.stat().st_size < 100: if wave_path.exists(): wave_path.unlink() cmd = [ "ffmpeg", "-y", "-i", str(src), "-filter_complex", "[0:a]aformat=channel_layouts=mono,showwavespic=s=2400x72:colors=#ff6b6b:scale=lin:draw=full[wave]", "-map", "[wave]", "-frames:v", "1", "-loglevel", "error", str(wave_path), ] _ffmpeg_then_persist(cmd, wave_path, kind="output") print(f"📦 Pre-cache waveform za {job_id} (background → S3)", flush=True) def process_job(job_id): """Glavni pipeline: download (če YT) → find_chorus (če auto) → reframe → subs.""" job = load_job(job_id) if not job: return try: # ── 1. 
Source preparation ───────────────────────────── if job["source_type"] == "youtube": update_job(job_id, status="downloading", current_step="YouTube download") input_path = UPLOAD_DIR / f"{job_id}_yt.mp4" cmd = [ "python3", str(SCRIPTS_DIR / "yt_download.py"), job["youtube_url"], str(input_path), ] if not run_subprocess_logged(cmd, job_id, "YouTube download"): return update_job(job_id, input_path=str(input_path)) # S3 mirror — original video + info.json _persist_to_s3(input_path, "upload") info_json = input_path.with_suffix(".info.json") if info_json.exists(): _persist_to_s3(info_json, "upload") # Probaj dobiti YT metadata (če še ni iz submit-a) — title, uploader, id, ... # Single video submit ali playlist resolve že nastavi metadata, ampak # včasih (npr. če je submit fetch failed) je še manjka. need_metadata_fetch = not job.get("youtube_title") or not job.get("youtube_uploader") if need_metadata_fetch: try: info_cmd = [ "python3", str(SCRIPTS_DIR / "yt_download.py"), job["youtube_url"], "/dev/null", "--info-only", ] proc = subprocess.run(info_cmd, capture_output=True, text=True, timeout=30) if proc.returncode == 0 and proc.stdout: info = json.loads(proc.stdout) yt_title = info.get("title", "") or "" updates = {} if yt_title: updates["youtube_title"] = yt_title if info.get("id"): updates["youtube_id"] = info["id"] if info.get("uploader") or info.get("channel"): updates["youtube_uploader"] = info.get("uploader") or info.get("channel") or "" if info.get("duration") is not None: updates["youtube_duration"] = info["duration"] if info.get("thumbnail"): updates["youtube_thumbnail"] = info["thumbnail"] if info.get("description"): updates["youtube_description"] = info["description"][:2000] if info.get("upload_date"): updates["youtube_upload_date"] = info["upload_date"] if info.get("webpage_url"): updates["youtube_webpage_url"] = info["webpage_url"] # Qnet match + parser samo če še nimamo clean name if yt_title and not job.get("has_clean_name"): qm = 
qnet_match.match_filename(yt_title) if qm["matched"] and qm["confidence"] >= 0.85: updates["parsed_artist"] = qm["artist"] updates["parsed_title"] = qm["title"] updates["has_clean_name"] = True updates["qnet_match"] = { "method": qm["method"], "confidence": qm["confidence"], "matched_file": qm["file"], "matched_station": qm["station"], } updates["tv_station"] = qm["station"] else: a, t = parse_artist_title(yt_title) if a: updates["parsed_artist"] = a if t: updates["parsed_title"] = t updates["has_clean_name"] = bool(a and t) if updates: update_job(job_id, **updates) # Reload job for downstream use job = load_job(job_id) except Exception as e: print(f"⚠️ Cannot fetch YT metadata: {e}", flush=True) else: input_path = Path(job["input_path"]) # ── 1b. Music recognition (ACRCloud) — če nimamo artist+title ───── # Tudi za YouTube jobs lahko naslov ni razviden (npr. iz playliste, "Track 5") if not (job.get("parsed_artist") and job.get("parsed_title")): update_job(job_id, current_step="Avto-prepoznavam pesem (ACRCloud)") try: acr_cmd = [ "python3", str(SCRIPTS_DIR / "acr_recognize.py"), str(input_path), ] proc = subprocess.run(acr_cmd, capture_output=True, text=True, timeout=120) if proc.returncode == 0 and proc.stdout: data = json.loads(proc.stdout) a, t = data.get("artist"), data.get("title") if a and t: update_job( job_id, parsed_artist=a, parsed_title=t, has_clean_name=True, recognized_via="acrcloud", ) job = load_job(job_id) print(f"✅ ACR prepoznal: {a} - {t}", flush=True) else: print(f"⚠️ ACR ni prepoznal pesmi", flush=True) else: print(f"⚠️ ACR exit {proc.returncode}: {proc.stderr[:200]}", flush=True) except Exception as e: print(f"⚠️ ACR error: {e}", flush=True) # ── 2. 
Smart analysis (če auto_chorus) ────────────────────────── # ÄŒe je custom_clip=True (user-edit mode), preskoÄi analizo # in samo posodobi start/duration iz obstojeÄega clip_range v analysis.json if job.get("custom_clip"): update_job(job_id, current_step="User-edit recut (preskočim analizo)") analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json" srt_from_claude = None if analysis_path.exists(): try: with open(analysis_path, "r", encoding="utf-8") as f: analysis = json.load(f) cr = analysis["clip_range"] fade = analysis.get("fade", {"fade_in": 0.05, "fade_out": 0.5}) # Generiraj nov SRT iz obstoječega transkripta na novem range-u if analysis.get("transcript", {}).get("segments") and not job.get("no_subs"): srt_path_out = OUTPUT_DIR / f"{job_id}.subtitles.srt" try: # Če imamo custom_subtitle_segments (user popravil napise) segs_to_use = analysis.get("custom_subtitle_segments") or analysis["transcript"]["segments"] generate_srt_from_segments( segs_to_use, cr["start"], cr["end"], srt_path_out, ) srt_from_claude = str(srt_path_out) except Exception as e: print(f"⚠️ Recut SRT generation failed: {e}", flush=True) update_job( job_id, start=cr["start"], duration=cr["duration"], fade_in=fade.get("fade_in", 0.05), fade_out=fade.get("fade_out", 0.5), claude_srt_path=srt_from_claude, ) job = load_job(job_id) except Exception as e: update_job(job_id, status="failed", error=f"Recut analysis read: {e}") return else: update_job(job_id, status="failed", error="Analysis manjka za recut") return elif job.get("auto_chorus"): update_job(job_id, current_step="Analiza pesmi (transkript + energija)") analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json" cmd = [ "python3", str(SCRIPTS_DIR / "analyze.py"), str(input_path), "--target-duration", str(job.get("duration", 30)), "--max-duration", str(job.get("max_duration", 45)), "--min-duration", str(job.get("min_duration", 20)), "--output", str(analysis_path), ] if job.get("include_prebuild"): cmd += ["--include-prebuild"] # LLM provider 
(claude/gemini/auto) if job.get("llm_provider"): cmd += ["--llm-provider", job["llm_provider"]] if job.get("llm_model"): cmd += ["--llm-model", job["llm_model"]] # Filename hint za Claude/Scribe — preferiraj parsed artist+title (čistejše) if job.get("parsed_artist") and job.get("parsed_title"): fn_hint = f"{job['parsed_artist']} - {job['parsed_title']}" cmd += ["--filename-hint", fn_hint] elif job.get("filename"): fn_hint = Path(job["filename"]).stem cmd += ["--filename-hint", fn_hint] # STT provider (elevenlabs = Scribe, local = faster-whisper, auto = preferiraj Scribe) # Per-station default routing: # FOLX DE / ZWEI (nemške) → Scribe (boljši za nemščino, brez "Mrs. Sadie" halucinacij) # FOLX SLO / ONE / ADRIA → Soniox primary (auto = soniox_chain) # User lahko v UI override-a (whisper_provider polje). station = (job.get("tv_station") or "").upper() DE_STATIONS = ("FOLX DE", "ZWEI", "ZWEI MUSIC TV", "FOLX MUSIC TV") user_provider = job.get("whisper_provider") if user_provider and user_provider not in ("auto", ""): # User explicitly chose a provider — respect it cmd += ["--whisper-provider", user_provider] elif any(s in station for s in DE_STATIONS): # Auto-route DE → Scribe cmd += ["--whisper-provider", "elevenlabs"] print(f"🇩🇪 Auto-routing STT to Scribe za {job.get('tv_station')}", flush=True) elif user_provider: # auto → analyze.py si izbere (soniox_chain default) cmd += ["--whisper-provider", user_provider] # lang: če None ali 'auto', pusti analyze.py auto-detect if job.get("lang") and job["lang"] not in ("auto", ""): cmd += ["--lang", job["lang"]] cmd += ["--model", job.get("whisper_model", "large-v3")] proc = subprocess.run(cmd, capture_output=True, text=True) srt_from_claude = None # Pot do SRT iz Claude-popravljenega transcript-a if proc.returncode == 0 and analysis_path.exists(): try: with open(analysis_path, "r", encoding="utf-8") as f: analysis = json.load(f) cr = analysis["clip_range"] fade = analysis["fade"] # Generiraj SRT iz transcript-a TRIM-ANEGA 
na clip_range # (Claude je morda popravil besedilo — uporabi popravljeno) if analysis.get("transcript", {}).get("segments"): srt_path_out = OUTPUT_DIR / f"{job_id}.subtitles.srt" try: generate_srt_from_segments( analysis["transcript"]["segments"], cr["start"], cr["end"], srt_path_out, ) srt_from_claude = str(srt_path_out) llm_src = cr.get("source", "LLM") print(f"📝 Generated SRT from {llm_src} transcript: {srt_path_out}") except Exception as e: print(f"⚠️ SRT generation failed: {e}") update_job( job_id, analysis_summary={ "language": analysis["language"], "language_probability": analysis["language_probability"], "instrumental": analysis["instrumental"], "clip_range": cr, "fade": fade, "chorus_preview": analysis["chorus"]["best"]["text_preview"] if analysis.get("chorus") and analysis["chorus"].get("best") else None, "video_duration": analysis.get("video_duration"), "candidates": analysis["chorus"].get("all_candidates", [])[:5] if analysis.get("chorus") else [], "claude_corrected_text": analysis.get("claude_corrected_text", False), }, # Cel transkript shranimo za UI prikaz full_transcript=[ {"start": s["start"], "end": s["end"], "text": s["text"]} for s in analysis.get("transcript", {}).get("segments", []) ], start=cr["start"], duration=cr["duration"], fade_in=fade["fade_in"], fade_out=fade["fade_out"], detected_language=analysis["language"], is_instrumental=analysis["instrumental"], claude_srt_path=srt_from_claude, ) # Auto-disable subs za instrumental if analysis["instrumental"] and not job.get("no_subs"): update_job(job_id, no_subs=True, auto_disabled_subs=True) # Reload local dict job = load_job(job_id) except (json.JSONDecodeError, KeyError) as e: update_job(job_id, chorus_error=f"Analysis parse: {e}") else: update_job(job_id, chorus_error=(proc.stderr or "")[-500:]) # ── 3. 
Reframe + subtitles (clip.py orchestrator) ───── output_path = OUTPUT_DIR / f"{job_id}.mp4" update_job(job_id, current_step="Reframe + subtitles") cmd = [ "python3", str(SCRIPTS_DIR / "clip.py"), str(input_path), str(output_path), "--mode", job.get("mode", "track"), "--quality", job.get("quality", "medium"), "--style", job.get("subtitle_style", "reels"), ] if job.get("start") is not None: cmd += ["--start", str(job["start"])] if job.get("duration") is not None: cmd += ["--duration", str(job["duration"])] if job.get("fade_in", 0) > 0: cmd += ["--fade-in", str(job["fade_in"])] if job.get("fade_out", 0) > 0: cmd += ["--fade-out", str(job["fade_out"])] # SRT iz Claude (boljše besedilo) — preda direktno v subtitle.py if job.get("claude_srt_path") and Path(job["claude_srt_path"]).exists() and not job.get("no_subs"): cmd += ["--srt", job["claude_srt_path"]] # lang: prefer detected_language če auto chosen_lang = job.get("lang") if chosen_lang in (None, "auto", ""): chosen_lang = job.get("detected_language") if chosen_lang: cmd += ["--lang", chosen_lang] if job.get("no_subs"): cmd += ["--no-subs"] cmd += ["--model", job.get("whisper_model", "large-v3")] # DEBUG: zapiši natanko kakšen ukaz se izvede update_job(job_id, debug_clip_cmd=" ".join(cmd)) if not run_subprocess_logged(cmd, job_id, "Reframe + subtitles"): return # ── Done ────────────────────────────────────────────── if output_path.exists(): update_job( job_id, status="done", current_step="Končano", output_path=str(output_path), output_size_mb=round(output_path.stat().st_size / 1024 / 1024, 2), ) # S3 mirror — final reel + pomožne datoteke (analysis, subtitles) _persist_to_s3(output_path, "output") for suffix in (".analysis.json", ".subtitles.srt", ".subtitles.ass"): aux = OUTPUT_DIR / f"{job_id}{suffix}" if aux.exists(): _persist_to_s3(aux, "output") # Telegram obvestilo try: from app.telegram import notify_job_done final_job = load_job(job_id) notify_job_done(final_job) except Exception as e: print(f"⚠️ TG 
notify_job_done failed: {e}", flush=True) # Pre-cache za Edit modal (instant odpiranje): # - low-q source video (480p) za hitro scrubbanje # - waveform PNG # Ti se bodo zgenerirali tudi on-demand, ampak zdaj so že tu = Edit instant. try: _precache_edit_assets(job_id, str(input_path)) except Exception as e: print(f"⚠️ Edit precache failed: {e}", flush=True) # Avto-upload v Nextcloud če flag set (npr. po Save/recut) try: final_job = load_job(job_id) if final_job.get("auto_upload_to_nextcloud") and _nextcloud_configured(): update_job(job_id, nextcloud_status="uploading", nextcloud_error=None) download_name = build_download_filename(final_job) tv_station = final_job.get("tv_station", "FOLX SLO") nc_folder = _nextcloud_folder_for_station(tv_station) target_subdir = f"folxspeed/REELS/{nc_folder}" print(f"☁️ Avto-upload v Nextcloud: /{target_subdir}/{download_name}", flush=True) success, result = _nextcloud_upload(str(output_path), download_name, target_subdir=target_subdir) if success: update_job( job_id, nextcloud_status="uploaded", nextcloud_url=result, nextcloud_error=None, auto_upload_to_nextcloud=False, # disable da se ne ponovi hidden_after_upload=True, # signal za UI da ga skrije ) # Zabeleži v dedup try: orig_filename = final_job.get("filename") or download_name file_mb = final_job.get("output_size_mb") or final_job.get("size_mb") dedup_record(orig_filename, tv_station, job_id, nextcloud_url=result, file_size_mb=file_mb) except Exception as e: print(f"⚠️ Dedup record failed: {e}", flush=True) print(f"☁️ Auto-upload OK: /{target_subdir}/{download_name}", flush=True) else: update_job(job_id, nextcloud_status="error", nextcloud_error=result) print(f"⚠️ Auto-upload failed: {result}", flush=True) except Exception as e: print(f"⚠️ Auto-upload error: {e}", flush=True) # Batch tracking — če je zadnji v batchu, pošlji summary _try_finalize_batch(job_id) else: update_job( job_id, status="failed", error="Output datoteka ne obstaja po obdelavi", ) try: from app.telegram 
import notify_job_failed notify_job_failed(load_job(job_id), "Output datoteka ne obstaja") except Exception: pass _try_finalize_batch(job_id) except Exception as e: update_job(job_id, status="failed", error=str(e)) try: from app.telegram import notify_job_failed notify_job_failed(load_job(job_id), str(e)) except Exception: pass _try_finalize_batch(job_id) def _try_finalize_batch(job_id): """Če je job del batch-a, preveri če so vsi zaključili in pošlji summary.""" try: job = load_job(job_id) batch_id = job.get("batch_id") if not batch_id: return # Preštej batch jobe all_jobs = [load_job(jp.stem) for jp in JOBS_DIR.glob("*.json")] batch_jobs = [j for j in all_jobs if j and j.get("batch_id") == batch_id] unfinished = [j for j in batch_jobs if j.get("status") not in ("done", "failed")] if unfinished: return # še niso vsi končani # Vsi so končani — pošlji summary total = len(batch_jobs) succeeded = len([j for j in batch_jobs if j.get("status") == "done"]) failed = total - succeeded try: from app.telegram import notify_batch_complete notify_batch_complete(batch_id, total, succeeded, failed) except Exception as e: print(f"⚠️ TG batch summary failed: {e}", flush=True) except Exception as e: print(f"⚠️ _try_finalize_batch error: {e}", flush=True) # ──────────────────────────────────────────────────────────────── # FastAPI app # ──────────────────────────────────────────────────────────────── app = FastAPI(title="Reels Clipper") app.mount("/static", StaticFiles(directory=Path(__file__).parent.parent / "static"), name="static") # ──────────────────────────────────────────────────────────────── # Queue worker — procesira queued jobe enega za drugim # ──────────────────────────────────────────────────────────────── import threading _queue_lock = threading.Lock() # Set ID-jev v obdelavi — omogoča 3 paralelne workerje _queue_running = {"jobs_in_progress": set()} # Število paralelnih worker-jev (CPU dovoljena hkratna obdelava) NUM_WORKERS = int(os.environ.get("NUM_WORKERS", "3")) 
def _queue_worker(worker_id: int):
    """Run forever, claiming queued jobs one at a time and processing them.

    Several workers run this loop concurrently. Under ``_queue_lock`` each
    one scans ``JOBS_DIR`` for job files whose status is ``"queued"``, picks
    the one with the smallest ``created_at`` that no other worker has
    reserved, and records its ID in ``_queue_running["jobs_in_progress"]``
    before releasing the lock. The reservation is dropped once
    ``process_job`` returns or raises.

    Args:
        worker_id: Number used only to label this worker's log lines.
    """
    print(f"🚜 Queue worker #{worker_id} zagnan", flush=True)
    while True:
        try:
            job_id = None
            # Find and reserve the next queued job atomically.
            with _queue_lock:
                in_progress = _queue_running["jobs_in_progress"]
                candidates = []
                for f in JOBS_DIR.glob("*.json"):
                    try:
                        j = json.loads(f.read_text())
                        if j.get("status") != "queued":
                            continue
                        # Fall back to the file name when the record has no
                        # "id"; otherwise such a file would raise on every
                        # pass and stall the whole queue.
                        jid = j.get("id") or f.stem
                        if jid not in in_progress:
                            candidates.append((j.get("created_at") or 0, jid))
                    except Exception:
                        # Unreadable / malformed job file — ignore it.
                        continue
                if candidates:
                    # Oldest first; ties keep directory-scan order.
                    job_id = min(candidates, key=lambda c: c[0])[1]
                    in_progress.add(job_id)

            if job_id is None:
                time.sleep(2)
                continue

            print(f"🚜 Worker #{worker_id}: obdelujem {job_id}", flush=True)
            try:
                process_job(job_id)
            except Exception as e:
                print(f"❌ Worker #{worker_id} error pri {job_id}: {e}", flush=True)
                update_job(job_id, status="failed", error=f"Worker #{worker_id}: {e}")
            finally:
                with _queue_lock:
                    _queue_running["jobs_in_progress"].discard(job_id)
        except Exception as e:
            # Keep the worker alive no matter what; back off briefly.
            print(f"❌ Worker #{worker_id} outer error: {e}", flush=True)
            time.sleep(5)


# Worker threads started by _start_queue_worker().
_worker_threads = []


def _start_queue_worker():
    """Make sure ``NUM_WORKERS`` queue worker threads are running.

    Safe to call repeatedly: workers that are still alive are kept and only
    the missing ones are started, so the number of running workers never
    exceeds ``NUM_WORKERS``.
    """
    global _worker_threads
    alive = [t for t in _worker_threads if t.is_alive()]
    missing = NUM_WORKERS - len(alive)
    if missing <= 0:
        return

    # Reuse the worker numbers that are not taken by a living thread.
    taken = {t.name for t in alive}
    free_ids = [
        n for n in range(1, NUM_WORKERS + 1)
        if f"queue-worker-{n}" not in taken
    ][:missing]

    _worker_threads = alive
    for n in free_ids:
        t = threading.Thread(
            target=_queue_worker,
            args=(n,),
            name=f"queue-worker-{n}",
            daemon=True,
        )
        t.start()
        _worker_threads.append(t)
    print(f"🚜 Zagnal {len(free_ids)} paralelnih worker-jev", flush=True)


@app.on_event("startup")
async def resume_or_cleanup_jobs():
    """On startup, re-queue jobs that were interrupted mid-processing.

    A job file still saying ``status == "processing"`` at startup belongs to
    a run that was cut short. For each such file:

    - ``resume_attempts >= 3``: the job is marked ``"error"``.
    - ``UPLOAD_DIR/<job_id>.mp4`` is missing: the job is marked ``"error"``.
    - otherwise the job is set back to ``"queued"`` with ``resume_attempts``
      incremented.

    Re-queued jobs are picked up by the queue workers started at the end of
    this hook. They are deliberately not dispatched a second time from here:
    a job that is both ``"queued"`` and handed to ``process_job`` directly
    could be processed twice in parallel.
    """
    print("🔄 Preverjam in obnavljam jobs po restart-u...")
    resumed_count = 0
    error_count = 0
    for f in JOBS_DIR.glob("*.json"):
        try:
            j = json.loads(f.read_text())
            if j.get("status") != "processing":
                continue
            job_id = j.get("job_id") or f.stem
            attempts = j.get("resume_attempts", 0)

            # Give up after three failed resumes.
            if attempts >= 3:
                j["status"] = "error"
                j["current_step"] = "Preveč napak pri ponovnem zagonu"
                j["chorus_error"] = f"Job restartal {attempts} krat — napaka v pipeline-u"
                j["updated_at"] = time.time()
                f.write_text(json.dumps(j, ensure_ascii=False, indent=2))
                error_count += 1
                continue

            # The uploaded source must still be on disk.
            input_path = UPLOAD_DIR / f"{job_id}.mp4"
            if not input_path.exists():
                j["status"] = "error"
                j["current_step"] = "Vhodna datoteka ne obstaja"
                j["chorus_error"] = f"Upload {job_id}.mp4 ne obstaja po restart-u"
                j["updated_at"] = time.time()
                f.write_text(json.dumps(j, ensure_ascii=False, indent=2))
                error_count += 1
                continue

            # Resume: back to "queued" with one more attempt recorded; the
            # queue workers take it from there.
            j["status"] = "queued"
            j["resume_attempts"] = attempts + 1
            j["current_step"] = f"Avto-resume po restart-u (poskus {attempts + 1}/3)"
            j["last_resume_at"] = time.time()
            j["updated_at"] = time.time()
            f.write_text(json.dumps(j, ensure_ascii=False, indent=2))
            resumed_count += 1
            print(f" 🔁 Resume {job_id} (attempt {attempts + 1}/3)")
        except Exception as e:
            print(f" ⚠️ Napaka pri {f.name}: {e}")
    print(f" ✅ Resumed: {resumed_count}, Error: {error_count}")
    # Start the queue workers; they process resumed jobs and any other
    # queued jobs (e.g. multi-upload batches).
    _start_queue_worker()


async def _resume_job_async(job_id):
    """Run ``process_job`` for ``job_id`` in the default thread executor.

    Waits two seconds first, then reserves the job in
    ``_queue_running["jobs_in_progress"]`` so a queue worker cannot process
    the same job at the same time. If the job is already reserved, nothing
    is done. Errors from ``process_job`` are printed, not raised.

    NOTE(review): the startup hook no longer schedules this coroutine; it is
    kept in case other code calls it — confirm against the rest of the file.
    """
    # Short delay so application startup can finish first.
    await asyncio.sleep(2)
    with _queue_lock:
        if job_id in _queue_running["jobs_in_progress"]:
            return  # a queue worker already owns this job
        _queue_running["jobs_in_progress"].add(job_id)
    try:
        # process_job is synchronous, so run it off the event loop.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, process_job, job_id)
    except Exception as e:
        print(f" ❌ Resume failed for {job_id}: {e}")
    finally:
        with _queue_lock:
            _queue_running["jobs_in_progress"].discard(job_id)


# ──────────────────────────────────────────────────────────────── # Google Sign-In endpointi # ──────────────────────────────────────────────────────────────── @app.get("/login", response_class=HTMLResponse) async def login_page(request: Request, error: Optional[str] = None): """Login stran z Google Sign-In gumbom.""" # Če že prijavljen, redirect na / token = request.cookies.get(SESSION_COOKIE_NAME) if token and _verify_session_token(token): email = _verify_session_token(token) if email and _email_allowed(email): return RedirectResponse(url="/", status_code=303) error_html = "" if error == "not_allowed": error_html = '