Commit 4ef2afc5 authored by ALVISET
Browse files

Progress on prov inclusion, first tests possible

parent 3a29760b
......@@ -97,49 +97,38 @@ def profile_get_netcdf_info(file, data_type):
states = {"datastate":[]}
softwares = {"softwares":[]}
params = netcdf_getparams(rootgrp, rootgrp.dimensions["N_PROF"].size, rootgrp.dimensions["N_PARAM"].size)
adjusted_params = {"adjustedParams":{}}
if data_type != "R":
for param in params:
adjusted_params["adjustedParams"][param] = 0
params_qc = {"paramsqc":{}}
for j in range(0, rootgrp.dimensions["N_PROF"].size):
state = netcdf_extract_variable_cell(rootgrp, "DATA_STATE_INDICATOR", j)
if state != '' and state not in states["datastate"]:
if state != '' and state not in states["datastate"] and state != False:
states["datastate"].append(state)
try:
for i in range(0, rootgrp.dimensions["N_HISTORY"].size):
step = netcdf_extract_variable_cell(rootgrp, "HISTORY_STEP", i, j)
software = netcdf_extract_variable_cell(rootgrp, "HISTORY_SOFTWARE", i, j)
if step != '' and step not in steps["steps"]:
steps["steps"].append(step)
if software != '' and software not in softwares["softwares"]:
softwares["softwares"].append(software)
# print(rootgrp.dimensions["N_LEVELS"].size-1)
if data_type != "R":
check = True
for i in range(0, rootgrp.dimensions["N_LEVELS"].size-1):
if check:
for param in params:
# print("{} {} {}".format(file, i, param))
try:
flag = netcdf_extract_variable_cell(rootgrp, param + "_ADJUSTED_QC", j, i)
except IndexError:
check = False
break
if not flag:
break
if flag != None and flag != '' and int(flag) > adjusted_params["adjustedParams"][param]:
# print("{} {}".format(file, int(flag)))
adjusted_params["adjustedParams"][param] = int(flag)
except KeyError:
# print("{} has no N_HISTORY param?".format(file))
break
if data_type != "R":
for param in params:
flag = netcdf_extract_variable(rootgrp, "PROFILE_{}_QC".format(param), j)
# print(str(param) + " " + str(flag))
if flag != None and flag != '' and flag != False:
params_qc["paramsqc"][param] = flag
try:
for i in range(0, rootgrp.dimensions["N_HISTORY"].size):
step = netcdf_extract_variable_cell(rootgrp, "HISTORY_STEP", i, j)
software = netcdf_extract_variable_cell(rootgrp, "HISTORY_SOFTWARE", i, j)
if step != '' and step not in steps["steps"]:
steps["steps"].append(step)
if software != '' and software not in softwares["softwares"]:
softwares["softwares"].append(software)
# print(rootgrp.dimensions["N_LEVELS"].size-1)
except KeyError:
# print("{} has no N_HISTORY param?".format(file))
break
infos.update(steps)
infos.update(states)
infos.update(softwares)
infos.update(adjusted_params)
infos.update(params_qc)
# print(file)
print(infos)
print("------------------")
# if data_type != "R":
# print(file + " " + str(infos))
# print("------------------ "+data_type)
return infos
def compare_prov_infos(prov, adjustedGroups, wmo):
......@@ -150,33 +139,59 @@ def compare_prov_infos(prov, adjustedGroups, wmo):
"""
if len(adjustedGroups) == 0:
adjustedGroups.append({"handbook":prov["handbook"], "steps":set(prov["steps"]), "datastate":set(prov["datastate"]),
"softwares":set(prov["softwares"]), "adjustedParams":prov["adjustedParams"],
"softwares":set(prov["softwares"]), "paramsqc":prov["paramsqc"],
"uri":ARGO["{}group0".format(wmo)]})
else:
is_identical = False
for i in range(0, len(groups)):
group = adjustedGroups[i]
if group["handbook"] == prov["handbook"] and len(set(prov["steps"]).symmetric_difference(group["steps"])) == 0:
if len(set(prov["datastate"]).symmetric_difference(group["datastate"])) == 0:
if len(set(prov["softwares"]).symmetric_difference(group["softwares"])) == 0:
if len(set(prov["adjustedParams"].keys()).symmetric_difference(set(group["adjustedParams"].keys()))) == 0:
is_identical = True
for par in prov["adjustedParams"].keys():
if prov["adjustedParams"][par] != group["adjustedParams"][par]:
is_identical = False
else:
print("ADJUSTED PARAMS DIFFERENT")
else:
print("SOFTWARES DIFFERENT")
if (
group["handbook"] == prov["handbook"] and
len(set(prov["steps"]).symmetric_difference(group["steps"])) == 0 and
len(set(prov["datastate"]).symmetric_difference(group["datastate"])) == 0 and
len(set(prov["softwares"]).symmetric_difference(group["softwares"])) == 0 and
len(set(prov["paramsqc"].keys()).symmetric_difference(set(group["paramsqc"].keys()))) == 0
):
if len(prov["paramsqc"]) > 0:
for par in prov["paramsqc"].keys():
if prov["paramsqc"][par] != group["paramsqc"][par]:
is_identical = False
break
is_identical = True
else:
print("DATASTATES DIFFERENT")
return i
if is_identical:
return i
# else:
# is_identical = False
# print("------------------------------")
# print(prov["paramsqc"])
# print("ADJUSTED PARAMS DIFFERENT")
# print(group["paramsqc"])
# else:
# is_identical = False
# print("------------------------------")
# print(prov["softwares"])
# print("SOFTWARES DIFFERENT")
# print(group["softwares"])
# else:
# is_identical = False
# print("------------------------------")
# print(prov["datastate"])
# print("DATASTATES DIFFERENT")
# print(group["datastate"])
else:
print("SETS DIFFERENT")
continue
# is_identical = False
# print("------------------------------")
# print(prov["steps"])
# print("STEPS DIFFERENT")
# print(group["steps"])
if is_identical:
return i
adjustedGroups.append({"handbook":prov["handbook"], "steps":set(prov["steps"]), "datastate":set(prov["datastate"]),
"softwares":set(prov["softwares"]), "adjustedParams":prov["adjustedParams"],
"softwares":set(prov["softwares"]), "paramsqc":prov["paramsqc"],
"uri":ARGO["{}group{}".format(wmo,len(groups))]})
return len(adjustedGroups)-1
......@@ -250,6 +265,7 @@ if __name__ == "__main__":
parser.add_argument("--limit", "-l", type=int, help="Limit the number of floats converted per DAC")
parser.add_argument("--local", "-lo", type=bool, nargs="?", const=True, default=False,
help="Fetch WMO codes of floats to convert from a local copy of the dac folder")
parser.add_argument("--float", type=str, help="Convert a specific float. Debug purposes.")
args = parser.parse_args()
if not str.endswith(args.destination, "/"):
......@@ -302,7 +318,7 @@ if __name__ == "__main__":
dac_folder = ftp_access.nlst()
if args.local != None:
dac_folder = os.listdir(args.netCDF)
print(dac_folder)
# print(dac_folder)
for dac in dac_folder:
check = True
......@@ -336,6 +352,8 @@ if __name__ == "__main__":
a = 1
for afloat in floats_folders:
if args.float != None:
if afloat != args.float: continue
progress.desc = str(afloat)
if afloat.startswith("."):
# progress.update(1)
......@@ -363,7 +381,7 @@ if __name__ == "__main__":
(SSN.inDeployment, deployment_uri),
(ARGO.activity, activity_uri),
(ARGO.dac, dac_uri),
(ARGO.wmoCode, (afloat, XSD.string)),
(ARGO.wmoCode, Literal(afloat, datatype=XSD.string)),
(ARGO.owner, Literal(float_info["owner"]), None),
(ARGO.maker, (NERC, "R24/current/", float_info["maker"])),
(ARGO.type, (NERC, "R23/current/", float_info["platform"]["type"])),
......@@ -482,15 +500,37 @@ if __name__ == "__main__":
# print("TEST")
prov = profile_get_netcdf_info(args.netCDF+dac+"/"+afloat+"/profiles/"+file,
files_info["infos"][file]["datamode"])
groupId = compare_prov_infos(prov, groups, afloat)
print(groupId)
argo_graph.add_triple(file_uri, DCTERMS.created, Literal(prov["created"], datatype=XSD.datetime))
argo_graph.add_triple(file_uri, PROV.influencedBy, groups[groupId]["uri"])
if files_info["infos"][file]["datamode"] != "R":
groupId = compare_prov_infos(prov, groups, afloat)
argo_graph.add_triple(file_uri, PROV.influencedBy, groups[groupId]["uri"])
else:
argo_graph.add_triple(file_uri, ARGO.handbookVersion, prov["handbook"])
argo_graph.add_triple(cycle_uri, ARGO.number, Literal(nb, datatype=XSD.int))
argo_graph.add_triple(cycle_uri, ARGO.startDate, Literal(cycle["startDate"]))
argo_graph.add_triple(cycle_uri, GEO.latitude, Literal(cycle["lat"]))
argo_graph.add_triple(cycle_uri, GEO.longitude, Literal(cycle["lon"]))
adjusted_Params = {} # Keep profile params in memory to avoid duplicates
for group in groups:
argo_graph.add_triple(group["uri"], ARGO.handbookVersion, group["handbook"])
for step in group["steps"]:
argo_graph.add_triple(group["uri"], ARGO.step, (NERC, "R12/current/", step))
for datastate in group["datastate"]:
argo_graph.add_triple(group["uri"], ARGO.dataState, (NERC, "R06/current/", datastate))
for software in group["softwares"]:
argo_graph.add_triple(group["uri"], PROV.software, Literal(software))
for param in group["paramsqc"].keys():
duo = "{};{}".format(param,group["paramsqc"][param])
if duo not in adjusted_Params.keys():
adjusted_Params[duo] = ARGO["{}profparam{}{}".format(afloat, param, group["paramsqc"][param])]
print(group["paramsqc"])
argo_graph.add_triple(adjusted_Params[duo], ARGO.param, (NERC, "R03/current/", param))
argo_graph.add_triple(adjusted_Params[duo], ARGO.qc, (NERC, "RP2/current/", group["paramsqc"][param]))
argo_graph.add_triple(group["uri"], ARGO.profileParam, adjusted_Params[duo])
print("Number of groups: " + str(len(groups)))
print("Number of files: " + str(len(cycle_files)))
a += 1
progress.update(1)
if a == args.limit:
......
......@@ -16,23 +16,23 @@ class GraphWrapper():
self.graph.bind(prefix[0], prefix[1])
@staticmethod
def create_uri(*args):
    """
    Build a URI by appending URL-quoted string segments to a namespace.

    @param args: the namespace (rdflib.namespace, or any object indexable by a string)
                 followed by the string segments to concatenate in order;
                 segments equal to None or "" are skipped
    @return: rdflib.term.URIRef (whatever namespace[...] yields), or None when no
             namespace/segment is given or when the namespace lookup fails
    """
    if not args:
        return None
    # The first positional argument is the namespace; the rest are path segments.
    namespace, *segments = args
    full_string = "".join(
        urllib.parse.quote(segment)
        for segment in segments
        if segment is not None and segment != ""
    )
    if full_string == "":
        return None
    try:
        return namespace[full_string]
    except Exception:
        # Best effort: report the offending namespace and let the caller receive None
        # instead of aborting the whole conversion run.
        print("ERROR "+str(isinstance(namespace, rdflib.Namespace))+" "+str(namespace))
        return None
......@@ -45,8 +45,8 @@ class GraphWrapper():
"""
if isinstance(object, rdflib.Literal) or isinstance(object, rdflib.term.URIRef):
self.graph.add((subject, predicate, object))
elif type(object) == list:
self.graph.add((subject, predicate, self.create_uri(object)))
elif type(object) == list or type(object) == tuple:
self.graph.add((subject, predicate, GraphWrapper.create_uri(*object)))
elif type(object) == str:
self.graph.add((subject, predicate, Literal(object)))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment