From d8cb5de8b1268f8ec0c5df91d15c7d7f73374073 Mon Sep 17 00:00:00 2001 From: Leibale Date: Tue, 5 Dec 2023 10:02:07 -0500 Subject: [PATCH] fix #2563 - add support for MONITOR --- packages/client/lib/RESP/decoder.ts | 58 +++++++++-------- packages/client/lib/client/commands-queue.ts | 68 +++++++++++++++----- packages/client/lib/client/index.spec.ts | 64 +++++++++++++++++- packages/client/lib/client/index.ts | 31 ++++++++- 4 files changed, 177 insertions(+), 44 deletions(-) diff --git a/packages/client/lib/RESP/decoder.ts b/packages/client/lib/RESP/decoder.ts index 2fbadd0f5e..7e2a488587 100644 --- a/packages/client/lib/RESP/decoder.ts +++ b/packages/client/lib/RESP/decoder.ts @@ -49,14 +49,18 @@ interface DecoderOptions { } export class Decoder { - private readonly _config; - + onReply; + onErrorReply; + onPush; + getTypeMapping; private _cursor = 0; - private _next; constructor(config: DecoderOptions) { - this._config = config; + this.onReply = config.onReply; + this.onErrorReply = config.onErrorReply; + this.onPush = config.onPush; + this.getTypeMapping = config.getTypeMapping; } reset() { @@ -99,102 +103,102 @@ export class Decoder { private _decodeTypeValue(type, chunk) { switch (type) { case RESP_TYPES.NULL: - this._config.onReply(this._decodeNull()); + this.onReply(this._decodeNull()); return false; case RESP_TYPES.BOOLEAN: return this._handleDecodedValue( - this._config.onReply, + this.onReply, this._decodeBoolean(chunk) ); case RESP_TYPES.NUMBER: return this._handleDecodedValue( - this._config.onReply, + this.onReply, this._decodeNumber( - this._config.getTypeMapping()[RESP_TYPES.NUMBER], + this.getTypeMapping()[RESP_TYPES.NUMBER], chunk ) ); case RESP_TYPES.BIG_NUMBER: return this._handleDecodedValue( - this._config.onReply, + this.onReply, this._decodeBigNumber( - this._config.getTypeMapping()[RESP_TYPES.BIG_NUMBER], + this.getTypeMapping()[RESP_TYPES.BIG_NUMBER], chunk ) ); case RESP_TYPES.DOUBLE: return this._handleDecodedValue( - this._config.onReply, + this.onReply, this._decodeDouble( - this._config.getTypeMapping()[RESP_TYPES.DOUBLE], + this.getTypeMapping()[RESP_TYPES.DOUBLE], chunk ) ); case RESP_TYPES.SIMPLE_STRING: return this._handleDecodedValue( - this._config.onReply, + this.onReply, this._decodeSimpleString( - this._config.getTypeMapping()[RESP_TYPES.SIMPLE_STRING], + this.getTypeMapping()[RESP_TYPES.SIMPLE_STRING], chunk ) ); case RESP_TYPES.BLOB_STRING: return this._handleDecodedValue( - this._config.onReply, + this.onReply, this._decodeBlobString( - this._config.getTypeMapping()[RESP_TYPES.BLOB_STRING], + this.getTypeMapping()[RESP_TYPES.BLOB_STRING], chunk ) ); case RESP_TYPES.VERBATIM_STRING: return this._handleDecodedValue( - this._config.onReply, + this.onReply, this._decodeVerbatimString( - this._config.getTypeMapping()[RESP_TYPES.VERBATIM_STRING], + this.getTypeMapping()[RESP_TYPES.VERBATIM_STRING], chunk ) ); case RESP_TYPES.SIMPLE_ERROR: return this._handleDecodedValue( - this._config.onErrorReply, + this.onErrorReply, this._decodeSimpleError(chunk) ); case RESP_TYPES.BLOB_ERROR: return this._handleDecodedValue( - this._config.onErrorReply, + this.onErrorReply, this._decodeBlobError(chunk) ); case RESP_TYPES.ARRAY: return this._handleDecodedValue( - this._config.onReply, - this._decodeArray(this._config.getTypeMapping(), chunk) + this.onReply, + this._decodeArray(this.getTypeMapping(), chunk) ); case RESP_TYPES.SET: return this._handleDecodedValue( - this._config.onReply, - this._decodeSet(this._config.getTypeMapping(), chunk) + this.onReply, + this._decodeSet(this.getTypeMapping(), chunk) ); case RESP_TYPES.MAP: return this._handleDecodedValue( - this._config.onReply, - this._decodeMap(this._config.getTypeMapping(), chunk) + this.onReply, + this._decodeMap(this.getTypeMapping(), chunk) ); case RESP_TYPES.PUSH: return this._handleDecodedValue( - this._config.onPush, + this.onPush, this._decodeArray(PUSH_TYPE_MAPPING, chunk) ); diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 5a25674a7e..dcfbd335fe 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -5,6 +5,7 @@ import { CommandArguments, TypeMapping, ReplyUnion, RespVersions } from '../RESP import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub'; import { AbortError, ErrorReply } from '../errors'; import { EventEmitter } from 'node:stream'; +import { MonitorCallback } from '.'; export interface CommandOptions { chainId?: symbol; @@ -23,6 +24,7 @@ export interface CommandToWrite extends CommandWaitingForReply { signal: AbortSignal; listener: () => unknown; }; + resolveOnWrite?: boolean; } interface CommandWaitingForReply { @@ -151,8 +153,33 @@ export default class RedisCommandsQueue { } }); } + + async monitor(callback: MonitorCallback, typeMapping: TypeMapping = {}, asap = false) { + await this.addCommand( + ['MONITOR'], + { asap }, + true + ); - addCommand(args: CommandArguments, options?: CommandOptions): Promise { + const { onReply, getTypeMapping } = this.decoder; + this.decoder.onReply = callback; + this.decoder.getTypeMapping = () => typeMapping; + return () => new Promise(async resolve => { + await this.addCommand(['RESET'], undefined, true); + this.decoder.onReply = (reply: string) => { + if (reply !== 'RESET') return callback(reply); + this.decoder.onReply = onReply; + this.decoder.getTypeMapping = getTypeMapping; + resolve(); + }; + }); + } + + addCommand( + args: CommandArguments, + options?: CommandOptions, + resolveOnWrite?: boolean + ): Promise { if (this._maxLength && this._toWrite.length + this._waitingForReply.length >= this._maxLength) { return Promise.reject(new Error('The queue is full')); } else if (options?.abortSignal?.aborted) { @@ -164,10 +191,12 @@ export default class RedisCommandsQueue { const value: CommandToWrite = { args, chainId: options?.chainId, - typeMapping: options?.typeMapping, + abort: undefined, + resolveOnWrite, resolve, reject, - abort: undefined + channelsCounter: undefined, + typeMapping: options?.typeMapping }; const signal = options?.abortSignal; @@ -245,16 +274,19 @@ export default class RedisCommandsQueue { return new Promise((resolve, reject) => { this._toWrite.push({ args: command.args, - channelsCounter: command.channelsCounter, - typeMapping: PUSH_TYPE_MAPPING, - resolve: () => { + chainId: undefined, + abort: undefined, + resolveOnWrite: false, + resolve() { command.resolve(); resolve(); }, - reject: err => { + reject(err) { command.reject?.(); reject(err); - } + }, + channelsCounter: command.channelsCounter, + typeMapping: PUSH_TYPE_MAPPING }); }); } @@ -279,13 +311,19 @@ export default class RedisCommandsQueue { RedisCommandsQueue._removeAbortListener(toSend); toSend.abort = undefined; } - - // TODO reuse `toSend` or create new object? - (toSend as any).args = undefined; - (toSend as any).chainId = undefined; - - this._waitingForReply.push(toSend); - this._chainInExecution = toSend.chainId; + + if (toSend.resolveOnWrite) { + toSend.resolve(); + } else { + // TODO reuse `toSend` or create new object? + (toSend as any).args = undefined; + + this._chainInExecution = toSend.chainId; + toSend.chainId = undefined; + + this._waitingForReply.push(toSend); + } + yield encoded; toSend = this._toWrite.shift(); } diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index 60e776b0dd..cfdfba0455 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -5,7 +5,7 @@ import RedisClient, { RedisClientType } from '.'; // import { RedisCommandRawReply, RedisModules, RedisFunctions, RedisScripts } from '../commands'; import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, SocketClosedUnexpectedlyError, WatchError } from '../errors'; import { defineScript } from '../lua-script'; -// import { spy } from 'sinon'; +import { spy } from 'sinon'; import { once } from 'node:events'; // import { ClientKillFilters } from '../commands/CLIENT_KILL'; // import { promisify } from 'node:util'; @@ -741,4 +741,66 @@ describe('Client', () => { // }, // disableClientSetup: true // }); + + describe('MONITOR', () => { + testUtils.testWithClient('should be able to monitor commands', async client => { + const duplicate = await client.duplicate().connect(), + listener = spy(message => assert.equal(typeof message, 'string')); + await duplicate.monitor(listener); + + try { + await Promise.all([ + waitTillBeenCalled(listener), + client.ping() + ]); + } finally { + duplicate.destroy(); + } + }, GLOBAL.SERVERS.OPEN); + + testUtils.testWithClient('should keep monitoring after reconnection', async client => { + const duplicate = await client.duplicate().connect(), + listener = spy(message => assert.equal(typeof message, 'string')); + await duplicate.monitor(listener); + + try { + await Promise.all([ + once(duplicate, 'error'), + client.clientKill({ + filter: 'SKIPME', + skipMe: true + }) + ]); + await Promise.all([ + waitTillBeenCalled(listener), + client.ping() + ]); + } finally { + duplicate.destroy(); + } + }, GLOBAL.SERVERS.OPEN); + + testUtils.testWithClient('should be able to go back to "normal mode"', async client => { + const off = await client.monitor(() => {}); + await off(); + await assert.doesNotReject(client.ping()); + }, GLOBAL.SERVERS.OPEN); + + testUtils.testWithClient('should respect type mapping', async client => { + const duplicate = await client.duplicate().connect(), + listener = spy(message => assert.ok(message instanceof Buffer)); + await duplicate.withTypeMapping({ + [RESP_TYPES.SIMPLE_STRING]: Buffer + }).monitor(listener); + + try { + await Promise.all([ + waitTillBeenCalled(listener), + client.ping() + ]); + } finally { + duplicate.destroy(); + } + }, GLOBAL.SERVERS.OPEN); + }); }); diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index c4bc2654e3..69b5ec6054 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -7,7 +7,7 @@ import { ClientClosedError, ClientOfflineError, DisconnectsClientError, WatchErr import { URL } from 'node:url'; import { TcpSocketConnectOpts } from 'node:net'; import { PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub'; -import { Command, CommandSignature, TypeMapping, CommanderConfig, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types'; +import { Command, CommandSignature, TypeMapping, CommanderConfig, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions, RedisArgument, ReplyWithTypeMapping, SimpleStringReply } from '../RESP/types'; import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command'; import { RedisMultiQueuedCommand } from '../multi-command'; import HELLO, { HelloOptions } from '../commands/HELLO'; @@ -138,6 +138,8 @@ interface ScanIteratorOptions { cursor?: RedisArgument; } +export type MonitorCallback = (reply: ReplyWithTypeMapping) => unknown; + export default class RedisClient< M extends RedisModules, F extends RedisFunctions, @@ -276,6 +278,7 @@ export default class RedisClient< private readonly _socket: RedisSocket; private readonly _queue: RedisCommandsQueue; private _selectedDB = 0; + private _monitorCallback?: MonitorCallback; private _commandOptions?: CommandOptions; get options(): RedisClientOptions | undefined { @@ -334,6 +337,16 @@ export default class RedisClient< const socketInitiator = async (): Promise => { const promises = []; + if (this._monitorCallback) { + promises.push( + this._queue.monitor( + this._monitorCallback, + this._commandOptions?.typeMapping, + true + ) + ); + } + if (this._selectedDB !== 0) { promises.push( this._queue.addCommand( @@ -855,6 +868,22 @@ export default class RedisClient< } while (cursor !== '0'); } + async MONITOR(callback: MonitorCallback) { + const promise = this._queue.monitor(callback, this._commandOptions?.typeMapping); + this._scheduleWrite(); + + const off = await promise; + this._monitorCallback = callback; + return async () => { + const promise = off(); + this._scheduleWrite(); + await promise; + this._monitorCallback = undefined; + }; + } + + monitor = this.MONITOR; + /** * @deprecated use .close instead */