1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-04 15:02:09 +03:00

feat: add auto pipeline

This commit is contained in:
Ruben Bridgewater
2017-05-28 04:31:37 +02:00
parent 8c63233968
commit 0d53d3dcdf
8 changed files with 79 additions and 121 deletions

View File

@@ -17,6 +17,10 @@ It will not restore the support for old Node.js versions, the return value of
connectTimeout behavior. It will also only partially restore snake_case support connectTimeout behavior. It will also only partially restore snake_case support
and maybe more. and maybe more.
Features
- Native promise support
- Auto pipelining
Breaking Changes Breaking Changes
- Dropped support for `UPPER_CASE` commands - Dropped support for `UPPER_CASE` commands

View File

@@ -8,12 +8,9 @@
const net = require('net') const net = require('net')
const util = require('util') const util = require('util')
const utils = require('./lib/utils') const utils = require('./lib/utils')
const reconnect = require('./lib/reconnect')
const Queue = require('denque') const Queue = require('denque')
const errorClasses = require('./lib/customErrors')
const EventEmitter = require('events') const EventEmitter = require('events')
const Errors = require('redis-errors') const Errors = require('redis-errors')
const debug = require('./lib/debug')
const connect = require('./lib/connect') const connect = require('./lib/connect')
const Commands = require('redis-commands') const Commands = require('redis-commands')
const addCommand = require('./lib/commands') const addCommand = require('./lib/commands')
@@ -22,8 +19,6 @@ const Multi = require('./lib/multi')
const normalizeAndWriteCommand = require('./lib/writeCommands') const normalizeAndWriteCommand = require('./lib/writeCommands')
const offlineCommand = require('./lib/offlineCommand') const offlineCommand = require('./lib/offlineCommand')
function noop () {}
// Attention: The second parameter might be removed at will and is not officially supported. // Attention: The second parameter might be removed at will and is not officially supported.
// Do not rely on this // Do not rely on this
function RedisClient (options, stream) { function RedisClient (options, stream) {
@@ -80,7 +75,7 @@ function RedisClient (options, stream) {
this.shouldBuffer = false this.shouldBuffer = false
this.commandQueue = new Queue() // Holds sent commands to de-pipeline them this.commandQueue = new Queue() // Holds sent commands to de-pipeline them
this.offlineQueue = new Queue() // Holds commands issued but not able to be sent this.offlineQueue = new Queue() // Holds commands issued but not able to be sent
this.pipelineQueue = new Queue() // Holds all pipelined commands this._pipelineQueue = new Queue() // Holds all pipelined commands
// Only used as timeout until redis has to be connected to redis until throwing an connection error // Only used as timeout until redis has to be connected to redis until throwing an connection error
this.connectTimeout = +options.connectTimeout || 60000 // 60 * 1000 ms this.connectTimeout = +options.connectTimeout || 60000 // 60 * 1000 ms
this.enableOfflineQueue = options.enableOfflineQueue !== false this.enableOfflineQueue = options.enableOfflineQueue !== false
@@ -94,8 +89,8 @@ function RedisClient (options, stream) {
this.authPass = options.authPass || options.password this.authPass = options.authPass || options.password
this.selectedDb = options.db // Save the selected db here, used when reconnecting this.selectedDb = options.db // Save the selected db here, used when reconnecting
this.oldState = null this.oldState = null
this.fireStrings = true // Determine if strings or buffers should be written to the stream this._strCache = ''
this.pipeline = false this._pipeline = false
this.subCommandsLeft = 0 this.subCommandsLeft = 0
this.renameCommands = options.renameCommands || {} this.renameCommands = options.renameCommands || {}
this.timesConnected = 0 this.timesConnected = 0
@@ -125,17 +120,6 @@ util.inherits(RedisClient, EventEmitter)
RedisClient.connectionId = 0 RedisClient.connectionId = 0
/******************************************************************************
All functions in here are internal besides the RedisClient constructor
and the exported functions. Don't rely on them as they will be private
functions in nodeRedis v.3
******************************************************************************/
RedisClient.prototype.cork = noop
RedisClient.prototype.uncork = noop
RedisClient.prototype.initializeRetryVars = function () { RedisClient.prototype.initializeRetryVars = function () {
this.retryTimer = null this.retryTimer = null
this.retryTotaltime = 0 this.retryTotaltime = 0
@@ -221,39 +205,6 @@ RedisClient.prototype.internalSendCommand = function (commandObj) {
return commandObj.promise return commandObj.promise
} }
RedisClient.prototype.writeStrings = function () {
var str = ''
for (var command = this.pipelineQueue.shift(); command; command = this.pipelineQueue.shift()) {
// Write to stream if the string is bigger than 4mb. The biggest string may be Math.pow(2, 28) - 15 chars long
if (str.length + command.length > 4 * 1024 * 1024) {
this.shouldBuffer = !this._stream.write(str)
str = ''
}
str += command
}
if (str !== '') {
this.shouldBuffer = !this._stream.write(str)
}
}
RedisClient.prototype.writeBuffers = function () {
for (var command = this.pipelineQueue.shift(); command; command = this.pipelineQueue.shift()) {
this.shouldBuffer = !this._stream.write(command)
}
}
// TODO: This can be significantly improved!
// We can concat the string instead of using the queue
// in most cases. This improves the performance.
// This can only be used for strings only though.
RedisClient.prototype.write = function (data) {
if (this.pipeline === false) {
this.shouldBuffer = !this._stream.write(data)
return
}
this.pipelineQueue.push(data)
}
Commands.list.forEach((name) => addCommand(RedisClient.prototype, Multi.prototype, name)) Commands.list.forEach((name) => addCommand(RedisClient.prototype, Multi.prototype, name))
module.exports = { module.exports = {

View File

@@ -88,7 +88,6 @@ function execTransaction (multi) {
} }
const len = queue.length const len = queue.length
multi.errors = [] multi.errors = []
client.cork()
client._multi = true client._multi = true
multi.wantsBuffers = new Array(len) multi.wantsBuffers = new Array(len)
// Silently ignore this error. We'll receive the error for the exec as well // Silently ignore this error. We'll receive the error for the exec as well
@@ -100,7 +99,6 @@ function execTransaction (multi) {
} }
const main = client.internalSendCommand(new Command('exec', [])) const main = client.internalSendCommand(new Command('exec', []))
client.uncork()
return Promise.all(promises).then(() => main.then((replies) => multiCallback(multi, replies)).catch((err) => { return Promise.all(promises).then(() => main.then((replies) => multiCallback(multi, replies)).catch((err) => {
err.errors = multi.errors err.errors = multi.errors
return Promise.reject(err) return Promise.reject(err)
@@ -125,7 +123,6 @@ function execBatch (multi) {
}) })
} }
var error = false var error = false
client.cork()
const promises = [] const promises = []
while (queue.length) { while (queue.length) {
const command = queue.shift() const command = queue.shift()
@@ -134,7 +131,6 @@ function execBatch (multi) {
return e return e
})) }))
} }
client.uncork()
return Promise.all(promises).then((res) => { return Promise.all(promises).then((res) => {
if (error) { if (error) {
const err = new Errors.RedisError('bla failed') const err = new Errors.RedisError('bla failed')

View File

@@ -6,6 +6,9 @@ const Command = require('./command')
function onConnect (client) { function onConnect (client) {
debug('Stream connected %s id %s', client.address, client.connectionId) debug('Stream connected %s id %s', client.address, client.connectionId)
// TODO: Check if the clients prototype and the clients instance have
// fast properties. If that's not the case, make them fast properties
// again!
client.connected = true client.connected = true
client.ready = false client.ready = false
client.emittedEnd = false client.emittedEnd = false
@@ -49,22 +52,6 @@ function readyHandler (client) {
debug('readyHandler called %s id %s', client.address, client.connectionId) debug('readyHandler called %s id %s', client.address, client.connectionId)
client.ready = true client.ready = true
client.cork = () => {
client.pipeline = true
client._stream.cork()
}
client.uncork = () => {
if (client.fireStrings) {
client.writeStrings()
} else {
client.writeBuffers()
}
client.pipeline = false
client.fireStrings = true
// TODO: Consider using next tick here. See https://github.com/NodeRedis/nodeRedis/issues/1033
client._stream.uncork()
}
if (client.selectedDb !== undefined) { if (client.selectedDb !== undefined) {
client.internalSendCommand(new Command('select', [client.selectedDb])).catch((err) => { client.internalSendCommand(new Command('select', [client.selectedDb])).catch((err) => {
if (!client.closing) { if (!client.closing) {

View File

@@ -6,7 +6,6 @@ var lazyConnect = function (client) {
lazyConnect = require('./connect') lazyConnect = require('./connect')
lazyConnect(client) lazyConnect(client)
} }
const noop = () => {}
/** /**
* @description Try connecting to a server again * @description Try connecting to a server again
@@ -50,10 +49,6 @@ function reconnect (client, why, error) {
debug('Redis connection is gone from %s event.', why) debug('Redis connection is gone from %s event.', why)
client.connected = false client.connected = false
client.ready = false client.ready = false
// Deactivate cork to work with the offline queue
client.cork = noop
client.uncork = noop
client.pipeline = false
client.pubSubMode = 0 client.pubSubMode = 0
// since we are collapsing end and close, users don't expect to be called twice // since we are collapsing end and close, users don't expect to be called twice
@@ -108,10 +103,13 @@ function reconnect (client, why, error) {
if (client.options.retryUnfulfilledCommands) { if (client.options.retryUnfulfilledCommands) {
client.offlineQueue.unshift.apply(client.offlineQueue, client.commandQueue.toArray()) client.offlineQueue.unshift.apply(client.offlineQueue, client.commandQueue.toArray())
client.commandQueue.clear() client.commandQueue.clear()
} else if (client.commandQueue.length !== 0) { // TODO: If only the pipelineQueue contains the error we could improve the situation.
// We could postpone writing to the stream until we connected again and fire the commands.
// The commands in the pipelineQueue are also not uncertain. They never left the client.
} else if (client.commandQueue.length !== 0 || client._pipelineQueue.length !== 0) {
client.flushAndError('Redis connection lost and command aborted.', 'UNCERTAIN_STATE', { client.flushAndError('Redis connection lost and command aborted.', 'UNCERTAIN_STATE', {
error, error,
queues: ['commandQueue'] queues: ['commandQueue', '_pipelineQueue']
}) })
} }

View File

@@ -9,30 +9,72 @@ const debug = require('./debug')
// } catch (e) { // } catch (e) {
// // Fallback // // Fallback
// return (val) => { // return (val) => {
// return Buffer.isBuffer(val) || ArrayBuffer.isView(val) // return Buffer.isBuffer(val) || val instanceof Uint8Array
// } // }
// } // }
// })() // })()
const copy = [] const copy = []
const RN = Buffer.from('\r\n')
var bufferCount = 0 var bufferCount = 0
var errors = null var errors = null
function writeBuffers (client) { /**
client.fireStrings = false * @description Pipeline and write all commands to the stream
*
* If the pipelined string exceeds X mb, write it directly to the stream and pipeline the rest again.
* @param {RedisClient} client
*/
function writeToStream (client) {
const stream = client._stream
const queue = client._pipelineQueue
const cache = client._strCache
var buffer = false
while (queue.length) {
buffer = stream.write(queue.shift())
}
if (cache.length !== 0) {
buffer = stream.write(cache)
client._strCache = ''
}
client.shouldBuffer = !buffer
stream.uncork()
client._pipeline = false
}
// TODO: This can be significantly improved!
// We can concat the string instead of using the queue
// in most cases. This improves the performance.
// This can only be used for strings only though.
function write (client) {
if (client._pipeline === false) {
client._stream.cork()
client._pipeline = true
process.nextTick(writeToStream, client)
}
}
// TODO: Check if the performance is really increased
// by converting the strings to Buffers.
// At least from Node 8 on it should be better.
// TODO: Consider caching the arg.length buffer
function pipelineBuffers (client, commandStr) {
const queue = client._pipelineQueue
const cache = client._strCache
if (cache !== '') {
queue.push(Buffer.from(cache))
client._strCache = ''
}
queue.push(Buffer.from(commandStr))
while (copy.length) { while (copy.length) {
const arg = copy.shift() var arg = copy.shift()
// TODO: Consider to convert the strings to buffers
// This might actually improve the performance at
// least in more modern Node versions
if (typeof arg === 'string') { if (typeof arg === 'string') {
client.write(`$${Buffer.byteLength(arg)}\r\n${arg}\r\n`) arg = Buffer.from(arg)
} else { // buffer
client.write(`$${arg.length}\r\n`)
client.write(arg)
client.write('\r\n')
} }
queue.push(Buffer.from(`$${arg.length}\r\n`))
queue.push(arg)
queue.push(RN)
debug('sendCommand: buffer send %s bytes', arg.length) debug('sendCommand: buffer send %s bytes', arg.length)
} }
} }
@@ -46,7 +88,7 @@ function toString (arg) {
for (var i = 0; i < arg.length; i += 1) { for (var i = 0; i < arg.length; i += 1) {
toString(arg[i]) toString(arg[i])
} }
} else if (arg && arg.constructor.name === 'Buffer') { } else if (arg && arg.constructor.name === 'Buffer') { // TODO: check performance
copy.push(arg) copy.push(arg)
bufferCount++ bufferCount++
} else if (typeof arg === 'boolean') { // TODO: Remove this support and use hooks instead } else if (typeof arg === 'boolean') { // TODO: Remove this support and use hooks instead
@@ -125,6 +167,7 @@ function normalizeAndWrite (client, command) {
command.bufferArgs = bufferArgs command.bufferArgs = bufferArgs
command.argsLength = len command.argsLength = len
const queue = client._pipelineQueue
if (bufferArgs === false) { if (bufferArgs === false) {
while (copy.length) { while (copy.length) {
@@ -132,11 +175,15 @@ function normalizeAndWrite (client, command) {
commandStr += `$${Buffer.byteLength(arg)}\r\n${arg}\r\n` commandStr += `$${Buffer.byteLength(arg)}\r\n${arg}\r\n`
} }
debug('Send %s id %s: %s', client.address, client.connectionId, commandStr) debug('Send %s id %s: %s', client.address, client.connectionId, commandStr)
client.write(commandStr) client._strCache += commandStr
if (client._strCache.length > 10 * 1024 * 1024) {
queue.push(client._strCache)
client._strCache = ''
}
} else { } else {
client.write(commandStr) pipelineBuffers(client, commandStr)
writeBuffers(client)
} }
write(client)
} }
module.exports = normalizeAndWrite module.exports = normalizeAndWrite

View File

@@ -49,6 +49,7 @@ describe('The \'info\' method', () => {
}) })
it('return error after a failure', () => { it('return error after a failure', () => {
client.on('error', helper.isError(/This socket is closed/))
const promise = client.info().then(helper.fail).catch((err) => { const promise = client.info().then(helper.fail).catch((err) => {
assert.strictEqual(err.code, 'UNCERTAIN_STATE') assert.strictEqual(err.code, 'UNCERTAIN_STATE')
assert.strictEqual(err.command, 'INFO') assert.strictEqual(err.command, 'INFO')

View File

@@ -135,32 +135,6 @@ describe('The nodeRedis client', () => {
}) })
}) })
describe('big data', () => {
// Check if the fast mode for big strings is working correct
// TODO: Evaluate if this is still necessary after the refactoring
it.skip('safe strings that are bigger than 30000 characters with multi', () => {
let str = 'foo ಠ_ಠ bar '
while (str.length < 111111) {
str += str
}
let called = false
const temp = client.writeBuffers.bind(client)
assert(client.fireStrings)
client.writeBuffers = function (data) {
called = true
// To increase write performance for strings the value is converted to a buffer
assert(!client.fireStrings)
temp(data)
}
const promise = client.multi().set('foo', str).get('foo').exec().then((res) => {
assert.strictEqual(called, true)
assert.strictEqual(res[1], str)
})
assert(client.fireStrings)
return promise
})
})
describe('sendCommand', () => { describe('sendCommand', () => {
it('omitting args should be fine', () => { it('omitting args should be fine', () => {
client.serverInfo = {} client.serverInfo = {}