Commit 7847ea27 authored by ALVISET

Fixes; added a bare DCAT description (+125k triples)

parent cf502fd0
@@ -13,7 +13,7 @@ from datetime import date, timedelta, datetime
from ftplib import FTP
from netCDF4 import Dataset
from netCDF4 import Dataset
from rdflib import Graph, Literal, BNode, DCAT, SOSA, SSN, RDF
from rdflib import Graph, Literal, BNode, DCAT, SOSA, SSN, RDF, URIRef
from rdflib.namespace import *
from tqdm import tqdm
from graph_wrapper import GraphWrapper
@@ -242,7 +242,7 @@ def map_file_to_prov(file, file_uri, cycle_uri, wrapper, files_info, dac_uri):
profile_type = files_info["infos"][file]["type"]
wrapper.add_triple(file_uri, RDF.type, ARGO.File)
wrapper.add_triple(file_uri, RDF.type, PROV.Entity)
wrapper.add_triple(file_uri, RDF.type, DCTERMS.Dataset)
# wrapper.add_triple(file_uri, RDF.type, DCTERMS.Dataset)
wrapper.add_triple(cycle_uri, link_type[profile_type], file_uri)
wrapper.add_triple(file_uri, ARGO.accessURL, Literal(
"ftp://ftp.ifremer.fr/ifremer/argo/dac/{}/{}/profiles/{}".format(dac, afloat, file), None))
@@ -294,14 +294,16 @@ if __name__ == "__main__":
NERC = Namespace("http://vocab.nerc.ac.uk/collection/")
GEO = Namespace("https://www.w3.org/2003/01/geo/wgs84_pos#")
PROV = Namespace("https://www.w3.org/TR/prov-o/")
SOSA = Namespace("http://www.w3.org/ns/sosa/")
SSN = Namespace("http://www.w3.org/ns/ssn/")
prefixes = (
("argo", ARGO),
("nerc", NERC),
("foaf", FOAF),
("dcat", DCAT),
("dct", DCTERMS),
("sosa", SSN), #RDFLib reverse SOSA and SSN...
("ssn", SOSA),
("sosa", SOSA), #RDFLib reverse SOSA and SSN...
("ssn", SSN),
("geo", GEO),
("prov", PROV)
)
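# Sketch (illustration only, not part of this diff): how the (prefix, namespace)
# tuples above are typically bound on an rdflib Graph, assuming GraphWrapper
# ultimately delegates to Graph.bind(). The ARGO namespace URI below is a
# placeholder; the real one is defined earlier in the script and not visible here.
from rdflib import Graph, Namespace
from rdflib.namespace import DCAT, DCTERMS, FOAF

ARGO = Namespace("http://example.org/argo#")  # placeholder URI for illustration only
NERC = Namespace("http://vocab.nerc.ac.uk/collection/")
GEO = Namespace("https://www.w3.org/2003/01/geo/wgs84_pos#")
PROV = Namespace("https://www.w3.org/TR/prov-o/")
SOSA = Namespace("http://www.w3.org/ns/sosa/")   # declared explicitly, as in the change above
SSN = Namespace("http://www.w3.org/ns/ssn/")

g = Graph()
for prefix, ns in (("argo", ARGO), ("nerc", NERC), ("foaf", FOAF), ("dcat", DCAT),
                   ("dct", DCTERMS), ("sosa", SOSA), ("ssn", SSN), ("geo", GEO), ("prov", PROV)):
    g.bind(prefix, ns)  # bound prefixes end up in the Turtle @prefix header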
@@ -310,23 +312,21 @@ if __name__ == "__main__":
# DCAT description
# argo_catalog = ARGO["catalog"+BNode()]
# g.add((argo_catalog, RDF.type, DCAT.Catalog))
# g.add((argo_catalog, DCTERMS.title, Literal("Argo program catalog")))
# g.add((argo_catalog, DCTERMS.description, Literal("""
# The Argo program RDF catalog regrouping climate and floats catalogs.
# """)))
#
# argofloats_catalog = ARGO["catalog"+BNode()]
# g.add((argofloats_catalog, RDF.type, DCAT.Catalog))
# g.add((argofloats_catalog, DCTERMS.title, Literal("Argo floats datasets catalog")))
# g.add((argofloats_catalog, DCTERMS.description, Literal("""
# The catalogue RDF node from which are connected each float of the Argo program.
# Each float is represented as a DCAT dataset linked with this catalog.
# This catalog is one of the sub-catalogs of the Argo program.
# """)))
#
# g.add((argo_catalog, DCTERMS.hasPart, argofloats_catalog))
dcat_graph = GraphWrapper(prefixes)
argo_catalog = BNode()
dcat_graph.add_triple(argo_catalog, RDF.type, DCAT.Catalog)
dcat_graph.add_triple(argo_catalog, DCTERMS.title, Literal("Argo Floats metadata"))
dcat_graph.add_triple(argo_catalog, DCTERMS.description, Literal("""
Argo is an international program that collects information from inside the ocean using a fleet of robotic
instruments that drift with the ocean currents and move up and down between the surface and a mid-water level.
Each instrument (float) spends almost all its life below the surface. The name Argo was chosen because the
array of floats works in partnership with the Jason earth observing satellites that measure the shape of the
ocean surface. (In Greek mythology Jason sailed on his ship the Argo in search of the golden fleece).
""", lang="en"))
dcat_graph.add_triple(argo_catalog, DCTERMS.publisher, URIRef("http://www.argodatamgt.org/Data-Mgt-Team"))
dcat_graph.add_triple(argo_catalog, DCAT.service,
URIRef("https://www.ifremer.fr/co/argo-linked-data/html/Argo-HTML-SPARQL/"))
ftp_access = FTP("ftp.ifremer.fr")
ftp_access.login()
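# Sketch (illustration only, not part of this diff): the same top-level catalogue
# description built with a bare rdflib Graph, assuming GraphWrapper.add_triple
# forwards to Graph.add, to show roughly what the "bare DCAT description"
# serialises to.
from rdflib import Graph, Literal, BNode, URIRef
from rdflib.namespace import DCAT, DCTERMS, RDF

g = Graph()
g.bind("dcat", DCAT)
g.bind("dct", DCTERMS)
catalog = BNode()
g.add((catalog, RDF.type, DCAT.Catalog))
g.add((catalog, DCTERMS.title, Literal("Argo Floats metadata")))
g.add((catalog, DCTERMS.publisher, URIRef("http://www.argodatamgt.org/Data-Mgt-Team")))
g.add((catalog, DCAT.service,
       URIRef("https://www.ifremer.fr/co/argo-linked-data/html/Argo-HTML-SPARQL/")))
ttl = g.serialize(format="turtle")
# Roughly:
#   [] a dcat:Catalog ;
#      dct:title "Argo Floats metadata" ;
#      dct:publisher <http://www.argodatamgt.org/Data-Mgt-Team> ;
#      dcat:service <https://www.ifremer.fr/co/argo-linked-data/html/Argo-HTML-SPARQL/> .
# (serialize() returns bytes on rdflib < 6 and a str on rdflib >= 6.)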
@@ -371,6 +371,16 @@ if __name__ == "__main__":
dmqc_ops = {}
softwares = {}
# DCAT: a dac is a catalog
dcat_graph.add_triple(dac_uri, RDF.type, DCAT.Catalog)
dcat_graph.add_triple(dac_uri, DCTERMS.title, Literal("{} Argo DAC metadata".format(dac)))
dcat_graph.add_triple(dac_uri, DCTERMS.description, Literal("""
Catalog of the Argo datacenter {} containing float metadata.
""".format(dac)))
dcat_graph.add_triple(dac_uri, DCTERMS.publisher,
URIRef("http://www.argodatamgt.org/Data-Mgt-Team/ADMT-team-and-Executive-Committee"))
for afloat in floats_folders:
if args.float != None:
if afloat != args.float: continue
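# Sketch (illustration only, not part of this diff): hypothetical helper. The
# top-level catalogue and each DAC get the same rdf:type / dct:title /
# dct:description / dct:publisher pattern, so the repetition could be factored
# out. `wrapper` is assumed to expose the same add_triple(s, p, o) method used
# throughout this script.
from rdflib import Literal, URIRef
from rdflib.namespace import DCAT, DCTERMS, RDF

def describe_catalog(wrapper, uri, title, description, publisher_url):
    wrapper.add_triple(uri, RDF.type, DCAT.Catalog)
    wrapper.add_triple(uri, DCTERMS.title, Literal(title))
    wrapper.add_triple(uri, DCTERMS.description, Literal(description, lang="en"))
    wrapper.add_triple(uri, DCTERMS.publisher, URIRef(publisher_url))

# e.g. describe_catalog(dcat_graph, dac_uri, "{} Argo DAC metadata".format(dac),
#                       "Catalog of the Argo datacenter {} containing float metadata.".format(dac),
#                       "http://www.argodatamgt.org/Data-Mgt-Team/ADMT-team-and-Executive-Committee")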
@@ -399,7 +409,7 @@ if __name__ == "__main__":
(RDF.type, ARGO.ArgoFloat),
(SSN.inDeployment, deployment_uri),
(ARGO.activity, activity_uri),
(ARGO.dac, dac_uri),
# (ARGO.dac, dac_uri),
(ARGO.wmoCode, Literal(afloat, datatype=XSD.string)),
(ARGO.owner, Literal(float_info["owner"]), None),
(ARGO.maker, (NERC, "R24/current/", float_info["maker"])),
@@ -409,8 +419,8 @@ if __name__ == "__main__":
deployment_uri: [
(RDF.type, SSN.Deployment),
(ARGO.cruise, Literal(float_info["deployment"]["cruiseName"])),
(GEO.latitude, str(Literal(float_info["deployment"]["lat"], datatype=XSD.decimal))),
(GEO.longitude, str(Literal(float_info["deployment"]["lon"], datatype=XSD.decimal))),
(GEO.latitude, Literal(float_info["deployment"]["lat"], datatype=XSD.decimal)),
(GEO.longitude, Literal(float_info["deployment"]["lon"], datatype=XSD.decimal)),
(ARGO.launchDate, Literal(float_info["deployment"]["launchDate"], datatype=XSD.dateTime)),
(ARGO.deployedByShip, Literal(float_info["deployment"]["platform"], datatype=XSD.string)),
(ARGO.principalInvestigator, pi_uri)
......@@ -420,6 +430,15 @@ if __name__ == "__main__":
(FOAF.name, Literal(float_info["deployment"]["principalInvestigatorName"], datatype=XSD.string))
]
})
# DCAT: a float is a dataset belonging to a DAC catalog
dcat_graph.add_triple(dac_uri, DCAT.dataset, afloat_uri)
dcat_graph.add_triple(afloat_uri, RDF.type, DCAT.Dataset)
dcat_graph.add_triple(afloat_uri, DCTERMS.title, Literal("Argo float {}".format(afloat)))
dcat_graph.add_triple(afloat_uri, DCTERMS.description, Literal("""
Float WMO {} metadata from the datacenter {}
""".format(afloat, dac)))
# dcat_graph.add_triple(afloat_uri, DCTERMS.identifier, Literal(afloat))
# Sensor node
for sensor in float_info["sensors"]:
sensor_uri = GraphWrapper.create_uri(ARGO, "sensor", sensor["serial"])
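# Sketch (illustration only, not part of this diff): target shape of the DAC
# catalogue / float dataset link added above, written as Turtle and parsed back
# with rdflib just to check it is well-formed. Both URIs are placeholders; the
# script mints the real dac_uri and afloat_uri.
from rdflib import Graph

sample = """
@prefix dcat: <http://www.w3.org/ns/dcat#> .
@prefix dct:  <http://purl.org/dc/terms/> .

<http://example.org/dac/coriolis> a dcat:Catalog ;
    dcat:dataset <http://example.org/float/6902746> .

<http://example.org/float/6902746> a dcat:Dataset ;
    dct:title "Argo float 6902746" ;
    dct:description "Float WMO 6902746 metadata from the datacenter coriolis" .
"""
g = Graph().parse(data=sample, format="turtle")
assert len(g) == 5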
@@ -535,12 +554,22 @@ if __name__ == "__main__":
(RDF.type, PROV.Activity)
]})
# print("{} {}".format(nb, float_info["latestCycle"]["id"]))
# DCAT: a cycle is a distribution of the float (dataset)
if nb == 0 or nb == 1:
dcat_graph.add_triple(afloat_uri, DCTERMS.issued, Literal(cycle["startDate"], datatype=XSD.date))
if nb == float_info["latestCycle"]["id"]:
argo_graph.add_triple(activity_uri, ARGO.lastCycle, cycle_uri)
dcat_graph.add_triple(afloat_uri, DCTERMS.modified, Literal(cycle["startDate"], datatype=XSD.date))
dcat_graph.add_triple(afloat_uri, DCAT.distribution, cycle_uri)
dcat_graph.add_triple(cycle_uri, RDF.type, DCAT.Distribution)
dcat_graph.add_triple(cycle_uri, DCTERMS.title, Literal(
"Float {} cycle {}".format(afloat, nb)))
# Process each file of the cycle and regroup its provenance metadata with other cycles if possible
for file in this_cycle_files:
file_uri = ARGO["file" + file[:len(file) - 3].replace("_", "")]
argo_graph.add_triple(file_uri, DCTERMS["format"], Literal("netCDF4"))
map_file_to_prov(file, file_uri, cycle_uri, argo_graph, files_info, dac_uri)
dcat_graph.add_triple(cycle_uri, DCAT.downloadURL, file_uri)
if not args.noprov:
# print("TEST")
prov = profile_get_netcdf_info(args.netCDF+dac+"/"+afloat+"/profiles/"+file,
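# Sketch (illustration only, not part of this diff): the distribution pattern
# added above, with plain rdflib and placeholder URIs. The float dataset is
# dct:issued on its first cycle, dct:modified on its latest cycle, and every
# cycle becomes a dcat:Distribution whose dcat:downloadURL points at the
# profile file resources.
from rdflib import Graph, Literal, URIRef
from rdflib.namespace import DCAT, DCTERMS, RDF, XSD

g = Graph()
afloat_uri = URIRef("http://example.org/float/6902746")     # placeholder
cycle_uri = URIRef("http://example.org/cycle/6902746-1")    # placeholder
file_uri = URIRef("http://example.org/file/R6902746001")    # placeholder

g.add((afloat_uri, DCTERMS.issued, Literal("2019-03-14", datatype=XSD.date)))   # first cycle
g.add((afloat_uri, DCAT.distribution, cycle_uri))
g.add((cycle_uri, RDF.type, DCAT.Distribution))
g.add((cycle_uri, DCTERMS.title, Literal("Float 6902746 cycle 1")))
g.add((cycle_uri, DCAT.downloadURL, file_uri))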
@@ -553,10 +582,13 @@ if __name__ == "__main__":
argo_graph.add_triple(file_uri, ARGO.handbookVersion, prov["handbook"])
argo_graph.add_triple(cycle_uri, ARGO.number, Literal(nb, datatype=XSD.int))
argo_graph.add_triple(cycle_uri, ARGO.startDate, Literal(cycle["startDate"]))
argo_graph.add_triple(cycle_uri, DCTERMS.identifier, Literal(nb, datatype=XSD.int))
argo_graph.add_triple(cycle_uri, ARGO.startDate, Literal(cycle["startDate"], datatype=XSD.date))
argo_graph.add_triple(cycle_uri, GEO.latitude, Literal(cycle["lat"], datatype=XSD.decimal))
argo_graph.add_triple(cycle_uri, GEO.longitude, Literal(cycle["lon"], datatype=XSD.decimal))
adjusted_Params = {} # Keep profile params in memory to avoid duplicates
for group in groups:
# argo_graph.add_triple(group["uri"], ARGO.handbookVersion, group["handbook"])
@@ -580,7 +612,7 @@ if __name__ == "__main__":
argo_graph.add_triple(group["uri"], ARGO.profileParam, adjusted_Params[duo])
if group["dmqc"] != None:
dmqc_infos = group["dmqc"].split("|")
print(dmqc_infos)
# print(dmqc_infos)
if len(dmqc_infos) > 1:
dmqc_name = dmqc_infos[2].split(",")[0].lower().strip()
if dmqc_name.lower() not in dmqc_ops.keys():
@@ -591,11 +623,11 @@ if __name__ == "__main__":
argo_graph.add_triple(dmqc_uri, RDF.type, FOAF.Person)
argo_graph.add_triple(dmqc_uri, DCTERMS.identifier, dmqc_infos[1].strip())
argo_graph.add_triple(group["uri"], PROV.wasAttributedTo, dmqc_ops[dmqc_name])
print("Number of groups: " + str(len(groups)))
print("Number of files: " + str(len(cycle_files)))
# print("Number of groups: " + str(len(groups)))
# print("Number of files: " + str(len(cycle_files)))
a += 1
progress.update(1)
if a == args.limit:
if a == args.limit+1:
break
if a > 500:
with open("{}argo_floats_generated_{}_{}.ttl".format(args.destination, dac, part), "w") as ttl:
@@ -608,5 +640,8 @@ if __name__ == "__main__":
with open("{}argo_floats_generated_{}_{}.ttl".format(args.destination, dac, part), "w") as ttl:
ttl.write("@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .\n")
ttl.write(argo_graph.serialize(format="turtle").decode("utf-8"))
with open("{}dcat_argo_floats_generated.ttl".format(args.destination), "w") as ttl:
ttl.write(dcat_graph.serialize(format="turtle").decode("utf-8"))
ftp_access.close()
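# Sketch (illustration only, not part of this diff): note on the .decode("utf-8")
# calls above. Graph.serialize() returned bytes up to rdflib 5.x and returns str
# from rdflib 6, so a version-tolerant write could look like this (assuming
# GraphWrapper.serialize forwards to rdflib).
def write_turtle(graph, path):
    data = graph.serialize(format="turtle")
    if isinstance(data, bytes):          # rdflib <= 5.x
        data = data.decode("utf-8")
    with open(path, "w", encoding="utf-8") as out:
        out.write(data)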