From 77022209bde327d6a2d004dd07153bb6140d0ae2 Mon Sep 17 00:00:00 2001 From: leibale Date: Wed, 29 Dec 2021 17:09:59 -0500 Subject: [PATCH] fix `returnBuffers`, add some tests --- .../lib/commands/bloom/LOADCHUNK.spec.ts | 17 +++++ .../bloom/lib/commands/bloom/SCANDUMP.spec.ts | 11 ++++ .../lib/commands/cuckoo/LOADCHUNK.spec.ts | 20 ++++++ .../bloom/lib/commands/cuckoo/LOADCHUNK.ts | 8 ++- .../lib/commands/cuckoo/SCANDUMP.spec.ts | 12 ++++ .../bloom/lib/commands/cuckoo/SCANDUMP.ts | 4 +- packages/client/lib/client/commands-queue.ts | 13 +++- packages/client/lib/client/index.ts | 2 + .../client/lib/commands/CLIENT_KILL.spec.ts | 13 ++++ packages/client/lib/commands/XPENDING.spec.ts | 62 ++++++++++++++----- packages/client/lib/commands/XPENDING.ts | 12 ++-- 11 files changed, 149 insertions(+), 25 deletions(-) diff --git a/packages/bloom/lib/commands/bloom/LOADCHUNK.spec.ts b/packages/bloom/lib/commands/bloom/LOADCHUNK.spec.ts index ea9f0acfa0..19634cb4a7 100644 --- a/packages/bloom/lib/commands/bloom/LOADCHUNK.spec.ts +++ b/packages/bloom/lib/commands/bloom/LOADCHUNK.spec.ts @@ -1,4 +1,5 @@ import { strict as assert } from 'assert'; +import testUtils, { GLOBAL } from '../../test-utils'; import { transformArguments } from './LOADCHUNK'; describe('BF LOADCHUNK', () => { @@ -8,4 +9,20 @@ describe('BF LOADCHUNK', () => { ['BF.LOADCHUNK', 'key', '0', ''] ); }); + + testUtils.testWithClient('client.bf.loadChunk', async client => { + const [, { iterator, chunk }] = await Promise.all([ + client.bf.reserve('source', 0.01, 100), + client.bf.scanDump( + client.commandOptions({ returnBuffers: true }), + 'source', + 0 + ) + ]); + + assert.equal( + await client.bf.loadChunk('destination', iterator, chunk), + 'OK' + ); + }, GLOBAL.SERVERS.OPEN); }); diff --git a/packages/bloom/lib/commands/bloom/SCANDUMP.spec.ts b/packages/bloom/lib/commands/bloom/SCANDUMP.spec.ts index 8344016759..5011959048 100644 --- a/packages/bloom/lib/commands/bloom/SCANDUMP.spec.ts +++ b/packages/bloom/lib/commands/bloom/SCANDUMP.spec.ts @@ -1,4 +1,5 @@ import { strict as assert } from 'assert'; +import testUtils, { GLOBAL } from '../../test-utils'; import { transformArguments } from './SCANDUMP'; describe('BF SCANDUMP', () => { @@ -8,4 +9,14 @@ describe('BF SCANDUMP', () => { ['BF.SCANDUMP', 'key', '0'] ); }); + + testUtils.testWithClient('client.bf.scanDump', async client => { + const [, dump] = await Promise.all([ + client.bf.reserve('key', 0.01, 100), + client.bf.scanDump('key', 0) + ]); + assert.equal(typeof dump, 'object'); + assert.equal(typeof dump.iterator, 'number'); + assert.equal(typeof dump.chunk, 'string'); + }, GLOBAL.SERVERS.OPEN); }); diff --git a/packages/bloom/lib/commands/cuckoo/LOADCHUNK.spec.ts b/packages/bloom/lib/commands/cuckoo/LOADCHUNK.spec.ts index 68dff468d4..ca3d6f2f8f 100644 --- a/packages/bloom/lib/commands/cuckoo/LOADCHUNK.spec.ts +++ b/packages/bloom/lib/commands/cuckoo/LOADCHUNK.spec.ts @@ -1,4 +1,5 @@ import { strict as assert } from 'assert'; +import testUtils, { GLOBAL } from '../../test-utils'; import { transformArguments } from './LOADCHUNK'; describe('CF LOADCHUNK', () => { @@ -8,4 +9,23 @@ describe('CF LOADCHUNK', () => { ['CF.LOADCHUNK', 'item', '0', ''] ); }); + + testUtils.testWithClient('client.cf.loadChunk', async client => { + const [,, { iterator, chunk }] = await Promise.all([ + client.cf.reserve('source', 4), + client.cf.add('source', 'item'), + client.cf.scanDump( + client.commandOptions({ returnBuffers: true }), + 'source', + 0 + ) + ]); + + assert.ok(Buffer.isBuffer(chunk)); + + assert.equal( + await client.cf.loadChunk('destination', iterator, chunk), + 'OK' + ); + }, GLOBAL.SERVERS.OPEN); }); diff --git a/packages/bloom/lib/commands/cuckoo/LOADCHUNK.ts b/packages/bloom/lib/commands/cuckoo/LOADCHUNK.ts index 5d22109978..5b739f67cc 100644 --- a/packages/bloom/lib/commands/cuckoo/LOADCHUNK.ts +++ b/packages/bloom/lib/commands/cuckoo/LOADCHUNK.ts @@ -1,6 +1,12 @@ +import { RedisCommandArgument, RedisCommandArguments } from '@node-redis/client/dist/lib/commands'; + export const FIRST_KEY_INDEX = 1; -export function transformArguments(key: string, iterator: number, chunk: string): Array { +export function transformArguments( + key: string, + iterator: number, + chunk: RedisCommandArgument +): RedisCommandArguments { return ['CF.LOADCHUNK', key, iterator.toString(), chunk]; } diff --git a/packages/bloom/lib/commands/cuckoo/SCANDUMP.spec.ts b/packages/bloom/lib/commands/cuckoo/SCANDUMP.spec.ts index 43b812999f..ec269c62aa 100644 --- a/packages/bloom/lib/commands/cuckoo/SCANDUMP.spec.ts +++ b/packages/bloom/lib/commands/cuckoo/SCANDUMP.spec.ts @@ -1,4 +1,5 @@ import { strict as assert } from 'assert'; +import testUtils, { GLOBAL } from '../../test-utils'; import { transformArguments } from './SCANDUMP'; describe('CF SCANDUMP', () => { @@ -8,4 +9,15 @@ describe('CF SCANDUMP', () => { ['CF.SCANDUMP', 'key', '0'] ); }); + + testUtils.testWithClient('client.cf.scanDump', async client => { + await client.cf.reserve('key', 4); + assert.deepEqual( + await client.cf.scanDump('key', 0), + { + iterator: 0, + chunk: null + } + ); + }, GLOBAL.SERVERS.OPEN); }); diff --git a/packages/bloom/lib/commands/cuckoo/SCANDUMP.ts b/packages/bloom/lib/commands/cuckoo/SCANDUMP.ts index dcabadb710..91476b49a7 100644 --- a/packages/bloom/lib/commands/cuckoo/SCANDUMP.ts +++ b/packages/bloom/lib/commands/cuckoo/SCANDUMP.ts @@ -6,12 +6,12 @@ export function transformArguments(key: string, iterator: number): Array type ScanDumpRawReply = [ iterator: number, - chunk: string + chunk: string | null ]; interface ScanDumpReply { iterator: number; - chunk: string; + chunk: string | null; } export function transformReply([iterator, chunk]: ScanDumpRawReply): ScanDumpReply { diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 3505ef8fa7..13e37c4ccc 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -360,11 +360,15 @@ export default class RedisCommandsQueue { return toSend?.args; } - parseResponse(data: Buffer): void { + #setReturnBuffers() { this.#parser.setReturnBuffers( !!this.#waitingForReply.head?.value.returnBuffers || !!this.#pubSubState?.subscribed ); + } + + parseResponse(data: Buffer): void { + this.#setReturnBuffers(); this.#parser.execute(data); } @@ -372,8 +376,12 @@ export default class RedisCommandsQueue { if (!this.#waitingForReply.length) { throw new Error('Got an unexpected reply from Redis'); } - return this.#waitingForReply.shift()!; + + const waitingForReply = this.#waitingForReply.shift()!; + this.#setReturnBuffers(); + return waitingForReply; } + flushWaitingForReply(err: Error): void { RedisCommandsQueue.#flushQueue(this.#waitingForReply, err); if (!this.#chainInExecution) { @@ -384,6 +392,7 @@ export default class RedisCommandsQueue { } this.#chainInExecution = undefined; } + flushAll(err: Error): void { RedisCommandsQueue.#flushQueue(this.#waitingForReply, err); RedisCommandsQueue.#flushQueue(this.#waitingToBeSent, err); diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 2b801a5408..fe3ed36b75 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -89,6 +89,8 @@ export default class RedisClient return commandOptions(options); } + commandOptions = RedisClient.commandOptions; + static extend(plugins?: RedisPlugins): InstantiableRedisClient { const Client = extendWithModulesAndScripts({ BaseClass: RedisClient, diff --git a/packages/client/lib/commands/CLIENT_KILL.spec.ts b/packages/client/lib/commands/CLIENT_KILL.spec.ts index 0c38a0fb16..2fe894f361 100644 --- a/packages/client/lib/commands/CLIENT_KILL.spec.ts +++ b/packages/client/lib/commands/CLIENT_KILL.spec.ts @@ -93,5 +93,18 @@ describe('CLIENT KILL', () => { ); }); }); + + it('TYPE & SKIP_ME', () => { + assert.deepEqual( + transformArguments([ + { + filter: ClientKillFilters.TYPE, + type: 'master' + }, + ClientKillFilters.SKIP_ME + ]), + ['CLIENT', 'KILL', 'TYPE', 'master', 'SKIPME'] + ); + }); }); }); diff --git a/packages/client/lib/commands/XPENDING.spec.ts b/packages/client/lib/commands/XPENDING.spec.ts index 7eb12b40ef..af5c239e6c 100644 --- a/packages/client/lib/commands/XPENDING.spec.ts +++ b/packages/client/lib/commands/XPENDING.spec.ts @@ -12,19 +12,53 @@ describe('XPENDING', () => { }); }); - testUtils.testWithClient('client.xPending', async client => { - await client.xGroupCreate('key', 'group', '$', { - MKSTREAM: true - }); + describe('client.xPending', () => { + testUtils.testWithClient('simple', async client => { + await Promise.all([ + client.xGroupCreate('key', 'group', '$', { + MKSTREAM: true + }), + client.xGroupCreateConsumer('key', 'group', 'consumer') + ]); + + assert.deepEqual( + await client.xPending('key', 'group'), + { + pending: 0, + firstId: null, + lastId: null, + consumers: null + } + ); + }, GLOBAL.SERVERS.OPEN); + + testUtils.testWithClient('with consumers', async client => { + const [,, id] = await Promise.all([ + client.xGroupCreate('key', 'group', '$', { + MKSTREAM: true + }), + client.xGroupCreateConsumer('key', 'group', 'consumer'), + client.xAdd('key', '*', { field: 'value' }), + client.xReadGroup('group', 'consumer', { + key: 'key', + id: '>' + }) + ]); + + assert.deepEqual( + await client.xPending('key', 'group'), + { + pending: 1, + firstId: id, + lastId: id, + consumers: [{ + name: 'consumer', + deliveriesCounter: 1 + }] + } + ); + }, GLOBAL.SERVERS.OPEN); + }); + - assert.deepEqual( - await client.xPending('key', 'group'), - { - pending: 0, - firstId: null, - lastId: null, - consumers: null - } - ); - }, GLOBAL.SERVERS.OPEN); }); diff --git a/packages/client/lib/commands/XPENDING.ts b/packages/client/lib/commands/XPENDING.ts index a6052adb0f..ac56e42941 100644 --- a/packages/client/lib/commands/XPENDING.ts +++ b/packages/client/lib/commands/XPENDING.ts @@ -17,17 +17,17 @@ type XPendingRawReply = [ lastId: RedisCommandArgument | null, consumers: Array<[ name: RedisCommandArgument, - deliveriesCounter: number + deliveriesCounter: RedisCommandArgument ]> | null -] +]; interface XPendingReply { pending: number; firstId: RedisCommandArgument | null; - lastId: RedisCommandArgument | null + lastId: RedisCommandArgument | null; consumers: Array<{ - name: RedisCommandArgument, - deliveriesCounter: number + name: RedisCommandArgument; + deliveriesCounter: number; }> | null; } @@ -38,7 +38,7 @@ export function transformReply(reply: XPendingRawReply): XPendingReply { lastId: reply[2], consumers: reply[3] === null ? null : reply[3].map(([name, deliveriesCounter]) => ({ name, - deliveriesCounter + deliveriesCounter: Number(deliveriesCounter) })) }; }