Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/pysimmmulator/geos.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ def distribute_to_geos(
if any(perf_spec) != 0.0 and "total_revenue" in geo_dataframe.columns:
geo_dataframe["total_revenue"] *= ( 1 + abs(rng.normal(loc=pop_pct * perf_spec[0], scale=perf_spec[1])))
geo_dataframe["geo_name"] = geo_name

# Ensure reach in geo does not exceed geo population
geo_reach_cols = [c for c in geo_dataframe.columns if "reach" in c]
if geo_reach_cols:
geo_dataframe[geo_reach_cols] = np.minimum(geo_dataframe[geo_reach_cols], geo_pop)

geo_dataframes.append(geo_dataframe)
final = pd.concat(geo_dataframes, axis=0)
final = final.reset_index().set_index(["geo_name", "date"])
Expand Down
18 changes: 9 additions & 9 deletions src/pysimmmulator/param_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def check(self):
> 0), "You entered less than 1 year. Must generate more than a years worth of data"
if self.true_cvr is not None:
assert len(self.true_cvr.keys()) == len(
self.all_channels
set(self.all_channels)
), "True CVR must have equal number of entries as channel impressions and channel clicks"
for cvr in self.true_cvr.values():
assert (
Expand Down Expand Up @@ -164,8 +164,8 @@ def check(self, basic_params: BasicParameters):
Args:
basic_params (basic_parameters): Previously submitted parameters as required by the simmmulate class """

assert sorted(self.true_cpmcpc_channels) == sorted(
basic_params.all_channels
assert sorted(set(self.true_cpmcpc_channels)) == sorted(
set(basic_params.all_channels)
), "Channels declared within true_cpm & true_cpc must be the same as original base channel input"
for val in self.true_cpm.values():
assert isinstance(val, float), "cpm values must be of type float"
Expand All @@ -174,8 +174,8 @@ def check(self, basic_params: BasicParameters):
assert isinstance(val, float), "cpc values must be of type float"
assert val > 0, "CPC values must be greater than 0"

assert sorted(self.noise_channels) == sorted(
basic_params.all_channels
assert sorted(set(self.noise_channels)) == sorted(
set(basic_params.all_channels)
), "Channels declared within noisy_cpm_cpc must be the same as original base channel input"

if self.true_reach_frequency:
Expand Down Expand Up @@ -215,8 +215,8 @@ def check(self, basic_params: BasicParameters):
Args:
basic_params (basic_parameters): Previously submitted parameters as required by the simmmulate class
"""
assert sorted(self.noise_channels) == sorted(
basic_params.all_channels
assert sorted(set(self.noise_channels)) == sorted(
set(basic_params.all_channels)
), "Channels declared within noisy_cpm_cpc must be the same as original base channel input"

@dataclass
Expand Down Expand Up @@ -250,11 +250,11 @@ def check(self, basic_params: BasicParameters):
basic_params (basic_parameters): Previously submitted parameters as required by the simmmulate class
"""
assert sorted(list(self.adstock.keys())) == sorted(
basic_params.all_channels
set(basic_params.all_channels)
), "Channels declared within adstock must be the same as original base channel input"

assert sorted(list(self.saturation.keys())) == sorted(
basic_params.all_channels
set(basic_params.all_channels)
), "Channels declared within saturation must be the same as original base channel input"

@dataclass
Expand Down
44 changes: 35 additions & 9 deletions src/pysimmmulator/simulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,13 @@ def simulate_media(self, spend_df: pd.DataFrame, params: MediaParameters) -> pd.
**params.noisy_cpm_cpc[channel],
low=-min(params.true_cpm.get(channel, np.inf), params.true_cpc.get(channel, np.inf)))

channel_true_cpm_value = (params.true_cpm[channel] if channel in params.true_cpm.keys() else np.nan)
channel_noisy_cpm_value = (params.true_cpm[channel] + channel_noise if channel in params.true_cpm.keys() else np.nan)
channel_true_cpm_value = (params.true_cpm[channel] if channel in params.true_cpm.keys() else 1e10) # Default to very high CPM
channel_noisy_cpm_value = (channel_true_cpm_value + channel_noise if channel in params.true_cpm.keys() else 1e10)
spend_df.loc[channel_idx, "true_cpm"] = channel_true_cpm_value
spend_df.loc[channel_idx, "noisy_cpm"] = channel_noisy_cpm_value

channel_true_cpc_value = (params.true_cpc[channel] if channel in params.true_cpc.keys() else np.nan)
channel_noisy_cpc_value = (params.true_cpc[channel] + channel_noise if channel in params.true_cpc.keys() else np.nan)
channel_true_cpc_value = (params.true_cpc[channel] if channel in params.true_cpc.keys() else 1e10) # Default to very high CPC
channel_noisy_cpc_value = (channel_true_cpc_value + channel_noise if channel in params.true_cpc.keys() else 1e10)
spend_df.loc[channel_idx, "true_cpc"] = channel_true_cpc_value
spend_df.loc[channel_idx, "noisy_cpc"] = channel_noisy_cpc_value

Expand All @@ -262,24 +262,32 @@ def simulate_media(self, spend_df: pd.DataFrame, params: MediaParameters) -> pd.

spend_df["lifetime_impressions"] = np.round( spend_df["spend_channel"] / spend_df["noisy_cpm"] * 1000, 0)
spend_df["lifetime_clicks"] = np.round( spend_df["spend_channel"] / spend_df["noisy_cpc"], 0)
# CTR cannot exceed 100%
# Handle NaN for clicks (some channels might only have impressions)
mask = ~np.isnan(spend_df["lifetime_clicks"]) & ~np.isnan(spend_df["lifetime_impressions"])
spend_df.loc[mask, "lifetime_clicks"] = np.minimum(spend_df.loc[mask, "lifetime_clicks"], spend_df.loc[mask, "lifetime_impressions"])

# Reach and Frequency calculation
spend_df["lifetime_reach"] = np.nan
spend_df["lifetime_frequency"] = np.nan

if params.true_reach_frequency:
population = getattr(self, "total_population", None)
for channel in params.reach_frequency_channels:
channel_idx = spend_df[spend_df["channel"] == channel].index
rf_config = params.true_reach_frequency[channel]

if "frequency" in rf_config:
freq = rf_config["frequency"]
spend_df.loc[channel_idx, "lifetime_frequency"] = freq
spend_df.loc[channel_idx, "lifetime_reach"] = np.round(spend_df.loc[channel_idx, "lifetime_impressions"] / freq, 0)
# reach = impressions / frequency. Since frequency >= 1, reach <= impressions.
reach_count = np.round(spend_df.loc[channel_idx, "lifetime_impressions"] / freq, 0)
if population is not None:
reach_count = np.minimum(reach_count, population)
spend_df.loc[channel_idx, "lifetime_reach"] = reach_count
elif "reach" in rf_config:
reach_val = rf_config["reach"]
if reach_val <= 1.0:
population = getattr(self, "total_population", None)
if population is None:
logger.warning(f"Reach for {channel} is <= 1.0 but no total_population found. Treating as absolute reach count.")
reach_count = reach_val
Expand All @@ -288,8 +296,22 @@ def simulate_media(self, spend_df: pd.DataFrame, params: MediaParameters) -> pd.
else:
reach_count = reach_val

spend_df.loc[channel_idx, "lifetime_reach"] = np.round(reach_count, 0)
spend_df.loc[channel_idx, "lifetime_frequency"] = spend_df.loc[channel_idx, "lifetime_impressions"] / np.maximum(spend_df.loc[channel_idx, "lifetime_reach"], 1)
# Cap reach at impressions to ensure frequency >= 1
reach_count = np.minimum(np.round(reach_count, 0), spend_df.loc[channel_idx, "lifetime_impressions"])
# Reach cannot exceed total population
if population is not None:
reach_count = np.minimum(reach_count, population)

spend_df.loc[channel_idx, "lifetime_reach"] = reach_count
# Avoid division by zero
denom = np.maximum(spend_df.loc[channel_idx, "lifetime_reach"], 1)
spend_df.loc[channel_idx, "lifetime_frequency"] = spend_df.loc[channel_idx, "lifetime_impressions"] / denom

# Final pass to ensure frequency is at least 1 if impressions > 0
mask = (spend_df["channel"] == channel) & (spend_df["lifetime_impressions"] > 0)
spend_df.loc[mask, "lifetime_frequency"] = np.maximum(spend_df.loc[mask, "lifetime_frequency"], 1.0)
# Re-calculate reach if we adjusted frequency to 1.0
spend_df.loc[mask, "lifetime_reach"] = np.minimum(spend_df.loc[mask, "lifetime_reach"], spend_df.loc[mask, "lifetime_impressions"])

spend_df["daily_spend"] = np.round( spend_df["spend_channel"] / self.basic_params.frequency_of_campaigns, 2)
spend_df["daily_impressions"] = np.round( spend_df["lifetime_impressions"] / self.basic_params.frequency_of_campaigns, 0,)
Expand Down Expand Up @@ -517,11 +539,15 @@ def finalize_output(self, mmm_df: pd.DataFrame, params: OutputParameters) -> pd.
pd.DataFrame: Finalized output DataFrame"""
metric_cols = [f"{channel}_impressions" for channel in self.basic_params.channels_impressions]
[metric_cols.append(f"{channel}_clicks") for channel in self.basic_params.channels_clicks]
for channel in self.basic_params.channels_impressions:
for channel in self.basic_params.all_channels:
if f"{channel}_reach" in mmm_df.columns:
metric_cols.append(f"{channel}_reach")
if f"{channel}_frequency" in mmm_df.columns:
metric_cols.append(f"{channel}_frequency")
if f"{channel}_impressions" in mmm_df.columns and f"{channel}_impressions" not in metric_cols:
metric_cols.append(f"{channel}_impressions")
if f"{channel}_clicks" in mmm_df.columns and f"{channel}_clicks" not in metric_cols:
metric_cols.append(f"{channel}_clicks")
spend_cols = []
[spend_cols.append(f"{channel}_spend") for channel in self.basic_params.all_channels]

Expand Down
2 changes: 1 addition & 1 deletion tests/test_edge_cases.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,4 @@ def test_reproducibility():
result2 = sim2.run_with_config(config)

pd.testing.assert_frame_equal(result1.df, result2.df)
assert result1.channel_roi == result2.channel_roi
pd.testing.assert_series_equal(pd.Series(result1.channel_roi), pd.Series(result2.channel_roi))
137 changes: 137 additions & 0 deletions tests/test_logical_guards.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import pytest
import pandas as pd
import numpy as np
from pysimmmulator.simulate import Simulate

def test_ctr_guard():
    """Clicks reported for a channel must never exceed its impressions.

    ``Search`` is listed as both an impressions and a clicks channel, is
    given a very low CPC (many clicks) and has no CPM entry, while ``TV``
    gets a very high CPM (few impressions). The simulated output is then
    checked row by row for ``Search_clicks <= Search_impressions``.
    """
    channels = ("TV", "Search")

    basic_section = {
        "years": 1,
        "channels_impressions": ["TV", "Search"],
        "channels_clicks": ["Search"],
        "frequency_of_campaigns": 7,
        "start_date": "2023/01/01",
        "true_cvr": {"TV": 0.01, "Search": 0.01},
        "revenue_per_conv": 100.0,
    }
    baseline_section = {
        "base_p": 1000,
        "trend_p": 100,
        "temp_var": 10,
        "temp_coef_mean": 1.0,
        "temp_coef_sd": 0.1,
        "error_std": 50,
    }
    spend_section = {
        "campaign_spend_mean": 5000,
        "campaign_spend_std": 500,
        "max_min_proportion_on_each_channel": {"TV": {"min": 0.5, "max": 0.5}},
    }
    media_section = {
        # Very high CPM -> very few impressions.
        "true_cpm": {"TV": 1000.0},
        # Very low CPC -> many clicks before any cap is applied.
        "true_cpc": {"Search": 0.001},
        "noisy_cpm_cpc": {
            "TV": {"loc": 0.0, "scale": 0.1},
            "Search": {"loc": 0.0, "scale": 0.0001},
        },
    }
    cvr_section = {
        "noisy_cvr": {name: {"loc": 1.0, "scale": 0.1} for name in channels},
    }
    adstock_section = {
        "adstock": {
            name: {"type": "geometric", "params": {"lambda": 0.5}}
            for name in channels
        },
        "saturation": {
            name: {"type": "scurve", "params": {"alpha": 1.0, "gamma": 0.5}}
            for name in channels
        },
    }

    config = {
        "basic_params": basic_section,
        "baseline_params": baseline_section,
        "ad_spend_params": spend_section,
        "media_params": media_section,
        "cvr_params": cvr_section,
        "adstock_params": adstock_section,
        "output_params": {"aggregation_level": "daily"},
    }

    simulator = Simulate()
    frame = simulator.run_with_config(config).df

    # No CPM is configured for Search, so its impressions come from whatever
    # the simulator derives for it; clicks must still stay at or below them.
    clicks_within_impressions = frame["Search_clicks"].le(frame["Search_impressions"])
    assert clicks_within_impressions.all()

def test_reach_population_guard():
    """Daily TV reach, summed per date, must not exceed the total population.

    The configuration combines a low CPM with high spend (many impressions)
    and a ``reach`` setting of ``2.0`` against a population of 100000.
    """
    # NOTE(review): the original comment describes reach=2.0 as "200% of
    # population" (logically impossible). Confirm the simulator interprets a
    # value above 1.0 as a proportion rather than an absolute count, otherwise
    # this test may not actually exercise the population cap.
    basic_section = {
        "years": 1,
        "channels_impressions": ["TV"],
        "channels_clicks": [],
        "frequency_of_campaigns": 7,
        "start_date": "2023/01/01",
        "true_cvr": {"TV": 0.01},
        "revenue_per_conv": 100.0,
    }
    baseline_section = {
        "base_p": 1000,
        "trend_p": 100,
        "temp_var": 10,
        "temp_coef_mean": 1.0,
        "temp_coef_sd": 0.1,
        "error_std": 50,
    }
    spend_section = {
        "campaign_spend_mean": 500000,
        "campaign_spend_std": 50000,
        "max_min_proportion_on_each_channel": {},
    }
    media_section = {
        # Low CPM -> many impressions.
        "true_cpm": {"TV": 1.0},
        "true_cpc": {},
        "noisy_cpm_cpc": {"TV": {"loc": 0.0, "scale": 0.1}},
        "true_reach_frequency": {"TV": {"reach": 2.0}},
    }
    adstock_section = {
        "adstock": {"TV": {"type": "geometric", "params": {"lambda": 0.5}}},
        "saturation": {"TV": {"type": "scurve", "params": {"alpha": 1.0, "gamma": 0.5}}},
    }

    config = {
        "basic_params": basic_section,
        "baseline_params": baseline_section,
        "ad_spend_params": spend_section,
        "media_params": media_section,
        "cvr_params": {"noisy_cvr": {"TV": {"loc": 1.0, "scale": 0.1}}},
        "adstock_params": adstock_section,
        "output_params": {"aggregation_level": "daily"},
        "geo_params": {"total_population": 100000, "count": 1},
    }

    simulator = Simulate()
    frame = simulator.run_with_config(config).df

    # Sum reach across geos for each date before comparing to the population.
    reach_per_day = frame.groupby("date")["TV_reach"].sum()
    assert reach_per_day.le(100000).all()

def test_geo_reach_guard():
    """TV reach inside a very small geo must not exceed that geo's population.

    Two geos are requested: an explicitly specified ``SmallGeo`` and one more
    generated by the library. Reach is configured at 90% with a low CPM and
    high spend.
    """
    from pysimmmulator.geos import Geos

    total_population = 100000
    # Very small geo relative to the total population.
    geo_specs = {"SmallGeo": {"loc": 0.01, "scale": 0.001}}

    basic_section = {
        "years": 1,
        "channels_impressions": ["TV"],
        "channels_clicks": [],
        "frequency_of_campaigns": 7,
        "start_date": "2023/01/01",
        "true_cvr": {"TV": 0.01},
        "revenue_per_conv": 100.0,
    }
    baseline_section = {
        "base_p": 1000,
        "trend_p": 100,
        "temp_var": 10,
        "temp_coef_mean": 1.0,
        "temp_coef_sd": 0.1,
        "error_std": 50,
    }
    spend_section = {
        "campaign_spend_mean": 500000,
        "campaign_spend_std": 50000,
        "max_min_proportion_on_each_channel": {},
    }
    media_section = {
        "true_cpm": {"TV": 1.0},
        "true_cpc": {},
        "noisy_cpm_cpc": {"TV": {"loc": 0.0, "scale": 0.1}},
        # 90% reach.
        "true_reach_frequency": {"TV": {"reach": 0.9}},
    }
    adstock_section = {
        "adstock": {"TV": {"type": "geometric", "params": {"lambda": 0.5}}},
        "saturation": {"TV": {"type": "scurve", "params": {"alpha": 1.0, "gamma": 0.5}}},
    }

    config = {
        "basic_params": basic_section,
        "baseline_params": baseline_section,
        "ad_spend_params": spend_section,
        "media_params": media_section,
        "cvr_params": {"noisy_cvr": {"TV": {"loc": 1.0, "scale": 0.1}}},
        "adstock_params": adstock_section,
        "output_params": {"aggregation_level": "daily"},
        "geo_params": {
            "total_population": 100000,
            "geo_specs": geo_specs,
            # SmallGeo plus one generated geo.
            "count": 2,
        },
    }

    simulator = Simulate()
    # The resulting frame is indexed by geo_name and date.
    frame = simulator.run_with_config(config).df

    # NOTE(review): the population is recomputed here with a fresh Geos
    # instance; presumably this yields the same SmallGeo population the
    # simulation used - confirm the draw is deterministic/seeded.
    geo_builder = Geos(total_population=total_population)
    populations = geo_builder(geo_specs=config["geo_params"]["geo_specs"], count=2)
    small_geo_pop = populations["SmallGeo"]

    small_geo_rows = frame.xs("SmallGeo", level="geo_name")
    assert small_geo_rows["TV_reach"].le(small_geo_pop).all()
53 changes: 53 additions & 0 deletions tests/test_reach_frequency.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,56 @@ def test_reach_as_proportion():
test_df = df[df["TV_impressions"] > 0]
daily_total_reach = test_df.groupby("date")["TV_reach"].sum()
assert daily_total_reach.mean() == pytest.approx(100000 / 7, abs=10)

def test_frequency_min_one():
    """Frequency stays >= 1 and reach stays <= impressions when reach is high.

    Very low spend with a high CPM yields few impressions, while reach is set
    to 100% of a population of 1000000, so the requested reach could
    potentially exceed the number of impressions.
    """
    basic_section = {
        "years": 1,
        "channels_impressions": ["TV"],
        "channels_clicks": [],
        "frequency_of_campaigns": 7,
        "start_date": "2023/01/01",
        "true_cvr": {"TV": 0.01},
        "revenue_per_conv": 100.0,
    }
    baseline_section = {
        "base_p": 1000,
        "trend_p": 100,
        "temp_var": 10,
        "temp_coef_mean": 1.0,
        "temp_coef_sd": 0.1,
        "error_std": 50,
    }
    spend_section = {
        # Very low spend.
        "campaign_spend_mean": 100,
        "campaign_spend_std": 10,
        "max_min_proportion_on_each_channel": {},
    }
    media_section = {
        # High CPM -> very few impressions.
        "true_cpm": {"TV": 100.0},
        "true_cpc": {},
        "noisy_cpm_cpc": {"TV": {"loc": 0.0, "scale": 1.0}},
        # 100% reach.
        "true_reach_frequency": {"TV": {"reach": 1.0}},
    }
    adstock_section = {
        "adstock": {"TV": {"type": "geometric", "params": {"lambda": 0.5}}},
        "saturation": {"TV": {"type": "scurve", "params": {"alpha": 1.0, "gamma": 0.5}}},
    }

    config = {
        "basic_params": basic_section,
        "baseline_params": baseline_section,
        "ad_spend_params": spend_section,
        "media_params": media_section,
        "cvr_params": {"noisy_cvr": {"TV": {"loc": 1.0, "scale": 0.1}}},
        "adstock_params": adstock_section,
        "output_params": {"aggregation_level": "daily"},
        "geo_params": {"total_population": 1000000, "count": 1},
    }

    simulator = Simulate()
    frame = simulator.run_with_config(config).df

    # Only rows that actually delivered impressions are meaningful here.
    delivered = frame[frame["TV_impressions"] > 0]
    assert len(delivered) > 0

    # Frequency should never be below 1.
    assert delivered["TV_frequency"].ge(1.0).all()
    # Reach should never exceed impressions.
    assert delivered["TV_reach"].le(delivered["TV_impressions"]).all()
Loading