You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-03 04:01:40 +03:00
691 lines
21 KiB
TypeScript
691 lines
21 KiB
TypeScript
import { RedisClientOptions, RedisClientType } from '../client';
|
|
import { CommandOptions } from '../client/commands-queue';
|
|
import { Command, CommandArguments, CommanderConfig, CommandSignature, /*CommandPolicies, CommandWithPoliciesSignature,*/ TypeMapping, RedisArgument, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions } from '../RESP/types';
|
|
import COMMANDS from '../commands';
|
|
import { EventEmitter } from 'node:events';
|
|
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
|
|
import RedisClusterSlots, { NodeAddressMap, ShardNode } from './cluster-slots';
|
|
import RedisClusterMultiCommand, { RedisClusterMultiCommandType } from './multi-command';
|
|
import { PubSubListener } from '../client/pub-sub';
|
|
import { ErrorReply } from '../errors';
|
|
import { RedisTcpSocketOptions } from '../client/socket';
|
|
import { ClientSideCacheConfig, PooledClientSideCacheProvider } from '../client/cache';
|
|
import { BasicCommandParser } from '../client/parser';
|
|
import { ASKING_CMD } from '../commands/ASKING';
|
|
import SingleEntryCache from '../single-entry-cache'
|
|
interface ClusterCommander<
|
|
M extends RedisModules,
|
|
F extends RedisFunctions,
|
|
S extends RedisScripts,
|
|
RESP extends RespVersions,
|
|
TYPE_MAPPING extends TypeMapping,
|
|
// POLICIES extends CommandPolicies
|
|
> extends CommanderConfig<M, F, S, RESP> {
|
|
commandOptions?: ClusterCommandOptions<TYPE_MAPPING/*, POLICIES*/>;
|
|
}
|
|
|
|
export type RedisClusterClientOptions = Omit<
|
|
RedisClientOptions<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping, RedisTcpSocketOptions>,
|
|
keyof ClusterCommander<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping/*, CommandPolicies*/>
|
|
>;
|
|
|
|
export interface RedisClusterOptions<
|
|
M extends RedisModules = RedisModules,
|
|
F extends RedisFunctions = RedisFunctions,
|
|
S extends RedisScripts = RedisScripts,
|
|
RESP extends RespVersions = RespVersions,
|
|
TYPE_MAPPING extends TypeMapping = TypeMapping,
|
|
// POLICIES extends CommandPolicies = CommandPolicies
|
|
> extends ClusterCommander<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/> {
|
|
/**
|
|
* Should contain details for some of the cluster nodes that the client will use to discover
|
|
* the "cluster topology". We recommend including details for at least 3 nodes here.
|
|
*/
|
|
rootNodes: Array<RedisClusterClientOptions>;
|
|
/**
|
|
* Default values used for every client in the cluster. Use this to specify global values,
|
|
* for example: ACL credentials, timeouts, TLS configuration etc.
|
|
*/
|
|
defaults?: Partial<RedisClusterClientOptions>;
|
|
/**
|
|
* When `true`, `.connect()` will only discover the cluster topology, without actually connecting to all the nodes.
|
|
* Useful for short-term or PubSub-only connections.
|
|
*/
|
|
minimizeConnections?: boolean;
|
|
/**
|
|
* When `true`, distribute load by executing readonly commands (such as `GET`, `GEOSEARCH`, etc.) across all cluster nodes. When `false`, only use master nodes.
|
|
*/
|
|
// TODO: replicas only mode?
|
|
useReplicas?: boolean;
|
|
/**
|
|
* The maximum number of times a command will be redirected due to `MOVED` or `ASK` errors.
|
|
*/
|
|
maxCommandRedirections?: number;
|
|
/**
|
|
* Mapping between the addresses in the cluster (see `CLUSTER SHARDS`) and the addresses the client should connect to
|
|
* Useful when the cluster is running on another network
|
|
*/
|
|
nodeAddressMap?: NodeAddressMap;
|
|
/**
|
|
* Client Side Caching configuration for the pool.
|
|
*
|
|
* Enables Redis Servers and Clients to work together to cache results from commands
|
|
* sent to a server. The server will notify the client when cached results are no longer valid.
|
|
* In pooled mode, the cache is shared across all clients in the pool.
|
|
*
|
|
* Note: Client Side Caching is only supported with RESP3.
|
|
*
|
|
* @example Anonymous cache configuration
|
|
* ```
|
|
* const client = createCluster({
|
|
* clientSideCache: {
|
|
* ttl: 0,
|
|
* maxEntries: 0,
|
|
* evictPolicy: "LRU"
|
|
* },
|
|
* minimum: 5
|
|
* });
|
|
* ```
|
|
*
|
|
* @example Using a controllable cache
|
|
* ```
|
|
* const cache = new BasicPooledClientSideCache({
|
|
* ttl: 0,
|
|
* maxEntries: 0,
|
|
* evictPolicy: "LRU"
|
|
* });
|
|
* const client = createCluster({
|
|
* clientSideCache: cache,
|
|
* minimum: 5
|
|
* });
|
|
* ```
|
|
*/
|
|
clientSideCache?: PooledClientSideCacheProvider | ClientSideCacheConfig;
|
|
}
|
|
|
|
// remove once request & response policies are ready
|
|
type ClusterCommand<
|
|
NAME extends PropertyKey,
|
|
COMMAND extends Command
|
|
> = COMMAND['NOT_KEYED_COMMAND'] extends true ? (
|
|
COMMAND['IS_FORWARD_COMMAND'] extends true ? NAME : never
|
|
) : NAME;
|
|
|
|
// CommandWithPoliciesSignature<(typeof COMMANDS)[P], RESP, TYPE_MAPPING, POLICIES>
|
|
type WithCommands<
|
|
RESP extends RespVersions,
|
|
TYPE_MAPPING extends TypeMapping
|
|
> = {
|
|
[P in keyof typeof COMMANDS as ClusterCommand<P, (typeof COMMANDS)[P]>]: CommandSignature<(typeof COMMANDS)[P], RESP, TYPE_MAPPING>;
|
|
};
|
|
|
|
type WithModules<
|
|
M extends RedisModules,
|
|
RESP extends RespVersions,
|
|
TYPE_MAPPING extends TypeMapping
|
|
> = {
|
|
[P in keyof M]: {
|
|
[C in keyof M[P] as ClusterCommand<C, M[P][C]>]: CommandSignature<M[P][C], RESP, TYPE_MAPPING>;
|
|
};
|
|
};
|
|
|
|
type WithFunctions<
|
|
F extends RedisFunctions,
|
|
RESP extends RespVersions,
|
|
TYPE_MAPPING extends TypeMapping
|
|
> = {
|
|
[L in keyof F]: {
|
|
[C in keyof F[L] as ClusterCommand<C, F[L][C]>]: CommandSignature<F[L][C], RESP, TYPE_MAPPING>;
|
|
};
|
|
};
|
|
|
|
type WithScripts<
|
|
S extends RedisScripts,
|
|
RESP extends RespVersions,
|
|
TYPE_MAPPING extends TypeMapping
|
|
> = {
|
|
[P in keyof S as ClusterCommand<P, S[P]>]: CommandSignature<S[P], RESP, TYPE_MAPPING>;
|
|
};
|
|
|
|
export type RedisClusterType<
|
|
M extends RedisModules = {},
|
|
F extends RedisFunctions = {},
|
|
S extends RedisScripts = {},
|
|
RESP extends RespVersions = 2,
|
|
TYPE_MAPPING extends TypeMapping = {},
|
|
// POLICIES extends CommandPolicies = {}
|
|
> = (
|
|
RedisCluster<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/> &
|
|
WithCommands<RESP, TYPE_MAPPING> &
|
|
WithModules<M, RESP, TYPE_MAPPING> &
|
|
WithFunctions<F, RESP, TYPE_MAPPING> &
|
|
WithScripts<S, RESP, TYPE_MAPPING>
|
|
);
|
|
|
|
export interface ClusterCommandOptions<
|
|
TYPE_MAPPING extends TypeMapping = TypeMapping
|
|
// POLICIES extends CommandPolicies = CommandPolicies
|
|
> extends CommandOptions<TYPE_MAPPING> {
|
|
// policies?: POLICIES;
|
|
}
|
|
|
|
type ProxyCluster = RedisCluster<any, any, any, any, any/*, any*/>;
|
|
|
|
type NamespaceProxyCluster = { _self: ProxyCluster };
|
|
|
|
export default class RedisCluster<
|
|
M extends RedisModules,
|
|
F extends RedisFunctions,
|
|
S extends RedisScripts,
|
|
RESP extends RespVersions,
|
|
TYPE_MAPPING extends TypeMapping,
|
|
// POLICIES extends CommandPolicies
|
|
> extends EventEmitter {
|
|
static #createCommand(command: Command, resp: RespVersions) {
|
|
const transformReply = getTransformReply(command, resp);
|
|
|
|
return async function (this: ProxyCluster, ...args: Array<unknown>) {
|
|
const parser = new BasicCommandParser();
|
|
command.parseCommand(parser, ...args);
|
|
|
|
return this._self._execute(
|
|
parser.firstKey,
|
|
command.IS_READ_ONLY,
|
|
this._commandOptions,
|
|
(client, opts) => client._executeCommand(command, parser, opts, transformReply)
|
|
);
|
|
};
|
|
}
|
|
|
|
static #createModuleCommand(command: Command, resp: RespVersions) {
|
|
const transformReply = getTransformReply(command, resp);
|
|
|
|
return async function (this: NamespaceProxyCluster, ...args: Array<unknown>) {
|
|
const parser = new BasicCommandParser();
|
|
command.parseCommand(parser, ...args);
|
|
|
|
return this._self._execute(
|
|
parser.firstKey,
|
|
command.IS_READ_ONLY,
|
|
this._self._commandOptions,
|
|
(client, opts) => client._executeCommand(command, parser, opts, transformReply)
|
|
);
|
|
};
|
|
}
|
|
|
|
static #createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) {
|
|
const prefix = functionArgumentsPrefix(name, fn);
|
|
const transformReply = getTransformReply(fn, resp);
|
|
|
|
return async function (this: NamespaceProxyCluster, ...args: Array<unknown>) {
|
|
const parser = new BasicCommandParser();
|
|
parser.push(...prefix);
|
|
fn.parseCommand(parser, ...args);
|
|
|
|
return this._self._execute(
|
|
parser.firstKey,
|
|
fn.IS_READ_ONLY,
|
|
this._self._commandOptions,
|
|
(client, opts) => client._executeCommand(fn, parser, opts, transformReply)
|
|
);
|
|
};
|
|
}
|
|
|
|
static #createScriptCommand(script: RedisScript, resp: RespVersions) {
|
|
const prefix = scriptArgumentsPrefix(script);
|
|
const transformReply = getTransformReply(script, resp);
|
|
|
|
return async function (this: ProxyCluster, ...args: Array<unknown>) {
|
|
const parser = new BasicCommandParser();
|
|
parser.push(...prefix);
|
|
script.parseCommand(parser, ...args);
|
|
|
|
return this._self._execute(
|
|
parser.firstKey,
|
|
script.IS_READ_ONLY,
|
|
this._commandOptions,
|
|
(client, opts) => client._executeScript(script, parser, opts, transformReply)
|
|
);
|
|
};
|
|
}
|
|
|
|
static #SingleEntryCache = new SingleEntryCache<any, any>();
|
|
|
|
static factory<
|
|
M extends RedisModules = {},
|
|
F extends RedisFunctions = {},
|
|
S extends RedisScripts = {},
|
|
RESP extends RespVersions = 2,
|
|
TYPE_MAPPING extends TypeMapping = {},
|
|
// POLICIES extends CommandPolicies = {}
|
|
>(config?: ClusterCommander<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>) {
|
|
|
|
let Cluster = RedisCluster.#SingleEntryCache.get(config);
|
|
if (!Cluster) {
|
|
Cluster = attachConfig({
|
|
BaseClass: RedisCluster,
|
|
commands: COMMANDS,
|
|
createCommand: RedisCluster.#createCommand,
|
|
createModuleCommand: RedisCluster.#createModuleCommand,
|
|
createFunctionCommand: RedisCluster.#createFunctionCommand,
|
|
createScriptCommand: RedisCluster.#createScriptCommand,
|
|
config
|
|
});
|
|
|
|
Cluster.prototype.Multi = RedisClusterMultiCommand.extend(config);
|
|
RedisCluster.#SingleEntryCache.set(config, Cluster);
|
|
}
|
|
|
|
return (options?: Omit<RedisClusterOptions, keyof Exclude<typeof config, undefined>>) => {
|
|
// returning a "proxy" to prevent the namespaces._self to leak between "proxies"
|
|
return Object.create(new Cluster(options)) as RedisClusterType<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>;
|
|
};
|
|
}
|
|
|
|
static create<
|
|
M extends RedisModules = {},
|
|
F extends RedisFunctions = {},
|
|
S extends RedisScripts = {},
|
|
RESP extends RespVersions = 2,
|
|
TYPE_MAPPING extends TypeMapping = {},
|
|
// POLICIES extends CommandPolicies = {}
|
|
>(options?: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>) {
|
|
return RedisCluster.factory(options)(options);
|
|
}
|
|
|
|
readonly _options: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>;
|
|
|
|
readonly _slots: RedisClusterSlots<M, F, S, RESP, TYPE_MAPPING>;
|
|
|
|
private _self = this;
|
|
private _commandOptions?: ClusterCommandOptions<TYPE_MAPPING/*, POLICIES*/>;
|
|
|
|
/**
|
|
* An array of the cluster slots, each slot contain its `master` and `replicas`.
|
|
* Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific node (master or replica).
|
|
*/
|
|
get slots() {
|
|
return this._self._slots.slots;
|
|
}
|
|
|
|
get clientSideCache() {
|
|
return this._self._slots.clientSideCache;
|
|
}
|
|
|
|
/**
|
|
* An array of the cluster masters.
|
|
* Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific master node.
|
|
*/
|
|
get masters() {
|
|
return this._self._slots.masters;
|
|
}
|
|
|
|
/**
|
|
* An array of the cluster replicas.
|
|
* Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific replica node.
|
|
*/
|
|
get replicas() {
|
|
return this._self._slots.replicas;
|
|
}
|
|
|
|
/**
|
|
* A map form a node address (`<host>:<port>`) to its shard, each shard contain its `master` and `replicas`.
|
|
* Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific node (master or replica).
|
|
*/
|
|
get nodeByAddress() {
|
|
return this._self._slots.nodeByAddress;
|
|
}
|
|
|
|
/**
|
|
* The current pub/sub node.
|
|
*/
|
|
get pubSubNode() {
|
|
return this._self._slots.pubSubNode;
|
|
}
|
|
|
|
get isOpen() {
|
|
return this._self._slots.isOpen;
|
|
}
|
|
|
|
constructor(options: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING/*, POLICIES*/>) {
|
|
super();
|
|
|
|
this._options = options;
|
|
this._slots = new RedisClusterSlots(options, this.emit.bind(this));
|
|
|
|
if (options?.commandOptions) {
|
|
this._commandOptions = options.commandOptions;
|
|
}
|
|
}
|
|
|
|
duplicate<
|
|
_M extends RedisModules = M,
|
|
_F extends RedisFunctions = F,
|
|
_S extends RedisScripts = S,
|
|
_RESP extends RespVersions = RESP,
|
|
_TYPE_MAPPING extends TypeMapping = TYPE_MAPPING
|
|
>(overrides?: Partial<RedisClusterOptions<_M, _F, _S, _RESP, _TYPE_MAPPING>>) {
|
|
return new (Object.getPrototypeOf(this).constructor)({
|
|
...this._self._options,
|
|
commandOptions: this._commandOptions,
|
|
...overrides
|
|
}) as RedisClusterType<_M, _F, _S, _RESP, _TYPE_MAPPING>;
|
|
}
|
|
|
|
async connect() {
|
|
await this._self._slots.connect();
|
|
return this as unknown as RedisClusterType<M, F, S, RESP, TYPE_MAPPING>;
|
|
}
|
|
|
|
withCommandOptions<
|
|
OPTIONS extends ClusterCommandOptions<TYPE_MAPPING/*, CommandPolicies*/>,
|
|
TYPE_MAPPING extends TypeMapping,
|
|
// POLICIES extends CommandPolicies
|
|
>(options: OPTIONS) {
|
|
const proxy = Object.create(this);
|
|
proxy._commandOptions = options;
|
|
return proxy as RedisClusterType<
|
|
M,
|
|
F,
|
|
S,
|
|
RESP,
|
|
TYPE_MAPPING extends TypeMapping ? TYPE_MAPPING : {}
|
|
// POLICIES extends CommandPolicies ? POLICIES : {}
|
|
>;
|
|
}
|
|
|
|
private _commandOptionsProxy<
|
|
K extends keyof ClusterCommandOptions,
|
|
V extends ClusterCommandOptions[K]
|
|
>(
|
|
key: K,
|
|
value: V
|
|
) {
|
|
const proxy = Object.create(this);
|
|
proxy._commandOptions = Object.create(this._commandOptions ?? null);
|
|
proxy._commandOptions[key] = value;
|
|
return proxy as RedisClusterType<
|
|
M,
|
|
F,
|
|
S,
|
|
RESP,
|
|
K extends 'typeMapping' ? V extends TypeMapping ? V : {} : TYPE_MAPPING
|
|
// K extends 'policies' ? V extends CommandPolicies ? V : {} : POLICIES
|
|
>;
|
|
}
|
|
|
|
/**
|
|
* Override the `typeMapping` command option
|
|
*/
|
|
withTypeMapping<TYPE_MAPPING extends TypeMapping>(typeMapping: TYPE_MAPPING) {
|
|
return this._commandOptionsProxy('typeMapping', typeMapping);
|
|
}
|
|
|
|
// /**
|
|
// * Override the `policies` command option
|
|
// * TODO
|
|
// */
|
|
// withPolicies<POLICIES extends CommandPolicies> (policies: POLICIES) {
|
|
// return this._commandOptionsProxy('policies', policies);
|
|
// }
|
|
|
|
_handleAsk<T>(
|
|
fn: (client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>, opts?: ClusterCommandOptions) => Promise<T>
|
|
) {
|
|
return async (client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>, options?: ClusterCommandOptions) => {
|
|
const chainId = Symbol("asking chain");
|
|
const opts = options ? {...options} : {};
|
|
opts.chainId = chainId;
|
|
|
|
|
|
|
|
const ret = await Promise.all(
|
|
[
|
|
client.sendCommand([ASKING_CMD], {chainId: chainId}),
|
|
fn(client, opts)
|
|
]
|
|
);
|
|
|
|
return ret[1];
|
|
};
|
|
}
|
|
|
|
async _execute<T>(
|
|
firstKey: RedisArgument | undefined,
|
|
isReadonly: boolean | undefined,
|
|
options: ClusterCommandOptions | undefined,
|
|
fn: (client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>, opts?: ClusterCommandOptions) => Promise<T>
|
|
): Promise<T> {
|
|
const maxCommandRedirections = this._options.maxCommandRedirections ?? 16;
|
|
let client = await this._slots.getClient(firstKey, isReadonly);
|
|
let i = 0;
|
|
|
|
let myFn = fn;
|
|
|
|
while (true) {
|
|
try {
|
|
return await myFn(client, options);
|
|
} catch (err) {
|
|
myFn = fn;
|
|
|
|
// TODO: error class
|
|
if (++i > maxCommandRedirections || !(err instanceof Error)) {
|
|
throw err;
|
|
}
|
|
|
|
if (err.message.startsWith('ASK')) {
|
|
const address = err.message.substring(err.message.lastIndexOf(' ') + 1);
|
|
let redirectTo = await this._slots.getMasterByAddress(address);
|
|
if (!redirectTo) {
|
|
await this._slots.rediscover(client);
|
|
redirectTo = await this._slots.getMasterByAddress(address);
|
|
}
|
|
|
|
if (!redirectTo) {
|
|
throw new Error(`Cannot find node ${address}`);
|
|
}
|
|
|
|
client = redirectTo;
|
|
myFn = this._handleAsk(fn);
|
|
continue;
|
|
}
|
|
|
|
if (err.message.startsWith('MOVED')) {
|
|
await this._slots.rediscover(client);
|
|
client = await this._slots.getClient(firstKey, isReadonly);
|
|
continue;
|
|
}
|
|
|
|
throw err;
|
|
}
|
|
}
|
|
}
|
|
|
|
async sendCommand<T = ReplyUnion>(
|
|
firstKey: RedisArgument | undefined,
|
|
isReadonly: boolean | undefined,
|
|
args: CommandArguments,
|
|
options?: ClusterCommandOptions,
|
|
// defaultPolicies?: CommandPolicies
|
|
): Promise<T> {
|
|
|
|
// Merge global options with local options
|
|
const opts = {
|
|
...this._self._commandOptions,
|
|
...options
|
|
}
|
|
return this._self._execute(
|
|
firstKey,
|
|
isReadonly,
|
|
opts,
|
|
(client, opts) => client.sendCommand(args, opts)
|
|
);
|
|
}
|
|
|
|
MULTI(routing?: RedisArgument) {
|
|
type Multi = new (...args: ConstructorParameters<typeof RedisClusterMultiCommand>) => RedisClusterMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING>;
|
|
return new ((this as any).Multi as Multi)(
|
|
async (firstKey, isReadonly, commands) => {
|
|
const client = await this._self._slots.getClient(firstKey, isReadonly);
|
|
return client._executeMulti(commands);
|
|
},
|
|
async (firstKey, isReadonly, commands) => {
|
|
const client = await this._self._slots.getClient(firstKey, isReadonly);
|
|
return client._executePipeline(commands);
|
|
},
|
|
routing,
|
|
this._commandOptions?.typeMapping
|
|
);
|
|
}
|
|
|
|
multi = this.MULTI;
|
|
|
|
async SUBSCRIBE<T extends boolean = false>(
|
|
channels: string | Array<string>,
|
|
listener: PubSubListener<T>,
|
|
bufferMode?: T
|
|
) {
|
|
return (await this._self._slots.getPubSubClient())
|
|
.SUBSCRIBE(channels, listener, bufferMode);
|
|
}
|
|
|
|
subscribe = this.SUBSCRIBE;
|
|
|
|
async UNSUBSCRIBE<T extends boolean = false>(
|
|
channels?: string | Array<string>,
|
|
listener?: PubSubListener<boolean>,
|
|
bufferMode?: T
|
|
) {
|
|
return this._self._slots.executeUnsubscribeCommand(client =>
|
|
client.UNSUBSCRIBE(channels, listener, bufferMode)
|
|
);
|
|
}
|
|
|
|
unsubscribe = this.UNSUBSCRIBE;
|
|
|
|
async PSUBSCRIBE<T extends boolean = false>(
|
|
patterns: string | Array<string>,
|
|
listener: PubSubListener<T>,
|
|
bufferMode?: T
|
|
) {
|
|
return (await this._self._slots.getPubSubClient())
|
|
.PSUBSCRIBE(patterns, listener, bufferMode);
|
|
}
|
|
|
|
pSubscribe = this.PSUBSCRIBE;
|
|
|
|
async PUNSUBSCRIBE<T extends boolean = false>(
|
|
patterns?: string | Array<string>,
|
|
listener?: PubSubListener<T>,
|
|
bufferMode?: T
|
|
) {
|
|
return this._self._slots.executeUnsubscribeCommand(client =>
|
|
client.PUNSUBSCRIBE(patterns, listener, bufferMode)
|
|
);
|
|
}
|
|
|
|
pUnsubscribe = this.PUNSUBSCRIBE;
|
|
|
|
async SSUBSCRIBE<T extends boolean = false>(
|
|
channels: string | Array<string>,
|
|
listener: PubSubListener<T>,
|
|
bufferMode?: T
|
|
) {
|
|
const maxCommandRedirections = this._self._options.maxCommandRedirections ?? 16,
|
|
firstChannel = Array.isArray(channels) ? channels[0] : channels;
|
|
let client = await this._self._slots.getShardedPubSubClient(firstChannel);
|
|
for (let i = 0; ; i++) {
|
|
try {
|
|
return await client.SSUBSCRIBE(channels, listener, bufferMode);
|
|
} catch (err) {
|
|
if (++i > maxCommandRedirections || !(err instanceof ErrorReply)) {
|
|
throw err;
|
|
}
|
|
|
|
if (err.message.startsWith('MOVED')) {
|
|
await this._self._slots.rediscover(client);
|
|
client = await this._self._slots.getShardedPubSubClient(firstChannel);
|
|
continue;
|
|
}
|
|
|
|
throw err;
|
|
}
|
|
}
|
|
}
|
|
|
|
sSubscribe = this.SSUBSCRIBE;
|
|
|
|
SUNSUBSCRIBE<T extends boolean = false>(
|
|
channels: string | Array<string>,
|
|
listener?: PubSubListener<T>,
|
|
bufferMode?: T
|
|
) {
|
|
return this._self._slots.executeShardedUnsubscribeCommand(
|
|
Array.isArray(channels) ? channels[0] : channels,
|
|
client => client.SUNSUBSCRIBE(channels, listener, bufferMode)
|
|
);
|
|
}
|
|
|
|
sUnsubscribe = this.SUNSUBSCRIBE;
|
|
|
|
/**
|
|
* @deprecated Use `close` instead.
|
|
*/
|
|
quit() {
|
|
return this._self._slots.quit();
|
|
}
|
|
|
|
/**
|
|
* @deprecated Use `destroy` instead.
|
|
*/
|
|
disconnect() {
|
|
return this._self._slots.disconnect();
|
|
}
|
|
|
|
close() {
|
|
this._self._slots.clientSideCache?.onPoolClose();
|
|
return this._self._slots.close();
|
|
}
|
|
|
|
destroy() {
|
|
this._self._slots.clientSideCache?.onPoolClose();
|
|
return this._self._slots.destroy();
|
|
}
|
|
|
|
nodeClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>) {
|
|
return this._self._slots.nodeClient(node);
|
|
}
|
|
|
|
/**
|
|
* Returns a random node from the cluster.
|
|
* Userful for running "forward" commands (like PUBLISH) on a random node.
|
|
*/
|
|
getRandomNode() {
|
|
return this._self._slots.getRandomNode();
|
|
}
|
|
|
|
/**
|
|
* Get a random node from a slot.
|
|
* Useful for running readonly commands on a slot.
|
|
*/
|
|
getSlotRandomNode(slot: number) {
|
|
return this._self._slots.getSlotRandomNode(slot);
|
|
}
|
|
|
|
/**
|
|
* @deprecated use `.masters` instead
|
|
* TODO
|
|
*/
|
|
getMasters() {
|
|
return this.masters;
|
|
}
|
|
|
|
/**
|
|
* @deprecated use `.slots[<SLOT>]` instead
|
|
* TODO
|
|
*/
|
|
getSlotMaster(slot: number) {
|
|
return this.slots[slot].master;
|
|
}
|
|
}
|