From a94aa4a9fde2dd38119da0aaa72f47199d64f71d Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Thu, 12 Jun 2025 01:36:17 +0000 Subject: [PATCH] feat(pegboard): add local image cache and overlay fs mounts --- examples/system-test-actor/rivet.jsonc | 11 +- .../core/src/db/fdb_sqlite_nats/debug.rs | 6 +- .../core/src/db/fdb_sqlite_nats/mod.rs | 5 +- packages/common/pools/src/db/sqlite/mod.rs | 3 +- packages/core/api/actor/src/route/builds.rs | 17 +- .../files/pegboard_configure.sh | 2 + .../files/rivet_guard_configure.sh | 2 + .../files/rivet_worker_configure.sh | 2 + packages/edge/api/intercom/Cargo.toml | 3 + packages/edge/api/intercom/src/route/mod.rs | 2 +- .../edge/api/intercom/src/route/pegboard.rs | 78 ++- .../edge/infra/client/config/src/manager.rs | 7 + .../client/container-runner/src/container.rs | 2 +- packages/edge/infra/client/echo/src/main.rs | 9 +- .../client/isolate-v8-runner/src/isolate.rs | 5 +- packages/edge/infra/client/manager/Cargo.toml | 1 + .../infra/client/manager/src/actor/mod.rs | 57 +- .../client/manager/src/actor/oci_config.rs | 4 +- .../infra/client/manager/src/actor/setup.rs | 340 ++++------- packages/edge/infra/client/manager/src/ctx.rs | 124 +++- .../manager/src/image_download_handler.rs | 537 ++++++++++++++++++ packages/edge/infra/client/manager/src/lib.rs | 2 + .../edge/infra/client/manager/src/main.rs | 1 + .../client/manager/src/metrics/buckets.rs | 5 + .../infra/client/manager/src/metrics/mod.rs | 77 ++- .../client/manager/src/pull_addr_handler.rs | 1 + .../edge/infra/client/manager/src/runner.rs | 13 + .../infra/client/manager/src/utils/libc.rs | 49 ++ .../infra/client/manager/src/utils/mod.rs | 189 +++--- .../manager/tests/client_rebuild_state.rs | 12 +- .../tests/client_state_external_kill.rs | 8 +- .../edge/infra/client/manager/tests/common.rs | 22 +- .../manager/tests/container_external_kill.rs | 6 +- .../manager/tests/container_lifecycle.rs | 28 +- .../client/manager/tests/isolate_lifecycle.rs | 11 +- 
.../infra/client/manager/tests/vector.json | 21 +- packages/edge/infra/guard/core/src/metrics.rs | 4 +- .../infra/guard/core/src/proxy_service.rs | 55 +- packages/edge/infra/guard/core/src/server.rs | 61 +- .../edge/services/pegboard/src/protocol.rs | 22 +- .../pegboard/src/workflows/actor/runtime.rs | 28 +- .../pegboard/src/workflows/client/mod.rs | 16 +- .../usage-metrics-publish/Cargo.toml | 5 +- .../usage-metrics-publish/src/lib.rs | 11 + .../core-intercom/pegboard/__package__.yml | 2 +- .../edge-intercom/pegboard/__package__.yml | 5 +- .../full/go/edgeintercom/pegboard/pegboard.go | 2 - sdks/api/full/openapi/openapi.yml | 6 +- sdks/api/full/openapi_compat/openapi.yml | 6 +- sdks/api/full/rust/.openapi-generator/FILES | 2 - sdks/api/full/rust/README.md | 1 - .../full/rust/docs/EdgeIntercomPegboardApi.md | 4 +- ...EdgeIntercomPegboardPrewarmImageRequest.md | 11 - .../src/apis/edge_intercom_pegboard_api.rs | 5 +- ...intercom_pegboard_prewarm_image_request.rs | 23 - sdks/api/full/rust/src/models/mod.rs | 2 - .../resources/pegboard/client/Client.ts | 4 +- .../pegboard/types/PrewarmImageRequest.ts | 4 +- .../pegboard/types/PrewarmImageRequest.ts | 8 +- tests/load/actor-lifecycle/actor.ts | 10 +- tests/load/actor-lifecycle/index.ts | 24 +- 61 files changed, 1361 insertions(+), 622 deletions(-) create mode 100644 packages/edge/infra/client/manager/src/image_download_handler.rs create mode 100644 packages/edge/infra/client/manager/src/utils/libc.rs delete mode 100644 sdks/api/full/rust/docs/EdgeIntercomPegboardPrewarmImageRequest.md delete mode 100644 sdks/api/full/rust/src/models/edge_intercom_pegboard_prewarm_image_request.rs diff --git a/examples/system-test-actor/rivet.jsonc b/examples/system-test-actor/rivet.jsonc index 518758c55c..e828151bb7 100644 --- a/examples/system-test-actor/rivet.jsonc +++ b/examples/system-test-actor/rivet.jsonc @@ -1,10 +1,13 @@ { "builds": { - "ws-isolate": { - "script": "src/isolate/main.ts" - }, + // "ws-isolate": { + // "script": 
"src/isolate/main.ts" + // }, "ws-container": { - "dockerfile": "Dockerfile" + "dockerfile": "Dockerfile", + // "unstable": { + // "compression": "none" + // } } } } diff --git a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/debug.rs b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/debug.rs index ac18ab4a37..e0848a2ee4 100644 --- a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/debug.rs +++ b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/debug.rs @@ -15,11 +15,7 @@ use std::{ use tracing::Instrument; use uuid::Uuid; -use super::{ - keys, - sqlite::SqlStub, - DatabaseFdbSqliteNats, -}; +use super::{keys, sqlite::SqlStub, DatabaseFdbSqliteNats}; use crate::{ db::debug::{ ActivityError, ActivityEvent, DatabaseDebug, Event, EventData, HistoryData, LoopEvent, diff --git a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs index 2ca270169a..5c3d93f275 100644 --- a/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs +++ b/packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs @@ -1277,7 +1277,10 @@ impl Database for DatabaseFdbSqliteNats { async move { let pool = &self .pools - .sqlite(crate::db::sqlite_db_name_internal(partial.workflow_id), false) + .sqlite( + crate::db::sqlite_db_name_internal(partial.workflow_id), + false, + ) .await?; // Handle error during sqlite init diff --git a/packages/common/pools/src/db/sqlite/mod.rs b/packages/common/pools/src/db/sqlite/mod.rs index 3ef6d2d461..a8fbb34a18 100644 --- a/packages/common/pools/src/db/sqlite/mod.rs +++ b/packages/common/pools/src/db/sqlite/mod.rs @@ -907,8 +907,7 @@ impl SqlitePoolInner { #[tracing::instrument(name = "sqlite_pool_evict", skip_all)] pub async fn evict(&self) -> GlobalResult<()> { - self - .manager + self.manager .evict_with_key(&[self.key_packed.clone()]) .await } diff --git a/packages/core/api/actor/src/route/builds.rs 
b/packages/core/api/actor/src/route/builds.rs index 271fbea2e7..f31a578a56 100644 --- a/packages/core/api/actor/src/route/builds.rs +++ b/packages/core/api/actor/src/route/builds.rs @@ -520,13 +520,7 @@ pub async fn complete_build( edge_intercom_pegboard_prewarm_image( &config, &build_id.to_string(), - models::EdgeIntercomPegboardPrewarmImageRequest { - image_artifact_url_stub: pegboard::util::image_artifact_url_stub( - ctx.config(), - build.upload_id, - &build::utils::file_name(build.kind, build.compression), - )?, - }, + json!({}), ) .await .map_err(Into::::into) @@ -542,10 +536,11 @@ pub async fn complete_build( } } - // Error only if all prewarm requests failed - if !results.is_empty() && results.iter().all(|res| res.is_err()) { - return Err(unwrap!(unwrap!(results.into_iter().next()).err())); - } + // TODO: Disabled until deploy + // // Error only if all prewarm requests failed + // if !results.is_empty() && results.iter().all(|res| res.is_err()) { + // return Err(unwrap!(unwrap!(results.into_iter().next()).err())); + // } } Ok(json!({})) diff --git a/packages/core/services/cluster/src/workflows/server/install/install_scripts/files/pegboard_configure.sh b/packages/core/services/cluster/src/workflows/server/install/install_scripts/files/pegboard_configure.sh index 00959c8351..28e4df9a09 100644 --- a/packages/core/services/cluster/src/workflows/server/install/install_scripts/files/pegboard_configure.sh +++ b/packages/core/services/cluster/src/workflows/server/install/install_scripts/files/pegboard_configure.sh @@ -353,6 +353,8 @@ ExecStart=/usr/local/bin/rivet-client -c /etc/rivet-client/config.json Restart=always RestartSec=2 +# High scheduling priority +Nice=-15 # Real time service CPUSchedulingPolicy=fifo # High CPU priority diff --git a/packages/core/services/cluster/src/workflows/server/install/install_scripts/files/rivet_guard_configure.sh b/packages/core/services/cluster/src/workflows/server/install/install_scripts/files/rivet_guard_configure.sh index 
56b4d5ae1f..39037fd2f6 100644 --- a/packages/core/services/cluster/src/workflows/server/install/install_scripts/files/rivet_guard_configure.sh +++ b/packages/core/services/cluster/src/workflows/server/install/install_scripts/files/rivet_guard_configure.sh @@ -26,6 +26,8 @@ ExecStart=/usr/local/bin/rivet-guard Restart=always RestartSec=2 +# High scheduling priority +Nice=-15 # Real time service CPUSchedulingPolicy=fifo # High CPU priority diff --git a/packages/core/services/cluster/src/workflows/server/install/install_scripts/files/rivet_worker_configure.sh b/packages/core/services/cluster/src/workflows/server/install/install_scripts/files/rivet_worker_configure.sh index 46042eb9ef..e926940e66 100644 --- a/packages/core/services/cluster/src/workflows/server/install/install_scripts/files/rivet_worker_configure.sh +++ b/packages/core/services/cluster/src/workflows/server/install/install_scripts/files/rivet_worker_configure.sh @@ -28,6 +28,8 @@ ExecStart=/usr/local/bin/rivet-edge-server start --skip-provision Restart=always RestartSec=2 +# High scheduling priority +Nice=-15 # Real time service CPUSchedulingPolicy=fifo # High CPU priority diff --git a/packages/edge/api/intercom/Cargo.toml b/packages/edge/api/intercom/Cargo.toml index 3c88ca3249..f3965ecfb3 100644 --- a/packages/edge/api/intercom/Cargo.toml +++ b/packages/edge/api/intercom/Cargo.toml @@ -26,6 +26,7 @@ rivet-env.workspace = true rivet-health-checks.workspace = true rivet-operation.workspace = true rivet-pools.workspace = true +s3-util.workspace = true serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" @@ -35,4 +36,6 @@ tracing-subscriber = { version = "0.3", default-features = false, features = ["f url = "2.2.2" uuid = { version = "1", features = ["v4"] } +build.workspace = true +cluster.workspace = true pegboard.workspace = true diff --git a/packages/edge/api/intercom/src/route/mod.rs b/packages/edge/api/intercom/src/route/mod.rs index 89e6a761ab..a3291eee86 100644 
--- a/packages/edge/api/intercom/src/route/mod.rs +++ b/packages/edge/api/intercom/src/route/mod.rs @@ -12,7 +12,7 @@ define_router! { POST: pegboard::prewarm_image( internal_endpoint: true, opt_auth: true, - body: models::EdgeIntercomPegboardPrewarmImageRequest, + body: serde_json::Value, ), }, diff --git a/packages/edge/api/intercom/src/route/pegboard.rs b/packages/edge/api/intercom/src/route/pegboard.rs index 60e0b0b67e..f6dc8fd9aa 100644 --- a/packages/edge/api/intercom/src/route/pegboard.rs +++ b/packages/edge/api/intercom/src/route/pegboard.rs @@ -1,8 +1,10 @@ use api_helper::ctx::Ctx; use chirp_workflow::prelude::*; +use cluster::types::BuildDeliveryMethod; use fdb_util::SERIALIZABLE; use foundationdb::{self as fdb, options::StreamingMode}; use futures_util::TryStreamExt; +use pegboard::protocol; use rivet_api::models; use serde_json::json; @@ -12,13 +14,10 @@ use crate::auth::Auth; pub async fn prewarm_image( ctx: Ctx, image_id: Uuid, - body: models::EdgeIntercomPegboardPrewarmImageRequest, + body: serde_json::Value, ) -> GlobalResult { ctx.auth().bypass()?; - // TODO: If we replicate the algorithm for choosing the correct ATS node from the pb manager, we can - // remove prewarming from the pb protocol entirely and just prewarm the image here since this api service - // is in the same dc let client_id = ctx .fdb() .await? 
@@ -59,10 +58,35 @@ pub async fn prewarm_image( return Ok(json!({})); }; + let dc_id = ctx.config().server()?.rivet.edge()?.datacenter_id; + let (dc_res, builds_res) = tokio::try_join!( + ctx.op(cluster::ops::datacenter::get::Input { + datacenter_ids: vec![dc_id], + }), + ctx.op(build::ops::get::Input { + build_ids: vec![image_id], + }), + )?; + + let dc = unwrap!(dc_res.datacenters.first()); + let build = unwrap!(builds_res.builds.first()); + + let fallback_artifact_url = + resolve_image_fallback_artifact_url(&ctx, dc.build_delivery_method, &build).await?; + let res = ctx - .signal(pegboard::workflows::client::PrewarmImage { - image_id, - image_artifact_url_stub: body.image_artifact_url_stub, + .signal(pegboard::workflows::client::PrewarmImage2 { + image: protocol::Image { + id: image_id, + artifact_url_stub: pegboard::util::image_artifact_url_stub( + ctx.config(), + build.upload_id, + &build::utils::file_name(build.kind, build.compression), + )?, + fallback_artifact_url, + kind: build.kind.into(), + compression: build.compression.into(), + }, }) .to_workflow::() .tag("client_id", client_id) @@ -131,3 +155,43 @@ pub async fn toggle_drain_client( Ok(json!({})) } + +async fn resolve_image_fallback_artifact_url( + ctx: &Ctx, + dc_build_delivery_method: BuildDeliveryMethod, + build: &build::types::Build, +) -> GlobalResult> { + if let BuildDeliveryMethod::S3Direct = dc_build_delivery_method { + tracing::debug!("using s3 direct delivery"); + + // Build client + let s3_client = s3_util::Client::with_bucket_and_endpoint( + ctx.config(), + "bucket-build", + s3_util::EndpointKind::EdgeInternal, + ) + .await?; + + let presigned_req = s3_client + .get_object() + .bucket(s3_client.bucket()) + .key(format!( + "{upload_id}/{file_name}", + upload_id = build.upload_id, + file_name = build::utils::file_name(build.kind, build.compression), + )) + .presigned( + s3_util::aws_sdk_s3::presigning::PresigningConfig::builder() + .expires_in(std::time::Duration::from_secs(15 * 60)) + 
.build()?, + ) + .await?; + + let addr_str = presigned_req.uri().to_string(); + tracing::debug!(addr = %addr_str, "resolved artifact s3 presigned request"); + + Ok(Some(addr_str)) + } else { + Ok(None) + } +} diff --git a/packages/edge/infra/client/config/src/manager.rs b/packages/edge/infra/client/config/src/manager.rs index 3ec7c62d54..99b8ea23b8 100644 --- a/packages/edge/infra/client/config/src/manager.rs +++ b/packages/edge/infra/client/config/src/manager.rs @@ -120,6 +120,8 @@ impl Runner { #[serde(rename_all = "snake_case", deny_unknown_fields)] pub struct Images { pub pull_addresses: Option, + /// Bytes. Defaults to 64 GiB. + pub max_cache_size: Option, } impl Images { @@ -129,6 +131,11 @@ impl Images { .map(Cow::Borrowed) .unwrap_or_else(|| Cow::Owned(Addresses::Static(Vec::new()))) } + + pub fn max_cache_size(&self) -> u64 { + // 64 GiB + self.max_cache_size.unwrap_or(1024 * 1024 * 1024 * 64) + } } #[derive(Clone, Deserialize, JsonSchema)] diff --git a/packages/edge/infra/client/container-runner/src/container.rs b/packages/edge/infra/client/container-runner/src/container.rs index 0168d2083e..91e42d3150 100644 --- a/packages/edge/infra/client/container-runner/src/container.rs +++ b/packages/edge/infra/client/container-runner/src/container.rs @@ -32,7 +32,7 @@ pub fn run( .context("empty `actor_path`")? 
.to_string_lossy() .to_string(); - let fs_path = actor_path.join("fs"); + let fs_path = actor_path.join("fs").join("upper"); let oci_bundle_config_json = fs_path.join("config.json"); // Validate OCI bundle diff --git a/packages/edge/infra/client/echo/src/main.rs b/packages/edge/infra/client/echo/src/main.rs index 472216da96..6fe6a71f2b 100644 --- a/packages/edge/infra/client/echo/src/main.rs +++ b/packages/edge/infra/client/echo/src/main.rs @@ -2,9 +2,12 @@ use tiny_http::{Response, Server, StatusCode}; // TODO: This can't pick up SIGTERM fn main() { - let port = std::env::var("PORT") - .ok() - .unwrap_or_else(|| "8080".to_string()); + println!("Env:"); + for (key, value) in std::env::vars() { + println!(" {}: {}", key, value); + } + + let port = std::env::var("PORT_MAIN").expect("no PORT_MAIN"); let addr = format!("0.0.0.0:{port}"); let server = Server::http(&addr).unwrap(); println!("Listening on {addr}"); diff --git a/packages/edge/infra/client/isolate-v8-runner/src/isolate.rs b/packages/edge/infra/client/isolate-v8-runner/src/isolate.rs index 37e727ed1c..777fb28503 100644 --- a/packages/edge/infra/client/isolate-v8-runner/src/isolate.rs +++ b/packages/edge/infra/client/isolate-v8-runner/src/isolate.rs @@ -153,7 +153,7 @@ pub async fn run_inner( tracing::info!(?actor_id, ?generation, "isolate kv initialized"); // Should match the path from `Actor::make_fs` in manager/src/actor/setup.rs - let index = actor_path.join("fs").join("index.js"); + let index = actor_path.join("fs").join("upper").join("index.js"); // Load index.js let index_script_content = match fs::read_to_string(&index).await { @@ -560,7 +560,8 @@ mod tests { let fs_path = actors_path .join(format!("{actor_id}-{generation}")) - .join("fs"); + .join("fs") + .join("upper"); std::fs::create_dir_all(&fs_path)?; std::fs::copy( diff --git a/packages/edge/infra/client/manager/Cargo.toml b/packages/edge/infra/client/manager/Cargo.toml index e23ac05b56..7607d187d1 100644 --- 
a/packages/edge/infra/client/manager/Cargo.toml +++ b/packages/edge/infra/client/manager/Cargo.toml @@ -30,6 +30,7 @@ rand = "0.8" rand_chacha = "0.3.1" reqwest = { version = "0.12", default-features = false, features = ["stream", "rustls-tls", "json"] } rivet-logs.workspace = true +scc = "2.3.4" serde = { version = "1.0.195", features = ["derive"] } serde_json = "1.0.111" serde_yaml = "0.9.34" diff --git a/packages/edge/infra/client/manager/src/actor/mod.rs b/packages/edge/infra/client/manager/src/actor/mod.rs index 3122469024..e894ddc45e 100644 --- a/packages/edge/infra/client/manager/src/actor/mod.rs +++ b/packages/edge/infra/client/manager/src/actor/mod.rs @@ -1,7 +1,7 @@ use std::{ result::Result::{Err, Ok}, sync::Arc, - time::Duration, + time::{Duration, Instant}, }; use anyhow::*; @@ -9,6 +9,7 @@ use indoc::indoc; use nix::sys::signal::Signal; use pegboard::protocol; use pegboard_config::runner_protocol; +use sqlx::Acquire; use tokio::{fs, sync::Mutex}; use uuid::Uuid; @@ -75,9 +76,10 @@ impl Actor { actor_id, generation, config, - start_ts + start_ts, + image_id ) - VALUES (?1, ?2, ?3, ?4) + VALUES (?1, ?2, ?3, ?4, ?5) ON CONFLICT (actor_id, generation) DO NOTHING ", )) @@ -85,6 +87,7 @@ impl Actor { .bind(self.generation as i64) .bind(&config_json) .bind(utils::now()) + .bind(self.config.image.id) .execute(&mut *ctx.sql().await?) .await }) @@ -128,7 +131,7 @@ impl Actor { self: &Arc, ctx: &Arc, ) -> Result> { - let setup_timer = std::time::Instant::now(); + let setup_timer = Instant::now(); tracing::info!(actor_id=?self.actor_id, generation=?self.generation, "setting up actor"); let actor_path = ctx.actor_path(self.actor_id, self.generation); @@ -146,19 +149,13 @@ impl Actor { protocol::ImageKind::DockerImage | protocol::ImageKind::OciBundle ) && matches!(self.config.network_mode, protocol::NetworkMode::Bridge); - // Parallelize two independent jobs: - // - // - `download_image` takes a long time to download. 
`download_image` is dependent on - // `make_fs` - // - `setup_cni_network` takes a long time. `setup_cni_network` is dependent on - // `bind_ports`. tracing::info!(actor_id=?self.actor_id, generation=?self.generation, "starting parallel setup tasks"); - let parallel_timer = std::time::Instant::now(); + let parallel_timer = Instant::now(); let (_, ports) = tokio::try_join!( async { - self.make_fs(&ctx).await?; self.download_image(&ctx).await?; + self.make_fs(&ctx).await?; Result::<(), anyhow::Error>::Ok(()) }, async { @@ -205,7 +202,12 @@ impl Actor { let mut runner_env = vec![ ( "ROOT_USER_ENABLED", - if self.config.root_user_enabled { "1" } else { "0" }.to_string(), + if self.config.root_user_enabled { + "1" + } else { + "0" + } + .to_string(), ), ("ACTOR_ID", self.actor_id.to_string()), ]; @@ -443,7 +445,10 @@ impl Actor { // Update stop_ts if matches!(signal, Signal::SIGTERM | Signal::SIGKILL) || !has_runner { let stop_ts_set = utils::sql::query(|| async { - sqlx::query_as::<_, (bool,)>(indoc!( + let mut conn = ctx.sql().await?; + let mut tx = conn.begin().await?; + + let res = sqlx::query_as::<_, (bool,)>(indoc!( " UPDATE actors SET stop_ts = ?3 @@ -457,11 +462,27 @@ impl Actor { .bind(self.actor_id) .bind(self.generation as i64) .bind(utils::now()) - .fetch_optional(&mut *ctx.sql().await?) - .await + .fetch_optional(&mut *tx) + .await?; + + // Update LRU cache + sqlx::query(indoc!( + " + UPDATE images_cache + SET last_used_ts = ?2 + WHERE image_id = ?1 + ", + )) + .bind(self.config.image.id) + .bind(utils::now()) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + + Ok(res.is_some()) }) - .await? 
- .is_some(); + .await?; // Emit event if not stopped before if stop_ts_set { diff --git a/packages/edge/infra/client/manager/src/actor/oci_config.rs b/packages/edge/infra/client/manager/src/actor/oci_config.rs index 3ff88870f7..5e4b63fcab 100644 --- a/packages/edge/infra/client/manager/src/actor/oci_config.rs +++ b/packages/edge/infra/client/manager/src/actor/oci_config.rs @@ -25,9 +25,7 @@ pub fn config(opts: ConfigOpts) -> Result { // // Corresponds to cpu.weight in cgroups. Must be [1, 10_000] // - // We divide by 8 in order to make sure the CPU shares are within bounds. `cpu` is measured in - // millishares, so 1_000 = 1 core. For a range of 32d1 (32_000) to 1d16 (62), we divide by 8 - // to make the range 3_200 to 6. + // We divide by 10 in order to make sure the CPU shares are within bounds. let mut cpu_shares = opts.cpu / 10; if cpu_shares > 10_000 { cpu_shares = 10_000; diff --git a/packages/edge/infra/client/manager/src/actor/setup.rs b/packages/edge/infra/client/manager/src/actor/setup.rs index d5b72b5f50..c1f8b3545b 100644 --- a/packages/edge/infra/client/manager/src/actor/setup.rs +++ b/packages/edge/infra/client/manager/src/actor/setup.rs @@ -2,6 +2,7 @@ use std::{ collections::HashMap, path::{Path, PathBuf}, result::Result::{Err, Ok}, + time::Instant, }; use anyhow::*; @@ -21,10 +22,14 @@ use crate::{ctx::Ctx, utils}; impl Actor { pub async fn make_fs(&self, ctx: &Ctx) -> Result<()> { - let timer = std::time::Instant::now(); + let timer = Instant::now(); let actor_path = ctx.actor_path(self.actor_id, self.generation); + let fs_img_path = actor_path.join("fs.img"); let fs_path = actor_path.join("fs"); + let fs_upper_path = fs_path.join("upper"); + let fs_work_path = fs_path.join("work"); + let image_path = ctx.image_path(self.config.image.id); tracing::info!(actor_id=?self.actor_id, generation=?self.generation, "creating fs"); @@ -40,11 +45,16 @@ impl Actor { .context("failed to create disk image")?; fs_img .set_len(self.config.resources.disk as u64 * 
1024 * 1024) - .await?; + .await + .context("failed to set disk image length")?; tracing::info!(actor_id=?self.actor_id, generation=?self.generation, "formatting disk image"); // Format file as ext4 - let cmd_out = Command::new("mkfs.ext4").arg(&fs_img_path).output().await?; + let cmd_out = Command::new("mkfs.ext4") + .arg(&fs_img_path) + .output() + .await + .context("failed to run `mkfs.ext4`")?; ensure!( cmd_out.status.success(), @@ -53,6 +63,7 @@ impl Actor { ); tracing::info!(actor_id=?self.actor_id, generation=?self.generation, "mounting disk image"); + // Mount fs img as loop mount let cmd_out = Command::new("mount") .arg("-o") @@ -60,172 +71,94 @@ impl Actor { .arg(&fs_img_path) .arg(&fs_path) .output() - .await?; + .await + .context("failed to run `mount`")?; ensure!( cmd_out.status.success(), "failed `mount` command\n{}", std::str::from_utf8(&cmd_out.stderr)? ); - } - let duration = timer.elapsed().as_secs_f64(); - crate::metrics::SETUP_MAKE_FS_DURATION.observe(duration); - tracing::info!(actor_id=?self.actor_id, generation=?self.generation, duration_seconds=duration, "fs creation completed"); + // Create folders on disk + fs::create_dir(&fs_upper_path) + .await + .context("failed to create actor fs upper dir")?; + fs::create_dir(&fs_work_path) + .await + .context("failed to create actor fs work dir")?; - Ok(()) - } + tracing::info!(actor_id=?self.actor_id, generation=?self.generation, "mounting overlay"); - pub async fn download_image(&self, ctx: &Ctx) -> Result<()> { - let timer = std::time::Instant::now(); - tracing::info!(actor_id=?self.actor_id, generation=?self.generation, "downloading artifact"); + ensure!( + fs::metadata(&image_path).await.is_ok(), + "image dir does not exist" + ); - let actor_path = ctx.actor_path(self.actor_id, self.generation); - let fs_path = actor_path.join("fs"); + // Overlay mount setup: + // lowerdir = extracted build in manager's builds cache folder + // upperdir = {actor dir}/fs/upper folder + // workdir = {actor 
dir}/fs/work folder + // merged dir = also fs/upper folder, it mounts to the upperdir + let cmd_out = Command::new("mount") + .arg("-t") + .arg("overlay") + // Arbitrary device name + .arg(format!("{}-{}", self.actor_id, self.generation)) + .arg("-o") + .arg(format!( + "lowerdir={},upperdir={},workdir={}", + image_path.display(), + fs_upper_path.display(), + fs_work_path.display() + )) + .arg(&fs_upper_path) + .output() + .await + .context("failed to run overlay `mount`")?; - // Get addresses using the shared utility function - let addresses = crate::utils::get_image_addresses( - ctx, - self.config.image.id, - &self.config.image.artifact_url_stub, - self.config.image.fallback_artifact_url.as_deref(), - ) - .await?; + ensure!( + cmd_out.status.success(), + "failed overlay `mount` command\n{}", + std::str::from_utf8(&cmd_out.stderr)? + ); + } else { + // Create folder on host + fs::create_dir(&fs_upper_path) + .await + .context("failed to create actor fs upper dir")?; + + tracing::info!(actor_id=?self.actor_id, generation=?self.generation, "copying image contents to fs"); + + // Copy everything from the image (lowerdir) to the upperdir + utils::copy_dir_all(image_path, &fs_upper_path) + .await + .context("failed to copy image contents to fs upper dir")?; + } - // Log the URLs we're attempting to download from + let duration = timer.elapsed().as_secs_f64(); + crate::metrics::SETUP_MAKE_FS_DURATION.observe(duration); tracing::info!( actor_id=?self.actor_id, generation=?self.generation, - image_id=?self.config.image.id, - addresses=?addresses, - "initiating image download" + duration_seconds=duration, + "fs creation completed", ); - // Try each URL until one succeeds - let mut last_error = None; - for url in &addresses { - tracing::info!(actor_id=?self.actor_id, generation=?self.generation, ?url, "attempting download"); - - // Build the shell command based on image kind and compression - // Using shell commands with native Unix pipes improves performance by: - // 1. 
Reducing overhead of passing data through Rust - // 2. Letting the OS handle data transfer between processes efficiently - // 3. Avoiding unnecessary buffer copies in memory - let shell_cmd = match (self.config.image.kind, self.config.image.compression) { - // Docker image, no compression - (protocol::ImageKind::DockerImage, protocol::ImageCompression::None) => { - let docker_image_path = fs_path.join("docker-image.tar"); - tracing::info!( - actor_id=?self.actor_id, - generation=?self.generation, - "downloading uncompressed docker image using curl" - ); - - // Use curl to download directly to file - format!("curl -sSfL '{}' -o '{}'", url, docker_image_path.display()) - } - - // Docker image with LZ4 compression - (protocol::ImageKind::DockerImage, protocol::ImageCompression::Lz4) => { - let docker_image_path = fs_path.join("docker-image.tar"); - tracing::info!( - actor_id=?self.actor_id, - generation=?self.generation, - "downloading and decompressing docker image using curl | lz4" - ); - - // Use curl piped to lz4 for decompression - format!( - "curl -sSfL '{}' | lz4 -d - '{}'", - url, - docker_image_path.display() - ) - } - - // OCI Bundle or JavaScript with no compression - ( - protocol::ImageKind::OciBundle | protocol::ImageKind::JavaScript, - protocol::ImageCompression::None, - ) => { - tracing::info!( - actor_id=?self.actor_id, - generation=?self.generation, - "downloading and unarchiving uncompressed artifact using curl | tar" - ); - - // Use curl piped to tar for extraction - format!("curl -sSfL '{}' | tar -x -C '{}'", url, fs_path.display()) - } - - // OCI Bundle or JavaScript with LZ4 compression - ( - protocol::ImageKind::OciBundle | protocol::ImageKind::JavaScript, - protocol::ImageCompression::Lz4, - ) => { - tracing::info!( - actor_id=?self.actor_id, - generation=?self.generation, - "downloading, decompressing, and unarchiving artifact using curl | lz4 | tar" - ); - - // Use curl piped to lz4 for decompression, then to tar for extraction - format!( - 
"curl -sSfL '{}' | lz4 -d | tar -x -C '{}'", - url, - fs_path.display() - ) - } - }; - - // Execute the shell command - // Use curl's built-in error handling to fail silently and let us try the next URL - let cmd_result = Command::new("sh").arg("-c").arg(&shell_cmd).output().await; + Ok(()) + } - match cmd_result { - Ok(output) if output.status.success() => { - tracing::info!( - actor_id=?self.actor_id, - generation=?self.generation, - ?url, - "successfully downloaded image" - ); + pub async fn download_image(&self, ctx: &Ctx) -> Result<()> { + let timer = Instant::now(); - let duration = timer.elapsed().as_secs_f64(); - crate::metrics::SETUP_DOWNLOAD_IMAGE_DURATION.observe(duration); - tracing::info!(actor_id=?self.actor_id, generation=?self.generation, duration_seconds=duration, "artifact download completed"); + ctx.image_download_handler + .download(ctx, &self.config.image) + .await?; - return Ok(()); - } - Ok(output) => { - // Command ran but failed - let stderr = String::from_utf8_lossy(&output.stderr); - tracing::warn!( - actor_id=?self.actor_id, - generation=?self.generation, - ?url, - status=?output.status, - stderr=%stderr, - "failed to download image" - ); - last_error = Some(anyhow!("download failed: {}", stderr)); - } - Err(e) => { - // Failed to execute command - tracing::warn!( - actor_id=?self.actor_id, - generation=?self.generation, - ?url, - error=?e, - "failed to execute download command" - ); - last_error = Some(anyhow!("download command failed: {}", e)); - } - } - } + let duration = timer.elapsed().as_secs_f64(); + crate::metrics::SETUP_DOWNLOAD_IMAGE_DURATION.observe(duration); - // If we get here, all URLs failed - Err(last_error - .unwrap_or_else(|| anyhow!("failed to download image from any available URL"))) + Ok(()) } pub async fn setup_oci_bundle( @@ -233,81 +166,13 @@ impl Actor { ctx: &Ctx, ports: &protocol::HashableMap, ) -> Result<()> { - let timer = std::time::Instant::now(); + let timer = Instant::now(); 
tracing::info!(actor_id=?self.actor_id, generation=?self.generation, "setting up oci bundle"); let actor_path = ctx.actor_path(self.actor_id, self.generation); - let fs_path = actor_path.join("fs"); + let fs_path = actor_path.join("fs").join("upper"); let netns_path = self.netns_path(); - // We need to convert the Docker image to an OCI bundle in order to run it. - // Allows us to work with the build with umoci - if let protocol::ImageKind::DockerImage = self.config.image.kind { - let docker_image_path = fs_path.join("docker-image.tar"); - let oci_image_path = fs_path.join("oci-image"); - - tracing::info!( - actor_id=?self.actor_id, - generation=?self.generation, - "converting Docker image -> OCI image", - ); - let conversion_start = std::time::Instant::now(); - let cmd_out = Command::new("skopeo") - .arg("copy") - .arg(format!("docker-archive:{}", docker_image_path.display())) - .arg(format!("oci:{}:default", oci_image_path.display())) - .output() - .await?; - ensure!( - cmd_out.status.success(), - "failed `skopeo` command\n{}", - std::str::from_utf8(&cmd_out.stderr)? - ); - tracing::info!( - actor_id=?self.actor_id, - generation=?self.generation, - duration_seconds=conversion_start.elapsed().as_secs_f64(), - "docker to OCI conversion completed", - ); - - // Allows us to run the bundle natively with runc - tracing::info!( - actor_id=?self.actor_id, - generation=?self.generation, - "converting OCI image -> OCI bundle", - ); - let unpack_start = std::time::Instant::now(); - let cmd_out = Command::new("umoci") - .arg("unpack") - .arg("--image") - .arg(format!("{}:default", oci_image_path.display())) - .arg(&fs_path) - .output() - .await?; - ensure!( - cmd_out.status.success(), - "failed `umoci` command\n{}", - std::str::from_utf8(&cmd_out.stderr)? 
- ); - tracing::info!( - actor_id=?self.actor_id, - generation=?self.generation, - duration_seconds=unpack_start.elapsed().as_secs_f64(), - "OCI image unpacking completed", - ); - - // Remove artifacts - tracing::info!( - actor_id=?self.actor_id, - generation=?self.generation, - "cleaning up temporary image artifacts", - ); - tokio::try_join!( - fs::remove_file(&docker_image_path), - fs::remove_dir_all(&oci_image_path), - )?; - } - // Read the config.json from the user-provided OCI bundle tracing::info!( actor_id=?self.actor_id, @@ -315,7 +180,9 @@ impl Actor { "reading OCI bundle configuration", ); let oci_bundle_config_path = fs_path.join("config.json"); - let user_config_json = fs::read_to_string(&oci_bundle_config_path).await?; + let user_config_json = fs::read_to_string(&oci_bundle_config_path) + .await + .context("failed to read oci config")?; let user_config = serde_json::from_str::(&user_config_json)?; @@ -410,7 +277,7 @@ impl Actor { ctx: &Ctx, ports: &protocol::HashableMap, ) -> Result<()> { - let timer = std::time::Instant::now(); + let timer = Instant::now(); tracing::info!(actor_id=?self.actor_id, generation=?self.generation, "setting up isolate environment"); let actor_path = ctx.actor_path(self.actor_id, self.generation); @@ -471,7 +338,7 @@ impl Actor { ctx: &Ctx, ports: &protocol::HashableMap, ) -> Result<()> { - let timer = std::time::Instant::now(); + let timer = Instant::now(); tracing::info!(actor_id=?self.actor_id, generation=?self.generation, "setting up cni network"); let actor_path = ctx.actor_path(self.actor_id, self.generation); @@ -569,7 +436,7 @@ impl Actor { &self, ctx: &Ctx, ) -> Result> { - let timer = std::time::Instant::now(); + let timer = Instant::now(); tracing::info!(actor_id=?self.actor_id, generation=?self.generation, "binding ports"); let (mut gg_ports, mut host_ports): (Vec<_>, Vec<_>) = self @@ -683,8 +550,35 @@ impl Actor { let actor_path = ctx.actor_path(self.actor_id, self.generation); let netns_path = 
self.netns_path(); - // Clean up fs mount + // Clean up fs mounts if ctx.config().runner.use_mounts() { + match Command::new("umount") + .arg("-dl") + .arg(actor_path.join("fs").join("upper")) + .output() + .await + { + Result::Ok(cmd_out) => { + if !cmd_out.status.success() { + tracing::error!( + actor_id=?self.actor_id, + generation=?self.generation, + stdout=%std::str::from_utf8(&cmd_out.stdout).unwrap_or(""), + stderr=%std::str::from_utf8(&cmd_out.stderr).unwrap_or(""), + "failed overlay `umount` command", + ); + } + } + Err(err) => { + tracing::error!( + actor_id=?self.actor_id, + generation=?self.generation, + ?err, + "failed to run overlay `umount` command", + ); + } + } + match Command::new("umount") .arg("-dl") .arg(actor_path.join("fs")) diff --git a/packages/edge/infra/client/manager/src/ctx.rs b/packages/edge/infra/client/manager/src/ctx.rs index f8e1d4a770..90c824be02 100644 --- a/packages/edge/infra/client/manager/src/ctx.rs +++ b/packages/edge/infra/client/manager/src/ctx.rs @@ -37,9 +37,8 @@ use uuid::Uuid; use crate::{ actor::Actor, event_sender::EventSender, - metrics, - pull_addr_handler::PullAddrHandler, - runner, + image_download_handler::ImageDownloadHandler, + metrics, runner, utils::{self, sql::SqlitePoolExt}, }; @@ -81,7 +80,7 @@ pub struct Ctx { pool: RwLock, tx: Mutex>, Message>>, event_sender: EventSender, - pub(crate) pull_addr_handler: PullAddrHandler, + pub(crate) image_download_handler: ImageDownloadHandler, pub(crate) actors: RwLock>>, isolate_runner: RwLock>, @@ -101,7 +100,7 @@ impl Ctx { pool: RwLock::new(pool), tx: Mutex::new(tx), event_sender: EventSender::new(), - pull_addr_handler: PullAddrHandler::new(), + image_download_handler: ImageDownloadHandler::new(), actors: RwLock::new(HashMap::new()), isolate_runner: RwLock::new(None), @@ -409,10 +408,7 @@ impl Ctx { self.process_command(command).await?; } } - protocol::ToClient::PrewarmImage { - image_id, - image_artifact_url_stub, - } => utils::prewarm_image(&self, image_id, 
&image_artifact_url_stub), + protocol::ToClient::PrewarmImage { image } => self.prewarm_image(image), } Ok(()) @@ -605,6 +601,30 @@ impl Ctx { } }); } + + fn prewarm_image(self: &Arc<Self>, image_config: protocol::Image) { + // Log full URL for prewarm operation + let prewarm_url = format!("{}/{}", image_config.artifact_url_stub, image_config.id); + tracing::info!(image_id=?image_config.id, %prewarm_url, "prewarming image"); + + let self2 = self.clone(); + tokio::spawn(async move { + match self2 + .image_download_handler + .download(&self2, &image_config) + .await + { + Ok(_) => { + tracing::info!(image_id=?image_config.id, %prewarm_url, "prewarm complete") + } + Err(err) => tracing::warn!( + image_id=?image_config.id, + %prewarm_url, + "prewarm failed: {err:#}" + ), + } + }); + } } // MARK: State re-initialization @@ -809,6 +829,8 @@ impl Ctx { }) )?; + self.rebuild_images_cache().await?; + self.event_sender.set_idx(last_event_idx + 1); let isolate_runner = { self.isolate_runner.read().await.clone() }; @@ -899,6 +921,82 @@ impl Ctx { Ok(()) } + + /// Cleans up image cache entries that no longer have corresponding directories. + async fn rebuild_images_cache(&self) -> Result<()> { + let mut valid_image_ids = Vec::new(); + let mut entries = fs::read_dir(self.images_path()).await?; + + // Read all entries in the images directory + while let Some(entry) = entries.next_entry().await? 
{ + if let Ok(file_type) = entry.file_type().await { + if file_type.is_dir() { + if let Some(name) = entry.file_name().to_str() { + if let Ok(image_id) = Uuid::parse_str(name) { + valid_image_ids.push(image_id); + } else { + tracing::warn!(path=%entry.path().display(), "invalid file name in image cache"); + } + } else { + tracing::warn!(path=%entry.path().display(), "invalid file name in image cache"); + } + } else { + tracing::warn!(path=%entry.path().display(), "unexpected file in image cache"); + } + } + } + + let mut conn = self.sql().await?; + let mut tx = conn.begin().await?; + + sqlx::query(indoc!( + " + CREATE TEMPORARY TABLE __valid_images ( + image_id BLOB PRIMARY KEY + ) + " + )) + .execute(&mut *tx) + .await?; + + // For each valid image ID, mark it for keeping in a temporary table + for image_id in valid_image_ids { + sqlx::query(indoc!( + " + INSERT OR IGNORE INTO __valid_images (image_id) VALUES (?) + " + )) + .bind(image_id) + .execute(&mut *tx) + .await?; + } + + // Delete entries that aren't in our valid images table + let deleted = sqlx::query(indoc!( + " + DELETE + FROM images_cache + WHERE image_id NOT IN ( + SELECT image_id FROM __valid_images + ) + " + )) + .execute(&mut *tx) + .await?; + + // Clean up the temporary table + sqlx::query("DROP TABLE __valid_images") + .execute(&mut *tx) + .await?; + + tx.commit().await?; + + if deleted.rows_affected() > 0 { + tracing::info!(count=%deleted.rows_affected(), "cleaned up missing images"); + } + + Ok(()) + } } // MARK: Utils @@ -915,6 +1013,14 @@ impl Ctx { self.actors_path().join(format!("{actor_id}-{generation}")) } + pub fn images_path(&self) -> PathBuf { + self.config().data_dir().join("images") + } + + pub fn image_path(&self, image_id: Uuid) -> PathBuf { + self.images_path().join(image_id.to_string()) + } + pub fn isolate_runner_path(&self) -> PathBuf { self.config().data_dir().join("runner") } diff --git a/packages/edge/infra/client/manager/src/image_download_handler.rs 
b/packages/edge/infra/client/manager/src/image_download_handler.rs new file mode 100644 index 0000000000..0eda6e9fb3 --- /dev/null +++ b/packages/edge/infra/client/manager/src/image_download_handler.rs @@ -0,0 +1,537 @@ +use std::{ + hash::{DefaultHasher, Hasher}, + io::ErrorKind, + result::Result::Ok, + time::Instant, +}; + +use anyhow::*; +use indoc::indoc; +use pegboard::protocol; +use rand::{prelude::SliceRandom, SeedableRng}; +use rand_chacha::ChaCha12Rng; +use scc::hash_map::Entry; +use sqlx::Acquire; +use tokio::fs; +use tokio::process::Command; +use url::Url; +use uuid::Uuid; + +use crate::{metrics, pull_addr_handler::PullAddrHandler, utils, Ctx}; + +/// Handles downloading images by queuing downloads of the same image together and reading from cache if +/// it exists. +pub struct ImageDownloadHandler { + pull_addr_handler: PullAddrHandler, + // This is not a Set because it uses SCC's entry locking capability to function. + downloads: scc::HashMap<Uuid, ()>, +} + +impl ImageDownloadHandler { + pub fn new() -> Self { + ImageDownloadHandler { + pull_addr_handler: PullAddrHandler::new(), + downloads: scc::HashMap::new(), + } + } + + pub async fn download(&self, ctx: &Ctx, image_config: &protocol::Image) -> Result<()> { + metrics::IMAGE_DOWNLOAD_REQUEST_TOTAL.inc(); + + match self.downloads.entry_async(image_config.id).await { + // The image download started at some point in the current runtime and finished downloading + Entry::Occupied(_) => { + tracing::debug!(image_id=?image_config.id, "image already downloaded"); + + // Update LRU cache + sqlx::query(indoc!( + " + UPDATE images_cache + SET last_used_ts = ?2 + WHERE image_id = ?1 + ", + )) + .bind(image_config.id) + .bind(utils::now()) + .execute(&mut *ctx.sql().await?) 
+ .await?; + } + // The image is not currently being downloaded + Entry::Vacant(entry) => { + // Check database for image + let row = sqlx::query_as::<_, (i64,)>(indoc!( + " + SELECT 1 + FROM images_cache + WHERE image_id = ?1 AND download_complete_ts IS NOT NULL + ", + )) + .bind(image_config.id) + .fetch_optional(&mut *ctx.sql().await?) + .await?; + + // Image exists and is downloaded + if row.is_some() { + tracing::debug!(image_id=?image_config.id, "image already downloaded"); + return Ok(()); + } + + // Image does not exist/wasn't fully downloaded and isn't currently downloading, continue + metrics::IMAGE_DOWNLOAD_CACHE_MISS_TOTAL.inc(); + + let start_instant = Instant::now(); + tracing::info!(image_id=?image_config.id, "downloading image"); + + let image_path = ctx.image_path(image_config.id); + + // Clear any previous content and make image dir + match fs::remove_dir_all(&image_path).await { + Err(e) if e.kind() == ErrorKind::NotFound => {} + res => res.context("failed to delete image dir")?, + } + fs::create_dir(&image_path) + .await + .context("failed to create image dir")?; + + // NOTE: Txn here so that we prune and insert the new image row at the same time. This ensures + // if another image is downloading concurrently that it will calculate the correct images + // dir size. + let mut conn = ctx.sql().await?; + let mut tx = conn.begin().await?; + + let ((cache_count, images_dir_size), image_download_size) = tokio::try_join!( + async { + // Get total size of images directory. Note that it doesn't matter if this doesn't + // match the actual fs size because it should either be exactly at or below actual fs + // size. Also calculating fs size manually is expensive. 
+ sqlx::query_as::<_, (i64, i64)>(indoc!( + " + SELECT COUNT(size), COALESCE(SUM(size), 0) FROM images_cache + ", + )) + .fetch_one(&mut *tx) + .await + .map_err(Into::<anyhow::Error>::into) + }, + // NOTE: The image size here is somewhat misleading because it's only the size of the + // downloaded archive and not the total disk usage after it is unpacked. However, this is + // good enough + self.fetch_image_download_size(ctx, image_config), + )?; + + // Prune images + let (removed_count, removed_bytes) = if images_dir_size as u64 + image_download_size + > ctx.config().images.max_cache_size() + { + // Fetch as many images as it takes to clear up enough space for this new image. + // Ordered by LRU + let rows = sqlx::query_as::<_, (Uuid, i64)>(indoc!( + " + WITH + cumulative_sizes AS ( + SELECT + ic.image_id, + ic.size, + ic.last_used_ts, + SUM(ic.size) + OVER (ORDER BY ic.last_used_ts ROWS UNBOUNDED PRECEDING) + AS running_total + FROM images_cache AS ic + LEFT JOIN actors AS a + -- Filter out images that are currently in use by actors + ON + ic.image_id = a.image_id AND + a.stop_ts IS NULL + WHERE + -- Filter out current image, will be upserted + ic.image_id != ?1 AND + a.image_id IS NULL + ORDER BY ic.last_used_ts + ) + SELECT image_id, size + FROM cumulative_sizes + WHERE running_total - size < ?2 + ORDER BY last_used_ts + ", + )) + .bind(image_config.id) + .bind( + (images_dir_size as u64) + .saturating_add(image_download_size) + .saturating_sub(ctx.config().images.max_cache_size()) as i64, + ) + .fetch_all(&mut *tx) + .await?; + + let rows_len = rows.len(); + + if rows.is_empty() { + tracing::error!( + image_id=?image_config.id, + "no inactive images to delete to make space for new image, downloading anyway", + ); + } else { + tracing::debug!(count=?rows_len, "cache full, clearing LRU entries"); + } + + let mut total_removed_bytes = 0; + + for (image_id, size) in rows { + total_removed_bytes += size; + + // NOTE: The sql query does not return the current image id so there is 
no chance + // for a deadlock here + // Acquire lock on entry + let entry = self.downloads.entry_async(image_id).await; + + match fs::remove_dir_all(ctx.image_path(image_id)).await { + Err(e) if e.kind() == ErrorKind::NotFound => {} + res => res.context("failed to delete image dir")?, + } + + // Remove entry and release lock + if let Entry::Occupied(entry) = entry { + let _ = entry.remove(); + } + } + + (rows_len as i64, total_removed_bytes as i64) + } else { + (0, 0) + }; + + metrics::IMAGE_CACHE_COUNT.set(cache_count + 1 - removed_count); + metrics::IMAGE_CACHE_SIZE + .set(images_dir_size + image_download_size as i64 - removed_bytes); + + sqlx::query(indoc!( + " + INSERT OR REPLACE INTO images_cache (image_id, size, last_used_ts, download_complete_ts) + VALUES (?1, 0, ?2, NULL) + ", + )) + .bind(image_config.id) + .bind(utils::now()) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + + // Release lock on sqlite pool + drop(conn); + + self.download_inner(ctx, image_config).await?; + self.convert(ctx, image_config).await?; + + // Calculate dir size after unpacking image and save to db + let image_size = utils::total_dir_size(&image_path).await?; + + // Update metrics after unpacking + metrics::IMAGE_CACHE_SIZE.set(images_dir_size + image_size as i64 - removed_bytes); + + // Update state to signify download completed successfully and record the unpacked size. The SET assignments must be comma separated: joined with AND they parse as one boolean expression, which would leave size at 0 + sqlx::query(indoc!( + " + UPDATE images_cache + SET + download_complete_ts = ?2, + size = ?3 + WHERE image_id = ?1 + ", + )) + .bind(image_config.id) + .bind(utils::now()) + .bind(image_size as i64) + .execute(&mut *ctx.sql().await?) + .await?; + + let duration = start_instant.elapsed().as_secs_f64(); + crate::metrics::DOWNLOAD_IMAGE_DURATION.observe(duration); + tracing::info!(duration_seconds = duration, "image download completed"); + + // The lock on entry is held until this point. 
After this any other parallel downloaders will + // continue with the image already downloaded + entry.insert_entry(()); + } + } + + Ok(()) + } + + async fn download_inner(&self, ctx: &Ctx, image_config: &protocol::Image) -> Result<()> { + let image_path = ctx.image_path(image_config.id); + + let addresses = self.get_image_addresses(ctx, image_config).await?; + + // Log the URLs we're attempting to download from + tracing::info!( + image_id=?image_config.id, + addresses=?addresses, + "initiating image download" + ); + + // Try each URL until one succeeds + let mut last_error = None; + for url in &addresses { + tracing::info!(image_id=?image_config.id, ?url, "attempting download"); + + // Build the shell command based on image kind and compression + // Using shell commands with native Unix pipes improves performance by: + // 1. Reducing overhead of passing data through Rust + // 2. Letting the OS handle data transfer between processes efficiently + // 3. Avoiding unnecessary buffer copies in memory + let shell_cmd = match (image_config.kind, image_config.compression) { + // Docker image, no compression + (protocol::ImageKind::DockerImage, protocol::ImageCompression::None) => { + let docker_image_path = image_path.join("docker-image.tar"); + tracing::info!(image_id=?image_config.id, "downloading uncompressed docker image using curl"); + + // Use curl to download directly to file + format!("curl -sSfL '{}' -o '{}'", url, docker_image_path.display()) + } + + // Docker image with LZ4 compression + (protocol::ImageKind::DockerImage, protocol::ImageCompression::Lz4) => { + let docker_image_path = image_path.join("docker-image.tar"); + tracing::info!( + image_id=?image_config.id, + "downloading and decompressing docker image using curl | lz4", + ); + + // Use curl piped to lz4 for decompression + format!( + "curl -sSfL '{}' | lz4 -d - '{}'", + url, + docker_image_path.display() + ) + } + + // OCI Bundle or JavaScript with no compression + ( + protocol::ImageKind::OciBundle 
| protocol::ImageKind::JavaScript, + protocol::ImageCompression::None, + ) => { + tracing::info!( + image_id=?image_config.id, + "downloading and unarchiving uncompressed artifact using curl | tar", + ); + + // Use curl piped to tar for extraction + format!( + "curl -sSfL '{}' | tar -x -C '{}'", + url, + image_path.display() + ) + } + + // OCI Bundle or JavaScript with LZ4 compression + ( + protocol::ImageKind::OciBundle | protocol::ImageKind::JavaScript, + protocol::ImageCompression::Lz4, + ) => { + tracing::info!( + image_id=?image_config.id, + "downloading, decompressing, and unarchiving artifact using curl | lz4 | tar", + ); + + // Use curl piped to lz4 for decompression, then to tar for extraction + format!( + "curl -sSfL '{}' | lz4 -d | tar -x -C '{}'", + url, + image_path.display() + ) + } + }; + + // Execute the shell command + // Use curl's built-in error handling to fail silently and let us try the next URL + let cmd_result = Command::new("sh").arg("-c").arg(&shell_cmd).output().await; + + match cmd_result { + Ok(output) if output.status.success() => { + tracing::info!(image_id=?image_config.id, ?url, "successfully downloaded image"); + + return Ok(()); + } + Ok(output) => { + // Command ran but failed + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::warn!( + image_id=?image_config.id, + ?url, + status=?output.status, + stderr=%stderr, + "failed to download image" + ); + last_error = Some(anyhow!("download failed: {}", stderr)); + } + Err(e) => { + // Failed to execute command + tracing::warn!( + image_id=?image_config.id, + ?url, + error=?e, + "failed to execute download command" + ); + last_error = Some(anyhow!("download command failed: {}", e)); + } + } + } + + // If we get here, all URLs failed + Err(last_error + .unwrap_or_else(|| anyhow!("failed to download image from any available URL"))) + } + + // Convert downloaded image to other formats (if needed) + async fn convert(&self, ctx: &Ctx, image_config: &protocol::Image) -> 
Result<()> { + let image_path = ctx.image_path(image_config.id); + + // We need to convert the Docker image to an OCI bundle in order to run it. + // Allows us to work with the build with umoci + if let protocol::ImageKind::DockerImage = image_config.kind { + let docker_image_path = image_path.join("docker-image.tar"); + let oci_image_path = image_path.join("oci-image"); + + tracing::info!("converting Docker image -> OCI image",); + let conversion_start = Instant::now(); + let cmd_out = Command::new("skopeo") + .arg("copy") + .arg(format!("docker-archive:{}", docker_image_path.display())) + .arg(format!("oci:{}:default", oci_image_path.display())) + .output() + .await?; + ensure!( + cmd_out.status.success(), + "failed `skopeo` command\n{}", + std::str::from_utf8(&cmd_out.stderr)? + ); + tracing::info!( + duration_seconds = conversion_start.elapsed().as_secs_f64(), + "docker to OCI conversion completed", + ); + + // Allows us to run the bundle natively with runc + tracing::info!("converting OCI image -> OCI bundle"); + let unpack_start = Instant::now(); + let cmd_out = Command::new("umoci") + .arg("unpack") + .arg("--image") + .arg(format!("{}:default", oci_image_path.display())) + .arg(&image_path) + .output() + .await?; + ensure!( + cmd_out.status.success(), + "failed `umoci` command\n{}", + std::str::from_utf8(&cmd_out.stderr)? + ); + tracing::info!( + duration_seconds = unpack_start.elapsed().as_secs_f64(), + "OCI image unpacking completed", + ); + + // Remove artifacts + tracing::info!("cleaning up temporary image artifacts"); + tokio::try_join!( + fs::remove_file(&docker_image_path), + fs::remove_dir_all(&oci_image_path), + ) + .context("failed to delete temporary image artifacts")?; + } + + Ok(()) + } + + /// Generates a list of address URLs for a given build ID, with deterministic shuffling. + /// + /// This function accepts a build ID and returns an array of URLs, including both + /// the seeded shuffling and the fallback address (if provided). 
+ async fn get_image_addresses( + &self, + ctx: &Ctx, + image_config: &protocol::Image, + ) -> Result<Vec<String>> { + // Get hash from image id + let mut hasher = DefaultHasher::new(); + hasher.write(image_config.id.as_bytes()); + let hash = hasher.finish(); + + let mut rng = ChaCha12Rng::seed_from_u64(hash); + + // Shuffle based on hash + let mut addresses = self + .pull_addr_handler + .addresses(ctx.config()) + .await? + .iter() + .map(|addr| { + Ok( + Url::parse(&format!("{addr}{}", image_config.artifact_url_stub)) + .context("failed to build artifact url")? + .to_string(), + ) + }) + .collect::<Result<Vec<_>>>()?; + addresses.shuffle(&mut rng); + + // Add fallback url to the end if one is set + if let Some(fallback_artifact_url) = &image_config.fallback_artifact_url { + addresses.push(fallback_artifact_url.to_string()); + } + + ensure!( + !addresses.is_empty(), + "no artifact urls available (no pull addresses nor fallback)" + ); + + Ok(addresses) + } + + /// Attempts to fetch HEAD for the image download url and determine the image's download size. + async fn fetch_image_download_size( + &self, + ctx: &Ctx, + image_config: &protocol::Image, + ) -> Result<u64> { + let addresses = self.get_image_addresses(ctx, image_config).await?; + + let mut iter = addresses.into_iter(); + while let Some(artifact_url) = iter.next() { + // Log the full URL we're about to send the HEAD request to + tracing::info!(image_id=?image_config.id, %artifact_url, "attempting to download image"); + + match reqwest::Client::new() + .head(&artifact_url) + .send() + .await + .and_then(|res| res.error_for_status()) + { + Ok(res) => { + tracing::info!(image_id=?image_config.id, %artifact_url, "successfully fetched image HEAD"); + + // Read Content-Length header from response + let image_size = res + .headers() + .get(reqwest::header::CONTENT_LENGTH) + .context("no Content-Length header")? + .to_str()? 
+ .parse::<u64>() + .context("invalid Content-Length header")?; + + return Ok(image_size); + } + Err(err) => { + tracing::warn!( + image_id=?image_config.id, + %artifact_url, + %err, + "failed to fetch image HEAD" + ); + } + } + } + + bail!("artifact url could not be resolved"); + } +} diff --git a/packages/edge/infra/client/manager/src/lib.rs b/packages/edge/infra/client/manager/src/lib.rs index e2dd405100..24856d27a5 100644 --- a/packages/edge/infra/client/manager/src/lib.rs +++ b/packages/edge/infra/client/manager/src/lib.rs @@ -9,6 +9,8 @@ mod ctx; #[cfg(feature = "test")] pub mod event_sender; #[cfg(feature = "test")] +pub mod image_download_handler; +#[cfg(feature = "test")] mod metrics; #[cfg(feature = "test")] pub mod pull_addr_handler; diff --git a/packages/edge/infra/client/manager/src/main.rs b/packages/edge/infra/client/manager/src/main.rs index 462c24bec1..9b99311628 100644 --- a/packages/edge/infra/client/manager/src/main.rs +++ b/packages/edge/infra/client/manager/src/main.rs @@ -22,6 +22,7 @@ use url::Url; mod actor; mod ctx; mod event_sender; +mod image_download_handler; mod metrics; mod pull_addr_handler; mod runner; diff --git a/packages/edge/infra/client/manager/src/metrics/buckets.rs b/packages/edge/infra/client/manager/src/metrics/buckets.rs index facc7b9b02..36c90830be 100644 --- a/packages/edge/infra/client/manager/src/metrics/buckets.rs +++ b/packages/edge/infra/client/manager/src/metrics/buckets.rs @@ -5,3 +5,8 @@ pub const BUCKETS: &[f64] = &[ 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, // Added 25.0, 50.0, 100.0, 250.0, 500.0, ]; + +pub const MICRO_BUCKETS: &[f64] = &[ + 0.0001, 0.00025, 0.0005, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.5, 1.0, 2.5, + 5.0, 10.0, 25.0, 50.0, +]; diff --git a/packages/edge/infra/client/manager/src/metrics/mod.rs 
b/packages/edge/infra/client/manager/src/metrics/mod.rs @@ -1,10 +1,10 @@ use prometheus::*; -// mod buckets; +mod buckets; mod registry; mod server; -// pub use buckets::BUCKETS; +pub use buckets::{BUCKETS, MICRO_BUCKETS}; pub use registry::REGISTRY; pub use server::run_standalone; @@ -51,52 +51,91 @@ lazy_static::lazy_static! { *REGISTRY, ).unwrap(); - // Actor setup step duration metrics + pub static ref DOWNLOAD_IMAGE_DURATION: Histogram = register_histogram_with_registry!( + "download_image_duration", + "Duration of image download", + BUCKETS.to_vec(), + *REGISTRY, + ).unwrap(); + + // MARK: Actor setup step duration metrics pub static ref SETUP_TOTAL_DURATION: Histogram = register_histogram_with_registry!( - "actor_setup_total_duration_seconds", - "Duration of the complete actor setup process in seconds", + "actor_setup_total_duration", + "Duration of the complete actor setup process", + BUCKETS.to_vec(), *REGISTRY, ).unwrap(); pub static ref SETUP_MAKE_FS_DURATION: Histogram = register_histogram_with_registry!( - "actor_setup_make_fs_duration_seconds", - "Duration of make_fs step in seconds", + "actor_setup_make_fs_duration", + "Duration of fs creation step", + BUCKETS.to_vec(), *REGISTRY, ).unwrap(); pub static ref SETUP_DOWNLOAD_IMAGE_DURATION: Histogram = register_histogram_with_registry!( - "actor_setup_download_image_duration_seconds", - "Duration of download_image step in seconds", + "actor_setup_download_image_duration", + "Duration of image download step", + BUCKETS.to_vec(), *REGISTRY, ).unwrap(); pub static ref SETUP_BIND_PORTS_DURATION: Histogram = register_histogram_with_registry!( - "actor_setup_bind_ports_duration_seconds", - "Duration of bind_ports step in seconds", + "actor_setup_bind_ports_duration", + "Duration of port binding step", + MICRO_BUCKETS.to_vec(), *REGISTRY, ).unwrap(); pub static ref SETUP_CNI_NETWORK_DURATION: Histogram = register_histogram_with_registry!( - "actor_setup_cni_network_duration_seconds", - "Duration of 
setup_cni_network step in seconds", + "actor_setup_cni_network_duration", + "Duration of CNI network setup step", + BUCKETS.to_vec(), *REGISTRY, ).unwrap(); pub static ref SETUP_OCI_BUNDLE_DURATION: Histogram = register_histogram_with_registry!( - "actor_setup_oci_bundle_duration_seconds", - "Duration of setup_oci_bundle step in seconds", + "actor_setup_oci_bundle_duration", + "Duration of OCI bundle setup step", + BUCKETS.to_vec(), *REGISTRY, ).unwrap(); pub static ref SETUP_ISOLATE_DURATION: Histogram = register_histogram_with_registry!( - "actor_setup_isolate_duration_seconds", - "Duration of setup_isolate step in seconds", + "actor_setup_isolate_duration", + "Duration of isolate setup", + BUCKETS.to_vec(), *REGISTRY, ).unwrap(); pub static ref SETUP_PARALLEL_TASKS_DURATION: Histogram = register_histogram_with_registry!( - "actor_setup_parallel_tasks_duration_seconds", - "Duration of parallel setup tasks (image download/fs + ports/network) in seconds", + "actor_setup_parallel_tasks_duration", + "Duration of parallel setup tasks (image download/fs + ports/network)", + BUCKETS.to_vec(), + *REGISTRY, + ).unwrap(); + + pub static ref IMAGE_DOWNLOAD_REQUEST_TOTAL: IntCounter = register_int_counter_with_registry!( + "image_download_request_total", + "Total number of download requests.", + *REGISTRY, + ).unwrap(); + + pub static ref IMAGE_DOWNLOAD_CACHE_MISS_TOTAL: IntCounter = register_int_counter_with_registry!( + "image_download_cache_miss_total", + "Total number of download requests that missed cache.", + *REGISTRY, + ).unwrap(); + + pub static ref IMAGE_CACHE_COUNT: IntGauge = register_int_gauge_with_registry!( + "image_cache_count", + "Total number of images currently in cache.", + *REGISTRY, + ).unwrap(); + + pub static ref IMAGE_CACHE_SIZE: IntGauge = register_int_gauge_with_registry!( + "image_cache_size", + "Total byte size of cache images folder.", *REGISTRY, ).unwrap(); } diff --git a/packages/edge/infra/client/manager/src/pull_addr_handler.rs 
b/packages/edge/infra/client/manager/src/pull_addr_handler.rs index 1ba9ab2b76..60cba4eb5e 100644 --- a/packages/edge/infra/client/manager/src/pull_addr_handler.rs +++ b/packages/edge/infra/client/manager/src/pull_addr_handler.rs @@ -8,6 +8,7 @@ use tokio::sync::RwLock; /// Duration between pulling addresses again. const PULL_INTERVAL: Duration = Duration::from_secs(3 * 60); +/// Handles the list of ATS node addresses to pull images from. pub struct PullAddrHandler { last_pull: RwLock>, addresses: RwLock>, diff --git a/packages/edge/infra/client/manager/src/runner.rs b/packages/edge/infra/client/manager/src/runner.rs index 6d1c09b2ad..153b02dd45 100644 --- a/packages/edge/infra/client/manager/src/runner.rs +++ b/packages/edge/infra/client/manager/src/runner.rs @@ -230,6 +230,19 @@ impl Handle { // Disassociate from the parent by creating a new session setsid().context("setsid failed")?; + // Adjust nice, cpu priority, and OOM score + let pid = std::process::id() as i32; + utils::libc::set_nice_level(pid, 0).context("failed to set nice level")?; + utils::libc::set_oom_score_adj(pid, 0) + .context("failed to set oom score adjustment")?; + utils::libc::set_scheduling_policy( + pid, + utils::libc::SchedPolicy::Other, + // Must be 0 with SCHED_OTHER + 0, + ) + .context("failed to set scheduling policy")?; + // Exit immediately on fail in order to not leak process let err = std::process::Command::new(&runner_binary_path) .args(&runner_args) diff --git a/packages/edge/infra/client/manager/src/utils/libc.rs b/packages/edge/infra/client/manager/src/utils/libc.rs new file mode 100644 index 0000000000..2f346f2781 --- /dev/null +++ b/packages/edge/infra/client/manager/src/utils/libc.rs @@ -0,0 +1,49 @@ +use std::{fs, io}; + +use nix::libc::{c_int, sched_param, sched_setscheduler, setpriority, PRIO_PROCESS}; + +#[allow(dead_code)] +#[repr(i32)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SchedPolicy { + Other = nix::libc::SCHED_OTHER, + Fifo = nix::libc::SCHED_FIFO, 
+ Rr = nix::libc::SCHED_RR, + Batch = nix::libc::SCHED_BATCH, + Idle = nix::libc::SCHED_IDLE, +} + +/// `nice_value` must be between -20 (highest priority) and 19 (lowest priority) +pub fn set_nice_level(pid: i32, nice_value: i32) -> io::Result<()> { + let result = unsafe { setpriority(PRIO_PROCESS, pid as u32, nice_value as c_int) }; + + if result == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } +} + +/// `oom_score_adj` must be between -1000 (don't kill) and 1000 (most likely to be killed) +pub fn set_oom_score_adj(pid: i32, oom_score_adj: i32) -> io::Result<()> { + fs::write( + format!("/proc/{pid}/oom_score_adj"), + oom_score_adj.to_string(), + ) +} + +/// With SchedPolicy::Other, SchedPolicy::Batch, and SchedPolicy::Idle `priority` must be 0. With +/// SchedPolicy::Fifo and SchedPolicy::Rr `priority` must be between 1 (lowest) and 99 (highest). +pub fn set_scheduling_policy(pid: i32, policy: SchedPolicy, priority: i32) -> io::Result<()> { + let param = sched_param { + sched_priority: priority, + }; + + let result = unsafe { sched_setscheduler(pid, policy as c_int, &param) }; + + if result == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } +} diff --git a/packages/edge/infra/client/manager/src/utils/mod.rs b/packages/edge/infra/client/manager/src/utils/mod.rs index edbae38096..475ff557a2 100644 --- a/packages/edge/infra/client/manager/src/utils/mod.rs +++ b/packages/edge/infra/client/manager/src/utils/mod.rs @@ -1,13 +1,10 @@ use std::{ - hash::{DefaultHasher, Hasher}, path::Path, result::Result::{Err, Ok}, - sync::Arc, time::{self, Duration}, }; use anyhow::*; -use futures_util::Stream; use indoc::indoc; use notify::{ event::{AccessKind, AccessMode}, @@ -15,8 +12,6 @@ use notify::{ }; use pegboard::protocol; use pegboard_config::Config; -use rand::{prelude::SliceRandom, SeedableRng}; -use rand_chacha::ChaCha12Rng; use sql::SqlitePoolExt; use sqlx::{ migrate::MigrateDatabase, @@ -27,11 +22,8 @@ use tokio::{ fs, sync::mpsc::{channel, 
Receiver}, }; -use url::Url; -use uuid::Uuid; - -use crate::ctx::Ctx; +pub mod libc; pub mod sql; pub async fn init_dir(config: &Config) -> Result<()> { @@ -73,6 +65,12 @@ pub async fn init_dir(config: &Config) -> Result<()> { x => x.context("failed to create /actors dir in data dir")?, } + // Create images dir + match fs::create_dir(data_dir.join("images")).await { + Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {} + x => x.context("failed to create /images dir in data dir")?, + } + // Create runner dir match fs::create_dir(data_dir.join("runner")).await { Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {} @@ -193,6 +191,22 @@ async fn init_sqlite_schema(pool: &SqlitePool) -> Result<()> { .execute(&mut *conn) .await?; + sqlx::query(indoc!( + " + CREATE TABLE IF NOT EXISTS images_cache ( + image_id BLOB NOT NULL, -- UUID + size INTEGER NOT NULL, + + last_used_ts INTEGER NOT NULL, + download_complete_ts INTEGER, + + PRIMARY KEY (image_id) + ) STRICT + ", + )) + .execute(&mut *conn) + .await?; + sqlx::query(indoc!( " CREATE TABLE IF NOT EXISTS actors ( @@ -208,6 +222,9 @@ async fn init_sqlite_schema(pool: &SqlitePool) -> Result<()> { pid INTEGER, exit_code INTEGER, + -- Also exists in the config column but this is for indexing + image_id BLOB NOT NULL, -- UUID + PRIMARY KEY (actor_id, generation) ) STRICT ", @@ -215,6 +232,16 @@ async fn init_sqlite_schema(pool: &SqlitePool) -> Result<()> { .execute(&mut *conn) .await?; + sqlx::query(indoc!( + " + CREATE INDEX IF NOT EXISTS actors_image_id_idx + ON actors(image_id) + WHERE stop_ts IS NULL + ", + )) + .execute(&mut *conn) + .await?; + sqlx::query(indoc!( " CREATE TABLE IF NOT EXISTS actor_ports ( @@ -261,50 +288,6 @@ pub fn now() -> i64 { .expect("now doesn't fit in i64") } -/// Generates a list of address URLs for a given build ID, with deterministic shuffling. 
-/// -/// This function accepts a build ID and returns an array of URLs, including both -/// the seeded shuffling and the fallback address (if provided). -pub async fn get_image_addresses( - ctx: &Ctx, - image_id: Uuid, - image_artifact_url_stub: &str, - image_fallback_artifact_url: Option<&str>, -) -> Result> { - // Get hash from image id - let mut hasher = DefaultHasher::new(); - hasher.write(image_id.as_bytes()); - let hash = hasher.finish(); - - let mut rng = ChaCha12Rng::seed_from_u64(hash); - - // Shuffle based on hash - let mut addresses = ctx - .pull_addr_handler - .addresses(ctx.config()) - .await? - .iter() - .map(|addr| { - Ok(Url::parse(&format!("{addr}{}", image_artifact_url_stub)) - .context("failed to build artifact url")? - .to_string()) - }) - .collect::>>()?; - addresses.shuffle(&mut rng); - - // Add fallback url to the end if one is set - if let Some(fallback_artifact_url) = image_fallback_artifact_url { - addresses.push(fallback_artifact_url.to_string()); - } - - ensure!( - !addresses.is_empty(), - "no artifact urls available (no pull addresses nor fallback)" - ); - - Ok(addresses) -} - /// Creates an async file watcher. fn async_watcher() -> Result<(RecommendedWatcher, Receiver>)> { let (tx, rx) = channel(1); @@ -353,65 +336,59 @@ pub async fn wait_for_write>(path: P) -> Result<()> { Ok(()) } -/// Deterministically shuffles a list of available ATS URL's to download the image from based on the image -// ID and attempts to download from each URL until success. -pub async fn fetch_image_stream( - ctx: &Ctx, - image_id: Uuid, - image_artifact_url_stub: &str, - image_fallback_artifact_url: Option<&str>, -) -> Result>> { - let addresses = get_image_addresses( - ctx, - image_id, - image_artifact_url_stub, - image_fallback_artifact_url, - ) - .await?; +/// Recursively copy a directory from source to destination. 
+pub async fn copy_dir_all, Q: AsRef>(src: P, dst: Q) -> Result<()> { + let src = src.as_ref(); + let dst = dst.as_ref(); - let mut iter = addresses.into_iter(); - while let Some(artifact_url) = iter.next() { - // Log the full URL we're attempting to download from - tracing::info!(?image_id, %artifact_url, "attempting to download image"); + if !src.is_dir() { + return Err(anyhow!("source is not a directory: {}", src.display())); + } - match reqwest::get(&artifact_url) - .await - .and_then(|res| res.error_for_status()) - { - Ok(res) => { - tracing::info!(?image_id, %artifact_url, "successfully downloading image"); - return Ok(res.bytes_stream()); - } - Err(err) => { - tracing::warn!( - ?image_id, - %artifact_url, - %err, - "failed to download image" - ); - } + if !dst.exists() { + fs::create_dir_all(dst).await?; + } else if !dst.is_dir() { + return Err(anyhow!( + "destination exists but is not a directory: {}", + dst.display() + )); + } + + let mut read_dir = fs::read_dir(src).await?; + + while let Some(entry) = read_dir.next_entry().await? 
{ + let entry_path = entry.path(); + let file_name = entry.file_name(); + let dst_path = dst.join(file_name); + + if entry_path.is_dir() { + Box::pin(copy_dir_all(entry_path, dst_path)).await?; + } else { + fs::copy(entry_path, dst_path).await?; } } - bail!("artifact url could not be resolved"); + Ok(()) } -pub fn prewarm_image(ctx: &Arc, image_id: Uuid, image_artifact_url_stub: &str) { - // Log full URL for prewarm operation - let prewarm_url = format!("{}/{}", image_artifact_url_stub, image_id); - tracing::info!(?image_id, %prewarm_url, "prewarming image"); - - let ctx = ctx.clone(); - let image_artifact_url_stub = image_artifact_url_stub.to_string(); - - tokio::spawn(async move { - match fetch_image_stream(&ctx, image_id, &image_artifact_url_stub, None).await { - Ok(_) => tracing::info!(?image_id, %prewarm_url, "prewarm complete"), - Err(_) => tracing::warn!( - ?image_id, - %prewarm_url, - "prewarm failed, artifact url could not be resolved" - ), +/// Calculates the total size of a folder in bytes. +pub async fn total_dir_size>(path: P) -> Result { + let path = path.as_ref(); + + ensure!(path.is_dir(), "path is not a directory: {}", path.display()); + + let mut total_size = 0; + let mut read_dir = fs::read_dir(path).await?; + + while let Some(entry) = read_dir.next_entry().await? 
{ + let entry_path = entry.path(); + + if entry_path.is_dir() { + total_size += Box::pin(total_dir_size(entry_path)).await?; + } else { + total_size += fs::metadata(entry_path).await?.len(); } - }); + } + + Ok(total_size) } diff --git a/packages/edge/infra/client/manager/tests/client_rebuild_state.rs b/packages/edge/infra/client/manager/tests/client_rebuild_state.rs index 9a2598b46a..76de5502f2 100644 --- a/packages/edge/infra/client/manager/tests/client_rebuild_state.rs +++ b/packages/edge/infra/client/manager/tests/client_rebuild_state.rs @@ -33,7 +33,6 @@ async fn client_rebuild_state() { let close_tx = Arc::new(close_tx); let actor_id = Uuid::new_v4(); - let actor_port = portpicker::pick_unused_port().expect("no free ports"); let first_client = Arc::new(AtomicBool::new(true)); let port = portpicker::pick_unused_port().expect("no free ports"); @@ -48,7 +47,6 @@ async fn client_rebuild_state() { close_tx, raw_stream, actor_id, - actor_port, first_client2.clone(), ) }, @@ -72,7 +70,6 @@ async fn handle_connection( close_tx: Arc>, raw_stream: TcpStream, actor_id: Uuid, - actor_port: u16, first_client: Arc, ) { tokio::spawn(async move { @@ -100,7 +97,7 @@ async fn handle_connection( if first_client.load(Ordering::SeqCst) { // Spawn actor on first client - start_echo_actor(&mut tx, actor_id, actor_port).await; + start_echo_actor(&mut tx, actor_id).await; } else { tokio::time::sleep(Duration::from_millis(350)).await; @@ -108,7 +105,7 @@ async fn handle_connection( let actors = ctx.actors().read().await; assert!( - actors.contains_key(&actor_id), + actors.contains_key(&(actor_id, 0)), "actor not in client memory" ); @@ -119,9 +116,9 @@ async fn handle_connection( &mut tx, protocol::Command::SignalActor { actor_id, + generation: 0, signal: Signal::SIGKILL as i32, persist_storage: false, - ignore_future_state: false, }, ) .await; @@ -146,7 +143,7 @@ async fn handle_connection( // Verify client state let actors = ctx.actors().read().await; assert!( - 
!actors.contains_key(&actor_id), + !actors.contains_key(&(actor_id, 0)), "actor still in client memory" ); @@ -159,6 +156,7 @@ async fn handle_connection( } } } + protocol::ToServer::AckCommands { .. } => {} } } Message::Close(_) => { diff --git a/packages/edge/infra/client/manager/tests/client_state_external_kill.rs b/packages/edge/infra/client/manager/tests/client_state_external_kill.rs index ecc65bcfd5..12e27c3155 100644 --- a/packages/edge/infra/client/manager/tests/client_state_external_kill.rs +++ b/packages/edge/infra/client/manager/tests/client_state_external_kill.rs @@ -36,7 +36,6 @@ async fn client_state_external_kill() { let close_tx = Arc::new(close_tx); let actor_id = Uuid::new_v4(); - let actor_port = portpicker::pick_unused_port().expect("no free ports"); let first_client = Arc::new(AtomicBool::new(true)); let actor_pid = Arc::new(AtomicI32::new(0)); @@ -53,7 +52,6 @@ async fn client_state_external_kill() { close_tx, raw_stream, actor_id, - actor_port, first_client2.clone(), actor_pid2.clone(), ) @@ -87,7 +85,6 @@ async fn handle_connection( close_tx: Arc>, raw_stream: TcpStream, actor_id: Uuid, - actor_port: u16, first_client: Arc, actor_pid: Arc, ) { @@ -116,7 +113,7 @@ async fn handle_connection( if first_client.load(Ordering::SeqCst) { // Spawn actor on first client - start_echo_actor(&mut tx, actor_id, actor_port).await; + start_echo_actor(&mut tx, actor_id).await; } } protocol::ToServer::Events(events) => { @@ -141,7 +138,7 @@ async fn handle_connection( // Verify client state let actors = ctx.actors().read().await; assert!( - !actors.contains_key(&actor_id), + !actors.contains_key(&(actor_id, 0)), "actor still in client memory" ); @@ -154,6 +151,7 @@ async fn handle_connection( } } } + protocol::ToServer::AckCommands { .. 
} => {} } } Message::Close(_) => { diff --git a/packages/edge/infra/client/manager/tests/common.rs b/packages/edge/infra/client/manager/tests/common.rs index b46311d3f0..28dd45aed3 100644 --- a/packages/edge/infra/client/manager/tests/common.rs +++ b/packages/edge/infra/client/manager/tests/common.rs @@ -68,7 +68,6 @@ pub async fn send_init_packet(tx: &mut SplitSink, Message>, actor_id: Uuid, - port: u16, ) { let cmd = protocol::Command::StartActor { actor_id, @@ -82,7 +81,7 @@ pub async fn start_echo_actor( compression: protocol::ImageCompression::None, }, root_user_enabled: false, - env: [("PORT".to_string(), port.to_string())] + env: [("foo".to_string(), "bar".to_string())] .into_iter() .collect(), ports: [( @@ -266,11 +265,12 @@ pub async fn init_client(gen_path: &Path, working_path: &Path) -> Config { // Not necessary for the test flavor: protocol::ClientFlavor::Container, port: None, - use_mounts: Some(false), + use_mounts: Some(true), container_runner_binary_path: Some(container_runner_binary_path), isolate_runner_binary_path: Some(isolate_runner_binary_path), }, images: Images { + max_cache_size: None, // Should match the URL in `serve_binaries` pull_addresses: Some(Addresses::Static(vec![format!( "http://127.0.0.1:{ARTIFACTS_PORT}" @@ -487,11 +487,23 @@ pub async fn serve_binaries(gen_path: PathBuf) { panic!("invalid path: {path}"); }; - let file = File::open(path).await?; + let file = File::open(&path).await?; + + // Get file metadata to determine content length + let metadata = file.metadata().await?; + let content_length = metadata.len(); + let stream = ReaderStream::new(BufReader::new(file)); let body = Body::wrap_stream(stream); - Result::<_, std::io::Error>::Ok(Response::new(body)) + // Create response and add Content-Length header + let mut res = Response::new(body); + res.headers_mut().insert( + hyper::header::CONTENT_LENGTH, + hyper::header::HeaderValue::from(content_length), + ); + + Result::<_, std::io::Error>::Ok(res) } })) } diff --git 
a/packages/edge/infra/client/manager/tests/container_external_kill.rs b/packages/edge/infra/client/manager/tests/container_external_kill.rs index 046ee78dec..ed2babd53b 100644 --- a/packages/edge/infra/client/manager/tests/container_external_kill.rs +++ b/packages/edge/infra/client/manager/tests/container_external_kill.rs @@ -58,7 +58,6 @@ async fn handle_connection( }; let actor_id = Uuid::new_v4(); - let actor_port = portpicker::pick_unused_port().expect("no free ports"); // Receive messages from socket while let Some(msg) = rx.next().await { @@ -71,7 +70,7 @@ async fn handle_connection( protocol::ToServer::Init { .. } => { send_init_packet(&mut tx).await; - start_echo_actor(&mut tx, actor_id, actor_port).await; + start_echo_actor(&mut tx, actor_id).await; } protocol::ToServer::Events(events) => { for event in events { @@ -92,7 +91,7 @@ async fn handle_connection( // Verify client state let actors = ctx.actors().read().await; assert!( - !actors.contains_key(&actor_id), + !actors.contains_key(&(actor_id, 0)), "actor still in client memory" ); @@ -105,6 +104,7 @@ async fn handle_connection( } } } + protocol::ToServer::AckCommands { .. 
} => {} } } Message::Close(_) => { diff --git a/packages/edge/infra/client/manager/tests/container_lifecycle.rs b/packages/edge/infra/client/manager/tests/container_lifecycle.rs index 3975e92ab2..e6f716f7ea 100644 --- a/packages/edge/infra/client/manager/tests/container_lifecycle.rs +++ b/packages/edge/infra/client/manager/tests/container_lifecycle.rs @@ -40,8 +40,13 @@ async fn container_lifecycle() { // Init project directories let tmp_dir = tempfile::TempDir::new().unwrap(); - let config = init_client(&gen_tmp_dir_path, tmp_dir.path()).await; - tracing::info!(path=%tmp_dir.path().display(), "client dir"); + let path = tmp_dir.path(); + // let path = std::path::Path::new( + // "/home/rivet/rivet-ee/oss/packages/edge/infra/client/manager/tests/foo", + // ); + + let config = init_client(&gen_tmp_dir_path, &path).await; + tracing::info!(path=%path.display(), "client dir"); start_client(config, ctx_wrapper, close_rx, port).await; } @@ -64,7 +69,6 @@ async fn handle_connection( }; let actor_id = Uuid::new_v4(); - let actor_port = portpicker::pick_unused_port().expect("no free ports"); let mut actor_state = State::None; // Receive messages from socket @@ -78,7 +82,10 @@ async fn handle_connection( protocol::ToServer::Init { .. } => { send_init_packet(&mut tx).await; - start_echo_actor(&mut tx, actor_id, actor_port).await; + start_echo_actor(&mut tx, actor_id).await; + start_echo_actor(&mut tx, Uuid::new_v4()).await; + + tokio::time::sleep(std::time::Duration::from_millis(10000)).await; } protocol::ToServer::Events(events) => { for event in events { @@ -104,12 +111,12 @@ async fn handle_connection( "actor not in client memory" ); } - protocol::ActorState::Running { .. } => { + protocol::ActorState::Running { ref ports, .. 
} => { if let State::Starting = actor_state { actor_state = State::Running; } else { panic!( - "invalid prior state: {actor_state:?} -> {state:?}" + "invalid prior state: {actor_state:?} -> {state:?}", ); } @@ -125,10 +132,12 @@ async fn handle_connection( tracing::info!("sending echo"); + let port = ports.get("main").expect("no main port").source; + // Send echo test let req = b"hello world"; let res = reqwest::Client::new() - .post(format!("http://0.0.0.0:{actor_port}")) + .post(format!("http://0.0.0.0:{port}")) .body(req.to_vec()) .send() .await @@ -179,12 +188,12 @@ async fn handle_connection( ); } - tokio::time::sleep(Duration::from_millis(5)).await; + tokio::time::sleep(Duration::from_millis(50)).await; // Verify client state let actors = ctx.actors().read().await; assert!( - actors.contains_key(&(actor_id, 0)), + !actors.contains_key(&(actor_id, 0)), "actor still in client memory" ); @@ -195,6 +204,7 @@ async fn handle_connection( } } } + protocol::ToServer::AckCommands { .. } => {} } } Message::Close(_) => { diff --git a/packages/edge/infra/client/manager/tests/isolate_lifecycle.rs b/packages/edge/infra/client/manager/tests/isolate_lifecycle.rs index 72f2323ef0..56c2ef9ec4 100644 --- a/packages/edge/infra/client/manager/tests/isolate_lifecycle.rs +++ b/packages/edge/infra/client/manager/tests/isolate_lifecycle.rs @@ -99,7 +99,7 @@ async fn handle_connection( // Verify client state let actors = ctx.actors().read().await; assert!( - actors.contains_key(&actor_id), + actors.contains_key(&(actor_id, 0)), "actor not in client memory" ); } @@ -117,7 +117,7 @@ async fn handle_connection( // Verify client state let actors = ctx.actors().read().await; assert!( - actors.contains_key(&actor_id), + actors.contains_key(&(actor_id, 0)), "actor not in client memory" ); @@ -148,9 +148,9 @@ async fn handle_connection( &mut tx, protocol::Command::SignalActor { actor_id, + generation: 0, signal: Signal::SIGKILL as i32, persist_storage: false, - ignore_future_state: false, }, 
) .await; @@ -167,7 +167,7 @@ async fn handle_connection( // Verify client state let actors = ctx.actors().read().await; assert!( - actors.contains_key(&actor_id), + actors.contains_key(&(actor_id, 0)), "actor not in client memory" ); } @@ -185,7 +185,7 @@ async fn handle_connection( // Verify client state let actors = ctx.actors().read().await; assert!( - !actors.contains_key(&actor_id), + !actors.contains_key(&(actor_id, 0)), "actor still in client memory" ); @@ -196,6 +196,7 @@ async fn handle_connection( } } } + protocol::ToServer::AckCommands { .. } => {} } } Message::Close(_) => { diff --git a/packages/edge/infra/client/manager/tests/vector.json b/packages/edge/infra/client/manager/tests/vector.json index 7cdd5fe0d2..8c2cdc760a 100644 --- a/packages/edge/infra/client/manager/tests/vector.json +++ b/packages/edge/infra/client/manager/tests/vector.json @@ -20,7 +20,10 @@ "transforms": { "actors": { "type": "filter", - "inputs": ["vector", "tcp_json"], + "inputs": [ + "vector", + "tcp_json" + ], "condition": { "type": "vrl", "source": ".source == \"actors\"" @@ -28,24 +31,30 @@ }, "add_prefix": { "type": "remap", - "inputs": ["actors"], - "source": ".message, err = \"\u001b[2m\" + \"actor_id=\" + .server_id + \"\u001b[0m \" + .message" + "inputs": [ + "actors" + ], + "source": ".message, err = \"\u001b[2m\" + \"actor_id=\" + .actor_id + \"\u001b[0m \" + .message" } }, "sinks": { "actor_logs": { "type": "console", - "inputs": ["add_prefix"], + "inputs": [ + "add_prefix" + ], "encoding": { "codec": "text" } }, "console": { "type": "console", - "inputs": ["vector_logs"], + "inputs": [ + "vector_logs" + ], "encoding": { "codec": "text" } } } -} +} \ No newline at end of file diff --git a/packages/edge/infra/guard/core/src/metrics.rs b/packages/edge/infra/guard/core/src/metrics.rs index 9e9e8a0b17..ae1f3c0ced 100644 --- a/packages/edge/infra/guard/core/src/metrics.rs +++ b/packages/edge/infra/guard/core/src/metrics.rs @@ -3,8 +3,8 @@ use 
rivet_metrics::{prometheus::*, REGISTRY, BUCKETS}; lazy_static! { // MARK: Internal - pub static ref ROUTE_CACHE_SIZE: IntGauge = register_int_gauge_with_registry!( - "guard_route_cache_size", + pub static ref ROUTE_CACHE_COUNT: IntGauge = register_int_gauge_with_registry!( + "guard_route_cache_count", "Number of entries in the route cache", *REGISTRY, ).unwrap(); diff --git a/packages/edge/infra/guard/core/src/proxy_service.rs b/packages/edge/infra/guard/core/src/proxy_service.rs index 3bbb20ba70..d025a2dbbd 100644 --- a/packages/edge/infra/guard/core/src/proxy_service.rs +++ b/packages/edge/infra/guard/core/src/proxy_service.rs @@ -182,7 +182,7 @@ impl RouteCache { async fn insert(&self, hostname: String, path: String, result: RouteConfig) { self.cache.insert((hostname, path), result).await; - metrics::ROUTE_CACHE_SIZE.set(self.cache.entry_count() as i64); + metrics::ROUTE_CACHE_COUNT.set(self.cache.entry_count() as i64); } #[tracing::instrument(skip_all)] @@ -191,7 +191,7 @@ impl RouteCache { .invalidate(&(hostname.to_owned(), path.to_owned())) .await; - metrics::ROUTE_CACHE_SIZE.set(self.cache.entry_count() as i64); + metrics::ROUTE_CACHE_COUNT.set(self.cache.entry_count() as i64); } } @@ -1166,7 +1166,9 @@ impl ProxyService { let target_url = format!("ws://{}:{}{}", target.host, target.port, target.path); tracing::debug!( "WebSocket request attempt {}/{} to {}", - attempts, max_attempts, target_url + attempts, + max_attempts, + target_url ); match tokio::time::timeout( @@ -1177,7 +1179,10 @@ impl ProxyService { { Ok(Ok((ws_stream, resp))) => { tracing::debug!("Successfully connected to upstream WebSocket server"); - tracing::debug!("Upstream connection response status: {:?}", resp.status()); + tracing::debug!( + "Upstream connection response status: {:?}", + resp.status() + ); // Log headers for debugging for (name, value) in resp.headers() { @@ -1193,16 +1198,24 @@ impl ProxyService { tracing::debug!(?err, "WebSocket request attempt {} failed", attempts); } 
Err(_) => { - tracing::debug!("WebSocket request attempt {} timed out after 5s", attempts); + tracing::debug!( + "WebSocket request attempt {} timed out after 5s", + attempts + ); } } // Check if we've reached max attempts if attempts >= max_attempts { - tracing::debug!("All {} WebSocket connection attempts failed", max_attempts); + tracing::debug!( + "All {} WebSocket connection attempts failed", + max_attempts + ); // Send a close message to the client since we can't connect to upstream - tracing::debug!("Sending close message to client due to upstream connection failure"); + tracing::debug!( + "Sending close message to client due to upstream connection failure" + ); let (mut client_sink, _) = client_ws.split(); match client_sink .send(hyper_tungstenite::tungstenite::Message::Close(Some( @@ -1214,12 +1227,18 @@ impl ProxyService { .await { Ok(_) => tracing::debug!("Successfully sent close message to client"), - Err(err) => tracing::error!(?err, "Failed to send close message to client"), + Err(err) => { + tracing::error!(?err, "Failed to send close message to client") + } }; match client_sink.flush().await { - Ok(_) => tracing::debug!("Successfully flushed client sink after close"), - Err(err) => tracing::error!(?err, "Failed to flush client sink after close"), + Ok(_) => { + tracing::debug!("Successfully flushed client sink after close") + } + Err(err) => { + tracing::error!(?err, "Failed to flush client sink after close") + } }; return; @@ -1254,7 +1273,9 @@ impl ProxyService { ws } Option::None => { - tracing::error!("Failed to establish upstream WebSocket connection (unexpected)"); + tracing::error!( + "Failed to establish upstream WebSocket connection (unexpected)" + ); return; // Should never happen due to checks above, but just in case } }; @@ -1404,12 +1425,16 @@ impl ProxyService { .await { Ok(_) => tracing::debug!("Close message sent to upstream successfully"), - Err(err) => tracing::debug!(?err, "Failed to send close message to upstream"), + Err(err) => 
{ + tracing::debug!(?err, "Failed to send close message to upstream") + } }; match sink.flush().await { Ok(_) => tracing::debug!("Upstream sink flushed successfully after close"), - Err(err) => tracing::debug!(?err, "Failed to flush upstream sink after close"), + Err(err) => { + tracing::debug!(?err, "Failed to flush upstream sink after close") + } }; tracing::debug!("Client-to-upstream task completed"); @@ -1556,7 +1581,9 @@ impl ProxyService { match sink.flush().await { Ok(_) => tracing::debug!("Client sink flushed successfully after close"), - Err(err) => tracing::debug!(?err, "Failed to flush client sink after close"), + Err(err) => { + tracing::debug!(?err, "Failed to flush client sink after close") + } }; tracing::debug!("Upstream-to-client task completed"); diff --git a/packages/edge/infra/guard/core/src/server.rs b/packages/edge/infra/guard/core/src/server.rs index cf6d9ebcfe..43254e6e4d 100644 --- a/packages/edge/infra/guard/core/src/server.rs +++ b/packages/edge/infra/guard/core/src/server.rs @@ -51,39 +51,40 @@ pub async fn run_server( let http_listener = tokio::net::TcpListener::bind(http_addr).await?; // Set up HTTPS server (if configured) - let (https_addr, https_factory, https_listener, https_acceptor) = - if let Some(https) = &guard_config.https { - let https_addr: std::net::SocketAddr = ([0, 0, 0, 0], https.port).into(); - let https_factory = Arc::new(ProxyServiceFactory::new( - config.clone(), - routing_fn.clone(), - middleware_fn.clone(), - crate::proxy_service::PortType::Https, - clickhouse_inserter.clone(), - )); - let listener = tokio::net::TcpListener::bind(https_addr).await?; - - // Configure TLS if resolver function is provided - let acceptor = if let Some(resolver_fn) = cert_resolver_fn { - // Create a TLS server config using our certificate resolver - let server_config = create_tls_config(resolver_fn); - - Some(TlsAcceptor::from(Arc::new(server_config))) - } else { - tracing::warn!("No TLS certificate resolver provided, HTTPS will not 
work properly"); - None - }; - - ( - Some(https_addr), - Some(https_factory), - Some(listener), - acceptor, - ) + let (https_addr, https_factory, https_listener, https_acceptor) = if let Some(https) = + &guard_config.https + { + let https_addr: std::net::SocketAddr = ([0, 0, 0, 0], https.port).into(); + let https_factory = Arc::new(ProxyServiceFactory::new( + config.clone(), + routing_fn.clone(), + middleware_fn.clone(), + crate::proxy_service::PortType::Https, + clickhouse_inserter.clone(), + )); + let listener = tokio::net::TcpListener::bind(https_addr).await?; + + // Configure TLS if resolver function is provided + let acceptor = if let Some(resolver_fn) = cert_resolver_fn { + // Create a TLS server config using our certificate resolver + let server_config = create_tls_config(resolver_fn); + + Some(TlsAcceptor::from(Arc::new(server_config))) } else { - (None, None, None, None) + tracing::warn!("No TLS certificate resolver provided, HTTPS will not work properly"); + None }; + ( + Some(https_addr), + Some(https_factory), + Some(listener), + acceptor, + ) + } else { + (None, None, None, None) + }; + // Set up server builder and graceful shutdown let server = hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new()); let graceful = hyper_util::server::graceful::GracefulShutdown::new(); diff --git a/packages/edge/services/pegboard/src/protocol.rs b/packages/edge/services/pegboard/src/protocol.rs index 3aaef0db97..d1b1b609e4 100644 --- a/packages/edge/services/pegboard/src/protocol.rs +++ b/packages/edge/services/pegboard/src/protocol.rs @@ -28,8 +28,7 @@ pub enum ToClient { }, Commands(Vec), PrewarmImage { - image_id: Uuid, - image_artifact_url_stub: String, + image: Image, }, } @@ -130,6 +129,16 @@ pub enum ImageKind { JavaScript, } +impl From for ImageKind { + fn from(kind: build::types::BuildKind) -> Self { + match kind { + build::types::BuildKind::DockerImage => ImageKind::DockerImage, + build::types::BuildKind::OciBundle => 
ImageKind::OciBundle, + build::types::BuildKind::JavaScript => ImageKind::JavaScript, + } + } +} + impl ImageKind { pub fn client_flavor(&self) -> ClientFlavor { match self { @@ -146,6 +155,15 @@ pub enum ImageCompression { Lz4, } +impl From for ImageCompression { + fn from(compression: build::types::BuildCompression) -> Self { + match compression { + build::types::BuildCompression::None => ImageCompression::None, + build::types::BuildCompression::Lz4 => ImageCompression::Lz4, + } + } +} + #[derive(Debug, Serialize, Deserialize, Clone, Hash)] #[serde(rename_all = "snake_case")] pub struct Port { diff --git a/packages/edge/services/pegboard/src/workflows/actor/runtime.rs b/packages/edge/services/pegboard/src/workflows/actor/runtime.rs index da9481b9f2..dd4c6594c1 100644 --- a/packages/edge/services/pegboard/src/workflows/actor/runtime.rs +++ b/packages/edge/services/pegboard/src/workflows/actor/runtime.rs @@ -1,6 +1,6 @@ use std::time::Instant; -use build::types::{BuildCompression, BuildKind}; +use build::types::BuildKind; use chirp_workflow::prelude::*; use fdb_util::{end_of_key_range, FormalKey, SERIALIZABLE, SNAPSHOT}; use foundationdb::{ @@ -685,15 +685,8 @@ pub async fn spawn_actor( id: actor_setup.image_id, artifact_url_stub: actor_setup.artifact_url_stub.clone(), fallback_artifact_url: actor_setup.fallback_artifact_url.clone(), - kind: match actor_setup.meta.build_kind { - BuildKind::DockerImage => protocol::ImageKind::DockerImage, - BuildKind::OciBundle => protocol::ImageKind::OciBundle, - BuildKind::JavaScript => protocol::ImageKind::JavaScript, - }, - compression: match actor_setup.meta.build_compression { - BuildCompression::None => protocol::ImageCompression::None, - BuildCompression::Lz4 => protocol::ImageCompression::Lz4, - }, + kind: actor_setup.meta.build_kind.into(), + compression: actor_setup.meta.build_compression.into(), }, root_user_enabled: input.root_user_enabled, env: input.environment.clone(), @@ -803,15 +796,14 @@ pub async fn 
reschedule_actor( let mut backoff = util::Backoff::new_at(8, None, BASE_RETRY_TIMEOUT_MS, 500, state.retry_count); - let (now, reset) = ctx.v(2).activity(CompareRetryInput { - last_retry_ts: state.last_retry_ts, - }).await?; + let (now, reset) = ctx + .v(2) + .activity(CompareRetryInput { + last_retry_ts: state.last_retry_ts, + }) + .await?; - state.retry_count = if reset { - 0 - } else { - state.retry_count + 1 - }; + state.retry_count = if reset { 0 } else { state.retry_count + 1 }; state.last_retry_ts = now; // Don't sleep for first retry diff --git a/packages/edge/services/pegboard/src/workflows/client/mod.rs b/packages/edge/services/pegboard/src/workflows/client/mod.rs index 62f25b3f56..0f9fa637a8 100644 --- a/packages/edge/services/pegboard/src/workflows/client/mod.rs +++ b/packages/edge/services/pegboard/src/workflows/client/mod.rs @@ -180,13 +180,10 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu ) .await?; } - Some(Main::PrewarmImage(sig)) => { + Some(Main::PrewarmImage2(sig)) => { ctx.msg(ToWs { client_id, - inner: protocol::ToClient::PrewarmImage { - image_id: sig.image_id, - image_artifact_url_stub: sig.image_artifact_url_stub, - }, + inner: protocol::ToClient::PrewarmImage { image: sig.image }, }) .send() .await?; @@ -1146,10 +1143,9 @@ pub struct ToWs { pub inner: protocol::ToClient, } -#[signal("pegboard_prewarm_image")] -pub struct PrewarmImage { - pub image_id: Uuid, - pub image_artifact_url_stub: String, +#[signal("pegboard_prewarm_image2")] +pub struct PrewarmImage2 { + pub image: protocol::Image, } #[message("pegboard_client_close_ws")] pub struct CloseWs { @@ -1168,7 +1164,7 @@ join_signal!(Main { Command(protocol::Command), // Forwarded from the ws to this workflow Forward(protocol::ToServer), - PrewarmImage, + PrewarmImage2, Drain, Undrain, }); diff --git a/packages/edge/services/pegboard/standalone/usage-metrics-publish/Cargo.toml 
b/packages/edge/services/pegboard/standalone/usage-metrics-publish/Cargo.toml index dc122c6c57..d498c3e029 100644 --- a/packages/edge/services/pegboard/standalone/usage-metrics-publish/Cargo.toml +++ b/packages/edge/services/pegboard/standalone/usage-metrics-publish/Cargo.toml @@ -13,14 +13,15 @@ fdb-util.workspace = true rivet-connection.workspace = true rivet-health-checks.workspace = true rivet-metrics.workspace = true +rivet-config.workspace = true rivet-runtime.workspace = true tokio.workspace = true tracing = "0.1" tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "json", "ansi"] } -pegboard.workspace = true +cluster.workspace = true build.workspace = true -rivet-config.workspace = true +pegboard.workspace = true [dependencies.sqlx] workspace = true diff --git a/packages/edge/services/pegboard/standalone/usage-metrics-publish/src/lib.rs b/packages/edge/services/pegboard/standalone/usage-metrics-publish/src/lib.rs index 174af526df..d3ccdbb824 100644 --- a/packages/edge/services/pegboard/standalone/usage-metrics-publish/src/lib.rs +++ b/packages/edge/services/pegboard/standalone/usage-metrics-publish/src/lib.rs @@ -39,6 +39,17 @@ pub async fn run_from_env( ) .await?; + let dc_id = ctx.config().server()?.rivet.edge()?.datacenter_id; + let dc_res = ctx + .op(cluster::ops::datacenter::get::Input { + datacenter_ids: vec![dc_id], + }) + .await?; + if dc_res.datacenters.is_empty() { + tracing::debug!("cluster not initialized"); + return Ok(()); + } + // List all actor ids that are currently running let actor_ids = ctx .fdb() diff --git a/sdks/api/fern/definition/core-intercom/pegboard/__package__.yml b/sdks/api/fern/definition/core-intercom/pegboard/__package__.yml index 04c37fb409..6128df2f0c 100644 --- a/sdks/api/fern/definition/core-intercom/pegboard/__package__.yml +++ b/sdks/api/fern/definition/core-intercom/pegboard/__package__.yml @@ -14,4 +14,4 @@ service: types: MarkClientRegisteredRequest: properties: - server_id: uuid \ 
No newline at end of file + server_id: uuid diff --git a/sdks/api/fern/definition/edge-intercom/pegboard/__package__.yml b/sdks/api/fern/definition/edge-intercom/pegboard/__package__.yml index 09e78df9ff..2a67662741 100644 --- a/sdks/api/fern/definition/edge-intercom/pegboard/__package__.yml +++ b/sdks/api/fern/definition/edge-intercom/pegboard/__package__.yml @@ -22,9 +22,8 @@ service: types: PrewarmImageRequest: - properties: - image_artifact_url_stub: string - + properties: {} + ToggleClientDrainRequest: properties: draining: boolean diff --git a/sdks/api/full/go/edgeintercom/pegboard/pegboard.go b/sdks/api/full/go/edgeintercom/pegboard/pegboard.go index 2c54df660a..b24fb550fe 100644 --- a/sdks/api/full/go/edgeintercom/pegboard/pegboard.go +++ b/sdks/api/full/go/edgeintercom/pegboard/pegboard.go @@ -10,8 +10,6 @@ import ( ) type PrewarmImageRequest struct { - ImageArtifactUrlStub string `json:"image_artifact_url_stub"` - _rawJSON json.RawMessage } diff --git a/sdks/api/full/openapi/openapi.yml b/sdks/api/full/openapi/openapi.yml index c34da7aa9e..c655b8c7e2 100644 --- a/sdks/api/full/openapi/openapi.yml +++ b/sdks/api/full/openapi/openapi.yml @@ -10064,11 +10064,7 @@ components: - server_id EdgeIntercomPegboardPrewarmImageRequest: type: object - properties: - image_artifact_url_stub: - type: string - required: - - image_artifact_url_stub + properties: {} EdgeIntercomPegboardToggleClientDrainRequest: type: object properties: diff --git a/sdks/api/full/openapi_compat/openapi.yml b/sdks/api/full/openapi_compat/openapi.yml index 82469df3a6..847d0b4b14 100644 --- a/sdks/api/full/openapi_compat/openapi.yml +++ b/sdks/api/full/openapi_compat/openapi.yml @@ -10064,11 +10064,7 @@ components: - server_id EdgeIntercomPegboardPrewarmImageRequest: type: object - properties: - image_artifact_url_stub: - type: string - required: - - image_artifact_url_stub + properties: {} EdgeIntercomPegboardToggleClientDrainRequest: type: object properties: diff --git 
a/sdks/api/full/rust/.openapi-generator/FILES b/sdks/api/full/rust/.openapi-generator/FILES index 052f513e9b..82be060260 100644 --- a/sdks/api/full/rust/.openapi-generator/FILES +++ b/sdks/api/full/rust/.openapi-generator/FILES @@ -218,7 +218,6 @@ docs/CloudVersionSummary.md docs/CoreIntercomPegboardApi.md docs/CoreIntercomPegboardMarkClientRegisteredRequest.md docs/EdgeIntercomPegboardApi.md -docs/EdgeIntercomPegboardPrewarmImageRequest.md docs/EdgeIntercomPegboardToggleClientDrainRequest.md docs/ErrorBody.md docs/GameGameSummary.md @@ -628,7 +627,6 @@ src/models/cloud_version_matchmaker_port_range.rs src/models/cloud_version_matchmaker_proxy_kind.rs src/models/cloud_version_summary.rs src/models/core_intercom_pegboard_mark_client_registered_request.rs -src/models/edge_intercom_pegboard_prewarm_image_request.rs src/models/edge_intercom_pegboard_toggle_client_drain_request.rs src/models/error_body.rs src/models/game_game_summary.rs diff --git a/sdks/api/full/rust/README.md b/sdks/api/full/rust/README.md index fdf0fbd35c..bed765e99a 100644 --- a/sdks/api/full/rust/README.md +++ b/sdks/api/full/rust/README.md @@ -363,7 +363,6 @@ Class | Method | HTTP request | Description - [CloudVersionMatchmakerProxyKind](docs/CloudVersionMatchmakerProxyKind.md) - [CloudVersionSummary](docs/CloudVersionSummary.md) - [CoreIntercomPegboardMarkClientRegisteredRequest](docs/CoreIntercomPegboardMarkClientRegisteredRequest.md) - - [EdgeIntercomPegboardPrewarmImageRequest](docs/EdgeIntercomPegboardPrewarmImageRequest.md) - [EdgeIntercomPegboardToggleClientDrainRequest](docs/EdgeIntercomPegboardToggleClientDrainRequest.md) - [ErrorBody](docs/ErrorBody.md) - [GameGameSummary](docs/GameGameSummary.md) diff --git a/sdks/api/full/rust/docs/EdgeIntercomPegboardApi.md b/sdks/api/full/rust/docs/EdgeIntercomPegboardApi.md index eca99556a7..67062599a2 100644 --- a/sdks/api/full/rust/docs/EdgeIntercomPegboardApi.md +++ b/sdks/api/full/rust/docs/EdgeIntercomPegboardApi.md @@ -11,7 +11,7 @@ Method | 
HTTP request | Description ## edge_intercom_pegboard_prewarm_image -> edge_intercom_pegboard_prewarm_image(image_id, edge_intercom_pegboard_prewarm_image_request) +> edge_intercom_pegboard_prewarm_image(image_id, body) ### Parameters @@ -20,7 +20,7 @@ Method | HTTP request | Description Name | Type | Description | Required | Notes ------------- | ------------- | ------------- | ------------- | ------------- **image_id** | **uuid::Uuid** | | [required] | -**edge_intercom_pegboard_prewarm_image_request** | [**EdgeIntercomPegboardPrewarmImageRequest**](EdgeIntercomPegboardPrewarmImageRequest.md) | | [required] | +**body** | **serde_json::Value** | | [required] | ### Return type diff --git a/sdks/api/full/rust/docs/EdgeIntercomPegboardPrewarmImageRequest.md b/sdks/api/full/rust/docs/EdgeIntercomPegboardPrewarmImageRequest.md deleted file mode 100644 index dca24ef72c..0000000000 --- a/sdks/api/full/rust/docs/EdgeIntercomPegboardPrewarmImageRequest.md +++ /dev/null @@ -1,11 +0,0 @@ -# EdgeIntercomPegboardPrewarmImageRequest - -## Properties - -Name | Type | Description | Notes ------------- | ------------- | ------------- | ------------- -**image_artifact_url_stub** | **String** | | - -[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) - - diff --git a/sdks/api/full/rust/src/apis/edge_intercom_pegboard_api.rs b/sdks/api/full/rust/src/apis/edge_intercom_pegboard_api.rs index 70a4e65e79..8d0442fab5 100644 --- a/sdks/api/full/rust/src/apis/edge_intercom_pegboard_api.rs +++ b/sdks/api/full/rust/src/apis/edge_intercom_pegboard_api.rs @@ -42,7 +42,7 @@ pub enum EdgeIntercomPegboardToggleClientDrainError { pub async fn edge_intercom_pegboard_prewarm_image( configuration: &configuration::Configuration, image_id: &str, - edge_intercom_pegboard_prewarm_image_request: crate::models::EdgeIntercomPegboardPrewarmImageRequest, + body: serde_json::Value, ) -> Result<(), Error> 
{ let local_var_configuration = configuration; @@ -63,8 +63,7 @@ pub async fn edge_intercom_pegboard_prewarm_image( if let Some(ref local_var_token) = local_var_configuration.bearer_access_token { local_var_req_builder = local_var_req_builder.bearer_auth(local_var_token.to_owned()); }; - local_var_req_builder = - local_var_req_builder.json(&edge_intercom_pegboard_prewarm_image_request); + local_var_req_builder = local_var_req_builder.json(&body); let local_var_req = local_var_req_builder.build()?; let local_var_resp = local_var_client.execute(local_var_req).await?; diff --git a/sdks/api/full/rust/src/models/edge_intercom_pegboard_prewarm_image_request.rs b/sdks/api/full/rust/src/models/edge_intercom_pegboard_prewarm_image_request.rs deleted file mode 100644 index 1de13fec1d..0000000000 --- a/sdks/api/full/rust/src/models/edge_intercom_pegboard_prewarm_image_request.rs +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Rivet API - * - * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) - * - * The version of the OpenAPI document: 0.0.1 - * - * Generated by: https://openapi-generator.tech - */ - -#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] -pub struct EdgeIntercomPegboardPrewarmImageRequest { - #[serde(rename = "image_artifact_url_stub")] - pub image_artifact_url_stub: String, -} - -impl EdgeIntercomPegboardPrewarmImageRequest { - pub fn new(image_artifact_url_stub: String) -> EdgeIntercomPegboardPrewarmImageRequest { - EdgeIntercomPegboardPrewarmImageRequest { - image_artifact_url_stub, - } - } -} diff --git a/sdks/api/full/rust/src/models/mod.rs b/sdks/api/full/rust/src/models/mod.rs index b8d864a9ef..414f7e4832 100644 --- a/sdks/api/full/rust/src/models/mod.rs +++ b/sdks/api/full/rust/src/models/mod.rs @@ -380,8 +380,6 @@ pub mod cloud_version_summary; pub use self::cloud_version_summary::CloudVersionSummary; pub mod core_intercom_pegboard_mark_client_registered_request; pub use 
self::core_intercom_pegboard_mark_client_registered_request::CoreIntercomPegboardMarkClientRegisteredRequest; -pub mod edge_intercom_pegboard_prewarm_image_request; -pub use self::edge_intercom_pegboard_prewarm_image_request::EdgeIntercomPegboardPrewarmImageRequest; pub mod edge_intercom_pegboard_toggle_client_drain_request; pub use self::edge_intercom_pegboard_toggle_client_drain_request::EdgeIntercomPegboardToggleClientDrainRequest; pub mod error_body; diff --git a/sdks/api/full/typescript/src/api/resources/edgeIntercom/resources/pegboard/client/Client.ts b/sdks/api/full/typescript/src/api/resources/edgeIntercom/resources/pegboard/client/Client.ts index 4747aa5f41..aba14bb267 100644 --- a/sdks/api/full/typescript/src/api/resources/edgeIntercom/resources/pegboard/client/Client.ts +++ b/sdks/api/full/typescript/src/api/resources/edgeIntercom/resources/pegboard/client/Client.ts @@ -50,9 +50,7 @@ export class Pegboard { * @throws {@link Rivet.BadRequestError} * * @example - * await client.edgeIntercom.pegboard.prewarmImage("d5e9c84f-c2b2-4bf4-b4b0-7ffd7a9ffc32", { - * imageArtifactUrlStub: "string" - * }) + * await client.edgeIntercom.pegboard.prewarmImage("d5e9c84f-c2b2-4bf4-b4b0-7ffd7a9ffc32", {}) */ public async prewarmImage( imageId: string, diff --git a/sdks/api/full/typescript/src/api/resources/edgeIntercom/resources/pegboard/types/PrewarmImageRequest.ts b/sdks/api/full/typescript/src/api/resources/edgeIntercom/resources/pegboard/types/PrewarmImageRequest.ts index 2ed27ceac5..821b5fb70a 100644 --- a/sdks/api/full/typescript/src/api/resources/edgeIntercom/resources/pegboard/types/PrewarmImageRequest.ts +++ b/sdks/api/full/typescript/src/api/resources/edgeIntercom/resources/pegboard/types/PrewarmImageRequest.ts @@ -2,6 +2,4 @@ * This file was auto-generated by Fern from our API Definition. 
*/ -export interface PrewarmImageRequest { - imageArtifactUrlStub: string; -} +export interface PrewarmImageRequest {} diff --git a/sdks/api/full/typescript/src/serialization/resources/edgeIntercom/resources/pegboard/types/PrewarmImageRequest.ts b/sdks/api/full/typescript/src/serialization/resources/edgeIntercom/resources/pegboard/types/PrewarmImageRequest.ts index d2b0c182e8..8c1977edd0 100644 --- a/sdks/api/full/typescript/src/serialization/resources/edgeIntercom/resources/pegboard/types/PrewarmImageRequest.ts +++ b/sdks/api/full/typescript/src/serialization/resources/edgeIntercom/resources/pegboard/types/PrewarmImageRequest.ts @@ -9,12 +9,8 @@ import * as core from "../../../../../../core"; export const PrewarmImageRequest: core.serialization.ObjectSchema< serializers.edgeIntercom.pegboard.PrewarmImageRequest.Raw, Rivet.edgeIntercom.pegboard.PrewarmImageRequest -> = core.serialization.object({ - imageArtifactUrlStub: core.serialization.property("image_artifact_url_stub", core.serialization.string()), -}); +> = core.serialization.object({}); export declare namespace PrewarmImageRequest { - export interface Raw { - image_artifact_url_stub: string; - } + export interface Raw {} } diff --git a/tests/load/actor-lifecycle/actor.ts b/tests/load/actor-lifecycle/actor.ts index d38aa5b67d..088bdff46c 100644 --- a/tests/load/actor-lifecycle/actor.ts +++ b/tests/load/actor-lifecycle/actor.ts @@ -19,11 +19,11 @@ export function createActor(config: Config): CreateActorResponse { }, ...(config.buildName === "ws-container" ? 
{ - resources: { - cpu: 125, - memory: 128, - }, - } + resources: { + cpu: 125, + memory: 128, + }, + } : {}), lifecycle: { durable: false }, }); diff --git a/tests/load/actor-lifecycle/index.ts b/tests/load/actor-lifecycle/index.ts index e31a18d00c..c9f7f86f26 100644 --- a/tests/load/actor-lifecycle/index.ts +++ b/tests/load/actor-lifecycle/index.ts @@ -47,18 +47,18 @@ export default function () { if (!isHealthy) fail("actor did not become healthy"); } - // // Test WebSocket if not disabled - // if (!CONFIG.disableWebsocket) { - // const wsUrl = `${actorOrigin.replace("http:", "ws:").replace("https:", "wss:")}/ws`; - // testWebSocket(wsUrl); - // } - // - // // Sleep if not disabled - // if (!CONFIG.disableSleep) { - // const sleepDuration = (start + 60_000 - Date.now()) / 1000; - // console.log(`sleeping for ${sleepDuration}s`); - // sleep(sleepDuration); - // } + // Test WebSocket if not disabled + if (!CONFIG.disableWebsocket) { + const wsUrl = `${actorOrigin.replace("http:", "ws:").replace("https:", "wss:")}/ws`; + testWebSocket(wsUrl); + } + + // Sleep if not disabled + if (!CONFIG.disableSleep) { + const sleepDuration = (start + 60_000 - Date.now()) / 1000; + console.log(`sleeping for ${sleepDuration}s`); + sleep(sleepDuration); + } } finally { // Cleanup if (actorId) {