Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/crates/sift_cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ path = "src/main.rs"

[dependencies]
anyhow = { workspace = true }
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
Expand All @@ -42,5 +43,4 @@ zip = { workspace = true }

[dev-dependencies]
indoc = { workspace = true }
arrow-array = { workspace = true }
bytes = { workspace = true }
93 changes: 77 additions & 16 deletions rust/crates/sift_cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use clap::{Parser, Subcommand, crate_description, crate_version};
use clap_complete::Shell;
use parquet::ComplexTypesMode;
use parquet::{ComplexTypesMode, CprMode};
pub mod hdf5;
pub mod tdms;
use hdf5::Hdf5Schema;
Expand Down Expand Up @@ -340,20 +340,16 @@ pub enum ImportParquetCmd {
/// A parquet file where every column is exclusive to a single channel except for the time
/// column
FlatDataset(FlatDatasetArgs),

/// A parquet file laid out single-channel-per-row, either one channel for the whole file
/// (single mode) or with a name column identifying the channel for each row (multi mode).
Cpr(CprArgs),
}

#[derive(clap::Args)]
pub struct FlatDatasetArgs {
/// Path to the Parquet file to import
pub path: PathBuf,

/// Name of the asset this data belongs to
#[arg(short, long)]
pub asset: String,

/// Optional run name to associate with this import
#[arg(short, long)]
pub run: Option<String>,
#[command(flatten)]
pub common: CommonImportArgs,

/// Paths of data columns to import; can be specified multiple times
#[arg(short, long)]
Expand Down Expand Up @@ -395,14 +391,79 @@ pub struct FlatDatasetArgs {
/// Strategy for handling complex types (maps, lists, structs)
#[arg(short = 'm', long, default_value_t = ComplexTypesMode::default())]
pub complex_types_mode: ComplexTypesMode,
}

/// Wait until the import finishes processing
#[arg(short, long)]
pub wait: bool,
#[derive(clap::Args)]
pub struct CprArgs {
#[command(flatten)]
pub common: CommonImportArgs,

/// Preview the parsed schema without uploading
/// Channel-per-row mode: single-channel or multi-channel
#[arg(long)]
pub mode: CprMode,

/// Path to the time column
#[arg(short, long)]
pub preview: bool,
pub time_path: String,

/// Time format used in the time column
#[arg(short = 'f', long)]
pub time_format: TimeFormat,

/// Start time (RFC3339) to use if time format is relative
#[arg(short = 's', long)]
pub relative_start_time: Option<String>,

/// Path to the column holding values (used in both modes)
#[arg(long)]
pub data_path: String,

/// Channel name for every row in the file
#[arg(
long,
required_if_eq("mode", "single"),
conflicts_with = "name_path",
help_heading = "Single mode options"
)]
pub channel_name: Option<String>,

/// Data type for the channel. Use `"infer"` to have the program infer the
/// data type from the parquet schema.
#[arg(
long,
conflicts_with = "name_path",
help_heading = "Single mode options"
)]
pub data_type: Option<DataType>,

/// Channel units
#[arg(
long,
conflicts_with = "name_path",
help_heading = "Single mode options"
)]
pub unit: Option<String>,

/// Channel description
#[arg(
short = 'n',
long,
conflicts_with = "name_path",
help_heading = "Single mode options"
)]
pub description: Option<String>,

/// Path to the column holding channel names
#[arg(
long,
required_if_eq("mode", "multi"),
help_heading = "Multi mode options"
)]
pub name_path: Option<String>,

/// Strategy for handling complex types (maps, lists, structs)
#[arg(short = 'm', long, default_value_t = ComplexTypesMode::default())]
pub complex_types_mode: ComplexTypesMode,
}

#[derive(clap::Args)]
Expand Down
18 changes: 18 additions & 0 deletions rust/crates/sift_cli/src/cli/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,21 @@ impl Display for ComplexTypesMode {
}
}
}

/// Channel-per-row mode: tells the importer how each row is shaped.
#[derive(Debug, Copy, Clone, PartialEq, Eq, ValueEnum)]
pub enum CprMode {
/// File has [time, value]. All rows belong to one named channel.
Single,
/// File has [time, name_column, value_column]. Channels created per unique name.
Multi,
}

impl Display for CprMode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Single => write!(f, "single"),
Self::Multi => write!(f, "multi"),
}
}
}
188 changes: 188 additions & 0 deletions rust/crates/sift_cli/src/cmd/import/parquet/cpr_dataset.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
use std::{collections::HashSet, fs::File, io::Seek, process::ExitCode};

use anyhow::{Context as AnyhowContext, Result, anyhow};
use crossterm::style::Stylize;
use parquet::file::reader::{ChunkReader, FileReader, SerializedFileReader};
use parquet::record::Field;
use parquet::schema::types::Type as ParquetSchemaType;
use sift_rs::{
common::r#type::v1::ChannelConfig,
data_imports::v2::{
CreateDataImportFromUploadRequest, CreateDataImportFromUploadResponse,
ParquetComplexTypesImportMode, ParquetConfig,
data_import_service_client::DataImportServiceClient, parquet_config::Config,
parquet_single_channel_per_row_config::Config as CprInnerConfig,
},
};

