Commit 3a29760b authored by ALVISET

Progress on provenance

parent d64fad04
@@ -89,7 +89,7 @@ def profile_get_netcdf_info(file, data_type):
     try:
         rootgrp = Dataset(file, "r", format="NETCDF4")
     except FileNotFoundError:
-        print("{} not found, skipping...".format(file))
+        # print("{} not found, skipping...".format(file))
         return
     infos.update({"handbook":netcdf_extract_variable(rootgrp, "HANDBOOK_VERSION")})
     infos.update({"created":format_netcdf_datetime_to_xsd(netcdf_extract_variable(rootgrp, "DATE_CREATION"))})
@@ -132,16 +132,54 @@ def profile_get_netcdf_info(file, data_type):
                adjusted_params["adjustedParams"][param] = int(flag)
            except KeyError:
                # print("{} has no N_HISTORY param?".format(file))
-               return
+               break
     infos.update(steps)
     infos.update(states)
     infos.update(softwares)
     infos.update(adjusted_params)
     # print(file)
     # print(infos)
     # print("------------------")
+    print(infos)
+    print("------------------")
     return infos
+def compare_prov_infos(prov, adjustedGroups, wmo):
+    """
+    Compare file provenance information with existing groups if there's any
+    @param prov:
+    @return:
+    """
+    if len(adjustedGroups) == 0:
+        adjustedGroups.append({"handbook":prov["handbook"], "steps":set(prov["steps"]), "datastate":set(prov["datastate"]),
+                               "softwares":set(prov["softwares"]), "adjustedParams":prov["adjustedParams"],
+                               "uri":ARGO["{}group0".format(wmo)]})
+    else:
+        is_identical = False
+        for i in range(0, len(groups)):
+            group = adjustedGroups[i]
+            if group["handbook"] == prov["handbook"] and len(set(prov["steps"]).symmetric_difference(group["steps"])) == 0:
+                if len(set(prov["datastate"]).symmetric_difference(group["datastate"])) == 0:
+                    if len(set(prov["softwares"]).symmetric_difference(group["softwares"])) == 0:
+                        if len(set(prov["adjustedParams"].keys()).symmetric_difference(set(group["adjustedParams"].keys()))) == 0:
+                            is_identical = True
+                            for par in prov["adjustedParams"].keys():
+                                if prov["adjustedParams"][par] != group["adjustedParams"][par]:
+                                    is_identical = False
+                        else:
+                            print("ADJUSTED PARAMS DIFFERENT")
+                    else:
+                        print("SOFTWARES DIFFERENT")
+                else:
+                    print("DATASTATES DIFFERENT")
+            else:
+                print("SETS DIFFERENT")
+            if is_identical:
+                return i
+        adjustedGroups.append({"handbook":prov["handbook"], "steps":set(prov["steps"]), "datastate":set(prov["datastate"]),
+                               "softwares":set(prov["softwares"]), "adjustedParams":prov["adjustedParams"],
+                               "uri":ARGO["{}group{}".format(wmo,len(groups))]})
+    return len(adjustedGroups)-1
 def map_filetype_and_datamode(cycle_files):
     """
     Map cycle files to their data mode and type for mapping to core and provenance
@@ -162,10 +200,11 @@ def map_filetype_and_datamode(cycle_files):
             result["D"].update({file[0]: file})
     return result
-def map_file_to_prov(file, cycle_uri, wrapper, files_info, dac_uri):
+def map_file_to_prov(file, file_uri, cycle_uri, wrapper, files_info, dac_uri):
     """
     Add triples related to the cycle files related to the core metadata and provenance metadata
     @type file: str
