Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 106 additions & 120 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

mod error;
mod event_publisher;
mod sd_state;
mod service_info;
mod subscription_manager;

Expand All @@ -16,13 +17,14 @@ pub use event_publisher::EventPublisher;
pub use service_info::{EventGroupInfo, ServiceInfo};
pub use subscription_manager::SubscriptionManager;

use sd_state::SdStateManager;

use crate::e2e::{E2EKey, E2EProfile, E2ERegistry};
use crate::protocol::sd::{self, Entry, Flags, OptionsCount, ServiceEntry, TransportProtocol};
use core::sync::atomic::Ordering;
use std::{
format,
net::{IpAddr, Ipv4Addr, SocketAddrV4},
sync::{Arc, Mutex, atomic::AtomicU16},
sync::{Arc, Mutex},
vec,
Comment thread
JustinKovacich marked this conversation as resolved.
vec::Vec,
};
Expand Down Expand Up @@ -74,8 +76,8 @@ pub struct Server {
subscriptions: Arc<RwLock<SubscriptionManager>>,
/// Event publisher
publisher: Arc<EventPublisher>,
/// Incrementing session ID for SD messages
sd_session_id: Arc<AtomicU16>,
/// SD session-ID counter and announcement emitter
sd_state: Arc<SdStateManager>,
/// Shared E2E registry for runtime E2E configuration
e2e_registry: Arc<Mutex<E2ERegistry>>,
/// `true` if this server was constructed via [`Server::new_passive`].
Expand Down Expand Up @@ -186,7 +188,7 @@ impl Server {
sd_socket: Arc::new(sd_socket),
subscriptions,
publisher,
sd_session_id: Arc::new(AtomicU16::new(1)),
sd_state: Arc::new(SdStateManager::new()),
e2e_registry,
is_passive: false,
})
Expand Down Expand Up @@ -255,7 +257,7 @@ impl Server {
sd_socket: Arc::new(sd_socket),
subscriptions,
publisher,
sd_session_id: Arc::new(AtomicU16::new(1)),
sd_state: Arc::new(SdStateManager::new()),
e2e_registry,
is_passive: true,
})
Expand Down Expand Up @@ -288,12 +290,12 @@ impl Server {
}
let config = self.config.clone();
let sd_socket = Arc::clone(&self.sd_socket);
let sd_session_id = Arc::clone(&self.sd_session_id);
let sd_state = Arc::clone(&self.sd_state);

