Skip to content

Commit 9369ddc

Browse files
committed
Try and make watch client connection more resilient.
1 parent 7f9c90c commit 9369ddc

7 files changed

Lines changed: 154 additions & 19 deletions

File tree

rewatch/EXPERIMENT_DAEMON.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ Eliminate lockfile limitation via gRPC daemon that owns build state. Commands (`
115115
| Client isolation | client_id on all events | Concurrent builds don't interfere, each sees only its own |
116116
| Per-build CLI flags | Passed to build, not stored | Flags like `--warn-error` apply per-build without persisting |
117117
| Scoping via path | `get_scope_package_from_working_dir()` | Working dir determines scope, passed as parameter to commands |
118+
| Connection resilience | HTTP/2 keepalive + heartbeat + retry | Daemon detects dead clients, watch client auto-reconnects |
118119

119120
## How It Works
120121

@@ -190,6 +191,16 @@ The watch client (`client/watch.rs`) uses a **daemon-driven** watching strategy:
190191

191192
**Signal handling:** Watch client uses `tokio::signal::unix` to handle SIGTERM/SIGINT. On signal, sends `DisconnectRequest` to daemon before exiting cleanly.
192193

194+
**Reconnection:** On gRPC stream errors, the watch client automatically reconnects:
195+
196+
- `run()` wraps each session in a retry loop with exponential backoff (1s → 2s → 4s → ... → 30s cap)
197+
- `run_watch_session()` returns `SessionExit::Clean` (signal, no retry) or `SessionExit::StreamError` (retry)
198+
- On reconnect, `connect_or_start()` handles starting a new daemon if the old one died
199+
- File watchers are recreated fresh via `WatchPaths` event each session — no state carryover needed
200+
- Logs the last event received before the error for diagnostics
201+
202+
**Heartbeat:** Daemon sends `Heartbeat` event every 30s on the watch stream. This ensures broken connections are detected even during idle periods (no build activity). The heartbeat write failing on the server side triggers tonic to drop the stream, firing `scopeguard` cleanup.
203+
193204
**CompileType enum:** `Incremental`, `SourceCreated`, `SourceDeleted`, `SourceRenamed`, `ConfigChange`
194205

195206
### Daemon Lifecycle
@@ -199,6 +210,7 @@ The watch client (`client/watch.rs`) uses a **daemon-driven** watching strategy:
199210
- SIGTERM/SIGINT → graceful shutdown
200211
- SIGHUP → ignored (daemon survives terminal close)
201212
- All clients disconnected → shutdown
213+
- HTTP/2 keepalive: `http2_keepalive_interval(10s)` + `http2_keepalive_timeout(5s)` on tonic server — detects dead clients within ~15s even if no application traffic
202214
- Writes PID to `<root>/lib/bs/rescript.daemon.pid`
203215
- Writes socket path to `<root>/lib/bs/rescript.sock.path`
204216
- Cleans up socket, PID file, and socket path file on exit
@@ -312,6 +324,7 @@ Update after each session. Reflect current state, not history.
312324
- **std::sync::Mutex in async**: Works because locking only in `spawn_blocking`, but footgun
313325
- **No protocol versioning**: Client/daemon must be same version
314326
- **Unix-only**: No Windows support (needs named pipes or TCP)
327+
- **Reconnection is watch-only**: Build/clean/format clients don't retry on stream errors (they're short-lived, so restart is acceptable)
315328

316329
## Testing
317330

rewatch/proto/rescript.proto

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ message DaemonEvent {
9191
FormatProgress format_progress = 21;
9292
FormatCheckFailed format_check_failed = 22;
9393
FormattedStdin formatted_stdin = 23;
94+
95+
// Keepalive
96+
Heartbeat heartbeat = 34;
9497
}
9598
}
9699

@@ -364,6 +367,16 @@ message FormattedStdin {
364367
string content = 2; // Formatted content from stdin
365368
}
366369

370+
// =============================================================================
371+
// Keepalive
372+
// =============================================================================
373+
374+
// Periodic heartbeat sent on long-lived streams (e.g., watch) so broken
375+
// connections are detected even when no build activity is happening.
376+
message Heartbeat {
377+
uint64 client_id = 1;
378+
}
379+
367380
// =============================================================================
368381
// Request/Response Messages
369382
// =============================================================================

rewatch/src/client/debug.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,8 @@ impl App {
419419
format!("{} ({})", e.command, output_info),
420420
)
421421
}
422+
// Heartbeats are keepalive signals — don't show in debug TUI
423+
DaemonEventVariant::Heartbeat(_) => return,
422424
};
423425

424426
self.logs.push(LogEntry {

rewatch/src/client/output.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,7 @@ This inconsistency will cause issues with package resolution.",
324324
| Event::BuildStarted(_)
325325
| Event::FileChanged(_)
326326
| Event::WatchPaths(_)
327+
| Event::Heartbeat(_)
327328
| Event::FormatStarted(_)
328329
| Event::FormatFinished(_)
329330
| Event::FormatProgress(_)

rewatch/src/client/watch.rs

Lines changed: 98 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use std::collections::{HashMap, HashSet};
22
use std::path::{Path, PathBuf};
33
use std::sync::Arc;
4-
use std::time::SystemTime;
4+
use std::time::{Duration, SystemTime};
55

66
use anyhow::Result;
77
use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
88
use tokio::sync::mpsc;
9+
use tokio::time::sleep;
910
use tokio_stream::StreamExt;
1011

1112
#[cfg(unix)]
@@ -116,23 +117,96 @@ fn setup_watchers(
116117
state
117118
}
118119

119-
/// Run watch mode via the daemon
120+
/// Outcome of a single watch session.
121+
enum SessionExit {
122+
/// User-initiated shutdown (signal). Don't retry.
123+
Clean,
124+
/// gRPC stream broke. Retry with reconnection.
125+
StreamError(String),
126+
}
127+
128+
/// Return a short description of a daemon event variant for diagnostic logging.
129+
fn event_description(event: &DaemonEventVariant) -> &'static str {
130+
match event {
131+
DaemonEventVariant::ClientConnected(_) => "ClientConnected",
132+
DaemonEventVariant::ClientDisconnected(_) => "ClientDisconnected",
133+
DaemonEventVariant::BuildStarted(_) => "BuildStarted",
134+
DaemonEventVariant::BuildFinished(_) => "BuildFinished",
135+
DaemonEventVariant::Cleaned(_) => "Cleaned",
136+
DaemonEventVariant::Parsed(_) => "Parsed",
137+
DaemonEventVariant::Compiling(_) => "Compiling",
138+
DaemonEventVariant::Compiled(_) => "Compiled",
139+
DaemonEventVariant::CleanedCompilerAssets(_) => "CleanedCompilerAssets",
140+
DaemonEventVariant::CleanedJsFiles(_) => "CleanedJsFiles",
141+
DaemonEventVariant::CircularDependency(_) => "CircularDependency",
142+
DaemonEventVariant::UnallowedDependency(_) => "UnallowedDependency",
143+
DaemonEventVariant::PackageTreeError(_) => "PackageTreeError",
144+
DaemonEventVariant::ModuleNotFound(_) => "ModuleNotFound",
145+
DaemonEventVariant::InitializationError(_) => "InitializationError",
146+
DaemonEventVariant::CompilerWarning(_) => "CompilerWarning",
147+
DaemonEventVariant::CompilerError(_) => "CompilerError",
148+
DaemonEventVariant::ConfigWarning(_) => "ConfigWarning",
149+
DaemonEventVariant::DuplicatedPackage(_) => "DuplicatedPackage",
150+
DaemonEventVariant::MissingImplementation(_) => "MissingImplementation",
151+
DaemonEventVariant::PackageNameMismatch(_) => "PackageNameMismatch",
152+
DaemonEventVariant::FileChanged(_) => "FileChanged",
153+
DaemonEventVariant::WatchPaths(_) => "WatchPaths",
154+
DaemonEventVariant::Heartbeat(_) => "Heartbeat",
155+
DaemonEventVariant::FormatStarted(_) => "FormatStarted",
156+
DaemonEventVariant::FormatFinished(_) => "FormatFinished",
157+
DaemonEventVariant::FormatProgress(_) => "FormatProgress",
158+
DaemonEventVariant::FormatCheckFailed(_) => "FormatCheckFailed",
159+
DaemonEventVariant::FormattedStdin(_) => "FormattedStdin",
160+
DaemonEventVariant::JsPostBuildOutput(_) => "JsPostBuildOutput",
161+
}
162+
}
163+
164+
/// Run watch mode via the daemon, reconnecting on stream errors.
120165
pub async fn run(folder: &str, filter: Option<String>, after_build: Option<String>) -> Result<()> {
121166
let root = connection::find_project_root(Path::new(folder))?;
122167
let working_dir = Path::new(folder).canonicalize()?;
123168

124-
let mut client = connection::connect_or_start(&root).await?;
169+
let mut delay = Duration::from_secs(1);
170+
let max_delay = Duration::from_secs(30);
171+
172+
loop {
173+
match run_watch_session(&root, &working_dir, &filter, &after_build).await {
174+
Ok(SessionExit::Clean) => return Ok(()),
175+
Ok(SessionExit::StreamError(msg)) => {
176+
eprintln!(
177+
"Connection to daemon lost ({}). Reconnecting in {}s...",
178+
msg,
179+
delay.as_secs()
180+
);
181+
sleep(delay).await;
182+
delay = (delay * 2).min(max_delay);
183+
}
184+
Err(e) => return Err(e),
185+
}
186+
}
187+
}
188+
189+
/// Run a single watch session. Returns `SessionExit::Clean` on signal,
190+
/// `SessionExit::StreamError` on gRPC stream failure, or `Err` for
191+
/// non-recoverable errors (e.g. initialization failure).
192+
async fn run_watch_session(
193+
root: &Path,
194+
working_dir: &Path,
195+
filter: &Option<String>,
196+
after_build: &Option<String>,
197+
) -> Result<SessionExit> {
198+
let mut client = connection::connect_or_start(root).await?;
125199

126200
// Send watch request to daemon
127201
let request = WatchRequest {
128202
working_directory: working_dir.to_string_lossy().to_string(),
129-
filter,
203+
filter: filter.clone(),
130204
};
131205

132206
let mut build_stream = client.watch(request).await?.into_inner();
133207

134208
// Clone client for file/config change notifications
135-
let mut notify_client = connection::connect(&root).await?;
209+
let mut notify_client = connection::connect(root).await?;
136210

137211
// Set up signal handlers for graceful shutdown
138212
#[cfg(unix)]
@@ -151,6 +225,9 @@ pub async fn run(folder: &str, filter: Option<String>, after_build: Option<Strin
151225
// Track client_id for graceful disconnect
152226
let mut client_id: Option<u64> = None;
153227

228+
// Track the last event received for diagnostic logging on stream errors
229+
let mut last_event: &str = "none";
230+
154231
loop {
155232
tokio::select! {
156233
// Handle SIGTERM for graceful shutdown
@@ -165,7 +242,7 @@ pub async fn run(folder: &str, filter: Option<String>, after_build: Option<Strin
165242
if let Some(id) = client_id {
166243
let _ = notify_client.disconnect(DisconnectRequest { client_id: id }).await;
167244
}
168-
break;
245+
return Ok(SessionExit::Clean);
169246
}
170247

171248
// Handle SIGINT (Ctrl-C) for graceful shutdown
@@ -180,7 +257,7 @@ pub async fn run(folder: &str, filter: Option<String>, after_build: Option<Strin
180257
if let Some(id) = client_id {
181258
let _ = notify_client.disconnect(DisconnectRequest { client_id: id }).await;
182259
}
183-
break;
260+
return Ok(SessionExit::Clean);
184261
}
185262

186263
// Handle file system events
@@ -254,6 +331,11 @@ pub async fn run(folder: &str, filter: Option<String>, after_build: Option<Strin
254331
Some(result) = build_stream.next() => {
255332
match result {
256333
Ok(event) => {
334+
// Track the last event for diagnostic logging
335+
if let Some(ref evt) = event.event {
336+
last_event = event_description(evt);
337+
}
338+
257339
// Check if this is a WatchPaths event — update our watchers
258340
if let Some(DaemonEventVariant::WatchPaths(ref watch_paths)) = event.event {
259341
let source_paths: Vec<(PathBuf, bool)> = watch_paths
@@ -275,6 +357,11 @@ pub async fn run(folder: &str, filter: Option<String>, after_build: Option<Strin
275357
continue;
276358
}
277359

360+
// Silently consume heartbeats
361+
if matches!(event.event, Some(DaemonEventVariant::Heartbeat(_))) {
362+
continue;
363+
}
364+
278365
// Extract client_id from events (BuildStarted is the first event with it)
279366
if client_id.is_none()
280367
&& let Some(DaemonEventVariant::BuildStarted(ref started)) = event.event
@@ -300,25 +387,23 @@ pub async fn run(folder: &str, filter: Option<String>, after_build: Option<Strin
300387

301388
// Run after_build command on successful completion
302389
if is_success
303-
&& let Some(ref cmd_str) = after_build
390+
&& let Some(cmd_str) = after_build
304391
{
305392
cmd::run(cmd_str.clone());
306393
}
307394
}
308395
Err(e) => {
309-
eprintln!("Stream error: {}", e);
310-
// Send disconnect to daemon so it can clean up
396+
eprintln!("Stream error (last event: {}): {}", last_event, e);
397+
// Best-effort disconnect — connection is likely broken
311398
if let Some(id) = client_id {
312399
let _ = notify_client.disconnect(DisconnectRequest { client_id: id }).await;
313400
}
314-
break;
401+
return Ok(SessionExit::StreamError(e.to_string()));
315402
}
316403
}
317404
}
318405
}
319406
}
320-
321-
Ok(())
322407
}
323408

324409
/// Check if a path is a config file (rescript.json) that we're watching.

rewatch/src/daemon.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ mod work_queue;
99
use std::collections::hash_map::DefaultHasher;
1010
use std::hash::{Hash, Hasher};
1111
use std::path::{Path, PathBuf};
12+
use std::time::Duration;
1213

1314
use anyhow::Result;
1415
use tokio::net::UnixListener;
@@ -111,6 +112,8 @@ pub async fn start(root: PathBuf) -> Result<()> {
111112
};
112113

113114
Server::builder()
115+
.http2_keepalive_interval(Some(Duration::from_secs(10)))
116+
.http2_keepalive_timeout(Some(Duration::from_secs(5)))
114117
.add_service(proto::rescript_daemon_server::RescriptDaemonServer::new(service))
115118
.serve_with_incoming_shutdown(uds_stream, shutdown_monitor(shutdown_rx))
116119
.await?;

rewatch/src/daemon/service.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ fn event_matches_client(event: &DaemonEvent, client_id: u64) -> bool {
140140
Some(Event::FileChanged(_)) => true,
141141
// WatchPaths is targeted at the watch client
142142
Some(Event::WatchPaths(e)) => e.client_id == client_id,
143+
// Heartbeat is targeted at a specific client
144+
Some(Event::Heartbeat(e)) => e.client_id == client_id,
143145
None => false,
144146
}
145147
}
@@ -509,14 +511,30 @@ impl RescriptDaemon for DaemonService {
509511
});
510512
});
511513

512-
while let Some(result) = event_stream.next().await {
513-
match result {
514-
Ok(event) => {
515-
if event_matches_client(&event, client_id) {
516-
yield Ok(event);
514+
// Send periodic heartbeats so broken connections are detected
515+
// even when no build activity is happening.
516+
let mut heartbeat_interval = tokio::time::interval(std::time::Duration::from_secs(30));
517+
heartbeat_interval.tick().await; // consume the immediate first tick
518+
519+
loop {
520+
tokio::select! {
521+
result = event_stream.next() => {
522+
match result {
523+
Some(Ok(event)) => {
524+
if event_matches_client(&event, client_id) {
525+
yield Ok(event);
526+
}
527+
}
528+
Some(Err(_)) => continue,
529+
None => break,
517530
}
518531
}
519-
Err(_) => continue,
532+
_ = heartbeat_interval.tick() => {
533+
yield Ok(DaemonEvent {
534+
timestamp: timestamp(),
535+
event: Some(Event::Heartbeat(super::proto::Heartbeat { client_id })),
536+
});
537+
}
520538
}
521539
}
522540
};

0 commit comments

Comments
 (0)