Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,6 @@ func (wh *WorkflowHandler) validateTimeSkippingConfig(
timeSkippingConfig *workflowpb.TimeSkippingConfig,
namespaceName namespace.Name,
) error {

if timeSkippingConfig == nil {
return nil
}
Expand Down Expand Up @@ -722,7 +721,7 @@ func (wh *WorkflowHandler) validateTimeSkippingConfig(
namespace.MinTimeSkippingDuration,
)
}
// todo: will need to check current virtual time in updateOptions scenario
// todo: need to adapt the timeSource after time-skipping timeSource is implemented
case *workflowpb.TimeSkippingConfig_MaxTargetTime:
if bound.MaxTargetTime.AsTime().Before(wh.namespaceHandler.timeSource.Now().Add(namespace.MinTimeSkippingDuration)) {
return serviceerror.NewUnimplementedf(
Expand Down Expand Up @@ -2456,6 +2455,17 @@ func (wh *WorkflowHandler) ResetWorkflowExecution(ctx context.Context, request *
return nil, serviceerror.NewInternalf("unknown reset reapply type: %v", request.GetResetReapplyType())
}

for _, postOp := range request.GetPostResetOperations() {
if updateOpts := postOp.GetUpdateWorkflowOptions(); updateOpts != nil {
if err := wh.validateTimeSkippingConfig(
updateOpts.GetWorkflowExecutionOptions().GetTimeSkippingConfig(),
namespace.Name(request.GetNamespace()),
); err != nil {
return nil, err
}
}
}

namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
if err != nil {
return nil, err
Expand Down Expand Up @@ -5669,9 +5679,25 @@ func (wh *WorkflowHandler) StartBatchOperation(
case *workflowservice.StartBatchOperationRequest_ResetOperation:
input.BatchType = enumspb.BATCH_OPERATION_TYPE_RESET
identity = op.ResetOperation.GetIdentity()
for _, postOp := range op.ResetOperation.GetPostResetOperations() {
if updateOpts := postOp.GetUpdateWorkflowOptions(); updateOpts != nil {
if err := wh.validateTimeSkippingConfig(
updateOpts.GetWorkflowExecutionOptions().GetTimeSkippingConfig(),
namespace.Name(request.GetNamespace()),
); err != nil {
return nil, err
}
}
}
case *workflowservice.StartBatchOperationRequest_UpdateWorkflowOptionsOperation:
input.BatchType = enumspb.BATCH_OPERATION_TYPE_UPDATE_EXECUTION_OPTIONS
identity = op.UpdateWorkflowOptionsOperation.GetIdentity()
if err := wh.validateTimeSkippingConfig(
op.UpdateWorkflowOptionsOperation.GetWorkflowExecutionOptions().GetTimeSkippingConfig(),
namespace.Name(request.GetNamespace()),
); err != nil {
return nil, err
}
case *workflowservice.StartBatchOperationRequest_UnpauseActivitiesOperation:
input.BatchType = enumspb.BATCH_OPERATION_TYPE_UNPAUSE_ACTIVITY
identity = op.UnpauseActivitiesOperation.GetIdentity()
Expand Down Expand Up @@ -6876,6 +6902,9 @@ func (wh *WorkflowHandler) UpdateWorkflowExecutionOptions(
if err := priorities.Validate(opts.GetPriority()); err != nil {
return nil, err
}
if err := wh.validateTimeSkippingConfig(opts.GetTimeSkippingConfig(), namespace.Name(request.GetNamespace())); err != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If frontend.TimeSkippingEnabled is turned off during a workflow execution, validateTimeSkippingConfig will not allow disabling time skipping for a workflow. validateTimeSkippingConfig will return an error in that case. User will not be able to disable time skipping explicitly if frontend.TimeSkippingEnabled is false. Is that the intended behaviour?

Copy link
Copy Markdown
Contributor Author

@feiyang3cat feiyang3cat Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that is my understanding of how dynamic config feature flag works -> we don't allow users turn on a feature and have a bunch of workflows running with this feature and then turn off the feature and still want to try to manipulate the feature. this kind of flexibility will make the system complicated.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may understand this incorrectly. cc @yycptt for a confirmation

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that makes sense. The one I pointed out is not a normal case scenario.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we don't support removing a feature flag after users enable it? I don't know I feel that will be weird.

return nil, err
}

namespaceID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
if err != nil {
Expand Down
124 changes: 124 additions & 0 deletions service/frontend/workflow_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3486,6 +3486,107 @@ func (s *WorkflowHandlerSuite) TestSignalWithStartWorkflowExecution_TimeSkipping
s.True(resp.Started)
}

// TestResetWorkflowExecution_TimeSkipping_DCDisabled verifies that when the DC gate is off,
// a ResetWorkflowExecution request with a TimeSkippingConfig inside PostResetOperations is rejected.
func (s *WorkflowHandlerSuite) TestResetWorkflowExecution_TimeSkipping_DCDisabled() {
config := s.newConfig()
config.TimeSkippingEnabled = dc.GetBoolPropertyFnFilteredByNamespace(false)
wh := s.getWorkflowHandler(config)

_, err := wh.ResetWorkflowExecution(context.Background(), &workflowservice.ResetWorkflowExecutionRequest{
Namespace: s.testNamespace.String(),
RequestId: uuid.NewString(),
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: "workflow-id",
RunId: uuid.NewString(),
},
PostResetOperations: []*workflowpb.PostResetOperation{
{
Variant: &workflowpb.PostResetOperation_UpdateWorkflowOptions_{
UpdateWorkflowOptions: &workflowpb.PostResetOperation_UpdateWorkflowOptions{
WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{
TimeSkippingConfig: &workflowpb.TimeSkippingConfig{Enabled: true},
},
UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"time_skipping_config"}},
},
},
},
},
})
var unimplemented *serviceerror.Unimplemented
s.ErrorAs(err, &unimplemented)
s.ErrorContains(err, "The Time-Skipping feature is not enabled for namespace")
}

// TestStartBatchOperation_ResetOperation_TimeSkipping_DCDisabled verifies that when the DC gate
// is off, a batch reset with a TimeSkippingConfig inside PostResetOperations is rejected.
func (s *WorkflowHandlerSuite) TestStartBatchOperation_ResetOperation_TimeSkipping_DCDisabled() {
config := s.newConfig()
config.TimeSkippingEnabled = dc.GetBoolPropertyFnFilteredByNamespace(false)
wh := s.getWorkflowHandler(config)
namespaceID := namespace.ID(uuid.NewString())
s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(namespaceID, nil).AnyTimes()
s.mockVisibilityMgr.EXPECT().CountWorkflowExecutions(gomock.Any(), gomock.Any()).Return(&manager.CountWorkflowExecutionsResponse{Count: 0}, nil)

_, err := wh.StartBatchOperation(context.Background(), &workflowservice.StartBatchOperationRequest{
Namespace: s.testNamespace.String(),
JobId: uuid.NewString(),
Reason: "test",
VisibilityQuery: "WorkflowId='test'",
Operation: &workflowservice.StartBatchOperationRequest_ResetOperation{
ResetOperation: &batchpb.BatchOperationReset{
Options: &commonpb.ResetOptions{
Target: &commonpb.ResetOptions_WorkflowTaskId{WorkflowTaskId: 1},
},
PostResetOperations: []*workflowpb.PostResetOperation{
{
Variant: &workflowpb.PostResetOperation_UpdateWorkflowOptions_{
UpdateWorkflowOptions: &workflowpb.PostResetOperation_UpdateWorkflowOptions{
WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{
TimeSkippingConfig: &workflowpb.TimeSkippingConfig{Enabled: true},
},
UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"time_skipping_config"}},
},
},
},
},
},
},
})
var unimplemented *serviceerror.Unimplemented
s.ErrorAs(err, &unimplemented)
s.ErrorContains(err, "The Time-Skipping feature is not enabled for namespace")
}

