diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index f8e351f6e2..5ec181d78b 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -1,4 +1,4 @@ -import * as LinkedList from 'yallist'; +import Queue, { QueueNode } from './queue'; import encodeCommand from '../RESP/encoder'; import { Decoder, PUSH_FLAGS, RESP_TYPES } from '../RESP/decoder'; import { CommandArguments, Flags, ReplyUnion, RespVersions } from '../RESP/types'; @@ -37,8 +37,8 @@ const RESP2_PUSH_FLAGS = { export default class RedisCommandsQueue { private readonly _maxLength: number | null | undefined; - private readonly _waitingToBeSent = new LinkedList(); - private readonly _waitingForReply = new LinkedList(); + private readonly _waitingToBeSent = new Queue(); + private readonly _waitingForReply = new Queue(); private readonly _onShardedChannelMoved: OnShardedChannelMoved; private readonly _pubSub = new PubSub(); @@ -154,30 +154,30 @@ export default class RedisCommandsQueue { } return new Promise((resolve, reject) => { - const node = new LinkedList.Node({ + let node: QueueNode; + const value: CommandWaitingToBeSent = { args, chainId: options?.chainId, flags: options?.flags, resolve, - reject - }); + reject, + removeAbortListener: undefined + }; - if (options?.signal) { + const signal = options?.signal; + if (signal) { const listener = () => { - this._waitingToBeSent.removeNode(node); - node.value.reject(new AbortError()); + this._waitingToBeSent.remove(node); + value.reject(new AbortError()); }; - node.value.removeAbortListener = () => options.signal?.removeEventListener('abort', listener); - - options.signal.addEventListener('abort', listener, { once: true }); + value.removeAbortListener = () => signal.removeEventListener('abort', listener); + signal.addEventListener('abort', listener, { once: true }); } - if (options?.asap) { - this._waitingToBeSent.unshiftNode(node); - } else { - this._waitingToBeSent.pushNode(node); - } + node = options?.asap ? + this._waitingToBeSent.unshift(value) : + this._waitingToBeSent.push(value); }); } diff --git a/packages/client/lib/client/queue.spec.ts b/packages/client/lib/client/queue.spec.ts new file mode 100644 index 0000000000..139dc8f890 --- /dev/null +++ b/packages/client/lib/client/queue.spec.ts @@ -0,0 +1,77 @@ +import Queue from './queue'; +import { equal, deepEqual } from 'assert/strict'; + +describe.only('Queue', () => { + const queue = new Queue(); + + it('should start empty', () => { + equal(queue.length, 0); + deepEqual(Array.from(queue), []); + }); + + it('shift empty', () => { + equal(queue.shift(), null); + equal(queue.length, 0); + deepEqual(Array.from(queue), []); + }); + + it('push 1', () => { + queue.push(1); + equal(queue.length, 1); + deepEqual(Array.from(queue), [1]); + }); + + it('push 2', () => { + queue.push(2); + equal(queue.length, 2); + deepEqual(Array.from(queue), [1, 2]); + }); + + it('unshift 0', () => { + queue.unshift(0); + equal(queue.length, 3); + deepEqual(Array.from(queue), [0, 1, 2]); + }); + + it('remove middle node', () => { + queue.remove(queue.head.next!); + equal(queue.length, 2); + deepEqual(Array.from(queue), [0, 2]); + }); + + it('remove head', () => { + queue.remove(queue.head); + equal(queue.length, 1); + deepEqual(Array.from(queue), [2]); + }); + + it('remove tail', () => { + queue.remove(queue.tail); + equal(queue.length, 0); + deepEqual(Array.from(queue), []); + }); + + it('unshift empty queue', () => { + queue.unshift(0); + equal(queue.length, 1); + deepEqual(Array.from(queue), [0]); + }); + + it('push 1', () => { + queue.push(1); + equal(queue.length, 2); + deepEqual(Array.from(queue), [0, 1]); + }); + + it('shift', () => { + equal(queue.shift(), 0); + equal(queue.length, 1); + deepEqual(Array.from(queue), [1]); + }); + + it('shift last element', () => { + equal(queue.shift(), 1); + equal(queue.length, 0); + deepEqual(Array.from(queue), []); + }); +}); diff --git a/packages/client/lib/client/queue.ts b/packages/client/lib/client/queue.ts new file mode 100644 index 0000000000..20a1fe5c9a --- /dev/null +++ b/packages/client/lib/client/queue.ts @@ -0,0 +1,101 @@ +export interface QueueNode { + value: T; + previous: QueueNode | null; + next: QueueNode | null; +} + +export default class Queue { + #length = 0; + + get length() { + return this.#length; + } + + #head: QueueNode | null = null; + + get head() { + return this.#head; + } + + #tail: QueueNode | null = null; + + get tail() { + return this.#tail; + } + + push(value: T) { + ++this.#length; + + if (!this.#tail) { + return this.#tail = this.#head = { + previous: this.#head, + next: null, + value + }; + } + + return this.#tail = this.#tail.next = { + previous: this.#tail, + next: null, + value + }; + } + + unshift(value: T) { + ++this.#length; + + if (!this.#head) { + return this.#head = this.#tail = { + previous: null, + next: null, + value + }; + } + + return this.#head = this.#head.previous = { + previous: null, + next: this.#head, + value + }; + } + + shift() { + if (!this.#head) return null; + + --this.#length; + const node = this.#head; + if (node.next) { + node.next.previous = node.previous; + this.#head = node.next; + node.next = null; + } else { + this.#head = this.#tail = null; + } + return node.value; + } + + remove(node: QueueNode) { + --this.#length; + + if (this.#tail === node) { + this.#tail = node.previous; + } + + if (this.#head === node) { + this.#head = node.next; + } else { + node.previous!.next = node.next; + node.previous = null; + } + + node.next = null; + } + + *[Symbol.iterator]() { + let node = this.#head; + while (node !== null) { + yield node.value; + node = node.next; + } + } +}