DescribeWorkflow reads pending nexus operations from CHASM#9913
DescribeWorkflow reads pending nexus operations from CHASM#9913S15 wants to merge 1 commit intotemporalio:mainfrom
Conversation
| o.Status = status | ||
| } | ||
|
|
||
| func cancellationAPIState(status nexusoperationpb.CancellationStatus) enumspb.NexusOperationCancellationState { |
There was a problem hiding this comment.
This should be implemented in @stephanos's standalone nexus branch. Add a TODO to remove the duplication please.
| // ToCancellationInfo converts a CHASM Cancellation to the API NexusOperationCancellationInfo format. | ||
| func (o *Cancellation) ToCancellationInfo(circuitBreakerOpen func(endpoint string) bool, endpoint string) *workflowpb.NexusOperationCancellationInfo { |
There was a problem hiding this comment.
| // ToCancellationInfo converts a CHASM Cancellation to the API NexusOperationCancellationInfo format. | |
| func (o *Cancellation) ToCancellationInfo(circuitBreakerOpen func(endpoint string) bool, endpoint string) *workflowpb.NexusOperationCancellationInfo { | |
| // ToWorkflowCancellationInfo converts a CHASM Cancellation to the API workflow.NexusOperationCancellationInfo format. | |
| func (o *Cancellation) ToWorkflowCancellationInfo(circuitBreakerOpen func(endpoint string) bool, endpoint string) *workflowpb.NexusOperationCancellationInfo { |
| return nil | ||
| } | ||
|
|
||
| func pendingOperationState(status nexusoperationpb.OperationStatus) enumspb.PendingNexusOperationState { |
There was a problem hiding this comment.
Same as above, it's part of standalone nexus.
| // ToPendingNexusOperationInfo converts a CHASM Operation to the API PendingNexusOperationInfo format. | ||
| // Returns nil if the operation is not in a pending state. | ||
| func (o *Operation) ToPendingNexusOperationInfo( |
There was a problem hiding this comment.
| // ToPendingNexusOperationInfo converts a CHASM Operation to the API PendingNexusOperationInfo format. | |
| // Returns nil if the operation is not in a pending state. | |
| func (o *Operation) ToPendingNexusOperationInfo( | |
| // ToWorkflowPendingNexusOperationInfo converts a CHASM Operation to the API workflow.PendingNexusOperationInfo format. | |
| // Returns nil if the operation is not in a pending state. | |
| func (o *Operation) ToWorkflowPendingNexusOperationInfo( |
There was a problem hiding this comment.
Also tempted to put this in chasm/lib/workflow instead because the component should be mostly agnostic to where it is used.
| if state == enumspb.PENDING_NEXUS_OPERATION_STATE_SCHEDULED && invocationCBOpen(o.Endpoint) { | ||
| state = enumspb.PENDING_NEXUS_OPERATION_STATE_BLOCKED | ||
| blockedReason = "The circuit breaker is open." | ||
| } |
There was a problem hiding this comment.
@stephanos I don't remember seeing this in your standalone branch. Did we miss this?
| } | ||
|
|
||
| // buildPendingNexusOperationInfosFromChasm reads nexus operations from the CHASM tree and converts them to API format. | ||
| func buildPendingNexusOperationInfosFromChasm( |
There was a problem hiding this comment.
Move all of this to chasm/lib/workflow please.
| invocationBreaker := func(endpoint string) bool { | ||
| cb := outboundQueueCBPool.Get(tasks.TaskGroupNamespaceIDAndDestination{ | ||
| TaskGroup: nexusoperations.TaskTypeInvocation, | ||
| NamespaceID: namespaceID.String(), | ||
| Destination: endpoint, | ||
| }) | ||
| return cb.State() != gobreaker.StateClosed | ||
| } | ||
|
|
||
| cancellationBreaker := func(endpoint string) bool { | ||
| cb := outboundQueueCBPool.Get(tasks.TaskGroupNamespaceIDAndDestination{ | ||
| TaskGroup: nexusoperations.TaskTypeCancelation, | ||
| NamespaceID: namespaceID.String(), | ||
| Destination: endpoint, | ||
| }) | ||
| return cb.State() != gobreaker.StateClosed | ||
| } |
There was a problem hiding this comment.
I think the task group is wrong here, it using definitions from the HSM implementation.
Let's also make sure we have a way to put both of these in the same task group. There's no need to separate them out. It's been an outstanding item for a while.
How did you test it?