From ac808ea781c4cdcc13e4e7a68777532d646b56c9 Mon Sep 17 00:00:00 2001 From: leibale Date: Wed, 8 Dec 2021 10:28:34 -0500 Subject: [PATCH] fix #1766 - allow .quit() in PubSub mode --- packages/client/lib/client/commands-queue.ts | 9 ++++++++- packages/client/lib/client/index.spec.ts | 10 ++++++++++ packages/client/lib/client/index.ts | 4 +++- 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index efa8082090..52f86c6375 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -1,15 +1,19 @@ import * as LinkedList from 'yallist'; import { AbortError } from '../errors'; import { RedisCommandArguments, RedisCommandRawReply } from '../commands'; + // We need to use 'require', because it's not possible with Typescript to import // classes that are exported as 'module.exports = class`, without esModuleInterop // set to true. const RedisParser = require('redis-parser'); + export interface QueueCommandOptions { asap?: boolean; chainId?: symbol; signal?: AbortSignal; + ignorePubSubMode?: boolean; } + interface CommandWaitingToBeSent extends CommandWaitingForReply { args: RedisCommandArguments; chainId?: symbol; @@ -18,16 +22,19 @@ interface CommandWaitingToBeSent extends CommandWaitingForReply { listener(): void; }; } + interface CommandWaitingForReply { resolve(reply?: unknown): void; reject(err: Error): void; channelsCounter?: number; bufferMode?: boolean; } + export enum PubSubSubscribeCommands { SUBSCRIBE = 'SUBSCRIBE', PSUBSCRIBE = 'PSUBSCRIBE' } + export enum PubSubUnsubscribeCommands { UNSUBSCRIBE = 'UNSUBSCRIBE', PUNSUBSCRIBE = 'PUNSUBSCRIBE' @@ -135,7 +142,7 @@ export default class RedisCommandsQueue { } addCommand(args: RedisCommandArguments, options?: QueueCommandOptions, bufferMode?: boolean): Promise { - if (this.#pubSubState) { + if (this.#pubSubState && !options?.ignorePubSubMode) { return Promise.reject(new Error('Cannot send commands in PubSub mode')); } else if (this.#maxLength && this.#waitingToBeSent.length + this.#waitingForReply.length >= this.#maxLength) { return Promise.reject(new Error('The queue is full')); diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index 01154e9dd1..fbe38adafc 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -668,6 +668,16 @@ describe('Client', () => { await subscriber.disconnect(); } }, GLOBAL.SERVERS.OPEN); + + testUtils.testWithClient('should be able to quit in PubSub mode', async client => { + await client.subscribe('channel', () => { + // noop + }); + + await assert.doesNotReject(client.quit()); + + assert.equal(client.isOpen, false); + }, GLOBAL.SERVERS.OPEN); }); testUtils.testWithClient('ConnectionTimeoutError', async client => { diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 11a6823392..d3655b0341 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -487,7 +487,9 @@ export default class RedisClient QUIT(): Promise { return this.#socket.quit(() => { - const quitPromise = this.#queue.addCommand(['QUIT']); + const quitPromise = this.#queue.addCommand(['QUIT'], { + ignorePubSubMode: true + }); this.#tick(); return Promise.all([ quitPromise,