From c74e043ee14c655b168f6f7f2fbd22030f306470 Mon Sep 17 00:00:00 2001 From: leibale Date: Wed, 24 Nov 2021 21:37:31 -0500 Subject: [PATCH] fix merge --- packages/client/lib/client/commands-queue.ts | 233 ++++++++++++------- packages/client/lib/client/index.spec.ts | 31 +-- packages/client/lib/commands/LINDEX.spec.ts | 32 ++- packages/client/lib/commands/LINDEX.ts | 7 +- packages/client/lib/commands/PUBLISH.ts | 4 +- 5 files changed, 189 insertions(+), 118 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 4fcae1e8b6..480d7d5140 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -1,18 +1,15 @@ import * as LinkedList from 'yallist'; import { AbortError } from '../errors'; import { RedisCommandArguments, RedisCommandRawReply } from '../commands'; - // We need to use 'require', because it's not possible with Typescript to import // classes that are exported as 'module.exports = class`, without esModuleInterop // set to true. const RedisParser = require('redis-parser'); - export interface QueueCommandOptions { asap?: boolean; chainId?: symbol; signal?: AbortSignal; } - interface CommandWaitingToBeSent extends CommandWaitingForReply { args: RedisCommandArguments; chainId?: symbol; @@ -21,27 +18,44 @@ interface CommandWaitingToBeSent extends CommandWaitingForReply { listener(): void; }; } - interface CommandWaitingForReply { resolve(reply?: unknown): void; reject(err: Error): void; channelsCounter?: number; bufferMode?: boolean; } - export enum PubSubSubscribeCommands { SUBSCRIBE = 'SUBSCRIBE', PSUBSCRIBE = 'PSUBSCRIBE' } - export enum PubSubUnsubscribeCommands { UNSUBSCRIBE = 'UNSUBSCRIBE', PUNSUBSCRIBE = 'PUNSUBSCRIBE' } -export type PubSubListener = (message: string, channel: string) => unknown; +type PubSubArgumentTypes = Buffer | string; -export type PubSubListenersMap = Map>; +export type PubSubListener< + BUFFER_MODE extends boolean = false, + T = BUFFER_MODE extends true ? Buffer : string +> = (message: T, channel: T) => unknown; + +interface PubSubListeners { + buffers: Set>; + strings: Set>; +} + +type PubSubListenersMap = Map; + +interface PubSubState { + subscribing: number; + subscribed: number; + unsubscribing: number; + listeners: { + channels: PubSubListenersMap; + patterns: PubSubListenersMap; + }; +} export default class RedisCommandsQueue { static #flushQueue(queue: LinkedList, err: Error): void { @@ -50,53 +64,64 @@ export default class RedisCommandsQueue { } } - static #emitPubSubMessage(listeners: Set, message: string, channel: string): void { - for (const listener of listeners) { + static #emitPubSubMessage(listenersMap: PubSubListenersMap, message: Buffer, channel: Buffer, pattern?: Buffer): void { + const keyString = (pattern || channel).toString(), + listeners = listenersMap.get(keyString)!; + for (const listener of listeners.buffers) { listener(message, channel); } + + if (!listeners.strings.size) return; + + const messageString = message.toString(), + channelString = pattern ? channel.toString() : keyString; + for (const listener of listeners.strings) { + listener(messageString, channelString); + } } readonly #maxLength: number | null | undefined; - readonly #waitingToBeSent = new LinkedList(); readonly #waitingForReply = new LinkedList(); - readonly #pubSubState = { - subscribing: 0, - subscribed: 0, - unsubscribing: 0 - }; + #pubSubState: PubSubState | undefined; - readonly #pubSubListeners = { - channels: new Map(), - patterns: new Map() + static readonly #PUB_SUB_MESSAGES = { + message: Buffer.from('message'), + pMessage: Buffer.from('pmessage'), + subscribe: Buffer.from('subscribe'), + pSubscribe: Buffer.from('psubscribe'), + unsubscribe: Buffer.from('unsunscribe'), + pUnsubscribe: Buffer.from('punsubscribe') }; readonly #parser = new RedisParser({ returnReply: (reply: unknown) => { - if ((this.#pubSubState.subscribing || this.#pubSubState.subscribed) && Array.isArray(reply)) { - switch (reply[0]) { - case 'message': - return RedisCommandsQueue.#emitPubSubMessage( - this.#pubSubListeners.channels.get(reply[1])!, - reply[2], - reply[1] - ); - - case 'pmessage': - return RedisCommandsQueue.#emitPubSubMessage( - this.#pubSubListeners.patterns.get(reply[1])!, - reply[3], - reply[2] - ); - - case 'subscribe': - case 'psubscribe': - if (--this.#waitingForReply.head!.value.channelsCounter! === 0) { - this.#shiftWaitingForReply().resolve(); - } - return; + if (this.#pubSubState && Array.isArray(reply)) { + if (RedisCommandsQueue.#PUB_SUB_MESSAGES.message.equals(reply[0])) { + return RedisCommandsQueue.#emitPubSubMessage( + this.#pubSubState.listeners.channels, + reply[2], + reply[1] + ); + } else if (RedisCommandsQueue.#PUB_SUB_MESSAGES.pMessage.equals(reply[0])) { + return RedisCommandsQueue.#emitPubSubMessage( + this.#pubSubState.listeners.patterns, + reply[3], + reply[2], + reply[1] + ); + } else if ( + RedisCommandsQueue.#PUB_SUB_MESSAGES.subscribe.equals(reply[0]) || + RedisCommandsQueue.#PUB_SUB_MESSAGES.pSubscribe.equals(reply[0]) || + RedisCommandsQueue.#PUB_SUB_MESSAGES.unsubscribe.equals(reply[0]) || + RedisCommandsQueue.#PUB_SUB_MESSAGES.pUnsubscribe.equals(reply[0]) + ) { + if (--this.#waitingForReply.head!.value.channelsCounter! === 0) { + this.#shiftWaitingForReply().resolve(); + } + return; } } @@ -104,29 +129,26 @@ export default class RedisCommandsQueue { }, returnError: (err: Error) => this.#shiftWaitingForReply().reject(err) }); - #chainInExecution: symbol | undefined; - constructor(maxLength: number | null | undefined) { this.#maxLength = maxLength; } addCommand(args: RedisCommandArguments, options?: QueueCommandOptions, bufferMode?: boolean): Promise { - if (this.#pubSubState.subscribing || this.#pubSubState.subscribed) { + if (this.#pubSubState) { return Promise.reject(new Error('Cannot send commands in PubSub mode')); } else if (this.#maxLength && this.#waitingToBeSent.length + this.#waitingForReply.length >= this.#maxLength) { return Promise.reject(new Error('The queue is full')); } else if (options?.signal?.aborted) { return Promise.reject(new AbortError()); } - return new Promise((resolve, reject) => { const node = new LinkedList.Node({ args, chainId: options?.chainId, bufferMode, resolve, - reject, + reject }); if (options?.signal) { @@ -134,7 +156,6 @@ export default class RedisCommandsQueue { this.#waitingToBeSent.removeNode(node); node.value.reject(new AbortError()); }; - node.value.abort = { signal: options.signal, listener @@ -144,7 +165,6 @@ export default class RedisCommandsQueue { once: true }); } - if (options?.asap) { this.#waitingToBeSent.unshiftNode(node); } else { @@ -153,28 +173,63 @@ export default class RedisCommandsQueue { }); } - subscribe(command: PubSubSubscribeCommands, channels: string | Array, listener: PubSubListener): Promise { - const channelsToSubscribe: Array = [], - listeners = command === PubSubSubscribeCommands.SUBSCRIBE ? this.#pubSubListeners.channels : this.#pubSubListeners.patterns; + #initiatePubSubState(): PubSubState { + return this.#pubSubState ??= { + subscribed: 0, + subscribing: 0, + unsubscribing: 0, + listeners: { + channels: new Map(), + patterns: new Map() + } + }; + } + + subscribe( + command: PubSubSubscribeCommands, + channels: PubSubArgumentTypes | Array, + listener: PubSubListener, + bufferMode?: T + ): Promise { + const pubSubState = this.#initiatePubSubState(), + channelsToSubscribe: Array = [], + listenersMap = command === PubSubSubscribeCommands.SUBSCRIBE ? pubSubState.listeners.channels : pubSubState.listeners.patterns; for (const channel of (Array.isArray(channels) ? channels : [channels])) { - if (listeners.has(channel)) { - listeners.get(channel)!.add(listener); - continue; + const channelString = typeof channel === 'string' ? channel : channel.toString(); + let listeners = listenersMap.get(channelString); + if (!listeners) { + listeners = { + buffers: new Set(), + strings: new Set() + }; + listenersMap.set(channelString, listeners); + channelsToSubscribe.push(channel); } - listeners.set(channel, new Set([listener])); - channelsToSubscribe.push(channel); + // https://github.com/microsoft/TypeScript/issues/23132 + (bufferMode ? listeners.buffers : listeners.strings).add(listener as any); } if (!channelsToSubscribe.length) { return Promise.resolve(); } - return this.#pushPubSubCommand(command, channelsToSubscribe); } - unsubscribe(command: PubSubUnsubscribeCommands, channels?: string | Array, listener?: PubSubListener): Promise { - const listeners = command === PubSubUnsubscribeCommands.UNSUBSCRIBE ? this.#pubSubListeners.channels : this.#pubSubListeners.patterns; + unsubscribe( + command: PubSubUnsubscribeCommands, + channels?: string | Array, + listener?: PubSubListener, + bufferMode?: T + ): Promise { + if (!this.#pubSubState) { + return Promise.resolve(); + } + + const listeners = command === PubSubUnsubscribeCommands.UNSUBSCRIBE ? + this.#pubSubState.listeners.channels : + this.#pubSubState.listeners.patterns; + if (!channels) { const size = listeners.size; listeners.clear(); @@ -183,13 +238,16 @@ export default class RedisCommandsQueue { const channelsToUnsubscribe = []; for (const channel of (Array.isArray(channels) ? channels : [channels])) { - const set = listeners.get(channel); - if (!set) continue; + const sets = listeners.get(channel); + if (!sets) continue; - let shouldUnsubscribe = !listener; + let shouldUnsubscribe; if (listener) { - set.delete(listener); - shouldUnsubscribe = set.size === 0; + // https://github.com/microsoft/TypeScript/issues/23132 + (bufferMode ? sets.buffers : sets.strings).delete(listener as any); + shouldUnsubscribe = !sets.buffers.size && !sets.strings.size; + } else { + shouldUnsubscribe = true; } if (shouldUnsubscribe) { @@ -197,19 +255,18 @@ export default class RedisCommandsQueue { listeners.delete(channel); } } - if (!channelsToUnsubscribe.length) { return Promise.resolve(); } - return this.#pushPubSubCommand(command, channelsToUnsubscribe); } - #pushPubSubCommand(command: PubSubSubscribeCommands | PubSubUnsubscribeCommands, channels: number | Array): Promise { + #pushPubSubCommand(command: PubSubSubscribeCommands | PubSubUnsubscribeCommands, channels: number | Array): Promise { return new Promise((resolve, reject) => { - const isSubscribe = command === PubSubSubscribeCommands.SUBSCRIBE || command === PubSubSubscribeCommands.PSUBSCRIBE, + const pubSubState = this.#initiatePubSubState(), + isSubscribe = command === PubSubSubscribeCommands.SUBSCRIBE || command === PubSubSubscribeCommands.PSUBSCRIBE, inProgressKey = isSubscribe ? 'subscribing' : 'unsubscribing', - commandArgs: Array = [command]; + commandArgs: Array = [command]; let channelsCounter: number; if (typeof channels === 'number') { // unsubscribe only @@ -219,18 +276,26 @@ export default class RedisCommandsQueue { channelsCounter = channels.length; } - this.#pubSubState[inProgressKey] += channelsCounter; + pubSubState[inProgressKey] += channelsCounter; this.#waitingToBeSent.push({ args: commandArgs, channelsCounter, + bufferMode: true, resolve: () => { - this.#pubSubState[inProgressKey] -= channelsCounter; - this.#pubSubState.subscribed += channelsCounter * (isSubscribe ? 1 : -1); + pubSubState[inProgressKey] -= channelsCounter; + if (isSubscribe) { + pubSubState.subscribed += channelsCounter; + } else { + pubSubState.subscribed -= channelsCounter; + if (!pubSubState.subscribed && !pubSubState.subscribing && !pubSubState.subscribed) { + this.#pubSubState = undefined; + } + } resolve(); }, reject: () => { - this.#pubSubState[inProgressKey] -= channelsCounter; + pubSubState[inProgressKey] -= channelsCounter * (isSubscribe ? 1 : -1); reject(); } }); @@ -238,22 +303,19 @@ export default class RedisCommandsQueue { } resubscribe(): Promise | undefined { - if (!this.#pubSubState.subscribed && !this.#pubSubState.subscribing) { + if (!this.#pubSubState) { return; } - this.#pubSubState.subscribed = this.#pubSubState.subscribing = 0; - // TODO: acl error on one channel/pattern will reject the whole command return Promise.all([ - this.#pushPubSubCommand(PubSubSubscribeCommands.SUBSCRIBE, [...this.#pubSubListeners.channels.keys()]), - this.#pushPubSubCommand(PubSubSubscribeCommands.PSUBSCRIBE, [...this.#pubSubListeners.patterns.keys()]) + this.#pushPubSubCommand(PubSubSubscribeCommands.SUBSCRIBE, [...this.#pubSubState.listeners.channels.keys()]), + this.#pushPubSubCommand(PubSubSubscribeCommands.PSUBSCRIBE, [...this.#pubSubState.listeners.patterns.keys()]) ]); } getCommandToSend(): RedisCommandArguments | undefined { const toSend = this.#waitingToBeSent.shift(); - if (toSend) { this.#waitingForReply.push({ resolve: toSend.resolve, @@ -262,14 +324,15 @@ export default class RedisCommandsQueue { bufferMode: toSend.bufferMode }); } - this.#chainInExecution = toSend?.chainId; - return toSend?.args; } parseResponse(data: Buffer): void { - this.#parser.setReturnBuffers(!!this.#waitingForReply.head?.value.bufferMode); + this.#parser.setReturnBuffers( + !!this.#waitingForReply.head?.value.bufferMode || + !!this.#pubSubState?.subscribed + ); this.#parser.execute(data); } @@ -277,24 +340,18 @@ export default class RedisCommandsQueue { if (!this.#waitingForReply.length) { throw new Error('Got an unexpected reply from Redis'); } - return this.#waitingForReply.shift()!; } - flushWaitingForReply(err: Error): void { RedisCommandsQueue.#flushQueue(this.#waitingForReply, err); - if (!this.#chainInExecution) { return; } - while (this.#waitingToBeSent.head?.value.chainId === this.#chainInExecution) { this.#waitingToBeSent.shift(); } - this.#chainInExecution = undefined; } - flushAll(err: Error): void { RedisCommandsQueue.#flushQueue(this.#waitingForReply, err); RedisCommandsQueue.#flushQueue(this.#waitingToBeSent, err); diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index 3f0bca45e2..679c7ae692 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -561,63 +561,66 @@ describe('Client', () => { }, GLOBAL.SERVERS.OPEN); testUtils.testWithClient('PubSub', async publisher => { + function assertStringListener(message: string, channel: string) { + assert.ok(typeof message === 'string'); + assert.ok(typeof channel === 'string'); + } + + function assertBufferListener(message: Buffer, channel: Buffer) { + assert.ok(Buffer.isBuffer(message)); + assert.ok(Buffer.isBuffer(channel)); + } + const subscriber = publisher.duplicate(); await subscriber.connect(); try { - const channelListener1 = spy(), - channelListener2 = spy(), - patternListener = spy(); + const channelListener1 = spy(assertBufferListener), + channelListener2 = spy(assertStringListener), + patternListener = spy(assertStringListener); await Promise.all([ - subscriber.subscribe('channel', channelListener1), + subscriber.subscribe('channel', channelListener1, true), subscriber.subscribe('channel', channelListener2), subscriber.pSubscribe('channel*', patternListener) ]); - await Promise.all([ waitTillBeenCalled(channelListener1), waitTillBeenCalled(channelListener2), waitTillBeenCalled(patternListener), - publisher.publish('channel', 'message') + publisher.publish(Buffer.from('channel'), Buffer.from('message')) ]); - assert.ok(channelListener1.calledOnceWithExactly('message', 'channel')); + assert.ok(channelListener1.calledOnceWithExactly(Buffer.from('message'), Buffer.from('channel'))); assert.ok(channelListener2.calledOnceWithExactly('message', 'channel')); assert.ok(patternListener.calledOnceWithExactly('message', 'channel')); - await subscriber.unsubscribe('channel', channelListener1); + await subscriber.unsubscribe('channel', channelListener1, true); await Promise.all([ waitTillBeenCalled(channelListener2), waitTillBeenCalled(patternListener), publisher.publish('channel', 'message') ]); - assert.ok(channelListener1.calledOnce); assert.ok(channelListener2.calledTwice); assert.ok(channelListener2.secondCall.calledWithExactly('message', 'channel')); assert.ok(patternListener.calledTwice); assert.ok(patternListener.secondCall.calledWithExactly('message', 'channel')); - await subscriber.unsubscribe('channel'); await Promise.all([ waitTillBeenCalled(patternListener), publisher.publish('channel', 'message') ]); - assert.ok(channelListener1.calledOnce); assert.ok(channelListener2.calledTwice); assert.ok(patternListener.calledThrice); assert.ok(patternListener.thirdCall.calledWithExactly('message', 'channel')); - await subscriber.pUnsubscribe(); await publisher.publish('channel', 'message'); - assert.ok(channelListener1.calledOnce); assert.ok(channelListener2.calledTwice); assert.ok(patternListener.calledThrice); - // should be able to send commands when unsubsribed from all channels (see #1652) await assert.doesNotReject(subscriber.ping()); } finally { diff --git a/packages/client/lib/commands/LINDEX.spec.ts b/packages/client/lib/commands/LINDEX.spec.ts index 5e0b1473ec..aa3aafa789 100644 --- a/packages/client/lib/commands/LINDEX.spec.ts +++ b/packages/client/lib/commands/LINDEX.spec.ts @@ -1,26 +1,36 @@ import { strict as assert } from 'assert'; import testUtils, { GLOBAL } from '../test-utils'; import { transformArguments } from './LINDEX'; - describe('LINDEX', () => { it('transformArguments', () => { assert.deepEqual( - transformArguments('key', 'element'), - ['LINDEX', 'key', 'element'] + transformArguments('key', 0), + ['LINDEX', 'key', '0'] ); }); - testUtils.testWithClient('client.lIndex', async client => { - assert.equal( - await client.lIndex('key', 'element'), - null - ); - }, GLOBAL.SERVERS.OPEN); + describe('client.lIndex', () => { + testUtils.testWithClient('null', async client => { + assert.equal( + await client.lIndex('key', 0), + null + ); + }, GLOBAL.SERVERS.OPEN); + + testUtils.testWithClient('with value', async client => { + const [, lIndexReply] = await Promise.all([ + client.lPush('key', 'element'), + client.lIndex('key', 0) + ]); + + assert.equal(lIndexReply, 'element'); + }, GLOBAL.SERVERS.OPEN); + }); testUtils.testWithCluster('cluster.lIndex', async cluster => { assert.equal( - await cluster.lIndex('key', 'element'), + await cluster.lIndex('key', 0), null ); }, GLOBAL.CLUSTERS.OPEN); -}); +}); \ No newline at end of file diff --git a/packages/client/lib/commands/LINDEX.ts b/packages/client/lib/commands/LINDEX.ts index 4c283f0912..d13bc0c2d0 100644 --- a/packages/client/lib/commands/LINDEX.ts +++ b/packages/client/lib/commands/LINDEX.ts @@ -1,9 +1,8 @@ -export const FIRST_KEY_INDEX = 1; export const IS_READ_ONLY = true; -export function transformArguments(key: string, element: string): Array { - return ['LINDEX', key, element]; +export function transformArguments(key: string, index: number): Array { + return ['LINDEX', key, index.toString()]; } -export declare function transformReply(): string | null; +export declare function transformReply(): string | null; \ No newline at end of file diff --git a/packages/client/lib/commands/PUBLISH.ts b/packages/client/lib/commands/PUBLISH.ts index eda5234df2..cbfcaabd1c 100644 --- a/packages/client/lib/commands/PUBLISH.ts +++ b/packages/client/lib/commands/PUBLISH.ts @@ -1,4 +1,6 @@ -export function transformArguments(channel: string, message: string): Array { +import { RedisCommandArguments } from '.'; + +export function transformArguments(channel: string | Buffer, message: string | Buffer): RedisCommandArguments { return ['PUBLISH', channel, message]; }