From 8b4cb783d4c0f973efadcfa54aa5a0c884bef8e1 Mon Sep 17 00:00:00 2001 From: Sam 'Gyro' C Date: Wed, 29 Jun 2022 15:08:35 -0400 Subject: [PATCH 1/3] Change LengthTag to u64 to fix alignment issue. --- sequencer_common/src/lib.rs | 2 +- sequencer_server/src/main.rs | 10 +++++++++- sequencer_server/src/record.rs | 4 ++-- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/sequencer_common/src/lib.rs b/sequencer_common/src/lib.rs index 417b501..ecfadee 100644 --- a/sequencer_common/src/lib.rs +++ b/sequencer_common/src/lib.rs @@ -13,7 +13,7 @@ pub type EpochId = u16; pub type InstanceId = u16; pub type ClusterId = u16; -pub type LengthTag = u16; +pub type LengthTag = u64; /// Messages going into the sequencer server and /// being sent out by the sequencer diff --git a/sequencer_server/src/main.rs b/sequencer_server/src/main.rs index 8ca88da..af76461 100644 --- a/sequencer_server/src/main.rs +++ b/sequencer_server/src/main.rs @@ -1139,6 +1139,8 @@ mod test { } #[test] fn end_to_end_no_appid() { + use rkyv::Deserialize; + let _guard = IO_TEST_PERMISSIONS.lock(); CURRENT_OFFSET.store(0, atomic::Ordering::Relaxed); const NUM_TESTS: usize = 32; @@ -1183,7 +1185,7 @@ mod test { if num_messages_received >= NUM_TESTS { break; } - let read_resl = tcp_stream.read_u16_le().await; + let read_resl = tcp_stream.read_u64_le().await; let length_tag = read_resl.unwrap(); if length_tag != 0 { let mut message_buf = vec![0u8; length_tag as usize]; @@ -1223,6 +1225,12 @@ mod test { for (i, message) in all_messages.iter().enumerate() { assert_eq!(message, running_tally.get(i).unwrap()); } + + for message in all_messages.iter() { + let mut deserializer = rkyv::de::deserializers::SharedDeserializeMap::default(); + let checked = rkyv::check_archived_root::(message).unwrap(); + let _deserialized: SequencerMessage = checked.deserialize(&mut deserializer).unwrap(); + } //Clean up std::fs::remove_file(path).unwrap(); diff --git a/sequencer_server/src/record.rs b/sequencer_server/src/record.rs index 5bba3a5..f27881a 100644 --- a/sequencer_server/src/record.rs +++ b/sequencer_server/src/record.rs @@ -380,7 +380,7 @@ impl MemMapRecordReader { if app_ids.is_empty() { // No whitelist, match all. // Length tag - writer.write_u16_le(message.len() as u16).await?; + writer.write_u64_le(message.len() as LengthTag).await?; // Write the message. writer.write_all(message).await?; } @@ -393,7 +393,7 @@ impl MemMapRecordReader { }; if app_ids.contains(&data.app_id) { // Length tag - writer.write_u16_le(message.len() as u16).await?; + writer.write_u64_le(message.len() as LengthTag).await?; // Write the message. writer.write_all(message).await?; } From 7cc42d1c1273cd93ba04127a3a1d19b5370eaca5 Mon Sep 17 00:00:00 2001 From: Sam 'Gyro' C Date: Thu, 30 Jun 2022 13:49:10 -0400 Subject: [PATCH 2/3] Rework threading. --- sequencer_server/src/main.rs | 120 +++++++++++++++++++-------------- sequencer_server/src/record.rs | 30 ++------- 2 files changed, 78 insertions(+), 72 deletions(-) diff --git a/sequencer_server/src/main.rs b/sequencer_server/src/main.rs index af76461..5425166 100644 --- a/sequencer_server/src/main.rs +++ b/sequencer_server/src/main.rs @@ -7,7 +7,7 @@ use rkyv::{ Archive, }; use sequencer_common::{AppId, ArchivedSequencerMessage, SequencerMessage, EpochId}; -use tokio::{sync::broadcast::Receiver, io::{AsyncReadExt}}; +use tokio::sync::broadcast::Receiver; use std::{ collections::HashMap, fmt::Display, @@ -351,27 +351,7 @@ fn build_async_runtime() -> io::Result { runtime_builder.build() } -fn offset_watcher_thread(channel_capacity: usize, update_interval: Duration) -> Receiver { - // Intended to notify all Client-Service processes, which will likely be - let (sender, receiver) = tokio::sync::broadcast::channel(channel_capacity); - //Prevent spurious errors on shutdown. - let _receiver_keepalive = receiver.resubscribe(); - std::thread::spawn(move || { - let _receiver_keepalive = _receiver_keepalive; - let mut previous_offset: u64 = CURRENT_OFFSET.load(atomic::Ordering::Relaxed); - loop { - std::thread::sleep(update_interval); - let new_offset: u64 = CURRENT_OFFSET.load(atomic::Ordering::Relaxed); - if new_offset > previous_offset { - sender.send(new_offset-previous_offset).unwrap(); - previous_offset = new_offset; - } - } - }); - receiver -} - -async fn run_client_service_tasks(initial_offset: u64, un_parsed_address: String, tcp_port: u16, path: PathBuf, offset_change_receiver: Receiver) { +async fn run_client_service_tasks(initial_offset: u64, un_parsed_address: String, tcp_port: u16, path: PathBuf, _shutdown_receiver: Receiver<()>) { let address = match IpAddr::from_str(&un_parsed_address) { Ok(addr) => SocketAddr::from((addr, tcp_port)), Err(_parse_err) => match tokio::net::lookup_host(&un_parsed_address).await { @@ -385,32 +365,40 @@ async fn run_client_service_tasks(initial_offset: u64, un_parsed_address: String }; let listener = tokio::net::TcpListener::bind(address).await.unwrap(); - //Make sure the closure captures it before borrowing it off into another closure. - let offset_change_receiver = offset_change_receiver; + let reader_path = path.clone(); + loop { let (mut socket, _peer_addr) = listener.accept().await.unwrap(); - let mut local_change_receiver = offset_change_receiver.resubscribe(); let reader_path = reader_path.clone(); + + #[cfg(test)] + let mut shutdown_receiver = _shutdown_receiver.resubscribe(); + tokio::spawn(async move { - let mut read_buf = [0u8; 4096]; - let (mut reader, mut writer) = socket.split(); + //let mut read_buf = [0u8; 4096]; + let (_reader, mut writer) = socket.split(); let mut _subscribed_app_ids: Vec = Vec::default(); + let mut prev_offset = initial_offset; let mut last_sent_offset = initial_offset; - loop { - tokio::select! { - _new_bytes = local_change_receiver.recv() => { - let current_offset = CURRENT_OFFSET.load(atomic::Ordering::Relaxed); - // Time does not go backwards - assert!(current_offset > last_sent_offset); - let mut file_reader = MemMapRecordReader::new(&reader_path, last_sent_offset).unwrap(); - file_reader.read_and_push_to(&_subscribed_app_ids, &mut writer).await.unwrap(); - last_sent_offset = file_reader.most_recent_offset_visited(); - } - _len_read = reader.read(&mut read_buf) => { - //Todo: Read inbound messages + loop { + #[cfg(test)] { + if let Ok(()) = shutdown_receiver.try_recv() { + break; } } + + let current_offset = CURRENT_OFFSET.load(atomic::Ordering::Relaxed); + + // Spinlock on offset changes. + if prev_offset < current_offset { + let mut file_reader = MemMapRecordReader::new(&reader_path, last_sent_offset).unwrap(); + file_reader.read_and_push_to(&mut writer).await.unwrap(); + last_sent_offset = file_reader.most_recent_offset_visited(); + prev_offset = current_offset; + } + // Since we are spinlocking (not a typical async behavior), ensure we do not starve the scheduler. + tokio::task::yield_now().await; } }); } @@ -446,14 +434,16 @@ fn main() -> std::io::Result<()> { let initial_offset = record.initial_offset(); CURRENT_OFFSET.store(initial_offset, atomic::Ordering::Relaxed); - let offset_change_receiver = offset_watcher_thread(4096, Duration::from_millis(50)); + //let offset_change_receiver = offset_watcher_thread(4096, Duration::from_millis(50)); let tcp_addr = args.address.clone(); let tcp_port = args.port; let reader_path = path.clone(); - runtime.spawn(run_client_service_tasks(initial_offset, tcp_addr, tcp_port, reader_path, offset_change_receiver)); + let (__unused, shutdown_receiver) = tokio::sync::broadcast::channel(3); + + runtime.spawn(run_client_service_tasks(initial_offset, tcp_addr, tcp_port, reader_path, shutdown_receiver)); // Attempt to give the client service tasks thread a moment to spin up before there's any possibility of incrementing the global offset. std::thread::sleep(Duration::from_millis(5)); @@ -481,7 +471,7 @@ mod test { use super::*; use lazy_static::lazy_static; use sequencer_common::LengthTag; - use tokio::{sync::broadcast::{Receiver, Sender}, net::TcpStream}; + use tokio::{sync::broadcast::{Receiver, Sender}, net::TcpStream, io::AsyncReadExt}; // Any localhost-related network tests will interfere with eachother if you use // the cargo test command, which is multithreaded by default. @@ -969,6 +959,34 @@ mod test { } impl std::error::Error for TestError {} + + fn offset_watcher_thread(channel_capacity: usize, update_interval: Duration) -> Receiver { + // Intended to notify all Client-Service processes, which will likely be + let (sender, receiver) = tokio::sync::broadcast::channel(channel_capacity); + //Prevent spurious errors on shutdown. + let _receiver_keepalive = receiver.resubscribe(); + std::thread::spawn(move || { + let _receiver_keepalive = _receiver_keepalive; + let mut previous_offset: u64 = CURRENT_OFFSET.load(atomic::Ordering::Relaxed); + loop { + std::thread::sleep(update_interval); + let new_offset: u64 = CURRENT_OFFSET.load(atomic::Ordering::Relaxed); + if new_offset > previous_offset { + sender.send(new_offset-previous_offset).unwrap(); + previous_offset = new_offset; + } + + #[cfg(test)] + { + if (new_offset != previous_offset) && (new_offset == 0) { + previous_offset = 0; + } + } + } + }); + receiver + } + #[test] pub fn test_offset_counter_consistency() { let _guard = IO_TEST_PERMISSIONS.lock(); @@ -1137,8 +1155,9 @@ mod test { assert_eq!(record.inner.len() as u64, offset); } + #[test] - fn end_to_end_no_appid() { + fn end_to_end_client_service() { use rkyv::Deserialize; let _guard = IO_TEST_PERMISSIONS.lock(); @@ -1161,17 +1180,16 @@ mod test { let record = record::MemMapBackend::init(&path, 0).unwrap(); // Determine current offset. let initial_offset = record.initial_offset(); - CURRENT_OFFSET.store(initial_offset, atomic::Ordering::Relaxed); - - let offset_change_receiver = offset_watcher_thread(4096, Duration::from_millis(50)); - + CURRENT_OFFSET.store(initial_offset, atomic::Ordering::Relaxed); let tcp_server_addr = IpAddr::from(Ipv6Addr::LOCALHOST); let tcp_server_port = 9999; let reader_path = path.clone(); - runtime.spawn(run_client_service_tasks(initial_offset, format!("{}", tcp_server_addr), tcp_server_port, reader_path.to_path_buf(), offset_change_receiver)); + let (shutdown_sender, shutdown_receiver) = tokio::sync::broadcast::channel(24); + + let client_service_join_handle = runtime.spawn(run_client_service_tasks(initial_offset, format!("{}", tcp_server_addr), tcp_server_port, reader_path.to_path_buf(), shutdown_receiver)); // Attempt to give the client service tasks thread a moment to spin up before there's any possibility of incrementing the global offset. std::thread::sleep(Duration::from_millis(5)); @@ -1232,9 +1250,13 @@ mod test { let _deserialized: SequencerMessage = checked.deserialize(&mut deserializer).unwrap(); } + println!("Cleaning up after the test."); + shutdown_sender.send(()).unwrap(); + std::thread::sleep(Duration::from_millis(10)); + client_service_join_handle.abort(); //Clean up std::fs::remove_file(path).unwrap(); std::fs::remove_dir(Path::new("test_output/")).unwrap(); CURRENT_OFFSET.store(0, atomic::Ordering::Relaxed); } -} +} \ No newline at end of file diff --git a/sequencer_server/src/record.rs b/sequencer_server/src/record.rs index f27881a..886dffc 100644 --- a/sequencer_server/src/record.rs +++ b/sequencer_server/src/record.rs @@ -5,7 +5,7 @@ use memmap2::{MmapMut, MmapOptions, Mmap}; //use rkyv::validation::CheckArchiveError; -use sequencer_common::{LengthTag, AppId, SequencerMessage}; +use sequencer_common::LengthTag; use tokio::io::{AsyncWrite, AsyncWriteExt}; use std::{ @@ -372,32 +372,16 @@ impl MemMapRecordReader { /// /// This method should be fully zero-copy, end-to-end, unless the OS performs its own copy when we /// push to the socket. - pub async fn read_and_push_to(&mut self, app_ids: &[AppId], writer: &mut W) -> Result { + pub async fn read_and_push_to(&mut self, writer: &mut W) -> Result { let mut reader = RecordReader::new(&self.mmap.as_ref()[self.last_read_offset as usize ..]); while let Some(maybe_message) = reader.next() { let message = maybe_message?; - if app_ids.is_empty() { - // No whitelist, match all. - // Length tag - writer.write_u64_le(message.len() as LengthTag).await?; - // Write the message. - writer.write_all(message).await?; - } - else { - rkyv::check_archived_root::(message) - .map_err(|e| MmapReadError::CheckArchive(format!("{:?}", e)))?; - - let data = unsafe { - rkyv::archived_root::(message) - }; - if app_ids.contains(&data.app_id) { - // Length tag - writer.write_u64_le(message.len() as LengthTag).await?; - // Write the message. - writer.write_all(message).await?; - } - } + // No whitelist, match all. + // Length tag + writer.write_u64_le(message.len() as LengthTag).await?; + // Write the message. + writer.write_all(message).await?; } let amt_written = reader.cursor as u64 - self.last_read_offset; self.last_read_offset += reader.cursor as u64; From ea9c476b3d0dc781fd68cfd0be15beaf8e99c25d Mon Sep 17 00:00:00 2001 From: Sam 'Gyro' C Date: Thu, 30 Jun 2022 13:55:39 -0400 Subject: [PATCH 3/3] Remove spurious 'shutdown' channel - 'drop(runtime);' also prevents a deadlock. --- sequencer_server/src/main.rs | 23 +++++------------------ 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/sequencer_server/src/main.rs b/sequencer_server/src/main.rs index 5425166..b4a9e08 100644 --- a/sequencer_server/src/main.rs +++ b/sequencer_server/src/main.rs @@ -7,7 +7,6 @@ use rkyv::{ Archive, }; use sequencer_common::{AppId, ArchivedSequencerMessage, SequencerMessage, EpochId}; -use tokio::sync::broadcast::Receiver; use std::{ collections::HashMap, fmt::Display, @@ -351,7 +350,7 @@ fn build_async_runtime() -> io::Result { runtime_builder.build() } -async fn run_client_service_tasks(initial_offset: u64, un_parsed_address: String, tcp_port: u16, path: PathBuf, _shutdown_receiver: Receiver<()>) { +async fn run_client_service_tasks(initial_offset: u64, un_parsed_address: String, tcp_port: u16, path: PathBuf) { let address = match IpAddr::from_str(&un_parsed_address) { Ok(addr) => SocketAddr::from((addr, tcp_port)), Err(_parse_err) => match tokio::net::lookup_host(&un_parsed_address).await { @@ -372,9 +371,6 @@ async fn run_client_service_tasks(initial_offset: u64, un_parsed_address: String let (mut socket, _peer_addr) = listener.accept().await.unwrap(); let reader_path = reader_path.clone(); - #[cfg(test)] - let mut shutdown_receiver = _shutdown_receiver.resubscribe(); - tokio::spawn(async move { //let mut read_buf = [0u8; 4096]; let (_reader, mut writer) = socket.split(); @@ -382,11 +378,6 @@ async fn run_client_service_tasks(initial_offset: u64, un_parsed_address: String let mut prev_offset = initial_offset; let mut last_sent_offset = initial_offset; loop { - #[cfg(test)] { - if let Ok(()) = shutdown_receiver.try_recv() { - break; - } - } let current_offset = CURRENT_OFFSET.load(atomic::Ordering::Relaxed); @@ -441,9 +432,7 @@ fn main() -> std::io::Result<()> { let reader_path = path.clone(); - let (__unused, shutdown_receiver) = tokio::sync::broadcast::channel(3); - - runtime.spawn(run_client_service_tasks(initial_offset, tcp_addr, tcp_port, reader_path, shutdown_receiver)); + runtime.spawn(run_client_service_tasks(initial_offset, tcp_addr, tcp_port, reader_path)); // Attempt to give the client service tasks thread a moment to spin up before there's any possibility of incrementing the global offset. std::thread::sleep(Duration::from_millis(5)); @@ -1187,9 +1176,7 @@ mod test { let reader_path = path.clone(); - let (shutdown_sender, shutdown_receiver) = tokio::sync::broadcast::channel(24); - - let client_service_join_handle = runtime.spawn(run_client_service_tasks(initial_offset, format!("{}", tcp_server_addr), tcp_server_port, reader_path.to_path_buf(), shutdown_receiver)); + let client_service_join_handle = runtime.spawn(run_client_service_tasks(initial_offset, format!("{}", tcp_server_addr), tcp_server_port, reader_path.to_path_buf())); // Attempt to give the client service tasks thread a moment to spin up before there's any possibility of incrementing the global offset. std::thread::sleep(Duration::from_millis(5)); @@ -1251,10 +1238,10 @@ mod test { } println!("Cleaning up after the test."); - shutdown_sender.send(()).unwrap(); + //Clean up std::thread::sleep(Duration::from_millis(10)); client_service_join_handle.abort(); - //Clean up + drop(runtime); std::fs::remove_file(path).unwrap(); std::fs::remove_dir(Path::new("test_output/")).unwrap(); CURRENT_OFFSET.store(0, atomic::Ordering::Relaxed);