1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-01 16:46:54 +03:00

New RESP2 parser (#1899)

* parser

* a new RESP parser :)

* clean code

* fix simple string and bulk string cursor

* performance improvements

* change typescript compiler target

* do not use stream.Transform

* Update decoder.ts

* fix for 1d09acb

* improve integer performance

* revert 1d09acb

* improve RESP2 decoder performance

* improve performance

* improve encode performance

* remove unused import

* upgrade benchmark deps

* clean code

* fix socket error handlers, reset parser on error

* fix #2080 - reset pubSubState on socket error

* reset decoder on socket error

* fix pubsub

* fix "RedisSocketInitiator"

* fix returnStringsAsBuffers

* fix merge
This commit is contained in:
Leibale Eidelman
2022-04-25 08:24:33 -04:00
committed by GitHub
parent b1a0b48d2c
commit 23b65133c9
21 changed files with 709 additions and 163 deletions

View File

@ -20,7 +20,6 @@
"dependencies": {
"cluster-key-slot": "1.1.0",
"generic-pool": "3.8.2",
"redis-parser": "3.0.0",
"yallist": "4.0.0"
},
"devDependencies": {
@ -316,6 +315,14 @@
"@node-redis/time-series": "1.0.2"
}
},
"node_modules/redis-v3/node_modules/denque": {
"version": "1.5.1",
"resolved": "https://registry.npmjs.org/denque/-/denque-1.5.1.tgz",
"integrity": "sha512-XwE+iZ4D6ZUB7mfYRMb5wByE8L74HCn30FBN7sWnXksWc1LO1bPDl67pBR9o/kC4z/xSNAwkMYcGgqDV3BE3Hw==",
"engines": {
"node": ">=0.10"
}
},
"node_modules/require-directory": {
"version": "2.1.1",
"resolved": "https://registry.npmjs.org/require-directory/-/require-directory-2.1.1.tgz",
@ -395,9 +402,9 @@
}
},
"node_modules/yargs-parser": {
"version": "21.0.0",
"resolved": "https://registry.npmjs.org/yargs-parser/-/yargs-parser-21.0.0.tgz",
"integrity": "sha512-z9kApYUOCwoeZ78rfRYYWdiU/iNL6mwwYlkkZfJoyMR1xps+NEBX5X7XmRpxkZHhXJ6+Ey00IwKxBBSW9FIjyA==",
"version": "21.0.1",
"resolved": "https://registry.npmjs.org/yargs-parser/-/yargs-parser-21.0.1.tgz",
"integrity": "sha512-9BK1jFpLzJROCI5TzwZL/TU4gqjK5xiHV/RfWLOahrjAko/e4DJkRDZQXfvqAsiZzzYhgAzbgz6lg48jcm4GLg==",
"engines": {
"node": ">=12"
}
@ -678,9 +685,9 @@
}
},
"yargs-parser": {
"version": "21.0.0",
"resolved": "https://registry.npmjs.org/yargs-parser/-/yargs-parser-21.0.0.tgz",
"integrity": "sha512-z9kApYUOCwoeZ78rfRYYWdiU/iNL6mwwYlkkZfJoyMR1xps+NEBX5X7XmRpxkZHhXJ6+Ey00IwKxBBSW9FIjyA=="
"version": "21.0.1",
"resolved": "https://registry.npmjs.org/yargs-parser/-/yargs-parser-21.0.1.tgz",
"integrity": "sha512-9BK1jFpLzJROCI5TzwZL/TU4gqjK5xiHV/RfWLOahrjAko/e4DJkRDZQXfvqAsiZzzYhgAzbgz6lg48jcm4GLg=="
}
}
}

View File

@ -0,0 +1,14 @@
import { strict as assert } from 'assert';
import BufferComposer from './buffer';
describe('Buffer Composer', () => {
const composer = new BufferComposer();
it('should compose two buffers', () => {
composer.write(Buffer.from([0]));
assert.deepEqual(
composer.end(Buffer.from([1])),
Buffer.from([0, 1])
);
});
});

View File

@ -0,0 +1,18 @@
import { Composer } from './interface';
export default class BufferComposer implements Composer<Buffer> {
private chunks: Array<Buffer> = [];
write(buffer: Buffer): void {
this.chunks.push(buffer);
}
end(buffer: Buffer): Buffer {
this.write(buffer);
return Buffer.concat(this.chunks.splice(0));
}
reset() {
this.chunks = [];
}
}

