Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 6 additions & 66 deletions src/pysimmmulator/simulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,72 +319,11 @@ def _reformat_for_mmm(self, spend_df: pd.DataFrame) -> pd.DataFrame:
logger.info("You have completed running step 5a: pivoting the data frame to an MMM format.")
return mmm_df

@staticmethod
def _geometric_adstock(vector: pd.Series, lambda_: float) -> pd.Series:
    """Apply geometric-decay adstock to a media vector.

    Each output value is the current input plus ``lambda_`` times the
    previous output: ``out[t] = vector[t] + lambda_ * out[t - 1]``, seeded
    with ``out[0] = vector[0]``.

    Args:
        vector (pd.Series): media values, in time order.
        lambda_ (float): carry-over rate applied to the previous period's
            adstocked value.

    Returns:
        pd.Series: adstocked values on the same index as ``vector``. An empty
        input yields an empty float series.
    """
    values = vector.values
    # Seeding the recursion from values[0] would raise IndexError on empty input.
    if len(values) == 0:
        return pd.Series([], index=vector.index, dtype=float)

    decayed_vector = [values[0]]
    for val in values[1:]:
        # decayed_vector[-1] is the previous period's adstocked value.
        decayed_vector.append(val + lambda_ * decayed_vector[-1])
    return pd.Series(decayed_vector, index=vector.index)

@staticmethod
def _weibull_adstock(vector: pd.Series, shape: float, scale: float, adstock_type: str = 'pdf') -> pd.Series:
    """Apply Weibull adstock to a media vector.

    A lag-weight kernel is evaluated at lags ``0 .. len(vector) - 1``,
    normalised to sum to one (when its sum is positive), convolved with
    ``vector`` and truncated back to the original length.

    Args:
        vector (pd.Series): media vector
        shape (float): shape parameter (k)
        scale (float): scale parameter (theta)
        adstock_type (str): 'pdf' weights lags by the Weibull density
            ``(k/theta) * (x/theta)**(k-1) * exp(-(x/theta)**k)``; any other
            value weights lags by the survival function
            ``exp(-(x/theta)**k)`` (i.e. 1 - CDF).

    Returns:
        pd.Series: adstocked values on the same index as ``vector``. An empty
        input yields an empty float series.
    """
    n = len(vector)
    if n == 0:
        # np.convolve rejects empty operands, so short-circuit here.
        return pd.Series([], index=vector.index, dtype=float)

    x = np.arange(n)
    if adstock_type == 'pdf':
        # For shape < 1 the density is unbounded at lag 0 (0 ** negative power);
        # silence the divide warning and handle the infinity explicitly below.
        with np.errstate(divide="ignore"):
            weights = (shape / scale) * (x / scale)**(shape - 1) * np.exp(-(x / scale)**shape)
        unbounded = np.isinf(weights)
        if unbounded.any():
            # Normalising by an infinite total would give inf/inf = NaN and
            # poison the whole output. Use the limiting kernel instead: all
            # mass sits on the unbounded lag(s).
            weights = unbounded.astype(float)
    else:
        weights = np.exp(-(x / scale)**shape)

    total = weights.sum()
    if total > 0:
        weights = weights / total

    # 'full' convolution sliced to n keeps only effects that land inside the window.
    adstocked = np.convolve(vector.values, weights)[:n]
    return pd.Series(adstocked, index=vector.index)

@staticmethod
def _scurve_saturation(vector: pd.Series, alpha: float, gamma: float) -> pd.Series:
    """Apply S-curve saturation to a vector.

    Computes ``vector * vector**alpha / (vector**alpha + g**alpha)`` where
    ``g`` is the ``gamma`` quantile of 100 evenly spaced points between the
    vector's minimum and maximum.

    Args:
        vector (pd.Series): adstocked media vector
        alpha (float): exponent controlling the steepness of the curve
        gamma (float): quantile in [0, 1] locating the inflection point
            within the observed range of ``vector``

    Returns:
        pd.Series: saturated values. Elements whose denominator is zero are
        passed through unchanged instead of becoming NaN; an empty input is
        returned as an empty copy.
    """
    if len(vector) == 0:
        # min/max of an empty vector are undefined; nothing to transform.
        return vector.copy()

    # gamma is treated as a quantile to find the inflection point
    gamma_trans = np.quantile(np.linspace(vector.min(), vector.max(), num=100), gamma)
    powered = vector**alpha
    denom = powered + gamma_trans**alpha
    nonzero = denom != 0
    if not np.any(nonzero):
        return vector
    # Guard element-wise: one zero denominator must not turn into 0/0 = NaN.
    return ((powered / denom) * vector).where(nonzero, vector)

