diff --git a/docs/backend/contracts/websocket-events.md b/docs/backend/contracts/websocket-events.md index f096154..2643110 100644 --- a/docs/backend/contracts/websocket-events.md +++ b/docs/backend/contracts/websocket-events.md @@ -9,6 +9,7 @@ Transport model: - Client sends JSON messages with a `type` discriminator. - Ephemeral events are relayed directly in-memory. - Durable structural events are committed through `canvas-service` first, then broadcast with committed metadata. +- The sender also receives the committed structural event back as the client-side ack path for optimistic queue reconciliation. - `crdt_op` and `sync_request` are special-cased for replay/sync. ## Join Flow diff --git a/frontend/package.json b/frontend/package.json index 0eb082e..6e0e2f0 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -22,7 +22,8 @@ "codemirror": "^6.0.2", "react": "^19.2.4", "react-dom": "^19.2.4", - "react-router-dom": "^7.13.2" + "react-router-dom": "^7.13.2", + "zustand": "^5.0.12" }, "devDependencies": { "@babel/core": "^7.29.0", diff --git a/frontend/pnpm-lock.yaml b/frontend/pnpm-lock.yaml index 34b6e6c..fa16344 100644 --- a/frontend/pnpm-lock.yaml +++ b/frontend/pnpm-lock.yaml @@ -41,6 +41,9 @@ importers: react-router-dom: specifier: ^7.13.2 version: 7.13.2(react-dom@19.2.4(react@19.2.4))(react@19.2.4) + zustand: + specifier: ^5.0.12 + version: 5.0.12(@types/react@19.2.14)(react@19.2.4) devDependencies: '@babel/core': specifier: ^7.29.0 @@ -1256,6 +1259,24 @@ packages: zod@4.3.6: resolution: {integrity: sha512-rftlrkhHZOcjDwkGlnUtZZkvaPHCsDATp4pGpuOOMDaTdDDXF91wuVDJoWoPsKX/3YPQ5fHuF3STjcYyKr+Qhg==} + zustand@5.0.12: + resolution: {integrity: sha512-i77ae3aZq4dhMlRhJVCYgMLKuSiZAaUPAct2AksxQ+gOtimhGMdXljRT21P5BNpeT4kXlLIckvkPM029OljD7g==} + engines: {node: '>=12.20.0'} + peerDependencies: + '@types/react': '>=18.0.0' + immer: '>=9.0.6' + react: '>=18.0.0' + use-sync-external-store: '>=1.2.0' + peerDependenciesMeta: + '@types/react': + optional: true + immer: + optional: true + react: + optional: true + use-sync-external-store: + optional: true + snapshots: '@babel/code-frame@7.29.0': @@ -2368,3 +2389,8 @@ snapshots: zod: 4.3.6 zod@4.3.6: {} + + zustand@5.0.12(@types/react@19.2.14)(react@19.2.4): + optionalDependencies: + '@types/react': 19.2.14 + react: 19.2.4 diff --git a/frontend/src/canvas/Canvas.tsx b/frontend/src/canvas/Canvas.tsx index e62e22f..eb93ea5 100644 --- a/frontend/src/canvas/Canvas.tsx +++ b/frontend/src/canvas/Canvas.tsx @@ -18,6 +18,8 @@ import { useCanvasShortcutContainer } from "./shortcuts/useCanvasShortcutContain import type { NodeDragVisual } from "./dragVisuals"; import { useCanvasDocument } from "./model/useCanvasDocument"; import { createCursorPresenceStore } from "./presence/cursorPresenceStore"; +import { createCanvasDocumentStore } from "./document/canvasDocumentStore"; +import { createCanvasOperationQueueStore } from "./ops/canvasOperationQueueStore"; interface CanvasProps { canvasId: string; @@ -41,6 +43,10 @@ export function Canvas({ canvasId, userId, displayName }: CanvasProps) { const viewportRef = useRef(null); const sendRef = useRef<((event: CanvasOutboundEvent) => void) | null>(null); const cursorStore = useMemo(() => createCursorPresenceStore(), []); + // eslint-disable-next-line react-hooks/exhaustive-deps + const documentStore = useMemo(() => createCanvasDocumentStore(), [canvasId]); + // eslint-disable-next-line react-hooks/exhaustive-deps + const operationQueueStore = useMemo(() => createCanvasOperationQueueStore(), [canvasId]); const [remoteSelections, setRemoteSelections] = useState< Map> >(new Map()); @@ -76,6 +82,8 @@ export function Canvas({ canvasId, userId, displayName }: CanvasProps) { viewportRef, transformRef, sendRef, + documentStore, + operationQueueStore, }); const [tool, setTool] = useState("select"); @@ -85,15 +93,8 @@ export function Canvas({ canvasId, userId, displayName }: CanvasProps) { const collabHandlers = useMemo( () => ({ - onNodeCreate: remote.applyNodeCreate, - onNodeMove: ( - remoteUserId: string, - nodeId: string, - x: number, - y: number, - ) => { + onNodeMove: (remoteUserId: string, nodeId: string, x: number, y: number) => { const previousNode = getNodeById(nodeId); - remote.applyNodeMove(remoteUserId, nodeId, x, y); if (!previousNode) return; setRemoteDragStates((prev) => { @@ -130,10 +131,6 @@ export function Canvas({ canvasId, userId, displayName }: CanvasProps) { return next; }); }, - onNodeUpdate: remote.applyNodeUpdate, - onNodeDelete: remote.applyNodeDelete, - onEdgeCreate: remote.applyEdgeCreate, - onEdgeDelete: remote.applyEdgeDelete, onNodeSelect: (remoteUserId: string, nodeIds: string[]) => { setRemoteSelections((prev) => { const next = new Map(prev); @@ -168,15 +165,17 @@ export function Canvas({ canvasId, userId, displayName }: CanvasProps) { remoteStrokes, send, onPointerMove: collabPointerMove, - } = useCanvasCollab( + } = useCanvasCollab({ canvasId, userId, displayName, viewportRef, transformRef, cursorStore, - collabHandlers, - ); + documentStore, + operationQueueStore, + handlers: collabHandlers, + }); useEffect(() => { sendRef.current = send; diff --git a/frontend/src/canvas/CursorOverlay.tsx b/frontend/src/canvas/CursorOverlay.tsx index 3aa76fb..7a75510 100644 --- a/frontend/src/canvas/CursorOverlay.tsx +++ b/frontend/src/canvas/CursorOverlay.tsx @@ -5,7 +5,7 @@ import resizeNwseCursorImage from "../assets/Cursor/Resize/North West South East import grabCursorImage from "../assets/Cursor/Grab/Grab.png"; import grabbingCursorImage from "../assets/Cursor/Grab/Grabbing.png"; import { - useRemoteCursors, + useInterpolatedRemoteCursors, type CursorPresenceStore, } from "./presence/cursorPresenceStore"; import type { CollabUser } from "./presence/types"; @@ -60,8 +60,12 @@ function CursorGlyph({
(null); - const cursors = useRemoteCursors(cursorStore); + const cursors = useInterpolatedRemoteCursors(cursorStore); useEffect(() => { const viewport = viewportRef.current; @@ -169,8 +173,12 @@ export function CursorOverlay({ aria-hidden="true" draggable={false} style={{ - left: localCursor.x - CURSOR_TIP_OFFSET_X, - top: localCursor.y - CURSOR_TIP_OFFSET_Y, + left: 0, + top: 0, + transform: `translate3d(${localCursor.x - CURSOR_TIP_OFFSET_X}px, ${ + localCursor.y - CURSOR_TIP_OFFSET_Y + }px, 0)`, + willChange: "transform", }} /> ))} diff --git a/frontend/src/canvas/document/canvasDocumentStore.ts b/frontend/src/canvas/document/canvasDocumentStore.ts new file mode 100644 index 0000000..43a6064 --- /dev/null +++ b/frontend/src/canvas/document/canvasDocumentStore.ts @@ -0,0 +1,273 @@ +import type { SetStateAction } from "react"; +import { useStore } from "zustand"; +import { createStore } from "zustand/vanilla"; +import type { + CanvasCommittedStructuralEvent, + CanvasOutboundStructuralEvent, +} from "../../shared/events"; +import type { CanvasNode, Edge } from "../../shared/nodes"; + +interface CanvasDocumentStoreState { + baseNodes: CanvasNode[]; + baseEdges: Edge[]; + nodes: CanvasNode[]; + edges: Edge[]; + headVersion: number; + replaceFromBootstrap: ( + nodes: CanvasNode[], + edges: Edge[], + headVersion: number, + pendingOperations?: CanvasOutboundStructuralEvent[], + ) => void; + applyOptimisticOperation: (event: CanvasOutboundStructuralEvent) => void; + applyOptimisticOperations: ( + events: CanvasOutboundStructuralEvent[], + ) => void; + rebaseOnCommittedOperation: ( + event: CanvasCommittedStructuralEvent, + pendingOperations: CanvasOutboundStructuralEvent[], + ) => void; + setNodes: (value: SetStateAction) => void; +} + +export type CanvasDocumentStore = ReturnType; + +function upsertNode(nodes: CanvasNode[], node: CanvasNode): CanvasNode[] { + const existingIndex = nodes.findIndex((candidate) => candidate.id === node.id); + if (existingIndex === -1) { + return [...nodes, node]; + } + const next = [...nodes]; + next[existingIndex] = node; + return next; +} + +function upsertEdge(edges: Edge[], edge: Edge): Edge[] { + const existingIndex = edges.findIndex((candidate) => candidate.id === edge.id); + if (existingIndex === -1) { + return [...edges, edge]; + } + const next = [...edges]; + next[existingIndex] = edge; + return next; +} + +function removeNodeAndAttachedEdges( + nodes: CanvasNode[], + edges: Edge[], + nodeId: string, +) { + return { + nodes: nodes.filter((node) => node.id !== nodeId), + edges: edges.filter( + (edge) => edge.fromNodeId !== nodeId && edge.toNodeId !== nodeId, + ), + }; +} + +function applyStructuralOperation( + nodes: CanvasNode[], + edges: Edge[], + event: CanvasOutboundStructuralEvent | CanvasCommittedStructuralEvent, +) { + switch (event.type) { + case "node_create": + return { nodes: upsertNode(nodes, event.node), edges }; + + case "node_move": + return { + nodes: nodes.map((node) => + node.id === event.nodeId + ? { ...node, x: event.x, y: event.y } + : node, + ), + edges, + }; + + case "node_update": + return { + nodes: nodes.map((node) => + node.id === event.nodeId + ? ({ ...node, ...event.patch } as CanvasNode) + : node, + ), + edges, + }; + + case "node_delete": + return removeNodeAndAttachedEdges(nodes, edges, event.nodeId); + + case "edge_create": + return { nodes, edges: upsertEdge(edges, event.edge) }; + + case "edge_delete": + return { + nodes, + edges: edges.filter((edge) => edge.id !== event.edgeId), + }; + } +} + +function replayPendingOperations( + baseNodes: CanvasNode[], + baseEdges: Edge[], + pendingOperations: CanvasOutboundStructuralEvent[], +) { + let projectedNodes = baseNodes; + let projectedEdges = baseEdges; + + for (const operation of pendingOperations) { + const next = applyStructuralOperation( + projectedNodes, + projectedEdges, + operation, + ); + projectedNodes = next.nodes; + projectedEdges = next.edges; + } + + return { projectedNodes, projectedEdges }; +} + +function syncBaseNodeContent( + baseNodes: CanvasNode[], + projectedNodes: CanvasNode[], +): CanvasNode[] { + const projectedById = new Map(projectedNodes.map((node) => [node.id, node])); + return baseNodes.map((baseNode) => { + const projectedNode = projectedById.get(baseNode.id); + if (!projectedNode) { + return baseNode; + } + + if (baseNode.type === "code" && projectedNode.type === "code") { + return { + ...baseNode, + data: { + ...baseNode.data, + content: projectedNode.data.content, + language: projectedNode.data.language, + }, + }; + } + + if (baseNode.type === "note" && projectedNode.type === "note") { + return { + ...baseNode, + data: { + ...baseNode.data, + content: projectedNode.data.content, + }, + }; + } + + return baseNode; + }); +} + +/** + * Own durable canvas structure on the client as two layers: + * + * - base state: the last committed snapshot plus committed structural events + * - projected state: base state with pending local operations replayed on top + * + * This keeps optimistic local edits responsive without letting remote committed + * events permanently stomp over still-pending local intent. + */ +export function createCanvasDocumentStore() { + return createStore((set) => ({ + baseNodes: [], + baseEdges: [], + nodes: [], + edges: [], + headVersion: 0, + + replaceFromBootstrap: ( + nodes, + edges, + headVersion, + pendingOperations = [], + ) => { + const { projectedNodes, projectedEdges } = replayPendingOperations( + nodes, + edges, + pendingOperations, + ); + + set({ + baseNodes: nodes, + baseEdges: edges, + nodes: projectedNodes, + edges: projectedEdges, + headVersion, + }); + }, + + applyOptimisticOperation: (event) => + set((state) => { + const next = applyStructuralOperation(state.nodes, state.edges, event); + return { + nodes: next.nodes, + edges: next.edges, + }; + }), + + applyOptimisticOperations: (events) => + set((state) => { + let nodes = state.nodes; + let edges = state.edges; + + for (const event of events) { + const next = applyStructuralOperation(nodes, edges, event); + nodes = next.nodes; + edges = next.edges; + } + + return { nodes, edges }; + }), + + rebaseOnCommittedOperation: (event, pendingOperations) => + set((state) => { + const rebased = applyStructuralOperation( + state.baseNodes, + state.baseEdges, + event, + ); + const { projectedNodes, projectedEdges } = replayPendingOperations( + rebased.nodes, + rebased.edges, + pendingOperations, + ); + + return { + baseNodes: rebased.nodes, + baseEdges: rebased.edges, + nodes: projectedNodes, + edges: projectedEdges, + headVersion: Math.max(state.headVersion, event.version), + }; + }), + + setNodes: (value) => + set((state) => { + const nextNodes = + typeof value === "function" + ? (value as (prev: CanvasNode[]) => CanvasNode[])(state.nodes) + : value; + return { + nodes: nextNodes, + baseNodes: syncBaseNodeContent(state.baseNodes, nextNodes), + }; + }), + })); +} + +/** + * Subscribe to one slice of a canvas document store from React. + */ +export function useCanvasDocumentStore( + store: CanvasDocumentStore, + selector: (state: CanvasDocumentStoreState) => T, +): T { + return useStore(store, selector); +} diff --git a/frontend/src/canvas/hooks/useCanvasCollab.ts b/frontend/src/canvas/hooks/useCanvasCollab.ts index 39e7e10..fbb57ad 100644 --- a/frontend/src/canvas/hooks/useCanvasCollab.ts +++ b/frontend/src/canvas/hooks/useCanvasCollab.ts @@ -1,58 +1,140 @@ import { useCallback, useEffect, useRef, useState } from "react"; import type { Transform } from "../types"; import type { + CanvasCommittedStructuralEvent, CanvasEventHandlers, CanvasInboundEvent, CanvasOutboundEvent, } from "../../shared/events"; +import { + isCommittedStructuralInboundEvent, +} from "../../shared/events"; import { screenToWorld } from "../utils/coordinates"; import { COLLAB_WS_URL } from "../../shared/config/env"; import type { CursorPresenceStore } from "../presence/cursorPresenceStore"; import type { CollabUser, RemoteStroke } from "../presence/types"; +import type { CanvasDocumentStore } from "../document/canvasDocumentStore"; +import type { CanvasOperationQueueStore } from "../ops/canvasOperationQueueStore"; +import { + getNextDispatchableOperation, + getPendingStructuralOperations, +} from "../ops/canvasOperationQueueStore"; const CURSOR_MOVE_UPDATE_INTERVAL = 17; +interface UseCanvasCollabArgs { + canvasId: string; + userId: string; + displayName: string; + viewportRef: React.RefObject; + transformRef: React.RefObject; + cursorStore: CursorPresenceStore; + documentStore: CanvasDocumentStore; + operationQueueStore: CanvasOperationQueueStore; + handlers?: CanvasEventHandlers; +} + /** - * Owns the shared canvas WebSocket session and routes inbound events by - * concern: document changes stay in React state/callbacks, while high-frequency - * remote cursor updates are written into an external presence store. + * Own the shared canvas WebSocket session and bridge transport events into the + * document store, operation queue, and lightweight presence state. */ -export function useCanvasCollab( - canvasId: string, - userId: string, - displayName: string, - viewportRef: React.RefObject, - transformRef: React.RefObject, - cursorStore: CursorPresenceStore, - handlers: CanvasEventHandlers = {}, -) { +export function useCanvasCollab({ + canvasId, + userId, + displayName, + viewportRef, + transformRef, + cursorStore, + documentStore, + operationQueueStore, + handlers = {}, +}: UseCanvasCollabArgs) { const [users, setUsers] = useState([]); - // Active in-progress strokes from remote users, keyed by userId. - // Points are world-space, accumulated as draw_points batches arrive. - // Cleared when draw_end arrives or the user leaves. const [remoteStrokes, setRemoteStrokes] = useState>( new Map(), ); const wsRef = useRef(null); const lastCursorSentAt = useRef(0); - - // Keep newest callbacks without reconnecting the socket every render. + const bufferedStructuralEventsRef = useRef( + [], + ); const handlersRef = useRef(handlers); useEffect(() => { handlersRef.current = handlers; }, [handlers]); + const send = useCallback( + (event: CanvasOutboundEvent) => { + const ws = wsRef.current; + if (!ws || ws.readyState !== WebSocket.OPEN) return; + ws.send(JSON.stringify({ ...event, canvasId, userId })); + }, + [canvasId, userId], + ); + + const reconcileCommittedStructuralEvent = useCallback( + (event: CanvasCommittedStructuralEvent) => { + const isLocalAck = event.userId === userId; + + if (!isLocalAck && event.type === "node_move") { + handlersRef.current.onNodeMove?.( + event.userId, + event.nodeId, + event.x, + event.y, + ); + } + + if (isLocalAck) { + operationQueueStore.getState().acknowledge(event.clientOperationId); + } + + operationQueueStore.getState().pruneForCommittedEvent(event); + documentStore.getState().rebaseOnCommittedOperation( + event, + getPendingStructuralOperations(operationQueueStore), + ); + }, + [documentStore, operationQueueStore, userId], + ); + + const tryDispatchNextQueuedOperation = useCallback(() => { + const next = getNextDispatchableOperation(operationQueueStore); + if (!next) { + return; + } + + const ws = wsRef.current; + if (!ws || ws.readyState !== WebSocket.OPEN) { + return; + } + + try { + ws.send(JSON.stringify({ ...next.event, canvasId, userId })); + operationQueueStore.getState().markInFlight(next.clientOperationId); + } catch (error) { + console.error("Failed to dispatch queued structural operation", error); + operationQueueStore.getState().setDispatchReady(false); + } + }, [canvasId, operationQueueStore, userId]); + + useEffect(() => { + tryDispatchNextQueuedOperation(); + return operationQueueStore.subscribe(() => { + tryDispatchNextQueuedOperation(); + }); + }, [operationQueueStore, tryDispatchNextQueuedOperation]); + useEffect(() => { const ws = new WebSocket(COLLAB_WS_URL); ws.onopen = () => { - // Reset local collab state when a fresh socket session is established. setUsers([]); cursorStore.clear(); setRemoteStrokes(new Map()); - - // Assign only when open so stale constructing sockets cannot clobber ref. + bufferedStructuralEventsRef.current = []; + operationQueueStore.getState().setDispatchReady(false); wsRef.current = ws; ws.send(JSON.stringify({ type: "join", canvasId, userId, displayName })); }; @@ -66,9 +148,14 @@ export function useCanvasCollab( return; } - // The relay currently echoes to all peers except sender for most events, - // but this guard is still useful for safety and future server changes. - if ("userId" in msg && msg.userId === userId) return; + if (isCommittedStructuralInboundEvent(msg)) { + if (!operationQueueStore.getState().dispatchReady) { + bufferedStructuralEventsRef.current.push(msg); + return; + } + reconcileCommittedStructuralEvent(msg); + return; + } switch (msg.type) { case "presence_snapshot": @@ -90,6 +177,25 @@ export function useCanvasCollab( ); break; + case "canvas_bootstrap": { + documentStore.getState().replaceFromBootstrap( + msg.nodes, + msg.edges, + msg.headVersion, + getPendingStructuralOperations(operationQueueStore), + ); + + const buffered = [...bufferedStructuralEventsRef.current].sort( + (a, b) => a.version - b.version, + ); + bufferedStructuralEventsRef.current = []; + for (const bufferedEvent of buffered) { + reconcileCommittedStructuralEvent(bufferedEvent); + } + operationQueueStore.getState().setDispatchReady(true); + break; + } + case "user_join": handlersRef.current.onUserJoin?.({ ...msg.user, @@ -118,6 +224,7 @@ export function useCanvasCollab( break; case "cursor_move": + if (msg.userId === userId) break; cursorStore.upsertCursor({ userId: msg.userId, x: msg.x, @@ -125,48 +232,23 @@ export function useCanvasCollab( }); break; - case "node_create": - handlersRef.current.onNodeCreate?.(msg.node); - break; - - case "node_move": - handlersRef.current.onNodeMove?.( - msg.userId, - msg.nodeId, - msg.x, - msg.y, - ); - break; - case "node_drag_start": + if (msg.userId === userId) break; handlersRef.current.onNodeDragStart?.(msg.userId, msg.nodeIds); break; case "node_drag_end": + if (msg.userId === userId) break; handlersRef.current.onNodeDragEnd?.(msg.userId); break; - case "node_update": - handlersRef.current.onNodeUpdate?.(msg.nodeId, msg.patch); - break; - - case "node_delete": - handlersRef.current.onNodeDelete?.(msg.nodeId); - break; - - case "edge_create": - handlersRef.current.onEdgeCreate?.(msg.edge); - break; - - case "edge_delete": - handlersRef.current.onEdgeDelete?.(msg.edgeId); - break; - case "node_select": + if (msg.userId === userId) break; handlersRef.current.onNodeSelect?.(msg.userId, msg.nodeIds); break; case "draw_points": + if (msg.userId === userId) break; setRemoteStrokes((prev) => { const next = new Map(prev); const existing = next.get(msg.userId); @@ -179,6 +261,7 @@ export function useCanvasCollab( break; case "draw_end": + if (msg.userId === userId) break; setRemoteStrokes((prev) => { const next = new Map(prev); next.delete(msg.userId); @@ -198,6 +281,7 @@ export function useCanvasCollab( break; case "crdt_op": + if (msg.userId === userId) break; handlersRef.current.onCrdtOp?.(msg.docId, msg.op, msg.userId); break; @@ -208,12 +292,14 @@ export function useCanvasCollab( }; ws.onclose = () => { - // StrictMode-safe: only clear if this exact socket is still current. if (wsRef.current === ws) wsRef.current = null; setUsers([]); cursorStore.clear(); setRemoteStrokes(new Map()); + bufferedStructuralEventsRef.current = []; + operationQueueStore.getState().setDispatchReady(false); + operationQueueStore.getState().requeueInFlightOperations(); }; ws.onerror = (err) => { @@ -221,23 +307,22 @@ export function useCanvasCollab( }; return () => { + bufferedStructuralEventsRef.current = []; + operationQueueStore.getState().setDispatchReady(false); + operationQueueStore.getState().requeueInFlightOperations(); cursorStore.clear(); ws.close(); }; - }, [canvasId, userId, displayName, cursorStore]); - - const send = useCallback( - (event: CanvasOutboundEvent) => { - const ws = wsRef.current; - if (!ws || ws.readyState !== WebSocket.OPEN) return; - - // canvasId + userId are envelope metadata used by relay/router logic. - ws.send(JSON.stringify({ ...event, canvasId, userId })); - }, - [canvasId, userId], - ); + }, [ + canvasId, + cursorStore, + displayName, + documentStore, + operationQueueStore, + reconcileCommittedStructuralEvent, + userId, + ]); - // Throttled cursor move updates (50fps) to cap network spam. const onPointerMove = useCallback( (e: React.PointerEvent) => { const now = performance.now(); diff --git a/frontend/src/canvas/model/useCanvasDocument.ts b/frontend/src/canvas/model/useCanvasDocument.ts index fb031af..4e40605 100644 --- a/frontend/src/canvas/model/useCanvasDocument.ts +++ b/frontend/src/canvas/model/useCanvasDocument.ts @@ -2,10 +2,12 @@ import { useCallback, useLayoutEffect, useMemo, useRef, useState } from "react"; import type { RefObject, SetStateAction } from "react"; import { useCanvasCrdt } from "../../shared/crdt/useCanvasCrdt"; import type { CrdtOp } from "../../shared/crdt"; -import type { CanvasOutboundEvent } from "../../shared/events"; +import type { + CanvasOutboundEvent, + CanvasOutboundStructuralEvent, +} from "../../shared/events"; import { type CanvasNode, - type Edge, type NodeType, createCodeNode, createDrawNode, @@ -13,6 +15,9 @@ import { createProblemNode, createTestResultsNode, } from "../../shared/nodes"; +import type { CanvasDocumentStore } from "../document/canvasDocumentStore"; +import { useCanvasDocumentStore } from "../document/canvasDocumentStore"; +import type { CanvasOperationQueueStore } from "../ops/canvasOperationQueueStore"; import type { Transform } from "../types"; import { screenToWorld } from "../utils/coordinates"; @@ -21,6 +26,8 @@ interface UseCanvasDocumentArgs { viewportRef: RefObject; transformRef: RefObject; sendRef: RefObject<((event: CanvasOutboundEvent) => void) | null>; + documentStore: CanvasDocumentStore; + operationQueueStore: CanvasOperationQueueStore; } function assertNever(value: never): never { @@ -62,20 +69,23 @@ function cloneCanvasNode(node: CanvasNode, dx: number, dy: number): CanvasNode { } /** - * Owns the local canvas document model: nodes, edges, selection, and document commands. + * Bridge the canvas UI command surface onto the new document store + queue + * architecture. * - * This hook is the primary mutation boundary for canvas content. UI handlers, - * draw tools, shortcuts, and remote collaboration events all funnel through - * the command surface returned here instead of mutating canvas state inline. + * Structural commands now optimistically project into the document store and + * enqueue durable operations separately, while ephemeral selection/CRDT flows + * still send immediately over the socket path. */ export function useCanvasDocument({ userId, viewportRef, transformRef, sendRef, + documentStore, + operationQueueStore, }: UseCanvasDocumentArgs) { - const [nodes, setNodes] = useState([]); - const [edges, setEdges] = useState([]); + const nodes = useCanvasDocumentStore(documentStore, (state) => state.nodes); + const edges = useCanvasDocumentStore(documentStore, (state) => state.edges); const [selectedNodeIds, setSelectedNodeIds] = useState>( () => new Set(), ); @@ -85,20 +95,6 @@ export function useCanvasDocument({ nodesRef.current = nodes; }, [nodes]); - const setNodesWithRef = useCallback( - (value: SetStateAction) => { - setNodes((prev) => { - const next = - typeof value === "function" - ? (value as (prev: CanvasNode[]) => CanvasNode[])(prev) - : value; - nodesRef.current = next; - return next; - }); - }, - [], - ); - const sendEvent = useCallback( (event: CanvasOutboundEvent) => { sendRef.current?.(event); @@ -106,6 +102,13 @@ export function useCanvasDocument({ [sendRef], ); + const setNodesWithRef = useCallback( + (value: SetStateAction) => { + documentStore.getState().setNodes(value); + }, + [documentStore], + ); + const { onTextEdits, onCrdtOp, @@ -118,44 +121,65 @@ export function useCanvasDocument({ sendRef, }); + const enqueueStructuralOperation = useCallback( + (event: CanvasOutboundStructuralEvent) => { + documentStore.getState().applyOptimisticOperation(event); + operationQueueStore.getState().enqueue(event); + }, + [documentStore, operationQueueStore], + ); + + const enqueueStructuralOperations = useCallback( + (events: CanvasOutboundStructuralEvent[]) => { + if (events.length === 0) { + return; + } + documentStore.getState().applyOptimisticOperations(events); + const queue = operationQueueStore.getState(); + for (const event of events) { + queue.enqueue(event); + } + }, + [documentStore, operationQueueStore], + ); + const updateNode = useCallback( (id: string, patch: Partial) => { - setNodesWithRef((prev) => - prev.map((node) => - node.id === id ? ({ ...node, ...patch } as CanvasNode) : node, - ), - ); - sendEvent({ type: "node_update", nodeId: id, patch }); + enqueueStructuralOperation({ + type: "node_update", + clientOperationId: crypto.randomUUID(), + nodeId: id, + patch, + }); }, - [sendEvent, setNodesWithRef], + [enqueueStructuralOperation], ); const moveNodes = useCallback( (moves: Array<{ id: string; x: number; y: number }>) => { - setNodesWithRef((prev) => { - const moveMap = new Map(moves.map((move) => [move.id, move])); - return prev.map((node) => { - const move = moveMap.get(node.id); - return move ? { ...node, x: move.x, y: move.y } : node; - }); - }); - for (const move of moves) { - sendEvent({ type: "node_move", nodeId: move.id, x: move.x, y: move.y }); - } + enqueueStructuralOperations( + moves.map((move) => ({ + type: "node_move" as const, + clientOperationId: crypto.randomUUID(), + nodeId: move.id, + x: move.x, + y: move.y, + })), + ); }, - [sendEvent, setNodesWithRef], + [enqueueStructuralOperations], ); const resizeNode = useCallback( (id: string, width: number, height: number) => { - setNodesWithRef((prev) => - prev.map((node) => - node.id === id ? { ...node, width, height } : node, - ), - ); - sendEvent({ type: "node_update", nodeId: id, patch: { width, height } }); + enqueueStructuralOperation({ + type: "node_update", + clientOperationId: crypto.randomUUID(), + nodeId: id, + patch: { width, height }, + }); }, - [sendEvent, setNodesWithRef], + [enqueueStructuralOperation], ); const selectNodes = useCallback( @@ -172,12 +196,6 @@ export function useCanvasDocument({ const deleteNode = useCallback( (nodeId: string) => { - setNodesWithRef((prev) => prev.filter((node) => node.id !== nodeId)); - setEdges((prev) => - prev.filter( - (edge) => edge.fromNodeId !== nodeId && edge.toNodeId !== nodeId, - ), - ); setSelectedNodeIds((prev) => { if (!prev.has(nodeId)) return prev; const next = new Set(prev); @@ -185,9 +203,13 @@ export function useCanvasDocument({ return next; }); onDeleteCrdtDoc(nodeId); - sendEvent({ type: "node_delete", nodeId }); + enqueueStructuralOperation({ + type: "node_delete", + clientOperationId: crypto.randomUUID(), + nodeId, + }); }, - [onDeleteCrdtDoc, sendEvent, setNodesWithRef], + [enqueueStructuralOperation, onDeleteCrdtDoc], ); const cloneNode = useCallback( @@ -196,21 +218,27 @@ export function useCanvasDocument({ if (!sourceNode) return; const clonedNode = cloneCanvasNode(sourceNode, 24, 24); - setNodesWithRef((prev) => [...prev, clonedNode]); - sendEvent({ type: "node_create", node: clonedNode }); + enqueueStructuralOperation({ + type: "node_create", + clientOperationId: crypto.randomUUID(), + node: clonedNode, + }); selectNodes(new Set([clonedNode.id])); }, - [selectNodes, sendEvent, setNodesWithRef], + [enqueueStructuralOperation, selectNodes], ); const pasteNodeFromSnapshot = useCallback( (sourceNode: CanvasNode) => { const pastedNode = cloneCanvasNode(sourceNode, 24, 24); - setNodesWithRef((prev) => [...prev, pastedNode]); - sendEvent({ type: "node_create", node: pastedNode }); + enqueueStructuralOperation({ + type: "node_create", + clientOperationId: crypto.randomUUID(), + node: pastedNode, + }); selectNodes(new Set([pastedNode.id])); }, - [selectNodes, sendEvent, setNodesWithRef], + [enqueueStructuralOperation, selectNodes], ); const commitDrawStroke = useCallback( @@ -227,7 +255,9 @@ export function useCanvasDocument({ const nodeY = minY - padding; const nodeW = Math.max(maxX - minX + 2 * padding, 1); const nodeH = Math.max(maxY - minY + 2 * padding, 1); - const relativePoints = points.map(([x, y]) => [x - nodeX, y - nodeY] as [number, number]); + const relativePoints = points.map( + ([x, y]) => [x - nodeX, y - nodeY] as [number, number], + ); const node = createDrawNode( nodeX, @@ -237,11 +267,14 @@ export function useCanvasDocument({ relativePoints, strokeThickness, ); - setNodesWithRef((prev) => [...prev, node]); - sendEvent({ type: "node_create", node }); + enqueueStructuralOperation({ + type: "node_create", + clientOperationId: crypto.randomUUID(), + node, + }); selectNodes(new Set([node.id])); }, - [selectNodes, sendEvent, setNodesWithRef], + [enqueueStructuralOperation, selectNodes], ); const spawnNode = useCallback( @@ -260,118 +293,50 @@ export function useCanvasDocument({ node.x = world.x - node.width / 2; node.y = world.y - node.height / 2; - setNodesWithRef((prev) => [...prev, node]); - sendEvent({ type: "node_create", node }); - selectNodes(new Set([node.id])); + const operations: CanvasOutboundStructuralEvent[] = [ + { + type: "node_create", + clientOperationId: crypto.randomUUID(), + node, + }, + ]; if (fromNodeId) { - const edge: Edge = { - id: crypto.randomUUID(), - fromNodeId, - toNodeId: node.id, - }; - setEdges((prev) => [...prev, edge]); - sendEvent({ type: "edge_create", edge }); + operations.push({ + type: "edge_create", + clientOperationId: crypto.randomUUID(), + edge: { + id: crypto.randomUUID(), + fromNodeId, + toNodeId: node.id, + }, + }); } + enqueueStructuralOperations(operations); + selectNodes(new Set([node.id])); return node.id; }, - [selectNodes, sendEvent, setNodesWithRef, transformRef, viewportRef], - ); - - const getNodeById = useCallback((nodeId: string) => { - return nodesRef.current.find((node) => node.id === nodeId); - }, []); - - const applyRemoteNodeCreate = useCallback( - (node: CanvasNode) => { - setNodesWithRef((prev) => [...prev, node]); - }, - [setNodesWithRef], - ); - - const applyRemoteNodeMove = useCallback( - (_userId: string, nodeId: string, x: number, y: number) => { - setNodesWithRef((prev) => - prev.map((node) => (node.id === nodeId ? { ...node, x, y } : node)), - ); - }, - [setNodesWithRef], - ); - - const applyRemoteNodeUpdate = useCallback( - (nodeId: string, patch: Partial) => { - setNodesWithRef((prev) => - prev.map((node) => - node.id === nodeId ? ({ ...node, ...patch } as CanvasNode) : node, - ), - ); - }, - [setNodesWithRef], + [enqueueStructuralOperations, selectNodes, transformRef, viewportRef], ); - const applyRemoteNodeDelete = useCallback( + const getNodeById = useCallback( (nodeId: string) => { - setNodesWithRef((prev) => prev.filter((node) => node.id !== nodeId)); - setEdges((prev) => - prev.filter( - (edge) => edge.fromNodeId !== nodeId && edge.toNodeId !== nodeId, - ), - ); - setSelectedNodeIds((prev) => { - if (!prev.has(nodeId)) return prev; - const next = new Set(prev); - next.delete(nodeId); - return next; - }); - onDeleteCrdtDoc(nodeId); - }, - [onDeleteCrdtDoc, setNodesWithRef], - ); - - const applyRemoteEdgeCreate = useCallback((edge: Edge) => { - setEdges((prev) => [...prev, edge]); - }, []); - - const applyRemoteEdgeDelete = useCallback((edgeId: string) => { - setEdges((prev) => prev.filter((edge) => edge.id !== edgeId)); - }, []); - - const applyRemoteCrdtOp = useCallback( - (docId: string, op: CrdtOp) => { - onCrdtOp(docId, op); - }, - [onCrdtOp], - ); - - const applyRemoteSyncResponse = useCallback( - (docId: string, ops: CrdtOp[]) => { - onSyncResponse(docId, ops); + return documentStore.getState().nodes.find((node) => node.id === nodeId); }, - [onSyncResponse], + [documentStore], ); const remote = useMemo( () => ({ - applyNodeCreate: applyRemoteNodeCreate, - applyNodeMove: applyRemoteNodeMove, - applyNodeUpdate: applyRemoteNodeUpdate, - applyNodeDelete: applyRemoteNodeDelete, - applyEdgeCreate: applyRemoteEdgeCreate, - applyEdgeDelete: applyRemoteEdgeDelete, - applyCrdtOp: applyRemoteCrdtOp, - applySyncResponse: applyRemoteSyncResponse, + applyCrdtOp: (docId: string, op: CrdtOp) => { + onCrdtOp(docId, op); + }, + applySyncResponse: (docId: string, ops: CrdtOp[]) => { + onSyncResponse(docId, ops); + }, }), - [ - applyRemoteCrdtOp, - applyRemoteEdgeCreate, - applyRemoteEdgeDelete, - applyRemoteNodeCreate, - applyRemoteNodeDelete, - applyRemoteNodeMove, - applyRemoteNodeUpdate, - applyRemoteSyncResponse, - ], + [onCrdtOp, onSyncResponse], ); return { diff --git a/frontend/src/canvas/ops/canvasOperationQueueStore.ts b/frontend/src/canvas/ops/canvasOperationQueueStore.ts new file mode 100644 index 0000000..ef3a618 --- /dev/null +++ b/frontend/src/canvas/ops/canvasOperationQueueStore.ts @@ -0,0 +1,179 @@ +import { useStore } from "zustand"; +import { createStore } from "zustand/vanilla"; +import type { + CanvasCommittedStructuralEvent, + CanvasOutboundStructuralEvent, +} from "../../shared/events"; + +type OperationStatus = "pending" | "in_flight"; + +export interface CanvasQueuedOperation { + clientOperationId: string; + event: CanvasOutboundStructuralEvent; + status: OperationStatus; + attemptCount: number; + createdAt: number; +} + +interface CanvasOperationQueueState { + operations: CanvasQueuedOperation[]; + dispatchReady: boolean; + enqueue: (event: CanvasOutboundStructuralEvent) => void; + markInFlight: (clientOperationId: string) => void; + acknowledge: (clientOperationId: string) => void; + setDispatchReady: (ready: boolean) => void; + requeueInFlightOperations: () => void; + pruneForCommittedEvent: (event: CanvasCommittedStructuralEvent) => void; +} + +export type CanvasOperationQueueStore = ReturnType< + typeof createCanvasOperationQueueStore +>; + +function operationTouchesNode( + operation: CanvasOutboundStructuralEvent, + nodeId: string, +) { + switch (operation.type) { + case "node_create": + return operation.node.id === nodeId; + case "node_move": + case "node_update": + case "node_delete": + return operation.nodeId === nodeId; + case "edge_create": + return ( + operation.edge.fromNodeId === nodeId || operation.edge.toNodeId === nodeId + ); + case "edge_delete": + return false; + } +} + +function operationTouchesEdge( + operation: CanvasOutboundStructuralEvent, + edgeId: string, +) { + switch (operation.type) { + case "edge_create": + return operation.edge.id === edgeId; + case "edge_delete": + return operation.edgeId === edgeId; + default: + return false; + } +} + +/** + * Own the client-side structural send queue. + * + * The queue is intentionally separate from the document store so optimistic UI + * projection and transport/retry state do not collapse into one god store. + */ +export function createCanvasOperationQueueStore() { + return createStore((set) => ({ + operations: [], + dispatchReady: false, + + enqueue: (event) => + set((state) => ({ + operations: [ + ...state.operations, + { + clientOperationId: event.clientOperationId, + event, + status: "pending", + attemptCount: 0, + createdAt: Date.now(), + }, + ], + })), + + markInFlight: (clientOperationId) => + set((state) => ({ + operations: state.operations.map((operation) => + operation.clientOperationId === clientOperationId + ? { + ...operation, + status: "in_flight", + attemptCount: operation.attemptCount + 1, + } + : operation, + ), + })), + + acknowledge: (clientOperationId) => + set((state) => ({ + operations: state.operations.filter( + (operation) => operation.clientOperationId !== clientOperationId, + ), + })), + + setDispatchReady: (dispatchReady) => set({ dispatchReady }), + + requeueInFlightOperations: () => + set((state) => ({ + operations: state.operations.map((operation) => + operation.status === "in_flight" + ? { ...operation, status: "pending" } + : operation, + ), + })), + + pruneForCommittedEvent: (event) => + set((state) => ({ + operations: state.operations.filter((operation) => { + switch (event.type) { + case "node_delete": + return !operationTouchesNode(operation.event, event.nodeId); + case "edge_delete": + return !operationTouchesEdge(operation.event, event.edgeId); + default: + return true; + } + }), + })), + })); +} + +/** + * Subscribe to one slice of a structural operation queue store from React. + */ +export function useCanvasOperationQueueStore( + store: CanvasOperationQueueStore, + selector: (state: CanvasOperationQueueState) => T, +): T { + return useStore(store, selector); +} + +/** + * Read all still-pending local structural operations in FIFO order. + * + * Both pending and in-flight operations count as local intent that should be + * replayed on top of committed base state. + */ +export function getPendingStructuralOperations( + store: CanvasOperationQueueStore, +): CanvasOutboundStructuralEvent[] { + return store.getState().operations.map((operation) => operation.event); +} + +/** + * Return the next operation that can be dispatched right now, if any. + */ +export function getNextDispatchableOperation( + store: CanvasOperationQueueStore, +): CanvasQueuedOperation | null { + const state = store.getState(); + if (!state.dispatchReady) { + return null; + } + + if (state.operations.some((operation) => operation.status === "in_flight")) { + return null; + } + + return ( + state.operations.find((operation) => operation.status === "pending") ?? null + ); +} diff --git a/frontend/src/canvas/presence/cursorPresenceStore.ts b/frontend/src/canvas/presence/cursorPresenceStore.ts index 7817752..3989620 100644 --- a/frontend/src/canvas/presence/cursorPresenceStore.ts +++ b/frontend/src/canvas/presence/cursorPresenceStore.ts @@ -1,25 +1,86 @@ -import { useSyncExternalStore } from "react"; -import type { RemoteCursor } from "./types"; +import { useEffect, useMemo, useState, useSyncExternalStore } from "react"; +import type { RemoteCursor, RemoteCursorSample, RemoteCursorTrack } from "./types"; type CursorPresenceListener = () => void; +const CURSOR_RENDER_DELAY_MS = 75; +const MAX_CURSOR_SAMPLES = 8; +const CURSOR_SAMPLE_RETENTION_MS = 250; + +function getMonotonicNow() { + return typeof performance === "undefined" ? Date.now() : performance.now(); +} + export interface CursorPresenceStore { subscribe(listener: CursorPresenceListener): () => void; - getSnapshot(): ReadonlyMap; + getSnapshot(): ReadonlyMap; upsertCursor(cursor: RemoteCursor): void; removeCursor(userId: string): void; clear(): void; } +function trimSamples( + samples: RemoteCursorSample[], + now: number, +): RemoteCursorSample[] { + const cutoff = now - CURSOR_SAMPLE_RETENTION_MS; + const retained = samples.filter((sample) => sample.receivedAt >= cutoff); + return retained.slice(-MAX_CURSOR_SAMPLES); +} + +function interpolateTrack( + track: RemoteCursorTrack, + renderAt: number, +): RemoteCursor { + const samples = track.samples; + const earliest = samples[0]; + const latest = samples[samples.length - 1]; + + if (samples.length === 1 || renderAt <= earliest.receivedAt) { + return { + userId: track.userId, + x: earliest.x, + y: earliest.y, + }; + } + + for (let index = 1; index < samples.length; index += 1) { + const previous = samples[index - 1]; + const next = samples[index]; + if (renderAt > next.receivedAt) { + continue; + } + + const span = next.receivedAt - previous.receivedAt; + const progress = + span <= 0 + ? 1 + : Math.max(0, Math.min(1, (renderAt - previous.receivedAt) / span)); + + return { + userId: track.userId, + x: previous.x + (next.x - previous.x) * progress, + y: previous.y + (next.y - previous.y) * progress, + }; + } + + return { + userId: track.userId, + x: latest.x, + y: latest.y, + }; +} + /** - * Creates an isolated latest-wins store for remote cursor presence. + * Creates an isolated buffered store for remote cursor presence. * * Cursor movement is high-frequency ephemeral state. Keeping it outside the * main Canvas component prevents remote pointer packets from invalidating the - * full scene tree on every update. + * full scene tree on every update, and keeping a short sample history lets the + * overlay render slightly in the past for smoother interpolation. */ export function createCursorPresenceStore(): CursorPresenceStore { - let cursors = new Map(); + let cursors = new Map(); const listeners = new Set(); const emitChange = () => { @@ -28,7 +89,7 @@ export function createCursorPresenceStore(): CursorPresenceStore { } }; - const setSnapshot = (next: Map) => { + const setSnapshot = (next: Map) => { cursors = next; emitChange(); }; @@ -44,13 +105,28 @@ export function createCursorPresenceStore(): CursorPresenceStore { return cursors; }, upsertCursor(cursor) { + const now = getMonotonicNow(); const existing = cursors.get(cursor.userId); - if (existing && existing.x === cursor.x && existing.y === cursor.y) { + const latestSample = existing?.samples[existing.samples.length - 1]; + if (latestSample && latestSample.x === cursor.x && latestSample.y === cursor.y) { return; } const next = new Map(cursors); - next.set(cursor.userId, cursor); + next.set(cursor.userId, { + userId: cursor.userId, + samples: trimSamples( + [ + ...(existing?.samples ?? []), + { + x: cursor.x, + y: cursor.y, + receivedAt: now, + }, + ], + now, + ), + }); setSnapshot(next); }, removeCursor(userId) { @@ -68,17 +144,48 @@ export function createCursorPresenceStore(): CursorPresenceStore { } /** - * Subscribes a cursor-only view to the latest remote cursor snapshot. + * Subscribes a cursor-only view to interpolated remote cursor positions. * - * This keeps the cursor overlay reactive without pulling high-frequency - * presence updates through the Canvas render path. + * Rendering about one packet behind real time gives the overlay two real + * samples to blend between, which feels smoother than snapping to the latest + * jittery arrival time. */ -export function useRemoteCursors( +export function useInterpolatedRemoteCursors( store: CursorPresenceStore, ): ReadonlyMap { - return useSyncExternalStore( + const tracks = useSyncExternalStore( store.subscribe, store.getSnapshot, store.getSnapshot, ); + + const [frameNow, setFrameNow] = useState(() => getMonotonicNow()); + + useEffect(() => { + if (tracks.size === 0) { + return undefined; + } + + let frameId = 0; + const tick = (timestamp: number) => { + setFrameNow(timestamp); + frameId = requestAnimationFrame(tick); + }; + + frameId = requestAnimationFrame(tick); + return () => { + cancelAnimationFrame(frameId); + }; + }, [tracks.size]); + + return useMemo(() => { + const renderAt = frameNow - CURSOR_RENDER_DELAY_MS; + const rendered = new Map(); + + for (const [userId, track] of tracks) { + rendered.set(userId, interpolateTrack(track, renderAt)); + } + + return rendered; + }, [frameNow, tracks]); } diff --git a/frontend/src/canvas/presence/types.ts b/frontend/src/canvas/presence/types.ts index 811b61a..c62a364 100644 --- a/frontend/src/canvas/presence/types.ts +++ b/frontend/src/canvas/presence/types.ts @@ -8,6 +8,17 @@ export interface RemoteCursor { y: number; } +export interface RemoteCursorSample { + x: number; + y: number; + receivedAt: number; +} + +export interface RemoteCursorTrack { + userId: string; + samples: RemoteCursorSample[]; +} + export interface RemoteStroke { points: Array<[number, number]>; thickness: number; diff --git a/frontend/src/shared/events.ts b/frontend/src/shared/events.ts index 7e1ef91..a6c7a75 100644 --- a/frontend/src/shared/events.ts +++ b/frontend/src/shared/events.ts @@ -7,47 +7,109 @@ export interface CanvasPresenceUser { color: string; } -/** - * Outbound events this client can send to the relay server. - * - * The server stays mostly schema-agnostic and fan-outs payloads by canvas room. - */ -export type CanvasOutboundEvent = +export type CanvasStructuralEventType = + | "node_create" + | "node_move" + | "node_update" + | "node_delete" + | "edge_create" + | "edge_delete"; + +export type CanvasOutboundStructuralEvent = + | { type: "node_create"; clientOperationId: string; node: CanvasNode } + | { + type: "node_move"; + clientOperationId: string; + nodeId: string; + x: number; + y: number; + } + | { + type: "node_update"; + clientOperationId: string; + nodeId: string; + patch: Partial; + } + | { type: "node_delete"; clientOperationId: string; nodeId: string } + | { type: "edge_create"; clientOperationId: string; edge: Edge } + | { type: "edge_delete"; clientOperationId: string; edgeId: string }; + +export type CanvasImmediateOutboundEvent = | { type: "cursor_move"; userId: string; x: number; y: number } - | { type: "node_create"; node: CanvasNode } - | { type: "node_move"; nodeId: string; x: number; y: number } | { type: "node_drag_start"; nodeIds: string[] } | { type: "node_drag_end" } - | { type: "node_update"; nodeId: string; patch: Partial } - | { type: "node_delete"; nodeId: string } - | { type: "edge_create"; edge: Edge } - | { type: "edge_delete"; edgeId: string } | { type: "node_select"; userId: string; nodeIds: string[] } | { type: "crdt_op"; docId: string; op: CrdtOp } | { type: "sync_request"; docId: string; stateVector: StateVector } | { type: "draw_points"; points: Array<[number, number]>; thickness: number } | { type: "draw_end" }; +/** + * Outbound events this client can send to the relay server. + * + * Structural graph edits carry a clientOperationId so the sender can reconcile + * optimistic local state with the committed event echoed back by the server. + */ +export type CanvasOutboundEvent = + | CanvasOutboundStructuralEvent + | CanvasImmediateOutboundEvent; + +export interface CanvasBootstrapEvent { + type: "canvas_bootstrap"; + canvasId: string; + headVersion: number; + nodes: CanvasNode[]; + edges: Edge[]; +} + +interface CanvasCommittedStructuralEventBase { + canvasId: string; + userId: string; + version: number; + eventId: string; + clientOperationId: string; +} + +export type CanvasCommittedStructuralEvent = + | (CanvasCommittedStructuralEventBase & { + type: "node_create"; + node: CanvasNode; + }) + | (CanvasCommittedStructuralEventBase & { + type: "node_move"; + nodeId: string; + x: number; + y: number; + }) + | (CanvasCommittedStructuralEventBase & { + type: "node_update"; + nodeId: string; + patch: Partial; + }) + | (CanvasCommittedStructuralEventBase & { + type: "node_delete"; + nodeId: string; + }) + | (CanvasCommittedStructuralEventBase & { + type: "edge_create"; + edge: Edge; + }) + | (CanvasCommittedStructuralEventBase & { + type: "edge_delete"; + edgeId: string; + }); + /** * Inbound events this client may receive from the server. */ export type CanvasInboundEvent = | { type: "presence_snapshot"; users: CanvasPresenceUser[] } + | CanvasBootstrapEvent | { type: "user_join"; user: CanvasPresenceUser } | { type: "cursor_move"; userId: string; x: number; y: number } - | { type: "node_create"; userId: string; node: CanvasNode } - | { type: "node_move"; userId: string; nodeId: string; x: number; y: number } + | CanvasCommittedStructuralEvent | { type: "node_drag_start"; userId: string; nodeIds: string[] } | { type: "node_drag_end"; userId: string } - | { - type: "node_update"; - userId: string; - nodeId: string; - patch: Partial; - } - | { type: "node_delete"; userId: string; nodeId: string } - | { type: "edge_create"; userId: string; edge: Edge } - | { type: "edge_delete"; userId: string; edgeId: string } | { type: "node_select"; userId: string; nodeIds: string[] } | { type: "user_leave"; userId: string } | { type: "crdt_op"; userId: string; docId: string; op: CrdtOp } @@ -61,17 +123,44 @@ export type CanvasInboundEvent = | { type: "draw_end"; userId: string }; export interface CanvasEventHandlers { - onNodeCreate?: (node: CanvasNode) => void; onNodeMove?: (userId: string, nodeId: string, x: number, y: number) => void; onNodeDragStart?: (userId: string, nodeIds: string[]) => void; onNodeDragEnd?: (userId: string) => void; - onNodeUpdate?: (nodeId: string, patch: Partial) => void; - onNodeDelete?: (nodeId: string) => void; - onEdgeCreate?: (edge: Edge) => void; - onEdgeDelete?: (edgeId: string) => void; onNodeSelect?: (userId: string, nodeIds: string[]) => void; onUserJoin?: (user: CanvasPresenceUser) => void; onUserLeave?: (userId: string) => void; onCrdtOp?: (docId: string, op: CrdtOp, senderUserId: string) => void; onSyncResponse?: (docId: string, ops: CrdtOp[]) => void; } + +/** + * Identify durable structural outbound events so the queue can own them. + */ +export function isStructuralOutboundEvent( + event: CanvasOutboundEvent, +): event is CanvasOutboundStructuralEvent { + return ( + event.type === "node_create" || + event.type === "node_move" || + event.type === "node_update" || + event.type === "node_delete" || + event.type === "edge_create" || + event.type === "edge_delete" + ); +} + +/** + * Identify committed structural inbound events emitted after persistence. + */ +export function isCommittedStructuralInboundEvent( + event: CanvasInboundEvent, +): event is CanvasCommittedStructuralEvent { + return ( + event.type === "node_create" || + event.type === "node_move" || + event.type === "node_update" || + event.type === "node_delete" || + event.type === "edge_create" || + event.type === "edge_delete" + ); +} diff --git a/services/collab/src/main/java/com/leetdoodle/collab/handler/CanvasWebSocketHandler.java b/services/collab/src/main/java/com/leetdoodle/collab/handler/CanvasWebSocketHandler.java index ec3e1e2..2ee6b6c 100644 --- a/services/collab/src/main/java/com/leetdoodle/collab/handler/CanvasWebSocketHandler.java +++ b/services/collab/src/main/java/com/leetdoodle/collab/handler/CanvasWebSocketHandler.java @@ -360,6 +360,7 @@ private void handleStructuralEvent(WebSocketSession session, JsonNode root, Stri String committedPayload = objectMapper.writeValueAsString( structuralBroadcastMessage(canvasId, committed) ); + sendToSession(session, new TextMessage(committedPayload)); broadcastStructuralToCanvas(canvasId, session.getId(), committed.version(), committedPayload); } @@ -586,8 +587,8 @@ private void sendCanvasBootstrap(WebSocketSession session, CanvasSnapshotRespons response.put("type", "canvas_bootstrap"); response.put("canvasId", snapshot.canvasId()); response.put("headVersion", snapshot.headVersion()); - response.set("nodes", snapshot.nodes() == null ? objectMapper.createArrayNode() : snapshot.nodes().deepCopy()); - response.set("edges", snapshot.edges() == null ? objectMapper.createArrayNode() : snapshot.edges().deepCopy()); + response.set("nodes", frontendNodesFromDurableSnapshot(snapshot.nodes())); + response.set("edges", frontendEdgesFromDurableSnapshot(snapshot.edges())); sendToSession(session, new TextMessage( Objects.requireNonNull(objectMapper.writeValueAsString(response)) )); @@ -674,7 +675,7 @@ private String structuralOperationType(String eventType) { private JsonNode structuralPayloadFor(String eventType, JsonNode root) { return switch (eventType) { - case "node_create" -> requireField(root, "node", eventType).deepCopy(); + case "node_create" -> durableNodeCreatePayload(requireField(root, "node", eventType), eventType); case "node_move" -> { ObjectNode payload = objectMapper.createObjectNode(); payload.put("nodeId", requireTextField(root, "nodeId", eventType)); @@ -692,7 +693,7 @@ private JsonNode structuralPayloadFor(String eventType, JsonNode root) { payload.put("nodeId", requireTextField(root, "nodeId", eventType)); yield payload; } - case "edge_create" -> requireField(root, "edge", eventType).deepCopy(); + case "edge_create" -> durableEdgeCreatePayload(requireField(root, "edge", eventType), eventType); case "edge_delete" -> { ObjectNode payload = objectMapper.createObjectNode(); payload.put("edgeId", requireTextField(root, "edgeId", eventType)); @@ -719,7 +720,7 @@ private ObjectNode structuralBroadcastMessage(String canvasId, CommittedCanvasOp switch (operationType) { case "NODE_CREATE" -> { outbound.put("type", "node_create"); - outbound.set("node", payload.deepCopy()); + outbound.set("node", frontendNodeFromDurablePayload(payload)); } case "NODE_MOVE" -> { outbound.put("type", "node_move"); @@ -740,7 +741,7 @@ private ObjectNode structuralBroadcastMessage(String canvasId, CommittedCanvasOp } case "EDGE_CREATE" -> { outbound.put("type", "edge_create"); - outbound.set("edge", payload.deepCopy()); + outbound.set("edge", frontendEdgeFromDurablePayload(payload)); } case "EDGE_DELETE" -> { outbound.put("type", "edge_delete"); @@ -755,6 +756,72 @@ private ObjectNode structuralBroadcastMessage(String canvasId, CommittedCanvasOp return outbound; } + private ArrayNode frontendNodesFromDurableSnapshot(JsonNode nodes) { + ArrayNode frontendNodes = objectMapper.createArrayNode(); + if (nodes == null || nodes.isNull() || !nodes.isArray()) { + return frontendNodes; + } + + for (JsonNode node : nodes) { + frontendNodes.add(frontendNodeFromDurablePayload(node)); + } + return frontendNodes; + } + + private ArrayNode frontendEdgesFromDurableSnapshot(JsonNode edges) { + ArrayNode frontendEdges = objectMapper.createArrayNode(); + if (edges == null || edges.isNull() || !edges.isArray()) { + return frontendEdges; + } + + for (JsonNode edge : edges) { + frontendEdges.add(frontendEdgeFromDurablePayload(edge)); + } + return frontendEdges; + } + + private ObjectNode durableNodeCreatePayload(ObjectNode node, String eventType) { + ObjectNode payload = objectMapper.createObjectNode(); + payload.put("nodeId", requireTextField(node, "id", eventType + ".node")); + payload.put("nodeType", requireTextField(node, "type", eventType + ".node")); + payload.put("x", requireNumericField(node, "x", eventType + ".node")); + payload.put("y", requireNumericField(node, "y", eventType + ".node")); + payload.put("width", requireNumericField(node, "width", eventType + ".node")); + payload.put("height", requireNumericField(node, "height", eventType + ".node")); + JsonNode data = node.get("data"); + payload.set("data", data == null || data.isNull() ? objectMapper.createObjectNode() : data.deepCopy()); + return payload; + } + + private ObjectNode durableEdgeCreatePayload(ObjectNode edge, String eventType) { + ObjectNode payload = objectMapper.createObjectNode(); + payload.put("edgeId", requireTextField(edge, "id", eventType + ".edge")); + payload.put("fromNodeId", requireTextField(edge, "fromNodeId", eventType + ".edge")); + payload.put("toNodeId", requireTextField(edge, "toNodeId", eventType + ".edge")); + return payload; + } + + private ObjectNode frontendNodeFromDurablePayload(JsonNode payload) { + ObjectNode node = objectMapper.createObjectNode(); + node.put("id", requireTextField(payload, "nodeId", "durable_node")); + node.put("type", requireTextField(payload, "nodeType", "durable_node")); + node.put("x", requireNumericField(payload, "x", "durable_node")); + node.put("y", requireNumericField(payload, "y", "durable_node")); + node.put("width", requireNumericField(payload, "width", "durable_node")); + node.put("height", requireNumericField(payload, "height", "durable_node")); + JsonNode data = payload.get("data"); + node.set("data", data == null || data.isNull() ? objectMapper.createObjectNode() : data.deepCopy()); + return node; + } + + private ObjectNode frontendEdgeFromDurablePayload(JsonNode payload) { + ObjectNode edge = objectMapper.createObjectNode(); + edge.put("id", requireTextField(payload, "edgeId", "durable_edge")); + edge.put("fromNodeId", requireTextField(payload, "fromNodeId", "durable_edge")); + edge.put("toNodeId", requireTextField(payload, "toNodeId", "durable_edge")); + return edge; + } + private ObjectNode requireField(JsonNode node, String field, String eventType) { JsonNode value = node.get(field); if (value == null || value.isNull() || !value.isObject()) {