Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ crate-type = ["lib", "cdylib"]
# lib is needed for the native benchmark

[dependencies]
pyo3 = "0.27.2"
pyo3 = { version = "0.27.2", features = ["multiple-pymethods"] }
shakmaty = "0.30"
pgn-reader = "0.29.0"
nom = "8.0"
Expand Down
166 changes: 61 additions & 105 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,59 @@ mod visitor;
use python_bindings::{ChunkData, ParsedGames, ParsedGamesIter, PyChunkView, PyGameView};
pub use visitor::{Buffers, ParseConfig, parse_game_to_buffers};

/// Shared parallel parsing logic for a slice of PGN strings.
///
/// Both `parse_games` and `parse_games_from_strings` delegate here after
/// extracting their `&str` slices from different input types.
fn parse_str_slices(
py: Python<'_>,
slices: &[&str],
num_threads: usize,
chunk_multiplier: usize,
config: &ParseConfig,
) -> PyResult<ParsedGames> {
let n_games = slices.len();
if n_games == 0 {
let empty_chunk = buffers_to_chunk_data(py, Buffers::default())?;
return build_parsed_games(py, vec![empty_chunk]);
}

let thread_pool = ThreadPoolBuilder::new()
.num_threads(num_threads)
.build()
.map_err(|e| {
PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
"Failed to build thread pool: {}",
e
))
})?;

// num_chunks = num_threads * chunk_multiplier (e.g., 16 threads * 1 = 16 chunks)
let num_chunks = num_threads * chunk_multiplier;
let chunk_size = ((n_games + num_chunks - 1) / num_chunks).max(1);
let moves_per_game = 70;

let chunk_results: Vec<Buffers> = thread_pool.install(|| {
slices
.par_chunks(chunk_size)
.map(|chunk| {
let mut buffers = Buffers::with_capacity(chunk_size, moves_per_game, config);
for &pgn in chunk {
let _ = parse_game_to_buffers(pgn, &mut buffers, config);
}
buffers
})
.collect()
});

let chunk_data_vec: Vec<ChunkData> = chunk_results
.into_iter()
.map(|buf| buffers_to_chunk_data(py, buf))
.collect::<PyResult<Vec<_>>>()?;

build_parsed_games(py, chunk_data_vec)
}

/// Parse games from Arrow chunked array into a chunked ParsedGames container.
///
/// This implementation uses explicit chunking with a fixed number of chunks
Expand Down Expand Up @@ -43,11 +96,7 @@ fn parse_games(
let chunk_multiplier = chunk_multiplier.unwrap_or(1);

// Extract PGN strings from Arrow chunks
let mut num_elements = 0;
for chunk in pgn_chunked_array.chunks() {
num_elements += chunk.len();
}

let num_elements: usize = pgn_chunked_array.chunks().iter().map(|c| c.len()).sum();
let mut pgn_str_slices: Vec<&str> = Vec::with_capacity(num_elements);
for chunk in pgn_chunked_array.chunks() {
if let Some(string_array) = chunk.as_any().downcast_ref::<StringArray>() {
Expand All @@ -70,56 +119,7 @@ fn parse_games(
}
}

let n_games = pgn_str_slices.len();
if n_games == 0 {
let empty_chunk = buffers_to_chunk_data(py, Buffers::default())?;
return build_parsed_games(py, vec![empty_chunk]);
}

// Build thread pool
let thread_pool = ThreadPoolBuilder::new()
.num_threads(num_threads)
.build()
.map_err(|e| {
PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
"Failed to build thread pool: {}",
e
))
})?;

// Calculate chunk size for explicit chunking.
// num_chunks = num_threads * chunk_multiplier (e.g., 16 threads * 1 = 16 chunks)
let num_chunks = num_threads * chunk_multiplier;
let chunk_size = (n_games + num_chunks - 1) / num_chunks; // ceiling division
let chunk_size = chunk_size.max(1); // ensure at least 1 game per chunk

// Estimate capacity per chunk
let games_per_chunk = chunk_size;
let moves_per_game = 70;

