Commit 9bcf746c authored by CEVAER


Generalized the processing to also handle SMOS files. Added report .status and .log files to be able to see errors.
parent 174d233d
@@ -5,30 +5,12 @@ import logging
import datetime
import os
import sys
from sqlalchemy import create_engine, and_, func, cast
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from geoalchemy2 import Geometry
from cyclobs_orm import SimpleTrack
import xarray
import pandas as pd
import geopandas as gpd
import numpy as np
import math
from geo_shapely import shape360
from shapely.wkt import dumps
logging.basicConfig()
#logging.basicConfig()
logger = logging.getLogger(os.path.basename(__file__))
logger.setLevel(logging.INFO)
def deg180(deg360):
"""
convert [0,360] to [-180,180]
"""
res = np.copy(deg360)
res[res > 180] = res[res > 180] - 360
return res
#logger.setLevel(logging.INFO)
def extract_date_from_filename(filename):
@@ -37,196 +19,6 @@ def extract_date_from_filename(filename):
return datetime.datetime.strptime(date_part, "%Y-%m-%d")
def get_track_points_from_database(session, smap_Date):
req_tracks = session.query(SimpleTrack, func.ST_X(cast(SimpleTrack.geom, Geometry)).label("lon"),
func.ST_Y(cast(SimpleTrack.geom, Geometry)).label("lat")) \
.filter(and_(SimpleTrack.date >= smap_Date,
SimpleTrack.date < smap_Date + datetime.timedelta(days=1))).group_by(SimpleTrack).all()
return req_tracks
def get_colocated_track_points(dataset, track_points, file_date, deg_delta=3, time_delta=60):
track_point_offsets = []
for track_point in track_points:
logger.debug(f"Track point : {track_point}")
sel = dataset.sel(lon=track_point["lon"], lat=track_point["lat"], method="nearest")
lonIndex = np.where(dataset["lon"].data == sel["lon"].data)[0][0]
latIndex = np.where(dataset["lat"].data == sel["lat"].data)[0][0]
logger.debug(f"Point find : lon: {sel['lon'].values}, lat: {sel['lat'].values}, minute: {sel['minute'].values},"
f" lon_index: {lonIndex}, lat_index {latIndex}")
df = sel.to_dataframe()
# Storing the time offset between the data point and the track point
# The time offset is stored with all the track point data to ease later processing
for node in df.index:
if not pd.isna(df["minute"][node]):
point_date = file_date + df["minute"][node]
time_offset = abs(track_point["date"] - point_date)
logger.debug(f" Data point date: {point_date}, Track point date: {track_point['date']},"
f" Time offset: {time_offset}")
# If this is the first node processed, OR the previous nodes had NaN times, OR the previous nodes'
# time offsets are greater than this one
if node == 0 or "time_offset" not in track_point or track_point["time_offset"] > time_offset:
logger.debug(f"Minute : {df['minute'][node]}")
track_point["time_offset"] = time_offset
track_point["node"] = node
track_point["lon_index"] = lonIndex
track_point["lat_index"] = latIndex
if "time_offset" in track_point:
track_point_offsets.append(track_point)
kept_track_points = {}
# Getting the best track point for each sid (storm id).
# One SMAP file can contain acquisitions of several cyclones, which is why this filtering is done per sid
for track_point in track_point_offsets:
# TODO replace timedelta with database track sampling time
if track_point["time_offset"] <= datetime.timedelta(minutes=7, seconds=30):
if track_point["sid"] not in kept_track_points:
kept_track_points[track_point["sid"]] = track_point
elif kept_track_points[track_point["sid"]]["time_offset"] > track_point["time_offset"]:
kept_track_points[track_point["sid"]] = track_point
return kept_track_points
# Extracting min and max time
def extract_start_stop_measure(dataset):
df = dataset.to_dataframe()
min_time = df["minute"].min(skipna=True)
max_time = df["minute"].max(skipna=True)
return min_time, max_time
# Set NaNs in the "minute" and "wind" fields of the dataset where the time is outside the +-time_offset window
# relative to the cyclone track point date
def set_nan_outside_time_offset(dataset, track_data, time_offset=30):
track_timedelta = datetime.timedelta(hours=track_data["date"].hour, minutes=track_data["date"].minute)
dataset["minute"] = dataset["minute"].where((dataset["minute"].data <
np.timedelta64(int((track_timedelta + datetime.timedelta(
minutes=time_offset)).total_seconds()), 's')) &
(dataset["minute"].data >
np.timedelta64(int((track_timedelta - datetime.timedelta(
minutes=time_offset)).total_seconds()), 's')))
# Apply "minute" created NaNs on "wind" field
dataset["wind"].data[np.isnan(dataset["minute"].data)] = np.nan
return dataset
# Extract footprint (not taking NaN data into account)
def compute_footprint(dataset, wind_field):
df = dataset.to_dataframe()
df = df[df[wind_field].notna()]
df = df.reset_index()
gdf = gpd.GeoDataFrame(
df, geometry=gpd.points_from_xy(df.lon, df.lat))
p = gdf.unary_union.convex_hull
return p
def extract_write_cyclone_data(dataset, kept_track_points, filename, output_path, file_date, extract_size_km=2000):
for sid, track_point in kept_track_points.items():
# Km per deg in latitude
km_per_deg_lat = 111
# Degrees offset per latitude index in data array
deg_per_index_lat = abs(dataset["lat"][0] - dataset["lat"][1])
# Km offset per latitude index in data array
km_per_idx_lat = km_per_deg_lat * deg_per_index_lat
logger.debug(f"km_per_idx_lat: {km_per_idx_lat}")
# Number of latitude indices to take on each side of the track point to cover extract_size_km of data
nb_idx_lat = round(float(extract_size_km / km_per_idx_lat) / 2)
logger.debug(f"nb_idx_lat: {nb_idx_lat}")
sel_lat = dataset.isel(lat=slice(track_point["lat_index"] - nb_idx_lat,
track_point["lat_index"] + nb_idx_lat),
node=track_point["node"])
# In the next section, the goal is to compute how many longitude indices we must take to get a data section of
# 'extract_size_km'. As the km offset between longitude degrees depends on the latitude, we first compute
# the average latitude of the currently selected data and derive km_per_deg at that latitude.
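# Illustrative example (grid spacing assumed to be 0.25 deg): at lat_av = 45 deg,
# km_per_deg ~= 40000 * cos(45 deg) / 360 ~= 78.6 km, so km_per_idx ~= 19.6 km and
# nb_idx_lon_2 = round((2000 / 19.6) / 2) ~= 51 indices on each side of the track point.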
# Latitude average of the current area
lat_av = (sel_lat["lat"].max() + sel_lat["lat"].min()) / 2
logger.debug(f"lat_av: {lat_av}")
# Earth circumference
earth_circum = 40000
# Kilometer per longitude deg at current latitude
km_per_deg = abs(earth_circum * math.cos(math.radians(lat_av)) / 360)
logger.debug(f"km_per_deg: {km_per_deg}")
# Deg offset per index in data
deg_per_index = abs(sel_lat["lon"][0] - sel_lat["lon"][1])
# Km offset per index in data
km_per_idx = km_per_deg * deg_per_index
logger.debug(f"km_per_idx: {km_per_idx}")
# Number of longitude indices to take to cover extract_size_km of data
nb_idx_lon = float(extract_size_km / km_per_idx)
nb_idx_lon_2 = round(nb_idx_lon / 2)
logger.debug(f"nb_idx_lon_2 : {nb_idx_lon_2}")
# Selecting area of interest
sel = dataset.isel(lon=slice(track_point["lon_index"] - nb_idx_lon_2, track_point["lon_index"] + nb_idx_lon_2),
lat=slice(track_point["lat_index"] - nb_idx_lat, track_point["lat_index"] + nb_idx_lat),
node=track_point["node"])
# Removing data that is on other swath than the one where the cyclone is
sel = set_nan_outside_time_offset(sel, track_point)
# Extracting min and max time to set attributes on NetCDF
min_time, max_time = extract_start_stop_measure(sel)
# print("jlupiku", df[df["minute"] == min_time], df[df["minute"] == max_time])
# print(sel["minute"].data.tolist())
if max_time - min_time > datetime.timedelta(minutes=30):
logger.error(f"Time offset between min_time ({min_time}) and max_time ({max_time}) exceeds 30 min.")
sys.exit(1)
logger.debug(f"min_time: {min_time}, max_time: {max_time}")
sel.attrs["measurement_start_date"] = str(file_date + min_time)
sel.attrs["measurement_stop_date"] = str(file_date + max_time)
# Removing useless attributes
if "aprrox_local_equatorial_crossing_time" in sel.attrs:
del sel.attrs["aprrox_local_equatorial_crossing_time"]
if "swath_sector" in sel.attrs:
del sel.attrs["swath_sector"]
sel = sel.drop_vars("minute")
sel = sel.rename_vars({"wind": "wind_speed"})
# Converting lon from 0;360 range to -180;180
sel["lon"] = deg180(sel["lon"])
footprint_polyg = compute_footprint(sel, "wind_speed")
sel.attrs["footprint"] = dumps(footprint_polyg)
output_filename = f"RSS_smap_wind_{track_point['date'].strftime('%Y_%m_%d_%H_%M_%S')}_{track_point['sid']}" \
f"_v01.0.nc"
sel.to_netcdf(os.path.join(output_path, output_filename))
def process_smap_file(session, file, output_path):
logger.debug(f"Processing {file}...")
filename = os.path.basename(file)
file_date = extract_date_from_filename(filename)
logger.debug(f"File date {file_date}")
track_points = get_track_points_from_database(session, file_date)
logger.debug(f"Number of track point found : {len(track_points)}")
track_points = [{"sid": track_point[0].sid, "lon": shape360(track_point.lon, 0)[0],
"lat": track_point.lat,
"date": track_point[0].date} for track_point in track_points]
dataset = xarray.open_dataset(file)
kept_track_points = get_colocated_track_points(dataset, track_points, file_date)
logger.info(f"For file {filename} Kept track that will be used to extract SMAP data: {kept_track_points}")
extract_write_cyclone_data(dataset, kept_track_points, filename, output_path, file_date)
if __name__ == "__main__":
description = """
Read SMAP netCDF files from a directory, extract the wind data of cyclones and save it into a new netCDF file.
@@ -244,12 +36,24 @@ if __name__ == "__main__":
if sys.gettrace():
logger.setLevel(logging.DEBUG)
logging.basicConfig(level=logging.DEBUG)
if args.debug:
logger.setLevel(logging.DEBUG)
logging.basicConfig(level=logging.DEBUG)
else:
logger.setLevel(logging.INFO)
logging.basicConfig(level=logging.INFO)
engine = create_engine(args.dbd, pool_size=20, max_overflow=0)
from smap_cyclones import process_file
engine = create_engine(args.dbd, pool_size=50, max_overflow=0)
Session = sessionmaker(bind=engine)
attrs_to_del = ["aprrox_local_equatorial_crossing_time", "swath_sector"]
for f in args.input:
process_smap_file(Session(), f, args.output)
# TODO generate filename_format depending on input filename
process_file(Session(), file=f, output_path=args.output, extract_date_func=extract_date_from_filename,
attrs_to_del=attrs_to_del, var_to_del=["minute"], wind_col="wind", time_col="minute", lat_col="lat",
lon_col="lon", pass_col="node", pass_width=1000, filename_format="RSS_smap_wind_<time>_<sid>_v01.0.nc")
#!/usr/bin/env python
import argparse
import logging
import datetime
import os
import sys
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# logger.setLevel(logging.INFO)
def extract_date_from_filename(filename):
logger.debug(f"Filename: {filename}")
date_part = filename.split("_")[4]
return datetime.datetime.strptime(date_part, "%Y%m%d")
if __name__ == "__main__":
description = """
Read SMOS netCDF files from a directory, extract the wind data of cyclones and save it into a new netCDF file.
"""
parser = argparse.ArgumentParser(description=description)
parser.add_argument("--dbd", action="store", type=str, required=True,
help='database (postgresql://user:pass@host:5432/base_name)')
parser.add_argument("--debug", action="store_true", default=False, help="Run in debug mode (verbose output)")
parser.add_argument("-i", "--input", action="store", type=str, required=True, nargs="+",
help="Input files from which extract cyclone data")
parser.add_argument("-o", "--output", action="store", type=str, default="./output_test",
help="Output path where files will be written")
args = parser.parse_args()
if sys.gettrace():
logger.setLevel(logging.DEBUG)
logging.basicConfig(level=logging.DEBUG)
if args.debug:
logger.setLevel(logging.DEBUG)
logging.basicConfig(level=logging.DEBUG)
else:
logger.setLevel(logging.INFO)
logging.basicConfig(level=logging.INFO)
engine = create_engine(args.dbd, pool_size=50, max_overflow=0)
Session = sessionmaker(bind=engine)
attrs_to_del = ["aprrox_local_equatorial_crossing_time", "swath_sector"]
from smap_cyclones import process_file
for f in args.input:
# TODO generate filename_format depending on input filename
process_file(Session(), file=f, output_path=args.output, extract_date_func=extract_date_from_filename,
attrs_to_del=attrs_to_del, var_to_del=["measurement_time"], wind_col="wind_speed",
time_col="measurement_time",
lat_col="lat", lon_col="lon", pass_col=None, pass_width=1200,
filename_format="SM_OPER_MIR_<time>_<sid>.nc")
#!/bin/bash
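# Wrapper invoked via xargs as: extractWrapper.sh <extract_script.py> --dbd <db_url> -o <output_path> -i <input_file>
# (see the xargs calls further down), so $1 is the extraction script, $5 the output path and $7 the input file.
# It runs the extraction, logs its output to report/<file>/Extract.log and stores its exit code in Extract.status.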
output_path=$5
file_path=$7
filename="${file_path##*/}"
filename_no_ext="${filename%.*}"
file_report_dir="$output_path/report/$filename_no_ext"
mkdir -p "$file_report_dir"
$1 "${@:2}" > "$file_report_dir/Extract.log" 2>&1
status=$?
echo $status > "$file_report_dir/Extract.status"
cat "$file_report_dir/Extract.log"
exit $status
#!/bin/bash
while getopts ":d:o:" o; do
case "${o}" in
d)
days=${OPTARG}
;;
o)
output_path=${OPTARG}
;;
GETOPT=$(getopt -n $0 -o d:o:h:: --long smos,smap,help -- "$@")
eval set -- "$GETOPT"
while true; do
case "$1" in
-d) days=$2 ; shift 2 ;;
-o) output_path=$2 ; shift 2 ;;
--smos) smos=True ; shift ;;
--smap) smap=True ; shift ;;
-- ) shift ; break ;;
* ) echo "unknown opt $1" >&2 ; exit 1 ;;
esac
done
@@ -37,31 +39,61 @@ if [ -z "$output_path" ] ; then
fi
smap_path=/home/datawork-cersat-public/provider/remss/satellite/l3/smap/smap/wind/v1.0/daily
smos_path=/home/ref-smoswind-public/data/v2.0/l3/data
if [ -z "$smos" ] && [ -z "$smap" ] ; then
echo "You must chose which archive to update with --smos or --smap"
exit 1
fi
if [ "$smap" == "True" ] && [ "$smos" == "True" ] ; then
echo "Cannot update SMOS and SMAP at the same time. Only chose --smos or --smap."
exit 1
elif [ "$smap" == "True" ] ; then
echo "Updating SMAP files."
inpath=$smap_path
elif [ "$smos" == "True" ] ; then
echo "Updating SMOS files."
inpath=$smos_path
fi
shopt -s globstar
if [[ -n $days ]]; then
echo "Updating last $days days"
# Converting the date "$days days ago" to seconds since the Unix epoch
sec=$(date --date="$days days ago" +%s)
files_to_process=""
# Keeping only files that are in the requested time interval
for smap_file in "$smap_path"/**/*.nc; do # Whitespace-safe and recursive
secDt=$(echo "$smap_file" | awk -F_ '{print $5"-"$6"-"$7}' | xargs -IDT date -d DT +%s)
# Keeping only files that are in the requested time interval for SMAP
echo "Fetching files for the last $days days. Generated files will be written in $output_path"
for file in "$inpath"/**/*.nc; do # Whitespace-safe and recursive
if [ "$smap" == "True" ] ; then
secDt=$(echo "$file" | awk -F_ '{print $5"-"$6"-"$7}' | xargs -IDT date -d DT +%s)
elif [ "$smos" == "True" ] ; then
secDt=$(echo "$file" | awk -F_ '{print $5}' | xargs -IDT date -d DT +%s)
fi
if (( $secDt > $sec )); then
files_to_process="$files_to_process"$'\n'"$smap_file"
files_to_process="$files_to_process"$'\n'"$file"
fi
done
else
echo "Updating whole SMAP archive"
files_to_process=$(find $smap_path -name "*.nc")
echo "Updating whole archive in $output_path"
files_to_process=$(find $inpath -name "*.nc")
fi
# Removing empty lines
files_to_process=$(echo "$files_to_process" | sed -r '/^\s*$/d')
#printf "%s\n $files_to_process"
# Generating cyclone SMAP files
printf "%s\n $files_to_process" | xargs -r -P 10 -l50 extractSmapCyclone.py --dbd "$DATABASE_URL" -o "$output_path" -i
# Generating cyclone files
if [ "$smap" == "True" ] ; then
echo "Generating SMAP files..."
#printf "%s\n $files_to_process" | xargs -r -P 5 extractSmapCyclone.py --dbd "$DATABASE_URL" -o "$output_path" -i
printf "%s\n $files_to_process" | xargs -r -P 5 -L 1 extractWrapper.sh extractSmapCyclone.py --dbd "$DATABASE_URL" -o "$output_path" -i
elif [ "$smos" == "True" ] ; then
echo "Generating SMOS files..."
printf "%s\n $files_to_process" | xargs -r -P 5 -L 1 extractWrapper.sh extractSmosCyclone.py --dbd "$DATABASE_URL" -o "$output_path" -i
fi
import logging
import datetime
import os
import sys
from sqlalchemy import and_, func, cast
from geoalchemy2 import Geometry
from cyclobs_orm import SimpleTrack
import xarray
import pandas as pd
import geopandas as gpd
import numpy as np
import math
from geo_shapely import shape360
from shapely.wkt import dumps
logger = logging.getLogger(__name__)
os.environ['HDF5_USE_FILE_LOCKING'] = 'FALSE'
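# Likely disabled because HDF5 file locking can fail on shared filesystems and with several concurrent
# xargs workers reading NetCDF files (assumed intent, not stated in the commit)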
def deg180(deg360):
"""
convert [0,360] to [-180,180]
"""
res = np.copy(deg360)
res[res > 180] = res[res > 180] - 360
return res
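# Example: deg180(np.array([10., 190., 350.])) -> array([ 10., -170., -10.])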
def get_track_points_from_database(session, date):
req_tracks = session.query(SimpleTrack, func.ST_X(cast(SimpleTrack.geom, Geometry)).label("lon"),
func.ST_Y(cast(SimpleTrack.geom, Geometry)).label("lat")) \
.filter(and_(SimpleTrack.date >= date,
SimpleTrack.date < date + datetime.timedelta(days=1))).group_by(SimpleTrack).all()
return req_tracks
# Selecting best pass
def select_pass(selected_set, file_date, track_point, lon_index, lat_index, time_col_name, is_full_time):
df = selected_set.to_dataframe()
# Storing the time offset between the data point and the track point
# The time offset is stored with all the track point data to ease later processing
for node in df.index:
if not pd.isna(df[time_col_name][node]):
if not is_full_time:
point_date = file_date + df[time_col_name][node]
else:
point_date = df[time_col_name][node]
time_offset = abs(track_point["date"] - point_date)
logger.debug(f" Data point date: {point_date}, Track point date: {track_point['date']},"
f" Time offset: {time_offset}")
# If this is the first node processed, OR the previous nodes had NaN times, OR the previous nodes'
# time offsets are greater than this one
if node == 0 or "time_offset" not in track_point or track_point["time_offset"] > time_offset:
logger.debug(f"Minute : {df[time_col_name][node]}")
track_point["time_offset"] = time_offset
track_point["node"] = node
track_point["lon_index"] = lon_index
track_point["lat_index"] = lat_index
def is_full_time(dataset, time_col_name):
df = dataset.to_dataframe().reset_index()
df = df.dropna(subset=[time_col_name]).reset_index()
time = df.loc[0, time_col_name]
if isinstance(time, pd.Timestamp):
return True
elif isinstance(time, pd.Timedelta):
return False
else:
logger.error(f"Error: could not detect time type on column {time_col_name}. Exiting.")
sys.exit(1)
def get_colocated_track_points(dataset, track_points, file_date, wind_col_name, is_full_time,
time_col_name, pass_col_name=None, lon_col_name="lon", lat_col_name="lat",
deg_delta=3, time_delta=60):
track_point_offsets = []
for track_point in track_points:
logger.debug(f"Track point : {track_point}")
sel = dataset.sel(**{lon_col_name: track_point["lon"], lat_col_name: track_point["lat"], "method": "nearest"})
lon_index = np.where(dataset[lon_col_name].data == sel[lon_col_name].data)[0][0]
lat_index = np.where(dataset[lat_col_name].data == sel[lat_col_name].data)[0][0]
logger.debug(f"Point find : lon: {sel[lon_col_name].values}, lat: {sel[lat_col_name].values},"
f" minute: {sel[time_col_name].values},"
f" lon_index: {lon_index}, lat_index {lat_index}")
if pass_col_name:
select_pass(sel, file_date, track_point, lon_index, lat_index, time_col_name, is_full_time)
else:
df = sel.to_dataframe().reset_index()
if not pd.isna(df.loc[0, time_col_name]):
if not is_full_time:
point_date = file_date + df.loc[0, time_col_name]
else:
point_date = df.loc[0, time_col_name]
time_offset = abs(track_point["date"] - point_date)
track_point["time_offset"] = time_offset
track_point["node"] = None
track_point["lon_index"] = lon_index
track_point["lat_index"] = lat_index
if "time_offset" in track_point:
track_point_offsets.append(track_point)
kept_track_points = {}
# Getting the best track point for each sid (storm id).
# One file can contain acquisitions of several cyclones, which is why this filtering is done per sid
for track_point in track_point_offsets:
# TODO replace timedelta with database track sampling time
if track_point["time_offset"] <= datetime.timedelta(minutes=7, seconds=30):
if track_point["sid"] not in kept_track_points:
kept_track_points[track_point["sid"]] = track_point
elif kept_track_points[track_point["sid"]]["time_offset"] > track_point["time_offset"]:
kept_track_points[track_point["sid"]] = track_point
return kept_track_points
# Set NaNs in the time and wind fields of the dataset where the time is outside the +-time_offset window
# relative to the cyclone track point date
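# e.g. with time_offset=30 and a track point dated 12:40 UTC, only cells measured between 12:10 and 13:10 are kept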
def set_nan_outside_time_offset(dataset, track_data, time_col_name, wind_col_name, is_full_time, time_offset=30):
track_timedelta = datetime.timedelta(hours=track_data["date"].hour, minutes=track_data["date"].minute)
if is_full_time:
min_time_bound = np.datetime64(track_data["date"] - datetime.timedelta(minutes=time_offset))
max_time_bound = np.datetime64(track_data["date"] + datetime.timedelta(minutes=time_offset))
else:
min_time_bound = np.timedelta64(int((track_timedelta - datetime.timedelta(
minutes=time_offset)).total_seconds()), 's')
max_time_bound = np.timedelta64(int((track_timedelta + datetime.timedelta(
minutes=time_offset)).total_seconds()), 's')
dataset[time_col_name] = dataset[time_col_name].where((dataset[time_col_name].data < max_time_bound) &
(dataset[time_col_name].data > min_time_bound))
# Apply "minute" created NaNs on "wind" field
dataset[wind_col_name].data[np.isnan(dataset[time_col_name].data)] = np.nan
return dataset
# Extract footprint (not taking NaN data into account)
def compute_footprint(dataset, wind_field):
df = dataset.to_dataframe()
df = df[df[wind_field].notna()]
df = df.reset_index()
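# Build point geometries from the remaining (lon, lat) cells; the convex hull of their union is the footprint polygon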
gdf = gpd.GeoDataFrame(
df, geometry=gpd.points_from_xy(df.lon, df.lat))
p = gdf.unary_union.convex_hull
return p
# Extracting min and max time
def extract_start_stop_measure(dataset, time_col_name):
df = dataset.to_dataframe().reset_index()
min_time = df[time_col_name].min(skipna=True)
max_time = df[time_col_name].max(skipna=True)
return min_time, max_time
def extract_write_cyclone_data(dataset, kept_track_points, filename, output_path,
file_date, attrs_to_del, var_to_del, lon_col_name,
lat_col_name, wind_col_name, time_col_name, pass_col_name,
filename_format,
is_full_time,
extract_size_km=2000):
for sid, track_point in kept_track_points.items():
# Km per deg in latitude
km_per_deg_lat = 111
# Degrees offset per latitude index in data array
deg_per_index_lat = abs(dataset[lat_col_name][0] - dataset[lat_col_name][1])
# Km offset per latitude index in data array
km_per_idx_lat = km_per_deg_lat * deg_per_index_lat
logger.debug(f"km_per_idx_lat: {km_per_idx_lat}")
# Number of latitude indices to take on each side of the track point to cover extract_size_km of data
nb_idx_lat = round(float(extract_size_km / km_per_idx_lat) / 2)
logger.debug(f"nb_idx_lat: {nb_idx_lat}")
if track_point["node"] is not None:
sel_lat = dataset.isel(**{lat_col_name: slice(track_point["lat_index"] - nb_idx_lat,
track_point["lat_index"] + nb_idx_lat),