diff --git a/hls4ml/model/optimizer/passes/bit_exact.py b/hls4ml/model/optimizer/passes/bit_exact.py
index 88dc65c806..5f04dc2b5b 100644
--- a/hls4ml/model/optimizer/passes/bit_exact.py
+++ b/hls4ml/model/optimizer/passes/bit_exact.py
@@ -486,14 +486,46 @@ def _(layer: Conv1D | Conv2D):
     bias = _bias.data if _bias is not None else 0
     k_in, i_in, f_in = get_input_kifs(layer)[0]
     k_in, i_in, f_in = pad_arrs(layer, 0, k_in, i_in, f_in)
-    k_in, i_in, f_in = im2col(kernel.shape, k_in, i_in, f_in)
-    k_in, i_in, f_in = stride_arrs(layer, k_in, i_in, f_in)
-    kernel = kernel.reshape(-1, kernel.shape[-1])
-    qint_in = QIntervalArray.from_kif(k_in, i_in, f_in)
-    qint_out = qint_in @ kernel
-    qint_out = qint_out + bias
-    k, i, f = qint_out.to_kif()
-    return k.astype(np.int16), i, f
+
+    in_per_group = kernel.shape[-2]  # input channels seen by each filter
+    n_chan = k_in.shape[-1]  # channel count of the input (last axis)
+    if in_per_group == n_chan:
+        # Standard (non-grouped) convolution: kernel covers every input channel.
+        k_in, i_in, f_in = im2col(kernel.shape, k_in, i_in, f_in)
+        k_in, i_in, f_in = stride_arrs(layer, k_in, i_in, f_in)
+        kernel = kernel.reshape(-1, kernel.shape[-1])
+        qint_in = QIntervalArray.from_kif(k_in, i_in, f_in)
+        qint_out = qint_in @ kernel
+        qint_out = qint_out + bias
+        k, i, f = qint_out.to_kif()
+        return k.astype(np.int16), i, f
+
+    # Grouped / depthwise convolution: the kernel stores only ``in_per_group``
+    # input channels (groups = n_chan // in_per_group). Each group is an
+    # independent standard convolution over its own channel slice; process the
+    # groups separately and concatenate along the channel axis. Depthwise is the
+    # degenerate groups == n_chan (in_per_group == 1) case.
+    out_chan = kernel.shape[-1]
+    groups = n_chan // in_per_group
+    out_per_group = out_chan // groups
+    k_outs, i_outs, f_outs = [], [], []
+    for g in range(groups):
+        in_sl = slice(g * in_per_group, (g + 1) * in_per_group)
+        out_sl = slice(g * out_per_group, (g + 1) * out_per_group)
+        kernel_g = kernel[..., out_sl]  # filters owned by group g
+        kg, ig, fg = im2col(kernel_g.shape, k_in[..., in_sl], i_in[..., in_sl], f_in[..., in_sl])
+        kg, ig, fg = stride_arrs(layer, kg, ig, fg)
+        qint_in = QIntervalArray.from_kif(kg, ig, fg)
+        qint_out = qint_in @ kernel_g.reshape(-1, out_per_group)
+        qint_out = qint_out + (bias[out_sl] if _bias is not None else 0)  # bias is the scalar 0 when absent
+        k, i, f = qint_out.to_kif()
+        k_outs.append(k)
+        i_outs.append(i)
+        f_outs.append(f)
+    k = np.concatenate(k_outs, axis=-1).astype(np.int16)
+    i = np.concatenate(i_outs, axis=-1)
+    f = np.concatenate(f_outs, axis=-1)
+    return k, i, f
 
 
 @_produce_kif.register(Pooling1D)
diff --git a/test/pytest/test_bit_exact_grouped_conv.py b/test/pytest/test_bit_exact_grouped_conv.py
new file mode 100644
index 0000000000..527bbf1a52
--- /dev/null
+++ b/test/pytest/test_bit_exact_grouped_conv.py
@@ -0,0 +1,133 @@
+"""Bit-exact precision propagation through grouped / depthwise Conv1D / Conv2D.
+
+The ``produce_kif`` handler for ``Conv1D`` / ``Conv2D`` assumed a non-grouped
+kernel (``kernel.shape[-2] == n_chan``). A grouped or depthwise convolution
+stores only ``in_per_group`` input channels per filter, so the im2col buffer no
+longer lines up with the full-channel input and the bit_exact pass raised, e.g.::
+
+    ValueError: could not broadcast input array from shape (48,) into shape (3,)
+
+Each group is an independent standard convolution over its own channel slice, so
+the fix processes the groups separately and concatenates along the channel axis.
+
+Scope: this exercises the (backend-independent) bit_exact precision-propagation
+pass. The io_parallel/io_stream *codegen* for grouped convolutions is a separate
+concern, so rather than comparing a compiled prediction the test asserts the
+contract the pass must uphold: the output precision it assigns represents the
+quantized Keras output exactly (no rounding, no saturation).
+"""
+
+from pathlib import Path
+
+import keras
+import numpy as np
+import pytest
+
+from hls4ml.converters import convert_from_keras_model
+
+try:
+    from hgq.config import QuantizerConfigScope
+    from hgq.layers import QConv1D, QConv2D
+    from hgq.utils import trace_minmax
+except ImportError:
+    pytest.skip('HGQ2 is not installed', allow_module_level=True)
+
+from keras.layers import Input  # noqa: E402
+
+from hls4ml.model.layers import Conv1D, Conv2D  # noqa: E402
+from hls4ml.model.optimizer.passes.bit_exact import produce_kif  # noqa: E402
+
+test_root_path = Path(__file__).parent
+
+
+def _assert_exactly_representable(values, k, i, f):
+    """Assert every value lands exactly on the signed fixed-point grid (k, i, f).
+
+    ``k``/``i``/``f`` may be per-element arrays (broadcasting over ``values``) or
+    scalars. A value is representable when it is a multiple of 2**-f and lies in
+    the closed range [-(2**i) * k, 2**i - 2**-f]; if produce_kif under-allocated
+    any of k, i or f, the regridded value differs and the assertion fails.
+    """
+    values = values.astype(np.float64)
+    k = np.asarray(k, dtype=np.float64)
+    i = np.asarray(i, dtype=np.float64)
+    f = np.asarray(f, dtype=np.float64)
+    delta = 2.0**-f  # grid step, i.e. the weight of one LSB
+    lo = -(2.0**i) * k  # lower bound; 0 wherever k == 0 (unsigned)
+    hi = 2.0**i - delta  # largest representable value
+    regridded = np.clip(np.round(values / delta) * delta, lo, hi)  # snap to the grid, then saturate
+    np.testing.assert_array_equal(regridded, values)
+
+
+def _result_kif(node):
+    """k, i (excluding sign), f of the precision bit_exact assigned to ``node``."""
+    precision = node.get_output_variable().type.precision
+    k = int(precision.signed)  # 1 if signed, else 0
+    f = precision.fractional
+    i = precision.integer - k  # integer bits without the sign bit
+    return k, i, f
+
+
+def _find(hls_model, cls, name):  # first ``cls`` node called ``name``; StopIteration if there is none
+    return next(node for node in hls_model.graph.values() if isinstance(node, cls) and node.name == name)
+
+
+@pytest.mark.parametrize('backend', ['Vivado', 'Vitis', 'oneAPI'])
+@pytest.mark.parametrize('n_chan, groups', [(16, 16), (16, 4), (16, 1)], ids=['depthwise', 'grouped', 'dense'])
+def test_bit_exact_grouped_conv1d(test_case_id, backend, n_chan, groups):
+    """A grouped / depthwise QConv1D must convert through the bit_exact flow and
+    be assigned an output precision that represents the quantized Keras output
+    exactly. The leading QConv1D inserts the FixedPointQuantizer that triggers
+    the bit_exact pass."""
+    with QuantizerConfigScope(f0=4, i0=4):
+        inp = Input((16, n_chan))
+        x = QConv1D(n_chan, 1, name='c0')(inp)
+        out = QConv1D(n_chan, 3, padding='same', groups=groups, name='cg')(x)
+        model = keras.Model(inp, out)
+
+    data = np.random.default_rng(0).standard_normal((1000, 16, n_chan)).astype(np.float32)
+    r_keras = trace_minmax(model, data, return_results=True)
+
+    precision = 'ac_fixed<2,0>' if backend == 'oneAPI' else 'ap_fixed<1,0>'
+    hls_config = {'Model': {'Precision': precision, 'ReuseFactor': 1, 'Strategy': 'latency'}}
+    output_dir = str(test_root_path / test_case_id)
+    # Conversion runs the bit_exact pass; this raised ValueError pre-fix for groups > 1.
+    hls_model = convert_from_keras_model(
+        model, backend=backend, output_dir=output_dir, hls_config=hls_config, io_type='io_parallel'
+    )
+
+    conv = _find(hls_model, Conv1D, 'cg')
+    # The single output precision bit_exact assigned must represent the true output.
+    _assert_exactly_representable(r_keras, *_result_kif(conv))
+
+    # Per-channel check (stronger: catches per-channel / group-ordering errors the
+    # max-aggregated result_t could mask). oneAPI transposes conv weights after the
+    # pass, so the channels_last produce_kif recompute is only valid for Vivado/Vitis.
+    if backend != 'oneAPI':
+        _assert_exactly_representable(r_keras, *produce_kif(conv))
+
+
+@pytest.mark.parametrize('backend', ['Vivado', 'Vitis', 'oneAPI'])
+@pytest.mark.parametrize('n_chan, groups', [(4, 4), (4, 2), (4, 1)], ids=['depthwise', 'grouped', 'dense'])
+def test_bit_exact_grouped_conv2d(test_case_id, backend, n_chan, groups):
+    """2D counterpart of :func:`test_bit_exact_grouped_conv1d`."""
+    with QuantizerConfigScope(f0=4, i0=4):
+        inp = Input((8, 8, n_chan))
+        x = QConv2D(n_chan, 1, name='c0')(inp)
+        out = QConv2D(n_chan, 3, padding='same', groups=groups, name='cg')(x)
+        model = keras.Model(inp, out)
+
+    data = np.random.default_rng(1).standard_normal((500, 8, 8, n_chan)).astype(np.float32)
+    r_keras = trace_minmax(model, data, return_results=True)
+
+    precision = 'ac_fixed<2,0>' if backend == 'oneAPI' else 'ap_fixed<1,0>'
+    hls_config = {'Model': {'Precision': precision, 'ReuseFactor': 1, 'Strategy': 'latency'}}
+    output_dir = str(test_root_path / test_case_id)
+    hls_model = convert_from_keras_model(
+        model, backend=backend, output_dir=output_dir, hls_config=hls_config, io_type='io_parallel'
+    )
+
+    conv = _find(hls_model, Conv2D, 'cg')
+    _assert_exactly_representable(r_keras, *_result_kif(conv))
+    if backend != 'oneAPI':
+        _assert_exactly_representable(r_keras, *produce_kif(conv))