tokio::spawn(async move {
let mut announcement_count = 0u32;
loop {
match Self::send_offer_service(&config, &sd_socket, &sd_session_id).await {
match sd_state.send_offer_service(&config, &sd_socket).await {
Ok(()) => {
announcement_count += 1;
if announcement_count == 1 {
Expand Down Expand Up @@ -322,80 +324,6 @@ impl Server {
Ok(())
}

/// Send an `OfferService` message via Service Discovery
async fn send_offer_service(
config: &ServerConfig,
socket: &UdpSocket,
session_id: &AtomicU16,
) -> Result<(), Error> {
use crate::protocol::Header as SomeIpHeader;
use crate::traits::WireFormat;

// Create OfferService entry
let entry = Entry::OfferService(ServiceEntry {
index_first_options_run: 0,
index_second_options_run: 0,
options_count: OptionsCount::new(1, 0),
service_id: config.service_id,
instance_id: config.instance_id,
major_version: config.major_version,
ttl: config.ttl,
minor_version: config.minor_version,
});

// Create IPv4 endpoint option
let option = sd::Options::IpV4Endpoint {
ip: config.interface,
port: config.local_port,
protocol: TransportProtocol::Udp,
};

let entries = [entry];
let options = [option];
let sd_payload = sd::Header::new(Flags::new(true, true), &entries, &options);

// Encode SD payload
let mut sd_data = Vec::new();
sd_payload.encode(&mut sd_data)?;

// Increment session ID (wrapping from 0xFFFF back to 0x0001, skipping 0)
let prev = session_id
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
let next = v.wrapping_add(1);
Some(if next == 0 { 1 } else { next })
})
.unwrap();
let next = prev.wrapping_add(1);
let sid = u32::from(if next == 0 { 1 } else { next });

// Wrap in SOME/IP header for SD (service 0xFFFF, method 0x8100)
let someip_header = SomeIpHeader::new_sd(sid, sd_data.len());

// Encode complete SOME/IP-SD message
let mut buffer = Vec::new();
someip_header.encode(&mut buffer)?;
buffer.extend_from_slice(&sd_data);

let multicast_addr = SocketAddrV4::new(sd::MULTICAST_IP, sd::MULTICAST_PORT);

tracing::trace!(
"Sending OfferService: service=0x{:04X}, instance={}, port={}, size={} bytes",
config.service_id,
config.instance_id,
config.local_port,
buffer.len()
);
tracing::trace!(
"OfferService data: {:02X?}",
&buffer[..buffer.len().min(64)]
);

socket.send_to(&buffer, multicast_addr).await?;
tracing::trace!("Sent to {}", multicast_addr);

Ok(())
}

/// Send a unicast `OfferService` to a specific address (in response to `FindService`)
async fn send_unicast_offer(&self, target: std::net::SocketAddr) -> Result<(), Error> {
use crate::protocol::Header as SomeIpHeader;
Expand All @@ -420,12 +348,20 @@ impl Server {

let entries = [entry];
let options = [option];
let sd_payload = sd::Header::new(Flags::new(true, true), &entries, &options);
// See the ordering note on `SdStateManager::send_offer_service`:
// advance the session counter first so `has_wrapped` latches,
// then read the reboot flag so the wrap message itself carries
// `Continuous`.
let sid = self.sd_state.next_session_id();
let sd_payload = sd::Header::new(
Flags::new_sd(self.sd_state.reboot_flag()),
&entries,
&options,
);
Comment thread
JustinKovacich marked this conversation as resolved.

let mut sd_data = Vec::new();
sd_payload.encode(&mut sd_data)?;

let sid = self.next_sd_session_id();
let someip_header = SomeIpHeader::new_sd(sid, sd_data.len());

let mut buffer = Vec::new();
Expand All @@ -442,20 +378,6 @@ impl Server {
Ok(())
}

/// Get the next SD session ID (`client_id=0`, `session_id` incrementing), skipping 0
fn next_sd_session_id(&self) -> u32 {
let prev = self
.sd_session_id
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
let next = v.wrapping_add(1);
Some(if next == 0 { 1 } else { next })
})
.unwrap();
// fetch_update returns the previous value; compute the same next value
let next = prev.wrapping_add(1);
u32::from(if next == 0 { 1 } else { next })
}

/// Get the event publisher for sending events
#[must_use]
pub fn publisher(&self) -> Arc<EventPublisher> {
Expand Down Expand Up @@ -818,12 +740,14 @@ impl Server {
});

let entries = [ack_entry];
let sd_payload = sd::Header::new(Flags::new(true, true), &entries, &[]);
// Ordering: advance the session id first so `has_wrapped` latches
// on the wrap boundary, then read `reboot_flag()` for this
// message — see `SdStateManager::send_offer_service`.
let sid = self.sd_state.next_session_id();
let sd_payload = sd::Header::new(Flags::new_sd(self.sd_state.reboot_flag()), &entries, &[]);

let mut sd_data = Vec::new();
sd_payload.encode(&mut sd_data)?;

let sid = self.next_sd_session_id();
let someip_header = SomeIpHeader::new_sd(sid, sd_data.len());

let mut buffer = Vec::new();
Expand Down Expand Up @@ -865,12 +789,13 @@ impl Server {
});

let entries = [nack_entry];
let sd_payload = sd::Header::new(Flags::new(true, true), &entries, &[]);
// Ordering: advance first so `has_wrapped` latches, then read
// reboot flag — see `SdStateManager::send_offer_service`.
let sid = self.sd_state.next_session_id();
let sd_payload = sd::Header::new(Flags::new_sd(self.sd_state.reboot_flag()), &entries, &[]);

let mut sd_data = Vec::new();
sd_payload.encode(&mut sd_data)?;

let sid = self.next_sd_session_id();
let someip_header = SomeIpHeader::new_sd(sid, sd_data.len());

let mut buffer = Vec::new();
Expand Down Expand Up @@ -1508,22 +1433,6 @@ mod tests {
server_handle.abort();
}

#[tokio::test]
async fn test_next_sd_session_id_wraps() {
let (server, _) = create_test_server(0x5B, 1).await;

// Set session ID to 0xFFFE
server.sd_session_id.store(0xFFFE, Ordering::Relaxed);

// First call: 0xFFFE -> 0xFFFF, returns 0xFFFF
let sid1 = server.next_sd_session_id();
assert_eq!(sid1, 0xFFFF);

// Second call: 0xFFFF -> wraps to 0x0001 (skipping 0), returns 0x0001
let sid2 = server.next_sd_session_id();
assert_eq!(sid2, 0x0001);
}

#[tokio::test]
async fn test_handle_sd_other_entry_type() {
let (mut server, _) = create_test_server(0x5B, 1).await;
Expand Down Expand Up @@ -2179,4 +2088,81 @@ mod tests {
);
});
}

/// Smoke test for [`Server::start_announcing`]: a loopback server with
/// `multicast_loop` enabled should emit at least one `OfferService` on
/// the SD multicast group within a couple of seconds.
///
/// `#[ignore]`d for the same reason as the `sd_state` tests — hosts
/// without the MULTICAST flag on `lo` drop the packet silently. The
/// spawned announcer task keeps running until runtime teardown; that
/// is intentional (there is no stop API on `Server`) and harmless in
/// a `#[tokio::test]`.
#[ignore = "requires loopback multicast support (MULTICAST on lo)"]
#[tokio::test]
async fn start_announcing_emits_first_offer_within_timeout() {
use crate::protocol::MessageView;
use crate::protocol::sd::EntryType;

let interface = Ipv4Addr::LOCALHOST;
// Pick a service_id and unicast port that do not collide with
// the other loopback-enabled server test in this file.
let service_id = 0xFE02;
let config = ServerConfig::new(interface, 30684, service_id, 0x43);

// Receiver joined to the SD multicast group on loopback.
let raw_rx = socket2::Socket::new(
socket2::Domain::IPV4,
socket2::Type::DGRAM,
Some(socket2::Protocol::UDP),
)
.unwrap();
raw_rx.set_reuse_address(true).unwrap();
#[cfg(unix)]
raw_rx.set_reuse_port(true).unwrap();
raw_rx.set_multicast_loop_v4(true).unwrap();
raw_rx
.bind(&std::net::SocketAddr::new(IpAddr::V4(interface), sd::MULTICAST_PORT).into())
.unwrap();
raw_rx.set_nonblocking(true).unwrap();
let rx: UdpSocket = UdpSocket::from_std(raw_rx.into()).unwrap();
rx.join_multicast_v4(sd::MULTICAST_IP, interface).unwrap();

let server = Server::new_with_loopback(config, true)
.await
.expect("server must bind with loopback enabled");
server
.start_announcing()
.expect("start_announcing should succeed on a non-passive server");

// Scan the multicast group for our OfferService. The first tick
// happens immediately; 2s is ample headroom for scheduler jitter.
let recv_loop = async {
let mut buf = [0u8; 2048];
loop {
let (len, _from) = rx.recv_from(&mut buf).await.expect("recv_from");
let Ok(view) = MessageView::parse(&buf[..len]) else {
continue;
};
if view.header().message_id().service_id() != 0xFFFF {
continue;
}
let Ok(sd_view) = view.sd_header() else {
continue;
};
let Some(entry) = sd_view.entries().next() else {
continue;
};
if !matches!(entry.entry_type(), Ok(EntryType::OfferService)) {
continue;
}
if entry.service_id() == service_id {
return;
}
}
};
tokio::time::timeout(std::time::Duration::from_secs(2), recv_loop)
.await
.expect("start_announcing should emit at least one OfferService within 2s");
}
}
Loading
Loading