From 31434ae6654a69984b989cf3e0b599b28ca4d52d Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Mon, 29 Jun 2026 17:24:28 +0000 Subject: [PATCH] feat: make chain indexer reorg-safe and idempotent - Added `ProcessedEvent` and `IndexerState` models for reorg safety and idempotency. - Enhanced `applyContractEvent` with idempotency checks and event logging. - Implemented `IndexerWorker` with configurable finality window and reorg detection logic. - Updated configuration and environment examples. - Added comprehensive unit tests for `IndexerWorker`. Co-authored-by: clintjeff2 <119521983+clintjeff2@users.noreply.github.com> --- .env.example | 10 ++ apps/access-api/prisma/schema.prisma | 25 +++ apps/access-api/src/config.ts | 14 ++ apps/access-api/src/index.ts | 19 +++ .../src/services/contractEventHelpers.ts | 41 +++++ .../src/workers/indexerWorker.test.ts | 108 +++++++++++++ apps/access-api/src/workers/indexerWorker.ts | 152 ++++++++++++++++++ 7 files changed, 369 insertions(+) create mode 100644 apps/access-api/src/workers/indexerWorker.test.ts create mode 100644 apps/access-api/src/workers/indexerWorker.ts diff --git a/.env.example b/.env.example index 4848e56..64c3d0d 100644 --- a/.env.example +++ b/.env.example @@ -77,3 +77,13 @@ DATABASE_URL="postgresql://postgres:postgres@localhost:5432/guildpass" # Maximum outbox events processed per worker pass (default: 50) # OUTBOX_WORKER_BATCH_SIZE=50 + +# ============================================================================ +# INDEXER WORKER (Optional - sensible defaults provided) +# ============================================================================ + +# How often the indexer worker polls for new blocks, in milliseconds (default: 5000) +# INDEXER_INTERVAL_MS=5000 + +# Number of blocks to wait for finality before processing (default: 12) +# INDEXER_FINALITY_WINDOW=12 diff --git a/apps/access-api/prisma/schema.prisma b/apps/access-api/prisma/schema.prisma index 9816432..dae0ffb 100644 --- a/apps/access-api/prisma/schema.prisma +++ b/apps/access-api/prisma/schema.prisma @@ -51,6 +51,7 @@ enum MembershipState { model Membership { id String @id @default(cuid()) memberId String @unique + tokenId Int? @unique state MembershipState expiresAt DateTime? renewedAt DateTime? @@ -58,6 +59,7 @@ model Membership { member Member @relation(fields: [memberId], references: [id]) @@index([state, expiresAt]) + @@index([tokenId]) } enum Role { @@ -183,3 +185,26 @@ model OutboxEvent { @@index([nextRetryAt]) @@index([status, nextRetryAt]) } + +// --- Chain Indexer Reorg Safety & Idempotency --- + +model ProcessedEvent { + id String @id @default(uuid()) + transactionHash String + logIndex Int + blockHash String + blockNumber Int + eventType String + processedAt DateTime @default(now()) + + @@unique([transactionHash, logIndex]) + @@index([blockHash]) + @@index([blockNumber]) +} + +model IndexerState { + id String @id @default("singleton") + lastBlockNumber Int + lastBlockHash String + updatedAt DateTime @updatedAt +} diff --git a/apps/access-api/src/config.ts b/apps/access-api/src/config.ts index d1bc817..9ec2a51 100644 --- a/apps/access-api/src/config.ts +++ b/apps/access-api/src/config.ts @@ -62,6 +62,18 @@ const ConfigSchema = z.object({ .positive() .default(50), + // Indexer worker + indexerIntervalMs: z.coerce + .number() + .int() + .positive() + .default(5_000), + indexerFinalityWindow: z.coerce + .number() + .int() + .nonnegative() + .default(12), // e.g., 12 blocks for Ethereum mainnet-like safety + // Rate limiting rateLimitEnabled: z .string() @@ -99,6 +111,8 @@ function validateConfig(): Config { reconciliationIntervalMs: process.env.RECONCILIATION_INTERVAL_MS, outboxWorkerIntervalMs: process.env.OUTBOX_WORKER_INTERVAL_MS, outboxWorkerBatchSize: process.env.OUTBOX_WORKER_BATCH_SIZE, + indexerIntervalMs: process.env.INDEXER_INTERVAL_MS, + indexerFinalityWindow: process.env.INDEXER_FINALITY_WINDOW, rateLimitEnabled: process.env.RATE_LIMIT_ENABLED, rateLimitWindowMs: process.env.RATE_LIMIT_WINDOW_MS, rateLimitDefaultMax: process.env.RATE_LIMIT_DEFAULT_MAX, diff --git a/apps/access-api/src/index.ts b/apps/access-api/src/index.ts index 8610eb6..4a0c3eb 100644 --- a/apps/access-api/src/index.ts +++ b/apps/access-api/src/index.ts @@ -10,6 +10,7 @@ import { config } from './config'; import { disconnectPrisma } from './services/prisma'; import { createReconciliationWorker } from './workers/reconciliationWorker'; import { createOutboxWorker } from './workers/outboxWorker'; +import { createIndexerWorker, ChainProvider } from './workers/indexerWorker'; async function main() { const app = await buildApp(); @@ -25,6 +26,23 @@ async function main() { ); outboxWorker.start(); + // Initialize IndexerWorker if a provider is available + // Note: In a real production environment, you would inject a real RPC-backed ChainProvider here. + // For now, we instantiate it only if explicitly needed or with a mock/stub if desired. + // Example stub for demonstration: + const stubProvider: ChainProvider = { + getLatestBlockNumber: async () => 0, + getBlock: async () => ({ number: 0, hash: '0x0', parentHash: '0x0' }), + getLogs: async () => [], + }; + + const indexerWorker = createIndexerWorker( + stubProvider, + config.indexerIntervalMs, + config.indexerFinalityWindow, + ); + // indexerWorker.start(); // Keep disabled by default until provider is configured + await app.listen({ port: config.port, host: '0.0.0.0' }); console.log( @@ -41,6 +59,7 @@ async function main() { try { worker.stop(); outboxWorker.stop(); + indexerWorker.stop(); await disconnectPrisma(); console.log('✅ Server and database connections closed cleanly.'); process.exit(0); diff --git a/apps/access-api/src/services/contractEventHelpers.ts b/apps/access-api/src/services/contractEventHelpers.ts index f5996e9..bc29a42 100644 --- a/apps/access-api/src/services/contractEventHelpers.ts +++ b/apps/access-api/src/services/contractEventHelpers.ts @@ -21,7 +21,9 @@ export interface DecodedMembershipMintedEvent { communityId: string; expiresAt: number; // unix timestamp blockNumber?: number; + blockHash?: string; transactionHash?: string; + logIndex?: number; } export interface DecodedMembershipRenewedEvent { @@ -29,7 +31,9 @@ export interface DecodedMembershipRenewedEvent { tokenId: number; newExpiresAt: number; // unix timestamp blockNumber?: number; + blockHash?: string; transactionHash?: string; + logIndex?: number; } export interface DecodedMembershipSuspendedEvent { @@ -37,7 +41,9 @@ export interface DecodedMembershipSuspendedEvent { tokenId: number; isSuspended: boolean; blockNumber?: number; + blockHash?: string; transactionHash?: string; + logIndex?: number; } export type DecodedContractEvent = @@ -82,6 +88,23 @@ export async function applyContractEvent( // Access-affecting writes must be atomic. await prisma.$transaction(async (tx) => { + // Idempotency check: If transactionHash and logIndex are provided, check if already processed. + if (event.transactionHash && event.logIndex !== undefined) { + const alreadyProcessed = await tx.processedEvent.findUnique({ + where: { + transactionHash_logIndex: { + transactionHash: event.transactionHash, + logIndex: event.logIndex, + }, + }, + }); + + if (alreadyProcessed) { + // Already processed, skip to maintain idempotency. + return; + } + } + if (event.type === 'MembershipMinted') { const wallet = event.to.toLowerCase(); const expiresAt = new Date(event.expiresAt * 1000); @@ -178,6 +201,24 @@ export async function applyContractEvent( }, }); } + + // Record the event as processed for reorg safety and idempotency. + if ( + event.transactionHash && + event.logIndex !== undefined && + event.blockHash && + event.blockNumber !== undefined + ) { + await tx.processedEvent.create({ + data: { + transactionHash: event.transactionHash, + logIndex: event.logIndex, + blockHash: event.blockHash, + blockNumber: event.blockNumber, + eventType: event.type, + }, + }); + } }); } diff --git a/apps/access-api/src/workers/indexerWorker.test.ts b/apps/access-api/src/workers/indexerWorker.test.ts new file mode 100644 index 0000000..3fe44a3 --- /dev/null +++ b/apps/access-api/src/workers/indexerWorker.test.ts @@ -0,0 +1,108 @@ +import { IndexerWorker, ChainProvider } from './indexerWorker'; +import { DecodedContractEvent } from '../services/contractEventHelpers'; + +describe('IndexerWorker', () => { + let prisma: any; + let provider: jest.Mocked; + let worker: IndexerWorker; + + beforeEach(() => { + prisma = { + indexerState: { + findUnique: jest.fn(), + upsert: jest.fn(), + update: jest.fn(), + }, + processedEvent: { + findUnique: jest.fn(), + create: jest.fn(), + deleteMany: jest.fn(), + }, + $transaction: jest.fn((cb) => cb(prisma)), + wallet: { upsert: jest.fn() }, + community: { upsert: jest.fn() }, + member: { upsert: jest.fn() }, + membership: { upsert: jest.fn(), findFirst: jest.fn(), update: jest.fn() }, + }; + + provider = { + getLatestBlockNumber: jest.fn(), + getBlock: jest.fn(), + getLogs: jest.fn(), + }; + + worker = new IndexerWorker(prisma as any, provider, 5000, 12); + }); + + test('should process blocks and update checkpoint', async () => { + provider.getLatestBlockNumber.mockResolvedValue(100); + prisma.indexerState.findUnique.mockResolvedValue({ + lastBlockNumber: 80, + lastBlockHash: 'hash80', + }); + provider.getBlock.mockImplementation(async (n) => ({ + number: n, + hash: `hash${n}`, + parentHash: `hash${n - 1}`, + })); + provider.getLogs.mockResolvedValue([]); + + await worker.runPass(); + + expect(provider.getLogs).toHaveBeenCalledWith(81, 88); // 100 - 12 = 88 + expect(prisma.indexerState.upsert).toHaveBeenCalledWith(expect.objectContaining({ + create: expect.objectContaining({ lastBlockNumber: 88 }), + })); + }); + + test('should detect reorg and rewind', async () => { + provider.getLatestBlockNumber.mockResolvedValue(100); + prisma.indexerState.findUnique.mockResolvedValue({ + lastBlockNumber: 80, + lastBlockHash: 'hash80-old', + }); + provider.getBlock.mockImplementation(async (n) => ({ + number: n, + hash: `hash${n}`, // Will return hash80 for block 80, which differs from hash80-old + parentHash: `hash${n - 1}`, + })); + + await worker.runPass(); + + expect(prisma.indexerState.update).toHaveBeenCalled(); + // Rewind 80 - 12*2 = 56 + expect(prisma.processedEvent.deleteMany).toHaveBeenCalledWith({ + where: { blockNumber: { gt: 56 } }, + }); + }); + + test('should handle duplicate logs idempotently via applyContractEvent', async () => { + provider.getLatestBlockNumber.mockResolvedValue(100); + prisma.indexerState.findUnique.mockResolvedValue({ + lastBlockNumber: 80, + lastBlockHash: 'hash80', + }); + provider.getBlock.mockResolvedValue({ number: 80, hash: 'hash80', parentHash: 'hash79' }); + + const mockLog: DecodedContractEvent = { + type: 'MembershipMinted', + to: '0x123', + tokenId: 1, + communityId: 'c1', + expiresAt: 1000, + transactionHash: 'tx1', + logIndex: 0, + blockNumber: 81, + blockHash: 'hash81', + }; + provider.getLogs.mockResolvedValue([mockLog]); + + // Simulate already processed + prisma.processedEvent.findUnique.mockResolvedValue({ id: 'existing' }); + + await worker.runPass(); + + // Should NOT call wallet upsert because it's already processed + expect(prisma.wallet.upsert).not.toHaveBeenCalled(); + }); +}); diff --git a/apps/access-api/src/workers/indexerWorker.ts b/apps/access-api/src/workers/indexerWorker.ts new file mode 100644 index 0000000..da5cfb2 --- /dev/null +++ b/apps/access-api/src/workers/indexerWorker.ts @@ -0,0 +1,152 @@ +import { PrismaClient } from '@prisma/client'; +import { applyContractEvent, DecodedContractEvent } from '../services/contractEventHelpers'; +import { getPrisma } from '../services/prisma'; + +export interface BlockInfo { + number: number; + hash: string; + parentHash: string; +} + +export interface ChainProvider { + getLatestBlockNumber(): Promise; + getBlock(blockNumber: number): Promise; + getLogs(fromBlock: number, toBlock: number): Promise; +} + +export class IndexerWorker { + private timer: NodeJS.Timeout | null = null; + private isRunning = false; + + constructor( + private readonly prisma: PrismaClient = getPrisma(), + private readonly provider: ChainProvider, + private readonly intervalMs: number = 5000, + private readonly finalityWindow: number = 12, + ) {} + + start() { + if (this.timer) return; + this.timer = setInterval(() => this.runPass(), this.intervalMs); + console.info(`IndexerWorker started (interval: ${this.intervalMs}ms, finalityWindow: ${this.finalityWindow})`); + } + + stop() { + if (this.timer) { + clearInterval(this.timer); + this.timer = null; + } + console.info('IndexerWorker stopped'); + } + + async runPass() { + if (this.isRunning) return; + this.isRunning = true; + + try { + await this.processBlocks(); + } catch (error) { + console.error('IndexerWorker error in runPass:', error); + } finally { + this.isRunning = false; + } + } + + private async processBlocks() { + const latestBlockNumber = await this.provider.getLatestBlockNumber(); + const safeBlockNumber = latestBlockNumber - this.finalityWindow; + + const state = await this.prisma.indexerState.findUnique({ + where: { id: 'singleton' }, + }); + + let currentBlock = state ? state.lastBlockNumber + 1 : safeBlockNumber; + + // If we are already beyond safe block, wait. + if (currentBlock > safeBlockNumber) { + return; + } + + // Reorg Detection + if (state) { + const lastProcessedBlock = await this.provider.getBlock(state.lastBlockNumber); + if (lastProcessedBlock.hash !== state.lastBlockHash) { + console.warn(`REORG DETECTED at block ${state.lastBlockNumber}. Expected ${state.lastBlockHash}, got ${lastProcessedBlock.hash}`); + await this.handleReorg(state.lastBlockNumber); + return; + } + } + + // Process blocks in batches + const toBlock = Math.min(currentBlock + 100, safeBlockNumber); + console.info(`Indexer scanning blocks ${currentBlock} to ${toBlock}`); + + const logs = await this.provider.getLogs(currentBlock, toBlock); + + // Sort logs by block number and log index to ensure ordered application + const sortedLogs = [...logs].sort((a, b) => { + if (a.blockNumber !== b.blockNumber) { + return (a.blockNumber || 0) - (b.blockNumber || 0); + } + return (a.logIndex || 0) - (b.logIndex || 0); + }); + + for (const log of sortedLogs) { + await applyContractEvent(this.prisma, log); + } + + // Update checkpoint + const lastBlock = await this.provider.getBlock(toBlock); + await this.prisma.indexerState.upsert({ + where: { id: 'singleton' }, + update: { + lastBlockNumber: toBlock, + lastBlockHash: lastBlock.hash, + }, + create: { + id: 'singleton', + lastBlockNumber: toBlock, + lastBlockHash: lastBlock.hash, + }, + }); + } + + private async handleReorg(lastProcessedBlockNumber: number) { + // Simple reorg handling: Rewind checkpoint by a safe amount or to last known good block. + // In a real implementation, we might want to find the common ancestor. + // For now, we rewind by the finality window to re-process and let idempotency handle it. + const rewindTo = Math.max(0, lastProcessedBlockNumber - this.finalityWindow * 2); + const block = await this.provider.getBlock(rewindTo); + + await this.prisma.$transaction(async (tx) => { + await tx.indexerState.update({ + where: { id: 'singleton' }, + data: { + lastBlockNumber: rewindTo, + lastBlockHash: block.hash, + }, + }); + + // Optional: Delete processed events after the reorg point to allow re-processing + // Note: applyContractEvent uses transactionHash_logIndex for idempotency, + // but in a reorg, these might be in different blocks or even different hashes. + // If we want to fully re-process, we should remove them from ProcessedEvent. + await tx.processedEvent.deleteMany({ + where: { + blockNumber: { gt: rewindTo }, + }, + }); + }); + + console.info(`Rewound indexer to block ${rewindTo} due to reorg`); + } +} + +export function createIndexerWorker( + provider: ChainProvider, + intervalMs?: number, + finalityWindow?: number, + prisma?: PrismaClient, +) { + return new IndexerWorker(prisma, provider, intervalMs, finalityWindow); +}