Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,13 @@ DATABASE_URL="postgresql://postgres:postgres@localhost:5432/guildpass"

# Maximum outbox events processed per worker pass (default: 50)
# OUTBOX_WORKER_BATCH_SIZE=50

# ============================================================================
# INDEXER WORKER (Optional - sensible defaults provided)
# ============================================================================

# How often the indexer worker polls for new blocks, in milliseconds (default: 5000)
# INDEXER_INTERVAL_MS=5000

# Number of blocks to wait for finality before processing (default: 12)
# INDEXER_FINALITY_WINDOW=12
25 changes: 25 additions & 0 deletions apps/access-api/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,15 @@ enum MembershipState {
model Membership {
id String @id @default(cuid())
memberId String @unique
tokenId Int? @unique
state MembershipState
expiresAt DateTime?
renewedAt DateTime?
createdAt DateTime @default(now())
member Member @relation(fields: [memberId], references: [id])

@@index([state, expiresAt])
@@index([tokenId])
}

enum Role {
Expand Down Expand Up @@ -183,3 +185,26 @@ model OutboxEvent {
@@index([nextRetryAt])
@@index([status, nextRetryAt])
}

// --- Chain Indexer Reorg Safety & Idempotency ---

model ProcessedEvent {
id String @id @default(uuid())
transactionHash String
logIndex Int
blockHash String
blockNumber Int
eventType String
processedAt DateTime @default(now())

@@unique([transactionHash, logIndex])
@@index([blockHash])
@@index([blockNumber])
}

model IndexerState {
id String @id @default("singleton")
lastBlockNumber Int
lastBlockHash String
updatedAt DateTime @updatedAt
}
14 changes: 14 additions & 0 deletions apps/access-api/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ const ConfigSchema = z.object({
.positive()
.default(50),

// Indexer worker
indexerIntervalMs: z.coerce
.number()
.int()
.positive()
.default(5_000),
indexerFinalityWindow: z.coerce
.number()
.int()
.nonnegative()
.default(12), // e.g., 12 blocks for Ethereum mainnet-like safety

// Rate limiting
rateLimitEnabled: z
.string()
Expand Down Expand Up @@ -99,6 +111,8 @@ function validateConfig(): Config {
reconciliationIntervalMs: process.env.RECONCILIATION_INTERVAL_MS,
outboxWorkerIntervalMs: process.env.OUTBOX_WORKER_INTERVAL_MS,
outboxWorkerBatchSize: process.env.OUTBOX_WORKER_BATCH_SIZE,
indexerIntervalMs: process.env.INDEXER_INTERVAL_MS,
indexerFinalityWindow: process.env.INDEXER_FINALITY_WINDOW,
rateLimitEnabled: process.env.RATE_LIMIT_ENABLED,
rateLimitWindowMs: process.env.RATE_LIMIT_WINDOW_MS,
rateLimitDefaultMax: process.env.RATE_LIMIT_DEFAULT_MAX,
Expand Down
19 changes: 19 additions & 0 deletions apps/access-api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { config } from './config';
import { disconnectPrisma } from './services/prisma';
import { createReconciliationWorker } from './workers/reconciliationWorker';
import { createOutboxWorker } from './workers/outboxWorker';
import { createIndexerWorker, ChainProvider } from './workers/indexerWorker';

async function main() {
const app = await buildApp();
Expand All @@ -25,6 +26,23 @@ async function main() {
);
outboxWorker.start();

// Initialize IndexerWorker if a provider is available
// Note: In a real production environment, you would inject a real RPC-backed ChainProvider here.
// For now, we instantiate it only if explicitly needed or with a mock/stub if desired.
// Example stub for demonstration:
const stubProvider: ChainProvider = {
getLatestBlockNumber: async () => 0,
getBlock: async () => ({ number: 0, hash: '0x0', parentHash: '0x0' }),
getLogs: async () => [],
};

const indexerWorker = createIndexerWorker(
stubProvider,
config.indexerIntervalMs,
config.indexerFinalityWindow,
);
// indexerWorker.start(); // Keep disabled by default until provider is configured

await app.listen({ port: config.port, host: '0.0.0.0' });

console.log(
Expand All @@ -41,6 +59,7 @@ async function main() {
try {
worker.stop();
outboxWorker.stop();
indexerWorker.stop();
await disconnectPrisma();
console.log('✅ Server and database connections closed cleanly.');
process.exit(0);
Expand Down
41 changes: 41 additions & 0 deletions apps/access-api/src/services/contractEventHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,29 @@ export interface DecodedMembershipMintedEvent {
communityId: string;
expiresAt: number; // unix timestamp
blockNumber?: number;
blockHash?: string;
transactionHash?: string;
logIndex?: number;
}

export interface DecodedMembershipRenewedEvent {
type: 'MembershipRenewed';
tokenId: number;
newExpiresAt: number; // unix timestamp
blockNumber?: number;
blockHash?: string;
transactionHash?: string;
logIndex?: number;
}

export interface DecodedMembershipSuspendedEvent {
type: 'MembershipSuspended';
tokenId: number;
isSuspended: boolean;
blockNumber?: number;
blockHash?: string;
transactionHash?: string;
logIndex?: number;
}

export type DecodedContractEvent =
Expand Down Expand Up @@ -82,6 +88,23 @@ export async function applyContractEvent(

// Access-affecting writes must be atomic.
await prisma.$transaction(async (tx) => {
// Idempotency check: If transactionHash and logIndex are provided, check if already processed.
if (event.transactionHash && event.logIndex !== undefined) {
const alreadyProcessed = await tx.processedEvent.findUnique({
where: {
transactionHash_logIndex: {
transactionHash: event.transactionHash,
logIndex: event.logIndex,
},
},
});

if (alreadyProcessed) {
// Already processed, skip to maintain idempotency.
return;
}
}

if (event.type === 'MembershipMinted') {
const wallet = event.to.toLowerCase();
const expiresAt = new Date(event.expiresAt * 1000);
Expand Down Expand Up @@ -178,6 +201,24 @@ export async function applyContractEvent(
},
});
}

// Record the event as processed for reorg safety and idempotency.
if (
event.transactionHash &&
event.logIndex !== undefined &&
event.blockHash &&
event.blockNumber !== undefined
) {
await tx.processedEvent.create({
data: {
transactionHash: event.transactionHash,
logIndex: event.logIndex,
blockHash: event.blockHash,
blockNumber: event.blockNumber,
eventType: event.type,
},
});
}
});
}

Expand Down
108 changes: 108 additions & 0 deletions apps/access-api/src/workers/indexerWorker.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import { IndexerWorker, ChainProvider } from './indexerWorker';
import { DecodedContractEvent } from '../services/contractEventHelpers';

describe('IndexerWorker', () => {
let prisma: any;
let provider: jest.Mocked<ChainProvider>;
let worker: IndexerWorker;

beforeEach(() => {
prisma = {
indexerState: {
findUnique: jest.fn(),
upsert: jest.fn(),
update: jest.fn(),
},
processedEvent: {
findUnique: jest.fn(),
create: jest.fn(),
deleteMany: jest.fn(),
},
$transaction: jest.fn((cb) => cb(prisma)),
wallet: { upsert: jest.fn() },
community: { upsert: jest.fn() },
member: { upsert: jest.fn() },
membership: { upsert: jest.fn(), findFirst: jest.fn(), update: jest.fn() },
};

provider = {
getLatestBlockNumber: jest.fn(),
getBlock: jest.fn(),
getLogs: jest.fn(),
};

worker = new IndexerWorker(prisma as any, provider, 5000, 12);
});

test('should process blocks and update checkpoint', async () => {
provider.getLatestBlockNumber.mockResolvedValue(100);
prisma.indexerState.findUnique.mockResolvedValue({
lastBlockNumber: 80,
lastBlockHash: 'hash80',
});
provider.getBlock.mockImplementation(async (n) => ({
number: n,
hash: `hash${n}`,
parentHash: `hash${n - 1}`,
}));
provider.getLogs.mockResolvedValue([]);

await worker.runPass();

expect(provider.getLogs).toHaveBeenCalledWith(81, 88); // 100 - 12 = 88
expect(prisma.indexerState.upsert).toHaveBeenCalledWith(expect.objectContaining({
create: expect.objectContaining({ lastBlockNumber: 88 }),
}));
});

test('should detect reorg and rewind', async () => {
provider.getLatestBlockNumber.mockResolvedValue(100);
prisma.indexerState.findUnique.mockResolvedValue({
lastBlockNumber: 80,
lastBlockHash: 'hash80-old',
});
provider.getBlock.mockImplementation(async (n) => ({
number: n,
hash: `hash${n}`, // Will return hash80 for block 80, which differs from hash80-old
parentHash: `hash${n - 1}`,
}));

await worker.runPass();

expect(prisma.indexerState.update).toHaveBeenCalled();
// Rewind 80 - 12*2 = 56
expect(prisma.processedEvent.deleteMany).toHaveBeenCalledWith({
where: { blockNumber: { gt: 56 } },
});
});

test('should handle duplicate logs idempotently via applyContractEvent', async () => {
provider.getLatestBlockNumber.mockResolvedValue(100);
prisma.indexerState.findUnique.mockResolvedValue({
lastBlockNumber: 80,
lastBlockHash: 'hash80',
});
provider.getBlock.mockResolvedValue({ number: 80, hash: 'hash80', parentHash: 'hash79' });

const mockLog: DecodedContractEvent = {
type: 'MembershipMinted',
to: '0x123',
tokenId: 1,
communityId: 'c1',
expiresAt: 1000,
transactionHash: 'tx1',
logIndex: 0,
blockNumber: 81,
blockHash: 'hash81',
};
provider.getLogs.mockResolvedValue([mockLog]);

// Simulate already processed
prisma.processedEvent.findUnique.mockResolvedValue({ id: 'existing' });

await worker.runPass();

// Should NOT call wallet upsert because it's already processed
expect(prisma.wallet.upsert).not.toHaveBeenCalled();
});
});
Loading
Loading