import mysql.connector
import re
import faiss
import csv
import os
import numpy as np
from sentence_transformers import SentenceTransformer, CrossEncoder
from datetime import datetime
import pytz
import unicodedata
from slugify import slugify
import spacy
import logging
from dataclasses import dataclass, field
from collections import defaultdict
from elasticsearch import Elasticsearch

# ============================================================
# LOGGING
# ============================================================
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S"
)
log = logging.getLogger(__name__)

# ============================================================
# CONFIG
# ============================================================
CACHE_DIR = "/var/www/vhosts/xdeal.gr/httpdocs/scripts/cache_v2"
CSV_DIR = "/var/www/vhosts/xdeal.gr/httpdocs/scripts/csv"
os.makedirs(CSV_DIR, exist_ok=True)

# NOTE(review): plaintext DB and ES credentials are committed in source —
# move them to environment variables or a secrets store.
DB_CONFIG = {
    "host": "localhost",
    "user": "xdeal_laravel_user",
    "password": "wke%gno1234567",
    "database": "xdeal_laravel",
    "unix_socket": "/var/run/mysqld/mysqld.sock",
}

# ── Elasticsearch connection ──────────────────────────────────
ES_CONFIG = {
    "hosts": "https://localhost:9200",
    "basic_auth": ("elastic", "JmhAAYVocNaehYYYqELb"),
    "verify_certs": False,   # self-signed localhost cert
    "ssl_show_warn": False,
}
ES_CAT_INDEX = "xdeal_categories"
ES_WEIGHT = 0.5      # weight of the ES score when combined with FAISS
FAISS_WEIGHT = 0.5   # weight of the FAISS score

PRODUCT_LIMIT = 100  # e.g. 200 for testing, None = process everything
BATCH_SIZE = 200
ENCODE_BATCH_SIZE = 64
FAISS_MIN = 0.45     # minimum FAISS similarity accepted in faiss_search()
TOP_K = 10
FEEDBACK_THRESHOLD = 6.0
HIGH_SCORE = 6.0
MED_SCORE = 2.5

# ── FIX 2: Dynamic threshold — replaces the fixed LOW_SCORE_FALLBACK ──────
# Computed per batch from the distribution of scores.
LOW_SCORE_FALLBACK = 3.5         # default fallback when not enough scores exist
DYNAMIC_THRESH_PERCENTILE = 20   # use the 20th percentile of the batch
DYNAMIC_THRESH_MIN = 2.8         # never drops below this
DYNAMIC_THRESH_MAX = 4.5         # never rises above this
DYNAMIC_THRESH_MIN_SAMPLES = 10  # need at least 10 scores for the dynamic calc

# ── UPGRADE 1: Multi-strategy voting ─────────────────────────
TOP_N_STRATEGIES = 3
STRATEGY_VOTE_W = [1.0, 0.6, 0.35]  # vote weight per candidate rank

# ── UPGRADE 2: Cross-Encoder (blend mode) ────────────────────
CROSS_ENCODER_MODEL = "cross-encoder/mmarco-mMiniLMv2-L12-H384-v1"
CROSS_ENCODER_TOP_N = 10
CROSS_ENCODER_WEIGHT = 0.45

# ── UPGRADE 3: Dynamic feedback thresholds ───────────────────
DYN_THRESH_FLOOR = 3.5
DYN_THRESH_CEIL = 7.0
DYN_THRESH_K = 1.5   # how many std-devs below the mean the threshold sits

# ── UPGRADE 4: Semantic Noun Ranker ──────────────────────────
SEMANTIC_MIN_SCORE = 0.50
# Only these FAISS indexes are consulted when ranking noun candidates.
SEMANTIC_INDEX_SUBSET = frozenset({
    "cat_products",
    "cat_core_keywords",
    "cat_name_singular",
    "cat_synonyms",
    "cat_name",
})
# Multiplier applied per extraction strategy when blending FAISS score
# with rule confidence (higher = more trusted strategy).
SEMANTIC_STRATEGY_BOOST: dict[str, float] = {
    "compound": 1.20,
    "skip_adj+compound": 1.15,
    "brand+compound": 1.10,
    "first_noun": 1.00,
    "skip_adj": 0.95,
    "brand_skip": 0.90,
    "last_before_trigger": 0.85,
    "spacy_root": 0.88,
    "cat_name": 0.80,
    "second_word": 0.75,
    "longest": 0.70,
    "last_resort": 0.40,
    "none": 0.10,
}

# Per-index weight applied to FAISS similarity in faiss_search().
INDEX_WEIGHTS = {
    "cat_name": 0.2,
    "cat_path": 1.5,
    "cat_products": 1.5,
    "cat_semantic": 1.0,
    "cat_name_singular": 1.1,
    "cat_core_keywords": 1.3,
    "cat_text": 0.8,
    "cat_synonyms": 1.0,
    "cat_synonyms_singular": 1.1,
}

# (index key, display label) pairs — key order drives faiss_search().
INDEXES = [
    ("cat_name", "NAME"),
    ("cat_path", "PATH"),
    ("cat_products", "PRODUCTS"),
    ("cat_semantic", "SEMANTIC"),
    ("cat_name_singular", "NAME_SING"),
    ("cat_core_keywords", "CORE"),
    ("cat_text", "TEXT"),
    ("cat_synonyms", "SYN"),
    ("cat_synonyms_singular", "SYN_SING"),
]

# ============================================================
# CONSTANTS
# ============================================================
# Greek words (accent-stripped) that must never be picked as the commercial noun.
NOUN_BLACKLIST = frozenset({
    "τυπου", "τυπος", "αποχρωση", "αποχρωσεις", "οψης",
    "παχος", "χρωμα", "σχεδιο", "ακτινες", "εκ",
    "βαση", "παταρι", "συρταρι", "συρταρια",
    "φυλλο", "φυλλη", "σκελετο", "υλικο",
    "γωνιακος", "αναμεικτικη", "αναμεικτικος",
})

# Word prefixes excluded from noun candidacy (mostly adjective stems).
EXCLUDED_PREFIXES = frozenset({
    "ηλεκτρικ", "επαγγ", "παλινδρομ", "επαγγελματ",
    "βενζινοκινητ", "πετρελαιοκινητ", "πνευματικ",
})

CONTEXT_GUARDS = {}  # currently unused placeholder

# Category-name segments considered junk ("misc", "other", ...).
CAT_NAME_JUNK = frozenset({
    "διαφορα", "λοιπα", "uncategorized", "other", "αλλα",
    "γενικα", "προιοντα", "products", "items", "γενικη",
    "κατηγορια", "misc", "general", "no category", "none",
    "εργαλεια", "επιπλα",
})

# Two-word color phrases removed from titles before tokenizing.
_COLOR_BIGRAMS = [
    "σαπιο μηλο", "ανοιχτο γκρι", "σκουρο γκρι",
    "σκουρο μπλε", "σκουρο καφε", "ανοιχτο μπλε", "ανοιχτο καφε",
]

# Color/material stems (Greek + English) stripped from titles.
_COLOR_TERMS = [
    "λευκ", "μαυρ", "κοκκιν", "μπλε", "πρασιν", "κιτριν", "γκρι",
    "καφε", "σκουρ", "ανοιχτ", "ανθρακ", "μοκα", "σαπι", "βελουδ",
    "χρυσ", "ασημ", "μπεζ", "ροζ", "λιλα", "τυρκουαζ", "φυστικ",
    "κεραμιδ", "πορτοκαλ", "μωβ", "εκρου", "μελ", "καραμελ",
    "σονομα", "οξια", "καρυδ", "δρυ", "φυσικ", "ξυλ", "χρωμα", "χρωματ",
    "beige", "black", "white", "brown", "grey", "gray", "green", "blue",
    "red", "gold", "silver", "natural", "oak", "walnut", "sonoma", "μηλο",
    "βεγκε", "wenge", "castillo", "beton",
]

# Longest-stem-first alternation so longer stems win over their prefixes.
_COLOR_PATTERN = re.compile(
    r'\b(?:' + '|'.join(re.escape(t) for t in sorted(_COLOR_TERMS, key=len, reverse=True)) + r')[α-ωa-z]*\b',
    re.IGNORECASE
)

# Separators found in shop-supplied category paths.
_CAT_NAME_SEP = re.compile(r'[>|/→\\,;]+')

# ============================================================
# NounResult dataclass
# ============================================================
@dataclass
class NounResult:
    """One candidate commercial noun extracted from a product title."""
    noun: str          # normalized single head noun
    compound: str      # noun or noun phrase used as the search query key
    strategy: str      # name of the extraction strategy that produced it
    confidence: float  # rule-based confidence in [0, 1]

# ── Compound pairs ────────────────────────────────────────────
# left stem → {right stem → canonical query}. If a title contains a word
# starting with the left stem followed (within 5 words) by one starting
# with the right stem, the canonical query replaces both.
COMPOUND_MAP: dict[str, dict[str, str]] = {
    "μπαταρι": {
        "νιπτηρ": "μπαταρια νιπτηρα",
        "μπανιου": "μπαταρια μπανιου",
        "ντους": "μπαταρια ντους",
        "θερμοσιφ": "μπαταρια θερμοσιφωνα",
        "κουζιν": "μπαταρια κουζινας",
        "νεροχυτ": "μπαταρια νεροχυτη",
        "μπιντε": "μπαταρια μπιντε",
    },
    "αναμεικτ": {
        "μπαταρ": "μπαταρια βρυση αναμεικτικη",
        "νεροχυτ": "μπαταρια βρυση αναμεικτικη",
    },
    "γωνιακ": {
        "τροχ": "γωνιακος τροχος",
    },
    "παλινδρομ": {
        "σεγ": "παλινδρομικη σεγα",
        "πριον": "παλινδρομικη σεγα",
    },
    "βενζιν": {
        "αλυσοπριον": "αλυσοπριονο βενζινης",
        "θαμνοκοπτ": "θαμνοκοπτικο βενζινης",
        "χλοοκοπτ": "χλοοκοπτικο βενζινης",
        "γεννητρ": "γεννητρια βενζινης",
        "φρεζ": "φρεζα βενζινης",
    },
    "ηλεκτρ": {
        "σκουπ": "ηλεκτρικη σκουπα",
        "πριον": "ηλεκτρικο πριονι",
        "τριπαν": "ηλεκτρικο τριπανο",
        "φουρν": "ηλεκτρικος φουρνος",
        "κλαδευτ": "ηλεκτρικος κλαδευτηρας",
    },
    "ηλεκτρογ": {
        "βενζιν": "γεννητρια βενζινης",
    },
    "κωπηλατ": {
        "νερ": "κωπηλατικη νερου",
        "water": "κωπηλατικη νερου",
    },
    "συνθεσ": {
        "κουζιν": "συνθεση κουζινας",
        "κουζ": "συνθεση κουζινας",
    },
    "συνθετ": {
        "κουζιν": "συνθεση κουζινας",
    },
}

