Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions apps/backend/src/__tests__/fileCleanup.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ vi.mock('@aws-sdk/client-s3', () => ({
const mockFindMany = vi.fn();
const mockUpdate = vi.fn();
const mockExecute = vi.fn();
const mockDelete = vi.fn();

vi.mock('../db/index.js', () => ({
db: {
query: { files: { findMany: mockFindMany } },
update: mockUpdate,
execute: mockExecute,
delete: mockDelete,
},
}));

Expand All @@ -35,6 +37,9 @@ vi.mock('../db/schema.js', () => ({
vi.mock('drizzle-orm', () => ({
isNotNull: vi.fn((col: unknown) => ({ col, isNotNull: true })),
isNull: vi.fn((col: unknown) => ({ col, isNull: true })),
and: vi.fn(),
eq: vi.fn(),
lt: vi.fn(),
sql: Object.assign(
vi.fn((strings: TemplateStringsArray, ...vals: unknown[]) => ({ strings, vals })),
{ raw: vi.fn() },
Expand All @@ -53,6 +58,7 @@ beforeEach(() => {
mockS3Send.mockResolvedValue(undefined);
mockUpdate.mockReturnValue({ set: mockSetFn });
mockSetFn.mockReturnValue({ where: mockWhereFn });
mockDelete.mockReturnValue({ where: mockWhereFn });
});

const { softDeleteFile, runHardDeletePass } = await import('../services/fileCleanup.js');
Expand All @@ -67,7 +73,8 @@ describe('#231 – softDeleteFile', () => {

describe('#231 – runHardDeletePass', () => {
it('skips files that still have live message references', async () => {
mockFindMany.mockResolvedValue([{ id: 'file-1', storageKey: 'key-1' }]);
mockFindMany.mockResolvedValueOnce([{ id: 'file-1', storageKey: 'key-1' }]) // first pass (candidates)
.mockResolvedValueOnce([]); // second pass (pendingCandidates)
mockExecute.mockResolvedValueOnce([{ '?column?': 1 }]); // live ref exists

await runHardDeletePass();
Expand All @@ -77,7 +84,8 @@ describe('#231 – runHardDeletePass', () => {
});

it('hard-deletes from S3 and marks hardDeletedAt when no live refs', async () => {
mockFindMany.mockResolvedValue([{ id: 'file-2', storageKey: 'key-2' }]);
mockFindMany.mockResolvedValueOnce([{ id: 'file-2', storageKey: 'key-2' }])
.mockResolvedValueOnce([]);
mockExecute.mockResolvedValueOnce([]); // no live refs

await runHardDeletePass();
Expand All @@ -87,7 +95,8 @@ describe('#231 – runHardDeletePass', () => {
});

it('does not mark hardDeletedAt when S3 delete throws (safe retry)', async () => {
mockFindMany.mockResolvedValue([{ id: 'file-3', storageKey: 'key-3' }]);
mockFindMany.mockResolvedValueOnce([{ id: 'file-3', storageKey: 'key-3' }])
.mockResolvedValueOnce([]);
mockExecute.mockResolvedValueOnce([]);
mockS3Send.mockRejectedValueOnce(new Error('NoSuchKey'));

Expand All @@ -97,14 +106,26 @@ describe('#231 – runHardDeletePass', () => {
});

it('processes multiple files in one pass', async () => {
mockFindMany.mockResolvedValue([
mockFindMany.mockResolvedValueOnce([
{ id: 'file-a', storageKey: 'key-a' },
{ id: 'file-b', storageKey: 'key-b' },
]);
]).mockResolvedValueOnce([]);
mockExecute.mockResolvedValue([]); // no live refs for either

await runHardDeletePass();

expect(mockS3Send).toHaveBeenCalledTimes(2);
});

it('deletes pending files older than 24 hours', async () => {
mockFindMany.mockResolvedValueOnce([]) // no soft-deleted candidates
.mockResolvedValueOnce([
{ id: 'pending-1', storageKey: 'pending-key-1' },
]);

await runHardDeletePass();

expect(mockS3Send).toHaveBeenCalledTimes(1);
expect(mockDelete).toHaveBeenCalled();
});
});
2 changes: 2 additions & 0 deletions apps/backend/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { messagesRouter } from './routes/messages.js';
import { usersRouter } from './routes/users.js';
import { treasuryRouter } from './routes/treasury.js';
import { filesRouter } from './routes/files.js';
import { uploadsRouter } from './routes/uploads.js';
import { pushRouter } from './routes/push.js';
import { syncRouter } from './routes/sync.js';
import { requireAuth, type AuthRequest } from './middleware/auth.js';
Expand Down Expand Up @@ -55,6 +56,7 @@ app.use('/messages', messagesRouter);
app.use('/users', usersRouter);
app.use('/treasury', treasuryRouter);
app.use('/files', filesRouter);
app.use('/uploads', uploadsRouter);
app.use('/push', pushRouter);
app.use('/sync', syncRouter);

Expand Down
5 changes: 5 additions & 0 deletions apps/backend/src/db/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,14 @@ export const contentTypeEnum = pgEnum('content_type', [
// Tracks S3 storage objects for file-type messages. Soft-deleted when all
// referencing messages are retracted; hard-deleted by the background cleanup job.

export const fileStatusEnum = pgEnum('file_status', ['pending', 'ready']);

export const files = pgTable('files', {
id: uuid('id').primaryKey().defaultRandom(),
storageKey: text('storage_key').notNull().unique(),
status: fileStatusEnum('status').notNull().default('pending'),
size: integer('size'),
sha256: text('sha256'),
deletedAt: timestamp('deleted_at'),
hardDeletedAt: timestamp('hard_deleted_at'),
createdAt: timestamp('created_at').notNull().defaultNow(),
Expand Down
17 changes: 13 additions & 4 deletions apps/backend/src/routes/files.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Router } from 'express';
import type { IRouter } from 'express';
import { eq, and } from 'drizzle-orm';
import { db } from '../db/index.js';
import { messages, conversationMembers } from '../db/schema.js';
import { messages, conversationMembers, files } from '../db/schema.js';
import { requireAuth, type AuthRequest } from '../middleware/auth.js';
import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
Expand All @@ -24,12 +24,21 @@ filesRouter.get('/:fileId', async (req: AuthRequest, res) => {
return;
}

// Find the message that references this file
// Find a message that references this file
const message = await db.query.messages.findFirst({
where: eq(messages.id, fileId),
where: eq(messages.fileId, fileId),
});

if (!message) {
res.status(404).json({ error: 'File not referenced by any message' });
return;
}

const file = await db.query.files.findFirst({
where: eq(files.id, fileId),
});

if (!file) {
res.status(404).json({ error: 'File not found' });
return;
}
Expand All @@ -50,7 +59,7 @@ filesRouter.get('/:fileId', async (req: AuthRequest, res) => {
try {
const command = new GetObjectCommand({
Bucket: bucketName,
Key: fileId,
Key: file.storageKey,
});
// Short-lived URL: 5 minutes
const presignedUrl = await getSignedUrl(s3, command, { expiresIn: 300 });
Expand Down
13 changes: 12 additions & 1 deletion apps/backend/src/routes/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Router } from 'express';
import type { IRouter } from 'express';
import { and, eq, inArray } from 'drizzle-orm';
import { db } from '../db/index.js';
import { conversationMembers, messages, messageEnvelopes, userDevices } from '../db/schema.js';
import { conversationMembers, messages, messageEnvelopes, userDevices, files } from '../db/schema.js';
import { softDeleteFile } from '../services/fileCleanup.js';
import { requireAuth, type AuthRequest } from '../middleware/auth.js';
import { validate } from '../middleware/validate.js';
Expand Down Expand Up @@ -61,6 +61,16 @@ messagesRouter.post('/', validate(SendMessageSchema), async (req: AuthRequest, r
return;
}

if (fileId) {
const fileRecord = await db.query.files.findFirst({
where: eq(files.id, fileId),
});
if (!fileRecord || fileRecord.status !== 'ready') {
res.status(400).json({ error: 'File is not ready or does not exist' });
return;
}
}

// ── persist ────────────────────────────────────────────────────────────────
const [message] = await db
.insert(messages)
Expand All @@ -71,6 +81,7 @@ messagesRouter.post('/', validate(SendMessageSchema), async (req: AuthRequest, r
senderDeviceId: deviceId ?? null,
contentType: contentType?.trim().toLowerCase() || 'text',
ciphertext: ciphertext || null,
fileId: fileId || null,
})
.returning();

Expand Down
108 changes: 108 additions & 0 deletions apps/backend/src/routes/uploads.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import { Router } from 'express';
import type { IRouter } from 'express';
import { eq } from 'drizzle-orm';
import { randomUUID } from 'crypto';
import { db } from '../db/index.js';
import { files } from '../db/schema.js';
import { requireAuth, type AuthRequest } from '../middleware/auth.js';
import { S3Client, PutObjectCommand, HeadObjectCommand } from '@aws-sdk/client-s3';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';

export const uploadsRouter: IRouter = Router();
uploadsRouter.use(requireAuth);

const s3 = new S3Client({
region: process.env['AWS_REGION'] || 'us-east-1',
});
const bucketName = process.env['AWS_BUCKET'] || 'clicked-files';

uploadsRouter.post('/', async (_req: AuthRequest, res) => {
const fileId = randomUUID();
try {
await db.insert(files).values({
id: fileId,
storageKey: fileId,
status: 'pending',
});

const command = new PutObjectCommand({
Bucket: bucketName,
Key: fileId,
});
// Short-lived URL: 15 minutes
const presignedUrl = await getSignedUrl(s3, command, { expiresIn: 900 });

res.status(201).json({ fileId, uploadUrl: presignedUrl });
} catch (error) {
res.status(500).json({ error: 'Failed to initiate upload' });
}
});

uploadsRouter.post('/:fileId/confirm', async (req: AuthRequest, res) => {
const fileId = req.params['fileId'] as string;
const { size, sha256 } = req.body as { size?: number; sha256?: string };

if (!fileId) {
res.status(400).json({ error: 'File id is required' });
return;
}
if (size === undefined || typeof size !== 'number') {
res.status(400).json({ error: 'Size is required and must be a number' });
return;
}

const file = await db.query.files.findFirst({
where: eq(files.id, fileId),
});

if (!file) {
res.status(404).json({ error: 'File not found' });
return;
}

if (file.status === 'ready') {
res.status(200).json({ message: 'File is already ready' });
return;
}

try {
const headCommand = new HeadObjectCommand({
Bucket: bucketName,
Key: file.storageKey,
});
const headOutput = await s3.send(headCommand);

if (headOutput.ContentLength !== size) {
res.status(400).json({ error: 'Size mismatch' });
return;
}

if (sha256) {
if (headOutput.ChecksumSHA256 && headOutput.ChecksumSHA256 !== sha256) {
res.status(400).json({ error: 'Hash mismatch' });
return;
}
if (
headOutput.Metadata &&
headOutput.Metadata['sha256'] &&
headOutput.Metadata['sha256'] !== sha256
) {
res.status(400).json({ error: 'Hash mismatch' });
return;
}
}

await db
.update(files)
.set({ status: 'ready', size, sha256: sha256 || null })
.where(eq(files.id, fileId));

res.status(200).json({ message: 'File confirmed' });
} catch (error: any) {
if (error.name === 'NotFound' || error.$metadata?.httpStatusCode === 404) {
res.status(400).json({ error: 'File not found in storage. Ensure upload completed.' });
return;
}
res.status(500).json({ error: 'Failed to confirm upload' });
}
});
19 changes: 18 additions & 1 deletion apps/backend/src/services/fileCleanup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* delete, so a crash between steps is safe to retry.
*/
import { S3Client, DeleteObjectCommand } from '@aws-sdk/client-s3';
import { isNotNull, isNull, sql } from 'drizzle-orm';
import { isNotNull, isNull, sql, and, eq, lt } from 'drizzle-orm';
import { db } from '../db/index.js';
import { files } from '../db/schema.js';
import { reenableExpiredBackoffs } from './pushNotification.js';
Expand Down Expand Up @@ -67,6 +67,23 @@ export async function runHardDeletePass(): Promise<void> {
console.error(`[file-cleanup] failed to delete ${file.storageKey}:`, err);
}
}

// Garbage-collect unconfirmed pending files older than 24 hours
const stalePendingDate = new Date(Date.now() - 24 * 60 * 60 * 1000);
const pendingCandidates = await db.query.files.findMany({
where: (f) => and(eq(f.status, 'pending'), lt(f.createdAt, stalePendingDate)),
columns: { id: true, storageKey: true },
});

for (const file of pendingCandidates) {
try {
await s3.send(new DeleteObjectCommand({ Bucket: BUCKET, Key: file.storageKey }));
await db.delete(files).where(eq(files.id, file.id));
console.log(`[file-cleanup] deleted pending file s3://${BUCKET}/${file.storageKey}`);
} catch (err) {
console.error(`[file-cleanup] failed to delete pending file ${file.storageKey}:`, err);
}
}
}

let cleanupTimer: ReturnType<typeof setInterval> | null = null;
Expand Down
20 changes: 11 additions & 9 deletions apps/backend/src/socket/messaging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,14 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void

// ── send_message ───────────────────────────────────────────────────────────
dispatcher.register('send_message', async (payload) => {
const { conversationId, messageId, content, contentType, ciphertext, envelopes } = payload as {
const { conversationId, messageId, content, contentType, ciphertext, envelopes, fileId } = payload as {
conversationId: string;
messageId?: string;
content?: string;
contentType?: string;
ciphertext?: string;
envelopes?: Array<{ recipientDeviceId: string; ciphertext: string }>;
fileId?: string;
};
const deviceId = socket.auth!.deviceId;

Expand Down Expand Up @@ -136,15 +137,16 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void
return;
}

let fileId: string | undefined;
const resolvedContentType = contentType || 'text/plain';
if (FILE_CONTENT_TYPES.has(resolvedContentType)) {
const [fileRow] = await db
.insert(files)
.values({ storageKey: messageId })
.onConflictDoUpdate({ target: files.storageKey, set: { storageKey: messageId } })
.returning({ id: files.id });
fileId = fileRow?.id;

if (fileId) {
const fileRecord = await db.query.files.findFirst({
where: eq(files.id, fileId),
});
if (!fileRecord || fileRecord.status !== 'ready') {
socket.emit('error', { event: 'send_message', message: 'File is not ready or does not exist' });
return;
}
}

const [message] = await db
Expand Down