Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 269 additions & 2 deletions cmd/ax/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,28 @@ package main
import (
"context"
"database/sql"
_ "embed"
"encoding/json"
"fmt"
"log/slog"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"runtime"
"time"

"github.com/google/ax/cmd/ax/internal/cliutil"
"github.com/google/ax/internal/controller/executor"
"github.com/google/ax/proto"
"github.com/spf13/cobra"
_ "modernc.org/sqlite"
)

//go:embed web/index.html
var dashboardHTML string

var (
dashboardAddr string
dashboardConfigFile string
Expand Down Expand Up @@ -65,6 +75,11 @@ func runDashboard(cmd *cobra.Command, args []string) error {

dbPath := cfg.EventLog.SQLiteConfig.Filename
slog.InfoContext(ctx, "Opening event log database", slog.String("path", dbPath))

if err := os.MkdirAll(filepath.Dir(dbPath), 0755); err != nil {
return fmt.Errorf("failed to create database directory: %w", err)
}

db, err := sql.Open("sqlite", dbPath)
if err != nil {
return fmt.Errorf("failed to open sqlite database: %w", err)
Expand All @@ -76,6 +91,30 @@ func runDashboard(cmd *cobra.Command, args []string) error {
return fmt.Errorf("failed to ping database: %w", err)
}

// Create tables if they don't exist (to avoid crashes on fresh setup)
if _, err := db.Exec(`
CREATE TABLE IF NOT EXISTS conversation_log (
conversation_id TEXT NOT NULL,
seq INTEGER NOT NULL,
payload TEXT NOT NULL,
PRIMARY KEY (conversation_id, seq)
)`); err != nil {
return fmt.Errorf("failed to initialize conversation_log table: %w", err)
}

if _, err := db.Exec(`
CREATE TABLE IF NOT EXISTS execution_log (
exec_id TEXT NOT NULL,
payload TEXT NOT NULL,
timestamp DATETIME NOT NULL
)`); err != nil {
return fmt.Errorf("failed to initialize execution_log table: %w", err)
}

if _, err := db.Exec(`CREATE INDEX IF NOT EXISTS idx_execution_log_exec_id ON execution_log(exec_id)`); err != nil {
return fmt.Errorf("failed to create index on execution_log: %w", err)
}

// Setup API handlers
mux := http.NewServeMux()
mux.HandleFunc("/api/conversations", func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -97,13 +136,52 @@ func runDashboard(cmd *cobra.Command, args []string) error {
}
})

mux.HandleFunc("/api/trace", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

convID := r.URL.Query().Get("conversation")
if convID == "" {
http.Error(w, "Missing conversation ID", http.StatusBadRequest)
return
}

data, err := loadTraceData(r.Context(), cfg, convID)
if err != nil {
slog.ErrorContext(r.Context(), "Failed to load trace data", slog.String("conversation_id", convID), slog.Any("error", err))
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(data); err != nil {
slog.ErrorContext(r.Context(), "Failed to encode trace data response", slog.Any("error", err))
}
})

mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
fmt.Fprint(w, dashboardHTML)
})

listener, err := net.Listen("tcp", dashboardAddr)
if err != nil {
return fmt.Errorf("failed to bind server to %s: %w", dashboardAddr, err)
}
defer listener.Close()

slog.InfoContext(ctx, "AX Dashboard started", slog.String("url", fmt.Sprintf("http://%s", listener.Addr().String())))
addr := listener.Addr().String()
host, port, err := net.SplitHostPort(addr)
if err == nil && (host == "::" || host == "0.0.0.0" || host == "" || host == "[::]") {
addr = fmt.Sprintf("localhost:%s", port)
}
url := fmt.Sprintf("http://%s", addr)

slog.InfoContext(ctx, "AX Dashboard started", slog.String("url", url))

go openBrowser(url)

