-
Notifications
You must be signed in to change notification settings - Fork 0
phase 10: lock-handle abstraction (Arc<Mutex/RwLock<_>> → trait handles) #86
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -51,17 +51,19 @@ | |
|
|
||
| use crate::{ | ||
| UDP_BUFFER_SIZE, | ||
| e2e::{E2ECheckStatus, E2EKey, E2ERegistry}, | ||
| e2e::{E2ECheckStatus, E2EKey}, | ||
| protocol::{Message, MessageView, sd}, | ||
| traits::{PayloadWireFormat, WireFormat}, | ||
| transport::{ReceivedDatagram, SocketOptions, Spawner, TransportFactory, TransportSocket}, | ||
| transport::{ | ||
| E2ERegistryHandle, ReceivedDatagram, SocketOptions, Spawner, TransportFactory, | ||
| TransportSocket, | ||
| }, | ||
| }; | ||
|
|
||
| use super::error::Error; | ||
| use futures::{FutureExt, pin_mut, select}; | ||
| use std::{ | ||
| net::{Ipv4Addr, SocketAddr, SocketAddrV4}, | ||
| sync::{Arc, Mutex}, | ||
| task::{Context, Poll}, | ||
| }; | ||
| use tokio::sync::mpsc; | ||
|
|
@@ -151,9 +153,9 @@ where | |
| /// socket through the `_with_transport` variant so the `Spawner` | ||
| /// trait can be exercised end-to-end. | ||
| #[cfg(test)] | ||
| pub async fn bind_discovery_seeded( | ||
| pub async fn bind_discovery_seeded<R: E2ERegistryHandle>( | ||
| interface: Ipv4Addr, | ||
| e2e_registry: Arc<Mutex<E2ERegistry>>, | ||
| e2e_registry: R, | ||
| session_id: u16, | ||
| session_has_wrapped: bool, | ||
| multicast_loopback: bool, | ||
|
|
@@ -200,18 +202,19 @@ where | |
| /// build a small orchestrator directly on top of `protocol`, `e2e`, | ||
| /// and the `transport` traits — the `bare_metal` example workspace | ||
| /// member demonstrates the trait layer in isolation. | ||
| pub async fn bind_discovery_seeded_with_transport<F, S>( | ||
| pub async fn bind_discovery_seeded_with_transport<F, S, R>( | ||
| factory: &F, | ||
| spawner: &S, | ||
| interface: Ipv4Addr, | ||
| e2e_registry: Arc<Mutex<E2ERegistry>>, | ||
| e2e_registry: R, | ||
| session_id: u16, | ||
| session_has_wrapped: bool, | ||
| multicast_loopback: bool, | ||
| ) -> Result<Self, Error> | ||
| where | ||
| F: TransportFactory<Socket = crate::tokio_transport::TokioSocket>, | ||
| S: Spawner, | ||
| R: E2ERegistryHandle, | ||
| { | ||
| let (rx_tx, rx_rx) = mpsc::channel(16); | ||
| let (tx_tx, tx_rx) = mpsc::channel(16); | ||
|
|
@@ -259,7 +262,7 @@ where | |
| /// socket through the `_with_transport` variant so the `Spawner` | ||
| /// trait can be exercised end-to-end. | ||
| #[cfg(test)] | ||
| pub async fn bind(port: u16, e2e_registry: Arc<Mutex<E2ERegistry>>) -> Result<Self, Error> { | ||
| pub async fn bind<R: E2ERegistryHandle>(port: u16, e2e_registry: R) -> Result<Self, Error> { | ||
| use crate::tokio_transport::{TokioSpawner, TokioTransport}; | ||
| Self::bind_with_transport(&TokioTransport, &TokioSpawner, port, e2e_registry).await | ||
| } | ||
|
|
@@ -269,15 +272,16 @@ where | |
| /// socket's I/O loop through a caller-supplied [`Spawner`]. See | ||
| /// [`Self::bind_discovery_seeded_with_transport`] for the factory | ||
| /// bound rationale. | ||
| pub async fn bind_with_transport<F, S>( | ||
| pub async fn bind_with_transport<F, S, R>( | ||
| factory: &F, | ||
| spawner: &S, | ||
| port: u16, | ||
| e2e_registry: Arc<Mutex<E2ERegistry>>, | ||
| e2e_registry: R, | ||
| ) -> Result<Self, Error> | ||
| where | ||
| F: TransportFactory<Socket = crate::tokio_transport::TokioSocket>, | ||
| S: Spawner, | ||
| R: E2ERegistryHandle, | ||
| { | ||
| let (rx_tx, rx_rx) = mpsc::channel(4); | ||
| let (tx_tx, tx_rx) = mpsc::channel(4); | ||
|
|
@@ -394,11 +398,11 @@ where | |
| /// return-type notation to express `Send` bounds on the trait's | ||
| /// RPITIT methods — still nightly as of this writing. | ||
| #[allow(clippy::too_many_lines)] | ||
| async fn socket_loop_future( | ||
| async fn socket_loop_future<R: E2ERegistryHandle>( | ||
| socket: crate::tokio_transport::TokioSocket, | ||
| rx_tx: mpsc::Sender<Result<ReceivedMessage<MessageDefinitions>, Error>>, | ||
| mut tx_rx: mpsc::Receiver<SendMessage<MessageDefinitions>>, | ||
| e2e_registry: Arc<Mutex<E2ERegistry>>, | ||
| e2e_registry: R, | ||
| ) { | ||
| // Maximum number of consecutive `recv_from` errors tolerated before | ||
| // the socket loop gives up. A single failure (transient I/O, peer | ||
|
|
@@ -458,12 +462,11 @@ where | |
| { | ||
| let key = | ||
| E2EKey::from_message_id(send_message.message.header().message_id()); | ||
| let mut registry = e2e_registry.lock().expect("e2e registry lock poisoned"); | ||
| if registry.contains_key(&key) { | ||
| if e2e_registry.contains_key(&key) { | ||
| let upper_header: [u8; 8] = | ||
| buf[8..16].try_into().expect("upper header slice"); | ||
| let mut protected = [0u8; UDP_BUFFER_SIZE]; | ||
| let result = registry.protect( | ||
| let result = e2e_registry.protect( | ||
| key, | ||
| &buf[16..message_length], | ||
| upper_header, | ||
|
Comment on lines
468
to
472
|
||
|
|
@@ -553,14 +556,11 @@ where | |
| let payload_bytes = view.payload_bytes(); | ||
|
|
||
| // Apply E2E check if configured | ||
| let (e2e_status, effective_payload) = { | ||
| let mut registry = | ||
| e2e_registry.lock().expect("e2e registry lock poisoned"); | ||
| match registry.check(key, payload_bytes, upper_header) { | ||
| let (e2e_status, effective_payload) = | ||
| match e2e_registry.check(key, payload_bytes, upper_header) { | ||
| Some((status, stripped)) => (Some(status), stripped), | ||
| None => (None, payload_bytes), | ||
| } | ||
| }; | ||
| }; | ||
|
|
||
| let payload = MessageDefinitions::from_payload_bytes( | ||
| header.message_id(), | ||
|
|
@@ -607,9 +607,11 @@ where | |
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
| use crate::e2e::E2ERegistry; | ||
| use crate::protocol::sd::test_support::{TestPayload, empty_sd_header}; | ||
| use crate::tokio_transport::TokioSpawner; | ||
| use std::format; | ||
| use std::sync::{Arc, Mutex}; | ||
| use std::vec; | ||
| // Tests build ad-hoc UDP peers via tokio directly; this is not part of | ||
| // the production code path, which goes through the `TransportSocket` | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
# Panicssection says this panics if the E2E registry mutex is poisoned, butClientis now generic overR: E2ERegistryHandleand no longer directly uses a mutex here. This doc is now inaccurate for non-mutex handle implementations.Suggested fix: either remove the
# Panicssection, or reword it to something like “May panic if the underlying handle implementation panics (e.g. a poisoned mutex in the std implementation).”