Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions skexplain/common/multiprocessing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ def run_parallel(
total = len(args_list)
n_jobs = _resolve_n_jobs(n_jobs)

is_parallel = n_jobs != 1
# Auto-select: only go parallel if there are enough tasks to justify
# the overhead of spawning workers. For small task counts, serial is faster.
MIN_TASKS_FOR_PARALLEL = 3
is_parallel = n_jobs != 1 and total >= MIN_TASKS_FOR_PARALLEL

logger.debug(
"run_parallel: %s (%d tasks, n_jobs=%d, parallel=%s)",
Expand All @@ -155,8 +158,11 @@ def run_parallel(
start_time = time.perf_counter()

if is_parallel:
# Use 'threading' backend by default — avoids pickling overhead
# and works well when the GIL is released (sklearn predict, numpy ops).
backend = "threading"
with tqdm_joblib(tqdm(total=total, desc=description)):
results = Parallel(n_jobs=n_jobs, backend="loky")(
results = Parallel(n_jobs=n_jobs, backend=backend)(
delayed(_safe_call)(func, _ensure_tuple(args), kwargs)
for args in args_list
)
Expand Down
22 changes: 21 additions & 1 deletion skexplain/main/PermutationImportance/selection_strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,32 @@ def __init__(
scoring_inputs.index if isinstance(scoring_inputs, pd.DataFrame) else None
)

# Pre-convert to numpy for fast column swapping
self._scoring_np = (
scoring_inputs.values if isinstance(scoring_inputs, pd.DataFrame)
else np.asarray(scoring_inputs)
)
self._shuffled_np = (
self.shuffled_scoring_inputs.values
if isinstance(self.shuffled_scoring_inputs, pd.DataFrame)
else np.asarray(self.shuffled_scoring_inputs)
)

def generate_datasets(self, important_variables):
"""Check each of the non-important variables. Dataset has columns which
are important shuffled
are important shuffled.
:returns: (training_data, scoring_data)
"""
scoring_inputs, scoring_outputs = self.scoring_data

# Fast path: numpy in-place column swap instead of pd.concat reassembly
if hasattr(self, '_scoring_np'):
complete = self._scoring_np.copy()
for i in important_variables:
complete[:, i] = self._shuffled_np[:, i]
return self.training_data, (complete, scoring_outputs)

# Fallback: original pandas path
complete_scoring_inputs = make_data_from_columns(
[
get_data_subset(
Expand Down
15 changes: 14 additions & 1 deletion skexplain/main/PermutationImportance/sklearn_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,20 @@ def __call__(self, training_data, scoring_data, var_idx):
(self.X_score, self.y_score) = scoring_data

permuted_set = [self.get_permuted_data(idx, var_idx) for idx in range(self.n_permute)]
scores = np.array([self._scorer(*arg) for arg in permuted_set])

if self.n_permute > 1 and var_idx is not None:
# Batch all permutations into a single predict call for speed.
# Stack X arrays, predict once, then split and evaluate.
X_all = np.vstack([X for X, _ in permuted_set])
n_per = len(permuted_set[0][0])
all_preds = self.prediction_fn(self.model, X_all)
scores = []
for i, (_, y_i) in enumerate(permuted_set):
preds_i = all_preds[i * n_per:(i + 1) * n_per]
scores.append(self.evaluation_fn(y_i, preds_i))
scores = np.array(scores)
else:
scores = np.array([self._scorer(*arg) for arg in permuted_set])

return np.array(scores)

Expand Down
125 changes: 120 additions & 5 deletions tests/benchmark_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,128 @@ def run_benchmarks(n_samples=2000):
return results


def run_stress_test():
    """Run the heavy benchmark suite and return its results by name.

    Builds a synthetic binary-classification problem (10000 samples,
    30 features), fits a 100-tree random forest on it, wraps the model in
    an ``ExplainToolkit`` and then passes each explainability call to
    ``bench`` in a fixed order.

    :returns: dict mapping a short benchmark key to whatever ``bench``
        returned for that case (presumably elapsed seconds -- confirm
        against ``bench``).
    """
    N, F, T = 10000, 30, 100
    print(f"\n{'='*60}")
    print(f"STRESS TEST: {N} samples, {F} features, {T}-tree RF")
    print(f"{'='*60}")

    # Seed the global NumPy RNG so the synthetic data is reproducible.
    np.random.seed(42)
    column_names = [f"f{i}" for i in range(F)]
    X = pd.DataFrame(np.random.randn(N, F), columns=column_names)

    # The label depends only on the first three features.
    signal = X["f0"] * 2 + X["f1"] - X["f2"] * 0.5
    y = (signal > 0).astype(int).values

    rf = RandomForestClassifier(
        n_estimators=T, max_depth=8, random_state=42, n_jobs=1,
    )
    rf.fit(X, y)
    exp = skexplain.ExplainToolkit([("RF", rf)], X=X, y=y)

    def first_features(count):
        # A fresh list on every call, so each benchmarked call receives
        # its own list object.
        return [f"f{i}" for i in range(count)]

    # (result key, label, callable, n_runs) -- executed in this order.
    cases = [
        # Baseline: raw predict overhead
        (
            "predict_proba_10x",
            f"Raw predict_proba ×10 ({N} samples)",
            lambda: [rf.predict_proba(X.values) for _ in range(10)],
            3,
        ),
        # Permutation importance
        (
            "perm_imp_10v_10p",
            "Perm Imp (10 vars, 10 permutes)",
            lambda: exp.permutation_importance(
                n_vars=10, evaluation_fn="auc", n_permute=10
            ),
            2,
        ),
        # ALE
        (
            "ale_1d_all_1boot",
            f"ALE 1D (all {F} features, 30 bins, 1 boot)",
            lambda: exp.ale(features="all", n_bins=30),
            2,
        ),
        (
            "ale_1d_all_10boot",
            f"ALE 1D (all {F} features, 30 bins, 10 boot)",
            lambda: exp.ale(features="all", n_bins=30, n_bootstrap=10),
            2,
        ),
        (
            "ale_1d_10feat_20boot",
            "ALE 1D (10 features, 30 bins, 20 boot)",
            lambda: exp.ale(
                features=first_features(10), n_bins=30, n_bootstrap=20
            ),
            2,
        ),
        # PD
        (
            "pd_1d_5feat_1boot",
            "PD 1D (5 feat, 30 bins, 1 boot)",
            lambda: exp.pd(features=first_features(5), n_bins=30),
            2,
        ),
        (
            "pd_1d_5feat_10boot",
            "PD 1D (5 feat, 30 bins, 10 boot)",
            lambda: exp.pd(
                features=first_features(5), n_bins=30, n_bootstrap=10
            ),
            2,
        ),
        (
            "pd_1d_5feat_20boot",
            "PD 1D (5 feat, 30 bins, 20 boot)",
            lambda: exp.pd(
                features=first_features(5), n_bins=30, n_bootstrap=20
            ),
            2,
        ),
        # ICE
        (
            "ice_3feat_30bins_200sub",
            "ICE (3 feat, 30 bins, 200 sub)",
            lambda: exp.ice(
                features=["f0", "f1", "f2"], n_bins=30, subsample=200
            ),
            2,
        ),
        # 2D ALE
        (
            "ale_2d_1pair_20bins",
            "2D ALE (1 pair, 20 bins)",
            lambda: exp.ale(features=[("f0", "f1")], n_bins=20),
            2,
        ),
        (
            "ale_2d_3pairs_15bins",
            "2D ALE (3 pairs, 15 bins)",
            lambda: exp.ale(
                features=[("f0", "f1"), ("f0", "f2"), ("f1", "f2")],
                n_bins=15,
            ),
            2,
        ),
        # Parallel comparison
        (
            "ale_1d_all_1boot_2jobs",
            f"ALE 1D (all {F}, 30 bins, 1 boot, n_jobs=2)",
            lambda: exp.ale(features="all", n_bins=30, n_jobs=2),
            2,
        ),
        (
            "pd_1d_5feat_10boot_2jobs",
            "PD 1D (5 feat, 30 bins, 10 boot, n_jobs=2)",
            lambda: exp.pd(
                features=first_features(5),
                n_bins=30,
                n_bootstrap=10,
                n_jobs=2,
            ),
            2,
        ),
    ]

    # Dicts preserve insertion order, so results come back in run order.
    results = {}
    for key, label, task, runs in cases:
        results[key] = bench(label, task, n_runs=runs)

    return results


if __name__ == "__main__":
all_results = {}
for n in [2000]:
all_results[n] = run_benchmarks(n)
# Standard benchmark
std_results = run_benchmarks(2000)

# Stress test
stress_results = run_stress_test()

print(f"\n{'='*60}")
print("Summary (seconds)")
print("SUMMARY")
print(f"{'='*60}")
for method, t in all_results[2000].items():
print("\nStandard (2000 samples, 10 features, 50 trees):")
for method, t in std_results.items():
print(f" {method}: {t:.4f}s")
print(f"\nStress (10000 samples, 30 features, 100 trees):")
for method, t in stress_results.items():
print(f" {method}: {t:.4f}s")
Loading