diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 5e30883e35..a4622eb932 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -131,3 +131,7 @@ harness = false [[bench]] name = "parquet_decode" harness = false + +[[bench]] +name = "array_conversion" +harness = false diff --git a/native/core/benches/array_conversion.rs b/native/core/benches/array_conversion.rs new file mode 100644 index 0000000000..5007bff1ce --- /dev/null +++ b/native/core/benches/array_conversion.rs @@ -0,0 +1,269 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmarks for SparkUnsafeArray to Arrow array conversion. +//! This specifically tests the append_to_builder function used in shuffle read path. + +use arrow::array::builder::{ + Date32Builder, Float64Builder, Int32Builder, Int64Builder, TimestampMicrosecondBuilder, +}; +use arrow::datatypes::{DataType, TimeUnit}; +use comet::execution::shuffle::list::{append_to_builder, SparkUnsafeArray}; +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; + +const NUM_ELEMENTS: usize = 10000; + +/// Create a SparkUnsafeArray in memory with i32 elements. +/// Layout: +/// - 8 bytes: num_elements (i64) +/// - null bitset: 8 bytes per 64 elements +/// - element data: 4 bytes per element (i32) +fn create_spark_unsafe_array_i32(num_elements: usize, with_nulls: bool) -> Vec { + // Header size: 8 (num_elements) + ceil(num_elements/64) * 8 (null bitset) + let null_bitset_words = num_elements.div_ceil(64); + let header_size = 8 + null_bitset_words * 8; + let data_size = num_elements * 4; // i32 = 4 bytes + let total_size = header_size + data_size; + + let mut buffer = vec![0u8; total_size]; + + // Write num_elements + buffer[0..8].copy_from_slice(&(num_elements as i64).to_le_bytes()); + + // Write null bitset (set every 10th element as null if with_nulls) + if with_nulls { + for i in (0..num_elements).step_by(10) { + let word_idx = i / 64; + let bit_idx = i % 64; + let word_offset = 8 + word_idx * 8; + let current_word = + i64::from_le_bytes(buffer[word_offset..word_offset + 8].try_into().unwrap()); + let new_word = current_word | (1i64 << bit_idx); + buffer[word_offset..word_offset + 8].copy_from_slice(&new_word.to_le_bytes()); + } + } + + // Write element data + for i in 0..num_elements { + let offset = header_size + i * 4; + buffer[offset..offset + 4].copy_from_slice(&(i as i32).to_le_bytes()); + } + + buffer +} + +/// Create a SparkUnsafeArray in memory with i64 elements. +fn create_spark_unsafe_array_i64(num_elements: usize, with_nulls: bool) -> Vec { + let null_bitset_words = num_elements.div_ceil(64); + let header_size = 8 + null_bitset_words * 8; + let data_size = num_elements * 8; // i64 = 8 bytes + let total_size = header_size + data_size; + + let mut buffer = vec![0u8; total_size]; + + // Write num_elements + buffer[0..8].copy_from_slice(&(num_elements as i64).to_le_bytes()); + + // Write null bitset + if with_nulls { + for i in (0..num_elements).step_by(10) { + let word_idx = i / 64; + let bit_idx = i % 64; + let word_offset = 8 + word_idx * 8; + let current_word = + i64::from_le_bytes(buffer[word_offset..word_offset + 8].try_into().unwrap()); + let new_word = current_word | (1i64 << bit_idx); + buffer[word_offset..word_offset + 8].copy_from_slice(&new_word.to_le_bytes()); + } + } + + // Write element data + for i in 0..num_elements { + let offset = header_size + i * 8; + buffer[offset..offset + 8].copy_from_slice(&(i as i64).to_le_bytes()); + } + + buffer +} + +/// Create a SparkUnsafeArray in memory with f64 elements. +fn create_spark_unsafe_array_f64(num_elements: usize, with_nulls: bool) -> Vec { + let null_bitset_words = num_elements.div_ceil(64); + let header_size = 8 + null_bitset_words * 8; + let data_size = num_elements * 8; // f64 = 8 bytes + let total_size = header_size + data_size; + + let mut buffer = vec![0u8; total_size]; + + // Write num_elements + buffer[0..8].copy_from_slice(&(num_elements as i64).to_le_bytes()); + + // Write null bitset + if with_nulls { + for i in (0..num_elements).step_by(10) { + let word_idx = i / 64; + let bit_idx = i % 64; + let word_offset = 8 + word_idx * 8; + let current_word = + i64::from_le_bytes(buffer[word_offset..word_offset + 8].try_into().unwrap()); + let new_word = current_word | (1i64 << bit_idx); + buffer[word_offset..word_offset + 8].copy_from_slice(&new_word.to_le_bytes()); + } + } + + // Write element data + for i in 0..num_elements { + let offset = header_size + i * 8; + buffer[offset..offset + 8].copy_from_slice(&(i as f64).to_le_bytes()); + } + + buffer +} + +fn benchmark_array_conversion(c: &mut Criterion) { + let mut group = c.benchmark_group("spark_unsafe_array_to_arrow"); + + // Benchmark i32 array conversion + for with_nulls in [false, true] { + let buffer = create_spark_unsafe_array_i32(NUM_ELEMENTS, with_nulls); + let array = SparkUnsafeArray::new(buffer.as_ptr() as i64); + let null_str = if with_nulls { "with_nulls" } else { "no_nulls" }; + + group.bench_with_input( + BenchmarkId::new("i32", null_str), + &(&array, &buffer), + |b, (array, _buffer)| { + b.iter(|| { + let mut builder = Int32Builder::with_capacity(NUM_ELEMENTS); + if with_nulls { + append_to_builder::(&DataType::Int32, &mut builder, array).unwrap(); + } else { + append_to_builder::(&DataType::Int32, &mut builder, array).unwrap(); + } + builder.finish() + }); + }, + ); + } + + // Benchmark i64 array conversion + for with_nulls in [false, true] { + let buffer = create_spark_unsafe_array_i64(NUM_ELEMENTS, with_nulls); + let array = SparkUnsafeArray::new(buffer.as_ptr() as i64); + let null_str = if with_nulls { "with_nulls" } else { "no_nulls" }; + + group.bench_with_input( + BenchmarkId::new("i64", null_str), + &(&array, &buffer), + |b, (array, _buffer)| { + b.iter(|| { + let mut builder = Int64Builder::with_capacity(NUM_ELEMENTS); + if with_nulls { + append_to_builder::(&DataType::Int64, &mut builder, array).unwrap(); + } else { + append_to_builder::(&DataType::Int64, &mut builder, array).unwrap(); + } + builder.finish() + }); + }, + ); + } + + // Benchmark f64 array conversion + for with_nulls in [false, true] { + let buffer = create_spark_unsafe_array_f64(NUM_ELEMENTS, with_nulls); + let array = SparkUnsafeArray::new(buffer.as_ptr() as i64); + let null_str = if with_nulls { "with_nulls" } else { "no_nulls" }; + + group.bench_with_input( + BenchmarkId::new("f64", null_str), + &(&array, &buffer), + |b, (array, _buffer)| { + b.iter(|| { + let mut builder = Float64Builder::with_capacity(NUM_ELEMENTS); + if with_nulls { + append_to_builder::(&DataType::Float64, &mut builder, array).unwrap(); + } else { + append_to_builder::(&DataType::Float64, &mut builder, array) + .unwrap(); + } + builder.finish() + }); + }, + ); + } + + // Benchmark date32 array conversion (same memory layout as i32) + for with_nulls in [false, true] { + let buffer = create_spark_unsafe_array_i32(NUM_ELEMENTS, with_nulls); + let array = SparkUnsafeArray::new(buffer.as_ptr() as i64); + let null_str = if with_nulls { "with_nulls" } else { "no_nulls" }; + + group.bench_with_input( + BenchmarkId::new("date32", null_str), + &(&array, &buffer), + |b, (array, _buffer)| { + b.iter(|| { + let mut builder = Date32Builder::with_capacity(NUM_ELEMENTS); + if with_nulls { + append_to_builder::(&DataType::Date32, &mut builder, array).unwrap(); + } else { + append_to_builder::(&DataType::Date32, &mut builder, array).unwrap(); + } + builder.finish() + }); + }, + ); + } + + // Benchmark timestamp array conversion (same memory layout as i64) + for with_nulls in [false, true] { + let buffer = create_spark_unsafe_array_i64(NUM_ELEMENTS, with_nulls); + let array = SparkUnsafeArray::new(buffer.as_ptr() as i64); + let null_str = if with_nulls { "with_nulls" } else { "no_nulls" }; + + group.bench_with_input( + BenchmarkId::new("timestamp", null_str), + &(&array, &buffer), + |b, (array, _buffer)| { + b.iter(|| { + let mut builder = TimestampMicrosecondBuilder::with_capacity(NUM_ELEMENTS); + let dt = DataType::Timestamp(TimeUnit::Microsecond, None); + if with_nulls { + append_to_builder::(&dt, &mut builder, array).unwrap(); + } else { + append_to_builder::(&dt, &mut builder, array).unwrap(); + } + builder.finish() + }); + }, + ); + } + + group.finish(); +} + +fn config() -> Criterion { + Criterion::default() +} + +criterion_group! { + name = benches; + config = config(); + targets = benchmark_array_conversion +} +criterion_main!(benches); diff --git a/native/core/src/execution/shuffle/list.rs b/native/core/src/execution/shuffle/list.rs index c31244b87d..6236bdf5c1 100644 --- a/native/core/src/execution/shuffle/list.rs +++ b/native/core/src/execution/shuffle/list.rs @@ -90,6 +90,304 @@ impl SparkUnsafeArray { (word & mask) != 0 } } + + /// Returns the null bitset pointer (starts at row_addr + 8). + #[inline] + pub(crate) fn null_bitset_ptr(&self) -> *const i64 { + (self.row_addr + 8) as *const i64 + } + + /// Bulk append i32 values to builder. + /// For non-nullable: uses slice append for optimal performance. + /// For nullable: uses pointer iteration with efficient null bitset reading. + pub(crate) fn append_ints_to_builder(&self, builder: &mut Int32Builder) { + let num_elements = self.num_elements; + if num_elements == 0 { + return; + } + + if NULLABLE { + let mut ptr = self.element_offset as *const i32; + let null_words = self.null_bitset_ptr(); + for idx in 0..num_elements { + let word_idx = idx >> 6; + let bit_idx = idx & 0x3f; + let is_null = unsafe { (*null_words.add(word_idx) & (1i64 << bit_idx)) != 0 }; + + if is_null { + builder.append_null(); + } else { + builder.append_value(unsafe { *ptr }); + } + ptr = unsafe { ptr.add(1) }; + } + } else { + // Use slice-based append for non-nullable path (much faster) + let slice = unsafe { + std::slice::from_raw_parts(self.element_offset as *const i32, num_elements) + }; + builder.append_slice(slice); + } + } + + /// Bulk append i64 values to builder. + pub(crate) fn append_longs_to_builder(&self, builder: &mut Int64Builder) { + let num_elements = self.num_elements; + if num_elements == 0 { + return; + } + + if NULLABLE { + let mut ptr = self.element_offset as *const i64; + let null_words = self.null_bitset_ptr(); + for idx in 0..num_elements { + let word_idx = idx >> 6; + let bit_idx = idx & 0x3f; + let is_null = unsafe { (*null_words.add(word_idx) & (1i64 << bit_idx)) != 0 }; + + if is_null { + builder.append_null(); + } else { + builder.append_value(unsafe { *ptr }); + } + ptr = unsafe { ptr.add(1) }; + } + } else { + let slice = unsafe { + std::slice::from_raw_parts(self.element_offset as *const i64, num_elements) + }; + builder.append_slice(slice); + } + } + + /// Bulk append i16 values to builder. + pub(crate) fn append_shorts_to_builder( + &self, + builder: &mut Int16Builder, + ) { + let num_elements = self.num_elements; + if num_elements == 0 { + return; + } + + if NULLABLE { + let mut ptr = self.element_offset as *const i16; + let null_words = self.null_bitset_ptr(); + for idx in 0..num_elements { + let word_idx = idx >> 6; + let bit_idx = idx & 0x3f; + let is_null = unsafe { (*null_words.add(word_idx) & (1i64 << bit_idx)) != 0 }; + + if is_null { + builder.append_null(); + } else { + builder.append_value(unsafe { *ptr }); + } + ptr = unsafe { ptr.add(1) }; + } + } else { + let slice = unsafe { + std::slice::from_raw_parts(self.element_offset as *const i16, num_elements) + }; + builder.append_slice(slice); + } + } + + /// Bulk append i8 values to builder. + pub(crate) fn append_bytes_to_builder(&self, builder: &mut Int8Builder) { + let num_elements = self.num_elements; + if num_elements == 0 { + return; + } + + if NULLABLE { + let mut ptr = self.element_offset as *const i8; + let null_words = self.null_bitset_ptr(); + for idx in 0..num_elements { + let word_idx = idx >> 6; + let bit_idx = idx & 0x3f; + let is_null = unsafe { (*null_words.add(word_idx) & (1i64 << bit_idx)) != 0 }; + + if is_null { + builder.append_null(); + } else { + builder.append_value(unsafe { *ptr }); + } + ptr = unsafe { ptr.add(1) }; + } + } else { + let slice = unsafe { + std::slice::from_raw_parts(self.element_offset as *const i8, num_elements) + }; + builder.append_slice(slice); + } + } + + /// Bulk append f32 values to builder. + pub(crate) fn append_floats_to_builder( + &self, + builder: &mut Float32Builder, + ) { + let num_elements = self.num_elements; + if num_elements == 0 { + return; + } + + if NULLABLE { + let mut ptr = self.element_offset as *const f32; + let null_words = self.null_bitset_ptr(); + for idx in 0..num_elements { + let word_idx = idx >> 6; + let bit_idx = idx & 0x3f; + let is_null = unsafe { (*null_words.add(word_idx) & (1i64 << bit_idx)) != 0 }; + + if is_null { + builder.append_null(); + } else { + builder.append_value(unsafe { *ptr }); + } + ptr = unsafe { ptr.add(1) }; + } + } else { + let slice = unsafe { + std::slice::from_raw_parts(self.element_offset as *const f32, num_elements) + }; + builder.append_slice(slice); + } + } + + /// Bulk append f64 values to builder. + pub(crate) fn append_doubles_to_builder( + &self, + builder: &mut Float64Builder, + ) { + let num_elements = self.num_elements; + if num_elements == 0 { + return; + } + + if NULLABLE { + let mut ptr = self.element_offset as *const f64; + let null_words = self.null_bitset_ptr(); + for idx in 0..num_elements { + let word_idx = idx >> 6; + let bit_idx = idx & 0x3f; + let is_null = unsafe { (*null_words.add(word_idx) & (1i64 << bit_idx)) != 0 }; + + if is_null { + builder.append_null(); + } else { + builder.append_value(unsafe { *ptr }); + } + ptr = unsafe { ptr.add(1) }; + } + } else { + let slice = unsafe { + std::slice::from_raw_parts(self.element_offset as *const f64, num_elements) + }; + builder.append_slice(slice); + } + } + + /// Bulk append boolean values to builder using pointer iteration. + pub(crate) fn append_booleans_to_builder( + &self, + builder: &mut BooleanBuilder, + ) { + let num_elements = self.num_elements; + if num_elements == 0 { + return; + } + + let mut ptr = self.element_offset as *const u8; + + if NULLABLE { + let null_words = self.null_bitset_ptr(); + for idx in 0..num_elements { + let word_idx = idx >> 6; + let bit_idx = idx & 0x3f; + let is_null = unsafe { (*null_words.add(word_idx) & (1i64 << bit_idx)) != 0 }; + + if is_null { + builder.append_null(); + } else { + builder.append_value(unsafe { *ptr != 0 }); + } + ptr = unsafe { ptr.add(1) }; + } + } else { + for _ in 0..num_elements { + builder.append_value(unsafe { *ptr != 0 }); + ptr = unsafe { ptr.add(1) }; + } + } + } + + /// Bulk append timestamp values to builder. + pub(crate) fn append_timestamps_to_builder( + &self, + builder: &mut TimestampMicrosecondBuilder, + ) { + let num_elements = self.num_elements; + if num_elements == 0 { + return; + } + + if NULLABLE { + let mut ptr = self.element_offset as *const i64; + let null_words = self.null_bitset_ptr(); + for idx in 0..num_elements { + let word_idx = idx >> 6; + let bit_idx = idx & 0x3f; + let is_null = unsafe { (*null_words.add(word_idx) & (1i64 << bit_idx)) != 0 }; + + if is_null { + builder.append_null(); + } else { + builder.append_value(unsafe { *ptr }); + } + ptr = unsafe { ptr.add(1) }; + } + } else { + let slice = unsafe { + std::slice::from_raw_parts(self.element_offset as *const i64, num_elements) + }; + builder.append_slice(slice); + } + } + + /// Bulk append date values to builder. + pub(crate) fn append_dates_to_builder( + &self, + builder: &mut Date32Builder, + ) { + let num_elements = self.num_elements; + if num_elements == 0 { + return; + } + + if NULLABLE { + let mut ptr = self.element_offset as *const i32; + let null_words = self.null_bitset_ptr(); + for idx in 0..num_elements { + let word_idx = idx >> 6; + let bit_idx = idx & 0x3f; + let is_null = unsafe { (*null_words.add(word_idx) & (1i64 << bit_idx)) != 0 }; + + if is_null { + builder.append_null(); + } else { + builder.append_value(unsafe { *ptr }); + } + ptr = unsafe { ptr.add(1) }; + } + } else { + let slice = unsafe { + std::slice::from_raw_parts(self.element_offset as *const i32, num_elements) + }; + builder.append_slice(slice); + } + } } pub fn append_to_builder( @@ -112,77 +410,40 @@ pub fn append_to_builder( match data_type { DataType::Boolean => { - add_values!( - BooleanBuilder, - |builder: &mut BooleanBuilder, values: &SparkUnsafeArray, idx: usize| builder - .append_value(values.get_boolean(idx)), - |builder: &mut BooleanBuilder| builder.append_null() - ); + let builder = downcast_builder_ref!(BooleanBuilder, builder); + array.append_booleans_to_builder::(builder); } DataType::Int8 => { - add_values!( - Int8Builder, - |builder: &mut Int8Builder, values: &SparkUnsafeArray, idx: usize| builder - .append_value(values.get_byte(idx)), - |builder: &mut Int8Builder| builder.append_null() - ); + let builder = downcast_builder_ref!(Int8Builder, builder); + array.append_bytes_to_builder::(builder); } DataType::Int16 => { - add_values!( - Int16Builder, - |builder: &mut Int16Builder, values: &SparkUnsafeArray, idx: usize| builder - .append_value(values.get_short(idx)), - |builder: &mut Int16Builder| builder.append_null() - ); + let builder = downcast_builder_ref!(Int16Builder, builder); + array.append_shorts_to_builder::(builder); } DataType::Int32 => { - add_values!( - Int32Builder, - |builder: &mut Int32Builder, values: &SparkUnsafeArray, idx: usize| builder - .append_value(values.get_int(idx)), - |builder: &mut Int32Builder| builder.append_null() - ); + let builder = downcast_builder_ref!(Int32Builder, builder); + array.append_ints_to_builder::(builder); } DataType::Int64 => { - add_values!( - Int64Builder, - |builder: &mut Int64Builder, values: &SparkUnsafeArray, idx: usize| builder - .append_value(values.get_long(idx)), - |builder: &mut Int64Builder| builder.append_null() - ); + let builder = downcast_builder_ref!(Int64Builder, builder); + array.append_longs_to_builder::(builder); } DataType::Float32 => { - add_values!( - Float32Builder, - |builder: &mut Float32Builder, values: &SparkUnsafeArray, idx: usize| builder - .append_value(values.get_float(idx)), - |builder: &mut Float32Builder| builder.append_null() - ); + let builder = downcast_builder_ref!(Float32Builder, builder); + array.append_floats_to_builder::(builder); } DataType::Float64 => { - add_values!( - Float64Builder, - |builder: &mut Float64Builder, values: &SparkUnsafeArray, idx: usize| builder - .append_value(values.get_double(idx)), - |builder: &mut Float64Builder| builder.append_null() - ); + let builder = downcast_builder_ref!(Float64Builder, builder); + array.append_doubles_to_builder::(builder); } DataType::Timestamp(TimeUnit::Microsecond, _) => { - add_values!( - TimestampMicrosecondBuilder, - |builder: &mut TimestampMicrosecondBuilder, - values: &SparkUnsafeArray, - idx: usize| builder.append_value(values.get_timestamp(idx)), - |builder: &mut TimestampMicrosecondBuilder| builder.append_null() - ); + let builder = downcast_builder_ref!(TimestampMicrosecondBuilder, builder); + array.append_timestamps_to_builder::(builder); } DataType::Date32 => { - add_values!( - Date32Builder, - |builder: &mut Date32Builder, values: &SparkUnsafeArray, idx: usize| builder - .append_value(values.get_date(idx)), - |builder: &mut Date32Builder| builder.append_null() - ); + let builder = downcast_builder_ref!(Date32Builder, builder); + array.append_dates_to_builder::(builder); } DataType::Binary => { add_values!( diff --git a/native/core/src/execution/shuffle/mod.rs b/native/core/src/execution/shuffle/mod.rs index e2798df63e..172dc5f942 100644 --- a/native/core/src/execution/shuffle/mod.rs +++ b/native/core/src/execution/shuffle/mod.rs @@ -17,7 +17,7 @@ pub(crate) mod codec; mod comet_partitioning; -mod list; +pub mod list; mod map; pub mod row; mod shuffle_writer;