From d766ffab6bfafcc65b26be79400aaa96b7a98f3e Mon Sep 17 00:00:00 2001 From: Jinni Gu Date: Sat, 22 Nov 2025 00:21:41 -0800 Subject: [PATCH 1/4] support parallel agent --- .../com/google/adk/agents/ParallelAgent.java | 19 +++- .../google/adk/agents/ParallelAgentTest.java | 86 ++++++++++++++++++- 2 files changed, 101 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/com/google/adk/agents/ParallelAgent.java b/core/src/main/java/com/google/adk/agents/ParallelAgent.java index 4bfd0b255..52f6f3840 100644 --- a/core/src/main/java/com/google/adk/agents/ParallelAgent.java +++ b/core/src/main/java/com/google/adk/agents/ParallelAgent.java @@ -20,6 +20,8 @@ import com.google.adk.agents.ConfigAgentUtils.ConfigurationException; import com.google.adk.events.Event; import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Scheduler; +import io.reactivex.rxjava3.schedulers.Schedulers; import java.util.ArrayList; import java.util.List; import org.slf4j.Logger; @@ -35,6 +37,7 @@ public class ParallelAgent extends BaseAgent { private static final Logger logger = LoggerFactory.getLogger(ParallelAgent.class); + private final Scheduler scheduler; /** * Constructor for ParallelAgent. @@ -44,24 +47,34 @@ public class ParallelAgent extends BaseAgent { * @param subAgents The list of sub-agents to run in parallel. * @param beforeAgentCallback Optional callback before the agent runs. * @param afterAgentCallback Optional callback after the agent runs. + * @param scheduler The scheduler to use for parallel execution. */ private ParallelAgent( String name, String description, List subAgents, List beforeAgentCallback, - List afterAgentCallback) { + List afterAgentCallback, + Scheduler scheduler) { super(name, description, subAgents, beforeAgentCallback, afterAgentCallback); + this.scheduler = scheduler; } /** Builder for {@link ParallelAgent}. */ public static class Builder extends BaseAgent.Builder { + private Scheduler scheduler = Schedulers.io(); + + public Builder scheduler(Scheduler scheduler) { + this.scheduler = scheduler; + return this; + } + @Override public ParallelAgent build() { return new ParallelAgent( - name, description, subAgents, beforeAgentCallback, afterAgentCallback); + name, description, subAgents, beforeAgentCallback, afterAgentCallback, scheduler); } } @@ -131,7 +144,7 @@ protected Flowable runAsyncImpl(InvocationContext invocationContext) { List> agentFlowables = new ArrayList<>(); for (BaseAgent subAgent : currentSubAgents) { - agentFlowables.add(subAgent.runAsync(invocationContext)); + agentFlowables.add(subAgent.runAsync(invocationContext).subscribeOn(scheduler)); } return Flowable.merge(agentFlowables); } diff --git a/core/src/test/java/com/google/adk/agents/ParallelAgentTest.java b/core/src/test/java/com/google/adk/agents/ParallelAgentTest.java index a6afb5793..e51240c45 100644 --- a/core/src/test/java/com/google/adk/agents/ParallelAgentTest.java +++ b/core/src/test/java/com/google/adk/agents/ParallelAgentTest.java @@ -25,7 +25,10 @@ import com.google.genai.types.Content; import com.google.genai.types.Part; import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Scheduler; import io.reactivex.rxjava3.schedulers.Schedulers; +import io.reactivex.rxjava3.schedulers.TestScheduler; +import io.reactivex.rxjava3.subscribers.TestSubscriber; import java.util.List; import org.junit.Test; import org.junit.runner.RunWith; @@ -36,10 +39,16 @@ public final class ParallelAgentTest { static class TestingAgent extends BaseAgent { private final long delayMillis; + private final Scheduler scheduler; private TestingAgent(String name, String description, long delayMillis) { + this(name, description, delayMillis, Schedulers.computation()); + } + + private TestingAgent(String name, String description, long delayMillis, Scheduler scheduler) { super(name, description, ImmutableList.of(), null, null); this.delayMillis = delayMillis; + this.scheduler = scheduler; } @Override @@ -55,7 +64,7 @@ protected Flowable runAsyncImpl(InvocationContext invocationContext) { .build()); if (delayMillis > 0) { - return event.delay(delayMillis, MILLISECONDS, Schedulers.computation()); + return event.delay(delayMillis, MILLISECONDS, scheduler); } return event; } @@ -110,4 +119,79 @@ public void runAsync_noSubAgents_returnsEmptyFlowable() { assertThat(events).isEmpty(); } + + static class BlockingAgent extends BaseAgent { + private final long sleepMillis; + + private BlockingAgent(String name, long sleepMillis) { + super(name, "Blocking Agent", ImmutableList.of(), null, null); + this.sleepMillis = sleepMillis; + } + + @Override + protected Flowable runAsyncImpl(InvocationContext invocationContext) { + return Flowable.fromCallable( + () -> { + Thread.sleep(sleepMillis); + return Event.builder() + .author(name()) + .branch(invocationContext.branch().orElse(null)) + .invocationId(invocationContext.invocationId()) + .content(Content.fromParts(Part.fromText("Done"))) + .build(); + }); + } + + @Override + protected Flowable runLiveImpl(InvocationContext invocationContext) { + throw new UnsupportedOperationException("Not implemented"); + } + } + + @Test + public void runAsync_blockingSubAgents_shouldExecuteInParallel() { + long sleepTime = 1000; + BlockingAgent agent1 = new BlockingAgent("agent1", sleepTime); + BlockingAgent agent2 = new BlockingAgent("agent2", sleepTime); + + ParallelAgent parallelAgent = + ParallelAgent.builder().name("parallel_agent").subAgents(agent1, agent2).build(); + + InvocationContext invocationContext = createInvocationContext(parallelAgent); + + long startTime = System.currentTimeMillis(); + List events = parallelAgent.runAsync(invocationContext).toList().blockingGet(); + long duration = System.currentTimeMillis() - startTime; + + assertThat(events).hasSize(2); + // If parallel, duration should be less than 1.5 * sleepTime (1500ms). + assertThat(duration).isAtLeast(sleepTime); + assertThat(duration).isLessThan((long) (1.5 * sleepTime)); + } + + @Test + public void runAsync_withTestScheduler_usesVirtualTime() { + TestScheduler testScheduler = new TestScheduler(); + long delayMillis = 1000; + TestingAgent agent = + new TestingAgent("delayed_agent", "Delayed Agent", delayMillis, testScheduler); + + ParallelAgent parallelAgent = + ParallelAgent.builder() + .name("parallel_agent") + .subAgents(agent) + .scheduler(testScheduler) + .build(); + + InvocationContext invocationContext = createInvocationContext(parallelAgent); + + TestSubscriber testSubscriber = parallelAgent.runAsync(invocationContext).test(); + + testScheduler.advanceTimeBy(delayMillis - 100, MILLISECONDS); + testSubscriber.assertNoValues(); + testSubscriber.assertNotComplete(); + testScheduler.advanceTimeBy(200, MILLISECONDS); + testSubscriber.assertValueCount(1); + testSubscriber.assertComplete(); + } } From 68765053587aa3902dfd8850395fc62b628128c1 Mon Sep 17 00:00:00 2001 From: Guillaume Laforge Date: Mon, 23 Mar 2026 12:20:00 +0100 Subject: [PATCH 2/4] feat(agents): implement ParallelAgent and fix branch initialization in BaseAgent --- .../java/com/google/adk/agents/BaseAgent.java | 10 +++++---- .../com/google/adk/agents/ParallelAgent.java | 22 ------------------- 2 files changed, 6 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/com/google/adk/agents/BaseAgent.java b/core/src/main/java/com/google/adk/agents/BaseAgent.java index 95fe838cc..59f0a150c 100644 --- a/core/src/main/java/com/google/adk/agents/BaseAgent.java +++ b/core/src/main/java/com/google/adk/agents/BaseAgent.java @@ -285,10 +285,12 @@ private InvocationContext createInvocationContext(InvocationContext parentContex InvocationContext.Builder builder = parentContext.toBuilder(); builder.agent(this); // Check for branch to be truthy (not None, not empty string), - parentContext - .branch() - .filter(s -> !s.isEmpty()) - .ifPresent(branch -> builder.branch(branch + "." + name())); + String parentBranch = parentContext.branch().filter(s -> !s.isEmpty()).orElse(null); + if (parentBranch == null) { + builder.branch(name()); + } else { + builder.branch(parentBranch + "." + name()); + } return builder.build(); } diff --git a/core/src/main/java/com/google/adk/agents/ParallelAgent.java b/core/src/main/java/com/google/adk/agents/ParallelAgent.java index 091368ff0..8959ed3b7 100644 --- a/core/src/main/java/com/google/adk/agents/ParallelAgent.java +++ b/core/src/main/java/com/google/adk/agents/ParallelAgent.java @@ -15,9 +15,6 @@ */ package com.google.adk.agents; -import static com.google.common.base.Strings.isNullOrEmpty; -import static com.google.common.collect.ImmutableList.toImmutableList; - import com.google.adk.agents.ConfigAgentUtils.ConfigurationException; import com.google.adk.events.Event; import io.reactivex.rxjava3.core.Flowable; @@ -108,25 +105,6 @@ public static ParallelAgent fromConfig(ParallelAgentConfig config, String config return agent; } - /** - * Sets the branch for the current agent in the invocation context. - * - *

Appends the agent name to the current branch, or sets it if undefined. - * - * @param currentAgent Current agent. - * @param invocationContext Invocation context to update. - * @return A new invocation context with branch set. - */ - private static InvocationContext setBranchForCurrentAgent( - BaseAgent currentAgent, InvocationContext invocationContext) { - String branch = invocationContext.branch().orElse(null); - if (isNullOrEmpty(branch)) { - return invocationContext.toBuilder().branch(currentAgent.name()).build(); - } else { - return invocationContext.toBuilder().branch(branch + "." + currentAgent.name()).build(); - } - } - /** * Runs sub-agents in parallel and emits their events. * From a6d95e29fff67e29a43a62fac58dba6b2a4da217 Mon Sep 17 00:00:00 2001 From: Guillaume Laforge Date: Mon, 23 Mar 2026 17:10:40 +0100 Subject: [PATCH 3/4] feat(agents): add branch propagation logic to ParallelAgent for invocation context management --- .../com/google/adk/agents/ParallelAgent.java | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/com/google/adk/agents/ParallelAgent.java b/core/src/main/java/com/google/adk/agents/ParallelAgent.java index 8959ed3b7..0a53fbec9 100644 --- a/core/src/main/java/com/google/adk/agents/ParallelAgent.java +++ b/core/src/main/java/com/google/adk/agents/ParallelAgent.java @@ -15,6 +15,8 @@ */ package com.google.adk.agents; +import static com.google.common.base.Strings.isNullOrEmpty; + import com.google.adk.agents.ConfigAgentUtils.ConfigurationException; import com.google.adk.events.Event; import io.reactivex.rxjava3.core.Flowable; @@ -105,6 +107,25 @@ public static ParallelAgent fromConfig(ParallelAgentConfig config, String config return agent; } + /** + * Sets the branch for the current agent in the invocation context. + * + *

Appends the agent name to the current branch, or sets it if undefined. + * + * @param currentAgent Current agent. + * @param invocationContext Invocation context to update. + * @return A new invocation context with branch set. + */ + private static InvocationContext setBranchForCurrentAgent( + BaseAgent currentAgent, InvocationContext invocationContext) { + String branch = invocationContext.branch().orElse(null); + if (isNullOrEmpty(branch)) { + return invocationContext.toBuilder().branch(currentAgent.name()).build(); + } else { + return invocationContext.toBuilder().branch(branch + "." + currentAgent.name()).build(); + } + } + /** * Runs sub-agents in parallel and emits their events. * @@ -120,9 +141,10 @@ protected Flowable runAsyncImpl(InvocationContext invocationContext) { return Flowable.empty(); } + var updatedInvocationContext = setBranchForCurrentAgent(this, invocationContext); List> agentFlowables = new ArrayList<>(); for (BaseAgent subAgent : currentSubAgents) { - agentFlowables.add(subAgent.runAsync(invocationContext).subscribeOn(scheduler)); + agentFlowables.add(subAgent.runAsync(updatedInvocationContext).subscribeOn(scheduler)); } return Flowable.merge(agentFlowables); } From 3dfa93a5a2a16fd316e4aea216f444511ed9bac1 Mon Sep 17 00:00:00 2001 From: Guillaume Laforge Date: Mon, 23 Mar 2026 17:12:47 +0100 Subject: [PATCH 4/4] feat(agents): restore branch name logic in BaseAgent --- .../src/main/java/com/google/adk/agents/BaseAgent.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/com/google/adk/agents/BaseAgent.java b/core/src/main/java/com/google/adk/agents/BaseAgent.java index 59f0a150c..95fe838cc 100644 --- a/core/src/main/java/com/google/adk/agents/BaseAgent.java +++ b/core/src/main/java/com/google/adk/agents/BaseAgent.java @@ -285,12 +285,10 @@ private InvocationContext createInvocationContext(InvocationContext parentContex InvocationContext.Builder builder = parentContext.toBuilder(); builder.agent(this); // Check for branch to be truthy (not None, not empty string), - String parentBranch = parentContext.branch().filter(s -> !s.isEmpty()).orElse(null); - if (parentBranch == null) { - builder.branch(name()); - } else { - builder.branch(parentBranch + "." + name()); - } + parentContext + .branch() + .filter(s -> !s.isEmpty()) + .ifPresent(branch -> builder.branch(branch + "." + name())); return builder.build(); }