1
0
mirror of https://github.com/redis/node-redis.git synced 2025-12-25 00:40:59 +03:00

add support for most stream commands

This commit is contained in:
leibale
2021-06-17 18:22:46 -04:00
parent 50b35de439
commit 53de279afe
43 changed files with 1401 additions and 0 deletions

28
lib/commands/XACK.spec.ts Normal file
View File

@@ -0,0 +1,28 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient } from '../test-utils';
import { transformArguments } from './XACK';
describe('XACK', () => {
describe('transformArguments', () => {
it('string', () => {
assert.deepEqual(
transformArguments('key', 'group', '1-0'),
['XACK', 'key', 'group', '1-0']
);
});
it('array', () => {
assert.deepEqual(
transformArguments('key', 'group', ['1-0', '2-0']),
['XACK', 'key', 'group', '1-0', '2-0']
);
});
});
itWithClient(TestRedisServers.OPEN, 'client.xAck', async client => {
assert.equal(
await client.xAck('key', 'group', '1-0'),
0
);
});
});

17
lib/commands/XACK.ts Normal file
View File

@@ -0,0 +1,17 @@
import { transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string, group: string, id: string | Array<string>): Array<string> {
const args = ['XACK', key, group];
if (typeof id === 'string') {
args.push(id);
} else {
args.push(...id);
}
return args;
}
export const transformReply = transformReplyNumber;

118
lib/commands/XADD.spec.ts Normal file
View File

@@ -0,0 +1,118 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient } from '../test-utils';
import { transformArguments } from './XADD';
describe('XADD', () => {
describe('transformArguments', () => {
it('single field', () => {
assert.deepEqual(
transformArguments('key', '*', {
field: 'value'
}),
['XADD', 'key', '*', 'field', 'value']
);
});
it('multiple fields', () => {
assert.deepEqual(
transformArguments('key', '*', {
'1': 'I',
'2': 'II'
}),
['XADD', 'key', '*', '1', 'I', '2', 'II']
);
});
it('with NOMKSTREAM', () => {
assert.deepEqual(
transformArguments('key', '*', {
field: 'value'
}, {
NOMKSTREAM: true
}),
['XADD', 'key', 'NOMKSTREAM', '*', 'field', 'value']
);
});
it('with TRIM', () => {
assert.deepEqual(
transformArguments('key', '*', {
field: 'value'
}, {
TRIM: {
threshold: 1000
}
}),
['XADD', 'key', '1000', '*', 'field', 'value']
);
});
it('with TRIM.strategy', () => {
assert.deepEqual(
transformArguments('key', '*', {
field: 'value'
}, {
TRIM: {
strategy: 'MAXLEN',
threshold: 1000
}
}),
['XADD', 'key', 'MAXLEN', '1000', '*','field', 'value']
);
});
it('with TRIM.strategyModifier', () => {
assert.deepEqual(
transformArguments('key', '*', {
field: 'value'
}, {
TRIM: {
strategyModifier: '=',
threshold: 1000
}
}),
['XADD', 'key', '=', '1000', '*', 'field', 'value']
);
});
it('with TRIM.limit', () => {
assert.deepEqual(
transformArguments('key', '*', {
field: 'value'
}, {
TRIM: {
threshold: 1000,
limit: 1
}
}),
['XADD', 'key', '1000', 'LIMIT', '1', '*', 'field', 'value']
);
});
it('with NOMKSTREAM, TRIM, TRIM.*', () => {
assert.deepEqual(
transformArguments('key', '*', {
field: 'value'
}, {
NOMKSTREAM: true,
TRIM: {
strategy: 'MAXLEN',
strategyModifier: '=',
threshold: 1000,
limit: 1
}
}),
['XADD', 'key', 'NOMKSTREAM', 'MAXLEN', '=', '1000', 'LIMIT', '1', '*', 'field', 'value']
);
});
});
itWithClient(TestRedisServers.OPEN, 'client.xAdd', async client => {
assert.equal(
typeof await client.xAdd('key', '*', {
field: 'value'
}),
'string'
);
});
});

48
lib/commands/XADD.ts Normal file
View File

@@ -0,0 +1,48 @@
import { StreamMessage, transformReplyString } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
interface XAddOptions {
NOMKSTREAM?: true;
TRIM?: {
strategy?: 'MAXLEN' | 'MINID';
strategyModifier?: '=' | '~';
threshold: number;
limit?: number;
}
}
export function transformArguments(key: string, id: string, message: StreamMessage, options?: XAddOptions): Array<string> {
const args = ['XADD', key];
if (options?.NOMKSTREAM) {
args.push('NOMKSTREAM');
}
if (options?.TRIM) {
if (options.TRIM.strategy) {
args.push(options.TRIM.strategy);
}
if (options.TRIM.strategyModifier) {
args.push(options.TRIM.strategyModifier);
}
args.push(options.TRIM.threshold.toString());
if (options.TRIM.limit) {
args.push('LIMIT', options.TRIM.limit.toString());
}
}
args.push(id);
for (const [key, value] of Object.entries(message)) {
args.push(key, value);
}
return args;
}
export const transformReply = transformReplyString;