View File

@ -0,0 +1,7 @@
export interface Composer<T> {
write(buffer: Buffer): void;
end(buffer: Buffer): T;
reset(): void;
}

View File

@ -0,0 +1,14 @@
import { strict as assert } from 'assert';
import StringComposer from './string';
describe('String Composer', () => {
const composer = new StringComposer();
it('should compose two strings', () => {
composer.write(Buffer.from([0]));
assert.deepEqual(
composer.end(Buffer.from([1])),
Buffer.from([0, 1]).toString()
);
});
});

View File

@ -0,0 +1,22 @@
import { StringDecoder } from 'string_decoder';
import { Composer } from './interface';
export default class StringComposer implements Composer<string> {
private decoder = new StringDecoder();
private string = '';
write(buffer: Buffer): void {
this.string += this.decoder.write(buffer);
}
end(buffer: Buffer): string {
const string = this.string + this.decoder.end(buffer);
this.string = '';
return string;
}
reset() {
this.string = '';
}
}

View File

@ -0,0 +1,195 @@
import { strict as assert } from 'assert';
import { SinonSpy, spy } from 'sinon';
import RESP2Decoder from './decoder';
import { ErrorReply } from '../../errors';
interface DecoderAndSpies {
decoder: RESP2Decoder;
returnStringsAsBuffersSpy: SinonSpy;
onReplySpy: SinonSpy;
}
function createDecoderAndSpies(returnStringsAsBuffers: boolean): DecoderAndSpies {
const returnStringsAsBuffersSpy = spy(() => returnStringsAsBuffers),
onReplySpy = spy();
return {
decoder: new RESP2Decoder({
returnStringsAsBuffers: returnStringsAsBuffersSpy,
onReply: onReplySpy
}),
returnStringsAsBuffersSpy,
onReplySpy
};
}
function writeChunks(stream: RESP2Decoder, buffer: Buffer) {
let i = 0;
while (i < buffer.length) {
stream.write(buffer.slice(i, ++i));
}
}
type Replies = Array<Array<unknown>>;
interface TestsOptions {
toWrite: Buffer;
returnStringsAsBuffers: boolean;
replies: Replies;
}
function generateTests({
toWrite,
returnStringsAsBuffers,
replies
}: TestsOptions): void {
it('single chunk', () => {
const { decoder, returnStringsAsBuffersSpy, onReplySpy } =
createDecoderAndSpies(returnStringsAsBuffers);
decoder.write(toWrite);
assert.equal(returnStringsAsBuffersSpy.callCount, replies.length);
testReplies(onReplySpy, replies);
});
it('multiple chunks', () => {
const { decoder, returnStringsAsBuffersSpy, onReplySpy } =
createDecoderAndSpies(returnStringsAsBuffers);
writeChunks(decoder, toWrite);
assert.equal(returnStringsAsBuffersSpy.callCount, replies.length);
testReplies(onReplySpy, replies);
});
}
function testReplies(spy: SinonSpy, replies: Replies): void {
if (!replies) {
assert.equal(spy.callCount, 0);
return;
}
assert.equal(spy.callCount, replies.length);
for (const [i, reply] of replies.entries()) {
assert.deepEqual(
spy.getCall(i).args,
reply
);
}
}
describe('RESP2Parser', () => {
describe('Simple String', () => {
describe('as strings', () => {
generateTests({
toWrite: Buffer.from('+OK\r\n'),
returnStringsAsBuffers: false,
replies: [['OK']]
});
});
describe('as buffers', () => {
generateTests({
toWrite: Buffer.from('+OK\r\n'),
returnStringsAsBuffers: true,
replies: [[Buffer.from('OK')]]
});
});
});
describe('Error', () => {
generateTests({
toWrite: Buffer.from('-ERR\r\n'),
returnStringsAsBuffers: false,
replies: [[new ErrorReply('ERR')]]
});
});
describe('Integer', () => {
describe('-1', () => {
generateTests({
toWrite: Buffer.from(':-1\r\n'),
returnStringsAsBuffers: false,
replies: [[-1]]
});
});
describe('0', () => {
generateTests({
toWrite: Buffer.from(':0\r\n'),
returnStringsAsBuffers: false,
replies: [[0]]
});
});
});
describe('Bulk String', () => {
describe('null', () => {
generateTests({
toWrite: Buffer.from('$-1\r\n'),
returnStringsAsBuffers: false,
replies: [[null]]
});
});
describe('as strings', () => {
generateTests({
toWrite: Buffer.from('$2\r\naa\r\n'),
returnStringsAsBuffers: false,
replies: [['aa']]
});
});
describe('as buffers', () => {
generateTests({
toWrite: Buffer.from('$2\r\naa\r\n'),
returnStringsAsBuffers: true,
replies: [[Buffer.from('aa')]]
});
});
});
describe('Array', () => {
describe('null', () => {
generateTests({
toWrite: Buffer.from('*-1\r\n'),
returnStringsAsBuffers: false,
replies: [[null]]
});
});
const arrayBuffer = Buffer.from(
'*5\r\n' +
'+OK\r\n' +
'-ERR\r\n' +
':0\r\n' +
'$1\r\na\r\n' +
'*0\r\n'
);
describe('as strings', () => {
generateTests({
toWrite: arrayBuffer,
returnStringsAsBuffers: false,
replies: [[[
'OK',
new ErrorReply('ERR'),
0,
'a',
[]
]]]
});
});
describe('as buffers', () => {
generateTests({
toWrite: arrayBuffer,
returnStringsAsBuffers: true,
replies: [[[
Buffer.from('OK'),
new ErrorReply('ERR'),
0,
Buffer.from('a'),
[]
]]]
});
});
});
});

