diff --git a/tinycloud-core/src/db.rs b/tinycloud-core/src/db.rs index 9e2c848..8c3d63a 100644 --- a/tinycloud-core/src/db.rs +++ b/tinycloud-core/src/db.rs @@ -173,6 +173,10 @@ where .begin_with_config(None, Some(sea_orm::AccessMode::ReadOnly)) .await } + + pub async fn writable(&self) -> Result { + self.conn.begin().await + } } impl SpaceDatabase @@ -1188,22 +1192,7 @@ async fn get_kv_entity( key: &Path, // TODO version: Option<(i64, Hash, i64)>, ) -> Result, DbErr> { - // Ok(if let Some((seq, epoch, epoch_seq)) = version { - // event_order::Entity::find_by_id((epoch, epoch_seq, space_id.clone().into())) - // .reverse_join(kv_write::Entity) - // .find_also_related(kv_delete::Entity) - // .filter( - // Condition::all() - // .add(kv_write::Column::Key.eq(key)) - // .add(kv_write::Column::Space.eq(space_id.clone().into())) - // .add(kv_delete::Column::InvocationId.is_null()), - // ) - // .one(db) - // .await? - // .map(|(kv, _)| kv) - // } else { - // we want to find the latest kv_write which is not deleted - Ok(kv_write::Entity::find() + let latest_write = kv_write::Entity::find() .filter( Condition::all() .add(kv_write::Column::Key.eq(key.as_str())) @@ -1212,11 +1201,51 @@ async fn get_kv_entity( .order_by_desc(kv_write::Column::Seq) .order_by_desc(kv_write::Column::Epoch) .order_by_desc(kv_write::Column::EpochSeq) - .find_also_related(kv_delete::Entity) - .filter(kv_delete::Column::InvocationId.is_null()) .one(db) + .await?; + + let latest_write_order = latest_write + .as_ref() + .map(|row| (row.seq, row.epoch, row.epoch_seq)); + + let mut latest_delete_order: Option<(i64, Hash, i64)> = None; + for delete in kv_delete::Entity::find() + .filter( + Condition::all() + .add(kv_delete::Column::Key.eq(key.as_str())) + .add(kv_delete::Column::Space.eq(SpaceIdWrap(space_id.clone()))), + ) + .all(db) .await? - .map(|(kv, _)| kv)) + { + let Some(delete_order) = event_order::Entity::find() + .filter(event_order::Column::Space.eq(SpaceIdWrap(space_id.clone()))) + .filter(event_order::Column::Event.eq(delete.invocation_id)) + .one(db) + .await? + else { + continue; + }; + + let candidate = (delete_order.seq, delete_order.epoch, delete_order.epoch_seq); + if latest_delete_order + .as_ref() + .map(|current| candidate > *current) + .unwrap_or(true) + { + latest_delete_order = Some(candidate); + } + } + + if latest_delete_order + .as_ref() + .zip(latest_write_order.as_ref()) + .is_some_and(|(delete_order, write_order)| delete_order > write_order) + { + return Ok(None); + } + + Ok(latest_write) } async fn get_valid_delegations( diff --git a/tinycloud-core/src/sql/service.rs b/tinycloud-core/src/sql/service.rs index ef67ec3..cef695f 100644 --- a/tinycloud-core/src/sql/service.rs +++ b/tinycloud-core/src/sql/service.rs @@ -7,6 +7,7 @@ use dashmap::DashMap; use tinycloud_auth::resource::SpaceId; use crate::database_artifacts::{DatabaseArtifactError, DatabaseArtifactRepository}; +use crate::hash::hash; use super::{ caveats::SqlCaveats, @@ -83,31 +84,56 @@ impl SqlService { pub async fn export(&self, space: &SpaceId, db_name: &str) -> Result, SqlError> { let key = (space.to_string(), db_name.to_string()); + let artifact = self + .artifact_repository + .load("sql", &space.to_string(), db_name) + .await + .map_err(artifact_error_to_sql)?; - // If there's a live actor, route through it (handles both in-memory and file-backed) if let Some(handle) = self.databases.get(&key).map(|h| h.clone()) { - match handle.export().await { - Err(SqlError::Internal(ref msg)) - if msg.contains("Database actor not available") => - { - // Actor is dead — remove stale entry and fall through to cold read - tracing::warn!(space=%space, db=%db_name, "Dead SQL actor detected during export, removing"); - self.databases.remove(&key); + if let Some(artifact) = artifact.as_ref() { + match handle.export().await { + Ok(payload) => { + let live_hash = hash(&payload).to_cid(0x55).to_string(); + if live_hash == artifact.content_hash { + return Ok(payload); + } + } + Err(SqlError::Internal(ref msg)) + if msg.contains("Database actor not available") => + { + tracing::warn!( + space=%space, + db=%db_name, + "Dead SQL actor detected during export, removing" + ); + } + Err(err) => return Err(err), } - other => return other, + + let _ = self.discard_local_state(&key).await; + } else { + return handle.export().await; } } - match self - .artifact_repository - .load("sql", &space.to_string(), db_name) - .await - .map_err(artifact_error_to_sql)? - { - Some(artifact) => { - remove_sql_cache_files(&self.cache_path(space, db_name)).await?; - write_cache_file(&self.cache_path(space, db_name), &artifact.payload).await?; - Ok(artifact.payload) + match artifact { + Some(_) => { + self.hydrate_cache(space, db_name).await?; + let handle = self + .databases + .entry(key) + .or_insert_with(|| { + spawn_actor( + space.to_string(), + db_name.to_string(), + self.base_path.clone(), + self.memory_threshold, + self.databases.clone(), + ) + }) + .clone(); + handle.export().await } None => Err(SqlError::DatabaseNotFound), } @@ -120,8 +146,40 @@ impl SqlService { async fn handle(&self, space: &SpaceId, db_name: &str) -> Result { let key = (space.to_string(), db_name.to_string()); + + // If reconciliation replaced the durable artifact, drop any stale live + // actor so the next access rehydrates from the winning payload. + let artifact = self + .artifact_repository + .load("sql", &space.to_string(), db_name) + .await + .map_err(artifact_error_to_sql)?; + if let Some(handle) = self.databases.get(&key).map(|h| h.clone()) { - return Ok(handle); + if let Some(artifact) = artifact.as_ref() { + match handle.export().await { + Ok(payload) => { + let live_hash = hash(&payload).to_cid(0x55).to_string(); + if live_hash == artifact.content_hash { + return Ok(handle); + } + } + Err(SqlError::Internal(ref msg)) + if msg.contains("Database actor not available") => + { + tracing::warn!( + space=%space, + db=%db_name, + "Dead SQL actor detected during refresh, removing" + ); + } + Err(err) => return Err(err), + } + + let _ = self.discard_local_state(&key).await; + } else { + return Ok(handle); + } } self.hydrate_cache(space, db_name).await?; diff --git a/tinycloud-node-server/src/lib.rs b/tinycloud-node-server/src/lib.rs index 2d72850..2101dfa 100644 --- a/tinycloud-node-server/src/lib.rs +++ b/tinycloud-node-server/src/lib.rs @@ -40,7 +40,7 @@ use routes::{ hooks::{create_hook_ticket, create_webhook, delete_webhook, hook_events, list_webhooks}, info, invoke, open_host_key, public::{public_kv_get, public_kv_head, public_kv_list, public_kv_options, RateLimiter}, - signed_kv_get, + reconciliation_ranges, reconciliation_snapshot, reconciliation_snapshot_apply, signed_kv_get, util_routes::*, version, }; @@ -125,6 +125,9 @@ pub async fn app(config: &Figment) -> Result> { delegate, create_signed_kv_url, signed_kv_get, + reconciliation_ranges, + reconciliation_snapshot, + reconciliation_snapshot_apply, create_hook_ticket, hook_events, create_webhook, diff --git a/tinycloud-node-server/src/routes/mod.rs b/tinycloud-node-server/src/routes/mod.rs index dde3641..4152d57 100644 --- a/tinycloud-node-server/src/routes/mod.rs +++ b/tinycloud-node-server/src/routes/mod.rs @@ -2,9 +2,13 @@ use anyhow::Result; use futures::io::AsyncWriteExt; use percent_encoding::percent_decode_str; use rocket::{data::ToByteUnit, http::Status, serde::json::Json, State}; -use serde::Serialize; -use std::collections::{BTreeMap, HashMap, HashSet}; +use serde::{Deserialize, Serialize}; +use std::{ + collections::{BTreeMap, HashMap, HashSet}, + path::{Path as StdPath, PathBuf}, +}; use time::{format_description::well_known::Rfc3339, OffsetDateTime}; +use tinycloud_auth::identity::did_principal_matches; use tinycloud_auth::resource::{Path, SpaceId}; use tokio::io::AsyncReadExt; use tokio_util::compat::TokioAsyncReadCompatExt; @@ -22,7 +26,7 @@ use crate::{ validate_signed_kv_ticket, SignedKvUrlRequest, SignedKvUrlResponse, SignedUrlRuntime, }, tracing::TracingSpan, - BlockStage, BlockStores, TinyCloud, + BlockConfig, BlockStage, BlockStores, TinyCloud, }; #[cfg(feature = "duckdb")] use tinycloud_core::duckdb::{ @@ -31,9 +35,18 @@ use tinycloud_core::duckdb::{ use tinycloud_core::{ encryption_network::EncryptionService, events::Invocation, - models::{hook_delivery, hook_subscription, kv_delete, kv_write}, - sea_orm::DbErr, - sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QueryOrder}, + hash::Hash, + models::{ + abilities, actor, database_artifact, delegation, epoch, hook_delivery, hook_subscription, + invocation, kv_delete, kv_write, revocation, space, + }, + relationships::{epoch_order, event_order, invoked_abilities, parent_delegations}, + sea_orm::sea_query::OnConflict, + sea_orm::ActiveValue::Set, + sea_orm::{ + self, ActiveModelTrait, ColumnTrait, ConnectionTrait, DbErr, EntityTrait, QueryFilter, + QueryOrder, + }, sql::{SqlCaveats, SqlError, SqlRequest, SqlService}, storage::{HashBuffer, ImmutableReadStore, ImmutableStaging}, types::{Metadata, Resource}, @@ -103,6 +116,1380 @@ pub fn version( Json(build_info(tee, quota_cache, encryption)) } +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalFactRangeResponse { + pub node_id: String, + pub planes: Vec, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalFactPlaneRange { + pub name: &'static str, + pub empty: bool, + pub tables: Vec, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalFactTableRange { + pub name: &'static str, + pub empty: bool, + pub count: usize, + pub start: Option, + pub end: Option, + pub keys: Vec, +} + +fn hash_to_cid(hash: Hash) -> String { + hash.to_cid(0x55).to_string() +} + +fn build_table_range(name: &'static str, mut keys: Vec) -> LocalFactTableRange { + keys.sort_unstable(); + keys.dedup(); + + let count = keys.len(); + let empty = count == 0; + let start = keys.first().cloned(); + let end = keys.last().cloned(); + + LocalFactTableRange { + name, + empty, + count, + start, + end, + keys, + } +} + +fn build_plane_range(name: &'static str, tables: Vec) -> LocalFactPlaneRange { + let empty = tables.iter().all(|table| table.empty); + + LocalFactPlaneRange { + name, + empty, + tables, + } +} + +fn encode_base64(bytes: &[u8]) -> String { + base64::encode(bytes) +} + +fn decode_base64(value: &str) -> Result, String> { + base64::decode(value).map_err(|err| err.to_string()) +} + +fn format_rfc3339(value: &OffsetDateTime) -> String { + value + .format(&Rfc3339) + .expect("current timestamps should format as RFC3339") +} + +fn parse_rfc3339(value: Option) -> Result, String> { + match value { + Some(value) => OffsetDateTime::parse(&value, &Rfc3339) + .map(Some) + .map_err(|err| err.to_string()), + None => Ok(None), + } +} + +#[derive(Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ReconciliationSnapshot { + pub node_id: String, + pub spaces: Vec, + pub actors: Vec, + pub delegations: Vec, + pub revocations: Vec, + pub abilities: Vec, + pub parent_delegations: Vec, + pub invocations: Vec, + pub invoked_abilities: Vec, + pub kv_writes: Vec, + pub kv_deletes: Vec, + pub database_artifacts: Vec, + pub epochs: Vec, + pub epoch_orders: Vec, + pub event_orders: Vec, + pub blocks: Vec, +} + +#[derive(Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ReconciliationSpace { + pub id: tinycloud_core::types::SpaceIdWrap, +} + +#[derive(Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ReconciliationActor { + pub id: String, +} + +#[derive(Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ReconciliationDelegation { + pub id: String, + pub delegator: String, + pub delegatee: String, + pub expiry: Option, + pub issued_at: Option, + pub not_before: Option, + pub facts: Option, + pub serialization: String, +} + +#[derive(Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ReconciliationRevocation { + pub id: String, + pub revoker: String, + pub revoked: String, + pub serialization: String, +} + +#[derive(Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ReconciliationAbility { + pub resource: Resource, + pub ability: tinycloud_core::types::Ability, + pub delegation: String, + pub caveats: tinycloud_core::types::Caveats, +} + +#[derive(Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ReconciliationParentDelegation { + pub parent: String, + pub child: String, +} + +#[derive(Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ReconciliationInvocation { + pub id: String, + pub invoker: String, + pub issued_at: String, + pub facts: Option, + pub serialization: String, +} + +#[derive(Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ReconciliationInvokedAbility { + pub invocation: String, + pub resource: Resource, + pub ability: tinycloud_core::types::Ability, +} + +#[derive(Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ReconciliationKvWrite { + pub space: tinycloud_core::types::SpaceIdWrap, + pub key: tinycloud_core::types::Path, + pub invocation: String, + pub seq: i64, + pub epoch: String, + pub epoch_seq: i64, + pub value: String, + pub metadata: Metadata, +} + +#[derive(Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ReconciliationKvDelete { + pub invocation_id: String, + pub space: tinycloud_core::types::SpaceIdWrap, + pub key: tinycloud_core::types::Path, + pub deleted_invocation_id: String, +} + +#[derive(Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ReconciliationDatabaseArtifact { + pub service: String, + pub space: String, + pub name: String, + pub revision: i64, + pub content_hash: String, + pub payload: String, + pub size_bytes: i64, + pub backend: String, + pub storage_mode: String, + pub created_at: String, + pub updated_at: String, +} + +#[derive(Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ReconciliationEpoch { + pub seq: i64, + pub id: String, + pub space: tinycloud_core::types::SpaceIdWrap, +} + +#[derive(Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ReconciliationEpochOrder { + pub parent: String, + pub child: String, + pub space: tinycloud_core::types::SpaceIdWrap, +} + +#[derive(Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ReconciliationEventOrder { + pub seq: i64, + pub epoch: String, + pub epoch_seq: i64, + pub event: String, + pub space: tinycloud_core::types::SpaceIdWrap, +} + +#[derive(Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ReconciliationBlock { + pub path: String, + pub bytes: String, +} + +macro_rules! collect_table_range { + ($db:expr, $entity:ty, $name:expr, $mapper:expr) => {{ + let keys = <$entity>::find() + .all($db) + .await? + .into_iter() + .map($mapper) + .collect::>(); + build_table_range($name, keys) + }}; +} + +async fn local_fact_ranges( + tinycloud: &State, + encryption: &State, +) -> Result { + let tx = tinycloud.readable().await?; + + let auth_fact_plane = build_plane_range( + "authFacts", + vec![ + collect_table_range!(&tx, space::Entity, "space", |row: space::Model| { + format!("space:{}", row.id.0) + }), + collect_table_range!(&tx, actor::Entity, "actor", |row: actor::Model| { + format!("actor:{}", row.id) + }), + collect_table_range!( + &tx, + delegation::Entity, + "delegation", + |row: delegation::Model| { + format!( + "delegation:{}|{}|{}", + hash_to_cid(row.id), + row.delegator, + row.delegatee + ) + } + ), + collect_table_range!( + &tx, + revocation::Entity, + "revocation", + |row: revocation::Model| { + format!( + "revocation:{}|{}|{}", + hash_to_cid(row.id), + row.revoker, + hash_to_cid(row.revoked) + ) + } + ), + collect_table_range!( + &tx, + abilities::Entity, + "ability", + |row: abilities::Model| { + format!( + "ability:{}|{}|{}", + row.resource, + row.ability, + hash_to_cid(row.delegation) + ) + } + ), + collect_table_range!( + &tx, + parent_delegations::Entity, + "parentDelegation", + |row: parent_delegations::Model| { + format!( + "parent-delegation:{}|{}", + hash_to_cid(row.parent), + hash_to_cid(row.child) + ) + } + ), + ], + ); + + let authored_fact_plane = build_plane_range( + "authoredFacts", + vec![ + collect_table_range!( + &tx, + invocation::Entity, + "invocation", + |row: invocation::Model| { + format!("invocation:{}|{}", hash_to_cid(row.id), row.invoker) + } + ), + collect_table_range!( + &tx, + invoked_abilities::Entity, + "invokedAbilities", + |row: invoked_abilities::Model| { + format!( + "invoked-ability:{}|{}|{}", + hash_to_cid(row.invocation), + row.resource, + row.ability + ) + } + ), + collect_table_range!(&tx, kv_write::Entity, "kvWrite", |row: kv_write::Model| { + format!( + "kv-write:{}|{}|{}|{}", + row.space.0, + row.key, + hash_to_cid(row.invocation), + hash_to_cid(row.value) + ) + }), + collect_table_range!( + &tx, + kv_delete::Entity, + "kvDelete", + |row: kv_delete::Model| { + format!( + "kv-delete:{}|{}|{}|{}", + row.space.0, + row.key, + hash_to_cid(row.invocation_id), + hash_to_cid(row.deleted_invocation_id) + ) + } + ), + ], + ); + + let blob_plane = build_plane_range( + "blobs", + vec![collect_table_range!( + &tx, + database_artifact::Entity, + "databaseArtifact", + |row: database_artifact::Model| { + format!( + "database-artifact:{}|{}|{}|{:020}|{}", + row.service, row.space, row.name, row.revision, row.content_hash + ) + } + )], + ); + + let derived_view_input_plane = build_plane_range( + "derivedViewInputs", + vec![ + collect_table_range!(&tx, epoch::Entity, "epoch", |row: epoch::Model| { + format!( + "epoch:{}|{:020}|{}", + row.space.0, + row.seq, + hash_to_cid(row.id) + ) + }), + collect_table_range!( + &tx, + epoch_order::Entity, + "epochOrder", + |row: epoch_order::Model| { + format!( + "epoch-order:{}|{}|{}", + row.space.0, + hash_to_cid(row.parent), + hash_to_cid(row.child) + ) + } + ), + collect_table_range!( + &tx, + event_order::Entity, + "eventOrder", + |row: event_order::Model| { + format!( + "event-order:{}|{}|{:020}|{:020}|{}", + row.space.0, + hash_to_cid(row.epoch), + row.epoch_seq, + row.seq, + hash_to_cid(row.event) + ) + } + ), + ], + ); + + Ok(LocalFactRangeResponse { + node_id: encryption.node_did().to_string(), + planes: vec![ + auth_fact_plane, + authored_fact_plane, + blob_plane, + derived_view_input_plane, + ], + }) +} + +#[get("/reconciliation/ranges")] +pub async fn reconciliation_ranges( + tinycloud: &State, + encryption: &State, +) -> Result, (Status, String)> { + local_fact_ranges(tinycloud, encryption) + .await + .map(Json) + .map_err(|e| (Status::InternalServerError, e.to_string())) +} + +fn has_host_capability(delegation: &DelegationInfo) -> bool { + delegation.capabilities.iter().any(|cap| { + cap.ability.to_string() == "tinycloud.space/host" && cap.resource.space().is_some() + }) +} + +fn validate_host_reconciliation_access( + delegation: &DelegationInfo, +) -> Result<(), (Status, String)> { + if !has_host_capability(delegation) { + return Err((Status::Unauthorized, "Host delegation required".to_string())); + } + + let space = delegation + .capabilities + .iter() + .find_map(|cap| cap.resource.space()) + .ok_or_else(|| { + ( + Status::Unauthorized, + "Host delegation must target a space".to_string(), + ) + })?; + + if !did_principal_matches(space.did().as_str(), &delegation.delegator) { + return Err(( + Status::Unauthorized, + "Host delegation must be issued by the space owner".to_string(), + )); + } + + Ok(()) +} + +fn local_block_root(config: &Config) -> Result { + match &config.storage.blocks { + BlockConfig::B(fs) => Ok(fs.path().to_path_buf()), + BlockConfig::A(_) => Err( + "host-host reconciliation currently requires local filesystem block storage" + .to_string(), + ), + } +} + +async fn collect_block_snapshots(root: &StdPath) -> Result, String> { + let mut snapshots = Vec::new(); + let mut stack = vec![root.to_path_buf()]; + + while let Some(current) = stack.pop() { + let mut dir = match tokio::fs::read_dir(¤t).await { + Ok(dir) => dir, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => continue, + Err(err) => return Err(err.to_string()), + }; + + while let Some(entry) = dir.next_entry().await.map_err(|err| err.to_string())? { + let ty = entry.file_type().await.map_err(|err| err.to_string())?; + let path = entry.path(); + if ty.is_dir() { + stack.push(path); + continue; + } + if !ty.is_file() { + continue; + } + + let rel = path + .strip_prefix(root) + .map_err(|err| err.to_string())? + .to_string_lossy() + .into_owned(); + let bytes = tokio::fs::read(&path) + .await + .map_err(|err| err.to_string())?; + snapshots.push(ReconciliationBlock { + path: rel, + bytes: encode_base64(&bytes), + }); + } + } + + snapshots.sort_by(|a, b| a.path.cmp(&b.path)); + snapshots.dedup_by(|a, b| a.path == b.path); + Ok(snapshots) +} + +async fn write_block_snapshots( + root: &StdPath, + blocks: &[ReconciliationBlock], +) -> Result<(), String> { + for block in blocks { + let path = root.join(&block.path); + let bytes = decode_base64(&block.bytes)?; + if let Some(parent) = path.parent() { + tokio::fs::create_dir_all(parent) + .await + .map_err(|err| err.to_string())?; + } + tokio::fs::write(&path, bytes) + .await + .map_err(|err| err.to_string())?; + } + Ok(()) +} + +async fn insert_many_ignore( + db: &C, + rows: Vec, + conflict: OnConflict, +) -> Result<(), DbErr> +where + C: ConnectionTrait, + E: EntityTrait, + E::ActiveModel: sea_orm::ActiveModelTrait, + M: sea_orm::IntoActiveModel, +{ + let _ = conflict; + let rows = rows + .into_iter() + .map(sea_orm::IntoActiveModel::into_active_model) + .collect::>(); + + if rows.is_empty() { + return Ok(()); + } + + let mut conflict = OnConflict::new(); + conflict.do_nothing(); + + match E::insert_many(rows).on_conflict(conflict).exec(db).await { + Err(DbErr::RecordNotInserted) => Ok(()), + result => { + result?; + Ok(()) + } + } +} + +// Deterministic SQL reducer policy: +// prefer the higher revision, then break ties by content hash so every peer +// converges on the same winning materialized view without consulting clocks. +fn database_artifact_prefers_incoming( + existing: &database_artifact::Model, + incoming: &database_artifact::Model, +) -> bool { + incoming.revision > existing.revision + || (incoming.revision == existing.revision && incoming.content_hash > existing.content_hash) +} + +async fn upsert_database_artifact( + db: &C, + incoming: database_artifact::Model, +) -> Result<(), DbErr> +where + C: ConnectionTrait, +{ + let key = ( + incoming.service.clone(), + incoming.space.clone(), + incoming.name.clone(), + ); + let existing = database_artifact::Entity::find_by_id(key).one(db).await?; + + if let Some(existing) = existing.as_ref() { + if !database_artifact_prefers_incoming(existing, &incoming) { + return Ok(()); + } + } + + let active = database_artifact::ActiveModel { + service: Set(incoming.service), + space: Set(incoming.space), + name: Set(incoming.name), + revision: Set(incoming.revision), + content_hash: Set(incoming.content_hash), + payload: Set(incoming.payload), + size_bytes: Set(incoming.size_bytes), + backend: Set(incoming.backend), + storage_mode: Set(incoming.storage_mode), + created_at: Set(incoming.created_at), + updated_at: Set(incoming.updated_at), + }; + + match existing { + Some(_) => { + active.update(db).await?; + } + None => { + active.insert(db).await?; + } + } + + Ok(()) +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ReconciliationApplyResponse { + pub node_id: String, + pub spaces: usize, + pub actors: usize, + pub delegations: usize, + pub revocations: usize, + pub abilities: usize, + pub parent_delegations: usize, + pub invocations: usize, + pub invoked_abilities: usize, + pub kv_writes: usize, + pub kv_deletes: usize, + pub database_artifacts: usize, + pub epochs: usize, + pub epoch_orders: usize, + pub event_orders: usize, + pub blocks: usize, +} + +async fn build_reconciliation_snapshot( + tinycloud: &State, + config: &State, + encryption: &State, +) -> Result { + let tx = tinycloud.readable().await.map_err(|err| err.to_string())?; + + let spaces = space::Entity::find() + .order_by_asc(space::Column::Id) + .all(&tx) + .await + .map_err(|err| err.to_string())? + .into_iter() + .map(|row| ReconciliationSpace { id: row.id }) + .collect::>(); + + let actors = actor::Entity::find() + .order_by_asc(actor::Column::Id) + .all(&tx) + .await + .map_err(|err| err.to_string())? + .into_iter() + .map(|row| ReconciliationActor { id: row.id }) + .collect::>(); + + let delegations = delegation::Entity::find() + .order_by_asc(delegation::Column::Id) + .all(&tx) + .await + .map_err(|err| err.to_string())? + .into_iter() + .map(|row| ReconciliationDelegation { + id: hash_to_cid(row.id), + delegator: row.delegator, + delegatee: row.delegatee, + expiry: row.expiry.as_ref().map(format_rfc3339), + issued_at: row.issued_at.as_ref().map(format_rfc3339), + not_before: row.not_before.as_ref().map(format_rfc3339), + facts: row.facts, + serialization: encode_base64(&row.serialization), + }) + .collect::>(); + + let revocations = revocation::Entity::find() + .order_by_asc(revocation::Column::Id) + .all(&tx) + .await + .map_err(|err| err.to_string())? + .into_iter() + .map(|row| ReconciliationRevocation { + id: hash_to_cid(row.id), + revoker: row.revoker, + revoked: hash_to_cid(row.revoked), + serialization: encode_base64(&row.serialization), + }) + .collect::>(); + + let abilities = abilities::Entity::find() + .order_by_asc(abilities::Column::Delegation) + .order_by_asc(abilities::Column::Resource) + .order_by_asc(abilities::Column::Ability) + .all(&tx) + .await + .map_err(|err| err.to_string())? + .into_iter() + .map(|row| ReconciliationAbility { + resource: row.resource, + ability: row.ability, + delegation: hash_to_cid(row.delegation), + caveats: row.caveats, + }) + .collect::>(); + + let parent_delegations = parent_delegations::Entity::find() + .order_by_asc(parent_delegations::Column::Parent) + .order_by_asc(parent_delegations::Column::Child) + .all(&tx) + .await + .map_err(|err| err.to_string())? + .into_iter() + .map(|row| ReconciliationParentDelegation { + parent: hash_to_cid(row.parent), + child: hash_to_cid(row.child), + }) + .collect::>(); + + let invocations = invocation::Entity::find() + .order_by_asc(invocation::Column::Id) + .all(&tx) + .await + .map_err(|err| err.to_string())? + .into_iter() + .map(|row| ReconciliationInvocation { + id: hash_to_cid(row.id), + invoker: row.invoker, + issued_at: format_rfc3339(&row.issued_at), + facts: row.facts, + serialization: encode_base64(&row.serialization), + }) + .collect::>(); + + let invoked_abilities = invoked_abilities::Entity::find() + .order_by_asc(invoked_abilities::Column::Invocation) + .order_by_asc(invoked_abilities::Column::Resource) + .order_by_asc(invoked_abilities::Column::Ability) + .all(&tx) + .await + .map_err(|err| err.to_string())? + .into_iter() + .map(|row| ReconciliationInvokedAbility { + invocation: hash_to_cid(row.invocation), + resource: row.resource, + ability: row.ability, + }) + .collect::>(); + + let kv_writes = kv_write::Entity::find() + .order_by_asc(kv_write::Column::Space) + .order_by_asc(kv_write::Column::Key) + .order_by_asc(kv_write::Column::Invocation) + .all(&tx) + .await + .map_err(|err| err.to_string())? + .into_iter() + .map(|row| ReconciliationKvWrite { + space: row.space, + key: row.key, + invocation: hash_to_cid(row.invocation), + seq: row.seq, + epoch: hash_to_cid(row.epoch), + epoch_seq: row.epoch_seq, + value: hash_to_cid(row.value), + metadata: row.metadata, + }) + .collect::>(); + + let kv_deletes = kv_delete::Entity::find() + .order_by_asc(kv_delete::Column::InvocationId) + .order_by_asc(kv_delete::Column::Space) + .order_by_asc(kv_delete::Column::Key) + .all(&tx) + .await + .map_err(|err| err.to_string())? + .into_iter() + .map(|row| ReconciliationKvDelete { + invocation_id: hash_to_cid(row.invocation_id), + space: row.space, + key: row.key, + deleted_invocation_id: hash_to_cid(row.deleted_invocation_id), + }) + .collect::>(); + + let database_artifacts = database_artifact::Entity::find() + .order_by_asc(database_artifact::Column::Service) + .order_by_asc(database_artifact::Column::Space) + .order_by_asc(database_artifact::Column::Name) + .all(&tx) + .await + .map_err(|err| err.to_string())? + .into_iter() + .map(|row| ReconciliationDatabaseArtifact { + service: row.service, + space: row.space, + name: row.name, + revision: row.revision, + content_hash: row.content_hash, + payload: encode_base64(&row.payload), + size_bytes: row.size_bytes, + backend: row.backend, + storage_mode: row.storage_mode, + created_at: row.created_at, + updated_at: row.updated_at, + }) + .collect::>(); + + let epochs = epoch::Entity::find() + .order_by_asc(epoch::Column::Space) + .order_by_asc(epoch::Column::Seq) + .order_by_asc(epoch::Column::Id) + .all(&tx) + .await + .map_err(|err| err.to_string())? + .into_iter() + .map(|row| ReconciliationEpoch { + seq: row.seq, + id: hash_to_cid(row.id), + space: row.space, + }) + .collect::>(); + + let epoch_orders = epoch_order::Entity::find() + .order_by_asc(epoch_order::Column::Space) + .order_by_asc(epoch_order::Column::Parent) + .order_by_asc(epoch_order::Column::Child) + .all(&tx) + .await + .map_err(|err| err.to_string())? + .into_iter() + .map(|row| ReconciliationEpochOrder { + parent: hash_to_cid(row.parent), + child: hash_to_cid(row.child), + space: row.space, + }) + .collect::>(); + + let event_orders = event_order::Entity::find() + .order_by_asc(event_order::Column::Space) + .order_by_asc(event_order::Column::Epoch) + .order_by_asc(event_order::Column::EpochSeq) + .order_by_asc(event_order::Column::Seq) + .all(&tx) + .await + .map_err(|err| err.to_string())? + .into_iter() + .map(|row| ReconciliationEventOrder { + seq: row.seq, + epoch: hash_to_cid(row.epoch), + epoch_seq: row.epoch_seq, + event: hash_to_cid(row.event), + space: row.space, + }) + .collect::>(); + + let blocks_root = local_block_root(config)?; + let blocks = collect_block_snapshots(&blocks_root).await?; + + Ok(ReconciliationSnapshot { + node_id: encryption.node_did().to_string(), + spaces, + actors, + delegations, + revocations, + abilities, + parent_delegations, + invocations, + invoked_abilities, + kv_writes, + kv_deletes, + database_artifacts, + epochs, + epoch_orders, + event_orders, + blocks, + }) +} + +async fn apply_reconciliation_snapshot( + tinycloud: &State, + config: &State, + snapshot: ReconciliationSnapshot, +) -> Result { + let blocks_root = local_block_root(config)?; + write_block_snapshots(&blocks_root, &snapshot.blocks).await?; + + let tx = tinycloud.writable().await.map_err(|err| err.to_string())?; + + insert_many_ignore::( + &tx, + snapshot + .spaces + .iter() + .cloned() + .map(|row| space::Model { id: row.id }) + .collect::>(), + OnConflict::column(space::Column::Id) + .do_nothing() + .to_owned(), + ) + .await + .map_err(|err| err.to_string())?; + + insert_many_ignore::( + &tx, + snapshot + .actors + .iter() + .cloned() + .map(|row| actor::Model { id: row.id }) + .collect::>(), + OnConflict::column(actor::Column::Id) + .do_nothing() + .to_owned(), + ) + .await + .map_err(|err| err.to_string())?; + + insert_many_ignore::( + &tx, + snapshot + .delegations + .iter() + .cloned() + .map(|row| { + let expiry = parse_rfc3339(row.expiry).map_err(|err| err.to_string())?; + let issued_at = parse_rfc3339(row.issued_at).map_err(|err| err.to_string())?; + let not_before = parse_rfc3339(row.not_before).map_err(|err| err.to_string())?; + Ok::<_, String>(delegation::Model { + id: row + .id + .parse::() + .map_err(|err| err.to_string())? + .into(), + delegator: row.delegator, + delegatee: row.delegatee, + expiry, + issued_at, + not_before, + facts: row.facts, + serialization: decode_base64(&row.serialization)?, + }) + }) + .collect::, _>>()?, + OnConflict::column(delegation::Column::Id) + .do_nothing() + .to_owned(), + ) + .await + .map_err(|err| err.to_string())?; + + insert_many_ignore::( + &tx, + snapshot + .revocations + .iter() + .cloned() + .map(|row| { + Ok::<_, String>(revocation::Model { + id: row + .id + .parse::() + .map_err(|err| err.to_string())? + .into(), + revoker: row.revoker, + revoked: row + .revoked + .parse::() + .map_err(|err| err.to_string())? + .into(), + serialization: decode_base64(&row.serialization)?, + }) + }) + .collect::, _>>()?, + OnConflict::column(revocation::Column::Id) + .do_nothing() + .to_owned(), + ) + .await + .map_err(|err| err.to_string())?; + + insert_many_ignore::( + &tx, + snapshot + .abilities + .iter() + .cloned() + .map(|row| abilities::Model { + resource: row.resource, + ability: row.ability, + delegation: row + .delegation + .parse::() + .map_err(|err| err.to_string()) + .unwrap() + .into(), + caveats: row.caveats, + }) + .collect::>(), + OnConflict::columns([ + abilities::Column::Resource, + abilities::Column::Ability, + abilities::Column::Delegation, + ]) + .do_nothing() + .to_owned(), + ) + .await + .map_err(|err| err.to_string())?; + + insert_many_ignore::( + &tx, + snapshot + .parent_delegations + .iter() + .cloned() + .map(|row| { + Ok::<_, String>(parent_delegations::Model { + parent: row + .parent + .parse::() + .map_err(|err| err.to_string())? + .into(), + child: row + .child + .parse::() + .map_err(|err| err.to_string())? + .into(), + }) + }) + .collect::, _>>()?, + OnConflict::columns([ + parent_delegations::Column::Parent, + parent_delegations::Column::Child, + ]) + .do_nothing() + .to_owned(), + ) + .await + .map_err(|err| err.to_string())?; + + insert_many_ignore::( + &tx, + snapshot + .invocations + .iter() + .cloned() + .map(|row| { + Ok::<_, String>(invocation::Model { + id: row + .id + .parse::() + .map_err(|err| err.to_string())? + .into(), + invoker: row.invoker, + issued_at: OffsetDateTime::parse(&row.issued_at, &Rfc3339) + .map_err(|err| err.to_string())?, + facts: row.facts, + serialization: decode_base64(&row.serialization)?, + }) + }) + .collect::, _>>()?, + OnConflict::column(invocation::Column::Id) + .do_nothing() + .to_owned(), + ) + .await + .map_err(|err| err.to_string())?; + + insert_many_ignore::( + &tx, + snapshot + .invoked_abilities + .iter() + .cloned() + .map(|row| { + Ok::<_, String>(invoked_abilities::Model { + invocation: row + .invocation + .parse::() + .map_err(|err| err.to_string())? + .into(), + resource: row.resource, + ability: row.ability, + }) + }) + .collect::, _>>()?, + OnConflict::columns([ + invoked_abilities::Column::Invocation, + invoked_abilities::Column::Resource, + invoked_abilities::Column::Ability, + ]) + .do_nothing() + .to_owned(), + ) + .await + .map_err(|err| err.to_string())?; + + insert_many_ignore::( + &tx, + snapshot + .epochs + .iter() + .cloned() + .map(|row| { + Ok::<_, String>(epoch::Model { + seq: row.seq, + id: row + .id + .parse::() + .map_err(|err| err.to_string())? + .into(), + space: row.space, + }) + }) + .collect::, _>>()?, + OnConflict::columns([epoch::Column::Id, epoch::Column::Space, epoch::Column::Seq]) + .do_nothing() + .to_owned(), + ) + .await + .map_err(|err| err.to_string())?; + + insert_many_ignore::( + &tx, + snapshot + .epoch_orders + .iter() + .cloned() + .map(|row| { + Ok::<_, String>(epoch_order::Model { + parent: row + .parent + .parse::() + .map_err(|err| err.to_string())? + .into(), + child: row + .child + .parse::() + .map_err(|err| err.to_string())? + .into(), + space: row.space, + }) + }) + .collect::, _>>()?, + OnConflict::columns([ + epoch_order::Column::Parent, + epoch_order::Column::Child, + epoch_order::Column::Space, + ]) + .do_nothing() + .to_owned(), + ) + .await + .map_err(|err| err.to_string())?; + + insert_many_ignore::( + &tx, + snapshot + .event_orders + .iter() + .cloned() + .map(|row| { + Ok::<_, String>(event_order::Model { + seq: row.seq, + epoch: row + .epoch + .parse::() + .map_err(|err| err.to_string())? + .into(), + epoch_seq: row.epoch_seq, + event: row + .event + .parse::() + .map_err(|err| err.to_string())? + .into(), + space: row.space, + }) + }) + .collect::, _>>()?, + OnConflict::columns([ + event_order::Column::Epoch, + event_order::Column::EpochSeq, + event_order::Column::Space, + ]) + .do_nothing() + .to_owned(), + ) + .await + .map_err(|err| err.to_string())?; + + insert_many_ignore::( + &tx, + snapshot + .kv_writes + .iter() + .cloned() + .map(|row| { + Ok::<_, String>(kv_write::Model { + space: row.space, + key: row.key, + invocation: row + .invocation + .parse::() + .map_err(|err| err.to_string())? + .into(), + seq: row.seq, + epoch: row + .epoch + .parse::() + .map_err(|err| err.to_string())? + .into(), + epoch_seq: row.epoch_seq, + value: row + .value + .parse::() + .map_err(|err| err.to_string())? + .into(), + metadata: row.metadata, + }) + }) + .collect::, _>>()?, + OnConflict::columns([ + kv_write::Column::Space, + kv_write::Column::Key, + kv_write::Column::Invocation, + ]) + .do_nothing() + .to_owned(), + ) + .await + .map_err(|err| err.to_string())?; + + insert_many_ignore::( + &tx, + snapshot + .kv_deletes + .iter() + .cloned() + .map(|row| { + Ok::<_, String>(kv_delete::Model { + invocation_id: row + .invocation_id + .parse::() + .map_err(|err| err.to_string())? + .into(), + space: row.space, + key: row.key, + deleted_invocation_id: row + .deleted_invocation_id + .parse::() + .map_err(|err| err.to_string())? + .into(), + }) + }) + .collect::, _>>()?, + OnConflict::columns([kv_delete::Column::InvocationId, kv_delete::Column::Space]) + .do_nothing() + .to_owned(), + ) + .await + .map_err(|err| err.to_string())?; + + for row in snapshot.database_artifacts.iter().cloned() { + upsert_database_artifact( + &tx, + database_artifact::Model { + service: row.service, + space: row.space, + name: row.name, + revision: row.revision, + content_hash: row.content_hash, + payload: decode_base64(&row.payload)?, + size_bytes: row.size_bytes, + backend: row.backend, + storage_mode: row.storage_mode, + created_at: row.created_at, + updated_at: row.updated_at, + }, + ) + .await + .map_err(|err| err.to_string())?; + } + + tx.commit().await.map_err(|err| err.to_string())?; + + Ok(ReconciliationApplyResponse { + node_id: snapshot.node_id, + spaces: snapshot.spaces.len(), + actors: snapshot.actors.len(), + delegations: snapshot.delegations.len(), + revocations: snapshot.revocations.len(), + abilities: snapshot.abilities.len(), + parent_delegations: snapshot.parent_delegations.len(), + invocations: snapshot.invocations.len(), + invoked_abilities: snapshot.invoked_abilities.len(), + kv_writes: snapshot.kv_writes.len(), + kv_deletes: snapshot.kv_deletes.len(), + database_artifacts: snapshot.database_artifacts.len(), + epochs: snapshot.epochs.len(), + epoch_orders: snapshot.epoch_orders.len(), + event_orders: snapshot.event_orders.len(), + blocks: snapshot.blocks.len(), + }) +} + +#[get("/reconciliation/snapshot")] +pub async fn reconciliation_snapshot( + d: AuthHeaderGetter, + tinycloud: &State, + config: &State, + encryption: &State, +) -> Result, (Status, String)> { + let delegation = d.0 .0; + validate_host_reconciliation_access(&delegation)?; + + build_reconciliation_snapshot(tinycloud, config, encryption) + .await + .map(Json) + .map_err(|err| (Status::InternalServerError, err)) +} + +#[post("/reconciliation/snapshot", data = "")] +pub async fn reconciliation_snapshot_apply( + d: AuthHeaderGetter, + snapshot: Json, + tinycloud: &State, + config: &State, +) -> Result, (Status, String)> { + let delegation = d.0 .0; + validate_host_reconciliation_access(&delegation)?; + + apply_reconciliation_snapshot(tinycloud, config, snapshot.into_inner()) + .await + .map(Json) + .map_err(|err| (Status::InternalServerError, err)) +} + #[allow(clippy::let_unit_value)] pub mod util_routes { use super::*;