Commit 4aab4608 authored by ALVISET's avatar ALVISET
Browse files

Fixed issues with the conversion from the API and started implementing extraction of provenance data from netCDF files

parent 9c2fd474
......@@ -14,7 +14,8 @@ import sys
import urllib.parse
import argparse
import inspect
from netCDF4 import Dataset
import os
def truncate(f, n):
"""Truncates/pads a float f to n decimal places without rounding"""
......@@ -68,16 +69,19 @@ def add_triples(graph, triple_list):
"""
for subject in triple_list.keys():
for triple in triple_list[subject]:
if isinstance(triple[1], rdflib.term.URIRef):
graph.add((subject, triple[0], triple[1]))
else:
if type(triple[1][0]) == "string":
triple[1][0] = urllib.parse.quote(triple[1][0])
if triple[1][0] != None:
graph.add((subject, triple[0], Literal(triple[1][0], datatype=triple[1][1])))
try:
if isinstance(triple[1], rdflib.term.URIRef) and triple[1] != None:
graph.add((subject, triple[0], triple[1]))
else:
graph.add((subject, triple[0], Literal(triple[1][0])))
list(graph[:])
if type(triple[1][0]) == "string":
triple[1][0] = urllib.parse.quote(triple[1][0])
if len(triple[1]) > 1 and triple[1][0] != None:
graph.add((subject, triple[0], Literal(triple[1][0], datatype=triple[1][1])))
else:
graph.add((subject, triple[0], Literal(triple[1][0])))
except:
print("Error while adding triple "+str(triple))
# list(graph[:])
def create_uri(namespace, *args):
    """Build a URI in *namespace* from the URL-quoted concatenation of *args*.

    Args that are None are skipped.  If every arg is None (or no args are
    given) the function returns None instead of an empty-suffix URI, so
    callers can pass the result straight into a triple list and have the
    missing value filtered out downstream.

    :param namespace: mapping-like namespace (e.g. an rdflib Namespace)
                      indexed by the concatenated string
    :param args: string fragments to quote and join, possibly None
    :return: namespace[joined_string], or None when nothing was joined
    """
    # `is None` (identity) rather than `== None`; quote each fragment so the
    # result is a valid URI suffix.
    full_string = "".join(
        urllib.parse.quote(element) for element in args if element is not None
    )
    if full_string == "":
        return None
    return namespace[full_string]
def calculate_age(timestamp):
    """Return the age in years of *timestamp*, truncated to two decimals.

    :param timestamp: string like "2001-05-17T12:00:00.000+0000"
                      (the fixed API timestamp format — fractional seconds
                      are always ".000", offset always "+0000")
    :return: decimal.Decimal with two decimal places, rounded down
    :raises ValueError: if *timestamp* does not match the expected format
    """
    fdate = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.000+0000")
    # Subtracting two datetimes always yields a timedelta, never None, so the
    # old `!= None` guard was dead code.  The previous
    # time.mktime(...timetuple()) round-trip was also a no-op here (the format
    # pins microseconds to zero), so we subtract the parsed datetime directly.
    delta = datetime.now() - fdate
    # 365.25 accounts for leap years; truncate (ROUND_DOWN), don't round.
    age = delta.days / 365.25
    return decimal.Decimal(age).quantize(decimal.Decimal('0.01'), rounding=decimal.ROUND_DOWN)
def extract_netcdf_infos(wmo, cycle_infos, file_path, cycle_files=None):
    """Work-in-progress: read provenance info from a float's netCDF file.

    Currently only opens the file and reads DATE_CREATION; returns None in
    all cases.  NOTE(review): `wmo`, `cycle_infos` and `cycle_files` are not
    used yet — presumably they will key the extracted provenance; confirm
    against the (commented-out) call site before extending.
    """
    try:
        # NOTE(review): the Dataset handle is never closed — add a close()
        # or context manager once this function is fleshed out.
        rootgrp = Dataset(str(file_path), "r")
        # DATE_CREATION is stored as a character array; the sibling helper
        # presumably joins it into a plain string — TODO confirm.
        date = numpy_chararray_to_string(rootgrp.variables["DATE_CREATION"][:])
        print()  # leftover debug output?
    except FileNotFoundError:
        # Missing file: silently skip (best-effort, caller gets None).
        return
def map_datamode_to_uri(files, wmo, nb):
    """Map each file's data-mode letter (its first character) to a file URI.

    :param files: iterable of profile file names (e.g. "R6901234_012.nc")
    :param wmo: float WMO code as a string
    :param nb: zero-padded cycle number as a string
    :return: dict {data_mode_letter: ARGO file URI}
    """
    return {name[0]: ARGO["file" + name[0] + wmo + nb] for name in files}
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("destination", type=str, help="Absolute path where the converted files will be written")
parser.add_argument("netCDF", type=str, help="Absolute path of the dac folder location")
parser.add_argument("--dac", type=str, nargs="*", help="Convert only floats metadata from the specified dacs")
parser.add_argument("--limit", "-l", type=int, help="Limit the number of floats converted per DAC")
parser.add_argument("--local", "-lo", type=str,
help="Fetch WMO codes of floats to convert from a local copy of the dac folder")
args = parser.parse_args()
if not str.endswith(args.destination, "/"):
......@@ -104,6 +139,7 @@ if __name__ == "__main__":
ARGO = Namespace("http://www.argodatamgt.org/argo-ontology#")
NERC = Namespace("http://vocab.nerc.ac.uk/collection/")
GEO = Namespace("https://www.w3.org/2003/01/geo/wgs84_pos#")
PROV = Namespace("https://www.w3.org/TR/prov-o/")
prefixes = (
("argo", ARGO),
("nerc", NERC),
......@@ -113,13 +149,14 @@ if __name__ == "__main__":
("sosa", SOSA),
("ssn", SSN),
("geo", GEO),
("prov", PROV)
)
argo_floats = {}
# DCAT description
# argo_catalog = ARGO["catalog"+BNode()]
# argo_catalog = ARGO["catalog"+BNode()]
# g.add((argo_catalog, RDF.type, DCAT.Catalog))
# g.add((argo_catalog, DCTERMS.title, Literal("Argo program catalog")))
# g.add((argo_catalog, DCTERMS.description, Literal("""
......@@ -142,7 +179,11 @@ if __name__ == "__main__":
dac_path = "/ifremer/argo/dac/"
ftp_access.cwd(dac_path)
for dac in ftp_access.nlst():
dac_folder = ftp_access.nlst()
if args.local != None:
dac_folder = os.listdir(args.local)
for dac in dac_folder:
check = True
# print(args)
if args.dac != None:
......@@ -154,8 +195,13 @@ if __name__ == "__main__":
break
if not check:
continue
ftp_access.cwd("{}/{}/".format(dac_path, dac))
floats_folders = ftp_access.nlst()
floats_folders = []
if args.local != None:
floats_folders = os.listdir(args.local+"/"+dac)
else:
ftp_access.cwd("{}/{}/".format(dac_path, dac))
floats_folders = ftp_access.nlst()
floats_folders.reverse()
progress = tqdm(total=len(floats_folders), position=0, leave=True, file=sys.stdout)
part = 1
dac_uri = ARGO[dac]
......@@ -185,6 +231,7 @@ if __name__ == "__main__":
if float_info == None:
progress.write("Float {} metadata is nonexistent. Skipping...".format(afloat))
continue
pi_uri = create_uri(ARGO, float_info["deployment"]["principalInvestigatorName"])
float_triples = {
afloat_uri: [
......@@ -193,7 +240,7 @@ if __name__ == "__main__":
(ARGO.activity, activity_uri),
(ARGO.dac, dac_uri),
(ARGO.wmoCode, (afloat, XSD.string)),
(ARGO.owner, (create_uri(ARGO, float_info["owner"]), XSD.string)),
(ARGO.owner, Literal(float_info["owner"]), None),
(ARGO.maker, create_uri(NERC, "R24/current/", float_info["maker"])),
(ARGO.type, create_uri(NERC, "R23/current/", float_info["platform"]["type"])),
(ARGO.transmissionSystem, create_uri(NERC, "R10/current/", float_info["transmissionSystem"]))
......@@ -204,36 +251,19 @@ if __name__ == "__main__":
(GEO.latitude, (float_info["deployment"]["lat"], XSD.float)),
(GEO.longitude, (float_info["deployment"]["lon"], XSD.float)),
(ARGO.launchDate, (float_info["deployment"]["launchDate"], XSD.dateTime)),
(ARGO.deployedByShip, (float_info["deployment"]["platform"], XSD.string))
(ARGO.deployedByShip, (float_info["deployment"]["platform"], XSD.string)),
(ARGO.principalInvestigator, pi_uri)
],
pi_uri: [
(RDF.type, FOAF.Person),
(FOAF.name, (float_info["deployment"]["principalInvestigatorName"], XSD.string))
],
dac_uri: [],
}
# g.add((afloat_uri, RDF.type, ARGO.ArgoFloat))
# g.add((afloat_uri, SSN.inDeployment, deployment_uri))
# g.add((afloat_uri, ARGO.activity, activity_uri))
# g.add((afloat_uri, ARGO.dac, dac_uri))
# g.add((afloat_uri, ARGO.wmoCode, Literal(afloat, datatype=XSD.string)))
# owner = float_info["owner"]
# if owner != None:
# g.add((afloat_uri, ARGO.owner, Literal(ARGO[urllib.parse.quote(owner)], datatype=XSD.string)))
# maker = float_info["maker"]
# if maker != None:
# g.add((afloat_uri, ARGO.maker,
# NERC["R24/current/" + urllib.parse.quote(maker)]))
# platform_type = float_info["platform"]["type"]
# if platform_type != None:
# g.add((afloat_uri, ARGO.type,
# NERC["R23/current/" + urllib.parse.quote(platform_type)]))
# transmission = float_info["transmissionSystem"]
# if transmission != None:
# g.add((afloat_uri, ARGO.transmissionSystem,
# NERC["R10/current/" + urllib.parse.quote(transmission)]))
# Sensor node
for sensor in float_info["sensors"]:
sensor_uri = ARGO["sensor" + BNode()]
sensor_uri = create_uri(ARGO, "sensor", sensor["serial"])
float_triples[sensor_uri] = [
(RDF.type, SOSA.Sensor),
(ARGO.type, create_uri(NERC, "R25/current/", sensor["id"], "/")),
......@@ -242,63 +272,17 @@ if __name__ == "__main__":
]
float_triples[afloat_uri].append((SOSA.hosts, sensor_uri))
# g.add((sensor_uri, RDF.type, SOSA.Sensor))
# g.add((afloat_uri, SOSA.hosts, sensor_uri))
# if sensor["id"] != None:
# g.add((sensor_uri, ARGO.type, NERC["R25/current/" + urllib.parse.quote(sensor["id"]) + "/"]))
# if sensor["maker"] != None:
# g.add((sensor_uri, ARGO.maker, NERC["R26/current/" + urllib.parse.quote(sensor["maker"]) + "/"]))
# if sensor["model"] != None:
# g.add((sensor_uri, ARGO.model, NERC["R27/current/" + urllib.parse.quote(sensor["model"]) + "/"]))
for calib in float_info["calibrations"]:
if calib["dimLevel"] == sensor["dimLevel"]:
float_triples[sensor_uri].append(
(SOSA.observes, create_uri(NERC, "R03/current/", calib["parameter"]))
)
# g.add((sensor_uri, SOSA.observes,
# NERC["R03/current/" + urllib.parse.quote(calib["parameter"])]))
# Deployment node
# g.add((deployment_uri, RDF.type, SSN.Deployment))
pi_name = float_info["deployment"]["principalInvestigatorName"]
if pi_name != None:
pi_uri = create_uri(ARGO, pi_name)
# g.add((pi_uri, RDF.type, FOAF.Person))
# g.add((pi_uri, FOAF.name, Literal(pi_name)))
# g.add((deployment_uri, ARGO.principalInvestigator, pi_uri))
float_triples[deployment_uri].append(
(ARGO.principalInvestigator, pi_uri)
)
float_triples[pi_uri] = [
(RDF.type, FOAF.Person),
(FOAF.name, (pi_name, XSD.string))
]
cruise_id = float_info["deployment"]["cruiseName"]
# if cruise_id != None:
# g.add((deployment_uri, ARGO.cruise, Literal(cruise_id)))
# g.add((deployment_uri, GEO.latitude,
# Literal(float_info["deployment"]["lat"], datatype=XSD.float)))
# g.add((deployment_uri, GEO.longitude,
# Literal(float_info["deployment"]["lon"], datatype=XSD.float)))
# formatted_datetime = float_info["deployment"]["launchDate"]
# g.add((deployment_uri, ARGO.launchDate,
# Literal(formatted_datetime, datatype=XSD.dateTime)))
# ship = float_info["deployment"]["platform"]
# if ship != None:
# g.add((deployment_uri, ARGO.deployedByShip, Literal(ship, datatype=XSD.string)))
# Activity node
# g.add((activity_uri, RDF.type, ARGO.ActivityData))
float_triples[activity_uri] = [(RDF.type, ARGO.ActivityData)]
try:
fdate = datetime.strptime(float_info["earliestCycle"]["firstStation"]["date"], "%Y-%m-%dT%H:%M:%S.000+0000")
timedelta = (datetime.now() - datetime.fromtimestamp(time.mktime(fdate.timetuple())))
if timedelta != None:
age = timedelta.days / 365.25
age = decimal.Decimal(age).quantize(decimal.Decimal('0.01'), rounding=decimal.ROUND_DOWN)
age = calculate_age(float_info["earliestCycle"]["firstStation"]["date"])
float_triples[activity_uri].append((ARGO.age, (age, XSD.float)))
# g.add((activity_uri, ARGO.age, Literal(age, datatype=XSD.float)))
except TypeError:
progress.write("No cycles found for float {}".format(afloat))
ftp_access.cwd("{}/{}/{}/".format(dac_path, dac, afloat))
......@@ -310,8 +294,6 @@ if __name__ == "__main__":
(ARGO.trajectoryData,
("ftp://ftp.ifremer.fr/ifremer/argo/dac/{}/{}/{}".format(dac, afloat, traj), None))
)
# g.add((activity_uri, ARGO.trajectoryData,
# Literal("ftp://ftp.ifremer.fr/ifremer/argo/dac/{}/{}/{}".format(dac, afloat, traj))))
regex = re.compile(str(afloat) + ".*" + "prof.nc")
station_files = list(filter(regex.match, base_files))
for prof in station_files:
......@@ -319,8 +301,6 @@ if __name__ == "__main__":
(ARGO.stationData,
("ftp://ftp.ifremer.fr/ifremer/argo/dac/{}/{}/{}".format(dac, afloat, prof), None))
)
# g.add((activity_uri, ARGO.stationData,
# Literal("ftp://ftp.ifremer.fr/ifremer/argo/dac/{}/{}/{}".format(dac, afloat, prof))))
# Cycle nodes
cycleIds = {}
......@@ -340,14 +320,17 @@ if __name__ == "__main__":
nb = int(cycle["id"])
file_nb = (3 - len(str(nb))) * "0" + str(nb)
profiles[nb] = []
cycle_uri = ARGO["cycle" + BNode()]
cycle_uri = ARGO["cycle" + afloat + file_nb]
float_triples[afloat_uri].append((ARGO.cycle, cycle_uri))
# g.add((afloat_uri, ARGO.cycle, cycle_uri))
float_triples[cycle_uri] = []
float_triples[cycle_uri] = [
(RDF.type, ARGO.Cycle),
(RDF.type, PROV.Activity)
]
regex = re.compile(".*"+str(afloat)+"_"+str(file_nb)+".nc")
this_cycle_files = list(filter(regex.match, cycle_files))
if len(this_cycle_files) == 0:
# print("Float {}, cycle {} does not have cycle files ?".format(afloat, file_nb))
continue
......@@ -359,41 +342,48 @@ if __name__ == "__main__":
# for basin in cycleInfo["basins"]:
# g.add((cycle_uri, ARGO.basin, Literal(basin)))
file_uri = ARGO["file" + BNode()]
float_triples[file_uri] = [(RDF.type, ARGO.File)]
# g.add((file_uri, RDF.type, ARGO.File))
if nb == float_info["latestCycle"]["id"]:
float_triples[activity_uri].append((ARGO.lastCycle, cycle_uri))
# g.add((activity_uri, ARGO.lastCycle, cycle_uri))
netcdf_infos = []
file_nodes = map_datamode_to_uri(this_cycle_files, afloat, file_nb)
for cycle_file in this_cycle_files:
# file_uri = ARGO["file" + BNode()]
if len(file_nodes) == 0: break
file_uri = file_nodes[cycle_file[0]]
file_nodes[cycle_file] = file_uri
float_triples[file_uri] = [
(RDF.type, ARGO.File),
(RDF.type, PROV.Entity),
(RDF.type, DCTERMS.Dataset)
]
float_triples[file_uri].append((ARGO.accessURL,
("ftp://ftp.ifremer.fr/ifremer/argo/dac/{}/{}/profiles/{}".format(dac, afloat, cycle_file), None)))
# g.add((file_uri, ARGO.accessURL, Literal(
# "ftp://ftp.ifremer.fr/ifremer/argo/dac/{}/{}/profiles/{}".format(dac, afloat, cycle_file))))
if cycle_file.startswith("R"):
float_triples[cycle_uri].append((ARGO.coreArgoProfile, file_uri))
# g.add((cycle_uri, ARGO.coreArgoProfile, file_uri))
# float_triples[file_uri].append((PROV.wasGeneratedBy, cycle_uri))
elif cycle_file.startswith("D"):
float_triples[cycle_uri].append((ARGO.coreArgoProfile, file_uri))
# g.add((cycle_uri, ARGO.coreArgoProfile, file_uri))
elif cycle_file.startswith("B"):
float_triples[cycle_uri].append((ARGO.bcgArgoProfile, file_uri))
# g.add((cycle_uri, ARGO.bcgArgoProfile, file_uri))
elif cycle_file.startswith("S"):
float_triples[cycle_uri].append((ARGO.syntheticArgoProfile, file_uri))
# g.add((cycle_uri, ARGO.syntheticArgoProfile, file_uri))
float_triples[dac_uri].append((ARGO.hostsFile, file_uri))
# if not cycle_file.startswith("R"):
# if file_nodes["R"] != None:
# float_triples[file_uri].append((PROV.wasRevisionOf, file_nodes["R"]))
float_triples[file_uri].append((ARGO.dataMode, [Literal(cycle_file[0])]))
# cycle_nc = extract_netcdf_infos(afloat, cycle, "{}{}/{}/profiles/{}{}_{}.nc".format(args.netCDF, dac, afloat, cycle_file[0], afloat, file_nb), this_cycle_files)
# netcdf_infos.append(cycle_nc)
float_triples[dac_uri].append((ARGO.hostsFile, file_uri))
float_triples[cycle_uri].append((ARGO["number"], (nb, XSD.int)))
float_triples[cycle_uri].append((ARGO.startDate, (cycle["startDate"], None)))
float_triples[cycle_uri].append((GEO.latitude, (cycle["lat"], None)))
float_triples[cycle_uri].append((GEO.longitude, (cycle["lon"], None)))
# g.add((dac_uri, ARGO.hostsFile, file_uri))
# g.add((cycle_uri, ARGO["number"], Literal(nb, datatype=XSD.int)))
# g.add((cycle_uri, ARGO.startDate, Literal(cycle["startDate"])))
# g.add((cycle_uri, GEO.latitude, Literal(cycle["lat"])))
# g.add((cycle_uri, GEO.longitude, Literal(cycle["lon"])))
add_triples(g, float_triples)
a += 1
progress.update(1)
......@@ -410,20 +400,6 @@ if __name__ == "__main__":
with open("{}argo_floats_generated_{}_{}.ttl".format(args.destination, dac, part), "w") as ttl:
ttl.write("@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .\n")
ttl.write(g.serialize(format="turtle").decode("utf-8"))
# with open("{}argo_floats_generated_{}_{}_2.ttl".format(args.destination, dac, part), "w") as ttl:
# ttl.write("@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .\n")
# ttl.write("@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .\n")
# ttl.write(g2.serialize(format="turtle").decode("utf-8"))
ftp_access.close()
# with open(args.ttlFile, "a") as ttl:
# for cycle in range(1, len(metaData)):
# print("Writing...")
# ttl.write("argo:cycleexample" + str(cycle) + " a argo:Cycle;\n")
# # ttl.write("\targo:number \"" + str(cycle) + "\"^^xsd:int;\n")
# # ttl.write("\targo:startCycleDate \"" + str(cycle) + "\"^^xsd:int;\n")
# ttl.write("\targo:latitude \"" + str(metaData[cycle][0]) + "\"^^xsd:float;\n")
# ttl.write("\targo:longitude \"" + str(metaData[cycle][1]) + "\"^^xsd:float.\n\n")
from rdflib import Graph, Literal, BNode, DCAT, SOSA, SSN, RDF
from rdflib.namespace import *
import rdflib.term
class GraphWrapper():
    """Intermediate layer for inserting new triples into an rdflib Graph.

    On construction it creates an empty Graph and binds every
    (prefix, namespace) pair it was given.
    """

    def __init__(self, prefixes):
        # Sequence of (prefix_name, Namespace) pairs to register on the graph.
        self.prefixes = prefixes
        self.graph = Graph()
        self.__bind_graph__()

    def __bind_graph__(self):
        # Register each prefix so serialized output uses short names.
        for name, namespace in self.prefixes:
            self.graph.bind(name, namespace)

    def add_triple(self, subject, predicate, object):
        # Stub — insertion logic not implemented yet.
        pass
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment