You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-07 13:22:56 +03:00
legacy mode and isolation pool
This commit is contained in:
@@ -11,9 +11,10 @@ import { Command, CommandArguments, CommandSignature, Flags, CommanderConfig, Re
|
|||||||
import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command';
|
import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command';
|
||||||
import { RedisMultiQueuedCommand } from '../multi-command';
|
import { RedisMultiQueuedCommand } from '../multi-command';
|
||||||
import HELLO, { HelloOptions } from '../commands/HELLO';
|
import HELLO, { HelloOptions } from '../commands/HELLO';
|
||||||
import { Pool, Options as PoolOptions, createPool } from 'generic-pool';
|
|
||||||
import { ReplyWithFlags, CommandReply } from '../RESP/types';
|
import { ReplyWithFlags, CommandReply } from '../RESP/types';
|
||||||
import SCAN, { ScanOptions, ScanCommonOptions } from '../commands/SCAN';
|
import SCAN, { ScanOptions, ScanCommonOptions } from '../commands/SCAN';
|
||||||
|
import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode';
|
||||||
|
import { RedisClientPool } from './pool';
|
||||||
|
|
||||||
export interface RedisClientOptions<
|
export interface RedisClientOptions<
|
||||||
M extends RedisModules = RedisModules,
|
M extends RedisModules = RedisModules,
|
||||||
@@ -59,14 +60,6 @@ export interface RedisClientOptions<
|
|||||||
* Connect in [`READONLY`](https://redis.io/commands/readonly) mode
|
* Connect in [`READONLY`](https://redis.io/commands/readonly) mode
|
||||||
*/
|
*/
|
||||||
readonly?: boolean;
|
readonly?: boolean;
|
||||||
/**
|
|
||||||
* TODO
|
|
||||||
*/
|
|
||||||
legacyMode?: boolean;
|
|
||||||
/**
|
|
||||||
* TODO
|
|
||||||
*/
|
|
||||||
isolationPoolOptions?: PoolOptions;
|
|
||||||
/**
|
/**
|
||||||
* Send `PING` command at interval (in ms).
|
* Send `PING` command at interval (in ms).
|
||||||
* Useful with Redis deployments that do not use TCP Keep-Alive.
|
* Useful with Redis deployments that do not use TCP Keep-Alive.
|
||||||
@@ -79,7 +72,7 @@ type WithCommands<
|
|||||||
FLAGS extends Flags
|
FLAGS extends Flags
|
||||||
> = {
|
> = {
|
||||||
[P in keyof typeof COMMANDS]: CommandSignature<(typeof COMMANDS)[P], RESP, FLAGS>;
|
[P in keyof typeof COMMANDS]: CommandSignature<(typeof COMMANDS)[P], RESP, FLAGS>;
|
||||||
};
|
};
|
||||||
|
|
||||||
type WithModules<
|
type WithModules<
|
||||||
M extends RedisModules,
|
M extends RedisModules,
|
||||||
@@ -89,7 +82,7 @@ type WithModules<
|
|||||||
[P in keyof M]: {
|
[P in keyof M]: {
|
||||||
[C in keyof M[P]]: CommandSignature<M[P][C], RESP, FLAGS>;
|
[C in keyof M[P]]: CommandSignature<M[P][C], RESP, FLAGS>;
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
type WithFunctions<
|
type WithFunctions<
|
||||||
F extends RedisFunctions,
|
F extends RedisFunctions,
|
||||||
@@ -99,7 +92,7 @@ type WithFunctions<
|
|||||||
[L in keyof F]: {
|
[L in keyof F]: {
|
||||||
[C in keyof F[L]]: CommandSignature<F[L][C], RESP, FLAGS>;
|
[C in keyof F[L]]: CommandSignature<F[L][C], RESP, FLAGS>;
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
type WithScripts<
|
type WithScripts<
|
||||||
S extends RedisScripts,
|
S extends RedisScripts,
|
||||||
@@ -107,7 +100,7 @@ type WithScripts<
|
|||||||
FLAGS extends Flags
|
FLAGS extends Flags
|
||||||
> = {
|
> = {
|
||||||
[P in keyof S]: CommandSignature<S[P], RESP, FLAGS>;
|
[P in keyof S]: CommandSignature<S[P], RESP, FLAGS>;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type RedisClientType<
|
export type RedisClientType<
|
||||||
M extends RedisModules = {},
|
M extends RedisModules = {},
|
||||||
@@ -121,14 +114,12 @@ export type RedisClientType<
|
|||||||
WithModules<M, RESP, FLAGS> &
|
WithModules<M, RESP, FLAGS> &
|
||||||
WithFunctions<F, RESP, FLAGS> &
|
WithFunctions<F, RESP, FLAGS> &
|
||||||
WithScripts<S, RESP, FLAGS>
|
WithScripts<S, RESP, FLAGS>
|
||||||
);
|
);
|
||||||
|
|
||||||
export interface ClientCommandOptions extends QueueCommandOptions {
|
export interface ClientCommandOptions extends QueueCommandOptions {
|
||||||
isolated?: boolean;
|
// isolated?: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
// type ClientLegacyCallback = (err: Error | null, reply?: RedisCommandRawReply) => void;
|
|
||||||
|
|
||||||
type ProxyClient = RedisClient<{}, {}, {}, RespVersions, Flags> & { commandOptions?: ClientCommandOptions };
|
type ProxyClient = RedisClient<{}, {}, {}, RespVersions, Flags> & { commandOptions?: ClientCommandOptions };
|
||||||
|
|
||||||
type NamespaceProxyClient = { self: ProxyClient };
|
type NamespaceProxyClient = { self: ProxyClient };
|
||||||
@@ -148,7 +139,7 @@ export default class RedisClient<
|
|||||||
const transformReply = getTransformReply(command, resp);
|
const transformReply = getTransformReply(command, resp);
|
||||||
return async function (this: ProxyClient) {
|
return async function (this: ProxyClient) {
|
||||||
const args = command.transformArguments.apply(undefined, arguments as any),
|
const args = command.transformArguments.apply(undefined, arguments as any),
|
||||||
reply = await this._sendCommand(args, this.commandOptions);
|
reply = await this.sendCommand(args, this.commandOptions);
|
||||||
return transformReply ?
|
return transformReply ?
|
||||||
transformReply(reply, args.preserve) :
|
transformReply(reply, args.preserve) :
|
||||||
reply;
|
reply;
|
||||||
@@ -159,7 +150,7 @@ export default class RedisClient<
|
|||||||
const transformReply = getTransformReply(command, resp);
|
const transformReply = getTransformReply(command, resp);
|
||||||
return async function (this: NamespaceProxyClient) {
|
return async function (this: NamespaceProxyClient) {
|
||||||
const args = command.transformArguments.apply(undefined, arguments as any),
|
const args = command.transformArguments.apply(undefined, arguments as any),
|
||||||
reply = await this.self._sendCommand(args, this.self.commandOptions);
|
reply = await this.self.sendCommand(args, this.self.commandOptions);
|
||||||
return transformReply ?
|
return transformReply ?
|
||||||
transformReply(reply, args.preserve) :
|
transformReply(reply, args.preserve) :
|
||||||
reply;
|
reply;
|
||||||
@@ -171,7 +162,7 @@ export default class RedisClient<
|
|||||||
transformReply = getTransformReply(fn, resp);
|
transformReply = getTransformReply(fn, resp);
|
||||||
return async function (this: NamespaceProxyClient) {
|
return async function (this: NamespaceProxyClient) {
|
||||||
const fnArgs = fn.transformArguments.apply(undefined, arguments as any),
|
const fnArgs = fn.transformArguments.apply(undefined, arguments as any),
|
||||||
reply = await this.self._sendCommand(
|
reply = await this.self.sendCommand(
|
||||||
prefix.concat(fnArgs),
|
prefix.concat(fnArgs),
|
||||||
this.self.commandOptions
|
this.self.commandOptions
|
||||||
);
|
);
|
||||||
@@ -187,12 +178,12 @@ export default class RedisClient<
|
|||||||
return async function (this: ProxyClient) {
|
return async function (this: ProxyClient) {
|
||||||
const scriptArgs = script.transformArguments.apply(undefined, arguments as any),
|
const scriptArgs = script.transformArguments.apply(undefined, arguments as any),
|
||||||
args = prefix.concat(scriptArgs),
|
args = prefix.concat(scriptArgs),
|
||||||
reply = await this._sendCommand(args, this.commandOptions).catch((err: unknown) => {
|
reply = await this.sendCommand(args, this.commandOptions).catch((err: unknown) => {
|
||||||
if (!(err as Error)?.message?.startsWith?.('NOSCRIPT')) throw err;
|
if (!(err as Error)?.message?.startsWith?.('NOSCRIPT')) throw err;
|
||||||
|
|
||||||
args[0] = 'EVAL';
|
args[0] = 'EVAL';
|
||||||
args[1] = script.SCRIPT;
|
args[1] = script.SCRIPT;
|
||||||
return this._sendCommand(args, this.commandOptions);
|
return this.sendCommand(args, this.commandOptions);
|
||||||
});
|
});
|
||||||
return transformReply ?
|
return transformReply ?
|
||||||
transformReply(reply, scriptArgs.preserve) :
|
transformReply(reply, scriptArgs.preserve) :
|
||||||
@@ -230,7 +221,7 @@ export default class RedisClient<
|
|||||||
F extends RedisFunctions = {},
|
F extends RedisFunctions = {},
|
||||||
S extends RedisScripts = {},
|
S extends RedisScripts = {},
|
||||||
RESP extends RespVersions = 2
|
RESP extends RespVersions = 2
|
||||||
>(options?: RedisClientOptions<M, F, S, RESP>) {
|
>(this: void, options?: RedisClientOptions<M, F, S, RESP>) {
|
||||||
return RedisClient.factory(options)(options);
|
return RedisClient.factory(options)(options);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -278,8 +269,6 @@ export default class RedisClient<
|
|||||||
private readonly _options?: RedisClientOptions<M, F, S, RESP>;
|
private readonly _options?: RedisClientOptions<M, F, S, RESP>;
|
||||||
private readonly _socket: RedisSocket;
|
private readonly _socket: RedisSocket;
|
||||||
private readonly _queue: RedisCommandsQueue;
|
private readonly _queue: RedisCommandsQueue;
|
||||||
private _isolationPool?: Pool<RedisClientType<M, F, S, RESP, FLAGS>>;
|
|
||||||
// readonly #v4: Record<string, any> = {};
|
|
||||||
private _selectedDB = 0;
|
private _selectedDB = 0;
|
||||||
|
|
||||||
get options(): RedisClientOptions<M, F, S, RESP> | undefined {
|
get options(): RedisClientOptions<M, F, S, RESP> | undefined {
|
||||||
@@ -298,20 +287,11 @@ export default class RedisClient<
|
|||||||
return this._queue.isPubSubActive;
|
return this._queue.isPubSubActive;
|
||||||
}
|
}
|
||||||
|
|
||||||
// get v4(): Record<string, any> {
|
|
||||||
// if (!this.client.#options?.legacyMode) {
|
|
||||||
// throw new Error('the client is not in "legacy mode"');
|
|
||||||
// }
|
|
||||||
|
|
||||||
// return this.client.#v4;
|
|
||||||
// }
|
|
||||||
|
|
||||||
constructor(options?: RedisClientOptions<M, F, S, RESP>) {
|
constructor(options?: RedisClientOptions<M, F, S, RESP>) {
|
||||||
super();
|
super();
|
||||||
this._options = this._initiateOptions(options);
|
this._options = this._initiateOptions(options);
|
||||||
this._queue = this._initiateQueue();
|
this._queue = this._initiateQueue();
|
||||||
this._socket = this._initiateSocket();
|
this._socket = this._initiateSocket();
|
||||||
// this.#legacyMode();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private _initiateOptions(options?: RedisClientOptions<M, F, S, RESP>): RedisClientOptions<M, F, S, RESP> | undefined {
|
private _initiateOptions(options?: RedisClientOptions<M, F, S, RESP>): RedisClientOptions<M, F, S, RESP> | undefined {
|
||||||
@@ -353,12 +333,12 @@ export default class RedisClient<
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (this._options?.readonly) {
|
if (this._options?.readonly) {
|
||||||
// promises.push(
|
promises.push(
|
||||||
// this.#queue.addCommand(
|
this._queue.addCommand(
|
||||||
// COMMANDS.READONLY.transformArguments(),
|
COMMANDS.READONLY.transformArguments(),
|
||||||
// { asap: true }
|
{ asap: true }
|
||||||
// )
|
)
|
||||||
// );
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this._options?.RESP) {
|
if (this._options?.RESP) {
|
||||||
@@ -383,24 +363,24 @@ export default class RedisClient<
|
|||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
if (this._options?.name) {
|
if (this._options?.name) {
|
||||||
// promises.push(
|
promises.push(
|
||||||
// this.#queue.addCommand(
|
this._queue.addCommand(
|
||||||
// COMMANDS.CLIENT_SETNAME.transformArguments(this.#options.name),
|
COMMANDS.CLIENT_SETNAME.transformArguments(this._options.name),
|
||||||
// { asap: true }
|
{ asap: true }
|
||||||
// )
|
)
|
||||||
// );
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this._options?.username || this._options?.password) {
|
if (this._options?.username || this._options?.password) {
|
||||||
// promises.push(
|
promises.push(
|
||||||
// this.#queue.addCommand(
|
this._queue.addCommand(
|
||||||
// COMMANDS.AUTH.transformArguments({
|
COMMANDS.AUTH.transformArguments({
|
||||||
// username: this.#options.username,
|
username: this._options.username,
|
||||||
// password: this.#options.password ?? ''
|
password: this._options.password ?? ''
|
||||||
// }),
|
}),
|
||||||
// { asap: true }
|
{ asap: true }
|
||||||
// )
|
)
|
||||||
// );
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -436,66 +416,6 @@ export default class RedisClient<
|
|||||||
.on('end', () => this.emit('end'));
|
.on('end', () => this.emit('end'));
|
||||||
}
|
}
|
||||||
|
|
||||||
// #legacyMode(): void {
|
|
||||||
// if (!this.#options?.legacyMode) return;
|
|
||||||
|
|
||||||
// (this as any).#v4.sendCommand = this.#sendCommand.bind(this);
|
|
||||||
// (this as any).sendCommand = (...args: Array<any>): void => {
|
|
||||||
// const result = this.#legacySendCommand(...args);
|
|
||||||
// if (result) {
|
|
||||||
// result.promise
|
|
||||||
// .then(reply => result.callback(null, reply))
|
|
||||||
// .catch(err => result.callback(err));
|
|
||||||
// }
|
|
||||||
// };
|
|
||||||
|
|
||||||
// for (const [name, command] of Object.entries(COMMANDS)) {
|
|
||||||
// this.#defineLegacyCommand(name, command);
|
|
||||||
// (this as any)[name.toLowerCase()] ??= (this as any)[name];
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // hard coded commands
|
|
||||||
// this.#defineLegacyCommand('SELECT');
|
|
||||||
// this.#defineLegacyCommand('select');
|
|
||||||
// this.#defineLegacyCommand('SUBSCRIBE');
|
|
||||||
// this.#defineLegacyCommand('subscribe');
|
|
||||||
// this.#defineLegacyCommand('PSUBSCRIBE');
|
|
||||||
// this.#defineLegacyCommand('pSubscribe');
|
|
||||||
// this.#defineLegacyCommand('UNSUBSCRIBE');
|
|
||||||
// this.#defineLegacyCommand('unsubscribe');
|
|
||||||
// this.#defineLegacyCommand('PUNSUBSCRIBE');
|
|
||||||
// this.#defineLegacyCommand('pUnsubscribe');
|
|
||||||
// this.#defineLegacyCommand('QUIT');
|
|
||||||
// this.#defineLegacyCommand('quit');
|
|
||||||
// }
|
|
||||||
|
|
||||||
// #legacySendCommand(...args: Array<any>) {
|
|
||||||
// const callback = typeof args[args.length - 1] === 'function' ?
|
|
||||||
// args.pop() as ClientLegacyCallback :
|
|
||||||
// undefined;
|
|
||||||
|
|
||||||
// const promise = this.#sendCommand(transformLegacyCommandArguments(args));
|
|
||||||
// if (callback) return {
|
|
||||||
// promise,
|
|
||||||
// callback
|
|
||||||
// };
|
|
||||||
// promise.catch(err => this.emit('error', err));
|
|
||||||
// }
|
|
||||||
|
|
||||||
// #defineLegacyCommand(name: string, command?: RedisCommand): void {
|
|
||||||
// this.#v4[name] = (this as any)[name].bind(this);
|
|
||||||
// (this as any)[name] = command && command.TRANSFORM_LEGACY_REPLY && command.transformReply ?
|
|
||||||
// (...args: Array<unknown>) => {
|
|
||||||
// const result = this.#legacySendCommand(name, ...args);
|
|
||||||
// if (result) {
|
|
||||||
// result.promise
|
|
||||||
// .then(reply => result.callback(null, command.transformReply!(reply)))
|
|
||||||
// .catch(err => result.callback(err));
|
|
||||||
// }
|
|
||||||
// } :
|
|
||||||
// (...args: Array<unknown>) => (this as any).sendCommand(name, ...args);
|
|
||||||
// }
|
|
||||||
|
|
||||||
private _pingTimer?: NodeJS.Timer;
|
private _pingTimer?: NodeJS.Timer;
|
||||||
|
|
||||||
private _setPingTimer(): void {
|
private _setPingTimer(): void {
|
||||||
@@ -505,8 +425,7 @@ export default class RedisClient<
|
|||||||
this._pingTimer = setTimeout(() => {
|
this._pingTimer = setTimeout(() => {
|
||||||
if (!this._socket.isReady) return;
|
if (!this._socket.isReady) return;
|
||||||
|
|
||||||
// using _sendCommand to support legacy mode
|
this.sendCommand(['PING'])
|
||||||
this._sendCommand(['PING'])
|
|
||||||
.then(reply => this.emit('ping-interval', reply))
|
.then(reply => this.emit('ping-interval', reply))
|
||||||
.catch(err => this.emit('error', err))
|
.catch(err => this.emit('error', err))
|
||||||
.finally(() => this._setPingTimer());
|
.finally(() => this._setPingTimer());
|
||||||
@@ -559,10 +478,21 @@ export default class RedisClient<
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Override the `isolated` command option to `true`
|
* Get the "legacy" (v3/callback) interface
|
||||||
*/
|
*/
|
||||||
isolated() {
|
legacy(): RedisLegacyClientType {
|
||||||
return this._commandOptionsProxy('isolated', true);
|
return new RedisLegacyClient(
|
||||||
|
this as unknown as RedisClientType<M, F, S>
|
||||||
|
) as RedisLegacyClientType;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create `RedisClientPool` using this client as a prototype
|
||||||
|
*/
|
||||||
|
pool() {
|
||||||
|
return RedisClientPool.fromClient(
|
||||||
|
this as unknown as RedisClientType<M, F, S, RESP>
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
duplicate(overrides?: Partial<RedisClientOptions<M, F, S, RESP>>) {
|
duplicate(overrides?: Partial<RedisClientOptions<M, F, S, RESP>>) {
|
||||||
@@ -572,36 +502,16 @@ export default class RedisClient<
|
|||||||
}) as RedisClientType<M, F, S, RESP>;
|
}) as RedisClientType<M, F, S, RESP>;
|
||||||
}
|
}
|
||||||
|
|
||||||
async connect(): Promise<void> {
|
connect() {
|
||||||
await this._socket.connect();
|
return this._socket.connect();
|
||||||
this.self._isolationPool = createPool({
|
|
||||||
create: async () => {
|
|
||||||
const duplicate = this.duplicate({
|
|
||||||
isolationPoolOptions: undefined
|
|
||||||
}).on('error', err => this.emit('error', err));
|
|
||||||
await duplicate.connect();
|
|
||||||
return duplicate;
|
|
||||||
},
|
|
||||||
destroy: client => client.disconnect()
|
|
||||||
}, this._options?.isolationPoolOptions);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sendCommand = this._sendCommand.bind(this);
|
sendCommand<T = ReplyUnion>(
|
||||||
|
|
||||||
// using `_` to avoid conflicts with the legacy mode
|
|
||||||
_sendCommand<T = ReplyUnion>(
|
|
||||||
args: CommandArguments,
|
args: CommandArguments,
|
||||||
options?: ClientCommandOptions
|
options?: ClientCommandOptions
|
||||||
): Promise<T> {
|
): Promise<T> {
|
||||||
if (!this._socket.isOpen) {
|
if (!this._socket.isOpen) {
|
||||||
return Promise.reject(new ClientClosedError());
|
return Promise.reject(new ClientClosedError());
|
||||||
} else if (options?.isolated) {
|
|
||||||
return this.executeIsolated(isolatedClient =>
|
|
||||||
isolatedClient.sendCommand(args, {
|
|
||||||
...options,
|
|
||||||
isolated: false
|
|
||||||
})
|
|
||||||
);
|
|
||||||
} else if (!this._socket.isReady && this._options?.disableOfflineQueue) {
|
} else if (!this._socket.isReady && this._options?.disableOfflineQueue) {
|
||||||
return Promise.reject(new ClientOfflineError());
|
return Promise.reject(new ClientOfflineError());
|
||||||
}
|
}
|
||||||
@@ -612,7 +522,7 @@ export default class RedisClient<
|
|||||||
}
|
}
|
||||||
|
|
||||||
async SELECT(db: number): Promise<void> {
|
async SELECT(db: number): Promise<void> {
|
||||||
await this._sendCommand(['SELECT', db.toString()]);
|
await this.sendCommand(['SELECT', db.toString()]);
|
||||||
this._selectedDB = db;
|
this._selectedDB = db;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -630,7 +540,6 @@ export default class RedisClient<
|
|||||||
listener: PubSubListener<T>,
|
listener: PubSubListener<T>,
|
||||||
bufferMode?: T
|
bufferMode?: T
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
console.log('SUBSCRIBE', channels, listener, bufferMode, this._options?.RESP);
|
|
||||||
return this._pubSubCommand(
|
return this._pubSubCommand(
|
||||||
this._queue.subscribe(
|
this._queue.subscribe(
|
||||||
PubSubType.CHANNELS,
|
PubSubType.CHANNELS,
|
||||||
@@ -658,7 +567,7 @@ export default class RedisClient<
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
unsubscribe = this.UNSUBSCRIBE;
|
unsubscribe = this.UNSUBSCRIBE
|
||||||
|
|
||||||
PSUBSCRIBE<T extends boolean = false>(
|
PSUBSCRIBE<T extends boolean = false>(
|
||||||
patterns: string | Array<string>,
|
patterns: string | Array<string>,
|
||||||
@@ -752,17 +661,13 @@ export default class RedisClient<
|
|||||||
return this._socket.quit(async () => {
|
return this._socket.quit(async () => {
|
||||||
const quitPromise = this._queue.addCommand<string>(['QUIT']);
|
const quitPromise = this._queue.addCommand<string>(['QUIT']);
|
||||||
this._tick();
|
this._tick();
|
||||||
const [reply] = await Promise.all([
|
return quitPromise;
|
||||||
quitPromise,
|
|
||||||
this._destroyIsolationPool()
|
|
||||||
]);
|
|
||||||
return reply;
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
quit = this.QUIT;
|
quit = this.QUIT;
|
||||||
|
|
||||||
_tick(force = false): void {
|
private _tick(force = false): void {
|
||||||
if (this._socket.writableNeedDrain || (!force && !this._socket.isReady)) {
|
if (this._socket.writableNeedDrain || (!force && !this._socket.isReady)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -777,12 +682,6 @@ export default class RedisClient<
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
executeIsolated<T>(fn: (client: RedisClientType<M, F, S, RESP, FLAGS>) => T | Promise<T>): Promise<T> {
|
|
||||||
return this._isolationPool ?
|
|
||||||
this._isolationPool.use(fn) :
|
|
||||||
Promise.reject(new ClientClosedError());
|
|
||||||
}
|
|
||||||
|
|
||||||
private _addMultiCommands(
|
private _addMultiCommands(
|
||||||
commands: Array<RedisMultiQueuedCommand>,
|
commands: Array<RedisMultiQueuedCommand>,
|
||||||
chainId?: symbol,
|
chainId?: symbol,
|
||||||
@@ -827,7 +726,6 @@ export default class RedisClient<
|
|||||||
|
|
||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
// self.#options?.legacyMode
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -884,23 +782,16 @@ export default class RedisClient<
|
|||||||
} while (cursor !== 0);
|
} while (cursor !== 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
async disconnect(): Promise<void> {
|
disconnect() {
|
||||||
this._queue.flushAll(new DisconnectsClientError());
|
this._queue.flushAll(new DisconnectsClientError());
|
||||||
this._socket.disconnect();
|
this._socket.disconnect();
|
||||||
await this._destroyIsolationPool();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private async _destroyIsolationPool(): Promise<void> {
|
ref() {
|
||||||
await this._isolationPool!.drain();
|
|
||||||
await this._isolationPool!.clear();
|
|
||||||
this.self._isolationPool = undefined;
|
|
||||||
}
|
|
||||||
|
|
||||||
ref(): void {
|
|
||||||
this._socket.ref();
|
this._socket.ref();
|
||||||
}
|
}
|
||||||
|
|
||||||
unref(): void {
|
unref() {
|
||||||
this._socket.unref();
|
this._socket.unref();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
107
packages/client/lib/client/legacy-mode.ts
Normal file
107
packages/client/lib/client/legacy-mode.ts
Normal file
@@ -0,0 +1,107 @@
|
|||||||
|
import { RedisModules, RedisFunctions, RedisScripts, RespVersions, Flags, Command, CommandArguments, ReplyUnion } from '../RESP/types';
|
||||||
|
import { RedisClientType } from '.';
|
||||||
|
import { getTransformReply } from '../commander';
|
||||||
|
import { ErrorReply } from '../errors';
|
||||||
|
import COMMANDS from '../commands';
|
||||||
|
|
||||||
|
type LegacyArgument = string | Buffer | number | Date;
|
||||||
|
|
||||||
|
type LegacyArguments = Array<LegacyArgument | LegacyArguments>;
|
||||||
|
|
||||||
|
type LegacyCallback = (err: ErrorReply | null, reply?: ReplyUnion) => unknown
|
||||||
|
|
||||||
|
type LegacyCommandArguments = LegacyArguments | [
|
||||||
|
...args: LegacyArguments,
|
||||||
|
callback: LegacyCallback
|
||||||
|
];
|
||||||
|
|
||||||
|
export type CommandSignature = (...args: LegacyCommandArguments) => void;
|
||||||
|
|
||||||
|
type WithCommands = {
|
||||||
|
[P in keyof typeof COMMANDS]: CommandSignature;
|
||||||
|
};
|
||||||
|
|
||||||
|
export type RedisLegacyClientType = RedisLegacyClient & WithCommands;
|
||||||
|
|
||||||
|
export class RedisLegacyClient {
|
||||||
|
private static _transformArguments(redisArgs: CommandArguments, args: LegacyCommandArguments) {
|
||||||
|
let callback: LegacyCallback | undefined;
|
||||||
|
if (typeof args[args.length - 1] === 'function') {
|
||||||
|
callback = args.pop() as LegacyCallback;
|
||||||
|
}
|
||||||
|
|
||||||
|
RedisLegacyClient._pushArguments(redisArgs, args as LegacyArguments);
|
||||||
|
|
||||||
|
return callback;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static _pushArguments(redisArgs: CommandArguments, args: LegacyArguments) {
|
||||||
|
for (let i = 0; i < args.length; ++i) {
|
||||||
|
const arg = args[i];
|
||||||
|
if (Array.isArray(arg)) {
|
||||||
|
RedisLegacyClient._pushArguments(redisArgs, arg);
|
||||||
|
} else {
|
||||||
|
redisArgs.push(
|
||||||
|
typeof arg === 'number' || arg instanceof Date ?
|
||||||
|
arg.toString() :
|
||||||
|
arg
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static _getTransformReply(command: Command, resp: RespVersions) {
|
||||||
|
return command.TRANSFORM_LEGACY_REPLY ?
|
||||||
|
getTransformReply(command, resp) :
|
||||||
|
undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static _createCommand(name: string, command: Command, resp: RespVersions) {
|
||||||
|
const transformReply = RedisLegacyClient._getTransformReply(command, resp);
|
||||||
|
return async function (this: RedisLegacyClient, ...args: LegacyCommandArguments) {
|
||||||
|
const redisArgs = [name],
|
||||||
|
callback = RedisLegacyClient._transformArguments(redisArgs, args),
|
||||||
|
promise = this._client.sendCommand(redisArgs);
|
||||||
|
|
||||||
|
if (!callback) {
|
||||||
|
promise.catch(err => this._client.emit('error', err));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
promise
|
||||||
|
.then(reply => callback(null, transformReply ? transformReply(reply) : reply))
|
||||||
|
.catch(err => callback(err));
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
private _client: RedisClientType<RedisModules, RedisFunctions, RedisScripts>
|
||||||
|
) {
|
||||||
|
const RESP = _client.options?.RESP ?? 2;
|
||||||
|
for (const [name, command] of Object.entries(COMMANDS)) {
|
||||||
|
// TODO: as any?
|
||||||
|
(this as any)[name] = RedisLegacyClient._createCommand(
|
||||||
|
name,
|
||||||
|
command,
|
||||||
|
RESP
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Multi
|
||||||
|
}
|
||||||
|
|
||||||
|
sendCommand(...args: LegacyArguments) {
|
||||||
|
const redisArgs: CommandArguments = [],
|
||||||
|
callback = RedisLegacyClient._transformArguments(redisArgs, args),
|
||||||
|
promise = this._client.sendCommand(redisArgs);
|
||||||
|
|
||||||
|
if (!callback) {
|
||||||
|
promise.catch(err => this._client.emit('error', err));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
promise
|
||||||
|
.then(reply => callback(null, reply))
|
||||||
|
.catch(err => callback(err));
|
||||||
|
}
|
||||||
|
}
|
84
packages/client/lib/client/pool.ts
Normal file
84
packages/client/lib/client/pool.ts
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
import { Pool, Options as PoolOptions, createPool } from 'generic-pool';
|
||||||
|
import { RedisFunctions, RedisModules, RedisScripts, RespVersions } from '../RESP/types';
|
||||||
|
import RedisClient, { RedisClientType, RedisClientOptions } from '.';
|
||||||
|
import { EventEmitter } from 'events';
|
||||||
|
|
||||||
|
type RedisClientPoolOptions<
|
||||||
|
M extends RedisModules,
|
||||||
|
F extends RedisFunctions,
|
||||||
|
S extends RedisScripts,
|
||||||
|
RESP extends RespVersions
|
||||||
|
> = RedisClientOptions<M, F, S, RESP> & PoolOptions;
|
||||||
|
|
||||||
|
export class RedisClientPool<
|
||||||
|
M extends RedisModules,
|
||||||
|
F extends RedisFunctions,
|
||||||
|
S extends RedisScripts,
|
||||||
|
RESP extends RespVersions
|
||||||
|
> extends EventEmitter {
|
||||||
|
_pool: Pool<RedisClientType<M, F, S, RESP>>;
|
||||||
|
|
||||||
|
static fromClient<
|
||||||
|
M extends RedisModules,
|
||||||
|
F extends RedisFunctions,
|
||||||
|
S extends RedisScripts,
|
||||||
|
RESP extends RespVersions
|
||||||
|
>(
|
||||||
|
client: RedisClientType<M, F, S, RESP>,
|
||||||
|
poolOptions?: PoolOptions
|
||||||
|
) {
|
||||||
|
return new RedisClientPool<M, F, S, RESP>(
|
||||||
|
() => client.duplicate(),
|
||||||
|
poolOptions
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
static fromOptions<
|
||||||
|
M extends RedisModules,
|
||||||
|
F extends RedisFunctions,
|
||||||
|
S extends RedisScripts,
|
||||||
|
RESP extends RespVersions
|
||||||
|
>(
|
||||||
|
options: RedisClientPoolOptions<M, F, S, RESP>,
|
||||||
|
poolOptions?: PoolOptions
|
||||||
|
) {
|
||||||
|
return new RedisClientPool(
|
||||||
|
RedisClient.factory(options),
|
||||||
|
poolOptions
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
clientFactory: () => RedisClientType<M, F, S, RESP>,
|
||||||
|
options?: PoolOptions
|
||||||
|
) {
|
||||||
|
super();
|
||||||
|
|
||||||
|
this._pool = createPool({
|
||||||
|
create: async () => {
|
||||||
|
const client = clientFactory();
|
||||||
|
|
||||||
|
// TODO: more events?
|
||||||
|
client.on('error', (err: Error) => this.emit('error', err));
|
||||||
|
|
||||||
|
await client.connect();
|
||||||
|
|
||||||
|
return client;
|
||||||
|
},
|
||||||
|
// TODO: destroy has to return a Promise?!
|
||||||
|
destroy: async client => client.disconnect()
|
||||||
|
}, options);
|
||||||
|
}
|
||||||
|
|
||||||
|
execute<T>(fn: () => T): Promise<T> {
|
||||||
|
return this._pool.use(fn);
|
||||||
|
}
|
||||||
|
|
||||||
|
close() {
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
|
||||||
|
disconnect() {
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
}
|
@@ -1,4 +1,4 @@
|
|||||||
import { CommandArguments, RedisArgument, BlobStringReply, ArrayReply, Command } from '../RESP/types';
|
import { RedisArgument, CommandArguments, BlobStringReply, ArrayReply, Command } from '../RESP/types';
|
||||||
|
|
||||||
export interface ScanCommonOptions {
|
export interface ScanCommonOptions {
|
||||||
MATCH?: string;
|
MATCH?: string;
|
||||||
|
@@ -133,6 +133,7 @@ import PING from './PING';
|
|||||||
import PSETEX from './PSETEX';
|
import PSETEX from './PSETEX';
|
||||||
import PTTL from './PTTL';
|
import PTTL from './PTTL';
|
||||||
import RANDOMKEY from './RANDOMKEY';
|
import RANDOMKEY from './RANDOMKEY';
|
||||||
|
import READONLY from './READONLY';
|
||||||
import RENAME from './RENAME';
|
import RENAME from './RENAME';
|
||||||
import RENAMENX from './RENAMENX';
|
import RENAMENX from './RENAMENX';
|
||||||
import RPOP_COUNT from './RPOP_COUNT';
|
import RPOP_COUNT from './RPOP_COUNT';
|
||||||
@@ -487,6 +488,8 @@ export default {
|
|||||||
pTTL: PTTL,
|
pTTL: PTTL,
|
||||||
RANDOMKEY,
|
RANDOMKEY,
|
||||||
randomKey: RANDOMKEY,
|
randomKey: RANDOMKEY,
|
||||||
|
READONLY,
|
||||||
|
readonly: READONLY,
|
||||||
RENAME,
|
RENAME,
|
||||||
rename: RENAME,
|
rename: RENAME,
|
||||||
RENAMENX,
|
RENAMENX,
|
||||||
|
Reference in New Issue
Block a user