Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,9 @@ jobs:
- uses: actions/checkout@v5
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
- name: Install cargo-msrv
run: cargo install cargo-msrv
- name: Install cargo-msrv (if needed)
# cargo-msrv binary may be cached by the cargo cache step in setup-builder, and cargo install will error if it is already installed
run: if which cargo-msrv ; then echo "using existing cargo-msrv binary" ; else cargo install cargo-msrv ; fi
- name: Check all packages
run: |
# run `cargo msrv verify --manifest-path "path/to/Cargo.toml"` to see problematic dependencies
Expand Down
2 changes: 1 addition & 1 deletion parquet-variant-compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ arrow-schema = { workspace = true }
half = { version = "2.1", default-features = false }
parquet-variant = { workspace = true }
parquet-variant-json = { workspace = true }
chrono = {workspace = true}
chrono = { workspace = true }

[lib]
name = "parquet_variant_compute"
Expand Down
152 changes: 150 additions & 2 deletions parquet-variant-compute/src/cast_to_variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ use arrow::array::{
use arrow::datatypes::{
i256, BinaryType, BinaryViewType, Decimal128Type, Decimal256Type, Decimal32Type, Decimal64Type,
Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
LargeBinaryType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
LargeBinaryType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
Time64NanosecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};
use arrow::temporal_conversions::{
timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_s_to_datetime,
timestamp_us_to_datetime,
};
use arrow_schema::{ArrowError, DataType, TimeUnit};
use chrono::NaiveTime;
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use half::f16;
use parquet_variant::{
Expand Down Expand Up @@ -353,6 +355,75 @@ pub fn cast_to_variant(input: &dyn Array) -> Result<VariantArray, ArrowError> {
DataType::Timestamp(time_unit, time_zone) => {
convert_timestamp(time_unit, time_zone, input, &mut builder);
}
DataType::Time32(unit) => {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logic looks good to me -- thank you

match *unit {
TimeUnit::Second => {
generic_conversion!(
Time32SecondType,
as_primitive,
// nano second are always 0
|v| NaiveTime::from_num_seconds_from_midnight_opt(v as u32, 0u32).unwrap(),
input,
builder
);
}
TimeUnit::Millisecond => {
generic_conversion!(
Time32MillisecondType,
as_primitive,
|v| NaiveTime::from_num_seconds_from_midnight_opt(
v as u32 / 1000,
(v as u32 % 1000) * 1_000_000
)
.unwrap(),
input,
builder
);
}
_ => {
return Err(ArrowError::CastError(format!(
"Unsupported Time32 unit: {:?}",
unit
)));
}
};
}
DataType::Time64(unit) => {
match *unit {
TimeUnit::Microsecond => {
generic_conversion!(
Time64MicrosecondType,
as_primitive,
|v| NaiveTime::from_num_seconds_from_midnight_opt(
(v / 1_000_000) as u32,
(v % 1_000_000 * 1_000) as u32
)
.unwrap(),
input,
builder
);
}
TimeUnit::Nanosecond => {
generic_conversion!(
Time64NanosecondType,
as_primitive,
|v| NaiveTime::from_num_seconds_from_midnight_opt(
(v / 1_000_000_000) as u32,
(v % 1_000_000_000) as u32
)
.unwrap(),
input,
builder
);
}
_ => {
return Err(ArrowError::CastError(format!(
"Unsupported Time64 unit: {:?}",
unit
)));
}
};
}
DataType::Interval(_) => {
return Err(ArrowError::InvalidArgumentError(
"Casting interval types to Variant is not supported. \
Expand Down Expand Up @@ -435,7 +506,8 @@ mod tests {
Decimal64Array, FixedSizeBinaryBuilder, Float16Array, Float32Array, Float64Array,
GenericByteBuilder, GenericByteViewBuilder, Int16Array, Int32Array, Int64Array, Int8Array,
IntervalYearMonthArray, LargeStringArray, NullArray, StringArray, StringViewArray,
StructArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
StructArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
Time64NanosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow::buffer::NullBuffer;
use arrow_schema::{Field, Fields};
Expand Down Expand Up @@ -1241,6 +1313,82 @@ mod tests {
)
}

#[test]
fn test_cast_time32_second_to_variant_time() {
let array: Time32SecondArray = vec![Some(1), Some(86_399), None].into();
let values = Arc::new(array);
run_test(
values,
vec![
Some(Variant::Time(
NaiveTime::from_num_seconds_from_midnight_opt(1, 0).unwrap(),
)),
Some(Variant::Time(
NaiveTime::from_num_seconds_from_midnight_opt(86_399, 0).unwrap(),
)),
None,
],
)
}

#[test]
fn test_cast_time32_millisecond_to_variant_time() {
let array: Time32MillisecondArray = vec![Some(123_456), Some(456_000), None].into();
let values = Arc::new(array);
run_test(
values,
vec![
Some(Variant::Time(
NaiveTime::from_num_seconds_from_midnight_opt(123, 456_000_000).unwrap(),
)),
Some(Variant::Time(
NaiveTime::from_num_seconds_from_midnight_opt(456, 0).unwrap(),
)),
None,
],
)
}

#[test]
fn test_cast_time64_micro_to_variant_time() {
let array: Time64MicrosecondArray = vec![Some(1), Some(123_456_789), None].into();
let values = Arc::new(array);
run_test(
values,
vec![
Some(Variant::Time(
NaiveTime::from_num_seconds_from_midnight_opt(0, 1_000).unwrap(),
)),
Some(Variant::Time(
NaiveTime::from_num_seconds_from_midnight_opt(123, 456_789_000).unwrap(),
)),
None,
],
)
}

#[test]
fn test_cast_time64_nano_to_variant_time() {
let array: Time64NanosecondArray =
vec![Some(1), Some(1001), Some(123_456_789_012), None].into();
run_test(
Arc::new(array),
// as we can only present with micro second, so the nano second will round donw to 0
vec![
Some(Variant::Time(
NaiveTime::from_num_seconds_from_midnight_opt(0, 0).unwrap(),
)),
Some(Variant::Time(
NaiveTime::from_num_seconds_from_midnight_opt(0, 1_000).unwrap(),
)),
Some(Variant::Time(
NaiveTime::from_num_seconds_from_midnight_opt(123, 456_789_000).unwrap(),
)),
None,
],
)
}

#[test]
fn test_cast_to_variant_utf8() {
// Test with short strings (should become ShortString variants)
Expand Down
33 changes: 30 additions & 3 deletions parquet-variant-json/src/to_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
//! Module for converting Variant data to JSON format
use arrow_schema::ArrowError;
use base64::{engine::general_purpose, Engine as _};
use chrono::Timelike;
use parquet_variant::{Variant, VariantList, VariantObject};
use serde_json::Value;
use std::io::Write;

use parquet_variant::{Variant, VariantList, VariantObject};

// Format string constants to avoid duplication and reduce errors
const DATE_FORMAT: &str = "%Y-%m-%d";
const TIMESTAMP_NTZ_FORMAT: &str = "%Y-%m-%dT%H:%M:%S%.6f";
Expand All @@ -40,6 +40,19 @@ fn format_binary_base64(bytes: &[u8]) -> String {
general_purpose::STANDARD.encode(bytes)
}

fn format_time_ntz_str(time: &chrono::NaiveTime) -> String {
let base = time.format("%H:%M:%S").to_string();
let micros = time.nanosecond() / 1000;
match micros {
0 => format!("{}.{}", base, 0),
_ => {
let micros_str = format!("{:06}", micros);
let micros_str_trimmed = micros_str.trim_matches('0');
format!("{}.{}", base, micros_str_trimmed)
}
}
}

///
/// This function writes JSON directly to any type that implements [`Write`],
/// making it efficient for streaming or when you want to control the output destination.
Expand Down Expand Up @@ -110,6 +123,7 @@ pub fn variant_to_json(json_buffer: &mut impl Write, variant: &Variant) -> Resul
Variant::TimestampNtzMicros(ts) => {
write!(json_buffer, "\"{}\"", format_timestamp_ntz_string(ts))?
}
Variant::Time(time) => write!(json_buffer, "\"{}\"", format_time_ntz_str(time))?,
Variant::Binary(bytes) => {
// Encode binary as base64 string
let base64_str = format_binary_base64(bytes);
Expand Down Expand Up @@ -348,6 +362,7 @@ pub fn variant_to_json_value(variant: &Variant) -> Result<Value, ArrowError> {
Variant::Date(date) => Ok(Value::String(format_date_string(date))),
Variant::TimestampMicros(ts) => Ok(Value::String(ts.to_rfc3339())),
Variant::TimestampNtzMicros(ts) => Ok(Value::String(format_timestamp_ntz_string(ts))),
Variant::Time(time) => Ok(Value::String(format_time_ntz_str(time))),
Variant::Binary(bytes) => Ok(Value::String(format_binary_base64(bytes))),
Variant::String(s) => Ok(Value::String(s.to_string())),
Variant::ShortString(s) => Ok(Value::String(s.to_string())),
Expand All @@ -371,7 +386,7 @@ pub fn variant_to_json_value(variant: &Variant) -> Result<Value, ArrowError> {
#[cfg(test)]
mod tests {
use super::*;
use chrono::{DateTime, NaiveDate, Utc};
use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
use parquet_variant::{VariantDecimal16, VariantDecimal4, VariantDecimal8};

#[test]
Expand Down Expand Up @@ -457,6 +472,18 @@ mod tests {
Ok(())
}

#[test]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

fn test_time_to_json() -> Result<(), ArrowError> {
let naive_time = NaiveTime::from_num_seconds_from_midnight_opt(12345, 123460708).unwrap();
let variant = Variant::Time(naive_time);
let json = variant_to_json_string(&variant)?;
assert_eq!("\"03:25:45.12346\"", json);

let json_value = variant_to_json_value(&variant)?;
assert!(matches!(json_value, Value::String(_)));
Ok(())
}

#[test]
fn test_binary_to_json() -> Result<(), ArrowError> {
let binary_data = b"Hello, World!";
Expand Down
10 changes: 10 additions & 0 deletions parquet-variant/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::{
VariantMetadata, VariantObject,
};
use arrow_schema::ArrowError;
use chrono::Timelike;
use indexmap::{IndexMap, IndexSet};
use std::collections::HashSet;

Expand Down Expand Up @@ -190,6 +191,13 @@ impl ValueBuffer {
self.append_slice(&micros.to_le_bytes());
}

fn append_time_micros(&mut self, value: chrono::NaiveTime) {
self.append_primitive_header(VariantPrimitiveType::Time);
let micros_from_midnight = value.num_seconds_from_midnight() as u64 * 1_000_000
+ value.nanosecond() as u64 / 1_000;
self.append_slice(&micros_from_midnight.to_le_bytes());
}

fn append_decimal4(&mut self, decimal4: VariantDecimal4) {
self.append_primitive_header(VariantPrimitiveType::Decimal4);
self.append_u8(decimal4.scale());
Expand Down Expand Up @@ -334,6 +342,7 @@ impl ValueBuffer {
Variant::ShortString(s) => self.append_short_string(s),
Variant::Object(obj) => self.append_object(metadata_builder, obj),
Variant::List(list) => self.append_list(metadata_builder, list),
Variant::Time(v) => self.append_time_micros(v),
}
}

Expand Down Expand Up @@ -364,6 +373,7 @@ impl ValueBuffer {
Variant::ShortString(s) => self.append_short_string(s),
Variant::Object(obj) => self.try_append_object(metadata_builder, obj)?,
Variant::List(list) => self.try_append_list(metadata_builder, list)?,
Variant::Time(v) => self.append_time_micros(v),
}

Ok(())
Expand Down
Loading
Loading