Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/database/migrations/0001_create_idempotency_keys.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
CREATE TABLE IF NOT EXISTS idempotency_keys (
key VARCHAR(64) PRIMARY KEY,
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
endpoint VARCHAR(255) NOT NULL,
request_hash VARCHAR(64) NOT NULL,
response_status INTEGER,
response_body JSONB,
tx_hash VARCHAR(64),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_idempotency_expires ON idempotency_keys(expires_at);
CREATE INDEX IF NOT EXISTS idx_idempotency_user_endpoint ON idempotency_keys(user_id, endpoint);
25 changes: 25 additions & 0 deletions src/database/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,28 @@ export const credentials = pgTable("credentials", {
.notNull()
.defaultNow(),
});

// ─── Idempotency Keys ──────────────────────────────────────────────────────

export const idempotencyKeys = pgTable(
"idempotency_keys",
{
key: varchar("key", { length: 64 }).primaryKey(),
userId: uuid("user_id")
.notNull()
.references(() => users.id, { onDelete: "cascade" }),
endpoint: varchar("endpoint", { length: 255 }).notNull(),
requestHash: varchar("request_hash", { length: 64 }).notNull(),
responseStatus: integer("response_status"),
responseBody: jsonb("response_body"),
txHash: varchar("tx_hash", { length: 64 }),
createdAt: timestamp("created_at", { withTimezone: true })
.notNull()
.defaultNow(),
expiresAt: timestamp("expires_at", { withTimezone: true }).notNull(),
},
(table) => [
index("idx_idempotency_expires").on(table.expiresAt),
index("idx_idempotency_user_endpoint").on(table.userId, table.endpoint),
]
);
18 changes: 18 additions & 0 deletions src/jobs/cleanup-idempotency.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { logger } from "../utils/logger.js";
import { cleanupExpiredIdempotencyKeys } from "../middleware/idempotency.js";

const CLEANUP_INTERVAL_MS = 24 * 60 * 60 * 1000;

export async function runIdempotencyCleanupOnce(): Promise<number> {
return cleanupExpiredIdempotencyKeys();
}

export function startIdempotencyCleanupJob(): () => void {
const timer = setInterval(() => {
cleanupExpiredIdempotencyKeys().catch((err) => {
logger.error({ err }, "Failed to clean up expired idempotency keys");
});
}, CLEANUP_INTERVAL_MS);

return () => clearInterval(timer);
}
229 changes: 229 additions & 0 deletions src/middleware/idempotency.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
import crypto from "node:crypto";
import { and, eq, lt } from "drizzle-orm";
import { db } from "../config/database.js";
import { idempotencyKeys } from "../database/schema.js";
import { ConflictError } from "../utils/errors.js";

const IDEMPOTENCY_TTL_MS = 24 * 60 * 60 * 1000;
const POLL_INTERVAL_MS = 250;
const MAX_WAIT_MS = 30_000;

export interface IdempotencyRecord {
key: string;
userId: string;
endpoint: string;
requestHash: string;
responseStatus: number | null;
responseBody: unknown | null;
txHash: string | null;
createdAt: Date;
expiresAt: Date;
}

export interface IdempotencyReservation {
state: "new" | "cached" | "resume";
record: IdempotencyRecord;
}

type RawIdempotencyRow = typeof idempotencyKeys.$inferSelect;

export function hashRequestBody(requestBody: unknown): string {
return crypto
.createHash("sha256")
.update(stableStringify(requestBody))
.digest("hex");
}

export async function reserveIdempotencyKey(options: {
key: string;
userId: string;
endpoint: string;
requestBody: unknown;
}): Promise<IdempotencyReservation> {
const key = options.key.trim();
const requestHash = hashRequestBody(options.requestBody);

while (true) {
const existing = await loadIdempotencyKey(key);
if (existing) {
if (existing.expiresAt.getTime() <= Date.now()) {
await deleteIdempotencyKey(key);
continue;
}

assertIdempotencyOwnership(existing, options.userId, options.endpoint);

if (existing.requestHash !== requestHash) {
throw new ConflictError(
"Idempotency key reused with different request body"
);
}

if (existing.responseStatus !== null && existing.responseBody !== null) {
return { state: "cached", record: existing };
}

if (existing.txHash !== null) {
return { state: "resume", record: existing };
}

const deadline = Date.now() + MAX_WAIT_MS;
while (Date.now() < deadline) {
await sleep(POLL_INTERVAL_MS);
const latest = await loadIdempotencyKey(key);
if (!latest) {
break;
}

assertIdempotencyOwnership(latest, options.userId, options.endpoint);

if (latest.requestHash !== requestHash) {
throw new ConflictError(
"Idempotency key reused with different request body"
);
}

if (latest.responseStatus !== null && latest.responseBody !== null) {
return { state: "cached", record: latest };
}

if (latest.txHash !== null) {
return { state: "resume", record: latest };
}
}

throw new ConflictError("Idempotency key is already being processed");
}

try {
await db.insert(idempotencyKeys).values({
key,
userId: options.userId,
endpoint: options.endpoint,
requestHash,
expiresAt: new Date(Date.now() + IDEMPOTENCY_TTL_MS),
});

const record = await loadIdempotencyKey(key);
if (!record) {
throw new Error("Failed to persist idempotency key");
}

return { state: "new", record };
} catch (err) {
if (isUniqueViolation(err)) {
continue;
}

throw err;
}
}
}

export async function storeIdempotencyTxHash(
key: string,
txHash: string
): Promise<void> {
await db
.update(idempotencyKeys)
.set({ txHash })
.where(eq(idempotencyKeys.key, key));
}

export async function storeIdempotentResponse(
key: string,
status: number,
body: unknown
): Promise<void> {
await db
.update(idempotencyKeys)
.set({
responseStatus: status,
responseBody: body,
})
.where(eq(idempotencyKeys.key, key));
}

export async function deleteIdempotencyKey(key: string): Promise<void> {
await db.delete(idempotencyKeys).where(eq(idempotencyKeys.key, key));
}

export async function cleanupExpiredIdempotencyKeys(): Promise<number> {
const removed = await db
.delete(idempotencyKeys)
.where(lt(idempotencyKeys.expiresAt, new Date()))
.returning({ key: idempotencyKeys.key });

return removed.length;
}

function assertIdempotencyOwnership(
record: IdempotencyRecord,
userId: string,
endpoint: string
): void {
if (record.userId !== userId || record.endpoint !== endpoint) {
throw new ConflictError(
"Idempotency key is already associated with a different request"
);
}
}

async function loadIdempotencyKey(
key: string
): Promise<IdempotencyRecord | null> {
const row = await db.query.idempotencyKeys.findFirst({
where: eq(idempotencyKeys.key, key),
});

return row ? mapRow(row) : null;
}

function mapRow(row: RawIdempotencyRow): IdempotencyRecord {
return {
key: row.key,
userId: row.userId,
endpoint: row.endpoint,
requestHash: row.requestHash,
responseStatus: row.responseStatus,
responseBody: row.responseBody,
txHash: row.txHash,
createdAt: row.createdAt,
expiresAt: row.expiresAt,
};
}

function stableStringify(value: unknown): string {
if (value === null) {
return "null";
}

if (typeof value !== "object") {
return JSON.stringify(value);
}

if (Array.isArray(value)) {
return `[${value.map((item) => stableStringify(item)).join(",")}]`;
}

const entries = Object.entries(value as Record<string, unknown>)
.sort(([left], [right]) => left.localeCompare(right))
.map(([entryKey, entryValue]) => {
return `${JSON.stringify(entryKey)}:${stableStringify(entryValue)}`;
});

return `{${entries.join(",")}}`;
}

function isUniqueViolation(err: unknown): boolean {
return (
typeof err === "object" &&
err !== null &&
"code" in err &&
(err as { code?: string }).code === "23505"
);
}

function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
7 changes: 2 additions & 5 deletions src/modules/courses/course.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { eq, and, count, desc } from "drizzle-orm";
import { eq, and, count, desc, inArray } from "drizzle-orm";
import { db } from "../../config/database.js";
import { courses, enrollments } from "../../database/schema.js";
import { NotFoundError, ConflictError } from "../../utils/errors.js";
Expand Down Expand Up @@ -45,10 +45,7 @@ export class CourseService {
value: count(),
})
.from(enrollments)
.where(
// Using inArray would be cleaner, but we'll keep it simple
eq(enrollments.courseId, courseIds[0])
)
.where(inArray(enrollments.courseId, courseIds))
.groupBy(enrollments.courseId);

for (const c of counts) {
Expand Down
5 changes: 3 additions & 2 deletions src/modules/credentials/credential.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ export class CredentialController {
reply: FastifyReply
): Promise<void> {
const { authUser } = request as AuthenticatedRequest;
const { courseId, submissionId } = (request as any).validatedBody;
const { courseId, submissionId, idempotencyKey } = (request as any).validatedBody;
const result = await credentialService.mint(
authUser.id,
courseId,
submissionId
submissionId,
idempotencyKey
);

reply.status(201).send({ success: true, data: result });
Expand Down
Loading
Loading