Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions src/pysimmmulator/geos.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,26 +133,34 @@ def distribute_to_geos(
perf_spec (tuple[float, float]): Parameters to control the normal distribution function for allocation of performance across geographies
Returns:
(pd.DataFrame): simulated MMM data divided into geographies as specified"""
mmm_input = mmm_input.dropna()
if "date" in mmm_input.columns:
mmm_input = mmm_input.set_index("date")

geo_dataframes = []
total_population: int = sum(geo_details.values())
rng = rng if rng is not None else np.random.default_rng(seed=random_seed)
media_cols = [w for w in mmm_input.columns if "impressions" in w or "clicks" in w]
media_cols = [w for w in mmm_input.columns if "impressions" in w or "clicks" in w or "reach" in w]
count_cols = [w for w in mmm_input.columns if any(x in w for x in ["impressions", "clicks", "reach", "spend", "revenue", "conversions"])]

for geo_name, geo_pop in geo_details.items():
pop_pct = geo_pop / total_population
geo_prop = pop_pct * (1 + abs(rng.normal(loc=pop_pct * dist_spec[0], scale=dist_spec[1])))
geo_dataframe = mmm_input.copy()
geo_dataframe *= geo_prop
if any(media_cost_spec) != 0.0: geo_dataframe[media_cols] *= ( 1 + abs(rng.normal(loc=pop_pct * media_cost_spec[0], scale=media_cost_spec[1])))
if any(perf_spec) != 0.0: geo_dataframe["total_revenue"] *= ( 1 + abs(rng.normal(loc=pop_pct * perf_spec[0], scale=perf_spec[1])))
if count_cols:
geo_dataframe[count_cols] *= geo_prop
if any(media_cost_spec) != 0.0 and media_cols:
geo_dataframe[media_cols] *= ( 1 + abs(rng.normal(loc=pop_pct * media_cost_spec[0], scale=media_cost_spec[1])))
if any(perf_spec) != 0.0 and "total_revenue" in geo_dataframe.columns:
geo_dataframe["total_revenue"] *= ( 1 + abs(rng.normal(loc=pop_pct * perf_spec[0], scale=perf_spec[1])))
geo_dataframe["geo_name"] = geo_name
geo_dataframes.append(geo_dataframe)
final = pd.concat(geo_dataframes, axis=0)
final = final.reset_index().set_index(["geo_name", "date"])
final[media_cols] *= mmm_input[media_cols].sum() / final[media_cols].fillna(0.0).sum()
final["total_revenue"] *= mmm_input["total_revenue"].sum() / final["total_revenue"].sum()
final[["total_revenue"] + media_cols] = final[["total_revenue"] + media_cols].round(0)
return final.dropna()
if media_cols:
final[media_cols] *= mmm_input[media_cols].sum() / final[media_cols].fillna(0.0).sum()
if "total_revenue" in final.columns:
final["total_revenue"] *= mmm_input["total_revenue"].sum() / final["total_revenue"].sum()
final["total_revenue"] = final["total_revenue"].round(0)
if media_cols:
final[media_cols] = final[media_cols].round(0)
return final
14 changes: 13 additions & 1 deletion src/pysimmmulator/param_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,18 @@ class MediaParameters:
Args:
true_cpm (dict): Specifies the true Cost per Impression (CPM) of each channel (noise will be added to this to simulate number of impressions)
true_cpc (dict): Specifies the true Cost per Click (CPC) of each channel (noise will be added to this to simulate number of clicks)
noisy_cpm_cpc (dict): Specifies the bias and scale of noise added to the true value CPM or CPC for each channel."""
noisy_cpm_cpc (dict): Specifies the bias and scale of noise added to the true value CPM or CPC for each channel.
true_reach_frequency (Optional[dict]): Specifies the true reach or frequency of each channel. If reach is provided, frequency is calculated, and vice versa."""

true_cpm: dict
true_cpc: dict
noisy_cpm_cpc: dict
true_reach_frequency: Optional[dict] = None

def __post_init__(self):
self.true_cpmcpc_channels = list(self.true_cpm.keys()) + list(self.true_cpc.keys())
self.noise_channels = list(self.noisy_cpm_cpc.keys())
self.reach_frequency_channels = list(self.true_reach_frequency.keys()) if self.true_reach_frequency else []

def check(self, basic_params: BasicParameters):
"""Validates media parameters against previously constructed basic
Expand All @@ -175,6 +178,15 @@ def check(self, basic_params: BasicParameters):
basic_params.all_channels
), "Channels declared within noisy_cpm_cpc must be the same as original base channel input"

