From 80d4abc1d0d385d5152c5bc5f166dce45736c327 Mon Sep 17 00:00:00 2001 From: chengxi Date: Fri, 5 Jun 2026 21:32:48 -0400 Subject: [PATCH 01/10] feat(sdk): support per-request timeout to TCP/QUIC/WebSocket clients --- core/common/src/error/iggy_error.rs | 3 + core/common/src/types/args/mod.rs | 14 + .../quic_config/quic_client_config.rs | 4 + .../quic_config/quic_client_config_builder.rs | 19 ++ .../tcp_config/tcp_client_config.rs | 4 + .../tcp_config/tcp_client_config_builder.rs | 19 ++ .../websocket_client_config.rs | 4 + .../websocket_client_config_builder.rs | 19 ++ .../tests/cli/general/test_help_command.rs | 5 + core/sdk/src/client_provider.rs | 3 + core/sdk/src/clients/client_builder.rs | 18 ++ core/sdk/src/quic/quic_client.rs | 132 +++++---- core/sdk/src/tcp/tcp_client.rs | 272 ++++++++++-------- core/sdk/src/websocket/websocket_client.rs | 271 ++++++++--------- examples/rust/src/shared/args.rs | 5 + 15 files changed, 485 insertions(+), 307 deletions(-) diff --git a/core/common/src/error/iggy_error.rs b/core/common/src/error/iggy_error.rs index 816d8fac57..0bf203cd1a 100644 --- a/core/common/src/error/iggy_error.rs +++ b/core/common/src/error/iggy_error.rs @@ -16,6 +16,7 @@ // under the License. use crate::Identifier; +use crate::IggyDuration; use crate::utils::topic_size::MaxTopicSize; use crate::{IggyMessage, utils::byte_size::IggyByteSize}; use std::sync::Arc; @@ -502,6 +503,8 @@ pub enum IggyError { CannotBindToSocket(String) = 12000, #[error("Task execution timeout")] TaskTimeout = 12001, + #[error("Request timeout after {0}")] + RequestTimeout(IggyDuration) = 12002, #[error("IO error: {0}")] IoError(String) = 13000, diff --git a/core/common/src/types/args/mod.rs b/core/common/src/types/args/mod.rs index f60b509740..f590da907c 100644 --- a/core/common/src/types/args/mod.rs +++ b/core/common/src/types/args/mod.rs @@ -218,6 +218,13 @@ pub struct ArgsOptional { #[arg(long)] #[serde(skip_serializing_if = "Option::is_none")] pub websocket_reconnection_interval: Option, + + /// The optional per-request timeout for send/receive operations + /// + /// [default: "300s"] + #[arg(long)] + #[serde(skip_serializing_if = "Option::is_none")] + pub request_timeout: Option, } /// The arguments used by the `ClientProviderConfig` to create a client. @@ -351,6 +358,9 @@ pub struct Args { /// The optional TLS validate certificate for the WebSocket transport pub websocket_tls_validate_certificate: bool, + + /// The per-request timeout for send/receive operations + pub request_timeout: String, } const QUIC_TRANSPORT: &str = "quic"; @@ -416,6 +426,7 @@ impl Default for Args { websocket_tls_domain: "localhost".to_string(), websocket_tls_ca_file: None, websocket_tls_validate_certificate: false, + request_timeout: "300s".to_string(), } } } @@ -517,6 +528,9 @@ impl From> for Args { { args.websocket_reconnection_interval = websocket_reconnection_interval; } + if let Some(request_timeout) = optional_args.request_timeout { + args.request_timeout = request_timeout; + } } args diff --git a/core/common/src/types/configuration/quic_config/quic_client_config.rs b/core/common/src/types/configuration/quic_config/quic_client_config.rs index b6e2b95e52..f186281770 100644 --- a/core/common/src/types/configuration/quic_config/quic_client_config.rs +++ b/core/common/src/types/configuration/quic_config/quic_client_config.rs @@ -54,6 +54,8 @@ pub struct QuicClientConfig { pub validate_certificate: bool, /// Interval of heartbeats sent by the client pub heartbeat_interval: IggyDuration, + /// Per-request timeout for send/receive operations. + pub request_timeout: IggyDuration, } impl Default for QuicClientConfig { @@ -64,6 +66,7 @@ impl Default for QuicClientConfig { server_name: "localhost".to_string(), auto_login: AutoLogin::Disabled, heartbeat_interval: IggyDuration::from_str("5s").unwrap(), + request_timeout: IggyDuration::from_str("300s").unwrap(), reconnection: QuicClientReconnectionConfig::default(), response_buffer_size: 1000 * 1000 * 10, max_concurrent_bidi_streams: 10000, @@ -96,6 +99,7 @@ impl From> for QuicClientConfig { max_idle_timeout: connection_string.options().max_idle_timeout(), validate_certificate: connection_string.options().validate_certificate(), heartbeat_interval: connection_string.options().heartbeat_interval(), + request_timeout: IggyDuration::from_str("300s").unwrap(), } } } diff --git a/core/common/src/types/configuration/quic_config/quic_client_config_builder.rs b/core/common/src/types/configuration/quic_config/quic_client_config_builder.rs index 9af4e85024..62ec1a2431 100644 --- a/core/common/src/types/configuration/quic_config/quic_client_config_builder.rs +++ b/core/common/src/types/configuration/quic_config/quic_client_config_builder.rs @@ -152,6 +152,12 @@ impl QuicClientConfigBuilder { self } + /// Sets the per-request timeout for send/receive operations. + pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self { + self.config.request_timeout = request_timeout; + self + } + /// Finalizes the builder and returns the `QuicClientConfig`. pub fn build(mut self) -> Result { self.config.server_address = self.config.server_address.trim().to_owned(); @@ -164,6 +170,7 @@ impl QuicClientConfigBuilder { #[cfg(test)] mod tests { use super::*; + use std::str::FromStr; #[test] fn build_should_trim_and_validate_server_address() { @@ -193,4 +200,16 @@ mod tests { assert!(result.is_err()); } + + #[test] + fn with_request_timeout_should_override_default() { + let config = QuicClientConfigBuilder::default() + .with_request_timeout(IggyDuration::from_str("60s").unwrap()) + .build() + .unwrap(); + assert_eq!( + config.request_timeout, + IggyDuration::from_str("60s").unwrap() + ); + } } diff --git a/core/common/src/types/configuration/tcp_config/tcp_client_config.rs b/core/common/src/types/configuration/tcp_config/tcp_client_config.rs index 86997c6d7a..dd76ac85dc 100644 --- a/core/common/src/types/configuration/tcp_config/tcp_client_config.rs +++ b/core/common/src/types/configuration/tcp_config/tcp_client_config.rs @@ -41,6 +41,8 @@ pub struct TcpClientConfig { pub reconnection: TcpClientReconnectionConfig, /// Interval of heartbeats sent by the client pub heartbeat_interval: IggyDuration, + /// Per-request timeout for send/receive operations. + pub request_timeout: IggyDuration, /// Disable Nagle algorithm for the TCP socket. pub nodelay: bool, } @@ -54,6 +56,7 @@ impl Default for TcpClientConfig { tls_ca_file: None, tls_validate_certificate: true, heartbeat_interval: IggyDuration::from_str("5s").unwrap(), + request_timeout: IggyDuration::from_str("300s").unwrap(), auto_login: AutoLogin::Disabled, reconnection: TcpClientReconnectionConfig::default(), nodelay: false, @@ -73,6 +76,7 @@ impl From> for TcpClientConfig { tls_validate_certificate: true, reconnection: connection_string.options().reconnection().to_owned(), heartbeat_interval: connection_string.options().heartbeat_interval(), + request_timeout: IggyDuration::from_str("300s").unwrap(), nodelay: connection_string.options().nodelay(), } } diff --git a/core/common/src/types/configuration/tcp_config/tcp_client_config_builder.rs b/core/common/src/types/configuration/tcp_config/tcp_client_config_builder.rs index 6f665a5777..54944ad4a1 100644 --- a/core/common/src/types/configuration/tcp_config/tcp_client_config_builder.rs +++ b/core/common/src/types/configuration/tcp_config/tcp_client_config_builder.rs @@ -101,6 +101,12 @@ impl TcpClientConfigBuilder { self } + /// Sets the per-request timeout for send/receive operations. + pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self { + self.config.request_timeout = request_timeout; + self + } + /// Builds the TCP client configuration. pub fn build(mut self) -> Result { self.config.server_address = self.config.server_address.trim().to_owned(); @@ -114,6 +120,7 @@ impl TcpClientConfigBuilder { mod tests { use super::*; use crate::IggyError; + use std::str::FromStr; fn builder_with_address(addr: &str) -> TcpClientConfigBuilder { let mut builder = TcpClientConfigBuilder::default(); @@ -187,4 +194,16 @@ mod tests { let builder = builder_with_address("iggy-server:8090"); assert!(builder.build().is_ok()); } + + #[test] + fn with_request_timeout_should_override_default() { + let config = TcpClientConfigBuilder::default() + .with_request_timeout(IggyDuration::from_str("60s").unwrap()) + .build() + .unwrap(); + assert_eq!( + config.request_timeout, + IggyDuration::from_str("60s").unwrap() + ); + } } diff --git a/core/common/src/types/configuration/websocket_config/websocket_client_config.rs b/core/common/src/types/configuration/websocket_config/websocket_client_config.rs index d93c168a95..f757eab24a 100644 --- a/core/common/src/types/configuration/websocket_config/websocket_client_config.rs +++ b/core/common/src/types/configuration/websocket_config/websocket_client_config.rs @@ -33,6 +33,8 @@ pub struct WebSocketClientConfig { pub reconnection: WebSocketClientReconnectionConfig, /// Interval of heartbeats sent by the client pub heartbeat_interval: IggyDuration, + /// Per-request timeout for send/receive operations. + pub request_timeout: IggyDuration, /// WebSocket-specific configuration. pub ws_config: WebSocketConfig, /// Whether tls is enabled @@ -70,6 +72,7 @@ impl Default for WebSocketClientConfig { auto_login: AutoLogin::Disabled, reconnection: WebSocketClientReconnectionConfig::default(), heartbeat_interval: IggyDuration::from_str("5s").unwrap(), + request_timeout: IggyDuration::from_str("300s").unwrap(), ws_config: WebSocketConfig::default(), tls_enabled: false, tls_domain: "localhost".to_string(), @@ -156,6 +159,7 @@ impl From> for WebSocketClien auto_login: connection_string.auto_login().to_owned(), reconnection: connection_string.options().reconnection().to_owned(), heartbeat_interval: connection_string.options().heartbeat_interval(), + request_timeout: IggyDuration::from_str("300s").unwrap(), ws_config, tls_enabled: options.tls_enabled(), tls_domain: options.tls_domain().into(), diff --git a/core/common/src/types/configuration/websocket_config/websocket_client_config_builder.rs b/core/common/src/types/configuration/websocket_config/websocket_client_config_builder.rs index 626c0e683e..1b1f16d0ba 100644 --- a/core/common/src/types/configuration/websocket_config/websocket_client_config_builder.rs +++ b/core/common/src/types/configuration/websocket_config/websocket_client_config_builder.rs @@ -136,6 +136,12 @@ impl WebSocketClientConfigBuilder { self } + /// Sets the per-request timeout for send/receive operations. + pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self { + self.config.request_timeout = request_timeout; + self + } + /// Builds the WebSocket client configuration. pub fn build(mut self) -> Result { self.config.server_address = self.config.server_address.trim().to_owned(); @@ -148,6 +154,7 @@ impl WebSocketClientConfigBuilder { #[cfg(test)] mod tests { use super::*; + use std::str::FromStr; #[test] fn build_should_trim_and_validate_server_address() { @@ -177,4 +184,16 @@ mod tests { assert!(result.is_err()); } + + #[test] + fn with_request_timeout_should_override_default() { + let config = WebSocketClientConfigBuilder::default() + .with_request_timeout(IggyDuration::from_str("60s").unwrap()) + .build() + .unwrap(); + assert_eq!( + config.request_timeout, + IggyDuration::from_str("60s").unwrap() + ); + } } diff --git a/core/integration/tests/cli/general/test_help_command.rs b/core/integration/tests/cli/general/test_help_command.rs index 91a55099b8..5da76ae228 100644 --- a/core/integration/tests/cli/general/test_help_command.rs +++ b/core/integration/tests/cli/general/test_help_command.rs @@ -190,6 +190,11 @@ Options: {CLAP_INDENT} [default: "1s"] + --request-timeout + The optional per-request timeout for send/receive operations +{CLAP_INDENT} + [default: "300s"] + -q, --quiet Quiet mode (disabled stdout printing) diff --git a/core/sdk/src/client_provider.rs b/core/sdk/src/client_provider.rs index 423b7048ea..60f55e1f9b 100644 --- a/core/sdk/src/client_provider.rs +++ b/core/sdk/src/client_provider.rs @@ -120,6 +120,7 @@ impl ClientProviderConfig { keep_alive_interval: args.quic_keep_alive_interval, max_idle_timeout: args.quic_max_idle_timeout, validate_certificate: args.quic_validate_certificate, + request_timeout: IggyDuration::from_str(&args.request_timeout).unwrap(), })); } TransportProtocol::Http => { @@ -148,6 +149,7 @@ impl ClientProviderConfig { ) .unwrap(), }, + request_timeout: IggyDuration::from_str(&args.request_timeout).unwrap(), auto_login: if auto_login { AutoLogin::Enabled(Credentials::UsernamePassword( args.username, @@ -181,6 +183,7 @@ impl ClientProviderConfig { } else { AutoLogin::Disabled }, + request_timeout: IggyDuration::from_str(&args.request_timeout).unwrap(), ws_config: WebSocketConfig::default(), tls_enabled: args.websocket_tls_enabled, tls_domain: args.websocket_tls_domain, diff --git a/core/sdk/src/clients/client_builder.rs b/core/sdk/src/clients/client_builder.rs index 0af473b554..f07e6db5fa 100644 --- a/core/sdk/src/clients/client_builder.rs +++ b/core/sdk/src/clients/client_builder.rs @@ -219,6 +219,12 @@ impl TcpClientBuilder { self } + /// Sets the per-request timeout for send/receive operations. + pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self { + self.config = self.config.with_request_timeout(request_timeout); + self + } + /// Builds the parent `IggyClient` with TCP configuration. pub fn build(self) -> Result { let client = TcpClient::create(Arc::new(self.config.build()?))?; @@ -277,6 +283,12 @@ impl QuicClientBuilder { self } + /// Sets the per-request timeout for send/receive operations. + pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self { + self.config = self.config.with_request_timeout(request_timeout); + self + } + /// Builds the parent `IggyClient` with QUIC configuration. pub fn build(self) -> Result { let client = QuicClient::create(Arc::new(self.config.build()?))?; @@ -390,6 +402,12 @@ impl WebSocketClientBuilder { self } + /// Sets the per-request timeout for send/receive operations. + pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self { + self.config = self.config.with_request_timeout(request_timeout); + self + } + /// Builds the parent `IggyClient` with WebSocket configuration. pub fn build(self) -> Result { let client = WebSocketClient::create(Arc::new(self.config.build()?))?; diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs index 2aacab7185..d9017039dc 100644 --- a/core/sdk/src/quic/quic_client.rs +++ b/core/sdk/src/quic/quic_client.rs @@ -642,78 +642,86 @@ impl QuicClient { let connection = self.connection.clone(); let response_buffer_size = self.config.response_buffer_size; + let request_timeout = self.config.request_timeout; #[cfg(feature = "vsr")] let consensus_session = self.consensus_session.clone(); // SAFETY: we run code holding the `connection` lock in a task so we can't be cancelled while holding the lock. tokio::spawn(async move { - let connection = connection.lock().await; - if let Some(connection) = connection.as_ref() { - #[cfg(feature = "vsr")] - let (request_header, request_size) = { - let mut consensus_session = consensus_session - .lock() - .expect("consensus session mutex poisoned"); - crate::vsr::encode_request_header(&mut consensus_session, code, &payload)? - }; - #[cfg(not(feature = "vsr"))] - let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; - let (mut send, mut recv) = connection.open_bi().await.map_err(|error| { - error!("Failed to open a bidirectional stream: {error}"); - IggyError::QuicError - })?; - trace!("Sending a QUIC request with code: {code}"); - #[cfg(feature = "vsr")] - trace!( - "Sending a QUIC VSR request of size {} with code: {code}", - request_size - ); - #[cfg(feature = "vsr")] - send.write_all(bytemuck::bytes_of(&request_header)) - .await - .map_err(|error| { - error!("Failed to write VSR request header: {error}"); + let io = async { + let connection = connection.lock().await; + if let Some(connection) = connection.as_ref() { + #[cfg(feature = "vsr")] + let (request_header, request_size) = { + let mut consensus_session = consensus_session + .lock() + .expect("consensus session mutex poisoned"); + crate::vsr::encode_request_header(&mut consensus_session, code, &payload)? + }; + #[cfg(not(feature = "vsr"))] + let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; + let (mut send, mut recv) = connection.open_bi().await.map_err(|error| { + error!("Failed to open a bidirectional stream: {error}"); + IggyError::QuicError + })?; + trace!("Sending a QUIC request with code: {code}"); + #[cfg(feature = "vsr")] + trace!( + "Sending a QUIC VSR request of size {} with code: {code}", + request_size + ); + #[cfg(feature = "vsr")] + send.write_all(bytemuck::bytes_of(&request_header)) + .await + .map_err(|error| { + error!("Failed to write VSR request header: {error}"); + IggyError::QuicError + })?; + #[cfg(feature = "vsr")] + if !payload.is_empty() { + send.write_all(&payload).await.map_err(|error| { + error!("Failed to write VSR request payload: {error}"); + IggyError::QuicError + })?; + } + #[cfg(feature = "vsr")] + send.finish().map_err(|error| { + error!("Failed to finish VSR request stream: {error}"); IggyError::QuicError })?; - #[cfg(feature = "vsr")] - if !payload.is_empty() { + #[cfg(not(feature = "vsr"))] + send.write_all(&(payload_length as u32).to_le_bytes()) + .await + .map_err(|error| { + error!("Failed to write payload length: {error}"); + IggyError::QuicError + })?; + #[cfg(not(feature = "vsr"))] + send.write_all(&code.to_le_bytes()).await.map_err(|error| { + error!("Failed to write payload code: {error}"); + IggyError::QuicError + })?; + #[cfg(not(feature = "vsr"))] send.write_all(&payload).await.map_err(|error| { - error!("Failed to write VSR request payload: {error}"); + error!("Failed to write payload: {error}"); IggyError::QuicError })?; - } - #[cfg(feature = "vsr")] - send.finish().map_err(|error| { - error!("Failed to finish VSR request stream: {error}"); - IggyError::QuicError - })?; - #[cfg(not(feature = "vsr"))] - send.write_all(&(payload_length as u32).to_le_bytes()) - .await - .map_err(|error| { - error!("Failed to write payload length: {error}"); + #[cfg(not(feature = "vsr"))] + send.finish().map_err(|error| { + error!("Failed to finish sending data: {error}"); IggyError::QuicError })?; - #[cfg(not(feature = "vsr"))] - send.write_all(&code.to_le_bytes()).await.map_err(|error| { - error!("Failed to write payload code: {error}"); - IggyError::QuicError - })?; - #[cfg(not(feature = "vsr"))] - send.write_all(&payload).await.map_err(|error| { - error!("Failed to write payload: {error}"); - IggyError::QuicError - })?; - #[cfg(not(feature = "vsr"))] - send.finish().map_err(|error| { - error!("Failed to finish sending data: {error}"); - IggyError::QuicError - })?; - trace!("Sent a QUIC request with code: {code}, waiting for a response..."); - return QuicClient::handle_response(&mut recv, response_buffer_size as usize).await; - } + trace!("Sent a QUIC request with code: {code}, waiting for a response..."); + return QuicClient::handle_response(&mut recv, response_buffer_size as usize) + .await; + } - error!("Cannot send data. Client is not connected."); - Err(IggyError::NotConnected) + error!("Cannot send data. Client is not connected."); + Err(IggyError::NotConnected) + }; + + tokio::time::timeout(request_timeout.get_duration(), io) + .await + .map_err(|_| IggyError::RequestTimeout(request_timeout))? }) .await .map_err(|e| { @@ -968,6 +976,10 @@ mod tests { quic_client_config.heartbeat_interval, IggyDuration::from_str("5s").unwrap() ); + assert_eq!( + quic_client_config.request_timeout, + IggyDuration::from_str("300s").unwrap() + ); assert!(quic_client_config.reconnection.enabled); assert!(quic_client_config.reconnection.max_retries.is_none()); diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index f6e6f4980e..2bc0dedb51 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -111,7 +111,6 @@ impl Client for TcpClient { } } -#[async_trait] #[async_trait] impl BinaryTransport for TcpClient { async fn get_state(&self) -> ClientState { @@ -681,144 +680,151 @@ impl TcpClient { } let stream = self.stream.clone(); + let request_timeout = self.config.request_timeout; #[cfg(feature = "vsr")] let consensus_session = self.consensus_session.clone(); // SAFETY: we run code holding the `stream` lock in a task so we can't be cancelled while holding the lock. let result = tokio::spawn(async move { - let mut stream = stream.lock().await; - if let Some(stream) = stream.as_mut() { - #[cfg(feature = "vsr")] - let (request_header, request_size) = { - let mut consensus_session = consensus_session - .lock() - .expect("consensus session mutex poisoned"); - crate::vsr::encode_request_header(&mut consensus_session, code, &payload)? - }; - #[cfg(not(feature = "vsr"))] - let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; - #[cfg(feature = "vsr")] - trace!( - "Sending a TCP VSR request of size {} with code: {code}", - request_size - ); - #[cfg(not(feature = "vsr"))] - trace!("Sending a TCP request of size {payload_length} with code: {code}"); - #[cfg(feature = "vsr")] - stream.write(bytemuck::bytes_of(&request_header)).await?; - #[cfg(feature = "vsr")] - if !payload.is_empty() { - stream.write(&payload).await?; - } - #[cfg(not(feature = "vsr"))] - stream.write(&(payload_length as u32).to_le_bytes()).await?; - #[cfg(not(feature = "vsr"))] - stream.write(&code.to_le_bytes()).await?; - #[cfg(not(feature = "vsr"))] - stream.write(&payload).await?; - stream.flush().await?; - trace!("Sent a TCP request with code: {code}, waiting for a response..."); - #[cfg(feature = "vsr")] - { - let mut response_header = [0u8; iggy_binary_protocol::HEADER_SIZE]; - // `stream.read` delegates to `read_exact`; on success it - // always returns the requested length, so no short-read - // guard is needed here. - // - // Deadline guards against server-side reply loss (e.g. a - // stalled replication quorum that never commits the op): - // the connection is lockstep, so an unanswered read would - // hold the stream lock forever and wedge every later - // request on this client. On expiry drop the stream -- - // a late reply would desync framing for the next request. - // - // One deadline spans BOTH the header and body reads: a - // reply that delivers a header then stalls must not get a - // fresh full timeout for the body (which would allow up to - // 2x `RESPONSE_READ_TIMEOUT` total). - let response_deadline = tokio::time::Instant::now() + RESPONSE_READ_TIMEOUT; - let header_read = - tokio::time::timeout_at(response_deadline, stream.read(&mut response_header)) - .await; - let Ok(header_read) = header_read else { - error!( - "Timed out after {RESPONSE_READ_TIMEOUT:?} waiting for VSR response header for TCP request with code: {code}", - ); - return Err(IggyError::Disconnected); + let io = async { + let mut stream = stream.lock().await; + if let Some(stream) = stream.as_mut() { + #[cfg(feature = "vsr")] + let (request_header, request_size) = { + let mut consensus_session = consensus_session + .lock() + .expect("consensus session mutex poisoned"); + crate::vsr::encode_request_header(&mut consensus_session, code, &payload)? }; - header_read.map_err(|error| { - error!( - "Failed to read VSR response header for TCP request with code: {code}: {error}", - code = code, - error = error - ); - IggyError::Disconnected - })?; - - let response_size = crate::vsr::response_size(&response_header)?; - - let body_size = response_size - iggy_binary_protocol::HEADER_SIZE; - let body = if body_size > 0 { - let mut body = BytesMut::with_capacity(body_size); - let body_read = tokio::time::timeout_at( - response_deadline, - stream.read_buf(&mut body, body_size), - ) - .await; - let Ok(body_read) = body_read else { + #[cfg(not(feature = "vsr"))] + let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; + #[cfg(feature = "vsr")] + trace!( + "Sending a TCP VSR request of size {} with code: {code}", + request_size + ); + #[cfg(not(feature = "vsr"))] + trace!("Sending a TCP request of size {payload_length} with code: {code}"); + #[cfg(feature = "vsr")] + stream.write(bytemuck::bytes_of(&request_header)).await?; + #[cfg(feature = "vsr")] + if !payload.is_empty() { + stream.write(&payload).await?; + } + #[cfg(not(feature = "vsr"))] + stream.write(&(payload_length as u32).to_le_bytes()).await?; + #[cfg(not(feature = "vsr"))] + stream.write(&code.to_le_bytes()).await?; + #[cfg(not(feature = "vsr"))] + stream.write(&payload).await?; + stream.flush().await?; + trace!("Sent a TCP request with code: {code}, waiting for a response..."); + #[cfg(feature = "vsr")] + { + let mut response_header = [0u8; iggy_binary_protocol::HEADER_SIZE]; + // `stream.read` delegates to `read_exact`; on success it + // always returns the requested length, so no short-read + // guard is needed here. + // + // Deadline guards against server-side reply loss (e.g. a + // stalled replication quorum that never commits the op): + // the connection is lockstep, so an unanswered read would + // hold the stream lock forever and wedge every later + // request on this client. On expiry drop the stream -- + // a late reply would desync framing for the next request. + // + // One deadline spans BOTH the header and body reads: a + // reply that delivers a header then stalls must not get a + // fresh full timeout for the body (which would allow up to + // 2x `RESPONSE_READ_TIMEOUT` total). + let response_deadline = tokio::time::Instant::now() + RESPONSE_READ_TIMEOUT; + let header_read = + tokio::time::timeout_at(response_deadline, stream.read(&mut response_header)) + .await; + let Ok(header_read) = header_read else { error!( - "Timed out after {RESPONSE_READ_TIMEOUT:?} waiting for VSR response body for TCP request with code: {code}", + "Timed out after {RESPONSE_READ_TIMEOUT:?} waiting for VSR response header for TCP request with code: {code}", ); return Err(IggyError::Disconnected); }; - body_read.map_err(|error| { + header_read.map_err(|error| { error!( - "Failed to read VSR response body for TCP request with code: {code}: {error}", + "Failed to read VSR response header for TCP request with code: {code}: {error}", code = code, error = error ); IggyError::Disconnected })?; - body.freeze() - } else { - Bytes::new() - }; - return crate::vsr::decode_response_split(&response_header, body); - } + let response_size = crate::vsr::response_size(&response_header)?; - #[cfg(not(feature = "vsr"))] - { - let mut response_buffer = [0u8; RESPONSE_INITIAL_BYTES_LENGTH]; - let read_bytes = stream.read(&mut response_buffer).await.map_err(|error| { - error!( - "Failed to read response for TCP request with code: {code}: {error}", - code = code, - error = error - ); - IggyError::Disconnected - })?; + let body_size = response_size - iggy_binary_protocol::HEADER_SIZE; + let body = if body_size > 0 { + let mut body = BytesMut::with_capacity(body_size); + let body_read = tokio::time::timeout_at( + response_deadline, + stream.read_buf(&mut body, body_size), + ) + .await; + let Ok(body_read) = body_read else { + error!( + "Timed out after {RESPONSE_READ_TIMEOUT:?} waiting for VSR response body for TCP request with code: {code}", + ); + return Err(IggyError::Disconnected); + }; + body_read.map_err(|error| { + error!( + "Failed to read VSR response body for TCP request with code: {code}: {error}", + code = code, + error = error + ); + IggyError::Disconnected + })?; + body.freeze() + } else { + Bytes::new() + }; - if read_bytes != RESPONSE_INITIAL_BYTES_LENGTH { - error!("Received an invalid or empty response."); - return Err(IggyError::EmptyResponse); + return crate::vsr::decode_response_split(&response_header, body); } - let status = u32::from_le_bytes( - response_buffer[..4] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - let length = u32::from_le_bytes( - response_buffer[4..] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - return TcpClient::handle_response(status, length, stream).await; + #[cfg(not(feature = "vsr"))] + { + let mut response_buffer = [0u8; RESPONSE_INITIAL_BYTES_LENGTH]; + let read_bytes = stream.read(&mut response_buffer).await.map_err(|error| { + error!( + "Failed to read response for TCP request with code: {code}: {error}", + code = code, + error = error + ); + IggyError::Disconnected + })?; + + if read_bytes != RESPONSE_INITIAL_BYTES_LENGTH { + error!("Received an invalid or empty response."); + return Err(IggyError::EmptyResponse); + } + + let status = u32::from_le_bytes( + response_buffer[..4] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ); + let length = u32::from_le_bytes( + response_buffer[4..] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ); + return TcpClient::handle_response(status, length, stream).await; + } } - } - error!("Cannot send data. Client is not connected."); - Err(IggyError::NotConnected) + error!("Cannot send data. Client is not connected."); + Err(IggyError::NotConnected) + }; + + tokio::time::timeout(request_timeout.get_duration(), io) + .await + .map_err(|_| IggyError::RequestTimeout(request_timeout))? }) .await .map_err(|e| { @@ -1038,6 +1044,10 @@ mod tests { tcp_client_config.heartbeat_interval, IggyDuration::from_str("5s").unwrap() ); + assert_eq!( + tcp_client_config.request_timeout, + IggyDuration::from_str("300s").unwrap() + ); assert!(tcp_client_config.reconnection.enabled); assert!(tcp_client_config.reconnection.max_retries.is_none()); @@ -1185,4 +1195,32 @@ mod tests { let tcp_client = TcpClient::handle_response(0, 50, &mut stream).await; assert!(tcp_client.is_err()); } + + #[cfg(not(feature = "vsr"))] + #[tokio::test] + async fn should_return_request_timeout_when_server_does_not_respond() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + let (_stream, _) = listener.accept().await.unwrap(); + std::future::pending::<()>().await; + }); + + let config = TcpClientConfig { + server_address: addr.to_string(), + request_timeout: IggyDuration::from_str("100ms").unwrap(), + ..Default::default() + }; + let client = TcpClient::create(Arc::new(config)).unwrap(); + + let tcp_stream = TcpStream::connect(addr).await.unwrap(); + let client_addr = tcp_stream.local_addr().unwrap(); + let stream = ConnectionStreamKind::Tcp(TcpConnectionStream::new(client_addr, tcp_stream)); + *client.stream.lock().await = Some(stream); + *client.state.lock().await = ClientState::Connected; + + let result = client.send_raw(1, Bytes::new()).await; + assert!(matches!(result, Err(IggyError::RequestTimeout(_)))); + } } diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs index e1a9838398..4940e8e29a 100644 --- a/core/sdk/src/websocket/websocket_client.rs +++ b/core/sdk/src/websocket/websocket_client.rs @@ -657,153 +657,160 @@ impl WebSocketClient { ClientState::Connected | ClientState::Authenticating | ClientState::Authenticated => {} } - let mut stream_guard = self.stream.lock().await; - let stream = stream_guard.as_mut().ok_or_else(|| { - trace!("Cannot send data. Client is not connected."); - IggyError::NotConnected - })?; + let io = async { + let mut stream_guard = self.stream.lock().await; + let stream = stream_guard.as_mut().ok_or_else(|| { + trace!("Cannot send data. Client is not connected."); + IggyError::NotConnected + })?; - #[cfg(feature = "vsr")] - let request = { - let mut consensus_session = self - .consensus_session - .lock() - .expect("consensus session mutex poisoned"); - crate::vsr::encode_contiguous_request(&mut consensus_session, code, &payload)? - }; - #[cfg(not(feature = "vsr"))] - let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; - #[cfg(not(feature = "vsr"))] - let mut request = BytesMut::with_capacity(4 + REQUEST_INITIAL_BYTES_LENGTH + payload.len()); - #[cfg(not(feature = "vsr"))] - request.put_u32_le(payload_length as u32); - #[cfg(not(feature = "vsr"))] - request.put_u32_le(code); - #[cfg(not(feature = "vsr"))] - request.put_slice(&payload); - - trace!( - "Sending {NAME} message with code: {}, payload size: {} bytes", - code, - payload.len() - ); - #[cfg(feature = "vsr")] - trace!( - "Sending {NAME} VSR request of size {} with code: {code}", - request.len() - ); + #[cfg(feature = "vsr")] + let request = { + let mut consensus_session = self + .consensus_session + .lock() + .expect("consensus session mutex poisoned"); + crate::vsr::encode_contiguous_request(&mut consensus_session, code, &payload)? + }; + #[cfg(not(feature = "vsr"))] + let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; + #[cfg(not(feature = "vsr"))] + let mut request = + BytesMut::with_capacity(4 + REQUEST_INITIAL_BYTES_LENGTH + payload.len()); + #[cfg(not(feature = "vsr"))] + request.put_u32_le(payload_length as u32); + #[cfg(not(feature = "vsr"))] + request.put_u32_le(code); + #[cfg(not(feature = "vsr"))] + request.put_slice(&payload); - stream.write(&request).await?; - stream.flush().await?; + trace!( + "Sending {NAME} message with code: {}, payload size: {} bytes", + code, + payload.len() + ); + #[cfg(feature = "vsr")] + trace!( + "Sending {NAME} VSR request of size {} with code: {code}", + request.len() + ); - #[cfg(feature = "vsr")] - { - // Mirror the TCP path: header onto stack, body into its own - // buffer, `decode_response_split` slices without concatenation. - // Old path did `vec![0; HEADER_SIZE]` + `vec![0; body_size]` - // (two zero-fills) + `BytesMut::with_capacity(response_size)` + - // two `put_slice` memcopies per reply. - // One deadline spans both the header and body reads so a reply - // that delivers a header then stalls cannot wait up to 2x the - // timeout. On expiry drop the stream: the read runs inline here - // (no spawned task to cancel), so without an explicit drop a late - // reply would desync framing for the next request. - let response_deadline = tokio::time::Instant::now() + RESPONSE_READ_TIMEOUT; - let mut response_header = [0u8; iggy_binary_protocol::HEADER_SIZE]; - let header_read = - tokio::time::timeout_at(response_deadline, stream.read(&mut response_header)).await; - let Ok(header_read) = header_read else { - error!( - "Timed out after {RESPONSE_READ_TIMEOUT:?} waiting for {NAME} VSR response header for request with code: {code}" - ); - *stream_guard = None; - return Err(IggyError::Disconnected); - }; - header_read?; - - let response_size = crate::vsr::response_size(&response_header)?; - let body_size = response_size - iggy_binary_protocol::HEADER_SIZE; - let body = if body_size > 0 { - // `WebSocketStreamKind::read` reads into a slice without a - // zero-fill prerequisite; we still allocate `body_size` but - // skip the header concatenation. - let mut body = vec![0u8; body_size]; - let body_read = - tokio::time::timeout_at(response_deadline, stream.read(&mut body)).await; - let Ok(body_read) = body_read else { + stream.write(&request).await?; + stream.flush().await?; + + #[cfg(feature = "vsr")] + { + // Mirror the TCP path: header onto stack, body into its own + // buffer, `decode_response_split` slices without concatenation. + // Old path did `vec![0; HEADER_SIZE]` + `vec![0; body_size]` + // (two zero-fills) + `BytesMut::with_capacity(response_size)` + + // two `put_slice` memcopies per reply. + // One deadline spans both the header and body reads so a reply + // that delivers a header then stalls cannot wait up to 2x the + // timeout. On expiry drop the stream: the read runs inline here + // (no spawned task to cancel), so without an explicit drop a late + // reply would desync framing for the next request. + let response_deadline = tokio::time::Instant::now() + RESPONSE_READ_TIMEOUT; + let mut response_header = [0u8; iggy_binary_protocol::HEADER_SIZE]; + let header_read = + tokio::time::timeout_at(response_deadline, stream.read(&mut response_header)).await; + let Ok(header_read) = header_read else { error!( - "Timed out after {RESPONSE_READ_TIMEOUT:?} waiting for {NAME} VSR response body for request with code: {code}" + "Timed out after {RESPONSE_READ_TIMEOUT:?} waiting for {NAME} VSR response header for request with code: {code}" ); *stream_guard = None; return Err(IggyError::Disconnected); }; - body_read?; - Bytes::from(body) - } else { - Bytes::new() - }; + header_read?; + + let response_size = crate::vsr::response_size(&response_header)?; + let body_size = response_size - iggy_binary_protocol::HEADER_SIZE; + let body = if body_size > 0 { + // `WebSocketStreamKind::read` reads into a slice without a + // zero-fill prerequisite; we still allocate `body_size` but + // skip the header concatenation. + let mut body = vec![0u8; body_size]; + let body_read = + tokio::time::timeout_at(response_deadline, stream.read(&mut body)).await; + let Ok(body_read) = body_read else { + error!( + "Timed out after {RESPONSE_READ_TIMEOUT:?} waiting for {NAME} VSR response body for request with code: {code}" + ); + *stream_guard = None; + return Err(IggyError::Disconnected); + }; + body_read?; + Bytes::from(body) + } else { + Bytes::new() + }; + + crate::vsr::decode_response_split(&response_header, body) + } - crate::vsr::decode_response_split(&response_header, body) - } + #[cfg(not(feature = "vsr"))] + { + let mut response_initial_buffer = vec![0u8; RESPONSE_INITIAL_BYTES_LENGTH]; + stream.read(&mut response_initial_buffer).await?; + + let status = u32::from_le_bytes([ + response_initial_buffer[0], + response_initial_buffer[1], + response_initial_buffer[2], + response_initial_buffer[3], + ]); + + let length = u32::from_le_bytes([ + response_initial_buffer[4], + response_initial_buffer[5], + response_initial_buffer[6], + response_initial_buffer[7], + ]) as usize; + + trace!( + "Received {NAME} response status: {}, length: {} bytes", + status, length + ); - #[cfg(not(feature = "vsr"))] - { - let mut response_initial_buffer = vec![0u8; RESPONSE_INITIAL_BYTES_LENGTH]; - stream.read(&mut response_initial_buffer).await?; - - let status = u32::from_le_bytes([ - response_initial_buffer[0], - response_initial_buffer[1], - response_initial_buffer[2], - response_initial_buffer[3], - ]); - - let length = u32::from_le_bytes([ - response_initial_buffer[4], - response_initial_buffer[5], - response_initial_buffer[6], - response_initial_buffer[7], - ]) as usize; + if status != 0 { + // TEMP: See https://github.com/apache/iggy/pull/604 for context. + if status == IggyErrorDiscriminants::TopicNameAlreadyExists as u32 + || status == IggyErrorDiscriminants::StreamNameAlreadyExists as u32 + || status == IggyErrorDiscriminants::UserAlreadyExists as u32 + || status == IggyErrorDiscriminants::PersonalAccessTokenAlreadyExists as u32 + || status == IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32 + { + debug!( + "Received a server resource already exists response: {} ({})", + status, + IggyError::from_code_as_string(status) + ) + } else { + error!( + "Received an invalid response with status: {} ({}).", + status, + IggyError::from_code_as_string(status), + ); + } - trace!( - "Received {NAME} response status: {}, length: {} bytes", - status, length - ); + return Err(IggyError::from_code(status)); + } - if status != 0 { - // TEMP: See https://github.com/apache/iggy/pull/604 for context. - if status == IggyErrorDiscriminants::TopicNameAlreadyExists as u32 - || status == IggyErrorDiscriminants::StreamNameAlreadyExists as u32 - || status == IggyErrorDiscriminants::UserAlreadyExists as u32 - || status == IggyErrorDiscriminants::PersonalAccessTokenAlreadyExists as u32 - || status == IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32 - { - debug!( - "Received a server resource already exists response: {} ({})", - status, - IggyError::from_code_as_string(status) - ) - } else { - error!( - "Received an invalid response with status: {} ({}).", - status, - IggyError::from_code_as_string(status), - ); + if length == 0 { + return Ok(Bytes::new()); } - return Err(IggyError::from_code(status)); - } + let mut response_buffer = vec![0u8; length]; + stream.read(&mut response_buffer).await?; - if length == 0 { - return Ok(Bytes::new()); + trace!("Received {NAME} response payload, size: {} bytes", length); + Ok(Bytes::from(response_buffer)) } + }; - let mut response_buffer = vec![0u8; length]; - stream.read(&mut response_buffer).await?; - - trace!("Received {NAME} response payload, size: {} bytes", length); - Ok(Bytes::from(response_buffer)) - } + tokio::time::timeout(self.config.request_timeout.get_duration(), io) + .await + .map_err(|_| IggyError::RequestTimeout(self.config.request_timeout))? } } @@ -825,6 +832,10 @@ mod tests { client.config.heartbeat_interval, IggyDuration::from_str("5s").unwrap() ); + assert_eq!( + client.config.request_timeout, + IggyDuration::from_str("300s").unwrap() + ); assert!(matches!(client.config.auto_login, AutoLogin::Disabled)); assert!(client.config.reconnection.enabled); } diff --git a/examples/rust/src/shared/args.rs b/examples/rust/src/shared/args.rs index ef43f5fb40..4b61a48215 100644 --- a/examples/rust/src/shared/args.rs +++ b/examples/rust/src/shared/args.rs @@ -190,6 +190,9 @@ pub struct Args { #[arg(long, default_value = "false")] pub websocket_tls_validate_certificate: bool, + + #[arg(long, default_value = "300s")] + pub request_timeout: String, } impl Args { @@ -262,6 +265,7 @@ impl Default for Args { websocket_tls_domain: "localhost".to_string(), websocket_tls_ca_file: None, websocket_tls_validate_certificate: false, + request_timeout: "300s".to_string(), } } } @@ -373,6 +377,7 @@ impl Args { websocket_tls_domain: self.websocket_tls_domain.clone(), websocket_tls_ca_file: self.websocket_tls_ca_file.clone(), websocket_tls_validate_certificate: self.websocket_tls_validate_certificate, + request_timeout: self.request_timeout.clone(), } } From f6da496855f46a3975f1bbe3af57cb0c4f892aea Mon Sep 17 00:00:00 2001 From: chengxi Date: Sun, 14 Jun 2026 22:59:30 -0400 Subject: [PATCH 02/10] chore(sdk): reduce request timeout from 300s to 30s --- core/common/src/types/args/mod.rs | 4 ++-- .../src/types/configuration/quic_config/quic_client_config.rs | 4 ++-- .../src/types/configuration/tcp_config/tcp_client_config.rs | 4 ++-- .../configuration/websocket_config/websocket_client_config.rs | 4 ++-- core/integration/tests/cli/general/test_help_command.rs | 2 +- core/sdk/src/quic/quic_client.rs | 2 +- core/sdk/src/tcp/tcp_client.rs | 2 +- core/sdk/src/websocket/websocket_client.rs | 2 +- examples/rust/src/shared/args.rs | 4 ++-- 9 files changed, 14 insertions(+), 14 deletions(-) diff --git a/core/common/src/types/args/mod.rs b/core/common/src/types/args/mod.rs index f590da907c..d9ea93167b 100644 --- a/core/common/src/types/args/mod.rs +++ b/core/common/src/types/args/mod.rs @@ -221,7 +221,7 @@ pub struct ArgsOptional { /// The optional per-request timeout for send/receive operations /// - /// [default: "300s"] + /// [default: "30s"] #[arg(long)] #[serde(skip_serializing_if = "Option::is_none")] pub request_timeout: Option, @@ -426,7 +426,7 @@ impl Default for Args { websocket_tls_domain: "localhost".to_string(), websocket_tls_ca_file: None, websocket_tls_validate_certificate: false, - request_timeout: "300s".to_string(), + request_timeout: "30s".to_string(), } } } diff --git a/core/common/src/types/configuration/quic_config/quic_client_config.rs b/core/common/src/types/configuration/quic_config/quic_client_config.rs index f186281770..7350f6c04b 100644 --- a/core/common/src/types/configuration/quic_config/quic_client_config.rs +++ b/core/common/src/types/configuration/quic_config/quic_client_config.rs @@ -66,7 +66,7 @@ impl Default for QuicClientConfig { server_name: "localhost".to_string(), auto_login: AutoLogin::Disabled, heartbeat_interval: IggyDuration::from_str("5s").unwrap(), - request_timeout: IggyDuration::from_str("300s").unwrap(), + request_timeout: IggyDuration::from_str("30s").unwrap(), reconnection: QuicClientReconnectionConfig::default(), response_buffer_size: 1000 * 1000 * 10, max_concurrent_bidi_streams: 10000, @@ -99,7 +99,7 @@ impl From> for QuicClientConfig { max_idle_timeout: connection_string.options().max_idle_timeout(), validate_certificate: connection_string.options().validate_certificate(), heartbeat_interval: connection_string.options().heartbeat_interval(), - request_timeout: IggyDuration::from_str("300s").unwrap(), + request_timeout: IggyDuration::from_str("30s").unwrap(), } } } diff --git a/core/common/src/types/configuration/tcp_config/tcp_client_config.rs b/core/common/src/types/configuration/tcp_config/tcp_client_config.rs index dd76ac85dc..578b70be13 100644 --- a/core/common/src/types/configuration/tcp_config/tcp_client_config.rs +++ b/core/common/src/types/configuration/tcp_config/tcp_client_config.rs @@ -56,7 +56,7 @@ impl Default for TcpClientConfig { tls_ca_file: None, tls_validate_certificate: true, heartbeat_interval: IggyDuration::from_str("5s").unwrap(), - request_timeout: IggyDuration::from_str("300s").unwrap(), + request_timeout: IggyDuration::from_str("30s").unwrap(), auto_login: AutoLogin::Disabled, reconnection: TcpClientReconnectionConfig::default(), nodelay: false, @@ -76,7 +76,7 @@ impl From> for TcpClientConfig { tls_validate_certificate: true, reconnection: connection_string.options().reconnection().to_owned(), heartbeat_interval: connection_string.options().heartbeat_interval(), - request_timeout: IggyDuration::from_str("300s").unwrap(), + request_timeout: IggyDuration::from_str("30s").unwrap(), nodelay: connection_string.options().nodelay(), } } diff --git a/core/common/src/types/configuration/websocket_config/websocket_client_config.rs b/core/common/src/types/configuration/websocket_config/websocket_client_config.rs index f757eab24a..f50183183f 100644 --- a/core/common/src/types/configuration/websocket_config/websocket_client_config.rs +++ b/core/common/src/types/configuration/websocket_config/websocket_client_config.rs @@ -72,7 +72,7 @@ impl Default for WebSocketClientConfig { auto_login: AutoLogin::Disabled, reconnection: WebSocketClientReconnectionConfig::default(), heartbeat_interval: IggyDuration::from_str("5s").unwrap(), - request_timeout: IggyDuration::from_str("300s").unwrap(), + request_timeout: IggyDuration::from_str("30s").unwrap(), ws_config: WebSocketConfig::default(), tls_enabled: false, tls_domain: "localhost".to_string(), @@ -159,7 +159,7 @@ impl From> for WebSocketClien auto_login: connection_string.auto_login().to_owned(), reconnection: connection_string.options().reconnection().to_owned(), heartbeat_interval: connection_string.options().heartbeat_interval(), - request_timeout: IggyDuration::from_str("300s").unwrap(), + request_timeout: IggyDuration::from_str("30s").unwrap(), ws_config, tls_enabled: options.tls_enabled(), tls_domain: options.tls_domain().into(), diff --git a/core/integration/tests/cli/general/test_help_command.rs b/core/integration/tests/cli/general/test_help_command.rs index 5da76ae228..266bd9da9c 100644 --- a/core/integration/tests/cli/general/test_help_command.rs +++ b/core/integration/tests/cli/general/test_help_command.rs @@ -193,7 +193,7 @@ Options: --request-timeout The optional per-request timeout for send/receive operations {CLAP_INDENT} - [default: "300s"] + [default: "30s"] -q, --quiet Quiet mode (disabled stdout printing) diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs index d9017039dc..8f26fd29c4 100644 --- a/core/sdk/src/quic/quic_client.rs +++ b/core/sdk/src/quic/quic_client.rs @@ -978,7 +978,7 @@ mod tests { ); assert_eq!( quic_client_config.request_timeout, - IggyDuration::from_str("300s").unwrap() + IggyDuration::from_str("30s").unwrap() ); assert!(quic_client_config.reconnection.enabled); diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index 2bc0dedb51..c08491747d 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -1046,7 +1046,7 @@ mod tests { ); assert_eq!( tcp_client_config.request_timeout, - IggyDuration::from_str("300s").unwrap() + IggyDuration::from_str("30s").unwrap() ); assert!(tcp_client_config.reconnection.enabled); diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs index 4940e8e29a..dba5f5c9b1 100644 --- a/core/sdk/src/websocket/websocket_client.rs +++ b/core/sdk/src/websocket/websocket_client.rs @@ -834,7 +834,7 @@ mod tests { ); assert_eq!( client.config.request_timeout, - IggyDuration::from_str("300s").unwrap() + IggyDuration::from_str("30s").unwrap() ); assert!(matches!(client.config.auto_login, AutoLogin::Disabled)); assert!(client.config.reconnection.enabled); diff --git a/examples/rust/src/shared/args.rs b/examples/rust/src/shared/args.rs index 4b61a48215..c95451fdba 100644 --- a/examples/rust/src/shared/args.rs +++ b/examples/rust/src/shared/args.rs @@ -191,7 +191,7 @@ pub struct Args { #[arg(long, default_value = "false")] pub websocket_tls_validate_certificate: bool, - #[arg(long, default_value = "300s")] + #[arg(long, default_value = "30s")] pub request_timeout: String, } @@ -265,7 +265,7 @@ impl Default for Args { websocket_tls_domain: "localhost".to_string(), websocket_tls_ca_file: None, websocket_tls_validate_certificate: false, - request_timeout: "300s".to_string(), + request_timeout: "30s".to_string(), } } } From 35166e438ba27e4f05a7d7c8e7fd98c7cd364547 Mon Sep 17 00:00:00 2001 From: chengxi Date: Mon, 15 Jun 2026 20:07:54 -0400 Subject: [PATCH 03/10] fix: reset stream and VSR session on request timeout --- core/sdk/src/quic/quic_client.rs | 16 ++++++++++++---- core/sdk/src/tcp/tcp_client.rs | 19 +++++++++++++++---- core/sdk/src/websocket/websocket_client.rs | 18 +++++++++++++++--- 3 files changed, 42 insertions(+), 11 deletions(-) diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs index 8f26fd29c4..9cb50233d4 100644 --- a/core/sdk/src/quic/quic_client.rs +++ b/core/sdk/src/quic/quic_client.rs @@ -645,7 +645,6 @@ impl QuicClient { let request_timeout = self.config.request_timeout; #[cfg(feature = "vsr")] let consensus_session = self.consensus_session.clone(); - // SAFETY: we run code holding the `connection` lock in a task so we can't be cancelled while holding the lock. tokio::spawn(async move { let io = async { let connection = connection.lock().await; @@ -719,9 +718,18 @@ impl QuicClient { Err(IggyError::NotConnected) }; - tokio::time::timeout(request_timeout.get_duration(), io) - .await - .map_err(|_| IggyError::RequestTimeout(request_timeout))? + match tokio::time::timeout(request_timeout.get_duration(), io).await { + Ok(result) => result, + Err(_) => { + #[cfg(feature = "vsr")] + { + *consensus_session + .lock() + .expect("consensus session mutex poisoned") = ConsensusSession::new(); + } + Err(IggyError::RequestTimeout(request_timeout)) + } + } }) .await .map_err(|e| { diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index c08491747d..b1e0699ecd 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -683,7 +683,6 @@ impl TcpClient { let request_timeout = self.config.request_timeout; #[cfg(feature = "vsr")] let consensus_session = self.consensus_session.clone(); - // SAFETY: we run code holding the `stream` lock in a task so we can't be cancelled while holding the lock. let result = tokio::spawn(async move { let io = async { let mut stream = stream.lock().await; @@ -822,9 +821,21 @@ impl TcpClient { Err(IggyError::NotConnected) }; - tokio::time::timeout(request_timeout.get_duration(), io) - .await - .map_err(|_| IggyError::RequestTimeout(request_timeout))? + match tokio::time::timeout(request_timeout.get_duration(), io).await { + Ok(result) => result, + Err(_) => { + // Reset to prevent response desync on the shared stream. + *stream.lock().await = None; + #[cfg(feature = "vsr")] + { + *consensus_session + .lock() + .expect("consensus session mutex poisoned") = + ConsensusSession::new(); + } + Err(IggyError::RequestTimeout(request_timeout)) + } + } }) .await .map_err(|e| { diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs index dba5f5c9b1..d3f5d08d7b 100644 --- a/core/sdk/src/websocket/websocket_client.rs +++ b/core/sdk/src/websocket/websocket_client.rs @@ -808,9 +808,21 @@ impl WebSocketClient { } }; - tokio::time::timeout(self.config.request_timeout.get_duration(), io) - .await - .map_err(|_| IggyError::RequestTimeout(self.config.request_timeout))? + match tokio::time::timeout(self.config.request_timeout.get_duration(), io).await { + Ok(result) => result, + Err(_) => { + // Reset to prevent response desync on the shared stream. + *self.stream.lock().await = None; + #[cfg(feature = "vsr")] + { + *self + .consensus_session + .lock() + .expect("consensus session mutex poisoned") = ConsensusSession::new(); + } + Err(IggyError::RequestTimeout(self.config.request_timeout)) + } + } } } From dd32824936dea2b9e50a5f5cff7548b8666177e1 Mon Sep 17 00:00:00 2001 From: chengxi Date: Tue, 16 Jun 2026 01:03:57 -0400 Subject: [PATCH 04/10] fix(sdk): avoid instant fail when request_timeout is zero --- core/sdk/src/quic/quic_client.rs | 23 +++++++++++------- core/sdk/src/tcp/tcp_client.rs | 28 ++++++++++++---------- core/sdk/src/websocket/websocket_client.rs | 28 ++++++++++++---------- 3 files changed, 46 insertions(+), 33 deletions(-) diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs index 9cb50233d4..e58b8eaa85 100644 --- a/core/sdk/src/quic/quic_client.rs +++ b/core/sdk/src/quic/quic_client.rs @@ -718,16 +718,21 @@ impl QuicClient { Err(IggyError::NotConnected) }; - match tokio::time::timeout(request_timeout.get_duration(), io).await { - Ok(result) => result, - Err(_) => { - #[cfg(feature = "vsr")] - { - *consensus_session - .lock() - .expect("consensus session mutex poisoned") = ConsensusSession::new(); + if request_timeout.is_zero() { + io.await + } else { + match tokio::time::timeout(request_timeout.get_duration(), io).await { + Ok(result) => result, + Err(_) => { + #[cfg(feature = "vsr")] + { + *consensus_session + .lock() + .expect("consensus session mutex poisoned") = + ConsensusSession::new(); + } + Err(IggyError::RequestTimeout(request_timeout)) } - Err(IggyError::RequestTimeout(request_timeout)) } } }) diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index b1e0699ecd..c1612c767e 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -821,19 +821,23 @@ impl TcpClient { Err(IggyError::NotConnected) }; - match tokio::time::timeout(request_timeout.get_duration(), io).await { - Ok(result) => result, - Err(_) => { - // Reset to prevent response desync on the shared stream. - *stream.lock().await = None; - #[cfg(feature = "vsr")] - { - *consensus_session - .lock() - .expect("consensus session mutex poisoned") = - ConsensusSession::new(); + if request_timeout.is_zero() { + io.await + } else { + match tokio::time::timeout(request_timeout.get_duration(), io).await { + Ok(result) => result, + Err(_) => { + // Reset to prevent response desync on the shared stream. + *stream.lock().await = None; + #[cfg(feature = "vsr")] + { + *consensus_session + .lock() + .expect("consensus session mutex poisoned") = + ConsensusSession::new(); + } + Err(IggyError::RequestTimeout(request_timeout)) } - Err(IggyError::RequestTimeout(request_timeout)) } } }) diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs index d3f5d08d7b..0bb7cb3596 100644 --- a/core/sdk/src/websocket/websocket_client.rs +++ b/core/sdk/src/websocket/websocket_client.rs @@ -808,19 +808,23 @@ impl WebSocketClient { } }; - match tokio::time::timeout(self.config.request_timeout.get_duration(), io).await { - Ok(result) => result, - Err(_) => { - // Reset to prevent response desync on the shared stream. - *self.stream.lock().await = None; - #[cfg(feature = "vsr")] - { - *self - .consensus_session - .lock() - .expect("consensus session mutex poisoned") = ConsensusSession::new(); + if self.config.request_timeout.is_zero() { + io.await + } else { + match tokio::time::timeout(self.config.request_timeout.get_duration(), io).await { + Ok(result) => result, + Err(_) => { + // Reset to prevent response desync on the shared stream. + *self.stream.lock().await = None; + #[cfg(feature = "vsr")] + { + *self + .consensus_session + .lock() + .expect("consensus session mutex poisoned") = ConsensusSession::new(); + } + Err(IggyError::RequestTimeout(self.config.request_timeout)) } - Err(IggyError::RequestTimeout(self.config.request_timeout)) } } } From 9ef1a20cb659b034d6a2bb233256a90c5a82a148 Mon Sep 17 00:00:00 2001 From: chengxi Date: Tue, 16 Jun 2026 01:29:25 -0400 Subject: [PATCH 05/10] fix(sdk): fix the hard-coded request_timeout in connection string --- .../quic_config/quic_client_config.rs | 2 +- .../quic_config/quic_connection_string_options.rs | 15 +++++++++++++++ .../configuration/tcp_config/tcp_client_config.rs | 2 +- .../tcp_config/tcp_connection_string_options.rs | 15 +++++++++++++++ .../websocket_config/websocket_client_config.rs | 2 +- .../websocket_connection_string_options.rs | 10 ++++++++++ 6 files changed, 43 insertions(+), 3 deletions(-) diff --git a/core/common/src/types/configuration/quic_config/quic_client_config.rs b/core/common/src/types/configuration/quic_config/quic_client_config.rs index 7350f6c04b..778c7e94bd 100644 --- a/core/common/src/types/configuration/quic_config/quic_client_config.rs +++ b/core/common/src/types/configuration/quic_config/quic_client_config.rs @@ -99,7 +99,7 @@ impl From> for QuicClientConfig { max_idle_timeout: connection_string.options().max_idle_timeout(), validate_certificate: connection_string.options().validate_certificate(), heartbeat_interval: connection_string.options().heartbeat_interval(), - request_timeout: IggyDuration::from_str("30s").unwrap(), + request_timeout: connection_string.options().request_timeout(), } } } diff --git a/core/common/src/types/configuration/quic_config/quic_connection_string_options.rs b/core/common/src/types/configuration/quic_config/quic_connection_string_options.rs index db937557e0..7dea982399 100644 --- a/core/common/src/types/configuration/quic_config/quic_connection_string_options.rs +++ b/core/common/src/types/configuration/quic_config/quic_connection_string_options.rs @@ -31,6 +31,7 @@ pub struct QuicConnectionStringOptions { max_idle_timeout: u64, validate_certificate: bool, heartbeat_interval: IggyDuration, + request_timeout: IggyDuration, } impl QuicConnectionStringOptions { @@ -73,6 +74,10 @@ impl QuicConnectionStringOptions { pub fn validate_certificate(&self) -> bool { self.validate_certificate } + + pub fn request_timeout(&self) -> IggyDuration { + self.request_timeout + } } impl ConnectionStringOptions for QuicConnectionStringOptions { @@ -96,6 +101,7 @@ impl ConnectionStringOptions for QuicConnectionStringOptions { let mut max_idle_timeout = 10000; let mut validate_certificate = false; let mut heartbeat_interval = "5s".to_owned(); + let mut request_timeout = "30s".to_owned(); // For reconnection config let mut reconnection_max_retries = "unlimited".to_owned(); @@ -178,6 +184,9 @@ impl ConnectionStringOptions for QuicConnectionStringOptions { "heartbeat_interval" => { heartbeat_interval = option_parts[1].to_string(); } + "request_timeout" => { + request_timeout = option_parts[1].to_string(); + } "reconnection_max_retries" => { reconnection_max_retries = option_parts[1].to_string(); } @@ -211,6 +220,8 @@ impl ConnectionStringOptions for QuicConnectionStringOptions { let heartbeat_interval = IggyDuration::from_str(heartbeat_interval.as_str()) .map_err(|_| IggyError::InvalidConnectionString)?; + let request_timeout = IggyDuration::from_str(request_timeout.as_str()) + .map_err(|_| IggyError::InvalidConnectionString)?; let connection_string_options = QuicConnectionStringOptions::new( reconnection, @@ -224,6 +235,7 @@ impl ConnectionStringOptions for QuicConnectionStringOptions { max_idle_timeout, validate_certificate, heartbeat_interval, + request_timeout, ); Ok(connection_string_options) @@ -244,6 +256,7 @@ impl QuicConnectionStringOptions { max_idle_timeout: u64, validate_certificate: bool, heartbeat_interval: IggyDuration, + request_timeout: IggyDuration, ) -> Self { Self { reconnection, @@ -257,6 +270,7 @@ impl QuicConnectionStringOptions { max_idle_timeout, validate_certificate, heartbeat_interval, + request_timeout, } } } @@ -275,6 +289,7 @@ impl Default for QuicConnectionStringOptions { max_idle_timeout: 10000, validate_certificate: false, heartbeat_interval: IggyDuration::from_str("5s").unwrap(), + request_timeout: IggyDuration::from_str("30s").unwrap(), } } } diff --git a/core/common/src/types/configuration/tcp_config/tcp_client_config.rs b/core/common/src/types/configuration/tcp_config/tcp_client_config.rs index 578b70be13..573bebb686 100644 --- a/core/common/src/types/configuration/tcp_config/tcp_client_config.rs +++ b/core/common/src/types/configuration/tcp_config/tcp_client_config.rs @@ -76,7 +76,7 @@ impl From> for TcpClientConfig { tls_validate_certificate: true, reconnection: connection_string.options().reconnection().to_owned(), heartbeat_interval: connection_string.options().heartbeat_interval(), - request_timeout: IggyDuration::from_str("30s").unwrap(), + request_timeout: connection_string.options().request_timeout(), nodelay: connection_string.options().nodelay(), } } diff --git a/core/common/src/types/configuration/tcp_config/tcp_connection_string_options.rs b/core/common/src/types/configuration/tcp_config/tcp_connection_string_options.rs index 87ebd54490..99d560a382 100644 --- a/core/common/src/types/configuration/tcp_config/tcp_connection_string_options.rs +++ b/core/common/src/types/configuration/tcp_config/tcp_connection_string_options.rs @@ -25,6 +25,7 @@ pub struct TcpConnectionStringOptions { tls_ca_file: Option, reconnection: TcpClientReconnectionConfig, heartbeat_interval: IggyDuration, + request_timeout: IggyDuration, nodelay: bool, } @@ -45,6 +46,10 @@ impl TcpConnectionStringOptions { &self.reconnection } + pub fn request_timeout(&self) -> IggyDuration { + self.request_timeout + } + pub fn nodelay(&self) -> bool { self.nodelay } @@ -68,6 +73,7 @@ impl ConnectionStringOptions for TcpConnectionStringOptions { let mut reconnection_interval = "1s".to_owned(); let mut reestablish_after = "5s".to_owned(); let mut heartbeat_interval = "5s".to_owned(); + let mut request_timeout = "30s".to_owned(); let mut nodelay = false; for option in options { @@ -97,6 +103,9 @@ impl ConnectionStringOptions for TcpConnectionStringOptions { "heartbeat_interval" => { heartbeat_interval = option_parts[1].to_string(); } + "request_timeout" => { + request_timeout = option_parts[1].to_string(); + } "nodelay" => { nodelay = option_parts[1] == "true"; } @@ -124,6 +133,8 @@ impl ConnectionStringOptions for TcpConnectionStringOptions { let heartbeat_interval = IggyDuration::from_str(heartbeat_interval.as_str()) .map_err(|_| IggyError::InvalidConnectionString)?; + let request_timeout = IggyDuration::from_str(request_timeout.as_str()) + .map_err(|_| IggyError::InvalidConnectionString)?; let connection_string_options = TcpConnectionStringOptions::new( tls_enabled, @@ -131,6 +142,7 @@ impl ConnectionStringOptions for TcpConnectionStringOptions { tls_ca_file, reconnection, heartbeat_interval, + request_timeout, nodelay, ); @@ -145,6 +157,7 @@ impl TcpConnectionStringOptions { tls_ca_file: Option, reconnection: TcpClientReconnectionConfig, heartbeat_interval: IggyDuration, + request_timeout: IggyDuration, nodelay: bool, ) -> Self { Self { @@ -153,6 +166,7 @@ impl TcpConnectionStringOptions { tls_ca_file, reconnection, heartbeat_interval, + request_timeout, nodelay, } } @@ -166,6 +180,7 @@ impl Default for TcpConnectionStringOptions { tls_ca_file: None, reconnection: Default::default(), heartbeat_interval: IggyDuration::from_str("5s").unwrap(), + request_timeout: IggyDuration::from_str("30s").unwrap(), nodelay: false, } } diff --git a/core/common/src/types/configuration/websocket_config/websocket_client_config.rs b/core/common/src/types/configuration/websocket_config/websocket_client_config.rs index f50183183f..cc8ca14937 100644 --- a/core/common/src/types/configuration/websocket_config/websocket_client_config.rs +++ b/core/common/src/types/configuration/websocket_config/websocket_client_config.rs @@ -159,7 +159,7 @@ impl From> for WebSocketClien auto_login: connection_string.auto_login().to_owned(), reconnection: connection_string.options().reconnection().to_owned(), heartbeat_interval: connection_string.options().heartbeat_interval(), - request_timeout: IggyDuration::from_str("30s").unwrap(), + request_timeout: connection_string.options().request_timeout(), ws_config, tls_enabled: options.tls_enabled(), tls_domain: options.tls_domain().into(), diff --git a/core/common/src/types/configuration/websocket_config/websocket_connection_string_options.rs b/core/common/src/types/configuration/websocket_config/websocket_connection_string_options.rs index 85c5fb2053..6f63c1acfa 100644 --- a/core/common/src/types/configuration/websocket_config/websocket_connection_string_options.rs +++ b/core/common/src/types/configuration/websocket_config/websocket_connection_string_options.rs @@ -21,6 +21,7 @@ use std::str::FromStr; #[derive(Debug, Clone)] pub struct WebSocketConnectionStringOptions { heartbeat_interval: IggyDuration, + request_timeout: IggyDuration, reconnection: WebSocketClientReconnectionConfig, read_buffer_size: Option, @@ -42,6 +43,10 @@ impl WebSocketConnectionStringOptions { self.heartbeat_interval } + pub fn request_timeout(&self) -> IggyDuration { + self.request_timeout + } + pub fn reconnection(&self) -> &WebSocketClientReconnectionConfig { &self.reconnection } @@ -114,6 +119,10 @@ impl ConnectionStringOptions for WebSocketConnectionStringOptions { parsed_options.heartbeat_interval = IggyDuration::from_str(parts[1]) .map_err(|_| IggyError::InvalidConnectionString)?; } + "request_timeout" => { + parsed_options.request_timeout = IggyDuration::from_str(parts[1]) + .map_err(|_| IggyError::InvalidConnectionString)?; + } "reconnection_retries" => { let retries = match parts[1] { "unlimited" => None, @@ -193,6 +202,7 @@ impl Default for WebSocketConnectionStringOptions { fn default() -> Self { WebSocketConnectionStringOptions { heartbeat_interval: IggyDuration::from_str("5s").unwrap(), + request_timeout: IggyDuration::from_str("30s").unwrap(), reconnection: WebSocketClientReconnectionConfig::default(), read_buffer_size: None, write_buffer_size: None, From e847d7d4fb82b6d54646100436729953a7447938 Mon Sep 17 00:00:00 2001 From: chengxi Date: Tue, 16 Jun 2026 18:16:03 -0400 Subject: [PATCH 06/10] fix(sdk): return error instead of panicking on invalid duration args --- core/common/src/error/client_error.rs | 3 ++ core/sdk/src/client_provider.rs | 42 +++++++++++++-------------- 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/core/common/src/error/client_error.rs b/core/common/src/error/client_error.rs index 2517526b47..09448000be 100644 --- a/core/common/src/error/client_error.rs +++ b/core/common/src/error/client_error.rs @@ -29,6 +29,9 @@ pub enum ClientError { /// Transport is invalid and cannot be used. #[error("Invalid transport {0}")] InvalidTransport(String), + /// Duration value is invalid. + #[error("Invalid duration: {0}")] + InvalidDuration(String), /// IO error. #[error("IO error")] IoError(#[from] io::Error), diff --git a/core/sdk/src/client_provider.rs b/core/sdk/src/client_provider.rs index 60f55e1f9b..390bd6057e 100644 --- a/core/sdk/src/client_provider.rs +++ b/core/sdk/src/client_provider.rs @@ -86,22 +86,25 @@ impl ClientProviderConfig { tcp: None, websocket: None, }; + let parse_duration = |value: &str| -> Result { + IggyDuration::from_str(value) + .map_err(|_| ClientError::InvalidDuration(value.to_owned())) + }; + match config.transport { TransportProtocol::Quic => { config.quic = Some(Arc::new(QuicClientConfig { client_address: args.quic_client_address, server_address: args.quic_server_address, server_name: args.quic_server_name, - heartbeat_interval: IggyDuration::from_str(&args.quic_heartbeat_interval) - .unwrap(), + heartbeat_interval: parse_duration(&args.quic_heartbeat_interval)?, reconnection: QuicClientReconnectionConfig { enabled: args.quic_reconnection_enabled, max_retries: args.quic_reconnection_max_retries, - interval: IggyDuration::from_str(&args.quic_reconnection_interval).unwrap(), - reestablish_after: IggyDuration::from_str( + interval: parse_duration(&args.quic_reconnection_interval)?, + reestablish_after: parse_duration( &args.quic_reconnection_reestablish_after, - ) - .unwrap(), + )?, }, auto_login: if auto_login { AutoLogin::Enabled(Credentials::UsernamePassword( @@ -120,7 +123,7 @@ impl ClientProviderConfig { keep_alive_interval: args.quic_keep_alive_interval, max_idle_timeout: args.quic_max_idle_timeout, validate_certificate: args.quic_validate_certificate, - request_timeout: IggyDuration::from_str(&args.request_timeout).unwrap(), + request_timeout: parse_duration(&args.request_timeout)?, })); } TransportProtocol::Http => { @@ -138,18 +141,16 @@ impl ClientProviderConfig { tls_ca_file: args.tcp_tls_ca_file, tls_validate_certificate: true, nodelay: args.tcp_nodelay, - heartbeat_interval: IggyDuration::from_str(&args.tcp_heartbeat_interval) - .unwrap(), + heartbeat_interval: parse_duration(&args.tcp_heartbeat_interval)?, reconnection: TcpClientReconnectionConfig { enabled: args.tcp_reconnection_enabled, max_retries: args.tcp_reconnection_max_retries, - interval: IggyDuration::from_str(&args.tcp_reconnection_interval).unwrap(), - reestablish_after: IggyDuration::from_str( + interval: parse_duration(&args.tcp_reconnection_interval)?, + reestablish_after: parse_duration( &args.tcp_reconnection_reestablish_after, - ) - .unwrap(), + )?, }, - request_timeout: IggyDuration::from_str(&args.request_timeout).unwrap(), + request_timeout: parse_duration(&args.request_timeout)?, auto_login: if auto_login { AutoLogin::Enabled(Credentials::UsernamePassword( args.username, @@ -163,17 +164,14 @@ impl ClientProviderConfig { TransportProtocol::WebSocket => { config.websocket = Some(Arc::new(WebSocketClientConfig { server_address: args.websocket_server_address, - heartbeat_interval: IggyDuration::from_str(&args.websocket_heartbeat_interval) - .unwrap(), + heartbeat_interval: parse_duration(&args.websocket_heartbeat_interval)?, reconnection: WebSocketClientReconnectionConfig { enabled: args.websocket_reconnection_enabled, max_retries: args.websocket_reconnection_max_retries, - interval: IggyDuration::from_str(&args.websocket_reconnection_interval) - .unwrap(), - reestablish_after: IggyDuration::from_str( + interval: parse_duration(&args.websocket_reconnection_interval)?, + reestablish_after: parse_duration( &args.websocket_reconnection_reestablish_after, - ) - .unwrap(), + )?, }, auto_login: if auto_login { AutoLogin::Enabled(Credentials::UsernamePassword( @@ -183,7 +181,7 @@ impl ClientProviderConfig { } else { AutoLogin::Disabled }, - request_timeout: IggyDuration::from_str(&args.request_timeout).unwrap(), + request_timeout: parse_duration(&args.request_timeout)?, ws_config: WebSocketConfig::default(), tls_enabled: args.websocket_tls_enabled, tls_domain: args.websocket_tls_domain, From 8378bd84421ca3acfa039b1f7be13a6c9ca78a93 Mon Sep 17 00:00:00 2001 From: chengxi Date: Tue, 16 Jun 2026 19:44:50 -0400 Subject: [PATCH 07/10] test: test the duration args validation --- core/sdk/src/client_provider.rs | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/core/sdk/src/client_provider.rs b/core/sdk/src/client_provider.rs index 390bd6057e..12010d3f8d 100644 --- a/core/sdk/src/client_provider.rs +++ b/core/sdk/src/client_provider.rs @@ -250,3 +250,33 @@ pub async fn get_raw_client( } } } + +#[cfg(test)] +mod tests { + use super::*; + use iggy_common::Args; + + #[test] + fn from_args_should_fail_on_invalid_request_timeout() { + let args = Args { + request_timeout: "not-a-duration".to_string(), + ..Default::default() + }; + let result = ClientProviderConfig::from_args(args); + assert!(matches!(result, Err(ClientError::InvalidDuration(_)))); + } + + #[test] + fn from_args_should_succeed_with_valid_request_timeout() { + let args = Args { + request_timeout: "10s".to_string(), + ..Default::default() + }; + let config = ClientProviderConfig::from_args(args).unwrap(); + let tcp_config = config.tcp.unwrap(); + assert_eq!( + tcp_config.request_timeout, + IggyDuration::from_str("10s").unwrap() + ); + } +} From b08a9a2594df580e0c4d92298d2d84f927a3ea3a Mon Sep 17 00:00:00 2001 From: chengxi Date: Sat, 20 Jun 2026 18:18:10 -0400 Subject: [PATCH 08/10] cargo fmt --- core/sdk/src/websocket/websocket_client.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs index 0bb7cb3596..02e8951167 100644 --- a/core/sdk/src/websocket/websocket_client.rs +++ b/core/sdk/src/websocket/websocket_client.rs @@ -713,7 +713,8 @@ impl WebSocketClient { let response_deadline = tokio::time::Instant::now() + RESPONSE_READ_TIMEOUT; let mut response_header = [0u8; iggy_binary_protocol::HEADER_SIZE]; let header_read = - tokio::time::timeout_at(response_deadline, stream.read(&mut response_header)).await; + tokio::time::timeout_at(response_deadline, stream.read(&mut response_header)) + .await; let Ok(header_read) = header_read else { error!( "Timed out after {RESPONSE_READ_TIMEOUT:?} waiting for {NAME} VSR response header for request with code: {code}" @@ -740,10 +741,10 @@ impl WebSocketClient { return Err(IggyError::Disconnected); }; body_read?; - Bytes::from(body) - } else { - Bytes::new() - }; + Bytes::from(body) + } else { + Bytes::new() + }; crate::vsr::decode_response_split(&response_header, body) } From ee779fce5682b003cff48c1f08c8953e95e7639e Mon Sep 17 00:00:00 2001 From: chengxi Date: Sun, 21 Jun 2026 00:37:22 -0400 Subject: [PATCH 09/10] fix(sdk): use request_timeout instead of inner VSR timeout --- core/sdk/src/tcp/tcp_client.rs | 80 ++++++---------------- core/sdk/src/websocket/websocket_client.rs | 43 +----------- 2 files changed, 24 insertions(+), 99 deletions(-) diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index c1612c767e..47525b14b4 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -57,12 +57,6 @@ const REQUEST_INITIAL_BYTES_LENGTH: usize = 4; #[cfg(not(feature = "vsr"))] const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8; const NAME: &str = "Iggy"; -/// Upper bound for awaiting a reply on the lockstep VSR connection. Far -/// beyond any healthy round-trip; only trips when the server loses the -/// reply entirely (e.g. stalled replication quorum), which would otherwise -/// hold the stream lock forever and wedge the client. -#[cfg(feature = "vsr")] -const RESPONSE_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); /// TCP client for interacting with the Iggy API. /// It requires a valid server address. @@ -720,64 +714,34 @@ impl TcpClient { #[cfg(feature = "vsr")] { let mut response_header = [0u8; iggy_binary_protocol::HEADER_SIZE]; - // `stream.read` delegates to `read_exact`; on success it - // always returns the requested length, so no short-read - // guard is needed here. - // - // Deadline guards against server-side reply loss (e.g. a - // stalled replication quorum that never commits the op): - // the connection is lockstep, so an unanswered read would - // hold the stream lock forever and wedge every later - // request on this client. On expiry drop the stream -- - // a late reply would desync framing for the next request. - // - // One deadline spans BOTH the header and body reads: a - // reply that delivers a header then stalls must not get a - // fresh full timeout for the body (which would allow up to - // 2x `RESPONSE_READ_TIMEOUT` total). - let response_deadline = tokio::time::Instant::now() + RESPONSE_READ_TIMEOUT; - let header_read = - tokio::time::timeout_at(response_deadline, stream.read(&mut response_header)) - .await; - let Ok(header_read) = header_read else { - error!( - "Timed out after {RESPONSE_READ_TIMEOUT:?} waiting for VSR response header for TCP request with code: {code}", - ); - return Err(IggyError::Disconnected); - }; - header_read.map_err(|error| { - error!( - "Failed to read VSR response header for TCP request with code: {code}: {error}", - code = code, - error = error - ); - IggyError::Disconnected - })?; - - let response_size = crate::vsr::response_size(&response_header)?; - - let body_size = response_size - iggy_binary_protocol::HEADER_SIZE; - let body = if body_size > 0 { - let mut body = BytesMut::with_capacity(body_size); - let body_read = tokio::time::timeout_at( - response_deadline, - stream.read_buf(&mut body, body_size), - ) - .await; - let Ok(body_read) = body_read else { - error!( - "Timed out after {RESPONSE_READ_TIMEOUT:?} waiting for VSR response body for TCP request with code: {code}", - ); - return Err(IggyError::Disconnected); - }; - body_read.map_err(|error| { + stream + .read(&mut response_header) + .await + .map_err(|error| { error!( - "Failed to read VSR response body for TCP request with code: {code}: {error}", + "Failed to read VSR response header for TCP request with code: {code}: {error}", code = code, error = error ); IggyError::Disconnected })?; + + let response_size = crate::vsr::response_size(&response_header)?; + + let body_size = response_size - iggy_binary_protocol::HEADER_SIZE; + let body = if body_size > 0 { + let mut body = BytesMut::with_capacity(body_size); + stream + .read_buf(&mut body, body_size) + .await + .map_err(|error| { + error!( + "Failed to read VSR response body for TCP request with code: {code}: {error}", + code = code, + error = error + ); + IggyError::Disconnected + })?; body.freeze() } else { Bytes::new() diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs index 02e8951167..4ce40f2e3c 100644 --- a/core/sdk/src/websocket/websocket_client.rs +++ b/core/sdk/src/websocket/websocket_client.rs @@ -59,12 +59,6 @@ const REQUEST_INITIAL_BYTES_LENGTH: usize = 4; #[cfg(not(feature = "vsr"))] const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8; const NAME: &str = "WebSocket"; -/// Bound on how long a single VSR reply read may block. The connection is -/// lockstep and the read runs in the caller's task while holding the stream -/// lock, so an unanswered read (lost server reply) would wedge every later -/// request on this client forever. On expiry the stream is dropped. -#[cfg(feature = "vsr")] -const RESPONSE_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); #[derive(Debug)] pub struct WebSocketClient { @@ -700,47 +694,14 @@ impl WebSocketClient { #[cfg(feature = "vsr")] { - // Mirror the TCP path: header onto stack, body into its own - // buffer, `decode_response_split` slices without concatenation. - // Old path did `vec![0; HEADER_SIZE]` + `vec![0; body_size]` - // (two zero-fills) + `BytesMut::with_capacity(response_size)` + - // two `put_slice` memcopies per reply. - // One deadline spans both the header and body reads so a reply - // that delivers a header then stalls cannot wait up to 2x the - // timeout. On expiry drop the stream: the read runs inline here - // (no spawned task to cancel), so without an explicit drop a late - // reply would desync framing for the next request. - let response_deadline = tokio::time::Instant::now() + RESPONSE_READ_TIMEOUT; let mut response_header = [0u8; iggy_binary_protocol::HEADER_SIZE]; - let header_read = - tokio::time::timeout_at(response_deadline, stream.read(&mut response_header)) - .await; - let Ok(header_read) = header_read else { - error!( - "Timed out after {RESPONSE_READ_TIMEOUT:?} waiting for {NAME} VSR response header for request with code: {code}" - ); - *stream_guard = None; - return Err(IggyError::Disconnected); - }; - header_read?; + stream.read(&mut response_header).await?; let response_size = crate::vsr::response_size(&response_header)?; let body_size = response_size - iggy_binary_protocol::HEADER_SIZE; let body = if body_size > 0 { - // `WebSocketStreamKind::read` reads into a slice without a - // zero-fill prerequisite; we still allocate `body_size` but - // skip the header concatenation. let mut body = vec![0u8; body_size]; - let body_read = - tokio::time::timeout_at(response_deadline, stream.read(&mut body)).await; - let Ok(body_read) = body_read else { - error!( - "Timed out after {RESPONSE_READ_TIMEOUT:?} waiting for {NAME} VSR response body for request with code: {code}" - ); - *stream_guard = None; - return Err(IggyError::Disconnected); - }; - body_read?; + stream.read(&mut body).await?; Bytes::from(body) } else { Bytes::new() From 06588b9033c835807cc922f49df91cb29d91d788 Mon Sep 17 00:00:00 2001 From: chengxi Date: Sun, 21 Jun 2026 00:37:51 -0400 Subject: [PATCH 10/10] fix(sdk): change client to Disconnected state on RequestTimeout --- core/sdk/src/tcp/tcp_client.rs | 5 ++++- core/sdk/src/websocket/websocket_client.rs | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index 47525b14b4..c2a7a12836 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -811,7 +811,10 @@ impl TcpClient { IggyError::TcpError })?; - if matches!(result, Err(IggyError::Disconnected)) { + if matches!( + result, + Err(IggyError::Disconnected) | Err(IggyError::RequestTimeout(_)) + ) { // Reply stream state is unknown (timed out or torn mid-frame); // a late reply would desync framing for the next request, so // drop the connection and let callers reconnect. diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs index 4ce40f2e3c..a6a38ff55b 100644 --- a/core/sdk/src/websocket/websocket_client.rs +++ b/core/sdk/src/websocket/websocket_client.rs @@ -778,6 +778,7 @@ impl WebSocketClient { Err(_) => { // Reset to prevent response desync on the shared stream. *self.stream.lock().await = None; + self.set_state(ClientState::Disconnected).await; #[cfg(feature = "vsr")] { *self