Commit 08024d73 authored by ALVISET
Browse files

WIP mapping of sextant product

parent 7847ea27
from rdflib import Graph, Literal, BNode, DCAT, SOSA, SSN, RDF
import urllib.parse
from rdflib.namespace import *
import rdflib.term
class GraphWrapper():
    """
    Intermediate layer used to insert new triples in an rdflib graph.

    An empty ``Graph`` is created on construction and every (prefix, namespace)
    pair received in ``prefixes`` is bound to it.
    """

    def __init__(self, prefixes):
        """
        @param prefixes: iterable of (prefix, namespace) pairs to bind on the graph
        """
        self.prefixes = prefixes
        self.graph = Graph()
        self.__bind_graph__()

    def __bind_graph__(self):
        """Bind every (prefix, namespace) pair of ``self.prefixes`` on the graph."""
        for prefix in self.prefixes:
            self.graph.bind(prefix[0], prefix[1])

    @staticmethod
    def create_uri(*args):
        """
        Build a term by indexing a namespace with the concatenation of the
        percent-encoded elements that follow it.

        @param args: the namespace first (anything indexable by a string, e.g.
                     rdflib.Namespace), then the strings to concatenate;
                     None and "" elements are skipped
        @return: the result of ``namespace[joined]``, or None when there is
                 nothing to look up or when the lookup fails
        """
        if not args:
            return None
        namespace, *elements = args
        full_string = "".join(
            urllib.parse.quote(element)
            for element in elements
            if element is not None and element != ""
        )
        if full_string == "":
            return None
        try:
            return namespace[full_string]
        except Exception:
            # Best effort: report the faulty namespace and carry on, but no
            # longer swallow KeyboardInterrupt / SystemExit.
            print("ERROR "+str(isinstance(namespace, rdflib.Namespace))+" "+str(namespace))
            return None

    def add_triple(self, subject, predicate, object):
        """
        Wrapper to simplify triple addition to the graph without doing manual
        operations on the nodes every time.

        @param subject: rdflib.term.URIRef | rdflib.term.BNode
        @param predicate: rdflib.term.URIRef
        @param object: rdflib node (URIRef, BNode, Literal) added as is
                       | list or tuple, expanded through create_uri
                       | str, wrapped in a Literal
                       Objects of any other type are ignored.
        """
        # Node is the common base of URIRef, BNode and Literal. It must be
        # tested before str because these terms are str subclasses too.
        if isinstance(object, rdflib.term.Node):
            self.graph.add((subject, predicate, object))
        elif isinstance(object, (list, tuple)):
            uri = GraphWrapper.create_uri(*object)
            # create_uri returns None when it has nothing to build; the graph
            # only accepts rdflib terms, so there is no triple to add.
            if uri is not None:
                self.graph.add((subject, predicate, uri))
        elif isinstance(object, str):
            self.graph.add((subject, predicate, Literal(object)))

    def add_triples(self, dictionary):
        """
        Add a dictionary of triples where each key is a subject mapped to a
        list of tuples whose first element is the predicate and whose second
        element is the object.

        @param dictionary: dict
        """
        for subject, definers in dictionary.items():
            for definer in definers:
                self.add_triple(subject, definer[0], definer[1])

    def serialize(self, destination=None, format="xml", base=None, encoding=None, *args, **kwargs):
        """
        Serialize the wrapped graph by delegating to ``Graph.serialize``.

        @param destination: forwarded as is to Graph.serialize
        @param format: serialization format, "xml" by default
        @param base: forwarded as is to Graph.serialize
        @param encoding: forwarded as is to Graph.serialize
        @param args: extra positional arguments, forwarded unchanged
        @param kwargs: serializer specific keyword options, forwarded unchanged
        @return: whatever Graph.serialize returns
        """
        return self.graph.serialize(destination, format, base, encoding, *args, **kwargs)
