Commit a621f741 authored by CEVAER

Setting start_date and stop_date in file name and as global attributes. Added sourceProductVersion attribute.
parent 0e505d3b
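
For reference, a minimal sketch of the attribute-renaming mechanic this commit adds in set_metadata() below (the attrs/attrs_rename dicts match the diff; the source attributes are hypothetical):

    # Values are copied out of the source dataset's attrs under a new name
    # before all original attributes are cleared.
    attrs = {"missionName": "SMOS"}
    attrs_rename = {"product_version": "sourceProductVersion"}
    source_attrs = {"product_version": "3.1", "authority": "ESA"}  # hypothetical input
    for old_name, new_name in attrs_rename.items():
        if old_name in source_attrs:
            attrs[new_name] = source_attrs[old_name]
    # attrs == {"missionName": "SMOS", "sourceProductVersion": "3.1"}
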
@@ -66,6 +66,7 @@ if __name__ == "__main__":
}
attrs_to_del = {"wind": ["missing_value"]}
attrs_rename = {"version": "sourceProductVersion"}
for f in args.input:
# TODO generate filename_format depending on input filename
@@ -73,7 +74,8 @@ if __name__ == "__main__":
var_to_del=["minute"],
attrs=attrs,
attrs_to_del=attrs_to_del,
attrs_rename=attrs_rename,
wind_col="wind", time_col="minute",
lat_col="lat",
lon_col="lon", pass_col="node", pass_width=1000,
filename_format="RSS_smap_wind_<time>_<sid>_v01.0.nc")
filename_format="RSS_smap_wind_<start_date>_<stop_date>_<sid>.nc")
@@ -95,14 +95,16 @@ if __name__ == "__main__":
"missionName": "SMOS"
}
attrs_to_del = {"time": ["authority"], "lat": ["authority"], "lon": ["authority"], "wind_speed": ["authority"]}
attrs_rename = {"product_version": "sourceProductVersion"}
from extract_cyclones import process_file
for f in args.input:
# TODO generate filename_format depending on input filename
process_file(Session(), file=f, output_path=args.output, extract_date_func=extract_date_from_filename,
var_to_del=["measurement_time"], wind_col="wind_speed", attrs_to_del=attrs_to_del,
var_to_del=["measurement_time", "time"], wind_col="wind_speed", attrs_to_del=attrs_to_del,
attrs=attrs,
attrs_rename=attrs_rename,
time_col="measurement_time",
lat_col="lat", lon_col="lon", pass_col=None, pass_width=1200,
filename_format="SM_OPER_MIR_<time>_<sid>.nc", specfic_func=set_nan_low_quality)
filename_format="SM_OPER_MIR_<start_date>_<stop_date>_<sid>.nc", specfic_func=set_nan_low_quality)
@@ -14,6 +14,7 @@ import math
from geo_shapely import shape360
from shapely.wkt import dumps
from shapely.geometry import Polygon
from pkg_resources import get_distribution
logger = logging.getLogger(__name__)
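
The new pkg_resources import is used near the end of set_metadata() to stamp each output file with the version of the extraction package itself; a minimal sketch (the version string shown is hypothetical):

    from pkg_resources import get_distribution
    # Reads the *installed* package metadata, so the productVersion attribute
    # reflects the version of extract_cyclones that produced the file.
    __version__ = get_distribution('extract_cyclones').version  # e.g. "0.1.0"
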
@@ -168,8 +169,10 @@ def compute_footprint(dataset, wind_field):
df, geometry=gpd.points_from_xy(df.lon, df.lat))
p = gdf.unary_union.convex_hull
# p = p.simplify(5)
# Transforming the polygon so its coords are in range -180:180
p = transform_polyg_180(p)
# p = transform_polyg_180(p)
# print(p)
return p
@@ -181,9 +184,9 @@ def extract_start_stop_measure(dataset, time_col_name):
return min_time, max_time
def extract_write_cyclone_data(dataset, kept_track_points, filename, output_path,
file_date, source_filename, attrs, attrs_to_del, var_to_del, lon_col_name,
file_date, source_filename, attrs, attrs_to_del, attrs_rename,
var_to_del, lon_col_name,
lat_col_name, wind_col_name, time_col_name, pass_col_name,
filename_format,
is_full_time,
@@ -192,6 +195,7 @@ def extract_write_cyclone_data(dataset, kept_track_points, filename, output_path
specific_func=None,
min_pixel=5):
for sid, track_point in kept_track_points.items():
logger.debug(f"Track point : {track_point}")
# Km per deg in latitude
km_per_deg_lat = 111
# Degrees offset per latitude index in data array
@@ -216,6 +220,7 @@ def extract_write_cyclone_data(dataset, kept_track_points, filename, output_path
# the latitude_average of the current selected data and compute the km_per_deg at that latitude.
# Latitude average of the current area
logger.debug(f"sel_lat[lat_col_name] : {sel_lat[lat_col_name]}")
lat_av = (sel_lat[lat_col_name].max() + sel_lat[lat_col_name].min()) / 2
logger.debug(f"lat_av: {lat_av}")
# Earth circumference
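
This hunk sits in the middle of the per-degree distance computation; for context, the usual formula for kilometers per degree of longitude at a given latitude (a sketch, not the repo's exact code):

    import math
    EARTH_CIRCUMFERENCE_KM = 40075.0  # equatorial, approximate
    def km_per_deg_lon(lat_deg):
        # One degree of longitude shrinks with the cosine of the latitude.
        return math.cos(math.radians(lat_deg)) * EARTH_CIRCUMFERENCE_KM / 360.0
    # km_per_deg_lon(0.0) ~ 111.3 ; km_per_deg_lon(60.0) ~ 55.7
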
@@ -260,94 +265,119 @@ def extract_write_cyclone_data(dataset, kept_track_points, filename, output_path
logger.info(f"Data has less than {min_pixel} pixel of data. Not writing to file.")
return
# Deleting all attributes
sel.attrs.clear()
# Extracting min and max time to set attributes on NetCDF
min_time, max_time = extract_start_stop_measure(sel, time_col_name)
if max_time - min_time > datetime.timedelta(minutes=30):
logger.error(f"Time offset between min_time ({min_time}) and max_time ({max_time}) exceeds 30 min.")
sys.exit(1)
logger.debug(f"min_time: {min_time}, max_time: {max_time}")
if is_full_time:
#sel.attrs["measurement_start_date"] = min_time.strftime("%Y-%m-%d %H:%M:%S")
#sel.attrs["measurement_stop_date"] = max_time.strftime("%Y-%m-%d %H:%M:%S")
sel.attrs["measurementDate"] = (min_time + (max_time - min_time) / 2).strftime("%Y-%m-%d %H:%M:%S")
year = str(min_time.year)
doy = min_time.strftime('%j')
sel, start_date, stop_date, year, doy = set_metadata(sel, wind_col_name, time_col_name, file_date, attrs,
attrs_to_del, attrs_rename, var_to_del, var_attr_edit,
lon_col_name, source_filename, is_full_time)
write_ncdf(sel, output_path, filename_format, start_date, stop_date, year, doy, track_point["sid"])
def set_metadata(dataset, wind_col_name, time_col_name, file_date, attrs, attrs_to_del, attrs_rename,
var_to_del, var_attr_edit, lon_col_name, source_filename, is_full_time):
for attr in attrs_rename:
if attr in dataset.attrs:
attrs[attrs_rename[attr]] = dataset.attrs[attr]
# Deleting all attributes
dataset.attrs.clear()
# Extracting min and max time to set attributes on NetCDF
min_time, max_time = extract_start_stop_measure(dataset, time_col_name)
if max_time - min_time > datetime.timedelta(minutes=30):
logger.error(f"Time offset between min_time ({min_time}) and max_time ({max_time}) exceeds 30 min.")
sys.exit(1)
logger.debug(f"min_time: {min_time}, max_time: {max_time}")
# Computing acquisition start and stop dates and setting as global attribute
if is_full_time:
min_date = min_time
max_date = max_time
else:
# In the case of SMAP, the time dataArray contains only a time (hour, min, sec) and no date.
# To have a full date, we add the file date.
min_date = file_date + min_time
max_date = file_date + max_time
year = min_date.year
doy = min_date.strftime('%j')
# measure_date = min_date + (max_date - min_date) / 2
dataset.attrs["measurementStartDate"] = min_date.strftime("%Y-%m-%d %H:%M:%S")
dataset.attrs["measurementStopDate"] = max_date.strftime("%Y-%m-%d %H:%M:%S")
# Setting new attributes
for attr in attrs:
if type(attrs[attr]) == dict:
for var_attr in attrs[attr]:
# The _FillValue attribute is in the encoding dict
if var_attr == "_FillValue":
dataset[attr].encoding[var_attr] = attrs[attr][var_attr]
else:
dataset[attr].attrs[var_attr] = attrs[attr][var_attr]
else:
min_date = file_date + min_time
max_date = file_date + max_time
year = min_date.year
doy = min_date.strftime('%j')
#sel.attrs["measurement_start_date"] = str(min_date)
#sel.attrs["measurement_stop_date"] = str(file_date + max_time)
sel.attrs["measurementDate"] = (min_date + (max_date - min_date) / 2).strftime("%Y-%m-%d %H:%M:%S")
# Setting new attributes
for attr in attrs:
if type(attrs[attr]) == dict:
for var_attr in attrs[attr]:
if var_attr == "_FillValue":
sel[attr].encoding[var_attr] = attrs[attr][var_attr]
else:
sel[attr].attrs[var_attr] = attrs[attr][var_attr]
else:
sel.attrs[attr] = attrs[attr]
# Removing attributes
# FIXME potentially to remove because shadowed by the above deletion
for attr in attrs_to_del:
if type(attrs_to_del[attr]) == list:
for var_attr in attrs_to_del[attr]:
if var_attr in sel[attr].attrs:
del sel[attr].attrs[var_attr]
if var_attr in sel[attr].encoding:
del sel[attr].encoding[var_attr]
else:
if attr in sel.attrs:
del sel.attrs[attr]
dataset.attrs[attr] = attrs[attr]
# Removing attributes
# FIXME potentially to remove because shadowed by the above deletion. But for now it still has an effect because
# it deletes attributes from .encoding too.
for attr in attrs_to_del:
if type(attrs_to_del[attr]) == list:
for var_attr in attrs_to_del[attr]:
if var_attr in dataset[attr].attrs:
del dataset[attr].attrs[var_attr]
if var_attr in dataset[attr].encoding:
del dataset[attr].encoding[var_attr]
else:
if attr in dataset.attrs:
del dataset.attrs[attr]
for var in var_to_del:
if var in dataset.keys():
dataset = dataset.drop_vars(var)
new_wind_col = "wind_speed"
dataset = dataset.rename_vars({wind_col_name: new_wind_col})
for var in var_to_del:
if var in sel.keys():
sel = sel.drop_vars(var)
footprint_polyg = compute_footprint(dataset, new_wind_col)
new_wind_col = "wind_speed"
sel = sel.rename_vars({wind_col_name: new_wind_col})
# Converting lon from 0;360 range to -180;180
lon_attrs = dataset[lon_col_name].attrs
dataset[lon_col_name] = deg180(dataset[lon_col_name])
dataset[lon_col_name].attrs = lon_attrs
footprint_polyg = compute_footprint(sel, new_wind_col)
dataset.attrs["sourceProduct"] = source_filename
dataset.attrs["footprint"] = dumps(footprint_polyg)
__version__ = get_distribution('extract_cyclones').version
dataset.attrs["productVersion"] = __version__
# Converting lon from 0;360 range to -180;180
lon_attrs = sel[lon_col_name].attrs
sel[lon_col_name] = deg180(sel[lon_col_name])
sel[lon_col_name].attrs = lon_attrs
for var in var_attr_edit:
for attr in var_attr_edit[var]:
dataset[var].attrs[attr] = var_attr_edit[var][attr]
sel.attrs["sourceProduct"] = source_filename
sel.attrs["footprint"] = dumps(footprint_polyg)
return dataset, min_date, max_date, year, doy
for var in var_attr_edit:
for attr in var_attr_edit[var]:
sel[var].attrs[attr] = var_attr_edit[var][attr]
output_filename = filename_format.replace("<time>", track_point['date'].strftime('%Y_%m_%d_%H_%M_%S'))
output_filename = output_filename.replace("<sid>", track_point['sid'])
# Write the dataset to a netCDF file
def write_ncdf(dataset, output_path, filename_format, start_date, stop_date, year, doy, track_point_sid):
output_filename = filename_format.replace("<start_date>", start_date.strftime('%Y%m%dT%H%M%S'))
output_filename = output_filename.replace("<stop_date>", stop_date.strftime('%Y%m%dT%H%M%S'))
output_filename = output_filename.replace("<sid>", track_point_sid)
logger.info(f"Writing file {output_filename}")
logger.info(f"Writing file {output_filename}")
sel = sel.squeeze()
dataset = dataset.squeeze()
sel.rio.set_spatial_dims('lon', 'lat', inplace=True)
sel.rio.write_crs("epsg:4326", inplace=True)
dataset.rio.set_spatial_dims('lon', 'lat', inplace=True)
dataset.rio.write_crs("epsg:4326", inplace=True)
out = os.path.join(output_path, str(year), doy)
os.makedirs(out, exist_ok=True)
out = os.path.join(output_path, str(year), doy)
os.makedirs(out, exist_ok=True)
sel.to_netcdf(os.path.join(out, output_filename), format="NETCDF4_CLASSIC")
dataset.to_netcdf(os.path.join(out, output_filename), format="NETCDF4_CLASSIC")
def process_file(session, file, output_path, extract_date_func, attrs, var_to_del, wind_col,
time_col, lat_col, lon_col, filename_format, pass_width, pass_col=None, attrs_to_del={},
attrs_rename={},
var_attr_edit={},
specific_func=None):
logger.info(f"Processing {file}...")
@@ -376,7 +406,9 @@ def process_file(session, file, output_path, extract_date_func, attrs, var_to_de
extract_write_cyclone_data(dataset, kept_track_points, filename, output_path, file_date,
source_filename=filename, wind_col_name=wind_col,
time_col_name=time_col, lat_col_name=lat_col, lon_col_name=lon_col,
pass_col_name=pass_col, attrs=attrs, attrs_to_del=attrs_to_del, var_to_del=var_to_del,
pass_col_name=pass_col, attrs=attrs, attrs_to_del=attrs_to_del,
attrs_rename=attrs_rename,
var_to_del=var_to_del,
filename_format=filename_format,
is_full_time=full_time, extract_size_km=pass_width * 2, var_attr_edit=var_attr_edit,
specific_func=specific_func)
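
One detail worth calling out from the last hunk: before writing, set_metadata() converts longitudes from the 0..360 convention to -180..180 via deg180() while preserving the variable's attributes. deg180's implementation is not shown in this diff (it presumably lives alongside shape360 in geo_shapely); a minimal equivalent would be:

    import numpy as np
    def deg180(lon):
        # Map longitudes from the 0..360 convention to -180..180.
        return (lon + 180.0) % 360.0 - 180.0
    # deg180(np.array([0., 90., 180., 270., 359.])) -> [0., 90., -180., -90., -1.]
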