Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sequencer_common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub type EpochId = u16;
pub type InstanceId = u16;
pub type ClusterId = u16;

pub type LengthTag = u16;
pub type LengthTag = u64;

/// Messages going into the sequencer server and
/// being sent out by the sequencer
Expand Down
119 changes: 68 additions & 51 deletions sequencer_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use rkyv::{
Archive,
};
use sequencer_common::{AppId, ArchivedSequencerMessage, SequencerMessage, EpochId};
use tokio::{sync::broadcast::Receiver, io::{AsyncReadExt}};
use std::{
collections::HashMap,
fmt::Display,
Expand Down Expand Up @@ -351,27 +350,7 @@ fn build_async_runtime() -> io::Result<tokio::runtime::Runtime> {
runtime_builder.build()
}

fn offset_watcher_thread(channel_capacity: usize, update_interval: Duration) -> Receiver<u64> {
// Intended to notify all Client-Service processes, which will likely be
let (sender, receiver) = tokio::sync::broadcast::channel(channel_capacity);
//Prevent spurious errors on shutdown.
let _receiver_keepalive = receiver.resubscribe();
std::thread::spawn(move || {
let _receiver_keepalive = _receiver_keepalive;
let mut previous_offset: u64 = CURRENT_OFFSET.load(atomic::Ordering::Relaxed);
loop {
std::thread::sleep(update_interval);
let new_offset: u64 = CURRENT_OFFSET.load(atomic::Ordering::Relaxed);
if new_offset > previous_offset {
sender.send(new_offset-previous_offset).unwrap();
previous_offset = new_offset;
}
}
});
receiver
}

async fn run_client_service_tasks(initial_offset: u64, un_parsed_address: String, tcp_port: u16, path: PathBuf, offset_change_receiver: Receiver<u64>) {
async fn run_client_service_tasks(initial_offset: u64, un_parsed_address: String, tcp_port: u16, path: PathBuf) {
let address = match IpAddr::from_str(&un_parsed_address) {
Ok(addr) => SocketAddr::from((addr, tcp_port)),
Err(_parse_err) => match tokio::net::lookup_host(&un_parsed_address).await {
Expand All @@ -385,32 +364,32 @@ async fn run_client_service_tasks(initial_offset: u64, un_parsed_address: String
};

let listener = tokio::net::TcpListener::bind(address).await.unwrap();
//Make sure the closure captures it before borrowing it off into another closure.
let offset_change_receiver = offset_change_receiver;

let reader_path = path.clone();

loop {
let (mut socket, _peer_addr) = listener.accept().await.unwrap();
let mut local_change_receiver = offset_change_receiver.resubscribe();
let reader_path = reader_path.clone();

tokio::spawn(async move {
let mut read_buf = [0u8; 4096];
let (mut reader, mut writer) = socket.split();
//let mut read_buf = [0u8; 4096];
let (_reader, mut writer) = socket.split();
let mut _subscribed_app_ids: Vec<AppId> = Vec::default();
let mut prev_offset = initial_offset;
let mut last_sent_offset = initial_offset;
loop {
tokio::select! {
_new_bytes = local_change_receiver.recv() => {
let current_offset = CURRENT_OFFSET.load(atomic::Ordering::Relaxed);
// Time does not go backwards
assert!(current_offset > last_sent_offset);
let mut file_reader = MemMapRecordReader::new(&reader_path, last_sent_offset).unwrap();
file_reader.read_and_push_to(&_subscribed_app_ids, &mut writer).await.unwrap();
last_sent_offset = file_reader.most_recent_offset_visited();
}
_len_read = reader.read(&mut read_buf) => {
//Todo: Read inbound messages
}
loop {

let current_offset = CURRENT_OFFSET.load(atomic::Ordering::Relaxed);

// Spinlock on offset changes.
if prev_offset < current_offset {
let mut file_reader = MemMapRecordReader::new(&reader_path, last_sent_offset).unwrap();
file_reader.read_and_push_to(&mut writer).await.unwrap();
last_sent_offset = file_reader.most_recent_offset_visited();
prev_offset = current_offset;
}
// Since we are spinlocking (not a typical async behavior), ensure we do not starve the scheduler.
tokio::task::yield_now().await;
}
});
}
Expand Down Expand Up @@ -446,14 +425,14 @@ fn main() -> std::io::Result<()> {
let initial_offset = record.initial_offset();
CURRENT_OFFSET.store(initial_offset, atomic::Ordering::Relaxed);

let offset_change_receiver = offset_watcher_thread(4096, Duration::from_millis(50));
//let offset_change_receiver = offset_watcher_thread(4096, Duration::from_millis(50));

let tcp_addr = args.address.clone();
let tcp_port = args.port;

let reader_path = path.clone();

runtime.spawn(run_client_service_tasks(initial_offset, tcp_addr, tcp_port, reader_path, offset_change_receiver));
runtime.spawn(run_client_service_tasks(initial_offset, tcp_addr, tcp_port, reader_path));

// Attempt to give the client service tasks thread a moment to spin up before there's any possibility of incrementing the global offset.
std::thread::sleep(Duration::from_millis(5));
Expand Down Expand Up @@ -481,7 +460,7 @@ mod test {
use super::*;
use lazy_static::lazy_static;
use sequencer_common::LengthTag;
use tokio::{sync::broadcast::{Receiver, Sender}, net::TcpStream};
use tokio::{sync::broadcast::{Receiver, Sender}, net::TcpStream, io::AsyncReadExt};

// Any localhost-related network tests will interfere with each other if you use
// the cargo test command, which is multithreaded by default.
Expand Down Expand Up @@ -969,6 +948,34 @@ mod test {
}
impl std::error::Error for TestError {}


/// Spawn a background watcher thread that polls the global `CURRENT_OFFSET`
/// atomic every `update_interval` and broadcasts each observed increase as a
/// delta (`new_offset - previous_offset`) on a tokio broadcast channel.
///
/// * `channel_capacity` — buffer size of the broadcast channel.
/// * `update_interval` — sleep time between polls of `CURRENT_OFFSET`.
///
/// Returns a `Receiver<u64>`; each received value is the number of bytes the
/// offset advanced since the previous notification.
fn offset_watcher_thread(channel_capacity: usize, update_interval: Duration) -> Receiver<u64> {
    // Intended to notify all Client-Service processes, which will likely be
    // NOTE(review): the comment above is truncated in the original source —
    // presumably it continued "…subscribed via clones of this receiver".
    let (sender, receiver) = tokio::sync::broadcast::channel(channel_capacity);
    // Prevent spurious errors on shutdown: `broadcast::Sender::send` returns
    // Err when there are zero receivers, so keep one alive inside the watcher
    // thread itself for as long as it runs.
    let _receiver_keepalive = receiver.resubscribe();
    std::thread::spawn(move || {
        // Move the keepalive receiver into the thread so it outlives any
        // subscriber that drops early.
        let _receiver_keepalive = _receiver_keepalive;
        let mut previous_offset: u64 = CURRENT_OFFSET.load(atomic::Ordering::Relaxed);
        loop {
            std::thread::sleep(update_interval);
            let new_offset: u64 = CURRENT_OFFSET.load(atomic::Ordering::Relaxed);
            if new_offset > previous_offset {
                // Broadcast only the delta. The unwrap cannot fail while
                // `_receiver_keepalive` guarantees at least one receiver.
                sender.send(new_offset-previous_offset).unwrap();
                previous_offset = new_offset;
            }

            #[cfg(test)]
            {
                // Tests store 0 back into CURRENT_OFFSET between runs; follow
                // the counter back down so this watcher does not hold a stale
                // high-water mark across test cases. (The `#[cfg(test)]` here
                // is redundant inside a test module, but harmless.)
                if (new_offset != previous_offset) && (new_offset == 0) {
                    previous_offset = 0;
                }
            }
        }
    });
    receiver
}

#[test]
pub fn test_offset_counter_consistency() {
let _guard = IO_TEST_PERMISSIONS.lock();
Expand Down Expand Up @@ -1137,8 +1144,11 @@ mod test {

assert_eq!(record.inner.len() as u64, offset);
}

#[test]
fn end_to_end_no_appid() {
fn end_to_end_client_service() {
use rkyv::Deserialize;

let _guard = IO_TEST_PERMISSIONS.lock();
CURRENT_OFFSET.store(0, atomic::Ordering::Relaxed);
const NUM_TESTS: usize = 32;
Expand All @@ -1159,17 +1169,14 @@ mod test {
let record = record::MemMapBackend::init(&path, 0).unwrap();
// Determine current offset.
let initial_offset = record.initial_offset();
CURRENT_OFFSET.store(initial_offset, atomic::Ordering::Relaxed);

let offset_change_receiver = offset_watcher_thread(4096, Duration::from_millis(50));

CURRENT_OFFSET.store(initial_offset, atomic::Ordering::Relaxed);

let tcp_server_addr = IpAddr::from(Ipv6Addr::LOCALHOST);
let tcp_server_port = 9999;

let reader_path = path.clone();

runtime.spawn(run_client_service_tasks(initial_offset, format!("{}", tcp_server_addr), tcp_server_port, reader_path.to_path_buf(), offset_change_receiver));
let client_service_join_handle = runtime.spawn(run_client_service_tasks(initial_offset, format!("{}", tcp_server_addr), tcp_server_port, reader_path.to_path_buf()));

// Attempt to give the client service tasks thread a moment to spin up before there's any possibility of incrementing the global offset.
std::thread::sleep(Duration::from_millis(5));
Expand All @@ -1183,7 +1190,7 @@ mod test {
if num_messages_received >= NUM_TESTS {
break;
}
let read_resl = tcp_stream.read_u16_le().await;
let read_resl = tcp_stream.read_u64_le().await;
let length_tag = read_resl.unwrap();
if length_tag != 0 {
let mut message_buf = vec![0u8; length_tag as usize];
Expand Down Expand Up @@ -1223,10 +1230,20 @@ mod test {
for (i, message) in all_messages.iter().enumerate() {
assert_eq!(message, running_tally.get(i).unwrap());
}

for message in all_messages.iter() {
let mut deserializer = rkyv::de::deserializers::SharedDeserializeMap::default();
let checked = rkyv::check_archived_root::<SequencerMessage>(message).unwrap();
let _deserialized: SequencerMessage = checked.deserialize(&mut deserializer).unwrap();
}

println!("Cleaning up after the test.");
//Clean up
std::thread::sleep(Duration::from_millis(10));
client_service_join_handle.abort();
drop(runtime);
std::fs::remove_file(path).unwrap();
std::fs::remove_dir(Path::new("test_output/")).unwrap();
CURRENT_OFFSET.store(0, atomic::Ordering::Relaxed);
}
}
}
30 changes: 7 additions & 23 deletions sequencer_server/src/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use memmap2::{MmapMut, MmapOptions, Mmap};

//use rkyv::validation::CheckArchiveError;
use sequencer_common::{LengthTag, AppId, SequencerMessage};
use sequencer_common::LengthTag;
use tokio::io::{AsyncWrite, AsyncWriteExt};

use std::{
Expand Down Expand Up @@ -372,32 +372,16 @@ impl MemMapRecordReader {
///
/// This method should be fully zero-copy, end-to-end, unless the OS performs its own copy when we
/// push to the socket.
pub async fn read_and_push_to<W: AsyncWrite + AsyncWriteExt + Unpin>(&mut self, app_ids: &[AppId], writer: &mut W) -> Result<u64, MmapReadError> {
pub async fn read_and_push_to<W: AsyncWrite + AsyncWriteExt + Unpin>(&mut self, writer: &mut W) -> Result<u64, MmapReadError> {
let mut reader = RecordReader::new(&self.mmap.as_ref()[self.last_read_offset as usize ..]);
while let Some(maybe_message) = reader.next() {
let message = maybe_message?;

if app_ids.is_empty() {
// No whitelist, match all.
// Length tag
writer.write_u16_le(message.len() as u16).await?;
// Write the message.
writer.write_all(message).await?;
}
else {
rkyv::check_archived_root::<SequencerMessage>(message)
.map_err(|e| MmapReadError::CheckArchive(format!("{:?}", e)))?;

let data = unsafe {
rkyv::archived_root::<SequencerMessage>(message)
};
if app_ids.contains(&data.app_id) {
// Length tag
writer.write_u16_le(message.len() as u16).await?;
// Write the message.
writer.write_all(message).await?;
}
}
// No whitelist, match all.
// Length tag
writer.write_u64_le(message.len() as LengthTag).await?;
// Write the message.
writer.write_all(message).await?;
}
let amt_written = reader.cursor as u64 - self.last_read_offset;
self.last_read_offset += reader.cursor as u64;
Expand Down