From 3140569e3762750bd68fab4ba732a79aa54682ee Mon Sep 17 00:00:00 2001 From: Arun Singh Date: Fri, 29 May 2026 18:23:08 +0530 Subject: [PATCH 1/6] test(connectors): add source suite helper --- core/integration/tests/connectors/mod.rs | 1 + .../tests/connectors/source_suite.rs | 101 ++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 core/integration/tests/connectors/source_suite.rs diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs index bb4bcc69f1..32e5af5dfd 100644 --- a/core/integration/tests/connectors/mod.rs +++ b/core/integration/tests/connectors/mod.rs @@ -31,6 +31,7 @@ mod postgres; mod quickwit; mod random; mod runtime; +mod source_suite; mod stdout; use iggy_common::IggyTimestamp; diff --git a/core/integration/tests/connectors/source_suite.rs b/core/integration/tests/connectors/source_suite.rs new file mode 100644 index 0000000000..c5a6558447 --- /dev/null +++ b/core/integration/tests/connectors/source_suite.rs @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use std::time::Duration; + +use iggy_common::{Consumer, Identifier, IggyMessage, MessageClient, PollingStrategy}; +use integration::harness::{TestHarness, seeds}; +use tokio::time::{sleep, timeout}; + +pub(crate) struct SourceSuiteConfig { + pub(crate) consumer_name: &'static str, + pub(crate) min_messages: usize, + pub(crate) poll_batch: u32, + pub(crate) warmup: Duration, + pub(crate) retry_interval: Duration, + pub(crate) timeout: Duration, +} + +impl Default for SourceSuiteConfig { + fn default() -> Self { + Self { + consumer_name: "source_suite_consumer", + min_messages: 1, + poll_batch: 100, + warmup: Duration::from_secs(1), + retry_interval: Duration::from_millis(100), + timeout: Duration::from_secs(5), + } + } +} + +pub(crate) async fn poll_until_min_messages( + harness: &TestHarness, + config: &SourceSuiteConfig, +) -> Vec { + sleep(config.warmup).await; + + let client = harness.root_client().await.expect("root client"); + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + let consumer_id: Identifier = config.consumer_name.try_into().unwrap(); + + let poll = async { + loop { + let polled = client + .poll_messages( + &stream_id, + &topic_id, + None, + &Consumer::new(consumer_id.clone()), + &PollingStrategy::next(), + config.poll_batch, + true, + ) + .await + .expect("poll source messages"); + + if polled.messages.len() >= config.min_messages { + return polled.messages; + } + + sleep(config.retry_interval).await; + } + }; + + timeout(config.timeout, poll) + .await + .expect("source suite timed out waiting for messages") +} + +pub(crate) async fn assert_source_produces_messages(harness: &TestHarness) -> Vec { + let messages = poll_until_min_messages(harness, &SourceSuiteConfig::default()).await; + assert!( + !messages.is_empty(), + "source suite expected at least one message" + ); + messages +} + +pub(crate) fn config_for_consumer(consumer_name: &'static str) -> SourceSuiteConfig { + SourceSuiteConfig { + consumer_name, + ..SourceSuiteConfig::default() + } +} From a1efe2af930de1be5cf18b9685d451661e01654c Mon Sep 17 00:00:00 2001 From: Arun Singh Date: Fri, 29 May 2026 22:43:59 +0530 Subject: [PATCH 2/6] test(connectors): wire random source suite --- .../tests/connectors/random/random_source.rs | 85 +++---------------- 1 file changed, 13 insertions(+), 72 deletions(-) diff --git a/core/integration/tests/connectors/random/random_source.rs b/core/integration/tests/connectors/random/random_source.rs index 5efb6fd086..39ad318037 100644 --- a/core/integration/tests/connectors/random/random_source.rs +++ b/core/integration/tests/connectors/random/random_source.rs @@ -17,46 +17,20 @@ * under the License. */ -use iggy_common::MessageClient; -use iggy_common::{Consumer, Identifier, PollingStrategy}; +use crate::connectors::source_suite; use integration::harness::seeds; use integration::iggy_harness; -use std::time::Duration; -use tokio::time::sleep; #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/random/source.toml")), seed = seeds::connector_stream )] async fn random_source_produces_messages(harness: &TestHarness) { - sleep(Duration::from_secs(1)).await; - - let client = harness.root_client().await.unwrap(); - let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); - let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); - let consumer_id: Identifier = "test_consumer".try_into().unwrap(); - - let messages = client - .poll_messages( - &stream_id, - &topic_id, - None, - &Consumer::new(consumer_id), - &PollingStrategy::next(), - 10, - true, - ) - .await - .expect("Failed to poll messages"); - + let messages = source_suite::assert_source_produces_messages(harness).await; assert!( - !messages.messages.is_empty(), + !messages.is_empty(), "No messages received from random source" ); - assert!( - messages.current_offset > 0, - "Current offset should be greater than 0" - ); } #[iggy_harness( @@ -64,32 +38,12 @@ async fn random_source_produces_messages(harness: &TestHarness) { seed = seeds::connector_stream )] async fn state_persists_across_connector_restart(harness: &mut TestHarness) { - let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); - let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); - let consumer_id: Identifier = "state_test_consumer".try_into().unwrap(); - - sleep(Duration::from_secs(1)).await; - - let client = harness.root_client().await.unwrap(); - let offset_before = { - let messages = client - .poll_messages( - &stream_id, - &topic_id, - None, - &Consumer::new(consumer_id.clone()), - &PollingStrategy::next(), - 100, - true, - ) - .await - .expect("Failed to poll messages before restart"); - assert!( - messages.current_offset > 0, - "Should have messages before restart" - ); - messages.current_offset - }; + let before_config = source_suite::config_for_consumer("source_suite_before_restart"); + let before = source_suite::poll_until_min_messages(harness, &before_config).await; + assert!( + !before.is_empty(), + "source suite expected messages before connector restart" + ); harness .server_mut() @@ -100,24 +54,11 @@ async fn state_persists_across_connector_restart(harness: &mut TestHarness) { .start_dependents() .await .expect("Failed to restart connectors"); - sleep(Duration::from_secs(1)).await; - - let offset_after = client - .poll_messages( - &stream_id, - &topic_id, - None, - &Consumer::new(consumer_id), - &PollingStrategy::next(), - 100, - true, - ) - .await - .expect("Failed to poll messages after restart") - .current_offset; + let after_config = source_suite::config_for_consumer("source_suite_after_restart"); + let after = source_suite::poll_until_min_messages(harness, &after_config).await; assert!( - offset_after > offset_before, - "After restart, offset {offset_after} should be greater than before {offset_before}" + !after.is_empty(), + "source suite expected messages after connector restart" ); } From 52ad054236e3ccccf03db89de10be5e7832be186 Mon Sep 17 00:00:00 2001 From: Arun Singh Date: Fri, 29 May 2026 23:43:23 +0530 Subject: [PATCH 3/6] docs(connectors): explain source suite usage --- .../tests/connectors/source_suite.md | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 core/integration/tests/connectors/source_suite.md diff --git a/core/integration/tests/connectors/source_suite.md b/core/integration/tests/connectors/source_suite.md new file mode 100644 index 0000000000..edcb9bc2fc --- /dev/null +++ b/core/integration/tests/connectors/source_suite.md @@ -0,0 +1,104 @@ + + +# Source Connector Pareto Suite + +## Purpose + +The source suite provides reusable source-side integration test helpers for +connector behavior that repeatedly appears in review. It is a Pareto baseline, +not a replacement for backend-specific source tests. + +## Covered In The First Suite + +- Source eventually produces messages into Iggy. +- Source continues producing after connector runtime restart. +- Polling uses bounded retries and a timeout. + +## Not Covered By The Generic Helper + +- Backend mark/delete side effects. +- Exact duplicate or replay assertions. +- Forced Iggy send failures. +- Source-specific cursor validation. + +## How To Use + +Import the helper from a connector integration test module and keep the +connector-specific harness setup in that test file. + +```rust +use crate::connectors::source_suite; +use integration::harness::seeds; +use integration::iggy_harness; + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors//source.toml")), + seed = seeds::connector_stream +)] +async fn source_produces_messages(harness: &TestHarness) { + let messages = source_suite::assert_source_produces_messages(harness).await; + assert!(!messages.is_empty()); +} +``` + +For restart coverage, use distinct consumers before and after the runtime +restart so the helper validates fresh production on both sides of the restart. + +```rust +use crate::connectors::source_suite; +use integration::harness::seeds; +use integration::iggy_harness; + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors//source.toml")), + seed = seeds::connector_stream +)] +async fn source_produces_after_restart(harness: &mut TestHarness) { + let before_config = source_suite::config_for_consumer("source_before_restart"); + let before = source_suite::poll_until_min_messages(harness, &before_config).await; + assert!(!before.is_empty()); + + harness.server_mut().stop_dependents().expect("stop connectors"); + harness + .server_mut() + .start_dependents() + .await + .expect("restart connectors"); + + let after_config = source_suite::config_for_consumer("source_after_restart"); + let after = source_suite::poll_until_min_messages(harness, &after_config).await; + assert!(!after.is_empty()); +} +``` + +## When To Add Connector-Specific Tests + +Add connector-specific tests when the source has an external cursor, +delete-after-read behavior, processed or mark columns, CDC offsets, or +backend-specific replay guarantees. + +## Future Follow-Ups + +- Deterministic send-failure tests after the runtime has an approved injection + point. +- Source adapters for Postgres, Elasticsearch, and InfluxDB once Docker-backed + tests are available locally. +- Explicit duplicate or replay-window assertions for connectors with stable + payload identity. From 72bafa44b9cff3e1bbb17bc3d8b56a9b7636780b Mon Sep 17 00:00:00 2001 From: Arun Singh Date: Mon, 1 Jun 2026 15:24:06 +0530 Subject: [PATCH 4/6] docs(connectors): remove source suite guide --- .../tests/connectors/source_suite.md | 104 ------------------ 1 file changed, 104 deletions(-) delete mode 100644 core/integration/tests/connectors/source_suite.md diff --git a/core/integration/tests/connectors/source_suite.md b/core/integration/tests/connectors/source_suite.md deleted file mode 100644 index edcb9bc2fc..0000000000 --- a/core/integration/tests/connectors/source_suite.md +++ /dev/null @@ -1,104 +0,0 @@ - - -# Source Connector Pareto Suite - -## Purpose - -The source suite provides reusable source-side integration test helpers for -connector behavior that repeatedly appears in review. It is a Pareto baseline, -not a replacement for backend-specific source tests. - -## Covered In The First Suite - -- Source eventually produces messages into Iggy. -- Source continues producing after connector runtime restart. -- Polling uses bounded retries and a timeout. - -## Not Covered By The Generic Helper - -- Backend mark/delete side effects. -- Exact duplicate or replay assertions. -- Forced Iggy send failures. -- Source-specific cursor validation. - -## How To Use - -Import the helper from a connector integration test module and keep the -connector-specific harness setup in that test file. - -```rust -use crate::connectors::source_suite; -use integration::harness::seeds; -use integration::iggy_harness; - -#[iggy_harness( - server(connectors_runtime(config_path = "tests/connectors//source.toml")), - seed = seeds::connector_stream -)] -async fn source_produces_messages(harness: &TestHarness) { - let messages = source_suite::assert_source_produces_messages(harness).await; - assert!(!messages.is_empty()); -} -``` - -For restart coverage, use distinct consumers before and after the runtime -restart so the helper validates fresh production on both sides of the restart. - -```rust -use crate::connectors::source_suite; -use integration::harness::seeds; -use integration::iggy_harness; - -#[iggy_harness( - server(connectors_runtime(config_path = "tests/connectors//source.toml")), - seed = seeds::connector_stream -)] -async fn source_produces_after_restart(harness: &mut TestHarness) { - let before_config = source_suite::config_for_consumer("source_before_restart"); - let before = source_suite::poll_until_min_messages(harness, &before_config).await; - assert!(!before.is_empty()); - - harness.server_mut().stop_dependents().expect("stop connectors"); - harness - .server_mut() - .start_dependents() - .await - .expect("restart connectors"); - - let after_config = source_suite::config_for_consumer("source_after_restart"); - let after = source_suite::poll_until_min_messages(harness, &after_config).await; - assert!(!after.is_empty()); -} -``` - -## When To Add Connector-Specific Tests - -Add connector-specific tests when the source has an external cursor, -delete-after-read behavior, processed or mark columns, CDC offsets, or -backend-specific replay guarantees. - -## Future Follow-Ups - -- Deterministic send-failure tests after the runtime has an approved injection - point. -- Source adapters for Postgres, Elasticsearch, and InfluxDB once Docker-backed - tests are available locally. -- Explicit duplicate or replay-window assertions for connectors with stable - payload identity. From 0d9c13b4b23826f0f06d1797ac3f9de1682c285a Mon Sep 17 00:00:00 2001 From: Arun Singh Date: Tue, 2 Jun 2026 18:14:08 +0530 Subject: [PATCH 5/6] test(connectors): narrow source suite scope --- .../tests/connectors/random/random_source.rs | 36 +------------------ .../tests/connectors/source_suite.rs | 32 ++++++++--------- 2 files changed, 15 insertions(+), 53 deletions(-) diff --git a/core/integration/tests/connectors/random/random_source.rs b/core/integration/tests/connectors/random/random_source.rs index 39ad318037..b9eff29c4d 100644 --- a/core/integration/tests/connectors/random/random_source.rs +++ b/core/integration/tests/connectors/random/random_source.rs @@ -26,39 +26,5 @@ use integration::iggy_harness; seed = seeds::connector_stream )] async fn random_source_produces_messages(harness: &TestHarness) { - let messages = source_suite::assert_source_produces_messages(harness).await; - assert!( - !messages.is_empty(), - "No messages received from random source" - ); -} - -#[iggy_harness( - server(connectors_runtime(config_path = "tests/connectors/random/source.toml")), - seed = seeds::connector_stream -)] -async fn state_persists_across_connector_restart(harness: &mut TestHarness) { - let before_config = source_suite::config_for_consumer("source_suite_before_restart"); - let before = source_suite::poll_until_min_messages(harness, &before_config).await; - assert!( - !before.is_empty(), - "source suite expected messages before connector restart" - ); - - harness - .server_mut() - .stop_dependents() - .expect("Failed to stop connectors"); - harness - .server_mut() - .start_dependents() - .await - .expect("Failed to restart connectors"); - - let after_config = source_suite::config_for_consumer("source_suite_after_restart"); - let after = source_suite::poll_until_min_messages(harness, &after_config).await; - assert!( - !after.is_empty(), - "source suite expected messages after connector restart" - ); + source_suite::assert_source_produces_messages(harness).await; } diff --git a/core/integration/tests/connectors/source_suite.rs b/core/integration/tests/connectors/source_suite.rs index c5a6558447..462c659394 100644 --- a/core/integration/tests/connectors/source_suite.rs +++ b/core/integration/tests/connectors/source_suite.rs @@ -27,7 +27,6 @@ pub(crate) struct SourceSuiteConfig { pub(crate) consumer_name: &'static str, pub(crate) min_messages: usize, pub(crate) poll_batch: u32, - pub(crate) warmup: Duration, pub(crate) retry_interval: Duration, pub(crate) timeout: Duration, } @@ -38,7 +37,6 @@ impl Default for SourceSuiteConfig { consumer_name: "source_suite_consumer", min_messages: 1, poll_batch: 100, - warmup: Duration::from_secs(1), retry_interval: Duration::from_millis(100), timeout: Duration::from_secs(5), } @@ -49,16 +47,16 @@ pub(crate) async fn poll_until_min_messages( harness: &TestHarness, config: &SourceSuiteConfig, ) -> Vec { - sleep(config.warmup).await; - let client = harness.root_client().await.expect("root client"); let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); let consumer_id: Identifier = config.consumer_name.try_into().unwrap(); let poll = async { + let mut messages = Vec::new(); + loop { - let polled = client + if let Ok(mut polled) = client .poll_messages( &stream_id, &topic_id, @@ -69,19 +67,24 @@ pub(crate) async fn poll_until_min_messages( true, ) .await - .expect("poll source messages"); + { + messages.append(&mut polled.messages); - if polled.messages.len() >= config.min_messages { - return polled.messages; + if messages.len() >= config.min_messages { + return messages; + } } sleep(config.retry_interval).await; } }; - timeout(config.timeout, poll) - .await - .expect("source suite timed out waiting for messages") + timeout(config.timeout, poll).await.unwrap_or_else(|_| { + panic!( + "source suite timed out after {:?} waiting for {} messages for consumer {}", + config.timeout, config.min_messages, config.consumer_name + ) + }) } pub(crate) async fn assert_source_produces_messages(harness: &TestHarness) -> Vec { @@ -92,10 +95,3 @@ pub(crate) async fn assert_source_produces_messages(harness: &TestHarness) -> Ve ); messages } - -pub(crate) fn config_for_consumer(consumer_name: &'static str) -> SourceSuiteConfig { - SourceSuiteConfig { - consumer_name, - ..SourceSuiteConfig::default() - } -} From 08d2287982d2ca423b88e005e2b1a6c77fa0cbf1 Mon Sep 17 00:00:00 2001 From: Arun Singh Date: Thu, 4 Jun 2026 14:28:00 +0530 Subject: [PATCH 6/6] test(connectors): collapse random source liveness helper --- core/integration/tests/connectors/mod.rs | 2 +- .../tests/connectors/random/random_source.rs | 4 +- .../connectors/random_source_liveness.rs | 66 +++++++++++++ .../tests/connectors/source_suite.rs | 97 ------------------- 4 files changed, 69 insertions(+), 100 deletions(-) create mode 100644 core/integration/tests/connectors/random_source_liveness.rs delete mode 100644 core/integration/tests/connectors/source_suite.rs diff --git a/core/integration/tests/connectors/mod.rs b/core/integration/tests/connectors/mod.rs index 32e5af5dfd..1016c6e517 100644 --- a/core/integration/tests/connectors/mod.rs +++ b/core/integration/tests/connectors/mod.rs @@ -30,8 +30,8 @@ mod mongodb; mod postgres; mod quickwit; mod random; +mod random_source_liveness; mod runtime; -mod source_suite; mod stdout; use iggy_common::IggyTimestamp; diff --git a/core/integration/tests/connectors/random/random_source.rs b/core/integration/tests/connectors/random/random_source.rs index b9eff29c4d..5a05d7f532 100644 --- a/core/integration/tests/connectors/random/random_source.rs +++ b/core/integration/tests/connectors/random/random_source.rs @@ -17,7 +17,7 @@ * under the License. */ -use crate::connectors::source_suite; +use crate::connectors::random_source_liveness; use integration::harness::seeds; use integration::iggy_harness; @@ -26,5 +26,5 @@ use integration::iggy_harness; seed = seeds::connector_stream )] async fn random_source_produces_messages(harness: &TestHarness) { - source_suite::assert_source_produces_messages(harness).await; + random_source_liveness::assert_produces_messages(harness).await; } diff --git a/core/integration/tests/connectors/random_source_liveness.rs b/core/integration/tests/connectors/random_source_liveness.rs new file mode 100644 index 0000000000..eb52eacb7d --- /dev/null +++ b/core/integration/tests/connectors/random_source_liveness.rs @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use std::time::Duration; + +use iggy_common::{Consumer, Identifier, MessageClient, PollingStrategy}; +use integration::harness::{TestHarness, seeds}; +use tokio::time::{sleep, timeout}; + +const CONSUMER_NAME: &str = "random_source_liveness_consumer"; +const POLL_BATCH: u32 = 100; +const RETRY_INTERVAL: Duration = Duration::from_millis(100); +const POLL_TIMEOUT: Duration = Duration::from_secs(5); + +pub(crate) async fn assert_produces_messages(harness: &TestHarness) { + let client = harness.root_client().await.expect("root client"); + let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); + let consumer_id: Identifier = CONSUMER_NAME.try_into().unwrap(); + + let poll = async { + loop { + if let Ok(polled) = client + .poll_messages( + &stream_id, + &topic_id, + None, + &Consumer::new(consumer_id.clone()), + &PollingStrategy::next(), + POLL_BATCH, + true, + ) + .await + { + if !polled.messages.is_empty() { + return; + } + } + + sleep(RETRY_INTERVAL).await; + } + }; + + timeout(POLL_TIMEOUT, poll).await.unwrap_or_else(|_| { + panic!( + "random source liveness timed out after {:?} waiting for messages", + POLL_TIMEOUT + ) + }) +} diff --git a/core/integration/tests/connectors/source_suite.rs b/core/integration/tests/connectors/source_suite.rs deleted file mode 100644 index 462c659394..0000000000 --- a/core/integration/tests/connectors/source_suite.rs +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -use std::time::Duration; - -use iggy_common::{Consumer, Identifier, IggyMessage, MessageClient, PollingStrategy}; -use integration::harness::{TestHarness, seeds}; -use tokio::time::{sleep, timeout}; - -pub(crate) struct SourceSuiteConfig { - pub(crate) consumer_name: &'static str, - pub(crate) min_messages: usize, - pub(crate) poll_batch: u32, - pub(crate) retry_interval: Duration, - pub(crate) timeout: Duration, -} - -impl Default for SourceSuiteConfig { - fn default() -> Self { - Self { - consumer_name: "source_suite_consumer", - min_messages: 1, - poll_batch: 100, - retry_interval: Duration::from_millis(100), - timeout: Duration::from_secs(5), - } - } -} - -pub(crate) async fn poll_until_min_messages( - harness: &TestHarness, - config: &SourceSuiteConfig, -) -> Vec { - let client = harness.root_client().await.expect("root client"); - let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); - let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); - let consumer_id: Identifier = config.consumer_name.try_into().unwrap(); - - let poll = async { - let mut messages = Vec::new(); - - loop { - if let Ok(mut polled) = client - .poll_messages( - &stream_id, - &topic_id, - None, - &Consumer::new(consumer_id.clone()), - &PollingStrategy::next(), - config.poll_batch, - true, - ) - .await - { - messages.append(&mut polled.messages); - - if messages.len() >= config.min_messages { - return messages; - } - } - - sleep(config.retry_interval).await; - } - }; - - timeout(config.timeout, poll).await.unwrap_or_else(|_| { - panic!( - "source suite timed out after {:?} waiting for {} messages for consumer {}", - config.timeout, config.min_messages, config.consumer_name - ) - }) -} - -pub(crate) async fn assert_source_produces_messages(harness: &TestHarness) -> Vec { - let messages = poll_until_min_messages(harness, &SourceSuiteConfig::default()).await; - assert!( - !messages.is_empty(), - "source suite expected at least one message" - ); - messages -}