// TestStartBatchOperation_UpdateWorkflowOptionsOperation_TimeSkipping_DCDisabled verifies that
// when the DC gate is off, a batch UpdateWorkflowOptions with a TimeSkippingConfig is rejected.
func (s *WorkflowHandlerSuite) TestStartBatchOperation_UpdateWorkflowOptionsOperation_TimeSkipping_DCDisabled() {
config := s.newConfig()
config.TimeSkippingEnabled = dc.GetBoolPropertyFnFilteredByNamespace(false)
wh := s.getWorkflowHandler(config)
namespaceID := namespace.ID(uuid.NewString())
s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(namespaceID, nil).AnyTimes()
s.mockVisibilityMgr.EXPECT().CountWorkflowExecutions(gomock.Any(), gomock.Any()).Return(&manager.CountWorkflowExecutionsResponse{Count: 0}, nil)

_, err := wh.StartBatchOperation(context.Background(), &workflowservice.StartBatchOperationRequest{
Namespace: s.testNamespace.String(),
JobId: uuid.NewString(),
Reason: "test",
VisibilityQuery: "WorkflowId='test'",
Operation: &workflowservice.StartBatchOperationRequest_UpdateWorkflowOptionsOperation{
UpdateWorkflowOptionsOperation: &batchpb.BatchOperationUpdateWorkflowExecutionOptions{
WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{
TimeSkippingConfig: &workflowpb.TimeSkippingConfig{Enabled: true},
},
UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"time_skipping_config"}},
},
},
})
var unimplemented *serviceerror.Unimplemented
s.ErrorAs(err, &unimplemented)
s.ErrorContains(err, "The Time-Skipping feature is not enabled for namespace")
}

