S3 storage module: boto3 abstraction for reels-app workfiles (uploads/outputs/jobs prefixes)
This commit is contained in:
parent
48bf0cf050
commit
ec1d109e3b
296
app/s3_storage.py
Normal file
296
app/s3_storage.py
Normal file
@ -0,0 +1,296 @@
|
||||
"""
|
||||
S3 storage abstraction for reels-app workfiles.
|
||||
|
||||
Bucket layout:
|
||||
s3://folxspeed/reels-app/
|
||||
uploads/{job_id}_yt.mp4
|
||||
uploads/{job_id}_yt.info.json
|
||||
outputs/{job_id}.mp4
|
||||
outputs/{job_id}_source_low.mp4
|
||||
outputs/{job_id}.analysis.json
|
||||
outputs/{job_id}.subtitles.srt
|
||||
outputs/{job_id}.subtitles.ass
|
||||
outputs/{job_id}_waveform_2400x72.png
|
||||
jobs/{job_id}.json
|
||||
|
||||
REELS/ prefix (final published reels for cron pickup) is NOT touched here.
|
||||
|
||||
ENV (set via Coolify):
|
||||
S3_ENDPOINT = https://fsn1.your-objectstorage.com
|
||||
S3_BUCKET = folxspeed
|
||||
S3_ACCESS_KEY = ...
|
||||
S3_SECRET_KEY = ...
|
||||
S3_PREFIX = reels-app/ (trailing slash required)
|
||||
S3_REGION = fsn1 (optional, default fsn1)
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Iterator, Optional
|
||||
|
||||
import boto3
|
||||
from botocore.client import Config
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Config from environment
|
||||
# ---------------------------------------------------------------------------
|
||||
S3_ENDPOINT = os.getenv("S3_ENDPOINT", "https://fsn1.your-objectstorage.com")
|
||||
S3_BUCKET = os.getenv("S3_BUCKET", "folxspeed")
|
||||
S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY", "")
|
||||
S3_SECRET_KEY = os.getenv("S3_SECRET_KEY", "")
|
||||
S3_PREFIX = os.getenv("S3_PREFIX", "reels-app/")
|
||||
S3_REGION = os.getenv("S3_REGION", "fsn1")
|
||||
|
||||
# Make sure prefix ends with slash
|
||||
if S3_PREFIX and not S3_PREFIX.endswith("/"):
|
||||
S3_PREFIX += "/"
|
||||
|
||||
# Feature flag — if creds missing, all calls become no-ops (safe fallback during rollout)
|
||||
S3_ENABLED = bool(S3_ACCESS_KEY and S3_SECRET_KEY)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Lazy client
|
||||
# ---------------------------------------------------------------------------
|
||||
_client = None
|
||||
|
||||
|
||||
def get_client():
|
||||
"""Return a cached boto3 S3 client, or None if S3 is not configured."""
|
||||
global _client
|
||||
if not S3_ENABLED:
|
||||
return None
|
||||
if _client is None:
|
||||
_client = boto3.client(
|
||||
"s3",
|
||||
endpoint_url=S3_ENDPOINT,
|
||||
aws_access_key_id=S3_ACCESS_KEY,
|
||||
aws_secret_access_key=S3_SECRET_KEY,
|
||||
region_name=S3_REGION,
|
||||
config=Config(
|
||||
signature_version="s3v4",
|
||||
retries={"max_attempts": 3, "mode": "standard"},
|
||||
connect_timeout=10,
|
||||
read_timeout=120,
|
||||
),
|
||||
)
|
||||
return _client
|
||||
|
||||
|
||||
def _full_key(key: str) -> str:
|
||||
"""Prepend S3_PREFIX if not already present."""
|
||||
key = key.lstrip("/")
|
||||
if S3_PREFIX and not key.startswith(S3_PREFIX):
|
||||
return S3_PREFIX + key
|
||||
return key
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
def is_enabled() -> bool:
|
||||
"""True if S3 credentials are present and client is usable."""
|
||||
return S3_ENABLED
|
||||
|
||||
|
||||
def upload(local_path: str | Path, key: str, content_type: Optional[str] = None) -> bool:
|
||||
"""
|
||||
Upload local file to s3://{bucket}/{prefix}{key}.
|
||||
Returns True on success, False on failure (logs error, does not raise).
|
||||
"""
|
||||
client = get_client()
|
||||
if client is None:
|
||||
log.warning("S3 not configured, skipping upload of %s", key)
|
||||
return False
|
||||
|
||||
local = Path(local_path)
|
||||
if not local.exists():
|
||||
log.error("S3 upload: local file does not exist: %s", local)
|
||||
return False
|
||||
|
||||
full_key = _full_key(key)
|
||||
extra: dict = {}
|
||||
if content_type:
|
||||
extra["ContentType"] = content_type
|
||||
|
||||
try:
|
||||
client.upload_file(str(local), S3_BUCKET, full_key, ExtraArgs=extra or None)
|
||||
log.info("S3 upload OK: %s -> s3://%s/%s (%d bytes)",
|
||||
local.name, S3_BUCKET, full_key, local.stat().st_size)
|
||||
return True
|
||||
except ClientError as e:
|
||||
log.error("S3 upload FAILED %s: %s", full_key, e)
|
||||
return False
|
||||
except Exception as e:
|
||||
log.error("S3 upload UNEXPECTED ERROR %s: %s", full_key, e)
|
||||
return False
|
||||
|
||||
|
||||
def download(key: str, local_path: str | Path) -> bool:
|
||||
"""
|
||||
Download s3://{bucket}/{prefix}{key} to local_path.
|
||||
Creates parent directories if needed.
|
||||
Returns True on success.
|
||||
"""
|
||||
client = get_client()
|
||||
if client is None:
|
||||
log.warning("S3 not configured, cannot download %s", key)
|
||||
return False
|
||||
|
||||
local = Path(local_path)
|
||||
local.parent.mkdir(parents=True, exist_ok=True)
|
||||
full_key = _full_key(key)
|
||||
|
||||
try:
|
||||
client.download_file(S3_BUCKET, full_key, str(local))
|
||||
log.info("S3 download OK: s3://%s/%s -> %s", S3_BUCKET, full_key, local)
|
||||
return True
|
||||
except ClientError as e:
|
||||
code = e.response.get("Error", {}).get("Code", "")
|
||||
if code in ("NoSuchKey", "404"):
|
||||
log.info("S3 download: key not found: %s", full_key)
|
||||
else:
|
||||
log.error("S3 download FAILED %s: %s", full_key, e)
|
||||
return False
|
||||
except Exception as e:
|
||||
log.error("S3 download UNEXPECTED ERROR %s: %s", full_key, e)
|
||||
return False
|
||||
|
||||
|
||||
def exists(key: str) -> bool:
|
||||
"""Return True if s3://{bucket}/{prefix}{key} exists."""
|
||||
client = get_client()
|
||||
if client is None:
|
||||
return False
|
||||
|
||||
full_key = _full_key(key)
|
||||
try:
|
||||
client.head_object(Bucket=S3_BUCKET, Key=full_key)
|
||||
return True
|
||||
except ClientError as e:
|
||||
code = e.response.get("Error", {}).get("Code", "")
|
||||
if code in ("404", "NoSuchKey", "NotFound"):
|
||||
return False
|
||||
log.error("S3 exists check FAILED %s: %s", full_key, e)
|
||||
return False
|
||||
|
||||
|
||||
def delete(key: str) -> bool:
|
||||
"""Delete a single object. Returns True if the call succeeded."""
|
||||
client = get_client()
|
||||
if client is None:
|
||||
return False
|
||||
|
||||
full_key = _full_key(key)
|
||||
try:
|
||||
client.delete_object(Bucket=S3_BUCKET, Key=full_key)
|
||||
log.info("S3 delete OK: %s", full_key)
|
||||
return True
|
||||
except ClientError as e:
|
||||
log.error("S3 delete FAILED %s: %s", full_key, e)
|
||||
return False
|
||||
|
||||
|
||||
def presigned_url(key: str, expires_in: int = 3600, method: str = "get_object") -> Optional[str]:
|
||||
"""
|
||||
Generate a presigned URL for reading (default) or writing.
|
||||
method: 'get_object' or 'put_object'
|
||||
"""
|
||||
client = get_client()
|
||||
if client is None:
|
||||
return None
|
||||
|
||||
full_key = _full_key(key)
|
||||
try:
|
||||
url = client.generate_presigned_url(
|
||||
method,
|
||||
Params={"Bucket": S3_BUCKET, "Key": full_key},
|
||||
ExpiresIn=expires_in,
|
||||
)
|
||||
return url
|
||||
except ClientError as e:
|
||||
log.error("S3 presigned URL FAILED %s: %s", full_key, e)
|
||||
return None
|
||||
|
||||
|
||||
def list_keys(prefix: str = "", max_keys: int = 1000) -> Iterator[str]:
|
||||
"""
|
||||
Iterator over keys under S3_PREFIX + prefix.
|
||||
Returns keys WITHOUT S3_PREFIX (relative).
|
||||
"""
|
||||
client = get_client()
|
||||
if client is None:
|
||||
return
|
||||
|
||||
full_prefix = _full_key(prefix)
|
||||
paginator = client.get_paginator("list_objects_v2")
|
||||
try:
|
||||
for page in paginator.paginate(
|
||||
Bucket=S3_BUCKET, Prefix=full_prefix, PaginationConfig={"PageSize": max_keys}
|
||||
):
|
||||
for obj in page.get("Contents", []):
|
||||
full_key = obj["Key"]
|
||||
# Strip prefix to return relative key
|
||||
if S3_PREFIX and full_key.startswith(S3_PREFIX):
|
||||
yield full_key[len(S3_PREFIX):]
|
||||
else:
|
||||
yield full_key
|
||||
except ClientError as e:
|
||||
log.error("S3 list FAILED prefix=%s: %s", full_prefix, e)
|
||||
|
||||
|
||||
def get_object_size(key: str) -> Optional[int]:
|
||||
"""Return size in bytes, or None if missing/error."""
|
||||
client = get_client()
|
||||
if client is None:
|
||||
return None
|
||||
|
||||
full_key = _full_key(key)
|
||||
try:
|
||||
resp = client.head_object(Bucket=S3_BUCKET, Key=full_key)
|
||||
return resp.get("ContentLength")
|
||||
except ClientError:
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Convenience helpers tailored to reels-app file layout
|
||||
# ---------------------------------------------------------------------------
|
||||
def upload_job_file(job_id: str, kind: str, local_path: str | Path) -> bool:
|
||||
"""
|
||||
Upload a workfile by kind:
|
||||
'upload' -> uploads/{filename}
|
||||
'output' -> outputs/{filename}
|
||||
'job_meta' -> jobs/{filename}
|
||||
Filename comes from local_path.name.
|
||||
"""
|
||||
local = Path(local_path)
|
||||
folder_map = {"upload": "uploads", "output": "outputs", "job_meta": "jobs"}
|
||||
folder = folder_map.get(kind)
|
||||
if folder is None:
|
||||
log.error("upload_job_file: unknown kind %r", kind)
|
||||
return False
|
||||
return upload(local, f"{folder}/{local.name}")
|
||||
|
||||
|
||||
def download_upload(filename: str, local_path: str | Path) -> bool:
|
||||
"""Download from uploads/ prefix."""
|
||||
return download(f"uploads/{filename}", local_path)
|
||||
|
||||
|
||||
def download_output(filename: str, local_path: str | Path) -> bool:
|
||||
"""Download from outputs/ prefix."""
|
||||
return download(f"outputs/{filename}", local_path)
|
||||
|
||||
|
||||
def upload_exists(filename: str) -> bool:
|
||||
return exists(f"uploads/{filename}")
|
||||
|
||||
|
||||
def output_exists(filename: str) -> bool:
|
||||
return exists(f"outputs/{filename}")
|
||||
@ -8,3 +8,4 @@ numpy==1.26.4
|
||||
yt-dlp>=2025.10.0
|
||||
pyacrcloud==1.0.11
|
||||
requests==2.32.3
|
||||
boto3==1.35.49
|
||||
|
||||
Loading…
Reference in New Issue
Block a user