From 26c351e7b6c2e056c9da16a964e66d6bf3a9ad94 Mon Sep 17 00:00:00 2001 From: RyanAugust Date: Wed, 29 Apr 2026 20:15:30 +0000 Subject: [PATCH] math outside of core classes --- src/pysimmmulator/simulate.py | 72 +++----------------------------- src/pysimmmulator/transforms.py | 60 ++++++++++++++++++++++++++ tests/test_adstock_saturation.py | 14 ++++--- 3 files changed, 75 insertions(+), 71 deletions(-) create mode 100644 src/pysimmmulator/transforms.py diff --git a/src/pysimmmulator/simulate.py b/src/pysimmmulator/simulate.py index 7494d6d..6500740 100644 --- a/src/pysimmmulator/simulate.py +++ b/src/pysimmmulator/simulate.py @@ -319,72 +319,11 @@ def _reformat_for_mmm(self, spend_df: pd.DataFrame) -> pd.DataFrame: logger.info("You have completed running step 5a: pivoting the data frame to an MMM format.") return mmm_df - @staticmethod - def _geometric_adstock(vector: pd.Series, lambda_: float) -> pd.Series: - """Applies geometric decay adstock to a vector.""" - decayed_vector = [vector.values[0]] - for i, val in enumerate(vector.values[1:]): - decayed_vector.append(val + lambda_ * decayed_vector[i]) - return pd.Series(decayed_vector, index=vector.index) - - @staticmethod - def _weibull_adstock(vector: pd.Series, shape: float, scale: float, adstock_type: str = 'pdf') -> pd.Series: - """Applies Weibull adstock to a vector. - - Args: - vector (pd.Series): media vector - shape (float): shape parameter (k) - scale (float): scale parameter (theta) - adstock_type (str): 'pdf' or 'cdf' - """ - n = len(vector) - x = np.arange(n) - if adstock_type == 'pdf': - # Weibull PDF: (k/theta) * (x/theta)**(k-1) * exp(-(x/theta)**k) - # We normalize it so it can be used as a weighting vector - weights = (shape / scale) * (x / scale)**(shape - 1) * np.exp(-(x / scale)**shape) - else: - # Weibull CDF: 1 - exp(-(x/theta)**k) - # For adstock, we typically use the survival function (1-CDF) or its increments - weights = np.exp(-(x / scale)**shape) - - weights = weights / weights.sum() if weights.sum() > 0 else weights - - # Convolution for adstock - # We use 'full' and then slice to maintain length - adstocked = np.convolve(vector.values, weights)[:n] - return pd.Series(adstocked, index=vector.index) - - @staticmethod - def _scurve_saturation(vector: pd.Series, alpha: float, gamma: float) -> pd.Series: - """Applies S-curve saturation (Logistic) to a vector.""" - # gamma is treated as a quantile to find the inflection point - gamma_trans = np.quantile(np.linspace(min(vector), max(vector), num=100), gamma) - denom = vector**alpha + gamma_trans**alpha - return (vector**alpha / denom) * vector if np.any(denom != 0) else vector - - @staticmethod - def _hill_saturation(vector: pd.Series, alpha: float, gamma: float) -> pd.Series: - """Applies Hill saturation to a vector. - - Args: - vector (pd.Series): adstocked media vector - alpha (float): shape parameter (slope) - gamma (float): scale parameter (half-saturation point) - """ - # Hill function: x**alpha / (x**alpha + gamma**alpha) - # Often gamma is specified as a value in the same scale as x - # Here we'll treat gamma as a quantile similar to scurve for consistency in config if preferred, - # but the classic Hill uses an absolute value. - # Let's use absolute value for Hill to differentiate it. - inflection = gamma * np.max(vector) if gamma <= 1.0 else gamma - denom = vector**alpha + inflection**alpha - return (vector**alpha / denom) * vector if np.any(denom != 0) else vector - def _simulate_decay(self, mmm_df: pd.DataFrame, adstock_config: dict) -> pd.DataFrame: """Helper function for the simulation of adstocking. Ad stocking is the idea that an ad has a lasting effect for some amount of time in the future. """ + from .transforms import geometric_adstock, weibull_adstock for channel, config in adstock_config.items(): metric = ("impressions" if channel in self.basic_params.channels_impressions else "clicks") vector = mmm_df[f"{channel}_{metric}"] @@ -393,9 +332,9 @@ def _simulate_decay(self, mmm_df: pd.DataFrame, adstock_config: dict) -> pd.Data params = config["params"].copy() if 'lambda' in params: params['lambda_'] = params.pop('lambda') - mmm_df[f"{channel}_{metric}_adstocked"] = self._geometric_adstock(vector, **params) + mmm_df[f"{channel}_{metric}_adstocked"] = geometric_adstock(vector, **params) elif config["type"] == "weibull": - mmm_df[f"{channel}_{metric}_adstocked"] = self._weibull_adstock(vector, **config["params"]) + mmm_df[f"{channel}_{metric}_adstocked"] = weibull_adstock(vector, **config["params"]) else: logger.warning(f"Unknown adstock type {config['type']} for channel {channel}. Using raw values.") mmm_df[f"{channel}_{metric}_adstocked"] = vector @@ -405,14 +344,15 @@ def _simulate_decay(self, mmm_df: pd.DataFrame, adstock_config: dict) -> pd.Data def _simulate_diminishing_returns(self, mmm_df: pd.DataFrame, saturation_config: dict) -> pd.DataFrame: """Helper function for the simulation of diminishing returns.""" + from .transforms import scurve_saturation, hill_saturation for channel, config in saturation_config.items(): metric = ("impressions" if channel in self.basic_params.channels_impressions else "clicks") target = mmm_df[f"{channel}_{metric}_adstocked"] if config["type"] == "scurve": - mmm_df[f"{channel}_{metric}_adstocked_decay_diminishing"] = self._scurve_saturation(target, **config["params"]) + mmm_df[f"{channel}_{metric}_adstocked_decay_diminishing"] = scurve_saturation(target, **config["params"]) elif config["type"] == "hill": - mmm_df[f"{channel}_{metric}_adstocked_decay_diminishing"] = self._hill_saturation(target, **config["params"]) + mmm_df[f"{channel}_{metric}_adstocked_decay_diminishing"] = hill_saturation(target, **config["params"]) else: logger.warning(f"Unknown saturation type {config['type']} for channel {channel}. Using adstocked values.") mmm_df[f"{channel}_{metric}_adstocked_decay_diminishing"] = target diff --git a/src/pysimmmulator/transforms.py b/src/pysimmmulator/transforms.py new file mode 100644 index 0000000..385a962 --- /dev/null +++ b/src/pysimmmulator/transforms.py @@ -0,0 +1,60 @@ +import pandas as pd +import numpy as np + +def geometric_adstock(vector: pd.Series, lambda_: float) -> pd.Series: + """Applies geometric decay adstock to a vector.""" + decayed_vector = [vector.values[0]] + for i, val in enumerate(vector.values[1:]): + decayed_vector.append(val + lambda_ * decayed_vector[i]) + return pd.Series(decayed_vector, index=vector.index) + +def weibull_adstock(vector: pd.Series, shape: float, scale: float, adstock_type: str = 'pdf') -> pd.Series: + """Applies Weibull adstock to a vector. + + Args: + vector (pd.Series): media vector + shape (float): shape parameter (k) + scale (float): scale parameter (theta) + adstock_type (str): 'pdf' or 'cdf' + """ + n = len(vector) + x = np.arange(n) + if adstock_type == 'pdf': + # Weibull PDF: (k/theta) * (x/theta)**(k-1) * exp(-(x/theta)**k) + # We normalize it so it can be used as a weighting vector + weights = (shape / scale) * (x / scale)**(shape - 1) * np.exp(-(x / scale)**shape) + else: + # Weibull CDF: 1 - exp(-(x/theta)**k) + # For adstock, we typically use the survival function (1-CDF) or its increments + weights = np.exp(-(x / scale)**shape) + + weights = weights / weights.sum() if weights.sum() > 0 else weights + + # Convolution for adstock + # We use 'full' and then slice to maintain length + adstocked = np.convolve(vector.values, weights)[:n] + return pd.Series(adstocked, index=vector.index) + +def scurve_saturation(vector: pd.Series, alpha: float, gamma: float) -> pd.Series: + """Applies S-curve saturation (Logistic) to a vector.""" + # gamma is treated as a quantile to find the inflection point + gamma_trans = np.quantile(np.linspace(min(vector), max(vector), num=100), gamma) + denom = vector**alpha + gamma_trans**alpha + return (vector**alpha / denom) * vector if np.any(denom != 0) else vector + +def hill_saturation(vector: pd.Series, alpha: float, gamma: float) -> pd.Series: + """Applies Hill saturation to a vector. + + Args: + vector (pd.Series): adstocked media vector + alpha (float): shape parameter (slope) + gamma (float): scale parameter (half-saturation point) + """ + # Hill function: x**alpha / (x**alpha + gamma**alpha) + # Often gamma is specified as a value in the same scale as x + # Here we'll treat gamma as a quantile similar to scurve for consistency in config if preferred, + # but the classic Hill uses an absolute value. + # Let's use absolute value for Hill to differentiate it. + inflection = gamma * np.max(vector) if gamma <= 1.0 else gamma + denom = vector**alpha + inflection**alpha + return (vector**alpha / denom) * vector if np.any(denom != 0) else vector diff --git a/tests/test_adstock_saturation.py b/tests/test_adstock_saturation.py index bc63708..9597e62 100644 --- a/tests/test_adstock_saturation.py +++ b/tests/test_adstock_saturation.py @@ -1,11 +1,15 @@ +import pytest import pandas as pd import numpy as np from pysimmmulator.simulate import Simulate +from pysimmmulator.transforms import ( + geometric_adstock, weibull_adstock, scurve_saturation, hill_saturation +) def test_geometric_adstock(): vector = pd.Series([100, 0, 0, 0]) lambda_ = 0.5 - adstocked = Simulate._geometric_adstock(vector, lambda_) + adstocked = geometric_adstock(vector, lambda_) expected = [100, 50, 25, 12.5] assert np.allclose(adstocked.values, expected) @@ -14,7 +18,7 @@ def test_weibull_adstock_pdf(): # Weibull with shape > 1 should peak after day 0 shape = 2.0 scale = 2.0 - adstocked = Simulate._weibull_adstock(vector, shape, scale, adstock_type='pdf') + adstocked = weibull_adstock(vector, shape, scale, adstock_type='pdf') # Peak should not be at index 0 assert adstocked.values[1] > adstocked.values[0] assert len(adstocked) == len(vector) @@ -23,7 +27,7 @@ def test_weibull_adstock_cdf(): vector = pd.Series([100, 0, 0, 0, 0]) shape = 2.0 scale = 2.0 - adstocked = Simulate._weibull_adstock(vector, shape, scale, adstock_type='cdf') + adstocked = weibull_adstock(vector, shape, scale, adstock_type='cdf') # Should decay from the peak at index 0 assert adstocked.values[0] > adstocked.values[1] assert len(adstocked) == len(vector) @@ -32,7 +36,7 @@ def test_scurve_saturation(): vector = pd.Series([0, 10, 100, 1000]) alpha = 2.0 gamma = 0.5 - saturated = Simulate._scurve_saturation(vector, alpha, gamma) + saturated = scurve_saturation(vector, alpha, gamma) assert saturated[0] == 0 assert saturated[3] < 1000 # Diminishing returns assert len(saturated) == len(vector) @@ -41,7 +45,7 @@ def test_hill_saturation(): vector = pd.Series([0, 10, 100, 1000]) alpha = 2.0 gamma = 100.0 # Absolute value - saturated = Simulate._hill_saturation(vector, alpha, gamma) + saturated = hill_saturation(vector, alpha, gamma) assert saturated[0] == 0 assert saturated[3] < 1000 # At vector=100, saturated should be 100 * (100**2 / (100**2 + 100**2)) = 100 * 0.5 = 50