func (s *WorkflowHandlerSuite) newConfig() *Config {
return NewConfig(dc.NewNoopCollection(), numHistoryShards)
}
Expand Down Expand Up @@ -4475,6 +4576,29 @@ func (s *WorkflowHandlerSuite) TestUpdateWorkflowExecutionOptions_Priority() {
// NOTE: only testing a single validation scenario here; the priority validation has its own unit tests
}

// TestUpdateWorkflowExecutionOptions_TimeSkipping_DCDisabled verifies that when the DC gate is
// off, UpdateWorkflowExecutionOptions rejects a request containing a TimeSkippingConfig.
func (s *WorkflowHandlerSuite) TestUpdateWorkflowExecutionOptions_TimeSkipping_DCDisabled() {
config := s.newConfig()
config.TimeSkippingEnabled = dc.GetBoolPropertyFnFilteredByNamespace(false)
wh := s.getWorkflowHandler(config)

_, err := wh.UpdateWorkflowExecutionOptions(context.Background(), &workflowservice.UpdateWorkflowExecutionOptionsRequest{
Namespace: s.testNamespace.String(),
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: "workflow-id",
RunId: "run-id",
},
WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{
TimeSkippingConfig: &workflowpb.TimeSkippingConfig{Enabled: true},
},
UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"time_skipping_config"}},
})
var unimplemented *serviceerror.Unimplemented
s.ErrorAs(err, &unimplemented)
s.ErrorContains(err, "The Time-Skipping feature is not enabled for namespace")
}