if self.true_reach_frequency:
for channel, config in self.true_reach_frequency.items():
assert channel in basic_params.all_channels, f"Channel {channel} in true_reach_frequency not found in basic_params"
assert ("reach" in config or "frequency" in config), f"Either 'reach' or 'frequency' must be specified for channel {channel} in true_reach_frequency"
if "reach" in config:
assert config["reach"] > 0, f"Reach for channel {channel} must be greater than 0"
if "frequency" in config:
assert config["frequency"] >= 1, f"Frequency for channel {channel} must be at least 1"

@dataclass
class CVRParameters:
"""Handler for loading in parameters used by simmmulate class to generate cvr data.
Expand Down
47 changes: 46 additions & 1 deletion src/pysimmmulator/simulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,39 @@ def simulate_media(self, spend_df: pd.DataFrame, params: MediaParameters) -> pd.
spend_df["lifetime_impressions"] = np.round( spend_df["spend_channel"] / spend_df["noisy_cpm"] * 1000, 0)
spend_df["lifetime_clicks"] = np.round( spend_df["spend_channel"] / spend_df["noisy_cpc"], 0)

# Reach and Frequency calculation
spend_df["lifetime_reach"] = np.nan
spend_df["lifetime_frequency"] = np.nan

if params.true_reach_frequency:
for channel in params.reach_frequency_channels:
channel_idx = spend_df[spend_df["channel"] == channel].index
rf_config = params.true_reach_frequency[channel]

if "frequency" in rf_config:
freq = rf_config["frequency"]
spend_df.loc[channel_idx, "lifetime_frequency"] = freq
spend_df.loc[channel_idx, "lifetime_reach"] = np.round(spend_df.loc[channel_idx, "lifetime_impressions"] / freq, 0)
elif "reach" in rf_config:
reach_val = rf_config["reach"]
if reach_val <= 1.0:
population = getattr(self, "total_population", None)
if population is None:
logger.warning(f"Reach for {channel} is <= 1.0 but no total_population found. Treating as absolute reach count.")
reach_count = reach_val
else:
reach_count = reach_val * population
else:
reach_count = reach_val

spend_df.loc[channel_idx, "lifetime_reach"] = np.round(reach_count, 0)
spend_df.loc[channel_idx, "lifetime_frequency"] = spend_df.loc[channel_idx, "lifetime_impressions"] / np.maximum(spend_df.loc[channel_idx, "lifetime_reach"], 1)

spend_df["daily_spend"] = np.round( spend_df["spend_channel"] / self.basic_params.frequency_of_campaigns, 2)
spend_df["daily_impressions"] = np.round( spend_df["lifetime_impressions"] / self.basic_params.frequency_of_campaigns, 0,)
spend_df["daily_clicks"] = np.round( spend_df["lifetime_clicks"] / self.basic_params.frequency_of_campaigns, 0,)
spend_df["daily_reach"] = np.round( spend_df["lifetime_reach"] / self.basic_params.frequency_of_campaigns, 0)
spend_df["daily_frequency"] = spend_df["lifetime_frequency"]

logger.info("You have completed running step 3: Simulating media.")
return spend_df
Expand Down Expand Up @@ -305,7 +335,7 @@ def _reformat_for_mmm(self, spend_df: pd.DataFrame) -> pd.DataFrame:
mmm_df = pd.DataFrame({"date": date_backbone, "id_map": campaign_id_to_date_map})
mmm_df.set_index("id_map", inplace=True)

agg_media_df = spend_df.groupby(["channel", "campaign_id"]).sum()[["daily_impressions", "daily_clicks", "daily_spend", "noisy_cvr" ]]
agg_media_df = spend_df.groupby(["channel", "campaign_id"]).sum()[["daily_impressions", "daily_clicks", "daily_spend", "noisy_cvr", "daily_reach", "daily_frequency" ]]
agg_media_df = agg_media_df.unstack(level=0)
joined_columns = []
for _metric, _channel in agg_media_df.columns:
Expand All @@ -315,6 +345,10 @@ def _reformat_for_mmm(self, spend_df: pd.DataFrame) -> pd.DataFrame:
agg_media_df.columns = joined_columns

mmm_df = mmm_df.join(agg_media_df)
# Fill NAs for reach and frequency if they weren't generated for all channels
reach_cols = [c for c in mmm_df.columns if "_reach" in c]
freq_cols = [c for c in mmm_df.columns if "_frequency" in c]
mmm_df[reach_cols + freq_cols] = mmm_df[reach_cols + freq_cols].fillna(0.0)

logger.info("You have completed running step 5a: pivoting the data frame to an MMM format.")
return mmm_df
Expand Down Expand Up @@ -441,6 +475,11 @@ def consolidate_dataframe(self, mmm_df: pd.DataFrame, baseline_sales_df: pd.Data
for channel in self.basic_params.channels_impressions
]
[metric_cols.append(f"{channel}_clicks") for channel in self.basic_params.channels_clicks]
for channel in self.basic_params.channels_impressions:
if f"{channel}_reach" in mmm_df.columns:
metric_cols.append(f"{channel}_reach")
if f"{channel}_frequency" in mmm_df.columns:
metric_cols.append(f"{channel}_frequency")
spend_cols = []
[spend_cols.append(f"{channel}_spend") for channel in self.basic_params.all_channels]
conv_cols = []
Expand Down Expand Up @@ -478,6 +517,11 @@ def finalize_output(self, mmm_df: pd.DataFrame, params: OutputParameters) -> pd.
pd.DataFrame: Finalized output DataFrame"""
metric_cols = [f"{channel}_impressions" for channel in self.basic_params.channels_impressions]
[metric_cols.append(f"{channel}_clicks") for channel in self.basic_params.channels_clicks]
for channel in self.basic_params.channels_impressions:
if f"{channel}_reach" in mmm_df.columns:
metric_cols.append(f"{channel}_reach")
if f"{channel}_frequency" in mmm_df.columns:
metric_cols.append(f"{channel}_frequency")
spend_cols = []
[spend_cols.append(f"{channel}_spend") for channel in self.basic_params.all_channels]

