From ab45c358d80abe2ab9912a9e1bac9228411779d1 Mon Sep 17 00:00:00 2001 From: Dmitrii Kostyrev Date: Thu, 3 Jul 2025 22:47:57 +0100 Subject: [PATCH] Redis scheduler store should read OperationId as a JSON instead of String. --- .../src/store_awaited_action_db.rs | 3 +- .../redis_store_awaited_action_db_test.rs | 53 ++++++++++++++++++- 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/nativelink-scheduler/src/store_awaited_action_db.rs b/nativelink-scheduler/src/store_awaited_action_db.rs index 4de133ad0..c586b94bc 100644 --- a/nativelink-scheduler/src/store_awaited_action_db.rs +++ b/nativelink-scheduler/src/store_awaited_action_db.rs @@ -264,7 +264,8 @@ impl SchedulerStoreKeyProvider for ClientIdToOperationId<'_> { impl SchedulerStoreDecodeTo for ClientIdToOperationId<'_> { type DecodeOutput = OperationId; fn decode(_version: u64, data: Bytes) -> Result { - OperationId::try_from(data).err_tip(|| "In ClientIdToOperationId::decode") + serde_json::from_slice(&data) + .map_err(|e| make_input_err!("In ClientIdToOperationId::decode - {e:?}")) } } diff --git a/nativelink-scheduler/tests/redis_store_awaited_action_db_test.rs b/nativelink-scheduler/tests/redis_store_awaited_action_db_test.rs index bd42e7222..32ffc34af 100644 --- a/nativelink-scheduler/tests/redis_store_awaited_action_db_test.rs +++ b/nativelink-scheduler/tests/redis_store_awaited_action_db_test.rs @@ -35,7 +35,7 @@ use nativelink_scheduler::awaited_action_db::{ use nativelink_scheduler::store_awaited_action_db::StoreAwaitedActionDb; use nativelink_store::redis_store::{RedisStore, RedisSubscriptionManager}; use nativelink_util::action_messages::{ - ActionInfo, ActionStage, ActionUniqueKey, ActionUniqueQualifier, + ActionInfo, ActionStage, ActionUniqueKey, ActionUniqueQualifier, OperationId, }; use nativelink_util::common::DigestInfo; use nativelink_util::digest_hasher::DigestHasherFunc; @@ -196,6 +196,8 @@ async fn add_action_smoke_test() -> Result<(), Error> { new_awaited_action }; + let worker_operation_id = OperationId::from(WORKER_OPERATION_ID); + let ft_aggregate_args = vec![ format!("aa__unique_qualifier__{SCRIPT_VERSION}").into(), format!("@unique_qualifier:{{ {INSTANCE_NAME}_SHA256_0000000000000000000000000000000000000000000000000000000000000000_0_c }}").into(), @@ -349,6 +351,43 @@ async fn add_action_smoke_test() -> Result<(), Error> { ])), None, ) + .expect( + MockCommand { + cmd: Str::from_static("HMGET"), + subcommand: None, + args: vec![ + format!("cid_{CLIENT_OPERATION_ID}").as_bytes().into(), + "version".as_bytes().into(), + "data".as_bytes().into(), + ], + }, + Ok(RedisValue::Array(vec![ + // Version. + RedisValue::Null, + // Data. + RedisValue::Bytes(Bytes::from(serde_json::to_string(&worker_operation_id).unwrap())), + ])), + None, + ) + .expect( + MockCommand { + cmd: Str::from_static("HMGET"), + subcommand: None, + args: vec![ + format!("aa_{WORKER_OPERATION_ID}").as_bytes().into(), + "version".as_bytes().into(), + "data".as_bytes().into(), + ], + }, + Ok(RedisValue::Array(vec![ + // Version. + "2".into(), + // Data. + RedisValue::Bytes(Bytes::from(serde_json::to_string(&new_awaited_action).unwrap())), + ])), + None, + ) + .expect( MockCommand { cmd: Str::from_static("EVALSHA"), @@ -453,6 +492,18 @@ async fn add_action_smoke_test() -> Result<(), Error> { ); } + { + let get_subscription = awaited_action_db + .get_awaited_action_by_id(&OperationId::from(CLIENT_OPERATION_ID)) + .await + .unwrap() + .unwrap(); + + let get_res = get_subscription.borrow().await; + + assert_eq!(get_res.unwrap().state().stage, ActionStage::Executing); + } + { // Update the action and check the new state. let (changed_awaited_action_res, update_res) = tokio::join!(