View File

@@ -0,0 +1,40 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient } from '../test-utils';
import { transformArguments } from './XAUTOCLAIM';
describe('XAUTOCLAIM', () => {
describe('transformArguments', () => {
it('simple', () => {
assert.deepEqual(
transformArguments('key', 'group', 'consumer', 1, '0-0'),
['XAUTOCLAIM', 'key', 'group', 'consumer', '1', '0-0']
);
});
it('with COUNT', () => {
assert.deepEqual(
transformArguments('key', 'group', 'consumer', 1, '0-0', {
COUNT: 1
}),
['XAUTOCLAIM', 'key', 'group', 'consumer', '1', '0-0', 'COUNT', '1']
);
});
});
itWithClient(TestRedisServers.OPEN, 'client.xAutoClaim', async client => {
await Promise.all([
client.xGroupCreate('key', 'group', '$', {
MKSTREAM: true
}),
client.xGroupCreateConsumer('key', 'group', 'consumer'),
]);
assert.deepEqual(
await client.xAutoClaim('key', 'group', 'consumer', 1, '0-0'),
{
nextId: '0-0',
messages: []
}
);
});
});

View File

@@ -0,0 +1,36 @@
import { StreamMessagesReply, transformReplyStreamMessages } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export interface XAutoClaimOptions {
COUNT?: number;
}
export function transformArguments(
key: string,
group: string,
consumer: string,
minIdleTime: number,
start: string,
options?: XAutoClaimOptions
): Array<string> {
const args = ['XAUTOCLAIM', key, group, consumer, minIdleTime.toString(), start];
if (options?.COUNT) {
args.push('COUNT', options.COUNT.toString());
}
return args;
}
interface XAutoClaimReply {
nextId: string;
messages: StreamMessagesReply;
}
export function transformReply(reply: [string, Array<any>]): XAutoClaimReply {
return {
nextId: reply[0],
messages: transformReplyStreamMessages(reply[1])
};
}

View File

@@ -0,0 +1,29 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient } from '../test-utils';
import { transformArguments } from './XAUTOCLAIM_JUSTID';
describe('XAUTOCLAIM JUSTID', () => {
it('transformArguments', () => {
assert.deepEqual(
transformArguments('key', 'group', 'consumer', 1, '0-0'),
['XAUTOCLAIM', 'key', 'group', 'consumer', '1', '0-0', 'JUSTID']
);
});
itWithClient(TestRedisServers.OPEN, 'client.xAutoClaimJustId', async client => {
await Promise.all([
client.xGroupCreate('key', 'group', '$', {
MKSTREAM: true
}),
client.xGroupCreateConsumer('key', 'group', 'consumer'),
]);
assert.deepEqual(
await client.xAutoClaimJustId('key', 'group', 'consumer', 1, '0-0'),
{
nextId: '0-0',
messages: []
}
);
});
});

View File

@@ -0,0 +1,22 @@
import { transformArguments as transformXAutoClaimArguments } from './XAUTOCLAIM';
export { FIRST_KEY_INDEX } from './XAUTOCLAIM';
export function transformArguments(...args: Parameters<typeof transformXAutoClaimArguments>): Array<string> {
return [
...transformXAutoClaimArguments(...args),
'JUSTID'
];
}
interface XAutoClaimJustIdReply {
nextId: string;
messages: Array<string>;
}
export function transformReply(reply: [string, Array<string>]): XAutoClaimJustIdReply {
return {
nextId: reply[0],
messages: reply[1]
};
}

View File

