Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ import { ReminderManager } from './reminders';
import { StateStore } from './store';
import type { MessageComposer } from './messageComposer';
import type { AbstractOfflineDB } from './offline-support';
import { getPendingTaskChannelData } from './offline-support/util';

function isString(x: unknown): x is string {
return typeof x === 'string' || x instanceof String;
Expand Down Expand Up @@ -3114,6 +3115,38 @@ export class StreamChat {
throw Error('Please specify the message.id when calling updateMessage');
}

const messageId = message.id as string;

try {
if (this.offlineDb) {
return await this.offlineDb.queueTask<UpdateMessageAPIResponse>({
task: {
...getPendingTaskChannelData(message.cid),
messageId,
payload: [message, partialUserOrUserId, options],
type: 'update-message',
},
});
}
} catch (error) {
this.logger('error', `offlineDb:updateMessage`, {
tags: ['channel', 'offlineDb'],
error,
});
}

return await this._updateMessage(message, partialUserOrUserId, options);
}

async _updateMessage(
message: LocalMessage | Partial<MessageResponse>,
partialUserOrUserId?: string | { id: string },
options?: UpdateMessageOptions,
) {
if (!message.id) {
throw Error('Please specify the message.id when calling updateMessage');
}

// should not include user object
const payload = toUpdatedMessagePayload(message);

Expand Down
127 changes: 124 additions & 3 deletions src/offline-support/offline_support_api.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import type { APIErrorResponse, ChannelResponse, Event } from '../types';
import type {
APIErrorResponse,
ChannelResponse,
Event,
LocalMessage,
Message,
MessageResponse,
} from '../types';

import type {
OfflineDBApi,
Expand All @@ -11,7 +18,8 @@ import type { StreamChat } from '../client';
import type { AxiosError } from 'axios';
import { OfflineDBSyncManager } from './offline_sync_manager';
import { StateStore } from '../store';
import { runDetached } from '../utils';
import { localMessageToNewMessagePayload, runDetached } from '../utils';
import { isMessageUpdateReplayable } from './util';

/**
* Abstract base class for an offline database implementation used with StreamChat.
Expand Down Expand Up @@ -310,6 +318,16 @@ export abstract class AbstractOfflineDB implements OfflineDBApi {
*/
abstract addPendingTask: OfflineDBApi['addPendingTask'];

/**
* @abstract
* Updates a pending task in the DB, given its ID.
* Will return the prepared queries for delayed execution (even if they are
* already executed).
* @param {DBUpdatePendingTaskType} options
* @returns {Promise<ExecuteBatchDBQueriesType>}
*/
abstract updatePendingTask: OfflineDBApi['updatePendingTask'];

/**
* @abstract
* Deletes a pending task from the DB, given its ID.
Expand Down Expand Up @@ -1076,7 +1094,7 @@ export abstract class AbstractOfflineDB implements OfflineDBApi {
return await attemptTaskExecution();
} catch (e) {
if (!this.shouldSkipQueueingTask(e as AxiosError<APIErrorResponse>)) {
await this.addPendingTask(task);
await this.handleAddPendingTask({ task });
}
throw e;
}
Expand All @@ -1092,13 +1110,112 @@ export abstract class AbstractOfflineDB implements OfflineDBApi {
private shouldSkipQueueingTask = (error: AxiosError<APIErrorResponse>) =>
error?.response?.data?.code === 4 || error?.response?.data?.code === 17;

private mergeFailedMessageUpdateIntoPendingSendMessage = ({
editedMessage,
pendingMessage,
}: {
editedMessage: LocalMessage | Partial<MessageResponse>;
pendingMessage: Message;
}) => {
const normalizedEditedMessageSource = {
...editedMessage,
} as LocalMessage & { message_text_updated_at?: string };

if (editedMessage.status === 'failed') {
delete normalizedEditedMessageSource.message_text_updated_at;
}

const normalizedEditedMessage = localMessageToNewMessagePayload(
normalizedEditedMessageSource,
);
const pendingMessageStatus = (pendingMessage as { status?: string }).status;

return {
...pendingMessage,
...normalizedEditedMessage,
...(typeof pendingMessageStatus !== 'undefined'
? { status: pendingMessageStatus }
: {}),
} as Message;
};

private isPendingSendMessageTask = (
task: PendingTask,
): task is Extract<PendingTask, { type: 'send-message' }> =>
task.type === 'send-message';

private handleOfflineFailedUpdateMessagePendingTask = async (
task: Extract<PendingTask, { type: 'update-message' }>,
) => {
const [message] = task.payload;
if (!message.id) {
return;
}

const pendingTasks = await this.getPendingTasks({ messageId: message.id });
const pendingSendMessageTask = pendingTasks.find(this.isPendingSendMessageTask);

if (!pendingSendMessageTask) {
return;
}

const updatedPendingSendMessage = this.mergeFailedMessageUpdateIntoPendingSendMessage(
{
editedMessage: message,
pendingMessage: pendingSendMessageTask.payload[0],
},
);

const updatedPendingTask: Extract<PendingTask, { type: 'send-message' }> = {
...pendingSendMessageTask,
payload: [updatedPendingSendMessage, pendingSendMessageTask.payload[1]],
};

if (pendingSendMessageTask.id) {
await this.updatePendingTask({
id: pendingSendMessageTask.id,
task: updatedPendingTask,
});
return;
}

await this.addPendingTask({
...updatedPendingTask,
id: undefined,
});
};

/**
* Central ingress for persisting pending tasks. It either stores the task as-is
* or rewrites an existing pending `send-message` task for offline edits of failed messages.
*/
public handleAddPendingTask = async ({ task }: { task: PendingTask }) => {
if (task.type === 'update-message' && !isMessageUpdateReplayable(task.payload[0])) {
return;
}

if (
task.type === 'update-message' &&
!this.client.wsConnection?.isHealthy &&
task.payload[0].status === 'failed'
) {
await this.handleOfflineFailedUpdateMessagePendingTask(task);
return;
}

await this.addPendingTask(task);
};

/**
* Executes a task from the list of supported pending tasks. Currently supported pending tasks
* are:
* - Updating a message
* - Deleting a message
* - Sending a reaction
* - Removing a reaction
* - Sending a message
* - Creating a draft
* - Deleting a draft
* It will throw if we try to execute a pending task that is not supported.
* @param task - The task we want to execute
* @param isPendingTask - a control value telling us if it's an actual pending task being executed
Expand All @@ -1108,6 +1225,10 @@ export abstract class AbstractOfflineDB implements OfflineDBApi {
{ task }: { task: PendingTask },
isPendingTask = false,
) => {
if (task.type === 'update-message') {
return await this.client._updateMessage(...task.payload);
}

if (task.type === 'delete-message') {
return await this.client._deleteMessage(...task.payload);
}
Expand Down
18 changes: 18 additions & 0 deletions src/offline-support/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,16 @@ export type DBDeletePendingTaskType = {
id: number;
};

/**
* Update a pending task by ID.
*/
export type DBUpdatePendingTaskType = {
/** ID of the pending task. */
id: number;
/** The next task payload to persist. */
task: PendingTask;
};

/**
* Options to delete a reaction from a message.
*/
Expand Down Expand Up @@ -372,6 +382,9 @@ export interface OfflineDBApi {
addPendingTask: (task: PendingTask) => Promise<() => Promise<void>>;
getPendingTasks: (conditions?: DBGetPendingTasksType) => Promise<PendingTask[]>;
deleteDraft: (options: DBDeleteDraftType) => Promise<ExecuteBatchDBQueriesType>;
updatePendingTask: (
options: DBUpdatePendingTaskType,
) => Promise<ExecuteBatchDBQueriesType>;
deletePendingTask: (
options: DBDeletePendingTaskType,
) => Promise<ExecuteBatchDBQueriesType>;
Expand All @@ -397,6 +410,7 @@ export type OfflineDBState = {
};

export type PendingTaskTypes = {
updateMessage: 'update-message';
deleteMessage: 'delete-message';
deleteReaction: 'delete-reaction';
sendReaction: 'send-reaction';
Expand All @@ -417,6 +431,10 @@ export type PendingTask = {
payload: Parameters<Channel['sendReaction']>;
type: PendingTaskTypes['sendReaction'];
}
| {
payload: Parameters<StreamChat['updateMessage']>;
type: PendingTaskTypes['updateMessage'];
}
| {
payload: Parameters<StreamChat['deleteMessage']>;
type: PendingTaskTypes['deleteMessage'];
Expand Down
32 changes: 32 additions & 0 deletions src/offline-support/util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import type { Attachment, LocalMessage, MessageResponse } from '../types';

export const isLocalUrl = (value: string | undefined) =>
!!value && !value.startsWith('http');

export const isAttachmentReplayable = (attachment: Attachment) => {
if (!attachment || typeof attachment !== 'object') {
return true;
}

return !isLocalUrl(attachment.asset_url) && !isLocalUrl(attachment.image_url);
};

export const isMessageUpdateReplayable = (
message: LocalMessage | Partial<MessageResponse>,
) => !message.attachments?.some((attachment) => !isAttachmentReplayable(attachment));

export const getPendingTaskChannelData = (cid?: string) => {
if (!cid) {
return {};
}

const separatorIndex = cid.indexOf(':');
if (separatorIndex <= 0 || separatorIndex === cid.length - 1) {
return {};
}

return {
channelId: cid.slice(separatorIndex + 1),
channelType: cid.slice(0, separatorIndex),
};
};
Loading
Loading