From 55bb3dd4194bb96638a668f7c746a257bec609b6 Mon Sep 17 00:00:00 2001 From: Zhiqiang ZHOU Date: Thu, 19 Mar 2026 21:49:19 -0700 Subject: [PATCH 1/2] fix(analyze): ACP-only path, tape, dedupe ChatModel callbacks - Drop eino filesystem middleware so ToolsNode never runs; ACP executes tools - acpToolCallingModel reports IsCallbacksEnabled to skip callbackInjectedModel - Wire tape JSONL + SlogEinoHandler on analyze; remove local backend adapter - go mod tidy (drop unused eino-ext/local backend) Made-with: Cursor --- go.mod | 4 +- go.sum | 4 - pkg/analyzer/acp_tool_model.go | 10 +- pkg/analyzer/analyzer.go | 46 ++- pkg/analyzer/local_backend_adapter.go | 52 ---- pkg/tape/eino_handler.go | 387 ++++++++++++++++++++++++++ pkg/tape/eino_handler_test.go | 11 + pkg/tape/entry.go | 75 +++++ pkg/tape/jsonl.go | 94 +++++++ pkg/tape/jsonl_test.go | 38 +++ pkg/tracing/slog_eino.go | 110 ++++++++ pkg/tracing/slog_eino_test.go | 12 + 12 files changed, 759 insertions(+), 84 deletions(-) delete mode 100644 pkg/analyzer/local_backend_adapter.go create mode 100644 pkg/tape/eino_handler.go create mode 100644 pkg/tape/eino_handler_test.go create mode 100644 pkg/tape/entry.go create mode 100644 pkg/tape/jsonl.go create mode 100644 pkg/tape/jsonl_test.go create mode 100644 pkg/tracing/slog_eino.go create mode 100644 pkg/tracing/slog_eino_test.go diff --git a/go.mod b/go.mod index 47f9da9..be8aad9 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module github.com/strrl/lapp go 1.25.7 require ( + github.com/bytedance/sonic v1.15.0 github.com/cloudwego/eino v0.8.0 - github.com/cloudwego/eino-ext/adk/backend/local v0.1.2-0.20260306073537-008f82264d85 github.com/cloudwego/eino-ext/callbacks/langfuse v0.0.0-20260227151421-e109b4ff9563 github.com/cloudwego/eino-ext/components/model/openrouter v0.1.2 github.com/duckdb/duckdb-go/v2 v2.5.5 @@ -24,10 +24,8 @@ require ( require ( github.com/apache/arrow-go/v18 v18.5.1 // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect - 
github.com/bmatcuk/doublestar/v4 v4.10.0 // indirect github.com/buger/jsonparser v1.1.1 // indirect github.com/bytedance/gopkg v0.1.3 // indirect - github.com/bytedance/sonic v1.15.0 // indirect github.com/bytedance/sonic/loader v0.5.0 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect diff --git a/go.sum b/go.sum index 4180aca..307965b 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,6 @@ github.com/apache/thrift v0.22.0/go.mod h1:1e7J/O1Ae6ZQMTYdy9xa3w9k+XHWPfRvdPyJe github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA= -github.com/bmatcuk/doublestar/v4 v4.10.0 h1:zU9WiOla1YA122oLM6i4EXvGW62DvKZVxIe6TYWexEs= -github.com/bmatcuk/doublestar/v4 v4.10.0/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= @@ -34,8 +32,6 @@ github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU= github.com/cloudwego/eino v0.8.0 h1:DLbrgEAloA+l7aR2qim7qQocQB48DjPrb8LzG3PYMHY= github.com/cloudwego/eino v0.8.0/go.mod h1:+2N4nsMPxA6kGBHpH+75JuTfEcGprAMTdsZESrShKpU= -github.com/cloudwego/eino-ext/adk/backend/local v0.1.2-0.20260306073537-008f82264d85 h1:mD47o0GKdeqMdGI5xEqnlO8ZtArvhalIorRtrCmLRkA= -github.com/cloudwego/eino-ext/adk/backend/local v0.1.2-0.20260306073537-008f82264d85/go.mod h1:LfFk+VqZk0JOxIyl5RaerYqlFVLyXOCoSaqqak8hNls= github.com/cloudwego/eino-ext/callbacks/langfuse 
v0.0.0-20260227151421-e109b4ff9563 h1:DKTXDDw8ErC4RorZLfB2ZdHChjDKWIqOEO7VRSjjfbg= github.com/cloudwego/eino-ext/callbacks/langfuse v0.0.0-20260227151421-e109b4ff9563/go.mod h1:lrNKITZR4QUaYl9Rdz9W6qGOolHRy6mPamEZYA8uz7s= github.com/cloudwego/eino-ext/components/model/openrouter v0.1.2 h1:zDFteouktUsGk4I/7m1b7yT4e9qawy45gWtLoyeHwxI= diff --git a/pkg/analyzer/acp_tool_model.go b/pkg/analyzer/acp_tool_model.go index fdd6033..52b725a 100644 --- a/pkg/analyzer/acp_tool_model.go +++ b/pkg/analyzer/acp_tool_model.go @@ -3,12 +3,16 @@ package analyzer import ( "context" + "github.com/cloudwego/eino/components" "github.com/cloudwego/eino/components/model" "github.com/cloudwego/eino/schema" einoacp "github.com/strrl/eino-acp" ) -var _ model.ToolCallingChatModel = (*acpToolCallingModel)(nil) +var ( + _ model.ToolCallingChatModel = (*acpToolCallingModel)(nil) + _ components.Checker = (*acpToolCallingModel)(nil) +) // acpToolCallingModel adapts eino-acp ChatModel to ToolCallingChatModel. // ACP agents manage tools in their own runtime, so WithTools is a no-op. @@ -20,6 +24,10 @@ func newACPToolCallingModel(base *einoacp.ChatModel) model.ToolCallingChatModel return &acpToolCallingModel{base: base} } +func (m *acpToolCallingModel) IsCallbacksEnabled() bool { + return true +} + func (m *acpToolCallingModel) Generate(ctx context.Context, input []*schema.Message, opts ...model.Option) (*schema.Message, error) { return m.base.Generate(ctx, input, opts...) 
} diff --git a/pkg/analyzer/analyzer.go b/pkg/analyzer/analyzer.go index 35a8635..29ac49c 100644 --- a/pkg/analyzer/analyzer.go +++ b/pkg/analyzer/analyzer.go @@ -7,20 +7,21 @@ import ( "path/filepath" "strings" - "github.com/cloudwego/eino-ext/adk/backend/local" "github.com/cloudwego/eino/adk" - fsmw "github.com/cloudwego/eino/adk/middlewares/filesystem" "github.com/go-errors/errors" + "github.com/google/uuid" einoacp "github.com/strrl/eino-acp" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + + "github.com/strrl/lapp/pkg/tape" + "github.com/strrl/lapp/pkg/tracing" ) func buildSystemPrompt(workDir string) string { return fmt.Sprintf(`You are a log analysis expert helping developers troubleshoot issues. -IMPORTANT: All file operations (read_file, grep, ls, glob, execute) MUST use paths under %s. -Do NOT access files outside this workspace directory. +IMPORTANT: Stay within the workspace directory %s for any file or shell work (your runtime provides the tools). Your workspace contains pre-processed log data at %s: - %s/raw.log — the original log file @@ -28,8 +29,8 @@ Your workspace contains pre-processed log data at %s: - %s/errors.txt — error and warning patterns extracted from logs Start by reading %s/summary.txt and %s/errors.txt to understand the log patterns. -Then use grep and read_file on %s/raw.log to investigate specific patterns in detail. -You can also use the execute tool to run shell commands (e.g., awk, sort, wc) for deeper analysis. +Then search and read %s/raw.log for specifics (grep, read, or equivalents your environment exposes). +Use shell only when it helps (e.g. awk, sort, wc). Provide: 1. Key findings from the logs @@ -51,8 +52,7 @@ type Config struct { func BuildWorkspaceSystemPrompt(workDir string) string { return fmt.Sprintf(`You are a log analysis expert helping developers troubleshoot issues. -IMPORTANT: All file operations (read_file, grep, ls, glob, execute) MUST use paths under %s. 
-Do NOT access files outside this workspace directory. +IMPORTANT: Stay within the workspace directory %s for any file or shell work (your runtime provides the tools). Your workspace at %s contains structured log data: - %s/logs/ — original log files @@ -64,8 +64,8 @@ Your workspace at %s contains structured log data: Start by reading %s/notes/summary.md and %s/notes/errors.md to understand the log patterns. Then drill into specific patterns under %s/patterns/ for details. -Use grep on %s/logs/ to search for specific terms across all log files. -You can also use the execute tool to run shell commands (e.g., awk, sort, wc) for deeper analysis. +Search %s/logs/ for specific terms across log files. +Use shell only when it helps (e.g., awk, sort, wc). Provide: 1. Key findings from the logs @@ -114,30 +114,28 @@ func RunAgentWithPrompt(ctx context.Context, config Config, workDir, question, s return "", errors.Errorf("create chat model: %w", err) } - backend, err := local.NewBackend(ctx, &local.Config{}) - if err != nil { - return "", errors.Errorf("create local backend: %w", err) + if systemPrompt == "" { + systemPrompt = buildSystemPrompt(absDir) } - backendAdapter := newLocalBackendAdapter(backend) - fsHandler, err := fsmw.New(ctx, &fsmw.MiddlewareConfig{ - Backend: backendAdapter, - StreamingShell: backendAdapter, - }) + tapePath := filepath.Join(absDir, tape.FileName) + tapeStore, err := tape.OpenJSONL(tapePath) if err != nil { - return "", errors.Errorf("create filesystem middleware: %w", err) + return "", errors.Errorf("open tape store: %w", err) } + defer func() { _ = tapeStore.Close() }() - if systemPrompt == "" { - systemPrompt = buildSystemPrompt(absDir) - } + tapeHandler := tape.NewEinoHandler(tapeStore, tape.RunMeta{ + RunID: uuid.NewString(), + Provider: provider, + Model: config.Model, + }) agent, err := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{ Name: "log-analyzer", Description: "Analyzes log files to find root causes", Instruction: 
systemPrompt, Model: newACPToolCallingModel(chatModel), - Handlers: []adk.ChatModelAgentMiddleware{fsHandler}, MaxIterations: 15, }) if err != nil { @@ -150,7 +148,7 @@ func RunAgentWithPrompt(ctx context.Context, config Config, workDir, question, s } runner := adk.NewRunner(ctx, adk.RunnerConfig{Agent: agent}) - iter := runner.Query(ctx, userMessage) + iter := runner.Query(ctx, userMessage, adk.WithCallbacks(tracing.NewSlogEinoHandler(nil), tapeHandler)) var result strings.Builder for { diff --git a/pkg/analyzer/local_backend_adapter.go b/pkg/analyzer/local_backend_adapter.go deleted file mode 100644 index 22247fe..0000000 --- a/pkg/analyzer/local_backend_adapter.go +++ /dev/null @@ -1,52 +0,0 @@ -package analyzer - -import ( - "context" - - "github.com/cloudwego/eino-ext/adk/backend/local" - "github.com/cloudwego/eino/adk/filesystem" - "github.com/cloudwego/eino/schema" -) - -var _ filesystem.Backend = (*localBackendAdapter)(nil) -var _ filesystem.StreamingShell = (*localBackendAdapter)(nil) - -type localBackendAdapter struct { - base *local.Local -} - -func newLocalBackendAdapter(base *local.Local) *localBackendAdapter { - return &localBackendAdapter{base: base} -} - -func (a *localBackendAdapter) LsInfo(ctx context.Context, req *filesystem.LsInfoRequest) ([]filesystem.FileInfo, error) { - return a.base.LsInfo(ctx, req) -} - -func (a *localBackendAdapter) Read(ctx context.Context, req *filesystem.ReadRequest) (*filesystem.FileContent, error) { - content, err := a.base.Read(ctx, req) - if err != nil { - return nil, err - } - return &filesystem.FileContent{Content: content}, nil -} - -func (a *localBackendAdapter) GrepRaw(ctx context.Context, req *filesystem.GrepRequest) ([]filesystem.GrepMatch, error) { - return a.base.GrepRaw(ctx, req) -} - -func (a *localBackendAdapter) GlobInfo(ctx context.Context, req *filesystem.GlobInfoRequest) ([]filesystem.FileInfo, error) { - return a.base.GlobInfo(ctx, req) -} - -func (a *localBackendAdapter) Write(ctx context.Context, 
req *filesystem.WriteRequest) error { - return a.base.Write(ctx, req) -} - -func (a *localBackendAdapter) Edit(ctx context.Context, req *filesystem.EditRequest) error { - return a.base.Edit(ctx, req) -} - -func (a *localBackendAdapter) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) { - return a.base.ExecuteStreaming(ctx, input) -} diff --git a/pkg/tape/eino_handler.go b/pkg/tape/eino_handler.go new file mode 100644 index 0000000..da28d0b --- /dev/null +++ b/pkg/tape/eino_handler.go @@ -0,0 +1,387 @@ +package tape + +import ( + "context" + "errors" + "io" + "log" + "runtime/debug" + + "github.com/bytedance/sonic" + "github.com/cloudwego/eino/callbacks" + "github.com/cloudwego/eino/components" + "github.com/cloudwego/eino/components/model" + "github.com/cloudwego/eino/components/tool" + "github.com/cloudwego/eino/schema" + goerrors "github.com/go-errors/errors" +) + +type RunMeta struct { + RunID string + Provider string + Model string +} + +type toolCtxKey struct{} + +type pendingTool struct { + Name string + Args string +} + +type EinoHandler struct { + store *JSONLStore + meta RunMeta + recordedMsg int +} + +func NewEinoHandler(store *JSONLStore, meta RunMeta) *EinoHandler { + return &EinoHandler{store: store, meta: meta} +} + +func (h *EinoHandler) baseMeta() map[string]any { + m := map[string]any{"run_id": h.meta.RunID} + if h.meta.Provider != "" { + m["provider"] = h.meta.Provider + } + if h.meta.Model != "" { + m["model"] = h.meta.Model + } + return m +} + +func (h *EinoHandler) write(e Entry) { + if err := h.store.Append(e); err != nil { + log.Printf("tape append: %v", err) + } +} + +func (h *EinoHandler) recordMessagesDelta(msgs []*schema.Message) { + if len(msgs) <= h.recordedMsg { + return + } + for i := h.recordedMsg; i < len(msgs); i++ { + m := msgs[i] + if m == nil { + continue + } + if m.Role == schema.Assistant { + continue + } + h.write(Message(messageToMap(m), 
h.baseMeta())) + } + h.recordedMsg = len(msgs) +} + +func (h *EinoHandler) Needed(ctx context.Context, info *callbacks.RunInfo, timing callbacks.CallbackTiming) bool { + _ = ctx + _ = info + switch timing { + case callbacks.TimingOnStart, callbacks.TimingOnEnd, callbacks.TimingOnError, + callbacks.TimingOnStartWithStreamInput, callbacks.TimingOnEndWithStreamOutput: + return true + default: + return false + } +} + +func (h *EinoHandler) OnStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context { + if info == nil { + return ctx + } + switch info.Component { + case components.ComponentOfChatModel: + mIn := model.ConvCallbackInput(input) + if mIn == nil { + return ctx + } + h.recordMessagesDelta(mIn.Messages) + case components.ComponentOfTool: + tIn := tool.ConvCallbackInput(input) + if tIn == nil { + return ctx + } + return context.WithValue(ctx, toolCtxKey{}, &pendingTool{Name: info.Name, Args: tIn.ArgumentsInJSON}) + default: + } + return ctx +} + +func (h *EinoHandler) OnEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context { + if info == nil { + return ctx + } + switch info.Component { + case components.ComponentOfChatModel: + mOut := model.ConvCallbackOutput(output) + if mOut == nil || mOut.Message == nil { + return ctx + } + h.write(Message(messageToMap(mOut.Message), h.baseMeta())) + runData := map[string]any{"status": "ok"} + if mOut.TokenUsage != nil { + runData["usage"] = map[string]any{ + "prompt_tokens": mOut.TokenUsage.PromptTokens, + "completion_tokens": mOut.TokenUsage.CompletionTokens, + "total_tokens": mOut.TokenUsage.TotalTokens, + } + } + if h.meta.Provider != "" { + runData["provider"] = h.meta.Provider + } + if h.meta.Model != "" { + runData["model"] = h.meta.Model + } + h.write(Event("run", runData, h.baseMeta())) + case components.ComponentOfTool: + pend, _ := ctx.Value(toolCtxKey{}).(*pendingTool) + name := info.Name + args := "" + if pend != nil { + name = 
pend.Name + args = pend.Args + } + tOut := tool.ConvCallbackOutput(output) + res := "" + if tOut != nil { + if tOut.Response != "" { + res = tOut.Response + } else if tOut.ToolOutput != nil { + if b, err := sonic.MarshalString(tOut.ToolOutput); err == nil { + res = b + } + } + } + h.write(ToolCall([]map[string]any{{ + "name": name, + "arguments": args, + }}, h.baseMeta())) + h.write(ToolResult([]any{res}, h.baseMeta())) + default: + } + return ctx +} + +func (h *EinoHandler) OnError(ctx context.Context, info *callbacks.RunInfo, err error) context.Context { + if info == nil || err == nil { + return ctx + } + ex := map[string]any{"component": string(info.Component), "node": info.Name} + h.write(ErrorPayload(err.Error(), ex, h.baseMeta())) + if info.Component == components.ComponentOfChatModel { + h.write(Event("run", map[string]any{ + "status": "error", + "provider": h.meta.Provider, + "model": h.meta.Model, + }, h.baseMeta())) + } + return ctx +} + +func (h *EinoHandler) OnStartWithStreamInput(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context { + if info == nil || input == nil { + return ctx + } + if info.Component != components.ComponentOfChatModel { + return ctx + } + go func() { + defer func() { + if e := recover(); e != nil { + log.Printf("tape stream input panic: %v\n%s", e, string(debug.Stack())) + } + input.Close() + }() + var chunks []callbacks.CallbackInput + for { + ch, err := input.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + log.Printf("tape stream input recv: %v", err) + return + } + chunks = append(chunks, ch) + } + ins := convModelCallbackInputs(chunks) + _, msgs, _, err := extractModelInput(ins) + if err != nil { + log.Printf("tape extract model input: %v", err) + return + } + h.recordMessagesDelta(msgs) + }() + return ctx +} + +func (h *EinoHandler) OnEndWithStreamOutput(ctx context.Context, info *callbacks.RunInfo, output 
*schema.StreamReader[callbacks.CallbackOutput]) context.Context { + if info == nil || output == nil { + return ctx + } + if info.Component != components.ComponentOfChatModel { + return ctx + } + go func() { + defer func() { + if e := recover(); e != nil { + log.Printf("tape stream output panic: %v\n%s", e, string(debug.Stack())) + } + output.Close() + }() + var chunks []callbacks.CallbackOutput + for { + ch, err := output.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + log.Printf("tape stream output recv: %v", err) + break + } + chunks = append(chunks, ch) + } + outs := convModelCallbackOutputs(chunks) + usage, msg, _, err := extractModelOutput(outs) + if err != nil { + log.Printf("tape extract model output: %v", err) + return + } + if msg != nil { + h.write(Message(messageToMap(msg), h.baseMeta())) + } + runData := map[string]any{"status": "ok"} + if usage != nil { + runData["usage"] = map[string]any{ + "prompt_tokens": usage.PromptTokens, + "completion_tokens": usage.CompletionTokens, + "total_tokens": usage.TotalTokens, + } + } + if h.meta.Provider != "" { + runData["provider"] = h.meta.Provider + } + if h.meta.Model != "" { + runData["model"] = h.meta.Model + } + h.write(Event("run", runData, h.baseMeta())) + }() + return ctx +} + +func messageToMap(m *schema.Message) map[string]any { + if m == nil { + return map[string]any{} + } + raw, err := sonic.Marshal(m) + if err != nil { + return map[string]any{"role": string(m.Role), "content": m.Content} + } + var out map[string]any + if err := sonic.Unmarshal(raw, &out); err != nil { + return map[string]any{"role": string(m.Role), "content": m.Content} + } + return out +} + +func convModelCallbackInputs(in []callbacks.CallbackInput) []*model.CallbackInput { + ret := make([]*model.CallbackInput, len(in)) + for i, c := range in { + ret[i] = model.ConvCallbackInput(c) + } + return ret +} + +func convModelCallbackOutputs(out []callbacks.CallbackOutput) []*model.CallbackOutput { + ret := 
make([]*model.CallbackOutput, len(out)) + for i, c := range out { + ret[i] = model.ConvCallbackOutput(c) + } + return ret +} + +func extractModelInput(ins []*model.CallbackInput) (config *model.Config, messages []*schema.Message, extra map[string]any, err error) { + var mas [][]*schema.Message + for _, in := range ins { + if in == nil { + continue + } + if len(in.Messages) > 0 { + mas = append(mas, in.Messages) + } + if len(in.Extra) > 0 { + extra = in.Extra + } + if in.Config != nil { + config = in.Config + } + } + if len(mas) == 0 { + return config, []*schema.Message{}, extra, nil + } + messages, err = concatMessageArrays(mas) + if err != nil { + return nil, nil, nil, err + } + return config, messages, extra, nil +} + +func extractModelOutput(outs []*model.CallbackOutput) (usage *model.TokenUsage, message *schema.Message, extra map[string]any, err error) { + var mas []*schema.Message + for _, out := range outs { + if out == nil { + continue + } + if out.TokenUsage != nil { + usage = out.TokenUsage + } + if out.Message != nil { + mas = append(mas, out.Message) + } + if out.Extra != nil { + extra = out.Extra + } + } + if len(mas) == 0 { + return usage, &schema.Message{}, extra, nil + } + message, err = schema.ConcatMessages(mas) + if err != nil { + return nil, nil, nil, err + } + return usage, message, extra, nil +} + +func concatMessageArrays(mas [][]*schema.Message) ([]*schema.Message, error) { + if len(mas) == 0 { + return nil, nil + } + arrayLen := len(mas[0]) + ret := make([]*schema.Message, arrayLen) + slicesToConcat := make([][]*schema.Message, arrayLen) + for _, ma := range mas { + if len(ma) != arrayLen { + return nil, goerrors.Errorf("mismatch streamed message batch length: got %d want %d", len(ma), arrayLen) + } + for i := 0; i < arrayLen; i++ { + if ma[i] != nil { + slicesToConcat[i] = append(slicesToConcat[i], ma[i]) + } + } + } + for i, slice := range slicesToConcat { + switch len(slice) { + case 0: + ret[i] = nil + case 1: + ret[i] = slice[0] + 
default: + cm, err := schema.ConcatMessages(slice) + if err != nil { + return nil, err + } + ret[i] = cm + } + } + return ret, nil +} diff --git a/pkg/tape/eino_handler_test.go b/pkg/tape/eino_handler_test.go new file mode 100644 index 0000000..97c8d2a --- /dev/null +++ b/pkg/tape/eino_handler_test.go @@ -0,0 +1,11 @@ +package tape + +import ( + "testing" + + "github.com/cloudwego/eino/callbacks" +) + +func TestEinoHandlerImplementsCallbackHandler(t *testing.T) { + var _ callbacks.Handler = (*EinoHandler)(nil) +} diff --git a/pkg/tape/entry.go b/pkg/tape/entry.go new file mode 100644 index 0000000..2bc5329 --- /dev/null +++ b/pkg/tape/entry.go @@ -0,0 +1,75 @@ +// Package tape implements append-only audit logs (“tape-first”) aligned with Republic’s +// tape schema (https://github.com/bubbuild/republic, src/republic/tape; concepts at https://getrepublic.org). +package tape + +import ( + "time" +) + +type Entry struct { + ID int64 `json:"id"` + Kind string `json:"kind"` + Payload map[string]any `json:"payload"` + Meta map[string]any `json:"meta,omitempty"` + Date string `json:"date"` +} + +func newEntry(kind string, payload, meta map[string]any) Entry { + if payload == nil { + payload = map[string]any{} + } + if meta == nil { + meta = map[string]any{} + } + return Entry{ + ID: 0, + Kind: kind, + Payload: payload, + Meta: meta, + Date: time.Now().UTC().Format(time.RFC3339Nano), + } +} + +func Message(msg, meta map[string]any) Entry { + p := map[string]any{} + for k, v := range msg { + p[k] = v + } + return newEntry("message", p, meta) +} + +func System(content string, meta map[string]any) Entry { + return newEntry("system", map[string]any{"content": content}, meta) +} + +func Anchor(name string, state, meta map[string]any) Entry { + p := map[string]any{"name": name} + if state != nil { + p["state"] = state + } + return newEntry("anchor", p, meta) +} + +func ToolCall(calls []map[string]any, meta map[string]any) Entry { + return newEntry("tool_call", 
map[string]any{"calls": calls}, meta) +} + +func ToolResult(results []any, meta map[string]any) Entry { + return newEntry("tool_result", map[string]any{"results": results}, meta) +} + +func ErrorPayload(message string, extra, meta map[string]any) Entry { + p := map[string]any{"message": message} + for k, v := range extra { + p[k] = v + } + return newEntry("error", p, meta) +} + +func Event(name string, data, meta map[string]any) Entry { + p := map[string]any{"name": name} + if data != nil { + p["data"] = data + } + return newEntry("event", p, meta) +} diff --git a/pkg/tape/jsonl.go b/pkg/tape/jsonl.go new file mode 100644 index 0000000..440b4a8 --- /dev/null +++ b/pkg/tape/jsonl.go @@ -0,0 +1,94 @@ +package tape + +import ( + "bufio" + "encoding/json" + "os" + "path/filepath" + "sync" +) + +const FileName = ".tape.jsonl" + +type JSONLStore struct { + path string + file *os.File + mu sync.Mutex + nextID int64 +} + +func OpenJSONL(path string) (*JSONLStore, error) { + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + return nil, err + } + nextID, err := scanMaxID(path) + if err != nil { + return nil, err + } + f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644) + if err != nil { + return nil, err + } + return &JSONLStore{path: path, file: f, nextID: nextID}, nil +} + +func scanMaxID(path string) (int64, error) { + f, err := os.Open(path) + if err != nil { + if os.IsNotExist(err) { + return 1, nil + } + return 0, err + } + defer f.Close() + + var maxID int64 + sc := bufio.NewScanner(f) + for sc.Scan() { + var row struct { + ID int64 `json:"id"` + } + if json.Unmarshal(sc.Bytes(), &row) == nil && row.ID > maxID { + maxID = row.ID + } + } + if err := sc.Err(); err != nil { + return 0, err + } + if maxID == 0 { + return 1, nil + } + return maxID + 1, nil +} + +func (s *JSONLStore) Append(e Entry) error { + s.mu.Lock() + defer s.mu.Unlock() + + e.ID = s.nextID + s.nextID++ + + enc, err := json.Marshal(e) + if err != nil { + return err + 
} + if _, err := s.file.Write(append(enc, '\n')); err != nil { + return err + } + return s.file.Sync() +} + +func (s *JSONLStore) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + if s.file == nil { + return nil + } + err := s.file.Close() + s.file = nil + return err +} + +func (s *JSONLStore) Path() string { + return s.path +} diff --git a/pkg/tape/jsonl_test.go b/pkg/tape/jsonl_test.go new file mode 100644 index 0000000..5d6f2e2 --- /dev/null +++ b/pkg/tape/jsonl_test.go @@ -0,0 +1,38 @@ +package tape + +import ( + "os" + "path/filepath" + "testing" +) + +func TestJSONLAppendSequentialID(t *testing.T) { + dir := t.TempDir() + p := filepath.Join(dir, FileName) + + s1, err := OpenJSONL(p) + if err != nil { + t.Fatal(err) + } + if err := s1.Append(Message(map[string]any{"role": "user", "content": "hi"}, map[string]any{"run_id": "a"})); err != nil { + t.Fatal(err) + } + if err := s1.Close(); err != nil { + t.Fatal(err) + } + + s2, err := OpenJSONL(p) + if err != nil { + t.Fatal(err) + } + defer s2.Close() + if err := s2.Append(System("sys", map[string]any{"run_id": "b"})); err != nil { + t.Fatal(err) + } + + data, err := os.ReadFile(p) + if err != nil { + t.Fatal(err) + } + t.Logf("%s", data) +} diff --git a/pkg/tracing/slog_eino.go b/pkg/tracing/slog_eino.go new file mode 100644 index 0000000..cbbea72 --- /dev/null +++ b/pkg/tracing/slog_eino.go @@ -0,0 +1,110 @@ +package tracing + +import ( + "context" + "fmt" + "log/slog" + + "github.com/bytedance/sonic" + "github.com/cloudwego/eino/callbacks" + "github.com/cloudwego/eino/schema" +) + +// SlogEinoHandler logs full callback payloads as JSON for side-by-side comparison with tape / Langfuse. +// Stream callbacks only log a placeholder (the StreamReader is not drained here). 
+type SlogEinoHandler struct { + Log *slog.Logger +} + +func NewSlogEinoHandler(log *slog.Logger) *SlogEinoHandler { + if log == nil { + log = slog.Default() + } + return &SlogEinoHandler{Log: log} +} + +func (h *SlogEinoHandler) Needed(context.Context, *callbacks.RunInfo, callbacks.CallbackTiming) bool { + return true +} + +func (h *SlogEinoHandler) OnStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context { + h.logPayload(ctx, "eino.callback.OnStart", info, "input", input) + return ctx +} + +func (h *SlogEinoHandler) OnEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context { + h.logPayload(ctx, "eino.callback.OnEnd", info, "output", output) + return ctx +} + +func (h *SlogEinoHandler) OnError(ctx context.Context, info *callbacks.RunInfo, err error) context.Context { + if err == nil { + return ctx + } + attrs := []any{ + slog.String("err", err.Error()), + } + if info != nil { + attrs = append(attrs, + slog.String("node", info.Name), + slog.String("component", string(info.Component)), + slog.String("graph_type", info.Type), + ) + } + h.Log.WarnContext(ctx, "eino.callback.OnError", attrs...) 
+ return ctx +} + +func (h *SlogEinoHandler) OnStartWithStreamInput(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context { + h.logPayload(ctx, "eino.callback.OnStartWithStreamInput", info, "input_stream", streamInputNote(input)) + return ctx +} + +func (h *SlogEinoHandler) OnEndWithStreamOutput(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context { + h.logPayload(ctx, "eino.callback.OnEndWithStreamOutput", info, "output_stream", streamOutputNote(output)) + return ctx +} + +func streamInputNote(s *schema.StreamReader[callbacks.CallbackInput]) any { + if s == nil { + return map[string]any{"kind": "StreamReader[CallbackInput]", "nil": true} + } + return map[string]any{ + "kind": "StreamReader[CallbackInput]", + "note": "not drained here; compare non-stream OnStart/OnEnd JSON or tape", + } +} + +func streamOutputNote(s *schema.StreamReader[callbacks.CallbackOutput]) any { + if s == nil { + return map[string]any{"kind": "StreamReader[CallbackOutput]", "nil": true} + } + return map[string]any{ + "kind": "StreamReader[CallbackOutput]", + "note": "not drained here; compare non-stream callbacks or tape", + } +} + +func (h *SlogEinoHandler) logPayload(ctx context.Context, msg string, info *callbacks.RunInfo, field string, v any) { + attrs := []any{} + if info != nil { + attrs = append(attrs, + slog.String("node", info.Name), + slog.String("component", string(info.Component)), + slog.String("graph_type", info.Type), + ) + } + attrs = append(attrs, slog.String(field, marshalCallbackPayload(v))) + h.Log.InfoContext(ctx, msg, attrs...) 
+} + +func marshalCallbackPayload(v any) string { + if v == nil { + return "null" + } + b, err := sonic.MarshalString(v) + if err != nil { + return fmt.Sprintf("%q", fmt.Sprintf("marshal callback payload: %v (type %T)", err, v)) + } + return b +} diff --git a/pkg/tracing/slog_eino_test.go b/pkg/tracing/slog_eino_test.go new file mode 100644 index 0000000..e85414e --- /dev/null +++ b/pkg/tracing/slog_eino_test.go @@ -0,0 +1,12 @@ +package tracing + +import ( + "testing" + + "github.com/cloudwego/eino/callbacks" +) + +func TestSlogEinoHandlerImplementsCallbackHandler(t *testing.T) { + t.Helper() + var _ callbacks.Handler = (*SlogEinoHandler)(nil) +} From 4e029860ef29550eefb3acec1502ca6864ba1d09 Mon Sep 17 00:00:00 2001 From: Zhiqiang ZHOU Date: Sat, 4 Apr 2026 17:57:24 -0700 Subject: [PATCH 2/2] fix: increase scanner buffer limit and track stream errors in tape (babysit) - scanMaxID: increase bufio.Scanner buffer to 10MB to handle large tool outputs that exceed the default 64KB token limit - OnEndWithStreamOutput: track stream read errors and emit run event with "error" status instead of always reporting "ok" --- pkg/tape/eino_handler.go | 8 +++++++- pkg/tape/jsonl.go | 2 ++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/tape/eino_handler.go b/pkg/tape/eino_handler.go index da28d0b..b2e408a 100644 --- a/pkg/tape/eino_handler.go +++ b/pkg/tape/eino_handler.go @@ -230,6 +230,7 @@ func (h *EinoHandler) OnEndWithStreamOutput(ctx context.Context, info *callbacks output.Close() }() var chunks []callbacks.CallbackOutput + var streamErr error for { ch, err := output.Recv() if errors.Is(err, io.EOF) { @@ -237,6 +238,7 @@ func (h *EinoHandler) OnEndWithStreamOutput(ctx context.Context, info *callbacks } if err != nil { log.Printf("tape stream output recv: %v", err) + streamErr = err break } chunks = append(chunks, ch) @@ -250,7 +252,11 @@ func (h *EinoHandler) OnEndWithStreamOutput(ctx context.Context, info *callbacks if msg != nil { h.write(Message(messageToMap(msg), h.baseMeta())) } - runData 
:= map[string]any{"status": "ok"} + status := "ok" + if streamErr != nil { + status = "error" + } + runData := map[string]any{"status": status} if usage != nil { runData["usage"] = map[string]any{ "prompt_tokens": usage.PromptTokens, diff --git a/pkg/tape/jsonl.go b/pkg/tape/jsonl.go index 440b4a8..2e8c135 100644 --- a/pkg/tape/jsonl.go +++ b/pkg/tape/jsonl.go @@ -44,6 +44,8 @@ func scanMaxID(path string) (int64, error) { var maxID int64 sc := bufio.NewScanner(f) + // Allow entries up to 10 MB to handle large tool outputs in tape lines. + sc.Buffer(make([]byte, 0, 64*1024), 10*1024*1024) for sc.Scan() { var row struct { ID int64 `json:"id"`