imap with heuristic chunksize to speed up bead spread calculation #41
omgol411 merged 1 commit into isblab:main
Conversation
- density and bead spread calculation are combined in a single function `get_bead_spread`, which is called in parallel
- `chunksize` is determined heuristically, similar to `map`
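The heuristic mirrors what `Pool.map` does internally when no `chunksize` is given: split the work into roughly four chunks per worker. A minimal sketch of the rule (function and variable names here are illustrative, not taken from the PR):

```python
from multiprocessing import Pool

def _square(i):
    # stand-in for the per-bead work done by get_bead_spread
    return i * i

def heuristic_chunksize(n_items, n_workers):
    # same rule Pool.map uses when chunksize is omitted:
    # divide the items into about 4 chunks per worker, rounding up
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize

if __name__ == "__main__":
    n, workers = 1000, 4
    cs = heuristic_chunksize(n, workers)  # divmod(1000, 16) -> (62, 8), so 63
    with Pool(workers) as p:
        results = list(p.imap(_square, range(n), chunksize=cs))
```

Larger chunks amortize the per-task IPC overhead that makes a bare `imap` (default `chunksize=1`) slow on many small tasks.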
📝 Walkthrough
The PR optimizes parallel bead spread computation by introducing a helper function that combines density and spread calculations, replacing a two-phase parallelization approach with direct parallel iteration using dynamic core selection and explicit chunking.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/main.py`:
- Around line 106-109: The current computation uses args.cores directly so a
zero or negative CLI value makes cores_ non-positive and breaks chunk sizing;
clamp/validate args.cores first (e.g., convert to int and ensure at least 1, and
optionally at most os.cpu_count() - 1), then compute cores_ using that validated
value; update the logic around cores_, args.cores, coords.shape[1], chunksize
and extra so cores_ is always >=1 before performing divmod on coords.shape[1].
- Around line 86-90: get_bead_spread currently forces binding of large objects
via functools.partial which causes huge per-task serialization; instead add an
initializer function (e.g., init_worker) that accepts and stores module-level
globals (coords, mass, radius, grid, voxel_size, n_breaks) for each worker,
change get_bead_spread(i) to only accept the bead index and call
main_density_calc(i, coords, mass, radius, grid, voxel_size, n_breaks) and
calc_bead_spread(density, grid) using those module-level variables, and update
the Pool construction to use initializer=init_worker with
initializer_args=(coords, mass, radius, grid, voxel_size, n_breaks) and map only
the indices (removing functools.partial usage).
```python
def get_bead_spread(i, coords, mass, radius, grid, voxel_size, n_breaks):
    # dummy function to call main_density_calc for parallelization
    density = main_density_calc(i, coords, mass, radius, grid, voxel_size, n_breaks)
    spread = calc_bead_spread(density, grid)
    return spread
```
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
python - <<'PY'
from functools import partial
import inspect
import multiprocessing.pool as mp_pool
import pickle

def worker(i, coords, mass, radius, grid, voxel_size, n_breaks):
    return i

coords = b"x" * 5_000_000
mass = [0] * 5000
radius = [0] * 5000
grid = {"cells": list(range(5000))}
bound = partial(worker, coords=coords, mass=mass, radius=radius, grid=grid, voxel_size=4, n_breaks=50)
print("serialized_partial_bytes =", len(pickle.dumps(bound)))
print("--- multiprocessing.pool.Pool._get_tasks ---")
print(inspect.getsource(mp_pool.Pool._get_tasks))
PY
```

Repository: isblab/prism
Length of output: 352

🏁 Script executed:

```bash
# Read the file to check if it exists and get size
wc -l src/main.py && head -1 src/main.py
```

Repository: isblab/prism
Length of output: 90

🏁 Script executed:

```bash
# Read the relevant sections of the file
sed -n '80,120p' src/main.py | cat -n
```

Repository: isblab/prism
Length of output: 2045

🏁 Script executed:

```bash
# Check for functools import and verify partial is used
grep -n "import\|from" src/main.py | head -20
```

Repository: isblab/prism
Length of output: 573

🏁 Script executed:

```bash
# Verify the exact line numbers of Pool usage and get_bead_spread definition
grep -n "def get_bead_spread\|with Pool\|partial(" src/main.py
```

Repository: isblab/prism
Length of output: 381
Use initializer= instead of binding large objects via partial().
The helper function at line 86 forces line 113 to close over coords, mass, radius, and grid using functools.partial. With multiprocessing.Pool, the bound partial is serialized into every task batch instead of just the bead index i. On large inputs, this ~5 MB serialization overhead per batch can dominate runtime and memory, negating the parallelization speedup.
Load the shared data once per worker via initializer= and keep only the index in the queued argument:
Suggested pattern

```diff
+_bead_spread_ctx = {}
+
+def _init_bead_spread_worker(coords, mass, radius, grid, voxel_size, n_breaks):
+    _bead_spread_ctx.update({
+        "coords": coords,
+        "mass": mass,
+        "radius": radius,
+        "grid": grid,
+        "voxel_size": voxel_size,
+        "n_breaks": n_breaks,
+    })
+
-def get_bead_spread(i, coords, mass, radius, grid, voxel_size, n_breaks):
-    # dummy function to call main_density_calc for parallelization
-    density = main_density_calc(i, coords, mass, radius, grid, voxel_size, n_breaks)
-    spread = calc_bead_spread(density, grid)
+def get_bead_spread(i):
+    density = main_density_calc(
+        i,
+        _bead_spread_ctx["coords"],
+        _bead_spread_ctx["mass"],
+        _bead_spread_ctx["radius"],
+        _bead_spread_ctx["grid"],
+        _bead_spread_ctx["voxel_size"],
+        _bead_spread_ctx["n_breaks"],
+    )
+    spread = calc_bead_spread(density, _bead_spread_ctx["grid"])
     return spread

-    with Pool(cores_) as p:
+    with Pool(
+        cores_,
+        initializer=_init_bead_spread_worker,
+        initargs=(coords, mass, radius, grid, args.voxel_size, args.n_breaks),
+    ) as p:
         bead_spread = []
-        for spread in tqdm.tqdm( p.imap( partial(get_bead_spread, coords=coords, mass=mass, radius=radius, grid=grid, voxel_size=args.voxel_size, n_breaks=args.n_breaks), range(coords.shape[1]), chunksize=chunksize ) ):
+        for spread in tqdm.tqdm( p.imap( get_bead_spread, range(coords.shape[1]), chunksize=chunksize ) ):
             bead_spread.append( spread )
```
```python
cores_ = min(max(os.cpu_count() - 1, 1), args.cores)
chunksize, extra = divmod(coords.shape[1], cores_ * 4)
if extra:
    chunksize += 1
```
Clamp the derived worker count before chunk sizing.
args.cores is unvalidated here, so --cores 0 or a negative value makes cores_ non-positive and breaks the arithmetic in this block. Clamp the CLI value before taking the minimum.
🩹 Minimal fix

```diff
-    cores_ = min(max(os.cpu_count() - 1, 1), args.cores)
+    cores_ = min(max(os.cpu_count() - 1, 1), max(args.cores, 1))
```
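A small sketch of why the clamp matters (helper names are illustrative): with `--cores 0`, the unclamped expression yields `cores_ = 0`, and the subsequent `divmod(coords.shape[1], cores_ * 4)` raises `ZeroDivisionError`; a negative value silently produces a nonsense negative chunk size.

```python
import os

def safe_cores(requested, cpu_count=None):
    # clamp the CLI value to at least 1 first, then cap at cpu_count - 1
    cpu_count = cpu_count or os.cpu_count() or 1
    return min(max(cpu_count - 1, 1), max(int(requested), 1))

def chunk_size(n_items, cores):
    # the PR's heuristic: ~4 chunks per worker, rounded up
    chunksize, extra = divmod(n_items, cores * 4)
    return chunksize + 1 if extra else chunksize

# unclamped: min(max(8 - 1, 1), 0) == 0, then divmod(n, 0) raises
print(safe_cores(0, cpu_count=8))    # clamped to 1
print(safe_cores(16, cpu_count=8))   # capped at 7
print(chunk_size(5000, safe_cores(16, cpu_count=8)))
```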
Regarding the patch suggested by coderabbit, we don't need to use
Addressing #39