server := &http.Server{
Handler: mux,
Expand Down Expand Up @@ -148,7 +226,7 @@ LEFT JOIN (
}
defer rows.Close()

var convs []ConversationResponse
convs := []ConversationResponse{}
for rows.Next() {
var id string
var lastSeq int32
Expand Down Expand Up @@ -221,3 +299,192 @@ func parseSQLiteTime(s string) (time.Time, error) {
}
return time.Time{}, err
}

func openBrowser(url string) {
var err error
switch runtime.GOOS {
case "linux":
err = exec.Command("xdg-open", url).Start()
case "windows":
err = exec.Command("rundll32", "url.dll,FileProtocolHandler", url).Start()
case "darwin":
err = exec.Command("open", url).Start()
}
if err != nil {
fmt.Printf("Failed to open browser: %v\n", err)
}
}

type Text struct {
Text string `json:"text"`
}

type Approval struct {
Approved bool `json:"approved"`
}

type Confirmation struct {
ID string `json:"id"`
Question string `json:"question,omitempty"`
Approval *Approval `json:"approval,omitempty"`
}

type Content struct {
Role string `json:"role"`
Text *Text `json:"text,omitempty"`
Confirmation *Confirmation `json:"confirmation,omitempty"`
}

type ExecutionEvent struct {
ExecID string `json:"exec_id"`
AgentID string `json:"agent_id"`
Inputs []Content `json:"inputs"`
Outputs []Content `json:"outputs"`
State string `json:"state"`
Timestamp time.Time `json:"timestamp"`
}

type ExecTrace struct {
ExecID string `json:"exec_id"`
AgentID string `json:"agent_id"`
Events []ExecutionEvent `json:"events"`
}

type TraceData struct {
ConversationID string `json:"conversation_id"`
RootExecID string `json:"root_exec_id"`
Execs []ExecTrace `json:"execs"`
}

func loadTraceData(ctx context.Context, cfg *cliutil.Config, convID string) (*TraceData, error) {
events, rootExecID, execIDs, err := fetch(ctx, cfg, convID)
if err != nil {
return nil, err
}

// TODO(jbd): Trace view incorrectly displays graph executions. We are not
// changing the EventLog interface to fix this because the executor is being
// removed soon in favor of a linear execution model. We will adopt a different
// style of visualization once that's done.
return &TraceData{
ConversationID: convID,
RootExecID: rootExecID,
Execs: buildExecTraces(execIDs, events),
}, nil
}

func fetch(ctx context.Context, cfg *cliutil.Config, convID string) ([]*proto.ExecutionEvent, string, []string, error) {
evLog, err := executor.OpenSQLiteEventLog(cfg.EventLog.SQLiteConfig.Filename)
if err != nil {
return nil, "", nil, fmt.Errorf("could not open sqlite eventlog: %w", err)
}
defer evLog.Close()

convEvents, err := evLog.Events(ctx, convID)
if err != nil {
return nil, "", nil, fmt.Errorf("failed to query conversation events: %w", err)
}

var execIDs []string
seen := make(map[string]bool)
for _, ev := range convEvents {
if ev.ExecId != "" && !seen[ev.ExecId] {
execIDs = append(execIDs, ev.ExecId)
seen[ev.ExecId] = true
}
}

if len(execIDs) == 0 {
return nil, "", nil, fmt.Errorf("no executions found for conversation: %s", convID)
}

var allEvents []*proto.ExecutionEvent
for _, eID := range execIDs {
events, err := evLog.ExecEvents(ctx, eID)
if err != nil {
return nil, "", nil, fmt.Errorf("failed to query events for exec %s: %w", eID, err)
}
allEvents = append(allEvents, events...)
}

// Use the first execID as the rootExecID as requested by user
rootExecID := execIDs[0]

return allEvents, rootExecID, execIDs, nil
}

func buildExecTraces(execIDs []string, events []*proto.ExecutionEvent) []ExecTrace {
execsMap := make(map[string][]ExecutionEvent)

for _, protoEv := range events {
exID := protoEv.ExecId
ev := extractExecutionEvent(exID, protoEv)
execsMap[exID] = append(execsMap[exID], ev)
}

var execs []ExecTrace
for _, execID := range execIDs {
if evs, ok := execsMap[execID]; ok {
agentID := ""
for _, ev := range evs {
if ev.AgentID != "" {
agentID = ev.AgentID
break
}
}
execs = append(execs, ExecTrace{
ExecID: execID,
AgentID: agentID,
Events: evs,
})
}
}

return execs
}

func extractMsgs(protoContents []*proto.Message) []Content {
var results []Content
for _, c := range protoContents {
// Skip messages flagged as internal-only.
if c.GetInternalOnly() {
continue
}
content := Content{Role: c.Role}
msgContent := c.GetContent()
if msgContent == nil {
continue
}
if textC := msgContent.GetText(); textC != nil {
content.Text = &Text{Text: textC.Text}
} else if conf := msgContent.GetConfirmation(); conf != nil {
content.Confirmation = &Confirmation{
ID: conf.Id,
Question: conf.Question,
}
if app := conf.GetApproval(); app != nil {
content.Confirmation.Approval = &Approval{Approved: app.Approved}
} else if dec := conf.GetDecline(); dec != nil {
content.Confirmation.Approval = &Approval{Approved: !dec.Declined}
}
}
results = append(results, content)
}
return results
}

func extractExecutionEvent(execID string, protoEv *proto.ExecutionEvent) ExecutionEvent {
ev := ExecutionEvent{
ExecID: execID,
AgentID: protoEv.AgentId,
}
if protoEv.Timestamp != nil {
ev.Timestamp = protoEv.Timestamp.AsTime()
}

ev.State = fmt.Sprint(protoEv.State)
ev.Outputs = extractMsgs(protoEv.Outputs)
ev.Inputs = extractMsgs(protoEv.Inputs)

return ev
}
Loading
Loading