Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,17 @@ use std::net::Ipv4Addr;

#[tokio::main]
async fn main() {
// Client::new returns a (Client, ClientUpdates) pair.
// Client is Clone-able and can be shared across tasks.
let (client, mut updates) = Client::<RawPayload>::new(Ipv4Addr::new(192, 168, 1, 100));
// Client::new returns a Clone-able handle, an update stream, and
// the run-loop future. The future must be actively driven — either
// spawned on the runtime as shown below, or awaited alongside your
// own work in a `tokio::select!`. If the future is never polled,
// Client method calls that send commands over the control channel
// will hang indefinitely waiting on their oneshot response.
// `Error::Shutdown` is returned only once the run-loop future has
// been dropped or its task cancelled.
let (client, mut updates, run) =
Client::<RawPayload>::new(Ipv4Addr::new(192, 168, 1, 100));
let _run_task = tokio::spawn(run);

// Bind the SD multicast socket to discover services
client.bind_discovery().await.unwrap();
Expand All @@ -95,12 +103,17 @@ use std::net::Ipv4Addr;
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = ServerConfig::new(Ipv4Addr::new(192, 168, 1, 200), 30500, 0x1234, 1);
let mut server = Server::new(config).await?;
server.start_announcing()?;
let announce_handle = tokio::spawn(server.announcement_loop()?);

let publisher = server.publisher();
tokio::spawn(async move { server.run().await });
let run_handle = tokio::spawn(async move { server.run().await });

// Publish events to subscribers...

tokio::select! {
res = announce_handle => eprintln!("announcement loop exited unexpectedly: {res:?}"),
res = run_handle => eprintln!("server run loop exited: {res:?}"),
}
Ok(())
}
```
Expand Down
9 changes: 5 additions & 4 deletions examples/client_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
//! This ensures remote nodes see a single coherent network identity for
//! multicast announcements.
//!
//! The server's built-in `start_announcing()` is NOT used — instead, the
//! The server's built-in `announcement_loop()` is NOT used — instead, the
//! client's `start_sd_announcements()` handles periodic multicast
//! announcements. The server's `run()` loop still handles unicast SD
//! traffic (e.g. `SubscribeAck`/`SubscribeNack` responses) on its own
Expand Down Expand Up @@ -106,7 +106,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// ── Create the client (handles discovery, subscriptions, SD socket) ──

let (client, mut updates) = simple_someip::Client::<Payload>::new(interface);
let (client, mut updates, run) = simple_someip::Client::<Payload>::new(interface);
let _run_handle = tokio::spawn(run);
client.bind_discovery().await?;
info!("Client discovery bound");

Expand All @@ -125,13 +126,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut server = Server::new(config).await?;
info!("Server bound on port {MY_SERVER_PORT}");

// NOTE: We intentionally do NOT call server.start_announcing().
// NOTE: We intentionally do NOT spawn server.announcement_loop().
// The client's start_sd_announcements handles all SD traffic.

let _publisher = server.publisher();
Comment thread
JustinKovacich marked this conversation as resolved.

// Spawn the server event loop (handles incoming subscriptions).
tokio::spawn(async move {
let _server_handle = tokio::spawn(async move {
if let Err(e) = server.run().await {
error!("Server error: {e}");
}
Expand Down
3 changes: 2 additions & 1 deletion examples/discovery_client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@ async fn main() -> Result<(), Error> {

info!("Starting discovery client on interface {interface}");

let (client, mut updates) = simple_someip::Client::<Payload>::new(interface);
let (client, mut updates, run) = simple_someip::Client::<Payload>::new(interface);
let _run_handle = tokio::spawn(run);
client.bind_discovery().await.unwrap();

let mut state = DiscoveryState::new();
Expand Down
16 changes: 14 additions & 2 deletions src/client/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,26 @@ pub enum Error {
/// A fixed-capacity internal structure is full. The argument is a
/// lowercase `snake_case` tag naming the resource; grep the crate for
/// the tag to find the compile-time constant that governs it. Current
/// tags: `"unicast_sockets"` (→ `UNICAST_SOCKETS_CAP`), `"udp_buffer"`
/// (→ `crate::UDP_BUFFER_SIZE`).
/// tags:
/// - `"unicast_sockets"` → `UNICAST_SOCKETS_CAP`
/// - `"udp_buffer"` → `crate::UDP_BUFFER_SIZE`
/// - `"pending_responses"` → `PENDING_RESPONSES_CAP`
#[error("internal capacity exceeded: {0}")]
Capacity(&'static str),
/// An error surfaced by the pluggable transport backend (see
/// [`crate::transport::TransportError`]).
#[error(transparent)]
Transport(#[from] crate::transport::TransportError),
/// The client's internal run-loop future has exited — either because
/// the caller dropped it before or during polling, the executor
/// cancelled its task, or it returned. All public `Client` methods
/// that enqueue a control message or await its response return
/// this variant when the control channel is closed, rather than
/// panicking on `.unwrap()` of the send / recv result. Treat it as
/// a caller-side lifecycle error: the `Client` handle has outlived
/// its driver and further calls on it cannot make progress.
#[error("client run loop is no longer running")]
Shutdown,
}

#[cfg(test)]
Expand Down
Loading