Commit 5e3dbdf6 authored by ALVISET's avatar ALVISET
Browse files

Progress on provenance data integration

parent 60c2941a
......@@ -60,6 +60,46 @@ def calculate_age(timestamp):
def netcdf_extract_variable(netcdf, variable, start=None, end=None):
    """Return the values of *variable* from an open NetCDF dataset as a string.

    The raw character array is sliced with ``[start:end]`` (the full range
    when both bounds are ``None``) and converted through
    ``numpy_chararray_to_string``.
    """
    raw = netcdf.variables[variable]
    return numpy_chararray_to_string(raw[start:end])
def netcdf_extract_variable_cell(netcdf, variable, row, column=None):
    """Return a single row — or a single cell when *column* is given — of
    *variable* from an open NetCDF dataset, converted to a string.

    @param netcdf: an open netCDF Dataset
    @param variable: name of the variable to read
    @param row: first-dimension index
    @param column: optional second-dimension index
    @return: str produced by ``numpy_chararray_to_string``
    """
    cell = netcdf.variables[variable][row]
    # PEP 8: compare against None with `is not`, not `!=`
    if column is not None:
        cell = cell[column]
    return numpy_chararray_to_string(cell)
def profile_get_netcdf_info(file):
    """Collect provenance-related metadata from one Argo profile NetCDF file.

    @param file: path to the profile NetCDF file
    @return: dict with keys ``handbook``, ``created``, ``steps``,
             ``datastate``, ``softwares`` and ``qc``; ``None`` when the
             file does not exist
    """
    infos = {}
    try:
        rootgrp = Dataset(file, "r", format="NETCDF4")
    except FileNotFoundError:
        print("{} not found, skipping...".format(file))
        return None
    try:
        infos["handbook"] = netcdf_extract_variable(rootgrp, "HANDBOOK_VERSION")
        infos["created"] = format_netcdf_datetime_to_xsd(
            netcdf_extract_variable(rootgrp, "DATE_CREATION"))
        steps = []
        states = []
        softwares = []
        adjusted_qc = []
        for j in range(rootgrp.dimensions["N_PROF"].size):
            state = netcdf_extract_variable_cell(rootgrp, "DATA_STATE_INDICATOR", j)
            # collect distinct, non-empty values only
            if state != '' and state not in states:
                states.append(state)
            for i in range(rootgrp.dimensions["N_HISTORY"].size):
                step = netcdf_extract_variable_cell(rootgrp, "HISTORY_STEP", i, j)
                software = netcdf_extract_variable_cell(rootgrp, "HISTORY_SOFTWARE", i, j)
                if step != '' and step not in steps:
                    steps.append(step)
                if software != '' and software not in softwares:
                    softwares.append(software)
        # NOTE(review): the original looped over N_LEVELS but extracted the
        # whole TEMP_ADJUSTED_QC variable on every iteration (loop-invariant),
        # so at most one value was ever collected; the call is hoisted here.
        # Per-level indexing may have been intended — confirm.
        flag = netcdf_extract_variable(rootgrp, "TEMP_ADJUSTED_QC")
        if flag != '' and flag not in adjusted_qc:
            adjusted_qc.append(flag)
    finally:
        # the original never closed the dataset — avoid leaking the handle
        rootgrp.close()
    infos["steps"] = steps
    infos["datastate"] = states
    infos["softwares"] = softwares
    infos["qc"] = adjusted_qc
    print(infos)
    return infos
