diff --git a/lib/client.ts b/lib/client.ts index 087d643225..89923a2232 100644 --- a/lib/client.ts +++ b/lib/client.ts @@ -332,7 +332,7 @@ export default class RedisClient, chainId?: symbol): Promise> { + #multiExecutor(commands: Array, chainId?: symbol): Promise> { const promise = Promise.all( commands.map(({encodedCommand}) => { return this.#queue.addEncodedCommand(encodedCommand, RedisClient.commandOptions({ @@ -343,7 +343,7 @@ export default class RedisClient { diff --git a/lib/errors.ts b/lib/errors.ts index 9040d7b589..1e58614c83 100644 --- a/lib/errors.ts +++ b/lib/errors.ts @@ -3,3 +3,9 @@ export class AbortError extends Error { super('The command was aborted'); } } + +export class WatchError extends Error { + constructor() { + super('One (or more) of the watched keys has been changed'); + } +} diff --git a/lib/multi-command.spec.ts b/lib/multi-command.spec.ts index 7b24b98e9c..bafda9fbb5 100644 --- a/lib/multi-command.spec.ts +++ b/lib/multi-command.spec.ts @@ -4,7 +4,7 @@ import { encodeCommand } from './commander'; describe('Multi Command', () => { it('exec', async () => { - const multi = RedisMultiCommand.create(queue => { + const multi = RedisMultiCommand.create((queue, symbol) => { assert.deepEqual( queue.map(({encodedCommand}) => encodedCommand), [ @@ -14,6 +14,11 @@ describe('Multi Command', () => { ] ); + assert.equal( + typeof symbol, + 'symbol' + ) + return Promise.resolve(['QUEUED', 'QUEUED', ['PONG']]); }); diff --git a/lib/multi-command.ts b/lib/multi-command.ts index 66b6102753..e9ad381368 100644 --- a/lib/multi-command.ts +++ b/lib/multi-command.ts @@ -3,6 +3,7 @@ import { RedisCommand, RedisModules, RedisReply } from './commands'; import { RedisLuaScript, RedisLuaScripts } from './lua-script'; import { RedisClientOptions } from './client'; import { extendWithModulesAndScripts, extendWithDefaultCommands, encodeCommand } from './commander'; +import { WatchError } from './errors'; type RedisMultiCommandSignature = (...args: Parameters) => RedisMultiCommandType; @@ -28,7 +29,7 @@ export interface MultiQueuedCommand { transformReply?: RedisCommand['transformReply']; } -export type RedisMultiExecutor = (queue: Array, chainId?: symbol) => Promise>; +export type RedisMultiExecutor = (queue: Array, chainId?: symbol) => Promise>; export default class RedisMultiCommand { static commandsExecutor(this: RedisMultiCommand, command: RedisCommand, args: Array): RedisMultiCommand { @@ -160,34 +161,57 @@ export default class RedisMultiCommand> { + async exec(execAsPipeline = false): Promise> { if (execAsPipeline) { return this.execAsPipeline(); } else if (!this.#queue.length) { return []; } - const queue = this.#queue.splice(0); - queue.unshift({ - encodedCommand: encodeCommand(['MULTI']) - }); - queue.push({ - encodedCommand: encodeCommand(['EXEC']) - }); + const queue = this.#queue.splice(0), + rawReplies = this.#handleNullReply( + await this.#executor([ + { + encodedCommand: encodeCommand(['MULTI']) + }, + ...queue, + { + encodedCommand: encodeCommand(['EXEC']) + } + ], Symbol('[RedisMultiCommand] Chain ID')) + ); - const rawReplies = await this.#executor(queue, Symbol('[RedisMultiCommand] Chain ID')); - return (rawReplies[rawReplies.length - 1]! as Array).map((reply, i) => { - const { transformReply, preservedArguments } = queue[i + 1]; - return transformReply ? transformReply(reply, preservedArguments) : reply; - }); + return this.#transformReplies( + rawReplies[rawReplies.length - 1] as Array, + queue + ); } - async execAsPipeline(): Promise> { + async execAsPipeline(): Promise> { if (!this.#queue.length) { return []; } - return await this.#executor(this.#queue.splice(0)); + const queue = this.#queue.splice(0); + return this.#transformReplies( + this.#handleNullReply(await this.#executor(queue)), + queue + ); + } + + #handleNullReply(reply: null | T): T { + if (reply === null) { + throw new WatchError(); + } + + return reply; + } + + #transformReplies(rawReplies: Array, queue: Array): Array { + return rawReplies.map((reply, i) => { + const { transformReply, preservedArguments } = queue[i]; + return transformReply ? transformReply(reply, preservedArguments) : reply; + }); } }