Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,4 @@ TUNNEL_TOKEN=
COLLECTOR_NAME=
STATS_TOPIC_PREFIX=stats
STATS_INTERVAL_MS=30000
STATS_INCLUDE_TOKEN_CLIENT_DETAILS=false
106 changes: 101 additions & 5 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import Aedes from 'aedes';
import { createServer } from 'http';
import { WebSocketServer } from 'ws';
import { Duplex } from 'stream';
import os from 'os';
import { verifyAuthToken } from '@michaelhart/meshcore-decoder';
import { getAirportInfo } from 'airport-utils';
import { RateLimiter } from './rate-limiter';
Expand Down Expand Up @@ -52,9 +53,27 @@ const subscriberMaxConnections = new Map<string, number>();
// Track active connections per subscriber username
const subscriberActiveConnections = new Map<string, Set<string>>();
// Collector stats config
const COLLECTOR_NAME = process.env.COLLECTOR_NAME || 'collector';
function resolveCollectorName(): string {
const envCollectorName = (process.env.COLLECTOR_NAME || '').trim();
if (
envCollectorName &&
envCollectorName !== 'set_collector_name_in_.env' &&
envCollectorName !== 'collector-1'
) {
return envCollectorName;
}

if (EXPECTED_AUDIENCE && EXPECTED_AUDIENCE.trim()) {
return EXPECTED_AUDIENCE.trim();
}

return os.hostname();
}

const COLLECTOR_NAME = resolveCollectorName();
const STATS_TOPIC_PREFIX = process.env.STATS_TOPIC_PREFIX || 'stats';
const STATS_INTERVAL_MS = parseInt(process.env.STATS_INTERVAL_MS || '30000', 10);
const STATS_INCLUDE_TOKEN_CLIENT_DETAILS = (process.env.STATS_INCLUDE_TOKEN_CLIENT_DETAILS || 'false').toLowerCase() === 'true';

// Track detailed connected subscriber clients for stats
type ConnectedSubscriber = {
Expand All @@ -68,6 +87,25 @@ type ConnectedSubscriber = {

const connectedSubscribers = new Map<string, ConnectedSubscriber>();

type ConnectedTokenClient = {
clientId: string;
publicKey: string;
connectedAt: number;
ip?: string;
audience?: string;
};

const connectedTokenClients = new Map<string, ConnectedTokenClient>();

const statsCounters = {
messagesReceivedTotal: 0,
messagesReceivedBytesTotal: 0,
meshcoreMessagesReceivedTotal: 0,
messagesReceivedSinceLastStats: 0,
messagesReceivedBytesSinceLastStats: 0,
lastStatsPublishedAt: Date.now(),
};

function getSubscriberRoleName(role: SubscriberRole): string {
if (role === SubscriberRole.ADMIN) return 'admin';
if (role === SubscriberRole.FULL_ACCESS) return 'full_access';
Expand Down Expand Up @@ -96,13 +134,48 @@ function buildCollectorStatsPayload() {
connected_seconds: Math.floor((now - subscriber.connectedAt) / 1000),
}));

return {
const tokenClients = STATS_INCLUDE_TOKEN_CLIENT_DETAILS
? Array.from(connectedTokenClients.values()).map((tokenClient) => ({
client_id: tokenClient.clientId,
public_key_short: tokenClient.publicKey.substring(0, 8),
ip: tokenClient.ip,
audience: tokenClient.audience,
connected_at: new Date(tokenClient.connectedAt).toISOString(),
connected_seconds: Math.floor((now - tokenClient.connectedAt) / 1000),
}))
: undefined;

const secondsSinceLastStats = Math.max(
1,
Math.floor((now - statsCounters.lastStatsPublishedAt) / 1000)
);

const payload = {
collector: COLLECTOR_NAME,
timestamp: new Date(now).toISOString(),

connected_subscriber_count: subscribers.length,
connected_subscribers: subscribers,
excluded: 'token authenticated publisher clients are not included',

connected_token_client_count: connectedTokenClients.size,
...(STATS_INCLUDE_TOKEN_CLIENT_DETAILS ? { connected_token_clients: tokenClients } : {}),

messages_received_total: statsCounters.messagesReceivedTotal,
messages_received_bytes_total: statsCounters.messagesReceivedBytesTotal,
meshcore_messages_received_total: statsCounters.meshcoreMessagesReceivedTotal,

messages_received_since_last_stats: statsCounters.messagesReceivedSinceLastStats,
messages_received_bytes_since_last_stats: statsCounters.messagesReceivedBytesSinceLastStats,
messages_received_per_second: Math.round(
(statsCounters.messagesReceivedSinceLastStats / secondsSinceLastStats) * 100
) / 100,
};

statsCounters.messagesReceivedSinceLastStats = 0;
statsCounters.messagesReceivedBytesSinceLastStats = 0;
statsCounters.lastStatsPublishedAt = now;

return payload;
}

function publishCollectorStats(): void {
Expand Down Expand Up @@ -312,6 +385,14 @@ aedes.authenticate = async (client, username, password, callback) => {
(client as any).publicKey = publicKey;
(client as any).tokenPayload = tokenPayload;
(client as any).clientType = ClientType.PUBLISHER;

connectedTokenClients.set(client.id, {
clientId: client.id,
publicKey,
connectedAt: Date.now(),
ip: (client as any).conn?.clientIP,
audience: tokenPayload.aud,
});

// Mark stream as authenticated
const stream = (client as any).conn;
Expand Down Expand Up @@ -873,15 +954,30 @@ aedes.on('clientDisconnect', (client) => {
}
connectedSubscribers.delete(client.id);
}

if (clientType === ClientType.PUBLISHER) {
connectedTokenClients.delete(client.id);
}
}
});

aedes.on('publish', (packet, client) => {
const payloadBytes = packet.payload ? packet.payload.length : 0;

if (client) {
statsCounters.messagesReceivedTotal++;
statsCounters.messagesReceivedBytesTotal += payloadBytes;
statsCounters.messagesReceivedSinceLastStats++;
statsCounters.messagesReceivedBytesSinceLastStats += payloadBytes;

if (packet.topic.startsWith('meshcore/')) {
statsCounters.meshcoreMessagesReceivedTotal++;
}

const logPrefix = getClientLogPrefix(client);
console.log(`${logPrefix} [PUBLISH] ${packet.topic} (${packet.payload.length} bytes)`);
console.log(`${logPrefix} [PUBLISH] ${packet.topic} (${payloadBytes} bytes)`);
} else {
console.log(`[PUBLISH] Internal -> ${packet.topic} (${packet.payload.length} bytes)`);
console.log(`[PUBLISH] Internal -> ${packet.topic} (${payloadBytes} bytes)`);
}
});

Expand Down
Loading