@@ -0,0 +1,93 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient } from '../test-utils';
import { transformArguments } from './XCLAIM';
describe('XCLAIM', () => {
describe('transformArguments', () => {
it('single id (string)', () => {
assert.deepEqual(
transformArguments('key', 'group', 'consumer', 1, '0-0'),
['XCLAIM', 'key', 'group', 'consumer', '1', '0-0']
);
});
it('multiple ids (array)', () => {
assert.deepEqual(
transformArguments('key', 'group', 'consumer', 1, ['0-0', '1-0']),
['XCLAIM', 'key', 'group', 'consumer', '1', '0-0', '1-0']
);
});
it('with IDLE', () => {
assert.deepEqual(
transformArguments('key', 'group', 'consumer', 1, '0-0', {
IDLE: 1
}),
['XCLAIM', 'key', 'group', 'consumer', '1', '0-0', 'IDLE', '1']
);
});
it('with TIME (number)', () => {
assert.deepEqual(
transformArguments('key', 'group', 'consumer', 1, '0-0', {
TIME: 1
}),
['XCLAIM', 'key', 'group', 'consumer', '1', '0-0', 'TIME', '1']
);
});
it('with TIME (date)', () => {
const d = new Date();
assert.deepEqual(
transformArguments('key', 'group', 'consumer', 1, '0-0', {
TIME: d
}),
['XCLAIM', 'key', 'group', 'consumer', '1', '0-0', 'TIME', d.getTime().toString()]
);
});
it('with RETRYCOUNT', () => {
assert.deepEqual(
transformArguments('key', 'group', 'consumer', 1, '0-0', {
RETRYCOUNT: 1
}),
['XCLAIM', 'key', 'group', 'consumer', '1', '0-0', 'RETRYCOUNT', '1']
);
});
it('with FORCE', () => {
assert.deepEqual(
transformArguments('key', 'group', 'consumer', 1, '0-0', {
FORCE: true
}),
['XCLAIM', 'key', 'group', 'consumer', '1', '0-0', 'FORCE']
);
});
it('with IDLE, TIME, RETRYCOUNT, FORCE, JUSTID', () => {
assert.deepEqual(
transformArguments('key', 'group', 'consumer', 1, '0-0', {
IDLE: 1,
TIME: 1,
RETRYCOUNT: 1,
FORCE: true
}),
['XCLAIM', 'key', 'group', 'consumer', '1', '0-0', 'IDLE', '1', 'TIME', '1', 'RETRYCOUNT', '1', 'FORCE']
);
});
});
itWithClient(TestRedisServers.OPEN, 'client.xClaim', async client => {
await Promise.all([
client.xGroupCreate('key', 'group', '$', {
MKSTREAM: true
}),
client.xGroupCreateConsumer('key', 'group', 'consumer'),
]);
assert.deepEqual(
await client.xClaim('key', 'group', 'consumer', 1, '0-0'),
[]
);
});
});

50
lib/commands/XCLAIM.ts Normal file
View File

@@ -0,0 +1,50 @@
import { transformReplyStreamMessages } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export interface XClaimOptions {
IDLE?: number;
TIME?: number | Date;
RETRYCOUNT?: number;
FORCE?: true;
}
export function transformArguments(
key: string,
group: string,
consumer: string,
minIdleTime: number,
id: string | Array<string>,
options?: XClaimOptions
): Array<string> {
const args = ['XCLAIM', key, group, consumer, minIdleTime.toString()];
if (typeof id === 'string') {
args.push(id);
} else {
args.push(...id);
}
if (options?.IDLE) {
args.push('IDLE', options.IDLE.toString());
}
if (options?.TIME) {
args.push(
'TIME',
(typeof options.TIME === 'number' ? options.TIME : options.TIME.getTime()).toString()
);
}
if (options?.RETRYCOUNT) {
args.push('RETRYCOUNT', options.RETRYCOUNT.toString());
}
if (options?.FORCE) {
args.push('FORCE');
}
return args;
}
export const transformReply = transformReplyStreamMessages;

View File

@@ -0,0 +1,26 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient } from '../test-utils';
import { transformArguments } from './XCLAIM_JUSTID';
describe('XCLAIM JUSTID', () => {
it('transformArguments', () => {
assert.deepEqual(
transformArguments('key', 'group', 'consumer', 1, '0-0'),
['XCLAIM', 'key', 'group', 'consumer', '1', '0-0', 'JUSTID']
);
});
itWithClient(TestRedisServers.OPEN, 'client.xClaimJustId', async client => {
await Promise.all([
client.xGroupCreate('key', 'group', '$', {
MKSTREAM: true
}),
client.xGroupCreateConsumer('key', 'group', 'consumer'),
]);
assert.deepEqual(
await client.xClaimJustId('key', 'group', 'consumer', 1, '0-0'),
[]
);
});
});

View File

@@ -0,0 +1,13 @@
import { transformReplyStringArray } from './generic-transformers';
import { transformArguments as transformArgumentsXClaim } from './XCLAIM';
export { FIRST_KEY_INDEX } from './XCLAIM';
export function transformArguments(...args: Parameters<typeof transformArgumentsXClaim>): Array<string> {
return [
...transformArgumentsXClaim(...args),
'JUSTID'
];
}
export const transformReply = transformReplyStringArray;

28
lib/commands/XDEL.spec.ts Normal file
View File

@@ -0,0 +1,28 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient } from '../test-utils';
import { transformArguments } from './XDEL';
describe('XDEL', () => {
describe('transformArguments', () => {
it('string', () => {
assert.deepEqual(
transformArguments('key', '0-0'),
['XDEL', 'key', '0-0']
);
});
it('array', () => {
assert.deepEqual(
transformArguments('key', ['0-0', '1-0']),
['XDEL', 'key', '0-0', '1-0']
);
});
});
itWithClient(TestRedisServers.OPEN, 'client.xDel', async client => {
assert.equal(
await client.xDel('key', '0-0'),
0
);
});
});

