1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-09 00:22:08 +03:00

fix #1650 - add support for Buffer in some commands, add GET_BUFFER command

This commit is contained in:
leibale
2021-09-13 19:49:39 -04:00
parent 1413a69a6b
commit 08837c8648
65 changed files with 300 additions and 227 deletions

View File

@@ -1,6 +1,6 @@
import RedisSocket, { RedisSocketOptions } from './socket';
import RedisCommandsQueue, { PubSubListener, PubSubSubscribeCommands, PubSubUnsubscribeCommands, QueueCommandOptions } from './commands-queue';
import COMMANDS from './commands';
import COMMANDS, { TransformArgumentsReply } from './commands';
import { RedisCommand, RedisModules, RedisReply } from './commands';
import RedisMultiCommand, { MultiQueuedCommand, RedisMultiCommandType } from './multi-command';
import EventEmitter from 'events';
@@ -62,12 +62,10 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
): Promise<ReturnType<typeof command['transformReply']>> {
const { args: redisArgs, options } = transformCommandArguments<ClientCommandOptions>(command, args);
const reply = command.transformReply(
await this.#sendCommand(redisArgs, options),
redisArgs.preserve
return command.transformReply(
await this.#sendCommand(redisArgs, options, command.BUFFER_MODE),
redisArgs.preserve,
);
return reply;
}
static async #scriptsExecutor(
@@ -77,12 +75,10 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
): Promise<typeof script['transformArguments']> {
const { args: redisArgs, options } = transformCommandArguments<ClientCommandOptions>(script, args);
const reply = script.transformReply(
await this.executeScript(script, redisArgs, options),
return script.transformReply(
await this.executeScript(script, redisArgs, options, script.BUFFER_MODE),
redisArgs.preserve
);
return reply;
}
static create<M extends RedisModules, S extends RedisLuaScripts>(options?: RedisClientOptions<M, S>): RedisClientType<M, S> {
@@ -182,10 +178,7 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
}
#initiateQueue(): RedisCommandsQueue {
return new RedisCommandsQueue(
this.#options?.commandsQueueMaxLength,
encodedCommands => this.#socket.write(encodedCommands)
);
return new RedisCommandsQueue(this.#options?.commandsQueueMaxLength);
}
#legacyMode(): void {
@@ -299,7 +292,7 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
QUIT(): Promise<void> {
return this.#socket.quit(() => {
const promise = this.#queue.addEncodedCommand(encodeCommand(['QUIT']));
const promise = this.#queue.addCommand(['QUIT']);
this.#tick();
return promise;
});
@@ -307,46 +300,64 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
quit = this.QUIT;
sendCommand<T = unknown>(args: Array<string>, options?: ClientCommandOptions): Promise<T> {
return this.#sendCommand(args, options);
sendCommand<T = RedisReply>(args: TransformArgumentsReply, options?: ClientCommandOptions, bufferMode?: boolean): Promise<T> {
return this.#sendCommand(args, options, bufferMode);
}
// using `#sendCommand` cause `sendCommand` is overwritten in legacy mode
#sendCommand<T = RedisReply>(args: Array<string>, options?: ClientCommandOptions): Promise<T> {
return this.sendEncodedCommand(encodeCommand(args), options);
}
async sendEncodedCommand<T = RedisReply>(encodedCommand: string, options?: ClientCommandOptions): Promise<T> {
async #sendCommand<T = RedisReply>(args: TransformArgumentsReply, options?: ClientCommandOptions, bufferMode?: boolean): Promise<T> {
if (!this.#socket.isOpen) {
throw new ClientClosedError();
}
if (options?.isolated) {
return this.executeIsolated(isolatedClient =>
isolatedClient.sendEncodedCommand(encodedCommand, {
isolatedClient.sendCommand(args, {
...options,
isolated: false
})
);
}
const promise = this.#queue.addEncodedCommand<T>(encodedCommand, options);
const promise = this.#queue.addCommand<T>(args, options, bufferMode);
this.#tick();
return await promise;
}
#tick(): void {
if (!this.#socket.isSocketExists) {
return;
}
this.#socket.cork();
while (true) {
const args = this.#queue.getCommandToSend();
if (args === undefined) break;
let writeResult;
for (const toWrite of encodeCommand(args)) {
writeResult = this.#socket.write(toWrite);
}
if (!writeResult) {
break;
}
}
}
executeIsolated<T>(fn: (client: RedisClientType<M, S>) => T | Promise<T>): Promise<T> {
return this.#isolationPool.use(fn);
}
async executeScript(script: RedisLuaScript, args: Array<string>, options?: ClientCommandOptions): Promise<ReturnType<typeof script['transformReply']>> {
async executeScript(script: RedisLuaScript, args: TransformArgumentsReply, options?: ClientCommandOptions, bufferMode?: boolean): Promise<ReturnType<typeof script['transformReply']>> {
try {
return await this.#sendCommand([
'EVALSHA',
script.SHA1,
script.NUMBER_OF_KEYS.toString(),
...args
], options);
], options, bufferMode);
} catch (err: any) {
if (!err?.message?.startsWith?.('NOSCRIPT')) {
throw err;
@@ -357,14 +368,14 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
script.SCRIPT,
script.NUMBER_OF_KEYS.toString(),
...args
], options);
], options, bufferMode);
}
}
#multiExecutor(commands: Array<MultiQueuedCommand>, chainId?: symbol): Promise<Array<RedisReply>> {
const promise = Promise.all(
commands.map(({encodedCommand}) => {
return this.#queue.addEncodedCommand(encodedCommand, RedisClient.commandOptions({
commands.map(({ args }) => {
return this.#queue.addCommand(args, RedisClient.commandOptions({
chainId
}));
})
@@ -438,31 +449,6 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
await this.#isolationPool.drain();
await this.#isolationPool.clear();
}
#isTickQueued = false;
#tick(): void {
const {chunkRecommendedSize} = this.#socket;
if (!chunkRecommendedSize) {
return;
}
if (!this.#isTickQueued && this.#queue.waitingToBeSentCommandsLength < chunkRecommendedSize) {
queueMicrotask(() => this.#tick());
this.#isTickQueued = true;
return;
}
const isBuffering = this.#queue.executeChunk(chunkRecommendedSize);
if (isBuffering === true) {
this.#socket.once('drain', () => this.#tick());
} else if (isBuffering === false) {
this.#tick();
return;
}
this.#isTickQueued = false;
}
}
extendWithDefaultCommands(RedisClient, RedisClient.commandsExecutor);