# Greek adjective endings used as a cheap morphological adjective test.
_ADJ_ENDINGS = re.compile(r'^.+(?:ικο|ικη|ικος|ικα|ινο|ινη|ινος|εινο)$')

# Words that mark the start of a subordinate part of the title
# ("with", "for", articles, ...): the head noun comes before them.
SUBORDINATE_TRIGGERS = frozenset({
    "με", "για", "από", "σε", "και", "ή", "της", "του", "των", "τον", "την", "κ",
    "τυπου", "χρωμα", "υλικο", "αποχρωση",
    "νερου", "νερο",
})

STOP_WORDS = frozenset({
    "με", "για", "από", "σε", "και", "ή", "της", "του", "των", "τον", "την", "κ",
    "τυπου", "χρωμα", "υλικο", "αποχρωση", "ηλεκτρικ", "επαγγ",
    "παλινδρομ", "επαγγελματ", "ηλεκτρικο", "ηλεκτρικη", "ηλεκτρικος",
    "βενζινοκινητο", "βενζινοκινητη", "βενζινοκινητος",
    "πετρελαιοκινητο", "πνευματικ",
    "νερου", "νερο",
})

def _norm(text: str) -> str:
    """Strip accents, keep only Greek letters/whitespace, lowercase, collapse spaces."""
    if not text:
        return ""
    # NFD + drop combining marks removes Greek accents (tonos etc.).
    text = ''.join(c for c in unicodedata.normalize('NFD', text) if unicodedata.category(c) != 'Mn')
    return re.sub(r'\s+', ' ', re.sub(r"[^α-ω\s]", " ", text.lower())).strip()

def _norm_hint(text: str) -> str:
    """Like _norm() but also keeps Latin letters (shop hints may be bilingual)."""
    if not text:
        return ""
    text = ''.join(c for c in unicodedata.normalize('NFD', text) if unicodedata.category(c) != 'Mn')
    text = re.sub(r"[^α-ωa-z\s]", " ", text.lower())
    return re.sub(r'\s+', ' ', text).strip()

# Prefixes always accepted as nouns even if other filters would reject them.
NOUN_WHITELIST_PREFIXES = frozenset({
    "νεροχυτ",
    "νεροπριον",
    "νεροβαρ",
})

def _word_ok(w: str) -> bool:
    """Return True if *w* may serve as a noun candidate (whitelist wins)."""
    if any(w.startswith(wl) for wl in NOUN_WHITELIST_PREFIXES):
        return True
    return (len(w) >= 3
            and w not in NOUN_BLACKLIST
            and not any(w.startswith(ex) for ex in EXCLUDED_PREFIXES)
            and not _COLOR_PATTERN.fullmatch(w))

def _is_adj(w: str) -> bool:
    """Heuristic: does *w* look like a Greek adjective by its ending?"""
    return bool(_ADJ_ENDINGS.match(w))

def _find_compound(words: list[str]) -> str | None:
    """Return the canonical compound query for the first COMPOUND_MAP
    left-stem/right-stem pair found within a 5-word window, else None."""
    for i, left in enumerate(words):
        for left_stem, right_map in COMPOUND_MAP.items():
            if left.startswith(left_stem):
                for right in words[i+1:i+6]:
                    for right_stem, query in right_map.items():
                        if right.startswith(right_stem):
                            return query
    return None

# ============================================================
# extract_all_noun_candidates
# ============================================================
def extract_all_noun_candidates(
    title_raw: str,
    cat_name_raw: str = "",
    nlp_doc=None,
) -> list[NounResult]:
    """Extract commercial-noun candidates from a product title.

    Runs 10 strategies (S1 compound map, S2 first noun, S3 skip-adjective,
    S4 brand skip, S5 last-before-trigger, S6 longest word, S7 spaCy root,
    S8 category name, S9 second word, S10 last resort), deduplicates by
    compound and returns candidates sorted by confidence (desc).

    nlp_doc: optional spaCy Doc of the title, used for POS/Latin filtering.
    Always returns at least one NounResult (strategy "none" when nothing found).
    """
    # ── Clean the title: colors, dimensions, units, model codes, numbers ──
    t = title_raw.lower()
    for bigram in _COLOR_BIGRAMS:
        t = t.replace(bigram, " ")
    t = _COLOR_PATTERN.sub(' ', t)
    # "120x60x45cm"-style dimension strings
    t = re.sub(r'\b\d+[\.,]?\d*\s*[xXχΧ×]\s*\d+[\.,]?\d*(?:\s*[xXχΧ×]\s*\d+[\.,]?\d*)?\s*[a-zA-Zα-ωΑ-Ω]*\b', ' ', t)
    # number + unit
    t = re.sub(r'\b\d+[\.,]?\d*\s*(?:cm|εκ|mm|m|W|kg|lt|l|cc|rpm|db)\b', ' ', t, flags=re.IGNORECASE)
    # model codes like "KD-500A"
    t = re.sub(r'\b[A-Z]{1,4}[-\s]?[A-Z0-9]{2,10}\b', ' ', t)
    t = re.sub(r'\b\d{4,}\b', ' ', t)
    t = re.sub(r'\b\d+\.\d+\b', ' ', t)
    t = re.sub(r'\.\d+', ' ', t)
    cleaned = re.sub(r'\s+', ' ', t).strip()
    words = _norm(cleaned).split()

    if nlp_doc is not None:
        token_info = " | ".join(
            f"'{t.text}'[{t.pos_}{'→latin✗' if t.is_alpha and t.text.isascii() else '→adj✗' if t.pos_=='ADJ' else ''}]"
            for t in nlp_doc if t.is_alpha and len(t.text) >= 2
        )
        log.debug(f" 🔍 TOKENS: {token_info}")

    # ASCII-only tokens (brand names etc.) excluded from noun candidacy.
    latin_words: set[str] = set()
    if nlp_doc is not None:
        latin_words = {_norm(t.text) for t in nlp_doc
                       if t.is_alpha and t.text.isascii() and len(t.text) >= 2}
        if latin_words:
            log.debug(f" 🔤 latin_filter: {latin_words}")

    # spaCy-tagged adjectives, also excluded.
    adj_words: set[str] = set()
    if nlp_doc is not None:
        adj_words = {_norm(t.text) for t in nlp_doc if t.pos_ == "ADJ"}
        if adj_words:
            log.debug(f" 🎨 adj_filter: {adj_words}")

    candidates: list[NounResult] = []
    seen_compounds: set[str] = set()

    def _add(r: NounResult | None):
        # Deduplicate by compound; keep first (highest-priority) occurrence.
        if r and r.noun and r.compound not in seen_compounds:
            seen_compounds.add(r.compound)
            candidates.append(r)

    def _ok(w: str) -> bool:
        return _word_ok(w) and w not in adj_words and w not in latin_words

    # S1: COMPOUND — direct COMPOUND_MAP hit on the cleaned words.
    compound = _find_compound(words)
    if compound:
        noun = compound.split()[0]
        _add(NounResult(noun=noun, compound=compound, strategy="compound", confidence=0.95))

    # S2: FIRST NOUN — first valid word before any subordinate trigger,
    # optionally paired with the following word into a bigram compound.
    for i, w in enumerate(words):
        if w in SUBORDINATE_TRIGGERS:
            break
        if _ok(w) and not _is_adj(w):
            compound_noun = w
            if i + 1 < len(words):
                nxt = words[i + 1]
                if (nxt not in SUBORDINATE_TRIGGERS and _ok(nxt)
                        and not _is_adj(nxt) and nxt != w):
                    compound_noun = f"{w} {nxt}"
            _add(NounResult(noun=w, compound=compound_noun, strategy="first_noun", confidence=0.80))
            break

    # S3: SKIP ADJ — first valid word after skipping leading adjectives/stopwords.
    skipped = False
    for w in words:
        if w in SUBORDINATE_TRIGGERS:
            break
        if _is_adj(w) or w in STOP_WORDS or w in adj_words:
            skipped = True
            continue
        if _ok(w) and skipped:
            idx = words.index(w)
            c2 = _find_compound(words[idx:])
            if c2:
                noun = c2.split()[0]
                _add(NounResult(noun=noun, compound=c2, strategy="skip_adj+compound", confidence=0.90))
            else:
                _add(NounResult(noun=w, compound=w, strategy="skip_adj", confidence=0.75))
            break

    # S4: BRAND SKIP — first word after an ALL-CAPS token (assumed brand).
    raw_words = re.split(r'[\s\-–_]+', title_raw.strip())
    found_brand = False
    for rw in raw_words:
        clean_rw = re.sub(r'[^a-zA-Zα-ωΑ-Ω]', '', rw)
        if not clean_rw:
            continue
        if clean_rw == clean_rw.upper() and len(clean_rw) >= 3:
            found_brand = True
            continue
        if found_brand:
            norm_rw = _norm(clean_rw)
            try:
                idx = words.index(norm_rw)
                c2 = _find_compound(words[idx:])
                if c2:
                    noun = c2.split()[0]
                    _add(NounResult(noun=noun, compound=c2, strategy="brand+compound", confidence=0.88))
                elif _ok(norm_rw) and not _is_adj(norm_rw):
                    _add(NounResult(noun=norm_rw, compound=norm_rw, strategy="brand_skip", confidence=0.72))
            except ValueError:
                # norm_rw was stripped out of `words` during cleaning
                pass
            break

    # S5: LAST BEFORE TRIGGER — last valid word preceding a trigger word.
    last_valid = None
    for w in words:
        if w in SUBORDINATE_TRIGGERS:
            if last_valid:
                _add(NounResult(noun=last_valid, compound=last_valid,
                                strategy="last_before_trigger", confidence=0.70))
            break
        if _ok(w) and not _is_adj(w):
            last_valid = w

    # S6: LONGEST — longest valid word, only if reasonably long (>= 6 chars).
    cands_w = [w for w in words if _ok(w) and not _is_adj(w) and w not in SUBORDINATE_TRIGGERS]
    if cands_w:
        best_w = max(cands_w, key=len)
        if len(best_w) >= 6:
            _add(NounResult(noun=best_w, compound=best_w, strategy="longest", confidence=0.65))

    # S7: spaCy ROOT — syntactic head noun from the dependency parse.
    if nlp_doc is not None:
        for token in nlp_doc:
            w = _norm(token.text)
            if (token.dep_ in ("ROOT", "nsubj", "obj")
                    and token.pos_ in ("NOUN", "PROPN") and _ok(w)):
                _add(NounResult(noun=w, compound=w, strategy="spacy_root", confidence=0.68))
                break

    # S8: CAT NAME — deepest non-junk segment of the shop category path.
    if cat_name_raw:
        segments = [s.strip() for s in _CAT_NAME_SEP.split(cat_name_raw) if s.strip()]
        for seg in reversed(segments):
            norm_seg = _norm(seg)
            if norm_seg and norm_seg not in CAT_NAME_JUNK and len(norm_seg) > 3:
                seg_words = [w for w in norm_seg.split() if _ok(w) and not _is_adj(w)]
                if seg_words:
                    _add(NounResult(noun=seg_words[0], compound=seg_words[0],
                                    strategy="cat_name", confidence=0.62))
                    break

    # S9: SECOND WORD — second valid word among the first six.
    valid_words = [w for w in words[:6] if _ok(w) and not _is_adj(w) and w not in STOP_WORDS]
    if len(valid_words) >= 2:
        _add(NounResult(noun=valid_words[1], compound=valid_words[1],
                        strategy="second_word", confidence=0.55))

    # S10: LAST RESORT — first non-color word of length >= 3.
    for w in words:
        if len(w) >= 3 and not _COLOR_PATTERN.fullmatch(w):
            _add(NounResult(noun=w, compound=w, strategy="last_resort", confidence=0.20))
            break

    if not candidates:
        candidates.append(NounResult(noun="", compound="", strategy="none", confidence=0.0))
    candidates.sort(key=lambda r: r.confidence, reverse=True)
    top = candidates[0]
    log.debug(f" 📌 NOUN: '{top.noun}' compound='{top.compound}' strategy={top.strategy} conf={top.confidence:.2f}")
    return candidates

