diff --git a/.github/README.md b/.github/README.md new file mode 100644 index 0000000000..c704680510 --- /dev/null +++ b/.github/README.md @@ -0,0 +1,298 @@ +# Node-Redis + +[![Tests](https://img.shields.io/github/workflow/status/redis/node-redis/Tests/master.svg?label=tests)](https://codecov.io/gh/redis/node-redis) +[![Coverage](https://codecov.io/gh/redis/node-redis/branch/master/graph/badge.svg?token=xcfqHhJC37)](https://codecov.io/gh/redis/node-redis) +[![License](https://img.shields.io/github/license/redis/node-redis.svg)](https://codecov.io/gh/redis/node-redis) +[![Chat](https://img.shields.io/discord/697882427875393627.svg)](https://discord.gg/XMMVgxUm) + +## Installation + +```bash +npm install redis@next +``` + +> :warning: The new interface is clean and cool, but if you have an existing code base, you'll want to read the [migration guide](../docs/v3-to-v4.md). + +## Usage + +### Basic Example + +```typescript +import { createClient } from 'redis'; + +(async () => { + const client = createClient(); + + client.on('error', (err) => console.log('Redis Client Error', err)); + + await client.connect(); + + await client.set('key', 'value'); + const value = await client.get('key'); +})(); +``` + +The above code connects to localhost on port 6379. To connect to a different host or port, use a connection string in the format `redis[s]://[[username][:password]@][host][:port][/db-number]`: + +```typescript +createClient({ + url: 'redis://alice:foobared@awesome.redis.server:6380' +}); +``` + +You can also use discrete parameters, UNIX sockets, and even TLS to connect. Details can be found in the [client configuration guide](../docs/client-configuration.md). + +### Redis Commands + +There is built-in support for all of the [out-of-the-box Redis commands](https://redis.io/commands). They are exposed using the raw Redis command names (`HSET`, `HGETALL`, etc.) and a friendlier camel-cased version (`hSet`, `hGetAll`, etc.): + +```typescript +// raw Redis commands +await client.HSET('key', 'field', 'value'); +await client.HGETALL('key'); + +// friendly JavaScript commands +await client.hSet('key', 'field', 'value'); +await client.hGetAll('key'); +``` + +Modifiers to commands are specified using a JavaScript object: + +```typescript +await client.set('key', 'value', { + EX: 10, + NX: true +}); +``` + +Replies will be transformed into useful data structures: + +```typescript +await client.hGetAll('key'); // { field1: 'value1', field2: 'value2' } +await client.hVals('key'); // ['value1', 'value2'] +``` + +### Unsupported Redis Commands + +If you want to run commands and/or use arguments that Node Redis doesn't know about (yet!) use `.sendCommand()`: + +```typescript +await client.sendCommand(['SET', 'key', 'value', 'NX']); // 'OK' + +await client.sendCommand(['HGETALL', 'key']); // ['key1', 'field1', 'key2', 'field2'] +``` + +### Transactions (Multi/Exec) + +Start a [transaction](https://redis.io/topics/transactions) by calling `.multi()`, then chaining your commands. When you're done, call `.exec()` and you'll get an array back with your results: + +```typescript +await client.set('another-key', 'another-value'); + +const [setKeyReply, otherKeyValue] = await client + .multi() + .set('key', 'value') + .get('another-key') + .exec(); // ['OK', 'another-value'] +``` + +You can also [watch](https://redis.io/topics/transactions#optimistic-locking-using-check-and-set) keys by calling `.watch()`. Your transaction will abort if any of the watched keys change. + +To dig deeper into transactions, check out the [Isolated Execution Guide](../docs/isolated-execution.md). + +### Blocking Commands + +Any command can be run on a new connection by specifying the `isolated` option. The newly created connection is closed when the command's `Promise` is fulfilled. + +This pattern works especially well for blocking commands—such as `BLPOP` and `BLMOVE`: + +```typescript +import { commandOptions } from 'redis'; + +const blPopPromise = client.blPop(commandOptions({ isolated: true }), 'key', 0); + +await client.lPush('key', ['1', '2']); + +await blPopPromise; // '2' +``` + +To learn more about isolated execution, check out the [guide](../docs/isolated-execution.md). + +### Pub/Sub + +Subscribing to a channel requires a dedicated stand-alone connection. You can easily get one by `.duplicate()`ing an existing Redis connection. + +```typescript +const subscriber = client.duplicate(); + +await subscriber.connect(); +``` + +Once you have one, simply subscribe and unsubscribe as needed: + +```typescript +await subscriber.subscribe('channel', (message) => { + console.log(message); // 'message' +}); + +await subscriber.pSubscribe('channe*', (message, channel) => { + console.log(message, channel); // 'message', 'channel' +}); + +await subscriber.unsubscribe('channel'); + +await subscriber.pUnsubscribe('channe*'); +``` + +Publish a message on a channel: + +```typescript +await publisher.publish('channel', 'message'); +``` + +### Scan Iterator + +[`SCAN`](https://redis.io/commands/scan) results can be looped over using [async iterators](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Symbol/asyncIterator): + +```typescript +for await (const key of client.scanIterator()) { + // use the key! + await client.get(key); +} +``` + +This works with `HSCAN`, `SSCAN`, and `ZSCAN` too: + +```typescript +for await (const { field, value } of client.hScanIterator('hash')) {} +for await (const member of client.sScanIterator('set')) {} +for await (const { score, member } of client.zScanIterator('sorted-set')) {} +``` + +You can override the default options by providing a configuration object: + +```typescript +client.scanIterator({ + TYPE: 'string', // `SCAN` only + MATCH: 'patter*', + COUNT: 100 +}); +``` + +### Lua Scripts + +Define new functions using [Lua scripts](https://redis.io/commands/eval) which execute on the Redis server: + +```typescript +import { createClient, defineScript } from 'redis'; + +(async () => { + const client = createClient({ + scripts: { + add: defineScript({ + NUMBER_OF_KEYS: 1, + SCRIPT: + 'local val = redis.pcall("GET", KEYS[1]);' + + 'return val + ARGV[1];', + transformArguments(key: string, toAdd: number): Array { + return [key, toAdd.toString()]; + }, + transformReply(reply: number): number { + return reply; + } + }) + } + }); + + await client.connect(); + + await client.set('key', '1'); + await client.add('key', 2); // 3 +})(); +``` + +### Disconnecting + +There are two functions that disconnect a client from the Redis server. In most scenarios you should use `.quit()` to ensure that pending commands are sent to Redis before closing a connection. + +#### `.QUIT()`/`.quit()` + +Gracefully close a client's connection to Redis, by sending the [`QUIT`](https://redis.io/commands/quit) command to the server. Before quitting, the client executes any remaining commands in its queue, and will receive replies from Redis for each of them. + +```typescript +const [ping, get, quit] = await Promise.all([ + client.ping(), + client.get('key'), + client.quit() +]); // ['PONG', null, 'OK'] + +try { + await client.get('key'); +} catch (err) { + // ClosedClient Error +} +``` + +#### `.disconnect()` + +Forcibly close a client's connection to Redis immediately. Calling `disconnect` will not send further pending commands to the Redis server, or wait for or parse outstanding responses. + +```typescript +await client.disconnect(); +``` + +### Auto-Pipelining + +Node Redis will automatically pipeline requests that are made during the same "tick". + +```typescript +client.set('Tm9kZSBSZWRpcw==', 'users:1'); +client.sAdd('users:1:tokens', 'Tm9kZSBSZWRpcw=='); +``` + +Of course, if you don't do something with your Promises you're certain to get [unhandled Promise exceptions](https://nodejs.org/api/process.html#process_event_unhandledrejection). To take advantage of auto-pipelining and handle your Promises, use `Promise.all()`. + +```typescript +await Promise.all([ + client.set('Tm9kZSBSZWRpcw==', 'users:1'), + client.sAdd('users:1:tokens', 'Tm9kZSBSZWRpcw==') +]); +``` + +### Clustering + +Check out the [Clustering Guide](../docs/clustering.md) when using Node Redis to connect to a Redis Cluster. + +## Supported Redis versions + +Node Redis is supported with the following versions of Redis: + +| Version | Supported | +|---------|--------------------| +| 6.2.z | :heavy_check_mark: | +| 6.0.z | :heavy_check_mark: | +| 5.y.z | :heavy_check_mark: | +| < 5.0 | :x: | + +> Node Redis should work with older versions of Redis, but it is not fully tested and we cannot offer support. + +## Packages + +| Name | Description | +|-------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| [redis](../) | [![Downloads](https://img.shields.io/npm/dm/redis.svg)](https://www.npmjs.com/package/redis/v/next) [![Version](https://img.shields.io/npm/v/redis/next.svg)](https://www.npmjs.com/package/redis/v/next) | +| [@node-redis/client](../packages/client) | [![Downloads](https://img.shields.io/npm/dm/@node-redis/client.svg)](https://www.npmjs.com/package/@node-redis/client/v/next) [![Version](https://img.shields.io/npm/v/@node-redis/client/next.svg)](https://www.npmjs.com/package/@node-redis/client/v/next) | +| [@node-redis/json](../packages/json) | [![Downloads](https://img.shields.io/npm/dm/@node-redis/json.svg)](https://www.npmjs.com/package/@node-redis/json/v/next) [![Version](https://img.shields.io/npm/v/@node-redis/json/next.svg)](https://www.npmjs.com/package/@node-redis/json/v/next) [Redis JSON](https://oss.redis.com/redisjson/) commands | +| [@node-redis/search](../packages/search) | [![Downloads](https://img.shields.io/npm/dm/@node-redis/search.svg)](https://www.npmjs.com/package/@node-redis/search/v/next) [![Version](https://img.shields.io/npm/v/@node-redis/search/next.svg)](https://www.npmjs.com/package/@node-redis/search/v/next) [Redis Search](https://oss.redis.com/redisearch/) commands | + +## Contributing + +If you'd like to contribute, check out the [contributing guide](CONTRIBUTING.md). + +Thank you to all the people who already contributed to Node Redis! + +[![Contributors](https://contrib.rocks/image?repo=redis/node-redis)](https://github.com/redis/node-redis/graphs/contributors) + +## License + +This repository is licensed under the "MIT" license. See [LICENSE](LICENSE). diff --git a/examples/search+json.js b/examples/search+json.js new file mode 100644 index 0000000000..adc298289c --- /dev/null +++ b/examples/search+json.js @@ -0,0 +1,74 @@ +// Use Redis Search and Redis JSON + +import { createClient, SchemaFieldTypes, AggregateGroupByReducers, AggregateSteps } from 'redis'; + +async function searchPlusJson() { + const client = createClient(); + + await client.connect(); + + // Create an index + await client.ft.create('users', { + '$.name': { + type: SchemaFieldTypes.TEXT, + SORTABLE: 'UNF' + }, + '$.age': SchemaFieldTypes.NUMERIC, + '$.coins': SchemaFieldTypes.NUMERIC + }, { + ON: 'JSON' + }); + + // Add some users + await Promise.all([ + client.json.set('users:1', '$', { + name: 'Alice', + age: 32, + coins: 100 + }), + client.json.set('users:2', '$', { + name: 'Bob', + age: 23, + coins: 15 + }) + ]); + + // Search all users under 30 + // TODO: why "$.age:[-inf, 30]" does not work? + console.log( + await client.ft.search('users', '*') + ); + // { + // total: 1, + // documents: [...] + // } + + // Some aggrigrations + console.log( + await client.ft.aggregate('users', '*', { + STEPS: [{ + type: AggregateSteps.GROUPBY, + REDUCE: [{ + type: AggregateGroupByReducers.AVG, + property: '$.age', + AS: 'avarageAge' + }, { + type: AggregateGroupByReducers.SUM, + property: '$.coins', + AS: 'totalCoins' + }] + }] + }) + ); + // { + // total: 2, + // results: [{ + // avarageAvg: '27.5', + // totalCoins: '115' + // }] + // } + + await client.quit(); +} + +searchPlusJson(); diff --git a/package-lock.json b/package-lock.json index c6dec588c5..570bb53cba 100644 --- a/package-lock.json +++ b/package-lock.json @@ -2836,6 +2836,9 @@ "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.20.2.tgz", "integrity": "sha512-Ne+eE4r0/iWnpAxD852z3A+N0Bt5RN//NjJwRd2VFHEmrywxf5vsZlh4R6lixl6B+wz/8d+maTSAkN1FIkI3LQ==", "dev": true, + "dependencies": { + "type-fest": "^0.20.2" + }, "engines": { "node": ">=10" }, @@ -7497,6 +7500,31 @@ "eslint-visitor-keys": "^3.0.0" } }, + "@typescript-eslint/typescript-estree": { + "version": "5.4.0", + "resolved": "https://registry.npmjs.org/@typescript-eslint/typescript-estree/-/typescript-estree-5.4.0.tgz", + "integrity": "sha512-nhlNoBdhKuwiLMx6GrybPT3SFILm5Gij2YBdPEPFlYNFAXUJWX6QRgvi/lwVoadaQEFsizohs6aFRMqsXI2ewA==", + "dev": true, + "requires": { + "@typescript-eslint/types": "5.4.0", + "@typescript-eslint/visitor-keys": "5.4.0", + "debug": "^4.3.2", + "globby": "^11.0.4", + "is-glob": "^4.0.3", + "semver": "^7.3.5", + "tsutils": "^3.21.0" + } + }, + "@typescript-eslint/visitor-keys": { + "version": "5.4.0", + "resolved": "https://registry.npmjs.org/@typescript-eslint/visitor-keys/-/visitor-keys-5.4.0.tgz", + "integrity": "sha512-PVbax7MeE7tdLfW5SA0fs8NGVVr+buMPrcj+CWYWPXsZCH8qZ1THufDzbXm1xrZ2b2PA1iENJ0sRq5fuUtvsJg==", + "dev": true, + "requires": { + "@typescript-eslint/types": "5.4.0", + "eslint-visitor-keys": "^3.0.0" + } + }, "@ungap/promise-all-settled": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/@ungap/promise-all-settled/-/promise-all-settled-1.1.2.tgz", diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 908c58d23a..4fcae1e8b6 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -39,29 +39,9 @@ export enum PubSubUnsubscribeCommands { PUNSUBSCRIBE = 'PUNSUBSCRIBE' } -type PubSubArgumentTypes = Buffer | string; +export type PubSubListener = (message: string, channel: string) => unknown; -export type PubSubListener< - BUFFER_MODE extends boolean = false, - T = BUFFER_MODE extends true ? Buffer : string -> = (message: T, channel: T) => unknown; - -interface PubSubListeners { - buffers: Set>; - strings: Set>; -} - -type PubSubListenersMap = Map; - -interface PubSubState { - subscribing: number; - subscribed: number; - unsubscribing: number; - listeners: { - channels: PubSubListenersMap; - patterns: PubSubListenersMap; - }; -} +export type PubSubListenersMap = Map>; export default class RedisCommandsQueue { static #flushQueue(queue: LinkedList, err: Error): void { @@ -70,20 +50,10 @@ export default class RedisCommandsQueue { } } - static #emitPubSubMessage(listenersMap: PubSubListenersMap, message: Buffer, channel: Buffer, pattern?: Buffer): void { - const keyString = (pattern || channel).toString(), - listeners = listenersMap.get(keyString)!; - for (const listener of listeners.buffers) { + static #emitPubSubMessage(listeners: Set, message: string, channel: string): void { + for (const listener of listeners) { listener(message, channel); } - - if (!listeners.strings.size) return; - - const messageString = message.toString(), - channelString = pattern ? channel.toString() : keyString; - for (const listener of listeners.strings) { - listener(messageString, channelString); - } } readonly #maxLength: number | null | undefined; @@ -92,43 +62,41 @@ export default class RedisCommandsQueue { readonly #waitingForReply = new LinkedList(); - #pubSubState: PubSubState | undefined; + readonly #pubSubState = { + subscribing: 0, + subscribed: 0, + unsubscribing: 0 + }; - static readonly #PUB_SUB_MESSAGES = { - message: Buffer.from('message'), - pMessage: Buffer.from('pmessage'), - subscribe: Buffer.from('subscribe'), - pSubscribe: Buffer.from('psubscribe'), - unsubscribe: Buffer.from('unsunscribe'), - pUnsubscribe: Buffer.from('punsubscribe') + readonly #pubSubListeners = { + channels: new Map(), + patterns: new Map() }; readonly #parser = new RedisParser({ returnReply: (reply: unknown) => { - if (this.#pubSubState && Array.isArray(reply)) { - if (RedisCommandsQueue.#PUB_SUB_MESSAGES.message.equals(reply[0])) { - return RedisCommandsQueue.#emitPubSubMessage( - this.#pubSubState.listeners.channels, - reply[2], - reply[1] - ); - } else if (RedisCommandsQueue.#PUB_SUB_MESSAGES.pMessage.equals(reply[0])) { - return RedisCommandsQueue.#emitPubSubMessage( - this.#pubSubState.listeners.patterns, - reply[3], - reply[2], - reply[1] - ); - } else if ( - RedisCommandsQueue.#PUB_SUB_MESSAGES.subscribe.equals(reply[0]) || - RedisCommandsQueue.#PUB_SUB_MESSAGES.pSubscribe.equals(reply[0]) || - RedisCommandsQueue.#PUB_SUB_MESSAGES.unsubscribe.equals(reply[0]) || - RedisCommandsQueue.#PUB_SUB_MESSAGES.pUnsubscribe.equals(reply[0]) - ) { - if (--this.#waitingForReply.head!.value.channelsCounter! === 0) { - this.#shiftWaitingForReply().resolve(); - } - return; + if ((this.#pubSubState.subscribing || this.#pubSubState.subscribed) && Array.isArray(reply)) { + switch (reply[0]) { + case 'message': + return RedisCommandsQueue.#emitPubSubMessage( + this.#pubSubListeners.channels.get(reply[1])!, + reply[2], + reply[1] + ); + + case 'pmessage': + return RedisCommandsQueue.#emitPubSubMessage( + this.#pubSubListeners.patterns.get(reply[1])!, + reply[3], + reply[2] + ); + + case 'subscribe': + case 'psubscribe': + if (--this.#waitingForReply.head!.value.channelsCounter! === 0) { + this.#shiftWaitingForReply().resolve(); + } + return; } } @@ -144,7 +112,7 @@ export default class RedisCommandsQueue { } addCommand(args: RedisCommandArguments, options?: QueueCommandOptions, bufferMode?: boolean): Promise { - if (this.#pubSubState) { + if (this.#pubSubState.subscribing || this.#pubSubState.subscribed) { return Promise.reject(new Error('Cannot send commands in PubSub mode')); } else if (this.#maxLength && this.#waitingToBeSent.length + this.#waitingForReply.length >= this.#maxLength) { return Promise.reject(new Error('The queue is full')); @@ -158,7 +126,7 @@ export default class RedisCommandsQueue { chainId: options?.chainId, bufferMode, resolve, - reject + reject, }); if (options?.signal) { @@ -185,41 +153,17 @@ export default class RedisCommandsQueue { }); } - #initiatePubSubState(): PubSubState { - return this.#pubSubState ??= { - subscribed: 0, - subscribing: 0, - unsubscribing: 0, - listeners: { - channels: new Map(), - patterns: new Map() - } - }; - } - - subscribe( - command: PubSubSubscribeCommands, - channels: PubSubArgumentTypes | Array, - listener: PubSubListener, - bufferMode?: T - ): Promise { - const pubSubState = this.#initiatePubSubState(), - channelsToSubscribe: Array = [], - listenersMap = command === PubSubSubscribeCommands.SUBSCRIBE ? pubSubState.listeners.channels : pubSubState.listeners.patterns; + subscribe(command: PubSubSubscribeCommands, channels: string | Array, listener: PubSubListener): Promise { + const channelsToSubscribe: Array = [], + listeners = command === PubSubSubscribeCommands.SUBSCRIBE ? this.#pubSubListeners.channels : this.#pubSubListeners.patterns; for (const channel of (Array.isArray(channels) ? channels : [channels])) { - const channelString = typeof channel === 'string' ? channel : channel.toString(); - let listeners = listenersMap.get(channelString); - if (!listeners) { - listeners = { - buffers: new Set(), - strings: new Set() - }; - listenersMap.set(channelString, listeners); - channelsToSubscribe.push(channel); + if (listeners.has(channel)) { + listeners.get(channel)!.add(listener); + continue; } - // https://github.com/microsoft/TypeScript/issues/23132 - (bufferMode ? listeners.buffers : listeners.strings).add(listener as any); + listeners.set(channel, new Set([listener])); + channelsToSubscribe.push(channel); } if (!channelsToSubscribe.length) { @@ -229,20 +173,8 @@ export default class RedisCommandsQueue { return this.#pushPubSubCommand(command, channelsToSubscribe); } - unsubscribe( - command: PubSubUnsubscribeCommands, - channels?: string | Array, - listener?: PubSubListener, - bufferMode?: T - ): Promise { - if (!this.#pubSubState) { - return Promise.resolve(); - } - - const listeners = command === PubSubUnsubscribeCommands.UNSUBSCRIBE ? - this.#pubSubState.listeners.channels : - this.#pubSubState.listeners.patterns; - + unsubscribe(command: PubSubUnsubscribeCommands, channels?: string | Array, listener?: PubSubListener): Promise { + const listeners = command === PubSubUnsubscribeCommands.UNSUBSCRIBE ? this.#pubSubListeners.channels : this.#pubSubListeners.patterns; if (!channels) { const size = listeners.size; listeners.clear(); @@ -251,16 +183,13 @@ export default class RedisCommandsQueue { const channelsToUnsubscribe = []; for (const channel of (Array.isArray(channels) ? channels : [channels])) { - const sets = listeners.get(channel); - if (!sets) continue; + const set = listeners.get(channel); + if (!set) continue; - let shouldUnsubscribe; + let shouldUnsubscribe = !listener; if (listener) { - // https://github.com/microsoft/TypeScript/issues/23132 - (bufferMode ? sets.buffers : sets.strings).delete(listener as any); - shouldUnsubscribe = !sets.buffers.size && !sets.strings.size; - } else { - shouldUnsubscribe = true; + set.delete(listener); + shouldUnsubscribe = set.size === 0; } if (shouldUnsubscribe) { @@ -276,12 +205,11 @@ export default class RedisCommandsQueue { return this.#pushPubSubCommand(command, channelsToUnsubscribe); } - #pushPubSubCommand(command: PubSubSubscribeCommands | PubSubUnsubscribeCommands, channels: number | Array): Promise { + #pushPubSubCommand(command: PubSubSubscribeCommands | PubSubUnsubscribeCommands, channels: number | Array): Promise { return new Promise((resolve, reject) => { - const pubSubState = this.#initiatePubSubState(), - isSubscribe = command === PubSubSubscribeCommands.SUBSCRIBE || command === PubSubSubscribeCommands.PSUBSCRIBE, + const isSubscribe = command === PubSubSubscribeCommands.SUBSCRIBE || command === PubSubSubscribeCommands.PSUBSCRIBE, inProgressKey = isSubscribe ? 'subscribing' : 'unsubscribing', - commandArgs: Array = [command]; + commandArgs: Array = [command]; let channelsCounter: number; if (typeof channels === 'number') { // unsubscribe only @@ -291,26 +219,18 @@ export default class RedisCommandsQueue { channelsCounter = channels.length; } - pubSubState[inProgressKey] += channelsCounter; + this.#pubSubState[inProgressKey] += channelsCounter; this.#waitingToBeSent.push({ args: commandArgs, channelsCounter, - bufferMode: true, resolve: () => { - pubSubState[inProgressKey] -= channelsCounter; - if (isSubscribe) { - pubSubState.subscribed += channelsCounter; - } else { - pubSubState.subscribed -= channelsCounter; - if (!pubSubState.subscribed && !pubSubState.subscribing && !pubSubState.subscribed) { - this.#pubSubState = undefined; - } - } + this.#pubSubState[inProgressKey] -= channelsCounter; + this.#pubSubState.subscribed += channelsCounter * (isSubscribe ? 1 : -1); resolve(); }, reject: () => { - pubSubState[inProgressKey] -= channelsCounter * (isSubscribe ? 1 : -1); + this.#pubSubState[inProgressKey] -= channelsCounter; reject(); } }); @@ -318,14 +238,16 @@ export default class RedisCommandsQueue { } resubscribe(): Promise | undefined { - if (!this.#pubSubState) { + if (!this.#pubSubState.subscribed && !this.#pubSubState.subscribing) { return; } + this.#pubSubState.subscribed = this.#pubSubState.subscribing = 0; + // TODO: acl error on one channel/pattern will reject the whole command return Promise.all([ - this.#pushPubSubCommand(PubSubSubscribeCommands.SUBSCRIBE, [...this.#pubSubState.listeners.channels.keys()]), - this.#pushPubSubCommand(PubSubSubscribeCommands.PSUBSCRIBE, [...this.#pubSubState.listeners.patterns.keys()]) + this.#pushPubSubCommand(PubSubSubscribeCommands.SUBSCRIBE, [...this.#pubSubListeners.channels.keys()]), + this.#pushPubSubCommand(PubSubSubscribeCommands.PSUBSCRIBE, [...this.#pubSubListeners.patterns.keys()]) ]); } @@ -347,10 +269,7 @@ export default class RedisCommandsQueue { } parseResponse(data: Buffer): void { - this.#parser.setReturnBuffers( - !!this.#waitingForReply.head?.value.bufferMode || - !!this.#pubSubState?.subscribed - ); + this.#parser.setReturnBuffers(!!this.#waitingForReply.head?.value.bufferMode); this.#parser.execute(data); } diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index 41e7526eb2..3f0bca45e2 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -561,27 +561,17 @@ describe('Client', () => { }, GLOBAL.SERVERS.OPEN); testUtils.testWithClient('PubSub', async publisher => { - function assertStringListener(message: string, channel: string) { - assert.ok(typeof message === 'string'); - assert.ok(typeof channel === 'string'); - } - - function assertBufferListener(message: Buffer, channel: Buffer) { - assert.ok(Buffer.isBuffer(message)); - assert.ok(Buffer.isBuffer(channel)); - } - const subscriber = publisher.duplicate(); await subscriber.connect(); try { - const channelListener1 = spy(assertBufferListener), - channelListener2 = spy(assertStringListener), - patternListener = spy(assertStringListener); + const channelListener1 = spy(), + channelListener2 = spy(), + patternListener = spy(); await Promise.all([ - subscriber.subscribe('channel', channelListener1, true), + subscriber.subscribe('channel', channelListener1), subscriber.subscribe('channel', channelListener2), subscriber.pSubscribe('channel*', patternListener) ]); @@ -590,14 +580,14 @@ describe('Client', () => { waitTillBeenCalled(channelListener1), waitTillBeenCalled(channelListener2), waitTillBeenCalled(patternListener), - publisher.publish(Buffer.from('channel'), Buffer.from('message')) + publisher.publish('channel', 'message') ]); - assert.ok(channelListener1.calledOnceWithExactly(Buffer.from('message'), Buffer.from('channel'))); + assert.ok(channelListener1.calledOnceWithExactly('message', 'channel')); assert.ok(channelListener2.calledOnceWithExactly('message', 'channel')); assert.ok(patternListener.calledOnceWithExactly('message', 'channel')); - await subscriber.unsubscribe('channel', channelListener1, true); + await subscriber.unsubscribe('channel', channelListener1); await Promise.all([ waitTillBeenCalled(channelListener2), waitTillBeenCalled(patternListener), diff --git a/packages/client/lib/commands/LINDEX.spec.ts b/packages/client/lib/commands/LINDEX.spec.ts index fcabf2ba65..5e0b1473ec 100644 --- a/packages/client/lib/commands/LINDEX.spec.ts +++ b/packages/client/lib/commands/LINDEX.spec.ts @@ -5,32 +5,21 @@ import { transformArguments } from './LINDEX'; describe('LINDEX', () => { it('transformArguments', () => { assert.deepEqual( - transformArguments('key', 0), - ['LINDEX', 'key', '0'] + transformArguments('key', 'element'), + ['LINDEX', 'key', 'element'] ); }); - describe('client.lIndex', () => { - testUtils.testWithClient('null', async client => { - assert.equal( - await client.lIndex('key', 0), - null - ); - }, GLOBAL.SERVERS.OPEN); - - testUtils.testWithClient('with value', async client => { - const [, lIndexReply] = await Promise.all([ - client.lPush('key', 'element'), - client.lIndex('key', 0) - ]); - - assert.equal(lIndexReply, 'element'); - }, GLOBAL.SERVERS.OPEN); - }); + testUtils.testWithClient('client.lIndex', async client => { + assert.equal( + await client.lIndex('key', 'element'), + null + ); + }, GLOBAL.SERVERS.OPEN); testUtils.testWithCluster('cluster.lIndex', async cluster => { assert.equal( - await cluster.lIndex('key', 0), + await cluster.lIndex('key', 'element'), null ); }, GLOBAL.CLUSTERS.OPEN); diff --git a/packages/client/lib/commands/LINDEX.ts b/packages/client/lib/commands/LINDEX.ts index 6c31cf57cf..4c283f0912 100644 --- a/packages/client/lib/commands/LINDEX.ts +++ b/packages/client/lib/commands/LINDEX.ts @@ -2,8 +2,8 @@ export const FIRST_KEY_INDEX = 1; export const IS_READ_ONLY = true; -export function transformArguments(key: string, index: number): Array { - return ['LINDEX', key, index.toString()]; +export function transformArguments(key: string, element: string): Array { + return ['LINDEX', key, element]; } export declare function transformReply(): string | null; diff --git a/packages/client/lib/commands/PUBLISH.ts b/packages/client/lib/commands/PUBLISH.ts index cbfcaabd1c..eda5234df2 100644 --- a/packages/client/lib/commands/PUBLISH.ts +++ b/packages/client/lib/commands/PUBLISH.ts @@ -1,6 +1,4 @@ -import { RedisCommandArguments } from '.'; - -export function transformArguments(channel: string | Buffer, message: string | Buffer): RedisCommandArguments { +export function transformArguments(channel: string, message: string): Array { return ['PUBLISH', channel, message]; } diff --git a/packages/json/README.md b/packages/json/README.md index 5686f852d0..5b6d5ba8ce 100644 --- a/packages/json/README.md +++ b/packages/json/README.md @@ -77,4 +77,4 @@ And we can add a new object to the pets array with the [`JSON.ARRAPPEND`](https: age: 1, isMammal: false }); -``` \ No newline at end of file +``` diff --git a/packages/json/lib/commands/index.ts b/packages/json/lib/commands/index.ts index a79a5370e4..a3c561addc 100644 --- a/packages/json/lib/commands/index.ts +++ b/packages/json/lib/commands/index.ts @@ -86,7 +86,7 @@ export function transformRedisJsonNullReply(json: string | null): RedisJSON | nu export function transformRedisJsonNullArrayNullReply(jsons: Array | null): Array | null { if (jsons === null) return null; - + return jsons.map(transformRedisJsonNullReply); } diff --git a/packages/search/README.md b/packages/search/README.md index f13ca05eb0..f54316d3c1 100644 --- a/packages/search/README.md +++ b/packages/search/README.md @@ -47,7 +47,7 @@ const results = await client.ft.search('idx:animals', '@species:{dog}'); { total: 2, documents: [ - { + { id: 'noderedis:animals:4', value: { name: 'Fido',