From 370d3ac1ae1bee05cd064fd8b68e7b8505714d63 Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Wed, 22 Apr 2026 11:07:40 -0400 Subject: [PATCH 1/7] Extract SdStateManager from server mod --- src/server/mod.rs | 118 ++++++---------------------------------- src/server/sd_state.rs | 121 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+), 102 deletions(-) create mode 100644 src/server/sd_state.rs diff --git a/src/server/mod.rs b/src/server/mod.rs index 1532b98..0bb8e14 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -8,6 +8,7 @@ mod error; mod event_publisher; +mod sd_state; mod service_info; mod subscription_manager; @@ -16,13 +17,14 @@ pub use event_publisher::EventPublisher; pub use service_info::{EventGroupInfo, ServiceInfo}; pub use subscription_manager::SubscriptionManager; +use sd_state::SdStateManager; + use crate::e2e::{E2EKey, E2EProfile, E2ERegistry}; use crate::protocol::sd::{self, Entry, Flags, OptionsCount, ServiceEntry, TransportProtocol}; -use core::sync::atomic::Ordering; use std::{ format, net::{IpAddr, Ipv4Addr, SocketAddrV4}, - sync::{Arc, Mutex, atomic::AtomicU16}, + sync::{Arc, Mutex}, vec, vec::Vec, }; @@ -74,8 +76,8 @@ pub struct Server { subscriptions: Arc>, /// Event publisher publisher: Arc, - /// Incrementing session ID for SD messages - sd_session_id: Arc, + /// SD session-ID counter and announcement emitter + sd_state: Arc, /// Shared E2E registry for runtime E2E configuration e2e_registry: Arc>, /// `true` if this server was constructed via [`Server::new_passive`]. @@ -186,7 +188,7 @@ impl Server { sd_socket: Arc::new(sd_socket), subscriptions, publisher, - sd_session_id: Arc::new(AtomicU16::new(1)), + sd_state: Arc::new(SdStateManager::new()), e2e_registry, is_passive: false, }) @@ -255,7 +257,7 @@ impl Server { sd_socket: Arc::new(sd_socket), subscriptions, publisher, - sd_session_id: Arc::new(AtomicU16::new(1)), + sd_state: Arc::new(SdStateManager::new()), e2e_registry, is_passive: true, }) @@ -288,12 +290,12 @@ impl Server { } let config = self.config.clone(); let sd_socket = Arc::clone(&self.sd_socket); - let sd_session_id = Arc::clone(&self.sd_session_id); + let sd_state = Arc::clone(&self.sd_state); tokio::spawn(async move { let mut announcement_count = 0u32; loop { - match Self::send_offer_service(&config, &sd_socket, &sd_session_id).await { + match sd_state.send_offer_service(&config, &sd_socket).await { Ok(()) => { announcement_count += 1; if announcement_count == 1 { @@ -322,80 +324,6 @@ impl Server { Ok(()) } - /// Send an `OfferService` message via Service Discovery - async fn send_offer_service( - config: &ServerConfig, - socket: &UdpSocket, - session_id: &AtomicU16, - ) -> Result<(), Error> { - use crate::protocol::Header as SomeIpHeader; - use crate::traits::WireFormat; - - // Create OfferService entry - let entry = Entry::OfferService(ServiceEntry { - index_first_options_run: 0, - index_second_options_run: 0, - options_count: OptionsCount::new(1, 0), - service_id: config.service_id, - instance_id: config.instance_id, - major_version: config.major_version, - ttl: config.ttl, - minor_version: config.minor_version, - }); - - // Create IPv4 endpoint option - let option = sd::Options::IpV4Endpoint { - ip: config.interface, - port: config.local_port, - protocol: TransportProtocol::Udp, - }; - - let entries = [entry]; - let options = [option]; - let sd_payload = sd::Header::new(Flags::new(true, true), &entries, &options); - - // Encode SD payload - let mut sd_data = Vec::new(); - sd_payload.encode(&mut sd_data)?; - - // Increment session ID (wrapping from 0xFFFF back to 0x0001, skipping 0) - let prev = session_id - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| { - let next = v.wrapping_add(1); - Some(if next == 0 { 1 } else { next }) - }) - .unwrap(); - let next = prev.wrapping_add(1); - let sid = u32::from(if next == 0 { 1 } else { next }); - - // Wrap in SOME/IP header for SD (service 0xFFFF, method 0x8100) - let someip_header = SomeIpHeader::new_sd(sid, sd_data.len()); - - // Encode complete SOME/IP-SD message - let mut buffer = Vec::new(); - someip_header.encode(&mut buffer)?; - buffer.extend_from_slice(&sd_data); - - let multicast_addr = SocketAddrV4::new(sd::MULTICAST_IP, sd::MULTICAST_PORT); - - tracing::trace!( - "Sending OfferService: service=0x{:04X}, instance={}, port={}, size={} bytes", - config.service_id, - config.instance_id, - config.local_port, - buffer.len() - ); - tracing::trace!( - "OfferService data: {:02X?}", - &buffer[..buffer.len().min(64)] - ); - - socket.send_to(&buffer, multicast_addr).await?; - tracing::trace!("Sent to {}", multicast_addr); - - Ok(()) - } - /// Send a unicast `OfferService` to a specific address (in response to `FindService`) async fn send_unicast_offer(&self, target: std::net::SocketAddr) -> Result<(), Error> { use crate::protocol::Header as SomeIpHeader; @@ -425,7 +353,7 @@ impl Server { let mut sd_data = Vec::new(); sd_payload.encode(&mut sd_data)?; - let sid = self.next_sd_session_id(); + let sid = self.sd_state.next_session_id(); let someip_header = SomeIpHeader::new_sd(sid, sd_data.len()); let mut buffer = Vec::new(); @@ -442,20 +370,6 @@ impl Server { Ok(()) } - /// Get the next SD session ID (`client_id=0`, `session_id` incrementing), skipping 0 - fn next_sd_session_id(&self) -> u32 { - let prev = self - .sd_session_id - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| { - let next = v.wrapping_add(1); - Some(if next == 0 { 1 } else { next }) - }) - .unwrap(); - // fetch_update returns the previous value; compute the same next value - let next = prev.wrapping_add(1); - u32::from(if next == 0 { 1 } else { next }) - } - /// Get the event publisher for sending events #[must_use] pub fn publisher(&self) -> Arc { @@ -823,7 +737,7 @@ impl Server { let mut sd_data = Vec::new(); sd_payload.encode(&mut sd_data)?; - let sid = self.next_sd_session_id(); + let sid = self.sd_state.next_session_id(); let someip_header = SomeIpHeader::new_sd(sid, sd_data.len()); let mut buffer = Vec::new(); @@ -870,7 +784,7 @@ impl Server { let mut sd_data = Vec::new(); sd_payload.encode(&mut sd_data)?; - let sid = self.next_sd_session_id(); + let sid = self.sd_state.next_session_id(); let someip_header = SomeIpHeader::new_sd(sid, sd_data.len()); let mut buffer = Vec::new(); @@ -1513,14 +1427,14 @@ mod tests { let (server, _) = create_test_server(0x5B, 1).await; // Set session ID to 0xFFFE - server.sd_session_id.store(0xFFFE, Ordering::Relaxed); + server.sd_state.store_for_test(0xFFFE); // First call: 0xFFFE -> 0xFFFF, returns 0xFFFF - let sid1 = server.next_sd_session_id(); + let sid1 = server.sd_state.next_session_id(); assert_eq!(sid1, 0xFFFF); // Second call: 0xFFFF -> wraps to 0x0001 (skipping 0), returns 0x0001 - let sid2 = server.next_sd_session_id(); + let sid2 = server.sd_state.next_session_id(); assert_eq!(sid2, 0x0001); } diff --git a/src/server/sd_state.rs b/src/server/sd_state.rs new file mode 100644 index 0000000..784c3fd --- /dev/null +++ b/src/server/sd_state.rs @@ -0,0 +1,121 @@ +//! Service Discovery session-state tracking, decoupled from socket ownership. +//! +//! [`SdStateManager`] owns the session-ID counter used by every outgoing +//! SOME/IP-SD message this server emits (`OfferService` announcements, +//! unicast Offer replies, `SubscribeAck`, `SubscribeNack`). It also builds +//! and sends `OfferService` announcements when given a socket. +//! +//! Keeping this state in its own type prepares the server for upcoming +//! transport abstraction: once `TransportSocket` lands, the `&UdpSocket` +//! parameter on [`SdStateManager::send_offer_service`] becomes the single +//! migration point for the announcement path. + +use core::sync::atomic::{AtomicU16, Ordering}; +use std::{net::SocketAddrV4, vec::Vec}; +use tokio::net::UdpSocket; + +use crate::protocol::sd::{self, Entry, Flags, OptionsCount, ServiceEntry, TransportProtocol}; + +use super::{Error, ServerConfig}; + +/// Tracks the SD session-ID counter and emits `OfferService` announcements. +/// +/// Session IDs increment with each SD message and wrap from `0xFFFF` back +/// to `0x0001` (skipping `0`, which is reserved). +#[derive(Debug)] +pub(super) struct SdStateManager { + session_id: AtomicU16, +} + +impl SdStateManager { + pub(super) const fn new() -> Self { + Self { + session_id: AtomicU16::new(1), + } + } + + /// Advance the counter and return the next SOME/IP-SD session ID + /// (`client_id = 0`, session ID in the low 16 bits). Skips 0 on wrap. + pub(super) fn next_session_id(&self) -> u32 { + let prev = self + .session_id + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| { + let next = v.wrapping_add(1); + Some(if next == 0 { 1 } else { next }) + }) + .unwrap(); + let next = prev.wrapping_add(1); + u32::from(if next == 0 { 1 } else { next }) + } + + /// Send a multicast `OfferService` announcement for the given config. + pub(super) async fn send_offer_service( + &self, + config: &ServerConfig, + socket: &UdpSocket, + ) -> Result<(), Error> { + use crate::protocol::Header as SomeIpHeader; + use crate::traits::WireFormat; + + let entry = Entry::OfferService(ServiceEntry { + index_first_options_run: 0, + index_second_options_run: 0, + options_count: OptionsCount::new(1, 0), + service_id: config.service_id, + instance_id: config.instance_id, + major_version: config.major_version, + ttl: config.ttl, + minor_version: config.minor_version, + }); + + let option = sd::Options::IpV4Endpoint { + ip: config.interface, + port: config.local_port, + protocol: TransportProtocol::Udp, + }; + + let entries = [entry]; + let options = [option]; + let sd_payload = sd::Header::new(Flags::new(true, true), &entries, &options); + + let mut sd_data = Vec::new(); + sd_payload.encode(&mut sd_data)?; + + let sid = self.next_session_id(); + let someip_header = SomeIpHeader::new_sd(sid, sd_data.len()); + + let mut buffer = Vec::new(); + someip_header.encode(&mut buffer)?; + buffer.extend_from_slice(&sd_data); + + let multicast_addr = SocketAddrV4::new(sd::MULTICAST_IP, sd::MULTICAST_PORT); + + tracing::trace!( + "Sending OfferService: service=0x{:04X}, instance={}, port={}, size={} bytes", + config.service_id, + config.instance_id, + config.local_port, + buffer.len() + ); + tracing::trace!( + "OfferService data: {:02X?}", + &buffer[..buffer.len().min(64)] + ); + + socket.send_to(&buffer, multicast_addr).await?; + tracing::trace!("Sent to {}", multicast_addr); + + Ok(()) + } + + #[cfg(test)] + pub(super) fn store_for_test(&self, v: u16) { + self.session_id.store(v, Ordering::Relaxed); + } +} + +impl Default for SdStateManager { + fn default() -> Self { + Self::new() + } +} From 9c73ab8e2a5ee2e318521712faa158900f10782f Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Wed, 22 Apr 2026 11:18:56 -0400 Subject: [PATCH 2/7] Shifted tests around, added a test, removed dead impl Default --- src/server/mod.rs | 16 ---------------- src/server/sd_state.rs | 35 +++++++++++++++++++++++++++-------- 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/src/server/mod.rs b/src/server/mod.rs index 0bb8e14..a3fc92c 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1422,22 +1422,6 @@ mod tests { server_handle.abort(); } - #[tokio::test] - async fn test_next_sd_session_id_wraps() { - let (server, _) = create_test_server(0x5B, 1).await; - - // Set session ID to 0xFFFE - server.sd_state.store_for_test(0xFFFE); - - // First call: 0xFFFE -> 0xFFFF, returns 0xFFFF - let sid1 = server.sd_state.next_session_id(); - assert_eq!(sid1, 0xFFFF); - - // Second call: 0xFFFF -> wraps to 0x0001 (skipping 0), returns 0x0001 - let sid2 = server.sd_state.next_session_id(); - assert_eq!(sid2, 0x0001); - } - #[tokio::test] async fn test_handle_sd_other_entry_type() { let (mut server, _) = create_test_server(0x5B, 1).await; diff --git a/src/server/sd_state.rs b/src/server/sd_state.rs index 784c3fd..3f90f32 100644 --- a/src/server/sd_state.rs +++ b/src/server/sd_state.rs @@ -29,8 +29,15 @@ pub(super) struct SdStateManager { impl SdStateManager { pub(super) const fn new() -> Self { + Self::with_initial(1) + } + + /// Construct with a specific starting session counter. Primarily used by + /// tests to validate wrap behavior; callers in production should use + /// [`Self::new`]. + pub(super) const fn with_initial(initial: u16) -> Self { Self { - session_id: AtomicU16::new(1), + session_id: AtomicU16::new(initial), } } @@ -107,15 +114,27 @@ impl SdStateManager { Ok(()) } +} + +#[cfg(test)] +mod tests { + use super::SdStateManager; - #[cfg(test)] - pub(super) fn store_for_test(&self, v: u16) { - self.session_id.store(v, Ordering::Relaxed); + #[test] + fn next_session_id_wraps_past_ffff_skipping_zero() { + let sd = SdStateManager::with_initial(0xFFFE); + + // 0xFFFE -> 0xFFFF + assert_eq!(sd.next_session_id(), 0xFFFF); + + // 0xFFFF -> wraps to 0x0001 (0 is skipped) + assert_eq!(sd.next_session_id(), 0x0001); } -} -impl Default for SdStateManager { - fn default() -> Self { - Self::new() + #[test] + fn next_session_id_starts_at_two_from_default_new() { + let sd = SdStateManager::new(); + // new() seeds at 1; first next_session_id increments to 2 + assert_eq!(sd.next_session_id(), 2); } } From 4f9f70907f1413176ace8651de5c0b545c8918c0 Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Wed, 22 Apr 2026 22:14:58 -0400 Subject: [PATCH 3/7] Add multicast-loopback coverage for SdStateManager::send_offer_service Covers the send path end-to-end (SOME/IP envelope, SD flags, OfferService entry fields, and the IPv4 endpoint option) plus session-id advancement and wrap-through-zero exercised via send_offer_service itself, and a smoke test for Server::start_announcing. All `#[ignore]`d pending the loopback MULTICAST-flag fix on this branch; without that fix, hosts drop the multicast packet silently and the tests time out on recv. Co-Authored-By: Claude Opus 4.7 --- src/server/mod.rs | 75 +++++++++++ src/server/sd_state.rs | 287 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 361 insertions(+), 1 deletion(-) diff --git a/src/server/mod.rs b/src/server/mod.rs index a3fc92c..205e99e 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -2077,4 +2077,79 @@ mod tests { ); }); } + + /// Smoke test for [`Server::start_announcing`]: a loopback server with + /// `multicast_loop` enabled should emit at least one `OfferService` on + /// the SD multicast group within a couple of seconds. + /// + /// `#[ignore]`d for the same reason as the `sd_state` tests — hosts + /// without the MULTICAST flag on `lo` drop the packet silently. The + /// spawned announcer task keeps running until runtime teardown; that + /// is intentional (there is no stop API on `Server`) and harmless in + /// a `#[tokio::test]`. + #[ignore = "requires MULTICAST on loopback; re-enable after lo fix on this branch"] + #[tokio::test] + async fn start_announcing_emits_first_offer_within_timeout() { + use crate::protocol::MessageView; + use crate::protocol::sd::EntryType; + + let interface = Ipv4Addr::LOCALHOST; + // Pick a service_id and unicast port that do not collide with + // the other loopback-enabled server test in this file. + let service_id = 0xFE02; + let config = ServerConfig::new(interface, 30684, service_id, 0x43); + + // Receiver joined to the SD multicast group on loopback. + let raw_rx = socket2::Socket::new( + socket2::Domain::IPV4, + socket2::Type::DGRAM, + Some(socket2::Protocol::UDP), + ) + .unwrap(); + raw_rx.set_reuse_address(true).unwrap(); + #[cfg(unix)] + raw_rx.set_reuse_port(true).unwrap(); + raw_rx.set_multicast_loop_v4(true).unwrap(); + raw_rx + .bind(&std::net::SocketAddr::new(IpAddr::V4(interface), sd::MULTICAST_PORT).into()) + .unwrap(); + raw_rx.set_nonblocking(true).unwrap(); + let rx: UdpSocket = UdpSocket::from_std(raw_rx.into()).unwrap(); + rx.join_multicast_v4(sd::MULTICAST_IP, interface).unwrap(); + + let server = Server::new_with_loopback(config, true) + .await + .expect("server must bind with loopback enabled"); + server + .start_announcing() + .expect("start_announcing should succeed on a non-passive server"); + + // Scan the multicast group for our OfferService. The first tick + // happens immediately; 2s is ample headroom for scheduler jitter. + let recv_loop = async { + let mut buf = [0u8; 2048]; + loop { + let (len, _from) = rx.recv_from(&mut buf).await.expect("recv_from"); + let Ok(view) = MessageView::parse(&buf[..len]) else { + continue; + }; + if view.header().message_id().service_id() != 0xFFFF { + continue; + } + let Ok(sd_view) = view.sd_header() else { continue }; + let Some(entry) = sd_view.entries().next() else { + continue; + }; + if !matches!(entry.entry_type(), Ok(EntryType::OfferService)) { + continue; + } + if entry.service_id() == service_id { + return; + } + } + }; + tokio::time::timeout(std::time::Duration::from_secs(2), recv_loop) + .await + .expect("start_announcing should emit at least one OfferService within 2s"); + } } diff --git a/src/server/sd_state.rs b/src/server/sd_state.rs index 3f90f32..698c20d 100644 --- a/src/server/sd_state.rs +++ b/src/server/sd_state.rs @@ -118,7 +118,24 @@ impl SdStateManager { #[cfg(test)] mod tests { - use super::SdStateManager; + use super::{SdStateManager, ServerConfig}; + use crate::protocol::sd::{self, EntryType, Flags, RebootFlag, TransportProtocol}; + use crate::protocol::{MessageType, MessageView, ReturnCode}; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::time::Duration; + use tokio::net::UdpSocket; + + /// Test-only `service_id` for `send_offer_service` tests. Distinct from + /// the 0x5B / 0x5C values used elsewhere in this crate so that parallel + /// tests joined to the same SD multicast group do not produce false + /// matches. If you add a new test that emits a multicast `OfferService`, + /// give it its own dedicated `service_id` too. + const TEST_SERVICE_ID: u16 = 0xFE01; + const TEST_INSTANCE_ID: u16 = 0x42; + /// Port value placed in the emitted `IpV4Endpoint` option so the + /// round-trip assertion has something non-zero to check. The test does + /// not bind this port — it only appears in the announcement payload. + const TEST_ADVERTISED_PORT: u16 = 40210; #[test] fn next_session_id_wraps_past_ffff_skipping_zero() { @@ -137,4 +154,272 @@ mod tests { // new() seeds at 1; first next_session_id increments to 2 assert_eq!(sd.next_session_id(), 2); } + + // ── Multicast-loopback harness ────────────────────────────────────── + // + // All tests below drive `send_offer_service` against a real UDP socket + // and read the emitted packet off a second socket joined to the SD + // multicast group. These are `#[ignore]`d until the `lo` MULTICAST + // flag fix lands on this branch (`feature/firmware_someip_conversion`); + // hosts without that flag drop the packet silently and the tests time + // out on recv. + + /// Bind a receiver socket on the SD multicast port, ready to + /// `join_multicast_v4`. + fn build_mcast_receiver(interface: Ipv4Addr) -> std::io::Result { + let raw = socket2::Socket::new( + socket2::Domain::IPV4, + socket2::Type::DGRAM, + Some(socket2::Protocol::UDP), + )?; + raw.set_reuse_address(true)?; + #[cfg(unix)] + raw.set_reuse_port(true)?; + raw.set_multicast_loop_v4(true)?; + raw.bind(&SocketAddr::new(IpAddr::V4(interface), sd::MULTICAST_PORT).into())?; + raw.set_nonblocking(true)?; + UdpSocket::from_std(raw.into()) + } + + /// Bind a sender socket on an ephemeral port with `multicast_if` pinned + /// to the loopback interface so emitted packets loop back to any + /// receiver joined to the same group on that interface. + fn build_mcast_sender(interface: Ipv4Addr) -> std::io::Result { + let raw = socket2::Socket::new( + socket2::Domain::IPV4, + socket2::Type::DGRAM, + Some(socket2::Protocol::UDP), + )?; + raw.set_reuse_address(true)?; + #[cfg(unix)] + raw.set_reuse_port(true)?; + raw.set_multicast_loop_v4(true)?; + raw.set_multicast_if_v4(&interface)?; + raw.bind(&SocketAddr::new(IpAddr::V4(interface), 0).into())?; + raw.set_nonblocking(true)?; + UdpSocket::from_std(raw.into()) + } + + /// Fields extracted from a received SOME/IP-SD `OfferService` packet. + /// Keeping these together makes per-test assertions a straight list of + /// `assert_eq!`s against expected values. + struct ReceivedOffer { + request_id: u32, + someip_service_id: u16, + someip_method_id: u16, + message_type: MessageType, + return_code: ReturnCode, + protocol_version: u8, + interface_version: u8, + flags: Flags, + entry_service_id: u16, + entry_instance_id: u16, + entry_major_version: u8, + entry_minor_version: u32, + entry_ttl: u32, + endpoint_ip: Ipv4Addr, + endpoint_port: u16, + endpoint_protocol: TransportProtocol, + } + + /// Wait for a multicast `OfferService` matching `expected_service_id`, + /// returning its decoded fields. Other packets on the group (from + /// concurrent tests) are ignored; a single outer timeout bounds the + /// whole filter loop. + async fn recv_our_offer( + rx: &UdpSocket, + expected_service_id: u16, + within: Duration, + ) -> ReceivedOffer { + let recv_loop = async { + let mut buf = [0u8; 2048]; + loop { + let (len, _from) = rx + .recv_from(&mut buf) + .await + .expect("recv_from should succeed"); + let Ok(view) = MessageView::parse(&buf[..len]) else { + continue; + }; + if view.header().message_id().service_id() != 0xFFFF { + continue; + } + let Ok(sd_view) = view.sd_header() else { continue }; + let Some(entry) = sd_view.entries().next() else { + continue; + }; + if !matches!(entry.entry_type(), Ok(EntryType::OfferService)) { + continue; + } + if entry.service_id() != expected_service_id { + continue; + } + let first_option = sd_view + .options() + .next() + .expect("OfferService should carry an endpoint option"); + let (endpoint_ip, endpoint_protocol, endpoint_port) = first_option + .as_ipv4() + .expect("endpoint option should decode as IPv4"); + return ReceivedOffer { + request_id: view.header().request_id(), + someip_service_id: view.header().message_id().service_id(), + someip_method_id: view.header().message_id().method_id(), + message_type: view.header().message_type().message_type(), + return_code: view.header().return_code(), + protocol_version: view.header().protocol_version(), + interface_version: view.header().interface_version(), + flags: sd_view.flags(), + entry_service_id: entry.service_id(), + entry_instance_id: entry.instance_id(), + entry_major_version: entry.major_version(), + entry_minor_version: entry.minor_version(), + entry_ttl: entry.ttl(), + endpoint_ip, + endpoint_port, + endpoint_protocol, + }; + } + }; + tokio::time::timeout(within, recv_loop) + .await + .expect("timed out waiting for our OfferService") + } + + /// Assert every field of the SOME/IP + SD envelope that + /// `send_offer_service` is responsible for — not just the entry body. + /// A future regression that garbles the endpoint option, flips a flag, + /// or changes the SOME/IP message type should fail here. + fn assert_offer_matches(offer: &ReceivedOffer, config: &ServerConfig, expected_request_id: u32) { + // SOME/IP envelope + assert_eq!(offer.someip_service_id, 0xFFFF, "SD uses service_id 0xFFFF"); + assert_eq!(offer.someip_method_id, 0x8100, "SD uses method_id 0x8100"); + assert_eq!(offer.message_type, MessageType::Notification); + assert_eq!(offer.return_code, ReturnCode::Ok); + assert_eq!(offer.protocol_version, 0x01); + assert_eq!(offer.interface_version, 0x01); + assert_eq!( + offer.request_id, expected_request_id, + "request_id is session_id in low 16 bits, client_id zero in high 16", + ); + // SD flags — `send_offer_service` uses Flags::new(true, true). + assert_eq!(offer.flags.reboot(), RebootFlag::RecentlyRebooted); + assert!(offer.flags.unicast()); + // OfferService entry + assert_eq!(offer.entry_service_id, config.service_id); + assert_eq!(offer.entry_instance_id, config.instance_id); + assert_eq!(offer.entry_major_version, config.major_version); + assert_eq!(offer.entry_minor_version, config.minor_version); + assert_eq!(offer.entry_ttl, config.ttl); + // Endpoint option + assert_eq!(offer.endpoint_ip, config.interface); + assert_eq!(offer.endpoint_port, config.local_port); + assert_eq!(offer.endpoint_protocol, TransportProtocol::Udp); + } + + /// Standard loopback receiver/sender pair used by the send-path tests. + fn mcast_rx_tx() -> (UdpSocket, UdpSocket) { + let interface = Ipv4Addr::LOCALHOST; + let rx = build_mcast_receiver(interface).expect("bind receiver"); + rx.join_multicast_v4(sd::MULTICAST_IP, interface) + .expect("join SD multicast group"); + let tx = build_mcast_sender(interface).expect("bind sender"); + (rx, tx) + } + + #[ignore = "requires MULTICAST on loopback; re-enable after lo fix on this branch"] + #[tokio::test] + async fn send_offer_service_emits_parseable_offer_to_multicast() { + let config = ServerConfig::new( + Ipv4Addr::LOCALHOST, + TEST_ADVERTISED_PORT, + TEST_SERVICE_ID, + TEST_INSTANCE_ID, + ); + let (rx, tx) = mcast_rx_tx(); + + // Seed with a recognisable value so on-wire session_id is exact. + let sd_state = SdStateManager::with_initial(0x1233); + sd_state + .send_offer_service(&config, &tx) + .await + .expect("send_offer_service should succeed on a configured socket"); + + let offer = recv_our_offer(&rx, config.service_id, Duration::from_secs(2)).await; + // next_session_id advances 0x1233 -> 0x1234; client_id is zero. + assert_offer_matches(&offer, &config, 0x0000_1234); + } + + #[ignore = "requires MULTICAST on loopback; re-enable after lo fix on this branch"] + #[tokio::test] + async fn send_offer_service_advances_session_id_across_calls() { + // Back-to-back sends must consume distinct, incrementing session + // IDs — catches a regression where `send_offer_service` reads the + // counter without advancing it, or reuses a cached value. + let config = ServerConfig::new( + Ipv4Addr::LOCALHOST, + TEST_ADVERTISED_PORT, + TEST_SERVICE_ID, + TEST_INSTANCE_ID, + ); + let (rx, tx) = mcast_rx_tx(); + + let sd_state = SdStateManager::with_initial(0x1233); + sd_state.send_offer_service(&config, &tx).await.unwrap(); + sd_state.send_offer_service(&config, &tx).await.unwrap(); + + let first = recv_our_offer(&rx, config.service_id, Duration::from_secs(2)).await; + let second = recv_our_offer(&rx, config.service_id, Duration::from_secs(2)).await; + assert_eq!(first.request_id, 0x0000_1234); + assert_eq!(second.request_id, 0x0000_1235); + } + + #[ignore = "requires MULTICAST on loopback; re-enable after lo fix on this branch"] + #[tokio::test] + async fn send_offer_service_wraps_session_id_through_zero_on_send() { + // Session counter wrap must be visible on the wire: 0xFFFE -> 0xFFFF + // -> 0x0001 (skipping the reserved 0). Exercises the wrap branch + // *through* the send path, not only the unit test of next_session_id. + let config = ServerConfig::new( + Ipv4Addr::LOCALHOST, + TEST_ADVERTISED_PORT, + TEST_SERVICE_ID, + TEST_INSTANCE_ID, + ); + let (rx, tx) = mcast_rx_tx(); + + let sd_state = SdStateManager::with_initial(0xFFFE); + sd_state.send_offer_service(&config, &tx).await.unwrap(); + sd_state.send_offer_service(&config, &tx).await.unwrap(); + + let first = recv_our_offer(&rx, config.service_id, Duration::from_secs(2)).await; + let second = recv_our_offer(&rx, config.service_id, Duration::from_secs(2)).await; + assert_eq!(first.request_id, 0x0000_FFFF); + assert_eq!(second.request_id, 0x0000_0001, "must skip reserved 0 on wrap"); + } + + #[ignore = "requires MULTICAST on loopback; re-enable after lo fix on this branch"] + #[tokio::test] + async fn send_offer_service_preserves_zero_ttl() { + // TTL=0 is a legitimate SOME/IP-SD value meaning "stop offering"; + // `send_offer_service` must preserve it end-to-end rather than, + // say, defaulting it back to the ServerConfig::new value of 3. + let mut config = ServerConfig::new( + Ipv4Addr::LOCALHOST, + TEST_ADVERTISED_PORT, + TEST_SERVICE_ID, + TEST_INSTANCE_ID, + ); + config.ttl = 0; + let (rx, tx) = mcast_rx_tx(); + + let sd_state = SdStateManager::with_initial(0x1233); + sd_state.send_offer_service(&config, &tx).await.unwrap(); + + let offer = recv_our_offer(&rx, config.service_id, Duration::from_secs(2)).await; + assert_offer_matches(&offer, &config, 0x0000_1234); + // Belt-and-suspenders: assert_offer_matches already checks this, + // but the purpose of this test is specifically the zero case. + assert_eq!(offer.entry_ttl, 0); + } } From d3315e85e4b0dea1785ee7c6d3c136b0394d4931 Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Wed, 22 Apr 2026 22:18:15 -0400 Subject: [PATCH 4/7] fmt: apply rustfmt style to the new multicast tests Rustfmt on stable wraps the let-else continue blocks and the assert_offer_matches signature differently than I hand-wrote. Let cargo fmt normalize the style. Co-Authored-By: Claude Opus 4.7 --- src/server/mod.rs | 4 +++- src/server/sd_state.rs | 15 ++++++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/server/mod.rs b/src/server/mod.rs index 205e99e..6562563 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -2136,7 +2136,9 @@ mod tests { if view.header().message_id().service_id() != 0xFFFF { continue; } - let Ok(sd_view) = view.sd_header() else { continue }; + let Ok(sd_view) = view.sd_header() else { + continue; + }; let Some(entry) = sd_view.entries().next() else { continue; }; diff --git a/src/server/sd_state.rs b/src/server/sd_state.rs index 698c20d..cdd41f9 100644 --- a/src/server/sd_state.rs +++ b/src/server/sd_state.rs @@ -244,7 +244,9 @@ mod tests { if view.header().message_id().service_id() != 0xFFFF { continue; } - let Ok(sd_view) = view.sd_header() else { continue }; + let Ok(sd_view) = view.sd_header() else { + continue; + }; let Some(entry) = sd_view.entries().next() else { continue; }; @@ -290,7 +292,11 @@ mod tests { /// `send_offer_service` is responsible for — not just the entry body. /// A future regression that garbles the endpoint option, flips a flag, /// or changes the SOME/IP message type should fail here. - fn assert_offer_matches(offer: &ReceivedOffer, config: &ServerConfig, expected_request_id: u32) { + fn assert_offer_matches( + offer: &ReceivedOffer, + config: &ServerConfig, + expected_request_id: u32, + ) { // SOME/IP envelope assert_eq!(offer.someip_service_id, 0xFFFF, "SD uses service_id 0xFFFF"); assert_eq!(offer.someip_method_id, 0x8100, "SD uses method_id 0x8100"); @@ -395,7 +401,10 @@ mod tests { let first = recv_our_offer(&rx, config.service_id, Duration::from_secs(2)).await; let second = recv_our_offer(&rx, config.service_id, Duration::from_secs(2)).await; assert_eq!(first.request_id, 0x0000_FFFF); - assert_eq!(second.request_id, 0x0000_0001, "must skip reserved 0 on wrap"); + assert_eq!( + second.request_id, 0x0000_0001, + "must skip reserved 0 on wrap" + ); } #[ignore = "requires MULTICAST on loopback; re-enable after lo fix on this branch"] From e9d180521a1d2216926f57821c7c8cda191ca418 Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Thu, 23 Apr 2026 11:37:03 -0400 Subject: [PATCH 5/7] server: track SD session wrap, propagate reboot flag everywhere MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per AUTOSAR SOME/IP-SD, the reboot bit on emitted SD messages must flip from RecentlyRebooted to Continuous once the session counter wraps past 0xFFFF. SdStateManager already owns the counter, so it also tracks wrap (new has_wrapped: AtomicBool latched exactly on the 0xFFFF -> 0x0001 transition) and exposes reboot_flag() as the single source of truth. The four SD emission paths — SdStateManager::send_offer_service, Server::send_unicast_offer, send_subscribe_ack_from_view, and send_subscribe_nack_from_view — all now consume the tracked flag instead of hardcoding Flags::new(true, true). Coverage: four new non-ignored unit tests cover the state machine (fresh, sub-wrap, exactly-on-wrap, monotonic-after-wrap); assert_offer_matches takes an expected RebootFlag; the existing wrap multicast test now asserts first-emit=RecentlyRebooted and second-emit=Continuous across the boundary. Responds to PR #75 feedback on src/server/sd_state.rs:90. Co-Authored-By: Claude Opus 4.7 --- src/server/mod.rs | 10 ++- src/server/sd_state.rs | 148 ++++++++++++++++++++++++++++++++++++++--- 2 files changed, 146 insertions(+), 12 deletions(-) diff --git a/src/server/mod.rs b/src/server/mod.rs index 6562563..5a5cf46 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -348,7 +348,11 @@ impl Server { let entries = [entry]; let options = [option]; - let sd_payload = sd::Header::new(Flags::new(true, true), &entries, &options); + let sd_payload = sd::Header::new( + Flags::new_sd(self.sd_state.reboot_flag()), + &entries, + &options, + ); let mut sd_data = Vec::new(); sd_payload.encode(&mut sd_data)?; @@ -732,7 +736,7 @@ impl Server { }); let entries = [ack_entry]; - let sd_payload = sd::Header::new(Flags::new(true, true), &entries, &[]); + let sd_payload = sd::Header::new(Flags::new_sd(self.sd_state.reboot_flag()), &entries, &[]); let mut sd_data = Vec::new(); sd_payload.encode(&mut sd_data)?; @@ -779,7 +783,7 @@ impl Server { }); let entries = [nack_entry]; - let sd_payload = sd::Header::new(Flags::new(true, true), &entries, &[]); + let sd_payload = sd::Header::new(Flags::new_sd(self.sd_state.reboot_flag()), &entries, &[]); let mut sd_data = Vec::new(); sd_payload.encode(&mut sd_data)?; diff --git a/src/server/sd_state.rs b/src/server/sd_state.rs index cdd41f9..6dbcd20 100644 --- a/src/server/sd_state.rs +++ b/src/server/sd_state.rs @@ -10,21 +10,31 @@ //! parameter on [`SdStateManager::send_offer_service`] becomes the single //! migration point for the announcement path. -use core::sync::atomic::{AtomicU16, Ordering}; +use core::sync::atomic::{AtomicBool, AtomicU16, Ordering}; use std::{net::SocketAddrV4, vec::Vec}; use tokio::net::UdpSocket; -use crate::protocol::sd::{self, Entry, Flags, OptionsCount, ServiceEntry, TransportProtocol}; +use crate::protocol::sd::{ + self, Entry, Flags, OptionsCount, RebootFlag, ServiceEntry, TransportProtocol, +}; use super::{Error, ServerConfig}; /// Tracks the SD session-ID counter and emits `OfferService` announcements. /// /// Session IDs increment with each SD message and wrap from `0xFFFF` back -/// to `0x0001` (skipping `0`, which is reserved). +/// to `0x0001` (skipping `0`, which is reserved). Per AUTOSAR SOME/IP-SD, +/// the reboot flag on emitted SD messages is +/// [`RebootFlag::RecentlyRebooted`] from startup until the counter wraps +/// once, then [`RebootFlag::Continuous`] permanently — `SdStateManager` +/// tracks that transition and exposes it via [`Self::reboot_flag`] so every +/// server-side SD emission path reads from a single source of truth. #[derive(Debug)] pub(super) struct SdStateManager { session_id: AtomicU16, + /// `true` once [`Self::next_session_id`] has advanced past `0xFFFF`. + /// Monotonic: never transitions back to `false`. + has_wrapped: AtomicBool, } impl SdStateManager { @@ -38,11 +48,15 @@ impl SdStateManager { pub(super) const fn with_initial(initial: u16) -> Self { Self { session_id: AtomicU16::new(initial), + has_wrapped: AtomicBool::new(false), } } /// Advance the counter and return the next SOME/IP-SD session ID - /// (`client_id = 0`, session ID in the low 16 bits). Skips 0 on wrap. + /// (`client_id = 0`, session ID in the low 16 bits). Skips 0 on wrap, + /// and latches [`Self::has_wrapped`] the first time the counter crosses + /// the `0xFFFF → 0x0001` boundary so the reboot flag flips to + /// [`RebootFlag::Continuous`] permanently. pub(super) fn next_session_id(&self) -> u32 { let prev = self .session_id @@ -51,10 +65,28 @@ impl SdStateManager { Some(if next == 0 { 1 } else { next }) }) .unwrap(); + // The only value whose successor wraps through 0 is 0xFFFF; latch + // the flag exactly on that transition. + if prev == u16::MAX { + self.has_wrapped.store(true, Ordering::Relaxed); + } let next = prev.wrapping_add(1); u32::from(if next == 0 { 1 } else { next }) } + /// Current SD reboot flag for this server. + /// + /// Returns [`RebootFlag::RecentlyRebooted`] until the session counter + /// has wrapped past `0xFFFF` at least once, then + /// [`RebootFlag::Continuous`] permanently. Every server-side SD + /// emission path ([`Self::send_offer_service`], plus the unicast + /// offer / `SubscribeAck` / `SubscribeNack` paths in + /// [`crate::server::Server`]) calls this so the flag on the wire + /// reflects a single tracked state. + pub(super) fn reboot_flag(&self) -> RebootFlag { + RebootFlag::from(!self.has_wrapped.load(Ordering::Relaxed)) + } + /// Send a multicast `OfferService` announcement for the given config. pub(super) async fn send_offer_service( &self, @@ -83,7 +115,7 @@ impl SdStateManager { let entries = [entry]; let options = [option]; - let sd_payload = sd::Header::new(Flags::new(true, true), &entries, &options); + let sd_payload = sd::Header::new(Flags::new_sd(self.reboot_flag()), &entries, &options); let mut sd_data = Vec::new(); sd_payload.encode(&mut sd_data)?; @@ -155,6 +187,79 @@ mod tests { assert_eq!(sd.next_session_id(), 2); } + // ── Reboot-flag tracking ──────────────────────────────────────────── + // + // AUTOSAR SOME/IP-SD: the reboot bit on emitted SD messages must be + // set until the session counter wraps past `0xFFFF` for the first + // time, then cleared permanently. These tests drive `SdStateManager` + // directly (no socket) and verify the state machine that every + // server-side SD emission path (`send_offer_service`, plus unicast + // offer / `SubscribeAck` / `SubscribeNack` in `server::Server`) now + // reads from via [`SdStateManager::reboot_flag`]. + + #[test] + fn reboot_flag_is_recently_rebooted_on_fresh_manager() { + // Default constructor: counter hasn't wrapped, flag must indicate + // a recent reboot so peers can re-synchronize SD state. + let sd = SdStateManager::new(); + assert_eq!(sd.reboot_flag(), RebootFlag::RecentlyRebooted); + } + + #[test] + fn reboot_flag_stays_recently_rebooted_below_wrap() { + // Advancing the counter short of a wrap must not flip the flag — + // it's specifically the 0xFFFF → 0x0001 transition that matters, + // not "has next_session_id been called more than once". + let sd = SdStateManager::with_initial(0x1233); + for _ in 0..10 { + sd.next_session_id(); + } + assert_eq!(sd.reboot_flag(), RebootFlag::RecentlyRebooted); + } + + #[test] + fn reboot_flag_flips_to_continuous_exactly_on_wrap() { + // Step the counter across the wrap boundary and assert the flag + // transitions on the precise call that crosses 0xFFFF → 0x0001. + let sd = SdStateManager::with_initial(0xFFFE); + assert_eq!(sd.reboot_flag(), RebootFlag::RecentlyRebooted); + + // 0xFFFE -> 0xFFFF: prev=0xFFFE, no wrap. + assert_eq!(sd.next_session_id(), 0xFFFF); + assert_eq!( + sd.reboot_flag(), + RebootFlag::RecentlyRebooted, + "counter reached 0xFFFF but has not yet wrapped — flag must still be RecentlyRebooted", + ); + + // 0xFFFF -> 0x0001 (skip 0): prev=0xFFFF, wrap latches. + assert_eq!(sd.next_session_id(), 0x0001); + assert_eq!( + sd.reboot_flag(), + RebootFlag::Continuous, + "wrap just occurred — flag must now be Continuous", + ); + } + + #[test] + fn reboot_flag_is_monotonic_after_wrap() { + // Once the flag latches to Continuous it never goes back, even + // after the counter wraps a second time or is advanced + // indefinitely. Guard against a regression that would re-derive + // the flag from the current counter value (which would wrongly + // flip back to RecentlyRebooted at 0x0001). + let sd = SdStateManager::with_initial(0xFFFE); + sd.next_session_id(); // -> 0xFFFF + sd.next_session_id(); // wrap -> 0x0001 + assert_eq!(sd.reboot_flag(), RebootFlag::Continuous); + + // Many further advances, including crossing 0xFFFF again. + for _ in 0..(u32::from(u16::MAX) + 5) { + sd.next_session_id(); + } + assert_eq!(sd.reboot_flag(), RebootFlag::Continuous); + } + // ── Multicast-loopback harness ────────────────────────────────────── // // All tests below drive `send_offer_service` against a real UDP socket @@ -292,10 +397,16 @@ mod tests { /// `send_offer_service` is responsible for — not just the entry body. /// A future regression that garbles the endpoint option, flips a flag, /// or changes the SOME/IP message type should fail here. + /// + /// `expected_reboot` lets pre-wrap callers assert `RecentlyRebooted` + /// and post-wrap callers assert `Continuous`; the flag is tracked by + /// `SdStateManager::has_wrapped` and read via `reboot_flag()` at each + /// send. fn assert_offer_matches( offer: &ReceivedOffer, config: &ServerConfig, expected_request_id: u32, + expected_reboot: RebootFlag, ) { // SOME/IP envelope assert_eq!(offer.someip_service_id, 0xFFFF, "SD uses service_id 0xFFFF"); @@ -308,8 +419,10 @@ mod tests { offer.request_id, expected_request_id, "request_id is session_id in low 16 bits, client_id zero in high 16", ); - // SD flags — `send_offer_service` uses Flags::new(true, true). - assert_eq!(offer.flags.reboot(), RebootFlag::RecentlyRebooted); + // SD flags — reboot comes from SdStateManager::reboot_flag (latches + // to Continuous after the session counter wraps past 0xFFFF); + // unicast is always true for SD. + assert_eq!(offer.flags.reboot(), expected_reboot); assert!(offer.flags.unicast()); // OfferService entry assert_eq!(offer.entry_service_id, config.service_id); @@ -353,7 +466,9 @@ mod tests { let offer = recv_our_offer(&rx, config.service_id, Duration::from_secs(2)).await; // next_session_id advances 0x1233 -> 0x1234; client_id is zero. - assert_offer_matches(&offer, &config, 0x0000_1234); + // Fresh SdStateManager: counter has not wrapped, reboot flag is + // RecentlyRebooted. + assert_offer_matches(&offer, &config, 0x0000_1234, RebootFlag::RecentlyRebooted); } #[ignore = "requires MULTICAST on loopback; re-enable after lo fix on this branch"] @@ -405,6 +520,21 @@ mod tests { second.request_id, 0x0000_0001, "must skip reserved 0 on wrap" ); + // Reboot flag latches: the first emission goes out before the + // wrap happens (prev=0xFFFE), so it still advertises + // RecentlyRebooted; the second emission is the one whose + // next_session_id call crossed 0xFFFF -> 0x0001, so the flag + // Flips to Continuous permanently from there on. + assert_eq!( + first.flags.reboot(), + RebootFlag::RecentlyRebooted, + "first emit is pre-wrap and must still advertise RecentlyRebooted", + ); + assert_eq!( + second.flags.reboot(), + RebootFlag::Continuous, + "post-wrap emit must advertise Continuous", + ); } #[ignore = "requires MULTICAST on loopback; re-enable after lo fix on this branch"] @@ -426,7 +556,7 @@ mod tests { sd_state.send_offer_service(&config, &tx).await.unwrap(); let offer = recv_our_offer(&rx, config.service_id, Duration::from_secs(2)).await; - assert_offer_matches(&offer, &config, 0x0000_1234); + assert_offer_matches(&offer, &config, 0x0000_1234, RebootFlag::RecentlyRebooted); // Belt-and-suspenders: assert_offer_matches already checks this, // but the purpose of this test is specifically the zero case. assert_eq!(offer.entry_ttl, 0); From 3116a1cc95dadc5c08a546c40477dd1b44194886 Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Thu, 23 Apr 2026 14:12:07 -0400 Subject: [PATCH 6/7] server: fix SD wrap-flag ordering across all emission paths MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round-2 Copilot review caught a real correctness bug in the prior reboot-flag refactor: the four SD emission paths read `reboot_flag()` BEFORE advancing the session counter, so the message whose session_id crosses 0xFFFF -> 0x0001 (where `has_wrapped` actually latches) still advertises `RebootFlag::RecentlyRebooted`. The flip only lands on the NEXT emission — violating AUTOSAR SOME/IP-SD semantics that say the wrap message itself should carry `Continuous`. Reordered in all four sites: call `next_session_id()` first so `has_wrapped` latches, then read `reboot_flag()` for this specific message. Sites: - `SdStateManager::send_offer_service` (sd_state.rs) - `Server::send_unicast_offer` (mod.rs) - `Server::send_subscribe_ack_from_view` (mod.rs) - `Server::send_subscribe_nack_from_view` (mod.rs) Added short comments at each site pointing at the canonical ordering note on `send_offer_service`. Also reworded the multicast-loopback `#[ignore]` comment block and per-test message to remove the stale branch-name reference (`feature/firmware_someip_conversion`) — the underlying dependency is the `lo` MULTICAST flag, not a branch-specific fix. New wording says "skipped on hosts whose `lo` lacks the MULTICAST flag" with the `ip link show lo` diagnostic pointer. Coverage: the existing ignore-gated wrap test `send_offer_service_wraps_session_id_through_zero_on_send` already asserts the pre-wrap/post-wrap flag transition on-the-wire; with the ordering fix it now passes in environments that run ignored tests (would have FAILED before this commit — which is why the bug slipped past the first round). The non-ignored state-machine tests (`reboot_flag_flips_to_continuous_exactly_on_wrap` et al.) are unaffected and still green. Co-Authored-By: Claude Opus 4.7 --- src/server/mod.rs | 17 ++++++++++++----- src/server/sd_state.rs | 34 +++++++++++++++++++++++++--------- 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/src/server/mod.rs b/src/server/mod.rs index 5a5cf46..cf57ba1 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -348,6 +348,11 @@ impl Server { let entries = [entry]; let options = [option]; + // See the ordering note on `SdStateManager::send_offer_service`: + // advance the session counter first so `has_wrapped` latches, + // then read the reboot flag so the wrap message itself carries + // `Continuous`. + let sid = self.sd_state.next_session_id(); let sd_payload = sd::Header::new( Flags::new_sd(self.sd_state.reboot_flag()), &entries, @@ -357,7 +362,6 @@ impl Server { let mut sd_data = Vec::new(); sd_payload.encode(&mut sd_data)?; - let sid = self.sd_state.next_session_id(); let someip_header = SomeIpHeader::new_sd(sid, sd_data.len()); let mut buffer = Vec::new(); @@ -736,12 +740,14 @@ impl Server { }); let entries = [ack_entry]; + // Ordering: advance the session id first so `has_wrapped` latches + // on the wrap boundary, then read `reboot_flag()` for this + // message — see `SdStateManager::send_offer_service`. + let sid = self.sd_state.next_session_id(); let sd_payload = sd::Header::new(Flags::new_sd(self.sd_state.reboot_flag()), &entries, &[]); let mut sd_data = Vec::new(); sd_payload.encode(&mut sd_data)?; - - let sid = self.sd_state.next_session_id(); let someip_header = SomeIpHeader::new_sd(sid, sd_data.len()); let mut buffer = Vec::new(); @@ -783,12 +789,13 @@ impl Server { }); let entries = [nack_entry]; + // Ordering: advance first so `has_wrapped` latches, then read + // reboot flag — see `SdStateManager::send_offer_service`. + let sid = self.sd_state.next_session_id(); let sd_payload = sd::Header::new(Flags::new_sd(self.sd_state.reboot_flag()), &entries, &[]); let mut sd_data = Vec::new(); sd_payload.encode(&mut sd_data)?; - - let sid = self.sd_state.next_session_id(); let someip_header = SomeIpHeader::new_sd(sid, sd_data.len()); let mut buffer = Vec::new(); diff --git a/src/server/sd_state.rs b/src/server/sd_state.rs index 6dbcd20..11e7ea5 100644 --- a/src/server/sd_state.rs +++ b/src/server/sd_state.rs @@ -115,12 +115,19 @@ impl SdStateManager { let entries = [entry]; let options = [option]; + // Advance the session counter FIRST so `has_wrapped` latches on + // the wrap transition, then derive the reboot flag for this + // same message. Without this ordering the message carrying + // session_id=0x0001 after a wrap would still advertise + // `RebootFlag::RecentlyRebooted`, and the flip would only land + // on the NEXT emission — violating AUTOSAR SOME/IP-SD semantics + // (the wrap message itself should carry `Continuous`). + let sid = self.next_session_id(); let sd_payload = sd::Header::new(Flags::new_sd(self.reboot_flag()), &entries, &options); let mut sd_data = Vec::new(); sd_payload.encode(&mut sd_data)?; - let sid = self.next_session_id(); let someip_header = SomeIpHeader::new_sd(sid, sd_data.len()); let mut buffer = Vec::new(); @@ -264,10 +271,11 @@ mod tests { // // All tests below drive `send_offer_service` against a real UDP socket // and read the emitted packet off a second socket joined to the SD - // multicast group. These are `#[ignore]`d until the `lo` MULTICAST - // flag fix lands on this branch (`feature/firmware_someip_conversion`); - // hosts without that flag drop the packet silently and the tests time - // out on recv. + // multicast group. These are `#[ignore]`d on environments whose + // loopback interface does not carry the `MULTICAST` flag (check with + // `ip link show lo`); on such hosts the kernel drops multicast on + // `lo` before loopback reflection, so the receiver times out. Runs + // in any environment where loopback multicast is available. /// Bind a receiver socket on the SD multicast port, ready to /// `join_multicast_v4`. @@ -446,7 +454,9 @@ mod tests { (rx, tx) } - #[ignore = "requires MULTICAST on loopback; re-enable after lo fix on this branch"] + #[ignore = "requires MULTICAST on loopback; skipped on hosts whose `lo` \ + lacks the MULTICAST flag. Runs in any environment where \ + loopback multicast is available."] #[tokio::test] async fn send_offer_service_emits_parseable_offer_to_multicast() { let config = ServerConfig::new( @@ -471,7 +481,9 @@ mod tests { assert_offer_matches(&offer, &config, 0x0000_1234, RebootFlag::RecentlyRebooted); } - #[ignore = "requires MULTICAST on loopback; re-enable after lo fix on this branch"] + #[ignore = "requires MULTICAST on loopback; skipped on hosts whose `lo` \ + lacks the MULTICAST flag. Runs in any environment where \ + loopback multicast is available."] #[tokio::test] async fn send_offer_service_advances_session_id_across_calls() { // Back-to-back sends must consume distinct, incrementing session @@ -495,7 +507,9 @@ mod tests { assert_eq!(second.request_id, 0x0000_1235); } - #[ignore = "requires MULTICAST on loopback; re-enable after lo fix on this branch"] + #[ignore = "requires MULTICAST on loopback; skipped on hosts whose `lo` \ + lacks the MULTICAST flag. Runs in any environment where \ + loopback multicast is available."] #[tokio::test] async fn send_offer_service_wraps_session_id_through_zero_on_send() { // Session counter wrap must be visible on the wire: 0xFFFE -> 0xFFFF @@ -537,7 +551,9 @@ mod tests { ); } - #[ignore = "requires MULTICAST on loopback; re-enable after lo fix on this branch"] + #[ignore = "requires MULTICAST on loopback; skipped on hosts whose `lo` \ + lacks the MULTICAST flag. Runs in any environment where \ + loopback multicast is available."] #[tokio::test] async fn send_offer_service_preserves_zero_ttl() { // TTL=0 is a legitimate SOME/IP-SD value meaning "stop offering"; From 7f975cc4b068d1c81d05b39fe778eb80e6f6a420 Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Fri, 24 Apr 2026 12:52:38 -0400 Subject: [PATCH 7/7] round-3: drop branch-specific note from #[ignore] reason Copilot round-3 flagged the `#[ignore]` reason on start_announcing_emits_first_offer_within_timeout for still carrying a branch-specific phrase ("re-enable after lo fix on this branch"), which becomes stale once merged. Replaced with a durable prerequisite description: requires loopback multicast support (MULTICAST on lo) Matches the companion rewording in a638a4b on the sd_state.rs multicast-loopback harness comment block. Addresses Copilot comment 3132878961. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/server/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/mod.rs b/src/server/mod.rs index cf57ba1..2bcc4b1 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -2098,7 +2098,7 @@ mod tests { /// spawned announcer task keeps running until runtime teardown; that /// is intentional (there is no stop API on `Server`) and harmless in /// a `#[tokio::test]`. - #[ignore = "requires MULTICAST on loopback; re-enable after lo fix on this branch"] + #[ignore = "requires loopback multicast support (MULTICAST on lo)"] #[tokio::test] async fn start_announcing_emits_first_offer_within_timeout() { use crate::protocol::MessageView;