Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
294 changes: 148 additions & 146 deletions api/protos/backend_service.pb.go

Large diffs are not rendered by default.

2,439 changes: 2,439 additions & 0 deletions api/protos/history_events.pb.go

Large diffs are not rendered by default.

1,226 changes: 1,226 additions & 0 deletions api/protos/orchestration.pb.go

Large diffs are not rendered by default.

1,061 changes: 1,061 additions & 0 deletions api/protos/orchestrator_actions.pb.go

Large diffs are not rendered by default.

6,682 changes: 1,287 additions & 5,395 deletions api/protos/orchestrator_service.pb.go

Large diffs are not rendered by default.

173 changes: 87 additions & 86 deletions api/protos/runtime_state.pb.go

Large diffs are not rendered by default.

22 changes: 16 additions & 6 deletions backend/runtimestate/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,25 @@ func (a *Applier) Actions(s *protos.OrchestrationRuntimeState, customStatus *wra
s.PendingMessages = append(s.PendingMessages, msg)
}
}
} else if createtimer := action.GetCreateTimer(); createtimer != nil {
} else if timerAction := action.GetCreateTimer(); timerAction != nil {
timerCreated := &protos.TimerCreatedEvent{
FireAt: timerAction.FireAt,
Name: timerAction.Name,
}
if externalEvent := timerAction.GetExternalEvent(); externalEvent != nil {
timerCreated.Origin = &protos.TimerCreatedEvent_ExternalEvent{
ExternalEvent: externalEvent,
}
} else if ct := timerAction.GetCreateTimer(); ct != nil {
timerCreated.Origin = &protos.TimerCreatedEvent_CreateTimer{
CreateTimer: ct,
}
}
_ = AddEvent(s, &protos.HistoryEvent{
EventId: action.Id,
Timestamp: timestamppb.New(time.Now()),
EventType: &protos.HistoryEvent_TimerCreated{
TimerCreated: &protos.TimerCreatedEvent{
FireAt: createtimer.FireAt,
Name: createtimer.Name,
},
TimerCreated: timerCreated,
},
Router: action.Router,
})
Expand All @@ -151,7 +161,7 @@ func (a *Applier) Actions(s *protos.OrchestrationRuntimeState, customStatus *wra
EventType: &protos.HistoryEvent_TimerFired{
TimerFired: &protos.TimerFiredEvent{
TimerId: action.Id,
FireAt: createtimer.FireAt,
FireAt: timerAction.FireAt,
},
},
})
Expand Down
45 changes: 37 additions & 8 deletions task/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,31 @@ func (ctx *OrchestrationContext) createTimerInternal(name *string, delay time.Du
CreateTimer: &protos.CreateTimerAction{
FireAt: timestamppb.New(fireAt),
Name: name,
Origin: &protos.CreateTimerAction_CreateTimer{
CreateTimer: &protos.TimerOriginCreateTimer{},
},
},
},
}
ctx.pendingActions[timerAction.Id] = timerAction

task := newTask(ctx)
ctx.pendingTasks[timerAction.Id] = task
return task
}

