1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

implement h/s/zScanIterator

Co-authored-by: Leibale Eidelman <me@leibale.com>
This commit is contained in:
shacharPash
2023-02-08 18:52:07 +02:00
parent 3511995cf7
commit 1a8fde001f
3 changed files with 225 additions and 58 deletions

View File

@@ -2,13 +2,16 @@ import { strict as assert } from 'assert';
import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils';
import RedisClient, { RedisClientType } from '.';
import { RedisClientMultiCommandType } from './multi-command';
import { RedisCommandArguments, RedisCommandRawReply, RedisModules, RedisFunctions, RedisScripts } from '../commands';
import { RedisCommandArguments, RedisCommandRawReply, RedisModules, RedisFunctions, RedisScripts, ConvertArgumentType } from '../commands';
import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, SocketClosedUnexpectedlyError, WatchError } from '../errors';
import { defineScript } from '../lua-script';
import { spy } from 'sinon';
import { once } from 'events';
import { ClientKillFilters } from '../commands/CLIENT_KILL';
import { promisify } from 'util';
import { commandOptions } from '../../dist/lib/command-options';
import { ZMember } from '../commands/generic-transformers';
export const SQUARE_SCRIPT = defineScript({
SCRIPT: 'return ARGV[1] * ARGV[1];',
@@ -639,10 +642,10 @@ describe('Client', () => {
await client.mSet(args);
const results = new Set<Buffer>(),
iteartor = client.scanIterator(
iterator = client.scanIterator(
client.commandOptions({ returnBuffers: true })
);
for await (const key of iteartor) {
for await (const key of iterator) {
results.add(key);
}
@@ -650,65 +653,145 @@ describe('Client', () => {
}, GLOBAL.SERVERS.OPEN);
});
testUtils.testWithClient('hScanIterator', async client => {
const hash: Record<string, string> = {};
for (let i = 0; i < 100; i++) {
hash[i.toString()] = i.toString();
}
describe('hScanIterator', () => {
testUtils.testWithClient('strings', async client => {
const hash: Record<string, string> = {};
for (let i = 0; i < 100; i++) {
hash[i.toString()] = i.toString();
}
await client.hSet('key', hash);
await client.hSet('key', hash);
const results: Record<string, string> = {};
for await (const { field, value } of client.hScanIterator('key')) {
results[field] = value;
}
const results: Record<string, string> = {};
for await (const { field, value } of client.hScanIterator('key')) {
results[field] = value;
}
assert.deepEqual(hash, results);
}, GLOBAL.SERVERS.OPEN);
assert.deepEqual(hash, results);
}, GLOBAL.SERVERS.OPEN);
testUtils.testWithClient('sScanIterator', async client => {
const members = new Set<string>();
for (let i = 0; i < 100; i++) {
members.add(i.toString());
}
testUtils.testWithClient('buffers', async client => {
const hash = new Map<Buffer, Buffer>();
for (let i = 0; i < 100; i++) {
const buffer = Buffer.from([i]);
hash.set(buffer, buffer);
}
await client.sAdd('key', Array.from(members));
await client.hSet('key', hash);
const results = new Set<string>();
for await (const key of client.sScanIterator('key')) {
results.add(key);
}
const results = new Map<Buffer, Buffer>(),
iterator = client.hScanIterator(
client.commandOptions({ returnBuffers: true }),
'key'
);
for await (const { field, value } of iterator) {
results.set(field, value);
}
assert.deepEqual(members, results);
}, GLOBAL.SERVERS.OPEN);
assert.deepEqual(hash, results);
}, GLOBAL.SERVERS.OPEN);
});
testUtils.testWithClient('zScanIterator', async client => {
const members = [];
for (let i = 0; i < 100; i++) {
members.push({
score: 1,
value: i.toString()
});
}
describe('sScanIterator', () => {
testUtils.testWithClient('strings', async client => {
const members = new Set<string>();
for (let i = 0; i < 100; i++) {
members.add(i.toString());
}
await client.zAdd('key', members);
await client.sAdd('key', Array.from(members));
const map = new Map<string, number>();
for await (const member of client.zScanIterator('key')) {
map.set(member.value, member.score);
}
const results = new Set<string>();
for await (const key of client.sScanIterator('key')) {
results.add(key);
}
type MemberTuple = [string, number];
assert.deepEqual(members, results);
}, GLOBAL.SERVERS.OPEN);
function sort(a: MemberTuple, b: MemberTuple) {
return Number(b[0]) - Number(a[0]);
}
testUtils.testWithClient('buffers', async client => {
const members = new Set<Buffer>();
for (let i = 0; i < 100; i++) {
members.add(Buffer.from([i]));
}
assert.deepEqual(
[...map.entries()].sort(sort),
members.map<MemberTuple>(member => [member.value, member.score]).sort(sort)
);
}, GLOBAL.SERVERS.OPEN);
await client.sAdd('key', Array.from(members));
const results = new Set<Buffer>(),
iterator = client.sScanIterator(
client.commandOptions({ returnBuffers: true }),
'key'
);
for await (const key of iterator) {
results.add(key);
}
assert.deepEqual(members, results);
}, GLOBAL.SERVERS.OPEN);
});
describe('zScanIterator', () => {
testUtils.testWithClient('strings', async client => {
const members: Array<ConvertArgumentType<ZMember, string>> = [];
for (let i = 0; i < 100; i++) {
members.push({
score: i,
value: i.toString()
});
}
await client.zAdd('key', members);
const map = new Map<string, number>();
for await (const member of client.zScanIterator('key')) {
map.set(member.value, member.score);
}
type MemberTuple = [string, number];
function sort(a: MemberTuple, b: MemberTuple) {
return Number(b[0]) - Number(a[0]);
}
assert.deepEqual(
[...map.entries()].sort(sort),
members.map<MemberTuple>(member => [member.value, member.score]).sort(sort)
);
}, GLOBAL.SERVERS.OPEN);
testUtils.testWithClient('buffers', async client => {
const members: Array<ConvertArgumentType<ZMember, Buffer>> = [];
for (let i = 0; i < 100; i++) {
members.push({
score: i,
value: Buffer.from([i])
});
}
await client.zAdd('key', members);
const map = new Map<Buffer, number>(),
iterator = client.zScanIterator(
client.commandOptions({ returnBuffers: true }),
'key'
);
for await (const member of iterator) {
map.set(member.value, member.score);
}
type MemberTuple = [Buffer, number];
function sort(a: MemberTuple, b: MemberTuple) {
return b[0][0] - a[0][0];
}
assert.deepEqual(
[...map.entries()].sort(sort),
members.map<MemberTuple>(member => [member.value, member.score]).sort(sort)
);
}, GLOBAL.SERVERS.OPEN);
});
describe('PubSub', () => {
testUtils.testWithClient('should be able to publish and subscribe to messages', async publisher => {

View File

@@ -1,5 +1,5 @@
import COMMANDS from './commands';
import { RedisCommand, RedisCommandArguments, RedisCommandRawReply, RedisCommandReply, RedisFunctions, RedisModules, RedisExtensions, RedisScript, RedisScripts, RedisCommandSignature, ConvertArgumentType, RedisFunction, ExcludeMappedString, RedisCommands } from '../commands';
import { RedisCommand, RedisCommandArguments, RedisCommandRawReply, RedisCommandReply, RedisFunctions, RedisModules, RedisExtensions, RedisScript, RedisScripts, RedisCommandSignature, ConvertArgumentType, RedisFunction, ExcludeMappedString, RedisCommands, RedisCommandArgument } from '../commands';
import RedisSocket, { RedisSocketOptions, RedisTlsSocketOptions } from './socket';
import RedisCommandsQueue, { PubSubListener, PubSubSubscribeCommands, PubSubUnsubscribeCommands, QueueCommandOptions } from './commands-queue';
import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command';
@@ -656,6 +656,20 @@ export default class RedisClient<
return results;
}
// #scanIterator<T extends CommandOptions<ClientCommandOptions>>(
// commandOptions: T,
// options?: ScanCommandOptions
// ): AsyncIterable<T['returnBuffers'] extends true ? Buffer : string>;
// #scanIterator(
// options?: ScanCommandOptions
// ): AsyncIterable<string>;
// async* #scanIterator<T extends CommandOptions<ClientCommandOptions>>(
// commandOptions?: T | ScanCommandOptions,
// options?: ScanCommandOptions
// ): AsyncIterable<T['returnBuffers'] extends true ? Buffer : string> {
// }
scanIterator<T extends CommandOptions<ClientCommandOptions>>(
commandOptions: T,
options?: ScanCommandOptions
@@ -674,7 +688,7 @@ export default class RedisClient<
const scan = commandOptions ?
(...args: Array<unknown>) => (this as any).scan(commandOptions, ...args) :
(...args: Array<unknown>) => (this as any).scan(...args);
(this as any).scan.bind(this);
let cursor = 0;
do {
@@ -686,10 +700,33 @@ export default class RedisClient<
} while (cursor !== 0);
}
async* hScanIterator(key: string, options?: ScanOptions): AsyncIterable<ConvertArgumentType<HScanTuple, string>> {
hScanIterator<T extends CommandOptions<ClientCommandOptions>>(
commandOptions: T,
key: RedisCommandArgument,
options?: ScanOptions
): AsyncIterable<ConvertArgumentType<HScanTuple, T['returnBuffers'] extends true ? Buffer : string>>;
hScanIterator(
key: RedisCommandArgument,
options?: ScanOptions
): AsyncIterable<ConvertArgumentType<HScanTuple, string>>;
async* hScanIterator<T extends CommandOptions<ClientCommandOptions>>(
commandOptions?: T | RedisCommandArgument,
key?: RedisCommandArgument | ScanOptions,
options?: ScanOptions
): AsyncIterable<ConvertArgumentType<HScanTuple, T['returnBuffers'] extends true ? Buffer : string>> {
if (!isCommandOptions(commandOptions)) {
options = key as ScanOptions | undefined;
key = commandOptions;
commandOptions = undefined;
}
const hScan = commandOptions ?
(...args: Array<unknown>) => (this as any).hScan(commandOptions, ...args) :
(this as any).hScan.bind(this);
let cursor = 0;
do {
const reply = await (this as any).hScan(key, cursor, options);
const reply = await hScan(key, cursor, options);
cursor = reply.cursor;
for (const tuple of reply.tuples) {
yield tuple;
@@ -697,10 +734,34 @@ export default class RedisClient<
} while (cursor !== 0);
}
async* sScanIterator(key: string, options?: ScanOptions): AsyncIterable<string> {
sScanIterator<T extends CommandOptions<ClientCommandOptions>>(
commandOptions: T,
key: RedisCommandArgument,
options?: ScanOptions
): AsyncIterable<T['returnBuffers'] extends true ? Buffer : string>;
sScanIterator(
key: RedisCommandArgument,
options?: ScanOptions
): AsyncIterable<string>;
async* sScanIterator<T extends CommandOptions<ClientCommandOptions>>(
commandOptions?: T | RedisCommandArgument,
key?: RedisCommandArgument | ScanOptions,
options?: ScanOptions
): AsyncIterable<T['returnBuffers'] extends true ? Buffer : string> {
if (!isCommandOptions(commandOptions)) {
options = key as ScanOptions | undefined;
key = commandOptions;
commandOptions = undefined;
}
const sScan = commandOptions ?
(...args: Array<unknown>) => (this as any).sScan(commandOptions, ...args) :
(this as any).sScan.bind(this);
let cursor = 0;
do {
const reply = await (this as any).sScan(key, cursor, options);
const reply = await sScan(key, cursor, options);
cursor = reply.cursor;
for (const member of reply.members) {
yield member;
@@ -708,10 +769,33 @@ export default class RedisClient<
} while (cursor !== 0);
}
async* zScanIterator(key: string, options?: ScanOptions): AsyncIterable<ConvertArgumentType<ZMember, string>> {
zScanIterator<T extends CommandOptions<ClientCommandOptions>>(
commandOptions: T,
key: RedisCommandArgument,
options?: ScanOptions
): AsyncIterable<ConvertArgumentType<ZMember, T['returnBuffers'] extends true ? Buffer : string>>;
zScanIterator(
key: RedisCommandArgument,
options?: ScanOptions
): AsyncIterable<ConvertArgumentType<ZMember, string>>;
async* zScanIterator<T extends CommandOptions<ClientCommandOptions>>(
commandOptions?: T | RedisCommandArgument,
key?: RedisCommandArgument | ScanOptions,
options?: ScanOptions
): AsyncIterable<ConvertArgumentType<ZMember, T['returnBuffers'] extends true ? Buffer : string>> {
if (!isCommandOptions(commandOptions)) {
options = key as ScanOptions | undefined;
key = commandOptions;
commandOptions = undefined;
}
const zScan = commandOptions ?
(...args: Array<unknown>) => (this as any).zScan(commandOptions, ...args) :
(this as any).zScan.bind(this);
let cursor = 0;
do {
const reply = await (this as any).zScan(key, cursor, options);
const reply = await zScan(key, cursor, options);
cursor = reply.cursor;
for (const member of reply.members) {
yield member;