Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/thin-spies-train.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"braintrust": minor
---

feat(flue): Update flue instrumentation to use new observe hooks
12 changes: 1 addition & 11 deletions e2e/config/pr-comment-scenarios.json
Original file line number Diff line number Diff line change
Expand Up @@ -177,17 +177,7 @@
"scenarioDirName": "flue-instrumentation",
"label": "Flue Instrumentation",
"metadataScenario": "flue-instrumentation",
"variants": [
{ "variantKey": "flue-v0-7-0-wrapped", "label": "v0.7.0 wrapped" },
{
"variantKey": "flue-v0-7-0-auto-hook",
"label": "v0.7.0 auto-hook"
},
{
"variantKey": "flue-v0-7-0-openai-auto-hook",
"label": "v0.7.0 OpenAI auto-hook"
}
]
"variants": [{ "variantKey": "flue-v0-8-0", "label": "v0.8.0" }]
},
{
"scenarioDirName": "github-copilot-instrumentation",
Expand Down
28 changes: 22 additions & 6 deletions e2e/helpers/mock-braintrust-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -384,13 +384,19 @@ export async function startMockBraintrustServer(

function trackProdForwarding(promise: Promise<void>): void {
pendingProdForwarding.add(promise);
promise.finally(() => {
pendingProdForwarding.delete(promise);
});
void promise.then(
() => {
pendingProdForwarding.delete(promise);
},
() => {
pendingProdForwarding.delete(promise);
},
);
}

async function forwardProdRequest(
capturedRequest: CapturedRequest,
options: { drainResponseBody?: boolean } = {},
): Promise<Response> {
if (!prodForwarding) {
throw new Error("prodForwarding is not enabled");
Expand Down Expand Up @@ -434,6 +440,10 @@ export async function startMockBraintrustServer(
);
}

if (options.drainResponseBody) {
await response.arrayBuffer();
}

return response;
}

Expand Down Expand Up @@ -658,7 +668,9 @@ export async function startMockBraintrustServer(
}
if (prodForwarding) {
trackProdForwarding(
forwardProdRequest(capturedRequest)
forwardProdRequest(capturedRequest, {
drainResponseBody: true,
})
.then(() => undefined)
.catch(() => undefined),
);
Expand All @@ -673,7 +685,9 @@ export async function startMockBraintrustServer(
) {
if (prodForwarding) {
trackProdForwarding(
forwardProdRequest(capturedRequest)
forwardProdRequest(capturedRequest, {
drainResponseBody: true,
})
.then(() => undefined)
.catch(() => undefined),
);
Expand Down Expand Up @@ -706,7 +720,9 @@ export async function startMockBraintrustServer(
await new Promise<void>((resolve, reject) => {
server.close((error) => (error ? reject(error) : resolve()));
});
await Promise.allSettled([...pendingProdForwarding]);
while (pendingProdForwarding.size > 0) {
await Promise.allSettled([...pendingProdForwarding]);
}
},
events,
payloads,
Expand Down
104 changes: 104 additions & 0 deletions e2e/scenarios/flue-instrumentation/.flue/app.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { mkdir, writeFile } from "node:fs/promises";
import { dirname } from "node:path";
import { configureProvider, flue, observe } from "@flue/runtime/app";
import { flush, initLogger } from "braintrust";

function projectName() {
const configured = process.env.BRAINTRUST_E2E_PROJECT_NAME;
if (configured) {
return configured;
}
const testRunId = process.env.BRAINTRUST_E2E_RUN_ID ?? "local";
return `e2e-flue-instrumentation-${testRunId.toLowerCase().replace(/[^a-z0-9-]/g, "-")}`;
}

initLogger({ projectName: projectName() });

const exitProcess = process.exit.bind(process);
if (process.env.FLUE_E2E_FLUSH_FILE) {
let isExiting = false;
process.exit = (code) => {
if (isExiting) {
return exitProcess(code);
}
isExiting = true;
const keepAlive = setTimeout(() => {}, 30_000);
void flushBeforeExit()
.catch((error) => {
console.error(error);
})
.finally(() => {
clearTimeout(keepAlive);
exitProcess(code);
});
};
}

if (process.env.FLUE_E2E_EXPLICIT_OBSERVE === "1") {
const { braintrustFlueObserver } = await import("braintrust");
observe(braintrustFlueObserver);
}

const openAIBaseUrl =
process.env.OPENAI_BASE_URL ?? process.env.BRAINTRUST_E2E_MODEL_BASE_URL;
if (openAIBaseUrl) {
configureProvider("openai", { baseUrl: openAIBaseUrl });
}

const anthropicBaseUrl = process.env.ANTHROPIC_BASE_URL;
if (anthropicBaseUrl) {
configureProvider("anthropic", {
apiKey: process.env.ANTHROPIC_API_KEY ?? "test-key",
baseUrl: anthropicBaseUrl,
});
}

let didScheduleFlush = false;
function scheduleFinalFlush(exitAfterFlush = false) {
if (didScheduleFlush) {
return;
}
didScheduleFlush = true;
const keepAlive = setTimeout(() => {}, 30_000);
void flushBeforeExit()
.catch((error) => {
console.error(error);
})
.finally(() => {
clearTimeout(keepAlive);
if (exitAfterFlush) {
exitProcess(0);
}
});
}

process.on("SIGTERM", () => {
scheduleFinalFlush(true);
});

process.on("beforeExit", () => {
scheduleFinalFlush();
});

const app = flue();

async function flushBeforeExit() {
await new Promise((resolve) => setTimeout(resolve, 250));
await flush();
if (process.env.FLUE_E2E_FLUSH_FILE) {
await mkdir(dirname(process.env.FLUE_E2E_FLUSH_FILE), {
recursive: true,
});
await writeFile(process.env.FLUE_E2E_FLUSH_FILE, "ok");
}
}

export default {
async fetch(request, env, ctx) {
if (new URL(request.url).pathname === "/__braintrust_flush") {
await flush();
return new Response("ok");
}
return app.fetch(request, env, ctx);
},
};
142 changes: 142 additions & 0 deletions e2e/scenarios/flue-instrumentation/.flue/workflows/instrumentation.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import { createAgent, Type } from "@flue/runtime";
import { local } from "@flue/runtime/node";
import {
FLUE_MODEL,
FLUE_REASONING_MODEL,
SCENARIO_NAME,
} from "../../constants.mjs";

function flueModel() {
return process.env.FLUE_E2E_MODEL ?? FLUE_MODEL;
}

function flueReasoningModel() {
return process.env.FLUE_E2E_REASONING_MODEL ?? FLUE_REASONING_MODEL;
}

function fluePromptModel() {
return process.env.FLUE_E2E_PROMPT_MODEL ?? flueReasoningModel();
}

function fluePromptThinkingLevel() {
return (
process.env.FLUE_E2E_PROMPT_THINKING_LEVEL ?? flueReasoningThinkingLevel()
);
}

function flueReasoningThinkingLevel() {
return process.env.FLUE_E2E_REASONING_THINKING_LEVEL ?? "medium";
}

const flueE2EAgent = createAgent(() => ({
compaction: {
keepRecentTokens: 1,
reserveTokens: 64,
},
cwd: process.cwd(),
instructions: [
"You are a deterministic Flue instrumentation test agent.",
"Follow user instructions exactly.",
"When asked for a marker, output only that marker and no extra text.",
"When running a local skill file, read it yourself and do not delegate it to a task.",
].join(" "),
model: flueModel(),
sandbox: local({ cwd: process.cwd() }),
thinkingLevel: "off",
}));

const lookupTool = {
description:
"Return a deterministic lookup result with an id needed by web_search.",
execute: async (args) =>
JSON.stringify({
id: "flue-session-2026",
query: args.query,
topic: "session instrumentation",
}),
name: "lookup",
parameters: Type.Object({
query: Type.String(),
}),
};

const webSearchTool = {
description:
"Search a deterministic local web index. Requires the id returned by lookup.",
execute: async (args) =>
JSON.stringify({
lookupId: args.lookupId,
query: args.query,
results: [
{
title: "Flue reasoning stream instrumentation",
url: "https://example.test/flue/reasoning-streams",
},
],
}),
name: "web_search",
parameters: Type.Object({
lookupId: Type.String(),
query: Type.String(),
}),
};

const summarizeSourceTool = {
description:
"Summarize the selected deterministic source after web_search returns a URL.",
execute: async (args) =>
JSON.stringify({
summary:
"Flue emits reasoning, tool execution, and LLM turn events separately.",
url: args.url,
}),
name: "summarize_source",
parameters: Type.Object({
url: Type.String(),
}),
};

export async function route(_ctx, next) {
await next();
}

export async function run({ init, payload }) {
const harness = await init(flueE2EAgent, { name: "default" });
const session = await harness.session("main");
const skillSession = await harness.session("skill");
const taskSession = await harness.session("task");

await session.prompt(
[
"Complete this instrumented research flow.",
"Call exactly one tool per turn and wait for each tool result before choosing the next tool.",
'Step 1: call lookup with query "flue instrumentation".',
'Step 2: use the lookup result id as lookupId and call web_search with query "Braintrust Flue reasoning stream instrumentation".',
"Step 3: use the first web_search result url and call summarize_source.",
"After summarize_source returns, reply with exactly PROMPT_DONE and no other text.",
].join(" "),
{
model: fluePromptModel(),
thinkingLevel: fluePromptThinkingLevel(),
tools: [lookupTool, webSearchTool, summarizeSourceTool],
},
);

await skillSession.skill("e2e-flue-skill", {
args: { marker: "SKILL_DONE" },
model: flueReasoningModel(),
thinkingLevel: "off",
});

await taskSession.task("Reply with exactly TASK_DONE and no other text.", {
model: FLUE_MODEL,
thinkingLevel: "off",
});

await session.compact();

return {
scenario: payload?.scenario ?? SCENARIO_NAME,
status: "done",
};
}
Loading
Loading