Commit 55dcaf91 authored by CEVAER

Fully working SMAP extraction. Tests still needed.

parent fa50f41a
@@ -12,6 +12,8 @@ from cyclobs_orm import SimpleTrack
import xarray
import pandas as pd
import numpy as np
import math
import glob
from geo_shapely import shape360
@@ -20,7 +22,17 @@ logger = logging.getLogger(os.path.basename(__file__))
logger.setLevel(logging.INFO)
def deg180(deg360):
"""
convert [0,360] to [-180,180]
"""
res = np.copy(deg360)
res[res > 180] = res[res > 180]-360
return res
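# Minimal usage sketch: deg180(np.array([10., 200., 355.])) returns array([ 10., -160., -5.]).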
def extract_date_from_filename(filename):
logger.debug(f"Filename: {filename}")
date_part = "-".join(filename.split("_")[4:7])
return datetime.datetime.strptime(date_part, "%Y-%m-%d")
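# Example with a daily RSS SMAP filename from this repository's test list:
# "RSS_smap_wind_daily_2020_07_23_v01.0.nc".split("_")[4:7] -> ["2020", "07", "23"],
# so the parsed date is datetime.datetime(2020, 7, 23, 0, 0).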
@@ -56,6 +68,7 @@ def get_colocated_track_points(dataset, track_points, file_date, deg_delta=3, ti
# If this is the first node processed, or the previous nodes had NaN times, or the previous nodes'
# offset times are greater than this one
if node == 0 or "time_offset" not in track_point or track_point["time_offset"] > time_offset:
logger.debug(f"Minute : {df['minute'][node]}")
track_point["time_offset"] = time_offset
track_point["node"] = node
track_point["lon_index"] = lonIndex
@@ -77,12 +90,105 @@ def get_colocated_track_points(dataset, track_points, file_date, deg_delta=3, ti
return kept_track_points
def extract_write_cyclone_data(dataset, kept_track_points, filename, output_path, extract_size=30):
# Extracting min and max time
def extract_start_stop_measure(dataset):
df = dataset.to_dataframe()
min_time = df["minute"].min(skipna=True)
max_time = df["minute"].max(skipna=True)
return min_time, max_time
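# Assumed behaviour sketch: with xarray decoding "minute" as timedelta64, the returned values are time
# offsets from the start of the file's day, e.g. (Timedelta("14:05:00"), Timedelta("14:27:00")).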
# Set NaNs in the "minute" and "wind" fields of the dataset wherever "minute" falls outside the
# +-time_offset window relative to the cyclone track point date
def set_nan_outside_time_offset(dataset, track_data, time_offset=30):
track_timedelta = datetime.timedelta(hours=track_data["date"].hour, minutes=track_data["date"].minute)
dataset["minute"] = dataset["minute"].where((dataset["minute"].data <
np.timedelta64(int((track_timedelta + datetime.timedelta(
minutes=time_offset)).total_seconds()), 's')) &
(dataset["minute"].data >
np.timedelta64(int((track_timedelta - datetime.timedelta(
minutes=time_offset)).total_seconds()), 's')))
# Apply "minute" created NaNs on "wind" field
dataset["wind"].data[np.isnan(dataset["minute"].data)] = np.nan
return dataset
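# Worked example (assumed track point): for a track point dated 2020-07-23 14:30 UTC and time_offset=30,
# only cells whose "minute" value lies strictly between 14:00 and 15:00 keep their "minute" and "wind"
# values; every other cell is set to NaN.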
def extract_write_cyclone_data(dataset, kept_track_points, filename, output_path, file_date, extract_size_km=2000):
for sid, track_point in kept_track_points.items():
sel = dataset.isel(lon=slice(track_point["lon_index"] - extract_size, track_point["lon_index"] + extract_size),
lat=slice(track_point["lat_index"] - extract_size, track_point["lat_index"] + extract_size),
# Km per deg in latitude
km_per_deg_lat = 111
# Degrees offset per latitude index in data array
deg_per_index_lat = abs(dataset["lat"][0] - dataset["lat"][1])
# Km offset per latitude index in data array
km_per_idx_lat = km_per_deg_lat * deg_per_index_lat
logger.debug(f"km_per_idx_lat: {km_per_idx_lat}")
# Number of latitude indices to take on each side of the track point to cover extract_size_km of data
nb_idx_lat = round(float(extract_size_km / km_per_idx_lat) / 2)
logger.debug(f"nb_idx_lat: {nb_idx_lat}")
sel_lat = dataset.isel(lat=slice(track_point["lat_index"] - nb_idx_lat,
track_point["lat_index"] + nb_idx_lat),
node=track_point["node"])
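# Worked example (assuming the 0.25-degree SMAP grid): km_per_idx_lat ~ 111 * 0.25 = 27.75 km, so
# nb_idx_lat = round((2000 / 27.75) / 2) = 36 indices on each side of the track point (~2000 km in total).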
# In the next section, the goal is to compute how many longitude indices we must take to cover a data section of
# 'extract_size_km'. As the km offset between longitude degrees depends on the latitude, we first compute
# the average latitude of the currently selected data and compute km_per_deg at that latitude.
# Latitude average of the current area
lat_av = (sel_lat["lat"].max() + sel_lat["lat"].min()) / 2
logger.debug(f"lat_av: {lat_av}")
# Earth circumference
earth_circum = 40000
# Kilometer per longitude deg at current latitude
km_per_deg = abs(earth_circum * math.cos(math.radians(lat_av)) / 360)
logger.debug(f"km_per_deg: {km_per_deg}")
# Deg offset per index in data
deg_per_index = abs(sel_lat["lon"][0] - sel_lat["lon"][1])
# Km offset per index in data
km_per_idx = km_per_deg * deg_per_index
logger.debug(f"km_per_idx: {km_per_idx}")
# Number of longitude indices to take to cover extract_size_km of data
nb_idx_lon = float(extract_size_km / km_per_idx)
nb_idx_lon_2 = round(nb_idx_lon / 2)
logger.debug(f"nb_idx_lon_2 : {nb_idx_lon_2}")
# Selecting area of interest
sel = dataset.isel(lon=slice(track_point["lon_index"] - nb_idx_lon_2, track_point["lon_index"] + nb_idx_lon_2),
lat=slice(track_point["lat_index"] - nb_idx_lat, track_point["lat_index"] + nb_idx_lat),
node=track_point["node"])
sel.to_netcdf(os.path.join(output_path, filename))
# Removing data that is on a different swath than the one containing the cyclone
sel = set_nan_outside_time_offset(sel, track_point)
# Extracting min and max time to set attributes on NetCDF
min_time, max_time = extract_start_stop_measure(sel)
# print("jlupiku", df[df["minute"] == min_time], df[df["minute"] == max_time])
# print(sel["minute"].data.tolist())
if max_time - min_time > datetime.timedelta(minutes=30):
logger.error(f"Time offset between min_time ({min_time}) and max_time ({max_time}) exceeds 30 min.")
sys.exit(1)
logger.debug(f"min_time: {min_time}, max_time: {max_time}")
sel.attrs["measurement_start_date"] = str(file_date + min_time)
sel.attrs["measurement_stop_date"] = str(file_date + max_time)
# Removing useless attributes
if "aprrox_local_equatorial_crossing_time" in sel.attrs:
del sel.attrs["aprrox_local_equatorial_crossing_time"]
if "swath_sector" in sel.attrs:
del sel.attrs["swath_sector"]
sel = sel.drop_vars("minute")
sel = sel.rename_vars({"wind": "wind_speed"})
# Converting lon from 0;360 range to -180;180
sel["lon"] = deg180(sel["lon"])
output_filename = f"RSS_smap_wind_{track_point['date'].strftime('%Y_%m_%d_%H_%M_%S')}_{track_point['sid']}" \
f"_v01.0.nc"
sel.to_netcdf(os.path.join(output_path, output_filename))
def process_smap_file(session, file, output_path):
@@ -102,7 +208,7 @@ def process_smap_file(session, file, output_path):
kept_track_points = get_colocated_track_points(dataset, track_points, file_date)
logger.info(f"For file {filename} Kept track that will be used to extract SMAP data: {kept_track_points}")
extract_write_cyclone_data(dataset, kept_track_points, filename, output_path)
extract_write_cyclone_data(dataset, kept_track_points, filename, output_path, file_date)
if __name__ == "__main__":
@@ -114,6 +220,8 @@ if __name__ == "__main__":
parser.add_argument("--dbd", action="store", type=str, required=True,
help='database (postgresql://user:pass@host:5432/base_name)')
parser.add_argument("--debug", action="store_true", default=False, help="Run in debug mode (verbose output)")
parser.add_argument("-i", "--input", action="store", type=str, required=True, nargs="+",
help="Input files from which extract cyclone data")
parser.add_argument("-o", "--output", action="store", type=str, default="./output_test",
help="Output path where files will be written")
args = parser.parse_args()
@@ -124,7 +232,7 @@ if __name__ == "__main__":
if args.debug:
logger.setLevel(logging.DEBUG)
engine = create_engine(args.dbd)
engine = create_engine(args.dbd, pool_size=20, max_overflow=0)
Session = sessionmaker(bind=engine)
smap_tests = [
"/home/datawork-cersat-public/provider/remss/satellite/l3/smap/smap/wind/v1.0/daily/2020/205/RSS_smap_wind_daily_2020_07_23_v01.0.nc",
@@ -135,5 +243,5 @@ if __name__ == "__main__":
"/home/datawork-cersat-public/provider/remss/satellite/l3/smap/smap/wind/v1.0/daily/2020/210/RSS_smap_wind_daily_2020_07_28_v01.0.nc",
"/home/datawork-cersat-public/provider/remss/satellite/l3/smap/smap/wind/v1.0/daily/2020/211/RSS_smap_wind_daily_2020_07_29_v01.0.nc",
]
for f in smap_tests:
for f in args.input:
process_smap_file(Session(), f, args.output)
#!/bin/bash
while getopts ":d:" o; do
case "${o}" in
d)
days=${OPTARG} ;;
* ) echo "unknown opt -${OPTARG}" >&2 ; exit 1 ;;
esac
done
# Activate our conda environment
if [ -z "$CONDA_PREFIX" ] ; then
conda_dir=$(readlink -f $script_dir/..)
. "$conda_dir/bin/activate $conda_dir"
fi
# Get DATABASE_URL if not defined
if [ -z "$DATABASE_URL" ] ; then
source "$CONDA_PREFIX/etc/cyclobs_db.conf"
fi
echo "Database URL : $DATABASE_URL"
smap_path=/home/datawork-cersat-public/provider/remss/satellite/l3/smap/smap/wind/v1.0/daily
shopt -s globstar
if [[ -n $days ]]; then
echo "Updating last $days days"
# Converting the date "$days days ago" to seconds since UNIX-time
sec=$(date --date="$days days ago" +%s)
files_to_process=""
# Keeping only files that are in the requested time interval
for smap_file in "$smap_path"/**/*.nc; do # Whitespace-safe and recursive
secDt=$(echo "$smap_file" | awk -F_ '{print $5"-"$6"-"$7}' | xargs -IDT date -d DT +%s)
if (( $secDt > $sec )); then
files_to_process="$files_to_process"$'\n'"$smap_file"
fi
done
else
echo "Updating whole SMAP archive"
files_to_process=$(find $smap_path -name "*.nc")
fi
# Removing empty lines
files_to_process=$(echo "$files_to_process" | sed -r '/^\s*$/d')
# Generating cyclone SMAP files
printf "%s\n $files_to_process" | xargs -r -P 10 -l50 extractSmapCyclone.py --dbd "$DATABASE_URL" -i