diff --git a/cmd/ax/dashboard.go b/cmd/ax/dashboard.go index 1cdda7f..52b4276 100644 --- a/cmd/ax/dashboard.go +++ b/cmd/ax/dashboard.go @@ -17,18 +17,28 @@ package main import ( "context" "database/sql" + _ "embed" "encoding/json" "fmt" "log/slog" "net" "net/http" "os" + "os/exec" + "path/filepath" + "runtime" "time" + "github.com/google/ax/cmd/ax/internal/cliutil" + "github.com/google/ax/internal/controller/executor" + "github.com/google/ax/proto" "github.com/spf13/cobra" _ "modernc.org/sqlite" ) +//go:embed web/index.html +var dashboardHTML string + var ( dashboardAddr string dashboardConfigFile string @@ -65,6 +75,11 @@ func runDashboard(cmd *cobra.Command, args []string) error { dbPath := cfg.EventLog.SQLiteConfig.Filename slog.InfoContext(ctx, "Opening event log database", slog.String("path", dbPath)) + + if err := os.MkdirAll(filepath.Dir(dbPath), 0755); err != nil { + return fmt.Errorf("failed to create database directory: %w", err) + } + db, err := sql.Open("sqlite", dbPath) if err != nil { return fmt.Errorf("failed to open sqlite database: %w", err) @@ -76,6 +91,30 @@ func runDashboard(cmd *cobra.Command, args []string) error { return fmt.Errorf("failed to ping database: %w", err) } + // Create tables if they don't exist (to avoid crashes on fresh setup) + if _, err := db.Exec(` + CREATE TABLE IF NOT EXISTS conversation_log ( + conversation_id TEXT NOT NULL, + seq INTEGER NOT NULL, + payload TEXT NOT NULL, + PRIMARY KEY (conversation_id, seq) + )`); err != nil { + return fmt.Errorf("failed to initialize conversation_log table: %w", err) + } + + if _, err := db.Exec(` + CREATE TABLE IF NOT EXISTS execution_log ( + exec_id TEXT NOT NULL, + payload TEXT NOT NULL, + timestamp DATETIME NOT NULL + )`); err != nil { + return fmt.Errorf("failed to initialize execution_log table: %w", err) + } + + if _, err := db.Exec(`CREATE INDEX IF NOT EXISTS idx_execution_log_exec_id ON execution_log(exec_id)`); err != nil { + return fmt.Errorf("failed to create index on execution_log: %w", err) + } + // Setup API handlers mux := http.NewServeMux() mux.HandleFunc("/api/conversations", func(w http.ResponseWriter, r *http.Request) { @@ -97,13 +136,52 @@ func runDashboard(cmd *cobra.Command, args []string) error { } }) + mux.HandleFunc("/api/trace", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + convID := r.URL.Query().Get("conversation") + if convID == "" { + http.Error(w, "Missing conversation ID", http.StatusBadRequest) + return + } + + data, err := loadTraceData(r.Context(), cfg, convID) + if err != nil { + slog.ErrorContext(r.Context(), "Failed to load trace data", slog.String("conversation_id", convID), slog.Any("error", err)) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(data); err != nil { + slog.ErrorContext(r.Context(), "Failed to encode trace data response", slog.Any("error", err)) + } + }) + + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/html; charset=utf-8") + fmt.Fprint(w, dashboardHTML) + }) + listener, err := net.Listen("tcp", dashboardAddr) if err != nil { return fmt.Errorf("failed to bind server to %s: %w", dashboardAddr, err) } defer listener.Close() - slog.InfoContext(ctx, "AX Dashboard started", slog.String("url", fmt.Sprintf("http://%s", listener.Addr().String()))) + addr := listener.Addr().String() + host, port, err := net.SplitHostPort(addr) + if err == nil && (host == "::" || host == "0.0.0.0" || host == "" || host == "[::]") { + addr = fmt.Sprintf("localhost:%s", port) + } + url := fmt.Sprintf("http://%s", addr) + + slog.InfoContext(ctx, "AX Dashboard started", slog.String("url", url)) + + go openBrowser(url) server := &http.Server{ Handler: mux, @@ -148,7 +226,7 @@ LEFT JOIN ( } defer rows.Close() - var convs []ConversationResponse + convs := []ConversationResponse{} for rows.Next() { var id string var lastSeq int32 @@ -221,3 +299,192 @@ func parseSQLiteTime(s string) (time.Time, error) { } return time.Time{}, err } + +func openBrowser(url string) { + var err error + switch runtime.GOOS { + case "linux": + err = exec.Command("xdg-open", url).Start() + case "windows": + err = exec.Command("rundll32", "url.dll,FileProtocolHandler", url).Start() + case "darwin": + err = exec.Command("open", url).Start() + } + if err != nil { + fmt.Printf("Failed to open browser: %v\n", err) + } +} + +type Text struct { + Text string `json:"text"` +} + +type Approval struct { + Approved bool `json:"approved"` +} + +type Confirmation struct { + ID string `json:"id"` + Question string `json:"question,omitempty"` + Approval *Approval `json:"approval,omitempty"` +} + +type Content struct { + Role string `json:"role"` + Text *Text `json:"text,omitempty"` + Confirmation *Confirmation `json:"confirmation,omitempty"` +} + +type ExecutionEvent struct { + ExecID string `json:"exec_id"` + AgentID string `json:"agent_id"` + Inputs []Content `json:"inputs"` + Outputs []Content `json:"outputs"` + State string `json:"state"` + Timestamp time.Time `json:"timestamp"` +} + +type ExecTrace struct { + ExecID string `json:"exec_id"` + AgentID string `json:"agent_id"` + Events []ExecutionEvent `json:"events"` +} + +type TraceData struct { + ConversationID string `json:"conversation_id"` + RootExecID string `json:"root_exec_id"` + Execs []ExecTrace `json:"execs"` +} + +func loadTraceData(ctx context.Context, cfg *cliutil.Config, convID string) (*TraceData, error) { + events, rootExecID, execIDs, err := fetch(ctx, cfg, convID) + if err != nil { + return nil, err + } + + // TODO(jbd): Trace view incorrectly displays graph executions. We are not + // changing the EventLog interface to fix this because the executor is being + // removed soon in favor of a linear execution model. We will adopt a different + // style of visualization once that's done. + return &TraceData{ + ConversationID: convID, + RootExecID: rootExecID, + Execs: buildExecTraces(execIDs, events), + }, nil +} + +func fetch(ctx context.Context, cfg *cliutil.Config, convID string) ([]*proto.ExecutionEvent, string, []string, error) { + evLog, err := executor.OpenSQLiteEventLog(cfg.EventLog.SQLiteConfig.Filename) + if err != nil { + return nil, "", nil, fmt.Errorf("could not open sqlite eventlog: %w", err) + } + defer evLog.Close() + + convEvents, err := evLog.Events(ctx, convID) + if err != nil { + return nil, "", nil, fmt.Errorf("failed to query conversation events: %w", err) + } + + var execIDs []string + seen := make(map[string]bool) + for _, ev := range convEvents { + if ev.ExecId != "" && !seen[ev.ExecId] { + execIDs = append(execIDs, ev.ExecId) + seen[ev.ExecId] = true + } + } + + if len(execIDs) == 0 { + return nil, "", nil, fmt.Errorf("no executions found for conversation: %s", convID) + } + + var allEvents []*proto.ExecutionEvent + for _, eID := range execIDs { + events, err := evLog.ExecEvents(ctx, eID) + if err != nil { + return nil, "", nil, fmt.Errorf("failed to query events for exec %s: %w", eID, err) + } + allEvents = append(allEvents, events...) + } + + // Use the first execID as the rootExecID as requested by user + rootExecID := execIDs[0] + + return allEvents, rootExecID, execIDs, nil +} + +func buildExecTraces(execIDs []string, events []*proto.ExecutionEvent) []ExecTrace { + execsMap := make(map[string][]ExecutionEvent) + + for _, protoEv := range events { + exID := protoEv.ExecId + ev := extractExecutionEvent(exID, protoEv) + execsMap[exID] = append(execsMap[exID], ev) + } + + var execs []ExecTrace + for _, execID := range execIDs { + if evs, ok := execsMap[execID]; ok { + agentID := "" + for _, ev := range evs { + if ev.AgentID != "" { + agentID = ev.AgentID + break + } + } + execs = append(execs, ExecTrace{ + ExecID: execID, + AgentID: agentID, + Events: evs, + }) + } + } + + return execs +} + +func extractMsgs(protoContents []*proto.Message) []Content { + var results []Content + for _, c := range protoContents { + // Skip messages flagged as internal-only. + if c.GetInternalOnly() { + continue + } + content := Content{Role: c.Role} + msgContent := c.GetContent() + if msgContent == nil { + continue + } + if textC := msgContent.GetText(); textC != nil { + content.Text = &Text{Text: textC.Text} + } else if conf := msgContent.GetConfirmation(); conf != nil { + content.Confirmation = &Confirmation{ + ID: conf.Id, + Question: conf.Question, + } + if app := conf.GetApproval(); app != nil { + content.Confirmation.Approval = &Approval{Approved: app.Approved} + } else if dec := conf.GetDecline(); dec != nil { + content.Confirmation.Approval = &Approval{Approved: !dec.Declined} + } + } + results = append(results, content) + } + return results +} + +func extractExecutionEvent(execID string, protoEv *proto.ExecutionEvent) ExecutionEvent { + ev := ExecutionEvent{ + ExecID: execID, + AgentID: protoEv.AgentId, + } + if protoEv.Timestamp != nil { + ev.Timestamp = protoEv.Timestamp.AsTime() + } + + ev.State = fmt.Sprint(protoEv.State) + ev.Outputs = extractMsgs(protoEv.Outputs) + ev.Inputs = extractMsgs(protoEv.Inputs) + + return ev +} diff --git a/cmd/ax/web/index.html b/cmd/ax/web/index.html new file mode 100644 index 0000000..2595b07 --- /dev/null +++ b/cmd/ax/web/index.html @@ -0,0 +1,1221 @@ + + + + + + + +AX Dashboard + + + + + + + + +
+
+ +

AX Dashboard

+
+
+ + Connected +
+
+ +
+ +
+ +
+
+
+

Total

+
0
+
+
Σ
+
+
+
+

Active

+
0
+
+
+
+
+
+

Completed

+
0
+
+
+
+
+
+

Failed

+
0
+
+
+
+
+ + +
+
+ +
+ + +
+ + +
+ + + + + + + + + + + + + + +
Conversation IDAgentStatusLast SeqDurationActions
+ +
+
+ + +
+
+ +
+

Execution Trace

+

+
+
+ + +
+
Execution Timeline
+
+ +
+
+ + +
+ +
+
+
+ + + +