Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 223 additions & 0 deletions cmd/ax/dashboard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log/slog"
"net"
"net/http"
"os"
"time"

"github.com/spf13/cobra"
_ "modernc.org/sqlite"
)

var (
dashboardAddr string
dashboardConfigFile string
)

var dashboardCmd = &cobra.Command{
Use: "dashboard",
Short: "Start the AX Dashboard",
Long: `Start a local HTTP server to display AX conversations and executions dashboard.`,
RunE: runDashboard,
}

func init() {
dashboardCmd.Flags().StringVar(&dashboardAddr, "addr", "localhost:8080", "Server address to listen on")
dashboardCmd.Flags().StringVar(&dashboardConfigFile, "config", "ax.yaml", "Path to YAML configuration file")
}

type ConversationResponse struct {
ID string `json:"id"`
Agent string `json:"agent"`
Status string `json:"status"`
LastSeq int32 `json:"last_seq"`
Duration string `json:"duration"`
}

func runDashboard(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil)))

cfg, err := newConfig(cmd, dashboardConfigFile)
if err != nil {
return err
}

dbPath := cfg.EventLog.SQLiteConfig.Filename
slog.InfoContext(ctx, "Opening event log database", slog.String("path", dbPath))
db, err := sql.Open("sqlite", dbPath)
if err != nil {
return fmt.Errorf("failed to open sqlite database: %w", err)
}
defer db.Close()

// Verify database connection
if err := db.PingContext(ctx); err != nil {
return fmt.Errorf("failed to ping database: %w", err)
}

// Setup API handlers
mux := http.NewServeMux()
mux.HandleFunc("/api/conversations", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

convs, err := fetchConversations(r.Context(), db)
if err != nil {
slog.ErrorContext(r.Context(), "Failed to fetch conversations", slog.Any("error", err))
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(convs); err != nil {
slog.ErrorContext(r.Context(), "Failed to encode conversations response", slog.Any("error", err))
}
})

listener, err := net.Listen("tcp", dashboardAddr)
if err != nil {
return fmt.Errorf("failed to bind server to %s: %w", dashboardAddr, err)
}
defer listener.Close()

slog.InfoContext(ctx, "AX Dashboard started", slog.String("url", fmt.Sprintf("http://%s", listener.Addr().String())))

server := &http.Server{
Handler: mux,
}

return server.Serve(listener)
}

func fetchConversations(ctx context.Context, db *sql.DB) ([]ConversationResponse, error) {
query := `
SELECT
c.conversation_id,
c.last_seq,
c.state,
e.agent_id,
e.start_time,
e.end_time
FROM (
SELECT conversation_id, seq AS last_seq,
json_extract(payload, '$.exec_id') AS exec_id,
json_extract(payload, '$.state') AS state
FROM conversation_log
WHERE (conversation_id, seq) IN (
SELECT conversation_id, MAX(seq)
FROM conversation_log
GROUP BY conversation_id
)
) c
LEFT JOIN (
SELECT
exec_id,
json_extract(payload, '$.agent_id') AS agent_id,
MIN(timestamp) AS start_time,
MAX(timestamp) AS end_time
FROM execution_log
GROUP BY exec_id
) e ON c.exec_id = e.exec_id;
`
rows, err := db.QueryContext(ctx, query)
if err != nil {
return nil, err
}
defer rows.Close()

var convs []ConversationResponse
for rows.Next() {
var id string
var lastSeq int32
var state string
var agentID sql.NullString
var startTimeStr, endTimeStr sql.NullString

err := rows.Scan(&id, &lastSeq, &state, &agentID, &startTimeStr, &endTimeStr)
if err != nil {
return nil, err
}

agent := "unknown"
if agentID.Valid && agentID.String != "" {
agent = agentID.String
// Strip special prefix if it starts with "__"
if len(agent) > 2 && agent[:2] == "__" {
agent = agent[2:]
}
}

durationStr := "N/A"
if startTimeStr.Valid && endTimeStr.Valid {
startTime, err1 := parseSQLiteTime(startTimeStr.String)
endTime, err2 := parseSQLiteTime(endTimeStr.String)
if err1 == nil && err2 == nil {
duration := endTime.Sub(startTime)
durationStr = fmt.Sprintf("%.1fs", duration.Seconds())
} else {
slog.WarnContext(ctx, "Failed to parse sqlite timestamps", slog.String("start", startTimeStr.String), slog.String("end", endTimeStr.String), slog.Any("err1", err1), slog.Any("err2", err2))
}
}

status := state
if len(status) > 6 && status[:6] == "STATE_" {
status = status[6:]
}
if status == "PENDING" {
status = "RUNNING"
}

convs = append(convs, ConversationResponse{
ID: id,
Agent: agent,
Status: status,
LastSeq: lastSeq,
Duration: durationStr,
})
}

return convs, nil
}

