Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cli/cmd/fellowship/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,11 @@ func runDashboard(d *db.DB, args []string) int {
poll := fs.Int("poll", 5, "Poll interval in seconds")
fs.Parse(args)

srv := dashboard.NewServer(d, gitRootOrCwd(), *poll)
srv, err := dashboard.NewServer(d, gitRootOrCwd(), *poll)
if err != nil {
fmt.Fprintf(os.Stderr, "dashboard: %v\n", err)
return 1
}

addr := fmt.Sprintf("localhost:%d", *port)
url := fmt.Sprintf("http://%s", addr)
Expand Down
19 changes: 12 additions & 7 deletions cli/internal/dashboard/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dashboard
import (
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"path/filepath"
Expand All @@ -27,6 +28,11 @@ func (s *Server) handleAutopsies(w http.ResponseWriter, r *http.Request) {
return loadErr
})

if err != nil {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode([]interface{}{})
return
}
if suffix != "" {
// Filter to matching record by quest name
for _, r := range records {
Expand All @@ -39,11 +45,6 @@ func (s *Server) handleAutopsies(w http.ResponseWriter, r *http.Request) {
http.Error(w, "autopsy not found", http.StatusNotFound)
return
}
if err != nil {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode([]interface{}{})
return
}
if records == nil {
records = []autopsy.Autopsy{}
}
Expand Down Expand Up @@ -124,7 +125,11 @@ func (s *Server) handleConfigWrite(w http.ResponseWriter, r *http.Request) {
var configPath string
switch req.Scope {
case "global":
home, _ := os.UserHomeDir()
home, err := os.UserHomeDir()
if err != nil {
http.Error(w, "unable to determine home directory", http.StatusInternalServerError)
return
}
configPath = filepath.Join(home, ".claude", "fellowship.json")
case "project":
configPath = filepath.Join(s.gitRoot, ".fellowship", "config.json")
Expand Down Expand Up @@ -152,7 +157,7 @@ func (s *Server) handleConfigWrite(w http.ResponseWriter, r *http.Request) {
http.Error(w, "failed to marshal config", http.StatusInternalServerError)
return
}
tmp := configPath + ".tmp"
tmp := fmt.Sprintf("%s.tmp.%d", configPath, os.Getpid())
if err := os.WriteFile(tmp, data, 0644); err != nil {
http.Error(w, "failed to write config", http.StatusInternalServerError)
return
Expand Down
50 changes: 44 additions & 6 deletions cli/internal/dashboard/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"sync"
"syscall"
"time"
)

Expand Down Expand Up @@ -47,10 +48,12 @@ func commandQueuePath(gitRoot string) string {
return filepath.Join(gitRoot, ".fellowship", "command-queue.json")
}

func generateID() string {
func generateID() (string, error) {
b := make([]byte, 8)
rand.Read(b)
return hex.EncodeToString(b)
if _, err := rand.Read(b); err != nil {
return "", fmt.Errorf("generating command ID: %w", err)
}
return hex.EncodeToString(b), nil
}

func LoadCommandQueue(gitRoot string) (*CommandQueue, error) {
Expand Down Expand Up @@ -78,23 +81,58 @@ func SaveCommandQueue(gitRoot string, q *CommandQueue) error {
if err != nil {
return err
}
tmp := path + ".tmp"
tmp := fmt.Sprintf("%s.tmp.%d", path, os.Getpid())
if err := os.WriteFile(tmp, data, 0644); err != nil {
return err
}
return os.Rename(tmp, path)
if err := os.Rename(tmp, path); err != nil {
os.Remove(tmp)
return err
}
return nil
}

func lockQueueFile(gitRoot string) (*os.File, error) {
lockPath := commandQueuePath(gitRoot) + ".lock"
if err := os.MkdirAll(filepath.Dir(lockPath), 0755); err != nil {
return nil, err
}
f, err := os.OpenFile(lockPath, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, fmt.Errorf("opening queue lock: %w", err)
}
if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX); err != nil {
f.Close()
return nil, fmt.Errorf("acquiring queue lock: %w", err)
}
return f, nil
}

func unlockQueueFile(f *os.File) {
syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
f.Close()
}

func EnqueueCommand(gitRoot string, action CommandAction, params json.RawMessage) (*Command, error) {
queueMu.Lock()
defer queueMu.Unlock()

lockFile, err := lockQueueFile(gitRoot)
if err != nil {
return nil, err
}
defer unlockQueueFile(lockFile)

q, err := LoadCommandQueue(gitRoot)
if err != nil {
return nil, err
}
id, err := generateID()
if err != nil {
return nil, err
}
cmd := Command{
ID: generateID(),
ID: id,
Action: action,
Status: StatusPending,
Params: params,
Expand Down
15 changes: 7 additions & 8 deletions cli/internal/dashboard/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/json"
"fmt"
iofs "io/fs"
"log"
"net/http"
"strings"
"time"
Expand All @@ -31,7 +30,12 @@ type Server struct {
hub *Hub
}

func NewServer(d *db.DB, gitRoot string, pollInterval int) *Server {
func NewServer(d *db.DB, gitRoot string, pollInterval int) (*Server, error) {
staticFS, err := iofs.Sub(staticFiles, "static")
if err != nil {
return nil, fmt.Errorf("dashboard: failed to load static assets: %w", err)
}

s := &Server{
mux: http.NewServeMux(),
db: d,
Expand Down Expand Up @@ -59,11 +63,6 @@ func NewServer(d *db.DB, gitRoot string, pollInterval int) *Server {
s.mux.HandleFunc("GET /api/tome/", s.handleTome)
s.mux.HandleFunc("GET /api/config", s.handleConfigRead)
s.mux.HandleFunc("POST /api/config", s.handleConfigWrite)

staticFS, err := iofs.Sub(staticFiles, "static")
if err != nil {
log.Fatalf("dashboard: failed to load static assets: %v", err)
}
fileServer := http.FileServer(http.FS(staticFS))
s.mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if strings.HasPrefix(r.URL.Path, "/api/") || r.URL.Path == "/ws" {
Expand Down Expand Up @@ -94,7 +93,7 @@ func NewServer(d *db.DB, gitRoot string, pollInterval int) *Server {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Write(data)
})
return s
return s, nil
}

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand Down
23 changes: 16 additions & 7 deletions cli/internal/dashboard/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ import (
"github.com/justinjdev/fellowship/cli/internal/state"
)

func mustNewServer(t *testing.T, d *db.DB) *Server {
t.Helper()
srv, err := NewServer(d, "", 5)
if err != nil {
t.Fatalf("NewServer: %v", err)
}
return srv
}

func setupTestDB(t *testing.T) (*db.DB, string) {
t.Helper()
d := db.OpenTest(t)
Expand Down Expand Up @@ -51,7 +60,7 @@ func setupTestDB(t *testing.T) (*db.DB, string) {

func TestAPIStatus(t *testing.T) {
d, _ := setupTestDB(t)
srv := NewServer(d, "", 5)
srv := mustNewServer(t, d)

req := httptest.NewRequest("GET", "/api/status", nil)
w := httptest.NewRecorder()
Expand Down Expand Up @@ -98,7 +107,7 @@ func TestAPIStatus(t *testing.T) {

func TestAPIGateApprove(t *testing.T) {
d, worktreeDir := setupTestDB(t)
srv := NewServer(d, "", 5)
srv := mustNewServer(t, d)

body := strings.NewReader(fmt.Sprintf(`{"dir":%q}`, worktreeDir))
req := httptest.NewRequest("POST", "/api/gate/approve", body)
Expand Down Expand Up @@ -127,7 +136,7 @@ func TestAPIGateApprove(t *testing.T) {

func TestAPIGateReject(t *testing.T) {
d, worktreeDir := setupTestDB(t)
srv := NewServer(d, "", 5)
srv := mustNewServer(t, d)

body := strings.NewReader(fmt.Sprintf(`{"dir":%q}`, worktreeDir))
req := httptest.NewRequest("POST", "/api/gate/reject", body)
Expand Down Expand Up @@ -170,7 +179,7 @@ func TestAPIGateApprove_NoPending(t *testing.T) {
t.Fatal(err)
}

srv := NewServer(d, "", 5)
srv := mustNewServer(t, d)

body := strings.NewReader(fmt.Sprintf(`{"dir":%q}`, worktreeDir))
req := httptest.NewRequest("POST", "/api/gate/approve", body)
Expand All @@ -184,7 +193,7 @@ func TestAPIGateApprove_NoPending(t *testing.T) {

func TestAPIGateApprove_HeraldLogging(t *testing.T) {
d, worktreeDir := setupTestDB(t)
srv := NewServer(d, "", 5)
srv := mustNewServer(t, d)

body := strings.NewReader(fmt.Sprintf(`{"dir":%q}`, worktreeDir))
req := httptest.NewRequest("POST", "/api/gate/approve", body)
Expand Down Expand Up @@ -228,7 +237,7 @@ func TestAPIGateApprove_HeraldLogging(t *testing.T) {

func TestAPIGateReject_HeraldLogging(t *testing.T) {
d, worktreeDir := setupTestDB(t)
srv := NewServer(d, "", 5)
srv := mustNewServer(t, d)

body := strings.NewReader(fmt.Sprintf(`{"dir":%q}`, worktreeDir))
req := httptest.NewRequest("POST", "/api/gate/reject", body)
Expand Down Expand Up @@ -261,7 +270,7 @@ func TestAPIGateReject_HeraldLogging(t *testing.T) {

func TestAPIStatus_NotFound(t *testing.T) {
d, _ := setupTestDB(t)
srv := NewServer(d, "", 5)
srv := mustNewServer(t, d)

req := httptest.NewRequest("GET", "/api/nonexistent", nil)
w := httptest.NewRecorder()
Expand Down
77 changes: 58 additions & 19 deletions cli/internal/dashboard/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,53 @@ type WSEvent struct {
}

var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true }, // allow all origins (dashboard is localhost-only by design)
// Allow all origins — the dashboard binds to localhost but may be accessed
// from different ports or via forwarded connections during development.
CheckOrigin: func(r *http.Request) bool { return true },
}

// wsConn wraps a websocket.Conn with a per-connection write mutex and
// idempotent Close. gorilla/websocket supports one concurrent reader and
// one concurrent writer — the write mutex serializes all writes to this
// connection across concurrent Broadcast calls.
type wsConn struct {
conn *websocket.Conn
writeMu sync.Mutex
once sync.Once
}

func (c *wsConn) writeMessage(messageType int, data []byte, deadline time.Time) error {
c.writeMu.Lock()
defer c.writeMu.Unlock()
c.conn.SetWriteDeadline(deadline)
return c.conn.WriteMessage(messageType, data)
}

func (c *wsConn) close() {
c.once.Do(func() { c.conn.Close() })
}

// Hub manages WebSocket connections and broadcasts events.
type Hub struct {
mu sync.RWMutex
conns map[*websocket.Conn]struct{}
conns map[*wsConn]struct{}
}

func NewHub() *Hub {
return &Hub{conns: make(map[*websocket.Conn]struct{})}
return &Hub{conns: make(map[*wsConn]struct{})}
}

func (h *Hub) Add(conn *websocket.Conn) {
func (h *Hub) add(c *wsConn) {
h.mu.Lock()
h.conns[conn] = struct{}{}
h.conns[c] = struct{}{}
h.mu.Unlock()
}

func (h *Hub) Remove(conn *websocket.Conn) {
func (h *Hub) remove(c *wsConn) {
h.mu.Lock()
delete(h.conns, conn)
delete(h.conns, c)
h.mu.Unlock()
conn.Close()
c.close()
}

func (h *Hub) Broadcast(event WSEvent) {
Expand All @@ -59,30 +82,46 @@ func (h *Hub) Broadcast(event WSEvent) {
return
}

h.mu.Lock()
defer h.mu.Unlock()
for conn := range h.conns {
conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
delete(h.conns, conn)
conn.Close()
// Snapshot connections under read lock to avoid holding the lock during writes.
h.mu.RLock()
snapshot := make([]*wsConn, 0, len(h.conns))
for c := range h.conns {
snapshot = append(snapshot, c)
}
h.mu.RUnlock()

var failed []*wsConn
deadline := time.Now().Add(5 * time.Second)
for _, c := range snapshot {
if err := c.writeMessage(websocket.TextMessage, data, deadline); err != nil {
failed = append(failed, c)
}
}

if len(failed) > 0 {
h.mu.Lock()
for _, c := range failed {
delete(h.conns, c)
c.close()
}
h.mu.Unlock()
}
}

func (h *Hub) HandleWS(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
raw, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("ws: upgrade error: %v", err)
return
}
h.Add(conn)
c := &wsConn{conn: raw}
h.add(c)

// Read pump — just drain pings/pongs, we don't expect client messages
go func() {
defer h.Remove(conn)
defer h.remove(c)
for {
if _, _, err := conn.ReadMessage(); err != nil {
if _, _, err := raw.ReadMessage(); err != nil {
break
}
}
Expand Down