Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/backend/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { usersRouter } from './routes/users.js';
import { treasuryRouter } from './routes/treasury.js';
import { filesRouter } from './routes/files.js';
import { pushRouter } from './routes/push.js';
import { uploadsRouter } from './routes/uploads.js';
import { requireAuth, type AuthRequest } from './middleware/auth.js';

const packageJson = JSON.parse(
Expand Down Expand Up @@ -55,6 +56,7 @@ app.use('/users', usersRouter);
app.use('/treasury', treasuryRouter);
app.use('/files', filesRouter);
app.use('/push', pushRouter);
app.use('/uploads', uploadsRouter);

app.get('/me', requireAuth, (req, res) => {
res.json({ user: (req as AuthRequest).auth });
Expand Down
28 changes: 28 additions & 0 deletions apps/backend/src/db/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,22 @@ export const contentTypeEnum = pgEnum('content_type', [
'system',
]);

export const fileStatusEnum = pgEnum('file_status', ['pending', 'ready']);

export const files = pgTable('files', {
id: uuid('id').primaryKey(),
conversationId: uuid('conversation_id')
.notNull()
.references(() => conversations.id, { onDelete: 'cascade' }),
uploaderId: uuid('uploader_id')
.notNull()
.references(() => users.id, { onDelete: 'cascade' }),
size: integer('size').notNull(),
sha256: text('sha256'),
status: fileStatusEnum('status').notNull().default('pending'),
createdAt: timestamp('created_at').notNull().defaultNow(),
});

export const conversationMembers = pgTable('conversation_members', {
id: uuid('id').primaryKey().defaultRandom(),
conversationId: uuid('conversation_id')
Expand Down Expand Up @@ -293,6 +309,15 @@ export const usersRelations = relations(users, ({ many }) => ({
messages: many(messages),
transfers: many(tokenTransfers),
devices: many(devices),
files: many(files),
}));

export const filesRelations = relations(files, ({ one }) => ({
conversation: one(conversations, {
fields: [files.conversationId],
references: [conversations.id],
}),
uploader: one(users, { fields: [files.uploaderId], references: [users.id] }),
}));

export const walletsRelations = relations(wallets, ({ one }) => ({
Expand All @@ -304,6 +329,7 @@ export const conversationsRelations = relations(conversations, ({ many }) => ({
messages: many(messages),
transfers: many(tokenTransfers),
treasuryProposals: many(treasuryProposals),
files: many(files),
}));

export const conversationMembersRelations = relations(conversationMembers, ({ one }) => ({
Expand Down Expand Up @@ -414,3 +440,5 @@ export type OneTimePreKey = typeof oneTimePreKeys.$inferSelect;
export type NewOneTimePreKey = typeof oneTimePreKeys.$inferInsert;
export type UserDevice = typeof userDevices.$inferSelect;
export type NewUserDevice = typeof userDevices.$inferInsert;
export type FileRecord = typeof files.$inferSelect;
export type NewFileRecord = typeof files.$inferInsert;
2 changes: 2 additions & 0 deletions apps/backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
runForever as runStellarListener,
} from './services/stellarListener.js';
import { loadEnv } from './config.js';
import { startGarbageCollection } from './services/garbageCollection.js';

dotenv.config();

Expand Down Expand Up @@ -236,6 +237,7 @@ async function attachRedisAdapter(): Promise<void> {
const PORT = process.env['PORT'] ?? 3001;
httpServer.listen(PORT, () => {
console.log(`Backend server running on port ${PORT}`);
startGarbageCollection();
});

// Attach the Redis adapter after listen() so the API is reachable even if
Expand Down
119 changes: 119 additions & 0 deletions apps/backend/src/routes/uploads.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import { Router } from 'express';
import type { IRouter } from 'express';
import { eq, and } from 'drizzle-orm';
import { db } from '../db/index.js';
import { conversationMembers, files } from '../db/schema.js';
import { requireAuth, type AuthRequest } from '../middleware/auth.js';
import { S3Client, PutObjectCommand, HeadObjectCommand } from '@aws-sdk/client-s3';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';

export const uploadsRouter: IRouter = Router();
uploadsRouter.use(requireAuth);

const s3 = new S3Client({
region: process.env['AWS_REGION'] || 'us-east-1',
});
const bucketName = process.env['AWS_BUCKET'] || 'clicked-files';

uploadsRouter.post('/', async (req: AuthRequest, res) => {
const userId = req.auth!.userId;
const { fileId, conversationId, size, sha256 } = req.body;

if (!fileId || !conversationId || typeof size !== 'number') {
res.status(400).json({ error: 'fileId, conversationId, and size are required' });
return;
}

const membership = await db.query.conversationMembers.findFirst({
where: and(
eq(conversationMembers.conversationId, conversationId),
eq(conversationMembers.userId, userId),
),
});

if (!membership) {
res.status(403).json({ error: 'Not authorized for this conversation' });
return;
}

// Insert pending file record
try {
await db.insert(files).values({
id: fileId,
conversationId,
uploaderId: userId,
size,
sha256: sha256 || null,
status: 'pending',
});
} catch (err) {

Check failure on line 49 in apps/backend/src/routes/uploads.ts

View workflow job for this annotation

GitHub Actions / build

'err' is defined but never used
res.status(409).json({ error: 'File upload already initiated' });
return;
}

try {
const command = new PutObjectCommand({
Bucket: bucketName,
Key: fileId,
});
// Signed URL for uploading directly to S3
const presignedUrl = await getSignedUrl(s3, command, { expiresIn: 900 });
res.json({ url: presignedUrl });
} catch {
res.status(500).json({ error: 'Failed to generate upload URL' });
}
});

uploadsRouter.post('/:fileId/complete', async (req: AuthRequest, res) => {
const userId = req.auth!.userId;
const fileId = req.params['fileId'] as string;

if (!fileId) {
res.status(400).json({ error: 'File id is required' });
return;
}

const file = await db.query.files.findFirst({
where: eq(files.id, fileId),
});

if (!file) {
res.status(404).json({ error: 'File not found' });
return;
}

if (file.uploaderId !== userId) {
res.status(403).json({ error: 'Not authorized to complete this upload' });
return;
}

if (file.status === 'ready') {
res.json({ success: true, message: 'Already completed' });
return;
}

try {
const headCommand = new HeadObjectCommand({
Bucket: bucketName,
Key: fileId,
});
const headResponse = await s3.send(headCommand);

if (headResponse.ContentLength !== file.size) {
res.status(400).json({ error: 'Size mismatch', details: 'File size in storage does not match declared size' });
return;
}

await db.update(files)
.set({ status: 'ready' })
.where(eq(files.id, fileId));

res.json({ success: true, message: 'Upload confirmed' });
} catch (err: any) {
if (err.name === 'NotFound') {
res.status(400).json({ error: 'File not found in storage' });
} else {
res.status(500).json({ error: 'Failed to verify file upload' });
}
}
});
54 changes: 54 additions & 0 deletions apps/backend/src/services/garbageCollection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { lt, and, eq } from 'drizzle-orm';
import { S3Client, DeleteObjectCommand } from '@aws-sdk/client-s3';
import { db } from '../db/index.js';
import { files } from '../db/schema.js';

const s3 = new S3Client({
region: process.env['AWS_REGION'] || 'us-east-1',
});
const bucketName = process.env['AWS_BUCKET'] || 'clicked-files';

let gcInterval: ReturnType<typeof setInterval> | null = null;

export async function runGarbageCollection() {
const ONE_DAY_AGO = new Date(Date.now() - 24 * 60 * 60 * 1000);

try {
const oldPendingFiles = await db.query.files.findMany({
where: and(
eq(files.status, 'pending'),
lt(files.createdAt, ONE_DAY_AGO)
),
});

for (const file of oldPendingFiles) {
try {
await s3.send(new DeleteObjectCommand({
Bucket: bucketName,
Key: file.id,
}));
} catch (err) {
console.error(`[GC] Failed to delete file ${file.id} from S3`, err);
}

await db.delete(files).where(eq(files.id, file.id));
}
} catch (err) {
console.error('[GC] Garbage collection failed', err);
}
}

export function startGarbageCollection() {
if (gcInterval) return;
// Run every hour
gcInterval = setInterval(() => {
runGarbageCollection().catch(console.error);
}, 60 * 60 * 1000);
}

export function stopGarbageCollection() {
if (gcInterval) {
clearInterval(gcInterval);
gcInterval = null;
}
}
30 changes: 28 additions & 2 deletions apps/backend/src/socket/messaging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
messages,
messageEnvelopes,
userDevices,
files,
} from '../db/schema.js';
import type { AuthSocket } from '../middleware/socketAuth.js';
import { invalidateConversationCaches } from '../lib/conversationCache.js';
Expand Down Expand Up @@ -133,6 +134,18 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void
return;
}

const isFile = contentType && contentType !== 'text/plain';
if (isFile) {
const file = await db.query.files.findFirst({
where: eq(files.id, messageId),
});

if (!file || file.status !== 'ready') {
socket.emit('error', { event: 'send_message', message: 'Referenced file is not ready or not found' });
return;
}
}

const [message] = await db
.insert(messages)
.values({
Expand Down Expand Up @@ -174,10 +187,9 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void
messageId,
sequenceNumber: message.sequenceNumber,
});
await deliverMessage(io, message, conversationId);
}

await deliverMessage(io, message, conversationId);

const members = await db.query.conversationMembers.findMany({
where: eq(conversationMembers.conversationId, conversationId),
columns: { userId: true },
Expand Down Expand Up @@ -243,6 +255,20 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void
return;
}

const effectiveContentType = contentType || original.contentType;
const isFile = effectiveContentType && effectiveContentType !== 'text/plain';

if (isFile) {
const file = await db.query.files.findFirst({
where: eq(files.id, messageId),
});

if (!file || file.status !== 'ready') {
socket.emit('error', { event: 'edit_message', message: 'Referenced file is not ready or not found' });
return;
}
}

const [message] = await db
.insert(messages)
.values({
Expand Down
Loading