From 2b86b9380e9844260acb278ed04ad1041b06e8a0 Mon Sep 17 00:00:00 2001 From: M2ABRAMSTANK Date: Tue, 20 Jan 2026 16:04:55 -0700 Subject: [PATCH] feat: add swarm computing power tracking (RAM and CPU cores) Track total RAM (GB) and CPU cores across all connected peers in the P2P swarm. Hardware stats are broadcast via heartbeat messages and aggregated for display on the dashboard. Changes: - Add src/utils/hardware.js for local hardware detection (cached at startup) - Update heartbeat messages to include hardware stats (backwards compatible) - Include hardware in signature to prevent tampering - Add hardware validation with realistic bounds (max 2TB RAM, 256 cores) - Store per-peer hardware data with O(1) aggregation via incremental tracking - Expose swarmStats via /api/stats endpoint and SSE stream Co-Authored-By: Claude Opus 4.5 --- server.js | 6 ++++-- src/p2p/messaging.js | 35 ++++++++++++++++++++++++++++------ src/p2p/swarm.js | 17 +++++++++++++---- src/state/peers.js | 42 +++++++++++++++++++++++++++++++++++++++-- src/utils/hardware.js | 19 +++++++++++++++++++ src/web/routes/sse.js | 1 + src/web/routes/stats.js | 1 + 7 files changed, 107 insertions(+), 14 deletions(-) create mode 100644 src/utils/hardware.js diff --git a/server.js b/server.js index 40c9757..379f161 100644 --- a/server.js +++ b/server.js @@ -1,6 +1,7 @@ require('dotenv').config(); const { generateIdentity } = require("./src/core/identity"); +const { getHardwareStats } = require("./src/utils/hardware"); const { PeerManager } = require("./src/state/peers"); const { DiagnosticsManager } = require("./src/state/diagnostics"); const { MessageHandler } = require("./src/p2p/messaging"); @@ -16,7 +17,7 @@ const main = async () => { const diagnostics = new DiagnosticsManager(); const sseManager = new SSEManager(); - peerManager.addOrUpdatePeer(identity.id, peerManager.getSeq()); + peerManager.addOrUpdatePeer(identity.id, peerManager.getSeq(), null, getHardwareStats()); const broadcastUpdate = () => { sseManager.broadcastUpdate({ @@ -27,7 +28,8 @@ const main = async () => { diagnostics: diagnostics.getStats(), chatEnabled: ENABLE_CHAT, mapEnabled: ENABLE_MAP, - peers: peerManager.getPeersWithIps() + peers: peerManager.getPeersWithIps(), + swarmStats: peerManager.getSwarmStats(), }); }; diff --git a/src/p2p/messaging.js b/src/p2p/messaging.js index 4e316fe..f2b6df0 100644 --- a/src/p2p/messaging.js +++ b/src/p2p/messaging.js @@ -68,7 +68,13 @@ class MessageHandler { const key = createPublicKey(id); - if (!verifySignature(`seq:${seq}`, sig, key)) { + // Build signature payload - include hardware if present + const hardware = msg.hardware || null; + const sigPayload = hardware + ? `seq:${seq}:hw:${hardware.ram}:${hardware.cores}` + : `seq:${seq}`; + + if (!verifySignature(sigPayload, sig, key)) { this.diagnostics.increment("invalidSig"); return; } @@ -87,7 +93,7 @@ class MessageHandler { }; const ip = hops === 0 ? getIp(sourceSocket) : null; - const wasNew = this.peerManager.addOrUpdatePeer(id, seq, ip); + const wasNew = this.peerManager.addOrUpdatePeer(id, seq, ip, hardware); if (wasNew) { this.diagnostics.increment("newPeersAdded"); @@ -223,16 +229,33 @@ const validateMessage = (msg) => { if (msgSize > require("../config/constants").MAX_MESSAGE_SIZE) return false; if (msg.type === "HEARTBEAT") { - const allowedFields = ["type", "id", "seq", "hops", "nonce", "sig"]; + const allowedFields = ["type", "id", "seq", "hops", "nonce", "sig", "hardware"]; const fields = Object.keys(msg); - return ( + const baseValid = fields.every((f) => allowedFields.includes(f)) && msg.id && typeof msg.seq === "number" && typeof msg.hops === "number" && msg.nonce && - msg.sig - ); + msg.sig; + + if (!baseValid) return false; + + // Optional hardware validation with realistic sanity bounds + // Max 2TB RAM (high-end servers), max 256 cores (enterprise systems) + if (msg.hardware) { + return ( + typeof msg.hardware === "object" && + typeof msg.hardware.ram === "number" && + typeof msg.hardware.cores === "number" && + msg.hardware.ram >= 0 && + msg.hardware.ram <= 2048 && + msg.hardware.cores >= 1 && + msg.hardware.cores <= 256 + ); + } + + return true; } if (msg.type === "LEAVE") { diff --git a/src/p2p/swarm.js b/src/p2p/swarm.js index 9fbe592..2182dde 100644 --- a/src/p2p/swarm.js +++ b/src/p2p/swarm.js @@ -1,5 +1,6 @@ const Hyperswarm = require("hyperswarm"); const { signMessage } = require("../core/security"); +const { getHardwareStats } = require("../utils/hardware"); const { TOPIC, TOPIC_NAME, @@ -51,17 +52,20 @@ class SwarmManager { socket.connectedAt = Date.now(); + const hardware = getHardwareStats(); + const seq = this.peerManager.getSeq(); const sig = signMessage( - `seq:${this.peerManager.getSeq()}`, + `seq:${seq}:hw:${hardware.ram}:${hardware.cores}`, this.identity.privateKey ); const hello = JSON.stringify({ type: "HEARTBEAT", id: this.identity.id, - seq: this.peerManager.getSeq(), + seq, hops: 0, nonce: this.identity.nonce, sig, + hardware, }); socket.write(hello); this.broadcastFn(); @@ -96,13 +100,17 @@ class SwarmManager { } startHeartbeat() { + const hardware = getHardwareStats(); this.heartbeatInterval = setInterval(() => { const seq = this.peerManager.incrementSeq(); - this.peerManager.addOrUpdatePeer(this.identity.id, seq, null); + this.peerManager.addOrUpdatePeer(this.identity.id, seq, null, hardware); this.messageHandler.bloomFilter.markRelayed(this.identity.id, seq); - const sig = signMessage(`seq:${seq}`, this.identity.privateKey); + const sig = signMessage( + `seq:${seq}:hw:${hardware.ram}:${hardware.cores}`, + this.identity.privateKey + ); const heartbeat = JSON.stringify({ type: "HEARTBEAT", @@ -111,6 +119,7 @@ class SwarmManager { hops: 0, nonce: this.identity.nonce, sig, + hardware, }) + "\n"; for (const socket of this.swarm.connections) { diff --git a/src/state/peers.js b/src/state/peers.js index 155de15..ef340eb 100644 --- a/src/state/peers.js +++ b/src/state/peers.js @@ -7,11 +7,15 @@ class PeerManager { this.seenPeers = new LRUCache(MAX_PEERS); this.uniquePeersHLL = new HyperLogLog(10); this.mySeq = 0; + // Incremental swarm stats tracking + this._swarmTotalRam = 0; + this._swarmTotalCores = 0; + this._peersWithHardware = 0; } - addOrUpdatePeer(id, seq, ip = null) { + addOrUpdatePeer(id, seq, ip = null, hardware = null) { const stored = this.seenPeers.get(id); - + // If we have a stored peer, only update if the new sequence is higher if (stored && seq <= stored.seq) { return false; @@ -22,10 +26,24 @@ class PeerManager { // Track in HyperLogLog for total unique estimation this.uniquePeersHLL.add(id); + // Update incremental swarm stats + if (stored && stored.hardware) { + this._swarmTotalRam -= stored.hardware.ram; + this._swarmTotalCores -= stored.hardware.cores; + this._peersWithHardware--; + } + if (hardware) { + this._swarmTotalRam += hardware.ram; + this._swarmTotalCores += hardware.cores; + this._peersWithHardware++; + } + this.seenPeers.set(id, { seq, lastSeen: Date.now(), ip: ip || (stored ? stored.ip : null), + // Only store hardware if explicitly provided (fixes stale data issue) + hardware: hardware || null, }); return wasNew; @@ -41,6 +59,12 @@ class PeerManager { } removePeer(id) { + const stored = this.seenPeers.get(id); + if (stored && stored.hardware) { + this._swarmTotalRam -= stored.hardware.ram; + this._swarmTotalCores -= stored.hardware.cores; + this._peersWithHardware--; + } return this.seenPeers.delete(id); } @@ -54,6 +78,11 @@ class PeerManager { for (const [id, data] of this.seenPeers.entries()) { if (now - data.lastSeen > PEER_TIMEOUT) { + if (data.hardware) { + this._swarmTotalRam -= data.hardware.ram; + this._swarmTotalCores -= data.hardware.cores; + this._peersWithHardware--; + } this.seenPeers.delete(id); removed++; } else { @@ -92,6 +121,15 @@ class PeerManager { } return peers; } + + getSwarmStats() { + // O(1) - uses incrementally maintained totals + return { + totalRam: Math.round(this._swarmTotalRam * 10) / 10, + totalCores: this._swarmTotalCores, + peersWithHardware: this._peersWithHardware, + }; + } } module.exports = { PeerManager }; diff --git a/src/utils/hardware.js b/src/utils/hardware.js new file mode 100644 index 0000000..6b35860 --- /dev/null +++ b/src/utils/hardware.js @@ -0,0 +1,19 @@ +const os = require("os"); + +/** + * Cached hardware statistics (computed once at module load) + * RAM and CPU cores don't change at runtime, so no need to recompute + * @type {{ ram: number, cores: number }} + */ +const cachedStats = Object.freeze({ + ram: Math.round((os.totalmem() / (1024 ** 3)) * 10) / 10, + cores: os.cpus().length +}); + +/** + * Get local hardware statistics + * @returns {{ ram: number, cores: number }} RAM in GB (1 decimal), logical CPU core count + */ +const getHardwareStats = () => cachedStats; + +module.exports = { getHardwareStats }; diff --git a/src/web/routes/sse.js b/src/web/routes/sse.js index 49f8eea..5d59259 100644 --- a/src/web/routes/sse.js +++ b/src/web/routes/sse.js @@ -21,6 +21,7 @@ const setupSSERoutes = (router, dependencies) => { chatEnabled: ENABLE_CHAT, mapEnabled: ENABLE_MAP, peers: peerManager.getPeersWithIps(), + swarmStats: peerManager.getSwarmStats(), }); res.write(`data: ${data}\n\n`); diff --git a/src/web/routes/stats.js b/src/web/routes/stats.js index b5889a2..de10c17 100644 --- a/src/web/routes/stats.js +++ b/src/web/routes/stats.js @@ -13,6 +13,7 @@ const setupStatsRoutes = (router, dependencies) => { diagnostics: diagnostics.getStats(), chatEnabled: ENABLE_CHAT, peers: peerManager.getPeersWithIps(), + swarmStats: peerManager.getSwarmStats(), }); }); };