+    @type file_uri: rdflib.term.URIRef
     @type cycle_uri: rdflib.term.URIRef
     @type wrapper: GraphWrapper
     """
@@ -176,7 +215,6 @@ def map_file_to_prov(file, cycle_uri, wrapper, files_info, dac_uri):
         "B": ARGO.bcgArgoProfile,
         "S": ARGO.syntheticArgoProfile
     }
-    file_uri = ARGO["file"+file[:len(file)-3].replace("_","")]
     data_mode = files_info["infos"][file]["datamode"]
     profile_type = files_info["infos"][file]["type"]
     wrapper.add_triple(file_uri, RDF.type, ARGO.File)
@@ -210,7 +248,7 @@ if __name__ == "__main__":
     parser.add_argument("--noprov", type=bool, nargs="?", const=True, default=False, help="Don't look into netCDF files")
     parser.add_argument("--dac", type=str, nargs="*", help="Convert only floats metadata from the specified dacs")
     parser.add_argument("--limit", "-l", type=int, help="Limit the number of floats converted per DAC")
-    parser.add_argument("--local", "-lo", type=str,
+    parser.add_argument("--local", "-lo", type=bool, nargs="?", const=True, default=False,
                         help="Fetch WMO codes of floats to convert from a local copy of the dac folder")
     args = parser.parse_args()
@@ -263,7 +301,8 @@ if __name__ == "__main__":
     dac_folder = ftp_access.nlst()
     if args.local != None:
-        dac_folder = os.listdir(args.local)
+        dac_folder = os.listdir(args.netCDF)
+    print(dac_folder)
     for dac in dac_folder:
         check = True
@@ -279,7 +318,7 @@ if __name__ == "__main__":
             continue
         floats_folders = []
         if args.local != None:
-            floats_folders = os.listdir(args.local+"/"+dac)
+            floats_folders = os.listdir(args.netCDF+"/"+dac)
         else:
             ftp_access.cwd("{}/{}/".format(dac_path, dac))
             floats_folders = ftp_access.nlst()
@@ -369,8 +408,11 @@ if __name__ == "__main__":
                 argo_graph.add_triple(activity_uri, ARGO.age, Literal(age, datatype=XSD.float))
             except TypeError:
                 progress.write("No cycles found for float {}".format(afloat))
-            ftp_access.cwd("{}/{}/{}/".format(dac_path, dac, afloat))
-            base_files = ftp_access.nlst()
+            if not args.local:
+                ftp_access.cwd("{}/{}/{}/".format(dac_path, dac, afloat))
+                base_files = ftp_access.nlst()
+            else:
+                base_files = os.listdir("{}/{}/{}".format(args.netCDF, dac, afloat))
             regex = re.compile(str(afloat) + ".*" + "traj.nc")
             traj_files = list(filter(regex.match, base_files))
             for traj in traj_files:
@@ -394,10 +436,14 @@ if __name__ == "__main__":
                 continue
             profiles = {}
             try:
-                ftp_access.cwd("{}/{}/{}/profiles/".format(dac_path, dac, afloat))
+                if not args.local:
+                    ftp_access.cwd("{}/{}/{}/profiles/".format(dac_path, dac, afloat))
+                    cycle_files = ftp_access.nlst()
+                else:
+                    cycle_files = os.listdir("{}/{}/{}/profiles/".format(args.netCDF, dac, afloat))
             except:
                 continue
-            cycle_files = ftp_access.nlst()
+            groups = []
             for cycle in float_info["cycles"]:
                 nb = int(cycle["id"])
                 file_nb = (3 - len(str(nb))) * "0" + str(nb)
@@ -427,17 +473,19 @@ if __name__ == "__main__":
                 ]})
                 # print("{} {}".format(nb, float_info["latestCycle"]["id"]))
                 if nb == float_info["latestCycle"]["id"]:
+                    print("TEST")
                     argo_graph.add_triple(activity_uri, ARGO.lastCycle, cycle_uri)
                 for file in this_cycle_files:
-                    map_file_to_prov(file, cycle_uri, argo_graph, files_info, dac_uri)
+                    file_uri = ARGO["file" + file[:len(file) - 3].replace("_", "")]
+                    map_file_to_prov(file, file_uri, cycle_uri, argo_graph, files_info, dac_uri)
                     if not args.noprov:
                         # print("TEST")
                         prov = profile_get_netcdf_info(args.netCDF+dac+"/"+afloat+"/profiles/"+file,
                                                        files_info["infos"][file]["datamode"])
+                        groupId = compare_prov_infos(prov, groups, afloat)
+                        print(groupId)
                         argo_graph.add_triple(file_uri, DCTERMS.created, Literal(prov["created"], datatype=XSD.datetime))
+                        argo_graph.add_triple(file_uri, PROV.influencedBy, groups[groupId]["uri"])
                 argo_graph.add_triple(cycle_uri, ARGO.number, Literal(nb, datatype=XSD.int))
                 argo_graph.add_triple(cycle_uri, ARGO.startDate, Literal(cycle["startDate"]))
......