func (ctx *OrchestrationContext) createExternalEventTimerInternal(eventName string, fireAt time.Time) *completableTask {
timerAction := &protos.OrchestratorAction{
Id: ctx.getNextSequenceNumber(),
OrchestratorActionType: &protos.OrchestratorAction_CreateTimer{
CreateTimer: &protos.CreateTimerAction{
FireAt: timestamppb.New(fireAt),
Name: &eventName,
Origin: &protos.CreateTimerAction_ExternalEvent{
ExternalEvent: &protos.TimerOriginExternalEvent{
Name: eventName,
},
},
},
},
}
Expand Down Expand Up @@ -494,16 +519,20 @@ func (ctx *OrchestrationContext) WaitForSingleEvent(eventName string, timeout ti
}
taskElement := taskList.PushBack(task)

var fireAt time.Time
if timeout > 0 {
ctx.createTimerInternal(&eventName, timeout).onCompleted(func() {
task.cancel()
if taskList.Len() > 1 {
taskList.Remove(taskElement)
} else {
delete(ctx.pendingExternalEventTasks, key)
}
})
fireAt = ctx.CurrentTimeUtc.Add(timeout)
} else {
fireAt = time.Date(9999, 12, 31, 23, 59, 59, 999999999, time.UTC)
}
ctx.createExternalEventTimerInternal(eventName, fireAt).onCompleted(func() {
task.cancel()
Comment thread
cicoyle marked this conversation as resolved.
if taskList.Len() > 1 {
taskList.Remove(taskElement)
} else {
delete(ctx.pendingExternalEventTasks, key)
}
})
}
return task
}
Expand Down
7 changes: 6 additions & 1 deletion tests/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,12 @@ func Test_ScheduleTimerTasks(t *testing.T) {
getOrchestratorActions := func() []*protos.OrchestratorAction {
return []*protos.OrchestratorAction{{
OrchestratorActionType: &protos.OrchestratorAction_CreateTimer{
CreateTimer: &protos.CreateTimerAction{FireAt: timestamppb.New(expectedFireAt)},
CreateTimer: &protos.CreateTimerAction{
FireAt: timestamppb.New(expectedFireAt),
Origin: &protos.CreateTimerAction_CreateTimer{
CreateTimer: &protos.TimerOriginCreateTimer{},
},
},
},
}}
}
Expand Down
59 changes: 59 additions & 0 deletions tests/runtimestate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ func Test_CreateTimer(t *testing.T) {
CreateTimer: &protos.CreateTimerAction{
FireAt: timestamppb.New(expectedFireAt),
Name: &timerName,
Origin: &protos.CreateTimerAction_CreateTimer{
CreateTimer: &protos.TimerOriginCreateTimer{},
},
},
},
})
Expand All @@ -293,6 +296,7 @@ func Test_CreateTimer(t *testing.T) {
if timerCreated := e.GetTimerCreated(); assert.NotNil(t, timerCreated) {
assert.WithinDuration(t, expectedFireAt, timerCreated.FireAt.AsTime(), 0)
assert.Equal(t, timerName, timerCreated.GetName())
assert.NotNil(t, timerCreated.GetCreateTimer(), "expected TimerCreatedEvent to carry CreateTimer origin")
}
}
}
Expand All @@ -309,6 +313,61 @@ func Test_CreateTimer(t *testing.T) {
}
}

func Test_CreateTimer_ExternalEventOrigin(t *testing.T) {
const iid = "abc"
eventName := "myEvent"
expectedFireAt := time.Now().UTC().Add(30 * time.Minute)

s := runtimestate.NewOrchestrationRuntimeState(iid, nil, []*protos.HistoryEvent{
{
EventId: -1,
Timestamp: timestamppb.New(time.Now()),
EventType: &protos.HistoryEvent_ExecutionStarted{
ExecutionStarted: &protos.ExecutionStartedEvent{
Name: "MyOrchestration",
OrchestrationInstance: &protos.OrchestrationInstance{
InstanceId: iid,
ExecutionId: wrapperspb.String(uuid.New().String()),
},
},
},
},
})

actions := []*protos.OrchestratorAction{
{
Id: 1,
OrchestratorActionType: &protos.OrchestratorAction_CreateTimer{
CreateTimer: &protos.CreateTimerAction{
FireAt: timestamppb.New(expectedFireAt),
Name: &eventName,
Origin: &protos.CreateTimerAction_ExternalEvent{
ExternalEvent: &protos.TimerOriginExternalEvent{
Name: eventName,
},
},
},
},
},
}

applier := runtimestate.NewApplier("example")
continuedAsNew, err := applier.Actions(s, nil, actions, nil)
if assert.NoError(t, err) && assert.False(t, continuedAsNew) {
if assert.Len(t, s.NewEvents, 1) {
timerCreated := s.NewEvents[0].GetTimerCreated()
if assert.NotNil(t, timerCreated) {
assert.WithinDuration(t, expectedFireAt, timerCreated.FireAt.AsTime(), 0)
assert.Equal(t, eventName, timerCreated.GetName())
externalEvent := timerCreated.GetExternalEvent()
if assert.NotNil(t, externalEvent, "expected TimerCreatedEvent to carry ExternalEvent origin") {
assert.Equal(t, eventName, externalEvent.Name)
}
}
}
}
}

