From aa45e2c1404b65258b5f91b60c19c8d8f1f16349 Mon Sep 17 00:00:00 2001 From: Ruben Bridgewater Date: Sun, 27 Mar 2016 04:21:02 +0200 Subject: [PATCH] Make private api private functions --- index.js | 338 ++++++++++++++++--------------------- lib/individualFunctions.js | 56 ++++++ 2 files changed, 201 insertions(+), 193 deletions(-) create mode 100644 lib/individualFunctions.js diff --git a/index.js b/index.js index 5245f824c1..56fa3ee388 100644 --- a/index.js +++ b/index.js @@ -128,7 +128,7 @@ function RedisClient (options, stream) { 'This replaces the max_attempts and retry_max_delay option.' ); } - this.initialize_retry_vars(); + initialize_retry_vars(this); this.pub_sub_mode = 0; this.subscription_set = {}; this.monitoring = false; @@ -148,13 +148,13 @@ function RedisClient (options, stream) { self.return_reply(data); }, returnError: function (err) { - self.return_error(err); + return_error(self, err); }, returnFatalError: function (err) { // Error out all fired commands. Otherwise they might rely on faulty data. We have to reconnect to get in a working state again self.flush_and_error(err, ['command_queue']); self.stream.destroy(); - self.return_error(err); + return_error(self, err); }, returnBuffers: this.buffers, name: options.parser, @@ -208,7 +208,7 @@ RedisClient.prototype.create_stream = function () { if (this.options.connect_timeout) { this.stream.setTimeout(this.connect_timeout, function () { self.retry_totaltime = self.connect_timeout; - self.connection_gone('timeout', new Error('Redis connection gone from timeout event')); + connection_gone(self, 'timeout', new Error('Redis connection gone from timeout event')); }); } @@ -217,36 +217,36 @@ RedisClient.prototype.create_stream = function () { this.stream.once(connect_event, function () { this.removeAllListeners('timeout'); self.times_connected++; - self.on_connect(); + on_connect(self); }); this.stream.on('data', function (buffer_from_socket) { // The buffer_from_socket.toString() has a significant impact on big chunks and therefor this should only be used if necessary debug('Net read ' + self.address + ' id ' + self.connection_id); // + ': ' + buffer_from_socket.toString()); self.reply_parser.execute(buffer_from_socket); - self.emit_idle(); + emit_idle(self); }); this.stream.on('error', function (err) { - self.on_error(err); + on_error(self, err); }); /* istanbul ignore next: difficult to test and not important as long as we keep this listener */ this.stream.on('clientError', function (err) { debug('clientError occured'); - self.on_error(err); + on_error(self, err); }); this.stream.once('close', function (hadError) { - self.connection_gone('close', new Error('Stream connection closed' + (hadError ? ' because of a transmission error' : ''))); + connection_gone(self, 'close', new Error('Stream connection closed' + (hadError ? ' because of a transmission error' : ''))); }); this.stream.once('end', function () { - self.connection_gone('end', new Error('Stream connection ended')); + connection_gone(self, 'end', new Error('Stream connection ended')); }); this.stream.on('drain', function () { - self.drain(); + drain(self); }); if (this.options.socket_nodelay) { @@ -269,36 +269,13 @@ RedisClient.prototype.handle_reply = function (reply, command) { RedisClient.prototype.cork = noop; RedisClient.prototype.uncork = noop; -RedisClient.prototype.duplicate = function (options) { - var existing_options = utils.clone(this.options); - options = utils.clone(options); - for (var elem in options) { // jshint ignore: line - existing_options[elem] = options[elem]; - } - var client = new RedisClient(existing_options); - client.selected_db = this.selected_db; - return client; -}; - -RedisClient.prototype.initialize_retry_vars = function () { - this.retry_timer = null; - this.retry_totaltime = 0; - this.retry_delay = 200; - this.retry_backoff = 1.7; - this.attempts = 1; -}; - -RedisClient.prototype.unref = function () { - if (this.connected) { - debug("Unref'ing the socket connection"); - this.stream.unref(); - } else { - debug('Not connected yet, will unref later'); - this.once('connect', function () { - this.unref(); - }); - } -}; +function initialize_retry_vars (self) { + self.retry_timer = null; + self.retry_totaltime = 0; + self.retry_delay = 200; + self.retry_backoff = 1.7; + self.attempts = 1; +} RedisClient.prototype.warn = function (msg) { var self = this; @@ -327,52 +304,50 @@ RedisClient.prototype.flush_and_error = function (error, queue_names) { } }; -RedisClient.prototype.on_error = function (err) { - if (this.closing) { +function on_error (self, err) { + if (self.closing) { return; } - err.message = 'Redis connection to ' + this.address + ' failed - ' + err.message; + err.message = 'Redis connection to ' + self.address + ' failed - ' + err.message; debug(err.message); - this.connected = false; - this.ready = false; + self.connected = false; + self.ready = false; // Only emit the error if the retry_stategy option is not set - if (!this.options.retry_strategy) { - this.emit('error', err); + if (!self.options.retry_strategy) { + self.emit('error', err); } - // 'error' events get turned into exceptions if they aren't listened for. If the user handled this error + // 'error' events get turned into exceptions if they aren't listened for. If the user handled self error // then we should try to reconnect. - this.connection_gone('error', err); -}; + connection_gone(self, 'error', err); +} -RedisClient.prototype.on_connect = function () { - debug('Stream connected ' + this.address + ' id ' + this.connection_id); +function on_connect (self) { + debug('Stream connected ' + self.address + ' id ' + self.connection_id); - this.connected = true; - this.ready = false; - this.emitted_end = false; - this.stream.setKeepAlive(this.options.socket_keepalive); - this.stream.setTimeout(0); + self.connected = true; + self.ready = false; + self.emitted_end = false; + self.stream.setKeepAlive(self.options.socket_keepalive); + self.stream.setTimeout(0); - this.emit('connect'); - this.initialize_retry_vars(); + self.emit('connect'); + initialize_retry_vars(self); - if (this.options.no_ready_check) { - this.on_ready(); + if (self.options.no_ready_check) { + on_ready(self); } else { - this.ready_check(); + ready_check(self); } -}; +} -RedisClient.prototype.on_ready = function () { - var self = this; - - debug('on_ready called ' + this.address + ' id ' + this.connection_id); - this.ready = true; +function on_ready (self) { + debug('on_ready called ' + self.address + ' id ' + self.connection_id); + self.ready = true; var cork; - if (!this.stream.cork) { + if (!self.stream.cork) { cork = function (len) { self.pipeline = len; self.pipeline_queue = new Queue(len); @@ -383,27 +358,27 @@ RedisClient.prototype.on_ready = function () { self.pipeline_queue = new Queue(len); self.stream.cork(); }; - this.uncork = function () { + self.uncork = function () { self.stream.uncork(); }; } - this.cork = cork; + self.cork = cork; // Restore modal commands from previous connection. The order of the commands is important - if (this.selected_db !== undefined) { - this.send_command('select', [this.selected_db]); + if (self.selected_db !== undefined) { + self.send_command('select', [self.selected_db]); } - if (this.old_state !== null) { - this.monitoring = this.old_state.monitoring; - this.pub_sub_mode = this.old_state.pub_sub_mode; + if (self.old_state !== null) { + self.monitoring = self.old_state.monitoring; + self.pub_sub_mode = self.old_state.pub_sub_mode; } - if (this.monitoring) { // Monitor has to be fired before pub sub commands - this.send_command('monitor', []); + if (self.monitoring) { // Monitor has to be fired before pub sub commands + self.send_command('monitor', []); } - var callback_count = Object.keys(this.subscription_set).length; - if (!this.options.disable_resubscribing && callback_count) { + var callback_count = Object.keys(self.subscription_set).length; + if (!self.options.disable_resubscribing && callback_count) { // only emit 'ready' when all subscriptions were made again - // TODO: Remove the countdown for ready here. This is not coherent with all other modes and should therefor not be handled special + // TODO: Remove the countdown for ready here. self is not coherent with all other modes and should therefor not be handled special // We know we are ready as soon as all commands were fired var callback = function () { callback_count--; @@ -412,78 +387,77 @@ RedisClient.prototype.on_ready = function () { } }; debug('Sending pub/sub on_ready commands'); - for (var key in this.subscription_set) { // jshint ignore: line + for (var key in self.subscription_set) { // jshint ignore: line var command = key.slice(0, key.indexOf('_')); var args = self.subscription_set[key]; self.send_command(command, [args], callback); } - this.send_offline_queue(); + send_offline_queue(self); return; } - this.send_offline_queue(); - this.emit('ready'); -}; + send_offline_queue(self); + self.emit('ready'); +} -RedisClient.prototype.on_info_cmd = function (err, res) { +function on_info_cmd (self, err, res) { if (err) { if (err.message === "ERR unknown command 'info'") { - this.on_ready(); + on_ready(self); return; } err.message = 'Ready check failed: ' + err.message; - this.emit('error', err); + self.emit('error', err); return; } /* istanbul ignore if: some servers might not respond with any info data. This is just a safety check that is difficult to test */ if (!res) { debug('The info command returned without any data.'); - this.on_ready(); + on_ready(self); return; } - if (!this.server_info.loading || this.server_info.loading === '0') { + if (!self.server_info.loading || self.server_info.loading === '0') { // If the master_link_status exists but the link is not up, try again after 50 ms - if (this.server_info.master_link_status && this.server_info.master_link_status !== 'up') { - this.server_info.loading_eta_seconds = 0.05; + if (self.server_info.master_link_status && self.server_info.master_link_status !== 'up') { + self.server_info.loading_eta_seconds = 0.05; } else { // Eta loading should change debug('Redis server ready.'); - this.on_ready(); + on_ready(self); return; } } - var retry_time = +this.server_info.loading_eta_seconds * 1000; + var retry_time = +self.server_info.loading_eta_seconds * 1000; if (retry_time > 1000) { retry_time = 1000; } debug('Redis server still loading, trying again in ' + retry_time); setTimeout(function (self) { - self.ready_check(); - }, retry_time, this); -}; + ready_check(self); + }, retry_time, self); +} -RedisClient.prototype.ready_check = function () { - var self = this; +function ready_check (self) { debug('Checking server ready state...'); // Always fire this info command as first command even if other commands are already queued up - this.ready = true; - this.info(function (err, res) { - self.on_info_cmd(err, res); + self.ready = true; + self.info(function (err, res) { + on_info_cmd(self, err, res); }); - this.ready = false; -}; + self.ready = false; +} -RedisClient.prototype.send_offline_queue = function () { - for (var command_obj = this.offline_queue.shift(); command_obj; command_obj = this.offline_queue.shift()) { +function send_offline_queue (self) { + for (var command_obj = self.offline_queue.shift(); command_obj; command_obj = self.offline_queue.shift()) { debug('Sending offline command: ' + command_obj.command); - this.send_command(command_obj.command, command_obj.args, command_obj.callback); + self.send_command(command_obj.command, command_obj.args, command_obj.callback); } - this.drain(); + drain(self); // Even though items were shifted off, Queue backing store still uses memory until next add, so just get a new Queue - this.offline_queue = new Queue(); -}; + self.offline_queue = new Queue(); +} var retry_connection = function (self, error) { debug('Retrying connection...'); @@ -503,97 +477,97 @@ var retry_connection = function (self, error) { self.retry_timer = null; }; -RedisClient.prototype.connection_gone = function (why, error) { +function connection_gone (self, why, error) { // If a retry is already in progress, just let that happen - if (this.retry_timer) { + if (self.retry_timer) { return; } debug('Redis connection is gone from ' + why + ' event.'); - this.connected = false; - this.ready = false; + self.connected = false; + self.ready = false; // Deactivate cork to work with the offline queue - this.cork = noop; - this.pipeline = 0; + self.cork = noop; + self.pipeline = 0; var state = { - monitoring: this.monitoring, - pub_sub_mode: this.pub_sub_mode + monitoring: self.monitoring, + pub_sub_mode: self.pub_sub_mode }; - this.old_state = state; - this.monitoring = false; - this.pub_sub_mode = 0; + self.old_state = state; + self.monitoring = false; + self.pub_sub_mode = 0; // since we are collapsing end and close, users don't expect to be called twice - if (!this.emitted_end) { - this.emit('end'); - this.emitted_end = true; + if (!self.emitted_end) { + self.emit('end'); + self.emitted_end = true; } // If this is a requested shutdown, then don't retry - if (this.closing) { + if (self.closing) { debug('Connection ended from quit command, not retrying.'); - this.flush_and_error(new Error('Redis connection gone from ' + why + ' event.')); + self.flush_and_error(new Error('Redis connection gone from ' + why + ' event.')); return; } - if (typeof this.options.retry_strategy === 'function') { - this.retry_delay = this.options.retry_strategy({ - attempt: this.attempts, + if (typeof self.options.retry_strategy === 'function') { + self.retry_delay = self.options.retry_strategy({ + attempt: self.attempts, error: error, - total_retry_time: this.retry_totaltime, - times_connected: this.times_connected + total_retry_time: self.retry_totaltime, + times_connected: self.times_connected }); - if (typeof this.retry_delay !== 'number') { + if (typeof self.retry_delay !== 'number') { // Pass individual error through - if (this.retry_delay instanceof Error) { - error = this.retry_delay; + if (self.retry_delay instanceof Error) { + error = self.retry_delay; } - this.flush_and_error(error); - this.emit('error', error); - this.end(false); + self.flush_and_error(error); + self.emit('error', error); + self.end(false); return; } } - if (this.max_attempts !== 0 && this.attempts >= this.max_attempts || this.retry_totaltime >= this.connect_timeout) { - var message = this.retry_totaltime >= this.connect_timeout ? + if (self.max_attempts !== 0 && self.attempts >= self.max_attempts || self.retry_totaltime >= self.connect_timeout) { + var message = self.retry_totaltime >= self.connect_timeout ? 'connection timeout exceeded.' : 'maximum connection attempts exceeded.'; error = new Error('Redis connection in broken state: ' + message); error.code = 'CONNECTION_BROKEN'; - this.flush_and_error(error); - this.emit('error', error); - this.end(false); + self.flush_and_error(error); + self.emit('error', error); + self.end(false); return; } // Retry commands after a reconnect instead of throwing an error. Use this with caution - if (this.options.retry_unfulfilled_commands) { - this.offline_queue.unshift.apply(this.offline_queue, this.command_queue.toArray()); - this.command_queue.clear(); - } else if (this.command_queue.length !== 0) { + if (self.options.retry_unfulfilled_commands) { + self.offline_queue.unshift.apply(self.offline_queue, self.command_queue.toArray()); + self.command_queue.clear(); + } else if (self.command_queue.length !== 0) { error = new Error('Redis connection lost and command aborted in uncertain state. It might have been processed.'); error.code = 'UNCERTAIN_STATE'; - this.flush_and_error(error, ['command_queue']); + self.flush_and_error(error, ['command_queue']); error.message = 'Redis connection lost and commands aborted in uncertain state. They might have been processed.'; - this.emit('error', error); + self.emit('error', error); } - if (this.retry_max_delay !== null && this.retry_delay > this.retry_max_delay) { - this.retry_delay = this.retry_max_delay; - } else if (this.retry_totaltime + this.retry_delay > this.connect_timeout) { + if (self.retry_max_delay !== null && self.retry_delay > self.retry_max_delay) { + self.retry_delay = self.retry_max_delay; + } else if (self.retry_totaltime + self.retry_delay > self.connect_timeout) { // Do not exceed the maximum - this.retry_delay = this.connect_timeout - this.retry_totaltime; + self.retry_delay = self.connect_timeout - self.retry_totaltime; } - debug('Retry connection in ' + this.retry_delay + ' ms'); + debug('Retry connection in ' + self.retry_delay + ' ms'); - this.retry_timer = setTimeout(retry_connection, this.retry_delay, this, error); -}; + self.retry_timer = setTimeout(retry_connection, self.retry_delay, self, error); +} -RedisClient.prototype.return_error = function (err) { - var command_obj = this.command_queue.shift(); +function return_error (self, err) { + var command_obj = self.command_queue.shift(); if (command_obj && command_obj.command && command_obj.command.toUpperCase) { err.command = command_obj.command.toUpperCase(); } @@ -604,19 +578,19 @@ RedisClient.prototype.return_error = function (err) { err.code = match[1]; } - utils.callback_or_emit(this, command_obj && command_obj.callback, err); -}; + utils.callback_or_emit(self, command_obj && command_obj.callback, err); +} -RedisClient.prototype.drain = function () { - this.emit('drain'); - this.should_buffer = false; -}; +function drain (self) { + self.emit('drain'); + self.should_buffer = false; +} -RedisClient.prototype.emit_idle = function () { - if (this.command_queue.length === 0 && this.pub_sub_mode === 0) { - this.emit('idle'); +function emit_idle (self) { + if (self.command_queue.length === 0 && self.pub_sub_mode === 0) { + self.emit('idle'); } -}; +} function normal_reply (self, reply) { var command_obj = self.command_queue.shift(); @@ -925,29 +899,6 @@ RedisClient.prototype.write = function (data) { return; }; -RedisClient.prototype.end = function (flush) { - // Flush queue if wanted - if (flush) { - this.flush_and_error(new Error("The command can't be processed. The connection has already been closed.")); - } else if (arguments.length === 0) { - this.warn( - 'Using .end() without the flush parameter is deprecated and throws from v.3.0.0 on.\n' + - 'Please check the doku (https://github.com/NodeRedis/node_redis) and explictly use flush.' - ); - } - // Clear retry_timer - if (this.retry_timer) { - clearTimeout(this.retry_timer); - this.retry_timer = null; - } - this.stream.removeAllListeners(); - this.stream.on('error', noop); - this.connected = false; - this.ready = false; - this.closing = true; - return this.stream.destroySoon(); -}; - exports.createClient = function () { return new RedisClient(unifyOptions.apply(null, arguments)); }; @@ -955,6 +906,7 @@ exports.RedisClient = RedisClient; exports.print = utils.print; exports.Multi = require('./lib/multi'); -// Add all redis commands to the client +// Add all redis commands etc to the RedisClient require('./lib/individualCommands'); +require('./lib/individualFunctions'); require('./lib/commands'); diff --git a/lib/individualFunctions.js b/lib/individualFunctions.js new file mode 100644 index 0000000000..47c9682cae --- /dev/null +++ b/lib/individualFunctions.js @@ -0,0 +1,56 @@ +'use strict'; + +var RedisClient = require('../').RedisClient; +var debug = require('./debug'); +var utils = require('./utils'); +var noop = function () {}; + +/******************************** +Additional individual exposed API +********************************/ + +RedisClient.prototype.duplicate = function (options) { + var existing_options = utils.clone(this.options); + options = utils.clone(options); + for (var elem in options) { // jshint ignore: line + existing_options[elem] = options[elem]; + } + var client = new RedisClient(existing_options); + client.selected_db = this.selected_db; + return client; +}; + +RedisClient.prototype.unref = function () { + if (this.connected) { + debug("Unref'ing the socket connection"); + this.stream.unref(); + } else { + debug('Not connected yet, will unref later'); + this.once('connect', function () { + this.unref(); + }); + } +}; + +RedisClient.prototype.end = function (flush) { + // Flush queue if wanted + if (flush) { + this.flush_and_error(new Error("The command can't be processed. The connection has already been closed.")); + } else if (arguments.length === 0) { + this.warn( + 'Using .end() without the flush parameter is deprecated and throws from v.3.0.0 on.\n' + + 'Please check the doku (https://github.com/NodeRedis/node_redis) and explictly use flush.' + ); + } + // Clear retry_timer + if (this.retry_timer) { + clearTimeout(this.retry_timer); + this.retry_timer = null; + } + this.stream.removeAllListeners(); + this.stream.on('error', noop); + this.connected = false; + this.ready = false; + this.closing = true; + return this.stream.destroySoon(); +};