S3 mirror integration: workfiles auto-mirror to s3://folxspeed/reels-app/

- main.py: 4 helper funcs (_persist_to_s3, _ensure_local, _delete_from_s3,
  _ffmpeg_then_persist) - no-op fallback when S3 creds missing
- save_job(): mirror metadata JSON to S3
- process_job(): mirror YT download + render output + analysis/srt/ass to S3
- upload_video(): mirror direct uploads to S3
- _precache_edit_assets(): Popen->threaded with S3 sync after ffmpeg
- read endpoints (download, preview, source_video, waveform, preview_clip,
  get_transcript, recut_job): _ensure_local() fallback fetch from S3
- delete_job(): cascade delete to S3 (mirror unlink)
- cleanup.py: NEW module, deletes local files >48h that exist in S3.
  Verified by S3 head_object + size match. NOT YET ACTIVATED in cron.

Backward compat: lokalna mapa ostane primary. Brez env vars S3_* vsi
helperji vrnejo False (no-op). Production behavior identičen, dokler
ne dobi S3 creds.
This commit is contained in:
Claude 2026-05-03 12:24:18 +00:00
parent ec1d109e3b
commit 0d72d70f5d
2 changed files with 318 additions and 13 deletions

167
app/cleanup.py Normal file
View File

@ -0,0 +1,167 @@
"""
Cleanup module: removes local files older than N hours, *only if they exist in S3*.
Safe by design:
- Never deletes a local file unless its S3 mirror is verified
- Never deletes job metadata (jobs/*.json) those are tiny
- Default age threshold is conservative (48 h)
- Dry-run mode for verification
Usage (cron):
python3 -m app.cleanup --apply # actual delete
python3 -m app.cleanup --dry-run # preview only (default)
python3 -m app.cleanup --apply --hours 72 # custom age threshold
Suggested cron (every night 03:30):
30 3 * * * cd /app && python3 -m app.cleanup --apply >> /data/cleanup.log 2>&1
"""
from __future__ import annotations
import argparse
import logging
import os
import sys
import time
from pathlib import Path
# Konfiguracija — ujema main.py
DATA_DIR = Path(os.environ.get("DATA_DIR", "/data"))
UPLOAD_DIR = DATA_DIR / "uploads"
OUTPUT_DIR = DATA_DIR / "outputs"
DEFAULT_AGE_HOURS = 48
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [cleanup] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger(__name__)
def _file_age_hours(p: Path) -> float:
"""Return file age in hours based on mtime."""
try:
return (time.time() - p.stat().st_mtime) / 3600
except OSError:
return 0.0
def _scan_dir(d: Path, kind: str, min_age_h: float):
"""Yield (local_path, kind, age_hours) for files older than threshold."""
if not d.exists():
return
for p in d.iterdir():
if not p.is_file():
continue
# Skip very small files (probably config/state) — only target real workfiles
if p.stat().st_size < 1024 * 100: # <100 KB
continue
age = _file_age_hours(p)
if age >= min_age_h:
yield p, kind, age
def cleanup(min_age_hours: float, apply: bool) -> dict:
"""Run cleanup pass. Returns stats dict.
Logic:
For each file older than min_age_hours in uploads/ and outputs/:
- Verify S3 mirror exists (s3.exists())
- If verified: delete local
- If not verified: skip (warn) never delete unverified
"""
from app import s3_storage
if not s3_storage.is_enabled():
log.error("S3 not configured — refusing to run cleanup. Aborting.")
return {"error": "s3_not_configured"}
stats = {
"scanned": 0,
"would_delete": 0,
"deleted": 0,
"skipped_no_s3_mirror": 0,
"freed_mb": 0.0,
"errors": 0,
}
deleted_files = []
skipped_files = []
for d, kind in [(UPLOAD_DIR, "upload"), (OUTPUT_DIR, "output")]:
for p, _, age_h in _scan_dir(d, kind, min_age_hours):
stats["scanned"] += 1
size_mb = p.stat().st_size / 1024 / 1024
folder = "uploads" if kind == "upload" else "outputs"
s3_key = f"{folder}/{p.name}"
# Verify S3 mirror
try:
s3_size = s3_storage.get_object_size(s3_key)
except Exception as e:
log.warning("S3 check failed for %s: %s", s3_key, e)
stats["errors"] += 1
continue
if s3_size is None:
log.warning("SKIP — no S3 mirror: %s (age %.1fh, %.1f MB)",
p.name, age_h, size_mb)
stats["skipped_no_s3_mirror"] += 1
skipped_files.append(p.name)
continue
# Check size match (sanity)
local_size = p.stat().st_size
if abs(s3_size - local_size) > 1024: # >1 KB delta — suspicious
log.warning("SKIP — size mismatch %s: local=%d s3=%d",
p.name, local_size, s3_size)
stats["skipped_no_s3_mirror"] += 1
skipped_files.append(p.name)
continue
stats["would_delete"] += 1
stats["freed_mb"] += size_mb
if apply:
try:
p.unlink()
stats["deleted"] += 1
deleted_files.append(p.name)
log.info("DEL %s (%.1f MB, age %.1fh, S3 verified)",
p.name, size_mb, age_h)
except OSError as e:
log.error("Delete failed %s: %s", p, e)
stats["errors"] += 1
else:
log.info("DRY %s (%.1f MB, age %.1fh, S3 verified)",
p.name, size_mb, age_h)
log.info("=" * 60)
log.info("Cleanup pass: %s", "APPLY" if apply else "DRY-RUN")
log.info(" scanned: %d", stats["scanned"])
log.info(" would-delete: %d", stats["would_delete"])
log.info(" deleted: %d", stats["deleted"])
log.info(" skipped (no S3): %d", stats["skipped_no_s3_mirror"])
log.info(" errors: %d", stats["errors"])
log.info(" freed: %.1f MB", stats["freed_mb"])
return stats
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--apply", action="store_true",
help="Actually delete files (default is dry-run)")
ap.add_argument("--dry-run", action="store_true",
help="Preview only, do not delete (default)")
ap.add_argument("--hours", type=float, default=DEFAULT_AGE_HOURS,
help=f"Min file age in hours (default {DEFAULT_AGE_HOURS})")
args = ap.parse_args()
apply = args.apply and not args.dry_run
stats = cleanup(min_age_hours=args.hours, apply=apply)
if stats.get("error"):
sys.exit(2)
if __name__ == "__main__":
main()

