Skip to content

Commit 35a73d2

Browse files
committed
Add incremental chat completion traces
1 parent 10c7318 commit 35a73d2

12 files changed

Lines changed: 927 additions & 143 deletions

File tree

cli/src/hooks/helpers/__tests__/send-message.test.ts

Lines changed: 264 additions & 116 deletions
Large diffs are not rendered by default.

cli/src/utils/run-state-storage.ts

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
import * as fs from 'fs'
22
import path from 'path'
3+
import { randomUUID } from 'node:crypto'
34

4-
import { getCurrentChatDir, getMostRecentChatDir, getProjectDataDir } from '../project-files'
5+
import {
6+
getCurrentChatDir,
7+
getMostRecentChatDir,
8+
getProjectDataDir,
9+
} from '../project-files'
510
import { logger } from './logger'
611

712
import type { ChatMessage, ContentBlock } from '../types/chat'
@@ -21,9 +26,9 @@ type SavedChatState = {
2126
*/
2227
function extractToggleIds(blocks: ContentBlock[] | undefined): string[] {
2328
if (!blocks) return []
24-
29+
2530
const ids: string[] = []
26-
31+
2732
for (const block of blocks) {
2833
if (block.type === 'agent') {
2934
ids.push(block.agentId)
@@ -33,7 +38,7 @@ function extractToggleIds(blocks: ContentBlock[] | undefined): string[] {
3338
ids.push(block.toolCallId)
3439
}
3540
}
36-
41+
3742
return ids
3843
}
3944

@@ -42,11 +47,11 @@ function extractToggleIds(blocks: ContentBlock[] | undefined): string[] {
4247
*/
4348
export function getAllToggleIdsFromMessages(messages: ChatMessage[]): string[] {
4449
const ids: string[] = []
45-
50+
4651
for (const message of messages) {
4752
ids.push(...extractToggleIds(message.blocks))
4853
}
49-
54+
5055
return ids
5156
}
5257

@@ -69,11 +74,14 @@ export function getChatMessagesPath(): string {
6974
/**
7075
* Save both the RunState and ChatMessage[] to disk
7176
*/
72-
export function saveChatState(runState: RunState, messages: ChatMessage[]): void {
77+
export function saveChatState(
78+
runState: RunState,
79+
messages: ChatMessage[],
80+
): void {
7381
try {
7482
const runStatePath = getRunStatePath()
7583
const messagesPath = getChatMessagesPath()
76-
84+
7785
fs.writeFileSync(runStatePath, JSON.stringify(runState, null, 2))
7886
fs.writeFileSync(messagesPath, JSON.stringify(messages, null, 2))
7987
} catch (error) {
@@ -92,14 +100,19 @@ export function saveChatState(runState: RunState, messages: ChatMessage[]): void
92100
* recently modified chat directory is used.
93101
* Returns null if no previous chat exists or files can't be parsed.
94102
*/
95-
export function loadMostRecentChatState(chatId?: string): SavedChatState | null {
103+
export function loadMostRecentChatState(
104+
chatId?: string,
105+
): SavedChatState | null {
96106
try {
97107
let chatDir: string | null = null
98108

99109
if (chatId && chatId.trim().length > 0) {
100110
const baseDir = path.join(getProjectDataDir(), 'chats')
101111
const candidateDir = path.join(baseDir, chatId.trim())
102-
if (fs.existsSync(candidateDir) && fs.statSync(candidateDir).isDirectory()) {
112+
if (
113+
fs.existsSync(candidateDir) &&
114+
fs.statSync(candidateDir).isDirectory()
115+
) {
103116
chatDir = candidateDir
104117
} else {
105118
logger.debug(
@@ -133,12 +146,18 @@ export function loadMostRecentChatState(chatId?: string): SavedChatState | null
133146
const messagesContent = fs.readFileSync(messagesPath, 'utf8')
134147

135148
const runState = JSON.parse(runStateContent) as RunState
149+
runState.traceSessionId ??= randomUUID()
136150
const messages = JSON.parse(messagesContent) as ChatMessage[]
137151

138152
const resolvedChatId = path.basename(chatDir)
139153

140154
logger.info(
141-
{ runStatePath, messagesPath, messageCount: messages.length, chatId: resolvedChatId },
155+
{
156+
runStatePath,
157+
messagesPath,
158+
messageCount: messages.length,
159+
chatId: resolvedChatId,
160+
},
142161
'Loaded chat state from chat directory',
143162
)
144163

@@ -161,18 +180,15 @@ export function clearChatState(): void {
161180
try {
162181
const runStatePath = getRunStatePath()
163182
const messagesPath = getChatMessagesPath()
164-
183+
165184
if (fs.existsSync(runStatePath)) {
166185
fs.unlinkSync(runStatePath)
167186
}
168187
if (fs.existsSync(messagesPath)) {
169188
fs.unlinkSync(messagesPath)
170189
}
171-
172-
logger.debug(
173-
{ runStatePath, messagesPath },
174-
'Cleared chat state files'
175-
)
190+
191+
logger.debug({ runStatePath, messagesPath }, 'Cleared chat state files')
176192
} catch (error) {
177193
logger.error(
178194
{

common/src/types/contracts/bigquery.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,35 @@ export type InsertMessageBigqueryFn = (params: {
2121
dataset?: string
2222
logger: Logger
2323
}) => Promise<boolean>
24+
25+
export type ChatCompletionTraceRow = {
26+
id: string
27+
user_id: string
28+
client_id?: string | null
29+
trace_session_id: string
30+
trace_lineage_id: string
31+
run_id: string
32+
agent_id: string
33+
created_at: Date
34+
model: string
35+
cost_mode?: string | null
36+
request: unknown
37+
message_count: number
38+
message_start_index: number
39+
message_delta_count: number
40+
previous_message_count?: number | null
41+
common_prefix_length: number
42+
cache_hit: boolean
43+
full_snapshot: boolean
44+
messages: unknown[]
45+
delta_message_hashes: string[]
46+
tool_count: number
47+
tools?: unknown[] | null
48+
tools_omitted: boolean
49+
}
50+
51+
export type InsertChatCompletionTraceBigqueryFn = (params: {
52+
row: ChatCompletionTraceRow
53+
dataset?: string
54+
logger: Logger
55+
}) => Promise<boolean>

packages/bigquery/src/client.ts

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,26 @@ import { IS_PROD } from '@codebuff/common/env'
22
import { getErrorObject } from '@codebuff/common/util/error'
33
import { BigQuery } from '@google-cloud/bigquery'
44

5-
import { MESSAGE_SCHEMA, RELABELS_SCHEMA, TRACES_SCHEMA } from './schema'
5+
import {
6+
CHAT_COMPLETION_TRACES_SCHEMA,
7+
MESSAGE_SCHEMA,
8+
RELABELS_SCHEMA,
9+
TRACES_SCHEMA,
10+
} from './schema'
611

712
import type { BaseTrace, GetRelevantFilesTrace, Relabel, Trace } from './schema'
8-
import type { MessageRow } from '@codebuff/common/types/contracts/bigquery'
13+
import type {
14+
ChatCompletionTraceRow,
15+
MessageRow,
16+
} from '@codebuff/common/types/contracts/bigquery'
917
import type { Logger } from '@codebuff/common/types/contracts/logger'
1018

1119
const DATASET = IS_PROD ? 'codebuff_data' : 'codebuff_data_dev'
1220

1321
const TRACES_TABLE = 'traces'
1422
const RELABELS_TABLE = 'relabels'
1523
const MESSAGE_TABLE = 'message'
24+
const CHAT_COMPLETION_TRACES_TABLE = 'chat_completion_traces'
1625

1726
// Create a single BigQuery client instance to be used by all functions
1827
let client: BigQuery | null = null
@@ -77,6 +86,17 @@ export async function setupBigQuery({
7786
fields: ['user_id'],
7887
},
7988
})
89+
await ds.table(CHAT_COMPLETION_TRACES_TABLE).get({
90+
autoCreate: true,
91+
schema: CHAT_COMPLETION_TRACES_SCHEMA,
92+
timePartitioning: {
93+
type: 'MONTH',
94+
field: 'created_at',
95+
},
96+
clustering: {
97+
fields: ['user_id', 'trace_session_id', 'trace_lineage_id'],
98+
},
99+
})
80100
} catch (error) {
81101
const err = error as Error & { code?: string; details?: unknown }
82102
logger.error(
@@ -94,6 +114,53 @@ export async function setupBigQuery({
94114
}
95115
}
96116

117+
export async function insertChatCompletionTraceBigquery({
118+
row,
119+
dataset,
120+
logger,
121+
}: {
122+
row: ChatCompletionTraceRow
123+
dataset?: string
124+
logger: Logger
125+
}) {
126+
const resolvedDataset = dataset ?? DATASET
127+
try {
128+
await getClient()
129+
.dataset(resolvedDataset)
130+
.table(CHAT_COMPLETION_TRACES_TABLE)
131+
.insert({
132+
...row,
133+
request: JSON.stringify(row.request),
134+
messages: JSON.stringify(row.messages),
135+
delta_message_hashes: JSON.stringify(row.delta_message_hashes),
136+
tools: row.tools ? JSON.stringify(row.tools) : null,
137+
})
138+
139+
logger.debug(
140+
{
141+
traceId: row.id,
142+
userId: row.user_id,
143+
clientId: row.client_id,
144+
traceSessionId: row.trace_session_id,
145+
traceLineageId: row.trace_lineage_id,
146+
runId: row.run_id,
147+
messageStartIndex: row.message_start_index,
148+
messageDeltaCount: row.message_delta_count,
149+
fullSnapshot: row.full_snapshot,
150+
},
151+
'Inserted chat completion trace into BigQuery',
152+
)
153+
return true
154+
} catch (error) {
155+
logger.error(
156+
{ error: getErrorObject(error), traceId: row.id },
157+
'Failed to insert chat completion trace into BigQuery',
158+
)
159+
160+
return false
161+
}
162+
}
163+
97164
export async function insertMessageBigquery({
98165
row,
99166
dataset,

packages/bigquery/src/schema.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,3 +143,31 @@ export const MESSAGE_SCHEMA: TableSchema = {
143143
{ name: 'cache_read_input_tokens', type: 'INTEGER', mode: 'NULLABLE' },
144144
],
145145
}
146+
147+
export const CHAT_COMPLETION_TRACES_SCHEMA: TableSchema = {
148+
fields: [
149+
{ name: 'id', type: 'STRING', mode: 'REQUIRED' },
150+
{ name: 'user_id', type: 'STRING', mode: 'REQUIRED' },
151+
{ name: 'client_id', type: 'STRING', mode: 'NULLABLE' },
152+
{ name: 'trace_session_id', type: 'STRING', mode: 'REQUIRED' },
153+
{ name: 'trace_lineage_id', type: 'STRING', mode: 'REQUIRED' },
154+
{ name: 'run_id', type: 'STRING', mode: 'REQUIRED' },
155+
{ name: 'agent_id', type: 'STRING', mode: 'REQUIRED' },
156+
{ name: 'created_at', type: 'TIMESTAMP', mode: 'REQUIRED' },
157+
{ name: 'model', type: 'STRING', mode: 'REQUIRED' },
158+
{ name: 'cost_mode', type: 'STRING', mode: 'NULLABLE' },
159+
{ name: 'request', type: 'JSON', mode: 'REQUIRED' },
160+
{ name: 'message_count', type: 'INTEGER', mode: 'REQUIRED' },
161+
{ name: 'message_start_index', type: 'INTEGER', mode: 'REQUIRED' },
162+
{ name: 'message_delta_count', type: 'INTEGER', mode: 'REQUIRED' },
163+
{ name: 'previous_message_count', type: 'INTEGER', mode: 'NULLABLE' },
164+
{ name: 'common_prefix_length', type: 'INTEGER', mode: 'REQUIRED' },
165+
{ name: 'cache_hit', type: 'BOOLEAN', mode: 'REQUIRED' },
166+
{ name: 'full_snapshot', type: 'BOOLEAN', mode: 'REQUIRED' },
167+
{ name: 'messages', type: 'JSON', mode: 'REQUIRED' },
168+
{ name: 'delta_message_hashes', type: 'JSON', mode: 'REQUIRED' },
169+
{ name: 'tool_count', type: 'INTEGER', mode: 'REQUIRED' },
170+
{ name: 'tools', type: 'JSON', mode: 'NULLABLE' },
171+
{ name: 'tools_omitted', type: 'BOOLEAN', mode: 'REQUIRED' },
172+
],
173+
}

sdk/src/run-state.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ export function selectHighestPriorityKnowledgeFile(
6262
export type RunState = {
6363
sessionState?: SessionState
6464
output: AgentOutput
65+
traceSessionId: string
6566
}
6667

6768
export type InitialSessionStateOptions = {
@@ -630,6 +631,7 @@ export async function generateInitialRunState({
630631
fs: CodebuffFileSystem
631632
}): Promise<RunState> {
632633
return {
634+
traceSessionId: crypto.randomUUID(),
633635
sessionState: await initialSessionState({
634636
cwd,
635637
skillsDir,

0 commit comments

Comments
 (0)