Qnet song match — fetcha Songs.txt iz 5 MB playerjev (FOLX DE/SLO, ZWEI, ONE, ADRIA), 20K+ songs, fuzzy match na upload-u → clean parsed_artist/parsed_title + auto tv_station. /api/qnet/{stats,match,sync}

This commit is contained in:
OpenClaw Agent 2026-05-02 10:42:35 +00:00
parent 6f79aaea8d
commit b938d1e4d8
4 changed files with 513 additions and 9 deletions

View File

@ -30,7 +30,7 @@ COPY templates/ ./templates/
RUN mkdir -p ./static
# Data direktorij (Coolify bo prek Persistent Storage UI bind-al volume)
RUN mkdir -p /data/uploads /data/outputs /data/jobs /data/cookies
RUN mkdir -p /data/uploads /data/outputs /data/jobs /data/cookies /data/qnet
ENV DATA_DIR=/data
ENV PYTHONUNBUFFERED=1

View File

@ -43,11 +43,17 @@ DATA_DIR = Path(os.environ.get("DATA_DIR", "/data"))
UPLOAD_DIR = DATA_DIR / "uploads"
OUTPUT_DIR = DATA_DIR / "outputs"
JOBS_DIR = DATA_DIR / "jobs"
QNET_DIR = DATA_DIR / "qnet"
SCRIPTS_DIR = Path(__file__).parent.parent / "scripts"
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
JOBS_DIR.mkdir(parents=True, exist_ok=True)
QNET_DIR.mkdir(parents=True, exist_ok=True)
# Qnet song match — povezava z MB player bazami
os.environ.setdefault("QNET_LOOKUP_PATH", str(QNET_DIR / "songs_lookup.json"))
from app import qnet_match # noqa: E402
# Dedup DB — sledi že obdelanim/naloženim komadom
DEDUP_DB = DATA_DIR / "processed.db"
@ -1280,14 +1286,32 @@ async def upload_video(
if batch_id:
job["batch_id"] = batch_id
# Artist + title — najprej user-provided, potem parse iz filename
# Artist + title — najprej user-provided, potem Qnet match, potem parse iz filename
if artist and title:
# User je vpisal ali potrdil
job["parsed_artist"] = artist.strip()
job["parsed_title"] = title.strip()
job["has_clean_name"] = True
else:
# Auto parse iz filename
# 1) Probaj match proti Qnet bazi (clean Artist+Title direct iz playlistov)
qm = qnet_match.match_filename(file.filename)
if qm["matched"] and qm["confidence"] >= 0.85:
job["parsed_artist"] = qm["artist"]
job["parsed_title"] = qm["title"]
job["has_clean_name"] = True
# Če station ni že nastavljen ročno, vzemi iz Qnet match-a
if not job.get("tv_station"):
job["tv_station"] = qm["station"]
job["qnet_match"] = {
"method": qm["method"],
"confidence": qm["confidence"],
"matched_file": qm["file"],
"matched_station": qm["station"],
}
print(f"🎯 Qnet match [{qm['method']}, {qm['confidence']}]: "
f"{qm['station']}{qm['artist']}{qm['title']}", flush=True)
else:
# 2) Fallback: filename parser (regex-based)
a, t = parse_artist_title(file.filename)
if a:
job["parsed_artist"] = a
@ -1733,6 +1757,53 @@ async def get_transcript(job_id: str, user: str = Depends(check_auth)):
raise HTTPException(500, f"Napaka pri branju: {e}")
# ─── Qnet song match (MB player baza) ────────────────────────────
@app.get("/api/qnet/stats")
async def qnet_stats(user: str = Depends(check_auth)):
"""Statistika Qnet baze (koliko songov, koliko star, po postajah)."""
return qnet_match.db_stats()
@app.get("/api/qnet/match")
async def qnet_match_filename(filename: str, user: str = Depends(check_auth)):
"""Test endpoint — vrne match result za poljuben filename."""
if not filename:
raise HTTPException(400, "filename query param required")
return qnet_match.match_filename(filename)
@app.post("/api/qnet/sync")
async def qnet_sync(background: BackgroundTasks, user: str = Depends(check_auth)):
"""Sproži sync (fetch Songs.txt iz vseh playerjev). Async background task."""
sync_script = SCRIPTS_DIR / "sync_qnet.py"
if not sync_script.exists():
raise HTTPException(500, f"sync_qnet.py ne obstaja v {SCRIPTS_DIR}")
def run_sync():
try:
import subprocess
env = os.environ.copy()
env["QNET_DB_PATH"] = str(QNET_DIR / "songs.json")
env["QNET_LOOKUP_PATH"] = str(QNET_DIR / "songs_lookup.json")
proc = subprocess.run(
["python3", str(sync_script)],
env=env,
capture_output=True,
text=True,
timeout=300,
)
print(f"[qnet sync] exit={proc.returncode}", flush=True)
if proc.stdout:
print(f"[qnet sync] stdout:\n{proc.stdout}", flush=True)
if proc.stderr:
print(f"[qnet sync] stderr:\n{proc.stderr}", flush=True)
except Exception as e:
print(f"[qnet sync] error: {e}", flush=True)
background.add_task(run_sync)
return {"started": True, "lookup_path": str(QNET_DIR / "songs_lookup.json")}
class RecutRequest(BaseModel):
start: float
end: float

247
app/qnet_match.py Normal file
View File

@ -0,0 +1,247 @@
"""
Qnet match modul iz uploaded filename najde matching song v Qnet bazah.
Vrne (artist, title, station, confidence, matched_file).
Strategija:
1. **Exact filename match** (case-insensitive, with normalized punctuation)
stem only, brez extension. Confidence: 1.0.
2. **Normalized basename match** odstrani noise besede (Official, HD, 4K,
letnice, številke v oklepajih), normaliziraj presledke, primerjaj.
Confidence: 0.9.
3. **Artist+Title fuzzy match** najprej parsa filename z obstoječim
parse_artist_title, potem išče v bazi po normalized (artist|title)
bigram-keywords. Confidence: 0.60.85 odvisno od ratio.
4. Če ni zadetka None.
Vrne lookup result dict:
{
"matched": bool,
"method": "exact" | "normalized" | "fuzzy" | None,
"confidence": float, # 0.01.0
"artist": str,
"title": str,
"station": str,
"file": str,
}
"""
import json
import re
import os
import time
import unicodedata
from pathlib import Path
from typing import Optional
from difflib import SequenceMatcher
LOOKUP_PATH = Path(os.environ.get("QNET_LOOKUP_PATH", "/data/qnet/songs_lookup.json"))
# In-memory cache — re-load if file is newer than cache
_cache: dict = {"mtime": 0.0, "songs": [], "by_norm_file": {}, "by_norm_stem": {}}
def _norm(s: str) -> str:
"""Aggressive normalize: lowercase, strip diacritics, kill punctuation."""
if not s:
return ""
s = unicodedata.normalize("NFKD", s)
s = "".join(c for c in s if not unicodedata.combining(c))
s = s.lower()
# Replace en-dash / em-dash / various separators with hyphen
s = s.replace("", "-").replace("", "-").replace("|", "-")
# Drop everything except alnum and space
s = re.sub(r"[^a-z0-9\s]+", " ", s)
s = re.sub(r"\s+", " ", s).strip()
return s
# Noise words pogosto v "Official Video" / YouTube ipd.
_NOISE_WORDS = {
"official", "officiel", "video", "musikvideo", "musicvideo", "music",
"lyric", "lyrics", "audio", "hd", "4k", "8k", "uhd", "live", "remix",
"remaster", "remastered", "version", "ver", "extended", "edit",
"videoclip", "clip", "premiere", "premiera", "tv", "final", "promo",
"1080p", "720p", "2160p",
}
def _norm_minus_noise(s: str) -> str:
"""Norm + remove noise words + drop standalone digits 2/3/4-digit (years/versions)."""
n = _norm(s)
toks = []
for t in n.split():
if t in _NOISE_WORDS:
continue
if t.isdigit() and len(t) <= 4:
continue
toks.append(t)
return " ".join(toks)
def _load_db():
"""Re-load lookup file if changed. Uses mtime check."""
global _cache
try:
mtime = LOOKUP_PATH.stat().st_mtime
except FileNotFoundError:
return False
if mtime <= _cache["mtime"]:
return True
songs = json.loads(LOOKUP_PATH.read_text(encoding="utf-8"))
by_norm_file = {} # normalized whole filename → song
by_norm_stem = {} # normalized stem (no ext) → song
by_norm_minus = {} # normalized minus noise → song
for s in songs:
f = s.get("file") or ""
if not f:
continue
nf = _norm(f)
ns = _norm(Path(f).stem)
nm = _norm_minus_noise(Path(f).stem)
# Prvi zadetek wins (po prvi station ki ima ta file — kasneje lahko
# menjamo na "all stations match")
by_norm_file.setdefault(nf, s)
by_norm_stem.setdefault(ns, s)
if nm:
by_norm_minus.setdefault(nm, s)
_cache = {
"mtime": mtime,
"songs": songs,
"by_norm_file": by_norm_file,
"by_norm_stem": by_norm_stem,
"by_norm_minus": by_norm_minus,
}
print(f"[qnet_match] loaded {len(songs)} songs from {LOOKUP_PATH} (mtime={mtime})", flush=True)
return True
def _result(song: dict, method: str, confidence: float) -> dict:
return {
"matched": True,
"method": method,
"confidence": round(confidence, 3),
"artist": song.get("artist", ""),
"title": song.get("title", ""),
"station": song.get("station", ""),
"file": song.get("file", ""),
}
def _no_match() -> dict:
return {
"matched": False,
"method": None,
"confidence": 0.0,
"artist": "",
"title": "",
"station": "",
"file": "",
}
def match_filename(filename: str) -> dict:
"""Najdi match za uploaded filename. Glavna API funkcija."""
if not filename:
return _no_match()
if not _load_db():
return _no_match() # baza ni naložena
stem = Path(filename).stem
nf_full = _norm(filename)
nf_stem = _norm(stem)
nm_stem = _norm_minus_noise(stem)
# 1) Exact normalized match z extensionom
if nf_full in _cache["by_norm_file"]:
return _result(_cache["by_norm_file"][nf_full], "exact", 1.0)
# 2) Stem match (brez extension)
if nf_stem in _cache["by_norm_stem"]:
return _result(_cache["by_norm_stem"][nf_stem], "exact_stem", 0.95)
# 3) Stem match brez noise
if nm_stem and nm_stem in _cache["by_norm_minus"]:
return _result(_cache["by_norm_minus"][nm_stem], "normalized", 0.9)
# 4) Fuzzy substring — če se input vsebuje v song.file ali obratno (samo če dovolj dolgo)
# Najprej probaj parse "Artist - Title" iz filename
parsed_a, parsed_t = _parse_artist_title_simple(stem)
if parsed_a and parsed_t:
cand = _fuzzy_artist_title(parsed_a, parsed_t)
if cand:
return cand
# 5) Last-resort fuzzy — sequence ratio na stem proti vsem stem-om (slow but bounded)
# Samo če stem >= 8 znakov, da ne dobimo random match-ev
if len(nm_stem) >= 8:
best = None
best_ratio = 0.0
for cand_stem, song in _cache["by_norm_minus"].items():
ratio = SequenceMatcher(None, nm_stem, cand_stem).ratio()
if ratio > best_ratio:
best_ratio = ratio
best = song
if best and best_ratio >= 0.85:
return _result(best, "fuzzy", best_ratio * 0.85)
return _no_match()
def _parse_artist_title_simple(name: str):
"""Lightweight parser za 'Artist - Title' / 'Artist Title'."""
if not name:
return (None, None)
# Probaj najprej en/em-dash (najbolj distinctive), potem hyphen
for sep in [" ", "", " - ", "_-_"]:
if sep in name:
parts = name.split(sep, 1)
if len(parts) == 2:
a = parts[0].strip()
t = parts[1].strip()
if a and t and len(a) <= 80 and len(t) <= 100:
return (a, t)
return (None, None)
def _fuzzy_artist_title(artist: str, title: str) -> Optional[dict]:
"""Najdi pesem v bazi po normalized artist+title fuzzy match."""
na = _norm_minus_noise(artist)
nt = _norm_minus_noise(title)
if not na or not nt:
return None
best = None
best_score = 0.0
for song in _cache["songs"]:
sa = _norm_minus_noise(song.get("artist", ""))
st = _norm_minus_noise(song.get("title", ""))
if not sa or not st:
continue
ar = SequenceMatcher(None, na, sa).ratio()
tr = SequenceMatcher(None, nt, st).ratio()
score = (ar + tr) / 2
if score > best_score:
best_score = score
best = song
if best and best_score >= 0.82:
return _result(best, "fuzzy_at", best_score * 0.9)
return None
def db_stats() -> dict:
"""Vrne statistiko baze za health check / admin endpoint."""
_load_db()
if not _cache["songs"]:
return {"loaded": False, "total": 0, "stations": {}}
by_station = {}
for s in _cache["songs"]:
st = s.get("station", "?")
by_station[st] = by_station.get(st, 0) + 1
return {
"loaded": True,
"total": len(_cache["songs"]),
"stations": by_station,
"mtime": _cache["mtime"],
"age_seconds": time.time() - _cache["mtime"],
}

186
scripts/sync_qnet.py Normal file
View File

@ -0,0 +1,186 @@
#!/usr/bin/env python3
"""
Qnet baz fetcher za reels-app.
Fetcha Songs.txt iz 5 Qnet instalacij na MB playerjih (preko ssh-api proxy-ja
na openclaw SSH na Windows playerje), pretvori iz Windows-1252 v UTF-8,
parsa TSV in shrani enotno JSON bazo v /data/qnet/songs.json.
Cron-friendly: poženi enkrat na uro.
Output struktura:
{
"synced_at": 1746198000.0,
"stations": {
"FOLX DE": {"count": 4038, "fetched_at": 1746198000.0},
...
},
"songs": [
{
"station": "FOLX DE",
"artist": "Sašo Avsenik und seine Oberkrainer",
"title": "Na Golici",
"file": "Sašo Avsenik und seine Oberkrainer - Na Golici.mp4",
"type": "DGL",
"length": "2:32.277",
"comments": "",
"last_played": "17/4/2026"
},
...
]
}
"""
import csv
import io
import json
import os
import sys
import time
import base64
import requests
from pathlib import Path
SSH_API = os.environ.get("PTC_SSH_API", "https://mail.folx.tv/ssh-api/v2")
SSH_TOKEN = os.environ.get("PTC_SSH_TOKEN") or "ptc-ssh-2026-a7b3c9d4e5f6012389abcdef01234567890abcdef01234567890abcdef012345"
OUT_PATH = Path(os.environ.get("QNET_DB_PATH", "/data/qnet/songs.json"))
OUT_PATH.parent.mkdir(parents=True, exist_ok=True)
# (station_label, player_ip, qnet_subdir_on_C)
STATIONS = [
("FOLX DE", "100.64.0.2", "qnet"),
("ZWEI", "100.64.0.2", "qnetzwei"),
("ONE", "100.64.0.3", "QnetONE"),
("ADRIA", "100.64.0.4", "Qnet"),
("FOLX SLO", "100.64.0.4", "QnetFOLXSLO"),
]
SSH_KEY = "/root/.ssh/players/folx_players"
def ssh_exec(cmd: str, timeout: int = 60) -> dict:
"""Pošlji ukaz preko ssh-api na openclaw."""
r = requests.post(
SSH_API,
headers={
"Authorization": f"Bearer {SSH_TOKEN}",
"Content-Type": "application/json",
},
json={"host": "openclaw", "cmd": cmd, "timeout": timeout},
timeout=timeout + 30,
)
r.raise_for_status()
return r.json()
def fetch_one(station: str, ip: str, subdir: str) -> str:
"""Fetcha Songs.txt z windows playerja, vrne UTF-8 string."""
# 1) scp z playerja na openclaw, iconv v utf8, base64 nazaj
cmd = (
f"set -e; "
f"TMP=$(mktemp); "
f"scp -i {SSH_KEY} -o StrictHostKeyChecking=no "
f'"folxadmin@{ip}:c:/{subdir}/Data/Songs.txt" "$TMP"; '
f'iconv -f WINDOWS-1252 -t UTF-8 "$TMP" | base64 -w 0; '
f'rm -f "$TMP"'
)
res = ssh_exec(cmd, timeout=90)
if res.get("exit_code") != 0:
raise RuntimeError(f"{station}: ssh-api error: {res}")
b64 = res.get("output", "").strip()
if not b64:
raise RuntimeError(f"{station}: empty response")
return base64.b64decode(b64).decode("utf-8", errors="replace")
def parse_songs_tsv(text: str, station: str) -> list[dict]:
"""Parse TSV → list of clean dicts. Drop incomplete rows."""
out = []
reader = csv.DictReader(io.StringIO(text), delimiter="\t")
for row in reader:
artist = (row.get("Artist") or "").strip()
title = (row.get("Title") or "").strip()
file_ = (row.get("File") or "").strip()
# Skip popolnoma prazne vrstice
if not (artist or title or file_):
continue
out.append({
"station": station,
"artist": artist,
"title": title,
"file": file_,
"type": (row.get("Type") or "").strip(),
"length": (row.get("Length") or "").strip(),
"comments": (row.get("Comments") or "").strip(),
"language": (row.get("Language") or "").strip(),
"genre": (row.get("Genre") or "").strip(),
"last_played": (row.get("Last date played") or "").strip(),
"display_artist": (row.get("Display artist") or "").strip(),
"display_title": (row.get("Display title") or "").strip(),
})
return out
def main():
t0 = time.time()
all_songs = []
stations_meta = {}
errors = []
for station, ip, subdir in STATIONS:
try:
print(f"{station} ({ip}:c:/{subdir}/Data/Songs.txt)", flush=True)
text = fetch_one(station, ip, subdir)
songs = parse_songs_tsv(text, station)
all_songs.extend(songs)
stations_meta[station] = {
"count": len(songs),
"fetched_at": time.time(),
"ok": True,
}
print(f"{len(songs)} songov", flush=True)
except Exception as e:
err = f"{station}: {type(e).__name__}: {e}"
print(f"{err}", flush=True)
errors.append(err)
stations_meta[station] = {"count": 0, "ok": False, "error": str(e)}
# Zapiši na disk (atomic preko temp + rename)
payload = {
"synced_at": time.time(),
"duration_seconds": round(time.time() - t0, 1),
"total_songs": len(all_songs),
"stations": stations_meta,
"errors": errors,
"songs": all_songs,
}
tmp = OUT_PATH.with_suffix(".json.tmp")
tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
tmp.replace(OUT_PATH)
# Tudi ločen "lookup index" — manjši fajl samo za matching
lookup = []
for s in all_songs:
if s["artist"] and s["title"]:
lookup.append({
"station": s["station"],
"artist": s["artist"],
"title": s["title"],
"file": s["file"],
})
lookup_path = OUT_PATH.parent / "songs_lookup.json"
tmp2 = lookup_path.with_suffix(".json.tmp")
tmp2.write_text(json.dumps(lookup, ensure_ascii=False), encoding="utf-8")
tmp2.replace(lookup_path)
print(f"\n✓ Done: {len(all_songs)} songov v {OUT_PATH} ({round(time.time()-t0,1)}s)")
if errors:
print(f"{len(errors)} napak:")
for e in errors:
print(f" - {e}")
sys.exit(1 if len(errors) == len(STATIONS) else 0)
if __name__ == "__main__":
main()