Auto-resume jobs interrupted by container restart

When Coolify redeploys, the container is killed mid-job.
Now on FastAPI startup:
- Detect status=processing jobs from JOBS_DIR
- If input file exists and resume_attempts < 3, restart pipeline (status=queued)
- After 3 failed attempts, mark as error
- If input is missing, mark error immediately
- Track resume_attempts and last_resume_at for diagnostics

Run actual process_job in asyncio executor (sync function in thread)
so startup completes quickly and resume happens in background.

Resolves: 'Veseli Dolenci stuck' issue
This commit is contained in:
Sebastjan Artič 2026-04-29 08:52:16 +00:00
parent 32baf9cd45
commit 534d710e8a

View File

@ -374,31 +374,80 @@ app.mount("/static", StaticFiles(directory=Path(__file__).parent.parent / "stati
@app.on_event("startup")
async def cleanup_stuck_jobs():
"""Ob startu containerja: označi vse 'processing' jobs kot prekinjene.
async def resume_or_cleanup_jobs():
"""Ob startu containerja: avto-resume processing jobs ali jih označi kot error.
Ko Coolify deployа nov container, prejšnji se ubije sredi obdelave,
JSON file pa ostane status='processing'. Tukaj preverimo in počistimo.
Ko Coolify deploya nov container, prejšnji se ubije sredi obdelave,
JSON file pa ostane status='processing'.
Strategija:
- Če je analyze.json že narejen (analiza je končana) resume z reframe+subs
- Če ni analyze.json restart pipeline od začetka
- Po 3 napakah (resume_attempts >= 3) mark error
"""
print("🔄 Preverjam stuck jobs...")
cleaned = 0
import asyncio
print("🔄 Preverjam in obnavljam jobs po restart-u...")
resumed_count = 0
error_count = 0
for f in JOBS_DIR.glob("*.json"):
try:
j = json.loads(f.read_text())
if j.get("status") == "processing":
if j.get("status") != "processing":
continue
job_id = j.get("job_id") or f.stem
attempts = j.get("resume_attempts", 0)
# Po 3 neuspehih nehamo
if attempts >= 3:
j["status"] = "error"
j["current_step"] = "Prekinjeno (container restart) — naloži ponovno"
j["chorus_error"] = "Container restart during deploy. Napaka ni vaša — obnovite z gumbom Process."
j["interrupted_at"] = time.time()
j["current_step"] = "Preveč napak pri ponovnem zagonu"
j["chorus_error"] = f"Job restartal {attempts} krat — napaka v pipeline-u"
j["updated_at"] = time.time()
f.write_text(json.dumps(j, ensure_ascii=False, indent=2))
cleaned += 1
error_count += 1
continue
# Preveri ali input file še obstaja
input_path = UPLOAD_DIR / f"{job_id}.mp4"
if not input_path.exists():
j["status"] = "error"
j["current_step"] = "Vhodna datoteka ne obstaja"
j["chorus_error"] = f"Upload {job_id}.mp4 ne obstaja po restart-u"
j["updated_at"] = time.time()
f.write_text(json.dumps(j, ensure_ascii=False, indent=2))
error_count += 1
continue
# Resume: status=queued, +1 attempt, in pošlji v background
j["status"] = "queued"
j["resume_attempts"] = attempts + 1
j["current_step"] = f"Avto-resume po restart-u (poskus {attempts + 1}/3)"
j["last_resume_at"] = time.time()
j["updated_at"] = time.time()
f.write_text(json.dumps(j, ensure_ascii=False, indent=2))
resumed_count += 1
# Pošlji v background po startup-u (ne smemo blokirati startup)
asyncio.create_task(_resume_job_async(job_id))
print(f" 🔁 Resume {job_id} (attempt {attempts + 1}/3)")
except Exception as e:
print(f" ⚠️ Napaka pri {f.name}: {e}")
if cleaned > 0:
print(f" ✅ Označenih {cleaned} prekinjenih jobs")
else:
print(" 👍 Ni stuck jobs")
print(f" ✅ Resumed: {resumed_count}, Error: {error_count}")
async def _resume_job_async(job_id):
"""Pomožna funkcija ki zažene process_job v background-u."""
import asyncio
# Počakaj kratek čas da je startup končan
await asyncio.sleep(2)
try:
# process_job je sync funkcija, izvedi v thread executor
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, process_job, job_id)
except Exception as e:
print(f" ❌ Resume failed for {job_id}: {e}")
@app.get("/", response_class=HTMLResponse)