17
lib/commands/XDEL.ts Normal file
View File

@@ -0,0 +1,17 @@
import { transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string, id: string | Array<string>): Array<string> {
const args = ['XDEL', key];
if (typeof id === 'string') {
args.push(id);
} else {
args.push(...id);
}
return args;
}
export const transformReply = transformReplyNumber;

View File

@@ -0,0 +1,32 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient } from '../test-utils';
import { transformArguments } from './XGROUP_CREATE';
describe('XGROUP CREATE', () => {
describe('transformArguments', () => {
it('simple', () => {
assert.deepEqual(
transformArguments('key', 'group', '$'),
['XGROUP', 'CREATE', 'key', 'group', '$']
);
});
it('with MKSTREAM', () => {
assert.deepEqual(
transformArguments('key', 'group', '$', {
MKSTREAM: true
}),
['XGROUP', 'CREATE', 'key', 'group', '$', 'MKSTREAM']
);
});
});
itWithClient(TestRedisServers.OPEN, 'client.xGroupCreate', async client => {
assert.equal(
await client.xGroupCreate('key', 'group', '$', {
MKSTREAM: true
}),
'OK'
);
});
});

View File

@@ -0,0 +1,19 @@
import { transformReplyString } from './generic-transformers';
export const FIRST_KEY_INDEX = 2;
interface XGroupCreateOptions {
MKSTREAM?: true;
}
export function transformArguments(key: string, group: string, id: string, options?: XGroupCreateOptions): Array<string> {
const args = ['XGROUP', 'CREATE', key, group, id];
if (options?.MKSTREAM) {
args.push('MKSTREAM');
}
return args;
}
export const transformReply = transformReplyString;

View File

@@ -0,0 +1,23 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient } from '../test-utils';
import { transformArguments } from './XGROUP_CREATECONSUMER';
describe('XGROUP CREATECONSUMER', () => {
it('transformArguments', () => {
assert.deepEqual(
transformArguments('key', 'group', 'consumer'),
['XGROUP', 'CREATECONSUMER', 'key', 'group', 'consumer']
);
});
itWithClient(TestRedisServers.OPEN, 'client.xGroupCreateConsumer', async client => {
await client.xGroupCreate('key', 'group', '$', {
MKSTREAM: true
});
assert.equal(
await client.xGroupCreateConsumer('key', 'group', 'consumer'),
true
);
});
});

View File

@@ -0,0 +1,9 @@
import { transformReplyBoolean } from './generic-transformers';
export const FIRST_KEY_INDEX = 2;
export function transformArguments(key: string, group: string, consumer: string): Array<string> {
return ['XGROUP', 'CREATECONSUMER', key, group, consumer];
}
export const transformReply = transformReplyBoolean;

View File

@@ -0,0 +1,23 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient } from '../test-utils';
import { transformArguments } from './XGROUP_DELCONSUMER';
describe('XGROUP DELCONSUMER', () => {
it('transformArguments', () => {
assert.deepEqual(
transformArguments('key', 'group', 'consumer'),
['XGROUP', 'DELCONSUMER', 'key', 'group', 'consumer']
);
});
itWithClient(TestRedisServers.OPEN, 'client.xGroupDelConsumer', async client => {
await client.xGroupCreate('key', 'group', '$', {
MKSTREAM: true
});
assert.equal(
await client.xGroupDelConsumer('key', 'group', 'consumer'),
0
);
});
});

View File

@@ -0,0 +1,9 @@
import { transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 2;
export function transformArguments(key: string, group: string, consumer: string): Array<string> {
return ['XGROUP', 'DELCONSUMER', key, group, consumer];
}
export const transformReply = transformReplyNumber;

View File

@@ -0,0 +1,23 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient } from '../test-utils';
import { transformArguments } from './XGROUP_DESTROY';
describe('XGROUP DESTROY', () => {
describe('transformArguments', () => {
assert.deepEqual(
transformArguments('key', 'group'),
['XGROUP', 'DESTROY', 'key', 'group']
);
});
itWithClient(TestRedisServers.OPEN, 'client.xGroupDestroy', async client => {
await client.xGroupCreate('key', 'group', '$', {
MKSTREAM: true
});
assert.equal(
await client.xGroupDestroy('key', 'group'),
true
);
});
});

View File

