diff --git a/apps/backend/src/app.ts b/apps/backend/src/app.ts index 1d6b6ac..4b0c723 100644 --- a/apps/backend/src/app.ts +++ b/apps/backend/src/app.ts @@ -13,6 +13,7 @@ import { usersRouter } from './routes/users.js'; import { treasuryRouter } from './routes/treasury.js'; import { filesRouter } from './routes/files.js'; import { pushRouter } from './routes/push.js'; +import { uploadsRouter } from './routes/uploads.js'; import { requireAuth, type AuthRequest } from './middleware/auth.js'; const packageJson = JSON.parse( @@ -55,6 +56,7 @@ app.use('/users', usersRouter); app.use('/treasury', treasuryRouter); app.use('/files', filesRouter); app.use('/push', pushRouter); +app.use('/uploads', uploadsRouter); app.get('/me', requireAuth, (req, res) => { res.json({ user: (req as AuthRequest).auth }); diff --git a/apps/backend/src/db/schema.ts b/apps/backend/src/db/schema.ts index 0ec572c..e45346f 100644 --- a/apps/backend/src/db/schema.ts +++ b/apps/backend/src/db/schema.ts @@ -53,6 +53,22 @@ export const contentTypeEnum = pgEnum('content_type', [ 'system', ]); +export const fileStatusEnum = pgEnum('file_status', ['pending', 'ready']); + +export const files = pgTable('files', { + id: uuid('id').primaryKey(), + conversationId: uuid('conversation_id') + .notNull() + .references(() => conversations.id, { onDelete: 'cascade' }), + uploaderId: uuid('uploader_id') + .notNull() + .references(() => users.id, { onDelete: 'cascade' }), + size: integer('size').notNull(), + sha256: text('sha256'), + status: fileStatusEnum('status').notNull().default('pending'), + createdAt: timestamp('created_at').notNull().defaultNow(), +}); + export const conversationMembers = pgTable('conversation_members', { id: uuid('id').primaryKey().defaultRandom(), conversationId: uuid('conversation_id') @@ -293,6 +309,15 @@ export const usersRelations = relations(users, ({ many }) => ({ messages: many(messages), transfers: many(tokenTransfers), devices: many(devices), + files: many(files), +})); + +export const filesRelations = relations(files, ({ one }) => ({ + conversation: one(conversations, { + fields: [files.conversationId], + references: [conversations.id], + }), + uploader: one(users, { fields: [files.uploaderId], references: [users.id] }), })); export const walletsRelations = relations(wallets, ({ one }) => ({ @@ -304,6 +329,7 @@ export const conversationsRelations = relations(conversations, ({ many }) => ({ messages: many(messages), transfers: many(tokenTransfers), treasuryProposals: many(treasuryProposals), + files: many(files), })); export const conversationMembersRelations = relations(conversationMembers, ({ one }) => ({ @@ -414,3 +440,5 @@ export type OneTimePreKey = typeof oneTimePreKeys.$inferSelect; export type NewOneTimePreKey = typeof oneTimePreKeys.$inferInsert; export type UserDevice = typeof userDevices.$inferSelect; export type NewUserDevice = typeof userDevices.$inferInsert; +export type FileRecord = typeof files.$inferSelect; +export type NewFileRecord = typeof files.$inferInsert; diff --git a/apps/backend/src/index.ts b/apps/backend/src/index.ts index d94893c..3793fff 100644 --- a/apps/backend/src/index.ts +++ b/apps/backend/src/index.ts @@ -33,6 +33,7 @@ import { runForever as runStellarListener, } from './services/stellarListener.js'; import { loadEnv } from './config.js'; +import { startGarbageCollection } from './services/garbageCollection.js'; dotenv.config(); @@ -236,6 +237,7 @@ async function attachRedisAdapter(): Promise { const PORT = process.env['PORT'] ?? 3001; httpServer.listen(PORT, () => { console.log(`Backend server running on port ${PORT}`); + startGarbageCollection(); }); // Attach the Redis adapter after listen() so the API is reachable even if diff --git a/apps/backend/src/routes/uploads.ts b/apps/backend/src/routes/uploads.ts new file mode 100644 index 0000000..72abc65 --- /dev/null +++ b/apps/backend/src/routes/uploads.ts @@ -0,0 +1,119 @@ +import { Router } from 'express'; +import type { IRouter } from 'express'; +import { eq, and } from 'drizzle-orm'; +import { db } from '../db/index.js'; +import { conversationMembers, files } from '../db/schema.js'; +import { requireAuth, type AuthRequest } from '../middleware/auth.js'; +import { S3Client, PutObjectCommand, HeadObjectCommand } from '@aws-sdk/client-s3'; +import { getSignedUrl } from '@aws-sdk/s3-request-presigner'; + +export const uploadsRouter: IRouter = Router(); +uploadsRouter.use(requireAuth); + +const s3 = new S3Client({ + region: process.env['AWS_REGION'] || 'us-east-1', +}); +const bucketName = process.env['AWS_BUCKET'] || 'clicked-files'; + +uploadsRouter.post('/', async (req: AuthRequest, res) => { + const userId = req.auth!.userId; + const { fileId, conversationId, size, sha256 } = req.body; + + if (!fileId || !conversationId || typeof size !== 'number') { + res.status(400).json({ error: 'fileId, conversationId, and size are required' }); + return; + } + + const membership = await db.query.conversationMembers.findFirst({ + where: and( + eq(conversationMembers.conversationId, conversationId), + eq(conversationMembers.userId, userId), + ), + }); + + if (!membership) { + res.status(403).json({ error: 'Not authorized for this conversation' }); + return; + } + + // Insert pending file record + try { + await db.insert(files).values({ + id: fileId, + conversationId, + uploaderId: userId, + size, + sha256: sha256 || null, + status: 'pending', + }); + } catch (err) { + res.status(409).json({ error: 'File upload already initiated' }); + return; + } + + try { + const command = new PutObjectCommand({ + Bucket: bucketName, + Key: fileId, + }); + // Signed URL for uploading directly to S3 + const presignedUrl = await getSignedUrl(s3, command, { expiresIn: 900 }); + res.json({ url: presignedUrl }); + } catch { + res.status(500).json({ error: 'Failed to generate upload URL' }); + } +}); + +uploadsRouter.post('/:fileId/complete', async (req: AuthRequest, res) => { + const userId = req.auth!.userId; + const fileId = req.params['fileId'] as string; + + if (!fileId) { + res.status(400).json({ error: 'File id is required' }); + return; + } + + const file = await db.query.files.findFirst({ + where: eq(files.id, fileId), + }); + + if (!file) { + res.status(404).json({ error: 'File not found' }); + return; + } + + if (file.uploaderId !== userId) { + res.status(403).json({ error: 'Not authorized to complete this upload' }); + return; + } + + if (file.status === 'ready') { + res.json({ success: true, message: 'Already completed' }); + return; + } + + try { + const headCommand = new HeadObjectCommand({ + Bucket: bucketName, + Key: fileId, + }); + const headResponse = await s3.send(headCommand); + + if (headResponse.ContentLength !== file.size) { + res.status(400).json({ error: 'Size mismatch', details: 'File size in storage does not match declared size' }); + return; + } + + await db.update(files) + .set({ status: 'ready' }) + .where(eq(files.id, fileId)); + + res.json({ success: true, message: 'Upload confirmed' }); + } catch (err: any) { + if (err.name === 'NotFound') { + res.status(400).json({ error: 'File not found in storage' }); + } else { + res.status(500).json({ error: 'Failed to verify file upload' }); + } + } +}); diff --git a/apps/backend/src/services/garbageCollection.ts b/apps/backend/src/services/garbageCollection.ts new file mode 100644 index 0000000..252a261 --- /dev/null +++ b/apps/backend/src/services/garbageCollection.ts @@ -0,0 +1,54 @@ +import { lt, and, eq } from 'drizzle-orm'; +import { S3Client, DeleteObjectCommand } from '@aws-sdk/client-s3'; +import { db } from '../db/index.js'; +import { files } from '../db/schema.js'; + +const s3 = new S3Client({ + region: process.env['AWS_REGION'] || 'us-east-1', +}); +const bucketName = process.env['AWS_BUCKET'] || 'clicked-files'; + +let gcInterval: ReturnType | null = null; + +export async function runGarbageCollection() { + const ONE_DAY_AGO = new Date(Date.now() - 24 * 60 * 60 * 1000); + + try { + const oldPendingFiles = await db.query.files.findMany({ + where: and( + eq(files.status, 'pending'), + lt(files.createdAt, ONE_DAY_AGO) + ), + }); + + for (const file of oldPendingFiles) { + try { + await s3.send(new DeleteObjectCommand({ + Bucket: bucketName, + Key: file.id, + })); + } catch (err) { + console.error(`[GC] Failed to delete file ${file.id} from S3`, err); + } + + await db.delete(files).where(eq(files.id, file.id)); + } + } catch (err) { + console.error('[GC] Garbage collection failed', err); + } +} + +export function startGarbageCollection() { + if (gcInterval) return; + // Run every hour + gcInterval = setInterval(() => { + runGarbageCollection().catch(console.error); + }, 60 * 60 * 1000); +} + +export function stopGarbageCollection() { + if (gcInterval) { + clearInterval(gcInterval); + gcInterval = null; + } +} diff --git a/apps/backend/src/socket/messaging.ts b/apps/backend/src/socket/messaging.ts index 5426137..0488850 100644 --- a/apps/backend/src/socket/messaging.ts +++ b/apps/backend/src/socket/messaging.ts @@ -8,6 +8,7 @@ import { messages, messageEnvelopes, userDevices, + files, } from '../db/schema.js'; import type { AuthSocket } from '../middleware/socketAuth.js'; import { invalidateConversationCaches } from '../lib/conversationCache.js'; @@ -133,6 +134,18 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void return; } + const isFile = contentType && contentType !== 'text/plain'; + if (isFile) { + const file = await db.query.files.findFirst({ + where: eq(files.id, messageId), + }); + + if (!file || file.status !== 'ready') { + socket.emit('error', { event: 'send_message', message: 'Referenced file is not ready or not found' }); + return; + } + } + const [message] = await db .insert(messages) .values({ @@ -174,10 +187,9 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void messageId, sequenceNumber: message.sequenceNumber, }); + await deliverMessage(io, message, conversationId); } - await deliverMessage(io, message, conversationId); - const members = await db.query.conversationMembers.findMany({ where: eq(conversationMembers.conversationId, conversationId), columns: { userId: true }, @@ -243,6 +255,20 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void return; } + const effectiveContentType = contentType || original.contentType; + const isFile = effectiveContentType && effectiveContentType !== 'text/plain'; + + if (isFile) { + const file = await db.query.files.findFirst({ + where: eq(files.id, messageId), + }); + + if (!file || file.status !== 'ready') { + socket.emit('error', { event: 'edit_message', message: 'Referenced file is not ready or not found' }); + return; + } + } + const [message] = await db .insert(messages) .values({