Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ all = [
'pyOpenSSL<24.0.0',
'pyarrow>=17.0.0',
'rosbags~=0.0',
'sift-stream-bindings>=0.2.0-rc2',
'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4',
'types-pyOpenSSL<24.0.0',
]
build = [
Expand Down Expand Up @@ -100,7 +100,7 @@ dev-all = [
'pytest==8.2.2',
'rosbags~=0.0',
'ruff~=0.12.10',
'sift-stream-bindings>=0.2.0-rc2',
'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4',
'tomlkit~=0.13.3',
'types-pyOpenSSL<24.0.0',
]
Expand Down Expand Up @@ -153,7 +153,7 @@ docs-build = [
'pytest==8.2.2',
'rosbags~=0.0',
'ruff~=0.12.10',
'sift-stream-bindings>=0.2.0-rc2',
'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4',
'tomlkit~=0.13.3',
'types-pyOpenSSL<24.0.0',
]
Expand All @@ -176,10 +176,10 @@ rosbags = [
'rosbags~=0.0',
]
sift-stream = [
'sift-stream-bindings>=0.2.0-rc2',
'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4',
]
sift-stream-bindings = [
'sift-stream-bindings>=0.2.0-rc2',
'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4',
]
tdms = [
'npTDMS~=1.9',
Expand Down Expand Up @@ -215,7 +215,7 @@ docs = ["mkdocs",
openssl = ["pyOpenSSL<24.0.0", "types-pyOpenSSL<24.0.0", "cffi~=1.14"]
tdms = ["npTDMS~=1.9"]
rosbags = ["rosbags~=0.0"]
sift-stream = ["sift-stream-bindings>=0.2.0-rc2"]
sift-stream = ["sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4"]
hdf5 = ["h5py~=3.11", "polars~=1.8"]
data-review = ["pyarrow>=17.0.0"]

Expand Down
62 changes: 19 additions & 43 deletions rust/crates/sift_stream/benches/message_to_ingest_req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use std::hint::black_box;
use sift_rs::ingestion_configs::v2::{ChannelConfig, FlowConfig};
use sift_stream::stream::mode::ingestion_config::Flow;
use sift_stream::{
ChannelDataType, ChannelValue, TimeValue, Value,
ChannelDataType, ChannelValue, FlowDescriptor, TimeValue, Value,
stream::mode::bench::{message_to_ingest_req, message_to_ingest_req_direct},
};

Expand Down Expand Up @@ -118,11 +118,9 @@ fn flow_randomized(name: &str, flow_config: &FlowConfig) -> Flow {
}

// Configuration constants - these can be adjusted to test different scenarios
const NUM_FLOWS: usize = 10; // Number of flow configs to create
const NUM_CHANNELS_PER_FLOW: usize = 2000; // Number of channels per flow
const INGESTION_CONFIG_ID: &str = "benchmark-config";
const RUN_ID: Option<String> = None;
const FLOW_TO_RANDOMIZE: usize = 8;

fn benchmark_message_to_ingest_req_direct(c: &mut Criterion) {
// Create a flow with ordered channel values (matching the first flow config)
Expand All @@ -140,61 +138,42 @@ fn benchmark_message_to_ingest_req_direct(c: &mut Criterion) {
}

fn benchmark_message_to_ingest_req_ordered(c: &mut Criterion) {
// Create flow configs
let mut flow_configs = Vec::with_capacity(NUM_FLOWS);
for i in 0..NUM_FLOWS {
flow_configs.push(flow_config(&format!("flow_{i}"), NUM_CHANNELS_PER_FLOW));
}
// Create a flow with ordered channel values.
let flow = flow_config("my_benchmark_flow", NUM_CHANNELS_PER_FLOW);
let message = flow_ordered("my_benchmark_flow", &flow);

// Create a flow with ordered channel values (matching the first flow config)
let message = flow_ordered("flow_0", &flow_configs[FLOW_TO_RANDOMIZE]);
let descriptor = FlowDescriptor::try_from((INGESTION_CONFIG_ID, flow)).unwrap();

c.bench_function("message_to_ingest_req_ordered", |b| {
b.iter(|| {
black_box(message_to_ingest_req(
&message,
INGESTION_CONFIG_ID,
RUN_ID.clone(),
&flow_configs,
))
})
b.iter(|| black_box(message_to_ingest_req(&message, RUN_ID.clone(), &descriptor)))
});
}

fn benchmark_message_to_ingest_req_randomized(c: &mut Criterion) {
// Create flow configs
let mut flow_configs = Vec::with_capacity(NUM_FLOWS);
for i in 0..NUM_FLOWS {
flow_configs.push(flow_config(&format!("flow_{i}"), NUM_CHANNELS_PER_FLOW));
}
// Create a flow with randomized channel values.
let flow = flow_config("my_benchmark_flow", NUM_CHANNELS_PER_FLOW);
let message = flow_randomized("my_benchmark_flow", &flow);

// Create a flow with randomized channel values (matching the first flow config)
let message = flow_randomized("flow_0", &flow_configs[FLOW_TO_RANDOMIZE]);
let descriptor = FlowDescriptor::try_from((INGESTION_CONFIG_ID, flow)).unwrap();

c.bench_function("message_to_ingest_req_randomized", |b| {
b.iter(|| {
black_box(message_to_ingest_req(
&message,
INGESTION_CONFIG_ID,
RUN_ID.clone(),
&flow_configs,
))
})
b.iter(|| black_box(message_to_ingest_req(&message, RUN_ID.clone(), &descriptor)))
});
}

fn benchmark_message_to_ingest_req_varying_sizes(c: &mut Criterion) {
let mut group = c.benchmark_group("message_to_ingest_req_varying_sizes");

for &num_channels in &[5, 10, 100, 1000, 5000] {
let flow_name = format!("flow_{num_channels}");

// Create flow configs with varying channel counts
let mut flow_configs = Vec::with_capacity(NUM_FLOWS);
for i in 0..NUM_FLOWS {
flow_configs.push(flow_config(&format!("flow_{i}"), num_channels));
}
let flow = flow_config(&flow_name, num_channels);
let message_ordered = flow_ordered(&flow_name, &flow);
let message_randomized = flow_randomized(&flow_name, &flow);
let descriptor = FlowDescriptor::try_from((INGESTION_CONFIG_ID, flow)).unwrap();

// Test direct scenario
let message_ordered = flow_ordered("flow_0", &flow_configs[FLOW_TO_RANDOMIZE]);
group.bench_function(&format!("direct_{num_channels}_channels"), |b| {
b.iter(|| {
black_box(message_to_ingest_req_direct(
Expand All @@ -209,22 +188,19 @@ fn benchmark_message_to_ingest_req_varying_sizes(c: &mut Criterion) {
b.iter(|| {
black_box(message_to_ingest_req(
&message_ordered,
INGESTION_CONFIG_ID,
RUN_ID.clone(),
&flow_configs,
&descriptor,
))
})
});

// Test randomized scenario
let message_randomized = flow_randomized("flow_0", &flow_configs[FLOW_TO_RANDOMIZE]);
group.bench_function(&format!("randomized_{num_channels}_channels"), |b| {
b.iter(|| {
black_box(message_to_ingest_req(
&message_randomized,
INGESTION_CONFIG_ID,
RUN_ID.clone(),
&flow_configs,
&descriptor,
))
})
});
Expand Down
12 changes: 3 additions & 9 deletions rust/crates/sift_stream/src/stream/mode/bench.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,15 @@
use crate::stream::flow::FlowDescriptor;
use crate::stream::mode::ingestion_config::Flow;
use crate::{IngestionConfigMode, SiftStream};
use sift_rs::ingestion_configs::v2::FlowConfig;

/// Unstable wrapper around [SiftStream::message_to_ingest_req] used for benchmarking purposes.
#[inline]
pub fn message_to_ingest_req(
message: &Flow,
ingestion_config_id: &str,
run_id: Option<String>,
flows: &[FlowConfig],
descriptor: &FlowDescriptor<String>,
) -> Option<sift_rs::ingest::v1::IngestWithConfigDataStreamRequest> {
SiftStream::<IngestionConfigMode>::message_to_ingest_req(
message,
ingestion_config_id,
run_id,
flows,
)
SiftStream::<IngestionConfigMode>::message_to_ingest_req(message, run_id, descriptor)
}

/// Unstable wrapper around [SiftStream::message_to_ingest_req_direct] used for benchmarking purposes.
Expand Down
Loading
Loading