""" Cleanup module: removes local files older than N hours, *only if they exist in S3*. Safe by design: - Never deletes a local file unless its S3 mirror is verified - Never deletes job metadata (jobs/*.json) — those are tiny - Default age threshold is conservative (48 h) - Dry-run mode for verification Usage (cron): python3 -m app.cleanup --apply # actual delete python3 -m app.cleanup --dry-run # preview only (default) python3 -m app.cleanup --apply --hours 72 # custom age threshold Suggested cron (every night 03:30): 30 3 * * * cd /app && python3 -m app.cleanup --apply >> /data/cleanup.log 2>&1 """ from __future__ import annotations import argparse import logging import os import sys import time from pathlib import Path # Konfiguracija — ujema main.py DATA_DIR = Path(os.environ.get("DATA_DIR", "/data")) UPLOAD_DIR = DATA_DIR / "uploads" OUTPUT_DIR = DATA_DIR / "outputs" DEFAULT_AGE_HOURS = 48 logging.basicConfig( level=logging.INFO, format="%(asctime)s [cleanup] %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) log = logging.getLogger(__name__) def _file_age_hours(p: Path) -> float: """Return file age in hours based on mtime.""" try: return (time.time() - p.stat().st_mtime) / 3600 except OSError: return 0.0 def _scan_dir(d: Path, kind: str, min_age_h: float): """Yield (local_path, kind, age_hours) for files older than threshold.""" if not d.exists(): return for p in d.iterdir(): if not p.is_file(): continue # Skip very small files (probably config/state) — only target real workfiles if p.stat().st_size < 1024 * 100: # <100 KB continue age = _file_age_hours(p) if age >= min_age_h: yield p, kind, age def cleanup(min_age_hours: float, apply: bool) -> dict: """Run cleanup pass. Returns stats dict. Logic: For each file older than min_age_hours in uploads/ and outputs/: - Verify S3 mirror exists (s3.exists()) - If verified: delete local - If not verified: skip (warn) — never delete unverified """ from app import s3_storage if not s3_storage.is_enabled(): log.error("S3 not configured — refusing to run cleanup. Aborting.") return {"error": "s3_not_configured"} stats = { "scanned": 0, "would_delete": 0, "deleted": 0, "skipped_no_s3_mirror": 0, "freed_mb": 0.0, "errors": 0, } deleted_files = [] skipped_files = [] for d, kind in [(UPLOAD_DIR, "upload"), (OUTPUT_DIR, "output")]: for p, _, age_h in _scan_dir(d, kind, min_age_hours): stats["scanned"] += 1 size_mb = p.stat().st_size / 1024 / 1024 folder = "uploads" if kind == "upload" else "outputs" s3_key = f"{folder}/{p.name}" # Verify S3 mirror try: s3_size = s3_storage.get_object_size(s3_key) except Exception as e: log.warning("S3 check failed for %s: %s", s3_key, e) stats["errors"] += 1 continue if s3_size is None: log.warning("SKIP — no S3 mirror: %s (age %.1fh, %.1f MB)", p.name, age_h, size_mb) stats["skipped_no_s3_mirror"] += 1 skipped_files.append(p.name) continue # Check size match (sanity) local_size = p.stat().st_size if abs(s3_size - local_size) > 1024: # >1 KB delta — suspicious log.warning("SKIP — size mismatch %s: local=%d s3=%d", p.name, local_size, s3_size) stats["skipped_no_s3_mirror"] += 1 skipped_files.append(p.name) continue stats["would_delete"] += 1 stats["freed_mb"] += size_mb if apply: try: p.unlink() stats["deleted"] += 1 deleted_files.append(p.name) log.info("DEL %s (%.1f MB, age %.1fh, S3 verified)", p.name, size_mb, age_h) except OSError as e: log.error("Delete failed %s: %s", p, e) stats["errors"] += 1 else: log.info("DRY %s (%.1f MB, age %.1fh, S3 verified)", p.name, size_mb, age_h) log.info("=" * 60) log.info("Cleanup pass: %s", "APPLY" if apply else "DRY-RUN") log.info(" scanned: %d", stats["scanned"]) log.info(" would-delete: %d", stats["would_delete"]) log.info(" deleted: %d", stats["deleted"]) log.info(" skipped (no S3): %d", stats["skipped_no_s3_mirror"]) log.info(" errors: %d", stats["errors"]) log.info(" freed: %.1f MB", stats["freed_mb"]) return stats def main(): ap = argparse.ArgumentParser() ap.add_argument("--apply", action="store_true", help="Actually delete files (default is dry-run)") ap.add_argument("--dry-run", action="store_true", help="Preview only, do not delete (default)") ap.add_argument("--hours", type=float, default=DEFAULT_AGE_HOURS, help=f"Min file age in hours (default {DEFAULT_AGE_HOURS})") args = ap.parse_args() apply = args.apply and not args.dry_run stats = cleanup(min_age_hours=args.hours, apply=apply) if stats.get("error"): sys.exit(2) if __name__ == "__main__": main()