diff --git a/skexplain/common/multiprocessing_utils.py b/skexplain/common/multiprocessing_utils.py index cd238f0..c745993 100644 --- a/skexplain/common/multiprocessing_utils.py +++ b/skexplain/common/multiprocessing_utils.py @@ -145,7 +145,10 @@ def run_parallel( total = len(args_list) n_jobs = _resolve_n_jobs(n_jobs) - is_parallel = n_jobs != 1 + # Auto-select: only go parallel if there are enough tasks to justify + # the overhead of spawning workers. For small task counts, serial is faster. + MIN_TASKS_FOR_PARALLEL = 3 + is_parallel = n_jobs != 1 and total >= MIN_TASKS_FOR_PARALLEL logger.debug( "run_parallel: %s (%d tasks, n_jobs=%d, parallel=%s)", @@ -155,8 +158,11 @@ def run_parallel( start_time = time.perf_counter() if is_parallel: + # Use 'threading' backend by default — avoids pickling overhead + # and works well when the GIL is released (sklearn predict, numpy ops). + backend = "threading" with tqdm_joblib(tqdm(total=total, desc=description)): - results = Parallel(n_jobs=n_jobs, backend="loky")( + results = Parallel(n_jobs=n_jobs, backend=backend)( delayed(_safe_call)(func, _ensure_tuple(args), kwargs) for args in args_list ) diff --git a/skexplain/main/PermutationImportance/selection_strategies.py b/skexplain/main/PermutationImportance/selection_strategies.py index 92d85c0..66d316a 100644 --- a/skexplain/main/PermutationImportance/selection_strategies.py +++ b/skexplain/main/PermutationImportance/selection_strategies.py @@ -169,12 +169,32 @@ def __init__( scoring_inputs.index if isinstance(scoring_inputs, pd.DataFrame) else None ) + # Pre-convert to numpy for fast column swapping + self._scoring_np = ( + scoring_inputs.values if isinstance(scoring_inputs, pd.DataFrame) + else np.asarray(scoring_inputs) + ) + self._shuffled_np = ( + self.shuffled_scoring_inputs.values + if isinstance(self.shuffled_scoring_inputs, pd.DataFrame) + else np.asarray(self.shuffled_scoring_inputs) + ) + def generate_datasets(self, important_variables): """Check each of the non-important variables. Dataset has columns which - are important shuffled + are important shuffled. :returns: (training_data, scoring_data) """ scoring_inputs, scoring_outputs = self.scoring_data + + # Fast path: numpy in-place column swap instead of pd.concat reassembly + if hasattr(self, '_scoring_np'): + complete = self._scoring_np.copy() + for i in important_variables: + complete[:, i] = self._shuffled_np[:, i] + return self.training_data, (complete, scoring_outputs) + + # Fallback: original pandas path complete_scoring_inputs = make_data_from_columns( [ get_data_subset( diff --git a/skexplain/main/PermutationImportance/sklearn_api.py b/skexplain/main/PermutationImportance/sklearn_api.py index 8d8a485..e552f24 100755 --- a/skexplain/main/PermutationImportance/sklearn_api.py +++ b/skexplain/main/PermutationImportance/sklearn_api.py @@ -207,7 +207,20 @@ def __call__(self, training_data, scoring_data, var_idx): (self.X_score, self.y_score) = scoring_data permuted_set = [self.get_permuted_data(idx, var_idx) for idx in range(self.n_permute)] - scores = np.array([self._scorer(*arg) for arg in permuted_set]) + + if self.n_permute > 1 and var_idx is not None: + # Batch all permutations into a single predict call for speed. + # Stack X arrays, predict once, then split and evaluate. + X_all = np.vstack([X for X, _ in permuted_set]) + n_per = len(permuted_set[0][0]) + all_preds = self.prediction_fn(self.model, X_all) + scores = [] + for i, (_, y_i) in enumerate(permuted_set): + preds_i = all_preds[i * n_per:(i + 1) * n_per] + scores.append(self.evaluation_fn(y_i, preds_i)) + scores = np.array(scores) + else: + scores = np.array([self._scorer(*arg) for arg in permuted_set]) return np.array(scores) diff --git a/tests/benchmark_suite.py b/tests/benchmark_suite.py index 0211fb5..a7f9d6c 100644 --- a/tests/benchmark_suite.py +++ b/tests/benchmark_suite.py @@ -96,13 +96,128 @@ def run_benchmarks(n_samples=2000): return results +def run_stress_test(): + """Heavy benchmark: 10000 samples, 30 features, 100 trees.""" + N, F, T = 10000, 30, 100 + print(f"\n{'='*60}") + print(f"STRESS TEST: {N} samples, {F} features, {T}-tree RF") + print(f"{'='*60}") + + np.random.seed(42) + X = pd.DataFrame( + np.random.randn(N, F), + columns=[f"f{i}" for i in range(F)], + ) + y = (X["f0"] * 2 + X["f1"] - X["f2"] * 0.5 > 0).astype(int).values + rf = RandomForestClassifier( + n_estimators=T, max_depth=8, random_state=42, n_jobs=1, + ) + rf.fit(X, y) + exp = skexplain.ExplainToolkit([("RF", rf)], X=X, y=y) + + results = {} + + # Baseline: raw predict overhead + results["predict_proba_10x"] = bench( + f"Raw predict_proba ×10 ({N} samples)", + lambda: [rf.predict_proba(X.values) for _ in range(10)], + n_runs=3, + ) + + # Permutation importance + results["perm_imp_10v_10p"] = bench( + "Perm Imp (10 vars, 10 permutes)", + lambda: exp.permutation_importance(n_vars=10, evaluation_fn="auc", n_permute=10), + n_runs=2, + ) + + # ALE + results["ale_1d_all_1boot"] = bench( + f"ALE 1D (all {F} features, 30 bins, 1 boot)", + lambda: exp.ale(features="all", n_bins=30), + n_runs=2, + ) + + results["ale_1d_all_10boot"] = bench( + f"ALE 1D (all {F} features, 30 bins, 10 boot)", + lambda: exp.ale(features="all", n_bins=30, n_bootstrap=10), + n_runs=2, + ) + + results["ale_1d_10feat_20boot"] = bench( + "ALE 1D (10 features, 30 bins, 20 boot)", + lambda: exp.ale(features=[f"f{i}" for i in range(10)], n_bins=30, n_bootstrap=20), + n_runs=2, + ) + + # PD + results["pd_1d_5feat_1boot"] = bench( + "PD 1D (5 feat, 30 bins, 1 boot)", + lambda: exp.pd(features=[f"f{i}" for i in range(5)], n_bins=30), + n_runs=2, + ) + + results["pd_1d_5feat_10boot"] = bench( + "PD 1D (5 feat, 30 bins, 10 boot)", + lambda: exp.pd(features=[f"f{i}" for i in range(5)], n_bins=30, n_bootstrap=10), + n_runs=2, + ) + + results["pd_1d_5feat_20boot"] = bench( + "PD 1D (5 feat, 30 bins, 20 boot)", + lambda: exp.pd(features=[f"f{i}" for i in range(5)], n_bins=30, n_bootstrap=20), + n_runs=2, + ) + + # ICE + results["ice_3feat_30bins_200sub"] = bench( + "ICE (3 feat, 30 bins, 200 sub)", + lambda: exp.ice(features=["f0", "f1", "f2"], n_bins=30, subsample=200), + n_runs=2, + ) + + # 2D ALE + results["ale_2d_1pair_20bins"] = bench( + "2D ALE (1 pair, 20 bins)", + lambda: exp.ale(features=[("f0", "f1")], n_bins=20), + n_runs=2, + ) + + results["ale_2d_3pairs_15bins"] = bench( + "2D ALE (3 pairs, 15 bins)", + lambda: exp.ale(features=[("f0", "f1"), ("f0", "f2"), ("f1", "f2")], n_bins=15), + n_runs=2, + ) + + # Parallel comparison + results["ale_1d_all_1boot_2jobs"] = bench( + f"ALE 1D (all {F}, 30 bins, 1 boot, n_jobs=2)", + lambda: exp.ale(features="all", n_bins=30, n_jobs=2), + n_runs=2, + ) + + results["pd_1d_5feat_10boot_2jobs"] = bench( + "PD 1D (5 feat, 30 bins, 10 boot, n_jobs=2)", + lambda: exp.pd(features=[f"f{i}" for i in range(5)], n_bins=30, n_bootstrap=10, n_jobs=2), + n_runs=2, + ) + + return results + + if __name__ == "__main__": - all_results = {} - for n in [2000]: - all_results[n] = run_benchmarks(n) + # Standard benchmark + std_results = run_benchmarks(2000) + + # Stress test + stress_results = run_stress_test() print(f"\n{'='*60}") - print("Summary (seconds)") + print("SUMMARY") print(f"{'='*60}") - for method, t in all_results[2000].items(): + print("\nStandard (2000 samples, 10 features, 50 trees):") + for method, t in std_results.items(): + print(f" {method}: {t:.4f}s") + print(f"\nStress (10000 samples, 30 features, 100 trees):") + for method, t in stress_results.items(): print(f" {method}: {t:.4f}s")