1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

Make private api private functions

This commit is contained in:
Ruben Bridgewater
2016-03-27 04:21:02 +02:00
parent 3704ebdd9d
commit aa45e2c140
2 changed files with 201 additions and 193 deletions

338
index.js
View File

@@ -128,7 +128,7 @@ function RedisClient (options, stream) {
'This replaces the max_attempts and retry_max_delay option.'
);
}
this.initialize_retry_vars();
initialize_retry_vars(this);
this.pub_sub_mode = 0;
this.subscription_set = {};
this.monitoring = false;
@@ -148,13 +148,13 @@ function RedisClient (options, stream) {
self.return_reply(data);
},
returnError: function (err) {
self.return_error(err);
return_error(self, err);
},
returnFatalError: function (err) {
// Error out all fired commands. Otherwise they might rely on faulty data. We have to reconnect to get in a working state again
self.flush_and_error(err, ['command_queue']);
self.stream.destroy();
self.return_error(err);
return_error(self, err);
},
returnBuffers: this.buffers,
name: options.parser,
@@ -208,7 +208,7 @@ RedisClient.prototype.create_stream = function () {
if (this.options.connect_timeout) {
this.stream.setTimeout(this.connect_timeout, function () {
self.retry_totaltime = self.connect_timeout;
self.connection_gone('timeout', new Error('Redis connection gone from timeout event'));
connection_gone(self, 'timeout', new Error('Redis connection gone from timeout event'));
});
}
@@ -217,36 +217,36 @@ RedisClient.prototype.create_stream = function () {
this.stream.once(connect_event, function () {
this.removeAllListeners('timeout');
self.times_connected++;
self.on_connect();
on_connect(self);
});
this.stream.on('data', function (buffer_from_socket) {
// The buffer_from_socket.toString() has a significant impact on big chunks and therefor this should only be used if necessary
debug('Net read ' + self.address + ' id ' + self.connection_id); // + ': ' + buffer_from_socket.toString());
self.reply_parser.execute(buffer_from_socket);
self.emit_idle();
emit_idle(self);
});
this.stream.on('error', function (err) {
self.on_error(err);
on_error(self, err);
});
/* istanbul ignore next: difficult to test and not important as long as we keep this listener */
this.stream.on('clientError', function (err) {
debug('clientError occured');
self.on_error(err);
on_error(self, err);
});
this.stream.once('close', function (hadError) {
self.connection_gone('close', new Error('Stream connection closed' + (hadError ? ' because of a transmission error' : '')));
connection_gone(self, 'close', new Error('Stream connection closed' + (hadError ? ' because of a transmission error' : '')));
});
this.stream.once('end', function () {
self.connection_gone('end', new Error('Stream connection ended'));
connection_gone(self, 'end', new Error('Stream connection ended'));
});
this.stream.on('drain', function () {
self.drain();
drain(self);
});
if (this.options.socket_nodelay) {
@@ -269,36 +269,13 @@ RedisClient.prototype.handle_reply = function (reply, command) {
RedisClient.prototype.cork = noop;
RedisClient.prototype.uncork = noop;
RedisClient.prototype.duplicate = function (options) {
var existing_options = utils.clone(this.options);
options = utils.clone(options);
for (var elem in options) { // jshint ignore: line
existing_options[elem] = options[elem];
}
var client = new RedisClient(existing_options);
client.selected_db = this.selected_db;
return client;
};
RedisClient.prototype.initialize_retry_vars = function () {
this.retry_timer = null;
this.retry_totaltime = 0;
this.retry_delay = 200;
this.retry_backoff = 1.7;
this.attempts = 1;
};
RedisClient.prototype.unref = function () {
if (this.connected) {
debug("Unref'ing the socket connection");
this.stream.unref();
} else {
debug('Not connected yet, will unref later');
this.once('connect', function () {
this.unref();
});
}
};
function initialize_retry_vars (self) {
self.retry_timer = null;
self.retry_totaltime = 0;
self.retry_delay = 200;
self.retry_backoff = 1.7;
self.attempts = 1;
}
RedisClient.prototype.warn = function (msg) {
var self = this;
@@ -327,52 +304,50 @@ RedisClient.prototype.flush_and_error = function (error, queue_names) {
}
};
RedisClient.prototype.on_error = function (err) {
if (this.closing) {
function on_error (self, err) {
if (self.closing) {
return;
}
err.message = 'Redis connection to ' + this.address + ' failed - ' + err.message;
err.message = 'Redis connection to ' + self.address + ' failed - ' + err.message;
debug(err.message);
this.connected = false;
this.ready = false;
self.connected = false;
self.ready = false;
// Only emit the error if the retry_stategy option is not set
if (!this.options.retry_strategy) {
this.emit('error', err);
if (!self.options.retry_strategy) {
self.emit('error', err);
}
// 'error' events get turned into exceptions if they aren't listened for. If the user handled this error
// 'error' events get turned into exceptions if they aren't listened for. If the user handled self error
// then we should try to reconnect.
this.connection_gone('error', err);
};
connection_gone(self, 'error', err);
}
RedisClient.prototype.on_connect = function () {
debug('Stream connected ' + this.address + ' id ' + this.connection_id);
function on_connect (self) {
debug('Stream connected ' + self.address + ' id ' + self.connection_id);
this.connected = true;
this.ready = false;
this.emitted_end = false;
this.stream.setKeepAlive(this.options.socket_keepalive);
this.stream.setTimeout(0);
self.connected = true;
self.ready = false;
self.emitted_end = false;
self.stream.setKeepAlive(self.options.socket_keepalive);
self.stream.setTimeout(0);
this.emit('connect');
this.initialize_retry_vars();
self.emit('connect');
initialize_retry_vars(self);
if (this.options.no_ready_check) {
this.on_ready();
if (self.options.no_ready_check) {
on_ready(self);
} else {
this.ready_check();
ready_check(self);
}
};
}
RedisClient.prototype.on_ready = function () {
var self = this;
debug('on_ready called ' + this.address + ' id ' + this.connection_id);
this.ready = true;
function on_ready (self) {
debug('on_ready called ' + self.address + ' id ' + self.connection_id);
self.ready = true;
var cork;
if (!this.stream.cork) {
if (!self.stream.cork) {
cork = function (len) {
self.pipeline = len;
self.pipeline_queue = new Queue(len);
@@ -383,27 +358,27 @@ RedisClient.prototype.on_ready = function () {
self.pipeline_queue = new Queue(len);
self.stream.cork();
};
this.uncork = function () {
self.uncork = function () {
self.stream.uncork();
};
}
this.cork = cork;
self.cork = cork;
// Restore modal commands from previous connection. The order of the commands is important
if (this.selected_db !== undefined) {
this.send_command('select', [this.selected_db]);
if (self.selected_db !== undefined) {
self.send_command('select', [self.selected_db]);
}
if (this.old_state !== null) {
this.monitoring = this.old_state.monitoring;
this.pub_sub_mode = this.old_state.pub_sub_mode;
if (self.old_state !== null) {
self.monitoring = self.old_state.monitoring;
self.pub_sub_mode = self.old_state.pub_sub_mode;
}
if (this.monitoring) { // Monitor has to be fired before pub sub commands
this.send_command('monitor', []);
if (self.monitoring) { // Monitor has to be fired before pub sub commands
self.send_command('monitor', []);
}
var callback_count = Object.keys(this.subscription_set).length;
if (!this.options.disable_resubscribing && callback_count) {
var callback_count = Object.keys(self.subscription_set).length;
if (!self.options.disable_resubscribing && callback_count) {
// only emit 'ready' when all subscriptions were made again
// TODO: Remove the countdown for ready here. This is not coherent with all other modes and should therefor not be handled special
// TODO: Remove the countdown for ready here. self is not coherent with all other modes and should therefor not be handled special
// We know we are ready as soon as all commands were fired
var callback = function () {
callback_count--;
@@ -412,78 +387,77 @@ RedisClient.prototype.on_ready = function () {
}
};
debug('Sending pub/sub on_ready commands');
for (var key in this.subscription_set) { // jshint ignore: line
for (var key in self.subscription_set) { // jshint ignore: line
var command = key.slice(0, key.indexOf('_'));
var args = self.subscription_set[key];
self.send_command(command, [args], callback);
}
this.send_offline_queue();
send_offline_queue(self);
return;
}
this.send_offline_queue();
this.emit('ready');
};
send_offline_queue(self);
self.emit('ready');
}
RedisClient.prototype.on_info_cmd = function (err, res) {
function on_info_cmd (self, err, res) {
if (err) {
if (err.message === "ERR unknown command 'info'") {
this.on_ready();
on_ready(self);
return;
}
err.message = 'Ready check failed: ' + err.message;
this.emit('error', err);
self.emit('error', err);
return;
}
/* istanbul ignore if: some servers might not respond with any info data. This is just a safety check that is difficult to test */
if (!res) {
debug('The info command returned without any data.');
this.on_ready();
on_ready(self);
return;
}
if (!this.server_info.loading || this.server_info.loading === '0') {
if (!self.server_info.loading || self.server_info.loading === '0') {
// If the master_link_status exists but the link is not up, try again after 50 ms
if (this.server_info.master_link_status && this.server_info.master_link_status !== 'up') {
this.server_info.loading_eta_seconds = 0.05;
if (self.server_info.master_link_status && self.server_info.master_link_status !== 'up') {
self.server_info.loading_eta_seconds = 0.05;
} else {
// Eta loading should change
debug('Redis server ready.');
this.on_ready();
on_ready(self);
return;
}
}
var retry_time = +this.server_info.loading_eta_seconds * 1000;
var retry_time = +self.server_info.loading_eta_seconds * 1000;
if (retry_time > 1000) {
retry_time = 1000;
}
debug('Redis server still loading, trying again in ' + retry_time);
setTimeout(function (self) {
self.ready_check();
}, retry_time, this);
};
ready_check(self);
}, retry_time, self);
}
RedisClient.prototype.ready_check = function () {
var self = this;
function ready_check (self) {
debug('Checking server ready state...');
// Always fire this info command as first command even if other commands are already queued up
this.ready = true;
this.info(function (err, res) {
self.on_info_cmd(err, res);
self.ready = true;
self.info(function (err, res) {
on_info_cmd(self, err, res);
});
this.ready = false;
};
self.ready = false;
}
RedisClient.prototype.send_offline_queue = function () {
for (var command_obj = this.offline_queue.shift(); command_obj; command_obj = this.offline_queue.shift()) {
function send_offline_queue (self) {
for (var command_obj = self.offline_queue.shift(); command_obj; command_obj = self.offline_queue.shift()) {
debug('Sending offline command: ' + command_obj.command);
this.send_command(command_obj.command, command_obj.args, command_obj.callback);
self.send_command(command_obj.command, command_obj.args, command_obj.callback);
}
this.drain();
drain(self);
// Even though items were shifted off, Queue backing store still uses memory until next add, so just get a new Queue
this.offline_queue = new Queue();
};
self.offline_queue = new Queue();
}
var retry_connection = function (self, error) {
debug('Retrying connection...');
@@ -503,97 +477,97 @@ var retry_connection = function (self, error) {
self.retry_timer = null;
};
RedisClient.prototype.connection_gone = function (why, error) {
function connection_gone (self, why, error) {
// If a retry is already in progress, just let that happen
if (this.retry_timer) {
if (self.retry_timer) {
return;
}
debug('Redis connection is gone from ' + why + ' event.');
this.connected = false;
this.ready = false;
self.connected = false;
self.ready = false;
// Deactivate cork to work with the offline queue
this.cork = noop;
this.pipeline = 0;
self.cork = noop;
self.pipeline = 0;
var state = {
monitoring: this.monitoring,
pub_sub_mode: this.pub_sub_mode
monitoring: self.monitoring,
pub_sub_mode: self.pub_sub_mode
};
this.old_state = state;
this.monitoring = false;
this.pub_sub_mode = 0;
self.old_state = state;
self.monitoring = false;
self.pub_sub_mode = 0;
// since we are collapsing end and close, users don't expect to be called twice
if (!this.emitted_end) {
this.emit('end');
this.emitted_end = true;
if (!self.emitted_end) {
self.emit('end');
self.emitted_end = true;
}
// If this is a requested shutdown, then don't retry
if (this.closing) {
if (self.closing) {
debug('Connection ended from quit command, not retrying.');
this.flush_and_error(new Error('Redis connection gone from ' + why + ' event.'));
self.flush_and_error(new Error('Redis connection gone from ' + why + ' event.'));
return;
}
if (typeof this.options.retry_strategy === 'function') {
this.retry_delay = this.options.retry_strategy({
attempt: this.attempts,
if (typeof self.options.retry_strategy === 'function') {
self.retry_delay = self.options.retry_strategy({
attempt: self.attempts,
error: error,
total_retry_time: this.retry_totaltime,
times_connected: this.times_connected
total_retry_time: self.retry_totaltime,
times_connected: self.times_connected
});
if (typeof this.retry_delay !== 'number') {
if (typeof self.retry_delay !== 'number') {
// Pass individual error through
if (this.retry_delay instanceof Error) {
error = this.retry_delay;
if (self.retry_delay instanceof Error) {
error = self.retry_delay;
}
this.flush_and_error(error);
this.emit('error', error);
this.end(false);
self.flush_and_error(error);
self.emit('error', error);
self.end(false);
return;
}
}
if (this.max_attempts !== 0 && this.attempts >= this.max_attempts || this.retry_totaltime >= this.connect_timeout) {
var message = this.retry_totaltime >= this.connect_timeout ?
if (self.max_attempts !== 0 && self.attempts >= self.max_attempts || self.retry_totaltime >= self.connect_timeout) {
var message = self.retry_totaltime >= self.connect_timeout ?
'connection timeout exceeded.' :
'maximum connection attempts exceeded.';
error = new Error('Redis connection in broken state: ' + message);
error.code = 'CONNECTION_BROKEN';
this.flush_and_error(error);
this.emit('error', error);
this.end(false);
self.flush_and_error(error);
self.emit('error', error);
self.end(false);
return;
}
// Retry commands after a reconnect instead of throwing an error. Use this with caution
if (this.options.retry_unfulfilled_commands) {
this.offline_queue.unshift.apply(this.offline_queue, this.command_queue.toArray());
this.command_queue.clear();
} else if (this.command_queue.length !== 0) {
if (self.options.retry_unfulfilled_commands) {
self.offline_queue.unshift.apply(self.offline_queue, self.command_queue.toArray());
self.command_queue.clear();
} else if (self.command_queue.length !== 0) {
error = new Error('Redis connection lost and command aborted in uncertain state. It might have been processed.');
error.code = 'UNCERTAIN_STATE';
this.flush_and_error(error, ['command_queue']);
self.flush_and_error(error, ['command_queue']);
error.message = 'Redis connection lost and commands aborted in uncertain state. They might have been processed.';
this.emit('error', error);
self.emit('error', error);
}
if (this.retry_max_delay !== null && this.retry_delay > this.retry_max_delay) {
this.retry_delay = this.retry_max_delay;
} else if (this.retry_totaltime + this.retry_delay > this.connect_timeout) {
if (self.retry_max_delay !== null && self.retry_delay > self.retry_max_delay) {
self.retry_delay = self.retry_max_delay;
} else if (self.retry_totaltime + self.retry_delay > self.connect_timeout) {
// Do not exceed the maximum
this.retry_delay = this.connect_timeout - this.retry_totaltime;
self.retry_delay = self.connect_timeout - self.retry_totaltime;
}
debug('Retry connection in ' + this.retry_delay + ' ms');
debug('Retry connection in ' + self.retry_delay + ' ms');
this.retry_timer = setTimeout(retry_connection, this.retry_delay, this, error);
};
self.retry_timer = setTimeout(retry_connection, self.retry_delay, self, error);
}
RedisClient.prototype.return_error = function (err) {
var command_obj = this.command_queue.shift();
function return_error (self, err) {
var command_obj = self.command_queue.shift();
if (command_obj && command_obj.command && command_obj.command.toUpperCase) {
err.command = command_obj.command.toUpperCase();
}
@@ -604,19 +578,19 @@ RedisClient.prototype.return_error = function (err) {
err.code = match[1];
}
utils.callback_or_emit(this, command_obj && command_obj.callback, err);
};
utils.callback_or_emit(self, command_obj && command_obj.callback, err);
}
RedisClient.prototype.drain = function () {
this.emit('drain');
this.should_buffer = false;
};
function drain (self) {
self.emit('drain');
self.should_buffer = false;
}
RedisClient.prototype.emit_idle = function () {
if (this.command_queue.length === 0 && this.pub_sub_mode === 0) {
this.emit('idle');
function emit_idle (self) {
if (self.command_queue.length === 0 && self.pub_sub_mode === 0) {
self.emit('idle');
}
};
}
function normal_reply (self, reply) {
var command_obj = self.command_queue.shift();
@@ -925,29 +899,6 @@ RedisClient.prototype.write = function (data) {
return;
};
RedisClient.prototype.end = function (flush) {
// Flush queue if wanted
if (flush) {
this.flush_and_error(new Error("The command can't be processed. The connection has already been closed."));
} else if (arguments.length === 0) {
this.warn(
'Using .end() without the flush parameter is deprecated and throws from v.3.0.0 on.\n' +
'Please check the doku (https://github.com/NodeRedis/node_redis) and explictly use flush.'
);
}
// Clear retry_timer
if (this.retry_timer) {
clearTimeout(this.retry_timer);
this.retry_timer = null;
}
this.stream.removeAllListeners();
this.stream.on('error', noop);
this.connected = false;
this.ready = false;
this.closing = true;
return this.stream.destroySoon();
};
exports.createClient = function () {
return new RedisClient(unifyOptions.apply(null, arguments));
};
@@ -955,6 +906,7 @@ exports.RedisClient = RedisClient;
exports.print = utils.print;
exports.Multi = require('./lib/multi');
// Add all redis commands to the client
// Add all redis commands etc to the RedisClient
require('./lib/individualCommands');
require('./lib/individualFunctions');
require('./lib/commands');