import os
from collections import defaultdict
from neo4j import GraphDatabase
from dotenv import load_dotenv

# Populate the process environment from a local .env file so that
# NEO4J_URI / NEO4J_USER / NEO4J_PASSWORD are available to the persister below.
load_dotenv()
|
|
class KnowledgeGraphPersister:
    """Persist extracted knowledge-graph triples and embedded entities to Neo4j.

    Connection settings are read from the environment (NEO4J_URI, NEO4J_USER,
    NEO4J_PASSWORD). If the connection fails, ``self.driver`` is set to ``None``
    and every public method silently degrades to a no-op, so the rest of the
    pipeline can keep running without persistence.
    """

    def __init__(self):
        # Credentials come from the environment (typically loaded via load_dotenv()).
        uri = os.getenv("NEO4J_URI")
        user = os.getenv("NEO4J_USER")
        password = os.getenv("NEO4J_PASSWORD")

        try:
            self.driver = GraphDatabase.driver(uri, auth=(user, password))
            self.driver.verify_connectivity()
            print(f"✅ Connesso a Neo4j ({uri}).")

            # Ensure the uniqueness constraint and vector index exist before
            # any write happens.
            self._create_constraints()

        except Exception as e:
            # Deliberate best-effort: downstream code checks `self.driver is None`
            # and skips persistence instead of crashing the whole pipeline.
            print(f"❌ Errore critico connessione Neo4j: {e}")
            self.driver = None

    def close(self):
        """Close the underlying Neo4j driver, if a connection was established."""
        if self.driver:
            self.driver.close()

    def _create_constraints(self):
        """Ensure the :Resource(uri) uniqueness constraint and the vector index exist.

        Both statements are idempotent (``IF NOT EXISTS``). Failures are logged
        as warnings rather than raised, so an older server without vector-index
        support still works for plain graph writes.
        """
        if not self.driver:
            return

        query = "CREATE CONSTRAINT resource_uri_unique IF NOT EXISTS FOR (n:Resource) REQUIRE n.uri IS UNIQUE"

        # 384 dimensions — presumably matches the upstream sentence-transformer
        # embedding model (e.g. all-MiniLM-L6-v2); TODO confirm with the producer.
        query_vector = """
        CREATE VECTOR INDEX entity_embeddings IF NOT EXISTS
        FOR (n:Resource) ON (n.embedding)
        OPTIONS {indexConfig: {
            `vector.dimensions`: 384,
            `vector.similarity_function`: 'cosine'
        }}
        """
        with self.driver.session() as session:
            try:
                session.run(query)
                print("⚡ Vincolo di unicità verificato.")
            except Exception as e:
                print(f"⚠️ Warning vincolo unicità: {e}")

            try:
                session.run(query_vector)
                print("⚡ Vector Index verificato.")
            except Exception as e:
                print(f"⚠️ Warning vector index: {e}")

    def sanitize_name(self, name):
        """Turn a free-text label into a URI-safe identifier.

        Strips surrounding whitespace, replaces internal spaces with
        underscores and drops quote characters. Falsy input maps to "Unknown".
        """
        if not name:
            return "Unknown"
        return name.strip().replace(" ", "_").replace("'", "").replace('"', "")

    def sanitize_predicate(self, pred):
        """Turn a free-text predicate into a safe upper-case relationship type.

        Colons, hyphens and spaces become underscores; any remaining
        non-alphanumeric characters are dropped. Empty / fully-stripped input
        falls back to the generic "RELATED_TO".
        """
        if not pred:
            return "RELATED_TO"

        pred = pred.replace(":", "_").replace("-", "_").replace(" ", "_")
        clean = "".join(x for x in pred if x.isalnum() or x == "_")

        return clean.upper() if clean else "RELATED_TO"

    def save_triples(self, triples):
        """Batch-write triples, grouped by (sanitized) predicate.

        Each triple object must expose ``subject``, ``predicate`` and ``object``;
        ``subject_type``, ``object_type``, ``evidence``, ``reasoning`` and
        ``source`` are optional. Per-predicate batches are written in separate
        transactions so one failing relationship type does not abort the rest.
        """
        if not self.driver or not triples:
            return

        print(f"💾 Preparazione Batch di {len(triples)} triple...")

        # Relationship types cannot be Cypher parameters, so one UNWIND batch
        # per distinct predicate.
        batched_by_pred = defaultdict(list)

        for t in triples:
            safe_pred = self.sanitize_predicate(t.predicate)

            item = {
                "subj_uri": self.sanitize_name(t.subject),
                "subj_label": t.subject,
                # `or ''` guards against attributes that exist but are None,
                # which would crash the chained .replace() calls.
                "subj_type": (getattr(t, 'subject_type', '') or '').replace(":", "_").replace("-", "_"),
                "obj_uri": self.sanitize_name(t.object),
                "obj_label": t.object,
                "obj_type": (getattr(t, 'object_type', '') or '').replace(":", "_").replace("-", "_"),
                "evidence": getattr(t, 'evidence', 'N/A'),
                "reasoning": getattr(t, 'reasoning', 'N/A'),
                "src": getattr(t, 'source', 'unknown') or 'unknown'
            }
            batched_by_pred[safe_pred].append(item)

        with self.driver.session() as session:
            for pred, data_list in batched_by_pred.items():
                try:
                    session.execute_write(self._unwind_write_tx, pred, data_list)
                    print(f" -> Inserite {len(data_list)} relazioni :{pred}")
                except Exception as e:
                    print(f"⚠️ Errore batch per relazione :{pred} -> {e}")

        print("✅ Salvataggio completato.")

    def save_entities_and_triples(self, entities_to_save, triples):
        """Persist standalone entity nodes (with embeddings), then the triples.

        ``entities_to_save`` is a list of dicts with at least a ``label`` key;
        ``embedding`` and ``wikidata_sameAs`` are stored when present.
        """
        if not self.driver:
            return

        if entities_to_save:
            print(f"💾 Salvataggio di {len(entities_to_save)} nodi singoli con vettori...")

            # Copy each dict instead of mutating the caller's input in place.
            node_batch = [
                {**item, "uri": self.sanitize_name(item["label"])}
                for item in entities_to_save
            ]

            with self.driver.session() as session:
                session.execute_write(self._unwind_write_nodes, node_batch)

        if triples:
            self.save_triples(triples)

    @staticmethod
    def _unwind_write_nodes(tx, batch_data):
        """Transaction function: MERGE :Resource nodes keyed on uri.

        Properties (label, embedding, wikidata_sameAs) are set ON CREATE only,
        so nodes that already exist are left untouched.
        """
        query = (
            "UNWIND $batch AS row "
            "MERGE (n:Resource {uri: row.uri}) "
            "ON CREATE SET n.label = row.label, "
            "              n.embedding = row.embedding, "
            "              n.wikidata_sameAs = row.wikidata_sameAs, "
            "              n.last_updated = datetime() "
        )
        tx.run(query, batch=batch_data)

    @staticmethod
    def _unwind_write_tx(tx, predicate, batch_data):
        """Transaction function: write one batch of relationships of a single type.

        Type-like predicates (rdf:type and friends) are materialized as extra
        node labels instead of relationships; everything else becomes a
        `(s)-[:PREDICATE]->(o)` edge with provenance properties.
        The `$( ... )` dynamic-label syntax requires a recent Neo4j 5.x server
        — NOTE(review): confirm the deployed server version supports it.
        """
        if predicate in ["RDF_TYPE", "TYPE", "A", "CORE_HASTYPE"]:
            query = (
                "UNWIND $batch AS row "
                "MERGE (s:Resource {uri: row.subj_uri}) "
                "ON CREATE SET s.label = row.subj_label, s.last_updated = datetime() "
                "WITH s, row "
                "SET s:$( [replace(row.obj_label, ':', '_')] ) "
                # Bug fix: `node` was never bound in this query (only `s` and
                # `row` are in scope), making the statement fail server-side.
                "RETURN count(s)"
            )
            tx.run(query, batch=batch_data)

        else:
            # `predicate` is interpolated into the query text (relationship
            # types cannot be parameters); it is safe because
            # sanitize_predicate() reduced it to [A-Z0-9_] and it is
            # backtick-quoted.
            query = (
                f"UNWIND $batch AS row "
                f"MERGE (s:Resource {{uri: row.subj_uri}}) "
                f"ON CREATE SET s.label = row.subj_label "
                f"MERGE (o:Resource {{uri: row.obj_uri}}) "
                f"ON CREATE SET o.label = row.obj_label "
                f"WITH s, o, row, "
                f"     CASE WHEN row.subj_type <> '' THEN [row.subj_type] ELSE [] END AS s_labels, "
                f"     CASE WHEN row.obj_type <> '' THEN [row.obj_type] ELSE [] END AS o_labels "
                f"SET s:$(s_labels), o:$(o_labels) "
                f"MERGE (s)-[r:`{predicate}`]->(o) "
                f"SET r.evidence = row.evidence, "
                f"    r.reasoning = row.reasoning, "
                f"    r.source = row.src, "
                f"    r.last_updated = datetime()"
            )
            tx.run(query, batch=batch_data)