1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-07 13:22:56 +03:00
This commit is contained in:
Leibale
2023-04-23 07:56:15 -04:00
parent 35d32e5b64
commit 9faa3e77c4
326 changed files with 14620 additions and 10931 deletions

View File

@@ -1,424 +1,549 @@
import COMMANDS from './commands';
import { RedisCommand, RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply, RedisCommandReply, RedisFunctions, RedisModules, RedisExtensions, RedisScript, RedisScripts, RedisCommandSignature, RedisFunction } from '../commands';
import { ClientCommandOptions, RedisClientOptions, RedisClientType, WithFunctions, WithModules, WithScripts } from '../client';
import RedisClusterSlots, { NodeAddressMap, ShardNode } from './cluster-slots';
import { attachExtensions, transformCommandReply, attachCommands, transformCommandArguments } from '../commander';
import { ClientCommandOptions, RedisClientOptions, RedisClientType } from '../client';
import { Command, CommandArguments, CommanderConfig, CommandPolicies, CommandSignature, CommandWithPoliciesSignature, Flags, RedisArgument, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions, TransformReply } from '../RESP/types';
import COMMANDS from '../commands';
import { EventEmitter } from 'events';
import RedisClusterMultiCommand, { InstantiableRedisClusterMultiCommandType, RedisClusterMultiCommandType } from './multi-command';
import { RedisMultiQueuedCommand } from '../multi-command';
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import RedisClusterSlots, { NodeAddressMap, ShardNode } from './cluster-slots';
// import RedisClusterMultiCommand, { InstantiableRedisClusterMultiCommandType, RedisClusterMultiCommandType } from './multi-command';
// import { RedisMultiQueuedCommand } from '../multi-command';
import { PubSubListener } from '../client/pub-sub';
import { ErrorReply } from '../errors';
export type RedisClusterClientOptions = Omit<
RedisClientOptions,
'modules' | 'functions' | 'scripts' | 'database'
RedisClientOptions,
'modules' | 'functions' | 'scripts' | 'database' | 'RESP'
>;
export interface RedisClusterOptions<
M extends RedisModules = Record<string, never>,
F extends RedisFunctions = Record<string, never>,
S extends RedisScripts = Record<string, never>
> extends RedisExtensions<M, F, S> {
/**
* Should contain details for some of the cluster nodes that the client will use to discover
* the "cluster topology". We recommend including details for at least 3 nodes here.
*/
rootNodes: Array<RedisClusterClientOptions>;
/**
* Default values used for every client in the cluster. Use this to specify global values,
* for example: ACL credentials, timeouts, TLS configuration etc.
*/
defaults?: Partial<RedisClusterClientOptions>;
/**
* When `true`, `.connect()` will only discover the cluster topology, without actually connecting to all the nodes.
* Useful for short-term or PubSub-only connections.
*/
minimizeConnections?: boolean;
/**
* When `true`, distribute load by executing readonly commands (such as `GET`, `GEOSEARCH`, etc.) across all cluster nodes. When `false`, only use master nodes.
*/
useReplicas?: boolean;
/**
* The maximum number of times a command will be redirected due to `MOVED` or `ASK` errors.
*/
maxCommandRedirections?: number;
/**
* Mapping between the addresses in the cluster (see `CLUSTER SHARDS`) and the addresses the client should connect to
* Useful when the cluster is running on another network
*
*/
nodeAddressMap?: NodeAddressMap;
M extends RedisModules = RedisModules,
F extends RedisFunctions = RedisFunctions,
S extends RedisScripts = RedisScripts,
RESP extends RespVersions = RespVersions
> extends CommanderConfig<M, F, S, RESP> {
/**
* Should contain details for some of the cluster nodes that the client will use to discover
* the "cluster topology". We recommend including details for at least 3 nodes here.
*/
rootNodes: Array<RedisClusterClientOptions>;
/**
* Default values used for every client in the cluster. Use this to specify global values,
* for example: ACL credentials, timeouts, TLS configuration etc.
*/
defaults?: Partial<RedisClusterClientOptions>;
/**
* When `true`, `.connect()` will only discover the cluster topology, without actually connecting to all the nodes.
* Useful for short-term or PubSub-only connections.
*/
minimizeConnections?: boolean;
/**
* When `true`, distribute load by executing readonly commands (such as `GET`, `GEOSEARCH`, etc.) across all cluster nodes. When `false`, only use master nodes.
*/
// TODO: replicas only mode?
useReplicas?: boolean;
/**
* The maximum number of times a command will be redirected due to `MOVED` or `ASK` errors.
*/
maxCommandRedirections?: number;
/**
* Mapping between the addresses in the cluster (see `CLUSTER SHARDS`) and the addresses the client should connect to
* Useful when the cluster is running on another network
*/
nodeAddressMap?: NodeAddressMap;
}
type WithCommands = {
[P in keyof typeof COMMANDS]: RedisCommandSignature<(typeof COMMANDS)[P]>;
type WithCommands<
RESP extends RespVersions,
FLAGS extends Flags,
POLICIES extends CommandPolicies
> = {
[P in keyof typeof COMMANDS]: CommandWithPoliciesSignature<(typeof COMMANDS)[P], RESP, FLAGS, POLICIES>;
};
export type RedisClusterType<
M extends RedisModules = Record<string, never>,
F extends RedisFunctions = Record<string, never>,
S extends RedisScripts = Record<string, never>
> = RedisCluster<M, F, S> & WithCommands & WithModules<M> & WithFunctions<F> & WithScripts<S>;
M extends RedisModules = {},
F extends RedisFunctions = {},
S extends RedisScripts = {},
RESP extends RespVersions = 2,
FLAGS extends Flags = {},
POLICIES extends CommandPolicies = {}
> = RedisCluster<M, F, S, RESP, FLAGS, POLICIES> & WithCommands<RESP, FLAGS, POLICIES>;
// & WithModules<M> & WithFunctions<F> & WithScripts<S>
export default class RedisCluster<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts
> extends EventEmitter {
static extractFirstKey(
command: RedisCommand,
originalArgs: Array<unknown>,
redisArgs: RedisCommandArguments
): RedisCommandArgument | undefined {
if (command.FIRST_KEY_INDEX === undefined) {
return undefined;
} else if (typeof command.FIRST_KEY_INDEX === 'number') {
return redisArgs[command.FIRST_KEY_INDEX];
}
return command.FIRST_KEY_INDEX(...originalArgs);
}
static create<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts
>(options?: RedisClusterOptions<M, F, S>): RedisClusterType<M, F, S> {
return new (attachExtensions({
BaseClass: RedisCluster,
modulesExecutor: RedisCluster.prototype.commandsExecutor,
modules: options?.modules,
functionsExecutor: RedisCluster.prototype.functionsExecutor,
functions: options?.functions,
scriptsExecutor: RedisCluster.prototype.scriptsExecutor,
scripts: options?.scripts
}))(options);
}
readonly #options: RedisClusterOptions<M, F, S>;
readonly #slots: RedisClusterSlots<M, F, S>;
get slots() {
return this.#slots.slots;
}
get shards() {
return this.#slots.shards;
}
get masters() {
return this.#slots.masters;
}
get replicas() {
return this.#slots.replicas;
}
get nodeByAddress() {
return this.#slots.nodeByAddress;
}
get pubSubNode() {
return this.#slots.pubSubNode;
}
readonly #Multi: InstantiableRedisClusterMultiCommandType<M, F, S>;
get isOpen() {
return this.#slots.isOpen;
}
constructor(options: RedisClusterOptions<M, F, S>) {
super();
this.#options = options;
this.#slots = new RedisClusterSlots(options, this.emit.bind(this));
this.#Multi = RedisClusterMultiCommand.extend(options);
}
duplicate(overrides?: Partial<RedisClusterOptions<M, F, S>>): RedisClusterType<M, F, S> {
return new (Object.getPrototypeOf(this).constructor)({
...this.#options,
...overrides
});
}
connect() {
return this.#slots.connect();
}
async commandsExecutor<C extends RedisCommand>(
command: C,
args: Array<unknown>
): Promise<RedisCommandReply<C>> {
const { args: redisArgs, options } = transformCommandArguments(command, args);
return transformCommandReply(
command,
await this.sendCommand(
RedisCluster.extractFirstKey(command, args, redisArgs),
command.IS_READ_ONLY,
redisArgs,
options
),
redisArgs.preserve
);
}
async sendCommand<T = RedisCommandRawReply>(
firstKey: RedisCommandArgument | undefined,
isReadonly: boolean | undefined,
args: RedisCommandArguments,
options?: ClientCommandOptions
): Promise<T> {
return this.#execute(
firstKey,
isReadonly,
client => client.sendCommand<T>(args, options)
);
}
async functionsExecutor<F extends RedisFunction>(
fn: F,
args: Array<unknown>,
name: string,
): Promise<RedisCommandReply<F>> {
const { args: redisArgs, options } = transformCommandArguments(fn, args);
return transformCommandReply(
fn,
await this.executeFunction(
name,
fn,
args,
redisArgs,
options
),
redisArgs.preserve
);
}
async executeFunction(
name: string,
fn: RedisFunction,
originalArgs: Array<unknown>,
redisArgs: RedisCommandArguments,
options?: ClientCommandOptions
): Promise<RedisCommandRawReply> {
return this.#execute(
RedisCluster.extractFirstKey(fn, originalArgs, redisArgs),
fn.IS_READ_ONLY,
client => client.executeFunction(name, fn, redisArgs, options)
);
}
async scriptsExecutor<S extends RedisScript>(script: S, args: Array<unknown>): Promise<RedisCommandReply<S>> {
const { args: redisArgs, options } = transformCommandArguments(script, args);
return transformCommandReply(
script,
await this.executeScript(
script,
args,
redisArgs,
options
),
redisArgs.preserve
);
}
async executeScript(
script: RedisScript,
originalArgs: Array<unknown>,
redisArgs: RedisCommandArguments,
options?: ClientCommandOptions
): Promise<RedisCommandRawReply> {
return this.#execute(
RedisCluster.extractFirstKey(script, originalArgs, redisArgs),
script.IS_READ_ONLY,
client => client.executeScript(script, redisArgs, options)
);
}
async #execute<Reply>(
firstKey: RedisCommandArgument | undefined,
isReadonly: boolean | undefined,
executor: (client: RedisClientType<M, F, S>) => Promise<Reply>
): Promise<Reply> {
const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16;
let client = await this.#slots.getClient(firstKey, isReadonly);
for (let i = 0;; i++) {
try {
return await executor(client);
} catch (err) {
if (++i > maxCommandRedirections || !(err instanceof ErrorReply)) {
throw err;
}
if (err.message.startsWith('ASK')) {
const address = err.message.substring(err.message.lastIndexOf(' ') + 1);
let redirectTo = await this.#slots.getMasterByAddress(address);
if (!redirectTo) {
await this.#slots.rediscover(client);
redirectTo = await this.#slots.getMasterByAddress(address);
}
if (!redirectTo) {
throw new Error(`Cannot find node ${address}`);
}
await redirectTo.asking();
client = redirectTo;
continue;
} else if (err.message.startsWith('MOVED')) {
await this.#slots.rediscover(client);
client = await this.#slots.getClient(firstKey, isReadonly);
continue;
}
throw err;
}
}
}
MULTI(routing?: RedisCommandArgument): RedisClusterMultiCommandType<M, F, S> {
return new this.#Multi(
(commands: Array<RedisMultiQueuedCommand>, firstKey?: RedisCommandArgument, chainId?: symbol) => {
return this.#execute(
firstKey,
false,
client => client.multiExecutor(commands, undefined, chainId)
);
},
routing
);
}
multi = this.MULTI;
async SUBSCRIBE<T extends boolean = false>(
channels: string | Array<string>,
listener: PubSubListener<T>,
bufferMode?: T
) {
return (await this.#slots.getPubSubClient())
.SUBSCRIBE(channels, listener, bufferMode);
}
subscribe = this.SUBSCRIBE;
async UNSUBSCRIBE<T extends boolean = false>(
channels?: string | Array<string>,
listener?: PubSubListener<boolean>,
bufferMode?: T
) {
return this.#slots.executeUnsubscribeCommand(client =>
client.UNSUBSCRIBE(channels, listener, bufferMode)
);
}
unsubscribe = this.UNSUBSCRIBE;
async PSUBSCRIBE<T extends boolean = false>(
patterns: string | Array<string>,
listener: PubSubListener<T>,
bufferMode?: T
) {
return (await this.#slots.getPubSubClient())
.PSUBSCRIBE(patterns, listener, bufferMode);
}
pSubscribe = this.PSUBSCRIBE;
async PUNSUBSCRIBE<T extends boolean = false>(
patterns?: string | Array<string>,
listener?: PubSubListener<T>,
bufferMode?: T
) {
return this.#slots.executeUnsubscribeCommand(client =>
client.PUNSUBSCRIBE(patterns, listener, bufferMode)
);
}
pUnsubscribe = this.PUNSUBSCRIBE;
async SSUBSCRIBE<T extends boolean = false>(
channels: string | Array<string>,
listener: PubSubListener<T>,
bufferMode?: T
) {
const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16,
firstChannel = Array.isArray(channels) ? channels[0] : channels;
let client = await this.#slots.getShardedPubSubClient(firstChannel);
for (let i = 0;; i++) {
try {
return await client.SSUBSCRIBE(channels, listener, bufferMode);
} catch (err) {
if (++i > maxCommandRedirections || !(err instanceof ErrorReply)) {
throw err;
}
if (err.message.startsWith('MOVED')) {
await this.#slots.rediscover(client);
client = await this.#slots.getShardedPubSubClient(firstChannel);
continue;
}
throw err;
}
}
}
sSubscribe = this.SSUBSCRIBE;
SUNSUBSCRIBE<T extends boolean = false>(
channels: string | Array<string>,
listener: PubSubListener<T>,
bufferMode?: T
) {
return this.#slots.executeShardedUnsubscribeCommand(
Array.isArray(channels) ? channels[0] : channels,
client => client.SUNSUBSCRIBE(channels, listener, bufferMode)
);
}
sUnsubscribe = this.SUNSUBSCRIBE;
quit(): Promise<void> {
return this.#slots.quit();
}
disconnect(): Promise<void> {
return this.#slots.disconnect();
}
nodeClient(node: ShardNode<M, F, S>) {
return this.#slots.nodeClient(node);
}
getRandomNode() {
return this.#slots.getRandomNode();
}
getSlotRandomNode(slot: number) {
return this.#slots.getSlotRandomNode(slot);
}
/**
* @deprecated use `.masters` instead
*/
getMasters() {
return this.masters;
}
/**
* @deprecated use `.slots[<SLOT>]` instead
*/
getSlotMaster(slot: number) {
return this.slots[slot].master;
}
export interface ClusterCommandOptions extends ClientCommandOptions {
policies?: CommandPolicies;
}
attachCommands({
BaseClass: RedisCluster,
commands: COMMANDS,
executor: RedisCluster.prototype.commandsExecutor
});
type ProxyCluster = RedisCluster<RedisModules, RedisFunctions, RedisScripts, RespVersions, Flags, CommandPolicies> & { commandOptions?: ClusterCommandOptions };
type NamespaceProxyCluster = { self: ProxyCluster };
export default class RedisCluster<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
FLAGS extends Flags,
POLICIES extends CommandPolicies
> extends EventEmitter {
private static _extractFirstKey<C extends Command>(
command: C,
args: Parameters<C['transformArguments']>,
redisArgs: Array<RedisArgument>
): RedisArgument | undefined {
if (command.FIRST_KEY_INDEX === undefined) {
return undefined;
} else if (typeof command.FIRST_KEY_INDEX === 'number') {
return redisArgs[command.FIRST_KEY_INDEX];
}
return command.FIRST_KEY_INDEX(...args);
}
private static _createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);
return async function (this: ProxyCluster) {
const args = command.transformArguments.apply(undefined, arguments as any),
firstKey = RedisCluster._extractFirstKey(
command,
arguments as any,
args
),
reply = await this.sendCommand(
firstKey,
command.IS_READ_ONLY,
args,
this.commandOptions,
command.POLICIES
);
return transformReply ?
transformReply(reply, args.preserve) :
reply;
};
}
private static _createModuleCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);
return async function (this: NamespaceProxyCluster) {
const args = command.transformArguments.apply(undefined, arguments as any),
firstKey = RedisCluster._extractFirstKey(
command,
arguments as any,
args
),
reply = await this.self.sendCommand(
firstKey,
command.IS_READ_ONLY,
args,
this.self.commandOptions,
command.POLICIES
);
return transformReply ?
transformReply(reply, args.preserve) :
reply;
};
}
private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) {
const prefix = functionArgumentsPrefix(name, fn),
transformReply = getTransformReply(fn, resp);
return async function (this: NamespaceProxyCluster) {
const fnArgs = fn.transformArguments.apply(undefined, arguments as any),
args = prefix.concat(fnArgs),
firstKey = RedisCluster._extractFirstKey(
fn,
arguments as any,
args
),
reply = await this.self.sendCommand(
firstKey,
fn.IS_READ_ONLY,
args,
this.self.commandOptions,
fn.POLICIES
);
return transformReply ?
transformReply(reply, fnArgs.preserve) :
reply;
};
}
private static _createScriptCommand(script: RedisScript, resp: RespVersions) {
const prefix = scriptArgumentsPrefix(script),
transformReply = getTransformReply(script, resp);
return async function (this: ProxyCluster) {
const scriptArgs = script.transformArguments.apply(undefined, arguments as any),
args = prefix.concat(scriptArgs),
firstKey = RedisCluster._extractFirstKey(
script,
arguments as any,
args
),
reply = await this.sendCommand(
firstKey,
script.IS_READ_ONLY,
args,
this.commandOptions,
script.POLICIES
);
return transformReply ?
transformReply(reply, scriptArgs.preserve) :
reply;
};
}
static factory<
M extends RedisModules = {},
F extends RedisFunctions = {},
S extends RedisScripts = {},
RESP extends RespVersions = 2
>(config?: CommanderConfig<M, F, S, RESP>) {
const Cluster = attachConfig({
BaseClass: RedisCluster,
commands: COMMANDS,
createCommand: RedisCluster._createCommand,
createFunctionCommand: RedisCluster._createFunctionCommand,
createModuleCommand: RedisCluster._createModuleCommand,
createScriptCommand: RedisCluster._createScriptCommand,
config
});
// Client.prototype.Multi = RedisClientMultiCommand.extend(config);
return (options?: Omit<RedisClusterOptions, keyof Exclude<typeof config, undefined>>) => {
// returning a proxy of the client to prevent the namespaces.self to leak between proxies
// namespaces will be bootstraped on first access per proxy
return Object.create(new Cluster(options)) as RedisClusterType<M, F, S, RESP>;
};
}
static create<
M extends RedisModules = {},
F extends RedisFunctions = {},
S extends RedisScripts = {},
RESP extends RespVersions = 2
>(options?: RedisClusterOptions<M, F, S, RESP>) {
return RedisCluster.factory(options)(options);
}
private readonly _options: RedisClusterOptions<M, F, S, RESP>;
private readonly _slots: RedisClusterSlots<M, F, S, RESP>;
/**
* An array of the cluster slots, each slot contain its `master` and `replicas`.
* Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific node (master or replica).
*/
get slots() {
return this._slots.slots;
}
/**
* An array of cluster shards, each shard contain its `master` and `replicas`.
* Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific node (master or replica).
*/
get shards() {
return this._slots.shards;
}
/**
* An array of the cluster masters.
* Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific master node.
*/
get masters() {
return this._slots.masters;
}
/**
* An array of the cluster replicas.
* Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific replica node.
*/
get replicas() {
return this._slots.replicas;
}
/**
* A map form a node address (`<host>:<port>`) to its shard, each shard contain its `master` and `replicas`.
* Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific node (master or replica).
*/
get nodeByAddress() {
return this._slots.nodeByAddress;
}
/**
* The current pub/sub node.
*/
get pubSubNode() {
return this._slots.pubSubNode;
}
// readonly #Multi: InstantiableRedisClusterMultiCommandType<M, F, S>;
get isOpen() {
return this._slots.isOpen;
}
constructor(options: RedisClusterOptions<M, F, S, RESP>) {
super();
this._options = options;
this._slots = new RedisClusterSlots(options, this.emit.bind(this));
// this.#Multi = RedisClusterMultiCommand.extend(options);
}
duplicate(overrides?: Partial<RedisClusterOptions<M, F, S>>): RedisClusterType<M, F, S> {
return new (Object.getPrototypeOf(this).constructor)({
...this._options,
...overrides
});
}
connect() {
return this._slots.connect();
}
withCommandOptions<T extends ClusterCommandOptions>(options: T) {
const proxy = Object.create(this);
proxy.commandOptions = options;
return proxy as RedisClusterType<
M,
F,
S,
RESP,
T['flags'] extends Flags ? T['flags'] : {},
T['policies'] extends CommandPolicies ? T['policies'] : {}
>;
}
private _commandOptionsProxy<
K extends keyof ClusterCommandOptions,
V extends ClusterCommandOptions[K]
>(
key: K,
value: V
) {
const proxy = Object.create(this);
proxy.commandOptions = Object.create((this as ProxyCluster).commandOptions ?? null);
proxy.commandOptions[key] = value;
return proxy as RedisClusterType<
M,
F,
S,
RESP,
K extends 'flags' ? V extends Flags ? V : {} : FLAGS,
K extends 'policies' ? V extends CommandPolicies ? V : {} : POLICIES
>;
}
/**
* Override the `flags` command option
*/
withFlags<FLAGS extends Flags>(flags: FLAGS) {
return this._commandOptionsProxy('flags', flags);
}
/**
* Override the `policies` command option
* TODO
*/
withPolicies<POLICIES extends CommandPolicies> (policies: POLICIES) {
return this._commandOptionsProxy('policies', policies);
}
async sendCommand<T = ReplyUnion>(
firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined,
args: CommandArguments,
options?: ClusterCommandOptions,
deafultPolicies?: CommandPolicies
): Promise<T> {
// const requestPolicy = options?.policies?.request ?? deafultPolicies?.request,
// responsePolicy = options?.policies?.response ?? deafultPolicies?.response;
const maxCommandRedirections = this._options.maxCommandRedirections ?? 16;
let client = await this._slots.getClient(firstKey, isReadonly);
for (let i = 0; ; i++) {
try {
return await client.sendCommand<T>(args, options);
} catch (err) {
// TODO: error class
if (++i > maxCommandRedirections || !(err instanceof Error)) {
throw err;
}
if (err.message.startsWith('ASK')) {
const address = err.message.substring(err.message.lastIndexOf(' ') + 1);
let redirectTo = await this._slots.getMasterByAddress(address);
if (!redirectTo) {
await this._slots.rediscover(client);
redirectTo = await this._slots.getMasterByAddress(address);
}
if (!redirectTo) {
throw new Error(`Cannot find node ${address}`);
}
await redirectTo.asking();
client = redirectTo;
continue;
} else if (err.message.startsWith('MOVED')) {
await this._slots.rediscover(client);
client = await this._slots.getClient(firstKey, isReadonly);
continue;
}
throw err;
}
}
}
// MULTI(routing?: RedisCommandArgument): RedisClusterMultiCommandType<M, F, S> {
// return new this.#Multi(
// (commands: Array<RedisMultiQueuedCommand>, firstKey?: RedisCommandArgument, chainId?: symbol) => {
// return this.#execute(
// firstKey,
// false,
// client => client.multiExecutor(commands, undefined, chainId)
// );
// },
// routing
// );
// }
// multi = this.MULTI;
async SUBSCRIBE<T extends boolean = false>(
channels: string | Array<string>,
listener: PubSubListener<T>,
bufferMode?: T
) {
return (await this._slots.getPubSubClient())
.SUBSCRIBE(channels, listener, bufferMode);
}
subscribe = this.SUBSCRIBE;
async UNSUBSCRIBE<T extends boolean = false>(
channels?: string | Array<string>,
listener?: PubSubListener<boolean>,
bufferMode?: T
) {
return this._slots.executeUnsubscribeCommand(client =>
client.UNSUBSCRIBE(channels, listener, bufferMode)
);
}
unsubscribe = this.UNSUBSCRIBE;
async PSUBSCRIBE<T extends boolean = false>(
patterns: string | Array<string>,
listener: PubSubListener<T>,
bufferMode?: T
) {
return (await this._slots.getPubSubClient())
.PSUBSCRIBE(patterns, listener, bufferMode);
}
pSubscribe = this.PSUBSCRIBE;
async PUNSUBSCRIBE<T extends boolean = false>(
patterns?: string | Array<string>,
listener?: PubSubListener<T>,
bufferMode?: T
) {
return this._slots.executeUnsubscribeCommand(client =>
client.PUNSUBSCRIBE(patterns, listener, bufferMode)
);
}
pUnsubscribe = this.PUNSUBSCRIBE;
async SSUBSCRIBE<T extends boolean = false>(
channels: string | Array<string>,
listener: PubSubListener<T>,
bufferMode?: T
) {
const maxCommandRedirections = this._options.maxCommandRedirections ?? 16,
firstChannel = Array.isArray(channels) ? channels[0] : channels;
let client = await this._slots.getShardedPubSubClient(firstChannel);
for (let i = 0; ; i++) {
try {
return await client.SSUBSCRIBE(channels, listener, bufferMode);
} catch (err) {
if (++i > maxCommandRedirections || !(err instanceof ErrorReply)) {
throw err;
}
if (err.message.startsWith('MOVED')) {
await this._slots.rediscover(client);
client = await this._slots.getShardedPubSubClient(firstChannel);
continue;
}
throw err;
}
}
}
sSubscribe = this.SSUBSCRIBE;
SUNSUBSCRIBE<T extends boolean = false>(
channels: string | Array<string>,
listener: PubSubListener<T>,
bufferMode?: T
) {
return this._slots.executeShardedUnsubscribeCommand(
Array.isArray(channels) ? channels[0] : channels,
client => client.SUNSUBSCRIBE(channels, listener, bufferMode)
);
}
sUnsubscribe = this.SUNSUBSCRIBE;
// quit(): Promise<void> {
// return this.#slots.quit();
// }
disconnect(): Promise<void> {
return this._slots.disconnect();
}
nodeClient(node: ShardNode<M, F, S, RESP>) {
return this._slots.nodeClient(node);
}
/**
* Returns a random node from the cluster.
* Userful for running "forward" commands (like PUBLISH) on a random node.
*/
getRandomNode() {
return this._slots.getRandomNode();
}
/**
* Get a random node from a slot.
* Useful for running readonly commands on a slot.
*/
getSlotRandomNode(slot: number) {
return this._slots.getSlotRandomNode(slot);
}
/**
* @deprecated use `.masters` instead
* TODO
*/
getMasters() {
return this.masters;
}
/**
* @deprecated use `.slots[<SLOT>]` instead
* TODO
*/
getSlotMaster(slot: number) {
return this.slots[slot].master;
}
}