diff --git a/_test-harness/index.ts b/_test-harness/index.ts index 6c23c6a..b6bb761 100644 --- a/_test-harness/index.ts +++ b/_test-harness/index.ts @@ -6,9 +6,9 @@ import type { } from "@chat/shared"; const LOAD_BALANCER_URL = "http://localhost:3000"; -const CHATTER_COUNT = 10_000; -const CHAT_ROOM_COUNT = 1; -const MESSAGE_INTERVAL_MIN_MS = 5_000; +const CHATTER_COUNT = 100; +const CHAT_ROOM_COUNT = 7; +const MESSAGE_INTERVAL_MIN_MS = 500; const MESSAGE_INTERVAL_MAX_MS = 90_000; const INITIAL_CONNECT_STAGGER_MS = 15; const RECONNECT_DELAY_MS = 100; diff --git a/chat-client/monitor.ts b/chat-client/monitor.ts deleted file mode 100644 index a6b001c..0000000 --- a/chat-client/monitor.ts +++ /dev/null @@ -1,349 +0,0 @@ -import { createClient } from "redis"; -import blessed from "blessed"; -import { format } from "node:util"; - -// Redis keys (mirrors @chat/shared constants) -const serversClientCountKey = "servers:clients"; -const serversChatRoomsCountKey = "servers:chats"; -const serversHeartbeatKey = "servers:heartbeat"; -const serversSocketWritesPerSecondKey = "servers:mps"; -const serversEventLoopTimeoutKey = "servers:event-loop"; -const redisServerKeyFactory = (id: string) => `server:${id}`; - -const POLL_INTERVAL_MS = 1000; -const MAX_SERVER_LINES = 20; -const UUID_DISPLAY_LEN = 8; - -const trim = (id: string) => id.slice(0, UUID_DISPLAY_LEN); -const timestamp = () => - new Date().toLocaleTimeString("en-US", { hour12: false }); -const fmt = (n: number, decimals = 2) => n.toFixed(decimals); -const fmtAge = (ms: number) => { - if (ms < 1000) return `${ms}ms`; - if (ms < 60_000) return `${(ms / 1000).toFixed(1)}s`; - return `${Math.floor(ms / 60_000)}m${Math.floor((ms % 60_000) / 1000)}s`; -}; -const fmtDuration = (startedAt: number) => { - const s = Math.max(0, Math.floor((Date.now() - startedAt) / 1000)); - const h = Math.floor(s / 3600); - const m = Math.floor((s % 3600) / 60); - const sec = s % 60; - return [h, m, sec].map((v) => v.toString().padStart(2, "0")).join(":"); -}; - -interface ServerMetrics { - id: string; - url: string; - clients: number; - chatRooms: number; - mps: number; - eventLoopTimeout: number; - heartbeatAgeMs: number; -} - -interface Snapshot { - servers: ServerMetrics[]; - totalClients: number; - totalChatRooms: number; - totalMps: number; - pollCount: number; - lastPolledAt: string; - redisStatus: "connected" | "connecting" | "error"; -} - -async function pollRedis( - client: ReturnType, -): Promise { - const now = Date.now(); - - const [clients, chatRooms, heartbeats, mps, eventLoop] = await Promise.all([ - client.zRangeWithScores(serversClientCountKey, 0, -1), - client.zRangeWithScores(serversChatRoomsCountKey, 0, -1), - client.zRangeWithScores(serversHeartbeatKey, 0, -1), - client.zRangeWithScores(serversSocketWritesPerSecondKey, 0, -1), - client.zRangeWithScores(serversEventLoopTimeoutKey, 0, -1), - ]); - - const serverIds = new Set([ - ...clients.map((e) => e.value), - ...heartbeats.map((e) => e.value), - ...mps.map((e) => e.value), - ]); - - const servers: ServerMetrics[] = []; - for (const id of serverIds) { - const url = - (await client.hGet(redisServerKeyFactory(id), "url")) ?? "unknown"; - - servers.push({ - id, - url, - clients: clients.find((e) => e.value === id)?.score ?? 0, - chatRooms: chatRooms.find((e) => e.value === id)?.score ?? 0, - mps: mps.find((e) => e.value === id)?.score ?? 0, - eventLoopTimeout: eventLoop.find((e) => e.value === id)?.score ?? 0, - heartbeatAgeMs: - now - (heartbeats.find((e) => e.value === id)?.score ?? 0), - }); - } - - return servers; -} - -function heartbeatColor(ageMs: number): string { - if (ageMs < 2000) return "{green-fg}"; - if (ageMs < 5000) return "{yellow-fg}"; - return "{red-fg}"; -} - -function formatServerBlock(s: ServerMetrics): string { - const hbColor = heartbeatColor(s.heartbeatAgeMs); - const hbReset = "{/}"; - return [ - ` id ${trim(s.id)}`, - ` url ${s.url}`, - ` clients ${s.clients}`, - ` chat rooms ${s.chatRooms}`, - ` mps ${fmt(s.mps)}`, - ` event loop ${fmt(s.eventLoopTimeout)}ms`, - ` heartbeat ${hbColor}${fmtAge(s.heartbeatAgeMs)} ago${hbReset}`, - ].join("\n"); -} - -class RedisMonitorUi { - private readonly startedAt = Date.now(); - private readonly originalConsole = { - error: console.error.bind(console), - log: console.log.bind(console), - warn: console.warn.bind(console), - }; - - private screen: blessed.Widgets.Screen; - private headerBox: blessed.Widgets.BoxElement; - private metricsBox: blessed.Widgets.BoxElement; - private serversBox: blessed.Widgets.BoxElement; - private logBox: blessed.Widgets.Log; - - private snapshot: Snapshot = { - servers: [], - totalClients: 0, - totalChatRooms: 0, - totalMps: 0, - pollCount: 0, - lastPolledAt: "-", - redisStatus: "connecting", - }; - - constructor() { - this.screen = blessed.screen({ - smartCSR: true, - title: "redis-monitor", - dockBorders: true, - fullUnicode: false, - }); - - this.headerBox = blessed.box({ - parent: this.screen, - top: 0, - left: 0, - width: "100%", - height: 4, - tags: true, - border: "line", - style: { border: { fg: "cyan" } }, - }); - - this.metricsBox = blessed.box({ - parent: this.screen, - top: 4, - left: 0, - width: "50%", - height: 11, - tags: true, - border: "line", - label: " totals ", - style: { border: { fg: "green" } }, - }); - - this.serversBox = blessed.box({ - parent: this.screen, - top: 4, - left: "50%", - width: "50%", - height: "100%-4", - tags: true, - border: "line", - label: " servers ", - scrollable: true, - alwaysScroll: true, - mouse: true, - keys: true, - vi: true, - scrollbar: { ch: " " }, - style: { border: { fg: "yellow" } }, - }); - - this.logBox = blessed.log({ - parent: this.screen, - top: 15, - left: 0, - width: "50%", - height: "100%-15", - tags: false, - border: "line", - label: " events ", - scrollable: true, - alwaysScroll: true, - mouse: true, - keys: true, - vi: true, - scrollbar: { ch: " " }, - style: { border: { fg: "magenta" } }, - }); - - this.screen.key(["C-c", "q", "escape"], () => { - process.kill(process.pid, "SIGINT"); - }); - this.screen.key(["pageup"], () => this.logBox.scroll(-5)); - this.screen.key(["pagedown"], () => this.logBox.scroll(5)); - this.screen.key(["S-pageup"], () => this.serversBox.scroll(-5)); - this.screen.key(["S-pagedown"], () => this.serversBox.scroll(5)); - - this.patchConsole(); - this.render(); - } - - setSnapshot(snapshot: Snapshot) { - this.snapshot = snapshot; - this.render(); - } - - destroy() { - this.restoreConsole(); - try { - this.screen.destroy(); - } catch { - // ignore - } - } - - private patchConsole() { - console.log = (...args: unknown[]) => this.writeLog("log", args); - console.warn = (...args: unknown[]) => this.writeLog("warn", args); - console.error = (...args: unknown[]) => this.writeLog("error", args); - } - - private restoreConsole() { - console.log = this.originalConsole.log; - console.warn = this.originalConsole.warn; - console.error = this.originalConsole.error; - } - - private writeLog(level: "log" | "warn" | "error", args: unknown[]) { - const line = `[${timestamp()}] ${level.toUpperCase()} ${format(...args)}`; - this.logBox.log(line); - this.render(); - } - - private render() { - const { snapshot } = this; - const statusColor = - snapshot.redisStatus === "connected" - ? "{green-fg}" - : snapshot.redisStatus === "connecting" - ? "{yellow-fg}" - : "{red-fg}"; - - this.headerBox.setContent( - [ - `{bold}redis-monitor{/bold} ${statusColor}${snapshot.redisStatus}{/}`, - `uptime ${fmtDuration(this.startedAt)} polls ${snapshot.pollCount} last ${snapshot.lastPolledAt}`, - `watching: ${[serversClientCountKey, serversChatRoomsCountKey, serversHeartbeatKey, serversSocketWritesPerSecondKey, serversEventLoopTimeoutKey].join(" ")}`, - ].join("\n"), - ); - - this.metricsBox.setContent( - [ - `total servers ${snapshot.servers.length}`, - `total clients ${snapshot.totalClients}`, - `total chat rooms ${snapshot.totalChatRooms}`, - `total mps ${fmt(snapshot.totalMps)}`, - ``, - `healthy ${snapshot.servers.filter((s) => s.heartbeatAgeMs < 2000).length}`, - `degraded ${snapshot.servers.filter((s) => s.heartbeatAgeMs >= 2000 && s.heartbeatAgeMs < 5000).length}`, - `dead ${snapshot.servers.filter((s) => s.heartbeatAgeMs >= 5000).length}`, - ].join("\n"), - ); - - const serverBlocks = snapshot.servers - .sort((a, b) => a.id.localeCompare(b.id)) - .slice(0, MAX_SERVER_LINES) - .map(formatServerBlock); - - this.serversBox.setContent( - serverBlocks.length > 0 - ? serverBlocks.join("\n{gray-fg}──────────────────────────{/}\n") - : "{gray-fg}no servers in redis{/}", - ); - - this.screen.render(); - } -} - -async function main() { - const client = createClient(); - const ui = new RedisMonitorUi(); - - let pollCount = 0; - - const poll = async () => { - try { - const servers = await pollRedis(client); - pollCount++; - ui.setSnapshot({ - servers, - totalClients: servers.reduce((s, sv) => s + sv.clients, 0), - totalChatRooms: servers.reduce((s, sv) => s + sv.chatRooms, 0), - totalMps: servers.reduce((s, sv) => s + sv.mps, 0), - pollCount, - lastPolledAt: timestamp(), - redisStatus: "connected", - }); - } catch (err) { - console.error("poll failed:", err); - ui.setSnapshot({ - servers: [], - totalClients: 0, - totalChatRooms: 0, - totalMps: 0, - pollCount, - lastPolledAt: timestamp(), - redisStatus: "error", - }); - } - }; - - const shutdown = async () => { - ui.destroy(); - await client.quit(); - process.exit(0); - }; - - process.on("SIGINT", shutdown); - process.on("SIGTERM", shutdown); - - try { - await client.connect(); - console.log("redis connected"); - } catch (err) { - console.error("redis connect failed:", err); - } - - await poll(); - setInterval(poll, POLL_INTERVAL_MS); -} - -main().catch((err) => { - console.error(err); - process.exit(1); -}); diff --git a/chat-client/package-lock.json b/chat-client/package-lock.json index a88ed4c..c8c66e2 100644 --- a/chat-client/package-lock.json +++ b/chat-client/package-lock.json @@ -61,7 +61,6 @@ "integrity": "sha512-CGOfOJqWjg2qW/Mb6zNsDm+u5vFQ8DxXfbM09z69p5Z6+mE1ikP2jUXw+j42Pf1XTYED2Rni5f95npYeuwMDQA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@babel/code-frame": "^7.29.0", "@babel/generator": "^7.29.0", @@ -618,7 +617,6 @@ "resolved": "https://registry.npmjs.org/@redis/client/-/client-5.11.0.tgz", "integrity": "sha512-GHoprlNQD51Xq2Ztd94HHV94MdFZQ3CVrpA04Fz8MVoHM0B7SlbmPEVIjwTbcv58z8QyjnrOuikS0rWF03k5dQ==", "license": "MIT", - "peer": true, "dependencies": { "cluster-key-slot": "1.1.2" }, @@ -981,7 +979,6 @@ "integrity": "sha512-ilcTH/UniCkMdtexkoCN0bI7pMcJDvmQFPvuPvmEaYA/NSfFTAgdUSLAoVjaRJm7+6PvcM+q1zYOwS4wTYMF9w==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "csstype": "^3.2.2" } @@ -1041,7 +1038,6 @@ "integrity": "sha512-XZzOmihLIr8AD1b9hL9ccNMzEMWt/dE2u7NyTY9jJG6YNiNthaD5XtUHVF2uCXZ15ng+z2hT3MVuxnUYhq6k1g==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@typescript-eslint/scope-manager": "8.57.0", "@typescript-eslint/types": "8.57.0", @@ -1324,7 +1320,6 @@ "integrity": "sha512-UVJyE9MttOsBQIDKw1skb9nAwQuR5wuGD3+82K6JgJlm/Y+KI92oNsMNGZCYdDsVtRHSak0pcV5Dno5+4jh9sw==", "dev": true, "license": "MIT", - "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -1445,7 +1440,6 @@ } ], "license": "MIT", - "peer": true, "dependencies": { "baseline-browser-mapping": "^2.9.0", "caniuse-lite": "^1.0.30001759", @@ -1644,7 +1638,6 @@ "integrity": "sha512-XoMjdBOwe/esVgEvLmNsD3IRHkm7fbKIUGvrleloJXUZgDHig2IPWNniv+GwjyJXzuNqVjlr5+4yVUZjycJwfQ==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@eslint-community/eslint-utils": "^4.8.0", "@eslint-community/regexpp": "^4.12.1", @@ -2592,7 +2585,6 @@ "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "dev": true, "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -2669,7 +2661,6 @@ "resolved": "https://registry.npmjs.org/react/-/react-19.2.4.tgz", "integrity": "sha512-9nfp2hYpCwOjAN+8TZFGhtWEwgvWHXqESH8qT89AT/lWklpLON22Lc8pEtnpsZz7VmawabSU0gCjnj8aC0euHQ==", "license": "MIT", - "peer": true, "engines": { "node": ">=0.10.0" } @@ -2885,7 +2876,6 @@ "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", "dev": true, "license": "Apache-2.0", - "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -2971,7 +2961,6 @@ "integrity": "sha512-fPGaRNj9Zytaf8LEiBhY7Z6ijnFKdzU/+mL8EFBaKr7Vw1/FWcTBAMW0wLPJAGMPX38ZPVCVgLceWiEqeoqL2Q==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@oxc-project/runtime": "0.115.0", "lightningcss": "^1.32.0", @@ -3097,7 +3086,6 @@ "integrity": "sha512-rftlrkhHZOcjDwkGlnUtZZkvaPHCsDATp4pGpuOOMDaTdDDXF91wuVDJoWoPsKX/3YPQ5fHuF3STjcYyKr+Qhg==", "dev": true, "license": "MIT", - "peer": true, "funding": { "url": "https://github.com/sponsors/colinhacks" } diff --git a/chat-client/src/App.css b/chat-client/src/App.css index bf31290..56ee6dc 100644 --- a/chat-client/src/App.css +++ b/chat-client/src/App.css @@ -1,12 +1,14 @@ .app { display: grid; gap: 0.35rem; - padding: 1rem; + padding: 1rem 1.25rem; align-content: start; justify-items: start; text-align: left; font-family: "SFMono-Regular", "Consolas", "Liberation Mono", monospace; min-height: 100vh; + width: 100%; + box-sizing: border-box; background: #111; color: #e8e8e8; } @@ -109,6 +111,80 @@ width: 100%; } +.monitor-body { + display: grid; + grid-template-columns: 1fr 22rem; + gap: 0.75rem; + align-items: start; +} + +.monitor-main { + display: grid; + gap: 0.75rem; +} + +.monitor-sidebar { + display: grid; + gap: 0; + background: #181818; + border: 1px solid #2a2a2a; + align-self: start; +} + +.sidebar-title { + font-size: 0.7rem; + color: #555; + letter-spacing: 0.05em; + padding: 0.4rem 0.6rem; + border-bottom: 1px solid #2a2a2a; +} + +.room-row { + display: flex; + justify-content: space-between; + align-items: baseline; + padding: 0.3rem 0.6rem; + font-size: 0.8rem; + border-bottom: 1px solid #1e1e1e; +} + +.room-row:last-child { + border-bottom: none; +} + +.room-name { + color: #aaa; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; + flex: 1; + min-width: 0; +} + +.room-clients { + color: #7d9fc5; + font-size: 0.75rem; + width: 4rem; + text-align: right; + flex-shrink: 0; +} + +.room-msgs { + color: #3fb950; + font-size: 0.75rem; + width: 4rem; + text-align: right; + flex-shrink: 0; +} + +.room-swps { + color: #bc8cff; + font-size: 0.75rem; + width: 4rem; + text-align: right; + flex-shrink: 0; +} + .monitor-header { display: flex; align-items: center; @@ -256,6 +332,41 @@ flex-wrap: wrap; } +.server-rooms { + margin-top: 0.5rem; + border-top: 1px solid #2a2a2a; + padding-top: 0.4rem; + display: flex; + flex-direction: column; + gap: 0.1rem; +} + +.server-room-row { + display: flex; + justify-content: space-between; + font-size: 0.75rem; + color: #888; +} + +.server-room-name { + color: #aaa; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; + max-width: 80%; +} + +.server-room-clients { + color: #7d9fc5; + font-variant-numeric: tabular-nums; +} + +.server-room-swps { + color: #bc8cff; + font-size: 0.75rem; + font-variant-numeric: tabular-nums; +} + .graph { display: flex; flex-direction: column; diff --git a/chat-client/src/Monitor.tsx b/chat-client/src/Monitor.tsx index 2bd903b..e5661e4 100644 --- a/chat-client/src/Monitor.tsx +++ b/chat-client/src/Monitor.tsx @@ -2,7 +2,6 @@ import { useEffect, useRef, useState } from "react"; interface ServerHistory { clients: number[]; - chatRooms: number[]; socketWrites: number[]; timeouts: number[]; } @@ -11,17 +10,24 @@ interface ServerMetrics { id: string; url: string | null; clients: number; - chatRooms: number; - mps: number; + socketWritesPerSecond: number; eventLoopTimeout: number; heartbeatAgeMs: number; history: ServerHistory; } +interface ChatRoomMetrics { + id: string; + clients: number; + messagesPerSecond: number; + socketWritesPerSecond: number; +} + interface RedisStats { ts: number; servers: ServerMetrics[]; - totals: { clients: number; chatRooms: number; mps: number }; + chatRooms: ChatRoomMetrics[]; + totals: { clients: number; chatRooms: number; socketWritesPerSecond: number }; } const POLL_MS = 1000; @@ -127,8 +133,8 @@ function ServerCard({ s }: { s: ServerMetrics }) { color="#7d9fc5" /> fmt(v)} @@ -163,6 +169,9 @@ function Graph({
{label} + + avg: {(data.reduce((a, b) => a + b, 0) / data.length).toFixed(3)} + {fmtVal(current)} @@ -234,46 +243,69 @@ export function Monitor() {
{stats && ( - <> -
- - - - - s.heartbeatAgeMs < 2000).length - } - accent="green" - /> - s.heartbeatAgeMs >= 2000 && s.heartbeatAgeMs < 5000, - ).length - } - accent="yellow" - /> - s.heartbeatAgeMs >= 5000).length - } - accent="red" - /> +
+
+
+ + + + s.heartbeatAgeMs < 2000).length + } + accent="green" + /> + s.heartbeatAgeMs >= 2000 && s.heartbeatAgeMs < 5000, + ).length + } + accent="yellow" + /> + s.heartbeatAgeMs >= 5000).length + } + accent="red" + /> +
+ +
+ {stats.servers.length === 0 && ( +

no servers in redis

+ )} + {stats.servers.map((s) => ( + + ))} +
-
- {stats.servers.length === 0 && ( -

no servers in redis

+
+
chat rooms
+ {stats.chatRooms.length === 0 && ( +

no rooms

)} - {stats.servers.map((s) => ( - - ))} + {[...stats.chatRooms] + .sort((a, b) => b.clients - a.clients) + .map((room) => ( +
+ {room.id} + c:{room.clients} + m:{room.messagesPerSecond} + + s:{room.socketWritesPerSecond} + +
+ ))}
- +
)} {status === "error" && !stats && ( diff --git a/chat-client/src/index.css b/chat-client/src/index.css index aad5cb1..e69de29 100644 --- a/chat-client/src/index.css +++ b/chat-client/src/index.css @@ -1,111 +0,0 @@ -:root { - --text: #6b6375; - --text-h: #08060d; - --bg: #fff; - --border: #e5e4e7; - --code-bg: #f4f3ec; - --accent: #aa3bff; - --accent-bg: rgba(170, 59, 255, 0.1); - --accent-border: rgba(170, 59, 255, 0.5); - --social-bg: rgba(244, 243, 236, 0.5); - --shadow: - rgba(0, 0, 0, 0.1) 0 10px 15px -3px, rgba(0, 0, 0, 0.05) 0 4px 6px -2px; - - --sans: system-ui, "Segoe UI", Roboto, sans-serif; - --heading: system-ui, "Segoe UI", Roboto, sans-serif; - --mono: ui-monospace, Consolas, monospace; - - font: 18px/145% var(--sans); - letter-spacing: 0.18px; - color-scheme: light dark; - color: var(--text); - background: var(--bg); - font-synthesis: none; - text-rendering: optimizeLegibility; - -webkit-font-smoothing: antialiased; - -moz-osx-font-smoothing: grayscale; - - @media (max-width: 1024px) { - font-size: 16px; - } -} - -@media (prefers-color-scheme: dark) { - :root { - --text: #9ca3af; - --text-h: #f3f4f6; - --bg: #16171d; - --border: #2e303a; - --code-bg: #1f2028; - --accent: #c084fc; - --accent-bg: rgba(192, 132, 252, 0.15); - --accent-border: rgba(192, 132, 252, 0.5); - --social-bg: rgba(47, 48, 58, 0.5); - --shadow: - rgba(0, 0, 0, 0.4) 0 10px 15px -3px, rgba(0, 0, 0, 0.25) 0 4px 6px -2px; - } - - #social .button-icon { - filter: invert(1) brightness(2); - } -} - -#root { - width: 1126px; - max-width: 100%; - margin: 0 auto; - text-align: center; - border-inline: 1px solid var(--border); - min-height: 100svh; - display: flex; - flex-direction: column; - box-sizing: border-box; -} - -body { - margin: 0; -} - -h1, -h2 { - font-family: var(--heading); - font-weight: 500; - color: var(--text-h); -} - -h1 { - font-size: 56px; - letter-spacing: -1.68px; - margin: 32px 0; - @media (max-width: 1024px) { - font-size: 36px; - margin: 20px 0; - } -} -h2 { - font-size: 24px; - line-height: 118%; - letter-spacing: -0.24px; - margin: 0 0 8px; - @media (max-width: 1024px) { - font-size: 20px; - } -} -p { - margin: 0; -} - -code, -.counter { - font-family: var(--mono); - display: inline-flex; - border-radius: 4px; - color: var(--text-h); -} - -code { - font-size: 15px; - line-height: 135%; - padding: 4px 8px; - background: var(--code-bg); -} diff --git a/chat-server/index.ts b/chat-server/index.ts index 3110096..53e2065 100644 --- a/chat-server/index.ts +++ b/chat-server/index.ts @@ -4,7 +4,10 @@ import { WebSocketServer } from "ws"; import { addServerToRedis, ChatPayload, + chatRoomTotalClientsKey, + chatRoomSocketWritesPerSecondKey, debugLog, + redisChatCountKeyFactory, redisRedistributeChannelFactory, redisServerKeyFactory, RegistrationPayload, @@ -14,17 +17,14 @@ import { serversEventLoopTimeoutKey, serversHeartbeatKey, serversSocketWritesPerSecondKey, - WebSocketMessage, + type WebSocketMessage, + chatRoomMessagesPerSecondKey, } from "@chat/shared"; import { ClientSocket, - EVENTLOOP_TIMEOUT_THRESHOLD_MS, flushRoom, - redistributeBy, redistributeListener, - REQUEST_HELP_EVERY_MS, Room, - setRedistributeBy, } from "./src/utils.js"; import { chatRoomCount, @@ -33,8 +33,6 @@ import { decrementClientCount, incrementChatCount, incrementClientCount, - notTakingNewConnections, - setNotTakingNewConnections, } from "./src/state.js"; const redisClient = createClient(); @@ -80,14 +78,7 @@ void addSelfToRedis(); const rooms: Map = new Map(); -setNotTakingNewConnections(false); wss.on("connection", async (socket) => { - if (notTakingNewConnections) { - socket.close(1); - - return; - } - const client = socket as ClientSocket; client.id = v4(); @@ -118,8 +109,15 @@ wss.on("connection", async (socket) => { client.on("close", async () => { const chatId = client.chatId; if (chatId) { + void redisClient.zIncrBy(chatRoomTotalClientsKey, -1, chatId); + void redisClient.hIncrBy( + redisServerKeyFactory(serverId), + redisChatCountKeyFactory(chatId), + -1, + ); rooms.get(chatId)?.clients.delete(client); } + if (chatId && (rooms.get(chatId)?.clients.size ?? 0) < 1) { debugLog(`unsubscribing from ${chatId}`); rooms.delete(chatId); @@ -149,6 +147,7 @@ await subscriber.subscribe( ); let socketWritesThisSecond = 0; +const socketWritesPerChannelThisSecond: Record = {}; const updateMetrics = () => { void redisClient.zAdd(serversHeartbeatKey, { score: Date.now(), @@ -159,6 +158,28 @@ const updateMetrics = () => { value: serverId, }); socketWritesThisSecond = 0; + Object.keys(socketWritesPerChannelThisSecond).forEach((key) => { + void redisClient.zAdd(chatRoomSocketWritesPerSecondKey, { + score: socketWritesPerChannelThisSecond[key], + value: key, + }); + socketWritesPerChannelThisSecond[key] = 0; + }); + Object.keys(messagesSentPerChannelThisSecond).forEach((key) => { + // todo -- the obvious problem with this approach is that + // for the n number of servers that are receiving messages + // for a given chat room, all n of them will be trying + // to zAdd simultaneously. This doesn't introduce data + // integrity issues I don't think, seeing as they should + // all produce the same numbers; but boy oh boy is writing + // over the same redis key n times with the same value an + // annoying compromise to have to make. + void redisClient.zAdd(chatRoomMessagesPerSecondKey, { + score: messagesSentPerChannelThisSecond[key], + value: key, + }); + messagesSentPerChannelThisSecond[key] = 0; + }); void redisClient.zAdd(serversChatRoomsCountKey, { score: chatRoomCount, value: serverId, @@ -173,53 +194,26 @@ const updateMetrics = () => { }); }; -const offload = () => { - if (socketWritesThisSecond > 75_000) { - const multi = Math.min(socketWritesThisSecond / 150_000 / 2, 0.5); - void redisClient.publish( - "message", - JSON.stringify({ message: `nuking ${clientCount * multi}`, serverId }), - ); - setRedistributeBy(redistributeBy + clientCount * multi); - } -}; - -let lastRequestedHelp = 0; const lastFiveTimeoutValues = new Array(5).fill(0); -const updateTimeoutNumbers = async (timeout: number) => { - const shouldPanic = () => { - return ( - lastFiveTimeoutValues.reduce((a, b) => a + b) / - lastFiveTimeoutValues.length > - EVENTLOOP_TIMEOUT_THRESHOLD_MS && - Date.now() - lastRequestedHelp > REQUEST_HELP_EVERY_MS - ); - }; - - lastFiveTimeoutValues.shift(); - lastFiveTimeoutValues.push(timeout); - if (shouldPanic()) { - lastRequestedHelp = Date.now(); - debugLog("event loop is blocking! timeout: " + timeout); - void redisClient.publish("panic", JSON.stringify({ serverId, timeout })); - } -}; - -setInterval(() => { - setNotTakingNewConnections(false); -}, 1_500); setInterval(updateMetrics, 1000); -setInterval(offload, 500); setInterval(() => { const start = performance.now(); - setImmediate(() => void updateTimeoutNumbers(performance.now() - start)); + setImmediate(() => { + lastFiveTimeoutValues.shift(); + lastFiveTimeoutValues.push(performance.now() - start); + }); }, 1000); - const registerSocket = async ( registrationMessage: WebSocketMessage, socket: ClientSocket, ) => { const chatChannel = registrationMessage.payload.chatId; + await redisClient.zIncrBy(chatRoomTotalClientsKey, 1, chatChannel); + await redisClient.hIncrBy( + redisServerKeyFactory(serverId), + redisChatCountKeyFactory(chatChannel), + 1, + ); if (rooms.get(chatChannel) === undefined) { incrementChatCount(); @@ -242,7 +236,18 @@ const registerSocket = async ( room.queue.push(message); if (!room.running) { - flushRoom(room, () => socketWritesThisSecond++); + flushRoom(room, (socket: ClientSocket) => { + socketWritesThisSecond++; + if (!socket.chatId) { + return; + } + + if (!socketWritesPerChannelThisSecond[socket.chatId]) { + socketWritesPerChannelThisSecond[socket.chatId] = 0; + } + + socketWritesPerChannelThisSecond[socket.chatId] += 1; + }); } }); } @@ -252,6 +257,7 @@ const registerSocket = async ( rooms.get(chatChannel)?.clients.add(socket); }; +const messagesSentPerChannelThisSecond: Record = {}; const publishChat = async ( message: WebSocketMessage, socket: ClientSocket, @@ -260,6 +266,10 @@ const publishChat = async ( return; } + if (!messagesSentPerChannelThisSecond[socket.chatId]) { + messagesSentPerChannelThisSecond[socket.chatId] = 0; + } + messagesSentPerChannelThisSecond[socket.chatId] += 1; await redisClient.publish(socket.chatId, JSON.stringify(message.payload)); }; diff --git a/chat-server/src/state.ts b/chat-server/src/state.ts index 9dfed02..e651440 100644 --- a/chat-server/src/state.ts +++ b/chat-server/src/state.ts @@ -1,9 +1,6 @@ export let chatRoomCount = 0; export let clientCount = 0; -export let notTakingNewConnections = false; export const incrementChatCount = () => chatRoomCount++; export const decrementChatCount = () => chatRoomCount--; export const incrementClientCount = () => clientCount++; export const decrementClientCount = () => clientCount--; -export const setNotTakingNewConnections = (b: boolean) => - (notTakingNewConnections = b); diff --git a/chat-server/src/utils.ts b/chat-server/src/utils.ts index 77b57ce..6e32770 100644 --- a/chat-server/src/utils.ts +++ b/chat-server/src/utils.ts @@ -1,5 +1,4 @@ import { - debugLog, getLowestLoadServer, RedistributionPayload, Server, @@ -49,14 +48,13 @@ export interface Room { } export const redistributeListener = (message: string) => { - // redistributeBy = Math.floor( - // Math.max(Number(message) * REDISTRIBUTE_BY_FACTOR, 1), - // ); - // setNotTakingNewConnections(true); - debugLog(`over by ${message}; nuking ${redistributeBy} clients`); + redistributeBy = Math.floor(Math.max(Number(message) * 0.15, 1)); }; -export const flushRoom = (room: Room, callback: () => void = () => {}) => { +export const flushRoom = ( + room: Room, + callback: (socket: ClientSocket) => void = () => {}, +) => { if (room.queue.length < 1) { room.running = false; @@ -84,7 +82,7 @@ export const flushRoom = (room: Room, callback: () => void = () => {}) => { } else { socket.send(message); } - callback(); + callback(socket); } } diff --git a/load-balancer/index.ts b/load-balancer/index.ts index 86ce6eb..7c67f93 100644 --- a/load-balancer/index.ts +++ b/load-balancer/index.ts @@ -1,10 +1,10 @@ import { app } from "./src/app.ts"; -import { terminalUi } from "./terminal-ui.ts"; -import { childServerMap, shutdown } from "./src/utils.ts"; -import { spawnServer, startIntervals } from "./src/intervals.ts"; +// import { terminalUi } from "./terminal-ui.ts"; +import { childServerMap, shutdown, spawnServer } from "./src/utils.ts"; +import { startIntervals } from "./src/intervals.ts"; const port = 3000; -terminalUi.setRuntimeInfo({ port, serviceName: "load-balancer" }); +// terminalUi.setRuntimeInfo({ port, serviceName: "load-balancer" }); if (childServerMap.size < 1) { console.log("spawning init"); diff --git a/load-balancer/package-lock.json b/load-balancer/package-lock.json index 56c4a17..7e560d4 100644 --- a/load-balancer/package-lock.json +++ b/load-balancer/package-lock.json @@ -54,7 +54,6 @@ "resolved": "https://registry.npmjs.org/@redis/client/-/client-5.11.0.tgz", "integrity": "sha512-GHoprlNQD51Xq2Ztd94HHV94MdFZQ3CVrpA04Fz8MVoHM0B7SlbmPEVIjwTbcv58z8QyjnrOuikS0rWF03k5dQ==", "license": "MIT", - "peer": true, "dependencies": { "cluster-key-slot": "1.1.2" }, diff --git a/load-balancer/src/controllers/redis.stats.ts b/load-balancer/src/controllers/redis.stats.ts index d84f843..9c5bb6d 100644 --- a/load-balancer/src/controllers/redis.stats.ts +++ b/load-balancer/src/controllers/redis.stats.ts @@ -1,65 +1,64 @@ -// this is all AI generated import type { Request, Response } from "express"; import { childServerMap, redisClient } from "../utils.ts"; -import { - serversChatRoomsCountKey, - serversClientCountKey, - serversEventLoopTimeoutKey, - serversHeartbeatKey, - serversSocketWritesPerSecondKey, - redisServerKeyFactory, -} from "@chat/shared"; +import { serversHeartbeatKey } from "@chat/shared"; +import { chatRooms } from "../state.ts"; export const redisStats = async (_req: Request, res: Response) => { const now = Date.now(); - const [clients, chatRooms, heartbeats, mps, eventLoop] = await Promise.all([ - redisClient.zRangeWithScores(serversClientCountKey, 0, -1), - redisClient.zRangeWithScores(serversChatRoomsCountKey, 0, -1), - redisClient.zRangeWithScores(serversHeartbeatKey, 0, -1), - redisClient.zRangeWithScores(serversSocketWritesPerSecondKey, 0, -1), - redisClient.zRangeWithScores(serversEventLoopTimeoutKey, 0, -1), - ]); - - const serverIds = new Set([ - ...clients.map((e) => e.value), - ...heartbeats.map((e) => e.value), - ...mps.map((e) => e.value), - ]); - - const servers = await Promise.all( - [...serverIds].map(async (id) => { - const url = - (await redisClient.hGet(redisServerKeyFactory(id), "url")) ?? null; - const state = childServerMap.get(id)?.state; - return { - id, - url, - clients: clients.find((e) => e.value === id)?.score ?? 0, - chatRooms: chatRooms.find((e) => e.value === id)?.score ?? 0, - mps: mps.find((e) => e.value === id)?.score ?? 0, - eventLoopTimeout: eventLoop.find((e) => e.value === id)?.score ?? 0, - heartbeatAgeMs: - now - (heartbeats.find((e) => e.value === id)?.score ?? 0), - history: { - clients: state?.clients ?? [], - chatRooms: state?.chatRooms ?? [], - socketWrites: state?.socketWrites ?? [], - timeouts: state?.timeouts ?? [], - }, - }; - }), + const heartbeats = await redisClient.zRangeWithScores( + serversHeartbeatKey, + 0, + -1, ); + const heartbeatMap = new Map(heartbeats.map((e) => [e.value, e.score])); + + const servers = [...childServerMap.entries()].map(([id, child]) => { + const { state, server } = child; + const last = state.clients.length - 1; + return { + id, + url: server.url, + clients: state.clients[last] ?? 0, + socketWritesPerSecond: state.socketWrites[last] ?? 0, + eventLoopTimeout: state.timeouts[last] ?? 0, + heartbeatAgeMs: now - (heartbeatMap.get(id) ?? 0), + history: { + clients: state.clients, + socketWrites: state.socketWrites, + timeouts: state.timeouts, + }, + }; + }); servers.sort((a, b) => a.id.localeCompare(b.id)); + const chatRoomList = [...chatRooms.entries()].map(([id, room]) => { + const last = room.clients.length - 1; + return { + id, + clients: room.clients[last] ?? 0, + messagesPerSecond: room.messagesPerSecond[last] ?? 0, + socketWritesPerSecond: room.socketWritesPerSecond[last] ?? 0, + history: { + clients: room.clients, + messagesPerSecond: room.messagesPerSecond, + socketWritesPerSecond: room.socketWritesPerSecond, + }, + }; + }); + res.json({ ts: now, servers, + chatRooms: chatRoomList, totals: { clients: servers.reduce((s, sv) => s + sv.clients, 0), - chatRooms: servers.reduce((s, sv) => s + sv.chatRooms, 0), - mps: servers.reduce((s, sv) => s + sv.mps, 0), + chatRooms: chatRoomList.length, + socketWritesPerSecond: chatRoomList.reduce( + (s, r) => s + r.socketWritesPerSecond, + 0, + ), }, }); }; diff --git a/load-balancer/src/controllers/servers.provision.ts b/load-balancer/src/controllers/servers.provision.ts index 92ed5e1..50152ec 100644 --- a/load-balancer/src/controllers/servers.provision.ts +++ b/load-balancer/src/controllers/servers.provision.ts @@ -1,7 +1,42 @@ -import { runtimeState } from "../utils.ts"; +import { + type ChildProcess, + childServerMap, + runtimeState, + spawnServer, +} from "../utils.ts"; import express from "express"; import { incrProvisionsThisSecond } from "../intervals.ts"; -import { getLowestLoadServer } from "@chat/shared"; +import { getLowestLoadServer, type Server } from "@chat/shared"; +import { lastSpawnedServer, setLastSpawnedServer } from "../state.ts"; + +const MAX_SOCKET_SPIKE_LOAD = 75_000; +const serverAtMaxCapacity = (server: ChildProcess): boolean => { + return server.state.socketWrites.max() > MAX_SOCKET_SPIKE_LOAD; +}; +const shouldSpawnServer = () => Date.now() - lastSpawnedServer > 10_000; +export const getBestCandidateServer = async (): Promise => { + const servers = [...childServerMap.values()]; + + let candidate: ChildProcess | undefined = undefined; + servers.forEach((server) => { + if (serverAtMaxCapacity(server)) { + return; + } + + if (true /* some other conditions... */) { + // early return + } + + candidate = server; + }); + + if (!candidate && shouldSpawnServer()) { + setLastSpawnedServer(Date.now()); + return await spawnServer(); + } + + return (candidate as ChildProcess | undefined)?.server; +}; export const provisionServer = async ( req: express.Request, diff --git a/load-balancer/src/intervals.ts b/load-balancer/src/intervals.ts index 02dc27f..e7ce0db 100644 --- a/load-balancer/src/intervals.ts +++ b/load-balancer/src/intervals.ts @@ -1,23 +1,24 @@ -import { terminalUi } from "../terminal-ui.ts"; +// import { terminalUi } from "../terminal-ui.ts"; import { childServerMap, redisClient, runtimeState, serverBlacklist, - websocketServerFactory, } from "./utils.ts"; import { - debugLog, - redisRedistributeChannelFactory, + chatRoomMessagesPerSecondKey, + chatRoomSocketWritesPerSecondKey, + chatRoomTotalClientsKey, + type HistoryKey, + NumericList, + redisServerKeyFactory, removeServerFromRedis, - serversChatRoomsCountKey, serversClientCountKey, serversEventLoopTimeoutKey, serversHeartbeatKey, serversSocketWritesPerSecondKey, - type ServerState, } from "@chat/shared"; -import { v4 } from "uuid"; +import { type ChatRoomState, chatRooms } from "./state.ts"; const PPS_SURGE_THRESHOLD = 40; @@ -37,72 +38,6 @@ const AVERAGE_INTERVAL_TIMEOUT_CRITICAL_THRESHOLD = Number( process.env.AVERAGE_INTERVAL_TIMEOUT_CRITICAL_THRESHOLD ?? 10, ); -const shouldRedistribute = ( - distribution: number, - totalClients: number, - totalServers: number, -) => { - const optimalDistribution = totalClients / totalServers; - - return ( - distribution > optimalDistribution && - distribution - optimalDistribution > 1 && - distribution / optimalDistribution > redistributeThreshold && - !isSurge() - ); -}; - -export async function redistributeLoad() { - const [serverConnections, serverWritesPerSecond] = await Promise.all([ - redisClient.zRangeWithScores(serversClientCountKey, 0, -1), - redisClient.zRangeWithScores(serversSocketWritesPerSecondKey, 0, -1), - ]); - const serverConnectionsMap = serverConnections.filter( - (serverConnection) => !serverBlacklist.has(serverConnection.value), - ); - runtimeState.serverMps = serverWritesPerSecond.map(({ value, score }) => [ - value, - score, - ]); - runtimeState.lastRedistribution = null; - const numberOfClients = serverConnectionsMap.reduce( - (a, b) => Number(a) + Number(b.score), - 0, - ); - serverConnectionsMap.sort((a, b) => b.score - a.score); - runtimeState.serverLoads = serverConnectionsMap.map(({ value, score }) => [ - value, - score, - ]); - runtimeState.totalClients = Number(numberOfClients.toFixed(2)); - runtimeState.totalServers = serverConnectionsMap.length; - const optimal = numberOfClients / serverConnectionsMap.length; - runtimeState.optimalDistribution = Number.isFinite(optimal) ? optimal : 0; - - for (const serverScoreMap of serverConnectionsMap) { - if ( - shouldRedistribute( - serverScoreMap.score, - numberOfClients, - serverConnectionsMap.length, - ) - ) { - const redistributeBy = serverScoreMap.score - Math.floor(optimal); - runtimeState.lastRedistribution = { - amount: redistributeBy, - serverId: serverScoreMap.value, - timestamp: new Date().toLocaleTimeString("en-US", { - hour12: false, - }), - }; - await redisClient.publish( - redisRedistributeChannelFactory(serverScoreMap.value), - JSON.stringify(redistributeBy), - ); - } - } -} - const detectTimedOutServers = async () => { const now = Date.now(); const cutoff = now - wssServerTimeoutMs; @@ -132,9 +67,9 @@ const purgeBlacklistedServers = () => { }); }; -const updateServerState = ( +const updateServerStateHistoryArray = ( id: string, - key: keyof ServerState, + key: HistoryKey, value: number, ) => { if (!childServerMap.has(id)) { @@ -153,7 +88,7 @@ export const healthChecks = async () => { -1, ); socketWrites.forEach(({ value: id, score: writesPerSecond }) => - updateServerState(id, "socketWrites", writesPerSecond), + updateServerStateHistoryArray(id, "socketWrites", writesPerSecond), ); // clients const clients = await redisClient.zRangeWithScores( @@ -162,16 +97,7 @@ export const healthChecks = async () => { -1, ); clients.forEach(({ value: id, score: clients }) => - updateServerState(id, "clients", clients), - ); - // chat rooms - const chatRooms = await redisClient.zRangeWithScores( - serversChatRoomsCountKey, - 0, - -1, - ); - chatRooms.forEach(({ value: id, score: chatRooms }) => - updateServerState(id, "chatRooms", chatRooms), + updateServerStateHistoryArray(id, "clients", clients), ); const timeoutValues = await redisClient.zRangeWithScores( serversEventLoopTimeoutKey, @@ -179,133 +105,26 @@ export const healthChecks = async () => { -1, ); timeoutValues.forEach(({ value: id, score: timeout }) => - updateServerState(id, "timeouts", timeout), + updateServerStateHistoryArray(id, "timeouts", timeout), ); await detectTimedOutServers(); purgeBlacklistedServers(); -}; - -export const spawnServer = async () => { - const output = await websocketServerFactory(v4()); - if (output) { - childServerMap.set(output.server.id, output); - } + updatePps(); }; -let lastSpawnedServer = 0; -export const shouldSpawnNewServer = () => { - const lastFiveSecondsOfSocketWrites: Array> = [ - ...childServerMap.values(), - ].map((process) => { - const arr = process.state.socketWrites; - const sampleSize = 5; - if (!arr.length) { - return Array.from({ length: sampleSize }).map(() => 0); - } - - return arr.slice(arr.length - sampleSize, arr.length); - }); - const serversAboveSocketWriteThreshold = lastFiveSecondsOfSocketWrites.filter( - (n) => - n.reduce((a, b) => a + b) / n.length > - SOCKET_WRITES_PER_SECOND_CRITICAL_MASS, - ); - - const maxCapacity = 100_000 * childServerMap.size; - const lastFiveSecondsOfTotalLoad = []; - const len = lastFiveSecondsOfSocketWrites[0]?.length ?? 1; - for (let i = 0; i < len; i++) { - let sum = 0; - for (let j = 0; j < lastFiveSecondsOfSocketWrites.length; j++) { - sum += lastFiveSecondsOfSocketWrites[j][i]; - } - lastFiveSecondsOfTotalLoad.push(sum); - } - - const serverStartedRecently = Date.now() - lastSpawnedServer < 10_000; - const serverAboveCriticalMass = Boolean( - serversAboveSocketWriteThreshold.length, - ); - const cumulativeSocketLoadAboveSafeAverage = Boolean( - lastFiveSecondsOfTotalLoad.reduce((a, b) => a + b) / - lastFiveSecondsOfTotalLoad.length > - maxCapacity, - ); - const areServersTimingOut = Boolean( - [...childServerMap.values()].filter((server) => { - const len = server.state.timeouts.length; - const lastFive = server.state.timeouts.slice(len - 5, len); - return ( - lastFive.reduce((a, b) => a + b) / lastFive.length > - AVERAGE_INTERVAL_TIMEOUT_CRITICAL_THRESHOLD - ); - }).length, - ); - - const val = - !serverStartedRecently && - (serverAboveCriticalMass || - cumulativeSocketLoadAboveSafeAverage || - areServersTimingOut); - - if (val) { - debugLog({ - serverStartedRecently, - serverAboveCriticalMass, - averageSocketLoadAboveSafeAverage: `${cumulativeSocketLoadAboveSafeAverage} - ${lastFiveSecondsOfTotalLoad.reduce((a, b) => a + b) / lastFiveSecondsOfTotalLoad.length} > ${maxCapacity}`, - areServersTimingOut, - }); - } - return val; -}; - -const spawnServerIfRequired = async () => { - if (shouldSpawnNewServer()) { - debugLog("spawning new process"); - await spawnServer(); - lastSpawnedServer = Date.now(); - } -}; - -export async function cleanupDeadServers() { - const loadKeys = await redisClient.zRangeByScore( - serversSocketWritesPerSecondKey, - "-inf", - "+inf", - ); - const timeoutKeys = await redisClient.zRangeByScore( - serversHeartbeatKey, - "-inf", - "+inf", - ); - - loadKeys.forEach((key) => { - if (timeoutKeys.includes(key)) { - return; - } - - removeServerFromRedis(key); - childServerMap.get(key)?.process?.kill(0); - childServerMap.delete(key); - }); - - childServerMap.forEach((server, key) => { - if (server.process.killed) { - void removeServerFromRedis(key); - childServerMap.delete(key); - } - }); -} - +export const ppsHistory: NumericList = new NumericList( + ...Array.from({ length: 100 }).map(() => 0), +); export let pps = 0; -export const isSurge = () => pps > PPS_SURGE_THRESHOLD; export let provisionsThisSecond = 0; export const incrProvisionsThisSecond = () => provisionsThisSecond++; const updatePps = () => { pps = provisionsThisSecond; runtimeState.pps = pps; + ppsHistory.shift(); + ppsHistory.push(pps); provisionsThisSecond = 0; }; @@ -313,43 +132,69 @@ const syncTerminalUi = () => { const clientsByServerId = new Map(runtimeState.serverLoads); const mpsByServerId = new Map(runtimeState.serverMps); - terminalUi.setSnapshot({ - blacklistedServers: [...serverBlacklist.entries()].map( - ([serverId, startedAt]) => [ - serverId, - Math.floor((Date.now() - startedAt) / 1000), - ], - ), - childServers: [...childServerMap.entries()].map(([serverId, child]) => ({ - clients: clientsByServerId.get(serverId) ?? 0, - isKilled: child.process.killed, - mps: mpsByServerId.get(serverId) ?? 0, - pid: child.process.pid, - serverId, - state: child.state, - })), - status: "running", - ...runtimeState, + // terminalUi.setSnapshot({ + // blacklistedServers: [...serverBlacklist.entries()].map( + // ([serverId, startedAt]) => [ + // serverId, + // Math.floor((Date.now() - startedAt) / 1000), + // ], + // ), + // childServers: [...childServerMap.entries()].map(([serverId, child]) => ({ + // clients: clientsByServerId.get(serverId) ?? 0, + // isKilled: child.process.killed, + // mps: mpsByServerId.get(serverId) ?? 0, + // pid: child.process.pid, + // serverId, + // state: child.state, + // })), + // status: "running", + // ...runtimeState, + // }); +}; + +const ensureChatRoom = (id: string): ChatRoomState => { + if (!chatRooms.has(id)) { + const empty = () => + new NumericList(...Array.from({ length: 100 }).map(() => 0)); + chatRooms.set(id, { + clients: empty(), + messagesPerSecond: empty(), + socketWritesPerSecond: empty(), + }); + } + return chatRooms.get(id)!; +}; + +const updateState = async () => { + const [socketWrites, messages, clients] = await Promise.all([ + redisClient.zRangeWithScores(chatRoomSocketWritesPerSecondKey, 0, -1), + redisClient.zRangeWithScores(chatRoomMessagesPerSecondKey, 0, -1), + redisClient.zRangeWithScores(chatRoomTotalClientsKey, 0, -1), + ]); + + socketWrites.forEach(({ value: id, score }) => { + const room = ensureChatRoom(id); + room.socketWritesPerSecond.shift(); + room.socketWritesPerSecond.push(score); + }); + + messages.forEach(({ value: id, score }) => { + const room = ensureChatRoom(id); + room.messagesPerSecond.shift(); + room.messagesPerSecond.push(score); + }); + + clients.forEach(({ value: id, score }) => { + const room = ensureChatRoom(id); + room.clients.shift(); + room.clients.push(score); }); }; export const startIntervals = () => { setInterval(async () => { + await updateState(); await healthChecks(); - }, 1000); - setInterval(async () => { - await spawnServerIfRequired(); - }, 1000); - setInterval(async () => { - await cleanupDeadServers(); - }, 1000); - setInterval(() => { - updatePps(); - }, 1000); - setInterval(async () => { - await redistributeLoad(); - }, 1000); - setInterval(() => { - syncTerminalUi(); + // syncTerminalUi(); }, 1000); }; diff --git a/load-balancer/src/state.ts b/load-balancer/src/state.ts new file mode 100644 index 0000000..b202943 --- /dev/null +++ b/load-balancer/src/state.ts @@ -0,0 +1,12 @@ +import { NumericList } from "@chat/shared"; + +export interface ChatRoomState { + clients: NumericList; + messagesPerSecond: NumericList; + socketWritesPerSecond: NumericList; +} + +export const chatRooms: Map = new Map(); +export let lastSpawnedServer = Date.now(); +export const setLastSpawnedServer = (time: number) => + (lastSpawnedServer = time); diff --git a/load-balancer/src/utils.ts b/load-balancer/src/utils.ts index 5ef3a19..5484733 100644 --- a/load-balancer/src/utils.ts +++ b/load-balancer/src/utils.ts @@ -1,8 +1,10 @@ import { createClient } from "redis"; -import { terminalUi } from "../terminal-ui.ts"; +// import { terminalUi } from "../terminal-ui.ts"; import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process"; import path from "node:path"; import { + chatRoomTotalClientsKey, + chatRoomSocketWritesPerSecondKey, debugLog, defaultServerState, redisServerKeyFactory, @@ -10,6 +12,7 @@ import { type Server, type ServerState, } from "@chat/shared"; +import { v4 } from "uuid"; export const redisClient = createClient(); await redisClient.connect(); @@ -44,13 +47,15 @@ export const runtimeState = { export const shutdown = async () => { try { - terminalUi.destroy(); + // terminalUi.destroy(); await redisClient.quit(); await subscriptionClient.quit(); - childServerMap.forEach((server: ChildProcess, id: string) => { - void removeServerFromRedis(id); - server.process.kill(1); - }); + for (let [id, childServerMapElement] of childServerMap) { + await removeServerFromRedis(id); + childServerMapElement.process.kill(1); + } + await redisClient.del(chatRoomSocketWritesPerSecondKey); + await redisClient.del(chatRoomTotalClientsKey); } catch { redisClient.destroy(); subscriptionClient.destroy(); @@ -66,6 +71,22 @@ await subscriptionClient.subscribe("panic", (server: string) => { ); }); +export const spawnServer = async () => { + const output = await websocketServerFactory(v4()); + + if (output) { + childServerMap.set(output.server.id, output); + + return output.server; + } + + if (!output) { + debugLog("error; unable to spawn server"); + + return undefined; + } +}; + export interface ChildProcess { server: Server; process: ChildProcessWithoutNullStreams; @@ -87,11 +108,22 @@ export const websocketServerFactory = async ( }, ); - const args = [path.resolve("../chat-server/dist/chat-server/index.js")]; + const args = [ + "--experimental-transform-types", + path.resolve("../chat-server/dist/chat-server/index.js"), + ]; args.push(`--id=${id}`); const child = spawn(process.execPath, args); + child.on("error", (e) => debugLog(`child process error: ${e.message}`)); + child.on("exit", (code, signal) => + debugLog(`child exited with code=${code} signal=${signal}`), + ); + child.stderr.on("data", (chunk) => + debugLog(`child stderr: ${chunk.toString().trim()}`), + ); + const timeoutMs = 5_000; const now = Date.now(); const poll = async () => { diff --git a/load-balancer/terminal-ui.ts b/load-balancer/terminal-ui.ts index 2ce041f..e7ed59c 100644 --- a/load-balancer/terminal-ui.ts +++ b/load-balancer/terminal-ui.ts @@ -53,7 +53,8 @@ const timestamp = () => const formatNumber = (value: number) => value.toFixed(2); const formatNumberArray = (values: number[]) => values - .slice(0, MAX_RENDERED_ARRAY_ITEMS) + .slice(values.length - 10, values.length) + .reverse() .map((value) => Number(formatNumber(value))); const formatServerJson = (value: unknown) => JSON.stringify(value, null, 2) @@ -359,7 +360,6 @@ class LoadBalancerTerminalUi { mps: Number(formatNumber(mps)), state: { clients: formatNumberArray(state.clients), - chatRooms: formatNumberArray(state.chatRooms), socketWrites: formatNumberArray(state.socketWrites), timeouts: formatNumberArray(state.timeouts), }, @@ -407,4 +407,4 @@ class LoadBalancerTerminalUi { } } -export const terminalUi = new LoadBalancerTerminalUi(); +// export const terminalUi = new LoadBalancerTerminalUi(); diff --git a/load-balancer/tests/intervals.spec.ts b/load-balancer/tests/intervals.spec.ts index cb15a5e..287b930 100644 --- a/load-balancer/tests/intervals.spec.ts +++ b/load-balancer/tests/intervals.spec.ts @@ -1,24 +1,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; -import { - cleanupDeadServers, - healthChecks, - redistributeLoad, - shouldSpawnNewServer, -} from "@load-balancer/src/intervals.js"; +import { healthChecks } from "@load-balancer/src/intervals.js"; import { v4 } from "uuid"; -import { - ChildProcess, - childServerMap, - serverBlacklist, -} from "@load-balancer/src/utils.js"; -import { - defaultServerState, - redisRedistributeChannelFactory, - serversClientCountKey, - serversHeartbeatKey, - serversSocketWritesPerSecondKey, -} from "@chat/shared"; -import { ChildProcessWithoutNullStreams } from "node:child_process"; +import { serverBlacklist } from "@load-balancer/src/utils.js"; const mockRedisClient = vi.hoisted(() => ({ connect: vi.fn(), @@ -52,68 +35,6 @@ describe("intervals", () => { serverBlacklist.clear(); }); - describe("redistributeLoad", () => { - it("redistributes when distribution is above threshold", async () => { - mockRedisClient.publish = vi.fn(); - const uuid = v4(); - - mockRedisClient.zRangeWithScores = vi - .fn() - .mockResolvedValueOnce([ - { value: v4(), score: 0 }, - { value: uuid, score: 10 }, - ]) - .mockResolvedValueOnce([{ value: uuid, score: 3 }]) - .mockResolvedValueOnce([]); - - await redistributeLoad(); - - expect(mockRedisClient.zRangeWithScores).toHaveBeenCalledTimes(2); - expect(mockRedisClient.zRangeWithScores).toHaveBeenNthCalledWith( - 1, - serversClientCountKey, - 0, - -1, - ); - expect(mockRedisClient.zRangeWithScores).toHaveBeenNthCalledWith( - 2, - serversSocketWritesPerSecondKey, - 0, - -1, - ); - expect(mockRedisClient.publish).toHaveBeenCalledExactlyOnceWith( - redisRedistributeChannelFactory(uuid), - "5", - ); - }); - - it("doesn't trigger redistribution when server is blacklisted", async () => { - mockRedisClient.publish = vi.fn(); - const now = Date.now(); - vi.useFakeTimers(); - vi.setSystemTime(now); - - const healthyServerOne = v4(); - const healthyServerTwo = v4(); - const deadServer = v4(); - - mockRedisClient.zRangeWithScores = vi - .fn() - .mockResolvedValueOnce([ - { value: healthyServerOne, score: 500 }, - { value: healthyServerTwo, score: 500 }, - { value: deadServer, score: 500 }, - ]) - .mockResolvedValueOnce([]); - - serverBlacklist.set(deadServer, Date.now()); - - await redistributeLoad(); - - expect(mockRedisClient.publish).not.toHaveBeenCalled(); - }); - }); - describe("healthChecks", () => { it("adds timed-out servers to server blacklist", async () => { expect(serverBlacklist.size).toBe(0); @@ -148,96 +69,4 @@ describe("intervals", () => { expect(mockedRemoveServerFromRedis).toHaveBeenCalledOnce(); }); }); - - describe("cleanupDeadServers", () => { - it("removes servers that has a ratio key but no timeout key", async () => { - const uuid = v4(); - mockRedisClient.zRangeByScore = vi - .fn() - .mockResolvedValueOnce([uuid]) - .mockResolvedValueOnce(["timeout-server-1"]); - childServerMap.set(uuid, { - process: { - kill: vi.fn(), - killed: false, - } as unknown as ChildProcessWithoutNullStreams, - server: { - id: uuid, - url: "ws://localhost:3001", - }, - state: defaultServerState(), - }); - - await cleanupDeadServers(); - - expect(mockRedisClient.zRangeByScore).toHaveBeenCalledTimes(2); - expect(mockRedisClient.zRangeByScore).toHaveBeenCalledWith( - ...[serversSocketWritesPerSecondKey, "-inf", "+inf"], - ); - expect(mockRedisClient.zRangeByScore).toHaveBeenCalledWith( - serversHeartbeatKey, - "-inf", - "+inf", - ); - expect(mockedRemoveServerFromRedis).toHaveBeenCalledExactlyOnceWith(uuid); - expect(childServerMap.get(uuid)).toBeUndefined(); - }); - }); - - describe("shouldSpawnNewServer", () => { - const childServerFactory = (overrides: Partial) => { - const id = v4(); - return { - server: { - id, - url: "ws://snickers.com:8080,", - }, - process: {} as ChildProcessWithoutNullStreams, - state: defaultServerState(), - ...overrides, - }; - }; - - it("spawns new server if cumulative socket write load is above max capacity", () => { - const a = childServerFactory({ - state: { - ...defaultServerState(), - socketWrites: [100_000, 100_000, 100_000, 100_000, 100_000], - }, - }); - childServerMap.set(a.server.id, a); - const b = childServerFactory({ - state: { - ...defaultServerState(), - socketWrites: [100_000, 100_000, 100_000, 100_000, 100_000], - }, - }); - childServerMap.set(b.server.id, b); - const c = childServerFactory({ - state: { - ...defaultServerState(), - socketWrites: [100_000, 100_000, 100_000, 100_000, 100_000], - }, - }); - childServerMap.set(c.server.id, c); - - const outcome = shouldSpawnNewServer(); - - expect(outcome).toBeTruthy(); - }); - - it("spawns new server if one server has average socket load above critical mass", () => { - const a = childServerFactory({ - state: { - ...defaultServerState(), - socketWrites: [100_000, 80_000, 110_000, 50_000, 60_000], - }, - }); - childServerMap.set(a.server.id, a); - - const outcome = shouldSpawnNewServer(); - - expect(outcome).toBeTruthy(); - }); - }); }); diff --git a/package-lock.json b/package-lock.json index 3e013a7..657fa48 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7,6 +7,7 @@ "name": "chat-workspace", "dependencies": { "@types/supertest": "^7.2.0", + "prettier": "^3.8.1", "supertest": "^7.2.2", "vitest": "^4.1.0" } @@ -404,7 +405,6 @@ "resolved": "https://registry.npmjs.org/@types/node/-/node-25.5.0.tgz", "integrity": "sha512-jp2P3tQMSxWugkCUKLRPVUpGaL5MVFwF8RDuSRztfwgN1wmqJeMSbKlnEtQqU8UrhTmzEmZdu2I6v2dpp7XIxw==", "license": "MIT", - "peer": true, "dependencies": { "undici-types": "~7.18.0" } @@ -1314,7 +1314,6 @@ "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.3.tgz", "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -1350,6 +1349,21 @@ "node": "^10 || ^12 || >=14" } }, + "node_modules/prettier": { + "version": "3.8.1", + "resolved": "https://registry.npmjs.org/prettier/-/prettier-3.8.1.tgz", + "integrity": "sha512-UOnG6LftzbdaHZcKoPFtOcCKztrQ57WkHDeRD9t/PTQtmT0NHSeWWepj6pS0z/N7+08BHFDQVUrfmfMRcZwbMg==", + "license": "MIT", + "bin": { + "prettier": "bin/prettier.cjs" + }, + "engines": { + "node": ">=14" + }, + "funding": { + "url": "https://github.com/prettier/prettier?sponsor=1" + } + }, "node_modules/qs": { "version": "6.15.0", "resolved": "https://registry.npmjs.org/qs/-/qs-6.15.0.tgz", @@ -1589,7 +1603,6 @@ "resolved": "https://registry.npmjs.org/vite/-/vite-8.0.0.tgz", "integrity": "sha512-fPGaRNj9Zytaf8LEiBhY7Z6ijnFKdzU/+mL8EFBaKr7Vw1/FWcTBAMW0wLPJAGMPX38ZPVCVgLceWiEqeoqL2Q==", "license": "MIT", - "peer": true, "dependencies": { "@oxc-project/runtime": "0.115.0", "lightningcss": "^1.32.0", diff --git a/package.json b/package.json index f8a7992..554d1e1 100644 --- a/package.json +++ b/package.json @@ -3,10 +3,12 @@ "private": true, "dependencies": { "@types/supertest": "^7.2.0", + "prettier": "^3.8.1", "supertest": "^7.2.2", "vitest": "^4.1.0" }, "scripts": { - "test": "vitest --run" + "test": "vitest --run", + "prettier": "prettier --write ." } } diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index 9a67dad..df51f38 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -5,28 +5,58 @@ export interface Server { url: string; } +export class NumericList extends Array { + max() { + return Math.max(...this); + } + + min() { + return Math.min(...this); + } + + average() { + return this.reduce((a, b) => a + b) / this.length; + } + + deltas() { + return this.map((v, i) => v - (this[i - 1] ?? 0)); + } +} + +export type HistoryKey = Exclude< + { + [K in keyof ServerState]: ServerState[K] extends NumericList ? K : never; + }[keyof ServerState], + "chatRooms" +>; + export interface ServerState { - clients: Array; - chatRooms: Array; - socketWrites: Array; - timeouts: Array; + clients: NumericList; + socketWrites: NumericList; + timeouts: NumericList; + messages: NumericList; } -export const defaultServerState = () => ({ - clients: Array.from({ length: 100 }).map(() => 0), - chatRooms: Array.from({ length: 100 }).map(() => 0), - socketWrites: Array.from({ length: 100 }).map(() => 0), - timeouts: Array.from({ length: 100 }).map(() => 0), +export const defaultServerState = (): ServerState => ({ + clients: new NumericList(...Array.from({ length: 100 }).map(() => 0)), + socketWrites: new NumericList(...Array.from({ length: 100 }).map(() => 0)), + timeouts: new NumericList(...Array.from({ length: 100 }).map(() => 0)), + messages: new NumericList(...Array.from({ length: 100 }).map(() => 0)), }); export const redistributeChannel = "wss-redistribute"; export const serversClientCountKey = "servers:clients"; export const serversChatRoomsCountKey = "servers:chats"; export const serversHeartbeatKey = "servers:heartbeat"; -export const serversSocketWritesPerSecondKey = "servers:mps"; +export const serversSocketWritesPerSecondKey = "servers:swps"; export const serversEventLoopTimeoutKey = "servers:event-loop"; +export const chatRoomSocketWritesPerSecondKey = "chat-rooms:socket-writes"; +export const chatRoomMessagesPerSecondKey = "chat-rooms:messages"; +export const chatRoomTotalClientsKey = "chat-rooms:clients"; export const redisRedistributeChannelFactory = (serverId: string) => `${serverId}-${redistributeChannel}`; export const redisServerKeyFactory = (serverId: string) => `server:${serverId}`; +export const redisChatCountKeyFactory = (chatChannel: string) => + `chat:${chatChannel}`; export const addServerToRedis = async (server: Server) => { const redisClient = createClient(); @@ -108,9 +138,9 @@ export const getLowestLoadServers = async ( await redisClient.connect(); const ids = await redisClient.zRange( - serversSocketWritesPerSecondKey, + serversClientCountKey, 0, - (count ?? 10) - 1, + (count ?? 100) - 1, ); const results: Array<{ id: string; url: string }> = []; diff --git a/schema.json b/schema.json new file mode 100644 index 0000000..1abe196 --- /dev/null +++ b/schema.json @@ -0,0 +1,25 @@ +{ + "chat_rooms": [ + { + "id": "some-chat", + "clients": 0, + "messages_per_second": 0, + "socket_writes_per_second": 0 + } + ], + "servers": [ + { + "id": "some-server", + "url": "ws://localhost:8080", + "chat_rooms": [ + { + "id": "some-chat", + "clients": 0 + } + ], + "last_heartbeat": 123, + "event_loop_timeout": 2, + "cumulative_swps": 0 + } + ] +}