// Parse in parallel using par_chunks for explicit, fixed-size chunking.
// This creates exactly ceil(n_games / chunk_size) Buffers instances,
// avoiding the allocation storm from Rayon's dynamic work-stealing.
let chunk_results: Vec<Buffers> = thread_pool.install(|| {
pgn_str_slices
.par_chunks(chunk_size)
.map(|chunk| {
let mut buffers = Buffers::with_capacity(games_per_chunk, moves_per_game, &config);
for &pgn in chunk {
let _ = parse_game_to_buffers(pgn, &mut buffers, &config);
}
buffers
})
.collect()
});

// Convert each Buffers to ChunkData (numpy arrays) — no merge needed
let chunk_data_vec: Vec<ChunkData> = chunk_results
.into_iter()
.map(|buf| buffers_to_chunk_data(py, buf))
.collect::<PyResult<Vec<_>>>()?;

build_parsed_games(py, chunk_data_vec)
parse_str_slices(py, &pgn_str_slices, num_threads, chunk_multiplier, &config)
}

/// Convert a single Buffers into a ChunkData with NumPy arrays.
Expand Down Expand Up @@ -169,15 +169,9 @@ fn buffers_to_chunk_data(py: Python<'_>, buffers: Buffers) -> PyResult<ChunkData
let is_stalemate_array = PyArray1::from_vec(py, buffers.is_stalemate);

let is_insufficient_array = PyArray1::from_vec(py, buffers.is_insufficient);
let is_insufficient_reshaped = if n_games > 0 {
is_insufficient_array
.reshape([n_games, 2])
.map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))?
} else {
is_insufficient_array
.reshape([0, 2])
.map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))?
};
let is_insufficient_reshaped = is_insufficient_array
.reshape([n_games, 2])
.map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))?;

let legal_move_count_array = PyArray1::from_vec(py, buffers.legal_move_count);
let valid_array = PyArray1::from_vec(py, buffers.valid);
Expand Down Expand Up @@ -207,6 +201,7 @@ fn buffers_to_chunk_data(py: Python<'_>, buffers: Buffers) -> PyResult<ChunkData
valid: valid_array.unbind().into_any(),
headers: buffers.headers,
outcome: buffers.outcome,
parse_errors: buffers.parse_errors,
comments: buffers.comments,
legal_move_from_squares: legal_move_from_squares_array.unbind().into_any(),
legal_move_to_squares: legal_move_to_squares_array.unbind().into_any(),
Expand Down Expand Up @@ -336,47 +331,8 @@ fn parse_games_from_strings(
store_legal_moves,
};
let num_threads = num_threads.unwrap_or_else(num_cpus::get);

let n_games = pgns.len();
if n_games == 0 {
let empty_chunk = buffers_to_chunk_data(py, Buffers::default())?;
return build_parsed_games(py, vec![empty_chunk]);
}

let thread_pool = ThreadPoolBuilder::new()
.num_threads(num_threads)
.build()
.map_err(|e| {
PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
"Failed to build thread pool: {}",
e
))
})?;

let num_chunks = num_threads;
let chunk_size = (n_games + num_chunks - 1) / num_chunks;
let chunk_size = chunk_size.max(1);
let games_per_chunk = chunk_size;
let moves_per_game = 70;

let chunk_results: Vec<Buffers> = thread_pool.install(|| {
pgns.par_chunks(chunk_size)
.map(|chunk| {
let mut buffers = Buffers::with_capacity(games_per_chunk, moves_per_game, &config);
for pgn in chunk {
let _ = parse_game_to_buffers(pgn, &mut buffers, &config);
}
buffers
})
.collect()
});

let chunk_data_vec: Vec<ChunkData> = chunk_results
.into_iter()
.map(|buf| buffers_to_chunk_data(py, buf))
.collect::<PyResult<Vec<_>>>()?;

build_parsed_games(py, chunk_data_vec)
let str_slices: Vec<&str> = pgns.iter().map(|s| s.as_str()).collect();
parse_str_slices(py, &str_slices, num_threads, 1, &config)
}

/// Parser for chess PGN notation
Expand Down
Loading