import RedisSocket, { RedisSocketOptions } from './socket';
import RedisCommandsQueue, { AddCommandOptions } from './commands-queue';
import COMMANDS from './commands/client';
import { RedisCommand, RedisModules, RedisModule, RedisReply } from './commands';
import RedisMultiCommand, { MultiQueuedCommand, RedisMultiCommandType } from './multi-command';
import EventEmitter from 'events';

/** Options accepted by the `RedisClient` constructor and by `RedisClient.create`. */
export interface RedisClientOptions<M = RedisModules> {
    /** Forwarded to the underlying `RedisSocket`; a truthy `password` makes the socket initiator call `auth`. */
    socket?: RedisSocketOptions;
    /** Command modules whose commands are attached to this client instance and handed to its multi class. */
    modules?: M;
}

/**
 * Promise-returning method signature derived from a command definition:
 * it takes what `transformArguments` takes and resolves to what `transformReply` returns.
 */
export type RedisCommandSignature<C extends RedisCommand> =
    (...args: Parameters<C['transformArguments']>) => Promise<ReturnType<C['transformReply']>>;

/** One method per entry of the built-in `COMMANDS` table. */
type WithCommands = {
    [P in keyof typeof COMMANDS]: RedisCommandSignature<(typeof COMMANDS)[P]>;
};

/** One method per command contributed by the configured modules. */
type WithModules<M extends Array<RedisModule>> = {
    [P in keyof M[number]]: RedisCommandSignature<M[number][P]>;
};

/** `multi()` typed so that the returned object is aware of the configured modules. */
type WithMulti<M extends Array<RedisModule>> = {
    multi(): RedisMultiCommandType<M>
};

/** Public type of a client: the class itself plus every dynamically attached command. */
export type RedisClientType<M extends RedisModules> =
    WithCommands & WithModules<M> & WithMulti<M> & RedisClient;

/**
 * Redis client that pairs a `RedisSocket` with a `RedisCommandsQueue`.
 *
 * Commands are not declared as regular methods: the built-in ones are attached to
 * `RedisClient.prototype` by the loop at the bottom of this file, and module commands
 * are attached to each instance in the constructor. Socket errors are re-emitted on
 * the client as `'error'` events.
 */
export default class RedisClient extends EventEmitter {
    /**
     * Attaches `command` to `on` under `name` as an async method.
     *
     * The generated method transforms its arguments with `command.transformArguments`,
     * sends them through `this.sendCommand`, and resolves with the reply passed through
     * `command.transformReply`.
     *
     * @param on Object that receives the method (a prototype or an instance).
     * @param name Property name to define.
     * @param command Command definition supplying the two transform functions.
     */
    static defineCommand(on: any, name: string, command: RedisCommand): void {
        on[name] = async function (...args: Array<unknown>): Promise<unknown> {
            return command.transformReply(
                await this.sendCommand(command.transformArguments(...args))
            );
        };
    }

    /**
     * Builds a client and returns it typed with all of its dynamically attached commands.
     *
     * @param options Socket and module configuration.
     */
    static create<M extends RedisModules>(options?: RedisClientOptions<M>): RedisClientType<M> {
        // Commands are attached at runtime, so the compiler cannot see them on the class.
        return <any>new RedisClient(options);
    }

    readonly #socket: RedisSocket;
    readonly #queue: RedisCommandsQueue;
    readonly #Multi;
    readonly #modules?: RedisModules;

    /**
     * @param options Socket and module configuration; see `RedisClientOptions`.
     */
    constructor(options?: RedisClientOptions) {
        super();
        this.#socket = this.#initiateSocket(options?.socket);
        this.#queue = this.#initiateQueue();
        // The modules must be stored BEFORE the multi class is created: #initiateMulti
        // captures `this.#modules`, and reading it too early always yielded `undefined`.
        this.#modules = options?.modules;
        this.#Multi = this.#initiateMulti();
        this.#initiateModules();
    }

