Skip to content

Commit 47d54bb

Browse files
committed
feat(tasks): wire agent server event ingest
1 parent 41b9fdd commit 47d54bb

7 files changed

Lines changed: 645 additions & 55 deletions

File tree

packages/agent/src/server/agent-server.test.ts

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,247 @@ describe("AgentServer HTTP Mode", () => {
203203
});
204204

205205
describe("turn completion", () => {
206+
function stubSessionCleanup(testServer: unknown): {
207+
cleanupSession: (options?: {
208+
completeEventStream?: boolean;
209+
}) => Promise<void>;
210+
eventStreamSender: {
211+
enqueue: ReturnType<typeof vi.fn>;
212+
stop: ReturnType<typeof vi.fn>;
213+
};
214+
} {
215+
const cleanupServer = testServer as {
216+
session: unknown;
217+
eventStreamSender: {
218+
enqueue: ReturnType<typeof vi.fn>;
219+
stop: ReturnType<typeof vi.fn>;
220+
};
221+
captureCheckpointState: ReturnType<typeof vi.fn>;
222+
cleanupSession: (options?: {
223+
completeEventStream?: boolean;
224+
}) => Promise<void>;
225+
};
226+
cleanupServer.captureCheckpointState = vi.fn(async () => {});
227+
cleanupServer.eventStreamSender = {
228+
enqueue: vi.fn(),
229+
stop: vi.fn(async () => {}),
230+
};
231+
cleanupServer.session = {
232+
payload: { run_id: "run-1" },
233+
pendingHandoffGitState: undefined,
234+
logWriter: { flush: vi.fn(async () => {}) },
235+
acpConnection: { cleanup: vi.fn(async () => {}) },
236+
sseController: { close: vi.fn() },
237+
};
238+
return cleanupServer;
239+
}
240+
241+
it("keeps event ingest open for non-terminal session cleanup", async () => {
242+
const testServer = stubSessionCleanup(createServer());
243+
244+
await testServer.cleanupSession();
245+
246+
expect(testServer.eventStreamSender.enqueue).not.toHaveBeenCalled();
247+
expect(testServer.eventStreamSender.stop).not.toHaveBeenCalled();
248+
});
249+
250+
it("stops event ingest for terminal session cleanup without fake task completion", async () => {
251+
const testServer = stubSessionCleanup(createServer());
252+
253+
await testServer.cleanupSession({ completeEventStream: true });
254+
255+
expect(testServer.eventStreamSender.enqueue).not.toHaveBeenCalled();
256+
expect(testServer.eventStreamSender.stop).toHaveBeenCalledOnce();
257+
});
258+
259+
it("writes terminal failure status before completing event ingest", async () => {
260+
const order: string[] = [];
261+
const testServer = new AgentServer({
262+
port,
263+
jwtPublicKey: TEST_PUBLIC_KEY,
264+
repositoryPath: repo.path,
265+
apiUrl: "http://localhost:8000",
266+
apiKey: "test-api-key",
267+
projectId: 1,
268+
mode: "interactive",
269+
taskId: "test-task-id",
270+
runId: "test-run-id",
271+
}) as unknown as {
272+
eventStreamSender: {
273+
enqueue: (event: Record<string, unknown>) => void;
274+
stop: () => Promise<void>;
275+
};
276+
posthogAPI: {
277+
updateTaskRun: (
278+
taskId: string,
279+
runId: string,
280+
payload: Record<string, unknown>,
281+
) => Promise<unknown>;
282+
};
283+
signalTaskComplete(
284+
payload: JwtPayload,
285+
stopReason: string,
286+
errorMessage?: string,
287+
): Promise<void>;
288+
};
289+
testServer.eventStreamSender = {
290+
enqueue: vi.fn(() => {
291+
order.push("enqueue");
292+
}),
293+
stop: vi.fn(async () => {
294+
order.push("stop");
295+
}),
296+
};
297+
testServer.posthogAPI = {
298+
updateTaskRun: vi.fn(async () => {
299+
order.push("update");
300+
return {};
301+
}),
302+
};
303+
304+
await testServer.signalTaskComplete(
305+
{
306+
run_id: "run-1",
307+
task_id: "task-1",
308+
team_id: 1,
309+
user_id: 1,
310+
distinct_id: "distinct-id",
311+
mode: "interactive",
312+
},
313+
"error",
314+
"boom",
315+
);
316+
317+
expect(order).toEqual(["enqueue", "update", "stop"]);
318+
expect(testServer.eventStreamSender.enqueue).toHaveBeenCalledWith(
319+
expect.objectContaining({
320+
type: "notification",
321+
notification: expect.objectContaining({
322+
method: "_posthog/error",
323+
params: expect.objectContaining({ error: "boom" }),
324+
}),
325+
}),
326+
);
327+
expect(testServer.posthogAPI.updateTaskRun).toHaveBeenCalledWith(
328+
"task-1",
329+
"run-1",
330+
{
331+
status: "failed",
332+
error_message: "boom",
333+
},
334+
);
335+
});
336+
337+
it("still stops event ingest when terminal failure status update fails", async () => {
338+
const testServer = new AgentServer({
339+
port,
340+
jwtPublicKey: TEST_PUBLIC_KEY,
341+
repositoryPath: repo.path,
342+
apiUrl: "http://localhost:8000",
343+
apiKey: "test-api-key",
344+
projectId: 1,
345+
mode: "interactive",
346+
taskId: "test-task-id",
347+
runId: "test-run-id",
348+
}) as unknown as {
349+
eventStreamSender: {
350+
enqueue: (event: Record<string, unknown>) => void;
351+
stop: () => Promise<void>;
352+
};
353+
posthogAPI: {
354+
updateTaskRun: (
355+
taskId: string,
356+
runId: string,
357+
payload: Record<string, unknown>,
358+
) => Promise<unknown>;
359+
};
360+
signalTaskComplete(
361+
payload: JwtPayload,
362+
stopReason: string,
363+
errorMessage?: string,
364+
): Promise<void>;
365+
};
366+
testServer.eventStreamSender = {
367+
enqueue: vi.fn(),
368+
stop: vi.fn(async () => {}),
369+
};
370+
testServer.posthogAPI = {
371+
updateTaskRun: vi.fn(async () => {
372+
throw new Error("update failed");
373+
}),
374+
};
375+
376+
await testServer.signalTaskComplete(
377+
{
378+
run_id: "run-1",
379+
task_id: "task-1",
380+
team_id: 1,
381+
user_id: 1,
382+
distinct_id: "distinct-id",
383+
mode: "interactive",
384+
},
385+
"error",
386+
"boom",
387+
);
388+
389+
expect(testServer.eventStreamSender.enqueue).toHaveBeenCalledOnce();
390+
expect(testServer.posthogAPI.updateTaskRun).toHaveBeenCalledOnce();
391+
expect(testServer.eventStreamSender.stop).toHaveBeenCalledOnce();
392+
});
393+
394+
it("leaves event ingest open for non-error stop reasons", async () => {
395+
const testServer = new AgentServer({
396+
port,
397+
jwtPublicKey: TEST_PUBLIC_KEY,
398+
repositoryPath: repo.path,
399+
apiUrl: "http://localhost:8000",
400+
apiKey: "test-api-key",
401+
projectId: 1,
402+
mode: "interactive",
403+
taskId: "test-task-id",
404+
runId: "test-run-id",
405+
}) as unknown as {
406+
eventStreamSender: {
407+
enqueue: (event: Record<string, unknown>) => void;
408+
stop: () => Promise<void>;
409+
};
410+
posthogAPI: {
411+
updateTaskRun: (
412+
taskId: string,
413+
runId: string,
414+
payload: Record<string, unknown>,
415+
) => Promise<unknown>;
416+
};
417+
signalTaskComplete(
418+
payload: JwtPayload,
419+
stopReason: string,
420+
): Promise<void>;
421+
};
422+
testServer.eventStreamSender = {
423+
enqueue: vi.fn(),
424+
stop: vi.fn(async () => {}),
425+
};
426+
testServer.posthogAPI = {
427+
updateTaskRun: vi.fn(async () => ({})),
428+
};
429+
430+
await testServer.signalTaskComplete(
431+
{
432+
run_id: "run-1",
433+
task_id: "task-1",
434+
team_id: 1,
435+
user_id: 1,
436+
distinct_id: "distinct-id",
437+
mode: "interactive",
438+
},
439+
"end_turn",
440+
);
441+
442+
expect(testServer.eventStreamSender.enqueue).not.toHaveBeenCalled();
443+
expect(testServer.eventStreamSender.stop).not.toHaveBeenCalled();
444+
expect(testServer.posthogAPI.updateTaskRun).not.toHaveBeenCalled();
445+
});
446+
206447
it("persists structured turn completion notifications", () => {
207448
const appendRawLine = vi.fn();
208449
const testServer = new AgentServer({

packages/agent/src/server/agent-server.ts

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import {
5454
normalizeCloudPromptContent,
5555
promptBlocksToText,
5656
} from "./cloud-prompt";
57+
import { TaskRunEventStreamSender } from "./event-stream-sender";
5758
import { type JwtPayload, JwtValidationError, validateJwt } from "./jwt";
5859
import {
5960
handoffLocalGitStateSchema,
@@ -228,6 +229,7 @@ export class AgentServer {
228229
private session: ActiveSession | null = null;
229230
private app: Hono;
230231
private posthogAPI: PostHogAPIClient;
232+
private eventStreamSender: TaskRunEventStreamSender | null = null;
231233
private questionRelayedToSlack = false;
232234
private detectedPrUrl: string | null = null;
233235
private lastReportedBranch: string | null = null;
@@ -292,6 +294,17 @@ export class AgentServer {
292294
getApiKey: () => config.apiKey,
293295
userAgent: `posthog/cloud.hog.dev; version: ${config.version ?? packageJson.version}`,
294296
});
297+
if (config.eventIngestToken) {
298+
this.eventStreamSender = new TaskRunEventStreamSender({
299+
apiUrl: config.apiUrl,
300+
projectId: config.projectId,
301+
taskId: config.taskId,
302+
runId: config.runId,
303+
token: config.eventIngestToken,
304+
logger: this.logger.child("EventIngest"),
305+
streamWindowMs: config.eventIngestStreamWindowMs,
306+
});
307+
}
295308
this.app = this.createApp();
296309
}
297310

@@ -555,7 +568,9 @@ export class AgentServer {
555568
this.logger.debug("Stopping agent server...");
556569

557570
if (this.session) {
558-
await this.cleanupSession();
571+
await this.cleanupSession({ completeEventStream: true });
572+
} else {
573+
await this.eventStreamSender?.stop();
559574
}
560575

561576
if (this.server) {
@@ -1791,6 +1806,12 @@ ${attributionInstructions}
17911806

17921807
const status = "failed";
17931808

1809+
this.enqueueTaskTerminalEvent(POSTHOG_NOTIFICATIONS.ERROR, {
1810+
source: "agent_server",
1811+
stopReason,
1812+
error: errorMessage ?? "Agent error",
1813+
});
1814+
17941815
try {
17951816
await this.posthogAPI.updateTaskRun(payload.task_id, payload.run_id, {
17961817
status,
@@ -1799,9 +1820,28 @@ ${attributionInstructions}
17991820
this.logger.debug("Task completion signaled", { status, stopReason });
18001821
} catch (error) {
18011822
this.logger.error("Failed to signal task completion", error);
1823+
} finally {
1824+
await this.eventStreamSender?.stop();
18021825
}
18031826
}
18041827

1828+
private enqueueTaskTerminalEvent(
1829+
method:
1830+
| typeof POSTHOG_NOTIFICATIONS.TASK_COMPLETE
1831+
| typeof POSTHOG_NOTIFICATIONS.ERROR,
1832+
params: Record<string, unknown>,
1833+
): void {
1834+
this.eventStreamSender?.enqueue({
1835+
type: "notification",
1836+
timestamp: new Date().toISOString(),
1837+
notification: {
1838+
jsonrpc: "2.0",
1839+
method,
1840+
params,
1841+
},
1842+
});
1843+
}
1844+
18051845
private configureEnvironment({
18061846
isInternal = false,
18071847
}: {
@@ -2199,7 +2239,11 @@ ${attributionInstructions}
21992239
}
22002240
}
22012241

2202-
private async cleanupSession(): Promise<void> {
2242+
private async cleanupSession({
2243+
completeEventStream = false,
2244+
}: {
2245+
completeEventStream?: boolean;
2246+
} = {}): Promise<void> {
22032247
if (!this.session) return;
22042248

22052249
this.logger.debug("Cleaning up session");
@@ -2238,6 +2282,10 @@ ${attributionInstructions}
22382282
this.session.sseController.close();
22392283
}
22402284

2285+
if (completeEventStream) {
2286+
await this.eventStreamSender?.stop();
2287+
}
2288+
22412289
this.pendingEvents = [];
22422290
this.lastReportedBranch = null;
22432291
this.session = null;
@@ -2321,9 +2369,13 @@ ${attributionInstructions}
23212369
}
23222370

23232371
private broadcastEvent(event: Record<string, unknown>): void {
2372+
if (!this.session) return;
2373+
2374+
this.eventStreamSender?.enqueue(event);
2375+
23242376
if (this.session?.sseController) {
23252377
this.sendSseEvent(this.session.sseController, event);
2326-
} else if (this.session) {
2378+
} else {
23272379
// Buffer events during initialization (sseController not yet attached)
23282380
this.pendingEvents.push(event);
23292381
}

packages/agent/src/server/bin.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,15 @@ const envSchema = z.object({
3232
POSTHOG_CODE_REASONING_EFFORT: z
3333
.enum(["low", "medium", "high", "xhigh", "max"])
3434
.optional(),
35+
POSTHOG_TASK_RUN_EVENT_INGEST_TOKEN: z.string().min(1).optional(),
36+
POSTHOG_TASK_RUN_EVENT_INGEST_STREAM_WINDOW_MS: z
37+
.string()
38+
.regex(
39+
/^[1-9]\d*$/,
40+
"POSTHOG_TASK_RUN_EVENT_INGEST_STREAM_WINDOW_MS must be a positive integer",
41+
)
42+
.transform((value) => parseInt(value, 10))
43+
.optional(),
3544
});
3645

3746
const program = new Command();
@@ -148,6 +157,9 @@ program
148157
const server = new AgentServer({
149158
port: parseInt(options.port, 10),
150159
jwtPublicKey: env.JWT_PUBLIC_KEY,
160+
eventIngestToken: env.POSTHOG_TASK_RUN_EVENT_INGEST_TOKEN,
161+
eventIngestStreamWindowMs:
162+
env.POSTHOG_TASK_RUN_EVENT_INGEST_STREAM_WINDOW_MS,
151163
repositoryPath: options.repositoryPath,
152164
apiUrl: env.POSTHOG_API_URL,
153165
apiKey: env.POSTHOG_PERSONAL_API_KEY,

0 commit comments

Comments
 (0)