diff --git a/frontend/components/nodes/filter-node/filter-node.tsx b/frontend/components/nodes/filter-node/filter-node.tsx index a02a02d..a11075a 100644 --- a/frontend/components/nodes/filter-node/filter-node.tsx +++ b/frontend/components/nodes/filter-node/filter-node.tsx @@ -1,6 +1,10 @@ 'use client'; import { useGlobalContext } from '@/context/GlobalContext'; import { ProcessingConfig } from '@/lib/processing'; + +const dispatchProcessingConfig = (config: ProcessingConfig) => { + window.dispatchEvent(new CustomEvent('processing-config-update', { detail: config })); +}; import { Handle, Position, useReactFlow } from '@xyflow/react'; import React from 'react'; import ComboBox from './combo-box'; @@ -19,8 +23,7 @@ export default function FilterNode({ id }: FilterNodeProps) { // Get React Flow instance const reactFlowInstance = useReactFlow(); - // Get data stream status from global context - const { dataStreaming, sendProcessingConfig } = useGlobalContext() + const { dataStreaming } = useGlobalContext(); const buildConfig = (): ProcessingConfig => { if (!isConnected) { @@ -136,11 +139,11 @@ export default function FilterNode({ id }: FilterNodeProps) { React.useEffect(() => { if (!dataStreaming) return - sendProcessingConfig(buildConfig()) + dispatchProcessingConfig(buildConfig()) }, [selectedFilter, lowCutoff, highCutoff, isConnected, dataStreaming]) React.useEffect(() => { - sendProcessingConfig(buildConfig()); + dispatchProcessingConfig(buildConfig()); }, []); return ( diff --git a/frontend/components/nodes/signal-graph-node/signal-graph-node.tsx b/frontend/components/nodes/signal-graph-node/signal-graph-node.tsx index e00ea5e..8c85d08 100644 --- a/frontend/components/nodes/signal-graph-node/signal-graph-node.tsx +++ b/frontend/components/nodes/signal-graph-node/signal-graph-node.tsx @@ -1,7 +1,7 @@ import { Card } from '@/components/ui/card'; import { Handle, Position, useReactFlow } from '@xyflow/react'; -// import useWebsocket from '@/hooks/useWebsocket'; import { useGlobalContext } from '@/context/GlobalContext'; +import useNodeData from '@/hooks/useNodeData'; import { ArrowUpRight } from 'lucide-react'; import React from 'react'; @@ -16,9 +16,8 @@ import { import SignalGraphView from './signal-graph-full'; export default function SignalGraphNode({ id }: { id?: string }) { - // const { renderData } = useWebsocket(20, 10); - - const { renderData, dataStreaming } = useGlobalContext(); + const { dataStreaming } = useGlobalContext(); + const { renderData } = useNodeData(20, 10); const processedData = renderData; const reactFlowInstance = useReactFlow(); diff --git a/frontend/components/nodes/window-node/window-combo-box.tsx b/frontend/components/nodes/window-node/window-combo-box.tsx index 791995f..a426a68 100644 --- a/frontend/components/nodes/window-node/window-combo-box.tsx +++ b/frontend/components/nodes/window-node/window-combo-box.tsx @@ -2,7 +2,7 @@ import * as React from 'react'; import { ChevronUp } from 'lucide-react'; import { cn } from '@/lib/utils'; -export type WindowOption = 'default' | 'preset' | 'custom'; +export type WindowOption = 'default' | 'custom'; interface ComboBoxProps { windowSize: number; @@ -17,8 +17,6 @@ interface ComboBoxProps { const presetWindows: Array<{ value: WindowOption; label: string; size?: number }> = [ { value: 'default', label: 'Default (64)', size: 64 }, - { value: 'preset', label: 'Preset A (4)', size: 4 }, - { value: 'preset', label: 'Preset B (6)', size: 6 }, { value: 'custom', label: 'Custom' }, ]; @@ -33,48 +31,55 @@ export default function ComboBox({ isDataStreamOn = false, }: ComboBoxProps) { const [isExpanded, setIsExpanded] = React.useState(false); + const [step, setStep] = React.useState<'window' | 'overlap'>('window'); const [customWindowInput, setCustomWindowInput] = React.useState(''); - const[customOverlapInput, setCustomOverlapInput] = React.useState(String(overlapSize)); + const [customOverlapInput, setCustomOverlapInput] = React.useState(''); const [windowError, setWindowError] = React.useState(''); const [overlapError, setOverlapError] = React.useState(''); - - React.useEffect(() => { - setCustomOverlapInput(String(overlapSize)); - }, [overlapSize]); + const [overlapOption, setOverlapOption] = React.useState<'default' | 'custom'>('default'); const toggleExpanded = () => { + if (!isExpanded) { + setStep('window'); + setWindowError(''); + setOverlapError(''); + } setIsExpanded(!isExpanded); }; + const confirmWindow = () => { + setStep('overlap'); + }; + + const confirmOverlap = () => { + setIsExpanded(false); + setStep('window'); + }; + const handlePresetSelect = (optionValue: WindowOption, size?: number) => { setSelectedOption(optionValue); - if(typeof size == 'number'){ + setWindowError(''); + if (typeof size === 'number') { setWindowSize(size); - if(overlapSize >= size){ - setOverlapSize(Math.max(0,size-1)); + if (overlapSize >= size) { + setOverlapSize(0); + setOverlapOption('default'); } } - if(optionValue !== 'custom'){ - setWindowError(''); - setOverlapError(''); - setIsExpanded(false); - setTimeout(() => { - setIsExpanded(false); - }, 100); - return; + if (optionValue === 'custom') { + setCustomWindowInput(''); + } else { + confirmWindow(); } - setCustomWindowInput(''); - setWindowError(''); - }; const submitCustomWindow = () => { const parsed = Number(customWindowInput); - if(!Number.isInteger(parsed) || parsed <= 0){ + if (!Number.isInteger(parsed) || parsed <= 0) { setWindowError('Window size must be a positive integer'); return; - } - if(overlapSize >= parsed){ + } + if (overlapSize >= parsed) { setWindowError('Window size must be greater than overlap size'); return; } @@ -82,43 +87,32 @@ export default function ComboBox({ setWindowSize(parsed); setWindowError(''); setOverlapError(''); - setIsExpanded(false); - - setTimeout(() => { - setIsExpanded(false); - }, 100); + confirmWindow(); }; const submitCustomOverlap = () => { const parsed = Number(customOverlapInput); - if(!Number.isInteger(parsed) || parsed < 0){ + if (!Number.isInteger(parsed) || parsed < 0) { setOverlapError('Overlap size must be a non-negative integer'); return; - } - if(parsed >= windowSize){ + } + if (parsed >= windowSize) { setOverlapError('Overlap size must be less than window size'); return; } setOverlapSize(parsed); setOverlapError(''); setWindowError(''); - - setTimeout(() => { - setIsExpanded(false); - }, 100); + confirmOverlap(); }; return (
{/* Main button/header */} - {/* Base Header */} + {/* Collapsed summary */} {!isExpanded && (
-
+
Size:{' '} {windowSize}
-
+
Overlap Size:{' '} {overlapSize} @@ -201,77 +173,105 @@ export default function ComboBox({
)} - {/* Expandable options section */} + {/* Expandable section */}
-
-
Input size
- {presetWindows.map((preset) => ( - + ))} + {selectedOption === 'custom' && ( +
+ { + setCustomWindowInput(e.target.value.replace(/[^\d]/g, '')); + setWindowError(''); + }} + placeholder="Custom integer" + className="nodrag h-8 w-full rounded-md border border-gray-300 px-2 text-sm" + /> + +
)} - > - {preset.label} - - ))} + {selectedOption === 'custom' && windowError && ( +
{windowError}
+ )} + + )} - {selectedOption === 'custom' && ( -
- { - setCustomWindowInput(e.target.value.replace(/[^\d]/g, '')); // Enfore integer-only input by stripping non-digits. - setWindowError(''); - }} - placeholder="Custom integer" - className="h-8 w-full rounded-md border border-gray-300 px-2 text-sm" - /> + {/* Step 2: Overlap size */} + {step === 'overlap' && ( + <> +
Overlap size
-
- )} - {selectedOption === 'custom' && windowError && ( -
{windowError}
- )} - -
-
Overlap size
-
- { - setCustomOverlapInput(e.target.value.replace(/[^\d]/g, '')); - setOverlapError(''); - }} - className="h-8 w-full rounded-md border border-gray-300 px-2 text-sm" - /> -
-
- {overlapError &&
{overlapError}
} + {overlapOption === 'custom' && ( +
+ { + setCustomOverlapInput(e.target.value.replace(/[^\d]/g, '')); + setOverlapError(''); + }} + placeholder="Custom integer" + className="nodrag h-8 w-full rounded-md border border-gray-300 px-2 text-sm" + /> + +
+ )} + {overlapError &&
{overlapError}
} + + )}
-
+
); } diff --git a/frontend/components/nodes/window-node/window-node.tsx b/frontend/components/nodes/window-node/window-node.tsx index 1d3151e..a84cc27 100644 --- a/frontend/components/nodes/window-node/window-node.tsx +++ b/frontend/components/nodes/window-node/window-node.tsx @@ -3,7 +3,7 @@ import { useGlobalContext } from '@/context/GlobalContext'; import { Handle, Position, useReactFlow } from '@xyflow/react'; import React from 'react'; import WindowComboBox, { type WindowOption } from './window-combo-box'; -// import useWebsocket from '@/hooks/useWebsocket'; +import { WindowingConfig } from '@/lib/processing'; interface WindowNodeProps { id?: string; @@ -23,12 +23,13 @@ export default function WindowNode({ id }: WindowNodeProps) { // Get React Flow instance const reactFlowInstance = useReactFlow(); - // Get data stream status from global context - const { dataStreaming, sendWindowingConfig } = useGlobalContext(); + const { dataStreaming } = useGlobalContext(); - // const { sendWindowingConfig } = useWebsocket(0, 0); + const dispatchWindowingConfig = (config: WindowingConfig) => { + window.dispatchEvent(new CustomEvent('windowing-config-update', { detail: config })); + }; - const buildConfig = () => ({ + const buildConfig = (): WindowingConfig => ({ chunk_size: windowSize, overlap_size: overlapSize, }); @@ -81,7 +82,7 @@ export default function WindowNode({ id }: WindowNodeProps) { overlapSize < windowSize; React.useEffect(() => { - sendWindowingConfig(buildConfig()); + dispatchWindowingConfig(buildConfig()); }, []); // Check connection status on mount and when edges might change @@ -105,9 +106,8 @@ export default function WindowNode({ id }: WindowNodeProps) { }, [checkConnectionStatus]); React.useEffect(() => { - if (!dataStreaming) return; if(!isValidConfig) return; - sendWindowingConfig(buildConfig()); + dispatchWindowingConfig(buildConfig()); }, [windowSize, overlapSize, selectedOption, isConnected, dataStreaming]); return ( diff --git a/frontend/context/GlobalContext.tsx b/frontend/context/GlobalContext.tsx index 4af5cc5..6533812 100644 --- a/frontend/context/GlobalContext.tsx +++ b/frontend/context/GlobalContext.tsx @@ -5,34 +5,23 @@ import React, { useState } from 'react'; -import useWebsocket from '@/hooks/useWebsocket'; -import { ProcessingConfig, WindowingConfig } from '@/lib/processing'; - type GlobalContextType = { dataStreaming: boolean; setDataStreaming: React.Dispatch>; activeSessionId: number | null; setActiveSessionId: React.Dispatch>; - renderData: any[]; - sendProcessingConfig: (config: ProcessingConfig) => void; - sendWindowingConfig: (config: WindowingConfig) => void; }; const GlobalContext = createContext(undefined); export const GlobalProvider = ({ children }: { children: ReactNode }) => { const [dataStreaming, setDataStreaming] = useState(false); - - const { renderData, sendProcessingConfig, sendWindowingConfig } = useWebsocket(20, 10, dataStreaming); const [activeSessionId, setActiveSessionId] = useState(null); return ( diff --git a/frontend/context/WebSocketContext.tsx b/frontend/context/WebSocketContext.tsx index 7e09eb4..1045179 100644 --- a/frontend/context/WebSocketContext.tsx +++ b/frontend/context/WebSocketContext.tsx @@ -2,7 +2,7 @@ import React, { createContext, useContext, useEffect, useRef, useCallback, ReactNode } from 'react'; import { useGlobalContext } from './GlobalContext'; -import { ProcessingConfig } from '@/lib/processing'; +import { ProcessingConfig, WindowingConfig } from '@/lib/processing'; export type DataPoint = { time: string; @@ -17,6 +17,7 @@ type Subscriber = (points: DataPoint[]) => void; type WebSocketContextType = { subscribe: (fn: Subscriber) => () => void; sendProcessingConfig: (config: ProcessingConfig) => void; + sendWindowingConfig: (config: WindowingConfig) => void; }; const WebSocketContext = createContext(undefined); @@ -52,6 +53,7 @@ export function WebSocketProvider({ children }: { children: ReactNode }) { const { dataStreaming } = useGlobalContext(); const wsRef = useRef(null); const processingConfigRef = useRef(null); + const windowingConfigRef = useRef(null); const subscribersRef = useRef>(new Set()); const closingTimeoutRef = useRef(null); const isClosingGracefullyRef = useRef(false); @@ -69,14 +71,29 @@ export function WebSocketProvider({ children }: { children: ReactNode }) { } }, []); - // Forward processing-config-update events from filter node to backend + const sendWindowingConfig = useCallback((config: WindowingConfig) => { + windowingConfigRef.current = config; + if (wsRef.current?.readyState === WebSocket.OPEN) { + wsRef.current.send(JSON.stringify(config)); + console.log('Sent windowing config:', config); + } + }, []); + + // Forward config events from nodes to backend useEffect(() => { - const handler = (event: Event) => { + const processingHandler = (event: Event) => { sendProcessingConfig((event as CustomEvent).detail); }; - window.addEventListener('processing-config-update', handler); - return () => window.removeEventListener('processing-config-update', handler); - }, [sendProcessingConfig]); + const windowingHandler = (event: Event) => { + sendWindowingConfig((event as CustomEvent).detail); + }; + window.addEventListener('processing-config-update', processingHandler); + window.addEventListener('windowing-config-update', windowingHandler); + return () => { + window.removeEventListener('processing-config-update', processingHandler); + window.removeEventListener('windowing-config-update', windowingHandler); + }; + }, [sendProcessingConfig, sendWindowingConfig]); // Manage WebSocket lifecycle useEffect(() => { @@ -102,6 +119,9 @@ export function WebSocketProvider({ children }: { children: ReactNode }) { ws.onopen = () => { console.log('WebSocket connection opened.'); ws.send(JSON.stringify(processingConfigRef.current ?? DEFAULT_PROCESSING_CONFIG)); + if (windowingConfigRef.current) { + ws.send(JSON.stringify(windowingConfigRef.current)); + } }; ws.onmessage = (event) => { @@ -145,7 +165,7 @@ export function WebSocketProvider({ children }: { children: ReactNode }) { }, [dataStreaming]); return ( - + {children} ); diff --git a/frontend/hooks/useWebsocket.tsx b/frontend/hooks/useWebsocket.tsx deleted file mode 100644 index 5b8531d..0000000 --- a/frontend/hooks/useWebsocket.tsx +++ /dev/null @@ -1,165 +0,0 @@ -import { useEffect, useRef, useState } from 'react'; -// import { useGlobalContext } from '@/context/GlobalContext'; -import { ProcessingConfig, WindowingConfig } from '@/lib/processing'; - -export default function useWebsocket( - chartSize: number, - batchesPerSecond: number, - dataStreaming: boolean -) { -// type WsConfigMessage = -// | { type: 'processing'; payload: ProcessingConfig } -// | { type: 'windowing'; payload: WindowingConfig }; - - // const { dataStreaming } = useGlobalContext(); - const [renderData, setRenderData] = useState([]); - const bufferRef = useRef([]); - const wsRef = useRef(null); - const closingTimeoutRef = useRef(null); - const [isClosingGracefully, setIsClosingGracefully] = useState(false); - const processingConfigRef = useRef(null); - const windowingConfigRef = useRef(null); - - const intervalTime = 1000 / batchesPerSecond; - - const sendProcessingConfig = (config: ProcessingConfig) => { - processingConfigRef.current = config - - if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN) { - wsRef.current.send(JSON.stringify(config)) - console.log('Sent processing config:', config) - } - } - - const sendWindowingConfig = (config: WindowingConfig) => { - windowingConfigRef.current = config - - if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN) { - wsRef.current.send(JSON.stringify(config)) - console.log('Sent windowing config:', config) - } - } - - useEffect(() => { - console.log('data streaming:', dataStreaming); - - if (!dataStreaming && wsRef.current && wsRef.current.readyState === WebSocket.OPEN) { - if (!isClosingGracefully) { - console.log("Initiating graceful close..."); - setIsClosingGracefully(true); - wsRef.current.send('clientClosing'); - - closingTimeoutRef.current = setTimeout(() => { - console.warn("Timeout: No 'confirmed closing' received. Forcing WebSocket close."); - if (wsRef.current) { - wsRef.current.close(); - } - setIsClosingGracefully(false); - }, 5000); - } - return; - } - - if (!dataStreaming && (!wsRef.current || wsRef.current.readyState === WebSocket.CLOSED)) { - return; - } - - if (dataStreaming && (!wsRef.current || wsRef.current.readyState === WebSocket.CLOSED)) { - console.log("Opening new WebSocket connection..."); - const ws = new WebSocket('ws://localhost:8080'); - wsRef.current = ws; - - ws.onopen = () => { - console.log('WebSocket connection opened.'); - - if (processingConfigRef.current) { - ws.send(JSON.stringify(processingConfigRef.current)) - } - if (windowingConfigRef.current) { - ws.send(JSON.stringify(windowingConfigRef.current)) - } - }; - - ws.onmessage = (event) => { - const message = event.data; - if (message === 'confirmed closing') { - console.log("Received 'confirmed closing' from server. Proceeding to close."); - if (closingTimeoutRef.current) { - clearTimeout(closingTimeoutRef.current); - } - if (wsRef.current) { - wsRef.current.close(); - } - setIsClosingGracefully(false); - } else { - try { - const parsedData = JSON.parse(message); - bufferRef.current.push(parsedData); - } catch (e) { - console.error("Failed to parse non-confirmation message as JSON:", e, message); - } - } - }; - - ws.onclose = (event) => { - console.log('WebSocket connection closed:', event.code, event.reason); - wsRef.current = null; - setIsClosingGracefully(false); - }; - - ws.onerror = (error) => { - console.error('WebSocket error:', error); - if (closingTimeoutRef.current) { - clearTimeout(closingTimeoutRef.current); - } - setIsClosingGracefully(false); - }; - } - - const updateRenderData = () => { - if (bufferRef.current.length > 0) { - const nextBatch = bufferRef.current.splice( - 0, - Math.min(bufferRef.current.length, chartSize) - ); - setRenderData((prevData) => - [...(Array.isArray(prevData) ? prevData : []), ...nextBatch].slice(-chartSize) - ); - } - }; - - let intervalId: NodeJS.Timeout | null = null; - if (dataStreaming) { - intervalId = setInterval(updateRenderData, intervalTime); - } - - return () => { - console.log("Cleanup function running."); - if (intervalId) { - clearInterval(intervalId); - } - - if (closingTimeoutRef.current) { - clearTimeout(closingTimeoutRef.current); - } - - if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN && !isClosingGracefully) { - console.log("Component unmounting or dependencies changed: Initiating graceful close during cleanup."); - wsRef.current.send('clientClosing'); - closingTimeoutRef.current = setTimeout(() => { - console.warn("Timeout: No 'confirmed closing' received during cleanup. Forcing WebSocket close."); - if (wsRef.current) { - wsRef.current.close(); - } - }, 5000); - } else if (wsRef.current && wsRef.current.readyState !== WebSocket.CLOSED) { - console.log("Forcing immediate WebSocket close during cleanup."); - wsRef.current.close(); - } - wsRef.current = null; - setIsClosingGracefully(false); - }; - }, [chartSize, batchesPerSecond, dataStreaming, isClosingGracefully]); - - return { renderData, sendProcessingConfig, sendWindowingConfig }; -} \ No newline at end of file