You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-06 02:15:48 +03:00
Improve error handling
Added individual error classes Don't silently fail for commands without callback from now on General polishing (e.g. better error messages) Fix typos
This commit is contained in:
164
index.js
164
index.js
@@ -5,7 +5,7 @@ var tls = require('tls');
|
||||
var util = require('util');
|
||||
var utils = require('./lib/utils');
|
||||
var Queue = require('double-ended-queue');
|
||||
var CommandError = require('./lib/customError');
|
||||
var errorClasses = require('./lib/customErrors');
|
||||
var Command = require('./lib/command').Command;
|
||||
var OfflineCommand = require('./lib/command').OfflineCommand;
|
||||
var EventEmitter = require('events');
|
||||
@@ -189,13 +189,24 @@ function create_parser (self) {
|
||||
self.return_reply(data);
|
||||
},
|
||||
returnError: function (err) {
|
||||
self.return_error(err);
|
||||
// Return a ReplyError to indicate Redis returned an error
|
||||
self.return_error(new errorClasses.ReplyError(err));
|
||||
},
|
||||
returnFatalError: function (err) {
|
||||
// Error out all fired commands. Otherwise they might rely on faulty data. We have to reconnect to get in a working state again
|
||||
self.flush_and_error(err, ['command_queue']);
|
||||
self.stream.destroy();
|
||||
self.return_error(err);
|
||||
// Note: the execution order is important. First flush and emit, then create the stream
|
||||
err = new errorClasses.ReplyError(err);
|
||||
err.message += '. Please report this.';
|
||||
self.ready = false;
|
||||
self.flush_and_error({
|
||||
message: 'Fatal error encountert. Command aborted.',
|
||||
code: 'NR_FATAL'
|
||||
}, {
|
||||
error: err,
|
||||
queues: ['command_queue']
|
||||
});
|
||||
self.emit('error', err);
|
||||
self.create_stream();
|
||||
},
|
||||
returnBuffers: self.buffers || self.message_buffers,
|
||||
name: self.options.parser,
|
||||
@@ -240,7 +251,7 @@ RedisClient.prototype.create_stream = function () {
|
||||
this.stream.setTimeout(this.connect_timeout, function () {
|
||||
// Note: This is only tested if a internet connection is established
|
||||
self.retry_totaltime = self.connect_timeout;
|
||||
self.connection_gone('timeout', new Error('Redis connection gone from timeout event'));
|
||||
self.connection_gone('timeout');
|
||||
});
|
||||
}
|
||||
|
||||
@@ -270,11 +281,11 @@ RedisClient.prototype.create_stream = function () {
|
||||
});
|
||||
|
||||
this.stream.once('close', function (hadError) {
|
||||
self.connection_gone('close', hadError ? new Error('Stream connection closed with a transmission error') : null);
|
||||
self.connection_gone('close');
|
||||
});
|
||||
|
||||
this.stream.once('end', function () {
|
||||
self.connection_gone('end', null);
|
||||
self.connection_gone('end');
|
||||
});
|
||||
|
||||
this.stream.on('drain', function () {
|
||||
@@ -325,30 +336,46 @@ RedisClient.prototype.warn = function (msg) {
|
||||
};
|
||||
|
||||
// Flush provided queues, erroring any items with a callback first
|
||||
RedisClient.prototype.flush_and_error = function (error, queue_names) {
|
||||
var callbacks_not_called = [];
|
||||
queue_names = queue_names || ['offline_queue', 'command_queue'];
|
||||
RedisClient.prototype.flush_and_error = function (error_attributes, options) {
|
||||
options = options || {};
|
||||
var aggregated_errors = [];
|
||||
var queue_names = options.queues || ['command_queue', 'offline_queue']; // Flush the command_queue first to keep the order intakt
|
||||
for (var i = 0; i < queue_names.length; i++) {
|
||||
// If the command was fired it might have been processed so far
|
||||
if (queue_names[i] === 'command_queue') {
|
||||
error_attributes.message += ' It might have been processed.';
|
||||
} else { // As the command_queue is flushed first, remove this for the offline queue
|
||||
error_attributes.message = error_attributes.message.replace(' It might have been processed.', '');
|
||||
}
|
||||
// Don't flush everything from the queue
|
||||
for (var command_obj = this[queue_names[i]].shift(); command_obj; command_obj = this[queue_names[i]].shift()) {
|
||||
var err = new CommandError(error);
|
||||
var err = new errorClasses.AbortError(error_attributes);
|
||||
err.command = command_obj.command.toUpperCase();
|
||||
if (command_obj.args.length) {
|
||||
if (command_obj.args && command_obj.args.length) {
|
||||
err.args = command_obj.args;
|
||||
}
|
||||
if (options.error) {
|
||||
err.origin = options.error;
|
||||
}
|
||||
if (typeof command_obj.callback === 'function') {
|
||||
command_obj.callback(err);
|
||||
} else {
|
||||
callbacks_not_called.push(err);
|
||||
aggregated_errors.push(err);
|
||||
}
|
||||
}
|
||||
this[queue_names[i]] = new Queue();
|
||||
}
|
||||
// Mutate the original error that will be emitted
|
||||
// This is fine, as we don't manipulate any user errors
|
||||
if (callbacks_not_called.length !== 0) {
|
||||
error.errors = callbacks_not_called;
|
||||
// Currently this would be a breaking change, therefore it's only emitted in debug_mode
|
||||
if (exports.debug_mode && aggregated_errors.length) {
|
||||
var error;
|
||||
if (aggregated_errors.length === 1) {
|
||||
error = aggregated_errors[0];
|
||||
} else {
|
||||
error_attributes.message = error_attributes.message.replace('It', 'They').replace(/command/i, '$&s');
|
||||
error = new errorClasses.AggregateError(error_attributes);
|
||||
error.errors = aggregated_errors;
|
||||
}
|
||||
this.emit('error', error);
|
||||
}
|
||||
return callbacks_not_called.length === 0;
|
||||
};
|
||||
|
||||
RedisClient.prototype.on_error = function (err) {
|
||||
@@ -538,6 +565,7 @@ RedisClient.prototype.connection_gone = function (why, error) {
|
||||
if (this.retry_timer) {
|
||||
return;
|
||||
}
|
||||
error = error || null;
|
||||
|
||||
debug('Redis connection is gone from ' + why + ' event.');
|
||||
this.connected = false;
|
||||
@@ -564,9 +592,12 @@ RedisClient.prototype.connection_gone = function (why, error) {
|
||||
// If this is a requested shutdown, then don't retry
|
||||
if (this.closing) {
|
||||
debug('Connection ended by quit / end command, not retrying.');
|
||||
error = new Error('Stream connection ended and running command aborted. It might have been processed.');
|
||||
error.code = 'NR_OFFLINE';
|
||||
this.flush_and_error(error);
|
||||
this.flush_and_error({
|
||||
message: 'Stream connection ended and command aborted.',
|
||||
code: 'NR_CLOSED'
|
||||
}, {
|
||||
error: error
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -586,31 +617,39 @@ RedisClient.prototype.connection_gone = function (why, error) {
|
||||
if (typeof this.retry_delay !== 'number') {
|
||||
// Pass individual error through
|
||||
if (this.retry_delay instanceof Error) {
|
||||
error = new CommandError(this.retry_delay);
|
||||
}
|
||||
// Attention: there might be the case where there's no error!
|
||||
if (!error) {
|
||||
error = new Error('Stream connection ended and running command aborted. It might have been processed.');
|
||||
error.code = 'NR_OFFLINE';
|
||||
}
|
||||
// Only emit an error in case that a running command had no callback
|
||||
if (!this.flush_and_error(error)) {
|
||||
error.message = 'Stream connection ended and all running commands aborted. They might have been processed.';
|
||||
this.emit('error', error);
|
||||
error = this.retry_delay;
|
||||
}
|
||||
this.flush_and_error({
|
||||
message: 'Stream connection ended and command aborted.',
|
||||
code: 'NR_CLOSED'
|
||||
}, {
|
||||
error: error
|
||||
});
|
||||
this.end(false);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (this.max_attempts !== 0 && this.attempts >= this.max_attempts || this.retry_totaltime >= this.connect_timeout) {
|
||||
var message = this.retry_totaltime >= this.connect_timeout ?
|
||||
'connection timeout exceeded.' :
|
||||
'maximum connection attempts exceeded.';
|
||||
error = new Error('Redis connection in broken state: ' + message);
|
||||
error.code = 'CONNECTION_BROKEN';
|
||||
this.flush_and_error(error);
|
||||
this.emit('error', error);
|
||||
var message = 'Redis connection in broken state: ';
|
||||
if (this.retry_totaltime >= this.connect_timeout) {
|
||||
message += 'connection timeout exceeded.';
|
||||
} else {
|
||||
message += 'maximum connection attempts exceeded.';
|
||||
}
|
||||
|
||||
this.flush_and_error({
|
||||
message: message,
|
||||
code: 'CONNECTION_BROKEN',
|
||||
}, {
|
||||
error: error
|
||||
});
|
||||
var err = new Error(message);
|
||||
err.code = 'CONNECTION_BROKEN';
|
||||
if (error) {
|
||||
err.origin = error;
|
||||
}
|
||||
this.emit('error', err);
|
||||
this.end(false);
|
||||
return;
|
||||
}
|
||||
@@ -620,13 +659,13 @@ RedisClient.prototype.connection_gone = function (why, error) {
|
||||
this.offline_queue.unshift.apply(this.offline_queue, this.command_queue.toArray());
|
||||
this.command_queue.clear();
|
||||
} else if (this.command_queue.length !== 0) {
|
||||
error = new Error('Redis connection lost and command aborted in uncertain state. It might have been processed.');
|
||||
error.code = 'UNCERTAIN_STATE';
|
||||
if (!this.flush_and_error(error, ['command_queue'])) {
|
||||
// Only emit if not all commands had a callback that already handled the error
|
||||
error.message = 'Redis connection lost and commands aborted in uncertain state. They might have been processed.';
|
||||
this.emit('error', error);
|
||||
}
|
||||
this.flush_and_error({
|
||||
message: 'Redis connection lost and command aborted.',
|
||||
code: 'UNCERTAIN_STATE'
|
||||
}, {
|
||||
error: error,
|
||||
queues: ['command_queue']
|
||||
});
|
||||
}
|
||||
|
||||
if (this.retry_max_delay !== null && this.retry_delay > this.retry_max_delay) {
|
||||
@@ -643,11 +682,9 @@ RedisClient.prototype.connection_gone = function (why, error) {
|
||||
|
||||
RedisClient.prototype.return_error = function (err) {
|
||||
var command_obj = this.command_queue.shift();
|
||||
if (command_obj && command_obj.command && command_obj.command.toUpperCase) {
|
||||
err.command = command_obj.command.toUpperCase();
|
||||
if (command_obj.args.length) {
|
||||
err.args = command_obj.args;
|
||||
}
|
||||
err.command = command_obj.command.toUpperCase();
|
||||
if (command_obj.args && command_obj.args.length) {
|
||||
err.args = command_obj.args;
|
||||
}
|
||||
|
||||
// Count down pub sub mode if in entering modus
|
||||
@@ -661,7 +698,7 @@ RedisClient.prototype.return_error = function (err) {
|
||||
err.code = match[1];
|
||||
}
|
||||
|
||||
utils.callback_or_emit(this, command_obj && command_obj.callback, err);
|
||||
utils.callback_or_emit(this, command_obj.callback, err);
|
||||
};
|
||||
|
||||
RedisClient.prototype.drain = function () {
|
||||
@@ -809,14 +846,16 @@ function handle_offline_command (self, command_obj) {
|
||||
msg = 'Stream not writeable.';
|
||||
}
|
||||
} else {
|
||||
msg = 'The connection has already been closed.';
|
||||
msg = 'The connection is already closed.';
|
||||
}
|
||||
err = new Error(command + " can't be processed. " + msg);
|
||||
err.command = command;
|
||||
if (command_obj.args.length) {
|
||||
err = new errorClasses.AbortError({
|
||||
message: command + " can't be processed. " + msg,
|
||||
code: 'NR_CLOSED',
|
||||
command: command
|
||||
});
|
||||
if (command_obj.args && command_obj.args.length) {
|
||||
err.args = command_obj.args;
|
||||
}
|
||||
err.code = 'NR_OFFLINE';
|
||||
utils.reply_in_order(self, callback, err);
|
||||
} else {
|
||||
debug('Queueing ' + command + ' for next server connection.');
|
||||
@@ -889,8 +928,8 @@ RedisClient.prototype.internal_send_command = function (command, args, callback,
|
||||
args_copy[i] = '' + args[i];
|
||||
}
|
||||
}
|
||||
args = null;
|
||||
command_obj = new Command(command, args_copy, buffer_args, callback);
|
||||
// Pass the original args to make sure in error cases the original arguments are returned
|
||||
command_obj = new Command(command, args, buffer_args, callback);
|
||||
|
||||
if (this.options.prefix) {
|
||||
prefix_keys = commands.getKeyIndexes(command, args_copy);
|
||||
@@ -1053,6 +1092,9 @@ exports.createClient = function () {
|
||||
exports.RedisClient = RedisClient;
|
||||
exports.print = utils.print;
|
||||
exports.Multi = require('./lib/multi');
|
||||
exports.AbortError = errorClasses.AbortError;
|
||||
exports.ReplyError = errorClasses.ReplyError;
|
||||
exports.AggregateError = errorClasses.AggregateError;
|
||||
|
||||
// Add all redis commands / node_redis api to the client
|
||||
require('./lib/individualCommands');
|
||||
|
Reference in New Issue
Block a user