Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
} from 'n8n-workflow';
import {
PterodactylWebSocketManager,
EventThrottler,
EventBatcher,
WebSocketTokenResponse,
} from '../../shared/websocket';
import { pterodactylApiRequest } from '../../shared/transport';
Expand Down Expand Up @@ -95,48 +95,48 @@ export class PterodactylWebsocketTrigger implements INodeType {
description: 'Whether to include the raw WebSocket message in the output',
},
{
displayName: 'Enable Throttling',
name: 'throttleEnabled',
type: 'boolean',
default: true,
description:
'Whether to throttle high-frequency events to prevent overwhelming the workflow',
displayName: 'Enable Event Batching',
name: 'throttleEnabled',
type: 'boolean',
default: true,
description:
'Whether to batch high-frequency events to reduce workflow executions and improve performance',
},
{
displayName: 'Throttle Interval (ms)',
name: 'throttleInterval',
type: 'number',
default: 100,
displayOptions: {
show: {
throttleEnabled: [true],
},
displayName: 'Batch Interval (ms)',
name: 'throttleInterval',
type: 'number',
default: 100,
displayOptions: {
show: {
throttleEnabled: [true],
},
description: 'Minimum milliseconds between event emissions',
},
description: 'How often to emit batched events (in milliseconds)',
},
{
displayName: 'Throttle Max Burst',
name: 'throttleMaxBurst',
type: 'number',
default: 10,
displayOptions: {
show: {
throttleEnabled: [true],
},
displayName: 'Events Per Batch',
name: 'throttleMaxBurst',
type: 'number',
default: 10,
displayOptions: {
show: {
throttleEnabled: [true],
},
description: 'Maximum events allowed in a burst',
},
description: 'Maximum number of events to include in each batch',
},
{
displayName: 'Discard Excess Events',
name: 'discardExcess',
type: 'boolean',
default: false,
displayOptions: {
show: {
throttleEnabled: [true],
},
displayName: 'Discard Excess Events',
name: 'discardExcess',
type: 'boolean',
default: false,
displayOptions: {
show: {
throttleEnabled: [true],
},
description: 'Whether to discard excess events instead of queuing them',
},
description: 'Whether to discard events exceeding batch size instead of queuing them for the next batch',
},
{
displayName: 'Auto Reconnect',
Expand Down Expand Up @@ -254,10 +254,10 @@ export class PterodactylWebsocketTrigger implements INodeType {
fetchToken,
);

// Initialize throttler if enabled
let throttler: EventThrottler | null = null;
// Initialize batcher if enabled
let batcher: EventBatcher | null = null;
if (throttleEnabled) {
throttler = new EventThrottler({
batcher = new EventBatcher({
interval: throttleInterval,
maxBurst: throttleMaxBurst,
discardExcess,
Expand All @@ -270,43 +270,49 @@ export class PterodactylWebsocketTrigger implements INodeType {
return events.includes(eventName);
};

// Helper function to create item from event data
const createItem = (eventName: string, data: any) => {
// Parse stats JSON string to object for better usability
let processedData = data;
if (
eventName === 'stats' &&
Array.isArray(data) &&
data.length > 0 &&
typeof data[0] === 'string'
) {
try {
processedData = [JSON.parse(data[0])];
} catch (error) {
console.warn('[PterodactylWebsocketTrigger] Failed to parse stats JSON:', error);
// Keep original data if parsing fails
}
}

return {
json: {
event: eventName,
timestamp: new Date().toISOString(),
serverId,
data: processedData,
...(includeRawData && { raw: { event: eventName, args: data } }),
},
};
};

// Set up event handlers
const handleEvent = (eventName: string) => {
return (data: any) => {
if (!shouldEmit(eventName)) return;

// Parse stats JSON string to object for better usability
let processedData = data;
if (
eventName === 'stats' &&
Array.isArray(data) &&
data.length > 0 &&
typeof data[0] === 'string'
) {
try {
processedData = [JSON.parse(data[0])];
} catch (error) {
console.warn('[PterodactylWebsocketTrigger] Failed to parse stats JSON:', error);
// Keep original data if parsing fails
}
}

const item = {
json: {
event: eventName,
timestamp: new Date().toISOString(),
serverId,
data: processedData,
...(includeRawData && { raw: { event: eventName, args: data } }),
},
};

// Emit with or without throttling
if (throttler) {
throttler.add({ event: eventName, args: data }, () =>
this.emit([this.helpers.returnJsonArray([item])]),
);
// Emit with or without batching
if (batcher) {
batcher.add({ event: eventName, args: data }, (events) => {
// Create items for all events in the batch
const items = events.map((evt) => createItem(evt.event, evt.args));
this.emit([this.helpers.returnJsonArray(items)]);
});
} else {
const item = createItem(eventName, data);
this.emit([this.helpers.returnJsonArray([item])]);
}
};
Expand Down Expand Up @@ -415,8 +421,8 @@ export class PterodactylWebsocketTrigger implements INodeType {
// Return close function for cleanup
const closeFunction = async () => {
console.log('[PterodactylWebsocketTrigger] Closing connection');
if (throttler) {
throttler.clear();
if (batcher) {
batcher.clear();
}
wsManager.close();
};
Expand Down
110 changes: 110 additions & 0 deletions shared/websocket/EventBatcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { BatchOptions, WebSocketEvent } from './WebSocketTypes';

/**
* Event batcher to batch high-frequency events and reduce workflow executions
*/
export class EventBatcher {
private buffer: WebSocketEvent[] = [];
private lastEmit: number = 0;
private interval: number;
private maxBurst: number;
private discardExcess: boolean;
private emitTimer: NodeJS.Timeout | null = null;
private emitFn: ((events: WebSocketEvent[]) => void) | null = null;

constructor(options: BatchOptions) {
this.interval = options.interval;
this.maxBurst = options.maxBurst;
this.discardExcess = options.discardExcess ?? false;
}

/**
* Add an event to the batcher
* @param event - WebSocket event to batch
* @param emitFn - Function to call when events should be emitted (receives array of events)
*/
add(event: WebSocketEvent, emitFn: (events: WebSocketEvent[]) => void): void {
// Store emit function (should be consistent across calls)
if (!this.emitFn) {
this.emitFn = emitFn;
} else if (this.emitFn !== emitFn) {
console.warn('[EventBatcher] Different emit function provided - using first registered function');
}

// Add event to buffer
if (this.discardExcess && this.buffer.length >= this.maxBurst) {
// Buffer is full - discard excess events
console.log(`[EventBatcher] Discarding event: ${event.event} (buffer full at ${this.buffer.length}/${this.maxBurst})`);
return;
}

this.buffer.push(event);

// Schedule emission if not already scheduled
this.scheduleNextEmit();
}

/**
* Emit accumulated events as a batch
*/
private emitBatch(): void {
if (this.buffer.length === 0 || !this.emitFn) {
return;
}

// Take up to maxBurst events from buffer
const eventsToEmit = this.buffer.splice(0, this.maxBurst);

this.lastEmit = Date.now();
this.emitFn(eventsToEmit);

// Clear the timer since we just emitted
if (this.emitTimer) {
clearTimeout(this.emitTimer);
this.emitTimer = null;
}

// If there are still events in buffer, schedule next emission
if (this.buffer.length > 0) {
this.scheduleNextEmit();
}
}

/**
* Schedule the next batch emission
*/
private scheduleNextEmit(): void {
// Don't schedule if timer already exists
if (this.emitTimer) {
return;
}

const now = Date.now();
const timeSinceLastEmit = now - this.lastEmit;
const delay = Math.max(0, this.interval - timeSinceLastEmit);

this.emitTimer = setTimeout(() => {
this.emitTimer = null;
this.emitBatch();
}, delay);
}

/**
* Clear all buffered events and timers
*/
clear(): void {
this.buffer = [];
if (this.emitTimer) {
clearTimeout(this.emitTimer);
this.emitTimer = null;
}
this.emitFn = null;
}

/**
* Get current buffer length
*/
getQueueLength(): number {
return this.buffer.length;
}
}
Loading
Loading