1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-04 15:02:09 +03:00

Implement CLIENT REPLY ON|OFF|SKIP

This commit is contained in:
Ruben Bridgewater
2016-04-14 01:17:43 +02:00
parent 3038c9043d
commit 97ae78877b
5 changed files with 233 additions and 24 deletions

View File

@@ -150,6 +150,7 @@ function RedisClient (options, stream) {
this.times_connected = 0;
this.options = options;
this.buffers = options.return_buffers || options.detect_buffers;
this.reply = 'ON'; // Returning replies is the default
// Init parser
this.reply_parser = create_parser(this, options);
this.create_stream();
@@ -901,6 +902,22 @@ RedisClient.prototype.internal_send_command = function (command, args, callback,
if (call_on_write) {
call_on_write();
}
// Handle `CLIENT REPLY ON|OFF|SKIP`
// This has to be checked after call_on_write
if (this.reply === 'ON') {
this.command_queue.push(command_obj);
} else {
// Do not expect a reply
// Does this work in combination with the pub sub mode?
if (callback) {
utils.reply_in_order(this, callback, null, undefined, this.command_queue);
}
if (this.reply === 'SKIP') {
this.reply = 'SKIP_ONE_MORE';
} else if (this.reply === 'SKIP_ONE_MORE') {
this.reply = 'ON';
}
}
return !this.should_buffer;
};

View File

@@ -226,6 +226,97 @@ Multi.prototype.auth = Multi.prototype.AUTH = function auth (pass, callback) {
return this;
};
RedisClient.prototype.client = RedisClient.prototype.CLIENT = function client () {
var arr,
len = arguments.length,
callback,
i = 0;
if (Array.isArray(arguments[0])) {
arr = arguments[0];
callback = arguments[1];
} else if (Array.isArray(arguments[1])) {
if (len === 3) {
callback = arguments[2];
}
len = arguments[1].length;
arr = new Array(len + 1);
arr[0] = arguments[0];
for (; i < len; i += 1) {
arr[i + 1] = arguments[1][i];
}
} else {
len = arguments.length;
// The later should not be the average use case
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
len--;
callback = arguments[len];
}
arr = new Array(len);
for (; i < len; i += 1) {
arr[i] = arguments[i];
}
}
var self = this;
var call_on_write = undefined;
// CLIENT REPLY ON|OFF|SKIP
/* istanbul ignore next: TODO: Remove this as soon as Travis runs Redis 3.2 */
if (arr.length === 2 && arr[0].toString().toUpperCase() === 'REPLY') {
var reply_on_off = arr[1].toString().toUpperCase();
if (reply_on_off === 'ON' || reply_on_off === 'OFF' || reply_on_off === 'SKIP') {
call_on_write = function () {
self.reply = reply_on_off;
};
}
}
return this.internal_send_command('client', arr, callback, call_on_write);
};
Multi.prototype.client = Multi.prototype.CLIENT = function client () {
var arr,
len = arguments.length,
callback,
i = 0;
if (Array.isArray(arguments[0])) {
arr = arguments[0];
callback = arguments[1];
} else if (Array.isArray(arguments[1])) {
if (len === 3) {
callback = arguments[2];
}
len = arguments[1].length;
arr = new Array(len + 1);
arr[0] = arguments[0];
for (; i < len; i += 1) {
arr[i + 1] = arguments[1][i];
}
} else {
len = arguments.length;
// The later should not be the average use case
if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) {
len--;
callback = arguments[len];
}
arr = new Array(len);
for (; i < len; i += 1) {
arr[i] = arguments[i];
}
}
var self = this._client;
var call_on_write = undefined;
// CLIENT REPLY ON|OFF|SKIP
/* istanbul ignore next: TODO: Remove this as soon as Travis runs Redis 3.2 */
if (arr.length === 2 && arr[0].toString().toUpperCase() === 'REPLY') {
var reply_on_off = arr[1].toString().toUpperCase();
if (reply_on_off === 'ON' || reply_on_off === 'OFF' || reply_on_off === 'SKIP') {
call_on_write = function () {
self.reply = reply_on_off;
};
}
}
this.queue.push(['client', arr, callback, call_on_write]);
return this;
};
RedisClient.prototype.hmset = RedisClient.prototype.HMSET = function hmset () {
var arr,
len = arguments.length,

View File

@@ -90,9 +90,15 @@ function callbackOrEmit (self, callback, err, res) {
}
}
function replyInOrder (self, callback, err, res) {
// The offline queue has to be checked first, as there might be commands in both queues at the same time
var command_obj = self.offline_queue.peekBack() || self.command_queue.peekBack();
function replyInOrder (self, callback, err, res, queue) {
// If the queue is explicitly passed, use that, otherwise fall back to the offline queue first,
// as there might be commands in both queues at the same time
var command_obj;
if (queue) {
command_obj = queue.peekBack();
} else {
command_obj = self.offline_queue.peekBack() || self.command_queue.peekBack();
}
if (!command_obj) {
process.nextTick(function () {
callbackOrEmit(self, callback, err, res);

View File

@@ -30,25 +30,89 @@ describe("The 'client' method", function () {
});
it("lists connected clients when invoked with multi's chaining syntax", function (done) {
client.multi().client('list').exec(function (err, results) {
assert(pattern.test(results[0]), "expected string '" + results + "' to match " + pattern.toString());
return done();
});
client.multi().client('list', helper.isType.string()).exec(helper.match(pattern, done));
});
it('lists connected clients when invoked with array syntax on client', function (done) {
client.multi().client(['list']).exec(function (err, results) {
assert(pattern.test(results[0]), "expected string '" + results + "' to match " + pattern.toString());
return done();
});
client.multi().client(['list']).exec(helper.match(pattern, done));
});
it("lists connected clients when invoked with multi's array syntax", function (done) {
client.multi([
['client', 'list']
]).exec(function (err, results) {
assert(pattern.test(results[0]), "expected string '" + results + "' to match " + pattern.toString());
return done();
]).exec(helper.match(pattern, done));
});
});
describe('reply', function () {
describe('as normal command', function () {
it('on', function (done) {
helper.serverVersionAtLeast.call(this, client, [3, 2, 0]);
assert.strictEqual(client.reply, 'ON');
client.client('reply', 'on', helper.isString('OK'));
assert.strictEqual(client.reply, 'ON');
client.set('foo', 'bar', done);
});
it('off', function (done) {
helper.serverVersionAtLeast.call(this, client, [3, 2, 0]);
assert.strictEqual(client.reply, 'ON');
client.client(new Buffer('REPLY'), 'OFF', helper.isUndefined());
assert.strictEqual(client.reply, 'OFF');
client.set('foo', 'bar', helper.isUndefined(done));
});
it('skip', function (done) {
helper.serverVersionAtLeast.call(this, client, [3, 2, 0]);
assert.strictEqual(client.reply, 'ON');
client.client('REPLY', new Buffer('SKIP'), helper.isUndefined());
assert.strictEqual(client.reply, 'SKIP_ONE_MORE');
client.set('foo', 'bar', helper.isUndefined());
client.get('foo', helper.isString('bar', done));
});
});
describe('in a batch context', function () {
it('on', function (done) {
helper.serverVersionAtLeast.call(this, client, [3, 2, 0]);
var batch = client.batch();
assert.strictEqual(client.reply, 'ON');
batch.client('reply', 'on', helper.isString('OK'));
assert.strictEqual(client.reply, 'ON');
batch.set('foo', 'bar');
batch.exec(function (err, res) {
assert.deepEqual(res, ['OK', 'OK']);
done(err);
});
});
it('off', function (done) {
helper.serverVersionAtLeast.call(this, client, [3, 2, 0]);
var batch = client.batch();
assert.strictEqual(client.reply, 'ON');
batch.set('hello', 'world');
batch.client(new Buffer('REPLY'), new Buffer('OFF'), helper.isUndefined());
batch.set('foo', 'bar', helper.isUndefined());
batch.exec(function (err, res) {
assert.strictEqual(client.reply, 'OFF');
assert.deepEqual(res, ['OK', undefined, undefined]);
done(err);
});
});
it('skip', function (done) {
helper.serverVersionAtLeast.call(this, client, [3, 2, 0]);
assert.strictEqual(client.reply, 'ON');
client.batch()
.set('hello', 'world')
.client('REPLY', 'SKIP', helper.isUndefined())
.set('foo', 'bar', helper.isUndefined())
.get('foo')
.exec(function (err, res) {
assert.strictEqual(client.reply, 'ON');
assert.deepEqual(res, ['OK', undefined, undefined, 'bar']);
done(err);
});
});
});
});

View File

@@ -29,6 +29,14 @@ if (!process.env.REDIS_TESTS_STARTED) {
});
}
function arrayHelper (results) {
if (results instanceof Array) {
assert.strictEqual(results.length, 1, 'The array length may only be one element');
return results[0];
}
return results;
}
module.exports = {
redisProcess: function () {
return rp;
@@ -52,8 +60,9 @@ module.exports = {
},
isNumber: function (expected, done) {
return function (err, results) {
assert.strictEqual(null, err, 'expected ' + expected + ', got error: ' + err);
assert.strictEqual(expected, results, expected + ' !== ' + results);
assert.strictEqual(err, null, 'expected ' + expected + ', got error: ' + err);
results = arrayHelper(results);
assert.strictEqual(results, expected, expected + ' !== ' + results);
assert.strictEqual(typeof results, 'number', 'expected a number, got ' + typeof results);
if (done) done();
};
@@ -61,18 +70,28 @@ module.exports = {
isString: function (str, done) {
str = '' + str; // Make sure it's a string
return function (err, results) {
assert.strictEqual(null, err, "expected string '" + str + "', got error: " + err);
assert.strictEqual(err, null, "expected string '" + str + "', got error: " + err);
results = arrayHelper(results);
if (Buffer.isBuffer(results)) { // If options are passed to return either strings or buffers...
results = results.toString();
}
assert.strictEqual(str, results, str + ' does not match ' + results);
assert.strictEqual(results, str, str + ' does not match ' + results);
if (done) done();
};
},
isNull: function (done) {
return function (err, results) {
assert.strictEqual(null, err, 'expected null, got error: ' + err);
assert.strictEqual(null, results, results + ' is not null');
assert.strictEqual(err, null, 'expected null, got error: ' + err);
results = arrayHelper(results);
assert.strictEqual(results, null, results + ' is not null');
if (done) done();
};
},
isUndefined: function (done) {
return function (err, results) {
assert.strictEqual(err, null, 'expected null, got error: ' + err);
results = arrayHelper(results);
assert.strictEqual(results, undefined, results + ' is not undefined');
if (done) done();
};
},
@@ -91,27 +110,39 @@ module.exports = {
isType: {
number: function (done) {
return function (err, results) {
assert.strictEqual(null, err, 'expected any number, got error: ' + err);
assert.strictEqual(err, null, 'expected any number, got error: ' + err);
assert.strictEqual(typeof results, 'number', results + ' is not a number');
if (done) done();
};
},
string: function (done) {
return function (err, results) {
assert.strictEqual(err, null, 'expected any string, got error: ' + err);
assert.strictEqual(typeof results, 'string', results + ' is not a string');
if (done) done();
};
},
positiveNumber: function (done) {
return function (err, results) {
assert.strictEqual(null, err, 'expected positive number, got error: ' + err);
assert.strictEqual(true, (results > 0), results + ' is not a positive number');
assert.strictEqual(err, null, 'expected positive number, got error: ' + err);
assert(results > 0, results + ' is not a positive number');
if (done) done();
};
}
},
match: function (pattern, done) {
return function (err, results) {
assert.strictEqual(null, err, 'expected ' + pattern.toString() + ', got error: ' + err);
assert.strictEqual(err, null, 'expected ' + pattern.toString() + ', got error: ' + err);
results = arrayHelper(results);
assert(pattern.test(results), "expected string '" + results + "' to match " + pattern.toString());
if (done) done();
};
},
serverVersionAtLeast: function (connection, desired_version) {
// Wait until a connection has established (otherwise a timeout is going to be triggered at some point)
if (Object.keys(connection.server_info).length === 0) {
throw new Error('Version check not possible as the client is not yet ready or did not expose the version');
}
// Return true if the server version >= desired_version
var version = connection.server_info.versions;
for (var i = 0; i < 3; i++) {