Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 25 additions & 25 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import { parseTimeout, PUBSUB } from "./util";

const debug = require("debug")("socket.io-redis");

module.exports = exports = createAdapter;

/**
* Request types, for messages between nodes
*/
Expand Down Expand Up @@ -88,7 +86,7 @@ export interface RedisAdapterOptions {
export function createAdapter(
pubClient: any,
subClient: any,
opts?: Partial<RedisAdapterOptions>
opts?: Partial<RedisAdapterOptions>,
) {
return function (nsp) {
return new RedisAdapter(nsp, pubClient, subClient, opts);
Expand Down Expand Up @@ -124,7 +122,7 @@ export class RedisAdapter extends Adapter {
nsp: any,
readonly pubClient: any,
readonly subClient: any,
opts: Partial<RedisAdapterOptions> = {}
opts: Partial<RedisAdapterOptions> = {},
) {
super(nsp);

Expand Down Expand Up @@ -154,7 +152,7 @@ export class RedisAdapter extends Adapter {
this.subClient.pSubscribe(
this.channel + "*",
this.redisListeners.get("psub"),
true
true,
);
this.subClient.subscribe(
[
Expand All @@ -163,7 +161,7 @@ export class RedisAdapter extends Adapter {
this.specificResponseChannel,
],
this.redisListeners.get("sub"),
true
true,
);
} else {
this.redisListeners.set("pmessageBuffer", this.onmessage.bind(this));
Expand All @@ -172,7 +170,7 @@ export class RedisAdapter extends Adapter {
this.subClient.psubscribe(this.channel + "*");
this.subClient.on(
"pmessageBuffer",
this.redisListeners.get("pmessageBuffer")
this.redisListeners.get("pmessageBuffer"),
);

this.subClient.subscribe([
Expand All @@ -182,7 +180,7 @@ export class RedisAdapter extends Adapter {
]);
this.subClient.on(
"messageBuffer",
this.redisListeners.get("messageBuffer")
this.redisListeners.get("messageBuffer"),
);
}

Expand Down Expand Up @@ -420,7 +418,7 @@ export class RedisAdapter extends Adapter {
type: RequestType.SERVER_SIDE_EMIT,
requestId: request.requestId,
data: arg,
})
}),
);
};
request.data.push(callback);
Expand Down Expand Up @@ -449,7 +447,7 @@ export class RedisAdapter extends Adapter {
type: RequestType.BROADCAST_CLIENT_COUNT,
requestId: request.requestId,
clientCount,
})
}),
);
},
(arg) => {
Expand All @@ -461,9 +459,9 @@ export class RedisAdapter extends Adapter {
type: RequestType.BROADCAST_ACK,
requestId: request.requestId,
packet: arg,
})
}),
);
}
},
);
break;
}
Expand Down Expand Up @@ -596,7 +594,7 @@ export class RedisAdapter extends Adapter {
debug(
"serverSideEmit: got %d responses out of %d",
request.responses.length,
request.numSub
request.numSub,
);
if (request.responses.length === request.numSub) {
clearTimeout(request.timeout);
Expand Down Expand Up @@ -646,7 +644,7 @@ export class RedisAdapter extends Adapter {
packet: any,
opts: BroadcastOptions,
clientCountCallback: (clientCount: number) => void,
ack: (...args: any[]) => void
ack: (...args: any[]) => void,
) {
packet.nsp = this.nsp.name;

Expand Down Expand Up @@ -675,7 +673,7 @@ export class RedisAdapter extends Adapter {
// will simply clean up the ackRequests map after the given delay
const ackTimeout = parseTimeout(
opts.flags?.timeout,
this.requestsTimeout
this.requestsTimeout,
);

const timeout = setTimeout(() => {
Expand Down Expand Up @@ -717,7 +715,7 @@ export class RedisAdapter extends Adapter {
const timeout = setTimeout(() => {
if (this.requests.has(requestId)) {
reject(
new Error("timeout reached while waiting for allRooms response")
new Error("timeout reached while waiting for allRooms response"),
);
this.requests.delete(requestId);
}
Expand Down Expand Up @@ -766,7 +764,9 @@ export class RedisAdapter extends Adapter {
const timeout = setTimeout(() => {
if (this.requests.has(requestId)) {
reject(
new Error("timeout reached while waiting for fetchSockets response")
new Error(
"timeout reached while waiting for fetchSockets response",
),
);
this.requests.delete(requestId);
}
Expand Down Expand Up @@ -881,9 +881,9 @@ export class RedisAdapter extends Adapter {
if (storedRequest) {
ack(
new Error(
`timeout reached: only ${storedRequest.responses.length} responses received out of ${storedRequest.numSub}`
`timeout reached: only ${storedRequest.responses.length} responses received out of ${storedRequest.numSub}`,
),
storedRequest.responses
storedRequest.responses,
);
this.requests.delete(requestId);
}
Expand Down Expand Up @@ -922,31 +922,31 @@ export class RedisAdapter extends Adapter {
this.subClient.pUnsubscribe(
this.channel + "*",
this.redisListeners.get("psub"),
true
true,
);

// There is a bug in redis v4 when unsubscribing multiple channels at once, so we'll unsub one at a time.
// See https://github.com/redis/node-redis/issues/2052
this.subClient.unsubscribe(
this.requestChannel,
this.redisListeners.get("sub"),
true
true,
);
this.subClient.unsubscribe(
this.responseChannel,
this.redisListeners.get("sub"),
true
true,
);
this.subClient.unsubscribe(
this.specificResponseChannel,
this.redisListeners.get("sub"),
true
true,
);
} else {
this.subClient.punsubscribe(this.channel + "*");
this.subClient.off(
"pmessageBuffer",
this.redisListeners.get("pmessageBuffer")
this.redisListeners.get("pmessageBuffer"),
);

this.subClient.unsubscribe([
Expand All @@ -956,7 +956,7 @@ export class RedisAdapter extends Adapter {
]);
this.subClient.off(
"messageBuffer",
this.redisListeners.get("messageBuffer")
this.redisListeners.get("messageBuffer"),
);
}

Expand Down
12 changes: 6 additions & 6 deletions lib/sharded-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export interface ShardedRedisAdapterOptions {
export function createShardedAdapter(
pubClient: any,
subClient: any,
opts?: ShardedRedisAdapterOptions
opts?: ShardedRedisAdapterOptions,
) {
return function (nsp) {
return new ShardedRedisAdapter(nsp, pubClient, subClient, opts);
Expand All @@ -84,7 +84,7 @@ class ShardedRedisAdapter extends ClusterAdapter {
channelPrefix: "socket.io",
subscriptionMode: "dynamic",
},
opts
opts,
);

this.channel = `${this.opts.channelPrefix}#${nsp.name}#`;
Expand Down Expand Up @@ -139,7 +139,7 @@ class ShardedRedisAdapter extends ClusterAdapter {
}

return Promise.all(
channels.map((channel) => SUNSUBSCRIBE(this.subClient, channel))
channels.map((channel) => SUNSUBSCRIBE(this.subClient, channel)),
).then();
}

Expand All @@ -148,7 +148,7 @@ class ShardedRedisAdapter extends ClusterAdapter {
debug("publishing message of type %s to %s", message.type, channel);

return SPUBLISH(this.pubClient, channel, this.encode(message)).then(
() => ""
() => "",
);
}

Expand Down Expand Up @@ -176,14 +176,14 @@ class ShardedRedisAdapter extends ClusterAdapter {

override doPublishResponse(
requesterUid: string,
response: ClusterResponse
response: ClusterResponse,
): Promise<void> {
debug("publishing response of type %s to %s", response.type, requesterUid);

return SPUBLISH(
this.pubClient,
`${this.channel}${requesterUid}#`,
this.encode(response)
this.encode(response),
).then();
}

Expand Down
14 changes: 7 additions & 7 deletions lib/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ const kPendingUnsubscribes = Symbol("pendingUnsubscribes");
export function SSUBSCRIBE(
redisClient: any,
channel: string,
handler: (rawMessage: Buffer, channel: Buffer) => void
handler: (rawMessage: Buffer, channel: Buffer) => void,
): Promise<void> {
if (isRedisV4Client(redisClient)) {
return redisClient.sSubscribe(channel, handler, RETURN_BUFFERS);
Expand All @@ -77,7 +77,7 @@ export function SSUBSCRIBE(
redisClient[kListener] = (rawChannel: Buffer, message: Buffer) => {
redisClient[kHandlers].get(rawChannel.toString())?.(
message,
rawChannel
rawChannel,
);
};
redisClient.on("smessageBuffer", redisClient[kListener]);
Expand All @@ -97,7 +97,7 @@ export function SSUBSCRIBE(

export function SUNSUBSCRIBE(
redisClient: any,
channel: string | string[]
channel: string | string[],
): Promise<void> {
if (isRedisV4Client(redisClient)) {
return redisClient.sUnsubscribe(channel);
Expand All @@ -123,7 +123,7 @@ export function SUNSUBSCRIBE(

// Track pending unsubscribe for each channel
channels.forEach((c) =>
redisClient[kPendingUnsubscribes]?.set(c, unsubscribePromise)
redisClient[kPendingUnsubscribes]?.set(c, unsubscribePromise),
);

return unsubscribePromise;
Expand All @@ -136,7 +136,7 @@ export function SUNSUBSCRIBE(
export function SPUBLISH(
redisClient: any,
channel: string,
payload: string | Uint8Array
payload: string | Uint8Array,
) {
if (isRedisV4Client(redisClient)) {
return redisClient.sPublish(channel, payload);
Expand All @@ -153,7 +153,7 @@ export function PUBSUB(redisClient: any, arg: string, channel: string) {
return node
.send_command("PUBSUB", [arg, channel])
.then(parseNumSubResponse);
})
}),
).then(sumValues);
} else if (isRedisV4Client(redisClient)) {
const isCluster = Array.isArray(redisClient.masters);
Expand All @@ -165,7 +165,7 @@ export function PUBSUB(redisClient: any, arg: string, channel: string) {
return node.client
.sendCommand(["PUBSUB", arg, channel])
.then(parseNumSubResponse);
})
}),
).then(sumValues);
} else {
// redis@4 standalone
Expand Down
Loading