Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 48 additions & 19 deletions tinycloud-core/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ where
.begin_with_config(None, Some(sea_orm::AccessMode::ReadOnly))
.await
}

pub async fn writable(&self) -> Result<DatabaseTransaction, DbErr> {
self.conn.begin().await
}
}

impl<C, B, K> SpaceDatabase<C, B, K>
Expand Down Expand Up @@ -1188,22 +1192,7 @@ async fn get_kv_entity<C: ConnectionTrait>(
key: &Path,
// TODO version: Option<(i64, Hash, i64)>,
) -> Result<Option<kv_write::Model>, DbErr> {
// Ok(if let Some((seq, epoch, epoch_seq)) = version {
// event_order::Entity::find_by_id((epoch, epoch_seq, space_id.clone().into()))
// .reverse_join(kv_write::Entity)
// .find_also_related(kv_delete::Entity)
// .filter(
// Condition::all()
// .add(kv_write::Column::Key.eq(key))
// .add(kv_write::Column::Space.eq(space_id.clone().into()))
// .add(kv_delete::Column::InvocationId.is_null()),
// )
// .one(db)
// .await?
// .map(|(kv, _)| kv)
// } else {
// we want to find the latest kv_write which is not deleted
Ok(kv_write::Entity::find()
let latest_write = kv_write::Entity::find()
.filter(
Condition::all()
.add(kv_write::Column::Key.eq(key.as_str()))
Expand All @@ -1212,11 +1201,51 @@ async fn get_kv_entity<C: ConnectionTrait>(
.order_by_desc(kv_write::Column::Seq)
.order_by_desc(kv_write::Column::Epoch)
.order_by_desc(kv_write::Column::EpochSeq)
.find_also_related(kv_delete::Entity)
.filter(kv_delete::Column::InvocationId.is_null())
.one(db)
.await?;

let latest_write_order = latest_write
.as_ref()
.map(|row| (row.seq, row.epoch, row.epoch_seq));

let mut latest_delete_order: Option<(i64, Hash, i64)> = None;
for delete in kv_delete::Entity::find()
.filter(
Condition::all()
.add(kv_delete::Column::Key.eq(key.as_str()))
.add(kv_delete::Column::Space.eq(SpaceIdWrap(space_id.clone()))),
)
.all(db)
.await?
.map(|(kv, _)| kv))
{
let Some(delete_order) = event_order::Entity::find()
.filter(event_order::Column::Space.eq(SpaceIdWrap(space_id.clone())))
.filter(event_order::Column::Event.eq(delete.invocation_id))
.one(db)
.await?
else {
continue;
};

let candidate = (delete_order.seq, delete_order.epoch, delete_order.epoch_seq);
if latest_delete_order
.as_ref()
.map(|current| candidate > *current)
.unwrap_or(true)
{
latest_delete_order = Some(candidate);
}
}

if latest_delete_order
.as_ref()
.zip(latest_write_order.as_ref())
.is_some_and(|(delete_order, write_order)| delete_order > write_order)
{
return Ok(None);
}

Ok(latest_write)
}

async fn get_valid_delegations<C: ConnectionTrait, S: StorageSetup, K: Secrets>(
Expand Down
98 changes: 78 additions & 20 deletions tinycloud-core/src/sql/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use dashmap::DashMap;
use tinycloud_auth::resource::SpaceId;

use crate::database_artifacts::{DatabaseArtifactError, DatabaseArtifactRepository};
use crate::hash::hash;

use super::{
caveats::SqlCaveats,
Expand Down Expand Up @@ -83,31 +84,56 @@ impl SqlService {

pub async fn export(&self, space: &SpaceId, db_name: &str) -> Result<Vec<u8>, SqlError> {
let key = (space.to_string(), db_name.to_string());
let artifact = self
.artifact_repository
.load("sql", &space.to_string(), db_name)
.await
.map_err(artifact_error_to_sql)?;

// If there's a live actor, route through it (handles both in-memory and file-backed)
if let Some(handle) = self.databases.get(&key).map(|h| h.clone()) {
match handle.export().await {
Err(SqlError::Internal(ref msg))
if msg.contains("Database actor not available") =>
{
// Actor is dead — remove stale entry and fall through to cold read
tracing::warn!(space=%space, db=%db_name, "Dead SQL actor detected during export, removing");
self.databases.remove(&key);
if let Some(artifact) = artifact.as_ref() {
match handle.export().await {
Ok(payload) => {
let live_hash = hash(&payload).to_cid(0x55).to_string();
if live_hash == artifact.content_hash {
return Ok(payload);
}
}
Err(SqlError::Internal(ref msg))
if msg.contains("Database actor not available") =>
{
tracing::warn!(
space=%space,
db=%db_name,
"Dead SQL actor detected during export, removing"
);
}
Err(err) => return Err(err),
}
other => return other,

let _ = self.discard_local_state(&key).await;
} else {
return handle.export().await;
}
}

match self
.artifact_repository
.load("sql", &space.to_string(), db_name)
.await
.map_err(artifact_error_to_sql)?
{
Some(artifact) => {
remove_sql_cache_files(&self.cache_path(space, db_name)).await?;
write_cache_file(&self.cache_path(space, db_name), &artifact.payload).await?;
Ok(artifact.payload)
match artifact {
Some(_) => {
self.hydrate_cache(space, db_name).await?;
let handle = self
.databases
.entry(key)
.or_insert_with(|| {
spawn_actor(
space.to_string(),
db_name.to_string(),
self.base_path.clone(),
self.memory_threshold,
self.databases.clone(),
)
})
.clone();
handle.export().await
}
None => Err(SqlError::DatabaseNotFound),
}
Expand All @@ -120,8 +146,40 @@ impl SqlService {

async fn handle(&self, space: &SpaceId, db_name: &str) -> Result<DatabaseHandle, SqlError> {
let key = (space.to_string(), db_name.to_string());

// If reconciliation replaced the durable artifact, drop any stale live
// actor so the next access rehydrates from the winning payload.
let artifact = self
.artifact_repository
.load("sql", &space.to_string(), db_name)
.await
.map_err(artifact_error_to_sql)?;

if let Some(handle) = self.databases.get(&key).map(|h| h.clone()) {
return Ok(handle);
if let Some(artifact) = artifact.as_ref() {
match handle.export().await {
Ok(payload) => {
let live_hash = hash(&payload).to_cid(0x55).to_string();
if live_hash == artifact.content_hash {
return Ok(handle);
}
}
Err(SqlError::Internal(ref msg))
if msg.contains("Database actor not available") =>
{
tracing::warn!(
space=%space,
db=%db_name,
"Dead SQL actor detected during refresh, removing"
);
}
Err(err) => return Err(err),
}

let _ = self.discard_local_state(&key).await;
} else {
return Ok(handle);
}
}

self.hydrate_cache(space, db_name).await?;
Expand Down
5 changes: 4 additions & 1 deletion tinycloud-node-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use routes::{
hooks::{create_hook_ticket, create_webhook, delete_webhook, hook_events, list_webhooks},
info, invoke, open_host_key,
public::{public_kv_get, public_kv_head, public_kv_list, public_kv_options, RateLimiter},
signed_kv_get,
reconciliation_ranges, reconciliation_snapshot, reconciliation_snapshot_apply, signed_kv_get,
util_routes::*,
version,
};
Expand Down Expand Up @@ -125,6 +125,9 @@ pub async fn app(config: &Figment) -> Result<Rocket<Build>> {
delegate,
create_signed_kv_url,
signed_kv_get,
reconciliation_ranges,
reconciliation_snapshot,
reconciliation_snapshot_apply,
create_hook_ticket,
hook_events,
create_webhook,
Expand Down
Loading