From a7623c7178630207b01eda988974a77b2e04335c Mon Sep 17 00:00:00 2001 From: Brad Murray Date: Fri, 22 May 2026 15:51:18 -0400 Subject: [PATCH 1/6] sdk: Add support for MSC4471 event streams --- Cargo.lock | 27 +- Cargo.toml | 2 +- crates/matrix-sdk/Cargo.toml | 3 +- crates/matrix-sdk/src/client/mod.rs | 15 + crates/matrix-sdk/src/event_streams/mod.rs | 156 +++ .../matrix-sdk/src/event_streams/publisher.rs | 931 ++++++++++++++++++ .../src/event_streams/subscription.rs | 857 ++++++++++++++++ crates/matrix-sdk/src/lib.rs | 2 + crates/matrix-sdk/src/room/event_streams.rs | 94 ++ crates/matrix-sdk/src/room/mod.rs | 2 + 10 files changed, 2069 insertions(+), 20 deletions(-) create mode 100644 crates/matrix-sdk/src/event_streams/mod.rs create mode 100644 crates/matrix-sdk/src/event_streams/publisher.rs create mode 100644 crates/matrix-sdk/src/event_streams/subscription.rs create mode 100644 crates/matrix-sdk/src/room/event_streams.rs diff --git a/Cargo.lock b/Cargo.lock index 5c682cac982..e2bcefe3e1b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4803,8 +4803,7 @@ dependencies = [ [[package]] name = "ruma" version = "0.15.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e420da038fd6529af5abffe21df50ba122e1b4a84db05c02ec05b5ab0a21a320" +source = "git+https://github.com/bradtgmurray/ruma?branch=msc4471-event-streams#8e060f2faf372a1710aa67cc6e5c89995866ef5b" dependencies = [ "assign", "js_int", @@ -4822,8 +4821,7 @@ dependencies = [ [[package]] name = "ruma-client-api" version = "0.23.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a793e13cc9c354385e4f635b5eca581abe76169a9fafd8c530918f9b19f8d63" +source = "git+https://github.com/bradtgmurray/ruma?branch=msc4471-event-streams#8e060f2faf372a1710aa67cc6e5c89995866ef5b" dependencies = [ "as_variant", "assign", @@ -4845,8 +4843,7 @@ dependencies = [ [[package]] name = "ruma-common" version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b69b11cb6ccf0e27c3c44c50e2e4799337921c66d4e6a490c084f18c5b4481ec" +source = "git+https://github.com/bradtgmurray/ruma?branch=msc4471-event-streams#8e060f2faf372a1710aa67cc6e5c89995866ef5b" dependencies = [ "as_variant", "base64", @@ -4879,8 +4876,7 @@ dependencies = [ [[package]] name = "ruma-events" version = "0.33.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c96e3c39ab1b692086d02513fe0c24400d864060880bbd2716cb5544f5923131" +source = "git+https://github.com/bradtgmurray/ruma?branch=msc4471-event-streams#8e060f2faf372a1710aa67cc6e5c89995866ef5b" dependencies = [ "as_variant", "indexmap", @@ -4903,8 +4899,7 @@ dependencies = [ [[package]] name = "ruma-federation-api" version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06ebea8616d3d3cc26057f7856b9ef42aba1e37c44d2c136856134f3083a44ef" +source = "git+https://github.com/bradtgmurray/ruma?branch=msc4471-event-streams#8e060f2faf372a1710aa67cc6e5c89995866ef5b" dependencies = [ "headers", "http", @@ -4925,8 +4920,7 @@ dependencies = [ [[package]] name = "ruma-html" version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1d81a7300e8623dbf5e6d73e700f0277fce6824849d77779ed997ec4e280b97" +source = "git+https://github.com/bradtgmurray/ruma?branch=msc4471-event-streams#8e060f2faf372a1710aa67cc6e5c89995866ef5b" dependencies = [ "as_variant", "html5ever", @@ -4937,8 +4931,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d6cff00317675f487c4e7ccfb18875a14c5a14867b51d13f2a826053f03c432" +source = "git+https://github.com/bradtgmurray/ruma?branch=msc4471-event-streams#8e060f2faf372a1710aa67cc6e5c89995866ef5b" dependencies = [ "js_int", "thiserror 2.0.18", @@ -4947,8 +4940,7 @@ dependencies = [ [[package]] name = "ruma-macros" version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ac022103cd7829721476d3df79d16125be159e99527c8ddb27f125e7b674e5c" +source = "git+https://github.com/bradtgmurray/ruma?branch=msc4471-event-streams#8e060f2faf372a1710aa67cc6e5c89995866ef5b" dependencies = [ "as_variant", "cfg-if", @@ -4964,8 +4956,7 @@ dependencies = [ [[package]] name = "ruma-signatures" version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "134d4b6b5b039d3f52b3a28516f348c718ab5f0785fed1328588636c9c67b54c" +source = "git+https://github.com/bradtgmurray/ruma?branch=msc4471-event-streams#8e060f2faf372a1710aa67cc6e5c89995866ef5b" dependencies = [ "base64", "ed25519-dalek", diff --git a/Cargo.toml b/Cargo.toml index aea2a91c70c..c03130b2ad4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,7 +71,7 @@ rand = { version = "0.10.1", default-features = false, features = ["std", "std_r regex = { version = "1.12.2", default-features = false } reqwest = { version = "0.13.1", default-features = false } rmp-serde = { version = "1.3.0", default-features = false } -ruma = { version = "0.15.1", features = [ +ruma = { version = "0.15.1", git = "https://github.com/bradtgmurray/ruma", branch = "msc4471-event-streams", features = [ "client-api-c", "compat-unset-avatar", "compat-upload-signatures", diff --git a/crates/matrix-sdk/Cargo.toml b/crates/matrix-sdk/Cargo.toml index 97e4ea36afe..ff3904917a6 100644 --- a/crates/matrix-sdk/Cargo.toml +++ b/crates/matrix-sdk/Cargo.toml @@ -16,7 +16,7 @@ features = ["docsrs"] rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"] [features] -default = ["e2e-encryption", "automatic-room-key-forwarding", "sqlite"] +default = ["e2e-encryption", "automatic-room-key-forwarding", "sqlite", "experimental-event-streams"] testing = [ "matrix-sdk-sqlite?/testing", "matrix-sdk-indexeddb?/testing", @@ -81,6 +81,7 @@ federation-api = ["ruma/federation-api-c"] uniffi = ["dep:uniffi", "matrix-sdk-base/uniffi", "dep:matrix-sdk-ffi-macros"] experimental-widgets = ["dep:uuid", "experimental-send-custom-to-device"] +experimental-event-streams = ["e2e-encryption", "ruma/unstable-msc4471"] docsrs = ["e2e-encryption", "sqlite", "indexeddb", "sso-login", "qrcode", "federation-api"] diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index 537fd116bc6..2f5072d7dd3 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -14,6 +14,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#[cfg(feature = "experimental-event-streams")] +use std::sync::OnceLock; use std::{ collections::{BTreeMap, BTreeSet, btree_map}, fmt::{self, Debug}, @@ -87,6 +89,8 @@ use self::{ caches::{Cache, CachedValue, ClientCaches}, futures::SendRequest, }; +#[cfg(feature = "experimental-event-streams")] +use crate::event_streams::EventStreams; use crate::{ Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result, Room, SessionTokens, TransmissionProgress, @@ -394,6 +398,9 @@ pub(crate) struct ClientInner { #[cfg(feature = "e2e-encryption")] pub(crate) duplicate_key_upload_error_sender: broadcast::Sender>, + + #[cfg(feature = "experimental-event-streams")] + event_streams: OnceLock, } impl ClientInner { @@ -464,6 +471,8 @@ impl ClientInner { task_monitor: TaskMonitor::new(), #[cfg(feature = "e2e-encryption")] duplicate_key_upload_error_sender: broadcast::channel(1).0, + #[cfg(feature = "experimental-event-streams")] + event_streams: OnceLock::new(), }; #[allow(clippy::let_and_return)] @@ -514,6 +523,12 @@ impl Client { ClientBuilder::new() } + /// Get the client-owned event stream manager. + #[cfg(feature = "experimental-event-streams")] + pub fn event_streams(&self) -> EventStreams { + self.inner.event_streams.get_or_init(|| EventStreams::new(self.clone())).clone() + } + pub(crate) fn base_client(&self) -> &BaseClient { &self.inner.base_client } diff --git a/crates/matrix-sdk/src/event_streams/mod.rs b/crates/matrix-sdk/src/event_streams/mod.rs new file mode 100644 index 00000000000..7a89a8d7cdc --- /dev/null +++ b/crates/matrix-sdk/src/event_streams/mod.rs @@ -0,0 +1,156 @@ +//! Core support for MSC4471 event streams. +//! +//! Event streams let a client publish short-lived, device-scoped updates for a +//! room message without committing each intermediate update to room history. +//! This module keeps that state in memory and exposes publisher and subscriber +//! handles around the Ruma MSC4471 event content types. + +mod publisher; +mod subscription; + +use std::collections::BTreeMap; + +use ruma::{ + DeviceId, OwnedEventId, OwnedRoomId, TransactionId, UserId, + api::client::to_device::send_event_to_device::v3::Request as ToDeviceRequest, + events::{AnyToDeviceEventContent, ToDeviceEventContent}, + serde::Raw, + to_device::DeviceIdOrAllDevices, +}; +use thiserror::Error; +use tokio::sync::broadcast; + +pub use self::{ + publisher::{EventStreamPublisher, EventStreamPublisherOptions, EventStreamPublishers}, + subscription::{ + EventStreamSubscriberUpdate, EventStreamSubscription, EventStreamSubscriptions, + }, +}; +use crate::{Client, HttpError, room::edit::EditError}; + +/// A specialized result for event stream operations. +pub type Result = std::result::Result; + +/// An error returned by an event stream operation. +#[derive(Debug, Error)] +pub enum EventStreamError { + /// The operation needs the current device ID, but the client is not logged in. + #[error("event streams require a logged-in client")] + AuthenticationRequired, + + /// The requested stream is not active in this client's in-memory state. + #[error("unknown event stream")] + UnknownStream, + + /// The descriptor event is not an unredacted room message with a stream descriptor. + #[error("stream descriptor event is not an unredacted room message with a stream descriptor")] + InvalidDescriptorEvent, + + /// The descriptor event does not have a sender. + #[error("stream descriptor event does not have a sender")] + MissingDescriptorSender, + + /// An HTTP request failed. + #[error(transparent)] + Http(#[from] HttpError), + + /// A matrix-sdk operation failed. + #[error(transparent)] + Sdk(#[from] crate::Error), + + /// Creating a final edit event failed. + #[error(transparent)] + Edit(#[from] EditError), + + /// Serializing a to-device payload failed. + #[error(transparent)] + Json(#[from] serde_json::Error), + + /// The event stream update receiver lagged. + #[error("event stream update receiver lagged")] + Lagged, +} + +impl From for EventStreamError { + fn from(value: broadcast::error::RecvError) -> Self { + match value { + broadcast::error::RecvError::Closed => Self::UnknownStream, + broadcast::error::RecvError::Lagged(_) => Self::Lagged, + } + } +} + +/// Identifies a stream by the room event that advertised it. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct StreamId { + /// The room containing the descriptor event. + pub room_id: OwnedRoomId, + + /// The message event that advertised the stream. + pub event_id: OwnedEventId, +} + +impl StreamId { + /// Create a stream identifier from its descriptor event. + pub fn new(room_id: OwnedRoomId, event_id: OwnedEventId) -> Self { + Self { room_id, event_id } + } +} + +/// Client-owned namespace for publishing and subscribing to event streams. +#[derive(Clone, Debug)] +pub struct EventStreams { + publishers: EventStreamPublishers, + subscriptions: EventStreamSubscriptions, +} + +impl EventStreams { + pub(crate) fn new(client: Client) -> Self { + let publishers = EventStreamPublishers::new(client.clone()); + let subscriptions = EventStreamSubscriptions::new(client); + + Self { publishers, subscriptions } + } + + /// Access publisher-side event stream operations. + pub fn publishers(&self) -> EventStreamPublishers { + self.publishers.clone() + } + + /// Access subscriber-side event stream operations. + pub fn subscriptions(&self) -> EventStreamSubscriptions { + self.subscriptions.clone() + } +} + +/// Send one typed to-device event to one specific device. +async fn send_to_device( + client: &Client, + user_id: &UserId, + device_id: &DeviceId, + content: C, +) -> Result<()> +where + C: ToDeviceEventContent, +{ + let event_type = content.event_type(); + let messages = BTreeMap::from([( + user_id.to_owned(), + BTreeMap::from([( + DeviceIdOrAllDevices::DeviceId(device_id.to_owned()), + raw_content(&content)?, + )]), + )]); + + let request = ToDeviceRequest::new_raw(event_type, TransactionId::new(), messages); + client.send(request).await?; + + Ok(()) +} + +fn raw_content(content: &C) -> Result> +where + C: ToDeviceEventContent, +{ + Ok(Raw::new(content)?.cast_unchecked()) +} diff --git a/crates/matrix-sdk/src/event_streams/publisher.rs b/crates/matrix-sdk/src/event_streams/publisher.rs new file mode 100644 index 00000000000..5d9a37f61b3 --- /dev/null +++ b/crates/matrix-sdk/src/event_streams/publisher.rs @@ -0,0 +1,931 @@ +use std::{ + collections::{BTreeMap, HashMap}, + sync::{Arc, Weak}, +}; + +use js_int::{UInt, uint}; +use matrix_sdk_common::executor::spawn; +use ruma::{ + DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId, + api::client::to_device::send_event_to_device::v3::Request as ToDeviceRequest, + events::{ + AnyStateEvent, AnySyncTimelineEvent, AnyToDeviceEventContent, StateEventType, + StaticEventContent, ToDeviceEvent, + event_stream::{ + StreamCancelCode, StreamCancelEventContent, StreamSubscribeEventContent, + StreamUpdateContent, StreamUpdateEventContent, StreamUpdateOperation, + }, + room::{ + history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent}, + member::{MembershipState, RoomMemberEventContent}, + message::RoomMessageEventContentWithoutRelation, + }, + }, + serde::Raw, + to_device::DeviceIdOrAllDevices, +}; +use tokio::sync::{Mutex, mpsc}; +use tracing::{debug, trace, warn}; + +use super::{EventStreamError, Result, StreamId, raw_content, send_to_device}; +use crate::{Client, Room, room::edit::EditedContent}; + +/// Options for publishing a stream. +#[derive(Clone, Debug)] +pub struct EventStreamPublisherOptions { + /// How long subscribers should treat the stream descriptor as usable. + pub descriptor_expiry_ms: UInt, +} + +impl Default for EventStreamPublisherOptions { + fn default() -> Self { + Self { descriptor_expiry_ms: uint!(300_000) } + } +} + +#[derive(Clone, Debug)] +struct PublisherSubscriber { + user_id: OwnedUserId, + device_id: OwnedDeviceId, + next_seq: UInt, + /// `None` means the next update for this subscriber must be a replace. + delivered_generation: Option, + delivered_offset: usize, +} + +#[derive(Debug)] +struct PublisherInner { + room: Room, + current_body: String, + generation: u64, + descriptor_expiry_ms: UInt, + descriptor_origin_server_ts: Option, + /// The original descriptor event body length, used as the initial delivered offset. + original_descriptor_body_len: usize, + subscribers: BTreeMap<(OwnedUserId, OwnedDeviceId), PublisherSubscriber>, +} + +#[derive(Clone, Debug)] +struct PublisherHandle { + state: Arc>, + update_sender: mpsc::UnboundedSender<()>, +} + +impl PublisherHandle { + fn new( + client: Client, + stream_id: StreamId, + room: Room, + descriptor_body: String, + descriptor_expiry_ms: UInt, + ) -> Self { + let (update_sender, update_receiver) = mpsc::unbounded_channel(); + let state = Arc::new(Mutex::new(PublisherInner { + room, + original_descriptor_body_len: descriptor_body.len(), + current_body: descriptor_body.clone(), + generation: 0, + descriptor_expiry_ms, + descriptor_origin_server_ts: None, + subscribers: Default::default(), + })); + + let update_loop = PublisherUpdateLoop { + client, + state: Arc::downgrade(&state), + stream_id, + update_receiver, + }; + let _ = spawn(async move { + update_loop.run().await; + }); + + Self { state, update_sender } + } + + async fn room(&self) -> Room { + self.state.lock().await.room.clone() + } + + async fn descriptor_expiry_ms(&self) -> UInt { + self.state.lock().await.descriptor_expiry_ms + } + + async fn set_descriptor_origin_server_ts(&self, ts: MilliSecondsSinceUnixEpoch) { + self.state.lock().await.descriptor_origin_server_ts = Some(ts); + } + + async fn register_subscription( + &self, + content: &StreamSubscribeEventContent, + sender: &UserId, + ) -> SubscriptionValidationResult { + let mut publisher = self.state.lock().await; + let key = (sender.to_owned(), content.subscriber_device_id.clone()); + let is_new = !publisher.subscribers.contains_key(&key); + let original_descriptor_body_len = publisher.original_descriptor_body_len; + + publisher.subscribers.entry(key.clone()).or_insert_with(|| PublisherSubscriber { + user_id: sender.to_owned(), + device_id: content.subscriber_device_id.clone(), + next_seq: uint!(1), + delivered_generation: (!content.resync).then_some(0), + delivered_offset: original_descriptor_body_len, + }); + + if let Some(subscriber) = publisher.subscribers.get_mut(&key) { + if content.resync { + subscriber.delivered_generation = None; + } + } + + Ok(content.resync + || (is_new + && (publisher.generation != 0 + || publisher.current_body.len() != publisher.original_descriptor_body_len))) + } + + async fn remove_subscriber( + &self, + subscriber_user_id: &UserId, + subscriber_device_id: &DeviceId, + ) -> bool { + self.state + .lock() + .await + .subscribers + .remove(&(subscriber_user_id.to_owned(), subscriber_device_id.to_owned())) + .is_some() + } + + async fn queue_update(&self, update: PublisherBodyUpdate) -> Result<()> { + { + let mut publisher = self.state.lock().await; + match update { + PublisherBodyUpdate::Replace(body) => { + publisher.current_body = body; + publisher.generation = publisher.generation.saturating_add(1); + } + PublisherBodyUpdate::Append(body) => { + publisher.current_body.push_str(&body); + } + } + } + + self.notify_update_loop() + } + + fn notify_update_loop(&self) -> Result<()> { + self.update_sender.send(()).map_err(|_| EventStreamError::UnknownStream) + } +} + +struct PublisherUpdateLoop { + client: Client, + state: Weak>, + stream_id: StreamId, + update_receiver: mpsc::UnboundedReceiver<()>, +} + +impl PublisherUpdateLoop { + async fn run(mut self) { + while self.update_receiver.recv().await.is_some() { + while self.update_receiver.try_recv().is_ok() {} + + if let Err(error) = self.send_pending_updates().await { + warn!("failed to send event stream update: {error}"); + } + + while self.update_receiver.try_recv().is_ok() {} + } + } + + async fn send_pending_updates(&self) -> Result<()> { + let Some(state) = self.state.upgrade() else { + return Ok(()); + }; + + loop { + let mut planned_updates = Vec::new(); + + { + let mut publisher = state.lock().await; + + // A subscription is bounded by its descriptor even if the subscriber never + // explicitly cancels it. + if publisher.descriptor_origin_server_ts.is_some_and(|origin_server_ts| { + descriptor_is_expired(origin_server_ts, publisher.descriptor_expiry_ms) + }) { + publisher.subscribers.clear(); + return Ok(()); + } + + for subscriber in publisher.subscribers.values() { + let Some(op) = make_update_for_subscriber( + subscriber, + publisher.generation, + publisher.current_body.as_str(), + ) else { + continue; + }; + + let update = StreamUpdateEventContent::new( + self.stream_id.room_id.clone(), + self.stream_id.event_id.clone(), + subscriber.next_seq, + op, + ); + + planned_updates.push(PlannedSubscriberUpdate { + user_id: subscriber.user_id.clone(), + device_id: subscriber.device_id.clone(), + content: raw_content(&update)?, + seq: subscriber.next_seq, + delivered_generation: publisher.generation, + delivered_offset: publisher.current_body.len(), + }); + } + } + + if planned_updates.is_empty() { + return Ok(()); + } + + trace!( + room_id = %self.stream_id.room_id, + event_id = %self.stream_id.event_id, + num_updates = planned_updates.len(), + "sending event stream updates" + ); + + // Create the to-device HTTP request payload and send it + let mut to_device_messages = BTreeMap::new(); + for planned_update in &planned_updates { + to_device_messages + .entry(planned_update.user_id.clone()) + .or_insert_with(BTreeMap::new) + .insert( + DeviceIdOrAllDevices::DeviceId(planned_update.device_id.clone()), + planned_update.content.clone(), + ); + } + let request = ToDeviceRequest::new_raw( + ::TYPE.into(), + TransactionId::new(), + to_device_messages, + ); + self.client.send(request).await?; + + trace!( + room_id = %self.stream_id.room_id, + event_id = %self.stream_id.event_id, + num_updates = planned_updates.len(), + "sent event stream updates" + ); + + // Now that we've successfully sent the to-device messages, update our internal state + // for each subscriber + let mut publisher = state.lock().await; + for planned_update in planned_updates { + let subscriber_key = (planned_update.user_id, planned_update.device_id); + + if let Some(subscriber) = publisher.subscribers.get_mut(&subscriber_key) { + subscriber.next_seq = + subscriber.next_seq.max(planned_update.seq.saturating_add(uint!(1))); + if Some(planned_update.delivered_generation) > subscriber.delivered_generation { + subscriber.delivered_generation = Some(planned_update.delivered_generation); + subscriber.delivered_offset = planned_update.delivered_offset; + } else if subscriber.delivered_generation + == Some(planned_update.delivered_generation) + { + subscriber.delivered_offset = + subscriber.delivered_offset.max(planned_update.delivered_offset); + } + } + } + } + } +} + +struct PlannedSubscriberUpdate { + user_id: OwnedUserId, + device_id: OwnedDeviceId, + content: Raw, + seq: UInt, + delivered_generation: u64, + delivered_offset: usize, +} + +#[derive(Debug)] +struct EventStreamPublishersInner { + client: Client, + publishers: Mutex>, +} + +/// Publisher-side event stream operations. +#[derive(Clone, Debug)] +pub struct EventStreamPublishers { + inner: Arc, +} + +impl EventStreamPublishers { + pub(super) fn new(client: Client) -> Self { + let publishers = Self { + inner: Arc::new(EventStreamPublishersInner { + client: client.clone(), + publishers: Default::default(), + }), + }; + + let handler = publishers.clone(); + client.add_event_handler(move |event: ToDeviceEvent| { + let handler = handler.clone(); + async move { handler.handle_subscribe(event).await } + }); + + let cancel_handler = publishers.clone(); + client.add_event_handler(move |event: ToDeviceEvent| { + let cancel_handler = cancel_handler.clone(); + async move { cancel_handler.handle_cancel(event).await } + }); + + publishers + } + + pub(crate) async fn create_publisher( + &self, + room: Room, + stream_id: StreamId, + descriptor_body: String, + descriptor_expiry_ms: UInt, + ) { + let handle = PublisherHandle::new( + self.inner.client.clone(), + stream_id.clone(), + room, + descriptor_body, + descriptor_expiry_ms, + ); + self.inner.publishers.lock().await.insert(stream_id.clone(), handle); + trace!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + expiry_ms = ?descriptor_expiry_ms, + "created event stream publisher" + ); + } + + async fn handle_subscribe(&self, event: ToDeviceEvent) { + let sender = event.sender; + let content = event.content; + let stream_id = StreamId::new(content.room_id.clone(), content.event_id.clone()); + + trace!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + subscriber_user_id = %sender, + subscriber_device_id = %content.subscriber_device_id, + resync = content.resync, + "received event stream subscription" + ); + + let (publisher, should_notify) = + match self.validate_and_register_subscription(&stream_id, &content, &sender).await { + Ok(result) => result, + Err((code, reason)) => { + debug!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + subscriber_user_id = %sender, + subscriber_device_id = %content.subscriber_device_id, + ?code, + reason, + "rejecting event stream subscription" + ); + if let Err(error) = + self.reject_subscription(&sender, &content, code, Some(reason)).await + { + warn!("failed to send event stream cancel: {error}"); + } + + return; + } + }; + + trace!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + subscriber_user_id = %sender, + subscriber_device_id = %content.subscriber_device_id, + resync = content.resync, + should_notify, + "accepted event stream subscription" + ); + + if should_notify { + if let Err(error) = publisher.notify_update_loop() { + warn!("failed to schedule event stream update: {error}"); + } + } + } + + async fn handle_cancel(&self, event: ToDeviceEvent) { + let sender = event.sender; + let content = event.content; + let stream_id = StreamId::new(content.room_id, content.event_id); + + let Ok(publisher) = self.publisher(&stream_id).await else { + trace!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + subscriber_user_id = %sender, + subscriber_device_id = %content.subscriber_device_id, + "ignored cancellation for unknown event stream publisher" + ); + return; + }; + + let removed = publisher.remove_subscriber(&sender, &content.subscriber_device_id).await; + trace!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + subscriber_user_id = %sender, + subscriber_device_id = %content.subscriber_device_id, + removed, + "handled event stream subscriber cancellation" + ); + } + + async fn queue_update(&self, stream_id: &StreamId, update: PublisherBodyUpdate) -> Result<()> { + let operation = match &update { + PublisherBodyUpdate::Replace(_) => "replace", + PublisherBodyUpdate::Append(_) => "append", + }; + trace!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + operation, + "queued event stream publisher update" + ); + self.publisher(stream_id).await?.queue_update(update).await + } + + async fn publisher(&self, stream_id: &StreamId) -> Result { + self.inner + .publishers + .lock() + .await + .get(stream_id) + .cloned() + .ok_or(EventStreamError::UnknownStream) + } + + async fn reject_subscription( + &self, + subscriber_user_id: &UserId, + content: &StreamSubscribeEventContent, + code: StreamCancelCode, + reason: Option, + ) -> Result<()> { + let mut cancel = StreamCancelEventContent::new( + content.room_id.clone(), + content.event_id.clone(), + content.subscriber_device_id.clone(), + code, + ); + cancel.reason = reason; + + send_to_device( + &self.inner.client, + subscriber_user_id, + &content.subscriber_device_id, + cancel, + ) + .await + } + + /// Validate and register a subscription, returning whether the update loop should be notified. + async fn validate_and_register_subscription( + &self, + stream_id: &StreamId, + content: &StreamSubscribeEventContent, + sender: &UserId, + ) -> SubscriptionValidationResult<(PublisherHandle, bool)> { + self.validate_subscriber_device(content, sender).await?; + let publisher = self.publisher(stream_id).await.map_err(|_| { + (StreamCancelCode::UnknownStream, "Unknown or expired stream".to_owned()) + })?; + self.validate_stream_visibility(&publisher, stream_id, sender).await?; + + let should_notify = publisher.register_subscription(content, sender).await?; + Ok((publisher, should_notify)) + } + + /// Check that updates would be sent to a device owned by the subscribing user. + async fn validate_subscriber_device( + &self, + content: &StreamSubscribeEventContent, + sender: &UserId, + ) -> SubscriptionValidationResult<()> { + if content.subscriber_device_id.as_str().is_empty() { + return Err(( + StreamCancelCode::InvalidSubscription, + "Empty subscriber device ID".to_owned(), + )); + } + + match self.inner.client.encryption().get_device(sender, &content.subscriber_device_id).await + { + Ok(Some(_)) => Ok(()), + Ok(None) => Err(( + StreamCancelCode::InvalidSubscription, + "Subscriber device does not belong to the subscribing user".to_owned(), + )), + Err(error) => { + warn!("failed to look up event stream subscriber device: {error}"); + Err(( + StreamCancelCode::InvalidSubscription, + "Subscriber device could not be validated".to_owned(), + )) + } + } + } + + /// Check that the subscriber is currently joined and could view the descriptor event under + /// the room history rules in effect when that event was sent. + async fn validate_stream_visibility( + &self, + publisher: &PublisherHandle, + stream_id: &StreamId, + sender: &UserId, + ) -> SubscriptionValidationResult<()> { + let room = publisher.room().await; + + match room.get_member(sender).await { + Ok(Some(member)) if *member.membership() == MembershipState::Join => {} + _ => { + return Err(( + StreamCancelCode::Forbidden, + "Subscriber is not joined to the room".to_owned(), + )); + } + } + + // FIXME: Requiring a network request while accepting a subscription is unfortunate, but the + // descriptor event's context is the most robust way to determine whether this particular + // member was allowed to see it. Current local state does not preserve all historical + // membership and visibility transitions. + let descriptor_context = room + .event_with_context(&stream_id.event_id, false, uint!(0), None) + .await + .map_err(|error| { + warn!("failed to load event stream descriptor context: {error}"); + (StreamCancelCode::UnknownStream, "Descriptor event is unavailable".to_owned()) + })?; + let descriptor_event = descriptor_context.event.ok_or_else(|| { + (StreamCancelCode::UnknownStream, "Descriptor event is unavailable".to_owned()) + })?; + let descriptor_event: AnySyncTimelineEvent = + descriptor_event.raw().deserialize().map_err(|error| { + warn!("failed to deserialize event stream descriptor event: {error}"); + (StreamCancelCode::UnknownStream, "Descriptor event is invalid".to_owned()) + })?; + let descriptor_ts = descriptor_event.origin_server_ts(); + publisher.set_descriptor_origin_server_ts(descriptor_ts).await; + + if descriptor_is_expired(descriptor_ts, publisher.descriptor_expiry_ms().await) { + return Err((StreamCancelCode::UnknownStream, "Unknown or expired stream".to_owned())); + } + + if !descriptor_is_visible_to_joined_member( + history_visibility_from_context(&descriptor_context.state), + membership_from_context(&descriptor_context.state, sender), + ) { + return Err(( + StreamCancelCode::Forbidden, + "Subscriber cannot see the stream descriptor event".to_owned(), + )); + } + + Ok(()) + } +} + +enum PublisherBodyUpdate { + Replace(String), + Append(String), +} + +type SubscriptionValidationResult = std::result::Result; + +/// A handle for updating a stream published by this client. +#[derive(Clone, Debug)] +pub struct EventStreamPublisher { + publishers: EventStreamPublishers, + stream_id: StreamId, +} + +impl EventStreamPublisher { + pub(crate) fn new(publishers: EventStreamPublishers, stream_id: StreamId) -> Self { + Self { publishers, stream_id } + } + + /// The room event backing this published stream. + pub fn stream_id(&self) -> &StreamId { + &self.stream_id + } + + /// Replace the current transient body for current subscribers. + pub async fn replace(&self, body: impl Into) -> Result<()> { + self.publishers + .queue_update(&self.stream_id, PublisherBodyUpdate::Replace(body.into())) + .await + } + + /// Append text to the current transient body for current subscribers. + pub async fn append(&self, body: impl Into) -> Result<()> { + self.publishers + .queue_update(&self.stream_id, PublisherBodyUpdate::Append(body.into())) + .await + } + + /// Finish the stream by editing the original message with final content. + pub async fn finish(self, final_content: RoomMessageEventContentWithoutRelation) -> Result<()> { + let room = self.publishers.publisher(&self.stream_id).await?.room().await; + + let edit = room + .make_edit_event(&self.stream_id.event_id, EditedContent::RoomMessage(final_content)) + .await?; + room.send(edit).await?; + + self.publishers.inner.publishers.lock().await.remove(&self.stream_id); + trace!( + room_id = %self.stream_id.room_id, + event_id = %self.stream_id.event_id, + "finalized event stream publisher" + ); + + Ok(()) + } +} + +fn make_update_for_subscriber( + subscriber: &PublisherSubscriber, + current_generation: u64, + current_body: &str, +) -> Option { + if subscriber.delivered_generation != Some(current_generation) { + return Some(StreamUpdateOperation::Replace(StreamUpdateContent::new( + current_body.to_owned(), + ))); + } + + if subscriber.delivered_offset == current_body.len() { + None + } else if subscriber.delivered_offset < current_body.len() + && current_body.is_char_boundary(subscriber.delivered_offset) + { + Some(StreamUpdateOperation::Append(StreamUpdateContent::new( + current_body[subscriber.delivered_offset..].to_owned(), + ))) + } else { + Some(StreamUpdateOperation::Replace(StreamUpdateContent::new(current_body.to_owned()))) + } +} + +fn descriptor_is_expired(origin_server_ts: MilliSecondsSinceUnixEpoch, expiry_ms: UInt) -> bool { + let expires_at = u64::from(origin_server_ts.0).saturating_add(u64::from(expiry_ms)); + u64::from(MilliSecondsSinceUnixEpoch::now().0) >= expires_at +} + +fn descriptor_is_visible_to_joined_member( + history_visibility: HistoryVisibility, + membership_at_descriptor: Option, +) -> bool { + match history_visibility { + HistoryVisibility::WorldReadable | HistoryVisibility::Shared => true, + HistoryVisibility::Invited => matches!( + membership_at_descriptor, + Some(MembershipState::Invite | MembershipState::Join) + ), + HistoryVisibility::Joined => membership_at_descriptor == Some(MembershipState::Join), + _ => false, + } +} + +fn history_visibility_from_context(state: &[Raw]) -> HistoryVisibility { + state + .iter() + .find(|event| { + event.get_field::("type").ok().flatten() + == Some(StateEventType::RoomHistoryVisibility) + }) + .and_then(|event| { + event.get_field::("content").ok().flatten() + }) + .map(|content| content.history_visibility) + .unwrap_or(HistoryVisibility::Shared) +} + +fn membership_from_context( + state: &[Raw], + user_id: &UserId, +) -> Option { + state + .iter() + .find(|event| { + event.get_field::("type").ok().flatten() + == Some(StateEventType::RoomMember) + && event.get_field::("state_key").ok().flatten().as_deref() + == Some(user_id) + }) + .and_then(|event| event.get_field::("content").ok().flatten()) + .map(|content| content.membership) +} + +#[cfg(test)] +mod tests { + use js_int::uint; + use matrix_sdk_test::{async_test, event_factory::EventFactory}; + use ruma::{MilliSecondsSinceUnixEpoch, event_id, owned_device_id, owned_user_id, room_id}; + use serde_json::json; + + use super::*; + use crate::test_utils::mocks::MatrixMockServer; + + #[test] + fn make_update_for_subscriber_uses_generation_and_offset() { + let mut subscriber = PublisherSubscriber { + user_id: owned_user_id!("@alice:example.org"), + device_id: owned_device_id!("ALICEDEVICE"), + next_seq: uint!(1), + delivered_generation: Some(0), + delivered_offset: "hello".len(), + }; + + assert!(make_update_for_subscriber(&subscriber, 0, "hello").is_none()); + + let Some(StreamUpdateOperation::Append(content)) = + make_update_for_subscriber(&subscriber, 0, "hello world") + else { + panic!("expected append"); + }; + assert_eq!(content.body, " world"); + + subscriber.delivered_generation = Some(1); + subscriber.delivered_offset = "hello world".len(); + + let Some(StreamUpdateOperation::Replace(content)) = + make_update_for_subscriber(&subscriber, 2, "goodbye") + else { + panic!("expected replace"); + }; + assert_eq!(content.body, "goodbye"); + + subscriber.delivered_generation = None; + subscriber.delivered_offset = "hello".len(); + + let Some(StreamUpdateOperation::Replace(content)) = + make_update_for_subscriber(&subscriber, 2, "hello world") + else { + panic!("expected replace"); + }; + assert_eq!(content.body, "hello world"); + } + + #[test] + fn restricted_history_requires_visible_membership_at_the_descriptor() { + assert!(descriptor_is_visible_to_joined_member( + HistoryVisibility::Joined, + Some(MembershipState::Join), + )); + assert!(!descriptor_is_visible_to_joined_member( + HistoryVisibility::Joined, + Some(MembershipState::Invite), + )); + assert!(descriptor_is_visible_to_joined_member( + HistoryVisibility::Invited, + Some(MembershipState::Invite), + )); + assert!(descriptor_is_visible_to_joined_member( + HistoryVisibility::Invited, + Some(MembershipState::Join), + )); + assert!(!descriptor_is_visible_to_joined_member(HistoryVisibility::Invited, None,)); + } + + #[test] + fn unrestricted_visibility_does_not_require_historical_membership() { + assert!(!descriptor_is_visible_to_joined_member(HistoryVisibility::Joined, None,)); + assert!(descriptor_is_visible_to_joined_member(HistoryVisibility::Shared, None,)); + } + + #[test] + fn descriptor_context_can_authorize_a_subscriber_invited_before_the_event() { + let subscriber = owned_user_id!("@subscriber:example.org"); + let state: Vec> = vec![ + serde_json::from_value(json!({ + "content": { "history_visibility": "invited" }, + "event_id": "$history", + "origin_server_ts": 1, + "sender": "@admin:example.org", + "state_key": "", + "type": "m.room.history_visibility" + })) + .unwrap(), + serde_json::from_value(json!({ + "content": { "membership": "invite" }, + "event_id": "$invite", + "origin_server_ts": 2, + "sender": "@admin:example.org", + "state_key": "@subscriber:example.org", + "type": "m.room.member" + })) + .unwrap(), + ]; + + let history_visibility = history_visibility_from_context(&state); + let membership = membership_from_context(&state, &subscriber); + + assert_eq!(history_visibility, HistoryVisibility::Invited); + assert_eq!(membership, Some(MembershipState::Invite)); + assert!(descriptor_is_visible_to_joined_member(history_visibility, membership)); + } + + #[async_test] + async fn test_finish_removes_the_publisher_after_sending_final_content() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + let room_id = room_id!("!stream:localhost"); + let room = server.sync_joined_room(&client, room_id).await; + let descriptor_event_id = event_id!("$descriptor"); + let stream_id = StreamId::new(room_id.to_owned(), descriptor_event_id.to_owned()); + let sender = client.user_id().unwrap(); + let f = EventFactory::new().room(room_id); + + server + .mock_room_event() + .ok(f.text_msg("initial").sender(sender).event_id(descriptor_event_id).into_event()) + .expect(1) + .mount() + .await; + server.mock_room_state_encryption().plain().mount().await; + server.mock_room_send().ok(event_id!("$final")).expect(1).mount().await; + + let publishers = EventStreamPublishers::new(client); + publishers + .create_publisher(room, stream_id.clone(), "initial".to_owned(), uint!(300_000)) + .await; + let publisher = EventStreamPublisher::new(publishers.clone(), stream_id.clone()); + let finished_publisher = publisher.clone(); + + publisher.finish(RoomMessageEventContentWithoutRelation::text_plain("done")).await.unwrap(); + + assert!(matches!( + publishers.publisher(&stream_id).await, + Err(EventStreamError::UnknownStream) + )); + assert!(matches!( + finished_publisher.append("too late").await, + Err(EventStreamError::UnknownStream) + )); + } + + #[async_test] + async fn test_expired_descriptor_stops_updates_to_existing_subscribers() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + let room_id = room_id!("!stream:localhost"); + let room = server.sync_joined_room(&client, room_id).await; + let stream_id = StreamId::new(room_id.to_owned(), event_id!("$descriptor").to_owned()); + let subscribers = EventStreamPublishers::new(client.clone()); + + subscribers.create_publisher(room, stream_id.clone(), "initial".to_owned(), uint!(0)).await; + let handle = subscribers.publisher(&stream_id).await.unwrap(); + handle.set_descriptor_origin_server_ts(MilliSecondsSinceUnixEpoch::now()).await; + handle + .register_subscription( + &StreamSubscribeEventContent::new( + stream_id.room_id.clone(), + stream_id.event_id.clone(), + owned_device_id!("SUBSCRIBER"), + ), + owned_user_id!("@subscriber:localhost").as_ref(), + ) + .await + .unwrap(); + + let (_sender, update_receiver) = mpsc::unbounded_channel(); + PublisherUpdateLoop { + client, + state: Arc::downgrade(&handle.state), + stream_id, + update_receiver, + } + .send_pending_updates() + .await + .unwrap(); + + assert!(handle.state.lock().await.subscribers.is_empty()); + } +} diff --git a/crates/matrix-sdk/src/event_streams/subscription.rs b/crates/matrix-sdk/src/event_streams/subscription.rs new file mode 100644 index 00000000000..ef0b7920ca1 --- /dev/null +++ b/crates/matrix-sdk/src/event_streams/subscription.rs @@ -0,0 +1,857 @@ +use std::{collections::HashMap, sync::Arc}; + +use js_int::UInt; +use ruma::{ + OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedUserId, assign, + events::{ + ToDeviceEvent, + event_stream::{ + StreamCancelCode, StreamCancelEventContent, StreamDescriptor, + StreamSubscribeEventContent, StreamUpdateEventContent, StreamUpdateOperation, + }, + room::message::{OriginalSyncRoomMessageEvent, Relation}, + }, +}; +use tokio::sync::{Mutex, broadcast}; +use tracing::{debug, trace, warn}; + +use super::{EventStreamError, Result, StreamId, send_to_device}; +use crate::{Client, Room}; + +/// State changes observed by a stream subscriber. +#[derive(Clone, Debug)] +pub enum EventStreamSubscriberUpdate { + /// The subscriber received a complete transient body baseline. + Replaced { + /// The stream that changed. + stream_id: StreamId, + /// The complete current transient body. + body: String, + }, + + /// The subscriber applied an append update to an existing baseline. + Appended { + /// The stream that changed. + stream_id: StreamId, + /// The text carried by the append update. + appended_body: String, + /// The complete current transient body after the append. + body: String, + }, + + /// The publisher rejected the subscription or ended the stream for this subscriber. + Cancelled { + /// The stream that became terminal. + stream_id: StreamId, + /// The machine-readable cancellation reason from the publisher. + code: StreamCancelCode, + /// Optional human-readable context from the publisher. + reason: Option, + }, +} + +#[derive(Clone, Debug)] +struct SubscriberState { + publisher_user_id: OwnedUserId, + publisher_device_id: OwnedDeviceId, + subscriber_device_id: OwnedDeviceId, + latest_seq: Option, + current_body: Option, + append_valid: bool, + resync_pending: bool, +} + +#[derive(Debug)] +struct EventStreamSubscriptionsInner { + client: Client, + subscriptions: Mutex>, + updates_sender: broadcast::Sender, +} + +/// Subscriber-side event stream operations. +#[derive(Clone, Debug)] +pub struct EventStreamSubscriptions { + inner: Arc, +} + +impl EventStreamSubscriptions { + pub(super) fn new(client: Client) -> Self { + let (updates_sender, _) = broadcast::channel(128); + + let subscriptions = Self { + inner: Arc::new(EventStreamSubscriptionsInner { + client: client.clone(), + subscriptions: Default::default(), + updates_sender, + }), + }; + + // Receive update to-device events + let update_handler = subscriptions.clone(); + client.add_event_handler(move |event: ToDeviceEvent| { + let update_handler = update_handler.clone(); + async move { update_handler.handle_update(event).await } + }); + + // Receive cancel to-device events + let cancel_handler = subscriptions.clone(); + client.add_event_handler(move |event: ToDeviceEvent| { + let cancel_handler = cancel_handler.clone(); + async move { cancel_handler.handle_cancel(event).await } + }); + + // We need to listen for the event we're subscribing to being edited + let edit_handler = subscriptions.clone(); + client.add_event_handler(move |event: OriginalSyncRoomMessageEvent, room: Room| { + let edit_handler = edit_handler.clone(); + async move { edit_handler.handle_room_message(event, room).await } + }); + + subscriptions + } + + /// Subscribe to local subscriber-side stream state changes. + pub fn subscribe_to_updates(&self) -> broadcast::Receiver { + self.inner.updates_sender.subscribe() + } + + /// Query the current in-memory transient body for a subscribed stream. + /// + /// This body is not room history and is not written to the event cache. + pub async fn transient_body(&self, stream_id: &StreamId) -> Option { + self.inner + .subscriptions + .lock() + .await + .get(stream_id) + .and_then(|state| state.current_body.clone()) + } + + /// Subscribe this device to a stream advertised by another user's event. + pub async fn subscribe( + &self, + room_id: OwnedRoomId, + event_id: OwnedEventId, + publisher_user_id: OwnedUserId, + descriptor: StreamDescriptor, + descriptor_body: String, + ) -> Result { + let own_device_id = self + .inner + .client + .device_id() + .ok_or(EventStreamError::AuthenticationRequired)? + .to_owned(); + + let stream_id = StreamId::new(room_id.clone(), event_id.clone()); + + // Create our local state for managing the subscription. A duplicate subscription without + // a resync request should not cause us to lose already-applied transient content. + let state = SubscriberState { + publisher_user_id: publisher_user_id.clone(), + publisher_device_id: descriptor.device_id.clone(), + subscriber_device_id: own_device_id.clone(), + latest_seq: None, + current_body: Some(descriptor_body), + append_valid: true, + resync_pending: false, + }; + self.inner + .subscriptions + .lock() + .await + .entry(stream_id.clone()) + .and_modify(|state| { + state.publisher_user_id = publisher_user_id.clone(); + state.publisher_device_id = descriptor.device_id.clone(); + state.subscriber_device_id = own_device_id.clone(); + }) + .or_insert(state); + + // Send the to-device event to let the publisher know we'd like updates + let content = StreamSubscribeEventContent::new(room_id, event_id, own_device_id); + send_to_device(&self.inner.client, &publisher_user_id, &descriptor.device_id, content) + .await?; + trace!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + publisher_user_id = %publisher_user_id, + publisher_device_id = %descriptor.device_id, + "subscribed to event stream" + ); + + Ok(EventStreamSubscription { subscriptions: self.clone(), stream_id }) + } + + /// Ask the publisher for a full replacement after incremental updates could not be applied. + async fn resync(&self, stream_id: &StreamId) -> Result<()> { + let (publisher_user_id, publisher_device_id, subscriber_device_id) = { + let subscriptions = self.inner.subscriptions.lock().await; + let state = subscriptions.get(stream_id).ok_or(EventStreamError::UnknownStream)?; + ( + state.publisher_user_id.clone(), + state.publisher_device_id.clone(), + state.subscriber_device_id.clone(), + ) + }; + + let content = assign!( + StreamSubscribeEventContent::new( + stream_id.room_id.clone(), + stream_id.event_id.clone(), + subscriber_device_id, + ), + { resync: true } + ); + + send_to_device(&self.inner.client, &publisher_user_id, &publisher_device_id, content) + .await?; + trace!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + publisher_user_id = %publisher_user_id, + publisher_device_id = %publisher_device_id, + "requested event stream resync" + ); + + Ok(()) + } + + /// Stop tracking a subscription locally and notify the publisher. + pub async fn unsubscribe(&self, stream_id: &StreamId) { + let Some(state) = self.inner.subscriptions.lock().await.remove(stream_id) else { + return; + }; + + let content = StreamCancelEventContent::new( + stream_id.room_id.clone(), + stream_id.event_id.clone(), + state.subscriber_device_id, + StreamCancelCode::UserCancelled, + ); + + if let Err(error) = send_to_device( + &self.inner.client, + &state.publisher_user_id, + &state.publisher_device_id, + content, + ) + .await + { + warn!("failed to send event stream unsubscribe: {error}"); + } else { + trace!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + publisher_user_id = %state.publisher_user_id, + publisher_device_id = %state.publisher_device_id, + "unsubscribed from event stream" + ); + } + } + + async fn handle_update(&self, event: ToDeviceEvent) { + let sender = event.sender; + let content = event.content; + let stream_id = StreamId::new(content.room_id.clone(), content.event_id.clone()); + + let update = { + let mut subscriptions = self.inner.subscriptions.lock().await; + let Some(state) = subscriptions.get_mut(&stream_id) else { + trace!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + sender = %sender, + seq = ?content.seq, + "ignored update for untracked event stream" + ); + return; + }; + + if state.publisher_user_id != sender { + debug!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + sender = %sender, + expected_sender = %state.publisher_user_id, + seq = ?content.seq, + "ignored event stream update from unexpected sender" + ); + return; + } + + if state.latest_seq.is_some_and(|seq| content.seq <= seq) { + trace!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + seq = ?content.seq, + latest_seq = ?state.latest_seq, + "ignored stale event stream update" + ); + return; + } + + let mut should_resync = false; + let update = match content.op { + StreamUpdateOperation::Replace(new_content) => { + trace!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + seq = ?content.seq, + "applying event stream replacement update" + ); + state.latest_seq = Some(content.seq); + state.current_body = Some(new_content.body.clone()); + state.append_valid = true; + state.resync_pending = false; + + Some(EventStreamSubscriberUpdate::Replaced { + stream_id: stream_id.clone(), + body: new_content.body, + }) + } + StreamUpdateOperation::Append(append) => { + let expected_next = state + .latest_seq + .and_then(|seq| u64::try_from(seq).ok()) + .and_then(|seq| UInt::try_from(seq + 1).ok()) + .unwrap_or(js_int::uint!(1)); + + let can_append = state.append_valid + && state.current_body.is_some() + && expected_next == content.seq; + + if !can_append { + if !state.resync_pending { + debug!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + seq = ?content.seq, + expected_next = ?expected_next, + "cannot apply event stream append update; requesting resync" + ); + state.resync_pending = true; + should_resync = true; + } + state.append_valid = false; + state.latest_seq = Some(content.seq); + + None + } else { + trace!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + seq = ?content.seq, + "applying event stream append update" + ); + let current = state.current_body.as_mut().expect("checked above"); + current.push_str(&append.body); + let body = current.clone(); + state.latest_seq = Some(content.seq); + + Some(EventStreamSubscriberUpdate::Appended { + stream_id: stream_id.clone(), + appended_body: append.body, + body, + }) + } + } + _ => { + if !state.resync_pending { + debug!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + seq = ?content.seq, + "unsupported event stream update operation; requesting resync" + ); + state.resync_pending = true; + should_resync = true; + } + state.append_valid = false; + state.latest_seq = Some(content.seq); + + None + } + }; + + (update, should_resync) + }; + + if let Some(update) = update.0 { + let _ = self.inner.updates_sender.send(update); + } + + if update.1 { + if let Err(error) = self.resync(&stream_id).await { + warn!("failed to resync event stream: {error}"); + if let Some(state) = self.inner.subscriptions.lock().await.get_mut(&stream_id) { + state.resync_pending = false; + } + } + } + } + + async fn handle_room_message(&self, event: OriginalSyncRoomMessageEvent, room: Room) { + let Some(Relation::Replacement(replacement)) = event.content.relates_to else { + return; + }; + + let stream_id = StreamId::new(room.room_id().to_owned(), replacement.event_id); + let mut subscriptions = self.inner.subscriptions.lock().await; + // This handler sees replacement relations directly, before timeline edit + // validation rejects edits sent by anyone other than the original sender. Make sure it's + // a valid edit before handling it + if let Some(state) = subscriptions.get(&stream_id) { + if state.publisher_user_id != event.sender { + debug!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + sender = %event.sender, + expected_sender = %state.publisher_user_id, + "ignored event stream final replacement from unexpected sender" + ); + return; + } + + subscriptions.remove(&stream_id); + trace!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + "stopped tracking finalized event stream" + ); + } + } + + async fn handle_cancel(&self, event: ToDeviceEvent) { + let sender = event.sender; + let content = event.content; + let stream_id = StreamId::new(content.room_id.clone(), content.event_id.clone()); + + { + let mut subscriptions = self.inner.subscriptions.lock().await; + let Some(state) = subscriptions.get(&stream_id) else { + trace!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + sender = %sender, + "ignored cancellation for untracked event stream" + ); + return; + }; + + if state.publisher_user_id != sender + || state.subscriber_device_id != content.subscriber_device_id + { + debug!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + sender = %sender, + subscriber_device_id = %content.subscriber_device_id, + "ignored event stream cancellation from unexpected sender or device" + ); + return; + } + + subscriptions.remove(&stream_id); + } + + trace!( + room_id = %stream_id.room_id, + event_id = %stream_id.event_id, + ?content.code, + "event stream subscription cancelled" + ); + let _ = self.inner.updates_sender.send(EventStreamSubscriberUpdate::Cancelled { + stream_id, + code: content.code, + reason: content.reason, + }); + } +} + +/// A handle for a stream subscription owned by this client. +#[derive(Clone, Debug)] +pub struct EventStreamSubscription { + subscriptions: EventStreamSubscriptions, + stream_id: StreamId, +} + +impl EventStreamSubscription { + /// The room event backing this subscribed stream. + pub fn stream_id(&self) -> &StreamId { + &self.stream_id + } + + /// Stop tracking this stream on this client. + pub async fn unsubscribe(self) { + self.subscriptions.unsubscribe(&self.stream_id).await; + } +} + +#[cfg(test)] +mod tests { + use js_int::uint; + use matrix_sdk_base::RoomState; + use matrix_sdk_test::async_test; + use ruma::{ + MilliSecondsSinceUnixEpoch, event_id, + events::{ + ToDeviceEvent, + event_stream::{StreamUpdateContent, StreamUpdateEventContent, StreamUpdateOperation}, + room::message::OriginalSyncRoomMessageEvent, + }, + owned_device_id, owned_user_id, room_id, + }; + use serde_json::json; + + use super::*; + use crate::test_utils::{logged_in_client, mocks::MatrixMockServer}; + + async fn track_stream(subscriptions: &EventStreamSubscriptions, stream_id: &StreamId) { + subscriptions.inner.subscriptions.lock().await.insert( + stream_id.clone(), + SubscriberState { + publisher_user_id: owned_user_id!("@publisher:example.org"), + publisher_device_id: owned_device_id!("PUBLISHER"), + subscriber_device_id: owned_device_id!("SUBSCRIBER"), + latest_seq: None, + current_body: Some("initial".to_owned()), + append_valid: true, + resync_pending: false, + }, + ); + } + + #[async_test] + async fn test_applies_updates_until_the_stream_is_finalized() { + let client = logged_in_client(None).await; + let room_id = room_id!("!room:example.org"); + client.base_client().get_or_create_room(room_id, RoomState::Joined); + let room = client.get_room(room_id).unwrap(); + let subscriptions = EventStreamSubscriptions::new(client); + let stream_id = StreamId::new(room_id.to_owned(), event_id!("$stream").to_owned()); + track_stream(&subscriptions, &stream_id).await; + + let update = |seq, operation| { + ToDeviceEvent::new( + owned_user_id!("@publisher:example.org"), + StreamUpdateEventContent::new( + stream_id.room_id.clone(), + stream_id.event_id.clone(), + seq, + operation, + ), + ) + }; + + subscriptions + .handle_update(update( + uint!(1), + StreamUpdateOperation::Append(StreamUpdateContent::new(" one".to_owned())), + )) + .await; + assert_eq!(subscriptions.transient_body(&stream_id).await.as_deref(), Some("initial one")); + + subscriptions + .handle_update(update( + uint!(2), + StreamUpdateOperation::Append(StreamUpdateContent::new(" two".to_owned())), + )) + .await; + assert_eq!( + subscriptions.transient_body(&stream_id).await.as_deref(), + Some("initial one two") + ); + + subscriptions + .handle_update(update( + uint!(3), + StreamUpdateOperation::Replace(StreamUpdateContent::new("replaced".to_owned())), + )) + .await; + assert_eq!(subscriptions.transient_body(&stream_id).await.as_deref(), Some("replaced")); + + subscriptions + .handle_update(update( + uint!(4), + StreamUpdateOperation::Append(StreamUpdateContent::new(" again".to_owned())), + )) + .await; + assert_eq!( + subscriptions.transient_body(&stream_id).await.as_deref(), + Some("replaced again") + ); + + let final_edit: OriginalSyncRoomMessageEvent = serde_json::from_value(json!({ + "content": { + "msgtype": "m.text", "body": "* done", + "m.new_content": { "msgtype": "m.text", "body": "done" }, + "m.relates_to": { "rel_type": "m.replace", "event_id": "$stream" } + }, + "event_id": "$edit", + "origin_server_ts": MilliSecondsSinceUnixEpoch::now(), + "sender": "@publisher:example.org", + "type": "m.room.message" + })) + .unwrap(); + + subscriptions.handle_room_message(final_edit, room).await; + + assert!(subscriptions.transient_body(&stream_id).await.is_none()); + } + + #[async_test] + async fn test_ignores_updates_from_a_different_user() { + let subscriptions = EventStreamSubscriptions::new(logged_in_client(None).await); + let stream_id = StreamId::new( + room_id!("!room:example.org").to_owned(), + event_id!("$stream").to_owned(), + ); + track_stream(&subscriptions, &stream_id).await; + + let update = |body: &str| { + StreamUpdateEventContent::new( + stream_id.room_id.clone(), + stream_id.event_id.clone(), + uint!(1), + StreamUpdateOperation::Replace(StreamUpdateContent::new(body.to_owned())), + ) + }; + + subscriptions + .handle_update(ToDeviceEvent::new( + owned_user_id!("@attacker:example.org"), + update("tampered"), + )) + .await; + assert_eq!(subscriptions.transient_body(&stream_id).await.as_deref(), Some("initial")); + + subscriptions + .handle_update(ToDeviceEvent::new( + owned_user_id!("@publisher:example.org"), + update("accepted"), + )) + .await; + assert_eq!(subscriptions.transient_body(&stream_id).await.as_deref(), Some("accepted")); + } + + #[async_test] + async fn test_invalid_final_replacement_from_another_user_keeps_subscription_state() { + let client = logged_in_client(None).await; + let room_id = room_id!("!room:example.org"); + client.base_client().get_or_create_room(room_id, RoomState::Joined); + let room = client.get_room(room_id).unwrap(); + let subscriptions = EventStreamSubscriptions::new(client); + let stream_id = StreamId::new(room_id.to_owned(), event_id!("$stream").to_owned()); + track_stream(&subscriptions, &stream_id).await; + + let event: OriginalSyncRoomMessageEvent = serde_json::from_value(json!({ + "content": { + "msgtype": "m.text", + "body": "* done", + "m.new_content": { "msgtype": "m.text", "body": "done" }, + "m.relates_to": { "rel_type": "m.replace", "event_id": "$stream" } + }, + "event_id": "$invalid_edit", + "origin_server_ts": MilliSecondsSinceUnixEpoch::now(), + "sender": "@attacker:example.org", + "type": "m.room.message" + })) + .unwrap(); + + subscriptions.handle_room_message(event, room).await; + + assert_eq!(subscriptions.transient_body(&stream_id).await.as_deref(), Some("initial")); + } + + #[async_test] + async fn test_stale_and_duplicate_updates_do_not_change_the_transient_body() { + let subscriptions = EventStreamSubscriptions::new(logged_in_client(None).await); + let stream_id = StreamId::new( + room_id!("!room:example.org").to_owned(), + event_id!("$stream").to_owned(), + ); + track_stream(&subscriptions, &stream_id).await; + + let update = |seq, body: &str| { + ToDeviceEvent::new( + owned_user_id!("@publisher:example.org"), + StreamUpdateEventContent::new( + stream_id.room_id.clone(), + stream_id.event_id.clone(), + seq, + StreamUpdateOperation::Replace(StreamUpdateContent::new(body.to_owned())), + ), + ) + }; + + subscriptions.handle_update(update(uint!(2), "latest")).await; + subscriptions.handle_update(update(uint!(2), "duplicate")).await; + subscriptions.handle_update(update(uint!(1), "stale")).await; + + assert_eq!(subscriptions.transient_body(&stream_id).await.as_deref(), Some("latest")); + } + + #[async_test] + async fn test_duplicate_subscription_preserves_the_transient_baseline() { + let server = MatrixMockServer::new().await; + let subscriptions = EventStreamSubscriptions::new(server.client_builder().build().await); + let stream_id = StreamId::new( + room_id!("!room:example.org").to_owned(), + event_id!("$stream").to_owned(), + ); + track_stream(&subscriptions, &stream_id).await; + + let update = |seq, appended_body: &str| { + ToDeviceEvent::new( + owned_user_id!("@publisher:example.org"), + StreamUpdateEventContent::new( + stream_id.room_id.clone(), + stream_id.event_id.clone(), + seq, + StreamUpdateOperation::Append(StreamUpdateContent::new( + appended_body.to_owned(), + )), + ), + ) + }; + + subscriptions.handle_update(update(uint!(1), " live")).await; + assert_eq!(subscriptions.transient_body(&stream_id).await.as_deref(), Some("initial live")); + + { + let _renewal = server.mock_send_to_device().ok().mock_once().mount_as_scoped().await; + subscriptions + .subscribe( + stream_id.room_id.clone(), + stream_id.event_id.clone(), + owned_user_id!("@publisher:example.org"), + StreamDescriptor::new(owned_device_id!("PUBLISHER")), + "initial".to_owned(), + ) + .await + .unwrap(); + } + + assert_eq!(subscriptions.transient_body(&stream_id).await.as_deref(), Some("initial live")); + + { + let _no_resync = server.mock_send_to_device().ok().never().mount_as_scoped().await; + subscriptions.handle_update(update(uint!(2), " update")).await; + } + assert_eq!( + subscriptions.transient_body(&stream_id).await.as_deref(), + Some("initial live update") + ); + } + + #[async_test] + async fn test_requests_one_resync_until_a_replacement_restores_the_baseline() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + let subscriptions = EventStreamSubscriptions::new(client); + let stream_id = StreamId::new( + room_id!("!room:example.org").to_owned(), + event_id!("$stream").to_owned(), + ); + track_stream(&subscriptions, &stream_id).await; + + let update = |seq, operation| { + ToDeviceEvent::new( + owned_user_id!("@publisher:example.org"), + StreamUpdateEventContent::new( + stream_id.room_id.clone(), + stream_id.event_id.clone(), + seq, + operation, + ), + ) + }; + + // Missing seq 1 makes this append unusable and triggers the first resync request. + { + let _resync = server.mock_send_to_device().ok().mock_once().mount_as_scoped().await; + subscriptions + .handle_update(update( + uint!(2), + StreamUpdateOperation::Append(StreamUpdateContent::new("missing".to_owned())), + )) + .await; + } + + // The baseline is still invalid, but the outstanding resync suppresses a second request. + { + let _no_resync = server.mock_send_to_device().ok().never().mount_as_scoped().await; + subscriptions + .handle_update(update( + uint!(3), + StreamUpdateOperation::Append(StreamUpdateContent::new( + "still missing".to_owned(), + )), + )) + .await; + } + + // This is the resync update, we're good again + { + let _no_resync = server.mock_send_to_device().ok().never().mount_as_scoped().await; + subscriptions + .handle_update(update( + uint!(4), + StreamUpdateOperation::Replace(StreamUpdateContent::new("restored".to_owned())), + )) + .await; + } + + // This new gap makes the append unusable and triggers a second resync request. + { + let _resync = server.mock_send_to_device().ok().mock_once().mount_as_scoped().await; + subscriptions + .handle_update(update( + uint!(6), + StreamUpdateOperation::Append(StreamUpdateContent::new("new gap".to_owned())), + )) + .await; + } + } + + #[async_test] + async fn test_updates_after_final_replacement_are_ignored() { + let client = logged_in_client(None).await; + let room_id = room_id!("!room:example.org"); + client.base_client().get_or_create_room(room_id, RoomState::Joined); + let room = client.get_room(room_id).unwrap(); + let subscriptions = EventStreamSubscriptions::new(client); + let stream_id = StreamId::new(room_id.to_owned(), event_id!("$stream").to_owned()); + track_stream(&subscriptions, &stream_id).await; + + let event: OriginalSyncRoomMessageEvent = serde_json::from_value(json!({ + "content": { + "msgtype": "m.text", + "body": "* done", + "m.new_content": { "msgtype": "m.text", "body": "done" }, + "m.relates_to": { "rel_type": "m.replace", "event_id": "$stream" } + }, + "event_id": "$edit", + "origin_server_ts": MilliSecondsSinceUnixEpoch::now(), + "sender": "@publisher:example.org", + "type": "m.room.message" + })) + .unwrap(); + subscriptions.handle_room_message(event, room).await; + + subscriptions + .handle_update(ToDeviceEvent::new( + owned_user_id!("@publisher:example.org"), + StreamUpdateEventContent::new( + stream_id.room_id.clone(), + stream_id.event_id.clone(), + uint!(1), + StreamUpdateOperation::Replace(StreamUpdateContent::new("late".to_owned())), + ), + )) + .await; + + assert!(subscriptions.transient_body(&stream_id).await.is_none()); + } +} diff --git a/crates/matrix-sdk/src/lib.rs b/crates/matrix-sdk/src/lib.rs index b58de918355..4af701ff022 100644 --- a/crates/matrix-sdk/src/lib.rs +++ b/crates/matrix-sdk/src/lib.rs @@ -43,6 +43,8 @@ pub mod encryption; mod error; pub mod event_cache; pub mod event_handler; +#[cfg(feature = "experimental-event-streams")] +pub mod event_streams; mod http_client; pub mod latest_events; pub mod media; diff --git a/crates/matrix-sdk/src/room/event_streams.rs b/crates/matrix-sdk/src/room/event_streams.rs new file mode 100644 index 00000000000..75781219d39 --- /dev/null +++ b/crates/matrix-sdk/src/room/event_streams.rs @@ -0,0 +1,94 @@ +use matrix_sdk_base::deserialized_responses::TimelineEvent; +use ruma::{ + EventId, assign, + events::{ + AnyMessageLikeEventContent, AnySyncTimelineEvent, event_stream::StreamDescriptor, + room::message::RoomMessageEventContent, + }, +}; + +use super::Room; +use crate::event_streams::{ + EventStreamError, EventStreamPublisher, EventStreamPublisherOptions, EventStreamSubscription, + Result, StreamId, +}; + +impl Room { + /// Send a room message and return a publisher for sending transient updates to it. + pub async fn send_streaming_message( + &self, + message_content: RoomMessageEventContent, + options: EventStreamPublisherOptions, + ) -> Result { + let device_id = + self.client().device_id().ok_or(EventStreamError::AuthenticationRequired)?.to_owned(); + let content_with_descriptor = assign!(message_content.clone(), { + stream: Some(assign!(StreamDescriptor::new(device_id.clone()), { + expiry_ms: Some(options.descriptor_expiry_ms), + })), + }); + let response = self.send(content_with_descriptor).await?; + + let stream_id = StreamId::new(self.room_id().to_owned(), response.response.event_id); + let publishers = self.client().event_streams().publishers(); + publishers + .create_publisher( + self.clone(), + stream_id.clone(), + message_content.msgtype.body().to_owned(), + options.descriptor_expiry_ms, + ) + .await; + + Ok(EventStreamPublisher::new(publishers, stream_id)) + } + + /// Subscribe this device to a stream advertised by an event in this room. + pub async fn subscribe_to_event_stream( + &self, + event_id: &EventId, + ) -> Result { + let event = self.load_or_fetch_event(event_id, None).await?; + let publisher_user_id = event.sender().ok_or(EventStreamError::MissingDescriptorSender)?; + let (descriptor, descriptor_body) = stream_descriptor_and_body(&event)?; + + self.client() + .event_streams() + .subscriptions() + .subscribe( + self.room_id().to_owned(), + event_id.to_owned(), + publisher_user_id, + descriptor, + descriptor_body, + ) + .await + } + + /// Stop tracking the stream advertised by an event in this room. + pub async fn unsubscribe_from_event_stream(&self, event_id: &EventId) { + self.client() + .event_streams() + .subscriptions() + .unsubscribe(&StreamId::new(self.room_id().to_owned(), event_id.to_owned())) + .await; + } +} + +fn stream_descriptor_and_body(event: &TimelineEvent) -> Result<(StreamDescriptor, String)> { + match event.raw().deserialize()? { + AnySyncTimelineEvent::MessageLike(event) => { + let Some(AnyMessageLikeEventContent::RoomMessage(content)) = event.original_content() + else { + return Err(EventStreamError::InvalidDescriptorEvent); + }; + + let Some(descriptor) = content.stream else { + return Err(EventStreamError::InvalidDescriptorEvent); + }; + + Ok((descriptor, content.msgtype.body().to_owned())) + } + _ => Err(EventStreamError::InvalidDescriptorEvent), + } +} diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index b53c2ec34ef..899721bc737 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -189,6 +189,8 @@ use crate::{ }; pub mod edit; +#[cfg(feature = "experimental-event-streams")] +mod event_streams; pub mod futures; pub mod identity_status_changes; /// Contains code related to requests to join a room. From 2f4bb84d29ed215f9fdd4345bf41d7fececb896b Mon Sep 17 00:00:00 2001 From: Brad Murray Date: Fri, 22 May 2026 16:10:33 -0400 Subject: [PATCH 2/6] Fix formatting issues --- crates/matrix-sdk/src/event_streams/mod.rs | 6 +++-- .../matrix-sdk/src/event_streams/publisher.rs | 27 +++++++++++-------- .../src/event_streams/subscription.rs | 21 +++++++++------ crates/matrix-sdk/src/room/event_streams.rs | 3 ++- 4 files changed, 35 insertions(+), 22 deletions(-) diff --git a/crates/matrix-sdk/src/event_streams/mod.rs b/crates/matrix-sdk/src/event_streams/mod.rs index 7a89a8d7cdc..f42acdf1dc3 100644 --- a/crates/matrix-sdk/src/event_streams/mod.rs +++ b/crates/matrix-sdk/src/event_streams/mod.rs @@ -34,7 +34,8 @@ pub type Result = std::result::Result; /// An error returned by an event stream operation. #[derive(Debug, Error)] pub enum EventStreamError { - /// The operation needs the current device ID, but the client is not logged in. + /// The operation needs the current device ID, but the client is not logged + /// in. #[error("event streams require a logged-in client")] AuthenticationRequired, @@ -42,7 +43,8 @@ pub enum EventStreamError { #[error("unknown event stream")] UnknownStream, - /// The descriptor event is not an unredacted room message with a stream descriptor. + /// The descriptor event is not an unredacted room message with a stream + /// descriptor. #[error("stream descriptor event is not an unredacted room message with a stream descriptor")] InvalidDescriptorEvent, diff --git a/crates/matrix-sdk/src/event_streams/publisher.rs b/crates/matrix-sdk/src/event_streams/publisher.rs index 5d9a37f61b3..4f06a257f9b 100644 --- a/crates/matrix-sdk/src/event_streams/publisher.rs +++ b/crates/matrix-sdk/src/event_streams/publisher.rs @@ -60,7 +60,8 @@ struct PublisherInner { generation: u64, descriptor_expiry_ms: UInt, descriptor_origin_server_ts: Option, - /// The original descriptor event body length, used as the initial delivered offset. + /// The original descriptor event body length, used as the initial delivered + /// offset. original_descriptor_body_len: usize, subscribers: BTreeMap<(OwnedUserId, OwnedDeviceId), PublisherSubscriber>, } @@ -283,8 +284,8 @@ impl PublisherUpdateLoop { "sent event stream updates" ); - // Now that we've successfully sent the to-device messages, update our internal state - // for each subscriber + // Now that we've successfully sent the to-device messages, update our internal + // state for each subscriber let mut publisher = state.lock().await; for planned_update in planned_updates { let subscriber_key = (planned_update.user_id, planned_update.device_id); @@ -504,7 +505,8 @@ impl EventStreamPublishers { .await } - /// Validate and register a subscription, returning whether the update loop should be notified. + /// Validate and register a subscription, returning whether the update loop + /// should be notified. async fn validate_and_register_subscription( &self, stream_id: &StreamId, @@ -521,7 +523,8 @@ impl EventStreamPublishers { Ok((publisher, should_notify)) } - /// Check that updates would be sent to a device owned by the subscribing user. + /// Check that updates would be sent to a device owned by the subscribing + /// user. async fn validate_subscriber_device( &self, content: &StreamSubscribeEventContent, @@ -551,8 +554,9 @@ impl EventStreamPublishers { } } - /// Check that the subscriber is currently joined and could view the descriptor event under - /// the room history rules in effect when that event was sent. + /// Check that the subscriber is currently joined and could view the + /// descriptor event under the room history rules in effect when that + /// event was sent. async fn validate_stream_visibility( &self, publisher: &PublisherHandle, @@ -571,10 +575,11 @@ impl EventStreamPublishers { } } - // FIXME: Requiring a network request while accepting a subscription is unfortunate, but the - // descriptor event's context is the most robust way to determine whether this particular - // member was allowed to see it. Current local state does not preserve all historical - // membership and visibility transitions. + // FIXME: Requiring a network request while accepting a subscription is + // unfortunate, but the descriptor event's context is the most robust + // way to determine whether this particular member was allowed to see + // it. Current local state does not preserve all historical membership + // and visibility transitions. let descriptor_context = room .event_with_context(&stream_id.event_id, false, uint!(0), None) .await diff --git a/crates/matrix-sdk/src/event_streams/subscription.rs b/crates/matrix-sdk/src/event_streams/subscription.rs index ef0b7920ca1..c660c1bf6d9 100644 --- a/crates/matrix-sdk/src/event_streams/subscription.rs +++ b/crates/matrix-sdk/src/event_streams/subscription.rs @@ -39,7 +39,8 @@ pub enum EventStreamSubscriberUpdate { body: String, }, - /// The publisher rejected the subscription or ended the stream for this subscriber. + /// The publisher rejected the subscription or ended the stream for this + /// subscriber. Cancelled { /// The stream that became terminal. stream_id: StreamId, @@ -145,8 +146,9 @@ impl EventStreamSubscriptions { let stream_id = StreamId::new(room_id.clone(), event_id.clone()); - // Create our local state for managing the subscription. A duplicate subscription without - // a resync request should not cause us to lose already-applied transient content. + // Create our local state for managing the subscription. A duplicate + // subscription without a resync request should not cause us to lose + // already-applied transient content. let state = SubscriberState { publisher_user_id: publisher_user_id.clone(), publisher_device_id: descriptor.device_id.clone(), @@ -183,7 +185,8 @@ impl EventStreamSubscriptions { Ok(EventStreamSubscription { subscriptions: self.clone(), stream_id }) } - /// Ask the publisher for a full replacement after incremental updates could not be applied. + /// Ask the publisher for a full replacement after incremental updates could + /// not be applied. async fn resync(&self, stream_id: &StreamId) -> Result<()> { let (publisher_user_id, publisher_device_id, subscriber_device_id) = { let subscriptions = self.inner.subscriptions.lock().await; @@ -399,8 +402,8 @@ impl EventStreamSubscriptions { let stream_id = StreamId::new(room.room_id().to_owned(), replacement.event_id); let mut subscriptions = self.inner.subscriptions.lock().await; // This handler sees replacement relations directly, before timeline edit - // validation rejects edits sent by anyone other than the original sender. Make sure it's - // a valid edit before handling it + // validation rejects edits sent by anyone other than the original sender. Make + // sure it's a valid edit before handling it if let Some(state) = subscriptions.get(&stream_id) { if state.publisher_user_id != event.sender { debug!( @@ -768,7 +771,8 @@ mod tests { ) }; - // Missing seq 1 makes this append unusable and triggers the first resync request. + // Missing seq 1 makes this append unusable and triggers the first resync + // request. { let _resync = server.mock_send_to_device().ok().mock_once().mount_as_scoped().await; subscriptions @@ -779,7 +783,8 @@ mod tests { .await; } - // The baseline is still invalid, but the outstanding resync suppresses a second request. + // The baseline is still invalid, but the outstanding resync suppresses a second + // request. { let _no_resync = server.mock_send_to_device().ok().never().mount_as_scoped().await; subscriptions diff --git a/crates/matrix-sdk/src/room/event_streams.rs b/crates/matrix-sdk/src/room/event_streams.rs index 75781219d39..55720643e81 100644 --- a/crates/matrix-sdk/src/room/event_streams.rs +++ b/crates/matrix-sdk/src/room/event_streams.rs @@ -14,7 +14,8 @@ use crate::event_streams::{ }; impl Room { - /// Send a room message and return a publisher for sending transient updates to it. + /// Send a room message and return a publisher for sending transient updates + /// to it. pub async fn send_streaming_message( &self, message_content: RoomMessageEventContent, From 0443c9e15723c15812e2e6a022d1da09f51ff979 Mon Sep 17 00:00:00 2001 From: Brad Murray Date: Fri, 22 May 2026 16:17:11 -0400 Subject: [PATCH 3/6] doc(sdk): Add MSC4471 changelog entry --- crates/matrix-sdk/changelog.d/6607.added.md | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 crates/matrix-sdk/changelog.d/6607.added.md diff --git a/crates/matrix-sdk/changelog.d/6607.added.md b/crates/matrix-sdk/changelog.d/6607.added.md new file mode 100644 index 00000000000..37153983edf --- /dev/null +++ b/crates/matrix-sdk/changelog.d/6607.added.md @@ -0,0 +1,2 @@ +Add experimental support for MSC4471 event streams, allowing clients to send +transient updates to room messages and subscribe to updates from other devices. From 0836539428a31b6fba81afab316be187fe01415c Mon Sep 17 00:00:00 2001 From: Brad Murray Date: Fri, 22 May 2026 18:20:40 -0400 Subject: [PATCH 4/6] Fix event stream clippy warnings --- .../matrix-sdk/src/event_streams/publisher.rs | 18 ++++++++---------- .../src/event_streams/subscription.rs | 14 +++++++------- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/crates/matrix-sdk/src/event_streams/publisher.rs b/crates/matrix-sdk/src/event_streams/publisher.rs index 4f06a257f9b..76dbac4b6fe 100644 --- a/crates/matrix-sdk/src/event_streams/publisher.rs +++ b/crates/matrix-sdk/src/event_streams/publisher.rs @@ -84,7 +84,7 @@ impl PublisherHandle { let state = Arc::new(Mutex::new(PublisherInner { room, original_descriptor_body_len: descriptor_body.len(), - current_body: descriptor_body.clone(), + current_body: descriptor_body, generation: 0, descriptor_expiry_ms, descriptor_origin_server_ts: None, @@ -97,7 +97,7 @@ impl PublisherHandle { stream_id, update_receiver, }; - let _ = spawn(async move { + spawn(async move { update_loop.run().await; }); @@ -134,10 +134,10 @@ impl PublisherHandle { delivered_offset: original_descriptor_body_len, }); - if let Some(subscriber) = publisher.subscribers.get_mut(&key) { - if content.resync { - subscriber.delivered_generation = None; - } + if let Some(subscriber) = publisher.subscribers.get_mut(&key) + && content.resync + { + subscriber.delivered_generation = None; } Ok(content.resync @@ -423,10 +423,8 @@ impl EventStreamPublishers { "accepted event stream subscription" ); - if should_notify { - if let Err(error) = publisher.notify_update_loop() { - warn!("failed to schedule event stream update: {error}"); - } + if should_notify && let Err(error) = publisher.notify_update_loop() { + warn!("failed to schedule event stream update: {error}"); } } diff --git a/crates/matrix-sdk/src/event_streams/subscription.rs b/crates/matrix-sdk/src/event_streams/subscription.rs index c660c1bf6d9..78dd5286919 100644 --- a/crates/matrix-sdk/src/event_streams/subscription.rs +++ b/crates/matrix-sdk/src/event_streams/subscription.rs @@ -316,7 +316,7 @@ impl EventStreamSubscriptions { StreamUpdateOperation::Append(append) => { let expected_next = state .latest_seq - .and_then(|seq| u64::try_from(seq).ok()) + .map(u64::from) .and_then(|seq| UInt::try_from(seq + 1).ok()) .unwrap_or(js_int::uint!(1)); @@ -384,12 +384,12 @@ impl EventStreamSubscriptions { let _ = self.inner.updates_sender.send(update); } - if update.1 { - if let Err(error) = self.resync(&stream_id).await { - warn!("failed to resync event stream: {error}"); - if let Some(state) = self.inner.subscriptions.lock().await.get_mut(&stream_id) { - state.resync_pending = false; - } + if update.1 + && let Err(error) = self.resync(&stream_id).await + { + warn!("failed to resync event stream: {error}"); + if let Some(state) = self.inner.subscriptions.lock().await.get_mut(&stream_id) { + state.resync_pending = false; } } } From 19c5f9fc8823fe94f7525089d1e6dd6c591e1504 Mon Sep 17 00:00:00 2001 From: Brad Murray Date: Mon, 25 May 2026 17:18:09 -0400 Subject: [PATCH 5/6] test(sdk): improve event stream coverage --- .../matrix-sdk/src/event_streams/publisher.rs | 457 ++++++++-- .../src/event_streams/subscription.rs | 790 +++++++++++++----- crates/matrix-sdk/src/test_utils/mocks/mod.rs | 15 + 3 files changed, 975 insertions(+), 287 deletions(-) diff --git a/crates/matrix-sdk/src/event_streams/publisher.rs b/crates/matrix-sdk/src/event_streams/publisher.rs index 76dbac4b6fe..456ada96dcf 100644 --- a/crates/matrix-sdk/src/event_streams/publisher.rs +++ b/crates/matrix-sdk/src/event_streams/publisher.rs @@ -750,11 +750,108 @@ fn membership_from_context( mod tests { use js_int::uint; use matrix_sdk_test::{async_test, event_factory::EventFactory}; - use ruma::{MilliSecondsSinceUnixEpoch, event_id, owned_device_id, owned_user_id, room_id}; + use ruma::{ + MilliSecondsSinceUnixEpoch, event_id, events::room::message::RoomMessageEventContent, + owned_device_id, owned_user_id, room_id, + }; use serde_json::json; use super::*; - use crate::test_utils::mocks::MatrixMockServer; + use crate::test_utils::mocks::{MatrixMockServer, RoomContextResponseTemplate}; + + struct SubscribableStreamFixture { + server: MatrixMockServer, + publisher: EventStreamPublisher, + stream_id: StreamId, + publisher_user_id: OwnedUserId, + subscriber_user_id: OwnedUserId, + subscriber_device_id: OwnedDeviceId, + } + + impl SubscribableStreamFixture { + fn subscription_content(&self) -> StreamSubscribeEventContent { + StreamSubscribeEventContent::new( + self.stream_id.room_id.clone(), + self.stream_id.event_id.clone(), + self.subscriber_device_id.clone(), + ) + } + } + + async fn set_up_subscribable_stream() -> SubscribableStreamFixture { + set_up_subscribable_stream_with_options(EventStreamPublisherOptions::default()).await + } + + async fn set_up_subscribable_stream_with_options( + options: EventStreamPublisherOptions, + ) -> SubscribableStreamFixture { + let server = MatrixMockServer::new().await; + server.mock_crypto_endpoints_preset().await; + let (client, subscriber_client) = server.set_up_alice_and_bob_for_encryption().await; + let room_id = room_id!("!stream:example.org"); + let publisher_user_id = client.user_id().unwrap().to_owned(); + let subscriber_user_id = subscriber_client.user_id().unwrap().to_owned(); + let subscriber_device_id = subscriber_client.device_id().unwrap().to_owned(); + let room = server.sync_joined_room(&client, room_id).await; + server.mock_room_state_encryption().expect_any_access_token().plain().mount().await; + server + .mock_room_send() + .expect_any_access_token() + .body_matches_partial_json(json!({ "body": "initial" })) + .ok(event_id!("$descriptor")) + .mock_once() + .mount() + .await; + let publisher = room + .send_streaming_message(RoomMessageEventContent::text_plain("initial"), options) + .await + .unwrap(); + let stream_id = publisher.stream_id().clone(); + + SubscribableStreamFixture { + server, + publisher, + stream_id, + publisher_user_id, + subscriber_user_id, + subscriber_device_id, + } + } + + async fn assert_subscription_rejected( + server: &MatrixMockServer, + publishers: &EventStreamPublishers, + subscriber_user_id: OwnedUserId, + content: StreamSubscribeEventContent, + code: StreamCancelCode, + reason: &str, + ) { + let mut expected = StreamCancelEventContent::new( + content.room_id.clone(), + content.event_id.clone(), + content.subscriber_device_id.clone(), + code, + ); + expected.reason = Some(reason.to_owned()); + let expected_body = json!({ + "messages": { + (subscriber_user_id.as_str()): { + (content.subscriber_device_id.as_str()): expected, + } + } + }); + + let _send = server + .mock_send_to_device() + .expect_any_access_token() + .for_type(::TYPE) + .body_json(expected_body) + .ok() + .mock_once() + .mount_as_scoped() + .await; + publishers.handle_subscribe(ToDeviceEvent::new(subscriber_user_id, content)).await; + } #[test] fn make_update_for_subscriber_uses_generation_and_offset() { @@ -794,33 +891,34 @@ mod tests { panic!("expected replace"); }; assert_eq!(content.body, "hello world"); - } - #[test] - fn restricted_history_requires_visible_membership_at_the_descriptor() { - assert!(descriptor_is_visible_to_joined_member( - HistoryVisibility::Joined, - Some(MembershipState::Join), - )); - assert!(!descriptor_is_visible_to_joined_member( - HistoryVisibility::Joined, - Some(MembershipState::Invite), - )); - assert!(descriptor_is_visible_to_joined_member( - HistoryVisibility::Invited, - Some(MembershipState::Invite), - )); - assert!(descriptor_is_visible_to_joined_member( - HistoryVisibility::Invited, - Some(MembershipState::Join), - )); - assert!(!descriptor_is_visible_to_joined_member(HistoryVisibility::Invited, None,)); + subscriber.delivered_generation = Some(2); + subscriber.delivered_offset = 1; + + let Some(StreamUpdateOperation::Replace(content)) = + make_update_for_subscriber(&subscriber, 2, "é") + else { + panic!("expected replace for a non-character-boundary offset"); + }; + assert_eq!(content.body, "é"); } #[test] - fn unrestricted_visibility_does_not_require_historical_membership() { - assert!(!descriptor_is_visible_to_joined_member(HistoryVisibility::Joined, None,)); - assert!(descriptor_is_visible_to_joined_member(HistoryVisibility::Shared, None,)); + fn descriptor_visibility_follows_history_visibility_and_historical_membership() { + for (visibility, membership, expected) in [ + (HistoryVisibility::WorldReadable, None, true), + (HistoryVisibility::Shared, None, true), + (HistoryVisibility::Invited, None, false), + (HistoryVisibility::Invited, Some(MembershipState::Invite), true), + (HistoryVisibility::Invited, Some(MembershipState::Join), true), + (HistoryVisibility::Invited, Some(MembershipState::Leave), false), + (HistoryVisibility::Joined, None, false), + (HistoryVisibility::Joined, Some(MembershipState::Invite), false), + (HistoryVisibility::Joined, Some(MembershipState::Join), true), + (HistoryVisibility::Joined, Some(MembershipState::Leave), false), + ] { + assert_eq!(descriptor_is_visible_to_joined_member(visibility, membership), expected); + } } #[test] @@ -855,37 +953,279 @@ mod tests { assert!(descriptor_is_visible_to_joined_member(history_visibility, membership)); } + #[async_test] + async fn test_sends_appended_and_replaced_updates_and_advances_delivery_state() { + let fixture = set_up_subscribable_stream().await; + let handle = fixture.publisher.publishers.publisher(&fixture.stream_id).await.unwrap(); + + // Pretend someone just sent us (the publisher) a to-device event to subscribe + assert!( + !handle + .register_subscription(&fixture.subscription_content(), &fixture.subscriber_user_id) + .await + .unwrap() + ); + + // Send a small append update + { + let _send = fixture + .server + .mock_send_to_device() + .expect_any_access_token() + .ok() + .mock_once() + .mount_as_scoped() + .await; + fixture.publisher.append(" live").await.unwrap(); + tokio::time::timeout(std::time::Duration::from_secs(1), async { + loop { + if handle.state.lock().await.subscribers.values().next().unwrap().next_seq + == uint!(2) + { + break; + } + tokio::task::yield_now().await; + } + }) + .await + .unwrap(); + } + + { + let state = handle.state.lock().await; + let subscriber = state.subscribers.values().next().unwrap(); + assert_eq!(subscriber.next_seq, uint!(2)); + assert_eq!(subscriber.delivered_generation, Some(0)); + assert_eq!(subscriber.delivered_offset, "initial live".len()); + } + + // Send a replace update + { + let _send = fixture + .server + .mock_send_to_device() + .expect_any_access_token() + .ok() + .mock_once() + .mount_as_scoped() + .await; + fixture.publisher.replace("replaced").await.unwrap(); + tokio::time::timeout(std::time::Duration::from_secs(1), async { + loop { + if handle.state.lock().await.subscribers.values().next().unwrap().next_seq + == uint!(3) + { + break; + } + tokio::task::yield_now().await; + } + }) + .await + .unwrap(); + } + + { + let state = handle.state.lock().await; + let subscriber = state.subscribers.values().next().unwrap(); + assert_eq!(subscriber.next_seq, uint!(3)); + assert_eq!(subscriber.delivered_generation, Some(1)); + assert_eq!(subscriber.delivered_offset, "replaced".len()); + } + + // Validate the to-device events were generated as expected + let requests = fixture.server.received_requests().await.unwrap(); + let bodies: Vec = requests + .iter() + .filter(|request| request.url.path().contains("/sendToDevice/")) + .map(|request| request.body_json().unwrap()) + .collect(); + assert_eq!(bodies.len(), 2); + assert_eq!( + bodies[0]["messages"][fixture.subscriber_user_id.as_str()] + [fixture.subscriber_device_id.as_str()], + serde_json::to_value(StreamUpdateEventContent::new( + fixture.stream_id.room_id.clone(), + fixture.stream_id.event_id.clone(), + uint!(1), + StreamUpdateOperation::Append(StreamUpdateContent::new(" live".to_owned())), + )) + .unwrap() + ); + assert_eq!( + bodies[1]["messages"][fixture.subscriber_user_id.as_str()] + [fixture.subscriber_device_id.as_str()], + serde_json::to_value(StreamUpdateEventContent::new( + fixture.stream_id.room_id.clone(), + fixture.stream_id.event_id.clone(), + uint!(2), + StreamUpdateOperation::Replace(StreamUpdateContent::new("replaced".to_owned())), + )) + .unwrap() + ); + } + + #[async_test] + async fn test_accepts_valid_subscription() { + let fixture = set_up_subscribable_stream().await; + let f = EventFactory::new() + .room(&fixture.stream_id.room_id) + .sender(&fixture.publisher_user_id) + .server_ts(u64::from(MilliSecondsSinceUnixEpoch::now().0)); + fixture + .server + .mock_get_members() + .ok(vec![f.member(&fixture.subscriber_user_id).into_raw()]) + .mock_once() + .mount() + .await; + fixture + .server + .mock_room_event_context() + .room(fixture.stream_id.room_id.clone()) + .match_event_id() + .ok(RoomContextResponseTemplate::new( + f.text_msg("initial").event_id(&fixture.stream_id.event_id).into_event(), + )) + .mock_once() + .mount() + .await; + + fixture + .publisher + .publishers + .handle_subscribe(ToDeviceEvent::new( + fixture.subscriber_user_id.clone(), + fixture.subscription_content(), + )) + .await; + + let handle = fixture.publisher.publishers.publisher(&fixture.stream_id).await.unwrap(); + assert!( + handle + .state + .lock() + .await + .subscribers + .contains_key(&(fixture.subscriber_user_id, fixture.subscriber_device_id)) + ); + } + + #[async_test] + async fn test_rejects_subscription_with_empty_device_id() { + let fixture = set_up_subscribable_stream().await; + let content = StreamSubscribeEventContent::new( + fixture.stream_id.room_id.clone(), + fixture.stream_id.event_id.clone(), + owned_device_id!(""), + ); + + assert_subscription_rejected( + &fixture.server, + &fixture.publisher.publishers, + fixture.subscriber_user_id, + content, + StreamCancelCode::InvalidSubscription, + "Empty subscriber device ID", + ) + .await; + } + + #[async_test] + async fn test_rejects_subscriber_that_is_not_joined_to_the_room() { + let fixture = set_up_subscribable_stream().await; + fixture.server.mock_get_members().ok(Vec::new()).mock_once().mount().await; + + assert_subscription_rejected( + &fixture.server, + &fixture.publisher.publishers, + fixture.subscriber_user_id.clone(), + fixture.subscription_content(), + StreamCancelCode::Forbidden, + "Subscriber is not joined to the room", + ) + .await; + } + + #[async_test] + async fn test_cancel_removes_subscriber_and_stops_updates() { + let fixture = set_up_subscribable_stream().await; + let handle = fixture.publisher.publishers.publisher(&fixture.stream_id).await.unwrap(); + handle + .register_subscription(&fixture.subscription_content(), &fixture.subscriber_user_id) + .await + .unwrap(); + + fixture + .publisher + .publishers + .handle_cancel(ToDeviceEvent::new( + fixture.subscriber_user_id.clone(), + StreamCancelEventContent::new( + fixture.stream_id.room_id.clone(), + fixture.stream_id.event_id.clone(), + fixture.subscriber_device_id.clone(), + StreamCancelCode::UserCancelled, + ), + )) + .await; + assert!(handle.state.lock().await.subscribers.is_empty()); + + let (_sender, update_receiver) = mpsc::unbounded_channel(); + let _no_send = fixture + .server + .mock_send_to_device() + .expect_any_access_token() + .ok() + .never() + .mount_as_scoped() + .await; + fixture.publisher.append(" after cancellation").await.unwrap(); + PublisherUpdateLoop { + client: fixture.publisher.publishers.inner.client.clone(), + state: Arc::downgrade(&handle.state), + stream_id: fixture.stream_id, + update_receiver, + } + .send_pending_updates() + .await + .unwrap(); + } + #[async_test] async fn test_finish_removes_the_publisher_after_sending_final_content() { - let server = MatrixMockServer::new().await; - let client = server.client_builder().build().await; - let room_id = room_id!("!stream:localhost"); - let room = server.sync_joined_room(&client, room_id).await; - let descriptor_event_id = event_id!("$descriptor"); - let stream_id = StreamId::new(room_id.to_owned(), descriptor_event_id.to_owned()); - let sender = client.user_id().unwrap(); - let f = EventFactory::new().room(room_id); + let fixture = set_up_subscribable_stream().await; + let f = EventFactory::new().room(&fixture.stream_id.room_id); - server + fixture + .server .mock_room_event() - .ok(f.text_msg("initial").sender(sender).event_id(descriptor_event_id).into_event()) + .expect_any_access_token() + .ok(f + .text_msg("initial") + .sender(&fixture.publisher_user_id) + .event_id(&fixture.stream_id.event_id) + .into_event()) .expect(1) .mount() .await; - server.mock_room_state_encryption().plain().mount().await; - server.mock_room_send().ok(event_id!("$final")).expect(1).mount().await; - - let publishers = EventStreamPublishers::new(client); - publishers - .create_publisher(room, stream_id.clone(), "initial".to_owned(), uint!(300_000)) + fixture + .server + .mock_room_send() + .expect_any_access_token() + .ok(event_id!("$final")) + .expect(1) + .mount() .await; - let publisher = EventStreamPublisher::new(publishers.clone(), stream_id.clone()); - let finished_publisher = publisher.clone(); + let finished_publisher = fixture.publisher.clone(); - publisher.finish(RoomMessageEventContentWithoutRelation::text_plain("done")).await.unwrap(); + fixture + .publisher + .finish(RoomMessageEventContentWithoutRelation::text_plain("done")) + .await + .unwrap(); assert!(matches!( - publishers.publisher(&stream_id).await, + finished_publisher.publishers.publisher(&fixture.stream_id).await, Err(EventStreamError::UnknownStream) )); assert!(matches!( @@ -896,33 +1236,22 @@ mod tests { #[async_test] async fn test_expired_descriptor_stops_updates_to_existing_subscribers() { - let server = MatrixMockServer::new().await; - let client = server.client_builder().build().await; - let room_id = room_id!("!stream:localhost"); - let room = server.sync_joined_room(&client, room_id).await; - let stream_id = StreamId::new(room_id.to_owned(), event_id!("$descriptor").to_owned()); - let subscribers = EventStreamPublishers::new(client.clone()); - - subscribers.create_publisher(room, stream_id.clone(), "initial".to_owned(), uint!(0)).await; - let handle = subscribers.publisher(&stream_id).await.unwrap(); + let fixture = set_up_subscribable_stream_with_options(EventStreamPublisherOptions { + descriptor_expiry_ms: uint!(0), + }) + .await; + let handle = fixture.publisher.publishers.publisher(&fixture.stream_id).await.unwrap(); handle.set_descriptor_origin_server_ts(MilliSecondsSinceUnixEpoch::now()).await; handle - .register_subscription( - &StreamSubscribeEventContent::new( - stream_id.room_id.clone(), - stream_id.event_id.clone(), - owned_device_id!("SUBSCRIBER"), - ), - owned_user_id!("@subscriber:localhost").as_ref(), - ) + .register_subscription(&fixture.subscription_content(), &fixture.subscriber_user_id) .await .unwrap(); let (_sender, update_receiver) = mpsc::unbounded_channel(); PublisherUpdateLoop { - client, + client: fixture.publisher.publishers.inner.client.clone(), state: Arc::downgrade(&handle.state), - stream_id, + stream_id: fixture.stream_id, update_receiver, } .send_pending_updates() diff --git a/crates/matrix-sdk/src/event_streams/subscription.rs b/crates/matrix-sdk/src/event_streams/subscription.rs index 78dd5286919..7a50e963129 100644 --- a/crates/matrix-sdk/src/event_streams/subscription.rs +++ b/crates/matrix-sdk/src/event_streams/subscription.rs @@ -494,289 +494,644 @@ impl EventStreamSubscription { #[cfg(test)] mod tests { use js_int::uint; - use matrix_sdk_base::RoomState; - use matrix_sdk_test::async_test; + use matrix_sdk_test::{JoinedRoomBuilder, async_test, event_factory::EventFactory}; use ruma::{ MilliSecondsSinceUnixEpoch, event_id, events::{ - ToDeviceEvent, + StaticEventContent, ToDeviceEvent, event_stream::{StreamUpdateContent, StreamUpdateEventContent, StreamUpdateOperation}, - room::message::OriginalSyncRoomMessageEvent, + room::message::{OriginalSyncRoomMessageEvent, RoomMessageEventContentWithoutRelation}, }, owned_device_id, owned_user_id, room_id, }; use serde_json::json; + use wiremock::ResponseTemplate; use super::*; use crate::test_utils::{logged_in_client, mocks::MatrixMockServer}; - async fn track_stream(subscriptions: &EventStreamSubscriptions, stream_id: &StreamId) { - subscriptions.inner.subscriptions.lock().await.insert( - stream_id.clone(), - SubscriberState { + struct SubscribableEventFixture { + server: Option, + subscriptions: EventStreamSubscriptions, + stream_id: StreamId, + publisher_user_id: OwnedUserId, + publisher_device_id: OwnedDeviceId, + subscriber_device_id: OwnedDeviceId, + } + + impl SubscribableEventFixture { + fn new(client: Client, subscriber_device_id: OwnedDeviceId) -> Self { + let subscriptions = EventStreamSubscriptions::new(client); + let stream_id = StreamId::new( + room_id!("!room:example.org").to_owned(), + event_id!("$stream").to_owned(), + ); + + Self { + server: None, + subscriptions, + stream_id, publisher_user_id: owned_user_id!("@publisher:example.org"), publisher_device_id: owned_device_id!("PUBLISHER"), - subscriber_device_id: owned_device_id!("SUBSCRIBER"), - latest_seq: None, - current_body: Some("initial".to_owned()), - append_valid: true, - resync_pending: false, - }, - ); - } + subscriber_device_id, + } + } - #[async_test] - async fn test_applies_updates_until_the_stream_is_finalized() { - let client = logged_in_client(None).await; - let room_id = room_id!("!room:example.org"); - client.base_client().get_or_create_room(room_id, RoomState::Joined); - let room = client.get_room(room_id).unwrap(); - let subscriptions = EventStreamSubscriptions::new(client); - let stream_id = StreamId::new(room_id.to_owned(), event_id!("$stream").to_owned()); - track_stream(&subscriptions, &stream_id).await; - - let update = |seq, operation| { + /// Creates a fixture as if its subscription request has already been + /// accepted. + async fn tracked() -> Self { + let fixture = Self::new(logged_in_client(None).await, owned_device_id!("SUBSCRIBER")); + fixture.track_stream().await; + fixture + } + + /// Creates a mock-backed fixture without an accepted subscription yet. + async fn with_mock_server() -> Self { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + let subscriber_device_id = client.device_id().unwrap().to_owned(); + let mut fixture = Self::new(client, subscriber_device_id); + fixture.server = Some(server); + fixture + } + + /// Creates mock-backed state as if its subscription request has already + /// been accepted. + async fn tracked_with_mock_server() -> Self { + let fixture = Self::with_mock_server().await; + fixture.track_stream().await; + fixture + } + + async fn track_stream(&self) { + self.subscriptions.inner.subscriptions.lock().await.insert( + self.stream_id.clone(), + SubscriberState { + publisher_user_id: self.publisher_user_id.clone(), + publisher_device_id: self.publisher_device_id.clone(), + subscriber_device_id: self.subscriber_device_id.clone(), + latest_seq: None, + current_body: Some("initial".to_owned()), + append_valid: true, + resync_pending: false, + }, + ); + } + + fn room(&self) -> Room { + let client = &self.subscriptions.inner.client; + client + .base_client() + .get_or_create_room(&self.stream_id.room_id, matrix_sdk_base::RoomState::Joined); + client.get_room(&self.stream_id.room_id).unwrap() + } + + fn update_from_publisher( + &self, + seq: UInt, + operation: StreamUpdateOperation, + ) -> ToDeviceEvent { ToDeviceEvent::new( - owned_user_id!("@publisher:example.org"), + self.publisher_user_id.clone(), StreamUpdateEventContent::new( - stream_id.room_id.clone(), - stream_id.event_id.clone(), + self.stream_id.room_id.clone(), + self.stream_id.event_id.clone(), seq, operation, ), ) - }; + } - subscriptions - .handle_update(update( + fn cancellation_from( + &self, + sender: OwnedUserId, + subscriber_device_id: OwnedDeviceId, + code: StreamCancelCode, + ) -> ToDeviceEvent { + ToDeviceEvent::new( + sender, + StreamCancelEventContent::new( + self.stream_id.room_id.clone(), + self.stream_id.event_id.clone(), + subscriber_device_id, + code, + ), + ) + } + + fn final_replacement_from( + &self, + sender: &str, + replaced_event_id: &str, + ) -> OriginalSyncRoomMessageEvent { + serde_json::from_value(json!({ + "content": { + "msgtype": "m.text", "body": "* done", + "m.new_content": { "msgtype": "m.text", "body": "done" }, + "m.relates_to": { "rel_type": "m.replace", "event_id": replaced_event_id } + }, + "event_id": "$edit", + "origin_server_ts": MilliSecondsSinceUnixEpoch::now(), + "sender": sender, + "type": "m.room.message" + })) + .unwrap() + } + + fn descriptor(&self) -> StreamDescriptor { + StreamDescriptor::new(self.publisher_device_id.clone()) + } + + async fn subscribe(&self) -> Result { + self.subscriptions + .subscribe( + self.stream_id.room_id.clone(), + self.stream_id.event_id.clone(), + self.publisher_user_id.clone(), + self.descriptor(), + "initial".to_owned(), + ) + .await + } + + fn server(&self) -> &MatrixMockServer { + self.server.as_ref().expect("fixture was not created with a mock server") + } + + async fn receive_to_device_event_from_publisher(&self, content: C) + where + C: StaticEventContent + serde::Serialize, + { + self.server() + .mock_sync() + .ok_and_run(&self.subscriptions.inner.client, |builder| { + builder.add_to_device_event(json!({ + "sender": self.publisher_user_id.as_str(), + "type": C::TYPE, + "content": content, + })); + }) + .await; + } + + async fn receive_final_edit_from_publisher(&self) { + let final_edit = EventFactory::new() + .room(&self.stream_id.room_id) + .sender(&self.publisher_user_id) + .text_msg("* done") + .edit( + &self.stream_id.event_id, + RoomMessageEventContentWithoutRelation::text_plain("done"), + ); + + self.server() + .mock_sync() + .ok_and_run(&self.subscriptions.inner.client, |builder| { + builder.add_joined_room( + JoinedRoomBuilder::new(&self.stream_id.room_id) + .add_timeline_event(final_edit), + ); + }) + .await; + } + } + + #[async_test] + async fn test_sync_to_device_events_are_emitted_and_cancel_the_stream() { + let fixture = SubscribableEventFixture::tracked_with_mock_server().await; + let mut updates = fixture.subscriptions.subscribe_to_updates(); + + let replacement = StreamUpdateEventContent::new( + fixture.stream_id.room_id.clone(), + fixture.stream_id.event_id.clone(), + uint!(1), + StreamUpdateOperation::Replace(StreamUpdateContent::new("from sync".to_owned())), + ); + fixture.receive_to_device_event_from_publisher(replacement).await; + + match updates.recv().await.unwrap() { + EventStreamSubscriberUpdate::Replaced { stream_id: update_stream_id, body } => { + assert_eq!(update_stream_id, fixture.stream_id); + assert_eq!(body, "from sync"); + } + update => panic!("expected replacement update, got {update:?}"), + } + + let mut cancellation = StreamCancelEventContent::new( + fixture.stream_id.room_id.clone(), + fixture.stream_id.event_id.clone(), + fixture.subscriber_device_id.clone(), + StreamCancelCode::Forbidden, + ); + cancellation.reason = Some("no longer visible".to_owned()); + fixture.receive_to_device_event_from_publisher(cancellation).await; + + match updates.recv().await.unwrap() { + EventStreamSubscriberUpdate::Cancelled { + stream_id: update_stream_id, + code, + reason, + } => { + assert_eq!(update_stream_id, fixture.stream_id); + assert_eq!(code, StreamCancelCode::Forbidden); + assert_eq!(reason.as_deref(), Some("no longer visible")); + } + update => panic!("expected cancellation update, got {update:?}"), + } + assert!(fixture.subscriptions.transient_body(&fixture.stream_id).await.is_none()); + + fixture + .subscriptions + .handle_cancel(fixture.cancellation_from( + fixture.publisher_user_id.clone(), + fixture.subscriber_device_id.clone(), + StreamCancelCode::Forbidden, + )) + .await; + assert!(updates.try_recv().is_err()); + } + + #[async_test] + async fn test_final_replacement_received_from_sync_stops_tracking_the_stream() { + let fixture = SubscribableEventFixture::tracked_with_mock_server().await; + fixture.receive_final_edit_from_publisher().await; + + assert!(fixture.subscriptions.transient_body(&fixture.stream_id).await.is_none()); + } + + #[async_test] + async fn test_ignores_invalid_cancellations() { + let fixture = SubscribableEventFixture::tracked().await; + let mut updates = fixture.subscriptions.subscribe_to_updates(); + + fixture + .subscriptions + .handle_cancel(fixture.cancellation_from( + owned_user_id!("@attacker:example.org"), + fixture.subscriber_device_id.clone(), + StreamCancelCode::UserCancelled, + )) + .await; + fixture + .subscriptions + .handle_cancel(fixture.cancellation_from( + fixture.publisher_user_id.clone(), + owned_device_id!("OTHER_DEVICE"), + StreamCancelCode::UserCancelled, + )) + .await; + + assert_eq!( + fixture.subscriptions.transient_body(&fixture.stream_id).await.as_deref(), + Some("initial") + ); + assert!(updates.try_recv().is_err()); + } + + #[async_test] + async fn test_unsubscribe_stops_tracking_when_notification_fails() { + let fixture = SubscribableEventFixture::tracked_with_mock_server().await; + + let _failed_cancel = fixture + .server() + .mock_send_to_device() + .for_type(::TYPE) + .respond_with(ResponseTemplate::new(500)) + .mock_once() + .mount_as_scoped() + .await; + fixture.subscriptions.unsubscribe(&fixture.stream_id).await; + + assert!(fixture.subscriptions.transient_body(&fixture.stream_id).await.is_none()); + } + + #[async_test] + async fn test_subscription_handle_unsubscribe_sends_cancellation() { + let fixture = SubscribableEventFixture::with_mock_server().await; + let subscriber_device_id = fixture.subscriber_device_id.clone(); + + let expected_subscribe = StreamSubscribeEventContent::new( + fixture.stream_id.room_id.clone(), + fixture.stream_id.event_id.clone(), + subscriber_device_id.clone(), + ); + { + let _send = fixture + .server() + .mock_send_to_device() + .for_type(::TYPE) + .body_json(json!({ + "messages": { + (fixture.publisher_user_id.as_str()): { + (fixture.publisher_device_id.as_str()): expected_subscribe, + } + } + })) + .ok() + .mock_once() + .mount_as_scoped() + .await; + let subscription = fixture.subscribe().await.unwrap(); + + assert_eq!(subscription.stream_id(), &fixture.stream_id); + + let expected_cancel = StreamCancelEventContent::new( + fixture.stream_id.room_id.clone(), + fixture.stream_id.event_id.clone(), + subscriber_device_id, + StreamCancelCode::UserCancelled, + ); + let _cancel = fixture + .server() + .mock_send_to_device() + .for_type(::TYPE) + .body_json(json!({ + "messages": { + (fixture.publisher_user_id.as_str()): { + (fixture.publisher_device_id.as_str()): expected_cancel, + } + } + })) + .ok() + .mock_once() + .mount_as_scoped() + .await; + subscription.unsubscribe().await; + } + + assert!(fixture.subscriptions.transient_body(&fixture.stream_id).await.is_none()); + let _no_cancel = + fixture.server().mock_send_to_device().ok().never().mount_as_scoped().await; + fixture.subscriptions.unsubscribe(&fixture.stream_id).await; + } + + #[async_test] + async fn test_failed_resync_can_be_requested_again() { + let fixture = SubscribableEventFixture::tracked_with_mock_server().await; + + { + let _failed_resync = fixture + .server() + .mock_send_to_device() + .for_type(::TYPE) + .respond_with(ResponseTemplate::new(500)) + .mock_once() + .mount_as_scoped() + .await; + fixture + .subscriptions + .handle_update(fixture.update_from_publisher( + uint!(2), + StreamUpdateOperation::Append(StreamUpdateContent::new("after gap".to_owned())), + )) + .await; + } + assert!( + !fixture.subscriptions.inner.subscriptions.lock().await[&fixture.stream_id] + .resync_pending + ); + + { + let _retried_resync = fixture + .server() + .mock_send_to_device() + .for_type(::TYPE) + .ok() + .mock_once() + .mount_as_scoped() + .await; + fixture + .subscriptions + .handle_update(fixture.update_from_publisher( + uint!(3), + StreamUpdateOperation::Append(StreamUpdateContent::new("after gap".to_owned())), + )) + .await; + } + assert!( + fixture.subscriptions.inner.subscriptions.lock().await[&fixture.stream_id] + .resync_pending + ); + } + + #[async_test] + async fn test_applies_updates_until_the_stream_is_finalized() { + let fixture = SubscribableEventFixture::tracked().await; + + fixture + .subscriptions + .handle_update(fixture.update_from_publisher( uint!(1), StreamUpdateOperation::Append(StreamUpdateContent::new(" one".to_owned())), )) .await; - assert_eq!(subscriptions.transient_body(&stream_id).await.as_deref(), Some("initial one")); + assert_eq!( + fixture.subscriptions.transient_body(&fixture.stream_id).await.as_deref(), + Some("initial one") + ); - subscriptions - .handle_update(update( + fixture + .subscriptions + .handle_update(fixture.update_from_publisher( uint!(2), StreamUpdateOperation::Append(StreamUpdateContent::new(" two".to_owned())), )) .await; assert_eq!( - subscriptions.transient_body(&stream_id).await.as_deref(), + fixture.subscriptions.transient_body(&fixture.stream_id).await.as_deref(), Some("initial one two") ); - subscriptions - .handle_update(update( + fixture + .subscriptions + .handle_update(fixture.update_from_publisher( uint!(3), StreamUpdateOperation::Replace(StreamUpdateContent::new("replaced".to_owned())), )) .await; - assert_eq!(subscriptions.transient_body(&stream_id).await.as_deref(), Some("replaced")); + assert_eq!( + fixture.subscriptions.transient_body(&fixture.stream_id).await.as_deref(), + Some("replaced") + ); - subscriptions - .handle_update(update( + fixture + .subscriptions + .handle_update(fixture.update_from_publisher( uint!(4), StreamUpdateOperation::Append(StreamUpdateContent::new(" again".to_owned())), )) .await; assert_eq!( - subscriptions.transient_body(&stream_id).await.as_deref(), + fixture.subscriptions.transient_body(&fixture.stream_id).await.as_deref(), Some("replaced again") ); - let final_edit: OriginalSyncRoomMessageEvent = serde_json::from_value(json!({ - "content": { - "msgtype": "m.text", "body": "* done", - "m.new_content": { "msgtype": "m.text", "body": "done" }, - "m.relates_to": { "rel_type": "m.replace", "event_id": "$stream" } - }, - "event_id": "$edit", - "origin_server_ts": MilliSecondsSinceUnixEpoch::now(), - "sender": "@publisher:example.org", - "type": "m.room.message" - })) - .unwrap(); - - subscriptions.handle_room_message(final_edit, room).await; + fixture + .subscriptions + .handle_room_message( + fixture.final_replacement_from("@publisher:example.org", "$stream"), + fixture.room(), + ) + .await; - assert!(subscriptions.transient_body(&stream_id).await.is_none()); + assert!(fixture.subscriptions.transient_body(&fixture.stream_id).await.is_none()); } #[async_test] async fn test_ignores_updates_from_a_different_user() { - let subscriptions = EventStreamSubscriptions::new(logged_in_client(None).await); - let stream_id = StreamId::new( - room_id!("!room:example.org").to_owned(), - event_id!("$stream").to_owned(), - ); - track_stream(&subscriptions, &stream_id).await; + let fixture = SubscribableEventFixture::tracked().await; - let update = |body: &str| { - StreamUpdateEventContent::new( - stream_id.room_id.clone(), - stream_id.event_id.clone(), - uint!(1), - StreamUpdateOperation::Replace(StreamUpdateContent::new(body.to_owned())), - ) - }; - - subscriptions + fixture + .subscriptions .handle_update(ToDeviceEvent::new( owned_user_id!("@attacker:example.org"), - update("tampered"), + StreamUpdateEventContent::new( + fixture.stream_id.room_id.clone(), + fixture.stream_id.event_id.clone(), + uint!(1), + StreamUpdateOperation::Replace(StreamUpdateContent::new("tampered".to_owned())), + ), )) .await; - assert_eq!(subscriptions.transient_body(&stream_id).await.as_deref(), Some("initial")); + assert_eq!( + fixture.subscriptions.transient_body(&fixture.stream_id).await.as_deref(), + Some("initial") + ); - subscriptions - .handle_update(ToDeviceEvent::new( - owned_user_id!("@publisher:example.org"), - update("accepted"), + fixture + .subscriptions + .handle_update(fixture.update_from_publisher( + uint!(1), + StreamUpdateOperation::Replace(StreamUpdateContent::new("accepted".to_owned())), )) .await; - assert_eq!(subscriptions.transient_body(&stream_id).await.as_deref(), Some("accepted")); + assert_eq!( + fixture.subscriptions.transient_body(&fixture.stream_id).await.as_deref(), + Some("accepted") + ); } #[async_test] async fn test_invalid_final_replacement_from_another_user_keeps_subscription_state() { - let client = logged_in_client(None).await; - let room_id = room_id!("!room:example.org"); - client.base_client().get_or_create_room(room_id, RoomState::Joined); - let room = client.get_room(room_id).unwrap(); - let subscriptions = EventStreamSubscriptions::new(client); - let stream_id = StreamId::new(room_id.to_owned(), event_id!("$stream").to_owned()); - track_stream(&subscriptions, &stream_id).await; - - let event: OriginalSyncRoomMessageEvent = serde_json::from_value(json!({ - "content": { - "msgtype": "m.text", - "body": "* done", - "m.new_content": { "msgtype": "m.text", "body": "done" }, - "m.relates_to": { "rel_type": "m.replace", "event_id": "$stream" } - }, - "event_id": "$invalid_edit", + let fixture = SubscribableEventFixture::tracked().await; + + fixture + .subscriptions + .handle_room_message( + fixture.final_replacement_from("@attacker:example.org", "$stream"), + fixture.room(), + ) + .await; + + assert_eq!( + fixture.subscriptions.transient_body(&fixture.stream_id).await.as_deref(), + Some("initial") + ); + } + + #[async_test] + async fn test_ignores_room_messages_that_do_not_finalize_the_tracked_stream() { + let fixture = SubscribableEventFixture::tracked().await; + + let unrelated_message: OriginalSyncRoomMessageEvent = serde_json::from_value(json!({ + "content": { "msgtype": "m.text", "body": "ordinary message" }, + "event_id": "$ordinary", "origin_server_ts": MilliSecondsSinceUnixEpoch::now(), - "sender": "@attacker:example.org", + "sender": "@publisher:example.org", "type": "m.room.message" })) .unwrap(); + fixture.subscriptions.handle_room_message(unrelated_message, fixture.room()).await; + fixture + .subscriptions + .handle_room_message( + fixture.final_replacement_from("@publisher:example.org", "$other"), + fixture.room(), + ) + .await; - subscriptions.handle_room_message(event, room).await; - - assert_eq!(subscriptions.transient_body(&stream_id).await.as_deref(), Some("initial")); + assert_eq!( + fixture.subscriptions.transient_body(&fixture.stream_id).await.as_deref(), + Some("initial") + ); } #[async_test] async fn test_stale_and_duplicate_updates_do_not_change_the_transient_body() { - let subscriptions = EventStreamSubscriptions::new(logged_in_client(None).await); - let stream_id = StreamId::new( - room_id!("!room:example.org").to_owned(), - event_id!("$stream").to_owned(), - ); - track_stream(&subscriptions, &stream_id).await; + let fixture = SubscribableEventFixture::tracked().await; - let update = |seq, body: &str| { - ToDeviceEvent::new( - owned_user_id!("@publisher:example.org"), - StreamUpdateEventContent::new( - stream_id.room_id.clone(), - stream_id.event_id.clone(), + for (seq, body) in [(uint!(2), "latest"), (uint!(2), "duplicate"), (uint!(1), "stale")] { + fixture + .subscriptions + .handle_update(fixture.update_from_publisher( seq, StreamUpdateOperation::Replace(StreamUpdateContent::new(body.to_owned())), - ), - ) - }; - - subscriptions.handle_update(update(uint!(2), "latest")).await; - subscriptions.handle_update(update(uint!(2), "duplicate")).await; - subscriptions.handle_update(update(uint!(1), "stale")).await; + )) + .await; + } - assert_eq!(subscriptions.transient_body(&stream_id).await.as_deref(), Some("latest")); + assert_eq!( + fixture.subscriptions.transient_body(&fixture.stream_id).await.as_deref(), + Some("latest") + ); } #[async_test] async fn test_duplicate_subscription_preserves_the_transient_baseline() { - let server = MatrixMockServer::new().await; - let subscriptions = EventStreamSubscriptions::new(server.client_builder().build().await); - let stream_id = StreamId::new( - room_id!("!room:example.org").to_owned(), - event_id!("$stream").to_owned(), - ); - track_stream(&subscriptions, &stream_id).await; - - let update = |seq, appended_body: &str| { - ToDeviceEvent::new( - owned_user_id!("@publisher:example.org"), - StreamUpdateEventContent::new( - stream_id.room_id.clone(), - stream_id.event_id.clone(), - seq, - StreamUpdateOperation::Append(StreamUpdateContent::new( - appended_body.to_owned(), - )), - ), - ) - }; + let fixture = SubscribableEventFixture::tracked_with_mock_server().await; - subscriptions.handle_update(update(uint!(1), " live")).await; - assert_eq!(subscriptions.transient_body(&stream_id).await.as_deref(), Some("initial live")); + fixture + .subscriptions + .handle_update(fixture.update_from_publisher( + uint!(1), + StreamUpdateOperation::Append(StreamUpdateContent::new(" live".to_owned())), + )) + .await; + assert_eq!( + fixture.subscriptions.transient_body(&fixture.stream_id).await.as_deref(), + Some("initial live") + ); { - let _renewal = server.mock_send_to_device().ok().mock_once().mount_as_scoped().await; - subscriptions - .subscribe( - stream_id.room_id.clone(), - stream_id.event_id.clone(), - owned_user_id!("@publisher:example.org"), - StreamDescriptor::new(owned_device_id!("PUBLISHER")), - "initial".to_owned(), - ) - .await - .unwrap(); + let _renewal = + fixture.server().mock_send_to_device().ok().mock_once().mount_as_scoped().await; + fixture.subscribe().await.unwrap(); } - assert_eq!(subscriptions.transient_body(&stream_id).await.as_deref(), Some("initial live")); + assert_eq!( + fixture.subscriptions.transient_body(&fixture.stream_id).await.as_deref(), + Some("initial live") + ); { - let _no_resync = server.mock_send_to_device().ok().never().mount_as_scoped().await; - subscriptions.handle_update(update(uint!(2), " update")).await; + let _no_resync = + fixture.server().mock_send_to_device().ok().never().mount_as_scoped().await; + fixture + .subscriptions + .handle_update(fixture.update_from_publisher( + uint!(2), + StreamUpdateOperation::Append(StreamUpdateContent::new(" update".to_owned())), + )) + .await; } assert_eq!( - subscriptions.transient_body(&stream_id).await.as_deref(), + fixture.subscriptions.transient_body(&fixture.stream_id).await.as_deref(), Some("initial live update") ); } #[async_test] async fn test_requests_one_resync_until_a_replacement_restores_the_baseline() { - let server = MatrixMockServer::new().await; - let client = server.client_builder().build().await; - let subscriptions = EventStreamSubscriptions::new(client); - let stream_id = StreamId::new( - room_id!("!room:example.org").to_owned(), - event_id!("$stream").to_owned(), - ); - track_stream(&subscriptions, &stream_id).await; - - let update = |seq, operation| { - ToDeviceEvent::new( - owned_user_id!("@publisher:example.org"), - StreamUpdateEventContent::new( - stream_id.room_id.clone(), - stream_id.event_id.clone(), - seq, - operation, - ), - ) - }; + let fixture = SubscribableEventFixture::tracked_with_mock_server().await; // Missing seq 1 makes this append unusable and triggers the first resync // request. { - let _resync = server.mock_send_to_device().ok().mock_once().mount_as_scoped().await; - subscriptions - .handle_update(update( + let _resync = + fixture.server().mock_send_to_device().ok().mock_once().mount_as_scoped().await; + fixture + .subscriptions + .handle_update(fixture.update_from_publisher( uint!(2), StreamUpdateOperation::Append(StreamUpdateContent::new("missing".to_owned())), )) @@ -786,9 +1141,11 @@ mod tests { // The baseline is still invalid, but the outstanding resync suppresses a second // request. { - let _no_resync = server.mock_send_to_device().ok().never().mount_as_scoped().await; - subscriptions - .handle_update(update( + let _no_resync = + fixture.server().mock_send_to_device().ok().never().mount_as_scoped().await; + fixture + .subscriptions + .handle_update(fixture.update_from_publisher( uint!(3), StreamUpdateOperation::Append(StreamUpdateContent::new( "still missing".to_owned(), @@ -799,9 +1156,11 @@ mod tests { // This is the resync update, we're good again { - let _no_resync = server.mock_send_to_device().ok().never().mount_as_scoped().await; - subscriptions - .handle_update(update( + let _no_resync = + fixture.server().mock_send_to_device().ok().never().mount_as_scoped().await; + fixture + .subscriptions + .handle_update(fixture.update_from_publisher( uint!(4), StreamUpdateOperation::Replace(StreamUpdateContent::new("restored".to_owned())), )) @@ -810,9 +1169,11 @@ mod tests { // This new gap makes the append unusable and triggers a second resync request. { - let _resync = server.mock_send_to_device().ok().mock_once().mount_as_scoped().await; - subscriptions - .handle_update(update( + let _resync = + fixture.server().mock_send_to_device().ok().mock_once().mount_as_scoped().await; + fixture + .subscriptions + .handle_update(fixture.update_from_publisher( uint!(6), StreamUpdateOperation::Append(StreamUpdateContent::new("new gap".to_owned())), )) @@ -822,41 +1183,24 @@ mod tests { #[async_test] async fn test_updates_after_final_replacement_are_ignored() { - let client = logged_in_client(None).await; - let room_id = room_id!("!room:example.org"); - client.base_client().get_or_create_room(room_id, RoomState::Joined); - let room = client.get_room(room_id).unwrap(); - let subscriptions = EventStreamSubscriptions::new(client); - let stream_id = StreamId::new(room_id.to_owned(), event_id!("$stream").to_owned()); - track_stream(&subscriptions, &stream_id).await; - - let event: OriginalSyncRoomMessageEvent = serde_json::from_value(json!({ - "content": { - "msgtype": "m.text", - "body": "* done", - "m.new_content": { "msgtype": "m.text", "body": "done" }, - "m.relates_to": { "rel_type": "m.replace", "event_id": "$stream" } - }, - "event_id": "$edit", - "origin_server_ts": MilliSecondsSinceUnixEpoch::now(), - "sender": "@publisher:example.org", - "type": "m.room.message" - })) - .unwrap(); - subscriptions.handle_room_message(event, room).await; + let fixture = SubscribableEventFixture::tracked().await; - subscriptions - .handle_update(ToDeviceEvent::new( - owned_user_id!("@publisher:example.org"), - StreamUpdateEventContent::new( - stream_id.room_id.clone(), - stream_id.event_id.clone(), - uint!(1), - StreamUpdateOperation::Replace(StreamUpdateContent::new("late".to_owned())), - ), + fixture + .subscriptions + .handle_room_message( + fixture.final_replacement_from("@publisher:example.org", "$stream"), + fixture.room(), + ) + .await; + + fixture + .subscriptions + .handle_update(fixture.update_from_publisher( + uint!(1), + StreamUpdateOperation::Replace(StreamUpdateContent::new("late".to_owned())), )) .await; - assert!(subscriptions.transient_body(&stream_id).await.is_none()); + assert!(fixture.subscriptions.transient_body(&fixture.stream_id).await.is_none()); } } diff --git a/crates/matrix-sdk/src/test_utils/mocks/mod.rs b/crates/matrix-sdk/src/test_utils/mocks/mod.rs index dd835bb07e9..c77c67ccf0e 100644 --- a/crates/matrix-sdk/src/test_utils/mocks/mod.rs +++ b/crates/matrix-sdk/src/test_utils/mocks/mod.rs @@ -3497,6 +3497,21 @@ impl<'a> MockEndpoint<'a, DeleteRoomKeysVersionEndpoint> { /// This mock can be used to simulate sending to-device messages in tests. pub struct SendToDeviceEndpoint; impl<'a> MockEndpoint<'a, SendToDeviceEndpoint> { + /// Ensures that the request sends a particular to-device event type. + pub fn for_type(self, event_type: &str) -> Self { + Self { + mock: self + .mock + .and(path_regex(format!(r"^/_matrix/client/v3/sendToDevice/{event_type}/.*"))), + ..self + } + } + + /// Ensures that the request body exactly matches the provided JSON. + pub fn body_json(self, body: Value) -> Self { + Self { mock: self.mock.and(body_json(body)), ..self } + } + /// Returns a successful response with default data. pub fn ok(self) -> MatrixMock<'a> { self.respond_with(ResponseTemplate::new(200).set_body_json(json!({}))) From f2c2e5391f0058863100ecc08fdb12362cc7605a Mon Sep 17 00:00:00 2001 From: Brad Murray Date: Fri, 29 May 2026 10:23:38 -0400 Subject: [PATCH 6/6] sdk: use upstream ruma support for MSC4471 --- Cargo.lock | 18 ++++++++-------- Cargo.toml | 3 ++- bindings/matrix-sdk-ffi/src/client.rs | 8 +++---- .../src/store/integration_tests.rs | 4 ++-- crates/matrix-sdk-base/src/store/mod.rs | 2 +- crates/matrix-sdk-base/src/store/traits.rs | 11 +++++----- crates/matrix-sdk-sqlite/src/error.rs | 2 +- crates/matrix-sdk/src/client/mod.rs | 21 +++++++------------ .../matrix-sdk/src/event_streams/publisher.rs | 14 +++++++++---- .../src/event_streams/subscription.rs | 21 +++++++++++++------ crates/matrix-sdk/src/room/event_streams.rs | 8 ++++--- 11 files changed, 62 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e2bcefe3e1b..efe3f003d37 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4803,7 +4803,7 @@ dependencies = [ [[package]] name = "ruma" version = "0.15.1" -source = "git+https://github.com/bradtgmurray/ruma?branch=msc4471-event-streams#8e060f2faf372a1710aa67cc6e5c89995866ef5b" +source = "git+https://github.com/ruma/ruma?rev=e54e5b8048a1ff77a07f1a7a48f631b78d0a0a16#e54e5b8048a1ff77a07f1a7a48f631b78d0a0a16" dependencies = [ "assign", "js_int", @@ -4821,7 +4821,7 @@ dependencies = [ [[package]] name = "ruma-client-api" version = "0.23.1" -source = "git+https://github.com/bradtgmurray/ruma?branch=msc4471-event-streams#8e060f2faf372a1710aa67cc6e5c89995866ef5b" +source = "git+https://github.com/ruma/ruma?rev=e54e5b8048a1ff77a07f1a7a48f631b78d0a0a16#e54e5b8048a1ff77a07f1a7a48f631b78d0a0a16" dependencies = [ "as_variant", "assign", @@ -4843,7 +4843,7 @@ dependencies = [ [[package]] name = "ruma-common" version = "0.18.0" -source = "git+https://github.com/bradtgmurray/ruma?branch=msc4471-event-streams#8e060f2faf372a1710aa67cc6e5c89995866ef5b" +source = "git+https://github.com/ruma/ruma?rev=e54e5b8048a1ff77a07f1a7a48f631b78d0a0a16#e54e5b8048a1ff77a07f1a7a48f631b78d0a0a16" dependencies = [ "as_variant", "base64", @@ -4876,7 +4876,7 @@ dependencies = [ [[package]] name = "ruma-events" version = "0.33.0" -source = "git+https://github.com/bradtgmurray/ruma?branch=msc4471-event-streams#8e060f2faf372a1710aa67cc6e5c89995866ef5b" +source = "git+https://github.com/ruma/ruma?rev=e54e5b8048a1ff77a07f1a7a48f631b78d0a0a16#e54e5b8048a1ff77a07f1a7a48f631b78d0a0a16" dependencies = [ "as_variant", "indexmap", @@ -4899,7 +4899,7 @@ dependencies = [ [[package]] name = "ruma-federation-api" version = "0.14.0" -source = "git+https://github.com/bradtgmurray/ruma?branch=msc4471-event-streams#8e060f2faf372a1710aa67cc6e5c89995866ef5b" +source = "git+https://github.com/ruma/ruma?rev=e54e5b8048a1ff77a07f1a7a48f631b78d0a0a16#e54e5b8048a1ff77a07f1a7a48f631b78d0a0a16" dependencies = [ "headers", "http", @@ -4920,7 +4920,7 @@ dependencies = [ [[package]] name = "ruma-html" version = "0.7.0" -source = "git+https://github.com/bradtgmurray/ruma?branch=msc4471-event-streams#8e060f2faf372a1710aa67cc6e5c89995866ef5b" +source = "git+https://github.com/ruma/ruma?rev=e54e5b8048a1ff77a07f1a7a48f631b78d0a0a16#e54e5b8048a1ff77a07f1a7a48f631b78d0a0a16" dependencies = [ "as_variant", "html5ever", @@ -4931,7 +4931,7 @@ dependencies = [ [[package]] name = "ruma-identifiers-validation" version = "0.12.1" -source = "git+https://github.com/bradtgmurray/ruma?branch=msc4471-event-streams#8e060f2faf372a1710aa67cc6e5c89995866ef5b" +source = "git+https://github.com/ruma/ruma?rev=e54e5b8048a1ff77a07f1a7a48f631b78d0a0a16#e54e5b8048a1ff77a07f1a7a48f631b78d0a0a16" dependencies = [ "js_int", "thiserror 2.0.18", @@ -4940,7 +4940,7 @@ dependencies = [ [[package]] name = "ruma-macros" version = "0.18.0" -source = "git+https://github.com/bradtgmurray/ruma?branch=msc4471-event-streams#8e060f2faf372a1710aa67cc6e5c89995866ef5b" +source = "git+https://github.com/ruma/ruma?rev=e54e5b8048a1ff77a07f1a7a48f631b78d0a0a16#e54e5b8048a1ff77a07f1a7a48f631b78d0a0a16" dependencies = [ "as_variant", "cfg-if", @@ -4956,7 +4956,7 @@ dependencies = [ [[package]] name = "ruma-signatures" version = "0.20.0" -source = "git+https://github.com/bradtgmurray/ruma?branch=msc4471-event-streams#8e060f2faf372a1710aa67cc6e5c89995866ef5b" +source = "git+https://github.com/ruma/ruma?rev=e54e5b8048a1ff77a07f1a7a48f631b78d0a0a16#e54e5b8048a1ff77a07f1a7a48f631b78d0a0a16" dependencies = [ "base64", "ed25519-dalek", diff --git a/Cargo.toml b/Cargo.toml index c03130b2ad4..26f6bc7002f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,7 +71,7 @@ rand = { version = "0.10.1", default-features = false, features = ["std", "std_r regex = { version = "1.12.2", default-features = false } reqwest = { version = "0.13.1", default-features = false } rmp-serde = { version = "1.3.0", default-features = false } -ruma = { version = "0.15.1", git = "https://github.com/bradtgmurray/ruma", branch = "msc4471-event-streams", features = [ +ruma = { version = "0.15.1", git = "https://github.com/ruma/ruma", rev = "e54e5b8048a1ff77a07f1a7a48f631b78d0a0a16", features = [ "client-api-c", "compat-unset-avatar", "compat-upload-signatures", @@ -89,6 +89,7 @@ ruma = { version = "0.15.1", git = "https://github.com/bradtgmurray/ruma", branc "unstable-msc4140", "unstable-msc4143", "unstable-msc4171", + "unstable-msc4195", "unstable-msc4278", "unstable-msc4286", "unstable-msc4306", diff --git a/bindings/matrix-sdk-ffi/src/client.rs b/bindings/matrix-sdk-ffi/src/client.rs index 256f2e5e4a7..a29a9a64aa0 100644 --- a/bindings/matrix-sdk-ffi/src/client.rs +++ b/bindings/matrix-sdk-ffi/src/client.rs @@ -38,12 +38,10 @@ use matrix_sdk::{ EventEncryptionAlgorithm, RoomId, TransactionId, UInt, UserId, api::client::{ account::request_openid_token, - discovery::{ - discover_homeserver::RtcFocusInfo, - get_authorization_server_metadata::v1::Prompt as RumaOAuthPrompt, - }, + discovery::get_authorization_server_metadata::v1::Prompt as RumaOAuthPrompt, push::{EmailPusherData, PusherIds, PusherInit, PusherKind as RumaPusherKind}, room::{Visibility, create_room}, + rtc::RtcTransport, session::get_login_types, user_directory::search_users, }, @@ -2060,7 +2058,7 @@ impl Client { .rtc_foci() .await? .iter() - .any(|focus| matches!(focus, RtcFocusInfo::LiveKit(_)))) + .any(|focus| matches!(focus, RtcTransport::LiveKit(_)))) } /// Checks if the server supports login using a QR code. diff --git a/crates/matrix-sdk-base/src/store/integration_tests.rs b/crates/matrix-sdk-base/src/store/integration_tests.rs index 166f6d8a792..23e7fa29eb2 100644 --- a/crates/matrix-sdk-base/src/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/store/integration_tests.rs @@ -14,7 +14,7 @@ use ruma::{ EventId, MilliSecondsSinceUnixEpoch, OwnedUserId, RoomId, TransactionId, UserId, api::{ FeatureFlag, MatrixVersion, - client::discovery::discover_homeserver::{HomeserverInfo, RtcFocusInfo}, + client::{discovery::discover_homeserver::HomeserverInfo, rtc::RtcTransport}, }, event_id, events::{ @@ -559,7 +559,7 @@ impl StateStoreIntegrationTests for DynStateStore { homeserver: HomeserverInfo::new("matrix.example.com".to_owned()), identity_server: None, tile_server: None, - rtc_foci: vec![RtcFocusInfo::livekit("livekit.example.com".to_owned())], + rtc_foci: vec![RtcTransport::livekit("livekit.example.com".to_owned())], }; self.set_kv_data( diff --git a/crates/matrix-sdk-base/src/store/mod.rs b/crates/matrix-sdk-base/src/store/mod.rs index 792b446faca..88c2fae5e9b 100644 --- a/crates/matrix-sdk-base/src/store/mod.rs +++ b/crates/matrix-sdk-base/src/store/mod.rs @@ -147,7 +147,7 @@ pub enum StoreError { /// /// This should never happen. #[error("Redaction failed: {0}")] - Redaction(#[source] ruma::canonical_json::RedactionError), + Redaction(#[source] ruma::canonical_json::CanonicalJsonFieldError), /// The store contains invalid data. #[error("The store contains invalid data: {details}")] diff --git a/crates/matrix-sdk-base/src/store/traits.rs b/crates/matrix-sdk-base/src/store/traits.rs index 90efaaba3c0..97867de1b06 100644 --- a/crates/matrix-sdk-base/src/store/traits.rs +++ b/crates/matrix-sdk-base/src/store/traits.rs @@ -29,11 +29,12 @@ use ruma::{ OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId, api::{ MatrixVersion, SupportedVersions, - client::discovery::{ - discover_homeserver::{ - self, HomeserverInfo, IdentityServerInfo, RtcFocusInfo, TileServerInfo, + client::{ + discovery::{ + discover_homeserver::{self, HomeserverInfo, IdentityServerInfo, TileServerInfo}, + get_capabilities::v3::Capabilities, }, - get_capabilities::v3::Capabilities, + rtc::RtcTransport, }, }, events::{ @@ -2070,7 +2071,7 @@ pub struct WellKnownResponse { pub tile_server: Option, /// A list of the available MatrixRTC foci, ordered by priority. - pub rtc_foci: Vec, + pub rtc_foci: Vec, } impl From for WellKnownResponse { diff --git a/crates/matrix-sdk-sqlite/src/error.rs b/crates/matrix-sdk-sqlite/src/error.rs index c87f08768e8..35745c8e9cd 100644 --- a/crates/matrix-sdk-sqlite/src/error.rs +++ b/crates/matrix-sdk-sqlite/src/error.rs @@ -106,7 +106,7 @@ pub enum Error { Unpickle, #[error("Redaction failed: {0}")] - Redaction(#[source] ruma::canonical_json::RedactionError), + Redaction(#[source] ruma::canonical_json::CanonicalJsonFieldError), #[error("An update keyed by unique ID touched more than one entry")] InconsistentUpdate, diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index 2f5072d7dd3..705b1cf59be 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -57,15 +57,13 @@ use ruma::{ authenticated_media, device::{self, delete_devices, get_devices, update_device}, directory::{get_public_rooms, get_public_rooms_filtered}, - discovery::{ - discover_homeserver::{self, RtcFocusInfo}, - get_supported_versions, - }, + discovery::{discover_homeserver, get_supported_versions}, filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest}, knock::knock_room, media, membership::{join_room_by_id, join_room_by_id_or_alias}, room::create_room, + rtc::RtcTransport, session::login::v3::DiscoveryInfo, sync::sync_events, threads::get_thread_subscriptions_changes, @@ -2556,14 +2554,14 @@ impl Client { /// /// # Examples /// ```no_run - /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo}; + /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::rtc::RtcTransport}; /// # use url::Url; /// # async { /// # let homeserver = Url::parse("http://localhost:8080")?; /// # let mut client = Client::new(homeserver).await?; /// let rtc_foci = client.rtc_foci().await?; /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus { - /// RtcFocusInfo::LiveKit(info) => Some(info), + /// RtcTransport::LiveKit(info) => Some(info), /// _ => None, /// }); /// if let Some(info) = default_livekit_focus_info { @@ -2571,7 +2569,7 @@ impl Client { /// } /// # anyhow::Ok(()) }; /// ``` - pub async fn rtc_foci(&self) -> HttpResult> { + pub async fn rtc_foci(&self) -> HttpResult> { let well_known = self.well_known().await; Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default()) @@ -3649,10 +3647,7 @@ pub(crate) mod tests { RoomId, ServerName, UserId, api::{ FeatureFlag, MatrixVersion, - client::{ - discovery::discover_homeserver::RtcFocusInfo, - room::create_room::v3::Request as CreateRoomRequest, - }, + client::{room::create_room::v3::Request as CreateRoomRequest, rtc::RtcTransport}, }, assign, events::{ @@ -4103,7 +4098,7 @@ pub(crate) mod tests { let server_url = server.uri(); let domain = server_url.strip_prefix("http://").unwrap(); let server_name = <&ServerName>::try_from(domain).unwrap(); - let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())]; + let rtc_foci = vec![RtcTransport::livekit("https://livekit.example.com".to_owned())]; let well_known_mock = server .mock_well_known() @@ -4180,7 +4175,7 @@ pub(crate) mod tests { #[async_test] async fn test_missing_well_known_caching() { let server = MatrixMockServer::new().await; - let rtc_foci: Vec = vec![]; + let rtc_foci: Vec = vec![]; let well_known_mock = server .mock_well_known() diff --git a/crates/matrix-sdk/src/event_streams/publisher.rs b/crates/matrix-sdk/src/event_streams/publisher.rs index 456ada96dcf..62ce6ecbeb3 100644 --- a/crates/matrix-sdk/src/event_streams/publisher.rs +++ b/crates/matrix-sdk/src/event_streams/publisher.rs @@ -11,15 +11,21 @@ use ruma::{ events::{ AnyStateEvent, AnySyncTimelineEvent, AnyToDeviceEventContent, StateEventType, StaticEventContent, ToDeviceEvent, - event_stream::{ - StreamCancelCode, StreamCancelEventContent, StreamSubscribeEventContent, - StreamUpdateContent, StreamUpdateEventContent, StreamUpdateOperation, - }, room::{ history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent}, member::{MembershipState, RoomMemberEventContent}, message::RoomMessageEventContentWithoutRelation, }, + stream::{ + cancel::{ + StreamCancelCode, ToDeviceStreamCancelEventContent as StreamCancelEventContent, + }, + subscribe::ToDeviceStreamSubscribeEventContent as StreamSubscribeEventContent, + update::{ + StreamUpdateContent, StreamUpdateOperation, + ToDeviceStreamUpdateEventContent as StreamUpdateEventContent, + }, + }, }, serde::Raw, to_device::DeviceIdOrAllDevices, diff --git a/crates/matrix-sdk/src/event_streams/subscription.rs b/crates/matrix-sdk/src/event_streams/subscription.rs index 7a50e963129..952c4ab13f6 100644 --- a/crates/matrix-sdk/src/event_streams/subscription.rs +++ b/crates/matrix-sdk/src/event_streams/subscription.rs @@ -5,11 +5,17 @@ use ruma::{ OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedUserId, assign, events::{ ToDeviceEvent, - event_stream::{ - StreamCancelCode, StreamCancelEventContent, StreamDescriptor, - StreamSubscribeEventContent, StreamUpdateEventContent, StreamUpdateOperation, - }, room::message::{OriginalSyncRoomMessageEvent, Relation}, + stream::{ + StreamDescriptor, + cancel::{ + StreamCancelCode, ToDeviceStreamCancelEventContent as StreamCancelEventContent, + }, + subscribe::ToDeviceStreamSubscribeEventContent as StreamSubscribeEventContent, + update::{ + StreamUpdateOperation, ToDeviceStreamUpdateEventContent as StreamUpdateEventContent, + }, + }, }, }; use tokio::sync::{Mutex, broadcast}; @@ -295,7 +301,7 @@ impl EventStreamSubscriptions { } let mut should_resync = false; - let update = match content.op { + let update = match content.operation { StreamUpdateOperation::Replace(new_content) => { trace!( room_id = %stream_id.room_id, @@ -499,8 +505,11 @@ mod tests { MilliSecondsSinceUnixEpoch, event_id, events::{ StaticEventContent, ToDeviceEvent, - event_stream::{StreamUpdateContent, StreamUpdateEventContent, StreamUpdateOperation}, room::message::{OriginalSyncRoomMessageEvent, RoomMessageEventContentWithoutRelation}, + stream::update::{ + StreamUpdateContent, StreamUpdateOperation, + ToDeviceStreamUpdateEventContent as StreamUpdateEventContent, + }, }, owned_device_id, owned_user_id, room_id, }; diff --git a/crates/matrix-sdk/src/room/event_streams.rs b/crates/matrix-sdk/src/room/event_streams.rs index 55720643e81..dae13d8f48a 100644 --- a/crates/matrix-sdk/src/room/event_streams.rs +++ b/crates/matrix-sdk/src/room/event_streams.rs @@ -1,9 +1,11 @@ +use std::time::Duration; + use matrix_sdk_base::deserialized_responses::TimelineEvent; use ruma::{ EventId, assign, events::{ - AnyMessageLikeEventContent, AnySyncTimelineEvent, event_stream::StreamDescriptor, - room::message::RoomMessageEventContent, + AnyMessageLikeEventContent, AnySyncTimelineEvent, room::message::RoomMessageEventContent, + stream::StreamDescriptor, }, }; @@ -25,7 +27,7 @@ impl Room { self.client().device_id().ok_or(EventStreamError::AuthenticationRequired)?.to_owned(); let content_with_descriptor = assign!(message_content.clone(), { stream: Some(assign!(StreamDescriptor::new(device_id.clone()), { - expiry_ms: Some(options.descriptor_expiry_ms), + expiry_ms: Some(Duration::from_millis(u64::from(options.descriptor_expiry_ms))), })), }); let response = self.send(content_with_descriptor).await?;