You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-09 00:22:08 +03:00
ref #2498 - replace yallist with custom made linked list
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
import * as LinkedList from 'yallist';
|
||||
import Queue, { QueueNode } from './queue';
|
||||
import encodeCommand from '../RESP/encoder';
|
||||
import { Decoder, PUSH_FLAGS, RESP_TYPES } from '../RESP/decoder';
|
||||
import { CommandArguments, Flags, ReplyUnion, RespVersions } from '../RESP/types';
|
||||
@@ -37,8 +37,8 @@ const RESP2_PUSH_FLAGS = {
|
||||
|
||||
export default class RedisCommandsQueue {
|
||||
private readonly _maxLength: number | null | undefined;
|
||||
private readonly _waitingToBeSent = new LinkedList<CommandWaitingToBeSent>();
|
||||
private readonly _waitingForReply = new LinkedList<CommandWaitingForReply>();
|
||||
private readonly _waitingToBeSent = new Queue<CommandWaitingToBeSent>();
|
||||
private readonly _waitingForReply = new Queue<CommandWaitingForReply>();
|
||||
private readonly _onShardedChannelMoved: OnShardedChannelMoved;
|
||||
|
||||
private readonly _pubSub = new PubSub();
|
||||
@@ -154,30 +154,30 @@ export default class RedisCommandsQueue {
|
||||
}
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const node = new LinkedList.Node<CommandWaitingToBeSent>({
|
||||
let node: QueueNode<CommandWaitingToBeSent>;
|
||||
const value: CommandWaitingToBeSent = {
|
||||
args,
|
||||
chainId: options?.chainId,
|
||||
flags: options?.flags,
|
||||
resolve,
|
||||
reject
|
||||
});
|
||||
reject,
|
||||
removeAbortListener: undefined
|
||||
};
|
||||
|
||||
if (options?.signal) {
|
||||
const signal = options?.signal;
|
||||
if (signal) {
|
||||
const listener = () => {
|
||||
this._waitingToBeSent.removeNode(node);
|
||||
node.value.reject(new AbortError());
|
||||
this._waitingToBeSent.remove(node);
|
||||
value.reject(new AbortError());
|
||||
};
|
||||
|
||||
node.value.removeAbortListener = () => options.signal?.removeEventListener('abort', listener);
|
||||
|
||||
options.signal.addEventListener('abort', listener, { once: true });
|
||||
value.removeAbortListener = () => signal.removeEventListener('abort', listener);
|
||||
signal.addEventListener('abort', listener, { once: true });
|
||||
}
|
||||
|
||||
if (options?.asap) {
|
||||
this._waitingToBeSent.unshiftNode(node);
|
||||
} else {
|
||||
this._waitingToBeSent.pushNode(node);
|
||||
}
|
||||
node = options?.asap ?
|
||||
this._waitingToBeSent.unshift(value) :
|
||||
this._waitingToBeSent.push(value);
|
||||
});
|
||||
}
|
||||
|
||||
|
77
packages/client/lib/client/queue.spec.ts
Normal file
77
packages/client/lib/client/queue.spec.ts
Normal file
@@ -0,0 +1,77 @@
|
||||
import Queue from './queue';
|
||||
import { equal, deepEqual } from 'assert/strict';
|
||||
|
||||
describe.only('Queue', () => {
|
||||
const queue = new Queue();
|
||||
|
||||
it('should start empty', () => {
|
||||
equal(queue.length, 0);
|
||||
deepEqual(Array.from(queue), []);
|
||||
});
|
||||
|
||||
it('shift empty', () => {
|
||||
equal(queue.shift(), null);
|
||||
equal(queue.length, 0);
|
||||
deepEqual(Array.from(queue), []);
|
||||
});
|
||||
|
||||
it('push 1', () => {
|
||||
queue.push(1);
|
||||
equal(queue.length, 1);
|
||||
deepEqual(Array.from(queue), [1]);
|
||||
});
|
||||
|
||||
it('push 2', () => {
|
||||
queue.push(2);
|
||||
equal(queue.length, 2);
|
||||
deepEqual(Array.from(queue), [1, 2]);
|
||||
});
|
||||
|
||||
it('unshift 0', () => {
|
||||
queue.unshift(0);
|
||||
equal(queue.length, 3);
|
||||
deepEqual(Array.from(queue), [0, 1, 2]);
|
||||
});
|
||||
|
||||
it('remove middle node', () => {
|
||||
queue.remove(queue.head.next!);
|
||||
equal(queue.length, 2);
|
||||
deepEqual(Array.from(queue), [0, 2]);
|
||||
});
|
||||
|
||||
it('remove head', () => {
|
||||
queue.remove(queue.head);
|
||||
equal(queue.length, 1);
|
||||
deepEqual(Array.from(queue), [2]);
|
||||
});
|
||||
|
||||
it('remove tail', () => {
|
||||
queue.remove(queue.tail);
|
||||
equal(queue.length, 0);
|
||||
deepEqual(Array.from(queue), []);
|
||||
});
|
||||
|
||||
it('unshift empty queue', () => {
|
||||
queue.unshift(0);
|
||||
equal(queue.length, 1);
|
||||
deepEqual(Array.from(queue), [0]);
|
||||
});
|
||||
|
||||
it('push 1', () => {
|
||||
queue.push(1);
|
||||
equal(queue.length, 2);
|
||||
deepEqual(Array.from(queue), [0, 1]);
|
||||
});
|
||||
|
||||
it('shift', () => {
|
||||
equal(queue.shift(), 0);
|
||||
equal(queue.length, 1);
|
||||
deepEqual(Array.from(queue), [1]);
|
||||
});
|
||||
|
||||
it('shift last element', () => {
|
||||
equal(queue.shift(), 1);
|
||||
equal(queue.length, 0);
|
||||
deepEqual(Array.from(queue), []);
|
||||
});
|
||||
});
|
101
packages/client/lib/client/queue.ts
Normal file
101
packages/client/lib/client/queue.ts
Normal file
@@ -0,0 +1,101 @@
|
||||
export interface QueueNode<T> {
|
||||
value: T;
|
||||
previous: QueueNode<T> | null;
|
||||
next: QueueNode<T> | null;
|
||||
}
|
||||
|
||||
export default class Queue<T> {
|
||||
#length = 0;
|
||||
|
||||
get length() {
|
||||
return this.#length;
|
||||
}
|
||||
|
||||
#head: QueueNode<T> | null = null;
|
||||
|
||||
get head() {
|
||||
return this.#head;
|
||||
}
|
||||
|
||||
#tail: QueueNode<T> | null = null;
|
||||
|
||||
get tail() {
|
||||
return this.#tail;
|
||||
}
|
||||
|
||||
push(value: T) {
|
||||
++this.#length;
|
||||
|
||||
if (!this.#tail) {
|
||||
return this.#tail = this.#head = {
|
||||
previous: this.#head,
|
||||
next: null,
|
||||
value
|
||||
};
|
||||
}
|
||||
|
||||
return this.#tail = this.#tail.next = {
|
||||
previous: this.#tail,
|
||||
next: null,
|
||||
value
|
||||
};
|
||||
}
|
||||
|
||||
unshift(value: T) {
|
||||
++this.#length;
|
||||
|
||||
if (!this.#head) {
|
||||
return this.#head = this.#tail = {
|
||||
previous: null,
|
||||
next: null,
|
||||
value
|
||||
};
|
||||
}
|
||||
|
||||
return this.#head = this.#head.previous = {
|
||||
previous: null,
|
||||
next: this.#head,
|
||||
value
|
||||
};
|
||||
}
|
||||
|
||||
shift() {
|
||||
if (!this.#head) return null;
|
||||
|
||||
--this.#length;
|
||||
const node = this.#head;
|
||||
if (node.next) {
|
||||
node.next.previous = node.previous;
|
||||
this.#head = node.next;
|
||||
node.next = null;
|
||||
} else {
|
||||
this.#head = this.#tail = null;
|
||||
}
|
||||
return node.value;
|
||||
}
|
||||
|
||||
remove(node: QueueNode<T>) {
|
||||
--this.#length;
|
||||
|
||||
if (this.#tail === node) {
|
||||
this.#tail = node.previous;
|
||||
}
|
||||
|
||||
if (this.#head === node) {
|
||||
this.#head = node.next;
|
||||
} else {
|
||||
node.previous!.next = node.next;
|
||||
node.previous = null;
|
||||
}
|
||||
|
||||
node.next = null;
|
||||
}
|
||||
|
||||
*[Symbol.iterator]() {
|
||||
let node = this.#head;
|
||||
while (node !== null) {
|
||||
yield node.value;
|
||||
node = node.next;
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user