diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 89efdb6111..df8cb1d1b6 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -22,7 +22,7 @@ jobs: fail-fast: false matrix: node-version: ["18", "20", "22"] - redis-version: ["rs-7.2.0-v13", "rs-7.4.0-v1", "8.0.2", "8.2-M01-pre"] + redis-version: ["rs-7.4.0-v1", "8.0.2", "8.2-rc1"] steps: - uses: actions/checkout@v4 with: diff --git a/packages/bloom/lib/test-utils.ts b/packages/bloom/lib/test-utils.ts index 4396c94f72..268ebca8cb 100644 --- a/packages/bloom/lib/test-utils.ts +++ b/packages/bloom/lib/test-utils.ts @@ -4,7 +4,7 @@ import RedisBloomModules from '.'; export default TestUtils.createFromConfig({ dockerImageName: 'redislabs/client-libs-test', dockerImageVersionArgument: 'redis-version', - defaultDockerVersion: '8.2-M01-pre' + defaultDockerVersion: '8.2-rc1' }); export const GLOBAL = { diff --git a/packages/client/lib/commands/XACKDEL.spec.ts b/packages/client/lib/commands/XACKDEL.spec.ts new file mode 100644 index 0000000000..9d7bad15a2 --- /dev/null +++ b/packages/client/lib/commands/XACKDEL.spec.ts @@ -0,0 +1,196 @@ +import { strict as assert } from "node:assert"; +import XACKDEL from "./XACKDEL"; +import { parseArgs } from "./generic-transformers"; +import testUtils, { GLOBAL } from "../test-utils"; +import { + STREAM_DELETION_POLICY, + STREAM_DELETION_REPLY_CODES, +} from "./common-stream.types"; + +describe("XACKDEL", () => { + describe("transformArguments", () => { + it("string - without policy", () => { + assert.deepEqual(parseArgs(XACKDEL, "key", "group", "0-0"), [ + "XACKDEL", + "key", + "group", + "IDS", + "1", + "0-0", + ]); + }); + + it("string - with policy", () => { + assert.deepEqual( + parseArgs( + XACKDEL, + "key", + "group", + "0-0", + STREAM_DELETION_POLICY.KEEPREF + ), + ["XACKDEL", "key", "group", "KEEPREF", "IDS", "1", "0-0"] + ); + }); + + it("array - without policy", () => { + assert.deepEqual(parseArgs(XACKDEL, "key", "group", ["0-0", "1-0"]), [ + "XACKDEL", + "key", + "group", + "IDS", + "2", + "0-0", + "1-0", + ]); + }); + + it("array - with policy", () => { + assert.deepEqual( + parseArgs( + XACKDEL, + "key", + "group", + ["0-0", "1-0"], + STREAM_DELETION_POLICY.DELREF + ), + ["XACKDEL", "key", "group", "DELREF", "IDS", "2", "0-0", "1-0"] + ); + }); + }); + + testUtils.testAll( + `XACKDEL non-existing key - without policy`, + async (client) => { + const reply = await client.xAckDel("{tag}stream-key", "testgroup", "0-0"); + assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.NOT_FOUND]); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); + + testUtils.testAll( + `XACKDEL existing key - without policy`, + async (client) => { + const streamKey = "{tag}stream-key"; + const groupName = "testgroup"; + + // create consumer group, stream and message + await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true }); + const messageId = await client.xAdd(streamKey, "*", { field: "value" }); + + // read message + await client.xReadGroup(groupName, "testconsumer", { + key: streamKey, + id: ">", + }); + + const reply = await client.xAckDel(streamKey, groupName, messageId); + assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DELETED]); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); + + testUtils.testAll( + `XACKDEL existing key - with policy`, + async (client) => { + const streamKey = "{tag}stream-key"; + const groupName = "testgroup"; + + // create consumer group, stream and message + await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true }); + const messageId = await client.xAdd(streamKey, "*", { field: "value" }); + + // read message + await client.xReadGroup(groupName, "testconsumer", { + key: streamKey, + id: ">", + }); + + const reply = await client.xAckDel( + streamKey, + groupName, + messageId, + STREAM_DELETION_POLICY.DELREF + ); + assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DELETED]); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); + + testUtils.testAll( + `XACKDEL acknowledge policy - with consumer group`, + async (client) => { + const streamKey = "{tag}stream-key"; + const groupName = "testgroup"; + + // create consumer groups, stream and message + await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true }); + await client.xGroupCreate(streamKey, "some-other-group", "0"); + const messageId = await client.xAdd(streamKey, "*", { field: "value" }); + + // read message + await client.xReadGroup(groupName, "testconsumer", { + key: streamKey, + id: ">", + }); + + const reply = await client.xAckDel( + streamKey, + groupName, + messageId, + STREAM_DELETION_POLICY.ACKED + ); + assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DANGLING_REFS]); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); + + testUtils.testAll( + `XACKDEL multiple keys`, + async (client) => { + const streamKey = "{tag}stream-key"; + const groupName = "testgroup"; + + // create consumer groups, stream and add messages + await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true }); + const messageIds = await Promise.all([ + client.xAdd(streamKey, "*", { field: "value1" }), + client.xAdd(streamKey, "*", { field: "value2" }), + ]); + + // read messages + await client.xReadGroup(groupName, "testconsumer", { + key: streamKey, + id: ">", + }); + + const reply = await client.xAckDel( + streamKey, + groupName, + [...messageIds, "0-0"], + STREAM_DELETION_POLICY.DELREF + ); + assert.deepEqual(reply, [ + STREAM_DELETION_REPLY_CODES.DELETED, + STREAM_DELETION_REPLY_CODES.DELETED, + STREAM_DELETION_REPLY_CODES.NOT_FOUND, + ]); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); +}); diff --git a/packages/client/lib/commands/XACKDEL.ts b/packages/client/lib/commands/XACKDEL.ts new file mode 100644 index 0000000000..6e209879e4 --- /dev/null +++ b/packages/client/lib/commands/XACKDEL.ts @@ -0,0 +1,45 @@ +import { CommandParser } from "../client/parser"; +import { RedisArgument, ArrayReply, Command } from "../RESP/types"; +import { + StreamDeletionReplyCode, + StreamDeletionPolicy, +} from "./common-stream.types"; +import { RedisVariadicArgument } from "./generic-transformers"; + +/** + * Acknowledges and deletes one or multiple messages for a stream consumer group + */ +export default { + IS_READ_ONLY: false, + /** + * Constructs the XACKDEL command to acknowledge and delete one or multiple messages for a stream consumer group + * + * @param parser - The command parser + * @param key - The stream key + * @param group - The consumer group name + * @param id - One or more message IDs to acknowledge and delete + * @param policy - Policy to apply when deleting entries (optional, defaults to KEEPREF) + * @returns Array of integers: -1 (not found), 1 (acknowledged and deleted), 2 (acknowledged with dangling refs) + * @see https://redis.io/commands/xackdel/ + */ + parseCommand( + parser: CommandParser, + key: RedisArgument, + group: RedisArgument, + id: RedisVariadicArgument, + policy?: StreamDeletionPolicy + ) { + parser.push("XACKDEL"); + parser.pushKey(key); + parser.push(group); + + if (policy) { + parser.push(policy); + } + + parser.push("IDS"); + parser.pushVariadicWithLength(id); + }, + transformReply: + undefined as unknown as () => ArrayReply, +} as const satisfies Command; diff --git a/packages/client/lib/commands/XADD.spec.ts b/packages/client/lib/commands/XADD.spec.ts index 321581d086..a41e868275 100644 --- a/packages/client/lib/commands/XADD.spec.ts +++ b/packages/client/lib/commands/XADD.spec.ts @@ -2,6 +2,7 @@ import { strict as assert } from 'node:assert'; import testUtils, { GLOBAL } from '../test-utils'; import XADD from './XADD'; import { parseArgs } from './generic-transformers'; +import { STREAM_DELETION_POLICY } from './common-stream.types'; describe('XADD', () => { describe('transformArguments', () => { @@ -78,6 +79,37 @@ describe('XADD', () => { ['XADD', 'key', '1000', 'LIMIT', '1', '*', 'field', 'value'] ); }); + + it('with TRIM.policy', () => { + assert.deepEqual( + parseArgs(XADD, 'key', '*', { + field: 'value' + }, { + TRIM: { + threshold: 1000, + policy: STREAM_DELETION_POLICY.DELREF + } + }), + ['XADD', 'key', '1000', 'DELREF', '*', 'field', 'value'] + ); + }); + + it('with all TRIM options', () => { + assert.deepEqual( + parseArgs(XADD, 'key', '*', { + field: 'value' + }, { + TRIM: { + strategy: 'MAXLEN', + strategyModifier: '~', + threshold: 1000, + limit: 100, + policy: STREAM_DELETION_POLICY.ACKED + } + }), + ['XADD', 'key', 'MAXLEN', '~', '1000', 'LIMIT', '100', 'ACKED', '*', 'field', 'value'] + ); + }); }); testUtils.testAll('xAdd', async client => { @@ -91,4 +123,52 @@ describe('XADD', () => { client: GLOBAL.SERVERS.OPEN, cluster: GLOBAL.CLUSTERS.OPEN }); + + testUtils.testAll( + 'xAdd with TRIM policy', + async (client) => { + assert.equal( + typeof await client.xAdd('{tag}key', '*', + { field: 'value' }, + { + TRIM: { + strategy: 'MAXLEN', + threshold: 1000, + policy: STREAM_DELETION_POLICY.KEEPREF + } + } + ), + 'string' + ); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); + + testUtils.testAll( + 'xAdd with all TRIM options', + async (client) => { + assert.equal( + typeof await client.xAdd('{tag}key2', '*', + { field: 'value' }, + { + TRIM: { + strategy: 'MAXLEN', + strategyModifier: '~', + threshold: 1000, + limit: 10, + policy: STREAM_DELETION_POLICY.DELREF + } + } + ), + 'string' + ); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); }); diff --git a/packages/client/lib/commands/XADD.ts b/packages/client/lib/commands/XADD.ts index b0c50b1bfd..f2509a9fa7 100644 --- a/packages/client/lib/commands/XADD.ts +++ b/packages/client/lib/commands/XADD.ts @@ -1,5 +1,6 @@ import { CommandParser } from '../client/parser'; import { RedisArgument, BlobStringReply, Command } from '../RESP/types'; +import { StreamDeletionPolicy } from './common-stream.types'; import { Tail } from './generic-transformers'; /** @@ -10,6 +11,7 @@ import { Tail } from './generic-transformers'; * @property TRIM.strategyModifier - Exact ('=') or approximate ('~') trimming * @property TRIM.threshold - Maximum stream length or minimum ID to retain * @property TRIM.limit - Maximum number of entries to trim in one call + * @property TRIM.policy - Policy to apply when trimming entries (optional, defaults to KEEPREF) */ export interface XAddOptions { TRIM?: { @@ -17,6 +19,8 @@ export interface XAddOptions { strategyModifier?: '=' | '~'; threshold: number; limit?: number; + /** added in 8.2 */ + policy?: StreamDeletionPolicy; }; } @@ -58,6 +62,10 @@ export function parseXAddArguments( if (options.TRIM.limit) { parser.push('LIMIT', options.TRIM.limit.toString()); } + + if (options.TRIM.policy) { + parser.push(options.TRIM.policy); + } } parser.push(id); diff --git a/packages/client/lib/commands/XADD_NOMKSTREAM.spec.ts b/packages/client/lib/commands/XADD_NOMKSTREAM.spec.ts index 97927f212f..a957d0f06c 100644 --- a/packages/client/lib/commands/XADD_NOMKSTREAM.spec.ts +++ b/packages/client/lib/commands/XADD_NOMKSTREAM.spec.ts @@ -2,6 +2,7 @@ import { strict as assert } from 'node:assert'; import testUtils, { GLOBAL } from '../test-utils'; import XADD_NOMKSTREAM from './XADD_NOMKSTREAM'; import { parseArgs } from './generic-transformers'; +import { STREAM_DELETION_POLICY } from './common-stream.types'; describe('XADD NOMKSTREAM', () => { testUtils.isVersionGreaterThanHook([6, 2]); @@ -80,17 +81,82 @@ describe('XADD NOMKSTREAM', () => { ['XADD', 'key', 'NOMKSTREAM', '1000', 'LIMIT', '1', '*', 'field', 'value'] ); }); + + it('with TRIM.policy', () => { + assert.deepEqual( + parseArgs(XADD_NOMKSTREAM, 'key', '*', { + field: 'value' + }, { + TRIM: { + threshold: 1000, + policy: STREAM_DELETION_POLICY.DELREF + } + }), + ['XADD', 'key', 'NOMKSTREAM', '1000', 'DELREF', '*', 'field', 'value'] + ); + }); + + it('with all TRIM options', () => { + assert.deepEqual( + parseArgs(XADD_NOMKSTREAM, 'key', '*', { + field: 'value' + }, { + TRIM: { + strategy: 'MAXLEN', + strategyModifier: '~', + threshold: 1000, + limit: 100, + policy: STREAM_DELETION_POLICY.ACKED + } + }), + ['XADD', 'key', 'NOMKSTREAM', 'MAXLEN', '~', '1000', 'LIMIT', '100', 'ACKED', '*', 'field', 'value'] + ); + }); }); - testUtils.testAll('xAddNoMkStream', async client => { - assert.equal( - await client.xAddNoMkStream('key', '*', { - field: 'value' - }), - null - ); - }, { - client: GLOBAL.SERVERS.OPEN, - cluster: GLOBAL.CLUSTERS.OPEN - }); + testUtils.testAll( + 'xAddNoMkStream - null when stream does not exist', + async (client) => { + assert.equal( + await client.xAddNoMkStream('{tag}nonexistent-stream', '*', { + field: 'value' + }), + null + ); + }, + { + client: GLOBAL.SERVERS.OPEN, + cluster: GLOBAL.CLUSTERS.OPEN, + } + ); + + testUtils.testAll( + 'xAddNoMkStream - with all TRIM options', + async (client) => { + const streamKey = '{tag}stream'; + + // Create stream and add some messages + await client.xAdd(streamKey, '*', { field: 'value1' }); + + // Use NOMKSTREAM with all TRIM options + const messageId = await client.xAddNoMkStream(streamKey, '*', + { field: 'value2' }, + { + TRIM: { + strategyModifier: '~', + limit: 1, + strategy: 'MAXLEN', + threshold: 2, + policy: STREAM_DELETION_POLICY.DELREF + } + } + ); + + assert.equal(typeof messageId, 'string'); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); }); diff --git a/packages/client/lib/commands/XDELEX.spec.ts b/packages/client/lib/commands/XDELEX.spec.ts new file mode 100644 index 0000000000..8c42150325 --- /dev/null +++ b/packages/client/lib/commands/XDELEX.spec.ts @@ -0,0 +1,156 @@ +import { strict as assert } from "node:assert"; +import XDELEX from "./XDELEX"; +import { parseArgs } from "./generic-transformers"; +import testUtils, { GLOBAL } from "../test-utils"; +import { + STREAM_DELETION_POLICY, + STREAM_DELETION_REPLY_CODES, +} from "./common-stream.types"; + +describe("XDELEX", () => { + describe("transformArguments", () => { + it("string - without policy", () => { + assert.deepEqual(parseArgs(XDELEX, "key", "0-0"), [ + "XDELEX", + "key", + "IDS", + "1", + "0-0", + ]); + }); + + it("string - with policy", () => { + assert.deepEqual( + parseArgs(XDELEX, "key", "0-0", STREAM_DELETION_POLICY.KEEPREF), + ["XDELEX", "key", "KEEPREF", "IDS", "1", "0-0"] + ); + }); + + it("array - without policy", () => { + assert.deepEqual(parseArgs(XDELEX, "key", ["0-0", "1-0"]), [ + "XDELEX", + "key", + "IDS", + "2", + "0-0", + "1-0", + ]); + }); + + it("array - with policy", () => { + assert.deepEqual( + parseArgs(XDELEX, "key", ["0-0", "1-0"], STREAM_DELETION_POLICY.DELREF), + ["XDELEX", "key", "DELREF", "IDS", "2", "0-0", "1-0"] + ); + }); + }); + + testUtils.testAll( + `XDELEX non-existing key - without policy`, + async (client) => { + const reply = await client.xDelEx("{tag}stream-key", "0-0"); + assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.NOT_FOUND]); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); + + testUtils.testAll( + `XDELEX existing key - without policy`, + async (client) => { + const streamKey = "{tag}stream-key"; + const messageId = await client.xAdd(streamKey, "*", { + field: "value", + }); + + const reply = await client.xDelEx( + streamKey, + messageId, + ); + assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DELETED]); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); + + testUtils.testAll( + `XDELEX existing key - with policy`, + async (client) => { + const streamKey = "{tag}stream-key"; + const messageId = await client.xAdd(streamKey, "*", { + field: "value", + }); + + const reply = await client.xDelEx( + streamKey, + messageId, + STREAM_DELETION_POLICY.DELREF + ); + assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DELETED]); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); + + testUtils.testAll( + `XDELEX acknowledge policy - with consumer group`, + async (client) => { + const streamKey = "{tag}stream-key"; + + // Add a message to the stream + const messageId = await client.xAdd(streamKey, "*", { + field: "value", + }); + + // Create consumer group + await client.xGroupCreate(streamKey, "testgroup", "0"); + + const reply = await client.xDelEx( + streamKey, + messageId, + STREAM_DELETION_POLICY.ACKED + ); + assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DANGLING_REFS]); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); + + testUtils.testAll( + `XDELEX multiple keys`, + async (client) => { + const streamKey = "{tag}stream-key"; + const messageIds = await Promise.all([ + client.xAdd(streamKey, "*", { + field: "value1", + }), + client.xAdd(streamKey, "*", { + field: "value2", + }), + ]); + + const reply = await client.xDelEx( + streamKey, + [...messageIds, "0-0"], + STREAM_DELETION_POLICY.DELREF + ); + assert.deepEqual(reply, [ + STREAM_DELETION_REPLY_CODES.DELETED, + STREAM_DELETION_REPLY_CODES.DELETED, + STREAM_DELETION_REPLY_CODES.NOT_FOUND, + ]); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); +}); diff --git a/packages/client/lib/commands/XDELEX.ts b/packages/client/lib/commands/XDELEX.ts new file mode 100644 index 0000000000..021dd0a9e1 --- /dev/null +++ b/packages/client/lib/commands/XDELEX.ts @@ -0,0 +1,42 @@ +import { CommandParser } from "../client/parser"; +import { RedisArgument, ArrayReply, Command } from "../RESP/types"; +import { + StreamDeletionPolicy, + StreamDeletionReplyCode, +} from "./common-stream.types"; +import { RedisVariadicArgument } from "./generic-transformers"; + +/** + * Deletes one or multiple entries from the stream + */ +export default { + IS_READ_ONLY: false, + /** + * Constructs the XDELEX command to delete one or multiple entries from the stream + * + * @param parser - The command parser + * @param key - The stream key + * @param id - One or more message IDs to delete + * @param policy - Policy to apply when deleting entries (optional, defaults to KEEPREF) + * @returns Array of integers: -1 (not found), 1 (deleted), 2 (dangling refs) + * @see https://redis.io/commands/xdelex/ + */ + parseCommand( + parser: CommandParser, + key: RedisArgument, + id: RedisVariadicArgument, + policy?: StreamDeletionPolicy + ) { + parser.push("XDELEX"); + parser.pushKey(key); + + if (policy) { + parser.push(policy); + } + + parser.push("IDS"); + parser.pushVariadicWithLength(id); + }, + transformReply: + undefined as unknown as () => ArrayReply, +} as const satisfies Command; diff --git a/packages/client/lib/commands/XTRIM.spec.ts b/packages/client/lib/commands/XTRIM.spec.ts index 2c31f0fef9..b88cf84676 100644 --- a/packages/client/lib/commands/XTRIM.spec.ts +++ b/packages/client/lib/commands/XTRIM.spec.ts @@ -2,6 +2,7 @@ import { strict as assert } from 'node:assert'; import testUtils, { GLOBAL } from '../test-utils'; import XTRIM from './XTRIM'; import { parseArgs } from './generic-transformers'; +import { STREAM_DELETION_POLICY } from './common-stream.types'; describe('XTRIM', () => { describe('transformArguments', () => { @@ -12,6 +13,13 @@ describe('XTRIM', () => { ); }); + it('simple - MINID', () => { + assert.deepEqual( + parseArgs(XTRIM, 'key', 'MINID', 123), + ['XTRIM', 'key', 'MINID', '123'] + ); + }); + it('with strategyModifier', () => { assert.deepEqual( parseArgs(XTRIM, 'key', 'MAXLEN', 1, { @@ -39,15 +47,96 @@ describe('XTRIM', () => { ['XTRIM', 'key', 'MAXLEN', '=', '1', 'LIMIT', '1'] ); }); + + it('with policy', () => { + assert.deepEqual( + parseArgs(XTRIM, 'key', 'MAXLEN', 1, { + policy: STREAM_DELETION_POLICY.DELREF + }), + ['XTRIM', 'key', 'MAXLEN', '1', 'DELREF'] + ); + }); + + it('with all options', () => { + assert.deepEqual( + parseArgs(XTRIM, 'key', 'MAXLEN', 1, { + strategyModifier: '~', + LIMIT: 100, + policy: STREAM_DELETION_POLICY.ACKED + }), + ['XTRIM', 'key', 'MAXLEN', '~', '1', 'LIMIT', '100', 'ACKED'] + ); + }); }); - testUtils.testAll('xTrim', async client => { + testUtils.testAll('xTrim with MAXLEN', async client => { assert.equal( - await client.xTrim('key', 'MAXLEN', 1), - 0 + typeof await client.xTrim('key', 'MAXLEN', 1), + 'number' ); }, { client: GLOBAL.SERVERS.OPEN, cluster: GLOBAL.CLUSTERS.OPEN, }); + + testUtils.testAll('xTrim with MINID', async client => { + assert.equal( + typeof await client.xTrim('key', 'MINID', 1), + 'number' + ); + }, { + client: GLOBAL.SERVERS.OPEN, + cluster: GLOBAL.CLUSTERS.OPEN, + }); + + testUtils.testAll( + 'xTrim with LIMIT', + async (client) => { + assert.equal( + typeof await client.xTrim('{tag}key', 'MAXLEN', 1000, { + strategyModifier: '~', + LIMIT: 10 + }), + 'number' + ); + }, + { + client: GLOBAL.SERVERS.OPEN, + cluster: GLOBAL.CLUSTERS.OPEN, + } + ); + + testUtils.testAll( + 'xTrim with policy', + async (client) => { + assert.equal( + typeof await client.xTrim('{tag}key', 'MAXLEN', 0, { + policy: STREAM_DELETION_POLICY.DELREF + }), + 'number' + ); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); + + testUtils.testAll( + 'xTrim with all options', + async (client) => { + assert.equal( + typeof await client.xTrim('{tag}key', 'MINID', 0, { + strategyModifier: '~', + LIMIT: 10, + policy: STREAM_DELETION_POLICY.KEEPREF + }), + 'number' + ); + }, + { + client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] }, + cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] }, + } + ); }); diff --git a/packages/client/lib/commands/XTRIM.ts b/packages/client/lib/commands/XTRIM.ts index 6125720111..34171d4611 100644 --- a/packages/client/lib/commands/XTRIM.ts +++ b/packages/client/lib/commands/XTRIM.ts @@ -1,16 +1,20 @@ import { CommandParser } from '../client/parser'; import { NumberReply, Command, RedisArgument } from '../RESP/types'; +import { StreamDeletionPolicy } from './common-stream.types'; /** * Options for the XTRIM command * * @property strategyModifier - Exact ('=') or approximate ('~') trimming * @property LIMIT - Maximum number of entries to trim in one call (Redis 6.2+) + * @property policy - Policy to apply when deleting entries (optional, defaults to KEEPREF) */ export interface XTrimOptions { strategyModifier?: '=' | '~'; /** added in 6.2 */ LIMIT?: number; + /** added in 8.2 */ + policy?: StreamDeletionPolicy; } /** @@ -49,6 +53,10 @@ export default { if (options?.LIMIT) { parser.push('LIMIT', options.LIMIT.toString()); } + + if (options?.policy) { + parser.push(options.policy); + } }, transformReply: undefined as unknown as () => NumberReply } as const satisfies Command; diff --git a/packages/client/lib/commands/common-stream.types.ts b/packages/client/lib/commands/common-stream.types.ts new file mode 100644 index 0000000000..60955b6e3c --- /dev/null +++ b/packages/client/lib/commands/common-stream.types.ts @@ -0,0 +1,28 @@ +/** Common stream deletion policies + * + * Added in Redis 8.2 + */ +export const STREAM_DELETION_POLICY = { + /** Preserve references (default) */ + KEEPREF: "KEEPREF", + /** Delete all references */ + DELREF: "DELREF", + /** Only acknowledged entries */ + ACKED: "ACKED", +} as const; + +export type StreamDeletionPolicy = + (typeof STREAM_DELETION_POLICY)[keyof typeof STREAM_DELETION_POLICY]; + +/** Common reply codes for stream deletion operations */ +export const STREAM_DELETION_REPLY_CODES = { + /** ID not found */ + NOT_FOUND: -1, + /** Entry deleted */ + DELETED: 1, + /** Dangling references */ + DANGLING_REFS: 2, +} as const; + +export type StreamDeletionReplyCode = + (typeof STREAM_DELETION_REPLY_CODES)[keyof typeof STREAM_DELETION_REPLY_CODES]; diff --git a/packages/client/lib/commands/index.ts b/packages/client/lib/commands/index.ts index 87ab8d10b8..4614c8b282 100644 --- a/packages/client/lib/commands/index.ts +++ b/packages/client/lib/commands/index.ts @@ -280,6 +280,7 @@ import TYPE from './TYPE'; import UNLINK from './UNLINK'; import WAIT from './WAIT'; import XACK from './XACK'; +import XACKDEL from './XACKDEL'; import XADD_NOMKSTREAM from './XADD_NOMKSTREAM'; import XADD from './XADD'; import XAUTOCLAIM_JUSTID from './XAUTOCLAIM_JUSTID'; @@ -287,6 +288,7 @@ import XAUTOCLAIM from './XAUTOCLAIM'; import XCLAIM_JUSTID from './XCLAIM_JUSTID'; import XCLAIM from './XCLAIM'; import XDEL from './XDEL'; +import XDELEX from './XDELEX'; import XGROUP_CREATE from './XGROUP_CREATE'; import XGROUP_CREATECONSUMER from './XGROUP_CREATECONSUMER'; import XGROUP_DELCONSUMER from './XGROUP_DELCONSUMER'; @@ -924,6 +926,8 @@ export default { wait: WAIT, XACK, xAck: XACK, + XACKDEL, + xAckDel: XACKDEL, XADD_NOMKSTREAM, xAddNoMkStream: XADD_NOMKSTREAM, XADD, @@ -938,6 +942,8 @@ export default { xClaim: XCLAIM, XDEL, xDel: XDEL, + XDELEX, + xDelEx: XDELEX, XGROUP_CREATE, xGroupCreate: XGROUP_CREATE, XGROUP_CREATECONSUMER, diff --git a/packages/client/lib/sentinel/test-util.ts b/packages/client/lib/sentinel/test-util.ts index c8efa47f41..7c4752a885 100644 --- a/packages/client/lib/sentinel/test-util.ts +++ b/packages/client/lib/sentinel/test-util.ts @@ -174,7 +174,7 @@ export class SentinelFramework extends DockerBase { this.#testUtils = TestUtils.createFromConfig({ dockerImageName: 'redislabs/client-libs-test', dockerImageVersionArgument: 'redis-version', - defaultDockerVersion: '8.2-M01-pre' + defaultDockerVersion: '8.2-rc1' }); this.#nodeMap = new Map>>>(); this.#sentinelMap = new Map>>>(); diff --git a/packages/client/lib/test-utils.ts b/packages/client/lib/test-utils.ts index b9b906e943..86b6ed294a 100644 --- a/packages/client/lib/test-utils.ts +++ b/packages/client/lib/test-utils.ts @@ -9,7 +9,7 @@ import RedisBloomModules from '@redis/bloom'; const utils = TestUtils.createFromConfig({ dockerImageName: 'redislabs/client-libs-test', dockerImageVersionArgument: 'redis-version', - defaultDockerVersion: '8.2-M01-pre' + defaultDockerVersion: '8.2-rc1' }); export default utils; diff --git a/packages/entraid/lib/test-utils.ts b/packages/entraid/lib/test-utils.ts index 3c561d4ba4..e5d977a6b4 100644 --- a/packages/entraid/lib/test-utils.ts +++ b/packages/entraid/lib/test-utils.ts @@ -6,7 +6,7 @@ import { EntraidCredentialsProvider } from './entraid-credentials-provider'; export const testUtils = TestUtils.createFromConfig({ dockerImageName: 'redislabs/client-libs-test', dockerImageVersionArgument: 'redis-version', - defaultDockerVersion: '8.2-M01-pre' + defaultDockerVersion: '8.2-rc1' }); const DEBUG_MODE_ARGS = testUtils.isVersionGreaterThan([7]) ? diff --git a/packages/json/lib/test-utils.ts b/packages/json/lib/test-utils.ts index 6b6859d61b..cba2d95e73 100644 --- a/packages/json/lib/test-utils.ts +++ b/packages/json/lib/test-utils.ts @@ -4,7 +4,7 @@ import RedisJSON from '.'; export default TestUtils.createFromConfig({ dockerImageName: 'redislabs/client-libs-test', dockerImageVersionArgument: 'redis-version', - defaultDockerVersion: '8.2-M01-pre' + defaultDockerVersion: '8.2-rc1' }); export const GLOBAL = { diff --git a/packages/search/lib/test-utils.ts b/packages/search/lib/test-utils.ts index a2b9c816da..d4d91307b9 100644 --- a/packages/search/lib/test-utils.ts +++ b/packages/search/lib/test-utils.ts @@ -5,7 +5,7 @@ import { RespVersions } from '@redis/client'; export default TestUtils.createFromConfig({ dockerImageName: 'redislabs/client-libs-test', dockerImageVersionArgument: 'redis-version', - defaultDockerVersion: '8.2-M01-pre' + defaultDockerVersion: '8.2-rc1' }); export const GLOBAL = { diff --git a/packages/time-series/lib/test-utils.ts b/packages/time-series/lib/test-utils.ts index 8a664ee8df..9c59918e70 100644 --- a/packages/time-series/lib/test-utils.ts +++ b/packages/time-series/lib/test-utils.ts @@ -4,7 +4,7 @@ import TimeSeries from '.'; export default TestUtils.createFromConfig({ dockerImageName: 'redislabs/client-libs-test', dockerImageVersionArgument: 'redis-version', - defaultDockerVersion: '8.2-M01-pre' + defaultDockerVersion: '8.2-rc1' }); export const GLOBAL = {