@staticmethod
def _hill_saturation(vector: pd.Series, alpha: float, gamma: float) -> pd.Series:
    """Apply Hill saturation to a vector.

    Computes ``vector * vector**alpha / (vector**alpha + h**alpha)`` where
    ``h`` is the half-saturation point derived from ``gamma``.

    Args:
        vector (pd.Series): adstocked media vector
        alpha (float): shape parameter (slope)
        gamma (float): half-saturation point. Values ``<= 1.0`` are read as a
            fraction of the vector's maximum; larger values are used as an
            absolute level on the same scale as ``vector``.

    Returns:
        pd.Series: saturated values. Elements whose denominator is zero are
        passed through unchanged instead of becoming NaN.
    """
    # Hill function: x**alpha / (x**alpha + h**alpha)
    inflection = gamma * np.max(vector) if gamma <= 1.0 else gamma
    powered = vector**alpha
    denom = powered + inflection**alpha
    nonzero = denom != 0
    if not np.any(nonzero):
        return vector
    # Guard element-wise: one zero denominator must not turn into 0/0 = NaN.
    return ((powered / denom) * vector).where(nonzero, vector)

def _simulate_decay(self, mmm_df: pd.DataFrame, adstock_config: dict) -> pd.DataFrame:
"""Helper function for the simulation of adstocking.
Ad stocking is the idea that an ad has a lasting effect for some amount of time in the future.
"""
from .transforms import geometric_adstock, weibull_adstock
for channel, config in adstock_config.items():
metric = ("impressions" if channel in self.basic_params.channels_impressions else "clicks")
vector = mmm_df[f"{channel}_{metric}"]
Expand All @@ -393,9 +332,9 @@ def _simulate_decay(self, mmm_df: pd.DataFrame, adstock_config: dict) -> pd.Data
params = config["params"].copy()
if 'lambda' in params:
params['lambda_'] = params.pop('lambda')
mmm_df[f"{channel}_{metric}_adstocked"] = self._geometric_adstock(vector, **params)
mmm_df[f"{channel}_{metric}_adstocked"] = geometric_adstock(vector, **params)
elif config["type"] == "weibull":
mmm_df[f"{channel}_{metric}_adstocked"] = self._weibull_adstock(vector, **config["params"])
mmm_df[f"{channel}_{metric}_adstocked"] = weibull_adstock(vector, **config["params"])
else:
logger.warning(f"Unknown adstock type {config['type']} for channel {channel}. Using raw values.")
mmm_df[f"{channel}_{metric}_adstocked"] = vector
Expand All @@ -405,14 +344,15 @@ def _simulate_decay(self, mmm_df: pd.DataFrame, adstock_config: dict) -> pd.Data

def _simulate_diminishing_returns(self, mmm_df: pd.DataFrame, saturation_config: dict) -> pd.DataFrame:
"""Helper function for the simulation of diminishing returns."""
from .transforms import scurve_saturation, hill_saturation
for channel, config in saturation_config.items():
metric = ("impressions" if channel in self.basic_params.channels_impressions else "clicks")
target = mmm_df[f"{channel}_{metric}_adstocked"]

if config["type"] == "scurve":
mmm_df[f"{channel}_{metric}_adstocked_decay_diminishing"] = self._scurve_saturation(target, **config["params"])
mmm_df[f"{channel}_{metric}_adstocked_decay_diminishing"] = scurve_saturation(target, **config["params"])
elif config["type"] == "hill":
mmm_df[f"{channel}_{metric}_adstocked_decay_diminishing"] = self._hill_saturation(target, **config["params"])
mmm_df[f"{channel}_{metric}_adstocked_decay_diminishing"] = hill_saturation(target, **config["params"])
else:
logger.warning(f"Unknown saturation type {config['type']} for channel {channel}. Using adstocked values.")
mmm_df[f"{channel}_{metric}_adstocked_decay_diminishing"] = target
Expand Down
60 changes: 60 additions & 0 deletions src/pysimmmulator/transforms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import pandas as pd
import numpy as np

