Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion foreign/cpp/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ fn main() {
.compile("iggy-cpp-bridge");

println!("cargo:rerun-if-changed=src/client.rs");
println!("cargo:rerun-if-changed=src/consumer_group.rs");
println!("cargo:rerun-if-changed=src/consumer.rs");
println!("cargo:rerun-if-changed=src/producer.rs");
println!("cargo:rerun-if-changed=src/identifier.rs");
println!("cargo:rerun-if-changed=src/lib.rs");
println!("cargo:rerun-if-changed=src/messages.rs");
Expand Down
329 changes: 205 additions & 124 deletions foreign/cpp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,12 @@
use crate::{RUNTIME, ffi};
use iggy::prelude::{
Client as IggyConnectionClient, CompressionAlgorithm as RustCompressionAlgorithm, Consumer,
ConsumerGroupClient, Identifier as RustIdentifier, IggyClient as RustIggyClient,
IggyClientBuilder as RustIggyClientBuilder, IggyError, IggyExpiry as RustIggyExpiry,
IggyMessage, IggyTimestamp, MaxTopicSize as RustMaxTopicSize, MessageClient, PartitionClient,
Partitioning, PollingStrategy, SnapshotCompression as RustSnapshotCompression, StreamClient,
SystemClient as RustSystemClient, SystemSnapshotType as RustSystemSnapshotType, TopicClient,
UserClient,
};
use iggy_common::{
CacheMetrics as RustCacheMetrics, CacheMetricsKey as RustCacheMetricsKey,
ClientInfo as RustClientInfo, ClientInfoDetails as RustClientInfoDetails, Stats as RustStats,
ConsumerGroupClient, ConsumerOffsetClient, Identifier as RustIdentifier,
IggyClient as RustIggyClient, IggyClientBuilder as RustIggyClientBuilder,
IggyExpiry as RustIggyExpiry, IggyMessage, IggyTimestamp, MaxTopicSize as RustMaxTopicSize,
MessageClient, PartitionClient, Partitioning, PollingStrategy,
SnapshotCompression as RustSnapshotCompression, StreamClient, SystemClient as RustSystemClient,
SystemSnapshotType as RustSystemSnapshotType, TopicClient, UserClient,
};
use std::collections::HashSet;
use std::convert::TryFrom;
Expand All @@ -38,103 +34,6 @@ use std::sync::Arc;
/// partition based on the consumer/strategy. Cxx FFI does not support `Option<u32>`, so we
/// reserve `u32::MAX` as the sentinel for `partition_id`.
const ANY_PARTITION_ID: u32 = u32::MAX;
impl From<RustClientInfo> for ffi::ClientInfo {
fn from(client: RustClientInfo) -> Self {
let has_user_id = client.user_id.is_some();
ffi::ClientInfo {
client_id: client.client_id,
has_user_id,
user_id: client.user_id.unwrap_or(u32::MAX),
address: client.address,
transport: client.transport,
consumer_groups_count: client.consumer_groups_count,
}
}
}

impl From<RustClientInfoDetails> for ffi::ClientInfoDetails {
fn from(client: RustClientInfoDetails) -> Self {
let has_user_id = client.user_id.is_some();
ffi::ClientInfoDetails {
client_id: client.client_id,
has_user_id,
user_id: client.user_id.unwrap_or(u32::MAX),
address: client.address,
transport: client.transport,
consumer_groups_count: client.consumer_groups_count,
consumer_groups: client
.consumer_groups
.into_iter()
.map(ffi::ConsumerGroupInfo::from)
.collect(),
}
}
}

impl TryFrom<Option<RustClientInfoDetails>> for ffi::ClientInfoDetails {
type Error = String;

fn try_from(client: Option<RustClientInfoDetails>) -> Result<Self, Self::Error> {
match client {
Some(client) => Ok(ffi::ClientInfoDetails::from(client)),
None => Err("client not found".to_string()),
}
}
}

impl From<(RustCacheMetricsKey, RustCacheMetrics)> for ffi::CacheMetricEntry {
fn from((key, metrics): (RustCacheMetricsKey, RustCacheMetrics)) -> Self {
ffi::CacheMetricEntry {
stream_id: key.stream_id,
topic_id: key.topic_id,
partition_id: key.partition_id,
hits: metrics.hits,
misses: metrics.misses,
hit_ratio: metrics.hit_ratio,
}
}
}

impl From<RustStats> for ffi::Stats {
fn from(stats: RustStats) -> Self {
let has_server_semver = stats.iggy_server_semver.is_some();
ffi::Stats {
process_id: stats.process_id,
cpu_usage: stats.cpu_usage,
total_cpu_usage: stats.total_cpu_usage,
memory_usage: stats.memory_usage.as_bytes_u64(),
total_memory: stats.total_memory.as_bytes_u64(),
available_memory: stats.available_memory.as_bytes_u64(),
run_time_micros: stats.run_time.as_micros(),
start_time_epoch_micros: stats.start_time.as_micros(),
read_bytes: stats.read_bytes.as_bytes_u64(),
written_bytes: stats.written_bytes.as_bytes_u64(),
messages_size_bytes: stats.messages_size_bytes.as_bytes_u64(),
streams_count: stats.streams_count,
topics_count: stats.topics_count,
partitions_count: stats.partitions_count,
segments_count: stats.segments_count,
messages_count: stats.messages_count,
clients_count: stats.clients_count,
consumer_groups_count: stats.consumer_groups_count,
hostname: stats.hostname,
os_name: stats.os_name,
os_version: stats.os_version,
kernel_version: stats.kernel_version,
iggy_server_version: stats.iggy_server_version,
has_server_semver,
iggy_server_semver: stats.iggy_server_semver.unwrap_or(0),
cache_metrics: stats
.cache_metrics
.into_iter()
.map(ffi::CacheMetricEntry::from)
.collect(),
threads_count: stats.threads_count,
free_disk_space: stats.free_disk_space.as_bytes_u64(),
total_disk_space: stats.total_disk_space.as_bytes_u64(),
}
}
}

pub struct Client {
pub inner: Arc<RustIggyClient>,
Expand Down Expand Up @@ -190,6 +89,16 @@ impl Client {
})
}