View File

@ -0,0 +1,254 @@
import { ErrorReply } from '../../errors';
import { Composer } from './composers/interface';
import BufferComposer from './composers/buffer';
import StringComposer from './composers/string';
// RESP2 specification
// https://redis.io/topics/protocol
enum Types {
SIMPLE_STRING = 43, // +
ERROR = 45, // -
INTEGER = 58, // :
BULK_STRING = 36, // $
ARRAY = 42 // *
}
enum ASCII {
CR = 13, // \r
ZERO = 48,
MINUS = 45
}
export type Reply = string | Buffer | ErrorReply | number | null | Array<Reply>;
type ArrayReply = Array<Reply> | null;
export type ReturnStringsAsBuffers = () => boolean;
interface RESP2Options {
returnStringsAsBuffers: ReturnStringsAsBuffers;
onReply(reply: Reply): unknown;
}
interface ArrayInProcess {
array: Array<Reply>;
pushCounter: number;
}
// Using TypeScript `private` and not the build-in `#` to avoid __classPrivateFieldGet and __classPrivateFieldSet
export default class RESP2Decoder {
constructor(private options: RESP2Options) {}
private cursor = 0;
private type?: Types;
private bufferComposer = new BufferComposer();
private stringComposer = new StringComposer();
private currentStringComposer: BufferComposer | StringComposer = this.stringComposer;
reset() {
this.cursor = 0;
this.type = undefined;
this.bufferComposer.reset();
this.stringComposer.reset();
this.currentStringComposer = this.stringComposer;
}
write(chunk: Buffer): void {
while (this.cursor < chunk.length) {
if (!this.type) {
this.currentStringComposer = this.options.returnStringsAsBuffers() ?
this.bufferComposer :
this.stringComposer;
this.type = chunk[this.cursor];
if (++this.cursor >= chunk.length) break;
}
const reply = this.parseType(chunk, this.type);
if (reply === undefined) break;
this.type = undefined;
this.options.onReply(reply);
}
this.cursor -= chunk.length;
}
private parseType(chunk: Buffer, type: Types, arraysToKeep?: number): Reply | undefined {
switch (type) {
case Types.SIMPLE_STRING:
return this.parseSimpleString(chunk);
case Types.ERROR:
return this.parseError(chunk);
case Types.INTEGER:
return this.parseInteger(chunk);
case Types.BULK_STRING:
return this.parseBulkString(chunk);
case Types.ARRAY:
return this.parseArray(chunk, arraysToKeep);
}
}
private compose<
C extends Composer<T>,
T = C extends Composer<infer TT> ? TT : never
>(
chunk: Buffer,
composer: C
): T | undefined {
for (let i = this.cursor; i < chunk.length; i++) {
if (chunk[i] === ASCII.CR) {
const reply = composer.end(
chunk.subarray(this.cursor, i)
);
this.cursor = i + 2;
return reply;
}
}
const toWrite = chunk.subarray(this.cursor);
composer.write(toWrite);
this.cursor = chunk.length;
}
private parseSimpleString(chunk: Buffer): string | Buffer | undefined {
return this.compose(chunk, this.currentStringComposer);
}
private parseError(chunk: Buffer): ErrorReply | undefined {
const message = this.compose(chunk, this.stringComposer);
if (message !== undefined) {
return new ErrorReply(message);
}
}
private integer = 0;
private isNegativeInteger?: boolean;
private parseInteger(chunk: Buffer): number | undefined {
if (this.isNegativeInteger === undefined) {
this.isNegativeInteger = chunk[this.cursor] === ASCII.MINUS;
if (this.isNegativeInteger && ++this.cursor === chunk.length) return;
}
do {
const byte = chunk[this.cursor];
if (byte === ASCII.CR) {
const integer = this.isNegativeInteger ? -this.integer : this.integer;
this.integer = 0;
this.isNegativeInteger = undefined;
this.cursor += 2;
return integer;
}
this.integer = this.integer * 10 + byte - ASCII.ZERO;
} while (++this.cursor < chunk.length);
}
private bulkStringRemainingLength?: number;
private parseBulkString(chunk: Buffer): string | Buffer | null | undefined {
if (this.bulkStringRemainingLength === undefined) {
const length = this.parseInteger(chunk);
if (length === undefined) return;
if (length === -1) return null;
this.bulkStringRemainingLength = length;
if (this.cursor >= chunk.length) return;
}
const end = this.cursor + this.bulkStringRemainingLength;
if (chunk.length >= end) {
const reply = this.currentStringComposer.end(
chunk.subarray(this.cursor, end)
);
this.bulkStringRemainingLength = undefined;
this.cursor = end + 2;
return reply;
}
const toWrite = chunk.subarray(this.cursor);
this.currentStringComposer.write(toWrite);
this.bulkStringRemainingLength -= toWrite.length;
this.cursor = chunk.length;
}
private arraysInProcess: Array<ArrayInProcess> = [];
private initializeArray = false;
private arrayItemType?: Types;
private parseArray(chunk: Buffer, arraysToKeep = 0): ArrayReply | undefined {
if (this.initializeArray || this.arraysInProcess.length === arraysToKeep) {
const length = this.parseInteger(chunk);
if (length === undefined) {
this.initializeArray = true;
return undefined;
}
this.initializeArray = false;
this.arrayItemType = undefined;
if (length === -1) {
return this.returnArrayReply(null, arraysToKeep);
} else if (length === 0) {
return this.returnArrayReply([], arraysToKeep);
}
this.arraysInProcess.push({
array: new Array(length),
pushCounter: 0
});
}
while (this.cursor < chunk.length) {
if (!this.arrayItemType) {
this.arrayItemType = chunk[this.cursor];
if (++this.cursor >= chunk.length) break;
}
const item = this.parseType(
chunk,
this.arrayItemType,
arraysToKeep + 1
);
if (item === undefined) break;
this.arrayItemType = undefined;
const reply = this.pushArrayItem(item, arraysToKeep);
if (reply !== undefined) return reply;
}
}
private returnArrayReply(reply: ArrayReply, arraysToKeep: number): ArrayReply | undefined {
if (this.arraysInProcess.length <= arraysToKeep) return reply;
return this.pushArrayItem(reply, arraysToKeep);
}
private pushArrayItem(item: Reply, arraysToKeep: number): ArrayReply | undefined {
const to = this.arraysInProcess[this.arraysInProcess.length - 1]!;
to.array[to.pushCounter] = item;
if (++to.pushCounter === to.array.length) {
return this.returnArrayReply(
this.arraysInProcess.pop()!.array,
arraysToKeep
);
}
}
}