def geometric_adstock(vector: pd.Series, lambda_: float) -> pd.Series:
    """Apply geometric-decay adstock to a media vector.

    Each output value is the current input plus ``lambda_`` times the
    previous output: ``out[t] = vector[t] + lambda_ * out[t - 1]``, seeded
    with ``out[0] = vector[0]``.

    Args:
        vector (pd.Series): media values, in time order.
        lambda_ (float): carry-over rate applied to the previous period's
            adstocked value.

    Returns:
        pd.Series: adstocked values on the same index as ``vector``. An empty
        input yields an empty float series.
    """
    values = vector.values
    # Seeding the recursion from values[0] would raise IndexError on empty input.
    if len(values) == 0:
        return pd.Series([], index=vector.index, dtype=float)

    decayed_vector = [values[0]]
    for val in values[1:]:
        # decayed_vector[-1] is the previous period's adstocked value.
        decayed_vector.append(val + lambda_ * decayed_vector[-1])
    return pd.Series(decayed_vector, index=vector.index)

def weibull_adstock(vector: pd.Series, shape: float, scale: float, adstock_type: str = 'pdf') -> pd.Series:
    """Apply Weibull adstock to a media vector.

    A lag-weight kernel is evaluated at lags ``0 .. len(vector) - 1``,
    normalised to sum to one (when its sum is positive), convolved with
    ``vector`` and truncated back to the original length.

    Args:
        vector (pd.Series): media vector
        shape (float): shape parameter (k)
        scale (float): scale parameter (theta)
        adstock_type (str): 'pdf' weights lags by the Weibull density
            ``(k/theta) * (x/theta)**(k-1) * exp(-(x/theta)**k)``; any other
            value weights lags by the survival function
            ``exp(-(x/theta)**k)`` (i.e. 1 - CDF).

    Returns:
        pd.Series: adstocked values on the same index as ``vector``. An empty
        input yields an empty float series.
    """
    n = len(vector)
    if n == 0:
        # np.convolve rejects empty operands, so short-circuit here.
        return pd.Series([], index=vector.index, dtype=float)

    x = np.arange(n)
    if adstock_type == 'pdf':
        # For shape < 1 the density is unbounded at lag 0 (0 ** negative power);
        # silence the divide warning and handle the infinity explicitly below.
        with np.errstate(divide="ignore"):
            weights = (shape / scale) * (x / scale)**(shape - 1) * np.exp(-(x / scale)**shape)
        unbounded = np.isinf(weights)
        if unbounded.any():
            # Normalising by an infinite total would give inf/inf = NaN and
            # poison the whole output. Use the limiting kernel instead: all
            # mass sits on the unbounded lag(s).
            weights = unbounded.astype(float)
    else:
        weights = np.exp(-(x / scale)**shape)

    total = weights.sum()
    if total > 0:
        weights = weights / total

    # 'full' convolution sliced to n keeps only effects that land inside the window.
    adstocked = np.convolve(vector.values, weights)[:n]
    return pd.Series(adstocked, index=vector.index)

def scurve_saturation(vector: pd.Series, alpha: float, gamma: float) -> pd.Series:
    """Apply S-curve saturation to a vector.

    Computes ``vector * vector**alpha / (vector**alpha + g**alpha)`` where
    ``g`` is the ``gamma`` quantile of 100 evenly spaced points between the
    vector's minimum and maximum.

    Args:
        vector (pd.Series): adstocked media vector
        alpha (float): exponent controlling the steepness of the curve
        gamma (float): quantile in [0, 1] locating the inflection point
            within the observed range of ``vector``

    Returns:
        pd.Series: saturated values. Elements whose denominator is zero are
        passed through unchanged instead of becoming NaN; an empty input is
        returned as an empty copy.
    """
    if len(vector) == 0:
        # min/max of an empty vector are undefined; nothing to transform.
        return vector.copy()

    # gamma is treated as a quantile to find the inflection point
    gamma_trans = np.quantile(np.linspace(vector.min(), vector.max(), num=100), gamma)
    powered = vector**alpha
    denom = powered + gamma_trans**alpha
    nonzero = denom != 0
    if not np.any(nonzero):
        return vector
    # Guard element-wise: one zero denominator must not turn into 0/0 = NaN.
    return ((powered / denom) * vector).where(nonzero, vector)

