Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"@opentelemetry/sdk-node": "^0.219.0",
"@opentelemetry/semantic-conventions": "^1.41.1",
"@stellar/stellar-sdk": "^13.3.0",
"async-lock": "^1.4.1",
"cockatiel": "^4.0.0",
"dotenv": "^16.4.7",
"drizzle-orm": "^0.38.3",
Expand All @@ -42,6 +43,7 @@
"zod": "^3.24.1"
},
"devDependencies": {
"@types/async-lock": "^1.4.2",
"@types/node": "^22.10.5",
"@types/pg": "^8.11.10",
"@types/sanitize-html": "^2.16.1",
Expand Down
64 changes: 51 additions & 13 deletions src/modules/rewards/reward.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import {
NotFoundError,
ForbiddenError,
ConflictError,
StellarError,
} from "../../utils/errors.js";
import { withLock } from "../../utils/lock.js";
import { invokeContract } from "../../stellar/transactions.js";
import { stellarClient } from "../../stellar/client.js";
import { createQuizProof } from "../../stellar/signatures.js";
import { isCircuitBreakerError } from "../../stellar/resilience.js";
import { config } from "../../config/index.js";
Expand All @@ -29,6 +31,29 @@ import { cacheGet, cacheSet, cacheDel, cacheKey } from "../../cache/index.js";

const REWARD_AMOUNT = 10; // credits per passed quiz

/**
* Helper function to handle bad_seq errors from Stellar transactions.
* When a bad_seq error occurs, it attempts to fetch the current account sequence
* for debugging purposes. The transaction may still succeed on-chain despite the error.
* @returns txHash set to "pending_indexer_confirmation" to indicate uncertain state
*/
async function handleBadSeqError(submissionId: string, stellarAddress: string): Promise<string> {
let accountSeq = "unknown";
try {
const account = await stellarClient.getAccount(stellarAddress);
accountSeq = account.sequence;
} catch {
// Intentionally swallow error: sequence fetch is for debugging only
// If Horizon is unavailable, we still want to mark the transaction as pending
}

logger.warn(
{ submissionId, accountSeq },
"bad_seq after invoke — the tx might actually succeed on-chain"
);
return "pending_indexer_confirmation";
}

/**
* Shared reward claim execution logic.
* Used by both the direct claim path and the background retry processor.
Expand Down Expand Up @@ -77,12 +102,18 @@ export async function processRewardClaim(
{ method: "claim_reward", status: "success" },
Number(process.hrtime.bigint() - txStart) / 1e9,
);
} catch (err) {
} catch (err: unknown) {
stellarTxDurationSeconds.observe(
{ method: "claim_reward", status: "error" },
Number(process.hrtime.bigint() - txStart) / 1e9,
);
throw err;
if (err instanceof StellarError && err.message.includes("bad_seq")) {
txHash = await handleBadSeqError(submissionId, user.stellarAddress);
} else if (err instanceof StellarError && err.message.includes("tx_bad_seq")) {
txHash = await handleBadSeqError(submissionId, user.stellarAddress);
} else {
throw err;
}
}

await db.transaction(async (tx) => {
Expand Down Expand Up @@ -157,16 +188,17 @@ export class RewardService {
submission.score,
);

const [user] = await tx
.select()
.from(users)
.where(eq(users.id, userId));

if (!user) {
throw new NotFoundError("User");
}

let txHash: string | null = null;
try {
const [user] = await tx
.select()
.from(users)
.where(eq(users.id, userId));

if (!user) {
throw new NotFoundError("User");
}

txHash = await invokeContract(
config.STELLAR_REWARD_CONTRACT_ID,
Expand All @@ -177,7 +209,7 @@ export class RewardService {
StellarSdk.nativeToScVal(Buffer.from(proof.signature, "base64")),
],
);
} catch (err) {
} catch (err: unknown) {
if (err instanceof NotFoundError) throw err;

if (isCircuitBreakerError(err)) {
Expand Down Expand Up @@ -207,8 +239,14 @@ export class RewardService {
};
}

logger.error({ err, submissionId }, "On-chain reward claim failed");
throw new Error("Failed to process on-chain reward");
if (err instanceof StellarError && err.message.includes("bad_seq")) {
txHash = await handleBadSeqError(submissionId, user.stellarAddress);
} else if (err instanceof StellarError && err.message.includes("tx_bad_seq")) {
txHash = await handleBadSeqError(submissionId, user.stellarAddress);
} else {
logger.error({ err, submissionId }, "On-chain reward claim failed");
throw new Error("Failed to process on-chain reward");
}
}

await tx
Expand Down
33 changes: 33 additions & 0 deletions src/stellar/sequence-cache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { stellarClient } from "./client.js";

export class SequenceCache {
private localSeq: Map<string, bigint> = new Map();

async getNextSequence(accountId: string): Promise<string> {
// 1. Check local in-memory cache first
const local = this.localSeq.get(accountId);
if (local !== undefined) {
const next = local + 1n;
this.localSeq.set(accountId, next);
return next.toString();
}

// 2. Load from Horizon
const account = await stellarClient.getAccount(accountId);
const seq = BigInt(account.sequence);
const next = seq + 1n;
this.localSeq.set(accountId, next);
// Return next sequence number (seq + 1) on first call for consistency
return next.toString();
}

invalidate(accountId: string): void {
this.localSeq.delete(accountId);
}

resetTo(accountId: string, seq: bigint): void {
this.localSeq.set(accountId, seq);
}
}

export const sequenceCache = new SequenceCache();
116 changes: 77 additions & 39 deletions src/stellar/transactions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ import { stellarClient } from "./client.js";
import { logger } from "../utils/logger.js";
import { StellarError } from "../utils/errors.js";

import { sequenceCache } from "./sequence-cache.js";
import { withAccountLock } from "../utils/account-lock.js";

const MAX_SEQ_RETRIES = 3;

/**
* Build and submit a Soroban contract invocation transaction.
*/
Expand All @@ -19,33 +24,50 @@ export async function invokeContract(
signer?: StellarSdk.Keypair
): Promise<string> {
const keypair = signer ?? getPlatformKeypair();
const account = await stellarClient.getAccount(keypair.publicKey());
const contract = new StellarSdk.Contract(contractId);

const tx = new StellarSdk.TransactionBuilder(account, {
fee: StellarSdk.BASE_FEE,
networkPassphrase: getNetworkPassphrase(),
})
.addOperation(contract.call(method, ...args))
.setTimeout(60)
.build();
return withAccountLock(keypair.publicKey(), async () => {
const contract = new StellarSdk.Contract(contractId);

for (let attempt = 0; attempt < MAX_SEQ_RETRIES; attempt++) {
try {
const seqNum = await sequenceCache.getNextSequence(keypair.publicKey());
const account = new StellarSdk.Account(keypair.publicKey(), seqNum);

tx.sign(keypair);
const tx = new StellarSdk.TransactionBuilder(account, {
fee: StellarSdk.BASE_FEE,
networkPassphrase: getNetworkPassphrase(),
})
.addOperation(contract.call(method, ...args))
.setTimeout(60)
.build();

// Simulate first to avoid submitting doomed txs
const soroban = getSorobanServer();
const simResult = await soroban.simulateTransaction(tx);
if (StellarSdk.rpc.Api.isSimulationError(simResult)) {
logger.error({ error: simResult.error }, "Simulation failed");
throw new StellarError(`Simulation failed: ${simResult.error}`);
}
tx.sign(keypair);

// Prepare the transaction with the simulation results
const preparedTx = StellarSdk.rpc.assembleTransaction(tx, simResult).build();
preparedTx.sign(keypair);
// Simulate first to avoid submitting doomed txs
const soroban = getSorobanServer();
const simResult = await soroban.simulateTransaction(tx);
if (StellarSdk.rpc.Api.isSimulationError(simResult)) {
logger.error({ error: simResult.error }, "Simulation failed");
throw new StellarError(`Simulation failed: ${simResult.error}`);
}

const result = await stellarClient.submitTransaction(preparedTx);
return result.hash;
// Prepare the transaction with the simulation results
const preparedTx = StellarSdk.rpc.assembleTransaction(tx, simResult).build();
preparedTx.sign(keypair);

const result = await stellarClient.submitTransaction(preparedTx);
return result.hash;
} catch (err: any) {
if (err instanceof StellarError && (err.message.includes("bad_seq") || err.message.includes("tx_bad_seq"))) {
sequenceCache.invalidate(keypair.publicKey());
logger.warn({ attempt, err }, "Sequence number conflict, retrying with fresh sequence");
continue;
}
throw err;
}
}
throw new StellarError(`Failed after ${MAX_SEQ_RETRIES} attempts due to sequence conflicts`);
});
}

/**
Expand All @@ -57,23 +79,39 @@ export async function sendPayment(
signer?: StellarSdk.Keypair
): Promise<string> {
const keypair = signer ?? getPlatformKeypair();
const account = await stellarClient.getAccount(keypair.publicKey());

const tx = new StellarSdk.TransactionBuilder(account, {
fee: StellarSdk.BASE_FEE,
networkPassphrase: getNetworkPassphrase(),
})
.addOperation(
StellarSdk.Operation.payment({
destination,
asset: StellarSdk.Asset.native(),
amount,
})
)
.setTimeout(60)
.build();
return withAccountLock(keypair.publicKey(), async () => {
for (let attempt = 0; attempt < MAX_SEQ_RETRIES; attempt++) {
try {
const seqNum = await sequenceCache.getNextSequence(keypair.publicKey());
const account = new StellarSdk.Account(keypair.publicKey(), seqNum);

const tx = new StellarSdk.TransactionBuilder(account, {
fee: StellarSdk.BASE_FEE,
networkPassphrase: getNetworkPassphrase(),
})
.addOperation(
StellarSdk.Operation.payment({
destination,
asset: StellarSdk.Asset.native(),
amount,
})
)
.setTimeout(60)
.build();

tx.sign(keypair);
const result = await stellarClient.submitTransaction(tx);
return result.hash;
tx.sign(keypair);
const result = await stellarClient.submitTransaction(tx);
return result.hash;
} catch (err: any) {
if (err instanceof StellarError && (err.message.includes("bad_seq") || err.message.includes("tx_bad_seq"))) {
sequenceCache.invalidate(keypair.publicKey());
logger.warn({ attempt, err }, "Sequence number conflict, retrying with fresh sequence");
continue;
}
throw err;
}
}
throw new StellarError(`Failed after ${MAX_SEQ_RETRIES} attempts due to sequence conflicts`);
});
}
13 changes: 13 additions & 0 deletions src/utils/account-lock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import AsyncLock from "async-lock";

const accountLock = new AsyncLock({
timeout: 10_000,
maxPending: 50,
});

export function withAccountLock<T>(
accountId: string,
fn: () => Promise<T>
): Promise<T> {
return accountLock.acquire(`account:${accountId}`, fn);
}
2 changes: 1 addition & 1 deletion src/utils/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export class AppError extends Error {
this.statusCode = statusCode;
this.code = code;
this.isOperational = isOperational;
Object.setPrototypeOf(this, AppError.prototype);
Object.setPrototypeOf(this, new.target.prototype);
}
}

Expand Down
Loading
Loading