1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

Fix an issue with .multi after a reconnect on node 0.10

Add .path to .createClient options object for unix sockets
This commit is contained in:
Ruben Bridgewater
2015-10-29 23:21:08 +01:00
parent c3502c799f
commit d454e4025b
6 changed files with 113 additions and 87 deletions

View File

@@ -3,7 +3,7 @@ redis - a node.js redis client
[![Build Status](https://travis-ci.org/NodeRedis/node_redis.png)](https://travis-ci.org/NodeRedis/node_redis)
[![Coverage Status](https://coveralls.io/repos/NodeRedis/node_redis/badge.svg?branch=)](https://coveralls.io/r/NodeRedis/node_redis?branch=)
[![Windows Tests](https://ci.appveyor.com/api/projects/status/koc3xraik0xq3b56/branch/master?svg=true)](https://ci.appveyor.com/project/BridgeAR/node-redis/branch/master)
[![Windows Tests](https://ci.appveyor.com/api/projects/status/koc3xraik0xq3b56/branch/master?svg=true&label=Windows%20Tests)](https://ci.appveyor.com/project/BridgeAR/node-redis/branch/master)
[![Gitter](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/NodeRedis/node_redis?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge)
This is a complete and feature rich Redis client for node.js. It supports all Redis commands and focuses on performance.
@@ -179,6 +179,7 @@ port and host are probably fine and you don't need to supply any arguments. `cre
#### `options` is an object with the following possible properties:
* `host`: *127.0.0.1*; The host to connect to
* `port`: *6370*; The port to connect to
* `path`: *null*; The unix socket string to connect to
* `parser`: *hiredis*; Which Redis protocol reply parser to use. If `hiredis` is not installed it will fallback to `javascript`.
* `return_buffers`: *false*; If set to `true`, then all replies will be sent to callbacks as Buffers instead of Strings.
* `detect_buffers`: *false*; If set to `true`, then replies will be sent to callbacks as Buffers

View File

@@ -14,6 +14,7 @@ Features
- Removed the high water mark and low water mark. Such a mechanism should be implemented by a user instead
- The `drain` event is from now on only emitted if the stream really had to buffer
- Reduced the default connect_timeout to be one hour instead of 24h ([@BridgeAR](https://github.com/BridgeAR))
- Added .path to redis.createClient(options); ([@BridgeAR](https://github.com/BridgeAR))
Bugfixes

122
index.js
View File

@@ -21,6 +21,7 @@ var debug = function(msg) {
};
function noop () {}
function clone (obj) { return JSON.parse(JSON.stringify(obj || {})); }
exports.debug_mode = /\bredis\b/i.test(process.env.NODE_DEBUG);
@@ -34,31 +35,22 @@ try {
parsers.push(require('./lib/parsers/javascript'));
function RedisClient(stream, options) {
function RedisClient(options) {
// Copy the options so they are not mutated
options = JSON.parse(JSON.stringify(options || {}));
options = clone(options);
events.EventEmitter.call(this);
var self = this;
this.pipeline = 0;
var cork;
if (!stream.cork) {
cork = function (len) {
self.pipeline = len;
self.pipeline_queue = new Queue(len);
};
this.uncork = noop;
var cnx_options = {};
if (options.path) {
cnx_options.path = options.path;
this.address = options.path;
} else {
cork = function (len) {
self.pipeline = len;
self.pipeline_queue = new Queue(len);
self.stream.cork();
};
cnx_options.port = options.port || default_port;
cnx_options.host = options.host || default_host;
cnx_options.family = options.family === 'IPv6' ? 6 : 4;
this.address = cnx_options.host + ':' + cnx_options.port;
}
this.once('ready', function () {
self.cork = cork;
});
this.stream = stream;
this.connection_option = cnx_options;
this.connection_id = ++connection_id;
this.connected = false;
this.ready = false;
@@ -69,11 +61,9 @@ function RedisClient(stream, options) {
if (options.socket_keepalive === undefined) {
options.socket_keepalive = true;
}
if (options.rename_commands) {
for (var command in options.rename_commands) { // jshint ignore: line
options.rename_commands[command.toLowerCase()] = options.rename_commands[command];
}
}
options.return_buffers = !!options.return_buffers;
options.detect_buffers = !!options.detect_buffers;
// Override the detect_buffers setting if return_buffers is active and print a warning
@@ -98,14 +88,15 @@ function RedisClient(stream, options) {
this.parser_module = null;
this.selected_db = null; // Save the selected db here, used when reconnecting
this.old_state = null;
this.pipeline = 0;
this.options = options;
this.install_stream_listeners();
events.EventEmitter.call(this);
self.stream = net.createConnection(cnx_options);
self.install_stream_listeners();
}
util.inherits(RedisClient, events.EventEmitter);
RedisClient.prototype.install_stream_listeners = function() {
RedisClient.prototype.install_stream_listeners = function () {
var self = this;
if (this.options.connect_timeout) {
@@ -144,9 +135,7 @@ RedisClient.prototype.install_stream_listeners = function() {
};
RedisClient.prototype.cork = noop;
RedisClient.prototype.uncork = function () {
this.stream.uncork();
};
RedisClient.prototype.uncork = noop;
RedisClient.prototype.initialize_retry_vars = function () {
this.retry_timer = null;
@@ -332,6 +321,24 @@ RedisClient.prototype.on_ready = function () {
this.old_state = null;
}
var cork;
if (!this.stream.cork) {
cork = function (len) {
self.pipeline = len;
self.pipeline_queue = new Queue(len);
};
} else {
cork = function (len) {
self.pipeline = len;
self.pipeline_queue = new Queue(len);
self.stream.cork();
};
this.uncork = function () {
self.stream.uncork();
};
}
this.cork = cork;
// magically restore any modal commands from a previous connection
if (this.selected_db !== null) {
// this trick works if and only if the following send_command
@@ -472,7 +479,7 @@ var retry_connection = function (self) {
self.attempts += 1;
self.retry_delay = Math.round(self.retry_delay * self.retry_backoff);
self.stream = net.createConnection(self.connectionOption);
self.stream = net.createConnection(self.connection_option);
self.install_stream_listeners();
self.retry_timer = null;
@@ -488,6 +495,9 @@ RedisClient.prototype.connection_gone = function (why) {
debug('Redis connection is gone from ' + why + ' event.');
this.connected = false;
this.ready = false;
// Deactivate cork to work with the offline queue
this.cork = noop;
this.pipeline = 0;
if (this.old_state === null) {
var state = {
@@ -1219,56 +1229,30 @@ Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = funct
return this._client.should_buffer;
};
var createClient_unix = function (path, options){
var cnxOptions = {
path: path
};
var net_client = net.createConnection(cnxOptions);
var redis_client = new RedisClient(net_client, options);
redis_client.connectionOption = cnxOptions;
redis_client.address = path;
return redis_client;
};
var createClient_tcp = function (port_arg, host_arg, options) {
var cnxOptions = {
port : port_arg || default_port,
host : host_arg || default_host,
family : options.family === 'IPv6' ? 6 : 4
};
var net_client = net.createConnection(cnxOptions);
var redis_client = new RedisClient(net_client, options);
redis_client.connectionOption = cnxOptions;
redis_client.address = cnxOptions.host + ':' + cnxOptions.port;
return redis_client;
};
var createClient = function (port_arg, host_arg, options) {
if (typeof port_arg === 'object' || port_arg === undefined) {
options = port_arg || options || {};
return createClient_tcp(+options.port, options.host, options);
}
if (typeof port_arg === 'number' || typeof port_arg === 'string' && /^\d+$/.test(port_arg)) {
return createClient_tcp(port_arg, host_arg, options || {});
}
if (typeof port_arg === 'string') {
options = host_arg || options || {};
} else if (typeof port_arg === 'number' || typeof port_arg === 'string' && /^\d+$/.test(port_arg)) {
options = clone(options);
options.host = host_arg;
options.port = port_arg;
} else if (typeof port_arg === 'string') {
options = clone(host_arg || options);
var parsed = URL.parse(port_arg, true, true);
if (parsed.hostname) {
if (parsed.auth) {
options.auth_pass = parsed.auth.split(':')[1];
}
return createClient_tcp(parsed.port, parsed.hostname, options);
options.host = parsed.hostname;
options.port = parsed.port;
} else {
options.path = port_arg;
}
return createClient_unix(port_arg, options);
}
if (!options) {
throw new Error('Unknown type of connection in createClient()');
}
return new RedisClient(options);
};
exports.createClient = createClient;

View File

@@ -65,6 +65,31 @@ describe("The 'multi' method", function () {
multi1.get('m1');
multi1.exec(done);
});
it("executes a pipelined multi properly after a reconnect in combination with the offline queue", function (done) {
client.once('ready', function () {
client.stream.destroy();
var called = false;
var multi1 = client.multi();
multi1.set("m1", "123");
multi1.get('m1');
multi1.exec(function (err, res) {
assert(!err);
called = true;
});
client.once('ready', function () {
var multi1 = client.multi();
multi1.set("m2", "456");
multi1.get('m2');
multi1.exec(function (err, res) {
assert(called);
assert(!err);
assert.strictEqual(res, '456');
done();
});
});
});
});
});
describe("when connection is broken", function () {

View File

@@ -4,7 +4,6 @@ var assert = require("assert");
var config = require("./lib/config");
var helper = require('./helper');
var redis = config.redis;
var net = require('net');
describe("connection tests", function () {
helper.allTests(function(parser, ip, args) {
@@ -68,7 +67,7 @@ describe("connection tests", function () {
client.on('error', function(err) {
if (/Redis connection in broken state: connection timeout.*?exceeded./.test(err.message)) {
setTimeout(function () {
assert(time === connect_timeout);
assert.strictEqual(time, connect_timeout);
done();
}, 500);
}
@@ -133,7 +132,9 @@ describe("connection tests", function () {
host: '192.168.74.167',
connect_timeout: connect_timeout
});
process.nextTick(function() {
assert(client.stream._events.timeout);
});
assert.strictEqual(client.address, '192.168.74.167:6379');
var time = Date.now();
@@ -153,7 +154,9 @@ describe("connection tests", function () {
parser: parser,
host: '192.168.74.167'
});
assert(client.stream._events.timeout === undefined);
process.nextTick(function() {
assert.strictEqual(client.stream._events.timeout, undefined);
});
});
it("clears the socket timeout after a connection has been established", function (done) {
@@ -161,10 +164,12 @@ describe("connection tests", function () {
parser: parser,
connect_timeout: 1000
});
process.nextTick(function() {
assert.strictEqual(client.stream._idleTimeout, 1000);
});
client.on('connect', function () {
assert.strictEqual(client.stream._idleTimeout, -1);
assert(client.stream._events.timeout === undefined);
assert.strictEqual(client.stream._events.timeout, undefined);
done();
});
});
@@ -182,6 +187,22 @@ describe("connection tests", function () {
});
});
it("connect with path provided in the options object", function (done) {
client = redis.createClient({
path: '/tmp/redis.sock',
parser: parser,
connect_timeout: 1000
});
var end = helper.callFuncAfter(done, 2);
client.once('ready', function() {
end();
});
client.set('foo', 'bar', end);
});
it("connects correctly with args", function (done) {
client = redis.createClient.apply(redis.createClient, args);
client.on("error", done);
@@ -247,13 +268,7 @@ describe("connection tests", function () {
it("works with missing options object for new redis instances", function () {
// This is needed for libraries that have their own createClient function like fakeredis
var cnxOptions = {
port : 6379,
host : '127.0.0.1',
family : ip === 'IPv6' ? 6 : 4
};
var net_client = net.createConnection(cnxOptions);
client = new redis.RedisClient(net_client);
client = new redis.RedisClient({ on: function () {}});
});
it("throws on strange connection info", function () {

View File

@@ -161,7 +161,7 @@ module.exports = {
},
killConnection: function (client) {
// Change the connection option to a non existing one and destroy the stream
client.connectionOption = {
client.connection_option = {
port: 65535,
host: '127.0.0.1',
family: 4