Conversation
|
https://github.com/jonhoo/bus might be a good option |
|
Hmmmm I think we need an async broadcast channel. How about https://crates.io/crates/async-broadcast? |
|
Design choice:
#[derive(Debug, Clone)]
pub struct Event {
pub entity_type_id: TypeId,
pub action: EventAction,
pub values: HashMap<String, Value>,
}
let mut tokio_receiver = db.set_event_stream(async_broadcast::broadcast(10));
while let Ok(event) = tokio_receiver.recv().await {
// Filter by entity
if event.of_entity::<cake::Entity>() {
// Unpack the values
if let Some(val) = event.values.get(cake::Column::Name.as_str()) {
todo!()
}
}
} |
use async_broadcast::{broadcast, TryRecvError, Receiver};
use futures_lite::{future::block_on, stream::StreamExt};
fn main() {
block_on(async move {
let (s1, mut r1) = broadcast(2);
let s2 = s1.clone();
let mut r2 = r1.clone();
// Send 2 messages from two different senders.
s1.broadcast(7).await.unwrap();
s2.broadcast(8).await.unwrap();
// Channel is now at capacity so sending more messages will result in an error.
assert!(s2.try_broadcast(9).unwrap_err().is_full());
assert!(s1.try_broadcast(10).unwrap_err().is_full());
// We can use `recv` method of the `Stream` implementation to receive messages.
assert_eq!(r1.next().await.unwrap(), 7);
assert_eq!(r1.recv().await.unwrap(), 8);
assert_eq!(r2.next().await.unwrap(), 7);
assert_eq!(r2.recv().await.unwrap(), 8);
// All receiver got all messages so channel is now empty.
assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
// Close one receiver, which closes all the sender and receivers.
Receiver::close(&r1);
println!("{}", s1.is_closed()); // prints True
s1.broadcast(10).await.unwrap(); // thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: SendError(..)', src/main.rs:34:32
s2.broadcast(11).await.unwrap();
println!("{}", r2.next().await.unwrap());
println!("{}", r2.recv().await.unwrap());
// println!("{}", r1.next().await.unwrap());
// println!("{}", r1.recv().await.unwrap());
})
}There are 2 problems with this library
--------------EDITED-------------- Verifying if a subscriber get's past events. Cargo.toml use async_broadcast::{broadcast};
use futures::executor::block_on;
fn main() {
block_on(async move {
let (s1, mut r1) = broadcast(2);
let s2 = s1.clone();
// Event 7 should be received by only r1.
s1.broadcast(7).await.unwrap();
// new subscriber after event 7.
let mut r2 = r1.clone();
// event 8 should be received by both r1 and r2
s2.broadcast(8).await.unwrap();
// We can use `recv` method to receive messages.
assert_eq!(r1.recv().await.unwrap(), 7);
assert_eq!(r1.recv().await.unwrap(), 8);
// r2 should receive events 8 and onwards.
assert_eq!(r2.recv().await.unwrap(), 8); // this panics, received event is 7 which is past event for r2, it should be 8
})
}Future subscribers receive past event messages. Maybe all the receivers are reading from the same queue with their own index pointer. |
|
@Diwakar-Gupta thank you for the investigation. Your example is easy to read!
According to the docs, this is not the intended usage:
Note that, dropping the Receiver is a different operation from manually closing it.
Yes, I think this is a problem, although the example did not make it clear: we should clone r2 at the later point and verify the behaviour. Otherwise, I think it will be fine, because if the channel is closed, we will drop the Sender as well and do nothing. |
|
Are you also interested in investigating the Apart from semantics, an additional requirement is to be compatible with all async runtime we support. |
|
Hi @tyt2y3 I did some work with Cargo.toml fn main() {
use bus::Bus;
let mut tx = Bus::new(2); // len = 2
let mut rx1 = tx.add_rx();
tx.broadcast(1);
let mut rx2 = tx.add_rx(); // should not receive 1
tx.broadcast(2);
assert_eq!(rx1.recv(), Ok(1));
assert_eq!(rx1.recv(), Ok(2));
// rx2 don't receive past events
assert_eq!(rx2.recv(), Ok(2));
drop(rx2);
tx.broadcast(3);
assert_eq!(rx1.recv(), Ok(3));
// send more events than len and don't consume value
let mut rx3 = tx.add_rx();
tx.broadcast(4);
tx.broadcast(5);
assert_eq!(tx.try_broadcast(6), Err(6));
// 1 receiver consume's value
assert_eq!(rx3.recv(), Ok(4));
assert_eq!(tx.try_broadcast(6), Err(6));
// both receiver consumes value
assert_eq!(rx1.recv(), Ok(4));
assert_eq!(tx.try_broadcast(6), Ok(()));
}My observation
One other think I wanted to ask should we consider the way javascript web handles event, it takes a function ( function or object in our case ) |
|
Hey @Diwakar-Gupta, thanks for the experiments!! I didn't have time to do it myself just yet. So, glad you helped :))
One shouldn't invoke
I found that we cannot spawn a new receiver simply by calling Also, can we try |
|
@billy1624 @tyt2y3 |
PR Info
New Features
Breaking Changes
Changes