func parseSQLiteTime(s string) (time.Time, error) {
layouts := []string{
time.RFC3339Nano,
time.RFC3339,
"2006-01-02 15:04:05.999999999-07:00",
"2006-01-02 15:04:05.999999999",
"2006-01-02 15:04:05.999999999 -0700 MST",
"2006-01-02 15:04:05",
}
var err error
var t time.Time
for _, layout := range layouts {
t, err = time.Parse(layout, s)
if err == nil {
return t, nil
}
}
return time.Time{}, err
}
139 changes: 139 additions & 0 deletions cmd/ax/dashboard_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"database/sql"
"testing"
"time"

_ "modernc.org/sqlite"
)

func TestFetchConversations(t *testing.T) {
// Create an in-memory SQLite database for testing
db, err := sql.Open("sqlite", ":memory:")
if err != nil {
t.Fatalf("failed to open in-memory db: %v", err)
}
defer db.Close()

// Initialize the schema
schema := `
CREATE TABLE conversation_log (
conversation_id TEXT,
seq INTEGER,
timestamp DATETIME,
payload TEXT
);

CREATE TABLE execution_log (
exec_id TEXT,
timestamp DATETIME,
payload TEXT
);
`
if _, err := db.Exec(schema); err != nil {
t.Fatalf("failed to create schema: %v", err)
}

// Insert test data
// Conversation 1: Only conversation_log (V2 execution without execution_log)
_, err = db.Exec(`
INSERT INTO conversation_log (conversation_id, seq, timestamp, payload)
VALUES ('conv-1', 1, ?, '{"exec_id": "exec-1", "state": "STATE_PENDING"}')
`, time.Now().Format(time.RFC3339))
if err != nil {
t.Fatalf("failed to insert conv-1: %v", err)
}

// Conversation 2: Both conversation_log and execution_log (V1 execution)
now := time.Now()
start := now.Add(-5 * time.Second)
end := now

_, err = db.Exec(`
INSERT INTO conversation_log (conversation_id, seq, timestamp, payload)
VALUES ('conv-2', 5, ?, '{"exec_id": "exec-2", "state": "STATE_COMPLETED"}')
`, end.Format(time.RFC3339))
if err != nil {
t.Fatalf("failed to insert conv-2: %v", err)
}

_, err = db.Exec(`
INSERT INTO execution_log (exec_id, timestamp, payload)
VALUES
('exec-2', ?, '{"agent_id": "my-agent"}'),
('exec-2', ?, '{"agent_id": "my-agent"}')
`, start.Format(time.RFC3339), end.Format(time.RFC3339))
if err != nil {
t.Fatalf("failed to insert exec-2: %v", err)
}

// Fetch the conversations
ctx := context.Background()
convs, err := fetchConversations(ctx, db)
if err != nil {
t.Fatalf("fetchConversations failed: %v", err)
}

if len(convs) != 2 {
t.Fatalf("expected 2 conversations, got %d", len(convs))
}

// Create a map for easy lookup
convMap := make(map[string]ConversationResponse)
for _, c := range convs {
convMap[c.ID] = c
}

// Check conv-1
c1, ok := convMap["conv-1"]
if !ok {
t.Fatalf("conv-1 not found")
}
if c1.Status != "RUNNING" { // STATE_PENDING -> RUNNING
t.Errorf("conv-1 expected status RUNNING, got %q", c1.Status)
}
if c1.Agent != "unknown" {
t.Errorf("conv-1 expected agent unknown, got %q", c1.Agent)
}
if c1.Duration != "N/A" {
t.Errorf("conv-1 expected duration N/A, got %q", c1.Duration)
}
if c1.LastSeq != 1 {
t.Errorf("conv-1 expected last_seq 1, got %d", c1.LastSeq)
}

// Check conv-2
c2, ok := convMap["conv-2"]
if !ok {
t.Fatalf("conv-2 not found")
}
if c2.Status != "COMPLETED" { // STATE_COMPLETED -> COMPLETED
t.Errorf("conv-2 expected status COMPLETED, got %q", c2.Status)
}
if c2.Agent != "my-agent" {
t.Errorf("conv-2 expected agent my-agent, got %q", c2.Agent)
}
// Duration should be roughly 5.0s
if c2.Duration != "5.0s" {
t.Errorf("conv-2 expected duration 5.0s, got %q", c2.Duration)
}
if c2.LastSeq != 5 {
t.Errorf("conv-2 expected last_seq 5, got %d", c2.LastSeq)
}
}
2 changes: 2 additions & 0 deletions cmd/ax/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ and run the controller server.`,
func init() {
rootCmd.AddCommand(execCmd)
rootCmd.AddCommand(serveCmd)

rootCmd.AddCommand(dashboardCmd)
}

func connect(server string) (*grpc.ClientConn, error) {
Expand Down
Loading