""" S3 storage abstraction for reels-app workfiles. Bucket layout: s3://folxspeed/reels-app/ uploads/{job_id}_yt.mp4 uploads/{job_id}_yt.info.json outputs/{job_id}.mp4 outputs/{job_id}_source_low.mp4 outputs/{job_id}.analysis.json outputs/{job_id}.subtitles.srt outputs/{job_id}.subtitles.ass outputs/{job_id}_waveform_2400x72.png jobs/{job_id}.json REELS/ prefix (final published reels for cron pickup) is NOT touched here. ENV (set via Coolify): S3_ENDPOINT = https://fsn1.your-objectstorage.com S3_BUCKET = folxspeed S3_ACCESS_KEY = ... S3_SECRET_KEY = ... S3_PREFIX = reels-app/ (trailing slash required) S3_REGION = fsn1 (optional, default fsn1) """ from __future__ import annotations import logging import os from pathlib import Path from typing import Iterator, Optional import boto3 from botocore.client import Config from botocore.exceptions import ClientError log = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Config from environment # --------------------------------------------------------------------------- S3_ENDPOINT = os.getenv("S3_ENDPOINT", "https://fsn1.your-objectstorage.com") S3_BUCKET = os.getenv("S3_BUCKET", "folxspeed") S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY", "") S3_SECRET_KEY = os.getenv("S3_SECRET_KEY", "") S3_PREFIX = os.getenv("S3_PREFIX", "reels-app/") S3_REGION = os.getenv("S3_REGION", "fsn1") # Make sure prefix ends with slash if S3_PREFIX and not S3_PREFIX.endswith("/"): S3_PREFIX += "/" # Feature flag — if creds missing, all calls become no-ops (safe fallback during rollout) S3_ENABLED = bool(S3_ACCESS_KEY and S3_SECRET_KEY) # --------------------------------------------------------------------------- # Lazy client # --------------------------------------------------------------------------- _client = None def get_client(): """Return a cached boto3 S3 client, or None if S3 is not configured.""" global _client if not S3_ENABLED: return None if _client is None: _client = boto3.client( "s3", endpoint_url=S3_ENDPOINT, aws_access_key_id=S3_ACCESS_KEY, aws_secret_access_key=S3_SECRET_KEY, region_name=S3_REGION, config=Config( signature_version="s3v4", retries={"max_attempts": 3, "mode": "standard"}, connect_timeout=10, read_timeout=120, ), ) return _client def _full_key(key: str) -> str: """Prepend S3_PREFIX if not already present.""" key = key.lstrip("/") if S3_PREFIX and not key.startswith(S3_PREFIX): return S3_PREFIX + key return key # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def is_enabled() -> bool: """True if S3 credentials are present and client is usable.""" return S3_ENABLED def upload(local_path: str | Path, key: str, content_type: Optional[str] = None) -> bool: """ Upload local file to s3://{bucket}/{prefix}{key}. Returns True on success, False on failure (logs error, does not raise). """ client = get_client() if client is None: log.warning("S3 not configured, skipping upload of %s", key) return False local = Path(local_path) if not local.exists(): log.error("S3 upload: local file does not exist: %s", local) return False full_key = _full_key(key) extra: dict = {} if content_type: extra["ContentType"] = content_type try: client.upload_file(str(local), S3_BUCKET, full_key, ExtraArgs=extra or None) log.info("S3 upload OK: %s -> s3://%s/%s (%d bytes)", local.name, S3_BUCKET, full_key, local.stat().st_size) return True except ClientError as e: log.error("S3 upload FAILED %s: %s", full_key, e) return False except Exception as e: log.error("S3 upload UNEXPECTED ERROR %s: %s", full_key, e) return False def download(key: str, local_path: str | Path) -> bool: """ Download s3://{bucket}/{prefix}{key} to local_path. Creates parent directories if needed. Returns True on success. """ client = get_client() if client is None: log.warning("S3 not configured, cannot download %s", key) return False local = Path(local_path) local.parent.mkdir(parents=True, exist_ok=True) full_key = _full_key(key) try: client.download_file(S3_BUCKET, full_key, str(local)) log.info("S3 download OK: s3://%s/%s -> %s", S3_BUCKET, full_key, local) return True except ClientError as e: code = e.response.get("Error", {}).get("Code", "") if code in ("NoSuchKey", "404"): log.info("S3 download: key not found: %s", full_key) else: log.error("S3 download FAILED %s: %s", full_key, e) return False except Exception as e: log.error("S3 download UNEXPECTED ERROR %s: %s", full_key, e) return False def exists(key: str) -> bool: """Return True if s3://{bucket}/{prefix}{key} exists.""" client = get_client() if client is None: return False full_key = _full_key(key) try: client.head_object(Bucket=S3_BUCKET, Key=full_key) return True except ClientError as e: code = e.response.get("Error", {}).get("Code", "") if code in ("404", "NoSuchKey", "NotFound"): return False log.error("S3 exists check FAILED %s: %s", full_key, e) return False def delete(key: str) -> bool: """Delete a single object. Returns True if the call succeeded.""" client = get_client() if client is None: return False full_key = _full_key(key) try: client.delete_object(Bucket=S3_BUCKET, Key=full_key) log.info("S3 delete OK: %s", full_key) return True except ClientError as e: log.error("S3 delete FAILED %s: %s", full_key, e) return False def presigned_url(key: str, expires_in: int = 3600, method: str = "get_object") -> Optional[str]: """ Generate a presigned URL for reading (default) or writing. method: 'get_object' or 'put_object' """ client = get_client() if client is None: return None full_key = _full_key(key) try: url = client.generate_presigned_url( method, Params={"Bucket": S3_BUCKET, "Key": full_key}, ExpiresIn=expires_in, ) return url except ClientError as e: log.error("S3 presigned URL FAILED %s: %s", full_key, e) return None def list_keys(prefix: str = "", max_keys: int = 1000) -> Iterator[str]: """ Iterator over keys under S3_PREFIX + prefix. Returns keys WITHOUT S3_PREFIX (relative). """ client = get_client() if client is None: return full_prefix = _full_key(prefix) paginator = client.get_paginator("list_objects_v2") try: for page in paginator.paginate( Bucket=S3_BUCKET, Prefix=full_prefix, PaginationConfig={"PageSize": max_keys} ): for obj in page.get("Contents", []): full_key = obj["Key"] # Strip prefix to return relative key if S3_PREFIX and full_key.startswith(S3_PREFIX): yield full_key[len(S3_PREFIX):] else: yield full_key except ClientError as e: log.error("S3 list FAILED prefix=%s: %s", full_prefix, e) def get_object_size(key: str) -> Optional[int]: """Return size in bytes, or None if missing/error.""" client = get_client() if client is None: return None full_key = _full_key(key) try: resp = client.head_object(Bucket=S3_BUCKET, Key=full_key) return resp.get("ContentLength") except ClientError: return None # --------------------------------------------------------------------------- # Convenience helpers tailored to reels-app file layout # --------------------------------------------------------------------------- def upload_job_file(job_id: str, kind: str, local_path: str | Path) -> bool: """ Upload a workfile by kind: 'upload' -> uploads/{filename} 'output' -> outputs/{filename} 'job_meta' -> jobs/{filename} Filename comes from local_path.name. """ local = Path(local_path) folder_map = {"upload": "uploads", "output": "outputs", "job_meta": "jobs"} folder = folder_map.get(kind) if folder is None: log.error("upload_job_file: unknown kind %r", kind) return False return upload(local, f"{folder}/{local.name}") def download_upload(filename: str, local_path: str | Path) -> bool: """Download from uploads/ prefix.""" return download(f"uploads/{filename}", local_path) def download_output(filename: str, local_path: str | Path) -> bool: """Download from outputs/ prefix.""" return download(f"outputs/{filename}", local_path) def upload_exists(filename: str) -> bool: return exists(f"uploads/{filename}") def output_exists(filename: str) -> bool: return exists(f"outputs/{filename}")