View File

@ -0,0 +1,33 @@
import { strict as assert } from 'assert';
import { describe } from 'mocha';
import encodeCommand from './encoder';
describe('RESP2 Encoder', () => {
it('1 byte', () => {
assert.deepEqual(
encodeCommand(['a', 'z']),
['*2\r\n$1\r\na\r\n$1\r\nz\r\n']
);
});
it('2 bytes', () => {
assert.deepEqual(
encodeCommand(['א', 'ת']),
['*2\r\n$2\r\nא\r\n$2\r\nת\r\n']
);
});
it('4 bytes', () => {
assert.deepEqual(
[...encodeCommand(['🐣', '🐤'])],
['*2\r\n$4\r\n🐣\r\n$4\r\n🐤\r\n']
);
});
it('buffer', () => {
assert.deepEqual(
encodeCommand([Buffer.from('string')]),
['*1\r\n$6\r\n', Buffer.from('string'), '\r\n']
);
});
});

View File

@ -0,0 +1,30 @@
import { RedisCommandArgument, RedisCommandArguments } from '../../commands';
const CRLF = '\r\n';
export default function encodeCommand(args: RedisCommandArguments): Array<RedisCommandArgument> {
const toWrite: Array<RedisCommandArgument> = [];
let strings = `*${args.length}${CRLF}`;
for (let i = 0; i < args.length; i++) {
const arg = args[i];
if (typeof arg === 'string') {
const byteLength = Buffer.byteLength(arg);
strings += `$${byteLength}${CRLF}`;
strings += arg;
} else if (arg instanceof Buffer) {
toWrite.push(`${strings}$${arg.length}${CRLF}`);
strings = '';
toWrite.push(arg);
} else {
throw new TypeError('Invalid argument type');
}
strings += CRLF;
}
toWrite.push(strings);
return toWrite;
}

