You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-01 16:46:54 +03:00
Standard is not as up to date and still uses a old eslint version. Instead, use the airbnb default with a couple of modifications. All required changes are included.
607 lines
21 KiB
JavaScript
607 lines
21 KiB
JavaScript
'use strict'
|
|
|
|
const { Buffer } = require('buffer')
|
|
const assert = require('assert')
|
|
const config = require('./lib/config')
|
|
const helper = require('./helper')
|
|
|
|
const { redis } = config
|
|
|
|
describe('publish/subscribe', () => {
|
|
helper.allTests((ip, args) => {
|
|
describe(`using ${ip}`, () => {
|
|
let pub = null
|
|
let sub = null
|
|
const channel = 'test channel'
|
|
const channel2 = 'test channel 2'
|
|
const message = 'test message'
|
|
|
|
beforeEach((done) => {
|
|
const end = helper.callFuncAfter(done, 2)
|
|
|
|
pub = redis.createClient.apply(null, args)
|
|
sub = redis.createClient.apply(null, args)
|
|
pub.flushdb().then(() => end())
|
|
sub.once('connect', end)
|
|
})
|
|
|
|
describe('disable resubscribe', () => {
|
|
beforeEach((done) => {
|
|
sub.end(false)
|
|
sub = redis.createClient({
|
|
disableResubscribing: true
|
|
})
|
|
sub.once('connect', done)
|
|
})
|
|
|
|
it('does not fire subscribe events after reconnecting', (done) => {
|
|
let a = false
|
|
sub.on('subscribe', (chnl, count) => {
|
|
if (chnl === channel2) {
|
|
if (a) {
|
|
return done(new Error('Test failed'))
|
|
}
|
|
assert.strictEqual(2, count)
|
|
sub._stream.destroy()
|
|
}
|
|
})
|
|
|
|
sub.on('reconnecting', () => {
|
|
a = true
|
|
sub.on('ready', () => {
|
|
assert.strictEqual(sub.commandQueue.length, 0)
|
|
done()
|
|
})
|
|
})
|
|
|
|
sub.subscribe(channel, channel2)
|
|
})
|
|
})
|
|
|
|
describe('stringNumbers and pub sub', () => {
|
|
beforeEach((done) => {
|
|
sub.end(false)
|
|
sub = redis.createClient({
|
|
stringNumbers: true
|
|
})
|
|
sub.once('connect', done)
|
|
})
|
|
|
|
it('does not fire subscribe events after reconnecting', (done) => {
|
|
let i = 0
|
|
const end = helper.callFuncAfter(done, 2)
|
|
sub.on('subscribe', (chnl, count) => {
|
|
assert.strictEqual(typeof count, 'number')
|
|
assert.strictEqual(++i, count)
|
|
})
|
|
sub.on('unsubscribe', (chnl, count) => {
|
|
assert.strictEqual(typeof count, 'number')
|
|
assert.strictEqual(--i, count)
|
|
if (count === 0) {
|
|
assert.deepStrictEqual(sub._subscriptionSet, {})
|
|
end()
|
|
}
|
|
})
|
|
sub.subscribe(channel, channel2)
|
|
sub.unsubscribe()
|
|
sub.set('foo', 'bar').then(helper.isString('OK'))
|
|
sub.subscribe(channel2).then(end)
|
|
})
|
|
})
|
|
|
|
describe('subscribe', () => {
|
|
it('fires a subscribe event for each channel subscribed to even after reconnecting', (done) => {
|
|
let a = false
|
|
sub.on('subscribe', (chnl, count) => {
|
|
if (chnl === channel2) {
|
|
assert.strictEqual(2, count)
|
|
if (a) return done()
|
|
sub._stream.destroy()
|
|
}
|
|
})
|
|
|
|
sub.on('reconnecting', () => {
|
|
a = true
|
|
})
|
|
|
|
sub.subscribe(channel, channel2)
|
|
})
|
|
|
|
it('fires a subscribe event for each channel as buffer subscribed to even after reconnecting', (done) => {
|
|
let a = false
|
|
sub.end(true)
|
|
sub = redis.createClient({
|
|
detectBuffers: true
|
|
})
|
|
sub.on('subscribe', (chnl, count) => {
|
|
if (chnl.inspect() === Buffer.from([0xAA, 0xBB, 0x00, 0xF0]).inspect()) {
|
|
assert.strictEqual(1, count)
|
|
if (a) {
|
|
return done()
|
|
}
|
|
sub._stream.destroy()
|
|
}
|
|
})
|
|
|
|
sub.on('reconnecting', () => {
|
|
a = true
|
|
})
|
|
|
|
sub.subscribe(Buffer.from([0xAA, 0xBB, 0x00, 0xF0]), channel2)
|
|
})
|
|
|
|
it('receives messages on subscribed channel', (done) => {
|
|
const end = helper.callFuncAfter(done, 2)
|
|
sub.on('subscribe', (chnl, count) => {
|
|
pub.publish(channel, message).then((res) => {
|
|
helper.isNumber(1)(res)
|
|
end()
|
|
})
|
|
})
|
|
|
|
sub.on('message', (chnl, msg) => {
|
|
assert.strictEqual(chnl, channel)
|
|
assert.strictEqual(msg, message)
|
|
end()
|
|
})
|
|
|
|
sub.subscribe(channel)
|
|
})
|
|
|
|
it('receives messages if subscribe is called after unsubscribe', (done) => {
|
|
const end = helper.callFuncAfter(done, 2)
|
|
sub.once('subscribe', (chnl, count) => {
|
|
pub.publish(channel, message).then((res) => {
|
|
helper.isNumber(1)(res)
|
|
end()
|
|
})
|
|
})
|
|
|
|
sub.on('message', (chnl, msg) => {
|
|
assert.strictEqual(chnl, channel)
|
|
assert.strictEqual(msg, message)
|
|
end()
|
|
})
|
|
|
|
sub.subscribe(channel)
|
|
sub.unsubscribe(channel)
|
|
sub.subscribe(channel)
|
|
})
|
|
|
|
it('handles SUB UNSUB MSG SUB', () => {
|
|
return Promise.all([
|
|
sub.subscribe('chan8'),
|
|
sub.subscribe('chan9'),
|
|
sub.unsubscribe('chan9'),
|
|
pub.publish('chan8', 'something'),
|
|
sub.subscribe('chan9')
|
|
])
|
|
})
|
|
|
|
it('handles SUB UNSUB MSG SUB 2', () => {
|
|
return Promise.all([
|
|
sub.psubscribe('abc*').then(helper.isDeepEqual([1, ['abc*']])),
|
|
sub.subscribe('xyz'),
|
|
sub.unsubscribe('xyz'),
|
|
pub.publish('abcd', 'something'),
|
|
sub.subscribe('xyz')
|
|
])
|
|
})
|
|
|
|
it('emits end event if quit is called from within subscribe', (done) => {
|
|
sub.on('end', done)
|
|
sub.on('subscribe', (chnl, count) => {
|
|
sub.quit()
|
|
})
|
|
sub.subscribe(channel)
|
|
})
|
|
|
|
it('subscribe; close; resubscribe with prototype inherited property names', (done) => {
|
|
let count = 0
|
|
const channels = ['channel 1', 'channel 2']
|
|
const msg = ['hi from channel 1', 'hi from channel 2']
|
|
|
|
sub.on('message', (channel, message) => {
|
|
const n = Math.max(count - 1, 0)
|
|
assert.strictEqual(channel, channels[n])
|
|
assert.strictEqual(message, msg[n])
|
|
if (count === 2) return done()
|
|
sub._stream.end()
|
|
})
|
|
|
|
sub.select(3)
|
|
sub.subscribe(channels)
|
|
|
|
sub.on('ready', () => {
|
|
pub.publish(channels[count], msg[count])
|
|
count++
|
|
})
|
|
|
|
pub.publish(channels[count], msg[count])
|
|
})
|
|
})
|
|
|
|
describe('multiple subscribe / unsubscribe commands', () => {
|
|
it('reconnects properly with pub sub and select command', (done) => {
|
|
const end = helper.callFuncAfter(done, 2)
|
|
sub.select(3)
|
|
sub.set('foo', 'bar')
|
|
sub.set('failure').then(helper.fail, helper.isError()) // Triggering a warning while subscribing should work
|
|
sub.mget('foo', 'bar', 'baz', 'hello', 'world').then(helper.isDeepEqual(['bar', null, null, null, null]))
|
|
sub.subscribe('somechannel', 'another channel').then((res) => {
|
|
end()
|
|
sub._stream.destroy()
|
|
})
|
|
assert(sub.ready)
|
|
sub.on('ready', () => {
|
|
sub.unsubscribe()
|
|
sub.del('foo')
|
|
sub.info().then(end)
|
|
})
|
|
})
|
|
|
|
it('should not go into pubsub mode with unsubscribe commands', () => {
|
|
sub.on('unsubscribe', (msg) => {
|
|
// The unsubscribe should not be triggered, as there was no corresponding channel
|
|
throw new Error('Test failed')
|
|
})
|
|
return Promise.all([
|
|
sub.set('foo', 'bar'),
|
|
sub.unsubscribe().then(helper.isDeepEqual([0, []])),
|
|
sub.del('foo')
|
|
])
|
|
})
|
|
|
|
it('handles multiple channels with the same channel name properly, even with buffers', () => {
|
|
const channels = ['a', 'b', 'a', Buffer.from('a'), 'c', 'b']
|
|
const subscribedChannels = [1, 2, 2, 2, 3, 3]
|
|
sub.subscribe(channels)
|
|
sub.on('subscribe', (channel, count) => {
|
|
const compareChannel = channels.shift()
|
|
if (Buffer.isBuffer(channel)) {
|
|
assert.strictEqual(channel.inspect(), Buffer.from(compareChannel).inspect())
|
|
} else {
|
|
assert.strictEqual(channel, compareChannel.toString())
|
|
}
|
|
assert.strictEqual(count, subscribedChannels.shift())
|
|
})
|
|
sub.unsubscribe('a', 'c', 'b')
|
|
return sub.get('foo')
|
|
})
|
|
|
|
it('should only resubscribe to channels not unsubscribed earlier on a reconnect', (done) => {
|
|
sub.subscribe('/foo', '/bar')
|
|
sub.batch().unsubscribe(['/bar']).exec().then(() => {
|
|
pub.pubsub('channels').then((res) => {
|
|
helper.isDeepEqual(['/foo'])(res)
|
|
sub._stream.destroy()
|
|
sub.once('ready', () => {
|
|
pub.pubsub('channels').then((res) => {
|
|
helper.isDeepEqual(['/foo'])(res)
|
|
sub.unsubscribe('/foo').then(() => done())
|
|
})
|
|
})
|
|
})
|
|
})
|
|
})
|
|
|
|
it('unsubscribes, subscribes, unsubscribes... single and multiple entries mixed', (done) => {
|
|
function subscribe(channels) {
|
|
sub.unsubscribe().then(helper.isNull)
|
|
sub.subscribe(channels).then(helper.isNull)
|
|
}
|
|
let all = false
|
|
const subscribeMsg = ['1', '3', '2', '5', 'test', 'bla']
|
|
sub.on('subscribe', (msg, count) => {
|
|
subscribeMsg.splice(subscribeMsg.indexOf(msg), 1)
|
|
if (subscribeMsg.length === 0 && all) {
|
|
assert.strictEqual(count, 3)
|
|
done()
|
|
}
|
|
})
|
|
const unsubscribeMsg = ['1', '3', '2']
|
|
sub.on('unsubscribe', (msg, count) => {
|
|
unsubscribeMsg.splice(unsubscribeMsg.indexOf(msg), 1)
|
|
if (unsubscribeMsg.length === 0) {
|
|
assert.strictEqual(count, 0)
|
|
all = true
|
|
}
|
|
})
|
|
|
|
subscribe(['1', '3'])
|
|
subscribe(['2'])
|
|
subscribe(['5', 'test', 'bla'])
|
|
})
|
|
|
|
it('unsubscribes, subscribes, unsubscribes... single and multiple entries mixed. Without concrete channels', (done) => {
|
|
function subscribe(channels) {
|
|
sub.unsubscribe(channels)
|
|
sub.unsubscribe(channels)
|
|
sub.subscribe(channels)
|
|
}
|
|
let all = false
|
|
const subscribeMsg = ['1', '3', '2', '5', 'test', 'bla']
|
|
sub.on('subscribe', (msg, count) => {
|
|
subscribeMsg.splice(subscribeMsg.indexOf(msg), 1)
|
|
if (subscribeMsg.length === 0 && all) {
|
|
assert.strictEqual(count, 6)
|
|
done()
|
|
}
|
|
})
|
|
const unsubscribeMsg = ['1', '3', '2', '5', 'test', 'bla']
|
|
sub.on('unsubscribe', (msg, count) => {
|
|
const pos = unsubscribeMsg.indexOf(msg)
|
|
if (pos !== -1) { unsubscribeMsg.splice(pos, 1) }
|
|
if (unsubscribeMsg.length === 0) {
|
|
all = true
|
|
}
|
|
})
|
|
|
|
subscribe(['1', '3'])
|
|
subscribe(['2'])
|
|
subscribe(['5', 'test', 'bla'])
|
|
})
|
|
|
|
it('unsubscribes, subscribes, unsubscribes... with pattern matching', (done) => {
|
|
function subscribe(channels, callback) {
|
|
sub.punsubscribe('prefix:*').then(helper.isNull)
|
|
sub.psubscribe(channels).then(callback)
|
|
}
|
|
let all = false
|
|
const end = helper.callFuncAfter(done, 8)
|
|
const subscribeMsg = ['prefix:*', 'prefix:3', 'prefix:2', '5', 'test:a', 'bla']
|
|
sub.on('psubscribe', (msg, count) => {
|
|
subscribeMsg.splice(subscribeMsg.indexOf(msg), 1)
|
|
if (subscribeMsg.length === 0) {
|
|
assert.strictEqual(count, 5)
|
|
all = true
|
|
}
|
|
})
|
|
let rest = 1
|
|
const unsubscribeMsg = ['prefix:*', 'prefix:*', 'prefix:*', '*']
|
|
sub.on('punsubscribe', (msg, count) => {
|
|
unsubscribeMsg.splice(unsubscribeMsg.indexOf(msg), 1)
|
|
if (all) {
|
|
assert.strictEqual(unsubscribeMsg.length, 0)
|
|
assert.strictEqual(count, rest--) // Print the remaining channels
|
|
end()
|
|
} else {
|
|
assert.strictEqual(msg, 'prefix:*')
|
|
assert.strictEqual(count, rest++ - 1)
|
|
}
|
|
})
|
|
sub.on('message', (channel, msg, pattern) => {
|
|
assert.strictEqual(msg, 'test')
|
|
assert.strictEqual(pattern, 'prefix:*')
|
|
assert.strictEqual(channel, 'prefix:1')
|
|
end()
|
|
})
|
|
|
|
subscribe(['prefix:*', 'prefix:3'], () => {
|
|
pub.publish('prefix:1', Buffer.from('test')).then(() => {
|
|
subscribe(['prefix:2'])
|
|
subscribe(['5', 'test:a', 'bla'], () => assert(all))
|
|
sub.punsubscribe().then((res) => {
|
|
assert.deepStrictEqual(res, [0, ['prefix:3', 'prefix:2', '5', 'test:a', 'bla']])
|
|
assert(all)
|
|
all = false // Make sure the callback is actually after the emit
|
|
end()
|
|
})
|
|
sub.pubsub('channels').then(helper.isDeepEqual([])).then(end)
|
|
})
|
|
})
|
|
})
|
|
})
|
|
|
|
describe('unsubscribe', () => {
|
|
it('fires an unsubscribe event', (done) => {
|
|
sub.on('subscribe', (chnl, count) => {
|
|
sub.unsubscribe(channel)
|
|
})
|
|
|
|
sub.subscribe(channel)
|
|
|
|
sub.on('unsubscribe', (chnl, count) => {
|
|
assert.strictEqual(chnl, channel)
|
|
assert.strictEqual(count, 0)
|
|
done()
|
|
})
|
|
})
|
|
|
|
it('puts client back into write mode', (done) => {
|
|
sub.on('subscribe', (chnl, count) => {
|
|
sub.unsubscribe(channel)
|
|
})
|
|
|
|
sub.subscribe(channel)
|
|
|
|
sub.on('unsubscribe', (chnl, count) => {
|
|
pub.incr('foo').then(helper.isNumber(1)).then(done)
|
|
})
|
|
})
|
|
|
|
it('sub executes when unsubscribe is called and there are no subscriptions', () => {
|
|
return sub.unsubscribe().then(helper.isDeepEqual([0, []]))
|
|
})
|
|
|
|
it('pub executes when unsubscribe is called and there are no subscriptions', () => {
|
|
return Promise.all([
|
|
pub.unsubscribe().then(helper.isDeepEqual([0, []])),
|
|
pub.get('foo')
|
|
])
|
|
})
|
|
})
|
|
|
|
describe('psubscribe', () => {
|
|
it('allows all channels to be subscribed to using a * pattern', (done) => {
|
|
const sub2 = redis.createClient({
|
|
returnBuffers: true
|
|
})
|
|
const end = helper.callFuncAfter(() => sub2.quit().then(() => done()), 2)
|
|
sub.subscribe('/foo').then(() => {
|
|
sub2.on('ready', () => {
|
|
sub2.batch().psubscribe('*').exec().then(helper.isDeepEqual([[1, ['*']]]))
|
|
sub2.subscribe('/foo').then(() => {
|
|
pub.pubsub('numsub', '/foo').then(helper.isDeepEqual(['/foo', 2]))
|
|
// sub2 is counted twice as it subscribed with psubscribe and subscribe
|
|
pub.publish('/foo', 'hello world').then(helper.isNumber(3))
|
|
})
|
|
sub2.on('messageBuffer', (channel, message, pattern) => {
|
|
if (pattern) {
|
|
assert.strictEqual(pattern.inspect(), Buffer.from('*').inspect())
|
|
}
|
|
assert.strictEqual(channel.inspect(), Buffer.from('/foo').inspect())
|
|
assert.strictEqual(message.inspect(), Buffer.from('hello world').inspect())
|
|
end()
|
|
})
|
|
})
|
|
})
|
|
})
|
|
|
|
it('allows to listen to pmessageBuffer and pmessage', (done) => {
|
|
const end = helper.callFuncAfter(done, 5)
|
|
const data = Array(10000).join('äüs^öéÉÉ`e')
|
|
sub.set('foo', data).then(() => {
|
|
sub.get('foo').then(res => assert.strictEqual(typeof res, 'string'))
|
|
sub._stream.once('data', () => {
|
|
assert.strictEqual(sub._messageBuffers, false)
|
|
assert.strictEqual(sub.shouldBuffer, false)
|
|
assert.strictEqual(args[2].detectBuffers, sub._parserReturningBuffers)
|
|
assert.strictEqual(sub._messageBuffers, false)
|
|
sub.on('messageBuffer', (channel, message, pattern) => {
|
|
if (pattern) {
|
|
assert.strictEqual(pattern.inspect(), Buffer.from('*').inspect())
|
|
end()
|
|
}
|
|
assert.strictEqual(channel.inspect(), Buffer.from('/foo').inspect())
|
|
end()
|
|
})
|
|
assert.strictEqual(sub._messageBuffers, sub._parserReturningBuffers)
|
|
assert.strictEqual(sub._messageBuffers, true)
|
|
})
|
|
const batch = sub.batch()
|
|
batch.psubscribe('*')
|
|
batch.subscribe('/foo')
|
|
batch.unsubscribe('/foo')
|
|
batch.unsubscribe()
|
|
batch.subscribe(['/foo'])
|
|
batch.exec().then(() => {
|
|
// There's one subscriber to this channel
|
|
pub.pubsub('numsub', '/foo').then(helper.isDeepEqual(['/foo', 1]))
|
|
// There's exactly one channel that is listened too
|
|
pub.pubsub('channels').then(helper.isDeepEqual(['/foo']))
|
|
// One pattern is active
|
|
pub.pubsub('numpat').then(helper.isNumber(1))
|
|
pub.publish('/foo', 'hello world').then(helper.isNumber(2))
|
|
})
|
|
// Either messageBuffers or buffers has to be true, but not both at the same time
|
|
sub.on('message', (channel, message, pattern) => {
|
|
if (pattern) {
|
|
assert.strictEqual(pattern, '*')
|
|
}
|
|
assert.strictEqual(channel, '/foo')
|
|
assert.strictEqual(message, 'hello world')
|
|
end()
|
|
})
|
|
})
|
|
})
|
|
})
|
|
|
|
describe('punsubscribe', () => {
|
|
it('does not complain when punsubscribe is called and there are no subscriptions', () => {
|
|
return sub.punsubscribe()
|
|
})
|
|
|
|
it('executes when punsubscribe is called and there are no subscriptions', () => {
|
|
return pub.batch().punsubscribe().exec().then(helper.isDeepEqual([[0, []]]))
|
|
})
|
|
})
|
|
|
|
describe('fail for other commands while in pub sub mode', () => {
|
|
it('return error if only pub sub commands are allowed', () => {
|
|
return Promise.all([
|
|
sub.subscribe('channel'),
|
|
// Ping is allowed even if not listed as such!
|
|
sub.ping().then(helper.isDeepEqual(['pong', ''])),
|
|
// Get is forbidden
|
|
sub.get('foo').then(helper.fail).catch((err) => {
|
|
assert(/^ERR only \(P\)SUBSCRIBE \/ \(P\)UNSUBSCRIBE/.test(err.message))
|
|
assert.strictEqual(err.command, 'GET')
|
|
}),
|
|
// Quit is allowed
|
|
sub.quit()
|
|
])
|
|
})
|
|
})
|
|
|
|
it('should not publish a message multiple times per command', (done) => {
|
|
const published = {}
|
|
|
|
function subscribe(message) {
|
|
sub.removeAllListeners('subscribe')
|
|
sub.removeAllListeners('message')
|
|
sub.removeAllListeners('unsubscribe')
|
|
sub.on('subscribe', () => {
|
|
pub.publish('/foo', message)
|
|
})
|
|
sub.on('message', (channel, message) => {
|
|
if (published[message]) {
|
|
done(new Error('Message published more than once.'))
|
|
}
|
|
published[message] = true
|
|
})
|
|
sub.on('unsubscribe', (channel, count) => {
|
|
assert.strictEqual(count, 0)
|
|
})
|
|
sub.subscribe('/foo')
|
|
}
|
|
|
|
subscribe('hello')
|
|
|
|
setTimeout(() => {
|
|
sub.unsubscribe()
|
|
setTimeout(() => {
|
|
subscribe('world')
|
|
setTimeout(done, 50)
|
|
}, 40)
|
|
}, 40)
|
|
})
|
|
|
|
it('should not publish a message without any publish command', (done) => {
|
|
pub.set('foo', 'message')
|
|
pub.set('bar', 'hello')
|
|
pub.mget('foo', 'bar')
|
|
pub.subscribe('channel').then(() => setTimeout(done, 50))
|
|
pub.on('message', (msg) => {
|
|
done(new Error(`This message should not have been published: ${msg}`))
|
|
})
|
|
})
|
|
|
|
it('arguments variants', () => {
|
|
return sub.batch()
|
|
.info(['stats'])
|
|
.info()
|
|
.client('KILL', ['type', 'pubsub'])
|
|
.client('KILL', ['type', 'pubsub'])
|
|
.unsubscribe()
|
|
.psubscribe(['pattern:*'])
|
|
.punsubscribe('unknown*')
|
|
.punsubscribe(['pattern:*'])
|
|
.exec()
|
|
.then(() => Promise.all([
|
|
sub.client('kill', ['type', 'pubsub']),
|
|
sub.psubscribe('*'),
|
|
sub.punsubscribe('pa*'),
|
|
sub.punsubscribe(['a', '*'])
|
|
]))
|
|
})
|
|
|
|
afterEach(() => {
|
|
// Explicitly ignore still running commands
|
|
pub.end(false)
|
|
sub.end(false)
|
|
})
|
|
})
|
|
})
|
|
})
|