Expand Down Expand Up @@ -519,6 +563,7 @@ def run_with_config(self, config: dict) -> SimulationResult:
from .load_parameters import create_all_parameters
params = create_all_parameters(config)
self.basic_params = params["basic_params"]
self.total_population = params["geo_params"].total_population if "geo_params" in params else None

baseline_sales_df = self.simulate_baseline(params["baseline_params"])
spend_df = self.simulate_ad_spend(baseline_sales_df=baseline_sales_df, params=params["ad_spend_params"])
Expand Down
42 changes: 32 additions & 10 deletions src/pysimmmulator/visualize.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,30 @@ def plot_clicks(self, df: pd.DataFrame, agg: str = None):
plot_cols = self._filter_columns(columns=plot_frame.columns.tolist(), filter_string='_clicks')
return self._plot_majors(plot_frame, columns=plot_cols)

def plot_reach(self, df: pd.DataFrame, agg: str = None):
    """Draw the simulated reach columns after rolling the data up by date.

    Args:
        df (pd.DataFrame): DataFrame holding the simulated data
        agg (str): aggregation level, chosen from ['daily', 'weekly', 'monthly', 'yearly']"""
    # The message is only built when the check fails, exactly like a plain assert.
    assert agg in self._valid_agg_levels, (
        f"Please select [{', '.join(self._valid_agg_levels)}] for your aggregation level.\n"
        f"{agg} is an invalid selection."
    )
    aggregated = self._plot_frame_overhead(df, agg_level=agg)
    # Keep only the columns whose name carries the reach suffix.
    reach_columns = self._filter_columns(
        columns=aggregated.columns.tolist(),
        filter_string='_reach',
    )
    return self._plot_majors(aggregated, columns=reach_columns)

