You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-04 15:02:09 +03:00
chore: improve multi performance by refactoring a array check away
This commit is contained in:
3
index.js
3
index.js
@@ -119,6 +119,7 @@ function RedisClient (options, stream) {
|
|||||||
this.timesConnected = 0
|
this.timesConnected = 0
|
||||||
this.buffers = options.returnBuffers || options.detectBuffers
|
this.buffers = options.returnBuffers || options.detectBuffers
|
||||||
this.options = options
|
this.options = options
|
||||||
|
this._multi = false
|
||||||
this.reply = 'ON' // Returning replies is the default
|
this.reply = 'ON' // Returning replies is the default
|
||||||
this.retryStrategy = options.retryStrategy || function (options) {
|
this.retryStrategy = options.retryStrategy || function (options) {
|
||||||
if (options.attempt > 100) {
|
if (options.attempt > 100) {
|
||||||
@@ -619,7 +620,7 @@ RedisClient.prototype.returnError = function (err) {
|
|||||||
|
|
||||||
function normalReply (self, reply) {
|
function normalReply (self, reply) {
|
||||||
const commandObj = self.commandQueue.shift()
|
const commandObj = self.commandQueue.shift()
|
||||||
if (commandObj.command !== 'exec') {
|
if (self._multi === false) {
|
||||||
reply = self.handleReply(reply, commandObj.command, commandObj.bufferArgs)
|
reply = self.handleReply(reply, commandObj.command, commandObj.bufferArgs)
|
||||||
}
|
}
|
||||||
commandObj.callback(null, reply)
|
commandObj.callback(null, reply)
|
||||||
|
37
lib/multi.js
37
lib/multi.js
@@ -59,7 +59,7 @@ function multiCallback (multi, replies) {
|
|||||||
i++
|
i++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
multi._client._multi = false
|
||||||
return replies
|
return replies
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -70,33 +70,36 @@ function multiCallback (multi, replies) {
|
|||||||
* @returns Promise<any[]>
|
* @returns Promise<any[]>
|
||||||
*/
|
*/
|
||||||
function execTransaction (multi) {
|
function execTransaction (multi) {
|
||||||
if (multi.monitoring || multi._client.monitoring) {
|
const client = multi._client
|
||||||
|
const queue = multi._queue
|
||||||
|
if (multi.monitoring || client.monitoring) {
|
||||||
const err = new RangeError(
|
const err = new RangeError(
|
||||||
'Using transaction with a client that is in monitor mode does not work due to faulty return values of Redis.'
|
'Using transaction with a client that is in monitor mode does not work due to faulty return values of Redis.'
|
||||||
)
|
)
|
||||||
err.command = 'EXEC'
|
err.command = 'EXEC'
|
||||||
err.code = 'EXECABORT'
|
err.code = 'EXECABORT'
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
utils.replyInOrder(multi._client, (err, res) => {
|
utils.replyInOrder(client, (err, res) => {
|
||||||
if (err) return reject(err)
|
if (err) return reject(err)
|
||||||
resolve(res)
|
resolve(res)
|
||||||
}, null, [])
|
}, null, [])
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
const len = multi._queue.length
|
const len = queue.length
|
||||||
multi.errors = []
|
multi.errors = []
|
||||||
multi._client.cork()
|
client.cork()
|
||||||
|
client._multi = true
|
||||||
multi.wantsBuffers = new Array(len)
|
multi.wantsBuffers = new Array(len)
|
||||||
// Silently ignore this error. We'll receive the error for the exec as well
|
// Silently ignore this error. We'll receive the error for the exec as well
|
||||||
const promises = [multi._client.internalSendCommand(new Command('multi', [])).catch(() => {})]
|
const promises = [client.internalSendCommand(new Command('multi', [])).catch(() => {})]
|
||||||
// Drain queue, callback will catch 'QUEUED' or error
|
// Drain queue, callback will catch 'QUEUED' or error
|
||||||
for (var index = 0; index < len; index++) {
|
for (var index = 0; index < len; index++) {
|
||||||
// The commands may not be shifted off, since they are needed in the result handler
|
// The commands may not be shifted off, since they are needed in the result handler
|
||||||
promises.push(pipelineTransactionCommand(multi, multi._queue.get(index), index).catch((e) => e))
|
promises.push(pipelineTransactionCommand(multi, queue.get(index), index).catch((e) => e))
|
||||||
}
|
}
|
||||||
|
|
||||||
const main = multi._client.internalSendCommand(new Command('exec', []))
|
const main = client.internalSendCommand(new Command('exec', []))
|
||||||
multi._client.uncork()
|
client.uncork()
|
||||||
return Promise.all(promises).then(() => main.then((replies) => multiCallback(multi, replies)).catch((err) => {
|
return Promise.all(promises).then(() => main.then((replies) => multiCallback(multi, replies)).catch((err) => {
|
||||||
err.errors = multi.errors
|
err.errors = multi.errors
|
||||||
return Promise.reject(err)
|
return Promise.reject(err)
|
||||||
@@ -110,25 +113,27 @@ function execTransaction (multi) {
|
|||||||
* @returns Promise<any[]>
|
* @returns Promise<any[]>
|
||||||
*/
|
*/
|
||||||
function execBatch (multi) {
|
function execBatch (multi) {
|
||||||
if (multi._queue.length === 0) {
|
const client = multi._client
|
||||||
|
const queue = multi._queue
|
||||||
|
if (queue.length === 0) {
|
||||||
// TODO: return an error if not "ready"
|
// TODO: return an error if not "ready"
|
||||||
return new Promise((resolve) => {
|
return new Promise((resolve) => {
|
||||||
utils.replyInOrder(multi._client, (e, res) => {
|
utils.replyInOrder(client, (e, res) => {
|
||||||
resolve(res)
|
resolve(res)
|
||||||
}, null, [])
|
}, null, [])
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
var error = false
|
var error = false
|
||||||
multi._client.cork()
|
client.cork()
|
||||||
const promises = []
|
const promises = []
|
||||||
while (multi._queue.length) {
|
while (queue.length) {
|
||||||
const commandObj = multi._queue.shift()
|
const command = queue.shift()
|
||||||
promises.push(multi._client.internalSendCommand(commandObj).catch((e) => {
|
promises.push(client.internalSendCommand(command).catch((e) => {
|
||||||
error = true
|
error = true
|
||||||
return e
|
return e
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
multi._client.uncork()
|
client.uncork()
|
||||||
return Promise.all(promises).then((res) => {
|
return Promise.all(promises).then((res) => {
|
||||||
if (error) {
|
if (error) {
|
||||||
const err = new Error('bla failed')
|
const err = new Error('bla failed')
|
||||||
|
11
lib/utils.js
11
lib/utils.js
@@ -3,18 +3,11 @@
|
|||||||
/**
|
/**
|
||||||
* @description Convert an array to an object
|
* @description Convert an array to an object
|
||||||
*
|
*
|
||||||
* hgetall converts its replies to an object. If the reply is empty, null is returned.
|
* @param {any[]} reply
|
||||||
* The reply might be a string or a buffer if this is called in a transaction (multi)
|
|
||||||
* because of the queuing response.
|
|
||||||
*
|
|
||||||
* reply.length can be undefined but `undefined > 0` === false.
|
|
||||||
*
|
|
||||||
* @param {any} reply
|
|
||||||
* @returns object
|
* @returns object
|
||||||
*/
|
*/
|
||||||
function replyToObject (reply) {
|
function replyToObject (reply) {
|
||||||
// The reply might be a string or a buffer if this is called in a transaction (multi)
|
if (reply.length === 0) {
|
||||||
if (reply.length === 0 || !(reply instanceof Array)) {
|
|
||||||
return null
|
return null
|
||||||
}
|
}
|
||||||
const obj = {}
|
const obj = {}
|
||||||
|
Reference in New Issue
Block a user