Commit 30e89c48 authored by ALVISET

Before data fetch conversion to the OpenAPI

parent 3e71bacf
@@ -127,3 +127,5 @@ dmypy.json
# Pyre type checker
.pyre/
.idea/
\ No newline at end of file
# Argo to RDF
Toolkit to convert Argo netCDF files metadata into a DCAT compliant RDF file
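A minimal invocation sketch (the script name `argo2rdf.py` is an assumption): the script takes the path to the local Argo DAC database as its only argument and prints the resulting Turtle serialization to stdout.

```
python argo2rdf.py /path/to/argo/dac > argo.ttl
```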
from netCDF4 import Dataset
import argparse
import os
from datetime import date, timedelta
from rdflib import Graph, Literal, BNode
from rdflib.namespace import Namespace, RDF, XSD, FOAF, DCTERMS, DCAT, SOSA, SSN
from tqdm import tqdm
import requests
import json


def truncate(f, n):
    """Truncates/pads a float f to n decimal places without rounding"""
    s = '{}'.format(f)
    if 'e' in s or 'E' in s:
        return '{0:.{1}f}'.format(f, n)
    i, p, d = s.partition('.')
    return '.'.join([i, (d + '0' * n)[:n]])
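# Example: truncate(3.14159, 2) -> '3.14'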


def numpy_chararray_to_string(char_array):
    # tobytes() replaces the deprecated numpy tostring(); strip the whitespace padding
    return char_array[:].tobytes().decode("UTF-8").strip()


def format_netcdf_datetime_to_xsd(datetime):
    """Converts a YYYYMMDDHHMMSS timestamp string to xsd:dateTime form."""
    return "".join([datetime[0:4], "-", datetime[4:6], "-", datetime[6:8],
                    "T", datetime[8:10], ":", datetime[10:12], ":", datetime[12:14]])


def append_cycle_date_triples(g, profgrp, cycle, prefix):
    """
    Adds the cycle date triple computed from the profile file's JULD variable,
    which counts days elapsed since the Argo reference epoch 1950-01-01.
    @type g: Graph
    @type profgrp: Dataset
    @type cycle: URIRef
    @type prefix: Namespace
    """
    jdate = profgrp.variables["JULD"][:].tolist()[0]
    days = int(jdate)
    frac = jdate - days
    offset = date(1950, 1, 1) + timedelta(days)
    hours = int(frac * 24)
    minutes = int(frac * 1440) % 60
    g.add((cycle, prefix.date,
           Literal("{}T{:02d}:{:02d}:00".format(offset, hours, minutes), datatype=XSD.dateTime)))
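# Worked example: JULD 19557.1972222222 -> "2003-07-19T04:44:00"
# (19557 whole days after 1950-01-01; the 0.1972... day fraction is 4 h 44 min)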


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("db", type=str, help="Argo dac database path")
    args = parser.parse_args()
    db_location = os.path.abspath(args.db)

    # RDF Namespaces
    ARGO = Namespace("http://www.argodatamgt.org/vocab#")
    NERC = Namespace("http://vocab.nerc.ac.uk/collection/")
    GEO = Namespace("https://www.w3.org/2003/01/geo/wgs84_pos#")

    argo_floats = {}
    g = Graph()
    g.bind("argo", ARGO)
    g.bind("nerc", NERC)
    g.bind("foaf", FOAF)
    g.bind("dcat", DCAT)
    g.bind("dct", DCTERMS)
    g.bind("sosa", SOSA)
    g.bind("ssn", SSN)
    g.bind("geo", GEO)

    for dac in os.listdir(db_location):
        argo_floats[dac] = {}

    # DCAT description
    argo_catalog = ARGO["catalog" + BNode()]
    g.add((argo_catalog, RDF.type, DCAT.Catalog))
    g.add((argo_catalog, DCTERMS.title, Literal("Argo program catalog")))
    g.add((argo_catalog, DCTERMS.description, Literal("""
        The Argo program RDF catalog grouping the climate and float catalogs.
    """)))

    argofloats_catalog = ARGO["catalog" + BNode()]
    g.add((argofloats_catalog, RDF.type, DCAT.Catalog))
    g.add((argofloats_catalog, DCTERMS.title, Literal("Argo floats datasets catalog")))
    g.add((argofloats_catalog, DCTERMS.description, Literal("""
        The catalog RDF node to which every float of the Argo program is connected.
        Each float is represented as a DCAT dataset linked with this catalog.
        This catalog is one of the sub-catalogs of the Argo program.
    """)))
    g.add((argo_catalog, DCTERMS.hasPart, argofloats_catalog))

    for dac in os.listdir(db_location):
        if not os.path.isdir(os.path.join(db_location, dac)):
            continue
        dac_uri = ARGO[dac]
        g.add((dac_uri, RDF.type, ARGO.Datacenter))
        g.add((dac_uri, ARGO.name, Literal(dac, datatype=XSD.string)))
        print("DAC: {}".format(dac))
        # progress = tqdm(total=len(os.listdir("{}/{}".format(db_location, dac))))
        a = 0
        for afloat in os.listdir("{}/{}".format(db_location, dac)):
            # progress.set_description("Float {}".format(afloat))
            if afloat.startswith("."):
                # progress.update(1)
                continue
            # argo_floats[dac] = afloat
            afloat_uri = ARGO["argofloat" + afloat]
            # g.add((argofloats_catalog, DCAT.dataset, afloat_uri))
            deployment_uri = ARGO["deployment" + afloat]
            platform_uri = ARGO["platform" + afloat]
            activity_uri = ARGO["activity" + afloat]
            try:
                # Fetch the float metadata from the Euro-Argo fleet monitoring API
                print("Checking " + str(afloat))
                float_info = requests.get("https://fleetmonitoring.euro-argo.eu/floats/" + str(afloat),
                                          params={"wmo": afloat})
                print(float_info)
                float_info = float_info.json()
                metadata = Dataset("{}/{}/{}/{}".format(db_location, dac, afloat, afloat + "_meta.nc"))
                traj = Dataset("{}/{}/{}/{}".format(db_location, dac, afloat, afloat + "_Rtraj.nc"))
            except FileNotFoundError:
                print("Float {} excluded from serialization because of missing meta or traj files".format(afloat))
                # progress.update(1)
                continue

            g.add((argofloats_catalog, DCAT.dataset, afloat_uri))

            # Float node
            g.add((afloat_uri, RDF.type, ARGO.ArgoFloat))
            g.add((afloat_uri, SSN.inDeployment, deployment_uri))
            g.add((afloat_uri, SOSA.hosts, platform_uri))
            g.add((afloat_uri, ARGO.activity, activity_uri))
            g.add((afloat_uri, ARGO.dac, dac_uri))

            # Platform node
            # g.add((platform_uri, RDF.type, ARGO.Platform))
            g.add((afloat_uri, ARGO.wmoCode, Literal(afloat, datatype=XSD.int)))
            owner = numpy_chararray_to_string(metadata.variables["FLOAT_OWNER"])
            if owner != "":
                g.add((afloat_uri, ARGO.owner, ARGO[owner.replace(" ", "")]))
            maker = numpy_chararray_to_string(metadata.variables["PLATFORM_MAKER"])
            if maker != "":
                g.add((afloat_uri, ARGO.maker, NERC["R24/current/" + maker]))
            platform_type = numpy_chararray_to_string(metadata.variables["PLATFORM_TYPE"])
            g.add((afloat_uri, ARGO.type, NERC["R23/current/" + platform_type]))
            transmission = numpy_chararray_to_string(metadata.variables["TRANS_SYSTEM"])
            g.add((afloat_uri, ARGO.transmissionSystem, NERC["R10/current/" + transmission]))

            # Sensor nodes
            n_sensor = 0
            for sensor in metadata.variables["SENSOR_SERIAL_NO"]:
                sensor = numpy_chararray_to_string(sensor)
                if sensor == "n/a":
                    sensor = BNode()
                sensor_uri = ARGO["sensor" + sensor + "_" + str(n_sensor)]
                g.add((sensor_uri, RDF.type, SOSA.Sensor))
                g.add((afloat_uri, SOSA.hosts, sensor_uri))
                g.add((sensor_uri, ARGO.type,
                       NERC["R25/current/" + numpy_chararray_to_string(metadata.variables["SENSOR"][n_sensor]) + "/"]))
                g.add((sensor_uri, ARGO.maker,
                       NERC["R26/current/" + numpy_chararray_to_string(
                           metadata.variables["SENSOR_MAKER"][n_sensor]) + "/"]))
                g.add((sensor_uri, ARGO.model,
                       NERC["R27/current/" + numpy_chararray_to_string(
                           metadata.variables["SENSOR_MODEL"][n_sensor]) + "/"]))
                n_sensor += 1

            # Deployment node
            g.add((deployment_uri, RDF.type, SSN.Deployment))
            pi_name = numpy_chararray_to_string(metadata.variables["PI_NAME"])
            pi_uri = ARGO[pi_name.replace(" ", "")]
            g.add((deployment_uri, ARGO.principalInvestigator, pi_uri))
            g.add((pi_uri, RDF.type, FOAF.Person))
            g.add((pi_uri, FOAF.name, Literal(pi_name)))
            cruise_id = numpy_chararray_to_string(metadata.variables["DEPLOYMENT_CRUISE_ID"])  # type: str
            if cruise_id != "":
                g.add((deployment_uri, ARGO.cruise, Literal(cruise_id)))
            g.add((deployment_uri, GEO.latitude,
                   Literal(metadata.variables["LAUNCH_LATITUDE"][:], datatype=XSD.float)))
            g.add((deployment_uri, GEO.longitude,
                   Literal(metadata.variables["LAUNCH_LONGITUDE"][:], datatype=XSD.float)))
            launch_date = numpy_chararray_to_string(metadata.variables["LAUNCH_DATE"])  # type: str
            formatted_datetime = format_netcdf_datetime_to_xsd(launch_date)
            g.add((deployment_uri, ARGO.launchDate,
                   Literal(formatted_datetime, datatype=XSD.dateTime)))

            # Activity node
            g.add((activity_uri, RDF.type, ARGO.ActivityData))
            # g.add((activity_uri, ARGO.lastCycle,
            #        Literal(traj.variables["CYCLE_NUMBER"][:].tolist()[-1], datatype=XSD.int)))

            # Cycle nodes
            nb_cycles = traj.variables["CYCLE_NUMBER"][:].tolist()[-1]
            profiles = {}
            # Profile file prefixes: R = real-time, D = delayed-mode,
            # BR and BD are their BGC (biogeochemical) counterparts
            types = ["R", "D", "BR", "BD"]
            last_cycle = 0
            for nb in range(0, nb_cycles, 1):
                file_nb = str(nb).zfill(3)
                profiles[nb] = []
                cycle_uri = ARGO["cycle" + BNode()]
                g.add((afloat_uri, ARGO.cycle, cycle_uri))
                date_written = False
                for filetype in types:
                    full_file_name = filetype + afloat + "_" + file_nb + ".nc"
                    try:
                        if full_file_name in os.listdir("{}/{}/{}/profiles/".format(db_location, dac, afloat)):
                            if not date_written:
                                append_cycle_date_triples(
                                    g,
                                    Dataset("{}/{}/{}/profiles/{}".format(db_location, dac, afloat, full_file_name)),
                                    cycle_uri, ARGO)
                                date_written = True
                            file_uri = ARGO["file" + BNode()]
                            g.add((file_uri, RDF.type, ARGO.File))
                            g.add((file_uri, DCAT.accessURL, Literal(
                                "ftp://ftp.ifremer.fr/ifremer/argo/dac/{}/{}/profiles/{}".format(
                                    dac, afloat, full_file_name))))
                            g.add((file_uri, ARGO.isFolder, Literal(False, datatype=XSD.boolean)))
                            g.add((dac_uri, ARGO.file, file_uri))
                            g.add((cycle_uri, ARGO["number"], Literal(nb, datatype=XSD.int)))
                            if filetype in ("R", "D"):
                                g.add((cycle_uri, ARGO.coreArgoProfile, file_uri))
                            elif filetype in ("BR", "BD"):
                                g.add((cycle_uri, ARGO.bgcArgoProfile, file_uri))
                    except FileNotFoundError:
                        continue
                last_cycle = nb
            g.add((activity_uri, ARGO.lastCycle,
                   Literal(last_cycle, datatype=XSD.int)))
            a += 1
            # progress.update(1)
            if a == 25:  # development cap: only serialize 25 floats per DAC
                break

    print(g.serialize(format="turtle").decode("utf-8"))
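
    # A minimal sketch (not part of the original script) of how the resulting
    # graph could be queried with rdflib's SPARQL support, e.g. listing floats:
    # for row in g.query("""
    #     PREFIX argo: <http://www.argodatamgt.org/vocab#>
    #     SELECT ?float ?wmo
    #     WHERE { ?float a argo:ArgoFloat ; argo:wmoCode ?wmo . }
    # """):
    #     print(row.float, row.wmo)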
# print(len(traj.variables["CYCLE_NUMBER_ACTUAL"][:].tolist()))
# start_juld = traj.variables["JULD_DESCENT_START"][:].tolist()
# for cycle in traj.variables["CYCLE_NUMBER_ACTUAL"][:].tolist():
# # print(cycle)
# if cycle is not None:
# jdate = start_juld[cycle]
# # print(jdate)
# if jdate is None:
# continue
# days = int(str(jdate).split(".")[0])
# start = date(1950, 1, 1)
# delta = timedelta(days)
# offset = start + delta
#
# hours = int(str(24 * float("0." + (str(jdate).split(".")[1]))).split(".")[0])
# minutes = int(1440 * float("0." + str(jdate).split(".")[1]) % 60)
# seconds = int(86400 * float("0." + str(jdate).split(".")[1]) % 60)
#
# print("{} | {} | {} {}:{}:{}".format(cycle, jdate, offset, hours, minutes, seconds))
#
#
# jdate = 19557.1972222222
# days = int(str(jdate).split(".")[0])
# start = date(1950, 1, 1)
# delta = timedelta(days)
# offset = start + delta
#
#
# hours = int(str(24 * float("0." + (str(jdate).split(".")[1]))).split(".")[0])
# minutes = int(1440 * float("0." + str(jdate).split(".")[1]) % 60)
# seconds = int(86400 * float("0." + str(jdate).split(".")[1]) % 60)
#
# print("{} {}:{}:{}".format(offset,hours,minutes,seconds))
# rootgrp = Dataset(args.ncFile, "r")
# print(type(rootgrp.variables["LATITUDE"]))
# nMeasure = 1
# metaData = []
# for value in rootgrp.variables["CYCLE_NUMBER"]:
# # if len(value.shape) > 0:
# if value.data != 0.0:
#
# # print(value.tolist())
#
# # cycleNumber = rootgrp.variables["CYCLE_NUMBER"][nMeasure].tolist()
# latitude = rootgrp.variables["LATITUDE"][nMeasure].tolist()
# longitude = rootgrp.variables["LONGITUDE"][nMeasure].tolist()
# # Date processing
# if value < 0:
# continue
# jdate = rootgrp.variables["JULD"][value.tolist()].tolist()
# # print("{}, {}".format(rootgrp.variables["JULD"][cycleNumber].tolist(), jdate))
# days = int(str(jdate).split(".")[0])
# start = date(1950, 1, 1)
# delta = timedelta(days)
# offset = start + delta
#
# hours = int(str(24 * float("0." + (str(jdate).split(".")[1]))).split(".")[0])
# minutes = int(1440 * float("0." + str(jdate).split(".")[1]) % 60)
# seconds = int(86400 * float("0." + str(jdate).split(".")[1]) % 60)
# # print("Cycle Number {}, measurement {}".format(cycleNumber, value), )
# # print(offset)
# # print("{}, {}:{}:{}".format(jdate, hours, minutes, seconds))
#
# if latitude is not None and longitude is not None:
# print("{}, {}, {}, {}, {}, {}, {}".format(nMeasure, value, latitude, longitude, offset, hours, minutes))
# metaData.insert(value.tolist(), [latitude, longitude, date])
# # print([cycleNumber, value.tolist(), longitude])
# nMeasure += 1
# with open(args.ttlFile, "a") as ttl:
# for cycle in range(1, len(metaData)):
# print("Writing...")
# ttl.write("argo:cycleexample" + str(cycle) + " a argo:Cycle;\n")
# # ttl.write("\targo:number \"" + str(cycle) + "\"^^xsd:int;\n")
# # ttl.write("\targo:startCycleDate \"" + str(cycle) + "\"^^xsd:int;\n")
# ttl.write("\targo:latitude \"" + str(metaData[cycle][0]) + "\"^^xsd:float;\n")
# ttl.write("\targo:longitude \"" + str(metaData[cycle][1]) + "\"^^xsd:float.\n\n")
netCDF4~=1.5.4
rdflib
requests
tqdm
import setuptools

with open('requirements.txt', 'r') as f:
    install_requires = f.read().splitlines()

setuptools.setup(name='my_project',
                 packages=['my_project'],
                 install_requires=install_requires)