You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-06 02:15:48 +03:00
chore: use arrow functions
This commit is contained in:
@@ -70,12 +70,11 @@ Test.prototype.run = function (callback) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Test.prototype.newClient = function (id) {
|
Test.prototype.newClient = function (id) {
|
||||||
const self = this
|
|
||||||
const newClient = redis.createClient(this.clientOptions)
|
const newClient = redis.createClient(this.clientOptions)
|
||||||
newClient.createTime = Date.now()
|
newClient.createTime = Date.now()
|
||||||
|
|
||||||
newClient.on('connect', () => {
|
newClient.on('connect', () => {
|
||||||
self.connectLatency.update(Date.now() - newClient.createTime)
|
this.connectLatency.update(Date.now() - newClient.createTime)
|
||||||
})
|
})
|
||||||
|
|
||||||
newClient.on('ready', () => {
|
newClient.on('ready', () => {
|
||||||
@@ -88,10 +87,10 @@ Test.prototype.newClient = function (id) {
|
|||||||
].join(', '))
|
].join(', '))
|
||||||
versionsLogged = true
|
versionsLogged = true
|
||||||
}
|
}
|
||||||
self.readyLatency.update(Date.now() - newClient.createTime)
|
this.readyLatency.update(Date.now() - newClient.createTime)
|
||||||
self.clientsReady++
|
this.clientsReady++
|
||||||
if (self.clientsReady === self.clients.length) {
|
if (this.clientsReady === this.clients.length) {
|
||||||
self.onClientsReady()
|
this.onClientsReady()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -113,7 +112,7 @@ Test.prototype.newClient = function (id) {
|
|||||||
}, path.resolve(__dirname, conf))
|
}, path.resolve(__dirname, conf))
|
||||||
})
|
})
|
||||||
|
|
||||||
self.clients[id] = newClient
|
this.clients[id] = newClient
|
||||||
}
|
}
|
||||||
|
|
||||||
Test.prototype.onClientsReady = function () {
|
Test.prototype.onClientsReady = function () {
|
||||||
@@ -147,7 +146,6 @@ Test.prototype.fillPipeline = function () {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Test.prototype.batch = function () {
|
Test.prototype.batch = function () {
|
||||||
const self = this
|
|
||||||
const curClient = clientNr++ % this.clients.length
|
const curClient = clientNr++ % this.clients.length
|
||||||
const start = process.hrtime()
|
const start = process.hrtime()
|
||||||
let i = 0
|
let i = 0
|
||||||
@@ -159,19 +157,17 @@ Test.prototype.batch = function () {
|
|||||||
}
|
}
|
||||||
|
|
||||||
batch.exec().then((res) => {
|
batch.exec().then((res) => {
|
||||||
self.commandsCompleted += res.length
|
this.commandsCompleted += res.length
|
||||||
self.commandLatency.update(process.hrtime(start)[1])
|
this.commandLatency.update(process.hrtime(start)[1])
|
||||||
return self.fillPipeline()
|
return this.fillPipeline()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
Test.prototype.stopClients = function () {
|
Test.prototype.stopClients = function () {
|
||||||
const self = this
|
|
||||||
|
|
||||||
return Promise.all(this.clients.map((client, pos) => {
|
return Promise.all(this.clients.map((client, pos) => {
|
||||||
if (pos === self.clients.length - 1) {
|
if (pos === this.clients.length - 1) {
|
||||||
return client.quit().then((res) => {
|
return client.quit().then((res) => {
|
||||||
self.callback()
|
this.callback()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
return client.quit()
|
return client.quit()
|
||||||
@@ -179,14 +175,13 @@ Test.prototype.stopClients = function () {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Test.prototype.sendNext = function () {
|
Test.prototype.sendNext = function () {
|
||||||
const self = this
|
|
||||||
const curClient = this.commandsSent % this.clients.length
|
const curClient = this.commandsSent % this.clients.length
|
||||||
const start = process.hrtime()
|
const start = process.hrtime()
|
||||||
|
|
||||||
this.clients[curClient][this.args.command](this.args.args).then((res) => {
|
this.clients[curClient][this.args.command](this.args.args).then((res) => {
|
||||||
self.commandsCompleted++
|
this.commandsCompleted++
|
||||||
self.commandLatency.update(process.hrtime(start)[1])
|
this.commandLatency.update(process.hrtime(start)[1])
|
||||||
return self.fillPipeline()
|
return this.fillPipeline()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
51
index.js
51
index.js
@@ -51,7 +51,6 @@ function RedisClient (options, stream) {
|
|||||||
options = utils.clone(options)
|
options = utils.clone(options)
|
||||||
EventEmitter.call(this)
|
EventEmitter.call(this)
|
||||||
const cnxOptions = {}
|
const cnxOptions = {}
|
||||||
const self = this
|
|
||||||
/* istanbul ignore next: travis does not work with stunnel atm. Therefore the tls tests are skipped on travis */
|
/* istanbul ignore next: travis does not work with stunnel atm. Therefore the tls tests are skipped on travis */
|
||||||
for (const tlsOption in options.tls) {
|
for (const tlsOption in options.tls) {
|
||||||
cnxOptions[tlsOption] = options.tls[tlsOption]
|
cnxOptions[tlsOption] = options.tls[tlsOption]
|
||||||
@@ -89,7 +88,7 @@ function RedisClient (options, stream) {
|
|||||||
options.detectBuffers = !!options.detectBuffers
|
options.detectBuffers = !!options.detectBuffers
|
||||||
// Override the detectBuffers setting if returnBuffers is active and print a warning
|
// Override the detectBuffers setting if returnBuffers is active and print a warning
|
||||||
if (options.returnBuffers && options.detectBuffers) {
|
if (options.returnBuffers && options.detectBuffers) {
|
||||||
self.warn('WARNING: You activated returnBuffers and detectBuffers at the same time. The return value is always going to be a buffer.')
|
this.warn('WARNING: You activated returnBuffers and detectBuffers at the same time. The return value is always going to be a buffer.')
|
||||||
options.detectBuffers = false
|
options.detectBuffers = false
|
||||||
}
|
}
|
||||||
if (options.detectBuffers) {
|
if (options.detectBuffers) {
|
||||||
@@ -184,9 +183,6 @@ function createParser (self) {
|
|||||||
|
|
||||||
// Attention: the function name "createStream" should not be changed, as other libraries need this to mock the stream (e.g. fakeredis)
|
// Attention: the function name "createStream" should not be changed, as other libraries need this to mock the stream (e.g. fakeredis)
|
||||||
RedisClient.prototype.createStream = function () {
|
RedisClient.prototype.createStream = function () {
|
||||||
// TODO: Remove self and use array functions instead
|
|
||||||
const self = this
|
|
||||||
|
|
||||||
// Init parser
|
// Init parser
|
||||||
this.replyParser = createParser(this)
|
this.replyParser = createParser(this)
|
||||||
|
|
||||||
@@ -215,7 +211,7 @@ RedisClient.prototype.createStream = function () {
|
|||||||
// TODO: Investigate why this is not properly triggered
|
// TODO: Investigate why this is not properly triggered
|
||||||
this.stream.setTimeout(this.connectTimeout, () => {
|
this.stream.setTimeout(this.connectTimeout, () => {
|
||||||
// Note: This is only tested if a internet connection is established
|
// Note: This is only tested if a internet connection is established
|
||||||
self.connectionGone('timeout')
|
this.connectionGone('timeout')
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -223,32 +219,32 @@ RedisClient.prototype.createStream = function () {
|
|||||||
const connectEvent = this.options.tls ? 'secureConnect' : 'connect'
|
const connectEvent = this.options.tls ? 'secureConnect' : 'connect'
|
||||||
this.stream.once(connectEvent, () => {
|
this.stream.once(connectEvent, () => {
|
||||||
this.stream.removeAllListeners('timeout')
|
this.stream.removeAllListeners('timeout')
|
||||||
self.timesConnected++
|
this.timesConnected++
|
||||||
self.onConnect()
|
this.onConnect()
|
||||||
})
|
})
|
||||||
|
|
||||||
this.stream.on('data', (bufferFromSocket) => {
|
this.stream.on('data', (bufferFromSocket) => {
|
||||||
// The bufferFromSocket.toString() has a significant impact on big chunks and therefore this should only be used if necessary
|
// The bufferFromSocket.toString() has a significant impact on big chunks and therefore this should only be used if necessary
|
||||||
debug('Net read %s id %s', self.address, self.connectionId) // + ': ' + bufferFromSocket.toString());
|
debug('Net read %s id %s', this.address, this.connectionId) // + ': ' + bufferFromSocket.toString());
|
||||||
self.replyParser.execute(bufferFromSocket)
|
this.replyParser.execute(bufferFromSocket)
|
||||||
})
|
})
|
||||||
|
|
||||||
this.stream.on('error', (err) => {
|
this.stream.on('error', (err) => {
|
||||||
self.onError(err)
|
this.onError(err)
|
||||||
})
|
})
|
||||||
|
|
||||||
/* istanbul ignore next: difficult to test and not important as long as we keep this listener */
|
/* istanbul ignore next: difficult to test and not important as long as we keep this listener */
|
||||||
this.stream.on('clientError', (err) => {
|
this.stream.on('clientError', (err) => {
|
||||||
debug('clientError occurred')
|
debug('clientError occurred')
|
||||||
self.onError(err)
|
this.onError(err)
|
||||||
})
|
})
|
||||||
|
|
||||||
this.stream.once('close', (hadError) => {
|
this.stream.once('close', (hadError) => {
|
||||||
self.connectionGone('close')
|
this.connectionGone('close')
|
||||||
})
|
})
|
||||||
|
|
||||||
this.stream.once('end', () => {
|
this.stream.once('end', () => {
|
||||||
self.connectionGone('end')
|
this.connectionGone('end')
|
||||||
})
|
})
|
||||||
|
|
||||||
this.stream.setNoDelay()
|
this.stream.setNoDelay()
|
||||||
@@ -285,12 +281,11 @@ RedisClient.prototype.initializeRetryVars = function () {
|
|||||||
}
|
}
|
||||||
|
|
||||||
RedisClient.prototype.warn = function (msg) {
|
RedisClient.prototype.warn = function (msg) {
|
||||||
const self = this
|
|
||||||
// Warn on the next tick. Otherwise no event listener can be added
|
// Warn on the next tick. Otherwise no event listener can be added
|
||||||
// for warnings that are emitted in the redis client constructor
|
// for warnings that are emitted in the redis client constructor
|
||||||
process.nextTick(() => {
|
process.nextTick(() => {
|
||||||
if (self.listeners('warning').length !== 0) {
|
if (this.listeners('warning').length !== 0) {
|
||||||
self.emit('warning', msg)
|
this.emit('warning', msg)
|
||||||
} else {
|
} else {
|
||||||
console.warn('nodeRedis:', msg)
|
console.warn('nodeRedis:', msg)
|
||||||
}
|
}
|
||||||
@@ -365,25 +360,23 @@ RedisClient.prototype.onConnect = function () {
|
|||||||
}
|
}
|
||||||
|
|
||||||
RedisClient.prototype.onReady = function () {
|
RedisClient.prototype.onReady = function () {
|
||||||
const self = this
|
|
||||||
|
|
||||||
debug('onReady called %s id %s', this.address, this.connectionId)
|
debug('onReady called %s id %s', this.address, this.connectionId)
|
||||||
this.ready = true
|
this.ready = true
|
||||||
|
|
||||||
this.cork = function () {
|
this.cork = () => {
|
||||||
self.pipeline = true
|
this.pipeline = true
|
||||||
self.stream.cork()
|
this.stream.cork()
|
||||||
}
|
}
|
||||||
this.uncork = function () {
|
this.uncork = () => {
|
||||||
if (self.fireStrings) {
|
if (this.fireStrings) {
|
||||||
self.writeStrings()
|
this.writeStrings()
|
||||||
} else {
|
} else {
|
||||||
self.writeBuffers()
|
this.writeBuffers()
|
||||||
}
|
}
|
||||||
self.pipeline = false
|
this.pipeline = false
|
||||||
self.fireStrings = true
|
this.fireStrings = true
|
||||||
// TODO: Consider using next tick here. See https://github.com/NodeRedis/nodeRedis/issues/1033
|
// TODO: Consider using next tick here. See https://github.com/NodeRedis/nodeRedis/issues/1033
|
||||||
self.stream.uncork()
|
this.stream.uncork()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Restore modal commands from previous connection. The order of the commands is important
|
// Restore modal commands from previous connection. The order of the commands is important
|
||||||
|
@@ -49,13 +49,12 @@ Multi.prototype.select = function select (db) {
|
|||||||
return this
|
return this
|
||||||
}
|
}
|
||||||
|
|
||||||
RedisClient.prototype.monitor = RedisClient.prototype.MONITOR = function monitor () {
|
RedisClient.prototype.monitor = function monitor () {
|
||||||
// Use a individual command, as this is a special case that does not has to be checked for any other command
|
// Use a individual command, as this is a special case that does not has to be checked for any other command
|
||||||
const self = this
|
const callOnWrite = () => {
|
||||||
const callOnWrite = function () {
|
|
||||||
// Activating monitor mode has to happen before Redis returned the callback. The monitor result is returned first.
|
// Activating monitor mode has to happen before Redis returned the callback. The monitor result is returned first.
|
||||||
// Therefore we expect the command to be properly processed. If this is not the case, it's not an issue either.
|
// Therefore we expect the command to be properly processed. If this is not the case, it's not an issue either.
|
||||||
self.monitoring = true
|
this.monitoring = true
|
||||||
}
|
}
|
||||||
return this.internalSendCommand(new Command('monitor', [], callOnWrite))
|
return this.internalSendCommand(new Command('monitor', [], callOnWrite))
|
||||||
}
|
}
|
||||||
@@ -64,9 +63,8 @@ RedisClient.prototype.monitor = RedisClient.prototype.MONITOR = function monitor
|
|||||||
Multi.prototype.monitor = function monitor () {
|
Multi.prototype.monitor = function monitor () {
|
||||||
// Use a individual command, as this is a special case that does not has to be checked for any other command
|
// Use a individual command, as this is a special case that does not has to be checked for any other command
|
||||||
if (this.exec !== this.execTransaction) {
|
if (this.exec !== this.execTransaction) {
|
||||||
const self = this
|
const callOnWrite = () => {
|
||||||
const callOnWrite = function () {
|
this._client.monitoring = true
|
||||||
self._client.monitoring = true
|
|
||||||
}
|
}
|
||||||
this.queue.push(new Command('monitor', [], callOnWrite))
|
this.queue.push(new Command('monitor', [], callOnWrite))
|
||||||
return this
|
return this
|
||||||
@@ -108,13 +106,12 @@ RedisClient.prototype.quit = function quit () {
|
|||||||
|
|
||||||
// Only works with batch, not in a transaction
|
// Only works with batch, not in a transaction
|
||||||
Multi.prototype.quit = function quit () {
|
Multi.prototype.quit = function quit () {
|
||||||
const self = this._client
|
const callOnWrite = () => {
|
||||||
const callOnWrite = function () {
|
|
||||||
// If called in a multi context, we expect redis is available
|
// If called in a multi context, we expect redis is available
|
||||||
self.closing = true
|
this._client.closing = true
|
||||||
self.ready = false
|
this._client.ready = false
|
||||||
}
|
}
|
||||||
this.queue.push(new Command('quit', [], null, quitCallback(self), callOnWrite))
|
this.queue.push(new Command('quit', [], null, quitCallback(this._client), callOnWrite))
|
||||||
return this
|
return this
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -209,15 +206,14 @@ RedisClient.prototype.client = function client () {
|
|||||||
for (var i = 0; i < len; i += 1) {
|
for (var i = 0; i < len; i += 1) {
|
||||||
arr[i] = arguments[i]
|
arr[i] = arguments[i]
|
||||||
}
|
}
|
||||||
const self = this
|
|
||||||
var callOnWrite
|
var callOnWrite
|
||||||
// CLIENT REPLY ON|OFF|SKIP
|
// CLIENT REPLY ON|OFF|SKIP
|
||||||
/* istanbul ignore next: TODO: Remove this as soon as Travis runs Redis 3.2 */
|
/* istanbul ignore next: TODO: Remove this as soon as Travis runs Redis 3.2 */
|
||||||
if (arr.length === 2 && arr[0].toString().toUpperCase() === 'REPLY') {
|
if (arr.length === 2 && arr[0].toString().toUpperCase() === 'REPLY') {
|
||||||
const replyOnOff = arr[1].toString().toUpperCase()
|
const replyOnOff = arr[1].toString().toUpperCase()
|
||||||
if (replyOnOff === 'ON' || replyOnOff === 'OFF' || replyOnOff === 'SKIP') {
|
if (replyOnOff === 'ON' || replyOnOff === 'OFF' || replyOnOff === 'SKIP') {
|
||||||
callOnWrite = function () {
|
callOnWrite = () => {
|
||||||
self.reply = replyOnOff
|
this.reply = replyOnOff
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -225,34 +221,19 @@ RedisClient.prototype.client = function client () {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Multi.prototype.client = function client () {
|
Multi.prototype.client = function client () {
|
||||||
var arr
|
const len = arguments.length
|
||||||
var len = arguments.length
|
const arr = new Array(len)
|
||||||
var i = 0
|
for (var i = 0; i < len; i += 1) {
|
||||||
if (Array.isArray(arguments[0])) {
|
arr[i] = arguments[i]
|
||||||
arr = arguments[0]
|
|
||||||
} else if (Array.isArray(arguments[1])) {
|
|
||||||
len = arguments[1].length
|
|
||||||
arr = new Array(len + 1)
|
|
||||||
arr[0] = arguments[0]
|
|
||||||
for (; i < len; i += 1) {
|
|
||||||
arr[i + 1] = arguments[1][i]
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
len = arguments.length
|
|
||||||
arr = new Array(len)
|
|
||||||
for (; i < len; i += 1) {
|
|
||||||
arr[i] = arguments[i]
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
const self = this._client
|
|
||||||
var callOnWrite
|
var callOnWrite
|
||||||
// CLIENT REPLY ON|OFF|SKIP
|
// CLIENT REPLY ON|OFF|SKIP
|
||||||
/* istanbul ignore next: TODO: Remove this as soon as Travis runs Redis 3.2 */
|
/* istanbul ignore next: TODO: Remove this as soon as Travis runs Redis 3.2 */
|
||||||
if (arr.length === 2 && arr[0].toString().toUpperCase() === 'REPLY') {
|
if (arr.length === 2 && arr[0].toString().toUpperCase() === 'REPLY') {
|
||||||
const replyOnOff = arr[1].toString().toUpperCase()
|
const replyOnOff = arr[1].toString().toUpperCase()
|
||||||
if (replyOnOff === 'ON' || replyOnOff === 'OFF' || replyOnOff === 'SKIP') {
|
if (replyOnOff === 'ON' || replyOnOff === 'OFF' || replyOnOff === 'SKIP') {
|
||||||
callOnWrite = function () {
|
callOnWrite = () => {
|
||||||
self.reply = replyOnOff
|
this._client.reply = replyOnOff
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -266,9 +247,8 @@ RedisClient.prototype.subscribe = function subscribe () {
|
|||||||
for (var i = 0; i < len; i += 1) {
|
for (var i = 0; i < len; i += 1) {
|
||||||
arr[i] = arguments[i]
|
arr[i] = arguments[i]
|
||||||
}
|
}
|
||||||
const self = this
|
const callOnWrite = () => {
|
||||||
const callOnWrite = function () {
|
this.pubSubMode = this.pubSubMode || this.commandQueue.length + 1
|
||||||
self.pubSubMode = self.pubSubMode || self.commandQueue.length + 1
|
|
||||||
}
|
}
|
||||||
return this.internalSendCommand(new Command('subscribe', arr, callOnWrite))
|
return this.internalSendCommand(new Command('subscribe', arr, callOnWrite))
|
||||||
}
|
}
|
||||||
@@ -279,9 +259,8 @@ Multi.prototype.subscribe = function subscribe () {
|
|||||||
for (var i = 0; i < len; i += 1) {
|
for (var i = 0; i < len; i += 1) {
|
||||||
arr[i] = arguments[i]
|
arr[i] = arguments[i]
|
||||||
}
|
}
|
||||||
const self = this._client
|
const callOnWrite = () => {
|
||||||
const callOnWrite = function () {
|
this._client.pubSubMode = this._client.pubSubMode || this._client.commandQueue.length + 1
|
||||||
self.pubSubMode = self.pubSubMode || self.commandQueue.length + 1
|
|
||||||
}
|
}
|
||||||
this.queue.push(new Command('subscribe', arr, callOnWrite))
|
this.queue.push(new Command('subscribe', arr, callOnWrite))
|
||||||
return this
|
return this
|
||||||
@@ -293,10 +272,9 @@ RedisClient.prototype.unsubscribe = function unsubscribe () {
|
|||||||
for (var i = 0; i < len; i += 1) {
|
for (var i = 0; i < len; i += 1) {
|
||||||
arr[i] = arguments[i]
|
arr[i] = arguments[i]
|
||||||
}
|
}
|
||||||
const self = this
|
const callOnWrite = () => {
|
||||||
const callOnWrite = function () {
|
|
||||||
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
|
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
|
||||||
self.pubSubMode = self.pubSubMode || self.commandQueue.length + 1
|
this.pubSubMode = this.pubSubMode || this.commandQueue.length + 1
|
||||||
}
|
}
|
||||||
return this.internalSendCommand(new Command('unsubscribe', arr, callOnWrite))
|
return this.internalSendCommand(new Command('unsubscribe', arr, callOnWrite))
|
||||||
}
|
}
|
||||||
@@ -307,10 +285,9 @@ Multi.prototype.unsubscribe = function unsubscribe () {
|
|||||||
for (var i = 0; i < len; i += 1) {
|
for (var i = 0; i < len; i += 1) {
|
||||||
arr[i] = arguments[i]
|
arr[i] = arguments[i]
|
||||||
}
|
}
|
||||||
const self = this._client
|
const callOnWrite = () => {
|
||||||
const callOnWrite = function () {
|
|
||||||
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
|
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
|
||||||
self.pubSubMode = self.pubSubMode || self.commandQueue.length + 1
|
this._client.pubSubMode = this._client.pubSubMode || this._client.commandQueue.length + 1
|
||||||
}
|
}
|
||||||
this.queue.push(new Command('unsubscribe', arr, callOnWrite))
|
this.queue.push(new Command('unsubscribe', arr, callOnWrite))
|
||||||
return this
|
return this
|
||||||
@@ -322,9 +299,8 @@ RedisClient.prototype.psubscribe = function psubscribe () {
|
|||||||
for (var i = 0; i < len; i += 1) {
|
for (var i = 0; i < len; i += 1) {
|
||||||
arr[i] = arguments[i]
|
arr[i] = arguments[i]
|
||||||
}
|
}
|
||||||
const self = this
|
const callOnWrite = () => {
|
||||||
const callOnWrite = function () {
|
this.pubSubMode = this.pubSubMode || this.commandQueue.length + 1
|
||||||
self.pubSubMode = self.pubSubMode || self.commandQueue.length + 1
|
|
||||||
}
|
}
|
||||||
return this.internalSendCommand(new Command('psubscribe', arr, callOnWrite))
|
return this.internalSendCommand(new Command('psubscribe', arr, callOnWrite))
|
||||||
}
|
}
|
||||||
@@ -335,9 +311,8 @@ Multi.prototype.psubscribe = function psubscribe () {
|
|||||||
for (var i = 0; i < len; i += 1) {
|
for (var i = 0; i < len; i += 1) {
|
||||||
arr[i] = arguments[i]
|
arr[i] = arguments[i]
|
||||||
}
|
}
|
||||||
const self = this._client
|
const callOnWrite = () => {
|
||||||
const callOnWrite = function () {
|
this._client.pubSubMode = this._client.pubSubMode || this._client.commandQueue.length + 1
|
||||||
self.pubSubMode = self.pubSubMode || self.commandQueue.length + 1
|
|
||||||
}
|
}
|
||||||
this.queue.push(new Command('psubscribe', arr, callOnWrite))
|
this.queue.push(new Command('psubscribe', arr, callOnWrite))
|
||||||
return this
|
return this
|
||||||
@@ -349,10 +324,9 @@ RedisClient.prototype.punsubscribe = function punsubscribe () {
|
|||||||
for (var i = 0; i < len; i += 1) {
|
for (var i = 0; i < len; i += 1) {
|
||||||
arr[i] = arguments[i]
|
arr[i] = arguments[i]
|
||||||
}
|
}
|
||||||
const self = this
|
const callOnWrite = () => {
|
||||||
const callOnWrite = function () {
|
|
||||||
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
|
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
|
||||||
self.pubSubMode = self.pubSubMode || self.commandQueue.length + 1
|
this.pubSubMode = this.pubSubMode || this.commandQueue.length + 1
|
||||||
}
|
}
|
||||||
return this.internalSendCommand(new Command('punsubscribe', arr, callOnWrite))
|
return this.internalSendCommand(new Command('punsubscribe', arr, callOnWrite))
|
||||||
}
|
}
|
||||||
@@ -363,10 +337,9 @@ Multi.prototype.punsubscribe = function punsubscribe () {
|
|||||||
for (var i = 0; i < len; i += 1) {
|
for (var i = 0; i < len; i += 1) {
|
||||||
arr[i] = arguments[i]
|
arr[i] = arguments[i]
|
||||||
}
|
}
|
||||||
const self = this._client
|
const callOnWrite = () => {
|
||||||
const callOnWrite = function () {
|
|
||||||
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
|
// Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback
|
||||||
self.pubSubMode = self.pubSubMode || self.commandQueue.length + 1
|
this._client.pubSubMode = this._client.pubSubMode || this._client.commandQueue.length + 1
|
||||||
}
|
}
|
||||||
this.queue.push(new Command('punsubscribe', arr, callOnWrite))
|
this.queue.push(new Command('punsubscribe', arr, callOnWrite))
|
||||||
return this
|
return this
|
||||||
|
@@ -101,9 +101,8 @@ Multi.prototype.execTransaction = function execTransaction () {
|
|||||||
|
|
||||||
const main = this._client.internalSendCommand(new Command('exec', []))
|
const main = this._client.internalSendCommand(new Command('exec', []))
|
||||||
this._client.uncork()
|
this._client.uncork()
|
||||||
const self = this
|
return Promise.all(promises).then(() => main.then((replies) => multiCallback(this, replies)).catch((err) => {
|
||||||
return Promise.all(promises).then(() => main.then((replies) => multiCallback(self, replies)).catch((err) => {
|
err.errors = this.errors
|
||||||
err.errors = self.errors
|
|
||||||
return Promise.reject(err)
|
return Promise.reject(err)
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
'use strict'
|
'use strict'
|
||||||
|
|
||||||
// helper to start and stop the stunnel process.
|
// Helper to start and stop the stunnel process.
|
||||||
const spawn = require('child_process').spawn
|
const spawn = require('child_process').spawn
|
||||||
const EventEmitter = require('events')
|
const EventEmitter = require('events')
|
||||||
const fs = require('fs')
|
const fs = require('fs')
|
||||||
@@ -19,38 +19,37 @@ function once (cb) {
|
|||||||
function StunnelProcess (confDir) {
|
function StunnelProcess (confDir) {
|
||||||
EventEmitter.call(this)
|
EventEmitter.call(this)
|
||||||
|
|
||||||
// set up an stunnel to redis; edit the conf file to include required absolute paths
|
// Set up an stunnel to redis; edit the conf file to include required absolute paths
|
||||||
const confFile = path.resolve(confDir, 'stunnel.conf')
|
const confFile = path.resolve(confDir, 'stunnel.conf')
|
||||||
const confText = fs.readFileSync(`${confFile }.template`).toString().replace(/__dirname,/g, confDir)
|
const confText = fs.readFileSync(`${confFile}.template`).toString().replace(/__dirname,/g, confDir)
|
||||||
|
|
||||||
fs.writeFileSync(confFile, confText)
|
fs.writeFileSync(confFile, confText)
|
||||||
const stunnel = this.stunnel = spawn('stunnel', [confFile])
|
const stunnel = this.stunnel = spawn('stunnel', [confFile])
|
||||||
|
|
||||||
// handle child process events, and failure to set up tunnel
|
// Handle child process events, and failure to set up tunnel
|
||||||
const self = this
|
|
||||||
this.timer = setTimeout(() => {
|
this.timer = setTimeout(() => {
|
||||||
self.emit('error', new Error('Timeout waiting for stunnel to start'))
|
this.emit('error', new Error('Timeout waiting for stunnel to start'))
|
||||||
}, 8000)
|
}, 8000)
|
||||||
|
|
||||||
stunnel.on('error', (err) => {
|
stunnel.on('error', (err) => {
|
||||||
self.clear()
|
this.clear()
|
||||||
self.emit('error', err)
|
this.emit('error', err)
|
||||||
})
|
})
|
||||||
|
|
||||||
stunnel.on('exit', (code) => {
|
stunnel.on('exit', (code) => {
|
||||||
self.clear()
|
this.clear()
|
||||||
if (code === 0) {
|
if (code === 0) {
|
||||||
self.emit('stopped')
|
this.emit('stopped')
|
||||||
} else {
|
} else {
|
||||||
self.emit('error', new Error(`Stunnel exited unexpectedly; code = ${code}`))
|
this.emit('error', new Error(`Stunnel exited unexpectedly; code = ${code}`))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
// wait to stunnel to start
|
// Wait to stunnel to start
|
||||||
stunnel.stderr.on('data', function (data) {
|
stunnel.stderr.on('data', (data) => {
|
||||||
if (data.toString().match(/Service.+redis.+bound/)) {
|
if (data.toString().match(/Service.+redis.+bound/)) {
|
||||||
clearTimeout(this.timer)
|
clearTimeout(this.timer)
|
||||||
self.emit('started')
|
this.emit('started')
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user