You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-06 02:15:48 +03:00
ref #2565 - fix X[AUTO]CLAIM
This commit is contained in:
@@ -31,11 +31,10 @@ describe('XAUTOCLAIM', () => {
|
||||
}
|
||||
});
|
||||
|
||||
const [, , id1, id2, , , reply] = await Promise.all([
|
||||
const [, id1, id2, , , reply] = await Promise.all([
|
||||
client.xGroupCreate('key', 'group', '$', {
|
||||
MKSTREAM: true
|
||||
}),
|
||||
client.xGroupCreateConsumer('key', 'group', 'consumer'),
|
||||
client.xAdd('key', '*', message),
|
||||
client.xAdd('key', '*', message),
|
||||
client.xReadGroup('group', 'consumer', {
|
||||
|
@@ -1,5 +1,5 @@
|
||||
import { RedisArgument, TuplesReply, BlobStringReply, ArrayReply, NullReply, UnwrapReply, Command } from '../RESP/types';
|
||||
import { StreamMessageRawReply, isNullReply, transformStreamMessageReply } from './generic-transformers';
|
||||
import { StreamMessageReply, transformStreamMessageNullReply } from './generic-transformers';
|
||||
|
||||
export interface XAutoClaimOptions {
|
||||
COUNT?: number;
|
||||
@@ -7,7 +7,7 @@ export interface XAutoClaimOptions {
|
||||
|
||||
export type XAutoClaimRawReply = TuplesReply<[
|
||||
nextId: BlobStringReply,
|
||||
messages: ArrayReply<StreamMessageRawReply | NullReply>,
|
||||
messages: ArrayReply<StreamMessageReply | NullReply>,
|
||||
deletedMessages: ArrayReply<BlobStringReply>
|
||||
]>;
|
||||
|
||||
@@ -40,9 +40,7 @@ export default {
|
||||
transformReply(reply: UnwrapReply<XAutoClaimRawReply>) {
|
||||
return {
|
||||
nextId: reply[0],
|
||||
messages: (reply[1] as unknown as UnwrapReply<typeof reply[1]>).map(message => {
|
||||
return isNullReply(message) ? null : transformStreamMessageReply(message);
|
||||
}),
|
||||
messages: (reply[1] as unknown as UnwrapReply<typeof reply[1]>).map(transformStreamMessageNullReply),
|
||||
deletedMessages: reply[2]
|
||||
};
|
||||
}
|
||||
|
@@ -89,16 +89,35 @@ describe('XCLAIM', () => {
|
||||
});
|
||||
});
|
||||
|
||||
// TODO: test with messages
|
||||
testUtils.testAll('xClaim', async client => {
|
||||
const [, reply] = await Promise.all([
|
||||
const message = Object.create(null, {
|
||||
field: {
|
||||
value: 'value',
|
||||
enumerable: true
|
||||
}
|
||||
});
|
||||
|
||||
const [, , , , , reply] = await Promise.all([
|
||||
client.xGroupCreate('key', 'group', '$', {
|
||||
MKSTREAM: true
|
||||
}),
|
||||
client.xClaim('key', 'group', 'consumer', 1, '0-0')
|
||||
client.xAdd('key', '1-0', message),
|
||||
client.xAdd('key', '2-0', message),
|
||||
client.xReadGroup('group', 'consumer', {
|
||||
key: 'key',
|
||||
id: '>'
|
||||
}),
|
||||
client.xTrim('key', 'MAXLEN', 1),
|
||||
client.xClaim('key', 'group', 'consumer', 0, ['1-0', '2-0'])
|
||||
]);
|
||||
|
||||
assert.deepEqual(reply, []);
|
||||
assert.deepEqual(reply, [
|
||||
...(testUtils.isVersionGreaterThan([7, 0]) ? [] : [null]),
|
||||
{
|
||||
id: '2-0',
|
||||
message
|
||||
}
|
||||
]);
|
||||
}, {
|
||||
client: GLOBAL.SERVERS.OPEN,
|
||||
cluster: GLOBAL.CLUSTERS.OPEN
|
||||
|
@@ -1,5 +1,5 @@
|
||||
import { RedisArgument, Command } from '../RESP/types';
|
||||
import { RedisVariadicArgument, pushVariadicArguments, transformStreamMessagesReply } from './generic-transformers';
|
||||
import { RedisArgument, ArrayReply, NullReply, UnwrapReply, Command } from '../RESP/types';
|
||||
import { RedisVariadicArgument, pushVariadicArguments, StreamMessageReply, transformStreamMessageNullReply } from './generic-transformers';
|
||||
|
||||
export interface XClaimOptions {
|
||||
IDLE?: number;
|
||||
@@ -50,5 +50,7 @@ export default {
|
||||
|
||||
return args;
|
||||
},
|
||||
transformReply: transformStreamMessagesReply
|
||||
transformReply(reply: UnwrapReply<ArrayReply<StreamMessageReply | NullReply>>) {
|
||||
return reply.map(transformStreamMessageNullReply);
|
||||
}
|
||||
} as const satisfies Command;
|
||||
|
@@ -1,5 +1,5 @@
|
||||
import { TuplesToMapReply, BlobStringReply, NumberReply, NullReply, Resp2Reply, Command, RespType, RESP_TYPES, RedisArgument } from '../RESP/types';
|
||||
import { StreamMessageRawReply, transformStreamMessageReply } from './generic-transformers';
|
||||
import { RedisArgument, TuplesToMapReply, BlobStringReply, NumberReply, NullReply, TuplesReply, ArrayReply, UnwrapReply, Command } from '../RESP/types';
|
||||
import { isNullReply, transformTuplesReply } from './generic-transformers';
|
||||
|
||||
export type XInfoStreamReply = TuplesToMapReply<[
|
||||
[BlobStringReply<'length'>, NumberReply],
|
||||
@@ -13,8 +13,8 @@ export type XInfoStreamReply = TuplesToMapReply<[
|
||||
/** added in 7.2 */
|
||||
[BlobStringReply<'recorded-first-entry-id'>, BlobStringReply],
|
||||
[BlobStringReply<'groups'>, NumberReply],
|
||||
[BlobStringReply<'first-entry'>, ReturnType<typeof transformStreamMessageReply> | NullReply],
|
||||
[BlobStringReply<'last-entry'>, ReturnType<typeof transformStreamMessageReply> | NullReply]
|
||||
[BlobStringReply<'first-entry'>, ReturnType<typeof transformEntry>],
|
||||
[BlobStringReply<'last-entry'>, ReturnType<typeof transformEntry>]
|
||||
]>;
|
||||
|
||||
export default {
|
||||
@@ -66,6 +66,17 @@ export default {
|
||||
}
|
||||
} as const satisfies Command;
|
||||
|
||||
function transformEntry(entry: StreamMessageRawReply | NullReply) {
|
||||
return entry === null ? null : transformStreamMessageReply(entry as StreamMessageRawReply);
|
||||
type RawEntry = TuplesReply<[
|
||||
id: BlobStringReply,
|
||||
message: ArrayReply<BlobStringReply>
|
||||
]> | NullReply;
|
||||
|
||||
function transformEntry(entry: RawEntry) {
|
||||
if (isNullReply(entry)) return entry;
|
||||
|
||||
const [id, message] = entry as unknown as UnwrapReply<typeof entry>;
|
||||
return {
|
||||
id,
|
||||
message: transformTuplesReply(message)
|
||||
};
|
||||
}
|
||||
|
@@ -1,5 +1,5 @@
|
||||
import { RedisArgument, Command } from '../RESP/types';
|
||||
import { transformStreamMessagesReply } from './generic-transformers';
|
||||
import { RedisArgument, ArrayReply, UnwrapReply, Command } from '../RESP/types';
|
||||
import { StreamMessageReply, transformStreamMessageReply } from './generic-transformers';
|
||||
|
||||
export interface XRangeOptions {
|
||||
COUNT?: number;
|
||||
@@ -25,5 +25,7 @@ export default {
|
||||
FIRST_KEY_INDEX: 1,
|
||||
IS_READ_ONLY: true,
|
||||
transformArguments: transformXRangeArguments.bind(undefined, 'XRANGE'),
|
||||
transformReply: transformStreamMessagesReply
|
||||
transformReply(reply: UnwrapReply<ArrayReply<StreamMessageReply>>) {
|
||||
return reply.map(transformStreamMessageReply);
|
||||
}
|
||||
} as const satisfies Command;
|
||||
|
@@ -84,12 +84,12 @@ export function transformTuplesReply(
|
||||
return message;
|
||||
}
|
||||
|
||||
export type StreamMessageRawReply = TuplesReply<[
|
||||
export type StreamMessageReply = TuplesReply<[
|
||||
id: BlobStringReply,
|
||||
message: ArrayReply<BlobStringReply>
|
||||
]>;
|
||||
|
||||
export function transformStreamMessageReply(reply: StreamMessageRawReply) {
|
||||
export function transformStreamMessageReply(reply: StreamMessageReply) {
|
||||
const [ id, message ] = reply as unknown as UnwrapReply<typeof reply>;
|
||||
return {
|
||||
id,
|
||||
@@ -97,24 +97,10 @@ export function transformStreamMessageReply(reply: StreamMessageRawReply) {
|
||||
};
|
||||
}
|
||||
|
||||
export type StreamMessagesRawReply = ArrayReply<StreamMessageRawReply>;
|
||||
|
||||
export function transformStreamMessagesReply(reply: StreamMessagesRawReply) {
|
||||
return (reply as unknown as UnwrapReply<typeof reply>)
|
||||
.map(message => transformStreamMessageReply(message));
|
||||
export function transformStreamMessageNullReply(reply: StreamMessageReply | NullReply) {
|
||||
return isNullReply(reply) ? reply : transformStreamMessageReply(reply);
|
||||
}
|
||||
|
||||
// export type StreamsMessagesReply = MapReply<BlobStringReply, StreamMessagesRawReply>;
|
||||
|
||||
// export function transformStreamsMessagesReply(reply: Array<any> | null): StreamsMessagesReply | null {
|
||||
// if (reply === null) return null;
|
||||
|
||||
// return reply.map(([name, rawMessages]) => ({
|
||||
// name,
|
||||
// messages: transformStreamMessagesReply(rawMessages)
|
||||
// }));
|
||||
// }
|
||||
|
||||
export interface SortedSetMember {
|
||||
value: RedisArgument;
|
||||
score: number;
|
||||
|
Reference in New Issue
Block a user