1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-03 04:01:40 +03:00
Files
2025-07-07 11:37:08 +03:00

1578 lines
47 KiB
TypeScript

import { EventEmitter } from 'node:events';
import { CommandArguments, RedisFunctions, RedisModules, RedisScripts, ReplyUnion, RespVersions, TypeMapping } from '../RESP/types';
import RedisClient, { RedisClientOptions, RedisClientType } from '../client';
import { CommandOptions } from '../client/commands-queue';
import { attachConfig } from '../commander';
import COMMANDS from '../commands';
import { ClientErrorEvent, NamespaceProxySentinel, NamespaceProxySentinelClient, ProxySentinel, ProxySentinelClient, RedisNode, RedisSentinelClientType, RedisSentinelEvent, RedisSentinelOptions, RedisSentinelType, SentinelCommander } from './types';
import { clientSocketToNode, createCommand, createFunctionCommand, createModuleCommand, createNodeList, createScriptCommand, parseNode } from './utils';
import { RedisMultiQueuedCommand } from '../multi-command';
import RedisSentinelMultiCommand, { RedisSentinelMultiCommandType } from './multi-commands';
import { PubSubListener } from '../client/pub-sub';
import { PubSubProxy } from './pub-sub-proxy';
import { setTimeout } from 'node:timers/promises';
import RedisSentinelModule from './module'
import { RedisVariadicArgument } from '../commands/generic-transformers';
import { WaitQueue } from './wait-queue';
import { TcpNetConnectOpts } from 'node:net';
import { RedisTcpSocketOptions } from '../client/socket';
import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache';
interface ClientInfo {
id: number;
}
export class RedisSentinelClient<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> {
#clientInfo: ClientInfo | undefined;
#internal: RedisSentinelInternal<M, F, S, RESP, TYPE_MAPPING>;
readonly _self: RedisSentinelClient<M, F, S, RESP, TYPE_MAPPING>;
/**
* Indicates if the client connection is open
*
* @returns `true` if the client connection is open, `false` otherwise
*/
get isOpen() {
return this._self.#internal.isOpen;
}
/**
* Indicates if the client connection is ready to accept commands
*
* @returns `true` if the client connection is ready, `false` otherwise
*/
get isReady() {
return this._self.#internal.isReady;
}
/**
* Gets the command options configured for this client
*
* @returns The command options for this client or `undefined` if none were set
*/
get commandOptions() {
return this._self.#commandOptions;
}
#commandOptions?: CommandOptions<TYPE_MAPPING>;
constructor(
internal: RedisSentinelInternal<M, F, S, RESP, TYPE_MAPPING>,
clientInfo: ClientInfo,
commandOptions?: CommandOptions<TYPE_MAPPING>
) {
this._self = this;
this.#internal = internal;
this.#clientInfo = clientInfo;
this.#commandOptions = commandOptions;
}
static factory<
M extends RedisModules = {},
F extends RedisFunctions = {},
S extends RedisScripts = {},
RESP extends RespVersions = 2,
TYPE_MAPPING extends TypeMapping = {}
>(config?: SentinelCommander<M, F, S, RESP, TYPE_MAPPING>) {
const SentinelClient = attachConfig({
BaseClass: RedisSentinelClient,
commands: COMMANDS,
createCommand: createCommand<ProxySentinelClient>,
createModuleCommand: createModuleCommand<NamespaceProxySentinelClient>,
createFunctionCommand: createFunctionCommand<NamespaceProxySentinelClient>,
createScriptCommand: createScriptCommand<ProxySentinelClient>,
config
});
SentinelClient.prototype.Multi = RedisSentinelMultiCommand.extend(config);
return (
internal: RedisSentinelInternal<M, F, S, RESP, TYPE_MAPPING>,
clientInfo: ClientInfo,
commandOptions?: CommandOptions<TYPE_MAPPING>
) => {
// returning a "proxy" to prevent the namespaces._self to leak between "proxies"
return Object.create(new SentinelClient(internal, clientInfo, commandOptions)) as RedisSentinelClientType<M, F, S, RESP, TYPE_MAPPING>;
};
}
static create<
M extends RedisModules = {},
F extends RedisFunctions = {},
S extends RedisScripts = {},
RESP extends RespVersions = 2,
TYPE_MAPPING extends TypeMapping = {}
>(
options: RedisSentinelOptions<M, F, S, RESP, TYPE_MAPPING>,
internal: RedisSentinelInternal<M, F, S, RESP, TYPE_MAPPING>,
clientInfo: ClientInfo,
commandOptions?: CommandOptions<TYPE_MAPPING>,
) {
return RedisSentinelClient.factory(options)(internal, clientInfo, commandOptions);
}
withCommandOptions<
OPTIONS extends CommandOptions<TYPE_MAPPING>,
TYPE_MAPPING extends TypeMapping
>(options: OPTIONS) {
const proxy = Object.create(this);
proxy._commandOptions = options;
return proxy as RedisSentinelClientType<
M,
F,
S,
RESP,
TYPE_MAPPING extends TypeMapping ? TYPE_MAPPING : {}
>;
}
private _commandOptionsProxy<
K extends keyof CommandOptions,
V extends CommandOptions[K]
>(
key: K,
value: V
) {
const proxy = Object.create(this);
proxy._commandOptions = Object.create(this._self.#commandOptions ?? null);
proxy._commandOptions[key] = value;
return proxy as RedisSentinelClientType<
M,
F,
S,
RESP,
K extends 'typeMapping' ? V extends TypeMapping ? V : {} : TYPE_MAPPING
>;
}
/**
* Override the `typeMapping` command option
*/
withTypeMapping<TYPE_MAPPING extends TypeMapping>(typeMapping: TYPE_MAPPING) {
return this._commandOptionsProxy('typeMapping', typeMapping);
}
async _execute<T>(
isReadonly: boolean | undefined,
fn: (client: RedisClient<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>) => Promise<T>
): Promise<T> {
if (this._self.#clientInfo === undefined) {
throw new Error("Attempted execution on released RedisSentinelClient lease");
}
return await this._self.#internal.execute(fn, this._self.#clientInfo);
}
async sendCommand<T = ReplyUnion>(
isReadonly: boolean | undefined,
args: CommandArguments,
options?: CommandOptions,
): Promise<T> {
return this._execute(
isReadonly,
client => client.sendCommand(args, options)
);
}
/**
* @internal
*/
async _executePipeline(
isReadonly: boolean | undefined,
commands: Array<RedisMultiQueuedCommand>
) {
return this._execute(
isReadonly,
client => client._executePipeline(commands)
);
}
/**f
* @internal
*/
async _executeMulti(
isReadonly: boolean | undefined,
commands: Array<RedisMultiQueuedCommand>
) {
return this._execute(
isReadonly,
client => client._executeMulti(commands)
);
}
MULTI(): RedisSentinelMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING> {
return new (this as any).Multi(this);
}
multi = this.MULTI;
WATCH(key: RedisVariadicArgument) {
if (this._self.#clientInfo === undefined) {
throw new Error("Attempted execution on released RedisSentinelClient lease");
}
return this._execute(
false,
client => client.watch(key)
)
}
watch = this.WATCH;
UNWATCH() {
if (this._self.#clientInfo === undefined) {
throw new Error('Attempted execution on released RedisSentinelClient lease');
}
return this._execute(
false,
client => client.unwatch()
)
}
unwatch = this.UNWATCH;
/**
* Releases the client lease back to the pool
*
* After calling this method, the client instance should no longer be used as it
* will be returned to the client pool and may be given to other operations.
*
* @returns A promise that resolves when the client is ready to be reused, or undefined
* if the client was immediately ready
* @throws Error if the lease has already been released
*/
release() {
if (this._self.#clientInfo === undefined) {
throw new Error('RedisSentinelClient lease already released');
}
const result = this._self.#internal.releaseClientLease(this._self.#clientInfo);
this._self.#clientInfo = undefined;
return result;
}
}
export default class RedisSentinel<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> extends EventEmitter {
readonly _self: RedisSentinel<M, F, S, RESP, TYPE_MAPPING>;
#internal: RedisSentinelInternal<M, F, S, RESP, TYPE_MAPPING>;
#options: RedisSentinelOptions<M, F, S, RESP, TYPE_MAPPING>;
/**
* Indicates if the sentinel connection is open
*
* @returns `true` if the sentinel connection is open, `false` otherwise
*/
get isOpen() {
return this._self.#internal.isOpen;
}
/**
* Indicates if the sentinel connection is ready to accept commands
*
* @returns `true` if the sentinel connection is ready, `false` otherwise
*/
get isReady() {
return this._self.#internal.isReady;
}
get commandOptions() {
return this._self.#commandOptions;
}
#commandOptions?: CommandOptions<TYPE_MAPPING>;
#trace: (msg: string) => unknown = () => { };
#reservedClientInfo?: ClientInfo;
#masterClientCount = 0;
#masterClientInfo?: ClientInfo;
get clientSideCache() {
return this._self.#internal.clientSideCache;
}
constructor(options: RedisSentinelOptions<M, F, S, RESP, TYPE_MAPPING>) {
super();
this._self = this;
this.#options = options;
if (options.commandOptions) {
this.#commandOptions = options.commandOptions;
}
this.#internal = new RedisSentinelInternal<M, F, S, RESP, TYPE_MAPPING>(options);
this.#internal.on('error', err => this.emit('error', err));
/* pass through underling events */
/* TODO: perhaps make this a struct and one vent, instead of multiple events */
this.#internal.on('topology-change', (event: RedisSentinelEvent) => {
if (!this.emit('topology-change', event)) {
this._self.#trace(`RedisSentinel: re-emit for topology-change for ${event.type} event returned false`);
}
});
}
static factory<
M extends RedisModules = {},
F extends RedisFunctions = {},
S extends RedisScripts = {},
RESP extends RespVersions = 2,
TYPE_MAPPING extends TypeMapping = {}
>(config?: SentinelCommander<M, F, S, RESP, TYPE_MAPPING>) {
const Sentinel = attachConfig({
BaseClass: RedisSentinel,
commands: COMMANDS,
createCommand: createCommand<ProxySentinel>,
createModuleCommand: createModuleCommand<NamespaceProxySentinel>,
createFunctionCommand: createFunctionCommand<NamespaceProxySentinel>,
createScriptCommand: createScriptCommand<ProxySentinel>,
config
});
Sentinel.prototype.Multi = RedisSentinelMultiCommand.extend(config);
return (options: Omit<RedisSentinelOptions, keyof Exclude<typeof config, undefined>>) => {
// returning a "proxy" to prevent the namespaces.self to leak between "proxies"
return Object.create(new Sentinel(options)) as RedisSentinelType<M, F, S, RESP, TYPE_MAPPING>;
};
}
static create<
M extends RedisModules = {},
F extends RedisFunctions = {},
S extends RedisScripts = {},
RESP extends RespVersions = 2,
TYPE_MAPPING extends TypeMapping = {}
>(options: RedisSentinelOptions<M, F, S, RESP, TYPE_MAPPING>) {
return RedisSentinel.factory(options)(options);
}
withCommandOptions<
OPTIONS extends CommandOptions<TYPE_MAPPING>,
TYPE_MAPPING extends TypeMapping,
>(options: OPTIONS) {
const proxy = Object.create(this);
proxy._commandOptions = options;
return proxy as RedisSentinelType<
M,
F,
S,
RESP,
TYPE_MAPPING extends TypeMapping ? TYPE_MAPPING : {}
>;
}
private _commandOptionsProxy<
K extends keyof CommandOptions,
V extends CommandOptions[K]
>(
key: K,
value: V
) {
const proxy = Object.create(this);
// Create new commandOptions object with the inherited properties
proxy._self.#commandOptions = {
...(this._self.#commandOptions || {}),
[key]: value
};
return proxy as RedisSentinelType<
M,
F,
S,
RESP,
K extends 'typeMapping' ? V extends TypeMapping ? V : {} : TYPE_MAPPING
>;
}
/**
* Override the `typeMapping` command option
*/
withTypeMapping<TYPE_MAPPING extends TypeMapping>(typeMapping: TYPE_MAPPING) {
return this._commandOptionsProxy('typeMapping', typeMapping);
}
async connect() {
await this._self.#internal.connect();
if (this._self.#options.reserveClient) {
this._self.#reservedClientInfo = await this._self.#internal.getClientLease();
}
return this as unknown as RedisSentinelType<M, F, S, RESP, TYPE_MAPPING>;
}
async _execute<T>(
isReadonly: boolean | undefined,
fn: (client: RedisClient<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>) => Promise<T>
): Promise<T> {
let clientInfo: ClientInfo | undefined;
if (!isReadonly || !this._self.#internal.useReplicas) {
if (this._self.#reservedClientInfo) {
clientInfo = this._self.#reservedClientInfo;
} else {
this._self.#masterClientInfo ??= await this._self.#internal.getClientLease();
clientInfo = this._self.#masterClientInfo;
this._self.#masterClientCount++;
}
}
try {
return await this._self.#internal.execute(fn, clientInfo);
} finally {
if (
clientInfo !== undefined &&
clientInfo === this._self.#masterClientInfo &&
--this._self.#masterClientCount === 0
) {
const promise = this._self.#internal.releaseClientLease(clientInfo);
this._self.#masterClientInfo = undefined;
if (promise) await promise;
}
}
}
async use<T>(fn: (sentinelClient: RedisSentinelClientType<M, F, S, RESP, TYPE_MAPPING>) => Promise<T>) {
const clientInfo = await this._self.#internal.getClientLease();
try {
return await fn(
RedisSentinelClient.create(this._self.#options, this._self.#internal, clientInfo, this._self.#commandOptions)
);
} finally {
const promise = this._self.#internal.releaseClientLease(clientInfo);
if (promise) await promise;
}
}
async sendCommand<T = ReplyUnion>(
isReadonly: boolean | undefined,
args: CommandArguments,
options?: CommandOptions,
): Promise<T> {
return this._execute(
isReadonly,
client => client.sendCommand(args, options)
);
}
/**
* @internal
*/
async _executePipeline(
isReadonly: boolean | undefined,
commands: Array<RedisMultiQueuedCommand>
) {
return this._execute(
isReadonly,
client => client._executePipeline(commands)
);
}
/**f
* @internal
*/
async _executeMulti(
isReadonly: boolean | undefined,
commands: Array<RedisMultiQueuedCommand>
) {
return this._execute(
isReadonly,
client => client._executeMulti(commands)
);
}
MULTI(): RedisSentinelMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING> {
return new (this as any).Multi(this);
}
multi = this.MULTI;
async close() {
return this._self.#internal.close();
}
destroy() {
return this._self.#internal.destroy();
}
async SUBSCRIBE<T extends boolean = false>(
channels: string | Array<string>,
listener: PubSubListener<T>,
bufferMode?: T
) {
return this._self.#internal.subscribe(channels, listener, bufferMode);
}
subscribe = this.SUBSCRIBE;
async UNSUBSCRIBE<T extends boolean = false>(
channels?: string | Array<string>,
listener?: PubSubListener<boolean>,
bufferMode?: T
) {
return this._self.#internal.unsubscribe(channels, listener, bufferMode);
}
unsubscribe = this.UNSUBSCRIBE;
async PSUBSCRIBE<T extends boolean = false>(
patterns: string | Array<string>,
listener: PubSubListener<T>,
bufferMode?: T
) {
return this._self.#internal.pSubscribe(patterns, listener, bufferMode);
}
pSubscribe = this.PSUBSCRIBE;
async PUNSUBSCRIBE<T extends boolean = false>(
patterns?: string | Array<string>,
listener?: PubSubListener<T>,
bufferMode?: T
) {
return this._self.#internal.pUnsubscribe(patterns, listener, bufferMode);
}
pUnsubscribe = this.PUNSUBSCRIBE;
/**
* Acquires a master client lease for exclusive operations
*
* Used when multiple commands need to run on an exclusive client (for example, using `WATCH/MULTI/EXEC`).
* The returned client must be released after use with the `release()` method.
*
* @returns A promise that resolves to a Redis client connected to the master node
* @example
* ```javascript
* const clientLease = await sentinel.acquire();
*
* try {
* await clientLease.watch('key');
* const resp = await clientLease.multi()
* .get('key')
* .exec();
* } finally {
* clientLease.release();
* }
* ```
*/
async acquire(): Promise<RedisSentinelClientType<M, F, S, RESP, TYPE_MAPPING>> {
const clientInfo = await this._self.#internal.getClientLease();
return RedisSentinelClient.create(this._self.#options, this._self.#internal, clientInfo, this._self.#commandOptions);
}
getSentinelNode(): RedisNode | undefined {
return this._self.#internal.getSentinelNode();
}
getMasterNode(): RedisNode | undefined {
return this._self.#internal.getMasterNode();
}
getReplicaNodes(): Map<RedisNode, number> {
return this._self.#internal.getReplicaNodes();
}
setTracer(tracer?: Array<string>) {
if (tracer) {
this._self.#trace = (msg: string) => { tracer.push(msg) };
} else {
this._self.#trace = () => { };
}
this._self.#internal.setTracer(tracer);
}
}
class RedisSentinelInternal<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> extends EventEmitter {
#isOpen = false;
get isOpen() {
return this.#isOpen;
}
#isReady = false;
get isReady() {
return this.#isReady;
}
readonly #name: string;
readonly #nodeClientOptions: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING, RedisTcpSocketOptions>;
readonly #sentinelClientOptions: RedisClientOptions<typeof RedisSentinelModule, RedisFunctions, RedisScripts, RespVersions, TypeMapping, RedisTcpSocketOptions>;
readonly #scanInterval: number;
readonly #passthroughClientErrorEvents: boolean;
readonly #RESP?: RespVersions;
#anotherReset = false;
#configEpoch: number = 0;
#sentinelRootNodes: Array<RedisNode>;
#sentinelClient?: RedisClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>;
#masterClients: Array<RedisClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>> = [];
#masterClientQueue: WaitQueue<number>;
readonly #masterPoolSize: number;
#replicaClients: Array<RedisClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>> = [];
#replicaClientsIdx: number = 0;
readonly #replicaPoolSize: number;
get useReplicas() {
return this.#replicaPoolSize > 0;
}
#connectPromise?: Promise<void>;
#maxCommandRediscovers: number;
readonly #pubSubProxy: PubSubProxy;
#scanTimer?: NodeJS.Timeout
#destroy = false;
#trace: (msg: string) => unknown = () => { };
#clientSideCache?: PooledClientSideCacheProvider;
get clientSideCache() {
return this.#clientSideCache;
}
#validateOptions(options?: RedisSentinelOptions<M, F, S, RESP, TYPE_MAPPING>) {
if (options?.clientSideCache && options?.RESP !== 3) {
throw new Error('Client Side Caching is only supported with RESP3');
}
}
constructor(options: RedisSentinelOptions<M, F, S, RESP, TYPE_MAPPING>) {
super();
this.#validateOptions(options);
this.#name = options.name;
this.#RESP = options.RESP;
this.#sentinelRootNodes = Array.from(options.sentinelRootNodes);
this.#maxCommandRediscovers = options.maxCommandRediscovers ?? 16;
this.#masterPoolSize = options.masterPoolSize ?? 1;
this.#replicaPoolSize = options.replicaPoolSize ?? 0;
this.#scanInterval = options.scanInterval ?? 0;
this.#passthroughClientErrorEvents = options.passthroughClientErrorEvents ?? false;
this.#nodeClientOptions = (options.nodeClientOptions ? {...options.nodeClientOptions} : {}) as RedisClientOptions<M, F, S, RESP, TYPE_MAPPING, RedisTcpSocketOptions>;
if (this.#nodeClientOptions.url !== undefined) {
throw new Error("invalid nodeClientOptions for Sentinel");
}
if (options.clientSideCache) {
if (options.clientSideCache instanceof PooledClientSideCacheProvider) {
this.#clientSideCache = this.#nodeClientOptions.clientSideCache = options.clientSideCache;
} else {
const cscConfig = options.clientSideCache;
this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig);
// this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig);
}
}
this.#sentinelClientOptions = options.sentinelClientOptions ? Object.assign({} as RedisClientOptions<typeof RedisSentinelModule, F, S, RESP, TYPE_MAPPING, RedisTcpSocketOptions>, options.sentinelClientOptions) : {};
this.#sentinelClientOptions.modules = RedisSentinelModule;
if (this.#sentinelClientOptions.url !== undefined) {
throw new Error("invalid sentinelClientOptions for Sentinel");
}
this.#masterClientQueue = new WaitQueue();
for (let i = 0; i < this.#masterPoolSize; i++) {
this.#masterClientQueue.push(i);
}
/* persistent object for life of sentinel object */
this.#pubSubProxy = new PubSubProxy(
this.#nodeClientOptions,
err => this.emit('error', err)
);
}
#createClient(node: RedisNode, clientOptions: RedisClientOptions, reconnectStrategy?: undefined | false) {
return RedisClient.create({
//first take the globally set RESP
RESP: this.#RESP,
//then take the client options, which can in theory overwrite it
...clientOptions,
socket: {
...clientOptions.socket,
host: node.host,
port: node.port,
reconnectStrategy
}
});
}
/**
* Gets a client lease from the master client pool
*
* @returns A client info object or a promise that resolves to a client info object
* when a client becomes available
*/
getClientLease(): ClientInfo | Promise<ClientInfo> {
const id = this.#masterClientQueue.shift();
if (id !== undefined) {
return { id };
}
return this.#masterClientQueue.wait().then(id => ({ id }));
}
/**
* Releases a client lease back to the pool
*
* If the client was used for a transaction that might have left it in a dirty state,
* it will be reset before being returned to the pool.
*
* @param clientInfo The client info object representing the client to release
* @returns A promise that resolves when the client is ready to be reused, or undefined
* if the client was immediately ready or no longer exists
*/
releaseClientLease(clientInfo: ClientInfo) {
const client = this.#masterClients[clientInfo.id];
// client can be undefined if releasing in middle of a reconfigure
if (client !== undefined) {
const dirtyPromise = client.resetIfDirty();
if (dirtyPromise) {
return dirtyPromise
.then(() => this.#masterClientQueue.push(clientInfo.id));
}
}
this.#masterClientQueue.push(clientInfo.id);
}
async connect() {
if (this.#isOpen) {
throw new Error("already attempting to open")
}
try {
this.#isOpen = true;
this.#connectPromise = this.#connect();
await this.#connectPromise;
this.#isReady = true;
} finally {
this.#connectPromise = undefined;
if (this.#scanInterval > 0) {
this.#scanTimer = setInterval(this.#reset.bind(this), this.#scanInterval);
}
}
}
async #connect() {
let count = 0;
while (true) {
this.#trace("starting connect loop");
count+=1;
if (this.#destroy) {
this.#trace("in #connect and want to destroy")
return;
}
try {
this.#anotherReset = false;
await this.transform(this.analyze(await this.observe()));
if (this.#anotherReset) {
this.#trace("#connect: anotherReset is true, so continuing");
continue;
}
this.#trace("#connect: returning");
return;
} catch (e: any) {
this.#trace(`#connect: exception ${e.message}`);
if (!this.#isReady && count > this.#maxCommandRediscovers) {
throw e;
}
if (e.message !== 'no valid master node') {
console.log(e);
}
await setTimeout(1000);
} finally {
this.#trace("finished connect");
}
}
}
async execute<T>(
fn: (client: RedisClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>) => Promise<T>,
clientInfo?: ClientInfo
): Promise<T> {
let iter = 0;
while (true) {
if (this.#connectPromise !== undefined) {
await this.#connectPromise;
}
const client = this.#getClient(clientInfo);
if (!client.isReady) {
await this.#reset();
continue;
}
const sockOpts = client.options?.socket as TcpNetConnectOpts | undefined;
this.#trace("attemping to send command to " + sockOpts?.host + ":" + sockOpts?.port)
try {
/*
// force testing of READONLY errors
if (clientInfo !== undefined) {
if (Math.floor(Math.random() * 10) < 1) {
console.log("throwing READONLY error");
throw new Error("READONLY You can't write against a read only replica.");
}
}
*/
return await fn(client);
} catch (err) {
if (++iter > this.#maxCommandRediscovers || !(err instanceof Error)) {
throw err;
}
/*
rediscover and retry if doing a command against a "master"
a) READONLY error (topology has changed) but we haven't been notified yet via pubsub
b) client is "not ready" (disconnected), which means topology might have changed, but sentinel might not see it yet
*/
if (clientInfo !== undefined && (err.message.startsWith('READONLY') || !client.isReady)) {
await this.#reset();
continue;
}
throw err;
}
}
}
async #createPubSub(client: RedisClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>) {
/* Whenever sentinels or slaves get added, or when slave configuration changes, reconfigure */
await client.pSubscribe(['switch-master', '[-+]sdown', '+slave', '+sentinel', '[-+]odown', '+slave-reconf-done'], (message, channel) => {
this.#handlePubSubControlChannel(channel, message);
}, true);
return client;
}
async #handlePubSubControlChannel(channel: Buffer, message: Buffer) {
this.#trace("pubsub control channel message on " + channel);
this.#reset();
}
// if clientInfo is defined, it corresponds to a master client in the #masterClients array, otherwise loop around replicaClients
#getClient(clientInfo?: ClientInfo): RedisClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping> {
if (clientInfo !== undefined) {
return this.#masterClients[clientInfo.id];
}
if (this.#replicaClientsIdx >= this.#replicaClients.length) {
this.#replicaClientsIdx = 0;
}
if (this.#replicaClients.length == 0) {
throw new Error("no replicas available for read");
}
return this.#replicaClients[this.#replicaClientsIdx++];
}
async #reset() {
/* closing / don't reset */
if (this.#isReady == false || this.#destroy == true) {
return;
}
// already in #connect()
if (this.#connectPromise !== undefined) {
this.#anotherReset = true;
return await this.#connectPromise;
}
try {
this.#connectPromise = this.#connect();
return await this.#connectPromise;
} finally {
this.#trace("finished reconfgure");
this.#connectPromise = undefined;
}
}
async close() {
this.#destroy = true;
if (this.#connectPromise != undefined) {
await this.#connectPromise;
}
this.#isReady = false;
this.#clientSideCache?.onPoolClose();
if (this.#scanTimer) {
clearInterval(this.#scanTimer);
this.#scanTimer = undefined;
}
const promises = [];
if (this.#sentinelClient !== undefined) {
if (this.#sentinelClient.isOpen) {
promises.push(this.#sentinelClient.close());
}
this.#sentinelClient = undefined;
}
for (const client of this.#masterClients) {
if (client.isOpen) {
promises.push(client.close());
}
}
this.#masterClients = [];
for (const client of this.#replicaClients) {
if (client.isOpen) {
promises.push(client.close());
}
}
this.#replicaClients = [];
await Promise.all(promises);
this.#pubSubProxy.destroy();
this.#isOpen = false;
}
// destroy has to be async because its stopping others async events, timers and the like
// and shouldn't return until its finished.
async destroy() {
this.#destroy = true;
if (this.#connectPromise != undefined) {
await this.#connectPromise;
}
this.#isReady = false;
this.#clientSideCache?.onPoolClose();
if (this.#scanTimer) {
clearInterval(this.#scanTimer);
this.#scanTimer = undefined;
}
if (this.#sentinelClient !== undefined) {
if (this.#sentinelClient.isOpen) {
this.#sentinelClient.destroy();
}
this.#sentinelClient = undefined;
}
for (const client of this.#masterClients) {
if (client.isOpen) {
client.destroy();
}
}
this.#masterClients = [];
for (const client of this.#replicaClients) {
if (client.isOpen) {
client.destroy();
}
}
this.#replicaClients = [];
this.#pubSubProxy.destroy();
this.#isOpen = false
this.#destroy = false;
}
async subscribe<T extends boolean = false>(
channels: string | Array<string>,
listener: PubSubListener<T>,
bufferMode?: T
) {
return this.#pubSubProxy.subscribe(channels, listener, bufferMode);
}
async unsubscribe<T extends boolean = false>(
channels?: string | Array<string>,
listener?: PubSubListener<boolean>,
bufferMode?: T
) {
return this.#pubSubProxy.unsubscribe(channels, listener, bufferMode);
}
async pSubscribe<T extends boolean = false>(
patterns: string | Array<string>,
listener: PubSubListener<T>,
bufferMode?: T
) {
return this.#pubSubProxy.pSubscribe(patterns, listener, bufferMode);
}
async pUnsubscribe<T extends boolean = false>(
patterns?: string | Array<string>,
listener?: PubSubListener<T>,
bufferMode?: T
) {
return this.#pubSubProxy.pUnsubscribe(patterns, listener, bufferMode);
}
// observe/analyze/transform remediation functions
async observe() {
for (const node of this.#sentinelRootNodes) {
let client: RedisClientType<typeof RedisSentinelModule, {}, {}, RespVersions, {}> | undefined;
try {
this.#trace(`observe: trying to connect to sentinel: ${node.host}:${node.port}`)
client = this.#createClient(node, this.#sentinelClientOptions, false) as unknown as RedisClientType<typeof RedisSentinelModule, {}, {}, RespVersions, {}>;
client.on('error', (err) => this.emit('error', `obseve client error: ${err}`));
await client.connect();
this.#trace(`observe: connected to sentinel`)
const [sentinelData, masterData, replicaData] = await Promise.all([
client.sentinel.sentinelSentinels(this.#name),
client.sentinel.sentinelMaster(this.#name),
client.sentinel.sentinelReplicas(this.#name)
]);
this.#trace("observe: got all sentinel data");
const ret = {
sentinelConnected: node,
sentinelData: sentinelData,
masterData: masterData,
replicaData: replicaData,
currentMaster: this.getMasterNode(),
currentReplicas: this.getReplicaNodes(),
currentSentinel: this.getSentinelNode(),
replicaPoolSize: this.#replicaPoolSize,
useReplicas: this.useReplicas
}
return ret;
} catch (err) {
this.#trace(`observe: error ${err}`);
this.emit('error', err);
} finally {
if (client !== undefined && client.isOpen) {
this.#trace(`observe: destroying sentinel client`);
client.destroy();
}
}
}
this.#trace(`observe: none of the sentinels are available`);
throw new Error('None of the sentinels are available');
}
analyze(observed: Awaited<ReturnType<RedisSentinelInternal<M, F, S, RESP, TYPE_MAPPING>["observe"]>>) {
let master = parseNode(observed.masterData);
if (master === undefined) {
this.#trace(`analyze: no valid master node because ${observed.masterData.flags}`);
throw new Error("no valid master node");
}
if (master.host === observed.currentMaster?.host && master.port === observed.currentMaster?.port) {
this.#trace(`analyze: master node hasn't changed from ${observed.currentMaster?.host}:${observed.currentMaster?.port}`);
master = undefined;
} else {
this.#trace(`analyze: master node has changed to ${master.host}:${master.port} from ${observed.currentMaster?.host}:${observed.currentMaster?.port}`);
}
let sentinel: RedisNode | undefined = observed.sentinelConnected;
if (sentinel.host === observed.currentSentinel?.host && sentinel.port === observed.currentSentinel.port) {
this.#trace(`analyze: sentinel node hasn't changed`);
sentinel = undefined;
} else {
this.#trace(`analyze: sentinel node has changed to ${sentinel.host}:${sentinel.port}`);
}
const replicasToClose: Array<RedisNode> = [];
const replicasToOpen = new Map<RedisNode, number>();
const desiredSet = new Set<string>();
const seen = new Set<string>();
if (observed.useReplicas) {
const replicaList = createNodeList(observed.replicaData)
for (const node of replicaList) {
desiredSet.add(JSON.stringify(node));
}
for (const [node, value] of observed.currentReplicas) {
if (!desiredSet.has(JSON.stringify(node))) {
replicasToClose.push(node);
this.#trace(`analyze: adding ${node.host}:${node.port} to replicsToClose`);
} else {
seen.add(JSON.stringify(node));
if (value != observed.replicaPoolSize) {
replicasToOpen.set(node, observed.replicaPoolSize - value);
this.#trace(`analyze: adding ${node.host}:${node.port} to replicsToOpen`);
}
}
}
for (const node of replicaList) {
if (!seen.has(JSON.stringify(node))) {
replicasToOpen.set(node, observed.replicaPoolSize);
this.#trace(`analyze: adding ${node.host}:${node.port} to replicsToOpen`);
}
}
}
const ret = {
sentinelList: [observed.sentinelConnected].concat(createNodeList(observed.sentinelData)),
epoch: Number(observed.masterData['config-epoch']),
sentinelToOpen: sentinel,
masterToOpen: master,
replicasToClose: replicasToClose,
replicasToOpen: replicasToOpen,
};
return ret;
}
async transform(analyzed: ReturnType<RedisSentinelInternal<M, F, S, RESP, TYPE_MAPPING>["analyze"]>) {
this.#trace("transform: enter");
let promises: Array<Promise<any>> = [];
if (analyzed.sentinelToOpen) {
this.#trace(`transform: opening a new sentinel`);
if (this.#sentinelClient !== undefined && this.#sentinelClient.isOpen) {
this.#trace(`transform: destroying old sentinel as open`);
this.#sentinelClient.destroy()
this.#sentinelClient = undefined;
} else {
this.#trace(`transform: not destroying old sentinel as not open`);
}
this.#trace(`transform: creating new sentinel to ${analyzed.sentinelToOpen.host}:${analyzed.sentinelToOpen.port}`);
const node = analyzed.sentinelToOpen;
const client = this.#createClient(analyzed.sentinelToOpen, this.#sentinelClientOptions, false);
client.on('error', (err: Error) => {
if (this.#passthroughClientErrorEvents) {
this.emit('error', new Error(`Sentinel Client (${node.host}:${node.port}): ${err.message}`, { cause: err }));
}
const event: ClientErrorEvent = {
type: 'SENTINEL',
node: clientSocketToNode(client.options!.socket!),
error: err
};
this.emit('client-error', event);
this.#reset();
});
this.#sentinelClient = client;
this.#trace(`transform: adding sentinel client connect() to promise list`);
const promise = this.#sentinelClient.connect().then((client) => { return this.#createPubSub(client) });
promises.push(promise);
this.#trace(`created sentinel client to ${analyzed.sentinelToOpen.host}:${analyzed.sentinelToOpen.port}`);
const event: RedisSentinelEvent = {
type: "SENTINEL_CHANGE",
node: analyzed.sentinelToOpen
}
this.#trace(`transform: emiting topology-change event for sentinel_change`);
if (!this.emit('topology-change', event)) {
this.#trace(`transform: emit for topology-change for sentinel_change returned false`);
}
}
if (analyzed.masterToOpen) {
this.#trace(`transform: opening a new master`);
const masterPromises = [];
const masterWatches: Array<boolean> = [];
this.#trace(`transform: destroying old masters if open`);
for (const client of this.#masterClients) {
masterWatches.push(client.isWatching || client.isDirtyWatch);
if (client.isOpen) {
client.destroy()
}
}
this.#masterClients = [];
this.#trace(`transform: creating all master clients and adding connect promises`);
for (let i = 0; i < this.#masterPoolSize; i++) {
const node = analyzed.masterToOpen;
const client = this.#createClient(analyzed.masterToOpen, this.#nodeClientOptions);
client.on('error', (err: Error) => {
if (this.#passthroughClientErrorEvents) {
this.emit('error', new Error(`Master Client (${node.host}:${node.port}): ${err.message}`, { cause: err }));
}
const event: ClientErrorEvent = {
type: "MASTER",
node: clientSocketToNode(client.options!.socket!),
error: err
};
this.emit('client-error', event);
});
if (masterWatches[i]) {
client.setDirtyWatch("sentinel config changed in middle of a WATCH Transaction");
}
this.#masterClients.push(client);
masterPromises.push(client.connect());
this.#trace(`created master client to ${analyzed.masterToOpen.host}:${analyzed.masterToOpen.port}`);
}
this.#trace(`transform: adding promise to change #pubSubProxy node`);
masterPromises.push(this.#pubSubProxy.changeNode(analyzed.masterToOpen));
promises.push(...masterPromises);
const event: RedisSentinelEvent = {
type: "MASTER_CHANGE",
node: analyzed.masterToOpen
}
this.#trace(`transform: emiting topology-change event for master_change`);
if (!this.emit('topology-change', event)) {
this.#trace(`transform: emit for topology-change for master_change returned false`);
}
this.#configEpoch++;
}
const replicaCloseSet = new Set<string>();
for (const node of analyzed.replicasToClose) {
const str = JSON.stringify(node);
replicaCloseSet.add(str);
}
const newClientList: Array<RedisClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>> = [];
const removedSet = new Set<string>();
for (const replica of this.#replicaClients) {
const node = clientSocketToNode(replica.options!.socket!);
const str = JSON.stringify(node);
if (replicaCloseSet.has(str) || !replica.isOpen) {
if (replica.isOpen) {
const sockOpts = replica.options?.socket as TcpNetConnectOpts | undefined;
this.#trace(`destroying replica client to ${sockOpts?.host}:${sockOpts?.port}`);
replica.destroy()
}
if (!removedSet.has(str)) {
const event: RedisSentinelEvent = {
type: "REPLICA_REMOVE",
node: node
}
this.emit('topology-change', event);
removedSet.add(str);
}
} else {
newClientList.push(replica);
}
}
this.#replicaClients = newClientList;
if (analyzed.replicasToOpen.size != 0) {
for (const [node, size] of analyzed.replicasToOpen) {
for (let i = 0; i < size; i++) {
const client = this.#createClient(node, this.#nodeClientOptions);
client.on('error', (err: Error) => {
if (this.#passthroughClientErrorEvents) {
this.emit('error', new Error(`Replica Client (${node.host}:${node.port}): ${err.message}`, { cause: err }));
}
const event: ClientErrorEvent = {
type: "REPLICA",
node: clientSocketToNode(client.options!.socket!),
error: err
};
this.emit('client-error', event);
});
this.#replicaClients.push(client);
promises.push(client.connect());
this.#trace(`created replica client to ${node.host}:${node.port}`);
}
const event: RedisSentinelEvent = {
type: "REPLICA_ADD",
node: node
}
this.emit('topology-change', event);
}
}
if (analyzed.sentinelList.length != this.#sentinelRootNodes.length) {
this.#sentinelRootNodes = analyzed.sentinelList;
const event: RedisSentinelEvent = {
type: "SENTINE_LIST_CHANGE",
size: analyzed.sentinelList.length
}
this.emit('topology-change', event);
}
await Promise.all(promises);
this.#trace("transform: exit");
}
// introspection functions
getMasterNode(): RedisNode | undefined {
if (this.#masterClients.length == 0) {
return undefined;
}
for (const master of this.#masterClients) {
if (master.isReady) {
return clientSocketToNode(master.options!.socket!);
}
}
return undefined;
}
getSentinelNode(): RedisNode | undefined {
if (this.#sentinelClient === undefined) {
return undefined;
}
return clientSocketToNode(this.#sentinelClient.options!.socket!);
}
getReplicaNodes(): Map<RedisNode, number> {
const ret = new Map<RedisNode, number>();
const initialMap = new Map<string, number>();
for (const replica of this.#replicaClients) {
const node = clientSocketToNode(replica.options!.socket!);
const hash = JSON.stringify(node);
if (replica.isReady) {
initialMap.set(hash, (initialMap.get(hash) ?? 0) + 1);
} else {
if (!initialMap.has(hash)) {
initialMap.set(hash, 0);
}
}
}
for (const [key, value] of initialMap) {
ret.set(JSON.parse(key) as RedisNode, value);
}
return ret;
}
setTracer(tracer?: Array<string>) {
if (tracer) {
this.#trace = (msg: string) => { tracer.push(msg) };
} else {
// empty function is faster than testing if something is defined or not
this.#trace = () => { };
}
}
}
export class RedisSentinelFactory extends EventEmitter {
options: RedisSentinelOptions;
#sentinelRootNodes: Array<RedisNode>;
#replicaIdx: number = -1;
constructor(options: RedisSentinelOptions) {
super();
this.options = options;
this.#sentinelRootNodes = options.sentinelRootNodes;
}
async updateSentinelRootNodes() {
for (const node of this.#sentinelRootNodes) {
const client = RedisClient.create({
...this.options.sentinelClientOptions,
socket: {
...this.options.sentinelClientOptions?.socket,
host: node.host,
port: node.port,
reconnectStrategy: false
},
modules: RedisSentinelModule
}).on('error', (err) => this.emit(`updateSentinelRootNodes: ${err}`));
try {
await client.connect();
} catch {
if (client.isOpen) {
client.destroy();
}
continue;
}
try {
const sentinelData = await client.sentinel.sentinelSentinels(this.options.name);
this.#sentinelRootNodes = [node].concat(createNodeList(sentinelData));
return;
} finally {
client.destroy();
}
}
throw new Error("Couldn't connect to any sentinel node");
}
async getMasterNode() {
let connected = false;
for (const node of this.#sentinelRootNodes) {
const client = RedisClient.create({
...this.options.sentinelClientOptions,
socket: {
...this.options.sentinelClientOptions?.socket,
host: node.host,
port: node.port,
reconnectStrategy: false
},
modules: RedisSentinelModule
}).on('error', err => this.emit(`getMasterNode: ${err}`));
try {
await client.connect();
} catch {
if (client.isOpen) {
client.destroy();
}
continue;
}
connected = true;
try {
const masterData = await client.sentinel.sentinelMaster(this.options.name);
let master = parseNode(masterData);
if (master === undefined) {
continue;
}
return master;
} finally {
client.destroy();
}
}
if (connected) {
throw new Error("Master Node Not Enumerated");
}
throw new Error("couldn't connect to any sentinels");
}
async getMasterClient() {
const master = await this.getMasterNode();
return RedisClient.create({
...this.options.nodeClientOptions,
socket: {
...this.options.nodeClientOptions?.socket,
host: master.host,
port: master.port
}
});
}
async getReplicaNodes() {
let connected = false;
for (const node of this.#sentinelRootNodes) {
const client = RedisClient.create({
...this.options.sentinelClientOptions,
socket: {
...this.options.sentinelClientOptions?.socket,
host: node.host,
port: node.port,
reconnectStrategy: false
},
modules: RedisSentinelModule
}).on('error', err => this.emit(`getReplicaNodes: ${err}`));
try {
await client.connect();
} catch {
if (client.isOpen) {
client.destroy();
}
continue;
}
connected = true;
try {
const replicaData = await client.sentinel.sentinelReplicas(this.options.name);
const replicas = createNodeList(replicaData);
if (replicas.length == 0) {
continue;
}
return replicas;
} finally {
client.destroy();
}
}
if (connected) {
throw new Error("No Replicas Nodes Enumerated");
}
throw new Error("couldn't connect to any sentinels");
}
async getReplicaClient() {
const replicas = await this.getReplicaNodes();
if (replicas.length == 0) {
throw new Error("no available replicas");
}
this.#replicaIdx++;
if (this.#replicaIdx >= replicas.length) {
this.#replicaIdx = 0;
}
return RedisClient.create({
...this.options.nodeClientOptions,
socket: {
...this.options.nodeClientOptions?.socket,
host: replicas[this.#replicaIdx].host,
port: replicas[this.#replicaIdx].port
}
});
}
}