You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-04 15:02:09 +03:00
Execute empty MULTI (#2423)
* Fix multi.exec with empty queue and previous watch When calling exec on a multi instance which you did not use, no command is sent currently. This is a problem for watched keys, because no EXEC means no unwatch, which might cause hard-to-debug problems. Proposed Fix: Sending UNWATCH * execute empty multi command (instead of skipping) * Update index.ts * Update index.ts * Update multi-command.ts * Update multi-command.ts * Update multi-command.ts * Update multi-command.ts * short circuit empty pipelines * Update index.ts --------- Co-authored-by: Leibale <me@leibale.com>
This commit is contained in:
@@ -518,14 +518,23 @@ describe('Client', () => {
|
|||||||
);
|
);
|
||||||
}, GLOBAL.SERVERS.OPEN);
|
}, GLOBAL.SERVERS.OPEN);
|
||||||
|
|
||||||
testUtils.testWithClient('execAsPipeline', async client => {
|
describe('execAsPipeline', () => {
|
||||||
assert.deepEqual(
|
testUtils.testWithClient('exec(true)', async client => {
|
||||||
await client.multi()
|
assert.deepEqual(
|
||||||
.ping()
|
await client.multi()
|
||||||
.exec(true),
|
.ping()
|
||||||
['PONG']
|
.exec(true),
|
||||||
);
|
['PONG']
|
||||||
}, GLOBAL.SERVERS.OPEN);
|
);
|
||||||
|
}, GLOBAL.SERVERS.OPEN);
|
||||||
|
|
||||||
|
testUtils.testWithClient('empty execAsPipeline', async client => {
|
||||||
|
assert.deepEqual(
|
||||||
|
await client.multi().execAsPipeline(),
|
||||||
|
[]
|
||||||
|
);
|
||||||
|
}, GLOBAL.SERVERS.OPEN);
|
||||||
|
});
|
||||||
|
|
||||||
testUtils.testWithClient('should remember selected db', async client => {
|
testUtils.testWithClient('should remember selected db', async client => {
|
||||||
await client.multi()
|
await client.multi()
|
||||||
|
@@ -460,7 +460,7 @@ export default class RedisClient<
|
|||||||
);
|
);
|
||||||
} else if (!this.#socket.isReady && this.#options?.disableOfflineQueue) {
|
} else if (!this.#socket.isReady && this.#options?.disableOfflineQueue) {
|
||||||
return Promise.reject(new ClientOfflineError());
|
return Promise.reject(new ClientOfflineError());
|
||||||
}
|
}
|
||||||
|
|
||||||
const promise = this.#queue.addCommand<T>(args, options);
|
const promise = this.#queue.addCommand<T>(args, options);
|
||||||
this.#tick();
|
this.#tick();
|
||||||
@@ -725,11 +725,14 @@ export default class RedisClient<
|
|||||||
return Promise.reject(new ClientClosedError());
|
return Promise.reject(new ClientClosedError());
|
||||||
}
|
}
|
||||||
|
|
||||||
const promise = Promise.all(
|
const promise = chainId ?
|
||||||
commands.map(({ args }) => {
|
// if `chainId` has a value, it's a `MULTI` (and not "pipeline") - need to add the `MULTI` and `EXEC` commands
|
||||||
return this.#queue.addCommand(args, { chainId });
|
Promise.all([
|
||||||
})
|
this.#queue.addCommand(['MULTI'], { chainId }),
|
||||||
);
|
this.#addMultiCommands(commands, chainId),
|
||||||
|
this.#queue.addCommand(['EXEC'], { chainId })
|
||||||
|
]) :
|
||||||
|
this.#addMultiCommands(commands);
|
||||||
|
|
||||||
this.#tick();
|
this.#tick();
|
||||||
|
|
||||||
@@ -742,6 +745,12 @@ export default class RedisClient<
|
|||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#addMultiCommands(commands: Array<RedisMultiQueuedCommand>, chainId?: symbol) {
|
||||||
|
return Promise.all(
|
||||||
|
commands.map(({ args }) => this.#queue.addCommand(args, { chainId }))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
async* scanIterator(options?: ScanCommandOptions): AsyncIterable<string> {
|
async* scanIterator(options?: ScanCommandOptions): AsyncIterable<string> {
|
||||||
let cursor = 0;
|
let cursor = 0;
|
||||||
do {
|
do {
|
||||||
|
@@ -170,12 +170,9 @@ export default class RedisClientMultiCommand {
|
|||||||
return this.execAsPipeline();
|
return this.execAsPipeline();
|
||||||
}
|
}
|
||||||
|
|
||||||
const commands = this.#multi.exec();
|
|
||||||
if (!commands) return [];
|
|
||||||
|
|
||||||
return this.#multi.handleExecReplies(
|
return this.#multi.handleExecReplies(
|
||||||
await this.#executor(
|
await this.#executor(
|
||||||
commands,
|
this.#multi.queue,
|
||||||
this.#selectedDB,
|
this.#selectedDB,
|
||||||
RedisMultiCommand.generateChainId()
|
RedisMultiCommand.generateChainId()
|
||||||
)
|
)
|
||||||
@@ -185,6 +182,8 @@ export default class RedisClientMultiCommand {
|
|||||||
EXEC = this.exec;
|
EXEC = this.exec;
|
||||||
|
|
||||||
async execAsPipeline(): Promise<Array<RedisCommandRawReply>> {
|
async execAsPipeline(): Promise<Array<RedisCommandRawReply>> {
|
||||||
|
if (this.#multi.queue.length === 0) return [];
|
||||||
|
|
||||||
return this.#multi.transformReplies(
|
return this.#multi.transformReplies(
|
||||||
await this.#executor(
|
await this.#executor(
|
||||||
this.#multi.queue,
|
this.#multi.queue,
|
||||||
|
@@ -120,11 +120,8 @@ export default class RedisClusterMultiCommand {
|
|||||||
return this.execAsPipeline();
|
return this.execAsPipeline();
|
||||||
}
|
}
|
||||||
|
|
||||||
const commands = this.#multi.exec();
|
|
||||||
if (!commands) return [];
|
|
||||||
|
|
||||||
return this.#multi.handleExecReplies(
|
return this.#multi.handleExecReplies(
|
||||||
await this.#executor(commands, this.#firstKey, RedisMultiCommand.generateChainId())
|
await this.#executor(this.#multi.queue, this.#firstKey, RedisMultiCommand.generateChainId())
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -46,24 +46,23 @@ describe('Multi Command', () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
describe('exec', () => {
|
describe('exec', () => {
|
||||||
it('undefined', () => {
|
it('without commands', () => {
|
||||||
assert.equal(
|
assert.deepEqual(
|
||||||
new RedisMultiCommand().exec(),
|
new RedisMultiCommand().queue,
|
||||||
undefined
|
[]
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('Array', () => {
|
it('with commands', () => {
|
||||||
const multi = new RedisMultiCommand();
|
const multi = new RedisMultiCommand();
|
||||||
multi.addCommand(['PING']);
|
multi.addCommand(['PING']);
|
||||||
|
|
||||||
assert.deepEqual(
|
assert.deepEqual(
|
||||||
multi.exec(),
|
multi.queue,
|
||||||
[
|
[{
|
||||||
{ args: ['MULTI'] },
|
args: ['PING'],
|
||||||
{ args: ['PING'], transformReply: undefined },
|
transformReply: undefined
|
||||||
{ args: ['EXEC'] }
|
}]
|
||||||
]
|
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
@@ -69,18 +69,6 @@ export default class RedisMultiCommand {
|
|||||||
return transformedArguments;
|
return transformedArguments;
|
||||||
}
|
}
|
||||||
|
|
||||||
exec(): undefined | Array<RedisMultiQueuedCommand> {
|
|
||||||
if (!this.queue.length) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
return [
|
|
||||||
{ args: ['MULTI'] },
|
|
||||||
...this.queue,
|
|
||||||
{ args: ['EXEC'] }
|
|
||||||
];
|
|
||||||
}
|
|
||||||
|
|
||||||
handleExecReplies(rawReplies: Array<RedisCommandRawReply>): Array<RedisCommandRawReply> {
|
handleExecReplies(rawReplies: Array<RedisCommandRawReply>): Array<RedisCommandRawReply> {
|
||||||
const execReply = rawReplies[rawReplies.length - 1] as (null | Array<RedisCommandRawReply>);
|
const execReply = rawReplies[rawReplies.length - 1] as (null | Array<RedisCommandRawReply>);
|
||||||
if (execReply === null) {
|
if (execReply === null) {
|
||||||
|
Reference in New Issue
Block a user