| import numpy as np |
| import requests |
| from sklearn.cluster import DBSCAN |
| from langchain_huggingface import HuggingFaceEmbeddings |
|
|
class EntityResolver:
    """Hybrid entity resolution for graph ingestion.

    Combines three stages:
      1. Local clustering (DBSCAN over embedding cosine distance) to merge
         near-duplicate surface forms within one chunk.
      2. Global canonicalization via Neo4j's native vector index, so an
         entity already present in the graph under a slightly different
         name is reused instead of duplicated.
      3. Wikidata entity linking (REST) to anchor new entities to
         universal concepts (e.g. wd:Q12345) for a future GraphRAG layer.
    """

    def __init__(self, neo4j_driver, model_name="all-MiniLM-L6-v2", similarity_threshold=0.85):
        print("🧩 Inizializzazione Entity Resolver Ibrido (Vector Search + Wikidata EL)...")

        # Sentence-transformer model used to embed entity surface forms.
        self.embedding_model = HuggingFaceEmbeddings(model_name=model_name)

        # DBSCAN runs on cosine *distance* (1 - cosine similarity), so the
        # similarity threshold maps directly onto the clustering radius.
        self.eps = 1 - similarity_threshold
        self.similarity_threshold = similarity_threshold
        self.driver = neo4j_driver

    def _find_canonical_in_db(self, embedding_vector):
        """Query Neo4j's native vector index for a near-duplicate node.

        If the entity already exists in the global graph under a slightly
        different but semantically near-identical name, return that node's
        label so callers reuse it instead of creating a duplicate.

        Args:
            embedding_vector: embedding of the candidate entity name.

        Returns:
            The canonical label string, or None when no indexed node clears
            the similarity threshold (or no driver is configured).
        """
        if not self.driver:
            return None

        # NOTE(review): Neo4j normalizes cosine-index scores to [0, 1] as
        # (1 + cos) / 2, which is not the same scale as the raw cosine
        # similarity_threshold used here — confirm the intended semantics.
        query = """
        CALL db.index.vector.queryNodes('entity_embeddings', 1, $embedding)
        YIELD node, score
        WHERE score >= $threshold
        RETURN node.label AS canonical_label, score
        """
        with self.driver.session() as session:
            result = session.run(query, embedding=embedding_vector, threshold=self.similarity_threshold)
            record = result.single()
            return record["canonical_label"] if record else None

    def _link_to_wikidata(self, entity_name):
        """Entity-linking REST call to Wikidata.

        Anchors our graph nodes to universal concepts (e.g. wd:Q12345),
        which is crucial for the future GraphRAG layer. The lookup is
        deliberately best-effort: any failure is logged and ignored.

        Args:
            entity_name: surface form to search for.

        Returns:
            A "wd:<QID>" string for the best match, or None on no match
            or on any network / decoding error.
        """
        url = "https://www.wikidata.org/w/api.php"
        params = {
            "action": "wbsearchentities",
            "search": entity_name,
            "language": "it",
            "format": "json",
            "limit": 1,
        }
        try:
            # Short timeout: an external lookup must never stall ingestion.
            response = requests.get(url, params=params, timeout=3.0)
            if response.status_code == 200:
                data = response.json()
                if data.get("search"):
                    best_match = data["search"][0]
                    return f"wd:{best_match['id']}"
                print(f" [DEBUG] Wikidata non ha trovato corrispondenze per: '{entity_name}'")
        except (requests.RequestException, ValueError) as e:
            # Narrowed from a bare `except Exception`: only transport
            # failures and JSON-decoding errors (requests raises a
            # ValueError subclass for bad JSON) are expected and safe to
            # swallow; programming errors now propagate.
            print(f" ⚠️ Errore lookup Wikidata per '{entity_name}' (ignorato): {e}")
        return None

    def resolve_entities(self, extracted_entities, triples):
        """Canonicalize entity names across a chunk and the global graph.

        Args:
            extracted_entities: iterable of entity surface forms.
            triples: objects with mutable `.subject` and `.object`
                attributes; they are rewritten IN PLACE with canonical
                names (callers receive the same objects back).

        Returns:
            (resolved_entities, resolved_triples, entities_to_save) where
            entities_to_save holds dicts for genuinely new entities:
            {"label", "embedding", "wikidata_sameAs"}.
        """
        if not triples and not extracted_entities:
            return [], [], []

        # Gather every surface form seen in this chunk: the extracted
        # entities plus every triple subject/object.
        chunk_entities = set(extracted_entities)
        for t in triples:
            chunk_entities.add(t.subject)
            chunk_entities.add(t.object)
        unique_chunk_entities = list(chunk_entities)

        if not unique_chunk_entities:
            return [], triples, []

        embeddings = self.embedding_model.embed_documents(unique_chunk_entities)

        # min_samples=1 guarantees every entity lands in a cluster (no
        # noise points); eps is the cosine-distance radius derived from
        # similarity_threshold in __init__.
        clustering = DBSCAN(eps=self.eps, min_samples=1, metric='cosine').fit(np.array(embeddings))

        local_cluster_map = {}
        for entity, emb, label in zip(unique_chunk_entities, embeddings, clustering.labels_):
            local_cluster_map.setdefault(label, []).append({"name": entity, "embedding": emb})

        entity_replacement_map = {}
        entities_to_save = []

        # Cluster labels themselves are irrelevant; only members matter.
        for items in local_cluster_map.values():
            # Longest surface form wins as the local canonical name.
            # max() is O(n) and keeps the original tie-breaking (first
            # maximal element), replacing a full sort.
            local_canonical_item = max(items, key=lambda x: len(x["name"]))
            local_canonical_name = local_canonical_item["name"]
            local_canonical_emb = local_canonical_item["embedding"]

            # Prefer merging with an entity already in the global graph.
            db_canonical_name = self._find_canonical_in_db(local_canonical_emb)

            if db_canonical_name:
                final_canonical = db_canonical_name
                print(f" 🔗 Match Globale: '{local_canonical_name}' -> '{db_canonical_name}' (Neo4j)")
            else:
                # Genuinely new entity: keep the local name and try to
                # anchor it to a Wikidata concept.
                final_canonical = local_canonical_name
                wikidata_uri = self._link_to_wikidata(final_canonical)

                entity_dict = {
                    "label": final_canonical,
                    "embedding": local_canonical_emb,
                    "wikidata_sameAs": wikidata_uri,
                }

                if wikidata_uri:
                    print(f" ✨ Nuova Entità: '{final_canonical}' 🌍 Linked to: {wikidata_uri}")
                else:
                    print(f" ✨ Nuova Entità: '{final_canonical}' (No Wiki link)")

                entities_to_save.append(entity_dict)

            # Map every member of the cluster onto the chosen canonical name.
            for item in items:
                entity_replacement_map[item["name"]] = final_canonical

        # Rewrite triples in place with canonical names; the returned list
        # holds the same (mutated) objects, matching original behavior.
        resolved_triples = []
        for t in triples:
            t.subject = entity_replacement_map.get(t.subject, t.subject)
            t.object = entity_replacement_map.get(t.object, t.object)
            resolved_triples.append(t)

        resolved_entities = list({entity_replacement_map.get(e, e) for e in extracted_entities})

        return resolved_entities, resolved_triples, entities_to_save