From 0d72d70f5dafe17a1fed0779a0a68f077d00599f Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 3 May 2026 12:24:18 +0000 Subject: [PATCH] S3 mirror integration: workfiles auto-mirror to s3://folxspeed/reels-app/ MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - main.py: 4 helper funcs (_persist_to_s3, _ensure_local, _delete_from_s3, _ffmpeg_then_persist) - no-op fallback when S3 creds missing - save_job(): mirror metadata JSON to S3 - process_job(): mirror YT download + render output + analysis/srt/ass to S3 - upload_video(): mirror direct uploads to S3 - _precache_edit_assets(): Popen->threaded with S3 sync after ffmpeg - read endpoints (download, preview, source_video, waveform, preview_clip, get_transcript, recut_job): _ensure_local() fallback fetch from S3 - delete_job(): cascade delete to S3 (mirror unlink) - cleanup.py: NEW module, deletes local files >48h that exist in S3. Verified by S3 head_object + size match. NOT YET ACTIVATED in cron. Backward compat: lokalna mapa ostane primary. Brez env vars S3_* vsi helperji vrnejo False (no-op). Production behavior identičen, dokler ne dobi S3 creds. --- app/cleanup.py | 167 +++++++++++++++++++++++++++++++++++++++++++++++++ app/main.py | 164 ++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 318 insertions(+), 13 deletions(-) create mode 100644 app/cleanup.py diff --git a/app/cleanup.py b/app/cleanup.py new file mode 100644 index 0000000..aed91a4 --- /dev/null +++ b/app/cleanup.py @@ -0,0 +1,167 @@ +""" +Cleanup module: removes local files older than N hours, *only if they exist in S3*. + +Safe by design: +- Never deletes a local file unless its S3 mirror is verified +- Never deletes job metadata (jobs/*.json) — those are tiny +- Default age threshold is conservative (48 h) +- Dry-run mode for verification + +Usage (cron): + python3 -m app.cleanup --apply # actual delete + python3 -m app.cleanup --dry-run # preview only (default) + python3 -m app.cleanup --apply --hours 72 # custom age threshold + +Suggested cron (every night 03:30): + 30 3 * * * cd /app && python3 -m app.cleanup --apply >> /data/cleanup.log 2>&1 +""" +from __future__ import annotations + +import argparse +import logging +import os +import sys +import time +from pathlib import Path + +# Konfiguracija — ujema main.py +DATA_DIR = Path(os.environ.get("DATA_DIR", "/data")) +UPLOAD_DIR = DATA_DIR / "uploads" +OUTPUT_DIR = DATA_DIR / "outputs" + +DEFAULT_AGE_HOURS = 48 + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [cleanup] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +log = logging.getLogger(__name__) + + +def _file_age_hours(p: Path) -> float: + """Return file age in hours based on mtime.""" + try: + return (time.time() - p.stat().st_mtime) / 3600 + except OSError: + return 0.0 + + +def _scan_dir(d: Path, kind: str, min_age_h: float): + """Yield (local_path, kind, age_hours) for files older than threshold.""" + if not d.exists(): + return + for p in d.iterdir(): + if not p.is_file(): + continue + # Skip very small files (probably config/state) — only target real workfiles + if p.stat().st_size < 1024 * 100: # <100 KB + continue + age = _file_age_hours(p) + if age >= min_age_h: + yield p, kind, age + + +def cleanup(min_age_hours: float, apply: bool) -> dict: + """Run cleanup pass. Returns stats dict. + + Logic: + For each file older than min_age_hours in uploads/ and outputs/: + - Verify S3 mirror exists (s3.exists()) + - If verified: delete local + - If not verified: skip (warn) — never delete unverified + """ + from app import s3_storage + + if not s3_storage.is_enabled(): + log.error("S3 not configured — refusing to run cleanup. Aborting.") + return {"error": "s3_not_configured"} + + stats = { + "scanned": 0, + "would_delete": 0, + "deleted": 0, + "skipped_no_s3_mirror": 0, + "freed_mb": 0.0, + "errors": 0, + } + deleted_files = [] + skipped_files = [] + + for d, kind in [(UPLOAD_DIR, "upload"), (OUTPUT_DIR, "output")]: + for p, _, age_h in _scan_dir(d, kind, min_age_hours): + stats["scanned"] += 1 + size_mb = p.stat().st_size / 1024 / 1024 + folder = "uploads" if kind == "upload" else "outputs" + s3_key = f"{folder}/{p.name}" + + # Verify S3 mirror + try: + s3_size = s3_storage.get_object_size(s3_key) + except Exception as e: + log.warning("S3 check failed for %s: %s", s3_key, e) + stats["errors"] += 1 + continue + + if s3_size is None: + log.warning("SKIP — no S3 mirror: %s (age %.1fh, %.1f MB)", + p.name, age_h, size_mb) + stats["skipped_no_s3_mirror"] += 1 + skipped_files.append(p.name) + continue + + # Check size match (sanity) + local_size = p.stat().st_size + if abs(s3_size - local_size) > 1024: # >1 KB delta — suspicious + log.warning("SKIP — size mismatch %s: local=%d s3=%d", + p.name, local_size, s3_size) + stats["skipped_no_s3_mirror"] += 1 + skipped_files.append(p.name) + continue + + stats["would_delete"] += 1 + stats["freed_mb"] += size_mb + + if apply: + try: + p.unlink() + stats["deleted"] += 1 + deleted_files.append(p.name) + log.info("DEL %s (%.1f MB, age %.1fh, S3 verified)", + p.name, size_mb, age_h) + except OSError as e: + log.error("Delete failed %s: %s", p, e) + stats["errors"] += 1 + else: + log.info("DRY %s (%.1f MB, age %.1fh, S3 verified)", + p.name, size_mb, age_h) + + log.info("=" * 60) + log.info("Cleanup pass: %s", "APPLY" if apply else "DRY-RUN") + log.info(" scanned: %d", stats["scanned"]) + log.info(" would-delete: %d", stats["would_delete"]) + log.info(" deleted: %d", stats["deleted"]) + log.info(" skipped (no S3): %d", stats["skipped_no_s3_mirror"]) + log.info(" errors: %d", stats["errors"]) + log.info(" freed: %.1f MB", stats["freed_mb"]) + return stats + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--apply", action="store_true", + help="Actually delete files (default is dry-run)") + ap.add_argument("--dry-run", action="store_true", + help="Preview only, do not delete (default)") + ap.add_argument("--hours", type=float, default=DEFAULT_AGE_HOURS, + help=f"Min file age in hours (default {DEFAULT_AGE_HOURS})") + args = ap.parse_args() + + apply = args.apply and not args.dry_run + stats = cleanup(min_age_hours=args.hours, apply=apply) + if stats.get("error"): + sys.exit(2) + + +if __name__ == "__main__": + main() diff --git a/app/main.py b/app/main.py index f21e995..d4d0893 100644 --- a/app/main.py +++ b/app/main.py @@ -19,6 +19,7 @@ import os import secrets import shutil import subprocess +import threading import time import uuid from pathlib import Path @@ -55,6 +56,75 @@ QNET_DIR.mkdir(parents=True, exist_ok=True) os.environ.setdefault("QNET_LOOKUP_PATH", str(QNET_DIR / "songs_lookup.json")) from app import qnet_match # noqa: E402 +# S3 storage mirror — uploads/outputs/jobs gredo tudi v s3://folxspeed/reels-app/ +# Lokalna mapa ostane primary, S3 je replica/cache. +from app import s3_storage # noqa: E402 + + +def _persist_to_s3(local_path, kind: str) -> bool: + """Mirror local file to S3 after producing it (best-effort). + + kind: 'upload' for uploads/, 'output' for outputs/, 'job_meta' for jobs/ + Silent no-op when S3 not configured. Never raises. + """ + try: + if not s3_storage.is_enabled(): + return False + p = Path(local_path) + if not p.exists() or p.stat().st_size == 0: + return False + return s3_storage.upload_job_file("", kind, p) + except Exception as e: + print(f"⚠️ S3 mirror failed for {local_path}: {e}", flush=True) + return False + + +def _ensure_local(local_path, kind: str) -> bool: + """Make sure file is on disk; if missing, fetch from S3. + + Returns True if file is ready locally after the call. + kind: 'upload' or 'output' + """ + p = Path(local_path) + if p.exists() and p.stat().st_size > 0: + return True + if not s3_storage.is_enabled(): + return False + folder = {"upload": "uploads", "output": "outputs", "job_meta": "jobs"}.get(kind, kind) + key = f"{folder}/{p.name}" + print(f"📥 Local missing, fetching from S3: {key}", flush=True) + return s3_storage.download(key, p) + + +def _delete_from_s3(filename: str, kind: str) -> bool: + """Delete object from S3 (mirror local delete). Best-effort, no raise.""" + try: + if not s3_storage.is_enabled(): + return False + folder = {"upload": "uploads", "output": "outputs", "job_meta": "jobs"}.get(kind, kind) + return s3_storage.delete(f"{folder}/{filename}") + except Exception: + return False + + +def _ffmpeg_then_persist(cmd, out_path, kind: str = "output", timeout: int = 600): + """Run ffmpeg in background thread, then mirror result to S3. + + Drop-in replacement for subprocess.Popen() when we want S3 sync after. + """ + def runner(): + try: + subprocess.run( + cmd, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + timeout=timeout, + ) + _persist_to_s3(out_path, kind) + except Exception as e: + print(f"⚠️ ffmpeg/persist failed for {out_path}: {e}", flush=True) + threading.Thread(target=runner, daemon=True).start() + # Dedup DB — sledi že obdelanim/naloženim komadom DEDUP_DB = DATA_DIR / "processed.db" @@ -346,7 +416,9 @@ def load_job(job_id): def save_job(job): - job_path(job["id"]).write_text(json.dumps(job, ensure_ascii=False, indent=2)) + p = job_path(job["id"]) + p.write_text(json.dumps(job, ensure_ascii=False, indent=2)) + _persist_to_s3(p, "job_meta") def update_job(job_id, **kwargs): @@ -593,8 +665,8 @@ def _precache_edit_assets(job_id: str, src_path: str): "-loglevel", "error", str(low_path), ] - subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - print(f"📦 Pre-cache low-q source za {job_id} (background)", flush=True) + _ffmpeg_then_persist(cmd, low_path, kind="output") + print(f"📦 Pre-cache low-q source za {job_id} (background → S3)", flush=True) # Waveform PNG (2400x72 — za zoom) wave_path = OUTPUT_DIR / f"{job_id}_waveform_2400x72.png" @@ -611,8 +683,8 @@ def _precache_edit_assets(job_id: str, src_path: str): "-loglevel", "error", str(wave_path), ] - subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - print(f"📦 Pre-cache waveform za {job_id} (background)", flush=True) + _ffmpeg_then_persist(cmd, wave_path, kind="output") + print(f"📦 Pre-cache waveform za {job_id} (background → S3)", flush=True) def process_job(job_id): @@ -633,6 +705,11 @@ def process_job(job_id): if not run_subprocess_logged(cmd, job_id, "YouTube download"): return update_job(job_id, input_path=str(input_path)) + # S3 mirror — original video + info.json + _persist_to_s3(input_path, "upload") + info_json = input_path.with_suffix(".info.json") + if info_json.exists(): + _persist_to_s3(info_json, "upload") # Probaj dobiti YT metadata (če še ni iz submit-a) — title, uploader, id, ... # Single video submit ali playlist resolve že nastavi metadata, ampak @@ -926,6 +1003,12 @@ def process_job(job_id): output_path=str(output_path), output_size_mb=round(output_path.stat().st_size / 1024 / 1024, 2), ) + # S3 mirror — final reel + pomožne datoteke (analysis, subtitles) + _persist_to_s3(output_path, "output") + for suffix in (".analysis.json", ".subtitles.srt", ".subtitles.ass"): + aux = OUTPUT_DIR / f"{job_id}{suffix}" + if aux.exists(): + _persist_to_s3(aux, "output") # Telegram obvestilo try: from app.telegram import notify_job_done @@ -1375,6 +1458,8 @@ async def upload_video( job["has_clean_name"] = bool(a and t) save_job(job) + # S3 mirror — direct upload datoteka + _persist_to_s3(input_path, "upload") return job @@ -1683,6 +1768,7 @@ async def download(job_id: str, user: str = Depends(check_auth)): if not job or job.get("status") != "done": raise HTTPException(404, "Ne pripravljen") out = Path(job["output_path"]) + _ensure_local(out, "output") if not out.exists(): raise HTTPException(404, "Output ne obstaja") @@ -1703,6 +1789,7 @@ async def preview(job_id: str, request: Request, user: str = Depends(check_auth) if not job or job.get("status") != "done": raise HTTPException(404, "Ne pripravljen") out = Path(job["output_path"]) + _ensure_local(out, "output") if not out.exists(): raise HTTPException(404, "Output ne obstaja") @@ -1762,11 +1849,41 @@ async def delete_job(job_id: str, user: str = Depends(check_auth)): job = load_job(job_id) if not job: raise HTTPException(404, "Ne obstaja") - for key in ("input_path", "output_path"): + # Glavni input + output (po job records) + for key, kind in (("input_path", "upload"), ("output_path", "output")): p = job.get(key) - if p and Path(p).exists(): - Path(p).unlink(missing_ok=True) - job_path(job_id).unlink(missing_ok=True) + if p: + local_p = Path(p) + local_p.unlink(missing_ok=True) + _delete_from_s3(local_p.name, kind) + # Pomožne datoteke v outputs/ (analysis, subtitles, low-q, waveform) + for fname in ( + f"{job_id}.mp4", + f"{job_id}.analysis.json", + f"{job_id}.subtitles.srt", + f"{job_id}.subtitles.ass", + f"{job_id}_source_low.mp4", + ): + f = OUTPUT_DIR / fname + f.unlink(missing_ok=True) + _delete_from_s3(fname, "output") + # Waveform PNG-ji (več velikosti) — listanje ker imena niso fiksna + try: + for wf in OUTPUT_DIR.glob(f"{job_id}_waveform_*.png"): + wf_name = wf.name + wf.unlink(missing_ok=True) + _delete_from_s3(wf_name, "output") + except Exception: + pass + # YT info.json + info_json = UPLOAD_DIR / f"{job_id}_yt.info.json" + if info_json.exists(): + info_json.unlink(missing_ok=True) + _delete_from_s3(f"{job_id}_yt.info.json", "upload") + # Job metadata + jp = job_path(job_id) + jp.unlink(missing_ok=True) + _delete_from_s3(f"{job_id}.json", "job_meta") return {"deleted": job_id} @@ -1782,12 +1899,17 @@ async def source_video(job_id: str, quality: str = "high", user: str = Depends(c if not job: raise HTTPException(404, "Ne obstaja") src = job.get("input_path") - if not src or not Path(src).exists(): + if not src: + raise HTTPException(404, "Original video ne obstaja") + _ensure_local(src, "upload") + if not Path(src).exists(): raise HTTPException(404, "Original video ne obstaja") if quality == "low": # 480p cached za hitro scrubbanje cache_path = OUTPUT_DIR / f"{job_id}_source_low.mp4" + # Najprej probaj fetch iz S3 (po cleanupu lahko manjka lokalno) + _ensure_local(cache_path, "output") cache_valid = cache_path.exists() and cache_path.stat().st_size > 1024 if not cache_valid: @@ -1812,6 +1934,8 @@ async def source_video(job_id: str, quality: str = "high", user: str = Depends(c if cache_path.exists(): cache_path.unlink() raise HTTPException(500, f"FFmpeg failed: {(proc.stderr or 'unknown')[-300:]}") + # Mirror v S3 po regeneraciji + _persist_to_s3(cache_path, "output") except subprocess.TimeoutExpired: if cache_path.exists(): cache_path.unlink() @@ -1842,13 +1966,17 @@ async def waveform(job_id: str, width: int = 1200, height: int = 80, user: str = if not job: raise HTTPException(404, "Ne obstaja") src = job.get("input_path") - if not src or not Path(src).exists(): + if not src: + raise HTTPException(404, "Original video ne obstaja") + _ensure_local(src, "upload") + if not Path(src).exists(): raise HTTPException(404, "Original video ne obstaja") width = max(400, min(width, 3000)) height = max(40, min(height, 200)) cache_path = OUTPUT_DIR / f"{job_id}_waveform_{width}x{height}.png" + _ensure_local(cache_path, "output") cache_valid = cache_path.exists() and cache_path.stat().st_size > 100 if not cache_valid: @@ -1872,6 +2000,8 @@ async def waveform(job_id: str, width: int = 1200, height: int = 80, user: str = if cache_path.exists(): cache_path.unlink() raise HTTPException(500, f"Waveform render failed: {(proc.stderr or 'unknown')[-300:]}") + # Mirror v S3 po regeneraciji + _persist_to_s3(cache_path, "output") except subprocess.TimeoutExpired: if cache_path.exists(): cache_path.unlink() @@ -1903,7 +2033,10 @@ async def preview_clip( if not job: raise HTTPException(404, "Ne obstaja") src = job.get("input_path") - if not src or not Path(src).exists(): + if not src: + raise HTTPException(404, "Original video ne obstaja") + _ensure_local(src, "upload") + if not Path(src).exists(): raise HTTPException(404, "Original video ne obstaja") if end <= start: @@ -1978,6 +2111,7 @@ async def get_transcript(job_id: str, user: str = Depends(check_auth)): if not job: raise HTTPException(404, "Ne obstaja") analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json" + _ensure_local(analysis_path, "output") if not analysis_path.exists(): raise HTTPException(404, "Analysis ne obstaja") try: @@ -2217,7 +2351,10 @@ async def recut_job(job_id: str, payload: RecutRequest, user: str = Depends(chec raise HTTPException(404, "Ne obstaja") src = job.get("input_path") - if not src or not Path(src).exists(): + if not src: + raise HTTPException(400, "Original video manjka") + _ensure_local(src, "upload") + if not Path(src).exists(): raise HTTPException(400, "Original video manjka") if payload.end <= payload.start: @@ -2232,6 +2369,7 @@ async def recut_job(job_id: str, payload: RecutRequest, user: str = Depends(chec # Naloži obstoječi analysis analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json" + _ensure_local(analysis_path, "output") if not analysis_path.exists(): raise HTTPException(500, "Analysis manjka — re-uplad pesmi")