def extract_commercial_noun(title_raw, cat_name_raw="", nlp_doc=None) -> NounResult:
    """Convenience wrapper: return only the best candidate."""
    return extract_all_noun_candidates(title_raw, cat_name_raw, nlp_doc)[0]

def make_query(noun_result: NounResult) -> str:
    """Build the embedding query text: compound repeated twice + noun once
    (or the noun three times), weighting the compound in the embedding."""
    if not noun_result.noun:
        return ""
    if noun_result.compound and noun_result.compound != noun_result.noun:
        parts = [noun_result.compound, noun_result.compound, noun_result.noun]
    else:
        parts = [noun_result.noun] * 3
    return " ".join(parts)
# ============================================================
# SEMANTIC NOUN RANKER
# ============================================================
def semantic_noun_rank(
    candidates: list[NounResult],
    embeddings_map: dict,
    faiss_indexes: dict,
    top_k_local: int = 3,
) -> NounResult:
    """Re-rank rule-based noun candidates using FAISS semantic evidence.

    For each candidate, looks up precomputed embeddings (compound, noun,
    compound parts) in *embeddings_map*, queries the SEMANTIC_INDEX_SUBSET
    FAISS indexes and keeps the best weighted top-1 similarity. That score
    is blended with the rule confidence and a per-strategy boost. Falls
    back to the rule-based top candidate when the best blended score is
    below half of SEMANTIC_MIN_SCORE x its confidence.
    """
    if not candidates:
        return NounResult(noun="", compound="", strategy="none", confidence=0.0)
    if len(candidates) == 1:
        return candidates[0]

    scored: list[tuple[float, NounResult]] = []
    for nr in candidates:
        if not nr.noun:
            continue
        # Keys to try in embeddings_map, most specific first.
        lookup_keys: list[str] = []
        if nr.compound:
            lookup_keys.append(nr.compound)
        if nr.noun != nr.compound:
            lookup_keys.append(nr.noun)
        if nr.compound and " " in nr.compound:
            lookup_keys.extend(nr.compound.split())

        max_faiss = 0.0
        for lk in lookup_keys:
            if lk not in embeddings_map:
                continue
            q_vec = embeddings_map[lk]
            for idx_key in SEMANTIC_INDEX_SUBSET:
                if idx_key not in faiss_indexes:
                    continue
                weight = INDEX_WEIGHTS.get(idx_key, 1.0)
                try:
                    D, _ = faiss_indexes[idx_key]["index"].search(q_vec, top_k_local)
                    raw = float(D[0][0]) if len(D[0]) > 0 else 0.0
                    ws = raw * weight
                    if ws > max_faiss:
                        max_faiss = ws
                except Exception:
                    # best-effort: skip an index that fails to search
                    continue

        if max_faiss < FAISS_MIN:
            blended = 0.0  # too weak semantically → effectively disqualify
        else:
            boost = SEMANTIC_STRATEGY_BOOST.get(nr.strategy, 0.85)
            blended = max_faiss * nr.confidence * boost
        scored.append((blended, nr))
        log.debug(
            f" 🔍 sem_rank | {nr.noun:<16} [{nr.strategy:<22}] "
            f"faiss={max_faiss:.4f} conf={nr.confidence:.2f} → blended={blended:.4f}"
        )

    if not scored:
        return candidates[0]
    scored.sort(key=lambda x: x[0], reverse=True)
    top_score, top_nr = scored[0]

    # Fallback: trust the rule-based winner when semantics are too weak.
    fallback_threshold = SEMANTIC_MIN_SCORE * candidates[0].confidence * 0.5
    if top_score < fallback_threshold:
        log.debug(
            f" ⚡ sem_rank FALLBACK (top={top_score:.4f} < thr={fallback_threshold:.4f})"
            f" → rule-based: {candidates[0].noun!r}"
        )
        return candidates[0]
    if top_nr is not candidates[0]:
        # NOTE(review): the "from" value logged here is scored[-1][0]
        # (the lowest blended score), not the rule-based winner's score.
        log.info(
            f" 🎯 sem_rank OVERRIDE: {candidates[0].noun!r}[{candidates[0].strategy}]"
            f" → {top_nr.noun!r}[{top_nr.strategy}]"
            f" ({scored[-1][0]:.3f} → {top_score:.3f})"
        )
    return top_nr

# ============================================================
# FIX 2: DYNAMIC THRESHOLD per batch
# ============================================================
# Scores collected for the current batch (module-level state).
_batch_scores: list[float] = []

def reset_batch_scores():
    """Clear the per-batch score accumulator (call at batch start)."""
    global _batch_scores
    _batch_scores = []

def record_batch_score(score: float):
    """Record a positive winner score for dynamic-threshold statistics."""
    if score > 0:
        _batch_scores.append(score)

def get_dynamic_low_threshold() -> float:
    """Return the low-score threshold for the current batch.

    Uses the DYNAMIC_THRESH_PERCENTILE of recorded scores, clamped to
    [DYNAMIC_THRESH_MIN, DYNAMIC_THRESH_MAX]; falls back to
    LOW_SCORE_FALLBACK with fewer than DYNAMIC_THRESH_MIN_SAMPLES samples.
    """
    if len(_batch_scores) < DYNAMIC_THRESH_MIN_SAMPLES:
        return LOW_SCORE_FALLBACK
    thresh = float(np.percentile(_batch_scores, DYNAMIC_THRESH_PERCENTILE))
    thresh = max(DYNAMIC_THRESH_MIN, min(DYNAMIC_THRESH_MAX, thresh))
    log.debug(f" 📊 dynamic_threshold: {thresh:.3f} (from {len(_batch_scores)} scores, p{DYNAMIC_THRESH_PERCENTILE})")
    return thresh

# ============================================================
# CATEGORY CACHE
# ============================================================
cat_cache: dict = {}      # cid → {name, full_path, level, parent_id, leaf, is_leaf}
cat_name_idx: dict = {}   # normalized name → [cid, ...]

def preload_categories(db):
    """Load all categories from MySQL into cat_cache / cat_name_idx.

    Marks a category as leaf when no row lists it as a parent, and derives
    the leaf display name from the last " > " segment of full_path.
    """
    c = db.cursor(dictionary=True)
    try:
        c.execute("SELECT id, name, full_path, parent_id, level FROM categories")
        rows = c.fetchall()
        for row in rows:
            cid = row["id"]
            full_path = (row["full_path"] or row["name"] or "").strip()
            cat_cache[cid] = {
                "name": row["name"] or "",
                "full_path": full_path,
                "level": row["level"] or 0,
                "parent_id": row["parent_id"],
                "leaf": row["name"] or "",
            }
            norm = _norm(row["name"] or "")
            cat_name_idx.setdefault(norm, []).append(cid)
        # A category is a leaf iff nothing points to it as parent.
        parent_ids = {row["parent_id"] for row in rows if row["parent_id"]}
        for cid, data in cat_cache.items():
            data["is_leaf"] = cid not in parent_ids
        for cid, data in cat_cache.items():
            fp = data["full_path"]
            if " > " in fp:
                data["leaf"] = fp.split(" > ")[-1].strip()
            else:
                data["leaf"] = fp.strip()
        log.info(f"📂 Loaded {len(cat_cache)} categories")
        leaf_count = sum(1 for d in cat_cache.values() if d["is_leaf"])
        log.info(f"🍃 Leaf categories: {leaf_count}")
    finally:
        c.close()

def get_cat_name(cid: int) -> str:
    """Cached category name, or "UNKNOWN" when cid is not in cat_cache."""
    return cat_cache.get(cid, {}).get("name", "UNKNOWN")

def _hint_path_compatible(cid: int, hint_clean: str) -> bool:
    """True when any >=4-char word of the hint appears in the category's
    full path (permissive: missing data also counts as compatible)."""
    if not hint_clean:
        return True
    full_path = (cat_cache.get(cid, {}).get("full_path") or "").lower()
    if not full_path:
        return True
    hint_words = [w for w in _norm(hint_clean).split() if len(w) >= 4]
    if not hint_words:
        return True
    return any(w in full_path for w in hint_words)

def get_cat_path(cid: int) -> str:
    """Cached full category path, falling back to the bare name."""
    return cat_cache.get(cid, {}).get("full_path", get_cat_name(cid))

# ============================================================
# CATEGORY FAISS INDEX
# ============================================================
_cat_index = None   # faiss.IndexFlatIP over leaf-category path embeddings
_cat_ids = []       # row index → category id, parallel to _cat_index
CAT_INDEX_FILE = os.path.join(CACHE_DIR, "cat_leaf_index.faiss")
CAT_IDS_FILE = os.path.join(CACHE_DIR, "cat_leaf_index_ids.npy")