def hill_saturation(vector: pd.Series, alpha: float, gamma: float) -> pd.Series:
    """Apply Hill saturation to a vector.

    Computes ``vector * vector**alpha / (vector**alpha + h**alpha)`` where
    ``h`` is the half-saturation point derived from ``gamma``.

    Args:
        vector (pd.Series): adstocked media vector
        alpha (float): shape parameter (slope)
        gamma (float): half-saturation point. Values ``<= 1.0`` are read as a
            fraction of the vector's maximum; larger values are used as an
            absolute level on the same scale as ``vector``.

    Returns:
        pd.Series: saturated values. Elements whose denominator is zero are
        passed through unchanged instead of becoming NaN.
    """
    # Hill function: x**alpha / (x**alpha + h**alpha)
    inflection = gamma * np.max(vector) if gamma <= 1.0 else gamma
    powered = vector**alpha
    denom = powered + inflection**alpha
    nonzero = denom != 0
    if not np.any(nonzero):
        return vector
    # Guard element-wise: one zero denominator must not turn into 0/0 = NaN.
    return ((powered / denom) * vector).where(nonzero, vector)
14 changes: 9 additions & 5 deletions tests/test_adstock_saturation.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import pytest
import pandas as pd
import numpy as np
from pysimmmulator.simulate import Simulate
from pysimmmulator.transforms import (
geometric_adstock, weibull_adstock, scurve_saturation, hill_saturation
)

def test_geometric_adstock():
    """A single impulse of 100 halves each period when the decay rate is 0.5."""
    impulse = pd.Series([100, 0, 0, 0])
    decay_rate = 0.5
    # Both call sites shown in the source are kept, in the same order; only
    # the result of the second call is checked.
    result = Simulate._geometric_adstock(impulse, decay_rate)
    result = geometric_adstock(impulse, decay_rate)
    assert np.allclose(result.values, [100, 50, 25, 12.5])

Expand All @@ -14,7 +18,7 @@ def test_weibull_adstock_pdf():
# Weibull with shape > 1 should peak after day 0
shape = 2.0
scale = 2.0
adstocked = Simulate._weibull_adstock(vector, shape, scale, adstock_type='pdf')
adstocked = weibull_adstock(vector, shape, scale, adstock_type='pdf')
# Peak should not be at index 0
assert adstocked.values[1] > adstocked.values[0]
assert len(adstocked) == len(vector)
Expand All @@ -23,7 +27,7 @@ def test_weibull_adstock_cdf():
vector = pd.Series([100, 0, 0, 0, 0])
shape = 2.0
scale = 2.0
adstocked = Simulate._weibull_adstock(vector, shape, scale, adstock_type='cdf')
adstocked = weibull_adstock(vector, shape, scale, adstock_type='cdf')
# Should decay from the peak at index 0
assert adstocked.values[0] > adstocked.values[1]
assert len(adstocked) == len(vector)
Expand All @@ -32,7 +36,7 @@ def test_scurve_saturation():
vector = pd.Series([0, 10, 100, 1000])
alpha = 2.0
gamma = 0.5
saturated = Simulate._scurve_saturation(vector, alpha, gamma)
saturated = scurve_saturation(vector, alpha, gamma)
assert saturated[0] == 0
assert saturated[3] < 1000 # Diminishing returns
assert len(saturated) == len(vector)
Expand All @@ -41,7 +45,7 @@ def test_hill_saturation():
vector = pd.Series([0, 10, 100, 1000])
alpha = 2.0
gamma = 100.0 # Absolute value
saturated = Simulate._hill_saturation(vector, alpha, gamma)
saturated = hill_saturation(vector, alpha, gamma)
assert saturated[0] == 0
assert saturated[3] < 1000
# At vector=100, saturated should be 100 * (100**2 / (100**2 + 100**2)) = 100 * 0.5 = 50
Expand Down
Loading