@@ -0,0 +1,9 @@
import { transformReplyBoolean, transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 2;
export function transformArguments(key: string, group: string): Array<string> {
return ['XGROUP', 'DESTROY', key, group];
}
export const transformReply = transformReplyBoolean;

View File

@@ -0,0 +1,23 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient } from '../test-utils';
import { transformArguments } from './XGROUP_SETID';
describe('XGROUP SETID', () => {
describe('transformArguments', () => {
assert.deepEqual(
transformArguments('key', 'group', '0'),
['XGROUP', 'SETID', 'key', 'group', '0']
);
});
itWithClient(TestRedisServers.OPEN, 'client.xGroupSetId', async client => {
await client.xGroupCreate('key', 'group', '$', {
MKSTREAM: true
});
assert.equal(
await client.xGroupSetId('key', 'group', '0'),
'OK'
);
});
});

View File

@@ -0,0 +1,9 @@
import { transformReplyString } from './generic-transformers';
export const FIRST_KEY_INDEX = 2;
export function transformArguments(key: string, group: string, id: string): Array<string> {
return ['XGROUP', 'SETID', key, group, id];
}
export const transformReply = transformReplyString;

View File

@@ -0,0 +1,41 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient } from '../test-utils';
import { transformArguments, transformReply } from './XINFO_CONSUMERS';
describe('XINFO CONSUMERS', () => {
describe('transformArguments', () => {
assert.deepEqual(
transformArguments('key', 'group'),
['XINFO', 'CONSUMERS', 'key', 'group']
);
});
it('transformReply', () => {
assert.deepEqual(
transformReply([
['name', 'Alice', 'pending', 1, 'idle', 9104628],
['name', 'Bob', 'pending', 1, 'idle', 83841983]
]),
[{
name: 'Alice',
pending: 1,
idle: 9104628
}, {
name: 'Bob',
pending: 1,
idle: 83841983
}]
);
})
itWithClient(TestRedisServers.OPEN, 'client.xInfoConsumers', async client => {
await client.xGroupCreate('key', 'group', '$', {
MKSTREAM: true
});
assert.deepEqual(
await client.xInfoConsumers('key', 'group'),
[]
);
});
});

View File

@@ -0,0 +1,21 @@
export const FIRST_KEY_INDEX = 2;
export const IS_READ_ONLY = true;
export function transformArguments(key: string, group: string): Array<string> {
return ['XINFO', 'CONSUMERS', key, group];
}
type XInfoConsumersReply = Array<{
name: string;
pending: number;
idle: number;
}>;
export function transformReply(rawReply: Array<any>): XInfoConsumersReply {
return rawReply.map(consumer => ({
name: consumer[1],
pending: consumer[3],
idle: consumer[5]
}));
}

View File

@@ -0,0 +1,48 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient } from '../test-utils';
import { transformArguments, transformReply } from './XINFO_GROUPS';
describe('XINFO GROUPS', () => {
describe('transformArguments', () => {
assert.deepEqual(
transformArguments('key'),
['XINFO', 'GROUPS', 'key']
);
});
it('transformReply', () => {
assert.deepEqual(
transformReply([
['name', 'mygroup', 'consumers', 2, 'pending', 2, 'last-delivered-id', '1588152489012-0'],
['name', 'some-other-group', 'consumers', 1, 'pending', 0, 'last-delivered-id', '1588152498034-0']
]),
[{
name: 'mygroup',
consumers: 2,
pending: 2,
lastDeliveredId: '1588152489012-0'
}, {
name: 'some-other-group',
consumers: 1,
pending: 0,
lastDeliveredId: '1588152498034-0'
}]
);
})
itWithClient(TestRedisServers.OPEN, 'client.xInfoGroups', async client => {
await client.xGroupCreate('key', 'group', '$', {
MKSTREAM: true
});
assert.deepEqual(
await client.xInfoGroups('key'),
[{
name: 'group',
consumers: 0,
pending: 0,
lastDeliveredId: '0-0'
}]
);
});
});

View File

@@ -0,0 +1,23 @@
export const FIRST_KEY_INDEX = 2;
export const IS_READ_ONLY = true;
export function transformArguments(key: string): Array<string> {
return ['XINFO', 'GROUPS', key];
}
type XInfoGroupsReply = Array<{
name: string;
consumers: number;
pending: number;
lastDeliveredId: string;
}>;
export function transformReply(rawReply: Array<any>): XInfoGroupsReply {
return rawReply.map(group => ({
name: group[1],
consumers: group[3],
pending: group[5],
lastDeliveredId: group[7]
}));
}

View File

