Parallel workers (3) + pre-cache Edit assets

User feedback: 'ne morem nič drugega delat dokler izvaža reel?
a če bi bile večje mašine bi blo bolj?'

Without GPU upgrade, optimize CPU usage:

1. PARALLEL WORKERS:
   - Was: 1 worker thread, processes 1 job at a time
   - Now: NUM_WORKERS=3 parallel threads (configurable via env)
   - Each worker locks its job atomically (set instead of single var)
   - 3 reels render simultaneously instead of sequentially
   - Edit feature usable while other reels render

2. PRE-CACHE EDIT ASSETS:
   - On job done, fire-and-forget ffmpeg subprocess.Popen for:
     * low-q source video (480p) — used in Edit modal video player
     * waveform PNG (2400x72) — used in Edit modal trim bar
   - Both run in background, don't block pipeline
   - When user later clicks Edit, assets already cached → modal instant
   - On-demand fallback still works if precache failed

Result: Edit modal opens instantly even while other reels render.
3 reels can render in parallel = ~3x throughput on multi-core CPU.
This commit is contained in:
Sebastjan Artič 2026-04-30 12:55:38 +00:00
parent 47a114ce6a
commit 1d6af29a23

View File

@ -437,6 +437,53 @@ def run_subprocess_logged(cmd, job_id, step_name):
return True
def _precache_edit_assets(job_id: str, src_path: str):
"""Generira low-q source video + waveform PNG za Edit modal.
Tečeta v ozadju (subprocess), ne blokira pipeline-a.
Če Edit modal je odprt, te assete dobi instant.
"""
src = Path(src_path)
if not src.exists():
return
# Low-q source (480p) — za hitro scrubbanje
low_path = OUTPUT_DIR / f"{job_id}_source_low.mp4"
if not low_path.exists() or low_path.stat().st_size < 1024:
if low_path.exists():
low_path.unlink()
cmd = [
"ffmpeg", "-y",
"-i", str(src),
"-vf", "scale=854:480:force_original_aspect_ratio=decrease,scale=trunc(iw/2)*2:trunc(ih/2)*2",
"-c:v", "libx264", "-preset", "veryfast", "-crf", "28",
"-c:a", "aac", "-b:a", "96k",
"-movflags", "+faststart",
"-loglevel", "error",
str(low_path),
]
subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
print(f"📦 Pre-cache low-q source za {job_id} (background)", flush=True)
# Waveform PNG (2400x72 — za zoom)
wave_path = OUTPUT_DIR / f"{job_id}_waveform_2400x72.png"
if not wave_path.exists() or wave_path.stat().st_size < 100:
if wave_path.exists():
wave_path.unlink()
cmd = [
"ffmpeg", "-y",
"-i", str(src),
"-filter_complex",
"[0:a]aformat=channel_layouts=mono,showwavespic=s=2400x72:colors=#ff6b6b:scale=lin:draw=full[wave]",
"-map", "[wave]",
"-frames:v", "1",
"-loglevel", "error",
str(wave_path),
]
subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
print(f"📦 Pre-cache waveform za {job_id} (background)", flush=True)
def process_job(job_id):
"""Glavni pipeline: download (če YT) → find_chorus (če auto) → reframe → subs."""
job = load_job(job_id)
@ -706,6 +753,15 @@ def process_job(job_id):
except Exception as e:
print(f"⚠️ TG notify_job_done failed: {e}", flush=True)
# Pre-cache za Edit modal (instant odpiranje):
# - low-q source video (480p) za hitro scrubbanje
# - waveform PNG
# Ti se bodo zgenerirali tudi on-demand, ampak zdaj so že tu = Edit instant.
try:
_precache_edit_assets(job_id, str(input_path))
except Exception as e:
print(f"⚠️ Edit precache failed: {e}", flush=True)
# Batch tracking — če je zadnji v batchu, pošlji summary
_try_finalize_batch(job_id)
else:
@ -768,70 +824,77 @@ app.mount("/static", StaticFiles(directory=Path(__file__).parent.parent / "stati
# ────────────────────────────────────────────────────────────────
import threading
_queue_lock = threading.Lock()
_queue_running = {"current_job": None} # tracked v dict da je accessible iz threadov
# Set ID-jev v obdelavi — omogoča 3 paralelne workerje
_queue_running = {"jobs_in_progress": set()}
# Število paralelnih worker-jev (CPU dovoljena hkratna obdelava)
NUM_WORKERS = int(os.environ.get("NUM_WORKERS", "3"))
def _queue_worker():
"""Background thread ki preverja queued jobe in jih obdeluje 1 po 1.
def _queue_worker(worker_id: int):
"""Background thread ki preverja queued jobe in jih obdeluje paralelno.
Teče forever, sleep 3s med iteracijami če ni dela.
Več worker-jev teče hkrati. Vsak vzame naslednji "queued" job ki ni
že v obdelavi pri drugem worker-ju. Zaklep poskrbi za atomično vzetje.
"""
print("🚜 Queue worker zagnan", flush=True)
print(f"🚜 Queue worker #{worker_id} zagnan", flush=True)
while True:
try:
# Že nekaj v obdelavi? Počakaj.
if _queue_running["current_job"]:
# Preverim ali je status še processing
cur = load_job(_queue_running["current_job"])
if cur and cur.get("status") in ("processing", "downloading"):
time.sleep(3)
continue
# Ni več v obdelavi → sprosti
_queue_running["current_job"] = None
# Najdi naslednjega "queued" joba ki ni že v obdelavi
with _queue_lock:
in_progress = _queue_running["jobs_in_progress"]
queued_jobs = []
for f in JOBS_DIR.glob("*.json"):
try:
j = json.loads(f.read_text())
jid = j.get("id")
if j.get("status") == "queued" and jid not in in_progress:
queued_jobs.append(j)
except Exception:
continue
if not queued_jobs:
next_job = None
else:
queued_jobs.sort(key=lambda x: x.get("created_at", 0))
next_job = queued_jobs[0]
# Atomično rezerviraj
_queue_running["jobs_in_progress"].add(next_job["id"])
# Najdi naslednjega "queued" joba (FIFO po created_at)
queued_jobs = []
for f in JOBS_DIR.glob("*.json"):
try:
j = json.loads(f.read_text())
if j.get("status") == "queued":
queued_jobs.append(j)
except Exception:
continue
if not queued_jobs:
time.sleep(3)
if not next_job:
time.sleep(2)
continue
queued_jobs.sort(key=lambda x: x.get("created_at", 0))
next_job = queued_jobs[0]
job_id = next_job["id"]
# Mark "processing" + zaženi
with _queue_lock:
_queue_running["current_job"] = job_id
print(f"🚜 Queue worker: obdelujem {job_id}", flush=True)
print(f"🚜 Worker #{worker_id}: obdelujem {job_id}", flush=True)
try:
process_job(job_id)
except Exception as e:
print(f"Queue worker error pri {job_id}: {e}", flush=True)
update_job(job_id, status="failed", error=f"Queue worker: {e}")
print(f"❌ Worker #{worker_id} error pri {job_id}: {e}", flush=True)
update_job(job_id, status="failed", error=f"Worker #{worker_id}: {e}")
finally:
with _queue_lock:
_queue_running["current_job"] = None
_queue_running["jobs_in_progress"].discard(job_id)
except Exception as e:
print(f"Queue worker outer error: {e}", flush=True)
print(f"Worker #{worker_id} outer error: {e}", flush=True)
time.sleep(5)
# Zaženi worker v ozadju (samo enkrat)
_worker_thread = None
# Zaženi N worker-jev v ozadju (samo enkrat)
_worker_threads = []
def _start_queue_worker():
global _worker_thread
if _worker_thread is None or not _worker_thread.is_alive():
_worker_thread = threading.Thread(target=_queue_worker, daemon=True)
_worker_thread.start()
global _worker_threads
if _worker_threads:
# Preveri da so vsi alive
alive = [t for t in _worker_threads if t.is_alive()]
if len(alive) == NUM_WORKERS:
return
_worker_threads = []
for i in range(NUM_WORKERS):
t = threading.Thread(target=_queue_worker, args=(i+1,), daemon=True)
t.start()
_worker_threads.append(t)
print(f"🚜 Zagnal {NUM_WORKERS} paralelnih worker-jev", flush=True)
@app.on_event("startup")