func Test_ScheduleTask(t *testing.T) {
const iid = "abc"
expectedTaskID := int32(1)
Expand Down
172 changes: 172 additions & 0 deletions tests/task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,178 @@ func Test_Executor_WaitForEventSchedulesTimer(t *testing.T) {
require.NotNil(t, createTimerAction, "Expected the scheduled action to be a timer")
require.WithinDuration(t, startEvent.Timestamp.AsTime().Add(timerDuration), createTimerAction.FireAt.AsTime(), 0)
require.Equal(t, "MyEvent", createTimerAction.GetName())
require.NotNil(t, createTimerAction.GetExternalEvent())
require.Equal(t, "MyEvent", createTimerAction.GetExternalEvent().GetName())
Comment thread
acroca marked this conversation as resolved.
}

// Verifies that the WaitForSingleEvent API always creates a timer even when no timeout is specified (indefinite wait),
// with a far-future fireAt and the ExternalEvent origin set.
func Test_Executor_WaitForEventWithoutTimeout_CreatesInfiniteTimer(t *testing.T) {
r := task.NewTaskRegistry()
r.AddOrchestratorN("Orchestration", func(ctx *task.OrchestrationContext) (any, error) {
var value int
ctx.WaitForSingleEvent("MyEvent", -1).Await(&value)
return value, nil
})

iid := api.InstanceID("abc123")
newEvents := []*protos.HistoryEvent{
{
EventId: -1,
Timestamp: timestamppb.Now(),
EventType: &protos.HistoryEvent_OrchestratorStarted{
OrchestratorStarted: &protos.OrchestratorStartedEvent{},
},
},
{
EventId: -1,
Timestamp: timestamppb.New(time.Now()),
EventType: &protos.HistoryEvent_ExecutionStarted{
ExecutionStarted: &protos.ExecutionStartedEvent{
Name: "Orchestration",
OrchestrationInstance: &protos.OrchestrationInstance{
InstanceId: string(iid),
ExecutionId: wrapperspb.String(uuid.New().String()),
},
},
},
},
}

executor := task.NewTaskExecutor(r)
results, err := executor.ExecuteOrchestrator(ctx, iid, []*protos.HistoryEvent{}, newEvents)
require.NoError(t, err)
require.Equal(t, 1, len(results.Actions), "Expected a timer to be created even for indefinite waits")
createTimerAction := results.Actions[0].GetCreateTimer()
require.NotNil(t, createTimerAction, "Expected the action to be a timer")
expectedInfiniteFireAt := time.Date(9999, 12, 31, 23, 59, 59, 999999999, time.UTC)
require.Equal(t, expectedInfiniteFireAt, createTimerAction.FireAt.AsTime())
require.NotNil(t, createTimerAction.GetExternalEvent())
require.Equal(t, "MyEvent", createTimerAction.GetExternalEvent().GetName())
}

// Verifies that the CreateTimer API sets the CreateTimer origin on the resulting action.
func Test_Executor_CreateTimer_SetsCreateTimerOrigin(t *testing.T) {
timerDuration := 5 * time.Second
r := task.NewTaskRegistry()
r.AddOrchestratorN("Orchestration", func(ctx *task.OrchestrationContext) (any, error) {
return nil, ctx.CreateTimer(timerDuration).Await(nil)
})

iid := api.InstanceID("abc123")
newEvents := []*protos.HistoryEvent{
{
EventId: -1,
Timestamp: timestamppb.Now(),
EventType: &protos.HistoryEvent_OrchestratorStarted{
OrchestratorStarted: &protos.OrchestratorStartedEvent{},
},
},
{
EventId: -1,
Timestamp: timestamppb.New(time.Now()),
EventType: &protos.HistoryEvent_ExecutionStarted{
ExecutionStarted: &protos.ExecutionStartedEvent{
Name: "Orchestration",
OrchestrationInstance: &protos.OrchestrationInstance{
InstanceId: string(iid),
ExecutionId: wrapperspb.String(uuid.New().String()),
},
},
},
},
}

executor := task.NewTaskExecutor(r)
results, err := executor.ExecuteOrchestrator(ctx, iid, []*protos.HistoryEvent{}, newEvents)
require.NoError(t, err)
require.Equal(t, 1, len(results.Actions), "Expected a single action to be scheduled")
createTimerAction := results.Actions[0].GetCreateTimer()
require.NotNil(t, createTimerAction, "Expected the scheduled action to be a timer")
require.NotNil(t, createTimerAction.GetCreateTimer(), "Expected the timer action to carry CreateTimer origin")
}

// Verifies that when a timer fires before the external event arrives, the WaitForSingleEvent task
// is cancelled and the orchestration completes with a failure (ErrTaskCanceled).
func Test_Executor_WaitForEvent_TimerFiresCancelsTask(t *testing.T) {
timerDuration := 5 * time.Second
r := task.NewTaskRegistry()
r.AddOrchestratorN("Orchestration", func(ctx *task.OrchestrationContext) (any, error) {
var value int
if err := ctx.WaitForSingleEvent("MyEvent", timerDuration).Await(&value); err != nil {
return nil, err
}
return value, nil
})

iid := api.InstanceID("abc123")
executionID := uuid.New().String()
startTime := time.Now()

// oldEvents: replay of the first execution where the timer was created
oldEvents := []*protos.HistoryEvent{
{
EventId: -1,
Timestamp: timestamppb.New(startTime),
EventType: &protos.HistoryEvent_OrchestratorStarted{
OrchestratorStarted: &protos.OrchestratorStartedEvent{},
},
},
{
EventId: -1,
Timestamp: timestamppb.New(startTime),
EventType: &protos.HistoryEvent_ExecutionStarted{
ExecutionStarted: &protos.ExecutionStartedEvent{
Name: "Orchestration",
OrchestrationInstance: &protos.OrchestrationInstance{
InstanceId: string(iid),
ExecutionId: wrapperspb.String(executionID),
},
},
},
},
{
EventId: 0,
Timestamp: timestamppb.New(startTime),
EventType: &protos.HistoryEvent_TimerCreated{
TimerCreated: &protos.TimerCreatedEvent{
FireAt: timestamppb.New(startTime.Add(timerDuration)),
},
},
},
}

// newEvents: the timer fires (no external event arrived)
newEvents := []*protos.HistoryEvent{
{
EventId: -1,
Timestamp: timestamppb.New(startTime.Add(timerDuration)),
EventType: &protos.HistoryEvent_OrchestratorStarted{
OrchestratorStarted: &protos.OrchestratorStartedEvent{},
},
},
{
EventId: -1,
Timestamp: timestamppb.New(startTime.Add(timerDuration)),
EventType: &protos.HistoryEvent_TimerFired{
TimerFired: &protos.TimerFiredEvent{
TimerId: 0,
FireAt: timestamppb.New(startTime.Add(timerDuration)),
},
},
},
}

executor := task.NewTaskExecutor(r)
results, err := executor.ExecuteOrchestrator(ctx, iid, oldEvents, newEvents)
require.NoError(t, err)
require.Equal(t, 1, len(results.Actions), "Expected a single completion action")

completeAction := results.Actions[0].GetCompleteOrchestration()
require.NotNil(t, completeAction, "Expected a CompleteOrchestration action")
require.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED, completeAction.OrchestrationStatus)
require.NotNil(t, completeAction.FailureDetails)
require.Contains(t, completeAction.FailureDetails.ErrorMessage, "the task was canceled")
}

// This is a regression test for an issue where suspended orchestrations would continue to return
Expand Down
Loading