    /**
     * Creates the socket, forwarding incoming data to the queue's response parser and
     * re-emitting socket errors on this client.
     */
    #initiateSocket(socketOptions?: RedisSocketOptions): RedisSocket {
        // Handed to RedisSocket as its initiator; authenticates when a password is configured.
        const socketInitiator = async (): Promise<void> => {
            if (socketOptions?.password) {
                // `auth` is one of the dynamically attached commands, hence the cast.
                await (this as any).auth(socketOptions);
            }
        };

        return new RedisSocket(socketInitiator, socketOptions)
            // `#queue` is assigned later in the constructor; it is only read when data arrives.
            .on('data', data => this.#queue.parseResponse(data))
            .on('error', err => this.emit('error', err));
    }

    /** Creates the commands queue, wiring its output to the socket's `write`. */
    #initiateQueue(): RedisCommandsQueue {
        return new RedisCommandsQueue((encodedCommands: string) => this.#socket.write(encodedCommands));
    }

    /**
     * Creates a `RedisMultiCommand` subclass bound to this client: its executor enqueues
     * every encoded command on this client's queue, and its constructor receives the
     * client's modules.
     */
    #initiateMulti() {
        const executor = async (commands: Array<MultiQueuedCommand>): Promise<Array<RedisReply>> => {
            // Enqueue everything first, then flush once.
            const promise = Promise.all(
                commands.map(({encodedCommand}) => {
                    return this.#queue.addEncodedCommand(encodedCommand);
                })
            );

            this.#tick();

            return promise;
        };

        const modules = this.#modules;

        return class extends RedisMultiCommand {
            constructor() {
                super(executor, modules);
            }
        };
    }

    /**
     * Attaches every command of every configured module to this client instance and
     * registers it with the multi class. Does nothing when no modules are configured.
     */
    #initiateModules(): void {
        const modules = this.#modules;
        if (!modules) return;

        for (const m of modules) {
            for (const [name, command] of Object.entries(m)) {
                RedisClient.defineCommand(this, name, command);
                // NOTE(review): this targets the class constructor rather than its prototype,
                // unlike the client registrations - confirm against RedisMultiCommand.defineCommand.
                this.#Multi.defineCommand(this.#Multi, name, command);
            }
        }
    }

    /** Connects the socket, then flushes whatever is already waiting in the queue. */
    async connect(): Promise<void> {
        await this.#socket.connect();

        this.#tick();
    }

    /**
     * Enqueues a raw command and triggers a flush.
     *
     * NOTE(review): the argument and reply types were reconstructed from usage - confirm.
     *
     * @param args Command name followed by its arguments.
     * @param options Forwarded unchanged to the queue's `addCommand`.
     * @returns The promise returned by the queue for this command.
     */
    sendCommand(args: Array<string>, options?: AddCommandOptions): Promise<RedisReply> {
        const promise = this.#queue.addCommand(args, options);

        this.#tick();

        return promise;
    }

    /** Returns a new instance of this client's multi class. */
    multi() {
        return new this.#Multi();
    }

    /** Disconnects the underlying socket. */
    disconnect(): Promise<void> {
        return this.#socket.disconnect();
    }

    /**
     * Flushes queued commands to the socket chunk by chunk.
     *
     * Keeps executing chunks while the queue reports `false`; when it reports `true`
     * the flush resumes on the socket's next `'drain'` event; any other result stops
     * the flush. A falsy chunk size stops it immediately.
     *
     * Written as a loop rather than self-recursion so that a long backlog cannot grow
     * the call stack; the sequence of calls is otherwise the same.
     *
     * @param chunkRecommendedSize Size of the first chunk; later chunks re-read the socket's value.
     */
    #tick(chunkRecommendedSize: number = this.#socket.chunkRecommendedSize): void {
        let size = chunkRecommendedSize;

        while (size) {
            // TODO: batch using process.nextTick? maybe socket.setNoDelay(false)?

            const isBuffering = this.#queue.executeChunk(size);
            if (isBuffering === true) {
                this.#socket.once('drain', () => this.#tick());
                return;
            }

            if (isBuffering !== false) {
                return;
            }

            size = this.#socket.chunkRecommendedSize;
        }
    }
}

// Attach every built-in command to the prototype so that all client instances share them.
for (const [name, command] of Object.entries(COMMANDS)) {
    RedisClient.defineCommand(RedisClient.prototype, name, command);
}