1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-07 13:22:56 +03:00

fix #2563 - add support for MONITOR

This commit is contained in:
Leibale
2023-12-05 10:02:07 -05:00
parent 520441704b
commit d8cb5de8b1
4 changed files with 177 additions and 44 deletions

View File

@@ -49,14 +49,18 @@ interface DecoderOptions {
}
export class Decoder {
private readonly _config;
onReply;
onErrorReply;
onPush;
getTypeMapping;
private _cursor = 0;
private _next;
constructor(config: DecoderOptions) {
this._config = config;
this.onReply = config.onReply;
this.onErrorReply = config.onErrorReply;
this.onPush = config.onPush;
this.getTypeMapping = config.getTypeMapping;
}
reset() {
@@ -99,102 +103,102 @@ export class Decoder {
private _decodeTypeValue(type, chunk) {
switch (type) {
case RESP_TYPES.NULL:
this._config.onReply(this._decodeNull());
this.onReply(this._decodeNull());
return false;
case RESP_TYPES.BOOLEAN:
return this._handleDecodedValue(
this._config.onReply,
this.onReply,
this._decodeBoolean(chunk)
);
case RESP_TYPES.NUMBER:
return this._handleDecodedValue(
this._config.onReply,
this.onReply,
this._decodeNumber(
this._config.getTypeMapping()[RESP_TYPES.NUMBER],
this.getTypeMapping()[RESP_TYPES.NUMBER],
chunk
)
);
case RESP_TYPES.BIG_NUMBER:
return this._handleDecodedValue(
this._config.onReply,
this.onReply,
this._decodeBigNumber(
this._config.getTypeMapping()[RESP_TYPES.BIG_NUMBER],
this.getTypeMapping()[RESP_TYPES.BIG_NUMBER],
chunk
)
);
case RESP_TYPES.DOUBLE:
return this._handleDecodedValue(
this._config.onReply,
this.onReply,
this._decodeDouble(
this._config.getTypeMapping()[RESP_TYPES.DOUBLE],
this.getTypeMapping()[RESP_TYPES.DOUBLE],
chunk
)
);
case RESP_TYPES.SIMPLE_STRING:
return this._handleDecodedValue(
this._config.onReply,
this.onReply,
this._decodeSimpleString(
this._config.getTypeMapping()[RESP_TYPES.SIMPLE_STRING],
this.getTypeMapping()[RESP_TYPES.SIMPLE_STRING],
chunk
)
);
case RESP_TYPES.BLOB_STRING:
return this._handleDecodedValue(
this._config.onReply,
this.onReply,
this._decodeBlobString(
this._config.getTypeMapping()[RESP_TYPES.BLOB_STRING],
this.getTypeMapping()[RESP_TYPES.BLOB_STRING],
chunk
)
);
case RESP_TYPES.VERBATIM_STRING:
return this._handleDecodedValue(
this._config.onReply,
this.onReply,
this._decodeVerbatimString(
this._config.getTypeMapping()[RESP_TYPES.VERBATIM_STRING],
this.getTypeMapping()[RESP_TYPES.VERBATIM_STRING],
chunk
)
);
case RESP_TYPES.SIMPLE_ERROR:
return this._handleDecodedValue(
this._config.onErrorReply,
this.onErrorReply,
this._decodeSimpleError(chunk)
);
case RESP_TYPES.BLOB_ERROR:
return this._handleDecodedValue(
this._config.onErrorReply,
this.onErrorReply,
this._decodeBlobError(chunk)
);
case RESP_TYPES.ARRAY:
return this._handleDecodedValue(
this._config.onReply,
this._decodeArray(this._config.getTypeMapping(), chunk)
this.onReply,
this._decodeArray(this.getTypeMapping(), chunk)
);
case RESP_TYPES.SET:
return this._handleDecodedValue(
this._config.onReply,
this._decodeSet(this._config.getTypeMapping(), chunk)
this.onReply,
this._decodeSet(this.getTypeMapping(), chunk)
);
case RESP_TYPES.MAP:
return this._handleDecodedValue(
this._config.onReply,
this._decodeMap(this._config.getTypeMapping(), chunk)
this.onReply,
this._decodeMap(this.getTypeMapping(), chunk)
);
case RESP_TYPES.PUSH:
return this._handleDecodedValue(
this._config.onPush,
this.onPush,
this._decodeArray(PUSH_TYPE_MAPPING, chunk)
);

View File

@@ -5,6 +5,7 @@ import { CommandArguments, TypeMapping, ReplyUnion, RespVersions } from '../RESP
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
import { AbortError, ErrorReply } from '../errors';
import { EventEmitter } from 'node:stream';
import { MonitorCallback } from '.';
export interface CommandOptions<T = TypeMapping> {
chainId?: symbol;
@@ -23,6 +24,7 @@ export interface CommandToWrite extends CommandWaitingForReply {
signal: AbortSignal;
listener: () => unknown;
};
resolveOnWrite?: boolean;
}
interface CommandWaitingForReply {
@@ -152,7 +154,32 @@ export default class RedisCommandsQueue {
});
}
addCommand<T>(args: CommandArguments, options?: CommandOptions): Promise<T> {
async monitor(callback: MonitorCallback, typeMapping: TypeMapping = {}, asap = false) {
await this.addCommand(
['MONITOR'],
{ asap },
true
);
const { onReply, getTypeMapping } = this.decoder;
this.decoder.onReply = callback;
this.decoder.getTypeMapping = () => typeMapping;
return () => new Promise<void>(async resolve => {
await this.addCommand(['RESET'], undefined, true);
this.decoder.onReply = (reply: string) => {
if (reply !== 'RESET') return callback(reply);
this.decoder.onReply = onReply;
this.decoder.getTypeMapping = getTypeMapping;
resolve();
};
});
}
addCommand<T>(
args: CommandArguments,
options?: CommandOptions,
resolveOnWrite?: boolean
): Promise<T> {
if (this._maxLength && this._toWrite.length + this._waitingForReply.length >= this._maxLength) {
return Promise.reject(new Error('The queue is full'));
} else if (options?.abortSignal?.aborted) {
@@ -164,10 +191,12 @@ export default class RedisCommandsQueue {
const value: CommandToWrite = {
args,
chainId: options?.chainId,
typeMapping: options?.typeMapping,
abort: undefined,
resolveOnWrite,
resolve,
reject,
abort: undefined
channelsCounter: undefined,
typeMapping: options?.typeMapping
};
const signal = options?.abortSignal;
@@ -245,16 +274,19 @@ export default class RedisCommandsQueue {
return new Promise<void>((resolve, reject) => {
this._toWrite.push({
args: command.args,
channelsCounter: command.channelsCounter,
typeMapping: PUSH_TYPE_MAPPING,
resolve: () => {
chainId: undefined,
abort: undefined,
resolveOnWrite: false,
resolve() {
command.resolve();
resolve();
},
reject: err => {
reject(err) {
command.reject?.();
reject(err);
}
},
channelsCounter: command.channelsCounter,
typeMapping: PUSH_TYPE_MAPPING
});
});
}
@@ -280,12 +312,18 @@ export default class RedisCommandsQueue {
toSend.abort = undefined;
}
// TODO reuse `toSend` or create new object?
(toSend as any).args = undefined;
(toSend as any).chainId = undefined;
if (toSend.resolveOnWrite) {
toSend.resolve();
} else {
// TODO reuse `toSend` or create new object?
(toSend as any).args = undefined;
this._chainInExecution = toSend.chainId;
toSend.chainId = undefined;
this._waitingForReply.push(toSend);
}
this._waitingForReply.push(toSend);
this._chainInExecution = toSend.chainId;
yield encoded;
toSend = this._toWrite.shift();
}

View File

@@ -5,7 +5,7 @@ import RedisClient, { RedisClientType } from '.';
// import { RedisCommandRawReply, RedisModules, RedisFunctions, RedisScripts } from '../commands';
import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, SocketClosedUnexpectedlyError, WatchError } from '../errors';
import { defineScript } from '../lua-script';
// import { spy } from 'sinon';
import { spy } from 'sinon';
import { once } from 'node:events';
// import { ClientKillFilters } from '../commands/CLIENT_KILL';
// import { promisify } from 'node:util';
@@ -741,4 +741,66 @@ describe('Client', () => {
// },
// disableClientSetup: true
// });
describe('MONITOR', () => {
testUtils.testWithClient('should be able to monitor commands', async client => {
const duplicate = await client.duplicate().connect(),
listener = spy(message => assert.equal(typeof message, 'string'));
await duplicate.monitor(listener);
try {
await Promise.all([
waitTillBeenCalled(listener),
client.ping()
]);
} finally {
duplicate.destroy();
}
}, GLOBAL.SERVERS.OPEN);
testUtils.testWithClient('should keep monitoring after reconnection', async client => {
const duplicate = await client.duplicate().connect(),
listener = spy(message => assert.equal(typeof message, 'string'));
await duplicate.monitor(listener);
try {
await Promise.all([
once(duplicate, 'error'),
client.clientKill({
filter: 'SKIPME',
skipMe: true
})
]);
await Promise.all([
waitTillBeenCalled(listener),
client.ping()
]);
} finally {
duplicate.destroy();
}
}, GLOBAL.SERVERS.OPEN);
testUtils.testWithClient('should be able to go back to "normal mode"', async client => {
const off = await client.monitor(() => {});
await off();
await assert.doesNotReject(client.ping());
}, GLOBAL.SERVERS.OPEN);
testUtils.testWithClient('should respect type mapping', async client => {
const duplicate = await client.duplicate().connect(),
listener = spy(message => assert.ok(message instanceof Buffer));
await duplicate.withTypeMapping({
[RESP_TYPES.SIMPLE_STRING]: Buffer
}).monitor(listener);
try {
await Promise.all([
waitTillBeenCalled(listener),
client.ping()
]);
} finally {
duplicate.destroy();
}
}, GLOBAL.SERVERS.OPEN);
});
});

View File

@@ -7,7 +7,7 @@ import { ClientClosedError, ClientOfflineError, DisconnectsClientError, WatchErr
import { URL } from 'node:url';
import { TcpSocketConnectOpts } from 'node:net';
import { PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub';
import { Command, CommandSignature, TypeMapping, CommanderConfig, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types';
import { Command, CommandSignature, TypeMapping, CommanderConfig, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions, RedisArgument, ReplyWithTypeMapping, SimpleStringReply } from '../RESP/types';
import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command';
import { RedisMultiQueuedCommand } from '../multi-command';
import HELLO, { HelloOptions } from '../commands/HELLO';
@@ -138,6 +138,8 @@ interface ScanIteratorOptions {
cursor?: RedisArgument;
}
export type MonitorCallback<TYPE_MAPPING extends TypeMapping = TypeMapping> = (reply: ReplyWithTypeMapping<SimpleStringReply, TYPE_MAPPING>) => unknown;
export default class RedisClient<
M extends RedisModules,
F extends RedisFunctions,
@@ -276,6 +278,7 @@ export default class RedisClient<
private readonly _socket: RedisSocket;
private readonly _queue: RedisCommandsQueue;
private _selectedDB = 0;
private _monitorCallback?: MonitorCallback<TYPE_MAPPING>;
private _commandOptions?: CommandOptions<TYPE_MAPPING>;
get options(): RedisClientOptions<M, F, S, RESP> | undefined {
@@ -334,6 +337,16 @@ export default class RedisClient<
const socketInitiator = async (): Promise<void> => {
const promises = [];
if (this._monitorCallback) {
promises.push(
this._queue.monitor(
this._monitorCallback,
this._commandOptions?.typeMapping,
true
)
);
}
if (this._selectedDB !== 0) {
promises.push(
this._queue.addCommand(
@@ -855,6 +868,22 @@ export default class RedisClient<
} while (cursor !== '0');
}
async MONITOR(callback: MonitorCallback<TYPE_MAPPING>) {
const promise = this._queue.monitor(callback, this._commandOptions?.typeMapping);
this._scheduleWrite();
const off = await promise;
this._monitorCallback = callback;
return async () => {
const promise = off();
this._scheduleWrite();
await promise;
this._monitorCallback = undefined;
};
}
monitor = this.MONITOR;
/**
* @deprecated use .close instead
*/