diff --git a/packages/client/lib/commands/XAUTOCLAIM.spec.ts b/packages/client/lib/commands/XAUTOCLAIM.spec.ts index 229e71513f..bf3b11c6c0 100644 --- a/packages/client/lib/commands/XAUTOCLAIM.spec.ts +++ b/packages/client/lib/commands/XAUTOCLAIM.spec.ts @@ -31,11 +31,10 @@ describe('XAUTOCLAIM', () => { } }); - const [, , id1, id2, , , reply] = await Promise.all([ + const [, id1, id2, , , reply] = await Promise.all([ client.xGroupCreate('key', 'group', '$', { MKSTREAM: true }), - client.xGroupCreateConsumer('key', 'group', 'consumer'), client.xAdd('key', '*', message), client.xAdd('key', '*', message), client.xReadGroup('group', 'consumer', { diff --git a/packages/client/lib/commands/XAUTOCLAIM.ts b/packages/client/lib/commands/XAUTOCLAIM.ts index 1fcd1cf32d..57ad010991 100644 --- a/packages/client/lib/commands/XAUTOCLAIM.ts +++ b/packages/client/lib/commands/XAUTOCLAIM.ts @@ -1,5 +1,5 @@ import { RedisArgument, TuplesReply, BlobStringReply, ArrayReply, NullReply, UnwrapReply, Command } from '../RESP/types'; -import { StreamMessageRawReply, isNullReply, transformStreamMessageReply } from './generic-transformers'; +import { StreamMessageReply, transformStreamMessageNullReply } from './generic-transformers'; export interface XAutoClaimOptions { COUNT?: number; @@ -7,7 +7,7 @@ export interface XAutoClaimOptions { export type XAutoClaimRawReply = TuplesReply<[ nextId: BlobStringReply, - messages: ArrayReply, + messages: ArrayReply, deletedMessages: ArrayReply ]>; @@ -40,9 +40,7 @@ export default { transformReply(reply: UnwrapReply) { return { nextId: reply[0], - messages: (reply[1] as unknown as UnwrapReply).map(message => { - return isNullReply(message) ? null : transformStreamMessageReply(message); - }), + messages: (reply[1] as unknown as UnwrapReply).map(transformStreamMessageNullReply), deletedMessages: reply[2] }; } diff --git a/packages/client/lib/commands/XCLAIM.spec.ts b/packages/client/lib/commands/XCLAIM.spec.ts index ffe2deeaae..48546eecdb 100644 --- a/packages/client/lib/commands/XCLAIM.spec.ts +++ b/packages/client/lib/commands/XCLAIM.spec.ts @@ -89,16 +89,35 @@ describe('XCLAIM', () => { }); }); - // TODO: test with messages testUtils.testAll('xClaim', async client => { - const [, reply] = await Promise.all([ + const message = Object.create(null, { + field: { + value: 'value', + enumerable: true + } + }); + + const [, , , , , reply] = await Promise.all([ client.xGroupCreate('key', 'group', '$', { MKSTREAM: true }), - client.xClaim('key', 'group', 'consumer', 1, '0-0') + client.xAdd('key', '1-0', message), + client.xAdd('key', '2-0', message), + client.xReadGroup('group', 'consumer', { + key: 'key', + id: '>' + }), + client.xTrim('key', 'MAXLEN', 1), + client.xClaim('key', 'group', 'consumer', 0, ['1-0', '2-0']) ]); - assert.deepEqual(reply, []); + assert.deepEqual(reply, [ + ...(testUtils.isVersionGreaterThan([7, 0]) ? [] : [null]), + { + id: '2-0', + message + } + ]); }, { client: GLOBAL.SERVERS.OPEN, cluster: GLOBAL.CLUSTERS.OPEN diff --git a/packages/client/lib/commands/XCLAIM.ts b/packages/client/lib/commands/XCLAIM.ts index 2c04123976..3ec4f6639b 100644 --- a/packages/client/lib/commands/XCLAIM.ts +++ b/packages/client/lib/commands/XCLAIM.ts @@ -1,5 +1,5 @@ -import { RedisArgument, Command } from '../RESP/types'; -import { RedisVariadicArgument, pushVariadicArguments, transformStreamMessagesReply } from './generic-transformers'; +import { RedisArgument, ArrayReply, NullReply, UnwrapReply, Command } from '../RESP/types'; +import { RedisVariadicArgument, pushVariadicArguments, StreamMessageReply, transformStreamMessageNullReply } from './generic-transformers'; export interface XClaimOptions { IDLE?: number; @@ -50,5 +50,7 @@ export default { return args; }, - transformReply: transformStreamMessagesReply + transformReply(reply: UnwrapReply>) { + return reply.map(transformStreamMessageNullReply); + } } as const satisfies Command; diff --git a/packages/client/lib/commands/XINFO_STREAM.ts b/packages/client/lib/commands/XINFO_STREAM.ts index 9d809d4ab5..04721d0ad3 100644 --- a/packages/client/lib/commands/XINFO_STREAM.ts +++ b/packages/client/lib/commands/XINFO_STREAM.ts @@ -1,5 +1,5 @@ -import { TuplesToMapReply, BlobStringReply, NumberReply, NullReply, Resp2Reply, Command, RespType, RESP_TYPES, RedisArgument } from '../RESP/types'; -import { StreamMessageRawReply, transformStreamMessageReply } from './generic-transformers'; +import { RedisArgument, TuplesToMapReply, BlobStringReply, NumberReply, NullReply, TuplesReply, ArrayReply, UnwrapReply, Command } from '../RESP/types'; +import { isNullReply, transformTuplesReply } from './generic-transformers'; export type XInfoStreamReply = TuplesToMapReply<[ [BlobStringReply<'length'>, NumberReply], @@ -13,8 +13,8 @@ export type XInfoStreamReply = TuplesToMapReply<[ /** added in 7.2 */ [BlobStringReply<'recorded-first-entry-id'>, BlobStringReply], [BlobStringReply<'groups'>, NumberReply], - [BlobStringReply<'first-entry'>, ReturnType | NullReply], - [BlobStringReply<'last-entry'>, ReturnType | NullReply] + [BlobStringReply<'first-entry'>, ReturnType], + [BlobStringReply<'last-entry'>, ReturnType] ]>; export default { @@ -66,6 +66,17 @@ export default { } } as const satisfies Command; -function transformEntry(entry: StreamMessageRawReply | NullReply) { - return entry === null ? null : transformStreamMessageReply(entry as StreamMessageRawReply); +type RawEntry = TuplesReply<[ + id: BlobStringReply, + message: ArrayReply +]> | NullReply; + +function transformEntry(entry: RawEntry) { + if (isNullReply(entry)) return entry; + + const [id, message] = entry as unknown as UnwrapReply; + return { + id, + message: transformTuplesReply(message) + }; } diff --git a/packages/client/lib/commands/XRANGE.ts b/packages/client/lib/commands/XRANGE.ts index 1eed1e12b9..908e3d717f 100644 --- a/packages/client/lib/commands/XRANGE.ts +++ b/packages/client/lib/commands/XRANGE.ts @@ -1,5 +1,5 @@ -import { RedisArgument, Command } from '../RESP/types'; -import { transformStreamMessagesReply } from './generic-transformers'; +import { RedisArgument, ArrayReply, UnwrapReply, Command } from '../RESP/types'; +import { StreamMessageReply, transformStreamMessageReply } from './generic-transformers'; export interface XRangeOptions { COUNT?: number; @@ -25,5 +25,7 @@ export default { FIRST_KEY_INDEX: 1, IS_READ_ONLY: true, transformArguments: transformXRangeArguments.bind(undefined, 'XRANGE'), - transformReply: transformStreamMessagesReply + transformReply(reply: UnwrapReply>) { + return reply.map(transformStreamMessageReply); + } } as const satisfies Command; diff --git a/packages/client/lib/commands/generic-transformers.ts b/packages/client/lib/commands/generic-transformers.ts index 3c51100cc7..789464fddb 100644 --- a/packages/client/lib/commands/generic-transformers.ts +++ b/packages/client/lib/commands/generic-transformers.ts @@ -84,37 +84,23 @@ export function transformTuplesReply( return message; } -export type StreamMessageRawReply = TuplesReply<[ +export type StreamMessageReply = TuplesReply<[ id: BlobStringReply, message: ArrayReply ]>; -export function transformStreamMessageReply(reply: StreamMessageRawReply) { - const [id, message] = reply as unknown as UnwrapReply; +export function transformStreamMessageReply(reply: StreamMessageReply) { + const [ id, message ] = reply as unknown as UnwrapReply; return { id, message: transformTuplesReply(message) }; } -export type StreamMessagesRawReply = ArrayReply; - -export function transformStreamMessagesReply(reply: StreamMessagesRawReply) { - return (reply as unknown as UnwrapReply) - .map(message => transformStreamMessageReply(message)); +export function transformStreamMessageNullReply(reply: StreamMessageReply | NullReply) { + return isNullReply(reply) ? reply : transformStreamMessageReply(reply); } -// export type StreamsMessagesReply = MapReply; - -// export function transformStreamsMessagesReply(reply: Array | null): StreamsMessagesReply | null { -// if (reply === null) return null; - -// return reply.map(([name, rawMessages]) => ({ -// name, -// messages: transformStreamMessagesReply(rawMessages) -// })); -// } - export interface SortedSetMember { value: RedisArgument; score: number;