Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.0.3

- Add `PowerSyncDatabase::watch_statement` to get an auto-updating stream of query results.

## 0.0.2

- Configure automated publishing to crates.io.
Expand Down
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
[workspace]
resolver = "3"
members = [ "examples/egui_todolist","powersync", "powersync_test_utils"]
members = [
"powersync",
"powersync_test_utils",
"examples/egui_todolist"
]

[workspace.package]
repository = "https://github.com/powersync-ja/powersync-native"
1 change: 0 additions & 1 deletion powersync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ smol = ["dep:async-io"]
ffi = []

[dependencies]
async-broadcast = "0.7.2"
async-channel = "2.5.0"
async-lock = "3.4.1"
async-io = { version = "2.6.0", optional = true }
Expand Down
5 changes: 0 additions & 5 deletions powersync/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,3 @@ This SDK is in development. Some items that are still being worked on are:

1. Token prefetching and caching.
2. Unit tests for CRUD uploads.

Also, this crate's `build.rs` dynamically downloads a binary
(the [PowerSync core extension](https://github.com/powersync-ja/powersync-sqlite-core/)) to link into the final
executable. The reason is that, while this library works with stable Rust, the core extension requires a nightly build.
We'll work towards making the core extension a regular Rust crate supporting stable compilers in the future.
140 changes: 132 additions & 8 deletions powersync/src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::borrow::Cow;
use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use futures_lite::{FutureExt, Stream, StreamExt};

use crate::db::async_support::AsyncDatabaseTasks;
use crate::sync::coordinator::SyncCoordinator;
use crate::{
Expand All @@ -15,6 +16,9 @@ use crate::{
schema::Schema,
sync::{download::DownloadActor, status::SyncStatusData, upload::UploadActor},
};
use futures_lite::stream::{once, once_future};
use futures_lite::{FutureExt, Stream, StreamExt};
use rusqlite::{Params, Statement, params};

mod async_support;
pub mod core_extension;
Expand Down Expand Up @@ -84,26 +88,140 @@ impl PowerSyncDatabase {
/// The `emit_initially` option can be used to control whether the stream should emit as well
/// when polled for the first time. This can be useful to build streams emitting a complete
/// snapshot of results every time a source table is changed.
pub fn watch_tables<'a>(
pub fn watch_tables<'a, Tables: IntoIterator<Item = impl Into<Cow<'a, str>>>>(
&self,
emit_initially: bool,
tables: impl IntoIterator<Item = &'a str>,
) -> impl Stream<Item = ()> {
tables: Tables,
) -> impl Stream<Item = ()> + 'static {
self.inner.env.pool.update_notifiers().listen(
emit_initially,
tables
.into_iter()
.flat_map(|s| {
let s = s.into();

[
s.to_string(),
format!("ps_data__{s}"),
format!("ps_data_local__{s}"),
format!("{}{s}", Self::PS_DATA_PREFIX),
format!("{}{s}", Self::PS_DATA_LOCAL_PREFIX),
Cow::into_owned(s),
]
})
.collect(),
)
}

/// Returns an asynchronous [Stream] emitting snapshots of a `SELECT` statement every time
/// source tables are modified.
///
/// `sql` is the `SELECT` statement to execute and `params` are the parameters to bind.
/// The `read` function receives the raw prepared statement together with a copy of the
/// parameters, and is responsible for executing the statement and mapping its rows into the
/// desired result type `T`.
///
/// This method is a core building block for reactive applications with PowerSync - since it
/// updates automatically, all writes (regardless of whether they're local or due to synced
/// writes from your backend) are reflected.
pub fn watch_statement<T, F, P: Params + Clone + 'static>(
    &self,
    sql: String,
    params: P,
    read: F,
) -> impl Stream<Item = Result<T, PowerSyncError>> + 'static
where
    for<'a> F: (Fn(&'a mut Statement, P) -> Result<T, PowerSyncError>) + 'static + Clone,
{
    // Resolve the tables referenced by `sql`, then emit on every change to one of them.
    // `emit_initially = true` makes the stream start with an initial snapshot.
    let update_notifications = self.emit_on_statement_changes(true, sql.clone(), params.clone());

    let db = self.clone();
    update_notifications.then(move |notification| {
        // Clone per emitted item so the async block below is fully self-contained
        // ('static) and can outlive this closure invocation.
        let db = db.clone();
        let sql = sql.clone();
        let params = params.clone();
        let mapper = read.clone();

        async move {
            // Forward a table-resolution error instead of attempting the query.
            notification?;

            let reader = db.reader().await?;
            let mut stmt = reader.prepare_cached(&sql)?;

            mapper(&mut stmt, params)
        }
    })
}

