1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-04 15:02:09 +03:00
This commit is contained in:
Leibale
2023-06-07 11:41:23 -04:00
parent eb47bb03fc
commit 05f9f0ee0d
31 changed files with 390 additions and 386 deletions

View File

@@ -90,5 +90,5 @@ await client.connect();
// Add your example code here...
await client.quit();
client.destroy();
```

View File

@@ -27,4 +27,4 @@ console.log('blpopPromise resolved');
// {"key":"keyName","element":"value"}
console.log(`listItem is '${JSON.stringify(listItem)}'`);
await client.quit();
client.destroy();

View File

@@ -77,4 +77,4 @@ const info = await client.bf.info('mybloom');
// }
console.log(info);
await client.quit();
client.destroy();

View File

@@ -25,4 +25,4 @@ console.log('Afer connectPromise has resolved...');
// isReady will return True here, client is ready to use.
console.log(`client.isOpen: ${client.isOpen}, client.isReady: ${client.isReady}`);
await client.quit();
client.destroy();

View File

@@ -1,25 +1,31 @@
// Define a custom script that shows example of SET command
// with several modifiers.
import { createClient } from 'redis';
import { createClient } from '../packages/client';
const client = createClient();
await client.connect();
await client.del('mykey');
let result = await client.set('mykey', 'myvalue', {
EX: 60,
GET: true
});
console.log(
await client.set('mykey', 'myvalue', {
expiration: {
type: 'EX',
value: 60
},
GET: true
})
); // null
console.log(result); //null
console.log(
await client.set('mykey', 'newvalue', {
expiration: {
type: 'EX',
value: 60
},
GET: true
})
); // 'myvalue'
result = await client.set('mykey', 'newvalue', {
EX: 60,
GET: true
});
console.log(result); //myvalue
await client.quit();
await client.close();

View File

@@ -23,4 +23,4 @@ try {
console.log(`GET command failed: ${e.message}`);
}
await client.quit();
client.destroy();

View File

@@ -77,4 +77,4 @@ console.log('Count-Min Sketch info:');
// }
console.log(info);
await client.quit();
client.destroy();

View File

@@ -76,4 +76,4 @@ const info = await client.cf.info('mycuckoo');
// }
console.log(info);
await client.quit();
client.destroy();

View File

@@ -9,4 +9,4 @@ const serverTime = await client.time();
// 2022-02-25T12:57:40.000Z { microseconds: 351346 }
console.log(serverTime);
await client.quit();
client.destroy();

View File

@@ -48,4 +48,4 @@ try {
console.error(e);
}
await client.quit();
client.destroy();

View File

@@ -24,4 +24,4 @@ await client.connect();
await client.set('mykey', '5');
console.log(await client.mincr('mykey', 'myotherkey', 10)); // [ 15, 10 ]
await client.quit();
client.destroy();

View File

@@ -73,4 +73,4 @@ const numPets = await client.json.arrLen('noderedis:jsondata', '$.pets');
// We now have 4 pets.
console.log(`We now have ${numPets} pets.`);
await client.quit();
client.destroy();

View File

@@ -85,4 +85,4 @@ for (const doc of results.documents) {
console.log(`${doc.id}: ${doc.value.name}, ${doc.value.age} years old.`);
}
await client.quit();
client.destroy();

View File

@@ -145,4 +145,4 @@ console.log(
// ]
// }
await client.quit();
client.destroy();

View File

@@ -88,4 +88,4 @@ console.log(JSON.stringify(results, null, 2));
// }
// ]
// }
await client.quit();
client.destroy();

View File

@@ -12,4 +12,4 @@ for await (const member of client.sScanIterator(setName)) {
console.log(member);
}
await client.quit();
client.destroy();

View File

@@ -28,4 +28,4 @@ for await (const memberWithScore of client.zScanIterator('mysortedset')) {
console.log(memberWithScore);
}
await client.quit();
client.destroy();

View File

@@ -47,4 +47,4 @@ console.log(`Length of mystream: ${await client.xLen('mystream')}.`);
// Should be approximately 1000:
console.log(`Length of mytrimmedstream: ${await client.xLen('mytrimmedstream')}.`);
await client.quit();
client.destroy();

View File

@@ -119,4 +119,4 @@ try {
console.error(e);
}
await client.quit();
client.destroy();

View File

@@ -110,4 +110,4 @@ const [ simonCount, lanceCount ] = await client.topK.count('mytopk', [
console.log(`Count estimate for simon: ${simonCount}.`);
console.log(`Count estimate for lance: ${lanceCount}.`);
await client.quit();
client.destroy();

View File

@@ -37,4 +37,4 @@ console.log(responses);
// Clean up fixtures.
await client.del(['hash1', 'hash2', 'hash3']);
await client.quit();
client.destroy();

View File

@@ -1,7 +1,7 @@
// @ts-nocheck
import { VerbatimString } from './verbatim-string';
import { SimpleError, BlobError, ErrorReply } from '../errors';
import { Flags } from './types';
import { TypeMapping } from './types';
// https://github.com/redis/redis-specifications/blob/master/protocol/RESP3.md
export const RESP_TYPES = {
@@ -34,7 +34,7 @@ const ASCII = {
'e': 101
} as const;
export const PUSH_FLAGS = {
export const PUSH_TYPE_MAPPING = {
[RESP_TYPES.BLOB_STRING]: Buffer
};
@@ -45,7 +45,7 @@ interface DecoderOptions {
onReply(reply: any): unknown;
onErrorReply(err: ErrorReply): unknown;
onPush(push: Array<any>): unknown;
getFlags(): Flags;
getTypeMapping(): TypeMapping;
}
export class Decoder {
@@ -118,7 +118,7 @@ export class Decoder {
return this._handleDecodedValue(
this._config.onReply,
this._decodeBigNumber(
this._config.getFlags()[RESP_TYPES.BIG_NUMBER],
this._config.getTypeMapping()[RESP_TYPES.BIG_NUMBER],
chunk
)
);
@@ -127,7 +127,7 @@ export class Decoder {
return this._handleDecodedValue(
this._config.onReply,
this._decodeDouble(
this._config.getFlags()[RESP_TYPES.DOUBLE],
this._config.getTypeMapping()[RESP_TYPES.DOUBLE],
chunk
)
);
@@ -136,7 +136,7 @@ export class Decoder {
return this._handleDecodedValue(
this._config.onReply,
this._decodeSimpleString(
this._config.getFlags()[RESP_TYPES.SIMPLE_STRING],
this._config.getTypeMapping()[RESP_TYPES.SIMPLE_STRING],
chunk
)
);
@@ -145,7 +145,7 @@ export class Decoder {
return this._handleDecodedValue(
this._config.onReply,
this._decodeBlobString(
this._config.getFlags()[RESP_TYPES.BLOB_STRING],
this._config.getTypeMapping()[RESP_TYPES.BLOB_STRING],
chunk
)
);
@@ -154,7 +154,7 @@ export class Decoder {
return this._handleDecodedValue(
this._config.onReply,
this._decodeVerbatimString(
this._config.getFlags()[RESP_TYPES.VERBATIM_STRING],
this._config.getTypeMapping()[RESP_TYPES.VERBATIM_STRING],
chunk
)
);
@@ -174,26 +174,29 @@ export class Decoder {
case RESP_TYPES.ARRAY:
return this._handleDecodedValue(
this._config.onReply,
this._decodeArray(this._config.getFlags(), chunk)
this._decodeArray(this._config.getTypeMapping(), chunk)
);
case RESP_TYPES.SET:
return this._handleDecodedValue(
this._config.onReply,
this._decodeSet(this._config.getFlags(), chunk)
this._decodeSet(this._config.getTypeMapping(), chunk)
);
case RESP_TYPES.MAP:
return this._handleDecodedValue(
this._config.onReply,
this._decodeMap(this._config.getFlags(), chunk)
this._decodeMap(this._config.getTypeMapping(), chunk)
);
case RESP_TYPES.PUSH:
return this._handleDecodedValue(
this._config.onPush,
this._decodeArray(PUSH_FLAGS, chunk)
this._decodeArray(PUSH_TYPE_MAPPING, chunk)
);
default:
throw new Error(`Unknown RESP type ${type} "${String.fromCharCode(type)}"`);
}
}
@@ -269,8 +272,8 @@ export class Decoder {
return this._decodeUnsingedNumber.bind(this, number);
}
private _decodeBigNumber(flag, chunk) {
if (flag === String) {
private _decodeBigNumber(type, chunk) {
if (type === String) {
return this._decodeSimpleString(String, chunk);
}
@@ -319,8 +322,8 @@ export class Decoder {
return this._decodeUnsingedBigNumber.bind(this, bigNumber);
}
private _decodeDouble(flag, chunk) {
if (flag === String) {
private _decodeDouble(type, chunk) {
if (type === String) {
return this._decodeSimpleString(String, chunk);
}
@@ -464,38 +467,38 @@ export class Decoder {
return cursor;
}
private _decodeSimpleString(flag, chunk) {
private _decodeSimpleString(type, chunk) {
const start = this._cursor,
crlfIndex = this._findCRLF(chunk, start);
if (crlfIndex === -1) {
return this._continueDecodeSimpleString.bind(
this,
[chunk.subarray(start)],
flag
type
);
}
const slice = chunk.subarray(start, crlfIndex);
return flag === Buffer ?
return type === Buffer ?
slice :
slice.toString();
}
private _continueDecodeSimpleString(chunks, flag, chunk) {
private _continueDecodeSimpleString(chunks, type, chunk) {
const start = this._cursor,
crlfIndex = this._findCRLF(chunk, start);
if (crlfIndex === -1) {
chunks.push(chunk.subarray(start));
return this._continueDecodeSimpleString.bind(this, chunks, flag);
return this._continueDecodeSimpleString.bind(this, chunks, type);
}
chunks.push(chunk.subarray(start, crlfIndex));
return flag === Buffer ?
return type === Buffer ?
Buffer.concat(chunks) :
chunks.join('');
}
private _decodeBlobString(flag, chunk) {
private _decodeBlobString(type, chunk) {
// RESP 2 bulk string null
// https://github.com/redis/redis-specifications/blob/master/protocol/RESP2.md#resp-bulk-strings
if (chunk[this._cursor] === ASCII['-']) {
@@ -505,26 +508,26 @@ export class Decoder {
const length = this._decodeUnsingedNumber(0, chunk);
if (typeof length === 'function') {
return this._continueDecodeBlobStringLength.bind(this, length, flag);
return this._continueDecodeBlobStringLength.bind(this, length, type);
} else if (this._cursor >= chunk.length) {
return this._decodeBlobStringWithLength.bind(this, length, flag);
return this._decodeBlobStringWithLength.bind(this, length, type);
}
return this._decodeBlobStringWithLength(length, flag, chunk);
return this._decodeBlobStringWithLength(length, type, chunk);
}
private _continueDecodeBlobStringLength(lengthCb, flag, chunk) {
private _continueDecodeBlobStringLength(lengthCb, type, chunk) {
const length = lengthCb(chunk);
if (typeof length === 'function') {
return this._continueDecodeBlobStringLength.bind(this, length, flag);
return this._continueDecodeBlobStringLength.bind(this, length, type);
} else if (this._cursor >= chunk.length) {
return this._decodeBlobStringWithLength.bind(this, length, flag);
return this._decodeBlobStringWithLength.bind(this, length, type);
}
return this._decodeBlobStringWithLength(length, flag, chunk);
return this._decodeBlobStringWithLength(length, type, chunk);
}
private _decodeStringWithLength(length, skip, flag, chunk) {
private _decodeStringWithLength(length, skip, type, chunk) {
const end = this._cursor + length;
if (end >= chunk.length) {
const slice = chunk.subarray(this._cursor);
@@ -534,18 +537,18 @@ export class Decoder {
length - slice.length,
[slice],
skip,
flag
type
);
}
const slice = chunk.subarray(this._cursor, end);
this._cursor = end + skip;
return flag === Buffer ?
return type === Buffer ?
slice :
slice.toString();
}
private _continueDecodeStringWithLength(length, chunks, skip, flag, chunk) {
private _continueDecodeStringWithLength(length, chunks, skip, type, chunk) {
const end = this._cursor + length;
if (end >= chunk.length) {
const slice = chunk.subarray(this._cursor);
@@ -556,46 +559,46 @@ export class Decoder {
length - slice.length,
chunks,
skip,
flag
type
);
}
chunks.push(chunk.subarray(this._cursor, end));
this._cursor = end + skip;
return flag === Buffer ?
return type === Buffer ?
Buffer.concat(chunks) :
chunks.join('');
}
private _decodeBlobStringWithLength(length, flag, chunk) {
return this._decodeStringWithLength(length, 2, flag, chunk);
private _decodeBlobStringWithLength(length, type, chunk) {
return this._decodeStringWithLength(length, 2, type, chunk);
}
private _decodeVerbatimString(flag, chunk) {
private _decodeVerbatimString(type, chunk) {
return this._continueDecodeVerbatimStringLength(
this._decodeUnsingedNumber.bind(this, 0),
flag,
type,
chunk
);
}
private _continueDecodeVerbatimStringLength(lengthCb, flag, chunk) {
private _continueDecodeVerbatimStringLength(lengthCb, type, chunk) {
const length = lengthCb(chunk);
return typeof length === 'function'?
this._continueDecodeVerbatimStringLength.bind(this, length, flag) :
this._decodeVerbatimStringWithLength(length, flag, chunk);
this._continueDecodeVerbatimStringLength.bind(this, length, type) :
this._decodeVerbatimStringWithLength(length, type, chunk);
}
private _decodeVerbatimStringWithLength(length, flag, chunk) {
private _decodeVerbatimStringWithLength(length, type, chunk) {
const stringLength = length - 4; // skip <format>:
if (flag === VerbatimString) {
if (type === VerbatimString) {
return this._decodeVerbatimStringFormat(stringLength, chunk);
}
this._cursor += 4; // skip <format>:
return this._cursor >= chunk.length ?
this._decodeBlobStringWithLength.bind(this, stringLength, flag) :
this._decodeBlobStringWithLength(stringLength, flag, chunk);
this._decodeBlobStringWithLength.bind(this, stringLength, type) :
this._decodeBlobStringWithLength(stringLength, type, chunk);
}
private _decodeVerbatimStringFormat(stringLength, chunk) {
@@ -656,14 +659,14 @@ export class Decoder {
new BlobError(string);
}
private _decodeNestedType(flags, chunk) {
private _decodeNestedType(typeMapping, chunk) {
const type = chunk[this._cursor];
return ++this._cursor === chunk.length ?
this._decodeNestedTypeValue.bind(this, type, flags) :
this._decodeNestedTypeValue(type, flags, chunk);
this._decodeNestedTypeValue.bind(this, type, typeMapping) :
this._decodeNestedTypeValue(type, typeMapping, chunk);
}
private _decodeNestedTypeValue(type, flags, chunk) {
private _decodeNestedTypeValue(type, typeMapping, chunk) {
switch (type) {
case RESP_TYPES.NULL:
return this._decodeNull();
@@ -675,19 +678,19 @@ export class Decoder {
return this._decodeNumber(chunk);
case RESP_TYPES.BIG_NUMBER:
return this._decodeBigNumber(flags[RESP_TYPES.BIG_NUMBER], chunk);
return this._decodeBigNumber(typeMapping[RESP_TYPES.BIG_NUMBER], chunk);
case RESP_TYPES.DOUBLE:
return this._decodeDouble(flags[RESP_TYPES.DOUBLE], chunk);
return this._decodeDouble(typeMapping[RESP_TYPES.DOUBLE], chunk);
case RESP_TYPES.SIMPLE_STRING:
return this._decodeSimpleString(flags[RESP_TYPES.SIMPLE_STRING], chunk);
return this._decodeSimpleString(typeMapping[RESP_TYPES.SIMPLE_STRING], chunk);
case RESP_TYPES.BLOB_STRING:
return this._decodeBlobString(flags[RESP_TYPES.BLOB_STRING], chunk);
return this._decodeBlobString(typeMapping[RESP_TYPES.BLOB_STRING], chunk);
case RESP_TYPES.VERBATIM_STRING:
return this._decodeVerbatimString(flags[RESP_TYPES.VERBATIM_STRING], chunk);
return this._decodeVerbatimString(typeMapping[RESP_TYPES.VERBATIM_STRING], chunk);
case RESP_TYPES.SIMPLE_ERROR:
return this._decodeSimpleError(chunk);
@@ -696,17 +699,20 @@ export class Decoder {
return this._decodeBlobError(chunk);
case RESP_TYPES.ARRAY:
return this._decodeArray(flags, chunk);
return this._decodeArray(typeMapping, chunk);
case RESP_TYPES.SET:
return this._decodeSet(flags, chunk);
return this._decodeSet(typeMapping, chunk);
case RESP_TYPES.MAP:
return this._decodeMap(flags, chunk);
return this._decodeMap(typeMapping, chunk);
default:
throw new Error(`Unknown RESP type ${type} "${String.fromCharCode(type)}"`);
}
}
private _decodeArray(flags, chunk) {
private _decodeArray(typeMapping, chunk) {
// RESP 2 null
// https://github.com/redis/redis-specifications/blob/master/protocol/RESP2.md#resp-arrays
if (chunk[this._cursor] === ASCII['-']) {
@@ -716,49 +722,49 @@ export class Decoder {
return this._decodeArrayWithLength(
this._decodeUnsingedNumber(0, chunk),
flags,
typeMapping,
chunk
);
}
private _decodeArrayWithLength(length, flags, chunk) {
private _decodeArrayWithLength(length, typeMapping, chunk) {
return typeof length === 'function' ?
this._continueDecodeArrayLength.bind(this, length, flags) :
this._continueDecodeArrayLength.bind(this, length, typeMapping) :
this._decodeArrayItems(
new Array(length),
0,
flags,
typeMapping,
chunk
);
}
private _continueDecodeArrayLength(lengthCb, flags, chunk) {
private _continueDecodeArrayLength(lengthCb, typeMapping, chunk) {
return this._decodeArrayWithLength(
lengthCb(chunk),
flags,
typeMapping,
chunk
);
}
private _decodeArrayItems(array, filled, flags, chunk) {
private _decodeArrayItems(array, filled, typeMapping, chunk) {
for (let i = filled; i < array.length; i++) {
if (this._cursor >= chunk.length) {
return this._decodeArrayItems.bind(
this,
array,
i,
flags
typeMapping
);
}
const item = this._decodeNestedType(flags, chunk);
const item = this._decodeNestedType(typeMapping, chunk);
if (typeof item === 'function') {
return this._continueDecodeArrayItems.bind(
this,
array,
i,
item,
flags
typeMapping
);
}
@@ -768,7 +774,7 @@ export class Decoder {
return array;
}
private _continueDecodeArrayItems(array, filled, itemCb, flags, chunk) {
private _continueDecodeArrayItems(array, filled, itemCb, typeMapping, chunk) {
const item = itemCb(chunk);
if (typeof item === 'function') {
return this._continueDecodeArrayItems.bind(
@@ -776,52 +782,52 @@ export class Decoder {
array,
filled,
item,
flags
typeMapping
);
}
array[filled++] = item;
return this._decodeArrayItems(array, filled, flags, chunk);
return this._decodeArrayItems(array, filled, typeMapping, chunk);
}
private _decodeSet(flags, chunk) {
private _decodeSet(typeMapping, chunk) {
const length = this._decodeUnsingedNumber(0, chunk);
if (typeof length === 'function') {
return this._continueDecodeSetLength.bind(this, length, flags);
return this._continueDecodeSetLength.bind(this, length, typeMapping);
}
return this._decodeSetItems(
length,
flags,
typeMapping,
chunk
);
}
private _continueDecodeSetLength(lengthCb, flags, chunk) {
private _continueDecodeSetLength(lengthCb, typeMapping, chunk) {
const length = lengthCb(chunk);
return typeof length === 'function' ?
this._continueDecodeSetLength.bind(this, length, flags) :
this._decodeSetItems(length, flags, chunk);
this._continueDecodeSetLength.bind(this, length, typeMapping) :
this._decodeSetItems(length, typeMapping, chunk);
}
private _decodeSetItems(length, flags, chunk) {
return flags[RESP_TYPES.SET] === Set ?
private _decodeSetItems(length, typeMapping, chunk) {
return typeMapping[RESP_TYPES.SET] === Set ?
this._decodeSetAsSet(
new Set(),
length,
flags,
typeMapping,
chunk
) :
this._decodeArrayItems(
new Array(length),
0,
flags,
typeMapping,
chunk
);
}
private _decodeSetAsSet(set, remaining, flags, chunk) {
private _decodeSetAsSet(set, remaining, typeMapping, chunk) {
// using `remaining` instead of `length` & `set.size` to make it work even if the set contains duplicates
while (remaining > 0) {
if (this._cursor >= chunk.length) {
@@ -829,18 +835,18 @@ export class Decoder {
this,
set,
remaining,
flags
typeMapping
);
}
const item = this._decodeNestedType(flags, chunk);
const item = this._decodeNestedType(typeMapping, chunk);
if (typeof item === 'function') {
return this._continueDecodeSetAsSet.bind(
this,
set,
remaining,
item,
flags
typeMapping
);
}
@@ -851,7 +857,7 @@ export class Decoder {
return set;
}
private _continueDecodeSetAsSet(set, remaining, itemCb, flags, chunk) {
private _continueDecodeSetAsSet(set, remaining, itemCb, typeMapping, chunk) {
const item = itemCb(chunk);
if (typeof item === 'function') {
return this._continueDecodeSetAsSet.bind(
@@ -859,42 +865,42 @@ export class Decoder {
set,
remaining,
item,
flags
typeMapping
);
}
set.add(item);
return this._decodeSetAsSet(set, remaining - 1, flags, chunk);
return this._decodeSetAsSet(set, remaining - 1, typeMapping, chunk);
}
private _decodeMap(flags, chunk) {
private _decodeMap(typeMapping, chunk) {
const length = this._decodeUnsingedNumber(0, chunk);
if (typeof length === 'function') {
return this._continueDecodeMapLength.bind(this, length, flags);
return this._continueDecodeMapLength.bind(this, length, typeMapping);
}
return this._decodeMapItems(
length,
flags,
typeMapping,
chunk
);
}
private _continueDecodeMapLength(lengthCb, flags, chunk) {
private _continueDecodeMapLength(lengthCb, typeMapping, chunk) {
const length = lengthCb(chunk);
return typeof length === 'function' ?
this._continueDecodeMapLength.bind(this, length, flags) :
this._decodeMapItems(length, flags, chunk);
this._continueDecodeMapLength.bind(this, length, typeMapping) :
this._decodeMapItems(length, typeMapping, chunk);
}
private _decodeMapItems(length, flags, chunk) {
switch (flags[RESP_TYPES.MAP]) {
private _decodeMapItems(length, typeMapping, chunk) {
switch (typeMapping[RESP_TYPES.MAP]) {
case Map:
return this._decodeMapAsMap(
new Map(),
length,
flags,
typeMapping,
chunk
);
@@ -902,7 +908,7 @@ export class Decoder {
return this._decodeArrayItems(
new Array(length * 2),
0,
flags,
typeMapping,
chunk
);
@@ -910,13 +916,13 @@ export class Decoder {
return this._decodeMapAsObject(
Object.create(null),
length,
flags,
typeMapping,
chunk
);
}
}
private _decodeMapAsMap(map, remaining, flags, chunk) {
private _decodeMapAsMap(map, remaining, typeMapping, chunk) {
// using `remaining` instead of `length` & `map.size` to make it work even if the map contains duplicate keys
while (remaining > 0) {
if (this._cursor >= chunk.length) {
@@ -924,18 +930,18 @@ export class Decoder {
this,
map,
remaining,
flags
typeMapping
);
}
const key = this._decodeMapKey(flags, chunk);
const key = this._decodeMapKey(typeMapping, chunk);
if (typeof key === 'function') {
return this._continueDecodeMapKey.bind(
this,
map,
remaining,
key,
flags
typeMapping
);
}
@@ -945,12 +951,12 @@ export class Decoder {
map,
remaining,
key,
this._decodeNestedType.bind(this, flags),
flags
this._decodeNestedType.bind(this, typeMapping),
typeMapping
);
}
const value = this._decodeNestedType(flags, chunk);
const value = this._decodeNestedType(typeMapping, chunk);
if (typeof value === 'function') {
return this._continueDecodeMapValue.bind(
this,
@@ -958,7 +964,7 @@ export class Decoder {
remaining,
key,
value,
flags
typeMapping
);
}
@@ -969,14 +975,14 @@ export class Decoder {
return map;
}
private _decodeMapKey(flags, chunk) {
private _decodeMapKey(typeMapping, chunk) {
const type = chunk[this._cursor];
return ++this._cursor === chunk.length ?
this._decodeMapKeyValue.bind(this, type, flags) :
this._decodeMapKeyValue(type, flags, chunk);
this._decodeMapKeyValue.bind(this, type, typeMapping) :
this._decodeMapKeyValue(type, typeMapping, chunk);
}
private _decodeMapKeyValue(type, flags, chunk) {
private _decodeMapKeyValue(type, typeMapping, chunk) {
switch (type) {
// decode simple string map key as string (and not as buffer)
case RESP_TYPES.SIMPLE_STRING:
@@ -987,11 +993,11 @@ export class Decoder {
return this._decodeBlobString(String, chunk);
default:
return this._decodeNestedTypeValue(type, flags, chunk);
return this._decodeNestedTypeValue(type, typeMapping, chunk);
}
}
private _continueDecodeMapKey(map, remaining, keyCb, flags, chunk) {
private _continueDecodeMapKey(map, remaining, keyCb, typeMapping, chunk) {
const key = keyCb(chunk);
if (typeof key === 'function') {
return this._continueDecodeMapKey.bind(
@@ -999,7 +1005,7 @@ export class Decoder {
map,
remaining,
key,
flags
typeMapping
);
}
@@ -1009,12 +1015,12 @@ export class Decoder {
map,
remaining,
key,
this._decodeNestedType.bind(this, flags),
flags
this._decodeNestedType.bind(this, typeMapping),
typeMapping
);
}
const value = this._decodeNestedType(flags, chunk);
const value = this._decodeNestedType(typeMapping, chunk);
if (typeof value === 'function') {
return this._continueDecodeMapValue.bind(
this,
@@ -1022,15 +1028,15 @@ export class Decoder {
remaining,
key,
value,
flags
typeMapping
);
}
map.set(key, value);
return this._decodeMapAsMap(map, remaining - 1, flags, chunk);
return this._decodeMapAsMap(map, remaining - 1, typeMapping, chunk);
}
private _continueDecodeMapValue(map, remaining, key, valueCb, flags, chunk) {
private _continueDecodeMapValue(map, remaining, key, valueCb, typeMapping, chunk) {
const value = valueCb(chunk);
if (typeof value === 'function') {
return this._continueDecodeMapValue.bind(
@@ -1039,34 +1045,34 @@ export class Decoder {
remaining,
key,
value,
flags
typeMapping
);
}
map.set(key, value);
return this._decodeMapAsMap(map, remaining - 1, flags, chunk);
return this._decodeMapAsMap(map, remaining - 1, typeMapping, chunk);
}
private _decodeMapAsObject(object, remaining, flags, chunk) {
private _decodeMapAsObject(object, remaining, typeMapping, chunk) {
while (remaining > 0) {
if (this._cursor >= chunk.length) {
return this._decodeMapAsObject.bind(
this,
object,
remaining,
flags
typeMapping
);
}
const key = this._decodeMapKey(flags, chunk);
const key = this._decodeMapKey(typeMapping, chunk);
if (typeof key === 'function') {
return this._continueDecodeMapAsObjectKey.bind(
this,
object,
remaining,
key,
flags
typeMapping
);
}
@@ -1076,12 +1082,12 @@ export class Decoder {
object,
remaining,
key,
this._decodeNestedType.bind(this, flags),
flags
this._decodeNestedType.bind(this, typeMapping),
typeMapping
);
}
const value = this._decodeNestedType(flags, chunk);
const value = this._decodeNestedType(typeMapping, chunk);
if (typeof value === 'function') {
return this._continueDecodeMapAsObjectValue.bind(
this,
@@ -1089,7 +1095,7 @@ export class Decoder {
remaining,
key,
value,
flags
typeMapping
);
}
@@ -1100,7 +1106,7 @@ export class Decoder {
return object;
}
private _continueDecodeMapAsObjectKey(object, remaining, keyCb, flags, chunk) {
private _continueDecodeMapAsObjectKey(object, remaining, keyCb, typeMapping, chunk) {
const key = keyCb(chunk);
if (typeof key === 'function') {
return this._continueDecodeMapAsObjectKey.bind(
@@ -1108,7 +1114,7 @@ export class Decoder {
object,
remaining,
key,
flags
typeMapping
);
}
@@ -1118,12 +1124,12 @@ export class Decoder {
object,
remaining,
key,
this._decodeNestedType.bind(this, flags),
flags
this._decodeNestedType.bind(this, typeMapping),
typeMapping
);
}
const value = this._decodeNestedType(flags, chunk);
const value = this._decodeNestedType(typeMapping, chunk);
if (typeof value === 'function') {
return this._continueDecodeMapAsObjectValue.bind(
this,
@@ -1131,16 +1137,16 @@ export class Decoder {
remaining,
key,
value,
flags
typeMapping
);
}
object[key] = value;
return this._decodeMapAsObject(object, remaining - 1, flags, chunk);
return this._decodeMapAsObject(object, remaining - 1, typeMapping, chunk);
}
private _continueDecodeMapAsObjectValue(object, remaining, key, valueCb, flags, chunk) {
private _continueDecodeMapAsObjectValue(object, remaining, key, valueCb, typeMapping, chunk) {
const value = valueCb(chunk);
if (typeof value === 'function') {
return this._continueDecodeMapAsObjectValue.bind(
@@ -1149,12 +1155,12 @@ export class Decoder {
remaining,
key,
value,
flags
typeMapping
);
}
object[key] = value;
return this._decodeMapAsObject(object, remaining - 1, flags, chunk);
return this._decodeMapAsObject(object, remaining - 1, typeMapping, chunk);
}
}

View File

@@ -148,45 +148,45 @@ export type ReplyUnion = NullReply | BooleanReply | NumberReply | BigNumberReply
Map<ReplyUnion, ReplyUnion> | Array<ReplyUnion | ReplyUnion>
>;
export type Reply = ReplyWithFlags<ReplyUnion, {}>;
export type Reply = ReplyWithTypeMapping<ReplyUnion, {}>;
export type Flag<T> = ((...args: any) => T) | (new (...args: any) => T);
type RespTypeUnion<T> = T extends RespType<RespTypes, unknown, unknown, infer FLAG_TYPES> ? FLAG_TYPES : never;
export type Flags = {
export type TypeMapping = {
[P in RespTypes]?: Flag<RespTypeUnion<Extract<ReplyUnion, RespType<P, any, any, any>>>>;
};
type MapKey<
T,
FLAGS extends Flags
> = ReplyWithFlags<T, FLAGS & {
TYPE_MAPPING extends TypeMapping
> = ReplyWithTypeMapping<T, TYPE_MAPPING & {
// simple and blob strings as map keys decoded as strings
[RESP_TYPES.SIMPLE_STRING]: StringConstructor;
[RESP_TYPES.BLOB_STRING]: StringConstructor;
}>;
export type ReplyWithFlags<
export type ReplyWithTypeMapping<
REPLY,
FLAGS extends Flags
TYPE_MAPPING extends TypeMapping
> = (
// if REPLY is a type, extract the coresponding type from FLAGS or use the default type
// if REPLY is a type, extract the coresponding type from TYPE_MAPPING or use the default type
REPLY extends RespType<infer RESP_TYPE, infer DEFAULT, infer TYPES, unknown> ?
FLAGS[RESP_TYPE] extends Flag<infer T> ?
ReplyWithFlags<Extract<DEFAULT | TYPES, T>, FLAGS> :
ReplyWithFlags<DEFAULT, FLAGS>
TYPE_MAPPING[RESP_TYPE] extends Flag<infer T> ?
ReplyWithTypeMapping<Extract<DEFAULT | TYPES, T>, TYPE_MAPPING> :
ReplyWithTypeMapping<DEFAULT, TYPE_MAPPING>
: (
// if REPLY is a known generic type, convert its generic arguments
// TODO: tuples?
REPLY extends Array<infer T> ? Array<ReplyWithFlags<T, FLAGS>> :
REPLY extends Set<infer T> ? Set<ReplyWithFlags<T, FLAGS>> :
REPLY extends Map<infer K, infer V> ? Map<MapKey<K, FLAGS>, ReplyWithFlags<V, FLAGS>> :
REPLY extends Array<infer T> ? Array<ReplyWithTypeMapping<T, TYPE_MAPPING>> :
REPLY extends Set<infer T> ? Set<ReplyWithTypeMapping<T, TYPE_MAPPING>> :
REPLY extends Map<infer K, infer V> ? Map<MapKey<K, TYPE_MAPPING>, ReplyWithTypeMapping<V, TYPE_MAPPING>> :
// `Date` & `Buffer` are supersets of `Record`, so they need to be checked first
REPLY extends Date ? REPLY :
REPLY extends Buffer ? REPLY :
REPLY extends Record<PropertyKey, any> ? {
[P in keyof REPLY]: ReplyWithFlags<REPLY[P], FLAGS>;
[P in keyof REPLY]: ReplyWithTypeMapping<REPLY[P], TYPE_MAPPING>;
} :
// otherwise, just return the REPLY as is
REPLY
@@ -333,17 +333,17 @@ export type CommandReply<
export type CommandSignature<
COMMAND extends Command,
RESP extends RespVersions,
FLAGS extends Flags
> = (...args: Parameters<COMMAND['transformArguments']>) => Promise<ReplyWithFlags<CommandReply<COMMAND, RESP>, FLAGS>>;
TYPE_MAPPING extends TypeMapping
> = (...args: Parameters<COMMAND['transformArguments']>) => Promise<ReplyWithTypeMapping<CommandReply<COMMAND, RESP>, TYPE_MAPPING>>;
export type CommandWithPoliciesSignature<
COMMAND extends Command,
RESP extends RespVersions,
FLAGS extends Flags,
TYPE_MAPPING extends TypeMapping,
POLICIES extends CommandPolicies
> = (...args: Parameters<COMMAND['transformArguments']>) => Promise<
ReplyWithPolicy<
ReplyWithFlags<CommandReply<COMMAND, RESP>, FLAGS>,
ReplyWithTypeMapping<CommandReply<COMMAND, RESP>, TYPE_MAPPING>,
MergePolicies<COMMAND, POLICIES>
>
>;

View File

@@ -1,7 +1,7 @@
import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list';
import encodeCommand from '../RESP/encoder';
import { Decoder, PUSH_FLAGS, RESP_TYPES } from '../RESP/decoder';
import { CommandArguments, Flags, ReplyUnion, RespVersions } from '../RESP/types';
import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder';
import { CommandArguments, TypeMapping, ReplyUnion, RespVersions } from '../RESP/types';
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
import { AbortError, ErrorReply } from '../errors';
import { EventEmitter } from 'stream';
@@ -10,7 +10,7 @@ export interface QueueCommandOptions {
chainId?: symbol;
asap?: boolean;
abortSignal?: AbortSignal;
flags?: Flags;
typeMapping?: TypeMapping;
}
export interface CommandWaitingToBeSent extends CommandWaitingForReply {
@@ -26,15 +26,15 @@ interface CommandWaitingForReply {
resolve(reply?: unknown): void;
reject(err: unknown): void;
channelsCounter?: number;
flags?: Flags;
typeMapping?: TypeMapping;
}
export type OnShardedChannelMoved = (channel: string, listeners: ChannelListeners) => void;
const PONG = Buffer.from('pong');
const RESP2_PUSH_FLAGS = {
...PUSH_FLAGS,
const RESP2_PUSH_TYPE_MAPPING = {
...PUSH_TYPE_MAPPING,
[RESP_TYPES.SIMPLE_STRING]: Buffer
};
@@ -102,8 +102,8 @@ export default class RedisCommandsQueue {
}
}
private _getFlags() {
return this._waitingForReply.head!.value.flags ?? {};
private _getTypeMapping() {
return this._waitingForReply.head!.value.typeMapping ?? {};
}
private _initiateResp3Decoder() {
@@ -115,7 +115,7 @@ export default class RedisCommandsQueue {
}
},
getFlags: () => this._getFlags()
getTypeMapping: () => this._getTypeMapping()
});
}
@@ -126,9 +126,9 @@ export default class RedisCommandsQueue {
if (this._onPush(reply)) return;
if (PONG.equals(reply[0] as Buffer)) {
const { resolve, flags } = this._waitingForReply.shift()!,
const { resolve, typeMapping } = this._waitingForReply.shift()!,
buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer;
resolve(flags?.[RESP_TYPES.SIMPLE_STRING] === Buffer ? buffer : buffer.toString());
resolve(typeMapping?.[RESP_TYPES.SIMPLE_STRING] === Buffer ? buffer : buffer.toString());
return;
}
}
@@ -140,11 +140,11 @@ export default class RedisCommandsQueue {
// PubSub is handled in onReply
// @ts-expect-error
onPush: undefined,
getFlags: () => {
getTypeMapping: () => {
// PubSub push is an Array in RESP2
return this._pubSub.isActive ?
RESP2_PUSH_FLAGS :
this._getFlags();
RESP2_PUSH_TYPE_MAPPING :
this._getTypeMapping();
}
});
}
@@ -161,7 +161,7 @@ export default class RedisCommandsQueue {
const value: CommandWaitingToBeSent = {
args,
chainId: options?.chainId,
flags: options?.flags,
typeMapping: options?.typeMapping,
resolve,
reject,
abort: undefined
@@ -243,7 +243,7 @@ export default class RedisCommandsQueue {
this._waitingToBeSent.push({
args: command.args,
channelsCounter: command.channelsCounter,
flags: PUSH_FLAGS,
typeMapping: PUSH_TYPE_MAPPING,
resolve: () => {
command.resolve();
resolve();
@@ -282,7 +282,7 @@ export default class RedisCommandsQueue {
return encoded;
}
#flushWaitingForReply(err: Error): void {
private _flushWaitingForReply(err: Error): void {
while (this._waitingForReply.head) {
this._waitingForReply.shift()!.reject(err);
}
@@ -304,7 +304,7 @@ export default class RedisCommandsQueue {
this.decoder.reset();
this._pubSub.reset();
this.#flushWaitingForReply(err);
this._flushWaitingForReply(err);
if (!this._chainInExecution) return;
@@ -321,7 +321,7 @@ export default class RedisCommandsQueue {
flushAll(err: Error): void {
this.decoder.reset();
this._pubSub.reset();
this.#flushWaitingForReply(err);
this._flushWaitingForReply(err);
while (this._waitingToBeSent.head) {
RedisCommandsQueue._flushWaitingToBeSent(
this._waitingToBeSent.shift()!,

View File

@@ -7,14 +7,14 @@ import { ClientClosedError, ClientOfflineError, DisconnectsClientError, WatchErr
import { URL } from 'url';
import { TcpSocketConnectOpts } from 'net';
import { PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub';
import { Command, CommandArguments, CommandSignature, Flags, CommanderConfig, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types';
import { Command, CommandArguments, CommandSignature, TypeMapping, CommanderConfig, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types';
import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command';
import { RedisMultiQueuedCommand } from '../multi-command';
import HELLO, { HelloOptions } from '../commands/HELLO';
import { ReplyWithFlags, CommandReply } from '../RESP/types';
import { ReplyWithTypeMapping, CommandReply } from '../RESP/types';
import SCAN, { ScanOptions, ScanCommonOptions } from '../commands/SCAN';
import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode';
import { RedisClientPool } from './pool';
// import { RedisClientPool } from './pool';
export interface RedisClientOptions<
M extends RedisModules = RedisModules,
@@ -69,37 +69,37 @@ export interface RedisClientOptions<
type WithCommands<
RESP extends RespVersions,
FLAGS extends Flags
TYPE_MAPPING extends TypeMapping
> = {
[P in keyof typeof COMMANDS]: CommandSignature<(typeof COMMANDS)[P], RESP, FLAGS>;
[P in keyof typeof COMMANDS]: CommandSignature<(typeof COMMANDS)[P], RESP, TYPE_MAPPING>;
};
type WithModules<
M extends RedisModules,
RESP extends RespVersions,
FLAGS extends Flags
TYPE_MAPPING extends TypeMapping
> = {
[P in keyof M]: {
[C in keyof M[P]]: CommandSignature<M[P][C], RESP, FLAGS>;
[C in keyof M[P]]: CommandSignature<M[P][C], RESP, TYPE_MAPPING>;
};
};
type WithFunctions<
F extends RedisFunctions,
RESP extends RespVersions,
FLAGS extends Flags
TYPE_MAPPING extends TypeMapping
> = {
[L in keyof F]: {
[C in keyof F[L]]: CommandSignature<F[L][C], RESP, FLAGS>;
[C in keyof F[L]]: CommandSignature<F[L][C], RESP, TYPE_MAPPING>;
};
};
type WithScripts<
S extends RedisScripts,
RESP extends RespVersions,
FLAGS extends Flags
TYPE_MAPPING extends TypeMapping
> = {
[P in keyof S]: CommandSignature<S[P], RESP, FLAGS>;
[P in keyof S]: CommandSignature<S[P], RESP, TYPE_MAPPING>;
};
export type RedisClientType<
@@ -107,20 +107,20 @@ export type RedisClientType<
F extends RedisFunctions = {},
S extends RedisScripts = {},
RESP extends RespVersions = 2,
FLAGS extends Flags = {}
TYPE_MAPPING extends TypeMapping = {}
> = (
RedisClient<M, F, S, RESP, FLAGS> &
WithCommands<RESP, FLAGS> &
WithModules<M, RESP, FLAGS> &
WithFunctions<F, RESP, FLAGS> &
WithScripts<S, RESP, FLAGS>
RedisClient<M, F, S, RESP, TYPE_MAPPING> &
WithCommands<RESP, TYPE_MAPPING> &
WithModules<M, RESP, TYPE_MAPPING> &
WithFunctions<F, RESP, TYPE_MAPPING> &
WithScripts<S, RESP, TYPE_MAPPING>
);
export interface ClientCommandOptions extends QueueCommandOptions {
// isolated?: boolean;
}
type ProxyClient = RedisClient<{}, {}, {}, RespVersions, Flags> & { commandOptions?: ClientCommandOptions };
type ProxyClient = RedisClient<{}, {}, {}, RespVersions, TypeMapping> & { commandOptions?: ClientCommandOptions };
type NamespaceProxyClient = { self: ProxyClient };
@@ -133,7 +133,7 @@ export default class RedisClient<
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
FLAGS extends Flags
TYPE_MAPPING extends TypeMapping
> extends EventEmitter {
private static _createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);
@@ -396,7 +396,15 @@ export default class RedisClient<
};
return new RedisSocket(socketInitiator, this._options?.socket)
.on('data', chunk => this._queue.decoder.write(chunk))
.on('data', chunk => {
try {
this._queue.decoder.write(chunk);
} catch (err) {
this._queue.decoder.reset();
this.emit('error', err);
}
})
.on('error', err => {
this.emit('error', err);
if (this._socket.isOpen && !this._options?.disableOfflineQueue) {
@@ -440,7 +448,7 @@ export default class RedisClient<
F,
S,
RESP,
T['flags'] extends Flags ? T['flags'] : {}
T['typeMapping'] extends TypeMapping ? T['typeMapping'] : {}
>;
}
@@ -459,17 +467,20 @@ export default class RedisClient<
F,
S,
RESP,
K extends 'flags' ? V extends Flags ? V : {} : FLAGS
K extends 'typeMapping' ? V extends TypeMapping ? V : {} : TYPE_MAPPING
>;
}
/**
* Override the `flags` command option
* Override the `typeMapping` command option
*/
withFlags<FLAGS extends Flags>(flags: FLAGS) {
return this._commandOptionsProxy('flags', flags);
withTypeMapping<TYPE_MAPPING extends TypeMapping>(typeMapping: TYPE_MAPPING) {
return this._commandOptionsProxy('typeMapping', typeMapping);
}
/**
* Override the `abortSignal` command option
*/
withAbortSignal(abortSignal: AbortSignal) {
return this._commandOptionsProxy('abortSignal', abortSignal);
}
@@ -482,7 +493,7 @@ export default class RedisClient<
}
/**
* Get the "legacy" (v3/callback) interface
* Create the "legacy" (v3/callback) interface
*/
legacy(): RedisLegacyClientType {
return new RedisLegacyClient(
@@ -493,11 +504,11 @@ export default class RedisClient<
/**
* Create `RedisClientPool` using this client as a prototype
*/
pool() {
return RedisClientPool.fromClient(
this as unknown as RedisClientType<M, F, S, RESP>
);
}
// pool() {
// return RedisClientPool.fromClient(
// this as unknown as RedisClientType<M, F, S, RESP>
// );
// }
duplicate(overrides?: Partial<RedisClientOptions<M, F, S, RESP>>) {
return new (Object.getPrototypeOf(this).constructor)({
@@ -679,12 +690,12 @@ export default class RedisClient<
private _addMultiCommands(
commands: Array<RedisMultiQueuedCommand>,
chainId?: symbol,
flags?: Flags
typeMapping?: TypeMapping
) {
return Promise.all(
commands.map(({ args }) => this._queue.addCommand(args, {
chainId,
flags
typeMapping
}))
);
}
@@ -699,7 +710,7 @@ export default class RedisClient<
const promise = Promise.all(
commands.map(({ args }) => this._queue.addCommand(args, {
flags: (this as ProxyClient).commandOptions?.flags
typeMapping: (this as ProxyClient).commandOptions?.typeMapping
}))
);
this._tick();
@@ -710,24 +721,24 @@ export default class RedisClient<
* @internal
*/
async executeMulti(
commands: Array<RedisMultiQueuedCommand>,
commands: Array<RedisMultiQueuedCommand>,
selectedDB?: number
) {
if (!this._socket.isOpen) {
return Promise.reject(new ClientClosedError());
}
if (!this._socket.isOpen) {
return Promise.reject(new ClientClosedError());
}
const flags = (this as ProxyClient).commandOptions?.flags,
const typeMapping = (this as ProxyClient).commandOptions?.typeMapping,
chainId = Symbol('MULTI Chain'),
promises = [
this._queue.addCommand(['MULTI'], { chainId }),
this._queue.addCommand(['MULTI'], { chainId }),
];
for (const { args } of commands) {
promises.push(
this._queue.addCommand(args, {
chainId,
flags
typeMapping
})
);
}
@@ -736,7 +747,7 @@ export default class RedisClient<
this._queue.addCommand(['EXEC'], { chainId })
);
this._tick();
this._tick();
const results = await Promise.all(promises),
execResult = results[results.length - 1];
@@ -745,23 +756,23 @@ export default class RedisClient<
throw new WatchError();
}
if (selectedDB !== undefined) {
this._selectedDB = selectedDB;
}
if (selectedDB !== undefined) {
this._selectedDB = selectedDB;
}
return execResult as Array<unknown>;
}
}
MULTI(): RedisClientMultiCommandType<[], M, F, S, RESP, FLAGS> {
MULTI(): RedisClientMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING> {
return new (this as any).Multi(this);
}
multi = this.MULTI;
async* scanIterator(
this: RedisClientType<M, F, S, RESP, FLAGS>,
this: RedisClientType<M, F, S, RESP, TYPE_MAPPING>,
options?: ScanOptions & ScanIteratorOptions
): AsyncIterable<ReplyWithFlags<CommandReply<typeof SCAN, RESP>['keys'], FLAGS>> {
): AsyncIterable<ReplyWithTypeMapping<CommandReply<typeof SCAN, RESP>['keys'], TYPE_MAPPING>> {
let cursor = options?.cursor ?? 0;
do {
const reply = await this.scan(cursor, options);
@@ -771,7 +782,7 @@ export default class RedisClient<
}
async* hScanIterator(
this: RedisClientType<M, F, S, RESP, FLAGS>,
this: RedisClientType<M, F, S, RESP, TYPE_MAPPING>,
key: RedisArgument,
options?: ScanCommonOptions & ScanIteratorOptions
) {
@@ -784,7 +795,7 @@ export default class RedisClient<
}
async* sScanIterator(
this: RedisClientType<M, F, S, RESP, FLAGS>,
this: RedisClientType<M, F, S, RESP, TYPE_MAPPING>,
key: RedisArgument,
options?: ScanCommonOptions & ScanIteratorOptions
) {
@@ -797,7 +808,7 @@ export default class RedisClient<
}
async* zScanIterator(
this: RedisClientType<M, F, S, RESP, FLAGS>,
this: RedisClientType<M, F, S, RESP, TYPE_MAPPING>,
key: RedisArgument,
options?: ScanCommonOptions & ScanIteratorOptions
) {
@@ -843,7 +854,7 @@ export default class RedisClient<
const maybeClose = () => {
if (!this._queue.isEmpty()) return;
this._socket.off('data', maybeClose);
this._socket.destroySocket();
resolve();

View File

@@ -1,4 +1,4 @@
import { RedisModules, RedisFunctions, RedisScripts, RespVersions, Flags, Command, CommandArguments, ReplyUnion } from '../RESP/types';
import { RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping, Command, CommandArguments, ReplyUnion } from '../RESP/types';
import { RedisClientType } from '.';
import { getTransformReply } from '../commander';
import { ErrorReply } from '../errors';

View File

@@ -1,6 +1,6 @@
import COMMANDS from '../commands';
import RedisMultiCommand, { MULTI_REPLY, MultiReply, MultiReplyType } from '../multi-command';
import { ReplyWithFlags, CommandReply, Command, CommandArguments, CommanderConfig, RedisFunctions, RedisModules, RedisScripts, RespVersions, TransformReply, RedisScript, RedisFunction, Flags, ReplyUnion } from '../RESP/types';
import { ReplyWithTypeMapping, CommandReply, Command, CommandArguments, CommanderConfig, RedisFunctions, RedisModules, RedisScripts, RespVersions, TransformReply, RedisScript, RedisFunction, TypeMapping, ReplyUnion } from '../RESP/types';
import { attachConfig, functionArgumentsPrefix, getTransformReply } from '../commander';
import { RedisClientType } from '.';
@@ -11,14 +11,14 @@ type CommandSignature<
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
FLAGS extends Flags
TYPE_MAPPING extends TypeMapping
> = (...args: Parameters<C['transformArguments']>) => RedisClientMultiCommandType<
[...REPLIES, ReplyWithFlags<CommandReply<C, RESP>, FLAGS>],
[...REPLIES, ReplyWithTypeMapping<CommandReply<C, RESP>, TYPE_MAPPING>],
M,
F,
S,
RESP,
FLAGS
TYPE_MAPPING
>;
type WithCommands<
@@ -27,9 +27,9 @@ type WithCommands<
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
FLAGS extends Flags
TYPE_MAPPING extends TypeMapping
> = {
[P in keyof typeof COMMANDS]: CommandSignature<REPLIES, (typeof COMMANDS)[P], M, F, S, RESP, FLAGS>;
[P in keyof typeof COMMANDS]: CommandSignature<REPLIES, (typeof COMMANDS)[P], M, F, S, RESP, TYPE_MAPPING>;
};
type WithModules<
@@ -38,10 +38,10 @@ type WithModules<
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
FLAGS extends Flags
TYPE_MAPPING extends TypeMapping
> = {
[P in keyof M]: {
[C in keyof M[P]]: CommandSignature<REPLIES, M[P][C], M, F, S, RESP, FLAGS>;
[C in keyof M[P]]: CommandSignature<REPLIES, M[P][C], M, F, S, RESP, TYPE_MAPPING>;
};
};
@@ -51,10 +51,10 @@ type WithFunctions<
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
FLAGS extends Flags
TYPE_MAPPING extends TypeMapping
> = {
[L in keyof F]: {
[C in keyof F[L]]: CommandSignature<REPLIES, F[L][C], M, F, S, RESP, FLAGS>;
[C in keyof F[L]]: CommandSignature<REPLIES, F[L][C], M, F, S, RESP, TYPE_MAPPING>;
};
};
@@ -64,9 +64,9 @@ type WithScripts<
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
FLAGS extends Flags
TYPE_MAPPING extends TypeMapping
> = {
[P in keyof S]: CommandSignature<REPLIES, S[P], M, F, S, RESP, FLAGS>;
[P in keyof S]: CommandSignature<REPLIES, S[P], M, F, S, RESP, TYPE_MAPPING>;
};
export type RedisClientMultiCommandType<
@@ -75,13 +75,13 @@ export type RedisClientMultiCommandType<
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
FLAGS extends Flags
TYPE_MAPPING extends TypeMapping
> = (
RedisClientMultiCommand<REPLIES> &
WithCommands<REPLIES, M, F, S, RESP, FLAGS> &
WithModules<REPLIES, M, F, S, RESP, FLAGS> &
WithFunctions<REPLIES, M, F, S, RESP, FLAGS> &
WithScripts<REPLIES, M, F, S, RESP, FLAGS>
WithCommands<REPLIES, M, F, S, RESP, TYPE_MAPPING> &
WithModules<REPLIES, M, F, S, RESP, TYPE_MAPPING> &
WithFunctions<REPLIES, M, F, S, RESP, TYPE_MAPPING> &
WithScripts<REPLIES, M, F, S, RESP, TYPE_MAPPING>
);
export default class RedisClientMultiCommand<REPLIES = []> {

View File

@@ -46,7 +46,7 @@ interface CreateSocketReturn<T> {
export type RedisSocketInitiator = () => Promise<void>;
export default class RedisSocket extends EventEmitter {
static #initiateOptions(options?: RedisSocketOptions): RedisSocketOptions {
private static _initiateOptions(options?: RedisSocketOptions): RedisSocketOptions {
options ??= {};
if (!(options as net.IpcSocketConnectOpts).path) {
(options as net.TcpSocketConnectOpts).port ??= 6379;
@@ -60,7 +60,7 @@ export default class RedisSocket extends EventEmitter {
return options;
}
static #isTlsSocket(options: RedisSocketOptions): options is RedisTlsSocketOptions {
private static _isTlsSocket(options: RedisSocketOptions): options is RedisTlsSocketOptions {
return (options as RedisTlsSocketOptions).tls === true;
}
@@ -96,7 +96,7 @@ export default class RedisSocket extends EventEmitter {
super();
this._initiator = initiator;
this._options = RedisSocket.#initiateOptions(options);
this._options = RedisSocket._initiateOptions(options);
}
private _reconnectStrategy(retries: number, cause: Error) {
@@ -176,7 +176,7 @@ export default class RedisSocket extends EventEmitter {
private _createSocket(): Promise<net.Socket | tls.TLSSocket> {
return new Promise((resolve, reject) => {
const { connectEvent, socket } = RedisSocket.#isTlsSocket(this._options) ?
const { connectEvent, socket } = RedisSocket._isTlsSocket(this._options) ?
this._createTlsSocket() :
this._createNetSocket();

View File

@@ -1,6 +1,6 @@
import COMMANDS from '../commands';
import RedisMultiCommand, { MULTI_REPLY, MultiReply, MultiReplyType } from '../multi-command';
import { ReplyWithFlags, CommandReply, Command, CommandArguments, CommanderConfig, RedisFunctions, RedisModules, RedisScripts, RespVersions, TransformReply, RedisScript, RedisFunction, Flags, ReplyUnion, RedisArgument } from '../RESP/types';
import { ReplyWithTypeMapping, CommandReply, Command, CommandArguments, CommanderConfig, RedisFunctions, RedisModules, RedisScripts, RespVersions, TransformReply, RedisScript, RedisFunction, TypeMapping, ReplyUnion, RedisArgument } from '../RESP/types';
import { attachConfig, functionArgumentsPrefix, getTransformReply } from '../commander';
import RedisCluster, { RedisClusterType } from '.';
@@ -11,14 +11,14 @@ type CommandSignature<
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
FLAGS extends Flags
TYPE_MAPPING extends TypeMapping
> = (...args: Parameters<C['transformArguments']>) => RedisClusterMultiCommandType<
[...REPLIES, ReplyWithFlags<CommandReply<C, RESP>, FLAGS>],
[...REPLIES, ReplyWithTypeMapping<CommandReply<C, RESP>, TYPE_MAPPING>],
M,
F,
S,
RESP,
FLAGS
TYPE_MAPPING
>;
type WithCommands<
@@ -27,9 +27,9 @@ type WithCommands<
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
FLAGS extends Flags
TYPE_MAPPING extends TypeMapping
> = {
[P in keyof typeof COMMANDS]: CommandSignature<REPLIES, (typeof COMMANDS)[P], M, F, S, RESP, FLAGS>;
[P in keyof typeof COMMANDS]: CommandSignature<REPLIES, (typeof COMMANDS)[P], M, F, S, RESP, TYPE_MAPPING>;
};
type WithModules<
@@ -38,10 +38,10 @@ type WithModules<
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
FLAGS extends Flags
TYPE_MAPPING extends TypeMapping
> = {
[P in keyof M]: {
[C in keyof M[P]]: CommandSignature<REPLIES, M[P][C], M, F, S, RESP, FLAGS>;
[C in keyof M[P]]: CommandSignature<REPLIES, M[P][C], M, F, S, RESP, TYPE_MAPPING>;
};
};
@@ -51,10 +51,10 @@ type WithFunctions<
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
FLAGS extends Flags
TYPE_MAPPING extends TypeMapping
> = {
[L in keyof F]: {
[C in keyof F[L]]: CommandSignature<REPLIES, F[L][C], M, F, S, RESP, FLAGS>;
[C in keyof F[L]]: CommandSignature<REPLIES, F[L][C], M, F, S, RESP, TYPE_MAPPING>;
};
};
@@ -64,9 +64,9 @@ type WithScripts<
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
FLAGS extends Flags
TYPE_MAPPING extends TypeMapping
> = {
[P in keyof S]: CommandSignature<REPLIES, S[P], M, F, S, RESP, FLAGS>;
[P in keyof S]: CommandSignature<REPLIES, S[P], M, F, S, RESP, TYPE_MAPPING>;
};
export type RedisClusterMultiCommandType<
@@ -75,13 +75,13 @@ export type RedisClusterMultiCommandType<
F extends RedisFunctions,
S extends RedisScripts,
RESP extends RespVersions,
FLAGS extends Flags
TYPE_MAPPING extends TypeMapping
> = (
RedisClusterMultiCommand<REPLIES> &
WithCommands<REPLIES, M, F, S, RESP, FLAGS> &
WithModules<REPLIES, M, F, S, RESP, FLAGS> &
WithFunctions<REPLIES, M, F, S, RESP, FLAGS> &
WithScripts<REPLIES, M, F, S, RESP, FLAGS>
WithCommands<REPLIES, M, F, S, RESP, TYPE_MAPPING> &
WithModules<REPLIES, M, F, S, RESP, TYPE_MAPPING> &
WithFunctions<REPLIES, M, F, S, RESP, TYPE_MAPPING> &
WithScripts<REPLIES, M, F, S, RESP, TYPE_MAPPING>
);
export default class RedisClusterMultiCommand<REPLIES = []> {

View File

@@ -1,96 +1,77 @@
import { strict as assert } from 'assert';
import RedisMultiCommand from './multi-command';
import { WatchError } from './errors';
import { SQUARE_SCRIPT } from './client/index.spec';
describe('Multi Command', () => {
it('generateChainId', () => {
assert.equal(
typeof RedisMultiCommand.generateChainId(),
'symbol'
);
it('addCommand', () => {
const multi = new RedisMultiCommand();
multi.addCommand(['PING']);
assert.deepEqual(
multi.queue[0].args,
['PING']
);
});
describe('addScript', () => {
const multi = new RedisMultiCommand();
it('should use EVAL', () => {
multi.addScript(SQUARE_SCRIPT, ['1']);
assert.deepEqual(
Array.from(multi.queue.at(-1).args),
['EVAL', SQUARE_SCRIPT.SCRIPT, '0', '1']
);
});
it('addCommand', () => {
const multi = new RedisMultiCommand();
multi.addCommand(['PING']);
assert.deepEqual(
multi.queue[0].args,
['PING']
);
it('should use EVALSHA', () => {
multi.addScript(SQUARE_SCRIPT, ['2']);
assert.deepEqual(
Array.from(multi.queue.at(-1).args),
['EVALSHA', SQUARE_SCRIPT.SHA1, '0', '2']
);
});
it('addScript', () => {
const multi = new RedisMultiCommand();
it('without NUMBER_OF_KEYS', () => {
multi.addScript({
...SQUARE_SCRIPT,
NUMBER_OF_KEYS: undefined
}, ['2']);
assert.deepEqual(
Array.from(multi.queue.at(-1).args),
['EVALSHA', SQUARE_SCRIPT.SHA1, '2']
);
});
});
multi.addScript(SQUARE_SCRIPT, ['1']);
assert.equal(
multi.scriptsInUse.has(SQUARE_SCRIPT.SHA1),
true
);
assert.deepEqual(
multi.queue[0].args,
['EVAL', SQUARE_SCRIPT.SCRIPT, '0', '1']
);
multi.addScript(SQUARE_SCRIPT, ['2']);
assert.equal(
multi.scriptsInUse.has(SQUARE_SCRIPT.SHA1),
true
);
assert.deepEqual(
multi.queue[1].args,
['EVALSHA', SQUARE_SCRIPT.SHA1, '0', '2']
);
describe('exec', () => {
it('without commands', () => {
assert.deepEqual(
new RedisMultiCommand().queue,
[]
);
});
describe('exec', () => {
it('without commands', () => {
assert.deepEqual(
new RedisMultiCommand().queue,
[]
);
});
it('with commands', () => {
const multi = new RedisMultiCommand();
multi.addCommand(['PING']);
it('with commands', () => {
const multi = new RedisMultiCommand();
multi.addCommand(['PING']);
assert.deepEqual(
multi.queue,
[{
args: ['PING'],
transformReply: undefined
}]
);
});
assert.deepEqual(
multi.queue,
[{
args: ['PING'],
transformReply: undefined
}]
);
});
});
describe('handleExecReplies', () => {
it('WatchError', () => {
assert.throws(
() => new RedisMultiCommand().handleExecReplies([null]),
WatchError
);
});
it('with replies', () => {
const multi = new RedisMultiCommand();
multi.addCommand(['PING']);
assert.deepEqual(
multi.handleExecReplies(['OK', 'QUEUED', ['PONG']]),
['PONG']
);
});
});
it('transformReplies', () => {
const multi = new RedisMultiCommand();
multi.addCommand(['PING'], (reply: string) => reply.substring(0, 2));
assert.deepEqual(
multi.transformReplies(['PONG']),
['PO']
);
});
it('transformReplies', () => {
const multi = new RedisMultiCommand();
multi.addCommand(['PING'], (reply: string) => reply.substring(0, 2));
assert.deepEqual(
multi.transformReplies(['PONG']),
['PO']
);
});
});

View File

@@ -29,6 +29,7 @@ export default class RedisMultiCommand {
addScript(script: RedisScript, args: CommandArguments, transformReply?: TransformReply) {
const redisArgs: CommandArguments = [];
redisArgs.preserve = args.preserve;
if (this.scriptsInUse.has(script.SHA1)) {
redisArgs.push('EVALSHA', script.SHA1);
} else {
@@ -41,7 +42,6 @@ export default class RedisMultiCommand {
}
redisArgs.push(...args);
redisArgs.preserve = args.preserve;
this.addCommand(redisArgs, transformReply);
}