From 06dc77b073e8b76f2956764027ab60b593bff206 Mon Sep 17 00:00:00 2001 From: Justin Jones Date: Sat, 21 Mar 2026 17:25:22 -0500 Subject: [PATCH 1/2] =?UTF-8?q?fix(dashboard):=20address=20CodeRabbit=20re?= =?UTF-8?q?view=20=E2=80=94=20robustness=20and=20concurrency?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - queue.go: handle rand.Read error, use unique tmp path per PID, add file locking (flock) for cross-process safety - data.go: handle UserHomeDir error on config write path - server.go: NewServer returns error instead of log.Fatalf - ws.go: snapshot conns under RLock before writing to avoid holding mutex during network I/O; fix misleading CheckOrigin comment Co-Authored-By: Claude Opus 4.6 --- cli/cmd/fellowship/main.go | 6 +++- cli/internal/dashboard/data.go | 6 +++- cli/internal/dashboard/queue.go | 50 +++++++++++++++++++++++---- cli/internal/dashboard/server.go | 15 ++++---- cli/internal/dashboard/server_test.go | 23 ++++++++---- cli/internal/dashboard/ws.go | 23 ++++++++++-- 6 files changed, 97 insertions(+), 26 deletions(-) diff --git a/cli/cmd/fellowship/main.go b/cli/cmd/fellowship/main.go index 8322715..957b462 100644 --- a/cli/cmd/fellowship/main.go +++ b/cli/cmd/fellowship/main.go @@ -913,7 +913,11 @@ func runDashboard(d *db.DB, args []string) int { poll := fs.Int("poll", 5, "Poll interval in seconds") fs.Parse(args) - srv := dashboard.NewServer(d, gitRootOrCwd(), *poll) + srv, err := dashboard.NewServer(d, gitRootOrCwd(), *poll) + if err != nil { + fmt.Fprintf(os.Stderr, "dashboard: %v\n", err) + return 1 + } addr := fmt.Sprintf("localhost:%d", *port) url := fmt.Sprintf("http://%s", addr) diff --git a/cli/internal/dashboard/data.go b/cli/internal/dashboard/data.go index 0b85d72..5919778 100644 --- a/cli/internal/dashboard/data.go +++ b/cli/internal/dashboard/data.go @@ -124,7 +124,11 @@ func (s *Server) handleConfigWrite(w http.ResponseWriter, r *http.Request) { var configPath string switch req.Scope { case "global": - home, _ := os.UserHomeDir() + home, err := os.UserHomeDir() + if err != nil { + http.Error(w, "unable to determine home directory", http.StatusInternalServerError) + return + } configPath = filepath.Join(home, ".claude", "fellowship.json") case "project": configPath = filepath.Join(s.gitRoot, ".fellowship", "config.json") diff --git a/cli/internal/dashboard/queue.go b/cli/internal/dashboard/queue.go index 9f764b0..70c7774 100644 --- a/cli/internal/dashboard/queue.go +++ b/cli/internal/dashboard/queue.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "sync" + "syscall" "time" ) @@ -47,10 +48,12 @@ func commandQueuePath(gitRoot string) string { return filepath.Join(gitRoot, ".fellowship", "command-queue.json") } -func generateID() string { +func generateID() (string, error) { b := make([]byte, 8) - rand.Read(b) - return hex.EncodeToString(b) + if _, err := rand.Read(b); err != nil { + return "", fmt.Errorf("generating command ID: %w", err) + } + return hex.EncodeToString(b), nil } func LoadCommandQueue(gitRoot string) (*CommandQueue, error) { @@ -78,23 +81,58 @@ func SaveCommandQueue(gitRoot string, q *CommandQueue) error { if err != nil { return err } - tmp := path + ".tmp" + tmp := fmt.Sprintf("%s.tmp.%d", path, os.Getpid()) if err := os.WriteFile(tmp, data, 0644); err != nil { return err } - return os.Rename(tmp, path) + if err := os.Rename(tmp, path); err != nil { + os.Remove(tmp) + return err + } + return nil +} + +func lockQueueFile(gitRoot string) (*os.File, error) { + lockPath := commandQueuePath(gitRoot) + ".lock" + if err := os.MkdirAll(filepath.Dir(lockPath), 0755); err != nil { + return nil, err + } + f, err := os.OpenFile(lockPath, os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + return nil, fmt.Errorf("opening queue lock: %w", err) + } + if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX); err != nil { + f.Close() + return nil, fmt.Errorf("acquiring queue lock: %w", err) + } + return f, nil +} + +func unlockQueueFile(f *os.File) { + syscall.Flock(int(f.Fd()), syscall.LOCK_UN) + f.Close() } func EnqueueCommand(gitRoot string, action CommandAction, params json.RawMessage) (*Command, error) { queueMu.Lock() defer queueMu.Unlock() + lockFile, err := lockQueueFile(gitRoot) + if err != nil { + return nil, err + } + defer unlockQueueFile(lockFile) + q, err := LoadCommandQueue(gitRoot) if err != nil { return nil, err } + id, err := generateID() + if err != nil { + return nil, err + } cmd := Command{ - ID: generateID(), + ID: id, Action: action, Status: StatusPending, Params: params, diff --git a/cli/internal/dashboard/server.go b/cli/internal/dashboard/server.go index 1f1532f..56accf5 100644 --- a/cli/internal/dashboard/server.go +++ b/cli/internal/dashboard/server.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" iofs "io/fs" - "log" "net/http" "strings" "time" @@ -31,7 +30,12 @@ type Server struct { hub *Hub } -func NewServer(d *db.DB, gitRoot string, pollInterval int) *Server { +func NewServer(d *db.DB, gitRoot string, pollInterval int) (*Server, error) { + staticFS, err := iofs.Sub(staticFiles, "static") + if err != nil { + return nil, fmt.Errorf("dashboard: failed to load static assets: %w", err) + } + s := &Server{ mux: http.NewServeMux(), db: d, @@ -59,11 +63,6 @@ func NewServer(d *db.DB, gitRoot string, pollInterval int) *Server { s.mux.HandleFunc("GET /api/tome/", s.handleTome) s.mux.HandleFunc("GET /api/config", s.handleConfigRead) s.mux.HandleFunc("POST /api/config", s.handleConfigWrite) - - staticFS, err := iofs.Sub(staticFiles, "static") - if err != nil { - log.Fatalf("dashboard: failed to load static assets: %v", err) - } fileServer := http.FileServer(http.FS(staticFS)) s.mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { if strings.HasPrefix(r.URL.Path, "/api/") || r.URL.Path == "/ws" { @@ -94,7 +93,7 @@ func NewServer(d *db.DB, gitRoot string, pollInterval int) *Server { w.Header().Set("Content-Type", "text/html; charset=utf-8") w.Write(data) }) - return s + return s, nil } func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { diff --git a/cli/internal/dashboard/server_test.go b/cli/internal/dashboard/server_test.go index f4421ec..86b8e75 100644 --- a/cli/internal/dashboard/server_test.go +++ b/cli/internal/dashboard/server_test.go @@ -14,6 +14,15 @@ import ( "github.com/justinjdev/fellowship/cli/internal/state" ) +func mustNewServer(t *testing.T, d *db.DB) *Server { + t.Helper() + srv, err := NewServer(d, "", 5) + if err != nil { + t.Fatalf("NewServer: %v", err) + } + return srv +} + func setupTestDB(t *testing.T) (*db.DB, string) { t.Helper() d := db.OpenTest(t) @@ -51,7 +60,7 @@ func setupTestDB(t *testing.T) (*db.DB, string) { func TestAPIStatus(t *testing.T) { d, _ := setupTestDB(t) - srv := NewServer(d, "", 5) + srv := mustNewServer(t, d) req := httptest.NewRequest("GET", "/api/status", nil) w := httptest.NewRecorder() @@ -98,7 +107,7 @@ func TestAPIStatus(t *testing.T) { func TestAPIGateApprove(t *testing.T) { d, worktreeDir := setupTestDB(t) - srv := NewServer(d, "", 5) + srv := mustNewServer(t, d) body := strings.NewReader(fmt.Sprintf(`{"dir":%q}`, worktreeDir)) req := httptest.NewRequest("POST", "/api/gate/approve", body) @@ -127,7 +136,7 @@ func TestAPIGateApprove(t *testing.T) { func TestAPIGateReject(t *testing.T) { d, worktreeDir := setupTestDB(t) - srv := NewServer(d, "", 5) + srv := mustNewServer(t, d) body := strings.NewReader(fmt.Sprintf(`{"dir":%q}`, worktreeDir)) req := httptest.NewRequest("POST", "/api/gate/reject", body) @@ -170,7 +179,7 @@ func TestAPIGateApprove_NoPending(t *testing.T) { t.Fatal(err) } - srv := NewServer(d, "", 5) + srv := mustNewServer(t, d) body := strings.NewReader(fmt.Sprintf(`{"dir":%q}`, worktreeDir)) req := httptest.NewRequest("POST", "/api/gate/approve", body) @@ -184,7 +193,7 @@ func TestAPIGateApprove_NoPending(t *testing.T) { func TestAPIGateApprove_HeraldLogging(t *testing.T) { d, worktreeDir := setupTestDB(t) - srv := NewServer(d, "", 5) + srv := mustNewServer(t, d) body := strings.NewReader(fmt.Sprintf(`{"dir":%q}`, worktreeDir)) req := httptest.NewRequest("POST", "/api/gate/approve", body) @@ -228,7 +237,7 @@ func TestAPIGateApprove_HeraldLogging(t *testing.T) { func TestAPIGateReject_HeraldLogging(t *testing.T) { d, worktreeDir := setupTestDB(t) - srv := NewServer(d, "", 5) + srv := mustNewServer(t, d) body := strings.NewReader(fmt.Sprintf(`{"dir":%q}`, worktreeDir)) req := httptest.NewRequest("POST", "/api/gate/reject", body) @@ -261,7 +270,7 @@ func TestAPIGateReject_HeraldLogging(t *testing.T) { func TestAPIStatus_NotFound(t *testing.T) { d, _ := setupTestDB(t) - srv := NewServer(d, "", 5) + srv := mustNewServer(t, d) req := httptest.NewRequest("GET", "/api/nonexistent", nil) w := httptest.NewRecorder() diff --git a/cli/internal/dashboard/ws.go b/cli/internal/dashboard/ws.go index 7bcef70..c5afd57 100644 --- a/cli/internal/dashboard/ws.go +++ b/cli/internal/dashboard/ws.go @@ -23,7 +23,9 @@ type WSEvent struct { } var upgrader = websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { return true }, // allow all origins (dashboard is localhost-only by design) + // Allow all origins — the dashboard binds to localhost but may be accessed + // from different ports or via forwarded connections during development. + CheckOrigin: func(r *http.Request) bool { return true }, } // Hub manages WebSocket connections and broadcasts events. @@ -59,14 +61,29 @@ func (h *Hub) Broadcast(event WSEvent) { return } - h.mu.Lock() - defer h.mu.Unlock() + // Snapshot connections under read lock to avoid holding the lock during writes. + h.mu.RLock() + snapshot := make([]*websocket.Conn, 0, len(h.conns)) for conn := range h.conns { + snapshot = append(snapshot, conn) + } + h.mu.RUnlock() + + var failed []*websocket.Conn + for _, conn := range snapshot { conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) if err := conn.WriteMessage(websocket.TextMessage, data); err != nil { + failed = append(failed, conn) + } + } + + if len(failed) > 0 { + h.mu.Lock() + for _, conn := range failed { delete(h.conns, conn) conn.Close() } + h.mu.Unlock() } } From 92f291b0b6b4e2084764928b9bdf66d1c4f670e9 Mon Sep 17 00:00:00 2001 From: Justin Jones Date: Sat, 21 Mar 2026 17:34:18 -0500 Subject: [PATCH 2/2] =?UTF-8?q?fix(dashboard):=20address=20balrog=20findin?= =?UTF-8?q?gs=20=E2=80=94=20per-conn=20write=20mutex,=20idempotent=20close?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ws.go: introduce wsConn wrapper with per-connection write mutex (gorilla/websocket requires serialized writes) and sync.Once Close to prevent double-close on concurrent Remove + Broadcast failure - data.go: move error check before suffix filter in handleAutopsies to avoid masking DB errors as 404 - data.go: use PID-unique tmp path in handleConfigWrite, consistent with queue.go SaveCommandQueue Co-Authored-By: Claude Opus 4.6 --- cli/internal/dashboard/data.go | 13 +++---- cli/internal/dashboard/ws.go | 66 ++++++++++++++++++++++------------ 2 files changed, 51 insertions(+), 28 deletions(-) diff --git a/cli/internal/dashboard/data.go b/cli/internal/dashboard/data.go index 5919778..2368949 100644 --- a/cli/internal/dashboard/data.go +++ b/cli/internal/dashboard/data.go @@ -3,6 +3,7 @@ package dashboard import ( "context" "encoding/json" + "fmt" "net/http" "os" "path/filepath" @@ -27,6 +28,11 @@ func (s *Server) handleAutopsies(w http.ResponseWriter, r *http.Request) { return loadErr }) + if err != nil { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode([]interface{}{}) + return + } if suffix != "" { // Filter to matching record by quest name for _, r := range records { @@ -39,11 +45,6 @@ func (s *Server) handleAutopsies(w http.ResponseWriter, r *http.Request) { http.Error(w, "autopsy not found", http.StatusNotFound) return } - if err != nil { - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode([]interface{}{}) - return - } if records == nil { records = []autopsy.Autopsy{} } @@ -156,7 +157,7 @@ func (s *Server) handleConfigWrite(w http.ResponseWriter, r *http.Request) { http.Error(w, "failed to marshal config", http.StatusInternalServerError) return } - tmp := configPath + ".tmp" + tmp := fmt.Sprintf("%s.tmp.%d", configPath, os.Getpid()) if err := os.WriteFile(tmp, data, 0644); err != nil { http.Error(w, "failed to write config", http.StatusInternalServerError) return diff --git a/cli/internal/dashboard/ws.go b/cli/internal/dashboard/ws.go index c5afd57..9aef6f0 100644 --- a/cli/internal/dashboard/ws.go +++ b/cli/internal/dashboard/ws.go @@ -28,27 +28,48 @@ var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } +// wsConn wraps a websocket.Conn with a per-connection write mutex and +// idempotent Close. gorilla/websocket supports one concurrent reader and +// one concurrent writer — the write mutex serializes all writes to this +// connection across concurrent Broadcast calls. +type wsConn struct { + conn *websocket.Conn + writeMu sync.Mutex + once sync.Once +} + +func (c *wsConn) writeMessage(messageType int, data []byte, deadline time.Time) error { + c.writeMu.Lock() + defer c.writeMu.Unlock() + c.conn.SetWriteDeadline(deadline) + return c.conn.WriteMessage(messageType, data) +} + +func (c *wsConn) close() { + c.once.Do(func() { c.conn.Close() }) +} + // Hub manages WebSocket connections and broadcasts events. type Hub struct { mu sync.RWMutex - conns map[*websocket.Conn]struct{} + conns map[*wsConn]struct{} } func NewHub() *Hub { - return &Hub{conns: make(map[*websocket.Conn]struct{})} + return &Hub{conns: make(map[*wsConn]struct{})} } -func (h *Hub) Add(conn *websocket.Conn) { +func (h *Hub) add(c *wsConn) { h.mu.Lock() - h.conns[conn] = struct{}{} + h.conns[c] = struct{}{} h.mu.Unlock() } -func (h *Hub) Remove(conn *websocket.Conn) { +func (h *Hub) remove(c *wsConn) { h.mu.Lock() - delete(h.conns, conn) + delete(h.conns, c) h.mu.Unlock() - conn.Close() + c.close() } func (h *Hub) Broadcast(event WSEvent) { @@ -63,43 +84,44 @@ func (h *Hub) Broadcast(event WSEvent) { // Snapshot connections under read lock to avoid holding the lock during writes. h.mu.RLock() - snapshot := make([]*websocket.Conn, 0, len(h.conns)) - for conn := range h.conns { - snapshot = append(snapshot, conn) + snapshot := make([]*wsConn, 0, len(h.conns)) + for c := range h.conns { + snapshot = append(snapshot, c) } h.mu.RUnlock() - var failed []*websocket.Conn - for _, conn := range snapshot { - conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) - if err := conn.WriteMessage(websocket.TextMessage, data); err != nil { - failed = append(failed, conn) + var failed []*wsConn + deadline := time.Now().Add(5 * time.Second) + for _, c := range snapshot { + if err := c.writeMessage(websocket.TextMessage, data, deadline); err != nil { + failed = append(failed, c) } } if len(failed) > 0 { h.mu.Lock() - for _, conn := range failed { - delete(h.conns, conn) - conn.Close() + for _, c := range failed { + delete(h.conns, c) + c.close() } h.mu.Unlock() } } func (h *Hub) HandleWS(w http.ResponseWriter, r *http.Request) { - conn, err := upgrader.Upgrade(w, r, nil) + raw, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Printf("ws: upgrade error: %v", err) return } - h.Add(conn) + c := &wsConn{conn: raw} + h.add(c) // Read pump — just drain pings/pongs, we don't expect client messages go func() { - defer h.Remove(conn) + defer h.remove(c) for { - if _, _, err := conn.ReadMessage(); err != nil { + if _, _, err := raw.ReadMessage(); err != nil { break } }