1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-10 11:43:01 +03:00

v4.0.0-rc.2 (#1664)

* update workflows & README

* add .deepsource.toml

* fix client.quit, add error events on cluster, fix some "deepsource.io" warnings

* Release 4.0.0-rc.1

* add cluster.duplicate, add some tests

* fix #1650 - add support for Buffer in some commands, add GET_BUFFER command

* fix GET and GET_BUFFER return type

* update FAQ

* Update invalid code example in README.md (#1654)

* Update invalid code example in README.md

* Update README.md

Co-authored-by: Leibale Eidelman <leibale1998@gmail.com>

* fix #1652

* ref #1653 - better types

* better types

* fix 54124793ad

* Update GEOSEARCHSTORE.spec.ts

* fix #1660 - add support for client.HSET('key', 'field', 'value')

* upgrade dependencies, update README

* fix #1659 - add support for db-number in client options url

* fix README, remove unused import, downgrade typedoc & typedoc-plugin-markdown

* update client-configurations.md

* fix README

* add CLUSTER_SLOTS, add some tests

* fix "createClient with url" test with redis 5

* remove unused imports

* Release 4.0.0-rc.2

Co-authored-by: Richard Samuelsson <noobtoothfairy@gmail.com>
This commit is contained in:
Leibale Eidelman
2021-09-23 16:36:40 -04:00
committed by GitHub
parent 77664c31ff
commit e592d9403d
81 changed files with 1389 additions and 966 deletions

View File

@@ -49,13 +49,11 @@ import { createClient } from 'redis';
})();
```
The above code connects to localhost on port 6379. To connect to a different host or port, use a connection string in the format `[redis[s]:]//[[username][:password]@][host][:port]`:
The above code connects to localhost on port 6379. To connect to a different host or port, use a connection string in the format `redis[s]://[[username][:password]@][host][:port][/db-number]`:
```typescript
createClient({
socket: {
url: 'redis://alice:foobared@awesome.redis.server:6380'
}
url: 'redis://alice:foobared@awesome.redis.server:6380',
});
```
@@ -80,7 +78,7 @@ Modifiers to commands are specified using a JavaScript object:
```typescript
await client.set('key', 'value', {
EX: 10,
NX: true
NX: true,
});
```
@@ -108,11 +106,11 @@ Start a [transaction](https://redis.io/topics/transactions) by calling `.multi()
```typescript
await client.set('another-key', 'another-value');
const [ setKeyReply, otherKeyValue ] = await client.multi()
const [setKeyReply, otherKeyValue] = await client
.multi()
.set('key', 'value')
.get('another-key')
.exec()
]); // ['OK', 'another-value']
.exec(); // ['OK', 'another-value']
```
You can also [watch](https://redis.io/topics/transactions#optimistic-locking-using-check-and-set) keys by calling `.watch()`. Your transaction will abort if any of the watched keys change.
@@ -128,10 +126,7 @@ This pattern works especially well for blocking commands—such as `BLPOP` and `
```typescript
import { commandOptions } from 'redis';
const blPopPromise = client.blPop(
commandOptions({ isolated: true }),
'key'
);
const blPopPromise = client.blPop(commandOptions({ isolated: true }), 'key');
await client.lPush('key', ['1', '2']);
@@ -153,7 +148,7 @@ await subscriber.connect();
Once you have one, simply subscribe and unsubscribe as needed:
```typescript
await subscriber.subscribe('channel', message => {
await subscriber.subscribe('channel', (message) => {
console.log(message); // 'message'
});
@@ -186,9 +181,12 @@ for await (const key of client.scanIterator()) {
This works with `HSCAN`, `SSCAN`, and `ZSCAN` too:
```typescript
for await (const member of client.hScanIterator('hash')) {}
for await (const { field, value } of client.sScanIterator('set')) {}
for await (const { member, score } of client.zScanIterator('sorted-set')) {}
for await (const member of client.hScanIterator('hash')) {
}
for await (const { field, value } of client.sScanIterator('set')) {
}
for await (const { member, score } of client.zScanIterator('sorted-set')) {
}
```
You can override the default options by providing a configuration object:
@@ -197,7 +195,7 @@ You can override the default options by providing a configuration object:
client.scanIterator({
TYPE: 'string', // `SCAN` only
MATCH: 'patter*',
COUNT: 100
COUNT: 100,
});
```
@@ -214,16 +212,15 @@ import { createClient, defineScript } from 'redis';
add: defineScript({
NUMBER_OF_KEYS: 1,
SCRIPT:
'local val = redis.pcall("GET", KEYS[1]);' +
'return val + ARGV[1];',
"local val = redis.pcall('GET', KEYS[1]);' + 'return val + ARGV[1];",
transformArguments(key: string, toAdd: number): Array<string> {
return [key, number.toString()];
},
transformReply(reply: number): number {
return reply;
}
})
}
},
}),
},
});
await client.connect();
@@ -242,13 +239,16 @@ import { createCluster } from 'redis';
(async () => {
const cluster = createCluster({
rootNodes: [{
rootNodes: [
{
host: '10.0.0.1',
port: 30001
}, {
port: 30001,
},
{
host: '10.0.0.2',
port: 30002
}]
port: 30002,
},
],
});
cluster.on('error', (err) => console.log('Redis Cluster Error', err));
@@ -274,7 +274,7 @@ Of course, if you don't do something with your Promises you're certain to get [u
```typescript
await Promise.all([
client.set('Tm9kZSBSZWRpcw==', 'users:1'),
client.sAdd('users:1:tokens', 'Tm9kZSBSZWRpcw==')
client.sAdd('users:1:tokens', 'Tm9kZSBSZWRpcw=='),
]);
```
@@ -284,7 +284,9 @@ If you'd like to contribute, check out the [contributing guide](CONTRIBUTING.md)
Thank you to all the people who already contributed to Node Redis!
<a href="https://github.com/NodeRedis/node-redis/graphs/contributors"><img src="https://opencollective.com/node-redis/contributors.svg?width=1012" /></a>
<a href="https://github.com/NodeRedis/node-redis/graphs/contributors">
<img src="https://contrib.rocks/image?repo=NodeRedis/node-redis"/>
</a>
## License

View File

@@ -8,6 +8,6 @@ When a socket closed unexpectedly, all the commands that were already sent will
## How are commands batched?
Commands are pipelined using [`queueMicrotask`](https://nodejs.org/api/globals.html#globals_queuemicrotask_callback). Commands from the same "tick" will be sent in batches and respect the [`writableHighWaterMark`](https://nodejs.org/api/stream.html#stream_new_stream_writable_options).
Commands are pipelined using [`queueMicrotask`](https://nodejs.org/api/globals.html#globals_queuemicrotask_callback).
If `socket.write()` returns `false`—meaning that ["all or part of the data was queued in user memory"](https://nodejs.org/api/net.html#net_socket_write_data_encoding_callback:~:text=all%20or%20part%20of%20the%20data%20was%20queued%20in%20user%20memory)—the commands will stack in memory until the [`drain`](https://nodejs.org/api/net.html#net_event_drain) event is fired.

View File

@@ -1,20 +1,22 @@
# `createClient` configuration
| Property | Default | Description |
|--------------------------|------------------------------------------|------------------------------------------------------------------------------------------------------------------------------|
|--------------------------|------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| url | | `redis[s]://[[username][:password]@][host][:port][/db-number]` (see [`redis`](https://www.iana.org/assignments/uri-schemes/prov/redis) and [`rediss`](https://www.iana.org/assignments/uri-schemes/prov/rediss) IANA registration for more details) |
| socket | | Object defining socket connection properties |
| socket.url | | `[redis[s]:]//[[username][:password]@][host][:port]` |
| socket.host | `'localhost'` | Hostname to connect to |
| socket.port | `6379` | Port to connect to |
| socket.username | | ACL username ([see ACL guide](https://redis.io/topics/acl)) |
| socket.password | | ACL password or the old "--requirepass" password |
| socket.path | | UNIX Socket to connect to |
| socket.connectTimeout | `5000` | The timeout for connecting to the Redis Server (in milliseconds) |
| socket.noDelay | `true` | Enable/disable the use of [`Nagle's algorithm`](https://nodejs.org/api/net.html#net_socket_setnodelay_nodelay) |
| socket.keepAlive | `5000` | Enable/disable the [`keep-alive`](https://nodejs.org/api/net.html#net_socket_setkeepalive_enable_initialdelay) functionality |
| socket.tls | | Set to `true` to enable [TLS Configuration](https://nodejs.org/api/tls.html#tls_tls_connect_options_callback) |
| socket.reconnectStrategy | `retries => Math.min(retries * 50, 500)` | A function containing the [Reconnect Strategy](#reconnect-strategy) logic |
| username | | ACL username ([see ACL guide](https://redis.io/topics/acl)) |
| password | | ACL password or the old "--requirepass" password |
| database | | Database number to connect to (see [`SELECT`](https://redis.io/commands/select) command) |
| modules | | Object defining which [Redis Modules](https://redis.io/modules) to include (TODO - document) |
| scripts | | Object defining Lua scripts to use with this client. See [Lua Scripts](../README.md#lua-scripts) |
| scripts | | Object defining Lua Scripts to use with this client (see [Lua Scripts](../README.md#lua-scripts)) |
| commandsQueueMaxLength | | Maximum length of the client's internal command queue |
| readonly | `false` | Connect in [`READONLY`](https://redis.io/commands/readonly) mode |
| legacyMode | `false` | Maintain some backwards compatibility (see the [Migration Guide](v3-to-v4.md)) |

View File

@@ -5,6 +5,7 @@ import RedisClient from './client';
import { AbortError, ClientClosedError, ConnectionTimeoutError, WatchError } from './errors';
import { defineScript } from './lua-script';
import { spy } from 'sinon';
import { RedisNetSocketOptions } from './socket';
export const SQUARE_SCRIPT = defineScript({
NUMBER_OF_KEYS: 0,
@@ -18,6 +19,81 @@ export const SQUARE_SCRIPT = defineScript({
});
describe('Client', () => {
describe('parseURL', () => {
it('redis://user:secret@localhost:6379/0', () => {
assert.deepEqual(
RedisClient.parseURL('redis://user:secret@localhost:6379/0'),
{
socket: {
host: 'localhost',
port: 6379
},
username: 'user',
password: 'secret',
database: 0
}
);
});
it('rediss://user:secret@localhost:6379/0', () => {
assert.deepEqual(
RedisClient.parseURL('rediss://user:secret@localhost:6379/0'),
{
socket: {
host: 'localhost',
port: 6379,
tls: true
},
username: 'user',
password: 'secret',
database: 0
}
);
});
it('Invalid protocol', () => {
assert.throws(
() => RedisClient.parseURL('redi://user:secret@localhost:6379/0'),
TypeError
);
});
it('Invalid pathname', () => {
assert.throws(
() => RedisClient.parseURL('redis://user:secret@localhost:6379/NaN'),
TypeError
);
});
it('redis://localhost', () => {
assert.deepEqual(
RedisClient.parseURL('redis://localhost'),
{
socket: {
host: 'localhost',
}
}
);
});
it('createClient with url', async () => {
const client = RedisClient.create({
url: `redis://localhost:${(TEST_REDIS_SERVERS[TestRedisServers.OPEN].socket as RedisNetSocketOptions)!.port!.toString()}/1`
});
await client.connect();
try {
assert.equal(
await client.ping(),
'PONG'
);
} finally {
await client.disconnect();
}
})
});
describe('authentication', () => {
itWithClient(TestRedisServers.PASSWORD, 'Client should be authenticated', async client => {
assert.equal(
@@ -28,10 +104,8 @@ describe('Client', () => {
it('should not retry connecting if failed due to wrong auth', async () => {
const client = RedisClient.create({
socket: {
...TEST_REDIS_SERVERS[TestRedisServers.PASSWORD],
password: 'wrongpassword'
}
});
await assert.rejects(
@@ -49,7 +123,7 @@ describe('Client', () => {
describe('legacyMode', () => {
const client = RedisClient.create({
socket: TEST_REDIS_SERVERS[TestRedisServers.OPEN],
...TEST_REDIS_SERVERS[TestRedisServers.OPEN],
scripts: {
square: SQUARE_SCRIPT
},
@@ -173,9 +247,7 @@ describe('Client', () => {
describe('events', () => {
it('connect, ready, end', async () => {
const client = RedisClient.create({
socket: TEST_REDIS_SERVERS[TestRedisServers.OPEN]
});
const client = RedisClient.create(TEST_REDIS_SERVERS[TestRedisServers.OPEN]);
await Promise.all([
client.connect(),
@@ -195,6 +267,13 @@ describe('Client', () => {
assert.equal(await client.sendCommand(['PING']), 'PONG');
});
itWithClient(TestRedisServers.OPEN, 'bufferMode', async client => {
assert.deepEqual(
await client.sendCommand(['PING'], undefined, true),
Buffer.from('PONG')
);
});
describe('AbortController', () => {
before(function () {
if (!global.AbortController) {
@@ -509,6 +588,9 @@ describe('Client', () => {
assert.ok(channelListener1.calledOnce);
assert.ok(channelListener2.calledTwice);
assert.ok(patternListener.calledThrice);
// should be able to send commands when unsubsribed from all channels (see #1652)
await assert.doesNotReject(subscriber.ping());
} finally {
await subscriber.disconnect();
}
@@ -540,9 +622,7 @@ describe('Client', () => {
});
it('client.quit', async () => {
const client = RedisClient.create({
socket: TEST_REDIS_SERVERS[TestRedisServers.OPEN]
});
const client = RedisClient.create(TEST_REDIS_SERVERS[TestRedisServers.OPEN]);
await client.connect();

View File

@@ -1,6 +1,6 @@
import RedisSocket, { RedisSocketOptions } from './socket';
import RedisSocket, { RedisSocketOptions, RedisNetSocketOptions, RedisTlsSocketOptions } from './socket';
import RedisCommandsQueue, { PubSubListener, PubSubSubscribeCommands, PubSubUnsubscribeCommands, QueueCommandOptions } from './commands-queue';
import COMMANDS from './commands';
import COMMANDS, { TransformArgumentsReply } from './commands';
import { RedisCommand, RedisModules, RedisReply } from './commands';
import RedisMultiCommand, { MultiQueuedCommand, RedisMultiCommandType } from './multi-command';
import EventEmitter from 'events';
@@ -12,9 +12,14 @@ import { HScanTuple } from './commands/HSCAN';
import { encodeCommand, extendWithDefaultCommands, extendWithModulesAndScripts, transformCommandArguments } from './commander';
import { Pool, Options as PoolOptions, createPool } from 'generic-pool';
import { ClientClosedError } from './errors';
import { URL } from 'url';
export interface RedisClientOptions<M = RedisModules, S = RedisLuaScripts> {
export interface RedisClientOptions<M, S> {
url?: string;
socket?: RedisSocketOptions;
username?: string;
password?: string;
database?: number;
modules?: M;
scripts?: S;
commandsQueueMaxLength?: number;
@@ -43,55 +48,25 @@ type WithScripts<S extends RedisLuaScripts> = {
export type WithPlugins<M extends RedisModules, S extends RedisLuaScripts> =
WithCommands & WithModules<M> & WithScripts<S>;
export type RedisClientType<M extends RedisModules, S extends RedisLuaScripts> =
export type RedisClientType<M extends RedisModules = {}, S extends RedisLuaScripts = {}> =
WithPlugins<M, S> & RedisClient<M, S>;
export interface ClientCommandOptions extends QueueCommandOptions {
isolated?: boolean;
}
export default class RedisClient<M extends RedisModules = RedisModules, S extends RedisLuaScripts = RedisLuaScripts> extends EventEmitter {
export default class RedisClient<M extends RedisModules, S extends RedisLuaScripts> extends EventEmitter {
static commandOptions(options: ClientCommandOptions): CommandOptions<ClientCommandOptions> {
return commandOptions(options);
}
static async commandsExecutor(
this: RedisClient,
command: RedisCommand,
args: Array<unknown>
): Promise<ReturnType<typeof command['transformReply']>> {
const { args: redisArgs, options } = transformCommandArguments<ClientCommandOptions>(command, args);
const reply = command.transformReply(
await this.#sendCommand(redisArgs, options),
redisArgs.preserve
);
return reply;
}
static async #scriptsExecutor(
this: RedisClient,
script: RedisLuaScript,
args: Array<unknown>
): Promise<typeof script['transformArguments']> {
const { args: redisArgs, options } = transformCommandArguments<ClientCommandOptions>(script, args);
const reply = script.transformReply(
await this.executeScript(script, redisArgs, options),
redisArgs.preserve
);
return reply;
}
static create<M extends RedisModules, S extends RedisLuaScripts>(options?: RedisClientOptions<M, S>): RedisClientType<M, S> {
static create<M extends RedisModules = {}, S extends RedisLuaScripts = {}>(options?: RedisClientOptions<M, S>): RedisClientType<M, S> {
const Client = (<any>extendWithModulesAndScripts({
BaseClass: RedisClient,
modules: options?.modules,
modulesCommandsExecutor: RedisClient.commandsExecutor,
modulesCommandsExecutor: RedisClient.prototype.commandsExecutor,
scripts: options?.scripts,
scriptsExecutor: RedisClient.#scriptsExecutor
scriptsExecutor: RedisClient.prototype.scriptsExecutor
}));
if (Client !== RedisClient) {
@@ -101,6 +76,45 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
return new Client(options);
}
static parseURL(url: string): RedisClientOptions<{}, {}> {
// https://www.iana.org/assignments/uri-schemes/prov/redis
const { hostname, port, protocol, username, password, pathname } = new URL(url),
parsed: RedisClientOptions<{}, {}> = {
socket: {
host: hostname
}
};
if (protocol === 'rediss:') {
(parsed.socket as RedisTlsSocketOptions).tls = true;
} else if (protocol !== 'redis:') {
throw new TypeError('Invalid protocol');
}
if (port) {
(parsed.socket as RedisNetSocketOptions).port = Number(port);
}
if (username) {
parsed.username = username;
}
if (password) {
parsed.password = password;
}
if (pathname.length > 1) {
const database = Number(pathname.substring(1));
if (isNaN(database)) {
throw new TypeError('Invalid pathname');
}
parsed.database = database;
}
return parsed;
}
readonly #options?: RedisClientOptions<M, S>;
readonly #socket: RedisSocket;
readonly #queue: RedisCommandsQueue;
@@ -108,7 +122,7 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
readonly #v4: Record<string, any> = {};
#selectedDB = 0;
get options(): RedisClientOptions<M> | null | undefined {
get options(): RedisClientOptions<M, S> | undefined {
return this.#options;
}
@@ -126,7 +140,7 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
constructor(options?: RedisClientOptions<M, S>) {
super();
this.#options = options;
this.#options = this.#initiateOptions(options);
this.#socket = this.#initiateSocket();
this.#queue = this.#initiateQueue();
this.#isolationPool = createPool({
@@ -140,6 +154,23 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
this.#legacyMode();
}
#initiateOptions(options?: RedisClientOptions<M, S>): RedisClientOptions<M, S> | undefined {
if (options?.url) {
const parsed = RedisClient.parseURL(options.url);
if (options.socket) {
parsed.socket = Object.assign(options.socket, parsed.socket);
}
Object.assign(options, parsed);
}
if (options?.database) {
this.#selectedDB = options.database;
}
return options;
}
#initiateSocket(): RedisSocket {
const socketInitiator = async (): Promise<void> => {
const v4Commands = this.#options?.legacyMode ? this.#v4 : this,
@@ -153,8 +184,8 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
promises.push(v4Commands.readonly(RedisClient.commandOptions({ asap: true })));
}
if (this.#options?.socket?.username || this.#options?.socket?.password) {
promises.push(v4Commands.auth(RedisClient.commandOptions({ asap: true }), this.#options.socket));
if (this.#options?.username || this.#options?.password) {
promises.push(v4Commands.auth(RedisClient.commandOptions({ asap: true }), this.#options));
}
const resubscribePromise = this.#queue.resubscribe();
@@ -182,10 +213,7 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
}
#initiateQueue(): RedisCommandsQueue {
return new RedisCommandsQueue(
this.#options?.commandsQueueMaxLength,
(encodedCommands: string) => this.#socket.write(encodedCommands)
);
return new RedisCommandsQueue(this.#options?.commandsQueueMaxLength);
}
#legacyMode(): void {
@@ -247,6 +275,72 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
await this.#socket.connect();
}
async commandsExecutor(command: RedisCommand, args: Array<unknown>): Promise<ReturnType<typeof command['transformReply']>> {
const { args: redisArgs, options } = transformCommandArguments<ClientCommandOptions>(command, args);
return command.transformReply(
await this.#sendCommand(redisArgs, options, command.BUFFER_MODE),
redisArgs.preserve,
);
}
sendCommand<T = RedisReply>(args: TransformArgumentsReply, options?: ClientCommandOptions, bufferMode?: boolean): Promise<T> {
return this.#sendCommand(args, options, bufferMode);
}
// using `#sendCommand` cause `sendCommand` is overwritten in legacy mode
async #sendCommand<T = RedisReply>(args: TransformArgumentsReply, options?: ClientCommandOptions, bufferMode?: boolean): Promise<T> {
if (!this.#socket.isOpen) {
throw new ClientClosedError();
}
if (options?.isolated) {
return this.executeIsolated(isolatedClient =>
isolatedClient.sendCommand(args, {
...options,
isolated: false
})
);
}
const promise = this.#queue.addCommand<T>(args, options, bufferMode);
this.#tick();
return await promise;
}
async scriptsExecutor(script: RedisLuaScript, args: Array<unknown>): Promise<ReturnType<typeof script['transformReply']>> {
const { args: redisArgs, options } = transformCommandArguments<ClientCommandOptions>(script, args);
return script.transformReply(
await this.executeScript(script, redisArgs, options, script.BUFFER_MODE),
redisArgs.preserve
);
}
async executeScript(script: RedisLuaScript, args: TransformArgumentsReply, options?: ClientCommandOptions, bufferMode?: boolean): Promise<ReturnType<typeof script['transformReply']>> {
try {
return await this.#sendCommand([
'EVALSHA',
script.SHA1,
script.NUMBER_OF_KEYS.toString(),
...args
], options, bufferMode);
} catch (err: any) {
if (!err?.message?.startsWith?.('NOSCRIPT')) {
throw err;
}
return await this.#sendCommand([
'EVAL',
script.SCRIPT,
script.NUMBER_OF_KEYS.toString(),
...args
], options, bufferMode);
}
}
async SELECT(db: number): Promise<void>;
async SELECT(options: CommandOptions<ClientCommandOptions>, db: number): Promise<void>;
async SELECT(options?: any, db?: any): Promise<void> {
@@ -299,7 +393,7 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
QUIT(): Promise<void> {
return this.#socket.quit(() => {
const promise = this.#queue.addEncodedCommand(encodeCommand(['QUIT']));
const promise = this.#queue.addCommand(['QUIT']);
this.#tick();
return promise;
});
@@ -307,64 +401,43 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
quit = this.QUIT;
sendCommand<T = unknown>(args: Array<string>, options?: ClientCommandOptions): Promise<T> {
return this.#sendCommand(args, options);
#tick(): void {
if (!this.#socket.isSocketExists) {
return;
}
// using `#sendCommand` cause `sendCommand` is overwritten in legacy mode
#sendCommand<T = RedisReply>(args: Array<string>, options?: ClientCommandOptions): Promise<T> {
return this.sendEncodedCommand(encodeCommand(args), options);
this.#socket.cork();
while (true) {
const args = this.#queue.getCommandToSend();
if (args === undefined) break;
let writeResult;
for (const toWrite of encodeCommand(args)) {
writeResult = this.#socket.write(toWrite);
}
async sendEncodedCommand<T = RedisReply>(encodedCommand: string, options?: ClientCommandOptions): Promise<T> {
if (!this.#socket.isOpen) {
throw new ClientClosedError();
if (!writeResult) {
break;
}
if (options?.isolated) {
return this.executeIsolated(isolatedClient =>
isolatedClient.sendEncodedCommand(encodedCommand, {
...options,
isolated: false
})
);
}
const promise = this.#queue.addEncodedCommand<T>(encodedCommand, options);
this.#tick();
return await promise;
}
executeIsolated<T>(fn: (client: RedisClientType<M, S>) => T | Promise<T>): Promise<T> {
return this.#isolationPool.use(fn);
}
async executeScript(script: RedisLuaScript, args: Array<string>, options?: ClientCommandOptions): Promise<ReturnType<typeof script['transformReply']>> {
try {
return await this.#sendCommand([
'EVALSHA',
script.SHA1,
script.NUMBER_OF_KEYS.toString(),
...args
], options);
} catch (err: any) {
if (!err?.message?.startsWith?.('NOSCRIPT')) {
throw err;
}
return await this.#sendCommand([
'EVAL',
script.SCRIPT,
script.NUMBER_OF_KEYS.toString(),
...args
], options);
}
multi(): RedisMultiCommandType<M, S> {
return new (this as any).Multi(
this.#multiExecutor.bind(this),
this.#options
);
}
#multiExecutor(commands: Array<MultiQueuedCommand>, chainId?: symbol): Promise<Array<RedisReply>> {
const promise = Promise.all(
commands.map(({encodedCommand}) => {
return this.#queue.addEncodedCommand(encodedCommand, RedisClient.commandOptions({
commands.map(({ args }) => {
return this.#queue.addCommand(args, RedisClient.commandOptions({
chainId
}));
})
@@ -375,13 +448,6 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
return promise;
}
multi(): RedisMultiCommandType<M, S> {
return new (this as any).Multi(
this.#multiExecutor.bind(this),
this.#options
);
}
async* scanIterator(options?: ScanCommandOptions): AsyncIterable<string> {
let cursor = 0;
do {
@@ -438,32 +504,7 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
await this.#isolationPool.drain();
await this.#isolationPool.clear();
}
#isTickQueued = false;
#tick(): void {
const {chunkRecommendedSize} = this.#socket;
if (!chunkRecommendedSize) {
return;
}
if (!this.#isTickQueued && this.#queue.waitingToBeSentCommandsLength < chunkRecommendedSize) {
queueMicrotask(() => this.#tick());
this.#isTickQueued = true;
return;
}
const isBuffering = this.#queue.executeChunk(chunkRecommendedSize);
if (isBuffering === true) {
this.#socket.once('drain', () => this.#tick());
} else if (isBuffering === false) {
this.#tick();
return;
}
this.#isTickQueued = false;
}
}
extendWithDefaultCommands(RedisClient, RedisClient.commandsExecutor);
extendWithDefaultCommands(RedisClient, RedisClient.prototype.commandsExecutor);
(RedisClient.prototype as any).Multi = RedisMultiCommand.extend();

View File

@@ -172,7 +172,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisLu
return value.client;
}
getClient(firstKey?: string, isReadonly?: boolean): RedisClientType<M, S> {
getClient(firstKey?: string | Buffer, isReadonly?: boolean): RedisClientType<M, S> {
if (!firstKey) {
return this.#getRandomClient();
}

View File

@@ -1,4 +1,4 @@
import { RedisCommand, RedisModules } from './commands';
import { RedisCommand, RedisModules, TransformArgumentsReply } from './commands';
import RedisClient, { ClientCommandOptions, RedisClientType, WithPlugins } from './client';
import { RedisSocketOptions } from './socket';
import RedisClusterSlots, { ClusterNode } from './cluster-slots';
@@ -15,11 +15,11 @@ export interface RedisClusterOptions<M = RedisModules, S = RedisLuaScripts> {
maxCommandRedirections?: number;
}
export type RedisClusterType<M extends RedisModules, S extends RedisLuaScripts> =
WithPlugins<M, S> & RedisCluster;
export type RedisClusterType<M extends RedisModules = {}, S extends RedisLuaScripts = {}> =
WithPlugins<M, S> & RedisCluster<M, S>;
export default class RedisCluster<M extends RedisModules = RedisModules, S extends RedisLuaScripts = RedisLuaScripts> extends EventEmitter {
static #extractFirstKey(command: RedisCommand, originalArgs: Array<unknown>, redisArgs: Array<string>): string | undefined {
export default class RedisCluster<M extends RedisModules = {}, S extends RedisLuaScripts = {}> extends EventEmitter {
static #extractFirstKey(command: RedisCommand, originalArgs: Array<unknown>, redisArgs: TransformArgumentsReply): string | Buffer | undefined {
if (command.FIRST_KEY_INDEX === undefined) {
return undefined;
} else if (typeof command.FIRST_KEY_INDEX === 'number') {
@@ -29,53 +29,13 @@ export default class RedisCluster<M extends RedisModules = RedisModules, S exten
return command.FIRST_KEY_INDEX(...originalArgs);
}
static async commandsExecutor(
this: RedisCluster,
command: RedisCommand,
args: Array<unknown>
): Promise<ReturnType<typeof command['transformReply']>> {
const { args: redisArgs, options } = transformCommandArguments<ClientCommandOptions>(command, args);
const reply = command.transformReply(
await this.sendCommand(
RedisCluster.#extractFirstKey(command, args, redisArgs),
command.IS_READ_ONLY,
redisArgs,
options
),
redisArgs.preserve
);
return reply;
}
static async #scriptsExecutor(
this: RedisCluster,
script: RedisLuaScript,
args: Array<unknown>
): Promise<typeof script['transformArguments']> {
const { args: redisArgs, options } = transformCommandArguments<ClientCommandOptions>(script, args);
const reply = script.transformReply(
await this.executeScript(
script,
args,
redisArgs,
options
),
redisArgs.preserve
);
return reply;
}
static create<M extends RedisModules, S extends RedisLuaScripts>(options?: RedisClusterOptions<M, S>): RedisClusterType<M, S> {
static create<M extends RedisModules = {}, S extends RedisLuaScripts = {}>(options?: RedisClusterOptions<M, S>): RedisClusterType<M, S> {
return new (<any>extendWithModulesAndScripts({
BaseClass: RedisCluster,
modules: options?.modules,
modulesCommandsExecutor: RedisCluster.commandsExecutor,
modulesCommandsExecutor: RedisCluster.prototype.commandsExecutor,
scripts: options?.scripts,
scriptsExecutor: RedisCluster.#scriptsExecutor
scriptsExecutor: RedisCluster.prototype.scriptsExecutor
}))(options);
}
@@ -91,37 +51,75 @@ export default class RedisCluster<M extends RedisModules = RedisModules, S exten
this.#Multi = RedisMultiCommand.extend(options);
}
duplicate(): RedisClusterOptions<M, S> {
return new (Object.getPrototypeOf(this).constructor)(this.#options);
}
async connect(): Promise<void> {
return this.#slots.connect();
}
async commandsExecutor(command: RedisCommand, args: Array<unknown>): Promise<ReturnType<typeof command['transformReply']>> {
const { args: redisArgs, options } = transformCommandArguments<ClientCommandOptions>(command, args);
const reply = command.transformReply(
await this.sendCommand(
RedisCluster.#extractFirstKey(command, args, redisArgs),
command.IS_READ_ONLY,
redisArgs,
options,
command.BUFFER_MODE
),
redisArgs.preserve
);
return reply;
}
async sendCommand<C extends RedisCommand>(
firstKey: string | undefined,
firstKey: string | Buffer | undefined,
isReadonly: boolean | undefined,
args: Array<string>,
args: TransformArgumentsReply,
options?: ClientCommandOptions,
bufferMode?: boolean,
redirections = 0
): Promise<ReturnType<C['transformReply']>> {
const client = this.#slots.getClient(firstKey, isReadonly);
try {
return await client.sendCommand(args, options);
return await client.sendCommand(args, options, bufferMode);
} catch (err: any) {
const shouldRetry = await this.#handleCommandError(err, client, redirections);
if (shouldRetry === true) {
return this.sendCommand(firstKey, isReadonly, args, options, redirections + 1);
return this.sendCommand(firstKey, isReadonly, args, options, bufferMode, redirections + 1);
} else if (shouldRetry) {
return shouldRetry.sendCommand(args, options);
return shouldRetry.sendCommand(args, options, bufferMode);
}
throw err;
}
}
async scriptsExecutor(script: RedisLuaScript, args: Array<unknown>): Promise<ReturnType<typeof script['transformReply']>> {
const { args: redisArgs, options } = transformCommandArguments<ClientCommandOptions>(script, args);
const reply = script.transformReply(
await this.executeScript(
script,
args,
redisArgs,
options
),
redisArgs.preserve
);
return reply;
}
async executeScript(
script: RedisLuaScript,
originalArgs: Array<unknown>,
redisArgs: Array<string>,
redisArgs: TransformArgumentsReply,
options?: ClientCommandOptions,
redirections = 0
): Promise<ReturnType<typeof script['transformReply']>> {
@@ -131,13 +129,13 @@ export default class RedisCluster<M extends RedisModules = RedisModules, S exten
);
try {
return await client.executeScript(script, redisArgs, options);
return await client.executeScript(script, redisArgs, options, script.BUFFER_MODE);
} catch (err: any) {
const shouldRetry = await this.#handleCommandError(err, client, redirections);
if (shouldRetry === true) {
return this.executeScript(script, originalArgs, redisArgs, options, redirections + 1);
} else if (shouldRetry) {
return shouldRetry.executeScript(script, redisArgs, options);
return shouldRetry.executeScript(script, redisArgs, options, script.BUFFER_MODE);
}
throw err;
@@ -177,8 +175,8 @@ export default class RedisCluster<M extends RedisModules = RedisModules, S exten
const client = this.#slots.getClient(routing);
return Promise.all(
commands.map(({encodedCommand}) => {
return client.sendEncodedCommand(encodedCommand, RedisClient.commandOptions({
commands.map(({ args }) => {
return client.sendCommand(args, RedisClient.commandOptions({
chainId
}));
})
@@ -201,5 +199,4 @@ export default class RedisCluster<M extends RedisModules = RedisModules, S exten
}
}
extendWithDefaultCommands(RedisCluster, RedisCluster.commandsExecutor);
extendWithDefaultCommands(RedisCluster, RedisCluster.prototype.commandsExecutor);

View File

@@ -2,27 +2,43 @@ import { strict as assert } from 'assert';
import { describe } from 'mocha';
import { encodeCommand } from './commander';
function encodeCommandToString(...args: Parameters<typeof encodeCommand>): string {
const arr = [];
for (const item of encodeCommand(...args)) {
arr.push(item.toString());
}
return arr.join('');
}
describe('Commander', () => {
describe('encodeCommand (see #1628)', () => {
it('1 byte', () => {
assert.equal(
encodeCommand(['a', 'z']),
encodeCommandToString(['a', 'z']),
'*2\r\n$1\r\na\r\n$1\r\nz\r\n'
);
});
it('2 bytes', () => {
assert.equal(
encodeCommand(['א', 'ת']),
encodeCommandToString(['א', 'ת']),
'*2\r\n$2\r\nא\r\n$2\r\nת\r\n'
);
});
it('4 bytes', () => {
assert.equal(
encodeCommand(['🐣', '🐤']),
encodeCommandToString(['🐣', '🐤']),
'*2\r\n$4\r\n🐣\r\n$4\r\n🐤\r\n'
);
});
it('with a buffer', () => {
assert.equal(
encodeCommandToString([Buffer.from('string')]),
'*1\r\n$6\r\nstring\r\n'
);
});
});
});

View File

@@ -94,16 +94,15 @@ export function transformCommandArguments<T = unknown>(
};
}
export function encodeCommand(args: Array<string>): string {
const encoded = [
`*${args.length}`,
`$${Buffer.byteLength(args[0]).toString()}`,
args[0]
];
const DELIMITER = '\r\n';
for (let i = 1; i < args.length; i++) {
encoded.push(`$${Buffer.byteLength(args[i]).toString()}`, args[i]);
export function* encodeCommand(args: TransformArgumentsReply): IterableIterator<string | Buffer> {
yield `*${args.length}${DELIMITER}`;
for (const arg of args) {
const byteLength = typeof arg === 'string' ? Buffer.byteLength(arg): arg.length;
yield `$${byteLength.toString()}${DELIMITER}`;
yield arg;
yield DELIMITER;
}
return encoded.join('\r\n') + '\r\n';
}

View File

@@ -2,17 +2,15 @@ import LinkedList from 'yallist';
import RedisParser from 'redis-parser';
import { AbortError } from './errors';
import { RedisReply } from './commands';
import { encodeCommand } from './commander';
export interface QueueCommandOptions {
asap?: boolean;
signal?: any; // TODO: `AbortSignal` type is incorrect
chainId?: symbol;
signal?: any; // TODO: `AbortSignal` type is incorrect
}
interface CommandWaitingToBeSent extends CommandWaitingForReply {
encodedCommand: string;
byteLength: number;
args: Array<string | Buffer>;
chainId?: symbol;
abort?: {
signal: any; // TODO: `AbortSignal` type is incorrect
@@ -24,10 +22,9 @@ interface CommandWaitingForReply {
resolve(reply?: any): void;
reject(err: Error): void;
channelsCounter?: number;
bufferMode?: boolean;
}
export type CommandsQueueExecutor = (encodedCommands: string) => boolean | undefined;
export enum PubSubSubscribeCommands {
SUBSCRIBE = 'SUBSCRIBE',
PSUBSCRIBE = 'PSUBSCRIBE'
@@ -57,16 +54,8 @@ export default class RedisCommandsQueue {
readonly #maxLength: number | null | undefined;
readonly #executor: CommandsQueueExecutor;
readonly #waitingToBeSent = new LinkedList<CommandWaitingToBeSent>();
#waitingToBeSentCommandsLength = 0;
get waitingToBeSentCommandsLength() {
return this.#waitingToBeSentCommandsLength;
}
readonly #waitingForReply = new LinkedList<CommandWaitingForReply>();
readonly #pubSubState = {
@@ -114,12 +103,11 @@ export default class RedisCommandsQueue {
#chainInExecution: symbol | undefined;
constructor(maxLength: number | null | undefined, executor: CommandsQueueExecutor) {
constructor(maxLength: number | null | undefined) {
this.#maxLength = maxLength;
this.#executor = executor;
}
addEncodedCommand<T = RedisReply>(encodedCommand: string, options?: QueueCommandOptions): Promise<T> {
addCommand<T = RedisReply>(args: Array<string | Buffer>, options?: QueueCommandOptions, bufferMode?: boolean): Promise<T> {
if (this.#pubSubState.subscribing || this.#pubSubState.subscribed) {
return Promise.reject(new Error('Cannot send commands in PubSub mode'));
} else if (this.#maxLength && this.#waitingToBeSent.length + this.#waitingForReply.length >= this.#maxLength) {
@@ -130,11 +118,11 @@ export default class RedisCommandsQueue {
return new Promise((resolve, reject) => {
const node = new LinkedList.Node<CommandWaitingToBeSent>({
encodedCommand,
byteLength: Buffer.byteLength(encodedCommand),
args,
chainId: options?.chainId,
bufferMode,
resolve,
reject
reject,
});
if (options?.signal) {
@@ -157,8 +145,6 @@ export default class RedisCommandsQueue {
} else {
this.#waitingToBeSent.pushNode(node);
}
this.#waitingToBeSentCommandsLength += node.value.byteLength;
});
}
@@ -185,8 +171,9 @@ export default class RedisCommandsQueue {
unsubscribe(command: PubSubUnsubscribeCommands, channels?: string | Array<string>, listener?: PubSubListener): Promise<void> {
const listeners = command === PubSubUnsubscribeCommands.UNSUBSCRIBE ? this.#pubSubListeners.channels : this.#pubSubListeners.patterns;
if (!channels) {
const size = listeners.size;
listeners.clear();
return this.#pushPubSubCommand(command);
return this.#pushPubSubCommand(command, size);
}
const channelsToUnsubscribe = [];
@@ -213,31 +200,24 @@ export default class RedisCommandsQueue {
return this.#pushPubSubCommand(command, channelsToUnsubscribe);
}
#pushPubSubCommand(command: PubSubSubscribeCommands | PubSubUnsubscribeCommands, channels?: Array<string>): Promise<void> {
#pushPubSubCommand(command: PubSubSubscribeCommands | PubSubUnsubscribeCommands, channels: number | Array<string>): Promise<void> {
return new Promise((resolve, reject) => {
const isSubscribe = command === PubSubSubscribeCommands.SUBSCRIBE || command === PubSubSubscribeCommands.PSUBSCRIBE,
inProgressKey = isSubscribe ? 'subscribing' : 'unsubscribing',
commandArgs: Array<string> = [command];
let channelsCounter: number;
if (channels?.length) {
if (typeof channels === 'number') { // unsubscribe only
channelsCounter = channels;
} else {
commandArgs.push(...channels);
channelsCounter = channels.length;
} else {
// unsubscribe only
channelsCounter = (
command[0] === 'P' ?
this.#pubSubListeners.patterns :
this.#pubSubListeners.channels
).size;
}
this.#pubSubState[inProgressKey] += channelsCounter;
const encodedCommand = encodeCommand(commandArgs),
byteLength = Buffer.byteLength(encodedCommand);
this.#waitingToBeSent.push({
encodedCommand,
byteLength,
args: commandArgs,
channelsCounter,
resolve: () => {
this.#pubSubState[inProgressKey] -= channelsCounter;
@@ -249,7 +229,6 @@ export default class RedisCommandsQueue {
reject();
}
});
this.#waitingToBeSentCommandsLength += byteLength;
});
}
@@ -267,47 +246,25 @@ export default class RedisCommandsQueue {
]);
}
executeChunk(recommendedSize: number): boolean | undefined {
if (!this.#waitingToBeSent.length) return;
const encoded: Array<string> = [];
let size = 0,
lastCommandChainId: symbol | undefined;
for (const command of this.#waitingToBeSent) {
encoded.push(command.encodedCommand);
size += command.byteLength;
if (size > recommendedSize) {
lastCommandChainId = command.chainId;
break;
}
}
if (!lastCommandChainId && encoded.length === this.#waitingToBeSent.length) {
lastCommandChainId = this.#waitingToBeSent.tail!.value.chainId;
}
lastCommandChainId ??= this.#waitingToBeSent.tail?.value.chainId;
this.#executor(encoded.join(''));
for (let i = 0; i < encoded.length; i++) {
const waitingToBeSent = this.#waitingToBeSent.shift()!;
if (waitingToBeSent.abort) {
waitingToBeSent.abort.signal.removeEventListener('abort', waitingToBeSent.abort.listener);
}
getCommandToSend(): Array<string | Buffer> | undefined {
const toSend = this.#waitingToBeSent.shift();
if (toSend) {
this.#waitingForReply.push({
resolve: waitingToBeSent.resolve,
reject: waitingToBeSent.reject,
channelsCounter: waitingToBeSent.channelsCounter
resolve: toSend.resolve,
reject: toSend.reject,
channelsCounter: toSend.channelsCounter,
bufferMode: toSend.bufferMode
});
}
this.#chainInExecution = lastCommandChainId;
this.#waitingToBeSentCommandsLength -= size;
this.#chainInExecution = toSend?.chainId;
return toSend?.args;
}
parseResponse(data: Buffer): void {
this.#parser.setReturnBuffers(!!this.#waitingForReply.head?.value.bufferMode);
this.#parser.execute(data);
}

View File

@@ -1,6 +1,7 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyNumber } from './generic-transformers';
export function transformArguments(username: string | Array<string>): Array<string> {
export function transformArguments(username: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['ACL', 'DELUSER'], username);
}

View File

@@ -1,6 +1,7 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyString } from './generic-transformers';
export function transformArguments(username: string, rule: string | Array<string>): Array<string> {
export function transformArguments(username: string, rule: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['ACL', 'SETUSER', username], rule);
}

View File

@@ -1,10 +1,11 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 2;
type BitOperations = 'AND' | 'OR' | 'XOR' | 'NOT';
export function transformArguments(operation: BitOperations, destKey: string, key: string | Array<string>): Array<string> {
export function transformArguments(operation: BitOperations, destKey: string, key: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['BITOP', operation, destKey], key);
}

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(keys: string | Array<string>, timeout: number): Array<string> {
export function transformArguments(keys: string | Buffer | Array<string | Buffer>, timeout: number): TransformArgumentsReply {
const args = pushVerdictArguments(['BLPOP'], keys);
args.push(timeout.toString());

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string | Array<string>, timeout: number): Array<string> {
export function transformArguments(key: string | Array<string>, timeout: number): TransformArgumentsReply {
const args = pushVerdictArguments(['BRPOP'], key);
args.push(timeout.toString());

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyNumberInfinity, ZMember } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string | Array<string>, timeout: number): Array<string> {
export function transformArguments(key: string | Array<string>, timeout: number): TransformArgumentsReply {
const args = pushVerdictArguments(['BZPOPMAX'], key);
args.push(timeout.toString());

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyNumberInfinity, ZMember } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string | Array<string>, timeout: number): Array<string> {
export function transformArguments(key: string | Array<string>, timeout: number): TransformArgumentsReply {
const args = pushVerdictArguments(['BZPOPMIN'], key);
args.push(timeout.toString());

View File

@@ -0,0 +1,76 @@
import { strict as assert } from 'assert';
import { transformArguments, transformReply } from './CLUSTER_SLOTS';
describe('CLUSTER SLOTS', () => {
it('transformArguments', () => {
assert.deepEqual(
transformArguments(),
['CLUSTER', 'SLOTS']
);
});
it('transformReply', () => {
assert.deepEqual(
transformReply([
[
0,
5460,
['127.0.0.1', 30001, '09dbe9720cda62f7865eabc5fd8857c5d2678366'],
['127.0.0.1', 30004, '821d8ca00d7ccf931ed3ffc7e3db0599d2271abf']
],
[
5461,
10922,
['127.0.0.1', 30002, 'c9d93d9f2c0c524ff34cc11838c2003d8c29e013'],
['127.0.0.1', 30005, 'faadb3eb99009de4ab72ad6b6ed87634c7ee410f']
],
[
10923,
16383,
['127.0.0.1', 30003, '044ec91f325b7595e76dbcb18cc688b6a5b434a1'],
['127.0.0.1', 30006, '58e6e48d41228013e5d9c1c37c5060693925e97e']
]
]),
[{
from: 0,
to: 5460,
master: {
ip: '127.0.0.1',
port: 30001,
id: '09dbe9720cda62f7865eabc5fd8857c5d2678366'
},
replicas: [{
ip: '127.0.0.1',
port: 30004,
id: '821d8ca00d7ccf931ed3ffc7e3db0599d2271abf'
}]
}, {
from: 5461,
to: 10922,
master: {
ip: '127.0.0.1',
port: 30002,
id: 'c9d93d9f2c0c524ff34cc11838c2003d8c29e013'
},
replicas: [{
ip: '127.0.0.1',
port: 30005,
id: 'faadb3eb99009de4ab72ad6b6ed87634c7ee410f'
}]
}, {
from: 10923,
to: 16383,
master: {
ip: '127.0.0.1',
port: 30003,
id: '044ec91f325b7595e76dbcb18cc688b6a5b434a1'
},
replicas: [{
ip: '127.0.0.1',
port: 30006,
id: '58e6e48d41228013e5d9c1c37c5060693925e97e'
}]
}]
)
});
});

View File

@@ -0,0 +1,41 @@
import { TransformArgumentsReply } from '.';
export function transformArguments(): TransformArgumentsReply {
return ['CLUSTER', 'SLOTS'];
}
type ClusterSlotsRawNode = [ip: string, port: number, id: string];
type ClusterSlotsRawReply = Array<[from: number, to: number, master: ClusterSlotsRawNode, ...replicas: Array<ClusterSlotsRawNode>]>;
type ClusterSlotsNode = {
ip: string;
port: number;
id: string;
};
export type ClusterSlotsReply = Array<{
from: number;
to: number;
master: ClusterSlotsNode;
replicas: Array<ClusterSlotsNode>;
}>;
export function transformReply(reply: ClusterSlotsRawReply): ClusterSlotsReply {
return reply.map(([from, to, master, ...replicas]) => {
return {
from,
to,
master: transformNode(master),
replicas: replicas.map(transformNode)
};
});
}
function transformNode([ip, port, id]: ClusterSlotsRawNode): ClusterSlotsNode {
return {
ip,
port,
id
};
}

View File

@@ -1,6 +1,7 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyNumber } from './generic-transformers';
export function transformArguments(keys: string | Array<string>): Array<string> {
export function transformArguments(keys: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['DEL'], keys);
}

View File

@@ -1,10 +1,11 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyBoolean } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export const IS_READ_ONLY = true;
export function transformArguments(keys: string | Array<string>): Array<string> {
export function transformArguments(keys: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['EXISTS'], keys);
}

View File

@@ -1,10 +1,11 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyStringArray } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export const IS_READ_ONLY = true;
export function transformArguments(key: string, member: string | Array<string>): Array<string> {
export function transformArguments(key: string, member: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['GEOHASH', key], member);
}

View File

@@ -1,6 +1,6 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient, TestRedisClusters, itWithCluster } from '../test-utils';
import { transformArguments } from './GEOPOS';
import { transformArguments, transformReply } from './GEOPOS';
describe('GEOPOS', () => {
describe('transformArguments', () => {
@@ -19,13 +19,51 @@ describe('GEOPOS', () => {
});
});
itWithClient(TestRedisServers.OPEN, 'client.geoPos', async client => {
describe('transformReply', () => {
it('null', () => {
assert.deepEqual(
transformReply([null]),
[null]
);
});
it('with member', () => {
assert.deepEqual(
transformReply([['1', '2']]),
[{
longitude: '1',
latitude: '2'
}]
);
});
});
describe('client.geoPos', () => {
itWithClient(TestRedisServers.OPEN, 'null', async client => {
assert.deepEqual(
await client.geoPos('key', 'member'),
[null]
);
});
itWithClient(TestRedisServers.OPEN, 'with member', async client => {
const coordinates = {
longitude: '-122.06429868936538696',
latitude: '37.37749628831998194'
};
await client.geoAdd('key', {
member: 'member',
...coordinates
});
assert.deepEqual(
await client.geoPos('key', 'member'),
[coordinates]
);
});
});
itWithCluster(TestRedisClusters.OPEN, 'cluster.geoPos', async cluster => {
assert.deepEqual(
await cluster.geoPos('key', 'member'),

View File

@@ -1,10 +1,11 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export const IS_READ_ONLY = true;
export function transformArguments(key: string, member: string | Array<string>): Array<string> {
export function transformArguments(key: string, member: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['GEOPOS', key], member);
}

View File

@@ -1,6 +1,6 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient, TestRedisClusters, itWithCluster, describeHandleMinimumRedisVersion } from '../test-utils';
import { transformArguments } from './GEOSEARCHSTORE';
import { transformArguments, transformReply } from './GEOSEARCHSTORE';
describe('GEOSEARCHSTORE', () => {
describeHandleMinimumRedisVersion([6, 2]);
@@ -40,6 +40,13 @@ describe('GEOSEARCHSTORE', () => {
});
});
it('transformReply with empty array (https://github.com/redis/redis/issues/9261)', () => {
assert.throws(
() => (transformReply as any)([]),
TypeError
);
});
itWithClient(TestRedisServers.OPEN, 'client.geoSearchStore', async client => {
await client.geoAdd('source', {
longitude: 1,

View File

@@ -1,11 +1,12 @@
import { transformReplyString } from './generic-transformers';
import { TransformArgumentsReply } from '.';
import { transformReplyStringNull } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export const IS_READ_ONLY = true;
export function transformArguments(key: string): Array<string> {
export function transformArguments(key: string | Buffer): TransformArgumentsReply {
return ['GET', key];
}
export const transformReply = transformReplyString;
export const transformReply = transformReplyStringNull;

View File

@@ -1,3 +1,4 @@
import { TransformArgumentsReply } from '.';
import { transformEXAT, transformPXAT, transformReplyStringNull } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
@@ -14,7 +15,7 @@ type GetExModes = {
PERSIST: true;
};
export function transformArguments(key: string, mode: GetExModes) {
export function transformArguments(key: string, mode: GetExModes): TransformArgumentsReply {
const args = ['GETEX', key];
if ('EX' in mode) {

View File

@@ -0,0 +1,22 @@
import { strict as assert } from 'assert';
import { TestRedisServers, itWithClient, TestRedisClusters, itWithCluster } from '../test-utils';
describe('GET_BUFFER', () => {
itWithClient(TestRedisServers.OPEN, 'client.getBuffer', async client => {
const buffer = Buffer.from('string');
await client.set('key', buffer);
assert.deepEqual(
buffer,
await client.getBuffer('key')
);
});
itWithCluster(TestRedisClusters.OPEN, 'cluster.getBuffer', async cluster => {
const buffer = Buffer.from('string');
await cluster.set('key', buffer);
assert.deepEqual(
buffer,
await cluster.getBuffer('key')
);
});
});

View File

@@ -0,0 +1,7 @@
import { transformReplyBufferNull } from './generic-transformers';
export { FIRST_KEY_INDEX, IS_READ_ONLY, transformArguments } from './GET';
export const BUFFER_MODE = true;
export const transformReply = transformReplyBufferNull;

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string, field: string | Array<string>): Array<string> {
export function transformArguments(key: string, field: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['HDEL', key], field);
}

View File

@@ -1,10 +1,11 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyStringArray } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export const IS_READ_ONLY = true;
export function transformArguments(key: string, fields: string | Array<string>): Array<string> {
export function transformArguments(key: string, fields: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['HMGET', key], fields);
}

View File

@@ -4,6 +4,13 @@ import { TestRedisServers, itWithClient, TestRedisClusters, itWithCluster } from
describe('HSET', () => {
describe('transformArguments', () => {
it('field, value', () => {
assert.deepEqual(
transformArguments('key', 'field', 'value'),
['HSET', 'key', 'field', 'value']
);
});
it('Map', () => {
assert.deepEqual(
transformArguments('key', new Map([['field', 'value']])),
@@ -30,7 +37,7 @@ describe('HSET', () => {
itWithClient(TestRedisServers.OPEN, 'client.hSet', async client => {
assert.equal(
await client.hSet('key', { field: 'value' }),
await client.hSet('key', 'field', 'value'),
1
);
});

View File

@@ -1,3 +1,4 @@
import { TransformArgumentsReply } from '.';
import { transformReplyString } from './generic-transformers';
type HSETObject = Record<string | number, string | number>;
@@ -8,10 +9,18 @@ type HSETTuples = Array<[string, string]> | Array<string>;
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string, value: HSETObject | HSETMap | HSETTuples): Array<string> {
type GenericArguments = [key: string];
type SingleFieldArguments = [...generic: GenericArguments, field: string, value: string];
type MultipleFieldsArguments = [...generic: GenericArguments, value: HSETObject | HSETMap | HSETTuples];
export function transformArguments(...[ key, value, fieldValue ]: SingleFieldArguments | MultipleFieldsArguments): TransformArgumentsReply {
const args = ['HSET', key];
if (value instanceof Map) {
if (typeof value === 'string') {
args.push(value, fieldValue!);
} else if (value instanceof Map) {
pushMap(args, value);
} else if (Array.isArray(value)) {
pushTuples(args, value);

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string, elements: string | Array<string>): Array<string> {
export function transformArguments(key: string, elements: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['LPUSH', key], elements);}
export const transformReply = transformReplyNumber;

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string, element: string | Array<string>): Array<string> {
export function transformArguments(key: string, element: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['LPUSHX', key], element);
}

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyBoolean } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string, element: string | Array<string>): Array<string> {
export function transformArguments(key: string, element: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['PFADD', key], element);
}

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string | Array<string>): Array<string> {
export function transformArguments(key: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['PFCOUNT'], key);
}

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyString } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(destination: string, source: string | Array<string>): Array<string> {
export function transformArguments(destination: string, source: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['PFMERGE', destination], source);
}

View File

@@ -33,7 +33,7 @@ describe('PUBSUB NUMSUB', () => {
);
});
itWithCluster(TestRedisClusters.OPEN, 'cluster.pubSubNumPat', async cluster => {
itWithCluster(TestRedisClusters.OPEN, 'cluster.pubSubNumSub', async cluster => {
assert.deepEqual(
await cluster.pubSubNumSub(),
Object.create(null)

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string, element: string | Array<string>): Array<string> {
export function transformArguments(key: string, element: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['RPUSH', key], element);
}

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string, element: string | Array<string>): Array<string> {
export function transformArguments(key: string, element: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['RPUSHX', key], element);
}

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string, members: string | Array<string>): Array<string> {
export function transformArguments(key: string, members: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['SADD', key], members);
}

View File

@@ -1,6 +1,7 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyBooleanArray } from './generic-transformers';
export function transformArguments(sha1: string | Array<string>): Array<string> {
export function transformArguments(sha1: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['SCRIPT', 'EXISTS'], sha1);
}

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyStringArray } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(keys: string | Array<string>): Array<string> {
export function transformArguments(keys: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['SDIFF'], keys);
}

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(destination: string, keys: string | Array<string>): Array<string> {
export function transformArguments(destination: string, keys: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['SDIFFSTORE', destination], keys);
}

View File

@@ -1,3 +1,5 @@
import { TransformArgumentsReply } from '.';
export const FIRST_KEY_INDEX = 1;
interface EX {
@@ -38,7 +40,7 @@ interface SetCommonOptions {
type SetOptions = SetTTL & SetGuards & (SetCommonOptions | {});
export function transformArguments(key: string, value: string, options?: SetOptions): Array<string> {
export function transformArguments(key: string | Buffer, value: string | Buffer, options?: SetOptions): TransformArgumentsReply {
const args = ['SET', key, value];
if (!options) {

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { BitValue, transformReplyBit } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string, offset: number, value: BitValue) {
export function transformArguments(key: string, offset: number, value: BitValue): TransformArgumentsReply {
return ['SETBIT', key, offset.toString(), value.toString()];
}

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { transformReplyString } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string, seconds: number, value: string): Array<string> {
export function transformArguments(key: string | Buffer, seconds: number, value: string): TransformArgumentsReply {
return [
'SETEX',
key,

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyStringArray } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(keys: string | Array<string>): Array<string> {
export function transformArguments(keys: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['SINTER'], keys);
}

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyStringArray } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(destination: string, keys: string | Array<string>): Array<string> {
export function transformArguments(destination: string, keys: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['SINTERSTORE', destination], keys);
}

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string, members: string | Array<string>): Array<string> {
export function transformArguments(key: string, members: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['SREM', key], members);
}

View File

@@ -1,10 +1,11 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyStringArray } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export const IS_READ_ONLY = true;
export function transformArguments(keys: string | Array<string>): Array<string> {
export function transformArguments(keys: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['SUNION'], keys);
}

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(destination: string, keys: string | Array<string>): Array<string> {
export function transformArguments(destination: string, keys: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['SUNIONSTORE', destination], keys);
}

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string | Array<string>): Array<string> {
export function transformArguments(key: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['TOUCH'], key);
}

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string | Array<string>): Array<string> {
export function transformArguments(key: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['UNLINK'], key);
}

View File

@@ -1,6 +1,7 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyString } from './generic-transformers';
export function transformArguments(key: string | Array<string>): Array<string> {
export function transformArguments(key: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['WATCH'], key);
}

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string, group: string, id: string | Array<string>): Array<string> {
export function transformArguments(key: string, group: string, id: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['XACK', key, group], id);
}

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string, id: string | Array<string>): Array<string> {
export function transformArguments(key: string, id: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['XDEL', key], id);
}

View File

@@ -1,10 +1,11 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArgument, transformReplyStringArray } from './generic-transformers';
export const FIRST_KEY_INDEX = 2;
export const IS_READ_ONLY = true;
export function transformArguments(keys: Array<string> | string): Array<string> {
export function transformArguments(keys: Array<string> | string): TransformArgumentsReply {
return pushVerdictArgument(['ZDIFF'], keys);
}

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArgument, transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(destination: string, keys: Array<string> | string): Array<string> {
export function transformArguments(destination: string, keys: Array<string> | string): TransformArgumentsReply {
return pushVerdictArgument(['ZDIFFSTORE', destination], keys);
}

View File

@@ -1,9 +1,10 @@
import { TransformArgumentsReply } from '.';
import { transformReplySortedSetWithScores } from './generic-transformers';
import { transformArguments as transformZDiffArguments } from './ZDIFF';
export { FIRST_KEY_INDEX, IS_READ_ONLY } from './ZDIFF';
export function transformArguments(...args: Parameters<typeof transformZDiffArguments>): Array<string> {
export function transformArguments(...args: Parameters<typeof transformZDiffArguments>): TransformArgumentsReply {
return [
...transformZDiffArguments(...args),
'WITHSCORES'

View File

@@ -1,3 +1,4 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArgument, transformReplyStringArray } from './generic-transformers';
export const FIRST_KEY_INDEX = 2;
@@ -9,7 +10,7 @@ interface ZInterOptions {
AGGREGATE?: 'SUM' | 'MIN' | 'MAX';
}
export function transformArguments(keys: Array<string> | string, options?: ZInterOptions): Array<string> {
export function transformArguments(keys: Array<string> | string, options?: ZInterOptions): TransformArgumentsReply {
const args = pushVerdictArgument(['ZINTER'], keys);
if (options?.WEIGHTS) {

View File

@@ -1,3 +1,4 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArgument, transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
@@ -7,7 +8,7 @@ interface ZInterStoreOptions {
AGGREGATE?: 'SUM' | 'MIN' | 'MAX';
}
export function transformArguments(destination: string, keys: Array<string> | string, options?: ZInterStoreOptions): Array<string> {
export function transformArguments(destination: string, keys: Array<string> | string, options?: ZInterStoreOptions): TransformArgumentsReply {
const args = pushVerdictArgument(['ZINTERSTORE', destination], keys);
if (options?.WEIGHTS) {

View File

@@ -1,9 +1,10 @@
import { TransformArgumentsReply } from '.';
import { transformReplySortedSetWithScores } from './generic-transformers';
import { transformArguments as transformZInterArguments } from './ZINTER';
export { FIRST_KEY_INDEX, IS_READ_ONLY } from './ZINTER';
export function transformArguments(...args: Parameters<typeof transformZInterArguments>): Array<string> {
export function transformArguments(...args: Parameters<typeof transformZInterArguments>): TransformArgumentsReply {
return [
...transformZInterArguments(...args),
'WITHSCORES'

View File

@@ -1,10 +1,11 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyNumberInfinityNullArray } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export const IS_READ_ONLY = true;
export function transformArguments(key: string, member: string | Array<string>): Array<string> {
export function transformArguments(key: string, member: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['ZMSCORE', key], member);
}

View File

@@ -1,8 +1,9 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArguments, transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
export function transformArguments(key: string, member: string | Array<string>): Array<string> {
export function transformArguments(key: string, member: string | Array<string>): TransformArgumentsReply {
return pushVerdictArguments(['ZREM', key], member);
}

View File

@@ -1,3 +1,4 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArgument, transformReplyStringArray } from './generic-transformers';
export const FIRST_KEY_INDEX = 2;
@@ -9,7 +10,7 @@ interface ZUnionOptions {
AGGREGATE?: 'SUM' | 'MIN' | 'MAX';
}
export function transformArguments(keys: Array<string> | string, options?: ZUnionOptions): Array<string> {
export function transformArguments(keys: Array<string> | string, options?: ZUnionOptions): TransformArgumentsReply {
const args = pushVerdictArgument(['ZUNION'], keys);
if (options?.WEIGHTS) {

View File

@@ -1,3 +1,4 @@
import { TransformArgumentsReply } from '.';
import { pushVerdictArgument, transformReplyNumber } from './generic-transformers';
export const FIRST_KEY_INDEX = 1;
@@ -7,7 +8,7 @@ interface ZUnionOptions {
AGGREGATE?: 'SUM' | 'MIN' | 'MAX';
}
export function transformArguments(destination: string, keys: Array<string> | string, options?: ZUnionOptions): Array<string> {
export function transformArguments(destination: string, keys: Array<string> | string, options?: ZUnionOptions): TransformArgumentsReply {
const args = pushVerdictArgument(['ZUNIONSTORE', destination], keys);
if (options?.WEIGHTS) {

View File

@@ -1,9 +1,10 @@
import { TransformArgumentsReply } from '.';
import { transformReplySortedSetWithScores } from './generic-transformers';
import { transformArguments as transformZUnionArguments } from './ZUNION';
export { FIRST_KEY_INDEX, IS_READ_ONLY } from './ZUNION';
export function transformArguments(...args: Parameters<typeof transformZUnionArguments>): Array<string> {
export function transformArguments(...args: Parameters<typeof transformZUnionArguments>): TransformArgumentsReply {
return [
...transformZUnionArguments(...args),
'WITHSCORES'

View File

@@ -50,6 +50,10 @@ export function transformReplyBit(reply: BitValue): BitValue {
return reply;
}
export function transformReplyBufferNull(reply: Buffer | null): Buffer | null {
return reply;
}
export function transformReplyVoid(): void {}
export interface ScanOptions {
@@ -352,11 +356,11 @@ export function pushStringTuplesArguments(args: Array<string>, tuples: StringTup
return args;
}
export function pushVerdictArguments(args: TransformArgumentsReply, value: string | Array<string>): TransformArgumentsReply {
if (typeof value === 'string') {
args.push(value);
} else {
export function pushVerdictArguments(args: TransformArgumentsReply, value: string | Buffer | Array<string | Buffer>): TransformArgumentsReply {
if (Array.isArray(value)) {
args.push(...value);
} else {
args.push(value);
}
return args;

View File

@@ -34,6 +34,7 @@ import * as CLUSTER_NODES from './CLUSTER_NODES';
import * as CLUSTER_MEET from './CLUSTER_MEET';
import * as CLUSTER_RESET from './CLUSTER_RESET';
import * as CLUSTER_SETSLOT from './CLUSTER_SETSLOT';
import * as CLUSTER_SLOTS from './CLUSTER_SLOTS';
import * as CONFIG_GET from './CONFIG_GET';
import * as CONFIG_RESETASTAT from './CONFIG_RESETSTAT';
import * as CONFIG_REWRITE from './CONFIG_REWRITE';
@@ -61,6 +62,7 @@ import * as GEOPOS from './GEOPOS';
import * as GEOSEARCH_WITH from './GEOSEARCH_WITH';
import * as GEOSEARCH from './GEOSEARCH';
import * as GEOSEARCHSTORE from './GEOSEARCHSTORE';
import * as GET_BUFFER from './GET_BUFFER';
import * as GET from './GET';
import * as GETBIT from './GETBIT';
import * as GETDEL from './GETDEL';
@@ -316,6 +318,8 @@ export default {
clusterReset: CLUSTER_RESET,
CLUSTER_SETSLOT,
clusterSetSlot: CLUSTER_SETSLOT,
CLUSTER_SLOTS,
clusterSlots: CLUSTER_SLOTS,
CONFIG_GET,
configGet: CONFIG_GET,
CONFIG_RESETASTAT,
@@ -370,6 +374,8 @@ export default {
geoSearch: GEOSEARCH,
GEOSEARCHSTORE,
geoSearchStore: GEOSEARCHSTORE,
GET_BUFFER,
getBuffer: GET_BUFFER,
GET,
get: GET,
GETBIT,
@@ -733,15 +739,16 @@ export default {
zUnionStore: ZUNIONSTORE
};
export type RedisReply = string | number | Array<RedisReply> | null | undefined;
export type RedisReply = string | number | Buffer | Array<RedisReply> | null | undefined;
export type TransformArgumentsReply = Array<string> & { preserve?: unknown };
export type TransformArgumentsReply = Array<string | Buffer> & { preserve?: unknown };
export interface RedisCommand {
FIRST_KEY_INDEX?: number | ((...args: Array<any>) => string);
IS_READ_ONLY?: boolean;
transformArguments(...args: Array<any>): TransformArgumentsReply;
transformReply(reply: RedisReply, preserved: unknown): any;
transformArguments(this: void, ...args: Array<any>): TransformArgumentsReply;
BUFFER_MODE?: boolean;
transformReply(this: void, reply: RedisReply, preserved?: unknown): any;
}
export interface RedisCommands {
@@ -749,7 +756,10 @@ export interface RedisCommands {
}
export interface RedisModule {
[key: string]: RedisCommand;
[command: string]: RedisCommand;
}
export type RedisModules = Record<string, RedisModule>;
export interface RedisModules {
[module: string]: RedisModule;
}
// export type RedisModules = Record<string, RedisModule>;

View File

@@ -13,10 +13,10 @@ export interface SHA1 {
export type RedisLuaScript = RedisLuaScriptConfig & SHA1;
export interface RedisLuaScripts {
[key: string]: RedisLuaScript;
[script: string]: RedisLuaScript;
}
export function defineScript<S extends RedisLuaScriptConfig>(script: S): S & SHA1 {
export function defineScript(script: RedisLuaScriptConfig): typeof script & SHA1 {
return {
...script,
SHA1: scriptSha1(script.SCRIPT)

View File

@@ -1,6 +1,5 @@
import { strict as assert } from 'assert';
import RedisMultiCommand from './multi-command';
import { encodeCommand } from './commander';
import { WatchError } from './errors';
import { spy } from 'sinon';
import { SQUARE_SCRIPT } from './client.spec';
@@ -10,11 +9,11 @@ describe('Multi Command', () => {
it('simple', async () => {
const multi = RedisMultiCommand.create((queue, symbol) => {
assert.deepEqual(
queue.map(({encodedCommand}) => encodedCommand),
queue.map(({ args }) => args),
[
encodeCommand(['MULTI']),
encodeCommand(['PING']),
encodeCommand(['EXEC']),
['MULTI'],
['PING'],
['EXEC'],
]
);
@@ -55,8 +54,8 @@ describe('Multi Command', () => {
it('execAsPipeline', async () => {
const multi = RedisMultiCommand.create(queue => {
assert.deepEqual(
queue.map(({encodedCommand}) => encodedCommand),
[encodeCommand(['PING'])]
queue.map(({ args }) => args),
[['PING']]
);
return Promise.resolve(['PONG']);
@@ -75,8 +74,8 @@ describe('Multi Command', () => {
it('simple', async () => {
const multi = RedisMultiCommand.create(queue => {
assert.deepEqual(
queue.map(({encodedCommand}) => encodedCommand),
[encodeCommand(['PING'])]
queue.map(({ args }) => args),
[['PING']]
);
return Promise.resolve(['PONG']);
@@ -111,10 +110,10 @@ describe('Multi Command', () => {
assert.deepEqual(
await new MultiWithScript(queue => {
assert.deepEqual(
queue.map(({encodedCommand}) => encodedCommand),
queue.map(({ args }) => args),
[
encodeCommand(['EVAL', SQUARE_SCRIPT.SCRIPT, '0', '2']),
encodeCommand(['EVALSHA', SQUARE_SCRIPT.SHA1, '0', '3']),
['EVAL', SQUARE_SCRIPT.SCRIPT, '0', '2'],
['EVALSHA', SQUARE_SCRIPT.SHA1, '0', '3'],
]
);

View File

@@ -2,7 +2,7 @@ import COMMANDS, { TransformArgumentsReply } from './commands';
import { RedisCommand, RedisModules, RedisReply } from './commands';
import { RedisLuaScript, RedisLuaScripts } from './lua-script';
import { RedisClientOptions } from './client';
import { extendWithModulesAndScripts, extendWithDefaultCommands, encodeCommand } from './commander';
import { extendWithModulesAndScripts, extendWithDefaultCommands } from './commander';
import { WatchError } from './errors';
type RedisMultiCommandSignature<C extends RedisCommand, M extends RedisModules, S extends RedisLuaScripts> = (...args: Parameters<C['transformArguments']>) => RedisMultiCommandType<M, S>;
@@ -21,68 +21,31 @@ type WithScripts<M extends RedisModules, S extends RedisLuaScripts> = {
[P in keyof S]: RedisMultiCommandSignature<S[P], M, S>
};
export type RedisMultiCommandType<M extends RedisModules, S extends RedisLuaScripts> = RedisMultiCommand & WithCommands<M, S> & WithModules<M, S> & WithScripts<M, S>;
export type RedisMultiCommandType<M extends RedisModules = {}, S extends RedisLuaScripts = {}> =
RedisMultiCommand<M, S> & WithCommands<M, S> & WithModules<M, S> & WithScripts<M, S>;
export interface MultiQueuedCommand {
encodedCommand: string;
args: TransformArgumentsReply;
preservedArguments?: unknown;
transformReply?: RedisCommand['transformReply'];
}
export type RedisMultiExecutor = (queue: Array<MultiQueuedCommand>, chainId?: symbol) => Promise<Array<RedisReply>>;
export default class RedisMultiCommand<M extends RedisModules = RedisModules, S extends RedisLuaScripts = RedisLuaScripts> {
static commandsExecutor(this: RedisMultiCommand, command: RedisCommand, args: Array<unknown>): RedisMultiCommand {
return this.addCommand(
command.transformArguments(...args),
command.transformReply
);
}
static #scriptsExecutor(
this: RedisMultiCommand,
script: RedisLuaScript,
args: Array<unknown>
): RedisMultiCommand {
const transformedArguments: TransformArgumentsReply = [];
if (this.#scriptsInUse.has(script.SHA1)) {
transformedArguments.push(
'EVALSHA',
script.SHA1
);
} else {
this.#scriptsInUse.add(script.SHA1);
transformedArguments.push(
'EVAL',
script.SCRIPT
);
}
transformedArguments.push(script.NUMBER_OF_KEYS.toString());
const scriptArguments = script.transformArguments(...args);
transformedArguments.push(...scriptArguments);
transformedArguments.preserve = scriptArguments.preserve;
return this.addCommand(
transformedArguments,
script.transformReply
);
}
export default class RedisMultiCommand<M extends RedisModules, S extends RedisLuaScripts> {
static extend<M extends RedisModules, S extends RedisLuaScripts>(
clientOptions?: RedisClientOptions<M, S>
): new (...args: ConstructorParameters<typeof RedisMultiCommand>) => RedisMultiCommandType<M, S> {
return <any>extendWithModulesAndScripts({
BaseClass: RedisMultiCommand,
modules: clientOptions?.modules,
modulesCommandsExecutor: RedisMultiCommand.commandsExecutor,
modulesCommandsExecutor: RedisMultiCommand.prototype.commandsExecutor,
scripts: clientOptions?.scripts,
scriptsExecutor: RedisMultiCommand.#scriptsExecutor
scriptsExecutor: RedisMultiCommand.prototype.scriptsExecutor
});
}
static create<M extends RedisModules, S extends RedisLuaScripts>(
static create<M extends RedisModules = {}, S extends RedisLuaScripts = {}>(
executor: RedisMultiExecutor,
clientOptions?: RedisClientOptions<M, S>
): RedisMultiCommandType<M, S> {
@@ -119,7 +82,7 @@ export default class RedisMultiCommand<M extends RedisModules = RedisModules, S
this.#v4.addCommand = this.addCommand.bind(this);
(this as any).addCommand = (...args: Array<unknown>): this => {
this.#queue.push({
encodedCommand: encodeCommand(args.flat() as Array<string>)
args: args.flat() as Array<string>
});
return this;
}
@@ -151,9 +114,45 @@ export default class RedisMultiCommand<M extends RedisModules = RedisModules, S
(this as any)[name] = (...args: Array<unknown>): void => (this as any).addCommand(name, args);
}
commandsExecutor(command: RedisCommand, args: Array<unknown>): this {
return this.addCommand(
command.transformArguments(...args),
command.transformReply
);
}
scriptsExecutor(script: RedisLuaScript, args: Array<unknown>): this {
const transformedArguments: TransformArgumentsReply = [];
if (this.#scriptsInUse.has(script.SHA1)) {
transformedArguments.push(
'EVALSHA',
script.SHA1
);
} else {
this.#scriptsInUse.add(script.SHA1);
transformedArguments.push(
'EVAL',
script.SCRIPT
);
}
transformedArguments.push(script.NUMBER_OF_KEYS.toString());
const scriptArguments = script.transformArguments(...args);
transformedArguments.push(...scriptArguments);
if (scriptArguments.preserve) {
transformedArguments.preserve = scriptArguments.preserve;
}
return this.addCommand(
transformedArguments,
script.transformReply
);
}
addCommand(args: TransformArgumentsReply, transformReply?: RedisCommand['transformReply']): this {
this.#queue.push({
encodedCommand: encodeCommand(args),
args,
preservedArguments: args.preserve,
transformReply
});
@@ -170,13 +169,9 @@ export default class RedisMultiCommand<M extends RedisModules = RedisModules, S
const queue = this.#queue.splice(0),
rawReplies = await this.#executor([
{
encodedCommand: encodeCommand(['MULTI'])
},
{ args: ['MULTI'] },
...queue,
{
encodedCommand: encodeCommand(['EXEC'])
}
{ args: ['EXEC'] }
], Symbol('[RedisMultiCommand] Chain ID')),
execReply = rawReplies[rawReplies.length - 1] as (null | Array<RedisReply>);
@@ -207,4 +202,4 @@ export default class RedisMultiCommand<M extends RedisModules = RedisModules, S
}
}
extendWithDefaultCommands(RedisMultiCommand, RedisMultiCommand.commandsExecutor);
extendWithDefaultCommands(RedisMultiCommand, RedisMultiCommand.prototype.commandsExecutor);

View File

@@ -1,13 +1,10 @@
import EventEmitter from 'events';
import net from 'net';
import tls from 'tls';
import { URL } from 'url';
import { ConnectionTimeoutError, ClientClosedError } from './errors';
import { promiseTimeout } from './utils';
export interface RedisSocketCommonOptions {
username?: string;
password?: string;
connectTimeout?: number;
noDelay?: boolean;
keepAlive?: number | false;
@@ -19,10 +16,6 @@ export interface RedisNetSocketOptions extends RedisSocketCommonOptions {
host?: string;
}
export interface RedisUrlSocketOptions extends RedisSocketCommonOptions {
url: string;
}
export interface RedisUnixSocketOptions extends RedisSocketCommonOptions {
path: string;
}
@@ -31,7 +24,7 @@ export interface RedisTlsSocketOptions extends RedisNetSocketOptions, tls.Secure
tls: true;
}
export type RedisSocketOptions = RedisNetSocketOptions | RedisUrlSocketOptions | RedisUnixSocketOptions | RedisTlsSocketOptions;
export type RedisSocketOptions = RedisNetSocketOptions | RedisUnixSocketOptions | RedisTlsSocketOptions;
interface CreateSocketReturn<T> {
connectEvent: string;
@@ -44,14 +37,6 @@ export default class RedisSocket extends EventEmitter {
static #initiateOptions(options?: RedisSocketOptions): RedisSocketOptions {
options ??= {};
if (!RedisSocket.#isUnixSocket(options)) {
if (RedisSocket.#isUrlSocket(options)) {
const url = new URL(options.url);
(options as RedisNetSocketOptions).port = Number(url.port);
(options as RedisNetSocketOptions).host = url.hostname;
options.username = url.username;
options.password = url.password;
}
(options as RedisNetSocketOptions).port ??= 6379;
(options as RedisNetSocketOptions).host ??= '127.0.0.1';
}
@@ -67,10 +52,6 @@ export default class RedisSocket extends EventEmitter {
return Math.min(retries * 50, 500);
}
static #isUrlSocket(options: RedisSocketOptions): options is RedisUrlSocketOptions {
return Object.prototype.hasOwnProperty.call(options, 'url');
}
static #isUnixSocket(options: RedisSocketOptions): options is RedisUnixSocketOptions {
return Object.prototype.hasOwnProperty.call(options, 'path');
}
@@ -91,10 +72,8 @@ export default class RedisSocket extends EventEmitter {
return this.#isOpen;
}
get chunkRecommendedSize(): number {
if (!this.#socket) return 0;
return this.#socket.writableHighWaterMark - this.#socket.writableLength;
get isSocketExists(): boolean {
return !!this.#socket;
}
constructor(initiator?: RedisSocketInitiator, options?: RedisSocketOptions) {
@@ -214,12 +193,12 @@ export default class RedisSocket extends EventEmitter {
.catch(err => this.emit('error', err));
}
write(encodedCommands: string): boolean {
write(toWrite: string | Buffer): boolean {
if (!this.#socket) {
throw new ClientClosedError();
}
return this.#socket.write(encodedCommands);
return this.#socket.write(toWrite);
}
async disconnect(ignoreIsOpen = false): Promise<void> {
@@ -251,4 +230,22 @@ export default class RedisSocket extends EventEmitter {
throw err;
}
}
#isCorked = false;
cork(): void {
if (!this.#socket) {
return;
}
if (!this.#isCorked) {
this.#socket.cork();
this.#isCorked = true;
queueMicrotask(() => {
this.#socket?.uncork();
this.#isCorked = false;
});
}
}
}

View File

@@ -1,7 +1,5 @@
import { strict as assert } from 'assert';
import RedisClient, { RedisClientType } from './client';
import { RedisModules } from './commands';
import { RedisLuaScripts } from './lua-script';
import RedisClient, { RedisClientOptions, RedisClientType } from './client';
import { execSync, spawn } from 'child_process';
import { once } from 'events';
import { RedisSocketOptions } from './socket';
@@ -11,6 +9,8 @@ import RedisCluster, { RedisClusterType } from './cluster';
import { promises as fs } from 'fs';
import { Context as MochaContext } from 'mocha';
import { promiseTimeout } from './utils';
import { RedisModules } from './commands';
import { RedisLuaScripts } from './lua-script';
type RedisVersion = [major: number, minor: number, patch: number];
@@ -54,7 +54,7 @@ export enum TestRedisServers {
PASSWORD
}
export const TEST_REDIS_SERVERS: Record<TestRedisServers, RedisSocketOptions> = <any>{};
export const TEST_REDIS_SERVERS: Record<TestRedisServers, RedisClientOptions<RedisModules, RedisLuaScripts>> = <any>{};
export enum TestRedisClusters {
OPEN
@@ -112,7 +112,7 @@ async function spawnGlobalRedisServer(args?: Array<string>): Promise<number> {
const SLOTS = 16384;
interface SpawnRedisClusterNodeResult extends SpawnRedisServerResult {
client: RedisClientType<RedisModules, RedisLuaScripts>
client: RedisClientType
}
async function spawnRedisClusterNode(
@@ -228,13 +228,17 @@ export async function spawnGlobalRedisCluster(type: TestRedisClusters | null, nu
async function spawnOpenServer(): Promise<void> {
TEST_REDIS_SERVERS[TestRedisServers.OPEN] = {
socket: {
port: await spawnGlobalRedisServer()
}
};
}
async function spawnPasswordServer(): Promise<void> {
TEST_REDIS_SERVERS[TestRedisServers.PASSWORD] = {
socket: {
port: await spawnGlobalRedisServer(['--requirepass', 'password']),
},
password: 'password'
};
@@ -281,15 +285,13 @@ export function describeHandleMinimumRedisVersion(minimumVersion: PartialRedisVe
export function itWithClient(
type: TestRedisServers,
title: string,
fn: (client: RedisClientType<RedisModules, RedisLuaScripts>) => Promise<void>,
fn: (client: RedisClientType) => Promise<void>,
options?: RedisTestOptions
): void {
it(title, async function () {
if (handleMinimumRedisVersion(this, options?.minimumRedisVersion)) return;
const client = RedisClient.create({
socket: TEST_REDIS_SERVERS[type]
});
const client = RedisClient.create(TEST_REDIS_SERVERS[type]);
await client.connect();
@@ -306,7 +308,7 @@ export function itWithClient(
export function itWithCluster(
type: TestRedisClusters,
title: string,
fn: (cluster: RedisClusterType<RedisModules, RedisLuaScripts>) => Promise<void>,
fn: (cluster: RedisClusterType) => Promise<void>,
options?: RedisTestOptions
): void {
it(title, async function () {
@@ -328,7 +330,7 @@ export function itWithCluster(
});
}
export function itWithDedicatedCluster(title: string, fn: (cluster: RedisClusterType<RedisModules, RedisLuaScripts>) => Promise<void>): void {
export function itWithDedicatedCluster(title: string, fn: (cluster: RedisClusterType) => Promise<void>): void {
it(title, async function () {
this.timeout(10000);

View File

@@ -1,3 +1,3 @@
declare module 'cluster-key-slot' {
export default function calculateSlot(key: string): number;
export default function calculateSlot(key: string | Buffer): number;
}

View File

@@ -8,6 +8,8 @@ declare module 'redis-parser' {
export default class RedisParser {
constructor(callbacks: RedisParserCallbacks);
setReturnBuffers(returnBuffers?: boolean): void;
execute(buffer: Buffer): void;
}
}

880
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
{
"name": "redis",
"version": "4.0.0-rc.1",
"version": "4.0.0-rc.2",
"description": "A high performance Redis client.",
"keywords": [
"database",
@@ -35,20 +35,20 @@
"devDependencies": {
"@istanbuljs/nyc-config-typescript": "^1.0.1",
"@types/mocha": "^9.0.0",
"@types/node": "^16.7.10",
"@types/sinon": "^10.0.2",
"@types/node": "^16.9.6",
"@types/sinon": "^10.0.3",
"@types/which": "^2.0.1",
"@types/yallist": "^4.0.1",
"mocha": "^9.1.1",
"nyc": "^15.1.0",
"release-it": "^14.11.5",
"release-it": "^14.11.6",
"sinon": "^11.1.2",
"source-map-support": "^0.5.19",
"source-map-support": "^0.5.20",
"ts-node": "^10.2.1",
"typedoc": "^0.21.9",
"typedoc": "0.21.9",
"typedoc-github-wiki-theme": "^0.5.1",
"typedoc-plugin-markdown": "^3.10.4",
"typescript": "^4.4.2",
"typedoc-plugin-markdown": "3.10.4",
"typescript": "^4.4.3",
"which": "^2.0.2"
},
"engines": {