1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-07 13:22:56 +03:00

Sentinel Support (#2664)

* redis client socket changes needed for sentinel

* Sentinel Implementation [EXPERIMENTAL]

* add pooling

* improve typing with SENTINEL_ client members

* cleanup - remove unused comments / commented code

* small sendCommand change + revert change to tsconfig

* add more sentinel commands needed for testing.

* lots of fixups and a reasonable first pass test suite

* add a timer option to update topology in background

+ don't need both sentinel client and pubsubclient
+ nits

* format all the things

* more progress

* small cleanup

* try to group promises together to minimize the internal await points

* redo events, to keep a single topology event to listen on

* nits + readme

* add RedisSentinelFactory to provide lower level access to sentinel

* nit

* update

* add RedisSentinelClient/Type for leased clients
	returned by aquire()
	used by function passed to use()

* add self for private access + improve emitting

* nit

* nits

* improve testing

- improve steady state waiting between tests
- get masternode from client, not from sentinels themselves (not consistent and then client isn't changing as we expect
- provide extensive logging/tracing on test errors
	- provide a very low impact tracing mechanism withinthe code that only really impacts code when tracing is in use.

* ismall nit for typing

* bunch of changes

- harden testing
	- don't use sentinel[0] for debug error dump as could be downed by a test
	- increase time for sentinel down test to 30s (caused a long taking failover)
- add client-error even / don't pass throuh client errors as errors option for pubsub proxy
- when passing through cient errors as error events, dont pass the event, but the Error object, as only Error objects are supposed to be on 'error'
	-

* improve pub sub proxy.

save the refference to all channel/pattern listeners up front on creation, dont hve to fetch the object each time, as it doesn't change.

removes race condition between setting up the listener and the pub sub node going down and being recreated.

* wrap the passed through RedisClient error to make clear where its coming from.

* refactor sentinel object / factory tests apart

* harden tests a little bit more

* add pipeline test

* add scripts/function tests + fixups / cleanups to get them to work

* change to use redis-stack-server for redis nodes to enable module testing

* fix test, forgot to return in use function with module

* rename test

* improve tests to test with redis/sentinel nodes with and withput passwords

this tests that we are handling the nodeClientOptions and sentinelClientOptions correctly

* cleanup for RedisSentinel type generic typing in tests

* remove debugLog, just rely on traace mechanism

* added multi tests for script/function/modules

* don't emit errors on lease object, only on main object

* improve testing

* extract out common code to reduce duplication

* nit

* nits

* nit

* remove SENTINEL_... commands from main client, load them via module interface

* missed adding RedisSentinelModule to correct places in RedisSentinelFactory

* nits

* fix test logging on error

1) it takes a lot of time now, so needs larger timeout
2) docker logs can be large, so need to increase maxBuffer size so doesn't error (and break test clean up)

* invalidate watches when client reconnects

+ provide API for other wrapper clients to also create invalid watch states programatically.

Reasoning: if a user does a WATCH and then the client reconnects, the watch is no longer active, but if a user does a MULTI/EXEC after that, they wont know, and since the WATCH is no longer active, the request has no protection.

The API is needed for when a wrapper client (say sentinel, cluster) might close the underlying client and reopen a new one transparently to the user.  Just like in the reconnection case, this should result in an error, but its up to the wrapping client to provide the appropriate error

* remove WATCH and UNWATCH command files, fix WATCH and UNWATCH return type, some more cleanups

* missing file in last commit :P

* support for custom message in `WatchError`

* setDirtyWatch

* update watch docs

* fixes needed

* wip

* get functions/modules to work again

self -> _self change

* reuse leased client on pipelined commands.

though I realize this implementation, really only works after the first write command.

unsure this is worth it.

* test tweaks

* nit

* change how "sentinel" object client works, allow it to be reserved

no more semaphore type counting

* review

* fixes to get more tests to pass

* handle dirtyWatch and watchEpoch in reset and resetIfDirty

* "fix", but not correct, needs more work

* fix pubsub proxy

* remove timeout from steadyState function in test, caused problems

* improve restarting nodes

* fix pubsub proxy and test

---------

Co-authored-by: Leibale Eidelman <me@leibale.com>
This commit is contained in:
Shaya Potter
2024-02-05 16:48:33 +02:00
committed by GitHub
parent 0cd6915698
commit 8f3a276509
34 changed files with 6773 additions and 1524 deletions

View File

