diff --git a/app/s3_storage.py b/app/s3_storage.py new file mode 100644 index 0000000..6a58218 --- /dev/null +++ b/app/s3_storage.py @@ -0,0 +1,296 @@ +""" +S3 storage abstraction for reels-app workfiles. + +Bucket layout: + s3://folxspeed/reels-app/ + uploads/{job_id}_yt.mp4 + uploads/{job_id}_yt.info.json + outputs/{job_id}.mp4 + outputs/{job_id}_source_low.mp4 + outputs/{job_id}.analysis.json + outputs/{job_id}.subtitles.srt + outputs/{job_id}.subtitles.ass + outputs/{job_id}_waveform_2400x72.png + jobs/{job_id}.json + +REELS/ prefix (final published reels for cron pickup) is NOT touched here. + +ENV (set via Coolify): + S3_ENDPOINT = https://fsn1.your-objectstorage.com + S3_BUCKET = folxspeed + S3_ACCESS_KEY = ... + S3_SECRET_KEY = ... + S3_PREFIX = reels-app/ (trailing slash required) + S3_REGION = fsn1 (optional, default fsn1) +""" +from __future__ import annotations + +import logging +import os +from pathlib import Path +from typing import Iterator, Optional + +import boto3 +from botocore.client import Config +from botocore.exceptions import ClientError + +log = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Config from environment +# --------------------------------------------------------------------------- +S3_ENDPOINT = os.getenv("S3_ENDPOINT", "https://fsn1.your-objectstorage.com") +S3_BUCKET = os.getenv("S3_BUCKET", "folxspeed") +S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY", "") +S3_SECRET_KEY = os.getenv("S3_SECRET_KEY", "") +S3_PREFIX = os.getenv("S3_PREFIX", "reels-app/") +S3_REGION = os.getenv("S3_REGION", "fsn1") + +# Make sure prefix ends with slash +if S3_PREFIX and not S3_PREFIX.endswith("/"): + S3_PREFIX += "/" + +# Feature flag — if creds missing, all calls become no-ops (safe fallback during rollout) +S3_ENABLED = bool(S3_ACCESS_KEY and S3_SECRET_KEY) + + +# --------------------------------------------------------------------------- +# Lazy client +# --------------------------------------------------------------------------- +_client = None + + +def get_client(): + """Return a cached boto3 S3 client, or None if S3 is not configured.""" + global _client + if not S3_ENABLED: + return None + if _client is None: + _client = boto3.client( + "s3", + endpoint_url=S3_ENDPOINT, + aws_access_key_id=S3_ACCESS_KEY, + aws_secret_access_key=S3_SECRET_KEY, + region_name=S3_REGION, + config=Config( + signature_version="s3v4", + retries={"max_attempts": 3, "mode": "standard"}, + connect_timeout=10, + read_timeout=120, + ), + ) + return _client + + +def _full_key(key: str) -> str: + """Prepend S3_PREFIX if not already present.""" + key = key.lstrip("/") + if S3_PREFIX and not key.startswith(S3_PREFIX): + return S3_PREFIX + key + return key + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- +def is_enabled() -> bool: + """True if S3 credentials are present and client is usable.""" + return S3_ENABLED + + +def upload(local_path: str | Path, key: str, content_type: Optional[str] = None) -> bool: + """ + Upload local file to s3://{bucket}/{prefix}{key}. + Returns True on success, False on failure (logs error, does not raise). + """ + client = get_client() + if client is None: + log.warning("S3 not configured, skipping upload of %s", key) + return False + + local = Path(local_path) + if not local.exists(): + log.error("S3 upload: local file does not exist: %s", local) + return False + + full_key = _full_key(key) + extra: dict = {} + if content_type: + extra["ContentType"] = content_type + + try: + client.upload_file(str(local), S3_BUCKET, full_key, ExtraArgs=extra or None) + log.info("S3 upload OK: %s -> s3://%s/%s (%d bytes)", + local.name, S3_BUCKET, full_key, local.stat().st_size) + return True + except ClientError as e: + log.error("S3 upload FAILED %s: %s", full_key, e) + return False + except Exception as e: + log.error("S3 upload UNEXPECTED ERROR %s: %s", full_key, e) + return False + + +def download(key: str, local_path: str | Path) -> bool: + """ + Download s3://{bucket}/{prefix}{key} to local_path. + Creates parent directories if needed. + Returns True on success. + """ + client = get_client() + if client is None: + log.warning("S3 not configured, cannot download %s", key) + return False + + local = Path(local_path) + local.parent.mkdir(parents=True, exist_ok=True) + full_key = _full_key(key) + + try: + client.download_file(S3_BUCKET, full_key, str(local)) + log.info("S3 download OK: s3://%s/%s -> %s", S3_BUCKET, full_key, local) + return True + except ClientError as e: + code = e.response.get("Error", {}).get("Code", "") + if code in ("NoSuchKey", "404"): + log.info("S3 download: key not found: %s", full_key) + else: + log.error("S3 download FAILED %s: %s", full_key, e) + return False + except Exception as e: + log.error("S3 download UNEXPECTED ERROR %s: %s", full_key, e) + return False + + +def exists(key: str) -> bool: + """Return True if s3://{bucket}/{prefix}{key} exists.""" + client = get_client() + if client is None: + return False + + full_key = _full_key(key) + try: + client.head_object(Bucket=S3_BUCKET, Key=full_key) + return True + except ClientError as e: + code = e.response.get("Error", {}).get("Code", "") + if code in ("404", "NoSuchKey", "NotFound"): + return False + log.error("S3 exists check FAILED %s: %s", full_key, e) + return False + + +def delete(key: str) -> bool: + """Delete a single object. Returns True if the call succeeded.""" + client = get_client() + if client is None: + return False + + full_key = _full_key(key) + try: + client.delete_object(Bucket=S3_BUCKET, Key=full_key) + log.info("S3 delete OK: %s", full_key) + return True + except ClientError as e: + log.error("S3 delete FAILED %s: %s", full_key, e) + return False + + +def presigned_url(key: str, expires_in: int = 3600, method: str = "get_object") -> Optional[str]: + """ + Generate a presigned URL for reading (default) or writing. + method: 'get_object' or 'put_object' + """ + client = get_client() + if client is None: + return None + + full_key = _full_key(key) + try: + url = client.generate_presigned_url( + method, + Params={"Bucket": S3_BUCKET, "Key": full_key}, + ExpiresIn=expires_in, + ) + return url + except ClientError as e: + log.error("S3 presigned URL FAILED %s: %s", full_key, e) + return None + + +def list_keys(prefix: str = "", max_keys: int = 1000) -> Iterator[str]: + """ + Iterator over keys under S3_PREFIX + prefix. + Returns keys WITHOUT S3_PREFIX (relative). + """ + client = get_client() + if client is None: + return + + full_prefix = _full_key(prefix) + paginator = client.get_paginator("list_objects_v2") + try: + for page in paginator.paginate( + Bucket=S3_BUCKET, Prefix=full_prefix, PaginationConfig={"PageSize": max_keys} + ): + for obj in page.get("Contents", []): + full_key = obj["Key"] + # Strip prefix to return relative key + if S3_PREFIX and full_key.startswith(S3_PREFIX): + yield full_key[len(S3_PREFIX):] + else: + yield full_key + except ClientError as e: + log.error("S3 list FAILED prefix=%s: %s", full_prefix, e) + + +def get_object_size(key: str) -> Optional[int]: + """Return size in bytes, or None if missing/error.""" + client = get_client() + if client is None: + return None + + full_key = _full_key(key) + try: + resp = client.head_object(Bucket=S3_BUCKET, Key=full_key) + return resp.get("ContentLength") + except ClientError: + return None + + +# --------------------------------------------------------------------------- +# Convenience helpers tailored to reels-app file layout +# --------------------------------------------------------------------------- +def upload_job_file(job_id: str, kind: str, local_path: str | Path) -> bool: + """ + Upload a workfile by kind: + 'upload' -> uploads/{filename} + 'output' -> outputs/{filename} + 'job_meta' -> jobs/{filename} + Filename comes from local_path.name. + """ + local = Path(local_path) + folder_map = {"upload": "uploads", "output": "outputs", "job_meta": "jobs"} + folder = folder_map.get(kind) + if folder is None: + log.error("upload_job_file: unknown kind %r", kind) + return False + return upload(local, f"{folder}/{local.name}") + + +def download_upload(filename: str, local_path: str | Path) -> bool: + """Download from uploads/ prefix.""" + return download(f"uploads/{filename}", local_path) + + +def download_output(filename: str, local_path: str | Path) -> bool: + """Download from outputs/ prefix.""" + return download(f"outputs/{filename}", local_path) + + +def upload_exists(filename: str) -> bool: + return exists(f"uploads/{filename}") + + +def output_exists(filename: str) -> bool: + return exists(f"outputs/{filename}") diff --git a/requirements.txt b/requirements.txt index 43aea6c..70ff12d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,3 +8,4 @@ numpy==1.26.4 yt-dlp>=2025.10.0 pyacrcloud==1.0.11 requests==2.32.3 +boto3==1.35.49