pub fn logout_user(&self) -> Result<(), String> {
RUNTIME.block_on(async {
self.inner
.logout_user()
.await
.map_err(|error| format!("Could not logout user: {error}"))?;
Ok(())
})
}

pub fn connect(&self) -> Result<(), String> {
RUNTIME.block_on(async {
self.inner
Expand Down Expand Up @@ -222,6 +131,27 @@ impl Client {
})
}

pub fn update_stream(
&self,
stream_id: ffi::Identifier,
stream_name: String,
) -> Result<(), String> {
let rust_stream_id = RustIdentifier::try_from(stream_id)
.map_err(|error| format!("Could not update stream '{stream_name}': {error}"))?;

RUNTIME.block_on(async {
self.inner
.update_stream(&rust_stream_id, &stream_name)
.await
.map_err(|error| {
format!(
"Could not update stream '{rust_stream_id}' to '{stream_name}': {error}"
)
})?;
Ok(())
})
}

pub fn get_stream(&self, stream_id: ffi::Identifier) -> Result<ffi::StreamDetails, String> {
let rust_stream_id = RustIdentifier::try_from(stream_id)
.map_err(|error| format!("Could not get stream: {error}"))?;
Expand Down Expand Up @@ -330,6 +260,31 @@ impl Client {
})
}

pub fn flush_unsaved_buffer(
&self,
stream_id: ffi::Identifier,
topic_id: ffi::Identifier,
partition_id: u32,
fsync: bool,
) -> Result<(), String> {
let rust_stream_id = RustIdentifier::try_from(stream_id)
.map_err(|error| format!("Could not flush unsaved buffer: {error}"))?;
let rust_topic_id = RustIdentifier::try_from(topic_id)
.map_err(|error| format!("Could not flush unsaved buffer: {error}"))?;

RUNTIME.block_on(async {
self.inner
.flush_unsaved_buffer(&rust_stream_id, &rust_topic_id, partition_id, fsync)
.await
.map_err(|error| {
format!(
"Could not flush unsaved buffer for stream '{rust_stream_id}', topic '{rust_topic_id}', partition '{partition_id}': {error}"
)
})?;
Ok(())
})
}

#[allow(clippy::too_many_arguments)]
pub fn poll_messages(
&self,
Expand Down Expand Up @@ -822,6 +777,144 @@ impl Client {
})
}

pub fn store_consumer_offset(
&self,
stream_id: ffi::Identifier,
topic_id: ffi::Identifier,
partition_id: u32,
consumer_kind: String,
consumer_id: ffi::Identifier,
offset: u64,
) -> Result<(), String> {
let rust_stream_id = RustIdentifier::try_from(stream_id)
.map_err(|error| format!("Could not store consumer offset: {error}"))?;
let rust_topic_id = RustIdentifier::try_from(topic_id)
.map_err(|error| format!("Could not store consumer offset: {error}"))?;
let rust_consumer_id = RustIdentifier::try_from(consumer_id)
.map_err(|error| format!("Could not store consumer offset: {error}"))?;
let consumer = match consumer_kind.as_str() {
"consumer" => Consumer::new(rust_consumer_id),
"consumer_group" => Consumer::group(rust_consumer_id),
_ => {
return Err(format!(
"Could not store consumer offset: invalid consumer kind: {consumer_kind}"
));
}
};
let partition_id = if partition_id == ANY_PARTITION_ID {
None
} else {
Some(partition_id)
};

RUNTIME.block_on(async {
self.inner
.store_consumer_offset(
&consumer,
&rust_stream_id,
&rust_topic_id,
partition_id,
offset,
)
.await
.map_err(|error| {
format!(
"Could not store consumer offset for stream '{rust_stream_id}', topic '{rust_topic_id}': {error}"
)
})?;
Ok(())
})
}

