diff --git a/core/src/main/java/com/google/adk/agents/BaseAgent.java b/core/src/main/java/com/google/adk/agents/BaseAgent.java index 00676ec31..ed6631c50 100644 --- a/core/src/main/java/com/google/adk/agents/BaseAgent.java +++ b/core/src/main/java/com/google/adk/agents/BaseAgent.java @@ -29,6 +29,7 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue; import com.google.errorprone.annotations.DoNotCall; import com.google.genai.types.Content; +import io.opentelemetry.context.Context; import io.reactivex.rxjava3.core.Completable; import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.Maybe; @@ -311,6 +312,7 @@ public Flowable runAsync(InvocationContext parentContext) { private Flowable run( InvocationContext parentContext, Function> runImplementation) { + Context parentSpanContext = Context.current(); return Flowable.defer( () -> { InvocationContext invocationContext = createInvocationContext(parentContext); @@ -339,8 +341,12 @@ private Flowable run( }) .switchIfEmpty(mainAndAfterEvents) .compose( - Tracing.traceAgent( - "invoke_agent " + name(), name(), description(), invocationContext)); + Tracing.trace("invoke_agent " + name()) + .setParent(parentSpanContext) + .configure( + span -> + Tracing.traceAgentInvocation( + span, name(), description(), invocationContext))); }); } diff --git a/core/src/main/java/com/google/adk/flows/llmflows/BaseLlmFlow.java b/core/src/main/java/com/google/adk/flows/llmflows/BaseLlmFlow.java index e1afca2b1..79066b213 100644 --- a/core/src/main/java/com/google/adk/flows/llmflows/BaseLlmFlow.java +++ b/core/src/main/java/com/google/adk/flows/llmflows/BaseLlmFlow.java @@ -164,7 +164,10 @@ protected Flowable postprocess( * callbacks. Callbacks should not rely on its ID if they create their own separate events. */ private Flowable callLlm( - InvocationContext context, LlmRequest llmRequest, Event eventForCallbackUsage) { + Context spanContext, + InvocationContext context, + LlmRequest llmRequest, + Event eventForCallbackUsage) { LlmAgent agent = (LlmAgent) context.agent(); LlmRequest.Builder llmRequestBuilder = llmRequest.toBuilder(); @@ -200,7 +203,7 @@ private Flowable callLlm( span.setStatus(StatusCode.ERROR, error.getMessage()); span.recordException(error); }) - .compose(Tracing.trace("call_llm")) + .compose(Tracing.trace("call_llm").setParent(spanContext)) .concatMap( llmResp -> handleAfterModelCallback(context, llmResp, eventForCallbackUsage) @@ -319,7 +322,7 @@ private Single handleAfterModelCallback( * @throws LlmCallsLimitExceededException if the agent exceeds allowed LLM invocations. * @throws IllegalStateException if a transfer agent is specified but not found. */ - private Flowable runOneStep(InvocationContext context) { + private Flowable runOneStep(Context spanContext, InvocationContext context) { AtomicReference llmRequestRef = new AtomicReference<>(LlmRequest.builder().build()); return Flowable.defer( @@ -351,7 +354,11 @@ private Flowable runOneStep(InvocationContext context) { .build(); mutableEventTemplate.setTimestamp(0L); - return callLlm(context, llmRequestAfterPreprocess, mutableEventTemplate) + return callLlm( + spanContext, + context, + llmRequestAfterPreprocess, + mutableEventTemplate) .concatMap( llmResponse -> { try (Scope postScope = currentContext.makeCurrent()) { @@ -403,11 +410,12 @@ private Flowable runOneStep(InvocationContext context) { */ @Override public Flowable run(InvocationContext invocationContext) { - return run(invocationContext, 0); + return run(Context.current(), invocationContext, 0); } - private Flowable run(InvocationContext invocationContext, int stepsCompleted) { - Flowable currentStepEvents = runOneStep(invocationContext).cache(); + private Flowable run( + Context spanContext, InvocationContext invocationContext, int stepsCompleted) { + Flowable currentStepEvents = runOneStep(spanContext, invocationContext).cache(); if (stepsCompleted + 1 >= maxSteps) { logger.debug("Ending flow execution because max steps reached."); return currentStepEvents; @@ -427,7 +435,7 @@ private Flowable run(InvocationContext invocationContext, int stepsComple return Flowable.empty(); } else { logger.debug("Continuing to next step of the flow."); - return run(invocationContext, stepsCompleted + 1); + return run(spanContext, invocationContext, stepsCompleted + 1); } })); } @@ -444,6 +452,7 @@ private Flowable run(InvocationContext invocationContext, int stepsComple public Flowable runLive(InvocationContext invocationContext) { AtomicReference llmRequestRef = new AtomicReference<>(LlmRequest.builder().build()); Flowable preprocessEvents = preprocess(invocationContext, llmRequestRef); + Context spanContext = Context.current(); return preprocessEvents.concatWith( Flowable.defer( @@ -481,7 +490,7 @@ public Flowable runLive(InvocationContext invocationContext) { eventIdForSendData, llmRequestAfterPreprocess.contents()); }) - .compose(Tracing.trace("send_data")); + .compose(Tracing.trace("send_data").setParent(spanContext)); Flowable liveRequests = invocationContext diff --git a/core/src/main/java/com/google/adk/flows/llmflows/Functions.java b/core/src/main/java/com/google/adk/flows/llmflows/Functions.java index ecc2bb412..c1a996064 100644 --- a/core/src/main/java/com/google/adk/flows/llmflows/Functions.java +++ b/core/src/main/java/com/google/adk/flows/llmflows/Functions.java @@ -178,7 +178,7 @@ public static Maybe handleFunctionCalls( if (events.size() > 1) { return Maybe.just(mergedEvent) .doOnSuccess(event -> Tracing.traceToolResponse(event.id(), event)) - .compose(Tracing.trace("tool_response", parentContext)); + .compose(Tracing.trace("tool_response").setParent(parentContext)); } return Maybe.just(mergedEvent); }); @@ -432,8 +432,8 @@ private static Maybe postProcessFunctionResult( toolContext, invocationContext)) .compose( - Tracing.trace( - "tool_response [" + tool.name() + "]", parentContext)) + Tracing.trace("tool_response [" + tool.name() + "]") + .setParent(parentContext)) .doOnSuccess(event -> Tracing.traceToolResponse(event.id(), event)); }); } @@ -593,7 +593,9 @@ private static Maybe> callTool( Tracing.traceToolCall( tool.name(), tool.description(), tool.getClass().getSimpleName(), args)) .doOnError(t -> Span.current().recordException(t)) - .compose(Tracing.trace("tool_call [" + tool.name() + "]", parentContext)) + .compose( + Tracing.>trace("tool_call [" + tool.name() + "]") + .setParent(parentContext)) .onErrorResumeNext( e -> Maybe.error( diff --git a/core/src/main/java/com/google/adk/telemetry/Tracing.java b/core/src/main/java/com/google/adk/telemetry/Tracing.java index 07a640c37..fc2ca3abf 100644 --- a/core/src/main/java/com/google/adk/telemetry/Tracing.java +++ b/core/src/main/java/com/google/adk/telemetry/Tracing.java @@ -426,19 +426,6 @@ public static TracerProvider trace(String spanName) { return new TracerProvider<>(spanName); } - /** - * Returns a transformer that traces the execution of an RxJava stream with an explicit parent - * context. - * - * @param spanName The name of the span to create. - * @param parentContext The explicit parent context for the span. - * @param The type of the stream. - * @return A TracerProvider that can be used with .compose(). - */ - public static TracerProvider trace(String spanName, Context parentContext) { - return new TracerProvider(spanName).setParent(parentContext); - } - /** * Returns a transformer that traces an agent invocation. *