1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

Fix fired but not yet returned commands not being rejected after a connection loss

This commit is contained in:
Ruben Bridgewater
2015-10-26 21:37:02 +01:00
parent ebea0872a9
commit 0ec2c43603
4 changed files with 154 additions and 47 deletions

View File

@@ -162,23 +162,19 @@ RedisClient.prototype.unref = function () {
} }
}; };
// flush offline_queue and command_queue, erroring any items with a callback first // flush provided queues, erroring any items with a callback first
RedisClient.prototype.flush_and_error = function (error) { RedisClient.prototype.flush_and_error = function (error, queue_names) {
var command_obj; var command_obj;
while (command_obj = this.offline_queue.shift()) { queue_names = queue_names || ['offline_queue', 'command_queue'];
if (typeof command_obj.callback === 'function') { for (var i = 0; i < queue_names.length; i++) {
error.command = command_obj.command.toUpperCase(); while (command_obj = this[queue_names[i]].shift()) {
command_obj.callback(error); if (typeof command_obj.callback === 'function') {
error.command = command_obj.command.toUpperCase();
command_obj.callback(error);
}
} }
this[queue_names[i]] = new Queue();
} }
while (command_obj = this.command_queue.shift()) {
if (typeof command_obj.callback === 'function') {
error.command = command_obj.command.toUpperCase();
command_obj.callback(error);
}
}
this.offline_queue = new Queue();
this.command_queue = new Queue();
}; };
RedisClient.prototype.on_error = function (err) { RedisClient.prototype.on_error = function (err) {
@@ -477,6 +473,7 @@ var retry_connection = function (self) {
}; };
RedisClient.prototype.connection_gone = function (why) { RedisClient.prototype.connection_gone = function (why) {
var error;
// If a retry is already in progress, just let that happen // If a retry is already in progress, just let that happen
if (this.retry_timer) { if (this.retry_timer) {
return; return;
@@ -515,7 +512,7 @@ RedisClient.prototype.connection_gone = function (why) {
var message = this.retry_totaltime >= this.connect_timeout ? var message = this.retry_totaltime >= this.connect_timeout ?
'connection timeout exceeded.' : 'connection timeout exceeded.' :
'maximum connection attempts exceeded.'; 'maximum connection attempts exceeded.';
var error = new Error('Redis connection in broken state: ' + message); error = new Error('Redis connection in broken state: ' + message);
error.code = 'CONNECTION_BROKEN'; error.code = 'CONNECTION_BROKEN';
this.flush_and_error(error); this.flush_and_error(error);
this.emit('error', error); this.emit('error', error);
@@ -523,6 +520,18 @@ RedisClient.prototype.connection_gone = function (why) {
return; return;
} }
// Flush all commands that have not yet returned. We can't handle them appropriatly
if (this.command_queue.length !== 0) {
error = new Error('Redis connection lost and command aborted in uncertain state. It might have been processed.');
error.code = 'UNCERTAIN_STATE';
// TODO: Evaluate to add this
// if (this.options.retry_commands) {
// this.offline_queue.unshift(this.command_queue.toArray());
// error.message = 'Command aborted in uncertain state and queued for next connection.';
// }
this.flush_and_error(error, ['command_queue']);
}
if (this.retry_max_delay !== null && this.retry_delay > this.retry_max_delay) { if (this.retry_max_delay !== null && this.retry_delay > this.retry_max_delay) {
this.retry_delay = this.retry_max_delay; this.retry_delay = this.retry_max_delay;
} else if (this.retry_totaltime + this.retry_delay > this.connect_timeout) { } else if (this.retry_totaltime + this.retry_delay > this.connect_timeout) {

View File

@@ -236,6 +236,63 @@ describe("connection tests", function () {
}); });
} }
it("redis still loading <= 1000ms", function (done) {
client = redis.createClient.apply(redis.createClient, args);
var tmp = client.info.bind(client);
var end = helper.callFuncAfter(done, 3);
var delayed = false;
var time;
// Mock original function and pretent redis is still loading
client.info = function (cb) {
tmp(function(err, res) {
if (!delayed) {
assert(!err);
res = res.toString().replace(/loading:0/, 'loading:1\r\nloading_eta_seconds:0.5');
delayed = true;
time = Date.now();
}
end();
cb(err, res);
});
};
client.on("ready", function () {
var rest = Date.now() - time;
// Be on the safe side and accept 100ms above the original value
assert(rest - 100 < 500 && rest >= 500);
assert(delayed);
end();
});
});
it("redis still loading > 1000ms", function (done) {
client = redis.createClient.apply(redis.createClient, args);
var tmp = client.info.bind(client);
var end = helper.callFuncAfter(done, 3);
var delayed = false;
var time;
// Mock original function and pretent redis is still loading
client.info = function (cb) {
tmp(function(err, res) {
if (!delayed) {
assert(!err);
// Try reconnecting after one second even if redis tells us the time needed is above one second
res = res.toString().replace(/loading:0/, 'loading:1\r\nloading_eta_seconds:2.5');
delayed = true;
time = Date.now();
}
end();
cb(err, res);
});
};
client.on("ready", function () {
var rest = Date.now() - time;
// Be on the safe side and accept 100ms above the original value
assert(rest - 100 < 1000 && rest >= 1000);
assert(delayed);
end();
});
});
}); });
}); });