def build_category_faiss(model):
    """Build (or load from disk cache) the leaf-category FAISS index.

    model: a SentenceTransformer-like object with .encode(). Embeddings are
    normalized, so inner product (IndexFlatIP) equals cosine similarity.
    """
    global _cat_index, _cat_ids
    if os.path.exists(CAT_INDEX_FILE) and os.path.exists(CAT_IDS_FILE):
        log.info("⚡ Loading category FAISS index from cache...")
        _cat_index = faiss.read_index(CAT_INDEX_FILE)
        _cat_ids = np.load(CAT_IDS_FILE).tolist()
        log.info(f"✅ Category FAISS loaded ({len(_cat_ids)} entries)")
        return
    ids = [cid for cid, d in cat_cache.items() if d["is_leaf"]]
    # "passage: " prefix matches the E5-style query/passage convention.
    texts = [
        "passage: " + cat_cache[cid]["full_path"] + " | " + cat_cache[cid]["leaf"]
        for cid in ids
    ]
    log.info(f"🧠 Encoding {len(texts)} LEAF category paths...")
    vecs = model.encode(texts, batch_size=128, normalize_embeddings=True, show_progress_bar=True)
    dim = vecs.shape[1]
    _cat_index = faiss.IndexFlatIP(dim)
    _cat_index.add(vecs.astype(np.float32))
    _cat_ids = ids
    faiss.write_index(_cat_index, CAT_INDEX_FILE)
    np.save(CAT_IDS_FILE, np.array(ids))
    log.info("✅ Category FAISS index ready")

# ============================================================
# VALIDATION
# ============================================================
# Currently-empty rule tables kept for future validation logic.
CATEGORY_REQUIRED_HINTS: dict[str, list[str]] = {}
CATEGORY_EXPLICIT_BLOCKS: dict[str, list[str]] = {}
CATEGORY_REQUIRED_CONTEXT: dict[str, dict] = {}

def category_valid(cid: int, title_norm: str) -> bool:
    """Placeholder: no validation rules are active — accepts everything."""
    return True

def noun_fits(main_noun: str, cid: int) -> bool:
    """Loose lexical check that the main noun matches the category name.

    Compares 5-char prefixes against each >3-char word of the category
    name. An empty noun always fits.
    """
    if not main_noun:
        return True
    cat_norm = _norm(get_cat_name(cid))
    cat_words = [w for w in cat_norm.split() if len(w) > 3]
    mn = _norm(main_noun)
    # NOTE(review): `w[:5] in mn[:5]` only matches when w's prefix is a
    # substring of the noun's first 5 chars — possibly meant `w[:5] in mn`.
    return any(mn[:5] in w or w[:5] in mn[:5] for w in cat_words)

# ============================================================
# FEEDBACK CACHE
# ============================================================
# ============================================================
# SHOP CATEGORY LEARN
# ============================================================
MIN_PRODUCTS = 2  # minimum products for a shop+category pair to be trusted

def _learn_populate_raw(db):
    """Upsert per-shop raw category counts into shop_category_raw."""
    log.info("📋 shop_cat_learn: populating shop_category_raw...")
    c = db.cursor()
    try:
        c.execute("""
            INSERT INTO shop_category_raw (shop_id, category_name_raw, product_count)
            SELECT shop_id, category_name, COUNT(*) as product_count
            FROM product_sources
            WHERE category_name IS NOT NULL
              AND TRIM(category_name) != ''
              AND shop_id IS NOT NULL
            GROUP BY shop_id, category_name
            ON DUPLICATE KEY UPDATE
                product_count = VALUES(product_count),
                updated_at = NOW()
        """)
        db.commit()
        log.info(f"✅ shop_category_raw: {c.rowcount} rows inserted/updated")
    finally:
        c.close()

def _learn_populate_resolved(db):
    """Resolve each (shop, raw category) to its most frequent category_id
    and upsert reliable pairs (>= MIN_PRODUCTS hits) into
    shop_category_resolved. Confidence grows with hit count, capped at 5.0.
    """
    log.info("🔍 shop_cat_learn: populating shop_category_resolved...")
    c = db.cursor(dictionary=True)
    try:
        c.execute("""
            SELECT ps.shop_id, ps.category_name, ps.category_id, COUNT(*) as cnt
            FROM product_sources ps
            WHERE ps.category_name IS NOT NULL
              AND TRIM(ps.category_name) != ''
              AND ps.category_id IS NOT NULL
              AND ps.shop_id IS NOT NULL
            GROUP BY ps.shop_id, ps.category_name, ps.category_id
            ORDER BY ps.shop_id, ps.category_name, cnt DESC
        """)
        rows = c.fetchall()
    finally:
        c.close()
    if not rows:
        log.warning("⚠️ shop_cat_learn: no data found")
        return
    # Rows are ordered by cnt DESC, so the first row per key is the winner.
    best: dict = {}
    for row in rows:
        key = (row["shop_id"], row["category_name"])
        if key not in best:
            best[key] = {"cid": row["category_id"], "hits": row["cnt"]}
    filtered = {k: v for k, v in best.items() if v["hits"] >= MIN_PRODUCTS}
    log.info(f"📊 shop_cat_learn: {len(filtered)} reliable shop+category combinations")
    ins = db.cursor()
    inserted = skipped = 0
    try:
        for (shop_id, cat_name), data in filtered.items():
            confidence = min(5.0, 2.0 + data["hits"] * 0.1)
            try:
                ins.execute("""
                    INSERT INTO shop_category_resolved
                        (shop_id, category_name_raw, category_id, confidence, hits, verified)
                    VALUES (%s, %s, %s, %s, %s, 0)
                    ON DUPLICATE KEY UPDATE
                        category_id = VALUES(category_id),
                        confidence = GREATEST(confidence, VALUES(confidence)),
                        hits = hits + VALUES(hits),
                        updated_at = NOW()
                """, (shop_id, cat_name, data["cid"], round(confidence, 3), data["hits"]))
                inserted += 1
            except Exception as e:
                # best-effort insert: count and continue on per-row failure
                skipped += 1
            if inserted % 500 == 0 and inserted > 0:
                db.commit()  # periodic commit to bound transaction size
        db.commit()
        log.info(f"✅ shop_category_resolved: {inserted} inserted, {skipped} skipped")
    finally:
        ins.close()

def _learn_sync_to_mappings(db):
    """Copy non-wrong resolved rows into shop_category_mappings (upsert)."""
    log.info("🔄 shop_cat_learn: syncing → shop_category_mappings...")
    c = db.cursor()
    try:
        c.execute("""
            INSERT INTO shop_category_mappings
                (shop_id, category_name_raw, category_id, confidence, hits, verified)
            SELECT shop_id, category_name_raw, category_id, confidence, hits, verified
            FROM shop_category_resolved
            WHERE is_wrong = 0
            ON DUPLICATE KEY UPDATE
                category_id = VALUES(category_id),
                confidence = GREATEST(shop_category_mappings.confidence, VALUES(confidence)),
                hits = shop_category_mappings.hits + VALUES(hits),
                updated_at = NOW()
        """)
        db.commit()
        log.info(f"✅ shop_category_mappings synced: {c.rowcount} rows")
    finally:
        c.close()

def run_shop_cat_learn(db):
    """Run the full shop-category learning pipeline (raw → resolved → mappings)."""
    log.info("🏪 ── SHOP CATEGORY LEARNING START ──────────────────────")
    _learn_populate_raw(db)
    _learn_populate_resolved(db)
    _learn_sync_to_mappings(db)
    log.info("🏪 ── SHOP CATEGORY LEARNING DONE ───────────────────────")

# ============================================================
# SHOP CATEGORY MAPPINGS
# ============================================================
# (shop_id, raw category name) → {cid, confidence, hits, verified}
shop_cat_map: dict = {}

def load_shop_category_mappings(db):
    """Load shop_category_mappings into the in-memory shop_cat_map."""
    c = db.cursor(dictionary=True)
    try:
        c.execute("""
            SELECT shop_id, category_name_raw, category_id, confidence, hits, verified
            FROM shop_category_mappings
        """)
        for row in c.fetchall():
            key = (row["shop_id"], (row["category_name_raw"] or "").strip())
            shop_cat_map[key] = {
                "cid": row["category_id"],
                "confidence": float(row["confidence"] or 0),
                "hits": row["hits"] or 1,
                "verified": row["verified"] or 0,
            }
        log.info(f"🏪 Loaded {len(shop_cat_map)} shop→category mappings")
    finally:
        c.close()

# Shop category names that carry no signal and are never mapped.
SHOP_CAT_JUNK = frozenset({
    "root catalog", "root", "catalog", "default category", "home",
    "αρχικη", "αρχικη σελιδα", "shop", "eshop",
    "διαφορα", "λοιπα", "uncategorized", "other", "αλλα",
    "γενικα", "προιοντα", "products", "items", "γενικη",
    "κατηγορια", "misc", "general", "no category", "none",
})

def get_shop_category(shop_id: int, category_name_raw: str) -> int | None:
    """Return a learned category id for (shop, raw name), or None.

    Junk/root names are rejected; a mapping is trusted only when it is
    verified or has been seen at least twice.
    """
    if not category_name_raw:
        return None
    norm = category_name_raw.strip().lower()
    if norm.startswith("root catalog"):
        return None
    if norm in SHOP_CAT_JUNK:
        return None
    key = (shop_id, category_name_raw.strip())
    m = shop_cat_map.get(key)
    if not m:
        return None
    if m["verified"] or m["hits"] >= 2:
        return m["cid"]
    return None

def save_shop_category_mapping(db, shop_id: int, category_name_raw: str, cid: int, confidence: float):
    """Upsert one shop→category mapping in DB and mirror it in shop_cat_map.

    DB keeps the max confidence and increments hits; the in-memory copy is
    updated to match.
    """
    if not category_name_raw or not category_name_raw.strip():
        return
    raw = category_name_raw.strip()
    key = (shop_id, raw)
    c = db.cursor()
    try:
        c.execute("""
            INSERT INTO shop_category_mappings
                (shop_id, category_name_raw, category_id, confidence, hits)
            VALUES (%s, %s, %s, %s, 1)
            ON DUPLICATE KEY UPDATE
                category_id = VALUES(category_id),
                confidence = GREATEST(confidence, VALUES(confidence)),
                hits = hits + 1,
                updated_at = NOW()
        """, (shop_id, raw, cid, round(confidence, 3)))
        db.commit()
        existing = shop_cat_map.get(key, {"hits": 0, "verified": 0})
        shop_cat_map[key] = {
            "cid": cid,
            "confidence": max(confidence, existing.get("confidence", 0)),
            "hits": existing["hits"] + 1,
            "verified": existing.get("verified", 0),
        }
    finally:
        c.close()