@@ -0,0 +1,73 @@
import { strict as assert } from 'assert';
import { triggerAsyncId } from 'async_hooks';
import { TestRedisServers, itWithClient } from '../test-utils';
import { transformArguments, transformReply } from './XINFO_STREAM';
describe('XINFO STREAM', () => {
it('transformArguments', () => {
assert.deepEqual(
transformArguments('key'),
['XINFO', 'STREAM', 'key']
);
});
it('transformReply', () => {
assert.deepEqual(
transformReply([
'length', 2,
'radix-tree-keys', 1,
'radix-tree-nodes', 2,
'last-generated-id', '1538385846314-0',
'groups', 2,
'first-entry', ['1538385820729-0', ['foo', 'bar']],
'last-entry', ['1538385846314-0', ['field', 'value']]
]),
{
length: 2,
radixTreeKeys: 1,
radixTreeNodes: 2,
groups: 2,
lastGeneratedId: '1538385846314-0',
firstEntry: {
id: '1538385820729-0',
message: Object.create(null, {
foo: {
value: 'bar',
configurable: true,
enumerable: true
}
})
},
lastEntry: {
id: '1538385846314-0',
message: Object.create(null, {
field: {
value: 'value',
configurable: true,
enumerable: true
}
})
}
}
);
});
itWithClient(TestRedisServers.OPEN, 'client.xInfoStream', async client => {
await client.xGroupCreate('key', 'group', '$', {
MKSTREAM: true
});
assert.deepEqual(
await client.xInfoStream('key'),
{
length: 0,
radixTreeKeys: 0,
radixTreeNodes: 1,
groups: 1,
lastGeneratedId: '0-0',
firstEntry: null,
lastEntry: null
}
);
});
});

View File

@@ -0,0 +1,37 @@
import { StreamMessageReply, transformReplyStreamMessage } from './generic-transformers';
export const FIRST_KEY_INDEX = 2;
export const IS_READ_ONLY = true;
export function transformArguments(key: string): Array<string> {
return ['XINFO', 'STREAM', key];
}
interface XInfoStreamReply {
length: number;
radixTreeKeys: number;
radixTreeNodes: number;
groups: number;
lastGeneratedId: string;
firstEntry: StreamMessageReply | null;
lastEntry: StreamMessageReply | null;
};
export function transformReply(reply: Array<any>): XInfoStreamReply {
return {
length: reply[1],
radixTreeKeys: reply[3],
radixTreeNodes: reply[5],
lastGeneratedId: reply[7],
groups: reply[9],
firstEntry: reply[11] ? {
id: reply[11][0] ?? null,
message: transformReplyStreamMessage(reply[11][1])
} : null,
lastEntry: reply[13] ? {
id: reply[13][0],
message: transformReplyStreamMessage(reply[13][1])
} : null
};
}

19
lib/commands/XLEN.spec.ts Normal file
View File

@@ -0,0 +1,19 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient } from '../test-utils';
import { transformArguments } from './XLEN';
describe('XLEN', () => {
it('transformArguments', () => {
assert.deepEqual(
transformArguments('key'),
['XLEN', 'key']
);
});
itWithClient(TestRedisServers.OPEN, 'client.xLen', async client => {
assert.equal(
await client.xLen('key'),
0
);
});
});

11
lib/commands/XLEN.ts Normal file
View File

@@ -0,0 +1,11 @@
import { transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export const IS_READ_ONLY = true;
export function transformArguments(key: string): Array<string> {
return ['XLEN', key];
}
export const transformReply = transformReplyNumber;

View File

@@ -0,0 +1,30 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient } from '../test-utils';
import { transformArguments } from './XPENDING';
describe('XPENDING', () => {
describe('transformArguments', () => {
it('transformArguments', () => {
assert.deepEqual(
transformArguments('key', 'group'),
['XPENDING', 'key', 'group']
);
});
});
itWithClient(TestRedisServers.OPEN, 'client.xPending', async client => {
await client.xGroupCreate('key', 'group', '$', {
MKSTREAM: true
});
assert.deepEqual(
await client.xPending('key', 'group'),
{
pending: 0,
firstId: null,
lastId: null,
consumers: null
}
);
});
});

23
lib/commands/XPENDING.ts Normal file
View File

@@ -0,0 +1,23 @@
export const FIRST_KEY_INDEX = 1;
export const IS_READ_ONLY = true;
export function transformArguments(key: string, group: string): Array<string> {
return ['XPENDING', key, group];
}
interface XPendingReply {
pending: number;
firstId: string | null;
lastId: number | null
consumers: Array<string> | null;
}
export function transformReply(reply: [number, string | null, number | null, Array<string> | null]): XPendingReply {
return {
pending: reply[0],
firstId: reply[1],
lastId: reply[2],
consumers: reply[3]
};
}

View File