View File

@@ -556,6 +556,43 @@ describe("The node_redis client", function () {
assert.strictEqual(client.offline_queue.length, 0); assert.strictEqual(client.offline_queue.length, 0);
}); });
}); });
it("flushes the command queue if connection is lost", function (done) {
client = redis.createClient({
parser: parser
});
client.once('ready', function() {
var multi = client.multi();
multi.config("bar");
var cb = function(err, reply) {
assert.equal(err.code, 'UNCERTAIN_STATE');
};
for (var i = 0; i < 12; i += 3) {
client.set("foo" + i, "bar" + i);
multi.set("foo" + (i + 1), "bar" + (i + 1), cb);
multi.set("foo" + (i + 2), "bar" + (i + 2));
}
multi.exec();
assert.equal(client.command_queue.length, 15);
helper.killConnection(client);
});
client.on("reconnecting", function (params) {
assert.equal(client.command_queue.length, 15);
});
client.on('error', function(err) {
if (/uncertain state/.test(err.message)) {
assert.equal(client.command_queue.length, 0);
done();
} else {
assert.equal(err.code, 'ECONNREFUSED');
assert.equal(err.errno, 'ECONNREFUSED');
assert.equal(err.syscall, 'connect');
}
});
});
}); });
describe('false', function () { describe('false', function () {
@@ -599,7 +636,7 @@ describe("The node_redis client", function () {
}); });
}); });
it("flushes the command queue connection if in broken connection mode", function (done) { it("flushes the command queue if connection is lost", function (done) {
client = redis.createClient({ client = redis.createClient({
parser: parser, parser: parser,
max_attempts: 2, max_attempts: 2,
@@ -610,7 +647,7 @@ describe("The node_redis client", function () {
var multi = client.multi(); var multi = client.multi();
multi.config("bar"); multi.config("bar");
var cb = function(err, reply) { var cb = function(err, reply) {
assert.equal(err.code, 'CONNECTION_BROKEN'); assert.equal(err.code, 'UNCERTAIN_STATE');
}; };
for (var i = 0; i < 12; i += 3) { for (var i = 0; i < 12; i += 3) {
client.set("foo" + i, "bar" + i); client.set("foo" + i, "bar" + i);
@@ -627,7 +664,7 @@ describe("The node_redis client", function () {
}); });
client.on('error', function(err) { client.on('error', function(err) {
if (/Redis connection in broken state:/.test(err.message)) { if (err.code === 'UNCERTAIN_STATE') {
assert.equal(client.command_queue.length, 0); assert.equal(client.command_queue.length, 0);
done(); done();
} else { } else {

View File

@@ -317,6 +317,39 @@ describe("publish/subscribe", function () {
}); });
}); });
it("should not publish a message multiple times per command", function (done) {
var published = {};
function subscribe(message) {
sub.removeAllListeners('subscribe');
sub.removeAllListeners('message');
sub.removeAllListeners('unsubscribe');
sub.on('subscribe', function () {
pub.publish('/foo', message);
});
sub.on('message', function (channel, message) {
if (published[message]) {
done(new Error('Message published more than once.'));
}
published[message] = true;
});
sub.on('unsubscribe', function (channel, count) {
assert.strictEqual(count, 0);
});
sub.subscribe('/foo');
}
subscribe('hello');
setTimeout(function () {
sub.unsubscribe();
setTimeout(function () {
subscribe('world');
setTimeout(done, 50);
}, 40);
}, 40);
});
// TODO: Fix pub sub // TODO: Fix pub sub
// And there's more than just those two issues // And there's more than just those two issues
describe.skip('FIXME: broken pub sub', function () { describe.skip('FIXME: broken pub sub', function () {
@@ -331,35 +364,6 @@ describe("publish/subscribe", function () {
}); });
setTimeout(done, 200); setTimeout(done, 200);
}); });
it("should not publish a message multiple times per command", function (done) {
var published = {};
function subscribe(message) {
sub.on('subscribe', function () {
pub.publish('/foo', message);
});
sub.on('message', function (channel, message) {
if (published[message]) {
done(new Error('Message published more than once.'));
}
published[message] = true;
});
sub.on('unsubscribe', function (channel, count) {
assert.strictEqual(count, 0);
});
sub.subscribe('/foo');
}
subscribe('hello');
setTimeout(function () {
sub.unsubscribe();
setTimeout(function () {
subscribe('world');
}, 40);
}, 40);
});
}); });
afterEach(function () { afterEach(function () {