Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions crates/client-api-messages/DEVELOP.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,12 @@ spacetime generate -p spacetimedb-cli --lang <SDK lang> \
--out-dir <sdk WebSocket schema bindings dir> \
--module-def ws_schema_v2.json
```

For the v3 WebSocket transport schema:

```sh
cargo run --example get_ws_schema_v3 > ws_schema_v3.json
spacetime generate -p spacetimedb-cli --lang <SDK lang> \
--out-dir <sdk WebSocket schema bindings dir> \
--module-def ws_schema_v3.json
```
13 changes: 13 additions & 0 deletions crates/client-api-messages/examples/get_ws_schema_v3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use spacetimedb_client_api_messages::websocket::v3::{ClientFrame, ServerFrame};
use spacetimedb_lib::ser::serde::SerializeWrapper;
use spacetimedb_lib::{RawModuleDef, RawModuleDefV8};

fn main() -> Result<(), serde_json::Error> {
let module = RawModuleDefV8::with_builder(|module| {
module.add_type::<ClientFrame>();
module.add_type::<ServerFrame>();
});
let module = RawModuleDef::V8BackCompat(module);

serde_json::to_writer(std::io::stdout().lock(), SerializeWrapper::from_ref(&module))
}
1 change: 1 addition & 0 deletions crates/client-api-messages/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@
pub mod common;
pub mod v1;
pub mod v2;
pub mod v3;
28 changes: 28 additions & 0 deletions crates/client-api-messages/src/websocket/v3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use bytes::Bytes;
pub use spacetimedb_sats::SpacetimeType;

pub const BIN_PROTOCOL: &str = "v3.bsatn.spacetimedb";

/// Transport envelopes sent by the client over the v3 websocket protocol.
///
/// The inner bytes are BSATN-encoded v2 [`crate::websocket::v2::ClientMessage`] values.
#[derive(SpacetimeType, Debug)]
#[sats(crate = spacetimedb_lib)]
pub enum ClientFrame {
/// A single logical client message.
Single(Bytes),
/// Multiple logical client messages that should be processed in-order.
Batch(Box<[Bytes]>),
}

/// Transport envelopes sent by the server over the v3 websocket protocol.
///
/// The inner bytes are BSATN-encoded v2 [`crate::websocket::v2::ServerMessage`] values.
#[derive(SpacetimeType, Debug)]
#[sats(crate = spacetimedb_lib)]
pub enum ServerFrame {
/// A single logical server message.
Single(Bytes),
/// Multiple logical server messages that should be processed in-order.
Batch(Box<[Bytes]>),
}
132 changes: 95 additions & 37 deletions crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use prometheus::{Histogram, IntGauge};
use scopeguard::{defer, ScopeGuard};
use serde::Deserialize;
use spacetimedb::client::messages::{
serialize, serialize_v2, IdentityTokenMessage, InUseSerializeBuffer, SerializeBuffer, SwitchedServerMessage,
ToProtocol,
serialize, serialize_v2, serialize_v3, IdentityTokenMessage, InUseSerializeBuffer, SerializeBuffer,
SwitchedServerMessage, ToProtocol,
};
use spacetimedb::client::{
ClientActorId, ClientConfig, ClientConnection, ClientConnectionReceiver, DataMessage, MessageExecutionError,
Expand All @@ -38,6 +38,7 @@ use spacetimedb::worker_metrics::WORKER_METRICS;
use spacetimedb::Identity;
use spacetimedb_client_api_messages::websocket::v1 as ws_v1;
use spacetimedb_client_api_messages::websocket::v2 as ws_v2;
use spacetimedb_client_api_messages::websocket::v3 as ws_v3;
use spacetimedb_datastore::execution_context::WorkloadType;
use spacetimedb_lib::connection_id::{ConnectionId, ConnectionIdForUrl};
use tokio::sync::{mpsc, watch};
Expand All @@ -62,6 +63,8 @@ pub const TEXT_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v1::TEXT_PROT
pub const BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v1::BIN_PROTOCOL);
#[allow(clippy::declare_interior_mutable_const)]
pub const V2_BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v2::BIN_PROTOCOL);
#[allow(clippy::declare_interior_mutable_const)]
pub const V3_BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_v3::BIN_PROTOCOL);

pub trait HasWebSocketOptions {
fn websocket_options(&self) -> WebSocketOptions;
Expand Down Expand Up @@ -101,7 +104,7 @@ fn resolve_confirmed_reads_default(version: WsVersion, confirmed: Option<bool>)
}
match version {
WsVersion::V1 => false,
WsVersion::V2 => crate::DEFAULT_CONFIRMED_READS,
WsVersion::V2 | WsVersion::V3 => crate::DEFAULT_CONFIRMED_READS,
}
}

Expand Down Expand Up @@ -151,6 +154,13 @@ where
}

let (res, ws_upgrade, protocol) = ws.select_protocol([
(
V3_BIN_PROTOCOL,
NegotiatedProtocol {
protocol: Protocol::Binary,
version: WsVersion::V3,
},
),
(
V2_BIN_PROTOCOL,
NegotiatedProtocol {
Expand Down Expand Up @@ -284,7 +294,7 @@ where
};
client.send_message(None, OutboundMessage::V1(message.into()))
}
WsVersion::V2 => {
WsVersion::V2 | WsVersion::V3 => {
let message = ws_v2::ServerMessage::InitialConnection(ws_v2::InitialConnection {
identity: client_identity,
connection_id,
Expand Down Expand Up @@ -1293,10 +1303,15 @@ async fn ws_encode_task(
// copied to the wire. Since we don't know when that will happen, we prepare
// for a few messages to be in-flight, i.e. encoded but not yet sent.
const BUF_POOL_CAPACITY: usize = 16;
let binary_message_serializer = match config.version {
WsVersion::V1 => None,
WsVersion::V2 => Some(serialize_v2 as BinarySerializeFn),
WsVersion::V3 => Some(serialize_v3 as BinarySerializeFn),
};
let buf_pool = ArrayQueue::new(BUF_POOL_CAPACITY);
let mut in_use_bufs: Vec<ScopeGuard<InUseSerializeBuffer, _>> = Vec::with_capacity(BUF_POOL_CAPACITY);

while let Some(message) = messages.recv().await {
'send: while let Some(message) = messages.recv().await {
// Drop serialize buffers with no external referent,
// returning them to the pool.
in_use_bufs.retain(|in_use| !in_use.is_unique());
Expand All @@ -1306,55 +1321,70 @@ async fn ws_encode_task(

let in_use_buf = match message {
OutboundWsMessage::Error(message) => {
if config.version == WsVersion::V2 {
log::error!("dropping v1 error message sent to a v2 client: {:?}", message);
if config.version != WsVersion::V1 {
log::error!(
"dropping v1 error message sent to a binary websocket client: {:?}",
message
);
continue;
}
let (stats, in_use, mut frames) = ws_encode_message(config, buf, message, false, &bsatn_rlb_pool).await;
metrics.report(None, None, stats);
if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() {
break;
}

let Ok(in_use) = ws_forward_frames(
&metrics,
&outgoing_frames,
None,
None,
ws_encode_message(config, buf, message, false, &bsatn_rlb_pool).await,
) else {
break 'send;
};
in_use
}
OutboundWsMessage::Message(message) => {
let workload = message.workload();
let num_rows = message.num_rows();
match message {
OutboundMessage::V2(server_message) => {
if config.version != WsVersion::V2 {
if config.version == WsVersion::V1 {
log::error!("dropping v2 message on v1 connection");
continue;
}

let (stats, in_use, mut frames) =
ws_encode_message_v2(config, buf, server_message, false, &bsatn_rlb_pool).await;
metrics.report(workload, num_rows, stats);
if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() {
break;
}

let Ok(in_use) = ws_forward_frames(
&metrics,
&outgoing_frames,
workload,
num_rows,
ws_encode_binary_message(
config,
buf,
server_message,
binary_message_serializer.expect("v2 message should not be sent on a v1 connection"),
false,
&bsatn_rlb_pool,
)
.await,
) else {
break 'send;
};
in_use
}
OutboundMessage::V1(message) => {
if config.version == WsVersion::V2 {
log::error!(
"dropping v1 message for v2 connection until v2 serialization is implemented: {:?}",
message
);
if config.version != WsVersion::V1 {
log::error!("dropping v1 message for a binary websocket connection: {:?}", message);
continue;
}

let is_large = num_rows.is_some_and(|n| n > 1024);

let (stats, in_use, mut frames) =
ws_encode_message(config, buf, message, is_large, &bsatn_rlb_pool).await;
metrics.report(workload, num_rows, stats);
if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() {
break;
}

let Ok(in_use) = ws_forward_frames(
&metrics,
&outgoing_frames,
workload,
num_rows,
ws_encode_message(config, buf, message, is_large, &bsatn_rlb_pool).await,
) else {
break 'send;
};
in_use
}
}
Expand All @@ -1370,6 +1400,24 @@ async fn ws_encode_task(
}
}

/// Reports encode metrics for an already-encoded message and forwards all of
/// its frames to the websocket send task.
fn ws_forward_frames<I>(
metrics: &SendMetrics,
outgoing_frames: &mpsc::UnboundedSender<Frame>,
workload: Option<WorkloadType>,
num_rows: Option<usize>,
encoded: (EncodeMetrics, InUseSerializeBuffer, I),
) -> Result<InUseSerializeBuffer, mpsc::error::SendError<Frame>>
where
I: Iterator<Item = Frame>,
{
let (stats, in_use, frames) = encoded;
metrics.report(workload, num_rows, stats);
frames.into_iter().try_for_each(|frame| outgoing_frames.send(frame))?;
Ok(in_use)
}

/// Some stats about serialization and compression.
///
/// Returned by [`ws_encode_message`].
Expand Down Expand Up @@ -1443,21 +1491,29 @@ async fn ws_encode_message(
(metrics, msg_alloc, frames)
}

#[allow(dead_code, unused_variables)]
async fn ws_encode_message_v2(
type BinarySerializeFn = fn(
&BsatnRowListBuilderPool,
SerializeBuffer,
ws_v2::ServerMessage,
ws_v1::Compression,
) -> (InUseSerializeBuffer, Bytes);

async fn ws_encode_binary_message(
config: ClientConfig,
buf: SerializeBuffer,
message: ws_v2::ServerMessage,
serialize_message: BinarySerializeFn,
is_large_message: bool,
bsatn_rlb_pool: &BsatnRowListBuilderPool,
) -> (EncodeMetrics, InUseSerializeBuffer, impl Iterator<Item = Frame> + use<>) {
let start = Instant::now();
let compression = config.compression;

let (in_use, data) = if is_large_message {
let bsatn_rlb_pool = bsatn_rlb_pool.clone();
spawn_rayon(move || serialize_v2(&bsatn_rlb_pool, buf, message, config.compression)).await
spawn_rayon(move || serialize_message(&bsatn_rlb_pool, buf, message, compression)).await
} else {
serialize_v2(bsatn_rlb_pool, buf, message, config.compression)
serialize_message(bsatn_rlb_pool, buf, message, compression)
};

let metrics = EncodeMetrics {
Expand Down Expand Up @@ -2298,9 +2354,11 @@ mod tests {

#[test]
fn confirmed_reads_default_depends_on_ws_version() {
assert!(resolve_confirmed_reads_default(WsVersion::V3, None));
assert!(resolve_confirmed_reads_default(WsVersion::V2, None));
assert!(!resolve_confirmed_reads_default(WsVersion::V1, None));
assert!(resolve_confirmed_reads_default(WsVersion::V1, Some(true)));
assert!(!resolve_confirmed_reads_default(WsVersion::V3, Some(false)));
assert!(!resolve_confirmed_reads_default(WsVersion::V2, Some(false)));
}

Expand Down
1 change: 1 addition & 0 deletions crates/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod consume_each_list;
mod message_handlers;
mod message_handlers_v1;
mod message_handlers_v2;
mod message_handlers_v3;
pub mod messages;

pub use client_connection::{
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub enum Protocol {
pub enum WsVersion {
V1,
V2,
V3,
}

impl Protocol {
Expand Down Expand Up @@ -384,7 +385,7 @@ impl ClientConnectionSender {
debug_assert!(
matches!(
(&self.config.version, &message),
(WsVersion::V1, OutboundMessage::V1(_)) | (WsVersion::V2, OutboundMessage::V2(_))
(WsVersion::V1, OutboundMessage::V1(_)) | (WsVersion::V2 | WsVersion::V3, OutboundMessage::V2(_))
),
"attempted to send message variant that does not match client websocket version"
);
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/client/message_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
match client.config.version {
WsVersion::V1 => super::message_handlers_v1::handle(client, message, timer).await,
WsVersion::V2 => super::message_handlers_v2::handle(client, message, timer).await,
WsVersion::V3 => super::message_handlers_v3::handle(client, message, timer).await,
}
}
8 changes: 8 additions & 0 deletions crates/core/src/client/message_handlers_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
)))
}
};
handle_decoded_message(client, message, timer).await
}

pub(super) async fn handle_decoded_message(
client: &ClientConnection,
message: ws_v2::ClientMessage,
timer: Instant,
) -> Result<(), MessageHandleError> {
let module = client.module();
let mod_info = module.info();
let mod_metrics = &mod_info.metrics;
Expand Down
32 changes: 32 additions & 0 deletions crates/core/src/client/message_handlers_v3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use super::{ClientConnection, DataMessage, MessageHandleError};
use serde::de::Error as _;
use spacetimedb_client_api_messages::websocket::{v2 as ws_v2, v3 as ws_v3};
use spacetimedb_lib::bsatn;
use std::time::Instant;

pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Instant) -> Result<(), MessageHandleError> {
client.observe_websocket_request_message(&message);
let frame = match message {
DataMessage::Binary(message_buf) => bsatn::from_slice::<ws_v3::ClientFrame>(&message_buf)?,
DataMessage::Text(_) => {
return Err(MessageHandleError::TextDecode(serde_json::Error::custom(
"v3 websocket does not support text messages",
)))
}
};

match frame {
ws_v3::ClientFrame::Single(message) => {
let message = bsatn::from_slice::<ws_v2::ClientMessage>(&message)?;
super::message_handlers_v2::handle_decoded_message(client, message, timer).await?;
}
ws_v3::ClientFrame::Batch(messages) => {
for message in messages {
let message = bsatn::from_slice::<ws_v2::ClientMessage>(&message)?;
super::message_handlers_v2::handle_decoded_message(client, message, timer).await?;
}
}
}

Ok(())
}
Loading
Loading