1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-07 13:22:56 +03:00
This commit is contained in:
Leibale
2023-06-29 15:22:57 -04:00
parent c109fbf751
commit de7e2b85c4
3 changed files with 36 additions and 28 deletions

View File

@@ -16,7 +16,7 @@ export interface CommandOptions<T = TypeMapping> {
typeMapping?: T; typeMapping?: T;
} }
export interface CommandWaitingToBeSent extends CommandWaitingForReply { export interface CommandToWrite extends CommandWaitingForReply {
args: CommandArguments; args: CommandArguments;
chainId?: symbol; chainId?: symbol;
abort?: { abort?: {
@@ -43,7 +43,7 @@ const RESP2_PUSH_TYPE_MAPPING = {
export default class RedisCommandsQueue { export default class RedisCommandsQueue {
private readonly _maxLength: number | null | undefined; private readonly _maxLength: number | null | undefined;
private readonly _waitingToBeSent = new DoublyLinkedList<CommandWaitingToBeSent>(); private readonly _toWrite = new DoublyLinkedList<CommandToWrite>();
private readonly _waitingForReply = new SinglyLinkedList<CommandWaitingForReply>(); private readonly _waitingForReply = new SinglyLinkedList<CommandWaitingForReply>();
private readonly _onShardedChannelMoved: OnShardedChannelMoved; private readonly _onShardedChannelMoved: OnShardedChannelMoved;
@@ -153,15 +153,15 @@ export default class RedisCommandsQueue {
} }
addCommand<T>(args: CommandArguments, options?: CommandOptions): Promise<T> { addCommand<T>(args: CommandArguments, options?: CommandOptions): Promise<T> {
if (this._maxLength && this._waitingToBeSent.length + this._waitingForReply.length >= this._maxLength) { if (this._maxLength && this._toWrite.length + this._waitingForReply.length >= this._maxLength) {
return Promise.reject(new Error('The queue is full')); return Promise.reject(new Error('The queue is full'));
} else if (options?.abortSignal?.aborted) { } else if (options?.abortSignal?.aborted) {
return Promise.reject(new AbortError()); return Promise.reject(new AbortError());
} }
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let node: DoublyLinkedNode<CommandWaitingToBeSent>; let node: DoublyLinkedNode<CommandToWrite>;
const value: CommandWaitingToBeSent = { const value: CommandToWrite = {
args, args,
chainId: options?.chainId, chainId: options?.chainId,
typeMapping: options?.typeMapping, typeMapping: options?.typeMapping,
@@ -175,7 +175,7 @@ export default class RedisCommandsQueue {
value.abort = { value.abort = {
signal, signal,
listener: () => { listener: () => {
this._waitingToBeSent.remove(node); this._toWrite.remove(node);
value.reject(new AbortError()); value.reject(new AbortError());
} }
}; };
@@ -183,8 +183,8 @@ export default class RedisCommandsQueue {
} }
node = options?.asap ? node = options?.asap ?
this._waitingToBeSent.unshift(value) : this._toWrite.unshift(value) :
this._waitingToBeSent.push(value); this._toWrite.push(value);
}); });
} }
@@ -243,7 +243,7 @@ export default class RedisCommandsQueue {
if (command === undefined) return; if (command === undefined) return;
return new Promise<void>((resolve, reject) => { return new Promise<void>((resolve, reject) => {
this._waitingToBeSent.push({ this._toWrite.push({
args: command.args, args: command.args,
channelsCounter: command.channelsCounter, channelsCounter: command.channelsCounter,
typeMapping: PUSH_TYPE_MAPPING, typeMapping: PUSH_TYPE_MAPPING,
@@ -259,15 +259,19 @@ export default class RedisCommandsQueue {
}); });
} }
*waitingToBeSent() { isWaitingToWrite() {
let toSend = this._waitingToBeSent.shift(); return this._toWrite.length > 0;
}
*commandsToWrite() {
let toSend = this._toWrite.shift();
while (toSend) { while (toSend) {
let encoded: CommandArguments; let encoded: CommandArguments;
try { try {
encoded = encodeCommand(toSend.args); encoded = encodeCommand(toSend.args);
} catch (err) { } catch (err) {
toSend.reject(err); toSend.reject(err);
toSend = this._waitingToBeSent.shift(); toSend = this._toWrite.shift();
continue; continue;
} }
@@ -283,7 +287,7 @@ export default class RedisCommandsQueue {
this._waitingForReply.push(toSend); this._waitingForReply.push(toSend);
this._chainInExecution = toSend.chainId; this._chainInExecution = toSend.chainId;
yield encoded; yield encoded;
toSend = this._waitingToBeSent.shift(); toSend = this._toWrite.shift();
} }
} }
@@ -294,11 +298,11 @@ export default class RedisCommandsQueue {
this._waitingForReply.reset(); this._waitingForReply.reset();
} }
private static _removeAbortListener(command: CommandWaitingToBeSent) { private static _removeAbortListener(command: CommandToWrite) {
command.abort!.signal.removeEventListener('abort', command.abort!.listener); command.abort!.signal.removeEventListener('abort', command.abort!.listener);
} }
private static _flushWaitingToBeSent(toBeSent: CommandWaitingToBeSent, err: Error) { private static _flushToWrite(toBeSent: CommandToWrite, err: Error) {
if (toBeSent.abort) { if (toBeSent.abort) {
RedisCommandsQueue._removeAbortListener(toBeSent); RedisCommandsQueue._removeAbortListener(toBeSent);
} }
@@ -314,9 +318,9 @@ export default class RedisCommandsQueue {
if (!this._chainInExecution) return; if (!this._chainInExecution) return;
while (this._waitingToBeSent.head?.value.chainId === this._chainInExecution) { while (this._toWrite.head?.value.chainId === this._chainInExecution) {
RedisCommandsQueue._flushWaitingToBeSent( RedisCommandsQueue._flushToWrite(
this._waitingToBeSent.shift()!, this._toWrite.shift()!,
err err
); );
} }
@@ -328,15 +332,15 @@ export default class RedisCommandsQueue {
this.decoder.reset(); this.decoder.reset();
this._pubSub.reset(); this._pubSub.reset();
this._flushWaitingForReply(err); this._flushWaitingForReply(err);
for (const node of this._waitingToBeSent) { for (const node of this._toWrite) {
RedisCommandsQueue._flushWaitingToBeSent(node, err); RedisCommandsQueue._flushToWrite(node, err);
} }
this._waitingToBeSent.reset(); this._toWrite.reset();
} }
isEmpty() { isEmpty() {
return ( return (
this._waitingToBeSent.length === 0 && this._toWrite.length === 0 &&
this._waitingForReply.length === 0 this._waitingForReply.length === 0
); );
} }

View File

@@ -440,10 +440,10 @@ export default class RedisClient<
.on('ready', () => { .on('ready', () => {
this.emit('ready'); this.emit('ready');
this._setPingTimer(); this._setPingTimer();
this._scheduleWrite(); this._maybeScheduleWrite();
}) })
.on('reconnecting', () => this.emit('reconnecting')) .on('reconnecting', () => this.emit('reconnecting'))
.on('drain', () => this._scheduleWrite()) .on('drain', () => this._maybeScheduleWrite())
.on('end', () => this.emit('end')); .on('end', () => this.emit('end'));
} }
@@ -706,7 +706,7 @@ export default class RedisClient<
} }
private _write() { private _write() {
this._socket.write(this._queue.waitingToBeSent()); this._socket.write(this._queue.commandsToWrite());
} }
private _scheduledWrite?: NodeJS.Immediate; private _scheduledWrite?: NodeJS.Immediate;
@@ -720,6 +720,12 @@ export default class RedisClient<
}); });
} }
private _maybeScheduleWrite() {
if (!this._queue.isWaitingToWrite()) return;
this._scheduleWrite();
}
/** /**
* @internal * @internal
*/ */

View File

@@ -229,9 +229,7 @@ export default class RedisSocket extends EventEmitter {
} }
write(iterator: IterableIterator<Array<RedisArgument>>): void { write(iterator: IterableIterator<Array<RedisArgument>>): void {
if (!this._socket) { if (!this._socket) return;
throw new ClientClosedError();
}
this._socket.cork(); this._socket.cork();
for (const args of iterator) { for (const args of iterator) {