diff --git a/cmd/lapp/main.go b/cmd/lapp/main.go index 4ad3ebc..d3e310e 100644 --- a/cmd/lapp/main.go +++ b/cmd/lapp/main.go @@ -4,8 +4,10 @@ import ( "context" "os" + "github.com/cloudwego/eino/callbacks" "github.com/joho/godotenv" "github.com/spf13/cobra" + "github.com/strrl/lapp/pkg/tape" "github.com/strrl/lapp/pkg/tracing" ) @@ -15,6 +17,7 @@ func main() { flush := tracing.InitLangfuse() otelShutdown := tracing.InitOTel(context.Background()) + callbacks.AppendGlobalHandlers(tape.NewSlogHandler()) root := &cobra.Command{ Use: "lapp", diff --git a/cmd/lapp/workspace.go b/cmd/lapp/workspace.go index a7dee28..ac03d4c 100644 --- a/cmd/lapp/workspace.go +++ b/cmd/lapp/workspace.go @@ -392,9 +392,11 @@ func runWorkspaceAnalyze(cmd *cobra.Command, args []string) error { return errors.Errorf("resolve workspace dir: %w", err) } + tapePath := filepath.Join(absDir, ".tape.jsonl") config := analyzer.Config{ Provider: analyzeWsACP, Model: analyzeWsModel, + TapePath: tapePath, } prompt := analyzer.BuildWorkspaceSystemPrompt(absDir) diff --git a/pkg/analyzer/analyzer.go b/pkg/analyzer/analyzer.go index 35a8635..0621cfe 100644 --- a/pkg/analyzer/analyzer.go +++ b/pkg/analyzer/analyzer.go @@ -10,8 +10,10 @@ import ( "github.com/cloudwego/eino-ext/adk/backend/local" "github.com/cloudwego/eino/adk" fsmw "github.com/cloudwego/eino/adk/middlewares/filesystem" + "github.com/cloudwego/eino/callbacks" "github.com/go-errors/errors" einoacp "github.com/strrl/eino-acp" + "github.com/strrl/lapp/pkg/tape" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" ) @@ -45,6 +47,8 @@ Be concise and actionable. Focus on what matters.`, type Config struct { Provider string Model string + // TapePath, when set, enables tape recording to this JSONL file. + TapePath string } // BuildWorkspaceSystemPrompt builds a system prompt for the structured workspace layout. 
@@ -103,6 +107,15 @@ func RunAgentWithPrompt(ctx context.Context, config Config, workDir, question, s
 		attribute.String("model", config.Model),
 	)
 
+	// NOTE(review): AppendGlobalHandlers changes process-wide state. If this
+	// function can run more than once per process, each call adds another tape
+	// handler and later runs would be recorded several times — confirm.
+	if config.TapePath != "" {
+		jsonlStore := tape.NewJSONLStore(config.TapePath)
+		callbacks.AppendGlobalHandlers(tape.NewHandler(jsonlStore))
+		slog.Info("Tape recording enabled", "path", config.TapePath)
+	}
+
 	slog.Info("Analyzing with ACP provider", "provider", provider, "model", config.Model)
 
 	chatModel, err := einoacp.NewChatModel(ctx, &einoacp.Config{
diff --git a/pkg/tape/entry.go b/pkg/tape/entry.go
new file mode 100644
index 0000000..7271b8d
--- /dev/null
+++ b/pkg/tape/entry.go
@@ -0,0 +1,75 @@
+package tape
+
+import "time"
+
+// Entry is one record of a tape. Entries are only ever appended; the shape is
+// modeled after republic's TapeEntry.
+type Entry struct {
+	ID      int            `json:"id"`
+	Kind    string         `json:"kind"`
+	Payload map[string]any `json:"payload"`
+	Meta    map[string]any `json:"meta,omitempty"`
+	Date    string         `json:"date"`
+}
+
+// Values used for Entry.Kind.
+const (
+	KindMessage    = "message"
+	KindSystem     = "system"
+	KindToolCall   = "tool_call"
+	KindToolResult = "tool_result"
+	KindError      = "error"
+	KindEvent      = "event"
+)
+
+// newEntry builds an Entry of the given kind and stamps it with the current
+// UTC time formatted as RFC 3339 with nanoseconds. The ID is left at zero.
+func newEntry(kind string, payload, meta map[string]any) Entry {
+	var e Entry
+	e.Kind = kind
+	e.Payload = payload
+	e.Meta = meta
+	e.Date = time.Now().UTC().Format(time.RFC3339Nano)
+	return e
+}
+
+// MessageEntry creates a message entry holding a role and its content.
+func MessageEntry(role, content string, meta map[string]any) Entry {
+	payload := map[string]any{"role": role, "content": content}
+	return newEntry(KindMessage, payload, meta)
+}
+
+// SystemEntry creates an entry holding a system prompt.
+func SystemEntry(content string, meta map[string]any) Entry {
+	payload := map[string]any{"content": content}
+	return newEntry(KindSystem, payload, meta)
+}
+
+// ToolCallEntry creates an entry holding a list of tool calls.
+func ToolCallEntry(calls []map[string]any, meta map[string]any) Entry {
+	payload := map[string]any{"calls": calls}
+	return newEntry(KindToolCall, payload, meta)
+}
+
+// ToolResultEntry creates a tool result entry.
+func ToolResultEntry(results []any, meta map[string]any) Entry {
+	return newEntry(KindToolResult, map[string]any{
+		"results": results,
+	}, meta)
+}
+
+// ErrorEntry creates an error entry.
+func ErrorEntry(kind, message string, meta map[string]any) Entry {
+	return newEntry(KindError, map[string]any{
+		"kind":    kind,
+		"message": message,
+	}, meta)
+}
+
+// EventEntry creates a generic event entry.
+func EventEntry(name string, data, meta map[string]any) Entry {
+	return newEntry(KindEvent, map[string]any{
+		"name": name,
+		"data": data,
+	}, meta)
+}
diff --git a/pkg/tape/handler.go b/pkg/tape/handler.go
new file mode 100644
index 0000000..7b8158b
--- /dev/null
+++ b/pkg/tape/handler.go
@@ -0,0 +1,272 @@
+package tape
+
+import (
+	"context"
+	"errors"
+	"io"
+	"log/slog"
+
+	"github.com/cloudwego/eino/callbacks"
+	"github.com/cloudwego/eino/components"
+	"github.com/cloudwego/eino/components/model"
+	"github.com/cloudwego/eino/schema"
+)
+
+// NewHandler creates an eino callbacks.Handler that records tape entries to the given store.
+// Streamed outputs are drained on their own goroutine, so their entries may be
+// appended after the callback has returned.
+func NewHandler(store Recorder) callbacks.Handler {
+	return callbacks.NewHandlerBuilder().
+		OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
+			onStart(store, info, input)
+			return ctx
+		}).
+		OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
+			onEnd(store, info, output)
+			return ctx
+		}).
+		OnErrorFn(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
+			onError(store, info, err)
+			return ctx
+		}).
+		OnEndWithStreamOutputFn(func(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
+			onEndWithStream(store, info, output)
+			return ctx
+		}).
+		Build()
+}
+
+// componentMeta returns the metadata attached to every entry recorded for info.
+func componentMeta(info *callbacks.RunInfo) map[string]any {
+	return map[string]any{
+		"component": string(info.Component),
+		"name":      info.Name,
+		"type":      info.Type,
+	}
+}
+
+// lifecycleEvent builds the generic event (such as "start" or "end") recorded
+// for a component that has no dedicated entry kind.
+func lifecycleEvent(name string, info *callbacks.RunInfo, meta map[string]any) Entry {
+	return EventEntry(name, map[string]any{
+		"component": string(info.Component),
+		"name":      info.Name,
+	}, meta)
+}
+
+// onStart records the input of a component run: the prompt messages for a chat
+// model, a "tool_start" event for a tool, and a "start" event otherwise.
+func onStart(store Recorder, info *callbacks.RunInfo, input callbacks.CallbackInput) {
+	if info == nil {
+		return
+	}
+	meta := componentMeta(info)
+
+	switch info.Component {
+	case components.ComponentOfChatModel:
+		if modelInput := model.ConvCallbackInput(input); modelInput != nil {
+			recordModelInput(store, modelInput, meta)
+			return
+		}
+	case components.ComponentOfTool:
+		appendSafe(store, lifecycleEvent("tool_start", info, meta))
+		return
+	}
+	appendSafe(store, lifecycleEvent("start", info, meta))
+}
+
+// onEnd records the output of a component run: the reply for a chat model, the
+// result for a tool, and an "end" event otherwise.
+func onEnd(store Recorder, info *callbacks.RunInfo, output callbacks.CallbackOutput) {
+	if info == nil {
+		return
+	}
+	meta := componentMeta(info)
+
+	switch info.Component {
+	case components.ComponentOfChatModel:
+		if modelOutput := model.ConvCallbackOutput(output); modelOutput != nil {
+			recordModelOutput(store, modelOutput, meta)
+			return
+		}
+	case components.ComponentOfTool:
+		recordToolResult(store, output, meta)
+		return
+	}
+	appendSafe(store, lifecycleEvent("end", info, meta))
+}
+
+// onError records a failed run as an error entry whose kind is the component.
+func onError(store Recorder, info *callbacks.RunInfo, err error) {
+	if info == nil {
+		return
+	}
+	var message string
+	if err != nil { // calling Error on a nil error would panic
+		message = err.Error()
+	}
+	appendSafe(store, ErrorEntry(string(info.Component), message, componentMeta(info)))
+}
+
+// onEndWithStream records the outcome of a streamed run. The stream is drained
+// on a new goroutine so the callback returns at once; chat-model chunks are
+// merged so the whole reply is recorded rather than only its last chunk.
+func onEndWithStream(store Recorder, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) {
+	if info == nil {
+		if output != nil {
+			output.Close() // nothing to record, but the stream copy still has to be closed
+		}
+		return
+	}
+	meta := componentMeta(info)
+
+	go func() {
+		defer func() {
+			if r := recover(); r != nil {
+				slog.Warn("tape: panic consuming stream", "recover", r)
+			}
+		}()
+
+		final := drainStream(info, output)
+		if final == nil {
+			return
+		}
+		if info.Component == components.ComponentOfChatModel {
+			if modelOutput := model.ConvCallbackOutput(final); modelOutput != nil {
+				recordModelOutput(store, modelOutput, meta)
+				return
+			}
+		}
+		appendSafe(store, lifecycleEvent("end", info, meta))
+	}()
+}
+
+// drainStream reads output to its end, closes it, and returns a single value
+// that stands for the whole stream, or nil if no chunk was received.
+//
+// Chat-model chunks are merged: their messages are joined with
+// schema.ConcatMessages and the most recent token usage is kept. For any other
+// component, or if merging fails, the last chunk is returned as received.
+func drainStream(info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) callbacks.CallbackOutput {
+	// Deferred so the stream is released even if handling a chunk panics.
+	defer output.Close()
+
+	var (
+		last  callbacks.CallbackOutput
+		msgs  []*schema.Message
+		usage *model.TokenUsage
+	)
+	isChatModel := info.Component == components.ComponentOfChatModel
+	for {
+		chunk, err := output.Recv()
+		if err != nil {
+			if !errors.Is(err, io.EOF) {
+				slog.Warn("tape: stream ended with error",
+					"component", string(info.Component), "name", info.Name, "err", err)
+			}
+			break
+		}
+		last = chunk
+		if !isChatModel {
+			continue
+		}
+		if part := model.ConvCallbackOutput(chunk); part != nil {
+			if part.Message != nil {
+				msgs = append(msgs, part.Message)
+			}
+			if part.TokenUsage != nil {
+				usage = part.TokenUsage
+			}
+		}
+	}
+
+	if len(msgs) == 0 {
+		return last
+	}
+	merged, err := schema.ConcatMessages(msgs)
+	if err != nil {
+		slog.Warn("tape: could not merge stream chunks, keeping the last one", "err", err)
+		return last
+	}
+	return &model.CallbackOutput{Message: merged, TokenUsage: usage}
+}
+
+// recordModelInput records every prompt message (system prompts as system
+// entries, all others as message entries) followed by one "tools_available"
+// event listing the tools offered to the model, if any.
+func recordModelInput(store Recorder, input *model.CallbackInput, meta map[string]any) {
+	for _, msg := range input.Messages {
+		if msg == nil {
+			continue
+		}
+		if msg.Role == schema.System {
+			appendSafe(store, SystemEntry(msg.Content, meta))
+			continue
+		}
+		appendSafe(store, MessageEntry(string(msg.Role), msg.Content, meta))
+	}
+
+	if len(input.Tools) == 0 {
+		return
+	}
+	tools := make([]map[string]any, 0, len(input.Tools))
+	for _, t := range input.Tools {
+		if t == nil {
+			continue
+		}
+		tools = append(tools, map[string]any{
+			"name": t.Name,
+			"desc": t.Desc,
+		})
+	}
+	appendSafe(store, EventEntry("tools_available", map[string]any{
+		"tools": tools,
+	}, meta))
+}
+
+// recordModelOutput records a model reply: a tool_call entry when the reply
+// requests tool calls and a message entry when it has text content. Token
+// usage, when reported, is added to meta (the caller's map is modified).
+func recordModelOutput(store Recorder, output *model.CallbackOutput, meta map[string]any) {
+	if output.TokenUsage != nil {
+		meta["token_usage"] = map[string]any{
+			"prompt_tokens":     output.TokenUsage.PromptTokens,
+			"completion_tokens": output.TokenUsage.CompletionTokens,
+			"total_tokens":      output.TokenUsage.TotalTokens,
+		}
+	}
+
+	msg := output.Message
+	if msg == nil {
+		return
+	}
+	if len(msg.ToolCalls) > 0 {
+		calls := make([]map[string]any, 0, len(msg.ToolCalls))
+		for _, tc := range msg.ToolCalls {
+			calls = append(calls, map[string]any{
+				"id":       tc.ID,
+				"function": tc.Function.Name,
+				"args":     tc.Function.Arguments,
+			})
+		}
+		appendSafe(store, ToolCallEntry(calls, meta))
+	}
+	if msg.Content != "" {
+		appendSafe(store, MessageEntry(string(msg.Role), msg.Content, meta))
+	}
+}
+
+// recordToolResult records a tool's output as a single-element tool_result entry.
+func recordToolResult(store Recorder, output callbacks.CallbackOutput, meta map[string]any) {
+	// The output is stored as received; a string needs no special handling
+	// because it is boxed into the []any element either way.
+	appendSafe(store, ToolResultEntry([]any{output}, meta))
+}
+
+// appendSafe appends entry to store and logs, rather than returns, any failure
+// so that recording problems never interrupt the run being recorded.
+func appendSafe(store Recorder, entry Entry) {
+	if err := store.Append(entry); err != nil {
+		slog.Warn("tape: failed to append entry", "err", err, "kind", entry.Kind)
+	}
+}
diff --git a/pkg/tape/slog_handler.go b/pkg/tape/slog_handler.go
new file mode 100644
index 0000000..ddda737
--- /dev/null
+++ b/pkg/tape/slog_handler.go
@@ -0,0 +1,159 @@
+package tape
+
+import (
+	"context"
+	"encoding/json"
+	"log/slog"
+	"unicode/utf8"
+
+	"github.com/cloudwego/eino/callbacks"
+	"github.com/cloudwego/eino/components"
+	"github.com/cloudwego/eino/components/model"
+	"github.com/cloudwego/eino/schema"
+)
+
+// NewSlogHandler creates an eino callbacks.Handler that logs all callback
+// events directly via slog. Register it with callbacks.AppendGlobalHandlers.
+func NewSlogHandler() callbacks.Handler {
+	return callbacks.NewHandlerBuilder().
+		OnStartFn(slogOnStart).
+		OnEndFn(slogOnEnd).
+		OnErrorFn(slogOnError).
+		OnEndWithStreamOutputFn(slogOnEndWithStream).
+		Build()
+}
+
+// slogOnStart logs one "tape.start" record per prompt message for a chat model
+// and a single record with the JSON-formatted input for any other component.
+func slogOnStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
+	if info == nil {
+		return ctx
+	}
+	if info.Component == components.ComponentOfChatModel {
+		if mi := model.ConvCallbackInput(input); mi != nil {
+			for _, msg := range mi.Messages {
+				slog.Info("tape.start",
+					"component", string(info.Component),
+					"name", info.Name,
+					"role", string(msg.Role),
+					"content", truncate(msg.Content, 200),
+					"tool_calls", len(msg.ToolCalls),
+				)
+			}
+			return ctx
+		}
+	}
+
+	slog.Info("tape.start",
+		"component", string(info.Component),
+		"name", info.Name,
+		"type", info.Type,
+		"input", formatAny(input),
+	)
+	return ctx
+}
+
+// slogOnEnd logs a "tape.end" record: role, truncated content, tool-call count
+// and token usage for a chat model, the JSON-formatted output otherwise.
+func slogOnEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
+	if info == nil {
+		return ctx
+	}
+	if info.Component == components.ComponentOfChatModel {
+		if mo := model.ConvCallbackOutput(output); mo != nil {
+			attrs := []any{
+				"component", string(info.Component),
+				"name", info.Name,
+			}
+			if mo.Message != nil {
+				attrs = append(attrs,
+					"role", string(mo.Message.Role),
+					"content", truncate(mo.Message.Content, 200),
+					"tool_calls", len(mo.Message.ToolCalls),
+				)
+			}
+			if mo.TokenUsage != nil {
+				attrs = append(attrs,
+					"prompt_tokens", mo.TokenUsage.PromptTokens,
+					"completion_tokens", mo.TokenUsage.CompletionTokens,
+					"total_tokens", mo.TokenUsage.TotalTokens,
+				)
+			}
+			slog.Info("tape.end", attrs...)
+			return ctx
+		}
+	}
+
+	slog.Info("tape.end",
+		"component", string(info.Component),
+		"name", info.Name,
+		"type", info.Type,
+		"output", formatAny(output),
+	)
+	return ctx
+}
+
+// slogOnError logs a failed component run at error level.
+func slogOnError(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
+	if info == nil {
+		return ctx
+	}
+	slog.Error("tape.error",
+		"component", string(info.Component),
+		"name", info.Name,
+		"err", err,
+	)
+	return ctx
+}
+
+// slogOnEndWithStream drains a streamed output on a new goroutine and logs the
+// merged result through slogOnEnd. Nothing is logged for an empty stream.
+func slogOnEndWithStream(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
+	if info == nil {
+		if output != nil {
+			output.Close() // nothing to log, but the stream copy still has to be closed
+		}
+		return ctx
+	}
+	go func() {
+		defer func() {
+			if r := recover(); r != nil {
+				slog.Warn("tape.stream: panic", "recover", r)
+			}
+		}()
+
+		if final := drainStream(info, output); final != nil {
+			slogOnEnd(context.Background(), info, final)
+		}
+	}()
+	return ctx
+}
+
+// truncate shortens s to at most limit bytes and appends "..." when anything
+// was cut. The cut is moved back to the start of a UTF-8 sequence so that a
+// multi-byte character is never split. A negative limit is treated as zero.
+func truncate(s string, limit int) string {
+	if limit < 0 {
+		limit = 0
+	}
+	if len(s) <= limit {
+		return s
+	}
+	for limit > 0 && !utf8.RuneStart(s[limit]) {
+		limit--
+	}
+	return s[:limit] + "..."
+}
+
+// formatAny renders v as JSON truncated to 300 bytes. It returns "" for nil
+// and for values that cannot be marshaled.
+func formatAny(v any) string {
+	if v == nil {
+		return ""
+	}
+	b, err := json.Marshal(v)
+	if err != nil {
+		return ""
+	}
+	return truncate(string(b), 300)
+}
diff --git a/pkg/tape/store.go b/pkg/tape/store.go
new file mode 100644
index 0000000..18f52ce
--- /dev/null
+++ b/pkg/tape/store.go
@@ -0,0 +1,83 @@
+package tape
+
+import (
+	"bufio"
+	"encoding/json"
+	"os"
+	"sync"
+)
+
+// Recorder is the interface for appending tape entries.
+type Recorder interface {
+	Append(entry Entry) error
+}
+
+var _ Recorder = (*JSONLStore)(nil)
+
+// JSONLStore is an append-only tape store that writes entries as JSONL to a file.
+type JSONLStore struct {
+	mu     sync.Mutex
+	path   string
+	nextID int
+}
+
+// NewJSONLStore creates a new JSONL store writing to the given file path.
+// It scans any existing file to resume IDs after the current maximum.
+func NewJSONLStore(path string) *JSONLStore {
+	return &JSONLStore{path: path, nextID: scanMaxID(path)}
+}
+
+// scanMaxID reads an existing JSONL file and returns the next ID to use.
+// Returns 1 if the file does not exist or contains no entries. It reads with
+// ReadBytes rather than bufio.Scanner because a Scanner stops at the first line
+// longer than its buffer, which would hide later entries and cause IDs that
+// are already taken to be handed out again.
+func scanMaxID(path string) int {
+	f, err := os.Open(path)
+	if err != nil {
+		return 1
+	}
+	defer f.Close()
+
+	maxID := 0
+	r := bufio.NewReaderSize(f, 64*1024)
+	for {
+		line, readErr := r.ReadBytes('\n')
+		var row struct {
+			ID int `json:"id"`
+		}
+		if len(line) > 0 && json.Unmarshal(line, &row) == nil && row.ID > maxID {
+			maxID = row.ID
+		}
+		if readErr != nil { // io.EOF or a read failure; either way reading is over
+			return maxID + 1
+		}
+	}
+}
+
+// Append adds an entry to the tape, assigns it an ID, and writes it as a JSON line.
+func (s *JSONLStore) Append(entry Entry) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	entry.ID = s.nextID
+	s.nextID++
+
+	line, err := json.Marshal(entry)
+	if err != nil {
+		return err
+	}
+	line = append(line, '\n')
+
+	//nolint:gosec // tape file is not sensitive; path is controlled by the application
+	f, err := os.OpenFile(s.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
+	if err != nil {
+		return err
+	}
+	_, err = f.Write(line)
+	// A failed Close can mean the line never reached disk, so report it too.
+	if cerr := f.Close(); err == nil {
+		err = cerr
+	}
+	return err
+}
diff --git a/pkg/tape/store_test.go b/pkg/tape/store_test.go
new file mode 100644
index 0000000..dd9246a
--- /dev/null
+++ b/pkg/tape/store_test.go
@@ -0,0 +1,85 @@
+package tape
+
+import (
+	"bufio"
+	"encoding/json"
+	"os"
+	"path/filepath"
+	"testing"
+)
+
+func TestJSONLStoreAppend(t *testing.T) {
+	dir := t.TempDir()
+	path := filepath.Join(dir, ".tape.jsonl")
+
+	store := NewJSONLStore(path)
+
+	if err := store.Append(MessageEntry("user", "hello", nil)); err != nil {
+		t.Fatalf("append message: %v", err)
+	}
+	if err := store.Append(MessageEntry("assistant", "hi there", map[string]any{"model": "test"})); err != nil {
+		t.Fatalf("append response: %v", err)
+	}
+	if err := store.Append(ErrorEntry("test", "something failed", nil)); err != nil {
+		t.Fatalf("append error: %v", err)
+	}
+
+	f, err := os.Open(path)
+	if err != nil {
+		t.Fatalf("open file: %v", err)
+	}
+	defer f.Close()
+
+	var entries []Entry
+	scanner := bufio.NewScanner(f)
+	for scanner.Scan() {
+		var e Entry
+		if err := json.Unmarshal(scanner.Bytes(), &e); err != nil {
+			t.Fatalf("unmarshal: %v", err)
+		}
+		entries = append(entries, e)
+	}
+	if err := scanner.Err(); err != nil {
+		t.Fatalf("scan: %v", err)
+	}
+
+	if len(entries) != 3 {
+		t.Fatalf("expected 3 entries, got %d", len(entries))
+	}
+
+	if entries[0].ID != 1 || entries[0].Kind != KindMessage {
+		t.Errorf("entry 0: id=%d kind=%s", entries[0].ID, entries[0].Kind)
+	}
+	if entries[1].ID != 2 || entries[1].Payload["role"] != "assistant" {
+		t.Errorf("entry 1: id=%d role=%v", entries[1].ID, entries[1].Payload["role"])
+	}
+	if entries[2].ID != 3 || entries[2].Kind != KindError {
+		t.Errorf("entry 2: id=%d kind=%s", entries[2].ID, entries[2].Kind)
+	}
+}
+
+// TestJSONLStoreResumesIDs checks that a store opened on an existing tape
+// continues after the highest ID in the file, that lines which are not valid
+// JSON are skipped, and that a last line without a newline is still read.
+func TestJSONLStoreResumesIDs(t *testing.T) {
+	dir := t.TempDir()
+
+	path := filepath.Join(dir, ".tape.jsonl")
+	if err := os.WriteFile(path, []byte("{\"id\":7}\nnot json\n{\"id\":9}\n"), 0o644); err != nil {
+		t.Fatalf("seed tape: %v", err)
+	}
+	if err := NewJSONLStore(path).Append(EventEntry("resume", nil, nil)); err != nil {
+		t.Fatalf("append: %v", err)
+	}
+	if got := scanMaxID(path); got != 11 {
+		t.Errorf("next ID after resumed append = %d, want 11", got)
+	}
+
+	bare := filepath.Join(dir, "bare.jsonl")
+	if err := os.WriteFile(bare, []byte("{\"id\":4}"), 0o644); err != nil {
+		t.Fatalf("seed bare tape: %v", err)
+	}
+	if got := scanMaxID(bare); got != 5 {
+		t.Errorf("next ID for file without trailing newline = %d, want 5", got)
+	}
+}