from rdflib import Graph, Literal, BNode, DCAT, SOSA, SSN, RDF
import urllib.parse
from rdflib.namespace import *
import rdflib.term
class GraphWrapper():
    """
    Intermediate layer used to insert new triples in an rdflib graph.

    An empty ``Graph`` is created on construction and every (prefix, namespace)
    pair received in ``prefixes`` is bound to it.
    """

    def __init__(self, prefixes):
        """
        @param prefixes: iterable of (prefix, namespace) pairs to bind on the graph
        """
        self.prefixes = prefixes
        self.graph = Graph()
        self.__bind_graph__()

    def __bind_graph__(self):
        """Bind every (prefix, namespace) pair of ``self.prefixes`` on the graph."""
        for prefix in self.prefixes:
            self.graph.bind(prefix[0], prefix[1])

    @staticmethod
    def create_uri(*args):
        """
        Build a term by indexing a namespace with the concatenation of the
        percent-encoded elements that follow it.

        @param args: the namespace first (anything indexable by a string, e.g.
                     rdflib.Namespace), then the strings to concatenate;
                     None and "" elements are skipped
        @return: the result of ``namespace[joined]``, or None when there is
                 nothing to look up or when the lookup fails
        """
        if not args:
            return None
        namespace, *elements = args
        full_string = "".join(
            urllib.parse.quote(element)
            for element in elements
            if element is not None and element != ""
        )
        if full_string == "":
            return None
        try:
            return namespace[full_string]
        except Exception:
            # Best effort: report the faulty namespace and carry on, but no
            # longer swallow KeyboardInterrupt / SystemExit.
            print("ERROR "+str(isinstance(namespace, rdflib.Namespace))+" "+str(namespace))
            return None

    def add_triple(self, subject, predicate, object):
        """
        Wrapper to simplify triple addition to the graph without doing manual
        operations on the nodes every time.

        @param subject: rdflib.term.URIRef | rdflib.term.BNode
        @param predicate: rdflib.term.URIRef
        @param object: rdflib node (URIRef, BNode, Literal) added as is
                       | list or tuple, expanded through create_uri
                       | str, wrapped in a Literal
                       Objects of any other type are ignored.
        """
        # Node is the common base of URIRef, BNode and Literal. It must be
        # tested before str because these terms are str subclasses too.
        if isinstance(object, rdflib.term.Node):
            self.graph.add((subject, predicate, object))
        elif isinstance(object, (list, tuple)):
            uri = GraphWrapper.create_uri(*object)
            # create_uri returns None when it has nothing to build; the graph
            # only accepts rdflib terms, so there is no triple to add.
            if uri is not None:
                self.graph.add((subject, predicate, uri))
        elif isinstance(object, str):
            self.graph.add((subject, predicate, Literal(object)))

    def add_triples(self, dictionary):
        """
        Add a dictionary of triples where each key is a subject mapped to a
        list of tuples whose first element is the predicate and whose second
        element is the object.

        @param dictionary: dict
        """
        for subject, definers in dictionary.items():
            for definer in definers:
                self.add_triple(subject, definer[0], definer[1])

    def serialize(self, destination=None, format="xml", base=None, encoding=None, *args, **kwargs):
        """
        Serialize the wrapped graph by delegating to ``Graph.serialize``.

        @param destination: forwarded as is to Graph.serialize
        @param format: serialization format, "xml" by default
        @param base: forwarded as is to Graph.serialize
        @param encoding: forwarded as is to Graph.serialize
        @param args: extra positional arguments, forwarded unchanged
        @param kwargs: serializer specific keyword options, forwarded unchanged
        @return: whatever Graph.serialize returns
        """
        return self.graph.serialize(destination, format, base, encoding, *args, **kwargs)
