From 89b190932b1837de7b4cd019b8ef69365be45577 Mon Sep 17 00:00:00 2001 From: RyanAugust Date: Tue, 19 May 2026 19:49:59 +0000 Subject: [PATCH 1/3] > 1 frequency --- src/pysimmmulator/simulate.py | 14 +++++++-- tests/test_reach_frequency.py | 53 +++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/src/pysimmmulator/simulate.py b/src/pysimmmulator/simulate.py index bf7f164..1b6f562 100644 --- a/src/pysimmmulator/simulate.py +++ b/src/pysimmmulator/simulate.py @@ -275,6 +275,7 @@ def simulate_media(self, spend_df: pd.DataFrame, params: MediaParameters) -> pd. if "frequency" in rf_config: freq = rf_config["frequency"] spend_df.loc[channel_idx, "lifetime_frequency"] = freq + # reach = impressions / frequency. Since frequency >= 1, reach <= impressions. spend_df.loc[channel_idx, "lifetime_reach"] = np.round(spend_df.loc[channel_idx, "lifetime_impressions"] / freq, 0) elif "reach" in rf_config: reach_val = rf_config["reach"] @@ -288,8 +289,17 @@ def simulate_media(self, spend_df: pd.DataFrame, params: MediaParameters) -> pd. 
else: reach_count = reach_val - spend_df.loc[channel_idx, "lifetime_reach"] = np.round(reach_count, 0) - spend_df.loc[channel_idx, "lifetime_frequency"] = spend_df.loc[channel_idx, "lifetime_impressions"] / np.maximum(spend_df.loc[channel_idx, "lifetime_reach"], 1) + # Cap reach at impressions to ensure frequency >= 1 + spend_df.loc[channel_idx, "lifetime_reach"] = np.minimum(np.round(reach_count, 0), spend_df.loc[channel_idx, "lifetime_impressions"]) + # Avoid division by zero + denom = np.maximum(spend_df.loc[channel_idx, "lifetime_reach"], 1) + spend_df.loc[channel_idx, "lifetime_frequency"] = spend_df.loc[channel_idx, "lifetime_impressions"] / denom + + # Final pass to ensure frequency is at least 1 if impressions > 0 + mask = (spend_df["channel"] == channel) & (spend_df["lifetime_impressions"] > 0) + spend_df.loc[mask, "lifetime_frequency"] = np.maximum(spend_df.loc[mask, "lifetime_frequency"], 1.0) + # Cap reach at impressions so it stays consistent with the frequency floor of 1.0 + spend_df.loc[mask, "lifetime_reach"] = np.minimum(spend_df.loc[mask, "lifetime_reach"], spend_df.loc[mask, "lifetime_impressions"]) spend_df["daily_spend"] = np.round( spend_df["spend_channel"] / self.basic_params.frequency_of_campaigns, 2) spend_df["daily_impressions"] = np.round( spend_df["lifetime_impressions"] / self.basic_params.frequency_of_campaigns, 0,) diff --git a/tests/test_reach_frequency.py b/tests/test_reach_frequency.py index e50eff1..59bc422 100644 --- a/tests/test_reach_frequency.py +++ b/tests/test_reach_frequency.py @@ -128,3 +128,56 @@ def test_reach_as_proportion(): test_df = df[df["TV_impressions"] > 0] daily_total_reach = test_df.groupby("date")["TV_reach"].sum() assert daily_total_reach.mean() == pytest.approx(100000 / 7, abs=10) + +def test_frequency_min_one(): + # Force a case where reach could potentially exceed impressions + config = { + "basic_params": { + "years": 1, + "channels_impressions": ["TV"], + "channels_clicks": [], + "frequency_of_campaigns": 7, + "start_date": 
"2023/01/01", + "true_cvr": {"TV": 0.01}, + "revenue_per_conv": 100.0, + }, + "baseline_params": { + "base_p": 1000, "trend_p": 100, "temp_var": 10, "temp_coef_mean": 1.0, "temp_coef_sd": 0.1, "error_std": 50, + }, + "ad_spend_params": { + "campaign_spend_mean": 100, # Very low spend + "campaign_spend_std": 10, + "max_min_proportion_on_each_channel": {}, + }, + "media_params": { + "true_cpm": {"TV": 100.0}, # High CPM -> very few impressions + "true_cpc": {}, + "noisy_cpm_cpc": {"TV": {"loc": 0.0, "scale": 1.0}}, + "true_reach_frequency": { + "TV": {"reach": 1.0} # 100% reach + } + }, + "cvr_params": { "noisy_cvr": { "TV": {"loc": 1.0, "scale": 0.1} } }, + "adstock_params": { + "adstock": { "TV": {"type": "geometric", "params": {"lambda": 0.5}} }, + "saturation": { "TV": {"type": "scurve", "params": {"alpha": 1.0, "gamma": 0.5}} }, + }, + "output_params": { "aggregation_level": "daily" }, + "geo_params": { + "total_population": 1000000, + "count": 1 + } + } + + sim = Simulate() + result = sim.run_with_config(config) + df = result.df + + # Filter where impressions > 0 + test_df = df[df["TV_impressions"] > 0] + assert len(test_df) > 0 + + # Frequency should never be below 1 + assert (test_df["TV_frequency"] >= 1.0).all() + # Reach should never exceed impressions + assert (test_df["TV_reach"] <= test_df["TV_impressions"]).all() From f0c8d8cc639f7a2eb144cb9411d417b5e962abef Mon Sep 17 00:00:00 2001 From: RyanAugust Date: Tue, 19 May 2026 19:55:36 +0000 Subject: [PATCH 2/3] add logic tests for generated data --- src/pysimmmulator/geos.py | 6 ++ src/pysimmmulator/param_handlers.py | 18 ++-- src/pysimmmulator/simulate.py | 32 +++++-- tests/test_edge_cases.py | 2 +- tests/test_logical_guards.py | 137 ++++++++++++++++++++++++++++ 5 files changed, 177 insertions(+), 18 deletions(-) create mode 100644 tests/test_logical_guards.py diff --git a/src/pysimmmulator/geos.py b/src/pysimmmulator/geos.py index 52d893e..4c9e426 100644 --- a/src/pysimmmulator/geos.py +++ 
b/src/pysimmmulator/geos.py @@ -153,6 +153,12 @@ def distribute_to_geos( if any(perf_spec) != 0.0 and "total_revenue" in geo_dataframe.columns: geo_dataframe["total_revenue"] *= ( 1 + abs(rng.normal(loc=pop_pct * perf_spec[0], scale=perf_spec[1]))) geo_dataframe["geo_name"] = geo_name + + # Ensure reach in geo does not exceed geo population + geo_reach_cols = [c for c in geo_dataframe.columns if "reach" in c] + if geo_reach_cols: + geo_dataframe[geo_reach_cols] = np.minimum(geo_dataframe[geo_reach_cols], geo_pop) + geo_dataframes.append(geo_dataframe) final = pd.concat(geo_dataframes, axis=0) final = final.reset_index().set_index(["geo_name", "date"]) diff --git a/src/pysimmmulator/param_handlers.py b/src/pysimmmulator/param_handlers.py index 9af6c84..2eaa212 100644 --- a/src/pysimmmulator/param_handlers.py +++ b/src/pysimmmulator/param_handlers.py @@ -39,7 +39,7 @@ def check(self): > 0), "You entered less than 1 year. Must generate more than a years worth of data" if self.true_cvr is not None: assert len(self.true_cvr.keys()) == len( - self.all_channels + set(self.all_channels) ), "True CVR must have equal number of entries as channel impressions and channel clicks" for cvr in self.true_cvr.values(): assert ( @@ -164,8 +164,8 @@ def check(self, basic_params: BasicParameters): Args: basic_params (basic_parameters): Previously submitted parameters as required by the simmmulate class """ - assert sorted(self.true_cpmcpc_channels) == sorted( - basic_params.all_channels + assert sorted(set(self.true_cpmcpc_channels)) == sorted( + set(basic_params.all_channels) ), "Channels declared within true_cpm & true_cpc must be the same as original base channel input" for val in self.true_cpm.values(): assert isinstance(val, float), "cpm values must be of type float" @@ -174,8 +174,8 @@ def check(self, basic_params: BasicParameters): assert isinstance(val, float), "cpc values must be of type float" assert val > 0, "CPC values must be greater than 0" - assert 
sorted(self.noise_channels) == sorted( - basic_params.all_channels + assert sorted(set(self.noise_channels)) == sorted( + set(basic_params.all_channels) ), "Channels declared within noisy_cpm_cpc must be the same as original base channel input" if self.true_reach_frequency: @@ -215,8 +215,8 @@ def check(self, basic_params: BasicParameters): Args: basic_params (basic_parameters): Previously submitted parameters as required by the simmmulate class """ - assert sorted(self.noise_channels) == sorted( - basic_params.all_channels + assert sorted(set(self.noise_channels)) == sorted( + set(basic_params.all_channels) ), "Channels declared within noisy_cpm_cpc must be the same as original base channel input" @dataclass @@ -250,11 +250,11 @@ def check(self, basic_params: BasicParameters): basic_params (basic_parameters): Previously submitted parameters as required by the simmmulate class """ assert sorted(list(self.adstock.keys())) == sorted( - basic_params.all_channels + set(basic_params.all_channels) ), "Channels declared within adstock must be the same as original base channel input" assert sorted(list(self.saturation.keys())) == sorted( - basic_params.all_channels + set(basic_params.all_channels) ), "Channels declared within saturation must be the same as original base channel input" @dataclass diff --git a/src/pysimmmulator/simulate.py b/src/pysimmmulator/simulate.py index 1b6f562..5169ece 100644 --- a/src/pysimmmulator/simulate.py +++ b/src/pysimmmulator/simulate.py @@ -244,13 +244,13 @@ def simulate_media(self, spend_df: pd.DataFrame, params: MediaParameters) -> pd. 
**params.noisy_cpm_cpc[channel], low=-min(params.true_cpm.get(channel, np.inf), params.true_cpc.get(channel, np.inf))) - channel_true_cpm_value = (params.true_cpm[channel] if channel in params.true_cpm.keys() else np.nan) - channel_noisy_cpm_value = (params.true_cpm[channel] + channel_noise if channel in params.true_cpm.keys() else np.nan) + channel_true_cpm_value = (params.true_cpm[channel] if channel in params.true_cpm.keys() else 1e10) # Default to very high CPM + channel_noisy_cpm_value = (channel_true_cpm_value + channel_noise if channel in params.true_cpm.keys() else 1e10) spend_df.loc[channel_idx, "true_cpm"] = channel_true_cpm_value spend_df.loc[channel_idx, "noisy_cpm"] = channel_noisy_cpm_value - channel_true_cpc_value = (params.true_cpc[channel] if channel in params.true_cpc.keys() else np.nan) - channel_noisy_cpc_value = (params.true_cpc[channel] + channel_noise if channel in params.true_cpc.keys() else np.nan) + channel_true_cpc_value = (params.true_cpc[channel] if channel in params.true_cpc.keys() else 1e10) # Default to very high CPC + channel_noisy_cpc_value = (channel_true_cpc_value + channel_noise if channel in params.true_cpc.keys() else 1e10) spend_df.loc[channel_idx, "true_cpc"] = channel_true_cpc_value spend_df.loc[channel_idx, "noisy_cpc"] = channel_noisy_cpc_value @@ -262,12 +262,17 @@ def simulate_media(self, spend_df: pd.DataFrame, params: MediaParameters) -> pd. 
spend_df["lifetime_impressions"] = np.round( spend_df["spend_channel"] / spend_df["noisy_cpm"] * 1000, 0) spend_df["lifetime_clicks"] = np.round( spend_df["spend_channel"] / spend_df["noisy_cpc"], 0) + # CTR cannot exceed 100% + # Handle NaN for clicks (some channels might only have impressions) + mask = ~np.isnan(spend_df["lifetime_clicks"]) & ~np.isnan(spend_df["lifetime_impressions"]) + spend_df.loc[mask, "lifetime_clicks"] = np.minimum(spend_df.loc[mask, "lifetime_clicks"], spend_df.loc[mask, "lifetime_impressions"]) # Reach and Frequency calculation spend_df["lifetime_reach"] = np.nan spend_df["lifetime_frequency"] = np.nan if params.true_reach_frequency: + population = getattr(self, "total_population", None) for channel in params.reach_frequency_channels: channel_idx = spend_df[spend_df["channel"] == channel].index rf_config = params.true_reach_frequency[channel] @@ -276,11 +281,13 @@ def simulate_media(self, spend_df: pd.DataFrame, params: MediaParameters) -> pd. freq = rf_config["frequency"] spend_df.loc[channel_idx, "lifetime_frequency"] = freq # reach = impressions / frequency. Since frequency >= 1, reach <= impressions. - spend_df.loc[channel_idx, "lifetime_reach"] = np.round(spend_df.loc[channel_idx, "lifetime_impressions"] / freq, 0) + reach_count = np.round(spend_df.loc[channel_idx, "lifetime_impressions"] / freq, 0) + if population is not None: + reach_count = np.minimum(reach_count, population) + spend_df.loc[channel_idx, "lifetime_reach"] = reach_count elif "reach" in rf_config: reach_val = rf_config["reach"] if reach_val <= 1.0: - population = getattr(self, "total_population", None) if population is None: logger.warning(f"Reach for {channel} is <= 1.0 but no total_population found. Treating as absolute reach count.") reach_count = reach_val @@ -290,7 +297,12 @@ def simulate_media(self, spend_df: pd.DataFrame, params: MediaParameters) -> pd. 
reach_count = reach_val # Cap reach at impressions to ensure frequency >= 1 - spend_df.loc[channel_idx, "lifetime_reach"] = np.minimum(np.round(reach_count, 0), spend_df.loc[channel_idx, "lifetime_impressions"]) + reach_count = np.minimum(np.round(reach_count, 0), spend_df.loc[channel_idx, "lifetime_impressions"]) + # Reach cannot exceed total population + if population is not None: + reach_count = np.minimum(reach_count, population) + + spend_df.loc[channel_idx, "lifetime_reach"] = reach_count # Avoid division by zero denom = np.maximum(spend_df.loc[channel_idx, "lifetime_reach"], 1) spend_df.loc[channel_idx, "lifetime_frequency"] = spend_df.loc[channel_idx, "lifetime_impressions"] / denom @@ -527,11 +539,15 @@ def finalize_output(self, mmm_df: pd.DataFrame, params: OutputParameters) -> pd. pd.DataFrame: Finalized output DataFrame""" metric_cols = [f"{channel}_impressions" for channel in self.basic_params.channels_impressions] [metric_cols.append(f"{channel}_clicks") for channel in self.basic_params.channels_clicks] - for channel in self.basic_params.channels_impressions: + for channel in self.basic_params.all_channels: if f"{channel}_reach" in mmm_df.columns: metric_cols.append(f"{channel}_reach") if f"{channel}_frequency" in mmm_df.columns: metric_cols.append(f"{channel}_frequency") + if f"{channel}_impressions" in mmm_df.columns and f"{channel}_impressions" not in metric_cols: + metric_cols.append(f"{channel}_impressions") + if f"{channel}_clicks" in mmm_df.columns and f"{channel}_clicks" not in metric_cols: + metric_cols.append(f"{channel}_clicks") spend_cols = [] [spend_cols.append(f"{channel}_spend") for channel in self.basic_params.all_channels] diff --git a/tests/test_edge_cases.py b/tests/test_edge_cases.py index 62dec53..c204413 100644 --- a/tests/test_edge_cases.py +++ b/tests/test_edge_cases.py @@ -174,4 +174,4 @@ def test_reproducibility(): result2 = sim2.run_with_config(config) pd.testing.assert_frame_equal(result1.df, result2.df) - assert 
result1.channel_roi == result2.channel_roi + pd.testing.assert_series_equal(pd.Series(result1.channel_roi), pd.Series(result2.channel_roi)) diff --git a/tests/test_logical_guards.py b/tests/test_logical_guards.py new file mode 100644 index 0000000..c42f52b --- /dev/null +++ b/tests/test_logical_guards.py @@ -0,0 +1,137 @@ +import pytest +import pandas as pd +import numpy as np +from pysimmmulator.simulate import Simulate + +def test_ctr_guard(): + config = { + "basic_params": { + "years": 1, "channels_impressions": ["TV", "Search"], "channels_clicks": ["Search"], + "frequency_of_campaigns": 7, "start_date": "2023/01/01", + "true_cvr": {"TV": 0.01, "Search": 0.01}, "revenue_per_conv": 100.0, + }, + "baseline_params": { + "base_p": 1000, "trend_p": 100, "temp_var": 10, "temp_coef_mean": 1.0, "temp_coef_sd": 0.1, "error_std": 50, + }, + "ad_spend_params": { + "campaign_spend_mean": 5000, "campaign_spend_std": 500, + "max_min_proportion_on_each_channel": {"TV": {"min": 0.5, "max": 0.5}}, + }, + "media_params": { + "true_cpm": {"TV": 1000.0}, # Very high CPM -> very few impressions + "true_cpc": {"Search": 0.001}, # Very low CPC -> many clicks + "noisy_cpm_cpc": { + "TV": {"loc": 0.0, "scale": 0.1}, + "Search": {"loc": 0.0, "scale": 0.0001}, + }, + }, + "cvr_params": { "noisy_cvr": { "TV": {"loc": 1.0, "scale": 0.1}, "Search": {"loc": 1.0, "scale": 0.1} } }, + "adstock_params": { + "adstock": { "TV": {"type": "geometric", "params": {"lambda": 0.5}}, "Search": {"type": "geometric", "params": {"lambda": 0.5}} }, + "saturation": { "TV": {"type": "scurve", "params": {"alpha": 1.0, "gamma": 0.5}}, "Search": {"type": "scurve", "params": {"alpha": 1.0, "gamma": 0.5}} }, + }, + "output_params": { "aggregation_level": "daily" } + } + + sim = Simulate() + result = sim.run_with_config(config) + df = result.df + + # Search clicks should be capped by Search impressions (which we didn't specify but are calculated) + # Actually, if Search is in channels_clicks, it might not have 
impressions if true_cpm is not provided. + # Let's check Search impressions. + assert (df["Search_clicks"] <= df["Search_impressions"]).all() + +def test_reach_population_guard(): + config = { + "basic_params": { + "years": 1, "channels_impressions": ["TV"], "channels_clicks": [], + "frequency_of_campaigns": 7, "start_date": "2023/01/01", + "true_cvr": {"TV": 0.01}, "revenue_per_conv": 100.0, + }, + "baseline_params": { + "base_p": 1000, "trend_p": 100, "temp_var": 10, "temp_coef_mean": 1.0, "temp_coef_sd": 0.1, "error_std": 50, + }, + "ad_spend_params": { + "campaign_spend_mean": 500000, "campaign_spend_std": 50000, + "max_min_proportion_on_each_channel": {}, + }, + "media_params": { + "true_cpm": {"TV": 1.0}, # Low CPM -> many impressions + "true_cpc": {}, + "noisy_cpm_cpc": {"TV": {"loc": 0.0, "scale": 0.1}}, + "true_reach_frequency": { + "TV": {"reach": 2.0} # Intended as 200% of population (logically impossible); NOTE(review): reach values > 1.0 appear to be treated as absolute counts, not proportions - confirm this exercises the population cap + } + }, + "cvr_params": { "noisy_cvr": { "TV": {"loc": 1.0, "scale": 0.1} } }, + "adstock_params": { + "adstock": { "TV": {"type": "geometric", "params": {"lambda": 0.5}} }, + "saturation": { "TV": {"type": "scurve", "params": {"alpha": 1.0, "gamma": 0.5}} }, + }, + "output_params": { "aggregation_level": "daily" }, + "geo_params": { + "total_population": 100000, + "count": 1 + } + } + + sim = Simulate() + result = sim.run_with_config(config) + df = result.df + + # Reach should not exceed total population + # Summing across geos for each day + daily_reach = df.groupby("date")["TV_reach"].sum() + assert (daily_reach <= 100000).all() + +def test_geo_reach_guard(): + config = { + "basic_params": { + "years": 1, "channels_impressions": ["TV"], "channels_clicks": [], + "frequency_of_campaigns": 7, "start_date": "2023/01/01", + "true_cvr": {"TV": 0.01}, "revenue_per_conv": 100.0, + }, + "baseline_params": { + "base_p": 1000, "trend_p": 100, "temp_var": 10, "temp_coef_mean": 1.0, "temp_coef_sd": 0.1, "error_std": 50, + }, + "ad_spend_params": { + 
"campaign_spend_mean": 500000, "campaign_spend_std": 50000, + "max_min_proportion_on_each_channel": {}, + }, + "media_params": { + "true_cpm": {"TV": 1.0}, + "true_cpc": {}, + "noisy_cpm_cpc": {"TV": {"loc": 0.0, "scale": 0.1}}, + "true_reach_frequency": { + "TV": {"reach": 0.9} # 90% reach + } + }, + "cvr_params": { "noisy_cvr": { "TV": {"loc": 1.0, "scale": 0.1} } }, + "adstock_params": { + "adstock": { "TV": {"type": "geometric", "params": {"lambda": 0.5}} }, + "saturation": { "TV": {"type": "scurve", "params": {"alpha": 1.0, "gamma": 0.5}} }, + }, + "output_params": { "aggregation_level": "daily" }, + "geo_params": { + "total_population": 100000, + "geo_specs": { + "SmallGeo": {"loc": 0.01, "scale": 0.001} # Very small geo + }, + "count": 2 # SmallGeo + one random + } + } + + sim = Simulate() + result = sim.run_with_config(config) + df = result.df # result.df has geo_name and date in index + + # For SmallGeo, reach should not exceed its population + # Get population of SmallGeo + from pysimmmulator.geos import Geos + geos = Geos(total_population=100000) + geo_details = geos(geo_specs=config["geo_params"]["geo_specs"], count=2) + small_geo_pop = geo_details["SmallGeo"] + + small_geo_data = df.xs("SmallGeo", level="geo_name") + assert (small_geo_data["TV_reach"] <= small_geo_pop).all() From 6cf6b7018ea2602a47255c41414347581ab6bb4f Mon Sep 17 00:00:00 2001 From: RyanAugust Date: Thu, 21 May 2026 11:01:59 -0400 Subject: [PATCH 3/3] remove whitespace --- src/pysimmmulator/geos.py | 4 ++-- src/pysimmmulator/simulate.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/pysimmmulator/geos.py b/src/pysimmmulator/geos.py index 4c9e426..46a5a4d 100644 --- a/src/pysimmmulator/geos.py +++ b/src/pysimmmulator/geos.py @@ -153,12 +153,12 @@ def distribute_to_geos( if any(perf_spec) != 0.0 and "total_revenue" in geo_dataframe.columns: geo_dataframe["total_revenue"] *= ( 1 + abs(rng.normal(loc=pop_pct * perf_spec[0], scale=perf_spec[1]))) 
geo_dataframe["geo_name"] = geo_name - + # Ensure reach in geo does not exceed geo population geo_reach_cols = [c for c in geo_dataframe.columns if "reach" in c] if geo_reach_cols: geo_dataframe[geo_reach_cols] = np.minimum(geo_dataframe[geo_reach_cols], geo_pop) - + geo_dataframes.append(geo_dataframe) final = pd.concat(geo_dataframes, axis=0) final = final.reset_index().set_index(["geo_name", "date"]) diff --git a/src/pysimmmulator/simulate.py b/src/pysimmmulator/simulate.py index 5169ece..c1dd619 100644 --- a/src/pysimmmulator/simulate.py +++ b/src/pysimmmulator/simulate.py @@ -301,12 +301,12 @@ def simulate_media(self, spend_df: pd.DataFrame, params: MediaParameters) -> pd. # Reach cannot exceed total population if population is not None: reach_count = np.minimum(reach_count, population) - + spend_df.loc[channel_idx, "lifetime_reach"] = reach_count # Avoid division by zero denom = np.maximum(spend_df.loc[channel_idx, "lifetime_reach"], 1) spend_df.loc[channel_idx, "lifetime_frequency"] = spend_df.loc[channel_idx, "lifetime_impressions"] / denom - + # Final pass to ensure frequency is at least 1 if impressions > 0 mask = (spend_df["channel"] == channel) & (spend_df["lifetime_impressions"] > 0) spend_df.loc[mask, "lifetime_frequency"] = np.maximum(spend_df.loc[mask, "lifetime_frequency"], 1.0)