Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,16 @@ def export_model_outputs(data_type: str, config: OmegaConf, **kwargs) -> None:
fsteps = get_fsteps(fsteps, fname_zarr)
samples = get_samples(samples, fname_zarr)
streams = get_streams(stream, fname_zarr)
_logger.info(f"Streams to process: {streams}")
for stream in streams:
grid_type = get_grid_type(data_type, stream, fname_zarr)
channels = get_channels(channels, stream, fname_zarr)
stream_channels = get_channels(channels, stream, fname_zarr)
source_starts, source_ends = get_source_info(fname_zarr, stream, samples)
# prevent overwritting of channels between streams
kwargs["grid_type"] = grid_type
kwargs["channels"] = channels
kwargs["channels"] = stream_channels
kwargs["data_type"] = data_type
kwargs["stream"] = stream

parser = CfParserFactory.get_parser(config=config, **kwargs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def parse_args(args: list) -> argparse.Namespace:
parser.add_argument(
"--stream",
type=str,
choices=["ERA5", "CERRA", "MEPS", "NORA3", "IMERG_ANEMOI"],
choices=["ERA5", "ERA5pl", "ERA5ml", "CERRA", "MEPS", "NORA3", "IMERG_ANEMOI"],
help="Stream name to retrieve data for",
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def process_sample(
da_fs = []

for result in fstep_iterator_results:
if result is None:
if result is None or (isinstance(result, xr.DataArray) and result.size == 0):
continue

# result is already a materialized xarray DataArray (built in the worker).
Expand Down Expand Up @@ -139,19 +139,22 @@ def reshape(self, data: xr.DataArray) -> xr.Dataset:
# Original logic
var_dict = find_pl(data.channel.values)
data_vars = {}

#order of appending upoints should be ipoint, pressure_level, mem (if mem exists)
for new_var, pls in var_dict.items():
data_dims = ["ipoint"]
if "mem" in data.dims:
data_dims.append("mem")
if pls[0] is not None:
data_dims.append("pressure_level")
if "mem" in data.dims:
data_dims.append("mem")
old_vars = [f"{new_var}_{p}" for p in pls]
data_vars[new_var] = xr.DataArray(
data.sel(channel=old_vars).values,
dims=[*data_dims, "pressure_level"],
dims=data_dims,
coords={"pressure_level": pls},
)
else:
if "mem" in data.dims:
data_dims.append("mem")
data_vars[new_var] = xr.DataArray(
data.sel(channel=new_var).values,
dims=data_dims,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def process_sample(
ref_time: np.datetime64,
source_interval_start: np.datetime64 = None,
source_interval_end: np.datetime64 = None,
**kwargs
):
"""
Process results from get_data_worker: reshape, concatenate, add metadata, and save.
Expand Down
Loading