diff --git a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/debug.rs b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/debug.rs
index 6e1adcd6e6..ac18ab4a37 100644
--- a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/debug.rs
+++ b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/debug.rs
@@ -17,7 +17,7 @@ use uuid::Uuid;
use super::{
keys,
- sqlite::{db_name_internal, SqlStub},
+ sqlite::SqlStub,
DatabaseFdbSqliteNats,
};
use crate::{
@@ -602,10 +602,10 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
for key in sub_workflow_wake_keys {
tracing::warn!(
- "workflow {} is being waited on by sub workflow {}, silencing anyway",
- key.workflow_id,
- key.sub_workflow_id
- );
+ "workflow {} is being waited on by sub workflow {}, silencing anyway",
+ key.workflow_id,
+ key.sub_workflow_id
+ );
}
for key in tag_keys {
@@ -741,7 +741,7 @@ impl DatabaseDebug for DatabaseFdbSqliteNats {
) -> Result> {
let pool = &self
.pools
- .sqlite(db_name_internal(workflow_id), true)
+ .sqlite(crate::db::sqlite_db_name_internal(workflow_id), true)
.await?;
let (wf_data, event_rows, error_rows) = tokio::try_join!(
diff --git a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs
index 544fc03532..2ca270169a 100644
--- a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs
+++ b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs
@@ -142,7 +142,7 @@ impl DatabaseFdbSqliteNats {
self.pools
.sqlite_manager()
.evict(vec![
- sqlite::db_name_internal(workflow_id),
+ crate::db::sqlite_db_name_internal(workflow_id),
crate::db::sqlite_db_name_data(workflow_id),
])
.await?;
@@ -1277,7 +1277,7 @@ impl Database for DatabaseFdbSqliteNats {
async move {
let pool = &self
.pools
- .sqlite(sqlite::db_name_internal(partial.workflow_id), false)
+ .sqlite(crate::db::sqlite_db_name_internal(partial.workflow_id), false)
.await?;
// Handle error during sqlite init
@@ -1876,7 +1876,7 @@ impl Database for DatabaseFdbSqliteNats {
) -> WorkflowResult > {
let pool = &self
.pools
- .sqlite(sqlite::db_name_internal(workflow_id), false)
+ .sqlite(crate::db::sqlite_db_name_internal(workflow_id), false)
.await?;
let owned_filter = filter
@@ -2323,7 +2323,7 @@ impl Database for DatabaseFdbSqliteNats {
) -> WorkflowResult<()> {
let pool = &self
.pools
- .sqlite(sqlite::db_name_internal(from_workflow_id), false)
+ .sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
.await?;
// Insert history event
@@ -2351,7 +2351,7 @@ impl Database for DatabaseFdbSqliteNats {
.sqlite_manager()
.flush(
vec![
- sqlite::db_name_internal(from_workflow_id),
+ crate::db::sqlite_db_name_internal(from_workflow_id),
crate::db::sqlite_db_name_data(from_workflow_id),
],
false,
@@ -2413,7 +2413,7 @@ impl Database for DatabaseFdbSqliteNats {
) -> WorkflowResult {
let pool = &self
.pools
- .sqlite(sqlite::db_name_internal(workflow_id), false)
+ .sqlite(crate::db::sqlite_db_name_internal(workflow_id), false)
.await?;
// Insert history event
@@ -2448,7 +2448,7 @@ impl Database for DatabaseFdbSqliteNats {
.sqlite_manager()
.flush(
vec![
- sqlite::db_name_internal(workflow_id),
+ crate::db::sqlite_db_name_internal(workflow_id),
crate::db::sqlite_db_name_data(workflow_id),
],
false,
@@ -2596,7 +2596,7 @@ impl Database for DatabaseFdbSqliteNats {
) -> WorkflowResult<()> {
let pool = &self
.pools
- .sqlite(sqlite::db_name_internal(workflow_id), false)
+ .sqlite(crate::db::sqlite_db_name_internal(workflow_id), false)
.await?;
let input_hash = event_id.input_hash.to_be_bytes();
@@ -2702,7 +2702,7 @@ impl Database for DatabaseFdbSqliteNats {
.sqlite_manager()
.flush(
vec![
- sqlite::db_name_internal(from_workflow_id),
+ crate::db::sqlite_db_name_internal(from_workflow_id),
crate::db::sqlite_db_name_data(from_workflow_id),
],
false,
@@ -2711,7 +2711,7 @@ impl Database for DatabaseFdbSqliteNats {
let pool = &self
.pools
- .sqlite(sqlite::db_name_internal(from_workflow_id), false)
+ .sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
.await?;
sql_execute!(
@@ -2751,7 +2751,7 @@ impl Database for DatabaseFdbSqliteNats {
) -> WorkflowResult<()> {
let pool = &self
.pools
- .sqlite(sqlite::db_name_internal(workflow_id), false)
+ .sqlite(crate::db::sqlite_db_name_internal(workflow_id), false)
.await?;
self.txn(|| async {
@@ -3040,7 +3040,7 @@ impl Database for DatabaseFdbSqliteNats {
) -> WorkflowResult<()> {
let pool = &self
.pools
- .sqlite(sqlite::db_name_internal(from_workflow_id), false)
+ .sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
.await?;
sql_execute!(
@@ -3072,7 +3072,7 @@ impl Database for DatabaseFdbSqliteNats {
) -> WorkflowResult<()> {
let pool = &self
.pools
- .sqlite(sqlite::db_name_internal(from_workflow_id), false)
+ .sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
.await?;
sql_execute!(
@@ -3100,7 +3100,7 @@ impl Database for DatabaseFdbSqliteNats {
) -> WorkflowResult<()> {
let pool = &self
.pools
- .sqlite(sqlite::db_name_internal(from_workflow_id), false)
+ .sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
.await?;
sql_execute!(
@@ -3132,7 +3132,7 @@ impl Database for DatabaseFdbSqliteNats {
) -> WorkflowResult<()> {
let pool = &self
.pools
- .sqlite(sqlite::db_name_internal(from_workflow_id), false)
+ .sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
.await?;
sql_execute!(
@@ -3164,7 +3164,7 @@ impl Database for DatabaseFdbSqliteNats {
) -> WorkflowResult<()> {
let pool = &self
.pools
- .sqlite(sqlite::db_name_internal(from_workflow_id), false)
+ .sqlite(crate::db::sqlite_db_name_internal(from_workflow_id), false)
.await?;
sql_execute!(
@@ -3220,7 +3220,7 @@ async fn flush_handler(pools: rivet_pools::Pools, mut flush_rx: mpsc::UnboundedR
.sqlite_manager()
.flush(
vec![
- sqlite::db_name_internal(workflow_id),
+ crate::db::sqlite_db_name_internal(workflow_id),
crate::db::sqlite_db_name_data(workflow_id),
],
true,
diff --git a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/sqlite/mod.rs b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/sqlite/mod.rs
index bd76c08e1a..6eb0e80cb7 100644
--- a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/sqlite/mod.rs
+++ b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/sqlite/mod.rs
@@ -1,6 +1,5 @@
use std::collections::HashMap;
-use fdb_util::keys::*;
use include_dir::{include_dir, Dir, File};
use indoc::indoc;
use rivet_pools::prelude::*;
@@ -407,8 +406,3 @@ pub fn build_history(
Ok(events_by_location)
}
-
-/// Database name for the workflow internal state.
-pub fn db_name_internal(workflow_id: Uuid) -> (usize, Uuid, usize) {
- (WORKFLOW, workflow_id, INTERNAL)
-}
diff --git a/packages/common/chirp-workflow/core/src/db/mod.rs b/packages/common/chirp-workflow/core/src/db/mod.rs
index 682c400f37..467ff11f5e 100644
--- a/packages/common/chirp-workflow/core/src/db/mod.rs
+++ b/packages/common/chirp-workflow/core/src/db/mod.rs
@@ -343,3 +343,8 @@ pub struct SignalData {
pub fn sqlite_db_name_data(workflow_id: Uuid) -> (usize, Uuid, usize) {
(WORKFLOW, workflow_id, DATA)
}
+
+/// Database name for the workflow internal state.
+pub fn sqlite_db_name_internal(workflow_id: Uuid) -> (usize, Uuid, usize) {
+ (WORKFLOW, workflow_id, INTERNAL)
+}
diff --git a/packages/common/pools/src/db/sqlite/mod.rs b/packages/common/pools/src/db/sqlite/mod.rs
index 688e88929a..3ef6d2d461 100644
--- a/packages/common/pools/src/db/sqlite/mod.rs
+++ b/packages/common/pools/src/db/sqlite/mod.rs
@@ -28,7 +28,7 @@ use uuid::Uuid;
use crate::{metrics, Error, FdbPool};
-mod keys;
+pub mod keys;
#[cfg(test)]
mod tests;
@@ -146,7 +146,7 @@ impl SqliteWriterEntry {
/// DB key in packed form. This is not the full FDB key, this is the DB name segment in DbDataKey.
///
/// Stored in an `Arc` since this is frequently copied around.
-type KeyPacked = Arc>;
+pub type KeyPacked = Arc>;
pub type SqlitePoolManagerHandle = Arc;
pub type SqlitePoolManagerHandleWeak = Weak;
@@ -293,22 +293,15 @@ impl SqlitePoolManager {
// MARK: Private helpers
impl SqlitePoolManager {
- fn db_info(&self, key_packed: &KeyPacked) -> (PathBuf, String) {
+ fn db_path(&self, key_packed: &KeyPacked) -> PathBuf {
let hex_key_str = hex::encode(&**key_packed);
match &self.storage {
- SqliteStorage::Local { path } => {
- // Determine the persistent location of this database
- let db_path = path.join(format!("{hex_key_str}.db"));
- let db_url = format!("sqlite://{}", db_path.display());
- (db_path, db_url)
- }
+ // Determine the persistent location of this database
+ SqliteStorage::Local { path } => path.join(format!("{hex_key_str}.db")),
+ // Generate temporary file location so multiple readers don't clobber each other
SqliteStorage::FoundationDb { path } => {
- // Generate temporary file location so multiple readers don't clobber each other
- let db_path =
- path.join(format!("rivet-sqlite-{hex_key_str}-{}.db", Uuid::new_v4()));
- let db_url = format!("sqlite://{}", db_path.display());
- (db_path, db_url)
+ path.join(format!("rivet-sqlite-{hex_key_str}-{}.db", Uuid::new_v4()))
}
}
}
@@ -344,7 +337,7 @@ impl SqlitePoolManager {
}
}
},
- clear_db_files(&self.storage, self.db_info(&key_packed).0),
+ clear_db_files(&self.storage, self.db_path(&key_packed)),
);
}
}
@@ -746,13 +739,13 @@ pub struct SqlitePoolInner {
}
impl SqlitePoolInner {
- #[tracing::instrument(name = "sqlite_pool_new", skip_all)]
async fn new(
key_packed: KeyPacked,
conn_type: SqliteConnType,
manager: SqlitePoolManagerHandle,
) -> Result {
- let (db_path, db_url) = manager.db_info(&key_packed);
+ let db_path = manager.db_path(&key_packed);
+ let db_url = format!("sqlite://{}", db_path.display());
// Load database
match &manager.storage {
@@ -893,6 +886,7 @@ impl SqlitePoolInner {
}
impl SqlitePoolInner {
+ // TODO: Doesn't need a result type
#[tracing::instrument(name = "sqlite_pool_snapshot", skip_all)]
pub async fn snapshot(&self, vacuum: bool) -> GlobalResult {
match self
@@ -910,9 +904,21 @@ impl SqlitePoolInner {
}
}
}
+
+ #[tracing::instrument(name = "sqlite_pool_evict", skip_all)]
+ pub async fn evict(&self) -> GlobalResult<()> {
+ self
+ .manager
+ .evict_with_key(&[self.key_packed.clone()])
+ .await
+ }
}
impl SqlitePoolInner {
+ pub fn db_path(&self) -> &Path {
+ &self.db_path
+ }
+
#[tracing::instrument(skip_all)]
pub async fn conn(&self) -> Result, sqlx::Error> {
// Attempt to use an existing connection
diff --git a/packages/common/server-cli/Cargo.toml b/packages/common/server-cli/Cargo.toml
index 035714cc9c..3a65391ef1 100644
--- a/packages/common/server-cli/Cargo.toml
+++ b/packages/common/server-cli/Cargo.toml
@@ -11,13 +11,15 @@ chirp-workflow.workspace = true
chrono = "0.4.38"
clap = { version = "4.3", features = ["derive"] }
colored_json = "5.0.0"
+fdb-util.workspace = true
+foundationdb.workspace = true
futures-util = "0.3"
global-error.workspace = true
hex.workspace = true
include_dir = "0.7.4"
indoc = "2.0.5"
+lz4_flex = "0.11.3"
reqwest = "0.12.9"
-foundationdb.workspace = true
rivet-api.workspace = true
rivet-config.workspace = true
rivet-logs.workspace = true
diff --git a/packages/common/server-cli/src/commands/db/mod.rs b/packages/common/server-cli/src/commands/db/mod.rs
index dcbd2a2f52..79e118445c 100644
--- a/packages/common/server-cli/src/commands/db/mod.rs
+++ b/packages/common/server-cli/src/commands/db/mod.rs
@@ -29,6 +29,10 @@ pub enum DatabaseType {
Redis,
#[clap(alias = "ch")]
Clickhouse,
+ #[clap(alias = "wfd")]
+ WorkflowData,
+ #[clap(alias = "wfi")]
+ WorkflowInternal,
}
impl SubCommand {
@@ -56,6 +60,12 @@ impl SubCommand {
DatabaseType::Clickhouse => {
crate::util::db::clickhouse_shell(config, shell_ctx).await?
}
+ DatabaseType::WorkflowData => {
+ crate::util::db::wf_sqlite_shell(config, shell_ctx, false).await?
+ }
+ DatabaseType::WorkflowInternal => {
+ crate::util::db::wf_sqlite_shell(config, shell_ctx, true).await?
+ }
}
Ok(())
diff --git a/packages/common/server-cli/src/util/db.rs b/packages/common/server-cli/src/util/db.rs
index 82aa999d40..c881af5283 100644
--- a/packages/common/server-cli/src/util/db.rs
+++ b/packages/common/server-cli/src/util/db.rs
@@ -1,6 +1,25 @@
+use std::{
+ io::{Read, Write},
+ path::Path,
+ result::Result::Ok,
+ str::FromStr,
+ sync::Arc,
+ time::Duration,
+};
+
use anyhow::*;
+use fdb_util::{prelude::*, SERIALIZABLE};
+use foundationdb::{self as fdb, options::StreamingMode, FdbBindingError};
+use futures_util::TryStreamExt;
+use rivet_pools::db::sqlite::{keys, KeyPacked};
use serde_json::json;
-use std::path::Path;
+use sqlx::sqlite::{
+ SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode, SqliteLockingMode, SqliteSynchronous,
+};
+use tokio::io::AsyncReadExt;
+use uuid::Uuid;
+
+const CHUNK_SIZE: usize = 10_000; // 10 KB, not KiB, see https://apple.github.io/foundationdb/blob.html
pub struct ShellQuery {
pub svc: String,
@@ -176,3 +195,267 @@ pub async fn clickhouse_shell(
Ok(())
}
+
+pub async fn wf_sqlite_shell(
+ config: rivet_config::Config,
+ shell_ctx: ShellContext<'_>,
+ internal: bool,
+) -> Result<()> {
+ let ShellContext { queries, .. } = shell_ctx;
+
+ let pools = rivet_pools::Pools::new(config.clone()).await?;
+
+ // Combine all queries into one command
+ for ShellQuery {
+ svc: workflow_id,
+ query,
+ } in queries
+ {
+ let workflow_id = Uuid::from_str(workflow_id).context("could not parse input as UUID")?;
+
+ rivet_term::status::warn(
+ "WARNING",
+ "Database will open in WRITE mode. Modifications made will automatically be committed after the shell closes. This may cause changes made outside of this shell to be overwritten."
+ );
+
+ let term = rivet_term::terminal();
+ let response = rivet_term::prompt::PromptBuilder::default()
+ .message("Are you sure?")
+ .build()
+ .expect("failed to build prompt")
+ .bool(&term)
+ .await
+ .expect("failed to show prompt");
+
+ if !response {
+ return Ok(());
+ }
+
+ println!();
+
+ let key = if internal {
+ chirp_workflow::db::sqlite_db_name_internal(workflow_id)
+ } else {
+ chirp_workflow::db::sqlite_db_name_data(workflow_id)
+ };
+ let key_packed = Arc::new(key.pack_to_vec());
+
+ let db_file = tempfile::NamedTempFile::new()?;
+ let db_path = db_file.path();
+
+ read_from_fdb(&pools, &key_packed, &db_path).await?;
+
+ let mut cmd = std::process::Command::new("/root/go/bin/usql");
+ cmd.arg(format!("sqlite:{}", db_path.display()));
+
+ if let Some(query) = query {
+ cmd.args(["-c", query]);
+ }
+
+ cmd.status().context("failed running usql")?;
+
+ rivet_term::status::progress("Evicting database", "");
+ write_to_fdb(&pools, &key_packed, &db_path).await?;
+ rivet_term::status::success("Evicted", "");
+ }
+
+ Ok(())
+}
+
+async fn read_from_fdb(
+ pools: &rivet_pools::Pools,
+ key_packed: &KeyPacked,
+ db_path: &Path,
+) -> Result<()> {
+ let (data, chunks) = pools
+ .fdb()?
+ .run(|tx, _mc| {
+ let key_packed = key_packed.clone();
+ async move {
+ let compressed_db_data_subspace =
+ subspace().subspace(&keys::CompressedDbDataKey::new(key_packed.clone()));
+
+ // Fetch all chunks
+ let mut compressed_data_stream = tx.get_ranges_keyvalues(
+ fdb::RangeOption {
+ mode: StreamingMode::WantAll,
+ ..(&compressed_db_data_subspace).into()
+ },
+ SERIALIZABLE,
+ );
+
+ // Aggregate data
+ let mut buf = Vec::new();
+ let mut chunk_count = 0;
+
+ let mut compressed_data_buf = Vec::new();
+ while let Some(entry) = compressed_data_stream.try_next().await? {
+ // Parse key
+ let key = subspace()
+ .unpack::(entry.key())
+ .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?;
+
+ // Validate chunk
+ if chunk_count != key.chunk {
+ return Err(FdbBindingError::CustomError("mismatched chunk".into()));
+ }
+ chunk_count += 1;
+
+ // Write to buffer
+ compressed_data_buf.extend(entry.value());
+ }
+
+ // Decompress the data
+ let mut decoder = lz4_flex::frame::FrameDecoder::new(&compressed_data_buf[..]);
+ decoder
+ .read_to_end(&mut buf)
+ .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?;
+
+ // If there is no compressed data, read from the uncompressed data (backwards compatibility)
+ if chunk_count == 0 {
+ let db_data_subspace =
+ subspace().subspace(&keys::DbDataKey::new(key_packed.clone()));
+ let mut data_stream = tx.get_ranges_keyvalues(
+ fdb::RangeOption {
+ mode: StreamingMode::WantAll,
+ ..(&db_data_subspace).into()
+ },
+ SERIALIZABLE,
+ );
+
+ while let Some(entry) = data_stream.try_next().await? {
+ // Parse key
+ let key = subspace()
+ .unpack::(entry.key())
+ .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?;
+
+ // Validate chunk
+ if chunk_count != key.chunk {
+ return Err(FdbBindingError::CustomError("mismatched chunk".into()));
+ }
+ chunk_count += 1;
+
+ // Write to buffer
+ buf.extend(entry.value());
+ }
+ }
+
+ Ok((buf, chunk_count))
+ }
+ })
+ .await?;
+
+ ensure!(chunks > 0, "db not found in fdb");
+
+ tokio::fs::write(db_path, data).await?;
+
+ Ok(())
+}
+
+async fn write_to_fdb(
+ pools: &rivet_pools::Pools,
+ key_packed: &KeyPacked,
+ db_path: &Path,
+) -> Result<()> {
+ let db_url = format!("sqlite://{}", db_path.display());
+
+ let opts = db_url
+ .parse::()?
+ .create_if_missing(false)
+ // Enable foreign key constraint enforcement
+ .foreign_keys(true)
+ // Enable auto vacuuming and set it to incremental mode for gradual space reclaiming
+ .auto_vacuum(SqliteAutoVacuum::Incremental)
+ // Set synchronous mode to NORMAL for performance and data safety balance
+ .synchronous(SqliteSynchronous::Normal)
+ // Increases write performance
+ .journal_mode(SqliteJournalMode::Wal)
+ // Reduces file system operations
+ .locking_mode(SqliteLockingMode::Exclusive);
+
+ let pool_opts = sqlx::sqlite::SqlitePoolOptions::new()
+ // The default connection timeout is too high
+ .acquire_timeout(Duration::from_secs(60))
+ .max_lifetime(Duration::from_secs(15 * 60))
+ .max_lifetime_jitter(Duration::from_secs(90))
+ // Remove connections after a while in order to reduce load after bursts
+ .idle_timeout(Some(Duration::from_secs(10 * 60)))
+ // Sqlite doesnt support more than 1 concurrent writer, will get "database is locked"
+ .min_connections(1)
+ .max_connections(1);
+
+ // Create pool
+ let pool = pool_opts.connect_with(opts).await?;
+
+ // Attempt to use an existing connection
+ let mut conn = if let Some(conn) = pool.try_acquire() {
+ conn
+ } else {
+ // Create a new connection
+ pool.acquire().await?
+ };
+
+ // Flush WAL journal
+ sqlx::query("PRAGMA wal_checkpoint(TRUNCATE);")
+ .execute(&mut *conn)
+ .await?;
+
+ // Stream the database file and compress it
+ let mut compressed_data = Vec::new();
+ let file = tokio::fs::File::open(db_path).await?;
+ let mut reader = tokio::io::BufReader::new(file);
+ let mut encoder = lz4_flex::frame::FrameEncoder::new(&mut compressed_data);
+
+ async {
+ let mut buffer = [0u8; 16 * 1024]; // 16 KiB
+ loop {
+ let bytes_read = reader.read(&mut buffer).await?;
+ if bytes_read == 0 {
+ break;
+ }
+ encoder.write_all(&buffer[..bytes_read])?;
+ }
+ encoder.finish()?;
+
+ Result::<_, Error>::Ok(())
+ }
+ .await?;
+
+ let data = Arc::new(compressed_data);
+
+ // Write to FDB
+ pools
+ .fdb()?
+ .run(|tx, _mc| {
+ let key_packed = key_packed.clone();
+ let data = data.clone();
+ async move {
+ // Clear previous data
+ let db_data_subspace =
+ subspace().subspace(&keys::DbDataKey::new(key_packed.clone()));
+ tx.clear_subspace_range(&db_data_subspace);
+ let compressed_db_data_subspace =
+ subspace().subspace(&keys::CompressedDbDataKey::new(key_packed.clone()));
+ tx.clear_subspace_range(&compressed_db_data_subspace);
+
+ // Write chunks
+ for (idx, chunk) in data.chunks(CHUNK_SIZE).enumerate() {
+ let chunk_key = keys::CompressedDbDataChunkKey {
+ db_name_segment: key_packed.clone(),
+ chunk: idx,
+ };
+
+ tx.set(&subspace().pack(&chunk_key), chunk);
+ }
+
+ Ok(())
+ }
+ })
+ .await?;
+
+ Ok(())
+}
+
+fn subspace() -> fdb_util::Subspace {
+ fdb_util::Subspace::new(&(RIVET, SQLITE))
+}
diff --git a/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/mod.rs b/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/mod.rs
index 26bb86c57a..c75338adf8 100644
--- a/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/mod.rs
+++ b/packages/core/services/cluster/src/workflows/server/install/install_scripts/components/mod.rs
@@ -217,6 +217,38 @@ pub mod skopeo {
}
}
+pub mod go {
+ use indoc::indoc;
+
+ pub fn install() -> String {
+ indoc!(
+ r#"
+ wget https://go.dev/dl/go1.24.4.linux-amd64.tar.gz
+ sudo tar -C /usr/local -xzf go1.24.4.linux-amd64.tar.gz
+ export PATH=$PATH:/usr/local/go/bin
+ export PATH="$PATH:$(go env GOPATH)/bin"
+ "#
+ )
+ .to_string()
+ }
+}
+
+pub mod usql {
+ use indoc::indoc;
+
+ /// Requires go.
+ pub fn install() -> String {
+ indoc!(
+ r#"
+ apt install -y gcc
+
+ go install github.com/xo/usql@latest
+ "#
+ )
+ .to_string()
+ }
+}
+
pub mod umoci {
use indoc::indoc;
diff --git a/packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs b/packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs
index e95e6e1617..4a373d06e4 100644
--- a/packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs
+++ b/packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs
@@ -72,6 +72,8 @@ pub async fn gen_install(
script.push(components::fdb::install(initialize_immediately));
}
PoolType::Worker => {
+ script.push(components::go::install());
+ script.push(components::usql::install());
script.push(components::otel_collector::install(pool_type)?);
script.push(components::rivet::worker::install(config)?);
}
diff --git a/packages/edge/services/pegboard/src/workflows/actor/mod.rs b/packages/edge/services/pegboard/src/workflows/actor/mod.rs
index f3b0e99ccd..2c82f7ec9c 100644
--- a/packages/edge/services/pegboard/src/workflows/actor/mod.rs
+++ b/packages/edge/services/pegboard/src/workflows/actor/mod.rs
@@ -165,7 +165,6 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResul
runtime::State::new(res.client_id, res.client_workflow_id, input.image_id),
|ctx, state| {
let input = input.clone();
- let meta = initial_actor_setup.meta.clone();
async move {
let sig = if let Some(drain_timeout_ts) = state.drain_timeout_ts {
diff --git a/packages/edge/services/pegboard/src/workflows/actor/runtime.rs b/packages/edge/services/pegboard/src/workflows/actor/runtime.rs
index b2915be492..da9481b9f2 100644
--- a/packages/edge/services/pegboard/src/workflows/actor/runtime.rs
+++ b/packages/edge/services/pegboard/src/workflows/actor/runtime.rs
@@ -617,6 +617,19 @@ pub async fn insert_ports_fdb(ctx: &ActivityCtx, input: &InsertPortsFdbInput) ->
Ok(())
}
+#[derive(Debug, Serialize, Deserialize, Hash)]
+struct CompareRetryInput {
+ last_retry_ts: i64,
+}
+
+#[activity(CompareRetry)]
+async fn compare_retry(ctx: &ActivityCtx, input: &CompareRetryInput) -> GlobalResult<(i64, bool)> {
+ let now = util::timestamp::now();
+
+ // If the last retry ts is more than RETRY_RESET_DURATION_MS, reset retry count
+ Ok((now, input.last_retry_ts < now - RETRY_RESET_DURATION_MS))
+}
+
/// Returns whether or not there was availability to spawn the actor.
pub async fn spawn_actor(
ctx: &mut WorkflowCtx,
@@ -790,9 +803,11 @@ pub async fn reschedule_actor(
let mut backoff =
util::Backoff::new_at(8, None, BASE_RETRY_TIMEOUT_MS, 500, state.retry_count);
- // If the last retry ts is more than RETRY_RESET_DURATION_MS, reset retry count to 0
- let now = util::timestamp::now();
- state.retry_count = if state.last_retry_ts < now - RETRY_RESET_DURATION_MS {
+ let (now, reset) = ctx.v(2).activity(CompareRetryInput {
+ last_retry_ts: state.last_retry_ts,
+ }).await?;
+
+ state.retry_count = if reset {
0
} else {
state.retry_count + 1