Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ REDIS_PASSWORD=
# Stellar Horizon
STELLAR_HORIZON_URL=https://horizon.stellar.org

# Soroban event indexer
SOROBAN_RPC_URL=https://soroban-rpc.mainnet.stellar.gateway.fm
SMARTDROP_CONTRACT_ID=
INDEXER_ENABLED=true
INDEXER_POLL_INTERVAL_MS=5000
INDEXER_POLL_LIMIT=100
INDEXER_START_LEDGER=0

# Stellar USDC Issuer
USDC_ISSUER=GA5ZSEJYB37JRC5AVCIA5MOP4RHTM335AX2OBFLDTQLNUEHRGPTM6RIA

Expand Down
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,23 @@ Multi-source price oracle that fetches and caches USD prices for Stellar assets.
- Price anomaly logging (>10% changes)
- Fallback chain: DEX → CoinGecko → CoinMarketCap → cached

### Soroban Event Indexer

Polls Soroban RPC for SmartDrop contract events and stores decoded event state in Redis so the API can answer claim-status queries without live RPC calls on every request.

**Indexed events:**
- `airdrop_created`
- `recipient_added`
- `token_claimed`
- `airdrop_expired`

**Features:**
- Configurable contract ID, RPC URL, poll interval, poll limit, and start ledger
- Last indexed ledger checkpoint persisted in Redis
- Raw XDR and decoded event data retained for each indexed event
- Aggregated airdrop status, recipient lists, recipient claim history, and indexer status endpoints
- RPC errors are logged and the poller continues on the next interval

## Setup

