From 0d53d3dcdf14185eaef2810e2ba88e1e7002fa25 Mon Sep 17 00:00:00 2001 From: Ruben Bridgewater Date: Sun, 28 May 2017 04:31:37 +0200 Subject: [PATCH] feat: add auto pipeline --- changelog.md | 4 ++ index.js | 55 ++------------------------ lib/multi.js | 4 -- lib/readyHandler.js | 19 ++------- lib/reconnect.js | 12 +++--- lib/writeCommands.js | 79 ++++++++++++++++++++++++++++++-------- test/commands/info.spec.js | 1 + test/node_redis.spec.js | 26 ------------- 8 files changed, 79 insertions(+), 121 deletions(-) diff --git a/changelog.md b/changelog.md index bd49478b3f..780df3be25 100644 --- a/changelog.md +++ b/changelog.md @@ -17,6 +17,10 @@ It will not restore the support for old Node.js versions, the return value of connectTimeout behavior. It will also only partially restore snake_case support and maybe more. +Features +- Native promise support +- Auto pipelining + Breaking Changes - Dropped support for `UPPER_CASE` commands diff --git a/index.js b/index.js index 32c2cc9ef3..7e4f50e130 100644 --- a/index.js +++ b/index.js @@ -8,12 +8,9 @@ const net = require('net') const util = require('util') const utils = require('./lib/utils') -const reconnect = require('./lib/reconnect') const Queue = require('denque') -const errorClasses = require('./lib/customErrors') const EventEmitter = require('events') const Errors = require('redis-errors') -const debug = require('./lib/debug') const connect = require('./lib/connect') const Commands = require('redis-commands') const addCommand = require('./lib/commands') @@ -22,8 +19,6 @@ const Multi = require('./lib/multi') const normalizeAndWriteCommand = require('./lib/writeCommands') const offlineCommand = require('./lib/offlineCommand') -function noop () {} - // Attention: The second parameter might be removed at will and is not officially supported. // Do not rely on this function RedisClient (options, stream) { @@ -80,7 +75,7 @@ function RedisClient (options, stream) { this.shouldBuffer = false this.commandQueue = new Queue() // Holds sent commands to de-pipeline them this.offlineQueue = new Queue() // Holds commands issued but not able to be sent - this.pipelineQueue = new Queue() // Holds all pipelined commands + this._pipelineQueue = new Queue() // Holds all pipelined commands // Only used as timeout until redis has to be connected to redis until throwing an connection error this.connectTimeout = +options.connectTimeout || 60000 // 60 * 1000 ms this.enableOfflineQueue = options.enableOfflineQueue !== false @@ -94,8 +89,8 @@ function RedisClient (options, stream) { this.authPass = options.authPass || options.password this.selectedDb = options.db // Save the selected db here, used when reconnecting this.oldState = null - this.fireStrings = true // Determine if strings or buffers should be written to the stream - this.pipeline = false + this._strCache = '' + this._pipeline = false this.subCommandsLeft = 0 this.renameCommands = options.renameCommands || {} this.timesConnected = 0 @@ -125,17 +120,6 @@ util.inherits(RedisClient, EventEmitter) RedisClient.connectionId = 0 -/****************************************************************************** - - All functions in here are internal besides the RedisClient constructor - and the exported functions. Don't rely on them as they will be private - functions in nodeRedis v.3 - -******************************************************************************/ - -RedisClient.prototype.cork = noop -RedisClient.prototype.uncork = noop - RedisClient.prototype.initializeRetryVars = function () { this.retryTimer = null this.retryTotaltime = 0 @@ -221,39 +205,6 @@ RedisClient.prototype.internalSendCommand = function (commandObj) { return commandObj.promise } -RedisClient.prototype.writeStrings = function () { - var str = '' - for (var command = this.pipelineQueue.shift(); command; command = this.pipelineQueue.shift()) { - // Write to stream if the string is bigger than 4mb. The biggest string may be Math.pow(2, 28) - 15 chars long - if (str.length + command.length > 4 * 1024 * 1024) { - this.shouldBuffer = !this._stream.write(str) - str = '' - } - str += command - } - if (str !== '') { - this.shouldBuffer = !this._stream.write(str) - } -} - -RedisClient.prototype.writeBuffers = function () { - for (var command = this.pipelineQueue.shift(); command; command = this.pipelineQueue.shift()) { - this.shouldBuffer = !this._stream.write(command) - } -} - -// TODO: This can be significantly improved! -// We can concat the string instead of using the queue -// in most cases. This improves the performance. -// This can only be used for strings only though. -RedisClient.prototype.write = function (data) { - if (this.pipeline === false) { - this.shouldBuffer = !this._stream.write(data) - return - } - this.pipelineQueue.push(data) -} - Commands.list.forEach((name) => addCommand(RedisClient.prototype, Multi.prototype, name)) module.exports = { diff --git a/lib/multi.js b/lib/multi.js index a6d8f8ff56..9291cb61b9 100644 --- a/lib/multi.js +++ b/lib/multi.js @@ -88,7 +88,6 @@ function execTransaction (multi) { } const len = queue.length multi.errors = [] - client.cork() client._multi = true multi.wantsBuffers = new Array(len) // Silently ignore this error. We'll receive the error for the exec as well @@ -100,7 +99,6 @@ function execTransaction (multi) { } const main = client.internalSendCommand(new Command('exec', [])) - client.uncork() return Promise.all(promises).then(() => main.then((replies) => multiCallback(multi, replies)).catch((err) => { err.errors = multi.errors return Promise.reject(err) @@ -125,7 +123,6 @@ function execBatch (multi) { }) } var error = false - client.cork() const promises = [] while (queue.length) { const command = queue.shift() @@ -134,7 +131,6 @@ function execBatch (multi) { return e })) } - client.uncork() return Promise.all(promises).then((res) => { if (error) { const err = new Errors.RedisError('bla failed') diff --git a/lib/readyHandler.js b/lib/readyHandler.js index 3a6e4dedf4..ed23061984 100644 --- a/lib/readyHandler.js +++ b/lib/readyHandler.js @@ -6,6 +6,9 @@ const Command = require('./command') function onConnect (client) { debug('Stream connected %s id %s', client.address, client.connectionId) + // TODO: Check if the clients prototype and the clients instance have + // fast properties. If that's not the case, make them fast properties + // again! client.connected = true client.ready = false client.emittedEnd = false @@ -49,22 +52,6 @@ function readyHandler (client) { debug('readyHandler called %s id %s', client.address, client.connectionId) client.ready = true - client.cork = () => { - client.pipeline = true - client._stream.cork() - } - client.uncork = () => { - if (client.fireStrings) { - client.writeStrings() - } else { - client.writeBuffers() - } - client.pipeline = false - client.fireStrings = true - // TODO: Consider using next tick here. See https://github.com/NodeRedis/nodeRedis/issues/1033 - client._stream.uncork() - } - if (client.selectedDb !== undefined) { client.internalSendCommand(new Command('select', [client.selectedDb])).catch((err) => { if (!client.closing) { diff --git a/lib/reconnect.js b/lib/reconnect.js index d357a9b992..68ce508a45 100644 --- a/lib/reconnect.js +++ b/lib/reconnect.js @@ -6,7 +6,6 @@ var lazyConnect = function (client) { lazyConnect = require('./connect') lazyConnect(client) } -const noop = () => {} /** * @description Try connecting to a server again @@ -50,10 +49,6 @@ function reconnect (client, why, error) { debug('Redis connection is gone from %s event.', why) client.connected = false client.ready = false - // Deactivate cork to work with the offline queue - client.cork = noop - client.uncork = noop - client.pipeline = false client.pubSubMode = 0 // since we are collapsing end and close, users don't expect to be called twice @@ -108,10 +103,13 @@ function reconnect (client, why, error) { if (client.options.retryUnfulfilledCommands) { client.offlineQueue.unshift.apply(client.offlineQueue, client.commandQueue.toArray()) client.commandQueue.clear() - } else if (client.commandQueue.length !== 0) { + // TODO: If only the pipelineQueue contains the error we could improve the situation. + // We could postpone writing to the stream until we connected again and fire the commands. + // The commands in the pipelineQueue are also not uncertain. They never left the client. + } else if (client.commandQueue.length !== 0 || client._pipelineQueue.length !== 0) { client.flushAndError('Redis connection lost and command aborted.', 'UNCERTAIN_STATE', { error, - queues: ['commandQueue'] + queues: ['commandQueue', '_pipelineQueue'] }) } diff --git a/lib/writeCommands.js b/lib/writeCommands.js index 3d45efc911..7bb0cc5cff 100644 --- a/lib/writeCommands.js +++ b/lib/writeCommands.js @@ -9,30 +9,72 @@ const debug = require('./debug') // } catch (e) { // // Fallback // return (val) => { -// return Buffer.isBuffer(val) || ArrayBuffer.isView(val) +// return Buffer.isBuffer(val) || val instanceof Uint8Array // } // } // })() const copy = [] +const RN = Buffer.from('\r\n') var bufferCount = 0 var errors = null -function writeBuffers (client) { - client.fireStrings = false +/** + * @description Pipeline and write all commands to the stream + * + * If the pipelined string exceeds X mb, write it directly to the stream and pipeline the rest again. + * @param {RedisClient} client + */ +function writeToStream (client) { + const stream = client._stream + const queue = client._pipelineQueue + const cache = client._strCache + var buffer = false + while (queue.length) { + buffer = stream.write(queue.shift()) + } + if (cache.length !== 0) { + buffer = stream.write(cache) + client._strCache = '' + } + client.shouldBuffer = !buffer + stream.uncork() + client._pipeline = false +} +// TODO: This can be significantly improved! +// We can concat the string instead of using the queue +// in most cases. This improves the performance. +// This can only be used for strings only though. +function write (client) { + if (client._pipeline === false) { + client._stream.cork() + client._pipeline = true + process.nextTick(writeToStream, client) + } +} + +// TODO: Check if the performance is really increased +// by converting the strings to Buffers. +// At least from Node 8 on it should be better. +// TODO: Consider caching the arg.length buffer +function pipelineBuffers (client, commandStr) { + const queue = client._pipelineQueue + const cache = client._strCache + if (cache !== '') { + queue.push(Buffer.from(cache)) + client._strCache = '' + } + queue.push(Buffer.from(commandStr)) while (copy.length) { - const arg = copy.shift() - // TODO: Consider to convert the strings to buffers - // This might actually improve the performance at - // least in more modern Node versions + var arg = copy.shift() if (typeof arg === 'string') { - client.write(`$${Buffer.byteLength(arg)}\r\n${arg}\r\n`) - } else { // buffer - client.write(`$${arg.length}\r\n`) - client.write(arg) - client.write('\r\n') + arg = Buffer.from(arg) } + + queue.push(Buffer.from(`$${arg.length}\r\n`)) + queue.push(arg) + queue.push(RN) debug('sendCommand: buffer send %s bytes', arg.length) } } @@ -46,7 +88,7 @@ function toString (arg) { for (var i = 0; i < arg.length; i += 1) { toString(arg[i]) } - } else if (arg && arg.constructor.name === 'Buffer') { + } else if (arg && arg.constructor.name === 'Buffer') { // TODO: check performance copy.push(arg) bufferCount++ } else if (typeof arg === 'boolean') { // TODO: Remove this support and use hooks instead @@ -125,6 +167,7 @@ function normalizeAndWrite (client, command) { command.bufferArgs = bufferArgs command.argsLength = len + const queue = client._pipelineQueue if (bufferArgs === false) { while (copy.length) { @@ -132,11 +175,15 @@ function normalizeAndWrite (client, command) { commandStr += `$${Buffer.byteLength(arg)}\r\n${arg}\r\n` } debug('Send %s id %s: %s', client.address, client.connectionId, commandStr) - client.write(commandStr) + client._strCache += commandStr + if (client._strCache.length > 10 * 1024 * 1024) { + queue.push(client._strCache) + client._strCache = '' + } } else { - client.write(commandStr) - writeBuffers(client) + pipelineBuffers(client, commandStr) } + write(client) } module.exports = normalizeAndWrite diff --git a/test/commands/info.spec.js b/test/commands/info.spec.js index b65ec48190..818384db29 100644 --- a/test/commands/info.spec.js +++ b/test/commands/info.spec.js @@ -49,6 +49,7 @@ describe('The \'info\' method', () => { }) it('return error after a failure', () => { + client.on('error', helper.isError(/This socket is closed/)) const promise = client.info().then(helper.fail).catch((err) => { assert.strictEqual(err.code, 'UNCERTAIN_STATE') assert.strictEqual(err.command, 'INFO') diff --git a/test/node_redis.spec.js b/test/node_redis.spec.js index bd500d1985..89d1a63dd8 100644 --- a/test/node_redis.spec.js +++ b/test/node_redis.spec.js @@ -135,32 +135,6 @@ describe('The nodeRedis client', () => { }) }) - describe('big data', () => { - // Check if the fast mode for big strings is working correct - // TODO: Evaluate if this is still necessary after the refactoring - it.skip('safe strings that are bigger than 30000 characters with multi', () => { - let str = 'foo ಠ_ಠ bar ' - while (str.length < 111111) { - str += str - } - let called = false - const temp = client.writeBuffers.bind(client) - assert(client.fireStrings) - client.writeBuffers = function (data) { - called = true - // To increase write performance for strings the value is converted to a buffer - assert(!client.fireStrings) - temp(data) - } - const promise = client.multi().set('foo', str).get('foo').exec().then((res) => { - assert.strictEqual(called, true) - assert.strictEqual(res[1], str) - }) - assert(client.fireStrings) - return promise - }) - }) - describe('sendCommand', () => { it('omitting args should be fine', () => { client.serverInfo = {}