# ============================================================
# DYNAMIC FEEDBACK THRESHOLDS
# ============================================================
@dataclass
class CatScoreStats:
    """Running score statistics for a single category."""
    scores: list = field(default_factory=list)  # observed winner scores

    @property
    def mean(self) -> float:
        # Default to FEEDBACK_THRESHOLD until any score is recorded.
        return float(np.mean(self.scores)) if self.scores else FEEDBACK_THRESHOLD

    @property
    def std(self) -> float:
        # std needs >= 2 samples; otherwise use a neutral 1.0.
        return float(np.std(self.scores)) if len(self.scores) > 1 else 1.0

    def dynamic_threshold(self) -> float:
        """mean − K·std, clamped to [DYN_THRESH_FLOOR, DYN_THRESH_CEIL];
        falls back to FEEDBACK_THRESHOLD with fewer than 5 samples."""
        if len(self.scores) < 5:
            return FEEDBACK_THRESHOLD
        raw = self.mean - DYN_THRESH_K * self.std
        return max(DYN_THRESH_FLOOR, min(DYN_THRESH_CEIL, raw))

# cid → CatScoreStats (auto-created on first access)
cat_score_stats: dict[int, CatScoreStats] = defaultdict(CatScoreStats)

def dynamic_threshold_for(cid: int) -> float:
    """Per-category dynamic feedback threshold."""
    return cat_score_stats[cid].dynamic_threshold()

def record_cat_score(cid: int, score: float):
    """Record one score observation for the category."""
    cat_score_stats[cid].scores.append(score)

# ============================================================
# CATEGORY STATS
# ============================================================
category_stats: dict = {}  # cid → {"count": n, "total": sum of scores}

def update_cat_stats(cid, score):
    """Accumulate count/total score for a category."""
    s = category_stats.setdefault(cid, {"count": 0, "total": 0.0})
    s["count"] += 1
    s["total"] += score

def is_suspicious(cid) -> bool:
    """A category is suspicious when it has >= 5 assignments with a mean
    score below 3.5 (i.e. it keeps winning with weak evidence)."""
    s = category_stats.get(cid)
    if not s or s["count"] < 5:
        return False
    return (s["total"] / s["count"]) < 3.5

# ============================================================
# FAISS SEARCH
# ============================================================
def faiss_search(q_vec: np.ndarray, faiss_indexes: dict) -> dict:
    """Query every configured FAISS index with *q_vec*.

    Returns {cat_id: accumulated weighted score}; hits below FAISS_MIN are
    dropped, the rest are scaled by INDEX_WEIGHTS and summed per category.
    """
    scores = {}
    for key, _ in INDEXES:
        idx = faiss_indexes[key]
        D, I = idx["index"].search(q_vec, TOP_K)
        for score, i in zip(D[0], I[0]):
            if score < FAISS_MIN:
                continue
            cid = int(idx["ids"][i])
            scores[cid] = scores.get(cid, 0) + float(score) * INDEX_WEIGHTS[key]
    return scores

def es_search_category(es: Elasticsearch, title: str, top_k: int = 10) -> dict:
    """Search Elasticsearch for a category using the product title.

    Returns {cat_id: score} for leaf categories only; empty dict on blank
    titles or any ES failure (logged as a warning).
    """
    if not title or not title.strip():
        return {}
    try:
        r = es.search(index=ES_CAT_INDEX, query={
            "multi_match": {
                "query": title[:400],
                "fields": ["name^2", "full_path^1.5", "leaf^2"],
                "type": "best_fields",
            }
        }, size=top_k)
        scores = {}
        for hit in r["hits"]["hits"]:
            src = hit["_source"]
            cat_id = int(src.get("cat_id", 0))
            is_leaf = src.get("is_leaf", False)
            if not cat_id or not is_leaf:
                continue
            scores[cat_id] = float(hit["_score"])
        return scores
    except Exception as e:
        log.warning(f" ⚠️ ES search failed: {e}")
        return {}

def combined_category_search(
    es: Elasticsearch,
    title: str,
    q_vec: np.ndarray,
    faiss_indexes: dict,
) -> dict:
    """Combine FAISS + ES scores for a better result.

    Each source is normalized to [0, 1] by its own max, then blended with
    FAISS_WEIGHT / ES_WEIGHT. Returns {cat_id: combined score}.
    """
    faiss_scores = faiss_search(q_vec, faiss_indexes)
    es_scores = es_search_category(es, title)
    # Normalize FAISS
    max_faiss = max(faiss_scores.values()) if faiss_scores else 1.0
    faiss_norm = {cid: sc / max_faiss for cid, sc in faiss_scores.items()}
    # Normalize ES
    max_es = max(es_scores.values()) if es_scores else 1.0
    es_norm = {cid: sc / max_es for cid, sc in es_scores.items()}
    # Weighted combination over the union of candidates
    all_cids = set(faiss_norm) | set(es_norm)
    combined = {}
    for cid in all_cids:
        f = faiss_norm.get(cid, 0.0) * FAISS_WEIGHT
        e = es_norm.get(cid, 0.0) * ES_WEIGHT
        combined[cid] = f + e
    return combined

# ============================================================
# CROSS-ENCODER RE-RANKER
# ============================================================
_cross_encoder: CrossEncoder | None = None  # lazily loaded singleton

def load_cross_encoder():
    """Load the cross-encoder model into the module-level singleton."""
    global _cross_encoder
    log.info(f"🔀 Loading cross-encoder: {CROSS_ENCODER_MODEL}...")
    _cross_encoder = CrossEncoder(CROSS_ENCODER_MODEL)
    log.info("✅ Cross-encoder ready")

def rerank_with_cross_encoder(
    title: str,
    candidates: dict,
    top_n: int = CROSS_ENCODER_TOP_N,
) -> dict:
    """Blend cross-encoder relevance into the top-N candidate scores.

    Scores (title, full_path) pairs for the top-N candidates, squashes the
    logits with a sigmoid, and blends with the original scores using
    CROSS_ENCODER_WEIGHT (both terms rescaled by the original max so the
    output stays on the FAISS score scale). Candidates beyond top-N keep
    their original score. No-op when the model isn't loaded.
    """
    if _cross_encoder is None or not candidates:
        return candidates
    sorted_cids = sorted(candidates, key=candidates.get, reverse=True)
    top_cids = sorted_cids[:top_n]
    rest_cids = sorted_cids[top_n:]
    pairs = [
        (title, cat_cache[cid]["full_path"])
        for cid in top_cids if cid in cat_cache
    ]
    if not pairs:
        return candidates
    ce_scores = _cross_encoder.predict(pairs)
    ce_norm = 1 / (1 + np.exp(-np.array(ce_scores)))  # sigmoid → [0, 1]
    max_faiss = max(candidates[c] for c in top_cids) or 1.0
    blended = {}
    for cid, ce in zip(top_cids, ce_norm):
        faiss_norm = candidates[cid] / max_faiss
        blended[cid] = ((1 - CROSS_ENCODER_WEIGHT) * faiss_norm * max_faiss
                        + CROSS_ENCODER_WEIGHT * float(ce) * max_faiss)
    for cid in rest_cids:
        blended[cid] = candidates[cid]
    return blended

# ============================================================
# MULTI-STRATEGY VOTING SEARCH
# ============================================================
def _run_single_strategy(
    nr: NounResult,
    embeddings_map: dict,
    faiss_indexes: dict,
    title_norm: str,
    title_raw: str,
    weight: float,
    merged: dict,
):
    """Run one noun candidate's queries and add weighted scores to *merged*.

    The full compound gets query weight 3.0, its individual words 0.5, both
    scaled by the strategy vote *weight*. Mutates *merged* in place.
    """
    if not nr.compound or nr.compound not in embeddings_map:
        return
    all_queries = [nr.compound]
    parts = nr.compound.split()
    if len(parts) >= 2:
        all_queries.extend(parts)
    for q_key in all_queries:
        if q_key not in embeddings_map:
            continue
        q_vec = embeddings_map[q_key]
        q_w = (3.0 if q_key == nr.compound else 0.5) * weight
        # Use combined FAISS + ES when an ES client has been attached to
        # this function (set elsewhere as _run_single_strategy._es).
        if hasattr(_run_single_strategy, '_es') and _run_single_strategy._es:
            scores = combined_category_search(_run_single_strategy._es, title_raw, q_vec, faiss_indexes)
        else:
            scores = faiss_search(q_vec, faiss_indexes)
        for cid, sc in scores.items():
            merged[cid] = merged.get(cid, 0) + sc * q_w

SHOP_HINT_BOOST = 5.0  # additive bonus for categories inside the hinted branch

def _get_shop_hint_branch(category_hint: str) -> set[int] | None:
    """Map a shop category hint to a set of leaf cids in the best-matching
    top-level branch.

    Scores level 1–3 categories by 5-char-prefix word overlap with the
    hint, takes the root of the best match, and returns the leaf cids under
    that root (None when the hint is weak or the branch has < 3 leaves).
    """
    if not category_hint or len(category_hint) < 4:
        return None
    hint_norm = _norm_hint(category_hint)
    hint_words = [w for w in hint_norm.split() if len(w) >= 4]
    if not hint_words:
        return None
    best_branch = None
    best_score = 0
    for cid, data in cat_cache.items():
        if data.get("level", 0) not in (1, 2, 3):
            continue
        fp_norm = _norm_hint(data["full_path"])
        score = sum(1 for w in hint_words if w[:5] in fp_norm)
        if score > best_score:
            best_branch = data["full_path"]
            best_score = score
    if not best_branch or best_score == 0:
        return None
    root = best_branch.split(" > ")[0]
    branch_cids = {
        cid for cid, d in cat_cache.items()
        if d["is_leaf"] and d.get("full_path", "").startswith(root)
    }
    if len(branch_cids) < 3:
        return None
    log.debug(f" 🗂️ hint_branch: '{category_hint}' → {best_branch} ({len(branch_cids)} cats)")
    return branch_cids

