From 5e411e1f9bdec6c734dd188eed03617b9b81fff4 Mon Sep 17 00:00:00 2001 From: ryan Date: Sat, 21 Mar 2026 10:56:14 -0400 Subject: [PATCH 01/32] remove all load balancing logic from intervals --- load-balancer/index.ts | 4 +- load-balancer/package-lock.json | 1 - load-balancer/src/intervals.ts | 198 +------------------------- load-balancer/src/utils.ts | 9 ++ load-balancer/tests/intervals.spec.ts | 175 +---------------------- package-lock.json | 3 - 6 files changed, 14 insertions(+), 376 deletions(-) diff --git a/load-balancer/index.ts b/load-balancer/index.ts index 86ce6eb..814a558 100644 --- a/load-balancer/index.ts +++ b/load-balancer/index.ts @@ -1,7 +1,7 @@ import { app } from "./src/app.ts"; import { terminalUi } from "./terminal-ui.ts"; -import { childServerMap, shutdown } from "./src/utils.ts"; -import { spawnServer, startIntervals } from "./src/intervals.ts"; +import { childServerMap, shutdown, spawnServer } from "./src/utils.ts"; +import { startIntervals } from "./src/intervals.ts"; const port = 3000; terminalUi.setRuntimeInfo({ port, serviceName: "load-balancer" }); diff --git a/load-balancer/package-lock.json b/load-balancer/package-lock.json index 56c4a17..7e560d4 100644 --- a/load-balancer/package-lock.json +++ b/load-balancer/package-lock.json @@ -54,7 +54,6 @@ "resolved": "https://registry.npmjs.org/@redis/client/-/client-5.11.0.tgz", "integrity": "sha512-GHoprlNQD51Xq2Ztd94HHV94MdFZQ3CVrpA04Fz8MVoHM0B7SlbmPEVIjwTbcv58z8QyjnrOuikS0rWF03k5dQ==", "license": "MIT", - "peer": true, "dependencies": { "cluster-key-slot": "1.1.2" }, diff --git a/load-balancer/src/intervals.ts b/load-balancer/src/intervals.ts index 02dc27f..2eebc7a 100644 --- a/load-balancer/src/intervals.ts +++ b/load-balancer/src/intervals.ts @@ -4,11 +4,8 @@ import { redisClient, runtimeState, serverBlacklist, - websocketServerFactory, } from "./utils.ts"; import { - debugLog, - redisRedistributeChannelFactory, removeServerFromRedis, serversChatRoomsCountKey, serversClientCountKey, @@ -17,7 +14,6 @@ import { serversSocketWritesPerSecondKey, type ServerState, } from "@chat/shared"; -import { v4 } from "uuid"; const PPS_SURGE_THRESHOLD = 40; @@ -37,72 +33,6 @@ const AVERAGE_INTERVAL_TIMEOUT_CRITICAL_THRESHOLD = Number( process.env.AVERAGE_INTERVAL_TIMEOUT_CRITICAL_THRESHOLD ?? 10, ); -const shouldRedistribute = ( - distribution: number, - totalClients: number, - totalServers: number, -) => { - const optimalDistribution = totalClients / totalServers; - - return ( - distribution > optimalDistribution && - distribution - optimalDistribution > 1 && - distribution / optimalDistribution > redistributeThreshold && - !isSurge() - ); -}; - -export async function redistributeLoad() { - const [serverConnections, serverWritesPerSecond] = await Promise.all([ - redisClient.zRangeWithScores(serversClientCountKey, 0, -1), - redisClient.zRangeWithScores(serversSocketWritesPerSecondKey, 0, -1), - ]); - const serverConnectionsMap = serverConnections.filter( - (serverConnection) => !serverBlacklist.has(serverConnection.value), - ); - runtimeState.serverMps = serverWritesPerSecond.map(({ value, score }) => [ - value, - score, - ]); - runtimeState.lastRedistribution = null; - const numberOfClients = serverConnectionsMap.reduce( - (a, b) => Number(a) + Number(b.score), - 0, - ); - serverConnectionsMap.sort((a, b) => b.score - a.score); - runtimeState.serverLoads = serverConnectionsMap.map(({ value, score }) => [ - value, - score, - ]); - runtimeState.totalClients = Number(numberOfClients.toFixed(2)); - runtimeState.totalServers = serverConnectionsMap.length; - const optimal = numberOfClients / serverConnectionsMap.length; - runtimeState.optimalDistribution = Number.isFinite(optimal) ? optimal : 0; - - for (const serverScoreMap of serverConnectionsMap) { - if ( - shouldRedistribute( - serverScoreMap.score, - numberOfClients, - serverConnectionsMap.length, - ) - ) { - const redistributeBy = serverScoreMap.score - Math.floor(optimal); - runtimeState.lastRedistribution = { - amount: redistributeBy, - serverId: serverScoreMap.value, - timestamp: new Date().toLocaleTimeString("en-US", { - hour12: false, - }), - }; - await redisClient.publish( - redisRedistributeChannelFactory(serverScoreMap.value), - JSON.stringify(redistributeBy), - ); - } - } -} - const detectTimedOutServers = async () => { const now = Date.now(); const cutoff = now - wssServerTimeoutMs; @@ -184,123 +114,11 @@ export const healthChecks = async () => { await detectTimedOutServers(); purgeBlacklistedServers(); -}; - -export const spawnServer = async () => { - const output = await websocketServerFactory(v4()); - - if (output) { - childServerMap.set(output.server.id, output); - } -}; - -let lastSpawnedServer = 0; -export const shouldSpawnNewServer = () => { - const lastFiveSecondsOfSocketWrites: Array> = [ - ...childServerMap.values(), - ].map((process) => { - const arr = process.state.socketWrites; - const sampleSize = 5; - if (!arr.length) { - return Array.from({ length: sampleSize }).map(() => 0); - } - - return arr.slice(arr.length - sampleSize, arr.length); - }); - const serversAboveSocketWriteThreshold = lastFiveSecondsOfSocketWrites.filter( - (n) => - n.reduce((a, b) => a + b) / n.length > - SOCKET_WRITES_PER_SECOND_CRITICAL_MASS, - ); - - const maxCapacity = 100_000 * childServerMap.size; - const lastFiveSecondsOfTotalLoad = []; - const len = lastFiveSecondsOfSocketWrites[0]?.length ?? 1; - for (let i = 0; i < len; i++) { - let sum = 0; - for (let j = 0; j < lastFiveSecondsOfSocketWrites.length; j++) { - sum += lastFiveSecondsOfSocketWrites[j][i]; - } - lastFiveSecondsOfTotalLoad.push(sum); - } - - const serverStartedRecently = Date.now() - lastSpawnedServer < 10_000; - const serverAboveCriticalMass = Boolean( - serversAboveSocketWriteThreshold.length, - ); - const cumulativeSocketLoadAboveSafeAverage = Boolean( - lastFiveSecondsOfTotalLoad.reduce((a, b) => a + b) / - lastFiveSecondsOfTotalLoad.length > - maxCapacity, - ); - const areServersTimingOut = Boolean( - [...childServerMap.values()].filter((server) => { - const len = server.state.timeouts.length; - const lastFive = server.state.timeouts.slice(len - 5, len); - return ( - lastFive.reduce((a, b) => a + b) / lastFive.length > - AVERAGE_INTERVAL_TIMEOUT_CRITICAL_THRESHOLD - ); - }).length, - ); - const val = - !serverStartedRecently && - (serverAboveCriticalMass || - cumulativeSocketLoadAboveSafeAverage || - areServersTimingOut); - - if (val) { - debugLog({ - serverStartedRecently, - serverAboveCriticalMass, - averageSocketLoadAboveSafeAverage: `${cumulativeSocketLoadAboveSafeAverage} - ${lastFiveSecondsOfTotalLoad.reduce((a, b) => a + b) / lastFiveSecondsOfTotalLoad.length} > ${maxCapacity}`, - areServersTimingOut, - }); - } - return val; -}; - -const spawnServerIfRequired = async () => { - if (shouldSpawnNewServer()) { - debugLog("spawning new process"); - await spawnServer(); - lastSpawnedServer = Date.now(); - } + updatePps(); }; -export async function cleanupDeadServers() { - const loadKeys = await redisClient.zRangeByScore( - serversSocketWritesPerSecondKey, - "-inf", - "+inf", - ); - const timeoutKeys = await redisClient.zRangeByScore( - serversHeartbeatKey, - "-inf", - "+inf", - ); - - loadKeys.forEach((key) => { - if (timeoutKeys.includes(key)) { - return; - } - - removeServerFromRedis(key); - childServerMap.get(key)?.process?.kill(0); - childServerMap.delete(key); - }); - - childServerMap.forEach((server, key) => { - if (server.process.killed) { - void removeServerFromRedis(key); - childServerMap.delete(key); - } - }); -} - export let pps = 0; -export const isSurge = () => pps > PPS_SURGE_THRESHOLD; export let provisionsThisSecond = 0; export const incrProvisionsThisSecond = () => provisionsThisSecond++; const updatePps = () => { @@ -336,20 +154,6 @@ const syncTerminalUi = () => { export const startIntervals = () => { setInterval(async () => { await healthChecks(); - }, 1000); - setInterval(async () => { - await spawnServerIfRequired(); - }, 1000); - setInterval(async () => { - await cleanupDeadServers(); - }, 1000); - setInterval(() => { - updatePps(); - }, 1000); - setInterval(async () => { - await redistributeLoad(); - }, 1000); - setInterval(() => { syncTerminalUi(); }, 1000); }; diff --git a/load-balancer/src/utils.ts b/load-balancer/src/utils.ts index 5ef3a19..d7e5be3 100644 --- a/load-balancer/src/utils.ts +++ b/load-balancer/src/utils.ts @@ -10,6 +10,7 @@ import { type Server, type ServerState, } from "@chat/shared"; +import { v4 } from "uuid"; export const redisClient = createClient(); await redisClient.connect(); @@ -66,6 +67,14 @@ await subscriptionClient.subscribe("panic", (server: string) => { ); }); +export const spawnServer = async () => { + const output = await websocketServerFactory(v4()); + + if (output) { + childServerMap.set(output.server.id, output); + } +}; + export interface ChildProcess { server: Server; process: ChildProcessWithoutNullStreams; diff --git a/load-balancer/tests/intervals.spec.ts b/load-balancer/tests/intervals.spec.ts index cb15a5e..287b930 100644 --- a/load-balancer/tests/intervals.spec.ts +++ b/load-balancer/tests/intervals.spec.ts @@ -1,24 +1,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; -import { - cleanupDeadServers, - healthChecks, - redistributeLoad, - shouldSpawnNewServer, -} from "@load-balancer/src/intervals.js"; +import { healthChecks } from "@load-balancer/src/intervals.js"; import { v4 } from "uuid"; -import { - ChildProcess, - childServerMap, - serverBlacklist, -} from "@load-balancer/src/utils.js"; -import { - defaultServerState, - redisRedistributeChannelFactory, - serversClientCountKey, - serversHeartbeatKey, - serversSocketWritesPerSecondKey, -} from "@chat/shared"; -import { ChildProcessWithoutNullStreams } from "node:child_process"; +import { serverBlacklist } from "@load-balancer/src/utils.js"; const mockRedisClient = vi.hoisted(() => ({ connect: vi.fn(), @@ -52,68 +35,6 @@ describe("intervals", () => { serverBlacklist.clear(); }); - describe("redistributeLoad", () => { - it("redistributes when distribution is above threshold", async () => { - mockRedisClient.publish = vi.fn(); - const uuid = v4(); - - mockRedisClient.zRangeWithScores = vi - .fn() - .mockResolvedValueOnce([ - { value: v4(), score: 0 }, - { value: uuid, score: 10 }, - ]) - .mockResolvedValueOnce([{ value: uuid, score: 3 }]) - .mockResolvedValueOnce([]); - - await redistributeLoad(); - - expect(mockRedisClient.zRangeWithScores).toHaveBeenCalledTimes(2); - expect(mockRedisClient.zRangeWithScores).toHaveBeenNthCalledWith( - 1, - serversClientCountKey, - 0, - -1, - ); - expect(mockRedisClient.zRangeWithScores).toHaveBeenNthCalledWith( - 2, - serversSocketWritesPerSecondKey, - 0, - -1, - ); - expect(mockRedisClient.publish).toHaveBeenCalledExactlyOnceWith( - redisRedistributeChannelFactory(uuid), - "5", - ); - }); - - it("doesn't trigger redistribution when server is blacklisted", async () => { - mockRedisClient.publish = vi.fn(); - const now = Date.now(); - vi.useFakeTimers(); - vi.setSystemTime(now); - - const healthyServerOne = v4(); - const healthyServerTwo = v4(); - const deadServer = v4(); - - mockRedisClient.zRangeWithScores = vi - .fn() - .mockResolvedValueOnce([ - { value: healthyServerOne, score: 500 }, - { value: healthyServerTwo, score: 500 }, - { value: deadServer, score: 500 }, - ]) - .mockResolvedValueOnce([]); - - serverBlacklist.set(deadServer, Date.now()); - - await redistributeLoad(); - - expect(mockRedisClient.publish).not.toHaveBeenCalled(); - }); - }); - describe("healthChecks", () => { it("adds timed-out servers to server blacklist", async () => { expect(serverBlacklist.size).toBe(0); @@ -148,96 +69,4 @@ describe("intervals", () => { expect(mockedRemoveServerFromRedis).toHaveBeenCalledOnce(); }); }); - - describe("cleanupDeadServers", () => { - it("removes servers that has a ratio key but no timeout key", async () => { - const uuid = v4(); - mockRedisClient.zRangeByScore = vi - .fn() - .mockResolvedValueOnce([uuid]) - .mockResolvedValueOnce(["timeout-server-1"]); - childServerMap.set(uuid, { - process: { - kill: vi.fn(), - killed: false, - } as unknown as ChildProcessWithoutNullStreams, - server: { - id: uuid, - url: "ws://localhost:3001", - }, - state: defaultServerState(), - }); - - await cleanupDeadServers(); - - expect(mockRedisClient.zRangeByScore).toHaveBeenCalledTimes(2); - expect(mockRedisClient.zRangeByScore).toHaveBeenCalledWith( - ...[serversSocketWritesPerSecondKey, "-inf", "+inf"], - ); - expect(mockRedisClient.zRangeByScore).toHaveBeenCalledWith( - serversHeartbeatKey, - "-inf", - "+inf", - ); - expect(mockedRemoveServerFromRedis).toHaveBeenCalledExactlyOnceWith(uuid); - expect(childServerMap.get(uuid)).toBeUndefined(); - }); - }); - - describe("shouldSpawnNewServer", () => { - const childServerFactory = (overrides: Partial) => { - const id = v4(); - return { - server: { - id, - url: "ws://snickers.com:8080,", - }, - process: {} as ChildProcessWithoutNullStreams, - state: defaultServerState(), - ...overrides, - }; - }; - - it("spawns new server if cumulative socket write load is above max capacity", () => { - const a = childServerFactory({ - state: { - ...defaultServerState(), - socketWrites: [100_000, 100_000, 100_000, 100_000, 100_000], - }, - }); - childServerMap.set(a.server.id, a); - const b = childServerFactory({ - state: { - ...defaultServerState(), - socketWrites: [100_000, 100_000, 100_000, 100_000, 100_000], - }, - }); - childServerMap.set(b.server.id, b); - const c = childServerFactory({ - state: { - ...defaultServerState(), - socketWrites: [100_000, 100_000, 100_000, 100_000, 100_000], - }, - }); - childServerMap.set(c.server.id, c); - - const outcome = shouldSpawnNewServer(); - - expect(outcome).toBeTruthy(); - }); - - it("spawns new server if one server has average socket load above critical mass", () => { - const a = childServerFactory({ - state: { - ...defaultServerState(), - socketWrites: [100_000, 80_000, 110_000, 50_000, 60_000], - }, - }); - childServerMap.set(a.server.id, a); - - const outcome = shouldSpawnNewServer(); - - expect(outcome).toBeTruthy(); - }); - }); }); diff --git a/package-lock.json b/package-lock.json index 3e013a7..74e69dd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -404,7 +404,6 @@ "resolved": "https://registry.npmjs.org/@types/node/-/node-25.5.0.tgz", "integrity": "sha512-jp2P3tQMSxWugkCUKLRPVUpGaL5MVFwF8RDuSRztfwgN1wmqJeMSbKlnEtQqU8UrhTmzEmZdu2I6v2dpp7XIxw==", "license": "MIT", - "peer": true, "dependencies": { "undici-types": "~7.18.0" } @@ -1314,7 +1313,6 @@ "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.3.tgz", "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -1589,7 +1587,6 @@ "resolved": "https://registry.npmjs.org/vite/-/vite-8.0.0.tgz", "integrity": "sha512-fPGaRNj9Zytaf8LEiBhY7Z6ijnFKdzU/+mL8EFBaKr7Vw1/FWcTBAMW0wLPJAGMPX38ZPVCVgLceWiEqeoqL2Q==", "license": "MIT", - "peer": true, "dependencies": { "@oxc-project/runtime": "0.115.0", "lightningcss": "^1.32.0", From 5c35a57261434e1f27440609f82ca64318b88f2a Mon Sep 17 00:00:00 2001 From: ryan Date: Sat, 21 Mar 2026 11:04:16 -0400 Subject: [PATCH 02/32] oh my god it was this the whole time --- chat-server/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chat-server/index.ts b/chat-server/index.ts index 3110096..84c92d3 100644 --- a/chat-server/index.ts +++ b/chat-server/index.ts @@ -14,7 +14,7 @@ import { serversEventLoopTimeoutKey, serversHeartbeatKey, serversSocketWritesPerSecondKey, - WebSocketMessage, + type WebSocketMessage, } from "@chat/shared"; import { ClientSocket, From e8804d0fea5f16636bd94038225ff9092f79edff Mon Sep 17 00:00:00 2001 From: ryan Date: Sat, 21 Mar 2026 11:04:27 -0400 Subject: [PATCH 03/32] try and surface errors --- load-balancer/src/utils.ts | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/load-balancer/src/utils.ts b/load-balancer/src/utils.ts index d7e5be3..d24a5b1 100644 --- a/load-balancer/src/utils.ts +++ b/load-balancer/src/utils.ts @@ -73,6 +73,10 @@ export const spawnServer = async () => { if (output) { childServerMap.set(output.server.id, output); } + + if (!output) { + debugLog("error; unable to spawn server"); + } }; export interface ChildProcess { @@ -96,11 +100,22 @@ export const websocketServerFactory = async ( }, ); - const args = [path.resolve("../chat-server/dist/chat-server/index.js")]; + const args = [ + "--experimental-transform-types", + path.resolve("../chat-server/dist/chat-server/index.js"), + ]; args.push(`--id=${id}`); const child = spawn(process.execPath, args); + child.on("error", (e) => debugLog(`child process error: ${e.message}`)); + child.on("exit", (code, signal) => + debugLog(`child exited with code=${code} signal=${signal}`), + ); + child.stderr.on("data", (chunk) => + debugLog(`child stderr: ${chunk.toString().trim()}`), + ); + const timeoutMs = 5_000; const now = Date.now(); const poll = async () => { From 93d2c379864677c98d80ee2452ac8627bcf7fda7 Mon Sep 17 00:00:00 2001 From: ryan Date: Sat, 21 Mar 2026 11:35:30 -0400 Subject: [PATCH 04/32] remove offloading logic --- chat-server/index.ts | 52 ------------------------------------ chat-server/src/state.ts | 3 --- chat-server/src/utils.ts | 6 +---- load-balancer/terminal-ui.ts | 3 ++- package-lock.json | 16 +++++++++++ package.json | 4 ++- 6 files changed, 22 insertions(+), 62 deletions(-) diff --git a/chat-server/index.ts b/chat-server/index.ts index 84c92d3..6c71f98 100644 --- a/chat-server/index.ts +++ b/chat-server/index.ts @@ -18,13 +18,9 @@ import { } from "@chat/shared"; import { ClientSocket, - EVENTLOOP_TIMEOUT_THRESHOLD_MS, flushRoom, - redistributeBy, redistributeListener, - REQUEST_HELP_EVERY_MS, Room, - setRedistributeBy, } from "./src/utils.js"; import { chatRoomCount, @@ -33,8 +29,6 @@ import { decrementClientCount, incrementChatCount, incrementClientCount, - notTakingNewConnections, - setNotTakingNewConnections, } from "./src/state.js"; const redisClient = createClient(); @@ -80,14 +74,7 @@ void addSelfToRedis(); const rooms: Map = new Map(); -setNotTakingNewConnections(false); wss.on("connection", async (socket) => { - if (notTakingNewConnections) { - socket.close(1); - - return; - } - const client = socket as ClientSocket; client.id = v4(); @@ -173,47 +160,8 @@ const updateMetrics = () => { }); }; -const offload = () => { - if (socketWritesThisSecond > 75_000) { - const multi = Math.min(socketWritesThisSecond / 150_000 / 2, 0.5); - void redisClient.publish( - "message", - JSON.stringify({ message: `nuking ${clientCount * multi}`, serverId }), - ); - setRedistributeBy(redistributeBy + clientCount * multi); - } -}; - -let lastRequestedHelp = 0; const lastFiveTimeoutValues = new Array(5).fill(0); -const updateTimeoutNumbers = async (timeout: number) => { - const shouldPanic = () => { - return ( - lastFiveTimeoutValues.reduce((a, b) => a + b) / - lastFiveTimeoutValues.length > - EVENTLOOP_TIMEOUT_THRESHOLD_MS && - Date.now() - lastRequestedHelp > REQUEST_HELP_EVERY_MS - ); - }; - - lastFiveTimeoutValues.shift(); - lastFiveTimeoutValues.push(timeout); - if (shouldPanic()) { - lastRequestedHelp = Date.now(); - debugLog("event loop is blocking! timeout: " + timeout); - void redisClient.publish("panic", JSON.stringify({ serverId, timeout })); - } -}; - -setInterval(() => { - setNotTakingNewConnections(false); -}, 1_500); setInterval(updateMetrics, 1000); -setInterval(offload, 500); -setInterval(() => { - const start = performance.now(); - setImmediate(() => void updateTimeoutNumbers(performance.now() - start)); -}, 1000); const registerSocket = async ( registrationMessage: WebSocketMessage, diff --git a/chat-server/src/state.ts b/chat-server/src/state.ts index 9dfed02..e651440 100644 --- a/chat-server/src/state.ts +++ b/chat-server/src/state.ts @@ -1,9 +1,6 @@ export let chatRoomCount = 0; export let clientCount = 0; -export let notTakingNewConnections = false; export const incrementChatCount = () => chatRoomCount++; export const decrementChatCount = () => chatRoomCount--; export const incrementClientCount = () => clientCount++; export const decrementClientCount = () => clientCount--; -export const setNotTakingNewConnections = (b: boolean) => - (notTakingNewConnections = b); diff --git a/chat-server/src/utils.ts b/chat-server/src/utils.ts index 77b57ce..eec814d 100644 --- a/chat-server/src/utils.ts +++ b/chat-server/src/utils.ts @@ -49,11 +49,7 @@ export interface Room { } export const redistributeListener = (message: string) => { - // redistributeBy = Math.floor( - // Math.max(Number(message) * REDISTRIBUTE_BY_FACTOR, 1), - // ); - // setNotTakingNewConnections(true); - debugLog(`over by ${message}; nuking ${redistributeBy} clients`); + redistributeBy = Math.floor(Math.max(Number(message) * 0.15, 1)); }; export const flushRoom = (room: Room, callback: () => void = () => {}) => { diff --git a/load-balancer/terminal-ui.ts b/load-balancer/terminal-ui.ts index 2ce041f..54ead7b 100644 --- a/load-balancer/terminal-ui.ts +++ b/load-balancer/terminal-ui.ts @@ -53,7 +53,8 @@ const timestamp = () => const formatNumber = (value: number) => value.toFixed(2); const formatNumberArray = (values: number[]) => values - .slice(0, MAX_RENDERED_ARRAY_ITEMS) + .slice(values.length - 10, values.length) + .reverse() .map((value) => Number(formatNumber(value))); const formatServerJson = (value: unknown) => JSON.stringify(value, null, 2) diff --git a/package-lock.json b/package-lock.json index 74e69dd..657fa48 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7,6 +7,7 @@ "name": "chat-workspace", "dependencies": { "@types/supertest": "^7.2.0", + "prettier": "^3.8.1", "supertest": "^7.2.2", "vitest": "^4.1.0" } @@ -1348,6 +1349,21 @@ "node": "^10 || ^12 || >=14" } }, + "node_modules/prettier": { + "version": "3.8.1", + "resolved": "https://registry.npmjs.org/prettier/-/prettier-3.8.1.tgz", + "integrity": "sha512-UOnG6LftzbdaHZcKoPFtOcCKztrQ57WkHDeRD9t/PTQtmT0NHSeWWepj6pS0z/N7+08BHFDQVUrfmfMRcZwbMg==", + "license": "MIT", + "bin": { + "prettier": "bin/prettier.cjs" + }, + "engines": { + "node": ">=14" + }, + "funding": { + "url": "https://github.com/prettier/prettier?sponsor=1" + } + }, "node_modules/qs": { "version": "6.15.0", "resolved": "https://registry.npmjs.org/qs/-/qs-6.15.0.tgz", diff --git a/package.json b/package.json index f8a7992..554d1e1 100644 --- a/package.json +++ b/package.json @@ -3,10 +3,12 @@ "private": true, "dependencies": { "@types/supertest": "^7.2.0", + "prettier": "^3.8.1", "supertest": "^7.2.2", "vitest": "^4.1.0" }, "scripts": { - "test": "vitest --run" + "test": "vitest --run", + "prettier": "prettier --write ." } } From 6872f4bc0c1c9975ac388da9e979ea81c349e395 Mon Sep 17 00:00:00 2001 From: ryan Date: Sat, 21 Mar 2026 11:53:08 -0400 Subject: [PATCH 05/32] fix timeout value updates --- chat-server/index.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/chat-server/index.ts b/chat-server/index.ts index 6c71f98..38317cb 100644 --- a/chat-server/index.ts +++ b/chat-server/index.ts @@ -162,7 +162,13 @@ const updateMetrics = () => { const lastFiveTimeoutValues = new Array(5).fill(0); setInterval(updateMetrics, 1000); - +setInterval(() => { + const start = performance.now(); + setImmediate(() => { + lastFiveTimeoutValues.shift(); + lastFiveTimeoutValues.push(performance.now() - start); + }); +}, 1000); const registerSocket = async ( registrationMessage: WebSocketMessage, socket: ClientSocket, From 0757b8834b4ff9f24966202b574168b1b41820f3 Mon Sep 17 00:00:00 2001 From: ryan Date: Sat, 21 Mar 2026 12:09:41 -0400 Subject: [PATCH 06/32] numeric list --- packages/shared/src/index.ts | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index 9a67dad..f5b8110 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -5,17 +5,31 @@ export interface Server { url: string; } +class NumericList extends Array { + max() { + return Math.max(...this); + } + + min() { + return Math.min(...this); + } + + average() { + return this.reduce((a, b) => a + b) / this.length; + } +} + export interface ServerState { - clients: Array; - chatRooms: Array; - socketWrites: Array; - timeouts: Array; + clients: NumericList; + chatRooms: NumericList; + socketWrites: NumericList; + timeouts: NumericList; } -export const defaultServerState = () => ({ - clients: Array.from({ length: 100 }).map(() => 0), - chatRooms: Array.from({ length: 100 }).map(() => 0), - socketWrites: Array.from({ length: 100 }).map(() => 0), - timeouts: Array.from({ length: 100 }).map(() => 0), +export const defaultServerState = (): ServerState => ({ + clients: new NumericList(...Array.from({ length: 100 }).map(() => 0)), + chatRooms: new NumericList(...Array.from({ length: 100 }).map(() => 0)), + socketWrites: new NumericList(...Array.from({ length: 100 }).map(() => 0)), + timeouts: new NumericList(...Array.from({ length: 100 }).map(() => 0)), }); export const redistributeChannel = "wss-redistribute"; @@ -110,7 +124,7 @@ export const getLowestLoadServers = async ( const ids = await redisClient.zRange( serversSocketWritesPerSecondKey, 0, - (count ?? 10) - 1, + (count ?? 100) - 1, ); const results: Array<{ id: string; url: string }> = []; From bbf665a0653c87a3c9849317e05a962100aaa4f1 Mon Sep 17 00:00:00 2001 From: ryan Date: Sat, 21 Mar 2026 12:51:20 -0400 Subject: [PATCH 07/32] track pps --- load-balancer/src/intervals.ts | 6 ++++++ packages/shared/src/index.ts | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/load-balancer/src/intervals.ts b/load-balancer/src/intervals.ts index 2eebc7a..4114b2e 100644 --- a/load-balancer/src/intervals.ts +++ b/load-balancer/src/intervals.ts @@ -6,6 +6,7 @@ import { serverBlacklist, } from "./utils.ts"; import { + NumericList, removeServerFromRedis, serversChatRoomsCountKey, serversClientCountKey, @@ -118,12 +119,17 @@ export const healthChecks = async () => { updatePps(); }; +export const ppsHistory: NumericList = new NumericList( + ...Array.from({ length: 100 }).map(() => 0), +); export let pps = 0; export let provisionsThisSecond = 0; export const incrProvisionsThisSecond = () => provisionsThisSecond++; const updatePps = () => { pps = provisionsThisSecond; runtimeState.pps = pps; + ppsHistory.shift(); + ppsHistory.push(pps); provisionsThisSecond = 0; }; diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index f5b8110..ab8bb25 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -5,7 +5,7 @@ export interface Server { url: string; } -class NumericList extends Array { +export class NumericList extends Array { max() { return Math.max(...this); } From 8d57881498d308a7fe8a07c7667caf12c5140eb4 Mon Sep 17 00:00:00 2001 From: ryan Date: Sat, 21 Mar 2026 12:51:39 -0400 Subject: [PATCH 08/32] forEach --> for of --- load-balancer/src/utils.ts | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/load-balancer/src/utils.ts b/load-balancer/src/utils.ts index d24a5b1..e158f83 100644 --- a/load-balancer/src/utils.ts +++ b/load-balancer/src/utils.ts @@ -48,10 +48,10 @@ export const shutdown = async () => { terminalUi.destroy(); await redisClient.quit(); await subscriptionClient.quit(); - childServerMap.forEach((server: ChildProcess, id: string) => { - void removeServerFromRedis(id); - server.process.kill(1); - }); + for (let [id, childServerMapElement] of childServerMap) { + await removeServerFromRedis(id); + childServerMapElement.process.kill(1); + } } catch { redisClient.destroy(); subscriptionClient.destroy(); @@ -72,10 +72,14 @@ export const spawnServer = async () => { if (output) { childServerMap.set(output.server.id, output); + + return output.server; } if (!output) { debugLog("error; unable to spawn server"); + + return undefined; } }; From 83d052e35712c0fd933815ede111655bbab8cbeb Mon Sep 17 00:00:00 2001 From: ryan Date: Sat, 21 Mar 2026 12:51:51 -0400 Subject: [PATCH 09/32] basic load balancing logic --- .../src/controllers/servers.provision.ts | 41 +++++++++++++++++-- load-balancer/src/state.ts | 3 ++ 2 files changed, 41 insertions(+), 3 deletions(-) create mode 100644 load-balancer/src/state.ts diff --git a/load-balancer/src/controllers/servers.provision.ts b/load-balancer/src/controllers/servers.provision.ts index 92ed5e1..abec4ba 100644 --- a/load-balancer/src/controllers/servers.provision.ts +++ b/load-balancer/src/controllers/servers.provision.ts @@ -1,14 +1,49 @@ -import { runtimeState } from "../utils.ts"; +import { + type ChildProcess, + childServerMap, + runtimeState, + spawnServer, +} from "../utils.ts"; import express from "express"; import { incrProvisionsThisSecond } from "../intervals.ts"; -import { getLowestLoadServer } from "@chat/shared"; +import { type Server } from "@chat/shared"; +import { lastSpawnedServer, setLastSpawnedServer } from "../state.ts"; + +const MAX_SOCKET_SPIKE_LOAD = 75_000; +const serverAtMaxCapacity = (server: ChildProcess): boolean => { + return server.state.socketWrites.max() > MAX_SOCKET_SPIKE_LOAD; +}; +const shouldSpawnServer = () => Date.now() - lastSpawnedServer > 10_000; +export const getBestCandidateServer = async (): Promise => { + const servers = [...childServerMap.values()]; + + let candidate: ChildProcess | undefined = undefined; + servers.forEach((server) => { + if (serverAtMaxCapacity(server)) { + return; + } + + if (true /* some other conditions... */) { + // early return + } + + candidate = server; + }); + + if (!candidate && shouldSpawnServer()) { + setLastSpawnedServer(Date.now()); + return await spawnServer(); + } + + return (candidate as ChildProcess | undefined)?.server; +}; export const provisionServer = async ( req: express.Request, res: express.Response, ) => { incrProvisionsThisSecond(); - let server = await getLowestLoadServer(); + let server = await getBestCandidateServer(); if (!server) { res.sendStatus(404); diff --git a/load-balancer/src/state.ts b/load-balancer/src/state.ts new file mode 100644 index 0000000..e3ffea4 --- /dev/null +++ b/load-balancer/src/state.ts @@ -0,0 +1,3 @@ +export let lastSpawnedServer = Date.now(); +export const setLastSpawnedServer = (time: number) => + (lastSpawnedServer = time); From 0d295f2d10bd48d47d9ef9176ad05243fba3098d Mon Sep 17 00:00:00 2001 From: ryan Date: Sun, 22 Mar 2026 10:14:27 -0400 Subject: [PATCH 10/32] delete redundant file --- chat-client/monitor.ts | 349 ---------------------------------- chat-client/package-lock.json | 12 -- 2 files changed, 361 deletions(-) delete mode 100644 chat-client/monitor.ts diff --git a/chat-client/monitor.ts b/chat-client/monitor.ts deleted file mode 100644 index a6b001c..0000000 --- a/chat-client/monitor.ts +++ /dev/null @@ -1,349 +0,0 @@ -import { createClient } from "redis"; -import blessed from "blessed"; -import { format } from "node:util"; - -// Redis keys (mirrors @chat/shared constants) -const serversClientCountKey = "servers:clients"; -const serversChatRoomsCountKey = "servers:chats"; -const serversHeartbeatKey = "servers:heartbeat"; -const serversSocketWritesPerSecondKey = "servers:mps"; -const serversEventLoopTimeoutKey = "servers:event-loop"; -const redisServerKeyFactory = (id: string) => `server:${id}`; - -const POLL_INTERVAL_MS = 1000; -const MAX_SERVER_LINES = 20; -const UUID_DISPLAY_LEN = 8; - -const trim = (id: string) => id.slice(0, UUID_DISPLAY_LEN); -const timestamp = () => - new Date().toLocaleTimeString("en-US", { hour12: false }); -const fmt = (n: number, decimals = 2) => n.toFixed(decimals); -const fmtAge = (ms: number) => { - if (ms < 1000) return `${ms}ms`; - if (ms < 60_000) return `${(ms / 1000).toFixed(1)}s`; - return `${Math.floor(ms / 60_000)}m${Math.floor((ms % 60_000) / 1000)}s`; -}; -const fmtDuration = (startedAt: number) => { - const s = Math.max(0, Math.floor((Date.now() - startedAt) / 1000)); - const h = Math.floor(s / 3600); - const m = Math.floor((s % 3600) / 60); - const sec = s % 60; - return [h, m, sec].map((v) => v.toString().padStart(2, "0")).join(":"); -}; - -interface ServerMetrics { - id: string; - url: string; - clients: number; - chatRooms: number; - mps: number; - eventLoopTimeout: number; - heartbeatAgeMs: number; -} - -interface Snapshot { - servers: ServerMetrics[]; - totalClients: number; - totalChatRooms: number; - totalMps: number; - pollCount: number; - lastPolledAt: string; - redisStatus: "connected" | "connecting" | "error"; -} - -async function pollRedis( - client: ReturnType, -): Promise { - const now = Date.now(); - - const [clients, chatRooms, heartbeats, mps, eventLoop] = await Promise.all([ - client.zRangeWithScores(serversClientCountKey, 0, -1), - client.zRangeWithScores(serversChatRoomsCountKey, 0, -1), - client.zRangeWithScores(serversHeartbeatKey, 0, -1), - client.zRangeWithScores(serversSocketWritesPerSecondKey, 0, -1), - client.zRangeWithScores(serversEventLoopTimeoutKey, 0, -1), - ]); - - const serverIds = new Set([ - ...clients.map((e) => e.value), - ...heartbeats.map((e) => e.value), - ...mps.map((e) => e.value), - ]); - - const servers: ServerMetrics[] = []; - for (const id of serverIds) { - const url = - (await client.hGet(redisServerKeyFactory(id), "url")) ?? "unknown"; - - servers.push({ - id, - url, - clients: clients.find((e) => e.value === id)?.score ?? 0, - chatRooms: chatRooms.find((e) => e.value === id)?.score ?? 0, - mps: mps.find((e) => e.value === id)?.score ?? 0, - eventLoopTimeout: eventLoop.find((e) => e.value === id)?.score ?? 0, - heartbeatAgeMs: - now - (heartbeats.find((e) => e.value === id)?.score ?? 0), - }); - } - - return servers; -} - -function heartbeatColor(ageMs: number): string { - if (ageMs < 2000) return "{green-fg}"; - if (ageMs < 5000) return "{yellow-fg}"; - return "{red-fg}"; -} - -function formatServerBlock(s: ServerMetrics): string { - const hbColor = heartbeatColor(s.heartbeatAgeMs); - const hbReset = "{/}"; - return [ - ` id ${trim(s.id)}`, - ` url ${s.url}`, - ` clients ${s.clients}`, - ` chat rooms ${s.chatRooms}`, - ` mps ${fmt(s.mps)}`, - ` event loop ${fmt(s.eventLoopTimeout)}ms`, - ` heartbeat ${hbColor}${fmtAge(s.heartbeatAgeMs)} ago${hbReset}`, - ].join("\n"); -} - -class RedisMonitorUi { - private readonly startedAt = Date.now(); - private readonly originalConsole = { - error: console.error.bind(console), - log: console.log.bind(console), - warn: console.warn.bind(console), - }; - - private screen: blessed.Widgets.Screen; - private headerBox: blessed.Widgets.BoxElement; - private metricsBox: blessed.Widgets.BoxElement; - private serversBox: blessed.Widgets.BoxElement; - private logBox: blessed.Widgets.Log; - - private snapshot: Snapshot = { - servers: [], - totalClients: 0, - totalChatRooms: 0, - totalMps: 0, - pollCount: 0, - lastPolledAt: "-", - redisStatus: "connecting", - }; - - constructor() { - this.screen = blessed.screen({ - smartCSR: true, - title: "redis-monitor", - dockBorders: true, - fullUnicode: false, - }); - - this.headerBox = blessed.box({ - parent: this.screen, - top: 0, - left: 0, - width: "100%", - height: 4, - tags: true, - border: "line", - style: { border: { fg: "cyan" } }, - }); - - this.metricsBox = blessed.box({ - parent: this.screen, - top: 4, - left: 0, - width: "50%", - height: 11, - tags: true, - border: "line", - label: " totals ", - style: { border: { fg: "green" } }, - }); - - this.serversBox = blessed.box({ - parent: this.screen, - top: 4, - left: "50%", - width: "50%", - height: "100%-4", - tags: true, - border: "line", - label: " servers ", - scrollable: true, - alwaysScroll: true, - mouse: true, - keys: true, - vi: true, - scrollbar: { ch: " " }, - style: { border: { fg: "yellow" } }, - }); - - this.logBox = blessed.log({ - parent: this.screen, - top: 15, - left: 0, - width: "50%", - height: "100%-15", - tags: false, - border: "line", - label: " events ", - scrollable: true, - alwaysScroll: true, - mouse: true, - keys: true, - vi: true, - scrollbar: { ch: " " }, - style: { border: { fg: "magenta" } }, - }); - - this.screen.key(["C-c", "q", "escape"], () => { - process.kill(process.pid, "SIGINT"); - }); - this.screen.key(["pageup"], () => this.logBox.scroll(-5)); - this.screen.key(["pagedown"], () => this.logBox.scroll(5)); - this.screen.key(["S-pageup"], () => this.serversBox.scroll(-5)); - this.screen.key(["S-pagedown"], () => this.serversBox.scroll(5)); - - this.patchConsole(); - this.render(); - } - - setSnapshot(snapshot: Snapshot) { - this.snapshot = snapshot; - this.render(); - } - - destroy() { - this.restoreConsole(); - try { - this.screen.destroy(); - } catch { - // ignore - } - } - - private patchConsole() { - console.log = (...args: unknown[]) => this.writeLog("log", args); - console.warn = (...args: unknown[]) => this.writeLog("warn", args); - console.error = (...args: unknown[]) => this.writeLog("error", args); - } - - private restoreConsole() { - console.log = this.originalConsole.log; - console.warn = this.originalConsole.warn; - console.error = this.originalConsole.error; - } - - private writeLog(level: "log" | "warn" | "error", args: unknown[]) { - const line = `[${timestamp()}] ${level.toUpperCase()} ${format(...args)}`; - this.logBox.log(line); - this.render(); - } - - private render() { - const { snapshot } = this; - const statusColor = - snapshot.redisStatus === "connected" - ? "{green-fg}" - : snapshot.redisStatus === "connecting" - ? "{yellow-fg}" - : "{red-fg}"; - - this.headerBox.setContent( - [ - `{bold}redis-monitor{/bold} ${statusColor}${snapshot.redisStatus}{/}`, - `uptime ${fmtDuration(this.startedAt)} polls ${snapshot.pollCount} last ${snapshot.lastPolledAt}`, - `watching: ${[serversClientCountKey, serversChatRoomsCountKey, serversHeartbeatKey, serversSocketWritesPerSecondKey, serversEventLoopTimeoutKey].join(" ")}`, - ].join("\n"), - ); - - this.metricsBox.setContent( - [ - `total servers ${snapshot.servers.length}`, - `total clients ${snapshot.totalClients}`, - `total chat rooms ${snapshot.totalChatRooms}`, - `total mps ${fmt(snapshot.totalMps)}`, - ``, - `healthy ${snapshot.servers.filter((s) => s.heartbeatAgeMs < 2000).length}`, - `degraded ${snapshot.servers.filter((s) => s.heartbeatAgeMs >= 2000 && s.heartbeatAgeMs < 5000).length}`, - `dead ${snapshot.servers.filter((s) => s.heartbeatAgeMs >= 5000).length}`, - ].join("\n"), - ); - - const serverBlocks = snapshot.servers - .sort((a, b) => a.id.localeCompare(b.id)) - .slice(0, MAX_SERVER_LINES) - .map(formatServerBlock); - - this.serversBox.setContent( - serverBlocks.length > 0 - ? serverBlocks.join("\n{gray-fg}──────────────────────────{/}\n") - : "{gray-fg}no servers in redis{/}", - ); - - this.screen.render(); - } -} - -async function main() { - const client = createClient(); - const ui = new RedisMonitorUi(); - - let pollCount = 0; - - const poll = async () => { - try { - const servers = await pollRedis(client); - pollCount++; - ui.setSnapshot({ - servers, - totalClients: servers.reduce((s, sv) => s + sv.clients, 0), - totalChatRooms: servers.reduce((s, sv) => s + sv.chatRooms, 0), - totalMps: servers.reduce((s, sv) => s + sv.mps, 0), - pollCount, - lastPolledAt: timestamp(), - redisStatus: "connected", - }); - } catch (err) { - console.error("poll failed:", err); - ui.setSnapshot({ - servers: [], - totalClients: 0, - totalChatRooms: 0, - totalMps: 0, - pollCount, - lastPolledAt: timestamp(), - redisStatus: "error", - }); - } - }; - - const shutdown = async () => { - ui.destroy(); - await client.quit(); - process.exit(0); - }; - - process.on("SIGINT", shutdown); - process.on("SIGTERM", shutdown); - - try { - await client.connect(); - console.log("redis connected"); - } catch (err) { - console.error("redis connect failed:", err); - } - - await poll(); - setInterval(poll, POLL_INTERVAL_MS); -} - -main().catch((err) => { - console.error(err); - process.exit(1); -}); diff --git a/chat-client/package-lock.json b/chat-client/package-lock.json index a88ed4c..c8c66e2 100644 --- a/chat-client/package-lock.json +++ b/chat-client/package-lock.json @@ -61,7 +61,6 @@ "integrity": "sha512-CGOfOJqWjg2qW/Mb6zNsDm+u5vFQ8DxXfbM09z69p5Z6+mE1ikP2jUXw+j42Pf1XTYED2Rni5f95npYeuwMDQA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@babel/code-frame": "^7.29.0", "@babel/generator": "^7.29.0", @@ -618,7 +617,6 @@ "resolved": "https://registry.npmjs.org/@redis/client/-/client-5.11.0.tgz", "integrity": "sha512-GHoprlNQD51Xq2Ztd94HHV94MdFZQ3CVrpA04Fz8MVoHM0B7SlbmPEVIjwTbcv58z8QyjnrOuikS0rWF03k5dQ==", "license": "MIT", - "peer": true, "dependencies": { "cluster-key-slot": "1.1.2" }, @@ -981,7 +979,6 @@ "integrity": "sha512-ilcTH/UniCkMdtexkoCN0bI7pMcJDvmQFPvuPvmEaYA/NSfFTAgdUSLAoVjaRJm7+6PvcM+q1zYOwS4wTYMF9w==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "csstype": "^3.2.2" } @@ -1041,7 +1038,6 @@ "integrity": "sha512-XZzOmihLIr8AD1b9hL9ccNMzEMWt/dE2u7NyTY9jJG6YNiNthaD5XtUHVF2uCXZ15ng+z2hT3MVuxnUYhq6k1g==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@typescript-eslint/scope-manager": "8.57.0", "@typescript-eslint/types": "8.57.0", @@ -1324,7 +1320,6 @@ "integrity": "sha512-UVJyE9MttOsBQIDKw1skb9nAwQuR5wuGD3+82K6JgJlm/Y+KI92oNsMNGZCYdDsVtRHSak0pcV5Dno5+4jh9sw==", "dev": true, "license": "MIT", - "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -1445,7 +1440,6 @@ } ], "license": "MIT", - "peer": true, "dependencies": { "baseline-browser-mapping": "^2.9.0", "caniuse-lite": "^1.0.30001759", @@ -1644,7 +1638,6 @@ "integrity": "sha512-XoMjdBOwe/esVgEvLmNsD3IRHkm7fbKIUGvrleloJXUZgDHig2IPWNniv+GwjyJXzuNqVjlr5+4yVUZjycJwfQ==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@eslint-community/eslint-utils": "^4.8.0", "@eslint-community/regexpp": "^4.12.1", @@ -2592,7 +2585,6 @@ "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "dev": true, "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -2669,7 +2661,6 @@ "resolved": "https://registry.npmjs.org/react/-/react-19.2.4.tgz", "integrity": "sha512-9nfp2hYpCwOjAN+8TZFGhtWEwgvWHXqESH8qT89AT/lWklpLON22Lc8pEtnpsZz7VmawabSU0gCjnj8aC0euHQ==", "license": "MIT", - "peer": true, "engines": { "node": ">=0.10.0" } @@ -2885,7 +2876,6 @@ "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", "dev": true, "license": "Apache-2.0", - "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -2971,7 +2961,6 @@ "integrity": "sha512-fPGaRNj9Zytaf8LEiBhY7Z6ijnFKdzU/+mL8EFBaKr7Vw1/FWcTBAMW0wLPJAGMPX38ZPVCVgLceWiEqeoqL2Q==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@oxc-project/runtime": "0.115.0", "lightningcss": "^1.32.0", @@ -3097,7 +3086,6 @@ "integrity": "sha512-rftlrkhHZOcjDwkGlnUtZZkvaPHCsDATp4pGpuOOMDaTdDDXF91wuVDJoWoPsKX/3YPQ5fHuF3STjcYyKr+Qhg==", "dev": true, "license": "MIT", - "peer": true, "funding": { "url": "https://github.com/sponsors/colinhacks" } From 21e086ac0d03dbe6c1f6c12055ed0bda09ac549f Mon Sep 17 00:00:00 2001 From: ryan Date: Sun, 22 Mar 2026 10:15:07 -0400 Subject: [PATCH 11/32] record total chat messages --- chat-server/index.ts | 2 ++ packages/shared/src/index.ts | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/chat-server/index.ts b/chat-server/index.ts index 38317cb..443d0f5 100644 --- a/chat-server/index.ts +++ b/chat-server/index.ts @@ -4,6 +4,7 @@ import { WebSocketServer } from "ws"; import { addServerToRedis, ChatPayload, + chatRoomTotalMessagesKey, debugLog, redisRedistributeChannelFactory, redisServerKeyFactory, @@ -214,6 +215,7 @@ const publishChat = async ( return; } + void redisClient.zIncrBy(chatRoomTotalMessagesKey, 1, socket.chatId); await redisClient.publish(socket.chatId, JSON.stringify(message.payload)); }; diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index ab8bb25..977d90d 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -36,8 +36,9 @@ export const redistributeChannel = "wss-redistribute"; export const serversClientCountKey = "servers:clients"; export const serversChatRoomsCountKey = "servers:chats"; export const serversHeartbeatKey = "servers:heartbeat"; -export const serversSocketWritesPerSecondKey = "servers:mps"; +export const serversSocketWritesPerSecondKey = "servers:swps"; export const serversEventLoopTimeoutKey = "servers:event-loop"; +export const chatRoomTotalMessagesKey = "chat-rooms:messages"; export const redisRedistributeChannelFactory = (serverId: string) => `${serverId}-${redistributeChannel}`; export const redisServerKeyFactory = (serverId: string) => `server:${serverId}`; From 370978d2579605f51d57059c7be7ddd515aa5e93 Mon Sep 17 00:00:00 2001 From: ryan Date: Sun, 22 Mar 2026 10:24:21 -0400 Subject: [PATCH 12/32] report chat message totals --- chat-client/src/App.css | 61 +++++++++- chat-client/src/Monitor.tsx | 94 ++++++++++------ chat-client/src/index.css | 111 ------------------- load-balancer/src/controllers/redis.stats.ts | 10 ++ 4 files changed, 128 insertions(+), 148 deletions(-) diff --git a/chat-client/src/App.css b/chat-client/src/App.css index bf31290..cd3754c 100644 --- a/chat-client/src/App.css +++ b/chat-client/src/App.css @@ -1,12 +1,14 @@ .app { display: grid; gap: 0.35rem; - padding: 1rem; + padding: 1rem 1.25rem; align-content: start; justify-items: start; text-align: left; font-family: "SFMono-Regular", "Consolas", "Liberation Mono", monospace; min-height: 100vh; + width: 100%; + box-sizing: border-box; background: #111; color: #e8e8e8; } @@ -109,6 +111,63 @@ width: 100%; } +.monitor-body { + display: grid; + grid-template-columns: 1fr 220px; + gap: 0.75rem; + align-items: start; +} + +.monitor-main { + display: grid; + gap: 0.75rem; +} + +.monitor-sidebar { + display: grid; + gap: 0; + background: #181818; + border: 1px solid #2a2a2a; + align-self: start; +} + +.sidebar-title { + font-size: 0.7rem; + color: #555; + letter-spacing: 0.05em; + padding: 0.4rem 0.6rem; + border-bottom: 1px solid #2a2a2a; +} + +.room-row { + display: flex; + justify-content: space-between; + align-items: baseline; + padding: 0.3rem 0.6rem; + font-size: 0.8rem; + border-bottom: 1px solid #1e1e1e; +} + +.room-row:last-child { + border-bottom: none; +} + +.room-name { + color: #aaa; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; + flex: 1; + min-width: 0; +} + +.room-msgs { + color: #3fb950; + font-size: 0.75rem; + margin-left: 0.5rem; + flex-shrink: 0; +} + .monitor-header { display: flex; align-items: center; diff --git a/chat-client/src/Monitor.tsx b/chat-client/src/Monitor.tsx index 2bd903b..f1a15fe 100644 --- a/chat-client/src/Monitor.tsx +++ b/chat-client/src/Monitor.tsx @@ -22,6 +22,7 @@ interface RedisStats { ts: number; servers: ServerMetrics[]; totals: { clients: number; chatRooms: number; mps: number }; + chatRooms: { messageCounts: { value: string; score: number }[] }; } const POLL_MS = 1000; @@ -127,7 +128,7 @@ function ServerCard({ s }: { s: ServerMetrics }) { color="#7d9fc5" />
{label} + + avg: {(data.reduce((a, b) => a + b) / data.length).toFixed(3)} + {fmtVal(current)} @@ -221,6 +225,10 @@ export function Monitor() { }; }, []); + const sortedRooms = stats + ? [...stats.chatRooms.messageCounts].sort((a, b) => b.score - a.score) + : []; + return (
@@ -234,46 +242,60 @@ export function Monitor() {
{stats && ( - <> -
- - - - - s.heartbeatAgeMs < 2000).length - } - accent="green" - /> - s.heartbeatAgeMs >= 2000 && s.heartbeatAgeMs < 5000, - ).length - } - accent="yellow" - /> - s.heartbeatAgeMs >= 5000).length - } - accent="red" - /> +
+
+
+ + + + s.heartbeatAgeMs < 2000).length + } + accent="green" + /> + s.heartbeatAgeMs >= 2000 && s.heartbeatAgeMs < 5000, + ).length + } + accent="yellow" + /> + s.heartbeatAgeMs >= 5000).length + } + accent="red" + /> +
+ +
+ {stats.servers.length === 0 && ( +

no servers in redis

+ )} + {stats.servers.map((s) => ( + + ))} +
-
- {stats.servers.length === 0 && ( -

no servers in redis

+
+
chat rooms
+ {sortedRooms.length === 0 && ( +

no rooms

)} - {stats.servers.map((s) => ( - + {sortedRooms.map((room) => ( +
+ {room.value} + {room.score} +
))}
- +
)} {status === "error" && !stats && ( diff --git a/chat-client/src/index.css b/chat-client/src/index.css index aad5cb1..e69de29 100644 --- a/chat-client/src/index.css +++ b/chat-client/src/index.css @@ -1,111 +0,0 @@ -:root { - --text: #6b6375; - --text-h: #08060d; - --bg: #fff; - --border: #e5e4e7; - --code-bg: #f4f3ec; - --accent: #aa3bff; - --accent-bg: rgba(170, 59, 255, 0.1); - --accent-border: rgba(170, 59, 255, 0.5); - --social-bg: rgba(244, 243, 236, 0.5); - --shadow: - rgba(0, 0, 0, 0.1) 0 10px 15px -3px, rgba(0, 0, 0, 0.05) 0 4px 6px -2px; - - --sans: system-ui, "Segoe UI", Roboto, sans-serif; - --heading: system-ui, "Segoe UI", Roboto, sans-serif; - --mono: ui-monospace, Consolas, monospace; - - font: 18px/145% var(--sans); - letter-spacing: 0.18px; - color-scheme: light dark; - color: var(--text); - background: var(--bg); - font-synthesis: none; - text-rendering: optimizeLegibility; - -webkit-font-smoothing: antialiased; - -moz-osx-font-smoothing: grayscale; - - @media (max-width: 1024px) { - font-size: 16px; - } -} - -@media (prefers-color-scheme: dark) { - :root { - --text: #9ca3af; - --text-h: #f3f4f6; - --bg: #16171d; - --border: #2e303a; - --code-bg: #1f2028; - --accent: #c084fc; - --accent-bg: rgba(192, 132, 252, 0.15); - --accent-border: rgba(192, 132, 252, 0.5); - --social-bg: rgba(47, 48, 58, 0.5); - --shadow: - rgba(0, 0, 0, 0.4) 0 10px 15px -3px, rgba(0, 0, 0, 0.25) 0 4px 6px -2px; - } - - #social .button-icon { - filter: invert(1) brightness(2); - } -} - -#root { - width: 1126px; - max-width: 100%; - margin: 0 auto; - text-align: center; - border-inline: 1px solid var(--border); - min-height: 100svh; - display: flex; - flex-direction: column; - box-sizing: border-box; -} - -body { - margin: 0; -} - -h1, -h2 { - font-family: var(--heading); - font-weight: 500; - color: var(--text-h); -} - -h1 { - font-size: 56px; - letter-spacing: -1.68px; - margin: 32px 0; - @media (max-width: 1024px) { - font-size: 36px; - margin: 20px 0; - } -} -h2 { - font-size: 24px; - line-height: 118%; - letter-spacing: -0.24px; - margin: 0 0 8px; - @media (max-width: 1024px) { - font-size: 20px; - } -} -p { - margin: 0; -} - -code, -.counter { - font-family: var(--mono); - display: inline-flex; - border-radius: 4px; - color: var(--text-h); -} - -code { - font-size: 15px; - line-height: 135%; - padding: 4px 8px; - background: var(--code-bg); -} diff --git a/load-balancer/src/controllers/redis.stats.ts b/load-balancer/src/controllers/redis.stats.ts index d84f843..2615056 100644 --- a/load-balancer/src/controllers/redis.stats.ts +++ b/load-balancer/src/controllers/redis.stats.ts @@ -8,6 +8,7 @@ import { serversHeartbeatKey, serversSocketWritesPerSecondKey, redisServerKeyFactory, + chatRoomTotalMessagesKey, } from "@chat/shared"; export const redisStats = async (_req: Request, res: Response) => { @@ -53,6 +54,12 @@ export const redisStats = async (_req: Request, res: Response) => { servers.sort((a, b) => a.id.localeCompare(b.id)); + const messageCounts = await redisClient.zRangeWithScores( + chatRoomTotalMessagesKey, + 0, + -1, + ); + res.json({ ts: now, servers, @@ -61,5 +68,8 @@ export const redisStats = async (_req: Request, res: Response) => { chatRooms: servers.reduce((s, sv) => s + sv.chatRooms, 0), mps: servers.reduce((s, sv) => s + sv.mps, 0), }, + chatRooms: { + messageCounts, + }, }); }; From 57d560399759fa87996411dc39f755b134b8e2f6 Mon Sep 17 00:00:00 2001 From: ryan Date: Sun, 22 Mar 2026 10:45:11 -0400 Subject: [PATCH 13/32] track chat room messages in intervals.ts, clear on load balancer shutdown --- load-balancer/src/intervals.ts | 9 +++++++++ load-balancer/src/utils.ts | 2 ++ packages/shared/src/index.ts | 6 ++++++ 3 files changed, 17 insertions(+) diff --git a/load-balancer/src/intervals.ts b/load-balancer/src/intervals.ts index 4114b2e..b3cd91b 100644 --- a/load-balancer/src/intervals.ts +++ b/load-balancer/src/intervals.ts @@ -6,6 +6,7 @@ import { serverBlacklist, } from "./utils.ts"; import { + chatRoomTotalMessagesKey, NumericList, removeServerFromRedis, serversChatRoomsCountKey, @@ -112,6 +113,14 @@ export const healthChecks = async () => { timeoutValues.forEach(({ value: id, score: timeout }) => updateServerState(id, "timeouts", timeout), ); + const chatRoomMessages = await redisClient.zRangeWithScores( + chatRoomTotalMessagesKey, + 0, + -1, + ); + chatRoomMessages.forEach(({ value: id, score: totalMessages }) => + updateServerState(id, "messages", totalMessages), + ); await detectTimedOutServers(); purgeBlacklistedServers(); diff --git a/load-balancer/src/utils.ts b/load-balancer/src/utils.ts index e158f83..a07a360 100644 --- a/load-balancer/src/utils.ts +++ b/load-balancer/src/utils.ts @@ -3,6 +3,7 @@ import { terminalUi } from "../terminal-ui.ts"; import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process"; import path from "node:path"; import { + chatRoomTotalMessagesKey, debugLog, defaultServerState, redisServerKeyFactory, @@ -52,6 +53,7 @@ export const shutdown = async () => { await removeServerFromRedis(id); childServerMapElement.process.kill(1); } + await redisClient.del(chatRoomTotalMessagesKey) } catch { redisClient.destroy(); subscriptionClient.destroy(); diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index 977d90d..946adca 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -17,6 +17,10 @@ export class NumericList extends Array { average() { return this.reduce((a, b) => a + b) / this.length; } + + deltas() { + return this.map((v, i) => v - (this[i - 1] ?? 0)); + } } export interface ServerState { @@ -24,12 +28,14 @@ export interface ServerState { chatRooms: NumericList; socketWrites: NumericList; timeouts: NumericList; + messages: NumericList; } export const defaultServerState = (): ServerState => ({ clients: new NumericList(...Array.from({ length: 100 }).map(() => 0)), chatRooms: new NumericList(...Array.from({ length: 100 }).map(() => 0)), socketWrites: new NumericList(...Array.from({ length: 100 }).map(() => 0)), timeouts: new NumericList(...Array.from({ length: 100 }).map(() => 0)), + messages: new NumericList(...Array.from({ length: 100 }).map(() => 0)), }); export const redistributeChannel = "wss-redistribute"; From cd9288e3bb9c80bc1bc34cdee11ae2f66266d4c0 Mon Sep 17 00:00:00 2001 From: ryan Date: Sun, 22 Mar 2026 10:57:16 -0400 Subject: [PATCH 14/32] fe change --- chat-client/src/Monitor.tsx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chat-client/src/Monitor.tsx b/chat-client/src/Monitor.tsx index f1a15fe..02e268d 100644 --- a/chat-client/src/Monitor.tsx +++ b/chat-client/src/Monitor.tsx @@ -165,7 +165,7 @@ function Graph({
{label} - avg: {(data.reduce((a, b) => a + b) / data.length).toFixed(3)} + avg: {(data.reduce((a, b) => a + b, 0) / data.length).toFixed(3)} {fmtVal(current)} @@ -247,7 +247,7 @@ export function Monitor() {
- + Date: Sun, 22 Mar 2026 10:57:35 -0400 Subject: [PATCH 15/32] temp change --- load-balancer/src/controllers/servers.provision.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/load-balancer/src/controllers/servers.provision.ts b/load-balancer/src/controllers/servers.provision.ts index abec4ba..50152ec 100644 --- a/load-balancer/src/controllers/servers.provision.ts +++ b/load-balancer/src/controllers/servers.provision.ts @@ -6,7 +6,7 @@ import { } from "../utils.ts"; import express from "express"; import { incrProvisionsThisSecond } from "../intervals.ts"; -import { type Server } from "@chat/shared"; +import { getLowestLoadServer, type Server } from "@chat/shared"; import { lastSpawnedServer, setLastSpawnedServer } from "../state.ts"; const MAX_SOCKET_SPIKE_LOAD = 75_000; @@ -43,7 +43,7 @@ export const provisionServer = async ( res: express.Response, ) => { incrProvisionsThisSecond(); - let server = await getBestCandidateServer(); + let server = await getLowestLoadServer(); if (!server) { res.sendStatus(404); From 9f304c9cf03f40c5a6fb6b1d8badf43b61096c4a Mon Sep 17 00:00:00 2001 From: ryan Date: Sun, 22 Mar 2026 10:57:38 -0400 Subject: [PATCH 16/32] pw . --- load-balancer/src/utils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/load-balancer/src/utils.ts b/load-balancer/src/utils.ts index a07a360..5052035 100644 --- a/load-balancer/src/utils.ts +++ b/load-balancer/src/utils.ts @@ -53,7 +53,7 @@ export const shutdown = async () => { await removeServerFromRedis(id); childServerMapElement.process.kill(1); } - await redisClient.del(chatRoomTotalMessagesKey) + await redisClient.del(chatRoomTotalMessagesKey); } catch { redisClient.destroy(); subscriptionClient.destroy(); From e106cca9ac2946e55da7beff2871f8320a07ca52 Mon Sep 17 00:00:00 2001 From: ryan Date: Sun, 22 Mar 2026 11:03:57 -0400 Subject: [PATCH 17/32] chat room total clients in redis --- chat-server/index.ts | 4 ++++ load-balancer/src/utils.ts | 2 ++ packages/shared/src/index.ts | 3 ++- 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/chat-server/index.ts b/chat-server/index.ts index 443d0f5..3523fe3 100644 --- a/chat-server/index.ts +++ b/chat-server/index.ts @@ -4,6 +4,7 @@ import { WebSocketServer } from "ws"; import { addServerToRedis, ChatPayload, + chatRoomTotalClientsKey, chatRoomTotalMessagesKey, debugLog, redisRedistributeChannelFactory, @@ -106,8 +107,10 @@ wss.on("connection", async (socket) => { client.on("close", async () => { const chatId = client.chatId; if (chatId) { + void redisClient.zIncrBy(chatRoomTotalClientsKey, -1, chatId); rooms.get(chatId)?.clients.delete(client); } + if (chatId && (rooms.get(chatId)?.clients.size ?? 0) < 1) { debugLog(`unsubscribing from ${chatId}`); rooms.delete(chatId); @@ -175,6 +178,7 @@ const registerSocket = async ( socket: ClientSocket, ) => { const chatChannel = registrationMessage.payload.chatId; + await redisClient.zIncrBy(chatRoomTotalClientsKey, 1, chatChannel); if (rooms.get(chatChannel) === undefined) { incrementChatCount(); diff --git a/load-balancer/src/utils.ts b/load-balancer/src/utils.ts index 5052035..44ff70a 100644 --- a/load-balancer/src/utils.ts +++ b/load-balancer/src/utils.ts @@ -3,6 +3,7 @@ import { terminalUi } from "../terminal-ui.ts"; import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process"; import path from "node:path"; import { + chatRoomTotalClientsKey, chatRoomTotalMessagesKey, debugLog, defaultServerState, @@ -54,6 +55,7 @@ export const shutdown = async () => { childServerMapElement.process.kill(1); } await redisClient.del(chatRoomTotalMessagesKey); + await redisClient.del(chatRoomTotalClientsKey); } catch { redisClient.destroy(); subscriptionClient.destroy(); diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index 946adca..31c8d51 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -45,6 +45,7 @@ export const serversHeartbeatKey = "servers:heartbeat"; export const serversSocketWritesPerSecondKey = "servers:swps"; export const serversEventLoopTimeoutKey = "servers:event-loop"; export const chatRoomTotalMessagesKey = "chat-rooms:messages"; +export const chatRoomTotalClientsKey = "chat-rooms:clients"; export const redisRedistributeChannelFactory = (serverId: string) => `${serverId}-${redistributeChannel}`; export const redisServerKeyFactory = (serverId: string) => `server:${serverId}`; @@ -129,7 +130,7 @@ export const getLowestLoadServers = async ( await redisClient.connect(); const ids = await redisClient.zRange( - serversSocketWritesPerSecondKey, + serversClientCountKey, 0, (count ?? 100) - 1, ); From e47f1be9ff1a60f354636ed212dc463766004000 Mon Sep 17 00:00:00 2001 From: ryan Date: Sun, 22 Mar 2026 11:04:04 -0400 Subject: [PATCH 18/32] fe --- chat-client/src/App.css | 7 +++++ chat-client/src/Monitor.tsx | 32 ++++++++++++-------- load-balancer/src/controllers/redis.stats.ts | 7 +++-- 3 files changed, 32 insertions(+), 14 deletions(-) diff --git a/chat-client/src/App.css b/chat-client/src/App.css index cd3754c..e2e2df2 100644 --- a/chat-client/src/App.css +++ b/chat-client/src/App.css @@ -161,6 +161,13 @@ min-width: 0; } +.room-clients { + color: #7d9fc5; + font-size: 0.75rem; + margin-left: 0.5rem; + flex-shrink: 0; +} + .room-msgs { color: #3fb950; font-size: 0.75rem; diff --git a/chat-client/src/Monitor.tsx b/chat-client/src/Monitor.tsx index 02e268d..1336449 100644 --- a/chat-client/src/Monitor.tsx +++ b/chat-client/src/Monitor.tsx @@ -22,7 +22,10 @@ interface RedisStats { ts: number; servers: ServerMetrics[]; totals: { clients: number; chatRooms: number; mps: number }; - chatRooms: { messageCounts: { value: string; score: number }[] }; + chatRooms: { + messageCounts: { value: string; score: number }[]; + clientCounts: { value: string; score: number }[]; + }; } const POLL_MS = 1000; @@ -225,10 +228,6 @@ export function Monitor() { }; }, []); - const sortedRooms = stats - ? [...stats.chatRooms.messageCounts].sort((a, b) => b.score - a.score) - : []; - return (
@@ -285,15 +284,24 @@ export function Monitor() {
chat rooms
- {sortedRooms.length === 0 && ( + {stats.chatRooms.messageCounts.length === 0 && (

no rooms

)} - {sortedRooms.map((room) => ( -
- {room.value} - {room.score} -
- ))} + {[...stats.chatRooms.messageCounts] + .sort((a, b) => b.score - a.score) + .map((mc) => { + const clients = + stats.chatRooms.clientCounts.find( + (c) => c.value === mc.value, + )?.score ?? 0; + return ( +
+ {mc.value} + {clients} + {mc.score} +
+ ); + })}
)} diff --git a/load-balancer/src/controllers/redis.stats.ts b/load-balancer/src/controllers/redis.stats.ts index 2615056..f65df0f 100644 --- a/load-balancer/src/controllers/redis.stats.ts +++ b/load-balancer/src/controllers/redis.stats.ts @@ -1,4 +1,4 @@ -// this is all AI generated +// this is 99% AI generated import type { Request, Response } from "express"; import { childServerMap, redisClient } from "../utils.ts"; import { @@ -8,7 +8,7 @@ import { serversHeartbeatKey, serversSocketWritesPerSecondKey, redisServerKeyFactory, - chatRoomTotalMessagesKey, + chatRoomTotalMessagesKey, chatRoomTotalClientsKey, } from "@chat/shared"; export const redisStats = async (_req: Request, res: Response) => { @@ -60,6 +60,8 @@ export const redisStats = async (_req: Request, res: Response) => { -1, ); + const clientCounts = await redisClient.zRangeWithScores(chatRoomTotalClientsKey,1,-1); + res.json({ ts: now, servers, @@ -70,6 +72,7 @@ export const redisStats = async (_req: Request, res: Response) => { }, chatRooms: { messageCounts, + clientCounts, }, }); }; From 269b8b759a086087236990e5b4819bf35a76bf09 Mon Sep 17 00:00:00 2001 From: ryan Date: Sun, 22 Mar 2026 11:04:15 -0400 Subject: [PATCH 19/32] pw . --- _test-harness/index.ts | 6 +++--- chat-client/src/Monitor.tsx | 5 ++--- load-balancer/src/controllers/redis.stats.ts | 9 +++++++-- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/_test-harness/index.ts b/_test-harness/index.ts index 6c23c6a..c8275ff 100644 --- a/_test-harness/index.ts +++ b/_test-harness/index.ts @@ -6,11 +6,11 @@ import type { } from "@chat/shared"; const LOAD_BALANCER_URL = "http://localhost:3000"; -const CHATTER_COUNT = 10_000; +const CHATTER_COUNT = 50; const CHAT_ROOM_COUNT = 1; const MESSAGE_INTERVAL_MIN_MS = 5_000; -const MESSAGE_INTERVAL_MAX_MS = 90_000; -const INITIAL_CONNECT_STAGGER_MS = 15; +const MESSAGE_INTERVAL_MAX_MS = 15_000; +const INITIAL_CONNECT_STAGGER_MS = 100; const RECONNECT_DELAY_MS = 100; type ProvisionedServer = { diff --git a/chat-client/src/Monitor.tsx b/chat-client/src/Monitor.tsx index 1336449..030e535 100644 --- a/chat-client/src/Monitor.tsx +++ b/chat-client/src/Monitor.tsx @@ -291,9 +291,8 @@ export function Monitor() { .sort((a, b) => b.score - a.score) .map((mc) => { const clients = - stats.chatRooms.clientCounts.find( - (c) => c.value === mc.value, - )?.score ?? 0; + stats.chatRooms.clientCounts.find((c) => c.value === mc.value) + ?.score ?? 0; return (
{mc.value} diff --git a/load-balancer/src/controllers/redis.stats.ts b/load-balancer/src/controllers/redis.stats.ts index f65df0f..8a38708 100644 --- a/load-balancer/src/controllers/redis.stats.ts +++ b/load-balancer/src/controllers/redis.stats.ts @@ -8,7 +8,8 @@ import { serversHeartbeatKey, serversSocketWritesPerSecondKey, redisServerKeyFactory, - chatRoomTotalMessagesKey, chatRoomTotalClientsKey, + chatRoomTotalMessagesKey, + chatRoomTotalClientsKey, } from "@chat/shared"; export const redisStats = async (_req: Request, res: Response) => { @@ -60,7 +61,11 @@ export const redisStats = async (_req: Request, res: Response) => { -1, ); - const clientCounts = await redisClient.zRangeWithScores(chatRoomTotalClientsKey,1,-1); + const clientCounts = await redisClient.zRangeWithScores( + chatRoomTotalClientsKey, + 1, + -1, + ); res.json({ ts: now, From c18d90a2c1375359d44a63f31b8c631b29ed4d64 Mon Sep 17 00:00:00 2001 From: ryan Date: Sun, 22 Mar 2026 11:45:53 -0400 Subject: [PATCH 20/32] oops --- load-balancer/src/controllers/redis.stats.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/load-balancer/src/controllers/redis.stats.ts b/load-balancer/src/controllers/redis.stats.ts index 8a38708..3cde8bb 100644 --- a/load-balancer/src/controllers/redis.stats.ts +++ b/load-balancer/src/controllers/redis.stats.ts @@ -63,7 +63,7 @@ export const redisStats = async (_req: Request, res: Response) => { const clientCounts = await redisClient.zRangeWithScores( chatRoomTotalClientsKey, - 1, + 0, -1, ); From e0d0882ccd9881e98aa47908f1b403c41f55c311 Mon Sep 17 00:00:00 2001 From: ryan Date: Sun, 22 Mar 2026 11:49:38 -0400 Subject: [PATCH 21/32] diff --- chat-client/src/Monitor.tsx | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/chat-client/src/Monitor.tsx b/chat-client/src/Monitor.tsx index 030e535..35ccb32 100644 --- a/chat-client/src/Monitor.tsx +++ b/chat-client/src/Monitor.tsx @@ -228,6 +228,15 @@ export function Monitor() { }; }, []); + const prevMessageCounts = useRef([]); + + useEffect(() => { + if (stats) { + prevMessageCounts.current = stats.chatRooms.messageCounts; + } + }, [stats]); + + return (
@@ -288,16 +297,26 @@ export function Monitor() {

no rooms

)} {[...stats.chatRooms.messageCounts] - .sort((a, b) => b.score - a.score) - .map((mc) => { + .sort( + (a, b) => + (stats.chatRooms.clientCounts.find((c) => c.value === b.value) + ?.score ?? 0) - + (stats.chatRooms.clientCounts.find((c) => c.value === a.value) + ?.score ?? 0), + ) + .map((mc, i) => { + const prev = prevMessageCounts.current.find((p) => p.value === mc.value); + const delta = prev ? mc.score - prev.score : 0; + const clients = stats.chatRooms.clientCounts.find((c) => c.value === mc.value) ?.score ?? 0; + return (
{mc.value} {clients} - {mc.score} + {delta}
); })} From c9300b220d26bf51ae0c6bfca5cfe9b3b06e2f4c Mon Sep 17 00:00:00 2001 From: ryan Date: Sun, 22 Mar 2026 12:17:15 -0400 Subject: [PATCH 22/32] pw . --- chat-client/src/Monitor.tsx | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/chat-client/src/Monitor.tsx b/chat-client/src/Monitor.tsx index 35ccb32..6d79ccf 100644 --- a/chat-client/src/Monitor.tsx +++ b/chat-client/src/Monitor.tsx @@ -236,7 +236,6 @@ export function Monitor() { } }, [stats]); - return (
@@ -305,7 +304,9 @@ export function Monitor() { ?.score ?? 0), ) .map((mc, i) => { - const prev = prevMessageCounts.current.find((p) => p.value === mc.value); + const prev = prevMessageCounts.current.find( + (p) => p.value === mc.value, + ); const delta = prev ? mc.score - prev.score : 0; const clients = From 1af26297553766e98d9f3358203d9c9ddc39b61c Mon Sep 17 00:00:00 2001 From: ryan Date: Sun, 22 Mar 2026 13:02:10 -0400 Subject: [PATCH 23/32] server chat room client count --- chat-server/index.ts | 11 +++++++++ load-balancer/src/intervals.ts | 43 ++++++++++++++++++++-------------- load-balancer/terminal-ui.ts | 1 - packages/shared/src/index.ts | 13 ++++++++-- 4 files changed, 48 insertions(+), 20 deletions(-) diff --git a/chat-server/index.ts b/chat-server/index.ts index 3523fe3..ec357f6 100644 --- a/chat-server/index.ts +++ b/chat-server/index.ts @@ -7,6 +7,7 @@ import { chatRoomTotalClientsKey, chatRoomTotalMessagesKey, debugLog, + redisChatCountKeyFactory, redisRedistributeChannelFactory, redisServerKeyFactory, RegistrationPayload, @@ -108,6 +109,11 @@ wss.on("connection", async (socket) => { const chatId = client.chatId; if (chatId) { void redisClient.zIncrBy(chatRoomTotalClientsKey, -1, chatId); + void redisClient.hIncrBy( + redisServerKeyFactory(serverId), + redisChatCountKeyFactory(chatId), + -1, + ); rooms.get(chatId)?.clients.delete(client); } @@ -179,6 +185,11 @@ const registerSocket = async ( ) => { const chatChannel = registrationMessage.payload.chatId; await redisClient.zIncrBy(chatRoomTotalClientsKey, 1, chatChannel); + await redisClient.hIncrBy( + redisServerKeyFactory(serverId), + redisChatCountKeyFactory(chatChannel), + 1, + ); if (rooms.get(chatChannel) === undefined) { incrementChatCount(); diff --git a/load-balancer/src/intervals.ts b/load-balancer/src/intervals.ts index b3cd91b..26f7d20 100644 --- a/load-balancer/src/intervals.ts +++ b/load-balancer/src/intervals.ts @@ -7,14 +7,14 @@ import { } from "./utils.ts"; import { chatRoomTotalMessagesKey, + type HistoryKey, NumericList, + redisServerKeyFactory, removeServerFromRedis, - serversChatRoomsCountKey, serversClientCountKey, serversEventLoopTimeoutKey, serversHeartbeatKey, serversSocketWritesPerSecondKey, - type ServerState, } from "@chat/shared"; const PPS_SURGE_THRESHOLD = 40; @@ -64,9 +64,9 @@ const purgeBlacklistedServers = () => { }); }; -const updateServerState = ( +const updateServerStateHistoryArray = ( id: string, - key: keyof ServerState, + key: HistoryKey, value: number, ) => { if (!childServerMap.has(id)) { @@ -77,6 +77,12 @@ const updateServerState = ( childServerMap.get(id)?.state[key]?.push(value); }; +const setServerChatRoomState = (id: string, value: Record) => { + if (childServerMap.has(id)) { + childServerMap.get(id)!.state["chatRooms"] = value; + } +}; + export const healthChecks = async () => { // socket writes const socketWrites = await redisClient.zRangeWithScores( @@ -85,7 +91,7 @@ export const healthChecks = async () => { -1, ); socketWrites.forEach(({ value: id, score: writesPerSecond }) => - updateServerState(id, "socketWrites", writesPerSecond), + updateServerStateHistoryArray(id, "socketWrites", writesPerSecond), ); // clients const clients = await redisClient.zRangeWithScores( @@ -94,16 +100,7 @@ export const healthChecks = async () => { -1, ); clients.forEach(({ value: id, score: clients }) => - updateServerState(id, "clients", clients), - ); - // chat rooms - const chatRooms = await redisClient.zRangeWithScores( - serversChatRoomsCountKey, - 0, - -1, - ); - chatRooms.forEach(({ value: id, score: chatRooms }) => - updateServerState(id, "chatRooms", chatRooms), + updateServerStateHistoryArray(id, "clients", clients), ); const timeoutValues = await redisClient.zRangeWithScores( serversEventLoopTimeoutKey, @@ -111,7 +108,7 @@ export const healthChecks = async () => { -1, ); timeoutValues.forEach(({ value: id, score: timeout }) => - updateServerState(id, "timeouts", timeout), + updateServerStateHistoryArray(id, "timeouts", timeout), ); const chatRoomMessages = await redisClient.zRangeWithScores( chatRoomTotalMessagesKey, @@ -119,9 +116,21 @@ export const healthChecks = async () => { -1, ); chatRoomMessages.forEach(({ value: id, score: totalMessages }) => - updateServerState(id, "messages", totalMessages), + updateServerStateHistoryArray(id, "messages", totalMessages), ); + for (const c of [...childServerMap.values()]) { + const redisData = await redisClient.hGetAll(redisServerKeyFactory(c.server.id)); + const chatKeys = Object.keys(redisData).filter((k) => k.includes("chat:")); + const keyValuePairs: Record = {}; + + for (let chatKey of chatKeys) { + keyValuePairs[chatKey.split("chat:")[1]] = Number(redisData[chatKey]); + } + + setServerChatRoomState(c.server.id, keyValuePairs); + } + await detectTimedOutServers(); purgeBlacklistedServers(); diff --git a/load-balancer/terminal-ui.ts b/load-balancer/terminal-ui.ts index 54ead7b..4bf6ed4 100644 --- a/load-balancer/terminal-ui.ts +++ b/load-balancer/terminal-ui.ts @@ -360,7 +360,6 @@ class LoadBalancerTerminalUi { mps: Number(formatNumber(mps)), state: { clients: formatNumberArray(state.clients), - chatRooms: formatNumberArray(state.chatRooms), socketWrites: formatNumberArray(state.socketWrites), timeouts: formatNumberArray(state.timeouts), }, diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index 31c8d51..e3b102c 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -23,19 +23,26 @@ export class NumericList extends Array { } } +export type HistoryKey = Exclude< + { + [K in keyof ServerState]: ServerState[K] extends NumericList ? K : never; + }[keyof ServerState], + "chatRooms" +>; + export interface ServerState { clients: NumericList; - chatRooms: NumericList; socketWrites: NumericList; timeouts: NumericList; messages: NumericList; + chatRooms: Record; } export const defaultServerState = (): ServerState => ({ clients: new NumericList(...Array.from({ length: 100 }).map(() => 0)), - chatRooms: new NumericList(...Array.from({ length: 100 }).map(() => 0)), socketWrites: new NumericList(...Array.from({ length: 100 }).map(() => 0)), timeouts: new NumericList(...Array.from({ length: 100 }).map(() => 0)), messages: new NumericList(...Array.from({ length: 100 }).map(() => 0)), + chatRooms: {}, }); export const redistributeChannel = "wss-redistribute"; @@ -49,6 +56,8 @@ export const chatRoomTotalClientsKey = "chat-rooms:clients"; export const redisRedistributeChannelFactory = (serverId: string) => `${serverId}-${redistributeChannel}`; export const redisServerKeyFactory = (serverId: string) => `server:${serverId}`; +export const redisChatCountKeyFactory = (chatChannel: string) => + `chat:${chatChannel}`; export const addServerToRedis = async (server: Server) => { const redisClient = createClient(); From 7731b3b8b3d1f62ea0b3f73b7cad056ca320cfc9 Mon Sep 17 00:00:00 2001 From: ryan Date: Sun, 22 Mar 2026 13:09:43 -0400 Subject: [PATCH 24/32] use server state map instead of redis for redis endpoint --- load-balancer/src/controllers/redis.stats.ts | 76 +++++++------------- 1 file changed, 24 insertions(+), 52 deletions(-) diff --git a/load-balancer/src/controllers/redis.stats.ts b/load-balancer/src/controllers/redis.stats.ts index 3cde8bb..c9d61e2 100644 --- a/load-balancer/src/controllers/redis.stats.ts +++ b/load-balancer/src/controllers/redis.stats.ts @@ -1,13 +1,7 @@ -// this is 99% AI generated import type { Request, Response } from "express"; import { childServerMap, redisClient } from "../utils.ts"; import { - serversChatRoomsCountKey, - serversClientCountKey, - serversEventLoopTimeoutKey, serversHeartbeatKey, - serversSocketWritesPerSecondKey, - redisServerKeyFactory, chatRoomTotalMessagesKey, chatRoomTotalClientsKey, } from "@chat/shared"; @@ -15,58 +9,36 @@ import { export const redisStats = async (_req: Request, res: Response) => { const now = Date.now(); - const [clients, chatRooms, heartbeats, mps, eventLoop] = await Promise.all([ - redisClient.zRangeWithScores(serversClientCountKey, 0, -1), - redisClient.zRangeWithScores(serversChatRoomsCountKey, 0, -1), + const [heartbeats, messageCounts, clientCounts] = await Promise.all([ redisClient.zRangeWithScores(serversHeartbeatKey, 0, -1), - redisClient.zRangeWithScores(serversSocketWritesPerSecondKey, 0, -1), - redisClient.zRangeWithScores(serversEventLoopTimeoutKey, 0, -1), + redisClient.zRangeWithScores(chatRoomTotalMessagesKey, 0, -1), + redisClient.zRangeWithScores(chatRoomTotalClientsKey, 0, -1), ]); - const serverIds = new Set([ - ...clients.map((e) => e.value), - ...heartbeats.map((e) => e.value), - ...mps.map((e) => e.value), - ]); - - const servers = await Promise.all( - [...serverIds].map(async (id) => { - const url = - (await redisClient.hGet(redisServerKeyFactory(id), "url")) ?? null; - const state = childServerMap.get(id)?.state; - return { - id, - url, - clients: clients.find((e) => e.value === id)?.score ?? 0, - chatRooms: chatRooms.find((e) => e.value === id)?.score ?? 0, - mps: mps.find((e) => e.value === id)?.score ?? 0, - eventLoopTimeout: eventLoop.find((e) => e.value === id)?.score ?? 0, - heartbeatAgeMs: - now - (heartbeats.find((e) => e.value === id)?.score ?? 0), - history: { - clients: state?.clients ?? [], - chatRooms: state?.chatRooms ?? [], - socketWrites: state?.socketWrites ?? [], - timeouts: state?.timeouts ?? [], - }, - }; - }), - ); + const heartbeatMap = new Map(heartbeats.map((e) => [e.value, e.score])); + + const servers = [...childServerMap.entries()].map(([id, child]) => { + const { state, server } = child; + const last = state.clients.length - 1; + return { + id, + url: server.url, + clients: state.clients[last] ?? 0, + chatRooms: Object.keys(state.chatRooms).length, + mps: state.socketWrites[last] ?? 0, + eventLoopTimeout: state.timeouts[last] ?? 0, + heartbeatAgeMs: now - (heartbeatMap.get(id) ?? 0), + history: { + clients: state.clients, + chatRooms: state.chatRooms, + socketWrites: state.socketWrites, + timeouts: state.timeouts, + }, + }; + }); servers.sort((a, b) => a.id.localeCompare(b.id)); - const messageCounts = await redisClient.zRangeWithScores( - chatRoomTotalMessagesKey, - 0, - -1, - ); - - const clientCounts = await redisClient.zRangeWithScores( - chatRoomTotalClientsKey, - 0, - -1, - ); - res.json({ ts: now, servers, From 97e4db47d0a31468cef0995e53f20581147ea831 Mon Sep 17 00:00:00 2001 From: ryan Date: Sun, 22 Mar 2026 13:19:55 -0400 Subject: [PATCH 25/32] show chat room distribution on server row --- chat-client/src/App.css | 29 ++++++++++++++++++++ chat-client/src/Monitor.tsx | 17 ++++++++++-- load-balancer/src/controllers/redis.stats.ts | 7 +++-- 3 files changed, 48 insertions(+), 5 deletions(-) diff --git a/chat-client/src/App.css b/chat-client/src/App.css index e2e2df2..52a6152 100644 --- a/chat-client/src/App.css +++ b/chat-client/src/App.css @@ -322,6 +322,35 @@ flex-wrap: wrap; } +.server-rooms { + margin-top: 0.5rem; + border-top: 1px solid #2a2a2a; + padding-top: 0.4rem; + display: flex; + flex-direction: column; + gap: 0.1rem; +} + +.server-room-row { + display: flex; + justify-content: space-between; + font-size: 0.75rem; + color: #888; +} + +.server-room-name { + color: #aaa; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; + max-width: 80%; +} + +.server-room-clients { + color: #7d9fc5; + font-variant-numeric: tabular-nums; +} + .graph { display: flex; flex-direction: column; diff --git a/chat-client/src/Monitor.tsx b/chat-client/src/Monitor.tsx index 6d79ccf..c34f50c 100644 --- a/chat-client/src/Monitor.tsx +++ b/chat-client/src/Monitor.tsx @@ -11,7 +11,7 @@ interface ServerMetrics { id: string; url: string | null; clients: number; - chatRooms: number; + chatRooms: Record; mps: number; eventLoopTimeout: number; heartbeatAgeMs: number; @@ -145,6 +145,18 @@ function ServerCard({ s }: { s: ServerMetrics }) { fmt={(v) => `${fmt(v)}ms`} />
+ {s.chatRooms && Object.keys(s.chatRooms).length > 0 && ( +
+ {Object.entries(s.chatRooms) + .sort(([, a], [, b]) => b - a) + .map(([roomId, count]) => ( +
+ {roomId} + {count} +
+ ))} +
+ )}
); } @@ -229,7 +241,6 @@ export function Monitor() { }, []); const prevMessageCounts = useRef([]); - useEffect(() => { if (stats) { prevMessageCounts.current = stats.chatRooms.messageCounts; @@ -303,7 +314,7 @@ export function Monitor() { (stats.chatRooms.clientCounts.find((c) => c.value === a.value) ?.score ?? 0), ) - .map((mc, i) => { + .map((mc) => { const prev = prevMessageCounts.current.find( (p) => p.value === mc.value, ); diff --git a/load-balancer/src/controllers/redis.stats.ts b/load-balancer/src/controllers/redis.stats.ts index c9d61e2..a14bd9b 100644 --- a/load-balancer/src/controllers/redis.stats.ts +++ b/load-balancer/src/controllers/redis.stats.ts @@ -24,7 +24,7 @@ export const redisStats = async (_req: Request, res: Response) => { id, url: server.url, clients: state.clients[last] ?? 0, - chatRooms: Object.keys(state.chatRooms).length, + chatRooms: state.chatRooms, mps: state.socketWrites[last] ?? 0, eventLoopTimeout: state.timeouts[last] ?? 0, heartbeatAgeMs: now - (heartbeatMap.get(id) ?? 0), @@ -44,7 +44,10 @@ export const redisStats = async (_req: Request, res: Response) => { servers, totals: { clients: servers.reduce((s, sv) => s + sv.clients, 0), - chatRooms: servers.reduce((s, sv) => s + sv.chatRooms, 0), + chatRooms: servers.reduce( + (s, sv) => s + Object.values(sv.chatRooms).length, + 0, + ), mps: servers.reduce((s, sv) => s + sv.mps, 0), }, chatRooms: { From 70d1493749316d4395d429b0e1c214c118c8fe74 Mon Sep 17 00:00:00 2001 From: ryan Date: Sun, 22 Mar 2026 13:19:58 -0400 Subject: [PATCH 26/32] pw --- load-balancer/src/intervals.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/load-balancer/src/intervals.ts b/load-balancer/src/intervals.ts index 26f7d20..02de37c 100644 --- a/load-balancer/src/intervals.ts +++ b/load-balancer/src/intervals.ts @@ -120,7 +120,9 @@ export const healthChecks = async () => { ); for (const c of [...childServerMap.values()]) { - const redisData = await redisClient.hGetAll(redisServerKeyFactory(c.server.id)); + const redisData = await redisClient.hGetAll( + redisServerKeyFactory(c.server.id), + ); const chatKeys = Object.keys(redisData).filter((k) => k.includes("chat:")); const keyValuePairs: Record = {}; From b0509231c6d4d46a8621637ef8099a14b096321c Mon Sep 17 00:00:00 2001 From: ryan Date: Sun, 22 Mar 2026 13:46:12 -0400 Subject: [PATCH 27/32] chat room client history / socket writes on server state --- load-balancer/src/intervals.ts | 34 ++++++++++++++++++++++++++++++++++ load-balancer/src/state.ts | 3 +++ packages/shared/src/index.ts | 2 ++ 3 files changed, 39 insertions(+) diff --git a/load-balancer/src/intervals.ts b/load-balancer/src/intervals.ts index 02de37c..e130ce5 100644 --- a/load-balancer/src/intervals.ts +++ b/load-balancer/src/intervals.ts @@ -16,6 +16,7 @@ import { serversHeartbeatKey, serversSocketWritesPerSecondKey, } from "@chat/shared"; +import { chatRoomClientHistory } from "./state.ts"; const PPS_SURGE_THRESHOLD = 40; @@ -133,6 +134,23 @@ export const healthChecks = async () => { setServerChatRoomState(c.server.id, keyValuePairs); } + for (const c of [...childServerMap.values()]) { + for (let chatRoomKey in c.state.chatRooms) { + const list = chatRoomClientHistory[chatRoomKey]; + const chats = list.deltas()[list.length - 1]; + const clients = c.state.chatRooms[chatRoomKey] + + const socketWrites = chats * clients; + + if (!c.state.chatRoomSocketWrites[chatRoomKey]) { + c.state.chatRoomSocketWrites[chatRoomKey] = new NumericList(...Array.from({length:100}).map(() => 0)) + } + + c.state.chatRoomSocketWrites[chatRoomKey].shift() + c.state.chatRoomSocketWrites[chatRoomKey].push(socketWrites); + } + } + await detectTimedOutServers(); purgeBlacklistedServers(); @@ -177,8 +195,24 @@ const syncTerminalUi = () => { }); }; +const updateState = async () => { + const values = await redisClient.zRangeWithScores( + chatRoomTotalMessagesKey, + 0, + -1, + ); + values.forEach(({ value, score }) => { + if (!chatRoomClientHistory[value]) { + chatRoomClientHistory[value] = new NumericList(...Array.from({length:100}).map(() => 0)); + } + chatRoomClientHistory[value].shift(); + chatRoomClientHistory[value].push(score); + }); +}; + export const startIntervals = () => { setInterval(async () => { + await updateState(); await healthChecks(); syncTerminalUi(); }, 1000); diff --git a/load-balancer/src/state.ts b/load-balancer/src/state.ts index e3ffea4..eecf0bf 100644 --- a/load-balancer/src/state.ts +++ b/load-balancer/src/state.ts @@ -1,3 +1,6 @@ +import { NumericList } from "@chat/shared"; + +export const chatRoomClientHistory: Record = {}; export let lastSpawnedServer = Date.now(); export const setLastSpawnedServer = (time: number) => (lastSpawnedServer = time); diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index e3b102c..82f185a 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -36,6 +36,7 @@ export interface ServerState { timeouts: NumericList; messages: NumericList; chatRooms: Record; + chatRoomSocketWrites: Record; } export const defaultServerState = (): ServerState => ({ clients: new NumericList(...Array.from({ length: 100 }).map(() => 0)), @@ -43,6 +44,7 @@ export const defaultServerState = (): ServerState => ({ timeouts: new NumericList(...Array.from({ length: 100 }).map(() => 0)), messages: new NumericList(...Array.from({ length: 100 }).map(() => 0)), chatRooms: {}, + chatRoomSocketWrites: {}, }); export const redistributeChannel = "wss-redistribute"; From 1d6c19c04a570027e2e93919d6541f91b4765ae1 Mon Sep 17 00:00:00 2001 From: ryan Date: Sun, 22 Mar 2026 13:49:41 -0400 Subject: [PATCH 28/32] omg --- chat-client/src/App.css | 13 +++++++++++++ chat-client/src/Monitor.tsx | 11 +++++++++++ load-balancer/src/controllers/redis.stats.ts | 13 +++++++++++++ 3 files changed, 37 insertions(+) diff --git a/chat-client/src/App.css b/chat-client/src/App.css index 52a6152..3243863 100644 --- a/chat-client/src/App.css +++ b/chat-client/src/App.css @@ -175,6 +175,13 @@ flex-shrink: 0; } +.room-swps { + color: #bc8cff; + font-size: 0.75rem; + margin-left: 0.5rem; + flex-shrink: 0; +} + .monitor-header { display: flex; align-items: center; @@ -351,6 +358,12 @@ font-variant-numeric: tabular-nums; } +.server-room-swps { + color: #bc8cff; + font-size: 0.75rem; + font-variant-numeric: tabular-nums; +} + .graph { display: flex; flex-direction: column; diff --git a/chat-client/src/Monitor.tsx b/chat-client/src/Monitor.tsx index c34f50c..f8dff21 100644 --- a/chat-client/src/Monitor.tsx +++ b/chat-client/src/Monitor.tsx @@ -5,6 +5,7 @@ interface ServerHistory { chatRooms: number[]; socketWrites: number[]; timeouts: number[]; + chatRoomSocketWrites: Record; } interface ServerMetrics { @@ -12,6 +13,7 @@ interface ServerMetrics { url: string | null; clients: number; chatRooms: Record; + chatRoomSocketWrites: Record; mps: number; eventLoopTimeout: number; heartbeatAgeMs: number; @@ -153,6 +155,9 @@ function ServerCard({ s }: { s: ServerMetrics }) {
{roomId} {count} + + {s.chatRoomSocketWrites?.[roomId] ?? 0} swps +
))}
@@ -324,11 +329,17 @@ export function Monitor() { stats.chatRooms.clientCounts.find((c) => c.value === mc.value) ?.score ?? 0; + const swps = stats.servers.reduce( + (sum, s) => sum + (s.chatRoomSocketWrites?.[mc.value] ?? 0), + 0, + ); + return (
{mc.value} {clients} {delta} + {swps}
); })} diff --git a/load-balancer/src/controllers/redis.stats.ts b/load-balancer/src/controllers/redis.stats.ts index a14bd9b..c0415f3 100644 --- a/load-balancer/src/controllers/redis.stats.ts +++ b/load-balancer/src/controllers/redis.stats.ts @@ -20,11 +20,18 @@ export const redisStats = async (_req: Request, res: Response) => { const servers = [...childServerMap.entries()].map(([id, child]) => { const { state, server } = child; const last = state.clients.length - 1; + const chatRoomSocketWrites = Object.fromEntries( + Object.entries(state.chatRoomSocketWrites).map(([room, list]) => [ + room, + list[list.length - 1] ?? 0, + ]), + ); return { id, url: server.url, clients: state.clients[last] ?? 0, chatRooms: state.chatRooms, + chatRoomSocketWrites, mps: state.socketWrites[last] ?? 0, eventLoopTimeout: state.timeouts[last] ?? 0, heartbeatAgeMs: now - (heartbeatMap.get(id) ?? 0), @@ -33,6 +40,12 @@ export const redisStats = async (_req: Request, res: Response) => { chatRooms: state.chatRooms, socketWrites: state.socketWrites, timeouts: state.timeouts, + chatRoomSocketWrites: Object.fromEntries( + Object.entries(state.chatRoomSocketWrites).map(([room, list]) => [ + room, + [...list], + ]), + ), }, }; }); From e748caecb5850de371032e7a527484e5d05af2f5 Mon Sep 17 00:00:00 2001 From: ryan Date: Sun, 22 Mar 2026 13:59:14 -0400 Subject: [PATCH 29/32] hold up --- load-balancer/index.ts | 4 ++-- load-balancer/src/intervals.ts | 44 +++++++++++++++++----------------- load-balancer/src/utils.ts | 4 ++-- load-balancer/terminal-ui.ts | 2 +- 4 files changed, 27 insertions(+), 27 deletions(-) diff --git a/load-balancer/index.ts b/load-balancer/index.ts index 814a558..7c67f93 100644 --- a/load-balancer/index.ts +++ b/load-balancer/index.ts @@ -1,10 +1,10 @@ import { app } from "./src/app.ts"; -import { terminalUi } from "./terminal-ui.ts"; +// import { terminalUi } from "./terminal-ui.ts"; import { childServerMap, shutdown, spawnServer } from "./src/utils.ts"; import { startIntervals } from "./src/intervals.ts"; const port = 3000; -terminalUi.setRuntimeInfo({ port, serviceName: "load-balancer" }); +// terminalUi.setRuntimeInfo({ port, serviceName: "load-balancer" }); if (childServerMap.size < 1) { console.log("spawning init"); diff --git a/load-balancer/src/intervals.ts b/load-balancer/src/intervals.ts index e130ce5..1628e3d 100644 --- a/load-balancer/src/intervals.ts +++ b/load-balancer/src/intervals.ts @@ -1,4 +1,4 @@ -import { terminalUi } from "../terminal-ui.ts"; +// import { terminalUi } from "../terminal-ui.ts"; import { childServerMap, redisClient, @@ -136,8 +136,8 @@ export const healthChecks = async () => { for (const c of [...childServerMap.values()]) { for (let chatRoomKey in c.state.chatRooms) { - const list = chatRoomClientHistory[chatRoomKey]; - const chats = list.deltas()[list.length - 1]; + const list = chatRoomClientHistory[chatRoomKey] ?? new NumericList(); + const chats = list.deltas()[list.length - 1] ?? 0; const clients = c.state.chatRooms[chatRoomKey] const socketWrites = chats * clients; @@ -175,24 +175,24 @@ const syncTerminalUi = () => { const clientsByServerId = new Map(runtimeState.serverLoads); const mpsByServerId = new Map(runtimeState.serverMps); - terminalUi.setSnapshot({ - blacklistedServers: [...serverBlacklist.entries()].map( - ([serverId, startedAt]) => [ - serverId, - Math.floor((Date.now() - startedAt) / 1000), - ], - ), - childServers: [...childServerMap.entries()].map(([serverId, child]) => ({ - clients: clientsByServerId.get(serverId) ?? 0, - isKilled: child.process.killed, - mps: mpsByServerId.get(serverId) ?? 0, - pid: child.process.pid, - serverId, - state: child.state, - })), - status: "running", - ...runtimeState, - }); + // terminalUi.setSnapshot({ + // blacklistedServers: [...serverBlacklist.entries()].map( + // ([serverId, startedAt]) => [ + // serverId, + // Math.floor((Date.now() - startedAt) / 1000), + // ], + // ), + // childServers: [...childServerMap.entries()].map(([serverId, child]) => ({ + // clients: clientsByServerId.get(serverId) ?? 0, + // isKilled: child.process.killed, + // mps: mpsByServerId.get(serverId) ?? 0, + // pid: child.process.pid, + // serverId, + // state: child.state, + // })), + // status: "running", + // ...runtimeState, + // }); }; const updateState = async () => { @@ -214,6 +214,6 @@ export const startIntervals = () => { setInterval(async () => { await updateState(); await healthChecks(); - syncTerminalUi(); + // syncTerminalUi(); }, 1000); }; diff --git a/load-balancer/src/utils.ts b/load-balancer/src/utils.ts index 44ff70a..6c8f598 100644 --- a/load-balancer/src/utils.ts +++ b/load-balancer/src/utils.ts @@ -1,5 +1,5 @@ import { createClient } from "redis"; -import { terminalUi } from "../terminal-ui.ts"; +// import { terminalUi } from "../terminal-ui.ts"; import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process"; import path from "node:path"; import { @@ -47,7 +47,7 @@ export const runtimeState = { export const shutdown = async () => { try { - terminalUi.destroy(); + // terminalUi.destroy(); await redisClient.quit(); await subscriptionClient.quit(); for (let [id, childServerMapElement] of childServerMap) { diff --git a/load-balancer/terminal-ui.ts b/load-balancer/terminal-ui.ts index 4bf6ed4..e7ed59c 100644 --- a/load-balancer/terminal-ui.ts +++ b/load-balancer/terminal-ui.ts @@ -407,4 +407,4 @@ class LoadBalancerTerminalUi { } } -export const terminalUi = new LoadBalancerTerminalUi(); +// export const terminalUi = new LoadBalancerTerminalUi(); From e2917dd3d32352bea8a7d41f8b75443b4ddadb0c Mon Sep 17 00:00:00 2001 From: Ryan Enns Date: Mon, 23 Mar 2026 08:30:43 -0400 Subject: [PATCH 30/32] (((panick))) --- _test-harness/index.ts | 10 +-- chat-client/src/Monitor.tsx | 17 +++-- chat-server/index.ts | 45 ++++++++++++- chat-server/src/utils.ts | 8 ++- load-balancer/src/controllers/redis.stats.ts | 8 +-- load-balancer/src/intervals.ts | 66 ++++++++++---------- load-balancer/src/utils.ts | 4 +- packages/shared/src/index.ts | 7 ++- 8 files changed, 103 insertions(+), 62 deletions(-) diff --git a/_test-harness/index.ts b/_test-harness/index.ts index c8275ff..b6bb761 100644 --- a/_test-harness/index.ts +++ b/_test-harness/index.ts @@ -6,11 +6,11 @@ import type { } from "@chat/shared"; const LOAD_BALANCER_URL = "http://localhost:3000"; -const CHATTER_COUNT = 50; -const CHAT_ROOM_COUNT = 1; -const MESSAGE_INTERVAL_MIN_MS = 5_000; -const MESSAGE_INTERVAL_MAX_MS = 15_000; -const INITIAL_CONNECT_STAGGER_MS = 100; +const CHATTER_COUNT = 100; +const CHAT_ROOM_COUNT = 7; +const MESSAGE_INTERVAL_MIN_MS = 500; +const MESSAGE_INTERVAL_MAX_MS = 90_000; +const INITIAL_CONNECT_STAGGER_MS = 15; const RECONNECT_DELAY_MS = 100; type ProvisionedServer = { diff --git a/chat-client/src/Monitor.tsx b/chat-client/src/Monitor.tsx index f8dff21..dffc8f2 100644 --- a/chat-client/src/Monitor.tsx +++ b/chat-client/src/Monitor.tsx @@ -320,26 +320,25 @@ export function Monitor() { ?.score ?? 0), ) .map((mc) => { - const prev = prevMessageCounts.current.find( - (p) => p.value === mc.value, - ); - const delta = prev ? mc.score - prev.score : 0; + console.log(stats.servers); + + const delta = mc.score; const clients = stats.chatRooms.clientCounts.find((c) => c.value === mc.value) ?.score ?? 0; - const swps = stats.servers.reduce( - (sum, s) => sum + (s.chatRoomSocketWrites?.[mc.value] ?? 0), + const msgs = stats.servers.reduce( + (sum, s) => sum + (s.chatRooms?.[mc.value] ?? 0), 0, ); return (
{mc.value} - {clients} - {delta} - {swps} + c:{clients} + m:{msgs} + s:{delta}
); })} diff --git a/chat-server/index.ts b/chat-server/index.ts index ec357f6..53e2065 100644 --- a/chat-server/index.ts +++ b/chat-server/index.ts @@ -5,7 +5,7 @@ import { addServerToRedis, ChatPayload, chatRoomTotalClientsKey, - chatRoomTotalMessagesKey, + chatRoomSocketWritesPerSecondKey, debugLog, redisChatCountKeyFactory, redisRedistributeChannelFactory, @@ -18,6 +18,7 @@ import { serversHeartbeatKey, serversSocketWritesPerSecondKey, type WebSocketMessage, + chatRoomMessagesPerSecondKey, } from "@chat/shared"; import { ClientSocket, @@ -146,6 +147,7 @@ await subscriber.subscribe( ); let socketWritesThisSecond = 0; +const socketWritesPerChannelThisSecond: Record = {}; const updateMetrics = () => { void redisClient.zAdd(serversHeartbeatKey, { score: Date.now(), @@ -156,6 +158,28 @@ const updateMetrics = () => { value: serverId, }); socketWritesThisSecond = 0; + Object.keys(socketWritesPerChannelThisSecond).forEach((key) => { + void redisClient.zAdd(chatRoomSocketWritesPerSecondKey, { + score: socketWritesPerChannelThisSecond[key], + value: key, + }); + socketWritesPerChannelThisSecond[key] = 0; + }); + Object.keys(messagesSentPerChannelThisSecond).forEach((key) => { + // todo -- the obvious problem with this approach is that + // for the n number of servers that are receiving messages + // for a given chat room, all n of them will be trying + // to zAdd simultaneously. This doesn't introduce data + // integrity issues I don't think, seeing as they should + // all produce the same numbers; but boy oh boy is writing + // over the same redis key n times with the same value an + // annoying compromise to have to make. + void redisClient.zAdd(chatRoomMessagesPerSecondKey, { + score: messagesSentPerChannelThisSecond[key], + value: key, + }); + messagesSentPerChannelThisSecond[key] = 0; + }); void redisClient.zAdd(serversChatRoomsCountKey, { score: chatRoomCount, value: serverId, @@ -212,7 +236,18 @@ const registerSocket = async ( room.queue.push(message); if (!room.running) { - flushRoom(room, () => socketWritesThisSecond++); + flushRoom(room, (socket: ClientSocket) => { + socketWritesThisSecond++; + if (!socket.chatId) { + return; + } + + if (!socketWritesPerChannelThisSecond[socket.chatId]) { + socketWritesPerChannelThisSecond[socket.chatId] = 0; + } + + socketWritesPerChannelThisSecond[socket.chatId] += 1; + }); } }); } @@ -222,6 +257,7 @@ const registerSocket = async ( rooms.get(chatChannel)?.clients.add(socket); }; +const messagesSentPerChannelThisSecond: Record = {}; const publishChat = async ( message: WebSocketMessage, socket: ClientSocket, @@ -230,7 +266,10 @@ const publishChat = async ( return; } - void redisClient.zIncrBy(chatRoomTotalMessagesKey, 1, socket.chatId); + if (!messagesSentPerChannelThisSecond[socket.chatId]) { + messagesSentPerChannelThisSecond[socket.chatId] = 0; + } + messagesSentPerChannelThisSecond[socket.chatId] += 1; await redisClient.publish(socket.chatId, JSON.stringify(message.payload)); }; diff --git a/chat-server/src/utils.ts b/chat-server/src/utils.ts index eec814d..6e32770 100644 --- a/chat-server/src/utils.ts +++ b/chat-server/src/utils.ts @@ -1,5 +1,4 @@ import { - debugLog, getLowestLoadServer, RedistributionPayload, Server, @@ -52,7 +51,10 @@ export const redistributeListener = (message: string) => { redistributeBy = Math.floor(Math.max(Number(message) * 0.15, 1)); }; -export const flushRoom = (room: Room, callback: () => void = () => {}) => { +export const flushRoom = ( + room: Room, + callback: (socket: ClientSocket) => void = () => {}, +) => { if (room.queue.length < 1) { room.running = false; @@ -80,7 +82,7 @@ export const flushRoom = (room: Room, callback: () => void = () => {}) => { } else { socket.send(message); } - callback(); + callback(socket); } } diff --git a/load-balancer/src/controllers/redis.stats.ts b/load-balancer/src/controllers/redis.stats.ts index c0415f3..e0cb6a0 100644 --- a/load-balancer/src/controllers/redis.stats.ts +++ b/load-balancer/src/controllers/redis.stats.ts @@ -2,7 +2,7 @@ import type { Request, Response } from "express"; import { childServerMap, redisClient } from "../utils.ts"; import { serversHeartbeatKey, - chatRoomTotalMessagesKey, + chatRoomSocketWritesPerSecondKey, chatRoomTotalClientsKey, } from "@chat/shared"; @@ -11,7 +11,7 @@ export const redisStats = async (_req: Request, res: Response) => { const [heartbeats, messageCounts, clientCounts] = await Promise.all([ redisClient.zRangeWithScores(serversHeartbeatKey, 0, -1), - redisClient.zRangeWithScores(chatRoomTotalMessagesKey, 0, -1), + redisClient.zRangeWithScores(chatRoomSocketWritesPerSecondKey, 0, -1), redisClient.zRangeWithScores(chatRoomTotalClientsKey, 0, -1), ]); @@ -30,14 +30,14 @@ export const redisStats = async (_req: Request, res: Response) => { id, url: server.url, clients: state.clients[last] ?? 0, - chatRooms: state.chatRooms, + chatRooms: state.chatRoomMessages, chatRoomSocketWrites, mps: state.socketWrites[last] ?? 0, eventLoopTimeout: state.timeouts[last] ?? 0, heartbeatAgeMs: now - (heartbeatMap.get(id) ?? 0), history: { clients: state.clients, - chatRooms: state.chatRooms, + chatRooms: state.chatRoomMessages, socketWrites: state.socketWrites, timeouts: state.timeouts, chatRoomSocketWrites: Object.fromEntries( diff --git a/load-balancer/src/intervals.ts b/load-balancer/src/intervals.ts index 1628e3d..7e1bbca 100644 --- a/load-balancer/src/intervals.ts +++ b/load-balancer/src/intervals.ts @@ -6,7 +6,8 @@ import { serverBlacklist, } from "./utils.ts"; import { - chatRoomTotalMessagesKey, + chatRoomMessagesPerSecondKey, + chatRoomSocketWritesPerSecondKey, type HistoryKey, NumericList, redisServerKeyFactory, @@ -80,7 +81,7 @@ const updateServerStateHistoryArray = ( const setServerChatRoomState = (id: string, value: Record) => { if (childServerMap.has(id)) { - childServerMap.get(id)!.state["chatRooms"] = value; + childServerMap.get(id)!.state["chatRoomMessages"] = value; } }; @@ -111,43 +112,40 @@ export const healthChecks = async () => { timeoutValues.forEach(({ value: id, score: timeout }) => updateServerStateHistoryArray(id, "timeouts", timeout), ); - const chatRoomMessages = await redisClient.zRangeWithScores( - chatRoomTotalMessagesKey, - 0, - -1, - ); - chatRoomMessages.forEach(({ value: id, score: totalMessages }) => - updateServerStateHistoryArray(id, "messages", totalMessages), - ); - for (const c of [...childServerMap.values()]) { - const redisData = await redisClient.hGetAll( - redisServerKeyFactory(c.server.id), - ); - const chatKeys = Object.keys(redisData).filter((k) => k.includes("chat:")); - const keyValuePairs: Record = {}; - - for (let chatKey of chatKeys) { - keyValuePairs[chatKey.split("chat:")[1]] = Number(redisData[chatKey]); - } - - setServerChatRoomState(c.server.id, keyValuePairs); - } + // const redisData = await redisClient.zRangeWithScores( + // chatRoomMessagesPerSecondKey, + // 0, + // -1, + // ); + // redisData.forEach(({value:id,score:messagesPerSecond}) => { + // childServerMap() + // }) + + // for (const c of [...childServerMap.values()]) { + // + // const chatKeys = Object.keys(redisData).filter((k) => k.includes("chat:")); + // const keyValuePairs: Record = {}; + // + // for (let chatKey of chatKeys) { + // keyValuePairs[chatKey.split("chat:")[1]] = Number(redisData[chatKey]); + // } + // + // setServerChatRoomState(c.server.id, keyValuePairs); + // } for (const c of [...childServerMap.values()]) { - for (let chatRoomKey in c.state.chatRooms) { + for (let chatRoomKey in c.state.chatRoomMessages) { const list = chatRoomClientHistory[chatRoomKey] ?? new NumericList(); - const chats = list.deltas()[list.length - 1] ?? 0; - const clients = c.state.chatRooms[chatRoomKey] - - const socketWrites = chats * clients; if (!c.state.chatRoomSocketWrites[chatRoomKey]) { - c.state.chatRoomSocketWrites[chatRoomKey] = new NumericList(...Array.from({length:100}).map(() => 0)) + c.state.chatRoomSocketWrites[chatRoomKey] = new NumericList( + ...Array.from({ length: 100 }).map(() => 0), + ); } - c.state.chatRoomSocketWrites[chatRoomKey].shift() - c.state.chatRoomSocketWrites[chatRoomKey].push(socketWrites); + c.state.chatRoomSocketWrites[chatRoomKey].shift(); + c.state.chatRoomSocketWrites[chatRoomKey].push(list[list.length - 1]); } } @@ -197,13 +195,15 @@ const syncTerminalUi = () => { const updateState = async () => { const values = await redisClient.zRangeWithScores( - chatRoomTotalMessagesKey, + chatRoomSocketWritesPerSecondKey, 0, -1, ); values.forEach(({ value, score }) => { if (!chatRoomClientHistory[value]) { - chatRoomClientHistory[value] = new NumericList(...Array.from({length:100}).map(() => 0)); + chatRoomClientHistory[value] = new NumericList( + ...Array.from({ length: 100 }).map(() => 0), + ); } chatRoomClientHistory[value].shift(); chatRoomClientHistory[value].push(score); diff --git a/load-balancer/src/utils.ts b/load-balancer/src/utils.ts index 6c8f598..5484733 100644 --- a/load-balancer/src/utils.ts +++ b/load-balancer/src/utils.ts @@ -4,7 +4,7 @@ import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process"; import path from "node:path"; import { chatRoomTotalClientsKey, - chatRoomTotalMessagesKey, + chatRoomSocketWritesPerSecondKey, debugLog, defaultServerState, redisServerKeyFactory, @@ -54,7 +54,7 @@ export const shutdown = async () => { await removeServerFromRedis(id); childServerMapElement.process.kill(1); } - await redisClient.del(chatRoomTotalMessagesKey); + await redisClient.del(chatRoomSocketWritesPerSecondKey); await redisClient.del(chatRoomTotalClientsKey); } catch { redisClient.destroy(); diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index 82f185a..762ad14 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -35,7 +35,7 @@ export interface ServerState { socketWrites: NumericList; timeouts: NumericList; messages: NumericList; - chatRooms: Record; + chatRoomMessages: Record; chatRoomSocketWrites: Record; } export const defaultServerState = (): ServerState => ({ @@ -43,7 +43,7 @@ export const defaultServerState = (): ServerState => ({ socketWrites: new NumericList(...Array.from({ length: 100 }).map(() => 0)), timeouts: new NumericList(...Array.from({ length: 100 }).map(() => 0)), messages: new NumericList(...Array.from({ length: 100 }).map(() => 0)), - chatRooms: {}, + chatRoomMessages: {}, chatRoomSocketWrites: {}, }); @@ -53,7 +53,8 @@ export const serversChatRoomsCountKey = "servers:chats"; export const serversHeartbeatKey = "servers:heartbeat"; export const serversSocketWritesPerSecondKey = "servers:swps"; export const serversEventLoopTimeoutKey = "servers:event-loop"; -export const chatRoomTotalMessagesKey = "chat-rooms:messages"; +export const chatRoomSocketWritesPerSecondKey = "chat-rooms:socket-writes"; +export const chatRoomMessagesPerSecondKey = "chat-rooms:messages"; export const chatRoomTotalClientsKey = "chat-rooms:clients"; export const redisRedistributeChannelFactory = (serverId: string) => `${serverId}-${redistributeChannel}`; From 8efedf147f743b9493645fcba2b6827e74ecf962 Mon Sep 17 00:00:00 2001 From: Ryan Enns Date: Mon, 23 Mar 2026 08:55:43 -0400 Subject: [PATCH 31/32] store chat room state in load balancer --- load-balancer/src/controllers/redis.stats.ts | 61 ++++++------- load-balancer/src/intervals.ts | 93 ++++++++------------ load-balancer/src/state.ts | 8 +- packages/shared/src/index.ts | 4 - schema.json | 25 ++++++ 5 files changed, 96 insertions(+), 95 deletions(-) create mode 100644 schema.json diff --git a/load-balancer/src/controllers/redis.stats.ts b/load-balancer/src/controllers/redis.stats.ts index e0cb6a0..9c5bb6d 100644 --- a/load-balancer/src/controllers/redis.stats.ts +++ b/load-balancer/src/controllers/redis.stats.ts @@ -1,71 +1,64 @@ import type { Request, Response } from "express"; import { childServerMap, redisClient } from "../utils.ts"; -import { - serversHeartbeatKey, - chatRoomSocketWritesPerSecondKey, - chatRoomTotalClientsKey, -} from "@chat/shared"; +import { serversHeartbeatKey } from "@chat/shared"; +import { chatRooms } from "../state.ts"; export const redisStats = async (_req: Request, res: Response) => { const now = Date.now(); - const [heartbeats, messageCounts, clientCounts] = await Promise.all([ - redisClient.zRangeWithScores(serversHeartbeatKey, 0, -1), - redisClient.zRangeWithScores(chatRoomSocketWritesPerSecondKey, 0, -1), - redisClient.zRangeWithScores(chatRoomTotalClientsKey, 0, -1), - ]); - + const heartbeats = await redisClient.zRangeWithScores( + serversHeartbeatKey, + 0, + -1, + ); const heartbeatMap = new Map(heartbeats.map((e) => [e.value, e.score])); const servers = [...childServerMap.entries()].map(([id, child]) => { const { state, server } = child; const last = state.clients.length - 1; - const chatRoomSocketWrites = Object.fromEntries( - Object.entries(state.chatRoomSocketWrites).map(([room, list]) => [ - room, - list[list.length - 1] ?? 0, - ]), - ); return { id, url: server.url, clients: state.clients[last] ?? 0, - chatRooms: state.chatRoomMessages, - chatRoomSocketWrites, - mps: state.socketWrites[last] ?? 0, + socketWritesPerSecond: state.socketWrites[last] ?? 0, eventLoopTimeout: state.timeouts[last] ?? 0, heartbeatAgeMs: now - (heartbeatMap.get(id) ?? 0), history: { clients: state.clients, - chatRooms: state.chatRoomMessages, socketWrites: state.socketWrites, timeouts: state.timeouts, - chatRoomSocketWrites: Object.fromEntries( - Object.entries(state.chatRoomSocketWrites).map(([room, list]) => [ - room, - [...list], - ]), - ), }, }; }); servers.sort((a, b) => a.id.localeCompare(b.id)); + const chatRoomList = [...chatRooms.entries()].map(([id, room]) => { + const last = room.clients.length - 1; + return { + id, + clients: room.clients[last] ?? 0, + messagesPerSecond: room.messagesPerSecond[last] ?? 0, + socketWritesPerSecond: room.socketWritesPerSecond[last] ?? 0, + history: { + clients: room.clients, + messagesPerSecond: room.messagesPerSecond, + socketWritesPerSecond: room.socketWritesPerSecond, + }, + }; + }); + res.json({ ts: now, servers, + chatRooms: chatRoomList, totals: { clients: servers.reduce((s, sv) => s + sv.clients, 0), - chatRooms: servers.reduce( - (s, sv) => s + Object.values(sv.chatRooms).length, + chatRooms: chatRoomList.length, + socketWritesPerSecond: chatRoomList.reduce( + (s, r) => s + r.socketWritesPerSecond, 0, ), - mps: servers.reduce((s, sv) => s + sv.mps, 0), - }, - chatRooms: { - messageCounts, - clientCounts, }, }); }; diff --git a/load-balancer/src/intervals.ts b/load-balancer/src/intervals.ts index 7e1bbca..e7ce0db 100644 --- a/load-balancer/src/intervals.ts +++ b/load-balancer/src/intervals.ts @@ -8,6 +8,7 @@ import { import { chatRoomMessagesPerSecondKey, chatRoomSocketWritesPerSecondKey, + chatRoomTotalClientsKey, type HistoryKey, NumericList, redisServerKeyFactory, @@ -17,7 +18,7 @@ import { serversHeartbeatKey, serversSocketWritesPerSecondKey, } from "@chat/shared"; -import { chatRoomClientHistory } from "./state.ts"; +import { type ChatRoomState, chatRooms } from "./state.ts"; const PPS_SURGE_THRESHOLD = 40; @@ -79,12 +80,6 @@ const updateServerStateHistoryArray = ( childServerMap.get(id)?.state[key]?.push(value); }; -const setServerChatRoomState = (id: string, value: Record) => { - if (childServerMap.has(id)) { - childServerMap.get(id)!.state["chatRoomMessages"] = value; - } -}; - export const healthChecks = async () => { // socket writes const socketWrites = await redisClient.zRangeWithScores( @@ -113,42 +108,6 @@ export const healthChecks = async () => { updateServerStateHistoryArray(id, "timeouts", timeout), ); - // const redisData = await redisClient.zRangeWithScores( - // chatRoomMessagesPerSecondKey, - // 0, - // -1, - // ); - // redisData.forEach(({value:id,score:messagesPerSecond}) => { - // childServerMap() - // }) - - // for (const c of [...childServerMap.values()]) { - // - // const chatKeys = Object.keys(redisData).filter((k) => k.includes("chat:")); - // const keyValuePairs: Record = {}; - // - // for (let chatKey of chatKeys) { - // keyValuePairs[chatKey.split("chat:")[1]] = Number(redisData[chatKey]); - // } - // - // setServerChatRoomState(c.server.id, keyValuePairs); - // } - - for (const c of [...childServerMap.values()]) { - for (let chatRoomKey in c.state.chatRoomMessages) { - const list = chatRoomClientHistory[chatRoomKey] ?? new NumericList(); - - if (!c.state.chatRoomSocketWrites[chatRoomKey]) { - c.state.chatRoomSocketWrites[chatRoomKey] = new NumericList( - ...Array.from({ length: 100 }).map(() => 0), - ); - } - - c.state.chatRoomSocketWrites[chatRoomKey].shift(); - c.state.chatRoomSocketWrites[chatRoomKey].push(list[list.length - 1]); - } - } - await detectTimedOutServers(); purgeBlacklistedServers(); @@ -193,20 +152,42 @@ const syncTerminalUi = () => { // }); }; +const ensureChatRoom = (id: string): ChatRoomState => { + if (!chatRooms.has(id)) { + const empty = () => + new NumericList(...Array.from({ length: 100 }).map(() => 0)); + chatRooms.set(id, { + clients: empty(), + messagesPerSecond: empty(), + socketWritesPerSecond: empty(), + }); + } + return chatRooms.get(id)!; +}; + const updateState = async () => { - const values = await redisClient.zRangeWithScores( - chatRoomSocketWritesPerSecondKey, - 0, - -1, - ); - values.forEach(({ value, score }) => { - if (!chatRoomClientHistory[value]) { - chatRoomClientHistory[value] = new NumericList( - ...Array.from({ length: 100 }).map(() => 0), - ); - } - chatRoomClientHistory[value].shift(); - chatRoomClientHistory[value].push(score); + const [socketWrites, messages, clients] = await Promise.all([ + redisClient.zRangeWithScores(chatRoomSocketWritesPerSecondKey, 0, -1), + redisClient.zRangeWithScores(chatRoomMessagesPerSecondKey, 0, -1), + redisClient.zRangeWithScores(chatRoomTotalClientsKey, 0, -1), + ]); + + socketWrites.forEach(({ value: id, score }) => { + const room = ensureChatRoom(id); + room.socketWritesPerSecond.shift(); + room.socketWritesPerSecond.push(score); + }); + + messages.forEach(({ value: id, score }) => { + const room = ensureChatRoom(id); + room.messagesPerSecond.shift(); + room.messagesPerSecond.push(score); + }); + + clients.forEach(({ value: id, score }) => { + const room = ensureChatRoom(id); + room.clients.shift(); + room.clients.push(score); }); }; diff --git a/load-balancer/src/state.ts b/load-balancer/src/state.ts index eecf0bf..b202943 100644 --- a/load-balancer/src/state.ts +++ b/load-balancer/src/state.ts @@ -1,6 +1,12 @@ import { NumericList } from "@chat/shared"; -export const chatRoomClientHistory: Record = {}; +export interface ChatRoomState { + clients: NumericList; + messagesPerSecond: NumericList; + socketWritesPerSecond: NumericList; +} + +export const chatRooms: Map = new Map(); export let lastSpawnedServer = Date.now(); export const setLastSpawnedServer = (time: number) => (lastSpawnedServer = time); diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index 762ad14..df51f38 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -35,16 +35,12 @@ export interface ServerState { socketWrites: NumericList; timeouts: NumericList; messages: NumericList; - chatRoomMessages: Record; - chatRoomSocketWrites: Record; } export const defaultServerState = (): ServerState => ({ clients: new NumericList(...Array.from({ length: 100 }).map(() => 0)), socketWrites: new NumericList(...Array.from({ length: 100 }).map(() => 0)), timeouts: new NumericList(...Array.from({ length: 100 }).map(() => 0)), messages: new NumericList(...Array.from({ length: 100 }).map(() => 0)), - chatRoomMessages: {}, - chatRoomSocketWrites: {}, }); export const redistributeChannel = "wss-redistribute"; diff --git a/schema.json b/schema.json new file mode 100644 index 0000000..1abe196 --- /dev/null +++ b/schema.json @@ -0,0 +1,25 @@ +{ + "chat_rooms": [ + { + "id": "some-chat", + "clients": 0, + "messages_per_second": 0, + "socket_writes_per_second": 0 + } + ], + "servers": [ + { + "id": "some-server", + "url": "ws://localhost:8080", + "chat_rooms": [ + { + "id": "some-chat", + "clients": 0 + } + ], + "last_heartbeat": 123, + "event_loop_timeout": 2, + "cumulative_swps": 0 + } + ] +} From a83e9b08d2e4b9bcdfca71282c9578868d7cd42c Mon Sep 17 00:00:00 2001 From: Ryan Enns Date: Mon, 23 Mar 2026 09:03:20 -0400 Subject: [PATCH 32/32] client --- chat-client/src/App.css | 11 +++-- chat-client/src/Monitor.tsx | 94 +++++++++++-------------------------- 2 files changed, 35 insertions(+), 70 deletions(-) diff --git a/chat-client/src/App.css b/chat-client/src/App.css index 3243863..56ee6dc 100644 --- a/chat-client/src/App.css +++ b/chat-client/src/App.css @@ -113,7 +113,7 @@ .monitor-body { display: grid; - grid-template-columns: 1fr 220px; + grid-template-columns: 1fr 22rem; gap: 0.75rem; align-items: start; } @@ -164,21 +164,24 @@ .room-clients { color: #7d9fc5; font-size: 0.75rem; - margin-left: 0.5rem; + width: 4rem; + text-align: right; flex-shrink: 0; } .room-msgs { color: #3fb950; font-size: 0.75rem; - margin-left: 0.5rem; + width: 4rem; + text-align: right; flex-shrink: 0; } .room-swps { color: #bc8cff; font-size: 0.75rem; - margin-left: 0.5rem; + width: 4rem; + text-align: right; flex-shrink: 0; } diff --git a/chat-client/src/Monitor.tsx b/chat-client/src/Monitor.tsx index dffc8f2..e5661e4 100644 --- a/chat-client/src/Monitor.tsx +++ b/chat-client/src/Monitor.tsx @@ -2,32 +2,32 @@ import { useEffect, useRef, useState } from "react"; interface ServerHistory { clients: number[]; - chatRooms: number[]; socketWrites: number[]; timeouts: number[]; - chatRoomSocketWrites: Record; } interface ServerMetrics { id: string; url: string | null; clients: number; - chatRooms: Record; - chatRoomSocketWrites: Record; - mps: number; + socketWritesPerSecond: number; eventLoopTimeout: number; heartbeatAgeMs: number; history: ServerHistory; } +interface ChatRoomMetrics { + id: string; + clients: number; + messagesPerSecond: number; + socketWritesPerSecond: number; +} + interface RedisStats { ts: number; servers: ServerMetrics[]; - totals: { clients: number; chatRooms: number; mps: number }; - chatRooms: { - messageCounts: { value: string; score: number }[]; - clientCounts: { value: string; score: number }[]; - }; + chatRooms: ChatRoomMetrics[]; + totals: { clients: number; chatRooms: number; socketWritesPerSecond: number }; } const POLL_MS = 1000; @@ -134,7 +134,7 @@ function ServerCard({ s }: { s: ServerMetrics }) { /> fmt(v)} @@ -147,21 +147,6 @@ function ServerCard({ s }: { s: ServerMetrics }) { fmt={(v) => `${fmt(v)}ms`} />
- {s.chatRooms && Object.keys(s.chatRooms).length > 0 && ( -
- {Object.entries(s.chatRooms) - .sort(([, a], [, b]) => b - a) - .map(([roomId, count]) => ( -
- {roomId} - {count} - - {s.chatRoomSocketWrites?.[roomId] ?? 0} swps - -
- ))} -
- )}
); } @@ -245,13 +230,6 @@ export function Monitor() { }; }, []); - const prevMessageCounts = useRef([]); - useEffect(() => { - if (stats) { - prevMessageCounts.current = stats.chatRooms.messageCounts; - } - }, [stats]); - return (
@@ -270,7 +248,10 @@ export function Monitor() {
- +
chat rooms
- {stats.chatRooms.messageCounts.length === 0 && ( + {stats.chatRooms.length === 0 && (

no rooms

)} - {[...stats.chatRooms.messageCounts] - .sort( - (a, b) => - (stats.chatRooms.clientCounts.find((c) => c.value === b.value) - ?.score ?? 0) - - (stats.chatRooms.clientCounts.find((c) => c.value === a.value) - ?.score ?? 0), - ) - .map((mc) => { - console.log(stats.servers); - - const delta = mc.score; - - const clients = - stats.chatRooms.clientCounts.find((c) => c.value === mc.value) - ?.score ?? 0; - - const msgs = stats.servers.reduce( - (sum, s) => sum + (s.chatRooms?.[mc.value] ?? 0), - 0, - ); - - return ( -
- {mc.value} - c:{clients} - m:{msgs} - s:{delta} -
- ); - })} + {[...stats.chatRooms] + .sort((a, b) => b.clients - a.clients) + .map((room) => ( +
+ {room.id} + c:{room.clients} + m:{room.messagesPerSecond} + + s:{room.socketWritesPerSecond} + +
+ ))}
)}