def map_filetype_and_datamode(cycle_files):
"""
Map cycle files to their data mode and type for mapping to core and provenance
......@@ -68,7 +108,7 @@ def map_filetype_and_datamode(cycle_files):
@type float_triples: list
@return: dict
"""
result = {}
result = {"infos":{}, "R":{}, "D":{}}
name_length = len(cycle_files[0])
cycle_number = cycle_files[0][name_length-6:name_length-3]
for file in cycle_files:
......@@ -80,7 +120,7 @@ def map_filetype_and_datamode(cycle_files):
result["D"].update({file[0]: file})
return result
def map_file_to_prov(file, cycle_uri, wrapper, files_info):
def map_file_to_prov(file, cycle_uri, wrapper, files_info, dac_uri):
"""
Add triples related to the cycle files related to the core metadata and provenance metadata
@type file: str
......@@ -93,20 +133,20 @@ def map_file_to_prov(file, cycle_uri, wrapper, files_info):
"B": ARGO.bcgArgoProfile,
"S": ARGO.syntheticArgoProfile
}
file_uri = ARGO[file[:len(file)-3]]
data_mode = files_info["infos"]["datamode"]
profile_type = files_info["infos"]["type"]
file_uri = ARGO["file"+file[:len(file)-3].replace("_","")]
data_mode = files_info["infos"][file]["datamode"]
profile_type = files_info["infos"][file]["type"]
wrapper.add_triple(file_uri, RDF.type, ARGO.File)
wrapper.add_triple(file_uri, RDF.type, PROV.Entity)
wrapper.add_triple(file_uri, RDF.type, DCTERMS.Dataset)
wrapper.add_triple(cycle_uri, link_type[profile_type], file_uri)
wrapper.add_triple(file_uri, ARGO.accessURL, (
wrapper.add_triple(file_uri, ARGO.accessURL, Literal(
"ftp://ftp.ifremer.fr/ifremer/argo/dac/{}/{}/profiles/{}".format(dac, afloat, file), None))
wrapper.add_triple(files_info["dac"], ARGO.hosts, file_uri)
wrapper.add_triple(dac_uri, ARGO.hosts, file_uri)
if file in files_info.keys():
wrapper.add_triple(file_uri, ARGO.datamode, Literal(data_mode, XSD.String))
if files_info["infos"][file] == "R":
if file in files_info["infos"].keys():
wrapper.add_triple(file_uri, ARGO.datamode, Literal(data_mode))
if files_info["infos"][file]["datamode"] == "R":
wrapper.add_triple(file_uri, PROV.wasGeneratedBy, cycle_uri)
else:
try:
......@@ -198,6 +238,8 @@ if __name__ == "__main__":
floats_folders = ftp_access.nlst()
floats_folders.reverse()
progress = tqdm(total=len(floats_folders), position=0, leave=True, file=sys.stdout)
if args.limit is not None:
progress.total = args.limit
part = 1
dac_uri = ARGO[dac]
......@@ -227,7 +269,6 @@ if __name__ == "__main__":
if float_info == None:
progress.write("Float {} metadata is nonexistent. Skipping...".format(afloat))
continue
print("TEST: "+ARGO)
pi_uri = GraphWrapper.create_uri(ARGO, float_info["deployment"]["principalInvestigatorName"])
argo_graph.add_triples({
......@@ -315,15 +356,12 @@ if __name__ == "__main__":
file_nb = (3 - len(str(nb))) * "0" + str(nb)
profiles[nb] = []
cycle_uri = ARGO["cycle" + str(afloat) + str(file_nb)]
argo_graph.add_triple(afloat_uri, ARGO.cycle, cycle_uri)
argo_graph.add_triples({
cycle_uri :[
(RDF.type, ARGO.Cycle),
(RDF.type, PROV.Activity)
]})
files_info = map_filetype_and_datamode(cycle_files)
# Do not create cycle nodes if they don't have any relevant information
if len(files_info["infos"]) == 0 : continue
regex = re.compile(".*"+str(afloat)+"_"+str(file_nb)+".nc")
this_cycle_files = list(filter(regex.match, cycle_files))
regex = re.compile("_"+str(file_nb)+".nc")
this_cycle_files = list(filter(regex.search, cycle_files))
if len(this_cycle_files) == 0:
# print("Float {}, cycle {} does not have cycle files ?".format(afloat, file_nb))
continue
......@@ -334,15 +372,20 @@ if __name__ == "__main__":
# for basin in cycleInfo["basins"]:
# argo_graph.add_triple((cycle_uri, ARGO.basin, Literal(basin)))
argo_graph.add_triple(afloat_uri, ARGO.cycle, cycle_uri)
argo_graph.add_triples({
cycle_uri: [
(RDF.type, ARGO.Cycle),
(RDF.type, PROV.Activity)
]})
if nb == float_info["latestCycle"]["id"]:
argo_graph.add_triple(activity_uri, ARGO.lastCycle, cycle_uri)
files_info = map_filetype_and_datamode(cycle_files)
for file in cycle_files:
map_file_to_prov(file, cycle_uri, argo_graph, files_info)
for file in this_cycle_files:
map_file_to_prov(file, cycle_uri, argo_graph, files_info, dac_uri)
profile_get_netcdf_info(args.netCDF+dac+"/"+afloat+"/profiles/"+file)
argo_graph.add_triple(cycle_uri, ARGO.number, Literal(nb, datatype=XSD.int))
argo_graph.add_triple(cycle_uri, ARGO.startDate, Literal(cycle["startDate"]))
......
......@@ -22,8 +22,6 @@ class GraphWrapper():
@param args: string()
@return: rdflib.term.URIRef
"""
print(namespace)
print(args)
full_string = ""
# i = 0
for element in args:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment