diff --git a/foreign/cpp/build.rs b/foreign/cpp/build.rs index d2cbf0094c..5a45b56196 100644 --- a/foreign/cpp/build.rs +++ b/foreign/cpp/build.rs @@ -21,7 +21,8 @@ fn main() { .compile("iggy-cpp-bridge"); println!("cargo:rerun-if-changed=src/client.rs"); - println!("cargo:rerun-if-changed=src/consumer_group.rs"); + println!("cargo:rerun-if-changed=src/consumer.rs"); + println!("cargo:rerun-if-changed=src/producer.rs"); println!("cargo:rerun-if-changed=src/identifier.rs"); println!("cargo:rerun-if-changed=src/lib.rs"); println!("cargo:rerun-if-changed=src/messages.rs"); diff --git a/foreign/cpp/src/client.rs b/foreign/cpp/src/client.rs index 4e35f9179e..743f310867 100644 --- a/foreign/cpp/src/client.rs +++ b/foreign/cpp/src/client.rs @@ -18,16 +18,12 @@ use crate::{RUNTIME, ffi}; use iggy::prelude::{ Client as IggyConnectionClient, CompressionAlgorithm as RustCompressionAlgorithm, Consumer, - ConsumerGroupClient, Identifier as RustIdentifier, IggyClient as RustIggyClient, - IggyClientBuilder as RustIggyClientBuilder, IggyError, IggyExpiry as RustIggyExpiry, - IggyMessage, IggyTimestamp, MaxTopicSize as RustMaxTopicSize, MessageClient, PartitionClient, - Partitioning, PollingStrategy, SnapshotCompression as RustSnapshotCompression, StreamClient, - SystemClient as RustSystemClient, SystemSnapshotType as RustSystemSnapshotType, TopicClient, - UserClient, -}; -use iggy_common::{ - CacheMetrics as RustCacheMetrics, CacheMetricsKey as RustCacheMetricsKey, - ClientInfo as RustClientInfo, ClientInfoDetails as RustClientInfoDetails, Stats as RustStats, + ConsumerGroupClient, ConsumerOffsetClient, Identifier as RustIdentifier, + IggyClient as RustIggyClient, IggyClientBuilder as RustIggyClientBuilder, + IggyExpiry as RustIggyExpiry, IggyMessage, IggyTimestamp, MaxTopicSize as RustMaxTopicSize, + MessageClient, PartitionClient, Partitioning, PollingStrategy, + SnapshotCompression as RustSnapshotCompression, StreamClient, SystemClient as RustSystemClient, + SystemSnapshotType as RustSystemSnapshotType, TopicClient, UserClient, }; use std::collections::HashSet; use std::convert::TryFrom; @@ -38,103 +34,6 @@ use std::sync::Arc; /// partition based on the consumer/strategy. Cxx FFI does not support `Option`, so we /// reserve `u32::MAX` as the sentinel for `partition_id`. const ANY_PARTITION_ID: u32 = u32::MAX; -impl From for ffi::ClientInfo { - fn from(client: RustClientInfo) -> Self { - let has_user_id = client.user_id.is_some(); - ffi::ClientInfo { - client_id: client.client_id, - has_user_id, - user_id: client.user_id.unwrap_or(u32::MAX), - address: client.address, - transport: client.transport, - consumer_groups_count: client.consumer_groups_count, - } - } -} - -impl From for ffi::ClientInfoDetails { - fn from(client: RustClientInfoDetails) -> Self { - let has_user_id = client.user_id.is_some(); - ffi::ClientInfoDetails { - client_id: client.client_id, - has_user_id, - user_id: client.user_id.unwrap_or(u32::MAX), - address: client.address, - transport: client.transport, - consumer_groups_count: client.consumer_groups_count, - consumer_groups: client - .consumer_groups - .into_iter() - .map(ffi::ConsumerGroupInfo::from) - .collect(), - } - } -} - -impl TryFrom> for ffi::ClientInfoDetails { - type Error = String; - - fn try_from(client: Option) -> Result { - match client { - Some(client) => Ok(ffi::ClientInfoDetails::from(client)), - None => Err("client not found".to_string()), - } - } -} - -impl From<(RustCacheMetricsKey, RustCacheMetrics)> for ffi::CacheMetricEntry { - fn from((key, metrics): (RustCacheMetricsKey, RustCacheMetrics)) -> Self { - ffi::CacheMetricEntry { - stream_id: key.stream_id, - topic_id: key.topic_id, - partition_id: key.partition_id, - hits: metrics.hits, - misses: metrics.misses, - hit_ratio: metrics.hit_ratio, - } - } -} - -impl From for ffi::Stats { - fn from(stats: RustStats) -> Self { - let has_server_semver = stats.iggy_server_semver.is_some(); - ffi::Stats { - process_id: stats.process_id, - cpu_usage: stats.cpu_usage, - total_cpu_usage: stats.total_cpu_usage, - memory_usage: stats.memory_usage.as_bytes_u64(), - total_memory: stats.total_memory.as_bytes_u64(), - available_memory: stats.available_memory.as_bytes_u64(), - run_time_micros: stats.run_time.as_micros(), - start_time_epoch_micros: stats.start_time.as_micros(), - read_bytes: stats.read_bytes.as_bytes_u64(), - written_bytes: stats.written_bytes.as_bytes_u64(), - messages_size_bytes: stats.messages_size_bytes.as_bytes_u64(), - streams_count: stats.streams_count, - topics_count: stats.topics_count, - partitions_count: stats.partitions_count, - segments_count: stats.segments_count, - messages_count: stats.messages_count, - clients_count: stats.clients_count, - consumer_groups_count: stats.consumer_groups_count, - hostname: stats.hostname, - os_name: stats.os_name, - os_version: stats.os_version, - kernel_version: stats.kernel_version, - iggy_server_version: stats.iggy_server_version, - has_server_semver, - iggy_server_semver: stats.iggy_server_semver.unwrap_or(0), - cache_metrics: stats - .cache_metrics - .into_iter() - .map(ffi::CacheMetricEntry::from) - .collect(), - threads_count: stats.threads_count, - free_disk_space: stats.free_disk_space.as_bytes_u64(), - total_disk_space: stats.total_disk_space.as_bytes_u64(), - } - } -} pub struct Client { pub inner: Arc, @@ -190,6 +89,16 @@ impl Client { }) } + pub fn logout_user(&self) -> Result<(), String> { + RUNTIME.block_on(async { + self.inner + .logout_user() + .await + .map_err(|error| format!("Could not logout user: {error}"))?; + Ok(()) + }) + } + pub fn connect(&self) -> Result<(), String> { RUNTIME.block_on(async { self.inner @@ -222,6 +131,27 @@ impl Client { }) } + pub fn update_stream( + &self, + stream_id: ffi::Identifier, + stream_name: String, + ) -> Result<(), String> { + let rust_stream_id = RustIdentifier::try_from(stream_id) + .map_err(|error| format!("Could not update stream '{stream_name}': {error}"))?; + + RUNTIME.block_on(async { + self.inner + .update_stream(&rust_stream_id, &stream_name) + .await + .map_err(|error| { + format!( + "Could not update stream '{rust_stream_id}' to '{stream_name}': {error}" + ) + })?; + Ok(()) + }) + } + pub fn get_stream(&self, stream_id: ffi::Identifier) -> Result { let rust_stream_id = RustIdentifier::try_from(stream_id) .map_err(|error| format!("Could not get stream: {error}"))?; @@ -330,6 +260,31 @@ impl Client { }) } + pub fn flush_unsaved_buffer( + &self, + stream_id: ffi::Identifier, + topic_id: ffi::Identifier, + partition_id: u32, + fsync: bool, + ) -> Result<(), String> { + let rust_stream_id = RustIdentifier::try_from(stream_id) + .map_err(|error| format!("Could not flush unsaved buffer: {error}"))?; + let rust_topic_id = RustIdentifier::try_from(topic_id) + .map_err(|error| format!("Could not flush unsaved buffer: {error}"))?; + + RUNTIME.block_on(async { + self.inner + .flush_unsaved_buffer(&rust_stream_id, &rust_topic_id, partition_id, fsync) + .await + .map_err(|error| { + format!( + "Could not flush unsaved buffer for stream '{rust_stream_id}', topic '{rust_topic_id}', partition '{partition_id}': {error}" + ) + })?; + Ok(()) + }) + } + #[allow(clippy::too_many_arguments)] pub fn poll_messages( &self, @@ -822,6 +777,144 @@ impl Client { }) } + pub fn store_consumer_offset( + &self, + stream_id: ffi::Identifier, + topic_id: ffi::Identifier, + partition_id: u32, + consumer_kind: String, + consumer_id: ffi::Identifier, + offset: u64, + ) -> Result<(), String> { + let rust_stream_id = RustIdentifier::try_from(stream_id) + .map_err(|error| format!("Could not store consumer offset: {error}"))?; + let rust_topic_id = RustIdentifier::try_from(topic_id) + .map_err(|error| format!("Could not store consumer offset: {error}"))?; + let rust_consumer_id = RustIdentifier::try_from(consumer_id) + .map_err(|error| format!("Could not store consumer offset: {error}"))?; + let consumer = match consumer_kind.as_str() { + "consumer" => Consumer::new(rust_consumer_id), + "consumer_group" => Consumer::group(rust_consumer_id), + _ => { + return Err(format!( + "Could not store consumer offset: invalid consumer kind: {consumer_kind}" + )); + } + }; + let partition_id = if partition_id == ANY_PARTITION_ID { + None + } else { + Some(partition_id) + }; + + RUNTIME.block_on(async { + self.inner + .store_consumer_offset( + &consumer, + &rust_stream_id, + &rust_topic_id, + partition_id, + offset, + ) + .await + .map_err(|error| { + format!( + "Could not store consumer offset for stream '{rust_stream_id}', topic '{rust_topic_id}': {error}" + ) + })?; + Ok(()) + }) + } + + pub fn get_consumer_offset( + &self, + stream_id: ffi::Identifier, + topic_id: ffi::Identifier, + partition_id: u32, + consumer_kind: String, + consumer_id: ffi::Identifier, + ) -> Result { + let rust_stream_id = RustIdentifier::try_from(stream_id) + .map_err(|error| format!("Could not get consumer offset: {error}"))?; + let rust_topic_id = RustIdentifier::try_from(topic_id) + .map_err(|error| format!("Could not get consumer offset: {error}"))?; + let rust_consumer_id = RustIdentifier::try_from(consumer_id) + .map_err(|error| format!("Could not get consumer offset: {error}"))?; + let consumer = match consumer_kind.as_str() { + "consumer" => Consumer::new(rust_consumer_id), + "consumer_group" => Consumer::group(rust_consumer_id), + _ => { + return Err(format!( + "Could not get consumer offset: invalid consumer kind: {consumer_kind}" + )); + } + }; + let partition_id = if partition_id == ANY_PARTITION_ID { + None + } else { + Some(partition_id) + }; + + RUNTIME.block_on(async { + let offset = self + .inner + .get_consumer_offset(&consumer, &rust_stream_id, &rust_topic_id, partition_id) + .await + .map_err(|error| { + format!( + "Could not get consumer offset for stream '{rust_stream_id}', topic '{rust_topic_id}': {error}" + ) + })?; + ffi::ConsumerOffsetInfo::try_from(offset).map_err(|error| { + format!( + "Could not get consumer offset for stream '{rust_stream_id}', topic '{rust_topic_id}': {error}" + ) + }) + }) + } + + pub fn delete_consumer_offset( + &self, + stream_id: ffi::Identifier, + topic_id: ffi::Identifier, + partition_id: u32, + consumer_kind: String, + consumer_id: ffi::Identifier, + ) -> Result<(), String> { + let rust_stream_id = RustIdentifier::try_from(stream_id) + .map_err(|error| format!("Could not delete consumer offset: {error}"))?; + let rust_topic_id = RustIdentifier::try_from(topic_id) + .map_err(|error| format!("Could not delete consumer offset: {error}"))?; + let rust_consumer_id = RustIdentifier::try_from(consumer_id) + .map_err(|error| format!("Could not delete consumer offset: {error}"))?; + let consumer = match consumer_kind.as_str() { + "consumer" => Consumer::new(rust_consumer_id), + "consumer_group" => Consumer::group(rust_consumer_id), + _ => { + return Err(format!( + "Could not delete consumer offset: invalid consumer kind: {consumer_kind}" + )); + } + }; + let partition_id = if partition_id == ANY_PARTITION_ID { + None + } else { + Some(partition_id) + }; + + RUNTIME.block_on(async { + self.inner + .delete_consumer_offset(&consumer, &rust_stream_id, &rust_topic_id, partition_id) + .await + .map_err(|error| { + format!( + "Could not delete consumer offset for stream '{rust_stream_id}', topic '{rust_topic_id}': {error}" + ) + })?; + Ok(()) + }) + } + pub fn get_stats(&self) -> Result { RUNTIME.block_on(async { let stats = self @@ -958,23 +1051,11 @@ impl Client { } pub unsafe fn delete_connection(client: *mut Client) -> Result<(), String> { - if client.is_null() { - return Ok(()); - } - - // `Box::from_raw` below runs unconditionally, so the client is always released regardless - // of `logout_result`. The result is only used to surface a logout error to the caller — there - // is no leak path here. - let logout_result = RUNTIME.block_on(async { unsafe { &*client }.inner.logout_user().await }); - - unsafe { - drop(Box::from_raw(client)); + if !client.is_null() { + unsafe { + drop(Box::from_raw(client)); + } } - match logout_result { - Ok(()) | Err(IggyError::Unauthenticated | IggyError::Disconnected) => Ok(()), - Err(error) => Err(format!( - "Could not logout user during deletion of client: {error}" - )), - } + Ok(()) } diff --git a/foreign/cpp/src/consumer.rs b/foreign/cpp/src/consumer.rs new file mode 100644 index 0000000000..147374d4ed --- /dev/null +++ b/foreign/cpp/src/consumer.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use iggy::prelude::IggyConsumer as RustIggyConsumer; + +#[allow(dead_code)] +pub struct Consumer { + pub inner: RustIggyConsumer, +} diff --git a/foreign/cpp/src/consumer_group.rs b/foreign/cpp/src/consumer_group.rs deleted file mode 100644 index 1e20015bb5..0000000000 --- a/foreign/cpp/src/consumer_group.rs +++ /dev/null @@ -1,70 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::ffi; -use iggy::prelude::ConsumerGroupDetails as RustConsumerGroupDetails; -use iggy_common::{ - ConsumerGroup as RustConsumerGroup, ConsumerGroupInfo as RustConsumerGroupInfo, - ConsumerGroupMember as RustConsumerGroupMember, -}; - -impl From for ffi::ConsumerGroupInfo { - fn from(group: RustConsumerGroupInfo) -> Self { - ffi::ConsumerGroupInfo { - stream_id: group.stream_id, - topic_id: group.topic_id, - group_id: group.group_id, - } - } -} - -impl From for ffi::ConsumerGroupMember { - fn from(member: RustConsumerGroupMember) -> Self { - ffi::ConsumerGroupMember { - id: member.id, - partitions_count: member.partitions_count, - partitions: member.partitions, - } - } -} - -impl From for ffi::ConsumerGroup { - fn from(group: RustConsumerGroup) -> Self { - ffi::ConsumerGroup { - id: group.id, - name: group.name, - partitions_count: group.partitions_count, - members_count: group.members_count, - } - } -} - -impl From for ffi::ConsumerGroupDetails { - fn from(group: RustConsumerGroupDetails) -> Self { - ffi::ConsumerGroupDetails { - id: group.id, - name: group.name, - partitions_count: group.partitions_count, - members_count: group.members_count, - members: group - .members - .into_iter() - .map(ffi::ConsumerGroupMember::from) - .collect(), - } - } -} diff --git a/foreign/cpp/src/identifier.rs b/foreign/cpp/src/identifier.rs index b6d2c60c7e..7e2dc64508 100644 --- a/foreign/cpp/src/identifier.rs +++ b/foreign/cpp/src/identifier.rs @@ -16,51 +16,7 @@ // under the License. use crate::ffi; -use iggy::prelude::{IdKind, Identifier as RustIdentifier, Validatable}; - -impl From for ffi::Identifier { - fn from(identifier: RustIdentifier) -> Self { - let kind = match identifier.kind { - IdKind::Numeric => "numeric".to_string(), - IdKind::String => "string".to_string(), - }; - - ffi::Identifier { - kind, - length: identifier.length, - value: identifier.value, - } - } -} - -impl TryFrom for RustIdentifier { - type Error = String; - - fn try_from(identifier: ffi::Identifier) -> Result { - let kind = match identifier.kind.as_str() { - "numeric" => IdKind::Numeric, - "string" => IdKind::String, - _ => { - return Err(format!( - "unsupported identifier kind '{}'. Expected 'numeric' or 'string'.", - identifier.kind - )); - } - }; - - let rust_identifier = RustIdentifier { - kind, - length: identifier.length, - value: identifier.value, - }; - - rust_identifier - .validate() - .map_err(|error| format!("invalid identifier: {error}"))?; - - Ok(rust_identifier) - } -} +use iggy::prelude::Identifier as RustIdentifier; impl ffi::Identifier { pub fn set_string(&mut self, id: String) -> Result<(), String> { diff --git a/foreign/cpp/src/lib.rs b/foreign/cpp/src/lib.rs index 7d4bfb9c5a..febcd9db53 100644 --- a/foreign/cpp/src/lib.rs +++ b/foreign/cpp/src/lib.rs @@ -16,14 +16,16 @@ // under the License. mod client; -mod consumer_group; +mod consumer; mod identifier; mod messages; -mod stream; -mod topic; +mod producer; +mod type_conversion; -use client::{Client, delete_connection, new_connection}; +use client::{Client, delete_connection as delete_client, new_connection}; +use consumer::Consumer; use messages::make_message; +use producer::Producer; use std::sync::LazyLock; static RUNTIME: LazyLock = LazyLock::new(|| { @@ -151,6 +153,12 @@ mod ffi { group_id: u32, } + struct ConsumerOffsetInfo { + partition_id: u32, + current_offset: u64, + stored_offset: u64, + } + struct ClientInfo { client_id: u32, has_user_id: bool, @@ -216,12 +224,16 @@ mod ffi { extern "Rust" { type Client; + type Consumer; + type Producer; // Client functions fn new_connection(connection_string: String) -> Result<*mut Client>; fn login_user(self: &Client, username: String, password: String) -> Result<()>; + fn logout_user(self: &Client) -> Result<()>; fn connect(self: &Client) -> Result<()>; fn create_stream(self: &Client, stream_name: String) -> Result; + fn update_stream(self: &Client, stream_id: Identifier, stream_name: String) -> Result<()>; fn get_streams(self: &Client) -> Result>; fn get_stream(self: &Client, stream_id: Identifier) -> Result; fn delete_stream(self: &Client, stream_id: Identifier) -> Result<()>; @@ -305,6 +317,31 @@ mod ffi { topic_id: Identifier, group_id: Identifier, ) -> Result<()>; + fn store_consumer_offset( + self: &Client, + stream_id: Identifier, + topic_id: Identifier, + partition_id: u32, + consumer_kind: String, + consumer_id: Identifier, + offset: u64, + ) -> Result<()>; + fn get_consumer_offset( + self: &Client, + stream_id: Identifier, + topic_id: Identifier, + partition_id: u32, + consumer_kind: String, + consumer_id: Identifier, + ) -> Result; + fn delete_consumer_offset( + self: &Client, + stream_id: Identifier, + topic_id: Identifier, + partition_id: u32, + consumer_kind: String, + consumer_id: Identifier, + ) -> Result<()>; #[allow(clippy::too_many_arguments)] fn poll_messages( @@ -331,6 +368,13 @@ mod ffi { partitioning_value: Vec, messages: Vec, ) -> Result<()>; + fn flush_unsaved_buffer( + self: &Client, + stream_id: Identifier, + topic_id: Identifier, + partition_id: u32, + fsync: bool, + ) -> Result<()>; fn get_stats(self: &Client) -> Result; fn get_me(self: &Client) -> Result; fn get_client(self: &Client, client_id: u32) -> Result; @@ -343,10 +387,52 @@ mod ffi { snapshot_types: Vec, ) -> Result>; - unsafe fn delete_connection(client: *mut Client) -> Result<()>; + // Future functions + // fn disconnect(self: &Client) -> Result<()>; + // fn shutdown(self: &Client) -> Result<()>; + // fn subscribe_events(self: &Client) -> Result<()>; + // fn delete_segments(self: &Client, stream_id: Identifier, topic_id: Identifier, partition_id: u32, segments_count: u32) -> Result<()>; + // fn get_user(self: &Client, user_id: Identifier) -> Result<()>; + // fn get_users(self: &Client) -> Result<()>; + // fn create_user(self: &Client, username: String, password: String, status: u8) -> Result<()>; + // fn delete_user(self: &Client, user_id: Identifier) -> Result<()>; + // fn update_user(self: &Client, user_id: Identifier, username: String, status: u8) -> Result<()>; + // fn update_permissions(self: &Client, user_id: Identifier, permissions: Vec) -> Result<()>; + // fn change_password(self: &Client, user_id: Identifier, current_password: String, new_password: String) -> Result<()>; + // fn get_cluster_metadata(self: &Client) -> Result<()>; + // fn get_personal_access_tokens(self: &Client) -> Result<()>; + // fn create_personal_access_token(self: &Client, name: String, expiry: u64) -> Result<()>; + // fn delete_personal_access_token(self: &Client, name: String) -> Result<()>; + // fn login_with_personal_access_token(self: &Client, token: String) -> Result<()>; + + unsafe fn delete_client(client: *mut Client) -> Result<()>; // Identifier functions fn set_string(self: &mut Identifier, id: String) -> Result<()>; fn set_numeric(self: &mut Identifier, id: u32) -> Result<()>; + + // Consumer methods + // fn name(self: &Consumer) -> Result; + // fn topic(self: &Consumer) -> Result; + // fn stream(self: &Consumer) -> Result; + // fn partition_id(self: &Consumer) -> u32; + // fn store_offset(self: &Consumer, offset: u64, partition_id: u32) -> Result<()>; + // fn delete_offset(self: &Consumer, partition_id: u32) -> Result<()>; + // fn get_last_consumed_offset(self: &Consumer, partition_id: u32) -> Result; + // fn get_last_stored_offset(self: &Consumer, partition_id: u32) -> Result; + // fn init(self: &mut Consumer) -> Result<()>; + // fn shutdown(self: &mut Consumer) -> Result<()>; + // unsafe fn delete_consumer(consumer: *mut Consumer) -> Result<()>; + + // Producer methods + // fn stream(self: &Producer) -> Result; + // fn topic(self: &Producer) -> Result; + // fn init(self: &Producer) -> Result<()>; + // fn send(self: &Producer, messages: Vec) -> Result<()>; + // fn send_one(self: &Producer, message: IggyMessageToSend) -> Result<()>; + // fn send_with_partitioning(self: &Producer, partitioning_kind: String, partitioning_value: Vec, messages: Vec) -> Result<()>; + // fn send_to(self: &Producer, stream_id: Identifier, topic_id: Identifier, partitioning_kind: String, partitioning_value: Vec, messages: Vec) -> Result<()>; + // fn shutdown(self: &mut Producer) -> Result<()>; + // unsafe fn delete_producer(producer: *mut Producer) -> Result<()>; } } diff --git a/foreign/cpp/src/messages.rs b/foreign/cpp/src/messages.rs index 2a36c62f37..f39cdc1d49 100644 --- a/foreign/cpp/src/messages.rs +++ b/foreign/cpp/src/messages.rs @@ -16,8 +16,6 @@ // under the License. use crate::ffi; -use bytes::Bytes; -use iggy::prelude::{IggyMessage as RustIggyMessage, PolledMessages as RustPolledMessages}; pub fn make_message(payload: Vec) -> ffi::IggyMessageToSend { ffi::IggyMessageToSend { @@ -27,59 +25,3 @@ pub fn make_message(payload: Vec) -> ffi::IggyMessageToSend { user_headers: Vec::new(), } } - -impl From for ffi::IggyMessagePolled { - fn from(m: RustIggyMessage) -> Self { - let id_bytes = m.header.id.to_le_bytes(); - let id_lo = u64::from_le_bytes(id_bytes[0..8].try_into().unwrap()); - let id_hi = u64::from_le_bytes(id_bytes[8..16].try_into().unwrap()); - ffi::IggyMessagePolled { - checksum: m.header.checksum, - id_lo, - id_hi, - offset: m.header.offset, - timestamp: m.header.timestamp, - origin_timestamp: m.header.origin_timestamp, - user_headers_length: m.header.user_headers_length, - payload_length: m.header.payload_length, - reserved: m.header.reserved, - payload: m.payload.to_vec(), - user_headers: m.user_headers.map(|h| h.to_vec()).unwrap_or_default(), - } - } -} - -impl TryFrom for RustIggyMessage { - type Error = String; - - fn try_from(m: ffi::IggyMessageToSend) -> Result { - if !m.user_headers.is_empty() { - return Err( - "Could not convert message: user_headers are not yet supported in the C++ SDK" - .to_string(), - ); - } - let id = ((m.id_hi as u128) << 64) | (m.id_lo as u128); - let payload = Bytes::from(m.payload); - RustIggyMessage::builder() - .id(id) - .payload(payload) - .build() - .map_err(|error| format!("Could not convert message: {error}")) - } -} - -impl From for ffi::PolledMessages { - fn from(p: RustPolledMessages) -> Self { - ffi::PolledMessages { - partition_id: p.partition_id, - current_offset: p.current_offset, - count: p.count, - messages: p - .messages - .into_iter() - .map(ffi::IggyMessagePolled::from) - .collect(), - } - } -} diff --git a/foreign/cpp/src/producer.rs b/foreign/cpp/src/producer.rs new file mode 100644 index 0000000000..85b87db1cd --- /dev/null +++ b/foreign/cpp/src/producer.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use iggy::prelude::IggyProducer as RustIggyProducer; + +#[allow(dead_code)] +pub struct Producer { + pub inner: RustIggyProducer, +} diff --git a/foreign/cpp/src/stream.rs b/foreign/cpp/src/stream.rs deleted file mode 100644 index b6e0ae22bf..0000000000 --- a/foreign/cpp/src/stream.rs +++ /dev/null @@ -1,47 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::ffi; -use iggy::prelude::Stream as RustStream; -use iggy::prelude::StreamDetails as RustStreamDetails; - -impl From for ffi::Stream { - fn from(s: RustStream) -> Self { - ffi::Stream { - id: s.id, - created_at: s.created_at.as_micros(), - name: s.name, - size_bytes: s.size.as_bytes_u64(), - messages_count: s.messages_count, - topics_count: s.topics_count, - } - } -} - -impl From for ffi::StreamDetails { - fn from(stream: RustStreamDetails) -> Self { - ffi::StreamDetails { - id: stream.id, - created_at: stream.created_at.as_micros(), - name: stream.name, - size_bytes: stream.size.as_bytes_u64(), - messages_count: stream.messages_count, - topics_count: stream.topics_count, - topics: stream.topics.into_iter().map(ffi::Topic::from).collect(), - } - } -} diff --git a/foreign/cpp/src/topic.rs b/foreign/cpp/src/topic.rs deleted file mode 100644 index 58591f449c..0000000000 --- a/foreign/cpp/src/topic.rs +++ /dev/null @@ -1,73 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::ffi; -use iggy::prelude::{ - Partition as RustPartition, Topic as RustTopic, TopicDetails as RustTopicDetails, -}; - -impl From for ffi::Partition { - fn from(partition: RustPartition) -> Self { - ffi::Partition { - id: partition.id, - created_at: partition.created_at.as_micros(), - segments_count: partition.segments_count, - current_offset: partition.current_offset, - size_bytes: partition.size.as_bytes_u64(), - messages_count: partition.messages_count, - } - } -} - -impl From for ffi::Topic { - fn from(topic: RustTopic) -> Self { - ffi::Topic { - id: topic.id, - created_at: topic.created_at.as_micros(), - name: topic.name, - size_bytes: topic.size.as_bytes_u64(), - message_expiry: u64::from(topic.message_expiry), - compression_algorithm: topic.compression_algorithm.to_string(), - max_topic_size: u64::from(topic.max_topic_size), - replication_factor: topic.replication_factor, - messages_count: topic.messages_count, - partitions_count: topic.partitions_count, - } - } -} - -impl From for ffi::TopicDetails { - fn from(topic: RustTopicDetails) -> Self { - ffi::TopicDetails { - id: topic.id, - created_at: topic.created_at.as_micros(), - name: topic.name, - size_bytes: topic.size.as_bytes_u64(), - message_expiry: u64::from(topic.message_expiry), - compression_algorithm: topic.compression_algorithm.to_string(), - max_topic_size: u64::from(topic.max_topic_size), - replication_factor: topic.replication_factor, - messages_count: topic.messages_count, - partitions_count: topic.partitions_count, - partitions: topic - .partitions - .into_iter() - .map(ffi::Partition::from) - .collect(), - } - } -} diff --git a/foreign/cpp/src/type_conversion.rs b/foreign/cpp/src/type_conversion.rs new file mode 100644 index 0000000000..dd38c15478 --- /dev/null +++ b/foreign/cpp/src/type_conversion.rs @@ -0,0 +1,374 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::ffi; +use bytes::Bytes; +use iggy::prelude::{ + ConsumerGroupDetails as RustConsumerGroupDetails, IdKind, Identifier as RustIdentifier, + IggyMessage as RustIggyMessage, Partition as RustPartition, + PolledMessages as RustPolledMessages, Stream as RustStream, StreamDetails as RustStreamDetails, + Topic as RustTopic, TopicDetails as RustTopicDetails, Validatable, +}; +use iggy_common::{ + CacheMetrics as RustCacheMetrics, CacheMetricsKey as RustCacheMetricsKey, + ClientInfo as RustClientInfo, ClientInfoDetails as RustClientInfoDetails, + ConsumerGroup as RustConsumerGroup, ConsumerGroupInfo as RustConsumerGroupInfo, + ConsumerGroupMember as RustConsumerGroupMember, ConsumerOffsetInfo as RustConsumerOffsetInfo, + Stats as RustStats, +}; + +impl From for ffi::Identifier { + fn from(identifier: RustIdentifier) -> Self { + let kind = match identifier.kind { + IdKind::Numeric => "numeric".to_string(), + IdKind::String => "string".to_string(), + }; + + ffi::Identifier { + kind, + length: identifier.length, + value: identifier.value, + } + } +} + +impl TryFrom for RustIdentifier { + type Error = String; + + fn try_from(identifier: ffi::Identifier) -> Result { + let kind = match identifier.kind.as_str() { + "numeric" => IdKind::Numeric, + "string" => IdKind::String, + _ => { + return Err(format!( + "unsupported identifier kind '{}'. Expected 'numeric' or 'string'.", + identifier.kind + )); + } + }; + + let rust_identifier = RustIdentifier { + kind, + length: identifier.length, + value: identifier.value, + }; + + rust_identifier + .validate() + .map_err(|error| format!("invalid identifier: {error}"))?; + + Ok(rust_identifier) + } +} + +impl From for ffi::ClientInfo { + fn from(client: RustClientInfo) -> Self { + let has_user_id = client.user_id.is_some(); + ffi::ClientInfo { + client_id: client.client_id, + has_user_id, + user_id: client.user_id.unwrap_or(u32::MAX), + address: client.address, + transport: client.transport, + consumer_groups_count: client.consumer_groups_count, + } + } +} + +impl From for ffi::ClientInfoDetails { + fn from(client: RustClientInfoDetails) -> Self { + let has_user_id = client.user_id.is_some(); + ffi::ClientInfoDetails { + client_id: client.client_id, + has_user_id, + user_id: client.user_id.unwrap_or(u32::MAX), + address: client.address, + transport: client.transport, + consumer_groups_count: client.consumer_groups_count, + consumer_groups: client + .consumer_groups + .into_iter() + .map(ffi::ConsumerGroupInfo::from) + .collect(), + } + } +} + +impl TryFrom> for ffi::ClientInfoDetails { + type Error = String; + + fn try_from(client: Option) -> Result { + match client { + Some(client) => Ok(ffi::ClientInfoDetails::from(client)), + None => Err("client not found".to_string()), + } + } +} + +impl TryFrom> for ffi::ConsumerOffsetInfo { + type Error = String; + + fn try_from(offset: Option) -> Result { + match offset { + Some(offset) => Ok(ffi::ConsumerOffsetInfo { + partition_id: offset.partition_id, + current_offset: offset.current_offset, + stored_offset: offset.stored_offset, + }), + None => Err("consumer offset not found".to_string()), + } + } +} + +impl From<(RustCacheMetricsKey, RustCacheMetrics)> for ffi::CacheMetricEntry { + fn from((key, metrics): (RustCacheMetricsKey, RustCacheMetrics)) -> Self { + ffi::CacheMetricEntry { + stream_id: key.stream_id, + topic_id: key.topic_id, + partition_id: key.partition_id, + hits: metrics.hits, + misses: metrics.misses, + hit_ratio: metrics.hit_ratio, + } + } +} + +impl From for ffi::Stats { + fn from(stats: RustStats) -> Self { + let has_server_semver = stats.iggy_server_semver.is_some(); + ffi::Stats { + process_id: stats.process_id, + cpu_usage: stats.cpu_usage, + total_cpu_usage: stats.total_cpu_usage, + memory_usage: stats.memory_usage.as_bytes_u64(), + total_memory: stats.total_memory.as_bytes_u64(), + available_memory: stats.available_memory.as_bytes_u64(), + run_time_micros: stats.run_time.as_micros(), + start_time_epoch_micros: stats.start_time.as_micros(), + read_bytes: stats.read_bytes.as_bytes_u64(), + written_bytes: stats.written_bytes.as_bytes_u64(), + messages_size_bytes: stats.messages_size_bytes.as_bytes_u64(), + streams_count: stats.streams_count, + topics_count: stats.topics_count, + partitions_count: stats.partitions_count, + segments_count: stats.segments_count, + messages_count: stats.messages_count, + clients_count: stats.clients_count, + consumer_groups_count: stats.consumer_groups_count, + hostname: stats.hostname, + os_name: stats.os_name, + os_version: stats.os_version, + kernel_version: stats.kernel_version, + iggy_server_version: stats.iggy_server_version, + has_server_semver, + iggy_server_semver: stats.iggy_server_semver.unwrap_or(0), + cache_metrics: stats + .cache_metrics + .into_iter() + .map(ffi::CacheMetricEntry::from) + .collect(), + threads_count: stats.threads_count, + free_disk_space: stats.free_disk_space.as_bytes_u64(), + total_disk_space: stats.total_disk_space.as_bytes_u64(), + } + } +} + +impl From for ffi::Partition { + fn from(partition: RustPartition) -> Self { + ffi::Partition { + id: partition.id, + created_at: partition.created_at.as_micros(), + segments_count: partition.segments_count, + current_offset: partition.current_offset, + size_bytes: partition.size.as_bytes_u64(), + messages_count: partition.messages_count, + } + } +} + +impl From for ffi::Topic { + fn from(topic: RustTopic) -> Self { + ffi::Topic { + id: topic.id, + created_at: topic.created_at.as_micros(), + name: topic.name, + size_bytes: topic.size.as_bytes_u64(), + message_expiry: u64::from(topic.message_expiry), + compression_algorithm: topic.compression_algorithm.to_string(), + max_topic_size: u64::from(topic.max_topic_size), + replication_factor: topic.replication_factor, + messages_count: topic.messages_count, + partitions_count: topic.partitions_count, + } + } +} + +impl From for ffi::TopicDetails { + fn from(topic: RustTopicDetails) -> Self { + ffi::TopicDetails { + id: topic.id, + created_at: topic.created_at.as_micros(), + name: topic.name, + size_bytes: topic.size.as_bytes_u64(), + message_expiry: u64::from(topic.message_expiry), + compression_algorithm: topic.compression_algorithm.to_string(), + max_topic_size: u64::from(topic.max_topic_size), + replication_factor: topic.replication_factor, + messages_count: topic.messages_count, + partitions_count: topic.partitions_count, + partitions: topic + .partitions + .into_iter() + .map(ffi::Partition::from) + .collect(), + } + } +} + +impl From for ffi::Stream { + fn from(stream: RustStream) -> Self { + ffi::Stream { + id: stream.id, + created_at: stream.created_at.as_micros(), + name: stream.name, + size_bytes: stream.size.as_bytes_u64(), + messages_count: stream.messages_count, + topics_count: stream.topics_count, + } + } +} + +impl From for ffi::StreamDetails { + fn from(stream: RustStreamDetails) -> Self { + ffi::StreamDetails { + id: stream.id, + created_at: stream.created_at.as_micros(), + name: stream.name, + size_bytes: stream.size.as_bytes_u64(), + messages_count: stream.messages_count, + topics_count: stream.topics_count, + topics: stream.topics.into_iter().map(ffi::Topic::from).collect(), + } + } +} + +impl From for ffi::ConsumerGroupInfo { + fn from(group: RustConsumerGroupInfo) -> Self { + ffi::ConsumerGroupInfo { + stream_id: group.stream_id, + topic_id: group.topic_id, + group_id: group.group_id, + } + } +} + +impl From for ffi::ConsumerGroupMember { + fn from(member: RustConsumerGroupMember) -> Self { + ffi::ConsumerGroupMember { + id: member.id, + partitions_count: member.partitions_count, + partitions: member.partitions, + } + } +} + +impl From for ffi::ConsumerGroup { + fn from(group: RustConsumerGroup) -> Self { + ffi::ConsumerGroup { + id: group.id, + name: group.name, + partitions_count: group.partitions_count, + members_count: group.members_count, + } + } +} + +impl From for ffi::ConsumerGroupDetails { + fn from(group: RustConsumerGroupDetails) -> Self { + ffi::ConsumerGroupDetails { + id: group.id, + name: group.name, + partitions_count: group.partitions_count, + members_count: group.members_count, + members: group + .members + .into_iter() + .map(ffi::ConsumerGroupMember::from) + .collect(), + } + } +} + +impl From for ffi::IggyMessagePolled { + fn from(message: RustIggyMessage) -> Self { + let id_bytes = message.header.id.to_le_bytes(); + let id_lo = u64::from_le_bytes(id_bytes[0..8].try_into().unwrap()); + let id_hi = u64::from_le_bytes(id_bytes[8..16].try_into().unwrap()); + ffi::IggyMessagePolled { + checksum: message.header.checksum, + id_lo, + id_hi, + offset: message.header.offset, + timestamp: message.header.timestamp, + origin_timestamp: message.header.origin_timestamp, + user_headers_length: message.header.user_headers_length, + payload_length: message.header.payload_length, + reserved: message.header.reserved, + payload: message.payload.to_vec(), + user_headers: message + .user_headers + .map(|headers| headers.to_vec()) + .unwrap_or_default(), + } + } +} + +impl TryFrom for RustIggyMessage { + type Error = String; + + fn try_from(message: ffi::IggyMessageToSend) -> Result { + if !message.user_headers.is_empty() { + return Err( + "Could not convert message: user_headers are not yet supported in the C++ SDK" + .to_string(), + ); + } + let id = ((message.id_hi as u128) << 64) | (message.id_lo as u128); + let payload = Bytes::from(message.payload); + RustIggyMessage::builder() + .id(id) + .payload(payload) + .build() + .map_err(|error| format!("Could not convert message: {error}")) + } +} + +impl From for ffi::PolledMessages { + fn from(messages: RustPolledMessages) -> Self { + ffi::PolledMessages { + partition_id: messages.partition_id, + current_offset: messages.current_offset, + count: messages.count, + messages: messages + .messages + .into_iter() + .map(ffi::IggyMessagePolled::from) + .collect(), + } + } +} diff --git a/foreign/cpp/tests/client/low_level_e2e.cpp b/foreign/cpp/tests/client/low_level_e2e.cpp index f252a6b45e..177fdc8553 100644 --- a/foreign/cpp/tests/client/low_level_e2e.cpp +++ b/foreign/cpp/tests/client/low_level_e2e.cpp @@ -17,6 +17,8 @@ * under the License. */ +// TODO(slbotbm): Add tests for store_consumer_offset, get_consumer_offset, and delete_consumer_offset functions +// attached to client after implementing consumer group functions #include #include #include @@ -89,13 +91,54 @@ TEST_F(LowLevelE2E_Client, LoginTwiceWithDifferentCredentials) { } TEST_F(LowLevelE2E_Client, LogoutWithoutLogin) { - RecordProperty("description", "Allows deleting a new unauthenticated client without logging in."); + RecordProperty("description", + "Rejects logout before authentication, both before and after connect, then succeeds after login."); iggy::ffi::Client *client = nullptr; ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); }); ASSERT_NE(client, nullptr); + TrackClient(client); - ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); - client = nullptr; + ASSERT_THROW(client->logout_user(), std::exception); + + ASSERT_NO_THROW(client->connect()); + ASSERT_THROW(client->logout_user(), std::exception); + + ASSERT_NO_THROW(client->login_user("iggy", "iggy")); + ASSERT_NO_THROW(client->logout_user()); +} + +TEST_F(LowLevelE2E_Client, ReloginOnSameClientAfterLogout) { + RecordProperty("description", "Allows reauthenticating on the same connected client after a successful logout."); + iggy::ffi::Client *client = nullptr; + ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); }); + ASSERT_NE(client, nullptr); + TrackClient(client); + + iggy::ffi::ClientInfoDetails first_me{}; + iggy::ffi::ClientInfoDetails second_me{}; + ASSERT_NO_THROW(client->connect()); + ASSERT_NO_THROW(client->login_user("iggy", "iggy")); + ASSERT_NO_THROW({ first_me = client->get_me(); }); + ASSERT_NO_THROW(client->logout_user()); + ASSERT_NO_THROW(client->login_user("iggy", "iggy")); + ASSERT_NO_THROW({ second_me = client->get_me(); }); + + EXPECT_EQ(second_me.client_id, first_me.client_id); + EXPECT_EQ(second_me.user_id, first_me.user_id); +} + +TEST_F(LowLevelE2E_Client, LogoutErrorsWhenCalledMoreThanOnce) { + RecordProperty("description", + "Rejects repeated logout calls once the authenticated session has already logged out."); + iggy::ffi::Client *client = nullptr; + ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); }); + ASSERT_NE(client, nullptr); + TrackClient(client); + + ASSERT_NO_THROW(client->connect()); + ASSERT_NO_THROW(client->login_user("iggy", "iggy")); + ASSERT_NO_THROW(client->logout_user()); + ASSERT_THROW(client->logout_user(), std::exception); } TEST_F(LowLevelE2E_Client, DeleteWhileUnauthenticatedAfterFailedLogin) { @@ -106,7 +149,7 @@ TEST_F(LowLevelE2E_Client, DeleteWhileUnauthenticatedAfterFailedLogin) { ASSERT_NO_THROW(client->connect()); ASSERT_THROW(client->login_user("biggy", "biggy"), std::exception); - ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + ASSERT_NO_THROW(iggy::ffi::delete_client(client)); client = nullptr; } @@ -127,7 +170,7 @@ TEST_F(LowLevelE2E_Client, ConnectWithoutLoginThenDelete) { ASSERT_NE(client, nullptr); ASSERT_NO_THROW(client->connect()); - ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + ASSERT_NO_THROW(iggy::ffi::delete_client(client)); client = nullptr; } @@ -142,16 +185,16 @@ TEST_F(LowLevelE2E_Client, RepeatedClientMethodCallsHaveStableBehavior) { ASSERT_NO_THROW(client->connect()); ASSERT_NO_THROW(client->login_user("iggy", "iggy")); ASSERT_NO_THROW(client->login_user("iggy", "iggy")); - ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + ASSERT_NO_THROW(iggy::ffi::delete_client(client)); client = nullptr; - ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + ASSERT_NO_THROW(iggy::ffi::delete_client(client)); } TEST_F(LowLevelE2E_Client, DeleteNullConnectionIsNoop) { RecordProperty("description", "Treats deleting a null client pointer as a no-op."); iggy::ffi::Client *client = nullptr; - ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + ASSERT_NO_THROW(iggy::ffi::delete_client(client)); } TEST_F(LowLevelE2E_Client, GetStatsBeforeLoginThrows) { @@ -163,6 +206,172 @@ TEST_F(LowLevelE2E_Client, GetStatsBeforeLoginThrows) { ASSERT_THROW(client->get_stats(), std::exception); } +TEST_F(LowLevelE2E_Client, FlushUnsavedBufferSucceedsForExistingPartition) { + RecordProperty("description", + "Creates a stream and topic, sends one message, and flushes the partition buffer successfully."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + auto stream = client->get_stream(make_string_identifier(stream_name)); + TrackStream(stream.id); + ASSERT_NO_THROW(client->create_topic(make_numeric_identifier(stream.id), topic_name, 1, "none", 0, "never_expire", + 0, "server_default")); + + rust::Vec messages; + messages.push_back(iggy::ffi::make_message(to_payload("flush-me"))); + + ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), + "partition_id", partition_id_bytes(0), std::move(messages))); + ASSERT_NO_THROW( + client->flush_unsaved_buffer(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, true)); +} + +TEST_F(LowLevelE2E_Client, FlushUnsavedBufferSucceedsForExistingEmptyPartition) { + RecordProperty("description", + "Succeeds when flush_unsaved_buffer is called for an existing partition with no unsaved messages."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + auto stream = client->get_stream(make_string_identifier(stream_name)); + TrackStream(stream.id); + ASSERT_NO_THROW(client->create_topic(make_numeric_identifier(stream.id), topic_name, 1, "none", 0, "never_expire", + 0, "server_default")); + + ASSERT_NO_THROW( + client->flush_unsaved_buffer(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, true)); +} + +TEST_F(LowLevelE2E_Client, FlushUnsavedBufferBeforeLoginThrows) { + RecordProperty("description", + "Throws when flush_unsaved_buffer is called before connect, and after connect but before login."); + iggy::ffi::Client *client = GetLoggedOutClient(); + + ASSERT_THROW(client->flush_unsaved_buffer(make_numeric_identifier(1), make_numeric_identifier(1), 0, true), + std::exception); + ASSERT_NO_THROW(client->connect()); + ASSERT_THROW(client->flush_unsaved_buffer(make_numeric_identifier(1), make_numeric_identifier(1), 0, true), + std::exception); +} + +TEST_F(LowLevelE2E_Client, FlushUnsavedBufferOnNonExistentStreamThrows) { + RecordProperty("description", "Throws when flush_unsaved_buffer is called for a stream that does not exist."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, "never_expire", + 0, "server_default")); + + ASSERT_THROW( + client->flush_unsaved_buffer(make_string_identifier(GetRandomName()), make_numeric_identifier(0), 0, true), + std::exception); +} + +TEST_F(LowLevelE2E_Client, FlushUnsavedBufferOnNonExistentTopicThrows) { + RecordProperty("description", "Throws when flush_unsaved_buffer is called for a topic that does not exist."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, "never_expire", + 0, "server_default")); + + ASSERT_THROW(client->flush_unsaved_buffer(make_string_identifier(stream_name), + make_string_identifier(GetRandomName()), 0, true), + std::exception); +} + +TEST_F(LowLevelE2E_Client, FlushUnsavedBufferAfterStreamDeletedThrows) { + RecordProperty("description", "Throws when flush_unsaved_buffer is called after the stream has been deleted."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + auto stream = client->get_stream(make_string_identifier(stream_name)); + TrackStream(stream.id); + ASSERT_NO_THROW(client->create_topic(make_numeric_identifier(stream.id), topic_name, 1, "none", 0, "never_expire", + 0, "server_default")); + + const std::uint32_t saved_stream_id = stream.id; + ASSERT_NO_THROW(client->delete_stream(make_numeric_identifier(saved_stream_id))); + ForgetTrackedStream(saved_stream_id); + + ASSERT_THROW( + client->flush_unsaved_buffer(make_numeric_identifier(saved_stream_id), make_numeric_identifier(0), 0, true), + std::exception); +} + +TEST_F(LowLevelE2E_Client, FlushUnsavedBufferAfterTopicDeletedThrows) { + RecordProperty("description", "Throws when flush_unsaved_buffer is called after the topic has been deleted."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + auto stream = client->get_stream(make_string_identifier(stream_name)); + TrackStream(stream.id); + ASSERT_NO_THROW(client->create_topic(make_numeric_identifier(stream.id), topic_name, 1, "none", 0, "never_expire", + 0, "server_default")); + ASSERT_NO_THROW(client->delete_topic(make_numeric_identifier(stream.id), make_string_identifier(topic_name))); + + ASSERT_THROW( + client->flush_unsaved_buffer(make_numeric_identifier(stream.id), make_string_identifier(topic_name), 0, true), + std::exception); +} + +TEST_F(LowLevelE2E_Client, FlushUnsavedBufferTwiceSucceeds) { + RecordProperty("description", "Allows flush_unsaved_buffer to be called twice in a row for the same partition."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + auto stream = client->get_stream(make_string_identifier(stream_name)); + TrackStream(stream.id); + ASSERT_NO_THROW(client->create_topic(make_numeric_identifier(stream.id), topic_name, 1, "none", 0, "never_expire", + 0, "server_default")); + + rust::Vec messages; + messages.push_back(iggy::ffi::make_message(to_payload("flush-twice"))); + + ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), + "partition_id", partition_id_bytes(0), std::move(messages))); + ASSERT_NO_THROW( + client->flush_unsaved_buffer(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, true)); + ASSERT_NO_THROW( + client->flush_unsaved_buffer(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, true)); +} + +TEST_F(LowLevelE2E_Client, FlushUnsavedBufferWithInvalidPartitionIdsThrows) { + RecordProperty("description", "Throws when flush_unsaved_buffer is called for non-existent partition ids."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + auto stream = client->get_stream(make_string_identifier(stream_name)); + TrackStream(stream.id); + ASSERT_NO_THROW(client->create_topic(make_numeric_identifier(stream.id), topic_name, 1, "none", 0, "never_expire", + 0, "server_default")); + + const std::uint32_t invalid_partition_ids[] = {1u, 9999u, static_cast(-1)}; + for (const std::uint32_t invalid_partition_id : invalid_partition_ids) { + SCOPED_TRACE(invalid_partition_id); + ASSERT_THROW(client->flush_unsaved_buffer(make_numeric_identifier(stream.id), make_numeric_identifier(0), + invalid_partition_id, true), + std::exception); + } +} + // TODO(slbotbm): add a test to create some streams, topics, partitions, and segments, send messages, and create // consumer groups and verify it. TEST_F(LowLevelE2E_Client, GetStatsReturnsServerStats) { diff --git a/foreign/cpp/tests/common/test_helpers.hpp b/foreign/cpp/tests/common/test_helpers.hpp index ca73454383..a6f240998f 100644 --- a/foreign/cpp/tests/common/test_helpers.hpp +++ b/foreign/cpp/tests/common/test_helpers.hpp @@ -24,7 +24,6 @@ #include #include #include -#include #include @@ -66,6 +65,13 @@ inline rust::Vec make_snapshot_types(std::initializer_listdelete_stream(make_numeric_identifier(stream_id))); } - EXPECT_NO_THROW(iggy::ffi::delete_connection(cleanup_client)); + EXPECT_NO_THROW(iggy::ffi::delete_client(cleanup_client)); } tracked_stream_names_.clear(); tracked_stream_ids_.clear(); } + void CleanupConsumerGroups() { + if (tracked_consumer_groups_.empty()) { + return; + } + + iggy::ffi::Client *cleanup_client = nullptr; + EXPECT_NO_THROW({ cleanup_client = iggy::ffi::new_connection(""); }); + EXPECT_NE(cleanup_client, nullptr); + if (cleanup_client != nullptr) { + EXPECT_NO_THROW(cleanup_client->connect()); + EXPECT_NO_THROW(cleanup_client->login_user("iggy", "iggy")); + for (const auto &group : tracked_consumer_groups_) { + EXPECT_NO_THROW(cleanup_client->delete_consumer_group(make_string_identifier(group.stream_name), + make_string_identifier(group.topic_name), + make_string_identifier(group.group_name))); + } + EXPECT_NO_THROW(iggy::ffi::delete_client(cleanup_client)); + } + + tracked_consumer_groups_.clear(); + } + void CleanupStreamsBestEffort() noexcept { if (tracked_stream_names_.empty() && tracked_stream_ids_.empty()) { return; @@ -202,7 +258,7 @@ class E2ETestFixture : public ::testing::Test { if (cleanup_client != nullptr) { try { - iggy::ffi::delete_connection(cleanup_client); + iggy::ffi::delete_client(cleanup_client); } catch (...) { } } @@ -211,11 +267,41 @@ class E2ETestFixture : public ::testing::Test { tracked_stream_ids_.clear(); } + void CleanupConsumerGroupsBestEffort() noexcept { + if (tracked_consumer_groups_.empty()) { + return; + } + + iggy::ffi::Client *cleanup_client = nullptr; + try { + cleanup_client = iggy::ffi::new_connection(""); + if (cleanup_client != nullptr) { + cleanup_client->connect(); + cleanup_client->login_user("iggy", "iggy"); + for (const auto &group : tracked_consumer_groups_) { + cleanup_client->delete_consumer_group(make_string_identifier(group.stream_name), + make_string_identifier(group.topic_name), + make_string_identifier(group.group_name)); + } + } + } catch (...) { + } + + if (cleanup_client != nullptr) { + try { + iggy::ffi::delete_client(cleanup_client); + } catch (...) { + } + } + + tracked_consumer_groups_.clear(); + } + void CleanupClients() { for (iggy::ffi::Client *&client : clients_) { iggy::ffi::Client *client_to_delete = client; client = nullptr; - EXPECT_NO_THROW(iggy::ffi::delete_connection(client_to_delete)); + EXPECT_NO_THROW(iggy::ffi::delete_client(client_to_delete)); } clients_.clear(); } @@ -225,7 +311,7 @@ class E2ETestFixture : public ::testing::Test { iggy::ffi::Client *client_to_delete = client; client = nullptr; try { - iggy::ffi::delete_connection(client_to_delete); + iggy::ffi::delete_client(client_to_delete); } catch (...) { } } @@ -235,4 +321,5 @@ class E2ETestFixture : public ::testing::Test { std::vector clients_; std::vector tracked_stream_names_; std::vector tracked_stream_ids_; + std::vector tracked_consumer_groups_; }; diff --git a/foreign/cpp/tests/consumer_group/low_level_e2e.cpp b/foreign/cpp/tests/consumer_group/low_level_e2e.cpp index a27bb1459c..f48acbd19f 100644 --- a/foreign/cpp/tests/consumer_group/low_level_e2e.cpp +++ b/foreign/cpp/tests/consumer_group/low_level_e2e.cpp @@ -18,7 +18,6 @@ */ #include -#include #include #include @@ -43,44 +42,13 @@ TEST_F(LowLevelE2E_ConsumerGroup, CreateConsumerGroupSucceeds) { ASSERT_NO_THROW({ const auto group = client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), group_name); + TrackConsumerGroup(stream_name, topic_name, group_name); ASSERT_EQ(group.name, group_name); ASSERT_EQ(group.members_count, 0u); ASSERT_TRUE(group.members.empty()); }); } -TEST_F(LowLevelE2E_ConsumerGroup, CreateConsumerGroupWithInvalidIdentifiersThrows) { - RecordProperty("description", "Rejects malformed stream and topic identifiers before creating a consumer group."); - const std::string stream_name = GetRandomName(); - const std::string topic_name = GetRandomName(); - iggy::ffi::Client *client = GetLoggedInClient(); - - ASSERT_NO_THROW(client->create_stream(stream_name)); - TrackStream(stream_name); - ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, - "server_default", 0, "server_default")); - - iggy::ffi::Identifier invalid_stream_id; - invalid_stream_id.kind = "invalid"; - invalid_stream_id.length = 4; - invalid_stream_id.value = {1, 2, 3, 4}; - - iggy::ffi::Identifier invalid_topic_id; - invalid_topic_id.kind = "numeric"; - invalid_topic_id.length = 3; - invalid_topic_id.value = {1, 2, 3}; - - const std::string invalid_stream_group_name = GetRandomName(); - const std::string invalid_topic_group_name = GetRandomName(); - - ASSERT_THROW(client->create_consumer_group(std::move(invalid_stream_id), make_string_identifier(topic_name), - invalid_stream_group_name), - std::exception); - ASSERT_THROW(client->create_consumer_group(make_string_identifier(stream_name), std::move(invalid_topic_id), - invalid_topic_group_name), - std::exception); -} - TEST_F(LowLevelE2E_ConsumerGroup, CreateConsumerGroupOnNonExistentResourcesThrows) { RecordProperty("description", "Rejects creating a consumer group on streams or topics that do not exist."); const std::string stream_name = GetRandomName(); @@ -115,38 +83,12 @@ TEST_F(LowLevelE2E_ConsumerGroup, CreateConsumerGroupTwiceOnSameInputThrows) { "server_default", 0, "server_default")); ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), group_name)); - + TrackConsumerGroup(stream_name, topic_name, group_name); ASSERT_THROW(client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), group_name), std::exception); } -TEST_F(LowLevelE2E_ConsumerGroup, CreateConsumerGroupWithNumericIdentifiersSucceeds) { - RecordProperty("description", - "Creates a consumer group successfully when stream and topic are addressed by numeric identifiers."); - const std::string stream_name = GetRandomName(); - const std::string topic_name = GetRandomName(); - const std::string group_name = GetRandomName(); - iggy::ffi::Client *client = GetLoggedInClient(); - - ASSERT_NO_THROW(client->create_stream(stream_name)); - TrackStream(stream_name); - ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, - "server_default", 0, "server_default")); - - const auto stream_details = client->get_stream(make_string_identifier(stream_name)); - ASSERT_EQ(stream_details.topics.size(), std::size_t{1}); - - ASSERT_NO_THROW({ - const auto group = - client->create_consumer_group(make_numeric_identifier(stream_details.id), - make_numeric_identifier(stream_details.topics[0].id), group_name); - ASSERT_EQ(group.name, group_name); - ASSERT_EQ(group.members_count, 0u); - ASSERT_TRUE(group.members.empty()); - }); -} - TEST_F(LowLevelE2E_ConsumerGroup, CreateConsumerGroupWithInvalidNamesThrows) { RecordProperty("description", "Rejects empty and overlong consumer group names."); const std::string stream_name = GetRandomName(); @@ -225,6 +167,8 @@ TEST_F(LowLevelE2E_ConsumerGroup, GetConsumerGroupReturnsSameInfoAsCreateConsume const auto created_group = client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), group_name); + TrackConsumerGroup(stream_name, topic_name, group_name); + const auto fetched_group = client->get_consumer_group( make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(group_name)); @@ -250,8 +194,10 @@ TEST_F(LowLevelE2E_ConsumerGroup, GetConsumerGroupsReturnsCreatedGroups) { ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), first_group_name)); + TrackConsumerGroup(stream_name, topic_name, first_group_name); ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), second_group_name)); + TrackConsumerGroup(stream_name, topic_name, second_group_name); const auto groups = client->get_consumer_groups(make_string_identifier(stream_name), make_string_identifier(topic_name)); @@ -259,17 +205,6 @@ TEST_F(LowLevelE2E_ConsumerGroup, GetConsumerGroupsReturnsCreatedGroups) { EXPECT_EQ(groups.size(), std::size_t{2}); EXPECT_EQ(groups[0].name, first_group_name); EXPECT_EQ(groups[1].name, second_group_name); - - ASSERT_NO_THROW(client->delete_consumer_group(make_string_identifier(stream_name), - make_string_identifier(topic_name), - make_string_identifier(first_group_name))); - ASSERT_NO_THROW(client->delete_consumer_group(make_string_identifier(stream_name), - make_string_identifier(topic_name), - make_string_identifier(second_group_name))); - - const auto groups_after_delete = - client->get_consumer_groups(make_string_identifier(stream_name), make_string_identifier(topic_name)); - EXPECT_TRUE(groups_after_delete.empty()); } TEST_F(LowLevelE2E_ConsumerGroup, GetConsumerGroupsBeforeLoginThrows) { @@ -294,6 +229,433 @@ TEST_F(LowLevelE2E_ConsumerGroup, GetConsumerGroupsBeforeLoginThrows) { std::exception); } +TEST_F(LowLevelE2E_ConsumerGroup, JoinConsumerGroupSucceeds) { + RecordProperty("description", "Joins an existing consumer group successfully."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + const std::string group_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, + "server_default", 0, "server_default")); + ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), + make_string_identifier(topic_name), group_name)); + TrackConsumerGroup(stream_name, topic_name, group_name); + + ASSERT_NO_THROW(client->join_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), + make_string_identifier(group_name))); +} + +TEST_F(LowLevelE2E_ConsumerGroup, JoinConsumerGroupBeforeLoginThrows) { + RecordProperty("description", "Rejects join_consumer_group before connect, and after connect but before login."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + const std::string group_name = GetRandomName(); + + iggy::ffi::Client *setup_client = GetLoggedInClient(); + ASSERT_NO_THROW(setup_client->create_stream(stream_name)); + TrackStream(stream_name); + ASSERT_NO_THROW(setup_client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, + "server_default", 0, "server_default")); + ASSERT_NO_THROW(setup_client->create_consumer_group(make_string_identifier(stream_name), + make_string_identifier(topic_name), group_name)); + TrackConsumerGroup(stream_name, topic_name, group_name); + + iggy::ffi::Client *unauthenticated_client = GetLoggedOutClient(); + + ASSERT_THROW(unauthenticated_client->join_consumer_group(make_string_identifier(stream_name), + make_string_identifier(topic_name), + make_string_identifier(group_name)), + std::exception); + ASSERT_NO_THROW(unauthenticated_client->connect()); + ASSERT_THROW(unauthenticated_client->join_consumer_group(make_string_identifier(stream_name), + make_string_identifier(topic_name), + make_string_identifier(group_name)), + std::exception); +} + +TEST_F(LowLevelE2E_ConsumerGroup, JoinConsumerGroupOnNonExistentResourcesThrows) { + RecordProperty("description", "Rejects join_consumer_group for streams, topics, or groups that do not exist."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + const std::string created_group_name = GetRandomName(); + const std::string missing_stream_name = GetRandomName(); + const std::string missing_topic_name = GetRandomName(); + const std::string missing_group_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, + "server_default", 0, "server_default")); + ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), + make_string_identifier(topic_name), created_group_name)); + TrackConsumerGroup(stream_name, topic_name, created_group_name); + + ASSERT_THROW( + client->join_consumer_group(make_string_identifier(missing_stream_name), make_string_identifier(topic_name), + make_string_identifier(created_group_name)), + std::exception); + ASSERT_THROW( + client->join_consumer_group(make_string_identifier(stream_name), make_string_identifier(missing_topic_name), + make_string_identifier(created_group_name)), + std::exception); + ASSERT_THROW(client->join_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), + make_string_identifier(missing_group_name)), + std::exception); +} + +TEST_F(LowLevelE2E_ConsumerGroup, JoinConsumerGroupAfterStreamDeletionThrows) { + RecordProperty("description", "Rejects join_consumer_group after deleting the stream that owned the group."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + const std::string group_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, + "server_default", 0, "server_default")); + ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), + make_string_identifier(topic_name), group_name)); + TrackConsumerGroup(stream_name, topic_name, group_name); + ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name))); + ForgetTrackedStream(stream_name); + + ASSERT_THROW(client->join_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), + make_string_identifier(group_name)), + std::exception); +} + +TEST_F(LowLevelE2E_ConsumerGroup, JoinConsumerGroupAfterTopicDeletionThrows) { + RecordProperty("description", "Rejects join_consumer_group after deleting the topic that owned the group."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + const std::string group_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, + "server_default", 0, "server_default")); + ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), + make_string_identifier(topic_name), group_name)); + TrackConsumerGroup(stream_name, topic_name, group_name); + ASSERT_NO_THROW(client->delete_topic(make_string_identifier(stream_name), make_string_identifier(topic_name))); + ForgetTrackedConsumerGroup(stream_name, topic_name, group_name); + ASSERT_THROW(client->join_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), + make_string_identifier(group_name)), + std::exception); +} + +TEST_F(LowLevelE2E_ConsumerGroup, JoinConsumerGroupReflectsInGetConsumerGroup) { + RecordProperty("description", "Reflects a joined consumer group in get_consumer_group member details."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + const std::string group_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, + "server_default", 0, "server_default")); + + iggy::ffi::ConsumerGroupDetails created_group; + ASSERT_NO_THROW({ + created_group = client->create_consumer_group(make_string_identifier(stream_name), + make_string_identifier(topic_name), group_name); + }); + TrackConsumerGroup(stream_name, topic_name, group_name); + ASSERT_NO_THROW(client->join_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), + make_string_identifier(group_name))); + + const auto fetched_group = client->get_consumer_group( + make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(group_name)); + + EXPECT_EQ(fetched_group.id, created_group.id); + EXPECT_EQ(fetched_group.name, created_group.name); + EXPECT_EQ(fetched_group.partitions_count, created_group.partitions_count); + EXPECT_EQ(fetched_group.members_count, 1u); + ASSERT_EQ(fetched_group.members.size(), std::size_t{1}); +} + +TEST_F(LowLevelE2E_ConsumerGroup, JoinConsumerGroupTwiceKeepsSingleMember) { + RecordProperty("description", + "Allows joining the same consumer group twice in a row without duplicating membership."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + const std::string group_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, + "server_default", 0, "server_default")); + ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), + make_string_identifier(topic_name), group_name)); + TrackConsumerGroup(stream_name, topic_name, group_name); + + ASSERT_NO_THROW(client->join_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), + make_string_identifier(group_name))); + ASSERT_NO_THROW(client->join_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), + make_string_identifier(group_name))); + + const auto fetched_group = client->get_consumer_group( + make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(group_name)); + EXPECT_EQ(fetched_group.members_count, 1u); + ASSERT_EQ(fetched_group.members.size(), std::size_t{1}); +} + +TEST_F(LowLevelE2E_ConsumerGroup, JoinConsumerGroupFromTwoClientsIncreasesMembersCount) { + RecordProperty("description", "Reflects two joined clients as two members in the same consumer group."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + const std::string group_name = GetRandomName(); + iggy::ffi::Client *first = GetLoggedInClient(); + iggy::ffi::Client *second = GetLoggedInClient(); + + ASSERT_NO_THROW(first->create_stream(stream_name)); + TrackStream(stream_name); + ASSERT_NO_THROW(first->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, "server_default", + 0, "server_default")); + ASSERT_NO_THROW(first->create_consumer_group(make_string_identifier(stream_name), + make_string_identifier(topic_name), group_name)); + TrackConsumerGroup(stream_name, topic_name, group_name); + + ASSERT_NO_THROW(first->join_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), + make_string_identifier(group_name))); + ASSERT_NO_THROW(second->join_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), + make_string_identifier(group_name))); + + const auto fetched_group = first->get_consumer_group( + make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(group_name)); + EXPECT_EQ(fetched_group.members_count, 2u); + ASSERT_EQ(fetched_group.members.size(), std::size_t{2}); +} + +TEST_F(LowLevelE2E_ConsumerGroup, JoinConsumerGroupThenLeaveRestoresMembersCount) { + RecordProperty("description", "Restores the consumer group member count after a client joins and then leaves."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + const std::string group_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, + "server_default", 0, "server_default")); + ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), + make_string_identifier(topic_name), group_name)); + TrackConsumerGroup(stream_name, topic_name, group_name); + + ASSERT_NO_THROW(client->join_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), + make_string_identifier(group_name))); + auto joined_group = client->get_consumer_group( + make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(group_name)); + EXPECT_EQ(joined_group.members_count, 1u); + ASSERT_EQ(joined_group.members.size(), std::size_t{1}); + + ASSERT_NO_THROW(client->leave_consumer_group( + make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(group_name))); + + const auto left_group = client->get_consumer_group( + make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(group_name)); + EXPECT_EQ(left_group.members_count, 0u); + EXPECT_TRUE(left_group.members.empty()); +} + +TEST_F(LowLevelE2E_ConsumerGroup, LeaveConsumerGroupReducesMembersCount) { + RecordProperty("description", "Reduces the consumer group member count after one of two joined clients leaves."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + const std::string group_name = GetRandomName(); + iggy::ffi::Client *first = GetLoggedInClient(); + iggy::ffi::Client *second = GetLoggedInClient(); + + ASSERT_NO_THROW(first->create_stream(stream_name)); + TrackStream(stream_name); + ASSERT_NO_THROW(first->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, "server_default", + 0, "server_default")); + ASSERT_NO_THROW(first->create_consumer_group(make_string_identifier(stream_name), + make_string_identifier(topic_name), group_name)); + TrackConsumerGroup(stream_name, topic_name, group_name); + + ASSERT_NO_THROW(first->join_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), + make_string_identifier(group_name))); + ASSERT_NO_THROW(second->join_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), + make_string_identifier(group_name))); + + const auto joined_group = first->get_consumer_group( + make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(group_name)); + EXPECT_EQ(joined_group.members_count, 2u); + ASSERT_EQ(joined_group.members.size(), std::size_t{2}); + + ASSERT_NO_THROW(second->leave_consumer_group( + make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(group_name))); + + const auto left_group = first->get_consumer_group( + make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(group_name)); + EXPECT_EQ(left_group.members_count, 1u); + ASSERT_EQ(left_group.members.size(), std::size_t{1}); +} + +TEST_F(LowLevelE2E_ConsumerGroup, LeaveConsumerGroupBeforeLoginThrows) { + RecordProperty("description", "Rejects leave_consumer_group before connect, and after connect but before login."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + const std::string group_name = GetRandomName(); + + iggy::ffi::Client *setup_client = GetLoggedInClient(); + ASSERT_NO_THROW(setup_client->create_stream(stream_name)); + TrackStream(stream_name); + ASSERT_NO_THROW(setup_client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, + "server_default", 0, "server_default")); + ASSERT_NO_THROW(setup_client->create_consumer_group(make_string_identifier(stream_name), + make_string_identifier(topic_name), group_name)); + TrackConsumerGroup(stream_name, topic_name, group_name); + ASSERT_NO_THROW(setup_client->join_consumer_group( + make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(group_name))); + + iggy::ffi::Client *unauthenticated_client = GetLoggedOutClient(); + + ASSERT_THROW(unauthenticated_client->leave_consumer_group(make_string_identifier(stream_name), + make_string_identifier(topic_name), + make_string_identifier(group_name)), + std::exception); + ASSERT_NO_THROW(unauthenticated_client->connect()); + ASSERT_THROW(unauthenticated_client->leave_consumer_group(make_string_identifier(stream_name), + make_string_identifier(topic_name), + make_string_identifier(group_name)), + std::exception); +} + +TEST_F(LowLevelE2E_ConsumerGroup, LeaveConsumerGroupOnNonExistentResourcesThrows) { + RecordProperty("description", "Rejects leave_consumer_group for streams, topics, or groups that do not exist."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + const std::string created_group_name = GetRandomName(); + const std::string missing_stream_name = GetRandomName(); + const std::string missing_topic_name = GetRandomName(); + const std::string missing_group_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, + "server_default", 0, "server_default")); + ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), + make_string_identifier(topic_name), created_group_name)); + TrackConsumerGroup(stream_name, topic_name, created_group_name); + ASSERT_NO_THROW(client->join_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), + make_string_identifier(created_group_name))); + + ASSERT_THROW( + client->leave_consumer_group(make_string_identifier(missing_stream_name), make_string_identifier(topic_name), + make_string_identifier(created_group_name)), + std::exception); + ASSERT_THROW( + client->leave_consumer_group(make_string_identifier(stream_name), make_string_identifier(missing_topic_name), + make_string_identifier(created_group_name)), + std::exception); + ASSERT_THROW(client->leave_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), + make_string_identifier(missing_group_name)), + std::exception); +} + +TEST_F(LowLevelE2E_ConsumerGroup, LeaveConsumerGroupAfterStreamDeletionThrows) { + RecordProperty("description", "Rejects leave_consumer_group after deleting the stream that owned the group."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + const std::string group_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, + "server_default", 0, "server_default")); + ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), + make_string_identifier(topic_name), group_name)); + TrackConsumerGroup(stream_name, topic_name, group_name); + ASSERT_NO_THROW(client->join_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), + make_string_identifier(group_name))); + ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name))); + ForgetTrackedStream(stream_name); + + ASSERT_THROW(client->leave_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), + make_string_identifier(group_name)), + std::exception); +} + +TEST_F(LowLevelE2E_ConsumerGroup, LeaveConsumerGroupAfterTopicDeletionThrows) { + RecordProperty("description", "Rejects leave_consumer_group after deleting the topic that owned the group."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + const std::string group_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, + "server_default", 0, "server_default")); + ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), + make_string_identifier(topic_name), group_name)); + TrackConsumerGroup(stream_name, topic_name, group_name); + ASSERT_NO_THROW(client->join_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), + make_string_identifier(group_name))); + ASSERT_NO_THROW(client->delete_topic(make_string_identifier(stream_name), make_string_identifier(topic_name))); + ForgetTrackedConsumerGroup(stream_name, topic_name, group_name); + + ASSERT_THROW(client->leave_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), + make_string_identifier(group_name)), + std::exception); +} + +TEST_F(LowLevelE2E_ConsumerGroup, LeaveConsumerGroupTwiceThrows) { + RecordProperty("description", "Rejects leaving the same consumer group twice."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + const std::string group_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, + "server_default", 0, "server_default")); + ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), + make_string_identifier(topic_name), group_name)); + TrackConsumerGroup(stream_name, topic_name, group_name); + ASSERT_NO_THROW(client->join_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), + make_string_identifier(group_name))); + ASSERT_NO_THROW(client->leave_consumer_group( + make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(group_name))); + + ASSERT_THROW(client->leave_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), + make_string_identifier(group_name)), + std::exception); +} + +TEST_F(LowLevelE2E_ConsumerGroup, LeaveConsumerGroupWithoutJoiningThrows) { + RecordProperty("description", "Rejects leaving a consumer group when the client is not a member."); + const std::string stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + const std::string group_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, + "server_default", 0, "server_default")); + ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), + make_string_identifier(topic_name), group_name)); + TrackConsumerGroup(stream_name, topic_name, group_name); + + ASSERT_THROW(client->leave_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), + make_string_identifier(group_name)), + std::exception); +} + TEST_F(LowLevelE2E_ConsumerGroup, GetConsumerGroupsReflectsJoinedGroupMembersCount) { RecordProperty("description", "Reflects a joined consumer group in get_consumer_groups members_count."); const std::string stream_name = GetRandomName(); @@ -313,10 +675,12 @@ TEST_F(LowLevelE2E_ConsumerGroup, GetConsumerGroupsReflectsJoinedGroupMembersCou joined_group = client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), joined_group_name); }); + TrackConsumerGroup(stream_name, topic_name, joined_group_name); ASSERT_NO_THROW({ other_group = client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), other_group_name); }); + TrackConsumerGroup(stream_name, topic_name, other_group_name); ASSERT_NO_THROW(client->join_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(joined_group_name))); @@ -334,20 +698,6 @@ TEST_F(LowLevelE2E_ConsumerGroup, GetConsumerGroupsReflectsJoinedGroupMembersCou EXPECT_EQ(groups[1].name, other_group.name); EXPECT_EQ(groups[1].members_count, other_group.members_count); EXPECT_NE(groups[0].members_count, groups[1].members_count); - - ASSERT_NO_THROW(client->leave_consumer_group(make_string_identifier(stream_name), - make_string_identifier(topic_name), - make_string_identifier(joined_group_name))); - ASSERT_NO_THROW(client->delete_consumer_group(make_string_identifier(stream_name), - make_string_identifier(topic_name), - make_string_identifier(joined_group_name))); - ASSERT_NO_THROW(client->delete_consumer_group(make_string_identifier(stream_name), - make_string_identifier(topic_name), - make_string_identifier(other_group_name))); - - const auto groups_after_delete = - client->get_consumer_groups(make_string_identifier(stream_name), make_string_identifier(topic_name)); - EXPECT_TRUE(groups_after_delete.empty()); } TEST_F(LowLevelE2E_ConsumerGroup, GetConsumerGroupsOnNonExistentStreamReturnsEmpty) { @@ -390,8 +740,10 @@ TEST_F(LowLevelE2E_ConsumerGroup, GetConsumerGroupsIsStableAcrossBackToBackCalls ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), first_group_name)); + TrackConsumerGroup(stream_name, topic_name, first_group_name); ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), second_group_name)); + TrackConsumerGroup(stream_name, topic_name, second_group_name); const auto first_groups = client->get_consumer_groups(make_string_identifier(stream_name), make_string_identifier(topic_name)); @@ -407,17 +759,6 @@ TEST_F(LowLevelE2E_ConsumerGroup, GetConsumerGroupsIsStableAcrossBackToBackCalls EXPECT_EQ(second_groups[i].partitions_count, first_groups[i].partitions_count); EXPECT_EQ(second_groups[i].members_count, first_groups[i].members_count); } - - ASSERT_NO_THROW(client->delete_consumer_group(make_string_identifier(stream_name), - make_string_identifier(topic_name), - make_string_identifier(first_group_name))); - ASSERT_NO_THROW(client->delete_consumer_group(make_string_identifier(stream_name), - make_string_identifier(topic_name), - make_string_identifier(second_group_name))); - - const auto groups_after_delete = - client->get_consumer_groups(make_string_identifier(stream_name), make_string_identifier(topic_name)); - EXPECT_TRUE(groups_after_delete.empty()); } TEST_F(LowLevelE2E_ConsumerGroup, GetConsumerGroupsReturnsCorrectNumberOfGroups) { @@ -436,21 +777,24 @@ TEST_F(LowLevelE2E_ConsumerGroup, GetConsumerGroupsReturnsCorrectNumberOfGroups) ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), deleted_group_name)); + TrackConsumerGroup(stream_name, topic_name, deleted_group_name); ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), other_deleted_name)); + TrackConsumerGroup(stream_name, topic_name, other_deleted_name); iggy::ffi::ConsumerGroupDetails remaining_group; ASSERT_NO_THROW({ remaining_group = client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), remaining_group_name); }); - + TrackConsumerGroup(stream_name, topic_name, remaining_group_name); ASSERT_NO_THROW(client->delete_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(deleted_group_name))); + ForgetTrackedConsumerGroup(stream_name, topic_name, deleted_group_name); ASSERT_NO_THROW(client->delete_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(other_deleted_name))); - + ForgetTrackedConsumerGroup(stream_name, topic_name, other_deleted_name); const auto groups = client->get_consumer_groups(make_string_identifier(stream_name), make_string_identifier(topic_name)); @@ -463,6 +807,7 @@ TEST_F(LowLevelE2E_ConsumerGroup, GetConsumerGroupsReturnsCorrectNumberOfGroups) ASSERT_NO_THROW(client->delete_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(remaining_group_name))); + ForgetTrackedConsumerGroup(stream_name, topic_name, remaining_group_name); const auto groups_after_delete = client->get_consumer_groups(make_string_identifier(stream_name), make_string_identifier(topic_name)); @@ -483,9 +828,13 @@ TEST_F(LowLevelE2E_ConsumerGroup, GetConsumerGroupsAfterStreamDeletionReturnsEmp "server_default", 0, "server_default")); ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), first_group_name)); + TrackConsumerGroup(stream_name, topic_name, first_group_name); ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), second_group_name)); + TrackConsumerGroup(stream_name, topic_name, second_group_name); ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name))); + ForgetTrackedConsumerGroup(stream_name, topic_name, first_group_name); + ForgetTrackedConsumerGroup(stream_name, topic_name, second_group_name); ForgetTrackedStream(stream_name); const auto groups = @@ -507,9 +856,14 @@ TEST_F(LowLevelE2E_ConsumerGroup, GetConsumerGroupsAfterTopicDeletionReturnsEmpt "server_default", 0, "server_default")); ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), first_group_name)); + TrackConsumerGroup(stream_name, topic_name, first_group_name); ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), second_group_name)); + TrackConsumerGroup(stream_name, topic_name, second_group_name); + ASSERT_NO_THROW(client->delete_topic(make_string_identifier(stream_name), make_string_identifier(topic_name))); + ForgetTrackedConsumerGroup(stream_name, topic_name, first_group_name); + ForgetTrackedConsumerGroup(stream_name, topic_name, second_group_name); const auto groups = client->get_consumer_groups(make_string_identifier(stream_name), make_string_identifier(topic_name)); @@ -529,6 +883,7 @@ TEST_F(LowLevelE2E_ConsumerGroup, GetConsumerGroupBeforeLoginThrows) { "server_default", 0, "server_default")); ASSERT_NO_THROW(setup_client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), group_name)); + TrackConsumerGroup(stream_name, topic_name, group_name); iggy::ffi::Client *unauthenticated_client = GetLoggedOutClient(); @@ -543,28 +898,6 @@ TEST_F(LowLevelE2E_ConsumerGroup, GetConsumerGroupBeforeLoginThrows) { std::exception); } -TEST_F(LowLevelE2E_ConsumerGroup, GetConsumerGroupWithInvalidIdentifiersThrows) { - RecordProperty("description", "Rejects get_consumer_group when the stream or topic identifier is invalid."); - const std::string stream_name = GetRandomName(); - const std::string topic_name = GetRandomName(); - const std::string group_name = GetRandomName(); - iggy::ffi::Client *client = GetLoggedInClient(); - - ASSERT_NO_THROW(client->create_stream(stream_name)); - TrackStream(stream_name); - ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, - "server_default", 0, "server_default")); - ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), - make_string_identifier(topic_name), group_name)); - - ASSERT_THROW(client->get_consumer_group(make_string_identifier(""), make_string_identifier(topic_name), - make_string_identifier(group_name)), - std::exception); - ASSERT_THROW(client->get_consumer_group(make_string_identifier(stream_name), make_string_identifier(""), - make_string_identifier(group_name)), - std::exception); -} - TEST_F(LowLevelE2E_ConsumerGroup, GetConsumerGroupOnNonExistentResourcesThrows) { RecordProperty("description", "Rejects get_consumer_group for streams, topics, or groups that do not exist."); const std::string stream_name = GetRandomName(); @@ -581,6 +914,7 @@ TEST_F(LowLevelE2E_ConsumerGroup, GetConsumerGroupOnNonExistentResourcesThrows) "server_default", 0, "server_default")); ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), created_group_name)); + TrackConsumerGroup(stream_name, topic_name, created_group_name); ASSERT_THROW( client->get_consumer_group(make_string_identifier(missing_stream_name), make_string_identifier(topic_name), @@ -629,9 +963,11 @@ TEST_F(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupSucceeds) { "server_default", 0, "server_default")); ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), group_name)); + TrackConsumerGroup(stream_name, topic_name, group_name); ASSERT_NO_THROW(client->delete_consumer_group( make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(group_name))); + ForgetTrackedConsumerGroup(stream_name, topic_name, group_name); ASSERT_THROW(client->get_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(group_name)), @@ -651,6 +987,7 @@ TEST_F(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupBeforeLoginThrows) { "server_default", 0, "server_default")); ASSERT_NO_THROW(setup_client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), group_name)); + TrackConsumerGroup(stream_name, topic_name, group_name); iggy::ffi::Client *unauthenticated_client = GetLoggedOutClient(); @@ -665,37 +1002,6 @@ TEST_F(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupBeforeLoginThrows) { std::exception); } -TEST_F(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupWithInvalidIdentifiersThrows) { - RecordProperty("description", - "Rejects delete_consumer_group when the stream, topic, or group identifier is invalid."); - const std::string stream_name = GetRandomName(); - const std::string topic_name = GetRandomName(); - const std::string group_name = GetRandomName(); - iggy::ffi::Client *client = GetLoggedInClient(); - - ASSERT_NO_THROW(client->create_stream(stream_name)); - TrackStream(stream_name); - ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, - "server_default", 0, "server_default")); - ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), - make_string_identifier(topic_name), group_name)); - - iggy::ffi::Identifier invalid_group_id; - invalid_group_id.kind = "numeric"; - invalid_group_id.length = 3; - invalid_group_id.value = {1, 2, 3}; - - ASSERT_THROW(client->delete_consumer_group(make_string_identifier(""), make_string_identifier(topic_name), - make_string_identifier(group_name)), - std::exception); - ASSERT_THROW(client->delete_consumer_group(make_string_identifier(stream_name), make_string_identifier(""), - make_string_identifier(group_name)), - std::exception); - ASSERT_THROW(client->delete_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), - std::move(invalid_group_id)), - std::exception); -} - TEST_F(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupOnNonExistentResourcesThrows) { RecordProperty("description", "Rejects delete_consumer_group for streams, topics, or groups that do not exist."); const std::string stream_name = GetRandomName(); @@ -712,7 +1018,7 @@ TEST_F(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupOnNonExistentResourcesThrow "server_default", 0, "server_default")); ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), created_group_name)); - + TrackConsumerGroup(stream_name, topic_name, created_group_name); ASSERT_THROW( client->delete_consumer_group(make_string_identifier(missing_stream_name), make_string_identifier(topic_name), make_string_identifier(created_group_name)), @@ -739,40 +1045,15 @@ TEST_F(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupTwiceThrows) { "server_default", 0, "server_default")); ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), group_name)); + TrackConsumerGroup(stream_name, topic_name, group_name); ASSERT_NO_THROW(client->delete_consumer_group( make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(group_name))); - + ForgetTrackedConsumerGroup(stream_name, topic_name, group_name); ASSERT_THROW(client->delete_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(group_name)), std::exception); } -TEST_F(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupWithNumericIdentifiersSucceeds) { - RecordProperty( - "description", - "Deletes a consumer group successfully when stream, topic, and group are addressed by numeric identifiers."); - const std::string stream_name = GetRandomName(); - const std::string topic_name = GetRandomName(); - const std::string group_name = GetRandomName(); - iggy::ffi::Client *client = GetLoggedInClient(); - - ASSERT_NO_THROW(client->create_stream(stream_name)); - TrackStream(stream_name); - ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0, - "server_default", 0, "server_default")); - const auto created_group = client->create_consumer_group(make_string_identifier(stream_name), - make_string_identifier(topic_name), group_name); - const auto stream_details = client->get_stream(make_string_identifier(stream_name)); - - ASSERT_NO_THROW(client->delete_consumer_group(make_numeric_identifier(stream_details.id), - make_numeric_identifier(stream_details.topics[0].id), - make_numeric_identifier(created_group.id))); - - ASSERT_THROW(client->get_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), - make_string_identifier(group_name)), - std::exception); -} - TEST_F(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupAfterStreamDeletionThrows) { RecordProperty("description", "Rejects delete_consumer_group after deleting the stream that owned the consumer group."); @@ -787,9 +1068,11 @@ TEST_F(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupAfterStreamDeletionThrows) "server_default", 0, "server_default")); ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), group_name)); + TrackConsumerGroup(stream_name, topic_name, group_name); + ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name))); ForgetTrackedStream(stream_name); - + ForgetTrackedConsumerGroup(stream_name, topic_name, group_name); ASSERT_THROW(client->delete_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(group_name)), std::exception); @@ -816,6 +1099,7 @@ TEST_F(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupAndRecreateWithSameNameSucc ASSERT_NO_THROW({ const auto recreated_group = client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name), group_name); + TrackConsumerGroup(stream_name, topic_name, group_name); ASSERT_EQ(recreated_group.id, 0u); ASSERT_EQ(recreated_group.name, group_name); ASSERT_EQ(recreated_group.members_count, 0u); diff --git a/foreign/cpp/tests/message/low_level_e2e.cpp b/foreign/cpp/tests/message/low_level_e2e.cpp index 1cdf1e0668..ef588c5270 100644 --- a/foreign/cpp/tests/message/low_level_e2e.cpp +++ b/foreign/cpp/tests/message/low_level_e2e.cpp @@ -17,7 +17,6 @@ * under the License. */ -#include #include #include #include diff --git a/foreign/cpp/tests/partition/low_level_e2e.cpp b/foreign/cpp/tests/partition/low_level_e2e.cpp index 4e50d5cd2f..517c2c6798 100644 --- a/foreign/cpp/tests/partition/low_level_e2e.cpp +++ b/foreign/cpp/tests/partition/low_level_e2e.cpp @@ -19,7 +19,6 @@ #include #include -#include #include diff --git a/foreign/cpp/tests/stream/low_level_e2e.cpp b/foreign/cpp/tests/stream/low_level_e2e.cpp index d134770d45..2cdd4fc52d 100644 --- a/foreign/cpp/tests/stream/low_level_e2e.cpp +++ b/foreign/cpp/tests/stream/low_level_e2e.cpp @@ -19,8 +19,6 @@ #include #include -#include -#include #include @@ -91,6 +89,283 @@ TEST_F(LowLevelE2E_Stream, CreateStreamWithEmojiName) { }); } +TEST_F(LowLevelE2E_Stream, UpdateStreamWorksCorrectly) { + RecordProperty("description", "Updates an existing stream name while preserving the stream identity."); + const std::string stream_name = GetRandomName(); + const std::string updated_stream_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + + iggy::ffi::StreamDetails original_stream_details{}; + ASSERT_NO_THROW({ original_stream_details = client->get_stream(make_string_identifier(stream_name)); }); + const std::uint32_t stream_id = original_stream_details.id; + ForgetTrackedStream(stream_name); + TrackStream(stream_id); + + ASSERT_NO_THROW(client->update_stream(make_string_identifier(stream_name), updated_stream_name)); + + ASSERT_THROW(client->get_stream(make_string_identifier(stream_name)), std::exception); + + iggy::ffi::StreamDetails updated_stream_details{}; + ASSERT_NO_THROW({ updated_stream_details = client->get_stream(make_numeric_identifier(stream_id)); }); + + EXPECT_EQ(updated_stream_details.id, original_stream_details.id); + EXPECT_EQ(updated_stream_details.created_at, original_stream_details.created_at); + EXPECT_EQ(updated_stream_details.name, updated_stream_name); + EXPECT_EQ(updated_stream_details.size_bytes, original_stream_details.size_bytes); + EXPECT_EQ(updated_stream_details.messages_count, original_stream_details.messages_count); + EXPECT_EQ(updated_stream_details.topics_count, original_stream_details.topics_count); + EXPECT_EQ(updated_stream_details.topics.size(), original_stream_details.topics.size()); +} + +TEST_F(LowLevelE2E_Stream, UpdateStreamWithSameNameIsIdempotent) { + RecordProperty("description", + "Calling update_stream with the current name succeeds without changing stream details."); + const std::string stream_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + + iggy::ffi::StreamDetails first_read{}; + iggy::ffi::StreamDetails second_read{}; + ASSERT_NO_THROW({ first_read = client->get_stream(make_string_identifier(stream_name)); }); + ASSERT_NO_THROW(client->update_stream(make_numeric_identifier(first_read.id), stream_name)); + ASSERT_NO_THROW({ second_read = client->get_stream(make_numeric_identifier(first_read.id)); }); + + EXPECT_EQ(second_read.id, first_read.id); + EXPECT_EQ(second_read.created_at, first_read.created_at); + EXPECT_EQ(second_read.name, first_read.name); + EXPECT_EQ(second_read.size_bytes, first_read.size_bytes); + EXPECT_EQ(second_read.messages_count, first_read.messages_count); + EXPECT_EQ(second_read.topics_count, first_read.topics_count); + EXPECT_EQ(second_read.topics.size(), first_read.topics.size()); +} + +TEST_F(LowLevelE2E_Stream, UpdateStreamBeforeLoginThrows) { + RecordProperty("description", "Rejects update_stream before connect, and after connect but before login."); + const std::string stream_name = GetRandomName(); + const std::string updated_stream_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + + iggy::ffi::Client *unauthenticated_client = GetLoggedOutClient(); + + ASSERT_THROW(unauthenticated_client->update_stream(make_string_identifier(stream_name), updated_stream_name), + std::exception); + ASSERT_NO_THROW(unauthenticated_client->connect()); + ASSERT_THROW(unauthenticated_client->update_stream(make_string_identifier(stream_name), updated_stream_name), + std::exception); +} + +TEST_F(LowLevelE2E_Stream, UpdateStreamWithVariousUtf8Characters) { + RecordProperty("description", "Updates a stream name with various UTF-8 values."); + const std::string stream_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + + std::uint32_t stream_id = 0; + ASSERT_NO_THROW({ + const auto stream_details = client->get_stream(make_string_identifier(stream_name)); + stream_id = stream_details.id; + }); + ForgetTrackedStream(stream_name); + TrackStream(stream_id); + + const std::vector updated_stream_names = { + "こんにちは世界", "안녕하세요세계", "你好世界", "مرحبا بالعالم", "नमस्ते दुनिया", "🚀🍕✨🎯🔥", + }; + + for (const auto &updated_stream_name : updated_stream_names) { + SCOPED_TRACE(updated_stream_name); + ASSERT_NO_THROW(client->update_stream(make_numeric_identifier(stream_id), updated_stream_name)); + ASSERT_NO_THROW({ + const auto stream_details = client->get_stream(make_numeric_identifier(stream_id)); + EXPECT_EQ(stream_details.name, updated_stream_name); + }); + } +} + +TEST_F(LowLevelE2E_Stream, UpdateNonExistentStreamThrows) { + RecordProperty("description", "Throws when updating a stream that does not exist."); + const std::string stream_name = GetRandomName(); + const std::string updated_stream_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_THROW(client->update_stream(make_string_identifier(stream_name), updated_stream_name), std::exception); +} + +TEST_F(LowLevelE2E_Stream, UpdateStreamWithDuplicateNameThrows) { + RecordProperty("description", "Rejects renaming a stream to another stream's existing name."); + const std::string first_stream_name = GetRandomName(); + const std::string second_stream_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(first_stream_name)); + TrackStream(first_stream_name); + ASSERT_NO_THROW(client->create_stream(second_stream_name)); + TrackStream(second_stream_name); + + ASSERT_THROW(client->update_stream(make_string_identifier(first_stream_name), second_stream_name), std::exception); +} + +TEST_F(LowLevelE2E_Stream, UpdateDeletedStreamThrows) { + RecordProperty("description", "Throws when updating a stream after it has been deleted."); + const std::string stream_name = GetRandomName(); + const std::string updated_stream_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + + ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name))); + ForgetTrackedStream(stream_name); + + ASSERT_THROW(client->update_stream(make_string_identifier(stream_name), updated_stream_name), std::exception); +} + +TEST_F(LowLevelE2E_Stream, UpdateStreamFailedValidationDoesNotMutateStream) { + RecordProperty("description", "Keeps the stream unchanged when update_stream fails wrapper validation."); + const std::string stream_name = GetRandomName(); + const std::string updated_stream_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + + iggy::ffi::StreamDetails stream_before_failed_update{}; + ASSERT_NO_THROW({ stream_before_failed_update = client->get_stream(make_string_identifier(stream_name)); }); + + iggy::ffi::Identifier invalid_numeric_id; + invalid_numeric_id.kind = "numeric"; + invalid_numeric_id.length = 1; + invalid_numeric_id.value.push_back(1); + ASSERT_THROW(client->update_stream(std::move(invalid_numeric_id), updated_stream_name), std::exception); + + iggy::ffi::StreamDetails stream_after_failed_update{}; + ASSERT_NO_THROW({ stream_after_failed_update = client->get_stream(make_string_identifier(stream_name)); }); + + EXPECT_EQ(stream_after_failed_update.id, stream_before_failed_update.id); + EXPECT_EQ(stream_after_failed_update.created_at, stream_before_failed_update.created_at); + EXPECT_EQ(stream_after_failed_update.name, stream_before_failed_update.name); + EXPECT_EQ(stream_after_failed_update.size_bytes, stream_before_failed_update.size_bytes); + EXPECT_EQ(stream_after_failed_update.messages_count, stream_before_failed_update.messages_count); + EXPECT_EQ(stream_after_failed_update.topics_count, stream_before_failed_update.topics_count); + EXPECT_EQ(stream_after_failed_update.topics.size(), stream_before_failed_update.topics.size()); + + ASSERT_THROW(client->get_stream(make_string_identifier(updated_stream_name)), std::exception); +} + +TEST_F(LowLevelE2E_Stream, UpdateStreamOnlyChangesName) { + RecordProperty( + "description", + "Changes only the stream name and leaves stream, topic, message, partition, and segment data intact."); + const std::string stream_name = GetRandomName(); + const std::string updated_stream_name = GetRandomName(); + const std::string topic_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + + std::uint32_t stream_id = 0; + ASSERT_NO_THROW({ + const auto stream_details = client->get_stream(make_string_identifier(stream_name)); + stream_id = stream_details.id; + }); + ForgetTrackedStream(stream_name); + TrackStream(stream_id); + + ASSERT_NO_THROW(client->create_topic(make_numeric_identifier(stream_id), topic_name, 2, "none", 0, "never_expire", + 0, "server_default")); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 3; ++i) { + auto message = iggy::ffi::make_message(to_payload("stream-update-preserve-" + std::to_string(i))); + messages.push_back(std::move(message)); + } + ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream_id), make_numeric_identifier(0), + "partition_id", partition_id_bytes(0), std::move(messages))); + + iggy::ffi::StreamDetails stream_before_update{}; + iggy::ffi::StreamDetails stream_after_update{}; + iggy::ffi::Stats stats_before_update{}; + iggy::ffi::Stats stats_after_update{}; + ASSERT_NO_THROW({ + stream_before_update = client->get_stream(make_numeric_identifier(stream_id)); + stats_before_update = client->get_stats(); + }); + + ASSERT_NO_THROW(client->update_stream(make_numeric_identifier(stream_id), updated_stream_name)); + + ASSERT_THROW(client->get_stream(make_string_identifier(stream_name)), std::exception); + ASSERT_NO_THROW({ + stream_after_update = client->get_stream(make_numeric_identifier(stream_id)); + stats_after_update = client->get_stats(); + }); + + EXPECT_EQ(stream_after_update.id, stream_before_update.id); + EXPECT_EQ(stream_after_update.created_at, stream_before_update.created_at); + EXPECT_EQ(stream_after_update.name, updated_stream_name); + EXPECT_EQ(stream_after_update.size_bytes, stream_before_update.size_bytes); + EXPECT_EQ(stream_after_update.messages_count, stream_before_update.messages_count); + EXPECT_EQ(stream_after_update.topics_count, stream_before_update.topics_count); + ASSERT_EQ(stream_before_update.topics.size(), 1u); + ASSERT_EQ(stream_after_update.topics.size(), 1u); + + const auto &before_topic = stream_before_update.topics[0]; + const auto &after_topic = stream_after_update.topics[0]; + EXPECT_EQ(after_topic.id, before_topic.id); + EXPECT_EQ(after_topic.created_at, before_topic.created_at); + EXPECT_EQ(after_topic.name, before_topic.name); + EXPECT_EQ(after_topic.size_bytes, before_topic.size_bytes); + EXPECT_EQ(after_topic.message_expiry, before_topic.message_expiry); + EXPECT_EQ(after_topic.compression_algorithm, before_topic.compression_algorithm); + EXPECT_EQ(after_topic.max_topic_size, before_topic.max_topic_size); + EXPECT_EQ(after_topic.replication_factor, before_topic.replication_factor); + EXPECT_EQ(after_topic.messages_count, before_topic.messages_count); + EXPECT_EQ(after_topic.partitions_count, before_topic.partitions_count); + + EXPECT_EQ(stats_after_update.streams_count, stats_before_update.streams_count); + EXPECT_EQ(stats_after_update.topics_count, stats_before_update.topics_count); + EXPECT_EQ(stats_after_update.partitions_count, stats_before_update.partitions_count); + EXPECT_EQ(stats_after_update.segments_count, stats_before_update.segments_count); + EXPECT_EQ(stats_after_update.messages_count, stats_before_update.messages_count); +} + +TEST_F(LowLevelE2E_Stream, UpdateStreamValidatesNameBounds) { + RecordProperty("description", + "Rejects invalid stream name lengths during update and accepts the maximum allowed name length."); + const std::string stream_name = GetRandomName(); + iggy::ffi::Client *client = GetLoggedInClient(); + + ASSERT_NO_THROW(client->create_stream(stream_name)); + TrackStream(stream_name); + + std::uint32_t stream_id = 0; + ASSERT_NO_THROW({ + const auto stream_details = client->get_stream(make_string_identifier(stream_name)); + stream_id = stream_details.id; + }); + ForgetTrackedStream(stream_name); + TrackStream(stream_id); + + const std::vector invalid_stream_names = { + "", + std::string(256, 'b'), + }; + for (const auto &invalid_stream_name : invalid_stream_names) { + SCOPED_TRACE("invalid_stream_name_length=" + std::to_string(invalid_stream_name.size())); + ASSERT_THROW(client->update_stream(make_numeric_identifier(stream_id), invalid_stream_name), std::exception); + } +} + TEST_F(LowLevelE2E_Stream, StreamCreatedAndDeletedSuccessfully) { RecordProperty("description", "Creates a stream and deletes it successfully by string identifier."); const std::string stream_name = GetRandomName(); diff --git a/foreign/cpp/tests/topic/low_level_e2e.cpp b/foreign/cpp/tests/topic/low_level_e2e.cpp index cb15b4141f..8a58e065c1 100644 --- a/foreign/cpp/tests/topic/low_level_e2e.cpp +++ b/foreign/cpp/tests/topic/low_level_e2e.cpp @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include