You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-07 13:22:56 +03:00
feat: consolidate pubsub emitters
This commit is contained in:
@@ -41,7 +41,10 @@ Breaking Changes
|
||||
- Removed `Redis.print` helper function
|
||||
- Removed backpressure indicator from function return value
|
||||
- Removed the `stream` parameter from the RedisClient constructor.
|
||||
Please set the stream in the options instead
|
||||
- Please set the stream in the options instead
|
||||
- Removed `pmessage` and `pmessageBuffer` emitters
|
||||
- From now on `message` and `messageBuffer` receive a third argument `pattern`
|
||||
in case the message type is a pattern.
|
||||
- Changed return value of `(p)(un)subscribe`
|
||||
- Return an array with the number of current subscribed channels and an array
|
||||
with all affected channels
|
||||
@@ -59,7 +62,7 @@ Breaking Changes
|
||||
- Changed the `serverInfo` into a nested object and to parse numbers
|
||||
- Changed the `serverInfo.versions` to `serverInfo.version`
|
||||
- Changed the `message` and `pmessage` listener to always return a string
|
||||
If you want to receive a buffer, please listen to the `messageBuffer` or `pmessageBuffer`
|
||||
- If you want to receive a buffer, please listen to the `messageBuffer` or `pmessageBuffer`
|
||||
- Using `.end` without the flush parameter is now going to throw an TypeError
|
||||
- Only emit ready when all commands were truly send to Redis
|
||||
|
||||
|
@@ -58,9 +58,6 @@ function subscribeUnsubscribe (client, reply, type) {
|
||||
|
||||
function returnPubSub (client, reply) {
|
||||
const type = reply[0].toString()
|
||||
// TODO: Consolidate `message` and `pmessage`.
|
||||
// It would be more straight forward to only listen to a single "message"
|
||||
// and in case of a "pmessage" a third argument would be passed (the pattern).
|
||||
if (type === 'message') { // Channel, message
|
||||
if (typeof reply[1] !== 'string') {
|
||||
client.emit('message', reply[1].toString(), reply[2].toString())
|
||||
@@ -68,12 +65,12 @@ function returnPubSub (client, reply) {
|
||||
} else {
|
||||
client.emit('message', reply[1], reply[2])
|
||||
}
|
||||
} else if (type === 'pmessage') { // Pattern, channel, message
|
||||
} else if (type === 'pmessage') { // Channel, message, pattern
|
||||
if (typeof reply[1] !== 'string') {
|
||||
client.emit('pmessage', reply[1].toString(), reply[2].toString(), reply[3].toString())
|
||||
client.emit('pmessageBuffer', reply[1], reply[2], reply[3])
|
||||
client.emit('message', reply[2].toString(), reply[3].toString(), reply[1].toString())
|
||||
client.emit('messageBuffer', reply[2], reply[3], reply[1])
|
||||
} else {
|
||||
client.emit('pmessage', reply[1], reply[2], reply[3])
|
||||
client.emit('message', reply[2], reply[3], reply[1])
|
||||
}
|
||||
} else {
|
||||
subscribeUnsubscribe(client, reply, type)
|
||||
|
@@ -369,7 +369,7 @@ describe('publish/subscribe', () => {
|
||||
assert.strictEqual(count, rest++ - 1)
|
||||
}
|
||||
})
|
||||
sub.on('pmessage', (pattern, channel, msg) => {
|
||||
sub.on('message', (channel, msg, pattern) => {
|
||||
assert.strictEqual(msg, 'test')
|
||||
assert.strictEqual(pattern, 'prefix:*')
|
||||
assert.strictEqual(channel, 'prefix:1')
|
||||
@@ -436,6 +436,7 @@ describe('publish/subscribe', () => {
|
||||
const sub2 = redis.createClient({
|
||||
returnBuffers: true
|
||||
})
|
||||
const end = helper.callFuncAfter(() => sub2.quit().then(() => done()), 2)
|
||||
sub.subscribe('/foo').then(() => {
|
||||
sub2.on('ready', () => {
|
||||
sub2.batch().psubscribe('*').exec().then(helper.isDeepEqual([[1, ['*']]]))
|
||||
@@ -444,28 +445,32 @@ describe('publish/subscribe', () => {
|
||||
// sub2 is counted twice as it subscribed with psubscribe and subscribe
|
||||
pub.publish('/foo', 'hello world').then(helper.isNumber(3))
|
||||
})
|
||||
sub2.on('pmessageBuffer', (pattern, channel, message) => {
|
||||
sub2.on('messageBuffer', (channel, message, pattern) => {
|
||||
if (pattern) {
|
||||
assert.strictEqual(pattern.inspect(), Buffer.from('*').inspect())
|
||||
}
|
||||
assert.strictEqual(channel.inspect(), Buffer.from('/foo').inspect())
|
||||
assert.strictEqual(message.inspect(), Buffer.from('hello world').inspect())
|
||||
sub2.quit().then(() => done())
|
||||
end()
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
it('allows to listen to pmessageBuffer and pmessage', (done) => {
|
||||
const end = helper.callFuncAfter(done, 3)
|
||||
const end = helper.callFuncAfter(done, 4)
|
||||
const data = Array(10000).join('äüs^öéÉÉ`e')
|
||||
sub.set('foo', data).then(() => {
|
||||
sub.get('foo').then((res) => assert.strictEqual(typeof res, 'string'))
|
||||
sub._stream.once('data', () => {
|
||||
assert.strictEqual(sub.messageBuffers, false)
|
||||
assert.strictEqual(sub.shouldBuffer, false)
|
||||
sub.on('pmessageBuffer', (pattern, channel, message) => {
|
||||
sub.on('messageBuffer', (channel, message, pattern) => {
|
||||
if (pattern) {
|
||||
assert.strictEqual(pattern.inspect(), Buffer.from('*').inspect())
|
||||
}
|
||||
assert.strictEqual(channel.inspect(), Buffer.from('/foo').inspect())
|
||||
sub.quit().then(end)
|
||||
end()
|
||||
})
|
||||
assert.notStrictEqual(sub.messageBuffers, sub.buffers)
|
||||
})
|
||||
@@ -485,13 +490,10 @@ describe('publish/subscribe', () => {
|
||||
pub.publish('/foo', 'hello world').then(helper.isNumber(2))
|
||||
})
|
||||
// Either messageBuffers or buffers has to be true, but not both at the same time
|
||||
sub.on('pmessage', (pattern, channel, message) => {
|
||||
sub.on('message', (channel, message, pattern) => {
|
||||
if (pattern) {
|
||||
assert.strictEqual(pattern, '*')
|
||||
assert.strictEqual(channel, '/foo')
|
||||
assert.strictEqual(message, 'hello world')
|
||||
end()
|
||||
})
|
||||
sub.on('message', (channel, message) => {
|
||||
}
|
||||
assert.strictEqual(channel, '/foo')
|
||||
assert.strictEqual(message, 'hello world')
|
||||
end()
|
||||
|
@@ -239,9 +239,9 @@ describe('returnBuffers', () => {
|
||||
pub.publish(channel, message)
|
||||
})
|
||||
|
||||
sub.on('message', (chnl, msg) => {
|
||||
assert.strictEqual(true, Buffer.isBuffer(msg))
|
||||
assert.strictEqual('<Buffer 74 65 73 74 20 6d 65 73 73 61 67 65>', msg.inspect())
|
||||
sub.on('messageBuffer', (chnl, msg) => {
|
||||
assert.strictEqual(Buffer.isBuffer(msg), true)
|
||||
assert.strictEqual(msg.inspect(), '<Buffer 74 65 73 74 20 6d 65 73 73 61 67 65>')
|
||||
done()
|
||||
})
|
||||
|
||||
|
Reference in New Issue
Block a user