Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/lapp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"os"

"github.com/cloudwego/eino/callbacks"
"github.com/joho/godotenv"
"github.com/spf13/cobra"
"github.com/strrl/lapp/pkg/tape"
"github.com/strrl/lapp/pkg/tracing"
)

Expand All @@ -15,6 +17,7 @@ func main() {

flush := tracing.InitLangfuse()
otelShutdown := tracing.InitOTel(context.Background())
callbacks.AppendGlobalHandlers(tape.NewSlogHandler())

root := &cobra.Command{
Use: "lapp",
Expand Down
2 changes: 2 additions & 0 deletions cmd/lapp/workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,11 @@ func runWorkspaceAnalyze(cmd *cobra.Command, args []string) error {
return errors.Errorf("resolve workspace dir: %w", err)
}

tapePath := filepath.Join(absDir, ".tape.jsonl")
config := analyzer.Config{
Provider: analyzeWsACP,
Model: analyzeWsModel,
TapePath: tapePath,
}

prompt := analyzer.BuildWorkspaceSystemPrompt(absDir)
Expand Down
10 changes: 10 additions & 0 deletions pkg/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"github.com/cloudwego/eino-ext/adk/backend/local"
"github.com/cloudwego/eino/adk"
fsmw "github.com/cloudwego/eino/adk/middlewares/filesystem"
"github.com/cloudwego/eino/callbacks"
"github.com/go-errors/errors"
einoacp "github.com/strrl/eino-acp"
"github.com/strrl/lapp/pkg/tape"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
)
Expand Down Expand Up @@ -45,6 +47,8 @@ Be concise and actionable. Focus on what matters.`,
type Config struct {
Provider string
Model string
// TapePath, when set, enables tape recording to this JSONL file.
TapePath string
}

// BuildWorkspaceSystemPrompt builds a system prompt for the structured workspace layout.
Expand Down Expand Up @@ -103,6 +107,12 @@ func RunAgentWithPrompt(ctx context.Context, config Config, workDir, question, s
attribute.String("model", config.Model),
)

if config.TapePath != "" {
jsonlStore := tape.NewJSONLStore(config.TapePath)
callbacks.AppendGlobalHandlers(tape.NewHandler(jsonlStore))
slog.Info("Tape recording enabled", "path", config.TapePath)
Comment on lines +110 to +113
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid appending tape callbacks to global handler state

RunAgentWithPrompt installs a new tape callback via callbacks.AppendGlobalHandlers on every invocation, but global handlers are process-wide and never removed. If this function is called more than once in a process (e.g., repeated analyzer runs in tests or embedded usage), later runs will invoke all previously registered tape handlers, causing duplicated events and writes to stale tape files from earlier sessions.

Useful? React with 👍 / 👎.

}

slog.Info("Analyzing with ACP provider", "provider", provider, "model", config.Model)

chatModel, err := einoacp.NewChatModel(ctx, &einoacp.Config{
Expand Down
76 changes: 76 additions & 0 deletions pkg/tape/entry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package tape

import "time"

// Entry is a single append-only entry in a tape, modeled after republic's TapeEntry.
type Entry struct {
ID int `json:"id"`
Kind string `json:"kind"`
Payload map[string]any `json:"payload"`
Meta map[string]any `json:"meta,omitempty"`
Date string `json:"date"`
}

// Entry kinds.
const (
KindMessage = "message"
KindSystem = "system"
KindToolCall = "tool_call"
KindToolResult = "tool_result"
KindError = "error"
KindEvent = "event"
)

func newEntry(kind string, payload, meta map[string]any) Entry {
return Entry{
Kind: kind,
Payload: payload,
Meta: meta,
Date: time.Now().UTC().Format(time.RFC3339Nano),
}
}

// MessageEntry creates a message entry.
func MessageEntry(role, content string, meta map[string]any) Entry {
return newEntry(KindMessage, map[string]any{
"role": role,
"content": content,
}, meta)
}

// SystemEntry creates a system prompt entry.
func SystemEntry(content string, meta map[string]any) Entry {
return newEntry(KindSystem, map[string]any{
"content": content,
}, meta)
}

// ToolCallEntry creates a tool call entry.
func ToolCallEntry(calls []map[string]any, meta map[string]any) Entry {
return newEntry(KindToolCall, map[string]any{
"calls": calls,
}, meta)
}

// ToolResultEntry creates a tool result entry.
func ToolResultEntry(results []any, meta map[string]any) Entry {
return newEntry(KindToolResult, map[string]any{
"results": results,
}, meta)
}

// ErrorEntry creates an error entry.
func ErrorEntry(kind, message string, meta map[string]any) Entry {
return newEntry(KindError, map[string]any{
"kind": kind,
"message": message,
}, meta)
}

// EventEntry creates a generic event entry.
func EventEntry(name string, data, meta map[string]any) Entry {
return newEntry(KindEvent, map[string]any{
"name": name,
"data": data,
}, meta)
}
220 changes: 220 additions & 0 deletions pkg/tape/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
package tape

import (
"context"
"log/slog"

"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/components"
"github.com/cloudwego/eino/components/model"
"github.com/cloudwego/eino/schema"
)

// NewHandler creates an eino callbacks.Handler that records tape entries to the given store.
func NewHandler(store Recorder) callbacks.Handler {
return callbacks.NewHandlerBuilder().
OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
onStart(store, info, input)
return ctx
}).
OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
onEnd(store, info, output)
return ctx
}).
OnErrorFn(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
onError(store, info, err)
return ctx
}).
OnEndWithStreamOutputFn(func(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
onEndWithStream(store, info, output)
return ctx
}).
Build()
}

func componentMeta(info *callbacks.RunInfo) map[string]any {
return map[string]any{
"component": string(info.Component),
"name": info.Name,
"type": info.Type,
}
}

func onStart(store Recorder, info *callbacks.RunInfo, input callbacks.CallbackInput) {
if info == nil {
return
}
meta := componentMeta(info)

if info.Component == components.ComponentOfChatModel {
modelInput := model.ConvCallbackInput(input)
if modelInput != nil {
recordModelInput(store, modelInput, meta)
return
}
}

if info.Component == components.ComponentOfTool {
entry := EventEntry("tool_start", map[string]any{
"component": string(info.Component),
"name": info.Name,
}, meta)
appendSafe(store, entry)
return
}

entry := EventEntry("start", map[string]any{
"component": string(info.Component),
"name": info.Name,
}, meta)
appendSafe(store, entry)
}

func onEnd(store Recorder, info *callbacks.RunInfo, output callbacks.CallbackOutput) {
if info == nil {
return
}
meta := componentMeta(info)

if info.Component == components.ComponentOfChatModel {
modelOutput := model.ConvCallbackOutput(output)
if modelOutput != nil {
recordModelOutput(store, modelOutput, meta)
return
}
}

if info.Component == components.ComponentOfTool {
recordToolResult(store, output, meta)
return
}

entry := EventEntry("end", map[string]any{
"component": string(info.Component),
"name": info.Name,
}, meta)
appendSafe(store, entry)
}

func onError(store Recorder, info *callbacks.RunInfo, err error) {
if info == nil {
return
}
meta := componentMeta(info)
entry := ErrorEntry(string(info.Component), err.Error(), meta)
appendSafe(store, entry)
}

func onEndWithStream(store Recorder, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) {
if info == nil {
return
}
Comment on lines +109 to +111
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Close stream copies even when RunInfo is missing

This early return skips closing the output stream copy when info is nil, but OnEndWithStreamOutput handlers are required to close their stream copies to let the original stream fully release resources. In standalone or misconfigured callback contexts where RunInfo can be absent, this can leak goroutines and keep stream cleanup from completing (the same pattern appears in pkg/tape/slog_handler.go).

Useful? React with 👍 / 👎.

meta := componentMeta(info)

// Consume the stream in a goroutine to avoid blocking
go func() {
defer func() {
if r := recover(); r != nil {
slog.Warn("tape: panic consuming stream", "recover", r)
}
}()

var lastOutput callbacks.CallbackOutput
for {
chunk, err := output.Recv()
if err != nil {
break
}
lastOutput = chunk
}
Comment on lines +122 to +129
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Aggregate streamed model chunks before writing tape

onEndWithStream overwrites lastOutput for every Recv() and only records the final chunk, so streamed chat responses lose earlier chunks (including incremental content/tool-call arguments) and the resulting tape can be incomplete or wrong for multi-chunk generations. Because Eino streaming callbacks deliver per-chunk outputs, this should either append each chunk or merge chunks before calling recordModelOutput.

Useful? React with 👍 / 👎.

output.Close()

if lastOutput == nil {
return
}

if info.Component == components.ComponentOfChatModel {
modelOutput := model.ConvCallbackOutput(lastOutput)
if modelOutput != nil {
recordModelOutput(store, modelOutput, meta)
return
}
}

entry := EventEntry("end", map[string]any{
"component": string(info.Component),
"name": info.Name,
}, meta)
appendSafe(store, entry)
}()
}

func recordModelInput(store Recorder, input *model.CallbackInput, meta map[string]any) {
// Record system message separately if present
for _, msg := range input.Messages {
if msg.Role == schema.System {
appendSafe(store, SystemEntry(msg.Content, meta))
continue
}
appendSafe(store, MessageEntry(string(msg.Role), msg.Content, meta))
}

// Record tool calls from messages
if len(input.Tools) > 0 {
var tools []map[string]any
for _, t := range input.Tools {
tools = append(tools, map[string]any{
"name": t.Name,
"desc": t.Desc,
})
}
appendSafe(store, EventEntry("tools_available", map[string]any{
"tools": tools,
}, meta))
}
}

func recordModelOutput(store Recorder, output *model.CallbackOutput, meta map[string]any) {
if output.TokenUsage != nil {
meta["token_usage"] = map[string]any{
"prompt_tokens": output.TokenUsage.PromptTokens,
"completion_tokens": output.TokenUsage.CompletionTokens,
"total_tokens": output.TokenUsage.TotalTokens,
}
}

if output.Message != nil {
// Record tool calls if present
if len(output.Message.ToolCalls) > 0 {
var calls []map[string]any
for _, tc := range output.Message.ToolCalls {
calls = append(calls, map[string]any{
"id": tc.ID,
"function": tc.Function.Name,
"args": tc.Function.Arguments,
})
}
appendSafe(store, ToolCallEntry(calls, meta))
}

// Record assistant message
if output.Message.Content != "" {
appendSafe(store, MessageEntry(string(output.Message.Role), output.Message.Content, meta))
}
}
}

func recordToolResult(store Recorder, output callbacks.CallbackOutput, meta map[string]any) {
// Tool output is typically a string
if s, ok := output.(string); ok {
appendSafe(store, ToolResultEntry([]any{s}, meta))
return
}
appendSafe(store, ToolResultEntry([]any{output}, meta))
}

func appendSafe(store Recorder, entry Entry) {
if err := store.Append(entry); err != nil {
slog.Warn("tape: failed to append entry", "err", err, "kind", entry.Kind)
}
}
Loading
Loading