use crate::cli::CprArgs;
use crate::cmd::import::parquet::detect_parquet_schema::detect_cpr_config;
use crate::cmd::{
Context,
import::{
parquet::FooterMetadata, preview_import_config, utils::upload_gzipped_file,
wait_for_job_completion,
},
};
use crate::util::{api::create_grpc_channel, tty::Output};

pub async fn run(ctx: Context, args: CprArgs) -> Result<ExitCode> {
let grpc_channel = create_grpc_channel(&ctx)?;
let mut data_imports_client = DataImportServiceClient::new(grpc_channel.clone());
let mut file = File::open(&args.common.path).context("failed to open parquet file")?;
let footer_md = FooterMetadata::try_from(&mut file)?;

let cpr_config = detect_cpr_config(&file, &args).context("failed to detect parquet schema")?;

if args.common.preview {
let run_label = args
.common
.run_id
.as_deref()
.filter(|s| !s.is_empty())
.or(args.common.run.as_deref())
.unwrap_or("");

let multi_channels: Vec<ChannelConfig> = match cpr_config.config.as_ref() {
Some(CprInnerConfig::MultiChannel(multi)) => {
let data_type = cpr_config
.columns
.iter()
.find(|c| c.path == multi.data_path)
.and_then(|c| c.column_config.as_ref())
.map(|c| c.data_type)
.unwrap_or_default();

let discovery_file = File::open(&args.common.path)
.context("failed to open parquet file for channel discovery")?;
discover_multi_channel_names_for_preview(discovery_file, &multi.name_path)?
.into_iter()
.map(|name| ChannelConfig {
name,
data_type,
..Default::default()
})
.collect()
}
_ => Vec::new(),
};

let preview_channels: Vec<&ChannelConfig> = match cpr_config.config.as_ref() {
Some(CprInnerConfig::SingleChannel(single)) => single.channel.iter().collect(),
Some(CprInnerConfig::MultiChannel(_)) => multi_channels.iter().collect(),
None => Vec::new(),
};

preview_import_config(&args.common.asset, run_label, &preview_channels);
return Ok(ExitCode::SUCCESS);
}

let parquet_config = ParquetConfig {
config: Some(Config::SingleChannelPerRow(cpr_config)),
..Default::default()
};
let create_data_import_req = create_data_import_request(&args, parquet_config, footer_md)?;

let CreateDataImportFromUploadResponse { upload_url, .. } = data_imports_client
.create_data_import_from_upload(create_data_import_req)
.await
.context("error creating data import")?
.into_inner();

file.rewind()?;
let job_id = upload_gzipped_file(&ctx, &upload_url, file, "application/vnd.apache.parquet")
.await
.context("failed to upload Parquet file")?;

let location = args.common.run.as_ref().map_or_else(
|| format!("asset '{}'", args.common.asset.cyan()),
|r| format!("run '{}'", r.clone().cyan()),
);

if !args.common.wait {
Output::new()
.line(format!("{} file for processing", "Uploaded".green()))
.tip(format!(
"Once processing is complete the data will be available on the {location}."
))
.print();
return Ok(ExitCode::SUCCESS);
}

wait_for_job_completion(grpc_channel, job_id, location).await
}

fn create_data_import_request(
args: &CprArgs,
config: ParquetConfig,
footer_md: FooterMetadata,
) -> Result<CreateDataImportFromUploadRequest> {
Ok(CreateDataImportFromUploadRequest {
parquet_config: Some(ParquetConfig {
asset_name: args.common.asset.clone(),
run_name: args.common.run.clone().unwrap_or_default(),
run_id: args.common.run_id.clone().unwrap_or_default(),
footer_offset: footer_md.offset,
footer_length: u32::try_from(footer_md.length)
.context("parquet footer length too large")?,
complex_types_import_mode: ParquetComplexTypesImportMode::from(
args.complex_types_mode.clone(),
)
.into(),
config: config.config,
}),
..Default::default()
})
}

/// Scan the parquet file's name column and return the distinct channel names
/// it contains (sorted, deduped). Used by multi-mode preview so the user can
/// see what channels the server will create at ingest.
pub(super) fn discover_multi_channel_names_for_preview<R: ChunkReader + 'static>(
file: R,
name_path: &str,
) -> Result<Vec<String>> {
let reader = SerializedFileReader::new(file).context("failed to build parquet file reader")?;

let file_schema = reader.metadata().file_metadata().schema();
let root_name = file_schema.name().to_string();
let name_field = file_schema
.get_fields()
.iter()
.find(|t| t.name() == name_path)
.with_context(|| format!("name column '{name_path}' not found in parquet schema"))?
.clone();

let projection = ParquetSchemaType::group_type_builder(&root_name)
.with_fields(vec![name_field])
.build()
.context("failed to build parquet projection schema")?;

let row_iter = reader
.get_row_iter(Some(projection))
.context("failed to build parquet row iterator")?;

let mut seen: HashSet<String> = HashSet::new();
for row_result in row_iter {
let row = row_result.context("failed to read parquet row")?;
let (_, field) = row
.get_column_iter()
.next()
.ok_or_else(|| anyhow!("internal: projected row missing column"))?;
match field {
Field::Str(s) => {
seen.insert(s.clone());
}
Field::Null => {}
other => {
return Err(anyhow!(
"name column '{name_path}' must be a string type; got {other:?}"
));
}
}
}

let mut names: Vec<String> = seen.into_iter().collect();
names.sort();
Ok(names)
}
Loading
Loading