-
Notifications
You must be signed in to change notification settings - Fork 1
feat: add tape recording system for workspace analyze sessions #30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| package tape | ||
|
|
||
| import "time" | ||
|
|
||
| // Entry is a single append-only entry in a tape, modeled after republic's TapeEntry. | ||
| type Entry struct { | ||
| ID int `json:"id"` | ||
| Kind string `json:"kind"` | ||
| Payload map[string]any `json:"payload"` | ||
| Meta map[string]any `json:"meta,omitempty"` | ||
| Date string `json:"date"` | ||
| } | ||
|
|
||
| // Entry kinds. | ||
| const ( | ||
| KindMessage = "message" | ||
| KindSystem = "system" | ||
| KindToolCall = "tool_call" | ||
| KindToolResult = "tool_result" | ||
| KindError = "error" | ||
| KindEvent = "event" | ||
| ) | ||
|
|
||
| func newEntry(kind string, payload, meta map[string]any) Entry { | ||
| return Entry{ | ||
| Kind: kind, | ||
| Payload: payload, | ||
| Meta: meta, | ||
| Date: time.Now().UTC().Format(time.RFC3339Nano), | ||
| } | ||
| } | ||
|
|
||
| // MessageEntry creates a message entry. | ||
| func MessageEntry(role, content string, meta map[string]any) Entry { | ||
| return newEntry(KindMessage, map[string]any{ | ||
| "role": role, | ||
| "content": content, | ||
| }, meta) | ||
| } | ||
|
|
||
| // SystemEntry creates a system prompt entry. | ||
| func SystemEntry(content string, meta map[string]any) Entry { | ||
| return newEntry(KindSystem, map[string]any{ | ||
| "content": content, | ||
| }, meta) | ||
| } | ||
|
|
||
| // ToolCallEntry creates a tool call entry. | ||
| func ToolCallEntry(calls []map[string]any, meta map[string]any) Entry { | ||
| return newEntry(KindToolCall, map[string]any{ | ||
| "calls": calls, | ||
| }, meta) | ||
| } | ||
|
|
||
| // ToolResultEntry creates a tool result entry. | ||
| func ToolResultEntry(results []any, meta map[string]any) Entry { | ||
| return newEntry(KindToolResult, map[string]any{ | ||
| "results": results, | ||
| }, meta) | ||
| } | ||
|
|
||
| // ErrorEntry creates an error entry. | ||
| func ErrorEntry(kind, message string, meta map[string]any) Entry { | ||
| return newEntry(KindError, map[string]any{ | ||
| "kind": kind, | ||
| "message": message, | ||
| }, meta) | ||
| } | ||
|
|
||
| // EventEntry creates a generic event entry. | ||
| func EventEntry(name string, data, meta map[string]any) Entry { | ||
| return newEntry(KindEvent, map[string]any{ | ||
| "name": name, | ||
| "data": data, | ||
| }, meta) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,220 @@ | ||
| package tape | ||
|
|
||
| import ( | ||
| "context" | ||
| "log/slog" | ||
|
|
||
| "github.com/cloudwego/eino/callbacks" | ||
| "github.com/cloudwego/eino/components" | ||
| "github.com/cloudwego/eino/components/model" | ||
| "github.com/cloudwego/eino/schema" | ||
| ) | ||
|
|
||
| // NewHandler creates an eino callbacks.Handler that records tape entries to the given store. | ||
| func NewHandler(store Recorder) callbacks.Handler { | ||
| return callbacks.NewHandlerBuilder(). | ||
| OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context { | ||
| onStart(store, info, input) | ||
| return ctx | ||
| }). | ||
| OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context { | ||
| onEnd(store, info, output) | ||
| return ctx | ||
| }). | ||
| OnErrorFn(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context { | ||
| onError(store, info, err) | ||
| return ctx | ||
| }). | ||
| OnEndWithStreamOutputFn(func(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context { | ||
| onEndWithStream(store, info, output) | ||
| return ctx | ||
| }). | ||
| Build() | ||
| } | ||
|
|
||
| func componentMeta(info *callbacks.RunInfo) map[string]any { | ||
| return map[string]any{ | ||
| "component": string(info.Component), | ||
| "name": info.Name, | ||
| "type": info.Type, | ||
| } | ||
| } | ||
|
|
||
| func onStart(store Recorder, info *callbacks.RunInfo, input callbacks.CallbackInput) { | ||
| if info == nil { | ||
| return | ||
| } | ||
| meta := componentMeta(info) | ||
|
|
||
| if info.Component == components.ComponentOfChatModel { | ||
| modelInput := model.ConvCallbackInput(input) | ||
| if modelInput != nil { | ||
| recordModelInput(store, modelInput, meta) | ||
| return | ||
| } | ||
| } | ||
|
|
||
| if info.Component == components.ComponentOfTool { | ||
| entry := EventEntry("tool_start", map[string]any{ | ||
| "component": string(info.Component), | ||
| "name": info.Name, | ||
| }, meta) | ||
| appendSafe(store, entry) | ||
| return | ||
| } | ||
|
|
||
| entry := EventEntry("start", map[string]any{ | ||
| "component": string(info.Component), | ||
| "name": info.Name, | ||
| }, meta) | ||
| appendSafe(store, entry) | ||
| } | ||
|
|
||
| func onEnd(store Recorder, info *callbacks.RunInfo, output callbacks.CallbackOutput) { | ||
| if info == nil { | ||
| return | ||
| } | ||
| meta := componentMeta(info) | ||
|
|
||
| if info.Component == components.ComponentOfChatModel { | ||
| modelOutput := model.ConvCallbackOutput(output) | ||
| if modelOutput != nil { | ||
| recordModelOutput(store, modelOutput, meta) | ||
| return | ||
| } | ||
| } | ||
|
|
||
| if info.Component == components.ComponentOfTool { | ||
| recordToolResult(store, output, meta) | ||
| return | ||
| } | ||
|
|
||
| entry := EventEntry("end", map[string]any{ | ||
| "component": string(info.Component), | ||
| "name": info.Name, | ||
| }, meta) | ||
| appendSafe(store, entry) | ||
| } | ||
|
|
||
| func onError(store Recorder, info *callbacks.RunInfo, err error) { | ||
| if info == nil { | ||
| return | ||
| } | ||
| meta := componentMeta(info) | ||
| entry := ErrorEntry(string(info.Component), err.Error(), meta) | ||
| appendSafe(store, entry) | ||
| } | ||
|
|
||
| func onEndWithStream(store Recorder, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) { | ||
| if info == nil { | ||
| return | ||
| } | ||
|
Comment on lines
+109
to
+111
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This early return skips closing the Useful? React with 👍 / 👎. |
||
| meta := componentMeta(info) | ||
|
|
||
| // Consume the stream in a goroutine to avoid blocking | ||
| go func() { | ||
| defer func() { | ||
| if r := recover(); r != nil { | ||
| slog.Warn("tape: panic consuming stream", "recover", r) | ||
| } | ||
| }() | ||
|
|
||
| var lastOutput callbacks.CallbackOutput | ||
| for { | ||
| chunk, err := output.Recv() | ||
| if err != nil { | ||
| break | ||
| } | ||
| lastOutput = chunk | ||
| } | ||
|
Comment on lines
+122
to
+129
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Useful? React with 👍 / 👎. |
||
| output.Close() | ||
|
|
||
| if lastOutput == nil { | ||
| return | ||
| } | ||
|
|
||
| if info.Component == components.ComponentOfChatModel { | ||
| modelOutput := model.ConvCallbackOutput(lastOutput) | ||
| if modelOutput != nil { | ||
| recordModelOutput(store, modelOutput, meta) | ||
| return | ||
| } | ||
| } | ||
|
|
||
| entry := EventEntry("end", map[string]any{ | ||
| "component": string(info.Component), | ||
| "name": info.Name, | ||
| }, meta) | ||
| appendSafe(store, entry) | ||
| }() | ||
| } | ||
|
|
||
| func recordModelInput(store Recorder, input *model.CallbackInput, meta map[string]any) { | ||
| // Record system message separately if present | ||
| for _, msg := range input.Messages { | ||
| if msg.Role == schema.System { | ||
| appendSafe(store, SystemEntry(msg.Content, meta)) | ||
| continue | ||
| } | ||
| appendSafe(store, MessageEntry(string(msg.Role), msg.Content, meta)) | ||
| } | ||
|
|
||
| // Record tool calls from messages | ||
| if len(input.Tools) > 0 { | ||
| var tools []map[string]any | ||
| for _, t := range input.Tools { | ||
| tools = append(tools, map[string]any{ | ||
| "name": t.Name, | ||
| "desc": t.Desc, | ||
| }) | ||
| } | ||
| appendSafe(store, EventEntry("tools_available", map[string]any{ | ||
| "tools": tools, | ||
| }, meta)) | ||
| } | ||
| } | ||
|
|
||
| func recordModelOutput(store Recorder, output *model.CallbackOutput, meta map[string]any) { | ||
| if output.TokenUsage != nil { | ||
| meta["token_usage"] = map[string]any{ | ||
| "prompt_tokens": output.TokenUsage.PromptTokens, | ||
| "completion_tokens": output.TokenUsage.CompletionTokens, | ||
| "total_tokens": output.TokenUsage.TotalTokens, | ||
| } | ||
| } | ||
|
|
||
| if output.Message != nil { | ||
| // Record tool calls if present | ||
| if len(output.Message.ToolCalls) > 0 { | ||
| var calls []map[string]any | ||
| for _, tc := range output.Message.ToolCalls { | ||
| calls = append(calls, map[string]any{ | ||
| "id": tc.ID, | ||
| "function": tc.Function.Name, | ||
| "args": tc.Function.Arguments, | ||
| }) | ||
| } | ||
| appendSafe(store, ToolCallEntry(calls, meta)) | ||
| } | ||
|
|
||
| // Record assistant message | ||
| if output.Message.Content != "" { | ||
| appendSafe(store, MessageEntry(string(output.Message.Role), output.Message.Content, meta)) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func recordToolResult(store Recorder, output callbacks.CallbackOutput, meta map[string]any) { | ||
| // Tool output is typically a string | ||
| if s, ok := output.(string); ok { | ||
| appendSafe(store, ToolResultEntry([]any{s}, meta)) | ||
| return | ||
| } | ||
| appendSafe(store, ToolResultEntry([]any{output}, meta)) | ||
| } | ||
|
|
||
| func appendSafe(store Recorder, entry Entry) { | ||
| if err := store.Append(entry); err != nil { | ||
| slog.Warn("tape: failed to append entry", "err", err, "kind", entry.Kind) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RunAgentWithPromptinstalls a new tape callback viacallbacks.AppendGlobalHandlerson every invocation, but global handlers are process-wide and never removed. If this function is called more than once in a process (e.g., repeated analyzer runs in tests or embedded usage), later runs will invoke all previously registered tape handlers, causing duplicated events and writes to stale tape files from earlier sessions.Useful? React with 👍 / 👎.