diff --git a/.env.example b/.env.example index 70022b3..66e0567 100644 --- a/.env.example +++ b/.env.example @@ -61,3 +61,4 @@ TUNNEL_TOKEN= COLLECTOR_NAME= STATS_TOPIC_PREFIX=stats STATS_INTERVAL_MS=30000 +STATS_INCLUDE_TOKEN_CLIENT_DETAILS=false diff --git a/src/server.ts b/src/server.ts index c865451..5af7408 100644 --- a/src/server.ts +++ b/src/server.ts @@ -2,6 +2,7 @@ import Aedes from 'aedes'; import { createServer } from 'http'; import { WebSocketServer } from 'ws'; import { Duplex } from 'stream'; +import os from 'os'; import { verifyAuthToken } from '@michaelhart/meshcore-decoder'; import { getAirportInfo } from 'airport-utils'; import { RateLimiter } from './rate-limiter'; @@ -52,9 +53,27 @@ const subscriberMaxConnections = new Map(); // Track active connections per subscriber username const subscriberActiveConnections = new Map>(); // Collector stats config -const COLLECTOR_NAME = process.env.COLLECTOR_NAME || 'collector'; +function resolveCollectorName(): string { + const envCollectorName = (process.env.COLLECTOR_NAME || '').trim(); + if ( + envCollectorName && + envCollectorName !== 'set_collector_name_in_.env' && + envCollectorName !== 'collector-1' + ) { + return envCollectorName; + } + + if (EXPECTED_AUDIENCE && EXPECTED_AUDIENCE.trim()) { + return EXPECTED_AUDIENCE.trim(); + } + + return os.hostname(); +} + +const COLLECTOR_NAME = resolveCollectorName(); const STATS_TOPIC_PREFIX = process.env.STATS_TOPIC_PREFIX || 'stats'; const STATS_INTERVAL_MS = parseInt(process.env.STATS_INTERVAL_MS || '30000', 10); +const STATS_INCLUDE_TOKEN_CLIENT_DETAILS = (process.env.STATS_INCLUDE_TOKEN_CLIENT_DETAILS || 'false').toLowerCase() === 'true'; // Track detailed connected subscriber clients for stats type ConnectedSubscriber = { @@ -68,6 +87,25 @@ type ConnectedSubscriber = { const connectedSubscribers = new Map(); +type ConnectedTokenClient = { + clientId: string; + publicKey: string; + connectedAt: number; + ip?: string; + audience?: string; +}; + +const connectedTokenClients = new Map(); + +const statsCounters = { + messagesReceivedTotal: 0, + messagesReceivedBytesTotal: 0, + meshcoreMessagesReceivedTotal: 0, + messagesReceivedSinceLastStats: 0, + messagesReceivedBytesSinceLastStats: 0, + lastStatsPublishedAt: Date.now(), +}; + function getSubscriberRoleName(role: SubscriberRole): string { if (role === SubscriberRole.ADMIN) return 'admin'; if (role === SubscriberRole.FULL_ACCESS) return 'full_access'; @@ -96,13 +134,48 @@ function buildCollectorStatsPayload() { connected_seconds: Math.floor((now - subscriber.connectedAt) / 1000), })); - return { + const tokenClients = STATS_INCLUDE_TOKEN_CLIENT_DETAILS + ? Array.from(connectedTokenClients.values()).map((tokenClient) => ({ + client_id: tokenClient.clientId, + public_key_short: tokenClient.publicKey.substring(0, 8), + ip: tokenClient.ip, + audience: tokenClient.audience, + connected_at: new Date(tokenClient.connectedAt).toISOString(), + connected_seconds: Math.floor((now - tokenClient.connectedAt) / 1000), + })) + : undefined; + + const secondsSinceLastStats = Math.max( + 1, + Math.floor((now - statsCounters.lastStatsPublishedAt) / 1000) + ); + + const payload = { collector: COLLECTOR_NAME, timestamp: new Date(now).toISOString(), + connected_subscriber_count: subscribers.length, connected_subscribers: subscribers, - excluded: 'token authenticated publisher clients are not included', + + connected_token_client_count: connectedTokenClients.size, + ...(STATS_INCLUDE_TOKEN_CLIENT_DETAILS ? { connected_token_clients: tokenClients } : {}), + + messages_received_total: statsCounters.messagesReceivedTotal, + messages_received_bytes_total: statsCounters.messagesReceivedBytesTotal, + meshcore_messages_received_total: statsCounters.meshcoreMessagesReceivedTotal, + + messages_received_since_last_stats: statsCounters.messagesReceivedSinceLastStats, + messages_received_bytes_since_last_stats: statsCounters.messagesReceivedBytesSinceLastStats, + messages_received_per_second: Math.round( + (statsCounters.messagesReceivedSinceLastStats / secondsSinceLastStats) * 100 + ) / 100, }; + + statsCounters.messagesReceivedSinceLastStats = 0; + statsCounters.messagesReceivedBytesSinceLastStats = 0; + statsCounters.lastStatsPublishedAt = now; + + return payload; } function publishCollectorStats(): void { @@ -312,6 +385,14 @@ aedes.authenticate = async (client, username, password, callback) => { (client as any).publicKey = publicKey; (client as any).tokenPayload = tokenPayload; (client as any).clientType = ClientType.PUBLISHER; + + connectedTokenClients.set(client.id, { + clientId: client.id, + publicKey, + connectedAt: Date.now(), + ip: (client as any).conn?.clientIP, + audience: tokenPayload.aud, + }); // Mark stream as authenticated const stream = (client as any).conn; @@ -873,15 +954,30 @@ aedes.on('clientDisconnect', (client) => { } connectedSubscribers.delete(client.id); } + + if (clientType === ClientType.PUBLISHER) { + connectedTokenClients.delete(client.id); + } } }); aedes.on('publish', (packet, client) => { + const payloadBytes = packet.payload ? packet.payload.length : 0; + if (client) { + statsCounters.messagesReceivedTotal++; + statsCounters.messagesReceivedBytesTotal += payloadBytes; + statsCounters.messagesReceivedSinceLastStats++; + statsCounters.messagesReceivedBytesSinceLastStats += payloadBytes; + + if (packet.topic.startsWith('meshcore/')) { + statsCounters.meshcoreMessagesReceivedTotal++; + } + const logPrefix = getClientLogPrefix(client); - console.log(`${logPrefix} [PUBLISH] ${packet.topic} (${packet.payload.length} bytes)`); + console.log(`${logPrefix} [PUBLISH] ${packet.topic} (${payloadBytes} bytes)`); } else { - console.log(`[PUBLISH] Internal -> ${packet.topic} (${packet.payload.length} bytes)`); + console.log(`[PUBLISH] Internal -> ${packet.topic} (${payloadBytes} bytes)`); } });