From d08775faa676e6b63abfd1129a673e47f68cde18 Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Tue, 25 Nov 2025 18:36:04 -0600 Subject: [PATCH 1/2] rust(feat): Update sift-stream-bindings to support FlowDescriptor/FlowBuilder --- python/pyproject.toml | 12 +- .../benches/message_to_ingest_req.rs | 62 ++---- .../sift_stream/src/stream/mode/bench.rs | 12 +- .../src/stream/mode/ingestion_config.rs | 118 ++++------ .../sift_stream/src/stream/mode/test.rs | 210 ++---------------- rust/crates/sift_stream_bindings/Cargo.toml | 2 +- .../sift_stream_bindings.pyi | 68 +++++- rust/crates/sift_stream_bindings/src/lib.rs | 5 + .../sift_stream_bindings/src/metrics/mod.rs | 18 ++ .../sift_stream_bindings/src/stream/config.rs | 192 +++++++++++++++- .../sift_stream_bindings/src/stream/mod.rs | 39 +++- .../src/stream/request.rs | 19 ++ 12 files changed, 427 insertions(+), 330 deletions(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index 17fc5dd3e..0c80b0763 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -59,7 +59,7 @@ all = [ 'pyOpenSSL<24.0.0', 'pyarrow>=17.0.0', 'rosbags~=0.0', - 'sift-stream-bindings>=0.2.0-rc2', + 'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4', 'types-pyOpenSSL<24.0.0', ] build = [ @@ -100,7 +100,7 @@ dev-all = [ 'pytest==8.2.2', 'rosbags~=0.0', 'ruff~=0.12.10', - 'sift-stream-bindings>=0.2.0-rc2', + 'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4', 'tomlkit~=0.13.3', 'types-pyOpenSSL<24.0.0', ] @@ -153,7 +153,7 @@ docs-build = [ 'pytest==8.2.2', 'rosbags~=0.0', 'ruff~=0.12.10', - 'sift-stream-bindings>=0.2.0-rc2', + 'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4', 'tomlkit~=0.13.3', 'types-pyOpenSSL<24.0.0', ] @@ -176,10 +176,10 @@ rosbags = [ 'rosbags~=0.0', ] sift-stream = [ - 'sift-stream-bindings>=0.2.0-rc2', + 'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4', ] sift-stream-bindings = [ - 'sift-stream-bindings>=0.2.0-rc2', + 'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4', ] tdms = [ 'npTDMS~=1.9', @@ -215,7 +215,7 @@ docs = ["mkdocs", 
openssl = ["pyOpenSSL<24.0.0", "types-pyOpenSSL<24.0.0", "cffi~=1.14"] tdms = ["npTDMS~=1.9"] rosbags = ["rosbags~=0.0"] -sift-stream = ["sift-stream-bindings>=0.2.0-rc2"] +sift-stream = ["sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4"] hdf5 = ["h5py~=3.11", "polars~=1.8"] data-review = ["pyarrow>=17.0.0"] diff --git a/rust/crates/sift_stream/benches/message_to_ingest_req.rs b/rust/crates/sift_stream/benches/message_to_ingest_req.rs index de31cfa46..b25cf558f 100644 --- a/rust/crates/sift_stream/benches/message_to_ingest_req.rs +++ b/rust/crates/sift_stream/benches/message_to_ingest_req.rs @@ -30,7 +30,7 @@ use std::hint::black_box; use sift_rs::ingestion_configs::v2::{ChannelConfig, FlowConfig}; use sift_stream::stream::mode::ingestion_config::Flow; use sift_stream::{ - ChannelDataType, ChannelValue, TimeValue, Value, + ChannelDataType, ChannelValue, FlowDescriptor, TimeValue, Value, stream::mode::bench::{message_to_ingest_req, message_to_ingest_req_direct}, }; @@ -118,11 +118,9 @@ fn flow_randomized(name: &str, flow_config: &FlowConfig) -> Flow { } // Configuration constants - these can be adjusted to test different scenarios -const NUM_FLOWS: usize = 10; // Number of flow configs to create const NUM_CHANNELS_PER_FLOW: usize = 2000; // Number of channels per flow const INGESTION_CONFIG_ID: &str = "benchmark-config"; const RUN_ID: Option = None; -const FLOW_TO_RANDOMIZE: usize = 8; fn benchmark_message_to_ingest_req_direct(c: &mut Criterion) { // Create a flow with ordered channel values (matching the first flow config) @@ -140,46 +138,26 @@ fn benchmark_message_to_ingest_req_direct(c: &mut Criterion) { } fn benchmark_message_to_ingest_req_ordered(c: &mut Criterion) { - // Create flow configs - let mut flow_configs = Vec::with_capacity(NUM_FLOWS); - for i in 0..NUM_FLOWS { - flow_configs.push(flow_config(&format!("flow_{i}"), NUM_CHANNELS_PER_FLOW)); - } + // Create a flow with ordered channel values. 
+ let flow = flow_config("my_benchmark_flow", NUM_CHANNELS_PER_FLOW); + let message = flow_ordered("my_benchmark_flow", &flow); - // Create a flow with ordered channel values (matching the first flow config) - let message = flow_ordered("flow_0", &flow_configs[FLOW_TO_RANDOMIZE]); + let descriptor = FlowDescriptor::try_from((INGESTION_CONFIG_ID, flow)).unwrap(); c.bench_function("message_to_ingest_req_ordered", |b| { - b.iter(|| { - black_box(message_to_ingest_req( - &message, - INGESTION_CONFIG_ID, - RUN_ID.clone(), - &flow_configs, - )) - }) + b.iter(|| black_box(message_to_ingest_req(&message, RUN_ID.clone(), &descriptor))) }); } fn benchmark_message_to_ingest_req_randomized(c: &mut Criterion) { - // Create flow configs - let mut flow_configs = Vec::with_capacity(NUM_FLOWS); - for i in 0..NUM_FLOWS { - flow_configs.push(flow_config(&format!("flow_{i}"), NUM_CHANNELS_PER_FLOW)); - } + // Create a flow with randomized channel values. + let flow = flow_config("my_benchmark_flow", NUM_CHANNELS_PER_FLOW); + let message = flow_randomized("my_benchmark_flow", &flow); - // Create a flow with randomized channel values (matching the first flow config) - let message = flow_randomized("flow_0", &flow_configs[FLOW_TO_RANDOMIZE]); + let descriptor = FlowDescriptor::try_from((INGESTION_CONFIG_ID, flow)).unwrap(); c.bench_function("message_to_ingest_req_randomized", |b| { - b.iter(|| { - black_box(message_to_ingest_req( - &message, - INGESTION_CONFIG_ID, - RUN_ID.clone(), - &flow_configs, - )) - }) + b.iter(|| black_box(message_to_ingest_req(&message, RUN_ID.clone(), &descriptor))) }); } @@ -187,14 +165,15 @@ fn benchmark_message_to_ingest_req_varying_sizes(c: &mut Criterion) { let mut group = c.benchmark_group("message_to_ingest_req_varying_sizes"); for &num_channels in &[5, 10, 100, 1000, 5000] { + let flow_name = format!("flow_{num_channels}"); + // Create flow configs with varying channel counts - let mut flow_configs = Vec::with_capacity(NUM_FLOWS); - for i in 0..NUM_FLOWS 
{ - flow_configs.push(flow_config(&format!("flow_{i}"), num_channels)); - } + let flow = flow_config(&flow_name, num_channels); + let message_ordered = flow_ordered(&flow_name, &flow); + let message_randomized = flow_randomized(&flow_name, &flow); + let descriptor = FlowDescriptor::try_from((INGESTION_CONFIG_ID, flow)).unwrap(); // Test direct scenario - let message_ordered = flow_ordered("flow_0", &flow_configs[FLOW_TO_RANDOMIZE]); group.bench_function(&format!("direct_{num_channels}_channels"), |b| { b.iter(|| { black_box(message_to_ingest_req_direct( @@ -209,22 +188,19 @@ fn benchmark_message_to_ingest_req_varying_sizes(c: &mut Criterion) { b.iter(|| { black_box(message_to_ingest_req( &message_ordered, - INGESTION_CONFIG_ID, RUN_ID.clone(), - &flow_configs, + &descriptor, )) }) }); // Test randomized scenario - let message_randomized = flow_randomized("flow_0", &flow_configs[FLOW_TO_RANDOMIZE]); group.bench_function(&format!("randomized_{num_channels}_channels"), |b| { b.iter(|| { black_box(message_to_ingest_req( &message_randomized, - INGESTION_CONFIG_ID, RUN_ID.clone(), - &flow_configs, + &descriptor, )) }) }); diff --git a/rust/crates/sift_stream/src/stream/mode/bench.rs b/rust/crates/sift_stream/src/stream/mode/bench.rs index 833e7e034..ead9edc7f 100644 --- a/rust/crates/sift_stream/src/stream/mode/bench.rs +++ b/rust/crates/sift_stream/src/stream/mode/bench.rs @@ -1,21 +1,15 @@ +use crate::stream::flow::FlowDescriptor; use crate::stream::mode::ingestion_config::Flow; use crate::{IngestionConfigMode, SiftStream}; -use sift_rs::ingestion_configs::v2::FlowConfig; /// Unstable wrapper around [SiftStream::message_to_ingest_req] used for benchmarking purposes. 
#[inline] pub fn message_to_ingest_req( message: &Flow, - ingestion_config_id: &str, run_id: Option, - flows: &[FlowConfig], + descriptor: &FlowDescriptor, ) -> Option { - SiftStream::::message_to_ingest_req( - message, - ingestion_config_id, - run_id, - flows, - ) + SiftStream::::message_to_ingest_req(message, run_id, descriptor) } /// Unstable wrapper around [SiftStream::message_to_ingest_req_direct] used for benchmarking purposes. diff --git a/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs b/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs index 27f49a86a..8e5fd616e 100644 --- a/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs +++ b/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs @@ -1,8 +1,8 @@ use super::super::{SiftStream, SiftStreamMode, channel::ChannelValue, time::TimeValue}; use crate::{ - FlowDescriptor, metrics::SiftStreamMetrics, stream::{ + flow::{FlowBuilder, FlowDescriptor}, run::{RunSelector, load_run_by_form, load_run_by_id}, tasks::{ControlMessage, DataMessage, StreamSystem, TaskConfig, start_tasks}, }, @@ -17,7 +17,6 @@ use sift_error::prelude::*; use sift_rs::{ ingest::v1::{ IngestWithConfigDataChannelValue, IngestWithConfigDataStreamRequest, - ingest_with_config_data_channel_value::Type, }, ingestion_configs::v2::{FlowConfig, IngestionConfig}, runs::v2::Run, @@ -41,7 +40,7 @@ use uuid::Uuid; pub struct IngestionConfigMode { pub(crate) run: Option, ingestion_config: IngestionConfig, - flows_by_name: HashMap>, + flows_by_name: HashMap>, flows_seen: HashSet, sift_stream_id: Uuid, message_id_counter: u64, @@ -99,13 +98,14 @@ impl SiftStream { task_config: TaskConfig, metrics: Arc, ) -> Result { - let mut flows_by_name = HashMap::>::new(); + let ingestion_config_id = ingestion_config.ingestion_config_id.clone(); + let mut flows_by_name = + HashMap::>::with_capacity(flows.len()); for flow in flows { - flows_by_name - .entry(flow.name.clone()) - .and_modify(|group| group.push(flow.clone())) - 
.or_insert_with(|| vec![flow]); + let flow_name = flow.name.clone(); + let flow_descriptor = FlowDescriptor::try_from((&ingestion_config_id, flow))?; + flows_by_name.insert(flow_name, flow_descriptor); } // Spawn a task to register metrics without blocking @@ -162,7 +162,6 @@ impl SiftStream { pub async fn send(&mut self, message: Flow) -> Result<()> { self.metrics.messages_received.increment(); - let ingestion_config_id = &self.mode.ingestion_config.ingestion_config_id; let run_id = self.mode.run.as_ref().map(|r| r.run_id.clone()); let Some(flows) = self.mode.flows_by_name.get(&message.flow_name) else { @@ -172,12 +171,15 @@ impl SiftStream { "flow '{}' not found in local flow cache - message will still be transmitted but will not show in Sift if the flow was not registered", message.flow_name, ); - let req = Self::message_to_ingest_req_direct(&message, ingestion_config_id, run_id); + let req = Self::message_to_ingest_req_direct( + &message, + &self.mode.ingestion_config.ingestion_config_id, + run_id, + ); return self.send_impl(req); }; let Some(req) = Self::message_to_ingest_req( &message, - &self.mode.ingestion_config.ingestion_config_id, self.mode.run.as_ref().map(|r| r.run_id.clone()), flows, ) else { @@ -187,7 +189,11 @@ impl SiftStream { values = format!("{message:?}"), "encountered a message that doesn't match any cached flows - message will still be transmitted but will not show in Sift if the flow was not registered" ); - let req = Self::message_to_ingest_req_direct(&message, ingestion_config_id, run_id); + let req = Self::message_to_ingest_req_direct( + &message, + &self.mode.ingestion_config.ingestion_config_id, + run_id, + ); return self.send_impl(req); }; self.send_impl(req) @@ -356,11 +362,12 @@ impl SiftStream { self.metrics.loaded_flows.add(filtered.len() as u64); for flow_config in filtered { - self.mode - .flows_by_name - .entry(flow_config.name.clone()) - .and_modify(|flows| flows.push(flow_config.clone())) - .or_insert_with(|| 
vec![flow_config.clone()]); + let flow_name = flow_config.name.clone(); + let flow_descriptor = FlowDescriptor::try_from(( + self.mode.ingestion_config.ingestion_config_id.clone(), + flow_config, + ))?; + self.mode.flows_by_name.insert(flow_name, flow_descriptor); #[cfg(feature = "tracing")] tracing::info!( @@ -372,43 +379,23 @@ impl SiftStream { Ok(()) } - /// Get a copy of the current flow configs known to SiftStream as a HashMap keyed to the flow name. + /// Get a copy of the current flow descriptors known to SiftStream as a HashMap keyed to the flow name. /// This includes flows provided at initialization, and any existing configs /// previously registered in Sift - pub fn get_flows(&self) -> HashMap { + pub fn get_flows(&self) -> HashMap> { // Currently we get the first FlowConfig provided in the Vec to match how send() validates flows self.mode .flows_by_name .iter() - .filter_map(|(k, v)| { - if v.is_empty() { - None - } else { - Some((k.clone(), v[0].clone())) - } - }) + .map(|(k, v)| (k.clone(), v.clone())) .collect() } /// Get the flow descriptor for a given flow name. pub fn get_flow_descriptor(&self, flow_name: &str) -> Result> { - let Some(flow) = self.mode.flows_by_name.get(flow_name) else { - return Err(Error::new_msg( - ErrorKind::NotFoundError, - format!("flow '{}' not found", flow_name), - )); - }; - - if flow.is_empty() { - return Err(Error::new_msg( - ErrorKind::NotFoundError, - format!("flow '{}' not found", flow_name), - )); - } - - FlowDescriptor::try_from(( - self.mode.ingestion_config.ingestion_config_id.clone(), - &flow[0], + self.mode.flows_by_name.get(flow_name).cloned().ok_or(Error::new_msg( + ErrorKind::NotFoundError, + format!("flow '{}' not found", flow_name), )) } @@ -477,47 +464,26 @@ impl SiftStream { /// in which this returns `None` is if there is no [FlowConfig] for the given `message`. 
pub(crate) fn message_to_ingest_req( message: &Flow, - ingestion_config_id: &str, run_id: Option, - flows: &[FlowConfig], + descriptor: &FlowDescriptor, ) -> Option { - // Find the flow config for the given flow name. - let found_flow = flows.iter().find(|f| f.name == message.flow_name)?; - // Create a vector of empty channel values. If the provided channel values // have a matching channel name and data type, the value will be updated. - let mut channel_values = found_flow - .channels - .iter() - .map(|_| IngestWithConfigDataChannelValue { - r#type: Some(Type::Empty(pbjson_types::Empty {})), - }) - .collect::>(); - - // Create a map of channel name and data type to the index of the channel in the vector - // so we can update the channel value if it matches. - let channel_map: HashMap<(&str, i32), usize> = found_flow - .channels - .iter() - .enumerate() - .map(|(i, channel)| ((channel.name.as_str(), channel.data_type), i)) - .collect(); + let mut builder = FlowBuilder::new(descriptor); - for v in &message.values { - let i = channel_map.get(&(v.name.as_str(), i32::from(v.value.pb_data_type())))?; - channel_values[*i].r#type = Some(v.pb_value()); + // Update all provided channel values in the flow. + for value in message.values.iter() { + builder + .set_with_key(&value.name, value.value.clone()) + .ok()?; } - let request = IngestWithConfigDataStreamRequest { - flow: message.flow_name.to_string(), - ingestion_config_id: ingestion_config_id.to_string(), - timestamp: Some(message.timestamp.0), - run_id: run_id.unwrap_or_default(), - channel_values, - ..Default::default() - }; + // Attach the run ID to the flow if it is provided. + if let Some(run_id) = run_id.as_ref() { + builder.attach_run_id(run_id); + } - Some(request) + Some(builder.request(message.timestamp.clone())) } /// Creates an [IngestWithConfigDataStreamRequest] directly without consulting the flow cache. 
diff --git a/rust/crates/sift_stream/src/stream/mode/test.rs b/rust/crates/sift_stream/src/stream/mode/test.rs index 2b52ea20d..5978515c5 100644 --- a/rust/crates/sift_stream/src/stream/mode/test.rs +++ b/rust/crates/sift_stream/src/stream/mode/test.rs @@ -1,5 +1,6 @@ use super::super::SiftStream; use super::ingestion_config::{Flow, IngestionConfigMode}; +use crate::stream::flow::FlowDescriptor; use crate::{ChannelValue, TimeValue}; use sift_rs::{ common::r#type::v1::ChannelDataType, @@ -9,7 +10,7 @@ use sift_rs::{ #[test] fn validate_handling_empty_values() { - let flow_configs = vec![FlowConfig { + let flow_config = FlowConfig { name: String::from("foo"), channels: vec![ ChannelConfig { @@ -34,7 +35,9 @@ fn validate_handling_empty_values() { }, ], ..Default::default() - }]; + }; + let flow_descriptor = FlowDescriptor::try_from(("ingestion_config_id", flow_config)) + .expect("flow descriptor should be generated"); let flow = Flow::new( "foo", @@ -45,13 +48,9 @@ fn validate_handling_empty_values() { ], ); - let req = SiftStream::::message_to_ingest_req( - &flow, - "ingestion-config-id", - None, - &flow_configs, - ) - .expect("request should have been generated"); + let req = + SiftStream::::message_to_ingest_req(&flow, None, &flow_descriptor) + .expect("request should have been generated"); assert!( req.channel_values.len() == 4, @@ -76,7 +75,7 @@ fn validate_handling_empty_values() { #[test] fn validate_handling_no_matches_based_on_name() { - let flow_configs = vec![FlowConfig { + let flow_config = FlowConfig { name: String::from("foo"), channels: vec![ ChannelConfig { @@ -101,7 +100,9 @@ fn validate_handling_no_matches_based_on_name() { }, ], ..Default::default() - }]; + }; + let flow_descriptor = FlowDescriptor::try_from(("ingestion_config_id", flow_config)) + .expect("flow descriptor should be generated"); let flow = Flow::new( "foo", @@ -112,12 +113,8 @@ fn validate_handling_no_matches_based_on_name() { ], ); - let req = SiftStream::::message_to_ingest_req( - 
&flow, - "ingestion-config-id", - None, - &flow_configs, - ); + let req = + SiftStream::::message_to_ingest_req(&flow, None, &flow_descriptor); assert!( req.is_none(), @@ -127,7 +124,7 @@ fn validate_handling_no_matches_based_on_name() { #[test] fn validate_handling_no_matches_based_on_type() { - let flow_configs = vec![FlowConfig { + let flow_config = FlowConfig { name: String::from("foo"), channels: vec![ ChannelConfig { @@ -152,7 +149,9 @@ fn validate_handling_no_matches_based_on_type() { }, ], ..Default::default() - }]; + }; + let flow_descriptor = FlowDescriptor::try_from(("ingestion_config_id", flow_config)) + .expect("flow descriptor should be generated"); let flow = Flow::new( "foo", @@ -163,180 +162,11 @@ fn validate_handling_no_matches_based_on_type() { ], ); - let req = SiftStream::::message_to_ingest_req( - &flow, - "ingestion-config-id", - None, - &flow_configs, - ); + let req = + SiftStream::::message_to_ingest_req(&flow, None, &flow_descriptor); assert!( req.is_none(), "request should be none because no flows match" ); } - -#[test] -fn validate_handling_message_against_multiple_flows_with_same_name_with_atleast_one_match() { - let flow_configs = vec![ - FlowConfig { - name: String::from("foo"), - channels: vec![ - ChannelConfig { - name: String::from("bar"), - data_type: ChannelDataType::Double.into(), - ..Default::default() - }, - ChannelConfig { - name: String::from("baz"), - data_type: ChannelDataType::Int32.into(), - ..Default::default() - }, - ChannelConfig { - name: String::from("qux"), - data_type: ChannelDataType::Int64.into(), - ..Default::default() - }, - ChannelConfig { - name: String::from("quux"), - data_type: ChannelDataType::Uint32.into(), - ..Default::default() - }, - ], - ..Default::default() - }, - FlowConfig { - name: String::from("foo"), - channels: vec![ - ChannelConfig { - name: String::from("baz"), - data_type: ChannelDataType::Int32.into(), - ..Default::default() - }, - ChannelConfig { - name: String::from("qux"), - data_type: 
ChannelDataType::Int64.into(), - ..Default::default() - }, - ], - ..Default::default() - }, - ]; - - let flow = Flow::new( - "foo", - TimeValue::default(), - &[ - ChannelValue::new("baz", 10_i32), - ChannelValue::new("quux", 12_u32), - ChannelValue::new("qux", 15_i64), - ], - ); - - let req = SiftStream::::message_to_ingest_req( - &flow, - "ingestion-config-id", - None, - &flow_configs, - ) - .expect("expected request to be generated because there is a matching flow"); - - assert!( - req.channel_values.len() == 4, - "should have 4 channel values since one of the 'foo' flows has 4 channel configs" - ); - - let mut channel_values = req.channel_values.into_iter(); - assert_eq!( - Some(Type::Empty(pbjson_types::Empty {})), - channel_values.next().unwrap().r#type, - "bar should be empty" - ); - assert_eq!( - Some(Type::Int32(10)), - channel_values.next().unwrap().r#type, - "baz should be 10_i32" - ); - assert_eq!( - Some(Type::Int64(15)), - channel_values.next().unwrap().r#type, - "qux should be 15_i64" - ); - assert_eq!( - Some(Type::Uint32(12)), - channel_values.next().unwrap().r#type, - "quux should be 12_u32" - ); -} - -#[test] -fn validate_handling_message_against_multiple_flows_with_same_name_with_no_match() { - let flow_configs = vec![ - FlowConfig { - name: String::from("foo"), - channels: vec![ - ChannelConfig { - name: String::from("bar"), - data_type: ChannelDataType::Double.into(), - ..Default::default() - }, - ChannelConfig { - name: String::from("baz"), - data_type: ChannelDataType::Int32.into(), - ..Default::default() - }, - ChannelConfig { - name: String::from("qux"), - data_type: ChannelDataType::Int64.into(), - ..Default::default() - }, - ChannelConfig { - name: String::from("quux"), - data_type: ChannelDataType::Uint32.into(), - ..Default::default() - }, - ], - ..Default::default() - }, - FlowConfig { - name: String::from("foo"), - channels: vec![ - ChannelConfig { - name: String::from("baz"), - data_type: ChannelDataType::Int32.into(), - 
..Default::default() - }, - ChannelConfig { - name: String::from("qux"), - data_type: ChannelDataType::Int64.into(), - ..Default::default() - }, - ], - ..Default::default() - }, - ]; - - let flow = Flow::new( - "foo", - TimeValue::default(), - &[ - ChannelValue::new("baz", 10_i32), - ChannelValue::new("quux", 12_u32), - ChannelValue::new("qux", 15_i64), - ChannelValue::new("foobar", 15_i64), - ChannelValue::new("foobaz", 15_i64), - ], - ); - - let req = SiftStream::::message_to_ingest_req( - &flow, - "ingestion-config-id", - None, - &flow_configs, - ); - - assert!( - req.is_none(), - "should be None because there are no flows that contain all specified channels" - ); -} diff --git a/rust/crates/sift_stream_bindings/Cargo.toml b/rust/crates/sift_stream_bindings/Cargo.toml index 15285744f..6ed61e241 100644 --- a/rust/crates/sift_stream_bindings/Cargo.toml +++ b/rust/crates/sift_stream_bindings/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sift-stream-bindings" -version = "0.2.0-rc.3" +version = "0.2.0-rc.4" edition = { workspace = true } authors = { workspace = true } homepage = { workspace = true } diff --git a/rust/crates/sift_stream_bindings/sift_stream_bindings.pyi b/rust/crates/sift_stream_bindings/sift_stream_bindings.pyi index 8aab9dcaa..ab613cba1 100644 --- a/rust/crates/sift_stream_bindings/sift_stream_bindings.pyi +++ b/rust/crates/sift_stream_bindings/sift_stream_bindings.pyi @@ -7,15 +7,20 @@ __all__ = [ "ChannelDataTypePy", "ChannelEnumPy", "ChannelEnumTypePy", + "ChannelIndexPy", "ChannelValuePy", "ChannelValueTypePy", "CheckpointMetricsSnapshotPy", "DiskBackupPolicyPy", "DurationPy", + "FlowBuilderPy", "FlowConfigPy", + "FlowDescriptorBuilderPy", + "FlowDescriptorPy", "FlowPy", "IngestWithConfigDataChannelValuePy", "IngestWithConfigDataStreamRequestPy", + "IngestWithConfigDataStreamRequestWrapperPy", "IngestionConfigFormPy", "MetadataPy", "MetadataValuePy", @@ -44,6 +49,9 @@ class BackupMetricsSnapshotPy: total_file_count: builtins.int total_bytes: 
builtins.int total_messages: builtins.int + committed_message_id: builtins.int + queued_checkpoints: builtins.int + queued_file_ctxs: builtins.int files_pending_ingestion: builtins.int files_ingested: builtins.int cur_ingest_retries: builtins.int @@ -75,6 +83,10 @@ class ChannelEnumTypePy: key: builtins.int def __new__(cls, name:builtins.str, key:builtins.int) -> ChannelEnumTypePy: ... +@typing.final +class ChannelIndexPy: + ... + @typing.final class ChannelValuePy: name: builtins.str @@ -136,12 +148,57 @@ class DurationPy: nanos: builtins.int def __new__(cls, secs:builtins.int, nanos:builtins.int) -> DurationPy: ... +@typing.final +class FlowBuilderPy: + def __new__(cls, descriptor:FlowDescriptorPy) -> FlowBuilderPy: ... + def attach_run_id(self, run_id:builtins.str) -> None: + r""" + Attaches a run ID to the flow. + """ + def set(self, index:ChannelIndexPy, value:ValuePy) -> None: + r""" + Sets the value of the channel with the given index. + """ + def set_with_key(self, key:builtins.str, value:ValuePy) -> None: + r""" + Sets the value of the channel with the given key. + """ + def request(self, now:TimeValuePy) -> IngestWithConfigDataStreamRequestWrapperPy: + r""" + Builds an IngestWithConfigDataStreamRequest, consuming the builder. + """ + @typing.final class FlowConfigPy: name: builtins.str channels: builtins.list[ChannelConfigPy] def __new__(cls, name:builtins.str, channels:typing.Sequence[ChannelConfigPy]) -> FlowConfigPy: ... +@typing.final +class FlowDescriptorBuilderPy: + def __new__(cls, ingestion_config_id:builtins.str, name:builtins.str) -> FlowDescriptorBuilderPy: ... + def add(self, key:builtins.str, field_type:ChannelDataTypePy) -> ChannelIndexPy: + r""" + Adds a new channel to the flow. + + This returns the index of the channel in the flow. + """ + def build(self) -> FlowDescriptorPy: + r""" + Builds the FlowDescriptor from the builder. 
+ """ + +@typing.final +class FlowDescriptorPy: + def get(self, key:builtins.str) -> typing.Optional[ChannelDataTypePy]: + r""" + Gets the type of the channel with the given key. + """ + def mapping(self) -> builtins.dict[builtins.str, ChannelIndexPy]: + r""" + Gets the mapping of keys to channel indices. + """ + @typing.final class FlowPy: def __new__(cls, flow_name:builtins.str, timestamp:TimeValuePy, values:typing.Sequence[ChannelValuePy]) -> FlowPy: ... @@ -183,6 +240,10 @@ class IngestWithConfigDataStreamRequestPy: organization_id: builtins.str def __new__(cls, ingestion_config_id:builtins.str, flow:builtins.str, timestamp:typing.Optional[TimeValuePy], channel_values:typing.Sequence[IngestWithConfigDataChannelValuePy], run_id:builtins.str, end_stream_on_validation_error:builtins.bool, organization_id:builtins.str) -> IngestWithConfigDataStreamRequestPy: ... +@typing.final +class IngestWithConfigDataStreamRequestWrapperPy: + ... + @typing.final class IngestionConfigFormPy: asset_name: builtins.str @@ -263,7 +324,10 @@ class SiftStreamMetricsSnapshotPy: bytes_sent: builtins.int byte_rate: builtins.float messages_sent_to_backup: builtins.int + old_messages_dropped_for_ingestion: builtins.int cur_retry_count: builtins.int + ingestion_channel_depth: builtins.int + backup_channel_depth: builtins.int checkpoint: CheckpointMetricsSnapshotPy backups: BackupMetricsSnapshotPy @@ -272,9 +336,11 @@ class SiftStreamPy: def send(self, flow:FlowPy) -> typing.Any: ... def batch_send(self, flows:typing.Any) -> typing.Any: ... def send_requests(self, requests:typing.Sequence[IngestWithConfigDataStreamRequestPy]) -> typing.Any: ... + def send_requests_nonblocking(self, flows:typing.Any) -> None: ... def get_metrics_snapshot(self) -> SiftStreamMetricsSnapshotPy: ... def add_new_flows(self, flow_configs:typing.Sequence[FlowConfigPy]) -> typing.Any: ... - def get_flows(self) -> builtins.dict[builtins.str, FlowConfigPy]: ... 
+ def get_flow_descriptor(self, flow_name:builtins.str) -> FlowDescriptorPy: ... + def get_flows(self) -> builtins.dict[builtins.str, FlowDescriptorPy]: ... def attach_run(self, run_selector:RunSelectorPy) -> typing.Any: ... def detach_run(self) -> None: ... def run(self) -> typing.Optional[builtins.str]: ... diff --git a/rust/crates/sift_stream_bindings/src/lib.rs b/rust/crates/sift_stream_bindings/src/lib.rs index 33d07adfc..ecde12223 100644 --- a/rust/crates/sift_stream_bindings/src/lib.rs +++ b/rust/crates/sift_stream_bindings/src/lib.rs @@ -149,7 +149,11 @@ fn sift_stream_bindings(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; @@ -167,6 +171,7 @@ fn sift_stream_bindings(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_function(wrap_pyfunction!(init_tracing, m)?)?; m.add_function(wrap_pyfunction!(init_tracing_with_file, m)?)?; diff --git a/rust/crates/sift_stream_bindings/src/metrics/mod.rs b/rust/crates/sift_stream_bindings/src/metrics/mod.rs index 840e45305..8cecf9d3e 100644 --- a/rust/crates/sift_stream_bindings/src/metrics/mod.rs +++ b/rust/crates/sift_stream_bindings/src/metrics/mod.rs @@ -46,6 +46,12 @@ pub struct BackupMetricsSnapshotPy { #[pyo3(get)] pub total_messages: u64, #[pyo3(get)] + pub committed_message_id: u64, + #[pyo3(get)] + pub queued_checkpoints: u64, + #[pyo3(get)] + pub queued_file_ctxs: u64, + #[pyo3(get)] pub files_pending_ingestion: u64, #[pyo3(get)] pub files_ingested: u64, @@ -76,8 +82,14 @@ pub struct SiftStreamMetricsSnapshotPy { #[pyo3(get)] pub messages_sent_to_backup: u64, #[pyo3(get)] + pub old_messages_dropped_for_ingestion: u64, + #[pyo3(get)] pub cur_retry_count: u64, #[pyo3(get)] + pub ingestion_channel_depth: u64, + #[pyo3(get)] + 
pub backup_channel_depth: u64, + #[pyo3(get)] pub checkpoint: CheckpointMetricsSnapshotPy, #[pyo3(get)] pub backups: BackupMetricsSnapshotPy, @@ -95,7 +107,10 @@ impl From for SiftStreamMetricsSnapshotPy { bytes_sent: snapshot.bytes_sent, byte_rate: snapshot.byte_rate, messages_sent_to_backup: snapshot.messages_sent_to_backup, + old_messages_dropped_for_ingestion: snapshot.old_messages_dropped_for_ingestion, cur_retry_count: snapshot.cur_retry_count, + ingestion_channel_depth: snapshot.ingestion_channel_depth, + backup_channel_depth: snapshot.backup_channel_depth, checkpoint: CheckpointMetricsSnapshotPy { checkpoint_count: snapshot.checkpoint.checkpoint_count, failed_checkpoint_count: snapshot.checkpoint.failed_checkpoint_count, @@ -117,6 +132,9 @@ impl From for SiftStreamMetricsSnapshotPy { total_file_count: snapshot.backups.total_file_count, total_bytes: snapshot.backups.total_bytes, total_messages: snapshot.backups.total_messages, + committed_message_id: snapshot.backups.committed_message_id, + queued_checkpoints: snapshot.backups.queued_checkpoints, + queued_file_ctxs: snapshot.backups.queued_file_ctxs, files_pending_ingestion: snapshot.backups.files_pending_ingestion, files_ingested: snapshot.backups.files_ingested, cur_ingest_retries: snapshot.backups.cur_ingest_retries, diff --git a/rust/crates/sift_stream_bindings/src/stream/config.rs b/rust/crates/sift_stream_bindings/src/stream/config.rs index 69c4d6d1f..6de261857 100644 --- a/rust/crates/sift_stream_bindings/src/stream/config.rs +++ b/rust/crates/sift_stream_bindings/src/stream/config.rs @@ -1,12 +1,21 @@ use crate::{ + error::SiftErrorWrapper, sift::metadata::MetadataPy, - stream::channel::{ChannelBitFieldElementPy, ChannelDataTypePy, ChannelEnumTypePy}, + stream::{ + channel::{ChannelBitFieldElementPy, ChannelDataTypePy, ChannelEnumTypePy, ValuePy}, + request::IngestWithConfigDataStreamRequestWrapperPy, + time::TimeValuePy, + }, }; use pyo3::prelude::*; use pyo3_stub_gen::derive::*; use 
sift_rs::ingestion_configs::v2::{ChannelConfig, FlowConfig}; -use sift_stream::stream::run::RunSelector; +use sift_stream::{ + ChannelIndex, FlowBuilder, FlowDescriptor, FlowDescriptorBuilder, stream::run::RunSelector, +}; use sift_stream::{IngestionConfigForm, RunForm}; +use std::collections::HashMap; +use std::sync::Arc; // Type Definitions #[gen_stub_pyclass] @@ -39,6 +48,37 @@ pub struct FlowConfigPy { channels: Vec, } +#[gen_stub_pyclass] +#[pyclass] +#[derive(Clone)] +pub struct ChannelIndexPy { + inner: ChannelIndex, +} + +#[gen_stub_pyclass] +#[pyclass] +pub struct FlowDescriptorBuilderPy { + inner: Option>, +} + +#[gen_stub_pyclass] +#[pyclass] +#[derive(Clone)] +pub struct FlowDescriptorPy { + // Use Arc to make cloning cheap - cloning Arc just increments a reference count + inner: Arc>, +} + +#[gen_stub_pyclass] +#[pyclass] +pub struct FlowBuilderPy { + // We store the descriptor to ensure it lives as long as FlowBuilderPy. + // Since FlowDescriptorPy uses Arc internally, cloning is cheap (just increments a reference count). + // The field is intentionally unused (we only need it for lifetime management). 
+ _descriptor: FlowDescriptorPy, + builder: Option>, +} + #[gen_stub_pyclass] #[pyclass] #[derive(Clone)] @@ -156,6 +196,26 @@ impl From for ChannelConfigPy { } } +impl From> for FlowDescriptorPy { + fn from(descriptor: FlowDescriptor) -> Self { + FlowDescriptorPy { + inner: Arc::new(descriptor), + } + } +} + +impl From for ChannelIndex { + fn from(index: ChannelIndexPy) -> Self { + index.inner + } +} + +impl From for ChannelIndexPy { + fn from(index: ChannelIndex) -> Self { + ChannelIndexPy { inner: index } + } +} + // PyO3 Method Implementations #[gen_stub_pymethods] #[pymethods] @@ -259,3 +319,131 @@ impl RunSelectorPy { } } } + +#[gen_stub_pymethods] +#[pymethods] +impl FlowDescriptorBuilderPy { + #[new] + pub fn new(ingestion_config_id: &str, name: &str) -> Self { + Self { + inner: Some(FlowDescriptorBuilder::new(ingestion_config_id, name)), + } + } + + /// Adds a new channel to the flow. + /// + /// This returns the index of the channel in the flow. + pub fn add(&mut self, key: &str, field_type: ChannelDataTypePy) -> PyResult { + let Some(builder) = self.inner.as_mut() else { + return Err(PyErr::new::( + "Builder has already been consumed", + )); + }; + Ok(builder.add(key.to_string(), field_type.into()).into()) + } + + /// Builds the FlowDescriptor from the builder. + pub fn build(&mut self) -> PyResult { + let Some(builder) = self.inner.take() else { + return Err(PyErr::new::( + "Builder has already been consumed", + )); + }; + Ok(builder.build().into()) + } +} + +#[gen_stub_pymethods] +#[pymethods] +impl FlowDescriptorPy { + /// Gets the type of the channel with the given key. + pub fn get(&self, key: &str) -> Option { + self.inner.as_ref().get(key).map(|dt| dt.into()) + } + + /// Gets the mapping of keys to channel indices. 
+ pub fn mapping(&self) -> HashMap { + self.inner + .as_ref() + .mapping() + .iter() + .map(|(k, v)| (k.clone(), (*v).into())) + .collect() + } +} + +#[gen_stub_pymethods] +#[pymethods] +impl FlowBuilderPy { + #[new] + pub fn new(descriptor: FlowDescriptorPy) -> Self { + // Since FlowDescriptorPy uses Arc internally, cloning is cheap (just increments a reference count). + // We can safely create a 'static reference because: + // 1. We store `descriptor` in the same struct, ensuring it lives as long as `FlowBuilderPy` + // 2. The `FlowBuilder` only holds a reference and doesn't outlive `FlowBuilderPy` + // 3. The reference is never used after `FlowBuilderPy` is dropped + // 4. The descriptor's inner `FlowDescriptor` is wrapped in Arc, making it shareable + // + // SAFETY: We extend the lifetime of the reference to 'static using transmute. + // Sound only while `_descriptor` holds the Arc. Fields drop in declaration order, so `_descriptor` goes before `builder`: `FlowBuilder` must not read the descriptor when dropped (TODO confirm). + let descriptor_ref: &'static FlowDescriptor = unsafe { + // Get a reference to the inner FlowDescriptor through the Arc + std::mem::transmute(&*Arc::as_ptr(&descriptor.inner)) + }; + let flow_builder = FlowBuilder::new(descriptor_ref); + Self { + _descriptor: descriptor, + builder: Some(flow_builder), + } + } + + /// Attaches a run ID to the flow. + pub fn attach_run_id(&mut self, run_id: &str) -> PyResult<()> { + let Some(builder) = self.builder.as_mut() else { + return Err(PyErr::new::( + "Builder has already been consumed", + )); + }; + builder.attach_run_id(run_id); + Ok(()) + } + + /// Sets the value of the channel with the given index. + pub fn set(&mut self, index: ChannelIndexPy, value: ValuePy) -> PyResult<()> { + let Some(builder) = self.builder.as_mut() else { + return Err(PyErr::new::( + "Builder has already been consumed", + )); + }; + match builder.set(index.into(), value) { + Ok(()) => Ok(()), + Err(e) => Err(SiftErrorWrapper(e).into()), + } + } + + /// Sets the value of the channel with the given key.
+ pub fn set_with_key(&mut self, key: &str, value: ValuePy) -> PyResult<()> { + let Some(builder) = self.builder.as_mut() else { + return Err(PyErr::new::( + "Builder has already been consumed", + )); + }; + match builder.set_with_key(key, value) { + Ok(()) => Ok(()), + Err(e) => Err(SiftErrorWrapper(e).into()), + } + } + + /// Builds an IngestWithConfigDataStreamRequest, consuming the builder. + pub fn request( + &mut self, + now: TimeValuePy, + ) -> PyResult { + let Some(builder) = self.builder.take() else { + return Err(PyErr::new::( + "Builder has already been consumed", + )); + }; + Ok(builder.request(now.into()).into()) + } +} diff --git a/rust/crates/sift_stream_bindings/src/stream/mod.rs b/rust/crates/sift_stream_bindings/src/stream/mod.rs index 60ab2fefc..be248c71d 100644 --- a/rust/crates/sift_stream_bindings/src/stream/mod.rs +++ b/rust/crates/sift_stream_bindings/src/stream/mod.rs @@ -8,11 +8,13 @@ pub mod time; use crate::error::SiftErrorWrapper; use crate::metrics::SiftStreamMetricsSnapshotPy; use crate::stream::channel::ChannelValuePy; -use crate::stream::config::{FlowConfigPy, RunSelectorPy}; +use crate::stream::config::{FlowConfigPy, FlowDescriptorPy, RunSelectorPy}; +use crate::stream::request::IngestWithConfigDataStreamRequestWrapperPy; use crate::stream::time::TimeValuePy; use pyo3::{prelude::*, types::PyIterator}; use pyo3_async_runtimes::tokio::future_into_py; use pyo3_stub_gen::derive::*; +use sift_rs::ingest::v1::IngestWithConfigDataStreamRequest; use sift_stream::{Flow, FlowConfig, IngestionConfigMode, SiftStream}; use std::collections::HashMap; use std::sync::Arc; @@ -132,6 +134,26 @@ impl SiftStreamPy { Ok(awaitable.into()) } + pub fn send_requests_nonblocking(&self, flows: &Bound<'_, PyAny>) -> PyResult<()> { + let flow_iter = PyIterator::from_object(flows)?; + let mut flows_vec: Vec = Vec::new(); + for item in flow_iter { + let request = item?.extract::()?; + flows_vec.push(request.into()); + } + + let mut inner_guard = 
self.inner.blocking_lock(); + let stream = inner_guard.as_mut().ok_or_else(|| { + PyErr::new::( + "Stream has been consumed by finish()", + ) + })?; + + stream + .send_requests_nonblocking(flows_vec) + .map_err(|e| SiftErrorWrapper(e).into()) + } + pub fn get_metrics_snapshot(&self) -> PyResult { let inner_guard = self.inner.blocking_lock(); let stream = inner_guard.as_ref().ok_or_else(|| { @@ -171,7 +193,20 @@ impl SiftStreamPy { Ok(awaitable.into()) } - pub fn get_flows(&self) -> PyResult> { + pub fn get_flow_descriptor(&self, flow_name: &str) -> PyResult { + let inner_guard = self.inner.blocking_lock(); + let stream = inner_guard.as_ref().ok_or_else(|| { + PyErr::new::( + "Stream has been consumed by finish()", + ) + })?; + match stream.get_flow_descriptor(flow_name) { + Ok(descriptor) => Ok(FlowDescriptorPy::from(descriptor)), + Err(e) => Err(SiftErrorWrapper(e).into()), + } + } + + pub fn get_flows(&self) -> PyResult> { let inner_guard = self.inner.blocking_lock(); let sift_stream = inner_guard.as_ref().ok_or_else(|| { PyErr::new::( diff --git a/rust/crates/sift_stream_bindings/src/stream/request.rs b/rust/crates/sift_stream_bindings/src/stream/request.rs index ea07887fd..207d9497a 100644 --- a/rust/crates/sift_stream_bindings/src/stream/request.rs +++ b/rust/crates/sift_stream_bindings/src/stream/request.rs @@ -25,6 +25,14 @@ pub struct IngestWithConfigDataStreamRequestPy { pub organization_id: String, } +// Type Definitions +#[gen_stub_pyclass] +#[pyclass] +#[derive(Clone)] +pub struct IngestWithConfigDataStreamRequestWrapperPy { + inner: IngestWithConfigDataStreamRequest, +} + #[gen_stub_pyclass] #[pyclass] #[derive(Clone)] @@ -54,6 +62,17 @@ impl From for IngestWithConfigDataStreamReq } } +impl From for IngestWithConfigDataStreamRequestWrapperPy { + fn from(request: IngestWithConfigDataStreamRequest) -> Self { + Self { inner: request } + } +} + +impl From for IngestWithConfigDataStreamRequest { + fn from(request: IngestWithConfigDataStreamRequestWrapperPy) 
-> Self { + request.inner + } +} // PyO3 Method Implementations #[gen_stub_pymethods] #[pymethods] From 5ea7d9058b22411b6cf664f06fcf95dba21a7177 Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Tue, 25 Nov 2025 18:45:20 -0600 Subject: [PATCH 2/2] Formatting --- .../src/stream/mode/ingestion_config.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs b/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs index 8e5fd616e..4f5f79c3e 100644 --- a/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs +++ b/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs @@ -15,9 +15,7 @@ use futures_core::Stream; use prost::Message; use sift_error::prelude::*; use sift_rs::{ - ingest::v1::{ - IngestWithConfigDataChannelValue, IngestWithConfigDataStreamRequest, - }, + ingest::v1::{IngestWithConfigDataChannelValue, IngestWithConfigDataStreamRequest}, ingestion_configs::v2::{FlowConfig, IngestionConfig}, runs::v2::Run, wrappers::ingestion_configs::{IngestionConfigServiceWrapper, new_ingestion_config_service}, @@ -393,10 +391,14 @@ impl SiftStream { /// Get the flow descriptor for a given flow name. pub fn get_flow_descriptor(&self, flow_name: &str) -> Result> { - self.mode.flows_by_name.get(flow_name).cloned().ok_or(Error::new_msg( - ErrorKind::NotFoundError, - format!("flow '{}' not found", flow_name), - )) + self.mode + .flows_by_name + .get(flow_name) + .cloned() + .ok_or(Error::new_msg( + ErrorKind::NotFoundError, + format!("flow '{}' not found", flow_name), + )) } /// Attach a run to the stream. Any data provided through [SiftStream::send] after return