diff --git a/core/common/src/error/client_error.rs b/core/common/src/error/client_error.rs index 2517526b47..09448000be 100644 --- a/core/common/src/error/client_error.rs +++ b/core/common/src/error/client_error.rs @@ -29,6 +29,9 @@ pub enum ClientError { /// Transport is invalid and cannot be used. #[error("Invalid transport {0}")] InvalidTransport(String), + /// Duration value is invalid. + #[error("Invalid duration: {0}")] + InvalidDuration(String), /// IO error. #[error("IO error")] IoError(#[from] io::Error), diff --git a/core/common/src/error/iggy_error.rs b/core/common/src/error/iggy_error.rs index 816d8fac57..0bf203cd1a 100644 --- a/core/common/src/error/iggy_error.rs +++ b/core/common/src/error/iggy_error.rs @@ -16,6 +16,7 @@ // under the License. use crate::Identifier; +use crate::IggyDuration; use crate::utils::topic_size::MaxTopicSize; use crate::{IggyMessage, utils::byte_size::IggyByteSize}; use std::sync::Arc; @@ -502,6 +503,8 @@ pub enum IggyError { CannotBindToSocket(String) = 12000, #[error("Task execution timeout")] TaskTimeout = 12001, + #[error("Request timeout after {0}")] + RequestTimeout(IggyDuration) = 12002, #[error("IO error: {0}")] IoError(String) = 13000, diff --git a/core/common/src/types/args/mod.rs b/core/common/src/types/args/mod.rs index f60b509740..d9ea93167b 100644 --- a/core/common/src/types/args/mod.rs +++ b/core/common/src/types/args/mod.rs @@ -218,6 +218,13 @@ pub struct ArgsOptional { #[arg(long)] #[serde(skip_serializing_if = "Option::is_none")] pub websocket_reconnection_interval: Option, + + /// The optional per-request timeout for send/receive operations + /// + /// [default: "30s"] + #[arg(long)] + #[serde(skip_serializing_if = "Option::is_none")] + pub request_timeout: Option, } /// The arguments used by the `ClientProviderConfig` to create a client. @@ -351,6 +358,9 @@ pub struct Args { /// The optional TLS validate certificate for the WebSocket transport pub websocket_tls_validate_certificate: bool, + + /// The per-request timeout for send/receive operations + pub request_timeout: String, } const QUIC_TRANSPORT: &str = "quic"; @@ -416,6 +426,7 @@ impl Default for Args { websocket_tls_domain: "localhost".to_string(), websocket_tls_ca_file: None, websocket_tls_validate_certificate: false, + request_timeout: "30s".to_string(), } } } @@ -517,6 +528,9 @@ impl From> for Args { { args.websocket_reconnection_interval = websocket_reconnection_interval; } + if let Some(request_timeout) = optional_args.request_timeout { + args.request_timeout = request_timeout; + } } args diff --git a/core/common/src/types/configuration/quic_config/quic_client_config.rs b/core/common/src/types/configuration/quic_config/quic_client_config.rs index b6e2b95e52..778c7e94bd 100644 --- a/core/common/src/types/configuration/quic_config/quic_client_config.rs +++ b/core/common/src/types/configuration/quic_config/quic_client_config.rs @@ -54,6 +54,8 @@ pub struct QuicClientConfig { pub validate_certificate: bool, /// Interval of heartbeats sent by the client pub heartbeat_interval: IggyDuration, + /// Per-request timeout for send/receive operations. + pub request_timeout: IggyDuration, } impl Default for QuicClientConfig { @@ -64,6 +66,7 @@ impl Default for QuicClientConfig { server_name: "localhost".to_string(), auto_login: AutoLogin::Disabled, heartbeat_interval: IggyDuration::from_str("5s").unwrap(), + request_timeout: IggyDuration::from_str("30s").unwrap(), reconnection: QuicClientReconnectionConfig::default(), response_buffer_size: 1000 * 1000 * 10, max_concurrent_bidi_streams: 10000, @@ -96,6 +99,7 @@ impl From> for QuicClientConfig { max_idle_timeout: connection_string.options().max_idle_timeout(), validate_certificate: connection_string.options().validate_certificate(), heartbeat_interval: connection_string.options().heartbeat_interval(), + request_timeout: connection_string.options().request_timeout(), } } } diff --git a/core/common/src/types/configuration/quic_config/quic_client_config_builder.rs b/core/common/src/types/configuration/quic_config/quic_client_config_builder.rs index 9af4e85024..62ec1a2431 100644 --- a/core/common/src/types/configuration/quic_config/quic_client_config_builder.rs +++ b/core/common/src/types/configuration/quic_config/quic_client_config_builder.rs @@ -152,6 +152,12 @@ impl QuicClientConfigBuilder { self } + /// Sets the per-request timeout for send/receive operations. + pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self { + self.config.request_timeout = request_timeout; + self + } + /// Finalizes the builder and returns the `QuicClientConfig`. pub fn build(mut self) -> Result { self.config.server_address = self.config.server_address.trim().to_owned(); @@ -164,6 +170,7 @@ impl QuicClientConfigBuilder { #[cfg(test)] mod tests { use super::*; + use std::str::FromStr; #[test] fn build_should_trim_and_validate_server_address() { @@ -193,4 +200,16 @@ mod tests { assert!(result.is_err()); } + + #[test] + fn with_request_timeout_should_override_default() { + let config = QuicClientConfigBuilder::default() + .with_request_timeout(IggyDuration::from_str("60s").unwrap()) + .build() + .unwrap(); + assert_eq!( + config.request_timeout, + IggyDuration::from_str("60s").unwrap() + ); + } } diff --git a/core/common/src/types/configuration/quic_config/quic_connection_string_options.rs b/core/common/src/types/configuration/quic_config/quic_connection_string_options.rs index db937557e0..7dea982399 100644 --- a/core/common/src/types/configuration/quic_config/quic_connection_string_options.rs +++ b/core/common/src/types/configuration/quic_config/quic_connection_string_options.rs @@ -31,6 +31,7 @@ pub struct QuicConnectionStringOptions { max_idle_timeout: u64, validate_certificate: bool, heartbeat_interval: IggyDuration, + request_timeout: IggyDuration, } impl QuicConnectionStringOptions { @@ -73,6 +74,10 @@ impl QuicConnectionStringOptions { pub fn validate_certificate(&self) -> bool { self.validate_certificate } + + pub fn request_timeout(&self) -> IggyDuration { + self.request_timeout + } } impl ConnectionStringOptions for QuicConnectionStringOptions { @@ -96,6 +101,7 @@ impl ConnectionStringOptions for QuicConnectionStringOptions { let mut max_idle_timeout = 10000; let mut validate_certificate = false; let mut heartbeat_interval = "5s".to_owned(); + let mut request_timeout = "30s".to_owned(); // For reconnection config let mut reconnection_max_retries = "unlimited".to_owned(); @@ -178,6 +184,9 @@ impl ConnectionStringOptions for QuicConnectionStringOptions { "heartbeat_interval" => { heartbeat_interval = option_parts[1].to_string(); } + "request_timeout" => { + request_timeout = option_parts[1].to_string(); + } "reconnection_max_retries" => { reconnection_max_retries = option_parts[1].to_string(); } @@ -211,6 +220,8 @@ impl ConnectionStringOptions for QuicConnectionStringOptions { let heartbeat_interval = IggyDuration::from_str(heartbeat_interval.as_str()) .map_err(|_| IggyError::InvalidConnectionString)?; + let request_timeout = IggyDuration::from_str(request_timeout.as_str()) + .map_err(|_| IggyError::InvalidConnectionString)?; let connection_string_options = QuicConnectionStringOptions::new( reconnection, @@ -224,6 +235,7 @@ impl ConnectionStringOptions for QuicConnectionStringOptions { max_idle_timeout, validate_certificate, heartbeat_interval, + request_timeout, ); Ok(connection_string_options) @@ -244,6 +256,7 @@ impl QuicConnectionStringOptions { max_idle_timeout: u64, validate_certificate: bool, heartbeat_interval: IggyDuration, + request_timeout: IggyDuration, ) -> Self { Self { reconnection, @@ -257,6 +270,7 @@ impl QuicConnectionStringOptions { max_idle_timeout, validate_certificate, heartbeat_interval, + request_timeout, } } } @@ -275,6 +289,7 @@ impl Default for QuicConnectionStringOptions { max_idle_timeout: 10000, validate_certificate: false, heartbeat_interval: IggyDuration::from_str("5s").unwrap(), + request_timeout: IggyDuration::from_str("30s").unwrap(), } } } diff --git a/core/common/src/types/configuration/tcp_config/tcp_client_config.rs b/core/common/src/types/configuration/tcp_config/tcp_client_config.rs index 86997c6d7a..573bebb686 100644 --- a/core/common/src/types/configuration/tcp_config/tcp_client_config.rs +++ b/core/common/src/types/configuration/tcp_config/tcp_client_config.rs @@ -41,6 +41,8 @@ pub struct TcpClientConfig { pub reconnection: TcpClientReconnectionConfig, /// Interval of heartbeats sent by the client pub heartbeat_interval: IggyDuration, + /// Per-request timeout for send/receive operations. + pub request_timeout: IggyDuration, /// Disable Nagle algorithm for the TCP socket. pub nodelay: bool, } @@ -54,6 +56,7 @@ impl Default for TcpClientConfig { tls_ca_file: None, tls_validate_certificate: true, heartbeat_interval: IggyDuration::from_str("5s").unwrap(), + request_timeout: IggyDuration::from_str("30s").unwrap(), auto_login: AutoLogin::Disabled, reconnection: TcpClientReconnectionConfig::default(), nodelay: false, @@ -73,6 +76,7 @@ impl From> for TcpClientConfig { tls_validate_certificate: true, reconnection: connection_string.options().reconnection().to_owned(), heartbeat_interval: connection_string.options().heartbeat_interval(), + request_timeout: connection_string.options().request_timeout(), nodelay: connection_string.options().nodelay(), } } diff --git a/core/common/src/types/configuration/tcp_config/tcp_client_config_builder.rs b/core/common/src/types/configuration/tcp_config/tcp_client_config_builder.rs index 6f665a5777..54944ad4a1 100644 --- a/core/common/src/types/configuration/tcp_config/tcp_client_config_builder.rs +++ b/core/common/src/types/configuration/tcp_config/tcp_client_config_builder.rs @@ -101,6 +101,12 @@ impl TcpClientConfigBuilder { self } + /// Sets the per-request timeout for send/receive operations. + pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self { + self.config.request_timeout = request_timeout; + self + } + /// Builds the TCP client configuration. pub fn build(mut self) -> Result { self.config.server_address = self.config.server_address.trim().to_owned(); @@ -114,6 +120,7 @@ impl TcpClientConfigBuilder { mod tests { use super::*; use crate::IggyError; + use std::str::FromStr; fn builder_with_address(addr: &str) -> TcpClientConfigBuilder { let mut builder = TcpClientConfigBuilder::default(); @@ -187,4 +194,16 @@ mod tests { let builder = builder_with_address("iggy-server:8090"); assert!(builder.build().is_ok()); } + + #[test] + fn with_request_timeout_should_override_default() { + let config = TcpClientConfigBuilder::default() + .with_request_timeout(IggyDuration::from_str("60s").unwrap()) + .build() + .unwrap(); + assert_eq!( + config.request_timeout, + IggyDuration::from_str("60s").unwrap() + ); + } } diff --git a/core/common/src/types/configuration/tcp_config/tcp_connection_string_options.rs b/core/common/src/types/configuration/tcp_config/tcp_connection_string_options.rs index 87ebd54490..99d560a382 100644 --- a/core/common/src/types/configuration/tcp_config/tcp_connection_string_options.rs +++ b/core/common/src/types/configuration/tcp_config/tcp_connection_string_options.rs @@ -25,6 +25,7 @@ pub struct TcpConnectionStringOptions { tls_ca_file: Option, reconnection: TcpClientReconnectionConfig, heartbeat_interval: IggyDuration, + request_timeout: IggyDuration, nodelay: bool, } @@ -45,6 +46,10 @@ impl TcpConnectionStringOptions { &self.reconnection } + pub fn request_timeout(&self) -> IggyDuration { + self.request_timeout + } + pub fn nodelay(&self) -> bool { self.nodelay } @@ -68,6 +73,7 @@ impl ConnectionStringOptions for TcpConnectionStringOptions { let mut reconnection_interval = "1s".to_owned(); let mut reestablish_after = "5s".to_owned(); let mut heartbeat_interval = "5s".to_owned(); + let mut request_timeout = "30s".to_owned(); let mut nodelay = false; for option in options { @@ -97,6 +103,9 @@ impl ConnectionStringOptions for TcpConnectionStringOptions { "heartbeat_interval" => { heartbeat_interval = option_parts[1].to_string(); } + "request_timeout" => { + request_timeout = option_parts[1].to_string(); + } "nodelay" => { nodelay = option_parts[1] == "true"; } @@ -124,6 +133,8 @@ impl ConnectionStringOptions for TcpConnectionStringOptions { let heartbeat_interval = IggyDuration::from_str(heartbeat_interval.as_str()) .map_err(|_| IggyError::InvalidConnectionString)?; + let request_timeout = IggyDuration::from_str(request_timeout.as_str()) + .map_err(|_| IggyError::InvalidConnectionString)?; let connection_string_options = TcpConnectionStringOptions::new( tls_enabled, @@ -131,6 +142,7 @@ impl ConnectionStringOptions for TcpConnectionStringOptions { tls_ca_file, reconnection, heartbeat_interval, + request_timeout, nodelay, ); @@ -145,6 +157,7 @@ impl TcpConnectionStringOptions { tls_ca_file: Option, reconnection: TcpClientReconnectionConfig, heartbeat_interval: IggyDuration, + request_timeout: IggyDuration, nodelay: bool, ) -> Self { Self { @@ -153,6 +166,7 @@ impl TcpConnectionStringOptions { tls_ca_file, reconnection, heartbeat_interval, + request_timeout, nodelay, } } @@ -166,6 +180,7 @@ impl Default for TcpConnectionStringOptions { tls_ca_file: None, reconnection: Default::default(), heartbeat_interval: IggyDuration::from_str("5s").unwrap(), + request_timeout: IggyDuration::from_str("30s").unwrap(), nodelay: false, } } diff --git a/core/common/src/types/configuration/websocket_config/websocket_client_config.rs b/core/common/src/types/configuration/websocket_config/websocket_client_config.rs index d93c168a95..cc8ca14937 100644 --- a/core/common/src/types/configuration/websocket_config/websocket_client_config.rs +++ b/core/common/src/types/configuration/websocket_config/websocket_client_config.rs @@ -33,6 +33,8 @@ pub struct WebSocketClientConfig { pub reconnection: WebSocketClientReconnectionConfig, /// Interval of heartbeats sent by the client pub heartbeat_interval: IggyDuration, + /// Per-request timeout for send/receive operations. + pub request_timeout: IggyDuration, /// WebSocket-specific configuration. pub ws_config: WebSocketConfig, /// Whether tls is enabled @@ -70,6 +72,7 @@ impl Default for WebSocketClientConfig { auto_login: AutoLogin::Disabled, reconnection: WebSocketClientReconnectionConfig::default(), heartbeat_interval: IggyDuration::from_str("5s").unwrap(), + request_timeout: IggyDuration::from_str("30s").unwrap(), ws_config: WebSocketConfig::default(), tls_enabled: false, tls_domain: "localhost".to_string(), @@ -156,6 +159,7 @@ impl From> for WebSocketClien auto_login: connection_string.auto_login().to_owned(), reconnection: connection_string.options().reconnection().to_owned(), heartbeat_interval: connection_string.options().heartbeat_interval(), + request_timeout: connection_string.options().request_timeout(), ws_config, tls_enabled: options.tls_enabled(), tls_domain: options.tls_domain().into(), diff --git a/core/common/src/types/configuration/websocket_config/websocket_client_config_builder.rs b/core/common/src/types/configuration/websocket_config/websocket_client_config_builder.rs index 626c0e683e..1b1f16d0ba 100644 --- a/core/common/src/types/configuration/websocket_config/websocket_client_config_builder.rs +++ b/core/common/src/types/configuration/websocket_config/websocket_client_config_builder.rs @@ -136,6 +136,12 @@ impl WebSocketClientConfigBuilder { self } + /// Sets the per-request timeout for send/receive operations. + pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self { + self.config.request_timeout = request_timeout; + self + } + /// Builds the WebSocket client configuration. pub fn build(mut self) -> Result { self.config.server_address = self.config.server_address.trim().to_owned(); @@ -148,6 +154,7 @@ impl WebSocketClientConfigBuilder { #[cfg(test)] mod tests { use super::*; + use std::str::FromStr; #[test] fn build_should_trim_and_validate_server_address() { @@ -177,4 +184,16 @@ mod tests { assert!(result.is_err()); } + + #[test] + fn with_request_timeout_should_override_default() { + let config = WebSocketClientConfigBuilder::default() + .with_request_timeout(IggyDuration::from_str("60s").unwrap()) + .build() + .unwrap(); + assert_eq!( + config.request_timeout, + IggyDuration::from_str("60s").unwrap() + ); + } } diff --git a/core/common/src/types/configuration/websocket_config/websocket_connection_string_options.rs b/core/common/src/types/configuration/websocket_config/websocket_connection_string_options.rs index 85c5fb2053..6f63c1acfa 100644 --- a/core/common/src/types/configuration/websocket_config/websocket_connection_string_options.rs +++ b/core/common/src/types/configuration/websocket_config/websocket_connection_string_options.rs @@ -21,6 +21,7 @@ use std::str::FromStr; #[derive(Debug, Clone)] pub struct WebSocketConnectionStringOptions { heartbeat_interval: IggyDuration, + request_timeout: IggyDuration, reconnection: WebSocketClientReconnectionConfig, read_buffer_size: Option, @@ -42,6 +43,10 @@ impl WebSocketConnectionStringOptions { self.heartbeat_interval } + pub fn request_timeout(&self) -> IggyDuration { + self.request_timeout + } + pub fn reconnection(&self) -> &WebSocketClientReconnectionConfig { &self.reconnection } @@ -114,6 +119,10 @@ impl ConnectionStringOptions for WebSocketConnectionStringOptions { parsed_options.heartbeat_interval = IggyDuration::from_str(parts[1]) .map_err(|_| IggyError::InvalidConnectionString)?; } + "request_timeout" => { + parsed_options.request_timeout = IggyDuration::from_str(parts[1]) + .map_err(|_| IggyError::InvalidConnectionString)?; + } "reconnection_retries" => { let retries = match parts[1] { "unlimited" => None, @@ -193,6 +202,7 @@ impl Default for WebSocketConnectionStringOptions { fn default() -> Self { WebSocketConnectionStringOptions { heartbeat_interval: IggyDuration::from_str("5s").unwrap(), + request_timeout: IggyDuration::from_str("30s").unwrap(), reconnection: WebSocketClientReconnectionConfig::default(), read_buffer_size: None, write_buffer_size: None, diff --git a/core/integration/tests/cli/general/test_help_command.rs b/core/integration/tests/cli/general/test_help_command.rs index 91a55099b8..266bd9da9c 100644 --- a/core/integration/tests/cli/general/test_help_command.rs +++ b/core/integration/tests/cli/general/test_help_command.rs @@ -190,6 +190,11 @@ Options: {CLAP_INDENT} [default: "1s"] + --request-timeout + The optional per-request timeout for send/receive operations +{CLAP_INDENT} + [default: "30s"] + -q, --quiet Quiet mode (disabled stdout printing) diff --git a/core/sdk/src/client_provider.rs b/core/sdk/src/client_provider.rs index 423b7048ea..12010d3f8d 100644 --- a/core/sdk/src/client_provider.rs +++ b/core/sdk/src/client_provider.rs @@ -86,22 +86,25 @@ impl ClientProviderConfig { tcp: None, websocket: None, }; + let parse_duration = |value: &str| -> Result { + IggyDuration::from_str(value) + .map_err(|_| ClientError::InvalidDuration(value.to_owned())) + }; + match config.transport { TransportProtocol::Quic => { config.quic = Some(Arc::new(QuicClientConfig { client_address: args.quic_client_address, server_address: args.quic_server_address, server_name: args.quic_server_name, - heartbeat_interval: IggyDuration::from_str(&args.quic_heartbeat_interval) - .unwrap(), + heartbeat_interval: parse_duration(&args.quic_heartbeat_interval)?, reconnection: QuicClientReconnectionConfig { enabled: args.quic_reconnection_enabled, max_retries: args.quic_reconnection_max_retries, - interval: IggyDuration::from_str(&args.quic_reconnection_interval).unwrap(), - reestablish_after: IggyDuration::from_str( + interval: parse_duration(&args.quic_reconnection_interval)?, + reestablish_after: parse_duration( &args.quic_reconnection_reestablish_after, - ) - .unwrap(), + )?, }, auto_login: if auto_login { AutoLogin::Enabled(Credentials::UsernamePassword( @@ -120,6 +123,7 @@ impl ClientProviderConfig { keep_alive_interval: args.quic_keep_alive_interval, max_idle_timeout: args.quic_max_idle_timeout, validate_certificate: args.quic_validate_certificate, + request_timeout: parse_duration(&args.request_timeout)?, })); } TransportProtocol::Http => { @@ -137,17 +141,16 @@ impl ClientProviderConfig { tls_ca_file: args.tcp_tls_ca_file, tls_validate_certificate: true, nodelay: args.tcp_nodelay, - heartbeat_interval: IggyDuration::from_str(&args.tcp_heartbeat_interval) - .unwrap(), + heartbeat_interval: parse_duration(&args.tcp_heartbeat_interval)?, reconnection: TcpClientReconnectionConfig { enabled: args.tcp_reconnection_enabled, max_retries: args.tcp_reconnection_max_retries, - interval: IggyDuration::from_str(&args.tcp_reconnection_interval).unwrap(), - reestablish_after: IggyDuration::from_str( + interval: parse_duration(&args.tcp_reconnection_interval)?, + reestablish_after: parse_duration( &args.tcp_reconnection_reestablish_after, - ) - .unwrap(), + )?, }, + request_timeout: parse_duration(&args.request_timeout)?, auto_login: if auto_login { AutoLogin::Enabled(Credentials::UsernamePassword( args.username, @@ -161,17 +164,14 @@ impl ClientProviderConfig { TransportProtocol::WebSocket => { config.websocket = Some(Arc::new(WebSocketClientConfig { server_address: args.websocket_server_address, - heartbeat_interval: IggyDuration::from_str(&args.websocket_heartbeat_interval) - .unwrap(), + heartbeat_interval: parse_duration(&args.websocket_heartbeat_interval)?, reconnection: WebSocketClientReconnectionConfig { enabled: args.websocket_reconnection_enabled, max_retries: args.websocket_reconnection_max_retries, - interval: IggyDuration::from_str(&args.websocket_reconnection_interval) - .unwrap(), - reestablish_after: IggyDuration::from_str( + interval: parse_duration(&args.websocket_reconnection_interval)?, + reestablish_after: parse_duration( &args.websocket_reconnection_reestablish_after, - ) - .unwrap(), + )?, }, auto_login: if auto_login { AutoLogin::Enabled(Credentials::UsernamePassword( @@ -181,6 +181,7 @@ impl ClientProviderConfig { } else { AutoLogin::Disabled }, + request_timeout: parse_duration(&args.request_timeout)?, ws_config: WebSocketConfig::default(), tls_enabled: args.websocket_tls_enabled, tls_domain: args.websocket_tls_domain, @@ -249,3 +250,33 @@ pub async fn get_raw_client( } } } + +#[cfg(test)] +mod tests { + use super::*; + use iggy_common::Args; + + #[test] + fn from_args_should_fail_on_invalid_request_timeout() { + let args = Args { + request_timeout: "not-a-duration".to_string(), + ..Default::default() + }; + let result = ClientProviderConfig::from_args(args); + assert!(matches!(result, Err(ClientError::InvalidDuration(_)))); + } + + #[test] + fn from_args_should_succeed_with_valid_request_timeout() { + let args = Args { + request_timeout: "10s".to_string(), + ..Default::default() + }; + let config = ClientProviderConfig::from_args(args).unwrap(); + let tcp_config = config.tcp.unwrap(); + assert_eq!( + tcp_config.request_timeout, + IggyDuration::from_str("10s").unwrap() + ); + } +} diff --git a/core/sdk/src/clients/client_builder.rs b/core/sdk/src/clients/client_builder.rs index 0af473b554..f07e6db5fa 100644 --- a/core/sdk/src/clients/client_builder.rs +++ b/core/sdk/src/clients/client_builder.rs @@ -219,6 +219,12 @@ impl TcpClientBuilder { self } + /// Sets the per-request timeout for send/receive operations. + pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self { + self.config = self.config.with_request_timeout(request_timeout); + self + } + /// Builds the parent `IggyClient` with TCP configuration. pub fn build(self) -> Result { let client = TcpClient::create(Arc::new(self.config.build()?))?; @@ -277,6 +283,12 @@ impl QuicClientBuilder { self } + /// Sets the per-request timeout for send/receive operations. + pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self { + self.config = self.config.with_request_timeout(request_timeout); + self + } + /// Builds the parent `IggyClient` with QUIC configuration. pub fn build(self) -> Result { let client = QuicClient::create(Arc::new(self.config.build()?))?; @@ -390,6 +402,12 @@ impl WebSocketClientBuilder { self } + /// Sets the per-request timeout for send/receive operations. + pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self { + self.config = self.config.with_request_timeout(request_timeout); + self + } + /// Builds the parent `IggyClient` with WebSocket configuration. pub fn build(self) -> Result { let client = WebSocketClient::create(Arc::new(self.config.build()?))?; diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs index 2aacab7185..e58b8eaa85 100644 --- a/core/sdk/src/quic/quic_client.rs +++ b/core/sdk/src/quic/quic_client.rs @@ -642,78 +642,99 @@ impl QuicClient { let connection = self.connection.clone(); let response_buffer_size = self.config.response_buffer_size; + let request_timeout = self.config.request_timeout; #[cfg(feature = "vsr")] let consensus_session = self.consensus_session.clone(); - // SAFETY: we run code holding the `connection` lock in a task so we can't be cancelled while holding the lock. tokio::spawn(async move { - let connection = connection.lock().await; - if let Some(connection) = connection.as_ref() { - #[cfg(feature = "vsr")] - let (request_header, request_size) = { - let mut consensus_session = consensus_session - .lock() - .expect("consensus session mutex poisoned"); - crate::vsr::encode_request_header(&mut consensus_session, code, &payload)? - }; - #[cfg(not(feature = "vsr"))] - let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; - let (mut send, mut recv) = connection.open_bi().await.map_err(|error| { - error!("Failed to open a bidirectional stream: {error}"); - IggyError::QuicError - })?; - trace!("Sending a QUIC request with code: {code}"); - #[cfg(feature = "vsr")] - trace!( - "Sending a QUIC VSR request of size {} with code: {code}", - request_size - ); - #[cfg(feature = "vsr")] - send.write_all(bytemuck::bytes_of(&request_header)) - .await - .map_err(|error| { - error!("Failed to write VSR request header: {error}"); + let io = async { + let connection = connection.lock().await; + if let Some(connection) = connection.as_ref() { + #[cfg(feature = "vsr")] + let (request_header, request_size) = { + let mut consensus_session = consensus_session + .lock() + .expect("consensus session mutex poisoned"); + crate::vsr::encode_request_header(&mut consensus_session, code, &payload)? + }; + #[cfg(not(feature = "vsr"))] + let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; + let (mut send, mut recv) = connection.open_bi().await.map_err(|error| { + error!("Failed to open a bidirectional stream: {error}"); + IggyError::QuicError + })?; + trace!("Sending a QUIC request with code: {code}"); + #[cfg(feature = "vsr")] + trace!( + "Sending a QUIC VSR request of size {} with code: {code}", + request_size + ); + #[cfg(feature = "vsr")] + send.write_all(bytemuck::bytes_of(&request_header)) + .await + .map_err(|error| { + error!("Failed to write VSR request header: {error}"); + IggyError::QuicError + })?; + #[cfg(feature = "vsr")] + if !payload.is_empty() { + send.write_all(&payload).await.map_err(|error| { + error!("Failed to write VSR request payload: {error}"); + IggyError::QuicError + })?; + } + #[cfg(feature = "vsr")] + send.finish().map_err(|error| { + error!("Failed to finish VSR request stream: {error}"); IggyError::QuicError })?; - #[cfg(feature = "vsr")] - if !payload.is_empty() { + #[cfg(not(feature = "vsr"))] + send.write_all(&(payload_length as u32).to_le_bytes()) + .await + .map_err(|error| { + error!("Failed to write payload length: {error}"); + IggyError::QuicError + })?; + #[cfg(not(feature = "vsr"))] + send.write_all(&code.to_le_bytes()).await.map_err(|error| { + error!("Failed to write payload code: {error}"); + IggyError::QuicError + })?; + #[cfg(not(feature = "vsr"))] send.write_all(&payload).await.map_err(|error| { - error!("Failed to write VSR request payload: {error}"); + error!("Failed to write payload: {error}"); IggyError::QuicError })?; - } - #[cfg(feature = "vsr")] - send.finish().map_err(|error| { - error!("Failed to finish VSR request stream: {error}"); - IggyError::QuicError - })?; - #[cfg(not(feature = "vsr"))] - send.write_all(&(payload_length as u32).to_le_bytes()) - .await - .map_err(|error| { - error!("Failed to write payload length: {error}"); + #[cfg(not(feature = "vsr"))] + send.finish().map_err(|error| { + error!("Failed to finish sending data: {error}"); IggyError::QuicError })?; - #[cfg(not(feature = "vsr"))] - send.write_all(&code.to_le_bytes()).await.map_err(|error| { - error!("Failed to write payload code: {error}"); - IggyError::QuicError - })?; - #[cfg(not(feature = "vsr"))] - send.write_all(&payload).await.map_err(|error| { - error!("Failed to write payload: {error}"); - IggyError::QuicError - })?; - #[cfg(not(feature = "vsr"))] - send.finish().map_err(|error| { - error!("Failed to finish sending data: {error}"); - IggyError::QuicError - })?; - trace!("Sent a QUIC request with code: {code}, waiting for a response..."); - return QuicClient::handle_response(&mut recv, response_buffer_size as usize).await; - } + trace!("Sent a QUIC request with code: {code}, waiting for a response..."); + return QuicClient::handle_response(&mut recv, response_buffer_size as usize) + .await; + } - error!("Cannot send data. Client is not connected."); - Err(IggyError::NotConnected) + error!("Cannot send data. Client is not connected."); + Err(IggyError::NotConnected) + }; + + if request_timeout.is_zero() { + io.await + } else { + match tokio::time::timeout(request_timeout.get_duration(), io).await { + Ok(result) => result, + Err(_) => { + #[cfg(feature = "vsr")] + { + *consensus_session + .lock() + .expect("consensus session mutex poisoned") = + ConsensusSession::new(); + } + Err(IggyError::RequestTimeout(request_timeout)) + } + } + } }) .await .map_err(|e| { @@ -968,6 +989,10 @@ mod tests { quic_client_config.heartbeat_interval, IggyDuration::from_str("5s").unwrap() ); + assert_eq!( + quic_client_config.request_timeout, + IggyDuration::from_str("30s").unwrap() + ); assert!(quic_client_config.reconnection.enabled); assert!(quic_client_config.reconnection.max_retries.is_none()); diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index f6e6f4980e..c2a7a12836 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -57,12 +57,6 @@ const REQUEST_INITIAL_BYTES_LENGTH: usize = 4; #[cfg(not(feature = "vsr"))] const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8; const NAME: &str = "Iggy"; -/// Upper bound for awaiting a reply on the lockstep VSR connection. Far -/// beyond any healthy round-trip; only trips when the server loses the -/// reply entirely (e.g. stalled replication quorum), which would otherwise -/// hold the stream lock forever and wedge the client. -#[cfg(feature = "vsr")] -const RESPONSE_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); /// TCP client for interacting with the Iggy API. /// It requires a valid server address. @@ -111,7 +105,6 @@ impl Client for TcpClient { } } -#[async_trait] #[async_trait] impl BinaryTransport for TcpClient { async fn get_state(&self) -> ClientState { @@ -681,144 +674,136 @@ impl TcpClient { } let stream = self.stream.clone(); + let request_timeout = self.config.request_timeout; #[cfg(feature = "vsr")] let consensus_session = self.consensus_session.clone(); - // SAFETY: we run code holding the `stream` lock in a task so we can't be cancelled while holding the lock. let result = tokio::spawn(async move { - let mut stream = stream.lock().await; - if let Some(stream) = stream.as_mut() { - #[cfg(feature = "vsr")] - let (request_header, request_size) = { - let mut consensus_session = consensus_session - .lock() - .expect("consensus session mutex poisoned"); - crate::vsr::encode_request_header(&mut consensus_session, code, &payload)? - }; - #[cfg(not(feature = "vsr"))] - let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; - #[cfg(feature = "vsr")] - trace!( - "Sending a TCP VSR request of size {} with code: {code}", - request_size - ); - #[cfg(not(feature = "vsr"))] - trace!("Sending a TCP request of size {payload_length} with code: {code}"); - #[cfg(feature = "vsr")] - stream.write(bytemuck::bytes_of(&request_header)).await?; - #[cfg(feature = "vsr")] - if !payload.is_empty() { - stream.write(&payload).await?; - } - #[cfg(not(feature = "vsr"))] - stream.write(&(payload_length as u32).to_le_bytes()).await?; - #[cfg(not(feature = "vsr"))] - stream.write(&code.to_le_bytes()).await?; - #[cfg(not(feature = "vsr"))] - stream.write(&payload).await?; - stream.flush().await?; - trace!("Sent a TCP request with code: {code}, waiting for a response..."); - #[cfg(feature = "vsr")] - { - let mut response_header = [0u8; iggy_binary_protocol::HEADER_SIZE]; - // `stream.read` delegates to `read_exact`; on success it - // always returns the requested length, so no short-read - // guard is needed here. - // - // Deadline guards against server-side reply loss (e.g. a - // stalled replication quorum that never commits the op): - // the connection is lockstep, so an unanswered read would - // hold the stream lock forever and wedge every later - // request on this client. On expiry drop the stream -- - // a late reply would desync framing for the next request. - // - // One deadline spans BOTH the header and body reads: a - // reply that delivers a header then stalls must not get a - // fresh full timeout for the body (which would allow up to - // 2x `RESPONSE_READ_TIMEOUT` total). - let response_deadline = tokio::time::Instant::now() + RESPONSE_READ_TIMEOUT; - let header_read = - tokio::time::timeout_at(response_deadline, stream.read(&mut response_header)) - .await; - let Ok(header_read) = header_read else { - error!( - "Timed out after {RESPONSE_READ_TIMEOUT:?} waiting for VSR response header for TCP request with code: {code}", - ); - return Err(IggyError::Disconnected); + let io = async { + let mut stream = stream.lock().await; + if let Some(stream) = stream.as_mut() { + #[cfg(feature = "vsr")] + let (request_header, request_size) = { + let mut consensus_session = consensus_session + .lock() + .expect("consensus session mutex poisoned"); + crate::vsr::encode_request_header(&mut consensus_session, code, &payload)? }; - header_read.map_err(|error| { - error!( - "Failed to read VSR response header for TCP request with code: {code}: {error}", - code = code, - error = error - ); - IggyError::Disconnected - })?; - - let response_size = crate::vsr::response_size(&response_header)?; - - let body_size = response_size - iggy_binary_protocol::HEADER_SIZE; - let body = if body_size > 0 { - let mut body = BytesMut::with_capacity(body_size); - let body_read = tokio::time::timeout_at( - response_deadline, - stream.read_buf(&mut body, body_size), - ) - .await; - let Ok(body_read) = body_read else { - error!( - "Timed out after {RESPONSE_READ_TIMEOUT:?} waiting for VSR response body for TCP request with code: {code}", - ); - return Err(IggyError::Disconnected); + #[cfg(not(feature = "vsr"))] + let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; + #[cfg(feature = "vsr")] + trace!( + "Sending a TCP VSR request of size {} with code: {code}", + request_size + ); + #[cfg(not(feature = "vsr"))] + trace!("Sending a TCP request of size {payload_length} with code: {code}"); + #[cfg(feature = "vsr")] + stream.write(bytemuck::bytes_of(&request_header)).await?; + #[cfg(feature = "vsr")] + if !payload.is_empty() { + stream.write(&payload).await?; + } + #[cfg(not(feature = "vsr"))] + stream.write(&(payload_length as u32).to_le_bytes()).await?; + #[cfg(not(feature = "vsr"))] + stream.write(&code.to_le_bytes()).await?; + #[cfg(not(feature = "vsr"))] + stream.write(&payload).await?; + stream.flush().await?; + trace!("Sent a TCP request with code: {code}, waiting for a response..."); + #[cfg(feature = "vsr")] + { + let mut response_header = [0u8; iggy_binary_protocol::HEADER_SIZE]; + stream + .read(&mut response_header) + .await + .map_err(|error| { + error!( + "Failed to read VSR response header for TCP request with code: {code}: {error}", + code = code, + error = error + ); + IggyError::Disconnected + })?; + + let response_size = crate::vsr::response_size(&response_header)?; + + let body_size = response_size - iggy_binary_protocol::HEADER_SIZE; + let body = if body_size > 0 { + let mut body = BytesMut::with_capacity(body_size); + stream + .read_buf(&mut body, body_size) + .await + .map_err(|error| { + error!( + "Failed to read VSR response body for TCP request with code: {code}: {error}", + code = code, + error = error + ); + IggyError::Disconnected + })?; + body.freeze() + } else { + Bytes::new() }; - body_read.map_err(|error| { + + return crate::vsr::decode_response_split(&response_header, body); + } + + #[cfg(not(feature = "vsr"))] + { + let mut response_buffer = [0u8; RESPONSE_INITIAL_BYTES_LENGTH]; + let read_bytes = stream.read(&mut response_buffer).await.map_err(|error| { error!( - "Failed to read VSR response body for TCP request with code: {code}: {error}", + "Failed to read response for TCP request with code: {code}: {error}", code = code, error = error ); IggyError::Disconnected })?; - body.freeze() - } else { - Bytes::new() - }; - return crate::vsr::decode_response_split(&response_header, body); - } + if read_bytes != RESPONSE_INITIAL_BYTES_LENGTH { + error!("Received an invalid or empty response."); + return Err(IggyError::EmptyResponse); + } - #[cfg(not(feature = "vsr"))] - { - let mut response_buffer = [0u8; RESPONSE_INITIAL_BYTES_LENGTH]; - let read_bytes = stream.read(&mut response_buffer).await.map_err(|error| { - error!( - "Failed to read response for TCP request with code: {code}: {error}", - code = code, - error = error + let status = u32::from_le_bytes( + response_buffer[..4] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, ); - IggyError::Disconnected - })?; - - if read_bytes != RESPONSE_INITIAL_BYTES_LENGTH { - error!("Received an invalid or empty response."); - return Err(IggyError::EmptyResponse); + let length = u32::from_le_bytes( + response_buffer[4..] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ); + return TcpClient::handle_response(status, length, stream).await; } + } - let status = u32::from_le_bytes( - response_buffer[..4] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - let length = u32::from_le_bytes( - response_buffer[4..] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - return TcpClient::handle_response(status, length, stream).await; + error!("Cannot send data. Client is not connected."); + Err(IggyError::NotConnected) + }; + + if request_timeout.is_zero() { + io.await + } else { + match tokio::time::timeout(request_timeout.get_duration(), io).await { + Ok(result) => result, + Err(_) => { + // Reset to prevent response desync on the shared stream. + *stream.lock().await = None; + #[cfg(feature = "vsr")] + { + *consensus_session + .lock() + .expect("consensus session mutex poisoned") = + ConsensusSession::new(); + } + Err(IggyError::RequestTimeout(request_timeout)) + } } } - - error!("Cannot send data. Client is not connected."); - Err(IggyError::NotConnected) }) .await .map_err(|e| { @@ -826,7 +811,10 @@ impl TcpClient { IggyError::TcpError })?; - if matches!(result, Err(IggyError::Disconnected)) { + if matches!( + result, + Err(IggyError::Disconnected) | Err(IggyError::RequestTimeout(_)) + ) { // Reply stream state is unknown (timed out or torn mid-frame); // a late reply would desync framing for the next request, so // drop the connection and let callers reconnect. @@ -1038,6 +1026,10 @@ mod tests { tcp_client_config.heartbeat_interval, IggyDuration::from_str("5s").unwrap() ); + assert_eq!( + tcp_client_config.request_timeout, + IggyDuration::from_str("30s").unwrap() + ); assert!(tcp_client_config.reconnection.enabled); assert!(tcp_client_config.reconnection.max_retries.is_none()); @@ -1185,4 +1177,32 @@ mod tests { let tcp_client = TcpClient::handle_response(0, 50, &mut stream).await; assert!(tcp_client.is_err()); } + + #[cfg(not(feature = "vsr"))] + #[tokio::test] + async fn should_return_request_timeout_when_server_does_not_respond() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + let (_stream, _) = listener.accept().await.unwrap(); + std::future::pending::<()>().await; + }); + + let config = TcpClientConfig { + server_address: addr.to_string(), + request_timeout: IggyDuration::from_str("100ms").unwrap(), + ..Default::default() + }; + let client = TcpClient::create(Arc::new(config)).unwrap(); + + let tcp_stream = TcpStream::connect(addr).await.unwrap(); + let client_addr = tcp_stream.local_addr().unwrap(); + let stream = ConnectionStreamKind::Tcp(TcpConnectionStream::new(client_addr, tcp_stream)); + *client.stream.lock().await = Some(stream); + *client.state.lock().await = ClientState::Connected; + + let result = client.send_raw(1, Bytes::new()).await; + assert!(matches!(result, Err(IggyError::RequestTimeout(_)))); + } } diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs index e1a9838398..a6a38ff55b 100644 --- a/core/sdk/src/websocket/websocket_client.rs +++ b/core/sdk/src/websocket/websocket_client.rs @@ -59,12 +59,6 @@ const REQUEST_INITIAL_BYTES_LENGTH: usize = 4; #[cfg(not(feature = "vsr"))] const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8; const NAME: &str = "WebSocket"; -/// Bound on how long a single VSR reply read may block. The connection is -/// lockstep and the read runs in the caller's task while holding the stream -/// lock, so an unanswered read (lost server reply) would wedge every later -/// request on this client forever. On expiry the stream is dropped. -#[cfg(feature = "vsr")] -const RESPONSE_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); #[derive(Debug)] pub struct WebSocketClient { @@ -657,152 +651,144 @@ impl WebSocketClient { ClientState::Connected | ClientState::Authenticating | ClientState::Authenticated => {} } - let mut stream_guard = self.stream.lock().await; - let stream = stream_guard.as_mut().ok_or_else(|| { - trace!("Cannot send data. Client is not connected."); - IggyError::NotConnected - })?; + let io = async { + let mut stream_guard = self.stream.lock().await; + let stream = stream_guard.as_mut().ok_or_else(|| { + trace!("Cannot send data. Client is not connected."); + IggyError::NotConnected + })?; - #[cfg(feature = "vsr")] - let request = { - let mut consensus_session = self - .consensus_session - .lock() - .expect("consensus session mutex poisoned"); - crate::vsr::encode_contiguous_request(&mut consensus_session, code, &payload)? - }; - #[cfg(not(feature = "vsr"))] - let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; - #[cfg(not(feature = "vsr"))] - let mut request = BytesMut::with_capacity(4 + REQUEST_INITIAL_BYTES_LENGTH + payload.len()); - #[cfg(not(feature = "vsr"))] - request.put_u32_le(payload_length as u32); - #[cfg(not(feature = "vsr"))] - request.put_u32_le(code); - #[cfg(not(feature = "vsr"))] - request.put_slice(&payload); - - trace!( - "Sending {NAME} message with code: {}, payload size: {} bytes", - code, - payload.len() - ); - #[cfg(feature = "vsr")] - trace!( - "Sending {NAME} VSR request of size {} with code: {code}", - request.len() - ); + #[cfg(feature = "vsr")] + let request = { + let mut consensus_session = self + .consensus_session + .lock() + .expect("consensus session mutex poisoned"); + crate::vsr::encode_contiguous_request(&mut consensus_session, code, &payload)? + }; + #[cfg(not(feature = "vsr"))] + let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; + #[cfg(not(feature = "vsr"))] + let mut request = + BytesMut::with_capacity(4 + REQUEST_INITIAL_BYTES_LENGTH + payload.len()); + #[cfg(not(feature = "vsr"))] + request.put_u32_le(payload_length as u32); + #[cfg(not(feature = "vsr"))] + request.put_u32_le(code); + #[cfg(not(feature = "vsr"))] + request.put_slice(&payload); + + trace!( + "Sending {NAME} message with code: {}, payload size: {} bytes", + code, + payload.len() + ); + #[cfg(feature = "vsr")] + trace!( + "Sending {NAME} VSR request of size {} with code: {code}", + request.len() + ); - stream.write(&request).await?; - stream.flush().await?; + stream.write(&request).await?; + stream.flush().await?; - #[cfg(feature = "vsr")] - { - // Mirror the TCP path: header onto stack, body into its own - // buffer, `decode_response_split` slices without concatenation. - // Old path did `vec![0; HEADER_SIZE]` + `vec![0; body_size]` - // (two zero-fills) + `BytesMut::with_capacity(response_size)` + - // two `put_slice` memcopies per reply. - // One deadline spans both the header and body reads so a reply - // that delivers a header then stalls cannot wait up to 2x the - // timeout. On expiry drop the stream: the read runs inline here - // (no spawned task to cancel), so without an explicit drop a late - // reply would desync framing for the next request. - let response_deadline = tokio::time::Instant::now() + RESPONSE_READ_TIMEOUT; - let mut response_header = [0u8; iggy_binary_protocol::HEADER_SIZE]; - let header_read = - tokio::time::timeout_at(response_deadline, stream.read(&mut response_header)).await; - let Ok(header_read) = header_read else { - error!( - "Timed out after {RESPONSE_READ_TIMEOUT:?} waiting for {NAME} VSR response header for request with code: {code}" - ); - *stream_guard = None; - return Err(IggyError::Disconnected); - }; - header_read?; - - let response_size = crate::vsr::response_size(&response_header)?; - let body_size = response_size - iggy_binary_protocol::HEADER_SIZE; - let body = if body_size > 0 { - // `WebSocketStreamKind::read` reads into a slice without a - // zero-fill prerequisite; we still allocate `body_size` but - // skip the header concatenation. - let mut body = vec![0u8; body_size]; - let body_read = - tokio::time::timeout_at(response_deadline, stream.read(&mut body)).await; - let Ok(body_read) = body_read else { - error!( - "Timed out after {RESPONSE_READ_TIMEOUT:?} waiting for {NAME} VSR response body for request with code: {code}" - ); - *stream_guard = None; - return Err(IggyError::Disconnected); + #[cfg(feature = "vsr")] + { + let mut response_header = [0u8; iggy_binary_protocol::HEADER_SIZE]; + stream.read(&mut response_header).await?; + + let response_size = crate::vsr::response_size(&response_header)?; + let body_size = response_size - iggy_binary_protocol::HEADER_SIZE; + let body = if body_size > 0 { + let mut body = vec![0u8; body_size]; + stream.read(&mut body).await?; + Bytes::from(body) + } else { + Bytes::new() }; - body_read?; - Bytes::from(body) - } else { - Bytes::new() - }; - crate::vsr::decode_response_split(&response_header, body) - } + crate::vsr::decode_response_split(&response_header, body) + } - #[cfg(not(feature = "vsr"))] - { - let mut response_initial_buffer = vec![0u8; RESPONSE_INITIAL_BYTES_LENGTH]; - stream.read(&mut response_initial_buffer).await?; - - let status = u32::from_le_bytes([ - response_initial_buffer[0], - response_initial_buffer[1], - response_initial_buffer[2], - response_initial_buffer[3], - ]); - - let length = u32::from_le_bytes([ - response_initial_buffer[4], - response_initial_buffer[5], - response_initial_buffer[6], - response_initial_buffer[7], - ]) as usize; + #[cfg(not(feature = "vsr"))] + { + let mut response_initial_buffer = vec![0u8; RESPONSE_INITIAL_BYTES_LENGTH]; + stream.read(&mut response_initial_buffer).await?; + + let status = u32::from_le_bytes([ + response_initial_buffer[0], + response_initial_buffer[1], + response_initial_buffer[2], + response_initial_buffer[3], + ]); + + let length = u32::from_le_bytes([ + response_initial_buffer[4], + response_initial_buffer[5], + response_initial_buffer[6], + response_initial_buffer[7], + ]) as usize; + + trace!( + "Received {NAME} response status: {}, length: {} bytes", + status, length + ); - trace!( - "Received {NAME} response status: {}, length: {} bytes", - status, length - ); + if status != 0 { + // TEMP: See https://github.com/apache/iggy/pull/604 for context. + if status == IggyErrorDiscriminants::TopicNameAlreadyExists as u32 + || status == IggyErrorDiscriminants::StreamNameAlreadyExists as u32 + || status == IggyErrorDiscriminants::UserAlreadyExists as u32 + || status == IggyErrorDiscriminants::PersonalAccessTokenAlreadyExists as u32 + || status == IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32 + { + debug!( + "Received a server resource already exists response: {} ({})", + status, + IggyError::from_code_as_string(status) + ) + } else { + error!( + "Received an invalid response with status: {} ({}).", + status, + IggyError::from_code_as_string(status), + ); + } - if status != 0 { - // TEMP: See https://github.com/apache/iggy/pull/604 for context. - if status == IggyErrorDiscriminants::TopicNameAlreadyExists as u32 - || status == IggyErrorDiscriminants::StreamNameAlreadyExists as u32 - || status == IggyErrorDiscriminants::UserAlreadyExists as u32 - || status == IggyErrorDiscriminants::PersonalAccessTokenAlreadyExists as u32 - || status == IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32 - { - debug!( - "Received a server resource already exists response: {} ({})", - status, - IggyError::from_code_as_string(status) - ) - } else { - error!( - "Received an invalid response with status: {} ({}).", - status, - IggyError::from_code_as_string(status), - ); + return Err(IggyError::from_code(status)); } - return Err(IggyError::from_code(status)); - } + if length == 0 { + return Ok(Bytes::new()); + } - if length == 0 { - return Ok(Bytes::new()); - } + let mut response_buffer = vec![0u8; length]; + stream.read(&mut response_buffer).await?; - let mut response_buffer = vec![0u8; length]; - stream.read(&mut response_buffer).await?; + trace!("Received {NAME} response payload, size: {} bytes", length); + Ok(Bytes::from(response_buffer)) + } + }; - trace!("Received {NAME} response payload, size: {} bytes", length); - Ok(Bytes::from(response_buffer)) + if self.config.request_timeout.is_zero() { + io.await + } else { + match tokio::time::timeout(self.config.request_timeout.get_duration(), io).await { + Ok(result) => result, + Err(_) => { + // Reset to prevent response desync on the shared stream. + *self.stream.lock().await = None; + self.set_state(ClientState::Disconnected).await; + #[cfg(feature = "vsr")] + { + *self + .consensus_session + .lock() + .expect("consensus session mutex poisoned") = ConsensusSession::new(); + } + Err(IggyError::RequestTimeout(self.config.request_timeout)) + } + } } } } @@ -825,6 +811,10 @@ mod tests { client.config.heartbeat_interval, IggyDuration::from_str("5s").unwrap() ); + assert_eq!( + client.config.request_timeout, + IggyDuration::from_str("30s").unwrap() + ); assert!(matches!(client.config.auto_login, AutoLogin::Disabled)); assert!(client.config.reconnection.enabled); } diff --git a/examples/rust/src/shared/args.rs b/examples/rust/src/shared/args.rs index ef43f5fb40..c95451fdba 100644 --- a/examples/rust/src/shared/args.rs +++ b/examples/rust/src/shared/args.rs @@ -190,6 +190,9 @@ pub struct Args { #[arg(long, default_value = "false")] pub websocket_tls_validate_certificate: bool, + + #[arg(long, default_value = "30s")] + pub request_timeout: String, } impl Args { @@ -262,6 +265,7 @@ impl Default for Args { websocket_tls_domain: "localhost".to_string(), websocket_tls_ca_file: None, websocket_tls_validate_certificate: false, + request_timeout: "30s".to_string(), } } } @@ -373,6 +377,7 @@ impl Args { websocket_tls_domain: self.websocket_tls_domain.clone(), websocket_tls_ca_file: self.websocket_tls_ca_file.clone(), websocket_tls_validate_certificate: self.websocket_tls_validate_certificate, + request_timeout: self.request_timeout.clone(), } }