View File

@ -1,11 +1,8 @@
import * as LinkedList from 'yallist';
import { AbortError } from '../errors';
import { AbortError, ErrorReply } from '../errors';
import { RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply } from '../commands';
// We need to use 'require', because it's not possible with Typescript to import
// classes that are exported as 'module.exports = class`, without esModuleInterop
// set to true.
const RedisParser = require('redis-parser');
import RESP2Decoder from './RESP2/decoder';
import encodeCommand from './RESP2/encoder';
export interface QueueCommandOptions {
asap?: boolean;
@ -85,7 +82,6 @@ export default class RedisCommandsQueue {
readonly #maxLength: number | null | undefined;
readonly #waitingToBeSent = new LinkedList<CommandWaitingToBeSent>();
readonly #waitingForReply = new LinkedList<CommandWaitingForReply>();
readonly #pubSubState = {
@ -104,46 +100,33 @@ export default class RedisCommandsQueue {
pMessage: Buffer.from('pmessage'),
subscribe: Buffer.from('subscribe'),
pSubscribe: Buffer.from('psubscribe'),
unsubscribe: Buffer.from('unsunscribe'),
unsubscribe: Buffer.from('unsubscribe'),
pUnsubscribe: Buffer.from('punsubscribe')
};
readonly #parser = new RedisParser({
returnReply: (reply: unknown) => {
if (this.#pubSubState.isActive && Array.isArray(reply)) {
if (RedisCommandsQueue.#PUB_SUB_MESSAGES.message.equals(reply[0])) {
return RedisCommandsQueue.#emitPubSubMessage(
this.#pubSubState.listeners.channels,
reply[2],
reply[1]
);
} else if (RedisCommandsQueue.#PUB_SUB_MESSAGES.pMessage.equals(reply[0])) {
return RedisCommandsQueue.#emitPubSubMessage(
this.#pubSubState.listeners.patterns,
reply[3],
reply[2],
reply[1]
);
} else if (
RedisCommandsQueue.#PUB_SUB_MESSAGES.subscribe.equals(reply[0]) ||
RedisCommandsQueue.#PUB_SUB_MESSAGES.pSubscribe.equals(reply[0]) ||
RedisCommandsQueue.#PUB_SUB_MESSAGES.unsubscribe.equals(reply[0]) ||
RedisCommandsQueue.#PUB_SUB_MESSAGES.pUnsubscribe.equals(reply[0])
) {
if (--this.#waitingForReply.head!.value.channelsCounter! === 0) {
this.#shiftWaitingForReply().resolve();
}
return;
}
}
this.#shiftWaitingForReply().resolve(reply);
},
returnError: (err: Error) => this.#shiftWaitingForReply().reject(err)
});
#chainInExecution: symbol | undefined;
#decoder = new RESP2Decoder({
returnStringsAsBuffers: () => {
return !!this.#waitingForReply.head?.value.returnBuffers ||
this.#pubSubState.isActive;
},
onReply: reply => {
if (this.#handlePubSubReply(reply)) {
return;
} else if (!this.#waitingForReply.length) {
throw new Error('Got an unexpected reply from Redis');
}
const { resolve, reject } = this.#waitingForReply.shift()!;
if (reply instanceof ErrorReply) {
reject(reply);
} else {
resolve(reply);
}
}
});
constructor(maxLength: number | null | undefined) {
this.#maxLength = maxLength;
}
@ -257,9 +240,11 @@ export default class RedisCommandsQueue {
listeners.delete(channel);
}
}
if (!channelsToUnsubscribe.length) {
return Promise.resolve();
}
return this.#pushPubSubCommand(command, channelsToUnsubscribe);
}
@ -342,42 +327,67 @@ export default class RedisCommandsQueue {
getCommandToSend(): RedisCommandArguments | undefined {
const toSend = this.#waitingToBeSent.shift();
if (toSend) {
if (!toSend) return;
let encoded: RedisCommandArguments;
try {
encoded = encodeCommand(toSend.args);
} catch (err) {
toSend.reject(err);
return;
}
this.#waitingForReply.push({
resolve: toSend.resolve,
reject: toSend.reject,
channelsCounter: toSend.channelsCounter,
returnBuffers: toSend.returnBuffers
});
}
this.#chainInExecution = toSend?.chainId;
return toSend?.args;
this.#chainInExecution = toSend.chainId;
return encoded;
}
#setReturnBuffers() {
this.#parser.setReturnBuffers(
!!this.#waitingForReply.head?.value.returnBuffers ||
!!this.#pubSubState.isActive
rejectLastCommand(err: unknown): void {
this.#waitingForReply.pop()!.reject(err);
}
onReplyChunk(chunk: Buffer): void {
this.#decoder.write(chunk);
}
#handlePubSubReply(reply: any): boolean {
if (!this.#pubSubState.isActive || !Array.isArray(reply)) return false;
if (RedisCommandsQueue.#PUB_SUB_MESSAGES.message.equals(reply[0])) {
RedisCommandsQueue.#emitPubSubMessage(
this.#pubSubState.listeners.channels,
reply[2],
reply[1]
);
} else if (RedisCommandsQueue.#PUB_SUB_MESSAGES.pMessage.equals(reply[0])) {
RedisCommandsQueue.#emitPubSubMessage(
this.#pubSubState.listeners.patterns,
reply[3],
reply[2],
reply[1]
);
} else if (
RedisCommandsQueue.#PUB_SUB_MESSAGES.subscribe.equals(reply[0]) ||
RedisCommandsQueue.#PUB_SUB_MESSAGES.pSubscribe.equals(reply[0]) ||
RedisCommandsQueue.#PUB_SUB_MESSAGES.unsubscribe.equals(reply[0]) ||
RedisCommandsQueue.#PUB_SUB_MESSAGES.pUnsubscribe.equals(reply[0])
) {
if (--this.#waitingForReply.head!.value.channelsCounter! === 0) {
this.#waitingForReply.shift()!.resolve();
}
}
parseResponse(data: Buffer): void {
this.#setReturnBuffers();
this.#parser.execute(data);
}
#shiftWaitingForReply(): CommandWaitingForReply {
if (!this.#waitingForReply.length) {
throw new Error('Got an unexpected reply from Redis');
}
const waitingForReply = this.#waitingForReply.shift()!;
this.#setReturnBuffers();
return waitingForReply;
return true;
}
flushWaitingForReply(err: Error): void {
this.#parser.reset();
this.#decoder.reset();
this.#pubSubState.isActive = false;
RedisCommandsQueue.#flushQueue(this.#waitingForReply, err);
if (!this.#chainInExecution) return;

View File

@ -348,7 +348,7 @@ describe('Client', () => {
testUtils.testWithClient('undefined and null should not break the client', async client => {
await assert.rejects(
client.sendCommand([null as any, undefined as any]),
'ERR unknown command ``, with args beginning with: ``'
TypeError
);
assert.equal(

View File

@ -9,7 +9,7 @@ import { CommandOptions, commandOptions, isCommandOptions } from '../command-opt
import { ScanOptions, ZMember } from '../commands/generic-transformers';
import { ScanCommandOptions } from '../commands/SCAN';
import { HScanTuple } from '../commands/HSCAN';
import { extendWithCommands, extendWithModulesAndScripts, transformCommandArguments, transformCommandReply } from '../commander';
import { extendWithCommands, extendWithModulesAndScripts, transformCommandArguments, transformCommandReply, transformLegacyCommandArguments } from '../commander';
import { Pool, Options as PoolOptions, createPool } from 'generic-pool';
import { ClientClosedError, DisconnectsClientError } from '../errors';
import { URL } from 'url';
@ -158,8 +158,8 @@ export default class RedisClient<M extends RedisModules, S extends RedisScripts>
}
readonly #options?: RedisClientOptions<M, S>;
readonly #socket: RedisSocket;
readonly #queue: RedisCommandsQueue;
readonly #socket: RedisSocket;
readonly #isolationPool: Pool<RedisClientType<M, S>>;
readonly #v4: Record<string, any> = {};
#selectedDB = 0;
@ -183,8 +183,8 @@ export default class RedisClient<M extends RedisModules, S extends RedisScripts>
constructor(options?: RedisClientOptions<M, S>) {
super();
this.#options = this.#initiateOptions(options);
this.#socket = this.#initiateSocket();
this.#queue = this.#initiateQueue();
this.#socket = this.#initiateSocket();
this.#isolationPool = createPool({
create: async () => {
const duplicate = this.duplicate({
@ -215,6 +215,10 @@ export default class RedisClient<M extends RedisModules, S extends RedisScripts>
return options;
}
#initiateQueue(): RedisCommandsQueue {
return new RedisCommandsQueue(this.#options?.commandsQueueMaxLength);
}
#initiateSocket(): RedisSocket {
const socketInitiator = async (): Promise<void> => {
const promises = [];
@ -270,7 +274,7 @@ export default class RedisClient<M extends RedisModules, S extends RedisScripts>
};
return new RedisSocket(socketInitiator, this.#options?.socket)
.on('data', data => this.#queue.parseResponse(data))
.on('data', chunk => this.#queue.onReplyChunk(chunk))
.on('error', err => {
this.emit('error', err);
if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) {
@ -289,10 +293,6 @@ export default class RedisClient<M extends RedisModules, S extends RedisScripts>
.on('end', () => this.emit('end'));
}
#initiateQueue(): RedisCommandsQueue {
return new RedisCommandsQueue(this.#options?.commandsQueueMaxLength);
}
#legacyMode(): void {
if (!this.#options?.legacyMode) return;
@ -303,7 +303,7 @@ export default class RedisClient<M extends RedisModules, S extends RedisScripts>
callback = args.pop() as ClientLegacyCallback;
}
this.#sendCommand(args.flat())
this.#sendCommand(transformLegacyCommandArguments(args))
.then((reply: RedisCommandRawReply) => {
if (!callback) return;

View File

@ -1,7 +1,7 @@
import COMMANDS from './commands';
import { RedisCommand, RedisCommandArguments, RedisCommandRawReply, RedisModules, RedisPlugins, RedisScript, RedisScripts } from '../commands';
import RedisMultiCommand, { RedisMultiQueuedCommand } from '../multi-command';
import { extendWithCommands, extendWithModulesAndScripts } from '../commander';
import { extendWithCommands, extendWithModulesAndScripts, transformLegacyCommandArguments } from '../commander';
import { ExcludeMappedString } from '.';
type RedisClientMultiCommandSignature<C extends RedisCommand, M extends RedisModules, S extends RedisScripts> =
@ -54,7 +54,7 @@ export default class RedisClientMultiCommand {
#legacyMode(): void {
this.v4.addCommand = this.addCommand.bind(this);
(this as any).addCommand = (...args: Array<any>): this => {
this.#multi.addCommand(args.flat());
this.#multi.addCommand(transformLegacyCommandArguments(args));
return this;
};
this.v4.exec = this.exec.bind(this);

View File

@ -1,7 +1,6 @@
import { EventEmitter } from 'events';
import * as net from 'net';
import * as tls from 'tls';
import { encodeCommand } from '../commander';
import { RedisCommandArguments } from '../commands';
import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, ReconnectStrategyError } from '../errors';
import { promiseTimeout } from '../utils';
@ -157,7 +156,7 @@ export default class RedisSocket extends EventEmitter {
this.#writableNeedDrain = false;
this.emit('drain');
})
.on('data', (data: Buffer) => this.emit('data', data));
.on('data', data => this.emit('data', data));
resolve(socket);
});
@ -192,7 +191,7 @@ export default class RedisSocket extends EventEmitter {
throw new ClientClosedError();
}
for (const toWrite of encodeCommand(args)) {
for (const toWrite of args) {
this.#writableNeedDrain = !this.#socket.write(toWrite);
}
}

View File

@ -1,36 +0,0 @@
import { strict as assert } from 'assert';
import { describe } from 'mocha';
import { encodeCommand } from './commander';
describe('Commander', () => {
describe('encodeCommand (see #1628)', () => {
it('1 byte', () => {
assert.deepEqual(
[...encodeCommand(['a', 'z'])],
['*2\r\n$1\r\na\r\n$1\r\nz\r\n']
);
});
it('2 bytes', () => {
assert.deepEqual(
[...encodeCommand(['א', 'ת'])],
['*2\r\n$2\r\nא\r\n$2\r\nת\r\n']
);
});
it('4 bytes', () => {
assert.deepEqual(
[...encodeCommand(['🐣', '🐤'])],
['*2\r\n$4\r\n🐣\r\n$4\r\n🐤\r\n']
);
});
it('with a buffer', () => {
assert.deepEqual(
[...encodeCommand([Buffer.from('string')])],
['*1\r\n$6\r\n', Buffer.from('string'), '\r\n']
);
});
});
});

View File

@ -1,6 +1,6 @@
import { CommandOptions, isCommandOptions } from './command-options';
import { RedisCommand, RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply, RedisCommandReply, RedisCommands, RedisModules, RedisScript, RedisScripts } from './commands';
import { RedisCommand, RedisCommandArguments, RedisCommandRawReply, RedisCommandReply, RedisCommands, RedisModules, RedisScript, RedisScripts } from './commands';
type Instantiable<T = any> = new(...args: Array<any>) => T;
@ -89,37 +89,8 @@ export function transformCommandArguments<T>(
};
}
const DELIMITER = '\r\n';
export function* encodeCommand(args: RedisCommandArguments): IterableIterator<RedisCommandArgument> {
let strings = `*${args.length}${DELIMITER}`,
stringsLength = 0;
for (const arg of args) {
if (Buffer.isBuffer(arg)) {
yield `${strings}$${arg.length}${DELIMITER}`;
strings = '';
stringsLength = 0;
yield arg;
} else {
const string = arg?.toString?.() ?? '',
byteLength = Buffer.byteLength(string);
strings += `$${byteLength}${DELIMITER}`;
const totalLength = stringsLength + byteLength;
if (totalLength > 1024) {
yield strings;
strings = string;
stringsLength = byteLength;
} else {
strings += string;
stringsLength = totalLength;
}
}
strings += DELIMITER;
}
yield strings;
export function transformLegacyCommandArguments(args: Array<any>): Array<any> {
return args.flat().map(x => x?.toString?.());
}
export function transformCommandReply(

View File

@ -50,3 +50,10 @@ export class ReconnectStrategyError extends Error {
this.socketError = socketError;
}
}
export class ErrorReply extends Error {
constructor(message: string) {
super(message);
this.stack = undefined;
}
}

View File

@ -44,6 +44,6 @@ export async function waitTillBeenCalled(spy: SinonSpy): Promise<void> {
throw new Error('Waiting for more than 1 second');
}
await promiseTimeout(1);
await promiseTimeout(50);
} while (spy.callCount === calls);
}

View File

@ -16,14 +16,12 @@
"dependencies": {
"cluster-key-slot": "1.1.0",
"generic-pool": "3.8.2",
"redis-parser": "3.0.0",
"yallist": "4.0.0"
},
"devDependencies": {
"@istanbuljs/nyc-config-typescript": "^1.0.2",
"@node-redis/test-utils": "*",
"@types/node": "^17.0.23",
"@types/redis-parser": "^3.0.0",
"@types/sinon": "^10.0.11",
"@types/yallist": "^4.0.1",
"@typescript-eslint/eslint-plugin": "^5.19.0",

View File

@ -11,6 +11,9 @@
"./lib/test-utils.ts",
"./lib/**/*.spec.ts"
],
"ts-node": {
"transpileOnly": true
},
"typedocOptions": {
"entryPoints": [
"./index.ts",