1
0
mirror of https://github.com/redis/node-redis.git synced 2025-12-11 09:22:35 +03:00

feat(xreadgroup): add claim attribute (#3122)

* feat(xreadgroup): add claim attribute

the CLAIM attribute can be used to instruct redis to return
PEL ( Pending Entries List ) entries with their respective
deliveries and ms since last delivery

* remove m01 from test matrix

* add jsdoc
This commit is contained in:
Nikolay Karadzhov
2025-11-03 11:59:49 +02:00
committed by GitHub
parent 130e88d45c
commit 5a0a06df69
4 changed files with 106 additions and 49 deletions

View File

@@ -46,7 +46,7 @@ export function transformStringDoubleArgument(num: RedisArgument | number): Redi
export const transformDoubleReply = {
2: (reply: BlobStringReply, preserve?: any, typeMapping?: TypeMapping): DoubleReply => {
const double = typeMapping ? typeMapping[RESP_TYPES.DOUBLE] : undefined;
switch (double) {
case String: {
return reply as unknown as DoubleReply;
@@ -58,13 +58,13 @@ export const transformDoubleReply = {
case 'inf':
case '+inf':
ret = Infinity;
case '-inf':
ret = -Infinity;
case 'nan':
ret = NaN;
default:
ret = Number(reply);
}
@@ -98,7 +98,7 @@ export function createTransformNullableDoubleReplyResp2Func(preserve?: any, type
export const transformNullableDoubleReply = {
2: (reply: BlobStringReply | NullReply, preserve?: any, typeMapping?: TypeMapping) => {
if (reply === null) return null;
return transformDoubleReply[2](reply as BlobStringReply, preserve, typeMapping);
},
3: undefined as unknown as () => DoubleReply | NullReply
@@ -514,19 +514,25 @@ export function parseArgs(command: Command, ...args: Array<any>): CommandArgumen
export type StreamMessageRawReply = TuplesReply<[
id: BlobStringReply,
message: ArrayReply<BlobStringReply>
message: ArrayReply<BlobStringReply>,
millisElapsedFromDelivery?: NumberReply,
deliveriesCounter?: NumberReply
]>;
export type StreamMessageReply = {
id: BlobStringReply,
message: MapReply<BlobStringReply | string, BlobStringReply>,
millisElapsedFromDelivery?: number
deliveriesCounter?: number
};
export function transformStreamMessageReply(typeMapping: TypeMapping | undefined, reply: StreamMessageRawReply): StreamMessageReply {
const [ id, message ] = reply as unknown as UnwrapReply<typeof reply>;
const [ id, message, millisElapsedFromDelivery, deliveriesCounter ] = reply as unknown as UnwrapReply<typeof reply>;
return {
id: id,
message: transformTuplesReply(message, undefined, typeMapping)
message: transformTuplesReply(message, undefined, typeMapping),
...(millisElapsedFromDelivery !== undefined ? { millisElapsedFromDelivery: Number(millisElapsedFromDelivery) } : {}),
...(deliveriesCounter !== undefined ? { deliveriesCounter: Number(deliveriesCounter) } : {})
};
}
@@ -557,7 +563,7 @@ export function transformStreamsMessagesReplyResp2(
reply: UnwrapReply<StreamsMessagesRawReply2 | NullReply>,
preserve?: any,
typeMapping?: TypeMapping
): StreamsMessagesReply | NullReply {
): StreamsMessagesReply | NullReply {
// FUTURE: resposne type if resp3 was working, reverting to old v4 for now
//: MapReply<BlobStringReply | string, StreamMessagesReply> | NullReply {
if (reply === null) return null as unknown as NullReply;
@@ -569,13 +575,13 @@ export function transformStreamsMessagesReplyResp2(
for (let i=0; i < reply.length; i++) {
const stream = reply[i] as unknown as UnwrapReply<StreamMessagesRawReply>;
const name = stream[0];
const rawMessages = stream[1];
ret.set(name.toString(), transformStreamMessagesReply(rawMessages, typeMapping));
}
return ret as unknown as MapReply<string, StreamMessagesReply>;
}
case Array: {
@@ -583,11 +589,11 @@ export function transformStreamsMessagesReplyResp2(
for (let i=0; i < reply.length; i++) {
const stream = reply[i] as unknown as UnwrapReply<StreamMessagesRawReply>;
const name = stream[0];
const rawMessages = stream[1];
ret.push(name);
ret.push(name);
ret.push(transformStreamMessagesReply(rawMessages, typeMapping));
}
@@ -598,13 +604,13 @@ export function transformStreamsMessagesReplyResp2(
for (let i=0; i < reply.length; i++) {
const stream = reply[i] as unknown as UnwrapReply<StreamMessagesRawReply>;
const name = stream[0] as unknown as UnwrapReply<BlobStringReply>;
const rawMessages = stream[1];
ret[name.toString()] = transformStreamMessagesReply(rawMessages);
}
return ret as unknown as MapReply<string, StreamMessagesReply>;
}
*/
@@ -630,7 +636,7 @@ type StreamsMessagesRawReply3 = MapReply<BlobStringReply, ArrayReply<StreamMessa
export function transformStreamsMessagesReplyResp3(reply: UnwrapReply<StreamsMessagesRawReply3 | NullReply>): MapReply<BlobStringReply, StreamMessagesReply> | NullReply {
if (reply === null) return null as unknown as NullReply;
if (reply instanceof Map) {
const ret = new Map<string, StreamMessagesReply>();