diff --git a/rust/crates/sift_cli/Cargo.toml b/rust/crates/sift_cli/Cargo.toml index d3bcb2756..f59eecb0e 100644 --- a/rust/crates/sift_cli/Cargo.toml +++ b/rust/crates/sift_cli/Cargo.toml @@ -18,6 +18,7 @@ path = "src/main.rs" [dependencies] anyhow = { workspace = true } +arrow-array = { workspace = true } arrow-schema = { workspace = true } chrono = { workspace = true } clap = { workspace = true } @@ -42,5 +43,4 @@ zip = { workspace = true } [dev-dependencies] indoc = { workspace = true } -arrow-array = { workspace = true } bytes = { workspace = true } diff --git a/rust/crates/sift_cli/src/cli/mod.rs b/rust/crates/sift_cli/src/cli/mod.rs index ef00512e4..97ef9ba97 100644 --- a/rust/crates/sift_cli/src/cli/mod.rs +++ b/rust/crates/sift_cli/src/cli/mod.rs @@ -1,6 +1,6 @@ use clap::{Parser, Subcommand, crate_description, crate_version}; use clap_complete::Shell; -use parquet::ComplexTypesMode; +use parquet::{ComplexTypesMode, CprMode}; pub mod hdf5; pub mod tdms; use hdf5::Hdf5Schema; @@ -340,20 +340,16 @@ pub enum ImportParquetCmd { /// A parquet file where every column is exclusive to a single channel except for the time /// column FlatDataset(FlatDatasetArgs), + + /// A parquet file laid out single-channel-per-row, either one channel for the whole file + /// (single mode) or with a name column identifying the channel for each row (multi mode). + Cpr(CprArgs), } #[derive(clap::Args)] pub struct FlatDatasetArgs { - /// Path to the Parquet file to import - pub path: PathBuf, - - /// Name of the asset this data belongs to - #[arg(short, long)] - pub asset: String, - - /// Optional run name to associate with this import - #[arg(short, long)] - pub run: Option, + #[command(flatten)] + pub common: CommonImportArgs, /// Paths of data columns to import; can be specified multiple times #[arg(short, long)] @@ -395,14 +391,79 @@ pub struct FlatDatasetArgs { /// Strategy for handling complex types (maps, lists, structs) #[arg(short = 'm', long, default_value_t = ComplexTypesMode::default())] pub complex_types_mode: ComplexTypesMode, +} - /// Wait until the import finishes processing - #[arg(short, long)] - pub wait: bool, +#[derive(clap::Args)] +pub struct CprArgs { + #[command(flatten)] + pub common: CommonImportArgs, - /// Preview the parsed schema without uploading + /// Channel-per-row mode: single-channel or multi-channel + #[arg(long)] + pub mode: CprMode, + + /// Path to the time column #[arg(short, long)] - pub preview: bool, + pub time_path: String, + + /// Time format used in the time column + #[arg(short = 'f', long)] + pub time_format: TimeFormat, + + /// Start time (RFC3339) to use if time format is relative + #[arg(short = 's', long)] + pub relative_start_time: Option, + + /// Path to the column holding values (used in both modes) + #[arg(long)] + pub data_path: String, + + /// Channel name for every row in the file + #[arg( + long, + required_if_eq("mode", "single"), + conflicts_with = "name_path", + help_heading = "Single mode options" + )] + pub channel_name: Option, + + /// Data type for the channel. Use `"infer"` to have the program infer the + /// data type from the parquet schema. + #[arg( + long, + conflicts_with = "name_path", + help_heading = "Single mode options" + )] + pub data_type: Option, + + /// Channel units + #[arg( + long, + conflicts_with = "name_path", + help_heading = "Single mode options" + )] + pub unit: Option, + + /// Channel description + #[arg( + short = 'n', + long, + conflicts_with = "name_path", + help_heading = "Single mode options" + )] + pub description: Option, + + /// Path to the column holding channel names + #[arg( + long, + required_if_eq("mode", "multi"), + help_heading = "Multi mode options" + )] + pub name_path: Option, + + /// Strategy for handling complex types (maps, lists, structs) + #[arg(short = 'm', long, default_value_t = ComplexTypesMode::default())] + pub complex_types_mode: ComplexTypesMode, } #[derive(clap::Args)] diff --git a/rust/crates/sift_cli/src/cli/parquet.rs b/rust/crates/sift_cli/src/cli/parquet.rs index a9d267edf..1c5e4cdbb 100644 --- a/rust/crates/sift_cli/src/cli/parquet.rs +++ b/rust/crates/sift_cli/src/cli/parquet.rs @@ -38,3 +38,21 @@ impl Display for ComplexTypesMode { } } } + +/// Channel-per-row mode: tells the importer how each row is shaped. +#[derive(Debug, Copy, Clone, PartialEq, Eq, ValueEnum)] +pub enum CprMode { + /// File has [time, value]. All rows belong to one named channel. + Single, + /// File has [time, name_column, value_column]. Channels created per unique name. + Multi, +} + +impl Display for CprMode { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Single => write!(f, "single"), + Self::Multi => write!(f, "multi"), + } + } +} diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/cpr_dataset.rs b/rust/crates/sift_cli/src/cmd/import/parquet/cpr_dataset.rs new file mode 100644 index 000000000..e1e062296 --- /dev/null +++ b/rust/crates/sift_cli/src/cmd/import/parquet/cpr_dataset.rs @@ -0,0 +1,188 @@ +use std::{collections::HashSet, fs::File, io::Seek, process::ExitCode}; + +use anyhow::{Context as AnyhowContext, Result, anyhow}; +use crossterm::style::Stylize; +use parquet::file::reader::{ChunkReader, FileReader, SerializedFileReader}; +use parquet::record::Field; +use parquet::schema::types::Type as ParquetSchemaType; +use sift_rs::{ + common::r#type::v1::ChannelConfig, + data_imports::v2::{ + CreateDataImportFromUploadRequest, CreateDataImportFromUploadResponse, + ParquetComplexTypesImportMode, ParquetConfig, + data_import_service_client::DataImportServiceClient, parquet_config::Config, + parquet_single_channel_per_row_config::Config as CprInnerConfig, + }, +}; + +use crate::cli::CprArgs; +use crate::cmd::import::parquet::detect_parquet_schema::detect_cpr_config; +use crate::cmd::{ + Context, + import::{ + parquet::FooterMetadata, preview_import_config, utils::upload_gzipped_file, + wait_for_job_completion, + }, +}; +use crate::util::{api::create_grpc_channel, tty::Output}; + +pub async fn run(ctx: Context, args: CprArgs) -> Result { + let grpc_channel = create_grpc_channel(&ctx)?; + let mut data_imports_client = DataImportServiceClient::new(grpc_channel.clone()); + let mut file = File::open(&args.common.path).context("failed to open parquet file")?; + let footer_md = FooterMetadata::try_from(&mut file)?; + + let cpr_config = detect_cpr_config(&file, &args).context("failed to detect parquet schema")?; + + if args.common.preview { + let run_label = args + .common + .run_id + .as_deref() + .filter(|s| !s.is_empty()) + .or(args.common.run.as_deref()) + .unwrap_or(""); + + let multi_channels: Vec = match cpr_config.config.as_ref() { + Some(CprInnerConfig::MultiChannel(multi)) => { + let data_type = cpr_config + .columns + .iter() + .find(|c| c.path == multi.data_path) + .and_then(|c| c.column_config.as_ref()) + .map(|c| c.data_type) + .unwrap_or_default(); + + let discovery_file = File::open(&args.common.path) + .context("failed to open parquet file for channel discovery")?; + discover_multi_channel_names_for_preview(discovery_file, &multi.name_path)? + .into_iter() + .map(|name| ChannelConfig { + name, + data_type, + ..Default::default() + }) + .collect() + } + _ => Vec::new(), + }; + + let preview_channels: Vec<&ChannelConfig> = match cpr_config.config.as_ref() { + Some(CprInnerConfig::SingleChannel(single)) => single.channel.iter().collect(), + Some(CprInnerConfig::MultiChannel(_)) => multi_channels.iter().collect(), + None => Vec::new(), + }; + + preview_import_config(&args.common.asset, run_label, &preview_channels); + return Ok(ExitCode::SUCCESS); + } + + let parquet_config = ParquetConfig { + config: Some(Config::SingleChannelPerRow(cpr_config)), + ..Default::default() + }; + let create_data_import_req = create_data_import_request(&args, parquet_config, footer_md)?; + + let CreateDataImportFromUploadResponse { upload_url, .. } = data_imports_client + .create_data_import_from_upload(create_data_import_req) + .await + .context("error creating data import")? + .into_inner(); + + file.rewind()?; + let job_id = upload_gzipped_file(&ctx, &upload_url, file, "application/vnd.apache.parquet") + .await + .context("failed to upload Parquet file")?; + + let location = args.common.run.as_ref().map_or_else( + || format!("asset '{}'", args.common.asset.cyan()), + |r| format!("run '{}'", r.clone().cyan()), + ); + + if !args.common.wait { + Output::new() + .line(format!("{} file for processing", "Uploaded".green())) + .tip(format!( + "Once processing is complete the data will be available on the {location}." + )) + .print(); + return Ok(ExitCode::SUCCESS); + } + + wait_for_job_completion(grpc_channel, job_id, location).await +} + +fn create_data_import_request( + args: &CprArgs, + config: ParquetConfig, + footer_md: FooterMetadata, +) -> Result { + Ok(CreateDataImportFromUploadRequest { + parquet_config: Some(ParquetConfig { + asset_name: args.common.asset.clone(), + run_name: args.common.run.clone().unwrap_or_default(), + run_id: args.common.run_id.clone().unwrap_or_default(), + footer_offset: footer_md.offset, + footer_length: u32::try_from(footer_md.length) + .context("parquet footer length too large")?, + complex_types_import_mode: ParquetComplexTypesImportMode::from( + args.complex_types_mode.clone(), + ) + .into(), + config: config.config, + }), + ..Default::default() + }) +} + +/// Scan the parquet file's name column and return the distinct channel names +/// it contains (sorted, deduped). Used by multi-mode preview so the user can +/// see what channels the server will create at ingest. +pub(super) fn discover_multi_channel_names_for_preview( + file: R, + name_path: &str, +) -> Result> { + let reader = SerializedFileReader::new(file).context("failed to build parquet file reader")?; + + let file_schema = reader.metadata().file_metadata().schema(); + let root_name = file_schema.name().to_string(); + let name_field = file_schema + .get_fields() + .iter() + .find(|t| t.name() == name_path) + .with_context(|| format!("name column '{name_path}' not found in parquet schema"))? + .clone(); + + let projection = ParquetSchemaType::group_type_builder(&root_name) + .with_fields(vec![name_field]) + .build() + .context("failed to build parquet projection schema")?; + + let row_iter = reader + .get_row_iter(Some(projection)) + .context("failed to build parquet row iterator")?; + + let mut seen: HashSet = HashSet::new(); + for row_result in row_iter { + let row = row_result.context("failed to read parquet row")?; + let (_, field) = row + .get_column_iter() + .next() + .ok_or_else(|| anyhow!("internal: projected row missing column"))?; + match field { + Field::Str(s) => { + seen.insert(s.clone()); + } + Field::Null => {} + other => { + return Err(anyhow!( + "name column '{name_path}' must be a string type; got {other:?}" + )); + } + } + } + + let mut names: Vec = seen.into_iter().collect(); + names.sort(); + Ok(names) +} diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs index 55f5d9296..aa717bcf4 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs @@ -9,11 +9,16 @@ use pbjson_types::Timestamp; use sift_rs::{ common::r#type::v1::{ChannelConfig, ChannelDataType}, data_imports::v2::{ - ParquetDataColumn, ParquetFlatDatasetConfig, ParquetTimeColumn, TimeFormat, + ParquetColumn, ParquetDataColumn, ParquetFlatDatasetConfig, + ParquetSingleChannelPerRowConfig, ParquetSingleChannelPerRowMultiChannelConfig, + ParquetSingleChannelPerRowSingleChannelConfig, ParquetTimeColumn, TimeFormat, + parquet_single_channel_per_row_config::Config as CprInnerConfig, }, }; -use crate::cli::FlatDatasetArgs; +use crate::cli::channel::DataType as CliDataType; +use crate::cli::parquet::CprMode; +use crate::cli::{CprArgs, FlatDatasetArgs}; pub fn detect_flat_dataset_config( file: &R, @@ -77,6 +82,128 @@ pub fn detect_flat_dataset_config( }) } +pub fn detect_cpr_config( + file: &R, + args: &CprArgs, +) -> Result { + validate_time_format(args.time_format, &args.relative_start_time) + .context("validating time format")?; + + let metadata = ParquetMetaDataReader::new().parse_and_finish(file)?; + let arrow_schema = parquet_to_arrow_schema( + metadata.file_metadata().schema_descr(), + metadata.file_metadata().key_value_metadata(), + ) + .context("detecting cpr arrow schema")?; + + let relative_start_time = match &args.relative_start_time { + Some(start) => { + let dt = DateTime::parse_from_rfc3339(start) + .context("--relative-start-time is not valid RFC3339")?; + Some(Timestamp::from(dt.to_utc())) + } + None => None, + }; + + arrow_schema + .fields() + .iter() + .find(|field| field.name() == &args.time_path) + .with_context(|| { + format!( + "time column '{}' not found in parquet schema", + args.time_path + ) + })?; + + let time_column = Some(ParquetTimeColumn { + relative_start_time, + path: args.time_path.clone(), + format: TimeFormat::from(args.time_format).into(), + }); + + let data_field = arrow_schema + .fields() + .iter() + .find(|field| field.name() == &args.data_path) + .with_context(|| { + format!( + "data column '{}' not found in parquet schema", + args.data_path + ) + })?; + let data_channel_type = arrow_type_to_channel_data_type(data_field.data_type()) + .with_context(|| format!("unsupported data type for column '{}'", args.data_path))?; + + let mut columns = vec![ParquetColumn { + path: args.data_path.clone(), + column_config: Some(ChannelConfig { + data_type: data_channel_type.into(), + ..Default::default() + }), + }]; + + let inner_config = match args.mode { + CprMode::Single => { + let channel_name = args + .channel_name + .as_ref() + .expect("clap enforces --channel-name for --mode single"); + + let resolved_type = match args.data_type { + None | Some(CliDataType::Infer) => data_channel_type, + Some(ref dt) => ChannelDataType::from(dt.clone()), + }; + + CprInnerConfig::SingleChannel(ParquetSingleChannelPerRowSingleChannelConfig { + data_path: args.data_path.clone(), + channel: Some(ChannelConfig { + name: channel_name.clone(), + data_type: resolved_type.into(), + units: args.unit.clone().unwrap_or_default(), + description: args.description.clone().unwrap_or_default(), + ..Default::default() + }), + }) + } + CprMode::Multi => { + let name_path = args + .name_path + .as_ref() + .expect("clap enforces --name-path for --mode multi"); + + let name_field = arrow_schema + .fields() + .iter() + .find(|field| field.name() == name_path) + .with_context(|| { + format!("name column '{name_path}' not found in parquet schema") + })?; + let name_channel_type = arrow_type_to_channel_data_type(name_field.data_type()) + .with_context(|| format!("unsupported data type for name column '{name_path}'"))?; + + columns.push(ParquetColumn { + path: name_path.clone(), + column_config: Some(ChannelConfig { + data_type: name_channel_type.into(), + ..Default::default() + }), + }); + + CprInnerConfig::MultiChannel(ParquetSingleChannelPerRowMultiChannelConfig { + name_path: name_path.clone(), + data_path: args.data_path.clone(), + }) + } + }; + + Ok(ParquetSingleChannelPerRowConfig { + time_column, + columns, + config: Some(inner_config), + }) +} + pub(super) fn arrow_type_to_channel_data_type(dt: &DataType) -> Option { match dt { DataType::Boolean => Some(ChannelDataType::Bool), diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/flat_dataset.rs b/rust/crates/sift_cli/src/cmd/import/parquet/flat_dataset.rs index 80aba690b..665ba8c89 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/flat_dataset.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/flat_dataset.rs @@ -29,7 +29,7 @@ use crate::{ pub async fn run(ctx: Context, args: FlatDatasetArgs) -> Result { let grpc_channel = create_grpc_channel(&ctx)?; let mut data_imports_client = DataImportServiceClient::new(grpc_channel.clone()); - let mut file = File::open(&args.path).context("failed to open parquet file")?; + let mut file = File::open(&args.common.path).context("failed to open parquet file")?; let footer_md = FooterMetadata::try_from(&mut file)?; let mut config = { @@ -43,7 +43,7 @@ pub async fn run(ctx: Context, args: FlatDatasetArgs) -> Result { update_config_with_overrides(&mut config, &args)?; let create_data_import_req = create_data_import_request(&args, config, footer_md)?; - if args.preview { + if args.common.preview { let parquet_conf = create_data_import_req.parquet_config.unwrap(); let Config::FlatDataset(flatset_conf) = parquet_conf.config.unwrap() else { anyhow::bail!("expected flatdataset config for preview"); @@ -78,12 +78,12 @@ pub async fn run(ctx: Context, args: FlatDatasetArgs) -> Result { .await .context("failed to upload Parquet file")?; - let location = args.run.as_ref().map_or_else( - || format!("asset '{}'", args.asset.cyan()), + let location = args.common.run.as_ref().map_or_else( + || format!("asset '{}'", args.common.asset.cyan()), |r| format!("run '{}'", r.clone().cyan()), ); - if !args.wait { + if !args.common.wait { Output::new() .line(format!("{} file for processing", "Uploaded".green())) .tip(format!( @@ -224,8 +224,9 @@ fn create_data_import_request( ) -> Result { let req = CreateDataImportFromUploadRequest { parquet_config: Some(ParquetConfig { - asset_name: args.asset.clone(), - run_name: args.run.clone().unwrap_or_default(), + asset_name: args.common.asset.clone(), + run_name: args.common.run.clone().unwrap_or_default(), + run_id: args.common.run_id.clone().unwrap_or_default(), footer_offset: footer_md.offset, footer_length: u32::try_from(footer_md.length) .context("parquet footer length too large")?, @@ -234,7 +235,6 @@ fn create_data_import_request( ) .into(), config: config.config, - ..Default::default() }), ..Default::default() }; diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/mod.rs b/rust/crates/sift_cli/src/cmd/import/parquet/mod.rs index 299c888f9..68a21b1fc 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/mod.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/mod.rs @@ -4,6 +4,7 @@ use unix::FooterMetadata; #[cfg(target_os = "windows")] use windows::{FooterMetadata, get_footer}; +pub mod cpr_dataset; pub mod detect_parquet_schema; pub mod flat_dataset; diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs b/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs index 07d08d839..1c27048ee 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs @@ -1,4 +1,10 @@ -use crate::cli::{FlatDatasetArgs, parquet::ComplexTypesMode, time::TimeFormat}; +use crate::cli::channel::DataType as CliDataType; +use crate::cli::{ + CommonImportArgs, CprArgs, FlatDatasetArgs, + parquet::{ComplexTypesMode, CprMode}, + time::TimeFormat, +}; +use crate::cmd::import::parquet::cpr_dataset; use crate::cmd::import::parquet::detect_parquet_schema::{self, arrow_type_to_channel_data_type}; use arrow_array::{ BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, @@ -13,9 +19,14 @@ use std::sync::Arc; fn make_test_args(time_path: &str, time_format: TimeFormat) -> FlatDatasetArgs { FlatDatasetArgs { - path: PathBuf::from("test.parquet"), - asset: String::from("test-asset"), - run: None, + common: CommonImportArgs { + path: PathBuf::from("test.parquet"), + asset: String::from("test-asset"), + run: None, + run_id: None, + wait: false, + preview: false, + }, channel_path: vec![], data_type: vec![], unit: vec![], @@ -26,8 +37,6 @@ fn make_test_args(time_path: &str, time_format: TimeFormat) -> FlatDatasetArgs { time_format, relative_start_time: None, complex_types_mode: ComplexTypesMode::default(), - wait: false, - preview: false, } } @@ -327,3 +336,240 @@ fn test_time_path_not_in_parquet_returns_error() { "should error when time path is not found in parquet schema" ); } + +fn make_cpr_args(mode: CprMode, time_format: TimeFormat) -> CprArgs { + CprArgs { + common: CommonImportArgs { + path: PathBuf::from("test.parquet"), + asset: "test-asset".into(), + run: None, + run_id: None, + wait: false, + preview: false, + }, + mode, + time_path: "timestamp".into(), + time_format, + relative_start_time: None, + data_path: "value".into(), + channel_name: None, + data_type: None, + unit: None, + description: None, + name_path: None, + complex_types_mode: ComplexTypesMode::default(), + } +} + +fn create_cpr_single_batch() -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ + Field::new("timestamp", DataType::Int64, false), + Field::new("value", DataType::Float64, false), + ])); + RecordBatch::try_new( + schema, + vec![ + Arc::new(Int64Array::from(vec![1, 2, 3])), + Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])), + ], + ) + .expect("failed to create cpr single batch") +} + +fn create_cpr_multi_batch(names: Vec<&str>) -> RecordBatch { + let n = names.len(); + let schema = Arc::new(Schema::new(vec![ + Field::new("timestamp", DataType::Int64, false), + Field::new("value", DataType::Float64, false), + Field::new("channel", DataType::Utf8, false), + ])); + let timestamps: Vec = (0..n as i64).collect(); + let values: Vec = (0..n).map(|i| i as f64).collect(); + RecordBatch::try_new( + schema, + vec![ + Arc::new(Int64Array::from(timestamps)), + Arc::new(Float64Array::from(values)), + Arc::new(StringArray::from(names)), + ], + ) + .expect("failed to create cpr multi batch") +} + +#[test] +fn test_detect_cpr_single_basic_infers_data_type() { + let batch = create_cpr_single_batch(); + let bytes = write_to_parquet_bytes(&batch); + let mut args = make_cpr_args(CprMode::Single, TimeFormat::AbsoluteUnixMilliseconds); + args.channel_name = Some("temperature".into()); + + let cfg = detect_parquet_schema::detect_cpr_config(&bytes, &args) + .expect("detect_cpr_config should succeed"); + + use sift_rs::data_imports::v2::parquet_single_channel_per_row_config::Config as InnerConfig; + let inner = cfg.config.as_ref().expect("inner config present"); + let InnerConfig::SingleChannel(single) = inner else { + panic!("expected SingleChannel variant"); + }; + let channel = single.channel.as_ref().expect("channel config present"); + assert_eq!(channel.name, "temperature"); + assert_eq!(channel.data_type, i32::from(ChannelDataType::Double)); + assert_eq!(single.data_path, "value"); + + let time = cfg.time_column.as_ref().expect("time column present"); + assert_eq!(time.path, "timestamp"); +} + +#[test] +fn test_detect_cpr_single_honors_data_type_override() { + let batch = create_cpr_single_batch(); + let bytes = write_to_parquet_bytes(&batch); + let mut args = make_cpr_args(CprMode::Single, TimeFormat::AbsoluteUnixMilliseconds); + args.channel_name = Some("temperature".into()); + args.data_type = Some(CliDataType::Float); + + let cfg = detect_parquet_schema::detect_cpr_config(&bytes, &args).unwrap(); + + use sift_rs::data_imports::v2::parquet_single_channel_per_row_config::Config as InnerConfig; + let InnerConfig::SingleChannel(single) = cfg.config.as_ref().unwrap() else { + panic!("expected SingleChannel variant"); + }; + assert_eq!( + single.channel.as_ref().unwrap().data_type, + i32::from(ChannelDataType::Float), + "explicit --data-type should win over parquet-inferred type" + ); +} + +#[test] +fn test_detect_cpr_single_propagates_units_and_description() { + let batch = create_cpr_single_batch(); + let bytes = write_to_parquet_bytes(&batch); + let mut args = make_cpr_args(CprMode::Single, TimeFormat::AbsoluteUnixMilliseconds); + args.channel_name = Some("temperature".into()); + args.unit = Some("celsius".into()); + args.description = Some("ambient temperature".into()); + + let cfg = detect_parquet_schema::detect_cpr_config(&bytes, &args).unwrap(); + use sift_rs::data_imports::v2::parquet_single_channel_per_row_config::Config as InnerConfig; + let InnerConfig::SingleChannel(single) = cfg.config.as_ref().unwrap() else { + panic!("expected SingleChannel variant"); + }; + let channel = single.channel.as_ref().unwrap(); + assert_eq!(channel.units, "celsius"); + assert_eq!(channel.description, "ambient temperature"); +} + +#[test] +fn test_detect_cpr_multi_basic() { + let batch = create_cpr_multi_batch(vec!["a", "b", "c"]); + let bytes = write_to_parquet_bytes(&batch); + let mut args = make_cpr_args(CprMode::Multi, TimeFormat::AbsoluteUnixMilliseconds); + args.name_path = Some("channel".into()); + + let cfg = detect_parquet_schema::detect_cpr_config(&bytes, &args) + .expect("detect_cpr_config multi should succeed"); + + use sift_rs::data_imports::v2::parquet_single_channel_per_row_config::Config as InnerConfig; + let InnerConfig::MultiChannel(multi) = cfg.config.as_ref().unwrap() else { + panic!("expected MultiChannel variant"); + }; + assert_eq!(multi.name_path, "channel"); + assert_eq!(multi.data_path, "value"); + + let paths: Vec<&str> = cfg.columns.iter().map(|c| c.path.as_str()).collect(); + assert!( + paths.contains(&"value"), + "columns should include data column" + ); + assert!( + paths.contains(&"channel"), + "columns should include name column" + ); +} + +#[test] +fn test_detect_cpr_missing_time_column_errors() { + let batch = create_cpr_single_batch(); + let bytes = write_to_parquet_bytes(&batch); + let mut args = make_cpr_args(CprMode::Single, TimeFormat::AbsoluteUnixMilliseconds); + args.channel_name = Some("temperature".into()); + args.time_path = "nonexistent_time".into(); + + let err = detect_parquet_schema::detect_cpr_config(&bytes, &args).unwrap_err(); + assert!( + err.chain().any(|e| e.to_string().contains("time column")), + "expected time column error, got: {err:#}" + ); +} + +#[test] +fn test_detect_cpr_missing_data_column_errors() { + let batch = create_cpr_single_batch(); + let bytes = write_to_parquet_bytes(&batch); + let mut args = make_cpr_args(CprMode::Single, TimeFormat::AbsoluteUnixMilliseconds); + args.channel_name = Some("temperature".into()); + args.data_path = "nonexistent_value".into(); + + let err = detect_parquet_schema::detect_cpr_config(&bytes, &args).unwrap_err(); + assert!( + err.chain().any(|e| e.to_string().contains("data column")), + "expected data column error, got: {err:#}" + ); +} + +#[test] +fn test_detect_cpr_multi_missing_name_column_errors() { + let batch = create_cpr_multi_batch(vec!["a"]); + let bytes = write_to_parquet_bytes(&batch); + let mut args = make_cpr_args(CprMode::Multi, TimeFormat::AbsoluteUnixMilliseconds); + args.name_path = Some("nonexistent_name".into()); + + let err = detect_parquet_schema::detect_cpr_config(&bytes, &args).unwrap_err(); + assert!( + err.chain().any(|e| e.to_string().contains("name column")), + "expected name column error, got: {err:#}" + ); +} + +#[test] +fn test_discover_multi_channel_names_for_preview_dedups_and_sorts() { + let batch = create_cpr_multi_batch(vec![ + "voltage", + "temperature", + "pressure", + "voltage", + "temperature", + ]); + let bytes = write_to_parquet_bytes(&batch); + + let names = cpr_dataset::discover_multi_channel_names_for_preview(bytes, "channel") + .expect("discovery should succeed"); + assert_eq!(names, vec!["pressure", "temperature", "voltage"]); +} + +#[test] +fn test_discover_multi_channel_names_for_preview_errors_on_non_string_column() { + let batch = create_cpr_single_batch(); + let bytes = write_to_parquet_bytes(&batch); + + let err = cpr_dataset::discover_multi_channel_names_for_preview(bytes, "value").unwrap_err(); + assert!( + err.chain() + .any(|e| e.to_string().contains("must be a string type")), + "expected non-string error, got: {err:#}" + ); +} + +#[test] +fn test_discover_multi_channel_names_for_preview_missing_column_errors() { + let batch = create_cpr_multi_batch(vec!["a"]); + let bytes = write_to_parquet_bytes(&batch); + + let err = + cpr_dataset::discover_multi_channel_names_for_preview(bytes, "no_such_col").unwrap_err(); + assert!( + err.chain().any(|e| e.to_string().contains("not found")), + "expected not-found error, got: {err:#}" + ); +} diff --git a/rust/crates/sift_cli/src/main.rs b/rust/crates/sift_cli/src/main.rs index 037ce388b..d9ca8f099 100644 --- a/rust/crates/sift_cli/src/main.rs +++ b/rust/crates/sift_cli/src/main.rs @@ -74,6 +74,9 @@ fn run(clargs: cli::Args) -> Result { cli::ImportParquetCmd::FlatDataset(args) => { run_future(cmd::import::parquet::flat_dataset::run(ctx, args)) } + cli::ImportParquetCmd::Cpr(args) => { + run_future(cmd::import::parquet::cpr_dataset::run(ctx, args)) + } }, cli::ImportCmd::Tdms(args) => { run_future(cmd::import::tdms::detect_tdms_config::run(ctx, args))