def plot_frequency(self, df: pd.DataFrame, agg: str = None):
    """Draw the simulated frequency columns after rolling the data up by date.

    Args:
        df (pd.DataFrame): DataFrame holding the simulated data
        agg (str): aggregation level, chosen from ['daily', 'weekly', 'monthly', 'yearly']"""
    # The message is only built when the check fails, exactly like a plain assert.
    assert agg in self._valid_agg_levels, (
        f"Please select [{', '.join(self._valid_agg_levels)}] for your aggregation level.\n"
        f"{agg} is an invalid selection."
    )
    aggregated = self._plot_frame_overhead(df, agg_level=agg)
    # Keep only the columns whose name carries the frequency suffix.
    frequency_columns = self._filter_columns(
        columns=aggregated.columns.tolist(),
        filter_string='_frequency',
    )
    return self._plot_majors(aggregated, columns=frequency_columns)

def plot_revenue(self, df: pd.DataFrame, agg: str = None):
"""Plot simulated revenue data based on a passed date-wise aggregation

Expand Down Expand Up @@ -70,28 +94,26 @@ def _plot_frame_overhead(self, df: pd.DataFrame, agg_level: str = None) -> pd.Da
return plot_frame

def _aggregator(self, plot_frame: pd.DataFrame, agg_level: str) -> pd.DataFrame:
# Identify frequency columns to use mean instead of sum
freq_cols = [c for c in plot_frame.columns if "frequency" in c]
agg_dict = {c: ("mean" if c in freq_cols else "sum") for c in plot_frame.columns if c not in ["date", "week_start", "month_start", "year_start"]}

if agg_level == 'daily':
plot_frame = plot_frame.groupby("date").sum()
plot_frame = plot_frame.groupby("date").agg(agg_dict)

elif agg_level == 'weekly':
plot_frame["week_start"] = plot_frame["date"] - pd.to_timedelta(plot_frame["date"].dt.weekday, unit="D")
if "date" in plot_frame.columns:
del plot_frame["date"]
plot_frame = plot_frame.groupby("week_start").sum()
plot_frame = plot_frame.groupby("week_start").agg(agg_dict)

elif agg_level == 'monthly':
plot_frame["month_start"] = plot_frame["date"] - pd.to_timedelta(
plot_frame["date"].dt.day - 1, unit="D")
if "date" in plot_frame.columns:
del plot_frame["date"]
plot_frame = plot_frame.groupby("month_start").sum()
plot_frame = plot_frame.groupby("month_start").agg(agg_dict)

elif agg_level == 'yearly':
plot_frame["year_start"] = plot_frame["date"] - pd.to_timedelta(
plot_frame["date"].dt.dayofyear - 1, unit="D")
if "date" in plot_frame.columns:
del plot_frame["date"]
plot_frame = plot_frame.groupby("year_start").sum()
plot_frame = plot_frame.groupby("year_start").agg(agg_dict)

return plot_frame

Expand Down
130 changes: 130 additions & 0 deletions tests/test_reach_frequency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import pytest
import pandas as pd
import numpy as np
from pysimmmulator.simulate import Simulate

def test_reach_frequency_generation():
    """A channel configured with a fixed frequency exposes reach and frequency columns that agree with impressions."""
    basic_params = dict(
        years=1,
        channels_impressions=["TV"],
        channels_clicks=["Search"],
        frequency_of_campaigns=7,
        start_date="2023/01/01",
        true_cvr={"TV": 0.01, "Search": 0.02},
        revenue_per_conv=100.0,
    )
    baseline_params = dict(
        base_p=1000,
        trend_p=100,
        temp_var=10,
        temp_coef_mean=1.0,
        temp_coef_sd=0.1,
        error_std=50,
    )
    ad_spend_params = dict(
        campaign_spend_mean=5000,
        campaign_spend_std=500,
        max_min_proportion_on_each_channel={"TV": {"min": 0.4, "max": 0.6}},
    )
    media_params = dict(
        true_cpm={"TV": 10.0},
        true_cpc={"Search": 1.0},
        noisy_cpm_cpc={
            "TV": {"loc": 0.0, "scale": 1.0},
            "Search": {"loc": 0.0, "scale": 0.1},
        },
        true_reach_frequency={"TV": {"frequency": 2.5}},
    )
    cvr_params = dict(
        noisy_cvr={
            "TV": {"loc": 1.0, "scale": 0.1},
            "Search": {"loc": 1.0, "scale": 0.1},
        },
    )
    adstock_params = dict(
        adstock={
            "TV": {"type": "geometric", "params": {"lambda": 0.5}},
            "Search": {"type": "geometric", "params": {"lambda": 0.3}},
        },
        saturation={
            "TV": {"type": "scurve", "params": {"alpha": 1.0, "gamma": 0.5}},
            "Search": {"type": "scurve", "params": {"alpha": 1.0, "gamma": 0.5}},
        },
    )
    config = {
        "basic_params": basic_params,
        "baseline_params": baseline_params,
        "ad_spend_params": ad_spend_params,
        "media_params": media_params,
        "cvr_params": cvr_params,
        "adstock_params": adstock_params,
        "output_params": {"aggregation_level": "daily"},
    }

    df = Simulate().run_with_config(config).df

    for expected_column in ("TV_reach", "TV_frequency"):
        assert expected_column in df.columns

    # Only rows that actually delivered impressions can be checked.
    test_df = df[df["TV_impressions"] > 0]
    assert len(test_df) > 0

    # impressions / reach should reproduce the reported frequency; the 0.1
    # tolerance absorbs the rounding applied to the daily figures.
    for _, record in test_df.iterrows():
        implied_frequency = record["TV_impressions"] / record["TV_reach"]
        assert pytest.approx(implied_frequency, abs=0.1) == record["TV_frequency"]

def test_reach_as_proportion():
    """A reach value given as a fraction is scaled by the configured total population."""
    basic_params = dict(
        years=1,
        channels_impressions=["TV"],
        channels_clicks=[],
        frequency_of_campaigns=7,
        start_date="2023/01/01",
        true_cvr={"TV": 0.01},
        revenue_per_conv=100.0,
    )
    baseline_params = dict(
        base_p=1000,
        trend_p=100,
        temp_var=10,
        temp_coef_mean=1.0,
        temp_coef_sd=0.1,
        error_std=50,
    )
    ad_spend_params = dict(
        campaign_spend_mean=5000,
        campaign_spend_std=500,
        max_min_proportion_on_each_channel={},
    )
    media_params = dict(
        true_cpm={"TV": 10.0},
        true_cpc={},
        noisy_cpm_cpc={"TV": {"loc": 0.0, "scale": 1.0}},
        true_reach_frequency={"TV": {"reach": 0.1}},  # 10% of the population
    )
    adstock_params = dict(
        adstock={"TV": {"type": "geometric", "params": {"lambda": 0.5}}},
        saturation={"TV": {"type": "scurve", "params": {"alpha": 1.0, "gamma": 0.5}}},
    )
    config = {
        "basic_params": basic_params,
        "baseline_params": baseline_params,
        "ad_spend_params": ad_spend_params,
        "media_params": media_params,
        "cvr_params": {"noisy_cvr": {"TV": {"loc": 1.0, "scale": 0.1}}},
        "adstock_params": adstock_params,
        "output_params": {"aggregation_level": "daily"},
        "geo_params": {"total_population": 1000000, "count": 5},
    }

    df = Simulate().run_with_config(config).df

    assert "TV_reach" in df.columns

    # 10% of a 1M population is 100,000 people per campaign; spread over a
    # 7-day campaign that is roughly 14,286 per day once summed per date.
    expected_daily_reach = 100000 / 7
    test_df = df[df["TV_impressions"] > 0]
    daily_total_reach = test_df.groupby("date")["TV_reach"].sum()
    assert daily_total_reach.mean() == pytest.approx(expected_daily_reach, abs=10)
Loading
Loading