Commit 77f97e5e authored by ALVISET

added graph wrapper

parent 2b6e0dd4
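Note: the imported graph_wrapper module is not included in this commit. A minimal sketch of a GraphWrapper consistent with the call sites below; the constructor and the add_triple/add_triples signatures are inferred from usage, not taken from the actual file:

# graph_wrapper.py -- hypothetical sketch, inferred from the call sites in this commit
import urllib.parse

import rdflib.term
from rdflib import Graph, Literal, Namespace


class GraphWrapper:
    def __init__(self, prefixes):
        # prefixes: iterable of (prefix, Namespace) pairs, as built in __main__
        self.graph = Graph()
        for prefix, namespace in prefixes:
            self.graph.bind(prefix, namespace)

    def add_triple(self, subject, predicate, obj):
        # Tuples starting with a Namespace are assumed to be URI parts,
        # e.g. (NERC, "R24/current/", maker); other tuples are (value, datatype).
        if isinstance(obj, tuple):
            if isinstance(obj[0], Namespace):
                path = "".join(urllib.parse.quote(str(p)) for p in obj[1:] if p is not None)
                obj = obj[0][path]
            else:
                obj = Literal(obj[0], datatype=obj[1] if len(obj) > 1 else None)
        elif not isinstance(obj, (rdflib.term.URIRef, Literal)):
            obj = Literal(obj)
        self.graph.add((subject, predicate, obj))

    def add_triples(self, triples_by_subject):
        # triples_by_subject: {subject: [(predicate, object), ...]}, mirroring
        # the dict shape previously passed to add_triples(graph, triple_list)
        for subject, pairs in triples_by_subject.items():
            for predicate, obj in pairs:
                self.add_triple(subject, predicate, obj)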
......@@ -128,4 +128,5 @@ dmypy.json
# Pyre type checker
.pyre/
.idea/
\ No newline at end of file
.idea/
venv-bash/
......@@ -16,6 +16,7 @@ from netCDF4 import Dataset
from rdflib import Graph, Literal, BNode, DCAT, SOSA, SSN, RDF
from rdflib.namespace import *
from tqdm import tqdm
from graph_wrapper import GraphWrapper
def truncate(f, n):
......@@ -26,16 +27,13 @@ def truncate(f, n):
i, p, d = s.partition('.')
return '.'.join([i, (d + '0' * n)[:n]])
def numpy_chararray_to_string(char_array):
return char_array[:].tobytes().strip().decode("UTF-8")
def format_netcdf_datetime_to_xsd(datetime):
return "".join([datetime[0:4], "-", datetime[4:6], "-", datetime[6:8],
"T", datetime[8:10], ":", datetime[10:12], ":", datetime[12:14]])
def append_cycle_date_triples(g, profgrp, cycle, prefix):
"""
@type profile: string
......@@ -82,7 +80,6 @@ def add_triples(graph, triple_list):
graph.add((subject, triple[0], Literal(triple[1][0])))
except:
print("Error while adding triple "+str(triple))
# list(graph[:])
def create_uri(namespace, *args):
"""
......@@ -116,12 +113,23 @@ def extract_netcdf_infos(wmo, cycle_infos, file_path, cycle_files=None):
except FileNotFoundError:
return
def map_datamode_to_uri(files, wmo, nb):
result = {}
for file in files:
result[file[0]] = ARGO["file" + file[0] + wmo + nb]
return result
def map_filetype_and_datamode(cycle_files, wmo):
    """
    @type cycle_files: list
    @type wmo: string
    @return: dict
    """
    result = {}
    datamode = ""
    name_length = len(cycle_files[0])
    cycle_number = cycle_files[0][name_length - 6:name_length - 3]
    for file in cycle_files:
        # Argo file naming: R* = real-time, D* = delayed-mode
        if file.startswith("R"):
            datamode = "Realtime"
        else:
            datamode = "Delayed"
    # result is still empty: triples for file type / datamode are not yet collected,
    # so the add_triples call on it is currently a no-op
    return result
if __name__ == "__main__":
parser = argparse.ArgumentParser()
......@@ -207,7 +215,10 @@ if __name__ == "__main__":
part = 1
dac_uri = ARGO[dac]
g = create_new_argo_graph(dac, prefixes)
argoGraph = GraphWrapper(prefixes)
argoGraph.add_triple(ARGO[dac], RDF.type, ARGO.Datacenter)
argoGraph.add_triple(ARGO[dac], RDFS.label, Literal(dac, datatype=XSD.string))
# g = create_new_argo_graph(dac, prefixes)
# g2 = create_new_argo_graph(dac, prefixes)
print("DAC: {}".format(dac))
a = 1
......@@ -234,7 +245,8 @@ if __name__ == "__main__":
continue
pi_uri = create_uri(ARGO, float_info["deployment"]["principalInvestigatorName"])
float_triples = {
# float_triples = {
argoGraph.add_triples({
afloat_uri: [
(RDF.type, ARGO.ArgoFloat),
(SSN.inDeployment, deployment_uri),
......@@ -242,17 +254,17 @@ if __name__ == "__main__":
(ARGO.dac, dac_uri),
(ARGO.wmoCode, (afloat, XSD.string)),
(ARGO.owner, (float_info["owner"], None)),
(ARGO.maker, create_uri(NERC, "R24/current/", float_info["maker"])),
(ARGO.type, create_uri(NERC, "R23/current/", float_info["platform"]["type"])),
(ARGO.transmissionSystem, create_uri(NERC, "R10/current/", float_info["transmissionSystem"]))
(ARGO.maker, (NERC, "R24/current/", float_info["maker"])),
(ARGO.type, (NERC, "R23/current/", float_info["platform"]["type"])),
(ARGO.transmissionSystem, (NERC, "R10/current/", float_info["transmissionSystem"]))
],
deployment_uri: [
(RDF.type, SSN.Deployment),
(ARGO.cruise, (float_info["deployment"]["cruiseName"], None)),
(GEO.latitude, (float_info["deployment"]["lat"], XSD.float)),
(GEO.longitude, (float_info["deployment"]["lon"], XSD.float)),
(ARGO.launchDate, (float_info["deployment"]["launchDate"], XSD.dateTime)),
(ARGO.deployedByShip, (float_info["deployment"]["platform"], XSD.string)),
(ARGO.cruise, Literal(float_info["deployment"]["cruiseName"])),
(GEO.latitude, Literal(float_info["deployment"]["lat"], datatype=XSD.float)),
(GEO.longitude, Literal(float_info["deployment"]["lon"], datatype=XSD.float)),
(ARGO.launchDate, Literal(float_info["deployment"]["launchDate"], datatype=XSD.dateTime)),
(ARGO.deployedByShip, Literal(float_info["deployment"]["platform"], datatype=XSD.string)),
(ARGO.principalInvestigator, pi_uri)
],
pi_uri: [
......@@ -260,30 +272,30 @@ if __name__ == "__main__":
(FOAF.name, (float_info["deployment"]["principalInvestigatorName"], XSD.string))
],
dac_uri: [],
}
})
# Sensor node
for sensor in float_info["sensors"]:
sensor_uri = create_uri(ARGO, "sensor", sensor["serial"])
float_triples[sensor_uri] = [
argoGraph.add_triples({
sensor_uri: [
(RDF.type, SOSA.Sensor),
(ARGO.type, create_uri(NERC, "R25/current/", sensor["id"], "/")),
(ARGO.maker, create_uri(NERC, "R26/current/", sensor["maker"], "/")),
(ARGO.model, create_uri(NERC, "R27/current/", sensor["model"], "/"))
]
float_triples[afloat_uri].append((SOSA.hosts, sensor_uri))
(ARGO.type, (NERC, "R25/current/", sensor["id"], "/")),
(ARGO.maker, (NERC, "R26/current/", sensor["maker"], "/")),
(ARGO.model, (NERC, "R27/current/", sensor["model"], "/"))
],
afloat_uri: [(SOSA.hosts, sensor_uri)]
})
for calib in float_info["calibrations"]:
if calib["dimLevel"] == sensor["dimLevel"]:
float_triples[sensor_uri].append(
(SOSA.observes, create_uri(NERC, "R03/current/", calib["parameter"]))
)
argoGraph.add_triple(sensor_uri, SOSA.observes, (NERC, "R03/current/", calib["parameter"]))
# Activity node
float_triples[activity_uri] = [(RDF.type, ARGO.ActivityData)]
argoGraph.add_triple(activity_uri, RDF.type, ARGO.ActivityData)
try:
age = calculate_age(float_info["earliestCycle"]["firstStation"]["date"])
float_triples[activity_uri].append((ARGO.age, (age, XSD.float)))
argoGraph.add_triple(activity_uri, ARGO.age, Literal(age, datatype=XSD.float))
except TypeError:
progress.write("No cycles found for float {}".format(afloat))
ftp_access.cwd("{}/{}/{}/".format(dac_path, dac, afloat))
......@@ -291,17 +303,15 @@ if __name__ == "__main__":
regex = re.compile(str(afloat) + ".*" + "traj.nc")
traj_files = list(filter(regex.match, base_files))
for traj in traj_files:
float_triples[activity_uri].append(
(ARGO.trajectoryData,
("ftp://ftp.ifremer.fr/ifremer/argo/dac/{}/{}/{}".format(dac, afloat, traj), None))
)
argoGraph.add_triple(activity_uri,
ARGO.trajectoryData,
Literal("ftp://ftp.ifremer.fr/ifremer/argo/dac/{}/{}/{}".format(dac, afloat, traj)))
regex = re.compile(str(afloat) + ".*" + "prof.nc")
station_files = list(filter(regex.match, base_files))
for prof in station_files:
float_triples[activity_uri].append(
(ARGO.stationData,
("ftp://ftp.ifremer.fr/ifremer/argo/dac/{}/{}/{}".format(dac, afloat, prof), None))
)
argoGraph.add_triple(activity_uri,
ARGO.stationData,
Literal("ftp://ftp.ifremer.fr/ifremer/argo/dac/{}/{}/{}".format(dac, afloat, prof)))
# Cycle nodes
cycleIds = {}
......@@ -322,16 +332,15 @@ if __name__ == "__main__":
file_nb = (3 - len(str(nb))) * "0" + str(nb)
profiles[nb] = []
cycle_uri = ARGO["cycle" + afloat + file_nb]
float_triples[afloat_uri].append((ARGO.cycle, cycle_uri))
float_triples[cycle_uri] = [
(RDF.type, ARGO.Cycle),
(RDF.type, PROV.Activity)
]
argoGraph.add_triple(afloat_uri, ARGO.cycle, cycle_uri)
argoGraph.add_triples({
cycle_uri: [
(RDF.type, ARGO.Cycle),
(RDF.type, PROV.Activity)
]})
regex = re.compile(".*"+str(afloat)+"_"+str(file_nb)+".nc")
this_cycle_files = list(filter(regex.match, cycle_files))
if len(this_cycle_files) == 0:
# print("Float {}, cycle {} does not have cycle files ?".format(afloat, file_nb))
continue
......@@ -346,10 +355,10 @@ if __name__ == "__main__":
if nb == float_info["latestCycle"]["id"]:
float_triples[activity_uri].append((ARGO.lastCycle, cycle_uri))
argoGraph.add_triple(activity_uri, ARGO.lastCycle, cycle_uri)
netcdf_infos = []
file_nodes = map_datamode_to_uri(this_cycle_files, afloat, file_nb)
argoGraph.add_triples(map_filetype_and_datamode(this_cycle_files, afloat))
for cycle_file in this_cycle_files:
# file_uri = ARGO["file" + BNode()]
if len(file_nodes) == 0: break
......
import argparse
import decimal
import inspect
import json
import os
import rdflib.term
import re
import requests
import sys
import time
import urllib.parse
from datetime import date, timedelta, datetime
from ftplib import FTP
from netCDF4 import Dataset
from rdflib import Graph, Literal, BNode, DCAT, SOSA, SSN, RDF
from rdflib.namespace import *
from tqdm import tqdm
def truncate(f, n):
    """Truncates/pads a float f to n decimal places without rounding"""
    s = '{}'.format(f)
    if 'e' in s or 'E' in s:
        return '{0:.{1}f}'.format(f, n)
    i, p, d = s.partition('.')
    return '.'.join([i, (d + '0' * n)[:n]])
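# Examples: truncate(3.14159, 2) -> '3.14'; truncate(2.5, 4) -> '2.5000'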
def numpy_chararray_to_string(char_array):
    # tobytes() replaces the deprecated numpy tostring()
    return char_array[:].tobytes().strip().decode("UTF-8")
def format_netcdf_datetime_to_xsd(dt_string):
    # dt_string renamed from "datetime" to avoid shadowing the datetime import
    return "".join([dt_string[0:4], "-", dt_string[4:6], "-", dt_string[6:8],
                    "T", dt_string[8:10], ":", dt_string[10:12], ":", dt_string[12:14]])
def append_cycle_date_triples(g, profgrp, cycle, prefix):
    """
    @type g: Graph
    @type profgrp: Dataset
    @type cycle: rdflib.term.URIRef
    @type prefix: Namespace
    """
    # JULD holds the cycle date as fractional days since 1950-01-01
    jdate = profgrp.variables["JULD"][:].tolist()[0]
    days = int(str(jdate).split(".")[0])
    start = date(1950, 1, 1)
    offset = start + timedelta(days)
    fraction = float("0." + str(jdate).split(".")[1])
    hours = int(24 * fraction)
    minutes = int(1440 * fraction % 60)
    g.add((cycle, prefix.date, Literal("{}T{:02d}:{:02d}:00".format(offset, hours, minutes), datatype=XSD.dateTime)))
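# Worked example (hypothetical value): jdate = 18262.5 -> 18262 days after 1950-01-01
# is 2000-01-01, hours = int(24 * 0.5) = 12, minutes = int(1440 * 0.5 % 60) = 0,
# giving "2000-01-01T12:00:00"^^xsd:dateTime.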
def bind_graph(graph, prefixes_array):
    for prefix in prefixes_array:
        graph.bind(prefix[0], prefix[1])


def create_new_argo_graph(dac_name, prefixes_array):
    g = Graph()
    bind_graph(g, prefixes_array)
    g.add((ARGO[dac_name], RDF.type, ARGO.Datacenter))
    g.add((ARGO[dac_name], RDFS.label, Literal(dac_name, datatype=XSD.string)))
    return g
def add_triples(graph, triple_list):
    """
    @param graph: rdflib.Graph
    @param triple_list: dict
    """
    for subject in triple_list.keys():
        for triple in triple_list[subject]:
            try:
                if isinstance(triple[1], rdflib.term.URIRef):
                    graph.add((subject, triple[0], triple[1]))
                else:
                    # triple[1] is a (value, datatype) tuple; tuples are immutable,
                    # so the quoted value goes into a local instead of being assigned back
                    value = triple[1][0]
                    if isinstance(value, str):
                        value = urllib.parse.quote(value)
                    if len(triple[1]) > 1 and value is not None:
                        graph.add((subject, triple[0], Literal(value, datatype=triple[1][1])))
                    else:
                        graph.add((subject, triple[0], Literal(value)))
            except Exception:
                print("Error while adding triple " + str(triple))
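# Example of the triple_list shape accepted above (values hypothetical):
#   {afloat_uri: [(RDF.type, ARGO.ArgoFloat),               # URIRef -> added as-is
#                 (ARGO.wmoCode, ("6901234", XSD.string)),  # (value, datatype) -> typed Literal
#                 (ARGO.cruise, ("OVIDE", None))]}          # datatype None -> plain Literal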
def create_uri(namespace, *args):
    """
    @param namespace: rdflib.namespace
    @param args: strings
    @return: rdflib.term.URIRef
    """
    full_string = ""
    for element in args:
        if element is None:
            continue
        full_string += urllib.parse.quote(element)
    if full_string != "":
        return namespace[full_string]
    else:
        return None
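# Example (hypothetical maker code): create_uri(NERC, "R24/current/", "NKE")
# -> URIRef("http://vocab.nerc.ac.uk/collection/R24/current/NKE"); None arguments are skipped.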
def calculate_age(timestamp):
    fdate = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.000+0000")
    # "delta" avoids shadowing the timedelta import; age in years, truncated to 2 places
    delta = datetime.now() - datetime.fromtimestamp(time.mktime(fdate.timetuple()))
    age = delta.days / 365.25
    return decimal.Decimal(age).quantize(decimal.Decimal('0.01'), rounding=decimal.ROUND_DOWN)
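# Example: calculate_age("2015-06-01T00:00:00.000+0000") returns the float's age in years
# as a Decimal truncated to two places; the exact value depends on the date it is run.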
def extract_netcdf_infos(wmo, cycle_infos, file_path, cycle_files=None):
    # Work-in-progress stub: only reads the creation date from the NetCDF file
    try:
        rootgrp = Dataset(str(file_path), "r")
        creation_date = numpy_chararray_to_string(rootgrp.variables["DATE_CREATION"][:])
    except FileNotFoundError:
        return
def map_datamode_to_uri(files, wmo, nb):
    result = {}
    for file in files:
        result[file[0]] = ARGO["file" + file[0] + wmo + nb]
    return result
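# Example (hypothetical WMO and cycle number):
#   map_datamode_to_uri(["R6901234_001.nc", "B6901234_001.nc"], "6901234", "001")
#   -> {"R": ARGO["fileR6901234001"], "B": ARGO["fileB6901234001"]}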
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("destination", type=str, help="Absolute path where the converted files will be written")
parser.add_argument("netCDF", type=str, help="Absolute path of the dac folder location")
parser.add_argument("--dac", type=str, nargs="*", help="Convert only floats metadata from the specified dacs")
parser.add_argument("--limit", "-l", type=int, help="Limit the number of floats converted per DAC")
parser.add_argument("--local", "-lo", type=str,
help="Fetch WMO codes of floats to convert from a local copy of the dac folder")
args = parser.parse_args()
if not args.destination.endswith("/"):
    args.destination += "/"
# RDF Namespaces
ARGO = Namespace("http://www.argodatamgt.org/argo-ontology#")
NERC = Namespace("http://vocab.nerc.ac.uk/collection/")
GEO = Namespace("https://www.w3.org/2003/01/geo/wgs84_pos#")
PROV = Namespace("http://www.w3.org/ns/prov#")
prefixes = (
("argo", ARGO),
("nerc", NERC),
("foaf", FOAF),
("dcat", DCAT),
("dct", DCTERMS),
("sosa", SOSA),
("ssn", SSN),
("geo", GEO),
("prov", PROV)
)
argo_floats = {}
# DCAT description
# argo_catalog = ARGO["catalog"+BNode()]
# g.add((argo_catalog, RDF.type, DCAT.Catalog))
# g.add((argo_catalog, DCTERMS.title, Literal("Argo program catalog")))
# g.add((argo_catalog, DCTERMS.description, Literal("""
# The Argo program RDF catalog regrouping climate and floats catalogs.
# """)))
#
# argofloats_catalog = ARGO["catalog"+BNode()]
# g.add((argofloats_catalog, RDF.type, DCAT.Catalog))
# g.add((argofloats_catalog, DCTERMS.title, Literal("Argo floats datasets catalog")))
# g.add((argofloats_catalog, DCTERMS.description, Literal("""
# The catalogue RDF node from which are connected each float of the Argo program.
# Each float is represented as a DCAT dataset linked with this catalog.
# This catalog is one of the sub-catalogs of the Argo program.
# """)))
#
# g.add((argo_catalog, DCTERMS.hasPart, argofloats_catalog))
ftp_access = FTP("ftp.ifremer.fr")
ftp_access.login()
dac_path = "/ifremer/argo/dac/"
ftp_access.cwd(dac_path)
dac_folder = ftp_access.nlst()
if args.local != None:
dac_folder = os.listdir(args.local)
for dac in dac_folder:
    # skip DACs not requested via --dac
    if args.dac is not None and dac not in args.dac:
        continue
floats_folders = []
if args.local != None:
floats_folders = os.listdir(args.local+"/"+dac)
else:
ftp_access.cwd("{}/{}/".format(dac_path, dac))
floats_folders = ftp_access.nlst()
floats_folders.reverse()
progress = tqdm(total=len(floats_folders), position=0, leave=True, file=sys.stdout)
part = 1
dac_uri = ARGO[dac]
g = create_new_argo_graph(dac, prefixes)
# g2 = create_new_argo_graph(dac, prefixes)
print("DAC: {}".format(dac))
a = 1
for afloat in floats_folders:
progress.desc = str(afloat)
if afloat.startswith("."):
# progress.update(1)
continue
afloat_uri = ARGO["argofloat" + afloat]
# g.add((argofloats_catalog, DCAT.dataset, afloat_uri))
deployment_uri = ARGO["deployment" + afloat]
activity_uri = ARGO["activity" + afloat]
float_info = requests.get("https://fleetmonitoring.euro-argo.eu/floats/" + str(afloat), params={"wmo": afloat})
if float_info.text == "" or float_info.text is None:
    continue
float_info = json.loads(float_info.text)
if float_info is None:
    progress.write("Float {} metadata is nonexistent. Skipping...".format(afloat))
    continue
pi_uri = create_uri(ARGO, float_info["deployment"]["principalInvestigatorName"])
float_triples = {
afloat_uri: [
(RDF.type, ARGO.ArgoFloat),
(SSN.inDeployment, deployment_uri),
(ARGO.activity, activity_uri),
(ARGO.dac, dac_uri),
(ARGO.wmoCode, (afloat, XSD.string)),
(ARGO.owner, (float_info["owner"], None)),
(ARGO.maker, create_uri(NERC, "R24/current/", float_info["maker"])),
(ARGO.type, create_uri(NERC, "R23/current/", float_info["platform"]["type"])),
(ARGO.transmissionSystem, create_uri(NERC, "R10/current/", float_info["transmissionSystem"]))
],
deployment_uri: [
(RDF.type, SSN.Deployment),
(ARGO.cruise, (float_info["deployment"]["cruiseName"], None)),
(GEO.latitude, (float_info["deployment"]["lat"], XSD.float)),
(GEO.longitude, (float_info["deployment"]["lon"], XSD.float)),
(ARGO.launchDate, (float_info["deployment"]["launchDate"], XSD.dateTime)),
(ARGO.deployedByShip, (float_info["deployment"]["platform"], XSD.string)),
(ARGO.principalInvestigator, pi_uri)
],
pi_uri: [
(RDF.type, FOAF.Person),
(FOAF.name, (float_info["deployment"]["principalInvestigatorName"], XSD.string))
],
dac_uri: [],
}
# Sensor node
for sensor in float_info["sensors"]:
sensor_uri = create_uri(ARGO, "sensor", sensor["serial"])
float_triples[sensor_uri] = [
(RDF.type, SOSA.Sensor),
(ARGO.type, create_uri(NERC, "R25/current/", sensor["id"], "/")),
(ARGO.maker, create_uri(NERC, "R26/current/", sensor["maker"], "/")),
(ARGO.model, create_uri(NERC, "R27/current/", sensor["model"], "/"))
]
float_triples[afloat_uri].append((SOSA.hosts, sensor_uri))
for calib in float_info["calibrations"]:
if calib["dimLevel"] == sensor["dimLevel"]:
float_triples[sensor_uri].append(
(SOSA.observes, create_uri(NERC, "R03/current/", calib["parameter"]))
)
# Activity node
float_triples[activity_uri] = [(RDF.type, ARGO.ActivityData)]
try:
age = calculate_age(float_info["earliestCycle"]["firstStation"]["date"])
float_triples[activity_uri].append((ARGO.age, (age, XSD.float)))
except TypeError:
progress.write("No cycles found for float {}".format(afloat))
ftp_access.cwd("{}/{}/{}/".format(dac_path, dac, afloat))
base_files = ftp_access.nlst()
regex = re.compile(str(afloat) + ".*" + "traj.nc")
traj_files = list(filter(regex.match, base_files))
for traj in traj_files:
float_triples[activity_uri].append(
(ARGO.trajectoryData,
("ftp://ftp.ifremer.fr/ifremer/argo/dac/{}/{}/{}".format(dac, afloat, traj), None))
)
regex = re.compile(str(afloat) + ".*" + "prof.nc")
station_files = list(filter(regex.match, base_files))
for prof in station_files:
float_triples[activity_uri].append(
(ARGO.stationData,
("ftp://ftp.ifremer.fr/ifremer/argo/dac/{}/{}/{}".format(dac, afloat, prof), None))
)
# Cycle nodes
cycleIds = {}
for c in float_info["cycleIds"]:
cycleIds[c["cycleNumber"]] = c["cycleId"]
try:
nb_cycles = float_info["latestCycle"]["id"]
except Exception:
continue
profiles = {}
try:
ftp_access.cwd("{}/{}/{}/profiles/".format(dac_path, dac, afloat))
except Exception:
continue
cycle_files = ftp_access.nlst()
for cycle in float_info["cycles"]:
nb = int(cycle["id"])
file_nb = (3 - len(str(nb))) * "0" + str(nb)
profiles[nb] = []
cycle_uri = ARGO["cycle" + afloat + file_nb]
float_triples[afloat_uri].append((ARGO.cycle, cycle_uri))
float_triples[cycle_uri] = [
(RDF.type, ARGO.Cycle),
(RDF.type, PROV.Activity)
]
regex = re.compile(".*"+str(afloat)+"_"+str(file_nb)+".nc")
this_cycle_files = list(filter(regex.match, cycle_files))
if len(this_cycle_files) == 0:
# print("Float {}, cycle {} does not have cycle files ?".format(afloat, file_nb))
continue
# if c != None:
# cycleInfo = requests.get("https://dataselection.euro-argo.eu/api/find-by-id/"+str(cycleIds[nb]))
# if cycleInfo != None and cycleInfo.text != None:
# cycleInfo = json.loads(cycleInfo.text)
# for basin in cycleInfo["basins"]:
# g.add((cycle_uri, ARGO.basin, Literal(basin)))
if nb == float_info["latestCycle"]["id"]:
float_triples[activity_uri].append((ARGO.lastCycle, cycle_uri))
netcdf_infos = []
file_nodes = map_datamode_to_uri(this_cycle_files, afloat, file_nb)
for cycle_file in this_cycle_files:
# file_uri = ARGO["file" + BNode()]
if len(file_nodes) == 0: break
file_uri = file_nodes[cycle_file[0]]
file_nodes[cycle_file] = file_uri
float_triples[file_uri] = [
(RDF.type, ARGO.File),
(RDF.type, PROV.Entity),
(RDF.type, DCTERMS.Dataset)
]
float_triples[file_uri].append((ARGO.accessURL,
("ftp://ftp.ifremer.fr/ifremer/argo/dac/{}/{}/profiles/{}".format(dac, afloat, cycle_file), None)))
if cycle_file.startswith("R"):
float_triples[cycle_uri].append((ARGO.coreArgoProfile, file_uri))
# float_triples[file_uri].append((PROV.wasGeneratedBy, cycle_uri))
elif cycle_file.startswith("D"):
float_triples[cycle_uri].append((ARGO.coreArgoProfile, file_uri))
elif cycle_file.startswith("B"):
float_triples[cycle_uri].append((ARGO.bcgArgoProfile, file_uri))
elif cycle_file.startswith("S"):
float_triples[cycle_uri].append((ARGO.syntheticArgoProfile, file_uri))
# if not cycle_file.startswith("R"):
# if file_nodes["R"] != None:
# float_triples[file_uri].append((PROV.wasRevisionOf, file_nodes["R"]))
float_triples[file_uri].append((ARGO.dataMode, [Literal(cycle_file[0])]))
# cycle_nc = extract_netcdf_infos(afloat, cycle, "{}{}/{}/profiles/{}{}_{}.nc".format(args.netCDF, dac, afloat, cycle_file[0], afloat, file_nb), this_cycle_files)
# netcdf_infos.append(cycle_nc)
float_triples[dac_uri].append((ARGO.hostsFile, file_uri))
float_triples[cycle_uri].append((ARGO["number"], (nb, XSD.int)))