Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,24 @@ from tippingpoint import MarketingReturnCurve
spends = np.array([1200, 5000, 15000, 25000, 40000])
returns = np.array([200, 1500, 12000, 22000, 28000])

# 2. Fit the Curve
# 2. Fit the Curve (MLE)
model = MarketingReturnCurve.from_historical_data(
spend_array=spends,
return_array=returns,
channel_name="Paid Social",
channel_name="YouTube",
epochs=3000,
lr=0.05
)

# 3. Fit the Curve (Bayesian MCMC)
# This provides posterior distributions and uncertainty intervals
model_bayesian = MarketingReturnCurve.fit_bayesian(
spend_array=spends,
return_array=returns,
channel_name="YouTube (Bayesian)",
n_samples=2000,
chains=4
)
```

### 2. Extracting Intelligence & Inflection Points
Expand All @@ -71,6 +81,12 @@ spend_cap = model.get_diminishing_returns_point(target_mroas)
print(f"Start Scaling At: ${optimal_floor:,.2f}")
print(f"Stop Scaling At: ${spend_cap:,.2f}")

# Access Posterior Samples (if fitted via Bayesian method)
if hasattr(model_bayesian, 'posterior_samples') and model_bayesian.posterior_samples:
print(f"Mean Alpha: {np.mean(model_bayesian.posterior_samples['alpha'])}")
# Access full distribution
beta_samples = model_bayesian.posterior_samples['beta']

# Get a text-based evaluation of your current strategy
model.evaluate_current_budget(current_spend, target_mroas)
```
Expand All @@ -82,11 +98,11 @@ Status: OPTIMAL SCALING ZONE
Recommendation: You are operating within the highly efficient growth window.
```

### 3. Visualization
Generate an executive-ready, dual-axis chart mapping the Incremental Return curve against the Marginal ROAS curve, explicitly highlighting the Optimal Scaling Zone.
### 4. Visualization
Generate an executive-ready, dual-axis chart mapping the Incremental Return curve against the Marginal ROAS curve. If fitted via Bayesian methods, it automatically includes **90% Credible Intervals**.

```python
model.plot_response_curve(target_mroas=1.5, current_spend=12000)
model_bayesian.plot_response_curve(target_mroas=1.5, current_spend=12000)
```

## 🛠 Integrating with existing MMMs (Meridian)
Expand Down
146 changes: 135 additions & 11 deletions src/tippingpoint/curve.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,99 @@ class MarketingReturnCurve:
"""A marketing intelligence tool to determine the inflection points of a media
response curve based on the Hill Function (Google Meridian methodology). """

def __init__(self, beta, alpha, half_saturation_k, channel_name="Generic"):
def __init__(self, beta, alpha, half_saturation_k, channel_name="Generic", posterior_samples=None):
self.beta = float(beta)
self.alpha = float(alpha)
self.K = float(half_saturation_k)
self.channel_name = channel_name
self.posterior_samples = posterior_samples

@classmethod
def fit_bayesian(cls, spend_array, return_array, channel_name="Generic", priors=None, n_samples=2000, chains=4, burn_in=1000):
"""Fits a Hill Curve using Bayesian MCMC (Metropolis-Hastings).
Priors should be a dict: {'beta': (mu, sigma), 'alpha': (mu, sigma), 'K': (mu, sigma)}
where mu and sigma are parameters of a LogNormal distribution. """
x = np.array(spend_array, dtype=float) + 1e-5
y = np.array(return_array, dtype=float)

# Default Priors (LogNormal)
if priors is None:
max_y = np.max(y)
median_x = np.median(x[x > 1e-4]) if np.any(x > 1e-4) else 1.0
priors = {
'beta': (np.log(max_y * 1.2), 0.5),
'alpha': (0.0, 0.5), # Centered at 1.0 (log(1)=0)
'K': (np.log(median_x), 0.5)
}

def log_likelihood(beta, alpha, K, sigma):
if beta <= 0 or alpha <= 0 or K <= 0 or sigma <= 0: return -np.inf
y_pred = (beta * (x ** alpha)) / (K ** alpha + x ** alpha)
return -0.5 * np.sum(((y - y_pred) / sigma) ** 2) - len(y) * np.log(sigma)

def log_prior(beta, alpha, K, sigma):
lp = 0
# LogNormal priors for beta, alpha, K
for name, val in [('beta', beta), ('alpha', alpha), ('K', K)]:
mu, s = priors[name]
lp += -0.5 * ((np.log(val) - mu) / s) ** 2 - np.log(val)
# Half-Normal prior for sigma
lp += -0.5 * (sigma / (np.max(y) * 0.1)) ** 2
return lp

def log_posterior(params):
beta, alpha, K, sigma = params
return log_likelihood(beta, alpha, K, sigma) + log_prior(beta, alpha, K, sigma)

# Simple Metropolis-Hastings Sampler
all_samples = []
for _ in range(chains):
# Initialize
current_params = np.array([
np.exp(priors['beta'][0]),
np.exp(priors['alpha'][0]),
np.exp(priors['K'][0]),
np.std(y) * 0.1
])
current_log_post = log_posterior(current_params)

samples = []
# Adaptive step size (simplified)
step_size = current_params * 0.05

for i in range(n_samples + burn_in):
proposal = current_params + np.random.normal(0, step_size)
proposal_log_post = log_posterior(proposal)

if proposal_log_post > current_log_post or np.random.rand() < np.exp(proposal_log_post - current_log_post):
current_params = proposal
current_log_post = proposal_log_post

if i >= burn_in:
samples.append(current_params.copy())

# Small adaptation during burn-in
if i < burn_in and i % 100 == 0 and i > 0:
# This is a very crude adaptation
pass

all_samples.append(np.array(samples))

posterior = np.vstack(all_samples)
samples_dict = {
'beta': posterior[:, 0],
'alpha': posterior[:, 1],
'K': posterior[:, 2],
'sigma': posterior[:, 3]
}

# Point estimates (posterior mean)
beta_mean = np.mean(samples_dict['beta'])
alpha_mean = np.mean(samples_dict['alpha'])
K_mean = np.mean(samples_dict['K'])

print(f"[{channel_name}] Bayesian fit complete. Samples: {len(posterior)}")
return cls(beta_mean, alpha_mean, K_mean, channel_name, posterior_samples=samples_dict)

@classmethod
def from_historical_data(cls, spend_array, return_array, channel_name="Generic", epochs=5000, lr=0.05):
Expand All @@ -24,12 +112,17 @@ def from_historical_data(cls, spend_array, return_array, channel_name="Generic",
median_x = np.median(spend_array[spend_array > 0]) if np.any(spend_array > 0) else 1.0

Tensor.traning = True
x = Tensor(spend_array, dtype=dtypes.float32, requires_grad=False)
y = Tensor(return_array, dtype=dtypes.float32, requires_grad=False)
x = Tensor(spend_array, dtype=dtypes.float32)
x.requires_grad = False
y = Tensor(return_array, dtype=dtypes.float32)
y.requires_grad = False

log_beta = Tensor([np.log(max_y * 1.5)], dtype=dtypes.float32, requires_grad=True)
log_k = Tensor([np.log(median_x + 1e-5)], dtype=dtypes.float32, requires_grad=True)
log_alpha = Tensor([0.5], dtype=dtypes.float32, requires_grad=True)
log_beta = Tensor([np.log(max_y * 1.5)], dtype=dtypes.float32)
log_beta.requires_grad = True
log_k = Tensor([np.log(median_x + 1e-5)], dtype=dtypes.float32)
log_k.requires_grad = True
log_alpha = Tensor([0.5], dtype=dtypes.float32)
log_alpha.requires_grad = True
optimizer = Adam([log_beta, log_k, log_alpha], lr=lr)

Tensor.traning = True
Expand All @@ -49,14 +142,26 @@ def from_historical_data(cls, spend_array, return_array, channel_name="Generic",
print(f"[{channel_name}] Curve fit complete. Loss: {final_loss:.4f}")
return cls(log_beta.exp().numpy().item(), log_alpha.exp().numpy().item(), log_k.exp().numpy().item(), channel_name)

def predict_incremental_return(self, spend):
def predict_incremental_return(self, spend, use_samples=False):
"""f(x): The baseline Hill Function calculation."""
spend = np.array(spend, dtype=float) + 1e-5
if use_samples and self.posterior_samples:
beta = self.posterior_samples['beta'][:, np.newaxis]
alpha = self.posterior_samples['alpha'][:, np.newaxis]
K = self.posterior_samples['K'][:, np.newaxis]
return (beta * (spend ** alpha)) / (K ** alpha + spend ** alpha)
return (self.beta * (spend ** self.alpha)) / (self.K ** self.alpha + spend ** self.alpha)

def predict_marginal_return(self, spend):
def predict_marginal_return(self, spend, use_samples=False):
"""f'(x): The first derivative (Marginal ROAS / mCPA inverse)."""
spend = np.array(spend, dtype=float) + 1e-5
if use_samples and self.posterior_samples:
beta = self.posterior_samples['beta'][:, np.newaxis]
alpha = self.posterior_samples['alpha'][:, np.newaxis]
K = self.posterior_samples['K'][:, np.newaxis]
numerator = beta * alpha * (K ** alpha) * (spend ** (alpha - 1))
denominator = (K ** alpha + spend ** alpha) ** 2
return numerator / denominator
numerator = self.beta * self.alpha * (self.K ** self.alpha) * (spend ** (self.alpha - 1))
denominator = (self.K ** self.alpha + spend ** self.alpha) ** 2
return numerator / denominator
Expand Down Expand Up @@ -104,7 +209,7 @@ def evaluate_current_budget(self, current_spend, target_mroas=1.0):
elif max_spend is not None and current_spend > max_spend: print("Status: OVER-SATURATED (Unprofitable Marginal Growth)\n Recommendation: Scale back spend to ${max_spend:,.2f} to maintain target unit economics.")
else: print("Status: OPTIMAL SCALING ZONE.\nRecommendation: You are operating within the highly efficient growth window.")

def plot_response_curve(self, target_mroas=1.0, current_spend=None):
def plot_response_curve(self, target_mroas=1.0, current_spend=None, show_intervals=True):
"""Generates an executive-friendly dual-axis chart mapping the optimal scaling zone."""
min_spend = self.get_minimal_marginal_cost_point()
max_spend = self.get_diminishing_returns_point(target_mroas)
Expand All @@ -113,19 +218,38 @@ def plot_response_curve(self, target_mroas=1.0, current_spend=None):
plot_limit = max(plot_limit, current_spend * 1.2 if current_spend else 0)

x_vals = np.linspace(0, plot_limit, 500)
y_return = self.predict_incremental_return(x_vals)
y_mroas = self.predict_marginal_return(x_vals)

if show_intervals and self.posterior_samples:
y_returns_dist = self.predict_incremental_return(x_vals, use_samples=True)
y_return = np.mean(y_returns_dist, axis=0)
y_return_low = np.percentile(y_returns_dist, 5, axis=0)
y_return_high = np.percentile(y_returns_dist, 95, axis=0)

y_mroas_dist = self.predict_marginal_return(x_vals, use_samples=True)
y_mroas = np.mean(y_mroas_dist, axis=0)
y_mroas_low = np.percentile(y_mroas_dist, 5, axis=0)
y_mroas_high = np.percentile(y_mroas_dist, 95, axis=0)
else:
y_return = self.predict_incremental_return(x_vals)
y_mroas = self.predict_marginal_return(x_vals)

fig, ax1 = plt.subplots(figsize=(12, 6))
# Primary Axis: Response Curve
ax1.plot(x_vals, y_return, color='#2CA02C', linewidth=3, label="Incremental Return")
if show_intervals and self.posterior_samples:
ax1.fill_between(x_vals, y_return_low, y_return_high, color='#2CA02C', alpha=0.2, label="90% Credible Interval")

ax1.set_xlabel('Spend ($)', fontsize=12, fontweight='bold')
ax1.set_ylabel('Incremental Return', color='#2CA02C', fontsize=12, fontweight='bold')
ax1.tick_params(axis='y', labelcolor='#2CA02C')
ax1.grid(True, linestyle='--', alpha=0.5)

# Secondary Axis: Marginal Return
ax2 = ax1.twinx()
ax2.plot(x_vals, y_mroas, color='#1F77B4', linestyle='--', linewidth=2, label="Marginal ROAS")
if show_intervals and self.posterior_samples:
ax2.fill_between(x_vals, y_mroas_low, y_mroas_high, color='#1F77B4', alpha=0.1)

ax2.set_ylabel('Marginal ROAS (mROAS)', color='#1F77B4', fontsize=12, fontweight='bold')
ax2.tick_params(axis='y', labelcolor='#1F77B4')
ax2.axhline(target_mroas, color='gray', linestyle=':', label="Target mROAS Floor")
Expand Down
61 changes: 61 additions & 0 deletions tests/test_bayesian.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import numpy as np
import pytest
from tippingpoint.curve import MarketingReturnCurve

@pytest.fixture
def synthetic_data():
np.random.seed(42)
x = np.linspace(1000, 50000, 20)
beta_true = 100000
alpha_true = 1.5
K_true = 20000
y_true = (beta_true * (x ** alpha_true)) / (K_true ** alpha_true + x ** alpha_true)
y = y_true + np.random.normal(0, 1000, size=x.shape)
y = np.maximum(y, 0)
return x, y

def test_fit_bayesian_basic(synthetic_data):
x, y = synthetic_data
model = MarketingReturnCurve.fit_bayesian(x, y, n_samples=500, burn_in=100, chains=2)

assert model.beta > 0
assert model.alpha > 0
assert model.K > 0
assert model.posterior_samples is not None
assert 'beta' in model.posterior_samples
assert len(model.posterior_samples['beta']) == 1000 # 500 * 2

def test_fit_bayesian_with_priors(synthetic_data):
x, y = synthetic_data
priors = {
'beta': (np.log(100000), 0.1),
'alpha': (np.log(1.5), 0.1),
'K': (np.log(20000), 0.1)
}
model = MarketingReturnCurve.fit_bayesian(x, y, priors=priors, n_samples=200, burn_in=50, chains=1)

# Check if results are close to true values due to tight priors
assert 90000 < model.beta < 110000
assert 1.3 < model.alpha < 1.7
assert 18000 < model.K < 22000

def test_predict_with_samples(synthetic_data):
x, y = synthetic_data
model = MarketingReturnCurve.fit_bayesian(x, y, n_samples=100, burn_in=50, chains=1)

# Incremental return
preds = model.predict_incremental_return([1000, 2000], use_samples=True)
assert preds.shape == (100, 2)

# Marginal return
m_preds = model.predict_marginal_return([1000, 2000], use_samples=True)
assert m_preds.shape == (100, 2)

def test_plot_with_samples(synthetic_data):
# Just ensure it doesn't crash
x, y = synthetic_data
model = MarketingReturnCurve.fit_bayesian(x, y, n_samples=50, burn_in=10, chains=1)
# Mock plt.show to avoid blocking
import matplotlib.pyplot as plt
plt.show = lambda: None
model.plot_response_curve()
Loading