diff --git a/task/orchestrator.go b/task/orchestrator.go index 8afc3ac..3aaadd3 100644 --- a/task/orchestrator.go +++ b/task/orchestrator.go @@ -366,8 +366,17 @@ func (ctx *OrchestrationContext) CreateTimer(delay time.Duration) Task { return ctx.createTimerInternal(delay) } +// CreateTimerAt schedules a durable timer that fires at the specified absolute UTC time. +func (ctx *OrchestrationContext) CreateTimerAt(fireAt time.Time) Task { + return ctx.createTimerAtInternal(fireAt) +} + func (ctx *OrchestrationContext) createTimerInternal(delay time.Duration) *completableTask { fireAt := ctx.CurrentTimeUtc.Add(delay) + return ctx.createTimerAtInternal(fireAt) +} + +func (ctx *OrchestrationContext) createTimerAtInternal(fireAt time.Time) *completableTask { timerAction := helpers.NewCreateTimerAction(ctx.getNextSequenceNumber(), fireAt) ctx.pendingActions[timerAction.Id] = timerAction diff --git a/tests/orchestrations_test.go b/tests/orchestrations_test.go index f7b66c6..f8514bb 100644 --- a/tests/orchestrations_test.go +++ b/tests/orchestrations_test.go @@ -88,6 +88,45 @@ func Test_SingleTimer(t *testing.T) { ) } +func Test_SingleTimerAt(t *testing.T) { + // Registration + r := task.NewTaskRegistry() + require.NoError(t, r.AddOrchestratorN("SingleTimerAt", func(ctx *task.OrchestrationContext) (any, error) { + // Schedule a timer that fires 1 second in the future using an absolute time + fireAt := ctx.CurrentTimeUtc.Add(1 * time.Second) + err := ctx.CreateTimerAt(fireAt).Await(nil) + return nil, err + })) + + // Initialization + ctx := context.Background() + exporter := initTracing() + client, worker := initTaskHubWorker(ctx, r) + defer func() { + if err := worker.Shutdown(ctx); err != nil { + t.Logf("shutdown: %v", err) + } + }() + + // Run the orchestration + id, err := client.ScheduleNewOrchestration(ctx, "SingleTimerAt") + if assert.NoError(t, err) { + metadata, err := client.WaitForOrchestrationCompletion(ctx, id) + if assert.NoError(t, err) { + assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED, metadata.RuntimeStatus) + assert.GreaterOrEqual(t, metadata.LastUpdatedAt, metadata.CreatedAt) + } + } + + // Validate the exported OTel traces + spans := exporter.GetSpans().Snapshots() + assertSpanSequence(t, spans, + assertOrchestratorCreated("SingleTimerAt", id), + assertTimer(id), + assertOrchestratorExecuted("SingleTimerAt", id, "COMPLETED"), + ) +} + func Test_ConcurrentTimers(t *testing.T) { // Registration r := task.NewTaskRegistry()