@@ -35,7 +35,7 @@ use nativelink_scheduler::awaited_action_db::{
3535use nativelink_scheduler:: store_awaited_action_db:: StoreAwaitedActionDb ;
3636use nativelink_store:: redis_store:: { RedisStore , RedisSubscriptionManager } ;
3737use nativelink_util:: action_messages:: {
38- ActionInfo , ActionStage , ActionUniqueKey , ActionUniqueQualifier ,
38+ ActionInfo , ActionStage , ActionUniqueKey , ActionUniqueQualifier , OperationId ,
3939} ;
4040use nativelink_util:: common:: DigestInfo ;
4141use nativelink_util:: digest_hasher:: DigestHasherFunc ;
@@ -196,6 +196,8 @@ async fn add_action_smoke_test() -> Result<(), Error> {
196196 new_awaited_action
197197 } ;
198198
199+ let worker_operation_id = OperationId :: from ( WORKER_OPERATION_ID ) ;
200+
199201 let ft_aggregate_args = vec ! [
200202 format!( "aa__unique_qualifier__{SCRIPT_VERSION}" ) . into( ) ,
201203 format!( "@unique_qualifier:{{ {INSTANCE_NAME}_SHA256_0000000000000000000000000000000000000000000000000000000000000000_0_c }}" ) . into( ) ,
@@ -349,6 +351,43 @@ async fn add_action_smoke_test() -> Result<(), Error> {
349351 ] ) ) ,
350352 None ,
351353 )
354+ . expect (
355+ MockCommand {
356+ cmd : Str :: from_static ( "HMGET" ) ,
357+ subcommand : None ,
358+ args : vec ! [
359+ format!( "cid_{CLIENT_OPERATION_ID}" ) . as_bytes( ) . into( ) ,
360+ "version" . as_bytes( ) . into( ) ,
361+ "data" . as_bytes( ) . into( ) ,
362+ ] ,
363+ } ,
364+ Ok ( RedisValue :: Array ( vec ! [
365+ // Version.
366+ RedisValue :: Null ,
367+ // Data.
368+ RedisValue :: Bytes ( Bytes :: from( serde_json:: to_string( & worker_operation_id) . unwrap( ) ) ) ,
369+ ] ) ) ,
370+ None ,
371+ )
372+ . expect (
373+ MockCommand {
374+ cmd : Str :: from_static ( "HMGET" ) ,
375+ subcommand : None ,
376+ args : vec ! [
377+ format!( "aa_{WORKER_OPERATION_ID}" ) . as_bytes( ) . into( ) ,
378+ "version" . as_bytes( ) . into( ) ,
379+ "data" . as_bytes( ) . into( ) ,
380+ ] ,
381+ } ,
382+ Ok ( RedisValue :: Array ( vec ! [
383+ // Version.
384+ "2" . into( ) ,
385+ // Data.
386+ RedisValue :: Bytes ( Bytes :: from( serde_json:: to_string( & new_awaited_action) . unwrap( ) ) ) ,
387+ ] ) ) ,
388+ None ,
389+ )
390+
352391 . expect (
353392 MockCommand {
354393 cmd : Str :: from_static ( "EVALSHA" ) ,
@@ -453,6 +492,18 @@ async fn add_action_smoke_test() -> Result<(), Error> {
453492 ) ;
454493 }
455494
495+ {
496+ let get_subscription = awaited_action_db
497+ . get_awaited_action_by_id ( & OperationId :: from ( CLIENT_OPERATION_ID ) )
498+ . await
499+ . unwrap ( )
500+ . unwrap ( ) ;
501+
502+ let get_res = get_subscription. borrow ( ) . await ;
503+
504+ assert_eq ! ( get_res. unwrap( ) . state( ) . stage, ActionStage :: Executing ) ;
505+ }
506+
456507 {
457508 // Update the action and check the new state.
458509 let ( changed_awaited_action_res, update_res) = tokio:: join!(
0 commit comments