import numpy as np import requests from sklearn.cluster import DBSCAN from langchain_huggingface import HuggingFaceEmbeddings class EntityResolver: def __init__(self, neo4j_driver, model_name="all-MiniLM-L6-v2", similarity_threshold=0.85): print("🧩 Inizializzazione Entity Resolver Ibrido (Vector Search + Wikidata EL)...") # Uso un modello di embedding ultra-leggero per la risoluzione. Non serve la semantica # profonda di un LLM qui, mi basta beccare le stringhe molto simili. self.embedding_model = HuggingFaceEmbeddings(model_name=model_name) # DBSCAN ragiona in termini di distanza (eps), quindi la deduco dalla soglia di similarità (1 - score) self.eps = 1 - similarity_threshold self.similarity_threshold = similarity_threshold self.driver = neo4j_driver def _find_canonical_in_db(self, embedding_vector): """ Interroga l'indice vettoriale nativo di Neo4j. Se il nodo esiste già nel grafo globale con un nome leggermente diverso ma semanticamente quasi identico, ce lo facciamo restituire per evitare sdoppiamenti. """ if not self.driver: return None query = """ CALL db.index.vector.queryNodes('entity_embeddings', 1, $embedding) YIELD node, score WHERE score >= $threshold RETURN node.label AS canonical_label, score """ with self.driver.session() as session: result = session.run(query, embedding=embedding_vector, threshold=self.similarity_threshold) record = result.single() if record: return record["canonical_label"] return None def _link_to_wikidata(self, entity_name): """ Chiamata REST a Wikidata (Entity Linking). Ci serve per ancorare i nodi del nostro grafo a concetti universali (es. wd:Q12345). Cruciale per il layer di GraphRAG futuro. """ url = "https://www.wikidata.org/w/api.php" params = { "action": "wbsearchentities", "search": entity_name, "language": "it", "format": "json", "limit": 1 # Ci serve solo il top-match per fare riconciliazione a tappeto, niente paginazione. } try: # Metto un timeout super restrittivo (3s). Se Wikidata è congestionato, # preferisco fallire silenziosamente il linking piuttosto che bloccare tutta l'ingestion della pipeline. response = requests.get(url, params=params, timeout=3.0) if response.status_code == 200: data = response.json() if data.get("search"): best_match = data["search"][0] return f"wd:{best_match['id']}" else: print(f" [DEBUG] Wikidata non ha trovato corrispondenze per: '{entity_name}'") except Exception as e: print(f" ⚠️ Errore lookup Wikidata per '{entity_name}' (ignorato): {e}") return None def resolve_entities(self, extracted_entities, triples): if not triples and not extracted_entities: return [], [], [] # 1. Raccoglitore: Metto a fattor comune tutte le entità del chunk di testo appena processato chunk_entities = set(extracted_entities) for t in triples: chunk_entities.add(t.subject) chunk_entities.add(t.object) unique_chunk_entities = list(chunk_entities) if not unique_chunk_entities: return [], triples, [] # Embedding massivo di tutte le entità isolate in questo chunk embeddings = self.embedding_model.embed_documents(unique_chunk_entities) # 2. DEDUPLICA LOCALE IN RAM (DBSCAN) # Se nel testo l'LLM ha estratto sia "Canova" che "Antonio Canova", # li collassiamo in un solo cluster prima ancora di toccare il database. clustering = DBSCAN(eps=self.eps, min_samples=1, metric='cosine').fit(np.array(embeddings)) local_cluster_map = {} for entity, emb, label in zip(unique_chunk_entities, embeddings, clustering.labels_): if label not in local_cluster_map: local_cluster_map[label] = [] local_cluster_map[label].append({"name": entity, "embedding": emb}) entity_replacement_map = {} entities_to_save = [] # Struttura per il loader Neo4j: {label, embedding, wikidata_sameAs} # 3. RISOLUZIONE GLOBALE & ENTITY LINKING for label, items in local_cluster_map.items(): # Tra le varianti locali, eleggo come canonica provvisoria la stringa più lunga (es. "Tempio di Giove" batte "Tempio") local_canonical_item = sorted(items, key=lambda x: len(x["name"]), reverse=True)[0] local_canonical_name = local_canonical_item["name"] local_canonical_emb = local_canonical_item["embedding"] # Guardo se il database conosce già qualcosa di molto simile db_canonical_name = self._find_canonical_in_db(local_canonical_emb) if db_canonical_name: # Caso A: Entità già nota. Faccio override col nome che Neo4j conosce già per evitare biforcazioni. final_canonical = db_canonical_name print(f" 🔗 Match Globale: '{local_canonical_name}' -> '{db_canonical_name}' (Neo4j)") else: # Caso B: Entità inedita. Provo a darle una "carta d'identità" agganciandola a Wikidata. final_canonical = local_canonical_name wikidata_uri = self._link_to_wikidata(final_canonical) entity_dict = { "label": final_canonical, "embedding": local_canonical_emb, "wikidata_sameAs": wikidata_uri } if wikidata_uri: print(f" ✨ Nuova Entità: '{final_canonical}' 🌍 Linked to: {wikidata_uri}") else: print(f" ✨ Nuova Entità: '{final_canonical}' (No Wiki link)") entities_to_save.append(entity_dict) # Costruisco la mappa di traduzione per tutte le varianti sporche di questo cluster for item in items: entity_replacement_map[item["name"]] = final_canonical # 4. RISCRITTURA FINALE (Output pulito) # Sostituisco i nomi vecchi/sporchi con il canonico definitivo prima di passare il blocco al validatore SHACL resolved_triples = [] for t in triples: t.subject = entity_replacement_map.get(t.subject, t.subject) t.object = entity_replacement_map.get(t.object, t.object) resolved_triples.append(t) resolved_entities = list(set([entity_replacement_map.get(e, e) for e in extracted_entities])) return resolved_entities, resolved_triples, entities_to_save