From b4f612239dc0e5b98613f5912f129fbad23beebb Mon Sep 17 00:00:00 2001 From: Diptanu Gon Choudhury Date: Fri, 16 Jan 2026 13:34:02 -0800 Subject: [PATCH] feat: add AllocationCreated and AllocationCompleted events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refactors request state change events to clearly separate two distinct lifecycles: - Function Run lifecycle: the logical unit of work (may have multiple execution attempts) - Allocation lifecycle: a single physical execution attempt on an executor Changes: - Add AllocationCreated event (renamed from FunctionRunAssigned) - Add AllocationCompleted event for when an execution attempt finishes - Update FunctionRunCompleted to represent final function run outcome - Keep allocation_id as optional in FunctionRunCompleted for backward compatibility - Maintain backward compatibility with FunctionRunAssigned type alias - Update RequestStateChangeEvent enum and all helper methods - Add comprehensive backward compatibility tests Backward compatibility: - Old servers sending FunctionRunAssigned → deserializes correctly - Old servers sending FunctionRunCompleted (with allocation_id) → deserializes correctly - New servers sending AllocationCreated/AllocationCompleted → works as expected This aligns with tensorlakeai/indexify PR #2042. --- crates/cloud-sdk/src/applications/models.rs | 275 +++++++++++++++++++- 1 file changed, 262 insertions(+), 13 deletions(-) diff --git a/crates/cloud-sdk/src/applications/models.rs b/crates/cloud-sdk/src/applications/models.rs index 8e069a9..50f7680 100644 --- a/crates/cloud-sdk/src/applications/models.rs +++ b/crates/cloud-sdk/src/applications/models.rs @@ -520,11 +520,19 @@ pub trait RequestEventMetadata { pub enum RequestStateChangeEvent { RequestStarted(RequestStartedEvent), FunctionRunCreated(FunctionRunCreated), - FunctionRunAssigned(FunctionRunAssigned), + /// Event emitted when a function run reaches its final outcome (after all retries exhausted or success) FunctionRunCompleted(FunctionRunCompleted), FunctionRunMatchedCache(FunctionRunMatchedCache), + /// Event emitted when an allocation (execution attempt) is created and assigned to an executor + AllocationCreated(AllocationCreated), + /// Event emitted when an allocation (execution attempt) completes with an outcome + AllocationCompleted(AllocationCompleted), RequestProgressUpdated(RequestProgressUpdated), RequestFinished(RequestFinishedEvent), + // Legacy variants for backward compatibility + /// @deprecated Use AllocationCreated instead + #[serde(alias = "FunctionRunAssigned")] + FunctionRunAssigned(AllocationCreated), } impl RequestStateChangeEvent { @@ -532,11 +540,14 @@ impl RequestStateChangeEvent { match self { RequestStateChangeEvent::RequestStarted(_) => "RequestStarted", RequestStateChangeEvent::FunctionRunCreated(_) => "FunctionRunCreated", - RequestStateChangeEvent::FunctionRunAssigned(_) => "FunctionRunAssigned", RequestStateChangeEvent::FunctionRunCompleted(_) => "FunctionRunCompleted", RequestStateChangeEvent::FunctionRunMatchedCache(_) => "FunctionRunMatchedCache", + RequestStateChangeEvent::AllocationCreated(_) => "AllocationCreated", + RequestStateChangeEvent::AllocationCompleted(_) => "AllocationCompleted", RequestStateChangeEvent::RequestProgressUpdated(_) => "RequestProgressUpdated", RequestStateChangeEvent::RequestFinished(_) => "RequestFinished", + // Legacy - maps to new name + RequestStateChangeEvent::FunctionRunAssigned(_) => "AllocationCreated", } } @@ -549,10 +560,12 @@ impl RequestStateChangeEvent { RequestStateChangeEvent::RequestStarted(event) => event.namespace(), RequestStateChangeEvent::RequestFinished(event) => event.namespace(), RequestStateChangeEvent::FunctionRunCreated(event) => event.namespace(), - RequestStateChangeEvent::FunctionRunAssigned(event) => event.namespace(), RequestStateChangeEvent::FunctionRunCompleted(event) => event.namespace(), RequestStateChangeEvent::FunctionRunMatchedCache(event) => event.namespace(), + RequestStateChangeEvent::AllocationCreated(event) => event.namespace(), + RequestStateChangeEvent::AllocationCompleted(event) => event.namespace(), RequestStateChangeEvent::RequestProgressUpdated(event) => event.namespace(), + RequestStateChangeEvent::FunctionRunAssigned(event) => event.namespace(), } } @@ -561,10 +574,12 @@ impl RequestStateChangeEvent { RequestStateChangeEvent::RequestStarted(event) => event.application_name(), RequestStateChangeEvent::RequestFinished(event) => event.application_name(), RequestStateChangeEvent::FunctionRunCreated(event) => event.application_name(), - RequestStateChangeEvent::FunctionRunAssigned(event) => event.application_name(), RequestStateChangeEvent::FunctionRunCompleted(event) => event.application_name(), RequestStateChangeEvent::FunctionRunMatchedCache(event) => event.application_name(), + RequestStateChangeEvent::AllocationCreated(event) => event.application_name(), + RequestStateChangeEvent::AllocationCompleted(event) => event.application_name(), RequestStateChangeEvent::RequestProgressUpdated(event) => event.application_name(), + RequestStateChangeEvent::FunctionRunAssigned(event) => event.application_name(), } } @@ -573,10 +588,12 @@ impl RequestStateChangeEvent { RequestStateChangeEvent::RequestStarted(event) => event.application_version(), RequestStateChangeEvent::RequestFinished(event) => event.application_version(), RequestStateChangeEvent::FunctionRunCreated(event) => event.application_version(), - RequestStateChangeEvent::FunctionRunAssigned(event) => event.application_version(), RequestStateChangeEvent::FunctionRunCompleted(event) => event.application_version(), RequestStateChangeEvent::FunctionRunMatchedCache(event) => event.application_version(), + RequestStateChangeEvent::AllocationCreated(event) => event.application_version(), + RequestStateChangeEvent::AllocationCompleted(event) => event.application_version(), RequestStateChangeEvent::RequestProgressUpdated(event) => event.application_version(), + RequestStateChangeEvent::FunctionRunAssigned(event) => event.application_version(), } } @@ -585,10 +602,12 @@ impl RequestStateChangeEvent { RequestStateChangeEvent::RequestStarted(event) => event.request_id(), RequestStateChangeEvent::RequestFinished(event) => event.request_id(), RequestStateChangeEvent::FunctionRunCreated(event) => event.request_id(), - RequestStateChangeEvent::FunctionRunAssigned(event) => event.request_id(), RequestStateChangeEvent::FunctionRunCompleted(event) => event.request_id(), RequestStateChangeEvent::FunctionRunMatchedCache(event) => event.request_id(), + RequestStateChangeEvent::AllocationCreated(event) => event.request_id(), + RequestStateChangeEvent::AllocationCompleted(event) => event.request_id(), RequestStateChangeEvent::RequestProgressUpdated(event) => event.request_id(), + RequestStateChangeEvent::FunctionRunAssigned(event) => event.request_id(), } } @@ -597,10 +616,12 @@ impl RequestStateChangeEvent { RequestStateChangeEvent::RequestStarted(event) => event.created_at(), RequestStateChangeEvent::RequestFinished(event) => event.created_at(), RequestStateChangeEvent::FunctionRunCreated(event) => event.created_at(), - RequestStateChangeEvent::FunctionRunAssigned(event) => event.created_at(), RequestStateChangeEvent::FunctionRunCompleted(event) => event.created_at(), RequestStateChangeEvent::FunctionRunMatchedCache(event) => event.created_at(), + RequestStateChangeEvent::AllocationCreated(event) => event.created_at(), + RequestStateChangeEvent::AllocationCompleted(event) => event.created_at(), RequestStateChangeEvent::RequestProgressUpdated(event) => event.created_at(), + RequestStateChangeEvent::FunctionRunAssigned(event) => event.created_at(), } } @@ -609,10 +630,12 @@ impl RequestStateChangeEvent { RequestStateChangeEvent::RequestStarted(event) => event.set_created_at(date), RequestStateChangeEvent::RequestFinished(event) => event.set_created_at(date), RequestStateChangeEvent::FunctionRunCreated(event) => event.set_created_at(date), - RequestStateChangeEvent::FunctionRunAssigned(event) => event.set_created_at(date), RequestStateChangeEvent::FunctionRunCompleted(event) => event.set_created_at(date), RequestStateChangeEvent::FunctionRunMatchedCache(event) => event.set_created_at(date), + RequestStateChangeEvent::AllocationCreated(event) => event.set_created_at(date), + RequestStateChangeEvent::AllocationCompleted(event) => event.set_created_at(date), RequestStateChangeEvent::RequestProgressUpdated(event) => event.set_created_at(date), + RequestStateChangeEvent::FunctionRunAssigned(event) => event.set_created_at(date), } } @@ -621,12 +644,15 @@ impl RequestStateChangeEvent { RequestStateChangeEvent::RequestStarted(_) => "Request Started", RequestStateChangeEvent::RequestFinished(_) => "Request Finished", RequestStateChangeEvent::FunctionRunCreated(_) => "Function Run Created", - RequestStateChangeEvent::FunctionRunAssigned(_) => "Function Run Assigned", RequestStateChangeEvent::FunctionRunCompleted(_) => "Function Run Completed", - RequestStateChangeEvent::RequestProgressUpdated(_) => "Request Progress Updated", RequestStateChangeEvent::FunctionRunMatchedCache(_) => { "Function Run Matched a Cached output" } + RequestStateChangeEvent::AllocationCreated(_) => "Allocation Created", + RequestStateChangeEvent::AllocationCompleted(_) => "Allocation Completed", + RequestStateChangeEvent::RequestProgressUpdated(_) => "Request Progress Updated", + // Legacy - maps to new message + RequestStateChangeEvent::FunctionRunAssigned(_) => "Allocation Created", } } } @@ -837,8 +863,9 @@ impl RequestEventMetadata for FunctionRunCreated { } } +/// Event emitted when an allocation (execution attempt) is created and assigned to an executor #[derive(Serialize, Deserialize, Debug, Clone)] -pub struct FunctionRunAssigned { +pub struct AllocationCreated { pub namespace: String, pub application_name: String, pub application_version: String, @@ -851,7 +878,7 @@ pub struct FunctionRunAssigned { pub created_at: Option, } -impl RequestEventMetadata for FunctionRunAssigned { +impl RequestEventMetadata for AllocationCreated { fn namespace(&self) -> &str { &self.namespace } @@ -877,6 +904,9 @@ impl RequestEventMetadata for FunctionRunAssigned { } } +/// @deprecated Use AllocationCreated instead +pub type FunctionRunAssigned = AllocationCreated; + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(rename_all = "lowercase")] pub enum FunctionRunOutcomeSummary { @@ -885,6 +915,11 @@ pub enum FunctionRunOutcomeSummary { Failure, } +/// Event emitted when a function run reaches its final outcome (after all retries exhausted or success) +/// +/// Note: In older server versions (before allocation/function-run lifecycle split), +/// this event included `allocation_id`. For backward compatibility, `allocation_id` +/// is kept as an optional field. New server versions will not include it. #[derive(Serialize, Deserialize, Debug, Clone)] pub struct FunctionRunCompleted { pub namespace: String, @@ -893,7 +928,10 @@ pub struct FunctionRunCompleted { pub request_id: String, pub function_name: String, pub function_run_id: String, - pub allocation_id: String, + /// Optional for backward compatibility with older servers. + /// New servers (with allocation lifecycle) won't include this field. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub allocation_id: Option, pub outcome: FunctionRunOutcomeSummary, #[serde(default)] pub created_at: Option, @@ -925,6 +963,47 @@ impl RequestEventMetadata for FunctionRunCompleted { } } +/// Event emitted when an allocation (execution attempt) completes with an outcome +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct AllocationCompleted { + pub namespace: String, + pub application_name: String, + pub application_version: String, + pub request_id: String, + pub function_name: String, + pub function_run_id: String, + pub allocation_id: String, + pub outcome: FunctionRunOutcomeSummary, + #[serde(default)] + pub created_at: Option, +} + +impl RequestEventMetadata for AllocationCompleted { + fn namespace(&self) -> &str { + &self.namespace + } + + fn application_name(&self) -> &str { + &self.application_name + } + + fn application_version(&self) -> &str { + &self.application_version + } + + fn request_id(&self) -> &str { + &self.request_id + } + + fn created_at(&self) -> Option<&DateTime> { + self.created_at.as_ref().map(|rfc| &rfc.0) + } + + fn set_created_at(&mut self, date: DateTime) { + self.created_at = Some(Rfc3339DateTime(date)); + } +} + #[derive(Serialize, Deserialize, Debug, Clone)] pub struct FunctionRunMatchedCache { pub namespace: String, @@ -1377,4 +1456,174 @@ mod tests { "Expected 'Z' at end of created_at value" ); } + + // Backward compatibility tests for allocation events (PR #2042) + + #[test] + fn test_old_server_function_run_completed_with_allocation_id() { + // Old server sends FunctionRunCompleted WITH allocation_id + let json = json!({ + "FunctionRunCompleted": { + "namespace": "test-ns", + "application_name": "test-app", + "application_version": "1.0", + "request_id": "req-123", + "function_name": "my-func", + "function_run_id": "run-456", + "allocation_id": "alloc-789", + "outcome": "success" + } + }); + + let result: Result = serde_json::from_value(json); + assert!( + result.is_ok(), + "Failed to deserialize old server FunctionRunCompleted: {:?}", + result.err() + ); + + let event = result.unwrap(); + match event { + RequestStateChangeEvent::FunctionRunCompleted(e) => { + assert_eq!(e.allocation_id, Some("alloc-789".to_string())); + assert_eq!(e.function_run_id, "run-456"); + } + _ => panic!("Expected FunctionRunCompleted variant"), + } + } + + #[test] + fn test_new_server_function_run_completed_without_allocation_id() { + // New server sends FunctionRunCompleted WITHOUT allocation_id + let json = json!({ + "FunctionRunCompleted": { + "namespace": "test-ns", + "application_name": "test-app", + "application_version": "1.0", + "request_id": "req-123", + "function_name": "my-func", + "function_run_id": "run-456", + "outcome": "success" + } + }); + + let result: Result = serde_json::from_value(json); + assert!( + result.is_ok(), + "Failed to deserialize new server FunctionRunCompleted: {:?}", + result.err() + ); + + let event = result.unwrap(); + match event { + RequestStateChangeEvent::FunctionRunCompleted(e) => { + assert_eq!(e.allocation_id, None); + assert_eq!(e.function_run_id, "run-456"); + } + _ => panic!("Expected FunctionRunCompleted variant"), + } + } + + #[test] + fn test_old_server_function_run_assigned() { + // Old server sends FunctionRunAssigned + let json = json!({ + "FunctionRunAssigned": { + "namespace": "test-ns", + "application_name": "test-app", + "application_version": "1.0", + "request_id": "req-123", + "function_name": "my-func", + "function_run_id": "run-456", + "allocation_id": "alloc-789", + "executor_id": "exec-001" + } + }); + + let result: Result = serde_json::from_value(json); + assert!( + result.is_ok(), + "Failed to deserialize old server FunctionRunAssigned: {:?}", + result.err() + ); + + let event = result.unwrap(); + // Should deserialize to FunctionRunAssigned variant (backward compat) + match event { + RequestStateChangeEvent::FunctionRunAssigned(e) => { + assert_eq!(e.allocation_id, "alloc-789"); + assert_eq!(e.executor_id, "exec-001"); + } + _ => panic!( + "Expected FunctionRunAssigned variant, got {:?}", + event.as_str() + ), + } + } + + #[test] + fn test_new_server_allocation_created() { + // New server sends AllocationCreated + let json = json!({ + "AllocationCreated": { + "namespace": "test-ns", + "application_name": "test-app", + "application_version": "1.0", + "request_id": "req-123", + "function_name": "my-func", + "function_run_id": "run-456", + "allocation_id": "alloc-789", + "executor_id": "exec-001" + } + }); + + let result: Result = serde_json::from_value(json); + assert!( + result.is_ok(), + "Failed to deserialize new server AllocationCreated: {:?}", + result.err() + ); + + let event = result.unwrap(); + match event { + RequestStateChangeEvent::AllocationCreated(e) => { + assert_eq!(e.allocation_id, "alloc-789"); + assert_eq!(e.executor_id, "exec-001"); + } + _ => panic!("Expected AllocationCreated variant"), + } + } + + #[test] + fn test_new_server_allocation_completed() { + // New server sends AllocationCompleted + let json = json!({ + "AllocationCompleted": { + "namespace": "test-ns", + "application_name": "test-app", + "application_version": "1.0", + "request_id": "req-123", + "function_name": "my-func", + "function_run_id": "run-456", + "allocation_id": "alloc-789", + "outcome": "failure" + } + }); + + let result: Result = serde_json::from_value(json); + assert!( + result.is_ok(), + "Failed to deserialize new server AllocationCompleted: {:?}", + result.err() + ); + + let event = result.unwrap(); + match event { + RequestStateChangeEvent::AllocationCompleted(e) => { + assert_eq!(e.allocation_id, "alloc-789"); + assert_eq!(e.outcome, FunctionRunOutcomeSummary::Failure); + } + _ => panic!("Expected AllocationCompleted variant"), + } + } }