/// Returns a stream emitting `Ok(())` whenever a table referenced by `sql` changes.
///
/// The referenced tables are resolved asynchronously once (via [`Self::find_tables`]); if
/// that resolution fails, the stream yields a single `Err` and then ends.
fn emit_on_statement_changes(
    &self,
    emit_initially: bool,
    sql: String,
    params: impl Params + 'static,
) -> impl Stream<Item = Result<(), PowerSyncError>> + 'static {
    // Stream emitting referenced tables once.
    let tables = once_future(self.clone().find_tables(sql, params));

    // Stream emitting updates, or a single error if we couldn't resolve tables.
    let db = self.clone();
    tables.flat_map(move |referenced_tables| match referenced_tables {
        // Boxing unifies the two distinct stream types into one return type.
        Ok(referenced_tables) => db
            .watch_tables(emit_initially, referenced_tables)
            .map(Ok)
            .boxed(),
        Err(e) => once(Err(e)).boxed(),
    })
}

/// Finds all tables that are used in a given select statement.
///
/// Works by running `EXPLAIN` on the statement and scanning the resulting bytecode rows
/// for read cursors, whose root page is then resolved back to a table name through
/// `sqlite_schema`. Internal `ps_data__`/`ps_data_local__` prefixes are stripped from the
/// results.
///
/// This can be used together with [watch_tables] to build an auto-updating stream of queries.
async fn find_tables<P: Params>(
    self,
    sql: impl Into<Cow<'static, str>>,
    params: P,
) -> Result<Vec<String>, PowerSyncError> {
    let reader = self.reader().await?;
    // `EXPLAIN` yields the statement's bytecode as rows instead of executing it.
    let mut stmt = reader.prepare(&format!("EXPLAIN {}", sql.into()))?;
    let mut rows = stmt.query(params)?;

    // Maps a b-tree root page number back to the name of the table that owns it.
    let mut find_table_stmt =
        reader.prepare_cached("SELECT tbl_name FROM sqlite_schema WHERE rootpage = ?")?;
    let mut found_tables = HashSet::new();

    while let Some(row) = rows.next()? {
        let opcode = row.get_ref("opcode")?;
        let p2 = row.get_ref("p2")?;
        let p3 = row.get_ref("p3")?;

        // `OpenRead` opens a read cursor; per the SQLite bytecode docs, `p2` is the root
        // page of the b-tree being read and `p3` selects the database (0 = `main`).
        // NOTE(review): tables in attached databases are intentionally skipped here.
        if matches!(opcode.as_str(), Ok("OpenRead"))
            && matches!(p3.as_i64(), Ok(0))
            && let Ok(page) = p2.as_i64()
        {
            let mut found_table = find_table_stmt.query(params![page])?;
            if let Some(found_table) = found_table.next()? {
                let table_name: String = found_table.get(0)?;
                // The set deduplicates tables opened through multiple cursors.
                found_tables.insert(table_name);
            }
        }
    }

    // Translate internal table names back to the user-facing names.
    Ok(found_tables
        .into_iter()
        .map(|mut table| {
            if table.starts_with(Self::PS_DATA_PREFIX) {
                // `split_off` keeps only the part after the prefix.
                table.split_off(Self::PS_DATA_PREFIX.len())
            } else if table.starts_with(Self::PS_DATA_LOCAL_PREFIX) {
                table.split_off(Self::PS_DATA_LOCAL_PREFIX.len())
            } else {
                table
            }
        })
        .collect())
}

/// Prefix of the internal tables backing user-facing synced tables (see `watch_tables`,
/// which expands a user table name into this prefixed form).
const PS_DATA_PREFIX: &'static str = "ps_data__";
/// Prefix of the internal tables backing user-facing local-only tables.
const PS_DATA_LOCAL_PREFIX: &'static str = "ps_data_local__";

/// Returns a [Stream] traversing through transactions that have been completed on this
/// database.
///
Expand Down Expand Up @@ -198,3 +316,9 @@ impl PowerSyncDatabase {
}
*/
}

