Skip to content
This repository was archived by the owner on Jun 10, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import TYPE_CHECKING

from aws_durable_execution_sdk_python.lambda_service import (
OperationAction,
OperationType,
OperationUpdate,
)
Expand Down Expand Up @@ -83,6 +84,7 @@ def _validate_operation_update(
update: OperationUpdate, execution: Execution
) -> None:
"""Validate a single operation update."""
CheckpointValidator._validate_inconsistent_operation_metadata(update, execution)
CheckpointValidator._validate_payload_sizes(update)
ValidActionsByOperationTypeValidator.validate(
update.operation_type, update.action
Expand Down Expand Up @@ -127,43 +129,112 @@ def _validate_operation_status_transition(

raise InvalidParameterValueException(msg)

@staticmethod
def _validate_inconsistent_operation_metadata(
update: OperationUpdate, execution: Execution
) -> None:
"""Validate that operation metadata is consistent with existing operation."""
current_state = None
for operation in execution.operations:
if operation.operation_id == update.operation_id:
current_state = operation
break

if current_state is not None:
if (
update.operation_type is not None
and update.operation_type != current_state.operation_type
):
msg: str = "Inconsistent operation type."
raise InvalidParameterValueException(msg)

if (
update.sub_type is not None
and update.sub_type != current_state.sub_type
):
msg_subtype: str = "Inconsistent operation subtype."
raise InvalidParameterValueException(msg_subtype)

if update.name is not None and update.name != current_state.name:
msg_name: str = "Inconsistent operation name."
raise InvalidParameterValueException(msg_name)

if (
update.parent_id is not None
and update.parent_id != current_state.parent_id
):
msg_parent: str = "Inconsistent parent operation id."
raise InvalidParameterValueException(msg_parent)

@staticmethod
def _validate_parent_id_and_duplicate_id(
updates: list[OperationUpdate], execution: Execution
) -> None:
"""Validate parent IDs and check for duplicate operation IDs."""
operations_seen: MutableMapping[str, OperationUpdate] = {}
"""Validate parent IDs and check for duplicate operation IDs.

Validate that any provided parentId is valid, and also validate no duplicate operation is being
updated at the same time (unless it is a STEP/CONTEXT starting + performing one more non-START action).
"""
operations_started: MutableMapping[str, OperationUpdate] = {}
last_updates_seen: MutableMapping[str, OperationUpdate] = {}

for update in updates:
if update.operation_id in operations_seen:
msg: str = "Cannot update the same operation twice in a single request."
raise InvalidParameterValueException(msg)
if CheckpointValidator._is_invalid_duplicate_update(
update, last_updates_seen
):
msg_duplicate: str = (
"Cannot checkpoint multiple operations with the same ID."
)
Comment thread
yaythomas marked this conversation as resolved.
raise InvalidParameterValueException(msg_duplicate)

if not CheckpointValidator._is_valid_parent_for_update(
execution, update, operations_seen
execution, update, operations_started
):
msg_invalid_parent: str = "Invalid parent operation id."
msg_parent: str = "Invalid parent operation id."
raise InvalidParameterValueException(msg_parent)

if update.action == OperationAction.START:
operations_started[update.operation_id] = update

last_updates_seen[update.operation_id] = update

@staticmethod
def _is_invalid_duplicate_update(
update: OperationUpdate, last_updates_seen: MutableMapping[str, OperationUpdate]
) -> bool:
"""Check if this is an invalid duplicate update."""
last_update = last_updates_seen.get(update.operation_id)
if last_update is None:
return False

raise InvalidParameterValueException(msg_invalid_parent)
if last_update.operation_type in (OperationType.STEP, OperationType.CONTEXT):
# Allow duplicate for STEP/CONTEXT if last was START and current is not START
allow_duplicate = (
last_update.action == OperationAction.START
and update.action != OperationAction.START
)
return not allow_duplicate

operations_seen[update.operation_id] = update
return True

@staticmethod
def _is_valid_parent_for_update(
execution: Execution,
update: OperationUpdate,
operations_seen: MutableMapping[str, OperationUpdate],
operations_started: MutableMapping[str, OperationUpdate],
) -> bool:
"""Check if the parent ID is valid for the update."""
parent_id = update.parent_id

if parent_id is None:
return True

if parent_id in operations_seen:
parent_update = operations_seen[parent_id]
# Check if parent is in operations started in this batch
if parent_id in operations_started:
parent_update = operations_started[parent_id]
return parent_update.operation_type == OperationType.CONTEXT

# Check if parent exists in current execution state
for operation in execution.operations:
if operation.operation_id == parent_id:
return operation.operation_type == OperationType.CONTEXT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def to_bytes(self, data: Any) -> bytes:
Raises:
InvalidParameterValueException: If serialization fails
"""
...
... # pragma: no cover


class Deserializer(Protocol):
Expand All @@ -54,7 +54,7 @@ def from_bytes(self, data: bytes) -> dict[str, Any]:
Raises:
InvalidParameterValueException: If deserialization fails
"""
...
... # pragma: no cover


class AwsRestJsonSerializer:
Expand Down
Loading
Loading