pub fn get_consumer_offset(
&self,
stream_id: ffi::Identifier,
topic_id: ffi::Identifier,
partition_id: u32,
consumer_kind: String,
consumer_id: ffi::Identifier,
) -> Result<ffi::ConsumerOffsetInfo, String> {
let rust_stream_id = RustIdentifier::try_from(stream_id)
.map_err(|error| format!("Could not get consumer offset: {error}"))?;
let rust_topic_id = RustIdentifier::try_from(topic_id)
.map_err(|error| format!("Could not get consumer offset: {error}"))?;
let rust_consumer_id = RustIdentifier::try_from(consumer_id)
.map_err(|error| format!("Could not get consumer offset: {error}"))?;
let consumer = match consumer_kind.as_str() {
"consumer" => Consumer::new(rust_consumer_id),
"consumer_group" => Consumer::group(rust_consumer_id),
_ => {
return Err(format!(
"Could not get consumer offset: invalid consumer kind: {consumer_kind}"
));
}
};
let partition_id = if partition_id == ANY_PARTITION_ID {
None
} else {
Some(partition_id)
};

RUNTIME.block_on(async {
let offset = self
.inner
.get_consumer_offset(&consumer, &rust_stream_id, &rust_topic_id, partition_id)
.await
.map_err(|error| {
format!(
"Could not get consumer offset for stream '{rust_stream_id}', topic '{rust_topic_id}': {error}"
)
})?;
ffi::ConsumerOffsetInfo::try_from(offset).map_err(|error| {
format!(
"Could not get consumer offset for stream '{rust_stream_id}', topic '{rust_topic_id}': {error}"
)
})
})
}

pub fn delete_consumer_offset(
&self,
stream_id: ffi::Identifier,
topic_id: ffi::Identifier,
partition_id: u32,
consumer_kind: String,
consumer_id: ffi::Identifier,
) -> Result<(), String> {
let rust_stream_id = RustIdentifier::try_from(stream_id)
.map_err(|error| format!("Could not delete consumer offset: {error}"))?;
let rust_topic_id = RustIdentifier::try_from(topic_id)
.map_err(|error| format!("Could not delete consumer offset: {error}"))?;
let rust_consumer_id = RustIdentifier::try_from(consumer_id)
.map_err(|error| format!("Could not delete consumer offset: {error}"))?;
let consumer = match consumer_kind.as_str() {
"consumer" => Consumer::new(rust_consumer_id),
"consumer_group" => Consumer::group(rust_consumer_id),
_ => {
return Err(format!(
"Could not delete consumer offset: invalid consumer kind: {consumer_kind}"
));
}
};
let partition_id = if partition_id == ANY_PARTITION_ID {
None
} else {
Some(partition_id)
};

RUNTIME.block_on(async {
self.inner
.delete_consumer_offset(&consumer, &rust_stream_id, &rust_topic_id, partition_id)
.await
.map_err(|error| {
format!(
"Could not delete consumer offset for stream '{rust_stream_id}', topic '{rust_topic_id}': {error}"
)
})?;
Ok(())
})
}

pub fn get_stats(&self) -> Result<ffi::Stats, String> {
RUNTIME.block_on(async {
let stats = self
Expand Down Expand Up @@ -958,23 +1051,11 @@ impl Client {
}

pub unsafe fn delete_connection(client: *mut Client) -> Result<(), String> {
if client.is_null() {
return Ok(());
}

// `Box::from_raw` below runs unconditionally, so the client is always released regardless
// of `logout_result`. The result is only used to surface a logout error to the caller — there
// is no leak path here.
let logout_result = RUNTIME.block_on(async { unsafe { &*client }.inner.logout_user().await });

unsafe {
drop(Box::from_raw(client));
if !client.is_null() {
unsafe {
drop(Box::from_raw(client));
}
}

match logout_result {
Ok(()) | Err(IggyError::Unauthenticated | IggyError::Disconnected) => Ok(()),
Err(error) => Err(format!(
"Could not logout user during deletion of client: {error}"
)),
}
Ok(())
}
Loading
Loading