You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-01 16:46:54 +03:00
This also improves the performance for multi / batch commands a lot. The reason is that now there are only callbacks internally even if a promise is going to be returned in the end.
151 lines
4.3 KiB
JavaScript
151 lines
4.3 KiB
JavaScript
'use strict'
|
|
|
|
const debug = require('./debug')
|
|
const utils = require('./utils')
|
|
|
|
function onConnect(client) {
|
|
debug('Stream connected %s id %s', client.address, client.connectionId)
|
|
|
|
// TODO: Check if the clients prototype and the clients instance have
|
|
// fast properties. If that's not the case, make them fast properties
|
|
// again!
|
|
client.connected = true
|
|
client._stream.setKeepAlive(client._options.socketKeepalive)
|
|
client._stream.setTimeout(0)
|
|
|
|
// TODO: Deprecate the connect event.
|
|
client.emit('connect')
|
|
utils.setReconnectDefaults(client)
|
|
|
|
if (client._options.noReadyCheck) {
|
|
readyHandler(client)
|
|
} else {
|
|
readyCheck(client)
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @description Empty the offline queue and call the commands
|
|
*
|
|
* @param {RedisClient} client
|
|
*/
|
|
function sendOfflineQueue(client) {
|
|
const queue = client.offlineQueue
|
|
while (queue.length) {
|
|
const command = queue.shift()
|
|
debug('Sending offline command: %s', command.command)
|
|
client.internalSendCommand(command)
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @description Transparently perform predefined commands and emit ready.
|
|
*
|
|
* Emit ready before the all commands returned.
|
|
* The order of the commands is important.
|
|
*
|
|
* @param {RedisClient} client
|
|
*/
|
|
function readyHandler(client) {
|
|
debug('readyHandler called %s id %s', client.address, client.connectionId)
|
|
client.ready = true
|
|
|
|
if (client.selectedDb !== undefined) {
|
|
client.select(client.selectedDb, (err) => {
|
|
if (err !== null && !client._closing) {
|
|
// TODO: These internal things should be wrapped in a
|
|
// special error that describes what is happening
|
|
process.nextTick(client.emit, 'error', err)
|
|
}
|
|
})
|
|
}
|
|
if (client._monitoring) { // Monitor has to be fired before pub sub commands
|
|
client.monitor((err) => {
|
|
if (err !== null && !client._closing) {
|
|
process.nextTick(client.emit, 'error', err)
|
|
}
|
|
})
|
|
}
|
|
const callbackCount = Object.keys(client._subscriptionSet).length
|
|
// TODO: Replace the disableResubscribing by a individual function that may be called
|
|
// Add HOOKS!!!
|
|
// Replace the disableResubscribing by:
|
|
// resubmit: {
|
|
// select: true,
|
|
// monitor: true,
|
|
// subscriptions: true,
|
|
// // individual: function noop () {}
|
|
// }
|
|
if (!client._options.disableResubscribing && callbackCount) {
|
|
debug('Sending pub/sub commands')
|
|
const keys = Object.keys(client._subscriptionSet)
|
|
for (var i = 0; i < keys.length; i++) {
|
|
const key = keys[i]
|
|
const command = key.slice(0, key.indexOf('_'))
|
|
const args = client._subscriptionSet[key]
|
|
client[command]([args]).catch((err) => {
|
|
if (!client._closing) {
|
|
process.nextTick(client.emit, 'error', err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
sendOfflineQueue(client)
|
|
client.emit('ready')
|
|
}
|
|
|
|
/**
|
|
* @description Perform a info command and check if Redis is ready
|
|
*
|
|
* @param {RedisClient} client
|
|
*/
|
|
function readyCheck(client) {
|
|
debug('Checking server ready state...')
|
|
// Always fire client info command as first command even if other commands are already queued up
|
|
client.ready = true
|
|
client.info().then((res) => {
|
|
if (!res) {
|
|
debug('The info command returned without any data.')
|
|
readyHandler(client)
|
|
return
|
|
}
|
|
|
|
const persistence = client.serverInfo.persistence
|
|
if (persistence === undefined ||
|
|
persistence.loading === undefined ||
|
|
persistence.loading === 0) {
|
|
// If the master_link_status exists but the link is not up, try again after 50 ms
|
|
const replication = client.serverInfo.replication
|
|
if (replication && typeof replication.master_link_status === 'string' && replication.master_link_status !== 'up') {
|
|
persistence.loading_eta_seconds = 0.05
|
|
} else {
|
|
// Eta loading should change
|
|
debug('Redis server ready.')
|
|
readyHandler(client)
|
|
return
|
|
}
|
|
}
|
|
|
|
let retryTime = +persistence.loading_eta_seconds * 1000
|
|
if (retryTime > 1000) {
|
|
retryTime = 1000
|
|
}
|
|
debug('Redis server still loading, trying again in %s', retryTime)
|
|
setTimeout(client => readyCheck(client), retryTime, client)
|
|
}).catch((err) => {
|
|
if (client._closing) {
|
|
return
|
|
}
|
|
|
|
if (err.message === "ERR unknown command 'info'") {
|
|
readyHandler(client)
|
|
return
|
|
}
|
|
err.message = `Ready check failed: ${err.message}`
|
|
client.emit('error', err)
|
|
})
|
|
client.ready = false
|
|
}
|
|
|
|
module.exports = onConnect
|