@@ -268,7 +268,7 @@ export default class RedisCommandsQueue {
}
getPubSubListeners(type: PubSubType) {
return this.#pubSub.getTypeListeners(type);
return this.#pubSub.listeners[type];
}
monitor(callback: MonitorCallback, options?: CommandOptions) {

View File

@@ -6,7 +6,7 @@ import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumen
import { ClientClosedError, ClientOfflineError, DisconnectsClientError, WatchError } from '../errors';
import { URL } from 'node:url';
import { TcpSocketConnectOpts } from 'node:net';
import { PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub';
import { PUBSUB_TYPE, PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub';
import { Command, CommandSignature, TypeMapping, CommanderConfig, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions, RedisArgument, ReplyWithTypeMapping, SimpleStringReply } from '../RESP/types';
import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command';
import { RedisMultiQueuedCommand } from '../multi-command';
@@ -14,6 +14,7 @@ import HELLO, { HelloOptions } from '../commands/HELLO';
import { ScanOptions, ScanCommonOptions } from '../commands/SCAN';
import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode';
import { RedisPoolOptions, RedisClientPool } from './pool';
import { RedisVariadicArgument, pushVariadicArguments } from '../commands/generic-transformers';
export interface RedisClientOptions<
M extends RedisModules = RedisModules,
@@ -279,6 +280,9 @@ export default class RedisClient<
#monitorCallback?: MonitorCallback<TYPE_MAPPING>;
private _self = this;
private _commandOptions?: CommandOptions<TYPE_MAPPING>;
#dirtyWatch?: string;
#epoch: number;
#watchEpoch?: number;
get options(): RedisClientOptions<M, F, S, RESP> | undefined {
return this._self.#options;
@@ -296,11 +300,20 @@ export default class RedisClient<
return this._self.#queue.isPubSubActive;
}
get isWatching() {
return this._self.#watchEpoch !== undefined;
}
setDirtyWatch(msg: string) {
this._self.#dirtyWatch = msg;
}
constructor(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>) {
super();
this.#options = this.#initiateOptions(options);
this.#queue = this.#initiateQueue();
this.#socket = this.#initiateSocket();
this.#epoch = 0;
}
#initiateOptions(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> | undefined {
@@ -440,6 +453,7 @@ export default class RedisClient<
})
.on('connect', () => this.emit('connect'))
.on('ready', () => {
this.#epoch++;
this.emit('ready');
this.#setPingTimer();
this.#maybeScheduleWrite();
@@ -596,7 +610,7 @@ export default class RedisClient<
select = this.SELECT;
#pubSubCommand(promise: Promise<void> | undefined) {
#pubSubCommand<T>(promise: Promise<T> | undefined) {
if (promise === undefined) return Promise.resolve();
this.#scheduleWrite();
@@ -610,7 +624,7 @@ export default class RedisClient<
): Promise<void> {
return this._self.#pubSubCommand(
this._self.#queue.subscribe(
PubSubType.CHANNELS,
PUBSUB_TYPE.CHANNELS,
channels,
listener,
bufferMode
@@ -627,7 +641,7 @@ export default class RedisClient<
): Promise<void> {
return this._self.#pubSubCommand(
this._self.#queue.unsubscribe(
PubSubType.CHANNELS,
PUBSUB_TYPE.CHANNELS,
channels,
listener,
bufferMode
@@ -635,7 +649,7 @@ export default class RedisClient<
);
}
unsubscribe = this.UNSUBSCRIBE
unsubscribe = this.UNSUBSCRIBE;
PSUBSCRIBE<T extends boolean = false>(
patterns: string | Array<string>,
@@ -644,7 +658,7 @@ export default class RedisClient<
): Promise<void> {
return this._self.#pubSubCommand(
this._self.#queue.subscribe(
PubSubType.PATTERNS,
PUBSUB_TYPE.PATTERNS,
patterns,
listener,
bufferMode
@@ -661,7 +675,7 @@ export default class RedisClient<
): Promise<void> {
return this._self.#pubSubCommand(
this._self.#queue.unsubscribe(
PubSubType.PATTERNS,
PUBSUB_TYPE.PATTERNS,
patterns,
listener,
bufferMode
@@ -678,7 +692,7 @@ export default class RedisClient<
): Promise<void> {
return this._self.#pubSubCommand(
this._self.#queue.subscribe(
PubSubType.SHARDED,
PUBSUB_TYPE.SHARDED,
channels,
listener,
bufferMode
@@ -695,7 +709,7 @@ export default class RedisClient<
): Promise<void> {
return this._self.#pubSubCommand(
this._self.#queue.unsubscribe(
PubSubType.SHARDED,
PUBSUB_TYPE.SHARDED,
channels,
listener,
bufferMode
@@ -705,6 +719,24 @@ export default class RedisClient<
sUnsubscribe = this.SUNSUBSCRIBE;
async WATCH(key: RedisVariadicArgument) {
const reply = await this._self.sendCommand(
pushVariadicArguments(['WATCH'], key)
);
this._self.#watchEpoch ??= this._self.#epoch;
return reply as unknown as ReplyWithTypeMapping<SimpleStringReply<'OK'>, TYPE_MAPPING>;
}
watch = this.WATCH;
async UNWATCH() {
const reply = await this._self.sendCommand(['UNWATCH']);
this._self.#watchEpoch = undefined;
return reply as unknown as ReplyWithTypeMapping<SimpleStringReply<'OK'>, TYPE_MAPPING>;
}
unwatch = this.UNWATCH;
getPubSubListeners(type: PubSubType) {
return this._self.#queue.getPubSubListeners(type);
}
@@ -781,10 +813,23 @@ export default class RedisClient<
commands: Array<RedisMultiQueuedCommand>,
selectedDB?: number
) {
const dirtyWatch = this._self.#dirtyWatch;
this._self.#dirtyWatch = undefined;
const watchEpoch = this._self.#watchEpoch;
this._self.#watchEpoch = undefined;
if (!this._self.#socket.isOpen) {
throw new ClientClosedError();
}
if (dirtyWatch) {
throw new WatchError(dirtyWatch);
}
if (watchEpoch && watchEpoch !== this._self.#epoch) {
throw new WatchError('Client reconnected after WATCH');
}
const typeMapping = this._commandOptions?.typeMapping,
chainId = Symbol('MULTI Chain'),
promises = [
@@ -910,6 +955,8 @@ export default class RedisClient<
await Promise.all(promises);
this._self.#selectedDB = selectedDB;
this._self.#monitorCallback = undefined;
this._self.#dirtyWatch = undefined;
this._self.#watchEpoch = undefined;
}
/**
@@ -934,6 +981,11 @@ export default class RedisClient<
shouldReset = true;
}
if (this._self.#dirtyWatch || this._self.#watchEpoch) {
console.warn('Returning a client with active WATCH');
shouldReset = true;
}
if (shouldReset) {
return this.reset();
}

View File

@@ -1,151 +1,151 @@
import { strict as assert } from 'node:assert';
import { PubSub, PubSubType } from './pub-sub';
import { PubSub, PUBSUB_TYPE } from './pub-sub';
describe('PubSub', () => {
const TYPE = PubSubType.CHANNELS,
CHANNEL = 'channel',
LISTENER = () => {};
const TYPE = PUBSUB_TYPE.CHANNELS,
CHANNEL = 'channel',
LISTENER = () => {};
describe('subscribe to new channel', () => {
function createAndSubscribe() {
const pubSub = new PubSub(),
command = pubSub.subscribe(TYPE, CHANNEL, LISTENER);
assert.equal(pubSub.isActive, true);
assert.ok(command);
assert.equal(command.channelsCounter, 1);
return {
pubSub,
command
};
}
describe('subscribe to new channel', () => {
function createAndSubscribe() {
const pubSub = new PubSub(),
command = pubSub.subscribe(TYPE, CHANNEL, LISTENER);
it('resolve', () => {
const { pubSub, command } = createAndSubscribe();
command.resolve();
assert.equal(pubSub.isActive, true);
assert.ok(command);
assert.equal(command.channelsCounter, 1);
assert.equal(pubSub.isActive, true);
});
return {
pubSub,
command
};
}
it('reject', () => {
const { pubSub, command } = createAndSubscribe();
assert.ok(command.reject);
command.reject();
it('resolve', () => {
const { pubSub, command } = createAndSubscribe();
assert.equal(pubSub.isActive, false);
});
command.resolve();
assert.equal(pubSub.isActive, true);
});
it('subscribe to already subscribed channel', () => {
it('reject', () => {
const { pubSub, command } = createAndSubscribe();
assert.ok(command.reject);
command.reject();
assert.equal(pubSub.isActive, false);
});
});
it('subscribe to already subscribed channel', () => {
const pubSub = new PubSub(),
firstSubscribe = pubSub.subscribe(TYPE, CHANNEL, LISTENER);
assert.ok(firstSubscribe);
const secondSubscribe = pubSub.subscribe(TYPE, CHANNEL, LISTENER);
assert.ok(secondSubscribe);
firstSubscribe.resolve();
assert.equal(
pubSub.subscribe(TYPE, CHANNEL, LISTENER),
undefined
);
});
it('unsubscribe all', () => {
const pubSub = new PubSub();
const subscribe = pubSub.subscribe(TYPE, CHANNEL, LISTENER);
assert.ok(subscribe);
subscribe.resolve();
assert.equal(pubSub.isActive, true);
const unsubscribe = pubSub.unsubscribe(TYPE);
assert.equal(pubSub.isActive, true);
assert.ok(unsubscribe);
unsubscribe.resolve();
assert.equal(pubSub.isActive, false);
});
describe('unsubscribe from channel', () => {
it('when not subscribed', () => {
const pubSub = new PubSub(),
unsubscribe = pubSub.unsubscribe(TYPE, CHANNEL);
assert.ok(unsubscribe);
unsubscribe.resolve();
assert.equal(pubSub.isActive, false);
});
it('when already subscribed', () => {
const pubSub = new PubSub(),
subscribe = pubSub.subscribe(TYPE, CHANNEL, LISTENER);
assert.ok(subscribe);
subscribe.resolve();
assert.equal(pubSub.isActive, true);
const unsubscribe = pubSub.unsubscribe(TYPE, CHANNEL);
assert.equal(pubSub.isActive, true);
assert.ok(unsubscribe);
unsubscribe.resolve();
assert.equal(pubSub.isActive, false);
});
});
describe('unsubscribe from listener', () => {
it('when it\'s the only listener', () => {
const pubSub = new PubSub(),
subscribe = pubSub.subscribe(TYPE, CHANNEL, LISTENER);
assert.ok(subscribe);
subscribe.resolve();
assert.equal(pubSub.isActive, true);
const unsubscribe = pubSub.unsubscribe(TYPE, CHANNEL, LISTENER);
assert.ok(unsubscribe);
unsubscribe.resolve();
assert.equal(pubSub.isActive, false);
});
it('when there are more listeners', () => {
const pubSub = new PubSub(),
subscribe = pubSub.subscribe(TYPE, CHANNEL, LISTENER);
assert.ok(subscribe);
subscribe.resolve();
assert.equal(pubSub.isActive, true);
assert.equal(
pubSub.subscribe(TYPE, CHANNEL, () => { }),
undefined
);
assert.equal(
pubSub.unsubscribe(TYPE, CHANNEL, LISTENER),
undefined
);
});
describe('non-existing listener', () => {
it('on subscribed channel', () => {
const pubSub = new PubSub(),
firstSubscribe = pubSub.subscribe(TYPE, CHANNEL, LISTENER);
assert.ok(firstSubscribe);
const secondSubscribe = pubSub.subscribe(TYPE, CHANNEL, LISTENER);
assert.ok(secondSubscribe);
firstSubscribe.resolve();
assert.equal(
pubSub.subscribe(TYPE, CHANNEL, LISTENER),
undefined
);
});
it('unsubscribe all', () => {
const pubSub = new PubSub();
const subscribe = pubSub.subscribe(TYPE, CHANNEL, LISTENER);
subscribe = pubSub.subscribe(TYPE, CHANNEL, LISTENER);
assert.ok(subscribe);
subscribe.resolve();
assert.equal(pubSub.isActive, true);
const unsubscribe = pubSub.unsubscribe(TYPE);
assert.equal(
pubSub.unsubscribe(TYPE, CHANNEL, () => { }),
undefined
);
assert.equal(pubSub.isActive, true);
assert.ok(unsubscribe);
unsubscribe.resolve();
});
it('on unsubscribed channel', () => {
const pubSub = new PubSub();
assert.ok(pubSub.unsubscribe(TYPE, CHANNEL, () => { }));
assert.equal(pubSub.isActive, false);
});
});
describe('unsubscribe from channel', () => {
it('when not subscribed', () => {
const pubSub = new PubSub(),
unsubscribe = pubSub.unsubscribe(TYPE, CHANNEL);
assert.ok(unsubscribe);
unsubscribe.resolve();
assert.equal(pubSub.isActive, false);
});
it('when already subscribed', () => {
const pubSub = new PubSub(),
subscribe = pubSub.subscribe(TYPE, CHANNEL, LISTENER);
assert.ok(subscribe);
subscribe.resolve();
assert.equal(pubSub.isActive, true);
const unsubscribe = pubSub.unsubscribe(TYPE, CHANNEL);
assert.equal(pubSub.isActive, true);
assert.ok(unsubscribe);
unsubscribe.resolve();
assert.equal(pubSub.isActive, false);
});
});
describe('unsubscribe from listener', () => {
it('when it\'s the only listener', () => {
const pubSub = new PubSub(),
subscribe = pubSub.subscribe(TYPE, CHANNEL, LISTENER);
assert.ok(subscribe);
subscribe.resolve();
assert.equal(pubSub.isActive, true);
const unsubscribe = pubSub.unsubscribe(TYPE, CHANNEL, LISTENER);
assert.ok(unsubscribe);
unsubscribe.resolve();
assert.equal(pubSub.isActive, false);
});
it('when there are more listeners', () => {
const pubSub = new PubSub(),
subscribe = pubSub.subscribe(TYPE, CHANNEL, LISTENER);
assert.ok(subscribe);
subscribe.resolve();
assert.equal(pubSub.isActive, true);
assert.equal(
pubSub.subscribe(TYPE, CHANNEL, () => {}),
undefined
);
assert.equal(
pubSub.unsubscribe(TYPE, CHANNEL, LISTENER),
undefined
);
});
describe('non-existing listener', () => {
it('on subscribed channel', () => {
const pubSub = new PubSub(),
subscribe = pubSub.subscribe(TYPE, CHANNEL, LISTENER);
assert.ok(subscribe);
subscribe.resolve();
assert.equal(pubSub.isActive, true);
assert.equal(
pubSub.unsubscribe(TYPE, CHANNEL, () => {}),
undefined
);
assert.equal(pubSub.isActive, true);
});
it('on unsubscribed channel', () => {
const pubSub = new PubSub();
assert.ok(pubSub.unsubscribe(TYPE, CHANNEL, () => {}));
assert.equal(pubSub.isActive, false);
});
});
});
});
});

View File

@@ -1,24 +1,28 @@
import { RedisArgument } from '../RESP/types';
import { CommandToWrite } from './commands-queue';
export enum PubSubType {
CHANNELS = 'CHANNELS',
PATTERNS = 'PATTERNS',
SHARDED = 'SHARDED'
}
export const PUBSUB_TYPE = {
CHANNELS: 'CHANNELS',
PATTERNS: 'PATTERNS',
SHARDED: 'SHARDED'
} as const;
export type PUBSUB_TYPE = typeof PUBSUB_TYPE;
export type PubSubType = PUBSUB_TYPE[keyof PUBSUB_TYPE];
const COMMANDS = {
[PubSubType.CHANNELS]: {
[PUBSUB_TYPE.CHANNELS]: {
subscribe: Buffer.from('subscribe'),
unsubscribe: Buffer.from('unsubscribe'),
message: Buffer.from('message')
},
[PubSubType.PATTERNS]: {
[PUBSUB_TYPE.PATTERNS]: {
subscribe: Buffer.from('psubscribe'),
unsubscribe: Buffer.from('punsubscribe'),
message: Buffer.from('pmessage')
},
[PubSubType.SHARDED]: {
[PUBSUB_TYPE.SHARDED]: {
subscribe: Buffer.from('ssubscribe'),
unsubscribe: Buffer.from('sunsubscribe'),
message: Buffer.from('smessage')
@@ -37,7 +41,7 @@ export interface ChannelListeners {
export type PubSubTypeListeners = Map<string, ChannelListeners>;
type Listeners = Record<PubSubType, PubSubTypeListeners>;
export type PubSubListeners = Record<PubSubType, PubSubTypeListeners>;
export type PubSubCommand = (
Required<Pick<CommandToWrite, 'args' | 'channelsCounter' | 'resolve'>> & {
@@ -48,16 +52,16 @@ export type PubSubCommand = (
export class PubSub {
static isStatusReply(reply: Array<Buffer>): boolean {
return (
COMMANDS[PubSubType.CHANNELS].subscribe.equals(reply[0]) ||
COMMANDS[PubSubType.CHANNELS].unsubscribe.equals(reply[0]) ||
COMMANDS[PubSubType.PATTERNS].subscribe.equals(reply[0]) ||
COMMANDS[PubSubType.PATTERNS].unsubscribe.equals(reply[0]) ||
COMMANDS[PubSubType.SHARDED].subscribe.equals(reply[0])
COMMANDS[PUBSUB_TYPE.CHANNELS].subscribe.equals(reply[0]) ||
COMMANDS[PUBSUB_TYPE.CHANNELS].unsubscribe.equals(reply[0]) ||
COMMANDS[PUBSUB_TYPE.PATTERNS].subscribe.equals(reply[0]) ||
COMMANDS[PUBSUB_TYPE.PATTERNS].unsubscribe.equals(reply[0]) ||
COMMANDS[PUBSUB_TYPE.SHARDED].subscribe.equals(reply[0])
);
}
static isShardedUnsubscribe(reply: Array<Buffer>): boolean {
return COMMANDS[PubSubType.SHARDED].unsubscribe.equals(reply[0]);
return COMMANDS[PUBSUB_TYPE.SHARDED].unsubscribe.equals(reply[0]);
}
static #channelsArray(channels: string | Array<string>) {
@@ -79,10 +83,10 @@ export class PubSub {
return this.#isActive;
}
#listeners: Listeners = {
[PubSubType.CHANNELS]: new Map(),
[PubSubType.PATTERNS]: new Map(),
[PubSubType.SHARDED]: new Map()
readonly listeners: PubSubListeners = {
[PUBSUB_TYPE.CHANNELS]: new Map(),
[PUBSUB_TYPE.PATTERNS]: new Map(),
[PUBSUB_TYPE.SHARDED]: new Map()
};
subscribe<T extends boolean>(
@@ -94,7 +98,7 @@ export class PubSub {
const args: Array<RedisArgument> = [COMMANDS[type].subscribe],
channelsArray = PubSub.#channelsArray(channels);
for (const channel of channelsArray) {
let channelListeners = this.#listeners[type].get(channel);
let channelListeners = this.listeners[type].get(channel);
if (!channelListeners || channelListeners.unsubscribing) {
args.push(channel);
}
@@ -104,7 +108,7 @@ export class PubSub {
// all channels are already subscribed, add listeners without issuing a command
for (const channel of channelsArray) {
PubSub.#listenersSet(
this.#listeners[type].get(channel)!,
this.listeners[type].get(channel)!,
returnBuffers
).add(listener);
}
@@ -119,14 +123,14 @@ export class PubSub {
resolve: () => {
this.#subscribing--;
for (const channel of channelsArray) {
let listeners = this.#listeners[type].get(channel);
let listeners = this.listeners[type].get(channel);
if (!listeners) {
listeners = {
unsubscribing: false,
buffers: new Set(),
strings: new Set()
};
this.#listeners[type].set(channel, listeners);
this.listeners[type].set(channel, listeners);
}
PubSub.#listenersSet(listeners, returnBuffers).add(listener);
@@ -167,9 +171,9 @@ export class PubSub {
channel: string,
listeners: ChannelListeners
) {
const existingListeners = this.#listeners[type].get(channel);
const existingListeners = this.listeners[type].get(channel);
if (!existingListeners) {
this.#listeners[type].set(channel, listeners);
this.listeners[type].set(channel, listeners);
return true;
}
@@ -213,7 +217,7 @@ export class PubSub {
listener?: PubSubListener<T>,
returnBuffers?: T
) {
const listeners = this.#listeners[type];
const listeners = this.listeners[type];
if (!channels) {
return this.#unsubscribeCommand(
[COMMANDS[type].unsubscribe],
@@ -306,9 +310,9 @@ export class PubSub {
#updateIsActive() {
this.#isActive = (
this.#listeners[PubSubType.CHANNELS].size !== 0 ||
this.#listeners[PubSubType.PATTERNS].size !== 0 ||
this.#listeners[PubSubType.SHARDED].size !== 0 ||
this.listeners[PUBSUB_TYPE.CHANNELS].size !== 0 ||
this.listeners[PUBSUB_TYPE.PATTERNS].size !== 0 ||
this.listeners[PUBSUB_TYPE.SHARDED].size !== 0 ||
this.#subscribing !== 0
);
}
@@ -320,7 +324,7 @@ export class PubSub {
resubscribe() {
const commands = [];
for (const [type, listeners] of Object.entries(this.#listeners)) {
for (const [type, listeners] of Object.entries(this.listeners)) {
if (!listeners.size) continue;
this.#isActive = true;
@@ -341,24 +345,24 @@ export class PubSub {
}
handleMessageReply(reply: Array<Buffer>): boolean {
if (COMMANDS[PubSubType.CHANNELS].message.equals(reply[0])) {
if (COMMANDS[PUBSUB_TYPE.CHANNELS].message.equals(reply[0])) {
this.#emitPubSubMessage(
PubSubType.CHANNELS,
PUBSUB_TYPE.CHANNELS,
reply[2],
reply[1]
);
return true;
} else if (COMMANDS[PubSubType.PATTERNS].message.equals(reply[0])) {
} else if (COMMANDS[PUBSUB_TYPE.PATTERNS].message.equals(reply[0])) {
this.#emitPubSubMessage(
PubSubType.PATTERNS,
PUBSUB_TYPE.PATTERNS,
reply[3],
reply[2],
reply[1]
);
return true;
} else if (COMMANDS[PubSubType.SHARDED].message.equals(reply[0])) {
} else if (COMMANDS[PUBSUB_TYPE.SHARDED].message.equals(reply[0])) {
this.#emitPubSubMessage(
PubSubType.SHARDED,
PUBSUB_TYPE.SHARDED,
reply[2],
reply[1]
);
@@ -369,8 +373,8 @@ export class PubSub {
}
removeShardedListeners(channel: string): ChannelListeners {
const listeners = this.#listeners[PubSubType.SHARDED].get(channel)!;
this.#listeners[PubSubType.SHARDED].delete(channel);
const listeners = this.listeners[PUBSUB_TYPE.SHARDED].get(channel)!;
this.listeners[PUBSUB_TYPE.SHARDED].delete(channel);
this.#updateIsActive();
return listeners;
}
@@ -382,7 +386,7 @@ export class PubSub {
pattern?: Buffer
): void {
const keyString = (pattern ?? channel).toString(),
listeners = this.#listeners[type].get(keyString);
listeners = this.listeners[type].get(keyString);
if (!listeners) return;
@@ -402,8 +406,4 @@ export class PubSub {
listener(messageString, channelString);
}
}
getTypeListeners(type: PubSubType): PubSubTypeListeners {
return this.#listeners[type];
}
}

View File

@@ -28,15 +28,15 @@ export interface RedisSocketCommonOptions {
reconnectStrategy?: false | number | ((retries: number, cause: Error) => false | Error | number);
}
type RedisNetSocketOptions = Partial<net.SocketConnectOpts> & {
export interface RedisNetConnectOpts extends Omit<Partial<net.TcpNetConnectOpts>, 'keepAlive'>, Partial<net.IpcNetConnectOpts>, RedisSocketCommonOptions {
tls?: false;
};
export interface RedisTlsSocketOptions extends tls.ConnectionOptions {
export interface RedisTlsSocketOptions extends Partial<tls.ConnectionOptions>, RedisSocketCommonOptions {
tls: true;
}
};
export type RedisSocketOptions = RedisSocketCommonOptions & (RedisNetSocketOptions | RedisTlsSocketOptions);
export type RedisSocketOptions = RedisNetConnectOpts | RedisTlsSocketOptions
interface CreateSocketReturn<T> {
connectEvent: string;

View File

@@ -2,7 +2,7 @@ import { RedisClusterClientOptions, RedisClusterOptions } from '.';
import { RootNodesUnavailableError } from '../errors';
import RedisClient, { RedisClientOptions, RedisClientType } from '../client';
import { EventEmitter } from 'node:stream';
import { ChannelListeners, PubSubType, PubSubTypeListeners } from '../client/pub-sub';
import { ChannelListeners, PUBSUB_TYPE, PubSubTypeListeners } from '../client/pub-sub';
import { RedisArgument, RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from '../RESP/types';
import calculateSlot from 'cluster-key-slot';
@@ -83,7 +83,7 @@ type PubSubNode<
);
type PubSubToResubscribe = Record<
PubSubType.CHANNELS | PubSubType.PATTERNS,
PUBSUB_TYPE['CHANNELS'] | PUBSUB_TYPE['PATTERNS'],
PubSubTypeListeners
>;
@@ -186,16 +186,16 @@ export default class RedisClusterSlots<
}
if (this.pubSubNode && !addressesInUse.has(this.pubSubNode.address)) {
const channelsListeners = this.pubSubNode.client.getPubSubListeners(PubSubType.CHANNELS),
patternsListeners = this.pubSubNode.client.getPubSubListeners(PubSubType.PATTERNS);
const channelsListeners = this.pubSubNode.client.getPubSubListeners(PUBSUB_TYPE.CHANNELS),
patternsListeners = this.pubSubNode.client.getPubSubListeners(PUBSUB_TYPE.PATTERNS);
this.pubSubNode.client.destroy();
if (channelsListeners.size || patternsListeners.size) {
promises.push(
this.#initiatePubSubClient({
[PubSubType.CHANNELS]: channelsListeners,
[PubSubType.PATTERNS]: patternsListeners
[PUBSUB_TYPE.CHANNELS]: channelsListeners,
[PUBSUB_TYPE.PATTERNS]: patternsListeners
})
);
}
@@ -526,8 +526,8 @@ export default class RedisClusterSlots<
.then(async client => {
if (toResubscribe) {
await Promise.all([
client.extendPubSubListeners(PubSubType.CHANNELS, toResubscribe[PubSubType.CHANNELS]),
client.extendPubSubListeners(PubSubType.PATTERNS, toResubscribe[PubSubType.PATTERNS])
client.extendPubSubListeners(PUBSUB_TYPE.CHANNELS, toResubscribe[PUBSUB_TYPE.CHANNELS]),
client.extendPubSubListeners(PUBSUB_TYPE.PATTERNS, toResubscribe[PUBSUB_TYPE.PATTERNS])
]);
}
@@ -568,7 +568,7 @@ export default class RedisClusterSlots<
await this.rediscover(client);
const redirectTo = await this.getShardedPubSubClient(channel);
await redirectTo.extendPubSubChannelListeners(
PubSubType.SHARDED,
PUBSUB_TYPE.SHARDED,
channel,
listeners
);

View File

@@ -1,19 +0,0 @@
import { strict as assert } from 'node:assert';
import testUtils, { GLOBAL } from '../test-utils';
import UNWATCH from './UNWATCH';
describe('UNWATCH', () => {
it('transformArguments', () => {
assert.deepEqual(
UNWATCH.transformArguments(),
['UNWATCH']
);
});
testUtils.testWithClient('client.unwatch', async client => {
assert.equal(
await client.unwatch(),
'OK'
);
}, GLOBAL.SERVERS.OPEN);
});

View File

@@ -1,10 +0,0 @@
import { SimpleStringReply, Command } from '../RESP/types';
export default {
FIRST_KEY_INDEX: undefined,
IS_READ_ONLY: true,
transformArguments() {
return ['UNWATCH'];
},
transformReply: undefined as unknown as () => SimpleStringReply
} as const satisfies Command;

View File

@@ -1,20 +0,0 @@
import { strict as assert } from 'node:assert';
import WATCH from './WATCH';
describe('WATCH', () => {
describe('transformArguments', () => {
it('string', () => {
assert.deepEqual(
WATCH.transformArguments('key'),
['WATCH', 'key']
);
});
it('array', () => {
assert.deepEqual(
WATCH.transformArguments(['1', '2']),
['WATCH', '1', '2']
);
});
});
});

View File

@@ -1,11 +0,0 @@
import { SimpleStringReply, Command } from '../RESP/types';
import { RedisVariadicArgument, pushVariadicArguments } from './generic-transformers';
export default {
FIRST_KEY_INDEX: undefined,
IS_READ_ONLY: true,
transformArguments(key: RedisVariadicArgument) {
return pushVariadicArguments(['WATCH'], key);
},
transformReply: undefined as unknown as () => SimpleStringReply
} as const satisfies Command;

View File

@@ -265,9 +265,7 @@ import TOUCH from './TOUCH';
import TTL from './TTL';
import TYPE from './TYPE';
import UNLINK from './UNLINK';
import UNWATCH from './UNWATCH';
import WAIT from './WAIT';
import WATCH from './WATCH';
import XACK from './XACK';
import XADD_NOMKSTREAM from './XADD_NOMKSTREAM';
import XADD from './XADD';
@@ -869,12 +867,8 @@ export default {
type: TYPE,
UNLINK,
unlink: UNLINK,
UNWATCH,
unwatch: UNWATCH,
WAIT,
wait: WAIT,
WATCH,
watch: WATCH,
XACK,
xAck: XACK,
XADD_NOMKSTREAM,

View File

@@ -5,8 +5,8 @@ export class AbortError extends Error {
}
export class WatchError extends Error {
constructor() {
super('One (or more) of the watched keys has been changed');
constructor(message = 'One (or more) of the watched keys has been changed') {
super(message);
}
}

View File

@@ -0,0 +1,12 @@
import { RedisArgument, MapReply, BlobStringReply, Command } from '../../RESP/types';
import { transformTuplesReply } from '../../commands/generic-transformers';
export default {
transformArguments(dbname: RedisArgument) {
return ['SENTINEL', 'MASTER', dbname];
},
transformReply: {
2: transformTuplesReply,
3: undefined as unknown as () => MapReply<BlobStringReply, BlobStringReply>
}
} as const satisfies Command;

View File

@@ -0,0 +1,8 @@
import { RedisArgument, SimpleStringReply, Command } from '../../RESP/types';
export default {
transformArguments(dbname: RedisArgument, host: RedisArgument, port: RedisArgument, quorum: RedisArgument) {
return ['SENTINEL', 'MONITOR', dbname, host, port, quorum];
},
transformReply: undefined as unknown as () => SimpleStringReply<'OK'>
} as const satisfies Command;

View File

@@ -0,0 +1,15 @@
import { RedisArgument, ArrayReply, BlobStringReply, MapReply, Command } from '../../RESP/types';
import { transformTuplesReply } from '../../commands/generic-transformers';
export default {
transformArguments(dbname: RedisArgument) {
return ['SENTINEL', 'REPLICAS', dbname];
},
transformReply: {
2: (reply: any) => {
const initial: Array<Record<string, BlobStringReply>> = [];
return reply.reduce((sentinels: Array<Record<string, BlobStringReply>>, x: any) => { sentinels.push(transformTuplesReply(x)); return sentinels }, initial);
},
3: undefined as unknown as () => ArrayReply<MapReply<BlobStringReply, BlobStringReply>>
}
} as const satisfies Command;

View File

@@ -0,0 +1,15 @@
import { RedisArgument, ArrayReply, MapReply, BlobStringReply, Command } from '../../RESP/types';
import { transformTuplesReply } from '../../commands/generic-transformers';
export default {
transformArguments(dbname: RedisArgument) {
return ['SENTINEL', 'SENTINELS', dbname];
},
transformReply: {
2: (reply: any) => {
const initial: Array<Record<string, BlobStringReply>> = [];
return reply.reduce((sentinels: Array<Record<string, BlobStringReply>>, x: any) => { sentinels.push(transformTuplesReply(x)); return sentinels }, initial);
},
3: undefined as unknown as () => ArrayReply<MapReply<BlobStringReply, BlobStringReply>>
}
} as const satisfies Command;

View File

@@ -0,0 +1,19 @@
import { RedisArgument, SimpleStringReply, Command } from '../../RESP/types';
export type SentinelSetOptions = Array<{
option: RedisArgument;
value: RedisArgument;
}>;
export default {
transformArguments(dbname: RedisArgument, options: SentinelSetOptions) {
const args = ['SENTINEL', 'SET', dbname];
for (const option of options) {
args.push(option.option, option.value);
}
return args;
},
transformReply: undefined as unknown as () => SimpleStringReply<'OK'>
} as const satisfies Command;

View File

@@ -0,0 +1,19 @@
import { RedisCommands } from '../../RESP/types';
import SENTINEL_MASTER from './SENTINEL_MASTER';
import SENTINEL_MONITOR from './SENTINEL_MONITOR';
import SENTINEL_REPLICAS from './SENTINEL_REPLICAS';
import SENTINEL_SENTINELS from './SENTINEL_SENTINELS';
import SENTINEL_SET from './SENTINEL_SET';
export default {
SENTINEL_SENTINELS,
sentinelSentinels: SENTINEL_SENTINELS,
SENTINEL_MASTER,
sentinelMaster: SENTINEL_MASTER,
SENTINEL_REPLICAS,
sentinelReplicas: SENTINEL_REPLICAS,
SENTINEL_MONITOR,
sentinelMonitor: SENTINEL_MONITOR,
SENTINEL_SET,
sentinelSet: SENTINEL_SET
} as const satisfies RedisCommands;

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,6 @@
import { RedisModules } from '@redis/client';
import sentinel from './commands';
export default {
sentinel
} as const satisfies RedisModules;

View File

@@ -0,0 +1,215 @@
import COMMANDS from '../commands';
import RedisMultiCommand, { MULTI_REPLY, MultiReply, MultiReplyType } from '../multi-command';
import { ReplyWithTypeMapping, CommandReply, Command, CommandArguments, CommanderConfig, RedisFunctions, RedisModules, RedisScripts, RespVersions, TransformReply, RedisScript, RedisFunction, TypeMapping } from '../RESP/types';
import { attachConfig, functionArgumentsPrefix, getTransformReply } from '../commander';
import { RedisSentinelType } from './types';
type CommandSignature<
REPLIES extends Array<unknown>,
C extends Command,
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> = (...args: Parameters<C['transformArguments']>) => RedisSentinelMultiCommandType<
[...REPLIES, ReplyWithTypeMapping<CommandReply<C, RESP>, TYPE_MAPPING>],
M,
F,
S,
RESP,
TYPE_MAPPING
>;
type WithCommands<
REPLIES extends Array<unknown>,
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> = {
[P in keyof typeof COMMANDS]: CommandSignature<REPLIES, (typeof COMMANDS)[P], M, F, S, RESP, TYPE_MAPPING>;
};
type WithModules<
REPLIES extends Array<unknown>,
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> = {
[P in keyof M]: {
[C in keyof M[P]]: CommandSignature<REPLIES, M[P][C], M, F, S, RESP, TYPE_MAPPING>;
};
};
type WithFunctions<
REPLIES extends Array<unknown>,
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> = {
[L in keyof F]: {
[C in keyof F[L]]: CommandSignature<REPLIES, F[L][C], M, F, S, RESP, TYPE_MAPPING>;
};
};
type WithScripts<
REPLIES extends Array<unknown>,
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> = {
[P in keyof S]: CommandSignature<REPLIES, S[P], M, F, S, RESP, TYPE_MAPPING>;
};
export type RedisSentinelMultiCommandType<
REPLIES extends Array<any>,
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> = (
RedisSentinelMultiCommand<REPLIES> &
WithCommands<REPLIES, M, F, S, RESP, TYPE_MAPPING> &
WithModules<REPLIES, M, F, S, RESP, TYPE_MAPPING> &
WithFunctions<REPLIES, M, F, S, RESP, TYPE_MAPPING> &
WithScripts<REPLIES, M, F, S, RESP, TYPE_MAPPING>
);
export default class RedisSentinelMultiCommand<REPLIES = []> {
private static _createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);
return function (this: RedisSentinelMultiCommand, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args);
return this.addCommand(
command.IS_READ_ONLY,
redisArgs,
transformReply
);
};
}
private static _createModuleCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);
return function (this: { _self: RedisSentinelMultiCommand }, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args);
return this._self.addCommand(
command.IS_READ_ONLY,
redisArgs,
transformReply
);
};
}
private static _createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) {
const prefix = functionArgumentsPrefix(name, fn),
transformReply = getTransformReply(fn, resp);
return function (this: { _self: RedisSentinelMultiCommand }, ...args: Array<unknown>) {
const fnArgs = fn.transformArguments(...args);
const redisArgs: CommandArguments = prefix.concat(fnArgs);
redisArgs.preserve = fnArgs.preserve;
return this._self.addCommand(
fn.IS_READ_ONLY,
redisArgs,
transformReply
);
};
}
private static _createScriptCommand(script: RedisScript, resp: RespVersions) {
const transformReply = getTransformReply(script, resp);
return function (this: RedisSentinelMultiCommand, ...args: Array<unknown>) {
const scriptArgs = script.transformArguments(...args);
this._setState(
script.IS_READ_ONLY
);
this._multi.addScript(
script,
scriptArgs,
transformReply
);
return this;
};
}
static extend<
M extends RedisModules = Record<string, never>,
F extends RedisFunctions = Record<string, never>,
S extends RedisScripts = Record<string, never>,
RESP extends RespVersions = 2
>(config?: CommanderConfig<M, F, S, RESP>) {
return attachConfig({
BaseClass: RedisSentinelMultiCommand,
commands: COMMANDS,
createCommand: RedisSentinelMultiCommand._createCommand,
createModuleCommand: RedisSentinelMultiCommand._createModuleCommand,
createFunctionCommand: RedisSentinelMultiCommand._createFunctionCommand,
createScriptCommand: RedisSentinelMultiCommand._createScriptCommand,
config
});
}
private readonly _multi = new RedisMultiCommand();
private readonly _sentinel: RedisSentinelType
private _isReadonly: boolean | undefined = true;
constructor(sentinel: RedisSentinelType) {
this._sentinel = sentinel;
}
private _setState(
isReadonly: boolean | undefined,
) {
this._isReadonly &&= isReadonly;
}
addCommand(
isReadonly: boolean | undefined,
args: CommandArguments,
transformReply?: TransformReply
) {
this._setState(isReadonly);
this._multi.addCommand(args, transformReply);
return this;
}
async exec<T extends MultiReply = MULTI_REPLY['GENERIC']>(execAsPipeline = false) {
if (execAsPipeline) return this.execAsPipeline<T>();
return this._multi.transformReplies(
await this._sentinel._executeMulti(
this._isReadonly,
this._multi.queue
)
) as MultiReplyType<T, REPLIES>;
}
EXEC = this.exec;
execTyped(execAsPipeline = false) {
return this.exec<MULTI_REPLY['TYPED']>(execAsPipeline);
}
async execAsPipeline<T extends MultiReply = MULTI_REPLY['GENERIC']>() {
if (this._multi.queue.length === 0) return [] as MultiReplyType<T, REPLIES>;
return this._multi.transformReplies(
await this._sentinel._executePipeline(
this._isReadonly,
this._multi.queue
)
) as MultiReplyType<T, REPLIES>;
}
execAsPipelineTyped() {
return this.execAsPipeline<MULTI_REPLY['TYPED']>();
}
}

View File

@@ -0,0 +1,213 @@
import EventEmitter from 'node:events';
import { RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping } from '../RESP/types';
import { RedisClientOptions } from '../client';
import { PUBSUB_TYPE, PubSubListener, PubSubTypeListeners } from '../client/pub-sub';
import { RedisNode } from './types';
import RedisClient from '../client';
type Client = RedisClient<
RedisModules,
RedisFunctions,
RedisScripts,
RespVersions,
TypeMapping
>;
type Subscriptions = Record<
PUBSUB_TYPE['CHANNELS'] | PUBSUB_TYPE['PATTERNS'],
PubSubTypeListeners
>;
type PubSubState = {
client: Client;
connectPromise: Promise<Client | undefined> | undefined;
};
type OnError = (err: unknown) => unknown;
export class PubSubProxy extends EventEmitter {
#clientOptions;
#onError;
#node?: RedisNode;
#state?: PubSubState;
#subscriptions?: Subscriptions;
constructor(clientOptions: RedisClientOptions, onError: OnError) {
super();
this.#clientOptions = clientOptions;
this.#onError = onError;
}
#createClient() {
if (this.#node === undefined) {
throw new Error("pubSubProxy: didn't define node to do pubsub against");
}
const options = { ...this.#clientOptions };
if (this.#clientOptions.socket) {
options.socket = { ...this.#clientOptions.socket };
} else {
options.socket = {};
}
options.socket.host = this.#node.host;
options.socket.port = this.#node.port;
return new RedisClient(options);
}
async #initiatePubSubClient(withSubscriptions = false) {
const client = this.#createClient()
.on('error', this.#onError);
const connectPromise = client.connect()
.then(async client => {
if (this.#state?.client !== client) {
// if pubsub was deactivated while connecting (`this.#pubSubClient === undefined`)
// or if the node changed (`this.#pubSubClient.client !== client`)
client.destroy();
return this.#state?.connectPromise;
}
if (withSubscriptions && this.#subscriptions) {
await Promise.all([
client.extendPubSubListeners(PUBSUB_TYPE.CHANNELS, this.#subscriptions[PUBSUB_TYPE.CHANNELS]),
client.extendPubSubListeners(PUBSUB_TYPE.PATTERNS, this.#subscriptions[PUBSUB_TYPE.PATTERNS])
]);
}
if (this.#state.client !== client) {
// if the node changed (`this.#pubSubClient.client !== client`)
client.destroy();
return this.#state?.connectPromise;
}
this.#state!.connectPromise = undefined;
return client;
})
.catch(err => {
this.#state = undefined;
throw err;
});
this.#state = {
client,
connectPromise
};
return connectPromise;
}
#getPubSubClient() {
if (!this.#state) return this.#initiatePubSubClient();
return (
this.#state.connectPromise ??
this.#state.client
);
}
async changeNode(node: RedisNode) {
this.#node = node;
if (!this.#state) return;
// if `connectPromise` is undefined, `this.#subscriptions` is already set
// and `this.#state.client` might not have the listeners set yet
if (this.#state.connectPromise === undefined) {
this.#subscriptions = {
[PUBSUB_TYPE.CHANNELS]: this.#state.client.getPubSubListeners(PUBSUB_TYPE.CHANNELS),
[PUBSUB_TYPE.PATTERNS]: this.#state.client.getPubSubListeners(PUBSUB_TYPE.PATTERNS)
};
this.#state.client.destroy();
}
await this.#initiatePubSubClient(true);
}
#executeCommand<T>(fn: (client: Client) => T) {
const client = this.#getPubSubClient();
if (client instanceof RedisClient) {
return fn(client);
}
return client.then(client => {
// if pubsub was deactivated while connecting
if (client === undefined) return;
return fn(client);
}).catch(err => {
if (this.#state?.client.isPubSubActive) {
this.#state.client.destroy();
this.#state = undefined;
}
throw err;
});
}
subscribe<T extends boolean = false>(
channels: string | Array<string>,
listener: PubSubListener<T>,
bufferMode?: T
) {
return this.#executeCommand(
client => client.SUBSCRIBE(channels, listener, bufferMode)
);
}
#unsubscribe<T>(fn: (client: Client) => Promise<T>) {
return this.#executeCommand(async client => {
const reply = await fn(client);
if (!client.isPubSubActive) {
client.destroy();
this.#state = undefined;
}
return reply;
});
}
async unsubscribe<T extends boolean = false>(
channels?: string | Array<string>,
listener?: PubSubListener<boolean>,
bufferMode?: T
) {
return this.#unsubscribe(client => client.UNSUBSCRIBE(channels, listener, bufferMode));
}
async pSubscribe<T extends boolean = false>(
patterns: string | Array<string>,
listener: PubSubListener<T>,
bufferMode?: T
) {
return this.#executeCommand(
client => client.PSUBSCRIBE(patterns, listener, bufferMode)
);
}
async pUnsubscribe<T extends boolean = false>(
patterns?: string | Array<string>,
listener?: PubSubListener<T>,
bufferMode?: T
) {
return this.#unsubscribe(client => client.PUNSUBSCRIBE(patterns, listener, bufferMode));
}
destroy() {
this.#subscriptions = undefined;
if (this.#state === undefined) return;
// `connectPromise` already handles the case of `this.#pubSubState = undefined`
if (!this.#state.connectPromise) {
this.#state.client.destroy();
}
this.#state = undefined;
}
}

View File

@@ -0,0 +1,605 @@
import { createConnection } from 'node:net';
import { setTimeout } from 'node:timers/promises';
import { once } from 'node:events';
import { promisify } from 'node:util';
import { exec } from 'node:child_process';
import { RedisSentinelOptions, RedisSentinelType } from './types';
import RedisClient from '../client';
import RedisSentinel from '.';
import { RedisArgument, RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from '../RESP/types';
const execAsync = promisify(exec);
import RedisSentinelModule from './module'
interface ErrorWithCode extends Error {
code: string;
}
async function isPortAvailable(port: number): Promise<boolean> {
var socket = undefined;
try {
socket = createConnection({ port });
await once(socket, 'connect');
} catch (err) {
if (err instanceof Error && (err as ErrorWithCode).code === 'ECONNREFUSED') {
return true;
}
} finally {
if (socket !== undefined) {
socket.end();
}
}
return false;
}
const portIterator = (async function* (): AsyncIterableIterator<number> {
for (let i = 6379; i < 65535; i++) {
if (await isPortAvailable(i)) {
yield i;
}
}
throw new Error('All ports are in use');
})();
export interface RedisServerDockerConfig {
image: string;
version: string;
}
export interface RedisServerDocker {
port: number;
dockerId: string;
}
abstract class DockerBase {
async spawnRedisServerDocker({ image, version }: RedisServerDockerConfig, serverArguments: Array<string>, environment?: string): Promise<RedisServerDocker> {
const port = (await portIterator.next()).value;
let cmdLine = `docker run --init -d --network host `;
if (environment !== undefined) {
cmdLine += `-e ${environment} `;
}
cmdLine += `${image}:${version} ${serverArguments.join(' ')}`;
cmdLine = cmdLine.replace('{port}', `--port ${port.toString()}`);
// console.log("spawnRedisServerDocker: cmdLine = " + cmdLine);
const { stdout, stderr } = await execAsync(cmdLine);
if (!stdout) {
throw new Error(`docker run error - ${stderr}`);
}
while (await isPortAvailable(port)) {
await setTimeout(50);
}
return {
port,
dockerId: stdout.trim()
};
}
async dockerRemove(dockerId: string): Promise<void> {
try {
await this.dockerStop(dockerId); ``
} catch (err) {
// its ok if stop failed, as we are just going to remove, will just be slower
console.log(`dockerStop failed in remove: ${err}`);
}
const { stderr } = await execAsync(`docker rm -f ${dockerId}`);
if (stderr) {
console.log("docker rm failed");
throw new Error(`docker rm error - ${stderr}`);
}
}
async dockerStop(dockerId: string): Promise<void> {
/* this is an optimization to get around slow docker stop times, but will fail if container is already stopped */
try {
await execAsync(`docker exec ${dockerId} /bin/bash -c "kill -SIGINT 1"`);
} catch (err) {
/* this will fail if container is already not running, can be ignored */
}
let ret = await execAsync(`docker stop ${dockerId}`);
if (ret.stderr) {
throw new Error(`docker stop error - ${ret.stderr}`);
}
}
async dockerStart(dockerId: string): Promise<void> {
const { stderr } = await execAsync(`docker start ${dockerId}`);
if (stderr) {
throw new Error(`docker start error - ${stderr}`);
}
}
}
export interface RedisSentinelConfig {
numberOfNodes?: number;
nodeDockerConfig?: RedisServerDockerConfig;
nodeServerArguments?: Array<string>
numberOfSentinels?: number;
sentinelDockerConfig?: RedisServerDockerConfig;
sentinelServerArgument?: Array<string>
sentinelName: string;
sentinelQuorum?: number;
password?: string;
}
type ArrayElement<ArrayType extends readonly unknown[]> =
ArrayType extends readonly (infer ElementType)[] ? ElementType : never;
export interface SentinelController {
getMaster(): Promise<string>;
getMasterPort(): Promise<number>;
getRandomNode(): string;
getRandonNonMasterNode(): Promise<string>;
getNodePort(id: string): number;
getAllNodesPort(): Array<number>;
getSentinelPort(id: string): number;
getAllSentinelsPort(): Array<number>;
getSetinel(i: number): string;
stopNode(id: string): Promise<void>;
restartNode(id: string): Promise<void>;
stopSentinel(id: string): Promise<void>;
restartSentinel(id: string): Promise<void>;
getSentinelClient(opts?: Partial<RedisSentinelOptions<{}, {}, {}, 2, {}>>): RedisSentinelType<{}, {}, {}, 2, {}>;
}
export class SentinelFramework extends DockerBase {
#nodeList: Awaited<ReturnType<SentinelFramework['spawnRedisSentinelNodes']>> = [];
/* port -> docker info/client */
#nodeMap: Map<string, ArrayElement<Awaited<ReturnType<SentinelFramework['spawnRedisSentinelNodes']>>>>;
#sentinelList: Awaited<ReturnType<SentinelFramework['spawnRedisSentinelSentinels']>> = [];
/* port -> docker info/client */
#sentinelMap: Map<string, ArrayElement<Awaited<ReturnType<SentinelFramework['spawnRedisSentinelSentinels']>>>>;
config: RedisSentinelConfig;
#spawned: boolean = false;
get spawned() {
return this.#spawned;
}
constructor(config: RedisSentinelConfig) {
super();
this.config = config;
this.#nodeMap = new Map<string, ArrayElement<Awaited<ReturnType<SentinelFramework['spawnRedisSentinelNodes']>>>>();
this.#sentinelMap = new Map<string, ArrayElement<Awaited<ReturnType<SentinelFramework['spawnRedisSentinelSentinels']>>>>();
}
getSentinelClient(opts?: Partial<RedisSentinelOptions<RedisModules,
RedisFunctions,
RedisScripts,
RespVersions,
TypeMapping>>, errors = true) {
if (opts?.sentinelRootNodes !== undefined) {
throw new Error("cannot specify sentinelRootNodes here");
}
if (opts?.name !== undefined) {
throw new Error("cannot specify sentinel db name here");
}
const options: RedisSentinelOptions<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping> = {
name: this.config.sentinelName,
sentinelRootNodes: this.#sentinelList.map((sentinel) => { return { host: '127.0.0.1', port: sentinel.docker.port } }),
passthroughClientErrorEvents: errors
}
if (this.config.password !== undefined) {
options.nodeClientOptions = {password: this.config.password};
options.sentinelClientOptions = {password: this.config.password};
}
if (opts) {
Object.assign(options, opts);
}
return RedisSentinel.create(options);
}
async spawnRedisSentinel() {
if (this.#spawned) {
return;
}
if (this.#nodeMap.size != 0 || this.#sentinelMap.size != 0) {
throw new Error("inconsistent state with partial setup");
}
this.#nodeList = await this.spawnRedisSentinelNodes();
this.#nodeList.map((value) => this.#nodeMap.set(value.docker.port.toString(), value));
this.#sentinelList = await this.spawnRedisSentinelSentinels();
this.#sentinelList.map((value) => this.#sentinelMap.set(value.docker.port.toString(), value));
this.#spawned = true;
}
async cleanup() {
if (!this.#spawned) {
return;
}
return Promise.all(
[...this.#nodeMap!.values(), ...this.#sentinelMap!.values()].map(
async ({ docker, client }) => {
if (client.isOpen) {
client.destroy();
}
this.dockerRemove(docker.dockerId);
}
)
).finally(async () => {
this.#spawned = false;
this.#nodeMap.clear();
this.#sentinelMap.clear();
});
}
protected async spawnRedisSentinelNodeDocker() {
const imageInfo: RedisServerDockerConfig = this.config.nodeDockerConfig ?? { image: "redis/redis-stack-server", version: "latest" };
const serverArguments: Array<string> = this.config.nodeServerArguments ?? [];
let environment;
if (this.config.password !== undefined) {
environment = `REDIS_ARGS="{port} --requirepass ${this.config.password}"`;
} else {
environment = 'REDIS_ARGS="{port}"';
}
const docker = await this.spawnRedisServerDocker(imageInfo, serverArguments, environment);
const client = await RedisClient.create({
password: this.config.password,
socket: {
port: docker.port
}
}).on("error", () => { }).connect();
return {
docker,
client
};
}
protected async spawnRedisSentinelNodes() {
const master = await this.spawnRedisSentinelNodeDocker();
const promises: Array<ReturnType<SentinelFramework['spawnRedisSentinelNodeDocker']>> = [];
for (let i = 0; i < (this.config.numberOfNodes ?? 0) - 1; i++) {
promises.push(
this.spawnRedisSentinelNodeDocker().then(async node => {
if (this.config.password !== undefined) {
await node.client.configSet({'masterauth': this.config.password})
}
await node.client.replicaOf('127.0.0.1', master.docker.port);
return node;
})
);
}
return [
master,
...await Promise.all(promises)
];
}
protected async spawnRedisSentinelSentinelDocker() {
const imageInfo: RedisServerDockerConfig = this.config.sentinelDockerConfig ?? { image: "redis", version: "latest" }
let serverArguments: Array<string>;
if (this.config.password === undefined) {
serverArguments = this.config.sentinelServerArgument ??
[
"/bin/bash",
"-c",
"\"touch /tmp/sentinel.conf ; /usr/local/bin/redis-sentinel /tmp/sentinel.conf {port} \""
];
} else {
serverArguments = this.config.sentinelServerArgument ??
[
"/bin/bash",
"-c",
`"touch /tmp/sentinel.conf ; /usr/local/bin/redis-sentinel /tmp/sentinel.conf {port} --requirepass ${this.config.password}"`
];
}
const docker = await this.spawnRedisServerDocker(imageInfo, serverArguments);
const client = await RedisClient.create({
modules: RedisSentinelModule,
password: this.config.password,
socket: {
port: docker.port
}
}).on("error", () => { }).connect();
return {
docker,
client
};
}
protected async spawnRedisSentinelSentinels() {
const quorum = this.config.sentinelQuorum?.toString() ?? "2";
const node = this.#nodeList[0];
const promises: Array<ReturnType<SentinelFramework['spawnRedisSentinelSentinelDocker']>> = [];
for (let i = 0; i < (this.config.numberOfSentinels ?? 3); i++) {
promises.push(
this.spawnRedisSentinelSentinelDocker().then(async sentinel => {
await sentinel.client.sentinel.sentinelMonitor(this.config.sentinelName, '127.0.0.1', node.docker.port.toString(), quorum);
const options: Array<{option: RedisArgument, value: RedisArgument}> = [];
options.push({ option: "down-after-milliseconds", value: "100" });
options.push({ option: "failover-timeout", value: "5000" });
if (this.config.password !== undefined) {
options.push({ option: "auth-pass", value: this.config.password });
}
await sentinel.client.sentinel.sentinelSet(this.config.sentinelName, options)
return sentinel;
})
);
}
return [
...await Promise.all(promises)
]
}
async getAllRunning() {
for (const port of this.getAllNodesPort()) {
let first = true;
while (await isPortAvailable(port)) {
if (!first) {
console.log(`problematic restart ${port}`);
await setTimeout(500);
} else {
first = false;
}
await this.restartNode(port.toString());
}
}
for (const port of this.getAllSentinelsPort()) {
let first = true;
while (await isPortAvailable(port)) {
if (!first) {
await setTimeout(500);
} else {
first = false;
}
await this.restartSentinel(port.toString());
}
}
}
async addSentinel() {
const quorum = this.config.sentinelQuorum?.toString() ?? "2";
const node = this.#nodeList[0];
const sentinel = await this.spawnRedisSentinelSentinelDocker();
await sentinel.client.sentinel.sentinelMonitor(this.config.sentinelName, '127.0.0.1', node.docker.port.toString(), quorum);
const options: Array<{option: RedisArgument, value: RedisArgument}> = [];
options.push({ option: "down-after-milliseconds", value: "100" });
options.push({ option: "failover-timeout", value: "5000" });
if (this.config.password !== undefined) {
options.push({ option: "auth-pass", value: this.config.password });
}
await sentinel.client.sentinel.sentinelSet(this.config.sentinelName, options);
this.#sentinelList.push(sentinel);
this.#sentinelMap.set(sentinel.docker.port.toString(), sentinel);
}
async addNode() {
const masterPort = await this.getMasterPort();
const newNode = await this.spawnRedisSentinelNodeDocker();
if (this.config.password !== undefined) {
await newNode.client.configSet({'masterauth': this.config.password})
}
await newNode.client.replicaOf('127.0.0.1', masterPort);
this.#nodeList.push(newNode);
this.#nodeMap.set(newNode.docker.port.toString(), newNode);
}
async getMaster(tracer?: Array<string>): Promise<string | undefined> {
for (const sentinel of this.#sentinelMap!.values()) {
let info;
try {
if (!sentinel.client.isReady) {
continue;
}
info = await sentinel.client.sentinel.sentinelMaster(this.config.sentinelName) as any;
if (tracer) {
tracer.push('getMaster: master data returned from sentinel');
tracer.push(JSON.stringify(info, undefined, '\t'))
}
} catch (err) {
console.log("getMaster: sentinelMaster call failed: " + err);
continue;
}
const master = this.#nodeMap.get(info.port);
if (master === undefined) {
throw new Error(`couldn't find master node for ${info.port}`);
}
if (tracer) {
tracer.push(`getMaster: master port is either ${info.port} or ${master.docker.port}`);
}
if (!master.client.isOpen) {
throw new Error(`Sentinel's expected master node (${info.port}) is now down`);
}
return info.port;
}
throw new Error("Couldn't get master");
}
async getMasterPort(tracer?: Array<string>): Promise<number> {
const data = await this.getMaster(tracer)
return this.#nodeMap.get(data!)!.docker.port;
}
getRandomNode() {
return this.#nodeList[Math.floor(Math.random() * this.#nodeList.length)].docker.port.toString();
}
async getRandonNonMasterNode(): Promise<string> {
const masterPort = await this.getMasterPort();
while (true) {
const node = this.#nodeList[Math.floor(Math.random() * this.#nodeList.length)];
if (node.docker.port != masterPort) {
return node.docker.port.toString();
}
}
}
async stopNode(id: string) {
// console.log(`stopping node ${id}`);
let node = this.#nodeMap.get(id);
if (node === undefined) {
throw new Error("unknown node: " + id);
}
if (node.client.isOpen) {
node.client.destroy();
}
return await this.dockerStop(node.docker.dockerId);
}
async restartNode(id: string) {
let node = this.#nodeMap.get(id);
if (node === undefined) {
throw new Error("unknown node: " + id);
}
await this.dockerStart(node.docker.dockerId);
if (!node.client.isOpen) {
node.client = await RedisClient.create({
password: this.config.password,
socket: {
port: node.docker.port
}
}).on("error", () => { }).connect();
}
}
async stopSentinel(id: string) {
let sentinel = this.#sentinelMap.get(id);
if (sentinel === undefined) {
throw new Error("unknown sentinel: " + id);
}
if (sentinel.client.isOpen) {
sentinel.client.destroy();
}
return await this.dockerStop(sentinel.docker.dockerId);
}
async restartSentinel(id: string) {
let sentinel = this.#sentinelMap.get(id);
if (sentinel === undefined) {
throw new Error("unknown sentinel: " + id);
}
await this.dockerStart(sentinel.docker.dockerId);
if (!sentinel.client.isOpen) {
sentinel.client = await RedisClient.create({
modules: RedisSentinelModule,
password: this.config.password,
socket: {
port: sentinel.docker.port
}
}).on("error", () => { }).connect();
}
}
getNodePort(id: string) {
let node = this.#nodeMap.get(id);
if (node === undefined) {
throw new Error("unknown node: " + id);
}
return node.docker.port;
}
getAllNodesPort() {
let ports: Array<number> = [];
for (const node of this.#nodeList) {
ports.push(node.docker.port);
}
return ports
}
getAllDockerIds() {
let ids = new Map<string, number>();
for (const node of this.#nodeList) {
ids.set(node.docker.dockerId, node.docker.port);
}
return ids;
}
getSentinelPort(id: string) {
let sentinel = this.#sentinelMap.get(id);
if (sentinel === undefined) {
throw new Error("unknown sentinel: " + id);
}
return sentinel.docker.port;
}
getAllSentinelsPort() {
let ports: Array<number> = [];
for (const sentinel of this.#sentinelList) {
ports.push(sentinel.docker.port);
}
return ports
}
getSetinel(i: number): string {
return this.#sentinelList[i].docker.port.toString();
}
sentinelSentinels() {
for (const sentinel of this.#sentinelList) {
if (sentinel.client.isReady) {
return sentinel.client.sentinel.sentinelSentinels(this.config.sentinelName);
}
}
}
sentinelMaster() {
for (const sentinel of this.#sentinelList) {
if (sentinel.client.isReady) {
return sentinel.client.sentinel.sentinelMaster(this.config.sentinelName);
}
}
}
sentinelReplicas() {
for (const sentinel of this.#sentinelList) {
if (sentinel.client.isReady) {
return sentinel.client.sentinel.sentinelReplicas(this.config.sentinelName);
}
}
}
}

