From a5c23db6e1f837c6e7e5bf35601c37068fd8cc95 Mon Sep 17 00:00:00 2001 From: CornmeisterNL Date: Sat, 2 May 2026 13:36:41 +0200 Subject: [PATCH 1/2] Refactor collector name resolution and stats handling --- src/server.ts | 111 ++++---------------------------------------------- 1 file changed, 8 insertions(+), 103 deletions(-) diff --git a/src/server.ts b/src/server.ts index 5af7408..ff41515 100644 --- a/src/server.ts +++ b/src/server.ts @@ -2,7 +2,6 @@ import Aedes from 'aedes'; import { createServer } from 'http'; import { WebSocketServer } from 'ws'; import { Duplex } from 'stream'; -import os from 'os'; import { verifyAuthToken } from '@michaelhart/meshcore-decoder'; import { getAirportInfo } from 'airport-utils'; import { RateLimiter } from './rate-limiter'; @@ -53,27 +52,10 @@ const subscriberMaxConnections = new Map(); // Track active connections per subscriber username const subscriberActiveConnections = new Map>(); // Collector stats config -function resolveCollectorName(): string { - const envCollectorName = (process.env.COLLECTOR_NAME || '').trim(); - if ( - envCollectorName && - envCollectorName !== 'set_collector_name_in_.env' && - envCollectorName !== 'collector-1' - ) { - return envCollectorName; - } - - if (EXPECTED_AUDIENCE && EXPECTED_AUDIENCE.trim()) { - return EXPECTED_AUDIENCE.trim(); - } - - return os.hostname(); -} - -const COLLECTOR_NAME = resolveCollectorName(); +const COLLECTOR_NAME = process.env.COLLECTOR_NAME || 'collector'; const STATS_TOPIC_PREFIX = process.env.STATS_TOPIC_PREFIX || 'stats'; const STATS_INTERVAL_MS = parseInt(process.env.STATS_INTERVAL_MS || '30000', 10); -const STATS_INCLUDE_TOKEN_CLIENT_DETAILS = (process.env.STATS_INCLUDE_TOKEN_CLIENT_DETAILS || 'false').toLowerCase() === 'true'; +const STATS_RETAIN = (process.env.STATS_RETAIN || 'true').toLowerCase() === 'true'; // Track detailed connected subscriber clients for stats type ConnectedSubscriber = { @@ -87,25 +69,6 @@ type ConnectedSubscriber = { const connectedSubscribers = new Map(); -type ConnectedTokenClient = { - clientId: string; - publicKey: string; - connectedAt: number; - ip?: string; - audience?: string; -}; - -const connectedTokenClients = new Map(); - -const statsCounters = { - messagesReceivedTotal: 0, - messagesReceivedBytesTotal: 0, - meshcoreMessagesReceivedTotal: 0, - messagesReceivedSinceLastStats: 0, - messagesReceivedBytesSinceLastStats: 0, - lastStatsPublishedAt: Date.now(), -}; - function getSubscriberRoleName(role: SubscriberRole): string { if (role === SubscriberRole.ADMIN) return 'admin'; if (role === SubscriberRole.FULL_ACCESS) return 'full_access'; @@ -134,48 +97,13 @@ function buildCollectorStatsPayload() { connected_seconds: Math.floor((now - subscriber.connectedAt) / 1000), })); - const tokenClients = STATS_INCLUDE_TOKEN_CLIENT_DETAILS - ? Array.from(connectedTokenClients.values()).map((tokenClient) => ({ - client_id: tokenClient.clientId, - public_key_short: tokenClient.publicKey.substring(0, 8), - ip: tokenClient.ip, - audience: tokenClient.audience, - connected_at: new Date(tokenClient.connectedAt).toISOString(), - connected_seconds: Math.floor((now - tokenClient.connectedAt) / 1000), - })) - : undefined; - - const secondsSinceLastStats = Math.max( - 1, - Math.floor((now - statsCounters.lastStatsPublishedAt) / 1000) - ); - - const payload = { + return { collector: COLLECTOR_NAME, timestamp: new Date(now).toISOString(), - connected_subscriber_count: subscribers.length, connected_subscribers: subscribers, - - connected_token_client_count: connectedTokenClients.size, - ...(STATS_INCLUDE_TOKEN_CLIENT_DETAILS ? { connected_token_clients: tokenClients } : {}), - - messages_received_total: statsCounters.messagesReceivedTotal, - messages_received_bytes_total: statsCounters.messagesReceivedBytesTotal, - meshcore_messages_received_total: statsCounters.meshcoreMessagesReceivedTotal, - - messages_received_since_last_stats: statsCounters.messagesReceivedSinceLastStats, - messages_received_bytes_since_last_stats: statsCounters.messagesReceivedBytesSinceLastStats, - messages_received_per_second: Math.round( - (statsCounters.messagesReceivedSinceLastStats / secondsSinceLastStats) * 100 - ) / 100, + excluded: 'token authenticated publisher clients are not included', }; - - statsCounters.messagesReceivedSinceLastStats = 0; - statsCounters.messagesReceivedBytesSinceLastStats = 0; - statsCounters.lastStatsPublishedAt = now; - - return payload; } function publishCollectorStats(): void { @@ -188,7 +116,7 @@ function publishCollectorStats(): void { topic, payload, qos: 0, - retain: true, + retain: STATS_RETAIN, dup: false, }, (err) => { @@ -385,14 +313,6 @@ aedes.authenticate = async (client, username, password, callback) => { (client as any).publicKey = publicKey; (client as any).tokenPayload = tokenPayload; (client as any).clientType = ClientType.PUBLISHER; - - connectedTokenClients.set(client.id, { - clientId: client.id, - publicKey, - connectedAt: Date.now(), - ip: (client as any).conn?.clientIP, - audience: tokenPayload.aud, - }); // Mark stream as authenticated const stream = (client as any).conn; @@ -954,30 +874,15 @@ aedes.on('clientDisconnect', (client) => { } connectedSubscribers.delete(client.id); } - - if (clientType === ClientType.PUBLISHER) { - connectedTokenClients.delete(client.id); - } } }); aedes.on('publish', (packet, client) => { - const payloadBytes = packet.payload ? packet.payload.length : 0; - if (client) { - statsCounters.messagesReceivedTotal++; - statsCounters.messagesReceivedBytesTotal += payloadBytes; - statsCounters.messagesReceivedSinceLastStats++; - statsCounters.messagesReceivedBytesSinceLastStats += payloadBytes; - - if (packet.topic.startsWith('meshcore/')) { - statsCounters.meshcoreMessagesReceivedTotal++; - } - const logPrefix = getClientLogPrefix(client); - console.log(`${logPrefix} [PUBLISH] ${packet.topic} (${payloadBytes} bytes)`); + console.log(`${logPrefix} [PUBLISH] ${packet.topic} (${packet.payload.length} bytes)`); } else { - console.log(`[PUBLISH] Internal -> ${packet.topic} (${payloadBytes} bytes)`); + console.log(`[PUBLISH] Internal -> ${packet.topic} (${packet.payload.length} bytes)`); } }); @@ -1144,7 +1049,7 @@ setInterval(publishCollectorStats, STATS_INTERVAL_MS); setTimeout(publishCollectorStats, 1000); -console.log(`[STATS] Publishing collector stats every ${STATS_INTERVAL_MS}ms to ${getStatsTopic()}`); +console.log(`[STATS] Publishing collector stats every ${STATS_INTERVAL_MS}ms to ${getStatsTopic()} (retain: ${STATS_RETAIN})`); httpServer.listen(WS_PORT, HOST, () => { console.log('╔════════════════════════════════════════════════════════════╗'); console.log('║ MeshCore MQTT Broker (WebSocket) ║'); From cf98f156d35bcba3c4903acf5a58bfb870b7ab35 Mon Sep 17 00:00:00 2001 From: CornmeisterNL Date: Sat, 2 May 2026 13:37:10 +0200 Subject: [PATCH 2/2] Add STATS_RETAIN variable to .env.example --- .env.example | 1 + 1 file changed, 1 insertion(+) diff --git a/.env.example b/.env.example index 66e0567..b6a4490 100644 --- a/.env.example +++ b/.env.example @@ -62,3 +62,4 @@ COLLECTOR_NAME= STATS_TOPIC_PREFIX=stats STATS_INTERVAL_MS=30000 STATS_INCLUDE_TOKEN_CLIENT_DETAILS=false +STATS_RETAIN=true