From fc73cb8736ddbeac35b84b3187faaa818a691cb2 Mon Sep 17 00:00:00 2001 From: leibale Date: Tue, 16 Nov 2021 21:14:47 -0500 Subject: [PATCH] fix #1718 - add support for buffers in pubsub --- .github/README.md | 12 ++ packages/client/lib/client/commands-queue.ts | 209 +++++++++++++------ packages/client/lib/client/index.spec.ts | 24 ++- packages/client/lib/client/index.ts | 83 ++++++-- packages/client/lib/commands/PUBLISH.ts | 4 +- 5 files changed, 244 insertions(+), 88 deletions(-) diff --git a/.github/README.md b/.github/README.md index c704680510..2ab819c7d4 100644 --- a/.github/README.md +++ b/.github/README.md @@ -150,6 +150,18 @@ Publish a message on a channel: await publisher.publish('channel', 'message'); ``` +There is support for buffers as well: + +```typescript +await subscriber.subscribe('channel', (message) => { + console.log(message); // +}, true); + +await subscriber.pSubscribe('channe*', (message, channel) => { + console.log(message, channel); // , +}, true); +``` + ### Scan Iterator [`SCAN`](https://redis.io/commands/scan) results can be looped over using [async iterators](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Symbol/asyncIterator): diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 4fcae1e8b6..908c58d23a 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -39,9 +39,29 @@ export enum PubSubUnsubscribeCommands { PUNSUBSCRIBE = 'PUNSUBSCRIBE' } -export type PubSubListener = (message: string, channel: string) => unknown; +type PubSubArgumentTypes = Buffer | string; -export type PubSubListenersMap = Map>; +export type PubSubListener< + BUFFER_MODE extends boolean = false, + T = BUFFER_MODE extends true ? Buffer : string +> = (message: T, channel: T) => unknown; + +interface PubSubListeners { + buffers: Set>; + strings: Set>; +} + +type PubSubListenersMap = Map; + +interface PubSubState { + subscribing: number; + subscribed: number; + unsubscribing: number; + listeners: { + channels: PubSubListenersMap; + patterns: PubSubListenersMap; + }; +} export default class RedisCommandsQueue { static #flushQueue(queue: LinkedList, err: Error): void { @@ -50,10 +70,20 @@ export default class RedisCommandsQueue { } } - static #emitPubSubMessage(listeners: Set, message: string, channel: string): void { - for (const listener of listeners) { + static #emitPubSubMessage(listenersMap: PubSubListenersMap, message: Buffer, channel: Buffer, pattern?: Buffer): void { + const keyString = (pattern || channel).toString(), + listeners = listenersMap.get(keyString)!; + for (const listener of listeners.buffers) { listener(message, channel); } + + if (!listeners.strings.size) return; + + const messageString = message.toString(), + channelString = pattern ? channel.toString() : keyString; + for (const listener of listeners.strings) { + listener(messageString, channelString); + } } readonly #maxLength: number | null | undefined; @@ -62,41 +92,43 @@ export default class RedisCommandsQueue { readonly #waitingForReply = new LinkedList(); - readonly #pubSubState = { - subscribing: 0, - subscribed: 0, - unsubscribing: 0 - }; + #pubSubState: PubSubState | undefined; - readonly #pubSubListeners = { - channels: new Map(), - patterns: new Map() + static readonly #PUB_SUB_MESSAGES = { + message: Buffer.from('message'), + pMessage: Buffer.from('pmessage'), + subscribe: Buffer.from('subscribe'), + pSubscribe: Buffer.from('psubscribe'), + unsubscribe: Buffer.from('unsunscribe'), + pUnsubscribe: Buffer.from('punsubscribe') }; readonly #parser = new RedisParser({ returnReply: (reply: unknown) => { - if ((this.#pubSubState.subscribing || this.#pubSubState.subscribed) && Array.isArray(reply)) { - switch (reply[0]) { - case 'message': - return RedisCommandsQueue.#emitPubSubMessage( - this.#pubSubListeners.channels.get(reply[1])!, - reply[2], - reply[1] - ); - - case 'pmessage': - return RedisCommandsQueue.#emitPubSubMessage( - this.#pubSubListeners.patterns.get(reply[1])!, - reply[3], - reply[2] - ); - - case 'subscribe': - case 'psubscribe': - if (--this.#waitingForReply.head!.value.channelsCounter! === 0) { - this.#shiftWaitingForReply().resolve(); - } - return; + if (this.#pubSubState && Array.isArray(reply)) { + if (RedisCommandsQueue.#PUB_SUB_MESSAGES.message.equals(reply[0])) { + return RedisCommandsQueue.#emitPubSubMessage( + this.#pubSubState.listeners.channels, + reply[2], + reply[1] + ); + } else if (RedisCommandsQueue.#PUB_SUB_MESSAGES.pMessage.equals(reply[0])) { + return RedisCommandsQueue.#emitPubSubMessage( + this.#pubSubState.listeners.patterns, + reply[3], + reply[2], + reply[1] + ); + } else if ( + RedisCommandsQueue.#PUB_SUB_MESSAGES.subscribe.equals(reply[0]) || + RedisCommandsQueue.#PUB_SUB_MESSAGES.pSubscribe.equals(reply[0]) || + RedisCommandsQueue.#PUB_SUB_MESSAGES.unsubscribe.equals(reply[0]) || + RedisCommandsQueue.#PUB_SUB_MESSAGES.pUnsubscribe.equals(reply[0]) + ) { + if (--this.#waitingForReply.head!.value.channelsCounter! === 0) { + this.#shiftWaitingForReply().resolve(); + } + return; } } @@ -112,7 +144,7 @@ export default class RedisCommandsQueue { } addCommand(args: RedisCommandArguments, options?: QueueCommandOptions, bufferMode?: boolean): Promise { - if (this.#pubSubState.subscribing || this.#pubSubState.subscribed) { + if (this.#pubSubState) { return Promise.reject(new Error('Cannot send commands in PubSub mode')); } else if (this.#maxLength && this.#waitingToBeSent.length + this.#waitingForReply.length >= this.#maxLength) { return Promise.reject(new Error('The queue is full')); @@ -126,7 +158,7 @@ export default class RedisCommandsQueue { chainId: options?.chainId, bufferMode, resolve, - reject, + reject }); if (options?.signal) { @@ -153,17 +185,41 @@ export default class RedisCommandsQueue { }); } - subscribe(command: PubSubSubscribeCommands, channels: string | Array, listener: PubSubListener): Promise { - const channelsToSubscribe: Array = [], - listeners = command === PubSubSubscribeCommands.SUBSCRIBE ? this.#pubSubListeners.channels : this.#pubSubListeners.patterns; + #initiatePubSubState(): PubSubState { + return this.#pubSubState ??= { + subscribed: 0, + subscribing: 0, + unsubscribing: 0, + listeners: { + channels: new Map(), + patterns: new Map() + } + }; + } + + subscribe( + command: PubSubSubscribeCommands, + channels: PubSubArgumentTypes | Array, + listener: PubSubListener, + bufferMode?: T + ): Promise { + const pubSubState = this.#initiatePubSubState(), + channelsToSubscribe: Array = [], + listenersMap = command === PubSubSubscribeCommands.SUBSCRIBE ? pubSubState.listeners.channels : pubSubState.listeners.patterns; for (const channel of (Array.isArray(channels) ? channels : [channels])) { - if (listeners.has(channel)) { - listeners.get(channel)!.add(listener); - continue; + const channelString = typeof channel === 'string' ? channel : channel.toString(); + let listeners = listenersMap.get(channelString); + if (!listeners) { + listeners = { + buffers: new Set(), + strings: new Set() + }; + listenersMap.set(channelString, listeners); + channelsToSubscribe.push(channel); } - listeners.set(channel, new Set([listener])); - channelsToSubscribe.push(channel); + // https://github.com/microsoft/TypeScript/issues/23132 + (bufferMode ? listeners.buffers : listeners.strings).add(listener as any); } if (!channelsToSubscribe.length) { @@ -173,8 +229,20 @@ export default class RedisCommandsQueue { return this.#pushPubSubCommand(command, channelsToSubscribe); } - unsubscribe(command: PubSubUnsubscribeCommands, channels?: string | Array, listener?: PubSubListener): Promise { - const listeners = command === PubSubUnsubscribeCommands.UNSUBSCRIBE ? this.#pubSubListeners.channels : this.#pubSubListeners.patterns; + unsubscribe( + command: PubSubUnsubscribeCommands, + channels?: string | Array, + listener?: PubSubListener, + bufferMode?: T + ): Promise { + if (!this.#pubSubState) { + return Promise.resolve(); + } + + const listeners = command === PubSubUnsubscribeCommands.UNSUBSCRIBE ? + this.#pubSubState.listeners.channels : + this.#pubSubState.listeners.patterns; + if (!channels) { const size = listeners.size; listeners.clear(); @@ -183,13 +251,16 @@ export default class RedisCommandsQueue { const channelsToUnsubscribe = []; for (const channel of (Array.isArray(channels) ? channels : [channels])) { - const set = listeners.get(channel); - if (!set) continue; + const sets = listeners.get(channel); + if (!sets) continue; - let shouldUnsubscribe = !listener; + let shouldUnsubscribe; if (listener) { - set.delete(listener); - shouldUnsubscribe = set.size === 0; + // https://github.com/microsoft/TypeScript/issues/23132 + (bufferMode ? sets.buffers : sets.strings).delete(listener as any); + shouldUnsubscribe = !sets.buffers.size && !sets.strings.size; + } else { + shouldUnsubscribe = true; } if (shouldUnsubscribe) { @@ -205,11 +276,12 @@ export default class RedisCommandsQueue { return this.#pushPubSubCommand(command, channelsToUnsubscribe); } - #pushPubSubCommand(command: PubSubSubscribeCommands | PubSubUnsubscribeCommands, channels: number | Array): Promise { + #pushPubSubCommand(command: PubSubSubscribeCommands | PubSubUnsubscribeCommands, channels: number | Array): Promise { return new Promise((resolve, reject) => { - const isSubscribe = command === PubSubSubscribeCommands.SUBSCRIBE || command === PubSubSubscribeCommands.PSUBSCRIBE, + const pubSubState = this.#initiatePubSubState(), + isSubscribe = command === PubSubSubscribeCommands.SUBSCRIBE || command === PubSubSubscribeCommands.PSUBSCRIBE, inProgressKey = isSubscribe ? 'subscribing' : 'unsubscribing', - commandArgs: Array = [command]; + commandArgs: Array = [command]; let channelsCounter: number; if (typeof channels === 'number') { // unsubscribe only @@ -219,18 +291,26 @@ export default class RedisCommandsQueue { channelsCounter = channels.length; } - this.#pubSubState[inProgressKey] += channelsCounter; + pubSubState[inProgressKey] += channelsCounter; this.#waitingToBeSent.push({ args: commandArgs, channelsCounter, + bufferMode: true, resolve: () => { - this.#pubSubState[inProgressKey] -= channelsCounter; - this.#pubSubState.subscribed += channelsCounter * (isSubscribe ? 1 : -1); + pubSubState[inProgressKey] -= channelsCounter; + if (isSubscribe) { + pubSubState.subscribed += channelsCounter; + } else { + pubSubState.subscribed -= channelsCounter; + if (!pubSubState.subscribed && !pubSubState.subscribing && !pubSubState.subscribed) { + this.#pubSubState = undefined; + } + } resolve(); }, reject: () => { - this.#pubSubState[inProgressKey] -= channelsCounter; + pubSubState[inProgressKey] -= channelsCounter * (isSubscribe ? 1 : -1); reject(); } }); @@ -238,16 +318,14 @@ export default class RedisCommandsQueue { } resubscribe(): Promise | undefined { - if (!this.#pubSubState.subscribed && !this.#pubSubState.subscribing) { + if (!this.#pubSubState) { return; } - this.#pubSubState.subscribed = this.#pubSubState.subscribing = 0; - // TODO: acl error on one channel/pattern will reject the whole command return Promise.all([ - this.#pushPubSubCommand(PubSubSubscribeCommands.SUBSCRIBE, [...this.#pubSubListeners.channels.keys()]), - this.#pushPubSubCommand(PubSubSubscribeCommands.PSUBSCRIBE, [...this.#pubSubListeners.patterns.keys()]) + this.#pushPubSubCommand(PubSubSubscribeCommands.SUBSCRIBE, [...this.#pubSubState.listeners.channels.keys()]), + this.#pushPubSubCommand(PubSubSubscribeCommands.PSUBSCRIBE, [...this.#pubSubState.listeners.patterns.keys()]) ]); } @@ -269,7 +347,10 @@ export default class RedisCommandsQueue { } parseResponse(data: Buffer): void { - this.#parser.setReturnBuffers(!!this.#waitingForReply.head?.value.bufferMode); + this.#parser.setReturnBuffers( + !!this.#waitingForReply.head?.value.bufferMode || + !!this.#pubSubState?.subscribed + ); this.#parser.execute(data); } diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index 3f0bca45e2..41e7526eb2 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -561,17 +561,27 @@ describe('Client', () => { }, GLOBAL.SERVERS.OPEN); testUtils.testWithClient('PubSub', async publisher => { + function assertStringListener(message: string, channel: string) { + assert.ok(typeof message === 'string'); + assert.ok(typeof channel === 'string'); + } + + function assertBufferListener(message: Buffer, channel: Buffer) { + assert.ok(Buffer.isBuffer(message)); + assert.ok(Buffer.isBuffer(channel)); + } + const subscriber = publisher.duplicate(); await subscriber.connect(); try { - const channelListener1 = spy(), - channelListener2 = spy(), - patternListener = spy(); + const channelListener1 = spy(assertBufferListener), + channelListener2 = spy(assertStringListener), + patternListener = spy(assertStringListener); await Promise.all([ - subscriber.subscribe('channel', channelListener1), + subscriber.subscribe('channel', channelListener1, true), subscriber.subscribe('channel', channelListener2), subscriber.pSubscribe('channel*', patternListener) ]); @@ -580,14 +590,14 @@ describe('Client', () => { waitTillBeenCalled(channelListener1), waitTillBeenCalled(channelListener2), waitTillBeenCalled(patternListener), - publisher.publish('channel', 'message') + publisher.publish(Buffer.from('channel'), Buffer.from('message')) ]); - assert.ok(channelListener1.calledOnceWithExactly('message', 'channel')); + assert.ok(channelListener1.calledOnceWithExactly(Buffer.from('message'), Buffer.from('channel'))); assert.ok(channelListener2.calledOnceWithExactly('message', 'channel')); assert.ok(patternListener.calledOnceWithExactly('message', 'channel')); - await subscriber.unsubscribe('channel', channelListener1); + await subscriber.unsubscribe('channel', channelListener1, true); await Promise.all([ waitTillBeenCalled(channelListener2), waitTillBeenCalled(patternListener), diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 8802631eda..c520e36a08 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -388,42 +388,93 @@ export default class RedisClient select = this.SELECT; - SUBSCRIBE(channels: string | Array, listener: PubSubListener): Promise { - return this.#subscribe(PubSubSubscribeCommands.SUBSCRIBE, channels, listener); + #subscribe( + command: PubSubSubscribeCommands, + channels: string | Array, + listener: PubSubListener, + bufferMode?: T + ): Promise { + const promise = this.#queue.subscribe( + command, + channels, + listener, + bufferMode + ); + this.#tick(); + return promise; + } + + SUBSCRIBE( + channels: string | Array, + listener: PubSubListener, + bufferMode?: T + ): Promise { + return this.#subscribe( + PubSubSubscribeCommands.SUBSCRIBE, + channels, + listener, + bufferMode + ); } subscribe = this.SUBSCRIBE; - PSUBSCRIBE(patterns: string | Array, listener: PubSubListener): Promise { - return this.#subscribe(PubSubSubscribeCommands.PSUBSCRIBE, patterns, listener); + PSUBSCRIBE( + patterns: string | Array, + listener: PubSubListener, + bufferMode?: T + ): Promise { + return this.#subscribe( + PubSubSubscribeCommands.PSUBSCRIBE, + patterns, + listener, + bufferMode + ); } pSubscribe = this.PSUBSCRIBE; - #subscribe(command: PubSubSubscribeCommands, channels: string | Array, listener: PubSubListener): Promise { - const promise = this.#queue.subscribe(command, channels, listener); + #unsubscribe( + command: PubSubUnsubscribeCommands, + channels?: string | Array, + listener?: PubSubListener, + bufferMode?: T + ): Promise { + const promise = this.#queue.unsubscribe(command, channels, listener, bufferMode); this.#tick(); return promise; } - UNSUBSCRIBE(channels?: string | Array, listener?: PubSubListener): Promise { - return this.#unsubscribe(PubSubUnsubscribeCommands.UNSUBSCRIBE, channels, listener); + UNSUBSCRIBE( + channels?: string | Array, + listener?: PubSubListener, + bufferMode?: T + ): Promise { + return this.#unsubscribe( + PubSubUnsubscribeCommands.UNSUBSCRIBE, + channels, + listener, + bufferMode + ); } unsubscribe = this.UNSUBSCRIBE; - PUNSUBSCRIBE(patterns?: string | Array, listener?: PubSubListener): Promise { - return this.#unsubscribe(PubSubUnsubscribeCommands.PUNSUBSCRIBE, patterns, listener); + PUNSUBSCRIBE( + patterns?: string | Array, + listener?: PubSubListener, + bufferMode?: T + ): Promise { + return this.#unsubscribe( + PubSubUnsubscribeCommands.PUNSUBSCRIBE, + patterns, + listener, + bufferMode + ); } pUnsubscribe = this.PUNSUBSCRIBE; - #unsubscribe(command: PubSubUnsubscribeCommands, channels?: string | Array, listener?: PubSubListener): Promise { - const promise = this.#queue.unsubscribe(command, channels, listener); - this.#tick(); - return promise; - } - QUIT(): Promise { return this.#socket.quit(() => { const quitPromise = this.#queue.addCommand(['QUIT']); diff --git a/packages/client/lib/commands/PUBLISH.ts b/packages/client/lib/commands/PUBLISH.ts index eda5234df2..cbfcaabd1c 100644 --- a/packages/client/lib/commands/PUBLISH.ts +++ b/packages/client/lib/commands/PUBLISH.ts @@ -1,4 +1,6 @@ -export function transformArguments(channel: string, message: string): Array { +import { RedisCommandArguments } from '.'; + +export function transformArguments(channel: string | Buffer, message: string | Buffer): RedisCommandArguments { return ['PUBLISH', channel, message]; }