| import os |
| from rdflib import Graph, Literal, RDF, URIRef, Namespace |
| from rdflib.namespace import SKOS, XSD |
| from pyshacl import validate |
|
|
class SemanticValidator:
    """Validate extracted entities and triples against SHACL shapes.

    The extracted data is first turned into a temporary RDF graph (labels are
    mapped to URIs through ``_get_uri``) and that graph is then checked with
    pySHACL against the shapes loaded from ``shapes/schema_constraints.ttl``,
    resolved relative to this module's directory.
    """

    # Predicate spellings (compared lower-cased) that are emitted as rdf:type.
    _TYPE_PREDICATES = frozenset({"rdf:type", "a", "type", "rdf_type"})

    # Label clean-up used when minting a local URI: spaces become underscores,
    # single and double quotes are dropped.
    _LOCAL_NAME_CLEANUP = str.maketrans({" ": "_", "'": None, '"': None})

    def __init__(self):
        """Register the known namespaces and load the SHACL shapes, if present.

        When the shapes file does not exist, ``self.shacl_graph`` is set to
        ``None`` and ``validate_batch`` short-circuits without validating.
        """
        self.shapes_file = os.path.join(
            os.path.dirname(__file__), "shapes/schema_constraints.ttl"
        )

        # Prefix -> namespace map used both for resolving ``prefix:name``
        # strings and for binding prefixes on the generated graph.
        self.namespaces = {
            "arco": Namespace("https://w3id.org/arco/ontology/arco/"),
            "core": Namespace("https://w3id.org/arco/ontology/core/"),
            "a-loc": Namespace("https://w3id.org/arco/ontology/location/"),
            "cis": Namespace("http://dati.beniculturali.it/cis/"),
            "ex": Namespace("http://activadigital.it/ontology/"),
        }

        if not os.path.exists(self.shapes_file):
            print("⚠️ File SHACL non trovato. Validazione disabilitata (pericoloso in prod!).")
            self.shacl_graph = None
            return

        self.shacl_graph = Graph()
        self.shacl_graph.parse(self.shapes_file, format="turtle")
        print("🛡️ SHACL Constraints caricati.")

    def _get_uri(self, text_val: str) -> "URIRef":
        """Map a textual identifier to a URI.

        Resolution order:

        1. An absolute ``http://`` / ``https://`` IRI is returned unchanged.
           Previously such a value fell through to step 3 and was nested
           inside the ``ex`` namespace, producing a malformed identifier.
        2. ``prefix:name`` where ``prefix`` is a key of ``self.namespaces``
           is expanded inside that namespace.
        3. Anything else is treated as a free-text label and minted in the
           ``ex`` namespace after replacing spaces with underscores and
           removing single and double quotes.

        Args:
            text_val: Identifier or label to resolve.

        Returns:
            The URI obtained by one of the three rules above.
        """
        if text_val.startswith(("http://", "https://")):
            return URIRef(text_val)

        prefix, separator, local_name = text_val.partition(":")
        if separator and prefix in self.namespaces:
            return self.namespaces[prefix][local_name]

        clean_name = text_val.translate(self._LOCAL_NAME_CLEANUP)
        return self.namespaces["ex"][clean_name]

    def _json_to_rdf(self, entities, triples):
        """Build a temporary RDF graph from extracted entities and triples.

        Args:
            entities: Iterable of entities, or a falsy value for none. A dict
                contributes its ``"label"`` value; any other item is
                converted with ``str()``.
            triples: Iterable of objects exposing ``subject``, ``predicate``
                and ``object`` string attributes, or a falsy value for none.

        Returns:
            A graph with every known prefix (plus ``skos``) bound. Each entity
            and each triple subject gets an Italian-tagged ``skos:prefLabel``.
            Predicates spelled as one of ``_TYPE_PREDICATES`` become
            ``rdf:type`` statements; for every other predicate the object
            also receives a ``skos:prefLabel``.
        """
        graph = Graph()

        for prefix, namespace in self.namespaces.items():
            graph.bind(prefix, namespace)
        graph.bind("skos", SKOS)

        # Stand-alone entities only contribute their label.
        for entity in entities or ():
            label = entity["label"] if isinstance(entity, dict) else str(entity)
            graph.add((self._get_uri(label), SKOS.prefLabel, Literal(label, lang="it")))

        for triple in triples or ():
            subject_uri = self._get_uri(triple.subject)
            graph.add((subject_uri, SKOS.prefLabel, Literal(triple.subject, lang="it")))

            if triple.predicate.lower() in self._TYPE_PREDICATES:
                # Typing statement: the object is used as a class and is not labelled.
                graph.add((subject_uri, RDF.type, self._get_uri(triple.object)))
                continue

            predicate_uri = self._get_uri(triple.predicate)
            object_uri = self._get_uri(triple.object)
            graph.add((subject_uri, predicate_uri, object_uri))
            graph.add((object_uri, SKOS.prefLabel, Literal(triple.object, lang="it")))

        return graph

    def validate_batch(self, entities, triples):
        """Run SHACL validation on both the isolated entities and the triples.

        Args:
            entities: Entities accepted by ``_json_to_rdf``.
            triples: Triples accepted by ``_json_to_rdf``.

        Returns:
            A 3-tuple ``(conforms, report_text, data_graph)``: the validation
            outcome, the textual report, and the temporary graph that was
            validated. When no shapes graph is available the result is
            ``(True, "No Constraints", None)`` and no graph is built.
        """
        # Falsy covers the "shapes file missing" case (None set in __init__).
        if not self.shacl_graph:
            return True, "No Constraints", None

        data_graph = self._json_to_rdf(entities, triples)

        print("🔍 Esecuzione Validazione SHACL...")

        # Only the outcome and the textual report are used; the report graph
        # returned by pySHACL is discarded.
        conforms, _report_graph, report_text = validate(
            data_graph,
            shacl_graph=self.shacl_graph,
            inference='rdfs',
            serialize_report_graph=True,
        )

        return conforms, report_text, data_graph