Commit ad8778bd authored by ALVISET

Added executable prototype and options to the main script

parent 161b8f63
@@ -11,6 +11,7 @@ import time
import decimal
import sys
import urllib.parse
import argparse
def truncate(f, n):
@@ -46,28 +47,42 @@ def append_cycle_date_triples(g, profgrp, cycle, prefix):
    minutes = int(1440 * float("0." + str(jdate).split(".")[1]) % 60)
    g.add((cycle, prefix.date, Literal("{}T{:02d}:{:02d}:00".format(offset, hours, minutes), datatype=XSD.dateTime)))
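    # Worked example (illustrative value): a jdate of 25202.75 has fractional
    # part 0.75, i.e. 1440 * 0.75 = 1080 minutes past midnight, so it encodes
    # 18:00 (minutes = 1080 % 60 = 0).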
def bind_graph(graph, prefixes):
    # Bind every (prefix, namespace) pair to the graph
    for prefix, namespace in prefixes:
        graph.bind(prefix, namespace)

def create_new_argo_graph(dac_name, prefixes_array):
    # Create a fresh graph with all namespaces bound and the DAC node typed
    g = Graph()
    bind_graph(g, prefixes_array)
    g.add((ARGO[dac_name], RDF.type, ARGO.Datacenter))
    g.add((ARGO[dac_name], RDFS.label, Literal(dac_name, datatype=XSD.string)))
    return g
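
# Usage sketch (hypothetical DAC name; prefixes is the tuple defined in __main__):
#   g = create_new_argo_graph("coriolis", prefixes)
#   print(g.serialize(format="turtle").decode("utf-8"))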
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("destination", type=str, help="Absolute path where the converted files will be written")
    parser.add_argument("--dac", type=str, nargs="*", help="Convert only floats metadata from the specified DACs")
    parser.add_argument("--limit", "-l", type=int, help="Limit the number of floats converted per DAC")
    args = parser.parse_args()
    if not args.destination.endswith("/"):
        args.destination += "/"
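    # Example invocations (illustrative paths):
    #   python3 convert.py /data/rdf/
    #   python3 convert.py /data/rdf/ --dac coriolis aoml --limit 10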
    # RDF Namespaces
    ARGO = Namespace("http://www.argodatamgt.org/argo-ontology#")
    NERC = Namespace("http://vocab.nerc.ac.uk/collection/")
    GEO = Namespace("https://www.w3.org/2003/01/geo/wgs84_pos#")
    prefixes = (("argo", ARGO),
                ("nerc", NERC),
                ("foaf", FOAF),
                ("dcat", DCAT),
                ("dct", DCTERMS),
                ("sosa", SOSA),
                ("ssn", SSN),
                ("geo", GEO))
    argo_floats = {}

    # DCAT description
    # argo_catalog = ARGO["catalog"+BNode()]
@@ -94,14 +109,29 @@ if __name__ == "__main__":
    ftp_access.cwd(dac_path)
    for dac in ftp_access.nlst():
        # Skip DACs that were not requested with --dac
        if args.dac is not None and dac not in args.dac:
            continue
        ftp_access.cwd("{}/{}/".format(dac_path, dac))
        floats_folders = ftp_access.nlst()
        progress = tqdm(total=len(floats_folders), position=0, leave=True, file=sys.stdout)
        part = 1
        g = create_new_argo_graph(dac, prefixes)
        dac_uri = ARGO[dac]
        print("DAC: {}".format(dac))
        a = 0
        for afloat in floats_folders:
            progress.desc = str(afloat)
            if afloat.startswith("."):
@@ -110,11 +140,12 @@ if __name__ == "__main__":
afloat_uri = ARGO["argofloat" + afloat]
# g.add((argofloats_catalog, DCAT.dataset, afloat_uri))
deployment_uri = ARGO["deployment" + afloat]
# platform_uri = ARGO["platform" + afloat]
activity_uri = ARGO["activity" + afloat]
float_info = requests.get("https://fleetmonitoring.euro-argo.eu/floats/"+str(afloat), params = {"wmo":afloat})
if float_info == None:
progress.write("Float {} metadata is nonexistent. Skipping...".format(afloat))
if float_info.text == "" or float_info.text == None:
continue
float_info = json.loads(float_info.text)
# g.add((argofloats_catalog, DCAT.dataset, afloat_uri))
@@ -124,11 +155,10 @@ if __name__ == "__main__":
                continue
            g.add((afloat_uri, RDF.type, ARGO.ArgoFloat))
            g.add((afloat_uri, SSN.inDeployment, deployment_uri))
            g.add((afloat_uri, ARGO.activity, activity_uri))
            g.add((afloat_uri, ARGO.dac, dac_uri))
            g.add((afloat_uri, ARGO.wmoCode, Literal(afloat, datatype=XSD.string)))
            owner = float_info["owner"]
            if owner is not None:
                g.add((afloat_uri, ARGO.owner, Literal(ARGO[urllib.parse.quote(owner)], datatype=XSD.string)))
@@ -164,7 +194,8 @@ if __name__ == "__main__":
            # Deployment node
            g.add((deployment_uri, RDF.type, SSN.Deployment))
            pi_name = float_info["deployment"]["principalInvestigatorName"]
            if pi_name is not None:
                pi_uri = ARGO[urllib.parse.quote(pi_name)]
                g.add((deployment_uri, ARGO.principalInvestigator, pi_uri))
                g.add((pi_uri, RDF.type, FOAF.Person))
                g.add((pi_uri, FOAF.name, Literal(pi_name)))
@@ -212,10 +243,11 @@ if __name__ == "__main__":
            except:
                continue
            profiles = {}
            try:
                ftp_access.cwd("{}/{}/{}/profiles/".format(dac_path, dac, afloat))
            except:
                continue
            cycle_files = ftp_access.nlst()
            # g.add((activity_uri, ARGO.lastCycle,
            #        Literal(float_info["latestCycle"]["id"], datatype=XSD.int)))
            for cycle in float_info["cycles"]:
                nb = int(cycle["id"])
                file_nb = str(nb).zfill(3)  # zero-pad the cycle number, e.g. 7 -> "007"
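                # Assumption: the padded id is used to match per-cycle profile
                # files, which are typically named like R<wmo>_<cycle>.nc on the
                # Argo FTP mirrors (e.g. R6901234_007.nc).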
@@ -254,13 +286,22 @@ if __name__ == "__main__":
                g.add((cycle_uri, GEO.longitude, Literal(cycle["lon"])))
            a += 1
            progress.update(1)
            if a == args.limit:
                progress.close()
                break
            if a > 500:
                # Flush the current graph to disk so memory stays bounded,
                # then start a fresh graph for the next part
                with open("{}argo_floats_generated_{}_{}.ttl".format(args.destination, dac, part), "w") as ttl:
                    ttl.write(g.serialize(format="turtle").decode("utf-8"))
                part += 1
                g = create_new_argo_graph(dac, prefixes)
                a = 0
        with open("{}argo_floats_generated_{}_{}.ttl".format(args.destination, dac, part), "w") as ttl:
            ttl.write(g.serialize(format="turtle").decode("utf-8"))
    ftp_access.close()
    # with open(args.ttlFile, "a") as ttl:
    #     for cycle in range(1, len(metaData)):
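
    # Output naming sketch (illustrative): each DAC yields files such as
    # argo_floats_generated_coriolis_1.ttl, argo_floats_generated_coriolis_2.ttl,
    # ... one file per ~500 converted floats.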
@@ -5,6 +5,7 @@ if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("generated", type=str, help="The generated file from the convert script")
    parser.add_argument("ontology", type=str, help="The ontology file to include in the generated file")
    args = parser.parse_args()
    graph1 = Graph()
@@ -15,5 +16,5 @@ if __name__ == "__main__":
        graph2.parse(f, format="ttl")
    graph3 = graph1 + graph2
    # Write the merged graph back over the generated file
    with open(args.generated, "w") as ttl:
        ttl.write(graph3.serialize(format="turtle").decode("utf-8"))
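
# Usage sketch for the merge step (illustrative paths):
#   python3 merge.py /data/rdf/argo_floats_generated_aoml_1.ttl argo-ontology.ttl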
#!/bin/bash
# Convert all Argo DACs metadata into RDF format
# To save time, they are processed in parallel, grouped by how many floats each DAC holds
run1="aoml"
run2="coriolis"
run3=("bodc" "csio" "csiro" "incois" "jma" "kma" "kordi" "meds" "nmdis")
#PIDs=()
parent=$(dirname "$(dirname "$(realpath "$0")")")
cd "$parent/argoToRDF" || exit 1
python3 convert.py "$1" --dac "$run1" > /dev/null 2>&1 &
PID=$!
#PIDs+=$!
python3 convert.py "$1" --dac "$run2" > /dev/null 2>&1 &
#PIDs+=$!
for dac in "${run3[@]}"; do
echo "DAC : $dac"
python3 convert.py $1 --dac $dac 2>&1 > /dev/null &
# PIDs+=$!
done
echo "Running conversion..."
wait "$PID"  # only run1's output is merged below, so only it must finish
echo "Merging with ontology..."
python3 merge.py "${1}/argo_floats_generated_${run1}_1.ttl" "$2"
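
# Usage sketch (illustrative paths; the script name is assumed):
#   ./convert_all_dacs.sh /data/rdf argo-ontology.ttl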