You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-01 16:46:54 +03:00
173 lines
5.1 KiB
JavaScript
173 lines
5.1 KiB
JavaScript
'use strict'
|
|
|
|
const Commands = require('redis-commands')
|
|
const debug = require('./debug')
|
|
const utils = require('./utils')
|
|
|
|
const copy = []
|
|
|
|
var bufferCount = 0
|
|
var errors = null
|
|
|
|
/**
|
|
* @description Pipeline and write all commands to the stream
|
|
*
|
|
* If the pipelined string exceeds X mb, write it directly to the stream and
|
|
* pipeline the rest again.
|
|
* @param {RedisClient} client
|
|
*/
|
|
function writeToStream(client) {
|
|
const stream = client._stream
|
|
const queue = client._pipelineQueue
|
|
const cache = client._strCache
|
|
var buffer = false
|
|
while (queue.length) {
|
|
buffer = stream.write(queue.shift())
|
|
}
|
|
if (cache.length !== 0) {
|
|
buffer = stream.write(cache)
|
|
client._strCache = ''
|
|
}
|
|
client.shouldBuffer = !buffer
|
|
stream.uncork()
|
|
client._pipeline = false
|
|
}
|
|
|
|
function write(client) {
|
|
if (client._pipeline === false) {
|
|
client._stream.cork()
|
|
client._pipeline = true
|
|
process.nextTick(writeToStream, client)
|
|
}
|
|
}
|
|
|
|
function pipelineBuffers(client, commandStr) {
|
|
const queue = client._pipelineQueue
|
|
client._strCache += commandStr
|
|
while (copy.length) {
|
|
const arg = copy.shift()
|
|
if (typeof arg === 'string') {
|
|
client._strCache += `$${Buffer.byteLength(arg)}\r\n${arg}\r\n`
|
|
} else {
|
|
client._strCache += `$${arg.length}\r\n`
|
|
queue.push(client._strCache)
|
|
client._strCache = ''
|
|
queue.push(arg)
|
|
client._strCache += '\r\n'
|
|
}
|
|
debug('sendCommand: buffer send %s bytes', arg.length)
|
|
}
|
|
queue.push(client._strCache)
|
|
client._strCache = ''
|
|
}
|
|
|
|
function toString(arg) {
|
|
if (typeof arg === 'string') {
|
|
copy.push(arg)
|
|
} else if (typeof arg === 'number') {
|
|
copy.push(`${arg}`)
|
|
} else if (arg instanceof Array) {
|
|
for (let i = 0; i < arg.length; i += 1) {
|
|
toString(arg[i])
|
|
}
|
|
} else if (arg && arg.constructor.name === 'Buffer') { // TODO: check performance
|
|
copy.push(arg)
|
|
bufferCount++
|
|
} else if (typeof arg === 'boolean') { // TODO: Remove this support and use hooks instead
|
|
copy.push(`${arg}`)
|
|
} else if (arg && arg.constructor.name === 'Object') { // Check if this is actually a good check or not
|
|
// TODO: As soon as we add support for JSON
|
|
// We could simple stringify this right here.
|
|
// This might interfere with nested Objects though.
|
|
// So we should only do this for the first level.
|
|
const keys = Object.keys(arg)
|
|
for (var j = 0; j < keys.length; j++) {
|
|
copy.push(keys[j])
|
|
toString(arg[keys[j]])
|
|
}
|
|
} else if (arg instanceof Map) {
|
|
arg.forEach((val, key) => {
|
|
toString(key)
|
|
toString(val)
|
|
})
|
|
} else if (arg instanceof Set) {
|
|
arg.forEach(val => toString(val))
|
|
} else if (arg && arg.constructor.name === 'Date') { // Check if this is actually a good check or not
|
|
copy.push(arg.toString())
|
|
} else {
|
|
if (errors === null) {
|
|
errors = []
|
|
}
|
|
errors.push(arg)
|
|
}
|
|
}
|
|
|
|
function returnErr(client, command) {
|
|
const err = new TypeError('NodeRedis can not handle the provided arguments (see "error.issues" property).\n\nFurther information https://github.com/asd')
|
|
err.command = command.command.toUpperCase()
|
|
err.args = command.args
|
|
err.issues = errors
|
|
errors = null
|
|
utils.replyInOrder(client, command.callback, err, undefined, client.commandQueue)
|
|
}
|
|
|
|
// Always use 'Multi bulk commands', but if passed any Buffer args, then do
|
|
// multiple writes, one for each arg. This means that using Buffers in commands
|
|
// is going to be slower, so use Strings if you don't already have a Buffer.
|
|
|
|
// TODO: It is faster to move this part somewhere else
|
|
// We could move this to the function creation as well
|
|
// if we use hooks for our individual commands!
|
|
function normalizeAndWrite(client, command) {
|
|
const args = command.args
|
|
const origName = command.command
|
|
const renameCommands = client._options.renameCommands
|
|
const name = renameCommands !== undefined && renameCommands[origName] !== undefined
|
|
? renameCommands[origName]
|
|
: origName
|
|
|
|
bufferCount = 0
|
|
for (let i = 0; i < args.length; i++) {
|
|
toString(args[i])
|
|
}
|
|
|
|
if (errors) {
|
|
return returnErr(client, command)
|
|
}
|
|
|
|
if (typeof client._options.prefix === 'string') {
|
|
const prefixKeys = Commands.getKeyIndexes(origName, copy)
|
|
prefixKeys.forEach((i) => {
|
|
// Attention it would be to expensive to detect if the input is non utf8 Buffer
|
|
// In that case the prefix *might* destroys user information
|
|
copy[i] = client._options.prefix + copy[i]
|
|
})
|
|
}
|
|
|
|
const bufferArgs = bufferCount !== 0
|
|
const len = copy.length
|
|
let commandStr = `*${len + 1}\r\n$${name.length}\r\n${name}\r\n`
|
|
|
|
command.bufferArgs = bufferArgs
|
|
command.argsLength = len
|
|
const queue = client._pipelineQueue
|
|
|
|
if (bufferArgs === false) {
|
|
while (copy.length) {
|
|
const arg = copy.shift()
|
|
commandStr += `$${Buffer.byteLength(arg)}\r\n${arg}\r\n`
|
|
}
|
|
debug('Send %s id %s: %s', client.address, client.connectionId, commandStr)
|
|
client._strCache += commandStr
|
|
if (client._strCache.length > 10 * 1024 * 1024) {
|
|
queue.push(client._strCache)
|
|
client._strCache = ''
|
|
}
|
|
} else {
|
|
pipelineBuffers(client, commandStr)
|
|
}
|
|
write(client)
|
|
}
|
|
|
|
module.exports = normalizeAndWrite
|