diff --git a/pkg/cli/runner.go b/pkg/cli/runner.go index 8201adc8d..b3842ab86 100644 --- a/pkg/cli/runner.go +++ b/pkg/cli/runner.go @@ -39,6 +39,8 @@ type Config struct { AutoApprove bool HideToolCalls bool OutputJSON bool + ExecutionMode string + MaxIterations int } // Run executes an agent in non-TUI mode, handling user input and runtime events @@ -53,6 +55,26 @@ func Run(ctx context.Context, out *Printer, cfg Config, rt runtime.Runtime, sess ctx = telemetry.WithClient(ctx, telemetryClient) } + var orch runtime.Orchestrator + + switch cfg.ExecutionMode { + case "loop": + orch = runtime.NewLoopAgentOrchestrator( + runtime.NewSingleAgentOrchestrator(rt), + cfg.MaxIterations, + func(ctx context.Context, iter int, events []runtime.Event) bool { + // você pode customizar a condição de saída, por agora false (loop até MaxIterations) + return false + }, + ) + case "sequential": + orch = runtime.NewSequentialAgentOrchestrator(rt) + case "parallel": + orch = runtime.NewParallelAgentOrchestrator(rt) + default: + orch = runtime.NewSingleAgentOrchestrator(rt) + } + sess.Title = "Running agent" // If the last received event was an error, return it. That way the exit code // will be non-zero if the agent failed. @@ -67,7 +89,7 @@ func Run(ctx context.Context, out *Printer, cfg Config, rt runtime.Runtime, sess sess.AddMessage(PrepareUserMessage(ctx, rt, userInput, cfg.AttachmentPath)) if cfg.OutputJSON { - for event := range rt.RunStream(ctx, sess) { + for event := range orch.Run(ctx, sess) { switch e := event.(type) { case *runtime.ToolCallConfirmationEvent: if !cfg.AutoApprove { @@ -90,7 +112,7 @@ func Run(ctx context.Context, out *Printer, cfg Config, rt runtime.Runtime, sess firstLoop := true lastAgent := rt.CurrentAgentName() var lastConfirmedToolCallID string - for event := range rt.RunStream(ctx, sess) { + for event := range orch.Run(ctx, sess) { agentName := event.GetAgentName() if agentName != "" && (firstLoop || lastAgent != agentName) { if !firstLoop { diff --git a/pkg/runtime/orchestrator.go b/pkg/runtime/orchestrator.go new file mode 100644 index 000000000..9224d5bc6 --- /dev/null +++ b/pkg/runtime/orchestrator.go @@ -0,0 +1,12 @@ +package runtime + +import ( + "context" + + "github.com/docker/cagent/pkg/session" +) + +// Orchestrator defines how agents are executed. +type Orchestrator interface { + Run(ctx context.Context, sess *session.Session) <-chan Event +} diff --git a/pkg/runtime/orchestrator_loop.go b/pkg/runtime/orchestrator_loop.go new file mode 100644 index 000000000..9be3d83a2 --- /dev/null +++ b/pkg/runtime/orchestrator_loop.go @@ -0,0 +1,82 @@ +package runtime + +import ( + "context" + + "github.com/docker/cagent/pkg/session" +) + +// ExitCondition decides whether the loop should stop. +type ExitCondition func( + ctx context.Context, + iteration int, + events []Event, +) bool + +// LoopAgentOrchestrator runs an orchestrator repeatedly until an exit condition is met. +type LoopAgentOrchestrator struct { + body Orchestrator + maxIterations int + exitConditions []ExitCondition +} + +// NewLoopAgentOrchestrator creates a loop orchestrator. +// maxIterations <= 0 means unlimited. +func NewLoopAgentOrchestrator( + body Orchestrator, + maxIterations int, + exitConditions ...ExitCondition, +) *LoopAgentOrchestrator { + return &LoopAgentOrchestrator{ + body: body, + maxIterations: maxIterations, + exitConditions: exitConditions, + } +} + +func (o *LoopAgentOrchestrator) Run( + ctx context.Context, + sess *session.Session, +) <-chan Event { + out := make(chan Event) + + go func() { + defer close(out) + + iteration := 0 + + for { + if ctx.Err() != nil { + return + } + + if o.maxIterations > 0 && iteration >= o.maxIterations { + return + } + + events := o.body.Run(ctx, sess) + + var collected []Event + + for ev := range events { + collected = append(collected, ev) + + select { + case out <- ev: + case <-ctx.Done(): + return + } + } + + for _, cond := range o.exitConditions { + if cond(ctx, iteration, collected) { + return + } + } + + iteration++ + } + }() + + return out +} diff --git a/pkg/runtime/orchestrator_parallel.go b/pkg/runtime/orchestrator_parallel.go new file mode 100644 index 000000000..bf68fe37a --- /dev/null +++ b/pkg/runtime/orchestrator_parallel.go @@ -0,0 +1,72 @@ +package runtime + +import ( + "context" + "sync" + + "github.com/docker/cagent/pkg/session" +) + +// ParallelAgentOrchestrator runs agents in parallel but emits events +// in the order of the runtimes. +type ParallelAgentOrchestrator struct { + runtimes []Runtime +} + +func NewParallelAgentOrchestrator(rts ...Runtime) *ParallelAgentOrchestrator { + return &ParallelAgentOrchestrator{runtimes: rts} +} + +func (o *ParallelAgentOrchestrator) Run( + ctx context.Context, + sess *session.Session, +) <-chan Event { + out := make(chan Event) + + type stream struct { + events <-chan Event + } + + streams := make([]stream, len(o.runtimes)) + + var wg sync.WaitGroup + wg.Add(len(o.runtimes)) + + for i, rt := range o.runtimes { + ch := make(chan Event) + streams[i] = stream{events: ch} + + go func(rt Runtime, out chan Event) { + defer wg.Done() + defer close(out) + + events := rt.RunStream(ctx, sess) + for event := range events { + select { + case out <- event: + case <-ctx.Done(): + return + } + } + }(rt, ch) + } + + go func() { + defer close(out) + + // Emit strictly in runtime order + for _, s := range streams { + for event := range s.events { + select { + case out <- event: + case <-ctx.Done(): + return + } + } + } + + wg.Wait() + }() + + return out +} diff --git a/pkg/runtime/orchestrator_sequential.go b/pkg/runtime/orchestrator_sequential.go new file mode 100644 index 000000000..f1ff240be --- /dev/null +++ b/pkg/runtime/orchestrator_sequential.go @@ -0,0 +1,40 @@ +package runtime + +import ( + "context" + + "github.com/docker/cagent/pkg/session" +) + +// SequentialAgentOrchestrator runs agents one after another. +type SequentialAgentOrchestrator struct { + runtimes []Runtime +} + +func NewSequentialAgentOrchestrator(rts ...Runtime) *SequentialAgentOrchestrator { + return &SequentialAgentOrchestrator{runtimes: rts} +} + +func (o *SequentialAgentOrchestrator) Run( + ctx context.Context, + sess *session.Session, +) <-chan Event { + out := make(chan Event) + + go func() { + defer close(out) + + for _, rt := range o.runtimes { + events := rt.RunStream(ctx, sess) + for event := range events { + select { + case out <- event: + case <-ctx.Done(): + return + } + } + } + }() + + return out +} diff --git a/pkg/runtime/orchestrator_single.go b/pkg/runtime/orchestrator_single.go new file mode 100644 index 000000000..c1ac87749 --- /dev/null +++ b/pkg/runtime/orchestrator_single.go @@ -0,0 +1,23 @@ +package runtime + +import ( + "context" + + "github.com/docker/cagent/pkg/session" +) + +// SingleAgentOrchestrator runs exactly one agent (current behavior) +type SingleAgentOrchestrator struct { + rt Runtime +} + +func NewSingleAgentOrchestrator(rt Runtime) *SingleAgentOrchestrator { + return &SingleAgentOrchestrator{rt: rt} +} + +func (o *SingleAgentOrchestrator) Run( + ctx context.Context, + sess *session.Session, +) <-chan Event { + return o.rt.RunStream(ctx, sess) +} diff --git a/pkg/runtime/runtime_orchestrator_test.go b/pkg/runtime/runtime_orchestrator_test.go new file mode 100644 index 000000000..1fde703b8 --- /dev/null +++ b/pkg/runtime/runtime_orchestrator_test.go @@ -0,0 +1,62 @@ +package runtime_test + +import ( + "context" + "testing" + + "github.com/docker/cagent/pkg/runtime" + "github.com/docker/cagent/pkg/session" + "github.com/docker/cagent/pkg/tools" +) + +// ------------------------ +// fakeRuntime implements runtime.Runtime for testing. +// ------------------------ +type fakeRuntime struct { + events []runtime.Event +} + +func (f *fakeRuntime) RunStream(ctx context.Context, _ *session.Session) <-chan runtime.Event { + ch := make(chan runtime.Event) + go func() { + defer close(ch) + for _, ev := range f.events { + select { + case ch <- ev: + case <-ctx.Done(): + return + } + } + }() + return ch +} + +func (f *fakeRuntime) Resume(ctx context.Context, _ runtime.ResumeType) {} + +func (f *fakeRuntime) CurrentAgentInfo(ctx context.Context) runtime.CurrentAgentInfo { + return runtime.CurrentAgentInfo{Name: "test-agent"} +} + +func (f *fakeRuntime) CurrentAgentName() string { return "test-agent" } + +func (f *fakeRuntime) CurrentAgentTools(ctx context.Context) ([]tools.Tool, error) { + return nil, nil +} + +func (f *fakeRuntime) EmitStartupInfo(ctx context.Context, ch chan runtime.Event) {} + +func (f *fakeRuntime) ResetStartupInfo() {} + +func TestFakeRuntime_RunStream_smoke(t *testing.T) { + ctx := t.Context() + + fr := &fakeRuntime{ + events: []runtime.Event{}, + } + + ch := fr.RunStream(ctx, nil) + + // drain channel to avoid goroutine leak + for range ch { + } +}