diff --git a/fuse-pipe/src/client/multiplexer.rs b/fuse-pipe/src/client/multiplexer.rs index 26a120bc..ca0132cd 100644 --- a/fuse-pipe/src/client/multiplexer.rs +++ b/fuse-pipe/src/client/multiplexer.rs @@ -316,7 +316,7 @@ impl Multiplexer { pub fn send_request_no_reply(&self, request: VolumeRequest) { let unique = self.next_id.fetch_add(1, Ordering::Relaxed); - let wire = WireRequest::with_groups(unique, 0, request, Vec::new()); + let wire = WireRequest::with_groups(unique, 0, request, Vec::new()).with_checksum(); let body = match bincode::serialize(&wire) { Ok(b) => b, @@ -540,9 +540,13 @@ fn writer_loop_reconnectable( ); } - // Try to write + // Compute CRC32 and write CRC header + message + // Wire format: [4 bytes: CRC][4 bytes: length][N bytes: body] + let send_crc = crc32fast::hash(&req.data); + let crc_bytes = send_crc.to_be_bytes(); let write_ok = writer_socket - .write_all(&req.data) + .write_all(&crc_bytes) + .and_then(|_| writer_socket.write_all(&req.data)) .and_then(|_| writer_socket.flush()) .is_ok(); @@ -689,8 +693,13 @@ fn do_reconnect( let mut resend_failed = 0; for key in &keys { if let Some(entry) = pending.get(key) { + // Compute CRC32 and write CRC header + message + // Wire format: [4 bytes: CRC][4 bytes: length][N bytes: body] + let send_crc = crc32fast::hash(&entry.data); + let crc_bytes = send_crc.to_be_bytes(); if writer_socket - .write_all(&entry.data) + .write_all(&crc_bytes) + .and_then(|_| writer_socket.write_all(&entry.data)) .and_then(|_| writer_socket.flush()) .is_err() { diff --git a/fuse-pipe/src/protocol/wire.rs b/fuse-pipe/src/protocol/wire.rs index 0e6fe581..fbb60f36 100644 --- a/fuse-pipe/src/protocol/wire.rs +++ b/fuse-pipe/src/protocol/wire.rs @@ -2,6 +2,16 @@ //! //! # Frame Format //! +//! Requests use a CRC-prefixed format for corruption detection: +//! ```text +//! +----------+----------+---------+ +//! | CRC | length | payload | +//! | (4 bytes)| (4 bytes)| (N bytes)| +//! +----------+----------+---------+ +//! ``` +//! - CRC is a big-endian u32 CRC32 checksum over (length + payload) +//! +//! Responses use the original format (no CRC prefix): //! ```text //! +----------+---------+ //! | length | payload | diff --git a/fuse-pipe/src/server/pipelined.rs b/fuse-pipe/src/server/pipelined.rs index 1dd35f75..103f8280 100644 --- a/fuse-pipe/src/server/pipelined.rs +++ b/fuse-pipe/src/server/pipelined.rs @@ -671,9 +671,12 @@ mod tests { let reader_task = tokio::spawn(request_reader(read_half, handler, tx)); + // Wire format: [4 bytes: CRC][4 bytes: length][N bytes: body] // 16-byte invalid bincode payload. let bad_payload = [0xffu8; 16]; let bad_len = (bad_payload.len() as u32).to_be_bytes(); + let dummy_crc = 0u32.to_be_bytes(); + client.write_all(&dummy_crc).await.unwrap(); client.write_all(&bad_len).await.unwrap(); client.write_all(&bad_payload).await.unwrap();