Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
.DS_Store

/target

.claude/
CLAUDE.md

.DS_Store
lcov.info
/target
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,7 @@ required-features = ["client-tokio", "server"]
[[test]]
name = "bare_metal_client"
required-features = ["client", "bare_metal"]

[[test]]
name = "static_channels_alloc_witness"
required-features = ["client", "bare_metal"]
6 changes: 5 additions & 1 deletion examples/bare_metal/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,11 @@ impl Future for MockSendFut {
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.get_mut();
if let Some(bytes) = me.bytes.take() {
me.pipe.send_queue.lock().unwrap().push_back((bytes, me.target));
me.pipe
.send_queue
.lock()
.unwrap()
.push_back((bytes, me.target));
}
Poll::Ready(Ok(()))
}
Expand Down
3 changes: 1 addition & 2 deletions examples/client_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// ── Create the client (handles discovery, subscriptions, SD socket) ──

let (client, mut updates, run_fut) =
simple_someip::Client::<Payload, _, _, _>::new(interface);
let (client, mut updates, run_fut) = simple_someip::Client::<Payload, _, _, _>::new(interface);
let _run_handle = tokio::spawn(run_fut);
client.bind_discovery().await?;
info!("Client discovery bound");
Expand Down
3 changes: 1 addition & 2 deletions examples/discovery_client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,7 @@ async fn main() -> Result<(), Error> {

info!("Starting discovery client on interface {interface}");

let (client, mut updates, run_fut) =
simple_someip::Client::<Payload, _, _, _>::new(interface);
let (client, mut updates, run_fut) = simple_someip::Client::<Payload, _, _, _>::new(interface);
let _run_handle = tokio::spawn(run_fut);
client.bind_discovery().await.unwrap();

Expand Down
83 changes: 61 additions & 22 deletions src/client/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ use std::borrow::ToOwned;
use std::sync::{Arc, Mutex};
use tracing::{debug, error, info, trace, warn};

#[cfg(all(test, feature = "client-tokio"))]
use crate::e2e::E2ERegistry;
#[cfg(all(test, feature = "client-tokio"))]
use crate::tokio_transport::{TokioChannels, TokioSpawner, TokioTimer, TokioTransport};
use crate::{
Timer,
client::{
Expand All @@ -23,10 +27,6 @@ use crate::{
TransportSocket, UnboundedSend,
},
};
#[cfg(all(test, feature = "client-tokio"))]
use crate::e2e::E2ERegistry;
#[cfg(all(test, feature = "client-tokio"))]
use crate::tokio_transport::{TokioChannels, TokioSpawner, TokioTimer, TokioTransport};

use super::error::Error;

Expand All @@ -43,7 +43,7 @@ const PENDING_RESPONSES_CAP: usize = 64;
/// two.
const UNICAST_SOCKETS_CAP: usize = 8;

pub(super) enum ControlMessage<P: PayloadWireFormat + 'static, C: ChannelFactory> {
pub enum ControlMessage<P: PayloadWireFormat + 'static, C: ChannelFactory> {
SetInterface(Ipv4Addr, C::OneshotSender<Result<(), Error>>),
BindDiscovery(C::OneshotSender<Result<(), Error>>),
UnbindDiscovery(C::OneshotSender<Result<(), Error>>),
Expand Down Expand Up @@ -140,27 +140,39 @@ impl<P: PayloadWireFormat + 'static, C: ChannelFactory> std::fmt::Debug for Cont
}
}

impl<P: PayloadWireFormat + 'static, C: ChannelFactory> ControlMessage<P, C> {
impl<P, C> ControlMessage<P, C>
where
P: PayloadWireFormat + Send + 'static,
C: ChannelFactory,
Result<(), Error>: crate::transport::OneshotPooled<C>,
Result<P, Error>: crate::transport::OneshotPooled<C>,
Result<crate::protocol::sd::RebootFlag, Error>: crate::transport::OneshotPooled<C>,
{
#[must_use]
pub fn set_interface(interface: Ipv4Addr) -> (C::OneshotReceiver<Result<(), Error>>, Self) {
let (sender, receiver) = C::oneshot();
(receiver, Self::SetInterface(interface, sender))
}
#[must_use]
pub fn bind_discovery() -> (C::OneshotReceiver<Result<(), Error>>, Self) {
let (sender, receiver) = C::oneshot();
(receiver, Self::BindDiscovery(sender))
}
#[must_use]
pub fn unbind_discovery() -> (C::OneshotReceiver<Result<(), Error>>, Self) {
let (sender, receiver) = C::oneshot();
(receiver, Self::UnbindDiscovery(sender))
}

#[must_use]
pub fn send_sd(
socket_addr: SocketAddrV4,
header: P::SdHeader,
) -> (C::OneshotReceiver<Result<(), Error>>, Self) {
let (sender, receiver) = C::oneshot();
(receiver, Self::SendSD(socket_addr, header, sender))
}
#[must_use]
pub fn add_endpoint(
service_id: u16,
instance_id: u16,
Expand All @@ -174,6 +186,7 @@ impl<P: PayloadWireFormat + 'static, C: ChannelFactory> ControlMessage<P, C> {
)
}

