Skip to content
1 change: 1 addition & 0 deletions core/integration/tests/connectors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mod mongodb;
mod postgres;
mod quickwit;
mod random;
mod random_source_liveness;
mod runtime;
mod stdout;

Expand Down
97 changes: 2 additions & 95 deletions core/integration/tests/connectors/random/random_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,107 +17,14 @@
* under the License.
*/

use iggy_common::MessageClient;
use iggy_common::{Consumer, Identifier, PollingStrategy};
use crate::connectors::random_source_liveness;
use integration::harness::seeds;
use integration::iggy_harness;
use std::time::Duration;
use tokio::time::sleep;

#[iggy_harness(
server(connectors_runtime(config_path = "tests/connectors/random/source.toml")),
seed = seeds::connector_stream
)]
async fn random_source_produces_messages(harness: &TestHarness) {
sleep(Duration::from_secs(1)).await;

let client = harness.root_client().await.unwrap();
let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
let consumer_id: Identifier = "test_consumer".try_into().unwrap();

let messages = client
.poll_messages(
&stream_id,
&topic_id,
None,
&Consumer::new(consumer_id),
&PollingStrategy::next(),
10,
true,
)
.await
.expect("Failed to poll messages");

assert!(
!messages.messages.is_empty(),
"No messages received from random source"
);
assert!(
messages.current_offset > 0,
"Current offset should be greater than 0"
);
}

#[iggy_harness(
server(connectors_runtime(config_path = "tests/connectors/random/source.toml")),
seed = seeds::connector_stream
)]
async fn state_persists_across_connector_restart(harness: &mut TestHarness) {
let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
let consumer_id: Identifier = "state_test_consumer".try_into().unwrap();

sleep(Duration::from_secs(1)).await;

let client = harness.root_client().await.unwrap();
let offset_before = {
let messages = client
.poll_messages(
&stream_id,
&topic_id,
None,
&Consumer::new(consumer_id.clone()),
&PollingStrategy::next(),
100,
true,
)
.await
.expect("Failed to poll messages before restart");
assert!(
messages.current_offset > 0,
"Should have messages before restart"
);
messages.current_offset
};

harness
.server_mut()
.stop_dependents()
.expect("Failed to stop connectors");
harness
.server_mut()
.start_dependents()
.await
.expect("Failed to restart connectors");
sleep(Duration::from_secs(1)).await;

let offset_after = client
.poll_messages(
&stream_id,
&topic_id,
None,
&Consumer::new(consumer_id),
&PollingStrategy::next(),
100,
true,
)
.await
.expect("Failed to poll messages after restart")
.current_offset;

assert!(
offset_after > offset_before,
"After restart, offset {offset_after} should be greater than before {offset_before}"
);
random_source_liveness::assert_produces_messages(harness).await;
}
66 changes: 66 additions & 0 deletions core/integration/tests/connectors/random_source_liveness.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

use std::time::Duration;

use iggy_common::{Consumer, Identifier, MessageClient, PollingStrategy};
use integration::harness::{TestHarness, seeds};
use tokio::time::{sleep, timeout};

const CONSUMER_NAME: &str = "random_source_liveness_consumer";
const POLL_BATCH: u32 = 100;
const RETRY_INTERVAL: Duration = Duration::from_millis(100);
const POLL_TIMEOUT: Duration = Duration::from_secs(5);

pub(crate) async fn assert_produces_messages(harness: &TestHarness) {
let client = harness.root_client().await.expect("root client");
let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
let consumer_id: Identifier = CONSUMER_NAME.try_into().unwrap();

let poll = async {
loop {
if let Ok(polled) = client
.poll_messages(
&stream_id,
&topic_id,
None,
&Consumer::new(consumer_id.clone()),
&PollingStrategy::next(),
POLL_BATCH,
true,
)
.await
{
if !polled.messages.is_empty() {
return;
}
}

sleep(RETRY_INTERVAL).await;
}
};

timeout(POLL_TIMEOUT, poll).await.unwrap_or_else(|_| {
panic!(
"random source liveness timed out after {:?} waiting for messages",
POLL_TIMEOUT
)
})
}
Loading