Add raw mavlink zenoh driver#215
Conversation
638f44e to
966a05d
Compare
| @@ -1 +1,102 @@ | |||
| pub mod json; | |||
|
|
|||
There was a problem hiding this comment.
over having this here, isn't it better to have session.rs ?
| } | ||
|
|
||
| loop { | ||
| tokio::time::sleep(tokio::time::Duration::from_secs(86400)).await; |
There was a problem hiding this comment.
Can you explain why is there a 24hs loop here ?
There was a problem hiding this comment.
It was a "forever" loop; it feels really odd indeed, I'll simplify it.
| async fn init_session_manager() { | ||
| SESSION_RX | ||
| .get_or_init(|| async { | ||
| let (tx, rx) = watch::channel(None); |
There was a problem hiding this comment.
It would be nice to have a comment explaining the reason behind this task, it took me a while to understand what is doing and why is doing. But it would be nice just to have the reason of the loop itself, the how is indirectly related to the chosen implementation.
| return session; | ||
| } | ||
|
|
||
| if rx.changed().await.is_err() { |
There was a problem hiding this comment.
Some things that would improve the code, one of the is to add a log when the session changes and also a log when the session is created.
| error!("Failed to decode packet: {decode_error:?}"); | ||
| continue; | ||
| } | ||
| Ok(None) => break, |
There was a problem hiding this comment.
It would be nice to have a comment explaining why we are breaking here, having None as "not enough data" is not clear.
There was a problem hiding this comment.
I'm gonna move to a Framed adapter, following the same pattern as the other drivers.
| } | ||
| }; | ||
|
|
||
| let bus_message = Arc::new(Protocol::new("zenohraw", packet)); |
There was a problem hiding this comment.
Since this string needs to match with the one in the sender to avoid loop-back, this should be stored in a static variable.
| debug!( | ||
| "Dropping message: on_message_input callback returned error: {error:?}" | ||
| ); | ||
| continue; |
There was a problem hiding this comment.
what ? should this be a break ? If so... why are we not printing the others calls status ?
| debug!( | ||
| "Dropping message: on_message_output callback returned error: {error:?}" | ||
| ); | ||
| continue 'mainloop; |
There was a problem hiding this comment.
wait, how about the other futures calls ? Same as my previous comment.
There was a problem hiding this comment.
wait.. This should continue the outer loop, skipping the message transmission.
966a05d to
7dec60c
Compare
| let mut timestamp_bytes = [0u8; 8]; | ||
|
|
||
| loop { | ||
| 'mainloop: loop { |
There was a problem hiding this comment.
can you give more details about the "fix" in the commit description ?
| let bus_message = Arc::new(Protocol::from_mavlink_raw( | ||
| content.header.inner, | ||
| &content.message, | ||
| DRIVER_IDENTIFIER, |
| static SESSION_RX: OnceCell<watch::Receiver<Option<Arc<zenoh::Session>>>> = OnceCell::const_new(); | ||
|
|
||
| #[instrument(level = "debug")] | ||
| pub(crate) fn build_config() -> anyhow::Result<zenoh::Config> { |
There was a problem hiding this comment.
why is this here and in session.rs ?
| static SESSION_RX: OnceCell<watch::Receiver<Option<Arc<zenoh::Session>>>> = OnceCell::const_new(); | ||
|
|
||
| #[instrument(level = "debug")] | ||
| pub(crate) fn build_config() -> anyhow::Result<zenoh::Config> { |
There was a problem hiding this comment.
This is duplicated in session.rs
|
|
||
| use crate::cli::zenoh_config_file; | ||
|
|
||
| static SESSION_RX: OnceCell<watch::Receiver<Option<Arc<zenoh::Session>>>> = OnceCell::const_new(); |
| match decode_result { | ||
| Ok(Some(item)) => return Poll::Ready(Some(Ok(item))), | ||
| Ok(None) if self.end_of_stream => return Poll::Ready(None), | ||
| Ok(None) => { /* need more data */ } |
7dec60c to
e955c67
Compare
e955c67 to
4b49832
Compare
4b49832 to
5a75631
Compare
|
|
||
| fn start_send(self: Pin<&mut Self>, item: Packet) -> Result<(), Self::Error> { | ||
| self.publisher | ||
| .put(item.bytes().clone()) |
There was a problem hiding this comment.
It appears that this is missing .encoding, where the default is ZENOH_BYTES that represents the usage of zenoh specific serialization methods. For this, it appears that https://docs.rs/zenoh/latest/zenoh/bytes/struct.Encoding.html#associatedconstant.APPLICATION_OCTET_STREAM is the best.
There was a problem hiding this comment.
great catch, I'll add it
There was a problem hiding this comment.
Adjusted for raw and JSON.
These on_message_input/on_message_output callbacks are supposed to skip/drop messages when returning an "Err". The previous code was continuing inside the for loop, which won't skip/drop the message.
5a75631 to
47b2cdc
Compare
Uh oh!
There was an error while loading. Please reload this page.