#[must_use]
pub fn remove_endpoint(
service_id: u16,
instance_id: u16,
Expand All @@ -186,6 +199,7 @@ impl<P: PayloadWireFormat + 'static, C: ChannelFactory> ControlMessage<P, C> {
}

#[allow(clippy::type_complexity)]
#[must_use]
pub fn send_to_service(
service_id: u16,
instance_id: u16,
Expand All @@ -210,6 +224,7 @@ impl<P: PayloadWireFormat + 'static, C: ChannelFactory> ControlMessage<P, C> {
)
}

#[must_use]
pub fn subscribe(
service_id: u16,
instance_id: u16,
Expand All @@ -233,6 +248,7 @@ impl<P: PayloadWireFormat + 'static, C: ChannelFactory> ControlMessage<P, C> {
)
}

#[must_use]
pub fn query_reboot_flag() -> (
C::OneshotReceiver<Result<crate::protocol::sd::RebootFlag, Error>>,
Self,
Expand All @@ -242,6 +258,7 @@ impl<P: PayloadWireFormat + 'static, C: ChannelFactory> ControlMessage<P, C> {
}

#[cfg(all(test, feature = "client-tokio"))]
#[must_use]
pub fn force_sd_session_wrapped_for_test(
wrapped: bool,
) -> (C::OneshotReceiver<Result<(), Error>>, Self) {
Expand Down Expand Up @@ -299,13 +316,16 @@ pub(super) struct Inner<
C: ChannelFactory,
> {
/// MPSC Receiver used to receive control messages from outer client
control_receiver: C::BoundedReceiver<ControlMessage<PayloadDefinitions, C>>,
control_receiver: C::BoundedReceiver<ControlMessage<PayloadDefinitions, C>, 4>,
/// Queue of pending control messages to process
request_queue: Deque<ControlMessage<PayloadDefinitions, C>, REQUEST_QUEUE_CAP>,
/// Pending request-responses keyed by `request_id` (`client_id` << 16 | `session_counter`).
/// Set by `SendToService`, cleared when a matching unicast arrives.
pending_responses:
FnvIndexMap<u32, C::OneshotSender<Result<PayloadDefinitions, Error>>, PENDING_RESPONSES_CAP>,
pending_responses: FnvIndexMap<
u32,
C::OneshotSender<Result<PayloadDefinitions, Error>>,
PENDING_RESPONSES_CAP,
>,
/// Unbounded sender used to send updates to outer client
update_sender: C::UnboundedSender<ClientUpdate<PayloadDefinitions>>,
/// Target interface for sockets
Expand Down Expand Up @@ -370,7 +390,7 @@ impl<

impl<PayloadDefinitions, F, S, Tm, R, C> Inner<PayloadDefinitions, F, S, Tm, R, C>
where
PayloadDefinitions: PayloadWireFormat + Clone + std::fmt::Debug + 'static,
PayloadDefinitions: PayloadWireFormat + Clone + std::fmt::Debug + Send + 'static,
F: TransportFactory + Send + Sync + 'static,
F::Socket: Send + Sync + 'static,
for<'a> <F::Socket as TransportSocket>::SendFuture<'a>: Send,
Expand All @@ -379,6 +399,16 @@ where
Tm: Timer + Send + Sync + 'static,
R: E2ERegistryHandle,
C: ChannelFactory,
// Channel-bound bundle (see comment in `client::mod`).
Result<(), Error>: crate::transport::OneshotPooled<C>,
Result<PayloadDefinitions, Error>: crate::transport::OneshotPooled<C>,
Result<crate::protocol::sd::RebootFlag, Error>: crate::transport::OneshotPooled<C>,
ControlMessage<PayloadDefinitions, C>: crate::transport::BoundedPooled<C, 4>,
super::socket_manager::SendMessage<PayloadDefinitions, C>:
crate::transport::BoundedPooled<C, 16>,
Result<super::socket_manager::ReceivedMessage<PayloadDefinitions>, Error>:
crate::transport::BoundedPooled<C, 16>,
super::ClientUpdate<PayloadDefinitions>: crate::transport::UnboundedPooled<C>,
{
/// Construct an `Inner` and return the control/update channels plus
/// the run-loop future. The caller drives the future on its
Expand All @@ -398,7 +428,7 @@ where
spawner: S,
timer: Tm,
) -> (
C::BoundedSender<ControlMessage<PayloadDefinitions, C>>,
C::BoundedSender<ControlMessage<PayloadDefinitions, C>, 4>,
C::UnboundedReceiver<ClientUpdate<PayloadDefinitions>>,
impl core::future::Future<Output = ()> + Send + 'static,
) {
Expand Down Expand Up @@ -1012,9 +1042,7 @@ where
// `TokioTimer` (wrapping `tokio::time::sleep`); bare-metal
// builds plug in their own (e.g. an `embassy_time` shim).
let control_fut = control_receiver.recv().fuse();
let sleep_fut = timer
.sleep(core::time::Duration::from_millis(125))
.fuse();
let sleep_fut = timer.sleep(core::time::Duration::from_millis(125)).fuse();
let discovery_fut = Self::receive_discovery(discovery_socket).fuse();
let unicast_fut = Self::receive_any_unicast(unicast_sockets).fuse();
pin_mut!(control_fut, sleep_fut, discovery_fut, unicast_fut);
Expand Down Expand Up @@ -1177,8 +1205,8 @@ mod tests {
use crate::protocol::sd::test_support::{TestPayload, empty_sd_header};
use crate::transport::{OneshotRecv, UnboundedRecv};
use std::format;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::mpsc::Sender;
use tokio::sync::{mpsc, oneshot};

type TestControl = ControlMessage<TestPayload, TokioChannels>;
/// Type alias for the fully-spelled `Inner` flavor used throughout
Expand Down Expand Up @@ -1233,8 +1261,8 @@ mod tests {
/// the resulting `RecvError`, which is exactly what Copilot flagged.
#[test]
fn reject_with_capacity_notifies_every_sender() {
use futures::FutureExt;
use crate::transport::OneshotCancelled;
use futures::FutureExt;

fn expect_capacity<F>(rx: F, label: &str)
where
Expand Down Expand Up @@ -1286,8 +1314,12 @@ mod tests {
expect_capacity(send_rx.recv(), "SendToService.send_complete");
// resp_rx has type Result<TestPayload, Error> — check it separately
match resp_rx.recv().now_or_never() {
Some(Ok(Err(Error::Capacity(s)))) => assert_eq!(s, "request_queue", "SendToService.response"),
other => panic!("SendToService.response: expected Some(Ok(Err(Capacity))), got {other:?}"),
Some(Ok(Err(Error::Capacity(s)))) => {
assert_eq!(s, "request_queue", "SendToService.response");
}
other => {
panic!("SendToService.response: expected Some(Ok(Err(Capacity))), got {other:?}")
}
}
}

Expand Down Expand Up @@ -1520,7 +1552,9 @@ mod tests {
);
match displaced_result {
Err(Error::Capacity(tag)) => assert_eq!(tag, "pending_responses"),
other => panic!("expected Err(Error::Capacity(\\\"pending_responses\\\")), got {other:?}"),
other => {
panic!("expected Err(Error::Capacity(\\\"pending_responses\\\")), got {other:?}")
}
}

// The new sender is still live and pending.
Expand Down Expand Up @@ -1629,15 +1663,20 @@ mod tests {
// Drop control sender to trigger loop exit
drop(control_sender);
// The update receiver should eventually return None when the inner loop exits
let result =
tokio::time::timeout(std::time::Duration::from_secs(2), UnboundedRecv::recv(&mut update_receiver)).await;
let result = tokio::time::timeout(
std::time::Duration::from_secs(2),
UnboundedRecv::recv(&mut update_receiver),
)
.await;
assert!(result.is_ok());
assert!(result.unwrap().is_none());
}

/// Helper: verify inner loop is still alive by sending an `AddEndpoint` and
/// checking that a response arrives within 2 seconds.
async fn assert_inner_alive(control_sender: &Sender<ControlMessage<TestPayload, TokioChannels>>) {
async fn assert_inner_alive(
control_sender: &Sender<ControlMessage<TestPayload, TokioChannels>>,
) {
let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9999);
let (rx, msg) = TestControl::add_endpoint(0xFFFE, 0xFFFE, addr, 0);
control_sender.send(msg).await.unwrap();
Expand Down
Loading