From dc700a47ec0c7504137b23c8dd813c9d52baa2c6 Mon Sep 17 00:00:00 2001 From: Ryan McKenna Date: Thu, 18 Jun 2026 14:50:41 -0700 Subject: [PATCH] Add heuristic one-way measurement for NumericalInitializer When an estimated_total is provided, NumericalInitializer now produces a LinearMeasurement with uniform counts and stddev = 1/sqrt(rho), based on empirical analysis of DPQuantiles noise. DataGenerationV3 orchestrates this by initializing closed-domain columns first to estimate the total count, falling back to a new DPGaussianCount primitive when no closed-domain columns exist. Also inlines _gaussian_histogram and select_partitions_gaussian_thresholding into their respective DPMechanism classes to reduce indirection. PiperOrigin-RevId: 934559141 --- dpsynth/data_generation_v3.py | 113 ++++++++++++++++--- dpsynth/local_mode/initialization.py | 70 ++++++++++-- dpsynth/local_mode/primitives.py | 137 +++++++++--------------- tests/data_generation_v3_test.py | 32 ++++++ tests/local_mode/initialization_test.py | 66 ++++++++++++ tests/local_mode/primitives_test.py | 80 ++++++++------ 6 files changed, 358 insertions(+), 140 deletions(-) diff --git a/dpsynth/data_generation_v3.py b/dpsynth/data_generation_v3.py index d4172f8..85573cc 100644 --- a/dpsynth/data_generation_v3.py +++ b/dpsynth/data_generation_v3.py @@ -28,6 +28,7 @@ from dpsynth.local_mode import primitives from dpsynth.local_mode import vectorized_transformations as vtx import mbi +from mbi import estimation as mbi_estimation import numpy as np import pandas as pd @@ -36,7 +37,10 @@ def _create_initializers( domains: Mapping[str, domain.AttributeType], numerical_bins: int, init_delta: float, -) -> dict[str, primitives.DPMechanism]: +) -> tuple[ + dict[str, primitives.DPMechanism], + dict[str, primitives.DPMechanism], +]: """Creates per-column initializers from the domain specification. Args: @@ -45,30 +49,31 @@ def _create_initializers( init_delta: Delta for open-set categorical partition selection. Returns: - A dictionary mapping column names to uncalibrated initializer instances. + A (closed_domain_initializers, numerical_initializers) tuple. Raises: ValueError: If a column has an unsupported attribute type. """ - initializers = {} + closed_inits = {} + numerical_inits = {} for col, attr in domains.items(): if isinstance(attr, domain.NumericalAttribute): - initializers[col] = initialization.NumericalInitializer( + numerical_inits[col] = initialization.NumericalInitializer( name=col, num_partitions=numerical_bins, attribute=attr ) elif isinstance(attr, domain.CategoricalAttribute): - initializers[col] = initialization.CategoricalInitializer( + closed_inits[col] = initialization.CategoricalInitializer( name=col, attribute=attr ) elif isinstance(attr, domain.OpenSetCategoricalAttribute): - initializers[col] = initialization.OpenSetCategoricalInitializer( + closed_inits[col] = initialization.OpenSetCategoricalInitializer( name=col, attribute=attr, delta=init_delta ) else: raise ValueError( f'Unsupported attribute type for column {col!r}: {type(attr)}' ) - return initializers + return closed_inits, numerical_inits @dataclasses.dataclass @@ -100,6 +105,7 @@ class DataGenerationV3(primitives.DPMechanism): default_factory=discrete_mechanisms.MSTMechanism ) initializers: dict[str, primitives.DPMechanism] | None = None + total_count_mechanism: primitives.DPGaussianCount | None = None cross_attribute_constraints: Sequence[constraints.Constraint] = () def calibrate( @@ -178,15 +184,41 @@ def _calibrate_zcdp( self, zcdp_rho, numerical_bins, init_delta, init_budget_fraction ): """Simple additive zCDP budget split.""" - inits = self.initializers or _create_initializers( - self.domains, numerical_bins, init_delta + if self.initializers is not None: + all_inits = dict(self.initializers) + has_closed_domain = any( + not isinstance(init, initialization.NumericalInitializer) + for init in all_inits.values() + ) + else: + closed_inits, numerical_inits = _create_initializers( + self.domains, numerical_bins, init_delta + ) + all_inits = {**closed_inits, **numerical_inits} + has_closed_domain = bool(closed_inits) + + has_numerical = any( + isinstance(init, initialization.NumericalInitializer) + for init in all_inits.values() ) + needs_total_count = has_numerical and not has_closed_domain + init_rho = init_budget_fraction * zcdp_rho - per_col_rho = init_rho / len(inits) + # If we need a separate total count and have no closed-domain columns to + # estimate it from, allocate one extra share of init budget for it. + num_shares = len(all_inits) + (1 if needs_total_count else 0) + per_col_rho = init_rho / num_shares discrete_rho = zcdp_rho - init_rho + calibrated_inits = { - col: init.calibrate(zcdp_rho=per_col_rho) for col, init in inits.items() + col: init.calibrate(zcdp_rho=per_col_rho) + for col, init in all_inits.items() } + calibrated_total = ( + primitives.DPGaussianCount().calibrate(zcdp_rho=per_col_rho) + if needs_total_count + else None + ) calibrated_discrete = self.discrete_mechanism.calibrate( zcdp_rho=discrete_rho ) @@ -194,6 +226,7 @@ def _calibrate_zcdp( self, initializers=calibrated_inits, discrete_mechanism=calibrated_discrete, + total_count_mechanism=calibrated_total, ) def _calibrate_approx_dp( @@ -226,10 +259,24 @@ def _calibrate_approx_dp( Returns: A new DataGenerationV3 instance with calibrated sub-mechanisms. """ - inits = self.initializers or _create_initializers( - self.domains, numerical_bins, init_delta + if self.initializers is not None: + inits = dict(self.initializers) + has_closed_domain = any( + not isinstance(init, initialization.NumericalInitializer) + for init in inits.values() + ) + else: + closed_inits, numerical_inits = _create_initializers( + self.domains, numerical_bins, init_delta + ) + inits = {**closed_inits, **numerical_inits} + has_closed_domain = bool(closed_inits) + has_numerical = any( + isinstance(init, initialization.NumericalInitializer) + for init in inits.values() ) - num_columns = len(inits) + needs_total_count = has_numerical and not has_closed_domain + num_columns = len(inits) + (1 if needs_total_count else 0) # Stage 1: Convert (epsilon, remaining_delta) to zCDP and calibrate # initializers with init_budget_fraction of that budget. @@ -244,11 +291,17 @@ def _calibrate_approx_dp( calibrated_inits = { col: init.calibrate(zcdp_rho=per_col_rho) for col, init in inits.items() } - + calibrated_total = ( + primitives.DPGaussianCount().calibrate(zcdp_rho=per_col_rho) + if needs_total_count + else None + ) # Stage 2: With init dp_events fixed, find the tightest discrete budget. # The accountant handles ApproximateDpEvent deltas from open-set # initializers automatically. init_events = [init.dp_event for init in calibrated_inits.values()] + if calibrated_total is not None: + init_events.append(calibrated_total.dp_event) # Determine accountant type based on discrete mechanism's dp_event. probe_event = self.discrete_mechanism.calibrate(zcdp_rho=1.0).dp_event @@ -277,6 +330,7 @@ def make_event_from_param(discrete_rho): self, initializers=calibrated_inits, discrete_mechanism=calibrated_discrete, + total_count_mechanism=calibrated_total, ) @property @@ -292,6 +346,8 @@ def dp_event(self) -> dp_accounting.DpEvent: if self.initializers is None: raise ValueError('Must call calibrate() before accessing dp_event.') events = [init.dp_event for init in self.initializers.values()] + if self.total_count_mechanism is not None: + events.append(self.total_count_mechanism.dp_event) events.append(self.discrete_mechanism.dp_event) return dp_accounting.ComposedDpEvent(events) @@ -321,9 +377,34 @@ def __call__( ) # Phase 1: Per-column initialization. + # Initialize closed-domain columns first to estimate the total count, + # then pass it to numerical initializers for heuristic measurements. col_results: dict[str, initialization.ColumnMeasurement] = {} + closed_domain_measurements = [] for col, init in self.initializers.items(): - col_results[col] = init(rng, data[col].values) + if not isinstance(init, initialization.NumericalInitializer): + col_results[col] = init(rng, data[col].values) + if col_results[col].measurement is not None: + closed_domain_measurements.append(col_results[col].measurement) + + # Estimate total from closed-domain measurements or DPGaussianCount. + estimated_total = None + if closed_domain_measurements: + estimated_total = mbi_estimation.minimum_variance_unbiased_total( + closed_domain_measurements + ) + elif self.total_count_mechanism is not None: + # Pick an arbitrary column to count records. + any_col = next(iter(self.domains)) + estimated_total = max( + 1.0, self.total_count_mechanism(rng, data[any_col].values) + ) + + for col, init in self.initializers.items(): + if isinstance(init, initialization.NumericalInitializer): + col_results[col] = init( + rng, data[col].values, estimated_total=estimated_total + ) # Phase 2: Encode data to discrete domain. discrete_domains = {} diff --git a/dpsynth/local_mode/initialization.py b/dpsynth/local_mode/initialization.py index cf05f3d..37147b4 100644 --- a/dpsynth/local_mode/initialization.py +++ b/dpsynth/local_mode/initialization.py @@ -26,7 +26,6 @@ import mbi import numpy as np - _M = TypeVar('_M') @@ -84,20 +83,79 @@ def calibrate(self, *, zcdp_rho: float) -> NumericalInitializer: ).calibrate(zcdp_rho=zcdp_rho) return dataclasses.replace(self, mechanism=mechanism) + @property + def zcdp_rho(self) -> float: + """Total zCDP rho consumed, derived from the composed dp_event. + + The dp_event is a ComposedDpEvent of ExponentialMechanismDpEvents. + Each exponential mechanism with parameter epsilon satisfies + (epsilon^2 / 8)-zCDP, so the total is sum(eps_i^2 / 8). + """ + event = self.dp_event # raises if not calibrated + assert isinstance(event, dp_accounting.ComposedDpEvent) + return sum(e.epsilon**2 / 8.0 for e in event.events) + @property def dp_event(self) -> dp_accounting.DpEvent: """Returns the composed privacy event for the quantile computation.""" return _validate_mechanism(self.mechanism).dp_event def __call__( - self, rng: np.random.Generator, data: np.ndarray + self, + rng: np.random.Generator, + data: np.ndarray, + *, + estimated_total: float | None = None, ) -> ColumnMeasurement: - """Returns a ColumnMeasurement with the discretization transform.""" + """Returns a ColumnMeasurement with the discretization transform. + + Args: + rng: A numpy random number generator. + data: 1D array of numerical data. + estimated_total: If provided, a heuristic one-way measurement is included + assuming a uniform distribution over the original bins, with stddev = + 1/sqrt(rho). When bin edges are deduplicated (e.g. for concentrated + integer data), merged bins receive proportionally more mass. + + Returns: + A ColumnMeasurement with the categorical attribute, bin edges, and + optionally a heuristic measurement. + """ # Dedup: concentrated data can make quantiles return duplicate edges. - edges = _validate_mechanism(self.mechanism)(rng, data) - bin_edges = np.unique(np.asarray(edges, dtype=float)) + raw_edges = _validate_mechanism(self.mechanism)(rng, data) + raw_edges = np.asarray(raw_edges, dtype=float) + if self.attribute.dtype == 'int': + # For integer data, snap edges to the integer lattice. Since bins + # are right-closed (left, right] and discretize uses searchsorted + # with side='left', floor preserves the partition: an edge at 4.7 + # splits integers as {≤4} | {≥5}, and floor(4.7) = 4 gives the + # same split via (…, 4] | (4, …]. + # Clamp to [min_value, max_value - 1] so the last bin (edge, max] + # contains at least one integer. + raw_edges = np.clip( + np.floor(raw_edges), + self.attribute.min_value, + self.attribute.max_value - 1, + ) + bin_edges, edge_counts = np.unique(raw_edges, return_counts=True) cat_attr = vtx.categorical_attribute_from_edges(bin_edges, self.attribute) - return ColumnMeasurement(cat_attr, bin_edges) + + measurement = None + if estimated_total is not None: + rho = self.zcdp_rho + # Each unique edge with count k means k original bins end at that + # boundary; append 1 for the rightmost bin (beyond the last edge). + bin_weights = np.append(edge_counts, 1) + uniform_counts = bin_weights * (estimated_total / self.num_partitions) + # Heuristic: empirically, the std dev of per-bin count noise from + # DPQuantiles is approximately 1/sqrt(rho), independent of N and + # roughly uniform across bins. See go/dpsynth-quantile-noise. + stddev = 1.0 / np.sqrt(rho) + measurement = mbi.LinearMeasurement( + uniform_counts, (self.name,), stddev=stddev + ) + + return ColumnMeasurement(cat_attr, bin_edges, measurement=measurement) @dataclasses.dataclass diff --git a/dpsynth/local_mode/primitives.py b/dpsynth/local_mode/primitives.py index 37b65eb..d941cce 100644 --- a/dpsynth/local_mode/primitives.py +++ b/dpsynth/local_mode/primitives.py @@ -267,62 +267,6 @@ def _get_threshold(delta, sigma, max_part): return thresholds.max() -def select_partitions_gaussian_thresholding( - rng: np.random.Generator, - data: np.ndarray, - gdp_budget: float, - delta: float, -) -> tuple[np.ndarray, np.ndarray, float]: - """Selects partitions using Gaussian Thresholding (Weighted Gaussian). - - This implements Algorithm 2 from the DP-SIPS paper (Swanberg et al., 2023) - under item-level DP. It is the simplest partition selection mechanism: - - 1. Compute the histogram of partition counts. - 2. Add Gaussian noise calibrated to the privacy budget. - 3. Return partitions whose noisy count exceeds a threshold chosen to - bound the false-positive probability per empty partition at delta. - - Under item-level DP each record is treated as a distinct user contributing - to exactly one partition, so the histogram has L2 sensitivity 1. The - threshold is T = 1 + sigma * Phi^{-1}(1 - delta), following the paper's - formula with max_part = 1. - - Args: - rng: A numpy random number generator. - data: 1D array of integers, where each element is a partition ID. - gdp_budget: Privacy budget in terms of squared Gaussian DP mu parameter - (gdp_budget = mu^2 = 1 / sigma^2). - delta: Failure probability (false positive bound per empty partition). - - Returns: - A tuple containing: - - selected_partitions: 1D array of partition IDs that passed the - threshold. - - estimated_counts: 1D array of noisy counts for each selected - partition. - - sigma: The standard deviation of the Gaussian noise added. - """ - if gdp_budget <= 0 or delta <= 0: - raise ValueError(f'{gdp_budget=} and {delta=} must be positive.') - - sigma = 1.0 / np.sqrt(gdp_budget) - - if data.size == 0: - return np.empty(0, dtype=data.dtype), np.empty(0, dtype=float), sigma - - unique_parts, counts = np.unique(data, return_counts=True) - noisy_counts = counts + rng.normal(scale=sigma, size=counts.size) - - # Threshold: ensures that an empty partition (true count 0) passes with - # probability at most delta. For max_part=1 this simplifies to: - # T = 1/sqrt(1) + sigma * Phi^{-1}(1 - delta) = 1 + sigma * ppf(1-delta) - threshold = 1.0 + sigma * scipy.stats.norm.ppf(1.0 - delta) - passed = noisy_counts >= threshold - - return unique_parts[passed], noisy_counts[passed], sigma - - def _select_partitions_sips( rng: np.random.Generator, data: np.ndarray, @@ -429,32 +373,6 @@ def _select_partitions_sips( return selected_partitions, selected_counts, max_sigma -def _gaussian_histogram( - rng: np.random.Generator, - data: np.ndarray, - domain_size: int, - sigma: float, -) -> np.ndarray: - """Computes a noisy histogram over a closed domain using the Gaussian mechanism. - - The histogram query has L2 sensitivity 1 under item-level DP (each record - contributes +1 to exactly one bin). Gaussian noise with the given standard - deviation is added independently to each bin count. - - Args: - rng: A numpy random number generator. - data: 1D array of integer-encoded categorical values in [0, domain_size). - domain_size: Number of categories in the closed domain. - sigma: Standard deviation of the Gaussian noise added to each bin. - - Returns: - A length-`domain_size` array of noisy counts. - """ - return np.bincount(data, minlength=domain_size) + rng.normal( - scale=sigma, size=domain_size - ) - - # --------------------------------------------------------------------------- # DPMechanism subclasses # --------------------------------------------------------------------------- @@ -561,7 +479,48 @@ def __call__(self, rng: np.random.Generator, data: np.ndarray) -> np.ndarray: """Computes a differentially private histogram.""" if self.sigma is None: raise ValueError(_UNCALIBRATED_MSG.format(param='sigma')) - return _gaussian_histogram(rng, data, self.domain_size, self.sigma) + return np.bincount(data, minlength=self.domain_size) + rng.normal( + scale=self.sigma, size=self.domain_size + ) + + +@dataclasses.dataclass +class DPGaussianCount(DPMechanism): + """Differentially private count via the Gaussian mechanism. + + Returns a noisy count of the number of records in the input data. The + conversion from zCDP is ``sigma = sqrt(0.5 / zcdp_rho)``. + + Attributes: + sigma: Gaussian noise standard deviation. Set directly or via ``calibrate``. + """ + + sigma: float | None = None + + def calibrate(self, *, zcdp_rho: float) -> DPGaussianCount: + """Returns a copy with sigma derived from the zCDP budget.""" + return dataclasses.replace(self, sigma=math.sqrt(0.5 / zcdp_rho)) + + @property + def dp_event(self) -> dp_accounting.DpEvent: + """Returns the Gaussian privacy event for this mechanism.""" + if self.sigma is None: + raise ValueError(_UNCALIBRATED_MSG.format(param='sigma')) + return dp_accounting.GaussianDpEvent(noise_multiplier=self.sigma) + + def __call__(self, rng: np.random.Generator, data: np.ndarray) -> float: + """Returns a differentially private count of records. + + Args: + rng: A numpy random number generator. + data: 1D array of data records. + + Returns: + The noisy count (true count + Gaussian noise). + """ + if self.sigma is None: + raise ValueError(_UNCALIBRATED_MSG.format(param='sigma')) + return float(len(data) + rng.normal(scale=self.sigma)) @dataclasses.dataclass @@ -607,7 +566,11 @@ def __call__( """ if self.sigma is None: raise ValueError(_UNCALIBRATED_MSG.format(param='sigma')) - gdp_budget = np.inf if self.sigma == 0.0 else 1.0 / (self.sigma**2) - return select_partitions_gaussian_thresholding( - rng, data, gdp_budget, self.delta - ) + if data.size == 0: + return np.empty(0, dtype=data.dtype), np.empty(0, dtype=float), self.sigma + unique_parts, counts = np.unique(data, return_counts=True) + noisy_counts = counts + rng.normal(scale=self.sigma, size=counts.size) + # Threshold ensures an empty partition passes with probability <= delta. + threshold = 1.0 + self.sigma * scipy.stats.norm.ppf(1.0 - self.delta) + passed = noisy_counts >= threshold + return unique_parts[passed], noisy_counts[passed], self.sigma diff --git a/tests/data_generation_v3_test.py b/tests/data_generation_v3_test.py index ba979e7..e1dd803 100644 --- a/tests/data_generation_v3_test.py +++ b/tests/data_generation_v3_test.py @@ -147,6 +147,38 @@ def test_calibrate_small_epsilon(self): self.assertIsInstance(synthetic_df, pd.DataFrame) self.assertListEqual(synthetic_df.columns.tolist(), ['A', 'B']) + def test_numerical_only_uses_dp_count(self): + """Numerical-only domains should allocate a DPGaussianCount for total.""" + domains = { + 'A': domain.NumericalAttribute(min_value=0, max_value=10), + 'B': domain.NumericalAttribute(min_value=-10, max_value=10), + } + df = pd.DataFrame({'A': [5, 5, 0], 'B': [5, -10, -5]}, dtype=float) + rng = np.random.default_rng(0) + calibrated = DataGenerationV3(domains=domains).calibrate(zcdp_rho=100.0) + + # total_count_mechanism should be set for numerical-only domains. + self.assertIsNotNone(calibrated.total_count_mechanism) + synthetic_df = calibrated(rng, df) + self.assertListEqual(synthetic_df.columns.tolist(), ['A', 'B']) + + def test_mixed_domain_no_dp_count(self): + """Mixed domains estimate total from categorical measurements.""" + domains = { + 'A': domain.CategoricalAttribute( + possible_values=['a', 'b', 'c'], out_of_domain_index=0 + ), + 'B': domain.NumericalAttribute(min_value=0, max_value=10), + } + df = pd.DataFrame({'A': ['a', 'b', 'c'], 'B': [1.0, 5.0, 10.0]}) + rng = np.random.default_rng(0) + calibrated = DataGenerationV3(domains=domains).calibrate(zcdp_rho=100.0) + + # No DPGaussianCount needed — total comes from categorical measurement. + self.assertIsNone(calibrated.total_count_mechanism) + synthetic_df = calibrated(rng, df) + self.assertListEqual(synthetic_df.columns.tolist(), ['A', 'B']) + if __name__ == '__main__': absltest.main() diff --git a/tests/local_mode/initialization_test.py b/tests/local_mode/initialization_test.py index 8a8c8ab..44deb86 100644 --- a/tests/local_mode/initialization_test.py +++ b/tests/local_mode/initialization_test.py @@ -96,6 +96,72 @@ def test_numerical_initializer_integer_data(self): # Domain size may be < 8 due to dedup, but must be >= 2. self.assertGreaterEqual(result.categorical_attribute.size, 2) + def test_numerical_initializer_integer_edges_are_floored(self): + """Integer attributes should produce integer-valued bin edges.""" + attr = domain.NumericalAttribute(min_value=0, max_value=100, dtype='int') + rng = np.random.default_rng(42) + initializer = initialization.NumericalInitializer( + name='test', num_partitions=4, attribute=attr + ) + data = np.arange(100) + result = initializer.calibrate(zcdp_rho=100.0)(rng, data) + # All edges should be integers (floor was applied). + np.testing.assert_array_equal(result.bin_edges, np.floor(result.bin_edges)) + # Edges must be within [min_value, max_value - 1]. + self.assertGreaterEqual(result.bin_edges[0], 0) + self.assertLess(result.bin_edges[-1], 100) + + def test_numerical_initializer_measurement_with_merged_bins(self): + """When integer edges collapse, merged bins get proportionally more mass.""" + attr = domain.NumericalAttribute(min_value=0, max_value=100, dtype='int') + rng = np.random.default_rng(0) + initializer = initialization.NumericalInitializer( + name='test', num_partitions=8, attribute=attr + ) + # Concentrated data will cause edge collisions after floor. + data = np.array([50] * 100 + [1, 99]) + result = initializer.calibrate(zcdp_rho=1.0)( + rng, data, estimated_total=100.0 + ) + self.assertIsNotNone(result.measurement) + # Measurement counts should sum to estimated_total. + np.testing.assert_allclose( + result.measurement.noisy_measurement.sum(), 100.0 + ) + + def test_numerical_initializer_measurement_with_estimated_total(self): + attr = domain.NumericalAttribute(min_value=0, max_value=10) + rng = np.random.default_rng(0) + initializer = initialization.NumericalInitializer( + name='num_col', num_partitions=4, attribute=attr + ) + data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9]) + result = initializer.calibrate(zcdp_rho=1.0)( + rng, data, estimated_total=100.0 + ) + + self.assertIsNotNone(result.measurement) + # Measurement should be uniform: 100.0 / num_bins for each bin. + num_bins = result.categorical_attribute.size + expected_count = 100.0 / num_bins + np.testing.assert_allclose( + result.measurement.noisy_measurement, + np.full(num_bins, expected_count), + ) + self.assertEqual(result.measurement.clique, ('num_col',)) + # stddev should be 1/sqrt(rho) = 1/sqrt(1.0) = 1.0 + self.assertAlmostEqual(result.measurement.stddev, 1.0) + + def test_numerical_initializer_no_measurement_without_estimated_total(self): + attr = domain.NumericalAttribute(min_value=0, max_value=10) + rng = np.random.default_rng(0) + initializer = initialization.NumericalInitializer( + name='test', num_partitions=4, attribute=attr + ) + data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9]) + result = initializer.calibrate(zcdp_rho=1.0)(rng, data) + self.assertIsNone(result.measurement) + class CategoricalInitializerTest(absltest.TestCase): diff --git a/tests/local_mode/primitives_test.py b/tests/local_mode/primitives_test.py index 5257c68..cd10548 100644 --- a/tests/local_mode/primitives_test.py +++ b/tests/local_mode/primitives_test.py @@ -217,59 +217,43 @@ def setUp(self): def test_basic_operation(self): data = np.array([1] * 50 + [2] * 5) - selected, counts, sigma = ( - primitives.select_partitions_gaussian_thresholding( - self.rng, data, gdp_budget=10.0, delta=1e-5 - ) + mech = primitives.DPPartitionSelection( + delta=1e-5, sigma=1.0 / np.sqrt(10.0) ) + selected, counts, sigma = mech(self.rng, data) self.assertIn(1, selected) self.assertEqual(sigma, 1.0 / np.sqrt(10.0)) self.assertEqual(selected.size, counts.size) def test_empty_data(self): data = np.array([], dtype=int) - selected, counts, sigma = ( - primitives.select_partitions_gaussian_thresholding( - self.rng, data, gdp_budget=1.0, delta=1e-5 - ) - ) + mech = primitives.DPPartitionSelection(delta=1e-5, sigma=1.0) + selected, counts, sigma = mech(self.rng, data) self.assertEmpty(selected) self.assertEmpty(counts) self.assertEqual(sigma, 1.0) def test_high_budget_selects_all(self): data = np.array([1, 2, 3, 4, 5]) - selected, _, _ = primitives.select_partitions_gaussian_thresholding( - self.rng, data, gdp_budget=np.inf, delta=0.1 - ) + mech = primitives.DPPartitionSelection(delta=0.1, sigma=0.0) + selected, _, _ = mech(self.rng, data) self.assertCountEqual(selected, [1, 2, 3, 4, 5]) - def test_zero_budget_raises(self): - data = np.array([1, 2, 3]) - with self.assertRaises(ValueError): - primitives.select_partitions_gaussian_thresholding( - self.rng, data, gdp_budget=-0.1, delta=1e-5 - ) - with self.assertRaises(ValueError): - primitives.select_partitions_gaussian_thresholding( - self.rng, data, gdp_budget=1.0, delta=-0.001 - ) - def test_rare_items_not_selected(self): # One item with many occurrences, another with just 1. # With moderate budget and tight delta, the rare item should be dropped. data = np.array([1] * 100 + [2]) - selected, _, _ = primitives.select_partitions_gaussian_thresholding( - self.rng, data, gdp_budget=0.5, delta=1e-6 - ) + mech = primitives.DPPartitionSelection(delta=1e-6, sigma=1.0 / np.sqrt(0.5)) + selected, _, _ = mech(self.rng, data) self.assertIn(1, selected) self.assertNotIn(2, selected) def test_string_data_type(self): data = np.array(["a", "b", "a", "a", "c", "a", "c"]) - selected, _, _ = primitives.select_partitions_gaussian_thresholding( - self.rng, data, gdp_budget=10.0, delta=1e-5 + mech = primitives.DPPartitionSelection( + delta=1e-5, sigma=1.0 / np.sqrt(10.0) ) + selected, _, _ = mech(self.rng, data) self.assertTrue(all(isinstance(p, str) for p in selected)) @@ -281,19 +265,22 @@ def setUp(self): def test_basic_operation(self): data = np.array([0, 0, 1, 1, 1, 2]) - result = primitives._gaussian_histogram(self.rng, data, 4, sigma=1.0) + mech = primitives.DPGaussianHistogram(domain_size=4, sigma=1.0) + result = mech(self.rng, data) self.assertLen(result, 4) # Noisy counts should be close to true counts [2, 3, 1, 0]. np.testing.assert_allclose(result, [2, 3, 1, 0], atol=5.0) def test_zero_sigma(self): data = np.array([0, 0, 1, 2, 2, 2]) - result = primitives._gaussian_histogram(self.rng, data, 3, sigma=0.0) + mech = primitives.DPGaussianHistogram(domain_size=3, sigma=0.0) + result = mech(self.rng, data) np.testing.assert_array_equal(result, [2, 1, 3]) def test_empty_data(self): data = np.array([], dtype=int) - result = primitives._gaussian_histogram(self.rng, data, 3, sigma=1.0) + mech = primitives.DPGaussianHistogram(domain_size=3, sigma=1.0) + result = mech(self.rng, data) self.assertLen(result, 3) @@ -391,5 +378,36 @@ def test_dp_event_type(self): self.assertAlmostEqual(event.noise_multiplier, 1.0) +class DPGaussianCountTest(absltest.TestCase): + + def setUp(self): + super().setUp() + self.rng = np.random.default_rng(42) + + def test_calibrate_and_call(self): + mech = primitives.DPGaussianCount() + calibrated = mech.calibrate(zcdp_rho=0.5) + data = np.array([1, 2, 3, 4, 5]) + result = calibrated(self.rng, data) + self.assertIsInstance(result, float) + np.testing.assert_allclose(result, 5.0, atol=5.0) + + def test_zero_sigma_returns_exact_count(self): + mech = primitives.DPGaussianCount(sigma=0.0) + data = np.array([10, 20, 30]) + self.assertEqual(mech(self.rng, data), 3.0) + + def test_dp_event_raises_before_calibration(self): + mech = primitives.DPGaussianCount() + with self.assertRaises(ValueError): + _ = mech.dp_event + + def test_dp_event_type(self): + mech = primitives.DPGaussianCount().calibrate(zcdp_rho=0.5) + event = mech.dp_event + self.assertIsInstance(event, dp_accounting.GaussianDpEvent) + self.assertAlmostEqual(event.noise_multiplier, 1.0) + + if __name__ == "__main__": absltest.main()