impl Debug for PowerSyncDatabase {
    // Deliberately opaque representation: no fields are printed;
    // `finish_non_exhaustive` renders as `PowerSyncDatabase { .. }`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PowerSyncDatabase").finish_non_exhaustive()
    }
}
16 changes: 7 additions & 9 deletions powersync/src/db/streams.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,3 @@
use std::{
cell::Cell,
collections::HashMap,
sync::{Arc, Mutex, Weak},
time::Duration,
};

use rusqlite::params;

use crate::{
PowerSyncDatabase, StreamPriority,
db::internal::InnerPowerSyncState,
Expand All @@ -17,6 +8,13 @@ use crate::{
},
util::SerializedJsonObject,
};
use rusqlite::params;
use std::{
cell::Cell,
collections::HashMap,
sync::{Arc, Mutex, Weak},
time::Duration,
};

/// Tracks all sync streams that currently have at least one active [StreamSubscription].
#[derive(Default)]
Expand Down
2 changes: 1 addition & 1 deletion powersync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ mod sync;
mod util;

pub use db::PowerSyncDatabase;
pub use db::crud::{CrudEntry, CrudTransaction, UpdateType};
#[cfg(feature = "ffi")]
pub use db::internal::InnerPowerSyncState;
pub use db::crud::{CrudEntry, CrudTransaction, UpdateType};
pub use db::pool::{ConnectionPool, LeasedConnection};
pub use db::streams::StreamSubscription;
pub use db::streams::StreamSubscriptionOptions;
Expand Down
82 changes: 81 additions & 1 deletion powersync/tests/database_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use futures_lite::{StreamExt, future};
use powersync::error::PowerSyncError;
use powersync_test_utils::{DatabaseTest, UserRow, execute, query_all};
use rusqlite::params;
use serde_json::json;
use serde_json::{Value, json};

#[test]
fn link_core_extension() {
Expand Down Expand Up @@ -152,3 +152,83 @@ fn test_table_updates() {
);
});
}

/// End-to-end test for `watch_statement`: the stream must emit an initial snapshot, one
/// snapshot per committed write (not per statement), and must ignore rolled-back
/// transactions.
#[test]
fn test_watch_statement() {
    let test = DatabaseTest::new();
    let db = Arc::new(test.test_dir_database());

    future::block_on(async move {
        // Watch a query, mapping every user name into a JSON array.
        let mut stream = db
            .watch_statement(
                "SELECT name FROM users".to_string(),
                params![],
                |stmt, params| {
                    let mut rows = stmt.query(params)?;
                    let mut names = vec![];
                    while let Some(row) = rows.next()? {
                        let name = row.get(0)?;
                        names.push(Value::String(name));
                    }

                    Ok(Value::Array(names))
                },
            )
            .boxed_local();

        // Initial query.
        assert_eq!(stream.next().await.unwrap().unwrap(), json!([]));

        // A single insert triggers exactly one new snapshot.
        execute(
            &db,
            "INSERT INTO users (id, name) VALUES (uuid(), ?)",
            params!["Test"],
        )
        .await;
        assert_eq!(stream.next().await.unwrap().unwrap(), json!(["Test"]));

        {
            // Two inserts in one transaction: both rows must appear in a single snapshot.
            let mut writer = db.writer().await.unwrap();
            let writer = writer.transaction().unwrap();

            writer
                .execute(
                    "INSERT INTO users (id, name) VALUES (uuid(), ?)",
                    params!["Test2"],
                )
                .unwrap();
            writer
                .execute(
                    "INSERT INTO users (id, name) VALUES (uuid(), ?)",
                    params!["Test3"],
                )
                .unwrap();

            writer.commit().unwrap();
        }

        assert_eq!(
            stream.next().await.unwrap().unwrap(),
            json!(["Test", "Test2", "Test3"])
        );

        {
            // Transaction dropped without commit: rolls back when the guard goes out of scope.
            let mut writer = db.writer().await.unwrap();
            let writer = writer.transaction().unwrap();

            writer.execute("DELETE FROM users", params![]).unwrap();
            // Transactions we're rolling back should not impact streams.
        }

        // The rolled-back DELETE must not be visible: the next snapshot (triggered by the
        // committed insert below) still contains every previously inserted row.
        execute(
            &db,
            "INSERT INTO users (id, name) VALUES (uuid(), ?)",
            params!["Test4"],
        )
        .await;
        assert_eq!(
            stream.next().await.unwrap().unwrap(),
            json!(["Test", "Test2", "Test3", "Test4"])
        );
    });
}