You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-07 13:22:56 +03:00
fix returnBuffers
, add some tests
This commit is contained in:
@@ -1,4 +1,5 @@
|
|||||||
import { strict as assert } from 'assert';
|
import { strict as assert } from 'assert';
|
||||||
|
import testUtils, { GLOBAL } from '../../test-utils';
|
||||||
import { transformArguments } from './LOADCHUNK';
|
import { transformArguments } from './LOADCHUNK';
|
||||||
|
|
||||||
describe('BF LOADCHUNK', () => {
|
describe('BF LOADCHUNK', () => {
|
||||||
@@ -8,4 +9,20 @@ describe('BF LOADCHUNK', () => {
|
|||||||
['BF.LOADCHUNK', 'key', '0', '']
|
['BF.LOADCHUNK', 'key', '0', '']
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
testUtils.testWithClient('client.bf.loadChunk', async client => {
|
||||||
|
const [, { iterator, chunk }] = await Promise.all([
|
||||||
|
client.bf.reserve('source', 0.01, 100),
|
||||||
|
client.bf.scanDump(
|
||||||
|
client.commandOptions({ returnBuffers: true }),
|
||||||
|
'source',
|
||||||
|
0
|
||||||
|
)
|
||||||
|
]);
|
||||||
|
|
||||||
|
assert.equal(
|
||||||
|
await client.bf.loadChunk('destination', iterator, chunk),
|
||||||
|
'OK'
|
||||||
|
);
|
||||||
|
}, GLOBAL.SERVERS.OPEN);
|
||||||
});
|
});
|
||||||
|
@@ -1,4 +1,5 @@
|
|||||||
import { strict as assert } from 'assert';
|
import { strict as assert } from 'assert';
|
||||||
|
import testUtils, { GLOBAL } from '../../test-utils';
|
||||||
import { transformArguments } from './SCANDUMP';
|
import { transformArguments } from './SCANDUMP';
|
||||||
|
|
||||||
describe('BF SCANDUMP', () => {
|
describe('BF SCANDUMP', () => {
|
||||||
@@ -8,4 +9,14 @@ describe('BF SCANDUMP', () => {
|
|||||||
['BF.SCANDUMP', 'key', '0']
|
['BF.SCANDUMP', 'key', '0']
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
testUtils.testWithClient('client.bf.scanDump', async client => {
|
||||||
|
const [, dump] = await Promise.all([
|
||||||
|
client.bf.reserve('key', 0.01, 100),
|
||||||
|
client.bf.scanDump('key', 0)
|
||||||
|
]);
|
||||||
|
assert.equal(typeof dump, 'object');
|
||||||
|
assert.equal(typeof dump.iterator, 'number');
|
||||||
|
assert.equal(typeof dump.chunk, 'string');
|
||||||
|
}, GLOBAL.SERVERS.OPEN);
|
||||||
});
|
});
|
||||||
|
@@ -1,4 +1,5 @@
|
|||||||
import { strict as assert } from 'assert';
|
import { strict as assert } from 'assert';
|
||||||
|
import testUtils, { GLOBAL } from '../../test-utils';
|
||||||
import { transformArguments } from './LOADCHUNK';
|
import { transformArguments } from './LOADCHUNK';
|
||||||
|
|
||||||
describe('CF LOADCHUNK', () => {
|
describe('CF LOADCHUNK', () => {
|
||||||
@@ -8,4 +9,23 @@ describe('CF LOADCHUNK', () => {
|
|||||||
['CF.LOADCHUNK', 'item', '0', '']
|
['CF.LOADCHUNK', 'item', '0', '']
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
testUtils.testWithClient('client.cf.loadChunk', async client => {
|
||||||
|
const [,, { iterator, chunk }] = await Promise.all([
|
||||||
|
client.cf.reserve('source', 4),
|
||||||
|
client.cf.add('source', 'item'),
|
||||||
|
client.cf.scanDump(
|
||||||
|
client.commandOptions({ returnBuffers: true }),
|
||||||
|
'source',
|
||||||
|
0
|
||||||
|
)
|
||||||
|
]);
|
||||||
|
|
||||||
|
assert.ok(Buffer.isBuffer(chunk));
|
||||||
|
|
||||||
|
assert.equal(
|
||||||
|
await client.cf.loadChunk('destination', iterator, chunk),
|
||||||
|
'OK'
|
||||||
|
);
|
||||||
|
}, GLOBAL.SERVERS.OPEN);
|
||||||
});
|
});
|
||||||
|
@@ -1,6 +1,12 @@
|
|||||||
|
import { RedisCommandArgument, RedisCommandArguments } from '@node-redis/client/dist/lib/commands';
|
||||||
|
|
||||||
export const FIRST_KEY_INDEX = 1;
|
export const FIRST_KEY_INDEX = 1;
|
||||||
|
|
||||||
export function transformArguments(key: string, iterator: number, chunk: string): Array<string> {
|
export function transformArguments(
|
||||||
|
key: string,
|
||||||
|
iterator: number,
|
||||||
|
chunk: RedisCommandArgument
|
||||||
|
): RedisCommandArguments {
|
||||||
return ['CF.LOADCHUNK', key, iterator.toString(), chunk];
|
return ['CF.LOADCHUNK', key, iterator.toString(), chunk];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -1,4 +1,5 @@
|
|||||||
import { strict as assert } from 'assert';
|
import { strict as assert } from 'assert';
|
||||||
|
import testUtils, { GLOBAL } from '../../test-utils';
|
||||||
import { transformArguments } from './SCANDUMP';
|
import { transformArguments } from './SCANDUMP';
|
||||||
|
|
||||||
describe('CF SCANDUMP', () => {
|
describe('CF SCANDUMP', () => {
|
||||||
@@ -8,4 +9,15 @@ describe('CF SCANDUMP', () => {
|
|||||||
['CF.SCANDUMP', 'key', '0']
|
['CF.SCANDUMP', 'key', '0']
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
testUtils.testWithClient('client.cf.scanDump', async client => {
|
||||||
|
await client.cf.reserve('key', 4);
|
||||||
|
assert.deepEqual(
|
||||||
|
await client.cf.scanDump('key', 0),
|
||||||
|
{
|
||||||
|
iterator: 0,
|
||||||
|
chunk: null
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}, GLOBAL.SERVERS.OPEN);
|
||||||
});
|
});
|
||||||
|
@@ -6,12 +6,12 @@ export function transformArguments(key: string, iterator: number): Array<string>
|
|||||||
|
|
||||||
type ScanDumpRawReply = [
|
type ScanDumpRawReply = [
|
||||||
iterator: number,
|
iterator: number,
|
||||||
chunk: string
|
chunk: string | null
|
||||||
];
|
];
|
||||||
|
|
||||||
interface ScanDumpReply {
|
interface ScanDumpReply {
|
||||||
iterator: number;
|
iterator: number;
|
||||||
chunk: string;
|
chunk: string | null;
|
||||||
}
|
}
|
||||||
|
|
||||||
export function transformReply([iterator, chunk]: ScanDumpRawReply): ScanDumpReply {
|
export function transformReply([iterator, chunk]: ScanDumpRawReply): ScanDumpReply {
|
||||||
|
@@ -360,11 +360,15 @@ export default class RedisCommandsQueue {
|
|||||||
return toSend?.args;
|
return toSend?.args;
|
||||||
}
|
}
|
||||||
|
|
||||||
parseResponse(data: Buffer): void {
|
#setReturnBuffers() {
|
||||||
this.#parser.setReturnBuffers(
|
this.#parser.setReturnBuffers(
|
||||||
!!this.#waitingForReply.head?.value.returnBuffers ||
|
!!this.#waitingForReply.head?.value.returnBuffers ||
|
||||||
!!this.#pubSubState?.subscribed
|
!!this.#pubSubState?.subscribed
|
||||||
);
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
parseResponse(data: Buffer): void {
|
||||||
|
this.#setReturnBuffers();
|
||||||
this.#parser.execute(data);
|
this.#parser.execute(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -372,8 +376,12 @@ export default class RedisCommandsQueue {
|
|||||||
if (!this.#waitingForReply.length) {
|
if (!this.#waitingForReply.length) {
|
||||||
throw new Error('Got an unexpected reply from Redis');
|
throw new Error('Got an unexpected reply from Redis');
|
||||||
}
|
}
|
||||||
return this.#waitingForReply.shift()!;
|
|
||||||
|
const waitingForReply = this.#waitingForReply.shift()!;
|
||||||
|
this.#setReturnBuffers();
|
||||||
|
return waitingForReply;
|
||||||
}
|
}
|
||||||
|
|
||||||
flushWaitingForReply(err: Error): void {
|
flushWaitingForReply(err: Error): void {
|
||||||
RedisCommandsQueue.#flushQueue(this.#waitingForReply, err);
|
RedisCommandsQueue.#flushQueue(this.#waitingForReply, err);
|
||||||
if (!this.#chainInExecution) {
|
if (!this.#chainInExecution) {
|
||||||
@@ -384,6 +392,7 @@ export default class RedisCommandsQueue {
|
|||||||
}
|
}
|
||||||
this.#chainInExecution = undefined;
|
this.#chainInExecution = undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
flushAll(err: Error): void {
|
flushAll(err: Error): void {
|
||||||
RedisCommandsQueue.#flushQueue(this.#waitingForReply, err);
|
RedisCommandsQueue.#flushQueue(this.#waitingForReply, err);
|
||||||
RedisCommandsQueue.#flushQueue(this.#waitingToBeSent, err);
|
RedisCommandsQueue.#flushQueue(this.#waitingToBeSent, err);
|
||||||
|
@@ -89,6 +89,8 @@ export default class RedisClient<M extends RedisModules, S extends RedisScripts>
|
|||||||
return commandOptions(options);
|
return commandOptions(options);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
commandOptions = RedisClient.commandOptions;
|
||||||
|
|
||||||
static extend<M extends RedisModules, S extends RedisScripts>(plugins?: RedisPlugins<M, S>): InstantiableRedisClient<M, S> {
|
static extend<M extends RedisModules, S extends RedisScripts>(plugins?: RedisPlugins<M, S>): InstantiableRedisClient<M, S> {
|
||||||
const Client = <any>extendWithModulesAndScripts({
|
const Client = <any>extendWithModulesAndScripts({
|
||||||
BaseClass: RedisClient,
|
BaseClass: RedisClient,
|
||||||
|
@@ -93,5 +93,18 @@ describe('CLIENT KILL', () => {
|
|||||||
);
|
);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('TYPE & SKIP_ME', () => {
|
||||||
|
assert.deepEqual(
|
||||||
|
transformArguments([
|
||||||
|
{
|
||||||
|
filter: ClientKillFilters.TYPE,
|
||||||
|
type: 'master'
|
||||||
|
},
|
||||||
|
ClientKillFilters.SKIP_ME
|
||||||
|
]),
|
||||||
|
['CLIENT', 'KILL', 'TYPE', 'master', 'SKIPME']
|
||||||
|
);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
@@ -12,10 +12,14 @@ describe('XPENDING', () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
testUtils.testWithClient('client.xPending', async client => {
|
describe('client.xPending', () => {
|
||||||
await client.xGroupCreate('key', 'group', '$', {
|
testUtils.testWithClient('simple', async client => {
|
||||||
|
await Promise.all([
|
||||||
|
client.xGroupCreate('key', 'group', '$', {
|
||||||
MKSTREAM: true
|
MKSTREAM: true
|
||||||
});
|
}),
|
||||||
|
client.xGroupCreateConsumer('key', 'group', 'consumer')
|
||||||
|
]);
|
||||||
|
|
||||||
assert.deepEqual(
|
assert.deepEqual(
|
||||||
await client.xPending('key', 'group'),
|
await client.xPending('key', 'group'),
|
||||||
@@ -27,4 +31,34 @@ describe('XPENDING', () => {
|
|||||||
}
|
}
|
||||||
);
|
);
|
||||||
}, GLOBAL.SERVERS.OPEN);
|
}, GLOBAL.SERVERS.OPEN);
|
||||||
|
|
||||||
|
testUtils.testWithClient('with consumers', async client => {
|
||||||
|
const [,, id] = await Promise.all([
|
||||||
|
client.xGroupCreate('key', 'group', '$', {
|
||||||
|
MKSTREAM: true
|
||||||
|
}),
|
||||||
|
client.xGroupCreateConsumer('key', 'group', 'consumer'),
|
||||||
|
client.xAdd('key', '*', { field: 'value' }),
|
||||||
|
client.xReadGroup('group', 'consumer', {
|
||||||
|
key: 'key',
|
||||||
|
id: '>'
|
||||||
|
})
|
||||||
|
]);
|
||||||
|
|
||||||
|
assert.deepEqual(
|
||||||
|
await client.xPending('key', 'group'),
|
||||||
|
{
|
||||||
|
pending: 1,
|
||||||
|
firstId: id,
|
||||||
|
lastId: id,
|
||||||
|
consumers: [{
|
||||||
|
name: 'consumer',
|
||||||
|
deliveriesCounter: 1
|
||||||
|
}]
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}, GLOBAL.SERVERS.OPEN);
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
});
|
});
|
||||||
|
@@ -17,17 +17,17 @@ type XPendingRawReply = [
|
|||||||
lastId: RedisCommandArgument | null,
|
lastId: RedisCommandArgument | null,
|
||||||
consumers: Array<[
|
consumers: Array<[
|
||||||
name: RedisCommandArgument,
|
name: RedisCommandArgument,
|
||||||
deliveriesCounter: number
|
deliveriesCounter: RedisCommandArgument
|
||||||
]> | null
|
]> | null
|
||||||
]
|
];
|
||||||
|
|
||||||
interface XPendingReply {
|
interface XPendingReply {
|
||||||
pending: number;
|
pending: number;
|
||||||
firstId: RedisCommandArgument | null;
|
firstId: RedisCommandArgument | null;
|
||||||
lastId: RedisCommandArgument | null
|
lastId: RedisCommandArgument | null;
|
||||||
consumers: Array<{
|
consumers: Array<{
|
||||||
name: RedisCommandArgument,
|
name: RedisCommandArgument;
|
||||||
deliveriesCounter: number
|
deliveriesCounter: number;
|
||||||
}> | null;
|
}> | null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -38,7 +38,7 @@ export function transformReply(reply: XPendingRawReply): XPendingReply {
|
|||||||
lastId: reply[2],
|
lastId: reply[2],
|
||||||
consumers: reply[3] === null ? null : reply[3].map(([name, deliveriesCounter]) => ({
|
consumers: reply[3] === null ? null : reply[3].map(([name, deliveriesCounter]) => ({
|
||||||
name,
|
name,
|
||||||
deliveriesCounter
|
deliveriesCounter: Number(deliveriesCounter)
|
||||||
}))
|
}))
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user