1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-04 15:02:09 +03:00

Merge branch 'v4.0' of github.com:redis/node-redis

This commit is contained in:
leibale
2021-11-24 21:23:35 -05:00
11 changed files with 486 additions and 190 deletions

298
.github/README.md vendored Normal file
View File

@@ -0,0 +1,298 @@
# Node-Redis
[![Tests](https://img.shields.io/github/workflow/status/redis/node-redis/Tests/master.svg?label=tests)](https://codecov.io/gh/redis/node-redis)
[![Coverage](https://codecov.io/gh/redis/node-redis/branch/master/graph/badge.svg?token=xcfqHhJC37)](https://codecov.io/gh/redis/node-redis)
[![License](https://img.shields.io/github/license/redis/node-redis.svg)](https://codecov.io/gh/redis/node-redis)
[![Chat](https://img.shields.io/discord/697882427875393627.svg)](https://discord.gg/XMMVgxUm)
## Installation
```bash
npm install redis@next
```
> :warning: The new interface is clean and cool, but if you have an existing code base, you'll want to read the [migration guide](../docs/v3-to-v4.md).
## Usage
### Basic Example
```typescript
import { createClient } from 'redis';
(async () => {
const client = createClient();
client.on('error', (err) => console.log('Redis Client Error', err));
await client.connect();
await client.set('key', 'value');
const value = await client.get('key');
})();
```
The above code connects to localhost on port 6379. To connect to a different host or port, use a connection string in the format `redis[s]://[[username][:password]@][host][:port][/db-number]`:
```typescript
createClient({
url: 'redis://alice:foobared@awesome.redis.server:6380'
});
```
You can also use discrete parameters, UNIX sockets, and even TLS to connect. Details can be found in the [client configuration guide](../docs/client-configuration.md).
### Redis Commands
There is built-in support for all of the [out-of-the-box Redis commands](https://redis.io/commands). They are exposed using the raw Redis command names (`HSET`, `HGETALL`, etc.) and a friendlier camel-cased version (`hSet`, `hGetAll`, etc.):
```typescript
// raw Redis commands
await client.HSET('key', 'field', 'value');
await client.HGETALL('key');
// friendly JavaScript commands
await client.hSet('key', 'field', 'value');
await client.hGetAll('key');
```
Modifiers to commands are specified using a JavaScript object:
```typescript
await client.set('key', 'value', {
EX: 10,
NX: true
});
```
Replies will be transformed into useful data structures:
```typescript
await client.hGetAll('key'); // { field1: 'value1', field2: 'value2' }
await client.hVals('key'); // ['value1', 'value2']
```
### Unsupported Redis Commands
If you want to run commands and/or use arguments that Node Redis doesn't know about (yet!) use `.sendCommand()`:
```typescript
await client.sendCommand(['SET', 'key', 'value', 'NX']); // 'OK'
await client.sendCommand(['HGETALL', 'key']); // ['key1', 'field1', 'key2', 'field2']
```
### Transactions (Multi/Exec)
Start a [transaction](https://redis.io/topics/transactions) by calling `.multi()`, then chaining your commands. When you're done, call `.exec()` and you'll get an array back with your results:
```typescript
await client.set('another-key', 'another-value');
const [setKeyReply, otherKeyValue] = await client
.multi()
.set('key', 'value')
.get('another-key')
.exec(); // ['OK', 'another-value']
```
You can also [watch](https://redis.io/topics/transactions#optimistic-locking-using-check-and-set) keys by calling `.watch()`. Your transaction will abort if any of the watched keys change.
To dig deeper into transactions, check out the [Isolated Execution Guide](../docs/isolated-execution.md).
### Blocking Commands
Any command can be run on a new connection by specifying the `isolated` option. The newly created connection is closed when the command's `Promise` is fulfilled.
This pattern works especially well for blocking commands—such as `BLPOP` and `BLMOVE`:
```typescript
import { commandOptions } from 'redis';
const blPopPromise = client.blPop(commandOptions({ isolated: true }), 'key', 0);
await client.lPush('key', ['1', '2']);
await blPopPromise; // '2'
```
To learn more about isolated execution, check out the [guide](../docs/isolated-execution.md).
### Pub/Sub
Subscribing to a channel requires a dedicated stand-alone connection. You can easily get one by `.duplicate()`ing an existing Redis connection.
```typescript
const subscriber = client.duplicate();
await subscriber.connect();
```
Once you have one, simply subscribe and unsubscribe as needed:
```typescript
await subscriber.subscribe('channel', (message) => {
console.log(message); // 'message'
});
await subscriber.pSubscribe('channe*', (message, channel) => {
console.log(message, channel); // 'message', 'channel'
});
await subscriber.unsubscribe('channel');
await subscriber.pUnsubscribe('channe*');
```
Publish a message on a channel:
```typescript
await publisher.publish('channel', 'message');
```
### Scan Iterator
[`SCAN`](https://redis.io/commands/scan) results can be looped over using [async iterators](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Symbol/asyncIterator):
```typescript
for await (const key of client.scanIterator()) {
// use the key!
await client.get(key);
}
```
This works with `HSCAN`, `SSCAN`, and `ZSCAN` too:
```typescript
for await (const { field, value } of client.hScanIterator('hash')) {}
for await (const member of client.sScanIterator('set')) {}
for await (const { score, member } of client.zScanIterator('sorted-set')) {}
```
You can override the default options by providing a configuration object:
```typescript
client.scanIterator({
TYPE: 'string', // `SCAN` only
MATCH: 'patter*',
COUNT: 100
});
```
### Lua Scripts
Define new functions using [Lua scripts](https://redis.io/commands/eval) which execute on the Redis server:
```typescript
import { createClient, defineScript } from 'redis';
(async () => {
const client = createClient({
scripts: {
add: defineScript({
NUMBER_OF_KEYS: 1,
SCRIPT:
'local val = redis.pcall("GET", KEYS[1]);' +
'return val + ARGV[1];',
transformArguments(key: string, toAdd: number): Array<string> {
return [key, toAdd.toString()];
},
transformReply(reply: number): number {
return reply;
}
})
}
});
await client.connect();
await client.set('key', '1');
await client.add('key', 2); // 3
})();
```
### Disconnecting
There are two functions that disconnect a client from the Redis server. In most scenarios you should use `.quit()` to ensure that pending commands are sent to Redis before closing a connection.
#### `.QUIT()`/`.quit()`
Gracefully close a client's connection to Redis, by sending the [`QUIT`](https://redis.io/commands/quit) command to the server. Before quitting, the client executes any remaining commands in its queue, and will receive replies from Redis for each of them.
```typescript
const [ping, get, quit] = await Promise.all([
client.ping(),
client.get('key'),
client.quit()
]); // ['PONG', null, 'OK']
try {
await client.get('key');
} catch (err) {
// ClosedClient Error
}
```
#### `.disconnect()`
Forcibly close a client's connection to Redis immediately. Calling `disconnect` will not send further pending commands to the Redis server, or wait for or parse outstanding responses.
```typescript
await client.disconnect();
```
### Auto-Pipelining
Node Redis will automatically pipeline requests that are made during the same "tick".
```typescript
client.set('Tm9kZSBSZWRpcw==', 'users:1');
client.sAdd('users:1:tokens', 'Tm9kZSBSZWRpcw==');
```
Of course, if you don't do something with your Promises you're certain to get [unhandled Promise exceptions](https://nodejs.org/api/process.html#process_event_unhandledrejection). To take advantage of auto-pipelining and handle your Promises, use `Promise.all()`.
```typescript
await Promise.all([
client.set('Tm9kZSBSZWRpcw==', 'users:1'),
client.sAdd('users:1:tokens', 'Tm9kZSBSZWRpcw==')
]);
```
### Clustering
Check out the [Clustering Guide](../docs/clustering.md) when using Node Redis to connect to a Redis Cluster.
## Supported Redis versions
Node Redis is supported with the following versions of Redis:
| Version | Supported |
|---------|--------------------|
| 6.2.z | :heavy_check_mark: |
| 6.0.z | :heavy_check_mark: |
| 5.y.z | :heavy_check_mark: |
| < 5.0 | :x: |
> Node Redis should work with older versions of Redis, but it is not fully tested and we cannot offer support.
## Packages
| Name | Description |
|-------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| [redis](../) | [![Downloads](https://img.shields.io/npm/dm/redis.svg)](https://www.npmjs.com/package/redis/v/next) [![Version](https://img.shields.io/npm/v/redis/next.svg)](https://www.npmjs.com/package/redis/v/next) |
| [@node-redis/client](../packages/client) | [![Downloads](https://img.shields.io/npm/dm/@node-redis/client.svg)](https://www.npmjs.com/package/@node-redis/client/v/next) [![Version](https://img.shields.io/npm/v/@node-redis/client/next.svg)](https://www.npmjs.com/package/@node-redis/client/v/next) |
| [@node-redis/json](../packages/json) | [![Downloads](https://img.shields.io/npm/dm/@node-redis/json.svg)](https://www.npmjs.com/package/@node-redis/json/v/next) [![Version](https://img.shields.io/npm/v/@node-redis/json/next.svg)](https://www.npmjs.com/package/@node-redis/json/v/next) [Redis JSON](https://oss.redis.com/redisjson/) commands |
| [@node-redis/search](../packages/search) | [![Downloads](https://img.shields.io/npm/dm/@node-redis/search.svg)](https://www.npmjs.com/package/@node-redis/search/v/next) [![Version](https://img.shields.io/npm/v/@node-redis/search/next.svg)](https://www.npmjs.com/package/@node-redis/search/v/next) [Redis Search](https://oss.redis.com/redisearch/) commands |
## Contributing
If you'd like to contribute, check out the [contributing guide](CONTRIBUTING.md).
Thank you to all the people who already contributed to Node Redis!
[![Contributors](https://contrib.rocks/image?repo=redis/node-redis)](https://github.com/redis/node-redis/graphs/contributors)
## License
This repository is licensed under the "MIT" license. See [LICENSE](LICENSE).

74
examples/search+json.js Normal file
View File

@@ -0,0 +1,74 @@
// Use Redis Search and Redis JSON
import { createClient, SchemaFieldTypes, AggregateGroupByReducers, AggregateSteps } from 'redis';
async function searchPlusJson() {
const client = createClient();
await client.connect();
// Create an index
await client.ft.create('users', {
'$.name': {
type: SchemaFieldTypes.TEXT,
SORTABLE: 'UNF'
},
'$.age': SchemaFieldTypes.NUMERIC,
'$.coins': SchemaFieldTypes.NUMERIC
}, {
ON: 'JSON'
});
// Add some users
await Promise.all([
client.json.set('users:1', '$', {
name: 'Alice',
age: 32,
coins: 100
}),
client.json.set('users:2', '$', {
name: 'Bob',
age: 23,
coins: 15
})
]);
// Search all users under 30
// TODO: why "$.age:[-inf, 30]" does not work?
console.log(
await client.ft.search('users', '*')
);
// {
// total: 1,
// documents: [...]
// }
// Some aggrigrations
console.log(
await client.ft.aggregate('users', '*', {
STEPS: [{
type: AggregateSteps.GROUPBY,
REDUCE: [{
type: AggregateGroupByReducers.AVG,
property: '$.age',
AS: 'avarageAge'
}, {
type: AggregateGroupByReducers.SUM,
property: '$.coins',
AS: 'totalCoins'
}]
}]
})
);
// {
// total: 2,
// results: [{
// avarageAvg: '27.5',
// totalCoins: '115'
// }]
// }
await client.quit();
}
searchPlusJson();

28
package-lock.json generated
View File

@@ -2836,6 +2836,9 @@
"resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.20.2.tgz",
"integrity": "sha512-Ne+eE4r0/iWnpAxD852z3A+N0Bt5RN//NjJwRd2VFHEmrywxf5vsZlh4R6lixl6B+wz/8d+maTSAkN1FIkI3LQ==",
"dev": true,
"dependencies": {
"type-fest": "^0.20.2"
},
"engines": {
"node": ">=10"
},
@@ -7497,6 +7500,31 @@
"eslint-visitor-keys": "^3.0.0"
}
},
"@typescript-eslint/typescript-estree": {
"version": "5.4.0",
"resolved": "https://registry.npmjs.org/@typescript-eslint/typescript-estree/-/typescript-estree-5.4.0.tgz",
"integrity": "sha512-nhlNoBdhKuwiLMx6GrybPT3SFILm5Gij2YBdPEPFlYNFAXUJWX6QRgvi/lwVoadaQEFsizohs6aFRMqsXI2ewA==",
"dev": true,
"requires": {
"@typescript-eslint/types": "5.4.0",
"@typescript-eslint/visitor-keys": "5.4.0",
"debug": "^4.3.2",
"globby": "^11.0.4",
"is-glob": "^4.0.3",
"semver": "^7.3.5",
"tsutils": "^3.21.0"
}
},
"@typescript-eslint/visitor-keys": {
"version": "5.4.0",
"resolved": "https://registry.npmjs.org/@typescript-eslint/visitor-keys/-/visitor-keys-5.4.0.tgz",
"integrity": "sha512-PVbax7MeE7tdLfW5SA0fs8NGVVr+buMPrcj+CWYWPXsZCH8qZ1THufDzbXm1xrZ2b2PA1iENJ0sRq5fuUtvsJg==",
"dev": true,
"requires": {
"@typescript-eslint/types": "5.4.0",
"eslint-visitor-keys": "^3.0.0"
}
},
"@ungap/promise-all-settled": {
"version": "1.1.2",
"resolved": "https://registry.npmjs.org/@ungap/promise-all-settled/-/promise-all-settled-1.1.2.tgz",

View File

@@ -39,29 +39,9 @@ export enum PubSubUnsubscribeCommands {
PUNSUBSCRIBE = 'PUNSUBSCRIBE'
}
type PubSubArgumentTypes = Buffer | string;
export type PubSubListener = (message: string, channel: string) => unknown;
export type PubSubListener<
BUFFER_MODE extends boolean = false,
T = BUFFER_MODE extends true ? Buffer : string
> = (message: T, channel: T) => unknown;
interface PubSubListeners {
buffers: Set<PubSubListener<true>>;
strings: Set<PubSubListener<false>>;
}
type PubSubListenersMap = Map<string, PubSubListeners>;
interface PubSubState {
subscribing: number;
subscribed: number;
unsubscribing: number;
listeners: {
channels: PubSubListenersMap;
patterns: PubSubListenersMap;
};
}
export type PubSubListenersMap = Map<string, Set<PubSubListener>>;
export default class RedisCommandsQueue {
static #flushQueue<T extends CommandWaitingForReply>(queue: LinkedList<T>, err: Error): void {
@@ -70,20 +50,10 @@ export default class RedisCommandsQueue {
}
}
static #emitPubSubMessage(listenersMap: PubSubListenersMap, message: Buffer, channel: Buffer, pattern?: Buffer): void {
const keyString = (pattern || channel).toString(),
listeners = listenersMap.get(keyString)!;
for (const listener of listeners.buffers) {
static #emitPubSubMessage(listeners: Set<PubSubListener>, message: string, channel: string): void {
for (const listener of listeners) {
listener(message, channel);
}
if (!listeners.strings.size) return;
const messageString = message.toString(),
channelString = pattern ? channel.toString() : keyString;
for (const listener of listeners.strings) {
listener(messageString, channelString);
}
}
readonly #maxLength: number | null | undefined;
@@ -92,43 +62,41 @@ export default class RedisCommandsQueue {
readonly #waitingForReply = new LinkedList<CommandWaitingForReply>();
#pubSubState: PubSubState | undefined;
readonly #pubSubState = {
subscribing: 0,
subscribed: 0,
unsubscribing: 0
};
static readonly #PUB_SUB_MESSAGES = {
message: Buffer.from('message'),
pMessage: Buffer.from('pmessage'),
subscribe: Buffer.from('subscribe'),
pSubscribe: Buffer.from('psubscribe'),
unsubscribe: Buffer.from('unsunscribe'),
pUnsubscribe: Buffer.from('punsubscribe')
readonly #pubSubListeners = {
channels: <PubSubListenersMap>new Map(),
patterns: <PubSubListenersMap>new Map()
};
readonly #parser = new RedisParser({
returnReply: (reply: unknown) => {
if (this.#pubSubState && Array.isArray(reply)) {
if (RedisCommandsQueue.#PUB_SUB_MESSAGES.message.equals(reply[0])) {
return RedisCommandsQueue.#emitPubSubMessage(
this.#pubSubState.listeners.channels,
reply[2],
reply[1]
);
} else if (RedisCommandsQueue.#PUB_SUB_MESSAGES.pMessage.equals(reply[0])) {
return RedisCommandsQueue.#emitPubSubMessage(
this.#pubSubState.listeners.patterns,
reply[3],
reply[2],
reply[1]
);
} else if (
RedisCommandsQueue.#PUB_SUB_MESSAGES.subscribe.equals(reply[0]) ||
RedisCommandsQueue.#PUB_SUB_MESSAGES.pSubscribe.equals(reply[0]) ||
RedisCommandsQueue.#PUB_SUB_MESSAGES.unsubscribe.equals(reply[0]) ||
RedisCommandsQueue.#PUB_SUB_MESSAGES.pUnsubscribe.equals(reply[0])
) {
if (--this.#waitingForReply.head!.value.channelsCounter! === 0) {
this.#shiftWaitingForReply().resolve();
}
return;
if ((this.#pubSubState.subscribing || this.#pubSubState.subscribed) && Array.isArray(reply)) {
switch (reply[0]) {
case 'message':
return RedisCommandsQueue.#emitPubSubMessage(
this.#pubSubListeners.channels.get(reply[1])!,
reply[2],
reply[1]
);
case 'pmessage':
return RedisCommandsQueue.#emitPubSubMessage(
this.#pubSubListeners.patterns.get(reply[1])!,
reply[3],
reply[2]
);
case 'subscribe':
case 'psubscribe':
if (--this.#waitingForReply.head!.value.channelsCounter! === 0) {
this.#shiftWaitingForReply().resolve();
}
return;
}
}
@@ -144,7 +112,7 @@ export default class RedisCommandsQueue {
}
addCommand<T = RedisCommandRawReply>(args: RedisCommandArguments, options?: QueueCommandOptions, bufferMode?: boolean): Promise<T> {
if (this.#pubSubState) {
if (this.#pubSubState.subscribing || this.#pubSubState.subscribed) {
return Promise.reject(new Error('Cannot send commands in PubSub mode'));
} else if (this.#maxLength && this.#waitingToBeSent.length + this.#waitingForReply.length >= this.#maxLength) {
return Promise.reject(new Error('The queue is full'));
@@ -158,7 +126,7 @@ export default class RedisCommandsQueue {
chainId: options?.chainId,
bufferMode,
resolve,
reject
reject,
});
if (options?.signal) {
@@ -185,41 +153,17 @@ export default class RedisCommandsQueue {
});
}
#initiatePubSubState(): PubSubState {
return this.#pubSubState ??= {
subscribed: 0,
subscribing: 0,
unsubscribing: 0,
listeners: {
channels: new Map(),
patterns: new Map()
}
};
}
subscribe<T extends boolean>(
command: PubSubSubscribeCommands,
channels: PubSubArgumentTypes | Array<PubSubArgumentTypes>,
listener: PubSubListener<T>,
bufferMode?: T
): Promise<void> {
const pubSubState = this.#initiatePubSubState(),
channelsToSubscribe: Array<PubSubArgumentTypes> = [],
listenersMap = command === PubSubSubscribeCommands.SUBSCRIBE ? pubSubState.listeners.channels : pubSubState.listeners.patterns;
subscribe(command: PubSubSubscribeCommands, channels: string | Array<string>, listener: PubSubListener): Promise<void> {
const channelsToSubscribe: Array<string> = [],
listeners = command === PubSubSubscribeCommands.SUBSCRIBE ? this.#pubSubListeners.channels : this.#pubSubListeners.patterns;
for (const channel of (Array.isArray(channels) ? channels : [channels])) {
const channelString = typeof channel === 'string' ? channel : channel.toString();
let listeners = listenersMap.get(channelString);
if (!listeners) {
listeners = {
buffers: new Set(),
strings: new Set()
};
listenersMap.set(channelString, listeners);
channelsToSubscribe.push(channel);
if (listeners.has(channel)) {
listeners.get(channel)!.add(listener);
continue;
}
// https://github.com/microsoft/TypeScript/issues/23132
(bufferMode ? listeners.buffers : listeners.strings).add(listener as any);
listeners.set(channel, new Set([listener]));
channelsToSubscribe.push(channel);
}
if (!channelsToSubscribe.length) {
@@ -229,20 +173,8 @@ export default class RedisCommandsQueue {
return this.#pushPubSubCommand(command, channelsToSubscribe);
}
unsubscribe<T extends boolean>(
command: PubSubUnsubscribeCommands,
channels?: string | Array<string>,
listener?: PubSubListener<T>,
bufferMode?: T
): Promise<void> {
if (!this.#pubSubState) {
return Promise.resolve();
}
const listeners = command === PubSubUnsubscribeCommands.UNSUBSCRIBE ?
this.#pubSubState.listeners.channels :
this.#pubSubState.listeners.patterns;
unsubscribe(command: PubSubUnsubscribeCommands, channels?: string | Array<string>, listener?: PubSubListener): Promise<void> {
const listeners = command === PubSubUnsubscribeCommands.UNSUBSCRIBE ? this.#pubSubListeners.channels : this.#pubSubListeners.patterns;
if (!channels) {
const size = listeners.size;
listeners.clear();
@@ -251,16 +183,13 @@ export default class RedisCommandsQueue {
const channelsToUnsubscribe = [];
for (const channel of (Array.isArray(channels) ? channels : [channels])) {
const sets = listeners.get(channel);
if (!sets) continue;
const set = listeners.get(channel);
if (!set) continue;
let shouldUnsubscribe;
let shouldUnsubscribe = !listener;
if (listener) {
// https://github.com/microsoft/TypeScript/issues/23132
(bufferMode ? sets.buffers : sets.strings).delete(listener as any);
shouldUnsubscribe = !sets.buffers.size && !sets.strings.size;
} else {
shouldUnsubscribe = true;
set.delete(listener);
shouldUnsubscribe = set.size === 0;
}
if (shouldUnsubscribe) {
@@ -276,12 +205,11 @@ export default class RedisCommandsQueue {
return this.#pushPubSubCommand(command, channelsToUnsubscribe);
}
#pushPubSubCommand(command: PubSubSubscribeCommands | PubSubUnsubscribeCommands, channels: number | Array<PubSubArgumentTypes>): Promise<void> {
#pushPubSubCommand(command: PubSubSubscribeCommands | PubSubUnsubscribeCommands, channels: number | Array<string>): Promise<void> {
return new Promise((resolve, reject) => {
const pubSubState = this.#initiatePubSubState(),
isSubscribe = command === PubSubSubscribeCommands.SUBSCRIBE || command === PubSubSubscribeCommands.PSUBSCRIBE,
const isSubscribe = command === PubSubSubscribeCommands.SUBSCRIBE || command === PubSubSubscribeCommands.PSUBSCRIBE,
inProgressKey = isSubscribe ? 'subscribing' : 'unsubscribing',
commandArgs: Array<PubSubArgumentTypes> = [command];
commandArgs: Array<string> = [command];
let channelsCounter: number;
if (typeof channels === 'number') { // unsubscribe only
@@ -291,26 +219,18 @@ export default class RedisCommandsQueue {
channelsCounter = channels.length;
}
pubSubState[inProgressKey] += channelsCounter;
this.#pubSubState[inProgressKey] += channelsCounter;
this.#waitingToBeSent.push({
args: commandArgs,
channelsCounter,
bufferMode: true,
resolve: () => {
pubSubState[inProgressKey] -= channelsCounter;
if (isSubscribe) {
pubSubState.subscribed += channelsCounter;
} else {
pubSubState.subscribed -= channelsCounter;
if (!pubSubState.subscribed && !pubSubState.subscribing && !pubSubState.subscribed) {
this.#pubSubState = undefined;
}
}
this.#pubSubState[inProgressKey] -= channelsCounter;
this.#pubSubState.subscribed += channelsCounter * (isSubscribe ? 1 : -1);
resolve();
},
reject: () => {
pubSubState[inProgressKey] -= channelsCounter * (isSubscribe ? 1 : -1);
this.#pubSubState[inProgressKey] -= channelsCounter;
reject();
}
});
@@ -318,14 +238,16 @@ export default class RedisCommandsQueue {
}
resubscribe(): Promise<any> | undefined {
if (!this.#pubSubState) {
if (!this.#pubSubState.subscribed && !this.#pubSubState.subscribing) {
return;
}
this.#pubSubState.subscribed = this.#pubSubState.subscribing = 0;
// TODO: acl error on one channel/pattern will reject the whole command
return Promise.all([
this.#pushPubSubCommand(PubSubSubscribeCommands.SUBSCRIBE, [...this.#pubSubState.listeners.channels.keys()]),
this.#pushPubSubCommand(PubSubSubscribeCommands.PSUBSCRIBE, [...this.#pubSubState.listeners.patterns.keys()])
this.#pushPubSubCommand(PubSubSubscribeCommands.SUBSCRIBE, [...this.#pubSubListeners.channels.keys()]),
this.#pushPubSubCommand(PubSubSubscribeCommands.PSUBSCRIBE, [...this.#pubSubListeners.patterns.keys()])
]);
}
@@ -347,10 +269,7 @@ export default class RedisCommandsQueue {
}
parseResponse(data: Buffer): void {
this.#parser.setReturnBuffers(
!!this.#waitingForReply.head?.value.bufferMode ||
!!this.#pubSubState?.subscribed
);
this.#parser.setReturnBuffers(!!this.#waitingForReply.head?.value.bufferMode);
this.#parser.execute(data);
}

View File

@@ -561,27 +561,17 @@ describe('Client', () => {
}, GLOBAL.SERVERS.OPEN);
testUtils.testWithClient('PubSub', async publisher => {
function assertStringListener(message: string, channel: string) {
assert.ok(typeof message === 'string');
assert.ok(typeof channel === 'string');
}
function assertBufferListener(message: Buffer, channel: Buffer) {
assert.ok(Buffer.isBuffer(message));
assert.ok(Buffer.isBuffer(channel));
}
const subscriber = publisher.duplicate();
await subscriber.connect();
try {
const channelListener1 = spy(assertBufferListener),
channelListener2 = spy(assertStringListener),
patternListener = spy(assertStringListener);
const channelListener1 = spy(),
channelListener2 = spy(),
patternListener = spy();
await Promise.all([
subscriber.subscribe('channel', channelListener1, true),
subscriber.subscribe('channel', channelListener1),
subscriber.subscribe('channel', channelListener2),
subscriber.pSubscribe('channel*', patternListener)
]);
@@ -590,14 +580,14 @@ describe('Client', () => {
waitTillBeenCalled(channelListener1),
waitTillBeenCalled(channelListener2),
waitTillBeenCalled(patternListener),
publisher.publish(Buffer.from('channel'), Buffer.from('message'))
publisher.publish('channel', 'message')
]);
assert.ok(channelListener1.calledOnceWithExactly(Buffer.from('message'), Buffer.from('channel')));
assert.ok(channelListener1.calledOnceWithExactly('message', 'channel'));
assert.ok(channelListener2.calledOnceWithExactly('message', 'channel'));
assert.ok(patternListener.calledOnceWithExactly('message', 'channel'));
await subscriber.unsubscribe('channel', channelListener1, true);
await subscriber.unsubscribe('channel', channelListener1);
await Promise.all([
waitTillBeenCalled(channelListener2),
waitTillBeenCalled(patternListener),

View File

@@ -5,32 +5,21 @@ import { transformArguments } from './LINDEX';
describe('LINDEX', () => {
it('transformArguments', () => {
assert.deepEqual(
transformArguments('key', 0),
['LINDEX', 'key', '0']
transformArguments('key', 'element'),
['LINDEX', 'key', 'element']
);
});
describe('client.lIndex', () => {
testUtils.testWithClient('null', async client => {
assert.equal(
await client.lIndex('key', 0),
null
);
}, GLOBAL.SERVERS.OPEN);
testUtils.testWithClient('with value', async client => {
const [, lIndexReply] = await Promise.all([
client.lPush('key', 'element'),
client.lIndex('key', 0)
]);
assert.equal(lIndexReply, 'element');
}, GLOBAL.SERVERS.OPEN);
});
testUtils.testWithClient('client.lIndex', async client => {
assert.equal(
await client.lIndex('key', 'element'),
null
);
}, GLOBAL.SERVERS.OPEN);
testUtils.testWithCluster('cluster.lIndex', async cluster => {
assert.equal(
await cluster.lIndex('key', 0),
await cluster.lIndex('key', 'element'),
null
);
}, GLOBAL.CLUSTERS.OPEN);

View File

@@ -2,8 +2,8 @@ export const FIRST_KEY_INDEX = 1;
export const IS_READ_ONLY = true;
export function transformArguments(key: string, index: number): Array<string> {
return ['LINDEX', key, index.toString()];
export function transformArguments(key: string, element: string): Array<string> {
return ['LINDEX', key, element];
}
export declare function transformReply(): string | null;

View File

@@ -1,6 +1,4 @@
import { RedisCommandArguments } from '.';
export function transformArguments(channel: string | Buffer, message: string | Buffer): RedisCommandArguments {
export function transformArguments(channel: string, message: string): Array<string> {
return ['PUBLISH', channel, message];
}