Skip to content

Add raw mavlink zenoh driver#215

Merged
patrickelectric merged 15 commits into
bluerobotics:masterfrom
joaoantoniocardoso:add_raw_mavlink_zenoh_driver
Jun 12, 2026
Merged

Add raw mavlink zenoh driver#215
patrickelectric merged 15 commits into
bluerobotics:masterfrom
joaoantoniocardoso:add_raw_mavlink_zenoh_driver

Conversation

@joaoantoniocardoso

@joaoantoniocardoso joaoantoniocardoso commented Jun 9, 2026

Copy link
Copy Markdown
Member
image

Comment thread src/lib/drivers/zenoh/mod.rs Outdated
@@ -1 +1,102 @@
pub mod json;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

over having this here, isn't it better to have session.rs ?

Comment thread src/lib/drivers/zenoh/mod.rs Outdated
}

loop {
tokio::time::sleep(tokio::time::Duration::from_secs(86400)).await;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why is there a 24hs loop here ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was a "forever" loop; it feels really odd indeed, I'll simplify it.

Comment thread src/lib/drivers/zenoh/mod.rs Outdated
async fn init_session_manager() {
SESSION_RX
.get_or_init(|| async {
let (tx, rx) = watch::channel(None);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to have a comment explaining the reason behind this task, it took me a while to understand what is doing and why is doing. But it would be nice just to have the reason of the loop itself, the how is indirectly related to the chosen implementation.

Comment thread src/lib/drivers/zenoh/mod.rs Outdated
return session;
}

if rx.changed().await.is_err() {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some things that would improve the code, one of the is to add a log when the session changes and also a log when the session is created.

Comment thread src/lib/drivers/zenoh/raw.rs Outdated
error!("Failed to decode packet: {decode_error:?}");
continue;
}
Ok(None) => break,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to have a comment explaining why we are breaking here, having None as "not enough data" is not clear.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm gonna move to a Framed adapter, following the same pattern as the other drivers.

Comment thread src/lib/drivers/zenoh/raw.rs Outdated
}
};

let bus_message = Arc::new(Protocol::new("zenohraw", packet));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this string needs to match with the one in the sender to avoid loop-back, this should be stored in a static variable.

Comment thread src/lib/drivers/zenoh/raw.rs Outdated
debug!(
"Dropping message: on_message_input callback returned error: {error:?}"
);
continue;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what ? should this be a break ? If so... why are we not printing the others calls status ?

Comment thread src/lib/drivers/zenoh/raw.rs Outdated
debug!(
"Dropping message: on_message_output callback returned error: {error:?}"
);
continue 'mainloop;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait, how about the other futures calls ? Same as my previous comment.

@joaoantoniocardoso joaoantoniocardoso Jun 10, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait.. This should continue the outer loop, skipping the message transmission.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From patrick from the past: 4b22893

let mut timestamp_bytes = [0u8; 8];

loop {
'mainloop: loop {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you give more details about the "fix" in the commit description ?

let bus_message = Arc::new(Protocol::from_mavlink_raw(
content.header.inner,
&content.message,
DRIVER_IDENTIFIER,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was added in 9e22904

Comment thread src/lib/drivers/zenoh/mod.rs Outdated
static SESSION_RX: OnceCell<watch::Receiver<Option<Arc<zenoh::Session>>>> = OnceCell::const_new();

#[instrument(level = "debug")]
pub(crate) fn build_config() -> anyhow::Result<zenoh::Config> {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this here and in session.rs ?

Comment thread src/lib/drivers/zenoh/mod.rs Outdated
static SESSION_RX: OnceCell<watch::Receiver<Option<Arc<zenoh::Session>>>> = OnceCell::const_new();

#[instrument(level = "debug")]
pub(crate) fn build_config() -> anyhow::Result<zenoh::Config> {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is duplicated in session.rs

Comment thread src/lib/drivers/zenoh/mod.rs Outdated

use crate::cli::zenoh_config_file;

static SESSION_RX: OnceCell<watch::Receiver<Option<Arc<zenoh::Session>>>> = OnceCell::const_new();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is being added here and removed: 1b06fb4

match decode_result {
Ok(Some(item)) => return Poll::Ready(Some(Ok(item))),
Ok(None) if self.end_of_stream => return Poll::Ready(None),
Ok(None) => { /* need more data */ }

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

@joaoantoniocardoso joaoantoniocardoso force-pushed the add_raw_mavlink_zenoh_driver branch from 7dec60c to e955c67 Compare June 11, 2026 18:45
@joaoantoniocardoso joaoantoniocardoso force-pushed the add_raw_mavlink_zenoh_driver branch from e955c67 to 4b49832 Compare June 11, 2026 20:37
@joaoantoniocardoso joaoantoniocardoso force-pushed the add_raw_mavlink_zenoh_driver branch from 4b49832 to 5a75631 Compare June 11, 2026 20:40

fn start_send(self: Pin<&mut Self>, item: Packet) -> Result<(), Self::Error> {
self.publisher
.put(item.bytes().clone())

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears that this is missing .encoding, where the default is ZENOH_BYTES that represents the usage of zenoh specific serialization methods. For this, it appears that https://docs.rs/zenoh/latest/zenoh/bytes/struct.Encoding.html#associatedconstant.APPLICATION_OCTET_STREAM is the best.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great catch, I'll add it

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adjusted for raw and JSON.

@joaoantoniocardoso joaoantoniocardoso force-pushed the add_raw_mavlink_zenoh_driver branch from 5a75631 to 47b2cdc Compare June 12, 2026 17:10
@patrickelectric patrickelectric merged commit bfc96df into bluerobotics:master Jun 12, 2026
18 of 19 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants