Skip to content

Commit c41fa9c

Browse files
google-genai-botcopybara-github
authored andcommitted
feat: Propagating the otel context
This change ensures that the OpenTelemetry context is correctly propagated across asynchronous boundaries throughout the ADK, primarily within RxJava streams. ### Key Changes * **Context Propagation:** Replaces manual `Scope` management (which often fails in reactive code) with `.compose(Tracing.withContext(context))`. This ensures the OTel context is preserved when work moves between different threads or schedulers. * **`Runner` Refactoring:** * Adds a top-level `"invocation"` span to `runAsync` and `runLive` calls. * Captures the context at entry points and propagates it through the internal execution flow (`runAsyncImpl`, `runLiveImpl`, `runAgentWithFreshSession`). * **`BaseLlmFlow` & `Functions`:** Updates preprocessing, postprocessing, and tool execution logic to maintain context. This ensures that spans created within tools or processors are correctly parented. * **`PluginManager`:** Ensures that plugin callbacks (like `afterRunCallback` and `onEventCallback`) execute within the captured context. * **Testing:** Adds several unit tests across `BaseLlmFlowTest`, `FunctionsTest`, `PluginManagerTest`, and `RunnerTest` that specifically verify context propagation using `ContextKey` and `Schedulers.computation()`. ### Files Modified * **`BaseLlmFlow.java`**, **`Functions.java`**, **`PluginManager.java`**, **`Runner.java`**: Core logic updates for context propagation. * **`LlmAgentTest.java`**, **`BaseLlmFlowTest.java`**, **`FunctionsTest.java`**, **`PluginManagerTest.java`**, **`RunnerTest.java`**: New tests for OTel integration. * **`BUILD` files**: Updated dependencies for OpenTelemetry APIs and SDK testing. PiperOrigin-RevId: 881463869
1 parent 28a8cd0 commit c41fa9c

9 files changed

Lines changed: 601 additions & 234 deletions

File tree

core/src/main/java/com/google/adk/flows/llmflows/BaseLlmFlow.java

Lines changed: 127 additions & 81 deletions
Large diffs are not rendered by default.

core/src/main/java/com/google/adk/flows/llmflows/Functions.java

