reels-app/app/qnet_match.py

248 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Qnet match modul — iz uploaded filename najde matching song v Qnet bazah.
Vrne (artist, title, station, confidence, matched_file).
Strategija:
1. **Exact filename match** (case-insensitive, with normalized punctuation)
— stem only, brez extension. Confidence: 1.0.
2. **Normalized basename match** — odstrani noise besede (Official, HD, 4K,
letnice, številke v oklepajih), normaliziraj presledke, primerjaj.
Confidence: 0.9.
3. **Artist+Title fuzzy match** — najprej parsa filename z obstoječim
parse_artist_title, potem išče v bazi po normalized (artist|title)
bigram-keywords. Confidence: 0.6-0.85 odvisno od ratio.
4. Če ni zadetka → None.
Vrne lookup result dict:
{
"matched": bool,
"method": "exact" | "exact_stem" | "normalized" | "fuzzy_at" | "fuzzy" | None,
"confidence": float, # 0.0-1.0
"artist": str,
"title": str,
"station": str,
"file": str,
}
"""
import json
import re
import os
import time
import unicodedata
from pathlib import Path
from typing import Optional
from difflib import SequenceMatcher
# Location of the JSON song lookup; overridable via the QNET_LOOKUP_PATH
# environment variable.
LOOKUP_PATH = Path(os.environ.get("QNET_LOOKUP_PATH", "/data/qnet/songs_lookup.json"))

# In-memory snapshot of the lookup file; it is rebuilt when the file's mtime is
# newer than the cached one. Every index read by the matcher is present from
# the start so a lookup can never hit a missing key.
_cache: dict = {
    "mtime": 0.0,
    "songs": [],
    "by_norm_file": {},
    "by_norm_stem": {},
    "by_norm_minus": {},
}
# Single-character folds applied after diacritics are stripped:
#  * dash-like separators become a plain hyphen (written as escapes so the
#    literals cannot be lost or confused with look-alike characters);
#  * letters that NFKD cannot split into base letter + combining mark are
#    mapped to their usual ASCII spelling instead of being dropped.
_FOLD_TABLE = str.maketrans({
    "\u2013": "-",   # en dash
    "\u2014": "-",   # em dash
    "|": "-",
    "\u0111": "d",   # d with stroke
    "\u0142": "l",   # l with stroke
    "\u00f8": "o",   # o with stroke
    "\u00df": "ss",  # sharp s
    "\u00e6": "ae",  # ae ligature
    "\u0153": "oe",  # oe ligature
})


def _norm(s: str) -> str:
    """Aggressively normalize *s* for comparison.

    Steps: NFKD-decompose and remove combining marks (strips diacritics),
    lowercase, fold the characters listed in ``_FOLD_TABLE``, replace every
    run of characters other than ``a-z``, ``0-9`` and whitespace with a
    space, then collapse whitespace and trim.

    Returns:
        The normalized string; ``""`` for empty or ``None`` input.
    """
    if not s:
        return ""
    decomposed = unicodedata.normalize("NFKD", s)
    stripped = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    folded = stripped.lower().translate(_FOLD_TABLE)
    # Anything that is not alphanumeric or whitespace acts as a word break.
    spaced = re.sub(r"[^a-z0-9\s]+", " ", folded)
    return re.sub(r"\s+", " ", spaced).strip()
# Tokens that commonly decorate upload / YouTube-style names ("Official
# Video", "HD", ...) and are ignored when comparing names.
_NOISE_WORDS = {
    # release labels
    "official", "officiel", "premiere", "premiera", "promo", "final", "tv",
    # media kind
    "video", "musikvideo", "musicvideo", "videoclip", "clip", "music",
    "audio", "lyric", "lyrics",
    # variant markers
    "live", "remix", "remaster", "remastered", "version", "ver",
    "extended", "edit",
    # picture quality
    "hd", "uhd", "4k", "8k", "720p", "1080p", "2160p",
}
def _norm_minus_noise(s: str) -> str:
    """Normalize *s* and drop tokens that do not help identify a song.

    The input is first passed through ``_norm``. A token is then discarded
    when it is listed in ``_NOISE_WORDS`` or when it consists only of digits
    and is at most four characters long (e.g. years or version numbers).

    Returns:
        The remaining tokens joined by single spaces; may be ``""``.
    """
    return " ".join(
        tok
        for tok in _norm(s).split()
        if tok not in _NOISE_WORDS and not (tok.isdigit() and len(tok) <= 4)
    )
def _load_db() -> bool:
    """Bring ``_cache`` in line with the lookup file on disk.

    The file is re-read only when its mtime is newer than the cached one.
    A reload that fails (file unreadable, invalid or half-written JSON,
    unexpected top-level type) leaves the previous snapshot untouched and is
    retried on the next call, because the cached mtime is not advanced.

    Returns:
        True when ``_cache`` holds a usable snapshot; False when the file
        cannot be stat'ed, or when loading failed and nothing was loaded
        before.
    """
    global _cache
    try:
        mtime = LOOKUP_PATH.stat().st_mtime
    except OSError:
        # Missing or inaccessible lookup file: report "no database".
        return False
    if mtime <= _cache["mtime"]:
        return True

    try:
        payload = json.loads(LOOKUP_PATH.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both JSON syntax errors and bad UTF-8.
        print(f"[qnet_match] cannot load {LOOKUP_PATH}: {exc}", flush=True)
        return _cache["mtime"] > 0
    if not isinstance(payload, list):
        print(f"[qnet_match] cannot load {LOOKUP_PATH}: top-level JSON value is not a list", flush=True)
        return _cache["mtime"] > 0

    # Entries that are not JSON objects cannot be queried with .get(); drop them.
    songs = [entry for entry in payload if isinstance(entry, dict)]

    by_norm_file: dict = {}   # normalized whole file name -> song
    by_norm_stem: dict = {}   # normalized stem (no extension) -> song
    by_norm_minus: dict = {}  # normalized stem without noise tokens -> song
    for song in songs:
        file_name = song.get("file")
        if not file_name or not isinstance(file_name, str):
            continue
        stem = Path(file_name).stem
        # setdefault: the first song seen for a key wins (i.e. the first
        # station that carries the file).
        by_norm_file.setdefault(_norm(file_name), song)
        by_norm_stem.setdefault(_norm(stem), song)
        minus = _norm_minus_noise(stem)
        if minus:
            by_norm_minus.setdefault(minus, song)

    _cache = {
        "mtime": mtime,
        "songs": songs,
        "by_norm_file": by_norm_file,
        "by_norm_stem": by_norm_stem,
        "by_norm_minus": by_norm_minus,
    }
    print(f"[qnet_match] loaded {len(songs)} songs from {LOOKUP_PATH} (mtime={mtime})", flush=True)
    return True
def _result(song: dict, method: str, confidence: float) -> dict:
    """Build the lookup result for a successful match.

    Args:
        song: Database entry that matched.
        method: Name of the strategy that produced the match.
        confidence: Match confidence; rounded to three decimals.

    Returns:
        Result dict with ``matched=True``. The text fields are always
        strings: a missing key or a null value yields ``""``.
    """
    return {
        "matched": True,
        "method": method,
        "confidence": round(confidence, 3),
        # "or" instead of a .get() default so that explicit nulls in the
        # lookup data do not leak out as None.
        "artist": song.get("artist") or "",
        "title": song.get("title") or "",
        "station": song.get("station") or "",
        "file": song.get("file") or "",
    }
def _no_match() -> dict:
    """Return the lookup result used when nothing matched.

    The dict has the same keys as a positive result, with ``matched=False``,
    ``method=None``, confidence ``0.0`` and empty text fields. A new dict is
    created on every call.
    """
    return {
        "matched": False,
        "method": None,
        "confidence": 0.0,
        "artist": "",
        "title": "",
        "station": "",
        "file": "",
    }
def match_filename(filename: str) -> dict:
    """Find the database song that matches an uploaded file name.

    Strategies are tried in order and the first hit wins:

    1. ``"exact"``: normalized full name (including extension), confidence 1.0.
    2. ``"exact_stem"``: normalized stem, confidence 0.95.
    3. ``"normalized"``: normalized stem without noise tokens, confidence 0.9.
    4. ``"fuzzy_at"``: the stem is split into artist/title and compared
       fuzzily; the confidence is set by ``_fuzzy_artist_title``.
    5. ``"fuzzy"``: best ``SequenceMatcher`` ratio against every noise-free
       stem; only for stems of at least 8 characters and ratios >= 0.85,
       confidence ``ratio * 0.85``.

    Args:
        filename: Uploaded file name.

    Returns:
        A result dict as built by ``_result``, or the ``_no_match`` dict when
        *filename* is empty, the database is unavailable or nothing matched.
    """
    if not filename:
        return _no_match()
    if not _load_db():
        return _no_match()  # database is not available

    stem = Path(filename).stem
    nf_full = _norm(filename)
    nf_stem = _norm(stem)
    nm_stem = _norm_minus_noise(stem)
    by_minus = _cache.get("by_norm_minus", {})

    # A stem that normalizes to "" (non-Latin or punctuation-only name)
    # carries no information; without this guard it would "exactly" match any
    # other such entry on the extension alone.
    if nf_stem:
        # 1) Exact normalized match including the extension.
        if nf_full in _cache["by_norm_file"]:
            return _result(_cache["by_norm_file"][nf_full], "exact", 1.0)
        # 2) Stem match (extension ignored).
        if nf_stem in _cache["by_norm_stem"]:
            return _result(_cache["by_norm_stem"][nf_stem], "exact_stem", 0.95)

    # 3) Stem match with noise tokens removed.
    if nm_stem and nm_stem in by_minus:
        return _result(by_minus[nm_stem], "normalized", 0.9)

    # 4) Fuzzy artist/title comparison, if the stem parses as "Artist - Title".
    parsed_artist, parsed_title = _parse_artist_title_simple(stem)
    if parsed_artist and parsed_title:
        candidate = _fuzzy_artist_title(parsed_artist, parsed_title)
        if candidate:
            return candidate

    # 5) Last resort: sequence ratio against every noise-free stem. Short
    #    stems are skipped to avoid random matches.
    if len(nm_stem) >= 8:
        cutoff = 0.85
        matcher = SequenceMatcher(None, nm_stem)
        best = None
        best_ratio = 0.0
        for cand_stem, song in by_minus.items():
            matcher.set_seq2(cand_stem)
            # Both quick ratios are upper bounds on ratio(), so a candidate
            # failing them can never reach the cutoff; skipping it does not
            # change the outcome.
            if matcher.real_quick_ratio() < cutoff or matcher.quick_ratio() < cutoff:
                continue
            ratio = matcher.ratio()
            if ratio > best_ratio:
                best_ratio = ratio
                best = song
        if best is not None and best_ratio >= cutoff:
            return _result(best, "fuzzy", best_ratio * 0.85)

    return _no_match()
def _parse_artist_title_simple(name: str):
    """Split *name* into ``(artist, title)`` at the first dash separator.

    Separators are tried from most to least distinctive: spaced en dash,
    em dash, ``" - "`` and ``"_-_"``. The dashes are written as escapes so the
    literals cannot degrade into a space or an empty string (``str.split("")``
    raises ``ValueError``). The split happens at the first occurrence of the
    separator and both halves are stripped.

    A split is accepted only when both halves are non-empty, the artist is at
    most 80 characters and the title at most 100; otherwise the next
    separator is tried.

    Returns:
        ``(artist, title)`` on success, ``(None, None)`` otherwise.
    """
    if not name:
        return (None, None)
    for sep in (" \u2013 ", "\u2014", " - ", "_-_"):
        head, found, tail = name.partition(sep)
        if not found:
            continue
        artist = head.strip()
        title = tail.strip()
        if artist and title and len(artist) <= 80 and len(title) <= 100:
            return (artist, title)
    return (None, None)
def _fuzzy_artist_title(artist: str, title: str) -> Optional[dict]:
    """Find the song whose artist and title are most similar to the given pair.

    Both sides are compared after ``_norm_minus_noise``. A song's score is
    the mean of the artist ratio and the title ratio (``SequenceMatcher``);
    songs with an empty normalized artist or title are skipped.

    Args:
        artist: Artist parsed from the uploaded name.
        title: Title parsed from the uploaded name.

    Returns:
        A ``"fuzzy_at"`` result with confidence ``score * 0.9`` for the
        best-scoring song when its score is at least 0.82, otherwise None.
    """
    na = _norm_minus_noise(artist)
    nt = _norm_minus_noise(title)
    if not na or not nt:
        return None

    threshold = 0.82
    best = None
    best_score = 0.0
    for song in _cache["songs"]:
        sa = _norm_minus_noise(song.get("artist", ""))
        if not sa:
            continue
        artist_ratio = SequenceMatcher(None, na, sa).ratio()
        # The title ratio is at most 1.0, so this is the highest score the
        # song could reach. If that cannot meet the threshold or beat the
        # current best, the title work is skipped; the outcome is unchanged.
        ceiling = (artist_ratio + 1.0) / 2
        if ceiling < threshold or ceiling <= best_score:
            continue
        st = _norm_minus_noise(song.get("title", ""))
        if not st:
            continue
        title_ratio = SequenceMatcher(None, nt, st).ratio()
        score = (artist_ratio + title_ratio) / 2
        if score > best_score:
            best_score = score
            best = song

    if best is not None and best_score >= threshold:
        return _result(best, "fuzzy_at", best_score * 0.9)
    return None
def db_stats() -> dict:
    """Return database statistics for a health check / admin endpoint.

    Triggers a (re)load attempt first.

    Returns:
        ``{"loaded": False, "total": 0, "stations": {}}`` when no songs are
        cached. Otherwise a dict with ``loaded=True``, the song count, a
        per-station song count (songs with a missing, null or empty station
        are counted under ``"?"``), the cached file mtime and
        ``age_seconds``, the time elapsed since that mtime.
    """
    _load_db()
    songs = _cache["songs"]
    if not songs:
        return {"loaded": False, "total": 0, "stations": {}}

    by_station: dict = {}
    for song in songs:
        # "or" rather than a .get() default: a null station must not become
        # a None key next to the string keys.
        station = song.get("station") or "?"
        by_station[station] = by_station.get(station, 0) + 1

    return {
        "loaded": True,
        "total": len(songs),
        "stations": by_station,
        "mtime": _cache["mtime"],
        "age_seconds": time.time() - _cache["mtime"],
    }