From e985d606f03c825170397acd8087bb639720d52e Mon Sep 17 00:00:00 2001 From: Zita Szupera Date: Mon, 16 Mar 2026 13:59:57 +0100 Subject: [PATCH] feat: global upload manager --- src/client.ts | 7 + src/index.ts | 1 + src/messageComposer/attachmentManager.ts | 114 ++++---- src/uploadManager.ts | 163 +++++++++++ .../MessageComposer/attachmentManager.test.ts | 220 +++++++++++++- .../MessageComposer/uploadManager.test.ts | 272 ++++++++++++++++++ test/unit/client.test.js | 20 ++ 7 files changed, 729 insertions(+), 68 deletions(-) create mode 100644 src/uploadManager.ts create mode 100644 test/unit/MessageComposer/uploadManager.test.ts diff --git a/src/client.ts b/src/client.ts index b704b4191..ef53de2c5 100644 --- a/src/client.ts +++ b/src/client.ts @@ -9,6 +9,7 @@ import type WebSocket from 'isomorphic-ws'; import { Channel } from './channel'; import { ClientState } from './client_state'; import { StableWSConnection } from './connection'; +import { UploadManager } from './uploadManager'; import { CheckSignature, DevToken, JWTUserToken } from './signing'; import { TokenManager } from './token_manager'; import { WSConnectionFallback } from './connection_fallback'; @@ -296,6 +297,10 @@ export type MessageComposerSetupState = { export class StreamChat { private static _instance?: unknown | StreamChat; // type is undefined|StreamChat, unknown is due to TS limitations with statics messageDeliveryReporter: MessageDeliveryReporter; + /** + * @internal + */ + uploadManager: UploadManager; _user?: OwnUserResponse | UserResponse; appSettingsPromise?: Promise; activeChannels: { @@ -401,6 +406,7 @@ export class StreamChat { this.moderation = new Moderation(this); this.notifications = options?.notifications ?? new NotificationManager(); + this.uploadManager = new UploadManager(this); // set the secret if (secretOrOptions && isString(secretOrOptions)) { @@ -1020,6 +1026,7 @@ export class StreamChat { this.state = new ClientState({ client: this }); // reset thread manager this.threads.resetState(); + this.uploadManager.reset(); // Since we wipe all user data already, we should reset token manager as well closePromise diff --git a/src/index.ts b/src/index.ts index 9455a64ad..c22eec6b0 100644 --- a/src/index.ts +++ b/src/index.ts @@ -33,6 +33,7 @@ export type { export * from './thread_manager'; export * from './token_manager'; export * from './types'; +export * from './uploadManager'; export * from './channel_manager'; export * from './offline-support'; export * from './LiveLocationManager'; diff --git a/src/messageComposer/attachmentManager.ts b/src/messageComposer/attachmentManager.ts index 489c7af20..d6beeec92 100644 --- a/src/messageComposer/attachmentManager.ts +++ b/src/messageComposer/attachmentManager.ts @@ -19,7 +19,7 @@ import { AttachmentPostUploadMiddlewareExecutor, AttachmentPreUploadMiddlewareExecutor, } from './middleware/attachmentManager'; -import { StateStore } from '../store'; +import { StateStore, type Unsubscribe } from '../store'; import { generateUUIDv4 } from '../utils'; import { DEFAULT_UPLOAD_SIZE_LIMIT_BYTES } from '../constants'; import type { @@ -74,6 +74,12 @@ export class AttachmentManager { attachments: LocalAttachment[]; }; + /** + * Upload manager progress listeners keyed by local attachment id. + * Torn down in {@link AttachmentManager.initState} and when attachments are removed. + */ + private uploadProgressUnsubscribesByLocalId = new Map(); + constructor({ composer, message }: AttachmentManagerOptions) { this.composer = composer; this.state = new StateStore(initState({ message })); @@ -209,6 +215,10 @@ export class AttachmentManager { } initState = ({ message }: { message?: DraftMessage | LocalMessage } = {}) => { + for (const unsubscribe of this.uploadProgressUnsubscribesByLocalId.values()) { + unsubscribe(); + } + this.uploadProgressUnsubscribesByLocalId.clear(); this.state.next(initState({ message })); }; @@ -270,6 +280,14 @@ export class AttachmentManager { }; removeAttachments = (localAttachmentIds: string[]) => { + if (!localAttachmentIds.length) return; + + for (const id of localAttachmentIds) { + this.uploadProgressUnsubscribesByLocalId.get(id)?.(); + this.uploadProgressUnsubscribesByLocalId.delete(id); + this.client.uploadManager.deleteUploadRecord(id); + } + this.state.partialNext({ attachments: this.attachments.filter( (attachment) => !localAttachmentIds.includes(attachment.localMetadata?.id), @@ -537,37 +555,9 @@ export class AttachmentManager { return localAttachment; } - const shouldTrackProgress = this.config.trackUploadProgress; - const uploadingAttachment: LocalUploadAttachment = { - ...attachment, - localMetadata: { - ...attachment.localMetadata, - uploadState: 'uploading', - ...(shouldTrackProgress && { uploadProgress: 0 }), - }, - }; - this.upsertAttachments([uploadingAttachment]); - - const uploadOptions = shouldTrackProgress - ? { - onProgress: (percent: number | undefined) => { - this.updateAttachment({ - ...uploadingAttachment, - localMetadata: { - ...uploadingAttachment.localMetadata, - uploadProgress: percent, - }, - }); - }, - } - : undefined; - let response: MinimumUploadRequestResult; try { - response = await this.doUploadRequest( - localAttachment.localMetadata.file, - uploadOptions, - ); + response = await this.upload(attachment); } catch (error) { const reason = error instanceof Error ? error.message : 'unknown error'; const failedAttachment: LocalUploadAttachment = { @@ -655,35 +645,10 @@ export class AttachmentManager { return preUpload.state.attachment; } - const shouldTrackProgress = this.config.trackUploadProgress; - attachment = { - ...attachment, - localMetadata: { - ...attachment.localMetadata, - uploadState: 'uploading', - ...(shouldTrackProgress && { uploadProgress: 0 }), - }, - }; - this.upsertAttachments([attachment]); - - const uploadOptions = shouldTrackProgress - ? { - onProgress: (percent: number | undefined) => { - this.updateAttachment({ - ...attachment, - localMetadata: { - ...attachment.localMetadata, - uploadProgress: percent, - }, - }); - }, - } - : undefined; - let response: MinimumUploadRequestResult | undefined; let error: Error | undefined; try { - response = await this.doUploadRequest(file, uploadOptions); + response = await this.upload(attachment); } catch (err) { error = err instanceof Error ? err : undefined; } @@ -725,4 +690,41 @@ export class AttachmentManager { iterableFiles.slice(0, this.availableUploadSlots).map(this.uploadFile), ); }; + + private upload(attachment: LocalUploadAttachment) { + const localId = attachment.localMetadata.id; + + this.uploadProgressUnsubscribesByLocalId.get(localId)?.(); + const unsubscribe = this.client.uploadManager.state.subscribeWithSelector( + (s) => ({ upload: s.uploads.find((u) => u.id === localId) }), + ({ upload: nextUpload }) => { + if (!nextUpload) return; + this.upsertAttachments([ + { + ...attachment, + localMetadata: { + ...attachment.localMetadata, + uploadState: 'uploading', + uploadProgress: nextUpload.uploadProgress, + }, + }, + ]); + }, + ); + + this.uploadProgressUnsubscribesByLocalId.set(localId, unsubscribe); + + return this.client.uploadManager + .upload({ + id: localId, + channelCid: this.channel.cid, + file: attachment.localMetadata.file, + }) + .finally(() => { + if (this.uploadProgressUnsubscribesByLocalId.get(localId) === unsubscribe) { + unsubscribe(); + this.uploadProgressUnsubscribesByLocalId.delete(localId); + } + }); + } } diff --git a/src/uploadManager.ts b/src/uploadManager.ts new file mode 100644 index 000000000..91477a418 --- /dev/null +++ b/src/uploadManager.ts @@ -0,0 +1,163 @@ +import type { StreamChat } from './client'; +import { StateStore } from './store'; +import type { AttachmentManager } from '.'; + +export type UploadRecord = { + id: string; + uploadProgress?: number; +}; + +export type UploadManagerState = { + uploads: UploadRecord[]; +}; + +const initState = (): UploadManagerState => ({ uploads: [] }); + +const upsertById = (uploads: UploadRecord[], record: UploadRecord): UploadRecord[] => { + const idx = uploads.findIndex((u) => u.id === record.id); + if (idx === -1) return [...uploads, record]; + const current = uploads[idx]; + if (current === record) return uploads; + const next = [...uploads]; + next[idx] = { ...current, ...record }; + return next; +}; + +const updateById = ( + uploads: UploadRecord[], + record: UploadRecord, +): UploadRecord[] | null => { + const idx = uploads.findIndex((u) => u.id === record.id); + if (idx === -1) return null; + const current = uploads[idx]; + const next = [...uploads]; + next[idx] = { ...current, ...record }; + return next; +}; + +/** + * @internal + */ +export class UploadManager { + readonly state: StateStore; + + private inFlightUploads = new Map>(); + + constructor(private readonly client: StreamChat) { + this.state = new StateStore(initState()); + } + + private resolveAttachmentManager(channelCid: string) { + const colon = channelCid.indexOf(':'); + if (colon <= 0 || colon === channelCid.length - 1) { + throw new Error(`Invalid channelCid: ${channelCid}`); + } + const channelType = channelCid.slice(0, colon); + const channelId = channelCid.slice(colon + 1); + return this.client.channel(channelType, channelId).messageComposer.attachmentManager; + } + + get uploads() { + return this.state.getLatestValue().uploads; + } + + getUpload = (id: string) => this.uploads.find((u) => u.id === id); + + /** + * Clears all upload records. + * Invoked when the user disconnects so a later session does not inherit stale upload state. + */ + reset = () => { + this.inFlightUploads.clear(); + this.state.next(initState()); + }; + + /** + * Removes the upload record for `id` if present. + */ + deleteUploadRecord = (id: string) => { + this.state.next((current) => { + const nextUploads = current.uploads.filter((u) => u.id !== id); + if (nextUploads.length === current.uploads.length) return current; + return { ...current, uploads: nextUploads }; + }); + this.inFlightUploads.delete(id); + }; + + /** + * Starts an upload for `id`, or returns the existing in-flight promise if one is already running. + * Uses {@link StreamChat.channel}(`channelCid`) → `messageComposer.attachmentManager.doUploadRequest`. + * Resolves with that result; rejects if the upload rejects (the record is removed from state either way). + */ + upload = ({ + id, + channelCid, + file, + }: { + id: string; + channelCid: string; + file: Parameters[0]; + }): ReturnType => { + const existingPromise = this.inFlightUploads.get(id); + if (existingPromise) return existingPromise; + + let resolvePromise!: ( + value: Awaited>, + ) => void; + let rejectPromise!: (reason?: unknown) => void; + const promise = new Promise< + Awaited> + >((resolve, reject) => { + resolvePromise = resolve; + rejectPromise = reject; + }); + + this.inFlightUploads.set(id, promise); + + void (async () => { + const attachmentManager = this.resolveAttachmentManager(channelCid); + const trackProgress = attachmentManager.config.trackUploadProgress; + try { + this.upsertUpload({ + id, + uploadProgress: trackProgress ? 0 : undefined, + }); + + const onProgress = trackProgress + ? (progress?: number) => { + this.updateUpload({ + id, + uploadProgress: progress, + }); + } + : undefined; + + const response = await attachmentManager.doUploadRequest( + file, + onProgress ? { onProgress } : undefined, + ); + resolvePromise(response); + } catch (error) { + rejectPromise(error); + } finally { + this.deleteUploadRecord(id); + } + })(); + + return promise; + }; + + private upsertUpload = (record: UploadRecord) => { + this.state.partialNext({ + uploads: upsertById(this.uploads, record), + }); + }; + + private updateUpload = (record: UploadRecord) => { + this.state.next((current) => { + const nextUploads = updateById(current.uploads, record); + if (!nextUploads) return current; + return { ...current, uploads: nextUploads }; + }); + }; +} diff --git a/test/unit/MessageComposer/attachmentManager.test.ts b/test/unit/MessageComposer/attachmentManager.test.ts index 28acdc901..c922605ab 100644 --- a/test/unit/MessageComposer/attachmentManager.test.ts +++ b/test/unit/MessageComposer/attachmentManager.test.ts @@ -9,7 +9,6 @@ import { DraftResponse, FileReference, LocalMessage, - MessageComposer, StreamChat, } from '../../../src'; import { AppSettings } from '../../../src'; @@ -117,12 +116,14 @@ const setup = ({ .fn() .mockResolvedValue({ file: 'test-image-url', thumb_url: 'thumb_url-image' }); mockChannel.data = { own_capabilities: ['upload-file'] }; - const messageComposer = new MessageComposer({ - client: mockClient, - composition, - compositionContext: mockChannel, - config: { attachments: config }, - }); + // Use the channel's messageComposer so client.uploadManager (resolves via channelCid) hits this composer. + const messageComposer = mockChannel.messageComposer; + if (config) { + messageComposer.updateConfig({ attachments: config }); + } + if (composition !== undefined) { + messageComposer.initState({ composition }); + } return { mockClient, mockChannel, messageComposer }; }; @@ -466,7 +467,62 @@ describe('AttachmentManager', () => { attachmentManager.initState(); - expect(attachmentManager.state.getLatestValue()).toEqual({ attachments: [] }); + expect(attachmentManager.state.getLatestValue()).toEqual({ + attachments: [], + }); + }); + + it('should unsubscribe upload progress listeners when re-initializing state', async () => { + const { messageComposer, mockClient } = setup(); + const { attachmentManager } = messageComposer; + + vi.spyOn(attachmentManager, 'doUploadRequest').mockImplementation( + () => new Promise(() => {}), + ); + + const upsertSpy = vi.spyOn(attachmentManager, 'upsertAttachments'); + + void attachmentManager.uploadFile(new File([], 'p.png', { type: 'image/png' })); + + await vi.waitFor(() => { + expect( + ( + attachmentManager as unknown as { + uploadProgressUnsubscribesByLocalId: Map; + } + ).uploadProgressUnsubscribesByLocalId.size, + ).toBeGreaterThan(0); + }); + + const uploadId = mockClient.uploadManager.uploads[0]?.id; + expect(uploadId).toBeDefined(); + + expect(upsertSpy).toHaveBeenCalled(); + + upsertSpy.mockClear(); + attachmentManager.initState(); + + expect( + ( + attachmentManager as unknown as { + uploadProgressUnsubscribesByLocalId: Map; + } + ).uploadProgressUnsubscribesByLocalId.size, + ).toBe(0); + + mockClient.uploadManager.state.partialNext((current) => ({ + ...current, + uploads: current.uploads.map((u) => + u.id === uploadId + ? { + ...u, + uploadProgress: 99, + } + : u, + ), + })); + + expect(upsertSpy).not.toHaveBeenCalled(); }); it('should initialize with message', () => { @@ -614,6 +670,144 @@ describe('AttachmentManager', () => { { localMetadata: { id: 'test-id-2' } }, ]); }); + + it('should delete matching upload records when removing upload attachments', async () => { + const { + messageComposer: { attachmentManager }, + messageComposer, + mockClient, + mockChannel, + } = setup({ config: { trackUploadProgress: false } }); + + const previewUri = 'blob:preview-for-remove-test'; + const file = generateFile({ name: 'x.png', type: 'image/png' }); + const attachment: LocalUploadAttachment = { + type: 'image', + mime_type: 'image/png', + file_size: file.size, + fallback: file.name, + localMetadata: { + id: 'att-with-upload', + file, + previewUri, + uploadState: 'uploading', + }, + }; + + attachmentManager.upsertAttachments([attachment]); + + vi.spyOn(attachmentManager, 'doUploadRequest').mockImplementation( + () => new Promise(() => {}), + ); + void mockClient.uploadManager.upload({ + id: 'att-with-upload', + channelCid: mockChannel.cid, + file, + }); + + await Promise.resolve(); + + expect( + mockClient.uploadManager.uploads.some((u) => u.id === 'att-with-upload'), + ).toBe(true); + + attachmentManager.removeAttachments(['att-with-upload']); + + expect( + mockClient.uploadManager.uploads.some((u) => u.id === 'att-with-upload'), + ).toBe(false); + expect(attachmentManager.attachments).toEqual([]); + }); + + it('should not delete upload records for another local attachment id', async () => { + const { + messageComposer: { attachmentManager }, + mockClient, + mockChannel, + } = setup({ config: { trackUploadProgress: false } }); + + const previewUri = 'blob:other-composer-uri'; + + vi.spyOn(attachmentManager, 'doUploadRequest').mockImplementation( + () => new Promise(() => {}), + ); + void mockClient.uploadManager.upload({ + id: 'other-composer-attachment', + channelCid: mockChannel.cid, + file: new File([], 'other.png'), + }); + + await Promise.resolve(); + + const file = generateFile({ name: 'x.png', type: 'image/png' }); + attachmentManager.upsertAttachments([ + { + type: 'image', + mime_type: 'image/png', + file_size: file.size, + fallback: file.name, + localMetadata: { + id: 'att-1', + file, + previewUri, + uploadState: 'uploading', + }, + } as LocalUploadAttachment, + ]); + + attachmentManager.removeAttachments(['att-1']); + + expect( + mockClient.uploadManager.uploads.some( + (u) => u.id === 'other-composer-attachment', + ), + ).toBe(true); + }); + + it('should unsubscribe upload progress listeners when removing an in-flight upload', async () => { + const { + messageComposer: { attachmentManager }, + mockClient, + } = setup({ config: { trackUploadProgress: true } }); + + vi.spyOn(attachmentManager, 'doUploadRequest').mockImplementation( + () => new Promise(() => {}), + ); + const upsertSpy = vi.spyOn(attachmentManager, 'upsertAttachments'); + + void attachmentManager.uploadFile(new File([], 'p.png', { type: 'image/png' })); + + await vi.waitFor(() => { + expect( + ( + attachmentManager as unknown as { + uploadProgressUnsubscribesByLocalId: Map; + } + ).uploadProgressUnsubscribesByLocalId.size, + ).toBeGreaterThan(0); + }); + + const uploadId = mockClient.uploadManager.uploads[0]?.id; + expect(uploadId).toBeDefined(); + + upsertSpy.mockClear(); + attachmentManager.removeAttachments([uploadId!]); + + expect( + ( + attachmentManager as unknown as { + uploadProgressUnsubscribesByLocalId: Map; + } + ).uploadProgressUnsubscribesByLocalId.size, + ).toBe(0); + + mockClient.uploadManager.state.partialNext((current) => ({ + ...current, + uploads: [...current.uploads, { id: uploadId!, uploadProgress: 77 }], + })); + + expect(upsertSpy).not.toHaveBeenCalled(); + }); }); describe('getUploadConfigCheck', () => { @@ -1276,7 +1470,7 @@ describe('AttachmentManager', () => { messageComposer: { attachmentManager }, mockChannel, } = setup(); - const updateSpy = vi.spyOn(attachmentManager, 'updateAttachment'); + const updateSpy = vi.spyOn(attachmentManager, 'upsertAttachments'); const customUploadFn = vi.fn(async (_file, options) => { options?.onProgress?.(42); return { file: 'custom-upload-url' }; @@ -1304,9 +1498,11 @@ describe('AttachmentManager', () => { expect.objectContaining({ onProgress: expect.any(Function) }), ); expect(updateSpy).toHaveBeenCalledWith( - expect.objectContaining({ - localMetadata: expect.objectContaining({ uploadProgress: 42 }), - }), + expect.arrayContaining([ + expect.objectContaining({ + localMetadata: expect.objectContaining({ uploadProgress: 42 }), + }), + ]), ); expect(mockChannel.sendImage).not.toHaveBeenCalled(); }); diff --git a/test/unit/MessageComposer/uploadManager.test.ts b/test/unit/MessageComposer/uploadManager.test.ts new file mode 100644 index 000000000..4e0796bd9 --- /dev/null +++ b/test/unit/MessageComposer/uploadManager.test.ts @@ -0,0 +1,272 @@ +import { describe, expect, it, vi } from 'vitest'; +import type { StreamChat, UploadRecord } from '../../../src'; +import { UploadManager } from '../../../src'; + +const TEST_CID = 'channelType:channelId'; + +const createManager = ( + doUploadRequest: (...args: unknown[]) => unknown = vi.fn().mockResolvedValue(undefined), + trackUploadProgress = true, +) => { + const attachmentManager = { + doUploadRequest, + config: { trackUploadProgress }, + }; + const client = { + channel: vi.fn().mockReturnValue({ + messageComposer: { attachmentManager }, + }), + } as unknown as StreamChat; + const manager = new UploadManager(client); + return { manager, client, doUploadRequest }; +}; + +describe('UploadManager', () => { + it('upload upserts uploading record and calls doUploadRequest with onProgress', async () => { + const { manager, doUploadRequest } = createManager(); + const file = new File([], 'a.txt'); + + const promise = manager.upload({ + id: 'local-a', + channelCid: TEST_CID, + file, + }); + + expect(manager.uploads).toEqual([ + { + id: 'local-a', + uploadProgress: 0, + }, + ]); + await promise; + + expect(manager.getUpload('local-a')).toBeUndefined(); + expect(doUploadRequest).toHaveBeenCalledWith( + file, + expect.objectContaining({ onProgress: expect.any(Function) }), + ); + }); + + it('stores id on upload record', async () => { + const { manager, doUploadRequest } = createManager(); + const file = new File([], 'a.txt'); + + await manager.upload({ id: 'm1', channelCid: TEST_CID, file }); + + const snapshots: unknown[] = []; + const { manager: manager2, doUploadRequest: doUploadRequest2 } = createManager(); + const unsub = manager2.state.subscribe((next) => snapshots.push(next.uploads)); + await manager2.upload({ + id: 'm1', + channelCid: TEST_CID, + file: new File([], 'b.txt'), + }); + unsub(); + + expect( + snapshots.some((u: unknown) => { + const row = (u as UploadRecord[])?.[0]; + return row?.id === 'm1' && row.uploadProgress === 0; + }), + ).toBe(true); + expect(snapshots.some((u: unknown) => (u as UploadRecord[]).length === 0)).toBe(true); + expect(doUploadRequest).toHaveBeenCalled(); + expect(doUploadRequest2).toHaveBeenCalled(); + }); + + it('updates uploadProgress when onProgress is invoked (including undefined)', async () => { + const { manager, doUploadRequest } = createManager(); + let onProgress!: (p?: number) => void; + doUploadRequest.mockImplementation( + async (_file: unknown, opts?: { onProgress?: (n?: number) => void }) => { + onProgress = opts!.onProgress!; + }, + ); + + const start = manager.upload({ + id: 'u1', + channelCid: TEST_CID, + file: new File([], 'x'), + }); + onProgress(33); + expect(manager.getUpload('u1')?.uploadProgress).toBe(33); + onProgress(undefined); + expect(manager.getUpload('u1')?.uploadProgress).toBeUndefined(); + await start; + }); + + it('on failure, removes record and rejects the promise', async () => { + const err = new Error('boom'); + const { manager, doUploadRequest } = createManager(vi.fn().mockRejectedValue(err)); + + await expect( + manager.upload({ + id: 'u1', + channelCid: TEST_CID, + file: new File([], 'x'), + }), + ).rejects.toBe(err); + + expect(manager.getUpload('u1')).toBeUndefined(); + }); + + it('reset clears all records', async () => { + const err = new Error('fail'); + const { manager, doUploadRequest } = createManager( + vi.fn().mockRejectedValueOnce(err).mockResolvedValueOnce(undefined), + ); + + await expect( + manager.upload({ id: 'm1', channelCid: TEST_CID, file: new File([], 'a') }), + ).rejects.toBe(err); + manager.reset(); + + expect(manager.uploads).toEqual([]); + expect(doUploadRequest).toHaveBeenCalledTimes(1); + }); + + it('dedupes upload: concurrent calls share one promise and a single doUploadRequest run', async () => { + const { manager, doUploadRequest } = createManager(); + let resolve!: () => void; + const gate = new Promise((r) => { + resolve = r; + }); + doUploadRequest.mockImplementation(async () => gate); + + const file = new File([], 'x'); + const first = manager.upload({ id: 'same', channelCid: TEST_CID, file }); + const second = manager.upload({ id: 'same', channelCid: TEST_CID, file }); + + expect(first).toBe(second); + expect(doUploadRequest).toHaveBeenCalledTimes(1); + + resolve(); + await expect(Promise.all([first, second])).resolves.toEqual([undefined, undefined]); + }); + + it('different ids allow concurrent uploads', async () => { + const { manager, doUploadRequest } = createManager(); + let resolveA!: () => void; + let resolveB!: () => void; + const gateA = new Promise((r) => { + resolveA = r; + }); + const gateB = new Promise((r) => { + resolveB = r; + }); + const gates = [gateA, gateB]; + let i = 0; + doUploadRequest.mockImplementation(async () => gates[i++]); + + const file = new File([], 'x'); + const p1 = manager.upload({ id: 'm1', channelCid: TEST_CID, file }); + const p2 = manager.upload({ id: 'm2', channelCid: TEST_CID, file }); + + expect(manager.getUpload('m1')).toEqual({ id: 'm1', uploadProgress: 0 }); + expect(manager.getUpload('m2')).toEqual({ id: 'm2', uploadProgress: 0 }); + + const p1Again = manager.upload({ id: 'm1', channelCid: TEST_CID, file }); + expect(p1Again).toBe(p1); + expect(doUploadRequest).toHaveBeenCalledTimes(2); + + resolveA(); + resolveB(); + await Promise.all([p1, p2]); + }); + + it('on success, response is returned and upload record is removed', async () => { + const response = { file: 'https://cdn.example/uploaded' }; + const { manager, doUploadRequest } = createManager( + vi.fn().mockResolvedValue(response), + ); + + const result = await manager.upload({ + id: 'u1', + channelCid: TEST_CID, + file: new File([], 'x'), + }); + + expect(result).toBe(response); + expect(manager.getUpload('u1')).toBeUndefined(); + expect(doUploadRequest).toHaveBeenCalled(); + }); + + it('omits onProgress when attachment manager config trackUploadProgress is false', async () => { + const { manager, doUploadRequest } = createManager( + vi.fn().mockResolvedValue(undefined), + false, + ); + const file = new File([], 'x'); + await manager.upload({ + id: 'u1', + channelCid: TEST_CID, + file, + }); + expect(doUploadRequest).toHaveBeenCalledWith(file, undefined); + }); + + describe('deleteUploadRecord', () => { + it('removes matching in-flight record and leaves others', async () => { + const { manager, doUploadRequest } = createManager( + vi.fn().mockImplementation(() => new Promise(() => {})), + ); + const file = new File([], 'x'); + + void manager.upload({ id: 'm1', channelCid: TEST_CID, file }); + void manager.upload({ id: 'm2', channelCid: TEST_CID, file }); + + await Promise.resolve(); + + expect(manager.getUpload('m1')).toBeDefined(); + expect(manager.getUpload('m2')).toBeDefined(); + + manager.deleteUploadRecord('m1'); + + expect(manager.getUpload('m1')).toBeUndefined(); + expect(manager.getUpload('m2')).toBeDefined(); + expect(doUploadRequest).toHaveBeenCalledTimes(2); + }); + + it('is a no-op when id does not match any record', () => { + const { manager, doUploadRequest } = createManager( + vi.fn().mockImplementation(() => new Promise(() => {})), + ); + void manager.upload({ id: 'm1', channelCid: TEST_CID, file: new File([], 'x') }); + + manager.deleteUploadRecord('other'); + + expect(manager.getUpload('m1')).toEqual({ id: 'm1', uploadProgress: 0 }); + expect(doUploadRequest).toHaveBeenCalledTimes(1); + }); + + it('does not re-insert the record when onProgress fires after deleteUploadRecord', async () => { + let onProgress!: (p?: number) => void; + const { manager, doUploadRequest } = createManager( + vi + .fn() + .mockImplementation( + (_file: unknown, opts?: { onProgress?: (n?: number) => void }) => { + onProgress = opts!.onProgress!; + return new Promise(() => {}); + }, + ), + ); + + void manager.upload({ + id: 'removed-mid-flight', + channelCid: TEST_CID, + file: new File([], 'x'), + }); + + await Promise.resolve(); + expect(manager.getUpload('removed-mid-flight')).toBeDefined(); + + manager.deleteUploadRecord('removed-mid-flight'); + expect(manager.getUpload('removed-mid-flight')).toBeUndefined(); + + onProgress(50); + expect(manager.getUpload('removed-mid-flight')).toBeUndefined(); + expect(doUploadRequest).toHaveBeenCalledTimes(1); + }); + }); +}); diff --git a/test/unit/client.test.js b/test/unit/client.test.js index 8f4bbc244..61e6fcc23 100644 --- a/test/unit/client.test.js +++ b/test/unit/client.test.js @@ -345,6 +345,26 @@ describe('Client disconnectUser', () => { await expect(client.disconnectUser()).rejects.toThrow(); expect(client.tokenManager.reset.called).to.be.true; }); + + it('should clear upload manager records', async () => { + const client = new StreamChat('', ''); + client.uploadManager.state.next(() => ({ + uploads: [ + { + id: 'upload-x', + uploadProgress: 0, + }, + ], + })); + const { resolve, promise } = Promise.withResolvers(); + client.wsConnection = { disconnect: () => promise }; + client.wsFallback = null; + const disconnectPromise = client.disconnectUser(); + expect(client.uploadManager.uploads).to.have.length(0); + resolve(); + await disconnectPromise; + expect(client.uploadManager.uploads).to.deep.equal([]); + }); }); describe('Detect node environment', () => {