Lines changed: 66 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import com.google.genai.types.Part;
4343
import io.opentelemetry.api.trace.Span;
4444
import io.opentelemetry.context.Context;
45-
import io.opentelemetry.context.Scope;
4645
import io.reactivex.rxjava3.core.Flowable;
4746
import io.reactivex.rxjava3.core.Maybe;
4847
import io.reactivex.rxjava3.core.Observable;
@@ -163,7 +162,9 @@ public static Maybe<Event> handleFunctionCalls(
163162
}
164163
return functionResponseEventsObservable
165164
.toList()
166-
.flatMapMaybe(
165+
.toMaybe()
166+
.compose(Tracing.withContext(parentContext))
167+
.flatMap(
167168
events -> {
168169
if (events.isEmpty()) {
169170
return Maybe.empty();
@@ -226,7 +227,9 @@ public static Maybe<Event> handleFunctionCallsLive(
226227

227228
return responseEventsObservable
228229
.toList()
229-
.flatMapMaybe(
230+
.toMaybe()
231+
.compose(Tracing.withContext(parentContext))
232+
.flatMap(
230233
events -> {
231234
if (events.isEmpty()) {
232235
return Maybe.empty();
@@ -243,47 +246,45 @@ private static Function<FunctionCall, Maybe<Event>> getFunctionCallMapper(
243246
Context parentContext) {
244247
return functionCall ->
245248
Maybe.defer(
246-
() -> {
247-
try (Scope scope = parentContext.makeCurrent()) {
248-
BaseTool tool = tools.get(functionCall.name().get());
249-
ToolContext toolContext =
250-
ToolContext.builder(invocationContext)
251-
.functionCallId(functionCall.id().orElse(""))
252-
.toolConfirmation(
253-
functionCall.id().map(toolConfirmations::get).orElse(null))
254-
.build();
255-
256-
Map<String, Object> functionArgs =
257-
functionCall.args().map(HashMap::new).orElse(new HashMap<>());
258-
259-
Maybe<Map<String, Object>> maybeFunctionResult =
260-
maybeInvokeBeforeToolCall(invocationContext, tool, functionArgs, toolContext)
261-
.switchIfEmpty(
262-
Maybe.defer(
263-
() -> {
264-
try (Scope innerScope = parentContext.makeCurrent()) {
265-
return isLive
266-
? processFunctionLive(
267-
invocationContext,
268-
tool,
269-
toolContext,
270-
functionCall,
271-
functionArgs,
272-
parentContext)
273-
: callTool(tool, functionArgs, toolContext, parentContext);
274-
}
275-
}));
276-
277-
return postProcessFunctionResult(
278-
maybeFunctionResult,
279-
invocationContext,
280-
tool,
281-
functionArgs,
282-
toolContext,
283-
isLive,
284-
parentContext);
285-
}
286-
});
249+
() -> {
250+
BaseTool tool = tools.get(functionCall.name().get());
251+
ToolContext toolContext =
252+
ToolContext.builder(invocationContext)
253+
.functionCallId(functionCall.id().orElse(""))
254+
.toolConfirmation(
255+
functionCall.id().map(toolConfirmations::get).orElse(null))
256+
.build();
257+
258+
Map<String, Object> functionArgs =
259+
functionCall.args().map(HashMap::new).orElse(new HashMap<>());
260+
261+
Maybe<Map<String, Object>> maybeFunctionResult =
262+
maybeInvokeBeforeToolCall(invocationContext, tool, functionArgs, toolContext)
263+
.switchIfEmpty(
264+
Maybe.defer(
265+
() ->
266+
isLive
267+
? processFunctionLive(
268+
invocationContext,
269+
tool,
270+
toolContext,
271+
functionCall,
272+
functionArgs,
273+
parentContext)
274+
: callTool(
275+
tool, functionArgs, toolContext, parentContext))
276+
.compose(Tracing.withContext(parentContext)));
277+
278+
return postProcessFunctionResult(
279+
maybeFunctionResult,
280+
invocationContext,
281+
tool,
282+
functionArgs,
283+
toolContext,
284+
isLive,
285+
parentContext);
286+
})
287+
.compose(Tracing.withContext(parentContext));
287288
}
288289

289290
/**
@@ -410,34 +411,27 @@ private static Maybe<Event> postProcessFunctionResult(
410411
})
411412
.flatMapMaybe(
412413
optionalInitialResult -> {
413-
try (Scope scope = parentContext.makeCurrent()) {
414-
Map<String, Object> initialFunctionResult = optionalInitialResult.orElse(null);
415-
416-
return maybeInvokeAfterToolCall(
417-
invocationContext, tool, functionArgs, toolContext, initialFunctionResult)
418-
.map(Optional::of)
419-
.defaultIfEmpty(Optional.ofNullable(initialFunctionResult))
420-
.flatMapMaybe(
421-
finalOptionalResult -> {
422-
Map<String, Object> finalFunctionResult =
423-
finalOptionalResult.orElse(null);
424-
if (tool.longRunning() && finalFunctionResult == null) {
425-
return Maybe.empty();
426-
}
427-
return Maybe.fromCallable(
428-
() ->
429-
buildResponseEvent(
430-
tool,
431-
finalFunctionResult,
432-
toolContext,
433-
invocationContext))
434-
.compose(
435-
Tracing.<Event>trace("tool_response [" + tool.name() + "]")
436-
.setParent(parentContext))
437-
.doOnSuccess(event -> Tracing.traceToolResponse(event.id(), event));
438-
});
439-
}
440-
});
414+
Map<String, Object> initialFunctionResult = optionalInitialResult.orElse(null);
415+
416+
return maybeInvokeAfterToolCall(
417+
invocationContext, tool, functionArgs, toolContext, initialFunctionResult)
418+
.map(Optional::of)
419+
.defaultIfEmpty(Optional.ofNullable(initialFunctionResult))
420+
.flatMapMaybe(
421+
finalOptionalResult -> {
422+
Map<String, Object> finalFunctionResult = finalOptionalResult.orElse(null);
423+
if (tool.longRunning() && finalFunctionResult == null) {
424+
return Maybe.empty();
425+
}
426+
Event event =
427+
buildResponseEvent(
428+
tool, finalFunctionResult, toolContext, invocationContext);
429+
Tracing.traceToolResponse(event.id(), event);
430+
return Maybe.just(event);
431+
});
432+
})
433+
.compose(
434+
Tracing.<Event>trace("tool_response [" + tool.name() + "]").setParent(parentContext));
441435
}
442436

443437
private static Optional<Event> mergeParallelFunctionResponseEvents(

core/src/main/java/com/google/adk/flows/llmflows/RequestConfirmationLlmRequestProcessor.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.google.adk.events.Event;
3030
import com.google.adk.events.ToolConfirmation;
3131
import com.google.adk.models.LlmRequest;
32+
import com.google.adk.telemetry.Tracing;
3233
import com.google.adk.tools.BaseTool;
3334
import com.google.common.collect.ImmutableList;
3435
import com.google.common.collect.ImmutableMap;
@@ -37,6 +38,7 @@
3738
import com.google.genai.types.FunctionCall;
3839
import com.google.genai.types.FunctionResponse;
3940
import com.google.genai.types.Part;
41+
import io.opentelemetry.context.Context;
4042
import io.reactivex.rxjava3.core.Maybe;
4143
import io.reactivex.rxjava3.core.Single;
4244
import java.util.Collection;
@@ -216,10 +218,13 @@ private Maybe<Event> assembleEvent(
216218
.build())
217219
.build();
218220

219-
return toolsMapSingle.flatMapMaybe(
220-
toolsMap ->
221-
Functions.handleFunctionCalls(
222-
invocationContext, functionCallEvent, toolsMap, toolConfirmations));
221+
Context parentContext = Context.current();
222+
return toolsMapSingle
223+
.flatMapMaybe(
224+
toolsMap ->
225+
Functions.handleFunctionCalls(
226+
invocationContext, functionCallEvent, toolsMap, toolConfirmations))
227+
.compose(Tracing.withContext(parentContext));
223228
}
224229

225230
private static Optional<Map.Entry<String, ToolConfirmation>> maybeCreateToolConfirmationEntry(

core/src/main/java/com/google/adk/plugins/PluginManager.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@
2121
import com.google.adk.events.Event;
2222
import com.google.adk.models.LlmRequest;
2323
import com.google.adk.models.LlmResponse;
24+
import com.google.adk.telemetry.Tracing;
2425
import com.google.adk.tools.BaseTool;
2526
import com.google.adk.tools.ToolContext;
2627
import com.google.common.annotations.VisibleForTesting;
2728
import com.google.common.collect.ImmutableList;
2829
import com.google.genai.types.Content;
30+
import io.opentelemetry.context.Context;
2931
import io.reactivex.rxjava3.core.Completable;
3032
import io.reactivex.rxjava3.core.Flowable;
3133
import io.reactivex.rxjava3.core.Maybe;
@@ -126,6 +128,7 @@ public Maybe<Content> beforeRunCallback(InvocationContext invocationContext) {
126128

127129
@Override
128130
public Completable afterRunCallback(InvocationContext invocationContext) {
131+
Context capturedContext = Context.current();
129132
return Flowable.fromIterable(plugins)
130133
.concatMapCompletable(
131134
plugin ->
@@ -136,11 +139,13 @@ public Completable afterRunCallback(InvocationContext invocationContext) {
136139
logger.error(
137140
"[{}] Error during callback 'afterRunCallback'",
138141
plugin.getName(),
139-
e)));
142+
e)))
143+
.compose(Tracing.withContext(capturedContext));
140144
}
141145

142146
@Override
143147
public Completable close() {
148+
Context capturedContext = Context.current();
144149
return Flowable.fromIterable(plugins)
145150
.concatMapCompletableDelayError(
146151
plugin ->
@@ -149,7 +154,8 @@ public Completable close() {
149154
.doOnError(
150155
e ->
151156
logger.error(
152-
"[{}] Error during callback 'close'", plugin.getName(), e)));
157+
"[{}] Error during callback 'close'", plugin.getName(), e)))
158+
.compose(Tracing.withContext(capturedContext));
153159
}
154160

155161
@Override
@@ -227,7 +233,7 @@ public Maybe<Map<String, Object>> onToolErrorCallback(
227233
*/
228234
private <T> Maybe<T> runMaybeCallbacks(
229235
Function<Plugin, Maybe<T>> callbackExecutor, String callbackName) {
230-
236+
Context capturedContext = Context.current();
231237
return Flowable.fromIterable(this.plugins)
232238
.concatMapMaybe(
233239
plugin ->
@@ -247,6 +253,7 @@ private <T> Maybe<T> runMaybeCallbacks(
247253
plugin.getName(),
248254
callbackName,
249255
e)))
250-
.firstElement();
256+
.firstElement()
257+
.compose(Tracing.withContext(capturedContext));
251258
}
252259
}

0 commit comments

Comments
 (0)