Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type WebSocket from 'isomorphic-ws';
import { Channel } from './channel';
import { ClientState } from './client_state';
import { StableWSConnection } from './connection';
import { UploadManager } from './uploadManager';
import { CheckSignature, DevToken, JWTUserToken } from './signing';
import { TokenManager } from './token_manager';
import { WSConnectionFallback } from './connection_fallback';
Expand Down Expand Up @@ -296,6 +297,10 @@ export type MessageComposerSetupState = {
export class StreamChat {
private static _instance?: unknown | StreamChat; // type is undefined|StreamChat, unknown is due to TS limitations with statics
messageDeliveryReporter: MessageDeliveryReporter;
/**
* @internal
*/
uploadManager: UploadManager;
_user?: OwnUserResponse | UserResponse;
appSettingsPromise?: Promise<AppSettingsAPIResponse>;
activeChannels: {
Expand Down Expand Up @@ -401,6 +406,7 @@ export class StreamChat {
this.moderation = new Moderation(this);

this.notifications = options?.notifications ?? new NotificationManager();
this.uploadManager = new UploadManager(this);

// set the secret
if (secretOrOptions && isString(secretOrOptions)) {
Expand Down Expand Up @@ -1020,6 +1026,7 @@ export class StreamChat {
this.state = new ClientState({ client: this });
// reset thread manager
this.threads.resetState();
this.uploadManager.reset();

// Since we wipe all user data already, we should reset token manager as well
closePromise
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export type {
export * from './thread_manager';
export * from './token_manager';
export * from './types';
export * from './uploadManager';
export * from './channel_manager';
export * from './offline-support';
export * from './LiveLocationManager';
Expand Down
114 changes: 58 additions & 56 deletions src/messageComposer/attachmentManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
AttachmentPostUploadMiddlewareExecutor,
AttachmentPreUploadMiddlewareExecutor,
} from './middleware/attachmentManager';
import { StateStore } from '../store';
import { StateStore, type Unsubscribe } from '../store';
import { generateUUIDv4 } from '../utils';
import { DEFAULT_UPLOAD_SIZE_LIMIT_BYTES } from '../constants';
import type {
Expand Down Expand Up @@ -74,6 +74,12 @@ export class AttachmentManager {
attachments: LocalAttachment[];
};

/**
* Upload manager progress listeners keyed by local attachment id.
* Torn down in {@link AttachmentManager.initState} and when attachments are removed.
*/
private uploadProgressUnsubscribesByLocalId = new Map<string, Unsubscribe>();

constructor({ composer, message }: AttachmentManagerOptions) {
this.composer = composer;
this.state = new StateStore<AttachmentManagerState>(initState({ message }));
Expand Down Expand Up @@ -209,6 +215,10 @@ export class AttachmentManager {
}

initState = ({ message }: { message?: DraftMessage | LocalMessage } = {}) => {
for (const unsubscribe of this.uploadProgressUnsubscribesByLocalId.values()) {
unsubscribe();
}
this.uploadProgressUnsubscribesByLocalId.clear();
this.state.next(initState({ message }));
};

Expand Down Expand Up @@ -270,6 +280,14 @@ export class AttachmentManager {
};

removeAttachments = (localAttachmentIds: string[]) => {
if (!localAttachmentIds.length) return;

for (const id of localAttachmentIds) {
this.uploadProgressUnsubscribesByLocalId.get(id)?.();
this.uploadProgressUnsubscribesByLocalId.delete(id);
this.client.uploadManager.deleteUploadRecord(id);
}

this.state.partialNext({
attachments: this.attachments.filter(
(attachment) => !localAttachmentIds.includes(attachment.localMetadata?.id),
Expand Down Expand Up @@ -537,37 +555,9 @@ export class AttachmentManager {
return localAttachment;
}

const shouldTrackProgress = this.config.trackUploadProgress;
const uploadingAttachment: LocalUploadAttachment = {
...attachment,
localMetadata: {
...attachment.localMetadata,
uploadState: 'uploading',
...(shouldTrackProgress && { uploadProgress: 0 }),
},
};
this.upsertAttachments([uploadingAttachment]);

const uploadOptions = shouldTrackProgress
? {
onProgress: (percent: number | undefined) => {
this.updateAttachment({
...uploadingAttachment,
localMetadata: {
...uploadingAttachment.localMetadata,
uploadProgress: percent,
},
});
},
}
: undefined;

let response: MinimumUploadRequestResult;
try {
response = await this.doUploadRequest(
localAttachment.localMetadata.file,
uploadOptions,
);
response = await this.upload(attachment);
} catch (error) {
const reason = error instanceof Error ? error.message : 'unknown error';
const failedAttachment: LocalUploadAttachment = {
Expand Down Expand Up @@ -655,35 +645,10 @@ export class AttachmentManager {
return preUpload.state.attachment;
}

const shouldTrackProgress = this.config.trackUploadProgress;
attachment = {
...attachment,
localMetadata: {
...attachment.localMetadata,
uploadState: 'uploading',
...(shouldTrackProgress && { uploadProgress: 0 }),
},
};
this.upsertAttachments([attachment]);

const uploadOptions = shouldTrackProgress
? {
onProgress: (percent: number | undefined) => {
this.updateAttachment({
...attachment,
localMetadata: {
...attachment.localMetadata,
uploadProgress: percent,
},
});
},
}
: undefined;

let response: MinimumUploadRequestResult | undefined;
let error: Error | undefined;
try {
response = await this.doUploadRequest(file, uploadOptions);
response = await this.upload(attachment);
} catch (err) {
error = err instanceof Error ? err : undefined;
}
Expand Down Expand Up @@ -725,4 +690,41 @@ export class AttachmentManager {
iterableFiles.slice(0, this.availableUploadSlots).map(this.uploadFile),
);
};

private upload(attachment: LocalUploadAttachment) {
const localId = attachment.localMetadata.id;

this.uploadProgressUnsubscribesByLocalId.get(localId)?.();
const unsubscribe = this.client.uploadManager.state.subscribeWithSelector(
(s) => ({ upload: s.uploads.find((u) => u.id === localId) }),
({ upload: nextUpload }) => {
if (!nextUpload) return;
this.upsertAttachments([
{
...attachment,
localMetadata: {
...attachment.localMetadata,
uploadState: 'uploading',
uploadProgress: nextUpload.uploadProgress,
},
},
]);
},
);

this.uploadProgressUnsubscribesByLocalId.set(localId, unsubscribe);

return this.client.uploadManager
.upload({
id: localId,
channelCid: this.channel.cid,
file: attachment.localMetadata.file,
})
.finally(() => {
if (this.uploadProgressUnsubscribesByLocalId.get(localId) === unsubscribe) {
unsubscribe();
this.uploadProgressUnsubscribesByLocalId.delete(localId);
}
});
}
}
163 changes: 163 additions & 0 deletions src/uploadManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import type { StreamChat } from './client';
import { StateStore } from './store';
import type { AttachmentManager } from '.';

export type UploadRecord = {
id: string;
uploadProgress?: number;
};

export type UploadManagerState = {
uploads: UploadRecord[];
};

const initState = (): UploadManagerState => ({ uploads: [] });

const upsertById = (uploads: UploadRecord[], record: UploadRecord): UploadRecord[] => {
const idx = uploads.findIndex((u) => u.id === record.id);
if (idx === -1) return [...uploads, record];
const current = uploads[idx];
if (current === record) return uploads;
const next = [...uploads];
next[idx] = { ...current, ...record };
return next;
};

const updateById = (
uploads: UploadRecord[],
record: UploadRecord,
): UploadRecord[] | null => {
const idx = uploads.findIndex((u) => u.id === record.id);
if (idx === -1) return null;
const current = uploads[idx];
const next = [...uploads];
next[idx] = { ...current, ...record };
return next;
};

/**
* @internal
*/
export class UploadManager {
readonly state: StateStore<UploadManagerState>;

private inFlightUploads = new Map<string, ReturnType<typeof this.upload>>();

constructor(private readonly client: StreamChat) {
this.state = new StateStore<UploadManagerState>(initState());
}

private resolveAttachmentManager(channelCid: string) {
const colon = channelCid.indexOf(':');
if (colon <= 0 || colon === channelCid.length - 1) {
throw new Error(`Invalid channelCid: ${channelCid}`);
}
const channelType = channelCid.slice(0, colon);
const channelId = channelCid.slice(colon + 1);
return this.client.channel(channelType, channelId).messageComposer.attachmentManager;
}

get uploads() {
return this.state.getLatestValue().uploads;
}

getUpload = (id: string) => this.uploads.find((u) => u.id === id);

/**
* Clears all upload records.
* Invoked when the user disconnects so a later session does not inherit stale upload state.
*/
reset = () => {
this.inFlightUploads.clear();
this.state.next(initState());
};

/**
* Removes the upload record for `id` if present.
*/
deleteUploadRecord = (id: string) => {
this.state.next((current) => {
const nextUploads = current.uploads.filter((u) => u.id !== id);
if (nextUploads.length === current.uploads.length) return current;
return { ...current, uploads: nextUploads };
});
this.inFlightUploads.delete(id);
};

/**
* Starts an upload for `id`, or returns the existing in-flight promise if one is already running.
* Uses {@link StreamChat.channel}(`channelCid`) → `messageComposer.attachmentManager.doUploadRequest`.
* Resolves with that result; rejects if the upload rejects (the record is removed from state either way).
*/
upload = ({
id,
channelCid,
file,
}: {
id: string;
channelCid: string;
file: Parameters<typeof AttachmentManager.prototype.doUploadRequest>[0];
}): ReturnType<typeof AttachmentManager.prototype.doUploadRequest> => {
const existingPromise = this.inFlightUploads.get(id);
if (existingPromise) return existingPromise;

let resolvePromise!: (
value: Awaited<ReturnType<typeof AttachmentManager.prototype.doUploadRequest>>,
) => void;
let rejectPromise!: (reason?: unknown) => void;
const promise = new Promise<
Awaited<ReturnType<typeof AttachmentManager.prototype.doUploadRequest>>
>((resolve, reject) => {
resolvePromise = resolve;
rejectPromise = reject;
});

this.inFlightUploads.set(id, promise);

void (async () => {
const attachmentManager = this.resolveAttachmentManager(channelCid);
const trackProgress = attachmentManager.config.trackUploadProgress;
try {
this.upsertUpload({
id,
uploadProgress: trackProgress ? 0 : undefined,
});

const onProgress = trackProgress
? (progress?: number) => {
this.updateUpload({
id,
uploadProgress: progress,
});
}
: undefined;

const response = await attachmentManager.doUploadRequest(
file,
onProgress ? { onProgress } : undefined,
);
resolvePromise(response);
} catch (error) {
rejectPromise(error);
} finally {
this.deleteUploadRecord(id);
}
})();

return promise;
};

private upsertUpload = (record: UploadRecord) => {
this.state.partialNext({
uploads: upsertById(this.uploads, record),
});
};

private updateUpload = (record: UploadRecord) => {
this.state.next((current) => {
const nextUploads = updateById(current.uploads, record);
if (!nextUploads) return current;
return { ...current, uploads: nextUploads };
});
};
}
Loading
Loading