From 12b91bd916cb2edd31988c0d6971a6de05d3079f Mon Sep 17 00:00:00 2001 From: Brandon Shippy Date: Tue, 19 May 2026 10:30:16 -0700 Subject: [PATCH 01/21] boilerplate --- rust/crates/sift_cli/src/cli/mod.rs | 73 ++++++++++++++++++- rust/crates/sift_cli/src/cli/parquet.rs | 18 +++++ .../import/parquet/detect_parquet_schema.rs | 14 +++- .../sift_cli/src/cmd/import/parquet/mod.rs | 1 + .../src/cmd/import/parquet/scpr_dataset.rs | 30 ++++++++ rust/crates/sift_cli/src/main.rs | 3 + 6 files changed, 136 insertions(+), 3 deletions(-) create mode 100644 rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs diff --git a/rust/crates/sift_cli/src/cli/mod.rs b/rust/crates/sift_cli/src/cli/mod.rs index ef00512e4..0c3ba4fcc 100644 --- a/rust/crates/sift_cli/src/cli/mod.rs +++ b/rust/crates/sift_cli/src/cli/mod.rs @@ -1,6 +1,6 @@ use clap::{Parser, Subcommand, crate_description, crate_version}; use clap_complete::Shell; -use parquet::ComplexTypesMode; +use parquet::{ComplexTypesMode, ScprMode}; pub mod hdf5; pub mod tdms; use hdf5::Hdf5Schema; @@ -340,6 +340,10 @@ pub enum ImportParquetCmd { /// A parquet file where every column is exclusive to a single channel except for the time /// column FlatDataset(FlatDatasetArgs), + + /// A parquet file laid out single-channel-per-row, either one channel for the whole file + /// (single mode) or with a name column identifying the channel for each row (multi mode). 
+ Scpr(ScprArgs), } #[derive(clap::Args)] @@ -405,6 +409,73 @@ pub struct FlatDatasetArgs { pub preview: bool, } +#[derive(clap::Args)] +pub struct ScprArgs { + /// Path to the Parquet file to import + pub path: PathBuf, + + /// Name of the asset this data belongs to + #[arg(short, long)] + pub asset: String, + + /// Optional run name to associate with this import + #[arg(short, long)] + pub run: Option, + + /// SCPR mode: single-channel or multi-channel + #[arg(long)] + pub mode: ScprMode, + + /// Path to the time column + #[arg(short, long)] + pub time_path: String, + + /// Time format used in the time column + #[arg(short = 'f', long)] + pub time_format: TimeFormat, + + /// Start time (RFC3339) to use if time format is relative + #[arg(short = 's', long)] + pub relative_start_time: Option, + + /// Path to the column holding values (used in both modes) + #[arg(long)] + pub data_path: String, + + /// (single mode) Channel name for every row in the file + #[arg(long)] + pub channel_name: Option, + + /// (single mode) Data type for the channel. Use `"infer"` to have the program infer the + /// data type from the parquet schema. 
+ #[arg(long)] + pub data_type: Option, + + /// (single mode) Channel units + #[arg(long)] + pub unit: Option, + + /// (single mode) Channel description + #[arg(short = 'n', long)] + pub description: Option, + + /// (multi mode) Path to the column holding channel names + #[arg(long)] + pub name_path: Option, + + /// Strategy for handling complex types (maps, lists, structs) + #[arg(short = 'm', long, default_value_t = ComplexTypesMode::default())] + pub complex_types_mode: ComplexTypesMode, + + /// Wait until the import finishes processing + #[arg(short, long)] + pub wait: bool, + + /// Preview the parsed schema without uploading + #[arg(short, long)] + pub preview: bool, +} + #[derive(clap::Args)] pub struct ImportTdmsArgs { #[command(flatten)] diff --git a/rust/crates/sift_cli/src/cli/parquet.rs b/rust/crates/sift_cli/src/cli/parquet.rs index a9d267edf..8256ba0b5 100644 --- a/rust/crates/sift_cli/src/cli/parquet.rs +++ b/rust/crates/sift_cli/src/cli/parquet.rs @@ -38,3 +38,21 @@ impl Display for ComplexTypesMode { } } } + +/// Single-channel-per-row mode: tells the importer how each row is shaped. +#[derive(Debug, Copy, Clone, PartialEq, Eq, ValueEnum)] +pub enum ScprMode { + /// File has [time, value]. All rows belong to one named channel. + Single, + /// File has [time, name_column, value_column]. Channels created per unique name. 
+ Multi, +} + +impl Display for ScprMode { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Single => write!(f, "single"), + Self::Multi => write!(f, "multi"), + } + } +} diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs index 55f5d9296..79cbaeafa 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs @@ -9,11 +9,12 @@ use pbjson_types::Timestamp; use sift_rs::{ common::r#type::v1::{ChannelConfig, ChannelDataType}, data_imports::v2::{ - ParquetDataColumn, ParquetFlatDatasetConfig, ParquetTimeColumn, TimeFormat, + ParquetDataColumn, ParquetFlatDatasetConfig, ParquetSingleChannelPerRowConfig, + ParquetTimeColumn, TimeFormat, }, }; -use crate::cli::FlatDatasetArgs; +use crate::cli::{FlatDatasetArgs, ScprArgs}; pub fn detect_flat_dataset_config( file: &R, @@ -100,3 +101,12 @@ pub(super) fn arrow_type_to_channel_data_type(dt: &DataType) -> Option None, } } + +pub fn detect_scpr_config( + _file: &R, + _args: &ScprArgs, +) -> Result { + todo!( + "detect SCPR config — walk Arrow schema, build time_column + columns + Single/Multi oneof from args" + ) +} diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/mod.rs b/rust/crates/sift_cli/src/cmd/import/parquet/mod.rs index 299c888f9..a793fac23 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/mod.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/mod.rs @@ -6,6 +6,7 @@ use windows::{FooterMetadata, get_footer}; pub mod detect_parquet_schema; pub mod flat_dataset; +pub mod scpr_dataset; #[cfg(test)] mod tests; diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs b/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs new file mode 100644 index 000000000..562529178 --- /dev/null +++ b/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs @@ -0,0 
+1,30 @@ +use std::process::ExitCode; + +use anyhow::Result; +use sift_rs::data_imports::v2::{ + CreateDataImportFromUploadRequest, ParquetConfig, ParquetSingleChannelPerRowConfig, +}; + +use crate::{ + cli::ScprArgs, + cmd::{Context, import::parquet::FooterMetadata}, +}; + +pub async fn run(_ctx: Context, _args: ScprArgs) -> Result { + todo!("SCPR run flow — see flat_dataset::run for pattern") +} + +pub fn build_scpr_config(_args: &ScprArgs) -> Result { + todo!("validate per-mode args, then call detect_scpr_config") +} + +#[allow(dead_code)] +fn create_data_import_request( + _args: &ScprArgs, + _config: ParquetConfig, + _footer_md: FooterMetadata, +) -> Result { + todo!( + "mirror flat_dataset::create_data_import_request, set config: Some(Config::SingleChannelPerRow(...))" + ) +} diff --git a/rust/crates/sift_cli/src/main.rs b/rust/crates/sift_cli/src/main.rs index 037ce388b..269d881d4 100644 --- a/rust/crates/sift_cli/src/main.rs +++ b/rust/crates/sift_cli/src/main.rs @@ -74,6 +74,9 @@ fn run(clargs: cli::Args) -> Result { cli::ImportParquetCmd::FlatDataset(args) => { run_future(cmd::import::parquet::flat_dataset::run(ctx, args)) } + cli::ImportParquetCmd::Scpr(args) => { + run_future(cmd::import::parquet::scpr_dataset::run(ctx, args)) + } }, cli::ImportCmd::Tdms(args) => { run_future(cmd::import::tdms::detect_tdms_config::run(ctx, args)) From f1c089857d53140dcbd2634bf1f0d52e674f65a7 Mon Sep 17 00:00:00 2001 From: Brandon Shippy Date: Thu, 21 May 2026 13:54:41 -0700 Subject: [PATCH 02/21] added scpr cli args and condensed parquet args into general arg structs --- rust/crates/sift_cli/src/cli/mod.rs | 32 ++++------------------------- 1 file changed, 4 insertions(+), 28 deletions(-) diff --git a/rust/crates/sift_cli/src/cli/mod.rs b/rust/crates/sift_cli/src/cli/mod.rs index 0c3ba4fcc..995b8b202 100644 --- a/rust/crates/sift_cli/src/cli/mod.rs +++ b/rust/crates/sift_cli/src/cli/mod.rs @@ -348,16 +348,8 @@ pub enum ImportParquetCmd { #[derive(clap::Args)] pub struct 
FlatDatasetArgs { - /// Path to the Parquet file to import - pub path: PathBuf, - - /// Name of the asset this data belongs to - #[arg(short, long)] - pub asset: String, - - /// Optional run name to associate with this import - #[arg(short, long)] - pub run: Option, + #[command(flatten)] + pub common: CommonImportArgs, /// Paths of data columns to import; can be specified multiple times #[arg(short, long)] @@ -399,28 +391,12 @@ pub struct FlatDatasetArgs { /// Strategy for handling complex types (maps, lists, structs) #[arg(short = 'm', long, default_value_t = ComplexTypesMode::default())] pub complex_types_mode: ComplexTypesMode, - - /// Wait until the import finishes processing - #[arg(short, long)] - pub wait: bool, - - /// Preview the parsed schema without uploading - #[arg(short, long)] - pub preview: bool, } #[derive(clap::Args)] pub struct ScprArgs { - /// Path to the Parquet file to import - pub path: PathBuf, - - /// Name of the asset this data belongs to - #[arg(short, long)] - pub asset: String, - - /// Optional run name to associate with this import - #[arg(short, long)] - pub run: Option, + #[command(flatten)] + pub common: CommonImportArgs, /// SCPR mode: single-channel or multi-channel #[arg(long)] From 81417642af69f83ba7d13b7b987ae75af90f603e Mon Sep 17 00:00:00 2001 From: Brandon Shippy Date: Thu, 21 May 2026 14:26:40 -0700 Subject: [PATCH 03/21] Move arrow-array to dependencies because of multi-mode preview --- rust/crates/sift_cli/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/crates/sift_cli/Cargo.toml b/rust/crates/sift_cli/Cargo.toml index d3bcb2756..f59eecb0e 100644 --- a/rust/crates/sift_cli/Cargo.toml +++ b/rust/crates/sift_cli/Cargo.toml @@ -18,6 +18,7 @@ path = "src/main.rs" [dependencies] anyhow = { workspace = true } +arrow-array = { workspace = true } arrow-schema = { workspace = true } chrono = { workspace = true } clap = { workspace = true } @@ -42,5 +43,4 @@ zip = { workspace = true } [dev-dependencies] 
indoc = { workspace = true } -arrow-array = { workspace = true } bytes = { workspace = true } From ef98552acb670a4b64c9e06dd8fa5c5037c301d6 Mon Sep 17 00:00:00 2001 From: Brandon Shippy Date: Thu, 21 May 2026 14:27:39 -0700 Subject: [PATCH 04/21] Fix duplicate wait and preview flags in ScprArgs --- rust/crates/sift_cli/src/cli/mod.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/rust/crates/sift_cli/src/cli/mod.rs b/rust/crates/sift_cli/src/cli/mod.rs index 995b8b202..eec78369c 100644 --- a/rust/crates/sift_cli/src/cli/mod.rs +++ b/rust/crates/sift_cli/src/cli/mod.rs @@ -442,14 +442,6 @@ pub struct ScprArgs { /// Strategy for handling complex types (maps, lists, structs) #[arg(short = 'm', long, default_value_t = ComplexTypesMode::default())] pub complex_types_mode: ComplexTypesMode, - - /// Wait until the import finishes processing - #[arg(short, long)] - pub wait: bool, - - /// Preview the parsed schema without uploading - #[arg(short, long)] - pub preview: bool, } #[derive(clap::Args)] From 051aee2d3945a7a2a6deb4a13fcd1a4cd4a0237d Mon Sep 17 00:00:00 2001 From: Brandon Shippy Date: Thu, 21 May 2026 15:14:25 -0700 Subject: [PATCH 05/21] Require certain args for both single and multi --- rust/crates/sift_cli/src/cli/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/rust/crates/sift_cli/src/cli/mod.rs b/rust/crates/sift_cli/src/cli/mod.rs index eec78369c..3f1cf6cea 100644 --- a/rust/crates/sift_cli/src/cli/mod.rs +++ b/rust/crates/sift_cli/src/cli/mod.rs @@ -419,24 +419,24 @@ pub struct ScprArgs { pub data_path: String, /// (single mode) Channel name for every row in the file - #[arg(long)] + #[arg(long, required_if_eq("mode", "single"), conflicts_with = "name_path")] pub channel_name: Option, /// (single mode) Data type for the channel. Use `"infer"` to have the program infer the /// data type from the parquet schema. 
- #[arg(long)] + #[arg(long, conflicts_with = "name_path")] pub data_type: Option, /// (single mode) Channel units - #[arg(long)] + #[arg(long, conflicts_with = "name_path")] pub unit: Option, /// (single mode) Channel description - #[arg(short = 'n', long)] + #[arg(short = 'n', long, conflicts_with = "name_path")] pub description: Option, /// (multi mode) Path to the column holding channel names - #[arg(long)] + #[arg(long, required_if_eq("mode", "multi"))] pub name_path: Option, /// Strategy for handling complex types (maps, lists, structs) From 614375f34935cca16a4307f65338bafc0e41187a Mon Sep 17 00:00:00 2001 From: Brandon Shippy Date: Fri, 22 May 2026 10:24:10 -0700 Subject: [PATCH 06/21] building config for scpr single and multi parquet --- .../import/parquet/detect_parquet_schema.rs | 192 ++++++++++++++++-- 1 file changed, 180 insertions(+), 12 deletions(-) diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs index 79cbaeafa..3fa794ae3 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs @@ -1,19 +1,28 @@ use crate::cmd::import::utils::validate_time_format; use anyhow::{Context, Result}; +use arrow_array::{Array, LargeStringArray, StringArray}; use arrow_schema::DataType; use chrono::DateTime; -use parquet::arrow::parquet_to_arrow_schema; +use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; +use parquet::arrow::{ProjectionMask, parquet_to_arrow_schema}; use parquet::file::metadata::ParquetMetaDataReader; use parquet::file::reader::ChunkReader; use pbjson_types::Timestamp; use sift_rs::{ common::r#type::v1::{ChannelConfig, ChannelDataType}, data_imports::v2::{ - ParquetDataColumn, ParquetFlatDatasetConfig, ParquetSingleChannelPerRowConfig, - ParquetTimeColumn, TimeFormat, + ParquetColumn, ParquetDataColumn, ParquetFlatDatasetConfig, + 
ParquetSingleChannelPerRowConfig, ParquetSingleChannelPerRowMultiChannelConfig, + ParquetSingleChannelPerRowSingleChannelConfig, ParquetTimeColumn, TimeFormat, + parquet_single_channel_per_row_config::Config as ScprInnerConfig, }, }; +use std::collections::HashSet; +use std::fs::File; +use std::path::Path; +use crate::cli::channel::DataType as CliDataType; +use crate::cli::parquet::ScprMode; use crate::cli::{FlatDatasetArgs, ScprArgs}; pub fn detect_flat_dataset_config( @@ -78,6 +87,174 @@ pub fn detect_flat_dataset_config( }) } +pub fn detect_scpr_config( + file: &R, + args: &ScprArgs, +) -> Result { + validate_time_format(args.time_format, &args.relative_start_time) + .context("validating time format")?; + + let metadata = ParquetMetaDataReader::new().parse_and_finish(file)?; + let arrow_schema = parquet_to_arrow_schema( + metadata.file_metadata().schema_descr(), + metadata.file_metadata().key_value_metadata(), + ) + .context("detecting scpr arrow schema")?; + + let relative_start_time = match &args.relative_start_time { + Some(start) => { + let dt = DateTime::parse_from_rfc3339(start) + .context("--relative-start-time is not valid RFC3339")?; + Some(Timestamp::from(dt.to_utc())) + } + None => None, + }; + + arrow_schema + .fields() + .iter() + .find(|field| field.name() == &args.time_path) + .with_context(|| { + format!("time column '{}' not found in parquet schema", args.time_path) + })?; + + let time_column = Some(ParquetTimeColumn { + relative_start_time, + path: args.time_path.clone(), + format: TimeFormat::from(args.time_format).into(), + }); + + let data_field = arrow_schema + .fields() + .iter() + .find(|f| f.name() == &args.data_path) + .with_context(|| { + format!("data column '{}' not found in parquet schema", args.data_path) + })?; + let data_channel_type = arrow_type_to_channel_data_type(data_field.data_type()) + .with_context(|| format!("unsupported data type for column '{}'", args.data_path))?; + + let mut columns = vec![ParquetColumn { + path: 
args.data_path.clone(), + column_config: Some(ChannelConfig { + data_type: data_channel_type.into(), + ..Default::default() + }), + }]; + + let inner_config = match args.mode { + ScprMode::Single => { + let channel_name = args + .channel_name + .as_ref() + .expect("clap enforces --channel-name for --mode single"); + + let resolved_type = match args.data_type { + None | Some(CliDataType::Infer) => data_channel_type, + Some(ref dt) => ChannelDataType::from(dt.clone()), + }; + + ScprInnerConfig::SingleChannel(ParquetSingleChannelPerRowSingleChannelConfig { + data_path: args.data_path.clone(), + channel: Some(ChannelConfig { + name: channel_name.clone(), + data_type: resolved_type.into(), + units: args.unit.clone().unwrap_or_default(), + description: args.description.clone().unwrap_or_default(), + ..Default::default() + }), + }) + } + ScprMode::Multi => { + let name_path = args + .name_path + .as_ref() + .expect("clap enforces --name-path for --mode multi"); + + let name_field = arrow_schema + .fields() + .iter() + .find(|f| f.name() == name_path) + .with_context(|| { + format!("name column '{name_path}' not found in parquet schema") + })?; + let name_channel_type = + arrow_type_to_channel_data_type(name_field.data_type()).with_context(|| { + format!("unsupported data type for name column '{name_path}'") + })?; + + columns.push(ParquetColumn { + path: name_path.clone(), + column_config: Some(ChannelConfig { + data_type: name_channel_type.into(), + ..Default::default() + }), + }); + + ScprInnerConfig::MultiChannel(ParquetSingleChannelPerRowMultiChannelConfig { + name_path: name_path.clone(), + data_path: args.data_path.clone(), + }) + } + }; + + Ok(ParquetSingleChannelPerRowConfig { + time_column, + columns, + config: Some(inner_config), + }) +} + +/// Scan the parquet file's name column and return the distinct channel names +/// it contains (sorted, deduped). Used by multi-mode preview so the user can +/// see what channels the server will create at ingest. 
+pub fn discover_multi_channel_names(path: &Path, name_path: &str) -> Result> { + let file = File::open(path).context("failed to open parquet file for channel discovery")?; + let builder = ParquetRecordBatchReaderBuilder::try_new(file) + .context("failed to build parquet record batch reader")?; + + let schema = builder.schema().clone(); + let name_idx = schema + .fields() + .iter() + .position(|f| f.name() == name_path) + .with_context(|| format!("name column '{name_path}' not found in parquet schema"))?; + + let projection = ProjectionMask::roots(builder.parquet_schema(), [name_idx]); + let reader = builder + .with_projection(projection) + .build() + .context("failed to build projected parquet reader")?; + + let mut seen: HashSet = HashSet::new(); + for batch in reader { + let batch = batch.context("failed to read parquet record batch")?; + let col = batch.column(0); + if let Some(arr) = col.as_any().downcast_ref::() { + for i in 0..arr.len() { + if !arr.is_null(i) { + seen.insert(arr.value(i).to_string()); + } + } + } else if let Some(arr) = col.as_any().downcast_ref::() { + for i in 0..arr.len() { + if !arr.is_null(i) { + seen.insert(arr.value(i).to_string()); + } + } + } else { + anyhow::bail!( + "name column '{name_path}' must be a string type; got {:?}", + col.data_type() + ); + } + } + + let mut names: Vec = seen.into_iter().collect(); + names.sort(); + Ok(names) +} + pub(super) fn arrow_type_to_channel_data_type(dt: &DataType) -> Option { match dt { DataType::Boolean => Some(ChannelDataType::Bool), @@ -101,12 +278,3 @@ pub(super) fn arrow_type_to_channel_data_type(dt: &DataType) -> Option None, } } - -pub fn detect_scpr_config( - _file: &R, - _args: &ScprArgs, -) -> Result { - todo!( - "detect SCPR config — walk Arrow schema, build time_column + columns + Single/Multi oneof from args" - ) -} From 1d70010bfdfe077a5eaa9cbd46c74b6aebc90ef8 Mon Sep 17 00:00:00 2001 From: Brandon Shippy Date: Fri, 22 May 2026 11:29:41 -0700 Subject: [PATCH 07/21] scpr import 
request --- .../src/cmd/import/parquet/scpr_dataset.rs | 141 +++++++++++++++--- 1 file changed, 122 insertions(+), 19 deletions(-) diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs b/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs index 562529178..e3529b8c7 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs @@ -1,30 +1,133 @@ -use std::process::ExitCode; +use std::{fs::File, io::Seek, process::ExitCode}; -use anyhow::Result; -use sift_rs::data_imports::v2::{ - CreateDataImportFromUploadRequest, ParquetConfig, ParquetSingleChannelPerRowConfig, +use anyhow::{Context as AnyhowContext, Result}; +use crossterm::style::Stylize; +use sift_rs::{ + common::r#type::v1::ChannelConfig, + data_imports::v2::{ + CreateDataImportFromUploadRequest, CreateDataImportFromUploadResponse, + ParquetComplexTypesImportMode, ParquetConfig, + data_import_service_client::DataImportServiceClient, parquet_config::Config, + parquet_single_channel_per_row_config::Config as ScprInnerConfig, + }, }; -use crate::{ - cli::ScprArgs, - cmd::{Context, import::parquet::FooterMetadata}, +use crate::cli::ScprArgs; +use crate::cmd::import::parquet::detect_parquet_schema::{ + detect_scpr_config, discover_multi_channel_names, }; +use crate::cmd::{ + Context, + import::{ + parquet::FooterMetadata, preview_import_config, utils::upload_gzipped_file, + wait_for_job_completion, + }, +}; +use crate::util::{api::create_grpc_channel, tty::Output}; -pub async fn run(_ctx: Context, _args: ScprArgs) -> Result { - todo!("SCPR run flow — see flat_dataset::run for pattern") -} +pub async fn run(ctx: Context, args: ScprArgs) -> Result { + let grpc_channel = create_grpc_channel(&ctx)?; + let mut data_imports_client = DataImportServiceClient::new(grpc_channel.clone()); + let mut file = File::open(&args.common.path).context("failed to open parquet file")?; + let footer_md = FooterMetadata::try_from(&mut 
file)?; + + let scpr_config = detect_scpr_config(&file, &args).context("failed to detect parquet schema")?; + + if args.common.preview { + let run_label = args + .common + .run_id + .as_deref() + .filter(|s| !s.is_empty()) + .or(args.common.run.as_deref()) + .unwrap_or(""); + + let multi_channels: Vec = match scpr_config.config.as_ref() { + Some(ScprInnerConfig::MultiChannel(multi)) => { + let data_type = scpr_config + .columns + .iter() + .find(|c| c.path == multi.data_path) + .and_then(|c| c.column_config.as_ref()) + .map(|c| c.data_type) + .unwrap_or_default(); + + discover_multi_channel_names(&args.common.path, &multi.name_path)? + .into_iter() + .map(|name| ChannelConfig { + name, + data_type, + ..Default::default() + }) + .collect() + } + _ => Vec::new(), + }; + + let preview_channels: Vec<&ChannelConfig> = match scpr_config.config.as_ref() { + Some(ScprInnerConfig::SingleChannel(single)) => single.channel.iter().collect(), + Some(ScprInnerConfig::MultiChannel(_)) => multi_channels.iter().collect(), + None => Vec::new(), + }; + + preview_import_config(&args.common.asset, run_label, &preview_channels); + return Ok(ExitCode::SUCCESS); + } + + let parquet_config = ParquetConfig { + config: Some(Config::SingleChannelPerRow(scpr_config)), + ..Default::default() + }; + let create_data_import_req = create_data_import_request(&args, parquet_config, footer_md)?; + + let CreateDataImportFromUploadResponse { upload_url, .. } = data_imports_client + .create_data_import_from_upload(create_data_import_req) + .await + .context("error creating data import")? 
+ .into_inner(); + + file.rewind()?; + let job_id = upload_gzipped_file(&ctx, &upload_url, file, "application/vnd.apache.parquet") + .await + .context("failed to upload Parquet file")?; + + let location = args.common.run.as_ref().map_or_else( + || format!("asset '{}'", args.common.asset.cyan()), + |r| format!("run '{}'", r.clone().cyan()), + ); + + if !args.common.wait { + Output::new() + .line(format!("{} file for processing", "Uploaded".green())) + .tip(format!( + "Once processing is complete the data will be available on the {location}." + )) + .print(); + return Ok(ExitCode::SUCCESS); + } -pub fn build_scpr_config(_args: &ScprArgs) -> Result { - todo!("validate per-mode args, then call detect_scpr_config") + wait_for_job_completion(grpc_channel, job_id, location).await } -#[allow(dead_code)] fn create_data_import_request( - _args: &ScprArgs, - _config: ParquetConfig, - _footer_md: FooterMetadata, + args: &ScprArgs, + config: ParquetConfig, + footer_md: FooterMetadata, ) -> Result { - todo!( - "mirror flat_dataset::create_data_import_request, set config: Some(Config::SingleChannelPerRow(...))" - ) + Ok(CreateDataImportFromUploadRequest { + parquet_config: Some(ParquetConfig { + asset_name: args.common.asset.clone(), + run_name: args.common.run.clone().unwrap_or_default(), + run_id: args.common.run_id.clone().unwrap_or_default(), + footer_offset: footer_md.offset, + footer_length: u32::try_from(footer_md.length) + .context("parquet footer length too large")?, + complex_types_import_mode: ParquetComplexTypesImportMode::from( + args.complex_types_mode.clone(), + ) + .into(), + config: config.config, + }), + ..Default::default() + }) } From 50bce4f2e5a377fd8f94ecfb86b9aa89d9f8596b Mon Sep 17 00:00:00 2001 From: Brandon Shippy Date: Fri, 22 May 2026 11:31:07 -0700 Subject: [PATCH 08/21] Condense both flat and scpr parquet args to use common args --- .../src/cmd/import/parquet/flat_dataset.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) 
diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/flat_dataset.rs b/rust/crates/sift_cli/src/cmd/import/parquet/flat_dataset.rs index 80aba690b..665ba8c89 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/flat_dataset.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/flat_dataset.rs @@ -29,7 +29,7 @@ use crate::{ pub async fn run(ctx: Context, args: FlatDatasetArgs) -> Result { let grpc_channel = create_grpc_channel(&ctx)?; let mut data_imports_client = DataImportServiceClient::new(grpc_channel.clone()); - let mut file = File::open(&args.path).context("failed to open parquet file")?; + let mut file = File::open(&args.common.path).context("failed to open parquet file")?; let footer_md = FooterMetadata::try_from(&mut file)?; let mut config = { @@ -43,7 +43,7 @@ pub async fn run(ctx: Context, args: FlatDatasetArgs) -> Result { update_config_with_overrides(&mut config, &args)?; let create_data_import_req = create_data_import_request(&args, config, footer_md)?; - if args.preview { + if args.common.preview { let parquet_conf = create_data_import_req.parquet_config.unwrap(); let Config::FlatDataset(flatset_conf) = parquet_conf.config.unwrap() else { anyhow::bail!("expected flatdataset config for preview"); @@ -78,12 +78,12 @@ pub async fn run(ctx: Context, args: FlatDatasetArgs) -> Result { .await .context("failed to upload Parquet file")?; - let location = args.run.as_ref().map_or_else( - || format!("asset '{}'", args.asset.cyan()), + let location = args.common.run.as_ref().map_or_else( + || format!("asset '{}'", args.common.asset.cyan()), |r| format!("run '{}'", r.clone().cyan()), ); - if !args.wait { + if !args.common.wait { Output::new() .line(format!("{} file for processing", "Uploaded".green())) .tip(format!( @@ -224,8 +224,9 @@ fn create_data_import_request( ) -> Result { let req = CreateDataImportFromUploadRequest { parquet_config: Some(ParquetConfig { - asset_name: args.asset.clone(), - run_name: args.run.clone().unwrap_or_default(), + 
asset_name: args.common.asset.clone(), + run_name: args.common.run.clone().unwrap_or_default(), + run_id: args.common.run_id.clone().unwrap_or_default(), footer_offset: footer_md.offset, footer_length: u32::try_from(footer_md.length) .context("parquet footer length too large")?, @@ -234,7 +235,6 @@ fn create_data_import_request( ) .into(), config: config.config, - ..Default::default() }), ..Default::default() }; From 5c423d7ed38fab66c8ae6cb35033eb4abcfe1983 Mon Sep 17 00:00:00 2001 From: Brandon Shippy Date: Fri, 22 May 2026 11:31:21 -0700 Subject: [PATCH 09/21] tests --- .../sift_cli/src/cmd/import/parquet/tests.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs b/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs index 07d08d839..4f9f2ea0b 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs @@ -1,4 +1,4 @@ -use crate::cli::{FlatDatasetArgs, parquet::ComplexTypesMode, time::TimeFormat}; +use crate::cli::{CommonImportArgs, FlatDatasetArgs, parquet::ComplexTypesMode, time::TimeFormat}; use crate::cmd::import::parquet::detect_parquet_schema::{self, arrow_type_to_channel_data_type}; use arrow_array::{ BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, @@ -13,9 +13,14 @@ use std::sync::Arc; fn make_test_args(time_path: &str, time_format: TimeFormat) -> FlatDatasetArgs { FlatDatasetArgs { - path: PathBuf::from("test.parquet"), - asset: String::from("test-asset"), - run: None, + common: CommonImportArgs { + path: PathBuf::from("test.parquet"), + asset: String::from("test-asset"), + run: None, + run_id: None, + wait: false, + preview: false, + }, channel_path: vec![], data_type: vec![], unit: vec![], @@ -26,8 +31,6 @@ fn make_test_args(time_path: &str, time_format: TimeFormat) -> FlatDatasetArgs { time_format, relative_start_time: None, complex_types_mode: 
ComplexTypesMode::default(), - wait: false, - preview: false, } } From 1354cbdcaf228c3232d56f068160a818fde8df5e Mon Sep 17 00:00:00 2001 From: Brandon Shippy Date: Fri, 22 May 2026 12:35:04 -0700 Subject: [PATCH 10/21] scpr tests --- .../sift_cli/src/cmd/import/parquet/tests.rs | 234 +++++++++++++++++- 1 file changed, 233 insertions(+), 1 deletion(-) diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs b/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs index 4f9f2ea0b..09599deb1 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs @@ -1,4 +1,9 @@ -use crate::cli::{CommonImportArgs, FlatDatasetArgs, parquet::ComplexTypesMode, time::TimeFormat}; +use crate::cli::channel::DataType as CliDataType; +use crate::cli::{ + CommonImportArgs, FlatDatasetArgs, ScprArgs, + parquet::{ComplexTypesMode, ScprMode}, + time::TimeFormat, +}; use crate::cmd::import::parquet::detect_parquet_schema::{self, arrow_type_to_channel_data_type}; use arrow_array::{ BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, @@ -330,3 +335,230 @@ fn test_time_path_not_in_parquet_returns_error() { "should error when time path is not found in parquet schema" ); } + +// ---------- SCPR helpers and tests ---------- + +fn make_scpr_args(mode: ScprMode, time_format: TimeFormat) -> ScprArgs { + ScprArgs { + common: CommonImportArgs { + path: PathBuf::from("test.parquet"), + asset: "test-asset".into(), + run: None, + run_id: None, + wait: false, + preview: false, + }, + mode, + time_path: "timestamp".into(), + time_format, + relative_start_time: None, + data_path: "value".into(), + channel_name: None, + data_type: None, + unit: None, + description: None, + name_path: None, + complex_types_mode: ComplexTypesMode::default(), + } +} + +fn create_scpr_single_batch() -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ + Field::new("timestamp", DataType::Int64, false), + 
Field::new("value", DataType::Float64, false), + ])); + RecordBatch::try_new( + schema, + vec![ + Arc::new(Int64Array::from(vec![1, 2, 3])), + Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])), + ], + ) + .expect("failed to create scpr single batch") +} + +fn create_scpr_multi_batch(names: Vec<&str>) -> RecordBatch { + let n = names.len(); + let schema = Arc::new(Schema::new(vec![ + Field::new("timestamp", DataType::Int64, false), + Field::new("value", DataType::Float64, false), + Field::new("channel", DataType::Utf8, false), + ])); + let timestamps: Vec = (0..n as i64).collect(); + let values: Vec = (0..n).map(|i| i as f64).collect(); + RecordBatch::try_new( + schema, + vec![ + Arc::new(Int64Array::from(timestamps)), + Arc::new(Float64Array::from(values)), + Arc::new(StringArray::from(names)), + ], + ) + .expect("failed to create scpr multi batch") +} + +#[test] +fn test_detect_scpr_single_basic_infers_data_type() { + let batch = create_scpr_single_batch(); + let bytes = write_to_parquet_bytes(&batch); + let mut args = make_scpr_args(ScprMode::Single, TimeFormat::AbsoluteUnixMilliseconds); + args.channel_name = Some("temperature".into()); + + let cfg = detect_parquet_schema::detect_scpr_config(&bytes, &args) + .expect("detect_scpr_config should succeed"); + + use sift_rs::data_imports::v2::parquet_single_channel_per_row_config::Config as InnerConfig; + let inner = cfg.config.as_ref().expect("inner config present"); + let InnerConfig::SingleChannel(single) = inner else { + panic!("expected SingleChannel variant"); + }; + let channel = single.channel.as_ref().expect("channel config present"); + assert_eq!(channel.name, "temperature"); + assert_eq!(channel.data_type, i32::from(ChannelDataType::Double)); + assert_eq!(single.data_path, "value"); + + let time = cfg.time_column.as_ref().expect("time column present"); + assert_eq!(time.path, "timestamp"); +} + +#[test] +fn test_detect_scpr_single_honors_data_type_override() { + let batch = create_scpr_single_batch(); + 
let bytes = write_to_parquet_bytes(&batch); + let mut args = make_scpr_args(ScprMode::Single, TimeFormat::AbsoluteUnixMilliseconds); + args.channel_name = Some("temperature".into()); + args.data_type = Some(CliDataType::Float); + + let cfg = detect_parquet_schema::detect_scpr_config(&bytes, &args).unwrap(); + + use sift_rs::data_imports::v2::parquet_single_channel_per_row_config::Config as InnerConfig; + let InnerConfig::SingleChannel(single) = cfg.config.as_ref().unwrap() else { + panic!("expected SingleChannel variant"); + }; + assert_eq!( + single.channel.as_ref().unwrap().data_type, + i32::from(ChannelDataType::Float), + "explicit --data-type should win over parquet-inferred type" + ); +} + +#[test] +fn test_detect_scpr_single_propagates_units_and_description() { + let batch = create_scpr_single_batch(); + let bytes = write_to_parquet_bytes(&batch); + let mut args = make_scpr_args(ScprMode::Single, TimeFormat::AbsoluteUnixMilliseconds); + args.channel_name = Some("temperature".into()); + args.unit = Some("celsius".into()); + args.description = Some("ambient temperature".into()); + + let cfg = detect_parquet_schema::detect_scpr_config(&bytes, &args).unwrap(); + use sift_rs::data_imports::v2::parquet_single_channel_per_row_config::Config as InnerConfig; + let InnerConfig::SingleChannel(single) = cfg.config.as_ref().unwrap() else { + panic!("expected SingleChannel variant"); + }; + let channel = single.channel.as_ref().unwrap(); + assert_eq!(channel.units, "celsius"); + assert_eq!(channel.description, "ambient temperature"); +} + +#[test] +fn test_detect_scpr_multi_basic() { + let batch = create_scpr_multi_batch(vec!["a", "b", "c"]); + let bytes = write_to_parquet_bytes(&batch); + let mut args = make_scpr_args(ScprMode::Multi, TimeFormat::AbsoluteUnixMilliseconds); + args.name_path = Some("channel".into()); + + let cfg = detect_parquet_schema::detect_scpr_config(&bytes, &args) + .expect("detect_scpr_config multi should succeed"); + + use 
sift_rs::data_imports::v2::parquet_single_channel_per_row_config::Config as InnerConfig; + let InnerConfig::MultiChannel(multi) = cfg.config.as_ref().unwrap() else { + panic!("expected MultiChannel variant"); + }; + assert_eq!(multi.name_path, "channel"); + assert_eq!(multi.data_path, "value"); + + // top-level columns should include both data and name columns + let paths: Vec<&str> = cfg.columns.iter().map(|c| c.path.as_str()).collect(); + assert!(paths.contains(&"value"), "columns should include data column"); + assert!(paths.contains(&"channel"), "columns should include name column"); +} + +#[test] +fn test_detect_scpr_missing_time_column_errors() { + let batch = create_scpr_single_batch(); + let bytes = write_to_parquet_bytes(&batch); + let mut args = make_scpr_args(ScprMode::Single, TimeFormat::AbsoluteUnixMilliseconds); + args.channel_name = Some("temperature".into()); + args.time_path = "nonexistent_time".into(); + + let err = detect_parquet_schema::detect_scpr_config(&bytes, &args).unwrap_err(); + assert!( + err.chain().any(|e| e.to_string().contains("time column")), + "expected time column error, got: {err:#}" + ); +} + +#[test] +fn test_detect_scpr_missing_data_column_errors() { + let batch = create_scpr_single_batch(); + let bytes = write_to_parquet_bytes(&batch); + let mut args = make_scpr_args(ScprMode::Single, TimeFormat::AbsoluteUnixMilliseconds); + args.channel_name = Some("temperature".into()); + args.data_path = "nonexistent_value".into(); + + let err = detect_parquet_schema::detect_scpr_config(&bytes, &args).unwrap_err(); + assert!( + err.chain().any(|e| e.to_string().contains("data column")), + "expected data column error, got: {err:#}" + ); +} + +#[test] +fn test_detect_scpr_multi_missing_name_column_errors() { + let batch = create_scpr_multi_batch(vec!["a"]); + let bytes = write_to_parquet_bytes(&batch); + let mut args = make_scpr_args(ScprMode::Multi, TimeFormat::AbsoluteUnixMilliseconds); + args.name_path = Some("nonexistent_name".into()); + 
+ let err = detect_parquet_schema::detect_scpr_config(&bytes, &args).unwrap_err(); + assert!( + err.chain().any(|e| e.to_string().contains("name column")), + "expected name column error, got: {err:#}" + ); +} + +#[test] +fn test_discover_multi_channel_names_dedups_and_sorts() { + let batch = create_scpr_multi_batch(vec!["voltage", "temperature", "pressure", "voltage", "temperature"]); + let bytes = write_to_parquet_bytes(&batch); + + let names = detect_parquet_schema::discover_multi_channel_names(bytes, "channel") + .expect("discovery should succeed"); + assert_eq!(names, vec!["pressure", "temperature", "voltage"]); +} + +#[test] +fn test_discover_multi_channel_names_errors_on_non_string_column() { + // Use the single batch — `value` is Float64 + let batch = create_scpr_single_batch(); + let bytes = write_to_parquet_bytes(&batch); + + let err = detect_parquet_schema::discover_multi_channel_names(bytes, "value").unwrap_err(); + assert!( + err.chain().any(|e| e.to_string().contains("must be a string type")), + "expected non-string error, got: {err:#}" + ); +} + +#[test] +fn test_discover_multi_channel_names_missing_column_errors() { + let batch = create_scpr_multi_batch(vec!["a"]); + let bytes = write_to_parquet_bytes(&batch); + + let err = detect_parquet_schema::discover_multi_channel_names(bytes, "no_such_col").unwrap_err(); + assert!( + err.chain().any(|e| e.to_string().contains("not found")), + "expected not-found error, got: {err:#}" + ); +} From b5b2838edf7491021294fbf77eb7d78f7bdaa8dd Mon Sep 17 00:00:00 2001 From: Brandon Shippy Date: Fri, 22 May 2026 12:39:29 -0700 Subject: [PATCH 11/21] polish --- rust/crates/sift_cli/src/cli/mod.rs | 20 +++++++++---------- .../import/parquet/detect_parquet_schema.rs | 8 ++++---- .../src/cmd/import/parquet/scpr_dataset.rs | 4 +++- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/rust/crates/sift_cli/src/cli/mod.rs b/rust/crates/sift_cli/src/cli/mod.rs index 3f1cf6cea..e56059f69 100644 --- 
a/rust/crates/sift_cli/src/cli/mod.rs +++ b/rust/crates/sift_cli/src/cli/mod.rs @@ -418,25 +418,25 @@ pub struct ScprArgs { #[arg(long)] pub data_path: String, - /// (single mode) Channel name for every row in the file - #[arg(long, required_if_eq("mode", "single"), conflicts_with = "name_path")] + /// Channel name for every row in the file + #[arg( long, required_if_eq("mode", "single"), conflicts_with = "name_path", help_heading = "Single mode options",)] pub channel_name: Option, - /// (single mode) Data type for the channel. Use `"infer"` to have the program infer the + /// Data type for the channel. Use `"infer"` to have the program infer the /// data type from the parquet schema. - #[arg(long, conflicts_with = "name_path")] + #[arg(long, conflicts_with = "name_path", help_heading = "Single mode options")] pub data_type: Option, - /// (single mode) Channel units - #[arg(long, conflicts_with = "name_path")] + /// Channel units + #[arg(long, conflicts_with = "name_path", help_heading = "Single mode options")] pub unit: Option, - /// (single mode) Channel description - #[arg(short = 'n', long, conflicts_with = "name_path")] + /// Channel description + #[arg( short = 'n', long, conflicts_with = "name_path", help_heading = "Single mode options",)] pub description: Option, - /// (multi mode) Path to the column holding channel names - #[arg(long, required_if_eq("mode", "multi"))] + /// Path to the column holding channel names + #[arg( long, required_if_eq("mode", "multi"), help_heading = "Multi mode options",)] pub name_path: Option, /// Strategy for handling complex types (maps, lists, structs) diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs index 3fa794ae3..fc22af236 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs @@ -18,8 +18,6 @@ use sift_rs::{ }, }; 
use std::collections::HashSet; -use std::fs::File; -use std::path::Path; use crate::cli::channel::DataType as CliDataType; use crate::cli::parquet::ScprMode; @@ -208,8 +206,10 @@ pub fn detect_scpr_config( /// Scan the parquet file's name column and return the distinct channel names /// it contains (sorted, deduped). Used by multi-mode preview so the user can /// see what channels the server will create at ingest. -pub fn discover_multi_channel_names(path: &Path, name_path: &str) -> Result> { - let file = File::open(path).context("failed to open parquet file for channel discovery")?; +pub fn discover_multi_channel_names( + file: R, + name_path: &str, +) -> Result> { let builder = ParquetRecordBatchReaderBuilder::try_new(file) .context("failed to build parquet record batch reader")?; diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs b/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs index e3529b8c7..413317b6f 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs @@ -52,7 +52,9 @@ pub async fn run(ctx: Context, args: ScprArgs) -> Result { .map(|c| c.data_type) .unwrap_or_default(); - discover_multi_channel_names(&args.common.path, &multi.name_path)? + let discovery_file = File::open(&args.common.path) + .context("failed to open parquet file for channel discovery")?; + discover_multi_channel_names(discovery_file, &multi.name_path)? 
.into_iter() .map(|name| ChannelConfig { name, From 35cda5d01f42409a64f2e8e91a1fe355a9649f84 Mon Sep 17 00:00:00 2001 From: Brandon Shippy Date: Fri, 22 May 2026 12:39:53 -0700 Subject: [PATCH 12/21] cargo fmt --- rust/crates/sift_cli/src/cli/mod.rs | 32 ++++++++++++++++--- .../import/parquet/detect_parquet_schema.rs | 16 ++++++---- .../src/cmd/import/parquet/scpr_dataset.rs | 3 +- .../sift_cli/src/cmd/import/parquet/tests.rs | 24 +++++++++++--- 4 files changed, 58 insertions(+), 17 deletions(-) diff --git a/rust/crates/sift_cli/src/cli/mod.rs b/rust/crates/sift_cli/src/cli/mod.rs index e56059f69..cf9c310c0 100644 --- a/rust/crates/sift_cli/src/cli/mod.rs +++ b/rust/crates/sift_cli/src/cli/mod.rs @@ -419,24 +419,46 @@ pub struct ScprArgs { pub data_path: String, /// Channel name for every row in the file - #[arg( long, required_if_eq("mode", "single"), conflicts_with = "name_path", help_heading = "Single mode options",)] + #[arg( + long, + required_if_eq("mode", "single"), + conflicts_with = "name_path", + help_heading = "Single mode options" + )] pub channel_name: Option, /// Data type for the channel. Use `"infer"` to have the program infer the /// data type from the parquet schema. 
- #[arg(long, conflicts_with = "name_path", help_heading = "Single mode options")] + #[arg( + long, + conflicts_with = "name_path", + help_heading = "Single mode options" + )] pub data_type: Option, /// Channel units - #[arg(long, conflicts_with = "name_path", help_heading = "Single mode options")] + #[arg( + long, + conflicts_with = "name_path", + help_heading = "Single mode options" + )] pub unit: Option, /// Channel description - #[arg( short = 'n', long, conflicts_with = "name_path", help_heading = "Single mode options",)] + #[arg( + short = 'n', + long, + conflicts_with = "name_path", + help_heading = "Single mode options" + )] pub description: Option, /// Path to the column holding channel names - #[arg( long, required_if_eq("mode", "multi"), help_heading = "Multi mode options",)] + #[arg( + long, + required_if_eq("mode", "multi"), + help_heading = "Multi mode options" + )] pub name_path: Option, /// Strategy for handling complex types (maps, lists, structs) diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs index fc22af236..a9f73d27c 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs @@ -113,7 +113,10 @@ pub fn detect_scpr_config( .iter() .find(|field| field.name() == &args.time_path) .with_context(|| { - format!("time column '{}' not found in parquet schema", args.time_path) + format!( + "time column '{}' not found in parquet schema", + args.time_path + ) })?; let time_column = Some(ParquetTimeColumn { @@ -127,7 +130,10 @@ pub fn detect_scpr_config( .iter() .find(|f| f.name() == &args.data_path) .with_context(|| { - format!("data column '{}' not found in parquet schema", args.data_path) + format!( + "data column '{}' not found in parquet schema", + args.data_path + ) })?; let data_channel_type = arrow_type_to_channel_data_type(data_field.data_type()) 
.with_context(|| format!("unsupported data type for column '{}'", args.data_path))?; @@ -176,10 +182,8 @@ pub fn detect_scpr_config( .with_context(|| { format!("name column '{name_path}' not found in parquet schema") })?; - let name_channel_type = - arrow_type_to_channel_data_type(name_field.data_type()).with_context(|| { - format!("unsupported data type for name column '{name_path}'") - })?; + let name_channel_type = arrow_type_to_channel_data_type(name_field.data_type()) + .with_context(|| format!("unsupported data type for name column '{name_path}'"))?; columns.push(ParquetColumn { path: name_path.clone(), diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs b/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs index 413317b6f..9005d114a 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs @@ -31,7 +31,8 @@ pub async fn run(ctx: Context, args: ScprArgs) -> Result { let mut file = File::open(&args.common.path).context("failed to open parquet file")?; let footer_md = FooterMetadata::try_from(&mut file)?; - let scpr_config = detect_scpr_config(&file, &args).context("failed to detect parquet schema")?; + let scpr_config = + detect_scpr_config(&file, &args).context("failed to detect parquet schema")?; if args.common.preview { let run_label = args diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs b/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs index 09599deb1..920f6d8f9 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs @@ -480,8 +480,14 @@ fn test_detect_scpr_multi_basic() { // top-level columns should include both data and name columns let paths: Vec<&str> = cfg.columns.iter().map(|c| c.path.as_str()).collect(); - assert!(paths.contains(&"value"), "columns should include data column"); - assert!(paths.contains(&"channel"), "columns should include name 
column"); + assert!( + paths.contains(&"value"), + "columns should include data column" + ); + assert!( + paths.contains(&"channel"), + "columns should include name column" + ); } #[test] @@ -530,7 +536,13 @@ fn test_detect_scpr_multi_missing_name_column_errors() { #[test] fn test_discover_multi_channel_names_dedups_and_sorts() { - let batch = create_scpr_multi_batch(vec!["voltage", "temperature", "pressure", "voltage", "temperature"]); + let batch = create_scpr_multi_batch(vec![ + "voltage", + "temperature", + "pressure", + "voltage", + "temperature", + ]); let bytes = write_to_parquet_bytes(&batch); let names = detect_parquet_schema::discover_multi_channel_names(bytes, "channel") @@ -546,7 +558,8 @@ fn test_discover_multi_channel_names_errors_on_non_string_column() { let err = detect_parquet_schema::discover_multi_channel_names(bytes, "value").unwrap_err(); assert!( - err.chain().any(|e| e.to_string().contains("must be a string type")), + err.chain() + .any(|e| e.to_string().contains("must be a string type")), "expected non-string error, got: {err:#}" ); } @@ -556,7 +569,8 @@ fn test_discover_multi_channel_names_missing_column_errors() { let batch = create_scpr_multi_batch(vec!["a"]); let bytes = write_to_parquet_bytes(&batch); - let err = detect_parquet_schema::discover_multi_channel_names(bytes, "no_such_col").unwrap_err(); + let err = + detect_parquet_schema::discover_multi_channel_names(bytes, "no_such_col").unwrap_err(); assert!( err.chain().any(|e| e.to_string().contains("not found")), "expected not-found error, got: {err:#}" From 8acb734d92cff53f8c5d6c680574b8b80b8880d9 Mon Sep 17 00:00:00 2001 From: Brandon Shippy Date: Fri, 22 May 2026 13:23:19 -0700 Subject: [PATCH 13/21] polish and function and var renames --- .../src/cmd/import/parquet/detect_parquet_schema.rs | 11 ++++------- .../sift_cli/src/cmd/import/parquet/scpr_dataset.rs | 4 ++-- rust/crates/sift_cli/src/cmd/import/parquet/tests.rs | 12 ++++++------ 3 files changed, 12 insertions(+), 15 
deletions(-) diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs index a9f73d27c..d431c6c29 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs @@ -128,7 +128,7 @@ pub fn detect_scpr_config( let data_field = arrow_schema .fields() .iter() - .find(|f| f.name() == &args.data_path) + .find(|field| field.name() == &args.data_path) .with_context(|| { format!( "data column '{}' not found in parquet schema", @@ -178,7 +178,7 @@ pub fn detect_scpr_config( let name_field = arrow_schema .fields() .iter() - .find(|f| f.name() == name_path) + .find(|field| field.name() == name_path) .with_context(|| { format!("name column '{name_path}' not found in parquet schema") })?; @@ -207,10 +207,7 @@ pub fn detect_scpr_config( }) } -/// Scan the parquet file's name column and return the distinct channel names -/// it contains (sorted, deduped). Used by multi-mode preview so the user can -/// see what channels the server will create at ingest. 
-pub fn discover_multi_channel_names( +pub fn discover_multi_channel_names_for_preview( file: R, name_path: &str, ) -> Result> { @@ -221,7 +218,7 @@ pub fn discover_multi_channel_names( let name_idx = schema .fields() .iter() - .position(|f| f.name() == name_path) + .position(|field| field.name() == name_path) .with_context(|| format!("name column '{name_path}' not found in parquet schema"))?; let projection = ProjectionMask::roots(builder.parquet_schema(), [name_idx]); diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs b/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs index 9005d114a..eabcad30a 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs @@ -14,7 +14,7 @@ use sift_rs::{ use crate::cli::ScprArgs; use crate::cmd::import::parquet::detect_parquet_schema::{ - detect_scpr_config, discover_multi_channel_names, + detect_scpr_config, discover_multi_channel_names_for_preview, }; use crate::cmd::{ Context, @@ -55,7 +55,7 @@ pub async fn run(ctx: Context, args: ScprArgs) -> Result { let discovery_file = File::open(&args.common.path) .context("failed to open parquet file for channel discovery")?; - discover_multi_channel_names(discovery_file, &multi.name_path)? + discover_multi_channel_names_for_preview(discovery_file, &multi.name_path)? 
.into_iter() .map(|name| ChannelConfig { name, diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs b/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs index 920f6d8f9..21e756ed2 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs @@ -535,7 +535,7 @@ fn test_detect_scpr_multi_missing_name_column_errors() { } #[test] -fn test_discover_multi_channel_names_dedups_and_sorts() { +fn test_discover_multi_channel_names_for_preview_dedups_and_sorts() { let batch = create_scpr_multi_batch(vec![ "voltage", "temperature", @@ -545,18 +545,18 @@ fn test_discover_multi_channel_names_dedups_and_sorts() { ]); let bytes = write_to_parquet_bytes(&batch); - let names = detect_parquet_schema::discover_multi_channel_names(bytes, "channel") + let names = detect_parquet_schema::discover_multi_channel_names_for_preview(bytes, "channel") .expect("discovery should succeed"); assert_eq!(names, vec!["pressure", "temperature", "voltage"]); } #[test] -fn test_discover_multi_channel_names_errors_on_non_string_column() { +fn test_discover_multi_channel_names_for_preview_errors_on_non_string_column() { // Use the single batch — `value` is Float64 let batch = create_scpr_single_batch(); let bytes = write_to_parquet_bytes(&batch); - let err = detect_parquet_schema::discover_multi_channel_names(bytes, "value").unwrap_err(); + let err = detect_parquet_schema::discover_multi_channel_names_for_preview(bytes, "value").unwrap_err(); assert!( err.chain() .any(|e| e.to_string().contains("must be a string type")), @@ -565,12 +565,12 @@ fn test_discover_multi_channel_names_errors_on_non_string_column() { } #[test] -fn test_discover_multi_channel_names_missing_column_errors() { +fn test_discover_multi_channel_names_for_preview_missing_column_errors() { let batch = create_scpr_multi_batch(vec!["a"]); let bytes = write_to_parquet_bytes(&batch); let err = - detect_parquet_schema::discover_multi_channel_names(bytes, 
"no_such_col").unwrap_err(); + detect_parquet_schema::discover_multi_channel_names_for_preview(bytes, "no_such_col").unwrap_err(); assert!( err.chain().any(|e| e.to_string().contains("not found")), "expected not-found error, got: {err:#}" From 75e0a2622b3a7c6fff076446146e477c165f1952 Mon Sep 17 00:00:00 2001 From: Brandon Shippy Date: Fri, 22 May 2026 13:26:08 -0700 Subject: [PATCH 14/21] restructure discover multi channel names --- .../import/parquet/detect_parquet_schema.rs | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs index d431c6c29..fc49b1b65 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs @@ -1,5 +1,5 @@ use crate::cmd::import::utils::validate_time_format; -use anyhow::{Context, Result}; +use anyhow::{Context, Result, anyhow}; use arrow_array::{Array, LargeStringArray, StringArray}; use arrow_schema::DataType; use chrono::DateTime; @@ -221,6 +221,17 @@ pub fn discover_multi_channel_names_for_preview( .position(|field| field.name() == name_path) .with_context(|| format!("name column '{name_path}' not found in parquet schema"))?; + let name_col_type = schema.field(name_idx).data_type().clone(); + let is_large = match name_col_type { + DataType::Utf8 => false, + DataType::LargeUtf8 => true, + other => { + return Err(anyhow!( + "name column '{name_path}' must be a string type; got {other:?}" + )); + } + }; + let projection = ProjectionMask::roots(builder.parquet_schema(), [name_idx]); let reader = builder .with_projection(projection) @@ -231,23 +242,26 @@ pub fn discover_multi_channel_names_for_preview( for batch in reader { let batch = batch.context("failed to read parquet record batch")?; let col = batch.column(0); - if let Some(arr) = col.as_any().downcast_ref::() { + if 
is_large { + let arr = col + .as_any() + .downcast_ref::() + .expect("name column type validated above"); for i in 0..arr.len() { if !arr.is_null(i) { seen.insert(arr.value(i).to_string()); } } - } else if let Some(arr) = col.as_any().downcast_ref::() { + } else { + let arr = col + .as_any() + .downcast_ref::() + .expect("name column type validated above"); for i in 0..arr.len() { if !arr.is_null(i) { seen.insert(arr.value(i).to_string()); } } - } else { - anyhow::bail!( - "name column '{name_path}' must be a string type; got {:?}", - col.data_type() - ); } } From af447058014839c60e5dc2c2dcfa0b69102e5501 Mon Sep 17 00:00:00 2001 From: Brandon Shippy Date: Fri, 22 May 2026 14:11:19 -0700 Subject: [PATCH 15/21] update preview of multi scpr to use field reader --- .../import/parquet/detect_parquet_schema.rs | 81 ++++++++----------- 1 file changed, 34 insertions(+), 47 deletions(-) diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs index fc49b1b65..e08cf68c2 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs @@ -1,12 +1,12 @@ use crate::cmd::import::utils::validate_time_format; use anyhow::{Context, Result, anyhow}; -use arrow_array::{Array, LargeStringArray, StringArray}; use arrow_schema::DataType; use chrono::DateTime; -use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; -use parquet::arrow::{ProjectionMask, parquet_to_arrow_schema}; +use parquet::arrow::parquet_to_arrow_schema; use parquet::file::metadata::ParquetMetaDataReader; -use parquet::file::reader::ChunkReader; +use parquet::file::reader::{ChunkReader, FileReader, SerializedFileReader}; +use parquet::record::Field; +use parquet::schema::types::Type as ParquetSchemaType; use pbjson_types::Timestamp; use sift_rs::{ common::r#type::v1::{ChannelConfig, ChannelDataType}, @@ -211,56 +211,43 @@ 
pub fn discover_multi_channel_names_for_preview( file: R, name_path: &str, ) -> Result> { - let builder = ParquetRecordBatchReaderBuilder::try_new(file) - .context("failed to build parquet record batch reader")?; + let reader = + SerializedFileReader::new(file).context("failed to build parquet file reader")?; - let schema = builder.schema().clone(); - let name_idx = schema - .fields() + let file_schema = reader.metadata().file_metadata().schema(); + let root_name = file_schema.name().to_string(); + let name_field = file_schema + .get_fields() .iter() - .position(|field| field.name() == name_path) - .with_context(|| format!("name column '{name_path}' not found in parquet schema"))?; - - let name_col_type = schema.field(name_idx).data_type().clone(); - let is_large = match name_col_type { - DataType::Utf8 => false, - DataType::LargeUtf8 => true, - other => { - return Err(anyhow!( - "name column '{name_path}' must be a string type; got {other:?}" - )); - } - }; + .find(|t| t.name() == name_path) + .with_context(|| format!("name column '{name_path}' not found in parquet schema"))? 
+ .clone(); - let projection = ProjectionMask::roots(builder.parquet_schema(), [name_idx]); - let reader = builder - .with_projection(projection) + let projection = ParquetSchemaType::group_type_builder(&root_name) + .with_fields(vec![name_field]) .build() - .context("failed to build projected parquet reader")?; + .context("failed to build parquet projection schema")?; + + let row_iter = reader + .get_row_iter(Some(projection)) + .context("failed to build parquet row iterator")?; let mut seen: HashSet = HashSet::new(); - for batch in reader { - let batch = batch.context("failed to read parquet record batch")?; - let col = batch.column(0); - if is_large { - let arr = col - .as_any() - .downcast_ref::() - .expect("name column type validated above"); - for i in 0..arr.len() { - if !arr.is_null(i) { - seen.insert(arr.value(i).to_string()); - } + for row_result in row_iter { + let row = row_result.context("failed to read parquet row")?; + let (_, field) = row + .get_column_iter() + .next() + .ok_or_else(|| anyhow!("internal: projected row missing column"))?; + match field { + Field::Str(s) => { + seen.insert(s.clone()); } - } else { - let arr = col - .as_any() - .downcast_ref::() - .expect("name column type validated above"); - for i in 0..arr.len() { - if !arr.is_null(i) { - seen.insert(arr.value(i).to_string()); - } + Field::Null => {} + other => { + return Err(anyhow!( + "name column '{name_path}' must be a string type; got {other:?}" + )); } } } From 6812ff05d02ded4bc77d3db60573b0d5a0da1e46 Mon Sep 17 00:00:00 2001 From: Brandon Shippy Date: Fri, 22 May 2026 14:17:19 -0700 Subject: [PATCH 16/21] moving detect multi channel in scpr_dataset --- .../import/parquet/detect_parquet_schema.rs | 57 +---------------- .../src/cmd/import/parquet/scpr_dataset.rs | 64 +++++++++++++++++-- .../sift_cli/src/cmd/import/parquet/tests.rs | 7 +- 3 files changed, 65 insertions(+), 63 deletions(-) diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs 
b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs index e08cf68c2..2926b4876 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs @@ -1,12 +1,10 @@ use crate::cmd::import::utils::validate_time_format; -use anyhow::{Context, Result, anyhow}; +use anyhow::{Context, Result}; use arrow_schema::DataType; use chrono::DateTime; use parquet::arrow::parquet_to_arrow_schema; use parquet::file::metadata::ParquetMetaDataReader; -use parquet::file::reader::{ChunkReader, FileReader, SerializedFileReader}; -use parquet::record::Field; -use parquet::schema::types::Type as ParquetSchemaType; +use parquet::file::reader::ChunkReader; use pbjson_types::Timestamp; use sift_rs::{ common::r#type::v1::{ChannelConfig, ChannelDataType}, @@ -17,7 +15,6 @@ use sift_rs::{ parquet_single_channel_per_row_config::Config as ScprInnerConfig, }, }; -use std::collections::HashSet; use crate::cli::channel::DataType as CliDataType; use crate::cli::parquet::ScprMode; @@ -207,56 +204,6 @@ pub fn detect_scpr_config( }) } -pub fn discover_multi_channel_names_for_preview( - file: R, - name_path: &str, -) -> Result> { - let reader = - SerializedFileReader::new(file).context("failed to build parquet file reader")?; - - let file_schema = reader.metadata().file_metadata().schema(); - let root_name = file_schema.name().to_string(); - let name_field = file_schema - .get_fields() - .iter() - .find(|t| t.name() == name_path) - .with_context(|| format!("name column '{name_path}' not found in parquet schema"))? 
- .clone(); - - let projection = ParquetSchemaType::group_type_builder(&root_name) - .with_fields(vec![name_field]) - .build() - .context("failed to build parquet projection schema")?; - - let row_iter = reader - .get_row_iter(Some(projection)) - .context("failed to build parquet row iterator")?; - - let mut seen: HashSet = HashSet::new(); - for row_result in row_iter { - let row = row_result.context("failed to read parquet row")?; - let (_, field) = row - .get_column_iter() - .next() - .ok_or_else(|| anyhow!("internal: projected row missing column"))?; - match field { - Field::Str(s) => { - seen.insert(s.clone()); - } - Field::Null => {} - other => { - return Err(anyhow!( - "name column '{name_path}' must be a string type; got {other:?}" - )); - } - } - } - - let mut names: Vec = seen.into_iter().collect(); - names.sort(); - Ok(names) -} - pub(super) fn arrow_type_to_channel_data_type(dt: &DataType) -> Option { match dt { DataType::Boolean => Some(ChannelDataType::Bool), diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs b/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs index eabcad30a..e86d4b9e6 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs @@ -1,7 +1,10 @@ -use std::{fs::File, io::Seek, process::ExitCode}; +use std::{collections::HashSet, fs::File, io::Seek, process::ExitCode}; -use anyhow::{Context as AnyhowContext, Result}; +use anyhow::{Context as AnyhowContext, Result, anyhow}; use crossterm::style::Stylize; +use parquet::file::reader::{ChunkReader, FileReader, SerializedFileReader}; +use parquet::record::Field; +use parquet::schema::types::Type as ParquetSchemaType; use sift_rs::{ common::r#type::v1::ChannelConfig, data_imports::v2::{ @@ -13,9 +16,7 @@ use sift_rs::{ }; use crate::cli::ScprArgs; -use crate::cmd::import::parquet::detect_parquet_schema::{ - detect_scpr_config, discover_multi_channel_names_for_preview, -}; +use 
crate::cmd::import::parquet::detect_parquet_schema::detect_scpr_config; use crate::cmd::{ Context, import::{ @@ -134,3 +135,56 @@ fn create_data_import_request( ..Default::default() }) } + +/// Scan the parquet file's name column and return the distinct channel names +/// it contains (sorted, deduped). Used by multi-mode preview so the user can +/// see what channels the server will create at ingest. +pub(super) fn discover_multi_channel_names_for_preview( + file: R, + name_path: &str, +) -> Result> { + let reader = + SerializedFileReader::new(file).context("failed to build parquet file reader")?; + + let file_schema = reader.metadata().file_metadata().schema(); + let root_name = file_schema.name().to_string(); + let name_field = file_schema + .get_fields() + .iter() + .find(|t| t.name() == name_path) + .with_context(|| format!("name column '{name_path}' not found in parquet schema"))? + .clone(); + + let projection = ParquetSchemaType::group_type_builder(&root_name) + .with_fields(vec![name_field]) + .build() + .context("failed to build parquet projection schema")?; + + let row_iter = reader + .get_row_iter(Some(projection)) + .context("failed to build parquet row iterator")?; + + let mut seen: HashSet = HashSet::new(); + for row_result in row_iter { + let row = row_result.context("failed to read parquet row")?; + let (_, field) = row + .get_column_iter() + .next() + .ok_or_else(|| anyhow!("internal: projected row missing column"))?; + match field { + Field::Str(s) => { + seen.insert(s.clone()); + } + Field::Null => {} + other => { + return Err(anyhow!( + "name column '{name_path}' must be a string type; got {other:?}" + )); + } + } + } + + let mut names: Vec = seen.into_iter().collect(); + names.sort(); + Ok(names) +} diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs b/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs index 21e756ed2..786b4e46f 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs +++ 
b/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs @@ -5,6 +5,7 @@ use crate::cli::{ time::TimeFormat, }; use crate::cmd::import::parquet::detect_parquet_schema::{self, arrow_type_to_channel_data_type}; +use crate::cmd::import::parquet::scpr_dataset; use arrow_array::{ BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, TimestampSecondArray, UInt32Array, UInt64Array, @@ -545,7 +546,7 @@ fn test_discover_multi_channel_names_for_preview_dedups_and_sorts() { ]); let bytes = write_to_parquet_bytes(&batch); - let names = detect_parquet_schema::discover_multi_channel_names_for_preview(bytes, "channel") + let names = scpr_dataset::discover_multi_channel_names_for_preview(bytes, "channel") .expect("discovery should succeed"); assert_eq!(names, vec!["pressure", "temperature", "voltage"]); } @@ -556,7 +557,7 @@ fn test_discover_multi_channel_names_for_preview_errors_on_non_string_column() { let batch = create_scpr_single_batch(); let bytes = write_to_parquet_bytes(&batch); - let err = detect_parquet_schema::discover_multi_channel_names_for_preview(bytes, "value").unwrap_err(); + let err = scpr_dataset::discover_multi_channel_names_for_preview(bytes, "value").unwrap_err(); assert!( err.chain() .any(|e| e.to_string().contains("must be a string type")), @@ -570,7 +571,7 @@ fn test_discover_multi_channel_names_for_preview_missing_column_errors() { let bytes = write_to_parquet_bytes(&batch); let err = - detect_parquet_schema::discover_multi_channel_names_for_preview(bytes, "no_such_col").unwrap_err(); + scpr_dataset::discover_multi_channel_names_for_preview(bytes, "no_such_col").unwrap_err(); assert!( err.chain().any(|e| e.to_string().contains("not found")), "expected not-found error, got: {err:#}" From e996689fabc7b20ea4e346784224367017f982a1 Mon Sep 17 00:00:00 2001 From: Brandon Shippy Date: Fri, 22 May 2026 14:19:48 -0700 Subject: [PATCH 17/21] cargo fmt --- rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs | 3 +-- 1 file 
changed, 1 insertion(+), 2 deletions(-) diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs b/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs index e86d4b9e6..3b11f3a95 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs @@ -143,8 +143,7 @@ pub(super) fn discover_multi_channel_names_for_preview file: R, name_path: &str, ) -> Result> { - let reader = - SerializedFileReader::new(file).context("failed to build parquet file reader")?; + let reader = SerializedFileReader::new(file).context("failed to build parquet file reader")?; let file_schema = reader.metadata().file_metadata().schema(); let root_name = file_schema.name().to_string(); From 61823f7b108bfe5e68c875bcd75cc459664f5b5a Mon Sep 17 00:00:00 2001 From: Brandon Shippy Date: Fri, 22 May 2026 14:23:17 -0700 Subject: [PATCH 18/21] update tests --- rust/crates/sift_cli/src/cmd/import/parquet/tests.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs b/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs index 786b4e46f..cf0a15361 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs @@ -337,8 +337,6 @@ fn test_time_path_not_in_parquet_returns_error() { ); } -// ---------- SCPR helpers and tests ---------- - fn make_scpr_args(mode: ScprMode, time_format: TimeFormat) -> ScprArgs { ScprArgs { common: CommonImportArgs { From 9db8da7c32b1e8de4fd7e8f88353d2c17439d72f Mon Sep 17 00:00:00 2001 From: Brandon Shippy Date: Fri, 22 May 2026 14:29:08 -0700 Subject: [PATCH 19/21] tests --- rust/crates/sift_cli/src/cmd/import/parquet/tests.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs b/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs index cf0a15361..d80efac21 100644 --- 
a/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs @@ -477,7 +477,6 @@ fn test_detect_scpr_multi_basic() { assert_eq!(multi.name_path, "channel"); assert_eq!(multi.data_path, "value"); - // top-level columns should include both data and name columns let paths: Vec<&str> = cfg.columns.iter().map(|c| c.path.as_str()).collect(); assert!( paths.contains(&"value"), @@ -551,7 +550,6 @@ fn test_discover_multi_channel_names_for_preview_dedups_and_sorts() { #[test] fn test_discover_multi_channel_names_for_preview_errors_on_non_string_column() { - // Use the single batch — `value` is Float64 let batch = create_scpr_single_batch(); let bytes = write_to_parquet_bytes(&batch); From 21b7344ba5ac075b7f2ff7e46f7d640fb2cde493 Mon Sep 17 00:00:00 2001 From: Brandon Shippy Date: Fri, 22 May 2026 14:49:09 -0700 Subject: [PATCH 20/21] update wording in cli to properly match up cpr -> single multi --- rust/crates/sift_cli/src/cli/mod.rs | 10 +-- rust/crates/sift_cli/src/cli/parquet.rs | 6 +- .../{scpr_dataset.rs => cpr_dataset.rs} | 28 +++--- .../import/parquet/detect_parquet_schema.rs | 20 ++--- .../sift_cli/src/cmd/import/parquet/mod.rs | 2 +- .../sift_cli/src/cmd/import/parquet/tests.rs | 90 +++++++++---------- rust/crates/sift_cli/src/main.rs | 4 +- 7 files changed, 80 insertions(+), 80 deletions(-) rename rust/crates/sift_cli/src/cmd/import/parquet/{scpr_dataset.rs => cpr_dataset.rs} (87%) diff --git a/rust/crates/sift_cli/src/cli/mod.rs b/rust/crates/sift_cli/src/cli/mod.rs index cf9c310c0..97ef9ba97 100644 --- a/rust/crates/sift_cli/src/cli/mod.rs +++ b/rust/crates/sift_cli/src/cli/mod.rs @@ -1,6 +1,6 @@ use clap::{Parser, Subcommand, crate_description, crate_version}; use clap_complete::Shell; -use parquet::{ComplexTypesMode, ScprMode}; +use parquet::{ComplexTypesMode, CprMode}; pub mod hdf5; pub mod tdms; use hdf5::Hdf5Schema; @@ -343,7 +343,7 @@ pub enum ImportParquetCmd { /// A parquet file laid out 
single-channel-per-row, either one channel for the whole file /// (single mode) or with a name column identifying the channel for each row (multi mode). - Scpr(ScprArgs), + Cpr(CprArgs), } #[derive(clap::Args)] @@ -394,13 +394,13 @@ pub struct FlatDatasetArgs { } #[derive(clap::Args)] -pub struct ScprArgs { +pub struct CprArgs { #[command(flatten)] pub common: CommonImportArgs, - /// SCPR mode: single-channel or multi-channel + /// Channel-per-row mode: single-channel or multi-channel #[arg(long)] - pub mode: ScprMode, + pub mode: CprMode, /// Path to the time column #[arg(short, long)] diff --git a/rust/crates/sift_cli/src/cli/parquet.rs b/rust/crates/sift_cli/src/cli/parquet.rs index 8256ba0b5..1c5e4cdbb 100644 --- a/rust/crates/sift_cli/src/cli/parquet.rs +++ b/rust/crates/sift_cli/src/cli/parquet.rs @@ -39,16 +39,16 @@ impl Display for ComplexTypesMode { } } -/// Single-channel-per-row mode: tells the importer how each row is shaped. +/// Channel-per-row mode: tells the importer how each row is shaped. #[derive(Debug, Copy, Clone, PartialEq, Eq, ValueEnum)] -pub enum ScprMode { +pub enum CprMode { /// File has [time, value]. All rows belong to one named channel. Single, /// File has [time, name_column, value_column]. Channels created per unique name. 
Multi, } -impl Display for ScprMode { +impl Display for CprMode { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::Single => write!(f, "single"), diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs b/rust/crates/sift_cli/src/cmd/import/parquet/cpr_dataset.rs similarity index 87% rename from rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs rename to rust/crates/sift_cli/src/cmd/import/parquet/cpr_dataset.rs index 3b11f3a95..6f375be92 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/scpr_dataset.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/cpr_dataset.rs @@ -11,12 +11,12 @@ use sift_rs::{ CreateDataImportFromUploadRequest, CreateDataImportFromUploadResponse, ParquetComplexTypesImportMode, ParquetConfig, data_import_service_client::DataImportServiceClient, parquet_config::Config, - parquet_single_channel_per_row_config::Config as ScprInnerConfig, + parquet_single_channel_per_row_config::Config as CprInnerConfig, }, }; -use crate::cli::ScprArgs; -use crate::cmd::import::parquet::detect_parquet_schema::detect_scpr_config; +use crate::cli::CprArgs; +use crate::cmd::import::parquet::detect_parquet_schema::detect_cpr_config; use crate::cmd::{ Context, import::{ @@ -26,14 +26,14 @@ use crate::cmd::{ }; use crate::util::{api::create_grpc_channel, tty::Output}; -pub async fn run(ctx: Context, args: ScprArgs) -> Result { +pub async fn run(ctx: Context, args: CprArgs) -> Result { let grpc_channel = create_grpc_channel(&ctx)?; let mut data_imports_client = DataImportServiceClient::new(grpc_channel.clone()); let mut file = File::open(&args.common.path).context("failed to open parquet file")?; let footer_md = FooterMetadata::try_from(&mut file)?; - let scpr_config = - detect_scpr_config(&file, &args).context("failed to detect parquet schema")?; + let cpr_config = + detect_cpr_config(&file, &args).context("failed to detect parquet schema")?; if args.common.preview { let run_label = args @@ -44,9 +44,9 @@ 
pub async fn run(ctx: Context, args: ScprArgs) -> Result { .or(args.common.run.as_deref()) .unwrap_or(""); - let multi_channels: Vec = match scpr_config.config.as_ref() { - Some(ScprInnerConfig::MultiChannel(multi)) => { - let data_type = scpr_config + let multi_channels: Vec = match cpr_config.config.as_ref() { + Some(CprInnerConfig::MultiChannel(multi)) => { + let data_type = cpr_config .columns .iter() .find(|c| c.path == multi.data_path) @@ -68,9 +68,9 @@ pub async fn run(ctx: Context, args: ScprArgs) -> Result { _ => Vec::new(), }; - let preview_channels: Vec<&ChannelConfig> = match scpr_config.config.as_ref() { - Some(ScprInnerConfig::SingleChannel(single)) => single.channel.iter().collect(), - Some(ScprInnerConfig::MultiChannel(_)) => multi_channels.iter().collect(), + let preview_channels: Vec<&ChannelConfig> = match cpr_config.config.as_ref() { + Some(CprInnerConfig::SingleChannel(single)) => single.channel.iter().collect(), + Some(CprInnerConfig::MultiChannel(_)) => multi_channels.iter().collect(), None => Vec::new(), }; @@ -79,7 +79,7 @@ pub async fn run(ctx: Context, args: ScprArgs) -> Result { } let parquet_config = ParquetConfig { - config: Some(Config::SingleChannelPerRow(scpr_config)), + config: Some(Config::SingleChannelPerRow(cpr_config)), ..Default::default() }; let create_data_import_req = create_data_import_request(&args, parquet_config, footer_md)?; @@ -114,7 +114,7 @@ pub async fn run(ctx: Context, args: ScprArgs) -> Result { } fn create_data_import_request( - args: &ScprArgs, + args: &CprArgs, config: ParquetConfig, footer_md: FooterMetadata, ) -> Result { diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs index 2926b4876..d00f456b0 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs @@ -12,13 +12,13 @@ use sift_rs::{ ParquetColumn, 
ParquetDataColumn, ParquetFlatDatasetConfig, ParquetSingleChannelPerRowConfig, ParquetSingleChannelPerRowMultiChannelConfig, ParquetSingleChannelPerRowSingleChannelConfig, ParquetTimeColumn, TimeFormat, - parquet_single_channel_per_row_config::Config as ScprInnerConfig, + parquet_single_channel_per_row_config::Config as CprInnerConfig, }, }; use crate::cli::channel::DataType as CliDataType; -use crate::cli::parquet::ScprMode; -use crate::cli::{FlatDatasetArgs, ScprArgs}; +use crate::cli::parquet::CprMode; +use crate::cli::{FlatDatasetArgs, CprArgs}; pub fn detect_flat_dataset_config( file: &R, @@ -82,9 +82,9 @@ pub fn detect_flat_dataset_config( }) } -pub fn detect_scpr_config( +pub fn detect_cpr_config( file: &R, - args: &ScprArgs, + args: &CprArgs, ) -> Result { validate_time_format(args.time_format, &args.relative_start_time) .context("validating time format")?; @@ -94,7 +94,7 @@ pub fn detect_scpr_config( metadata.file_metadata().schema_descr(), metadata.file_metadata().key_value_metadata(), ) - .context("detecting scpr arrow schema")?; + .context("detecting cpr arrow schema")?; let relative_start_time = match &args.relative_start_time { Some(start) => { @@ -144,7 +144,7 @@ pub fn detect_scpr_config( }]; let inner_config = match args.mode { - ScprMode::Single => { + CprMode::Single => { let channel_name = args .channel_name .as_ref() @@ -155,7 +155,7 @@ pub fn detect_scpr_config( Some(ref dt) => ChannelDataType::from(dt.clone()), }; - ScprInnerConfig::SingleChannel(ParquetSingleChannelPerRowSingleChannelConfig { + CprInnerConfig::SingleChannel(ParquetSingleChannelPerRowSingleChannelConfig { data_path: args.data_path.clone(), channel: Some(ChannelConfig { name: channel_name.clone(), @@ -166,7 +166,7 @@ pub fn detect_scpr_config( }), }) } - ScprMode::Multi => { + CprMode::Multi => { let name_path = args .name_path .as_ref() @@ -190,7 +190,7 @@ pub fn detect_scpr_config( }), }); - ScprInnerConfig::MultiChannel(ParquetSingleChannelPerRowMultiChannelConfig { + 
CprInnerConfig::MultiChannel(ParquetSingleChannelPerRowMultiChannelConfig { name_path: name_path.clone(), data_path: args.data_path.clone(), }) diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/mod.rs b/rust/crates/sift_cli/src/cmd/import/parquet/mod.rs index a793fac23..acd3c8688 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/mod.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/mod.rs @@ -6,7 +6,7 @@ use windows::{FooterMetadata, get_footer}; pub mod detect_parquet_schema; pub mod flat_dataset; -pub mod scpr_dataset; +pub mod cpr_dataset; #[cfg(test)] mod tests; diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs b/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs index d80efac21..990361252 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs @@ -1,11 +1,11 @@ use crate::cli::channel::DataType as CliDataType; use crate::cli::{ - CommonImportArgs, FlatDatasetArgs, ScprArgs, - parquet::{ComplexTypesMode, ScprMode}, + CommonImportArgs, FlatDatasetArgs, CprArgs, + parquet::{ComplexTypesMode, CprMode}, time::TimeFormat, }; use crate::cmd::import::parquet::detect_parquet_schema::{self, arrow_type_to_channel_data_type}; -use crate::cmd::import::parquet::scpr_dataset; +use crate::cmd::import::parquet::cpr_dataset; use arrow_array::{ BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, TimestampSecondArray, UInt32Array, UInt64Array, @@ -337,8 +337,8 @@ fn test_time_path_not_in_parquet_returns_error() { ); } -fn make_scpr_args(mode: ScprMode, time_format: TimeFormat) -> ScprArgs { - ScprArgs { +fn make_cpr_args(mode: CprMode, time_format: TimeFormat) -> CprArgs { + CprArgs { common: CommonImportArgs { path: PathBuf::from("test.parquet"), asset: "test-asset".into(), @@ -361,7 +361,7 @@ fn make_scpr_args(mode: ScprMode, time_format: TimeFormat) -> ScprArgs { } } -fn create_scpr_single_batch() -> RecordBatch { +fn 
create_cpr_single_batch() -> RecordBatch { let schema = Arc::new(Schema::new(vec![ Field::new("timestamp", DataType::Int64, false), Field::new("value", DataType::Float64, false), @@ -373,10 +373,10 @@ fn create_scpr_single_batch() -> RecordBatch { Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])), ], ) - .expect("failed to create scpr single batch") + .expect("failed to create cpr single batch") } -fn create_scpr_multi_batch(names: Vec<&str>) -> RecordBatch { +fn create_cpr_multi_batch(names: Vec<&str>) -> RecordBatch { let n = names.len(); let schema = Arc::new(Schema::new(vec![ Field::new("timestamp", DataType::Int64, false), @@ -393,18 +393,18 @@ fn create_scpr_multi_batch(names: Vec<&str>) -> RecordBatch { Arc::new(StringArray::from(names)), ], ) - .expect("failed to create scpr multi batch") + .expect("failed to create cpr multi batch") } #[test] -fn test_detect_scpr_single_basic_infers_data_type() { - let batch = create_scpr_single_batch(); +fn test_detect_cpr_single_basic_infers_data_type() { + let batch = create_cpr_single_batch(); let bytes = write_to_parquet_bytes(&batch); - let mut args = make_scpr_args(ScprMode::Single, TimeFormat::AbsoluteUnixMilliseconds); + let mut args = make_cpr_args(CprMode::Single, TimeFormat::AbsoluteUnixMilliseconds); args.channel_name = Some("temperature".into()); - let cfg = detect_parquet_schema::detect_scpr_config(&bytes, &args) - .expect("detect_scpr_config should succeed"); + let cfg = detect_parquet_schema::detect_cpr_config(&bytes, &args) + .expect("detect_cpr_config should succeed"); use sift_rs::data_imports::v2::parquet_single_channel_per_row_config::Config as InnerConfig; let inner = cfg.config.as_ref().expect("inner config present"); @@ -421,14 +421,14 @@ fn test_detect_scpr_single_basic_infers_data_type() { } #[test] -fn test_detect_scpr_single_honors_data_type_override() { - let batch = create_scpr_single_batch(); +fn test_detect_cpr_single_honors_data_type_override() { + let batch = create_cpr_single_batch(); 
let bytes = write_to_parquet_bytes(&batch); - let mut args = make_scpr_args(ScprMode::Single, TimeFormat::AbsoluteUnixMilliseconds); + let mut args = make_cpr_args(CprMode::Single, TimeFormat::AbsoluteUnixMilliseconds); args.channel_name = Some("temperature".into()); args.data_type = Some(CliDataType::Float); - let cfg = detect_parquet_schema::detect_scpr_config(&bytes, &args).unwrap(); + let cfg = detect_parquet_schema::detect_cpr_config(&bytes, &args).unwrap(); use sift_rs::data_imports::v2::parquet_single_channel_per_row_config::Config as InnerConfig; let InnerConfig::SingleChannel(single) = cfg.config.as_ref().unwrap() else { @@ -442,15 +442,15 @@ fn test_detect_scpr_single_honors_data_type_override() { } #[test] -fn test_detect_scpr_single_propagates_units_and_description() { - let batch = create_scpr_single_batch(); +fn test_detect_cpr_single_propagates_units_and_description() { + let batch = create_cpr_single_batch(); let bytes = write_to_parquet_bytes(&batch); - let mut args = make_scpr_args(ScprMode::Single, TimeFormat::AbsoluteUnixMilliseconds); + let mut args = make_cpr_args(CprMode::Single, TimeFormat::AbsoluteUnixMilliseconds); args.channel_name = Some("temperature".into()); args.unit = Some("celsius".into()); args.description = Some("ambient temperature".into()); - let cfg = detect_parquet_schema::detect_scpr_config(&bytes, &args).unwrap(); + let cfg = detect_parquet_schema::detect_cpr_config(&bytes, &args).unwrap(); use sift_rs::data_imports::v2::parquet_single_channel_per_row_config::Config as InnerConfig; let InnerConfig::SingleChannel(single) = cfg.config.as_ref().unwrap() else { panic!("expected SingleChannel variant"); @@ -461,14 +461,14 @@ fn test_detect_scpr_single_propagates_units_and_description() { } #[test] -fn test_detect_scpr_multi_basic() { - let batch = create_scpr_multi_batch(vec!["a", "b", "c"]); +fn test_detect_cpr_multi_basic() { + let batch = create_cpr_multi_batch(vec!["a", "b", "c"]); let bytes = write_to_parquet_bytes(&batch); 
- let mut args = make_scpr_args(ScprMode::Multi, TimeFormat::AbsoluteUnixMilliseconds); + let mut args = make_cpr_args(CprMode::Multi, TimeFormat::AbsoluteUnixMilliseconds); args.name_path = Some("channel".into()); - let cfg = detect_parquet_schema::detect_scpr_config(&bytes, &args) - .expect("detect_scpr_config multi should succeed"); + let cfg = detect_parquet_schema::detect_cpr_config(&bytes, &args) + .expect("detect_cpr_config multi should succeed"); use sift_rs::data_imports::v2::parquet_single_channel_per_row_config::Config as InnerConfig; let InnerConfig::MultiChannel(multi) = cfg.config.as_ref().unwrap() else { @@ -489,14 +489,14 @@ fn test_detect_scpr_multi_basic() { } #[test] -fn test_detect_scpr_missing_time_column_errors() { - let batch = create_scpr_single_batch(); +fn test_detect_cpr_missing_time_column_errors() { + let batch = create_cpr_single_batch(); let bytes = write_to_parquet_bytes(&batch); - let mut args = make_scpr_args(ScprMode::Single, TimeFormat::AbsoluteUnixMilliseconds); + let mut args = make_cpr_args(CprMode::Single, TimeFormat::AbsoluteUnixMilliseconds); args.channel_name = Some("temperature".into()); args.time_path = "nonexistent_time".into(); - let err = detect_parquet_schema::detect_scpr_config(&bytes, &args).unwrap_err(); + let err = detect_parquet_schema::detect_cpr_config(&bytes, &args).unwrap_err(); assert!( err.chain().any(|e| e.to_string().contains("time column")), "expected time column error, got: {err:#}" @@ -504,14 +504,14 @@ fn test_detect_scpr_missing_time_column_errors() { } #[test] -fn test_detect_scpr_missing_data_column_errors() { - let batch = create_scpr_single_batch(); +fn test_detect_cpr_missing_data_column_errors() { + let batch = create_cpr_single_batch(); let bytes = write_to_parquet_bytes(&batch); - let mut args = make_scpr_args(ScprMode::Single, TimeFormat::AbsoluteUnixMilliseconds); + let mut args = make_cpr_args(CprMode::Single, TimeFormat::AbsoluteUnixMilliseconds); args.channel_name = 
Some("temperature".into()); args.data_path = "nonexistent_value".into(); - let err = detect_parquet_schema::detect_scpr_config(&bytes, &args).unwrap_err(); + let err = detect_parquet_schema::detect_cpr_config(&bytes, &args).unwrap_err(); assert!( err.chain().any(|e| e.to_string().contains("data column")), "expected data column error, got: {err:#}" @@ -519,13 +519,13 @@ fn test_detect_scpr_missing_data_column_errors() { } #[test] -fn test_detect_scpr_multi_missing_name_column_errors() { - let batch = create_scpr_multi_batch(vec!["a"]); +fn test_detect_cpr_multi_missing_name_column_errors() { + let batch = create_cpr_multi_batch(vec!["a"]); let bytes = write_to_parquet_bytes(&batch); - let mut args = make_scpr_args(ScprMode::Multi, TimeFormat::AbsoluteUnixMilliseconds); + let mut args = make_cpr_args(CprMode::Multi, TimeFormat::AbsoluteUnixMilliseconds); args.name_path = Some("nonexistent_name".into()); - let err = detect_parquet_schema::detect_scpr_config(&bytes, &args).unwrap_err(); + let err = detect_parquet_schema::detect_cpr_config(&bytes, &args).unwrap_err(); assert!( err.chain().any(|e| e.to_string().contains("name column")), "expected name column error, got: {err:#}" @@ -534,7 +534,7 @@ fn test_detect_scpr_multi_missing_name_column_errors() { #[test] fn test_discover_multi_channel_names_for_preview_dedups_and_sorts() { - let batch = create_scpr_multi_batch(vec![ + let batch = create_cpr_multi_batch(vec![ "voltage", "temperature", "pressure", @@ -543,17 +543,17 @@ fn test_discover_multi_channel_names_for_preview_dedups_and_sorts() { ]); let bytes = write_to_parquet_bytes(&batch); - let names = scpr_dataset::discover_multi_channel_names_for_preview(bytes, "channel") + let names = cpr_dataset::discover_multi_channel_names_for_preview(bytes, "channel") .expect("discovery should succeed"); assert_eq!(names, vec!["pressure", "temperature", "voltage"]); } #[test] fn test_discover_multi_channel_names_for_preview_errors_on_non_string_column() { - let batch = 
create_scpr_single_batch(); + let batch = create_cpr_single_batch(); let bytes = write_to_parquet_bytes(&batch); - let err = scpr_dataset::discover_multi_channel_names_for_preview(bytes, "value").unwrap_err(); + let err = cpr_dataset::discover_multi_channel_names_for_preview(bytes, "value").unwrap_err(); assert!( err.chain() .any(|e| e.to_string().contains("must be a string type")), @@ -563,11 +563,11 @@ fn test_discover_multi_channel_names_for_preview_errors_on_non_string_column() { #[test] fn test_discover_multi_channel_names_for_preview_missing_column_errors() { - let batch = create_scpr_multi_batch(vec!["a"]); + let batch = create_cpr_multi_batch(vec!["a"]); let bytes = write_to_parquet_bytes(&batch); let err = - scpr_dataset::discover_multi_channel_names_for_preview(bytes, "no_such_col").unwrap_err(); + cpr_dataset::discover_multi_channel_names_for_preview(bytes, "no_such_col").unwrap_err(); assert!( err.chain().any(|e| e.to_string().contains("not found")), "expected not-found error, got: {err:#}" diff --git a/rust/crates/sift_cli/src/main.rs b/rust/crates/sift_cli/src/main.rs index 269d881d4..d9ca8f099 100644 --- a/rust/crates/sift_cli/src/main.rs +++ b/rust/crates/sift_cli/src/main.rs @@ -74,8 +74,8 @@ fn run(clargs: cli::Args) -> Result { cli::ImportParquetCmd::FlatDataset(args) => { run_future(cmd::import::parquet::flat_dataset::run(ctx, args)) } - cli::ImportParquetCmd::Scpr(args) => { - run_future(cmd::import::parquet::scpr_dataset::run(ctx, args)) + cli::ImportParquetCmd::Cpr(args) => { + run_future(cmd::import::parquet::cpr_dataset::run(ctx, args)) } }, cli::ImportCmd::Tdms(args) => { From 7788f0111d2b1f7b367f171e6e325788e3349423 Mon Sep 17 00:00:00 2001 From: Brandon Shippy Date: Fri, 22 May 2026 14:49:44 -0700 Subject: [PATCH 21/21] cargo fmt --- rust/crates/sift_cli/src/cmd/import/parquet/cpr_dataset.rs | 3 +-- .../sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs | 2 +- rust/crates/sift_cli/src/cmd/import/parquet/mod.rs | 2 +- 
rust/crates/sift_cli/src/cmd/import/parquet/tests.rs | 4 ++-- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/cpr_dataset.rs b/rust/crates/sift_cli/src/cmd/import/parquet/cpr_dataset.rs index 6f375be92..e1e062296 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/cpr_dataset.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/cpr_dataset.rs @@ -32,8 +32,7 @@ pub async fn run(ctx: Context, args: CprArgs) -> Result { let mut file = File::open(&args.common.path).context("failed to open parquet file")?; let footer_md = FooterMetadata::try_from(&mut file)?; - let cpr_config = - detect_cpr_config(&file, &args).context("failed to detect parquet schema")?; + let cpr_config = detect_cpr_config(&file, &args).context("failed to detect parquet schema")?; if args.common.preview { let run_label = args diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs index d00f456b0..aa717bcf4 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/detect_parquet_schema.rs @@ -18,7 +18,7 @@ use sift_rs::{ use crate::cli::channel::DataType as CliDataType; use crate::cli::parquet::CprMode; -use crate::cli::{FlatDatasetArgs, CprArgs}; +use crate::cli::{CprArgs, FlatDatasetArgs}; pub fn detect_flat_dataset_config( file: &R, diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/mod.rs b/rust/crates/sift_cli/src/cmd/import/parquet/mod.rs index acd3c8688..68a21b1fc 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/mod.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/mod.rs @@ -4,9 +4,9 @@ use unix::FooterMetadata; #[cfg(target_os = "windows")] use windows::{FooterMetadata, get_footer}; +pub mod cpr_dataset; pub mod detect_parquet_schema; pub mod flat_dataset; -pub mod cpr_dataset; #[cfg(test)] mod tests; diff --git 
a/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs b/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs index 990361252..1c27048ee 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/tests.rs @@ -1,11 +1,11 @@ use crate::cli::channel::DataType as CliDataType; use crate::cli::{ - CommonImportArgs, FlatDatasetArgs, CprArgs, + CommonImportArgs, CprArgs, FlatDatasetArgs, parquet::{ComplexTypesMode, CprMode}, time::TimeFormat, }; -use crate::cmd::import::parquet::detect_parquet_schema::{self, arrow_type_to_channel_data_type}; use crate::cmd::import::parquet::cpr_dataset; +use crate::cmd::import::parquet::detect_parquet_schema::{self, arrow_type_to_channel_data_type}; use arrow_array::{ BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, TimestampSecondArray, UInt32Array, UInt64Array,