You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-06 02:15:48 +03:00
chore: use es6 for multi
This commit is contained in:
@@ -40,7 +40,7 @@ commands.list.forEach((command) => {
|
|||||||
for (var i = 0; i < len; i += 1) {
|
for (var i = 0; i < len; i += 1) {
|
||||||
arr[i] = arguments[i]
|
arr[i] = arguments[i]
|
||||||
}
|
}
|
||||||
this.queue.push(new Command(command, arr))
|
this._queue.push(new Command(command, arr))
|
||||||
return this
|
return this
|
||||||
}
|
}
|
||||||
if (multiProto[command] !== commandName) {
|
if (multiProto[command] !== commandName) {
|
||||||
|
@@ -4,6 +4,7 @@ const utils = require('./utils')
|
|||||||
const debug = require('./debug')
|
const debug = require('./debug')
|
||||||
const RedisClient = require('../').RedisClient
|
const RedisClient = require('../').RedisClient
|
||||||
const Command = require('./command')
|
const Command = require('./command')
|
||||||
|
const Multi = require('./multi')
|
||||||
const noop = function () {}
|
const noop = function () {}
|
||||||
|
|
||||||
/**********************************************
|
/**********************************************
|
||||||
@@ -77,7 +78,6 @@ RedisClient.prototype.unref = function () {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO: promisify this
|
// TODO: promisify this
|
||||||
// TODO: the sendCommand legacy module should also make duplicate handle callbacks again
|
|
||||||
RedisClient.prototype.duplicate = function (options, callback) {
|
RedisClient.prototype.duplicate = function (options, callback) {
|
||||||
if (typeof options === 'function') {
|
if (typeof options === 'function') {
|
||||||
callback = options
|
callback = options
|
||||||
@@ -105,3 +105,13 @@ RedisClient.prototype.duplicate = function (options, callback) {
|
|||||||
}
|
}
|
||||||
return client
|
return client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Note: this overrides a native function!
|
||||||
|
RedisClient.prototype.multi = function multi (args) {
|
||||||
|
return new Multi(this, 'multi', args)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Note: This is not a native function but is still handled as a individual command as it behaves just the same as multi
|
||||||
|
RedisClient.prototype.batch = function batch (args) {
|
||||||
|
return new Multi(this, 'batch', args)
|
||||||
|
}
|
||||||
|
@@ -17,19 +17,10 @@ const RedisClient = require('../').RedisClient
|
|||||||
|
|
||||||
TODO: Implement individual command generation as soon as possible to prevent divergent code
|
TODO: Implement individual command generation as soon as possible to prevent divergent code
|
||||||
on single and multi calls!
|
on single and multi calls!
|
||||||
|
|
||||||
|
TODO: Implement hooks to replace this. Most of these things are perfect for hooks
|
||||||
********************************************************************************************/
|
********************************************************************************************/
|
||||||
|
|
||||||
RedisClient.prototype.multi = function multi (args) {
|
|
||||||
const multi = new Multi(this, args)
|
|
||||||
multi.exec = multi.EXEC = multi.execTransaction
|
|
||||||
return multi
|
|
||||||
}
|
|
||||||
|
|
||||||
// ATTENTION: This is not a native function but is still handled as a individual command as it behaves just the same as multi
|
|
||||||
RedisClient.prototype.batch = function batch (args) {
|
|
||||||
return new Multi(this, args)
|
|
||||||
}
|
|
||||||
|
|
||||||
function selectCallback (self, db) {
|
function selectCallback (self, db) {
|
||||||
return function (err, res) {
|
return function (err, res) {
|
||||||
if (err === null) {
|
if (err === null) {
|
||||||
@@ -45,7 +36,7 @@ RedisClient.prototype.select = function select (db) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Multi.prototype.select = function select (db) {
|
Multi.prototype.select = function select (db) {
|
||||||
this.queue.push(new Command('select', [db], null, selectCallback(this._client, db)))
|
this._queue.push(new Command('select', [db], null, selectCallback(this._client, db)))
|
||||||
return this
|
return this
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -66,7 +57,7 @@ Multi.prototype.monitor = function monitor () {
|
|||||||
const callOnWrite = () => {
|
const callOnWrite = () => {
|
||||||
this._client.monitoring = true
|
this._client.monitoring = true
|
||||||
}
|
}
|
||||||
this.queue.push(new Command('monitor', [], callOnWrite))
|
this._queue.push(new Command('monitor', [], callOnWrite))
|
||||||
return this
|
return this
|
||||||
}
|
}
|
||||||
// Set multi monitoring to indicate the exec that it should abort
|
// Set multi monitoring to indicate the exec that it should abort
|
||||||
@@ -111,7 +102,7 @@ Multi.prototype.quit = function quit () {
|
|||||||
this._client.closing = true
|
this._client.closing = true
|
||||||
this._client.ready = false
|
this._client.ready = false
|
||||||
}
|
}
|
||||||
this.queue.push(new Command('quit', [], null, quitCallback(this._client), callOnWrite))
|
this._queue.push(new Command('quit', [], null, quitCallback(this._client), callOnWrite))
|
||||||
return this
|
return this
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -161,7 +152,7 @@ RedisClient.prototype.info = function info (section) {
|
|||||||
|
|
||||||
Multi.prototype.info = function info (section) {
|
Multi.prototype.info = function info (section) {
|
||||||
const args = section ? [section] : []
|
const args = section ? [section] : []
|
||||||
this.queue.push(new Command('info', args, null, infoCallback(this._client)))
|
this._queue.push(new Command('info', args, null, infoCallback(this._client)))
|
||||||
return this
|
return this
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -196,7 +187,7 @@ Multi.prototype.auth = function auth (pass) {
|
|||||||
|
|
||||||
// Stash auth for connect and reconnect.
|
// Stash auth for connect and reconnect.
|
||||||
this.authPass = pass
|
this.authPass = pass
|
||||||
this.queue.push(new Command('auth', [pass], null, authCallback(this._client)))
|
this._queue.push(new Command('auth', [pass], null, authCallback(this._client)))
|
||||||
return this
|
return this
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -237,7 +228,7 @@ Multi.prototype.client = function client () {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.queue.push(new Command('client', arr, callOnWrite))
|
this._queue.push(new Command('client', arr, callOnWrite))
|
||||||
return this
|
return this
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -262,7 +253,7 @@ Multi.prototype.subscribe = function subscribe () {
|
|||||||
const callOnWrite = () => {
|
const callOnWrite = () => {
|
||||||
this._client.pubSubMode = this._client.pubSubMode || this._client.commandQueue.length + 1
|
this._client.pubSubMode = this._client.pubSubMode || this._client.commandQueue.length + 1
|
||||||
}
|
}
|
||||||
this.queue.push(new Command('subscribe', arr, callOnWrite))
|
this._queue.push(new Command('subscribe', arr, callOnWrite))
|
||||||
return this
|
return this
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -289,7 +280,7 @@ Multi.prototype.unsubscribe = function unsubscribe () {
|
|||||||
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
|
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
|
||||||
this._client.pubSubMode = this._client.pubSubMode || this._client.commandQueue.length + 1
|
this._client.pubSubMode = this._client.pubSubMode || this._client.commandQueue.length + 1
|
||||||
}
|
}
|
||||||
this.queue.push(new Command('unsubscribe', arr, callOnWrite))
|
this._queue.push(new Command('unsubscribe', arr, callOnWrite))
|
||||||
return this
|
return this
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -314,7 +305,7 @@ Multi.prototype.psubscribe = function psubscribe () {
|
|||||||
const callOnWrite = () => {
|
const callOnWrite = () => {
|
||||||
this._client.pubSubMode = this._client.pubSubMode || this._client.commandQueue.length + 1
|
this._client.pubSubMode = this._client.pubSubMode || this._client.commandQueue.length + 1
|
||||||
}
|
}
|
||||||
this.queue.push(new Command('psubscribe', arr, callOnWrite))
|
this._queue.push(new Command('psubscribe', arr, callOnWrite))
|
||||||
return this
|
return this
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -341,6 +332,6 @@ Multi.prototype.punsubscribe = function punsubscribe () {
|
|||||||
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
|
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
|
||||||
this._client.pubSubMode = this._client.pubSubMode || this._client.commandQueue.length + 1
|
this._client.pubSubMode = this._client.pubSubMode || this._client.commandQueue.length + 1
|
||||||
}
|
}
|
||||||
this.queue.push(new Command('punsubscribe', arr, callOnWrite))
|
this._queue.push(new Command('punsubscribe', arr, callOnWrite))
|
||||||
return this
|
return this
|
||||||
}
|
}
|
||||||
|
185
lib/multi.js
185
lib/multi.js
@@ -4,67 +4,57 @@ const Queue = require('denque')
|
|||||||
const utils = require('./utils')
|
const utils = require('./utils')
|
||||||
const Command = require('./command')
|
const Command = require('./command')
|
||||||
|
|
||||||
// TODO: Remove support for the non chaining way of using this
|
/**
|
||||||
// It's confusing and has no benefit
|
* @description Queues all transaction commands and checks if a queuing error
|
||||||
function Multi (client, args) {
|
* occurred.
|
||||||
this._client = client
|
*
|
||||||
this.queue = new Queue()
|
* @param {Multi} multi
|
||||||
var command, tmpArgs
|
* @param {Command} command
|
||||||
if (args) { // Either undefined or an array. Fail hard if it's not an array
|
* @param {number} index Command index in the Multi queue
|
||||||
for (let i = 0; i < args.length; i++) {
|
* @returns *
|
||||||
command = args[i][0]
|
*/
|
||||||
tmpArgs = args[i].slice(1)
|
function pipelineTransactionCommand (multi, command, index) {
|
||||||
if (Array.isArray(command)) {
|
|
||||||
this[command[0]].apply(this, command.slice(1).concat(tmpArgs))
|
|
||||||
} else {
|
|
||||||
this[command].apply(this, tmpArgs)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function pipelineTransactionCommand (self, commandObj, index) {
|
|
||||||
// Queueing is done first, then the commands are executed
|
// Queueing is done first, then the commands are executed
|
||||||
const tmp = commandObj.callback
|
const tmp = command.callback
|
||||||
commandObj.callback = function (err, reply) {
|
command.callback = function (err, reply) {
|
||||||
if (err) {
|
if (err) {
|
||||||
tmp(err)
|
tmp(err)
|
||||||
err.position = index
|
err.position = index
|
||||||
self.errors.push(err)
|
multi.errors.push(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Keep track of who wants buffer responses:
|
// Keep track of who wants buffer responses:
|
||||||
// By the time the callback is called the commandObj got the bufferArgs attribute attached
|
// By the time the callback is called the command got the bufferArgs attribute attached
|
||||||
self.wantsBuffers[index] = commandObj.bufferArgs
|
multi.wantsBuffers[index] = command.bufferArgs
|
||||||
tmp(null, reply)
|
tmp(null, reply)
|
||||||
}
|
}
|
||||||
return self._client.internalSendCommand(commandObj)
|
return multi._client.internalSendCommand(command)
|
||||||
}
|
}
|
||||||
|
|
||||||
Multi.prototype.execAtomic = function execAtomic () {
|
/**
|
||||||
if (this.queue.length < 2) {
|
* @description Make sure all replies are of the correct type and call the command callback
|
||||||
return this.execBatch()
|
*
|
||||||
}
|
* @param {Multi} multi
|
||||||
return this.exec()
|
* @param {any[]} replies
|
||||||
}
|
* @returns any[]
|
||||||
|
*/
|
||||||
function multiCallback (self, replies) {
|
function multiCallback (multi, replies) {
|
||||||
var i = 0
|
|
||||||
|
|
||||||
if (replies) {
|
if (replies) {
|
||||||
for (let commandObj = self.queue.shift(); commandObj !== undefined; commandObj = self.queue.shift()) {
|
var i = 0
|
||||||
if (replies[i].message) { // instanceof Error
|
while (multi._queue.length !== 0) {
|
||||||
|
const command = multi._queue.shift()
|
||||||
|
if (replies[i] instanceof Error) {
|
||||||
const match = replies[i].message.match(utils.errCode)
|
const match = replies[i].message.match(utils.errCode)
|
||||||
// LUA script could return user errors that don't behave like all other errors!
|
// LUA script could return user errors that don't behave like all other errors!
|
||||||
if (match) {
|
if (match) {
|
||||||
replies[i].code = match[1]
|
replies[i].code = match[1]
|
||||||
}
|
}
|
||||||
replies[i].command = commandObj.command.toUpperCase()
|
replies[i].command = command.command.toUpperCase()
|
||||||
commandObj.callback(replies[i])
|
command.callback(replies[i])
|
||||||
} else {
|
} else {
|
||||||
// If we asked for strings, even in detectBuffers mode, then return strings:
|
// If we asked for strings, even in detectBuffers mode, then return strings:
|
||||||
replies[i] = self._client.handleReply(replies[i], commandObj.command, self.wantsBuffers[i])
|
replies[i] = multi._client.handleReply(replies[i], command.command, multi.wantsBuffers[i])
|
||||||
commandObj.callback(null, replies[i])
|
command.callback(null, replies[i])
|
||||||
}
|
}
|
||||||
i++
|
i++
|
||||||
}
|
}
|
||||||
@@ -73,60 +63,72 @@ function multiCallback (self, replies) {
|
|||||||
return replies
|
return replies
|
||||||
}
|
}
|
||||||
|
|
||||||
Multi.prototype.execTransaction = function execTransaction () {
|
/**
|
||||||
if (this.monitoring || this._client.monitoring) {
|
* @description Execute a Redis transaction (multi ... exec)
|
||||||
|
*
|
||||||
|
* @param {Multi} multi
|
||||||
|
* @returns Promise<any[]>
|
||||||
|
*/
|
||||||
|
function execTransaction (multi) {
|
||||||
|
if (multi.monitoring || multi._client.monitoring) {
|
||||||
const err = new RangeError(
|
const err = new RangeError(
|
||||||
'Using transaction with a client that is in monitor mode does not work due to faulty return values of Redis.'
|
'Using transaction with a client that is in monitor mode does not work due to faulty return values of Redis.'
|
||||||
)
|
)
|
||||||
err.command = 'EXEC'
|
err.command = 'EXEC'
|
||||||
err.code = 'EXECABORT'
|
err.code = 'EXECABORT'
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
utils.replyInOrder(this._client, (err, res) => {
|
utils.replyInOrder(multi._client, (err, res) => {
|
||||||
if (err) return reject(err)
|
if (err) return reject(err)
|
||||||
resolve(res)
|
resolve(res)
|
||||||
}, null, [])
|
}, null, [])
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
const len = this.queue.length
|
const len = multi._queue.length
|
||||||
this.errors = []
|
multi.errors = []
|
||||||
this._client.cork()
|
multi._client.cork()
|
||||||
this.wantsBuffers = new Array(len)
|
multi.wantsBuffers = new Array(len)
|
||||||
// Silently ignore this error. We'll receive the error for the exec as well
|
// Silently ignore this error. We'll receive the error for the exec as well
|
||||||
const promises = [this._client.internalSendCommand(new Command('multi', [])).catch(() => {})]
|
const promises = [multi._client.internalSendCommand(new Command('multi', [])).catch(() => {})]
|
||||||
// Drain queue, callback will catch 'QUEUED' or error
|
// Drain queue, callback will catch 'QUEUED' or error
|
||||||
for (let index = 0; index < len; index++) {
|
for (let index = 0; index < len; index++) {
|
||||||
// The commands may not be shifted off, since they are needed in the result handler
|
// The commands may not be shifted off, since they are needed in the result handler
|
||||||
promises.push(pipelineTransactionCommand(this, this.queue.get(index), index).catch((e) => e))
|
promises.push(pipelineTransactionCommand(multi, multi._queue.get(index), index).catch((e) => e))
|
||||||
}
|
}
|
||||||
|
|
||||||
const main = this._client.internalSendCommand(new Command('exec', []))
|
const main = multi._client.internalSendCommand(new Command('exec', []))
|
||||||
this._client.uncork()
|
multi._client.uncork()
|
||||||
return Promise.all(promises).then(() => main.then((replies) => multiCallback(this, replies)).catch((err) => {
|
return Promise.all(promises).then(() => main.then((replies) => multiCallback(multi, replies)).catch((err) => {
|
||||||
err.errors = this.errors
|
err.errors = multi.errors
|
||||||
return Promise.reject(err)
|
return Promise.reject(err)
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
Multi.prototype.exec = Multi.prototype.execBatch = function execBatch () {
|
/**
|
||||||
if (this.queue.length === 0) {
|
* @description Execute a pipeline without transaction (batch ... exec)
|
||||||
|
*
|
||||||
|
* @param {Multi} multi
|
||||||
|
* @returns Promise<any[]>
|
||||||
|
*/
|
||||||
|
function execBatch (multi) {
|
||||||
|
if (multi._queue.length === 0) {
|
||||||
// TODO: return an error if not "ready"
|
// TODO: return an error if not "ready"
|
||||||
return new Promise((resolve) => {
|
return new Promise((resolve) => {
|
||||||
utils.replyInOrder(this._client, (e, res) => {
|
utils.replyInOrder(multi._client, (e, res) => {
|
||||||
resolve(res)
|
resolve(res)
|
||||||
}, null, [])
|
}, null, [])
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
var error = false
|
var error = false
|
||||||
this._client.cork()
|
multi._client.cork()
|
||||||
const promises = []
|
const promises = []
|
||||||
while (this.queue.length) {
|
while (multi._queue.length) {
|
||||||
const commandObj = this.queue.shift()
|
const commandObj = multi._queue.shift()
|
||||||
promises.push(this._client.internalSendCommand(commandObj).catch((e) => {
|
promises.push(multi._client.internalSendCommand(commandObj).catch((e) => {
|
||||||
error = true
|
error = true
|
||||||
return e
|
return e
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
this._client.uncork()
|
multi._client.uncork()
|
||||||
return Promise.all(promises).then((res) => {
|
return Promise.all(promises).then((res) => {
|
||||||
if (error) {
|
if (error) {
|
||||||
const err = new Error('bla failed')
|
const err = new Error('bla failed')
|
||||||
@@ -138,4 +140,61 @@ Multi.prototype.exec = Multi.prototype.execBatch = function execBatch () {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class Multi {
|
||||||
|
/**
|
||||||
|
* Creates an instance of Multi.
|
||||||
|
* @param {RedisClient} client
|
||||||
|
* @param {string} type
|
||||||
|
* @param {any[]} [args]
|
||||||
|
*
|
||||||
|
* @memberof Multi
|
||||||
|
*/
|
||||||
|
constructor (client, type, args) {
|
||||||
|
this._client = client
|
||||||
|
this._type = type
|
||||||
|
this._queue = new Queue()
|
||||||
|
// Either undefined or an array. Fail hard if it's not an array
|
||||||
|
if (args) {
|
||||||
|
// Legacy support for passing in an array of arguments
|
||||||
|
for (let i = 0; i < args.length; i++) {
|
||||||
|
const command = args[i][0]
|
||||||
|
const tmpArgs = args[i].slice(1)
|
||||||
|
if (Array.isArray(command)) {
|
||||||
|
this[command[0]].apply(this, command.slice(1).concat(tmpArgs))
|
||||||
|
} else {
|
||||||
|
this[command].apply(this, tmpArgs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @description Check the number of commands and execute those atomic
|
||||||
|
*
|
||||||
|
* @returns Promise<any[]>
|
||||||
|
*
|
||||||
|
* @memberof Multi
|
||||||
|
*/
|
||||||
|
execAtomic () {
|
||||||
|
if (this._queue.length < 2) {
|
||||||
|
return this.execBatch()
|
||||||
|
}
|
||||||
|
return this.exec()
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @description Execute the corresponding multi type
|
||||||
|
*
|
||||||
|
* @returns Promise<any[]>
|
||||||
|
*
|
||||||
|
* @memberof Multi
|
||||||
|
*/
|
||||||
|
exec () {
|
||||||
|
if (this._type === 'batch') {
|
||||||
|
return execBatch(this)
|
||||||
|
}
|
||||||
|
return execTransaction(this)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
module.exports = Multi
|
module.exports = Multi
|
||||||
|
@@ -73,7 +73,6 @@ function replyInOrder (self, callback, err, res, queue) {
|
|||||||
callback(err, res)
|
callback(err, res)
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
// TODO: Change this to chain promises instead
|
|
||||||
const tmp = commandObj.callback
|
const tmp = commandObj.callback
|
||||||
commandObj.callback = function (e, r) {
|
commandObj.callback = function (e, r) {
|
||||||
tmp(e, r)
|
tmp(e, r)
|
||||||
|
@@ -32,7 +32,7 @@
|
|||||||
"redis-parser": "^3.0.0"
|
"redis-parser": "^3.0.0"
|
||||||
},
|
},
|
||||||
"engines": {
|
"engines": {
|
||||||
"node": ">=6"
|
"node": ">=4"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"coveralls": "^2.11.2",
|
"coveralls": "^2.11.2",
|
||||||
|
Reference in New Issue
Block a user