diff --git a/README.md b/README.md index 5457230..1f6f2aa 100644 --- a/README.md +++ b/README.md @@ -47,14 +47,24 @@ from tippingpoint import MarketingReturnCurve spends = np.array([1200, 5000, 15000, 25000, 40000]) returns = np.array([200, 1500, 12000, 22000, 28000]) -# 2. Fit the Curve +# 2. Fit the Curve (MLE) model = MarketingReturnCurve.from_historical_data( spend_array=spends, return_array=returns, - channel_name="Paid Social", + channel_name="YouTube", epochs=3000, lr=0.05 ) + +# 3. Fit the Curve (Bayesian MCMC) +# This provides posterior distributions and uncertainty intervals +model_bayesian = MarketingReturnCurve.fit_bayesian( + spend_array=spends, + return_array=returns, + channel_name="YouTube (Bayesian)", + n_samples=2000, + chains=4 +) ``` ### 2. Extracting Intelligence & Inflection Points @@ -71,6 +81,12 @@ spend_cap = model.get_diminishing_returns_point(target_mroas) print(f"Start Scaling At: ${optimal_floor:,.2f}") print(f"Stop Scaling At: ${spend_cap:,.2f}") +# Access Posterior Samples (if fitted via Bayesian method) +if hasattr(model_bayesian, 'posterior_samples') and model_bayesian.posterior_samples: + print(f"Mean Alpha: {np.mean(model_bayesian.posterior_samples['alpha'])}") + # Access full distribution + beta_samples = model_bayesian.posterior_samples['beta'] + # Get a text-based evaluation of your current strategy model.evaluate_current_budget(current_spend, target_mroas) ``` @@ -82,11 +98,11 @@ Status: OPTIMAL SCALING ZONE Recommendation: You are operating within the highly efficient growth window. ``` -### 3. Visualization -Generate an executive-ready, dual-axis chart mapping the Incremental Return curve against the Marginal ROAS curve, explicitly highlighting the Optimal Scaling Zone. +### 4. Visualization +Generate an executive-ready, dual-axis chart mapping the Incremental Return curve against the Marginal ROAS curve. If fitted via Bayesian methods, it automatically includes **90% Credible Intervals**. ```python -model.plot_response_curve(target_mroas=1.5, current_spend=12000) +model_bayesian.plot_response_curve(target_mroas=1.5, current_spend=12000) ``` ## 🛠 Integrating with existing MMMs (Meridian) diff --git a/src/tippingpoint/curve.py b/src/tippingpoint/curve.py index aa4cab4..bc928ba 100644 --- a/src/tippingpoint/curve.py +++ b/src/tippingpoint/curve.py @@ -10,11 +10,99 @@ class MarketingReturnCurve: """A marketing intelligence tool to determine the inflection points of a media response curve based on the Hill Function (Google Meridian methodology). """ - def __init__(self, beta, alpha, half_saturation_k, channel_name="Generic"): + def __init__(self, beta, alpha, half_saturation_k, channel_name="Generic", posterior_samples=None): self.beta = float(beta) self.alpha = float(alpha) self.K = float(half_saturation_k) self.channel_name = channel_name + self.posterior_samples = posterior_samples + + @classmethod + def fit_bayesian(cls, spend_array, return_array, channel_name="Generic", priors=None, n_samples=2000, chains=4, burn_in=1000): + """Fits a Hill Curve using Bayesian MCMC (Metropolis-Hastings). + Priors should be a dict: {'beta': (mu, sigma), 'alpha': (mu, sigma), 'K': (mu, sigma)} + where mu and sigma are parameters of a LogNormal distribution. """ + x = np.array(spend_array, dtype=float) + 1e-5 + y = np.array(return_array, dtype=float) + + # Default Priors (LogNormal) + if priors is None: + max_y = np.max(y) + median_x = np.median(x[x > 1e-4]) if np.any(x > 1e-4) else 1.0 + priors = { + 'beta': (np.log(max_y * 1.2), 0.5), + 'alpha': (0.0, 0.5), # Centered at 1.0 (log(1)=0) + 'K': (np.log(median_x), 0.5) + } + + def log_likelihood(beta, alpha, K, sigma): + if beta <= 0 or alpha <= 0 or K <= 0 or sigma <= 0: return -np.inf + y_pred = (beta * (x ** alpha)) / (K ** alpha + x ** alpha) + return -0.5 * np.sum(((y - y_pred) / sigma) ** 2) - len(y) * np.log(sigma) + + def log_prior(beta, alpha, K, sigma): + lp = 0 + # LogNormal priors for beta, alpha, K + for name, val in [('beta', beta), ('alpha', alpha), ('K', K)]: + mu, s = priors[name] + lp += -0.5 * ((np.log(val) - mu) / s) ** 2 - np.log(val) + # Half-Normal prior for sigma + lp += -0.5 * (sigma / (np.max(y) * 0.1)) ** 2 + return lp + + def log_posterior(params): + beta, alpha, K, sigma = params + return log_likelihood(beta, alpha, K, sigma) + log_prior(beta, alpha, K, sigma) + + # Simple Metropolis-Hastings Sampler + all_samples = [] + for _ in range(chains): + # Initialize + current_params = np.array([ + np.exp(priors['beta'][0]), + np.exp(priors['alpha'][0]), + np.exp(priors['K'][0]), + np.std(y) * 0.1 + ]) + current_log_post = log_posterior(current_params) + + samples = [] + # Adaptive step size (simplified) + step_size = current_params * 0.05 + + for i in range(n_samples + burn_in): + proposal = current_params + np.random.normal(0, step_size) + proposal_log_post = log_posterior(proposal) + + if proposal_log_post > current_log_post or np.random.rand() < np.exp(proposal_log_post - current_log_post): + current_params = proposal + current_log_post = proposal_log_post + + if i >= burn_in: + samples.append(current_params.copy()) + + # Small adaptation during burn-in + if i < burn_in and i % 100 == 0 and i > 0: + # This is a very crude adaptation + pass + + all_samples.append(np.array(samples)) + + posterior = np.vstack(all_samples) + samples_dict = { + 'beta': posterior[:, 0], + 'alpha': posterior[:, 1], + 'K': posterior[:, 2], + 'sigma': posterior[:, 3] + } + + # Point estimates (posterior mean) + beta_mean = np.mean(samples_dict['beta']) + alpha_mean = np.mean(samples_dict['alpha']) + K_mean = np.mean(samples_dict['K']) + + print(f"[{channel_name}] Bayesian fit complete. Samples: {len(posterior)}") + return cls(beta_mean, alpha_mean, K_mean, channel_name, posterior_samples=samples_dict) @classmethod def from_historical_data(cls, spend_array, return_array, channel_name="Generic", epochs=5000, lr=0.05): @@ -24,12 +112,17 @@ def from_historical_data(cls, spend_array, return_array, channel_name="Generic", median_x = np.median(spend_array[spend_array > 0]) if np.any(spend_array > 0) else 1.0 Tensor.traning = True - x = Tensor(spend_array, dtype=dtypes.float32, requires_grad=False) - y = Tensor(return_array, dtype=dtypes.float32, requires_grad=False) + x = Tensor(spend_array, dtype=dtypes.float32) + x.requires_grad = False + y = Tensor(return_array, dtype=dtypes.float32) + y.requires_grad = False - log_beta = Tensor([np.log(max_y * 1.5)], dtype=dtypes.float32, requires_grad=True) - log_k = Tensor([np.log(median_x + 1e-5)], dtype=dtypes.float32, requires_grad=True) - log_alpha = Tensor([0.5], dtype=dtypes.float32, requires_grad=True) + log_beta = Tensor([np.log(max_y * 1.5)], dtype=dtypes.float32) + log_beta.requires_grad = True + log_k = Tensor([np.log(median_x + 1e-5)], dtype=dtypes.float32) + log_k.requires_grad = True + log_alpha = Tensor([0.5], dtype=dtypes.float32) + log_alpha.requires_grad = True optimizer = Adam([log_beta, log_k, log_alpha], lr=lr) Tensor.traning = True @@ -49,14 +142,26 @@ def from_historical_data(cls, spend_array, return_array, channel_name="Generic", print(f"[{channel_name}] Curve fit complete. Loss: {final_loss:.4f}") return cls(log_beta.exp().numpy().item(), log_alpha.exp().numpy().item(), log_k.exp().numpy().item(), channel_name) - def predict_incremental_return(self, spend): + def predict_incremental_return(self, spend, use_samples=False): """f(x): The baseline Hill Function calculation.""" spend = np.array(spend, dtype=float) + 1e-5 + if use_samples and self.posterior_samples: + beta = self.posterior_samples['beta'][:, np.newaxis] + alpha = self.posterior_samples['alpha'][:, np.newaxis] + K = self.posterior_samples['K'][:, np.newaxis] + return (beta * (spend ** alpha)) / (K ** alpha + spend ** alpha) return (self.beta * (spend ** self.alpha)) / (self.K ** self.alpha + spend ** self.alpha) - def predict_marginal_return(self, spend): + def predict_marginal_return(self, spend, use_samples=False): """f'(x): The first derivative (Marginal ROAS / mCPA inverse).""" spend = np.array(spend, dtype=float) + 1e-5 + if use_samples and self.posterior_samples: + beta = self.posterior_samples['beta'][:, np.newaxis] + alpha = self.posterior_samples['alpha'][:, np.newaxis] + K = self.posterior_samples['K'][:, np.newaxis] + numerator = beta * alpha * (K ** alpha) * (spend ** (alpha - 1)) + denominator = (K ** alpha + spend ** alpha) ** 2 + return numerator / denominator numerator = self.beta * self.alpha * (self.K ** self.alpha) * (spend ** (self.alpha - 1)) denominator = (self.K ** self.alpha + spend ** self.alpha) ** 2 return numerator / denominator @@ -104,7 +209,7 @@ def evaluate_current_budget(self, current_spend, target_mroas=1.0): elif max_spend is not None and current_spend > max_spend: print("Status: OVER-SATURATED (Unprofitable Marginal Growth)\n Recommendation: Scale back spend to ${max_spend:,.2f} to maintain target unit economics.") else: print("Status: OPTIMAL SCALING ZONE.\nRecommendation: You are operating within the highly efficient growth window.") - def plot_response_curve(self, target_mroas=1.0, current_spend=None): + def plot_response_curve(self, target_mroas=1.0, current_spend=None, show_intervals=True): """Generates an executive-friendly dual-axis chart mapping the optimal scaling zone.""" min_spend = self.get_minimal_marginal_cost_point() max_spend = self.get_diminishing_returns_point(target_mroas) @@ -113,19 +218,38 @@ def plot_response_curve(self, target_mroas=1.0, current_spend=None): plot_limit = max(plot_limit, current_spend * 1.2 if current_spend else 0) x_vals = np.linspace(0, plot_limit, 500) - y_return = self.predict_incremental_return(x_vals) - y_mroas = self.predict_marginal_return(x_vals) + + if show_intervals and self.posterior_samples: + y_returns_dist = self.predict_incremental_return(x_vals, use_samples=True) + y_return = np.mean(y_returns_dist, axis=0) + y_return_low = np.percentile(y_returns_dist, 5, axis=0) + y_return_high = np.percentile(y_returns_dist, 95, axis=0) + + y_mroas_dist = self.predict_marginal_return(x_vals, use_samples=True) + y_mroas = np.mean(y_mroas_dist, axis=0) + y_mroas_low = np.percentile(y_mroas_dist, 5, axis=0) + y_mroas_high = np.percentile(y_mroas_dist, 95, axis=0) + else: + y_return = self.predict_incremental_return(x_vals) + y_mroas = self.predict_marginal_return(x_vals) fig, ax1 = plt.subplots(figsize=(12, 6)) # Primary Axis: Response Curve ax1.plot(x_vals, y_return, color='#2CA02C', linewidth=3, label="Incremental Return") + if show_intervals and self.posterior_samples: + ax1.fill_between(x_vals, y_return_low, y_return_high, color='#2CA02C', alpha=0.2, label="90% Credible Interval") + ax1.set_xlabel('Spend ($)', fontsize=12, fontweight='bold') ax1.set_ylabel('Incremental Return', color='#2CA02C', fontsize=12, fontweight='bold') ax1.tick_params(axis='y', labelcolor='#2CA02C') ax1.grid(True, linestyle='--', alpha=0.5) + # Secondary Axis: Marginal Return ax2 = ax1.twinx() ax2.plot(x_vals, y_mroas, color='#1F77B4', linestyle='--', linewidth=2, label="Marginal ROAS") + if show_intervals and self.posterior_samples: + ax2.fill_between(x_vals, y_mroas_low, y_mroas_high, color='#1F77B4', alpha=0.1) + ax2.set_ylabel('Marginal ROAS (mROAS)', color='#1F77B4', fontsize=12, fontweight='bold') ax2.tick_params(axis='y', labelcolor='#1F77B4') ax2.axhline(target_mroas, color='gray', linestyle=':', label="Target mROAS Floor") diff --git a/tests/test_bayesian.py b/tests/test_bayesian.py new file mode 100644 index 0000000..d405eb4 --- /dev/null +++ b/tests/test_bayesian.py @@ -0,0 +1,61 @@ +import numpy as np +import pytest +from tippingpoint.curve import MarketingReturnCurve + +@pytest.fixture +def synthetic_data(): + np.random.seed(42) + x = np.linspace(1000, 50000, 20) + beta_true = 100000 + alpha_true = 1.5 + K_true = 20000 + y_true = (beta_true * (x ** alpha_true)) / (K_true ** alpha_true + x ** alpha_true) + y = y_true + np.random.normal(0, 1000, size=x.shape) + y = np.maximum(y, 0) + return x, y + +def test_fit_bayesian_basic(synthetic_data): + x, y = synthetic_data + model = MarketingReturnCurve.fit_bayesian(x, y, n_samples=500, burn_in=100, chains=2) + + assert model.beta > 0 + assert model.alpha > 0 + assert model.K > 0 + assert model.posterior_samples is not None + assert 'beta' in model.posterior_samples + assert len(model.posterior_samples['beta']) == 1000 # 500 * 2 + +def test_fit_bayesian_with_priors(synthetic_data): + x, y = synthetic_data + priors = { + 'beta': (np.log(100000), 0.1), + 'alpha': (np.log(1.5), 0.1), + 'K': (np.log(20000), 0.1) + } + model = MarketingReturnCurve.fit_bayesian(x, y, priors=priors, n_samples=200, burn_in=50, chains=1) + + # Check if results are close to true values due to tight priors + assert 90000 < model.beta < 110000 + assert 1.3 < model.alpha < 1.7 + assert 18000 < model.K < 22000 + +def test_predict_with_samples(synthetic_data): + x, y = synthetic_data + model = MarketingReturnCurve.fit_bayesian(x, y, n_samples=100, burn_in=50, chains=1) + + # Incremental return + preds = model.predict_incremental_return([1000, 2000], use_samples=True) + assert preds.shape == (100, 2) + + # Marginal return + m_preds = model.predict_marginal_return([1000, 2000], use_samples=True) + assert m_preds.shape == (100, 2) + +def test_plot_with_samples(synthetic_data): + # Just ensure it doesn't crash + x, y = synthetic_data + model = MarketingReturnCurve.fit_bayesian(x, y, n_samples=50, burn_in=10, chains=1) + # Mock plt.show to avoid blocking + import matplotlib.pyplot as plt + plt.show = lambda: None + model.plot_response_curve()