1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-13 10:02:24 +03:00

fix #1636 - handle null in multi.exec

This commit is contained in:
leibale
2021-08-02 14:08:16 -04:00
parent 169912f44c
commit 27344e9626
4 changed files with 54 additions and 19 deletions

View File

@@ -332,7 +332,7 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
} }
} }
async #multiExecutor(commands: Array<MultiQueuedCommand>, chainId?: symbol): Promise<Array<RedisReply>> { #multiExecutor(commands: Array<MultiQueuedCommand>, chainId?: symbol): Promise<Array<RedisReply>> {
const promise = Promise.all( const promise = Promise.all(
commands.map(({encodedCommand}) => { commands.map(({encodedCommand}) => {
return this.#queue.addEncodedCommand(encodedCommand, RedisClient.commandOptions({ return this.#queue.addEncodedCommand(encodedCommand, RedisClient.commandOptions({
@@ -343,7 +343,7 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
this.#tick(); this.#tick();
return await promise; return promise;
} }
multi(): RedisMultiCommandType<M, S> { multi(): RedisMultiCommandType<M, S> {

View File

@@ -3,3 +3,9 @@ export class AbortError extends Error {
super('The command was aborted'); super('The command was aborted');
} }
} }
export class WatchError extends Error {
constructor() {
super('One (or more) of the watched keys has been changed');
}
}

View File

@@ -4,7 +4,7 @@ import { encodeCommand } from './commander';
describe('Multi Command', () => { describe('Multi Command', () => {
it('exec', async () => { it('exec', async () => {
const multi = RedisMultiCommand.create(queue => { const multi = RedisMultiCommand.create((queue, symbol) => {
assert.deepEqual( assert.deepEqual(
queue.map(({encodedCommand}) => encodedCommand), queue.map(({encodedCommand}) => encodedCommand),
[ [
@@ -14,6 +14,11 @@ describe('Multi Command', () => {
] ]
); );
assert.equal(
typeof symbol,
'symbol'
)
return Promise.resolve(['QUEUED', 'QUEUED', ['PONG']]); return Promise.resolve(['QUEUED', 'QUEUED', ['PONG']]);
}); });

View File

@@ -3,6 +3,7 @@ import { RedisCommand, RedisModules, RedisReply } from './commands';
import { RedisLuaScript, RedisLuaScripts } from './lua-script'; import { RedisLuaScript, RedisLuaScripts } from './lua-script';
import { RedisClientOptions } from './client'; import { RedisClientOptions } from './client';
import { extendWithModulesAndScripts, extendWithDefaultCommands, encodeCommand } from './commander'; import { extendWithModulesAndScripts, extendWithDefaultCommands, encodeCommand } from './commander';
import { WatchError } from './errors';
type RedisMultiCommandSignature<C extends RedisCommand, M extends RedisModules, S extends RedisLuaScripts> = (...args: Parameters<C['transformArguments']>) => RedisMultiCommandType<M, S>; type RedisMultiCommandSignature<C extends RedisCommand, M extends RedisModules, S extends RedisLuaScripts> = (...args: Parameters<C['transformArguments']>) => RedisMultiCommandType<M, S>;
@@ -28,7 +29,7 @@ export interface MultiQueuedCommand {
transformReply?: RedisCommand['transformReply']; transformReply?: RedisCommand['transformReply'];
} }
export type RedisMultiExecutor = (queue: Array<MultiQueuedCommand>, chainId?: symbol) => Promise<Array<RedisReply>>; export type RedisMultiExecutor = (queue: Array<MultiQueuedCommand>, chainId?: symbol) => Promise<null | Array<RedisReply>>;
export default class RedisMultiCommand<M extends RedisModules = RedisModules, S extends RedisLuaScripts = RedisLuaScripts> { export default class RedisMultiCommand<M extends RedisModules = RedisModules, S extends RedisLuaScripts = RedisLuaScripts> {
static commandsExecutor(this: RedisMultiCommand, command: RedisCommand, args: Array<unknown>): RedisMultiCommand { static commandsExecutor(this: RedisMultiCommand, command: RedisCommand, args: Array<unknown>): RedisMultiCommand {
@@ -160,34 +161,57 @@ export default class RedisMultiCommand<M extends RedisModules = RedisModules, S
return this; return this;
} }
async exec(execAsPipeline = false): Promise<Array<unknown>> { async exec(execAsPipeline = false): Promise<Array<RedisReply>> {
if (execAsPipeline) { if (execAsPipeline) {
return this.execAsPipeline(); return this.execAsPipeline();
} else if (!this.#queue.length) { } else if (!this.#queue.length) {
return []; return [];
} }
const queue = this.#queue.splice(0); const queue = this.#queue.splice(0),
queue.unshift({ rawReplies = this.#handleNullReply(
encodedCommand: encodeCommand(['MULTI']) await this.#executor([
}); {
queue.push({ encodedCommand: encodeCommand(['MULTI'])
encodedCommand: encodeCommand(['EXEC']) },
}); ...queue,
{
encodedCommand: encodeCommand(['EXEC'])
}
], Symbol('[RedisMultiCommand] Chain ID'))
);
const rawReplies = await this.#executor(queue, Symbol('[RedisMultiCommand] Chain ID')); return this.#transformReplies(
return (rawReplies[rawReplies.length - 1]! as Array<RedisReply>).map((reply, i) => { rawReplies[rawReplies.length - 1] as Array<RedisReply>,
const { transformReply, preservedArguments } = queue[i + 1]; queue
return transformReply ? transformReply(reply, preservedArguments) : reply; );
});
} }
async execAsPipeline(): Promise<Array<unknown>> { async execAsPipeline(): Promise<Array<RedisReply>> {
if (!this.#queue.length) { if (!this.#queue.length) {
return []; return [];
} }
return await this.#executor(this.#queue.splice(0)); const queue = this.#queue.splice(0);
return this.#transformReplies(
this.#handleNullReply(await this.#executor(queue)),
queue
);
}
#handleNullReply<T>(reply: null | T): T {
if (reply === null) {
throw new WatchError();
}
return reply;
}
#transformReplies(rawReplies: Array<RedisReply>, queue: Array<MultiQueuedCommand>): Array<RedisReply> {
return rawReplies.map((reply, i) => {
const { transformReply, preservedArguments } = queue[i];
return transformReply ? transformReply(reply, preservedArguments) : reply;
});
} }
} }