You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-06 02:15:48 +03:00
fix #1766 - allow .quit() in PubSub mode
This commit is contained in:
@@ -1,15 +1,19 @@
|
|||||||
import * as LinkedList from 'yallist';
|
import * as LinkedList from 'yallist';
|
||||||
import { AbortError } from '../errors';
|
import { AbortError } from '../errors';
|
||||||
import { RedisCommandArguments, RedisCommandRawReply } from '../commands';
|
import { RedisCommandArguments, RedisCommandRawReply } from '../commands';
|
||||||
|
|
||||||
// We need to use 'require', because it's not possible with Typescript to import
|
// We need to use 'require', because it's not possible with Typescript to import
|
||||||
// classes that are exported as 'module.exports = class`, without esModuleInterop
|
// classes that are exported as 'module.exports = class`, without esModuleInterop
|
||||||
// set to true.
|
// set to true.
|
||||||
const RedisParser = require('redis-parser');
|
const RedisParser = require('redis-parser');
|
||||||
|
|
||||||
export interface QueueCommandOptions {
|
export interface QueueCommandOptions {
|
||||||
asap?: boolean;
|
asap?: boolean;
|
||||||
chainId?: symbol;
|
chainId?: symbol;
|
||||||
signal?: AbortSignal;
|
signal?: AbortSignal;
|
||||||
|
ignorePubSubMode?: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface CommandWaitingToBeSent extends CommandWaitingForReply {
|
interface CommandWaitingToBeSent extends CommandWaitingForReply {
|
||||||
args: RedisCommandArguments;
|
args: RedisCommandArguments;
|
||||||
chainId?: symbol;
|
chainId?: symbol;
|
||||||
@@ -18,16 +22,19 @@ interface CommandWaitingToBeSent extends CommandWaitingForReply {
|
|||||||
listener(): void;
|
listener(): void;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
interface CommandWaitingForReply {
|
interface CommandWaitingForReply {
|
||||||
resolve(reply?: unknown): void;
|
resolve(reply?: unknown): void;
|
||||||
reject(err: Error): void;
|
reject(err: Error): void;
|
||||||
channelsCounter?: number;
|
channelsCounter?: number;
|
||||||
bufferMode?: boolean;
|
bufferMode?: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
export enum PubSubSubscribeCommands {
|
export enum PubSubSubscribeCommands {
|
||||||
SUBSCRIBE = 'SUBSCRIBE',
|
SUBSCRIBE = 'SUBSCRIBE',
|
||||||
PSUBSCRIBE = 'PSUBSCRIBE'
|
PSUBSCRIBE = 'PSUBSCRIBE'
|
||||||
}
|
}
|
||||||
|
|
||||||
export enum PubSubUnsubscribeCommands {
|
export enum PubSubUnsubscribeCommands {
|
||||||
UNSUBSCRIBE = 'UNSUBSCRIBE',
|
UNSUBSCRIBE = 'UNSUBSCRIBE',
|
||||||
PUNSUBSCRIBE = 'PUNSUBSCRIBE'
|
PUNSUBSCRIBE = 'PUNSUBSCRIBE'
|
||||||
@@ -135,7 +142,7 @@ export default class RedisCommandsQueue {
|
|||||||
}
|
}
|
||||||
|
|
||||||
addCommand<T = RedisCommandRawReply>(args: RedisCommandArguments, options?: QueueCommandOptions, bufferMode?: boolean): Promise<T> {
|
addCommand<T = RedisCommandRawReply>(args: RedisCommandArguments, options?: QueueCommandOptions, bufferMode?: boolean): Promise<T> {
|
||||||
if (this.#pubSubState) {
|
if (this.#pubSubState && !options?.ignorePubSubMode) {
|
||||||
return Promise.reject(new Error('Cannot send commands in PubSub mode'));
|
return Promise.reject(new Error('Cannot send commands in PubSub mode'));
|
||||||
} else if (this.#maxLength && this.#waitingToBeSent.length + this.#waitingForReply.length >= this.#maxLength) {
|
} else if (this.#maxLength && this.#waitingToBeSent.length + this.#waitingForReply.length >= this.#maxLength) {
|
||||||
return Promise.reject(new Error('The queue is full'));
|
return Promise.reject(new Error('The queue is full'));
|
||||||
|
@@ -668,6 +668,16 @@ describe('Client', () => {
|
|||||||
await subscriber.disconnect();
|
await subscriber.disconnect();
|
||||||
}
|
}
|
||||||
}, GLOBAL.SERVERS.OPEN);
|
}, GLOBAL.SERVERS.OPEN);
|
||||||
|
|
||||||
|
testUtils.testWithClient('should be able to quit in PubSub mode', async client => {
|
||||||
|
await client.subscribe('channel', () => {
|
||||||
|
// noop
|
||||||
|
});
|
||||||
|
|
||||||
|
await assert.doesNotReject(client.quit());
|
||||||
|
|
||||||
|
assert.equal(client.isOpen, false);
|
||||||
|
}, GLOBAL.SERVERS.OPEN);
|
||||||
});
|
});
|
||||||
|
|
||||||
testUtils.testWithClient('ConnectionTimeoutError', async client => {
|
testUtils.testWithClient('ConnectionTimeoutError', async client => {
|
||||||
|
@@ -487,7 +487,9 @@ export default class RedisClient<M extends RedisModules, S extends RedisScripts>
|
|||||||
|
|
||||||
QUIT(): Promise<void> {
|
QUIT(): Promise<void> {
|
||||||
return this.#socket.quit(() => {
|
return this.#socket.quit(() => {
|
||||||
const quitPromise = this.#queue.addCommand(['QUIT']);
|
const quitPromise = this.#queue.addCommand(['QUIT'], {
|
||||||
|
ignorePubSubMode: true
|
||||||
|
});
|
||||||
this.#tick();
|
this.#tick();
|
||||||
return Promise.all([
|
return Promise.all([
|
||||||
quitPromise,
|
quitPromise,
|
||||||
|
Reference in New Issue
Block a user