Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 191 additions & 66 deletions arrow-string/src/concat_elements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
// under the License.

//! Provides utility functions for concatenation of elements in arrays.

use std::marker::PhantomData;
use std::sync::Arc;

use arrow_array::builder::{
BinaryViewBuilder, BufferBuilder, FixedSizeBinaryBuilder, StringViewBuilder,
};
use arrow_array::types::ByteArrayType;
use arrow_array::builder::{BufferBuilder, FixedSizeBinaryBuilder, make_view};
use arrow_array::types::{ByteArrayType, ByteViewType};
use arrow_array::*;
use arrow_buffer::{ArrowNativeType, MutableBuffer, NullBuffer};
use arrow_data::ArrayDataBuilder;
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, NullBuffer, ScalarBuffer};
use arrow_data::{ArrayDataBuilder, MAX_INLINE_VIEW_LEN};
use arrow_schema::{ArrowError, DataType};

/// Returns the elementwise concatenation of a [`GenericByteArray`].
Expand Down Expand Up @@ -221,6 +221,170 @@ pub fn concat_elements_fixed_size_binary(
Ok(result.finish())
}

/// Accumulates the views and out-of-line bytes produced by an elementwise
/// concatenation of two byte-view arrays of type `T`.
struct ConcatByteViewBuilder<T: ByteViewType> {
    /// One 128-bit view per output row.
    views: Vec<u128>,
    /// Bytes of every concatenated value that is too long to be stored inline.
    data: Vec<u8>,
    /// Reusable scratch space for assembling short values before they are
    /// encoded inline in a view.
    inline: Vec<u8>,
    /// Ties the builder to its byte-view type without storing a value of it.
    phantom: PhantomData<T>,
}

