Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 97 additions & 16 deletions dpsynth/data_generation_v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from dpsynth.local_mode import primitives
from dpsynth.local_mode import vectorized_transformations as vtx
import mbi
from mbi import estimation as mbi_estimation
import numpy as np
import pandas as pd

Expand All @@ -36,7 +37,10 @@ def _create_initializers(
domains: Mapping[str, domain.AttributeType],
numerical_bins: int,
init_delta: float,
) -> dict[str, primitives.DPMechanism]:
) -> tuple[
dict[str, primitives.DPMechanism],
dict[str, primitives.DPMechanism],
]:
"""Creates per-column initializers from the domain specification.

Args:
Expand All @@ -45,30 +49,31 @@ def _create_initializers(
init_delta: Delta for open-set categorical partition selection.

Returns:
A dictionary mapping column names to uncalibrated initializer instances.
A (closed_domain_initializers, numerical_initializers) tuple.

Raises:
ValueError: If a column has an unsupported attribute type.
"""
initializers = {}
closed_inits = {}
numerical_inits = {}
for col, attr in domains.items():
if isinstance(attr, domain.NumericalAttribute):
initializers[col] = initialization.NumericalInitializer(
numerical_inits[col] = initialization.NumericalInitializer(
name=col, num_partitions=numerical_bins, attribute=attr
)
elif isinstance(attr, domain.CategoricalAttribute):
initializers[col] = initialization.CategoricalInitializer(
closed_inits[col] = initialization.CategoricalInitializer(
name=col, attribute=attr
)
elif isinstance(attr, domain.OpenSetCategoricalAttribute):
initializers[col] = initialization.OpenSetCategoricalInitializer(
closed_inits[col] = initialization.OpenSetCategoricalInitializer(
name=col, attribute=attr, delta=init_delta
)
else:
raise ValueError(
f'Unsupported attribute type for column {col!r}: {type(attr)}'
)
return initializers
return closed_inits, numerical_inits


@dataclasses.dataclass
Expand Down Expand Up @@ -100,6 +105,7 @@ class DataGenerationV3(primitives.DPMechanism):
default_factory=discrete_mechanisms.MSTMechanism
)
initializers: dict[str, primitives.DPMechanism] | None = None
total_count_mechanism: primitives.DPGaussianCount | None = None
cross_attribute_constraints: Sequence[constraints.Constraint] = ()

def calibrate(
Expand Down Expand Up @@ -178,22 +184,49 @@ def _calibrate_zcdp(
self, zcdp_rho, numerical_bins, init_delta, init_budget_fraction
):
"""Simple additive zCDP budget split."""
inits = self.initializers or _create_initializers(
self.domains, numerical_bins, init_delta
if self.initializers is not None:
all_inits = dict(self.initializers)
has_closed_domain = any(
not isinstance(init, initialization.NumericalInitializer)
for init in all_inits.values()
)
else:
closed_inits, numerical_inits = _create_initializers(
self.domains, numerical_bins, init_delta
)
all_inits = {**closed_inits, **numerical_inits}
has_closed_domain = bool(closed_inits)

has_numerical = any(
isinstance(init, initialization.NumericalInitializer)
for init in all_inits.values()
)
needs_total_count = has_numerical and not has_closed_domain

init_rho = init_budget_fraction * zcdp_rho
per_col_rho = init_rho / len(inits)
# If we need a separate total count and have no closed-domain columns to
# estimate it from, allocate one extra share of init budget for it.
num_shares = len(all_inits) + (1 if needs_total_count else 0)
per_col_rho = init_rho / num_shares
discrete_rho = zcdp_rho - init_rho

calibrated_inits = {
col: init.calibrate(zcdp_rho=per_col_rho) for col, init in inits.items()
col: init.calibrate(zcdp_rho=per_col_rho)
for col, init in all_inits.items()
}
calibrated_total = (
primitives.DPGaussianCount().calibrate(zcdp_rho=per_col_rho)
if needs_total_count
else None
)
calibrated_discrete = self.discrete_mechanism.calibrate(
zcdp_rho=discrete_rho
)
return dataclasses.replace(
self,
initializers=calibrated_inits,
discrete_mechanism=calibrated_discrete,
total_count_mechanism=calibrated_total,
)

def _calibrate_approx_dp(
Expand Down Expand Up @@ -226,10 +259,24 @@ def _calibrate_approx_dp(
Returns:
A new DataGenerationV3 instance with calibrated sub-mechanisms.
"""
inits = self.initializers or _create_initializers(
self.domains, numerical_bins, init_delta
if self.initializers is not None:
inits = dict(self.initializers)
has_closed_domain = any(
not isinstance(init, initialization.NumericalInitializer)
for init in inits.values()
)
else:
closed_inits, numerical_inits = _create_initializers(
self.domains, numerical_bins, init_delta
)
inits = {**closed_inits, **numerical_inits}
has_closed_domain = bool(closed_inits)
has_numerical = any(
isinstance(init, initialization.NumericalInitializer)
for init in inits.values()
)
num_columns = len(inits)
needs_total_count = has_numerical and not has_closed_domain
num_columns = len(inits) + (1 if needs_total_count else 0)

# Stage 1: Convert (epsilon, remaining_delta) to zCDP and calibrate
# initializers with init_budget_fraction of that budget.
Expand All @@ -244,11 +291,17 @@ def _calibrate_approx_dp(
calibrated_inits = {
col: init.calibrate(zcdp_rho=per_col_rho) for col, init in inits.items()
}

calibrated_total = (
primitives.DPGaussianCount().calibrate(zcdp_rho=per_col_rho)
if needs_total_count
else None
)
# Stage 2: With init dp_events fixed, find the tightest discrete budget.
# The accountant handles ApproximateDpEvent deltas from open-set
# initializers automatically.
init_events = [init.dp_event for init in calibrated_inits.values()]
if calibrated_total is not None:
init_events.append(calibrated_total.dp_event)

# Determine accountant type based on discrete mechanism's dp_event.
probe_event = self.discrete_mechanism.calibrate(zcdp_rho=1.0).dp_event
Expand Down Expand Up @@ -277,6 +330,7 @@ def make_event_from_param(discrete_rho):
self,
initializers=calibrated_inits,
discrete_mechanism=calibrated_discrete,
total_count_mechanism=calibrated_total,
)

@property
Expand All @@ -292,6 +346,8 @@ def dp_event(self) -> dp_accounting.DpEvent:
if self.initializers is None:
raise ValueError('Must call calibrate() before accessing dp_event.')
events = [init.dp_event for init in self.initializers.values()]
if self.total_count_mechanism is not None:
events.append(self.total_count_mechanism.dp_event)
events.append(self.discrete_mechanism.dp_event)
return dp_accounting.ComposedDpEvent(events)

Expand Down Expand Up @@ -321,9 +377,34 @@ def __call__(
)

# Phase 1: Per-column initialization.
# Initialize closed-domain columns first to estimate the total count,
# then pass it to numerical initializers for heuristic measurements.
col_results: dict[str, initialization.ColumnMeasurement] = {}
closed_domain_measurements = []
for col, init in self.initializers.items():
col_results[col] = init(rng, data[col].values)
if not isinstance(init, initialization.NumericalInitializer):
col_results[col] = init(rng, data[col].values)
if col_results[col].measurement is not None:
closed_domain_measurements.append(col_results[col].measurement)

# Estimate total from closed-domain measurements or DPGaussianCount.
estimated_total = None
if closed_domain_measurements:
estimated_total = mbi_estimation.minimum_variance_unbiased_total(
closed_domain_measurements
)
elif self.total_count_mechanism is not None:
# Pick an arbitrary column to count records.
any_col = next(iter(self.domains))
estimated_total = max(
1.0, self.total_count_mechanism(rng, data[any_col].values)
)

for col, init in self.initializers.items():
if isinstance(init, initialization.NumericalInitializer):
col_results[col] = init(
rng, data[col].values, estimated_total=estimated_total
)

# Phase 2: Encode data to discrete domain.
discrete_domains = {}
Expand Down
70 changes: 64 additions & 6 deletions dpsynth/local_mode/initialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import mbi
import numpy as np


_M = TypeVar('_M')


Expand Down Expand Up @@ -84,20 +83,79 @@ def calibrate(self, *, zcdp_rho: float) -> NumericalInitializer:
).calibrate(zcdp_rho=zcdp_rho)
return dataclasses.replace(self, mechanism=mechanism)

@property
def zcdp_rho(self) -> float:
"""Total zCDP rho consumed, derived from the composed dp_event.

The dp_event is a ComposedDpEvent of ExponentialMechanismDpEvents.
Each exponential mechanism with parameter epsilon satisfies
(epsilon^2 / 8)-zCDP, so the total is sum(eps_i^2 / 8).
"""
event = self.dp_event # raises if not calibrated
assert isinstance(event, dp_accounting.ComposedDpEvent)
return sum(e.epsilon**2 / 8.0 for e in event.events)

@property
def dp_event(self) -> dp_accounting.DpEvent:
"""Returns the composed privacy event for the quantile computation."""
return _validate_mechanism(self.mechanism).dp_event

def __call__(
self, rng: np.random.Generator, data: np.ndarray
self,
rng: np.random.Generator,
data: np.ndarray,
*,
estimated_total: float | None = None,
) -> ColumnMeasurement:
"""Returns a ColumnMeasurement with the discretization transform."""
"""Returns a ColumnMeasurement with the discretization transform.

Args:
rng: A numpy random number generator.
data: 1D array of numerical data.
estimated_total: If provided, a heuristic one-way measurement is included
assuming a uniform distribution over the original bins, with stddev =
1/sqrt(rho). When bin edges are deduplicated (e.g. for concentrated
integer data), merged bins receive proportionally more mass.

Returns:
A ColumnMeasurement with the categorical attribute, bin edges, and
optionally a heuristic measurement.
"""
# Dedup: concentrated data can make quantiles return duplicate edges.
edges = _validate_mechanism(self.mechanism)(rng, data)
bin_edges = np.unique(np.asarray(edges, dtype=float))
raw_edges = _validate_mechanism(self.mechanism)(rng, data)
raw_edges = np.asarray(raw_edges, dtype=float)
if self.attribute.dtype == 'int':
# For integer data, snap edges to the integer lattice. Since bins
# are right-closed (left, right] and discretize uses searchsorted
# with side='left', floor preserves the partition: an edge at 4.7
# splits integers as {≤4} | {≥5}, and floor(4.7) = 4 gives the
# same split via (…, 4] | (4, …].
# Clamp to [min_value, max_value - 1] so the last bin (edge, max]
# contains at least one integer.
raw_edges = np.clip(
np.floor(raw_edges),
self.attribute.min_value,
self.attribute.max_value - 1,
)
bin_edges, edge_counts = np.unique(raw_edges, return_counts=True)
cat_attr = vtx.categorical_attribute_from_edges(bin_edges, self.attribute)
return ColumnMeasurement(cat_attr, bin_edges)

measurement = None
if estimated_total is not None:
rho = self.zcdp_rho
# Each unique edge with count k means k original bins end at that
# boundary; append 1 for the rightmost bin (beyond the last edge).
bin_weights = np.append(edge_counts, 1)
uniform_counts = bin_weights * (estimated_total / self.num_partitions)
# Heuristic: empirically, the std dev of per-bin count noise from
# DPQuantiles is approximately 1/sqrt(rho), independent of N and
# roughly uniform across bins. See go/dpsynth-quantile-noise.
stddev = 1.0 / np.sqrt(rho)
measurement = mbi.LinearMeasurement(
uniform_counts, (self.name,), stddev=stddev
)

return ColumnMeasurement(cat_attr, bin_edges, measurement=measurement)


@dataclasses.dataclass
Expand Down
Loading
Loading