### Prerequisites
Expand Down Expand Up @@ -85,6 +102,12 @@ cp .env.example .env
| `REDIS_PORT` | Redis server port | 6379 | No |
| `REDIS_PASSWORD` | Redis password | undefined | No |
| `STELLAR_HORIZON_URL` | Horizon API URL | https://horizon.stellar.org | No |
| `SOROBAN_RPC_URL` | Soroban RPC URL for contract event polling | https://soroban-rpc.mainnet.stellar.gateway.fm | No |
| `SMARTDROP_CONTRACT_ID` | SmartDrop contract ID to index | undefined | Yes, for indexer |
| `INDEXER_ENABLED` | Enable Soroban event polling | true | No |
| `INDEXER_POLL_INTERVAL_MS` | Soroban event polling interval in milliseconds | 5000 | No |
| `INDEXER_POLL_LIMIT` | Maximum events requested per poll | 100 | No |
| `INDEXER_START_LEDGER` | First ledger to scan when no checkpoint exists | 0 | No |
| `USDC_ISSUER` | USDC issuer address | GA5ZSEJYB37JRC5AVCIA5MOP4RHTM335AX2OBFLDTQLNUEHRGPTM6RIA | No |
| `COINGECKO_API_KEY` | CoinGecko API key | undefined | No |
| `COINMARKETCAP_API_KEY` | CoinMarketCap API key | undefined | No |
Expand Down Expand Up @@ -148,6 +171,15 @@ GET /health
}
```

### Indexed Airdrop Data

```
GET /api/v1/airdrops/:id/status
GET /api/v1/airdrops/:id/recipients
GET /api/v1/recipients/:address/claims
GET /api/v1/indexer/status
```

## Usage Examples

### Fetch XLM Price
Expand Down
8 changes: 8 additions & 0 deletions src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,16 @@ module.exports = {
},
stellar: {
horizonUrl: process.env.STELLAR_HORIZON_URL || 'https://horizon.stellar.org',
sorobanRpcUrl: process.env.SOROBAN_RPC_URL || 'https://soroban-rpc.mainnet.stellar.gateway.fm',
usdcIssuer: process.env.USDC_ISSUER || 'GA5ZSEJYB37JRC5AVCIA5MOP4RHTM335AX2OBFLDTQLNUEHRGPTM6RIA',
},
indexer: {
enabled: process.env.INDEXER_ENABLED !== 'false',
contractId: process.env.SMARTDROP_CONTRACT_ID || '',
pollIntervalMs: parseInt(process.env.INDEXER_POLL_INTERVAL_MS, 10) || 5000,
pollLimit: parseInt(process.env.INDEXER_POLL_LIMIT, 10) || 100,
startLedger: parseInt(process.env.INDEXER_START_LEDGER, 10) || 0,
},
coingecko: {
apiKey: process.env.COINGECKO_API_KEY || '',
baseUrl: 'https://api.coingecko.com/api/v3',
Expand Down
6 changes: 6 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ const priceRefreshJob = require('./jobs/priceRefresh');
const buildCorsMiddleware = require('./middleware/cors');
const pricesRouter = require('./routes/prices');
const alertsRouter = require('./routes/alerts');
const indexerRouter = require('./routes/indexer');
const indexerPoller = require('./indexer/runtime');

const app = express();

Expand All @@ -26,6 +28,7 @@ app.get('/health', (req, res) => {

app.use('/api/v1', pricesRouter);
app.use('/api/v1', alertsRouter);
app.use('/api/v1', indexerRouter);

app.use((err, req, res, _next) => {
const status = err.status || 500;
Expand All @@ -36,11 +39,13 @@ app.use((err, req, res, _next) => {
const server = app.listen(config.port, () => {
logger.info(`SmartDrop backend running on port ${config.port}`);
priceRefreshJob.start();
indexerPoller.start();
});

process.on('SIGTERM', async () => {
logger.info('SIGTERM received, shutting down');
priceRefreshJob.stop();
indexerPoller.stop();
server.close();
await cache.disconnect();
process.exit(0);
Expand All @@ -49,6 +54,7 @@ process.on('SIGTERM', async () => {
process.on('SIGINT', async () => {
logger.info('SIGINT received, shutting down');
priceRefreshJob.stop();
indexerPoller.stop();
server.close();
await cache.disconnect();
process.exit(0);
Expand Down
118 changes: 118 additions & 0 deletions src/indexer/eventParser.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
const crypto = require('crypto');
const { scValToNative } = require('stellar-sdk');

const EVENT_FIELDS = {
airdrop_created: ['airdrop_id', 'creator', 'token', 'total_amount', 'expiry_ledger'],
recipient_added: ['airdrop_id', 'recipient', 'amount'],
token_claimed: ['airdrop_id', 'recipient', 'amount', 'ledger'],
airdrop_expired: ['airdrop_id', 'unclaimed_amount'],
};

const EVENT_NAMES = Object.keys(EVENT_FIELDS);

function toJsonSafe(value) {
if (typeof value === 'bigint') return value.toString();
if (Buffer.isBuffer(value)) return value.toString('base64');
if (Array.isArray(value)) return value.map(toJsonSafe);
if (value && typeof value === 'object') {
return Object.fromEntries(Object.entries(value).map(([key, val]) => [key, toJsonSafe(val)]));
}
return value;
}

function xdrBase64(scVal) {
if (!scVal || typeof scVal.toXDR !== 'function') return null;
return scVal.toXDR('base64');
}

function decodeScVal(scVal) {
if (scVal === undefined || scVal === null) return null;
return toJsonSafe(scValToNative(scVal));
}

function normalizeEventName(value) {
if (typeof value !== 'string') return null;
return EVENT_NAMES.includes(value) ? value : null;
}

function dataFromValue(eventName, value, topicHintCount = 0) {
if (value && !Array.isArray(value) && typeof value === 'object') {
return value;
}

const fields = EVENT_FIELDS[eventName];
if (Array.isArray(value)) {
const valueFields = topicHintCount > 0 && value.length < fields.length
? fields.slice(fields.length - value.length)
: fields;

return Object.fromEntries(valueFields.map((field, index) => [field, value[index] ?? null]));
}

return { value };
}

function mergeTopicHints(eventName, data, topics) {
const eventNameIndex = topics.findIndex((topic) => topic === eventName);
const topicHints = eventNameIndex >= 0 ? topics.slice(eventNameIndex + 1) : [];
const merged = { ...data };

if (merged.airdrop_id == null && topicHints[0] != null) merged.airdrop_id = topicHints[0];
if (merged.recipient == null && topicHints[1] != null) merged.recipient = topicHints[1];

return merged;
}

function eventId(event) {
if (event.id) return String(event.id);
const fallback = `${event.ledger}:${event.pagingToken}:${JSON.stringify(event.topic || [])}`;
return crypto.createHash('sha256').update(fallback).digest('hex');
}

function contractIdToString(contractId) {
if (!contractId) return null;
if (typeof contractId === 'string') return contractId;
if (typeof contractId.toString === 'function') return contractId.toString();
return String(contractId);
}

function parseContractEvent(event) {
const nativeTopics = (event.topic || []).map(decodeScVal);
const eventName = nativeTopics.map(normalizeEventName).find(Boolean);

if (!eventName) return null;

const decodedValue = decodeScVal(event.value);
const eventNameIndex = nativeTopics.findIndex((topic) => topic === eventName);
const topicHintCount = eventNameIndex >= 0 ? nativeTopics.length - eventNameIndex - 1 : 0;
const data = mergeTopicHints(eventName, dataFromValue(eventName, decodedValue, topicHintCount), nativeTopics);

return {
id: eventId(event),
event_name: eventName,
type: event.type,
ledger: event.ledger,
ledger_closed_at: event.ledgerClosedAt || null,
paging_token: event.pagingToken || null,
contract_id: contractIdToString(event.contractId),
in_successful_contract_call: event.inSuccessfulContractCall !== false,
data,
decoded: {
topics: nativeTopics,
value: decodedValue,
},
raw_xdr: {
topics: (event.topic || []).map(xdrBase64),
value: xdrBase64(event.value),
},
indexed_at: new Date().toISOString(),
};
}

module.exports = {
EVENT_FIELDS,
EVENT_NAMES,
decodeScVal,
parseContractEvent,
toJsonSafe,
};
124 changes: 124 additions & 0 deletions src/indexer/eventPoller.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
const { SorobanRpc } = require('stellar-sdk');
const config = require('../config');
const logger = require('../logger');
const eventStore = require('./eventStore');
const { parseContractEvent } = require('./eventParser');

class EventPoller {
constructor(options = {}) {
this.contractId = options.contractId ?? config.indexer.contractId;
this.pollIntervalMs = options.pollIntervalMs ?? config.indexer.pollIntervalMs;
this.pollLimit = options.pollLimit ?? config.indexer.pollLimit;
this.startLedger = options.startLedger ?? config.indexer.startLedger;
this.enabled = options.enabled ?? config.indexer.enabled;
this.store = options.store || eventStore;
this.logger = options.logger || logger;
this.server = options.server || new SorobanRpc.Server(options.rpcUrl || config.stellar.sorobanRpcUrl);
this.timer = null;
this.lastRun = null;
this.lastError = null;
this.latestLedger = null;
}

isConfigured() {
return this.enabled && Boolean(this.contractId);
}

getStatus() {
return {
enabled: this.enabled,
configured: this.isConfigured(),
running: this.timer !== null,
contract_id: this.contractId || null,
poll_interval_ms: this.pollIntervalMs,
poll_limit: this.pollLimit,
last_run: this.lastRun,
last_error: this.lastError,
latest_ledger: this.latestLedger,
};
}

async pollOnce() {
if (!this.isConfigured()) {
return { skipped: true, reason: 'SMARTDROP_CONTRACT_ID not configured' };
}

const previousLedger = await this.store.getLastLedger(null);
const startLedger = previousLedger == null
? this.startLedger || 0
: Math.max(Number(previousLedger) + 1, this.startLedger || 0);

const response = await this.server.getEvents({
startLedger,
filters: [
{
type: 'contract',
contractIds: [this.contractId],
},
],
limit: this.pollLimit,
});

const parsedEvents = (response.events || [])
.map(parseContractEvent)
.filter(Boolean);

for (const event of parsedEvents) {
await this.store.saveEvent(event);
}

const latestIndexedLedger = Math.max(
response.latestLedger || previousLedger,
...parsedEvents.map((event) => event.ledger)
);
await this.store.setLastLedger(latestIndexedLedger);

this.latestLedger = response.latestLedger || null;
this.lastRun = new Date().toISOString();
this.lastError = null;

return {
skipped: false,
start_ledger: startLedger,
latest_ledger: response.latestLedger,
indexed_events: parsedEvents.length,
};
}

start() {
if (this.timer || !this.enabled) return;
if (!this.contractId) {
this.logger.warn('SmartDrop indexer disabled: SMARTDROP_CONTRACT_ID is not configured');
return;
}

const run = async () => {
try {
const result = await this.pollOnce();
this.logger.info('SmartDrop contract events indexed', result);
} catch (err) {
this.lastRun = new Date().toISOString();
this.lastError = err.message;
this.logger.warn('SmartDrop event indexing failed', { error: err.message });
}
};

run();
this.timer = setInterval(run, this.pollIntervalMs);
if (typeof this.timer.unref === 'function') this.timer.unref();
this.logger.info('SmartDrop event indexer started', {
contractId: this.contractId,
pollIntervalMs: this.pollIntervalMs,
});
}

stop() {
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
this.logger.info('SmartDrop event indexer stopped');
}
}
}

module.exports = { EventPoller };
Loading