Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion migrations/1772000000000_webhook-subscriptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ export const up = (pgm) => {
pgm.createTable("webhook_subscriptions", {
id: "id",
callback_url: { type: "text", notNull: true },
event_types: { type: "jsonb", notNull: true, default: "[]::jsonb" },
event_types: {
type: "jsonb",
notNull: true,
default: pgm.func("'[]'::jsonb"),
},
secret: { type: "varchar(255)" },
is_active: { type: "boolean", notNull: true, default: true },
created_at: {
Expand Down
4 changes: 2 additions & 2 deletions migrations/1776000000006_add-interest-rate-to-loan-events.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
exports.up = (pgm) => {
export const up = (pgm) => {
pgm.addColumns("loan_events", {
interest_rate_bps: { type: "integer", default: null },
term_ledgers: { type: "integer", default: null },
Expand All @@ -8,6 +8,6 @@ exports.up = (pgm) => {
// but for now we'll just track the rate per-loan event.
};

exports.down = (pgm) => {
export const down = (pgm) => {
pgm.dropColumns("loan_events", ["interest_rate_bps", "term_ledgers"]);
};
16 changes: 14 additions & 2 deletions migrations/1778000000009_transaction-submissions.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/**
* @param { import("node-pg-migrate").MigrationBuilder } @param pgm {import("node-pg-migrate").MigrationBuilder}
*/
exports.up = (pgm) => {
export const up = (pgm) => {
pgm.createTable("transaction_submissions", {
id: {
type: "serial",
Expand Down Expand Up @@ -52,6 +52,18 @@ exports.up = (pgm) => {
pgm.createIndex("transaction_submissions", ["status"]);
pgm.createIndex("transaction_submissions", ["transaction_type"]);

// Ensure the shared updated_at trigger function exists (not created by any
// earlier migration), otherwise the trigger below cannot be created.
pgm.sql(`
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS trigger AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
`);

// Trigger to update updated_at timestamp
pgm.createTrigger("transaction_submissions", "update_updated_at", {
when: "BEFORE",
Expand All @@ -63,6 +75,6 @@ exports.up = (pgm) => {
/**
* @param { import("node-pg-migrate").MigrationBuilder } @param pgm {import("node-pg-migrate").MigrationBuilder}
*/
exports.down = (pgm) => {
export const down = (pgm) => {
pgm.dropTable("transaction_submissions");
};
34 changes: 22 additions & 12 deletions migrations/1781000000011_webhook-retry-logic.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,31 @@ export const shorthands = undefined;
* @returns {Promise<void> | void}
*/
export const up = (pgm) => {
// Add payload column to webhook_deliveries table
pgm.addColumn("webhook_deliveries", {
payload: {
type: "jsonb",
notNull: false,
// Add payload column to webhook_deliveries table.
// 1772 already creates this column, so guard against re-adding it.
pgm.addColumn(
"webhook_deliveries",
{
payload: {
type: "jsonb",
notNull: false,
},
},
});
{ ifNotExists: true },
);

// Add next_retry_at column to track when to retry
pgm.addColumn("webhook_deliveries", {
next_retry_at: {
type: "timestamp",
notNull: false,
// Add next_retry_at column to track when to retry.
// 1772 already creates this column, so guard against re-adding it.
pgm.addColumn(
"webhook_deliveries",
{
next_retry_at: {
type: "timestamp",
notNull: false,
},
},
});
{ ifNotExists: true },
);

// Add index for efficient retry polling
pgm.createIndex("webhook_deliveries", ["next_retry_at"], {
Expand Down
71 changes: 36 additions & 35 deletions migrations/1784000000014_add-loan-disputes.js
Original file line number Diff line number Diff line change
@@ -1,41 +1,42 @@
// Migration: Add loan_disputes table and support for disputed loan status

module.exports = {
async up(db) {
// 1. Create loan_disputes table
await db.query(`
CREATE TABLE IF NOT EXISTS loan_disputes (
id SERIAL PRIMARY KEY,
loan_id INTEGER NOT NULL REFERENCES loan_events(loan_id),
borrower TEXT NOT NULL,
reason TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'open', -- open, resolved, rejected
admin_note TEXT,
resolution TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
resolved_at TIMESTAMP WITH TIME ZONE
);
`);
export const up = (pgm) => {
// 1. Create loan_disputes table
pgm.sql(`
CREATE TABLE IF NOT EXISTS loan_disputes (
id SERIAL PRIMARY KEY,
-- loan_id is not a FK: loan_events is an append-only event table whose
-- loan_id is non-unique (and later becomes a view), so it cannot be a
-- foreign key target.
loan_id INTEGER NOT NULL,
borrower TEXT NOT NULL,
reason TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'open', -- open, resolved, rejected
admin_note TEXT,
resolution TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
resolved_at TIMESTAMP WITH TIME ZONE
);
`);

// 2. Add indexes for efficient querying
await db.query(`
CREATE INDEX IF NOT EXISTS idx_loan_disputes_status ON loan_disputes(status);
`);
await db.query(`
CREATE INDEX IF NOT EXISTS idx_loan_disputes_borrower ON loan_disputes(borrower);
`);
await db.query(`
CREATE INDEX IF NOT EXISTS idx_loan_disputes_loan_id ON loan_disputes(loan_id);
`);
// 2. Add indexes for efficient querying
pgm.sql(`
CREATE INDEX IF NOT EXISTS idx_loan_disputes_status ON loan_disputes(status);
`);
pgm.sql(`
CREATE INDEX IF NOT EXISTS idx_loan_disputes_borrower ON loan_disputes(borrower);
`);
pgm.sql(`
CREATE INDEX IF NOT EXISTS idx_loan_disputes_loan_id ON loan_disputes(loan_id);
`);

// 3. Add disputed status to loan_events (if using status enum, update it)
// If status is a string, no migration needed. If enum, alter type here.
// Example for enum:
// await db.query(`ALTER TYPE loan_status_enum ADD VALUE IF NOT EXISTS 'disputed';`);
},
// 3. Add disputed status to loan_events (if using status enum, update it)
// If status is a string, no migration needed. If enum, alter type here.
// Example for enum:
// pgm.sql(`ALTER TYPE loan_status_enum ADD VALUE IF NOT EXISTS 'disputed';`);
};

async down(db) {
await db.query(`DROP TABLE IF EXISTS loan_disputes;`);
// No need to remove enum value (Postgres doesn't support removing enum values easily)
},
export const down = (pgm) => {
pgm.sql(`DROP TABLE IF EXISTS loan_disputes;`);
// No need to remove enum value (Postgres doesn't support removing enum values easily)
};
15 changes: 10 additions & 5 deletions migrations/1787000000017_user-notification-preferences.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@ export const shorthands = undefined;
* @returns {Promise<void> | void}
*/
export const up = (pgm) => {
pgm.addColumns("user_profiles", {
email_enabled: { type: "boolean", notNull: true, default: false },
sms_enabled: { type: "boolean", notNull: true, default: false },
phone: { type: "varchar(20)" },
});
// 1773 already adds these notification columns, so guard against re-adding.
pgm.addColumns(
"user_profiles",
{
email_enabled: { type: "boolean", notNull: true, default: false },
sms_enabled: { type: "boolean", notNull: true, default: false },
phone: { type: "varchar(20)" },
},
{ ifNotExists: true },
);
};

/**
Expand Down
69 changes: 17 additions & 52 deletions migrations/1788000000019_unified-contract-events.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,15 @@ export const up = (pgm) => {
// 3. Make address nullable (for events like YieldDistributed that may not have a user address)
pgm.alterColumn("contract_events", "address", { notNull: false });

// 4. Rename indexes to match the new table and column names
pgm.renameIndex(
"contract_events",
"idx_loan_events_borrower_event_type",
"idx_contract_events_address_event_type",
);
pgm.renameIndex(
"contract_events",
"idx_loan_events_loan_id_event_type",
"idx_contract_events_loan_id_event_type",
);
pgm.renameIndex(
"contract_events",
"idx_loan_events_event_type_loan_id",
"idx_contract_events_event_type_loan_id",
);
pgm.renameIndex(
"contract_events",
"idx_loan_events_ledger",
"idx_contract_events_ledger",
);
pgm.renameIndex(
"contract_events",
"idx_loan_events_pool_deposits_withdraws",
"idx_contract_events_pool_deposits_withdraws",
);
// 4. Rename indexes to match the new table and column names.
// node-pg-migrate has no renameIndex helper, so use raw ALTER INDEX.
pgm.sql(`
ALTER INDEX IF EXISTS idx_loan_events_borrower_event_type RENAME TO idx_contract_events_address_event_type;
ALTER INDEX IF EXISTS idx_loan_events_loan_id_event_type RENAME TO idx_contract_events_loan_id_event_type;
ALTER INDEX IF EXISTS idx_loan_events_event_type_loan_id RENAME TO idx_contract_events_event_type_loan_id;
ALTER INDEX IF EXISTS idx_loan_events_ledger RENAME TO idx_contract_events_ledger;
ALTER INDEX IF EXISTS idx_loan_events_pool_deposits_withdraws RENAME TO idx_contract_events_pool_deposits_withdraws;
`);

// Rename single-column indexes from initial schema (if they exist)
pgm.sql(`
Expand Down Expand Up @@ -81,32 +64,14 @@ export const down = (pgm) => {

pgm.renameTable("contract_events", "loan_events");

// Revert index names
pgm.renameIndex(
"loan_events",
"idx_contract_events_address_event_type",
"idx_loan_events_borrower_event_type",
);
pgm.renameIndex(
"loan_events",
"idx_contract_events_loan_id_event_type",
"idx_loan_events_loan_id_event_type",
);
pgm.renameIndex(
"loan_events",
"idx_contract_events_event_type_loan_id",
"idx_loan_events_event_type_loan_id",
);
pgm.renameIndex(
"loan_events",
"idx_contract_events_ledger",
"idx_loan_events_ledger",
);
pgm.renameIndex(
"loan_events",
"idx_contract_events_pool_deposits_withdraws",
"idx_loan_events_pool_deposits_withdraws",
);
// Revert index names (raw ALTER INDEX; no renameIndex helper exists)
pgm.sql(`
ALTER INDEX IF EXISTS idx_contract_events_address_event_type RENAME TO idx_loan_events_borrower_event_type;
ALTER INDEX IF EXISTS idx_contract_events_loan_id_event_type RENAME TO idx_loan_events_loan_id_event_type;
ALTER INDEX IF EXISTS idx_contract_events_event_type_loan_id RENAME TO idx_loan_events_event_type_loan_id;
ALTER INDEX IF EXISTS idx_contract_events_ledger RENAME TO idx_loan_events_ledger;
ALTER INDEX IF EXISTS idx_contract_events_pool_deposits_withdraws RENAME TO idx_loan_events_pool_deposits_withdraws;
`);

pgm.sql(`
ALTER INDEX IF EXISTS contract_events_event_type_index RENAME TO loan_events_event_type_index;
Expand Down
6 changes: 4 additions & 2 deletions migrations/1789000000000_ensure-core-tables.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ export const up = (pgm) => {
END $$;
`);

// Ensure loan_events table matches requested schema
// Ensure loan_events relation exists. Use to_regclass (not pg_tables) so the
// backward-compat loan_events VIEW created in 1788 also counts as existing;
// otherwise this would try to CREATE TABLE over the view and fail.
pgm.sql(`
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename = 'loan_events') THEN
IF to_regclass('public.loan_events') IS NULL THEN
CREATE TABLE loan_events (
id SERIAL PRIMARY KEY,
loan_id INTEGER,
Expand Down
56 changes: 43 additions & 13 deletions src/cron/loanCheckCron.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ import { cacheService } from "../services/cacheService.js";
const LOCK_KEY = "loan_due_check_cron:running";
const LOCK_TTL_SECONDS = 300; // 5 minutes

const LEDGER_CLOSE_SECONDS = 5;
const DEFAULT_TERM_LEDGERS = 17280; // 1 day in ledgers
const NOTIFICATION_WINDOW_SECONDS = 24 * 60 * 60; // 24 hours

function notificationCacheKey(loanId: number): string {
return `loan_due_notified:${loanId}`;
}

export async function runLoanDueCheck(): Promise<void> {
let lockAcquired = false;
try {
Expand All @@ -30,31 +38,53 @@ export async function runLoanDueCheck(): Promise<void> {
try {
logger.info("Running loan due check cron...");

// Find loans where a repayment is due in the next 24 hours
// This is a simplified query; in a real app, you'd check against a repayment schedule table
const result = await query(`
SELECT le.loan_id, le.address, le.amount
SELECT le.loan_id, le.address, le.amount,
le.ledger_closed_at AS approved_at,
COALESCE(le.term_ledgers, ${DEFAULT_TERM_LEDGERS}) AS term_ledgers
FROM contract_events le
WHERE le.event_type = 'LoanApproved'
AND NOT EXISTS (
SELECT 1 FROM contract_events re
SELECT 1 FROM contract_events re
WHERE re.loan_id = le.loan_id AND re.event_type = 'LoanRepaid'
)
AND le.ledger_closed_at < NOW() - INTERVAL '30 days' -- Simplified due logic
AND (le.ledger_closed_at + (COALESCE(le.term_ledgers, ${DEFAULT_TERM_LEDGERS}) * ${LEDGER_CLOSE_SECONDS} || ' seconds')::interval) <= NOW() + INTERVAL '24 hours'
`);

let notifiedCount = 0;

for (const loan of result.rows) {
await notificationService.createNotification({
userId: loan.address,
type: "repayment_due",
title: "Repayment Due Soon",
message: `Your repayment for loan #${loan.loan_id} of ${loan.amount} is due.`,
loanId: loan.loan_id,
});
const cacheKey = notificationCacheKey(loan.loan_id);
const alreadyNotified = await cacheService.setNotExists(
cacheKey,
"1",
NOTIFICATION_WINDOW_SECONDS,
);

if (!alreadyNotified) {
continue;
}

try {
await notificationService.createNotification({
userId: loan.address,
type: "repayment_due",
title: "Repayment Due Soon",
message: `Your repayment for loan #${loan.loan_id} of ${loan.amount} is due.`,
loanId: loan.loan_id,
});
notifiedCount++;
} catch (err) {
logger.error("Failed to send notification, clearing dedup key", {
loanId: loan.loan_id,
error: err,
});
await cacheService.delete(cacheKey).catch(() => {});
}
}

logger.info(
`Loan due check completed. Notified ${result.rows.length} borrowers.`,
`Loan due check completed. Notified ${notifiedCount} borrowers (${result.rows.length} due loans found).`,
);
} catch (error) {
logger.error("Error in loan due check cron", { error });
Expand Down
Loading