View File

@@ -0,0 +1,174 @@
import { RedisClientOptions } from '../client';
import { CommandOptions } from '../client/commands-queue';
import { CommandSignature, CommanderConfig, RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from '../RESP/types';
import COMMANDS from '../commands';
import RedisSentinel, { RedisSentinelClient } from '.';
export interface RedisNode {
host: string;
port: number;
}
export interface RedisSentinelOptions<
M extends RedisModules = RedisModules,
F extends RedisFunctions = RedisFunctions,
S extends RedisScripts = RedisScripts,
RESP extends RespVersions = RespVersions,
TYPE_MAPPING extends TypeMapping = TypeMapping
> extends SentinelCommander<M, F, S, RESP, TYPE_MAPPING> {
/**
* The sentinel identifier for a particular database cluster
*/
name: string;
/**
* An array of root nodes that are part of the sentinel cluster, which will be used to get the topology. Each element in the array is a client configuration object. There is no need to specify every node in the cluster: 3 should be enough to reliably connect and obtain the sentinel configuration from the server
*/
sentinelRootNodes: Array<RedisNode>;
/**
* The maximum number of times a command will retry due to topology changes.
*/
maxCommandRediscovers?: number;
/**
* The configuration values for every node in the cluster. Use this for example when specifying an ACL user to connect with
*/
nodeClientOptions?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>;
/**
* The configuration values for every sentinel in the cluster. Use this for example when specifying an ACL user to connect with
*/
sentinelClientOptions?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>;
/**
* The number of clients connected to the master node
*/
masterPoolSize?: number;
/**
* The number of clients connected to each replica node.
* When greater than 0, the client will distribute the load by executing read-only commands (such as `GET`, `GEOSEARCH`, etc.) across all the cluster nodes.
*/
replicaPoolSize?: number;
/**
* TODO
*/
scanInterval?: number;
/**
* TODO
*/
passthroughClientErrorEvents?: boolean;
/**
* When `true`, one client will be reserved for the sentinel object.
* When `false`, the sentinel object will wait for the first available client from the pool.
*/
reserveClient?: boolean;
}
export interface SentinelCommander<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping,
// POLICIES extends CommandPolicies
> extends CommanderConfig<M, F, S, RESP> {
commandOptions?: CommandOptions<TYPE_MAPPING>;
}
export type RedisSentinelClientOptions = Omit<
RedisClientOptions,
keyof SentinelCommander<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping/*, CommandPolicies*/>
>;
type WithCommands<
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> = {
[P in keyof typeof COMMANDS]: CommandSignature<(typeof COMMANDS)[P], RESP, TYPE_MAPPING>;
};
type WithModules<
M extends RedisModules,
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> = {
[P in keyof M]: {
[C in keyof M[P]]: CommandSignature<M[P][C], RESP, TYPE_MAPPING>;
};
};
type WithFunctions<
F extends RedisFunctions,
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> = {
[L in keyof F]: {
[C in keyof F[L]]: CommandSignature<F[L][C], RESP, TYPE_MAPPING>;
};
};
type WithScripts<
S extends RedisScripts,
RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping
> = {
[P in keyof S]: CommandSignature<S[P], RESP, TYPE_MAPPING>;
};
export type RedisSentinelClientType<
M extends RedisModules = {},
F extends RedisFunctions = {},
S extends RedisScripts = {},
RESP extends RespVersions = 2,
TYPE_MAPPING extends TypeMapping = {},
> = (
RedisSentinelClient<M, F, S, RESP, TYPE_MAPPING> &
WithCommands<RESP, TYPE_MAPPING> &
WithModules<M, RESP, TYPE_MAPPING> &
WithFunctions<F, RESP, TYPE_MAPPING> &
WithScripts<S, RESP, TYPE_MAPPING>
);
export type RedisSentinelType<
M extends RedisModules = {},
F extends RedisFunctions = {},
S extends RedisScripts = {},
RESP extends RespVersions = 2,
TYPE_MAPPING extends TypeMapping = {},
// POLICIES extends CommandPolicies = {}
> = (
RedisSentinel<M, F, S, RESP, TYPE_MAPPING> &
WithCommands<RESP, TYPE_MAPPING> &
WithModules<M, RESP, TYPE_MAPPING> &
WithFunctions<F, RESP, TYPE_MAPPING> &
WithScripts<S, RESP, TYPE_MAPPING>
);
export interface SentinelCommandOptions<
TYPE_MAPPING extends TypeMapping = TypeMapping
> extends CommandOptions<TYPE_MAPPING> {}
export type ProxySentinel = RedisSentinel<any, any, any, any, any>;
export type ProxySentinelClient = RedisSentinelClient<any, any, any, any, any>;
export type NamespaceProxySentinel = { _self: ProxySentinel };
export type NamespaceProxySentinelClient = { _self: ProxySentinelClient };
export type NodeInfo = {
ip: any,
port: any,
flags: any,
};
export type RedisSentinelEvent = NodeChangeEvent | SizeChangeEvent;
export type NodeChangeEvent = {
type: "SENTINEL_CHANGE" | "MASTER_CHANGE" | "REPLICA_ADD" | "REPLICA_REMOVE";
node: RedisNode;
}
export type SizeChangeEvent = {
type: "SENTINE_LIST_CHANGE";
size: Number;
}
export type ClientErrorEvent = {
type: 'MASTER' | 'REPLICA' | 'SENTINEL' | 'PUBSUBPROXY';
node: RedisNode;
error: Error;
}

View File

@@ -0,0 +1,103 @@
import { Command, RedisFunction, RedisScript, RespVersions } from '../RESP/types';
import { RedisSocketOptions } from '../client/socket';
import { functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import { NamespaceProxySentinel, NamespaceProxySentinelClient, NodeInfo, ProxySentinel, ProxySentinelClient, RedisNode } from './types';
/* TODO: should use map interface, would need a transform reply probably? as resp2 is list form, which this depends on */
export function parseNode(node: NodeInfo): RedisNode | undefined{
if (node.flags.includes("s_down") || node.flags.includes("disconnected") || node.flags.includes("failover_in_progress")) {
return undefined;
}
return { host: node.ip, port: Number(node.port) };
}
export function createNodeList(nodes: Array<NodeInfo>) {
var nodeList: Array<RedisNode> = [];
for (const nodeData of nodes) {
const node = parseNode(nodeData)
if (node === undefined) {
continue;
}
nodeList.push(node);
}
return nodeList;
}
export function clientSocketToNode(socket: RedisSocketOptions): RedisNode {
return {
host: socket.host!,
port: socket.port!
}
}
export function createCommand<T extends ProxySentinel | ProxySentinelClient>(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);
return async function (this: T, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args),
reply = await this._self.sendCommand(
command.IS_READ_ONLY,
redisArgs,
this._self.commandOptions
);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
};
}
export function createFunctionCommand<T extends NamespaceProxySentinel | NamespaceProxySentinelClient>(name: string, fn: RedisFunction, resp: RespVersions) {
const prefix = functionArgumentsPrefix(name, fn),
transformReply = getTransformReply(fn, resp);
return async function (this: T, ...args: Array<unknown>) {
const fnArgs = fn.transformArguments(...args),
redisArgs = prefix.concat(fnArgs),
reply = await this._self._self.sendCommand(
fn.IS_READ_ONLY,
redisArgs,
this._self._self.commandOptions
);
return transformReply ?
transformReply(reply, fnArgs.preserve) :
reply;
}
};
export function createModuleCommand<T extends NamespaceProxySentinel | NamespaceProxySentinelClient>(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);
return async function (this: T, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args),
reply = await this._self._self.sendCommand(
command.IS_READ_ONLY,
redisArgs,
this._self._self.commandOptions
);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
}
};
export function createScriptCommand<T extends ProxySentinel | ProxySentinelClient>(script: RedisScript, resp: RespVersions) {
const prefix = scriptArgumentsPrefix(script),
transformReply = getTransformReply(script, resp);
return async function (this: T, ...args: Array<unknown>) {
const scriptArgs = script.transformArguments(...args),
redisArgs = prefix.concat(scriptArgs),
reply = await this._self.executeScript(
script,
script.IS_READ_ONLY,
redisArgs,
this._self.commandOptions
);
return transformReply ?
transformReply(reply, scriptArgs.preserve) :
reply;
};
}

View File

@@ -0,0 +1,24 @@
import { SinglyLinkedList } from '../client/linked-list';
export class WaitQueue<T> {
#list = new SinglyLinkedList<T>();
#queue = new SinglyLinkedList<(item: T) => unknown>();
push(value: T) {
const resolve = this.#queue.shift();
if (resolve !== undefined) {
resolve(value);
return;
}
this.#list.push(value);
}
shift() {
return this.#list.shift();
}
wait() {
return new Promise<T>(resolve => this.#queue.push(resolve));
}
}