From e04609cc3ae76558a787bf0a77c1910344429d3f Mon Sep 17 00:00:00 2001 From: Leibale Date: Sun, 30 Apr 2023 10:46:43 -0400 Subject: [PATCH] WIP --- packages/client/index.ts | 1 + packages/client/lib/RESP/decoder.ts | 86 ++--- packages/client/lib/RESP/types.ts | 60 ++-- packages/client/lib/client/commands-queue.ts | 6 +- packages/client/test.mjs | 338 ------------------- 5 files changed, 77 insertions(+), 414 deletions(-) delete mode 100644 packages/client/test.mjs diff --git a/packages/client/index.ts b/packages/client/index.ts index 7bb2f23946..c9b93f3377 100644 --- a/packages/client/index.ts +++ b/packages/client/index.ts @@ -1,4 +1,5 @@ export { RedisModules, RedisFunctions, RedisScripts, RespVersions } from './lib/RESP/types'; +export { RESP_TYPES } from './lib/RESP/decoder'; export { VerbatimString } from './lib/RESP/verbatim-string'; export { defineScript } from './lib/lua-script'; // export * from './lib/errors'; diff --git a/packages/client/lib/RESP/decoder.ts b/packages/client/lib/RESP/decoder.ts index 49f04a2191..afac0356a4 100644 --- a/packages/client/lib/RESP/decoder.ts +++ b/packages/client/lib/RESP/decoder.ts @@ -4,7 +4,7 @@ import { SimpleError, BlobError, ErrorReply } from '../errors'; import { Flags } from './types'; // https://github.com/redis/redis-specifications/blob/master/protocol/RESP3.md -export const TYPES = { +export const RESP_TYPES = { NULL: 95, // _ BOOLEAN: 35, // # NUMBER: 58, // : @@ -35,7 +35,7 @@ const ASCII = { } as const; export const PUSH_FLAGS = { - [TYPES.BLOB_STRING]: Buffer + [RESP_TYPES.BLOB_STRING]: Buffer }; // this was written with performance in mind, so it's not very readable... sorry :( @@ -98,98 +98,98 @@ export class Decoder { private _decodeTypeValue(type, chunk) { switch (type) { - case TYPES.NULL: + case RESP_TYPES.NULL: this._config.onReply(this._decodeNull()); return false; - case TYPES.BOOLEAN: + case RESP_TYPES.BOOLEAN: return this._handleDecodedValue( this._config.onReply, this._decodeBoolean(chunk) ); - case TYPES.NUMBER: + case RESP_TYPES.NUMBER: return this._handleDecodedValue( this._config.onReply, this._decodeNumber(chunk) ); - case TYPES.BIG_NUMBER: + case RESP_TYPES.BIG_NUMBER: return this._handleDecodedValue( this._config.onReply, this._decodeBigNumber( - this._config.getFlags()[TYPES.BIG_NUMBER], + this._config.getFlags()[RESP_TYPES.BIG_NUMBER], chunk ) ); - case TYPES.DOUBLE: + case RESP_TYPES.DOUBLE: return this._handleDecodedValue( this._config.onReply, this._decodeDouble( - this._config.getFlags()[TYPES.DOUBLE], + this._config.getFlags()[RESP_TYPES.DOUBLE], chunk ) ); - case TYPES.SIMPLE_STRING: + case RESP_TYPES.SIMPLE_STRING: return this._handleDecodedValue( this._config.onReply, this._decodeSimpleString( - this._config.getFlags()[TYPES.SIMPLE_STRING], + this._config.getFlags()[RESP_TYPES.SIMPLE_STRING], chunk ) ); - case TYPES.BLOB_STRING: + case RESP_TYPES.BLOB_STRING: return this._handleDecodedValue( this._config.onReply, this._decodeBlobString( - this._config.getFlags()[TYPES.BLOB_STRING], + this._config.getFlags()[RESP_TYPES.BLOB_STRING], chunk ) ); - case TYPES.VERBATIM_STRING: + case RESP_TYPES.VERBATIM_STRING: return this._handleDecodedValue( this._config.onReply, this._decodeVerbatimString( - this._config.getFlags()[TYPES.VERBATIM_STRING], + this._config.getFlags()[RESP_TYPES.VERBATIM_STRING], chunk ) ); - case TYPES.SIMPLE_ERROR: + case RESP_TYPES.SIMPLE_ERROR: return this._handleDecodedValue( this._config.onErrorReply, this._decodeSimpleError(chunk) ); - case TYPES.BLOB_ERROR: + case RESP_TYPES.BLOB_ERROR: return this._handleDecodedValue( this._config.onErrorReply, this._decodeBlobError(chunk) ); - case TYPES.ARRAY: + case RESP_TYPES.ARRAY: return this._handleDecodedValue( this._config.onReply, this._decodeArray(this._config.getFlags(), chunk) ); - case TYPES.SET: + case RESP_TYPES.SET: return this._handleDecodedValue( this._config.onReply, this._decodeSet(this._config.getFlags(), chunk) ); - case TYPES.MAP: + case RESP_TYPES.MAP: return this._handleDecodedValue( this._config.onReply, this._decodeMap(this._config.getFlags(), chunk) ); - case TYPES.PUSH: + case RESP_TYPES.PUSH: return this._handleDecodedValue( this._config.onPush, this._decodeArray(PUSH_FLAGS, chunk) @@ -664,43 +664,43 @@ export class Decoder { private _decodeNestedTypeValue(type, flags, chunk) { switch (type) { - case TYPES.NULL: + case RESP_TYPES.NULL: return this._decodeNull(); - case TYPES.BOOLEAN: + case RESP_TYPES.BOOLEAN: return this._decodeBoolean(chunk); - case TYPES.NUMBER: + case RESP_TYPES.NUMBER: return this._decodeNumber(chunk); - case TYPES.BIG_NUMBER: - return this._decodeBigNumber(flags[TYPES.BIG_NUMBER], chunk); + case RESP_TYPES.BIG_NUMBER: + return this._decodeBigNumber(flags[RESP_TYPES.BIG_NUMBER], chunk); - case TYPES.DOUBLE: - return this._decodeDouble(flags[TYPES.DOUBLE], chunk); + case RESP_TYPES.DOUBLE: + return this._decodeDouble(flags[RESP_TYPES.DOUBLE], chunk); - case TYPES.SIMPLE_STRING: - return this._decodeSimpleString(flags[TYPES.SIMPLE_STRING], chunk); + case RESP_TYPES.SIMPLE_STRING: + return this._decodeSimpleString(flags[RESP_TYPES.SIMPLE_STRING], chunk); - case TYPES.BLOB_STRING: - return this._decodeBlobString(flags[TYPES.BLOB_STRING], chunk); + case RESP_TYPES.BLOB_STRING: + return this._decodeBlobString(flags[RESP_TYPES.BLOB_STRING], chunk); - case TYPES.VERBATIM_STRING: - return this._decodeVerbatimString(flags[TYPES.VERBATIM_STRING], chunk); + case RESP_TYPES.VERBATIM_STRING: + return this._decodeVerbatimString(flags[RESP_TYPES.VERBATIM_STRING], chunk); - case TYPES.SIMPLE_ERROR: + case RESP_TYPES.SIMPLE_ERROR: return this._decodeSimpleError(chunk); - case TYPES.BLOB_ERROR: + case RESP_TYPES.BLOB_ERROR: return this._decodeBlobError(chunk); - case TYPES.ARRAY: + case RESP_TYPES.ARRAY: return this._decodeArray(flags, chunk); - case TYPES.SET: + case RESP_TYPES.SET: return this._decodeSet(flags, chunk); - case TYPES.MAP: + case RESP_TYPES.MAP: return this._decodeMap(flags, chunk); } } @@ -805,7 +805,7 @@ export class Decoder { } private _decodeSetItems(length, flags, chunk) { - return flags[TYPES.SET] === Set ? + return flags[RESP_TYPES.SET] === Set ? this._decodeSetAsSet( new Set(), length, @@ -888,7 +888,7 @@ export class Decoder { } private _decodeMapItems(length, flags, chunk) { - switch (flags[TYPES.MAP]) { + switch (flags[RESP_TYPES.MAP]) { case Map: return this._decodeMapAsMap( new Map(), @@ -978,11 +978,11 @@ export class Decoder { private _decodeMapKeyValue(type, flags, chunk) { switch (type) { // decode simple string map key as string (and not as buffer) - case TYPES.SIMPLE_STRING: + case RESP_TYPES.SIMPLE_STRING: return this._decodeSimpleString(String, chunk); // decode blob string map key as string (and not as buffer) - case TYPES.BLOB_STRING: + case RESP_TYPES.BLOB_STRING: return this._decodeBlobString(String, chunk); default: diff --git a/packages/client/lib/RESP/types.ts b/packages/client/lib/RESP/types.ts index 88ca88606d..d6b11db72b 100644 --- a/packages/client/lib/RESP/types.ts +++ b/packages/client/lib/RESP/types.ts @@ -1,13 +1,13 @@ import { RedisScriptConfig, SHA1 } from '../lua-script'; -import { TYPES } from './decoder'; +import { RESP_TYPES } from './decoder'; import { VerbatimString } from './verbatim-string'; -export type RespTypes = typeof TYPES; +export type RESP_TYPES = typeof RESP_TYPES; -export type RespTypesUnion = RespTypes[keyof RespTypes]; +export type RespTypes = RESP_TYPES[keyof RESP_TYPES]; type RespType< - RESP_TYPE extends RespTypesUnion, + RESP_TYPE extends RespTypes, DEFAULT, TYPES = never, FLAG_TYPES = DEFAULT | TYPES @@ -19,19 +19,19 @@ type RespType< }; export type NullReply = RespType< - RespTypes['NULL'], + RESP_TYPES['NULL'], null >; export type BooleanReply< T extends boolean = boolean > = RespType< - RespTypes['BOOLEAN'], + RESP_TYPES['BOOLEAN'], T >; export type NumberReply< T extends number = number > = RespType< - RespTypes['NUMBER'], + RESP_TYPES['NUMBER'], T, `${T}`, number | string @@ -39,7 +39,7 @@ export type NumberReply< export type BigNumberReply< T extends bigint = bigint > = RespType< - RespTypes['BIG_NUMBER'], + RESP_TYPES['BIG_NUMBER'], T, number | `${T}`, bigint | number | string @@ -47,7 +47,7 @@ export type BigNumberReply< export type DoubleReply< T extends number = number > = RespType< - RespTypes['DOUBLE'], + RESP_TYPES['DOUBLE'], T, `${T}`, number | string @@ -55,7 +55,7 @@ export type DoubleReply< export type SimpleStringReply< T extends string = string > = RespType< - RespTypes['SIMPLE_STRING'], + RESP_TYPES['SIMPLE_STRING'], T, Buffer, string | Buffer @@ -63,7 +63,7 @@ export type SimpleStringReply< export type BlobStringReply< T extends string = string > = RespType< - RespTypes['BLOB_STRING'], + RESP_TYPES['BLOB_STRING'], T, Buffer, string | Buffer @@ -71,39 +71,39 @@ export type BlobStringReply< export type VerbatimStringReply< T extends string = string > = RespType< - RespTypes['VERBATIM_STRING'], + RESP_TYPES['VERBATIM_STRING'], T, Buffer | VerbatimString, string | Buffer | VerbatimString >; export type SimpleErrorReply = RespType< - RespTypes['SIMPLE_ERROR'], + RESP_TYPES['SIMPLE_ERROR'], Buffer >; export type BlobErrorReply = RespType< - RespTypes['BLOB_ERROR'], + RESP_TYPES['BLOB_ERROR'], Buffer >; export type ArrayReply = RespType< - RespTypes['ARRAY'], + RESP_TYPES['ARRAY'], Array, never, Array >; export type TuplesReply]> = RespType< - RespTypes['ARRAY'], + RESP_TYPES['ARRAY'], T, never, Array >; export type SetReply = RespType< - RespTypes['SET'], + RESP_TYPES['SET'], Array, Set, Array | Set >; export type MapReply = RespType< - RespTypes['MAP'], + RESP_TYPES['MAP'], { [key: string]: V }, Map | Array, Map | Array @@ -114,7 +114,7 @@ type MapKeyValue = [key: BlobStringReply, value: unknown]; type MapTuples = Array; export type TuplesToMapReply = RespType< - RespTypes['MAP'], + RESP_TYPES['MAP'], { [P in T[number] as P[0] extends BlobStringReply ? S : never]: P[1]; }, @@ -134,16 +134,16 @@ type FlattenTuples = ( export type ReplyUnion = NullReply | BooleanReply | NumberReply | BigNumberReply | DoubleReply | SimpleStringReply | BlobStringReply | VerbatimStringReply | SimpleErrorReply | BlobErrorReply | // cannot reuse ArrayReply, SetReply and MapReply because of circular reference RespType< - RespTypes['ARRAY'], + RESP_TYPES['ARRAY'], Array > | RespType< - RespTypes['SET'], + RESP_TYPES['SET'], Array, Set > | RespType< - RespTypes['MAP'], + RESP_TYPES['MAP'], { [key: string]: ReplyUnion }, Map | Array >; @@ -152,10 +152,10 @@ export type Reply = ReplyWithFlags; export type Flag = ((...args: any) => T) | (new (...args: any) => T); -type RespTypeUnion = T extends RespType ? FLAG_TYPES : never; +type RespTypeUnion = T extends RespType ? FLAG_TYPES : never; export type Flags = { - [P in RespTypesUnion]?: Flag>>>; + [P in RespTypes]?: Flag>>>; }; type MapKey< @@ -163,8 +163,8 @@ type MapKey< FLAGS extends Flags > = ReplyWithFlags; export type ReplyWithFlags< @@ -299,13 +299,13 @@ type Resp2Array = ( export type Resp2Reply = ( RESP3REPLY extends RespType ? // TODO: RESP3 only scalar types - RESP_TYPE extends RespTypes['DOUBLE'] ? BlobStringReply : - RESP_TYPE extends RespTypes['ARRAY'] | RespTypes['SET'] ? RespType< + RESP_TYPE extends RESP_TYPES['DOUBLE'] ? BlobStringReply : + RESP_TYPE extends RESP_TYPES['ARRAY'] | RESP_TYPES['SET'] ? RespType< RESP_TYPE, Resp2Array > : - RESP_TYPE extends RespTypes['MAP'] ? RespType< - RespTypes['ARRAY'], + RESP_TYPE extends RESP_TYPES['MAP'] ? RespType< + RESP_TYPES['ARRAY'], Resp2Array>> > : RespType< diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 9d215799e0..69ba57425a 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -1,6 +1,6 @@ import * as LinkedList from 'yallist'; import encodeCommand from '../RESP/encoder'; -import { Decoder, PUSH_FLAGS, TYPES } from '../RESP/decoder'; +import { Decoder, PUSH_FLAGS, RESP_TYPES } from '../RESP/decoder'; import { CommandArguments, Flags, ReplyUnion, RespVersions } from '../RESP/types'; import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub'; import { AbortError, ErrorReply } from '../errors'; @@ -32,7 +32,7 @@ const PONG = Buffer.from('pong'); const RESP2_PUSH_FLAGS = { ...PUSH_FLAGS, - [TYPES.SIMPLE_STRING]: Buffer + [RESP_TYPES.SIMPLE_STRING]: Buffer }; export default class RedisCommandsQueue { @@ -125,7 +125,7 @@ export default class RedisCommandsQueue { if (PONG.equals(reply[0] as Buffer)) { const { resolve, flags } = this._waitingForReply.shift()!, buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer; - resolve(flags?.[TYPES.SIMPLE_STRING] === Buffer ? buffer : buffer.toString()); + resolve(flags?.[RESP_TYPES.SIMPLE_STRING] === Buffer ? buffer : buffer.toString()); return; } } diff --git a/packages/client/test.mjs b/packages/client/test.mjs deleted file mode 100644 index f50279ece2..0000000000 --- a/packages/client/test.mjs +++ /dev/null @@ -1,338 +0,0 @@ -import { setTimeout } from 'timers/promises'; -import { createClient, defineScript, createCluster } from './dist/index.js'; -import { TYPES } from './dist/lib/RESP/decoder.js'; - -async function client() { - console.log(`!!! CLIENT !!!`); - - const client = createClient({ - RESP, - isolationPoolOptions: { - max: 5 - }, - modules: { - /** - * module jsdoc - */ - module: { - /** - * module ping jsdoc - */ - ping: { - /** - * @param {string} [message] - */ - transformArguments(message) { - const args = ['PING']; - if (message) { - args.push(message); - } - - return args; - }, - /** - * @callback PingReply - * @returns {import('./lib/RESP/types').SimpleStringReply} - * - * @type {PingReply} - */ - transformReply: undefined - }, - /** - * module square jsdoc - */ - square: { - /** - * @param {number} number - */ - transformArguments(number) { - return ['FCALL_RO', 'square', '0', number.toString()]; - }, - /** - * @callback SquareResp2 - * @returns {import('./lib/RESP/types').BlobStringReply} - * - * @callback SquareResp3 - * @returns {import('./lib/RESP/types').DoubleReply} - * - * @type {{ 2: SquareResp2, 3: SquareResp3 }} - */ - transformReply: undefined - } - } - }, - functions: { - /** - * library jsdoc - */ - library: { - /** - * library square jsdoc - */ - square: { - IS_READ_ONLY: true, - NUMBER_OF_KEYS: 0, - /** - * @param {number} number - */ - transformArguments(number) { - return [number.toString()]; - }, - /** - * @callback SquareResp2 - * @returns {import('./lib/RESP/types').BlobStringReply} - * - * @callback SquareResp3 - * @returns {import('./lib/RESP/types').DoubleReply} - * - * @type {{ 2: SquareResp2, 3: SquareResp3 }} - */ - transformReply: undefined - } - } - }, - scripts: { - /** - * square jsdoc - */ - square: defineScript({ - SCRIPT: 'return { double = ARGV[1] * ARGV[1] };', - NUMBER_OF_KEYS: 0, - /** - * @param {number} number - */ - transformArguments(number) { - return [number.toString()]; - }, - - /** - * @callback SquareResp2 - * @returns {import('./lib/RESP/types').BlobStringReply} - * - * @callback SquareResp3 - * @returns {import('./lib/RESP/types').DoubleReply} - * - * @type {{ 2: SquareResp2, 3: SquareResp3 }} - */ - transformReply: undefined - }) - } - }); - - const multi = client.multi() - .get('a') - .set('a', 'b'); - - for (let i = 0; i< 10; i++) { - multi.incr('a'); - } - - const result = await multi.exec(); - - const bufferClient = client.withFlags({ - [TYPES.SIMPLE_STRING]: Buffer, - [TYPES.BLOB_STRING]: Buffer - }); - - client.on('error', err => console.error(err)); - - await client.connect(); - - client.ping() - - // console.log( - // 'SCAN', - // await client.scan(0), - // await bufferClient.scan(0) - // ); - - // const fn = - // `#!LUA name=math - // redis.register_function{ - // function_name = "square", - // callback = function(keys, args) return { double = args[1] * args[1] } end, - // flags = { "no-writes" } - // }`; - - // await client.sendCommand(['FLUSHALL']); - // await client.sendCommand(['FUNCTION', 'LOAD', 'REPLACE', fn]); - - // console.log( - // 'info:\n', - // await client.info(), - // 'info with flags:\n', - // await client.withFlags({ - // [TYPES.VERBATIM_STRING]: VerbatimString - // }).info(), - // ); - - // console.log( - // 'client.module.square (module):', - // await client.module.square(1), - // await client.withFlags({ - // [TYPES.DOUBLE]: String - // }).module.square(1) - // ); - - // console.log( - // 'client.library.square (function):', - // await client.library.square(2), - // await client.withFlags({ - // [TYPES.DOUBLE]: String - // }).library.square(2) - // ); - - // console.log( - // 'client.square (script):', - // await client.square(4), - // await client.withFlags({ - // [TYPES.DOUBLE]: String - // }).square(4) - // ); - - // console.log( - // 'MULTI', - // await client.multi() - // .ping() - // .module.ping() - // .library.square(2) - // .square(4) - // .exec() - // ); - - // console.log( - // 'SET key value', - // await client.set('key', 'value'), - // ); - - // console.log( - // 'GET key', - // await client.get('key'), - // ); - - // console.log( - // 'GET key (bufferClient)', - // await bufferClient.get('key'), - // ); - - // console.log( - // 'sendCommand DEL key', - // await client.sendCommand(['DEL', 'key']) - // ); - - // console.log( - // 'HSET key field value', - // await client.hSet('key', 'field', 'value') - // ); - - // console.log( - // 'HGET key field', - // await client.hGet('key', 'field') - // ); - - // console.log( - // 'HGETALL key', - // await client.hGetAll('key') - // ); - - // console.log( - // 'HGETALL key (bufferClient)', - // await bufferClient.hGetAll('key') - // ); - - // console.log( - // 'CLIENT ID', - // await client.sendCommand(['CLIENT', 'ID']), - // ); - - // await client.subscribe('channel', message => { - // console.log('channel', message); - // }); - - // let publisherClient; - // if (RESP !== 3) { - // publisherClient = client.duplicate(); - // publisherClient.on('error', err => console.error('PubSubClient error', err)); - - // await publisherClient.connect(); - // } - - // const TIMES = 3; - // console.log( - // `[PUBLISH channel ] [PING ] * ${TIMES}`, - // await Promise.all( - // Array.from({ length: 5 }).map((_, i) => - // Promise.all([ - // (publisherClient ?? client).sendCommand(['PUBLISH', 'channel', i.toString()]).catch(), - // client.ping(i.toString()), - // client.isolated().clientId(), - // client.executeIsolated(client => client.clientId()) - // ]) - // ) - // ) - // ); - - const entries = Array.from({ length: 100 }).map((_, i) => ['{a}' + i.toString(), i.toString()]) - - await client.mSet(entries); - for await (const key of client.scanIterator()) { - console.log('SCAN', key); - } - - await client.hSet('hash', entries.flat()); - for await (const entry of client.hScanIterator('hash')) { - console.log('HSCAN', entry) - } - - await Promise.all([ - // publisherClient?.disconnect(), - client.disconnect() - ]); -} - -async function cluster() { - console.log(`!!! CLUSTER !!!`); - - const cluster = createCluster({ - rootNodes: [{}], - RESP - }); - cluster.on('error', err => console.error(err)); - - await cluster.connect(); - - console.log( - 'SET key value', - await cluster.set('key', 'value') - ); - - console.log( - 'GET key', - await cluster.get('key') - ); - - await cluster.subscribe('channel', message => { - console.log('(cluster) channel', message); - }); - - const CLUSTER_TIMES = 3; - console.log( - `[PUBLISH channel ] [PING ] * ${CLUSTER_TIMES}`, - await Promise.all( - Array.from({ length: 5 }).map(async (_, i) => { - const client = await cluster.nodeClient(cluster.getRandomNode()); - return client.sendCommand(['PUBLISH', 'channel', i.toString()]); - }) - ) - ); - - // wait for messages - await setTimeout(1000); - - await cluster.disconnect(); -} - -const RESP = 3; - -await client(); -// await cluster();