@@ -0,0 +1,53 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient } from '../test-utils';
import { transformArguments } from './XPENDING_RANGE';
describe('XPENDING RANGE', () => {
describe('transformArguments', () => {
it('simple', () => {
assert.deepEqual(
transformArguments('key', 'group', '-', '+', 1),
['XPENDING', 'key', 'group', '-', '+', '1']
);
});
it('with IDLE', () => {
assert.deepEqual(
transformArguments('key', 'group', '-', '+', 1, {
IDLE: 1,
}),
['XPENDING', 'key', 'group', 'IDLE', '1', '-', '+', '1']
);
});
it('with consumer', () => {
assert.deepEqual(
transformArguments('key', 'group', '-', '+', 1, {
consumer: 'consumer'
}),
['XPENDING', 'key', 'group', '-', '+', '1', 'consumer']
);
});
it('with IDLE, consumer', () => {
assert.deepEqual(
transformArguments('key', 'group', '-', '+', 1, {
IDLE: 1,
consumer: 'consumer'
}),
['XPENDING', 'key', 'group', 'IDLE', '1', '-', '+', '1', 'consumer']
);
});
});
itWithClient(TestRedisServers.OPEN, 'client.xPendingRange', async client => {
await client.xGroupCreate('key', 'group', '$', {
MKSTREAM: true
});
assert.deepEqual(
await client.xPendingRange('key', 'group', '-', '+', 1),
[]
);
});
});

View File

@@ -0,0 +1,42 @@
import { transformReplyStreamMessages } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export const IS_READ_ONLY = true;
interface XPendingRangeOptions {
IDLE?: number;
consumer?: string;
}
export function transformArguments(
key: string,
group: string,
start: string,
end: string,
count: number,
options?: XPendingRangeOptions
): Array<string> {
const args = ['XPENDING', key, group];
if (options?.IDLE) {
args.push('IDLE', options.IDLE.toString());
}
args.push(start, end, count.toString());
if (options?.consumer) {
args.push(options.consumer);
}
return args;
}
interface XPendingReply {
messageId: string;
owner: string;
msSinceLastDelivery: number;
deliveriesCounter: number;
}
export const transformReply = transformReplyStreamMessages;

View File

@@ -0,0 +1,30 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient } from '../test-utils';
import { transformArguments } from './XRANGE';
describe('XRANGE', () => {
describe('transformArguments', () => {
it('simple', () => {
assert.deepEqual(
transformArguments('key', '-', '+'),
['XRANGE', 'key', '-', '+']
);
});
it('with COUNT', () => {
assert.deepEqual(
transformArguments('key', '-', '+', {
COUNT: 1
}),
['XRANGE', 'key', '-', '+', 'COUNT', '1']
);
});
});
itWithClient(TestRedisServers.OPEN, 'client.xRange', async client => {
assert.deepEqual(
await client.xRange('key', '+', '-'),
[]
);
});
});

21
lib/commands/XRANGE.ts Normal file
View File

@@ -0,0 +1,21 @@
import { transformReplyStreamMessages } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export const IS_READ_ONLY = true;
interface XRangeOptions {
COUNT?: number;
}
export function transformArguments(key: string, start: string, end: string, options?: XRangeOptions): Array<string> {
const args = ['XRANGE', key, start, end];
if (options?.COUNT) {
args.push('COUNT', options.COUNT.toString());
}
return args;
}
export const transformReply = transformReplyStreamMessages;

View File

@@ -0,0 +1,30 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient } from '../test-utils';
import { transformArguments } from './XREVRANGE';
describe('XREVRANGE', () => {
describe('transformArguments', () => {
it('simple', () => {
assert.deepEqual(
transformArguments('key', '-', '+'),
['XREVRANGE', 'key', '-', '+']
);
});
it('with COUNT', () => {
assert.deepEqual(
transformArguments('key', '-', '+', {
COUNT: 1
}),
['XREVRANGE', 'key', '-', '+', 'COUNT', '1']
);
});
});
itWithClient(TestRedisServers.OPEN, 'client.xRevRange', async client => {
assert.deepEqual(
await client.xRevRange('key', '+', '-'),
[]
);
});
});

17
lib/commands/XREVRANGE.ts Normal file
View File

@@ -0,0 +1,17 @@
import { transformReplyStreamMessages } from './generic-transformers';
interface XRangeRevOptions {
COUNT?: number;
}
export function transformArguments(key: string, start: string, end: string, options?: XRangeRevOptions): Array<string> {
const args = ['XREVRANGE', key, start, end];
if (options?.COUNT) {
args.push('COUNT', options.COUNT.toString());
}
return args;
}
export const transformReply = transformReplyStreamMessages;

View File