def smart_search(
    candidates: list[NounResult],
    embeddings_map: dict,
    faiss_indexes: dict,
    title_norm: str,
    title_raw: str,
    full_title_vec: np.ndarray = None,
    shop_cat_hint: str = "",
    es=None,
) -> dict:
    """Multi-strategy category search for one product.

    Pipeline: semantic-rank the noun candidates; run the top
    TOP_N_STRATEGIES candidates with STRATEGY_VOTE_W weights; boost
    categories inside the shop-hint branch; if the best score is below the
    dynamic low threshold, fall back to a full-title FAISS search reranked
    by the cross-encoder; finally skip suspicious winners and prefer a
    category the primary noun lexically fits.

    Returns a dict with winner_cid/score/source/noun/compound, top1-3
    scores and a "breakdown" sub-dict for logging.
    """
    merged_scores: dict = {}
    primary_nr = semantic_noun_rank(candidates, embeddings_map, faiss_indexes)
    hint_branch = _get_shop_hint_branch(shop_cat_hint)

    top_cands = candidates[:TOP_N_STRATEGIES]
    for nr, w in zip(top_cands, STRATEGY_VOTE_W):
        _run_single_strategy(nr, embeddings_map, faiss_indexes, title_norm, title_raw, w, merged_scores)

    if hint_branch and merged_scores:
        for cid in list(merged_scores.keys()):
            if cid in hint_branch:
                merged_scores[cid] += SHOP_HINT_BOOST
        log.debug(f" 💡 hint_boost +{SHOP_HINT_BOOST} applied to {len(hint_branch)} branch cats")

    # Low-confidence fallback: search with the full-title embedding.
    dyn_low = get_dynamic_low_threshold()
    best_so_far = max(merged_scores.values()) if merged_scores else 0.0
    if best_so_far < dyn_low and full_title_vec is not None:
        t_scores = faiss_search(full_title_vec, faiss_indexes)
        if t_scores:
            t_scores = rerank_with_cross_encoder(title_raw, t_scores)
            for t_cid in sorted(t_scores, key=t_scores.get, reverse=True)[:5]:
                if not is_suspicious(t_cid) and category_valid(t_cid, title_norm):
                    if t_scores[t_cid] > best_so_far:
                        log.info(f" 🔄 full_title → {get_cat_name(t_cid)} ({t_scores[t_cid]:.3f})")
                        merged_scores[t_cid] = t_scores[t_cid]
                    # NOTE(review): indentation reconstructed — break sits at
                    # the first valid candidate regardless of improvement;
                    # confirm against the original formatting.
                    break

    scores = merged_scores
    if scores:
        top3 = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:3]
        top3_str = " | ".join(f"{cat_cache.get(cid,{}).get('name','?')}={sc:.2f}" for cid, sc in top3)
        log.info(f" 🏆 FAISS TOP3: {top3_str}")

    if not scores:
        return {
            "winner_cid": None,
            "winner_score": 0.0,
            "winner_source": primary_nr.strategy,
            "winner_noun": primary_nr.noun,
            "winner_compound": primary_nr.compound,
            "breakdown": {"strategy": primary_nr.strategy, "noun": primary_nr.noun,
                          "compound": primary_nr.compound, "cid": None,
                          "score": 0.0, "cat": "—"},
        }

    sorted_cids = sorted(scores, key=scores.get, reverse=True)
    best_cid = sorted_cids[0]
    best_score = scores[best_cid]
    # Skip a statistically suspicious winner in favor of the runner-up.
    if is_suspicious(best_cid) and len(sorted_cids) > 1:
        best_cid = sorted_cids[1]
        best_score = scores[best_cid]
    # Prefer a category the primary noun lexically fits (check top 2–6).
    if best_cid:
        if not noun_fits(primary_nr.noun, best_cid):
            for alt in sorted_cids[1:6]:
                if noun_fits(primary_nr.noun, alt):
                    log.info(f" 🔀 noun_fit_skip: {get_cat_name(best_cid)} → {get_cat_name(alt)}")
                    best_cid = alt
                    best_score = scores[alt]
                    break

    bd = {
        "strategy": primary_nr.strategy,
        "noun": primary_nr.noun,
        "compound": primary_nr.compound,
        "cid": best_cid,
        "score": best_score,
        "cat": get_cat_name(best_cid) if best_cid else "—",
    }
    sorted_scores = sorted(scores.values(), reverse=True)[:3]
    return {
        "winner_cid": best_cid,
        "winner_score": best_score,
        "top1_score": sorted_scores[0] if len(sorted_scores) > 0 else 0.0,
        "top2_score": sorted_scores[1] if len(sorted_scores) > 1 else 0.0,
        "top3_score": sorted_scores[2] if len(sorted_scores) > 2 else 0.0,
        "winner_source": primary_nr.strategy,
        "winner_noun": primary_nr.noun,
        "winner_compound": primary_nr.compound,
        "breakdown": bd,
    }

def log_smart_search(p_id, result: dict):
    """Log one smart_search() result's breakdown at INFO level."""
    bd = result["breakdown"]
    score = f"{bd['score']:.3f}" if bd["score"] else "—"
    noun = bd["noun"] or "—"
    comp = bd["compound"] if bd["compound"] != bd["noun"] else ""
    comp_str = f" [{comp}]" if comp else ""
    log.info(f" 🧠 {bd['strategy']:<22} noun={noun:<18}{comp_str} → {bd['cat']} ({score})")

# ============================================================
# RESOLVE CATEGORY
# ============================================================

def normalize_title_key(title: str) -> str:
    """Collapse a product title into a lowercase latin/greek/digit key used
    for title-based product grouping (title_group_key)."""
    if not title:
        return ""
    t = title.lower().strip()
    # NOTE(review): the class keeps only unaccented lowercase greek (α-ω);
    # accented vowels (ά, έ, ...) become spaces — presumably titles are
    # de-accented upstream; verify against callers.
    t = re.sub(r'[^a-z0-9α-ω\s]', ' ', t)
    return re.sub(r'\s+', ' ', t).strip()


def clean_category_hint(raw: str) -> str:
    """Clean a shop-provided category breadcrumb into a usable hint string.

    Rejects mostly-latin junk, drops 'root'/'catalog' segments and
    dimension-like segments (e.g. '120x60cm'), then joins the remaining
    segments, capped at 60 chars. Returns "" when nothing useful remains.
    """
    if not raw:
        return ""
    # Mostly-ASCII hints with almost no Greek are machine junk (SKUs, URLs).
    ascii_ratio = len([c for c in raw if c.isascii() and c.isalpha()]) / max(len(raw), 1)
    greek_ratio = len([c for c in raw if '\u0370' <= c <= '\u03ff' or '\u1f00' <= c <= '\u1fff']) / max(len(raw), 1)
    if ascii_ratio > 0.6 and greek_ratio < 0.1:
        log.debug(f" 🗑️ junk hint filtered: '{raw[:50]}'")
        return ""
    segments = [s.strip() for s in re.split(r'[>|/\\]', raw) if s.strip()]
    useful = []
    for seg in segments:
        sl = seg.lower()
        if "root" in sl or "catalog" in sl:
            continue
        if re.search(r'\d', seg):
            # More digit/measurement chars than letters → a size, not a category.
            alpha_chars = len(re.findall(r'[α-ωa-zΑ-ΩA-Z]', seg))
            digit_chars = len(re.findall(r'[\d/x×χ.,\s]', seg))
            if digit_chars >= alpha_chars:
                continue
        if re.match(r'^[\d\s/xχXΧ×.,cm εκmm]+$', seg, re.IGNORECASE):
            continue
        seg_clean = re.sub(r'\.', ' ', seg).strip()
        if len(seg_clean) >= 2:
            useful.append(seg_clean)
    if not useful:
        return ""
    return " ".join(useful)[:60]


def resolve_category(
    p,
    smart: dict,
    title_key: str,
    faiss_indexes: dict,
    db,
    shop_hint_clean: str = "",
) -> tuple[int, float, str]:
    """Turn a smart_search result into a final (category_id, score, method).

    Returns -1 as the category id to signal quarantine/skip.
    """
    title_norm = _norm(p.get("name", ""))
    source_cid = p.get("category_id")
    winner_cid = smart["winner_cid"]
    winner_score = smart["winner_score"]
    winner_src = smart["winner_source"]
    winner_compound = smart.get("winner_compound", "")
    SAFE_SCORE_THRESHOLD = 6.0
    # NOTE(review): this hard gate quarantines every winner below 20.0, which
    # makes the HIGH_SCORE/LOW_SCORE_FALLBACK branches below reachable only
    # for scores >= 20 — looks like a debug/strict-mode leftover; confirm.
    if winner_cid and winner_score < 20.0:
        log.warning(f" ⚠️ score {winner_score:.3f} < 20.0 → quarantine")
        return -1, winner_score, f"low_score_{winner_score:.2f}"

    def _apply_safety_check(cid: int, score: float, method: str):
        # NOTE(review): currently a no-op pass-through (returns the tuple for
        # any truthy cid) — scaffolding for a future safety check, presumably.
        if not cid:
            return None
        return cid, score, method

    if winner_cid and winner_score >= HIGH_SCORE:
        if winner_score < SAFE_SCORE_THRESHOLD:
            result = _apply_safety_check(winner_cid, winner_score, f"faiss_high[{winner_src}]")
            if result:
                return result
        else:
            return winner_cid, winner_score, f"faiss_high[{winner_src}]"
    if winner_cid and HIGH_SCORE > winner_score >= LOW_SCORE_FALLBACK:
        result = _apply_safety_check(winner_cid, winner_score, f"faiss_med[{winner_src}]")
        if result:
            return result
    dyn_low = get_dynamic_low_threshold()  # NOTE(review): computed but unused
    # Last resort: trust the product's pre-existing category, if valid.
    if source_cid and source_cid in cat_cache:
        return source_cid, 2.0, "fallback_source_cid"
    return -1, 0.0, "skip"


# ============================================================
# MASTER PRODUCT
# ============================================================
def get_or_create_master(db, p, final_cid) -> int:
    """Return the master product id for *p*: match by MPN first, then by
    normalized title key; otherwise insert a new master row with a unique slug."""
    c = db.cursor(dictionary=True)
    try:
        mpn = p.get("mpn")
        title_key = normalize_title_key(p.get("name", ""))
        if mpn:
            c.execute("SELECT id FROM products WHERE main_mpn=%s LIMIT 1", (mpn,))
            r = c.fetchone()
            if r:
                return r["id"]
        c.execute("SELECT id FROM products WHERE title_group_key=%s LIMIT 1", (title_key,))
        r = c.fetchone()
        if r:
            return r["id"]
        # Find a free slug by suffixing -1, -2, ... until no collision.
        base_slug = slugify(p["name"])
        slug, counter = base_slug, 1
        while True:
            c.execute("SELECT id FROM products WHERE slug=%s LIMIT 1", (slug,))
            if not c.fetchone():
                break
            slug = f"{base_slug}-{counter}"
            counter += 1
        c.execute("""
            INSERT INTO products (
                name, slug, image_url, created_at, updated_at,
                clicks, favorites, best_price, best_title, best_image,
                main_mpn, category_id, suggested_category_id, title_group_key, ai_mapped
            ) VALUES (%s,%s,%s,NOW(),NOW(),0,0,%s,%s,%s,%s,%s,%s,%s,1)
        """, (
            p["name"], slug, p.get("image_url"),
            p.get("price"), p["name"], p.get("image_url"),
            mpn, final_cid, final_cid, title_key
        ))
        db.commit()
        return c.lastrowid
    finally:
        c.close()


