Commit 81c63095 authored by CEVAER's avatar CEVAER

Added comments and a bit of cleaning.

parent 31bef3ad
......@@ -26,7 +26,7 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(description=description)
parser.add_argument("--dbd", action="store", type=str, required=True,
help='database (postgresql://user:pass@host:5432/base_name)')
help='database (postgresql://user:pass@host:5432/base_name). NOT USED, TO REMOVE')
parser.add_argument("--debug", action="store_true", default=False, help="Run in debug mode (verbose output)")
parser.add_argument("-i", "--input", action="store", type=str, required=True, nargs="+",
help="Input files from which extract cyclone data")
......@@ -47,13 +47,11 @@ if __name__ == "__main__":
from extract_cyclones import process_file
engine = create_engine(args.dbd, pool_size=50, max_overflow=0)
Session = sessionmaker(bind=engine)
#attrs_to_del = ["aprrox_local_equatorial_crossing_time", "swath_sector", "first_orbit", "last_orbit"]
#var_attr_edit = {"lon": {"valid_min": -180, "valid_max": 180}}
# Attributes to add to the SMAP NetCDF
attrs = {"Conventions": "CF-1.6",
"title": "SMAP ocean surface wind speed (10m above surface) cropped around Tropical Cyclone track",
"institution": "IFREMER/LOPS",
......@@ -70,7 +68,7 @@ if __name__ == "__main__":
for f in args.input:
# TODO generate filename_format depending on input filename
process_file(Session(), file=f, output_path=args.output, extract_date_func=extract_date_from_filename,
process_file(file=f, output_path=args.output, extract_date_func=extract_date_from_filename,
var_to_del=["minute"],
attrs=attrs,
attrs_to_del=attrs_to_del,
......
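The diff does not show how these attribute parameters are consumed downstream; a minimal sketch of how attrs, attrs_to_del and attrs_rename could be applied to an xarray dataset (assumed behavior, not taken from this repository; attrs_to_del treated here as a list of global attribute names, matching the commented-out list above):

import xarray as xr

def apply_attr_edits(dataset: xr.Dataset, attrs, attrs_to_del, attrs_rename):
    # Drop unwanted global attributes (ignore missing ones).
    for attr in attrs_to_del:
        dataset.attrs.pop(attr, None)
    # Rename global attributes, keeping their values.
    for old_name, new_name in attrs_rename.items():
        if old_name in dataset.attrs:
            dataset.attrs[new_name] = dataset.attrs.pop(old_name)
    # Set/overwrite the attributes defined for the output file.
    dataset.attrs.update(attrs)
    return dataset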
......@@ -58,7 +58,7 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(description=description)
parser.add_argument("--dbd", action="store", type=str, required=True,
help='database (postgresql://user:pass@host:5432/base_name)')
help='database (postgresql://user:pass@host:5432/base_name). NOT USED, TO REMOVE')
parser.add_argument("--debug", action="store_true", default=False, help="Run in debug mode (verbose output)")
parser.add_argument("-i", "--input", action="store", type=str, required=True, nargs="+",
help="Input files from which extract cyclone data")
......@@ -77,9 +77,6 @@ if __name__ == "__main__":
logger.setLevel(logging.INFO)
logging.basicConfig(level=logging.INFO)
engine = create_engine(args.dbd, pool_size=50, max_overflow=0)
Session = sessionmaker(bind=engine)
# Attributes that'll be deleted
# attrs_to_del = ["aprrox_local_equatorial_crossing_time", "swath_sector", "geospatial_bounds",
# "geospatial_lat_min", "geospatial_lat_max", "geospatial_lon_min", "geospatial_lon_max"]
......@@ -101,7 +98,7 @@ if __name__ == "__main__":
for f in args.input:
# TODO generate filename_format depending on input filename
process_file(Session(), file=f, output_path=args.output, extract_date_func=extract_date_from_filename,
process_file(file=f, output_path=args.output, extract_date_func=extract_date_from_filename,
var_to_del=["measurement_time", "time"], wind_col="wind_speed", attrs_to_del=attrs_to_del,
attrs=attrs,
attrs_rename=attrs_rename,
......
......@@ -35,7 +35,7 @@ def deg360(deg180):
lon = np.array(deg180) % 360
return lon
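# Quick worked example (sketch, not part of the commit): Python/NumPy modulo maps
# western longitudes into the 0-360 range, e.g. deg360(-90.0) -> 270.0 (as a
# NumPy array) and deg360(45.0) -> 45.0, since -90 % 360 == 270.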
# Not used
def get_track_points_from_database(session, date):
req_tracks = session.query(SimpleTrack, func.ST_X(cast(SimpleTrack.geom, Geometry)).label("lon"),
func.ST_Y(cast(SimpleTrack.geom, Geometry)).label("lat")) \
......@@ -45,6 +45,20 @@ def get_track_points_from_database(session, date):
def get_track_points_from_api(date):
"""
Request cyclobs API for track points between date and date + 2 days.
Parameters
----------
date : datetime The date used to filter the query.
Returns
-------
pandas.DataFrame
DataFrame containing the API result. Can be empty.
"""
min_date = date.strftime("%Y-%m-%d")
max_date = (date + datetime.timedelta(days=2)).strftime("%Y-%m-%d")
req = f"https://cyclobs.ifremer.fr/app/api/track?min_date={min_date}&max_date={max_date}"
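The rest of the function is not shown in this hunk; a minimal sketch of how the request result could be turned into the documented pandas.DataFrame (assuming the endpoint returns CSV, which may not match the actual implementation):

import pandas as pd

df = pd.read_csv(req)  # 'req' is the URL built above; the result can be empty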
......@@ -99,11 +113,14 @@ def is_full_time(dataset, time_col_name):
def get_colocated_track_points(dataset, track_points, file_date, wind_col_name, is_full_time,
time_col_name, pass_col_name=None, lon_col_name="lon", lat_col_name="lat",
deg_delta=3, time_delta=60):
track_point_offsets = []
for index, track_point in track_points.iterrows():
logger.debug(f"Track point : {track_point}")
# Select the point in the dataset that is geographically nearest to the current track point.
sel = dataset.sel(**{lon_col_name: track_point["lon"], lat_col_name: track_point["lat"], "method": "nearest"})
# Find the indices of that nearest pixel in the lon and lat arrays.
lon_index = np.where(dataset[lon_col_name].data == sel[lon_col_name].data)[0][0]
lat_index = np.where(dataset[lat_col_name].data == sel[lat_col_name].data)[0][0]
logger.debug(f"Point find : lon: {sel[lon_col_name].values}, lat: {sel[lat_col_name].values},"
......@@ -113,6 +130,8 @@ def get_colocated_track_points(dataset, track_points, file_date, wind_col_name,
trk_ptn = track_point.to_dict()
if pass_col_name:
# If we are processing a file that has several passes, we select the "best" pass: the pass
# in which the selected pixel is temporally closest to the track point date.
select_pass(sel, file_date, trk_ptn, lon_index, lat_index, time_col_name, is_full_time)
else:
df = sel.to_dataframe().reset_index()
......@@ -121,6 +140,8 @@ def get_colocated_track_points(dataset, track_points, file_date, wind_col_name,
point_date = file_date + df.loc[0, time_col_name]
else:
point_date = df.loc[0, time_col_name]
# Compute and store the time offset between the track point and the pixel's acquisition time.
# This offset is later used to decide whether that pixel is used as a reference point to extract the data.
time_offset = abs(track_point["date"] - point_date)
trk_ptn["time_offset"] = time_offset
trk_ptn["node"] = None
......@@ -130,8 +151,10 @@ def get_colocated_track_points(dataset, track_points, file_date, wind_col_name,
track_point_offsets.append(trk_ptn)
kept_track_points = {}
# Getting the best track point for each sid (storm id).
# Getting the *best* track point for each sid (storm id).
# One file can contain acquisitions of several cyclones, which is why this filtering is done per sid.
# The kept track points are those whose temporal distance to the associated pixel is below 7 min 30 s;
# if several track points match that criterion, the temporally closest one is kept.
for track_point in track_point_offsets:
# TODO replace timedelta with database track sampling time
if track_point["time_offset"] <= datetime.timedelta(minutes=7, seconds=30):
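Condensed, the per-sid filtering described by these comments amounts to something like this sketch (assumes each entry in track_point_offsets carries "sid" and "time_offset" keys, as suggested above):

import datetime

kept_track_points = {}
max_offset = datetime.timedelta(minutes=7, seconds=30)
for tp in track_point_offsets:
    if tp["time_offset"] <= max_offset:
        sid = tp["sid"]
        # Keep only the temporally closest track point per storm id.
        if sid not in kept_track_points or tp["time_offset"] < kept_track_points[sid]["time_offset"]:
            kept_track_points[sid] = tp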
......@@ -201,6 +224,7 @@ def extract_start_stop_measure(dataset, time_col_name):
return min_time, max_time
def extract_write_cyclone_data(dataset, kept_track_points, filename, output_path,
file_date, source_filename, attrs, attrs_to_del, attrs_rename,
var_to_del, lon_col_name,
......@@ -392,17 +416,54 @@ def write_ncdf(dataset, output_path, filename_format, start_date, stop_date, yea
dataset.to_netcdf(os.path.join(out, output_filename), format="NETCDF4_CLASSIC")
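# Sketch of how the <start_date>, <stop_date> and <sid> placeholders in
# filename_format are presumably substituted (the real substitution code is not
# shown in this diff; date format and sid value below are hypothetical):
import datetime
fmt = "RSS_smap_wind_<start_date>_<stop_date>_<sid>.nc"
start, stop, sid = datetime.datetime(2020, 1, 1, 6), datetime.datetime(2020, 1, 1, 7), "al012020"
output_filename = (fmt.replace("<start_date>", start.strftime("%Y%m%dT%H%M%S"))
                      .replace("<stop_date>", stop.strftime("%Y%m%dT%H%M%S"))
                      .replace("<sid>", sid))
# -> "RSS_smap_wind_20200101T060000_20200101T070000_al012020.nc"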
def process_file(session, file, output_path, extract_date_func, attrs, var_to_del, wind_col,
def process_file(file, output_path, extract_date_func, attrs, var_to_del, wind_col,
time_col, lat_col, lon_col, filename_format, pass_width, pass_col=None, attrs_to_del=[],
attrs_rename={},
var_attr_edit={},
specific_func=None):
"""
Extract a portion of data (corresponding to a cyclone) from a file and write it to a NetCDF file.

Track points relevant to the given file are retrieved from the CyclObs API; it is then checked whether
one or several of these track points are actually colocated with data points in the given file.
If so, a portion of data around each selected point is extracted from the file. These portions
correspond to the areas of interest where a cyclone has been captured, and the extracted data is
saved as a well-structured NetCDF file.
Parameters
----------
file : str
    Path to the SMOS or SMAP file from which to extract the cyclone data.
output_path : str
    Path in which to save the NetCDF file.
extract_date_func : function
    Function used to extract the date, taking os.path.basename(file) as parameter.
attrs : dict
    Attributes that will be set in the output NetCDF file.
var_to_del : list of str
    Variables that will not be included in the resulting NetCDF file.
wind_col : str
    Name of the wind speed variable in the given file.
time_col : str
    Name of the time variable in the given file.
lat_col : str
    Name of the latitude variable in the given file.
lon_col : str
    Name of the longitude variable in the given file.
filename_format : str
    Format used to name the resulting NetCDF file. Example: RSS_smap_wind_<start_date>_<stop_date>_<sid>.nc
pass_width : int
    Size of the extracted portion.
pass_col : str or None
    Indicates whether the given file contains several passes. If it does (!= None), the name of
    the pass variable must be given as a string.
attrs_to_del : dict of list
    Variable attributes to exclude from the resulting NetCDF. Key: variable name; value: list of attribute names.
attrs_rename : dict of str
    Global attributes to rename. Key: attribute name to rename; value: new attribute name.
var_attr_edit : dict of dict
    Variable attribute values to edit. Format: {"var": {"attr": new_value}}
specific_func : function
    A function taking an xarray dataset as parameter and returning an xarray dataset; it can
    perform any operation on the data.

Returns
-------
None
"""
logger.info(f"Processing {file}...")
filename = os.path.basename(file)
file_date = extract_date_func(filename)
logger.debug(f"File date {file_date}")
# Retrieve track points near that date from CyclObs API
track_points = get_track_points_from_api(file_date)
logger.debug(f"Number of track point found : {len(track_points)}")
......@@ -411,6 +472,7 @@ def process_file(session, file, output_path, extract_date_func, attrs, var_to_de
logger.warning("No track points, skipping this file.")
return
# Transform lon from [-180, 180] to [0, 360]
track_points["lon"] = track_points["lon"].apply(lambda x: shape360(x, 0)[0])
dataset = xarray.open_dataset(file)
......
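For illustration, a hypothetical call to the new session-less process_file signature, with made-up paths and values (the real call sites are shown in the hunks above):

process_file(file="/data/smap/RSS_smap_wind_daily_2020_08_20_v01.0.nc",
             output_path="/data/output",
             extract_date_func=extract_date_from_filename,
             attrs=attrs,
             var_to_del=["minute"],
             wind_col="wind",
             time_col="minute",
             lat_col="lat",
             lon_col="lon",
             filename_format="RSS_smap_wind_<start_date>_<stop_date>_<sid>.nc",
             pass_width=10,
             pass_col="node",
             attrs_to_del=attrs_to_del)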