diff --git a/app/main.py b/app/main.py index bfd6cae..adb4bc7 100644 --- a/app/main.py +++ b/app/main.py @@ -437,6 +437,53 @@ def run_subprocess_logged(cmd, job_id, step_name): return True +def _precache_edit_assets(job_id: str, src_path: str): + """Generira low-q source video + waveform PNG za Edit modal. + + Tečeta v ozadju (subprocess), ne blokira pipeline-a. + Če Edit modal je odprt, te assete dobi instant. + """ + src = Path(src_path) + if not src.exists(): + return + + # Low-q source (480p) — za hitro scrubbanje + low_path = OUTPUT_DIR / f"{job_id}_source_low.mp4" + if not low_path.exists() or low_path.stat().st_size < 1024: + if low_path.exists(): + low_path.unlink() + cmd = [ + "ffmpeg", "-y", + "-i", str(src), + "-vf", "scale=854:480:force_original_aspect_ratio=decrease,scale=trunc(iw/2)*2:trunc(ih/2)*2", + "-c:v", "libx264", "-preset", "veryfast", "-crf", "28", + "-c:a", "aac", "-b:a", "96k", + "-movflags", "+faststart", + "-loglevel", "error", + str(low_path), + ] + subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + print(f"📦 Pre-cache low-q source za {job_id} (background)", flush=True) + + # Waveform PNG (2400x72 — za zoom) + wave_path = OUTPUT_DIR / f"{job_id}_waveform_2400x72.png" + if not wave_path.exists() or wave_path.stat().st_size < 100: + if wave_path.exists(): + wave_path.unlink() + cmd = [ + "ffmpeg", "-y", + "-i", str(src), + "-filter_complex", + "[0:a]aformat=channel_layouts=mono,showwavespic=s=2400x72:colors=#ff6b6b:scale=lin:draw=full[wave]", + "-map", "[wave]", + "-frames:v", "1", + "-loglevel", "error", + str(wave_path), + ] + subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + print(f"📦 Pre-cache waveform za {job_id} (background)", flush=True) + + def process_job(job_id): """Glavni pipeline: download (če YT) → find_chorus (če auto) → reframe → subs.""" job = load_job(job_id) @@ -706,6 +753,15 @@ def process_job(job_id): except Exception as e: print(f"⚠️ TG notify_job_done failed: {e}", flush=True) + # Pre-cache za Edit modal (instant odpiranje): + # - low-q source video (480p) za hitro scrubbanje + # - waveform PNG + # Ti se bodo zgenerirali tudi on-demand, ampak zdaj so že tu = Edit instant. + try: + _precache_edit_assets(job_id, str(input_path)) + except Exception as e: + print(f"⚠️ Edit precache failed: {e}", flush=True) + # Batch tracking — če je zadnji v batchu, pošlji summary _try_finalize_batch(job_id) else: @@ -768,70 +824,77 @@ app.mount("/static", StaticFiles(directory=Path(__file__).parent.parent / "stati # ──────────────────────────────────────────────────────────────── import threading _queue_lock = threading.Lock() -_queue_running = {"current_job": None} # tracked v dict da je accessible iz threadov +# Set ID-jev v obdelavi — omogoča 3 paralelne workerje +_queue_running = {"jobs_in_progress": set()} + +# Število paralelnih worker-jev (CPU dovoljena hkratna obdelava) +NUM_WORKERS = int(os.environ.get("NUM_WORKERS", "3")) -def _queue_worker(): - """Background thread ki preverja queued jobe in jih obdeluje 1 po 1. +def _queue_worker(worker_id: int): + """Background thread ki preverja queued jobe in jih obdeluje paralelno. - Teče forever, sleep 3s med iteracijami če ni dela. + Več worker-jev teče hkrati. Vsak vzame naslednji "queued" job ki ni + že v obdelavi pri drugem worker-ju. Zaklep poskrbi za atomično vzetje. """ - print("🚜 Queue worker zagnan", flush=True) + print(f"🚜 Queue worker #{worker_id} zagnan", flush=True) while True: try: - # Že nekaj v obdelavi? Počakaj. - if _queue_running["current_job"]: - # Preverim ali je status še processing - cur = load_job(_queue_running["current_job"]) - if cur and cur.get("status") in ("processing", "downloading"): - time.sleep(3) - continue - # Ni več v obdelavi → sprosti - _queue_running["current_job"] = None + # Najdi naslednjega "queued" joba ki ni že v obdelavi + with _queue_lock: + in_progress = _queue_running["jobs_in_progress"] + queued_jobs = [] + for f in JOBS_DIR.glob("*.json"): + try: + j = json.loads(f.read_text()) + jid = j.get("id") + if j.get("status") == "queued" and jid not in in_progress: + queued_jobs.append(j) + except Exception: + continue + + if not queued_jobs: + next_job = None + else: + queued_jobs.sort(key=lambda x: x.get("created_at", 0)) + next_job = queued_jobs[0] + # Atomično rezerviraj + _queue_running["jobs_in_progress"].add(next_job["id"]) - # Najdi naslednjega "queued" joba (FIFO po created_at) - queued_jobs = [] - for f in JOBS_DIR.glob("*.json"): - try: - j = json.loads(f.read_text()) - if j.get("status") == "queued": - queued_jobs.append(j) - except Exception: - continue - - if not queued_jobs: - time.sleep(3) + if not next_job: + time.sleep(2) continue - queued_jobs.sort(key=lambda x: x.get("created_at", 0)) - next_job = queued_jobs[0] job_id = next_job["id"] - - # Mark "processing" + zaženi - with _queue_lock: - _queue_running["current_job"] = job_id - - print(f"🚜 Queue worker: obdelujem {job_id}", flush=True) + print(f"🚜 Worker #{worker_id}: obdelujem {job_id}", flush=True) try: process_job(job_id) except Exception as e: - print(f"❌ Queue worker error pri {job_id}: {e}", flush=True) - update_job(job_id, status="failed", error=f"Queue worker: {e}") + print(f"❌ Worker #{worker_id} error pri {job_id}: {e}", flush=True) + update_job(job_id, status="failed", error=f"Worker #{worker_id}: {e}") finally: with _queue_lock: - _queue_running["current_job"] = None + _queue_running["jobs_in_progress"].discard(job_id) except Exception as e: - print(f"❌ Queue worker outer error: {e}", flush=True) + print(f"❌ Worker #{worker_id} outer error: {e}", flush=True) time.sleep(5) -# Zaženi worker v ozadju (samo enkrat) -_worker_thread = None +# Zaženi N worker-jev v ozadju (samo enkrat) +_worker_threads = [] def _start_queue_worker(): - global _worker_thread - if _worker_thread is None or not _worker_thread.is_alive(): - _worker_thread = threading.Thread(target=_queue_worker, daemon=True) - _worker_thread.start() + global _worker_threads + if _worker_threads: + # Preveri da so vsi alive + alive = [t for t in _worker_threads if t.is_alive()] + if len(alive) == NUM_WORKERS: + return + _worker_threads = [] + for i in range(NUM_WORKERS): + t = threading.Thread(target=_queue_worker, args=(i+1,), daemon=True) + t.start() + _worker_threads.append(t) + print(f"🚜 Zagnal {NUM_WORKERS} paralelnih worker-jev", flush=True) @app.on_event("startup")