impl<T> ConcatByteViewBuilder<T>
where
    T: ByteViewType,
{
    /// Returns the elementwise concatenation of two [`GenericByteViewArray`]s.
    ///
    /// A result element is null whenever either input element is null.
    ///
    /// # Errors
    ///
    /// - [`ArrowError::ComputeError`] if `left` and `right` differ in length.
    /// - [`ArrowError::ArithmeticOverflow`] if the concatenated values that are
    ///   too long to be stored inline add up to more than `u32::MAX` bytes.
    fn concat_elements_view_array(
        left: &GenericByteViewArray<T>,
        right: &GenericByteViewArray<T>,
    ) -> Result<GenericByteViewArray<T>, ArrowError> {
        let len = left.len();
        if len != right.len() {
            return Err(ArrowError::ComputeError(format!(
                "Arrays must have the same length: {} != {}",
                len,
                right.len()
            )));
        }

        // A union that contains no actual nulls is dropped so that the cheaper
        // null-free path below is taken.
        let null_buffer =
            NullBuffer::union(left.nulls(), right.nulls()).filter(|n| n.null_count() != 0);

        // Widen before adding: each length is a `u32`, so the sum of two of
        // them can exceed `u32::MAX` and would overflow if added as `u32`.
        let combined_lengths = left
            .lengths()
            .zip(right.lengths())
            .map(|(l, r)| u64::from(l) + u64::from(r));

        match null_buffer {
            None => {
                let data_size = Self::out_of_line_data_size(combined_lengths)?;

                let mut builder = Self::with_capacity(len, data_size);
                for (l, r) in left.bytes_iter().zip(right.bytes_iter()) {
                    builder.append_concat_view(l, r);
                }
                builder.finish(None)
            }
            Some(nulls) => {
                // Null rows contribute no bytes, so they are excluded from the
                // size computation.
                let data_size = Self::out_of_line_data_size(
                    combined_lengths
                        .zip(nulls.iter())
                        .filter_map(|(total, not_null)| not_null.then_some(total)),
                )?;

                let mut builder = Self::with_capacity(len, data_size);
                for ((l, r), not_null) in
                    left.bytes_iter().zip(right.bytes_iter()).zip(nulls.iter())
                {
                    if not_null {
                        builder.append_concat_view(l, r);
                    } else {
                        builder.append_empty_view();
                    }
                }

                builder.finish(Some(nulls))
            }
        }
    }

    /// Sums the combined lengths that are too long to be stored inline, i.e.
    /// the number of bytes that will be written to the data buffer.
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::ArithmeticOverflow`] if the sum exceeds `u32::MAX`,
    /// because view offsets into the data buffer are stored as `u32`.
    fn out_of_line_data_size(
        combined_lengths: impl Iterator<Item = u64>,
    ) -> Result<usize, ArrowError> {
        combined_lengths
            .filter(|total| *total > u64::from(MAX_INLINE_VIEW_LEN))
            .try_fold(0u64, |acc, total| acc.checked_add(total))
            .filter(|size| *size <= u64::from(u32::MAX))
            .and_then(|size| usize::try_from(size).ok())
            .ok_or_else(|| {
                ArrowError::ArithmeticOverflow("byte array offset overflow".to_string())
            })
    }

    /// Creates a builder with room for `item_capacity` views and
    /// `data_capacity` bytes of out-of-line data.
    fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
        Self {
            views: Vec::with_capacity(item_capacity),
            data: Vec::with_capacity(data_capacity),
            inline: Vec::with_capacity(MAX_INLINE_VIEW_LEN as usize),
            phantom: PhantomData,
        }
    }

    /// Append a view containing the concatenation of `left` and `right`.
    ///
    /// Values longer than `MAX_INLINE_VIEW_LEN` are appended to the data
    /// buffer; shorter values are assembled in the scratch buffer and encoded
    /// directly in the view.
    fn append_concat_view(&mut self, left: &[u8], right: &[u8]) {
        let total_len = left.len() + right.len();
        if total_len > MAX_INLINE_VIEW_LEN as usize {
            let offset = self.data.len();

            // The caller has checked that the total data size is within
            // `u32::MAX`, so `offset` cannot exceed it and the cast is lossless.
            // Not using `u32::try_from` on each insertion makes a ~5% difference
            // in benchmarking.
            debug_assert!(offset <= u32::MAX as usize);
            let view_offset: u32 = offset as u32;

            self.data.extend_from_slice(left);
            self.data.extend_from_slice(right);
            self.views
                .push(make_view(&self.data[offset..], 0, view_offset));
        } else {
            self.inline.extend_from_slice(left);
            self.inline.extend_from_slice(right);
            self.views.push(make_view(&self.inline, 0, 0));
            // Keep the scratch buffer empty for the next short value.
            self.inline.clear();
        };
    }

    /// Append an empty view.
    #[inline]
    fn append_empty_view(&mut self) {
        self.views.push(0);
    }

    /// Consumes the builder and assembles the resulting array.
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::ComputeError`] if `null_buffer` is present and its
    /// length differs from the number of appended views.
    fn finish(
        self,
        null_buffer: Option<NullBuffer>,
    ) -> Result<GenericByteViewArray<T>, ArrowError> {
        if let Some(nulls) = &null_buffer {
            if nulls.len() != self.views.len() {
                return Err(ArrowError::ComputeError(format!(
                    "Null buffer length ({}) must match row count ({})",
                    nulls.len(),
                    self.views.len()
                )));
            }
        }

        // No data buffer is attached when every value was stored inline.
        let buffers = if self.data.is_empty() {
            Arc::from([])
        } else {
            Arc::from([Buffer::from(self.data)])
        };

        // SAFETY: views were constructed with correct lengths, offsets, and
        // prefixes. UTF-8 validity is implicitly guaranteed by never concatenating
        // arrays with mixed ByteViewTypes.
        let array = unsafe {
            GenericByteViewArray::<T>::new_unchecked(
                ScalarBuffer::from(self.views),
                buffers,
                null_buffer,
            )
        };
        Ok(array)
    }
}