\ No newline at end of file
import argparse
import csv
import decimal
import inspect
import json
import os
import rdflib.term
import re
import requests
import sys
import time
import urllib.parse
from datetime import date, timedelta, datetime
from ftplib import FTP
from netCDF4 import Dataset
from netCDF4 import Dataset
from rdflib import Graph, Literal, BNode, DCAT, SOSA, SSN, RDF, URIRef
from rdflib.namespace import *
from tqdm import tqdm
from helpers.graph_wrapper import GraphWrapper
if __name__ == "__main__":
    # Script entry point: read an ODV export converted to tab-separated text
    # and map each data row to triples describing the Sextant product.
    parser = argparse.ArgumentParser()
    parser.add_argument("destination", type=str, help="Absolute path where the converted files will be written")
    parser.add_argument("odvtxt", type=str, help="Absolute path odv converted to text")
    parser.add_argument("--limit", "-l", type=int, help="Limit the number of rows processed")
    args = parser.parse_args()

    # NOTE(review): the destination is normalised here but nothing is written
    # yet in the visible code (the graph is never serialized) - WIP.
    if not str.endswith(args.destination, "/"):
        args.destination += "/"

    # The product name is what follows "metadata_from_" in the file name,
    # without the ".txt" extension. str.replace returns a new string, so the
    # result has to be assigned back.
    product = re.sub(r".*metadata_from_", "", args.odvtxt)
    product = product.replace(".txt", "")

    # Sextant catalogue identifier of each known product
    identifiers = {
        "SDC_BAL_DATA_TS_V2": "6a881c9e-1d38-4edd-84d3-c6b212c27eb7"
    }
    if product not in identifiers:
        parser.error("Unknown product '" + product + "', known products: " + ", ".join(identifiers))

    SEXTANT = Namespace("https://sextant.ifremer.fr/Donnees/Catalogue#")
    NERC = Namespace("http://vocab.nerc.ac.uk/collection/")
    # Namespace IRIs as published by the W3C: the WGS84 vocabulary uses the
    # http scheme, and PROV-O terms live under /ns/prov#, not under the
    # address of the specification document.
    GEO = Namespace("http://www.w3.org/2003/01/geo/wgs84_pos#")
    PROV = Namespace("http://www.w3.org/ns/prov#")
    SOSA = Namespace("http://www.w3.org/ns/sosa/")
    SSN = Namespace("http://www.w3.org/ns/ssn/")

    prefixes = (
        ("sextant", SEXTANT),
        ("nerc", NERC),
        ("foaf", FOAF),
        ("dcat", DCAT),
        ("dct", DCTERMS),
        ("sosa", SOSA),
        ("ssn", SSN),
        ("geo", GEO),
        ("prov", PROV)
    )

    # GraphWrapper requires the prefixes to bind on its graph
    snt_graph = GraphWrapper(prefixes)
    product_uri = SEXTANT["/metadata/"+identifiers[product]]
    snt_graph.add_triple(product_uri, RDF.type, DCAT.Dataset)

    # newline="" is what the csv module expects from the files it reads
    with open(args.odvtxt, "r", newline="") as f:
        reader = csv.reader(f, delimiter="\t")
        fields = {}
        fields_row = True
        processed = 0
        for row in reader:
            # Bypass blank lines and the "//" header
            if not row or row[0].startswith("//"):
                continue

            # The first remaining row holds the column labels: map each label
            # to its index
            if fields_row:
                fields = {label: index for index, label in enumerate(row)}
                fields_row = False
                continue

            # Honour --limit: stop once the requested number of data rows
            # has been processed
            if args.limit is not None and processed >= args.limit:
                break
            processed += 1

            # Station
            station = BNode(row[fields["Station name"]])
            snt_graph.add_triple(station, DCTERMS.identifier, Literal(row[fields["Station ID"]]))
            snt_graph.add_triple(station, DCTERMS.title, Literal(row[fields["Station name"]]))
            snt_graph.add_triple(station, DCTERMS.date, Literal(row[fields["yyyy-mm-ddThh:mm:ss.sss"]]))
            snt_graph.add_triple(station, DCTERMS.alternative, Literal(row[fields["Alternative station name"]]))
            # The WGS84 vocabulary names these properties lat and long
            snt_graph.add_triple(station, GEO.lat, Literal(row[fields["Latitude [degrees_north]"]]))
            snt_graph.add_triple(station, GEO.long, Literal(row[fields["Longitude [degrees_east]"]]))

            # CDI
            cdi = URIRef("https://cdi.seadatanet.org/report/"+row[fields["CDI-record id"]])
            snt_graph.add_triple(product_uri, DCTERMS.hasPart, cdi)
            snt_graph.add_triple(station, DCTERMS.isReferencedBy, cdi)
            snt_graph.add_triple(cdi, DCTERMS.identifier, Literal(row[fields["CDI-record id"]]))
            snt_graph.add_triple(cdi, DCTERMS.created, Literal(row[fields["CDI-record creation date"]]))

            # Agents
            agent = BNode(row[fields["CDI-partner"]])
            snt_graph.add_triple(agent, RDF.type, DCTERMS.Agent)
            snt_graph.add_triple(agent, DCTERMS.title, row[fields["CDI-partner"]])
            snt_graph.add_triple(cdi, DCTERMS.creator, agent)

            # Cruise
            cruise = BNode(row[fields["Cruise name"]])
            snt_graph.add_triple(cruise, DCTERMS.title, Literal(row[fields["Cruise name"]]))
            snt_graph.add_triple(cruise, DCTERMS.alternative, Literal(row[fields["Alternative cruise name"]]))
            snt_graph.add_triple(cruise, RDF.type, PROV.Activity)
            snt_graph.add_triple(cruise, PROV.startedAtTime, Literal(row[fields["Cruise start date"]]))

            # Platform
            platform = BNode(row[fields["Station name"]]+"Platform")
            snt_graph.add_triple(platform, RDF.type, SOSA.Platform)

            # Sensors: one node per "|" separated P01 code
            sensors = row[fields["P01 Codes in Originator File"]]
            for unit in sensors.split("|"):
                code = unit.strip()
                # An empty cell or a stray separator carries no code
                if not code:
                    continue
                sensor = BNode(code)
                snt_graph.add_triple(sensor, SOSA.observes, URIRef("http://vocab.nerc.ac.uk/collection/P01/current/"+code))
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment