diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 84ec7e52c9..df2c25dbc2 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -26,7 +26,7 @@ interface CommandWaitingToBeSent extends CommandWaitingForReply { interface CommandWaitingForReply { resolve(reply?: unknown): void; - reject(err: Error): void; + reject(err: unknown): void; channelsCounter?: number; returnBuffers?: boolean; } @@ -142,7 +142,9 @@ export default class RedisCommandsQueue { }, returnError: (err: Error) => this.#shiftWaitingForReply().reject(err) }); + #chainInExecution: symbol | undefined; + constructor(maxLength: number | null | undefined) { this.#maxLength = maxLength; } @@ -155,6 +157,7 @@ export default class RedisCommandsQueue { } else if (options?.signal?.aborted) { return Promise.reject(new AbortError()); } + return new Promise((resolve, reject) => { const node = new LinkedList.Node({ args, @@ -178,6 +181,7 @@ export default class RedisCommandsQueue { once: true }); } + if (options?.asap) { this.#waitingToBeSent.unshiftNode(node); } else { @@ -386,12 +390,13 @@ export default class RedisCommandsQueue { flushWaitingForReply(err: Error): void { RedisCommandsQueue.#flushQueue(this.#waitingForReply, err); - if (!this.#chainInExecution) { - return; - } + + if (!this.#chainInExecution) return; + while (this.#waitingToBeSent.head?.value.chainId === this.#chainInExecution) { this.#waitingToBeSent.shift(); } + this.#chainInExecution = undefined; } diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index c5bfb32c03..1d627756c6 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -1,6 +1,6 @@ import { strict as assert } from 'assert'; import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils'; -import RedisClient, { ClientLegacyCommandArguments, RedisClientType } from '.'; +import RedisClient, { RedisClientType } from '.'; import { RedisClientMultiCommandType } from './multi-command'; import { RedisCommandArguments, RedisCommandRawReply, RedisModules, RedisScripts } from '../commands'; import { AbortError, AuthError, ClientClosedError, ConnectionTimeoutError, DisconnectsClientError, SocketClosedUnexpectedlyError, WatchError } from '../errors'; @@ -183,7 +183,7 @@ describe('Client', () => { } }); - function setAsync(client: RedisClientType, ...args: ClientLegacyCommandArguments): Promise { + function setAsync(client: RedisClientType, ...args: Array): Promise { return new Promise((resolve, reject) => { (client as any).set(...args, (err: Error | undefined, reply: RedisCommandRawReply) => { if (err) return reject(err); @@ -353,6 +353,18 @@ describe('Client', () => { ); }, GLOBAL.SERVERS.OPEN); }); + + testUtils.testWithClient('undefined and null should not break the client', async client => { + await assert.rejects( + client.sendCommand([null as any, undefined as any]), + 'ERR unknown command ``, with args beginning with: ``' + ); + + assert.equal( + await client.ping(), + 'PONG' + ); + }, GLOBAL.SERVERS.OPEN); }); describe('multi', () => { diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 6144c98070..80f029a406 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -9,7 +9,7 @@ import { CommandOptions, commandOptions, isCommandOptions } from '../command-opt import { ScanOptions, ZMember } from '../commands/generic-transformers'; import { ScanCommandOptions } from '../commands/SCAN'; import { HScanTuple } from '../commands/HSCAN'; -import { extendWithCommands, extendWithModulesAndScripts, LegacyCommandArguments, transformCommandArguments, transformCommandReply, transformLegacyCommandArguments } from '../commander'; +import { extendWithCommands, extendWithModulesAndScripts, transformCommandArguments, transformCommandReply } from '../commander'; import { Pool, Options as PoolOptions, createPool } from 'generic-pool'; import { ClientClosedError, DisconnectsClientError, AuthError } from '../errors'; import { URL } from 'url'; @@ -83,7 +83,6 @@ export interface ClientCommandOptions extends QueueCommandOptions { type ClientLegacyCallback = (err: Error | null, reply?: RedisCommandRawReply) => void; -export type ClientLegacyCommandArguments = LegacyCommandArguments | [...LegacyCommandArguments, ClientLegacyCallback]; export default class RedisClient extends EventEmitter { static commandOptions(options: T): CommandOptions { return commandOptions(options); @@ -292,13 +291,13 @@ export default class RedisClient if (!this.#options?.legacyMode) return; (this as any).#v4.sendCommand = this.#sendCommand.bind(this); - (this as any).sendCommand = (...args: ClientLegacyCommandArguments): void => { + (this as any).sendCommand = (...args: Array): void => { let callback: ClientLegacyCallback; if (typeof args[args.length - 1] === 'function') { callback = args.pop() as ClientLegacyCallback; } - this.#sendCommand(transformLegacyCommandArguments(args as LegacyCommandArguments)) + this.#sendCommand(args.flat()) .then((reply: RedisCommandRawReply) => { if (!callback) return; diff --git a/packages/client/lib/client/multi-command.ts b/packages/client/lib/client/multi-command.ts index 601334fe6d..cce0b515f1 100644 --- a/packages/client/lib/client/multi-command.ts +++ b/packages/client/lib/client/multi-command.ts @@ -1,7 +1,7 @@ import COMMANDS from './commands'; import { RedisCommand, RedisCommandArguments, RedisCommandRawReply, RedisModules, RedisPlugins, RedisScript, RedisScripts } from '../commands'; import RedisMultiCommand, { RedisMultiQueuedCommand } from '../multi-command'; -import { extendWithCommands, extendWithModulesAndScripts, LegacyCommandArguments, transformLegacyCommandArguments } from '../commander'; +import { extendWithCommands, extendWithModulesAndScripts } from '../commander'; import { ExcludeMappedString } from '.'; type RedisClientMultiCommandSignature = @@ -53,8 +53,8 @@ export default class RedisClientMultiCommand { #legacyMode(): void { this.v4.addCommand = this.addCommand.bind(this); - (this as any).addCommand = (...args: LegacyCommandArguments): this => { - this.#multi.addCommand(transformLegacyCommandArguments(args)); + (this as any).addCommand = (...args: Array): this => { + this.#multi.addCommand(args.flat()); return this; }; this.v4.exec = this.exec.bind(this); diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index ccbe3f7f2c..d7b91e14c0 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -249,18 +249,16 @@ export default class RedisSocket extends EventEmitter { #isCorked = false; cork(): void { - if (!this.#socket) { + if (!this.#socket || this.#isCorked) { return; } - if (!this.#isCorked) { - this.#socket.cork(); - this.#isCorked = true; + this.#socket.cork(); + this.#isCorked = true; - queueMicrotask(() => { - this.#socket?.uncork(); - this.#isCorked = false; - }); - } + queueMicrotask(() => { + this.#socket?.uncork(); + this.#isCorked = false; + }); } } diff --git a/packages/client/lib/commander.spec.ts b/packages/client/lib/commander.spec.ts index b6ec100461..f0690f3736 100644 --- a/packages/client/lib/commander.spec.ts +++ b/packages/client/lib/commander.spec.ts @@ -2,42 +2,34 @@ import { strict as assert } from 'assert'; import { describe } from 'mocha'; import { encodeCommand } from './commander'; -function encodeCommandToString(...args: Parameters): string { - const arr = []; - for (const item of encodeCommand(...args)) { - arr.push(item.toString()); - } - - return arr.join(''); -} describe('Commander', () => { describe('encodeCommand (see #1628)', () => { it('1 byte', () => { - assert.equal( - encodeCommandToString(['a', 'z']), - '*2\r\n$1\r\na\r\n$1\r\nz\r\n' + assert.deepEqual( + [...encodeCommand(['a', 'z'])], + ['*2\r\n$1\r\na\r\n$1\r\nz\r\n'] ); }); it('2 bytes', () => { - assert.equal( - encodeCommandToString(['א', 'ת']), - '*2\r\n$2\r\nא\r\n$2\r\nת\r\n' + assert.deepEqual( + [...encodeCommand(['א', 'ת'])], + ['*2\r\n$2\r\nא\r\n$2\r\nת\r\n'] ); }); it('4 bytes', () => { - assert.equal( - encodeCommandToString(['🐣', '🐤']), - '*2\r\n$4\r\n🐣\r\n$4\r\n🐤\r\n' + assert.deepEqual( + [...encodeCommand(['🐣', '🐤'])], + ['*2\r\n$4\r\n🐣\r\n$4\r\n🐤\r\n'] ); }); it('with a buffer', () => { - assert.equal( - encodeCommandToString([Buffer.from('string')]), - '*1\r\n$6\r\nstring\r\n' + assert.deepEqual( + [...encodeCommand([Buffer.from('string')])], + ['*1\r\n$6\r\n', Buffer.from('string'), '\r\n'] ); }); }); diff --git a/packages/client/lib/commander.ts b/packages/client/lib/commander.ts index 2d129d679e..b3dfff0a99 100644 --- a/packages/client/lib/commander.ts +++ b/packages/client/lib/commander.ts @@ -95,25 +95,25 @@ export function* encodeCommand(args: RedisCommandArguments): IterableIterator 1024) { - yield strings; - strings = arg; - stringsLength = byteLength; - } else { - strings += arg; - stringsLength = totalLength; - } - } else { - yield strings; + if (Buffer.isBuffer(arg)) { + yield `${strings}$${arg.length}${DELIMITER}`; strings = ''; stringsLength = 0; yield arg; + } else { + const string = arg?.toString?.() ?? '', + byteLength = Buffer.byteLength(string); + strings += `$${byteLength}${DELIMITER}`; + + const totalLength = stringsLength + byteLength; + if (totalLength > 1024) { + yield strings; + strings = string; + stringsLength = byteLength; + } else { + strings += string; + stringsLength = totalLength; + } } strings += DELIMITER; @@ -133,18 +133,3 @@ export function transformCommandReply( return command.transformReply(rawReply, preserved); } - -export type LegacyCommandArguments = Array; - -export function transformLegacyCommandArguments(args: LegacyCommandArguments, flat: RedisCommandArguments = []): RedisCommandArguments { - for (const arg of args) { - if (Array.isArray(arg)) { - transformLegacyCommandArguments(arg, flat); - continue; - } - - flat.push(typeof arg === 'number' ? arg.toString() : arg); - } - - return flat; -}