Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/client/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ pub enum Error {
/// An E2E protection or checking error occurred.
#[error(transparent)]
E2e(#[from] crate::e2e::Error),
/// A fixed-capacity internal structure is full. The argument names the
/// structure so bare-metal users can size the corresponding compile-time
/// constant up (e.g. `"unicast_sockets"`).
/// A fixed-capacity internal structure is full. The argument is a
/// lowercase `snake_case` tag naming the resource; grep the crate for
/// the tag to find the compile-time constant that governs it. Current
/// tags: `"unicast_sockets"` (→ `UNICAST_SOCKETS_CAP`), `"udp_buffer"`
/// (→ `crate::UDP_BUFFER_SIZE`).
#[error("internal capacity exceeded: {0}")]
Capacity(&'static str),
}
35 changes: 25 additions & 10 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,31 @@
//! # Memory footprint
//!
//! The client's internal `Inner` state is allocated inline rather than on
//! the heap. With the default capacity constants used by the client
//! internals — `REQUEST_QUEUE_CAP=32`, `PENDING_RESPONSES_CAP=64`, and
//! `UNICAST_SOCKETS_CAP=8` in `inner.rs`, plus `SESSION_CAP=64` in
//! `session.rs` — `Inner<P>` occupies on the order of **8–12 KiB**,
//! depending on `sizeof::<P>()` and `sizeof::<SocketManager<P>>()`. On
//! `std + tokio`, this is allocated on the heap when the run-loop is
//! spawned, so the overhead is invisible to callers. On the bare-metal
//! port (future), whoever drives the future must arrange storage for it
//! (either a `static` or a heap allocator); these capacity constants are
//! the primary knobs for trimming this footprint.
//! the heap. With the default capacity constants declared in `inner.rs` —
//! `REQUEST_QUEUE_CAP=32`, `PENDING_RESPONSES_CAP=64`, `UNICAST_SOCKETS_CAP=8`,
//! and `SESSION_CAP=64` — `Inner<P>` occupies on the order of **8–12 KiB**,
//! depending on `sizeof::<P>()` and `sizeof::<SocketManager<P>>()`.
//!
//! In addition, each `SocketManager`'s spawn loop holds a persistent
//! `[u8; UDP_BUFFER_SIZE]` receive/send buffer. When the send path needs
//! E2E protection (i.e. the destination key is registered in the
//! `E2ERegistry`), it transiently allocates a second
//! `[u8; UDP_BUFFER_SIZE]` on the stack for the protected output; sends
//! without E2E protection do not pay this cost. So an active
//! socket-loop future carries one always-live `UDP_BUFFER_SIZE` buffer
//! plus up to one additional `UDP_BUFFER_SIZE` buffer during E2E sends.
//! With `UNICAST_SOCKETS_CAP=8` sockets bound, the total per-client
//! buffer budget scales as `UNICAST_SOCKETS_CAP * UDP_BUFFER_SIZE`
//! always-live, up to `2 * UNICAST_SOCKETS_CAP * UDP_BUFFER_SIZE` at
//! peak during concurrent E2E-protected sends on every socket. At the
//! current default of `UDP_BUFFER_SIZE = 1500`, that is ~12 KiB
//! always-live / ~24 KiB peak per client.
//!
//! On `std + tokio`, all of this is allocated on the heap when each future
//! is spawned, so the overhead is invisible to callers. On the bare-metal
//! port (future), whoever drives the futures must arrange storage for them
//! (either a `static` or a heap allocator); the capacity constants plus
//! [`crate::UDP_BUFFER_SIZE`] are the knobs for trimming this footprint.
mod error;
mod inner;
mod service_registry;
Expand Down
143 changes: 133 additions & 10 deletions src/client/socket_manager.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
e2e::{E2ECheckStatus, E2EKey, E2ERegistry, PROFILE4_HEADER_SIZE},
UDP_BUFFER_SIZE,
e2e::{E2ECheckStatus, E2EKey, E2ERegistry},
protocol::{Message, MessageView, sd},
traits::{PayloadWireFormat, WireFormat},
};
Expand All @@ -9,7 +10,6 @@ use std::{
net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
sync::{Arc, Mutex},
task::{Context, Poll},
vec,
};
use tokio::{net::UdpSocket, select, sync::mpsc};
use tracing::{error, info, trace};
Expand Down Expand Up @@ -212,7 +212,7 @@ where
e2e_registry: Arc<Mutex<E2ERegistry>>,
) {
tokio::spawn(async move {
let mut buf = vec![0; 1400];
let mut buf = [0u8; UDP_BUFFER_SIZE];
loop {
select! {
result = socket.recv_from(&mut buf) => {
Expand Down Expand Up @@ -257,6 +257,20 @@ where
message = tx_rx.recv() => {
if let Some(send_message) = message {
trace!("Sending: {:?}", &send_message);
// Fail fast with the capacity error rather than
// letting `encode` report a less-actionable
// protocol I/O error when it runs out of
// buffer. Matches the E2E-overflow arm below
// and the server event_publisher path.
let required_size = send_message.message.required_size();
if required_size > UDP_BUFFER_SIZE {
error!(
"outgoing message ({} bytes) exceeds UDP_BUFFER_SIZE ({}); dropping send",
required_size, UDP_BUFFER_SIZE
);
let _ = send_message.response.send(Err(Error::Capacity("udp_buffer")));
continue;
}
let mut message_length = match send_message.message.encode(&mut buf.as_mut_slice()) {
Ok(length) => length,
Err(e) => {
Expand All @@ -271,22 +285,35 @@ where
}
};

// Apply E2E protect if configured
// Apply E2E protect if configured. `protected`
// is a disjoint stack buffer, so the input can
// be borrowed directly out of `buf[16..]` with
// no intermediate copy.
{
Comment thread
JustinKovacich marked this conversation as resolved.
let key = E2EKey::from_message_id(send_message.message.header().message_id());
let mut registry = e2e_registry.lock().expect("e2e registry lock poisoned");
if registry.contains_key(&key) {
let original_payload = buf[16..message_length].to_vec();
let upper_header: [u8; 8] = buf[8..16].try_into().expect("upper header slice");
let mut protected = vec![0u8; original_payload.len() + PROFILE4_HEADER_SIZE];
match registry.protect(key, &original_payload, upper_header, &mut protected) {
let mut protected = [0u8; UDP_BUFFER_SIZE];
let result = registry.protect(
key,
&buf[16..message_length],
upper_header,
&mut protected,
);
match result {
Some(Ok(protected_len)) => {
if 16 + protected_len > UDP_BUFFER_SIZE {
error!(
"E2E-protected datagram ({} bytes, header + protected payload) exceeds UDP_BUFFER_SIZE ({}); dropping send",
16 + protected_len, UDP_BUFFER_SIZE
);
let _ = send_message.response.send(Err(Error::Capacity("udp_buffer")));
continue;
Comment thread
JustinKovacich marked this conversation as resolved.
}
#[allow(clippy::cast_possible_truncation)]
let new_length: u32 = 8 + protected_len as u32;
buf[4..8].copy_from_slice(&new_length.to_be_bytes());
if 16 + protected_len > buf.len() {
buf.resize(16 + protected_len, 0);
}
buf[16..16 + protected_len].copy_from_slice(&protected[..protected_len]);
message_length = 16 + protected_len;
}
Expand Down Expand Up @@ -332,6 +359,7 @@ mod tests {
use super::*;
use crate::protocol::sd::test_support::{TestPayload, empty_sd_header};
use std::format;
use std::vec;
Comment thread
JustinKovacich marked this conversation as resolved.
Comment thread
JustinKovacich marked this conversation as resolved.

type TestSocketManager = SocketManager<TestPayload>;

Expand Down Expand Up @@ -562,4 +590,99 @@ mod tests {
"reboot flag stays Continuous after wrap"
);
}

#[tokio::test]
async fn send_e2e_protected_payload_exceeding_udp_buffer_returns_capacity_error() {
use crate::RawPayload;
use crate::e2e::{E2EProfile, Profile4Config};
use crate::protocol::{Header, MessageId, MessageType, MessageTypeField, ReturnCode};

// Register an E2E profile so the protect branch runs.
let message_id = MessageId::new_from_service_and_method(0x1234, 0x5678);
let key = E2EKey::from_message_id(message_id);
let mut reg = E2ERegistry::new();
reg.register(key, E2EProfile::Profile4(Profile4Config::new(0, 15)));
let e2e_registry = Arc::new(Mutex::new(reg));

let mut sm = SocketManager::<RawPayload>::bind(0, e2e_registry).unwrap();

// Craft a message whose raw-encoded size fits `UDP_BUFFER_SIZE`
// exactly (header + payload = cap) but whose E2E-protected size
// does not — Profile4 adds `PROFILE4_HEADER_SIZE` bytes which
// pushes the protected total over the cap. Sizes derived from
// `UDP_BUFFER_SIZE` and `PROFILE4_HEADER_SIZE` so the fixture
// stays valid if the constant is retuned.
const SOMEIP_HEADER_SIZE: usize = 16;
let payload_len = UDP_BUFFER_SIZE - SOMEIP_HEADER_SIZE; // raw total == UDP_BUFFER_SIZE
let payload_bytes = vec![0u8; payload_len];
let payload = RawPayload::from_payload_bytes(message_id, &payload_bytes).unwrap();
let header = Header::new(
message_id,
0x0001_0001,
0x01,
0x01,
MessageTypeField::new(MessageType::Request, false),
ReturnCode::Ok,
payload_bytes.len(),
);
let message = Message::new(header, payload);

let target = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9999);
let err = sm
.send(target, message)
.await
.expect_err("E2E-protected oversize message must error");
match err {
Error::Capacity(tag) => assert_eq!(tag, "udp_buffer"),
other => panic!("expected Error::Capacity(\"udp_buffer\"), got {other:?}"),
}
}

/// Messages whose raw encoded size already exceeds `UDP_BUFFER_SIZE`
/// — with no E2E in play — must be rejected up front with
/// `Error::Capacity("udp_buffer")` rather than bubbling out the
/// less-actionable protocol I/O error that `encode` would report
/// after running out of buffer.
#[tokio::test]
async fn send_raw_message_exceeding_udp_buffer_returns_capacity_error() {
use crate::RawPayload;
use crate::protocol::{Header, MessageId, MessageType, MessageTypeField, ReturnCode};

let message_id = MessageId::new_from_service_and_method(0x1234, 0x5678);
// No E2E registered — goes straight through the pre-encode check.
let e2e_registry = Arc::new(Mutex::new(E2ERegistry::new()));
let mut sm = SocketManager::<RawPayload>::bind(0, e2e_registry).unwrap();

// Derive a payload that makes the full message exceed the UDP cap
// by 1 byte regardless of how `UDP_BUFFER_SIZE` is retuned:
// 16-byte header + payload_len = UDP_BUFFER_SIZE + 1.
const SOMEIP_HEADER_SIZE: usize = 16;
let payload_len = UDP_BUFFER_SIZE - SOMEIP_HEADER_SIZE + 1;
let payload_bytes = vec![0u8; payload_len];
let payload = RawPayload::from_payload_bytes(message_id, &payload_bytes).unwrap();
let header = Header::new(
message_id,
0x0001_0001,
0x01,
0x01,
MessageTypeField::new(MessageType::Request, false),
ReturnCode::Ok,
payload_bytes.len(),
);
let message = Message::new(header, payload);
assert!(
message.required_size() > UDP_BUFFER_SIZE,
"fixture must actually exceed the cap for this test to exercise the new path",
);

let target = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9999);
let err = sm
.send(target, message)
.await
.expect_err("raw oversize message must error");
match err {
Error::Capacity(tag) => assert_eq!(tag, "udp_buffer"),
other => panic!("expected Error::Capacity(\"udp_buffer\"), got {other:?}"),
}
}
}
27 changes: 27 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,33 @@
#[cfg(feature = "std")]
extern crate std;

/// Maximum size, in bytes, of UDP payloads for `client` / `server` send
/// paths that serialize into a fixed-size buffer of this size.
///
/// Paths currently capped by this constant:
/// - `client::SocketManager::send` (unicast + SD outbound)
/// - `server::EventPublisher::publish_event`
/// - `server::EventPublisher::publish_raw_event`
///
/// When one of these paths is actually reached and serialization is
/// attempted, messages larger than this cap fail with
/// `client::Error::Capacity("udp_buffer")` or
/// `server::Error::Capacity("udp_buffer")`, depending on the path.
/// Paths that return early before
/// attempting serialization (e.g. `publish_event` when there are no
/// subscribers) are not affected. Other outbound SD paths (announcement
/// builders, `SubscribeAck` / `SubscribeNack`) currently still use
/// heap `Vec` buffers and are not capped by this constant — that is a
/// known gap, planned alongside the bare-metal `no_alloc` refactor.
///
/// Note that this is an application-level UDP payload limit, not an
/// Ethernet-MTU-safe size: a 1500-byte UDP payload exceeds a 1500-byte
/// L2 MTU once IP/UDP headers are added (IPv4 leaves 1472 bytes of UDP
/// payload, IPv6 leaves 1452), so sends at this size may fragment or
/// fail depending on the network stack. Bare-metal ports targeting a
/// smaller link MTU may want to lower this by forking.
pub const UDP_BUFFER_SIZE: usize = 1500;

/// SOME/IP client for discovering services and exchanging messages.
#[cfg(feature = "client")]
pub mod client;
Expand Down
12 changes: 12 additions & 0 deletions src/server/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use thiserror::Error;

/// Errors that can occur during SOME/IP server operations.
///
/// Not marked `#[non_exhaustive]` today: downstream crates that match on
/// this enum rely on exhaustiveness, and adding the attribute now would be
/// a silent breaking change that `cargo-semver-checks` would flag. Revisit
/// when a breaking release is planned.
#[derive(Error, Debug)]
pub enum Error {
/// A SOME/IP protocol-level error.
Expand All @@ -12,6 +17,13 @@ pub enum Error {
/// An E2E protection or checking error occurred.
#[error(transparent)]
E2e(#[from] crate::e2e::Error),
/// A fixed-capacity internal structure is full (e.g. a stack send
/// buffer smaller than the outgoing message). The argument is a
/// lowercase `snake_case` tag naming the resource; grep the crate for
/// the tag to find the compile-time constant that governs it. Current
/// tags: `"udp_buffer"` (→ `crate::UDP_BUFFER_SIZE`).
#[error("internal capacity exceeded: {0}")]
Capacity(&'static str),
}

impl From<crate::protocol::sd::Error> for Error {
Expand Down
Loading