diff --git a/docs/docs.go b/docs/docs.go index 83af6153..0dafb113 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -29163,6 +29163,9 @@ const docTemplate = `{ "inProgressSteps": { "type": "integer" }, + "overdueSteps": { + "type": "integer" + }, "pendingSteps": { "type": "integer" }, @@ -29355,6 +29358,9 @@ const docTemplate = `{ "evidence-required": { "type": "string" }, + "grace-period-days": { + "type": "integer" + }, "name": { "type": "string" }, @@ -29380,6 +29386,9 @@ const docTemplate = `{ "description": { "type": "string" }, + "grace-period-days": { + "type": "integer" + }, "is-active": { "type": "boolean" }, @@ -29421,6 +29430,9 @@ const docTemplate = `{ "$ref": "#/definitions/workflows.EvidenceRequirement" } }, + "grace-period-days": { + "type": "integer" + }, "name": { "type": "string" }, @@ -29711,6 +29723,9 @@ const docTemplate = `{ "deleted_at": { "$ref": "#/definitions/gorm.DeletedAt" }, + "due_date": { + "type": "string" + }, "failed-at": { "type": "string" }, @@ -29720,6 +29735,9 @@ const docTemplate = `{ "id": { "type": "string" }, + "overdue-at": { + "type": "string" + }, "reassignment_history": { "type": "array", "items": { @@ -29929,6 +29947,9 @@ const docTemplate = `{ "evidence-required": { "type": "string" }, + "grace-period-days": { + "type": "integer" + }, "name": { "type": "string" }, @@ -29949,6 +29970,9 @@ const docTemplate = `{ "description": { "type": "string" }, + "grace-period-days": { + "type": "integer" + }, "is-active": { "type": "boolean" }, @@ -29978,6 +30002,9 @@ const docTemplate = `{ "$ref": "#/definitions/workflows.EvidenceRequirement" } }, + "grace-period-days": { + "type": "integer" + }, "name": { "type": "string" }, @@ -30012,7 +30039,7 @@ const docTemplate = `{ "description": "JSON array of required evidence types", "type": "string" }, - "grace_period_days": { + "grace-period-days": { "description": "Override global default if set", "type": "integer" }, @@ -30098,6 +30125,9 @@ const docTemplate = `{ "id": { "type": "string" }, + "overdue-at": { + "type": "string" + }, "period_label": { "description": "Scheduling Context", "type": "string" @@ -30204,7 +30234,7 @@ const docTemplate = `{ "$ref": "#/definitions/workflows.WorkflowExecution" } }, - "grace_period_days": { + "grace-period-days": { "description": "Override definition/global default if set", "type": "integer" }, @@ -30311,6 +30341,10 @@ const docTemplate = `{ "$ref": "#/definitions/workflows.EvidenceRequirement" } }, + "grace-period-days": { + "description": "Override default grace for this specific step", + "type": "integer" + }, "id": { "type": "string" }, diff --git a/docs/swagger.json b/docs/swagger.json index 6fb9e5f3..99b1c0de 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -29157,6 +29157,9 @@ "inProgressSteps": { "type": "integer" }, + "overdueSteps": { + "type": "integer" + }, "pendingSteps": { "type": "integer" }, @@ -29349,6 +29352,9 @@ "evidence-required": { "type": "string" }, + "grace-period-days": { + "type": "integer" + }, "name": { "type": "string" }, @@ -29374,6 +29380,9 @@ "description": { "type": "string" }, + "grace-period-days": { + "type": "integer" + }, "is-active": { "type": "boolean" }, @@ -29415,6 +29424,9 @@ "$ref": "#/definitions/workflows.EvidenceRequirement" } }, + "grace-period-days": { + "type": "integer" + }, "name": { "type": "string" }, @@ -29705,6 +29717,9 @@ "deleted_at": { "$ref": "#/definitions/gorm.DeletedAt" }, + "due_date": { + "type": "string" + }, "failed-at": { "type": "string" }, @@ -29714,6 +29729,9 @@ "id": { "type": "string" }, + "overdue-at": { + "type": "string" + }, "reassignment_history": { "type": "array", "items": { @@ -29923,6 +29941,9 @@ "evidence-required": { "type": "string" }, + "grace-period-days": { + "type": "integer" + }, "name": { "type": "string" }, @@ -29943,6 +29964,9 @@ "description": { "type": "string" }, + "grace-period-days": { + "type": "integer" + }, "is-active": { "type": "boolean" }, @@ -29972,6 +29996,9 @@ "$ref": "#/definitions/workflows.EvidenceRequirement" } }, + "grace-period-days": { + "type": "integer" + }, "name": { "type": "string" }, @@ -30006,7 +30033,7 @@ "description": "JSON array of required evidence types", "type": "string" }, - "grace_period_days": { + "grace-period-days": { "description": "Override global default if set", "type": "integer" }, @@ -30092,6 +30119,9 @@ "id": { "type": "string" }, + "overdue-at": { + "type": "string" + }, "period_label": { "description": "Scheduling Context", "type": "string" @@ -30198,7 +30228,7 @@ "$ref": "#/definitions/workflows.WorkflowExecution" } }, - "grace_period_days": { + "grace-period-days": { "description": "Override definition/global default if set", "type": "integer" }, @@ -30305,6 +30335,10 @@ "$ref": "#/definitions/workflows.EvidenceRequirement" } }, + "grace-period-days": { + "description": "Override default grace for this specific step", + "type": "integer" + }, "id": { "type": "string" }, diff --git a/docs/swagger.yaml b/docs/swagger.yaml index fd4685aa..fabc5c31 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -6332,6 +6332,8 @@ definitions: type: string inProgressSteps: type: integer + overdueSteps: + type: integer pendingSteps: type: integer startedAt: @@ -6458,6 +6460,8 @@ definitions: type: string evidence-required: type: string + grace-period-days: + type: integer name: type: string suggested-cadence: @@ -6473,6 +6477,8 @@ definitions: type: string description: type: string + grace-period-days: + type: integer is-active: type: boolean name: @@ -6501,6 +6507,8 @@ definitions: items: $ref: '#/definitions/workflows.EvidenceRequirement' type: array + grace-period-days: + type: integer name: type: string responsible-role: @@ -6697,12 +6705,16 @@ definitions: type: string deleted_at: $ref: '#/definitions/gorm.DeletedAt' + due_date: + type: string failed-at: type: string failure_reason: type: string id: type: string + overdue-at: + type: string reassignment_history: items: $ref: '#/definitions/workflows.StepReassignmentHistory' @@ -6840,6 +6852,8 @@ definitions: type: string evidence-required: type: string + grace-period-days: + type: integer name: type: string suggested-cadence: @@ -6853,6 +6867,8 @@ definitions: type: string description: type: string + grace-period-days: + type: integer is-active: type: boolean name: @@ -6872,6 +6888,8 @@ definitions: items: $ref: '#/definitions/workflows.EvidenceRequirement' type: array + grace-period-days: + type: integer name: type: string responsible-role: @@ -6895,7 +6913,7 @@ definitions: evidence_required: description: JSON array of required evidence types type: string - grace_period_days: + grace-period-days: description: Override global default if set type: integer id: @@ -6953,6 +6971,8 @@ definitions: type: string id: type: string + overdue-at: + type: string period_label: description: Scheduling Context type: string @@ -7023,7 +7043,7 @@ definitions: items: $ref: '#/definitions/workflows.WorkflowExecution' type: array - grace_period_days: + grace-period-days: description: Override definition/global default if set type: integer id: @@ -7094,6 +7114,9 @@ definitions: items: $ref: '#/definitions/workflows.EvidenceRequirement' type: array + grace-period-days: + description: Override default grace for this specific step + type: integer id: type: string name: diff --git a/internal/api/handler/workflows/step_execution.go b/internal/api/handler/workflows/step_execution.go index 28d1f374..ab1297e0 100644 --- a/internal/api/handler/workflows/step_execution.go +++ b/internal/api/handler/workflows/step_execution.go @@ -306,6 +306,9 @@ func (h *StepExecutionHandler) TransitionStep(ctx echo.Context) error { if isNotFoundError(err) { return ctx.JSON(http.StatusNotFound, api.NewError(err)) } + if errors.Is(err, workflow.ErrInvalidStepTransition) { + return ctx.JSON(http.StatusBadRequest, api.NewError(err)) + } // Check if it's a permission error if isPermissionError(err) { return ctx.JSON(http.StatusForbidden, api.NewError(err)) @@ -420,7 +423,7 @@ func (h *StepExecutionHandler) Fail(ctx echo.Context) error { return HandleError(err) } - if err := h.service.Fail(id, req.Reason); err != nil { + if err := h.transitionService.FailStep(ctx.Request().Context(), id, req.Reason); err != nil { return h.HandleServiceError(ctx, err, "fail", "step execution") } diff --git a/internal/api/handler/workflows/step_execution_integration_test.go b/internal/api/handler/workflows/step_execution_integration_test.go index d1918e0c..82290256 100644 --- a/internal/api/handler/workflows/step_execution_integration_test.go +++ b/internal/api/handler/workflows/step_execution_integration_test.go @@ -307,6 +307,43 @@ func TestStepExecutionHandler_TransitionStep(t *testing.T) { require.NotNil(t, response.Data) assert.Equal(t, "completed", response.Data.Status) }) + + t.Run("OverdueToInProgressReturnsBadRequest", func(t *testing.T) { + overdueStepDef := &workflows.WorkflowStepDefinition{ + WorkflowDefinitionID: workflowDef.ID, + Name: "Overdue Step", + ResponsibleRole: "engineer", + EvidenceRequired: []workflows.EvidenceRequirement{}, + } + require.NoError(t, db.Create(overdueStepDef).Error) + + overdueStep := &workflows.StepExecution{ + WorkflowExecutionID: execution.ID, + WorkflowStepDefinitionID: overdueStepDef.ID, + Status: workflows.StepStatusOverdue.String(), + } + require.NoError(t, db.Create(overdueStep).Error) + + reqBody := TransitionStepRequest{ + Status: "in_progress", + UserID: "test-user", + UserType: "user", + } + + body, err := json.Marshal(reqBody) + require.NoError(t, err) + + req := httptest.NewRequest(http.MethodPut, "/workflows/step-executions/"+overdueStep.ID.String()+"/transition", bytes.NewReader(body)) + req.Header.Set(echo.HeaderContentType, echo.MIMEApplicationJSON) + rec := httptest.NewRecorder() + c := e.NewContext(req, rec) + c.SetParamNames("id") + c.SetParamValues(overdueStep.ID.String()) + + err = handler.TransitionStep(c) + require.NoError(t, err) + assert.Equal(t, http.StatusBadRequest, rec.Code) + }) } func TestStepExecutionHandler_GetEvidenceRequirements(t *testing.T) { @@ -519,6 +556,18 @@ func TestStepExecutionHandler_ListMy(t *testing.T) { ResponsibleRole: "reviewer", } require.NoError(t, db.Create(stepDef2).Error) + stepDef3 := &workflows.WorkflowStepDefinition{ + WorkflowDefinitionID: workflowDef.ID, + Name: "Step 3", + ResponsibleRole: "engineer", + } + require.NoError(t, db.Create(stepDef3).Error) + stepDef4 := &workflows.WorkflowStepDefinition{ + WorkflowDefinitionID: workflowDef.ID, + Name: "Step 4", + ResponsibleRole: "engineer", + } + require.NoError(t, db.Create(stepDef4).Error) // Create step executions assigned to test user 1 (by user ID) assignedAt := time.Now() @@ -534,7 +583,7 @@ func TestStepExecutionHandler_ListMy(t *testing.T) { stepExec2 := &workflows.StepExecution{ WorkflowExecutionID: execution.ID, - WorkflowStepDefinitionID: stepDef.ID, + WorkflowStepDefinitionID: stepDef3.ID, Status: "in_progress", AssignedToType: "user", AssignedToID: testUser1.ID.String(), @@ -556,7 +605,7 @@ func TestStepExecutionHandler_ListMy(t *testing.T) { // Create step execution assigned to different user stepExec4 := &workflows.StepExecution{ WorkflowExecutionID: execution.ID, - WorkflowStepDefinitionID: stepDef.ID, + WorkflowStepDefinitionID: stepDef4.ID, Status: "pending", AssignedToType: "user", AssignedToID: testUser2.ID.String(), diff --git a/internal/api/handler/workflows/workflow_definition.go b/internal/api/handler/workflows/workflow_definition.go index d6403ac5..b0711dba 100644 --- a/internal/api/handler/workflows/workflow_definition.go +++ b/internal/api/handler/workflows/workflow_definition.go @@ -33,6 +33,7 @@ type CreateWorkflowDefinitionRequest struct { Version string `json:"version"` SuggestedCadence string `json:"suggested-cadence"` EvidenceRequired string `json:"evidence-required"` + GracePeriodDays *int `json:"grace-period-days"` } type UpdateWorkflowDefinitionRequest struct { @@ -41,6 +42,7 @@ type UpdateWorkflowDefinitionRequest struct { Version *string `json:"version"` SuggestedCadence *string `json:"suggested-cadence"` EvidenceRequired *string `json:"evidence-required"` + GracePeriodDays *int `json:"grace-period-days"` } type WorkflowDefinitionResponse struct { @@ -77,6 +79,7 @@ func (h *WorkflowDefinitionHandler) Create(ctx echo.Context) error { Version: req.Version, SuggestedCadence: req.SuggestedCadence, EvidenceRequired: req.EvidenceRequired, + GracePeriodDays: req.GracePeriodDays, } if err := h.service.Create(definition); err != nil { @@ -184,6 +187,9 @@ func (h *WorkflowDefinitionHandler) Update(ctx echo.Context) error { if req.EvidenceRequired != nil { definition.EvidenceRequired = *req.EvidenceRequired } + if req.GracePeriodDays != nil { + definition.GracePeriodDays = req.GracePeriodDays + } if err := h.service.Update(id, definition); err != nil { return h.HandleServiceError(ctx, err, "update", "workflow definition") diff --git a/internal/api/handler/workflows/workflow_definition_integration_test.go b/internal/api/handler/workflows/workflow_definition_integration_test.go index a0af10bb..ac65a323 100644 --- a/internal/api/handler/workflows/workflow_definition_integration_test.go +++ b/internal/api/handler/workflows/workflow_definition_integration_test.go @@ -38,6 +38,7 @@ func TestWorkflowDefinitionHandler_Create(t *testing.T) { Version: "1.0", SuggestedCadence: "quarterly", EvidenceRequired: `["vulnerability_scan", "penetration_test"]`, + GracePeriodDays: intPtr(10), } body, err := json.Marshal(reqBody) @@ -61,6 +62,8 @@ func TestWorkflowDefinitionHandler_Create(t *testing.T) { assert.Equal(t, "Quarterly security assessment process", response.Data.Description) assert.Equal(t, "1.0", response.Data.Version) assert.Equal(t, "quarterly", response.Data.SuggestedCadence) + require.NotNil(t, response.Data.GracePeriodDays) + assert.Equal(t, 10, *response.Data.GracePeriodDays) }) t.Run("ValidationError_MissingName", func(t *testing.T) { @@ -91,6 +94,7 @@ func TestWorkflowDefinitionHandler_Create(t *testing.T) { require.NoError(t, err) assert.Equal(t, http.StatusBadRequest, rec.Code) }) + } func TestWorkflowDefinitionHandler_List(t *testing.T) { @@ -201,9 +205,11 @@ func TestWorkflowDefinitionHandler_Update(t *testing.T) { t.Run("Success", func(t *testing.T) { newName := "Updated Name" newDescription := "Updated description" + newGrace := 14 reqBody := UpdateWorkflowDefinitionRequest{ - Name: &newName, - Description: &newDescription, + Name: &newName, + Description: &newDescription, + GracePeriodDays: &newGrace, } body, err := json.Marshal(reqBody) @@ -226,6 +232,8 @@ func TestWorkflowDefinitionHandler_Update(t *testing.T) { assert.Equal(t, "Updated Name", response.Data.Name) assert.Equal(t, "Updated description", response.Data.Description) assert.Equal(t, "1.0", response.Data.Version) // Unchanged + require.NotNil(t, response.Data.GracePeriodDays) + assert.Equal(t, 14, *response.Data.GracePeriodDays) }) t.Run("PartialUpdate", func(t *testing.T) { diff --git a/internal/api/handler/workflows/workflow_execution_integration_test.go b/internal/api/handler/workflows/workflow_execution_integration_test.go index b3a16676..7231cd97 100644 --- a/internal/api/handler/workflows/workflow_execution_integration_test.go +++ b/internal/api/handler/workflows/workflow_execution_integration_test.go @@ -295,6 +295,12 @@ func TestWorkflowExecutionHandler_ReassignRole(t *testing.T) { ResponsibleRole: "it-ops", } require.NoError(t, db.Create(stepDefTarget).Error) + stepDefTargetCompleted := &workflows.WorkflowStepDefinition{ + WorkflowDefinitionID: workflowDef.ID, + Name: "Target Step Completed", + ResponsibleRole: "it-ops", + } + require.NoError(t, db.Create(stepDefTargetCompleted).Error) stepDefOther := &workflows.WorkflowStepDefinition{ WorkflowDefinitionID: workflowDef.ID, @@ -314,7 +320,7 @@ func TestWorkflowExecutionHandler_ReassignRole(t *testing.T) { ineligibleStep := &workflows.StepExecution{ WorkflowExecutionID: execution.ID, - WorkflowStepDefinitionID: stepDefTarget.ID, + WorkflowStepDefinitionID: stepDefTargetCompleted.ID, Status: "completed", AssignedToType: "group", AssignedToID: "old-completed", diff --git a/internal/api/handler/workflows/workflow_instance.go b/internal/api/handler/workflows/workflow_instance.go index b1712716..aba99c47 100644 --- a/internal/api/handler/workflows/workflow_instance.go +++ b/internal/api/handler/workflows/workflow_instance.go @@ -37,13 +37,15 @@ type CreateWorkflowInstanceRequest struct { SystemSecurityPlanID string `json:"system-id" validate:"required"` Cadence string `json:"cadence"` IsActive *bool `json:"is-active"` + GracePeriodDays *int `json:"grace-period-days"` } type UpdateWorkflowInstanceRequest struct { - Name *string `json:"name"` - Description *string `json:"description"` - Cadence *string `json:"cadence"` - IsActive *bool `json:"is-active"` + Name *string `json:"name"` + Description *string `json:"description"` + Cadence *string `json:"cadence"` + IsActive *bool `json:"is-active"` + GracePeriodDays *int `json:"grace-period-days"` } type WorkflowInstanceResponse struct { @@ -85,6 +87,7 @@ func (h *WorkflowInstanceHandler) Create(ctx echo.Context) error { SystemSecurityPlanID: &systemId, Cadence: req.Cadence, IsActive: true, // Default to active + GracePeriodDays: req.GracePeriodDays, } if req.IsActive != nil { @@ -231,6 +234,9 @@ func (h *WorkflowInstanceHandler) Update(ctx echo.Context) error { if req.IsActive != nil { instance.IsActive = *req.IsActive } + if req.GracePeriodDays != nil { + instance.GracePeriodDays = req.GracePeriodDays + } if err := h.service.Update(id, instance); err != nil { return h.HandleServiceError(ctx, err, "update", "workflow instance") diff --git a/internal/api/handler/workflows/workflow_instance_integration_test.go b/internal/api/handler/workflows/workflow_instance_integration_test.go index f5a88664..59b83a6e 100644 --- a/internal/api/handler/workflows/workflow_instance_integration_test.go +++ b/internal/api/handler/workflows/workflow_instance_integration_test.go @@ -47,6 +47,7 @@ func TestWorkflowInstanceHandler_Create(t *testing.T) { Description: "Security assessment for production environment", SystemSecurityPlanID: sysId.String(), Cadence: "quarterly", + GracePeriodDays: intPtr(21), } body, err := json.Marshal(reqBody) @@ -70,6 +71,8 @@ func TestWorkflowInstanceHandler_Create(t *testing.T) { assert.Equal(t, sysId, *response.Data.SystemSecurityPlanID) assert.Equal(t, "quarterly", response.Data.Cadence) assert.True(t, response.Data.IsActive) + require.NotNil(t, response.Data.GracePeriodDays) + assert.Equal(t, 21, *response.Data.GracePeriodDays) }) t.Run("ValidationError_MissingName", func(t *testing.T) { @@ -117,6 +120,7 @@ func TestWorkflowInstanceHandler_Create(t *testing.T) { require.NoError(t, err) assert.NotNil(t, response.Data) }) + } func TestWorkflowInstanceHandler_List(t *testing.T) { @@ -295,9 +299,11 @@ func TestWorkflowInstanceHandler_Update(t *testing.T) { t.Run("Success", func(t *testing.T) { newName := "Updated Name" newCadence := "quarterly" + newGrace := 7 reqBody := UpdateWorkflowInstanceRequest{ - Name: &newName, - Cadence: &newCadence, + Name: &newName, + Cadence: &newCadence, + GracePeriodDays: &newGrace, } body, err := json.Marshal(reqBody) @@ -320,6 +326,10 @@ func TestWorkflowInstanceHandler_Update(t *testing.T) { assert.Equal(t, "Updated Name", response.Data.Name) assert.Equal(t, "quarterly", response.Data.Cadence) assert.Equal(t, sspID, *response.Data.SystemSecurityPlanID) // Unchanged + var updated workflows.WorkflowInstance + require.NoError(t, db.First(&updated, "id = ?", instance.ID).Error) + require.NotNil(t, updated.GracePeriodDays) + assert.Equal(t, 7, *updated.GracePeriodDays) }) t.Run("NotFound", func(t *testing.T) { diff --git a/internal/api/handler/workflows/workflow_step_definition.go b/internal/api/handler/workflows/workflow_step_definition.go index 6f0232ba..2d8d7f28 100644 --- a/internal/api/handler/workflows/workflow_step_definition.go +++ b/internal/api/handler/workflows/workflow_step_definition.go @@ -108,6 +108,7 @@ type CreateWorkflowStepDefinitionRequest struct { ResponsibleRole string `json:"responsible-role" validate:"required"` EvidenceRequired []workflows.EvidenceRequirement `json:"evidence-required"` EstimatedDuration int `json:"estimated-duration"` + GracePeriodDays *int `json:"grace-period-days"` DependsOn []string `json:"depends-on"` // Array of step IDs this step depends on } @@ -117,6 +118,7 @@ type UpdateWorkflowStepDefinitionRequest struct { ResponsibleRole *string `json:"responsible-role"` EvidenceRequired *[]workflows.EvidenceRequirement `json:"evidence-required"` EstimatedDuration *int `json:"estimated-duration"` + GracePeriodDays *int `json:"grace-period-days"` DependsOn *[]string `json:"depends-on"` } @@ -155,6 +157,7 @@ func (h *WorkflowStepDefinitionHandler) Create(ctx echo.Context) error { ResponsibleRole: req.ResponsibleRole, EvidenceRequired: req.EvidenceRequired, EstimatedDuration: req.EstimatedDuration, + GracePeriodDays: req.GracePeriodDays, } if err := h.service.Create(stepDef); err != nil { @@ -290,6 +293,9 @@ func (h *WorkflowStepDefinitionHandler) Update(ctx echo.Context) error { if req.EstimatedDuration != nil { stepDef.EstimatedDuration = *req.EstimatedDuration } + if req.GracePeriodDays != nil { + stepDef.GracePeriodDays = req.GracePeriodDays + } if err := h.service.Update(id, stepDef); err != nil { return h.HandleServiceError(ctx, err, "update", "workflow step definition") diff --git a/internal/api/handler/workflows/workflow_step_definition_integration_test.go b/internal/api/handler/workflows/workflow_step_definition_integration_test.go index c5e88f95..9abc6074 100644 --- a/internal/api/handler/workflows/workflow_step_definition_integration_test.go +++ b/internal/api/handler/workflows/workflow_step_definition_integration_test.go @@ -19,6 +19,10 @@ import ( "gorm.io/gorm" ) +func intPtr(v int) *int { + return &v +} + func setupStepTestHandler(t *testing.T) (*WorkflowStepDefinitionHandler, *gorm.DB) { db := setupTestDB(t) logger := zap.NewNop().Sugar() @@ -45,6 +49,7 @@ func TestWorkflowStepDefinitionHandler_Create(t *testing.T) { Name: "Security Review", Description: "Conduct security review", ResponsibleRole: "security_engineer", + GracePeriodDays: intPtr(5), EvidenceRequired: []workflows.EvidenceRequirement{ { Type: "document", @@ -75,6 +80,8 @@ func TestWorkflowStepDefinitionHandler_Create(t *testing.T) { assert.Equal(t, "Security Review", response.Data.Name) assert.Equal(t, "security_engineer", response.Data.ResponsibleRole) assert.Equal(t, 120, response.Data.EstimatedDuration) + require.NotNil(t, response.Data.GracePeriodDays) + assert.Equal(t, 5, *response.Data.GracePeriodDays) }) t.Run("ValidationError_MissingName", func(t *testing.T) { @@ -273,9 +280,11 @@ func TestWorkflowStepDefinitionHandler_Update(t *testing.T) { t.Run("Success", func(t *testing.T) { newName := "Updated Name" newDuration := 90 + newGracePeriod := 3 reqBody := UpdateWorkflowStepDefinitionRequest{ Name: &newName, EstimatedDuration: &newDuration, + GracePeriodDays: &newGracePeriod, } body, err := json.Marshal(reqBody) @@ -297,6 +306,8 @@ func TestWorkflowStepDefinitionHandler_Update(t *testing.T) { require.NoError(t, err) assert.Equal(t, "Updated Name", response.Data.Name) assert.Equal(t, 90, response.Data.EstimatedDuration) + require.NotNil(t, response.Data.GracePeriodDays) + assert.Equal(t, 3, *response.Data.GracePeriodDays) assert.Equal(t, "engineer", response.Data.ResponsibleRole) // Unchanged }) diff --git a/internal/service/relational/workflows/constants.go b/internal/service/relational/workflows/constants.go index ff82a831..8138c26d 100644 --- a/internal/service/relational/workflows/constants.go +++ b/internal/service/relational/workflows/constants.go @@ -107,6 +107,7 @@ type WorkflowExecutionStatus string const ( WorkflowStatusPending WorkflowExecutionStatus = "pending" WorkflowStatusInProgress WorkflowExecutionStatus = "in_progress" + WorkflowStatusOverdue WorkflowExecutionStatus = "overdue" WorkflowStatusCompleted WorkflowExecutionStatus = "completed" WorkflowStatusFailed WorkflowExecutionStatus = "failed" WorkflowStatusCancelled WorkflowExecutionStatus = "cancelled" @@ -115,7 +116,7 @@ const ( // IsValid checks if the workflow execution status is valid func (w WorkflowExecutionStatus) IsValid() bool { switch w { - case WorkflowStatusPending, WorkflowStatusInProgress, WorkflowStatusCompleted, WorkflowStatusFailed, WorkflowStatusCancelled: + case WorkflowStatusPending, WorkflowStatusInProgress, WorkflowStatusOverdue, WorkflowStatusCompleted, WorkflowStatusFailed, WorkflowStatusCancelled: return true } return false @@ -137,6 +138,7 @@ const ( StepStatusPending StepExecutionStatus = "pending" StepStatusBlocked StepExecutionStatus = "blocked" StepStatusInProgress StepExecutionStatus = "in_progress" + StepStatusOverdue StepExecutionStatus = "overdue" StepStatusCompleted StepExecutionStatus = "completed" StepStatusFailed StepExecutionStatus = "failed" StepStatusSkipped StepExecutionStatus = "skipped" @@ -145,7 +147,7 @@ const ( // IsValid checks if the step execution status is valid func (s StepExecutionStatus) IsValid() bool { switch s { - case StepStatusPending, StepStatusBlocked, StepStatusInProgress, StepStatusCompleted, StepStatusFailed, StepStatusSkipped: + case StepStatusPending, StepStatusBlocked, StepStatusInProgress, StepStatusOverdue, StepStatusCompleted, StepStatusFailed, StepStatusSkipped: return true } return false diff --git a/internal/service/relational/workflows/step_execution_service.go b/internal/service/relational/workflows/step_execution_service.go index f57bd689..5af141bb 100644 --- a/internal/service/relational/workflows/step_execution_service.go +++ b/internal/service/relational/workflows/step_execution_service.go @@ -3,8 +3,10 @@ package workflows import ( "context" "errors" + "fmt" "time" + "github.com/compliance-framework/api/internal/config" "github.com/google/uuid" "go.uber.org/zap" "gorm.io/gorm" @@ -23,6 +25,9 @@ type StepExecutionService struct { logger *zap.SugaredLogger } +var ErrInvalidStepExecutionStatusTransition = errors.New("invalid step execution status transition") +var ErrStepExecutionStatusTransitionConflict = errors.New("step execution status transition conflict") + // NewStepExecutionService creates a new StepExecutionService func NewStepExecutionService(db *gorm.DB, evidenceCreator StepEvidenceCreator) *StepExecutionService { return &StepExecutionService{ @@ -89,12 +94,22 @@ func (s *StepExecutionService) UpdateStatus(ctx context.Context, id *uuid.UUID, return err } + var current StepExecution + if err := s.db.WithContext(ctx).Select("status").First(¤t, "id = ?", id).Error; err != nil { + return err + } + if !isValidStepStatusTransition(current.Status, status) { + return fmt.Errorf("%w: %s -> %s", ErrInvalidStepExecutionStatusTransition, current.Status, status) + } + updates := map[string]interface{}{"status": status} now := time.Now() + if err := s.setDueDateIfNeeded(id, ¤t, status, now, updates); err != nil { + return err + } switch status { case "in_progress": updates["started_at"] = now - // Add step started evidence when transitioning to in_progress if s.evidenceCreator != nil { if err := s.evidenceCreator.AddStepStartedEvidence(ctx, id); err != nil { // Log error but don't fail the status update @@ -105,35 +120,98 @@ func (s *StepExecutionService) UpdateStatus(ctx context.Context, id *uuid.UUID, } case "completed": updates["completed_at"] = now + case "overdue": + updates["overdue_at"] = now case "failed": updates["failed_at"] = now } - return s.base.UpdateStatus(&StepExecution{}, id, status, "status", updates) + result := s.db.WithContext(ctx). + Model(&StepExecution{}). + Where("id = ? AND status = ?", id, current.Status). + Updates(updates) + if result.Error != nil { + return result.Error + } + if result.RowsAffected == 0 { + var latest StepExecution + if err := s.db.WithContext(ctx).Select("status").First(&latest, "id = ?", id).Error; err != nil { + return err + } + if !isValidStepStatusTransition(latest.Status, status) { + return fmt.Errorf("%w: %s -> %s", ErrInvalidStepExecutionStatusTransition, latest.Status, status) + } + return fmt.Errorf("%w: %s -> %s", ErrStepExecutionStatusTransitionConflict, current.Status, status) + } + + return nil } -// Start marks a step execution as started -func (s *StepExecutionService) Start(id *uuid.UUID) error { - now := time.Now() - err := s.base.UpdateStatus(&StepExecution{}, id, "in_progress", "status", map[string]interface{}{ - "status": "in_progress", - "started_at": now, - }) +func (s *StepExecutionService) setDueDateIfNeeded( + stepExecutionID *uuid.UUID, + current *StepExecution, + nextStatus string, + now time.Time, + updates map[string]interface{}, +) error { + if current == nil || current.DueDate != nil { + return nil + } + if current.Status != StepStatusBlocked.String() && + nextStatus != StepStatusInProgress.String() { + return nil + } + if nextStatus != StepStatusPending.String() && + nextStatus != StepStatusInProgress.String() { + return nil + } + + dueDate, err := s.resolveStepDueDate(stepExecutionID, now) if err != nil { return err } + if dueDate != nil { + updates["due_date"] = *dueDate + } + return nil +} - // Add step started evidence for in_progress steps - if s.evidenceCreator != nil { - if err := s.evidenceCreator.AddStepStartedEvidence(context.Background(), id); err != nil { - // Log error but don't fail the start - s.logger.Warnw("Failed to create step started evidence", - "step_execution_id", id, - "error", err) - } +func isValidStepStatusTransition(current, next string) bool { + switch current { + case StepStatusPending.String(): + return next == StepStatusPending.String() || + next == StepStatusInProgress.String() || + next == StepStatusCompleted.String() || + next == StepStatusOverdue.String() || + next == StepStatusFailed.String() || + next == StepStatusSkipped.String() + case StepStatusBlocked.String(): + return next == StepStatusBlocked.String() || + next == StepStatusPending.String() || + next == StepStatusOverdue.String() || + next == StepStatusFailed.String() || + next == StepStatusSkipped.String() + case StepStatusInProgress.String(): + return next == StepStatusInProgress.String() || + next == StepStatusCompleted.String() || + next == StepStatusOverdue.String() || + next == StepStatusFailed.String() || + next == StepStatusSkipped.String() + case StepStatusOverdue.String(): + return next == StepStatusOverdue.String() || + next == StepStatusCompleted.String() || + next == StepStatusFailed.String() || + next == StepStatusSkipped.String() + case StepStatusCompleted.String(), StepStatusFailed.String(), StepStatusSkipped.String(): + return next == current || (current == StepStatusCompleted.String() && next == StepStatusFailed.String()) + default: + return false } +} - return nil +// Start marks a step execution as started +func (s *StepExecutionService) Start(id *uuid.UUID) error { + return s.UpdateStatus(context.Background(), id, StepStatusInProgress.String()) } // Complete marks a step execution as completed @@ -164,9 +242,40 @@ func (s *StepExecutionService) Block(id *uuid.UUID) error { // Unblock marks a step execution as unblocked (pending) func (s *StepExecutionService) Unblock(id *uuid.UUID) error { - return s.db.Model(&StepExecution{}). - Where("id = ?", id). - Update("status", "pending").Error + return s.UpdateStatus(context.Background(), id, StepStatusPending.String()) +} + +func (s *StepExecutionService) resolveStepDueDate(stepExecutionID *uuid.UUID, from time.Time) (*time.Time, error) { + type graceResult struct { + StepGrace *int + InstanceGrace *int + DefinitionGrace *int + } + + var grace graceResult + err := s.db.Model(&StepExecution{}). + Select("workflow_step_definitions.grace_period_days AS step_grace, workflow_instances.grace_period_days AS instance_grace, workflow_definitions.grace_period_days AS definition_grace"). + Joins("JOIN workflow_step_definitions ON workflow_step_definitions.id = step_executions.workflow_step_definition_id"). + Joins("JOIN workflow_executions ON workflow_executions.id = step_executions.workflow_execution_id"). + Joins("JOIN workflow_instances ON workflow_instances.id = workflow_executions.workflow_instance_id"). + Joins("JOIN workflow_definitions ON workflow_definitions.id = workflow_instances.workflow_definition_id"). + Where("step_executions.id = ?", stepExecutionID). + Take(&grace).Error + if err != nil { + return nil, err + } + + graceDays := config.DefaultWorkflowConfig().GracePeriodDays + if grace.StepGrace != nil { + graceDays = *grace.StepGrace + } else if grace.InstanceGrace != nil { + graceDays = *grace.InstanceGrace + } else if grace.DefinitionGrace != nil { + graceDays = *grace.DefinitionGrace + } + + dueDate := from.AddDate(0, 0, graceDays) + return &dueDate, nil } // AssignTo assigns a step execution to a user or group @@ -215,7 +324,7 @@ func (s *StepExecutionService) GetCompletedSteps(executionID *uuid.UUID) ([]Step func (s *StepExecutionService) GetAssignedSteps(assignedToType, assignedToID string) ([]StepExecution, error) { var stepExecutions []StepExecution err := s.db.Where("assigned_to_type = ? AND assigned_to_id = ? AND status IN ?", - assignedToType, assignedToID, []string{"pending", "in_progress", "blocked"}). + assignedToType, assignedToID, []string{"pending", "in_progress", "blocked", "overdue"}). Preload("WorkflowExecution"). Preload("WorkflowExecution.WorkflowInstance"). Preload("WorkflowStepDefinition"). @@ -244,7 +353,7 @@ func (s *StepExecutionService) GetMyAssignments(userID, userEmail string, filter Joins("JOIN workflow_instances ON workflow_instances.id = workflow_executions.workflow_instance_id"). Where("((step_executions.assigned_to_type = ? AND step_executions.assigned_to_id = ?) OR (step_executions.assigned_to_type = ? AND step_executions.assigned_to_id = ?))", "user", userID, "email", userEmail). - Where("workflow_executions.status IN ?", []string{"pending", "in_progress"}) + Where("workflow_executions.status IN ?", []string{"pending", "in_progress", "overdue"}) // Apply step status filter only when explicitly specified; otherwise rely on workflow execution status filter above. if filter.Status != "" { diff --git a/internal/service/relational/workflows/step_execution_service_test.go b/internal/service/relational/workflows/step_execution_service_test.go index 9fd31f5a..d6b70af1 100644 --- a/internal/service/relational/workflows/step_execution_service_test.go +++ b/internal/service/relational/workflows/step_execution_service_test.go @@ -107,10 +107,12 @@ func TestStepExecutionService_GetByWorkflowExecutionID(t *testing.T) { stepDef := createTestWorkflowStepDefinition(workflowDef.ID) require.NoError(t, db.Create(stepDef).Error) + stepDef2 := createTestWorkflowStepDefinition(workflowDef.ID) + require.NoError(t, db.Create(stepDef2).Error) // Create multiple step executions stepExec1 := createTestStepExecution(execution.ID, stepDef.ID) - stepExec2 := createTestStepExecution(execution.ID, stepDef.ID) + stepExec2 := createTestStepExecution(execution.ID, stepDef2.ID) require.NoError(t, db.Create(stepExec1).Error) require.NoError(t, db.Create(stepExec2).Error) @@ -196,6 +198,8 @@ func TestStepExecutionService_UpdateStatus(t *testing.T) { stepDef := createTestWorkflowStepDefinition(workflowDef.ID) require.NoError(t, db.Create(stepDef).Error) + stepDef2 := createTestWorkflowStepDefinition(workflowDef.ID) + require.NoError(t, db.Create(stepDef2).Error) // Create a test step execution stepExecution := createTestStepExecution(execution.ID, stepDef.ID) @@ -233,6 +237,20 @@ func TestStepExecutionService_UpdateStatus(t *testing.T) { err = service.UpdateStatus(context.Background(), stepExecution.ID, "invalid_status") assert.Error(t, err) assert.Contains(t, err.Error(), "invalid status") + + // Test overdue -> completed transition + stepExecution2 := createTestStepExecution(execution.ID, stepDef2.ID) + require.NoError(t, db.Create(stepExecution2).Error) + err = service.UpdateStatus(context.Background(), stepExecution2.ID, "overdue") + require.NoError(t, err) + err = service.UpdateStatus(context.Background(), stepExecution2.ID, "completed") + require.NoError(t, err) + var updatedOverdue StepExecution + err = db.First(&updatedOverdue, stepExecution2.ID).Error + require.NoError(t, err) + assert.Equal(t, "completed", updatedOverdue.Status) + assert.NotNil(t, updatedOverdue.OverdueAt) + assert.NotNil(t, updatedOverdue.CompletedAt) } // TestStepExecutionService_Start tests the Start method @@ -252,6 +270,10 @@ func TestStepExecutionService_Start(t *testing.T) { stepDef := createTestWorkflowStepDefinition(workflowDef.ID) require.NoError(t, db.Create(stepDef).Error) + stepDef2 := createTestWorkflowStepDefinition(workflowDef.ID) + require.NoError(t, db.Create(stepDef2).Error) + stepDef3 := createTestWorkflowStepDefinition(workflowDef.ID) + require.NoError(t, db.Create(stepDef3).Error) // Create a test step execution stepExecution := createTestStepExecution(execution.ID, stepDef.ID) @@ -406,6 +428,38 @@ func TestStepExecutionService_Unblock(t *testing.T) { assert.Equal(t, "pending", unblocked.Status) } +func TestStepExecutionService_UnblockSetsDueDateWhenMissing(t *testing.T) { + db := setupTestDB(t) + service := NewStepExecutionService(db, nil) + + workflowDef := createTestWorkflowDefinition() + require.NoError(t, db.Create(workflowDef).Error) + + instance := createTestWorkflowInstance(workflowDef.ID) + require.NoError(t, db.Create(instance).Error) + + execution := createTestWorkflowExecution(instance.ID) + require.NoError(t, db.Create(execution).Error) + + stepDef := createTestWorkflowStepDefinition(workflowDef.ID) + zero := 0 + stepDef.GracePeriodDays = &zero + require.NoError(t, db.Create(stepDef).Error) + + stepExecution := createTestStepExecution(execution.ID, stepDef.ID) + stepExecution.Status = StepStatusBlocked.String() + stepExecution.DueDate = nil + require.NoError(t, db.Create(stepExecution).Error) + + err := service.Unblock(stepExecution.ID) + require.NoError(t, err) + + var updated StepExecution + require.NoError(t, db.First(&updated, stepExecution.ID).Error) + assert.Equal(t, StepStatusPending.String(), updated.Status) + require.NotNil(t, updated.DueDate) +} + // TestStepExecutionService_AssignTo tests the AssignTo method func TestStepExecutionService_AssignTo(t *testing.T) { db := setupTestDB(t) @@ -460,11 +514,15 @@ func TestStepExecutionService_GetPendingSteps(t *testing.T) { stepDef := createTestWorkflowStepDefinition(workflowDef.ID) require.NoError(t, db.Create(stepDef).Error) + stepDef2 := createTestWorkflowStepDefinition(workflowDef.ID) + require.NoError(t, db.Create(stepDef2).Error) + stepDef3 := createTestWorkflowStepDefinition(workflowDef.ID) + require.NoError(t, db.Create(stepDef3).Error) // Create step executions with different statuses pendingStep1 := createTestStepExecution(execution.ID, stepDef.ID) - pendingStep2 := createTestStepExecution(execution.ID, stepDef.ID) - inProgressStep := createTestStepExecution(execution.ID, stepDef.ID) + pendingStep2 := createTestStepExecution(execution.ID, stepDef2.ID) + inProgressStep := createTestStepExecution(execution.ID, stepDef3.ID) inProgressStep.Status = "in_progress" require.NoError(t, db.Create(pendingStep1).Error) @@ -500,13 +558,17 @@ func TestStepExecutionService_GetBlockedSteps(t *testing.T) { stepDef := createTestWorkflowStepDefinition(workflowDef.ID) require.NoError(t, db.Create(stepDef).Error) + stepDef2 := createTestWorkflowStepDefinition(workflowDef.ID) + require.NoError(t, db.Create(stepDef2).Error) + stepDef3 := createTestWorkflowStepDefinition(workflowDef.ID) + require.NoError(t, db.Create(stepDef3).Error) // Create step executions with different statuses blockedStep1 := createTestStepExecution(execution.ID, stepDef.ID) blockedStep1.Status = "blocked" - blockedStep2 := createTestStepExecution(execution.ID, stepDef.ID) + blockedStep2 := createTestStepExecution(execution.ID, stepDef2.ID) blockedStep2.Status = "blocked" - pendingStep := createTestStepExecution(execution.ID, stepDef.ID) + pendingStep := createTestStepExecution(execution.ID, stepDef3.ID) require.NoError(t, db.Create(blockedStep1).Error) require.NoError(t, db.Create(blockedStep2).Error) @@ -541,13 +603,17 @@ func TestStepExecutionService_GetCompletedSteps(t *testing.T) { stepDef := createTestWorkflowStepDefinition(workflowDef.ID) require.NoError(t, db.Create(stepDef).Error) + stepDef2 := createTestWorkflowStepDefinition(workflowDef.ID) + require.NoError(t, db.Create(stepDef2).Error) + stepDef3 := createTestWorkflowStepDefinition(workflowDef.ID) + require.NoError(t, db.Create(stepDef3).Error) // Create step executions with different statuses completedStep1 := createTestStepExecution(execution.ID, stepDef.ID) completedStep1.Status = "completed" - completedStep2 := createTestStepExecution(execution.ID, stepDef.ID) + completedStep2 := createTestStepExecution(execution.ID, stepDef2.ID) completedStep2.Status = "completed" - pendingStep := createTestStepExecution(execution.ID, stepDef.ID) + pendingStep := createTestStepExecution(execution.ID, stepDef3.ID) require.NoError(t, db.Create(completedStep1).Error) require.NoError(t, db.Create(completedStep2).Error) @@ -582,17 +648,21 @@ func TestStepExecutionService_GetAssignedSteps(t *testing.T) { stepDef := createTestWorkflowStepDefinition(workflowDef.ID) require.NoError(t, db.Create(stepDef).Error) + stepDef2 := createTestWorkflowStepDefinition(workflowDef.ID) + require.NoError(t, db.Create(stepDef2).Error) + stepDef3 := createTestWorkflowStepDefinition(workflowDef.ID) + require.NoError(t, db.Create(stepDef3).Error) // Create step executions with different assignments assignedStep1 := createTestStepExecution(execution.ID, stepDef.ID) assignedStep1.AssignedToType = "user" assignedStep1.AssignedToID = "user123" - assignedStep2 := createTestStepExecution(execution.ID, stepDef.ID) + assignedStep2 := createTestStepExecution(execution.ID, stepDef2.ID) assignedStep2.AssignedToType = "user" assignedStep2.AssignedToID = "user123" - unassignedStep := createTestStepExecution(execution.ID, stepDef.ID) + unassignedStep := createTestStepExecution(execution.ID, stepDef3.ID) require.NoError(t, db.Create(assignedStep1).Error) require.NoError(t, db.Create(assignedStep2).Error) @@ -697,11 +767,13 @@ func TestStepExecutionService_GetUnblockableSteps(t *testing.T) { stepDef := createTestWorkflowStepDefinition(workflowDef.ID) require.NoError(t, db.Create(stepDef).Error) + stepDef2 := createTestWorkflowStepDefinition(workflowDef.ID) + require.NoError(t, db.Create(stepDef2).Error) // Create blocked step executions blockedStep1 := createTestStepExecution(execution.ID, stepDef.ID) blockedStep1.Status = "blocked" - blockedStep2 := createTestStepExecution(execution.ID, stepDef.ID) + blockedStep2 := createTestStepExecution(execution.ID, stepDef2.ID) blockedStep2.Status = "blocked" require.NoError(t, db.Create(blockedStep1).Error) diff --git a/internal/service/relational/workflows/workflow_definition.go b/internal/service/relational/workflows/workflow_definition.go index ea6aed19..083e489e 100644 --- a/internal/service/relational/workflows/workflow_definition.go +++ b/internal/service/relational/workflows/workflow_definition.go @@ -24,7 +24,7 @@ type WorkflowDefinition struct { // Workflow Configuration SuggestedCadence string `gorm:"size:50" json:"suggested_cadence"` // daily, weekly, monthly, quarterly, annually EvidenceRequired string `gorm:"type:text" json:"evidence_required"` // JSON array of required evidence types - GracePeriodDays *int `json:"grace_period_days,omitempty"` // Override global default if set + GracePeriodDays *int `json:"grace-period-days,omitempty"` // Override global default if set // Audit Fields CreatedByID *uuid.UUID `gorm:"index" json:"created_by_id,omitempty"` diff --git a/internal/service/relational/workflows/workflow_definition_service.go b/internal/service/relational/workflows/workflow_definition_service.go index 230f022a..bc0f2162 100644 --- a/internal/service/relational/workflows/workflow_definition_service.go +++ b/internal/service/relational/workflows/workflow_definition_service.go @@ -109,6 +109,9 @@ func (s *WorkflowDefinitionService) ValidateDefinition(definition *WorkflowDefin if definition == nil { return errors.New("workflow definition cannot be nil") } + if definition.GracePeriodDays != nil && *definition.GracePeriodDays < 0 { + return errors.New("grace period days must be non-negative") + } return CombineErrors( ValidateStringRequired(definition.Name, "workflow definition name"), diff --git a/internal/service/relational/workflows/workflow_execution.go b/internal/service/relational/workflows/workflow_execution.go index 8688a8bd..fda6a78f 100644 --- a/internal/service/relational/workflows/workflow_execution.go +++ b/internal/service/relational/workflows/workflow_execution.go @@ -18,6 +18,7 @@ type WorkflowExecution struct { // Execution Information Status string `gorm:"size:50;index:idx_workflow_exec_instance_status,priority:2;index:idx_workflow_exec_status_created" json:"status"` // pending, in_progress, completed, failed, cancelled StartedAt *time.Time `json:"started-at,omitempty"` + OverdueAt *time.Time `json:"overdue-at,omitempty"` CompletedAt *time.Time `json:"completed-at,omitempty"` FailedAt *time.Time `json:"failed-at,omitempty"` FailureReason string `gorm:"type:text" json:"failure_reason,omitempty"` @@ -57,6 +58,8 @@ type StepExecution struct { // Execution Information Status string `gorm:"size:50;index:idx_step_exec_workflow_status,priority:2;index:idx_step_exec_step_status,priority:2" json:"status"` // pending, blocked, in_progress, completed, failed, skipped StartedAt *time.Time `json:"started-at,omitempty"` + OverdueAt *time.Time `json:"overdue-at,omitempty"` + DueDate *time.Time `gorm:"index" json:"due_date,omitempty"` CompletedAt *time.Time `json:"completed-at,omitempty"` FailedAt *time.Time `json:"failed-at,omitempty"` FailureReason string `gorm:"type:text" json:"failure_reason,omitempty"` @@ -67,8 +70,8 @@ type StepExecution struct { AssignedAt *time.Time `json:"assigned-at,omitempty"` // Foreign Keys - WorkflowExecutionID *uuid.UUID `gorm:"not null;index;index:idx_step_exec_workflow_status,priority:1" json:"workflow_execution_id"` - WorkflowStepDefinitionID *uuid.UUID `gorm:"not null;index;index:idx_step_exec_step_status,priority:1" json:"workflow_step_definition_id"` + WorkflowExecutionID *uuid.UUID `gorm:"not null;index;index:idx_step_exec_workflow_status,priority:1;uniqueIndex:uidx_step_exec_workflow_step,priority:1" json:"workflow_execution_id"` + WorkflowStepDefinitionID *uuid.UUID `gorm:"not null;index;index:idx_step_exec_step_status,priority:1;uniqueIndex:uidx_step_exec_workflow_step,priority:2" json:"workflow_step_definition_id"` // Relationships WorkflowExecution *WorkflowExecution `gorm:"foreignKey:WorkflowExecutionID" json:"workflow_execution,omitempty"` diff --git a/internal/service/relational/workflows/workflow_execution_service.go b/internal/service/relational/workflows/workflow_execution_service.go index f516f7f1..91e43b01 100644 --- a/internal/service/relational/workflows/workflow_execution_service.go +++ b/internal/service/relational/workflows/workflow_execution_service.go @@ -3,6 +3,7 @@ package workflows import ( "context" "errors" + "fmt" "time" "github.com/google/uuid" @@ -23,6 +24,9 @@ type WorkflowExecutionService struct { logger *zap.SugaredLogger } +var ErrInvalidWorkflowExecutionStatusTransition = errors.New("invalid workflow execution status transition") +var ErrWorkflowExecutionStatusTransitionConflict = errors.New("workflow execution status transition conflict") + // NewWorkflowExecutionService creates a new WorkflowExecutionService func NewWorkflowExecutionService(db *gorm.DB) *WorkflowExecutionService { return &WorkflowExecutionService{ @@ -129,14 +133,21 @@ func (s *WorkflowExecutionService) UpdateStatus(ctx context.Context, id *uuid.UU return err } + var current WorkflowExecution + if err := s.db.WithContext(ctx).Select("status").First(¤t, "id = ?", id).Error; err != nil { + return err + } + if !isValidExecutionStatusTransition(current.Status, status) { + return fmt.Errorf("%w: %s -> %s", ErrInvalidWorkflowExecutionStatusTransition, current.Status, status) + } + updates := map[string]interface{}{"status": status} now := time.Now() switch status { case "in_progress": - // Add workflow execution started evidence when transitioning to in_progress (while still pending) + // Keep behavior: create started evidence while execution is still pending. if s.evidenceCreator != nil { if err := s.evidenceCreator.AddWorkflowExecutionEvidence(ctx, id, "started"); err != nil { - // Log error but don't fail the status update s.logger.Warnw("Failed to create workflow execution started evidence", "workflow_execution_id", id, "error", err) @@ -144,20 +155,66 @@ func (s *WorkflowExecutionService) UpdateStatus(ctx context.Context, id *uuid.UU } updates["started_at"] = now case "completed": - updates["completed_at"] = now + // Keep behavior: create completed evidence prior to status update. if s.evidenceCreator != nil { if err := s.evidenceCreator.AddWorkflowExecutionEvidence(ctx, id, "completed"); err != nil { - // Log error but don't fail the status update s.logger.Warnw("Failed to create workflow execution completed evidence", "workflow_execution_id", id, "error", err) } } + updates["completed_at"] = now + case "overdue": + updates["overdue_at"] = now case "failed": updates["failed_at"] = now } - return s.base.UpdateStatus(&WorkflowExecution{}, id, status, "status", updates) + result := s.db.WithContext(ctx). + Model(&WorkflowExecution{}). + Where("id = ? AND status = ?", id, current.Status). + Updates(updates) + if result.Error != nil { + return result.Error + } + if result.RowsAffected == 0 { + var latest WorkflowExecution + if err := s.db.WithContext(ctx).Select("status").First(&latest, "id = ?", id).Error; err != nil { + return err + } + if !isValidExecutionStatusTransition(latest.Status, status) { + return fmt.Errorf("%w: %s -> %s", ErrInvalidWorkflowExecutionStatusTransition, latest.Status, status) + } + return fmt.Errorf("%w: %s -> %s", ErrWorkflowExecutionStatusTransitionConflict, current.Status, status) + } + + return nil +} + +func isValidExecutionStatusTransition(current, next string) bool { + switch current { + case WorkflowStatusPending.String(): + return next == WorkflowStatusPending.String() || + next == WorkflowStatusInProgress.String() || + next == WorkflowStatusOverdue.String() || + next == WorkflowStatusCancelled.String() || + next == WorkflowStatusFailed.String() + case WorkflowStatusInProgress.String(): + return next == WorkflowStatusInProgress.String() || + next == WorkflowStatusCompleted.String() || + next == WorkflowStatusFailed.String() || + next == WorkflowStatusOverdue.String() || + next == WorkflowStatusCancelled.String() + case WorkflowStatusOverdue.String(): + return next == WorkflowStatusOverdue.String() || + next == WorkflowStatusFailed.String() || + next == WorkflowStatusCompleted.String() || + next == WorkflowStatusCancelled.String() + case WorkflowStatusCompleted.String(), WorkflowStatusFailed.String(), WorkflowStatusCancelled.String(): + return next == current || (current == WorkflowStatusCompleted.String() && next == WorkflowStatusFailed.String()) + default: + return false + } } // Start marks a workflow execution as started diff --git a/internal/service/relational/workflows/workflow_execution_service_test.go b/internal/service/relational/workflows/workflow_execution_service_test.go index 862bb1d4..8f911dee 100644 --- a/internal/service/relational/workflows/workflow_execution_service_test.go +++ b/internal/service/relational/workflows/workflow_execution_service_test.go @@ -335,6 +335,25 @@ func TestWorkflowExecutionService_UpdateStatus(t *testing.T) { err = service.UpdateStatus(context.Background(), execution.ID, "invalid_status") assert.Error(t, err) assert.Contains(t, err.Error(), "invalid status") + + // Test overdue transition + execution2 := createTestWorkflowExecution(instance.ID) + require.NoError(t, db.Create(execution2).Error) + err = service.UpdateStatus(context.Background(), execution2.ID, "overdue") + require.NoError(t, err) + var updatedOverdue WorkflowExecution + err = db.First(&updatedOverdue, execution2.ID).Error + require.NoError(t, err) + assert.Equal(t, "overdue", updatedOverdue.Status) + assert.NotNil(t, updatedOverdue.OverdueAt) + + // Test overdue -> completed transition + err = service.UpdateStatus(context.Background(), execution2.ID, "completed") + require.NoError(t, err) + err = db.First(&updatedOverdue, execution2.ID).Error + require.NoError(t, err) + assert.Equal(t, "completed", updatedOverdue.Status) + assert.NotNil(t, updatedOverdue.CompletedAt) } // TestWorkflowExecutionService_Start tests the Start method diff --git a/internal/service/relational/workflows/workflow_instance.go b/internal/service/relational/workflows/workflow_instance.go index 6d0ffffe..f1cf96b1 100644 --- a/internal/service/relational/workflows/workflow_instance.go +++ b/internal/service/relational/workflows/workflow_instance.go @@ -23,7 +23,7 @@ type WorkflowInstance struct { // Instance Configuration Cadence string `gorm:"size:50" json:"cadence"` // daily, weekly, monthly, quarterly, annually IsActive bool `gorm:"default:true" json:"is_active"` - GracePeriodDays *int `json:"grace_period_days,omitempty"` // Override definition/global default if set + GracePeriodDays *int `json:"grace-period-days,omitempty"` // Override definition/global default if set // Scheduling NextScheduledAt *time.Time `json:"next-scheduled-at,omitempty"` diff --git a/internal/service/relational/workflows/workflow_instance_service.go b/internal/service/relational/workflows/workflow_instance_service.go index b258f10e..912c76b4 100644 --- a/internal/service/relational/workflows/workflow_instance_service.go +++ b/internal/service/relational/workflows/workflow_instance_service.go @@ -206,6 +206,9 @@ func (s *WorkflowInstanceService) ValidateInstance(instance *WorkflowInstance) e if instance == nil { return errors.New("workflow instance cannot be nil") } + if instance.GracePeriodDays != nil && *instance.GracePeriodDays < 0 { + return errors.New("grace period days must be non-negative") + } return CombineErrors( ValidateStringRequired(instance.Name, "instance name"), diff --git a/internal/service/relational/workflows/workflow_step_definition.go b/internal/service/relational/workflows/workflow_step_definition.go index 6ec42b4d..3b883059 100644 --- a/internal/service/relational/workflows/workflow_step_definition.go +++ b/internal/service/relational/workflows/workflow_step_definition.go @@ -26,6 +26,7 @@ type WorkflowStepDefinition struct { ResponsibleRole string `gorm:"not null;size:255" json:"responsible_role"` // Role responsible for this step EvidenceRequired datatypes.JSONSlice[EvidenceRequirement] `json:"evidence_required"` // JSON array of required evidence types EstimatedDuration int `gorm:"default:0" json:"estimated_duration"` // Estimated duration in minutes + GracePeriodDays *int `json:"grace-period-days,omitempty"` // Override default grace for this specific step // Foreign Keys WorkflowDefinitionID *uuid.UUID `gorm:"not null;index" json:"workflow_definition_id"` diff --git a/internal/service/relational/workflows/workflow_step_definition_service.go b/internal/service/relational/workflows/workflow_step_definition_service.go index e2b9ffd3..814f9e51 100644 --- a/internal/service/relational/workflows/workflow_step_definition_service.go +++ b/internal/service/relational/workflows/workflow_step_definition_service.go @@ -188,6 +188,9 @@ func (s *WorkflowStepDefinitionService) ValidateStep(step *WorkflowStepDefinitio if step.WorkflowDefinitionID == nil { return errors.New("workflow definition ID is required") } + if step.GracePeriodDays != nil && *step.GracePeriodDays < 0 { + return errors.New("grace period days must be non-negative") + } return nil } diff --git a/internal/service/worker/service.go b/internal/service/worker/service.go index 36fb1a56..0d19495e 100644 --- a/internal/service/worker/service.go +++ b/internal/service/worker/service.go @@ -170,13 +170,24 @@ func NewServiceWithDigest( // Determine grace period days for the workflow scheduler, with safe defaults. gracePeriodDays := config.DefaultWorkflowConfig().GracePeriodDays + overdueCheckEnabled := config.DefaultWorkflowConfig().OverdueCheckEnabled if digestCfg != nil && digestCfg.Workflow != nil { gracePeriodDays = digestCfg.Workflow.GracePeriodDays + overdueCheckEnabled = digestCfg.Workflow.OverdueCheckEnabled } schedulerWorker := workflow.NewWorkflowSchedulerWorker( workflowManager, workflowInstService, + workflow.NewOverdueService( + db, + workflowExecService, + stepExecService, + evidenceIntegration, + logger, + gracePeriodDays, + ), + overdueCheckEnabled, logger, gracePeriodDays, ) diff --git a/internal/workflow/assignment.go b/internal/workflow/assignment.go index 7a6376f0..bb376880 100644 --- a/internal/workflow/assignment.go +++ b/internal/workflow/assignment.go @@ -221,7 +221,8 @@ func (s *AssignmentService) BulkReassignByRole( func isReassignableStatus(status string) bool { return status == workflows.StepStatusPending.String() || status == workflows.StepStatusBlocked.String() || - status == workflows.StepStatusInProgress.String() + status == workflows.StepStatusInProgress.String() || + status == workflows.StepStatusOverdue.String() } func (s *AssignmentService) validateAssignee(assignee Assignee) error { diff --git a/internal/workflow/assignment_test.go b/internal/workflow/assignment_test.go index b32cd8c8..2c290b33 100644 --- a/internal/workflow/assignment_test.go +++ b/internal/workflow/assignment_test.go @@ -274,6 +274,31 @@ func TestReassignStep_RejectsInvalidStatus(t *testing.T) { } } +func TestReassignStep_AllowsOverdueStatus(t *testing.T) { + db := setupAssignmentServiceTestDB(t) + roleService := new(MockRoleAssignmentService) + service := NewAssignmentService(roleService, db) + + _, _, stepExec := createAssignmentServiceGraph(t, db) + stepExec.Status = workflows.StepStatusOverdue.String() + require.NoError(t, db.Save(stepExec).Error) + + err := service.ReassignStep( + context.Background(), + *stepExec.ID, + Assignee{Type: "group", ID: "new-group"}, + "", + nil, + "actor@example.com", + ) + require.NoError(t, err) + + var updated workflows.StepExecution + require.NoError(t, db.First(&updated, stepExec.ID).Error) + assert.Equal(t, "group", updated.AssignedToType) + assert.Equal(t, "new-group", updated.AssignedToID) +} + func TestReassignStep_RejectsInvalidAssigneeAndMissingUser(t *testing.T) { db := setupAssignmentServiceTestDB(t) roleService := new(MockRoleAssignmentService) diff --git a/internal/workflow/constants.go b/internal/workflow/constants.go index e4135547..0bc4c421 100644 --- a/internal/workflow/constants.go +++ b/internal/workflow/constants.go @@ -26,6 +26,7 @@ const ( StatusPending StepStatus = "pending" StatusBlocked StepStatus = "blocked" StatusInProgress StepStatus = "in_progress" + StatusOverdue StepStatus = "overdue" StatusCompleted StepStatus = "completed" StatusFailed StepStatus = "failed" StatusSkipped StepStatus = "skipped" @@ -35,7 +36,7 @@ const ( // IsValid checks if the step status is valid func (s StepStatus) IsValid() bool { switch s { - case StatusPending, StatusBlocked, StatusInProgress, StatusCompleted, StatusFailed, StatusSkipped, StatusCancelled: + case StatusPending, StatusBlocked, StatusInProgress, StatusOverdue, StatusCompleted, StatusFailed, StatusSkipped, StatusCancelled: return true } return false diff --git a/internal/workflow/evidence.go b/internal/workflow/evidence.go index 17bbc895..c6fe7e52 100644 --- a/internal/workflow/evidence.go +++ b/internal/workflow/evidence.go @@ -5,6 +5,8 @@ import ( "crypto/sha256" "encoding/hex" "fmt" + "sort" + "strings" "time" "github.com/compliance-framework/api/internal/service/relational" @@ -508,6 +510,146 @@ func (e *EvidenceIntegration) AddExecutionCompletionEvidence(ctx context.Context return nil } +// AddExecutionFailureEvidence adds workflow execution failure evidence to both execution and instance streams. +func (e *EvidenceIntegration) AddExecutionFailureEvidence(ctx context.Context, workflowExecutionID *uuid.UUID) error { + execution, err := e.workflowExecutionSvc.GetByID(workflowExecutionID) + if err != nil { + return fmt.Errorf("failed to get workflow execution: %w", err) + } + if execution.Status != workflows.WorkflowStatusFailed.String() { + return fmt.Errorf("workflow execution is not failed, status: %s", execution.Status) + } + + stepExecutions, err := e.stepExecutionSvc.GetByWorkflowExecutionID(workflowExecutionID) + if err != nil { + return fmt.Errorf("failed to get step executions: %w", err) + } + + completedCount := 0 + failedCount := 0 + overdueCount := 0 + unresolvedAssignees := make(map[string]struct{}) + for _, step := range stepExecutions { + switch step.Status { + case workflows.StepStatusCompleted.String(): + completedCount++ + case workflows.StepStatusOverdue.String(): + overdueCount++ + case workflows.StepStatusFailed.String(): + failedCount++ + } + + if step.Status != workflows.StepStatusCompleted.String() { + key := step.AssignedToType + ":" + step.AssignedToID + if step.AssignedToID != "" { + unresolvedAssignees[key] = struct{}{} + } + } + } + + assignees := make([]string, 0, len(unresolvedAssignees)) + for key := range unresolvedAssignees { + assignees = append(assignees, key) + } + sort.Strings(assignees) + + const failureDescriptionTemplate = `Workflow Execution Failed +Execution ID: %s +Started: %s +Failed: %s +Completed Steps: %d +Failed Steps: %d +Overdue Steps: %d +Unresolved Assignees: %s` + + description := fmt.Sprintf( + failureDescriptionTemplate, + execution.ID, + formatOptionalTime(execution.StartedAt), + formatOptionalTime(execution.FailedAt), + completedCount, + failedCount, + overdueCount, + strings.Join(assignees, ","), + ) + + if err := e.addFailureEvidenceToStream(ctx, execution, description, completedCount, failedCount, overdueCount, assignees, true); err != nil { + return err + } + if err := e.addFailureEvidenceToStream(ctx, execution, description, completedCount, failedCount, overdueCount, assignees, false); err != nil { + return err + } + + e.logger.Infow("Execution failure evidence added", + "workflow_execution_id", workflowExecutionID, + "failed_steps", failedCount, + "overdue_steps", overdueCount, + ) + return nil +} + +func (e *EvidenceIntegration) addFailureEvidenceToStream( + ctx context.Context, + execution *workflows.WorkflowExecution, + description string, + completedCount int, + failedCount int, + overdueCount int, + unresolvedAssignees []string, + executionStream bool, +) error { + var stream *relational.Evidence + var err error + if executionStream { + stream, err = e.GetOrCreateExecutionStream(ctx, execution.ID) + } else { + stream, err = e.GetOrCreateInstanceStream(ctx, execution.WorkflowInstanceID) + } + if err != nil { + return err + } + + evidence := &relational.Evidence{ + UUID: stream.UUID, + Title: "Workflow Execution Failed", + Description: description, + Start: nowOrValue(execution.StartedAt), + End: nowOrValue(execution.FailedAt), + Status: datatypes.NewJSONType[oscalTypes_1_1_3.ObjectiveStatus](oscalTypes_1_1_3.ObjectiveStatus{State: "not-satisfied"}), + } + id := uuid.New() + evidence.ID = &id + if err := e.db.Create(evidence).Error; err != nil { + return err + } + + labels := []relational.Labels{ + {Name: "workflow.execution.id", Value: execution.ID.String()}, + {Name: "workflow.execution.status", Value: execution.Status}, + {Name: "evidence.type", Value: "execution_failure"}, + {Name: "workflow.failed_steps", Value: fmt.Sprintf("%d", failedCount)}, + {Name: "workflow.overdue_steps", Value: fmt.Sprintf("%d", overdueCount)}, + {Name: "workflow.completed_steps", Value: fmt.Sprintf("%d", completedCount)}, + {Name: "workflow.unresolved_assignees", Value: strings.Join(unresolvedAssignees, ",")}, + } + + return e.db.Model(evidence).Association("Labels").Append(labels) +} + +func formatOptionalTime(ts *time.Time) string { + if ts == nil { + return "unknown" + } + return ts.Format(time.RFC3339) +} + +func nowOrValue(ts *time.Time) time.Time { + if ts == nil { + return time.Now() + } + return *ts +} + // generateExecutionStreamUUID generates a deterministic UUID for an execution stream based on labels func (e *EvidenceIntegration) generateExecutionStreamUUID( definition *workflows.WorkflowDefinition, diff --git a/internal/workflow/executor.go b/internal/workflow/executor.go index dd8bedb6..1428a3f3 100644 --- a/internal/workflow/executor.go +++ b/internal/workflow/executor.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/compliance-framework/api/internal/config" "github.com/compliance-framework/api/internal/service/relational/workflows" "github.com/google/uuid" ) @@ -139,6 +140,12 @@ func (e *DAGExecutor) InitializeWorkflow(ctx context.Context, workflowExecutionI if err != nil { return fmt.Errorf("failed to get workflow execution: %w", err) } + // Early guard to avoid unnecessary work when initialization is no longer valid. + // We intentionally re-check this again later (after DB writes) to protect against races. + if workflowExecution.Status != StatusPending.String() { + e.logger.Printf("Skipping workflow initialization for execution %s in status %s", workflowExecutionID.String(), workflowExecution.Status) + return nil + } // Get all step definitions for this workflow stepDefinitions, err := e.stepDefinitionService.GetByWorkflowDefinitionID(workflowExecution.WorkflowInstance.WorkflowDefinitionID) @@ -161,8 +168,29 @@ func (e *DAGExecutor) InitializeWorkflow(ctx context.Context, workflowExecutionI } } + retryCompletedSteps := e.resolveRetryCompletedStepDefinitions(workflowExecution) + existingStepExecutions, err := e.stepExecutionService.GetByWorkflowExecutionID(workflowExecutionID) + if err != nil { + return fmt.Errorf("failed to get existing step executions: %w", err) + } + existingByStepDef := make(map[uuid.UUID]workflows.StepExecution, len(existingStepExecutions)) + for _, existing := range existingStepExecutions { + if existing.WorkflowStepDefinitionID != nil { + existingByStepDef[*existing.WorkflowStepDefinitionID] = existing + } + } + // Create step execution records for each step definition + executionStart := time.Now() + if workflowExecution.StartedAt != nil { + executionStart = *workflowExecution.StartedAt + } + for _, stepDef := range stepDefinitions { + if _, exists := existingByStepDef[*stepDef.ID]; exists { + continue + } + // Get dependencies for this step dependencies, _ := e.stepDefinitionService.GetDependencies(stepDef.ID) @@ -171,12 +199,20 @@ func (e *DAGExecutor) InitializeWorkflow(ctx context.Context, workflowExecutionI if len(dependencies) > 0 { initialStatus = StatusBlocked.String() } + if retryCompletedSteps[*stepDef.ID] { + initialStatus = StatusCompleted.String() + } stepExecution := &workflows.StepExecution{ WorkflowExecutionID: workflowExecutionID, WorkflowStepDefinitionID: stepDef.ID, Status: initialStatus, } + if initialStatus != StatusBlocked.String() { + graceDays := resolveStepGraceDays(workflowExecution, stepDef) + dueDate := executionStart.AddDate(0, 0, graceDays) + stepExecution.DueDate = &dueDate + } // Apply assignment if resolved if assignee, ok := assignments[*stepDef.ID]; ok { @@ -185,12 +221,32 @@ func (e *DAGExecutor) InitializeWorkflow(ctx context.Context, workflowExecutionI now := time.Now() stepExecution.AssignedAt = &now } + if initialStatus == StatusCompleted.String() { + stepExecution.StartedAt = &executionStart + stepExecution.CompletedAt = &executionStart + } if err := e.stepExecutionService.Create(stepExecution); err != nil { return fmt.Errorf("failed to create step execution for step %s: %w", stepDef.ID.String(), err) } } + if len(retryCompletedSteps) > 0 { + if err := e.unblockReadySteps(workflowExecutionID); err != nil { + e.logger.Printf("Warning: failed to unblock ready steps: %v", err) + } + } + + // Re-check status to avoid stale transitions if another worker changed it concurrently. + workflowExecution, err = e.workflowExecutionService.GetByID(workflowExecutionID) + if err != nil { + return fmt.Errorf("failed to reload workflow execution: %w", err) + } + if workflowExecution.Status != StatusPending.String() { + e.logger.Printf("Skipping in_progress transition for execution %s now in status %s", workflowExecutionID.String(), workflowExecution.Status) + return nil + } + // Update workflow execution status to in_progress if err := e.workflowExecutionService.UpdateStatus(ctx, workflowExecutionID, StatusInProgress.String()); err != nil { return fmt.Errorf("failed to update workflow execution status: %w", err) @@ -200,6 +256,49 @@ func (e *DAGExecutor) InitializeWorkflow(ctx context.Context, workflowExecutionI return nil } +func (e *DAGExecutor) resolveRetryCompletedStepDefinitions(workflowExecution *workflows.WorkflowExecution) map[uuid.UUID]bool { + completed := make(map[uuid.UUID]bool) + if workflowExecution == nil || workflowExecution.TriggeredBy != workflows.TriggerManual.String() { + return completed + } + + retrySourceID, err := uuid.Parse(workflowExecution.TriggeredByID) + if err != nil { + return completed + } + + sourceExecution, err := e.workflowExecutionService.GetByID(&retrySourceID) + if err != nil || sourceExecution == nil || sourceExecution.WorkflowInstanceID == nil || + workflowExecution.WorkflowInstanceID == nil || + *sourceExecution.WorkflowInstanceID != *workflowExecution.WorkflowInstanceID { + return completed + } + + sourceSteps, err := e.stepExecutionService.GetByWorkflowExecutionID(sourceExecution.ID) + if err != nil { + return completed + } + + for _, step := range sourceSteps { + if step.Status == workflows.StepStatusCompleted.String() && step.WorkflowStepDefinitionID != nil { + completed[*step.WorkflowStepDefinitionID] = true + } + } + + return completed +} + +func (e *DAGExecutor) unblockReadySteps(workflowExecutionID *uuid.UUID) error { + stepExecutions, err := e.stepExecutionService.GetByWorkflowExecutionID(workflowExecutionID) + if err != nil { + return err + } + for i := range stepExecutions { + _ = e.tryUnblockStep(&stepExecutions[i]) + } + return nil +} + // initializeExecutionState creates and initializes the execution state for a workflow func (e *DAGExecutor) initializeExecutionState(workflowExecutionID *uuid.UUID, stepDefinitions []workflows.WorkflowStepDefinition) *ExecutionState { state := &ExecutionState{ @@ -270,6 +369,50 @@ func (e *DAGExecutor) ProcessStepCompletion(ctx context.Context, stepExecutionID return nil } +// ProcessStepFailure propagates a failed step through dependent steps and reevaluates workflow completion. +func (e *DAGExecutor) ProcessStepFailure(ctx context.Context, stepExecutionID *uuid.UUID) error { + stepExecution, err := e.stepExecutionService.GetByID(stepExecutionID) + if err != nil { + return fmt.Errorf("failed to get step execution: %w", err) + } + + visited := make(map[uuid.UUID]bool) + var skipDependents func(stepDefID *uuid.UUID) error + skipDependents = func(stepDefID *uuid.UUID) error { + dependents, err := e.stepDefinitionService.GetDependentSteps(stepDefID) + if err != nil { + return err + } + for _, dep := range dependents { + if dep.ID == nil || visited[*dep.ID] { + continue + } + visited[*dep.ID] = true + + depExec := e.findDependentStepExecution(stepExecution.WorkflowExecutionID, dep.ID) + if depExec != nil && depExec.Status != StatusCompleted.String() && + depExec.Status != StatusFailed.String() && + depExec.Status != StatusSkipped.String() && + depExec.Status != StatusCancelled.String() { + if err := e.stepExecutionService.UpdateStatus(ctx, depExec.ID, StatusSkipped.String()); err != nil { + e.logger.Printf("Failed to skip dependent step %s: %v", depExec.ID.String(), err) + } + } + + if err := skipDependents(dep.ID); err != nil { + return err + } + } + return nil + } + + if err := skipDependents(stepExecution.WorkflowStepDefinitionID); err != nil { + return fmt.Errorf("failed to propagate step failure: %w", err) + } + + return e.checkWorkflowCompletion(ctx, stepExecution.WorkflowExecutionID) +} + // unblockDependentSteps processes dependent steps and unblocks those that are ready func (e *DAGExecutor) unblockDependentSteps(stepExecution *workflows.StepExecution, dependentSteps []workflows.WorkflowStepDefinition) int { unblockedCount := 0 @@ -391,18 +534,21 @@ func (e *DAGExecutor) checkWorkflowCompletion(ctx context.Context, workflowExecu } completedCount := 0 + skippedCount := 0 failedCount := 0 for _, stepExec := range stepExecutions { switch stepExec.Status { case StatusCompleted.String(): completedCount++ + case StatusSkipped.String(): + skippedCount++ case StatusFailed.String(): failedCount++ } } - // Check if all steps are completed or failed - if completedCount+failedCount == len(stepExecutions) { + // Check if all steps are in terminal states + if completedCount+failedCount+skippedCount == len(stepExecutions) { if failedCount > 0 { // Workflow failed reason := fmt.Sprintf("%d of %d steps failed", failedCount, len(stepExecutions)) @@ -411,7 +557,7 @@ func (e *DAGExecutor) checkWorkflowCompletion(ctx context.Context, workflowExecu } e.logger.Printf("Workflow execution failed: %s", reason) } else { - // All steps completed successfully + // All steps reached successful terminal states (completed/skipped) if err := e.workflowExecutionService.UpdateStatus(ctx, workflowExecutionID, StatusCompleted.String()); err != nil { return fmt.Errorf("failed to mark workflow as completed: %w", err) } @@ -430,6 +576,22 @@ func (e *DAGExecutor) checkWorkflowCompletion(ctx context.Context, workflowExecu return nil } +func resolveStepGraceDays(workflowExecution *workflows.WorkflowExecution, stepDef workflows.WorkflowStepDefinition) int { + if stepDef.GracePeriodDays != nil { + return *stepDef.GracePeriodDays + } + if workflowExecution.WorkflowInstance != nil && workflowExecution.WorkflowInstance.GracePeriodDays != nil { + return *workflowExecution.WorkflowInstance.GracePeriodDays + } + if workflowExecution.WorkflowInstance != nil && workflowExecution.WorkflowInstance.WorkflowDefinition != nil && + workflowExecution.WorkflowInstance.WorkflowDefinition.GracePeriodDays != nil { + return *workflowExecution.WorkflowInstance.WorkflowDefinition.GracePeriodDays + } + // Step due-date initialization uses global workflow defaults from config. + // Execution failure grace in OverdueService can use an injected default to support worker-level overrides. + return config.DefaultWorkflowConfig().GracePeriodDays +} + // CheckAutomaticTriggers checks if a step has automatic triggers configured // and evaluates them (for future Phase 5 implementation) func (e *DAGExecutor) CheckAutomaticTriggers(ctx context.Context, stepExecutionID *uuid.UUID) error { diff --git a/internal/workflow/executor_test.go b/internal/workflow/executor_test.go index a82cd24d..020d68b4 100644 --- a/internal/workflow/executor_test.go +++ b/internal/workflow/executor_test.go @@ -322,6 +322,30 @@ func TestIsExecutionComplete(t *testing.T) { assert.True(t, executor.isExecutionComplete(state)) } +func TestResolveStepGraceDays_Preference(t *testing.T) { + defGrace := 9 + instanceGrace := 5 + stepGrace := 2 + + workflowExecution := &workflows.WorkflowExecution{ + WorkflowInstance: &workflows.WorkflowInstance{ + GracePeriodDays: &instanceGrace, + WorkflowDefinition: &workflows.WorkflowDefinition{ + GracePeriodDays: &defGrace, + }, + }, + } + + stepWithOverride := workflows.WorkflowStepDefinition{GracePeriodDays: &stepGrace} + assert.Equal(t, stepGrace, resolveStepGraceDays(workflowExecution, stepWithOverride)) + + stepWithoutOverride := workflows.WorkflowStepDefinition{} + assert.Equal(t, instanceGrace, resolveStepGraceDays(workflowExecution, stepWithoutOverride)) + + workflowExecution.WorkflowInstance.GracePeriodDays = nil + assert.Equal(t, defGrace, resolveStepGraceDays(workflowExecution, stepWithoutOverride)) +} + func TestInitializeWorkflow_Success(t *testing.T) { mockStepExecService := &MockStepExecutionService{} mockWorkflowExecService := &MockWorkflowExecutionService{} @@ -359,6 +383,7 @@ func TestInitializeWorkflow_Success(t *testing.T) { mockStepDefService.On("GetByWorkflowDefinitionID", &workflowDefID).Return(stepDefinitions, nil) mockStepDefService.On("GetDependencies", &stepDefID1).Return([]workflows.WorkflowStepDefinition{}, nil) mockStepDefService.On("GetDependencies", &stepDefID2).Return([]workflows.WorkflowStepDefinition{workflows.WorkflowStepDefinition{UUIDModel: relational.UUIDModel{ID: &stepDefID1}}}, nil) + mockStepExecService.On("GetByWorkflowExecutionID", &workflowExecutionID).Return([]workflows.StepExecution{}, nil) mockWorkflowExecService.On("UpdateStatus", mock.Anything, &workflowExecutionID, "in_progress").Return(nil) // Mock assignment service diff --git a/internal/workflow/manager.go b/internal/workflow/manager.go index f40a2858..6ef95bbc 100644 --- a/internal/workflow/manager.go +++ b/internal/workflow/manager.go @@ -153,7 +153,7 @@ func (m *Manager) GetExecutionStatus(ctx context.Context, executionID *uuid.UUID } // Count steps by status - var pending, blocked, inProgress, completed, failed, cancelled int + var pending, blocked, inProgress, overdue, completed, failed, cancelled int for _, step := range stepExecutions { switch step.Status { case "pending": @@ -162,6 +162,8 @@ func (m *Manager) GetExecutionStatus(ctx context.Context, executionID *uuid.UUID blocked++ case "in_progress": inProgress++ + case "overdue": + overdue++ case "completed": completed++ case "failed": @@ -178,6 +180,7 @@ func (m *Manager) GetExecutionStatus(ctx context.Context, executionID *uuid.UUID PendingSteps: pending, BlockedSteps: blocked, InProgressSteps: inProgress, + OverdueSteps: overdue, CompletedSteps: completed, FailedSteps: failed, CancelledSteps: cancelled, @@ -364,6 +367,7 @@ type ExecutionStatus struct { PendingSteps int BlockedSteps int InProgressSteps int + OverdueSteps int CompletedSteps int FailedSteps int CancelledSteps int diff --git a/internal/workflow/overdue.go b/internal/workflow/overdue.go new file mode 100644 index 00000000..0e68f0bd --- /dev/null +++ b/internal/workflow/overdue.go @@ -0,0 +1,218 @@ +package workflow + +import ( + "context" + "fmt" + "time" + + "github.com/compliance-framework/api/internal/service/relational/workflows" + "github.com/google/uuid" + "gorm.io/gorm" +) + +// OverdueService automates overdue and failed status transitions for workflow and step executions. +type OverdueService struct { + db *gorm.DB + workflowExecutionService *workflows.WorkflowExecutionService + stepExecutionService *workflows.StepExecutionService + evidenceIntegration *EvidenceIntegration + logger Logger + defaultGracePeriodDays int +} + +func NewOverdueService( + db *gorm.DB, + workflowExecutionService *workflows.WorkflowExecutionService, + stepExecutionService *workflows.StepExecutionService, + evidenceIntegration *EvidenceIntegration, + logger Logger, + defaultGracePeriodDays int, +) *OverdueService { + return &OverdueService{ + db: db, + workflowExecutionService: workflowExecutionService, + stepExecutionService: stepExecutionService, + evidenceIntegration: evidenceIntegration, + logger: logger, + defaultGracePeriodDays: defaultGracePeriodDays, + } +} + +// CheckOverdueExecutions marks workflow executions as overdue once due date passes. +func (s *OverdueService) CheckOverdueExecutions(ctx context.Context) (int, error) { + var executions []workflows.WorkflowExecution + now := time.Now() + if err := s.db.WithContext(ctx). + Where("status IN ? AND due_date IS NOT NULL AND due_date < ?", []string{ + workflows.WorkflowStatusPending.String(), + workflows.WorkflowStatusInProgress.String(), + }, now). + Find(&executions).Error; err != nil { + return 0, fmt.Errorf("failed to query overdue executions: %w", err) + } + + updated := 0 + for i := range executions { + if err := s.workflowExecutionService.UpdateStatus(ctx, executions[i].ID, workflows.WorkflowStatusOverdue.String()); err != nil { + s.logger.Errorw("Failed to mark workflow execution overdue", "workflow_execution_id", executions[i].ID, "error", err) + continue + } + updated++ + } + + s.logger.Infow("Checked overdue workflow executions", "found", len(executions), "updated", updated) + return updated, nil +} + +// CheckOverdueSteps marks incomplete steps as overdue once step due date passes. +func (s *OverdueService) CheckOverdueSteps(ctx context.Context) (int, error) { + var steps []workflows.StepExecution + now := time.Now() + if err := s.db.WithContext(ctx). + Where("status IN ? AND due_date IS NOT NULL AND due_date < ?", []string{ + workflows.StepStatusPending.String(), + workflows.StepStatusInProgress.String(), + }, now). + Find(&steps).Error; err != nil { + return 0, fmt.Errorf("failed to query overdue steps: %w", err) + } + + updated := 0 + for i := range steps { + if err := s.stepExecutionService.UpdateStatus(ctx, steps[i].ID, workflows.StepStatusOverdue.String()); err != nil { + s.logger.Errorw("Failed to mark step overdue", "step_execution_id", steps[i].ID, "error", err) + continue + } + updated++ + } + + executionUpdates, err := s.markExecutionsOverdueFromStepOverdue(ctx) + if err != nil { + return updated, err + } + + s.logger.Infow("Checked overdue workflow steps", "found", len(steps), "updated", updated, "execution_status_updated", executionUpdates) + return updated, nil +} + +// CheckFailedExecutions marks overdue executions as failed after overdue grace period expires. +func (s *OverdueService) CheckFailedExecutions(ctx context.Context) (int, error) { + var overdueExecutions []workflows.WorkflowExecution + if err := s.db.WithContext(ctx). + Where("status = ? AND overdue_at IS NOT NULL", workflows.WorkflowStatusOverdue.String()). + Preload("WorkflowInstance"). + Preload("WorkflowInstance.WorkflowDefinition"). + Find(&overdueExecutions).Error; err != nil { + return 0, fmt.Errorf("failed to query overdue executions for failure transition: %w", err) + } + + now := time.Now() + failed := 0 + for i := range overdueExecutions { + exec := overdueExecutions[i] + graceDays := s.resolveExecutionGraceDays(&exec) + if exec.OverdueAt == nil || exec.OverdueAt.AddDate(0, 0, graceDays).After(now) { + continue + } + + if err := s.failExecutionAndSteps(ctx, &exec); err != nil { + s.logger.Errorw("Failed to mark overdue execution as failed", "workflow_execution_id", exec.ID, "error", err) + continue + } + failed++ + } + + s.logger.Infow("Checked failed workflow executions", "checked", len(overdueExecutions), "failed", failed) + return failed, nil +} + +func (s *OverdueService) resolveExecutionGraceDays(execution *workflows.WorkflowExecution) int { + if execution.WorkflowInstance != nil && execution.WorkflowInstance.GracePeriodDays != nil { + return *execution.WorkflowInstance.GracePeriodDays + } + if execution.WorkflowInstance != nil && execution.WorkflowInstance.WorkflowDefinition != nil && + execution.WorkflowInstance.WorkflowDefinition.GracePeriodDays != nil { + return *execution.WorkflowInstance.WorkflowDefinition.GracePeriodDays + } + return s.defaultGracePeriodDays +} + +func (s *OverdueService) failExecutionAndSteps(ctx context.Context, execution *workflows.WorkflowExecution) error { + now := time.Now() + failureReason := "overdue - grace period expired" + executionFailed := false + + if err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + execResult := tx.Model(&workflows.WorkflowExecution{}). + Where("id = ? AND status = ?", execution.ID, workflows.WorkflowStatusOverdue.String()). + Updates(map[string]interface{}{ + "status": workflows.WorkflowStatusFailed.String(), + "failed_at": now, + "failure_reason": failureReason, + }) + if execResult.Error != nil { + return execResult.Error + } + if execResult.RowsAffected == 0 { + return nil + } + executionFailed = true + + if err := tx.Model(&workflows.StepExecution{}). + Where("workflow_execution_id = ? AND status IN ?", execution.ID, []string{ + workflows.StepStatusPending.String(), + workflows.StepStatusBlocked.String(), + workflows.StepStatusInProgress.String(), + workflows.StepStatusOverdue.String(), + }). + Updates(map[string]interface{}{ + "status": workflows.StepStatusFailed.String(), + "failed_at": now, + "failure_reason": failureReason, + }).Error; err != nil { + return err + } + + return nil + }); err != nil { + return err + } + if !executionFailed { + return nil + } + + if s.evidenceIntegration != nil { + if err := s.evidenceIntegration.AddExecutionFailureEvidence(ctx, execution.ID); err != nil { + s.logger.Warnw("Failed to add workflow execution failure evidence", "workflow_execution_id", execution.ID, "error", err) + } + } + + return nil +} + +func (s *OverdueService) markExecutionsOverdueFromStepOverdue(ctx context.Context) (int, error) { + var executionIDs []uuid.UUID + if err := s.db.WithContext(ctx). + Table("step_executions se"). + Select("DISTINCT se.workflow_execution_id"). + Joins("JOIN workflow_executions we ON we.id = se.workflow_execution_id"). + Where("se.status = ? AND we.status IN ?", workflows.StepStatusOverdue.String(), []string{ + workflows.WorkflowStatusPending.String(), + workflows.WorkflowStatusInProgress.String(), + }). + Scan(&executionIDs).Error; err != nil { + return 0, fmt.Errorf("failed to query executions with overdue steps: %w", err) + } + + updated := 0 + for i := range executionIDs { + executionID := executionIDs[i] + if err := s.workflowExecutionService.UpdateStatus(ctx, &executionID, workflows.WorkflowStatusOverdue.String()); err != nil { + s.logger.Errorw("Failed to mark workflow execution overdue from step overdue", "workflow_execution_id", executionID, "error", err) + continue + } + updated++ + } + + return updated, nil +} diff --git a/internal/workflow/overdue_test.go b/internal/workflow/overdue_test.go new file mode 100644 index 00000000..2379067d --- /dev/null +++ b/internal/workflow/overdue_test.go @@ -0,0 +1,234 @@ +package workflow + +import ( + "context" + "testing" + "time" + + "github.com/compliance-framework/api/internal/service/relational" + "github.com/compliance-framework/api/internal/service/relational/workflows" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "gorm.io/gorm/logger" +) + +func setupOverdueTestDB(t *testing.T) *gorm.DB { + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{ + Logger: logger.Default.LogMode(logger.Silent), + }) + require.NoError(t, err) + + err = db.AutoMigrate( + &workflows.WorkflowDefinition{}, + &workflows.WorkflowStepDefinition{}, + &workflows.StepDependency{}, + &workflows.StepTrigger{}, + &workflows.WorkflowInstance{}, + &workflows.RoleAssignment{}, + &workflows.WorkflowExecution{}, + &workflows.StepExecution{}, + &workflows.StepEvidence{}, + &workflows.StepReassignmentHistory{}, + &workflows.ControlRelationship{}, + ) + require.NoError(t, err) + return db +} + +func createOverdueFixture(t *testing.T, db *gorm.DB) (*workflows.WorkflowDefinition, *workflows.WorkflowInstance, *workflows.WorkflowExecution, *workflows.StepExecution) { + defID := uuid.New() + defGrace := 1 + def := &workflows.WorkflowDefinition{ + UUIDModel: relational.UUIDModel{ID: &defID}, + Name: "Overdue Workflow", + Version: "1.0", + SuggestedCadence: string(workflows.CadenceWeekly), + GracePeriodDays: &defGrace, + } + require.NoError(t, db.Create(def).Error) + + stepDefID := uuid.New() + stepDef := &workflows.WorkflowStepDefinition{ + UUIDModel: relational.UUIDModel{ID: &stepDefID}, + Name: "Upload Certificate", + ResponsibleRole: "employee", + WorkflowDefinitionID: &defID, + } + require.NoError(t, db.Create(stepDef).Error) + + instanceID := uuid.New() + sspID := uuid.New() + instance := &workflows.WorkflowInstance{ + UUIDModel: relational.UUIDModel{ID: &instanceID}, + Name: "Overdue Instance", + WorkflowDefinitionID: &defID, + SystemSecurityPlanID: &sspID, + Cadence: string(workflows.CadenceWeekly), + IsActive: true, + } + require.NoError(t, db.Create(instance).Error) + + execID := uuid.New() + started := time.Now().Add(-72 * time.Hour) + due := time.Now().Add(-48 * time.Hour) + exec := &workflows.WorkflowExecution{ + UUIDModel: relational.UUIDModel{ID: &execID}, + Status: workflows.WorkflowStatusInProgress.String(), + TriggeredBy: workflows.TriggerManual.String(), + TriggeredByID: "user", + WorkflowInstanceID: &instanceID, + StartedAt: &started, + DueDate: &due, + } + require.NoError(t, db.Create(exec).Error) + + stepID := uuid.New() + stepDue := time.Now().Add(-24 * time.Hour) + step := &workflows.StepExecution{ + UUIDModel: relational.UUIDModel{ID: &stepID}, + WorkflowExecutionID: &execID, + WorkflowStepDefinitionID: &stepDefID, + Status: workflows.StepStatusInProgress.String(), + DueDate: &stepDue, + AssignedToType: "user", + AssignedToID: "u1", + } + require.NoError(t, db.Create(step).Error) + + return def, instance, exec, step +} + +func TestOverdueService_CheckOverdueTransitions(t *testing.T) { + db := setupOverdueTestDB(t) + _, _, exec, step := createOverdueFixture(t, db) + + workflowExecSvc := workflows.NewWorkflowExecutionService(db) + stepExecSvc := workflows.NewStepExecutionService(db, nil) + svc := NewOverdueService(db, workflowExecSvc, stepExecSvc, nil, zap.NewNop().Sugar(), 7) + + updatedSteps, err := svc.CheckOverdueSteps(context.Background()) + require.NoError(t, err) + assert.Equal(t, 1, updatedSteps) + + updatedExecutions, err := svc.CheckOverdueExecutions(context.Background()) + require.NoError(t, err) + assert.Equal(t, 0, updatedExecutions) + + var stepAfter workflows.StepExecution + require.NoError(t, db.First(&stepAfter, step.ID).Error) + assert.Equal(t, workflows.StepStatusOverdue.String(), stepAfter.Status) + require.NotNil(t, stepAfter.OverdueAt) + + var execAfter workflows.WorkflowExecution + require.NoError(t, db.First(&execAfter, exec.ID).Error) + assert.Equal(t, workflows.WorkflowStatusOverdue.String(), execAfter.Status) + require.NotNil(t, execAfter.OverdueAt) +} + +func TestOverdueService_CheckFailedExecutions_StepOverduePromotesExecutionAndFailsWithZeroGrace(t *testing.T) { + db := setupOverdueTestDB(t) + def, instance, exec, step := createOverdueFixture(t, db) + + zero := 0 + require.NoError(t, db.Model(&workflows.WorkflowDefinition{}). + Where("id = ?", def.ID). + Update("grace_period_days", zero).Error) + require.NoError(t, db.Model(&workflows.WorkflowInstance{}). + Where("id = ?", instance.ID). + Update("grace_period_days", zero).Error) + require.NoError(t, db.Model(&workflows.WorkflowExecution{}). + Where("id = ?", exec.ID). + Update("due_date", nil).Error) + + workflowExecSvc := workflows.NewWorkflowExecutionService(db) + stepExecSvc := workflows.NewStepExecutionService(db, nil) + svc := NewOverdueService(db, workflowExecSvc, stepExecSvc, nil, zap.NewNop().Sugar(), 0) + + updatedSteps, err := svc.CheckOverdueSteps(context.Background()) + require.NoError(t, err) + assert.Equal(t, 1, updatedSteps) + + failed, err := svc.CheckFailedExecutions(context.Background()) + require.NoError(t, err) + assert.Equal(t, 1, failed) + + var execAfter workflows.WorkflowExecution + require.NoError(t, db.First(&execAfter, exec.ID).Error) + assert.Equal(t, workflows.WorkflowStatusFailed.String(), execAfter.Status) + require.NotNil(t, execAfter.FailedAt) + + var stepAfter workflows.StepExecution + require.NoError(t, db.First(&stepAfter, step.ID).Error) + assert.Equal(t, workflows.StepStatusFailed.String(), stepAfter.Status) + require.NotNil(t, stepAfter.FailedAt) +} + +func TestOverdueService_CheckFailedExecutions(t *testing.T) { + db := setupOverdueTestDB(t) + _, _, exec, step := createOverdueFixture(t, db) + + overdueAt := time.Now().Add(-48 * time.Hour) + require.NoError(t, db.Model(&workflows.WorkflowExecution{}). + Where("id = ?", exec.ID). + Updates(map[string]interface{}{ + "status": workflows.WorkflowStatusOverdue.String(), + "overdue_at": overdueAt, + }).Error) + + require.NoError(t, db.Model(&workflows.StepExecution{}). + Where("id = ?", step.ID). + Updates(map[string]interface{}{ + "status": workflows.StepStatusOverdue.String(), + "overdue_at": overdueAt, + }).Error) + + workflowExecSvc := workflows.NewWorkflowExecutionService(db) + stepExecSvc := workflows.NewStepExecutionService(db, nil) + svc := NewOverdueService(db, workflowExecSvc, stepExecSvc, nil, zap.NewNop().Sugar(), 1) + + failed, err := svc.CheckFailedExecutions(context.Background()) + require.NoError(t, err) + assert.Equal(t, 1, failed) + + var execAfter workflows.WorkflowExecution + require.NoError(t, db.First(&execAfter, exec.ID).Error) + assert.Equal(t, workflows.WorkflowStatusFailed.String(), execAfter.Status) + assert.Equal(t, "overdue - grace period expired", execAfter.FailureReason) + require.NotNil(t, execAfter.FailedAt) + + var stepAfter workflows.StepExecution + require.NoError(t, db.First(&stepAfter, step.ID).Error) + assert.Equal(t, workflows.StepStatusFailed.String(), stepAfter.Status) + assert.Equal(t, "overdue - grace period expired", stepAfter.FailureReason) + require.NotNil(t, stepAfter.FailedAt) +} + +func TestOverdueService_CheckOverdueSteps_DoesNotMarkBlockedSteps(t *testing.T) { + db := setupOverdueTestDB(t) + _, _, _, step := createOverdueFixture(t, db) + + pastDue := time.Now().Add(-24 * time.Hour) + require.NoError(t, db.Model(&workflows.StepExecution{}). + Where("id = ?", step.ID). + Updates(map[string]interface{}{ + "status": workflows.StepStatusBlocked.String(), + "due_date": pastDue, + }).Error) + + workflowExecSvc := workflows.NewWorkflowExecutionService(db) + stepExecSvc := workflows.NewStepExecutionService(db, nil) + svc := NewOverdueService(db, workflowExecSvc, stepExecSvc, nil, zap.NewNop().Sugar(), 7) + + updatedSteps, err := svc.CheckOverdueSteps(context.Background()) + require.NoError(t, err) + assert.Equal(t, 0, updatedSteps) + + var stepAfter workflows.StepExecution + require.NoError(t, db.First(&stepAfter, step.ID).Error) + assert.Equal(t, workflows.StepStatusBlocked.String(), stepAfter.Status) + assert.Nil(t, stepAfter.OverdueAt) +} diff --git a/internal/workflow/scheduler.go b/internal/workflow/scheduler.go index 2b2f4de0..95e3b28d 100644 --- a/internal/workflow/scheduler.go +++ b/internal/workflow/scheduler.go @@ -14,6 +14,8 @@ import ( type WorkflowSchedulerWorker struct { manager *Manager workflowInstanceService WorkflowInstanceServiceInterface + overdueService *OverdueService + overdueCheckEnabled bool logger Logger defaultGracePeriod int } @@ -22,12 +24,16 @@ type WorkflowSchedulerWorker struct { func NewWorkflowSchedulerWorker( manager *Manager, workflowInstanceService WorkflowInstanceServiceInterface, + overdueService *OverdueService, + overdueCheckEnabled bool, logger Logger, defaultGracePeriod int, ) *WorkflowSchedulerWorker { return &WorkflowSchedulerWorker{ manager: manager, workflowInstanceService: workflowInstanceService, + overdueService: overdueService, + overdueCheckEnabled: overdueCheckEnabled, logger: logger, defaultGracePeriod: defaultGracePeriod, } @@ -176,5 +182,17 @@ func (w *WorkflowSchedulerWorker) Work(ctx context.Context, job *river.Job[Sched "errors", errorCount, ) + if w.overdueCheckEnabled && w.overdueService != nil { + if _, err := w.overdueService.CheckOverdueSteps(ctx); err != nil { + w.logger.Errorw("Failed to check overdue steps", "error", err) + } + if _, err := w.overdueService.CheckOverdueExecutions(ctx); err != nil { + w.logger.Errorw("Failed to check overdue executions", "error", err) + } + if _, err := w.overdueService.CheckFailedExecutions(ctx); err != nil { + w.logger.Errorw("Failed to check failed executions", "error", err) + } + } + return nil } diff --git a/internal/workflow/step_transition.go b/internal/workflow/step_transition.go index d8057bfb..b216aa90 100644 --- a/internal/workflow/step_transition.go +++ b/internal/workflow/step_transition.go @@ -2,6 +2,7 @@ package workflow import ( "context" + "errors" "fmt" "strings" "time" @@ -27,6 +28,8 @@ type StepTransitionService struct { evidenceIntegration *EvidenceIntegration } +var ErrInvalidStepTransition = errors.New("invalid step transition") + // WorkflowDefinitionServiceInterface defines the interface for workflow definition operations type WorkflowDefinitionServiceInterface interface { GetByID(id *uuid.UUID) (*workflows.WorkflowDefinition, error) @@ -157,6 +160,19 @@ func (s *StepTransitionService) TransitionStepStatus(ctx context.Context, stepEx return nil } +// FailStep marks a step as failed and propagates failure through dependent steps. +func (s *StepTransitionService) FailStep(ctx context.Context, stepExecutionID *uuid.UUID, reason string) error { + if err := s.stepExecutionService.Fail(stepExecutionID, reason); err != nil { + return err + } + if s.executor != nil { + if err := s.executor.ProcessStepFailure(ctx, stepExecutionID); err != nil { + return err + } + } + return nil +} + // verifyUserPermission checks if the user has permission to transition the step func (s *StepTransitionService) verifyUserPermission(instanceID *uuid.UUID, responsibleRole, userID, userType string) error { // Find the role assignment for the responsible role @@ -185,6 +201,7 @@ func (s *StepTransitionService) validateTransition(currentStatus, newStatus stri allowedTransitions := map[string][]string{ StatusPending.String(): {StatusInProgress.String()}, StatusInProgress.String(): {StatusCompleted.String()}, + StatusOverdue.String(): {StatusCompleted.String()}, StatusBlocked.String(): {}, // Blocked steps cannot be manually transitioned StatusCompleted.String(): {}, // Completed steps cannot be changed StatusFailed.String(): {}, // Failed steps cannot be manually changed (only by executor) @@ -192,7 +209,7 @@ func (s *StepTransitionService) validateTransition(currentStatus, newStatus stri allowed, exists := allowedTransitions[currentStatus] if !exists { - return fmt.Errorf("invalid current status: %s", currentStatus) + return fmt.Errorf("%w: invalid current status: %s", ErrInvalidStepTransition, currentStatus) } // Check if the new status is in the allowed list @@ -202,7 +219,7 @@ func (s *StepTransitionService) validateTransition(currentStatus, newStatus stri } } - return fmt.Errorf("transition from %s to %s is not allowed", currentStatus, newStatus) + return fmt.Errorf("%w: transition from %s to %s is not allowed", ErrInvalidStepTransition, currentStatus, newStatus) } // validateEvidenceRequirements validates that all required evidence has been submitted