def update_best_price(db, product_id):
    """Refresh best_price/best_title/best_image on the master product from its
    cheapest active source row. No-op when no priced active source exists."""
    c = db.cursor(dictionary=True)
    try:
        c.execute("""
            SELECT price, name, image_url FROM product_sources
            WHERE product_id=%s AND is_active=1 AND price IS NOT NULL
            ORDER BY price ASC LIMIT 1
        """, (product_id,))
        best = c.fetchone()
        if not best:
            return
        c.execute("""
            UPDATE products
            SET best_price=%s, best_title=%s, best_image=%s, updated_at=NOW()
            WHERE id=%s
        """, (best["price"], best["name"], best["image_url"], product_id))
        db.commit()
    finally:
        c.close()


def save_unresolved(db, p: dict, noun: str, reason: str):
    """Upsert *p* into ai_unresolved_products with a quarantine *reason*.
    Best-effort: failures are logged, never raised."""
    c = db.cursor()
    try:
        c.execute("""
            INSERT INTO ai_unresolved_products
                (source_id, shop_id, name, mpn, category_name_raw, noun, reason)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                reason = VALUES(reason),
                noun = VALUES(noun),
                category_name_raw= VALUES(category_name_raw),
                created_at = NOW()
        """, (
            p.get("id"), p.get("shop_id"),
            (p.get("name") or "")[:255],
            (p.get("mpn") or "")[:100],
            (p.get("category_name") or "")[:255],
            (noun or "")[:100],
            reason[:100],
        ))
        db.commit()
    except Exception as e:
        log.warning(f" ⚠️ save_unresolved failed: {e}")
    finally:
        c.close()


def save_unresolved_with_suggestion(db, p: dict, noun: str, score: float, suggested_cid: int):
    """Like save_unresolved, but also records the AI-suggested category and
    score for human review; falls back to plain save_unresolved on failure."""
    c = db.cursor()
    try:
        c.execute("""
            INSERT INTO ai_unresolved_products
                (source_id, shop_id, name, mpn, category_name_raw, noun, reason,
                 suggested_category_id, ai_score)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                noun = VALUES(noun),
                reason = VALUES(reason),
                suggested_category_id= VALUES(suggested_category_id),
                ai_score = VALUES(ai_score),
                category_name_raw = VALUES(category_name_raw),
                created_at = NOW()
        """, (
            p.get("id"), p.get("shop_id"),
            (p.get("name") or "")[:255],
            (p.get("mpn") or "")[:100],
            (p.get("category_name") or "")[:255],
            (noun or "")[:100],
            f"review_{score:.1f}",
            suggested_cid,
            round(score, 2),
        ))
        db.commit()
    except Exception as e:
        log.warning(f" ⚠️ save_unresolved_with_suggestion failed: {e} — trying fallback")
        save_unresolved(db, p, noun, f"review_{score:.1f}")
    finally:
        c.close()


PROGRESS_FILE = os.path.join(CACHE_DIR, "xp7000_progress.json")


def load_progress() -> dict:
    """Load the resume-progress JSON; returns {} when missing or unreadable."""
    # FIX: json was only imported inside main()/__main__, so calling this
    # function raised NameError — import it locally here.
    import json
    if os.path.exists(PROGRESS_FILE):
        try:
            with open(PROGRESS_FILE, "r") as f:
                return json.load(f)
        except Exception:
            pass
    return {}


def save_progress(progress: dict):
    """Persist the resume-progress dict as pretty-printed JSON."""
    import json  # FIX: see load_progress — json is not imported at module level
    with open(PROGRESS_FILE, "w") as f:
        json.dump(progress, f, indent=2)


def clear_progress():
    """Delete the resume-progress file if present."""
    if os.path.exists(PROGRESS_FILE):
        os.remove(PROGRESS_FILE)
