Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions cmd/subtask/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type DraftCmd struct {
Description string `arg:"" optional:"" help:"Task description (or use stdin)"`
Base string `name:"base-branch" required:"" help:"Base branch"`
Title string `required:"" help:"Short description"`
Priority int `default:"3" help:"Priority 1-5 (P1=critical, P5=low, default P3)"`
Model string `help:"Default model for this task (overrides project config)"`
Reasoning string `help:"Default reasoning for this task (codex-only; overrides project config)"`
Workflow string `help:"Workflow template to use (e.g., collaborative)"`
Expand Down Expand Up @@ -65,6 +66,16 @@ func (c *DraftCmd) Run() error {
if err := workspace.ValidateReasoningLevel(c.Reasoning); err != nil {
return err
}

// Validate and normalize priority (0 means use default)
priority := c.Priority
if priority == 0 {
priority = task.DefaultPriority
}
if priority < 1 || priority > 5 {
return fmt.Errorf("priority must be 1-5 (P1=critical, P5=low), got %d", c.Priority)
}

t := &task.Task{
Name: c.Task,
Title: c.Title,
Expand All @@ -74,6 +85,7 @@ func (c *DraftCmd) Run() error {
Model: c.Model,
Reasoning: c.Reasoning,
Schema: 1,
Priority: priority,
}

if err := t.Save(); err != nil {
Expand Down Expand Up @@ -103,6 +115,7 @@ func (c *DraftCmd) Run() error {
"base_branch": c.Base,
"workflow": wf.Name,
"title": c.Title,
"priority": priority,
"follow_up": c.FollowUp,
"model": c.Model,
"reasoning": c.Reasoning,
Expand Down
1 change: 1 addition & 0 deletions cmd/subtask/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type CLI struct {
Diff DiffCmd `cmd:"" help:"Show task diff"`
Close CloseCmd `cmd:"" help:"Close a task and free workspace"`
Merge MergeCmd `cmd:"" help:"Merge task into base branch (marks as merged)"`
Queue QueueCmd `cmd:"" help:"Merge queue operations"`
Workspace WorkspaceCmd `cmd:"" help:"Print workspace path for a task"`
Review ReviewCmd `cmd:"" help:"Get review of task changes"`
Trace LogsCmd `cmd:"" help:"Debug worker runs (tool calls, errors)"`
Expand Down
212 changes: 212 additions & 0 deletions cmd/subtask/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package main

import (
"context"
"fmt"
"strings"

"github.com/zippoxer/subtask/pkg/render"
"github.com/zippoxer/subtask/pkg/task/index"
"github.com/zippoxer/subtask/pkg/task/ops"
"github.com/zippoxer/subtask/pkg/task/queue"
)

// QueueCmd implements 'subtask queue' with subcommands.
type QueueCmd struct {
List QueueListCmd `cmd:"" help:"Show merge queue with ordering"`
Merge QueueMergeCmd `cmd:"" help:"Merge ready tasks in queue order"`
}

// QueueListCmd implements 'subtask queue list'.
type QueueListCmd struct {
Strategy string `default:"conflict-aware" enum:"fifo,priority,conflict-aware" help:"Ordering strategy: fifo, priority, conflict-aware"`
Stage string `help:"Filter by minimum stage (e.g., ready)"`
All bool `short:"a" help:"Include blocked tasks"`
}

// Run executes the queue list command.
func (c *QueueListCmd) Run() error {
idx, err := index.OpenDefault()
if err != nil {
return fmt.Errorf("failed to open index: %w", err)
}
defer idx.Close()

ctx := context.Background()

// Refresh to get latest state
if err := idx.Refresh(ctx, index.RefreshPolicy{
Git: index.GitPolicy{
Mode: index.GitOpenOnly,
IncludeIntegration: true,
IncludeConflicts: true,
},
}); err != nil {
printWarning(fmt.Sprintf("failed to refresh index: %v", err))
}

// Build queue
items, err := queue.Build(ctx, idx, queue.Options{
Strategy: queue.Strategy(c.Strategy),
MinStage: c.Stage,
IncludeAll: c.All,
})
if err != nil {
return fmt.Errorf("failed to build queue: %w", err)
}

if len(items) == 0 {
fmt.Println("Merge queue is empty.")
if !c.All {
fmt.Println("\nUse --all to include blocked tasks.")
}
return nil
}

// Print queue
fmt.Printf("Merge Queue (%d tasks, strategy: %s)\n\n", len(items), c.Strategy)

// Table header
if render.Pretty {
fmt.Printf(" %-4s %-30s %-8s %-10s %s\n",
"#", "TASK", "PRIORITY", "STAGE", "STATUS")
fmt.Printf(" %-4s %-30s %-8s %-10s %s\n",
"──", "────────────────────────────────", "────────", "──────────", "──────────────────────────")
} else {
fmt.Printf(" %-4s %-30s %-8s %-10s %s\n",
"#", "TASK", "PRIORITY", "STAGE", "STATUS")
}

for i, item := range items {
name := item.Name
if len(name) > 30 {
name = name[:27] + "..."
}

stage := item.Stage
if stage == "" {
stage = "-"
}

status := "ready"
if !item.CanMerge {
status = item.BlockedReason
if len(status) > 30 {
status = status[:27] + "..."
}
}

// Add conflict/overlap indicators
if len(item.ConflictFiles) > 0 {
status += fmt.Sprintf(" [%d conflicts]", len(item.ConflictFiles))
}
if len(item.OverlapsWith) > 0 && item.CanMerge {
status += fmt.Sprintf(" [overlaps: %d]", len(item.OverlapsWith))
}

fmt.Printf(" %-4d %-30s %-8s %-10s %s\n",
i+1, name, queue.ShortPriorityLabel(item.Priority), stage, status)
}

// Summary
ready := 0
blocked := 0
for _, item := range items {
if item.CanMerge {
ready++
} else {
blocked++
}
}

fmt.Println()
if ready > 0 {
fmt.Printf("%d task(s) ready to merge.\n", ready)
fmt.Printf("\nRun: subtask queue merge\n")
}
if blocked > 0 && c.All {
fmt.Printf("%d task(s) blocked.\n", blocked)
}

return nil
}

// QueueMergeCmd implements 'subtask queue merge'.
type QueueMergeCmd struct {
Strategy string `default:"conflict-aware" enum:"fifo,priority,conflict-aware" help:"Ordering strategy"`
Stage string `default:"ready" help:"Required minimum stage"`
DryRun bool `help:"Show plan without merging"`
StopOnError bool `help:"Stop on first failure"`
NoRebase bool `help:"Skip auto-rebase on conflicts"`
}

// Run executes the queue merge command.
func (c *QueueMergeCmd) Run() error {
idx, err := index.OpenDefault()
if err != nil {
return fmt.Errorf("failed to open index: %w", err)
}
defer idx.Close()

ctx := context.Background()

// Refresh to get latest state
if err := idx.Refresh(ctx, index.RefreshPolicy{
Git: index.GitPolicy{
Mode: index.GitOpenOnly,
IncludeIntegration: true,
IncludeConflicts: true,
},
}); err != nil {
printWarning(fmt.Sprintf("failed to refresh index: %v", err))
}

result, err := ops.MergeQueue(ctx, idx, ops.QueueMergeOptions{
Strategy: queue.Strategy(c.Strategy),
MinStage: c.Stage,
DryRun: c.DryRun,
StopOnError: c.StopOnError,
NoRebase: c.NoRebase,
}, cliOpsLogger{})

// Print summary
fmt.Println()
if c.DryRun {
fmt.Println("Dry run completed.")
}

if len(result.Merged) > 0 {
printSuccess(fmt.Sprintf("Merged %d task(s): %s", len(result.Merged), strings.Join(result.Merged, ", ")))
}

if len(result.Rebased) > 0 {
printInfo(fmt.Sprintf("Rebased %d task(s): %s", len(result.Rebased), strings.Join(result.Rebased, ", ")))
}

if len(result.Failed) > 0 {
fmt.Println()
printWarning(fmt.Sprintf("Failed %d task(s):", len(result.Failed)))
for name, reason := range result.Failed {
fmt.Printf(" %s: %s\n", name, reason)
}
}

if len(result.Skipped) > 0 && !c.DryRun {
fmt.Printf("Skipped %d task(s): %s\n", len(result.Skipped), strings.Join(result.Skipped, ", "))
}

// Refresh integration cache after merges
if len(result.Merged) > 0 {
if err := idx.Refresh(ctx, index.RefreshPolicy{
Git: index.GitPolicy{
Mode: index.GitTasks,
Tasks: result.Merged,
IncludeIntegration: true,
},
}); err != nil {
printWarning(fmt.Sprintf("failed to refresh integration cache: %v", err))
}
}

return err
}
19 changes: 13 additions & 6 deletions pkg/task/index/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type ListItem struct {
Title string
FollowUp string
BaseBranch string
Priority int

TaskStatus task.TaskStatus
WorkerStatus task.WorkerStatus
Expand Down Expand Up @@ -47,6 +48,7 @@ type Record struct {
TaskStatus task.TaskStatus
WorkerStatus task.WorkerStatus
Stage string
Priority int

State *task.State
ProgressMeta *task.Progress
Expand All @@ -71,7 +73,7 @@ func (i *Index) ListAll(ctx context.Context) ([]ListItem, error) {
}
const q = `
SELECT
name, title, follow_up, base_branch,
name, title, follow_up, base_branch, priority,
task_status, worker_status, stage,
workspace, started_at_ns, last_error,
last_history_ns,
Expand All @@ -92,7 +94,7 @@ func (i *Index) ListOpen(ctx context.Context) ([]ListItem, error) {
}
const q = `
SELECT
name, title, follow_up, base_branch,
name, title, follow_up, base_branch, priority,
task_status, worker_status, stage,
workspace, started_at_ns, last_error,
last_history_ns,
Expand All @@ -114,7 +116,7 @@ func (i *Index) ListClosed(ctx context.Context) ([]ListItem, error) {
}
const q = `
SELECT
name, title, follow_up, base_branch,
name, title, follow_up, base_branch, priority,
task_status, worker_status, stage,
workspace, started_at_ns, last_error,
last_history_ns,
Expand All @@ -141,6 +143,7 @@ func (i *Index) queryList(ctx context.Context, q string) ([]ListItem, error) {
for rows.Next() {
var (
name, title, followUp, baseBranch string
priority int
taskStatus, workerStatus, stage string
workspace string
startedAtNS int64
Expand All @@ -155,7 +158,7 @@ func (i *Index) queryList(ctx context.Context, q string) ([]ListItem, error) {
integratedReason sql.NullString
)
if err := rows.Scan(
&name, &title, &followUp, &baseBranch,
&name, &title, &followUp, &baseBranch, &priority,
&taskStatus, &workerStatus, &stage,
&workspace, &startedAtNS, &lastError,
&lastHistoryNS,
Expand All @@ -173,6 +176,7 @@ func (i *Index) queryList(ctx context.Context, q string) ([]ListItem, error) {
Title: title,
FollowUp: followUp,
BaseBranch: baseBranch,
Priority: priority,
TaskStatus: task.TaskStatus(taskStatus),
WorkerStatus: task.ParseWorkerStatus(workerStatus),
Stage: stage,
Expand Down Expand Up @@ -210,7 +214,7 @@ func (i *Index) Get(ctx context.Context, taskName string) (Record, bool, error)
const q = `
SELECT
name, title, base_branch, follow_up, model, reasoning, description,
task_schema, task_status, worker_status, stage,
task_schema, priority, task_status, worker_status, stage,
workspace, started_at_ns, supervisor_pid, last_error,
last_history_ns,
tool_calls, last_active_ns,
Expand All @@ -226,6 +230,7 @@ WHERE name = ?;
var (
name, title, baseBranch, followUp, model, reasoning, description string
taskSchema int
priority int
taskStatus, workerStatus, stage string
workspace string
startedAtNS int64
Expand All @@ -243,7 +248,7 @@ WHERE name = ?;

err := i.db.QueryRowContext(ctx, q, taskName).Scan(
&name, &title, &baseBranch, &followUp, &model, &reasoning, &description,
&taskSchema, &taskStatus, &workerStatus, &stage,
&taskSchema, &priority, &taskStatus, &workerStatus, &stage,
&workspace, &startedAtNS, &supervisorPID, &lastError,
&lastHistoryNS,
&toolCalls, &lastActiveNS,
Expand All @@ -270,10 +275,12 @@ WHERE name = ?;
Reasoning: reasoning,
Schema: taskSchema,
Description: description,
Priority: priority,
},
TaskStatus: task.TaskStatus(taskStatus),
WorkerStatus: task.ParseWorkerStatus(workerStatus),
Stage: stage,
Priority: priority,
LastHistory: timeFromNS(lastHistoryNS),
ProgressDone: progressDone,
ProgressTotal: progressTotal,
Expand Down
Loading