func (s *WorkflowHandlerSuite) TestUpdateActivityOptions_Priority() {
config := s.newConfig()
wh := s.getWorkflowHandler(config)
Expand Down
1 change: 1 addition & 0 deletions service/history/api/startworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ func (s *Starter) handleUseExistingWorkflowOnConflictOptions(
links,
"", // identity
nil, // priority
nil, // timeSkippingConfig
)
return api.UpdateWorkflowWithoutWorkflowTask, err
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use existing execution on wfID conflict doesn't involve ts config changes

},
Expand Down
56 changes: 55 additions & 1 deletion service/history/api/updateworkflowoptions/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func MergeAndApply(
if mergedOpts.GetVersioningOverride() == nil {
unsetOverride = true
}
_, err = ms.AddWorkflowExecutionOptionsUpdatedEvent(mergedOpts.GetVersioningOverride(), unsetOverride, "", nil, nil, identity, mergedOpts.GetPriority())
_, err = ms.AddWorkflowExecutionOptionsUpdatedEvent(mergedOpts.GetVersioningOverride(), unsetOverride, "", nil, nil, identity, mergedOpts.GetPriority(), mergedOpts.GetTimeSkippingConfig())
if err != nil {
return nil, hasChanges, err
}
Expand All @@ -172,6 +172,11 @@ func getOptionsFromMutableState(ms historyi.MutableState) *workflowpb.WorkflowEx
opts.Priority = cloned
}
}
if tsInfo := ms.GetExecutionInfo().GetTimeSkippingInfo(); tsInfo != nil {
if cloned, ok := proto.Clone(tsInfo.GetConfig()).(*workflowpb.TimeSkippingConfig); ok {
opts.TimeSkippingConfig = cloned
}
}
return opts
}

Expand Down Expand Up @@ -230,5 +235,54 @@ func mergeWorkflowExecutionOptions(
mergeInto.Priority.FairnessWeight = mergeFrom.Priority.GetFairnessWeight()
}

// ==== Time Skipping Config
// nil means "no change" — only update if the caller provided an explicit value.
if _, ok := updateFields["timeSkippingConfig"]; ok {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can updateFields contain something like timeSkippingConfig.bound if the client just wants to change that? I see that things like priority.fairnessWeight are checked in the above lines.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this commit : d72dcb3

if mergeFrom.GetTimeSkippingConfig() != nil {
mergeInto.TimeSkippingConfig = mergeFrom.GetTimeSkippingConfig()
}
}

if _, ok := updateFields["timeSkippingConfig.enabled"]; ok {
if mergeInto.TimeSkippingConfig == nil {
mergeInto.TimeSkippingConfig = &workflowpb.TimeSkippingConfig{}
}
mergeInto.TimeSkippingConfig.Enabled = mergeFrom.GetTimeSkippingConfig().GetEnabled()
}

if _, ok := updateFields["timeSkippingConfig.disablePropagation"]; ok {
if mergeInto.TimeSkippingConfig == nil {
mergeInto.TimeSkippingConfig = &workflowpb.TimeSkippingConfig{}
}
mergeInto.TimeSkippingConfig.DisablePropagation = mergeFrom.GetTimeSkippingConfig().GetDisablePropagation()
}

if _, ok := updateFields["timeSkippingConfig.maxSkippedDuration"]; ok {
if mergeInto.TimeSkippingConfig == nil {
mergeInto.TimeSkippingConfig = &workflowpb.TimeSkippingConfig{}
}
mergeInto.TimeSkippingConfig.Bound = &workflowpb.TimeSkippingConfig_MaxSkippedDuration{
MaxSkippedDuration: mergeFrom.GetTimeSkippingConfig().GetMaxSkippedDuration(),
}
}

if _, ok := updateFields["timeSkippingConfig.maxElapsedDuration"]; ok {
if mergeInto.TimeSkippingConfig == nil {
mergeInto.TimeSkippingConfig = &workflowpb.TimeSkippingConfig{}
}
mergeInto.TimeSkippingConfig.Bound = &workflowpb.TimeSkippingConfig_MaxElapsedDuration{
MaxElapsedDuration: mergeFrom.GetTimeSkippingConfig().GetMaxElapsedDuration(),
}
}

if _, ok := updateFields["timeSkippingConfig.maxTargetTime"]; ok {
if mergeInto.TimeSkippingConfig == nil {
mergeInto.TimeSkippingConfig = &workflowpb.TimeSkippingConfig{}
}
mergeInto.TimeSkippingConfig.Bound = &workflowpb.TimeSkippingConfig_MaxTargetTime{
MaxTargetTime: mergeFrom.GetTimeSkippingConfig().GetMaxTargetTime(),
}
}

return mergeInto, nil
}
Loading
Loading