diff --git a/Cargo.toml b/Cargo.toml index 48d2867f51..4602d54f4c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,14 +76,6 @@ default-features = false git = "https://github.com/rivet-gg/serde_array_query" rev = "b9f8bfa" -[workspace.dependencies.deno_core] -git = "https://github.com/rivet-gg/deno_core" -rev = "8a313913fa73d58f4f9532565b0084e723bc34ad" - -[workspace.dependencies.deno_runtime] -git = "https://github.com/rivet-gg/deno" -rev = "a6903d67063e07b82836399f63c7a0fa5be8bf56" - [workspace.dependencies.api-helper] path = "packages/common/api-helper/build" diff --git a/examples/system-test-actor/src/managerClient.ts b/examples/system-test-actor/src/managerClient.ts index cc86b1b0c8..62fbdcebac 100644 --- a/examples/system-test-actor/src/managerClient.ts +++ b/examples/system-test-actor/src/managerClient.ts @@ -1,6 +1,7 @@ import * as net from "net"; import * as fs from "fs"; import { setInterval, clearInterval } from "timers"; +import * as util from "util"; export function connectToManager() { const socketPath = process.env.RIVET_MANAGER_SOCKET_PATH; @@ -31,8 +32,9 @@ export function connectToManager() { client.on("data", (data) => { const packets = decodeFrames(data); - packets.forEach((packet) => { - console.log("Received packet from manager:", packet); + + for (let packet of packets) { + console.log("Received packet from manager:", util.inspect(packet, { depth: null })); if (packet.start_actor) { const response = { @@ -45,6 +47,41 @@ export function connectToManager() { }, }; client.write(encodeFrame(response)); + + const kvMessage = { + kv: { + actor_id: packet.start_actor.actor_id, + generation: packet.start_actor.generation, + request_id: 1, + data: { + put: { + keys: [ + [[1, 2, 3], [4, 5, 6]], + ], + values: [ + [11, 12, 13, 14, 15, 16] + ], + } + } + } + }; + client.write(encodeFrame(kvMessage)); + + const kvMessage2 = { + kv: { + actor_id: packet.start_actor.actor_id, + generation: packet.start_actor.generation, + request_id: 2, + data: { + get: { + keys: [ + [[1, 2, 3], [4, 5, 6]] + ], + } + } + } + }; + client.write(encodeFrame(kvMessage2)); } else if (packet.signal_actor) { const response = { actor_state_update: { @@ -59,7 +96,7 @@ export function connectToManager() { }; client.write(encodeFrame(response)); } - }); + } }); client.on("error", (error) => { @@ -98,7 +135,7 @@ function decodeFrames(buffer: Buffer): any[] { offset += 4; if (buffer.length - offset < payloadLength) break; // Incomplete frame data - const json = buffer.slice(offset, offset + payloadLength).toString(); + const json = buffer.subarray(offset, offset + payloadLength).toString(); packets.push(JSON.parse(json)); offset += payloadLength; } diff --git a/packages/common/fdb-util/src/codes.rs b/packages/common/fdb-util/src/codes.rs index 3b2e05e5e4..5ea58c2a58 100644 --- a/packages/common/fdb-util/src/codes.rs +++ b/packages/common/fdb-util/src/codes.rs @@ -1,3 +1,8 @@ +// === Copied from foundationdbrs === +pub const NIL: u8 = 0x00; +pub const NESTED: u8 = 0x05; +pub const ESCAPE: u8 = 0xff; + // FDB defines a range (0x40-0x4f) of user type codes for use with its tuple encoding system. // https://github.com/apple/foundationdb/blob/main/design/tuple.md#user-type-codes diff --git a/packages/edge/infra/client/actor-kv/Cargo.toml b/packages/edge/infra/client/actor-kv/Cargo.toml index b54bb58e19..23d54865d3 100644 --- a/packages/edge/infra/client/actor-kv/Cargo.toml +++ b/packages/edge/infra/client/actor-kv/Cargo.toml @@ -7,12 +7,12 @@ license = "Apache-2.0" [dependencies] anyhow.workspace = true -deno_core.workspace = true fdb-util.workspace = true foundationdb.workspace = true futures-util = { version = "0.3" } indexmap = { version = "2.0" } prost = "0.13.3" +rivet-util-id.workspace = true serde = { version = "1.0.195", features = ["derive"] } serde_json = "1.0.111" tokio-tungstenite = "0.23.1" diff --git a/packages/edge/infra/client/actor-kv/src/entry.rs b/packages/edge/infra/client/actor-kv/src/entry.rs index e78d2b6edb..d9ffce45df 100644 --- a/packages/edge/infra/client/actor-kv/src/entry.rs +++ b/packages/edge/infra/client/actor-kv/src/entry.rs @@ -1,7 +1,7 @@ use anyhow::*; use foundationdb as fdb; use prost::Message; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use crate::{key::Key, metadata::Metadata}; @@ -49,7 +49,7 @@ impl EntryBuilder { } /// Represents a Rivet KV value. -#[derive(Serialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Entry { pub metadata: Metadata, pub value: Vec, diff --git a/packages/edge/infra/client/actor-kv/src/key.rs b/packages/edge/infra/client/actor-kv/src/key.rs index f65abdf538..1f955f51f8 100644 --- a/packages/edge/infra/client/actor-kv/src/key.rs +++ b/packages/edge/infra/client/actor-kv/src/key.rs @@ -1,18 +1,11 @@ -use deno_core::JsBuffer; use foundationdb::tuple::{ - Bytes, PackError, PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset, + Bytes, PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset, }; -use serde::Deserialize; +use serde::{Serialize, Deserialize}; // TODO: Custom deser impl that uses arrays instead of objects? -#[derive(Clone, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum Key { - /// Contains references to v8-owned buffers. Requires no copies. - JsInKey(Vec), - /// Cant use `ToJsBuffer` because of its API, so it gets converted to ToJsBuffer in the KV ext. - JsOutKey(Vec>), -} +#[derive(Clone, Serialize, Deserialize)] +pub struct Key(Vec>); impl std::fmt::Debug for Key { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -22,15 +15,7 @@ impl std::fmt::Debug for Key { impl PartialEq for Key { fn eq(&self, other: &Self) -> bool { - match (self, other) { - (Key::JsInKey(a), Key::JsInKey(b)) => a - .iter() - .map(|x| x.as_ref()) - .eq(b.iter().map(|x| x.as_ref())), - (Key::JsOutKey(a), Key::JsOutKey(b)) => a == b, - (Key::JsInKey(a), Key::JsOutKey(b)) => a.iter().map(|x| x.as_ref()).eq(b.iter()), - (Key::JsOutKey(a), Key::JsInKey(b)) => a.iter().eq(b.iter().map(|x| x.as_ref())), - } + self.0 == other.0 } } @@ -38,33 +23,16 @@ impl Eq for Key {} impl std::hash::Hash for Key { fn hash(&self, state: &mut H) { - match self { - Key::JsInKey(js_in_key) => { - for buffer in js_in_key { - state.write(buffer.as_ref()); - } - } - Key::JsOutKey(out_key) => { - for buffer in out_key { - state.write(buffer); - } - } + for buffer in &self.0 { + state.write(buffer); } } } impl Key { pub fn len(&self) -> usize { - match self { - Key::JsInKey(js_in_key) => { - // Arbitrary 4 accounting for nesting overhead - js_in_key.iter().fold(0, |acc, x| acc + x.len()) + 4 * js_in_key.len() - } - Key::JsOutKey(out_key) => { - // Arbitrary 4 accounting for nesting overhead - out_key.iter().fold(0, |acc, x| acc + x.len()) + 4 * out_key.len() - } - } + // Arbitrary 4 accounting for nesting overhead + self.0.iter().fold(0, |acc, x| acc + x.len()) + 4 * self.0.len() } } @@ -74,30 +42,25 @@ impl TuplePack for Key { w: &mut W, tuple_depth: TupleDepth, ) -> std::io::Result { - match self { - Key::JsInKey(tuple) => { - let mut offset = VersionstampOffset::None { size: 0 }; + let mut offset = VersionstampOffset::None { size: 0 }; - w.write_all(&[NESTED])?; - offset += 1; + w.write_all(&[fdb_util::codes::NESTED])?; + offset += 1; - for v in tuple.iter() { - offset += v.as_ref().pack(w, tuple_depth.increment())?; - } + for v in self.0.iter() { + offset += v.pack(w, tuple_depth.increment())?; + } - w.write_all(&[NIL])?; - offset += 1; + w.write_all(&[fdb_util::codes::NIL])?; + offset += 1; - Ok(offset) - } - Key::JsOutKey(_) => unreachable!("should not be packing out keys"), - } + Ok(offset) } } impl<'de> TupleUnpack<'de> for Key { fn unpack(mut input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { - input = parse_code(input, NESTED)?; + input = fdb_util::parse_code(input, fdb_util::codes::NESTED)?; let mut vec = Vec::new(); while !is_end_of_tuple(input, true) { @@ -106,15 +69,21 @@ impl<'de> TupleUnpack<'de> for Key { vec.push(v.into_owned()); } - input = parse_code(input, NIL)?; + input = fdb_util::parse_code(input, fdb_util::codes::NIL)?; - Ok((input, Key::JsOutKey(vec))) + Ok((input, Key(vec))) } } -/// Same as Key::JsInKey except when packing, it leaves off the NIL byte to allow for an open range. -#[derive(Deserialize)] -pub struct ListKey(Vec); +/// Same as Key: except when packing, it leaves off the NIL byte to allow for an open range. +#[derive(Clone, Serialize, Deserialize)] +pub struct ListKey(Vec>); + +impl std::fmt::Debug for ListKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "ListKey({})", self.len()) + } +} impl TuplePack for ListKey { fn pack( @@ -124,11 +93,11 @@ impl TuplePack for ListKey { ) -> std::io::Result { let mut offset = VersionstampOffset::None { size: 0 }; - w.write_all(&[NESTED])?; + w.write_all(&[fdb_util::codes::NESTED])?; offset += 1; - for v in self.0.iter() { - offset += v.as_ref().pack(w, tuple_depth.increment())?; + for v in &self.0 { + offset += v.pack(w, tuple_depth.increment())?; } // No ending NIL byte compared to `Key::pack` @@ -144,37 +113,11 @@ impl ListKey { } } -// === Copied from foundationdbrs === -const NIL: u8 = 0x00; -const NESTED: u8 = 0x05; -const ESCAPE: u8 = 0xff; - -#[inline] -fn parse_byte(input: &[u8]) -> PackResult<(&[u8], u8)> { - if input.is_empty() { - Err(PackError::MissingBytes) - } else { - Ok((&input[1..], input[0])) - } -} - -fn parse_code(input: &[u8], expected: u8) -> PackResult<&[u8]> { - let (input, found) = parse_byte(input)?; - if found == expected { - Ok(input) - } else { - Err(PackError::BadCode { - found, - expected: Some(expected), - }) - } -} - fn is_end_of_tuple(input: &[u8], nested: bool) -> bool { match input.first() { None => true, _ if !nested => false, - Some(&NIL) => Some(&ESCAPE) != input.get(1), + Some(&fdb_util::codes::NIL) => Some(&fdb_util::codes::ESCAPE) != input.get(1), _ => false, } } diff --git a/packages/edge/infra/client/actor-kv/src/lib.rs b/packages/edge/infra/client/actor-kv/src/lib.rs index 5a0bf1bf17..f6165842f3 100644 --- a/packages/edge/infra/client/actor-kv/src/lib.rs +++ b/packages/edge/infra/client/actor-kv/src/lib.rs @@ -5,20 +5,19 @@ use std::{ }; use anyhow::*; -use deno_core::JsBuffer; pub use entry::Entry; use entry::{EntryBuilder, SubKey}; use fdb_util::keys::*; use foundationdb::{self as fdb, directory::Directory, tuple::Subspace}; use futures_util::{StreamExt, TryStreamExt}; use indexmap::IndexMap; -use key::Key; +pub use key::Key; use list_query::ListLimitReached; pub use list_query::ListQuery; pub use metadata::Metadata; use prost::Message; +use tokio::sync::Mutex; use utils::{validate_entries, validate_keys, TransactionExt}; -use uuid::Uuid; mod entry; pub mod key; @@ -33,29 +32,35 @@ const MAX_PUT_PAYLOAD_SIZE: usize = 976 * 1024; const MAX_STORAGE_SIZE: usize = 1024 * 1024 * 1024; // 1 GiB const VALUE_CHUNK_SIZE: usize = 10_000; // 10 KB, not KiB, see https://apple.github.io/foundationdb/blob.html -// Currently designed largely around the Deno runtime. More abstractions can be made later. pub struct ActorKv { version: &'static str, db: Arc, - actor_id: Uuid, - subspace: Option, + actor_id: rivet_util_id::Id, + subspace: Mutex>, } impl ActorKv { - pub fn new(db: Arc, actor_id: Uuid) -> Self { + pub fn new(db: Arc, actor_id: rivet_util_id::Id) -> Self { Self { version: env!("CARGO_PKG_VERSION"), db, actor_id, - subspace: None, + subspace: Mutex::new(None), } } - /// Initializes actor's KV. + /// Initializes actor's KV directory. /// /// If FDB is down, this will hang indefinitely until connected. - pub async fn init(&mut self) -> Result<()> { - tracing::info!("initializing actor KV"); + async fn init(&self) -> Result { + let mut guard = self.subspace.lock().await; + + // Already initialized + if let Some(subspace) = &*guard { + return Ok(subspace.clone()); + } + + tracing::info!(actor_id=?self.actor_id, "initializing actor KV"); let root = fdb::directory::DirectoryLayer::default(); @@ -82,17 +87,16 @@ impl ActorKv { .await .map_err(|err| anyhow!("failed to commit actor kv txn: {err:?}"))?; - self.subspace = Some(Subspace::from_bytes( - kv_dir.bytes().map_err(|err| anyhow!("{err:?}"))?, - )); + let subspace = Subspace::from_bytes(kv_dir.bytes().map_err(|err| anyhow!("{err:?}"))?); + *guard = Some(subspace.clone()); - tracing::info!("successfully initialized KV"); + tracing::info!(actor_id=?self.actor_id, "successfully initialized KV"); - Ok(()) + Ok(subspace) } /// Returns estimated size of the given subspace. - pub async fn get_subspace_size(&self, subspace: &Subspace) -> Result { + async fn get_subspace_size(&self, subspace: &Subspace) -> Result { let (start, end) = subspace.range(); // This txn does not have to be committed because we are not modifying any data @@ -104,10 +108,7 @@ impl ActorKv { /// Gets keys from the KV store. pub async fn get(&self, keys: Vec) -> Result> { - let subspace = self - .subspace - .as_ref() - .context("must call `ActorKv::init` before using KV operations")?; + let subspace = &self.init().await?; validate_keys(&keys)?; @@ -187,10 +188,7 @@ impl ActorKv { reverse: bool, limit: Option, ) -> Result> { - let subspace = self - .subspace - .as_ref() - .context("must call `ActorKv::init` before using KV operations")?; + let subspace = &self.init().await?; query.validate()?; @@ -305,18 +303,15 @@ impl ActorKv { } /// Puts keys into the KV store. - pub async fn put(&self, entries: HashMap) -> Result<()> { - let subspace = self - .subspace - .as_ref() - .context("must call `ActorKv::init` before using KV operations")?; + pub async fn put(&self, entries: Vec<(Key, Vec)>) -> Result<()> { + let subspace = &self.init().await?; let total_size = self.get_subspace_size(subspace).await? as usize; validate_entries(&entries, total_size)?; self.db .run(|tx, _mc| { - // TODO: Potentially costly clone + // TODO: Costly clone let entries = entries.clone(); let subspace = subspace.clone(); @@ -342,7 +337,7 @@ impl ActorKv { // Set metadata tx.set(&key_subspace.pack(&METADATA), &buf); - // Set data + // Set key data in chunks for start in (0..value.len()).step_by(VALUE_CHUNK_SIZE) { let idx = start / VALUE_CHUNK_SIZE; let end = (start + VALUE_CHUNK_SIZE).min(value.len()); @@ -367,12 +362,9 @@ impl ActorKv { .map_err(Into::into) } - /// Deletes keys from the KV store. + /// Deletes keys from the KV store. Cannot be undone. pub async fn delete(&self, keys: Vec) -> Result<()> { - let subspace = self - .subspace - .as_ref() - .context("must call `ActorKv::init` before using KV operations")?; + let subspace = &self.init().await?; validate_keys(&keys)?; @@ -393,12 +385,9 @@ impl ActorKv { .map_err(Into::into) } - /// Deletes all keys from the KV store. + /// Deletes all keys from the KV store. Cannot be undone. pub async fn delete_all(&self) -> Result<()> { - let subspace = self - .subspace - .as_ref() - .context("must call `ActorKv::init` before using KV operations")?; + let subspace = &self.init().await?; self.db .run(|tx, _mc| async move { diff --git a/packages/edge/infra/client/actor-kv/src/list_query.rs b/packages/edge/infra/client/actor-kv/src/list_query.rs index f070d3006a..20abd04e55 100644 --- a/packages/edge/infra/client/actor-kv/src/list_query.rs +++ b/packages/edge/infra/client/actor-kv/src/list_query.rs @@ -1,7 +1,7 @@ use anyhow::*; use foundationdb::tuple::Subspace; use indexmap::IndexMap; -use serde::Deserialize; +use serde::{Serialize, Deserialize}; use crate::{ entry::EntryBuilder, @@ -9,8 +9,8 @@ use crate::{ MAX_KEY_SIZE, }; -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] pub enum ListQuery { All, RangeInclusive(ListKey, Key), diff --git a/packages/edge/infra/client/actor-kv/src/metadata.rs b/packages/edge/infra/client/actor-kv/src/metadata.rs index 3c8863b35a..a1af22fea6 100644 --- a/packages/edge/infra/client/actor-kv/src/metadata.rs +++ b/packages/edge/infra/client/actor-kv/src/metadata.rs @@ -1,6 +1,6 @@ -use serde::Serialize; +use serde::{Deserialize, Serialize}; -#[derive(Clone, PartialEq, ::prost::Message, Serialize)] +#[derive(Clone, PartialEq, ::prost::Message, Deserialize, Serialize)] pub struct Metadata { #[prost(bytes = "vec", tag = "1")] pub kv_version: Vec, diff --git a/packages/edge/infra/client/actor-kv/src/utils.rs b/packages/edge/infra/client/actor-kv/src/utils.rs index ca87a3ffc4..2fd4274fb9 100644 --- a/packages/edge/infra/client/actor-kv/src/utils.rs +++ b/packages/edge/infra/client/actor-kv/src/utils.rs @@ -1,7 +1,6 @@ use std::{collections::HashMap, result::Result::Ok}; use anyhow::*; -use deno_core::JsBuffer; use foundationdb as fdb; use futures_util::{FutureExt, TryStreamExt}; @@ -90,7 +89,7 @@ pub fn validate_keys(keys: &[Key]) -> Result<()> { Ok(()) } -pub fn validate_entries(entries: &HashMap, total_size: usize) -> Result<()> { +pub fn validate_entries(entries: &Vec<(Key, Vec)>, total_size: usize) -> Result<()> { ensure!( entries.len() <= MAX_KEYS, "A maximum of 128 key-value entries is allowed" diff --git a/packages/edge/infra/client/config/Cargo.toml b/packages/edge/infra/client/config/Cargo.toml index 58b6f3c684..caffc30362 100644 --- a/packages/edge/infra/client/config/Cargo.toml +++ b/packages/edge/infra/client/config/Cargo.toml @@ -7,9 +7,11 @@ edition.workspace = true [dependencies] anyhow = "1.0" +indexmap = { version = "2.0" } ipnet = { version = "2.10.1", features = ["serde"] } pegboard.workspace = true -rivet-util.workspace = true +pegboard-actor-kv.workspace = true +rivet-util-id.workspace = true schemars = { version = "0.8.21", features = ["url", "uuid1"] } serde = { version = "1.0.195", features = ["derive"] } serde_json = "1.0" diff --git a/packages/edge/infra/client/config/src/runner_protocol.rs b/packages/edge/infra/client/config/src/runner_protocol.rs index afb92980cc..0750315ef0 100644 --- a/packages/edge/infra/client/config/src/runner_protocol.rs +++ b/packages/edge/infra/client/config/src/runner_protocol.rs @@ -1,19 +1,23 @@ -use std::io::{Write, Cursor}; +use std::{ + io::{Cursor, Write}, +}; use anyhow::*; use pegboard::protocol; -use tokio_util::codec::LengthDelimitedCodec; +use pegboard_actor_kv as kv; use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use tokio_util::codec::LengthDelimitedCodec; #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "snake_case", deny_unknown_fields)] pub enum ToManager { ActorStateUpdate { - actor_id: rivet_util::Id, + actor_id: rivet_util_id::Id, generation: u32, state: ActorState, }, Ping, + Kv(KvRequest), } #[derive(Debug, Serialize, Deserialize)] @@ -25,17 +29,18 @@ pub enum ToRunner { }, StartActor { - actor_id: rivet_util::Id, + actor_id: rivet_util_id::Id, generation: u32, env: protocol::HashableMap, metadata: protocol::Raw, }, SignalActor { - actor_id: rivet_util::Id, + actor_id: rivet_util_id::Id, generation: u32, signal: i32, persist_storage: bool, }, + Kv(KvResponse), } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -45,6 +50,71 @@ pub enum ActorState { Exited { exit_code: Option }, } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct KvRequest { + pub actor_id: rivet_util_id::Id, + // TODO: This shouldn't require generation since all gens share the same kv + pub generation: u32, + /// Deduplication id. + pub request_id: u32, + pub data: KvRequestData, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case", deny_unknown_fields)] +pub enum KvRequestData { + Get { + keys: Vec, + }, + List { + query: kv::ListQuery, + reverse: bool, + limit: Option, + }, + Put { + keys: Vec, + values: Vec>, + }, + Delete { + keys: Vec, + }, + Drop {}, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct KvResponse { + /// Deduplication id. + pub request_id: u32, + pub data: Option, + pub error: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "snake_case", deny_unknown_fields)] +pub enum KvResponseData { + Get { + keys: Vec, + values: Vec, + }, + List { + keys: Vec, + values: Vec, + }, + Put {}, + Delete {}, + Drop {}, +} + +// Small subset of the ToRunner enum that gets proxied to the actor +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case", deny_unknown_fields)] +pub enum ToActor { + StateUpdate { + state: ActorState, + }, + Kv(KvRequest), +} + pub fn codec() -> LengthDelimitedCodec { LengthDelimitedCodec::builder() .length_field_type::() diff --git a/packages/edge/infra/client/isolate-v8-runner/src/main.rs b/packages/edge/infra/client/isolate-v8-runner/src/main.rs deleted file mode 100644 index 3272f7b971..0000000000 --- a/packages/edge/infra/client/isolate-v8-runner/src/main.rs +++ /dev/null @@ -1,336 +0,0 @@ -use std::{ - collections::HashMap, - path::Path, - result::Result::{Err, Ok}, - sync::Arc, - thread::JoinHandle, - time::Duration, -}; - -use anyhow::*; -use deno_core::{v8_set_flags, JsRuntime}; -use deno_runtime::worker::MainWorkerTerminateHandle; -use futures_util::{stream::SplitStream, SinkExt, StreamExt}; -use pegboard_actor_kv::ActorKv; -use pegboard_config::{isolate_runner::Config, runner_protocol}; -use tokio::{ - fs, - net::TcpStream, - sync::{mpsc, watch, RwLock}, -}; -use tokio_tungstenite::{tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream}; -use utils::FdbPool; -use uuid::Uuid; - -mod ext; -mod isolate; -mod log_shipper; -mod metadata; -mod throttle; -mod utils; - -enum Packet { - Msg(runner_protocol::ToRunner), - Pong, - None, -} - -/// Manager port to connect to. -const THREAD_STATUS_POLL_INTERVAL: Duration = Duration::from_millis(500); -const PING_INTERVAL: Duration = Duration::from_secs(1); -// 7 day logs retention -const LOGS_RETENTION: Duration = Duration::from_secs(7 * 24 * 60 * 60); - -fn main() -> Result<()> { - rivet_runtime::run(main_inner()).transpose()?; - Ok(()) -} - -async fn main_inner() -> Result<()> { - // Initialize with a default CryptoProvider for rustls - let provider = rustls::crypto::ring::default_provider(); - provider - .install_default() - .expect("Failed to install crypto provider"); - - let working_path = std::env::args() - .skip(1) - .next() - .context("`working_path` arg required")?; - let working_path = Path::new(&working_path); - - rivet_logs::Logs::new(working_path.join("logs"), LOGS_RETENTION) - .start() - .await?; - - let config_data = fs::read_to_string(working_path.join("config.json")).await?; - let config = serde_json::from_str::(&config_data)?; - - let fdb_pool = utils::setup_fdb_pool(&config).await?; - - tracing::info!(pid=%std::process::id(), "starting"); - - // Write PID to file - fs::write( - working_path.join("pid"), - std::process::id().to_string().as_bytes(), - ) - .await?; - - // Set v8 flags (https://chromium.googlesource.com/v8/v8/+/refs/heads/main/src/flags/flag-definitions.h) - let invalid = v8_set_flags(vec![ - // Binary name - "UNUSED_BUT_NECESSARY_ARG0".into(), - // Disable eval - "--disallow-code-generation-from-strings".into(), - ]); - assert!( - invalid.len() == 1, - "v8 did not understand these flags: {:?}", - invalid.into_iter().skip(1).collect::>(), - ); - - // Explicitly start runtime on current thread - JsRuntime::init_platform(None, false); - - let actors = Arc::new(RwLock::new(HashMap::new())); - let (fatal_tx, mut fatal_rx) = watch::channel(()); - - let res = tokio::select! { - res = retry_connection(&config, &fdb_pool, actors, fatal_tx) => res, - // If any fatal error occurs in the isolate threads, kill the entire program - _ = fatal_rx.changed() => Err(anyhow!("Fatal error")), - }; - - // Write exit code - if let Err(err) = &res { - tracing::error!(?err); - - fs::write(working_path.join("exit-code"), 1.to_string().as_bytes()).await?; - } - - res -} - -async fn retry_connection( - config: &Config, - fdb_pool: &FdbPool, - actors: Arc>>>, - fatal_tx: watch::Sender<()>, -) -> Result<()> { - loop { - use std::result::Result::{Err, Ok}; - match tokio_tungstenite::connect_async(format!("ws://{}", config.manager_ws_addr)).await { - Ok((socket, _)) => { - handle_connection(config, fdb_pool, actors.clone(), fatal_tx.clone(), socket) - .await? - } - Err(err) => tracing::error!("Failed to connect: {err}"), - } - - tracing::info!("Retrying connection"); - std::thread::sleep(Duration::from_secs(1)); - } -} - -async fn handle_connection( - config: &Config, - fdb_pool: &FdbPool, - actors: Arc>>>, - fatal_tx: watch::Sender<()>, - socket: WebSocketStream>, -) -> Result<()> { - tracing::info!("Connected"); - - let (mut tx, mut rx) = socket.split(); - - // NOTE: Currently, the error from the ping thread is not caught but we assume error handling elsewhere - // will catch any connection issues. - // Start ping thread - let _: tokio::task::JoinHandle> = tokio::spawn(async move { - loop { - tokio::time::sleep(PING_INTERVAL).await; - tx.send(Message::Ping(Vec::new())).await?; - } - }); - - loop { - let packet = match read_packet(&mut rx).await? { - Packet::Msg(packet) => packet, - Packet::Pong => continue, - Packet::None => return Ok(()), - }; - - match packet { - runner_protocol::ToRunner::Start { - actor_id, - generation, - } => { - let mut guard = actors.write().await; - - if guard.contains_key(&(actor_id, generation)) { - tracing::error!( - "Actor {actor_id}-{generation} already exists, ignoring new start packet" - ); - } else { - // For receiving the terminate handle from the isolate thread - let (terminate_tx, terminate_rx) = - mpsc::channel::(1); - let (signal_tx, signal_rx) = mpsc::channel(1); - - // Store actor signal sender - guard.insert((actor_id, generation), signal_tx); - drop(guard); - - // Spawn a new thread for the isolate - let config2 = config.clone(); - let fdb_pool2 = fdb_pool.clone(); - let handle = std::thread::Builder::new() - .name(format!("{actor_id}-{generation}")) - .spawn(move || { - isolate::run(config2, fdb_pool2, actor_id, generation, terminate_tx) - })?; - - tokio::task::spawn(watch_thread( - fdb_pool.clone(), - actors.clone(), - fatal_tx.clone(), - actor_id, - generation, - terminate_rx, - signal_rx, - handle, - )); - } - } - runner_protocol::ToRunner::Signal { - actor_id, - generation, - signal, - persist_storage, - } => { - if let Some(signal_tx) = actors.read().await.get(&(actor_id, generation)) { - // Tell actor thread to stop. Removing the actor is handled in the tokio task above. - signal_tx - .try_send((signal, persist_storage)) - .context("failed to send stop signal to actor thread watcher")?; - } else { - tracing::warn!("Actor {actor_id}-{generation} not found for stopping"); - } - } - runner_protocol::ToRunner::Terminate => bail!("Received terminate"), - } - } -} - -async fn read_packet( - socket: &mut SplitStream>>, -) -> Result { - let buf = match socket.next().await { - Some(Ok(Message::Binary(buf))) => buf, - Some(Ok(Message::Close(_))) => { - tracing::error!("Connection closed"); - return Ok(Packet::None); - } - Some(Ok(Message::Pong(_))) => { - tracing::trace!("received pong"); - return Ok(Packet::Pong); - } - Some(Ok(msg)) => bail!("unexpected message: {msg:?}"), - Some(Err(err)) => { - tracing::error!("Connection failed: {err}"); - return Ok(Packet::None); - } - None => { - tracing::error!("Stream closed"); - return Ok(Packet::None); - } - }; - - let packet = serde_json::from_slice(&buf)?; - - Ok(Packet::Msg(packet)) -} - -/// Polls the isolate thread we just spawned to see if it errored. Should handle all errors gracefully. -async fn watch_thread( - fdb_pool: FdbPool, - actors: Arc>>>, - fatal_tx: watch::Sender<()>, - actor_id: Uuid, - generation: u32, - mut terminate_rx: mpsc::Receiver, - mut signal_rx: mpsc::Receiver<(i32, bool)>, - handle: JoinHandle>, -) { - // Await terminate handle. If the transmitting end of the terminate handle was dropped (`recv` returned - // `None`), either the worker failed to create or the thread stopped. The latter is handled later - let terminate_handle = terminate_rx.recv().await; - drop(terminate_rx); - - // Wait for either the thread to stop or a signal to be received - let persist_storage = tokio::select! { - biased; - _ = poll_thread(&handle) => true, - res = signal_rx.recv() => { - let Some((_signal, persist_storage)) = res else { - tracing::error!(?actor_id, ?generation, "failed to receive signal"); - fatal_tx.send(()).expect("receiver cannot be dropped"); - return; - }; - - if let Some(terminate_handle) = terminate_handle { - // Currently, we terminate regardless of what the signal is - terminate_handle.terminate(); - } - - persist_storage - } - }; - - // Remove actor - { - actors.write().await.remove(&(actor_id, generation)); - } - - // Remove state - if !persist_storage { - if let Err(err) = ActorKv::new((&*fdb_pool).clone(), actor_id).destroy().await { - tracing::error!(?err, ?actor_id, "failed to destroy actor kv"); - fatal_tx.send(()).expect("receiver cannot be dropped"); - return; - }; - } - - // Cleanup thread - poll_thread(&handle).await; - cleanup_thread(actor_id, generation, handle, &fatal_tx); -} - -async fn poll_thread(handle: &JoinHandle>) { - loop { - if handle.is_finished() { - return; - } - - tokio::time::sleep(THREAD_STATUS_POLL_INTERVAL).await; - } -} - -fn cleanup_thread( - actor_id: Uuid, - generation: u32, - handle: JoinHandle>, - fatal_tx: &watch::Sender<()>, -) { - let res = handle.join(); - - match res { - Ok(Err(err)) => { - tracing::error!(?actor_id, ?generation, "Isolate thread failed:\n{err:?}"); - fatal_tx.send(()).expect("receiver cannot be dropped") - } - Err(_) => fatal_tx.send(()).expect("receiver cannot be dropped"), - _ => {} - } -} diff --git a/packages/edge/infra/client/manager/Cargo.toml b/packages/edge/infra/client/manager/Cargo.toml index 240d290d97..ae7ed5b196 100644 --- a/packages/edge/infra/client/manager/Cargo.toml +++ b/packages/edge/infra/client/manager/Cargo.toml @@ -16,13 +16,16 @@ test = [] [dependencies] anyhow.workspace = true bytes = "1.0" +foundationdb.workspace = true futures-util = { version = "0.3" } +fdb-util.workspace = true hyper = { version = "0.14", features = ["server", "http1", "tcp", "stream"] } indoc = "2.0" json5.workspace = true lazy_static = "1.4" nix.workspace = true notify = { version = "6.1.1", default-features = false, features = ["serde", "fsevent-sys"] } +pegboard-actor-kv.workspace = true pegboard-config.workspace = true pegboard.workspace = true prometheus = "0.13" diff --git a/packages/edge/infra/client/manager/src/actor/mod.rs b/packages/edge/infra/client/manager/src/actor/mod.rs index f8fcac6d84..80d758171b 100644 --- a/packages/edge/infra/client/manager/src/actor/mod.rs +++ b/packages/edge/infra/client/manager/src/actor/mod.rs @@ -7,6 +7,7 @@ use anyhow::*; use indoc::indoc; use nix::{sys::signal::Signal, unistd::Pid}; use pegboard::protocol; +use pegboard_actor_kv as kv; use pegboard_config::runner_protocol; use crate::{ctx::Ctx, runner, utils}; @@ -16,10 +17,12 @@ pub struct Actor { generation: u32, config: protocol::ActorConfig, runner: Arc, + kv: kv::ActorKv, } impl Actor { pub fn new( + fdb: &utils::fdb::FdbPool, actor_id: rivet_util::Id, generation: u32, config: protocol::ActorConfig, @@ -30,6 +33,7 @@ impl Actor { generation, config, runner, + kv: kv::ActorKv::new((&**fdb).clone(), actor_id), }) } @@ -82,8 +86,8 @@ impl Actor { let ctx2 = ctx.clone(); tokio::spawn(async move { match self2.run(&ctx2).await { - Ok(observers) => { - if let Err(err) = self2.observe(&ctx2, observers).await { + Ok(observer) => { + if let Err(err) = self2.observe(&ctx2, observer).await { tracing::error!(actor_id=?self2.actor_id, ?err, "observe failed"); } } @@ -101,18 +105,12 @@ impl Actor { Ok(()) } - async fn run(self: &Arc, ctx: &Arc) -> Result> { + async fn run(self: &Arc, ctx: &Arc) -> Result { tracing::info!(actor_id=?self.actor_id, generation=?self.generation, "running"); - // NOTE: Create actor observer before sending the start actor message to prevent a race + // NOTE: Create actor proxy before sending the start actor message to prevent a race // condition - let actor_observer = match self.runner.config().image.allocation_type { - protocol::ImageAllocationType::Single => None, - protocol::ImageAllocationType::Multi => Some( - self.runner - .new_actor_observer(self.actor_id, self.generation), - ), - }; + let actor_proxy = self.runner.new_actor_proxy(self.actor_id, self.generation); match self .config @@ -166,42 +164,120 @@ impl Actor { } } - Ok(actor_observer) + Ok(actor_proxy) } // Watch actor for updates pub(crate) async fn observe( &self, ctx: &Arc, - actor_observer: Option, + mut actor_proxy: runner::ActorProxy, ) -> Result<()> { tracing::info!(actor_id=?self.actor_id, generation=?self.generation, "observing"); - let exit_code = if let Some(mut actor_observer) = actor_observer { - loop { - tokio::select! { - // We have to check if the shared runner exited or if the actor exited - res = self.runner.observe(ctx, true) => break res?, - res = actor_observer.next() => match res { - Some(runner_protocol::ActorState::Running) => { - tracing::info!(actor_id=?self.actor_id, generation=?self.generation, "actor set to running"); - - let (pid, ports) = tokio::try_join!( - self.runner.pid(), - self.runner.ports(ctx), - )?; - - self.set_running(ctx, pid, ports).await?; - }, - Some(runner_protocol::ActorState::Exited { - exit_code, - }) => break exit_code, - None => break None, - }, + let exit_code = loop { + tokio::select! { + // We have to check if the shared runner exited or if the actor exited + res = self.runner.observe(ctx, true) => break res?, + res = actor_proxy.next() => { + let Some(res) = res else { + // Channel closed + break None; + }; + + match res { + runner_protocol::ToActor::StateUpdate { state } => { + match state { + runner_protocol::ActorState::Running => { + tracing::info!( + actor_id=?self.actor_id, + generation=?self.generation, + "actor set to running" + ); + + let (pid, ports) = tokio::try_join!( + self.runner.pid(), + self.runner.ports(ctx), + )?; + + self.set_running(ctx, pid, ports).await?; + }, + runner_protocol::ActorState::Exited { + exit_code, + } => break exit_code, + } + } + runner_protocol::ToActor::Kv(req) => { + // TODO: Add queue and bg thread for processing kv ops + // Run kv operation + match req.data { + runner_protocol::KvRequestData::Get { keys } => { + let res = self.kv.get(keys).await; + let error = res.as_ref().err().map(|x| x.to_string()); + + self.runner.send(&runner_protocol::ToRunner::Kv(runner_protocol::KvResponse { + request_id: req.request_id, + data: res.ok().map(|entries| { + let (keys, values) = entries.into_iter().unzip(); + runner_protocol::KvResponseData::Get { + keys, + values, + } + }), + error, + })).await?; + } + runner_protocol::KvRequestData::List { query, reverse, limit } => { + let res = self.kv.list(query, reverse, limit).await; + let error = res.as_ref().err().map(|x| x.to_string()); + + self.runner.send(&runner_protocol::ToRunner::Kv(runner_protocol::KvResponse { + request_id: req.request_id, + data: res.ok().map(|entries| { + let (keys, values) = entries.into_iter().unzip(); + runner_protocol::KvResponseData::List { + keys, + values, + } + }), + error, + })).await?; + } + runner_protocol::KvRequestData::Put { keys, values } => { + let res = self.kv.put(keys.into_iter().zip(values.into_iter()).collect()).await; + let error = res.as_ref().err().map(|x| x.to_string()); + + self.runner.send(&runner_protocol::ToRunner::Kv(runner_protocol::KvResponse { + request_id: req.request_id, + data: res.ok().map(|_| runner_protocol::KvResponseData::Put {}), + error, + })).await?; + } + runner_protocol::KvRequestData::Delete { keys } => { + let res = self.kv.delete(keys).await; + let error = res.as_ref().err().map(|x| x.to_string()); + + self.runner.send(&runner_protocol::ToRunner::Kv(runner_protocol::KvResponse { + request_id: req.request_id, + data: res.ok().map(|_| runner_protocol::KvResponseData::Delete {}), + error, + })).await?; + } + runner_protocol::KvRequestData::Drop { } => { + let res = self.kv.delete_all().await; + let error = res.as_ref().err().map(|x| x.to_string()); + + self.runner.send(&runner_protocol::ToRunner::Kv(runner_protocol::KvResponse { + request_id: req.request_id, + data: res.ok().map(|_| runner_protocol::KvResponseData::Drop {}), + error, + })).await?; + } + } + } + } } } - } else { - self.runner.observe(ctx, true).await? }; self.set_exit_code(ctx, exit_code).await?; diff --git a/packages/edge/infra/client/manager/src/ctx.rs b/packages/edge/infra/client/manager/src/ctx.rs index d39423f737..d6d5be8a5a 100644 --- a/packages/edge/infra/client/manager/src/ctx.rs +++ b/packages/edge/infra/client/manager/src/ctx.rs @@ -38,7 +38,7 @@ use crate::{ image_download_handler::ImageDownloadHandler, metrics, runner::{self, Runner}, - utils::{self, sql::SqlitePoolExt}, + utils::{self, fdb::FdbPool, sql::SqlitePoolExt}, }; const PING_INTERVAL: Duration = Duration::from_secs(1); @@ -82,10 +82,13 @@ pub struct Ctx { config: Config, system: SystemInfo, - // This requires a RwLock because of the reset functionality which reinitialized the entire database. It + // This requires a RwLock because of the reset functionality which reinitializes the entire database. It // should never be written to besides that. pool: RwLock, + fdb: FdbPool, + tx: Mutex>, Message>>, + event_sender: EventSender, pub(crate) image_download_handler: ImageDownloadHandler, @@ -98,6 +101,7 @@ impl Ctx { config: Config, system: SystemInfo, pool: SqlitePool, + fdb: FdbPool, tx: SplitSink>, Message>, ) -> Arc { Arc::new(Ctx { @@ -105,7 +109,10 @@ impl Ctx { system, pool: RwLock::new(pool), + fdb, + tx: Mutex::new(tx), + event_sender: EventSender::new(), image_download_handler: ImageDownloadHandler::new(), @@ -400,7 +407,7 @@ impl Ctx { ) .await? { - let actor = Actor::new(actor_id, generation, *config, runner); + let actor = Actor::new(&self.fdb, actor_id, generation, *config, runner); // Insert actor actors.insert((actor_id, generation), actor); @@ -773,7 +780,7 @@ impl Ctx { let mut runners_guard = self.runners.write().await; let mut actors_guard = self.actors.write().await; - // Start runner observers + // Start runner proxies for row in runner_rows { let Some(pid) = row.pid else { continue; @@ -837,23 +844,23 @@ impl Ctx { // NOTE: No runner sockets are connected yet so there is no race condition with missed state // updates here let generation = row.generation.try_into()?; - let actor_observer = if let protocol::ImageAllocationType::Multi = - runner.config().image.allocation_type - { - Some(runner.new_actor_observer(row.actor_id, generation)) - } else { - None - }; + let actor_proxy = runner.new_actor_proxy(row.actor_id, generation); let actor = actors_guard .entry((row.actor_id, generation)) - .or_insert(Actor::new(row.actor_id, generation, config, runner.clone())); + .or_insert(Actor::new( + &self.fdb, + row.actor_id, + generation, + config, + runner.clone(), + )); let actor_id = row.actor_id; let actor = actor.clone(); let self2 = self.clone(); tokio::spawn(async move { - if let Err(err) = actor.observe(&self2, actor_observer).await { + if let Err(err) = actor.observe(&self2, actor_proxy).await { tracing::error!(?actor_id, ?err, "observe failed"); } diff --git a/packages/edge/infra/client/manager/src/main.rs b/packages/edge/infra/client/manager/src/main.rs index cb9754f95b..e2381d593a 100644 --- a/packages/edge/infra/client/manager/src/main.rs +++ b/packages/edge/infra/client/manager/src/main.rs @@ -36,6 +36,7 @@ struct Init { config: Config, system: SystemInfo, pool: SqlitePool, + fdb: utils::fdb::FdbPool, } fn main() -> Result<()> { @@ -151,10 +152,14 @@ async fn init() -> Result { // Init sqlite db let pool = utils::init_sqlite_db(&config).await?; + // Init fdb pool handle + let fdb = utils::fdb::FdbPool::new(&config).await?; + Ok(Init { config, system, pool, + fdb, }) } @@ -187,7 +192,7 @@ async fn run(init: Init, first: bool) -> Result<()> { tracing::info!("connected to pegboard ws"); - let ctx = Ctx::new(init.config, init.system, init.pool, tx); + let ctx = Ctx::new(init.config, init.system, init.pool, init.fdb, tx); tokio::try_join!( async { metrics_task.await.map_err(Into::::into)? }, diff --git a/packages/edge/infra/client/manager/src/runner/mod.rs b/packages/edge/infra/client/manager/src/runner/mod.rs index 5257a1952c..4e400c1e1f 100644 --- a/packages/edge/infra/client/manager/src/runner/mod.rs +++ b/packages/edge/infra/client/manager/src/runner/mod.rs @@ -76,7 +76,8 @@ pub struct Runner { /// Used instead of polling loops for faster updates. bump_channel: broadcast::Sender<()>, - actor_observer_tx: broadcast::Sender<(rivet_util::Id, u32, runner_protocol::ActorState)>, + // TODO: replace with a single stream for each actor? + actor_proxy_tx: broadcast::Sender<(rivet_util::Id, u32, runner_protocol::ToActor)>, } impl Runner { @@ -87,7 +88,7 @@ impl Runner { config, pid: RwLock::new(None), bump_channel: broadcast::channel(2).0, - actor_observer_tx: broadcast::channel(16).0, + actor_proxy_tx: broadcast::channel(16).0, } } @@ -103,7 +104,7 @@ impl Runner { config, pid: RwLock::new(Some(pid)), bump_channel: broadcast::channel(1).0, - actor_observer_tx: broadcast::channel(16).0, + actor_proxy_tx: broadcast::channel(16).0, } } @@ -177,7 +178,8 @@ impl Runner { break Ok(()); }; - let (_, packet) = runner_protocol::decode_frame::(&buf?).context("failed to decode frame")?; + let (_, packet) = runner_protocol::decode_frame::(&buf?) + .context("failed to decode frame")?; tracing::debug!(?packet, "runner received packet"); @@ -197,12 +199,23 @@ impl Runner { } protocol::ImageAllocationType::Multi => { // NOTE: We don't have to verify if the actor id given here is valid because only valid actors - // are listening to this runner's `actor_observer_tx`. This means invalid messages are ignored. + // are listening to this runner's `actor_proxy_tx`. This means invalid messages are ignored. // NOTE: No receivers is not an error - let _ = self.actor_observer_tx.send((actor_id, generation, state)); + let _ = self.actor_proxy_tx.send(( + actor_id, + generation, + runner_protocol::ToActor::StateUpdate { state }, + )); } } } + runner_protocol::ToManager::Kv(req) => { + let _ = self.actor_proxy_tx.send(( + req.actor_id, + req.generation, + runner_protocol::ToActor::Kv(req), + )); + } } } } @@ -240,7 +253,8 @@ impl Runner { })??; let socket = guard.as_mut().expect("should exist"); - let buf = runner_protocol::encode_frame(packet).context("failed to encode frame")?; + let buf = + runner_protocol::encode_frame(packet).context("failed to encode frame")?; socket .send(buf.into()) .await @@ -454,8 +468,8 @@ impl Runner { Ok(exit_code) } - pub fn new_actor_observer(&self, actor_id: rivet_util::Id, generation: u32) -> ActorObserver { - ActorObserver::new(actor_id, generation, self.actor_observer_tx.subscribe()) + pub fn new_actor_proxy(&self, actor_id: rivet_util::Id, generation: u32) -> ActorProxy { + ActorProxy::new(actor_id, generation, self.actor_proxy_tx.subscribe()) } pub async fn signal(&self, ctx: &Ctx, signal: Signal) -> Result<()> { @@ -679,25 +693,25 @@ impl Comms { } } -pub struct ActorObserver { +pub struct ActorProxy { actor_id: rivet_util::Id, generation: u32, - sub: broadcast::Receiver<(rivet_util::Id, u32, runner_protocol::ActorState)>, + sub: broadcast::Receiver<(rivet_util::Id, u32, runner_protocol::ToActor)>, } -impl ActorObserver { +impl ActorProxy { fn new( actor_id: rivet_util::Id, generation: u32, - sub: broadcast::Receiver<(rivet_util::Id, u32, runner_protocol::ActorState)>, + sub: broadcast::Receiver<(rivet_util::Id, u32, runner_protocol::ToActor)>, ) -> Self { - ActorObserver { + ActorProxy { actor_id, generation, sub, } } - pub async fn next(&mut self) -> Option { + pub async fn next(&mut self) -> Option { loop { let Ok((other_actor_id, other_generation, state)) = self.sub.recv().await else { tracing::error!("actor observer channel dropped"); diff --git a/packages/edge/infra/client/manager/src/utils/fdb.rs b/packages/edge/infra/client/manager/src/utils/fdb.rs new file mode 100644 index 0000000000..3e26e5f41b --- /dev/null +++ b/packages/edge/infra/client/manager/src/utils/fdb.rs @@ -0,0 +1,105 @@ +use std::{ops::Deref, path::Path, result::Result::Ok, sync::Arc}; + +use anyhow::*; +use foundationdb as fdb; +use pegboard_config::Config; +use service_discovery::ServiceDiscovery; +use tokio::fs; + +// TODO: Copied from rivet_pools +#[derive(Clone)] +pub struct FdbPool { + db: Arc, + _sd: Option>, + // Prevent dropping temp file + _connection_file: Arc, +} + +impl Deref for FdbPool { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.db + } +} + +impl FdbPool { + #[tracing::instrument(skip(config))] + pub async fn new(config: &Config) -> Result { + let temp_file = tempfile::NamedTempFile::new()?; + let temp_path = temp_file.path().to_path_buf(); + + let fdb_config = &config.client.foundationdb; + + let sd = match &fdb_config.addresses { + pegboard_config::Addresses::Dynamic { fetch_endpoint } => { + let sd = ServiceDiscovery::new(fetch_endpoint.clone()); + + // Initial fetch + let servers = sd.fetch().await.context("failed to fetch services")?; + let joined = servers + .into_iter() + .filter_map(|server| server.lan_ip) + .map(|lan_ip| format!("{lan_ip}:4500")) + .collect::>() + .join(","); + write_connection_file(&fdb_config, &temp_path, &joined).await?; + + let fdb_config = config.client.foundationdb.clone(); + sd.start(move |servers| { + let temp_path = temp_path.clone(); + let fdb_config = fdb_config.clone(); + async move { + let joined = servers + .into_iter() + .filter_map(|server| server.lan_ip) + .map(|lan_ip| format!("{lan_ip}:4500")) + .collect::>() + .join(","); + + write_connection_file(&fdb_config, &temp_path, &joined).await?; + + anyhow::Ok(()) + } + }); + + Some(sd) + } + pegboard_config::Addresses::Static(addresses) => { + let joined = addresses.join(","); + write_connection_file(&fdb_config, &temp_path, &joined).await?; + + None + } + }; + + // Start network + fdb_util::init(temp_file.path()); + + let fdb_handle = fdb_util::handle(&temp_file.path())?; + + tracing::debug!(config_file_path=%temp_file.path().display(), "fdb started"); + + Ok(FdbPool { + db: Arc::new(fdb_handle), + _sd: sd, + _connection_file: Arc::new(temp_file), + }) + } +} + +async fn write_connection_file( + fdb_config: &pegboard_config::FoundationDb, + temp_path: &Path, + joined: &str, +) -> Result<(), std::io::Error> { + let connection = format!( + "{cluster_description}:{cluster_id}@{joined}", + cluster_description = fdb_config.cluster_description, + cluster_id = fdb_config.cluster_id, + ); + + fs::write(temp_path, connection.as_bytes()).await?; + + Ok(()) +} diff --git a/packages/edge/infra/client/manager/src/utils/mod.rs b/packages/edge/infra/client/manager/src/utils/mod.rs index 46d53231d1..3c14b44cbb 100644 --- a/packages/edge/infra/client/manager/src/utils/mod.rs +++ b/packages/edge/infra/client/manager/src/utils/mod.rs @@ -25,6 +25,7 @@ use tokio::{ sync::mpsc::{channel, Receiver}, }; +pub mod fdb; pub mod libc; pub mod sql; diff --git a/packages/edge/services/pegboard/db/runner/migrations/20200101000000_init.up.sql b/packages/edge/services/pegboard/db/runner/migrations/20200101000000_init.up.sql index 580e7beaea..06607730a9 100644 --- a/packages/edge/services/pegboard/db/runner/migrations/20200101000000_init.up.sql +++ b/packages/edge/services/pegboard/db/runner/migrations/20200101000000_init.up.sql @@ -4,15 +4,13 @@ CREATE TABLE IF NOT EXISTS actor_runners ( runner_id UUID, started_at DateTime64 (9), finished_at DateTime64 (9) -) ENGINE = ReplicatedMergeTree () +) ENGINE = ReplicatedReplacingMergeTree () PARTITION BY toStartOfHour (started_at) ORDER BY ( actor_id, generation, - runner_id, - toUnixTimestamp (started_at), - toUnixTimestamp (finished_at) + runner_id ) TTL toDate (started_at + toIntervalDay (30)) SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1; diff --git a/packages/edge/services/pegboard/src/workflows/actor2/destroy.rs b/packages/edge/services/pegboard/src/workflows/actor2/destroy.rs index aa573d591a..fa08711755 100644 --- a/packages/edge/services/pegboard/src/workflows/actor2/destroy.rs +++ b/packages/edge/services/pegboard/src/workflows/actor2/destroy.rs @@ -126,12 +126,12 @@ async fn update_db( ) -> GlobalResult> { let pool = ctx.sqlite().await?; + // NOTE: Row might not exist if the workflow failed before insert_db sql_fetch_optional!( [ctx, UpdateDbOutput, pool] " UPDATE state SET destroy_ts = ? - WHERE destroy_ts IS NULL RETURNING env_id, selected_resources_memory_mib, diff --git a/packages/edge/services/pegboard/src/workflows/actor2/migrations.rs b/packages/edge/services/pegboard/src/workflows/actor2/migrations.rs index 2923d488e4..ede6f53156 100644 --- a/packages/edge/services/pegboard/src/workflows/actor2/migrations.rs +++ b/packages/edge/services/pegboard/src/workflows/actor2/migrations.rs @@ -43,6 +43,7 @@ async fn migrate_init(ctx: &ActivityCtx, _input: &MigrateInitInput) -> GlobalRes lifecycle_durable INT NOT NULL DEFAULT false, -- BOOLEAN create_ts INT NOT NULL, + pending_allocation_ts INT, -- Set if currently pending alloc start_ts INT, connectable_ts INT, finish_ts INT, diff --git a/packages/edge/services/pegboard/src/workflows/actor2/runtime.rs b/packages/edge/services/pegboard/src/workflows/actor2/runtime.rs index 85e5c4e564..f55b9235ae 100644 --- a/packages/edge/services/pegboard/src/workflows/actor2/runtime.rs +++ b/packages/edge/services/pegboard/src/workflows/actor2/runtime.rs @@ -97,6 +97,7 @@ async fn update_client_and_runner( " UPDATE state SET + pending_allocation_ts = NULL, client_id = ?1, client_workflow_id = ?2, client_wan_hostname = ?3, diff --git a/packages/edge/services/pegboard/src/workflows/client/mod.rs b/packages/edge/services/pegboard/src/workflows/client/mod.rs index 980274ce64..aeff8090c2 100644 --- a/packages/edge/services/pegboard/src/workflows/client/mod.rs +++ b/packages/edge/services/pegboard/src/workflows/client/mod.rs @@ -899,31 +899,49 @@ struct FetchRemainingActorsInput { async fn fetch_remaining_actors( ctx: &ActivityCtx, input: &FetchRemainingActorsInput, -) -> GlobalResult> { +) -> GlobalResult> { let actor_ids = ctx .fdb() .await? .run(|tx, _mc| async move { + let actor2_subspace = + keys::subspace().subspace(&keys::client::Actor2Key::subspace(input.client_id)); let actor_subspace = keys::subspace().subspace(&keys::client::ActorKey::subspace(input.client_id)); tx.get_ranges_keyvalues( fdb::RangeOption { mode: StreamingMode::WantAll, - ..(&actor_subspace).into() + ..(&actor2_subspace).into() }, SERIALIZABLE, ) + .chain(tx.get_ranges_keyvalues( + fdb::RangeOption { + mode: StreamingMode::WantAll, + ..(&actor_subspace).into() + }, + SERIALIZABLE, + )) .map(|res| match res { Ok(entry) => { - let key = keys::subspace() - .unpack::(entry.key()) - .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?; - let generation = key - .deserialize(entry.value()) - .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?; - - Ok((key.actor_id, generation)) + if let Ok(key) = keys::subspace() + .unpack::(entry.key()) { + let generation = key + .deserialize(entry.value()) + .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?; + + Ok((key.actor_id, generation)) + } else { + let key = keys::subspace() + .unpack::(entry.key()) + .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?; + let generation = key + .deserialize(entry.value()) + .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?; + + Ok((key.actor_id.into(), generation)) + } } Err(err) => Err(Into::::into(err)), })