From 64599ee377e3087163faba67cb2b9811eca8a8d1 Mon Sep 17 00:00:00 2001 From: Gustavo Carvalho Date: Thu, 19 Feb 2026 07:28:02 -0300 Subject: [PATCH 01/13] feat: digest feat: new task assigned Signed-off-by: Gustavo Carvalho --- Makefile | 2 +- internal/api/handler/api.go | 4 +- .../step_execution_integration_test.go | 2 +- .../workflow_execution_integration_test.go | 2 +- .../service/email/templates/service_test.go | 76 +++++ .../templates/workflow-task-assigned.html | 195 ++++++++++++ .../templates/workflow-task-assigned.txt | 19 ++ .../templates/workflow-task-due-soon.html | 193 ++++++++++++ .../templates/workflow-task-due-soon.txt | 19 ++ internal/service/worker/due_soon_checker.go | 117 ++++++++ internal/service/worker/jobs.go | 284 +++++++++++++++++- internal/service/worker/service.go | 83 ++++- internal/service/worker/service_test.go | 14 +- internal/service/worker/user_repository.go | 46 +++ .../workflow_task_assigned_worker_test.go | 151 ++++++++++ .../workflow_task_due_soon_worker_test.go | 142 +++++++++ internal/workflow/assignment.go | 47 ++- internal/workflow/assignment_test.go | 15 +- internal/workflow/executor.go | 23 +- .../workflow/executor_integration_test.go | 9 +- 20 files changed, 1410 insertions(+), 33 deletions(-) create mode 100644 internal/service/email/templates/templates/workflow-task-assigned.html create mode 100644 internal/service/email/templates/templates/workflow-task-assigned.txt create mode 100644 internal/service/email/templates/templates/workflow-task-due-soon.html create mode 100644 internal/service/email/templates/templates/workflow-task-due-soon.txt create mode 100644 internal/service/worker/due_soon_checker.go create mode 100644 internal/service/worker/user_repository.go create mode 100644 internal/service/worker/workflow_task_assigned_worker_test.go create mode 100644 internal/service/worker/workflow_task_due_soon_worker_test.go diff --git a/Makefile b/Makefile index 067b572d..e6c8a6b2 100644 --- a/Makefile +++ b/Makefile @@ -109,7 +109,7 @@ lint: lint.check ## Run golangci-lint ##@ Run .PHONY: reviewable -reviewable: swag # Ensure a PR is ready for review. +reviewable: lint test-integration # Ensure a PR is ready for review. @go mod tidy .PHONY: check-diff diff --git a/internal/api/handler/api.go b/internal/api/handler/api.go index 12c99b5f..ca56af91 100644 --- a/internal/api/handler/api.go +++ b/internal/api/handler/api.go @@ -84,7 +84,7 @@ func registerWorkflowHandlers(server *api.Server, logger *zap.SugaredLogger, db // registerWorkflowExecutionHandlers registers execution-related handlers that require the workflow manager func registerWorkflowExecutionHandlers(workflowGroup *echo.Group, logger *zap.SugaredLogger, db *gorm.DB, workflowManager *workflow.Manager) { roleAssignmentService := workflowsvc.NewRoleAssignmentService(db) - assignmentService := workflow.NewAssignmentService(roleAssignmentService, db) + assignmentService := workflow.NewAssignmentService(roleAssignmentService, db, logger) // Workflow execution handler workflowExecutionHandler := workflows.NewWorkflowExecutionHandler(logger, db, workflowManager, assignmentService) @@ -107,7 +107,7 @@ func createStepTransitionService(db *gorm.DB, logger *zap.SugaredLogger) *workfl roleAssignmentService := workflowsvc.NewRoleAssignmentService(db) // Create assignment service - assignmentService := workflow.NewAssignmentService(roleAssignmentService, db) + assignmentService := workflow.NewAssignmentService(roleAssignmentService, db, logger) // Create executor for step transition coordination stdLogger := log.Default() diff --git a/internal/api/handler/workflows/step_execution_integration_test.go b/internal/api/handler/workflows/step_execution_integration_test.go index 82290256..0c5c2e06 100644 --- a/internal/api/handler/workflows/step_execution_integration_test.go +++ b/internal/api/handler/workflows/step_execution_integration_test.go @@ -43,7 +43,7 @@ func setupStepExecutionTestHandler(t *testing.T) (*StepExecutionHandler, *gorm.D evidenceIntegration.SetWorkflowExecutionService(workflowExecService) // Create assignment service - assignmentService := workflow.NewAssignmentService(roleAssignmentService, db) + assignmentService := workflow.NewAssignmentService(roleAssignmentService, db, zap.NewNop().Sugar()) // Create executor for step transition coordination stdLogger := log.Default() diff --git a/internal/api/handler/workflows/workflow_execution_integration_test.go b/internal/api/handler/workflows/workflow_execution_integration_test.go index 7231cd97..4c346a1e 100644 --- a/internal/api/handler/workflows/workflow_execution_integration_test.go +++ b/internal/api/handler/workflows/workflow_execution_integration_test.go @@ -48,7 +48,7 @@ func setupExecutionTestHandler(t *testing.T) (*WorkflowExecutionHandler, *gorm.D workflowExecService := workflows.NewWorkflowExecutionService(db) workflowInstService := workflows.NewWorkflowInstanceService(db) roleAssignmentService := workflows.NewRoleAssignmentService(db) - assignmentService := workflow.NewAssignmentService(roleAssignmentService, db) + assignmentService := workflow.NewAssignmentService(roleAssignmentService, db, zap.NewNop().Sugar()) // Create a mock river client for testing mockRiver := &MockRiverClient{} diff --git a/internal/service/email/templates/service_test.go b/internal/service/email/templates/service_test.go index ddc43f93..82ad3016 100644 --- a/internal/service/email/templates/service_test.go +++ b/internal/service/email/templates/service_test.go @@ -56,6 +56,82 @@ func TestTemplateService_MissingTemplates(t *testing.T) { require.Error(t, err, "UseText should error for missing template") } +func TestTemplateService_WorkflowTaskAssigned(t *testing.T) { + service, err := NewTemplateService() + require.NoError(t, err) + + dueDate := "2026-03-01 09:00:00 +0000 UTC" + data := TemplateData{ + "UserName": "Alice Smith", + "StepTitle": "Review Policy", + "WorkflowTitle": "Annual Audit", + "WorkflowInstanceTitle": "Audit 2026", + "StepURL": "https://app.example.com/steps/abc", + "DueDate": dueDate, + } + + html, text, err := service.Use("workflow-task-assigned", data) + require.NoError(t, err) + require.NotEmpty(t, html) + require.NotEmpty(t, text) + require.Contains(t, html, "Alice Smith") + require.Contains(t, html, "Review Policy") + require.Contains(t, html, "Annual Audit") + require.Contains(t, html, "https://app.example.com/steps/abc") + require.Contains(t, text, "Alice Smith") + require.Contains(t, text, "Review Policy") + require.Contains(t, text, "https://app.example.com/steps/abc") +} + +func TestTemplateService_WorkflowTaskAssigned_NoDueDate(t *testing.T) { + service, err := NewTemplateService() + require.NoError(t, err) + + data := TemplateData{ + "UserName": "Bob", + "StepTitle": "Submit Evidence", + "WorkflowTitle": "SOC2 Audit", + "WorkflowInstanceTitle": "SOC2 2026", + "StepURL": "https://app.example.com/steps/xyz", + "DueDate": nil, + } + + html, text, err := service.Use("workflow-task-assigned", data) + require.NoError(t, err) + require.NotEmpty(t, html) + require.NotEmpty(t, text) + require.Contains(t, html, "Bob") + require.NotContains(t, html, "Due Date") +} + +func TestTemplateService_WorkflowTaskDueSoon(t *testing.T) { + service, err := NewTemplateService() + require.NoError(t, err) + + data := TemplateData{ + "UserName": "Alice Smith", + "StepTitle": "Submit Evidence", + "WorkflowTitle": "SOC2 Audit", + "WorkflowInstanceTitle": "SOC2 2026", + "StepURL": "https://app.example.com/steps/abc", + "DueDate": "2026-03-01", + } + + html, text, err := service.Use("workflow-task-due-soon", data) + require.NoError(t, err) + require.NotEmpty(t, html) + require.NotEmpty(t, text) + require.Contains(t, html, "Alice Smith") + require.Contains(t, html, "Submit Evidence") + require.Contains(t, html, "SOC2 Audit") + require.Contains(t, html, "https://app.example.com/steps/abc") + require.Contains(t, html, "2026-03-01") + require.Contains(t, text, "Alice Smith") + require.Contains(t, text, "Submit Evidence") + require.Contains(t, text, "https://app.example.com/steps/abc") + require.Contains(t, text, "TOMORROW") +} + func TestTemplateService_ListTemplates(t *testing.T) { service, err := NewTemplateService() require.NoError(t, err, "Failed to create template service") diff --git a/internal/service/email/templates/templates/workflow-task-assigned.html b/internal/service/email/templates/templates/workflow-task-assigned.html new file mode 100644 index 00000000..51b184e1 --- /dev/null +++ b/internal/service/email/templates/templates/workflow-task-assigned.html @@ -0,0 +1,195 @@ + + + + + + Task Ready for You + + + + + + diff --git a/internal/service/email/templates/templates/workflow-task-assigned.txt b/internal/service/email/templates/templates/workflow-task-assigned.txt new file mode 100644 index 00000000..8c0a918c --- /dev/null +++ b/internal/service/email/templates/templates/workflow-task-assigned.txt @@ -0,0 +1,19 @@ +Task Ready for You +================== + +Hello {{.UserName}}, + +A workflow task has been assigned to you and is ready for action. + +TASK DETAILS +------------ +Task: {{.StepTitle}} +Workflow: {{.WorkflowTitle}} +Instance: {{.WorkflowInstanceTitle}} +{{if .DueDate}}Due Date: {{.DueDate}} +{{end}} +View your task: {{.StepURL}} + +--- +This is an automated notification from Compliance Framework. +Please do not reply to this email. diff --git a/internal/service/email/templates/templates/workflow-task-due-soon.html b/internal/service/email/templates/templates/workflow-task-due-soon.html new file mode 100644 index 00000000..05e1a509 --- /dev/null +++ b/internal/service/email/templates/templates/workflow-task-due-soon.html @@ -0,0 +1,193 @@ + + + + + + Task Due Tomorrow + + + + + + diff --git a/internal/service/email/templates/templates/workflow-task-due-soon.txt b/internal/service/email/templates/templates/workflow-task-due-soon.txt new file mode 100644 index 00000000..6b757a74 --- /dev/null +++ b/internal/service/email/templates/templates/workflow-task-due-soon.txt @@ -0,0 +1,19 @@ +Task Due Tomorrow +================= + +Hello {{.UserName}}, + +This is a reminder that the following task is due TOMORROW. Please make sure to complete it on time. + +TASK DETAILS +------------ +Task: {{.StepTitle}} +Workflow: {{.WorkflowTitle}} +Instance: {{.WorkflowInstanceTitle}} +Due Date: {{.DueDate}} + +Complete your task: {{.StepURL}} + +--- +This is an automated notification from Compliance Framework. +Please do not reply to this email. diff --git a/internal/service/worker/due_soon_checker.go b/internal/service/worker/due_soon_checker.go new file mode 100644 index 00000000..0e6b1555 --- /dev/null +++ b/internal/service/worker/due_soon_checker.go @@ -0,0 +1,117 @@ +package worker + +import ( + "context" + "fmt" + "time" + + "github.com/compliance-framework/api/internal/service/relational/workflows" + "github.com/riverqueue/river" + "github.com/riverqueue/river/rivertype" + "gorm.io/gorm" +) + +// DueSoonCheckerArgs represents the arguments for the periodic due-soon checker job +type DueSoonCheckerArgs struct{} + +// Kind returns the job kind for River +func (DueSoonCheckerArgs) Kind() string { return "workflow_due_soon_checker" } + +// Timeout returns the timeout for the due-soon checker job +func (DueSoonCheckerArgs) Timeout() time.Duration { return 5 * time.Minute } + +// DueSoonCheckerWorker scans for step executions due tomorrow and enqueues reminder emails +type DueSoonCheckerWorker struct { + db *gorm.DB + client RiverInserter + logger Logger +} + +// RiverInserter is the minimal interface for inserting River jobs +type RiverInserter interface { + InsertMany(ctx context.Context, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error) +} + +// NewDueSoonCheckerWorker creates a new DueSoonCheckerWorker +func NewDueSoonCheckerWorker(db *gorm.DB, client RiverInserter, logger Logger) *DueSoonCheckerWorker { + return &DueSoonCheckerWorker{ + db: db, + client: client, + logger: logger, + } +} + +// Work scans for step executions due in ~24 hours and enqueues WorkflowTaskDueSoonArgs jobs +func (w *DueSoonCheckerWorker) Work(ctx context.Context, job *river.Job[DueSoonCheckerArgs]) error { + now := time.Now() + windowStart := now.Add(23 * time.Hour) + windowEnd := now.Add(25 * time.Hour) + + var steps []workflows.StepExecution + if err := w.db.WithContext(ctx). + Preload("WorkflowExecution.WorkflowInstance.WorkflowDefinition"). + Preload("WorkflowStepDefinition"). + Where("status IN ? AND due_date IS NOT NULL AND due_date >= ? AND due_date <= ? AND assigned_to_type = ? AND assigned_to_id != ''", + []string{ + workflows.StepStatusPending.String(), + workflows.StepStatusInProgress.String(), + }, + windowStart, + windowEnd, + workflows.AssignmentTypeUser.String(), + ). + Find(&steps).Error; err != nil { + return fmt.Errorf("due-soon checker: failed to query steps: %w", err) + } + + if len(steps) == 0 { + w.logger.Infow("DueSoonCheckerWorker: no steps due soon", "window_start", windowStart, "window_end", windowEnd) + return nil + } + + params := make([]river.InsertManyParams, 0, len(steps)) + for i := range steps { + step := &steps[i] + if step.DueDate == nil { + continue + } + + stepTitle := "" + workflowTitle := "" + workflowInstanceTitle := "" + if step.WorkflowStepDefinition != nil { + stepTitle = step.WorkflowStepDefinition.Name + } + if step.WorkflowExecution != nil && step.WorkflowExecution.WorkflowInstance != nil { + if step.WorkflowExecution.WorkflowInstance.WorkflowDefinition != nil { + workflowTitle = step.WorkflowExecution.WorkflowInstance.WorkflowDefinition.Name + } + workflowInstanceTitle = step.WorkflowExecution.WorkflowInstance.Name + } + + args := WorkflowTaskDueSoonArgs{ + UserID: step.AssignedToID, + StepExecutionID: step.ID.String(), + StepTitle: stepTitle, + WorkflowTitle: workflowTitle, + WorkflowInstanceTitle: workflowInstanceTitle, + StepURL: "", + DueDate: *step.DueDate, + } + params = append(params, river.InsertManyParams{ + Args: args, + InsertOpts: JobInsertOptionsForWorkflowNotification(), + }) + } + + if len(params) == 0 { + return nil + } + + if _, err := w.client.InsertMany(ctx, params); err != nil { + return fmt.Errorf("due-soon checker: failed to enqueue reminder jobs: %w", err) + } + + w.logger.Infow("DueSoonCheckerWorker: enqueued due-soon reminders", "count", len(params)) + return nil +} diff --git a/internal/service/worker/jobs.go b/internal/service/worker/jobs.go index b232b309..970e5c4e 100644 --- a/internal/service/worker/jobs.go +++ b/internal/service/worker/jobs.go @@ -17,6 +17,58 @@ const ( JobTypeSendGlobalDigest = "send_global_digest" ) +// Job types for workflow notifications +const ( + JobTypeWorkflowTaskAssigned = "workflow_task_assigned" + JobTypeWorkflowTaskDueSoon = "workflow_task_due_soon" + JobTypeWorkflowTaskDigest = "workflow_task_digest" +) + +// WorkflowTaskAssignedArgs represents the arguments for a new-task-assigned notification email +type WorkflowTaskAssignedArgs struct { + UserID string `json:"user_id"` + StepExecutionID string `json:"step_execution_id"` + StepTitle string `json:"step_title"` + WorkflowTitle string `json:"workflow_title"` + WorkflowInstanceTitle string `json:"workflow_instance_title"` + StepURL string `json:"step_url"` + DueDate *time.Time `json:"due_date,omitempty"` +} + +// WorkflowTaskDueSoonArgs represents the arguments for a task-due-in-1-day reminder email +type WorkflowTaskDueSoonArgs struct { + UserID string `json:"user_id"` + StepExecutionID string `json:"step_execution_id"` + StepTitle string `json:"step_title"` + WorkflowTitle string `json:"workflow_title"` + WorkflowInstanceTitle string `json:"workflow_instance_title"` + StepURL string `json:"step_url"` + DueDate time.Time `json:"due_date"` +} + +// WorkflowTaskDigestArgs represents the arguments for a per-user task digest email +type WorkflowTaskDigestArgs struct { + UserID string `json:"user_id"` +} + +// Kind returns the job kind for River +func (WorkflowTaskAssignedArgs) Kind() string { return JobTypeWorkflowTaskAssigned } + +// Kind returns the job kind for River +func (WorkflowTaskDueSoonArgs) Kind() string { return JobTypeWorkflowTaskDueSoon } + +// Kind returns the job kind for River +func (WorkflowTaskDigestArgs) Kind() string { return JobTypeWorkflowTaskDigest } + +// Timeout returns the timeout for workflow task assigned jobs +func (WorkflowTaskAssignedArgs) Timeout() time.Duration { return 30 * time.Second } + +// Timeout returns the timeout for workflow task due soon jobs +func (WorkflowTaskDueSoonArgs) Timeout() time.Duration { return 30 * time.Second } + +// Timeout returns the timeout for workflow task digest jobs +func (WorkflowTaskDigestArgs) Timeout() time.Duration { return 5 * time.Minute } + // SendEmailArgs represents the arguments for sending an email type SendEmailArgs struct { // Email message fields @@ -66,6 +118,23 @@ func (SendGlobalDigestArgs) Kind() string { return JobTypeSendGlobalDigest } type EmailService interface { Send(ctx context.Context, message *types.Message) (*types.SendResult, error) SendWithProvider(ctx context.Context, providerName string, message *types.Message) (*types.SendResult, error) + UseTemplate(templateName string, data map[string]interface{}) (htmlContent, textContent string, err error) + GetDefaultFromAddress() string +} + +// UserRepository is the minimal DB interface needed by notification workers +type UserRepository interface { + FindUserByID(ctx context.Context, userID string) (NotificationUser, error) +} + +// NotificationUser holds the user fields needed for sending notification emails +type NotificationUser struct { + ID string + Email string + FirstName string + LastName string + TaskAvailableEmailSubscribed bool + TaskDailyDigestSubscribed bool } // Logger interface for logging @@ -282,6 +351,210 @@ func (w *SendGlobalDigestWorker) Work(ctx context.Context, job *river.Job[SendGl return nil } +// WorkflowTaskAssignedWorker handles new-task-assigned notification email jobs +type WorkflowTaskAssignedWorker struct { + emailService EmailService + userRepo UserRepository + logger Logger +} + +// NewWorkflowTaskAssignedWorker creates a new WorkflowTaskAssignedWorker +func NewWorkflowTaskAssignedWorker(emailService EmailService, userRepo UserRepository, logger Logger) *WorkflowTaskAssignedWorker { + return &WorkflowTaskAssignedWorker{ + emailService: emailService, + userRepo: userRepo, + logger: logger, + } +} + +// Work is the River work function for sending new-task-assigned notification emails +func (w *WorkflowTaskAssignedWorker) Work(ctx context.Context, job *river.Job[WorkflowTaskAssignedArgs]) error { + args := job.Args + + user, err := w.userRepo.FindUserByID(ctx, args.UserID) + if err != nil { + w.logger.Warnw("WorkflowTaskAssignedWorker: user not found, skipping", + "step_execution_id", args.StepExecutionID, + "user_id", args.UserID, + "error", err, + ) + return nil + } + + if !user.TaskAvailableEmailSubscribed { + w.logger.Debugw("WorkflowTaskAssignedWorker: user not subscribed, skipping", + "step_execution_id", args.StepExecutionID, + "user_id", args.UserID, + ) + return nil + } + + userName := user.FirstName + if user.LastName != "" { + userName = user.FirstName + " " + user.LastName + } + + templateData := map[string]interface{}{ + "UserName": userName, + "StepTitle": args.StepTitle, + "WorkflowTitle": args.WorkflowTitle, + "WorkflowInstanceTitle": args.WorkflowInstanceTitle, + "StepURL": args.StepURL, + "DueDate": args.DueDate, + } + + htmlBody, textBody, err := w.emailService.UseTemplate("workflow-task-assigned", templateData) + if err != nil { + w.logger.Errorw("WorkflowTaskAssignedWorker: failed to render template", + "step_execution_id", args.StepExecutionID, + "user_id", args.UserID, + "error", err, + ) + return fmt.Errorf("failed to render workflow-task-assigned template: %w", err) + } + + message := &types.Message{ + From: w.emailService.GetDefaultFromAddress(), + To: []string{user.Email}, + Subject: fmt.Sprintf("Task ready for you: %s — %s", args.StepTitle, args.WorkflowTitle), + HTMLBody: htmlBody, + TextBody: textBody, + } + + result, err := w.emailService.Send(ctx, message) + if err != nil { + w.logger.Errorw("WorkflowTaskAssignedWorker: failed to send email", + "step_execution_id", args.StepExecutionID, + "user_id", args.UserID, + "error", err, + ) + return fmt.Errorf("failed to send workflow-task-assigned email: %w", err) + } + + if !result.Success { + w.logger.Errorw("WorkflowTaskAssignedWorker: email send reported failure", + "step_execution_id", args.StepExecutionID, + "user_id", args.UserID, + "error", result.Error, + ) + return fmt.Errorf("workflow-task-assigned email send failed: %s", result.Error) + } + + w.logger.Infow("WorkflowTaskAssignedWorker: email sent", + "step_execution_id", args.StepExecutionID, + "user_id", args.UserID, + "message_id", result.MessageID, + ) + + return nil +} + +// WorkflowTaskDueSoonWorker handles task-due-in-1-day reminder email jobs +type WorkflowTaskDueSoonWorker struct { + emailService EmailService + userRepo UserRepository + logger Logger +} + +// NewWorkflowTaskDueSoonWorker creates a new WorkflowTaskDueSoonWorker +func NewWorkflowTaskDueSoonWorker(emailService EmailService, userRepo UserRepository, logger Logger) *WorkflowTaskDueSoonWorker { + return &WorkflowTaskDueSoonWorker{ + emailService: emailService, + userRepo: userRepo, + logger: logger, + } +} + +// Work is the River work function for sending task-due-in-1-day reminder emails +func (w *WorkflowTaskDueSoonWorker) Work(ctx context.Context, job *river.Job[WorkflowTaskDueSoonArgs]) error { + args := job.Args + + user, err := w.userRepo.FindUserByID(ctx, args.UserID) + if err != nil { + w.logger.Warnw("WorkflowTaskDueSoonWorker: user not found, skipping", + "step_execution_id", args.StepExecutionID, + "user_id", args.UserID, + "error", err, + ) + return nil + } + + if !user.TaskAvailableEmailSubscribed { + w.logger.Debugw("WorkflowTaskDueSoonWorker: user not subscribed, skipping", + "step_execution_id", args.StepExecutionID, + "user_id", args.UserID, + ) + return nil + } + + userName := user.FirstName + if user.LastName != "" { + userName = user.FirstName + " " + user.LastName + } + + templateData := map[string]interface{}{ + "UserName": userName, + "StepTitle": args.StepTitle, + "WorkflowTitle": args.WorkflowTitle, + "WorkflowInstanceTitle": args.WorkflowInstanceTitle, + "StepURL": args.StepURL, + "DueDate": args.DueDate.Format("2006-01-02"), + } + + htmlBody, textBody, err := w.emailService.UseTemplate("workflow-task-due-soon", templateData) + if err != nil { + w.logger.Errorw("WorkflowTaskDueSoonWorker: failed to render template", + "step_execution_id", args.StepExecutionID, + "user_id", args.UserID, + "error", err, + ) + return fmt.Errorf("failed to render workflow-task-due-soon template: %w", err) + } + + message := &types.Message{ + From: w.emailService.GetDefaultFromAddress(), + To: []string{user.Email}, + Subject: fmt.Sprintf("Reminder: %s is due tomorrow — %s", args.StepTitle, args.WorkflowTitle), + HTMLBody: htmlBody, + TextBody: textBody, + } + + result, err := w.emailService.Send(ctx, message) + if err != nil { + w.logger.Errorw("WorkflowTaskDueSoonWorker: failed to send email", + "step_execution_id", args.StepExecutionID, + "user_id", args.UserID, + "error", err, + ) + return fmt.Errorf("failed to send workflow-task-due-soon email: %w", err) + } + + if !result.Success { + w.logger.Errorw("WorkflowTaskDueSoonWorker: email send reported failure", + "step_execution_id", args.StepExecutionID, + "user_id", args.UserID, + "error", result.Error, + ) + return fmt.Errorf("workflow-task-due-soon email send failed: %s", result.Error) + } + + w.logger.Infow("WorkflowTaskDueSoonWorker: email sent", + "step_execution_id", args.StepExecutionID, + "user_id", args.UserID, + "message_id", result.MessageID, + ) + + return nil +} + +// JobInsertOptionsForWorkflowNotification returns insert options for workflow notification email jobs +func JobInsertOptionsForWorkflowNotification() *river.InsertOpts { + return &river.InsertOpts{ + Queue: "email", + MaxAttempts: 5, + } +} + // JobInsertOptions returns common insert options for email jobs func JobInsertOptions() *river.InsertOpts { return &river.InsertOpts{ @@ -310,7 +583,7 @@ func JobInsertOptionsWithRetry(queue string, maxAttempts int) *river.InsertOpts } // Workers returns all workers as work functions with dependencies injected -func Workers(emailService EmailService, digestService DigestService, logger Logger) *river.Workers { +func Workers(emailService EmailService, digestService DigestService, userRepo UserRepository, logger Logger) *river.Workers { workers := river.NewWorkers() // Create worker instances with dependencies @@ -326,5 +599,14 @@ func Workers(emailService EmailService, digestService DigestService, logger Logg river.AddWorker(workers, river.WorkFunc(sendGlobalDigestWorker.Work)) } + // Register workflow notification workers if dependencies are available + if userRepo != nil { + workflowTaskAssignedWorker := NewWorkflowTaskAssignedWorker(emailService, userRepo, logger) + river.AddWorker(workers, river.WorkFunc(workflowTaskAssignedWorker.Work)) + + workflowTaskDueSoonWorker := NewWorkflowTaskDueSoonWorker(emailService, userRepo, logger) + river.AddWorker(workers, river.WorkFunc(workflowTaskDueSoonWorker.Work)) + } + return workers } diff --git a/internal/service/worker/service.go b/internal/service/worker/service.go index 0d19495e..e7f2bc6c 100644 --- a/internal/service/worker/service.go +++ b/internal/service/worker/service.go @@ -31,6 +31,7 @@ type Service struct { db *gorm.DB emailSvc *email.Service digestSvc DigestService + userRepo UserRepository logger *zap.SugaredLogger started bool startedMu sync.RWMutex @@ -130,7 +131,7 @@ func NewServiceWithDigest( roleAssignmentService := workflows.NewRoleAssignmentService(db) // Create assignment service - assignmentService := workflow.NewAssignmentService(roleAssignmentService, db) + assignmentService := workflow.NewAssignmentService(roleAssignmentService, db, logger) // Create workflow executor workflowLogger := log.New(os.Stdout, "[WORKFLOW] ", log.LstdFlags) @@ -194,13 +195,18 @@ func NewServiceWithDigest( // Register workers with dependencies injected // We start with the email/digest workers - workers := Workers(emailSvc, digestSvc, logger) + userRepo := NewGORMUserRepository(db) + workers := Workers(emailSvc, digestSvc, userRepo, logger) // Add workflow workers river.AddWorker(workers, river.WorkFunc(workflowExecutionWorker.Work)) river.AddWorker(workers, river.WorkFunc(stepExecutionWorker.Work)) river.AddWorker(workers, river.WorkFunc(schedulerWorker.Work)) + // Add due-soon checker worker (uses clientProxy which is wired to the real client after construction) + dueSoonCheckerWorker := NewDueSoonCheckerWorker(db, clientProxy, logger) + river.AddWorker(workers, river.WorkFunc(dueSoonCheckerWorker.Work)) + // Configure periodic jobs periodicJobs := periodicJobsFromConfig(digestCfg, logger) @@ -222,6 +228,7 @@ func NewServiceWithDigest( db: db, emailSvc: emailSvc, digestSvc: digestSvc, + userRepo: userRepo, digestCfg: digestCfg, logger: logger, started: false, @@ -235,6 +242,11 @@ func NewServiceWithDigest( stepDefinitionService: stepDefService, } + // Wire the service itself as the notification enqueuer for the executor and assignment service. + // This must happen after the service is built so the River client is available. + executor.SetNotificationEnqueuer(service) + assignmentService.SetNotificationEnqueuer(service) + return service, nil } @@ -386,6 +398,23 @@ func parseCronScheduleWithFallback(cronSchedule string, fallback string, jobName return schedule } +func NewDueSoonCheckerPeriodicJob(logger *zap.SugaredLogger) *river.PeriodicJob { + schedule := parseCronScheduleWithFallback("0 8 * * *", "0 8 * * *", "due-soon checker", logger) + + return river.NewPeriodicJob( + schedule, + func() (river.JobArgs, *river.InsertOpts) { + return &DueSoonCheckerArgs{}, &river.InsertOpts{ + Queue: "email", + MaxAttempts: 3, + } + }, + &river.PeriodicJobOpts{ + RunOnStart: false, + }, + ) +} + func periodicJobsFromConfig(cfg *config.Config, logger *zap.SugaredLogger) []*river.PeriodicJob { var periodicJobs []*river.PeriodicJob if cfg == nil { @@ -396,6 +425,7 @@ func periodicJobsFromConfig(cfg *config.Config, logger *zap.SugaredLogger) []*ri } if cfg.Workflow != nil && cfg.Workflow.SchedulerEnabled { periodicJobs = append(periodicJobs, NewWorkflowSchedulerPeriodicJob(cfg.Workflow.Schedule, logger)) + periodicJobs = append(periodicJobs, NewDueSoonCheckerPeriodicJob(logger)) } return periodicJobs } @@ -424,6 +454,55 @@ func buildRiverConfig(cfg *config.WorkerConfig, workers *river.Workers, periodic } } +// EnqueueWorkflowTaskAssigned enqueues a workflow-task-assigned notification email job. +// Implements the workflow.NotificationEnqueuer interface. +func (s *Service) EnqueueWorkflowTaskAssigned(ctx context.Context, stepExecution *workflows.StepExecution) error { + if !s.config.Enabled || s.client == nil { + return nil + } + + if stepExecution == nil { + return nil + } + + // Only enqueue for user-type assignees — email and group types are not supported yet + if stepExecution.AssignedToType != workflows.AssignmentTypeUser.String() || stepExecution.AssignedToID == "" { + return nil + } + + stepTitle := "" + workflowTitle := "" + workflowInstanceTitle := "" + if stepExecution.WorkflowStepDefinition != nil { + stepTitle = stepExecution.WorkflowStepDefinition.Name + } + if stepExecution.WorkflowExecution != nil && stepExecution.WorkflowExecution.WorkflowInstance != nil { + if stepExecution.WorkflowExecution.WorkflowInstance.WorkflowDefinition != nil { + workflowTitle = stepExecution.WorkflowExecution.WorkflowInstance.WorkflowDefinition.Name + } + workflowInstanceTitle = stepExecution.WorkflowExecution.WorkflowInstance.Name + } + + args := &WorkflowTaskAssignedArgs{ + UserID: stepExecution.AssignedToID, + StepExecutionID: stepExecution.ID.String(), + StepTitle: stepTitle, + WorkflowTitle: workflowTitle, + WorkflowInstanceTitle: workflowInstanceTitle, + StepURL: "", + DueDate: stepExecution.DueDate, + } + + _, err := s.client.InsertMany(ctx, []river.InsertManyParams{ + {Args: args, InsertOpts: JobInsertOptionsForWorkflowNotification()}, + }) + if err != nil { + return fmt.Errorf("failed to enqueue workflow-task-assigned job: %w", err) + } + + return nil +} + // EnqueueSendEmail enqueues a send email job func (s *Service) EnqueueSendEmail(ctx context.Context, args *SendEmailArgs) error { if !s.config.Enabled { diff --git a/internal/service/worker/service_test.go b/internal/service/worker/service_test.go index a476263b..c7f21a00 100644 --- a/internal/service/worker/service_test.go +++ b/internal/service/worker/service_test.go @@ -29,6 +29,16 @@ func (m *MockEmailService) SendWithProvider(ctx context.Context, providerName st return args.Get(0).(*types.SendResult), args.Error(1) } +func (m *MockEmailService) UseTemplate(templateName string, data map[string]interface{}) (string, string, error) { + args := m.Called(templateName, data) + return args.String(0), args.String(1), args.Error(2) +} + +func (m *MockEmailService) GetDefaultFromAddress() string { + args := m.Called() + return args.String(0) +} + // MockDigestService is a mock implementation of DigestService type MockDigestService struct { mock.Mock @@ -320,7 +330,7 @@ func TestWorkers(t *testing.T) { mockDigestService := &MockDigestService{} mockLogger := &MockLogger{} - workers := Workers(mockEmailService, mockDigestService, mockLogger) + workers := Workers(mockEmailService, mockDigestService, nil, mockLogger) assert.NotNil(t, workers) } @@ -365,7 +375,7 @@ func TestPeriodicJobsFromConfig_WorkflowSchedulerEnabledGuard(t *testing.T) { Schedule: "@every 15m", }, }, logger) - assert.Len(t, jobs, 1) + assert.Len(t, jobs, 2) } func TestWorkflowSchedulerPeriodicJobConstructor_InsertOpts(t *testing.T) { diff --git a/internal/service/worker/user_repository.go b/internal/service/worker/user_repository.go new file mode 100644 index 00000000..538bca88 --- /dev/null +++ b/internal/service/worker/user_repository.go @@ -0,0 +1,46 @@ +package worker + +import ( + "context" + "errors" + "fmt" + + "github.com/compliance-framework/api/internal/service/relational" + "github.com/google/uuid" + "gorm.io/gorm" +) + +// GORMUserRepository implements UserRepository using GORM +type GORMUserRepository struct { + db *gorm.DB +} + +// NewGORMUserRepository creates a new GORMUserRepository +func NewGORMUserRepository(db *gorm.DB) *GORMUserRepository { + return &GORMUserRepository{db: db} +} + +// FindUserByID looks up a user by UUID string and returns a NotificationUser +func (r *GORMUserRepository) FindUserByID(ctx context.Context, userID string) (NotificationUser, error) { + parsed, err := uuid.Parse(userID) + if err != nil { + return NotificationUser{}, fmt.Errorf("invalid user ID %q: %w", userID, err) + } + + var user relational.User + if err := r.db.WithContext(ctx).First(&user, parsed).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return NotificationUser{}, fmt.Errorf("user %s not found", userID) + } + return NotificationUser{}, fmt.Errorf("failed to fetch user %s: %w", userID, err) + } + + return NotificationUser{ + ID: user.ID.String(), + Email: user.Email, + FirstName: user.FirstName, + LastName: user.LastName, + TaskAvailableEmailSubscribed: user.TaskAvailableEmailSubscribed, + TaskDailyDigestSubscribed: user.TaskDailyDigestSubscribed, + }, nil +} diff --git a/internal/service/worker/workflow_task_assigned_worker_test.go b/internal/service/worker/workflow_task_assigned_worker_test.go new file mode 100644 index 00000000..330df962 --- /dev/null +++ b/internal/service/worker/workflow_task_assigned_worker_test.go @@ -0,0 +1,151 @@ +package worker + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/compliance-framework/api/internal/service/email/types" + "github.com/riverqueue/river" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "go.uber.org/zap" +) + +// MockUserRepository is a mock implementation of UserRepository +type MockUserRepository struct { + mock.Mock +} + +func (m *MockUserRepository) FindUserByID(ctx context.Context, userID string) (NotificationUser, error) { + args := m.Called(ctx, userID) + return args.Get(0).(NotificationUser), args.Error(1) +} + +func makeTaskAssignedJob(args WorkflowTaskAssignedArgs) *river.Job[WorkflowTaskAssignedArgs] { + return &river.Job[WorkflowTaskAssignedArgs]{Args: args} +} + +func TestWorkflowTaskAssignedWorker_SubscribedUser_SendsEmail(t *testing.T) { + ctx := context.Background() + dueDate := time.Now().Add(48 * time.Hour) + + mockEmail := &MockEmailService{} + mockRepo := &MockUserRepository{} + mockLog := zap.NewNop().Sugar() + + user := NotificationUser{ + ID: "user-1", + Email: "alice@example.com", + FirstName: "Alice", + LastName: "Smith", + TaskAvailableEmailSubscribed: true, + } + mockRepo.On("FindUserByID", ctx, "user-1").Return(user, nil) + mockEmail.On("UseTemplate", "workflow-task-assigned", mock.Anything).Return("Task", "Task text", nil) + mockEmail.On("GetDefaultFromAddress").Return("noreply@example.com") + mockEmail.On("Send", ctx, mock.MatchedBy(func(msg *types.Message) bool { + return msg.To[0] == "alice@example.com" + })).Return(&types.SendResult{Success: true, MessageID: "msg-1"}, nil) + + w := NewWorkflowTaskAssignedWorker(mockEmail, mockRepo, mockLog) + + args := WorkflowTaskAssignedArgs{ + UserID: "user-1", + StepExecutionID: "step-1", + StepTitle: "Review Policy", + WorkflowTitle: "Annual Audit", + WorkflowInstanceTitle: "Audit 2026", + StepURL: "https://app.example.com/steps/step-1", + DueDate: &dueDate, + } + + err := w.Work(ctx, makeTaskAssignedJob(args)) + assert.NoError(t, err) + mockEmail.AssertExpectations(t) + mockRepo.AssertExpectations(t) +} + +func TestWorkflowTaskAssignedWorker_UnsubscribedUser_Skips(t *testing.T) { + ctx := context.Background() + + mockEmail := &MockEmailService{} + mockRepo := &MockUserRepository{} + mockLog := zap.NewNop().Sugar() + + user := NotificationUser{ + ID: "user-2", + Email: "bob@example.com", + FirstName: "Bob", + TaskAvailableEmailSubscribed: false, + } + mockRepo.On("FindUserByID", ctx, "user-2").Return(user, nil) + + w := NewWorkflowTaskAssignedWorker(mockEmail, mockRepo, mockLog) + + args := WorkflowTaskAssignedArgs{ + UserID: "user-2", + StepExecutionID: "step-2", + StepTitle: "Review Policy", + WorkflowTitle: "Annual Audit", + } + + err := w.Work(ctx, makeTaskAssignedJob(args)) + assert.NoError(t, err) + // Send must NOT be called + mockEmail.AssertNotCalled(t, "Send") +} + +func TestWorkflowTaskAssignedWorker_UserNotFound_Skips(t *testing.T) { + ctx := context.Background() + + mockEmail := &MockEmailService{} + mockRepo := &MockUserRepository{} + mockLog := zap.NewNop().Sugar() + + mockRepo.On("FindUserByID", ctx, "missing-user").Return(NotificationUser{}, errors.New("not found")) + + w := NewWorkflowTaskAssignedWorker(mockEmail, mockRepo, mockLog) + + args := WorkflowTaskAssignedArgs{ + UserID: "missing-user", + StepExecutionID: "step-3", + } + + err := w.Work(ctx, makeTaskAssignedJob(args)) + // Should return nil (non-fatal skip) + assert.NoError(t, err) + mockEmail.AssertNotCalled(t, "Send") +} + +func TestWorkflowTaskAssignedWorker_TemplateError_ReturnsError(t *testing.T) { + ctx := context.Background() + + mockEmail := &MockEmailService{} + mockRepo := &MockUserRepository{} + mockLog := zap.NewNop().Sugar() + + user := NotificationUser{ + ID: "user-3", + Email: "carol@example.com", + FirstName: "Carol", + TaskAvailableEmailSubscribed: true, + } + mockRepo.On("FindUserByID", ctx, "user-3").Return(user, nil) + mockEmail.On("UseTemplate", "workflow-task-assigned", mock.Anything).Return("", "", errors.New("template broken")) + mockEmail.On("GetDefaultFromAddress").Return("noreply@example.com") + + w := NewWorkflowTaskAssignedWorker(mockEmail, mockRepo, mockLog) + + args := WorkflowTaskAssignedArgs{ + UserID: "user-3", + StepExecutionID: "step-4", + StepTitle: "Review", + WorkflowTitle: "Audit", + } + + err := w.Work(ctx, makeTaskAssignedJob(args)) + assert.Error(t, err) + assert.Contains(t, err.Error(), "workflow-task-assigned template") +} diff --git a/internal/service/worker/workflow_task_due_soon_worker_test.go b/internal/service/worker/workflow_task_due_soon_worker_test.go new file mode 100644 index 00000000..9b5a3cbc --- /dev/null +++ b/internal/service/worker/workflow_task_due_soon_worker_test.go @@ -0,0 +1,142 @@ +package worker + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/compliance-framework/api/internal/service/email/types" + "github.com/riverqueue/river" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "go.uber.org/zap" +) + +func makeDueSoonJob(args WorkflowTaskDueSoonArgs) *river.Job[WorkflowTaskDueSoonArgs] { + return &river.Job[WorkflowTaskDueSoonArgs]{Args: args} +} + +func TestWorkflowTaskDueSoonWorker_SubscribedUser_SendsEmail(t *testing.T) { + ctx := context.Background() + dueDate := time.Now().Add(24 * time.Hour) + + mockEmail := &MockEmailService{} + mockRepo := &MockUserRepository{} + mockLog := zap.NewNop().Sugar() + + user := NotificationUser{ + ID: "user-1", + Email: "alice@example.com", + FirstName: "Alice", + LastName: "Smith", + TaskAvailableEmailSubscribed: true, + } + mockRepo.On("FindUserByID", ctx, "user-1").Return(user, nil) + mockEmail.On("UseTemplate", "workflow-task-due-soon", mock.Anything).Return("Due Soon", "Due soon text", nil) + mockEmail.On("GetDefaultFromAddress").Return("noreply@example.com") + mockEmail.On("Send", ctx, mock.MatchedBy(func(msg *types.Message) bool { + return msg.To[0] == "alice@example.com" && msg.Subject != "" + })).Return(&types.SendResult{Success: true, MessageID: "msg-2"}, nil) + + w := NewWorkflowTaskDueSoonWorker(mockEmail, mockRepo, mockLog) + + args := WorkflowTaskDueSoonArgs{ + UserID: "user-1", + StepExecutionID: "step-1", + StepTitle: "Submit Evidence", + WorkflowTitle: "SOC2 Audit", + WorkflowInstanceTitle: "SOC2 2026", + StepURL: "https://app.example.com/steps/step-1", + DueDate: dueDate, + } + + err := w.Work(ctx, makeDueSoonJob(args)) + assert.NoError(t, err) + mockEmail.AssertExpectations(t) + mockRepo.AssertExpectations(t) +} + +func TestWorkflowTaskDueSoonWorker_UnsubscribedUser_Skips(t *testing.T) { + ctx := context.Background() + + mockEmail := &MockEmailService{} + mockRepo := &MockUserRepository{} + mockLog := zap.NewNop().Sugar() + + user := NotificationUser{ + ID: "user-2", + Email: "bob@example.com", + FirstName: "Bob", + TaskAvailableEmailSubscribed: false, + } + mockRepo.On("FindUserByID", ctx, "user-2").Return(user, nil) + + w := NewWorkflowTaskDueSoonWorker(mockEmail, mockRepo, mockLog) + + args := WorkflowTaskDueSoonArgs{ + UserID: "user-2", + StepExecutionID: "step-2", + StepTitle: "Submit Evidence", + WorkflowTitle: "SOC2 Audit", + DueDate: time.Now().Add(24 * time.Hour), + } + + err := w.Work(ctx, makeDueSoonJob(args)) + assert.NoError(t, err) + mockEmail.AssertNotCalled(t, "Send") +} + +func TestWorkflowTaskDueSoonWorker_UserNotFound_Skips(t *testing.T) { + ctx := context.Background() + + mockEmail := &MockEmailService{} + mockRepo := &MockUserRepository{} + mockLog := zap.NewNop().Sugar() + + mockRepo.On("FindUserByID", ctx, "missing-user").Return(NotificationUser{}, errors.New("not found")) + + w := NewWorkflowTaskDueSoonWorker(mockEmail, mockRepo, mockLog) + + args := WorkflowTaskDueSoonArgs{ + UserID: "missing-user", + StepExecutionID: "step-3", + DueDate: time.Now().Add(24 * time.Hour), + } + + err := w.Work(ctx, makeDueSoonJob(args)) + assert.NoError(t, err) + mockEmail.AssertNotCalled(t, "Send") +} + +func TestWorkflowTaskDueSoonWorker_TemplateError_ReturnsError(t *testing.T) { + ctx := context.Background() + + mockEmail := &MockEmailService{} + mockRepo := &MockUserRepository{} + mockLog := zap.NewNop().Sugar() + + user := NotificationUser{ + ID: "user-3", + Email: "carol@example.com", + FirstName: "Carol", + TaskAvailableEmailSubscribed: true, + } + mockRepo.On("FindUserByID", ctx, "user-3").Return(user, nil) + mockEmail.On("UseTemplate", "workflow-task-due-soon", mock.Anything).Return("", "", errors.New("template broken")) + mockEmail.On("GetDefaultFromAddress").Return("noreply@example.com") + + w := NewWorkflowTaskDueSoonWorker(mockEmail, mockRepo, mockLog) + + args := WorkflowTaskDueSoonArgs{ + UserID: "user-3", + StepExecutionID: "step-4", + StepTitle: "Submit Evidence", + WorkflowTitle: "SOC2 Audit", + DueDate: time.Now().Add(24 * time.Hour), + } + + err := w.Work(ctx, makeDueSoonJob(args)) + assert.Error(t, err) + assert.Contains(t, err.Error(), "workflow-task-due-soon template") +} diff --git a/internal/workflow/assignment.go b/internal/workflow/assignment.go index bb376880..77cce52b 100644 --- a/internal/workflow/assignment.go +++ b/internal/workflow/assignment.go @@ -10,6 +10,7 @@ import ( "github.com/compliance-framework/api/internal/service/relational" "github.com/compliance-framework/api/internal/service/relational/workflows" "github.com/google/uuid" + "go.uber.org/zap" "gorm.io/gorm" ) @@ -23,16 +24,24 @@ type Assignee struct { type AssignmentService struct { roleAssignmentService RoleAssignmentServiceInterface db *gorm.DB + notificationEnqueuer NotificationEnqueuer // Optional: for task-assigned notification emails + logger *zap.SugaredLogger } // NewAssignmentService creates a new assignment service -func NewAssignmentService(roleAssignmentService RoleAssignmentServiceInterface, db *gorm.DB) *AssignmentService { +func NewAssignmentService(roleAssignmentService RoleAssignmentServiceInterface, db *gorm.DB, logger *zap.SugaredLogger) *AssignmentService { return &AssignmentService{ roleAssignmentService: roleAssignmentService, db: db, + logger: logger, } } +// SetNotificationEnqueuer sets the notification enqueuer (optional) +func (s *AssignmentService) SetNotificationEnqueuer(enqueuer NotificationEnqueuer) { + s.notificationEnqueuer = enqueuer +} + // ResolveStepAssignees resolves assignees for a list of step definitions based on role assignments func (s *AssignmentService) ResolveStepAssignees(ctx context.Context, instance *workflows.WorkflowInstance, stepDefinitions []workflows.WorkflowStepDefinition) (map[uuid.UUID]Assignee, error) { assignments := make(map[uuid.UUID]Assignee) @@ -90,20 +99,23 @@ func (s *AssignmentService) ReassignStep( return err } - return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { - var stepExecution workflows.StepExecution - if err := tx.First(&stepExecution, stepExecutionID).Error; err != nil { + var updatedStepExecution workflows.StepExecution + if err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + if err := tx.First(&updatedStepExecution, stepExecutionID).Error; err != nil { return err } - if !isReassignableStatus(stepExecution.Status) { - return fmt.Errorf("%w: current status is %s", ErrReassignmentNotAllowed, stepExecution.Status) + if !isReassignableStatus(updatedStepExecution.Status) { + return fmt.Errorf("%w: current status is %s", ErrReassignmentNotAllowed, updatedStepExecution.Status) } if err := s.validateAssigneeExists(tx, newAssignee); err != nil { return err } + prevType := updatedStepExecution.AssignedToType + prevID := updatedStepExecution.AssignedToID + now := time.Now() if err := tx.Model(&workflows.StepExecution{}). Where("id = ?", stepExecutionID). @@ -114,12 +126,14 @@ func (s *AssignmentService) ReassignStep( }).Error; err != nil { return err } + updatedStepExecution.AssignedToType = newAssignee.Type + updatedStepExecution.AssignedToID = newAssignee.ID history := &workflows.StepReassignmentHistory{ - StepExecutionID: stepExecution.ID, - WorkflowExecutionID: stepExecution.WorkflowExecutionID, - PreviousAssignedToType: stepExecution.AssignedToType, - PreviousAssignedToID: stepExecution.AssignedToID, + StepExecutionID: updatedStepExecution.ID, + WorkflowExecutionID: updatedStepExecution.WorkflowExecutionID, + PreviousAssignedToType: prevType, + PreviousAssignedToID: prevID, NewAssignedToType: newAssignee.Type, NewAssignedToID: newAssignee.ID, Reason: reason, @@ -127,7 +141,18 @@ func (s *AssignmentService) ReassignStep( ReassignedByEmail: reassignedByEmail, } return tx.Create(history).Error - }) + }); err != nil { + return err + } + + if s.notificationEnqueuer != nil && newAssignee.Type == workflows.AssignmentTypeUser.String() { + if err := s.notificationEnqueuer.EnqueueWorkflowTaskAssigned(ctx, &updatedStepExecution); err != nil { + // Non-fatal: log but don't fail the reassignment + s.logger.Error("failed to enqueue workflow task assigned notification", "error", err) + } + } + + return nil } func (s *AssignmentService) BulkReassignByRole( diff --git a/internal/workflow/assignment_test.go b/internal/workflow/assignment_test.go index 2c290b33..0573f46f 100644 --- a/internal/workflow/assignment_test.go +++ b/internal/workflow/assignment_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.uber.org/zap" "gorm.io/driver/sqlite" "gorm.io/gorm" ) @@ -62,7 +63,7 @@ func TestResolveStepAssignees(t *testing.T) { } mockRoleService := new(MockRoleAssignmentService) - assignmentService := NewAssignmentService(mockRoleService, nil) + assignmentService := NewAssignmentService(mockRoleService, nil, zap.NewNop().Sugar()) // Mock responses // Step 1: Role "admin" -> User "user-1" @@ -128,7 +129,7 @@ func TestResolveStepAssignees_NoRole(t *testing.T) { } mockRoleService := new(MockRoleAssignmentService) - assignmentService := NewAssignmentService(mockRoleService, nil) + assignmentService := NewAssignmentService(mockRoleService, nil, zap.NewNop().Sugar()) assignments, err := assignmentService.ResolveStepAssignees(context.Background(), instance, steps) assert.NoError(t, err) @@ -194,7 +195,7 @@ func createAssignmentServiceGraph(t *testing.T, db *gorm.DB) (*workflows.Workflo func TestReassignStep(t *testing.T) { db := setupAssignmentServiceTestDB(t) roleService := new(MockRoleAssignmentService) - service := NewAssignmentService(roleService, db) + service := NewAssignmentService(roleService, db, zap.NewNop().Sugar()) _, _, stepExec := createAssignmentServiceGraph(t, db) @@ -252,7 +253,7 @@ func TestReassignStep(t *testing.T) { func TestReassignStep_RejectsInvalidStatus(t *testing.T) { db := setupAssignmentServiceTestDB(t) roleService := new(MockRoleAssignmentService) - service := NewAssignmentService(roleService, db) + service := NewAssignmentService(roleService, db, zap.NewNop().Sugar()) _, _, stepExec := createAssignmentServiceGraph(t, db) @@ -277,7 +278,7 @@ func TestReassignStep_RejectsInvalidStatus(t *testing.T) { func TestReassignStep_AllowsOverdueStatus(t *testing.T) { db := setupAssignmentServiceTestDB(t) roleService := new(MockRoleAssignmentService) - service := NewAssignmentService(roleService, db) + service := NewAssignmentService(roleService, db, zap.NewNop().Sugar()) _, _, stepExec := createAssignmentServiceGraph(t, db) stepExec.Status = workflows.StepStatusOverdue.String() @@ -302,7 +303,7 @@ func TestReassignStep_AllowsOverdueStatus(t *testing.T) { func TestReassignStep_RejectsInvalidAssigneeAndMissingUser(t *testing.T) { db := setupAssignmentServiceTestDB(t) roleService := new(MockRoleAssignmentService) - service := NewAssignmentService(roleService, db) + service := NewAssignmentService(roleService, db, zap.NewNop().Sugar()) _, _, stepExec := createAssignmentServiceGraph(t, db) @@ -333,7 +334,7 @@ func TestReassignStep_RejectsInvalidAssigneeAndMissingUser(t *testing.T) { func TestBulkReassignByRole(t *testing.T) { db := setupAssignmentServiceTestDB(t) roleService := new(MockRoleAssignmentService) - service := NewAssignmentService(roleService, db) + service := NewAssignmentService(roleService, db, zap.NewNop().Sugar()) execution, stepDef, stepExec := createAssignmentServiceGraph(t, db) diff --git a/internal/workflow/executor.go b/internal/workflow/executor.go index 1428a3f3..ba7a0cbd 100644 --- a/internal/workflow/executor.go +++ b/internal/workflow/executor.go @@ -12,6 +12,12 @@ import ( "github.com/google/uuid" ) +// NotificationEnqueuer is the minimal interface for enqueuing workflow notification jobs. +// Implemented by the worker service to avoid a direct River dependency in this package. +type NotificationEnqueuer interface { + EnqueueWorkflowTaskAssigned(ctx context.Context, stepExecution *workflows.StepExecution) error +} + // DAGExecutor handles the execution of workflow DAGs with dependency resolution // and parallel step execution capabilities type DAGExecutor struct { @@ -20,6 +26,7 @@ type DAGExecutor struct { stepDefinitionService WorkflowStepDefinitionServiceInterface assignmentService AssignmentServiceInterface evidenceIntegration *EvidenceIntegration // Optional: for evidence stream integration + notificationEnqueuer NotificationEnqueuer // Optional: for workflow notification emails logger *log.Logger } @@ -87,6 +94,11 @@ func (e *DAGExecutor) SetEvidenceIntegration(evidenceIntegration *EvidenceIntegr e.evidenceIntegration = evidenceIntegration } +// SetNotificationEnqueuer sets the notification enqueuer (optional) +func (e *DAGExecutor) SetNotificationEnqueuer(enqueuer NotificationEnqueuer) { + e.notificationEnqueuer = enqueuer +} + // ExecutionState tracks the current state of a workflow execution type ExecutionState struct { WorkflowExecutionID uuid.UUID @@ -471,7 +483,16 @@ func (e *DAGExecutor) tryUnblockStep(stepExec *workflows.StepExecution) bool { } e.logger.Printf("Unblocked step: %s", stepExec.ID.String()) - // TODO: Hook for notification - step is now ready for user action + + if e.notificationEnqueuer != nil { + reloaded, err := e.stepExecutionService.GetByID(stepExec.ID) + if err == nil && reloaded != nil { + if err := e.notificationEnqueuer.EnqueueWorkflowTaskAssigned(context.Background(), reloaded); err != nil { + e.logger.Printf("Warning: failed to enqueue task-assigned notification for step %s: %v", stepExec.ID.String(), err) + } + } + } + return true } diff --git a/internal/workflow/executor_integration_test.go b/internal/workflow/executor_integration_test.go index b5d72675..61f99fd2 100644 --- a/internal/workflow/executor_integration_test.go +++ b/internal/workflow/executor_integration_test.go @@ -13,6 +13,7 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" "gorm.io/driver/sqlite" "gorm.io/gorm" "gorm.io/gorm/logger" @@ -174,7 +175,7 @@ func TestDAGExecutor_Integration_InitializeWorkflow(t *testing.T) { workflowExecService := workflows.NewWorkflowExecutionService(db) stepDefService := workflows.NewWorkflowStepDefinitionService(db) roleAssignmentService := workflows.NewRoleAssignmentService(db) - assignmentService := NewAssignmentService(roleAssignmentService, db) + assignmentService := NewAssignmentService(roleAssignmentService, db, zap.NewNop().Sugar()) // Create executor logger := log.New(os.Stdout, "[TEST] ", log.LstdFlags) @@ -236,7 +237,7 @@ func TestDAGExecutor_Integration_ProcessStepCompletion(t *testing.T) { workflowExecService := workflows.NewWorkflowExecutionService(db) stepDefService := workflows.NewWorkflowStepDefinitionService(db) roleAssignmentService := workflows.NewRoleAssignmentService(db) - assignmentService := NewAssignmentService(roleAssignmentService, db) + assignmentService := NewAssignmentService(roleAssignmentService, db, zap.NewNop().Sugar()) // Create executor logger := log.New(os.Stdout, "[TEST] ", log.LstdFlags) @@ -299,7 +300,7 @@ func TestDAGExecutor_Integration_GetExecutionStatus(t *testing.T) { workflowExecService := workflows.NewWorkflowExecutionService(db) stepDefService := workflows.NewWorkflowStepDefinitionService(db) roleAssignmentService := workflows.NewRoleAssignmentService(db) - assignmentService := NewAssignmentService(roleAssignmentService, db) + assignmentService := NewAssignmentService(roleAssignmentService, db, zap.NewNop().Sugar()) // Create executor logger := log.New(os.Stdout, "[TEST] ", log.LstdFlags) @@ -358,7 +359,7 @@ func TestDAGExecutor_Integration_ParallelSteps(t *testing.T) { workflowExecService := workflows.NewWorkflowExecutionService(db) stepDefService := workflows.NewWorkflowStepDefinitionService(db) roleAssignmentService := workflows.NewRoleAssignmentService(db) - assignmentService := NewAssignmentService(roleAssignmentService, db) + assignmentService := NewAssignmentService(roleAssignmentService, db, zap.NewNop().Sugar()) // Create executor logger := log.New(os.Stdout, "[TEST] ", log.LstdFlags) From 2ab3a76d332a97ce8aad882f38a6685200e7aaba Mon Sep 17 00:00:00 2001 From: Gustavo Carvalho Date: Thu, 19 Feb 2026 07:48:09 -0300 Subject: [PATCH 02/13] feat: tasks digest email Signed-off-by: Gustavo Carvalho --- internal/config/workflow.go | 20 ++ internal/config/workflow_test.go | 33 ++++ .../service/email/templates/service_test.go | 61 ++++++ .../templates/workflow-task-digest.html | 132 +++++++++++++ .../templates/workflow-task-digest.txt | 31 +++ internal/service/worker/jobs.go | 9 +- internal/service/worker/service.go | 36 +++- internal/service/worker/service_test.go | 32 +++- .../worker/workflow_task_digest_checker.go | 87 +++++++++ .../worker/workflow_task_digest_worker.go | 178 ++++++++++++++++++ .../workflow_task_digest_worker_test.go | 53 ++++++ 11 files changed, 659 insertions(+), 13 deletions(-) create mode 100644 internal/service/email/templates/templates/workflow-task-digest.html create mode 100644 internal/service/email/templates/templates/workflow-task-digest.txt create mode 100644 internal/service/worker/workflow_task_digest_checker.go create mode 100644 internal/service/worker/workflow_task_digest_worker.go create mode 100644 internal/service/worker/workflow_task_digest_worker_test.go diff --git a/internal/config/workflow.go b/internal/config/workflow.go index 54c33e10..5ae5e552 100644 --- a/internal/config/workflow.go +++ b/internal/config/workflow.go @@ -22,6 +22,18 @@ type WorkflowConfig struct { // OverdueCheckEnabled determines if we should check for overdue workflows OverdueCheckEnabled bool `mapstructure:"overdue_check_enabled" yaml:"overdue_check_enabled" json:"overdueCheckEnabled"` + + // DueSoonEnabled determines if the daily due-soon reminder emails are enabled + DueSoonEnabled bool `mapstructure:"due_soon_enabled" yaml:"due_soon_enabled" json:"dueSoonEnabled"` + + // DueSoonSchedule is the cron schedule for the due-soon checker (default: daily at 08:00 UTC) + DueSoonSchedule string `mapstructure:"due_soon_schedule" yaml:"due_soon_schedule" json:"dueSoonSchedule"` + + // TaskDigestEnabled determines if the daily workflow task digest emails are enabled + TaskDigestEnabled bool `mapstructure:"task_digest_enabled" yaml:"task_digest_enabled" json:"taskDigestEnabled"` + + // TaskDigestSchedule is the cron schedule for the workflow task digest (default: daily at 08:00 UTC) + TaskDigestSchedule string `mapstructure:"task_digest_schedule" yaml:"task_digest_schedule" json:"taskDigestSchedule"` } // DefaultWorkflowConfig returns a default workflow configuration @@ -31,6 +43,10 @@ func DefaultWorkflowConfig() *WorkflowConfig { Schedule: "@every 15m", GracePeriodDays: 7, OverdueCheckEnabled: true, + DueSoonEnabled: false, + DueSoonSchedule: "0 8 * * *", + TaskDigestEnabled: false, + TaskDigestSchedule: "0 8 * * *", } } @@ -43,6 +59,10 @@ func LoadWorkflowConfig(path string) (*WorkflowConfig, error) { v.SetDefault("scheduler_schedule", "@every 15m") v.SetDefault("grace_period_days", 7) v.SetDefault("overdue_check_enabled", true) + v.SetDefault("due_soon_enabled", false) + v.SetDefault("due_soon_schedule", "0 8 * * *") + v.SetDefault("task_digest_enabled", false) + v.SetDefault("task_digest_schedule", "0 8 * * *") // Configure environment variable loading v.SetEnvPrefix("CCF_WORKFLOW") diff --git a/internal/config/workflow_test.go b/internal/config/workflow_test.go index 819d4dd5..dabd4c0d 100644 --- a/internal/config/workflow_test.go +++ b/internal/config/workflow_test.go @@ -15,6 +15,10 @@ func TestDefaultWorkflowConfig(t *testing.T) { assert.Equal(t, "@every 15m", config.Schedule) assert.Equal(t, 7, config.GracePeriodDays) assert.True(t, config.OverdueCheckEnabled) + assert.False(t, config.DueSoonEnabled) + assert.Equal(t, "0 8 * * *", config.DueSoonSchedule) + assert.False(t, config.TaskDigestEnabled) + assert.Equal(t, "0 8 * * *", config.TaskDigestSchedule) } func TestLoadWorkflowConfig_Defaults(t *testing.T) { @@ -23,6 +27,10 @@ func TestLoadWorkflowConfig_Defaults(t *testing.T) { require.NoError(t, os.Unsetenv("CCF_WORKFLOW_SCHEDULER_SCHEDULE")) require.NoError(t, os.Unsetenv("CCF_WORKFLOW_GRACE_PERIOD_DAYS")) require.NoError(t, os.Unsetenv("CCF_WORKFLOW_OVERDUE_CHECK_ENABLED")) + require.NoError(t, os.Unsetenv("CCF_WORKFLOW_DUE_SOON_ENABLED")) + require.NoError(t, os.Unsetenv("CCF_WORKFLOW_DUE_SOON_SCHEDULE")) + require.NoError(t, os.Unsetenv("CCF_WORKFLOW_TASK_DIGEST_ENABLED")) + require.NoError(t, os.Unsetenv("CCF_WORKFLOW_TASK_DIGEST_SCHEDULE")) config, err := LoadWorkflowConfig("") require.NoError(t, err) @@ -31,6 +39,10 @@ func TestLoadWorkflowConfig_Defaults(t *testing.T) { assert.Equal(t, "@every 15m", config.Schedule) assert.Equal(t, 7, config.GracePeriodDays) assert.True(t, config.OverdueCheckEnabled) + assert.False(t, config.DueSoonEnabled) + assert.Equal(t, "0 8 * * *", config.DueSoonSchedule) + assert.False(t, config.TaskDigestEnabled) + assert.Equal(t, "0 8 * * *", config.TaskDigestSchedule) } func TestLoadWorkflowConfig_EnvVars(t *testing.T) { @@ -54,6 +66,27 @@ func TestLoadWorkflowConfig_EnvVars(t *testing.T) { assert.False(t, config.OverdueCheckEnabled) } +func TestLoadWorkflowConfig_NotificationEnvVars(t *testing.T) { + require.NoError(t, os.Setenv("CCF_WORKFLOW_DUE_SOON_ENABLED", "true")) + require.NoError(t, os.Setenv("CCF_WORKFLOW_DUE_SOON_SCHEDULE", "0 9 * * *")) + require.NoError(t, os.Setenv("CCF_WORKFLOW_TASK_DIGEST_ENABLED", "true")) + require.NoError(t, os.Setenv("CCF_WORKFLOW_TASK_DIGEST_SCHEDULE", "0 7 * * *")) + defer func() { + _ = os.Unsetenv("CCF_WORKFLOW_DUE_SOON_ENABLED") + _ = os.Unsetenv("CCF_WORKFLOW_DUE_SOON_SCHEDULE") + _ = os.Unsetenv("CCF_WORKFLOW_TASK_DIGEST_ENABLED") + _ = os.Unsetenv("CCF_WORKFLOW_TASK_DIGEST_SCHEDULE") + }() + + config, err := LoadWorkflowConfig("") + require.NoError(t, err) + + assert.True(t, config.DueSoonEnabled) + assert.Equal(t, "0 9 * * *", config.DueSoonSchedule) + assert.True(t, config.TaskDigestEnabled) + assert.Equal(t, "0 7 * * *", config.TaskDigestSchedule) +} + func TestLoadWorkflowConfig_File(t *testing.T) { content := ` scheduler_enabled: true diff --git a/internal/service/email/templates/service_test.go b/internal/service/email/templates/service_test.go index 82ad3016..0130151a 100644 --- a/internal/service/email/templates/service_test.go +++ b/internal/service/email/templates/service_test.go @@ -132,6 +132,67 @@ func TestTemplateService_WorkflowTaskDueSoon(t *testing.T) { require.Contains(t, text, "TOMORROW") } +func TestTemplateService_WorkflowTaskDigest_WithTasks(t *testing.T) { + service, err := NewTemplateService() + require.NoError(t, err) + + pendingDue := "2026-03-15" + overdueDue := "2026-02-01" + data := TemplateData{ + "UserName": "Alice Smith", + "PeriodLabel": "Daily digest — Wednesday, 19 February 2026", + "PendingTasks": []map[string]interface{}{ + { + "StepTitle": "Submit Evidence", + "WorkflowTitle": "SOC2 Audit", + "WorkflowInstanceTitle": "SOC2 2026", + "DueDate": &pendingDue, + "StepURL": "https://app.example.com/steps/abc", + }, + }, + "OverdueTasks": []map[string]interface{}{ + { + "StepTitle": "Review Policy", + "WorkflowTitle": "Annual Audit", + "WorkflowInstanceTitle": "Audit 2026", + "DueDate": &overdueDue, + "StepURL": "https://app.example.com/steps/xyz", + }, + }, + } + + html, text, err := service.Use("workflow-task-digest", data) + require.NoError(t, err) + require.NotEmpty(t, html) + require.NotEmpty(t, text) + require.Contains(t, html, "Alice Smith") + require.Contains(t, html, "Submit Evidence") + require.Contains(t, html, "Review Policy") + require.Contains(t, html, "SOC2 Audit") + require.Contains(t, text, "Alice Smith") + require.Contains(t, text, "Submit Evidence") + require.Contains(t, text, "PENDING") + require.Contains(t, text, "OVERDUE") +} + +func TestTemplateService_WorkflowTaskDigest_EmptyTasks(t *testing.T) { + service, err := NewTemplateService() + require.NoError(t, err) + + data := TemplateData{ + "UserName": "Bob", + "PeriodLabel": "Daily digest — Wednesday, 19 February 2026", + "PendingTasks": []map[string]interface{}{}, + "OverdueTasks": []map[string]interface{}{}, + } + + html, text, err := service.Use("workflow-task-digest", data) + require.NoError(t, err) + require.NotEmpty(t, html) + require.NotEmpty(t, text) + require.Contains(t, html, "Bob") +} + func TestTemplateService_ListTemplates(t *testing.T) { service, err := NewTemplateService() require.NoError(t, err, "Failed to create template service") diff --git a/internal/service/email/templates/templates/workflow-task-digest.html b/internal/service/email/templates/templates/workflow-task-digest.html new file mode 100644 index 00000000..e6043ba7 --- /dev/null +++ b/internal/service/email/templates/templates/workflow-task-digest.html @@ -0,0 +1,132 @@ + + + + + + Your Workflow Task Summary + + + + + + diff --git a/internal/service/email/templates/templates/workflow-task-digest.txt b/internal/service/email/templates/templates/workflow-task-digest.txt new file mode 100644 index 00000000..d32892b1 --- /dev/null +++ b/internal/service/email/templates/templates/workflow-task-digest.txt @@ -0,0 +1,31 @@ +Your Workflow Task Summary +========================== + +Hello {{.UserName}}, + +Here is a summary of your workflow tasks that need attention. +Period: {{.PeriodLabel}} + +{{if .OverdueTasks}} +OVERDUE TASKS ({{len .OverdueTasks}}) +-------------------------------------- +{{range .OverdueTasks}} +- {{.StepTitle}} + Workflow: {{.WorkflowTitle}}{{if .WorkflowInstanceTitle}} / {{.WorkflowInstanceTitle}}{{end}} + {{if .DueDate}}Due: {{.DueDate}}{{end}} + {{if .StepURL}}Link: {{.StepURL}}{{end}} +{{end}} +{{end}} +{{if .PendingTasks}} +PENDING TASKS ({{len .PendingTasks}}) +-------------------------------------- +{{range .PendingTasks}} +- {{.StepTitle}} + Workflow: {{.WorkflowTitle}}{{if .WorkflowInstanceTitle}} / {{.WorkflowInstanceTitle}}{{end}} + {{if .DueDate}}Due: {{.DueDate}}{{end}} + {{if .StepURL}}Link: {{.StepURL}}{{end}} +{{end}} +{{end}} +--- +This is an automated digest from Compliance Framework. +To unsubscribe, update your notification preferences in your account settings. diff --git a/internal/service/worker/jobs.go b/internal/service/worker/jobs.go index 970e5c4e..a3393061 100644 --- a/internal/service/worker/jobs.go +++ b/internal/service/worker/jobs.go @@ -8,6 +8,7 @@ import ( "github.com/compliance-framework/api/internal/service/email/types" "github.com/riverqueue/river" + "gorm.io/gorm" ) // Job types for email processing @@ -578,12 +579,11 @@ func JobInsertOptionsWithRetry(queue string, maxAttempts int) *river.InsertOpts return &river.InsertOpts{ Queue: queue, MaxAttempts: maxAttempts, - // River uses exponential backoff by default } } // Workers returns all workers as work functions with dependencies injected -func Workers(emailService EmailService, digestService DigestService, userRepo UserRepository, logger Logger) *river.Workers { +func Workers(emailService EmailService, digestService DigestService, userRepo UserRepository, db *gorm.DB, logger Logger) *river.Workers { workers := river.NewWorkers() // Create worker instances with dependencies @@ -606,6 +606,11 @@ func Workers(emailService EmailService, digestService DigestService, userRepo Us workflowTaskDueSoonWorker := NewWorkflowTaskDueSoonWorker(emailService, userRepo, logger) river.AddWorker(workers, river.WorkFunc(workflowTaskDueSoonWorker.Work)) + + if db != nil { + workflowTaskDigestWorker := NewWorkflowTaskDigestWorker(db, emailService, userRepo, logger) + river.AddWorker(workers, river.WorkFunc(workflowTaskDigestWorker.Work)) + } } return workers diff --git a/internal/service/worker/service.go b/internal/service/worker/service.go index e7f2bc6c..2af2d4ee 100644 --- a/internal/service/worker/service.go +++ b/internal/service/worker/service.go @@ -196,7 +196,7 @@ func NewServiceWithDigest( // Register workers with dependencies injected // We start with the email/digest workers userRepo := NewGORMUserRepository(db) - workers := Workers(emailSvc, digestSvc, userRepo, logger) + workers := Workers(emailSvc, digestSvc, userRepo, db, logger) // Add workflow workers river.AddWorker(workers, river.WorkFunc(workflowExecutionWorker.Work)) @@ -207,6 +207,10 @@ func NewServiceWithDigest( dueSoonCheckerWorker := NewDueSoonCheckerWorker(db, clientProxy, logger) river.AddWorker(workers, river.WorkFunc(dueSoonCheckerWorker.Work)) + // Add workflow task digest checker worker + digestCheckerWorker := NewWorkflowTaskDigestCheckerWorker(db, clientProxy, logger) + river.AddWorker(workers, river.WorkFunc(digestCheckerWorker.Work)) + // Configure periodic jobs periodicJobs := periodicJobsFromConfig(digestCfg, logger) @@ -398,11 +402,11 @@ func parseCronScheduleWithFallback(cronSchedule string, fallback string, jobName return schedule } -func NewDueSoonCheckerPeriodicJob(logger *zap.SugaredLogger) *river.PeriodicJob { - schedule := parseCronScheduleWithFallback("0 8 * * *", "0 8 * * *", "due-soon checker", logger) +func NewDueSoonCheckerPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob { + sched := parseCronScheduleWithFallback(schedule, "0 8 * * *", "due-soon checker", logger) return river.NewPeriodicJob( - schedule, + sched, func() (river.JobArgs, *river.InsertOpts) { return &DueSoonCheckerArgs{}, &river.InsertOpts{ Queue: "email", @@ -415,6 +419,23 @@ func NewDueSoonCheckerPeriodicJob(logger *zap.SugaredLogger) *river.PeriodicJob ) } +func NewWorkflowTaskDigestPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob { + sched := parseCronScheduleWithFallback(schedule, "0 8 * * *", "workflow task digest", logger) + + return river.NewPeriodicJob( + sched, + func() (river.JobArgs, *river.InsertOpts) { + return &WorkflowTaskDigestCheckerArgs{}, &river.InsertOpts{ + Queue: "digest", + MaxAttempts: 3, + } + }, + &river.PeriodicJobOpts{ + RunOnStart: false, + }, + ) +} + func periodicJobsFromConfig(cfg *config.Config, logger *zap.SugaredLogger) []*river.PeriodicJob { var periodicJobs []*river.PeriodicJob if cfg == nil { @@ -425,7 +446,12 @@ func periodicJobsFromConfig(cfg *config.Config, logger *zap.SugaredLogger) []*ri } if cfg.Workflow != nil && cfg.Workflow.SchedulerEnabled { periodicJobs = append(periodicJobs, NewWorkflowSchedulerPeriodicJob(cfg.Workflow.Schedule, logger)) - periodicJobs = append(periodicJobs, NewDueSoonCheckerPeriodicJob(logger)) + } + if cfg.Workflow != nil && cfg.Workflow.DueSoonEnabled { + periodicJobs = append(periodicJobs, NewDueSoonCheckerPeriodicJob(cfg.Workflow.DueSoonSchedule, logger)) + } + if cfg.Workflow != nil && cfg.Workflow.TaskDigestEnabled { + periodicJobs = append(periodicJobs, NewWorkflowTaskDigestPeriodicJob(cfg.Workflow.TaskDigestSchedule, logger)) } return periodicJobs } diff --git a/internal/service/worker/service_test.go b/internal/service/worker/service_test.go index c7f21a00..1fe4954d 100644 --- a/internal/service/worker/service_test.go +++ b/internal/service/worker/service_test.go @@ -330,7 +330,7 @@ func TestWorkers(t *testing.T) { mockDigestService := &MockDigestService{} mockLogger := &MockLogger{} - workers := Workers(mockEmailService, mockDigestService, nil, mockLogger) + workers := Workers(mockEmailService, mockDigestService, nil, nil, mockLogger) assert.NotNil(t, workers) } @@ -359,23 +359,43 @@ func TestParseCronScheduleWithFallback_InvalidUsesFallback(t *testing.T) { func TestPeriodicJobsFromConfig_WorkflowSchedulerEnabledGuard(t *testing.T) { logger := zap.NewNop().Sugar() + // Nothing enabled → 0 jobs jobs := periodicJobsFromConfig(&config.Config{ DigestEnabled: false, Workflow: &config.WorkflowConfig{ - SchedulerEnabled: false, - Schedule: "@every 15m", + SchedulerEnabled: false, + Schedule: "@every 15m", + DueSoonEnabled: false, + TaskDigestEnabled: false, }, }, logger) assert.Len(t, jobs, 0) + // Scheduler only → 1 job jobs = periodicJobsFromConfig(&config.Config{ DigestEnabled: false, Workflow: &config.WorkflowConfig{ - SchedulerEnabled: true, - Schedule: "@every 15m", + SchedulerEnabled: true, + Schedule: "@every 15m", + DueSoonEnabled: false, + TaskDigestEnabled: false, }, }, logger) - assert.Len(t, jobs, 2) + assert.Len(t, jobs, 1) + + // Scheduler + due-soon + task digest → 3 jobs + jobs = periodicJobsFromConfig(&config.Config{ + DigestEnabled: false, + Workflow: &config.WorkflowConfig{ + SchedulerEnabled: true, + Schedule: "@every 15m", + DueSoonEnabled: true, + DueSoonSchedule: "0 8 * * *", + TaskDigestEnabled: true, + TaskDigestSchedule: "0 8 * * *", + }, + }, logger) + assert.Len(t, jobs, 3) } func TestWorkflowSchedulerPeriodicJobConstructor_InsertOpts(t *testing.T) { diff --git a/internal/service/worker/workflow_task_digest_checker.go b/internal/service/worker/workflow_task_digest_checker.go new file mode 100644 index 00000000..86430a2e --- /dev/null +++ b/internal/service/worker/workflow_task_digest_checker.go @@ -0,0 +1,87 @@ +package worker + +import ( + "context" + "fmt" + "time" + + "github.com/compliance-framework/api/internal/service/relational" + "github.com/riverqueue/river" + "gorm.io/gorm" +) + +// WorkflowTaskDigestCheckerArgs represents the arguments for the periodic digest checker job +type WorkflowTaskDigestCheckerArgs struct{} + +// Kind returns the job kind for River +func (WorkflowTaskDigestCheckerArgs) Kind() string { return "workflow_task_digest_checker" } + +// Timeout returns the timeout for the digest checker job +func (WorkflowTaskDigestCheckerArgs) Timeout() time.Duration { return 5 * time.Minute } + +// WorkflowTaskDigestCheckerWorker queries all subscribed users and enqueues per-user digest jobs +type WorkflowTaskDigestCheckerWorker struct { + db *gorm.DB + client RiverInserter + logger Logger +} + +// NewWorkflowTaskDigestCheckerWorker creates a new WorkflowTaskDigestCheckerWorker +func NewWorkflowTaskDigestCheckerWorker(db *gorm.DB, client RiverInserter, logger Logger) *WorkflowTaskDigestCheckerWorker { + return &WorkflowTaskDigestCheckerWorker{ + db: db, + client: client, + logger: logger, + } +} + +// Work queries all users with TaskDailyDigestSubscribed=true and enqueues a WorkflowTaskDigestArgs job for each +func (w *WorkflowTaskDigestCheckerWorker) Work(ctx context.Context, job *river.Job[WorkflowTaskDigestCheckerArgs]) error { + var users []relational.User + if err := w.db.WithContext(ctx). + Where("task_daily_digest_subscribed = ? AND deleted_at IS NULL", true). + Find(&users).Error; err != nil { + return fmt.Errorf("workflow-task-digest-checker: failed to query subscribed users: %w", err) + } + + if len(users) == 0 { + w.logger.Infow("WorkflowTaskDigestCheckerWorker: no subscribed users found") + return nil + } + + params := make([]river.InsertManyParams, 0, len(users)) + for i := range users { + if users[i].ID == nil { + continue + } + params = append(params, river.InsertManyParams{ + Args: WorkflowTaskDigestArgs{UserID: users[i].ID.String()}, + InsertOpts: &river.InsertOpts{ + Queue: "digest", + MaxAttempts: 3, + }, + }) + } + + if len(params) == 0 { + return nil + } + + results, err := w.client.InsertMany(ctx, params) + if err != nil { + return fmt.Errorf("workflow-task-digest-checker: failed to enqueue digest jobs: %w", err) + } + + inserted := 0 + for _, r := range results { + if r != nil && r.Job != nil { + inserted++ + } + } + + w.logger.Infow("WorkflowTaskDigestCheckerWorker: enqueued digest jobs", + "total_users", len(users), + "enqueued", inserted, + ) + return nil +} diff --git a/internal/service/worker/workflow_task_digest_worker.go b/internal/service/worker/workflow_task_digest_worker.go new file mode 100644 index 00000000..c07cc4f6 --- /dev/null +++ b/internal/service/worker/workflow_task_digest_worker.go @@ -0,0 +1,178 @@ +package worker + +import ( + "context" + "fmt" + "time" + + "github.com/compliance-framework/api/internal/service/email/types" + "github.com/compliance-framework/api/internal/service/relational/workflows" + "github.com/riverqueue/river" + "gorm.io/gorm" +) + +// DigestTask represents a single task entry in the digest email +type DigestTask struct { + StepTitle string + WorkflowTitle string + WorkflowInstanceTitle string + DueDate *string + StepURL string +} + +// WorkflowTaskDigestWorker sends a per-user digest of pending and overdue workflow tasks +type WorkflowTaskDigestWorker struct { + db *gorm.DB + emailService EmailService + userRepo UserRepository + logger Logger +} + +// NewWorkflowTaskDigestWorker creates a new WorkflowTaskDigestWorker +func NewWorkflowTaskDigestWorker(db *gorm.DB, emailService EmailService, userRepo UserRepository, logger Logger) *WorkflowTaskDigestWorker { + return &WorkflowTaskDigestWorker{ + db: db, + emailService: emailService, + userRepo: userRepo, + logger: logger, + } +} + +// Work sends a digest email for the user identified by job.Args.UserID +func (w *WorkflowTaskDigestWorker) Work(ctx context.Context, job *river.Job[WorkflowTaskDigestArgs]) error { + args := job.Args + + user, err := w.userRepo.FindUserByID(ctx, args.UserID) + if err != nil { + w.logger.Warnw("WorkflowTaskDigestWorker: user not found, skipping", + "user_id", args.UserID, + "error", err, + ) + return nil + } + + if !user.TaskDailyDigestSubscribed { + w.logger.Debugw("WorkflowTaskDigestWorker: user not subscribed to digest, skipping", + "user_id", args.UserID, + ) + return nil + } + + now := time.Now() + + var steps []workflows.StepExecution + if err := w.db.WithContext(ctx). + Preload("WorkflowExecution.WorkflowInstance.WorkflowDefinition"). + Preload("WorkflowStepDefinition"). + Where("assigned_to_type = ? AND assigned_to_id = ? AND status IN ?", + workflows.AssignmentTypeUser.String(), + args.UserID, + []string{ + workflows.StepStatusPending.String(), + workflows.StepStatusInProgress.String(), + workflows.StepStatusOverdue.String(), + }, + ). + Find(&steps).Error; err != nil { + return fmt.Errorf("WorkflowTaskDigestWorker: failed to query steps for user %s: %w", args.UserID, err) + } + + if len(steps) == 0 { + w.logger.Debugw("WorkflowTaskDigestWorker: no tasks for user, skipping", + "user_id", args.UserID, + ) + return nil + } + + var pendingTasks []DigestTask + var overdueTasks []DigestTask + + for i := range steps { + step := &steps[i] + task := buildDigestTask(step) + + if step.Status == workflows.StepStatusOverdue.String() || + (step.DueDate != nil && step.DueDate.Before(now)) { + overdueTasks = append(overdueTasks, task) + } else { + pendingTasks = append(pendingTasks, task) + } + } + + userName := user.FirstName + if user.LastName != "" { + userName = user.FirstName + " " + user.LastName + } + + periodLabel := "Daily digest — " + now.Format("Monday, 2 January 2006") + + templateData := map[string]interface{}{ + "UserName": userName, + "PeriodLabel": periodLabel, + "PendingTasks": pendingTasks, + "OverdueTasks": overdueTasks, + } + + htmlBody, textBody, err := w.emailService.UseTemplate("workflow-task-digest", templateData) + if err != nil { + w.logger.Errorw("WorkflowTaskDigestWorker: failed to render template", + "user_id", args.UserID, + "error", err, + ) + return fmt.Errorf("failed to render workflow-task-digest template: %w", err) + } + + message := &types.Message{ + From: w.emailService.GetDefaultFromAddress(), + To: []string{user.Email}, + Subject: fmt.Sprintf("Your workflow task summary — %s", now.Format("2 Jan 2006")), + HTMLBody: htmlBody, + TextBody: textBody, + } + + result, err := w.emailService.Send(ctx, message) + if err != nil { + w.logger.Errorw("WorkflowTaskDigestWorker: failed to send email", + "user_id", args.UserID, + "error", err, + ) + return fmt.Errorf("failed to send workflow-task-digest email: %w", err) + } + + if !result.Success { + w.logger.Errorw("WorkflowTaskDigestWorker: email send reported failure", + "user_id", args.UserID, + "error", result.Error, + ) + return fmt.Errorf("workflow-task-digest email send failed: %s", result.Error) + } + + w.logger.Infow("WorkflowTaskDigestWorker: digest email sent", + "user_id", args.UserID, + "pending", len(pendingTasks), + "overdue", len(overdueTasks), + "message_id", result.MessageID, + ) + + return nil +} + +func buildDigestTask(step *workflows.StepExecution) DigestTask { + task := DigestTask{} + + if step.WorkflowStepDefinition != nil { + task.StepTitle = step.WorkflowStepDefinition.Name + } + if step.WorkflowExecution != nil && step.WorkflowExecution.WorkflowInstance != nil { + if step.WorkflowExecution.WorkflowInstance.WorkflowDefinition != nil { + task.WorkflowTitle = step.WorkflowExecution.WorkflowInstance.WorkflowDefinition.Name + } + task.WorkflowInstanceTitle = step.WorkflowExecution.WorkflowInstance.Name + } + if step.DueDate != nil { + formatted := step.DueDate.Format("2006-01-02") + task.DueDate = &formatted + } + + return task +} diff --git a/internal/service/worker/workflow_task_digest_worker_test.go b/internal/service/worker/workflow_task_digest_worker_test.go new file mode 100644 index 00000000..8d187d3d --- /dev/null +++ b/internal/service/worker/workflow_task_digest_worker_test.go @@ -0,0 +1,53 @@ +package worker + +import ( + "context" + "errors" + "testing" + + "github.com/riverqueue/river" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +func makeDigestJob(args WorkflowTaskDigestArgs) *river.Job[WorkflowTaskDigestArgs] { + return &river.Job[WorkflowTaskDigestArgs]{Args: args} +} + +func TestWorkflowTaskDigestWorker_UnsubscribedUser_Skips(t *testing.T) { + ctx := context.Background() + + mockEmail := &MockEmailService{} + mockRepo := &MockUserRepository{} + mockLog := zap.NewNop().Sugar() + + user := NotificationUser{ + ID: "user-1", + Email: "alice@example.com", + FirstName: "Alice", + TaskDailyDigestSubscribed: false, + } + mockRepo.On("FindUserByID", ctx, "user-1").Return(user, nil) + + w := NewWorkflowTaskDigestWorker(nil, mockEmail, mockRepo, mockLog) + + err := w.Work(ctx, makeDigestJob(WorkflowTaskDigestArgs{UserID: "user-1"})) + assert.NoError(t, err) + mockEmail.AssertNotCalled(t, "Send") +} + +func TestWorkflowTaskDigestWorker_UserNotFound_Skips(t *testing.T) { + ctx := context.Background() + + mockEmail := &MockEmailService{} + mockRepo := &MockUserRepository{} + mockLog := zap.NewNop().Sugar() + + mockRepo.On("FindUserByID", ctx, "missing").Return(NotificationUser{}, errors.New("not found")) + + w := NewWorkflowTaskDigestWorker(nil, mockEmail, mockRepo, mockLog) + + err := w.Work(ctx, makeDigestJob(WorkflowTaskDigestArgs{UserID: "missing"})) + assert.NoError(t, err) + mockEmail.AssertNotCalled(t, "Send") +} From a8230e1c609ef430dcfc14f8370373eed7d78f86 Mon Sep 17 00:00:00 2001 From: Gustavo Carvalho Date: Thu, 19 Feb 2026 08:00:48 -0300 Subject: [PATCH 03/13] feat: execution failed email Signed-off-by: Gustavo Carvalho --- .../service/email/templates/service_test.go | 57 +++++ .../templates/workflow-execution-failed.html | 195 ++++++++++++++++++ .../templates/workflow-execution-failed.txt | 32 +++ internal/service/worker/jobs.go | 21 +- internal/service/worker/service.go | 25 +++ .../workflow_execution_failed_worker.go | 163 +++++++++++++++ .../workflow_execution_failed_worker_test.go | 55 +++++ internal/workflow/executor.go | 13 ++ workflow.yaml | 6 +- 9 files changed, 563 insertions(+), 4 deletions(-) create mode 100644 internal/service/email/templates/templates/workflow-execution-failed.html create mode 100644 internal/service/email/templates/templates/workflow-execution-failed.txt create mode 100644 internal/service/worker/workflow_execution_failed_worker.go create mode 100644 internal/service/worker/workflow_execution_failed_worker_test.go diff --git a/internal/service/email/templates/service_test.go b/internal/service/email/templates/service_test.go index 0130151a..baf9f814 100644 --- a/internal/service/email/templates/service_test.go +++ b/internal/service/email/templates/service_test.go @@ -132,6 +132,63 @@ func TestTemplateService_WorkflowTaskDueSoon(t *testing.T) { require.Contains(t, text, "TOMORROW") } +func TestTemplateService_WorkflowExecutionFailed_WithData(t *testing.T) { + service, err := NewTemplateService() + require.NoError(t, err) + + data := TemplateData{ + "RecipientName": "Alice Smith", + "WorkflowTitle": "SOC2 Audit", + "WorkflowInstanceName": "SOC2 2026", + "ExecutionID": "exec-abc-123", + "FailureReason": "2 of 5 steps failed", + "FailedAt": "Wed, 19 Feb 2026 08:00:00 UTC", + "FailedSteps": 2, + "CompletedSteps": 3, + "TotalSteps": 5, + "WorkflowURL": "https://app.example.com/workflows/abc", + } + + html, text, err := service.Use("workflow-execution-failed", data) + require.NoError(t, err) + require.NotEmpty(t, html) + require.NotEmpty(t, text) + require.Contains(t, html, "Alice Smith") + require.Contains(t, html, "SOC2 Audit") + require.Contains(t, html, "SOC2 2026") + require.Contains(t, html, "2 of 5 steps failed") + require.Contains(t, html, "exec-abc-123") + require.Contains(t, text, "Alice Smith") + require.Contains(t, text, "SOC2 Audit") + require.Contains(t, text, "2 of 5 steps failed") + require.Contains(t, text, "FAILED") +} + +func TestTemplateService_WorkflowExecutionFailed_NoURL(t *testing.T) { + service, err := NewTemplateService() + require.NoError(t, err) + + data := TemplateData{ + "RecipientName": "Bob", + "WorkflowTitle": "Annual Audit", + "WorkflowInstanceName": "Audit 2026", + "ExecutionID": "exec-xyz-456", + "FailureReason": "1 of 3 steps failed", + "FailedAt": "Wed, 19 Feb 2026 09:00:00 UTC", + "FailedSteps": 1, + "CompletedSteps": 2, + "TotalSteps": 3, + "WorkflowURL": "", + } + + html, text, err := service.Use("workflow-execution-failed", data) + require.NoError(t, err) + require.NotEmpty(t, html) + require.NotEmpty(t, text) + require.Contains(t, html, "Bob") + require.NotContains(t, html, "View Workflow Instance") +} + func TestTemplateService_WorkflowTaskDigest_WithTasks(t *testing.T) { service, err := NewTemplateService() require.NoError(t, err) diff --git a/internal/service/email/templates/templates/workflow-execution-failed.html b/internal/service/email/templates/templates/workflow-execution-failed.html new file mode 100644 index 00000000..e8f1753c --- /dev/null +++ b/internal/service/email/templates/templates/workflow-execution-failed.html @@ -0,0 +1,195 @@ + + + + + + Workflow Execution Failed + + + + + + diff --git a/internal/service/email/templates/templates/workflow-execution-failed.txt b/internal/service/email/templates/templates/workflow-execution-failed.txt new file mode 100644 index 00000000..1f7cfa72 --- /dev/null +++ b/internal/service/email/templates/templates/workflow-execution-failed.txt @@ -0,0 +1,32 @@ +WORKFLOW EXECUTION FAILED +========================= + +Hi {{.RecipientName}}, + +A workflow execution has failed and may require your attention. + +WORKFLOW: {{.WorkflowTitle}} +INSTANCE: {{.WorkflowInstanceName}} + +EXECUTION SUMMARY +----------------- +Failed Steps: {{.FailedSteps}} +Completed Steps: {{.CompletedSteps}} +Total Steps: {{.TotalSteps}} + +EXECUTION DETAILS +----------------- +Execution ID: {{.ExecutionID}} +Failed At: {{.FailedAt}} + +FAILURE REASON +-------------- +{{.FailureReason}} +{{if .WorkflowURL}} +VIEW WORKFLOW INSTANCE +---------------------- +{{.WorkflowURL}} +{{end}} +--- +You are receiving this because you are the owner of this workflow instance. +Compliance Framework diff --git a/internal/service/worker/jobs.go b/internal/service/worker/jobs.go index a3393061..c8cabd2e 100644 --- a/internal/service/worker/jobs.go +++ b/internal/service/worker/jobs.go @@ -20,9 +20,10 @@ const ( // Job types for workflow notifications const ( - JobTypeWorkflowTaskAssigned = "workflow_task_assigned" - JobTypeWorkflowTaskDueSoon = "workflow_task_due_soon" - JobTypeWorkflowTaskDigest = "workflow_task_digest" + JobTypeWorkflowTaskAssigned = "workflow_task_assigned" + JobTypeWorkflowTaskDueSoon = "workflow_task_due_soon" + JobTypeWorkflowTaskDigest = "workflow_task_digest" + JobTypeWorkflowExecutionFailed = "workflow_execution_failed" ) // WorkflowTaskAssignedArgs represents the arguments for a new-task-assigned notification email @@ -52,6 +53,11 @@ type WorkflowTaskDigestArgs struct { UserID string `json:"user_id"` } +// WorkflowExecutionFailedArgs represents the arguments for a workflow-execution-failed notification email +type WorkflowExecutionFailedArgs struct { + WorkflowExecutionID string `json:"workflow_execution_id"` +} + // Kind returns the job kind for River func (WorkflowTaskAssignedArgs) Kind() string { return JobTypeWorkflowTaskAssigned } @@ -61,6 +67,9 @@ func (WorkflowTaskDueSoonArgs) Kind() string { return JobTypeWorkflowTaskDueSoon // Kind returns the job kind for River func (WorkflowTaskDigestArgs) Kind() string { return JobTypeWorkflowTaskDigest } +// Kind returns the job kind for River +func (WorkflowExecutionFailedArgs) Kind() string { return JobTypeWorkflowExecutionFailed } + // Timeout returns the timeout for workflow task assigned jobs func (WorkflowTaskAssignedArgs) Timeout() time.Duration { return 30 * time.Second } @@ -70,6 +79,9 @@ func (WorkflowTaskDueSoonArgs) Timeout() time.Duration { return 30 * time.Second // Timeout returns the timeout for workflow task digest jobs func (WorkflowTaskDigestArgs) Timeout() time.Duration { return 5 * time.Minute } +// Timeout returns the timeout for workflow execution failed jobs +func (WorkflowExecutionFailedArgs) Timeout() time.Duration { return 30 * time.Second } + // SendEmailArgs represents the arguments for sending an email type SendEmailArgs struct { // Email message fields @@ -610,6 +622,9 @@ func Workers(emailService EmailService, digestService DigestService, userRepo Us if db != nil { workflowTaskDigestWorker := NewWorkflowTaskDigestWorker(db, emailService, userRepo, logger) river.AddWorker(workers, river.WorkFunc(workflowTaskDigestWorker.Work)) + + workflowExecutionFailedWorker := NewWorkflowExecutionFailedWorker(db, emailService, userRepo, logger) + river.AddWorker(workers, river.WorkFunc(workflowExecutionFailedWorker.Work)) } } diff --git a/internal/service/worker/service.go b/internal/service/worker/service.go index 2af2d4ee..6c516950 100644 --- a/internal/service/worker/service.go +++ b/internal/service/worker/service.go @@ -529,6 +529,31 @@ func (s *Service) EnqueueWorkflowTaskAssigned(ctx context.Context, stepExecution return nil } +// EnqueueWorkflowExecutionFailed enqueues a workflow-execution-failed notification email job. +// Implements the workflow.NotificationEnqueuer interface. +func (s *Service) EnqueueWorkflowExecutionFailed(ctx context.Context, execution *workflows.WorkflowExecution) error { + if !s.config.Enabled || s.client == nil { + return nil + } + + if execution == nil || execution.ID == nil { + return nil + } + + args := &WorkflowExecutionFailedArgs{ + WorkflowExecutionID: execution.ID.String(), + } + + _, err := s.client.InsertMany(ctx, []river.InsertManyParams{ + {Args: args, InsertOpts: JobInsertOptionsForWorkflowNotification()}, + }) + if err != nil { + return fmt.Errorf("failed to enqueue workflow-execution-failed job: %w", err) + } + + return nil +} + // EnqueueSendEmail enqueues a send email job func (s *Service) EnqueueSendEmail(ctx context.Context, args *SendEmailArgs) error { if !s.config.Enabled { diff --git a/internal/service/worker/workflow_execution_failed_worker.go b/internal/service/worker/workflow_execution_failed_worker.go new file mode 100644 index 00000000..70f452c6 --- /dev/null +++ b/internal/service/worker/workflow_execution_failed_worker.go @@ -0,0 +1,163 @@ +package worker + +import ( + "context" + "fmt" + "time" + + "github.com/compliance-framework/api/internal/service/email/types" + "github.com/compliance-framework/api/internal/service/relational/workflows" + "github.com/google/uuid" + "github.com/riverqueue/river" + "gorm.io/gorm" +) + +// WorkflowExecutionFailedWorker sends a failure notification email to the workflow instance creator +type WorkflowExecutionFailedWorker struct { + db *gorm.DB + emailService EmailService + userRepo UserRepository + logger Logger +} + +// NewWorkflowExecutionFailedWorker creates a new WorkflowExecutionFailedWorker +func NewWorkflowExecutionFailedWorker(db *gorm.DB, emailService EmailService, userRepo UserRepository, logger Logger) *WorkflowExecutionFailedWorker { + return &WorkflowExecutionFailedWorker{ + db: db, + emailService: emailService, + userRepo: userRepo, + logger: logger, + } +} + +// Work sends a failure notification email for the workflow execution identified by job.Args.WorkflowExecutionID +func (w *WorkflowExecutionFailedWorker) Work(ctx context.Context, job *river.Job[WorkflowExecutionFailedArgs]) error { + args := job.Args + + executionID, err := uuid.Parse(args.WorkflowExecutionID) + if err != nil { + w.logger.Warnw("WorkflowExecutionFailedWorker: invalid execution ID, skipping", + "workflow_execution_id", args.WorkflowExecutionID, + "error", err, + ) + return nil + } + + var execution workflows.WorkflowExecution + if err := w.db.WithContext(ctx). + Preload("WorkflowInstance.WorkflowDefinition"). + Preload("StepExecutions"). + First(&execution, "id = ?", executionID).Error; err != nil { + return fmt.Errorf("WorkflowExecutionFailedWorker: failed to load execution %s: %w", args.WorkflowExecutionID, err) + } + + if execution.WorkflowInstance == nil { + w.logger.Warnw("WorkflowExecutionFailedWorker: workflow instance not found, skipping", + "workflow_execution_id", args.WorkflowExecutionID, + ) + return nil + } + + instance := execution.WorkflowInstance + if instance.CreatedByID == nil { + w.logger.Warnw("WorkflowExecutionFailedWorker: instance has no CreatedByID, skipping", + "workflow_execution_id", args.WorkflowExecutionID, + "workflow_instance_id", instance.ID, + ) + return nil + } + + recipient, err := w.userRepo.FindUserByID(ctx, instance.CreatedByID.String()) + if err != nil { + w.logger.Warnw("WorkflowExecutionFailedWorker: creator user not found, skipping", + "workflow_execution_id", args.WorkflowExecutionID, + "user_id", instance.CreatedByID, + "error", err, + ) + return nil + } + + workflowTitle := "" + if instance.WorkflowDefinition != nil { + workflowTitle = instance.WorkflowDefinition.Name + } + + failedSteps := 0 + completedSteps := 0 + for _, step := range execution.StepExecutions { + switch step.Status { + case workflows.StepStatusFailed.String(): + failedSteps++ + case workflows.StepStatusCompleted.String(): + completedSteps++ + } + } + totalSteps := len(execution.StepExecutions) + + failedAt := "unknown" + if execution.FailedAt != nil { + failedAt = execution.FailedAt.Format(time.RFC1123) + } + + recipientName := recipient.FirstName + if recipient.LastName != "" { + recipientName = recipient.FirstName + " " + recipient.LastName + } + + templateData := map[string]interface{}{ + "RecipientName": recipientName, + "WorkflowTitle": workflowTitle, + "WorkflowInstanceName": instance.Name, + "ExecutionID": execution.ID.String(), + "FailureReason": execution.FailureReason, + "FailedAt": failedAt, + "FailedSteps": failedSteps, + "CompletedSteps": completedSteps, + "TotalSteps": totalSteps, + "WorkflowURL": "", + } + + htmlBody, textBody, err := w.emailService.UseTemplate("workflow-execution-failed", templateData) + if err != nil { + w.logger.Errorw("WorkflowExecutionFailedWorker: failed to render template", + "workflow_execution_id", args.WorkflowExecutionID, + "error", err, + ) + return fmt.Errorf("failed to render workflow-execution-failed template: %w", err) + } + + message := &types.Message{ + From: w.emailService.GetDefaultFromAddress(), + To: []string{recipient.Email}, + Subject: fmt.Sprintf("Workflow execution failed: %s", instance.Name), + HTMLBody: htmlBody, + TextBody: textBody, + } + + result, err := w.emailService.Send(ctx, message) + if err != nil { + w.logger.Errorw("WorkflowExecutionFailedWorker: failed to send email", + "workflow_execution_id", args.WorkflowExecutionID, + "recipient", recipient.Email, + "error", err, + ) + return fmt.Errorf("failed to send workflow-execution-failed email: %w", err) + } + + if !result.Success { + w.logger.Errorw("WorkflowExecutionFailedWorker: email send reported failure", + "workflow_execution_id", args.WorkflowExecutionID, + "recipient", recipient.Email, + "error", result.Error, + ) + return fmt.Errorf("workflow-execution-failed email send failed: %s", result.Error) + } + + w.logger.Infow("WorkflowExecutionFailedWorker: failure notification sent", + "workflow_execution_id", args.WorkflowExecutionID, + "recipient", recipient.Email, + "message_id", result.MessageID, + ) + + return nil +} diff --git a/internal/service/worker/workflow_execution_failed_worker_test.go b/internal/service/worker/workflow_execution_failed_worker_test.go new file mode 100644 index 00000000..ef2b6fb3 --- /dev/null +++ b/internal/service/worker/workflow_execution_failed_worker_test.go @@ -0,0 +1,55 @@ +package worker + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/google/uuid" + "github.com/riverqueue/river" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +func makeFailedJob(args WorkflowExecutionFailedArgs) *river.Job[WorkflowExecutionFailedArgs] { + return &river.Job[WorkflowExecutionFailedArgs]{Args: args} +} + +func TestWorkflowExecutionFailedWorker_InvalidExecutionID_Skips(t *testing.T) { + ctx := context.Background() + + mockEmail := &MockEmailService{} + mockRepo := &MockUserRepository{} + mockLog := zap.NewNop().Sugar() + + w := NewWorkflowExecutionFailedWorker(nil, mockEmail, mockRepo, mockLog) + + err := w.Work(ctx, makeFailedJob(WorkflowExecutionFailedArgs{WorkflowExecutionID: "not-a-uuid"})) + assert.NoError(t, err) + mockEmail.AssertNotCalled(t, "Send") +} + +func TestWorkflowExecutionFailedWorker_UserNotFound_Skips(t *testing.T) { + ctx := context.Background() + + mockEmail := &MockEmailService{} + mockRepo := &MockUserRepository{} + mockLog := zap.NewNop().Sugar() + + userID := uuid.New() + mockRepo.On("FindUserByID", ctx, userID.String()).Return(NotificationUser{}, errors.New("not found")) + + now := time.Now() + _ = userID + _ = now + + // We can't inject a mock DB without a real GORM setup, so we test the + // invalid-UUID and user-not-found paths which don't require DB access. + // The nil-DB path panics on GORM, so we only test the UUID guard here. + w := NewWorkflowExecutionFailedWorker(nil, mockEmail, mockRepo, mockLog) + + err := w.Work(ctx, makeFailedJob(WorkflowExecutionFailedArgs{WorkflowExecutionID: "bad-id"})) + assert.NoError(t, err) + mockEmail.AssertNotCalled(t, "Send") +} diff --git a/internal/workflow/executor.go b/internal/workflow/executor.go index ba7a0cbd..70fa1819 100644 --- a/internal/workflow/executor.go +++ b/internal/workflow/executor.go @@ -16,6 +16,7 @@ import ( // Implemented by the worker service to avoid a direct River dependency in this package. type NotificationEnqueuer interface { EnqueueWorkflowTaskAssigned(ctx context.Context, stepExecution *workflows.StepExecution) error + EnqueueWorkflowExecutionFailed(ctx context.Context, execution *workflows.WorkflowExecution) error } // DAGExecutor handles the execution of workflow DAGs with dependency resolution @@ -577,6 +578,18 @@ func (e *DAGExecutor) checkWorkflowCompletion(ctx context.Context, workflowExecu return fmt.Errorf("failed to mark workflow as failed: %w", err) } e.logger.Printf("Workflow execution failed: %s", reason) + + // Notify the workflow instance owner + if e.notificationEnqueuer != nil { + execution, execErr := e.workflowExecutionService.GetByID(workflowExecutionID) + if execErr == nil { + if notifyErr := e.notificationEnqueuer.EnqueueWorkflowExecutionFailed(ctx, execution); notifyErr != nil { + e.logger.Printf("Failed to enqueue workflow-execution-failed notification: %v", notifyErr) + } + } else { + e.logger.Printf("Failed to reload execution for failure notification: %v", execErr) + } + } } else { // All steps reached successful terminal states (completed/skipped) if err := e.workflowExecutionService.UpdateStatus(ctx, workflowExecutionID, StatusCompleted.String()); err != nil { diff --git a/workflow.yaml b/workflow.yaml index cd91a32a..2e478131 100644 --- a/workflow.yaml +++ b/workflow.yaml @@ -1 +1,5 @@ -scheduler_enabled: true \ No newline at end of file +scheduler_enabled: true +due_soon_enabled: true +due_soon_schedule: "0 * * * * *" +task_digest_enabled: true +task_digest_schedule: "0 * * * * *" \ No newline at end of file From 63eb9cf161a001e5bb7ce773c0556ff0755dade9 Mon Sep 17 00:00:00 2001 From: Gustavo Carvalho Date: Fri, 20 Feb 2026 07:12:52 -0300 Subject: [PATCH 04/13] fix: fixes through jobs Signed-off-by: Gustavo Carvalho --- .../handler/workflows/workflow_instance.go | 15 +++++ .../templates/workflow-execution-failed.html | 5 +- .../templates/workflow-task-assigned.html | 3 + .../templates/workflow-task-due-soon.html | 9 ++- internal/service/worker/due_soon_checker.go | 4 +- internal/service/worker/jobs.go | 55 +++++++++++++++---- internal/service/worker/service.go | 47 ++++++++++++---- internal/service/worker/service_test.go | 2 +- .../workflow_execution_failed_worker.go | 7 ++- .../workflow_execution_failed_worker_test.go | 4 +- .../workflow_task_assigned_worker_test.go | 8 +-- .../workflow_task_due_soon_worker_test.go | 8 +-- internal/workflow/assignment.go | 3 +- internal/workflow/executor.go | 10 ++++ internal/workflow/manager.go | 12 ++++ internal/workflow/overdue.go | 11 ++++ 16 files changed, 159 insertions(+), 44 deletions(-) diff --git a/internal/api/handler/workflows/workflow_instance.go b/internal/api/handler/workflows/workflow_instance.go index aba99c47..0e504b13 100644 --- a/internal/api/handler/workflows/workflow_instance.go +++ b/internal/api/handler/workflows/workflow_instance.go @@ -10,12 +10,14 @@ import ( type WorkflowInstanceHandler struct { *BaseHandler + db *gorm.DB service *workflows.WorkflowInstanceService } func NewWorkflowInstanceHandler(sugar *zap.SugaredLogger, db *gorm.DB) *WorkflowInstanceHandler { return &WorkflowInstanceHandler{ BaseHandler: NewBaseHandler(sugar), + db: db, service: workflows.NewWorkflowInstanceService(db), } } @@ -80,6 +82,12 @@ func (h *WorkflowInstanceHandler) Create(ctx echo.Context) error { h.sugar.Errorw("Failed to parse system security plan ID", "error", err) return h.HandleServiceError(ctx, err, "parse", "system security plan ID") } + + actorID, _, err := h.GetActorFromClaims(ctx, h.db) + if err != nil { + return HandleError(err) + } + instance := &workflows.WorkflowInstance{ WorkflowDefinitionID: req.WorkflowDefinitionID, Name: req.Name, @@ -88,6 +96,7 @@ func (h *WorkflowInstanceHandler) Create(ctx echo.Context) error { Cadence: req.Cadence, IsActive: true, // Default to active GracePeriodDays: req.GracePeriodDays, + CreatedByID: actorID, } if req.IsActive != nil { @@ -217,6 +226,11 @@ func (h *WorkflowInstanceHandler) Update(ctx echo.Context) error { return HandleError(err) } + actorID, _, err := h.GetActorFromClaims(ctx, h.db) + if err != nil { + return HandleError(err) + } + instance, err := h.service.GetByID(id) if err != nil { return h.HandleServiceError(ctx, err, "get", "workflow instance") @@ -237,6 +251,7 @@ func (h *WorkflowInstanceHandler) Update(ctx echo.Context) error { if req.GracePeriodDays != nil { instance.GracePeriodDays = req.GracePeriodDays } + instance.UpdatedByID = actorID if err := h.service.Update(id, instance); err != nil { return h.HandleServiceError(ctx, err, "update", "workflow instance") diff --git a/internal/service/email/templates/templates/workflow-execution-failed.html b/internal/service/email/templates/templates/workflow-execution-failed.html index e8f1753c..f8dc966a 100644 --- a/internal/service/email/templates/templates/workflow-execution-failed.html +++ b/internal/service/email/templates/templates/workflow-execution-failed.html @@ -181,9 +181,10 @@

⚠ Workflow Execution Failed

{{.FailureReason}}
- {{if .WorkflowURL}} View Workflow Instance - {{end}} +