# Startup job recovery for app/main.py (post-patch state of the hunk).

# Number of automatic resume attempts allowed before a job is marked failed.
_MAX_RESUME_ATTEMPTS = 3

# Strong references to in-flight resume tasks. The event loop only keeps weak
# references to tasks, so a task nobody else references can be
# garbage-collected before it finishes.
_resume_tasks = set()


def _save_job(job_file, job):
    """Stamp *job* with the current time and write it back to *job_file*."""
    job["updated_at"] = time.time()
    # UTF-8 is pinned because ensure_ascii=False emits non-ASCII characters
    # that a non-UTF-8 default locale could not encode.
    job_file.write_text(
        json.dumps(job, ensure_ascii=False, indent=2), encoding="utf-8"
    )


def _fail_job(job_file, job, step, detail):
    """Mark *job* as failed with a user-visible step and error detail."""
    job["status"] = "error"
    job["current_step"] = step
    job["chorus_error"] = detail
    _save_job(job_file, job)


@app.on_event("startup")
async def resume_or_cleanup_jobs():
    """On container start, re-queue interrupted jobs or mark them as failed.

    When a new container is deployed, the previous one is killed mid-run and
    the job's JSON file is left with status='processing'. For every such file
    in JOBS_DIR:

    - if ``resume_attempts`` has reached the limit, the job is marked 'error';
    - if the uploaded ``<job_id>.mp4`` is missing from UPLOAD_DIR, the job is
      marked 'error';
    - otherwise the job is set to 'queued', its attempt counter is
      incremented, and ``process_job(job_id)`` is scheduled in the background.

    NOTE(review): an earlier description mentioned resuming from an existing
    analyze.json instead of restarting. No such check exists here; the job is
    always handed to ``process_job`` as a whole. Whether ``process_job`` skips
    already-completed stages cannot be determined from this block.

    A failure while handling one job file is logged and does not prevent the
    remaining files from being processed.
    """
    import asyncio

    print("🔄 Preverjam in obnavljam jobs po restart-u...")
    resumed_count = 0
    error_count = 0

    for job_file in JOBS_DIR.glob("*.json"):
        try:
            job = json.loads(job_file.read_text(encoding="utf-8"))
            if job.get("status") != "processing":
                continue

            job_id = job.get("job_id") or job_file.stem
            attempts = job.get("resume_attempts", 0)

            # Give up once the retry budget is exhausted.
            if attempts >= _MAX_RESUME_ATTEMPTS:
                _fail_job(
                    job_file,
                    job,
                    "Preveč napak pri ponovnem zagonu",
                    f"Job restartal {attempts} krat — napaka v pipeline-u",
                )
                error_count += 1
                continue

            # Without the uploaded input there is nothing to reprocess.
            input_path = UPLOAD_DIR / f"{job_id}.mp4"
            if not input_path.exists():
                _fail_job(
                    job_file,
                    job,
                    "Vhodna datoteka ne obstaja",
                    f"Upload {job_id}.mp4 ne obstaja po restart-u",
                )
                error_count += 1
                continue

            # Re-queue the job and persist the new attempt count BEFORE
            # scheduling, so a crash during the resumed run is still counted.
            attempt_no = attempts + 1
            job["status"] = "queued"
            job["resume_attempts"] = attempt_no
            job["current_step"] = (
                f"Avto-resume po restart-u (poskus {attempt_no}/{_MAX_RESUME_ATTEMPTS})"
            )
            job["last_resume_at"] = time.time()
            _save_job(job_file, job)
            resumed_count += 1

            # Run in the background so application startup is not blocked.
            # The task is kept in _resume_tasks until it completes.
            task = asyncio.create_task(_resume_job_async(job_id))
            _resume_tasks.add(task)
            task.add_done_callback(_resume_tasks.discard)
            print(f" 🔁 Resume {job_id} (attempt {attempt_no}/{_MAX_RESUME_ATTEMPTS})")
        except Exception as e:
            # Best-effort boundary: one unreadable job file must not abort
            # recovery of the others.
            print(f" ⚠️ Napaka pri {job_file.name}: {e}")

    print(f" ✅ Resumed: {resumed_count}, Error: {error_count}")


async def _resume_job_async(job_id):
    """Run ``process_job(job_id)`` in the background after a short delay.

    ``process_job`` is invoked through the loop's default executor, i.e. on a
    worker thread, so the event loop stays responsive while it runs. Any
    exception it raises is logged here and not propagated.
    """
    import asyncio

    # Short pause so that startup can finish before work begins.
    await asyncio.sleep(2)
    try:
        # get_running_loop() is the supported way to obtain the loop from
        # inside a coroutine.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, process_job, job_id)
    except Exception as e:
        print(f" ❌ Resume failed for {job_id}: {e}")