View File

@ -19,6 +19,7 @@ import os
import secrets
import shutil
import subprocess
import threading
import time
import uuid
from pathlib import Path
@ -55,6 +56,75 @@ QNET_DIR.mkdir(parents=True, exist_ok=True)
os.environ.setdefault("QNET_LOOKUP_PATH", str(QNET_DIR / "songs_lookup.json"))
from app import qnet_match # noqa: E402
# S3 storage mirror — uploads/outputs/jobs gredo tudi v s3://folxspeed/reels-app/
# Lokalna mapa ostane primary, S3 je replica/cache.
from app import s3_storage # noqa: E402
def _persist_to_s3(local_path, kind: str) -> bool:
"""Mirror local file to S3 after producing it (best-effort).
kind: 'upload' for uploads/, 'output' for outputs/, 'job_meta' for jobs/
Silent no-op when S3 not configured. Never raises.
"""
try:
if not s3_storage.is_enabled():
return False
p = Path(local_path)
if not p.exists() or p.stat().st_size == 0:
return False
return s3_storage.upload_job_file("", kind, p)
except Exception as e:
print(f"⚠️ S3 mirror failed for {local_path}: {e}", flush=True)
return False
def _ensure_local(local_path, kind: str) -> bool:
"""Make sure file is on disk; if missing, fetch from S3.
Returns True if file is ready locally after the call.
kind: 'upload' or 'output'
"""
p = Path(local_path)
if p.exists() and p.stat().st_size > 0:
return True
if not s3_storage.is_enabled():
return False
folder = {"upload": "uploads", "output": "outputs", "job_meta": "jobs"}.get(kind, kind)
key = f"{folder}/{p.name}"
print(f"📥 Local missing, fetching from S3: {key}", flush=True)
return s3_storage.download(key, p)
def _delete_from_s3(filename: str, kind: str) -> bool:
"""Delete object from S3 (mirror local delete). Best-effort, no raise."""
try:
if not s3_storage.is_enabled():
return False
folder = {"upload": "uploads", "output": "outputs", "job_meta": "jobs"}.get(kind, kind)
return s3_storage.delete(f"{folder}/{filename}")
except Exception:
return False
def _ffmpeg_then_persist(cmd, out_path, kind: str = "output", timeout: int = 600):
"""Run ffmpeg in background thread, then mirror result to S3.
Drop-in replacement for subprocess.Popen() when we want S3 sync after.
"""
def runner():
try:
subprocess.run(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=timeout,
)
_persist_to_s3(out_path, kind)
except Exception as e:
print(f"⚠️ ffmpeg/persist failed for {out_path}: {e}", flush=True)
threading.Thread(target=runner, daemon=True).start()
# Dedup DB — sledi že obdelanim/naloženim komadom
DEDUP_DB = DATA_DIR / "processed.db"
@ -346,7 +416,9 @@ def load_job(job_id):
def save_job(job):
job_path(job["id"]).write_text(json.dumps(job, ensure_ascii=False, indent=2))
p = job_path(job["id"])
p.write_text(json.dumps(job, ensure_ascii=False, indent=2))
_persist_to_s3(p, "job_meta")
def update_job(job_id, **kwargs):
@ -593,8 +665,8 @@ def _precache_edit_assets(job_id: str, src_path: str):
"-loglevel", "error",
str(low_path),
]
subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
print(f"📦 Pre-cache low-q source za {job_id} (background)", flush=True)
_ffmpeg_then_persist(cmd, low_path, kind="output")
print(f"📦 Pre-cache low-q source za {job_id} (background → S3)", flush=True)
# Waveform PNG (2400x72 — za zoom)
wave_path = OUTPUT_DIR / f"{job_id}_waveform_2400x72.png"
@ -611,8 +683,8 @@ def _precache_edit_assets(job_id: str, src_path: str):
"-loglevel", "error",
str(wave_path),
]
subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
print(f"📦 Pre-cache waveform za {job_id} (background)", flush=True)
_ffmpeg_then_persist(cmd, wave_path, kind="output")
print(f"📦 Pre-cache waveform za {job_id} (background → S3)", flush=True)
def process_job(job_id):
@ -633,6 +705,11 @@ def process_job(job_id):
if not run_subprocess_logged(cmd, job_id, "YouTube download"):
return
update_job(job_id, input_path=str(input_path))
# S3 mirror — original video + info.json
_persist_to_s3(input_path, "upload")
info_json = input_path.with_suffix(".info.json")
if info_json.exists():
_persist_to_s3(info_json, "upload")
# Probaj dobiti YT metadata (če še ni iz submit-a) — title, uploader, id, ...
# Single video submit ali playlist resolve že nastavi metadata, ampak
@ -926,6 +1003,12 @@ def process_job(job_id):
output_path=str(output_path),
output_size_mb=round(output_path.stat().st_size / 1024 / 1024, 2),
)
# S3 mirror — final reel + pomožne datoteke (analysis, subtitles)
_persist_to_s3(output_path, "output")
for suffix in (".analysis.json", ".subtitles.srt", ".subtitles.ass"):
aux = OUTPUT_DIR / f"{job_id}{suffix}"
if aux.exists():
_persist_to_s3(aux, "output")
# Telegram obvestilo
try:
from app.telegram import notify_job_done
@ -1375,6 +1458,8 @@ async def upload_video(
job["has_clean_name"] = bool(a and t)
save_job(job)
# S3 mirror — direct upload datoteka
_persist_to_s3(input_path, "upload")
return job
@ -1683,6 +1768,7 @@ async def download(job_id: str, user: str = Depends(check_auth)):
if not job or job.get("status") != "done":
raise HTTPException(404, "Ne pripravljen")
out = Path(job["output_path"])
_ensure_local(out, "output")
if not out.exists():
raise HTTPException(404, "Output ne obstaja")
@ -1703,6 +1789,7 @@ async def preview(job_id: str, request: Request, user: str = Depends(check_auth)
if not job or job.get("status") != "done":
raise HTTPException(404, "Ne pripravljen")
out = Path(job["output_path"])
_ensure_local(out, "output")
if not out.exists():
raise HTTPException(404, "Output ne obstaja")
@ -1762,11 +1849,41 @@ async def delete_job(job_id: str, user: str = Depends(check_auth)):
job = load_job(job_id)
if not job:
raise HTTPException(404, "Ne obstaja")
for key in ("input_path", "output_path"):
# Glavni input + output (po job records)
for key, kind in (("input_path", "upload"), ("output_path", "output")):
p = job.get(key)
if p and Path(p).exists():
Path(p).unlink(missing_ok=True)
job_path(job_id).unlink(missing_ok=True)
if p:
local_p = Path(p)
local_p.unlink(missing_ok=True)
_delete_from_s3(local_p.name, kind)
# Pomožne datoteke v outputs/ (analysis, subtitles, low-q, waveform)
for fname in (
f"{job_id}.mp4",
f"{job_id}.analysis.json",
f"{job_id}.subtitles.srt",
f"{job_id}.subtitles.ass",
f"{job_id}_source_low.mp4",
):
f = OUTPUT_DIR / fname
f.unlink(missing_ok=True)
_delete_from_s3(fname, "output")
# Waveform PNG-ji (več velikosti) — listanje ker imena niso fiksna
try:
for wf in OUTPUT_DIR.glob(f"{job_id}_waveform_*.png"):
wf_name = wf.name
wf.unlink(missing_ok=True)
_delete_from_s3(wf_name, "output")
except Exception:
pass
# YT info.json
info_json = UPLOAD_DIR / f"{job_id}_yt.info.json"
if info_json.exists():
info_json.unlink(missing_ok=True)
_delete_from_s3(f"{job_id}_yt.info.json", "upload")
# Job metadata
jp = job_path(job_id)
jp.unlink(missing_ok=True)
_delete_from_s3(f"{job_id}.json", "job_meta")
return {"deleted": job_id}
@ -1782,12 +1899,17 @@ async def source_video(job_id: str, quality: str = "high", user: str = Depends(c
if not job:
raise HTTPException(404, "Ne obstaja")
src = job.get("input_path")
if not src or not Path(src).exists():
if not src:
raise HTTPException(404, "Original video ne obstaja")
_ensure_local(src, "upload")
if not Path(src).exists():
raise HTTPException(404, "Original video ne obstaja")
if quality == "low":
# 480p cached za hitro scrubbanje
cache_path = OUTPUT_DIR / f"{job_id}_source_low.mp4"
# Najprej probaj fetch iz S3 (po cleanupu lahko manjka lokalno)
_ensure_local(cache_path, "output")
cache_valid = cache_path.exists() and cache_path.stat().st_size > 1024
if not cache_valid:
@ -1812,6 +1934,8 @@ async def source_video(job_id: str, quality: str = "high", user: str = Depends(c
if cache_path.exists():
cache_path.unlink()
raise HTTPException(500, f"FFmpeg failed: {(proc.stderr or 'unknown')[-300:]}")
# Mirror v S3 po regeneraciji
_persist_to_s3(cache_path, "output")
except subprocess.TimeoutExpired:
if cache_path.exists():
cache_path.unlink()
@ -1842,13 +1966,17 @@ async def waveform(job_id: str, width: int = 1200, height: int = 80, user: str =
if not job:
raise HTTPException(404, "Ne obstaja")
src = job.get("input_path")
if not src or not Path(src).exists():
if not src:
raise HTTPException(404, "Original video ne obstaja")
_ensure_local(src, "upload")
if not Path(src).exists():
raise HTTPException(404, "Original video ne obstaja")
width = max(400, min(width, 3000))
height = max(40, min(height, 200))
cache_path = OUTPUT_DIR / f"{job_id}_waveform_{width}x{height}.png"
_ensure_local(cache_path, "output")
cache_valid = cache_path.exists() and cache_path.stat().st_size > 100
if not cache_valid:
@ -1872,6 +2000,8 @@ async def waveform(job_id: str, width: int = 1200, height: int = 80, user: str =
if cache_path.exists():
cache_path.unlink()
raise HTTPException(500, f"Waveform render failed: {(proc.stderr or 'unknown')[-300:]}")
# Mirror v S3 po regeneraciji
_persist_to_s3(cache_path, "output")
except subprocess.TimeoutExpired:
if cache_path.exists():
cache_path.unlink()
@ -1903,7 +2033,10 @@ async def preview_clip(
if not job:
raise HTTPException(404, "Ne obstaja")
src = job.get("input_path")
if not src or not Path(src).exists():
if not src:
raise HTTPException(404, "Original video ne obstaja")
_ensure_local(src, "upload")
if not Path(src).exists():
raise HTTPException(404, "Original video ne obstaja")
if end <= start:
@ -1978,6 +2111,7 @@ async def get_transcript(job_id: str, user: str = Depends(check_auth)):
if not job:
raise HTTPException(404, "Ne obstaja")
analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json"
_ensure_local(analysis_path, "output")
if not analysis_path.exists():
raise HTTPException(404, "Analysis ne obstaja")
try:
@ -2217,7 +2351,10 @@ async def recut_job(job_id: str, payload: RecutRequest, user: str = Depends(chec
raise HTTPException(404, "Ne obstaja")
src = job.get("input_path")
if not src or not Path(src).exists():
if not src:
raise HTTPException(400, "Original video manjka")
_ensure_local(src, "upload")
if not Path(src).exists():
raise HTTPException(400, "Original video manjka")
if payload.end <= payload.start:
@ -2232,6 +2369,7 @@ async def recut_job(job_id: str, payload: RecutRequest, user: str = Depends(chec
# Naloži obstoječi analysis
analysis_path = OUTPUT_DIR / f"{job_id}.analysis.json"
_ensure_local(analysis_path, "output")
if not analysis_path.exists():
raise HTTPException(500, "Analysis manjka — re-uplad pesmi")