You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-07-31 05:44:24 +03:00
Add Redis 8.2 New Stream Commands (#3029)
* chore: update Redis version from 8.2-RC1-pre to 8.2-rc1 * feat: implement XDELEX command for Redis 8.2 * feat: implement XACKDEL command for Redis 8.2 * refactor: create shared stream deletion types for Redis 8.2 commands * feat: add Redis 8.2 deletion policies to XTRIM command * feat: add Redis 8.2 deletion policies to XADD commands * fix: correct XDELEX command method name and test parameter
This commit is contained in:
@ -4,7 +4,7 @@ import RedisBloomModules from '.';
|
||||
export default TestUtils.createFromConfig({
|
||||
dockerImageName: 'redislabs/client-libs-test',
|
||||
dockerImageVersionArgument: 'redis-version',
|
||||
defaultDockerVersion: '8.2-M01-pre'
|
||||
defaultDockerVersion: '8.2-rc1'
|
||||
});
|
||||
|
||||
export const GLOBAL = {
|
||||
|
196
packages/client/lib/commands/XACKDEL.spec.ts
Normal file
196
packages/client/lib/commands/XACKDEL.spec.ts
Normal file
@ -0,0 +1,196 @@
|
||||
import { strict as assert } from "node:assert";
|
||||
import XACKDEL from "./XACKDEL";
|
||||
import { parseArgs } from "./generic-transformers";
|
||||
import testUtils, { GLOBAL } from "../test-utils";
|
||||
import {
|
||||
STREAM_DELETION_POLICY,
|
||||
STREAM_DELETION_REPLY_CODES,
|
||||
} from "./common-stream.types";
|
||||
|
||||
describe("XACKDEL", () => {
|
||||
describe("transformArguments", () => {
|
||||
it("string - without policy", () => {
|
||||
assert.deepEqual(parseArgs(XACKDEL, "key", "group", "0-0"), [
|
||||
"XACKDEL",
|
||||
"key",
|
||||
"group",
|
||||
"IDS",
|
||||
"1",
|
||||
"0-0",
|
||||
]);
|
||||
});
|
||||
|
||||
it("string - with policy", () => {
|
||||
assert.deepEqual(
|
||||
parseArgs(
|
||||
XACKDEL,
|
||||
"key",
|
||||
"group",
|
||||
"0-0",
|
||||
STREAM_DELETION_POLICY.KEEPREF
|
||||
),
|
||||
["XACKDEL", "key", "group", "KEEPREF", "IDS", "1", "0-0"]
|
||||
);
|
||||
});
|
||||
|
||||
it("array - without policy", () => {
|
||||
assert.deepEqual(parseArgs(XACKDEL, "key", "group", ["0-0", "1-0"]), [
|
||||
"XACKDEL",
|
||||
"key",
|
||||
"group",
|
||||
"IDS",
|
||||
"2",
|
||||
"0-0",
|
||||
"1-0",
|
||||
]);
|
||||
});
|
||||
|
||||
it("array - with policy", () => {
|
||||
assert.deepEqual(
|
||||
parseArgs(
|
||||
XACKDEL,
|
||||
"key",
|
||||
"group",
|
||||
["0-0", "1-0"],
|
||||
STREAM_DELETION_POLICY.DELREF
|
||||
),
|
||||
["XACKDEL", "key", "group", "DELREF", "IDS", "2", "0-0", "1-0"]
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
testUtils.testAll(
|
||||
`XACKDEL non-existing key - without policy`,
|
||||
async (client) => {
|
||||
const reply = await client.xAckDel("{tag}stream-key", "testgroup", "0-0");
|
||||
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.NOT_FOUND]);
|
||||
},
|
||||
{
|
||||
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
}
|
||||
);
|
||||
|
||||
testUtils.testAll(
|
||||
`XACKDEL existing key - without policy`,
|
||||
async (client) => {
|
||||
const streamKey = "{tag}stream-key";
|
||||
const groupName = "testgroup";
|
||||
|
||||
// create consumer group, stream and message
|
||||
await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true });
|
||||
const messageId = await client.xAdd(streamKey, "*", { field: "value" });
|
||||
|
||||
// read message
|
||||
await client.xReadGroup(groupName, "testconsumer", {
|
||||
key: streamKey,
|
||||
id: ">",
|
||||
});
|
||||
|
||||
const reply = await client.xAckDel(streamKey, groupName, messageId);
|
||||
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DELETED]);
|
||||
},
|
||||
{
|
||||
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
}
|
||||
);
|
||||
|
||||
testUtils.testAll(
|
||||
`XACKDEL existing key - with policy`,
|
||||
async (client) => {
|
||||
const streamKey = "{tag}stream-key";
|
||||
const groupName = "testgroup";
|
||||
|
||||
// create consumer group, stream and message
|
||||
await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true });
|
||||
const messageId = await client.xAdd(streamKey, "*", { field: "value" });
|
||||
|
||||
// read message
|
||||
await client.xReadGroup(groupName, "testconsumer", {
|
||||
key: streamKey,
|
||||
id: ">",
|
||||
});
|
||||
|
||||
const reply = await client.xAckDel(
|
||||
streamKey,
|
||||
groupName,
|
||||
messageId,
|
||||
STREAM_DELETION_POLICY.DELREF
|
||||
);
|
||||
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DELETED]);
|
||||
},
|
||||
{
|
||||
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
}
|
||||
);
|
||||
|
||||
testUtils.testAll(
|
||||
`XACKDEL acknowledge policy - with consumer group`,
|
||||
async (client) => {
|
||||
const streamKey = "{tag}stream-key";
|
||||
const groupName = "testgroup";
|
||||
|
||||
// create consumer groups, stream and message
|
||||
await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true });
|
||||
await client.xGroupCreate(streamKey, "some-other-group", "0");
|
||||
const messageId = await client.xAdd(streamKey, "*", { field: "value" });
|
||||
|
||||
// read message
|
||||
await client.xReadGroup(groupName, "testconsumer", {
|
||||
key: streamKey,
|
||||
id: ">",
|
||||
});
|
||||
|
||||
const reply = await client.xAckDel(
|
||||
streamKey,
|
||||
groupName,
|
||||
messageId,
|
||||
STREAM_DELETION_POLICY.ACKED
|
||||
);
|
||||
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DANGLING_REFS]);
|
||||
},
|
||||
{
|
||||
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
}
|
||||
);
|
||||
|
||||
testUtils.testAll(
|
||||
`XACKDEL multiple keys`,
|
||||
async (client) => {
|
||||
const streamKey = "{tag}stream-key";
|
||||
const groupName = "testgroup";
|
||||
|
||||
// create consumer groups, stream and add messages
|
||||
await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true });
|
||||
const messageIds = await Promise.all([
|
||||
client.xAdd(streamKey, "*", { field: "value1" }),
|
||||
client.xAdd(streamKey, "*", { field: "value2" }),
|
||||
]);
|
||||
|
||||
// read messages
|
||||
await client.xReadGroup(groupName, "testconsumer", {
|
||||
key: streamKey,
|
||||
id: ">",
|
||||
});
|
||||
|
||||
const reply = await client.xAckDel(
|
||||
streamKey,
|
||||
groupName,
|
||||
[...messageIds, "0-0"],
|
||||
STREAM_DELETION_POLICY.DELREF
|
||||
);
|
||||
assert.deepEqual(reply, [
|
||||
STREAM_DELETION_REPLY_CODES.DELETED,
|
||||
STREAM_DELETION_REPLY_CODES.DELETED,
|
||||
STREAM_DELETION_REPLY_CODES.NOT_FOUND,
|
||||
]);
|
||||
},
|
||||
{
|
||||
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
}
|
||||
);
|
||||
});
|
45
packages/client/lib/commands/XACKDEL.ts
Normal file
45
packages/client/lib/commands/XACKDEL.ts
Normal file
@ -0,0 +1,45 @@
|
||||
import { CommandParser } from "../client/parser";
|
||||
import { RedisArgument, ArrayReply, Command } from "../RESP/types";
|
||||
import {
|
||||
StreamDeletionReplyCode,
|
||||
StreamDeletionPolicy,
|
||||
} from "./common-stream.types";
|
||||
import { RedisVariadicArgument } from "./generic-transformers";
|
||||
|
||||
/**
|
||||
* Acknowledges and deletes one or multiple messages for a stream consumer group
|
||||
*/
|
||||
export default {
|
||||
IS_READ_ONLY: false,
|
||||
/**
|
||||
* Constructs the XACKDEL command to acknowledge and delete one or multiple messages for a stream consumer group
|
||||
*
|
||||
* @param parser - The command parser
|
||||
* @param key - The stream key
|
||||
* @param group - The consumer group name
|
||||
* @param id - One or more message IDs to acknowledge and delete
|
||||
* @param policy - Policy to apply when deleting entries (optional, defaults to KEEPREF)
|
||||
* @returns Array of integers: -1 (not found), 1 (acknowledged and deleted), 2 (acknowledged with dangling refs)
|
||||
* @see https://redis.io/commands/xackdel/
|
||||
*/
|
||||
parseCommand(
|
||||
parser: CommandParser,
|
||||
key: RedisArgument,
|
||||
group: RedisArgument,
|
||||
id: RedisVariadicArgument,
|
||||
policy?: StreamDeletionPolicy
|
||||
) {
|
||||
parser.push("XACKDEL");
|
||||
parser.pushKey(key);
|
||||
parser.push(group);
|
||||
|
||||
if (policy) {
|
||||
parser.push(policy);
|
||||
}
|
||||
|
||||
parser.push("IDS");
|
||||
parser.pushVariadicWithLength(id);
|
||||
},
|
||||
transformReply:
|
||||
undefined as unknown as () => ArrayReply<StreamDeletionReplyCode>,
|
||||
} as const satisfies Command;
|
@ -2,6 +2,7 @@ import { strict as assert } from 'node:assert';
|
||||
import testUtils, { GLOBAL } from '../test-utils';
|
||||
import XADD from './XADD';
|
||||
import { parseArgs } from './generic-transformers';
|
||||
import { STREAM_DELETION_POLICY } from './common-stream.types';
|
||||
|
||||
describe('XADD', () => {
|
||||
describe('transformArguments', () => {
|
||||
@ -78,6 +79,37 @@ describe('XADD', () => {
|
||||
['XADD', 'key', '1000', 'LIMIT', '1', '*', 'field', 'value']
|
||||
);
|
||||
});
|
||||
|
||||
it('with TRIM.policy', () => {
|
||||
assert.deepEqual(
|
||||
parseArgs(XADD, 'key', '*', {
|
||||
field: 'value'
|
||||
}, {
|
||||
TRIM: {
|
||||
threshold: 1000,
|
||||
policy: STREAM_DELETION_POLICY.DELREF
|
||||
}
|
||||
}),
|
||||
['XADD', 'key', '1000', 'DELREF', '*', 'field', 'value']
|
||||
);
|
||||
});
|
||||
|
||||
it('with all TRIM options', () => {
|
||||
assert.deepEqual(
|
||||
parseArgs(XADD, 'key', '*', {
|
||||
field: 'value'
|
||||
}, {
|
||||
TRIM: {
|
||||
strategy: 'MAXLEN',
|
||||
strategyModifier: '~',
|
||||
threshold: 1000,
|
||||
limit: 100,
|
||||
policy: STREAM_DELETION_POLICY.ACKED
|
||||
}
|
||||
}),
|
||||
['XADD', 'key', 'MAXLEN', '~', '1000', 'LIMIT', '100', 'ACKED', '*', 'field', 'value']
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
testUtils.testAll('xAdd', async client => {
|
||||
@ -91,4 +123,52 @@ describe('XADD', () => {
|
||||
client: GLOBAL.SERVERS.OPEN,
|
||||
cluster: GLOBAL.CLUSTERS.OPEN
|
||||
});
|
||||
|
||||
testUtils.testAll(
|
||||
'xAdd with TRIM policy',
|
||||
async (client) => {
|
||||
assert.equal(
|
||||
typeof await client.xAdd('{tag}key', '*',
|
||||
{ field: 'value' },
|
||||
{
|
||||
TRIM: {
|
||||
strategy: 'MAXLEN',
|
||||
threshold: 1000,
|
||||
policy: STREAM_DELETION_POLICY.KEEPREF
|
||||
}
|
||||
}
|
||||
),
|
||||
'string'
|
||||
);
|
||||
},
|
||||
{
|
||||
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
}
|
||||
);
|
||||
|
||||
testUtils.testAll(
|
||||
'xAdd with all TRIM options',
|
||||
async (client) => {
|
||||
assert.equal(
|
||||
typeof await client.xAdd('{tag}key2', '*',
|
||||
{ field: 'value' },
|
||||
{
|
||||
TRIM: {
|
||||
strategy: 'MAXLEN',
|
||||
strategyModifier: '~',
|
||||
threshold: 1000,
|
||||
limit: 10,
|
||||
policy: STREAM_DELETION_POLICY.DELREF
|
||||
}
|
||||
}
|
||||
),
|
||||
'string'
|
||||
);
|
||||
},
|
||||
{
|
||||
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
}
|
||||
);
|
||||
});
|
||||
|
@ -1,5 +1,6 @@
|
||||
import { CommandParser } from '../client/parser';
|
||||
import { RedisArgument, BlobStringReply, Command } from '../RESP/types';
|
||||
import { StreamDeletionPolicy } from './common-stream.types';
|
||||
import { Tail } from './generic-transformers';
|
||||
|
||||
/**
|
||||
@ -10,6 +11,7 @@ import { Tail } from './generic-transformers';
|
||||
* @property TRIM.strategyModifier - Exact ('=') or approximate ('~') trimming
|
||||
* @property TRIM.threshold - Maximum stream length or minimum ID to retain
|
||||
* @property TRIM.limit - Maximum number of entries to trim in one call
|
||||
* @property TRIM.policy - Policy to apply when trimming entries (optional, defaults to KEEPREF)
|
||||
*/
|
||||
export interface XAddOptions {
|
||||
TRIM?: {
|
||||
@ -17,6 +19,8 @@ export interface XAddOptions {
|
||||
strategyModifier?: '=' | '~';
|
||||
threshold: number;
|
||||
limit?: number;
|
||||
/** added in 8.2 */
|
||||
policy?: StreamDeletionPolicy;
|
||||
};
|
||||
}
|
||||
|
||||
@ -58,6 +62,10 @@ export function parseXAddArguments(
|
||||
if (options.TRIM.limit) {
|
||||
parser.push('LIMIT', options.TRIM.limit.toString());
|
||||
}
|
||||
|
||||
if (options.TRIM.policy) {
|
||||
parser.push(options.TRIM.policy);
|
||||
}
|
||||
}
|
||||
|
||||
parser.push(id);
|
||||
|
@ -2,6 +2,7 @@ import { strict as assert } from 'node:assert';
|
||||
import testUtils, { GLOBAL } from '../test-utils';
|
||||
import XADD_NOMKSTREAM from './XADD_NOMKSTREAM';
|
||||
import { parseArgs } from './generic-transformers';
|
||||
import { STREAM_DELETION_POLICY } from './common-stream.types';
|
||||
|
||||
describe('XADD NOMKSTREAM', () => {
|
||||
testUtils.isVersionGreaterThanHook([6, 2]);
|
||||
@ -80,17 +81,82 @@ describe('XADD NOMKSTREAM', () => {
|
||||
['XADD', 'key', 'NOMKSTREAM', '1000', 'LIMIT', '1', '*', 'field', 'value']
|
||||
);
|
||||
});
|
||||
|
||||
it('with TRIM.policy', () => {
|
||||
assert.deepEqual(
|
||||
parseArgs(XADD_NOMKSTREAM, 'key', '*', {
|
||||
field: 'value'
|
||||
}, {
|
||||
TRIM: {
|
||||
threshold: 1000,
|
||||
policy: STREAM_DELETION_POLICY.DELREF
|
||||
}
|
||||
}),
|
||||
['XADD', 'key', 'NOMKSTREAM', '1000', 'DELREF', '*', 'field', 'value']
|
||||
);
|
||||
});
|
||||
|
||||
it('with all TRIM options', () => {
|
||||
assert.deepEqual(
|
||||
parseArgs(XADD_NOMKSTREAM, 'key', '*', {
|
||||
field: 'value'
|
||||
}, {
|
||||
TRIM: {
|
||||
strategy: 'MAXLEN',
|
||||
strategyModifier: '~',
|
||||
threshold: 1000,
|
||||
limit: 100,
|
||||
policy: STREAM_DELETION_POLICY.ACKED
|
||||
}
|
||||
}),
|
||||
['XADD', 'key', 'NOMKSTREAM', 'MAXLEN', '~', '1000', 'LIMIT', '100', 'ACKED', '*', 'field', 'value']
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
testUtils.testAll('xAddNoMkStream', async client => {
|
||||
assert.equal(
|
||||
await client.xAddNoMkStream('key', '*', {
|
||||
field: 'value'
|
||||
}),
|
||||
null
|
||||
);
|
||||
}, {
|
||||
client: GLOBAL.SERVERS.OPEN,
|
||||
cluster: GLOBAL.CLUSTERS.OPEN
|
||||
});
|
||||
testUtils.testAll(
|
||||
'xAddNoMkStream - null when stream does not exist',
|
||||
async (client) => {
|
||||
assert.equal(
|
||||
await client.xAddNoMkStream('{tag}nonexistent-stream', '*', {
|
||||
field: 'value'
|
||||
}),
|
||||
null
|
||||
);
|
||||
},
|
||||
{
|
||||
client: GLOBAL.SERVERS.OPEN,
|
||||
cluster: GLOBAL.CLUSTERS.OPEN,
|
||||
}
|
||||
);
|
||||
|
||||
testUtils.testAll(
|
||||
'xAddNoMkStream - with all TRIM options',
|
||||
async (client) => {
|
||||
const streamKey = '{tag}stream';
|
||||
|
||||
// Create stream and add some messages
|
||||
await client.xAdd(streamKey, '*', { field: 'value1' });
|
||||
|
||||
// Use NOMKSTREAM with all TRIM options
|
||||
const messageId = await client.xAddNoMkStream(streamKey, '*',
|
||||
{ field: 'value2' },
|
||||
{
|
||||
TRIM: {
|
||||
strategyModifier: '~',
|
||||
limit: 1,
|
||||
strategy: 'MAXLEN',
|
||||
threshold: 2,
|
||||
policy: STREAM_DELETION_POLICY.DELREF
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
assert.equal(typeof messageId, 'string');
|
||||
},
|
||||
{
|
||||
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
}
|
||||
);
|
||||
});
|
||||
|
156
packages/client/lib/commands/XDELEX.spec.ts
Normal file
156
packages/client/lib/commands/XDELEX.spec.ts
Normal file
@ -0,0 +1,156 @@
|
||||
import { strict as assert } from "node:assert";
|
||||
import XDELEX from "./XDELEX";
|
||||
import { parseArgs } from "./generic-transformers";
|
||||
import testUtils, { GLOBAL } from "../test-utils";
|
||||
import {
|
||||
STREAM_DELETION_POLICY,
|
||||
STREAM_DELETION_REPLY_CODES,
|
||||
} from "./common-stream.types";
|
||||
|
||||
describe("XDELEX", () => {
|
||||
describe("transformArguments", () => {
|
||||
it("string - without policy", () => {
|
||||
assert.deepEqual(parseArgs(XDELEX, "key", "0-0"), [
|
||||
"XDELEX",
|
||||
"key",
|
||||
"IDS",
|
||||
"1",
|
||||
"0-0",
|
||||
]);
|
||||
});
|
||||
|
||||
it("string - with policy", () => {
|
||||
assert.deepEqual(
|
||||
parseArgs(XDELEX, "key", "0-0", STREAM_DELETION_POLICY.KEEPREF),
|
||||
["XDELEX", "key", "KEEPREF", "IDS", "1", "0-0"]
|
||||
);
|
||||
});
|
||||
|
||||
it("array - without policy", () => {
|
||||
assert.deepEqual(parseArgs(XDELEX, "key", ["0-0", "1-0"]), [
|
||||
"XDELEX",
|
||||
"key",
|
||||
"IDS",
|
||||
"2",
|
||||
"0-0",
|
||||
"1-0",
|
||||
]);
|
||||
});
|
||||
|
||||
it("array - with policy", () => {
|
||||
assert.deepEqual(
|
||||
parseArgs(XDELEX, "key", ["0-0", "1-0"], STREAM_DELETION_POLICY.DELREF),
|
||||
["XDELEX", "key", "DELREF", "IDS", "2", "0-0", "1-0"]
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
testUtils.testAll(
|
||||
`XDELEX non-existing key - without policy`,
|
||||
async (client) => {
|
||||
const reply = await client.xDelEx("{tag}stream-key", "0-0");
|
||||
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.NOT_FOUND]);
|
||||
},
|
||||
{
|
||||
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
}
|
||||
);
|
||||
|
||||
testUtils.testAll(
|
||||
`XDELEX existing key - without policy`,
|
||||
async (client) => {
|
||||
const streamKey = "{tag}stream-key";
|
||||
const messageId = await client.xAdd(streamKey, "*", {
|
||||
field: "value",
|
||||
});
|
||||
|
||||
const reply = await client.xDelEx(
|
||||
streamKey,
|
||||
messageId,
|
||||
);
|
||||
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DELETED]);
|
||||
},
|
||||
{
|
||||
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
}
|
||||
);
|
||||
|
||||
testUtils.testAll(
|
||||
`XDELEX existing key - with policy`,
|
||||
async (client) => {
|
||||
const streamKey = "{tag}stream-key";
|
||||
const messageId = await client.xAdd(streamKey, "*", {
|
||||
field: "value",
|
||||
});
|
||||
|
||||
const reply = await client.xDelEx(
|
||||
streamKey,
|
||||
messageId,
|
||||
STREAM_DELETION_POLICY.DELREF
|
||||
);
|
||||
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DELETED]);
|
||||
},
|
||||
{
|
||||
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
}
|
||||
);
|
||||
|
||||
testUtils.testAll(
|
||||
`XDELEX acknowledge policy - with consumer group`,
|
||||
async (client) => {
|
||||
const streamKey = "{tag}stream-key";
|
||||
|
||||
// Add a message to the stream
|
||||
const messageId = await client.xAdd(streamKey, "*", {
|
||||
field: "value",
|
||||
});
|
||||
|
||||
// Create consumer group
|
||||
await client.xGroupCreate(streamKey, "testgroup", "0");
|
||||
|
||||
const reply = await client.xDelEx(
|
||||
streamKey,
|
||||
messageId,
|
||||
STREAM_DELETION_POLICY.ACKED
|
||||
);
|
||||
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DANGLING_REFS]);
|
||||
},
|
||||
{
|
||||
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
}
|
||||
);
|
||||
|
||||
testUtils.testAll(
|
||||
`XDELEX multiple keys`,
|
||||
async (client) => {
|
||||
const streamKey = "{tag}stream-key";
|
||||
const messageIds = await Promise.all([
|
||||
client.xAdd(streamKey, "*", {
|
||||
field: "value1",
|
||||
}),
|
||||
client.xAdd(streamKey, "*", {
|
||||
field: "value2",
|
||||
}),
|
||||
]);
|
||||
|
||||
const reply = await client.xDelEx(
|
||||
streamKey,
|
||||
[...messageIds, "0-0"],
|
||||
STREAM_DELETION_POLICY.DELREF
|
||||
);
|
||||
assert.deepEqual(reply, [
|
||||
STREAM_DELETION_REPLY_CODES.DELETED,
|
||||
STREAM_DELETION_REPLY_CODES.DELETED,
|
||||
STREAM_DELETION_REPLY_CODES.NOT_FOUND,
|
||||
]);
|
||||
},
|
||||
{
|
||||
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
}
|
||||
);
|
||||
});
|
42
packages/client/lib/commands/XDELEX.ts
Normal file
42
packages/client/lib/commands/XDELEX.ts
Normal file
@ -0,0 +1,42 @@
|
||||
import { CommandParser } from "../client/parser";
|
||||
import { RedisArgument, ArrayReply, Command } from "../RESP/types";
|
||||
import {
|
||||
StreamDeletionPolicy,
|
||||
StreamDeletionReplyCode,
|
||||
} from "./common-stream.types";
|
||||
import { RedisVariadicArgument } from "./generic-transformers";
|
||||
|
||||
/**
|
||||
* Deletes one or multiple entries from the stream
|
||||
*/
|
||||
export default {
|
||||
IS_READ_ONLY: false,
|
||||
/**
|
||||
* Constructs the XDELEX command to delete one or multiple entries from the stream
|
||||
*
|
||||
* @param parser - The command parser
|
||||
* @param key - The stream key
|
||||
* @param id - One or more message IDs to delete
|
||||
* @param policy - Policy to apply when deleting entries (optional, defaults to KEEPREF)
|
||||
* @returns Array of integers: -1 (not found), 1 (deleted), 2 (dangling refs)
|
||||
* @see https://redis.io/commands/xdelex/
|
||||
*/
|
||||
parseCommand(
|
||||
parser: CommandParser,
|
||||
key: RedisArgument,
|
||||
id: RedisVariadicArgument,
|
||||
policy?: StreamDeletionPolicy
|
||||
) {
|
||||
parser.push("XDELEX");
|
||||
parser.pushKey(key);
|
||||
|
||||
if (policy) {
|
||||
parser.push(policy);
|
||||
}
|
||||
|
||||
parser.push("IDS");
|
||||
parser.pushVariadicWithLength(id);
|
||||
},
|
||||
transformReply:
|
||||
undefined as unknown as () => ArrayReply<StreamDeletionReplyCode>,
|
||||
} as const satisfies Command;
|
@ -2,6 +2,7 @@ import { strict as assert } from 'node:assert';
|
||||
import testUtils, { GLOBAL } from '../test-utils';
|
||||
import XTRIM from './XTRIM';
|
||||
import { parseArgs } from './generic-transformers';
|
||||
import { STREAM_DELETION_POLICY } from './common-stream.types';
|
||||
|
||||
describe('XTRIM', () => {
|
||||
describe('transformArguments', () => {
|
||||
@ -12,6 +13,13 @@ describe('XTRIM', () => {
|
||||
);
|
||||
});
|
||||
|
||||
it('simple - MINID', () => {
|
||||
assert.deepEqual(
|
||||
parseArgs(XTRIM, 'key', 'MINID', 123),
|
||||
['XTRIM', 'key', 'MINID', '123']
|
||||
);
|
||||
});
|
||||
|
||||
it('with strategyModifier', () => {
|
||||
assert.deepEqual(
|
||||
parseArgs(XTRIM, 'key', 'MAXLEN', 1, {
|
||||
@ -39,15 +47,96 @@ describe('XTRIM', () => {
|
||||
['XTRIM', 'key', 'MAXLEN', '=', '1', 'LIMIT', '1']
|
||||
);
|
||||
});
|
||||
|
||||
it('with policy', () => {
|
||||
assert.deepEqual(
|
||||
parseArgs(XTRIM, 'key', 'MAXLEN', 1, {
|
||||
policy: STREAM_DELETION_POLICY.DELREF
|
||||
}),
|
||||
['XTRIM', 'key', 'MAXLEN', '1', 'DELREF']
|
||||
);
|
||||
});
|
||||
|
||||
it('with all options', () => {
|
||||
assert.deepEqual(
|
||||
parseArgs(XTRIM, 'key', 'MAXLEN', 1, {
|
||||
strategyModifier: '~',
|
||||
LIMIT: 100,
|
||||
policy: STREAM_DELETION_POLICY.ACKED
|
||||
}),
|
||||
['XTRIM', 'key', 'MAXLEN', '~', '1', 'LIMIT', '100', 'ACKED']
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
testUtils.testAll('xTrim', async client => {
|
||||
testUtils.testAll('xTrim with MAXLEN', async client => {
|
||||
assert.equal(
|
||||
await client.xTrim('key', 'MAXLEN', 1),
|
||||
0
|
||||
typeof await client.xTrim('key', 'MAXLEN', 1),
|
||||
'number'
|
||||
);
|
||||
}, {
|
||||
client: GLOBAL.SERVERS.OPEN,
|
||||
cluster: GLOBAL.CLUSTERS.OPEN,
|
||||
});
|
||||
|
||||
testUtils.testAll('xTrim with MINID', async client => {
|
||||
assert.equal(
|
||||
typeof await client.xTrim('key', 'MINID', 1),
|
||||
'number'
|
||||
);
|
||||
}, {
|
||||
client: GLOBAL.SERVERS.OPEN,
|
||||
cluster: GLOBAL.CLUSTERS.OPEN,
|
||||
});
|
||||
|
||||
testUtils.testAll(
|
||||
'xTrim with LIMIT',
|
||||
async (client) => {
|
||||
assert.equal(
|
||||
typeof await client.xTrim('{tag}key', 'MAXLEN', 1000, {
|
||||
strategyModifier: '~',
|
||||
LIMIT: 10
|
||||
}),
|
||||
'number'
|
||||
);
|
||||
},
|
||||
{
|
||||
client: GLOBAL.SERVERS.OPEN,
|
||||
cluster: GLOBAL.CLUSTERS.OPEN,
|
||||
}
|
||||
);
|
||||
|
||||
testUtils.testAll(
|
||||
'xTrim with policy',
|
||||
async (client) => {
|
||||
assert.equal(
|
||||
typeof await client.xTrim('{tag}key', 'MAXLEN', 0, {
|
||||
policy: STREAM_DELETION_POLICY.DELREF
|
||||
}),
|
||||
'number'
|
||||
);
|
||||
},
|
||||
{
|
||||
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
}
|
||||
);
|
||||
|
||||
testUtils.testAll(
|
||||
'xTrim with all options',
|
||||
async (client) => {
|
||||
assert.equal(
|
||||
typeof await client.xTrim('{tag}key', 'MINID', 0, {
|
||||
strategyModifier: '~',
|
||||
LIMIT: 10,
|
||||
policy: STREAM_DELETION_POLICY.KEEPREF
|
||||
}),
|
||||
'number'
|
||||
);
|
||||
},
|
||||
{
|
||||
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
|
||||
}
|
||||
);
|
||||
});
|
||||
|
@ -1,16 +1,20 @@
|
||||
import { CommandParser } from '../client/parser';
|
||||
import { NumberReply, Command, RedisArgument } from '../RESP/types';
|
||||
import { StreamDeletionPolicy } from './common-stream.types';
|
||||
|
||||
/**
|
||||
* Options for the XTRIM command
|
||||
*
|
||||
* @property strategyModifier - Exact ('=') or approximate ('~') trimming
|
||||
* @property LIMIT - Maximum number of entries to trim in one call (Redis 6.2+)
|
||||
* @property policy - Policy to apply when deleting entries (optional, defaults to KEEPREF)
|
||||
*/
|
||||
export interface XTrimOptions {
|
||||
strategyModifier?: '=' | '~';
|
||||
/** added in 6.2 */
|
||||
LIMIT?: number;
|
||||
/** added in 8.2 */
|
||||
policy?: StreamDeletionPolicy;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -49,6 +53,10 @@ export default {
|
||||
if (options?.LIMIT) {
|
||||
parser.push('LIMIT', options.LIMIT.toString());
|
||||
}
|
||||
|
||||
if (options?.policy) {
|
||||
parser.push(options.policy);
|
||||
}
|
||||
},
|
||||
transformReply: undefined as unknown as () => NumberReply
|
||||
} as const satisfies Command;
|
||||
|
28
packages/client/lib/commands/common-stream.types.ts
Normal file
28
packages/client/lib/commands/common-stream.types.ts
Normal file
@ -0,0 +1,28 @@
|
||||
/** Common stream deletion policies
|
||||
*
|
||||
* Added in Redis 8.2
|
||||
*/
|
||||
export const STREAM_DELETION_POLICY = {
|
||||
/** Preserve references (default) */
|
||||
KEEPREF: "KEEPREF",
|
||||
/** Delete all references */
|
||||
DELREF: "DELREF",
|
||||
/** Only acknowledged entries */
|
||||
ACKED: "ACKED",
|
||||
} as const;
|
||||
|
||||
export type StreamDeletionPolicy =
|
||||
(typeof STREAM_DELETION_POLICY)[keyof typeof STREAM_DELETION_POLICY];
|
||||
|
||||
/** Common reply codes for stream deletion operations */
|
||||
export const STREAM_DELETION_REPLY_CODES = {
|
||||
/** ID not found */
|
||||
NOT_FOUND: -1,
|
||||
/** Entry deleted */
|
||||
DELETED: 1,
|
||||
/** Dangling references */
|
||||
DANGLING_REFS: 2,
|
||||
} as const;
|
||||
|
||||
export type StreamDeletionReplyCode =
|
||||
(typeof STREAM_DELETION_REPLY_CODES)[keyof typeof STREAM_DELETION_REPLY_CODES];
|
@ -280,6 +280,7 @@ import TYPE from './TYPE';
|
||||
import UNLINK from './UNLINK';
|
||||
import WAIT from './WAIT';
|
||||
import XACK from './XACK';
|
||||
import XACKDEL from './XACKDEL';
|
||||
import XADD_NOMKSTREAM from './XADD_NOMKSTREAM';
|
||||
import XADD from './XADD';
|
||||
import XAUTOCLAIM_JUSTID from './XAUTOCLAIM_JUSTID';
|
||||
@ -287,6 +288,7 @@ import XAUTOCLAIM from './XAUTOCLAIM';
|
||||
import XCLAIM_JUSTID from './XCLAIM_JUSTID';
|
||||
import XCLAIM from './XCLAIM';
|
||||
import XDEL from './XDEL';
|
||||
import XDELEX from './XDELEX';
|
||||
import XGROUP_CREATE from './XGROUP_CREATE';
|
||||
import XGROUP_CREATECONSUMER from './XGROUP_CREATECONSUMER';
|
||||
import XGROUP_DELCONSUMER from './XGROUP_DELCONSUMER';
|
||||
@ -924,6 +926,8 @@ export default {
|
||||
wait: WAIT,
|
||||
XACK,
|
||||
xAck: XACK,
|
||||
XACKDEL,
|
||||
xAckDel: XACKDEL,
|
||||
XADD_NOMKSTREAM,
|
||||
xAddNoMkStream: XADD_NOMKSTREAM,
|
||||
XADD,
|
||||
@ -938,6 +942,8 @@ export default {
|
||||
xClaim: XCLAIM,
|
||||
XDEL,
|
||||
xDel: XDEL,
|
||||
XDELEX,
|
||||
xDelEx: XDELEX,
|
||||
XGROUP_CREATE,
|
||||
xGroupCreate: XGROUP_CREATE,
|
||||
XGROUP_CREATECONSUMER,
|
||||
|
@ -174,7 +174,7 @@ export class SentinelFramework extends DockerBase {
|
||||
this.#testUtils = TestUtils.createFromConfig({
|
||||
dockerImageName: 'redislabs/client-libs-test',
|
||||
dockerImageVersionArgument: 'redis-version',
|
||||
defaultDockerVersion: '8.2-M01-pre'
|
||||
defaultDockerVersion: '8.2-rc1'
|
||||
});
|
||||
this.#nodeMap = new Map<string, ArrayElement<Awaited<ReturnType<SentinelFramework['spawnRedisSentinelNodes']>>>>();
|
||||
this.#sentinelMap = new Map<string, ArrayElement<Awaited<ReturnType<SentinelFramework['spawnRedisSentinelSentinels']>>>>();
|
||||
|
@ -9,7 +9,7 @@ import RedisBloomModules from '@redis/bloom';
|
||||
const utils = TestUtils.createFromConfig({
|
||||
dockerImageName: 'redislabs/client-libs-test',
|
||||
dockerImageVersionArgument: 'redis-version',
|
||||
defaultDockerVersion: '8.2-M01-pre'
|
||||
defaultDockerVersion: '8.2-rc1'
|
||||
});
|
||||
|
||||
export default utils;
|
||||
|
@ -6,7 +6,7 @@ import { EntraidCredentialsProvider } from './entraid-credentials-provider';
|
||||
export const testUtils = TestUtils.createFromConfig({
|
||||
dockerImageName: 'redislabs/client-libs-test',
|
||||
dockerImageVersionArgument: 'redis-version',
|
||||
defaultDockerVersion: '8.2-M01-pre'
|
||||
defaultDockerVersion: '8.2-rc1'
|
||||
});
|
||||
|
||||
const DEBUG_MODE_ARGS = testUtils.isVersionGreaterThan([7]) ?
|
||||
|
@ -4,7 +4,7 @@ import RedisJSON from '.';
|
||||
export default TestUtils.createFromConfig({
|
||||
dockerImageName: 'redislabs/client-libs-test',
|
||||
dockerImageVersionArgument: 'redis-version',
|
||||
defaultDockerVersion: '8.2-M01-pre'
|
||||
defaultDockerVersion: '8.2-rc1'
|
||||
});
|
||||
|
||||
export const GLOBAL = {
|
||||
|
@ -5,7 +5,7 @@ import { RespVersions } from '@redis/client';
|
||||
export default TestUtils.createFromConfig({
|
||||
dockerImageName: 'redislabs/client-libs-test',
|
||||
dockerImageVersionArgument: 'redis-version',
|
||||
defaultDockerVersion: '8.2-M01-pre'
|
||||
defaultDockerVersion: '8.2-rc1'
|
||||
});
|
||||
|
||||
export const GLOBAL = {
|
||||
|
@ -4,7 +4,7 @@ import TimeSeries from '.';
|
||||
export default TestUtils.createFromConfig({
|
||||
dockerImageName: 'redislabs/client-libs-test',
|
||||
dockerImageVersionArgument: 'redis-version',
|
||||
defaultDockerVersion: '8.2-M01-pre'
|
||||
defaultDockerVersion: '8.2-rc1'
|
||||
});
|
||||
|
||||
export const GLOBAL = {
|
||||
|
Reference in New Issue
Block a user