diff --git a/docs/releases.md b/docs/releases.md index 37b42a5a..5146e4d7 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -9,6 +9,11 @@ - `open_virtual_dataset` and `open_virtual_datatree` now populate `ds.encoding["source"]` with the normalized source URI, mirroring [`xarray.open_dataset`][]'s behaviour. Parsers that have already set `encoding["source"]` are left untouched. By [Tom Nicholas](https://github.com/TomNicholas). +### Bug fixes + +- The HDF parser now expresses CF `scale_factor`/`add_offset` packing as the zarr `scale_offset` + `cast_value` codecs (stripping those attributes) instead of leaving them as attributes. Files packed with different parameters therefore get different codec pipelines, so `check_same_codecs` refuses to concatenate them rather than silently keeping only the first file's parameters and corrupting decoded values for chunks sourced from other files. No concatenation code was changed. The kerchunk (zarr v2) writer reconstructs the equivalent `scale_factor`/`add_offset` attributes on write. Note that codec-based decoding (`packed / scale`) agrees with xarray's `decode_cf` (`packed * scale_factor`) only to within ~1 ULP. Closes [#1004](https://github.com/zarr-developers/VirtualiZarr/issues/1004). + By [Tom Nicholas](https://github.com/TomNicholas). + ### Documentation - Document that virtual concatenation also requires homogeneous CF encoding (`scale_factor`/`add_offset`) across files — xarray's default attribute-merging silently drops mismatched values and produces incorrectly-decoded data on read. Added a new FAQ bullet and a warning admonition under "Combining virtual datasets" in the usage docs. See [#1004](https://github.com/zarr-developers/VirtualiZarr/issues/1004). diff --git a/pyproject.toml b/pyproject.toml index ded4c679..05db7bd7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,9 @@ hdf = [ "hdf5plugin", "imagecodecs", "imagecodecs-numcodecs==2024.6.1", + # CF scale/offset packing is expressed via the zarr `scale_offset` + + # `cast_value` codecs; the latter needs its Rust backend at parse time. + "cast-value-rs", ] zarr = ["arro3-core"] diff --git a/virtualizarr/parsers/hdf/filters.py b/virtualizarr/parsers/hdf/filters.py index beabbf5a..06f010db 100644 --- a/virtualizarr/parsers/hdf/filters.py +++ b/virtualizarr/parsers/hdf/filters.py @@ -8,6 +8,7 @@ from numcodecs.abc import Codec from numcodecs.fixedscaleoffset import FixedScaleOffset from xarray.coding.variables import _choose_float_dtype +from zarr.codecs import CastValue, ScaleOffset from virtualizarr.utils import soft_import @@ -68,6 +69,13 @@ class CFCodec(TypedDict): codec: Codec +class CFCodecs(TypedDict): + target_dtype: np.dtype + codecs: list[dict] + scale: float + offset: float + + def _filter_to_codec( filter_id: str, filter_properties: Union[int, None, Tuple] = None ) -> Codec: @@ -170,6 +178,69 @@ def cfcodec_from_dataset(dataset: Dataset) -> Codec | None: return None +def cf_codecs_from_dataset(dataset: Dataset) -> CFCodecs | None: + """ + Express an h5py dataset's CF scale/offset packing as zarr v3 codecs. + + CF-packed data is stored as integers and decoded as + ``unpacked = packed * scale_factor + add_offset``. We represent that with + two zarr v3 codecs applied to the (decoded) float array: + + - ``scale_offset`` does the arithmetic. Its decode is ``x / scale + offset``, + so ``scale = 1 / scale_factor`` and ``offset = add_offset``. + - ``cast_value`` casts between the decoded float dtype and the stored + integer dtype. + + Returning these as codecs (rather than leaving ``scale_factor`` / + ``add_offset`` as attributes) means two arrays packed with different + parameters get different codec pipelines, so ``check_same_codecs`` refuses + to concatenate them instead of silently corrupting decoded values + (see https://github.com/zarr-developers/VirtualiZarr/issues/1004). + + Parameters + ---------- + dataset + An h5py dataset. + + Returns + ------- + CFCodecs or None + ``None`` when the dataset has no scale/offset packing. + """ + attributes = {attr: dataset.attrs[attr] for attr in dataset.attrs} + mapping = {} + if "scale_factor" in attributes: + try: + scale_factor = attributes["scale_factor"][0] + except IndexError: + scale_factor = attributes["scale_factor"] + mapping["scale_factor"] = float(1 / scale_factor) + else: + mapping["scale_factor"] = 1 + if "add_offset" in attributes: + try: + offset = attributes["add_offset"][0] + except IndexError: + offset = attributes["add_offset"] + mapping["add_offset"] = float(offset) + else: + mapping["add_offset"] = 0 + if mapping["scale_factor"] == 1 and mapping["add_offset"] == 0: + return None + + target_dtype = np.dtype(_choose_float_dtype(dtype=dataset.dtype, mapping=mapping)) + scale_offset = ScaleOffset( + scale=mapping["scale_factor"], offset=mapping["add_offset"] + ) + cast_value = CastValue(data_type=dataset.dtype.name) + return CFCodecs( + target_dtype=target_dtype, + codecs=[scale_offset.to_dict(), cast_value.to_dict()], + scale=mapping["scale_factor"], + offset=mapping["add_offset"], + ) + + def codecs_from_dataset(dataset: Dataset) -> List[Codec]: """ Extracts a list of numcodecs from an h5py dataset diff --git a/virtualizarr/parsers/hdf/hdf.py b/virtualizarr/parsers/hdf/hdf.py index cbbe2b74..063e0b2b 100644 --- a/virtualizarr/parsers/hdf/hdf.py +++ b/virtualizarr/parsers/hdf/hdf.py @@ -21,7 +21,7 @@ ManifestStore, ) from virtualizarr.manifests.utils import create_v3_array_metadata -from virtualizarr.parsers.hdf.filters import codecs_from_dataset +from virtualizarr.parsers.hdf.filters import cf_codecs_from_dataset, codecs_from_dataset from virtualizarr.parsers.typing import ReaderFactory from virtualizarr.parsers.utils import encode_cf_fill_value from virtualizarr.types import ChunkKey @@ -64,23 +64,28 @@ def _construct_manifest_array( attrs = _extract_attrs(dataset) dtype = dataset.dtype - # Temporarily disable use CF->Codecs - TODO re-enable in subsequent PR. - # cfcodec = cfcodec_from_dataset(dataset) - # if cfcodec: - # codecs.insert(0, cfcodec["codec"]) - # dtype = cfcodec["target_dtype"] - # attrs.pop("scale_factor", None) - # attrs.pop("add_offset", None) - # else: - # dtype = dataset.dtype + codec_configs = [zarr_codec_config_to_v3(codec.get_config()) for codec in codecs] + fill_value = dataset.fillvalue.item() + + # Express CF scale/offset packing as zarr codecs rather than attributes, so + # arrays packed with different parameters get different codec pipelines and + # cannot be silently concatenated. The scale_offset/cast_value codecs run on + # the decoded float array, so they go before the file's byte-level filters. + # See https://github.com/zarr-developers/VirtualiZarr/issues/1004. + cfcodecs = cf_codecs_from_dataset(dataset) + if cfcodecs is not None: + codec_configs = cfcodecs["codecs"] + codec_configs + dtype = cfcodecs["target_dtype"] + # The array is now logically float (decoded), so the fill value — stored + # in the packed integer domain — must be decoded to match, mirroring the + # ScaleOffset codec's decode of `packed / scale + offset`. + fill_value = fill_value / cfcodecs["scale"] + cfcodecs["offset"] + attrs.pop("scale_factor", None) + attrs.pop("add_offset", None) if "_FillValue" in attrs: encoded_cf_fill_value = encode_cf_fill_value(attrs["_FillValue"], dtype) attrs["_FillValue"] = encoded_cf_fill_value - - codec_configs = [zarr_codec_config_to_v3(codec.get_config()) for codec in codecs] - - fill_value = dataset.fillvalue.item() dims = tuple(_dataset_dims(dataset, group=group)) metadata = create_v3_array_metadata( shape=dataset.shape, diff --git a/virtualizarr/tests/test_parsers/test_hdf/test_hdf.py b/virtualizarr/tests/test_parsers/test_hdf/test_hdf.py index 9c7442c4..debd71e7 100644 --- a/virtualizarr/tests/test_parsers/test_hdf/test_hdf.py +++ b/virtualizarr/tests/test_parsers/test_hdf/test_hdf.py @@ -10,8 +10,10 @@ from virtualizarr.parsers import HDFParser from virtualizarr.tests import ( requires_hdf5plugin, + requires_icechunk, requires_imagecodecs, ) +from virtualizarr.tests.test_integration import roundtrip_as_in_memory_icechunk from virtualizarr.tests.utils import manifest_store_from_hdf_url @@ -228,3 +230,111 @@ def test_netcdf_over_https(): ): np.testing.assert_allclose(ds["z"].min().to_numpy(), -6) np.testing.assert_allclose(ds["z"].max().to_numpy(), 817) + + +def _write_packed_hdf5( + path, *, x_start, x_len, scale_factor, add_offset, fill_value=-9999 +): + """Write a small CF-packed int16 HDF5 file with the given scale/offset attrs.""" + with h5py.File(path, "w") as f: + data = np.arange(x_start, x_start + x_len, dtype="int16").reshape(x_len, 1) + d = f.create_dataset("foo", data=data, chunks=(x_len, 1)) + d.attrs["scale_factor"] = np.float64(scale_factor) + d.attrs["add_offset"] = np.float64(add_offset) + d.attrs["_FillValue"] = np.int16(fill_value) + x = f.create_dataset( + "x", data=np.arange(x_start, x_start + x_len, dtype="int32") + ) + x.make_scale("x") + d.dims[0].attach_scale(x) + y = f.create_dataset("y", data=np.arange(1, dtype="int32")) + y.make_scale("y") + d.dims[1].attach_scale(y) + + +@requires_hdf5plugin +@requires_imagecodecs +class TestConcatMismatchedCFEncoding: + """ + The HDF parser expresses CF scale_factor / add_offset packing as zarr + `scale_offset` + `cast_value` codecs rather than as attributes. Files packed + with different parameters therefore get different codec pipelines, so the + existing `check_same_codecs` refuses to concatenate them instead of silently + keeping only the first file's parameters and corrupting decoded values for + every chunk that did not come from the first file (see + https://github.com/zarr-developers/VirtualiZarr/issues/1004). + """ + + def test_concat_mismatched_scale_factor_raises(self, tmp_path, local_registry): + p1 = tmp_path / "a.nc" + p2 = tmp_path / "b.nc" + _write_packed_hdf5(p1, x_start=0, x_len=4, scale_factor=0.1, add_offset=0.0) + _write_packed_hdf5(p2, x_start=4, x_len=4, scale_factor=0.01, add_offset=0.0) + + parser = HDFParser() + with ( + open_virtual_dataset( + url=f"file://{p1}", parser=parser, registry=local_registry + ) as vds1, + open_virtual_dataset( + url=f"file://{p2}", parser=parser, registry=local_registry + ) as vds2, + ): + # the codec mismatch surfaces as different ScaleOffset codecs: + # scale = 1 / scale_factor, so 0.1 -> 10.0 and 0.01 -> 100.0 + with pytest.raises(NotImplementedError, match="ScaleOffset") as excinfo: + xr.concat([vds1, vds2], dim="x") + assert "scale=10.0" in str(excinfo.value) + assert "scale=100.0" in str(excinfo.value) + + def test_concat_mismatched_add_offset_raises(self, tmp_path, local_registry): + p1 = tmp_path / "a.nc" + p2 = tmp_path / "b.nc" + _write_packed_hdf5(p1, x_start=0, x_len=4, scale_factor=0.1, add_offset=0.0) + _write_packed_hdf5(p2, x_start=4, x_len=4, scale_factor=0.1, add_offset=100.0) + + parser = HDFParser() + with ( + open_virtual_dataset( + url=f"file://{p1}", parser=parser, registry=local_registry + ) as vds1, + open_virtual_dataset( + url=f"file://{p2}", parser=parser, registry=local_registry + ) as vds2, + ): + # add_offset maps directly onto the ScaleOffset codec's offset + with pytest.raises(NotImplementedError, match="ScaleOffset") as excinfo: + xr.concat([vds1, vds2], dim="x") + assert "offset=100.0" in str(excinfo.value) + + @requires_icechunk + def test_concat_matching_cf_encoding_roundtrips(self, tmp_path, local_registry): + # positive control: identical CF encoding must concatenate cleanly, and a + # write -> reopen -> decode round-trip must yield the correct unpacked + # values for chunks sourced from *both* files (packed = arange, + # decoded = packed * 0.1). + p1 = tmp_path / "a.nc" + p2 = tmp_path / "b.nc" + _write_packed_hdf5(p1, x_start=0, x_len=4, scale_factor=0.1, add_offset=0.0) + _write_packed_hdf5(p2, x_start=4, x_len=4, scale_factor=0.1, add_offset=0.0) + + parser = HDFParser() + with ( + open_virtual_dataset( + url=f"file://{p1}", parser=parser, registry=local_registry + ) as vds1, + open_virtual_dataset( + url=f"file://{p2}", parser=parser, registry=local_registry + ) as vds2, + ): + combined = xr.concat([vds1, vds2], dim="x") + assert combined["foo"].shape == (8, 1) + # scale/offset live on the codec pipeline, not as attributes + assert ( + "scale_factor" not in combined["foo"].variable.data.metadata.attributes + ) + assert "add_offset" not in combined["foo"].variable.data.metadata.attributes + + roundtrip = roundtrip_as_in_memory_icechunk(combined, tmp_path) + expected = (np.arange(0, 8) * 0.1).reshape(8, 1) + np.testing.assert_allclose(roundtrip["foo"].values, expected) diff --git a/virtualizarr/tests/test_xarray.py b/virtualizarr/tests/test_xarray.py index 356feaf7..c794bf5b 100644 --- a/virtualizarr/tests/test_xarray.py +++ b/virtualizarr/tests/test_xarray.py @@ -820,7 +820,15 @@ def test_loadable_variables( for name, var in ds.variables.items(): if name in actual_loadable_variables: - xrt.assert_identical(vds.variables[name], ds.variables[name]) + if "scale_factor" in var.encoding or "add_offset" in var.encoding: + # CF scale/offset packing is now decoded by the zarr + # scale_offset codec (packed / scale) rather than by + # xarray's decode_cf (packed * scale_factor). The two + # agree to within ~1 ULP but are not bit-identical, so + # the decoded values are compared with assert_allclose. + xrt.assert_allclose(vds.variables[name], ds.variables[name]) + else: + xrt.assert_identical(vds.variables[name], ds.variables[name]) def test_group_kwarg_not_a_group(self, hdf5_groups_file, local_registry): parser = HDFParser(group="doesnt_exist") @@ -846,7 +854,15 @@ def test_group_kwarg(self, hdf5_groups_file, local_registry): ): for name in full_ds.variables: if name in vars_to_load: - xrt.assert_identical(vds.variables[name], full_ds.variables[name]) + ref = full_ds.variables[name] + if "scale_factor" in ref.encoding or "add_offset" in ref.encoding: + # CF scale/offset packing is decoded by the zarr + # scale_offset codec (packed / scale) rather than by + # xarray's decode_cf (packed * scale_factor); these agree + # to within ~1 ULP but are not bit-identical. + xrt.assert_allclose(vds.variables[name], ref) + else: + xrt.assert_identical(vds.variables[name], ref) def test_open_dataset_with_empty(self, hdf5_empty, local_registry): parser = HDFParser() diff --git a/virtualizarr/writers/kerchunk.py b/virtualizarr/writers/kerchunk.py index 9cd0782c..195e7d60 100644 --- a/virtualizarr/writers/kerchunk.py +++ b/virtualizarr/writers/kerchunk.py @@ -1,6 +1,6 @@ import base64 import json -from typing import cast +from typing import Any, cast import numpy as np import ujson @@ -9,12 +9,15 @@ from xarray.backends.zarr import encode_zarr_variable from xarray.coding.times import CFDatetimeCoder from xarray.conventions import encode_dataset_coordinates +from zarr.codecs import CastValue, ScaleOffset from zarr.core.common import JSON from zarr.core.metadata.v2 import ArrayV2Metadata +from zarr.core.metadata.v3 import ArrayV3Metadata from zarr.dtype import parse_data_type from virtualizarr.manifests import ManifestArray from virtualizarr.manifests.manifest import join +from virtualizarr.manifests.utils import create_v3_array_metadata from virtualizarr.types.kerchunk import KerchunkArrRefs, KerchunkStoreRefs from virtualizarr.utils import convert_v3_to_v2_metadata @@ -106,6 +109,55 @@ def remove_file_uri_prefix(path: str): return path +def _revert_cf_codecs_to_attrs( + metadata: ArrayV3Metadata, +) -> tuple[ArrayV3Metadata, dict[str, Any]]: + """ + Undo CF scale/offset codec packing for the zarr-v2-based kerchunk format. + + The HDF parser expresses CF scale_factor/add_offset as the zarr v3 + ``scale_offset`` + ``cast_value`` codecs, which numcodecs — and therefore + kerchunk — cannot represent. Revert that here: drop the two codecs, restore + the stored integer dtype and fill value, and hand scale_factor / add_offset + back as attributes so xarray's ``decode_cf`` reapplies them on read (the + long-standing v2 behaviour). Returns the input unchanged when no CF codecs + are present. + """ + scale_offset = next( + (c for c in metadata.codecs if isinstance(c, ScaleOffset)), None + ) + cast_value = next((c for c in metadata.codecs if isinstance(c, CastValue)), None) + if scale_offset is None or cast_value is None: + return metadata, {} + + scale, offset = scale_offset.scale, scale_offset.offset + storage_dtype = cast_value.dtype.to_native_dtype() + + cf_attrs: dict[str, Any] = {"scale_factor": 1.0 / scale} + if offset != 0: + cf_attrs["add_offset"] = offset + + # re-pack the decoded float fill value into the stored integer domain, + # mirroring the ScaleOffset codec's encode of `(x - offset) * scale` + packed_fill = int(round((float(metadata.fill_value) - offset) * scale)) + + remaining_codecs = [ + codec + for codec in metadata.to_dict()["codecs"] + if codec.get("name") not in ("scale_offset", "cast_value", "bytes") + ] + reverted = create_v3_array_metadata( + shape=metadata.shape, + data_type=storage_dtype, + chunk_shape=metadata.chunks, + fill_value=packed_fill, + codecs=remaining_codecs, + attributes=dict(metadata.attributes), + dimension_names=metadata.dimension_names, + ) + return reverted, cf_attrs + + def variable_to_kerchunk_arr_refs(var: Variable, var_name: str) -> KerchunkArrRefs: """ Create a dictionary containing kerchunk-style array references from a single xarray.Variable (which wraps either a ManifestArray or a numpy array). @@ -129,8 +181,9 @@ def variable_to_kerchunk_arr_refs(var: Variable, var_name: str) -> KerchunkArrRe entry["offset"], entry["length"], ] - array_v2_metadata = convert_v3_to_v2_metadata(marr.metadata) - zattrs = {**var.attrs, **var.encoding} + reverted_metadata, cf_attrs = _revert_cf_codecs_to_attrs(marr.metadata) + array_v2_metadata = convert_v3_to_v2_metadata(reverted_metadata) + zattrs = {**cf_attrs, **var.attrs, **var.encoding} else: var = encode_zarr_variable(var) try: