Commit a0e57bd5 authored by ALVISET's avatar ALVISET
Browse files

The conversion file now uses OpenAPI to fetch data

parent 30e89c48
from netCDF4 import Dataset
import argparse
import os
from datetime import date, timedelta
from datetime import date, timedelta, datetime
from rdflib import Graph, Literal, BNode, DCAT, SOSA, SSN
from rdflib.namespace import *
from tqdm import tqdm
import requests
import json
from ftplib import FTP
import re
import time
import decimal
import sys
import urllib.parse
def truncate(f, n):
......@@ -44,10 +48,10 @@ def append_cycle_date_triples(g, profgrp, cycle, prefix):
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("db", type=str, help="Argo dac database path")
args = parser.parse_args()
db_location = os.path.abspath(args.db)
# parser = argparse.ArgumentParser()
# parser.add_argument("db", type=str, help="Argo dac database path")
# args = parser.parse_args()
# db_location = os.path.abspath(args.db)
# RDF Namespaces
ARGO = Namespace("http://www.argodatamgt.org/vocab#")
......@@ -64,12 +68,9 @@ if __name__ == "__main__":
g.bind("sosa", SOSA)
g.bind("ssn", SSN)
g.bind("geo", GEO)
for dac in os.listdir(db_location):
argo_floats = {dac: {}}
# DCAT description
argo_catalog = ARGO["calaog"+BNode()]
argo_catalog = ARGO["catalog"+BNode()]
g.add((argo_catalog, RDF.type, DCAT.Catalog))
g.add((argo_catalog, DCTERMS.title, Literal("Argo program catalog")))
g.add((argo_catalog, DCTERMS.description, Literal("""
......@@ -87,231 +88,170 @@ if __name__ == "__main__":
g.add((argo_catalog, DCTERMS.hasPart, argofloats_catalog))
ftp_access = FTP("ftp.ifremer.fr")
ftp_access.login()
dac_path = "/ifremer/argo/dac/"
ftp_access.cwd(dac_path)
for dac in os.listdir(db_location):
if not os.path.isdir(os.path.join(args.db, dac)):
continue
for dac in ftp_access.nlst():
dac_uri = ARGO[dac]
g.add((ARGO[dac], RDF.type, ARGO.Datacenter))
g.add((ARGO[dac], ARGO.name, Literal(dac, datatype=XSD.string)))
print("DAC: {}".format(dac))
# progress = tqdm(total=len(os.listdir("{}/{}".format(db_location, dac))))
a = 0
for afloat in os.listdir("{}/{}".format(db_location, dac)):
# progress.set_description("Float {}".format(afloat))
ftp_access.cwd("{}/{}/".format(dac_path, dac))
floats_folders = ftp_access.nlst()
progress = tqdm(total=len(floats_folders), position=0, leave=True, file=sys.stdout)
for afloat in floats_folders:
progress.desc = str(afloat)
if afloat.startswith("."):
# progress.update(1)
continue
# argo_floats[dac] = afloat
afloat_uri = ARGO["argofloat" + afloat]
# g.add((argofloats_catalog, DCAT.dataset, afloat_uri))
g.add((argofloats_catalog, DCAT.dataset, afloat_uri))
deployment_uri = ARGO["deployment" + afloat]
platform_uri = ARGO["platform" + afloat]
# platform_uri = ARGO["platform" + afloat]
activity_uri = ARGO["activity" + afloat]
try:
print("Checking "+str(afloat))
float_info = requests.get("https://fleetmonitoring.euro-argo.eu/floats/"+str(afloat), params = {"wmo":afloat})
print(float_info)
float_info = json.loads(float_info.text)
metadata = Dataset("{}/{}/{}/{}".format(db_location, dac, afloat, afloat + "_meta.nc"))
traj = Dataset("{}/{}/{}/{}".format(db_location, dac, afloat, afloat + "_Rtraj.nc"))
except FileNotFoundError:
print("Float {} excluded from serialization because of missing meta or traj files".format(afloat))
# progress.update(1)
continue
float_info = requests.get("https://fleetmonitoring.euro-argo.eu/floats/"+str(afloat), params = {"wmo":afloat})
if float_info == None:
progress.write("Float {} metadata is nonexistent. Skipping...".format(afloat))
float_info = json.loads(float_info.text)
g.add((argofloats_catalog, DCAT.dataset, afloat_uri))
# Float node
if float_info == None:
progress.write("Float {} metadata is nonexistent. Skipping...".format(afloat))
continue
g.add((afloat_uri, RDF.type, ARGO.ArgoFloat))
g.add((afloat_uri, SSN.inDeployment, deployment_uri))
g.add((afloat_uri, SOSA.hosts, platform_uri))
# g.add((afloat_uri, SOSA.hosts, platform_uri))
g.add((afloat_uri, ARGO.activity, activity_uri))
g.add((afloat_uri, ARGO.dac, dac_uri))
# Platform node
# g.add((platform_uri, RDF.type, ARGO.Platform))
g.add((afloat_uri, ARGO.wmoCode, Literal(afloat, datatype=XSD.int)))
owner = numpy_chararray_to_string(metadata.variables["FLOAT_OWNER"])
if owner != "":
g.add((afloat_uri, ARGO.owner, Literal(ARGO[owner.replace(" ", "")], datatype=XSD.string)))
maker = numpy_chararray_to_string(metadata.variables["PLATFORM_MAKER"])
if maker != "":
owner = float_info["owner"]
if owner != None:
g.add((afloat_uri, ARGO.owner, Literal(ARGO[urllib.parse.quote(owner)], datatype=XSD.string)))
maker = float_info["maker"]
if maker != None:
g.add((afloat_uri, ARGO.maker,
NERC["R24/current/" + maker]))
platform_type = numpy_chararray_to_string(metadata.variables["PLATFORM_TYPE"])
g.add((afloat_uri, ARGO.type,
NERC["R23/current/" + platform_type]))
transmission = numpy_chararray_to_string(metadata.variables["TRANS_SYSTEM"])
g.add((afloat_uri, ARGO.transmissionSystem,
NERC["R10/current/" + transmission]))
NERC["R24/current/" + urllib.parse.quote(maker)]))
platform_type = float_info["platform"]["type"]
if platform_type != None:
g.add((afloat_uri, ARGO.type,
NERC["R23/current/" + urllib.parse.quote(platform_type)]))
transmission = float_info["transmissionSystem"]
if transmission != None:
g.add((afloat_uri, ARGO.transmissionSystem,
NERC["R10/current/" + urllib.parse.quote(transmission)]))
n_sensor = 0
for sensor in metadata.variables["SENSOR_SERIAL_NO"]:
sensor = numpy_chararray_to_string(sensor)
if sensor == "n/a":
sensor = BNode()
sensor_uri = ARGO["sensor" + sensor + "_" + str(n_sensor)]
for sensor in float_info["sensors"]:
sensor_uri = ARGO["sensor" + BNode() + "_" + str(n_sensor)]
g.add((sensor_uri, RDF.type, SOSA.Sensor))
g.add((afloat_uri, SOSA.hosts, sensor_uri))
g.add((sensor_uri, ARGO.type,
NERC["R25/current/" + numpy_chararray_to_string(metadata.variables["SENSOR"][n_sensor]) + "/"]))
g.add((sensor_uri, ARGO.maker,
NERC["R26/current/" + numpy_chararray_to_string(
metadata.variables["SENSOR_MAKER"][n_sensor]) + "/"]))
g.add((sensor_uri, ARGO.model,
NERC["R27/current/" + numpy_chararray_to_string(
metadata.variables["SENSOR_MODEL"][n_sensor]) + "/"]))
if sensor["id"] != None:
g.add((sensor_uri, ARGO.type, NERC["R25/current/" + urllib.parse.quote(sensor["id"]) + "/"]))
if sensor["maker"] != None:
g.add((sensor_uri, ARGO.maker, NERC["R26/current/" + urllib.parse.quote(sensor["maker"]) + "/"]))
if sensor["model"] != None:
g.add((sensor_uri, ARGO.model, NERC["R27/current/" + urllib.parse.quote(sensor["model"]) + "/"]))
n_sensor += 1
# Deployment node
g.add((deployment_uri, RDF.type, SSN.Deployment))
pi_name = numpy_chararray_to_string(metadata.variables["PI_NAME"])
pi_uri = ARGO[pi_name.replace(" ", "")]
pi_name = float_info["deployment"]["principalInvestigatorName"]
pi_uri = ARGO[urllib.parse.quote(pi_name)]
g.add((deployment_uri, ARGO.principalInvestigator, pi_uri))
g.add((pi_uri, RDF.type, FOAF.Person))
g.add((pi_uri, FOAF.name, Literal(pi_name)))
cruise_id = metadata.variables["DEPLOYMENT_CRUISE_ID"][:].tostring().strip().decode(
"UTF-8") # type: # str
if cruise_id != "":
cruise_id = float_info["deployment"]["cruiseName"]
if cruise_id != None:
g.add((deployment_uri, ARGO.cruise, Literal(cruise_id)))
g.add((deployment_uri, GEO.latitude,
Literal(metadata.variables["LAUNCH_LATITUDE"][:], datatype=XSD.float)))
Literal(float_info["deployment"]["lat"], datatype=XSD.float)))
g.add((deployment_uri, GEO.longitude,
Literal(metadata.variables["LAUNCH_LONGITUDE"][:], datatype=XSD.float)))
launch_date = numpy_chararray_to_string(metadata.variables["LAUNCH_DATE"]) # type: str
formatted_datetime = format_netcdf_datetime_to_xsd(launch_date)
Literal(float_info["deployment"]["lon"], datatype=XSD.float)))
formatted_datetime = float_info["deployment"]["launchDate"]
g.add((deployment_uri, ARGO.launchDate,
Literal(formatted_datetime, datatype=XSD.dateTime)))
# Activity node
g.add((activity_uri, RDF.type, ARGO.ActivityData))
# g.add((activity_uri, ARGO.lastCycle,
# Literal(traj.variables["CYCLE_NUMBER"][:].tolist()[-1], datatype=XSD.int)))
try:
fdate = datetime.strptime(float_info["earliestCycle"]["firstStation"]["date"], "%Y-%m-%dT%H:%M:%S.000+0000")
timedelta = (datetime.now() - datetime.fromtimestamp(time.mktime(fdate.timetuple())))
if timedelta != None:
age = timedelta.days / 365.25
age = decimal.Decimal(age).quantize(decimal.Decimal('0.01'), rounding=decimal.ROUND_DOWN)
g.add((activity_uri, ARGO.age, Literal(age, datatype=XSD.float)))
except TypeError:
progress.write("No cycles found for float {}".format(afloat))
ftp_access.cwd("{}/{}/{}/".format(dac_path, dac, afloat))
base_files = ftp_access.nlst()
regex = re.compile(str(afloat) + ".*" + "traj.nc")
traj_files = list(filter(regex.match, base_files))
for traj in traj_files:
g.add((activity_uri, ARGO.trajectoryData,
Literal("ftp://ftp.ifremer.fr/ifremer/argo/dac/{}/{}/{}".format(dac, afloat, traj))))
regex = re.compile(str(afloat) + ".*" + "prof.nc")
station_files = list(filter(regex.match, base_files))
for prof in station_files:
g.add((activity_uri, ARGO.stationData,
Literal("ftp://ftp.ifremer.fr/ifremer/argo/dac/{}/{}/{}".format(dac, afloat, prof))))
# Cycle nodes
nb_cycles = traj.variables["CYCLE_NUMBER"][:].tolist()[-1]
try:
nb_cycles = float_info["latestCycle"]["id"]
except:
continue
profiles = {}
types = [
"R",
"D",
"BR",
"BD"
]
last_cycle = 0
for nb in range(0, nb_cycles, 1):
ftp_access.cwd("{}/{}/{}/profiles/".format(dac_path, dac, afloat))
cycle_files = ftp_access.nlst()
g.add((activity_uri, ARGO.lastCycle,
Literal(float_info["latestCycle"]["id"], datatype=XSD.int)))
for cycle in float_info["cycles"]:
nb = int(cycle["id"])
file_nb = (3 - len(str(nb))) * "0" + str(nb)
profiles[nb] = []
cycle_uri = ARGO["cycle" + BNode()]
g.add((afloat_uri, ARGO.cycle, cycle_uri))
date_written = False
for filetype in types:
full_file_name = filetype + afloat + "_" + file_nb + ".nc"
try:
if full_file_name in os.listdir("{}/{}/{}/profiles/".format(db_location, dac, afloat)):
if not date_written:
append_cycle_date_triples(g, Dataset(f"{db_location}/{dac}/{afloat}/{'profiles'}/{full_file_name}"),
cycle_uri, ARGO)
date_written = True
file_uri = ARGO["file" + BNode()]
g.add((file_uri, RDF.type, ARGO.File))
g.add((file_uri, DCAT.accessURL, Literal(
"ftp://ftp.ifremer.fr/ifremer/argo/dac/{}/{}/profiles/{}".format(dac, afloat, full_file_name))))
g.add((file_uri, ARGO.isFolder, Literal(False, datatype=XSD.boolean)))
g.add((dac_uri, ARGO.file, file_uri))
g.add((cycle_uri, ARGO["number"], Literal(nb, datatype=XSD.int)))
if full_file_name.startswith("R"):
g.add((cycle_uri, ARGO.coreArgoProfile, file_uri))
elif full_file_name.startswith("D"):
g.add((cycle_uri, ARGO.coreArgoProfile, file_uri))
elif full_file_name.startswith("BR"):
g.add((cycle_uri, ARGO.bcgArgoProfile, file_uri))
elif full_file_name.startswith("BD"):
g.add((cycle_uri, ARGO.bcgArgoProfile, file_uri))
except FileNotFoundError:
continue
last_cycle = nb
g.add((activity_uri, ARGO.lastCycle,
Literal(last_cycle, datatype=XSD.int)))
regex = re.compile(".*"+str(afloat)+"_"+str(file_nb)+".nc")
this_cycle_files = list(filter(regex.match, cycle_files))
if len(this_cycle_files) == 0:
# print("Float {}, cycle {} does not have cycle files ?".format(afloat, file_nb))
continue
file_uri = ARGO["file" + BNode()]
g.add((file_uri, RDF.type, ARGO.File))
for cycle_file in this_cycle_files:
g.add((file_uri, DCAT.accessURL, Literal(
"ftp://ftp.ifremer.fr/ifremer/argo/dac/{}/{}/profiles/{}".format(dac, afloat, cycle_file))))
if cycle_file.startswith("R"):
g.add((cycle_uri, ARGO.coreArgoProfile, file_uri))
elif cycle_file.startswith("D"):
g.add((cycle_uri, ARGO.coreArgoProfile, file_uri))
elif cycle_file.startswith("B"):
g.add((cycle_uri, ARGO.bcgArgoProfile, file_uri))
elif cycle_file.startswith("S"):
g.add((cycle_uri, ARGO.syntheticArgoProfile, file_uri))
g.add((dac_uri, ARGO.hostsFile, file_uri))
g.add((cycle_uri, ARGO["number"], Literal(nb, datatype=XSD.int)))
g.add((cycle_uri, ARGO.startDate, Literal(cycle["startDate"])))
g.add((cycle_uri, GEO.latitude, Literal(cycle["lat"])))
g.add((cycle_uri, GEO.longitude, Literal(cycle["lon"])))
a += 1
# progress.update(1)
progress.update(1)
if a == 25:
progress.close()
break
print(g.serialize(format="turtle").decode("utf-8"))
# print(len(traj.variables["CYCLE_NUMBER_ACTUAL"][:].tolist()))
# start_juld = traj.variables["JULD_DESCENT_START"][:].tolist()
# for cycle in traj.variables["CYCLE_NUMBER_ACTUAL"][:].tolist():
# # print(cycle)
# if cycle is not None:
# jdate = start_juld[cycle]
# # print(jdate)
# if jdate is None:
# continue
# days = int(str(jdate).split(".")[0])
# start = date(1950, 1, 1)
# delta = timedelta(days)
# offset = start + delta
#
# hours = int(str(24 * float("0." + (str(jdate).split(".")[1]))).split(".")[0])
# minutes = int(1440 * float("0." + str(jdate).split(".")[1]) % 60)
# seconds = int(86400 * float("0." + str(jdate).split(".")[1]) % 60)
#
# print("{} | {} | {} {}:{}:{}".format(cycle, jdate, offset, hours, minutes, seconds))
#
#
# jdate = 19557.1972222222
# days = int(str(jdate).split(".")[0])
# start = date(1950, 1, 1)
# delta = timedelta(days)
# offset = start + delta
#
#
# hours = int(str(24 * float("0." + (str(jdate).split(".")[1]))).split(".")[0])
# minutes = int(1440 * float("0." + str(jdate).split(".")[1]) % 60)
# seconds = int(86400 * float("0." + str(jdate).split(".")[1]) % 60)
#
# print("{} {}:{}:{}".format(offset,hours,minutes,seconds))
# rootgrp = Dataset(args.ncFile, "r")
# print(type(rootgrp.variables["LATITUDE"]))
# nMeasure = 1
# metaData = []
# for value in rootgrp.variables["CYCLE_NUMBER"]:
# # if len(value.shape) > 0:
# if value.data != 0.0:
#
# # print(value.tolist())
#
# # cycleNumber = rootgrp.variables["CYCLE_NUMBER"][nMeasure].tolist()
# latitude = rootgrp.variables["LATITUDE"][nMeasure].tolist()
# longitude = rootgrp.variables["LONGITUDE"][nMeasure].tolist()
# # Date processing
# if value < 0:
# continue
# jdate = rootgrp.variables["JULD"][value.tolist()].tolist()
# # print("{}, {}".format(rootgrp.variables["JULD"][cycleNumber].tolist(), jdate))
# days = int(str(jdate).split(".")[0])
# start = date(1950, 1, 1)
# delta = timedelta(days)
# offset = start + delta
#
# hours = int(str(24 * float("0." + (str(jdate).split(".")[1]))).split(".")[0])
# minutes = int(1440 * float("0." + str(jdate).split(".")[1]) % 60)
# seconds = int(86400 * float("0." + str(jdate).split(".")[1]) % 60)
# # print("Cycle Number {}, measurement {}".format(cycleNumber, value), )
# # print(offset)
# # print("{}, {}:{}:{}".format(jdate, hours, minutes, seconds))
#
# if longitude is not None and longitude is not None:
# print("{}, {}, {}, {}, {}, {}, {}".format(nMeasure, value, latitude, longitude, offset, hours, minutes))
# metaData.insert(value.tolist(), [latitude, longitude, date])
# # print([cycleNumber, value.tolist(), longitude])
# nMeasure += 1
ftp_access.close()
with open("argo_floats_generated.ttl", "w") as ttl:
ttl.write(g.serialize(format="turtle").decode("utf-8"))
# with open(args.ttlFile, "a") as ttl:
# for cycle in range(1, len(metaData)):
......
"""Package configuration: install requirements are read from requirements.txt."""
import setuptools

# One requirement per line, exactly as written in the file.
with open('requirements.txt', 'r') as requirements_file:
    install_requires = requirements_file.read().splitlines()

setuptools.setup(
    name='my_project',
    packages=['my_project'],
    install_requires=install_requires,
)
"""Package configuration: install requirements are read from requirements.txt."""
import setuptools

# Each line of requirements.txt is a single dependency specifier.
with open('requirements.txt', 'r') as req_fh:
    dependency_list = req_fh.read().splitlines()

setuptools.setup(
    name='my_project',
    packages=['my_project'],
    install_requires=dependency_list,
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment