From e01002aeda51f7fb3bf5c85bb78fef8eb832f506 Mon Sep 17 00:00:00 2001 From: Eneje Glory Date: Sun, 28 Jun 2026 15:39:54 +0100 Subject: [PATCH 1/2] fix: fenced lock release, wire retry processor, guard indexer metrics on RPC fail, add yield tests closes #1163 closes #1167 closes #1171 closes #1201 --- backend/src/index.ts | 14 +- .../__tests__/yieldHistoryService.test.ts | 120 ++++++++++++++++++ backend/src/services/cacheService.ts | 35 +++++ backend/src/services/defaultChecker.ts | 28 +++- backend/src/services/eventIndexer.ts | 8 ++ backend/src/services/webhookRetryProcessor.ts | 15 ++- backend/src/services/webhookRetryScheduler.ts | 99 --------------- 7 files changed, 207 insertions(+), 112 deletions(-) delete mode 100644 backend/src/services/webhookRetryScheduler.ts diff --git a/backend/src/index.ts b/backend/src/index.ts index ae9dfa79..c525e6be 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -17,10 +17,10 @@ import { stopDefaultCheckerScheduler, } from './services/defaultChecker.js'; import { - startWebhookRetryScheduler, - stopWebhookRetryScheduler, -} from './services/webhookRetryScheduler.js'; -import { eventStreamService } from './services/eventStreamService.js'; + startWebhookRetryProcessor, + stopWebhookRetryProcessor, +} from "./services/webhookRetryProcessor.js"; +import { eventStreamService } from "./services/eventStreamService.js"; import { startNotificationCleanupScheduler, stopNotificationCleanupScheduler, @@ -66,8 +66,8 @@ const server = app.listen(port, () => { // Start periodic on-chain default checks (if configured) startDefaultCheckerScheduler(); - // Start webhook retry scheduler - startWebhookRetryScheduler(); + // Start webhook retry processor (5m/15m/45m backoff via WebhookService.processRetries) + startWebhookRetryProcessor(); // Start scheduled score reconciliation against on-chain state startScoreReconciliationScheduler(); @@ -100,7 +100,7 @@ const shutdown = async (signal: 'SIGTERM' | 'SIGINT') => { await stopIndexer(); stopDefaultCheckerScheduler(); - stopWebhookRetryScheduler(); + stopWebhookRetryProcessor(); stopScoreReconciliationScheduler(); stopNotificationCleanupScheduler(); diff --git a/backend/src/services/__tests__/yieldHistoryService.test.ts b/backend/src/services/__tests__/yieldHistoryService.test.ts index 2d0aec15..2557cd74 100644 --- a/backend/src/services/__tests__/yieldHistoryService.test.ts +++ b/backend/src/services/__tests__/yieldHistoryService.test.ts @@ -77,4 +77,124 @@ describe('yieldHistoryService', () => { expect(latest.currentValue).toBeGreaterThanOrEqual(1000); expect(latest.netYield).toBeGreaterThanOrEqual(0); }); + + // Issue #1171 — previously uncovered paths + + it("Withdraw reduces cost basis proportionally", async () => { + const now = new Date(); + const t1 = new Date(now); + t1.setUTCDate(t1.getUTCDate() - 2); + const t2 = new Date(now); + t2.setUTCDate(t2.getUTCDate() - 1); + + // Pool events: Deposit 1000 then Withdraw 500 (value=null → shares=amount) + mockQuery.mockResolvedValueOnce({ + rows: [ + { event_type: "Deposit", amount: "1000", ledger_closed_at: t1, value: null }, + { event_type: "Withdraw", amount: "500", ledger_closed_at: t2, value: null }, + ], + }); + // Depositor events: same + mockQuery.mockResolvedValueOnce({ + rows: [ + { event_type: "Deposit", amount: "1000", ledger_closed_at: t1, value: null }, + { event_type: "Withdraw", amount: "500", ledger_closed_at: t2, value: null }, + ], + }); + + const history = await buildDepositorYieldHistory("GDep", "GTok", 7, 500_000); + expect(history.length).toBeGreaterThan(0); + // After withdrawing half the shares the cost basis should have halved + const latest = history[history.length - 1]!; + // netYield = currentValue - costBasis; costBasis after withdraw ≈ 500 + expect(latest.netYield).toBeGreaterThanOrEqual(-1); // may be slightly negative due to share price + }); + + it("EmergencyWithdraw follows the same cost-basis reduction path as Withdraw", async () => { + const now = new Date(); + const t1 = new Date(now); + t1.setUTCDate(t1.getUTCDate() - 2); + const t2 = new Date(now); + t2.setUTCDate(t2.getUTCDate() - 1); + + mockQuery.mockResolvedValueOnce({ + rows: [ + { event_type: "Deposit", amount: "1000", ledger_closed_at: t1, value: null }, + { event_type: "EmergencyWithdraw", amount: "1000", ledger_closed_at: t2, value: null }, + ], + }); + mockQuery.mockResolvedValueOnce({ + rows: [ + { event_type: "Deposit", amount: "1000", ledger_closed_at: t1, value: null }, + { event_type: "EmergencyWithdraw", amount: "1000", ledger_closed_at: t2, value: null }, + ], + }); + + const history = await buildDepositorYieldHistory("GDep", "GTok", 7, 0); + // Full emergency withdraw → depositor has 0 shares → currentValue = 0 + if (history.length > 0) { + const latest = history[history.length - 1]!; + expect(latest.currentValue).toBe(0); + } + }); + + it("decodes shares from base64 XDR value (BigInt conversion path)", async () => { + // XDR encodes [assetAmount=1000, shares=500] as a 2-element Vec of i128 + // Generated with: nativeToScVal([BigInt(1000), BigInt(500)]).toXDR('base64') + const xdrDeposit = "AAAAEAAAAAEAAAACAAAABQAAAAAAAAPoAAAABQAAAAAAAAH0"; + // [assetAmount=2000, shares=800] + const xdrWithdraw = "AAAAEAAAAAEAAAACAAAABQAAAAAAAAfQAAAABQAAAAAAAAMg"; + + const now = new Date(); + const t1 = new Date(now); + t1.setUTCDate(t1.getUTCDate() - 2); + const t2 = new Date(now); + t2.setUTCDate(t2.getUTCDate() - 1); + + mockQuery.mockResolvedValueOnce({ + rows: [ + { event_type: "Deposit", amount: "1000", ledger_closed_at: t1, value: xdrDeposit }, + { event_type: "Withdraw", amount: "500", ledger_closed_at: t2, value: xdrWithdraw }, + ], + }); + mockQuery.mockResolvedValueOnce({ + rows: [ + { event_type: "Deposit", amount: "1000", ledger_closed_at: t1, value: xdrDeposit }, + { event_type: "Withdraw", amount: "500", ledger_closed_at: t2, value: xdrWithdraw }, + ], + }); + + // Should not throw even with non-null XDR values + const history = await buildDepositorYieldHistory("GDep", "GTok", 7, 1_000_000); + expect(history.length).toBeGreaterThan(0); + }); + + it("falls back gracefully when XDR value is malformed", async () => { + const now = new Date(); + mockQuery.mockResolvedValueOnce({ + rows: [ + { + event_type: "Deposit", + amount: "1000", + ledger_closed_at: now, + value: "not-valid-base64-xdr!!!", + }, + ], + }); + mockQuery.mockResolvedValueOnce({ + rows: [ + { + event_type: "Deposit", + amount: "1000", + ledger_closed_at: now, + value: "not-valid-base64-xdr!!!", + }, + ], + }); + + // Should not throw — falls back to assetAmount as shares + await expect( + buildDepositorYieldHistory("GDep", "GTok", 7), + ).resolves.toBeDefined(); + }); }); diff --git a/backend/src/services/cacheService.ts b/backend/src/services/cacheService.ts index a5275450..127727bd 100644 --- a/backend/src/services/cacheService.ts +++ b/backend/src/services/cacheService.ts @@ -129,6 +129,41 @@ class CacheService { } } + /** + * Delete a key only when its stored value matches `expectedValue` (fenced + * compare-and-delete). Used by distributed locks so a run that outlives the + * TTL cannot delete a lock acquired by a different instance. + * + * @returns true if the key existed and the value matched (key deleted), + * false if the key was absent or the value did not match. + */ + async deleteIfMatch(key: string, expectedValue: string): Promise { + try { + await this.ensureConnected(); + const stored = await this.client!.get(key); + if (stored === null) return false; + + let storedValue: unknown; + try { + storedValue = JSON.parse(stored); + } catch { + storedValue = stored; + } + + if (storedValue !== expectedValue) return false; + + await this.client!.del(key); + return true; + } catch (error) { + if (process.env.NODE_ENV !== "test") { + logger + .withContext() + .error(`Error in deleteIfMatch for key ${key}`, { error }); + } + return false; + } + } + /** * Invalidate multiple keys by a pattern (e.g. prefix) * Note: KEYS is generally not recommended in production, but suitable for exact or bounded patterns. diff --git a/backend/src/services/defaultChecker.ts b/backend/src/services/defaultChecker.ts index e6a14c5f..443c8e33 100644 --- a/backend/src/services/defaultChecker.ts +++ b/backend/src/services/defaultChecker.ts @@ -91,6 +91,7 @@ export class DefaultChecker { private pollAttempts: number; private pollSleepMs: number; private concurrency: number; + private currentLockValue: string | null = null; constructor() { this.contractId = process.env.LOAN_MANAGER_CONTRACT_ID || ''; @@ -385,7 +386,14 @@ export class DefaultChecker { private async acquireLock(): Promise { try { const lockValue = `${Date.now()}-${Math.random().toString(16).slice(2)}`; - const acquired = await cacheService.setNotExists(LOCK_KEY, lockValue, LOCK_TTL_SECONDS); + const acquired = await cacheService.setNotExists( + LOCK_KEY, + lockValue, + LOCK_TTL_SECONDS, + ); + if (acquired) { + this.currentLockValue = lockValue; + } return acquired; } catch (error) { logger.withContext().error('Failed to acquire default checker lock', { error }); @@ -394,11 +402,25 @@ export class DefaultChecker { } /** - * Releases the distributed lock. + * Releases the distributed lock only when the stored value matches the value + * this instance set at acquire time. A run that outlives the TTL cannot + * delete a lock that now belongs to a different instance. */ private async releaseLock(): Promise { + const value = this.currentLockValue; + this.currentLockValue = null; + + if (!value) return; + try { - await cacheService.delete(LOCK_KEY); + const released = await cacheService.deleteIfMatch(LOCK_KEY, value); + if (!released) { + logger + .withContext() + .warn( + "default_checker: lock already expired or owned by another instance — skipping delete", + ); + } } catch (error) { logger.withContext().error('Failed to release default checker lock', { error }); } diff --git a/backend/src/services/eventIndexer.ts b/backend/src/services/eventIndexer.ts index 439b31f4..b4502581 100644 --- a/backend/src/services/eventIndexer.ts +++ b/backend/src/services/eventIndexer.ts @@ -254,6 +254,14 @@ export class EventIndexer { const lastIndexedLedger = await this.getLastIndexedLedger(); const latestLedger = await this.getLatestLedgerSequence(); + // latestLedger === 0 means getLatestLedgerSequence failed (RPC error or + // non-finite response). Publishing lag=0 / chainTip=0 during an outage + // would silently defeat the behind-chain-tip alert, so skip the metric + // update and leave gauges at their last known values. + if (latestLedger === 0) { + return; + } + if (latestLedger <= lastIndexedLedger) { recordIndexerLedgers(lastIndexedLedger, latestLedger); return; diff --git a/backend/src/services/webhookRetryProcessor.ts b/backend/src/services/webhookRetryProcessor.ts index a2e5d398..ac9e5565 100644 --- a/backend/src/services/webhookRetryProcessor.ts +++ b/backend/src/services/webhookRetryProcessor.ts @@ -6,10 +6,19 @@ import { jobMetricsService } from './jobMetricsService.js'; let retryProcessorInterval: NodeJS.Timeout | null = null; /** - * Starts the webhook retry processor that periodically checks for failed - * webhook deliveries and retries them with exponential backoff. + * Starts the webhook retry processor. * - * Runs every 10 seconds to process pending retries. + * Polls every 10 seconds and delegates to WebhookService.processRetries, + * which queries for deliveries whose next_retry_at <= now and applies the + * backoff schedule defined in webhookService.ts: + * attempt 1 → retry after 5 min + * attempt 2 → retry after 15 min + * attempt 3 → retry after 45 min + * attempt 4+ → permanently failed (MAX_RETRY_ATTEMPTS = 4) + * + * This is the single retry implementation wired in index.ts. + * webhookRetryScheduler.ts (which used a different backoff and ignored + * next_retry_at) has been removed. */ export function startWebhookRetryProcessor(): void { if (retryProcessorInterval) { diff --git a/backend/src/services/webhookRetryScheduler.ts b/backend/src/services/webhookRetryScheduler.ts deleted file mode 100644 index 7fc39f36..00000000 --- a/backend/src/services/webhookRetryScheduler.ts +++ /dev/null @@ -1,99 +0,0 @@ -import { query } from '../db/connection.js'; -import logger from '../utils/logger.js'; -import { WebhookService, type WebhookEventType } from './webhookService.js'; - -const BACKOFF = [60, 300, 1800]; // seconds - -let schedulerInterval: NodeJS.Timeout | null = null; - -async function markAsFailed(deliveryId: number) { - await query( - `UPDATE webhook_deliveries - SET next_retry_at = NULL, - last_error = $1, - updated_at = NOW() - WHERE id = $2`, - ['Permanently failed after max attempts reached', deliveryId], - ); - logger.withContext().error(`Webhook delivery ${deliveryId} marked as permanently failed.`); -} - -function shouldRetry(delivery: { updated_at: string }, delay: number): boolean { - const lastAttempt = new Date(delivery.updated_at).getTime(); - const now = Date.now(); - return now >= lastAttempt + delay * 1000; -} - -async function sendWebhookAgain(delivery: { - id: number; - attempt_count: number; - subscription_id: number; - callback_url: string; - secret: string | null; - event_id: number; - event_type: string; - payload: unknown; - updated_at: string; -}) { - logger - .withContext() - .info(`Retrying webhook delivery ${delivery.id} (attempt ${delivery.attempt_count + 1})`); - - await WebhookService.retryWebhookDelivery( - delivery.id, - delivery.subscription_id, - delivery.callback_url, - delivery.secret || undefined, - String(delivery.event_id), - delivery.event_type as WebhookEventType, - delivery.payload as Record, - delivery.attempt_count, - ); -} - -export async function retryFailedWebhooks() { - try { - const result = await query(` - SELECT wd.*, ws.max_attempts, ws.callback_url, ws.secret - FROM webhook_deliveries wd - JOIN webhook_subscriptions ws ON wd.subscription_id = ws.id - WHERE wd.delivered_at IS NULL - AND (wd.next_retry_at IS NOT NULL OR wd.attempt_count = 0) - `); - - const failed = result.rows; - - for (const delivery of failed) { - const delay = BACKOFF[delivery.attempt_count] || 3600; - - if (delivery.attempt_count >= delivery.max_attempts) { - await markAsFailed(delivery.id); - continue; - } - - if (shouldRetry(delivery, delay)) { - await sendWebhookAgain(delivery); - } - } - } catch (error) { - logger.withContext().error('Error in webhook retry scheduler', { error }); - } -} - -export function startWebhookRetryScheduler() { - if (schedulerInterval) { - logger.withContext().warn('Webhook retry scheduler already running'); - return; - } - - logger.withContext().info('Starting webhook retry scheduler (60s interval)'); - schedulerInterval = setInterval(retryFailedWebhooks, 60000); -} - -export function stopWebhookRetryScheduler() { - if (schedulerInterval) { - logger.withContext().info('Stopping webhook retry scheduler'); - clearInterval(schedulerInterval); - schedulerInterval = null; - } -} From 7b405cccb4eb0ab3d21bb7cc8258120c52684a97 Mon Sep 17 00:00:00 2001 From: Eneje Glory Date: Mon, 29 Jun 2026 10:43:30 +0100 Subject: [PATCH 2/2] fix(lint): use single quotes in webhook processor and event stream imports --- backend/src/index.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/src/index.ts b/backend/src/index.ts index c525e6be..8f9dd311 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -19,8 +19,8 @@ import { import { startWebhookRetryProcessor, stopWebhookRetryProcessor, -} from "./services/webhookRetryProcessor.js"; -import { eventStreamService } from "./services/eventStreamService.js"; +} from './services/webhookRetryProcessor.js'; +import { eventStreamService } from './services/eventStreamService.js'; import { startNotificationCleanupScheduler, stopNotificationCleanupScheduler,