AutomatedSemanticDiscovery / src /graph /entity_resolver.py
GaetanoParente's picture
riviste le varie sezioni e i commenti
c1b1880
import numpy as np
import requests
from sklearn.cluster import DBSCAN
from langchain_huggingface import HuggingFaceEmbeddings
class EntityResolver:
def __init__(self, neo4j_driver, model_name="all-MiniLM-L6-v2", similarity_threshold=0.85):
print("🧩 Inizializzazione Entity Resolver Ibrido (Vector Search + Wikidata EL)...")
# Uso un modello di embedding ultra-leggero per la risoluzione. Non serve la semantica
# profonda di un LLM qui, mi basta beccare le stringhe molto simili.
self.embedding_model = HuggingFaceEmbeddings(model_name=model_name)
# DBSCAN ragiona in termini di distanza (eps), quindi la deduco dalla soglia di similarità (1 - score)
self.eps = 1 - similarity_threshold
self.similarity_threshold = similarity_threshold
self.driver = neo4j_driver
def _find_canonical_in_db(self, embedding_vector):
"""
Interroga l'indice vettoriale nativo di Neo4j.
Se il nodo esiste già nel grafo globale con un nome leggermente diverso ma
semanticamente quasi identico, ce lo facciamo restituire per evitare sdoppiamenti.
"""
if not self.driver: return None
query = """
CALL db.index.vector.queryNodes('entity_embeddings', 1, $embedding)
YIELD node, score
WHERE score >= $threshold
RETURN node.label AS canonical_label, score
"""
with self.driver.session() as session:
result = session.run(query, embedding=embedding_vector, threshold=self.similarity_threshold)
record = result.single()
if record:
return record["canonical_label"]
return None
def _link_to_wikidata(self, entity_name):
"""
Chiamata REST a Wikidata (Entity Linking).
Ci serve per ancorare i nodi del nostro grafo a concetti universali (es. wd:Q12345).
Cruciale per il layer di GraphRAG futuro.
"""
url = "https://www.wikidata.org/w/api.php"
params = {
"action": "wbsearchentities",
"search": entity_name,
"language": "it",
"format": "json",
"limit": 1 # Ci serve solo il top-match per fare riconciliazione a tappeto, niente paginazione.
}
try:
# Metto un timeout super restrittivo (3s). Se Wikidata è congestionato,
# preferisco fallire silenziosamente il linking piuttosto che bloccare tutta l'ingestion della pipeline.
response = requests.get(url, params=params, timeout=3.0)
if response.status_code == 200:
data = response.json()
if data.get("search"):
best_match = data["search"][0]
return f"wd:{best_match['id']}"
else:
print(f" [DEBUG] Wikidata non ha trovato corrispondenze per: '{entity_name}'")
except Exception as e:
print(f" ⚠️ Errore lookup Wikidata per '{entity_name}' (ignorato): {e}")
return None
def resolve_entities(self, extracted_entities, triples):
if not triples and not extracted_entities:
return [], [], []
# 1. Raccoglitore: Metto a fattor comune tutte le entità del chunk di testo appena processato
chunk_entities = set(extracted_entities)
for t in triples:
chunk_entities.add(t.subject)
chunk_entities.add(t.object)
unique_chunk_entities = list(chunk_entities)
if not unique_chunk_entities:
return [], triples, []
# Embedding massivo di tutte le entità isolate in questo chunk
embeddings = self.embedding_model.embed_documents(unique_chunk_entities)
# 2. DEDUPLICA LOCALE IN RAM (DBSCAN)
# Se nel testo l'LLM ha estratto sia "Canova" che "Antonio Canova",
# li collassiamo in un solo cluster prima ancora di toccare il database.
clustering = DBSCAN(eps=self.eps, min_samples=1, metric='cosine').fit(np.array(embeddings))
local_cluster_map = {}
for entity, emb, label in zip(unique_chunk_entities, embeddings, clustering.labels_):
if label not in local_cluster_map:
local_cluster_map[label] = []
local_cluster_map[label].append({"name": entity, "embedding": emb})
entity_replacement_map = {}
entities_to_save = [] # Struttura per il loader Neo4j: {label, embedding, wikidata_sameAs}
# 3. RISOLUZIONE GLOBALE & ENTITY LINKING
for label, items in local_cluster_map.items():
# Tra le varianti locali, eleggo come canonica provvisoria la stringa più lunga (es. "Tempio di Giove" batte "Tempio")
local_canonical_item = sorted(items, key=lambda x: len(x["name"]), reverse=True)[0]
local_canonical_name = local_canonical_item["name"]
local_canonical_emb = local_canonical_item["embedding"]
# Guardo se il database conosce già qualcosa di molto simile
db_canonical_name = self._find_canonical_in_db(local_canonical_emb)
if db_canonical_name:
# Caso A: Entità già nota. Faccio override col nome che Neo4j conosce già per evitare biforcazioni.
final_canonical = db_canonical_name
print(f" 🔗 Match Globale: '{local_canonical_name}' -> '{db_canonical_name}' (Neo4j)")
else:
# Caso B: Entità inedita. Provo a darle una "carta d'identità" agganciandola a Wikidata.
final_canonical = local_canonical_name
wikidata_uri = self._link_to_wikidata(final_canonical)
entity_dict = {
"label": final_canonical,
"embedding": local_canonical_emb,
"wikidata_sameAs": wikidata_uri
}
if wikidata_uri:
print(f" ✨ Nuova Entità: '{final_canonical}' 🌍 Linked to: {wikidata_uri}")
else:
print(f" ✨ Nuova Entità: '{final_canonical}' (No Wiki link)")
entities_to_save.append(entity_dict)
# Costruisco la mappa di traduzione per tutte le varianti sporche di questo cluster
for item in items:
entity_replacement_map[item["name"]] = final_canonical
# 4. RISCRITTURA FINALE (Output pulito)
# Sostituisco i nomi vecchi/sporchi con il canonico definitivo prima di passare il blocco al validatore SHACL
resolved_triples = []
for t in triples:
t.subject = entity_replacement_map.get(t.subject, t.subject)
t.object = entity_replacement_map.get(t.object, t.object)
resolved_triples.append(t)
resolved_entities = list(set([entity_replacement_map.get(e, e) for e in extracted_entities]))
return resolved_entities, resolved_triples, entities_to_save