Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1915,9 +1915,9 @@ impl Database for DatabaseFdbSqliteNats {
limit: Some(1),
..(&pending_signal_subspace).into()
},
// NOTE: This does not have to be SERIALIZABLE because the conflict occurs
// with acking which is a separate row. See below
SNAPSHOT,
// NOTE: This is serializable because any insert into this subspace
// should cause a conflict and retry of this txn
SERIALIZABLE,
)
})
.collect::<Vec<_>>();
Expand Down Expand Up @@ -2245,7 +2245,7 @@ impl Database for DatabaseFdbSqliteNats {
.map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?,
);

// Write ray id ts
// Write ray id
let ray_id_key = keys::signal::RayIdKey::new(signal_id);
tx.set(
&self.subspace.pack(&ray_id_key),
Expand Down
112 changes: 19 additions & 93 deletions packages/common/chirp-workflow/core/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,68 +39,23 @@ async fn fdb_sqlite_nats_driver() {
// .await
// .unwrap();

// let res = db
// .find_workflow(
// "workflow_name",
// &json!({
// "bald": "eagle",
// "fat": "man"
// }),
// )
// .await
// .unwrap();
// tracing::info!(?res);

// db.update_workflow_tags(
// workflow_id,
// "workflow_name",
// &json!({
// "bald": "eagle",
// "fat": "man"
// }),
// )
// .await
// .unwrap();
let workflow_id = ctx.workflow(def::Input { })
.dispatch()
.await
.unwrap();

// let res = db
// .find_workflow(
// "workflow_name",
// &json!({
// "bald": "eagle",
// "fat": "man"
// }),
// )
// .await
// .unwrap();
// tracing::info!(?res);

if std::env::var("SPAWN_WF").unwrap_or_default() == "1" {
for _ in 0..1 {
let ctx2 = ctx.clone();
tokio::spawn(async move {
ctx2.workflow(def::Input {})
.tag("foo", "bar")
.dispatch()
.await
.unwrap();
});
}
}
let ctx2 = ctx.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(110)).await;

// let ctx2 = ctx.clone();
// tokio::spawn(async move {
// for _ in 0..10 {
// tokio::time::sleep(Duration::from_secs(2)).await;
// ctx2.signal(def::MySignal {
// test: Uuid::new_v4(),
// })
// .to_workflow::<def::Workflow>()
// .tag("foo", "bar")
// .send()
// .await
// .unwrap();
// }
// });
ctx2.signal(def::MySignal {
test: Uuid::new_v4(),
})
.to_workflow_id(workflow_id)
.send()
.await
.unwrap();
});

let worker = Worker::new(reg.clone(), db.clone());

Expand All @@ -120,43 +75,14 @@ mod def {
pub async fn test(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
tracing::info!(w=?ctx.workflow_id(), "hello from workflow");

ctx.activity(TestActivityInput {
foo: "bar".to_string(),
})
.await?;

// let workflow_id = ctx.workflow_id();
// ctx.signal(MySignal {
// test: Uuid::new_v4(),
// ctx.activity(TestActivityInput {
// foo: "bar".to_string(),
// })
// .to_workflow_id(workflow_id)
// .send()
// .await?;

ctx.repeat(|ctx| {
async move {
let sig = ctx.listen_with_timeout::<MySignal>(5 * 1000).await?;
tracing::info!(?sig);

let start = std::time::Instant::now();

ctx.activity(TestActivityInput {
foo: "bar".to_string(),
})
.await?;
let sig = ctx.listen::<MySignal>().await?;

ctx.activity(TestActivityInput {
foo: "bar".to_string(),
})
.await?;

tracing::info!(dt=?start.elapsed(), "-------------");

Ok(Loop::<()>::Continue)
}
.boxed()
})
.await?;
tracing::info!(?sig, "signal recv ------------------");

Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use chirp_workflow::prelude::*;
use fdb_util::{end_of_key_range, FormalKey, SNAPSHOT};
use fdb_util::{end_of_key_range, FormalKey, SERIALIZABLE};
use foundationdb::{
self as fdb,
options::{ConflictRangeType, StreamingMode},
Expand Down Expand Up @@ -80,9 +80,7 @@ pub(crate) async fn pegboard_actor_allocate_ingress_ports(
mode: StreamingMode::Iterator,
..(start_key, end_key.clone()).into()
},
// NOTE: This is not SERIALIZABLE because we don't want to conflict with all of the keys,
// just the one we choose
SNAPSHOT,
SERIALIZABLE,
);

// Continue iterating over the same stream until all of the required ports are found
Expand Down Expand Up @@ -111,9 +109,7 @@ pub(crate) async fn pegboard_actor_allocate_ingress_ports(
limit: Some(old_start as usize),
..(start_key, end_key.clone()).into()
},
// NOTE: This is not SERIALIZABLE because we don't want to conflict
// with all of the keys, just the one we choose
SNAPSHOT,
SERIALIZABLE,
);

continue;
Expand Down
Loading