Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/common/src/error/client_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ pub enum ClientError {
/// Transport is invalid and cannot be used.
#[error("Invalid transport {0}")]
InvalidTransport(String),
/// Duration value is invalid.
#[error("Invalid duration: {0}")]
InvalidDuration(String),
/// IO error.
#[error("IO error")]
IoError(#[from] io::Error),
Expand Down
3 changes: 3 additions & 0 deletions core/common/src/error/iggy_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use crate::Identifier;
use crate::IggyDuration;
use crate::utils::topic_size::MaxTopicSize;
use crate::{IggyMessage, utils::byte_size::IggyByteSize};
use std::sync::Arc;
Expand Down Expand Up @@ -502,6 +503,8 @@ pub enum IggyError {
CannotBindToSocket(String) = 12000,
#[error("Task execution timeout")]
TaskTimeout = 12001,
#[error("Request timeout after {0}")]
RequestTimeout(IggyDuration) = 12002,

#[error("IO error: {0}")]
IoError(String) = 13000,
Expand Down
14 changes: 14 additions & 0 deletions core/common/src/types/args/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,13 @@ pub struct ArgsOptional {
#[arg(long)]
#[serde(skip_serializing_if = "Option::is_none")]
pub websocket_reconnection_interval: Option<String>,

/// The optional per-request timeout for send/receive operations

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worth a note here that request_timeout values "0", "unlimited", "disabled", "none" all map to no timeout (infinite wait), since the is_zero() fast-path skips the timeout entirely. "0" reading as infinite rather than instant is the opposite of what most people expect.

///
/// [default: "30s"]
#[arg(long)]
#[serde(skip_serializing_if = "Option::is_none")]
pub request_timeout: Option<String>,
}

/// The arguments used by the `ClientProviderConfig` to create a client.
Expand Down Expand Up @@ -351,6 +358,9 @@ pub struct Args {

/// The optional TLS validate certificate for the WebSocket transport
pub websocket_tls_validate_certificate: bool,

/// The per-request timeout for send/receive operations
pub request_timeout: String,
}

const QUIC_TRANSPORT: &str = "quic";
Expand Down Expand Up @@ -416,6 +426,7 @@ impl Default for Args {
websocket_tls_domain: "localhost".to_string(),
websocket_tls_ca_file: None,
websocket_tls_validate_certificate: false,
request_timeout: "30s".to_string(),
}
}
}
Expand Down Expand Up @@ -517,6 +528,9 @@ impl From<Vec<ArgsOptional>> for Args {
{
args.websocket_reconnection_interval = websocket_reconnection_interval;
}
if let Some(request_timeout) = optional_args.request_timeout {
args.request_timeout = request_timeout;
}
}

args
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub struct QuicClientConfig {
pub validate_certificate: bool,
/// Interval of heartbeats sent by the client
pub heartbeat_interval: IggyDuration,
/// Per-request timeout for send/receive operations.
pub request_timeout: IggyDuration,
}

impl Default for QuicClientConfig {
Expand All @@ -64,6 +66,7 @@ impl Default for QuicClientConfig {
server_name: "localhost".to_string(),
auto_login: AutoLogin::Disabled,
heartbeat_interval: IggyDuration::from_str("5s").unwrap(),
request_timeout: IggyDuration::from_str("30s").unwrap(),
reconnection: QuicClientReconnectionConfig::default(),
response_buffer_size: 1000 * 1000 * 10,
max_concurrent_bidi_streams: 10000,
Expand Down Expand Up @@ -96,6 +99,7 @@ impl From<ConnectionString<QuicConnectionStringOptions>> for QuicClientConfig {
max_idle_timeout: connection_string.options().max_idle_timeout(),
validate_certificate: connection_string.options().validate_certificate(),
heartbeat_interval: connection_string.options().heartbeat_interval(),
request_timeout: connection_string.options().request_timeout(),
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@ impl QuicClientConfigBuilder {
self
}

/// Sets the per-request timeout for send/receive operations.
pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self {
self.config.request_timeout = request_timeout;
self
}

/// Finalizes the builder and returns the `QuicClientConfig`.
pub fn build(mut self) -> Result<QuicClientConfig, IggyError> {
self.config.server_address = self.config.server_address.trim().to_owned();
Expand All @@ -164,6 +170,7 @@ impl QuicClientConfigBuilder {
#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;

#[test]
fn build_should_trim_and_validate_server_address() {
Expand Down Expand Up @@ -193,4 +200,16 @@ mod tests {

assert!(result.is_err());
}

#[test]
fn with_request_timeout_should_override_default() {
let config = QuicClientConfigBuilder::default()
.with_request_timeout(IggyDuration::from_str("60s").unwrap())
.build()
.unwrap();
assert_eq!(
config.request_timeout,
IggyDuration::from_str("60s").unwrap()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct QuicConnectionStringOptions {
max_idle_timeout: u64,
validate_certificate: bool,
heartbeat_interval: IggyDuration,
request_timeout: IggyDuration,
}

impl QuicConnectionStringOptions {
Expand Down Expand Up @@ -73,6 +74,10 @@ impl QuicConnectionStringOptions {
pub fn validate_certificate(&self) -> bool {
self.validate_certificate
}

pub fn request_timeout(&self) -> IggyDuration {
self.request_timeout
}
}

impl ConnectionStringOptions for QuicConnectionStringOptions {
Expand All @@ -96,6 +101,7 @@ impl ConnectionStringOptions for QuicConnectionStringOptions {
let mut max_idle_timeout = 10000;
let mut validate_certificate = false;
let mut heartbeat_interval = "5s".to_owned();
let mut request_timeout = "30s".to_owned();

// For reconnection config
let mut reconnection_max_retries = "unlimited".to_owned();
Expand Down Expand Up @@ -178,6 +184,9 @@ impl ConnectionStringOptions for QuicConnectionStringOptions {
"heartbeat_interval" => {
heartbeat_interval = option_parts[1].to_string();
}
"request_timeout" => {
request_timeout = option_parts[1].to_string();
}
"reconnection_max_retries" => {
reconnection_max_retries = option_parts[1].to_string();
}
Expand Down Expand Up @@ -211,6 +220,8 @@ impl ConnectionStringOptions for QuicConnectionStringOptions {

let heartbeat_interval = IggyDuration::from_str(heartbeat_interval.as_str())
.map_err(|_| IggyError::InvalidConnectionString)?;
let request_timeout = IggyDuration::from_str(request_timeout.as_str())
.map_err(|_| IggyError::InvalidConnectionString)?;

let connection_string_options = QuicConnectionStringOptions::new(
reconnection,
Expand All @@ -224,6 +235,7 @@ impl ConnectionStringOptions for QuicConnectionStringOptions {
max_idle_timeout,
validate_certificate,
heartbeat_interval,
request_timeout,
);

Ok(connection_string_options)
Expand All @@ -244,6 +256,7 @@ impl QuicConnectionStringOptions {
max_idle_timeout: u64,
validate_certificate: bool,
heartbeat_interval: IggyDuration,
request_timeout: IggyDuration,
) -> Self {
Self {
reconnection,
Expand All @@ -257,6 +270,7 @@ impl QuicConnectionStringOptions {
max_idle_timeout,
validate_certificate,
heartbeat_interval,
request_timeout,
}
}
}
Expand All @@ -275,6 +289,7 @@ impl Default for QuicConnectionStringOptions {
max_idle_timeout: 10000,
validate_certificate: false,
heartbeat_interval: IggyDuration::from_str("5s").unwrap(),
request_timeout: IggyDuration::from_str("30s").unwrap(),
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pub struct TcpClientConfig {
pub reconnection: TcpClientReconnectionConfig,
/// Interval of heartbeats sent by the client
pub heartbeat_interval: IggyDuration,
/// Per-request timeout for send/receive operations.
pub request_timeout: IggyDuration,
Comment thread
hubcio marked this conversation as resolved.
/// Disable Nagle algorithm for the TCP socket.
pub nodelay: bool,
}
Expand All @@ -54,6 +56,7 @@ impl Default for TcpClientConfig {
tls_ca_file: None,
tls_validate_certificate: true,
heartbeat_interval: IggyDuration::from_str("5s").unwrap(),
request_timeout: IggyDuration::from_str("30s").unwrap(),
auto_login: AutoLogin::Disabled,
reconnection: TcpClientReconnectionConfig::default(),
nodelay: false,
Expand All @@ -73,6 +76,7 @@ impl From<ConnectionString<TcpConnectionStringOptions>> for TcpClientConfig {
tls_validate_certificate: true,
reconnection: connection_string.options().reconnection().to_owned(),
heartbeat_interval: connection_string.options().heartbeat_interval(),
request_timeout: connection_string.options().request_timeout(),
nodelay: connection_string.options().nodelay(),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ impl TcpClientConfigBuilder {
self
}

/// Sets the per-request timeout for send/receive operations.
pub fn with_request_timeout(mut self, request_timeout: IggyDuration) -> Self {
self.config.request_timeout = request_timeout;
self
}

/// Builds the TCP client configuration.
pub fn build(mut self) -> Result<TcpClientConfig, IggyError> {
self.config.server_address = self.config.server_address.trim().to_owned();
Expand All @@ -114,6 +120,7 @@ impl TcpClientConfigBuilder {
mod tests {
use super::*;
use crate::IggyError;
use std::str::FromStr;

fn builder_with_address(addr: &str) -> TcpClientConfigBuilder {
let mut builder = TcpClientConfigBuilder::default();
Expand Down Expand Up @@ -187,4 +194,16 @@ mod tests {
let builder = builder_with_address("iggy-server:8090");
assert!(builder.build().is_ok());
}

#[test]
fn with_request_timeout_should_override_default() {
let config = TcpClientConfigBuilder::default()
.with_request_timeout(IggyDuration::from_str("60s").unwrap())
.build()
.unwrap();
assert_eq!(
config.request_timeout,
IggyDuration::from_str("60s").unwrap()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct TcpConnectionStringOptions {
tls_ca_file: Option<String>,
reconnection: TcpClientReconnectionConfig,
heartbeat_interval: IggyDuration,
request_timeout: IggyDuration,
nodelay: bool,
}

Expand All @@ -45,6 +46,10 @@ impl TcpConnectionStringOptions {
&self.reconnection
}

pub fn request_timeout(&self) -> IggyDuration {
self.request_timeout
}

pub fn nodelay(&self) -> bool {
self.nodelay
}
Expand All @@ -68,6 +73,7 @@ impl ConnectionStringOptions for TcpConnectionStringOptions {
let mut reconnection_interval = "1s".to_owned();
let mut reestablish_after = "5s".to_owned();
let mut heartbeat_interval = "5s".to_owned();
let mut request_timeout = "30s".to_owned();
let mut nodelay = false;

for option in options {
Expand Down Expand Up @@ -97,6 +103,9 @@ impl ConnectionStringOptions for TcpConnectionStringOptions {
"heartbeat_interval" => {
heartbeat_interval = option_parts[1].to_string();
}
"request_timeout" => {
request_timeout = option_parts[1].to_string();
}
"nodelay" => {
nodelay = option_parts[1] == "true";
}
Expand Down Expand Up @@ -124,13 +133,16 @@ impl ConnectionStringOptions for TcpConnectionStringOptions {

let heartbeat_interval = IggyDuration::from_str(heartbeat_interval.as_str())
.map_err(|_| IggyError::InvalidConnectionString)?;
let request_timeout = IggyDuration::from_str(request_timeout.as_str())
.map_err(|_| IggyError::InvalidConnectionString)?;

let connection_string_options = TcpConnectionStringOptions::new(
tls_enabled,
tls_domain,
tls_ca_file,
reconnection,
heartbeat_interval,
request_timeout,
nodelay,
);

Expand All @@ -145,6 +157,7 @@ impl TcpConnectionStringOptions {
tls_ca_file: Option<String>,
reconnection: TcpClientReconnectionConfig,
heartbeat_interval: IggyDuration,
request_timeout: IggyDuration,
nodelay: bool,
) -> Self {
Self {
Expand All @@ -153,6 +166,7 @@ impl TcpConnectionStringOptions {
tls_ca_file,
reconnection,
heartbeat_interval,
request_timeout,
nodelay,
}
}
Expand All @@ -166,6 +180,7 @@ impl Default for TcpConnectionStringOptions {
tls_ca_file: None,
reconnection: Default::default(),
heartbeat_interval: IggyDuration::from_str("5s").unwrap(),
request_timeout: IggyDuration::from_str("30s").unwrap(),
nodelay: false,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub struct WebSocketClientConfig {
pub reconnection: WebSocketClientReconnectionConfig,
/// Interval of heartbeats sent by the client
pub heartbeat_interval: IggyDuration,
/// Per-request timeout for send/receive operations.
pub request_timeout: IggyDuration,
/// WebSocket-specific configuration.
pub ws_config: WebSocketConfig,
/// Whether tls is enabled
Expand Down Expand Up @@ -70,6 +72,7 @@ impl Default for WebSocketClientConfig {
auto_login: AutoLogin::Disabled,
reconnection: WebSocketClientReconnectionConfig::default(),
heartbeat_interval: IggyDuration::from_str("5s").unwrap(),
request_timeout: IggyDuration::from_str("30s").unwrap(),
ws_config: WebSocketConfig::default(),
tls_enabled: false,
tls_domain: "localhost".to_string(),
Expand Down Expand Up @@ -156,6 +159,7 @@ impl From<ConnectionString<WebSocketConnectionStringOptions>> for WebSocketClien
auto_login: connection_string.auto_login().to_owned(),
reconnection: connection_string.options().reconnection().to_owned(),
heartbeat_interval: connection_string.options().heartbeat_interval(),
request_timeout: connection_string.options().request_timeout(),
ws_config,
tls_enabled: options.tls_enabled(),
tls_domain: options.tls_domain().into(),
Expand Down
Loading
Loading