diff --git a/apps/electron-app/package.json b/apps/electron-app/package.json index 4f487bad..89e42468 100644 --- a/apps/electron-app/package.json +++ b/apps/electron-app/package.json @@ -6,7 +6,7 @@ "type": "git", "url": "https://github.com/xiduzo/microflow" }, - "version": "0.9.0", + "version": "0.9.1", "description": "An application which allows you to create flow-based logic for microcontrollers", "author": { "name": "Sander Boer", diff --git a/apps/electron-app/src/app.tsx b/apps/electron-app/src/app.tsx index 023ae4ae..a20606f9 100644 --- a/apps/electron-app/src/app.tsx +++ b/apps/electron-app/src/app.tsx @@ -1,13 +1,7 @@ -import { - FigmaVariable, - MqttConfig, - useFigmaStore, - useMqttStore, -} from '@microflow/mqtt-provider/client'; +import { FigmaVariable, useFigmaStore, useMqttStore } from '@microflow/mqtt-provider/client'; import { Toaster, TooltipProvider } from '@microflow/ui'; import { ReactFlowProvider } from '@xyflow/react'; import { createRoot } from 'react-dom/client'; -import { useDarkMode, useLocalStorage } from 'usehooks-ts'; import { IpcDeepLinkListener } from './render/components/IpcDeepLinkListener'; import { IpcMenuListeners } from './render/components/IpcMenuListener'; import { ReactFlowCanvas } from './render/components/react-flow/ReactFlowCanvas'; @@ -19,6 +13,7 @@ import { StrictMode, useEffect, useMemo } from 'react'; import { useAppStore } from './render/stores/app'; import { Settings } from './render/components/Settings'; import logger from 'electron-log/renderer'; +import { useDarkMode } from 'usehooks-ts'; export function App() { return ( diff --git a/apps/electron-app/src/main/board-connection.ts b/apps/electron-app/src/main/board-connection.ts index d230aa74..df26643a 100644 --- a/apps/electron-app/src/main/board-connection.ts +++ b/apps/electron-app/src/main/board-connection.ts @@ -19,6 +19,7 @@ import { getConnectedPort, setConnectedPort, getKnownBoardsWithPorts, + acquirePortOperationLock, } from './port-manager'; import { Timer } from './utils'; @@ -28,6 +29,7 @@ const ipRegex = new RegExp( let runnerProcess: ChildProcess | undefined; let lastUsedPinsHash: string | null = null; +let lastFlow: { nodes: Node[]; edges: Edge[]; ip?: string } | null = null; /** * Gets the current runner process @@ -90,6 +92,9 @@ async function didPinsChange(nodes: Node[]) { } export async function ensureRunnerProcess(nodes: Node[], edges: Edge[], ip?: string) { + // Store the flow for later use (e.g., after flashing) + lastFlow = { nodes, edges, ip }; + if (!runnerProcess) return startRunnerProcess(ip); if (await didPinsChange(nodes)) { @@ -102,6 +107,67 @@ export async function ensureRunnerProcess(nodes: Node[], edges: Edge[], ip?: str } } +/** + * Sends the flow to the runner process + * This is the shared logic used by both the IPC handler and the ready handler + */ +export async function sendFlowToRunner(nodes: Node[], edges: Edge[], timer?: Timer) { + const flowTimer = timer || new Timer(); + const runnerProcess = getRunnerProcess(); + + log.debug( + '[FLOW] ', + runnerProcess?.pid, + JSON.stringify(nodes, null, 2), + JSON.stringify(edges, null, 2), + flowTimer.duration + ); + + if (!runnerProcess) return; + + runnerProcess.send({ type: 'flow', nodes, edges }); +} + +/** + * Gets the current connection state + * Returns the board state if connected, or null if not connected + */ +export function getCurrentConnectionState(): Board | null { + const connectedPort = getConnectedPort(); + const runnerProcess = getRunnerProcess(); + + // If we have a connected port and runner process, verify they're still valid + if (connectedPort && runnerProcess) { + // Check if runner process is still alive + if (!runnerProcess.killed && runnerProcess.exitCode === null) { + // Runner is still running, return connect state + log.debug('[STATE] ', 'Port and runner process exist', connectedPort.path); + return { + type: 'ready', + port: connectedPort.path, + message: 'Board connected', + }; + } else { + // Runner process died, clear the connection + log.debug('[STATE] ', 'Runner process died, clearing connection'); + setConnectedPort(undefined); + return null; + } + } else if (connectedPort && !runnerProcess) { + // Port is set but no runner process - might need to reconnect + log.debug('[STATE] ', 'Port exists but no runner process', connectedPort.path); + return { + type: 'ready', + port: connectedPort.path, + message: 'Reconnecting to board', + }; + } else { + // No connection state + log.debug('[STATE] ', 'No connection state'); + return null; + } +} + export async function startRunnerProcess(ip?: string) { const timer = new Timer(); @@ -171,6 +237,9 @@ export async function startRunnerProcess(ip?: string) { async function checkBoardOnPort(port: Pick, board: BoardName) { await killRunnerProcess(); + // Acquire lock to prevent port polling during connection attempt + const releaseLock = acquirePortOperationLock(); + const timer = new Timer(); const filePath = join(__dirname, 'workers', 'runner.js'); @@ -181,6 +250,76 @@ async function checkBoardOnPort(port: Pick, board: BoardName) stdio: 'pipe', }); + let isResolved = false; + let portCheckInterval: NodeJS.Timeout | null = null; + + // Helper function to clean up connection-related resources (but keep message handler active) + const cleanupConnectionResources = () => { + if (portCheckInterval) { + clearInterval(portCheckInterval); + portCheckInterval = null; + } + if (runnerProcess) { + // Only remove exit/error handlers that are specific to connection phase + // Keep message handler active for runtime messages (it handles both connection and runtime) + runnerProcess.off('exit', handleExit); + runnerProcess.off('error', handleError); + } + // Release the lock when cleanup is called + releaseLock(); + }; + + // Full cleanup - removes all handlers (used when process is killed or on fatal errors) + const fullCleanup = () => { + cleanupConnectionResources(); + if (runnerProcess) { + // Remove message handler only on fatal errors/kill + runnerProcess.off('message', handleMessage); + } + }; + + const rejectWithCleanup = (error: Error) => { + if (isResolved) return; + isResolved = true; + fullCleanup(); + reject(error); + }; + + const resolveWithCleanup = (value: any) => { + if (isResolved) return; + isResolved = true; + // On successful connection, only clean up connection resources + // Keep message handler active - it handles both connection and runtime messages + cleanupConnectionResources(); + resolve(value); + }; + + // Periodically check if the port still exists while waiting for connection + const checkPortExists = async () => { + if (isResolved) return; + + try { + const ports = await getConnectedPorts(); + const portStillExists = ports.find(p => p.path === port.path); + + if (!portStillExists) { + log.warn('[RUNNER] ', port.path, timer.duration); + rejectWithCleanup( + new PortDisconnectedError( + port.path, + `Port ${port.path} disconnected during connection attempt` + ) + ); + } + } catch (error) { + log.warn('[RUNNER] ', error); + // Don't reject on check error, just log it + } + }; + + // Start periodic port checking (every 500ms) + portCheckInterval = setInterval(checkPortExists, 500); + runnerProcess.on('spawn', () => { log.debug('[RUNNER] ', runnerProcess?.pid, timer.duration); }); @@ -194,9 +333,39 @@ async function checkBoardOnPort(port: Pick, board: BoardName) }); runnerProcess.stdout?.on('data', async data => { - log.debug('[RUNNER] ', runnerProcess?.pid, timer.duration, data.toString()); + // log.debug('[RUNNER] ', runnerProcess?.pid, timer.duration, data.toString()); }); + // Handle runner process exit (might happen if port disconnects) + const handleExit = async (code: number | null, signal: string | null) => { + if (isResolved) return; + + log.warn('[RUNNER] ', runnerProcess?.pid, code, signal, timer.duration); + + // Check if port still exists when process exits unexpectedly + try { + await checkPortExists(); + // If port still exists, it was a different error + if (!isResolved) { + rejectWithCleanup( + new Error(`Runner process exited unexpectedly (code: ${code}, signal: ${signal})`) + ); + } + } catch (error) { + // Port check already rejected, nothing to do + } + }; + + // Handle runner process errors + const handleError = (error: Error) => { + if (isResolved) return; + log.warn('[RUNNER] ', runnerProcess?.pid, error, timer.duration); + rejectWithCleanup(error); + }; + + runnerProcess.on('exit', handleExit); + runnerProcess.on('error', handleError); + async function handleMessage(data: Board | UploadedCodeMessage) { // log.debug('[RUNNER] ', runnerProcess?.pid, data.type, timer.duration); try { @@ -212,7 +381,7 @@ async function checkBoardOnPort(port: Pick, board: BoardName) let notificationTimeout: NodeJS.Timeout | null = null; try { if (ipRegex.test(port.path)) { - return reject(new Error(data.message ?? 'Unknown error')); + return rejectWithCleanup(new Error(data.message ?? 'Unknown error')); } // Prevents double error messages from causing multiple flashers @@ -228,15 +397,24 @@ async function checkBoardOnPort(port: Pick, board: BoardName) } satisfies IpcResponse); }, 7500); await flashFirmataToBoard(board, port); - return checkBoardOnPort(port, board); + // Recursively call checkBoardOnPort and chain the result to this Promise + checkBoardOnPort(port, board) + .then(result => resolveWithCleanup(result)) + .catch(error => rejectWithCleanup(error)); + return; // Exit early, Promise will be resolved/rejected by the recursive call } catch (error) { + // Re-register handler in case process is still running after error + // This prevents unhandled messages if flashing fails or recursive call rejects + if (runnerProcess && !runnerProcess.killed && runnerProcess.exitCode === null) { + runnerProcess.on('message', handleMessage); + } try { await checkPortError(error, port.path, 'flashing'); // Port still exists or not a port error - reject with original error - reject(error); + rejectWithCleanup(error as Error); } catch (portError) { // Port disconnected or already PortDisconnectedError - reject with port error - reject(portError); + rejectWithCleanup(portError as Error); } } finally { if (notificationTimeout) clearTimeout(notificationTimeout); @@ -246,7 +424,7 @@ async function checkBoardOnPort(port: Pick, board: BoardName) case 'exit': case 'fail': log.warn(`[RUNNER] <${data.type}>`, runnerProcess?.pid, data.message, timer.duration); - reject(new Error(data.message ?? 'Unknown error')); + rejectWithCleanup(new Error(data.message ?? 'Unknown error')); break; case 'ready': log.debug(`[RUNNER] <${data.type}>`, runnerProcess?.pid, timer.duration); @@ -254,15 +432,63 @@ async function checkBoardOnPort(port: Pick, board: BoardName) success: true, data: { type: 'ready', port: port.path, pins: data.pins }, }); - resolve(null); + resolveWithCleanup(null); + sendFlowToRunner(lastFlow?.nodes ?? [], lastFlow?.edges ?? [], timer); break; } } catch (e) { - reject(e); + rejectWithCleanup(e as Error); } } - runnerProcess?.on('message', handleMessage); + // Register connection-phase message handler + runnerProcess.on('message', handleMessage); + }); +} + +/** + * Waits for a port to appear in the list of connected ports + * @param portPath The path of the port to wait for + * @param timeoutMs Maximum time to wait in milliseconds (default: 10000) + * @param checkIntervalMs How often to check in milliseconds (default: 500) + * @returns Promise that resolves when the port is found, or rejects on timeout + */ +async function waitForPortToReappear( + portPath: string, + timeoutMs: number = 10000, + checkIntervalMs: number = 500 +): Promise { + const startTime = Date.now(); + + return new Promise((resolve, reject) => { + const checkPort = async () => { + try { + const ports = await getConnectedPorts(); + const portFound = ports.find(p => p.path === portPath); + + if (portFound) { + log.debug('[FLASH] ', portPath, Date.now() - startTime); + resolve(); + return; + } + + // Check if we've exceeded the timeout + if (Date.now() - startTime >= timeoutMs) { + reject( + new Error(`Port ${portPath} did not reappear within ${timeoutMs}ms after flashing`) + ); + return; + } + + // Schedule next check + setTimeout(checkPort, checkIntervalMs); + } catch (error) { + reject(error); + } + }; + + // Start checking immediately + checkPort(); }); } @@ -279,22 +505,128 @@ async function flashFirmataToBoard(board: BoardName, port: Pick', firmataPath, board, port.path, flashTimer.duration); + + // Acquire lock to prevent port polling during flashing + const releaseLock = acquirePortOperationLock(); + + // Store the port path before flashing (it may disappear during flashing) + const portPath = port.path; + const portDisappearedDuringFlash = { value: false }; + return new Promise(async (resolve, reject) => { + let portCheckInterval: NodeJS.Timeout | null = null; + let flashTimeout: NodeJS.Timeout | null = null; + + const cleanup = () => { + if (flashTimeout) { + clearTimeout(flashTimeout); + flashTimeout = null; + } + if (portCheckInterval) { + clearInterval(portCheckInterval); + portCheckInterval = null; + } + // Release the lock when cleanup is called + releaseLock(); + }; + + // Set up a timeout to prevent freezing if flashing takes too long + flashTimeout = setTimeout(() => { + log.error('[FLASH] ', portPath, flashTimer.duration); + cleanup(); + reject(new Error(`Flashing timed out after 60 seconds for port ${portPath}`)); + }, 60000); // 60 second timeout + + // Monitor port status during flashing (it's expected to disappear) + portCheckInterval = setInterval(async () => { + try { + const ports = await getConnectedPorts(); + const portStillExists = ports.find(p => p.path === portPath); + if (!portStillExists && !portDisappearedDuringFlash.value) { + log.debug('[FLASH] ', portPath, flashTimer.duration); + portDisappearedDuringFlash.value = true; + } + } catch (error) { + // Ignore check errors during flashing + } + }, 500); + try { log.debug(`[FLASH] `, flashTimer.duration); - await new Flasher(board, port.path).flash(firmataPath); + + // Flash the board (port may disappear during this, which is expected) + await new Flasher(board, portPath).flash(firmataPath); + log.debug('[FLASH] ', flashTimer.duration); + + // After flashing, the board will disconnect and reconnect + // Wait for the port to reappear (up to 10 seconds) + if (portDisappearedDuringFlash.value) { + log.debug('[FLASH] ', portPath, flashTimer.duration); + try { + await waitForPortToReappear(portPath, 10000, 500); + log.debug('[FLASH] ', portPath, flashTimer.duration); + } catch (waitError) { + log.warn('[FLASH] ', portPath, waitError); + // Don't fail if port doesn't reappear - it might come back later + // The connection loop will handle it + } + } + + // Release lock after all operations complete + cleanup(); resolve(null); } catch (flashError) { log.error('[FLASH] ', flashError, flashTimer.duration); - try { - await checkPortError(flashError, port.path, 'flashing'); - // Port still exists but couldn't open - preserve original error - reject(flashError); - } catch (portError) { - // Port disconnected or already PortDisconnectedError - reject with port error - reject(portError); + // Check if the error is port-related + const isPortRelatedError = + flashError instanceof PortDisconnectedError || + flashError instanceof UnableToOpenSerialConnection || + (flashError instanceof Error && + (flashError.message.includes('No such file or directory') || + flashError.message.includes('cannot open') || + flashError.message.includes('disconnected'))); + + // If port disappeared during flash (expected behavior), wait for it to reappear + if (portDisappearedDuringFlash.value) { + log.debug('[FLASH] ', portPath, flashTimer.duration); + try { + await waitForPortToReappear(portPath, 5000, 200); + log.debug('[FLASH] ', portPath); + // Port reappeared - if error was port-related, it might have been temporary + // If it was a real flash error, reject with the original error + cleanup(); + reject(flashError); + } catch (waitError) { + // Port didn't reappear - this could be: + // 1. A real disconnection (if error was port-related) + // 2. A flash failure that prevented reconnection (if error was not port-related) + cleanup(); + if (isPortRelatedError) { + reject( + new PortDisconnectedError( + portPath, + `Port ${portPath} disconnected during flashing and did not reappear: ${flashError instanceof Error ? flashError.message : String(flashError)}` + ) + ); + } else { + // Flash error that prevented reconnection + reject(flashError); + } + } + } else { + // Port didn't disappear during flash, so check if it's still there + try { + await checkPortError(flashError, portPath, 'flashing'); + // Port still exists but couldn't flash - preserve original error + cleanup(); + reject(flashError); + } catch (portError) { + // Port disconnected or already PortDisconnectedError - reject with port error + cleanup(); + reject(portError); + } } } }); diff --git a/apps/electron-app/src/main/ipc.ts b/apps/electron-app/src/main/ipc.ts index 022ca83c..28156145 100644 --- a/apps/electron-app/src/main/ipc.ts +++ b/apps/electron-app/src/main/ipc.ts @@ -4,7 +4,13 @@ import { mainWindowReady } from './window'; import log from 'electron-log/node'; import { exportFlow, selectAudioFiles, readAudioFile } from './file'; -import { ensureRunnerProcess, getRunnerProcess, killRunnerProcess } from './board-connection'; +import { + ensureRunnerProcess, + getRunnerProcess, + killRunnerProcess, + sendFlowToRunner, + getCurrentConnectionState, +} from './board-connection'; import { checkConnectedPort, setupUSBDeviceListeners, stopPortPolling } from './port-manager'; import { Timer } from './utils'; @@ -34,15 +40,7 @@ ipcMain.on('ipc-flow', async (event, data: { ip?: string; nodes: Node[]; edges: await ensureRunnerProcess(data.nodes, data.edges, data.ip); - const runnerProcess = getRunnerProcess(); - log.debug( - '[FLOW] ', - runnerProcess?.pid, - JSON.stringify(data.nodes, null, 2), - JSON.stringify(data.edges, null, 2), - timer.duration - ); - runnerProcess?.send({ type: 'flow', nodes: data.nodes, edges: data.edges }); + await sendFlowToRunner(data.nodes, data.edges, timer); }); ipcMain.on('ipc-external-value', (_event, data: { nodeId: string; value: unknown }) => { @@ -61,6 +59,12 @@ ipcMain.handle('ipc-read-audio-file', async (_event, filePath: string) => { return buffer.toString('base64'); }); +ipcMain.handle('ipc-get-connection-status', async () => { + log.debug('[IPC] '); + const state = getCurrentConnectionState(); + return state; +}); + killRunnerProcess().catch(log.debug); app.on('before-quit', async event => { diff --git a/apps/electron-app/src/main/port-manager.ts b/apps/electron-app/src/main/port-manager.ts index f226fd88..393f9535 100644 --- a/apps/electron-app/src/main/port-manager.ts +++ b/apps/electron-app/src/main/port-manager.ts @@ -23,6 +23,10 @@ let portPollingInterval: NodeJS.Timeout | null = null; let lastKnownPorts: PortInfo[] = []; const PORT_POLL_INTERVAL_MS = 1000; // Poll every second as fallback +// Lock to prevent port polling during critical operations (flashing, connecting) +let portOperationLock = false; +let lockCount = 0; + /** * Gets the currently connected port */ @@ -37,6 +41,34 @@ export function setConnectedPort(port: PortInfo | undefined): void { connectedPort = port; } +/** + * Acquires a lock to prevent port polling during critical operations + * Returns a release function that must be called when the operation completes + */ +export function acquirePortOperationLock(): () => void { + lockCount++; + portOperationLock = true; + log.debug('[PORT-LOCK] ', lockCount); + + return () => { + lockCount--; + if (lockCount <= 0) { + lockCount = 0; + portOperationLock = false; + log.debug('[PORT-LOCK] '); + } else { + log.debug('[PORT-LOCK] ', lockCount); + } + }; +} + +/** + * Checks if a port operation is currently in progress + */ +export function isPortOperationLocked(): boolean { + return portOperationLock; +} + /** * Converts a USB device product ID (number) to a lowercase hex string for matching */ @@ -114,12 +146,29 @@ export async function getKnownBoardsWithPorts(): Promise<[BoardName, PortInfo[]] * Starts polling for port changes as a fallback mechanism */ export function startPortPolling(onPortDisconnected: () => Promise): void { - // Initial port list - getConnectedPorts().then(ports => { + // Initial port list - check for existing boards on startup + getConnectedPorts().then(async ports => { lastKnownPorts = ports; + + // Check if there are any known boards already connected on startup + const boardsWithPorts = await getKnownBoardsWithPorts(); + if (boardsWithPorts.length > 0 && !connectedPort) { + log.debug('[PORTS] ', boardsWithPorts.length); + // Send connect message to trigger connection attempt + sendMessageToRenderer('ipc-board', { + success: true, + data: { type: 'connect', message: 'Board detected' }, + }); + } }); portPollingInterval = setInterval(async () => { + // Skip polling if a critical operation is in progress + if (portOperationLock) { + log.debug('[POLL] ', 'Port operation in progress'); + return; + } + const currentPorts = await getConnectedPorts(); // Check for disconnected port diff --git a/apps/electron-app/src/preload.ts b/apps/electron-app/src/preload.ts index c08b70c3..34a31454 100644 --- a/apps/electron-app/src/preload.ts +++ b/apps/electron-app/src/preload.ts @@ -15,7 +15,8 @@ export type Channels = | 'ipc-flow' | 'ipc-board' | 'ipc-select-audio-files' - | 'ipc-read-audio-file'; + | 'ipc-read-audio-file' + | 'ipc-get-connection-status'; type IpcCallback = (response: IpcResponse) => void; type Listener = (event: IpcRendererEvent, response: IpcResponse) => void; diff --git a/apps/electron-app/src/render/components/forms/MqttSettingsForm.tsx b/apps/electron-app/src/render/components/forms/MqttSettingsForm.tsx index 17ad3c83..7297051e 100644 --- a/apps/electron-app/src/render/components/forms/MqttSettingsForm.tsx +++ b/apps/electron-app/src/render/components/forms/MqttSettingsForm.tsx @@ -9,23 +9,23 @@ import { FormItem, FormLabel, FormMessage, - Icons, Input, - Switch, useForm, Zod, zodResolver, toast, + Tooltip, + TooltipTrigger, + TooltipContent, + Icons, } from '@microflow/ui'; import { useAppStore } from '../../stores/app'; -import { MqttConfig } from '@microflow/mqtt-provider/client'; +import { mqttUrlSchema } from '@microflow/mqtt-provider/client'; const schema = Zod.object({ - host: Zod.string().or(Zod.ipv4()), - port: Zod.number().min(0), + url: mqttUrlSchema, username: Zod.string().optional(), password: Zod.string().optional(), - protocol: Zod.enum(['ws', 'wss']).default('wss'), }); type Schema = Zod.infer; @@ -38,11 +38,9 @@ export function MqttSettingsForm() { mode: 'onChange', reValidateMode: 'onChange', defaultValues: { - host: mqttConfig?.host, - port: mqttConfig?.port, + url: mqttConfig?.url || 'test.mosquitto.org', username: mqttConfig?.username, password: mqttConfig?.password as string, - protocol: (mqttConfig as MqttConfig & { protocol: 'ws' | 'wss' })?.protocol as 'ws' | 'wss', }, }); @@ -76,34 +74,68 @@ export function MqttSettingsForm() { ( - Host + + Broker URL + + + + + +
+

Format:

+ + [<protocol>://]<host>[:<port>][/<path>] + +

+ Defaults: +
protocol=wss, port=8883, path= + /mqtt +

+

Examples:

+
    +
  • + mqtt.xiduzo.com → wss://mqtt.xiduzo.com:8883/mqtt +
  • +
  • + mqtt.xiduzo.com:443 → wss://mqtt.xiduzo.com:443/mqtt +
  • +
  • + mqtt.xiduzo.com/mqtt → wss://mqtt.xiduzo.com:8883/mqtt +
  • +
  • + mqtt.xiduzo.com:443/mqtt → wss://mqtt.xiduzo.com:443/mqtt +
  • +
  • + wss://mqtt.xiduzo.com:8884 → + wss://mqtt.xiduzo.com:8884/mqtt +
  • +
  • + wss://mqtt.xiduzo.com:443/mqtt → + wss://mqtt.xiduzo.com:443/mqtt +
  • +
  • + ws://mqtt.xiduzo.com:1883 → ws://mqtt.xiduzo.com:1883/mqtt +
  • +
+
+
+
+
- - - -
- )} - /> - ( - - Port - - { - const value = e.target.value; - field.onChange(value === '' ? undefined : Number(value)); - }} - /> + + + + [<protocol>://]<host>[:<port>][/<path>] + +
+ + Defaults: wss://<host>:8883/mqtt + +
)} @@ -134,25 +166,6 @@ export function MqttSettingsForm() { )} /> - ( - - Encrypted (wss) - - { - form.setValue('protocol', checked ? 'wss' : 'ws'); - }} - defaultChecked={form.getValues('protocol') === 'wss'} - /> - - - - )} - />
- This plugin will force a connection over wss://, make sure your settings - will connect to an encrypted websocket. + Make sure to use wss:// protocol for encrypted connections.
diff --git a/apps/nextjs-app/components/DownloadApp.tsx b/apps/nextjs-app/components/DownloadApp.tsx index 2a244cb9..80296641 100644 --- a/apps/nextjs-app/components/DownloadApp.tsx +++ b/apps/nextjs-app/components/DownloadApp.tsx @@ -23,7 +23,7 @@ export function DownloadApp() { const [os, setOs] = useState(); function downloadApp() { - const version = '0.9.0'; + const version = '0.9.1'; const baseUrl = `https://github.com/xiduzo/microflow/releases/download/v${version}`; switch (os) { diff --git a/apps/nextjs-app/package.json b/apps/nextjs-app/package.json index c8fdb152..47db5251 100644 --- a/apps/nextjs-app/package.json +++ b/apps/nextjs-app/package.json @@ -1,6 +1,6 @@ { "name": "nextjs-app", - "version": "0.9.0", + "version": "0.9.1", "private": true, "scripts": { "dev": "next dev", diff --git a/packages/mqtt-provider/package.json b/packages/mqtt-provider/package.json index 579624b2..df8c243b 100644 --- a/packages/mqtt-provider/package.json +++ b/packages/mqtt-provider/package.json @@ -5,6 +5,7 @@ "dependencies": { "mqtt": "5.14.1", "react": "18.3.1", + "zod": "^4.1.12", "zustand": "5.0.8" } } diff --git a/packages/mqtt-provider/src/stores/mqtt.ts b/packages/mqtt-provider/src/stores/mqtt.ts index f7fde1aa..91cf34a4 100644 --- a/packages/mqtt-provider/src/stores/mqtt.ts +++ b/packages/mqtt-provider/src/stores/mqtt.ts @@ -1,5 +1,6 @@ import { create } from 'zustand'; import mqtt, { IClientPublishOptions, OnMessageCallback } from 'mqtt'; +import { z } from 'zod'; const clients = ['app', 'plugin'] as const; export type Client = (typeof clients)[number]; @@ -7,9 +8,137 @@ export type Client = (typeof clients)[number]; const ConnectionStatuses = ['connected', 'disconnected', 'connecting'] as const; export type ConnectionStatus = (typeof ConnectionStatuses)[number]; -export type MqttConfig = Partial< - Pick -> & { uniqueId: string }; +/** + * Regex pattern to validate MQTT URL format + * Format: [://][:][/] + * Protocol defaults to wss, port defaults to 8883, path defaults to /mqtt + */ +export const mqttUrlRegex = /^(?:(ws|wss):\/\/)?([^\s\/:]+)(?::(\d+))?(?:\/(.*))?$/; + +/** + * Zod schema for validating MQTT URL format + * Format: [://][:][/] + * - Protocol is optional (defaults to wss) + * - Host is required + * - Port is optional (defaults to 8883) + * - Path is optional (defaults to /mqtt) + * + * Examples: + * - mqtt.xiduzo.com → wss://mqtt.xiduzo.com:8883/mqtt + * - mqtt.xiduzo.com:443 → wss://mqtt.xiduzo.com:443/mqtt + * - mqtt.xiduzo.com/mqtt → wss://mqtt.xiduzo.com:8883/mqtt + * - mqtt.xiduzo.com:443/mqtt → wss://mqtt.xiduzo.com:443/mqtt + * - wss://mqtt.xiduzo.com:443/mqtt → wss://mqtt.xiduzo.com:443/mqtt + */ +export const mqttUrlSchema = z + .string() + .min(1, 'Host is required') + .superRefine((input, ctx) => { + console.log('[MQTT URL Validation] Parsing full URL:', input); + // Check if it's a full URL (starts with ws:// or wss://) + if (input.startsWith('ws://') || input.startsWith('wss://')) { + try { + const urlObj = new URL(input); + const protocol = urlObj.protocol.replace(':', '') as 'ws' | 'wss'; + + if (protocol !== 'ws' && protocol !== 'wss') { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: `Invalid protocol: ${protocol}. Must be 'ws' or 'wss'`, + }); + return; + } + + const host = urlObj.hostname; + if (!host || host.length === 0) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: 'Host is required', + }); + return; + } + + // Validate port if provided + if (urlObj.port) { + const port = parseInt(urlObj.port, 10); + if (isNaN(port) || port < 1 || port > 65535) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: `Invalid port: ${urlObj.port}. Must be between 1 and 65535`, + }); + return; + } + } + } catch (error) { + console.error('[MQTT URL Validation] Error parsing URL:', input, error); + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: `Invalid URL format. Expected: [ws://|wss://][:][/]. Error: ${error instanceof Error ? error.message : 'Unable to parse URL'}`, + }); + } + } else { + // It's a host with optional port and path - validate format + // Format: host[:port][/path] + if (/\s/.test(input)) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: 'Host cannot contain spaces', + }); + return; + } + + // Parse host:port/path manually + // Host cannot contain colons or slashes + const portMatch = input.match(/^([^:/]+)(?::(\d+))?(?:\/(.*))?$/); + if (!portMatch) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: 'Invalid hostname format', + }); + return; + } + + const host = portMatch[1]; + const portStr = portMatch[2]; + + // Validate host + if (!host || host.length === 0) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: 'Host is required', + }); + return; + } + + // Check for basic hostname validity (contains at least one dot or is localhost) + if (host !== 'localhost' && !host.includes('.')) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: 'Invalid hostname format', + }); + return; + } + + // Validate port if provided + if (portStr) { + const port = parseInt(portStr, 10); + if (isNaN(port) || port < 1 || port > 65535) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: `Invalid port: ${portStr}. Must be between 1 and 65535`, + }); + return; + } + } + } + }); + +export type MqttConfig = { + url: string; // Format: [://][:][/] - protocol defaults to wss, port defaults to 8883, path defaults to /mqtt + username?: string; + password?: string; + uniqueId: string; +}; type Subscription = { callback: OnMessageCallback; @@ -123,6 +252,53 @@ export const useMqttStore = create((set, get) => { }); }; + /** + * Parses an MQTT URL string into connection options + * Format: [://][:][/] + * + * Defaults: + * - Protocol: wss + * - Port: 8883 + * - Path: /mqtt + * + * Examples: + * - mqtt.xiduzo.com → wss://mqtt.xiduzo.com:8883/mqtt + * - mqtt.xiduzo.com:443 → wss://mqtt.xiduzo.com:443/mqtt + * - mqtt.xiduzo.com/mqtt → wss://mqtt.xiduzo.com:8883/mqtt + * - mqtt.xiduzo.com:443/mqtt → wss://mqtt.xiduzo.com:443/mqtt + * - wss://mqtt.xiduzo.com:443/mqtt → wss://mqtt.xiduzo.com:443/mqtt + * + * Note: This function assumes the input has already been validated by mqttUrlSchema + */ + const parseMqttUrl = ( + input: string + ): { + protocol: 'ws' | 'wss'; + host: string; + port: number; + path: string; + } => { + try { + const validatedInput = mqttUrlSchema.parse(input); + const match = validatedInput.match(mqttUrlRegex); + if (!match || !match[2]) { + throw new Error(`Invalid MQTT URL format: ${input}. Could not parse host.`); + } + const [, protocol, host, port, path] = match; + + return { + protocol: (protocol ?? 'wss') as 'ws' | 'wss', + host, + port: port ? parseInt(String(port), 10) : (protocol ?? 'wss') === 'wss' ? 8883 : 1883, + path: path ?? '/mqtt', + }; + } catch (error) { + throw new Error( + `Invalid MQTT URL format: ${input}. Expected format: [://][:][/]` + ); + } + }; + const connect = async (configParam: MqttConfig, appName: Client) => { config = configParam; // Update internal variables if (client) { @@ -132,16 +308,40 @@ export const useMqttStore = create((set, get) => { set({ status: 'connecting', appName, uniqueId: config.uniqueId }); - const defaultClient: mqtt.IClientOptions = { - host: 'test.mosquitto.org', - port: 8081, - }; + // Parse the URL string into connection components + const { protocol, host, port, path } = parseMqttUrl(config.url); + const clientId = `microflow_${appName}_${config.uniqueId}_${Date.now().toString(36)}`; - console.debug('[MQTT] ', config, appName); - client = mqtt.connect({ - ...defaultClient, - protocol: 'wss', - ...config, + // Construct the full URL for WebSocket connections + // mqtt.js requires the URL as the first argument for proper clientId handling + const url = `${protocol}://${host}:${port}${path}`; + + console.debug('[MQTT] Parsed URL:', { + input: config.url, + protocol, + host, + port, + path, + constructedUrl: url, + clientId, + }); + + // Build connection options (without protocol/host/port/path when using URL) + const connectionOptions: mqtt.IClientOptions = { + username: config.username, + password: config.password, + clientId, + // connectTimeout: 30000, // 30 seconds + // keepalive: 60, // 60 seconds + // clean: true, // Start with a clean session + // reconnectPeriod: 1000, // Reconnect after 1 second + // For WSS connections, ensure proper SSL handling + ...(protocol === 'wss' + ? { + // Allow self-signed certificates (common for public brokers) + rejectUnauthorized: false, + } + : {}), will: { topic: `microflow/v1/${config.uniqueId}/${appName}/status`, retain: true, @@ -153,7 +353,10 @@ export const useMqttStore = create((set, get) => { 100, 105, 115, 99, 111, 110, 110, 101, 99, 116, 101, 100, ]) as Buffer, }, - }); + }; + + console.debug('[MQTT] ', config, appName, url, connectionOptions); + client = mqtt.connect(url, connectionOptions); // Handle status messages from other clients const statusHandler = (topic: string, payload: Buffer) => { diff --git a/yarn.lock b/yarn.lock index 49187f7a..18be119e 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2493,6 +2493,7 @@ __metadata: dependencies: mqtt: 5.14.1 react: 18.3.1 + zod: ^4.1.12 zustand: 5.0.8 languageName: unknown linkType: soft