Source code for src.knowledge_graph.graph

import logging
from datetime import timedelta
from rdflib import URIRef, Literal
from rdflib.namespace import DC, RDF, Namespace, FOAF, XSD
import pandas as pd
from src.knowledge_graph.memory_store import MemoryStore
from src.knowledge_graph.virtuoso_store import VirtuosoStore
from src.utils.utils import get_config
from src.data.knowledge_graphs import get_same_as_link, get_uri_from_label, get_uri_from_csv
import os

on_rtd = os.environ.get('READTHEDOCS') == 'True'

LOGGER = logging.getLogger('graph')

if on_rtd:
    CONFIG = get_config('../src/utils/config.yaml')
else:
    CONFIG = get_config('../src/utils/config.yaml')

HOME_URI = CONFIG['rdf']['uri']

MPEG7 = Namespace('http://purl.org/ontology/mpeg7/')
VIDEO = Namespace('http://purl.org/ontology/video/')
TEMPORAL = Namespace('http://swrl.stanford.edu/ontologies/builtins/3.3/temporal.owl')
DBO = Namespace('http://dbpedia.org/ontology/')
DBR = Namespace('http://dbpedia.org/resource/')


[docs]class Graph(object): """ Links new videos with their entities in a knowledge graph and allows to look up user queries. """ def __init__(self, storage_type: str = 'memory', memory_path: str = 'models/store', virtuoso_url: str = None, virtuoso_graph: str = None, virtuoso_username: str = None, virtuoso_password: str = None, dbpedia_csv: str = None, wikidata_csv: str = None): """ Args: storage_type (str): Whether to save links to a local rdf-file or a Virtuoso database. Should be 'memory' for a local file, 'virtuoso' for Virtuoso. memory_path (str): Path to which the links should be written. Only necessary if storage_type = memory. virtuoso_url (str): URL of the Virtuoso-SPARQL-instance. Only necessary if storage_type = virtuoso. virtuoso_graph (str): URL of the Virtuoso-Graph in which the links should be saved. Only necessary if storage_type = virtuoso. virtuoso_username (str): Username to access the Virtuoso instance. Only necessary if storage_type = virtuoso. virtuoso_password (str): Password to access the Virtuoso instance. Only necessary if storage_type = virtuoso. dbpedia_csv (str): Path of the normalized DBpedia-thumbnail-information. wikidata_csv (str): Path of the normalized Wikidata-thumbnail-information """ self.storage_type = storage_type if storage_type == 'memory': self.store = MemoryStore(memory_path) elif storage_type == 'virtuoso': self.store = VirtuosoStore(virtuoso_url, virtuoso_graph, virtuoso_username, virtuoso_password) else: raise Exception('Unknown storage type') self.entity_data = None if dbpedia_csv is not None and wikidata_csv is not None: self.entity_data = pd.concat([pd.read_csv(dbpedia_csv), pd.read_csv(wikidata_csv)]) elif wikidata_csv is not None: self.entity_data = pd.read_csv(wikidata_csv) elif dbpedia_csv is not None: self.entity_data = pd.read_csv(dbpedia_csv)
[docs] def insert_video(self, youtube_id: str, title: str): """ Creates the rdf triples for a new video. Args: youtube_id (str): Id of a youtube video to be linked. For example a4T5ylNQk6g for https://www.youtube.com/watch?v=a4T5ylNQk6g. title (str): The title of the video. """ video_uri = URIRef(f'{HOME_URI}{youtube_id}') self.store.insert((video_uri, RDF['type'], MPEG7['Video'])) self.store.insert((video_uri, DC['identifier'], Literal(f'http://www.youtube.com/watch?v={youtube_id}'))) self.store.insert((video_uri, DC['title'], Literal(title))) self.store.commit()
[docs] def insert_scene(self, entities: list, youtube_id: str, start_time: timedelta, end_time: timedelta): """ Creates the link between an entity and a video from youtube. Args: entities (list): Names of the occurring entities. youtube_id (str): Id of a youtube video to be linked. For example a4T5ylNQk6g for https://www.youtube.com/watch?v=a4T5ylNQk6g. start_time (timedelta): Start time of the scene in the respective video. end_time (timedelta): End time of the scene in the respective video. """ video_uri = URIRef(f'{HOME_URI}{youtube_id}') scene_uri = URIRef(f'{HOME_URI}{youtube_id}#t={str(start_time).split(".", 2)[0]},{str(end_time).split(".", 2)[0]}') self.store.insert((scene_uri, RDF['type'], VIDEO['Scene'])) self.store.insert((scene_uri, VIDEO['sceneFrom'], video_uri)) self.store.insert((scene_uri, VIDEO['temporalSegmentOf'], video_uri)) self.store.insert((scene_uri, TEMPORAL['hasStartTime'], Literal(str(start_time).split('.', 2)[0], datatype=XSD['dateTime']))) self.store.insert( (scene_uri, TEMPORAL['duration'], Literal(str(end_time - start_time).split('.', 2)[0], datatype=XSD['duration']))) self.store.insert((scene_uri, TEMPORAL['hasFinishTime'], Literal(str(end_time).split('.', 2)[0], datatype=XSD['dateTime']))) for entity in entities: if self.entity_data is None: dbpedia_uri, wikidata_uri = get_uri_from_label(entity) else: dbpedia_uri, wikidata_uri = get_uri_from_csv(entity, self.entity_data) if dbpedia_uri is not None: self.store.insert((scene_uri, FOAF['depicts'], dbpedia_uri)) elif wikidata_uri is not None: self.store.insert((scene_uri, FOAF['depicts'], wikidata_uri)) else: LOGGER.info(f'Failed to create link to {entity} for video {youtube_id}') self.store.commit()
[docs] def video_exists(self, youtube_id: str) -> bool: """ Returns whether a video is already in the graph or not Args: youtube_id (str): Id of the YouTube-Video. Returns: exists (bool): Whether it exists or not. """ return self.store.exists(youtube_id)
[docs] def get_scenes_from_video(self, identifier: str): """ Returns all scenes for a video Args: identifier (str): Identifier of the video on YouTube Returns: scenes (list): Returns a list of the scenes with a scene_uri, entity, start and end """ query = ('SELECT ?scene ?entity ?start ?end' ' WHERE {' ' ?scene a video:Scene ;' f' video:sceneFrom <{HOME_URI + identifier}>;' ' foaf:depicts ?entity;' ' temporal:hasStartTime ?start;' ' temporal:hasFinishTime ?end.' '}') print(query) return self.store.query(query)
[docs] def get_scenes_with_entity(self, identifier: str): """ Returns all scenes for an entity Args: identifier (str): Can be the name of the entity or a dbpedia/wikidata link. Returns: scenes (list): Returns a list of the videos in which a entity occurs. Format: [[<link>, <title>], ...] """ if identifier.startswith('http://www.wikidata'): identifier = get_same_as_link(identifier) elif not identifier.startswith('http://dbpedia'): if self.entity_data is None: uris = get_uri_from_label(identifier) else: uris = get_uri_from_csv(identifier, self.entity_data) identifier = uris[0] if uris[0] is not None else uris[1] if identifier is None: LOGGER.warning('Could not identify entity using the label') return None query = ('SELECT distinct ?title ?link ?dbpedia_entity ?start ?end' ' WHERE {' ' ?scene a video:Scene ;' f' foaf:depicts <{identifier}> ;' f' foaf:depicts ?dbpedia_entity ;' ' temporal:hasStartTime ?start ;' ' temporal:hasFinishTime ?end ;' ' video:sceneFrom ?video .' ' ?video a mpeg7:Video ;' ' dc:identifier ?link ;' ' dc:title ?title .' ' }') return self.store.query(query)
[docs] def get_videos_with_filters(self, query: str, filters: str): """ Returns videos for a user specific query. Example: select distinct ?title ?link ?dbpedia_entity where { ?scene a video:Scene; foaf:depicts ?dbpedia_entity; video:sceneFrom ?video. ?video dc:identifier ?link; dc:title ?title. service <http://dbpedia.org/sparql> { ?dbpedia_entity dbo:birthDate ?date; owl:sameAs ?wikidata_entity } service <https://query.wikidata.org/sparql> { ?wikidata_entity <http://www.wikidata.org/prop/direct/P21> ?sex . ?sex rdfs:label ?sex_label } filter (regex(str(?wikidata_entity), "www.wikidata.org") && (?sex_label = "male"@en) && ?date < "19700101"^^xsd:date) } Args: query (str): Further query details which are being inserted into the main query. filters (str): Allows the specification of filters to apply in the query. Returns: scenes (list): Returns a list of the scenes in which a entity occurs. Format: [[<title>, <link>, <dbpedia_entity>, <start>, <end>], ...] """ query = ( 'select distinct ?title ?link ?dbpedia_entity ?start ?end' ' where { ' ' ?scene a video:Scene; ' ' foaf:depicts ?dbpedia_entity;' ' temporal:hasStartTime ?start;' ' temporal:hasFinishTime ?end;' ' video:sceneFrom ?video. ' ' ?video dc:identifier ?link;' ' dc:title ?title.' f' {query}' f' {"filter ( "+filters+" )" if filters is not None else ""}' ' }' ) return self.store.query(query)