Skip to content
15 changes: 15 additions & 0 deletions contract/contracts/hello-world/src/tests/autoshare_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,21 @@ fn test_is_group_active_on_nonexistent_group_fails() {
client.is_group_active(&id);
}

#[test]
fn test_create_group_with_zero_usages_fails() {
let test_env = setup_test_env();
let client = AutoShareContractClient::new(&test_env.env, &test_env.autoshare_contract);

let creator = test_env.users.get(0).unwrap().clone();
let token = test_env.mock_tokens.get(0).unwrap().clone();
let id = BytesN::from_array(&test_env.env, &[1u8; 32]);
let name = String::from_str(&test_env.env, "Zero Usages");

crate::test_utils::mint_tokens(&test_env.env, &token, &creator, 10_000_000);
let result = client.try_create(&id, &name, &creator, &0u32, &token);
assert!(result.is_err(), "Creating group with 0 usages should fail");
}

#[test]
fn test_get_all_groups_includes_inactive() {
let test_env = setup_test_env();
Expand Down
10 changes: 0 additions & 10 deletions listener/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions listener/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ async function main() {
logger.info('Initializing database');
const db = await initializeDatabase(config.databasePath);

// Initialize deduplication service
deduplicationService = new EventDeduplicationService(db);

// Rebuild registry with configured event TTL
if (config.cleanup) {
(eventRegistry as any).ttlMs = config.cleanup.eventRetentionMs;
Expand Down
40 changes: 40 additions & 0 deletions listener/src/services/event-processing-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ export class EventProcessingQueue {
private readonly processor: EventProcessor;
private timer: ReturnType<typeof setInterval> | null = null;

// Metrics
private metrics = {
totalEnqueued: 0,
totalProcessed: 0,
totalSucceeded: 0,
totalFailed: 0,
processingTimes: [] as number[],
};

constructor(processor: EventProcessor, options?: EventProcessingQueueOptions) {
this.processor = processor;
this.maxConcurrency = Math.max(1, options?.maxConcurrency ?? DEFAULTS.maxConcurrency);
Expand Down Expand Up @@ -88,6 +97,7 @@ export class EventProcessingQueue {
nextRetryAt,
fingerprint,
});
this.metrics.totalEnqueued++;

return true;
}
Expand Down Expand Up @@ -150,13 +160,18 @@ export class EventProcessingQueue {

private async processItem(item: QueuedEvent): Promise<void> {
this.activeFingerprints.add(item.fingerprint);
const startTime = Date.now();

try {
const success = await this.processor(item.event, item.contractConfig, item.requestId);
const duration = Date.now() - startTime;

if (success) {
this.queuedFingerprints.delete(item.fingerprint);
this.activeFingerprints.delete(item.fingerprint);
this.metrics.totalProcessed++;
this.metrics.totalSucceeded++;
this.metrics.processingTimes.push(duration);
logger.info('Event processing succeeded', {
requestId: item.requestId,
eventId: item.event.id,
Expand All @@ -170,6 +185,9 @@ export class EventProcessingQueue {
if (attempt >= this.maxRetries) {
this.queuedFingerprints.delete(item.fingerprint);
this.activeFingerprints.delete(item.fingerprint);
this.metrics.totalProcessed++;
this.metrics.totalFailed++;
this.metrics.processingTimes.push(duration);
logger.error('Event processing permanently failed after max retries', {
requestId: item.requestId,
eventId: item.event.id,
Expand All @@ -195,11 +213,15 @@ export class EventProcessingQueue {
this.queue.push({ ...item, retryCount: attempt, nextRetryAt });
} catch (error) {
this.activeFingerprints.delete(item.fingerprint);
const duration = Date.now() - startTime;

const attempt = item.retryCount + 1;

if (attempt >= this.maxRetries) {
this.queuedFingerprints.delete(item.fingerprint);
this.metrics.totalProcessed++;
this.metrics.totalFailed++;
this.metrics.processingTimes.push(duration);
logger.error('Event processing crashed after max retries', {
requestId: item.requestId,
eventId: item.event.id,
Expand All @@ -226,6 +248,24 @@ export class EventProcessingQueue {
}
}

getMetrics() {
const times = this.metrics.processingTimes;
const avg = times.length > 0 ? times.reduce((a, b) => a + b, 0) / times.length : 0;
const min = times.length > 0 ? Math.min(...times) : 0;
const max = times.length > 0 ? Math.max(...times) : 0;

return {
queueSize: this.queue.length,
activeCount: this.activeFingerprints.size,
...this.metrics,
processingTime: {
min,
max,
avg,
},
};
}

private calculateDelay(retryCount: number): number {
return this.baseDelayMs * Math.pow(2, retryCount);
}
Expand Down
7 changes: 7 additions & 0 deletions listener/src/services/event-subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -364,4 +364,11 @@ export class EventSubscriber {
private delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

getQueueMetrics() {
return {
eventQueue: this.eventQueue?.getMetrics() || null,
retryQueue: this.retryQueue?.getMetrics() || null,
};
}
}
36 changes: 35 additions & 1 deletion listener/src/services/notification-retry-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ export class NotificationRetryQueue {
private readonly notificationFn: NotificationFn;
private readonly analytics: NotificationAnalyticsAggregator | null;

// Metrics
private metrics = {
totalEnqueued: 0,
totalProcessed: 0,
totalSucceeded: 0,
totalFailed: 0,
processingTimes: [] as number[],
};

constructor(notificationFn: NotificationFn, options?: RetryQueueOptions) {
this.notificationFn = notificationFn;
this.baseDelayMs = options?.baseDelayMs ?? DEFAULTS.baseDelayMs;
Expand Down Expand Up @@ -88,6 +97,7 @@ export class NotificationRetryQueue {

this.queuedFingerprints.add(fingerprint);
this.queue.push({ event, contractConfig, retryCount: 0, nextRetryAt, requestId });
this.metrics.totalEnqueued++;
}

start(): void {
Expand Down Expand Up @@ -142,9 +152,13 @@ export class NotificationRetryQueue {
});

const success = await this.notificationFn(item.event, item.contractConfig, item.requestId);
const duration = Date.now() - retryStart;

if (success) {
this.queuedFingerprints.delete(fingerprint);
this.metrics.totalProcessed++;
this.metrics.totalSucceeded++;
this.metrics.processingTimes.push(duration);
logger.info('Retry succeeded', {
requestId: item.requestId,
eventId: item.event.id,
Expand All @@ -156,11 +170,14 @@ export class NotificationRetryQueue {

if (attempt >= this.maxRetries) {
this.queuedFingerprints.delete(fingerprint);
this.metrics.totalProcessed++;
this.metrics.totalFailed++;
this.metrics.processingTimes.push(duration);
this.analytics?.record({
notificationType: NotificationType.DISCORD,
contractAddress: item.contractConfig.address,
outcome: 'failure',
durationMs: Date.now() - retryStart,
durationMs: duration,
errorReason: `exhausted ${this.maxRetries} retries`,
timestamp: Date.now(),
});
Expand Down Expand Up @@ -188,6 +205,23 @@ export class NotificationRetryQueue {
this.queue.push({ ...item, retryCount: attempt, nextRetryAt });
}

getMetrics() {
const times = this.metrics.processingTimes;
const avg = times.length > 0 ? times.reduce((a, b) => a + b, 0) / times.length : 0;
const min = times.length > 0 ? Math.min(...times) : 0;
const max = times.length > 0 ? Math.max(...times) : 0;

return {
queueSize: this.queue.length,
...this.metrics,
processingTime: {
min,
max,
avg,
},
};
}

private calculateDelay(retryCount: number): number {
const base = this.baseDelayMs * Math.pow(this.multiplier, retryCount);
return this.jitter ? base * (0.5 + Math.random() * 0.5) : base;
Expand Down
Loading