You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-09 00:22:08 +03:00
fix monitor, add client.reset & client.resetState, some fixes
This commit is contained in:
@@ -1,10 +1,9 @@
|
|||||||
import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list';
|
import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list';
|
||||||
import encodeCommand from '../RESP/encoder';
|
import encodeCommand from '../RESP/encoder';
|
||||||
import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder';
|
import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder';
|
||||||
import { CommandArguments, TypeMapping, ReplyUnion, RespVersions } from '../RESP/types';
|
import { CommandArguments, TypeMapping, ReplyUnion, RespVersions, SimpleStringReply, ReplyWithTypeMapping } from '../RESP/types';
|
||||||
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
|
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
|
||||||
import { AbortError, ErrorReply } from '../errors';
|
import { AbortError, ErrorReply } from '../errors';
|
||||||
import { EventEmitter } from 'node:stream';
|
|
||||||
import { MonitorCallback } from '.';
|
import { MonitorCallback } from '.';
|
||||||
|
|
||||||
export interface CommandOptions<T = TypeMapping> {
|
export interface CommandOptions<T = TypeMapping> {
|
||||||
@@ -19,24 +18,24 @@ export interface CommandOptions<T = TypeMapping> {
|
|||||||
|
|
||||||
export interface CommandToWrite extends CommandWaitingForReply {
|
export interface CommandToWrite extends CommandWaitingForReply {
|
||||||
args: CommandArguments;
|
args: CommandArguments;
|
||||||
chainId?: symbol;
|
chainId: symbol | undefined;
|
||||||
abort?: {
|
abort: {
|
||||||
signal: AbortSignal;
|
signal: AbortSignal;
|
||||||
listener: () => unknown;
|
listener: () => unknown;
|
||||||
};
|
} | undefined;
|
||||||
resolveOnWrite?: boolean;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
interface CommandWaitingForReply {
|
interface CommandWaitingForReply {
|
||||||
resolve(reply?: unknown): void;
|
resolve(reply?: unknown): void;
|
||||||
reject(err: unknown): void;
|
reject(err: unknown): void;
|
||||||
channelsCounter?: number;
|
channelsCounter: number | undefined;
|
||||||
typeMapping?: TypeMapping;
|
typeMapping: TypeMapping | undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
export type OnShardedChannelMoved = (channel: string, listeners: ChannelListeners) => void;
|
export type OnShardedChannelMoved = (channel: string, listeners: ChannelListeners) => void;
|
||||||
|
|
||||||
const PONG = Buffer.from('pong');
|
const PONG = Buffer.from('pong'),
|
||||||
|
RESET = Buffer.from('RESET');
|
||||||
|
|
||||||
const RESP2_PUSH_TYPE_MAPPING = {
|
const RESP2_PUSH_TYPE_MAPPING = {
|
||||||
...PUSH_TYPE_MAPPING,
|
...PUSH_TYPE_MAPPING,
|
||||||
@@ -44,35 +43,28 @@ const RESP2_PUSH_TYPE_MAPPING = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
export default class RedisCommandsQueue {
|
export default class RedisCommandsQueue {
|
||||||
readonly #maxLength: number | null | undefined;
|
readonly #respVersion;
|
||||||
|
readonly #maxLength;
|
||||||
readonly #toWrite = new DoublyLinkedList<CommandToWrite>();
|
readonly #toWrite = new DoublyLinkedList<CommandToWrite>();
|
||||||
readonly #waitingForReply = new SinglyLinkedList<CommandWaitingForReply>();
|
readonly #waitingForReply = new SinglyLinkedList<CommandWaitingForReply>();
|
||||||
readonly #onShardedChannelMoved: OnShardedChannelMoved;
|
readonly #onShardedChannelMoved;
|
||||||
|
#chainInExecution: symbol | undefined;
|
||||||
|
readonly decoder;
|
||||||
readonly #pubSub = new PubSub();
|
readonly #pubSub = new PubSub();
|
||||||
|
|
||||||
get isPubSubActive() {
|
get isPubSubActive() {
|
||||||
return this.#pubSub.isActive;
|
return this.#pubSub.isActive;
|
||||||
}
|
}
|
||||||
|
|
||||||
#chainInExecution: symbol | undefined;
|
|
||||||
|
|
||||||
decoder: Decoder;
|
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
respVersion: RespVersions | null | undefined,
|
respVersion: RespVersions,
|
||||||
maxLength: number | null | undefined,
|
maxLength: number | null | undefined,
|
||||||
onShardedChannelMoved: EventEmitter['emit']
|
onShardedChannelMoved: OnShardedChannelMoved
|
||||||
) {
|
) {
|
||||||
this.decoder = this.#initiateDecoder(respVersion);
|
this.#respVersion = respVersion;
|
||||||
this.#maxLength = maxLength;
|
this.#maxLength = maxLength;
|
||||||
this.#onShardedChannelMoved = onShardedChannelMoved;
|
this.#onShardedChannelMoved = onShardedChannelMoved;
|
||||||
}
|
this.decoder = this.#initiateDecoder();
|
||||||
|
|
||||||
#initiateDecoder(respVersion: RespVersions | null | undefined) {
|
|
||||||
return respVersion === 3 ?
|
|
||||||
this.#initiateResp3Decoder() :
|
|
||||||
this.#initiateResp2Decoder();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#onReply(reply: ReplyUnion) {
|
#onReply(reply: ReplyUnion) {
|
||||||
@@ -111,7 +103,7 @@ export default class RedisCommandsQueue {
|
|||||||
return this.#waitingForReply.head!.value.typeMapping ?? {};
|
return this.#waitingForReply.head!.value.typeMapping ?? {};
|
||||||
}
|
}
|
||||||
|
|
||||||
#initiateResp3Decoder() {
|
#initiateDecoder() {
|
||||||
return new Decoder({
|
return new Decoder({
|
||||||
onReply: reply => this.#onReply(reply),
|
onReply: reply => this.#onReply(reply),
|
||||||
onErrorReply: err => this.#onErrorReply(err),
|
onErrorReply: err => this.#onErrorReply(err),
|
||||||
@@ -124,61 +116,9 @@ export default class RedisCommandsQueue {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
#initiateResp2Decoder() {
|
|
||||||
return new Decoder({
|
|
||||||
onReply: reply => {
|
|
||||||
if (this.#pubSub.isActive && Array.isArray(reply)) {
|
|
||||||
if (this.#onPush(reply)) return;
|
|
||||||
|
|
||||||
if (PONG.equals(reply[0] as Buffer)) {
|
|
||||||
const { resolve, typeMapping } = this.#waitingForReply.shift()!,
|
|
||||||
buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer;
|
|
||||||
resolve(typeMapping?.[RESP_TYPES.SIMPLE_STRING] === Buffer ? buffer : buffer.toString());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
this.#onReply(reply);
|
|
||||||
},
|
|
||||||
onErrorReply: err => this.#onErrorReply(err),
|
|
||||||
// PUSH type does not exist in RESP2
|
|
||||||
// PubSub is handled in onReply
|
|
||||||
// @ts-expect-error
|
|
||||||
onPush: undefined,
|
|
||||||
getTypeMapping: () => {
|
|
||||||
// PubSub push is an Array in RESP2
|
|
||||||
return this.#pubSub.isActive ?
|
|
||||||
RESP2_PUSH_TYPE_MAPPING :
|
|
||||||
this.#getTypeMapping();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async monitor(callback: MonitorCallback, typeMapping: TypeMapping = {}, asap = false) {
|
|
||||||
await this.addCommand(
|
|
||||||
['MONITOR'],
|
|
||||||
{ asap },
|
|
||||||
true
|
|
||||||
);
|
|
||||||
|
|
||||||
const { onReply, getTypeMapping } = this.decoder;
|
|
||||||
this.decoder.onReply = callback;
|
|
||||||
this.decoder.getTypeMapping = () => typeMapping;
|
|
||||||
return () => new Promise<void>(async resolve => {
|
|
||||||
await this.addCommand(['RESET'], undefined, true);
|
|
||||||
this.decoder.onReply = (reply: string) => {
|
|
||||||
if (reply !== 'RESET') return callback(reply);
|
|
||||||
this.decoder.onReply = onReply;
|
|
||||||
this.decoder.getTypeMapping = getTypeMapping;
|
|
||||||
resolve();
|
|
||||||
};
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
addCommand<T>(
|
addCommand<T>(
|
||||||
args: CommandArguments,
|
args: CommandArguments,
|
||||||
options?: CommandOptions,
|
options?: CommandOptions
|
||||||
resolveOnWrite?: boolean
|
|
||||||
): Promise<T> {
|
): Promise<T> {
|
||||||
if (this.#maxLength && this.#toWrite.length + this.#waitingForReply.length >= this.#maxLength) {
|
if (this.#maxLength && this.#toWrite.length + this.#waitingForReply.length >= this.#maxLength) {
|
||||||
return Promise.reject(new Error('The queue is full'));
|
return Promise.reject(new Error('The queue is full'));
|
||||||
@@ -192,7 +132,6 @@ export default class RedisCommandsQueue {
|
|||||||
args,
|
args,
|
||||||
chainId: options?.chainId,
|
chainId: options?.chainId,
|
||||||
abort: undefined,
|
abort: undefined,
|
||||||
resolveOnWrite,
|
|
||||||
resolve,
|
resolve,
|
||||||
reject,
|
reject,
|
||||||
channelsCounter: undefined,
|
channelsCounter: undefined,
|
||||||
@@ -215,66 +154,12 @@ export default class RedisCommandsQueue {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
subscribe<T extends boolean>(
|
|
||||||
type: PubSubType,
|
|
||||||
channels: string | Array<string>,
|
|
||||||
listener: PubSubListener<T>,
|
|
||||||
returnBuffers?: T
|
|
||||||
) {
|
|
||||||
return this.#addPubSubCommand(
|
|
||||||
this.#pubSub.subscribe(type, channels, listener, returnBuffers)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
unsubscribe<T extends boolean>(
|
|
||||||
type: PubSubType,
|
|
||||||
channels?: string | Array<string>,
|
|
||||||
listener?: PubSubListener<T>,
|
|
||||||
returnBuffers?: T
|
|
||||||
) {
|
|
||||||
return this.#addPubSubCommand(
|
|
||||||
this.#pubSub.unsubscribe(type, channels, listener, returnBuffers)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
resubscribe(): Promise<any> | undefined {
|
|
||||||
const commands = this.#pubSub.resubscribe();
|
|
||||||
if (!commands.length) return;
|
|
||||||
|
|
||||||
return Promise.all(
|
|
||||||
commands.map(command => this.#addPubSubCommand(command, true))
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
extendPubSubChannelListeners(
|
|
||||||
type: PubSubType,
|
|
||||||
channel: string,
|
|
||||||
listeners: ChannelListeners
|
|
||||||
) {
|
|
||||||
return this.#addPubSubCommand(
|
|
||||||
this.#pubSub.extendChannelListeners(type, channel, listeners)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
extendPubSubListeners(type: PubSubType, listeners: PubSubTypeListeners) {
|
|
||||||
return this.#addPubSubCommand(
|
|
||||||
this.#pubSub.extendTypeListeners(type, listeners)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
getPubSubListeners(type: PubSubType) {
|
|
||||||
return this.#pubSub.getTypeListeners(type);
|
|
||||||
}
|
|
||||||
|
|
||||||
#addPubSubCommand(command: PubSubCommand, asap = false) {
|
#addPubSubCommand(command: PubSubCommand, asap = false) {
|
||||||
if (command === undefined) return;
|
|
||||||
|
|
||||||
return new Promise<void>((resolve, reject) => {
|
return new Promise<void>((resolve, reject) => {
|
||||||
this.#toWrite.add({
|
this.#toWrite.add({
|
||||||
args: command.args,
|
args: command.args,
|
||||||
chainId: undefined,
|
chainId: undefined,
|
||||||
abort: undefined,
|
abort: undefined,
|
||||||
resolveOnWrite: false,
|
|
||||||
resolve() {
|
resolve() {
|
||||||
command.resolve();
|
command.resolve();
|
||||||
resolve();
|
resolve();
|
||||||
@@ -289,6 +174,171 @@ export default class RedisCommandsQueue {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#setupPubSubHandler(command: Exclude<PubSubCommand, undefined>) {
|
||||||
|
// RESP3 uses `onPush` to handle PubSub, so no need to modify `onReply`
|
||||||
|
if (this.#respVersion !== 2) return;
|
||||||
|
|
||||||
|
// overriding `resolve` instead of using `.then` to make sure it'll be called before processing the next reply
|
||||||
|
const { resolve } = command;
|
||||||
|
command.resolve = () => {
|
||||||
|
this.decoder.onReply = (reply => {
|
||||||
|
if (Array.isArray(reply)) {
|
||||||
|
if (this.#onPush(reply)) return;
|
||||||
|
|
||||||
|
if (PONG.equals(reply[0] as Buffer)) {
|
||||||
|
const { resolve, typeMapping } = this.#waitingForReply.shift()!,
|
||||||
|
buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer;
|
||||||
|
resolve(typeMapping?.[RESP_TYPES.SIMPLE_STRING] === Buffer ? buffer : buffer.toString());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return this.#onReply(reply);
|
||||||
|
}) as Decoder['onReply'];
|
||||||
|
this.decoder.getTypeMapping = () => RESP2_PUSH_TYPE_MAPPING;
|
||||||
|
resolve();
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
subscribe<T extends boolean>(
|
||||||
|
type: PubSubType,
|
||||||
|
channels: string | Array<string>,
|
||||||
|
listener: PubSubListener<T>,
|
||||||
|
returnBuffers?: T
|
||||||
|
) {
|
||||||
|
const command = this.#pubSub.subscribe(type, channels, listener, returnBuffers);
|
||||||
|
if (!command) return;
|
||||||
|
|
||||||
|
this.#setupPubSubHandler(command);
|
||||||
|
return this.#addPubSubCommand(command);
|
||||||
|
}
|
||||||
|
|
||||||
|
#resetDecoderCallbacks() {
|
||||||
|
this.decoder.onReply = (reply => this.#onReply(reply)) as Decoder['onReply'];
|
||||||
|
this.decoder.getTypeMapping = () => this.#getTypeMapping();
|
||||||
|
}
|
||||||
|
|
||||||
|
unsubscribe<T extends boolean>(
|
||||||
|
type: PubSubType,
|
||||||
|
channels?: string | Array<string>,
|
||||||
|
listener?: PubSubListener<T>,
|
||||||
|
returnBuffers?: T
|
||||||
|
) {
|
||||||
|
const command = this.#pubSub.unsubscribe(type, channels, listener, returnBuffers);
|
||||||
|
if (!command) return;
|
||||||
|
|
||||||
|
if (command && this.#respVersion === 2) {
|
||||||
|
// RESP2 modifies `onReply` to handle PubSub (see #setupPubSubHandler)
|
||||||
|
const { resolve } = command;
|
||||||
|
command.resolve = () => {
|
||||||
|
if (!this.#pubSub.isActive) {
|
||||||
|
this.#resetDecoderCallbacks();
|
||||||
|
}
|
||||||
|
|
||||||
|
resolve();
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
return this.#addPubSubCommand(command);
|
||||||
|
}
|
||||||
|
|
||||||
|
resubscribe() {
|
||||||
|
const commands = this.#pubSub.resubscribe();
|
||||||
|
if (!commands.length) return;
|
||||||
|
|
||||||
|
// using last command becasue of asap
|
||||||
|
this.#setupPubSubHandler(commands[commands.length - 1]);
|
||||||
|
return Promise.all(
|
||||||
|
commands.map(command => this.#addPubSubCommand(command, true))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
extendPubSubChannelListeners(
|
||||||
|
type: PubSubType,
|
||||||
|
channel: string,
|
||||||
|
listeners: ChannelListeners
|
||||||
|
) {
|
||||||
|
const command = this.#pubSub.extendChannelListeners(type, channel, listeners);
|
||||||
|
if (!command) return;
|
||||||
|
|
||||||
|
this.#setupPubSubHandler(command);
|
||||||
|
return this.#addPubSubCommand(command);
|
||||||
|
}
|
||||||
|
|
||||||
|
extendPubSubListeners(type: PubSubType, listeners: PubSubTypeListeners) {
|
||||||
|
const command = this.#pubSub.extendTypeListeners(type, listeners);
|
||||||
|
if (!command) return;
|
||||||
|
|
||||||
|
this.#setupPubSubHandler(command);
|
||||||
|
return this.#addPubSubCommand(command);
|
||||||
|
}
|
||||||
|
|
||||||
|
getPubSubListeners(type: PubSubType) {
|
||||||
|
return this.#pubSub.getTypeListeners(type);
|
||||||
|
}
|
||||||
|
|
||||||
|
monitor(callback: MonitorCallback, typeMapping: TypeMapping = {}, asap = false) {
|
||||||
|
return new Promise<void>((resolve, reject) => {
|
||||||
|
this.#toWrite.add({
|
||||||
|
args: ['MONITOR'],
|
||||||
|
chainId: undefined,
|
||||||
|
abort: undefined,
|
||||||
|
// using `resolve` instead of using `.then`/`await` to make sure it'll be called before processing the next reply
|
||||||
|
resolve: () => {
|
||||||
|
// after running `MONITOR` only `MONITOR` and `RESET` replies are expected
|
||||||
|
// any other command should cause an error
|
||||||
|
|
||||||
|
// if `RESET` already overrides `onReply`, set monitor as it's fallback
|
||||||
|
if (this.#resetFallbackOnReply) {
|
||||||
|
this.#resetFallbackOnReply = callback;
|
||||||
|
} else {
|
||||||
|
this.decoder.onReply = callback;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.decoder.getTypeMapping = () => typeMapping;
|
||||||
|
resolve();
|
||||||
|
},
|
||||||
|
reject,
|
||||||
|
channelsCounter: undefined,
|
||||||
|
typeMapping
|
||||||
|
}, asap);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#resetFallbackOnReply?: Decoder['onReply'];
|
||||||
|
|
||||||
|
async reset<T extends TypeMapping>(typeMapping?: T) {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
// overriding onReply to handle `RESET` while in `MONITOR` or PubSub mode
|
||||||
|
this.#resetFallbackOnReply = this.decoder.onReply;
|
||||||
|
this.decoder.onReply = (reply => {
|
||||||
|
if (
|
||||||
|
(typeof reply === 'string' && reply === 'RESET') ||
|
||||||
|
(reply instanceof Buffer && RESET.equals(reply))
|
||||||
|
) {
|
||||||
|
this.#resetDecoderCallbacks();
|
||||||
|
this.#resetFallbackOnReply = undefined;
|
||||||
|
this.#pubSub.reset();
|
||||||
|
|
||||||
|
this.#waitingForReply.shift()!.resolve(reply);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.#resetFallbackOnReply!(reply);
|
||||||
|
}) as Decoder['onReply'];
|
||||||
|
|
||||||
|
this.#toWrite.push({
|
||||||
|
args: ['RESET'],
|
||||||
|
chainId: undefined,
|
||||||
|
abort: undefined,
|
||||||
|
resolve,
|
||||||
|
reject,
|
||||||
|
channelsCounter: undefined,
|
||||||
|
typeMapping
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
isWaitingToWrite() {
|
isWaitingToWrite() {
|
||||||
return this.#toWrite.length > 0;
|
return this.#toWrite.length > 0;
|
||||||
}
|
}
|
||||||
@@ -305,22 +355,15 @@ export default class RedisCommandsQueue {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO reuse `toSend` or create new object?
|
||||||
|
(toSend as any).args = undefined;
|
||||||
if (toSend.abort) {
|
if (toSend.abort) {
|
||||||
RedisCommandsQueue.#removeAbortListener(toSend);
|
RedisCommandsQueue.#removeAbortListener(toSend);
|
||||||
toSend.abort = undefined;
|
toSend.abort = undefined;
|
||||||
}
|
}
|
||||||
|
this.#chainInExecution = toSend.chainId;
|
||||||
if (toSend.resolveOnWrite) {
|
toSend.chainId = undefined;
|
||||||
toSend.resolve();
|
this.#waitingForReply.push(toSend);
|
||||||
} else {
|
|
||||||
// TODO reuse `toSend` or create new object?
|
|
||||||
(toSend as any).args = undefined;
|
|
||||||
|
|
||||||
this.#chainInExecution = toSend.chainId;
|
|
||||||
toSend.chainId = undefined;
|
|
||||||
|
|
||||||
this.#waitingForReply.push(toSend);
|
|
||||||
}
|
|
||||||
|
|
||||||
yield encoded;
|
yield encoded;
|
||||||
toSend = this.#toWrite.shift();
|
toSend = this.#toWrite.shift();
|
||||||
|
@@ -742,8 +742,10 @@ describe('Client', () => {
|
|||||||
}, GLOBAL.SERVERS.OPEN);
|
}, GLOBAL.SERVERS.OPEN);
|
||||||
|
|
||||||
testUtils.testWithClient('should be able to go back to "normal mode"', async client => {
|
testUtils.testWithClient('should be able to go back to "normal mode"', async client => {
|
||||||
const off = await client.monitor(() => {});
|
await Promise.all([
|
||||||
await off();
|
client.monitor(() => {}),
|
||||||
|
client.reset()
|
||||||
|
]);
|
||||||
await assert.doesNotReject(client.ping());
|
await assert.doesNotReject(client.ping());
|
||||||
}, GLOBAL.SERVERS.OPEN);
|
}, GLOBAL.SERVERS.OPEN);
|
||||||
|
|
||||||
|
@@ -326,15 +326,85 @@ export default class RedisClient<
|
|||||||
|
|
||||||
#initiateQueue(): RedisCommandsQueue {
|
#initiateQueue(): RedisCommandsQueue {
|
||||||
return new RedisCommandsQueue(
|
return new RedisCommandsQueue(
|
||||||
this.#options?.RESP,
|
this.#options?.RESP ?? 2,
|
||||||
this.#options?.commandsQueueMaxLength,
|
this.#options?.commandsQueueMaxLength,
|
||||||
(channel, listeners) => this.emit('sharded-channel-moved', channel, listeners)
|
(channel, listeners) => this.emit('sharded-channel-moved', channel, listeners)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#handshake(asap = false, promises: Array<Promise<unknown>> = []) {
|
||||||
|
if (this.#selectedDB !== 0) {
|
||||||
|
promises.push(
|
||||||
|
this.#queue.addCommand(
|
||||||
|
['SELECT', this.#selectedDB.toString()],
|
||||||
|
{ asap }
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.#options?.readonly) {
|
||||||
|
promises.push(
|
||||||
|
this.#queue.addCommand(
|
||||||
|
COMMANDS.READONLY.transformArguments(),
|
||||||
|
{ asap }
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.#options?.RESP) {
|
||||||
|
const hello: HelloOptions = {};
|
||||||
|
|
||||||
|
if (this.#options.password) {
|
||||||
|
hello.AUTH = {
|
||||||
|
username: this.#options.username ?? 'default',
|
||||||
|
password: this.#options.password
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.#options.name) {
|
||||||
|
hello.SETNAME = this.#options.name;
|
||||||
|
}
|
||||||
|
|
||||||
|
promises.push(
|
||||||
|
this.#queue.addCommand(
|
||||||
|
HELLO.transformArguments(this.#options.RESP, hello),
|
||||||
|
{ asap }
|
||||||
|
)
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
if (this.#options?.name) {
|
||||||
|
promises.push(
|
||||||
|
this.#queue.addCommand(
|
||||||
|
COMMANDS.CLIENT_SETNAME.transformArguments(this.#options.name),
|
||||||
|
{ asap }
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.#options?.username || this.#options?.password) {
|
||||||
|
promises.push(
|
||||||
|
this.#queue.addCommand(
|
||||||
|
COMMANDS.AUTH.transformArguments({
|
||||||
|
username: this.#options.username,
|
||||||
|
password: this.#options.password ?? ''
|
||||||
|
}),
|
||||||
|
{ asap }
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return promises;
|
||||||
|
}
|
||||||
|
|
||||||
#initiateSocket(): RedisSocket {
|
#initiateSocket(): RedisSocket {
|
||||||
const socketInitiator = async (): Promise<void> => {
|
const socketInitiator = () => {
|
||||||
const promises = [this.#queue.resubscribe()];
|
const promises: Array<Promise<unknown>> = [];
|
||||||
|
|
||||||
|
const resubscribePromise = this.#queue.resubscribe();
|
||||||
|
if (resubscribePromise) {
|
||||||
|
promises.push(resubscribePromise);
|
||||||
|
}
|
||||||
|
|
||||||
if (this.#monitorCallback) {
|
if (this.#monitorCallback) {
|
||||||
promises.push(
|
promises.push(
|
||||||
@@ -346,70 +416,11 @@ export default class RedisClient<
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.#selectedDB !== 0) {
|
this.#handshake(true, promises);
|
||||||
promises.push(
|
|
||||||
this.#queue.addCommand(
|
|
||||||
['SELECT', this.#selectedDB.toString()],
|
|
||||||
{ asap: true }
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this.#options?.readonly) {
|
|
||||||
promises.push(
|
|
||||||
this.#queue.addCommand(
|
|
||||||
COMMANDS.READONLY.transformArguments(),
|
|
||||||
{ asap: true }
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this.#options?.RESP) {
|
|
||||||
const hello: HelloOptions = {};
|
|
||||||
|
|
||||||
if (this.#options.password) {
|
|
||||||
hello.AUTH = {
|
|
||||||
username: this.#options.username ?? 'default',
|
|
||||||
password: this.#options.password
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this.#options.name) {
|
|
||||||
hello.SETNAME = this.#options.name;
|
|
||||||
}
|
|
||||||
|
|
||||||
promises.push(
|
|
||||||
this.#queue.addCommand(
|
|
||||||
HELLO.transformArguments(this.#options.RESP, hello),
|
|
||||||
{ asap: true }
|
|
||||||
)
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
if (this.#options?.name) {
|
|
||||||
promises.push(
|
|
||||||
this.#queue.addCommand(
|
|
||||||
COMMANDS.CLIENT_SETNAME.transformArguments(this.#options.name),
|
|
||||||
{ asap: true }
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this.#options?.username || this.#options?.password) {
|
|
||||||
promises.push(
|
|
||||||
this.#queue.addCommand(
|
|
||||||
COMMANDS.AUTH.transformArguments({
|
|
||||||
username: this.#options.username,
|
|
||||||
password: this.#options.password ?? ''
|
|
||||||
}),
|
|
||||||
{ asap: true }
|
|
||||||
)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (promises.length) {
|
if (promises.length) {
|
||||||
this.#write();
|
this.#write();
|
||||||
await Promise.all(promises);
|
return Promise.all(promises);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -876,19 +887,48 @@ export default class RedisClient<
|
|||||||
async MONITOR(callback: MonitorCallback<TYPE_MAPPING>) {
|
async MONITOR(callback: MonitorCallback<TYPE_MAPPING>) {
|
||||||
const promise = this._self.#queue.monitor(callback, this._commandOptions?.typeMapping);
|
const promise = this._self.#queue.monitor(callback, this._commandOptions?.typeMapping);
|
||||||
this._self.#scheduleWrite();
|
this._self.#scheduleWrite();
|
||||||
|
await promise;
|
||||||
const off = await promise;
|
|
||||||
this._self.#monitorCallback = callback;
|
this._self.#monitorCallback = callback;
|
||||||
return async () => {
|
|
||||||
const promise = off();
|
|
||||||
this._self.#scheduleWrite();
|
|
||||||
await promise;
|
|
||||||
this._self.#monitorCallback = undefined;
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
monitor = this.MONITOR;
|
monitor = this.MONITOR;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reset the client to its default state (i.e. stop PubSub, stop monitoring, select default DB, etc.)
|
||||||
|
*/
|
||||||
|
async reset() {
|
||||||
|
const promises = [this._self.#queue.reset()];
|
||||||
|
this._self.#handshake(false, promises);
|
||||||
|
await Promise.all(promises);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If the client has state, reset it.
|
||||||
|
* An internal function to be used by wrapper class such as `RedisClientPool`.
|
||||||
|
* @internal
|
||||||
|
*/
|
||||||
|
resetIfDirty() {
|
||||||
|
let shouldReset = false;
|
||||||
|
if (this._self.#selectedDB !== this._self.#options?.database ?? 0) {
|
||||||
|
console.warn('Returning a client with a different selected DB');
|
||||||
|
shouldReset = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this._self.#monitorCallback) {
|
||||||
|
console.warn('Returning a client with active MONITOR');
|
||||||
|
shouldReset = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this._self.#queue.isPubSubActive) {
|
||||||
|
console.warn('Returning a client with active PubSub');
|
||||||
|
shouldReset = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (shouldReset) {
|
||||||
|
return this.reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @deprecated use .close instead
|
* @deprecated use .close instead
|
||||||
*/
|
*/
|
||||||
|
@@ -375,6 +375,7 @@ export class RedisClientPool<
|
|||||||
#returnClient(node: DoublyLinkedNode<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>) {
|
#returnClient(node: DoublyLinkedNode<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>) {
|
||||||
const task = this.#tasksQueue.shift();
|
const task = this.#tasksQueue.shift();
|
||||||
if (task) {
|
if (task) {
|
||||||
|
clearTimeout(task.timeout);
|
||||||
this.#executeTask(node, task.resolve, task.reject, task.fn);
|
this.#executeTask(node, task.resolve, task.reject, task.fn);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@@ -1,4 +1,5 @@
|
|||||||
import { RedisArgument } from '../RESP/types';
|
import { RedisArgument } from '../RESP/types';
|
||||||
|
import { CommandToWrite } from './commands-queue';
|
||||||
|
|
||||||
export enum PubSubType {
|
export enum PubSubType {
|
||||||
CHANNELS = 'CHANNELS',
|
CHANNELS = 'CHANNELS',
|
||||||
@@ -26,7 +27,7 @@ const COMMANDS = {
|
|||||||
|
|
||||||
export type PubSubListener<
|
export type PubSubListener<
|
||||||
RETURN_BUFFERS extends boolean = false
|
RETURN_BUFFERS extends boolean = false
|
||||||
> = <T = RETURN_BUFFERS extends true ? Buffer : string>(message: T, channel: T) => unknown;
|
> = <T extends RETURN_BUFFERS extends true ? Buffer : string>(message: T, channel: T) => unknown;
|
||||||
|
|
||||||
export interface ChannelListeners {
|
export interface ChannelListeners {
|
||||||
unsubscribing: boolean;
|
unsubscribing: boolean;
|
||||||
@@ -38,11 +39,11 @@ export type PubSubTypeListeners = Map<string, ChannelListeners>;
|
|||||||
|
|
||||||
type Listeners = Record<PubSubType, PubSubTypeListeners>;
|
type Listeners = Record<PubSubType, PubSubTypeListeners>;
|
||||||
|
|
||||||
export type PubSubCommand = ReturnType<
|
export type PubSubCommand = (
|
||||||
typeof PubSub.prototype.subscribe |
|
Required<Pick<CommandToWrite, 'args' | 'channelsCounter' | 'resolve'>> & {
|
||||||
typeof PubSub.prototype.unsubscribe |
|
reject: undefined | (() => unknown);
|
||||||
typeof PubSub.prototype.extendTypeListeners
|
}
|
||||||
>;
|
);
|
||||||
|
|
||||||
export class PubSub {
|
export class PubSub {
|
||||||
static isStatusReply(reply: Array<Buffer>): boolean {
|
static isStatusReply(reply: Array<Buffer>): boolean {
|
||||||
@@ -135,7 +136,7 @@ export class PubSub {
|
|||||||
this.#subscribing--;
|
this.#subscribing--;
|
||||||
this.#updateIsActive();
|
this.#updateIsActive();
|
||||||
}
|
}
|
||||||
};
|
} satisfies PubSubCommand;
|
||||||
}
|
}
|
||||||
|
|
||||||
extendChannelListeners(
|
extendChannelListeners(
|
||||||
@@ -158,7 +159,7 @@ export class PubSub {
|
|||||||
this.#subscribing--;
|
this.#subscribing--;
|
||||||
this.#updateIsActive();
|
this.#updateIsActive();
|
||||||
}
|
}
|
||||||
};
|
} satisfies PubSubCommand;
|
||||||
}
|
}
|
||||||
|
|
||||||
#extendChannelListeners(
|
#extendChannelListeners(
|
||||||
@@ -203,7 +204,7 @@ export class PubSub {
|
|||||||
this.#subscribing--;
|
this.#subscribing--;
|
||||||
this.#updateIsActive();
|
this.#updateIsActive();
|
||||||
}
|
}
|
||||||
};
|
} satisfies PubSubCommand;
|
||||||
}
|
}
|
||||||
|
|
||||||
unsubscribe<T extends boolean>(
|
unsubscribe<T extends boolean>(
|
||||||
@@ -299,8 +300,8 @@ export class PubSub {
|
|||||||
removeListeners();
|
removeListeners();
|
||||||
this.#updateIsActive();
|
this.#updateIsActive();
|
||||||
},
|
},
|
||||||
reject: undefined // use the same structure as `subscribe`
|
reject: undefined
|
||||||
};
|
} satisfies PubSubCommand;
|
||||||
}
|
}
|
||||||
|
|
||||||
#updateIsActive() {
|
#updateIsActive() {
|
||||||
@@ -317,7 +318,7 @@ export class PubSub {
|
|||||||
this.#subscribing = 0;
|
this.#subscribing = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
resubscribe(): Array<PubSubCommand> {
|
resubscribe() {
|
||||||
const commands = [];
|
const commands = [];
|
||||||
for (const [type, listeners] of Object.entries(this.#listeners)) {
|
for (const [type, listeners] of Object.entries(this.#listeners)) {
|
||||||
if (!listeners.size) continue;
|
if (!listeners.size) continue;
|
||||||
@@ -333,7 +334,7 @@ export class PubSub {
|
|||||||
channelsCounter: listeners.size,
|
channelsCounter: listeners.size,
|
||||||
resolve: callback,
|
resolve: callback,
|
||||||
reject: callback
|
reject: callback
|
||||||
});
|
} satisfies PubSubCommand);
|
||||||
}
|
}
|
||||||
|
|
||||||
return commands;
|
return commands;
|
||||||
|
@@ -43,7 +43,7 @@ interface CreateSocketReturn<T> {
|
|||||||
socket: T;
|
socket: T;
|
||||||
}
|
}
|
||||||
|
|
||||||
export type RedisSocketInitiator = () => Promise<void>;
|
export type RedisSocketInitiator = () => void | Promise<unknown>;
|
||||||
|
|
||||||
export default class RedisSocket extends EventEmitter {
|
export default class RedisSocket extends EventEmitter {
|
||||||
static #initiateOptions(options?: RedisSocketOptions): RedisSocketOptions {
|
static #initiateOptions(options?: RedisSocketOptions): RedisSocketOptions {
|
||||||
|
Reference in New Issue
Block a user