Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions internal/service/relational/workflows/step_execution_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,12 @@ func (s *StepExecutionService) UpdateStatus(ctx context.Context, id *uuid.UUID,
}

var current StepExecution
if err := s.db.WithContext(ctx).Select("status").First(&current, "id = ?", id).Error; err != nil {
if err := s.db.WithContext(ctx).Select("status", "due_date").First(&current, "id = ?", id).Error; err != nil {
return err
}
if current.Status == status {
return nil
}
if !isValidStepStatusTransition(current.Status, status) {
return fmt.Errorf("%w: %s -> %s", ErrInvalidStepExecutionStatusTransition, current.Status, status)
}
Expand All @@ -110,14 +113,6 @@ func (s *StepExecutionService) UpdateStatus(ctx context.Context, id *uuid.UUID,
switch status {
case "in_progress":
updates["started_at"] = now
if s.evidenceCreator != nil {
if err := s.evidenceCreator.AddStepStartedEvidence(ctx, id); err != nil {
// Log error but don't fail the status update
s.logger.Warnw("Failed to create step started evidence",
"step_execution_id", id,
"error", err)
}
}
case "completed":
updates["completed_at"] = now
case "overdue":
Expand All @@ -135,15 +130,27 @@ func (s *StepExecutionService) UpdateStatus(ctx context.Context, id *uuid.UUID,
}
if result.RowsAffected == 0 {
var latest StepExecution
if err := s.db.WithContext(ctx).Select("status").First(&latest, "id = ?", id).Error; err != nil {
if err := s.db.WithContext(ctx).Select("status", "due_date").First(&latest, "id = ?", id).Error; err != nil {
return err
}
if latest.Status == status {
return nil
}
if !isValidStepStatusTransition(latest.Status, status) {
return fmt.Errorf("%w: %s -> %s", ErrInvalidStepExecutionStatusTransition, latest.Status, status)
}
return fmt.Errorf("%w: %s -> %s", ErrStepExecutionStatusTransitionConflict, current.Status, status)
}

if status == StepStatusInProgress.String() && s.evidenceCreator != nil {
if err := s.evidenceCreator.AddStepStartedEvidence(ctx, id); err != nil {
// Log error but don't fail the status update
s.logger.Warnw("Failed to create step started evidence",
"step_execution_id", id,
"error", err)
}
}

return nil
}

Expand Down
39 changes: 23 additions & 16 deletions internal/service/relational/workflows/workflow_execution_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ func (s *WorkflowExecutionService) UpdateStatus(ctx context.Context, id *uuid.UU
if err := s.db.WithContext(ctx).Select("status").First(&current, "id = ?", id).Error; err != nil {
return err
}
if current.Status == status {
return nil
}
if !isValidExecutionStatusTransition(current.Status, status) {
return fmt.Errorf("%w: %s -> %s", ErrInvalidWorkflowExecutionStatusTransition, current.Status, status)
}
Expand All @@ -145,24 +148,8 @@ func (s *WorkflowExecutionService) UpdateStatus(ctx context.Context, id *uuid.UU
now := time.Now()
switch status {
case "in_progress":
// Keep behavior: create started evidence while execution is still pending.
if s.evidenceCreator != nil {
if err := s.evidenceCreator.AddWorkflowExecutionEvidence(ctx, id, "started"); err != nil {
s.logger.Warnw("Failed to create workflow execution started evidence",
"workflow_execution_id", id,
"error", err)
}
}
updates["started_at"] = now
case "completed":
// Keep behavior: create completed evidence prior to status update.
if s.evidenceCreator != nil {
if err := s.evidenceCreator.AddWorkflowExecutionEvidence(ctx, id, "completed"); err != nil {
s.logger.Warnw("Failed to create workflow execution completed evidence",
"workflow_execution_id", id,
"error", err)
}
}
updates["completed_at"] = now
case "overdue":
updates["overdue_at"] = now
Expand All @@ -182,12 +169,32 @@ func (s *WorkflowExecutionService) UpdateStatus(ctx context.Context, id *uuid.UU
if err := s.db.WithContext(ctx).Select("status").First(&latest, "id = ?", id).Error; err != nil {
return err
}
if latest.Status == status {
return nil
}
if !isValidExecutionStatusTransition(latest.Status, status) {
return fmt.Errorf("%w: %s -> %s", ErrInvalidWorkflowExecutionStatusTransition, latest.Status, status)
}
return fmt.Errorf("%w: %s -> %s", ErrWorkflowExecutionStatusTransitionConflict, current.Status, status)
}

if s.evidenceCreator != nil {
switch status {
case "in_progress":
if err := s.evidenceCreator.AddWorkflowExecutionEvidence(ctx, id, "started"); err != nil {
s.logger.Warnw("Failed to create workflow execution started evidence",
"workflow_execution_id", id,
"error", err)
}
case "completed":
if err := s.evidenceCreator.AddWorkflowExecutionEvidence(ctx, id, "completed"); err != nil {
s.logger.Warnw("Failed to create workflow execution completed evidence",
"workflow_execution_id", id,
"error", err)
}
}
}

return nil
}

Expand Down
12 changes: 5 additions & 7 deletions internal/workflow/evidence.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,11 @@ func (e *EvidenceIntegration) AddWorkflowExecutionEvidence(ctx context.Context,
return fmt.Errorf("failed to get workflow execution: %w", err)
}

// Only add started evidence for pending executions (before transition to in_progress)
if execution.Status != "pending" && status == "started" {
// Started evidence may be emitted right before or right after the transition.
if status == "started" && execution.Status != "pending" && execution.Status != "in_progress" {
return fmt.Errorf("workflow execution is not in pending status, status: %s", execution.Status)
}
if execution.Status != "in_progress" && status == "complete" {
if status == "completed" && execution.Status != "in_progress" && execution.Status != "completed" {
return fmt.Errorf("workflow execution is not in status, status: %s", execution.Status)
}

Expand Down Expand Up @@ -273,10 +273,8 @@ func (e *EvidenceIntegration) AddStepStartedEvidence(ctx context.Context, stepEx
return fmt.Errorf("failed to get step execution: %w", err)
}

// Only add started evidence for steps transitioning to in_progress status
// This means the current stepExecution status must be pending
// This should only be called when a user/scheduler moves a step from pending -> in_progress
if stepExecution.Status != "pending" {
// Started evidence may be emitted right before or right after transitioning to in_progress.
if stepExecution.Status != "pending" && stepExecution.Status != "in_progress" {
return fmt.Errorf("step execution is not in pending status, status: %s", stepExecution.Status)
}

Expand Down
10 changes: 5 additions & 5 deletions internal/workflow/evidence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,15 +666,15 @@ func TestAddWorkflowExecutionStartedEvidence(t *testing.T) {
assert.Equal(t, "workflow_execution_started", labelMap["evidence.type"])
})

t.Run("RejectNonPendingExecution", func(t *testing.T) {
t.Run("RejectInvalidExecutionStatusForStarted", func(t *testing.T) {
// Create workflow context
_, _, execution, _ := createTestWorkflowContext(t, db)

// Update execution to in_progress status
err := db.Model(execution).Update("status", "in_progress").Error
// Update execution to failed status
err := db.Model(execution).Update("status", "failed").Error
require.NoError(t, err)

// Try to add evidence for an in_progress execution (should fail since evidence is only created for pending executions)
// Try to add started evidence for a failed execution (should fail)
err = evidenceIntegration.AddWorkflowExecutionEvidence(context.Background(), execution.ID, "started")
require.Error(t, err)
assert.Contains(t, err.Error(), "not in pending status")
Expand Down Expand Up @@ -736,7 +736,7 @@ func TestAddStepStartedEvidence(t *testing.T) {
assert.Equal(t, stepExecution.ID.String(), labelMap["step.execution.id"])
assert.Equal(t, stepDef.ID.String(), labelMap["step.definition.id"])
assert.Equal(t, stepDef.Name, labelMap["step.name"])
assert.Equal(t, "pending", labelMap["step.status"])
assert.Equal(t, "in_progress", labelMap["step.status"])
})

t.Run("RejectNonInProgressStep", func(t *testing.T) {
Expand Down