Source code for src.knowledge_graph.virtuoso_store

import logging

from SPARQLWrapper import SPARQLWrapper, JSON, DIGEST, POST
from pandas import json_normalize

LOGGER = logging.getLogger('virtuoso-store')


[docs]class VirtuosoStore(object): """ Implementation to store and query rdf-triples in a Virtuoso instance """ def __init__(self, conn_url: str = 'http://localhost:8890/sparql-auth', graph: str = 'http://localhost:8890/DAV/', username: str = 'dba', password: str = 'dba'): """ Args: conn_url (str): The SPARQL-endpoint with authentication of the Virtuoso instance. graph (str): The Virtuoso graph in which the information should be saved. username (str): Username for the authentication. password (str): Password for the authentication. """ self.graph = graph self.endpoint = SPARQLWrapper(conn_url) self.endpoint.setHTTPAuth(DIGEST) self.endpoint.setCredentials(username, password) self.endpoint.setReturnFormat(JSON) self.insert_query = ''
[docs] def insert(self, triple: tuple): """ Add a triple to the knowledge graph Args: triple (tuple): The triple to save. """ self.insert_query += f' <{str(triple[0])}> <{str(triple[1])}>' if type(triple[2]).__name__ == 'Literal': self.insert_query += f' "{str(triple[2])}" .' else: self.insert_query += f' <{str(triple[2])}> .'
[docs] def commit(self): """ Make the changes persistent and available """ self.endpoint.setMethod(POST) query = ('INSERT DATA {' f'GRAPH <{self.graph}>' ' {' f'{self.insert_query}' '}' '}') self.query(query)
[docs] def query(self, query: str) -> list: """ Execute a SPARQL query with a Virtuoso endpoint Args: query: (str): A valid SPARQL query. Returns: results (list): Returns a list of lists with the queried properties. Format: [[<property1>, <property2>, ...], ...] """ self.endpoint.setQuery(query) query_results = json_normalize(self.endpoint.query().convert()['results']['bindings']) value_columns = [col for col in query_results if col.endswith('.value')] results = [] for index, row in query_results.iterrows(): results.append([row[col] for col in value_columns]) return results
[docs] def exists(self, youtube_id) -> bool: """ Checks if a video already exists in the graph Args: youtube_id (str): The id of the video to check. """ query = ('SELECT count(?video)' 'WHERE {' '?video a mpeg7:Video ;' f'dc:identifier "http://www.youtube.com/watch?v={youtube_id}" .' '}') return True if int(self.query(query)[0][0]) > 0 else False