@@ -0,0 +1,49 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient } from '../test-utils';
import { transformArguments } from './XTRIM';
describe('XTRIM', () => {
it('transformArguments', () => {
it('simple', () => {
assert.deepEqual(
transformArguments('key', 'MAXLEN', 1),
['XTRIM', 'key', 'MAXLEN', '1']
);
});
it('with strategyModifier', () => {
assert.deepEqual(
transformArguments('key', 'MAXLEN', 1, {
strategyModifier: '='
}),
['XTRIM', 'key', 'MAXLEN', '=', '1']
);
});
it('with LIMIT', () => {
assert.deepEqual(
transformArguments('key', 'MAXLEN', 1, {
LIMIT: 1
}),
['XTRIM', 'key', 'MAXLEN', '1', 'LIMIT', '1']
);
});
it('with strategyModifier, LIMIT', () => {
assert.deepEqual(
transformArguments('key', 'MAXLEN', 1, {
strategyModifier: '=',
LIMIT: 1
}),
['XTRIM', 'key', 'MAXLEN', '=', '1', 'LIMIT', '1']
);
});
});
itWithClient(TestRedisServers.OPEN, 'client.xTrim', async client => {
assert.deepEqual(
await client.xTrim('key', 'MAXLEN', 1),
[]
);
});
});

26
lib/commands/XTRIM.ts Normal file
View File

@@ -0,0 +1,26 @@
import { transformReplyStreamMessages } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
interface XTrimOptions {
strategyModifier?: '=' | '~';
LIMIT?: number;
}
export function transformArguments(key: string, strategy: 'MAXLEN' | 'MINID', threshold: number, options?: XTrimOptions): Array<string> {
const args = ['XTRIM', key, strategy];
if (options?.strategyModifier) {
args.push(options.strategyModifier);
}
args.push(threshold.toString());
if (options?.LIMIT) {
args.push('LIMIT', options.LIMIT.toString());
}
return args;
}
export const transformReply = transformReplyStreamMessages;

View File

@@ -73,6 +73,27 @@ import * as TTL from './TTL';
import * as TYPE from './TYPE';
import * as UNLINK from './UNLINK';
import * as WAIT from './WAIT';
import * as XACK from './XACK';
import * as XADD from './XADD';
import * as XAUTOCLAIM_JUSTID from './XAUTOCLAIM_JUSTID';
import * as XAUTOCLAIM from './XAUTOCLAIM';
import * as XCLAIM from './XCLAIM';
import * as XCLAIM_JUSTID from './XCLAIM_JUSTID';
import * as XDEL from './XDEL';
import * as XGROUP_CREATE from './XGROUP_CREATE';
import * as XGROUP_CREATECONSUMER from './XGROUP_CREATECONSUMER';
import * as XGROUP_DELCONSUMER from './XGROUP_DELCONSUMER';
import * as XGROUP_DESTROY from './XGROUP_DESTROY';
import * as XGROUP_SETID from './XGROUP_SETID';
import * as XINFO_CONSUMERS from './XINFO_CONSUMERS';
import * as XINFO_GROUPS from './XINFO_GROUPS';
import * as XINFO_STREAM from './XINFO_STREAM';
import * as XLEN from './XLEN';
import * as XPENDING_RANGE from './XPENDING_RANGE';
import * as XPENDING from './XPENDING';
import * as XRANGE from './XRANGE';
import * as XREVRANGE from './XREVRANGE';
import * as XTRIM from './XTRIM';
import * as ZADD from './ZADD';
import * as ZCARD from './ZCARD';
import * as ZCOUNT from './ZCOUNT';
@@ -250,6 +271,48 @@ export default {
unlink: UNLINK,
WAIT,
wait: WAIT,
XACK,
xAck: XACK,
XADD,
xAdd: XADD,
XAUTOCLAIM_JUSTID,
xAutoClaimJustId: XAUTOCLAIM_JUSTID,
XAUTOCLAIM,
xAutoClaim: XAUTOCLAIM,
XCLAIM,
xClaim: XCLAIM,
XCLAIM_JUSTID,
xClaimJustId: XCLAIM_JUSTID,
XDEL,
xDel: XDEL,
XGROUP_CREATE,
xGroupCreate: XGROUP_CREATE,
XGROUP_CREATECONSUMER,
xGroupCreateConsumer: XGROUP_CREATECONSUMER,
XGROUP_DELCONSUMER,
xGroupDelConsumer: XGROUP_DELCONSUMER,
XGROUP_DESTROY,
xGroupDestroy: XGROUP_DESTROY,
XGROUP_SETID,
xGroupSetId: XGROUP_SETID,
XINFO_CONSUMERS,
xInfoConsumers: XINFO_CONSUMERS,
XINFO_GROUPS,
xInfoGroups: XINFO_GROUPS,
XINFO_STREAM,
xInfoStream: XINFO_STREAM,
XLEN,
xLen: XLEN,
XPENDING_RANGE,
xPendingRange: XPENDING_RANGE,
XPENDING,
xPending: XPENDING,
XRANGE,
xRange: XRANGE,
XREVRANGE,
xRevRange: XREVRANGE,
XTRIM,
xTrim: XTRIM,
ZADD,
zAdd: ZADD,
ZCARD,