489 lines
18 KiB
Python
489 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
reframe.py — Pretvori 16:9 video v 9:16 (reels/shorts/tiktok format).
|
|
|
|
Modi:
|
|
--mode track : Pametno sledi obrazu/osebi (OpenCV Haar cascade face detection)
|
|
Crop okno se gladko premika za subjektom.
|
|
--mode center : Statični center crop (najhitrejše)
|
|
--mode blur : 9:16 platno z blur ozadjem + 16:9 video v sredini
|
|
|
|
Primer:
|
|
python3 reframe.py input.mp4 output.mp4 --mode track
|
|
python3 reframe.py input.mp4 output.mp4 --mode track --start 10 --duration 30
|
|
"""
|
|
import argparse
|
|
import subprocess
|
|
import sys
|
|
import os
|
|
import json
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
|
|
def get_video_info(path):
    """Probe a media file with ffprobe and return geometry and timing.

    ``width``/``height`` are square-pixel dimensions: when the stream's
    sample aspect ratio (SAR) is not 1:1, the width is multiplied by the SAR
    and rounded up to an even number.  For example, PAL DV 720x576 with
    SAR 64:45 is reported as 1024x576.  Sources with SAR 1:1 are unchanged.

    Args:
        path: Path of the media file (anything accepted by ``str()``).

    Returns:
        dict with keys ``width``, ``height`` (square-pixel), ``raw_width``,
        ``raw_height`` (as stored), ``sar_n``, ``sar_d``, ``fps`` (float,
        0.0 when ffprobe reports no usable frame rate) and ``duration``
        (seconds, float).

    Raises:
        subprocess.CalledProcessError: ffprobe exited with an error.
        ValueError: the file has no video stream or no duration is reported.
    """
    cmd = [
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_streams", "-show_format", str(path),
    ]
    data = json.loads(subprocess.check_output(cmd))

    # Use a default so a file without video raises a clear error instead of
    # leaking a bare StopIteration.
    vstream = next(
        (s for s in data.get("streams", []) if s.get("codec_type") == "video"),
        None,
    )
    if vstream is None:
        raise ValueError(f"No video stream found in {path}")

    # Frame rate is a fraction such as "30000/1001".  Some streams report
    # "0/0", so try avg_frame_rate next and fall back to 0.0 (unknown).
    fps = 0.0
    for key in ("r_frame_rate", "avg_frame_rate"):
        try:
            num, den = str(vstream.get(key, "")).split("/")
            candidate = float(num) / float(den)
        except (ValueError, ZeroDivisionError):
            continue
        if candidate > 0:
            fps = candidate
            break

    raw_w = int(vstream["width"])
    raw_h = int(vstream["height"])

    # SAR (sample aspect ratio) is the width:height ratio of a single pixel.
    sar = vstream.get("sample_aspect_ratio", "1:1")
    try:
        sar_n, sar_d = sar.split(":")
        sar_n, sar_d = int(sar_n), int(sar_d)
        if sar_n == 0 or sar_d == 0:
            # "0:1" means "unknown"; treat it as square pixels.
            sar_n, sar_d = 1, 1
    except (ValueError, AttributeError):
        sar_n, sar_d = 1, 1

    # Square-pixel dimensions: only the width changes for anamorphic sources.
    if sar_n != sar_d:
        sq_w = int(round(raw_w * sar_n / sar_d))
        # Round up to an even width (libx264 prefers even dimensions).
        sq_w = sq_w + (sq_w % 2)
        sq_h = raw_h
        print(f"📐 Anamorphic source: {raw_w}x{raw_h} SAR {sar_n}:{sar_d} → {sq_w}x{sq_h} (square pixel)", file=sys.stderr)
    else:
        sq_w = raw_w
        sq_h = raw_h

    # Prefer the container duration; fall back to the video stream's own.
    raw_duration = data.get("format", {}).get("duration")
    if raw_duration is None:
        raw_duration = vstream.get("duration")
    if raw_duration is None:
        raise ValueError(f"Could not determine duration of {path}")

    return {
        "width": sq_w,
        "height": sq_h,
        "raw_width": raw_w,
        "raw_height": raw_h,
        "sar_n": sar_n,
        "sar_d": sar_d,
        "fps": fps,
        "duration": float(raw_duration),
    }
|
|
|
|
|
|
def get_audio_streams(path):
    """List the audio streams of a media file as reported by ffprobe.

    MXF files often carry 4-8 audio streams, or one stream with 8 channels.

    Returns:
        A list with one dict per audio stream, holding the keys ``index``,
        ``channels``, ``codec``, ``sample_rate``, ``language`` and
        ``channel_layout``, e.g.
        ``[{'index': 1, 'channels': 2, 'codec': 'pcm_s24le', 'language': 'eng', ...}]``.
        Best effort: any failure (ffprobe missing, unreadable file,
        malformed output) yields an empty list.
    """
    probe_cmd = [
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_streams", "-select_streams", "a", str(path),
    ]
    try:
        probed = json.loads(subprocess.check_output(probe_cmd))
        return [
            {
                "index": entry.get("index"),
                "channels": int(entry.get("channels", 2)),
                "codec": entry.get("codec_name", ""),
                "sample_rate": int(entry.get("sample_rate", 48000)),
                "language": entry.get("tags", {}).get("language", ""),
                "channel_layout": entry.get("channel_layout", ""),
            }
            for entry in probed.get("streams", [])
        ]
    except Exception:
        # Deliberately best-effort: callers treat "no streams" as "no audio".
        return []
|
|
|
|
|
|
def build_audio_args(audio_streams):
    """Build the FFmpeg audio arguments: pick one stream and downmix to stereo.

    Strategy:
      - No streams at all: disable audio (``-an``).
      - Otherwise prefer the first 2-channel stream (usually the main mix
        when a file carries several streams, as MXF often does).
      - If no stream is stereo, use the first stream.
      - The chosen stream is always encoded as 2 channels @ 48 kHz,
        AAC 192k.

    Args:
        audio_streams: List of stream dicts as returned by
            ``get_audio_streams`` (each needs ``index`` and ``channels``).

    Returns:
        List of FFmpeg command-line arguments.
    """
    if not audio_streams:
        # No audio in the source: produce a silent output.
        return ["-an"]

    # First stereo stream wins; otherwise fall back to the very first stream.
    picked = next(
        (stream for stream in audio_streams if stream["channels"] == 2),
        audio_streams[0],
    )

    return [
        "-map", f"0:{picked['index']}",  # only the selected stream
        "-ac", "2",                      # downmix to stereo if multichannel
        "-ar", "48000",
        "-c:a", "aac",
        "-b:a", "192k",
    ]
|
|
|
|
|
|
def detect_face_centers(video_path, sample_fps=5):
    """Sample a video and locate the largest frontal face in sampled frames.

    Every ``step``-th frame is converted to grayscale and run through the
    OpenCV Haar cascade ``haarcascade_frontalface_alt2`` (shipped with
    OpenCV, no external model needed).

    Args:
        video_path: Path of the video to analyse.
        sample_fps: Approximate number of frames per second to analyse.
            Non-positive values analyse every frame.

    Returns:
        Tuple ``(samples, width, height, src_fps, total_frames)``.
        ``samples`` is a list of ``(timestamp_seconds, x_center)`` where
        ``x_center`` is the horizontal centre of the largest detected face
        normalised to 0..1 (0 = left edge, 1 = right edge), or ``None`` when
        no face was found in that frame.  The remaining items are the
        capture properties as reported by OpenCV.
    """
    cap = cv2.VideoCapture(str(video_path))
    try:
        src_fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # OpenCV reports 0 (or NaN) when the frame rate is unknown; dividing
        # by it would crash.  25 fps is an arbitrary stand-in used only to
        # derive timestamps — NOTE(review): confirm the preferred fallback.
        ts_fps = src_fps
        if not ts_fps > 0:
            ts_fps = 25.0
            print(f"⚠ Unknown source fps for {video_path}; assuming {ts_fps} fps for timestamps",
                  file=sys.stderr)

        if sample_fps > 0:
            step = max(1, int(ts_fps / sample_fps))
        else:
            step = 1

        cascade_path = cv2.data.haarcascades + "haarcascade_frontalface_alt2.xml"
        face_cascade = cv2.CascadeClassifier(cascade_path)

        samples = []
        frame_idx = 0
        while True:
            ok, frame = cap.read()
            if not ok:
                break
            if frame_idx % step == 0:
                ts = frame_idx / ts_fps
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                faces = face_cascade.detectMultiScale(
                    gray, scaleFactor=1.1, minNeighbors=3, minSize=(40, 40)
                )
                if len(faces) > 0:
                    # Keep only the largest face (by bounding-box area).
                    x, _y, w, _h = max(faces, key=lambda f: f[2] * f[3])
                    # Normalise by the decoded frame's own width: detections
                    # are in its pixel space and the capture property may
                    # be reported as 0.
                    frame_w = frame.shape[1]
                    samples.append((ts, float((x + w / 2) / frame_w)))
                else:
                    samples.append((ts, None))
            frame_idx += 1
    finally:
        # Release the decoder even if detection raises part-way through.
        cap.release()

    return samples, width, height, src_fps, total_frames
|
|
|
|
|
|
def smooth_track(samples, total_duration, smoothing_window=2.0):
    """Turn sparse face-centre samples into a smooth function ``x(t)``.

    Processing steps:
      - Missing detections (``None``) are replaced by the most recent known
        value; samples before the first detection use 0.5 (frame centre).
      - Each sample is replaced by the mean of all samples whose timestamp
        lies within ``smoothing_window / 2`` seconds of it (a centred
        moving average).

    Args:
        samples: Iterable of ``(timestamp_seconds, x_or_None)``; timestamps
            are expected in increasing order (``np.interp`` requires it).
        total_duration: Unused; kept for interface compatibility.
        smoothing_window: Width of the averaging window in seconds.

    Returns:
        A callable ``x_at(t) -> float`` that linearly interpolates between
        the smoothed samples and holds the first/last value outside the
        sampled range.  With no samples it always returns 0.5.
    """
    # Forward-fill the gaps left by frames without a detected face.
    last = 0.5
    filled = []
    for ts, x in samples:
        if x is None:
            x = last
        else:
            last = x
        filled.append((ts, x))

    if not filled:
        return lambda t: 0.5

    # Force float arrays: with integer inputs the smoothed means would
    # otherwise be truncated when stored in an integer array.
    timestamps = np.array([ts for ts, _ in filled], dtype=float)
    values = np.array([x for _, x in filled], dtype=float)

    # Centred moving average over the time window (not a fixed sample count).
    half_window = smoothing_window / 2
    smoothed = np.array([
        values[np.abs(timestamps - t) <= half_window].mean()
        for t in timestamps
    ])

    def x_at(t):
        # np.interp already clamps to the first/last value outside the range.
        return float(np.interp(t, timestamps, smoothed))

    return x_at
|
|
|
|
|
|
def build_track_filter(info, x_at, target_w, target_h, fps):
    """Build the FFmpeg ``-vf`` chain for track mode: scale, then a moving crop.

    The frame is scaled so that it covers the target, then cropped to
    ``target_w`` x ``target_h`` with a crop x-offset that follows ``x_at``.
    FFmpeg expressions cannot call arbitrary functions of time, so ``x_at``
    is sampled and encoded as a piecewise-linear function of ``t`` using
    nested ``if(lt(t,...),...)`` terms.

    Args:
        info: Dict with ``width``, ``height`` (square-pixel source size) and
            ``duration`` in seconds.
        x_at: Callable mapping a time in seconds to the normalised (0..1)
            horizontal centre of the subject.
        target_w: Output width in pixels.
        target_h: Output height in pixels.
        fps: Unused; kept for interface compatibility.

    Returns:
        Filter string of the form ``scale=W:H,crop=w:h:'<expr>':<y>``.
    """
    src_w = info["width"]
    src_h = info["height"]

    # Normal (landscape) case: match the target height, width proportional.
    scale_h = target_h
    scale_w = int(src_w * (target_h / src_h))
    if scale_w < target_w:
        # Source is narrower than the target aspect ratio (portrait or
        # near-9:16 input).  Cropping target_w out of a narrower frame makes
        # FFmpeg fail, so match the target width instead and crop vertically.
        scale_w = target_w
        scale_h = max(target_h, int(round(src_h * target_w / src_w)))

    max_x = scale_w - target_w            # largest valid left edge
    y_top = (scale_h - target_h) // 2     # 0 in the normal case

    duration = info["duration"]
    # At most 20 samples: more makes the crop expression too long for FFmpeg
    # (the original author observed a ~4096 character limit).
    n_samples = max(2, min(20, int(duration * 0.7)))
    times = np.linspace(0, duration, n_samples)

    # Convert each normalised centre into a clamped left edge in scaled pixels.
    x_lefts = [
        max(0.0, min(float(max_x), x_at(t) * scale_w - target_w / 2))
        for t in times
    ]

    # Build the nested expression from the last segment backwards:
    #   if(t < t_i, lerp(x_{i-1}, x_i), <expression for later segments>)
    expr = f"{x_lefts[-1]:.1f}"
    for i in range(len(times) - 1, 0, -1):
        t0, t1 = times[i - 1], times[i]
        x0, x1 = x_lefts[i - 1], x_lefts[i]
        if t1 - t0 < 1e-3:
            # A shorter step would be formatted as "0.000" below and divide
            # by zero inside FFmpeg; hold the value instead.
            lerp = f"{x0:.1f}"
        else:
            # lerp = x0 + (x1 - x0) * (t - t0) / (t1 - t0)
            lerp = f"({x0:.1f}+({x1 - x0:.1f})*(t-{t0:.3f})/{t1 - t0:.3f})"
        expr = f"if(lt(t,{t1:.3f}),{lerp},{expr})"

    return (
        f"scale={scale_w}:{scale_h},"
        f"crop={target_w}:{target_h}:'{expr}':{y_top}"
    )
|
|
|
|
|
|
def build_center_filter(info, target_w, target_h):
    """Build the FFmpeg ``-vf`` chain for a static centre crop.

    The frame is scaled so that it covers the target and the centre
    ``target_w`` x ``target_h`` region is cropped out.

    Args:
        info: Dict with ``width`` and ``height`` (square-pixel source size).
        target_w: Output width in pixels.
        target_h: Output height in pixels.

    Returns:
        Filter string of the form ``scale=W:H,crop=w:h:x:y``.
    """
    src_w = info["width"]
    src_h = info["height"]

    # Normal (landscape) case: match the target height, crop horizontally.
    scale_h = target_h
    scale_w = int(src_w * (target_h / src_h))
    y_expr = "0"
    if scale_w < target_w:
        # Source is narrower than the target aspect ratio: cropping target_w
        # out of a narrower frame makes FFmpeg fail, so match the target
        # width instead and centre the crop vertically.
        scale_w = target_w
        scale_h = max(target_h, int(round(src_h * target_w / src_w)))
        y_expr = f"(in_h-{target_h})/2"

    return (
        f"scale={scale_w}:{scale_h},"
        f"crop={target_w}:{target_h}:(in_w-{target_w})/2:{y_expr}"
    )
|
|
|
|
|
|
def build_blur_filter(info, target_w, target_h, anamorphic_prefix=""):
    """Build the FFmpeg ``-filter_complex`` graph for blur mode.

    The output canvas is ``target_w`` x ``target_h``.  The background is the
    source scaled to cover the canvas, cropped and Gaussian-blurred; the
    foreground is the source scaled to the canvas width and overlaid
    vertically centred.

    Args:
        info: Dict with ``width`` and ``height`` (square-pixel source size).
        target_w: Output width in pixels.
        target_h: Output height in pixels.
        anamorphic_prefix: Optional filter chain applied to both branches
            before scaling (e.g. a square-pixel correction).  A trailing
            comma is tolerated.

    Returns:
        Filter graph string reading from ``[0:v]``; its final output is
        unlabeled.
    """
    src_w = info["width"]
    src_h = info["height"]
    # Foreground keeps the source aspect ratio at full canvas width.
    fg_h = int(target_w * src_h / src_w)

    # Normalise the prefix so a trailing comma cannot yield an empty filter.
    prefix = anamorphic_prefix.rstrip(",")
    pre = f"{prefix}," if prefix else ""

    return (
        f"[0:v]{pre}scale={target_w}:{target_h}:force_original_aspect_ratio=increase,"
        f"crop={target_w}:{target_h},gblur=sigma=30[bg];"
        f"[0:v]{pre}scale={target_w}:{fg_h}[fg];"
        # Final setsar=1: scale rounding can leave a SAR slightly off 1:1,
        # and the overlay output inherits it from the background branch.
        f"[bg][fg]overlay=0:(H-h)/2,setsar=1"
    )
|
|
|
|
|
|
def main():
    """Command-line entry point: reframe a video to a vertical format.

    Steps:
      1. Parse arguments and probe the source's audio streams.
      2. If ``--start``/``--duration`` is given, trim into a temporary MP4;
         otherwise, if the audio is "complex" (several streams, more than
         two channels, or a broadcast container), pre-convert into a
         temporary MP4 with stereo audio.
      3. Build the video filter for the selected mode (track/center/blur).
      4. Render with libx264 + AAC and print a summary of the output.

    The temporary file, if any, is removed on success and on failure.
    Exits with status 1 when the input is missing or FFmpeg fails.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("input")
    ap.add_argument("output")
    ap.add_argument("--mode", choices=["track", "center", "blur"], default="track")
    ap.add_argument("--target-width", type=int, default=1080)
    ap.add_argument("--target-height", type=int, default=1920)
    ap.add_argument("--start", type=float, default=None, help="Začetek (s)")
    ap.add_argument("--duration", type=float, default=None, help="Trajanje (s)")
    ap.add_argument("--fade-in", type=float, default=0.0, help="Audio fade in (s)")
    ap.add_argument("--fade-out", type=float, default=0.0, help="Audio fade out (s)")
    ap.add_argument("--quality", default="medium", choices=["fast", "medium", "high"])
    args = ap.parse_args()

    src = Path(args.input)
    dst = Path(args.output)
    if not src.exists():
        print(f"❌ Vhod ne obstaja: {src}", file=sys.stderr)
        sys.exit(1)

    work_input = src
    tmp = None

    # Probe the audio layout of the original file.
    audio_streams = get_audio_streams(src)
    src_ext = src.suffix.lower()
    is_broadcast = src_ext in (".mxf", ".mpg", ".mpeg", ".ts", ".m2ts", ".mts")
    has_complex_audio = (
        len(audio_streams) > 1 or
        (audio_streams and audio_streams[0].get("channels", 2) > 2) or
        is_broadcast
    )

    if has_complex_audio:
        print(f"🎚 Broadcast format ({src_ext}) — {len(audio_streams)} audio stream(s):", file=sys.stderr)
        for s in audio_streams:
            print(f" #{s['index']}: {s['codec']} {s['channels']}ch "
                  f"{s.get('channel_layout', '')} lang={s.get('language', '?')}", file=sys.stderr)

    # Video settings shared by both intermediate transcodes.
    intermediate_video_args = [
        "-map", "0:v:0",  # first video stream only
        "-c:v", "libx264", "-preset", "veryfast", "-crf", "20",
        "-pix_fmt", "yuv420p",  # web/mobile compat (broadcast .mpg = yuv422p)
    ]

    try:
        if args.start is not None or args.duration is not None:
            # Trim into a temp file first so later steps only see the clip.
            tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
            tmp.close()
            cmd = ["ffmpeg", "-y"]
            if args.start is not None:
                cmd += ["-ss", str(args.start)]
            cmd += ["-i", str(src)]
            if args.duration is not None:
                cmd += ["-t", str(args.duration)]

            if has_complex_audio:
                # Broadcast/multichannel: transcode to a stereo MP4, no copy.
                cmd += intermediate_video_args
                cmd += build_audio_args(audio_streams)
            else:
                # MP4/MOV with ordinary audio: stream copy is fine and faster.
                cmd += ["-c", "copy"]
            cmd += [tmp.name]

            print(f"🔧 TRIM CMD: {' '.join(cmd)}", file=sys.stderr)
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                print(f"❌ TRIM FAILED: {result.stderr[-1000:]}", file=sys.stderr)
                sys.exit(1)
            work_input = Path(tmp.name)
            print(f"✂ Trim → {work_input}")
            verify = subprocess.run(
                ["ffprobe", "-v", "error", "-show_entries", "format=duration",
                 "-of", "default=nw=1:nokey=1", str(work_input)],
                capture_output=True, text=True
            )
            print(f"🔍 TRIMMED FILE DURATION: {verify.stdout.strip()}s (expected ~{args.duration}s)", file=sys.stderr)
        elif has_complex_audio:
            # No trim, but MXF/MPG etc.: still transcode to a stereo MP4.
            tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
            tmp.close()
            cmd = ["ffmpeg", "-y", "-i", str(src)]
            cmd += intermediate_video_args
            cmd += build_audio_args(audio_streams) + [tmp.name]
            print(f"🔧 PRE-CONVERT CMD: {' '.join(cmd)}", file=sys.stderr)
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                print(f"❌ PRE-CONVERT FAILED: {result.stderr[-1000:]}", file=sys.stderr)
                sys.exit(1)
            work_input = Path(tmp.name)
            print(f"🔄 Pre-converted → {work_input}")

        info = get_video_info(work_input)
        print(f"📹 Vhod: {info['width']}x{info['height']} @ {info['fps']:.2f}fps, {info['duration']:.1f}s")

        # Anamorphic correction: when SAR != 1:1, first scale to square
        # pixels and reset SAR so the mode filters see the displayed size.
        # Otherwise e.g. 720x576 SAR 64:45 (PAL DV, DAR 16:9) is stretched
        # wrongly, because the filter sees 720 instead of the displayed 1024.
        if info.get("sar_n", 1) != info.get("sar_d", 1):
            anamorphic_prefix = f"scale={info['width']}:{info['height']}:flags=lanczos,setsar=1,"
            print(f"🔧 Anamorphic prefix: {anamorphic_prefix.rstrip(',')}", file=sys.stderr)
        else:
            anamorphic_prefix = ""

        if args.mode == "track":
            print("🔍 Detektiram obraze (OpenCV)...")
            samples, _, _, _, _ = detect_face_centers(work_input, sample_fps=5)
            n_with_face = sum(1 for _, x in samples if x is not None)
            print(f" {n_with_face}/{len(samples)} vzorcev z obrazom")
            x_at = smooth_track(samples, info["duration"], smoothing_window=4.0)
            vfilter = anamorphic_prefix + build_track_filter(
                info, x_at, args.target_width, args.target_height, info["fps"])
        elif args.mode == "center":
            vfilter = anamorphic_prefix + build_center_filter(
                info, args.target_width, args.target_height)
        elif args.mode == "blur":
            # Blur builds a filter_complex graph reading from [0:v]; the
            # anamorphic prefix is applied inside each branch instead.
            vfilter = build_blur_filter(info, args.target_width, args.target_height,
                                        anamorphic_prefix=anamorphic_prefix.rstrip(","))

        # Final setsar=1 guarantees square pixels in the output; scale/crop
        # rounding can otherwise leave a SAR such as 10240:10239.
        if args.mode != "blur":
            vfilter = vfilter + ",setsar=1"

        preset = {"fast": "veryfast", "medium": "medium", "high": "slow"}[args.quality]
        crf = {"fast": "26", "medium": "21", "high": "18"}[args.quality]

        # Optional audio fades (afade).
        audio_filter = []
        if args.fade_in > 0:
            audio_filter.append(f"afade=t=in:st=0:d={args.fade_in}")
        if args.fade_out > 0:
            clip_dur = info["duration"]
            fade_start = max(0, clip_dur - args.fade_out)
            audio_filter.append(f"afade=t=out:st={fade_start}:d={args.fade_out}")
        audio_filter_str = ",".join(audio_filter) if audio_filter else None

        if args.mode == "blur":
            # filter_complex maps its unlabeled video output automatically.
            filter_args = ["-filter_complex", vfilter]
            map_args = ["-map", "0:a:0?"]  # first audio stream, if present
        else:
            filter_args = ["-vf", vfilter]
            map_args = ["-map", "0:v:0", "-map", "0:a:0?"]  # video + first audio

        cmd = [
            "ffmpeg", "-y", "-i", str(work_input),
            *filter_args,
            "-c:v", "libx264", "-preset", preset, "-crf", crf,
            "-pix_fmt", "yuv420p",  # web/mobile compat (Instagram/FB/web players)
            *map_args,
            "-ac", "2",  # force stereo
            "-c:a", "aac", "-b:a", "192k",
        ]
        if audio_filter_str:
            cmd += ["-af", audio_filter_str]
        cmd += ["-movflags", "+faststart", str(dst)]

        print(f"🎬 Render ({args.mode})...")
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            print("❌ FFmpeg napaka:", file=sys.stderr)
            print(result.stderr[-2000:], file=sys.stderr)
            sys.exit(1)
    finally:
        # Remove the intermediate file on every exit path (sys.exit raises
        # SystemExit, so failures pass through here too).
        if tmp:
            try:
                os.unlink(tmp.name)
            except OSError:
                pass

    out_info = get_video_info(dst)
    out_size = dst.stat().st_size / 1024 / 1024
    print(f"✅ {dst} — {out_info['width']}x{out_info['height']}, {out_size:.1f} MB")
|
|
|
|
|
|
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|