diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 5e30883e35..8799fcad4d 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -131,3 +131,7 @@ harness = false [[bench]] name = "parquet_decode" harness = false + +[[bench]] +name = "struct_conversion" +harness = false diff --git a/native/core/benches/struct_conversion.rs b/native/core/benches/struct_conversion.rs new file mode 100644 index 0000000000..953978e263 --- /dev/null +++ b/native/core/benches/struct_conversion.rs @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmark for struct column processing in native shuffle. +//! +//! This benchmark measures the performance of converting Spark UnsafeRow +//! with struct columns to Arrow arrays. + +use arrow::datatypes::{DataType, Field, Fields}; +use comet::execution::shuffle::row::{ + process_sorted_row_partition, SparkUnsafeObject, SparkUnsafeRow, +}; +use comet::execution::shuffle::CompressionCodec; +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use tempfile::Builder; + +const BATCH_SIZE: usize = 5000; + +/// Create a struct schema with the given number of int64 fields. 
+fn make_struct_schema(num_fields: usize) -> DataType { + let fields: Vec<Field> = (0..num_fields) + .map(|i| Field::new(format!("f{}", i), DataType::Int64, true)) + .collect(); + DataType::Struct(Fields::from(fields)) +} + +/// Calculate the row size for a struct with the given number of fields. +/// UnsafeRow layout: [null bits] [fixed-length values] +/// For struct: the struct value is stored as offset+size (8 bytes) pointing to nested row +fn get_row_size(num_struct_fields: usize) -> usize { + // Top-level row has 1 column (the struct) + let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); + // Struct pointer (offset + size) is 8 bytes + let struct_pointer_size = 8; + // Nested struct row + let nested_bitset_width = SparkUnsafeRow::get_row_bitset_width(num_struct_fields); + let nested_data_size = num_struct_fields * 8; // int64 values + + top_level_bitset_width + struct_pointer_size + nested_bitset_width + nested_data_size +} + +struct RowData { + data: Vec<u8>, +} + +impl RowData { + fn new(num_struct_fields: usize) -> Self { + let row_size = get_row_size(num_struct_fields); + let mut data = vec![0u8; row_size]; + + // Top-level row layout: + // [null bits for 1 field] [struct pointer (offset, size)] + let top_level_bitset_width = SparkUnsafeRow::get_row_bitset_width(1); + + // Nested struct starts after top-level row header + pointer + let nested_offset = top_level_bitset_width + 8; + let nested_bitset_width = SparkUnsafeRow::get_row_bitset_width(num_struct_fields); + let nested_size = nested_bitset_width + num_struct_fields * 8; + + // Write struct pointer (offset in upper 32 bits, size in lower 32 bits) + let offset_and_size = ((nested_offset as i64) << 32) | (nested_size as i64); + data[top_level_bitset_width..top_level_bitset_width + 8] + .copy_from_slice(&offset_and_size.to_le_bytes()); + + // Fill nested struct with some data + for i in 0..num_struct_fields { + let value_offset = nested_offset + nested_bitset_width + i * 8; + let value = (i as 
i64) * 100; + data[value_offset..value_offset + 8].copy_from_slice(&value.to_le_bytes()); + } + + RowData { data } + } + + fn to_spark_row(&self, spark_row: &mut SparkUnsafeRow) { + spark_row.point_to_slice(&self.data); + } +} + +fn benchmark_struct_conversion(c: &mut Criterion) { + let mut group = c.benchmark_group("struct_conversion"); + + // Test with different struct sizes and row counts + for num_fields in [5, 10, 20] { + for num_rows in [1000, 10000] { + let schema = vec![make_struct_schema(num_fields)]; + + // Create row data + let rows: Vec<RowData> = (0..num_rows).map(|_| RowData::new(num_fields)).collect(); + + let spark_rows: Vec<SparkUnsafeRow> = rows + .iter() + .map(|row_data| { + let mut spark_row = SparkUnsafeRow::new_with_num_fields(1); + row_data.to_spark_row(&mut spark_row); + // Mark the struct column as not null + spark_row.set_not_null_at(0); + spark_row + }) + .collect(); + + let mut row_addresses: Vec<i64> = + spark_rows.iter().map(|row| row.get_row_addr()).collect(); + let mut row_sizes: Vec<i32> = spark_rows.iter().map(|row| row.get_row_size()).collect(); + + let row_address_ptr = row_addresses.as_mut_ptr(); + let row_size_ptr = row_sizes.as_mut_ptr(); + + group.bench_with_input( + BenchmarkId::new( + format!("fields_{}", num_fields), + format!("rows_{}", num_rows), + ), + &(num_rows, &schema), + |b, (num_rows, schema)| { + b.iter(|| { + let tempfile = Builder::new().tempfile().unwrap(); + + process_sorted_row_partition( + *num_rows, + BATCH_SIZE, + row_address_ptr, + row_size_ptr, + schema, + tempfile.path().to_str().unwrap().to_string(), + 1.0, + false, + 0, + None, + &CompressionCodec::Zstd(1), + ) + .unwrap(); + }); + }, + ); + + // Keep spark_rows alive for the benchmark + std::mem::drop(spark_rows); + } + } + + group.finish(); +} + +fn config() -> Criterion { + Criterion::default() +} + +criterion_group! 
{ + name = benches; + config = config(); + targets = benchmark_struct_conversion +} +criterion_main!(benches); diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/row.rs index 821607ddb9..73a126ff67 100644 --- a/native/core/src/execution/shuffle/row.rs +++ b/native/core/src/execution/shuffle/row.rs @@ -439,6 +439,205 @@ pub(crate) fn append_field( Ok(()) } +/// Appends struct fields to the struct builder using field-major order. +/// This processes one field at a time across all rows, which moves type dispatch +/// outside the row loop (O(fields) dispatches instead of O(rows × fields)). +#[allow(clippy::redundant_closure_call, clippy::too_many_arguments)] +fn append_struct_fields_field_major( + row_addresses_ptr: *mut jlong, + row_sizes_ptr: *mut jint, + row_start: usize, + row_end: usize, + parent_row: &mut SparkUnsafeRow, + column_idx: usize, + struct_builder: &mut StructBuilder, + fields: &arrow::datatypes::Fields, +) -> Result<(), CometError> { + let num_rows = row_end - row_start; + let num_fields = fields.len(); + + // First pass: Build struct validity and collect which structs are null + // We use a Vec for simplicity; could use a bitset for better memory + let mut struct_is_null = Vec::with_capacity(num_rows); + + for i in row_start..row_end { + let row_addr = unsafe { *row_addresses_ptr.add(i) }; + let row_size = unsafe { *row_sizes_ptr.add(i) }; + parent_row.point_to(row_addr, row_size); + + let is_null = parent_row.is_null_at(column_idx); + struct_is_null.push(is_null); + + if is_null { + struct_builder.append_null(); + } else { + struct_builder.append(true); + } + } + + // Helper macro for processing primitive fields + macro_rules! 
process_field { + ($builder_type:ty, $field_idx:expr, $get_value:expr) => {{ + let field_builder = struct_builder + .field_builder::<$builder_type>($field_idx) + .unwrap(); + + for (row_idx, i) in (row_start..row_end).enumerate() { + if struct_is_null[row_idx] { + // Struct is null, field is also null + field_builder.append_null(); + } else { + let row_addr = unsafe { *row_addresses_ptr.add(i) }; + let row_size = unsafe { *row_sizes_ptr.add(i) }; + parent_row.point_to(row_addr, row_size); + let nested_row = parent_row.get_struct(column_idx, num_fields); + + if nested_row.is_null_at($field_idx) { + field_builder.append_null(); + } else { + field_builder.append_value($get_value(&nested_row, $field_idx)); + } + } + } + }}; + } + + // Second pass: Process each field across all rows + for (field_idx, field) in fields.iter().enumerate() { + match field.data_type() { + DataType::Boolean => { + process_field!(BooleanBuilder, field_idx, |row: &SparkUnsafeRow, idx| row + .get_boolean(idx)); + } + DataType::Int8 => { + process_field!(Int8Builder, field_idx, |row: &SparkUnsafeRow, idx| row + .get_byte(idx)); + } + DataType::Int16 => { + process_field!(Int16Builder, field_idx, |row: &SparkUnsafeRow, idx| row + .get_short(idx)); + } + DataType::Int32 => { + process_field!(Int32Builder, field_idx, |row: &SparkUnsafeRow, idx| row + .get_int(idx)); + } + DataType::Int64 => { + process_field!(Int64Builder, field_idx, |row: &SparkUnsafeRow, idx| row + .get_long(idx)); + } + DataType::Float32 => { + process_field!(Float32Builder, field_idx, |row: &SparkUnsafeRow, idx| row + .get_float(idx)); + } + DataType::Float64 => { + process_field!(Float64Builder, field_idx, |row: &SparkUnsafeRow, idx| row + .get_double(idx)); + } + DataType::Date32 => { + process_field!(Date32Builder, field_idx, |row: &SparkUnsafeRow, idx| row + .get_date(idx)); + } + DataType::Timestamp(TimeUnit::Microsecond, _) => { + process_field!( + TimestampMicrosecondBuilder, + field_idx, + |row: &SparkUnsafeRow, idx| 
row.get_timestamp(idx) + ); + } + DataType::Binary => { + let field_builder = struct_builder + .field_builder::<BinaryBuilder>(field_idx) + .unwrap(); + + for (row_idx, i) in (row_start..row_end).enumerate() { + if struct_is_null[row_idx] { + field_builder.append_null(); + } else { + let row_addr = unsafe { *row_addresses_ptr.add(i) }; + let row_size = unsafe { *row_sizes_ptr.add(i) }; + parent_row.point_to(row_addr, row_size); + let nested_row = parent_row.get_struct(column_idx, num_fields); + + if nested_row.is_null_at(field_idx) { + field_builder.append_null(); + } else { + field_builder.append_value(nested_row.get_binary(field_idx)); + } + } + } + } + DataType::Utf8 => { + let field_builder = struct_builder + .field_builder::<StringBuilder>(field_idx) + .unwrap(); + + for (row_idx, i) in (row_start..row_end).enumerate() { + if struct_is_null[row_idx] { + field_builder.append_null(); + } else { + let row_addr = unsafe { *row_addresses_ptr.add(i) }; + let row_size = unsafe { *row_sizes_ptr.add(i) }; + parent_row.point_to(row_addr, row_size); + let nested_row = parent_row.get_struct(column_idx, num_fields); + + if nested_row.is_null_at(field_idx) { + field_builder.append_null(); + } else { + field_builder.append_value(nested_row.get_string(field_idx)); + } + } + } + } + DataType::Decimal128(p, _) => { + let p = *p; + let field_builder = struct_builder + .field_builder::<Decimal128Builder>(field_idx) + .unwrap(); + + for (row_idx, i) in (row_start..row_end).enumerate() { + if struct_is_null[row_idx] { + field_builder.append_null(); + } else { + let row_addr = unsafe { *row_addresses_ptr.add(i) }; + let row_size = unsafe { *row_sizes_ptr.add(i) }; + parent_row.point_to(row_addr, row_size); + let nested_row = parent_row.get_struct(column_idx, num_fields); + + if nested_row.is_null_at(field_idx) { + field_builder.append_null(); + } else { + field_builder.append_value(nested_row.get_decimal(field_idx, p)); + } + } + } + } + // For complex types (struct, list, map), fall back to append_field + // since they have their 
own nested processing logic + dt @ (DataType::Struct(_) | DataType::List(_) | DataType::Map(_, _)) => { + for (row_idx, i) in (row_start..row_end).enumerate() { + let nested_row = if struct_is_null[row_idx] { + SparkUnsafeRow::default() + } else { + let row_addr = unsafe { *row_addresses_ptr.add(i) }; + let row_size = unsafe { *row_sizes_ptr.add(i) }; + parent_row.point_to(row_addr, row_size); + parent_row.get_struct(column_idx, num_fields) + }; + append_field(dt, struct_builder, &nested_row, field_idx)?; + } + } + _ => { + unreachable!( + "Unsupported data type of struct field: {:?}", + field.data_type() + ) + } + } + } + + Ok(()) +} + /// Appends column of top rows to the given array builder. #[allow(clippy::redundant_closure_call, clippy::too_many_arguments)] pub(crate) fn append_columns( @@ -637,27 +836,17 @@ pub(crate) fn append_columns( .expect("StructBuilder"); let mut row = SparkUnsafeRow::new(schema); - for i in row_start..row_end { - let row_addr = unsafe { *row_addresses_ptr.add(i) }; - let row_size = unsafe { *row_sizes_ptr.add(i) }; - row.point_to(row_addr, row_size); - - let is_null = row.is_null_at(column_idx); - - let nested_row = if is_null { - // The struct is null. - // Append a null value to the struct builder and field builders. - struct_builder.append_null(); - SparkUnsafeRow::default() - } else { - struct_builder.append(true); - row.get_struct(column_idx, fields.len()) - }; - - for (idx, field) in fields.into_iter().enumerate() { - append_field(field.data_type(), struct_builder, &nested_row, idx)?; - } - } + // Use field-major processing to avoid per-row type dispatch + append_struct_fields_field_major( + row_addresses_ptr, + row_sizes_ptr, + row_start, + row_end, + &mut row, + column_idx, + struct_builder, + fields, + )?; } _ => { unreachable!("Unsupported data type of column: {:?}", dt)