diff --git a/cmd/subtask/draft.go b/cmd/subtask/draft.go index 96eb85a..7cabaf0 100644 --- a/cmd/subtask/draft.go +++ b/cmd/subtask/draft.go @@ -23,6 +23,7 @@ type DraftCmd struct { Description string `arg:"" optional:"" help:"Task description (or use stdin)"` Base string `name:"base-branch" required:"" help:"Base branch"` Title string `required:"" help:"Short description"` + Priority int `default:"3" help:"Priority 1-5 (P1=critical, P5=low, default P3)"` Model string `help:"Default model for this task (overrides project config)"` Reasoning string `help:"Default reasoning for this task (codex-only; overrides project config)"` Workflow string `help:"Workflow template to use (e.g., collaborative)"` @@ -65,6 +66,16 @@ func (c *DraftCmd) Run() error { if err := workspace.ValidateReasoningLevel(c.Reasoning); err != nil { return err } + + // Validate and normalize priority (0 means use default) + priority := c.Priority + if priority == 0 { + priority = task.DefaultPriority + } + if priority < 1 || priority > 5 { + return fmt.Errorf("priority must be 1-5 (P1=critical, P5=low), got %d", c.Priority) + } + t := &task.Task{ Name: c.Task, Title: c.Title, @@ -74,6 +85,7 @@ func (c *DraftCmd) Run() error { Model: c.Model, Reasoning: c.Reasoning, Schema: 1, + Priority: priority, } if err := t.Save(); err != nil { @@ -103,6 +115,7 @@ func (c *DraftCmd) Run() error { "base_branch": c.Base, "workflow": wf.Name, "title": c.Title, + "priority": priority, "follow_up": c.FollowUp, "model": c.Model, "reasoning": c.Reasoning, diff --git a/cmd/subtask/main.go b/cmd/subtask/main.go index 2189c99..b170a8c 100644 --- a/cmd/subtask/main.go +++ b/cmd/subtask/main.go @@ -30,6 +30,7 @@ type CLI struct { Diff DiffCmd `cmd:"" help:"Show task diff"` Close CloseCmd `cmd:"" help:"Close a task and free workspace"` Merge MergeCmd `cmd:"" help:"Merge task into base branch (marks as merged)"` + Queue QueueCmd `cmd:"" help:"Merge queue operations"` Workspace WorkspaceCmd `cmd:"" help:"Print workspace path for a task"` Review ReviewCmd `cmd:"" help:"Get review of task changes"` Trace LogsCmd `cmd:"" help:"Debug worker runs (tool calls, errors)"` diff --git a/cmd/subtask/queue.go b/cmd/subtask/queue.go new file mode 100644 index 0000000..da8acb4 --- /dev/null +++ b/cmd/subtask/queue.go @@ -0,0 +1,212 @@ +package main + +import ( + "context" + "fmt" + "strings" + + "github.com/zippoxer/subtask/pkg/render" + "github.com/zippoxer/subtask/pkg/task/index" + "github.com/zippoxer/subtask/pkg/task/ops" + "github.com/zippoxer/subtask/pkg/task/queue" +) + +// QueueCmd implements 'subtask queue' with subcommands. +type QueueCmd struct { + List QueueListCmd `cmd:"" help:"Show merge queue with ordering"` + Merge QueueMergeCmd `cmd:"" help:"Merge ready tasks in queue order"` +} + +// QueueListCmd implements 'subtask queue list'. +type QueueListCmd struct { + Strategy string `default:"conflict-aware" enum:"fifo,priority,conflict-aware" help:"Ordering strategy: fifo, priority, conflict-aware"` + Stage string `help:"Filter by minimum stage (e.g., ready)"` + All bool `short:"a" help:"Include blocked tasks"` +} + +// Run executes the queue list command. +func (c *QueueListCmd) Run() error { + idx, err := index.OpenDefault() + if err != nil { + return fmt.Errorf("failed to open index: %w", err) + } + defer idx.Close() + + ctx := context.Background() + + // Refresh to get latest state + if err := idx.Refresh(ctx, index.RefreshPolicy{ + Git: index.GitPolicy{ + Mode: index.GitOpenOnly, + IncludeIntegration: true, + IncludeConflicts: true, + }, + }); err != nil { + printWarning(fmt.Sprintf("failed to refresh index: %v", err)) + } + + // Build queue + items, err := queue.Build(ctx, idx, queue.Options{ + Strategy: queue.Strategy(c.Strategy), + MinStage: c.Stage, + IncludeAll: c.All, + }) + if err != nil { + return fmt.Errorf("failed to build queue: %w", err) + } + + if len(items) == 0 { + fmt.Println("Merge queue is empty.") + if !c.All { + fmt.Println("\nUse --all to include blocked tasks.") + } + return nil + } + + // Print queue + fmt.Printf("Merge Queue (%d tasks, strategy: %s)\n\n", len(items), c.Strategy) + + // Table header + if render.Pretty { + fmt.Printf(" %-4s %-30s %-8s %-10s %s\n", + "#", "TASK", "PRIORITY", "STAGE", "STATUS") + fmt.Printf(" %-4s %-30s %-8s %-10s %s\n", + "──", "────────────────────────────────", "────────", "──────────", "──────────────────────────") + } else { + fmt.Printf(" %-4s %-30s %-8s %-10s %s\n", + "#", "TASK", "PRIORITY", "STAGE", "STATUS") + } + + for i, item := range items { + name := item.Name + if len(name) > 30 { + name = name[:27] + "..." + } + + stage := item.Stage + if stage == "" { + stage = "-" + } + + status := "ready" + if !item.CanMerge { + status = item.BlockedReason + if len(status) > 30 { + status = status[:27] + "..." + } + } + + // Add conflict/overlap indicators + if len(item.ConflictFiles) > 0 { + status += fmt.Sprintf(" [%d conflicts]", len(item.ConflictFiles)) + } + if len(item.OverlapsWith) > 0 && item.CanMerge { + status += fmt.Sprintf(" [overlaps: %d]", len(item.OverlapsWith)) + } + + fmt.Printf(" %-4d %-30s %-8s %-10s %s\n", + i+1, name, queue.ShortPriorityLabel(item.Priority), stage, status) + } + + // Summary + ready := 0 + blocked := 0 + for _, item := range items { + if item.CanMerge { + ready++ + } else { + blocked++ + } + } + + fmt.Println() + if ready > 0 { + fmt.Printf("%d task(s) ready to merge.\n", ready) + fmt.Printf("\nRun: subtask queue merge\n") + } + if blocked > 0 && c.All { + fmt.Printf("%d task(s) blocked.\n", blocked) + } + + return nil +} + +// QueueMergeCmd implements 'subtask queue merge'. +type QueueMergeCmd struct { + Strategy string `default:"conflict-aware" enum:"fifo,priority,conflict-aware" help:"Ordering strategy"` + Stage string `default:"ready" help:"Required minimum stage"` + DryRun bool `help:"Show plan without merging"` + StopOnError bool `help:"Stop on first failure"` + NoRebase bool `help:"Skip auto-rebase on conflicts"` +} + +// Run executes the queue merge command. +func (c *QueueMergeCmd) Run() error { + idx, err := index.OpenDefault() + if err != nil { + return fmt.Errorf("failed to open index: %w", err) + } + defer idx.Close() + + ctx := context.Background() + + // Refresh to get latest state + if err := idx.Refresh(ctx, index.RefreshPolicy{ + Git: index.GitPolicy{ + Mode: index.GitOpenOnly, + IncludeIntegration: true, + IncludeConflicts: true, + }, + }); err != nil { + printWarning(fmt.Sprintf("failed to refresh index: %v", err)) + } + + result, err := ops.MergeQueue(ctx, idx, ops.QueueMergeOptions{ + Strategy: queue.Strategy(c.Strategy), + MinStage: c.Stage, + DryRun: c.DryRun, + StopOnError: c.StopOnError, + NoRebase: c.NoRebase, + }, cliOpsLogger{}) + + // Print summary + fmt.Println() + if c.DryRun { + fmt.Println("Dry run completed.") + } + + if len(result.Merged) > 0 { + printSuccess(fmt.Sprintf("Merged %d task(s): %s", len(result.Merged), strings.Join(result.Merged, ", "))) + } + + if len(result.Rebased) > 0 { + printInfo(fmt.Sprintf("Rebased %d task(s): %s", len(result.Rebased), strings.Join(result.Rebased, ", "))) + } + + if len(result.Failed) > 0 { + fmt.Println() + printWarning(fmt.Sprintf("Failed %d task(s):", len(result.Failed))) + for name, reason := range result.Failed { + fmt.Printf(" %s: %s\n", name, reason) + } + } + + if len(result.Skipped) > 0 && !c.DryRun { + fmt.Printf("Skipped %d task(s): %s\n", len(result.Skipped), strings.Join(result.Skipped, ", ")) + } + + // Refresh integration cache after merges + if len(result.Merged) > 0 { + if err := idx.Refresh(ctx, index.RefreshPolicy{ + Git: index.GitPolicy{ + Mode: index.GitTasks, + Tasks: result.Merged, + IncludeIntegration: true, + }, + }); err != nil { + printWarning(fmt.Sprintf("failed to refresh integration cache: %v", err)) + } + } + + return err +} diff --git a/pkg/task/index/query.go b/pkg/task/index/query.go index 5a2200b..d22eecd 100644 --- a/pkg/task/index/query.go +++ b/pkg/task/index/query.go @@ -15,6 +15,7 @@ type ListItem struct { Title string FollowUp string BaseBranch string + Priority int TaskStatus task.TaskStatus WorkerStatus task.WorkerStatus @@ -47,6 +48,7 @@ type Record struct { TaskStatus task.TaskStatus WorkerStatus task.WorkerStatus Stage string + Priority int State *task.State ProgressMeta *task.Progress @@ -71,7 +73,7 @@ func (i *Index) ListAll(ctx context.Context) ([]ListItem, error) { } const q = ` SELECT - name, title, follow_up, base_branch, + name, title, follow_up, base_branch, priority, task_status, worker_status, stage, workspace, started_at_ns, last_error, last_history_ns, @@ -92,7 +94,7 @@ func (i *Index) ListOpen(ctx context.Context) ([]ListItem, error) { } const q = ` SELECT - name, title, follow_up, base_branch, + name, title, follow_up, base_branch, priority, task_status, worker_status, stage, workspace, started_at_ns, last_error, last_history_ns, @@ -114,7 +116,7 @@ func (i *Index) ListClosed(ctx context.Context) ([]ListItem, error) { } const q = ` SELECT - name, title, follow_up, base_branch, + name, title, follow_up, base_branch, priority, task_status, worker_status, stage, workspace, started_at_ns, last_error, last_history_ns, @@ -141,6 +143,7 @@ func (i *Index) queryList(ctx context.Context, q string) ([]ListItem, error) { for rows.Next() { var ( name, title, followUp, baseBranch string + priority int taskStatus, workerStatus, stage string workspace string startedAtNS int64 @@ -155,7 +158,7 @@ func (i *Index) queryList(ctx context.Context, q string) ([]ListItem, error) { integratedReason sql.NullString ) if err := rows.Scan( - &name, &title, &followUp, &baseBranch, + &name, &title, &followUp, &baseBranch, &priority, &taskStatus, &workerStatus, &stage, &workspace, &startedAtNS, &lastError, &lastHistoryNS, @@ -173,6 +176,7 @@ func (i *Index) queryList(ctx context.Context, q string) ([]ListItem, error) { Title: title, FollowUp: followUp, BaseBranch: baseBranch, + Priority: priority, TaskStatus: task.TaskStatus(taskStatus), WorkerStatus: task.ParseWorkerStatus(workerStatus), Stage: stage, @@ -210,7 +214,7 @@ func (i *Index) Get(ctx context.Context, taskName string) (Record, bool, error) const q = ` SELECT name, title, base_branch, follow_up, model, reasoning, description, - task_schema, task_status, worker_status, stage, + task_schema, priority, task_status, worker_status, stage, workspace, started_at_ns, supervisor_pid, last_error, last_history_ns, tool_calls, last_active_ns, @@ -226,6 +230,7 @@ WHERE name = ?; var ( name, title, baseBranch, followUp, model, reasoning, description string taskSchema int + priority int taskStatus, workerStatus, stage string workspace string startedAtNS int64 @@ -243,7 +248,7 @@ WHERE name = ?; err := i.db.QueryRowContext(ctx, q, taskName).Scan( &name, &title, &baseBranch, &followUp, &model, &reasoning, &description, - &taskSchema, &taskStatus, &workerStatus, &stage, + &taskSchema, &priority, &taskStatus, &workerStatus, &stage, &workspace, &startedAtNS, &supervisorPID, &lastError, &lastHistoryNS, &toolCalls, &lastActiveNS, @@ -270,10 +275,12 @@ WHERE name = ?; Reasoning: reasoning, Schema: taskSchema, Description: description, + Priority: priority, }, TaskStatus: task.TaskStatus(taskStatus), WorkerStatus: task.ParseWorkerStatus(workerStatus), Stage: stage, + Priority: priority, LastHistory: timeFromNS(lastHistoryNS), ProgressDone: progressDone, ProgressTotal: progressTotal, diff --git a/pkg/task/index/refresh.go b/pkg/task/index/refresh.go index c09786e..5ed22d9 100644 --- a/pkg/task/index/refresh.go +++ b/pkg/task/index/refresh.go @@ -256,6 +256,7 @@ type taskRow struct { reasoning string description string taskSchema int + priority int // Durable state (history.jsonl) taskStatus string @@ -315,6 +316,7 @@ func buildRowFromDisk(taskName, filesSig string) (taskRow, bool, error) { row.reasoning = t.Reasoning row.description = t.Description row.taskSchema = t.Schema + row.priority = t.EffectivePriority() // Durable state from history. tail, _ := history.Tail(taskName) @@ -553,6 +555,7 @@ INSERT INTO tasks ( last_run_duration_ms, progress_done, progress_total, status_rank, + priority, files_sig ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, @@ -562,6 +565,7 @@ INSERT INTO tasks ( ?, ?, ?, ?, + ?, ? ) ON CONFLICT(name) DO UPDATE SET @@ -587,6 +591,7 @@ ON CONFLICT(name) DO UPDATE SET progress_done=excluded.progress_done, progress_total=excluded.progress_total, status_rank=excluded.status_rank, + priority=excluded.priority, files_sig=excluded.files_sig; ` @@ -607,6 +612,7 @@ ON CONFLICT(name) DO UPDATE SET row.lastRunDuration, row.progressDone, row.progressTotal, row.statusRank, + row.priority, row.filesSig, ); err != nil { return fmt.Errorf("index refresh: upsert %q: %w", row.name, err) diff --git a/pkg/task/index/schema.go b/pkg/task/index/schema.go index dcd4813..8593f9a 100644 --- a/pkg/task/index/schema.go +++ b/pkg/task/index/schema.go @@ -7,7 +7,7 @@ import ( "strings" ) -const schemaVersion = 6 +const schemaVersion = 7 func migrateSchema(ctx context.Context, db *sql.DB) error { var v int @@ -69,6 +69,13 @@ func migrateSchema(ctx context.Context, db *sql.DB) error { v = 6 } + if v == 6 { + if err := migrateToV7(ctx, tx); err != nil { + return err + } + v = 7 + } + if _, err := tx.ExecContext(ctx, fmt.Sprintf("PRAGMA user_version=%d;", v)); err != nil { return fmt.Errorf("set index schema version: %w", err) } @@ -253,3 +260,29 @@ func migrateToV6(ctx context.Context, tx *sql.Tx) error { } return nil } + +func migrateToV7(ctx context.Context, tx *sql.Tx) error { + // Add priority column for merge queue ordering. + // Priority 1-5 (P1=critical, P5=low), default 3. + // Index uses ASC because lower numbers = higher priority. + + // Add the priority column first. + if _, err := tx.ExecContext(ctx, `ALTER TABLE tasks ADD COLUMN priority INTEGER NOT NULL DEFAULT 3;`); err != nil { + // ALTER TABLE is not idempotent; ignore duplicate column errors. + if !strings.Contains(err.Error(), "duplicate column name") { + return fmt.Errorf("migrate v7: %w", err) + } + } + + // Create index only if last_history_ns column exists (it should in real databases, + // but may not exist in minimal test schemas from earlier migration tests). + var hasColumn int + err := tx.QueryRowContext(ctx, `SELECT COUNT(*) FROM pragma_table_info('tasks') WHERE name='last_history_ns';`).Scan(&hasColumn) + if err == nil && hasColumn > 0 { + if _, err := tx.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS tasks_priority_idx ON tasks(priority ASC, last_history_ns DESC);`); err != nil { + return fmt.Errorf("migrate v7: create index: %w", err) + } + } + + return nil +} diff --git a/pkg/task/ops/queue.go b/pkg/task/ops/queue.go new file mode 100644 index 0000000..0b5b5e2 --- /dev/null +++ b/pkg/task/ops/queue.go @@ -0,0 +1,211 @@ +package ops + +import ( + "context" + "fmt" + "strings" + + "github.com/zippoxer/subtask/pkg/git" + "github.com/zippoxer/subtask/pkg/task" + "github.com/zippoxer/subtask/pkg/task/index" + "github.com/zippoxer/subtask/pkg/task/queue" +) + +// QueueMergeOptions configures the merge queue operation. +type QueueMergeOptions struct { + Strategy queue.Strategy // Ordering strategy + MinStage string // Required minimum stage (default: "ready") + DryRun bool // Show plan without merging + StopOnError bool // Stop on first failure + NoRebase bool // Skip auto-rebase on conflicts + RepoDir string // Repository directory +} + +// QueueMergeResult contains the results of a queue merge operation. +type QueueMergeResult struct { + Merged []string // Successfully merged tasks + Rebased []string // Tasks that were auto-rebased + Failed map[string]string // Task name -> error message + Skipped []string // Tasks skipped (blocked or dry-run) +} + +// MergeQueue merges all ready tasks in queue order. +func MergeQueue(ctx context.Context, idx *index.Index, opts QueueMergeOptions, logger Logger) (QueueMergeResult, error) { + if opts.Strategy == "" { + opts.Strategy = queue.StrategyConflictAware + } + if opts.MinStage == "" { + opts.MinStage = "ready" + } + if opts.RepoDir == "" { + opts.RepoDir = "." + } + + result := QueueMergeResult{ + Failed: make(map[string]string), + } + + // Build the queue + items, err := queue.Build(ctx, idx, queue.Options{ + Strategy: opts.Strategy, + MinStage: opts.MinStage, + IncludeAll: false, // Only mergeable tasks + RepoDir: opts.RepoDir, + }) + if err != nil { + return result, fmt.Errorf("failed to build queue: %w", err) + } + + if len(items) == 0 { + return result, nil + } + + logInfo(logger, fmt.Sprintf("Merge queue: %d tasks ready", len(items))) + + for i, item := range items { + taskName := item.Name + + // In dry-run mode, just report what would happen + if opts.DryRun { + logInfo(logger, fmt.Sprintf("[%d/%d] Would merge: %s (%s)", + i+1, len(items), taskName, queue.ShortPriorityLabel(item.Priority))) + result.Skipped = append(result.Skipped, taskName) + continue + } + + logInfo(logger, fmt.Sprintf("[%d/%d] Merging: %s", i+1, len(items), taskName)) + + // Re-check for conflicts (base may have moved after previous merges) + if !opts.NoRebase && item.Workspace != "" { + conflicts, err := git.MergeConflictFiles(item.Workspace, item.BaseBranch, "HEAD") + if err == nil && len(conflicts) > 0 { + // Try auto-rebase + logInfo(logger, fmt.Sprintf(" Rebasing onto %s...", item.BaseBranch)) + if err := AutoRebase(taskName, item.Workspace, item.BaseBranch, logger); err != nil { + result.Failed[taskName] = fmt.Sprintf("rebase failed: %v", err) + if opts.StopOnError { + return result, fmt.Errorf("merge queue stopped: rebase failed for %s: %w", taskName, err) + } + continue + } + result.Rebased = append(result.Rebased, taskName) + } + } + + // Attempt merge + commitMsg := fmt.Sprintf("Merge %s", item.Title) + if item.Title == "" { + commitMsg = fmt.Sprintf("Merge task %s", taskName) + } + + mergeResult, err := MergeTask(taskName, commitMsg, logger) + if err != nil { + result.Failed[taskName] = err.Error() + if opts.StopOnError { + return result, fmt.Errorf("merge queue stopped: %s failed: %w", taskName, err) + } + logWarning(logger, fmt.Sprintf(" Failed: %v", err)) + continue + } + + if mergeResult.AlreadyMerged || mergeResult.AlreadyClosed { + result.Skipped = append(result.Skipped, taskName) + continue + } + + result.Merged = append(result.Merged, taskName) + } + + return result, nil +} + +// AutoRebase rebases a task's branch onto the current base branch. +// This is used to resolve conflicts that arise when the base branch moves. +func AutoRebase(taskName, workspace, baseBranch string, logger Logger) error { + var rebaseErr error + locked, err := task.TryWithLock(taskName, func() error { + rebaseErr = autoRebaseUnlocked(workspace, baseBranch, logger) + return rebaseErr + }) + if err != nil { + return err + } + if !locked { + return fmt.Errorf("task %q is busy (another operation is in progress)", taskName) + } + return rebaseErr +} + +func autoRebaseUnlocked(workspace, baseBranch string, logger Logger) error { + // Get current branch + currentBranch, err := git.CurrentBranch(workspace) + if err != nil { + return fmt.Errorf("failed to get current branch: %w", err) + } + + if currentBranch == "HEAD" { + return fmt.Errorf("workspace is in detached HEAD state") + } + + // Ensure workspace is clean + clean, err := git.IsClean(workspace) + if err != nil { + return fmt.Errorf("failed to check workspace status: %w", err) + } + if !clean { + return fmt.Errorf("workspace has uncommitted changes") + } + + // Attempt rebase + if err := git.RebaseOnto(workspace, baseBranch); err != nil { + // Extract conflict info from error + errStr := err.Error() + if strings.Contains(errStr, "CONFLICT") { + return fmt.Errorf("conflicts detected:\n%s", errStr) + } + return fmt.Errorf("rebase failed: %w", err) + } + + logSuccess(logger, fmt.Sprintf(" Rebased onto %s", baseBranch)) + return nil +} + +// CheckQueueHealth returns information about the merge queue state. +func CheckQueueHealth(ctx context.Context, idx *index.Index, opts queue.Options) (QueueHealthInfo, error) { + items, err := queue.Build(ctx, idx, queue.Options{ + Strategy: opts.Strategy, + MinStage: opts.MinStage, + IncludeAll: true, // Include blocked to count them + RepoDir: opts.RepoDir, + }) + if err != nil { + return QueueHealthInfo{}, err + } + + info := QueueHealthInfo{} + for _, item := range items { + info.Total++ + if item.CanMerge { + info.Ready++ + } else { + info.Blocked++ + } + if len(item.ConflictFiles) > 0 { + info.WithConflicts++ + } + if len(item.OverlapsWith) > 0 { + info.WithOverlaps++ + } + } + + return info, nil +} + +// QueueHealthInfo provides summary statistics about the merge queue. +type QueueHealthInfo struct { + Total int // Total tasks in queue + Ready int // Tasks ready to merge + Blocked int // Tasks blocked from merging + WithConflicts int // Tasks with base branch conflicts + WithOverlaps int // Tasks with file overlaps with other tasks +} diff --git a/pkg/task/queue/conflicts.go b/pkg/task/queue/conflicts.go new file mode 100644 index 0000000..11d7b1f --- /dev/null +++ b/pkg/task/queue/conflicts.go @@ -0,0 +1,130 @@ +package queue + +import ( + "sort" +) + +// BuildOverlapMatrix builds a map of task name -> tasks that share changed files. +// This helps identify which tasks might conflict with each other. +func BuildOverlapMatrix(items []QueueItem) map[string][]string { + // Build file -> tasks map + fileToTasks := make(map[string][]string) + for _, item := range items { + for _, f := range item.changedFiles { + fileToTasks[f] = append(fileToTasks[f], item.Name) + } + } + + // Build task -> overlapping tasks map + overlaps := make(map[string][]string) + for _, item := range items { + seen := make(map[string]bool) + for _, f := range item.changedFiles { + for _, other := range fileToTasks[f] { + if other != item.Name && !seen[other] { + seen[other] = true + overlaps[item.Name] = append(overlaps[item.Name], other) + } + } + } + // Sort for deterministic output + sort.Strings(overlaps[item.Name]) + } + + return overlaps +} + +// OptimalMergeOrder returns tasks ordered to minimize cascading conflicts. +// Uses a greedy algorithm: at each step, pick the task with fewest remaining overlaps. +// After "merging" a task, its files no longer count as conflicts for remaining tasks. +func OptimalMergeOrder(items []QueueItem) []QueueItem { + if len(items) <= 1 { + return items + } + + // Build file -> tasks map + fileToTasks := make(map[string]map[string]bool) + for _, item := range items { + for _, f := range item.changedFiles { + if fileToTasks[f] == nil { + fileToTasks[f] = make(map[string]bool) + } + fileToTasks[f][item.Name] = true + } + } + + // Track which tasks are already ordered + ordered := make([]QueueItem, 0, len(items)) + remaining := make(map[string]*QueueItem, len(items)) + for i := range items { + remaining[items[i].Name] = &items[i] + } + + for len(remaining) > 0 { + // Find task with fewest remaining overlaps, preferring higher scores + var best *QueueItem + bestOverlaps := -1 + + for _, item := range remaining { + overlaps := countRemainingOverlaps(item, fileToTasks, remaining) + + if best == nil || + overlaps < bestOverlaps || + (overlaps == bestOverlaps && item.Score > best.Score) { + best = item + bestOverlaps = overlaps + } + } + + if best == nil { + break + } + + // Add to ordered list + ordered = append(ordered, *best) + delete(remaining, best.Name) + + // Remove this task's files from the overlap map + // (simulating that these files are now "committed" to base) + for _, f := range best.changedFiles { + delete(fileToTasks[f], best.Name) + } + } + + return ordered +} + +// countRemainingOverlaps counts how many remaining tasks share files with this task. +func countRemainingOverlaps(item *QueueItem, fileToTasks map[string]map[string]bool, remaining map[string]*QueueItem) int { + seen := make(map[string]bool) + for _, f := range item.changedFiles { + for other := range fileToTasks[f] { + if other != item.Name && remaining[other] != nil && !seen[other] { + seen[other] = true + } + } + } + return len(seen) +} + +// GetOverlappingFiles returns the files that overlap between two tasks. +func GetOverlappingFiles(a, b *QueueItem) []string { + if a == nil || b == nil { + return nil + } + + aFiles := make(map[string]bool, len(a.changedFiles)) + for _, f := range a.changedFiles { + aFiles[f] = true + } + + var overlap []string + for _, f := range b.changedFiles { + if aFiles[f] { + overlap = append(overlap, f) + } + } + + sort.Strings(overlap) + return overlap +} diff --git a/pkg/task/queue/maturity.go b/pkg/task/queue/maturity.go new file mode 100644 index 0000000..987c1fa --- /dev/null +++ b/pkg/task/queue/maturity.go @@ -0,0 +1,165 @@ +package queue + +import ( + "github.com/zippoxer/subtask/pkg/git" + "github.com/zippoxer/subtask/pkg/task" +) + +// CheckMaturity checks if a task is ready to be merged. +// Returns (canMerge, reason) where reason explains why it can't merge. +func CheckMaturity(item *QueueItem, minStage string) (bool, string) { + // Check task status + if item.TaskStatus != task.TaskStatusOpen { + return false, "task is " + string(item.TaskStatus) + } + + // Check worker status + if item.WorkerStatus == task.WorkerStatusRunning { + return false, "worker is running" + } + + // Check workspace exists + if item.Workspace == "" { + return false, "no workspace assigned" + } + + // Check stage requirement + if minStage != "" && !stageReached(item.Stage, minStage) { + return false, "stage is " + item.Stage + ", requires " + minStage + } + + // Check workspace is clean + clean, err := git.IsClean(item.Workspace) + if err != nil { + return false, "cannot check workspace: " + err.Error() + } + if !clean { + return false, "workspace has uncommitted changes" + } + + // Check for conflicts with base branch + if len(item.ConflictFiles) > 0 { + return false, "has conflicts with " + item.BaseBranch + } + + // Check for commits to merge + if !hasCommits(item) { + return false, "no commits to merge" + } + + return true, "" +} + +// stageReached checks if current stage has reached the minimum required stage. +func stageReached(current, min string) bool { + stages := []string{"plan", "implement", "doing", "review", "ready"} + + currentIdx := -1 + minIdx := -1 + for i, s := range stages { + if s == current { + currentIdx = i + } + if s == min { + minIdx = i + } + } + + // If stage not found, be lenient + if currentIdx == -1 || minIdx == -1 { + return true + } + + return currentIdx >= minIdx +} + +// hasCommits checks if the task branch has commits to merge. +func hasCommits(item *QueueItem) bool { + if item.Workspace == "" || item.BaseBranch == "" { + return false + } + + // Get merge base + base, err := git.MergeBase(item.Workspace, item.BaseBranch, "HEAD") + if err != nil { + return false + } + + // Check if there are commits between merge base and HEAD + subjects, err := git.GetCommitSubjects(item.Workspace, base) + if err != nil { + return false + } + + return len(subjects) > 0 +} + +// MaturityStatus provides detailed maturity information. +type MaturityStatus struct { + CanMerge bool + Reason string + WorkspaceClean bool + HasCommits bool + StageReached bool + HasConflicts bool +} + +// GetMaturityStatus returns detailed maturity information for a queue item. +func GetMaturityStatus(item *QueueItem, minStage string) MaturityStatus { + status := MaturityStatus{} + + // Task must be open + if item.TaskStatus != task.TaskStatusOpen { + status.Reason = "task is " + string(item.TaskStatus) + return status + } + + // Worker must not be running + if item.WorkerStatus == task.WorkerStatusRunning { + status.Reason = "worker is running" + return status + } + + // Check workspace + if item.Workspace == "" { + status.Reason = "no workspace assigned" + return status + } + + // Check stage + status.StageReached = stageReached(item.Stage, minStage) + if minStage != "" && !status.StageReached { + status.Reason = "stage is " + item.Stage + ", requires " + minStage + } + + // Check workspace cleanliness + clean, err := git.IsClean(item.Workspace) + if err == nil { + status.WorkspaceClean = clean + if !clean && status.Reason == "" { + status.Reason = "workspace has uncommitted changes" + } + } + + // Check conflicts + status.HasConflicts = len(item.ConflictFiles) > 0 + if status.HasConflicts && status.Reason == "" { + status.Reason = "has conflicts with " + item.BaseBranch + } + + // Check commits + status.HasCommits = hasCommits(item) + if !status.HasCommits && status.Reason == "" { + status.Reason = "no commits to merge" + } + + // Determine overall merge readiness + status.CanMerge = status.WorkspaceClean && + status.HasCommits && + status.StageReached && + !status.HasConflicts && + item.TaskStatus == task.TaskStatusOpen && + item.WorkerStatus != task.WorkerStatusRunning + + return status +} diff --git a/pkg/task/queue/queue.go b/pkg/task/queue/queue.go new file mode 100644 index 0000000..b0bb3ef --- /dev/null +++ b/pkg/task/queue/queue.go @@ -0,0 +1,196 @@ +// Package queue provides merge queue ordering and conflict detection. +package queue + +import ( + "context" + "encoding/json" + "sort" + + "github.com/zippoxer/subtask/pkg/git" + "github.com/zippoxer/subtask/pkg/task" + "github.com/zippoxer/subtask/pkg/task/index" +) + +// Strategy defines how the merge queue is ordered. +type Strategy string + +const ( + // StrategyFIFO orders tasks by creation time (oldest first). + StrategyFIFO Strategy = "fifo" + // StrategyPriority orders by priority then age. + StrategyPriority Strategy = "priority" + // StrategyConflictAware orders to minimize cascading conflicts (default). + StrategyConflictAware Strategy = "conflict-aware" +) + +// QueueItem represents a task in the merge queue. +type QueueItem struct { + Name string + Title string + Priority int // 1-5 (P1=critical, P5=low) + Stage string + TaskStatus task.TaskStatus + WorkerStatus task.WorkerStatus + Workspace string + BaseBranch string + + // Computed fields + Score float64 // Computed merge priority score + ConflictFiles []string // Files that would conflict with base branch + OverlapsWith []string // Other tasks with shared changed files + CanMerge bool // Whether task can be merged now + BlockedReason string // Why task cannot be merged (if blocked) + + // Internal + changedFiles []string // Files changed by this task + lastHistory int64 // For age calculation +} + +// Options configures queue building. +type Options struct { + Strategy Strategy // Ordering strategy (default: conflict-aware) + MinStage string // Minimum stage required (e.g., "ready") + IncludeAll bool // Include blocked tasks + RepoDir string // Repository directory for git operations +} + +// Build creates an ordered merge queue from indexed tasks. +func Build(ctx context.Context, idx *index.Index, opts Options) ([]QueueItem, error) { + if opts.Strategy == "" { + opts.Strategy = StrategyConflictAware + } + if opts.RepoDir == "" { + opts.RepoDir = "." + } + + // Get all open tasks + items, err := idx.ListOpen(ctx) + if err != nil { + return nil, err + } + + // Convert to queue items + queue := make([]QueueItem, 0, len(items)) + for _, item := range items { + qi := QueueItem{ + Name: item.Name, + Title: item.Title, + Priority: item.Priority, + Stage: item.Stage, + TaskStatus: item.TaskStatus, + WorkerStatus: item.WorkerStatus, + Workspace: item.Workspace, + BaseBranch: item.BaseBranch, + lastHistory: item.LastHistory.UnixNano(), + } + + // Skip merged/closed tasks + if item.TaskStatus != task.TaskStatusOpen { + continue + } + + // Check maturity (can merge now?) + qi.CanMerge, qi.BlockedReason = CheckMaturity(&qi, opts.MinStage) + + // Get conflict info from index if available + rec, ok, err := idx.Get(ctx, item.Name) + if err == nil && ok && rec.ConflictFilesJSON != "" { + var files []string + if json.Unmarshal([]byte(rec.ConflictFilesJSON), &files) == nil { + qi.ConflictFiles = files + } + } + + // Get changed files for overlap detection (if workspace exists) + if qi.Workspace != "" { + if base, err := git.ResolveDiffBase(qi.Workspace, "HEAD", qi.BaseBranch); err == nil { + if files, err := git.DiffNameStatus(qi.Workspace, base); err == nil { + qi.changedFiles = make([]string, 0, len(files)) + for f := range files { + qi.changedFiles = append(qi.changedFiles, f) + } + } + } + } + + if !opts.IncludeAll && !qi.CanMerge { + continue + } + + queue = append(queue, qi) + } + + // Build overlap matrix and compute overlaps + if opts.Strategy == StrategyConflictAware { + matrix := BuildOverlapMatrix(queue) + for i := range queue { + queue[i].OverlapsWith = matrix[queue[i].Name] + } + } + + // Calculate scores + var maxAge int64 + for _, qi := range queue { + if qi.lastHistory > maxAge { + maxAge = qi.lastHistory + } + } + for i := range queue { + queue[i].Score = CalculateScore(&queue[i], maxAge) + } + + // Sort by strategy + switch opts.Strategy { + case StrategyFIFO: + sort.Slice(queue, func(i, j int) bool { + return queue[i].lastHistory < queue[j].lastHistory + }) + case StrategyPriority: + sort.Slice(queue, func(i, j int) bool { + if queue[i].Priority != queue[j].Priority { + return queue[i].Priority < queue[j].Priority // P1 < P5 + } + return queue[i].lastHistory < queue[j].lastHistory + }) + case StrategyConflictAware: + queue = OptimalMergeOrder(queue) + } + + return queue, nil +} + +// PriorityLabel returns a human-readable label for priority. +func PriorityLabel(p int) string { + switch p { + case 1: + return "P1 (critical)" + case 2: + return "P2 (high)" + case 3: + return "P3 (normal)" + case 4: + return "P4 (low)" + case 5: + return "P5 (lowest)" + default: + return "P3 (normal)" + } +} + +// ShortPriorityLabel returns a short label for priority. +func ShortPriorityLabel(p int) string { + switch p { + case 1: + return "P1" + case 2: + return "P2" + case 3: + return "P3" + case 4: + return "P4" + case 5: + return "P5" + default: + return "P3" + } +} diff --git a/pkg/task/queue/scoring.go b/pkg/task/queue/scoring.go new file mode 100644 index 0000000..25b9b1a --- /dev/null +++ b/pkg/task/queue/scoring.go @@ -0,0 +1,96 @@ +package queue + +import ( + "time" +) + +// Score weights for merge queue ordering. +const ( + weightPriority = 0.50 // Priority weight (P1=1.0, P5=0.2) + weightAge = 0.20 // Age weight (older = higher) + weightConflictFree = 0.20 // Conflict-free bonus + weightStage = 0.10 // Stage bonus (ready=1.0, review=0.75, etc.) +) + +// CalculateScore computes a merge priority score for a queue item. +// Higher scores indicate higher merge priority. +// The score is normalized to 0-1 range. +func CalculateScore(item *QueueItem, maxAgeNS int64) float64 { + var score float64 + + // Priority component: P1=1.0, P2=0.8, P3=0.6, P4=0.4, P5=0.2 + priorityScore := priorityToScore(item.Priority) + score += weightPriority * priorityScore + + // Age component: older tasks get higher scores + ageScore := ageToScore(item.lastHistory, maxAgeNS) + score += weightAge * ageScore + + // Conflict-free component: tasks without conflicts get bonus + if len(item.ConflictFiles) == 0 { + score += weightConflictFree + } + + // Stage component: tasks closer to ready get higher scores + stageScore := stageToScore(item.Stage) + score += weightStage * stageScore + + return score +} + +// priorityToScore converts priority (1-5) to a 0-1 score. +// P1 (critical) = 1.0, P5 (lowest) = 0.2 +func priorityToScore(priority int) float64 { + switch priority { + case 1: + return 1.0 + case 2: + return 0.8 + case 3: + return 0.6 + case 4: + return 0.4 + case 5: + return 0.2 + default: + return 0.6 // Default to P3 + } +} + +// ageToScore converts task age to a 0-1 score. +// Older tasks get higher scores (up to 1 week old = 1.0). +func ageToScore(taskHistoryNS, maxAgeNS int64) float64 { + if maxAgeNS == 0 || taskHistoryNS == 0 { + return 0.5 // Default middle score + } + + now := time.Now().UnixNano() + taskAge := now - taskHistoryNS + if taskAge <= 0 { + return 0.0 + } + + // Normalize age to 0-1, capping at 1 week + maxAge := int64(7 * 24 * time.Hour) + if taskAge > maxAge { + return 1.0 + } + return float64(taskAge) / float64(maxAge) +} + +// stageToScore converts workflow stage to a 0-1 score. +// Tasks closer to "ready" get higher scores. +func stageToScore(stage string) float64 { + switch stage { + case "ready": + return 1.0 + case "review": + return 0.75 + case "implement", "doing": + return 0.5 + case "plan": + return 0.25 + default: + return 0.5 // Default + } +} diff --git a/pkg/task/task.go b/pkg/task/task.go index e648bff..07832ea 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -20,6 +20,7 @@ type Task struct { Reasoning string // Optional: override reasoning (codex-only) for this task Schema int // Task schema version (0 if missing) Description string // Optional task description/context (not the prompt) + Priority int // 1-5 (P1=critical, P5=low), default 3 } // frontmatter is the YAML frontmatter in TASK.md. @@ -30,6 +31,7 @@ type frontmatter struct { FollowUp string `yaml:"follow-up,omitempty"` Model string `yaml:"model,omitempty"` Reasoning string `yaml:"reasoning,omitempty"` + Priority int `yaml:"priority,omitempty"` } // Save writes the task to .subtask/tasks//TASK.md. @@ -46,6 +48,7 @@ func (t *Task) Save() error { FollowUp: t.FollowUp, Model: t.Model, Reasoning: t.Reasoning, + Priority: t.Priority, } var buf bytes.Buffer @@ -105,6 +108,7 @@ func Load(name string) (*Task, error) { Model: fm.Model, Reasoning: fm.Reasoning, Description: strings.TrimSpace(prompt), + Priority: fm.Priority, }, nil } @@ -133,3 +137,14 @@ func List() ([]string, error) { func (t *Task) Path() string { return Path(t.Name) } + +// DefaultPriority is the default priority for tasks (middle of 1-5 scale). +const DefaultPriority = 3 + +// EffectivePriority returns the task's priority, defaulting to 3 if not set. +func (t *Task) EffectivePriority() int { + if t.Priority == 0 { + return DefaultPriority + } + return t.Priority +}