/// Concatenates two `BinaryViewArray`s element-wise.
/// If either element is `Null`, the result element is also `Null`.
///
Expand All @@ -231,32 +395,7 @@ pub fn concat_elements_binary_view_array(
left: &BinaryViewArray,
right: &BinaryViewArray,
) -> Result<BinaryViewArray, ArrowError> {
if left.len() != right.len() {
return Err(ArrowError::ComputeError(format!(
"Arrays must have the same length: {} != {}",
left.len(),
right.len()
)));
}
let mut result = BinaryViewBuilder::with_capacity(left.len());

// Avoid reallocations by writing to a reused buffer
let mut buffer = MutableBuffer::new(0);

// Pre-compute combined null bitmap, so the per-row NULL check is efficient
let nulls = NullBuffer::union(left.nulls(), right.nulls());

for i in 0..left.len() {
if nulls.as_ref().is_some_and(|n| n.is_null(i)) {
result.append_null();
} else {
buffer.clear();
buffer.extend_from_slice(left.value(i));
buffer.extend_from_slice(right.value(i));
result.try_append_value(&buffer)?;
}
}
Ok(result.finish())
ConcatByteViewBuilder::concat_elements_view_array(left, right)
}

/// Concatenates two `StringViewArray`s element-wise.
Expand All @@ -266,41 +405,11 @@ pub fn concat_elements_binary_view_array(
/// - Returns an error if the input arrays have different lengths.
/// - Returns an error if the concatenated values that are too long to be stored inline
///   add up to more than `u32::MAX` bytes.
/// - Never returns a UTF-8 error: concatenating two valid UTF-8 strings always yields valid UTF-8.
// Cannot reuse code with `GenericByteViewBuilder` since `try_append_value` works with
// `AsRef<T::Native>`, and there is no conversion from `ByteViewType` to this or [u8]
pub fn concat_elements_string_view_array(
left: &StringViewArray,
right: &StringViewArray,
) -> Result<StringViewArray, ArrowError> {
if left.len() != right.len() {
return Err(ArrowError::ComputeError(format!(
"Arrays must have the same length: {} != {}",
left.len(),
right.len()
)));
}

let mut result = StringViewBuilder::with_capacity(left.len());

// Avoid reallocations by writing to a reused buffer
let mut buffer: Vec<u8> = Vec::new();

let nulls = NullBuffer::union(left.nulls(), right.nulls());

for i in 0..left.len() {
if nulls.as_ref().is_some_and(|n| n.is_null(i)) {
result.append_null();
} else {
buffer.clear();
buffer.extend_from_slice(left.value(i).as_bytes());
buffer.extend_from_slice(right.value(i).as_bytes());
let s = std::str::from_utf8(&buffer).map_err(|_| {
ArrowError::ComputeError("Concatenated values are not valid UTF-8".into())
})?;
result.try_append_value(s)?;
}
}
Ok(result.finish())
ConcatByteViewBuilder::concat_elements_view_array(left, right)
}

/// Returns the elementwise concatenation of [`Array`]s.
Expand Down Expand Up @@ -574,12 +683,28 @@ mod tests {

#[test]
fn test_string_view_concat() {
let left = StringViewArray::from_iter(vec![Some("foo"), Some("bar"), None]);
let right = StringViewArray::from_iter(vec![None, Some("yyy"), Some("zzz")]);
let left =
StringViewArray::from_iter(vec![Some("foo"), Some("bar"), None, Some("foofoofoo")]);
let right =
StringViewArray::from_iter(vec![None, Some("yyy"), Some("zzz"), Some("barbarbar")]);

let output = concat_elements_string_view_array(&left, &right).unwrap();

let expected = StringViewArray::from_iter(vec![
None,
Some("baryyy"),
None,
Some("foofoofoobarbarbar"),
]);
assert_eq!(output, expected);

let left = StringViewArray::from_iter(vec![Some("a"), Some("b"), Some("foofoofoo")]);
let right = StringViewArray::from_iter(vec![Some("c"), Some("d"), Some("barbarbar")]);

let output = concat_elements_string_view_array(&left, &right).unwrap();

let expected = StringViewArray::from_iter(vec![None, Some("baryyy"), None]);
let expected =
StringViewArray::from_iter(vec![Some("ac"), Some("bd"), Some("foofoofoobarbarbar")]);
assert_eq!(output, expected);
}

Expand Down
5 changes: 5 additions & 0 deletions arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ required-features = ["test_utils"]
name = "array_slice"
harness = false

[[bench]]
name = "concatenate_elements"
harness = false
required-features = ["test_utils"]

[[bench]]
name = "concatenate_kernel"
harness = false
Expand Down
Loading
Loading