Commit a2d9fdc2 authored by PONCELET

Add some notebook examples

parent 2df80c64
@@ -2,3 +2,22 @@
Python code repository related to the SONAR-netCDF4 format (https://github.com/ices-publications/SONAR-netCDF4) and its bathymetry extension, the XSF format
### Get Started!
Ready to contribute? Here's how to set up `pySonar-netcdf` for local development.
1. Install anaconda or miniconda on your computer.
2. Clone the `pySonar-netcdf` repo on GitLab.
3. Create your development environment with the `create_anaconda_environments.py` script in the `requirements` directory:
`cd requirements`

`python create_anaconda_environments.py -t dev`
For detailed info see https://docs.conda.io/projects/conda/en/latest/user-guide/tasks/manage-environments.html#building-identical-conda-environments
4. Activate the anaconda environment:
`conda activate sonarnetcdf_dev`
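To run the notebook examples, launch Jupyter from the activated environment, e.g. `jupyter notebook`.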
"""
Use to be able to retrieve modules and import them without install
"""
#if __name__ == '__main__' and __package__ is None:
from os import sys, path
sys.path.append(path.dirname(path.dirname(path.abspath("__file__"))))
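# A minimal usage sketch (hypothetical: it assumes the NcReader class added in
# this commit is importable as a module named `nc_reader` once sys.path is set):
#
#   from nc_reader import NcReader
#   reader = NcReader("path/to/file.xsf.nc")
#   reader.dump_groups()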
@@ -5,4 +5,5 @@ channels:
- defaults
dependencies:
- black
- jupyter
- spyder
@@ -6,4 +6,5 @@ channels:
- defaults
dependencies:
- netcdf4
- matplotlib
- python=3.7
import sys
import netCDF4 as nc
import numpy as np
import matplotlib.pyplot as plt
import traceback
# increase figure resolution so plots remain readable in the notebook
plt.rcParams["figure.dpi"] = 270
class bcolors:
"""Utility for color display in terminal or jupyter"""
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
def print_example(text: str):
print(f"{bcolors.WARNING}{text}{bcolors.ENDC}")
def pprint(msg, back_ground_color=None):
    """Intercept print calls, optionally wrapping the message in an ANSI color code"""
    if back_ground_color is None:
        print(msg)
    else:
        print(f"{back_ground_color}{msg}{bcolors.ENDC}")
def error(msg: str):
pprint(msg, back_ground_color=bcolors.FAIL)
def warning(msg: str):
pprint(msg, back_ground_color=bcolors.WARNING)
def header(msg: str):
pprint(msg, back_ground_color=bcolors.BOLD)
class NcReader:
"""
Class used for reading a Sonar-netcdf like file
It has a set of methods allowing to parse and dump the file content on a jupyter netbook
"""
def __init__(self, filename):
"""
:param filename: the file path to read
:param ignore_variable: a list of variables name that will be ignored and not display
"""
# open the file
self.file_name = filename
self.dataset = nc.Dataset(self.file_name)
def __del__(self):
# close the file
self.dataset.close()
def _is_string_variable(self, variable_path):
variable = self.dataset[variable_path]
        return variable.dtype == str
def _get_variable_data(self, variable_path, slice_index=dict()):
"""
Return the variable as a np array matrix (either 1D or 2D)
:param variable_path: path to the variable name
:param substract_tvg_offset: do we substract TVG offset trying to something close to an absolute level value for backscatter
:param variable_name: the name of the variable to read
:param index: the current swath id, for WC data it will allow to load the current swath data only
:return: a np array
"""
variable = self.dataset[variable_path]
if self._is_xsfvlen(variable):
return self._get_vlen_variable(variable_path, slice_index)
return np.array(variable).transpose()
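    # Illustrative call (the variable path is hypothetical):
    #   _get_variable_data("Sonar/Beam_group1/backscatter_r", slice_index={"beam": 0})
    # would return a 2D np array ready for plotting.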
def _get_variable(self, variable_path):
return self.dataset[variable_path]
def _is_xsfvlen(self, variable):
"""
Check if the given variable is a variable length variable (in XSF variable length definition)
"""
return variable._isvlen
def _is_ignored(self, variable_name, ignore_variable):
"""
Tell is a variable is masked variable (ancillary variable for example)
"""
return variable_name in ignore_variable
def _squeeze_shape(self, shape: tuple, dimensions: tuple):
"""
Remove dimension equals to 1 in a tuple list, if everything is equal to 1 the initia
:param shape:
:return: a tuple with 1 value removed
"""
final_shape = ()
final_dims = ()
for sh, dim in zip(shape, dimensions):
if sh != 1:
final_shape += (sh,)
final_dims += (dim,)
return final_shape, final_dims
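    # Illustrative example: shape (1, 500, 256) with dimensions
    # ('time', 'ping_time', 'beam') squeezes to ((500, 256), ('ping_time', 'beam')).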
    def _apply_scale_offset(self, values: np.ndarray, scale_factor=None, add_offset=None, missing_value=None):
        """Manually apply scale_factor/add_offset and replace missing values with NaN"""
        replace_masked_values = False
        if missing_value is not None:
            # compute the mask before scaling, while raw values still match missing_value
            mask = values == missing_value
        if scale_factor is not None:
            values = values * scale_factor
            replace_masked_values = True
        if add_offset is not None:
            values = values + add_offset
            replace_masked_values = True
        if replace_masked_values and missing_value is not None:
            values[mask] = np.nan
        return values
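    # Worked example (illustrative values): a raw sample of 1200 with
    # scale_factor=0.01 and add_offset=-50 becomes 1200 * 0.01 + (-50) = -38.0,
    # and samples equal to missing_value come out as NaN.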
def _get_vlen_variable(self, variable_path, slice_index: dict):
"""
retrieve a matrix containing a slice (matrix) of all vlen data, filled with NaN values
"""
vlen_variable = self.dataset[variable_path]
shape = vlen_variable.shape
dimensions = vlen_variable.dimensions
reduced_shape, reduced_dimensions = self._squeeze_shape(shape, dimensions)
        # with netCDF4 < 1.5.4 we need to handle scale_factor and add_offset ourselves
scale_factor = None
add_offset = None
if hasattr(vlen_variable, "scale_factor"):
scale_factor = vlen_variable.scale_factor
if hasattr(vlen_variable, "add_offset"):
            add_offset = vlen_variable.add_offset
missing_value = None
if hasattr(vlen_variable, "missing_value"):
missing_value = vlen_variable.missing_value
if hasattr(vlen_variable, "_FillValue"):
missing_value = vlen_variable._FillValue
        if len(reduced_shape) == 0:
            # data reduces to a vector
            values = vlen_variable[:]
            values = np.squeeze(values)
            values = self._apply_scale_offset(values, scale_factor, add_offset, missing_value)
            return values[:]
        if len(reduced_shape) > 1:
            # data still has a dimensionality higher than 1 (in fact 2, counting its vlen characteristic);
            # we need to reduce it, using the slice_index dictionary
            values = np.squeeze(vlen_variable[:])
            # compute the slicing tuple, taking slice_index into account
            reduced_shape_in_progress = ()
            reduced_dim_in_progress = ()
            slicing = ()
for sh, dim in zip(reduced_shape, reduced_dimensions):
                if dim in slice_index:
                    index = slice_index[dim]
                    slicing += (slice(index, index + 1),)  # select the slice specified in slice_index
                else:
                    reduced_shape_in_progress += (sh,)
                    reduced_dim_in_progress += (dim,)
                    slicing += (slice(None),)  # select all data
values = values[slicing]
values = np.squeeze(values)
reduced_shape = reduced_shape_in_progress
reduced_dimensions = reduced_dim_in_progress
else:
# values = vlen_variable[:]
values = np.empty(shape=reduced_shape, dtype=object)
for index in range(0, reduced_shape[0]):
values[index] = vlen_variable[index]
pprint(f"Vlen variable {variable_path} : reduced shape for display is {reduced_dimensions} ({reduced_shape})")
if len(reduced_shape) == 1:
            # dimension is 1 (like a ping-indexed variable); we can parse all vlen data and build a matrix from it
values = np.squeeze(values)
# retrieve max size
max_samples = 0
            for sub_array in values:  # from a performance point of view, we read the data twice
max_samples = max(max_samples, len(sub_array))
            # fill each ping with its data
matrix = np.full(
(reduced_shape[0], max_samples),
dtype="float32",
fill_value=float(np.nan),
)
for bnr in range(reduced_shape[0]):
                # Warning: auto-scaling, if set (the netCDF4 default), is applied when values are read
count = len(values[bnr])
matrix[bnr][:count] = self._apply_scale_offset(values[bnr], scale_factor, add_offset, missing_value)
return matrix.transpose()
        raise NotImplementedError(
            f"Unsupported display of vlen variable {variable_path}: reduced dimensions are too high {reduced_dimensions} ({reduced_shape})"
        )
    def _display_variable_header(self, variable_name, variable_path):
        """
        Print a variable's name, path, and netCDF metadata
        :return: None
        """
        header("\n\n")
        header(f"Variable {variable_name} : {variable_path}")
        pprint(f"{self.dataset[variable_path]}")
    def _display_variable(
        self, variable_name, variable_path, slice_index: dict, ignore_variable=(), cmap="viridis", vmin=None, vmax=None
    ) -> np.ndarray:
if cmap is None:
cmap = "viridis"
# this is the only way I found to clear the figure : clear it and recreate it
if not self._is_ignored(variable_name, ignore_variable):
if self._is_string_variable(variable_path):
# String variable, will only pprint a few data
pprint(f"Variable {variable_name} is of type string")
variable = self.dataset[variable_path]
values = variable[:]
pprint(f"Variable {variable_name} {len(values)} values (['{values[0]}',...,'{values[-1]}']")
else:
v = self._get_variable_data(variable_path, slice_index)
pprint("Variable : " + variable_path + " size =" + str(v.shape))
if len(v.shape) == 1:
if v.shape[0] > 0:
pprint("Statistics min:" + str(np.nanmin(v)) + " max:" + str(np.nanmax(v)))
plt.plot(v)
plt.show()
else:
warning("1D variable with a null size" + variable_path + " size =" + str(v.shape))
elif len(v.shape) == 2:
if v.shape[0] > 0 and v.shape[1] > 0:
pprint("statistics min:" + str(np.nanmin(v)) + " max:" + str(np.nanmax(v)))
if vmin is None:
vmin = np.nanmin(v)
if vmax is None:
vmax = np.nanmax(v)
fig, ax = plt.subplots()
im = ax.imshow(v, aspect="auto", cmap=cmap, vmin=vmin, vmax=vmax)
fig.colorbar(im)
plt.show()
else:
warning(f"Empty values dimensions = {v.shape}")
else:
error("Cannot display variable : " + variable_path + " size =" + str(v.shape))
pprint("Values =" + str(v))
return v
else:
warning("ignored variable :" + variable_name)
return None
def dump_content(
self,
root="/",
recurse_subgroup=True,
slice_index=dict(),
ignored_variable_list=[],
cmap=None,
vmax=None,
vmin=None,
):
"""
Display all group content (name, types, attributes, variables)
Variable content is displayed if numeric as 1D or 2D plots.
When dimensions of variable is higher that 2 a reduction is done on dimension equals to 1 and if not enough the slice_index parameter is used to reduce dimensions
Vlen data is filled up with invalid values along the variable length dimension in order to be displayed properly
:param root: the root path used as a starting point
:param recurse_subgroup: boolean indicating if recursion into subgroup is done
:param slice_index: parameter containing a dictionnary of index that should be use to reduce dimension if needed for example slice_index={'ping_time':3}
:param ignored_variable_list: list of variables that should be ignored
:return: None
"""
if root == "/" or root is None:
root_group = self.dataset
else:
root_group = self.dataset[root]
return self._recurse_and_display(
root_group,
slice_index=slice_index,
recurse_subgroup=recurse_subgroup,
ignored_variable_list=ignored_variable_list,
cmap=cmap,
vmin=vmin,
vmax=vmax,
)
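    # A usage sketch for dump_content (paths and values are illustrative):
    #   reader = NcReader("file.xsf.nc")
    #   reader.dump_content(root="/Sonar/Beam_group1", slice_index={"ping_time": 3}, cmap="viridis")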
    def dump_groups(self, starting_path="/"):
        """
        Recursively parse all groups and print their names
        :param starting_path: the group starting point
        :return: None
        """
        if starting_path == "/" or starting_path is None:
            # The root group is not strictly a group but a nc.Dataset (it has no name)
            self._dump_groups_and_recurse(group=None)
        else:
            self._dump_groups_and_recurse(self.dataset[starting_path])
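    # dump_groups() prints an indented tree, for example (illustrative):
    #   -+Root
    #    |+Sonar
    #     |-Beam_group1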
def _dump_groups_and_recurse(self, group=None, level=""):
"""Recurse every group and print their names"""
if group is None:
pprint("-+Root")
dataset = self.dataset
else:
if len(group.groups) > 0:
pprint(f"{level}|+{group.name}")
else:
pprint(f"{level}|-{group.name}")
dataset = group
for subgroup_name in dataset.groups:
self._dump_groups_and_recurse(dataset[subgroup_name], level=f"{level} ")
    def _print_variable(self, variable, slice_index, ignored_variable_list, cmap=None, vmin=None, vmax=None):
        v = None
        # build the path outside the try block so the error handler can reference it
        variable_path = variable._grp.path + "/" + variable.name
        try:
            self._display_variable_header(variable.name, variable_path)
            v = self._display_variable(
                variable.name,
                variable_path,
                slice_index,
                ignore_variable=ignored_variable_list,
                cmap=cmap,
                vmin=vmin,
                vmax=vmax,
            )
        except Exception:
            error("Error for variable: " + variable_path)
            error(f"Unexpected error: {sys.exc_info()}")
            traceback.print_exc()
        return v
def _recurse_and_display(
self, dataset, slice_index, recurse_subgroup=True, ignored_variable_list=[], cmap=None, vmin=None, vmax=None
):
last_variable = None
if isinstance(dataset, nc.Variable):
return self._print_variable(dataset, slice_index, ignored_variable_list, cmap=cmap, vmin=vmin, vmax=vmax)
        if dataset.parent is not None:
            header(f"Group {dataset.name} ({dataset.path})")
        else:
            # the root dataset has no name
            header(f"Root Group {dataset.path}")
        pprint(f"{dataset}")
        for variable in sorted(dataset.variables):
            last_variable = self._print_variable(
                dataset[variable], slice_index, ignored_variable_list, cmap=cmap, vmin=vmin, vmax=vmax
            )
        if recurse_subgroup:
            for subgroup_name in dataset.groups:
                last_variable = self._recurse_and_display(
                    dataset.groups[subgroup_name],
                    slice_index,
                    recurse_subgroup=recurse_subgroup,
                    ignored_variable_list=ignored_variable_list,
                    cmap=cmap,
                    vmin=vmin,
                    vmax=vmax,
                )
        return last_variable
# when run as a standalone app, for debug only
if __name__ == "__main__":
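    # sample files used during development; only the last assignment takes effect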
file_path = "D:/data/file/XSF/Movies/Sardine_schools_1.xsf.nc"
file_path = "D:/XSF/0006_20200504_111056_FG_EM122.xsf.nc"
file_path = "D:/data/file/XSF/ExampleSonarData/test90-D20171107-T195133.nc"
file_path = "C:/data/datasets/ADCP/HYDROMOMAR-D20200904-T093759.nc"
root = "/"
reader = NcReader(file_path)
reader.dump_content(
root="Sonar/Beam_group1/ADCP/Mean_current/current_velocity_geographical_down",
cmap="inferno",
vmin=-0.5,
vmax=0.5,
)
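    # The available groups can also be listed first to find a path of interest:
    # reader.dump_groups()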