1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

chore: add callback functionality back in

This also improves the performance for multi / batch commands a lot.
The reason is that now there are only callbacks internally even if
a promise is going to be returned in the end.
This commit is contained in:
Ruben Bridgewater
2017-11-29 19:16:40 -02:00
parent 2b4ab10305
commit d7c31da598
10 changed files with 288 additions and 184 deletions

View File

@@ -2,143 +2,154 @@
const Queue = require('denque')
const Errors = require('redis-errors')
const Command = require('./command')
const { MultiCommand } = require('./command')
const utils = require('./utils')
const handleReply = utils.handleReply
/**
* @description Queues all transaction commands and checks if a queuing error
* occurred.
*
* @param {Multi} multi
* @param {Command} command
* @param {number} index Command index in the Multi queue
* @returns *
*/
function pipelineTransactionCommand(multi, command, index) {
// Queueing is done first, then the commands are executed
const tmp = command.callback
command.callback = function (err, reply) {
if (err) {
tmp(err)
err.position = index
multi._errors.push(err)
return
}
tmp(null, reply)
}
return multi._client.internalSendCommand(command)
}
/**
* @description Make sure all replies are of the correct type and call the command callback
*
* @param {Multi} multi
* @param {any[]} replies
* @returns any[]
*/
function multiCallback(multi, replies) {
if (replies) {
var i = 0
const queue = multi._queue
const client = multi._client
while (queue.length !== 0) {
const command = queue.shift()
if (replies[i] instanceof Error) {
const match = replies[i].message.match(utils.errCode)
// LUA script could return user errors that don't behave like all other errors!
if (match) {
replies[i].code = match[1]
}
replies[i].command = command.command.toUpperCase()
command.callback(replies[i])
} else {
// If we asked for strings, even in detectBuffers mode, then return strings:
replies[i] = handleReply(client, replies[i], command)
command.callback(null, replies[i])
}
i++
}
}
multi._client._multi = false
return replies
}
/**
* @description Execute a Redis transaction (multi ... exec)
*
* @param {Multi} multi
* @param {function} [callback]
* @returns Promise<any[]>
*/
function execTransaction(multi) {
function execTransaction(multi, callback) {
const client = multi._client
const queue = multi._queue
if (multi._monitoring || client._monitoring) {
const err = new RangeError('Using transaction with a client that is in monitor mode does not work due to faulty return values of Redis.')
err.command = 'EXEC'
err.code = 'EXECABORT'
return new Promise((resolve, reject) => {
utils.replyInOrder(client, reject, err)
})
utils.replyInOrder(client, callback, err)
return
}
const len = queue.length
multi._errors = []
client._multi = true
// Silently ignore this error. We'll receive the error for the exec as well
const promises = [client.internalSendCommand(new Command('multi', [])).catch(() => {})]
// Drain queue, callback will catch 'QUEUED' or error
for (let index = 0; index < len; index++) {
// The commands may not be shifted off, since they are needed in the result handler
promises.push(pipelineTransactionCommand(multi, queue.peekAt(index), index).catch(e => e))
function receiver(err, reply) {
if (err !== null) {
multi._error = true
multi._results.push(err)
}
}
// Silently ignore the possible error. We'll receive the error for the exec as well
const multiCommand = new MultiCommand('multi', [])
multiCommand.callback = () => {}
client.internalSendCommand(multiCommand)
const queue = multi._queue
for (var i = 0; i < queue.length; i++) {
// Drain queue, callback will catch 'QUEUED' or error
const command = queue.peekAt(i)
// Queueing is done first, then the commands are executed
command.callback = receiver
client.internalSendCommand(command)
}
const main = client.internalSendCommand(new Command('exec', []))
return Promise.all(promises)
.then(() => main
.then(replies => multiCallback(multi, replies))
.catch((err) => {
err.errors = multi._errors
return Promise.reject(err)
}))
const execCommand = new MultiCommand('exec', [])
execCommand.callback = function (err, res) {
if (err !== null) {
multi._error = true
res = multi.results
} else if (res) {
for (var i = 0; i < queue.length; i++) {
const command = queue.peekAt(i)
if (res[i] instanceof Errors.RedisError) {
const match = res[i].message.match(utils.errCode)
// LUA script could return user errors that don't behave like all other errors!
if (match) {
res[i].code = match[1]
}
res[i].command = command.command.toUpperCase()
multi._error = true
} else {
// If we asked for strings, even in detectBuffers mode, then return strings:
res[i] = handleReply(multi._client, res[i], command)
}
}
}
if (multi._error) {
// TODO: The stack trace should be improved in case betterStackTraces is
// activated
const err = new Errors.RedisError('Batch command failed')
err.code = 'ERR'
// TODO: This was called "errors" instead of "replies". That is not
// consistent with the batch command.
err.replies = res
callback(err)
} else {
callback(null, res)
}
client._multi = false
}
client.internalSendCommand(execCommand)
}
function newBatchReceiver(multi, transformer) {
return function receiver(err, res) {
if (transformer) {
const tmp = transformer(err, res)
err = tmp[0]
res = tmp[1]
}
if (err !== null) {
multi._error = true
multi._results.push(err)
} else {
multi._results.push(res)
}
}
}
/**
* @description Execute a pipeline without transaction (batch ... exec)
*
* @param {Multi} multi
* @param {function} callback
* @returns Promise<any[]>
*/
function execBatch(multi) {
function execBatch(multi, callback) {
var i = 0
const client = multi._client
const queue = multi._queue
if (queue.length === 0) {
// TODO: return an error if not "ready"
return new Promise((resolve) => {
utils.replyInOrder(client, (e, res) => {
resolve(res)
}, null, [])
})
// This will return a result even if the client is not ready in case the
// queue is empty.
utils.replyInOrder(client, callback, null, [])
return
}
var error = false
function setError(err) {
error = true
return err
// if (betterStackTraces) {
// goodStackTrace = new Error()
// }
for (; i < queue.length - 1; i++) {
const command = queue.peekAt(i)
command.callback = newBatchReceiver(multi, command.transformer)
client.internalSendCommand(command)
}
const promises = []
while (queue.length) {
const command = queue.shift()
promises.push(client.internalSendCommand(command).catch(setError))
}
return Promise.all(promises).then((res) => {
if (error) {
const err = new Errors.RedisError('bla failed')
err.code = 'ERR'
err.replies = res
return Promise.reject(err)
const command = queue.peekAt(i)
command.callback = function (err, res) {
if (command.transformer !== undefined) {
const tmp = command.transformer(err, res)
err = tmp[0]
res = tmp[1]
}
return res
})
if (err !== null) {
multi._error = true
multi._results.push(err)
} else {
multi._results.push(res)
}
if (multi._error) {
// TODO: The stack trace should be improved in case betterStackTraces is
// activated.
const err = new Errors.RedisError('Batch command failed')
err.code = 'ERR'
err.replies = multi._results
callback(err)
} else {
callback(null, multi._results)
}
}
client.internalSendCommand(command)
}
class Multi {
@@ -154,6 +165,8 @@ class Multi {
this._client = client
this._type = type
this._queue = new Queue()
this._error = false
this._results = []
// Either undefined or an array. Fail hard if it's not an array
if (args) {
// Legacy support for passing in an array of arguments
@@ -172,29 +185,61 @@ class Multi {
/**
* @description Check the number of commands and execute those atomic
*
* @returns Promise<any[]>
* @param {function} [callback]
*
* @returns Promise<any[]>|undefined
*
* @memberof Multi
*/
execAtomic() {
if (this._queue.length < 2) {
return this.execBatch()
execAtomic(callback) {
var promise
if (callback === undefined) {
promise = new Promise((resolve, reject) => {
callback = function (err, res) {
if (err === null) {
resolve(res)
} else {
reject(err)
}
}
})
}
return this.exec()
if (this._queue.length < 2) {
this.execBatch(callback)
} else {
this.exec(callback)
}
return promise
}
/**
* @description Execute the corresponding multi type
*
* @returns Promise<any[]>
* @param {function} [callback]
*
* @returns Promise<any[]>|undefined
*
* @memberof Multi
*/
exec() {
if (this._type === 'batch') {
return execBatch(this)
exec(callback) {
var promise
if (callback === undefined) {
promise = new Promise((resolve, reject) => {
callback = function (err, res) {
if (err === null) {
resolve(res)
} else {
reject(err)
}
}
})
}
return execTransaction(this)
if (this._type === 'batch') {
execBatch(this, callback)
} else {
execTransaction(this, callback)
}
return promise
}
}