diff --git a/sentry_streams/src/watermark.rs b/sentry_streams/src/watermark.rs index cd9f2dd0..e9e0df12 100644 --- a/sentry_streams/src/watermark.rs +++ b/sentry_streams/src/watermark.rs @@ -245,4 +245,86 @@ mod tests { set_timestamp(0); }); } + + #[test] + fn test_watermark_to_commit_step_with_python_adapter() { + crate::testutils::initialize_python(); + traced_with_gil!(|py| { + let class_def = c_str!( + r#" +class PassthroughDelegate: + def __init__(self): + self.pending = [] + + def submit(self, payload, committable): + self.pending.append((payload, committable)) + + def poll(self): + result = self.pending + self.pending = [] + return result + + def flush(self, timeout=None): + return self.poll() + +class PassthroughDelegateFactory: + def build(self): + return PassthroughDelegate() +"# + ); + let scope = PyModule::new(py, "test_scope").unwrap(); + py.run(class_def, Some(&scope.dict()), None).unwrap(); + let factory = scope + .getattr("PassthroughDelegateFactory") + .unwrap() + .call0() + .unwrap() + .unbind(); + + let python_adapter_step = Py::new( + py, + RuntimeOperator::PythonAdapter { + route: Route::new("source".to_string(), vec![]), + delegate_factory: factory, + }, + ) + .unwrap(); + + let mut watermark_step = build_chain( + "source", + &[python_adapter_step], + Box::new(WatermarkCommitOffsets::new(1)), + &ConcurrencyConfig::new(1), + &None, + false, + ); + + let msg1 = make_msg( + Some(b"test_message".to_vec()), + BTreeMap::from([(Partition::new(Topic::new("test_topic"), 0), 100)]), + ); + let _ = watermark_step.submit(msg1); + + let msg2 = make_msg( + Some(b"test_message2".to_vec()), + BTreeMap::from([(Partition::new(Topic::new("test_topic"), 1), 80)]), + ); + let _ = watermark_step.submit(msg2); + let res = watermark_step.poll().unwrap(); + assert!(res.is_none()); + + set_timestamp(20); + let res = watermark_step.poll().unwrap(); + assert_eq!( + res, + Some(CommitRequest { + positions: HashMap::from([ + (Partition::new(Topic::new("test_topic"), 0), 100), + (Partition::new(Topic::new("test_topic"), 1), 80), + ]) + }) + ); + set_timestamp(0); + }); + } } diff --git a/sentry_streams/uv.lock b/sentry_streams/uv.lock index 6a6baba9..4479b567 100644 --- a/sentry_streams/uv.lock +++ b/sentry_streams/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.11" [[package]] @@ -893,7 +893,7 @@ wheels = [ [[package]] name = "sentry-streams" -version = "0.0.48" +version = "0.0.50" source = { editable = "." } dependencies = [ { name = "click" },