From ccc484928e4cd88d271d2b0e52c3b86f464c433d Mon Sep 17 00:00:00 2001 From: yaythomas Date: Mon, 27 Oct 2025 17:11:18 -0700 Subject: [PATCH] fix(testing-sdk): checkpoint validation parent & duplicate IDs The SDK now uses background thread batching for checkpoints, which can send multiple updates for the same operation in a single batch (e.g., START followed by SUCCEED for fast-completing operations). Updated the checkpoint validator to allow this valid behavior. Changes: - Allow duplicate operation IDs in checkpoint batches for STEP/CONTEXT operations when first action is START and subsequent is non-START - Reject duplicate IDs for other operation types (WAIT, CALLBACK, etc.) - Add 11 new tests covering all duplicate/inconsistency scenarios - Add pragma comments to Protocol method stubs in serialization.py --- .../checkpoint/validators/checkpoint.py | 95 +++++- .../web/serialization.py | 4 +- .../checkpoint/validators/checkpoint_test.py | 315 +++++++++++++++++- 3 files changed, 394 insertions(+), 20 deletions(-) diff --git a/src/aws_durable_execution_sdk_python_testing/checkpoint/validators/checkpoint.py b/src/aws_durable_execution_sdk_python_testing/checkpoint/validators/checkpoint.py index e6971377..86d654db 100644 --- a/src/aws_durable_execution_sdk_python_testing/checkpoint/validators/checkpoint.py +++ b/src/aws_durable_execution_sdk_python_testing/checkpoint/validators/checkpoint.py @@ -6,6 +6,7 @@ from typing import TYPE_CHECKING from aws_durable_execution_sdk_python.lambda_service import ( + OperationAction, OperationType, OperationUpdate, ) @@ -83,6 +84,7 @@ def _validate_operation_update( update: OperationUpdate, execution: Execution ) -> None: """Validate a single operation update.""" + CheckpointValidator._validate_inconsistent_operation_metadata(update, execution) CheckpointValidator._validate_payload_sizes(update) ValidActionsByOperationTypeValidator.validate( update.operation_type, update.action @@ -127,32 +129,99 @@ def _validate_operation_status_transition( raise InvalidParameterValueException(msg) + @staticmethod + def _validate_inconsistent_operation_metadata( + update: OperationUpdate, execution: Execution + ) -> None: + """Validate that operation metadata is consistent with existing operation.""" + current_state = None + for operation in execution.operations: + if operation.operation_id == update.operation_id: + current_state = operation + break + + if current_state is not None: + if ( + update.operation_type is not None + and update.operation_type != current_state.operation_type + ): + msg: str = "Inconsistent operation type." + raise InvalidParameterValueException(msg) + + if ( + update.sub_type is not None + and update.sub_type != current_state.sub_type + ): + msg_subtype: str = "Inconsistent operation subtype." + raise InvalidParameterValueException(msg_subtype) + + if update.name is not None and update.name != current_state.name: + msg_name: str = "Inconsistent operation name." + raise InvalidParameterValueException(msg_name) + + if ( + update.parent_id is not None + and update.parent_id != current_state.parent_id + ): + msg_parent: str = "Inconsistent parent operation id." + raise InvalidParameterValueException(msg_parent) + @staticmethod def _validate_parent_id_and_duplicate_id( updates: list[OperationUpdate], execution: Execution ) -> None: - """Validate parent IDs and check for duplicate operation IDs.""" - operations_seen: MutableMapping[str, OperationUpdate] = {} + """Validate parent IDs and check for duplicate operation IDs. + + Validate that any provided parentId is valid, and also validate no duplicate operation is being + updated at the same time (unless it is a STEP/CONTEXT starting + performing one more non-START action). + """ + operations_started: MutableMapping[str, OperationUpdate] = {} + last_updates_seen: MutableMapping[str, OperationUpdate] = {} for update in updates: - if update.operation_id in operations_seen: - msg: str = "Cannot update the same operation twice in a single request." - raise InvalidParameterValueException(msg) + if CheckpointValidator._is_invalid_duplicate_update( + update, last_updates_seen + ): + msg_duplicate: str = ( + "Cannot checkpoint multiple operations with the same ID." + ) + raise InvalidParameterValueException(msg_duplicate) if not CheckpointValidator._is_valid_parent_for_update( - execution, update, operations_seen + execution, update, operations_started ): - msg_invalid_parent: str = "Invalid parent operation id." + msg_parent: str = "Invalid parent operation id." + raise InvalidParameterValueException(msg_parent) + + if update.action == OperationAction.START: + operations_started[update.operation_id] = update + + last_updates_seen[update.operation_id] = update + + @staticmethod + def _is_invalid_duplicate_update( + update: OperationUpdate, last_updates_seen: MutableMapping[str, OperationUpdate] + ) -> bool: + """Check if this is an invalid duplicate update.""" + last_update = last_updates_seen.get(update.operation_id) + if last_update is None: + return False - raise InvalidParameterValueException(msg_invalid_parent) + if last_update.operation_type in (OperationType.STEP, OperationType.CONTEXT): + # Allow duplicate for STEP/CONTEXT if last was START and current is not START + allow_duplicate = ( + last_update.action == OperationAction.START + and update.action != OperationAction.START + ) + return not allow_duplicate - operations_seen[update.operation_id] = update + return True @staticmethod def _is_valid_parent_for_update( execution: Execution, update: OperationUpdate, - operations_seen: MutableMapping[str, OperationUpdate], + operations_started: MutableMapping[str, OperationUpdate], ) -> bool: """Check if the parent ID is valid for the update.""" parent_id = update.parent_id @@ -160,10 +229,12 @@ def _is_valid_parent_for_update( if parent_id is None: return True - if parent_id in operations_seen: - parent_update = operations_seen[parent_id] + # Check if parent is in operations started in this batch + if parent_id in operations_started: + parent_update = operations_started[parent_id] return parent_update.operation_type == OperationType.CONTEXT + # Check if parent exists in current execution state for operation in execution.operations: if operation.operation_id == parent_id: return operation.operation_type == OperationType.CONTEXT diff --git a/src/aws_durable_execution_sdk_python_testing/web/serialization.py b/src/aws_durable_execution_sdk_python_testing/web/serialization.py index f6f4483a..7af7f71d 100644 --- a/src/aws_durable_execution_sdk_python_testing/web/serialization.py +++ b/src/aws_durable_execution_sdk_python_testing/web/serialization.py @@ -36,7 +36,7 @@ def to_bytes(self, data: Any) -> bytes: Raises: InvalidParameterValueException: If serialization fails """ - ... + ... # pragma: no cover class Deserializer(Protocol): @@ -54,7 +54,7 @@ def from_bytes(self, data: bytes) -> dict[str, Any]: Raises: InvalidParameterValueException: If deserialization fails """ - ... + ... # pragma: no cover class AwsRestJsonSerializer: diff --git a/tests/checkpoint/validators/checkpoint_test.py b/tests/checkpoint/validators/checkpoint_test.py index c00d0c6b..6777d74e 100644 --- a/tests/checkpoint/validators/checkpoint_test.py +++ b/tests/checkpoint/validators/checkpoint_test.py @@ -167,7 +167,12 @@ def test_validate_payload_sizes_error_within_limit(): def test_validate_duplicate_operation_ids(): - """Test validation fails with duplicate operation IDs.""" + """Test validation allows duplicate operation IDs in same batch. + + With background batching, the SDK can send multiple updates for the same + operation in a single batch (e.g., START followed by SUCCEED). This is + valid behavior and should be allowed. + """ execution = _create_test_execution() updates = [ OperationUpdate( @@ -182,11 +187,8 @@ def test_validate_duplicate_operation_ids(): ), ] - with pytest.raises( - InvalidParameterValueException, - match="Cannot update the same operation twice in a single request", - ): - CheckpointValidator.validate_input(updates, execution) + # Should not raise - duplicate operation IDs are allowed in batches + CheckpointValidator.validate_input(updates, execution) def test_validate_valid_parent_id_in_execution(): @@ -404,3 +406,304 @@ def test_validate_operation_status_transition_execution(): ) ] CheckpointValidator.validate_input(updates, execution) + + +def test_validate_inconsistent_operation_type(): + """Test validation fails when operation type is inconsistent.""" + execution = _create_test_execution() + + # Add existing operation + step_op = Operation( + operation_id="op-1", + operation_type=OperationType.STEP, + status=OperationStatus.STARTED, + ) + execution.operations.append(step_op) + + # Try to update with different type + updates = [ + OperationUpdate( + operation_id="op-1", + operation_type=OperationType.CONTEXT, + action=OperationAction.SUCCEED, + ) + ] + + with pytest.raises( + InvalidParameterValueException, match="Inconsistent operation type" + ): + CheckpointValidator.validate_input(updates, execution) + + +def test_validate_inconsistent_operation_subtype(): + """Test validation fails when operation subtype is inconsistent.""" + execution = _create_test_execution() + + # Add existing operation with subtype + from aws_durable_execution_sdk_python.lambda_service import OperationSubType + + context_op = Operation( + operation_id="op-1", + operation_type=OperationType.CONTEXT, + status=OperationStatus.STARTED, + sub_type=OperationSubType.PARALLEL, + ) + execution.operations.append(context_op) + + # Try to update with different subtype + updates = [ + OperationUpdate( + operation_id="op-1", + operation_type=OperationType.CONTEXT, + action=OperationAction.SUCCEED, + sub_type=OperationSubType.MAP, + ) + ] + + with pytest.raises( + InvalidParameterValueException, match="Inconsistent operation subtype" + ): + CheckpointValidator.validate_input(updates, execution) + + +def test_validate_inconsistent_operation_name(): + """Test validation fails when operation name is inconsistent.""" + execution = _create_test_execution() + + # Add existing operation with name + step_op = Operation( + operation_id="op-1", + operation_type=OperationType.STEP, + status=OperationStatus.STARTED, + name="original_name", + ) + execution.operations.append(step_op) + + # Try to update with different name + updates = [ + OperationUpdate( + operation_id="op-1", + operation_type=OperationType.STEP, + action=OperationAction.SUCCEED, + name="different_name", + ) + ] + + with pytest.raises( + InvalidParameterValueException, match="Inconsistent operation name" + ): + CheckpointValidator.validate_input(updates, execution) + + +def test_validate_inconsistent_parent_operation_id(): + """Test validation fails when parent operation ID is inconsistent.""" + execution = _create_test_execution() + + # Add TWO context operations + context_op1 = Operation( + operation_id="context-1", + operation_type=OperationType.CONTEXT, + status=OperationStatus.STARTED, + ) + execution.operations.append(context_op1) + + context_op2 = Operation( + operation_id="context-2", + operation_type=OperationType.CONTEXT, + status=OperationStatus.STARTED, + ) + execution.operations.append(context_op2) + + # Add existing step with parent context-1 + step_op = Operation( + operation_id="step-1", + operation_type=OperationType.STEP, + status=OperationStatus.STARTED, + parent_id="context-1", + ) + execution.operations.append(step_op) + + # Try to update with different parent context-2 (which exists, so passes parent validation) + updates = [ + OperationUpdate( + operation_id="step-1", + operation_type=OperationType.STEP, + action=OperationAction.SUCCEED, + parent_id="context-2", + ) + ] + + with pytest.raises( + InvalidParameterValueException, match="Inconsistent parent operation id" + ): + CheckpointValidator.validate_input(updates, execution) + + +def test_validate_invalid_duplicate_wait_operations(): + """Test validation fails with duplicate WAIT operations.""" + execution = _create_test_execution() + + # WAIT operations cannot have duplicate updates in same batch + updates = [ + OperationUpdate( + operation_id="wait-1", + operation_type=OperationType.WAIT, + action=OperationAction.START, + ), + OperationUpdate( + operation_id="wait-1", + operation_type=OperationType.WAIT, + action=OperationAction.CANCEL, + ), + ] + + with pytest.raises( + InvalidParameterValueException, + match="Cannot checkpoint multiple operations with the same ID", + ): + CheckpointValidator.validate_input(updates, execution) + + +def test_validate_invalid_duplicate_callback_operations(): + """Test validation fails with duplicate CALLBACK operations.""" + execution = _create_test_execution() + + # CALLBACK operations cannot have duplicate updates in same batch + updates = [ + OperationUpdate( + operation_id="callback-1", + operation_type=OperationType.CALLBACK, + action=OperationAction.START, + ), + OperationUpdate( + operation_id="callback-1", + operation_type=OperationType.CALLBACK, + action=OperationAction.SUCCEED, + ), + ] + + with pytest.raises( + InvalidParameterValueException, + match="Cannot checkpoint multiple operations with the same ID", + ): + CheckpointValidator.validate_input(updates, execution) + + +def test_validate_invalid_duplicate_invoke_operations(): + """Test validation fails with duplicate CHAINED_INVOKE operations.""" + execution = _create_test_execution() + + # CHAINED_INVOKE operations cannot have duplicate updates in same batch + updates = [ + OperationUpdate( + operation_id="invoke-1", + operation_type=OperationType.CHAINED_INVOKE, + action=OperationAction.START, + ), + OperationUpdate( + operation_id="invoke-1", + operation_type=OperationType.CHAINED_INVOKE, + action=OperationAction.SUCCEED, + ), + ] + + with pytest.raises( + InvalidParameterValueException, + match="Cannot checkpoint multiple operations with the same ID", + ): + CheckpointValidator.validate_input(updates, execution) + + +def test_validate_invalid_duplicate_execution_operations(): + """Test validation fails with duplicate EXECUTION operations.""" + execution = _create_test_execution() + + # EXECUTION operations cannot have duplicate updates in same batch + # (though this is also caught by _validate_conflicting_execution_update) + updates = [ + OperationUpdate( + operation_id="exec-1", + operation_type=OperationType.EXECUTION, + action=OperationAction.SUCCEED, + ), + OperationUpdate( + operation_id="exec-1", + operation_type=OperationType.EXECUTION, + action=OperationAction.SUCCEED, + ), + ] + + with pytest.raises(InvalidParameterValueException): + CheckpointValidator.validate_input(updates, execution) + + +def test_validate_duplicate_context_start_then_succeed(): + """Test validation allows CONTEXT START followed by SUCCEED.""" + execution = _create_test_execution() + + # CONTEXT operations can have START + non-START in same batch + updates = [ + OperationUpdate( + operation_id="context-1", + operation_type=OperationType.CONTEXT, + action=OperationAction.START, + ), + OperationUpdate( + operation_id="context-1", + operation_type=OperationType.CONTEXT, + action=OperationAction.SUCCEED, + ), + ] + + # Should not raise + CheckpointValidator.validate_input(updates, execution) + + +def test_validate_invalid_duplicate_context_non_start(): + """Test validation fails with duplicate CONTEXT non-START operations.""" + execution = _create_test_execution() + + # CONTEXT operations cannot have duplicate non-START updates + updates = [ + OperationUpdate( + operation_id="context-1", + operation_type=OperationType.CONTEXT, + action=OperationAction.SUCCEED, + ), + OperationUpdate( + operation_id="context-1", + operation_type=OperationType.CONTEXT, + action=OperationAction.SUCCEED, + ), + ] + + with pytest.raises( + InvalidParameterValueException, + match="Cannot checkpoint multiple operations with the same ID", + ): + CheckpointValidator.validate_input(updates, execution) + + +def test_validate_invalid_duplicate_step_non_start(): + """Test validation fails with duplicate STEP non-START operations.""" + execution = _create_test_execution() + + # STEP operations cannot have duplicate non-START updates + updates = [ + OperationUpdate( + operation_id="step-1", + operation_type=OperationType.STEP, + action=OperationAction.SUCCEED, + ), + OperationUpdate( + operation_id="step-1", + operation_type=OperationType.STEP, + action=OperationAction.SUCCEED, + ), + ] + + with pytest.raises( + InvalidParameterValueException, + match="Cannot checkpoint multiple operations with the same ID", + ): + CheckpointValidator.validate_input(updates, execution)