diff --git a/migrations/1772000000000_webhook-subscriptions.js b/migrations/1772000000000_webhook-subscriptions.js index 8e1879d..5e941c7 100644 --- a/migrations/1772000000000_webhook-subscriptions.js +++ b/migrations/1772000000000_webhook-subscriptions.js @@ -11,7 +11,11 @@ export const up = (pgm) => { pgm.createTable("webhook_subscriptions", { id: "id", callback_url: { type: "text", notNull: true }, - event_types: { type: "jsonb", notNull: true, default: "[]::jsonb" }, + event_types: { + type: "jsonb", + notNull: true, + default: pgm.func("'[]'::jsonb"), + }, secret: { type: "varchar(255)" }, is_active: { type: "boolean", notNull: true, default: true }, created_at: { diff --git a/migrations/1776000000006_add-interest-rate-to-loan-events.js b/migrations/1776000000006_add-interest-rate-to-loan-events.js index 1155466..aafbbf9 100644 --- a/migrations/1776000000006_add-interest-rate-to-loan-events.js +++ b/migrations/1776000000006_add-interest-rate-to-loan-events.js @@ -1,4 +1,4 @@ -exports.up = (pgm) => { +export const up = (pgm) => { pgm.addColumns("loan_events", { interest_rate_bps: { type: "integer", default: null }, term_ledgers: { type: "integer", default: null }, @@ -8,6 +8,6 @@ exports.up = (pgm) => { // but for now we'll just track the rate per-loan event. }; -exports.down = (pgm) => { +export const down = (pgm) => { pgm.dropColumns("loan_events", ["interest_rate_bps", "term_ledgers"]); }; diff --git a/migrations/1778000000009_transaction-submissions.js b/migrations/1778000000009_transaction-submissions.js index 34a4dac..2cb80be 100644 --- a/migrations/1778000000009_transaction-submissions.js +++ b/migrations/1778000000009_transaction-submissions.js @@ -1,7 +1,7 @@ /** * @param { import("node-pg-migrate").MigrationBuilder } @param pgm {import("node-pg-migrate").MigrationBuilder} */ -exports.up = (pgm) => { +export const up = (pgm) => { pgm.createTable("transaction_submissions", { id: { type: "serial", @@ -52,6 +52,18 @@ exports.up = (pgm) => { pgm.createIndex("transaction_submissions", ["status"]); pgm.createIndex("transaction_submissions", ["transaction_type"]); + // Ensure the shared updated_at trigger function exists (not created by any + // earlier migration), otherwise the trigger below cannot be created. + pgm.sql(` + CREATE OR REPLACE FUNCTION update_updated_at_column() + RETURNS trigger AS $$ + BEGIN + NEW.updated_at = NOW(); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + `); + // Trigger to update updated_at timestamp pgm.createTrigger("transaction_submissions", "update_updated_at", { when: "BEFORE", @@ -63,6 +75,6 @@ exports.up = (pgm) => { /** * @param { import("node-pg-migrate").MigrationBuilder } @param pgm {import("node-pg-migrate").MigrationBuilder} */ -exports.down = (pgm) => { +export const down = (pgm) => { pgm.dropTable("transaction_submissions"); }; diff --git a/migrations/1781000000011_webhook-retry-logic.js b/migrations/1781000000011_webhook-retry-logic.js index 775b358..291b68d 100644 --- a/migrations/1781000000011_webhook-retry-logic.js +++ b/migrations/1781000000011_webhook-retry-logic.js @@ -8,21 +8,31 @@ export const shorthands = undefined; * @returns {Promise | void} */ export const up = (pgm) => { - // Add payload column to webhook_deliveries table - pgm.addColumn("webhook_deliveries", { - payload: { - type: "jsonb", - notNull: false, + // Add payload column to webhook_deliveries table. + // 1772 already creates this column, so guard against re-adding it. + pgm.addColumn( + "webhook_deliveries", + { + payload: { + type: "jsonb", + notNull: false, + }, }, - }); + { ifNotExists: true }, + ); - // Add next_retry_at column to track when to retry - pgm.addColumn("webhook_deliveries", { - next_retry_at: { - type: "timestamp", - notNull: false, + // Add next_retry_at column to track when to retry. + // 1772 already creates this column, so guard against re-adding it. + pgm.addColumn( + "webhook_deliveries", + { + next_retry_at: { + type: "timestamp", + notNull: false, + }, }, - }); + { ifNotExists: true }, + ); // Add index for efficient retry polling pgm.createIndex("webhook_deliveries", ["next_retry_at"], { diff --git a/migrations/1784000000014_add-loan-disputes.js b/migrations/1784000000014_add-loan-disputes.js index d802da4..2c4ae4e 100644 --- a/migrations/1784000000014_add-loan-disputes.js +++ b/migrations/1784000000014_add-loan-disputes.js @@ -1,41 +1,42 @@ // Migration: Add loan_disputes table and support for disputed loan status -module.exports = { - async up(db) { - // 1. Create loan_disputes table - await db.query(` - CREATE TABLE IF NOT EXISTS loan_disputes ( - id SERIAL PRIMARY KEY, - loan_id INTEGER NOT NULL REFERENCES loan_events(loan_id), - borrower TEXT NOT NULL, - reason TEXT NOT NULL, - status TEXT NOT NULL DEFAULT 'open', -- open, resolved, rejected - admin_note TEXT, - resolution TEXT, - created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), - resolved_at TIMESTAMP WITH TIME ZONE - ); - `); +export const up = (pgm) => { + // 1. Create loan_disputes table + pgm.sql(` + CREATE TABLE IF NOT EXISTS loan_disputes ( + id SERIAL PRIMARY KEY, + -- loan_id is not a FK: loan_events is an append-only event table whose + -- loan_id is non-unique (and later becomes a view), so it cannot be a + -- foreign key target. + loan_id INTEGER NOT NULL, + borrower TEXT NOT NULL, + reason TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'open', -- open, resolved, rejected + admin_note TEXT, + resolution TEXT, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + resolved_at TIMESTAMP WITH TIME ZONE + ); + `); - // 2. Add indexes for efficient querying - await db.query(` - CREATE INDEX IF NOT EXISTS idx_loan_disputes_status ON loan_disputes(status); - `); - await db.query(` - CREATE INDEX IF NOT EXISTS idx_loan_disputes_borrower ON loan_disputes(borrower); - `); - await db.query(` - CREATE INDEX IF NOT EXISTS idx_loan_disputes_loan_id ON loan_disputes(loan_id); - `); + // 2. Add indexes for efficient querying + pgm.sql(` + CREATE INDEX IF NOT EXISTS idx_loan_disputes_status ON loan_disputes(status); + `); + pgm.sql(` + CREATE INDEX IF NOT EXISTS idx_loan_disputes_borrower ON loan_disputes(borrower); + `); + pgm.sql(` + CREATE INDEX IF NOT EXISTS idx_loan_disputes_loan_id ON loan_disputes(loan_id); + `); - // 3. Add disputed status to loan_events (if using status enum, update it) - // If status is a string, no migration needed. If enum, alter type here. - // Example for enum: - // await db.query(`ALTER TYPE loan_status_enum ADD VALUE IF NOT EXISTS 'disputed';`); - }, + // 3. Add disputed status to loan_events (if using status enum, update it) + // If status is a string, no migration needed. If enum, alter type here. + // Example for enum: + // pgm.sql(`ALTER TYPE loan_status_enum ADD VALUE IF NOT EXISTS 'disputed';`); +}; - async down(db) { - await db.query(`DROP TABLE IF EXISTS loan_disputes;`); - // No need to remove enum value (Postgres doesn't support removing enum values easily) - }, +export const down = (pgm) => { + pgm.sql(`DROP TABLE IF EXISTS loan_disputes;`); + // No need to remove enum value (Postgres doesn't support removing enum values easily) }; diff --git a/migrations/1787000000017_user-notification-preferences.js b/migrations/1787000000017_user-notification-preferences.js index cf58cf0..a4cd71e 100644 --- a/migrations/1787000000017_user-notification-preferences.js +++ b/migrations/1787000000017_user-notification-preferences.js @@ -8,11 +8,16 @@ export const shorthands = undefined; * @returns {Promise | void} */ export const up = (pgm) => { - pgm.addColumns("user_profiles", { - email_enabled: { type: "boolean", notNull: true, default: false }, - sms_enabled: { type: "boolean", notNull: true, default: false }, - phone: { type: "varchar(20)" }, - }); + // 1773 already adds these notification columns, so guard against re-adding. + pgm.addColumns( + "user_profiles", + { + email_enabled: { type: "boolean", notNull: true, default: false }, + sms_enabled: { type: "boolean", notNull: true, default: false }, + phone: { type: "varchar(20)" }, + }, + { ifNotExists: true }, + ); }; /** diff --git a/migrations/1788000000019_unified-contract-events.js b/migrations/1788000000019_unified-contract-events.js index 9044ff9..02e7e35 100644 --- a/migrations/1788000000019_unified-contract-events.js +++ b/migrations/1788000000019_unified-contract-events.js @@ -12,32 +12,15 @@ export const up = (pgm) => { // 3. Make address nullable (for events like YieldDistributed that may not have a user address) pgm.alterColumn("contract_events", "address", { notNull: false }); - // 4. Rename indexes to match the new table and column names - pgm.renameIndex( - "contract_events", - "idx_loan_events_borrower_event_type", - "idx_contract_events_address_event_type", - ); - pgm.renameIndex( - "contract_events", - "idx_loan_events_loan_id_event_type", - "idx_contract_events_loan_id_event_type", - ); - pgm.renameIndex( - "contract_events", - "idx_loan_events_event_type_loan_id", - "idx_contract_events_event_type_loan_id", - ); - pgm.renameIndex( - "contract_events", - "idx_loan_events_ledger", - "idx_contract_events_ledger", - ); - pgm.renameIndex( - "contract_events", - "idx_loan_events_pool_deposits_withdraws", - "idx_contract_events_pool_deposits_withdraws", - ); + // 4. Rename indexes to match the new table and column names. + // node-pg-migrate has no renameIndex helper, so use raw ALTER INDEX. + pgm.sql(` + ALTER INDEX IF EXISTS idx_loan_events_borrower_event_type RENAME TO idx_contract_events_address_event_type; + ALTER INDEX IF EXISTS idx_loan_events_loan_id_event_type RENAME TO idx_contract_events_loan_id_event_type; + ALTER INDEX IF EXISTS idx_loan_events_event_type_loan_id RENAME TO idx_contract_events_event_type_loan_id; + ALTER INDEX IF EXISTS idx_loan_events_ledger RENAME TO idx_contract_events_ledger; + ALTER INDEX IF EXISTS idx_loan_events_pool_deposits_withdraws RENAME TO idx_contract_events_pool_deposits_withdraws; + `); // Rename single-column indexes from initial schema (if they exist) pgm.sql(` @@ -81,32 +64,14 @@ export const down = (pgm) => { pgm.renameTable("contract_events", "loan_events"); - // Revert index names - pgm.renameIndex( - "loan_events", - "idx_contract_events_address_event_type", - "idx_loan_events_borrower_event_type", - ); - pgm.renameIndex( - "loan_events", - "idx_contract_events_loan_id_event_type", - "idx_loan_events_loan_id_event_type", - ); - pgm.renameIndex( - "loan_events", - "idx_contract_events_event_type_loan_id", - "idx_loan_events_event_type_loan_id", - ); - pgm.renameIndex( - "loan_events", - "idx_contract_events_ledger", - "idx_loan_events_ledger", - ); - pgm.renameIndex( - "loan_events", - "idx_contract_events_pool_deposits_withdraws", - "idx_loan_events_pool_deposits_withdraws", - ); + // Revert index names (raw ALTER INDEX; no renameIndex helper exists) + pgm.sql(` + ALTER INDEX IF EXISTS idx_contract_events_address_event_type RENAME TO idx_loan_events_borrower_event_type; + ALTER INDEX IF EXISTS idx_contract_events_loan_id_event_type RENAME TO idx_loan_events_loan_id_event_type; + ALTER INDEX IF EXISTS idx_contract_events_event_type_loan_id RENAME TO idx_loan_events_event_type_loan_id; + ALTER INDEX IF EXISTS idx_contract_events_ledger RENAME TO idx_loan_events_ledger; + ALTER INDEX IF EXISTS idx_contract_events_pool_deposits_withdraws RENAME TO idx_loan_events_pool_deposits_withdraws; + `); pgm.sql(` ALTER INDEX IF EXISTS contract_events_event_type_index RENAME TO loan_events_event_type_index; diff --git a/migrations/1789000000000_ensure-core-tables.js b/migrations/1789000000000_ensure-core-tables.js index 38d7933..d489021 100644 --- a/migrations/1789000000000_ensure-core-tables.js +++ b/migrations/1789000000000_ensure-core-tables.js @@ -25,11 +25,13 @@ export const up = (pgm) => { END $$; `); - // Ensure loan_events table matches requested schema + // Ensure loan_events relation exists. Use to_regclass (not pg_tables) so the + // backward-compat loan_events VIEW created in 1788 also counts as existing; + // otherwise this would try to CREATE TABLE over the view and fail. pgm.sql(` DO $$ BEGIN - IF NOT EXISTS (SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename = 'loan_events') THEN + IF to_regclass('public.loan_events') IS NULL THEN CREATE TABLE loan_events ( id SERIAL PRIMARY KEY, loan_id INTEGER, diff --git a/src/cron/loanCheckCron.ts b/src/cron/loanCheckCron.ts index e04c44d..1136339 100644 --- a/src/cron/loanCheckCron.ts +++ b/src/cron/loanCheckCron.ts @@ -7,6 +7,14 @@ import { cacheService } from "../services/cacheService.js"; const LOCK_KEY = "loan_due_check_cron:running"; const LOCK_TTL_SECONDS = 300; // 5 minutes +const LEDGER_CLOSE_SECONDS = 5; +const DEFAULT_TERM_LEDGERS = 17280; // 1 day in ledgers +const NOTIFICATION_WINDOW_SECONDS = 24 * 60 * 60; // 24 hours + +function notificationCacheKey(loanId: number): string { + return `loan_due_notified:${loanId}`; +} + export async function runLoanDueCheck(): Promise { let lockAcquired = false; try { @@ -30,31 +38,53 @@ export async function runLoanDueCheck(): Promise { try { logger.info("Running loan due check cron..."); - // Find loans where a repayment is due in the next 24 hours - // This is a simplified query; in a real app, you'd check against a repayment schedule table const result = await query(` - SELECT le.loan_id, le.address, le.amount + SELECT le.loan_id, le.address, le.amount, + le.ledger_closed_at AS approved_at, + COALESCE(le.term_ledgers, ${DEFAULT_TERM_LEDGERS}) AS term_ledgers FROM contract_events le WHERE le.event_type = 'LoanApproved' AND NOT EXISTS ( - SELECT 1 FROM contract_events re + SELECT 1 FROM contract_events re WHERE re.loan_id = le.loan_id AND re.event_type = 'LoanRepaid' ) - AND le.ledger_closed_at < NOW() - INTERVAL '30 days' -- Simplified due logic + AND (le.ledger_closed_at + (COALESCE(le.term_ledgers, ${DEFAULT_TERM_LEDGERS}) * ${LEDGER_CLOSE_SECONDS} || ' seconds')::interval) <= NOW() + INTERVAL '24 hours' `); + let notifiedCount = 0; + for (const loan of result.rows) { - await notificationService.createNotification({ - userId: loan.address, - type: "repayment_due", - title: "Repayment Due Soon", - message: `Your repayment for loan #${loan.loan_id} of ${loan.amount} is due.`, - loanId: loan.loan_id, - }); + const cacheKey = notificationCacheKey(loan.loan_id); + const alreadyNotified = await cacheService.setNotExists( + cacheKey, + "1", + NOTIFICATION_WINDOW_SECONDS, + ); + + if (!alreadyNotified) { + continue; + } + + try { + await notificationService.createNotification({ + userId: loan.address, + type: "repayment_due", + title: "Repayment Due Soon", + message: `Your repayment for loan #${loan.loan_id} of ${loan.amount} is due.`, + loanId: loan.loan_id, + }); + notifiedCount++; + } catch (err) { + logger.error("Failed to send notification, clearing dedup key", { + loanId: loan.loan_id, + error: err, + }); + await cacheService.delete(cacheKey).catch(() => {}); + } } logger.info( - `Loan due check completed. Notified ${result.rows.length} borrowers.`, + `Loan due check completed. Notified ${notifiedCount} borrowers (${result.rows.length} due loans found).`, ); } catch (error) { logger.error("Error in loan due check cron", { error }); diff --git a/src/tests/loanCheckCron.test.ts b/src/tests/loanCheckCron.test.ts new file mode 100644 index 0000000..c8759d5 --- /dev/null +++ b/src/tests/loanCheckCron.test.ts @@ -0,0 +1,192 @@ +import { jest } from "@jest/globals"; + +jest.unstable_mockModule("../db/connection.js", () => ({ + query: jest.fn(), + getClient: jest.fn(), + default: { query: jest.fn() }, +})); + +jest.unstable_mockModule("../services/notificationService.js", () => ({ + notificationService: { + createNotification: jest.fn(), + }, +})); + +jest.unstable_mockModule("../services/cacheService.js", () => ({ + cacheService: { + setNotExists: jest.fn(), + delete: jest.fn(), + }, +})); + +jest.unstable_mockModule("../utils/logger.js", () => ({ + default: { + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + }, +})); + +const { query } = await import("../db/connection.js"); +const { notificationService } = + await import("../services/notificationService.js"); +const { cacheService } = await import("../services/cacheService.js"); +const { runLoanDueCheck } = await import("../cron/loanCheckCron.js"); + +const mockedQuery = query as jest.MockedFunction; +const mockedSetNotExists = cacheService.setNotExists as jest.MockedFunction< + typeof cacheService.setNotExists +>; +const mockedDelete = cacheService.delete as jest.MockedFunction< + typeof cacheService.delete +>; +const mockedCreateNotification = + notificationService.createNotification as jest.MockedFunction< + typeof notificationService.createNotification + >; + +describe("loanCheckCron - runLoanDueCheck", () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + it("should skip if lock cannot be acquired", async () => { + mockedSetNotExists.mockResolvedValue(false); + + await runLoanDueCheck(); + + expect(mockedQuery).not.toHaveBeenCalled(); + expect(mockedCreateNotification).not.toHaveBeenCalled(); + }); + + it("should notify a borrower for a due loan", async () => { + // First call: acquire cron lock + // Second call: dedup guard for loan (returns true = key was set = not yet notified) + mockedSetNotExists.mockResolvedValueOnce(true).mockResolvedValueOnce(true); + + mockedQuery.mockResolvedValue({ + rows: [ + { + loan_id: 42, + address: "GBORROWER1", + amount: "1000", + approved_at: new Date().toISOString(), + term_ledgers: 17280, + }, + ], + rowCount: 1, + } as any); + + mockedCreateNotification.mockResolvedValue({} as any); + mockedDelete.mockResolvedValue(undefined as any); + + await runLoanDueCheck(); + + expect(mockedCreateNotification).toHaveBeenCalledTimes(1); + expect(mockedCreateNotification).toHaveBeenCalledWith({ + userId: "GBORROWER1", + type: "repayment_due", + title: "Repayment Due Soon", + message: "Your repayment for loan #42 of 1000 is due.", + loanId: 42, + }); + }); + + it("should not re-notify a borrower already notified within the window", async () => { + // First call: acquire cron lock (true) + // Second call: dedup guard (false = key already exists = already notified) + mockedSetNotExists.mockResolvedValueOnce(true).mockResolvedValueOnce(false); + + mockedQuery.mockResolvedValue({ + rows: [ + { + loan_id: 42, + address: "GBORROWER1", + amount: "1000", + approved_at: new Date().toISOString(), + term_ledgers: 17280, + }, + ], + rowCount: 1, + } as any); + + mockedDelete.mockResolvedValue(undefined as any); + + await runLoanDueCheck(); + + expect(mockedCreateNotification).not.toHaveBeenCalled(); + }); + + it("should handle multiple loans and only notify those not yet notified", async () => { + // Cron lock + 3 dedup guards: loan 1 not notified, loan 2 already notified, loan 3 not notified + mockedSetNotExists + .mockResolvedValueOnce(true) // cron lock + .mockResolvedValueOnce(true) // loan 1: new + .mockResolvedValueOnce(false) // loan 2: already notified + .mockResolvedValueOnce(true); // loan 3: new + + mockedQuery.mockResolvedValue({ + rows: [ + { + loan_id: 1, + address: "GA", + amount: "100", + approved_at: new Date().toISOString(), + term_ledgers: 17280, + }, + { + loan_id: 2, + address: "GB", + amount: "200", + approved_at: new Date().toISOString(), + term_ledgers: 17280, + }, + { + loan_id: 3, + address: "GC", + amount: "300", + approved_at: new Date().toISOString(), + term_ledgers: 17280, + }, + ], + rowCount: 3, + } as any); + + mockedCreateNotification.mockResolvedValue({} as any); + mockedDelete.mockResolvedValue(undefined as any); + + await runLoanDueCheck(); + + expect(mockedCreateNotification).toHaveBeenCalledTimes(2); + expect(mockedCreateNotification).toHaveBeenCalledWith( + expect.objectContaining({ loanId: 1 }), + ); + expect(mockedCreateNotification).toHaveBeenCalledWith( + expect.objectContaining({ loanId: 3 }), + ); + }); + + it("should delete dedup key when notification fails so it can be retried", async () => { + mockedSetNotExists.mockResolvedValueOnce(true).mockResolvedValueOnce(true); + + mockedQuery.mockResolvedValue({ + rows: [ + { + loan_id: 99, + address: "GFAIL", + amount: "500", + approved_at: new Date().toISOString(), + term_ledgers: 17280, + }, + ], + rowCount: 1, + } as any); + + mockedCreateNotification.mockRejectedValueOnce(new Error("send failed")); + mockedDelete.mockResolvedValue(undefined as any); + + await runLoanDueCheck(); + + expect(mockedDelete).toHaveBeenCalledWith("loan_due_notified:99"); + }); +});