# ============================================================
# PROCESS ONE SHOP
# ============================================================
def process_shop(
    shop_id: int,
    db,
    cur,
    model,
    nlp_el,
    faiss_indexes: dict,
    es,
    writer,
    tz,
    stats: dict,
    resume_offset: int = 0,
    shop_names: dict = None,
) -> int:
    """Classify all active products in BATCH_SIZE pages and write results.

    For each batch: spaCy-extract noun candidates, batch-encode compounds +
    full titles, then per product try (in order) a verified shop-category
    mapping, a no-hint smart_search gate, and the full smart_search →
    resolve_category pipeline. Updates the DB, stats dict and the CSV writer.
    Returns the number of products processed.

    NOTE(review): the SELECT below does not fetch shop_id/category_name, so
    p.get("category_name") is always None here (hint path effectively off)
    and the save_shop_category_mapping branch never fires — confirm whether
    those columns should be added to the query.
    """
    if shop_names is None:
        shop_names = {}
    offset = resume_offset
    total_processed = 0
    log.info(f"🏪 ── PRODUCTS START (offset={offset}) ─────────────────")
    while True:
        cur.execute("""
            SELECT id, name, best_title, image_url, description, category_id,
                   main_mpn as mpn, best_price as price, brand, manufacturer
            FROM products
            WHERE is_active=1
            ORDER BY id ASC
            LIMIT %s OFFSET %s
        """, (BATCH_SIZE, offset))
        products = cur.fetchall()
        if not products:
            break
        # FIX: PRODUCT_LIMIT may be None ("no limit") — guard the comparison,
        # which previously raised TypeError in that configuration.
        if PRODUCT_LIMIT is not None and total_processed + len(products) > PRODUCT_LIMIT:
            products = products[:max(0, PRODUCT_LIMIT - total_processed)]
            if not products:
                break
        reset_batch_scores()

        # Strip latin words (brand/model noise) before spaCy tagging.
        cleaned_texts = []
        for p in products:
            raw = p.get("name","").lower()
            raw = re.sub(r'\b[a-z]+\b', ' ', raw)
            raw = re.sub(r'\s+', ' ', raw).strip()
            cleaned_texts.append(raw)
        spacy_docs = list(nlp_el.pipe(cleaned_texts, batch_size=64))

        # Collect every unique compound (and its first/last word) so the
        # whole batch is encoded in one model.encode call.
        product_all_candidates: list[list[NounResult]] = []
        all_unique_compounds: set[str] = set()
        for p, doc in zip(products, spacy_docs):
            cands = extract_all_noun_candidates(
                title_raw = p.get("name",""),
                cat_name_raw = p.get("category_name",""),
                nlp_doc = doc,
            )
            product_all_candidates.append(cands)
            for nr in cands:
                if nr.compound:
                    all_unique_compounds.add(nr.compound)
                    parts = nr.compound.split()
                    if len(parts) >= 2:
                        all_unique_compounds.add(parts[0])
                        all_unique_compounds.add(parts[-1])
        if not all_unique_compounds:
            total_processed += len(products)
            offset += BATCH_SIZE
            continue

        unique_list = list(all_unique_compounds)
        full_titles = []
        for p in products:
            title = (p.get("name","") or "")[:100]
            cat_hint = clean_category_hint(p.get("category_name","") or "")
            combined = f"{title} {cat_hint}".strip() if cat_hint else title
            # "query: " prefix is required by the e5 embedding model.
            full_titles.append("query: " + combined)
        all_queries = ["query: " + c for c in unique_list] + full_titles
        log.info(f" 🔢 Shop {shop_id} | Encoding {len(unique_list)} compounds + {len(products)} titles...")
        all_encoded = model.encode(
            all_queries,
            batch_size=ENCODE_BATCH_SIZE,
            normalize_embeddings=True,
            show_progress_bar=False,
        )
        embeddings_map = {
            compound: np.array([vec])
            for compound, vec in zip(unique_list, all_encoded[:len(unique_list)])
        }
        full_title_vecs = [np.array([vec]) for vec in all_encoded[len(unique_list):]]

        batch_results: list = []
        for (p, all_cands), ftv in zip(zip(products, product_all_candidates), full_title_vecs):
            try:
                title_norm = _norm(p.get("name",""))
                title_key = normalize_title_key(p.get("name",""))
                primary_nr = all_cands[0]
                shop_hint_raw = p.get("category_name","") or ""
                shop_hint_clean = clean_category_hint(shop_hint_raw)
                log.info(f" 🏷️ shop_cat_hint: '{shop_hint_raw[:50]}' → '{shop_hint_clean}'")

                # 1) Verified shop-category mapping → direct assignment.
                verified_cid = get_shop_category(shop_id, shop_hint_raw)
                if verified_cid:
                    cat_name = get_cat_name(verified_cid)
                    cat_path = get_cat_path(verified_cid)
                    uc = db.cursor()
                    try:
                        uc.execute("UPDATE products SET category_id=%s, ai_mapped=1, updated_at=NOW() WHERE id=%s",
                                   (verified_cid, p["id"]))
                        db.commit()
                    finally:
                        uc.close()
                    stats["shop_map"] = stats.get("shop_map", 0) + 1
                    stats["total"] += 1
                    print(f"\n {'─'*80}")
                    print(f" 🏪 [{shop_id}] id={p['id']} [100% VERIFIED MAP]")
                    print(f" 📦 ΤΙΤΛΟΣ : {p['name']}")
                    print(f" 🏪 SHOP : {shop_names.get(shop_id, f'Shop#{shop_id}')}")
                    print(f" 🏷️ HINT : {shop_hint_clean}")
                    print(f" 🏆 ΚΑΤΗΓΟΡΙΑ: {cat_name}")
                    print(f" 📍 PATH : {cat_path}")
                    print(f" 📊 SCORE : 10.0 method=shop_map_verified")
                    print(f" {'─'*80}\n")
                    batch_results.append(10.0)
                    continue

                # 2) No shop hint: only accept a very confident (>=25) result.
                if not shop_hint_clean:
                    smart_nohint = smart_search(
                        candidates = all_cands,
                        embeddings_map = embeddings_map,
                        faiss_indexes = faiss_indexes,
                        title_norm = title_norm,
                        title_raw = p.get("name",""),
                        full_title_vec = ftv,
                        shop_cat_hint = "",
                    )
                    nohint_score = smart_nohint.get("winner_score", 0.0)
                    if nohint_score >= 25.0:
                        # FIX: reuse the result instead of recomputing an
                        # identical smart_search call with an empty hint.
                        smart = smart_nohint
                    else:
                        save_unresolved(db, p, primary_nr.noun or "", "no_hint")
                        print(f" 🔲 [{shop_id}] {p['id']:>6} | {p['name'][:45]:<45} | QUARANTINE (no_hint)")
                        stats["quarantine"] = stats.get("quarantine", 0) + 1
                        batch_results.append("quarantine")
                        continue
                else:
                    # 3) Full pipeline with the cleaned shop hint.
                    smart = smart_search(
                        candidates = all_cands,
                        embeddings_map = embeddings_map,
                        faiss_indexes = faiss_indexes,
                        title_norm = title_norm,
                        title_raw = p.get("name",""),
                        full_title_vec = ftv,
                        shop_cat_hint = shop_hint_clean,
                    )

                final_cid, final_score, method = resolve_category(
                    p, smart, title_key, faiss_indexes, db, shop_hint_clean
                )
                if final_cid == -1:
                    noun_str = primary_nr.noun or ""
                    score = smart.get("winner_score", 0.0)
                    if not noun_str:
                        reason = "no_noun"
                    elif score < LOW_SCORE_FALLBACK:
                        reason = f"low_score_{score:.2f}"
                    elif not shop_hint_clean:
                        reason = "no_hint"
                    else:
                        reason = "unresolved"
                    save_unresolved(db, p, noun_str, reason)
                    print(f" 🔲 [{shop_id}] {p['id']:>6} | {p['name'][:45]:<45} | QUARANTINE ({reason})")
                    stats["quarantine"] = stats.get("quarantine", 0) + 1
                    # FIX: record the quarantine so the per-batch stats line
                    # counts this product (previously skipped).
                    batch_results.append("quarantine")
                    continue

                # Scores below 55 go to human review with the AI suggestion.
                if final_score < 55.0:
                    save_unresolved_with_suggestion(db, p, primary_nr.noun or "", final_score, final_cid)
                    cat_name = get_cat_name(final_cid)
                    print(f"\n {'─'*80}")
                    print(f" 🔶 [{shop_id}] id={p['id']}")
                    print(f" 📦 ΤΙΤΛΟΣ : {p['name']}")
                    print(f" 🔤 NOUN : {primary_nr.noun} | compound: {primary_nr.compound}")
                    print(f" 🏪 SHOP : {shop_names.get(shop_id, f'Shop#{shop_id}')}")
                    print(f" 🏷️ HINT : {shop_hint_clean or '—'}")
                    print(f" 🏆 ΠΡΟΤΑΣΗ : {cat_name} (score={final_score:.2f} < 55 → REVIEW)")
                    print(f" {'─'*80}\n")
                    stats["review"] = stats.get("review", 0) + 1
                    batch_results.append(final_score)
                    continue

                # Accepted mapping: log, persist, update stats and CSV.
                cat_name = get_cat_name(final_cid)
                cat_path = get_cat_path(final_cid)
                icon = "🏪" if "shop_map" in method else "📚" if "feedback" in method else "✅"
                t1 = smart.get("top1_score", final_score)
                t2 = smart.get("top2_score", 0.0)
                t3 = smart.get("top3_score", 0.0)
                print(f"\n {'─'*80}")
                print(f" {icon} [{shop_id}] id={p['id']}")
                print(f" 📦 ΤΙΤΛΟΣ : {p['name']}")
                print(f" 🔤 NOUN : {primary_nr.noun} | compound: {primary_nr.compound} | strategy: {primary_nr.strategy}")
                print(f" 🏪 SHOP : {shop_names.get(shop_id, f'Shop#{shop_id}')}")
                print(f" 🏷️ HINT : {shop_hint_clean or '—'}")
                print(f" 🏆 ΚΑΤΗΓΟΡΙΑ: {cat_name}")
                print(f" 📍 PATH : {cat_path}")
                print(f" 📊 SCORE : {final_score:.2f} (T1={t1:.2f} T2={t2:.2f} T3={t3:.2f}) method={method}")
                print(f" {'─'*80}\n")
                batch_results.append(final_score)
                log.info(f" 📍 PATH: {get_cat_path(final_cid)}")
                stats[method] = stats.get(method, 0) + 1
                stats["total"] += 1
                dyn_thresh = dynamic_threshold_for(final_cid)
                if p.get("shop_id") and p.get("category_name") and final_score >= HIGH_SCORE:
                    save_shop_category_mapping(
                        db=db,
                        shop_id=p["shop_id"],
                        category_name_raw=p["category_name"],
                        cid=final_cid,
                        confidence=final_score,
                    )
                update_cat_stats(final_cid, final_score)
                get_or_create_master(db, p, final_cid)
                uc = db.cursor()
                try:
                    uc.execute("""
                        UPDATE products
                        SET category_id=%s, ai_mapped=1, updated_at=NOW()
                        WHERE id=%s
                    """, (final_cid, p["id"]))
                    db.commit()
                finally:
                    uc.close()
                writer.writerow([
                    datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S"),
                    p["id"], p["name"],
                    smart.get("winner_source", primary_nr.strategy),
                    smart.get("winner_noun", primary_nr.noun),
                    smart.get("winner_compound", primary_nr.compound),
                    round(primary_nr.confidence, 2),
                    final_cid, get_cat_name(final_cid),
                    round(final_score, 3), method,
                    round(dyn_thresh, 3),
                    shop_id,
                ])
            except Exception as e:
                log.error(f"❌ ERROR product {p.get('id')}: {e}", exc_info=True)
                stats["errors"] += 1
                try:
                    db.rollback()
                except Exception:
                    pass

        total_processed += len(products)
        offset += BATCH_SIZE
        b_mapped = sum(1 for r in batch_results if isinstance(r, float))
        b_quarantine = sum(1 for r in batch_results if r == "quarantine")
        b_scores = [r for r in batch_results if isinstance(r, float)]
        b_avg = sum(b_scores) / len(b_scores) if b_scores else 0.0
        print(f" 📊 Batch offset={offset} | ✅ mapped={b_mapped} | 🔲 quarantine={b_quarantine} | avg_score={b_avg:.2f}")
        # FIX: same None-guard as above for the "no limit" configuration.
        if PRODUCT_LIMIT is not None and total_processed >= PRODUCT_LIMIT:
            log.info(f"🛑 TEST MODE: σταμάτησα στα {PRODUCT_LIMIT} προϊόντα")
            break

    # Per-shop summary table.
    s_mapped = stats.get("total", 0)
    s_shop_map = stats.get("shop_map", 0)
    s_review = stats.get("review", 0)
    s_quarant = stats.get("quarantine", 0)
    s_grand = total_processed or 1
    print(f"\n {'═'*70}")
    print(f" 📊 ΑΠΟΤΕΛΕΣΜΑΤΑ — {shop_names.get(shop_id, f'Shop#{shop_id}')} (id={shop_id})")
    print(f" {'─'*70}")
    print(f" ✅ Mapped αυτόματα : {s_mapped:>6} ({s_mapped/s_grand*100:5.1f}%)")
    print(f" 🏪 Shop map verified : {s_shop_map:>6} ({s_shop_map/s_grand*100:5.1f}%)")
    print(f" 🔶 Για Review : {s_review:>6} ({s_review/s_grand*100:5.1f}%)")
    print(f" 🔲 Quarantine : {s_quarant:>6} ({s_quarant/s_grand*100:5.1f}%)")
    print(f" {'─'*70}")
    print(f" 📦 Σύνολο : {total_processed:>6}")
    print(f" {'═'*70}\n")
    log.info(f"🏪 ── SHOP {shop_id} DONE — {total_processed} products ─────────────")
    return total_processed


# ============================================================
# MAIN
# ============================================================
def main():
    """Load models/indexes, connect to ES + MySQL, run the pipeline once from
    offset 0, then print a summary of the stats counters."""
    import json
    stats = defaultdict(int)
    log.info("🧠 Loading SentenceTransformer...")
    model = SentenceTransformer("intfloat/multilingual-e5-large")
    load_cross_encoder()
    log.info("🔤 Loading spaCy Greek model...")
    nlp_el = spacy.load("el_core_news_sm")
    log.info("📦 Loading FAISS indexes...")
    faiss_indexes = {}
    for key, _ in INDEXES:
        faiss_indexes[key] = {
            "index": faiss.read_index(f"{CACHE_DIR}/{key}.index"),
            "ids": np.load(f"{CACHE_DIR}/{key}_ids.npy"),
        }
    log.info("🔌 Connecting to Elasticsearch...")
    es = Elasticsearch(**ES_CONFIG)
    log.info(f"✅ ES connected: {es.info()['version']['number']}")
    db = mysql.connector.connect(**DB_CONFIG)
    cur = db.cursor(dictionary=True)
    preload_categories(db)
    build_category_faiss(model)
    run_shop_cat_learn(db)
    load_shop_category_mappings(db)
    clear_progress()
    log.info("🔄 Ξεκινάει πάντα από την αρχή")
    tz = pytz.timezone("Europe/Athens")
    ts = datetime.now(tz).strftime("%Y-%m-%d_%H-%M-%S")
    csv_path = f"{CSV_DIR}/import_log_{ts}.csv"
    grand_total = 0
    with open(csv_path, "w", newline="", encoding="utf-8") as csv_file:
        writer = csv.writer(csv_file)
        # FIX: header now has 13 columns to match the 13-field rows written
        # by process_shop (shop_id was missing).
        writer.writerow([
            "datetime_gr","product_id","title",
            "strategy","noun","compound","confidence",
            "cid","cat_name","score","method",
            "dyn_threshold","shop_id",
        ])
        try:
            grand_total = process_shop(
                shop_id = 0,
                db = db,
                cur = cur,
                model = model,
                nlp_el = nlp_el,
                faiss_indexes = faiss_indexes,
                es = es,
                writer = writer,
                tz = tz,
                stats = stats,
                resume_offset = 0,
            )
            log.info(f"🏁 Ολοκληρώθηκε. Σύνολο: {grand_total}")
        except KeyboardInterrupt:
            log.warning("⚠️ Interrupted by user.")
            db.commit()
            cur.close()
            db.close()
            return
    clear_progress()
    db.commit()
    cur.close()
    db.close()
    total = stats["total"] or 1
    print("\n" + "="*65)
    print(f"📊 SUMMARY — {grand_total} products")
    print("="*65)
    for key, val in sorted(stats.items()):
        if val == 0:
            continue
        pct = f"({val*100//total}%)" if key not in ("total","errors") else ""
        print(f" {key:<42} {val:>6} {pct}")
    print("="*65)
    correct = (sum(v for k, v in stats.items() if k.startswith("faiss_"))
               + stats.get("feedback", 0)
               + stats.get("shop_map", 0)
               + stats.get("noun_cache", 0)
               + stats.get("fallback_double_check", 0))
    print(f"\n {'🏆 Mapping rate':<42} {correct*100//total}%")
    print("="*65)
    print("✅ DONE")


if __name__ == "__main__":
    import json
    main()