From 0ec2c43603ccbad95185475447f832f3f44866bd Mon Sep 17 00:00:00 2001 From: Ruben Bridgewater Date: Mon, 26 Oct 2015 21:37:02 +0100 Subject: [PATCH] Fix fired but not yet returned commands not being rejected after a connection loss --- index.js | 39 ++++++++++++++++---------- test/connection.spec.js | 57 +++++++++++++++++++++++++++++++++++++ test/node_redis.spec.js | 43 ++++++++++++++++++++++++++-- test/pubsub.spec.js | 62 ++++++++++++++++++++++------------------- 4 files changed, 154 insertions(+), 47 deletions(-) diff --git a/index.js b/index.js index 2975d9cc58..2f840ecfed 100644 --- a/index.js +++ b/index.js @@ -162,23 +162,19 @@ RedisClient.prototype.unref = function () { } }; -// flush offline_queue and command_queue, erroring any items with a callback first -RedisClient.prototype.flush_and_error = function (error) { +// flush provided queues, erroring any items with a callback first +RedisClient.prototype.flush_and_error = function (error, queue_names) { var command_obj; - while (command_obj = this.offline_queue.shift()) { - if (typeof command_obj.callback === 'function') { - error.command = command_obj.command.toUpperCase(); - command_obj.callback(error); + queue_names = queue_names || ['offline_queue', 'command_queue']; + for (var i = 0; i < queue_names.length; i++) { + while (command_obj = this[queue_names[i]].shift()) { + if (typeof command_obj.callback === 'function') { + error.command = command_obj.command.toUpperCase(); + command_obj.callback(error); + } } + this[queue_names[i]] = new Queue(); } - while (command_obj = this.command_queue.shift()) { - if (typeof command_obj.callback === 'function') { - error.command = command_obj.command.toUpperCase(); - command_obj.callback(error); - } - } - this.offline_queue = new Queue(); - this.command_queue = new Queue(); }; RedisClient.prototype.on_error = function (err) { @@ -477,6 +473,7 @@ var retry_connection = function (self) { }; RedisClient.prototype.connection_gone = function (why) { + var error; // If a retry is already in progress, just let that happen if (this.retry_timer) { return; @@ -515,7 +512,7 @@ RedisClient.prototype.connection_gone = function (why) { var message = this.retry_totaltime >= this.connect_timeout ? 'connection timeout exceeded.' : 'maximum connection attempts exceeded.'; - var error = new Error('Redis connection in broken state: ' + message); + error = new Error('Redis connection in broken state: ' + message); error.code = 'CONNECTION_BROKEN'; this.flush_and_error(error); this.emit('error', error); @@ -523,6 +520,18 @@ RedisClient.prototype.connection_gone = function (why) { return; } + // Flush all commands that have not yet returned. We can't handle them appropriatly + if (this.command_queue.length !== 0) { + error = new Error('Redis connection lost and command aborted in uncertain state. It might have been processed.'); + error.code = 'UNCERTAIN_STATE'; + // TODO: Evaluate to add this + // if (this.options.retry_commands) { + // this.offline_queue.unshift(this.command_queue.toArray()); + // error.message = 'Command aborted in uncertain state and queued for next connection.'; + // } + this.flush_and_error(error, ['command_queue']); + } + if (this.retry_max_delay !== null && this.retry_delay > this.retry_max_delay) { this.retry_delay = this.retry_max_delay; } else if (this.retry_totaltime + this.retry_delay > this.connect_timeout) { diff --git a/test/connection.spec.js b/test/connection.spec.js index 0ea6e16613..1fda737951 100644 --- a/test/connection.spec.js +++ b/test/connection.spec.js @@ -236,6 +236,63 @@ describe("connection tests", function () { }); } + it("redis still loading <= 1000ms", function (done) { + client = redis.createClient.apply(redis.createClient, args); + var tmp = client.info.bind(client); + var end = helper.callFuncAfter(done, 3); + var delayed = false; + var time; + // Mock original function and pretent redis is still loading + client.info = function (cb) { + tmp(function(err, res) { + if (!delayed) { + assert(!err); + res = res.toString().replace(/loading:0/, 'loading:1\r\nloading_eta_seconds:0.5'); + delayed = true; + time = Date.now(); + } + end(); + cb(err, res); + }); + }; + client.on("ready", function () { + var rest = Date.now() - time; + // Be on the safe side and accept 100ms above the original value + assert(rest - 100 < 500 && rest >= 500); + assert(delayed); + end(); + }); + }); + + it("redis still loading > 1000ms", function (done) { + client = redis.createClient.apply(redis.createClient, args); + var tmp = client.info.bind(client); + var end = helper.callFuncAfter(done, 3); + var delayed = false; + var time; + // Mock original function and pretent redis is still loading + client.info = function (cb) { + tmp(function(err, res) { + if (!delayed) { + assert(!err); + // Try reconnecting after one second even if redis tells us the time needed is above one second + res = res.toString().replace(/loading:0/, 'loading:1\r\nloading_eta_seconds:2.5'); + delayed = true; + time = Date.now(); + } + end(); + cb(err, res); + }); + }; + client.on("ready", function () { + var rest = Date.now() - time; + // Be on the safe side and accept 100ms above the original value + assert(rest - 100 < 1000 && rest >= 1000); + assert(delayed); + end(); + }); + }); + }); }); diff --git a/test/node_redis.spec.js b/test/node_redis.spec.js index 84ef29b3e9..4650a0a333 100644 --- a/test/node_redis.spec.js +++ b/test/node_redis.spec.js @@ -556,6 +556,43 @@ describe("The node_redis client", function () { assert.strictEqual(client.offline_queue.length, 0); }); }); + + it("flushes the command queue if connection is lost", function (done) { + client = redis.createClient({ + parser: parser + }); + + client.once('ready', function() { + var multi = client.multi(); + multi.config("bar"); + var cb = function(err, reply) { + assert.equal(err.code, 'UNCERTAIN_STATE'); + }; + for (var i = 0; i < 12; i += 3) { + client.set("foo" + i, "bar" + i); + multi.set("foo" + (i + 1), "bar" + (i + 1), cb); + multi.set("foo" + (i + 2), "bar" + (i + 2)); + } + multi.exec(); + assert.equal(client.command_queue.length, 15); + helper.killConnection(client); + }); + + client.on("reconnecting", function (params) { + assert.equal(client.command_queue.length, 15); + }); + + client.on('error', function(err) { + if (/uncertain state/.test(err.message)) { + assert.equal(client.command_queue.length, 0); + done(); + } else { + assert.equal(err.code, 'ECONNREFUSED'); + assert.equal(err.errno, 'ECONNREFUSED'); + assert.equal(err.syscall, 'connect'); + } + }); + }); }); describe('false', function () { @@ -599,7 +636,7 @@ describe("The node_redis client", function () { }); }); - it("flushes the command queue connection if in broken connection mode", function (done) { + it("flushes the command queue if connection is lost", function (done) { client = redis.createClient({ parser: parser, max_attempts: 2, @@ -610,7 +647,7 @@ describe("The node_redis client", function () { var multi = client.multi(); multi.config("bar"); var cb = function(err, reply) { - assert.equal(err.code, 'CONNECTION_BROKEN'); + assert.equal(err.code, 'UNCERTAIN_STATE'); }; for (var i = 0; i < 12; i += 3) { client.set("foo" + i, "bar" + i); @@ -627,7 +664,7 @@ describe("The node_redis client", function () { }); client.on('error', function(err) { - if (/Redis connection in broken state:/.test(err.message)) { + if (err.code === 'UNCERTAIN_STATE') { assert.equal(client.command_queue.length, 0); done(); } else { diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index f7dbd57fe4..545f85b6c8 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -317,6 +317,39 @@ describe("publish/subscribe", function () { }); }); + it("should not publish a message multiple times per command", function (done) { + var published = {}; + + function subscribe(message) { + sub.removeAllListeners('subscribe'); + sub.removeAllListeners('message'); + sub.removeAllListeners('unsubscribe'); + sub.on('subscribe', function () { + pub.publish('/foo', message); + }); + sub.on('message', function (channel, message) { + if (published[message]) { + done(new Error('Message published more than once.')); + } + published[message] = true; + }); + sub.on('unsubscribe', function (channel, count) { + assert.strictEqual(count, 0); + }); + sub.subscribe('/foo'); + } + + subscribe('hello'); + + setTimeout(function () { + sub.unsubscribe(); + setTimeout(function () { + subscribe('world'); + setTimeout(done, 50); + }, 40); + }, 40); + }); + // TODO: Fix pub sub // And there's more than just those two issues describe.skip('FIXME: broken pub sub', function () { @@ -331,35 +364,6 @@ describe("publish/subscribe", function () { }); setTimeout(done, 200); }); - - it("should not publish a message multiple times per command", function (done) { - var published = {}; - - function subscribe(message) { - sub.on('subscribe', function () { - pub.publish('/foo', message); - }); - sub.on('message', function (channel, message) { - if (published[message]) { - done(new Error('Message published more than once.')); - } - published[message] = true; - }); - sub.on('unsubscribe', function (channel, count) { - assert.strictEqual(count, 0); - }); - sub.subscribe('/foo'); - } - - subscribe('hello'); - - setTimeout(function () { - sub.unsubscribe(); - setTimeout(function () { - subscribe('world'); - }, 40); - }, 40); - }); }); afterEach(function () {