feat(sdk): support per-request timeout to TCP/QUIC/WebSocket clients #3429
chengxilo wants to merge 12 commits into
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #3429 +/- ##
============================================
- Coverage 74.41% 72.44% -1.98%
Complexity 937 937
============================================
Files 1243 1243
Lines 125987 121394 -4593
Branches 101854 97305 -4549
============================================
- Hits 93756 87942 -5814
- Misses 29218 30167 +949
- Partials 3013 3285 +272
8b9a947 to 005b84e
/ready
3c5bfab to b08a9a2
/author
After merging, there is something that needs to be changed.
/ready
a few findings that don't map onto changed lines:
- the consumer poll loop error classifier (consumer.rs) only treats Disconnected | Unauthenticated | StaleClient as connection errors, so a RequestTimeout (and NotConnected) falls through as a raw error with can_poll left true and no backoff, on every transport. this is the real root of the quic no-recovery behavior noted in the quic comment. consumer.rs isn't touched by this PR so flagging it here.
- the PR description says default 300s but the code is 30s everywhere (a later commit reduced it). update the description.
- no test for the is_zero() no-timeout branch, nor for the vsr session-reset-on-timeout path.
- the timeout is rust-sdk only; java/.net/python/go/c++/node don't get it - worth filing parity issues later on
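The first finding could be addressed along these lines. This is a minimal sketch, not the code in consumer.rs: the `IggyError` enum is a trimmed stand-in holding only the variants named above, the `Duration` payload on `RequestTimeout` is an assumption, and `PollAction` and `classify` are hypothetical names.

```rust
use std::time::Duration;

/// Trimmed stand-in for the SDK error type. Only the variants named in the
/// review are modelled; `Other` covers everything else. The payload type of
/// `RequestTimeout` is an assumption.
#[derive(Debug, Clone, PartialEq)]
pub enum IggyError {
    Disconnected,
    Unauthenticated,
    StaleClient,
    NotConnected,
    RequestTimeout(Duration),
    Other,
}

/// Hypothetical result of classifying an error seen by the poll loop.
#[derive(Debug, PartialEq)]
pub enum PollAction {
    /// Stop polling (can_poll = false) and wait `backoff` before reconnecting.
    PauseAndReconnect { backoff: Duration },
    /// Not a connection problem: hand the error to the caller unchanged.
    Surface,
}

/// Treats timeouts and NotConnected like the other connection errors.
pub fn classify(err: &IggyError, backoff: Duration) -> PollAction {
    match err {
        IggyError::Disconnected
        | IggyError::Unauthenticated
        | IggyError::StaleClient
        | IggyError::NotConnected
        | IggyError::RequestTimeout(_) => PollAction::PauseAndReconnect { backoff },
        IggyError::Other => PollAction::Surface,
    }
}
```

With this shape a timeout pauses polling and backs off instead of being re-surfaced on every cycle.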
    if request_timeout.is_zero() {
        io.await
    } else {
        match tokio::time::timeout(request_timeout.get_duration(), io).await {
the timeout wraps the whole io future including stream.lock().await, so the lock-acquire wait counts against the request budget. on master the read deadline was set after lock+write+flush, timing only the response read. since the heartbeat ping shares this same per-transport stream mutex, a request queued behind a stalled in-flight one can hit RequestTimeout on lock-wait alone and then tear down the shared connection, cascading to every queued request. acquire the stream lock outside the timed region and time only the io. same in quic_client.rs and websocket_client.rs.
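A rough illustration of the suggested ordering, written as a blocking std-only analogy rather than the async tokio code in the PR. `Stream`, `SendError`, and this `send_raw` signature are hypothetical; the point is only that the lock is taken before the timer starts, so lock-wait never consumes the request budget.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::sync::Mutex;
use std::time::Duration;

/// Hypothetical stand-in for the shared per-transport stream.
pub struct Stream {
    pub responses: mpsc::Receiver<u32>,
}

#[derive(Debug, PartialEq)]
pub enum SendError {
    RequestTimeout,
    Disconnected,
}

pub fn send_raw(stream: &Mutex<Stream>, request_timeout: Duration) -> Result<u32, SendError> {
    // Untimed region: waiting behind another in-flight request is not
    // charged against this request's budget.
    let guard = stream.lock().map_err(|_| SendError::Disconnected)?;

    // Timed region: only the wait for the response counts.
    match guard.responses.recv_timeout(request_timeout) {
        Ok(response) => Ok(response),
        Err(RecvTimeoutError::Timeout) => Err(SendError::RequestTimeout),
        Err(RecvTimeoutError::Disconnected) => Err(SendError::Disconnected),
    }
}
```

In the async version the same split would mean awaiting the stream lock first and passing only the write/flush/read future to the timeout.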
    })?;

    if matches!(result, Err(IggyError::Disconnected)) {
    if matches!(
this teardown nulls the stream and sets Disconnected directly, bypassing disconnect(), so publish_event(DiagnosticEvent::Disconnected) never fires on a timeout - diagnostic subscribers miss every timeout disconnect. routing the timeout teardown through disconnect() would emit the event from one shared path. same gap on websocket (set_state in the timeout arm) and quic (no set_state at all).
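One way to picture the single shared teardown path is the synchronous sketch below. `Client`, its fields, and `on_request_timeout` are hypothetical stand-ins; only `disconnect()`, `publish_event`, and `DiagnosticEvent::Disconnected` are names taken from the comment above, and their real signatures may differ.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ClientState {
    Connected,
    Disconnected,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DiagnosticEvent {
    Disconnected,
}

/// Hypothetical, synchronous stand-in for a transport client.
pub struct Client {
    pub state: ClientState,
    /// Placeholder for the real socket.
    pub stream: Option<Vec<u8>>,
    /// Stands in for the channel that diagnostic subscribers read from.
    pub published: Vec<DiagnosticEvent>,
}

impl Client {
    pub fn connected() -> Self {
        Client {
            state: ClientState::Connected,
            stream: Some(Vec::new()),
            published: Vec::new(),
        }
    }

    fn publish_event(&mut self, event: DiagnosticEvent) {
        self.published.push(event);
    }

    /// The one teardown path: drops the stream, updates the state, and
    /// emits the event exactly once per disconnect.
    pub fn disconnect(&mut self) {
        if self.state == ClientState::Disconnected {
            return;
        }
        self.stream = None;
        self.state = ClientState::Disconnected;
        self.publish_event(DiagnosticEvent::Disconnected);
    }

    /// The timeout arm delegates instead of nulling the stream itself.
    pub fn on_request_timeout(&mut self) {
        self.disconnect();
    }
}
```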
    Ok(result) => result,
    Err(_) => {
        // Reset to prevent response desync on the shared stream.
        *stream.lock().await = None;
this inner *stream.lock().await = None is redundant with the outer self.stream.lock().await.take() that already runs for RequestTimeout(_). only the consensus_session reset is unique to this block. dropping the inner null and letting the single outer teardown handle stream + state removes the split-lock window and the duplicate work.
    *stream.lock().await = None;
    #[cfg(feature = "vsr")]
    {
        *consensus_session
resetting consensus_session mints a fresh client_id. replicated metadata ops are deduped server-side on (client_id, session, request), so a mutation retried after a timed-out-but-committed op runs under the new client_id, misses dedup, and can double-apply. send_messages/partition writes are unaffected (deduped by message id). preserve the client_id across the timeout-reconnect, or suppress transparent retry of replicated mutations after a session reset. vsr-gated so not a blocker for the non-vsr build, but worth fixing before vsr ships.
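The double-apply hazard can be reproduced with a toy model of the dedup table. This is a sketch under assumptions: `MetadataLog` is hypothetical, and the integer types chosen for client_id, session, and request are placeholders rather than the server's real types.

```rust
use std::collections::HashSet;

/// Hypothetical model of server-side dedup keyed on
/// (client_id, session, request). Integer widths are placeholders.
#[derive(Default)]
pub struct MetadataLog {
    seen: HashSet<(u128, u64, u64)>,
    /// Number of operations that were actually applied.
    pub applied: u32,
}

impl MetadataLog {
    /// Applies the operation unless the same key was already committed.
    /// Returns true when the operation was applied.
    pub fn apply(&mut self, client_id: u128, session: u64, request: u64) -> bool {
        if self.seen.insert((client_id, session, request)) {
            self.applied += 1;
            true
        } else {
            false
        }
    }
}
```

Calling `apply(1, 1, 7)` twice applies the operation once, while a retry issued as `apply(2, 1, 7)` after a session reset is applied again, which is the case the comment warns about.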
    if request_timeout.is_zero() {
        io.await
    } else {
        match tokio::time::timeout(request_timeout.get_duration(), io).await {
unlike tcp/ws, the quic timeout path resets consensus_session (vsr) but never calls set_state(Disconnected) or closes the connection - state stays Connected. RequestTimeout is also absent from the quic retry list, so a quic client never auto-reconnects after a timeout (tcp/ws recover because their next call sees NotConnected). under a persistent server stall the consumer just re-opens open_bi() and re-times-out every request_timeout, surfacing the error each cycle without recovering. fix: set_state(Disconnected) here too, or add RequestTimeout to the retry list.
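The first of the two fixes can be sketched as a small state machine. Everything here is a hypothetical stand-in (`QuicLikeClient`, `State`, the `server_stalled` flag that simulates a server that never answers); it only shows that marking the client Disconnected on timeout lets the next call re-establish the connection.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum State {
    Connected,
    Disconnected,
}

#[derive(Debug, PartialEq)]
pub enum SendError {
    RequestTimeout,
}

/// Hypothetical stand-in for the QUIC client.
pub struct QuicLikeClient {
    pub state: State,
    /// Counts how often a call had to re-establish the connection.
    pub reconnects: u32,
    /// Simulates a server that never responds.
    pub server_stalled: bool,
}

impl QuicLikeClient {
    pub fn send(&mut self) -> Result<(), SendError> {
        // A client that was marked Disconnected reconnects before sending.
        if self.state == State::Disconnected {
            self.reconnects += 1;
            self.state = State::Connected;
        }
        if self.server_stalled {
            // The step the review asks for: record the broken connection
            // so that the next call does not reuse it.
            self.state = State::Disconnected;
            return Err(SendError::RequestTimeout);
        }
        Ok(())
    }
}
```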
        request_timeout: parse_duration(&args.request_timeout)?,
    }));
    }
    TransportProtocol::Http => {
the http arm builds HttpClientConfig without request_timeout and sets no reqwest timeout, so --request-timeout is silently dropped when --transport http is used, while it applies to the other three transports. wire it into the reqwest client or reject the combination explicitly.
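The "reject the combination explicitly" option might look like the sketch below. `check_request_timeout` and the error text are hypothetical, and the `TransportProtocol` enum here is a local stand-in with the four transports mentioned in this thread, not the CLI's own type.

```rust
/// Local stand-in for the CLI's transport selector.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TransportProtocol {
    Tcp,
    Quic,
    WebSocket,
    Http,
}

/// Hypothetical validation step: fails loudly instead of silently
/// dropping the flag when the transport cannot honor it.
pub fn check_request_timeout(
    transport: TransportProtocol,
    request_timeout: Option<&str>,
) -> Result<(), String> {
    match (transport, request_timeout) {
        (TransportProtocol::Http, Some(value)) => Err(format!(
            "--request-timeout {} is not supported with --transport http",
            value
        )),
        _ => Ok(()),
    }
}
```

Wiring the value into the HTTP client instead would remove the need for this check.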
    #[serde(skip_serializing_if = "Option::is_none")]
    pub websocket_reconnection_interval: Option<String>,

    /// The optional per-request timeout for send/receive operations
worth a note here that request_timeout values "0", "unlimited", "disabled", "none" all map to no timeout (infinite wait), since the is_zero() fast-path skips the timeout entirely. "0" reading as infinite rather than instant is the opposite of what most people expect.
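To make the "0 means infinite" behavior concrete, here is a small sketch of the mapping. `effective_timeout` is a hypothetical helper, not the SDK's parser, and the accepted grammar is deliberately reduced to whole seconds with an "s" suffix; the four no-timeout spellings are the ones listed above.

```rust
use std::time::Duration;

/// Hypothetical helper: `Ok(None)` means "no timeout, wait indefinitely".
/// Only whole seconds such as "30s" are accepted in this sketch.
pub fn effective_timeout(raw: &str) -> Result<Option<Duration>, String> {
    let value = raw.trim();
    // These spellings all disable the timeout rather than making it instant.
    if matches!(value, "0" | "unlimited" | "disabled" | "none") {
        return Ok(None);
    }
    let secs = value
        .strip_suffix('s')
        .ok_or_else(|| format!("unsupported duration: {}", raw))?
        .parse::<u64>()
        .map_err(|e| format!("invalid duration {}: {}", raw, e))?;
    // A zero duration takes the same no-timeout path as "0".
    Ok(if secs == 0 {
        None
    } else {
        Some(Duration::from_secs(secs))
    })
}
```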
Which issue does this PR address?
Closes #3419
Rationale
Clients had no deadline on individual requests, so a stalled server could block a caller forever.
What changed?
send_raw would block forever if the server didn't respond. I added a request_timeout field (IggyDuration, default 300s) to TcpClientConfig, QuicClientConfig, and WebSocketClientConfig. Each client's send_raw now wraps its I/O in tokio::time::timeout.
Local Execution
AI Usage
Change Log
By default, the Iggy Rust SDK now applies a 30s timeout to each request.