You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-06 02:15:48 +03:00
Add support for redis functions (#2020)
* fix #1906 - implement BITFIELD_RO * initial support for redis functions * fix test utils * redis functions commands and tests * upgrade deps * fix "Property 'uninstall' does not exist on type 'SinonFakeTimers'" * upgrade dockers version * Merge branch 'master' of github.com:redis/node-redis into functions * fix FUNCTION LIST WITHCODE and FUNCTION STATS * upgrade deps * set minimum version for FCALL and FCALL_RO * fix FUNCTION LOAD * FUNCTION LOAD * fix FUNCTION LOAD & FUNCTION LIST & FUNCTION LOAD WITHCODE * fix FUNCTION_LIST_WITHCODE test
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
import RedisClient, { InstantiableRedisClient, RedisClientType } from '../client';
|
||||
import { RedisClusterMasterNode, RedisClusterReplicaNode } from '../commands/CLUSTER_NODES';
|
||||
import { RedisClusterClientOptions, RedisClusterOptions } from '.';
|
||||
import { RedisCommandArgument, RedisModules, RedisScripts } from '../commands';
|
||||
import { RedisCommandArgument, RedisFunctions, RedisModules, RedisScripts } from '../commands';
|
||||
import { RootNodesUnavailableError } from '../errors';
|
||||
|
||||
// We need to use 'require', because it's not possible with Typescript to import
|
||||
@@ -9,9 +9,13 @@ import { RootNodesUnavailableError } from '../errors';
|
||||
// set to true.
|
||||
const calculateSlot = require('cluster-key-slot');
|
||||
|
||||
export interface ClusterNode<M extends RedisModules, S extends RedisScripts> {
|
||||
export interface ClusterNode<
|
||||
M extends RedisModules,
|
||||
F extends RedisFunctions,
|
||||
S extends RedisScripts
|
||||
> {
|
||||
id: string;
|
||||
client: RedisClientType<M, S>;
|
||||
client: RedisClientType<M, F, S>;
|
||||
}
|
||||
|
||||
interface NodeAddress {
|
||||
@@ -23,22 +27,30 @@ export type NodeAddressMap = {
|
||||
[address: string]: NodeAddress;
|
||||
} | ((address: string) => NodeAddress | undefined);
|
||||
|
||||
interface SlotNodes<M extends RedisModules, S extends RedisScripts> {
|
||||
master: ClusterNode<M, S>;
|
||||
replicas: Array<ClusterNode<M, S>>;
|
||||
clientIterator: IterableIterator<RedisClientType<M, S>> | undefined;
|
||||
interface SlotNodes<
|
||||
M extends RedisModules,
|
||||
F extends RedisFunctions,
|
||||
S extends RedisScripts
|
||||
> {
|
||||
master: ClusterNode<M, F, S>;
|
||||
replicas: Array<ClusterNode<M, F, S>>;
|
||||
clientIterator: IterableIterator<RedisClientType<M, F, S>> | undefined;
|
||||
}
|
||||
|
||||
type OnError = (err: unknown) => void;
|
||||
|
||||
export default class RedisClusterSlots<M extends RedisModules, S extends RedisScripts> {
|
||||
readonly #options: RedisClusterOptions<M, S>;
|
||||
readonly #Client: InstantiableRedisClient<M, S>;
|
||||
export default class RedisClusterSlots<
|
||||
M extends RedisModules,
|
||||
F extends RedisFunctions,
|
||||
S extends RedisScripts
|
||||
> {
|
||||
readonly #options: RedisClusterOptions<M, F, S>;
|
||||
readonly #Client: InstantiableRedisClient<M, F, S>;
|
||||
readonly #onError: OnError;
|
||||
readonly #nodeByAddress = new Map<string, ClusterNode<M, S>>();
|
||||
readonly #slots: Array<SlotNodes<M, S>> = [];
|
||||
readonly #nodeByAddress = new Map<string, ClusterNode<M, F, S>>();
|
||||
readonly #slots: Array<SlotNodes<M, F, S>> = [];
|
||||
|
||||
constructor(options: RedisClusterOptions<M, S>, onError: OnError) {
|
||||
constructor(options: RedisClusterOptions<M, F, S>, onError: OnError) {
|
||||
this.#options = options;
|
||||
this.#Client = RedisClient.extend(options);
|
||||
this.#onError = onError;
|
||||
@@ -72,7 +84,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
|
||||
|
||||
#runningRediscoverPromise?: Promise<void>;
|
||||
|
||||
async rediscover(startWith: RedisClientType<M, S>): Promise<void> {
|
||||
async rediscover(startWith: RedisClientType<M, F, S>): Promise<void> {
|
||||
if (!this.#runningRediscoverPromise) {
|
||||
this.#runningRediscoverPromise = this.#rediscover(startWith)
|
||||
.finally(() => this.#runningRediscoverPromise = undefined);
|
||||
@@ -81,7 +93,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
|
||||
return this.#runningRediscoverPromise;
|
||||
}
|
||||
|
||||
async #rediscover(startWith: RedisClientType<M, S>): Promise<void> {
|
||||
async #rediscover(startWith: RedisClientType<M, F, S>): Promise<void> {
|
||||
if (await this.#discoverNodes(startWith.options)) return;
|
||||
|
||||
for (const { client } of this.#nodeByAddress.values()) {
|
||||
@@ -137,7 +149,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
|
||||
};
|
||||
}
|
||||
|
||||
#initiateClient(options?: RedisClusterClientOptions): RedisClientType<M, S> {
|
||||
#initiateClient(options?: RedisClusterClientOptions): RedisClientType<M, F, S> {
|
||||
return new this.#Client(this.#clientOptionsDefaults(options))
|
||||
.on('error', this.#onError);
|
||||
}
|
||||
@@ -152,7 +164,12 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
|
||||
}
|
||||
}
|
||||
|
||||
#initiateClientForNode(nodeData: RedisClusterMasterNode | RedisClusterReplicaNode, readonly: boolean, clientsInUse: Set<string>, promises: Array<Promise<void>>): ClusterNode<M, S> {
|
||||
#initiateClientForNode(
|
||||
nodeData: RedisClusterMasterNode | RedisClusterReplicaNode,
|
||||
readonly: boolean,
|
||||
clientsInUse: Set<string>,
|
||||
promises: Array<Promise<void>>
|
||||
): ClusterNode<M, F, S> {
|
||||
const address = `${nodeData.host}:${nodeData.port}`;
|
||||
clientsInUse.add(address);
|
||||
|
||||
@@ -175,11 +192,11 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
|
||||
return node;
|
||||
}
|
||||
|
||||
getSlotMaster(slot: number): ClusterNode<M, S> {
|
||||
getSlotMaster(slot: number): ClusterNode<M, F, S> {
|
||||
return this.#slots[slot].master;
|
||||
}
|
||||
|
||||
*#slotClientIterator(slotNumber: number): IterableIterator<RedisClientType<M, S>> {
|
||||
*#slotClientIterator(slotNumber: number): IterableIterator<RedisClientType<M, F, S>> {
|
||||
const slot = this.#slots[slotNumber];
|
||||
yield slot.master.client;
|
||||
|
||||
@@ -188,7 +205,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
|
||||
}
|
||||
}
|
||||
|
||||
#getSlotClient(slotNumber: number): RedisClientType<M, S> {
|
||||
#getSlotClient(slotNumber: number): RedisClientType<M, F, S> {
|
||||
const slot = this.#slots[slotNumber];
|
||||
if (!slot.clientIterator) {
|
||||
slot.clientIterator = this.#slotClientIterator(slotNumber);
|
||||
@@ -203,9 +220,9 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
|
||||
return value;
|
||||
}
|
||||
|
||||
#randomClientIterator?: IterableIterator<ClusterNode<M, S>>;
|
||||
#randomClientIterator?: IterableIterator<ClusterNode<M, F, S>>;
|
||||
|
||||
#getRandomClient(): RedisClientType<M, S> {
|
||||
#getRandomClient(): RedisClientType<M, F, S> {
|
||||
if (!this.#nodeByAddress.size) {
|
||||
throw new Error('Cluster is not connected');
|
||||
}
|
||||
@@ -223,7 +240,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
|
||||
return value.client;
|
||||
}
|
||||
|
||||
getClient(firstKey?: RedisCommandArgument, isReadonly?: boolean): RedisClientType<M, S> {
|
||||
getClient(firstKey?: RedisCommandArgument, isReadonly?: boolean): RedisClientType<M, F, S> {
|
||||
if (!firstKey) {
|
||||
return this.#getRandomClient();
|
||||
}
|
||||
@@ -236,7 +253,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
|
||||
return this.#getSlotClient(slot);
|
||||
}
|
||||
|
||||
getMasters(): Array<ClusterNode<M, S>> {
|
||||
getMasters(): Array<ClusterNode<M, F, S>> {
|
||||
const masters = [];
|
||||
for (const node of this.#nodeByAddress.values()) {
|
||||
if (node.client.options?.readonly) continue;
|
||||
@@ -247,7 +264,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
|
||||
return masters;
|
||||
}
|
||||
|
||||
getNodeByAddress(address: string): ClusterNode<M, S> | undefined {
|
||||
getNodeByAddress(address: string): ClusterNode<M, F, S> | undefined {
|
||||
const mappedAddress = this.#getNodeAddress(address);
|
||||
return this.#nodeByAddress.get(
|
||||
mappedAddress ? `${mappedAddress.host}:${mappedAddress.port}` : address
|
||||
@@ -262,7 +279,7 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisSc
|
||||
return this.#destroy(client => client.disconnect());
|
||||
}
|
||||
|
||||
async #destroy(fn: (client: RedisClientType<M, S>) => Promise<unknown>): Promise<void> {
|
||||
async #destroy(fn: (client: RedisClientType<M, F, S>) => Promise<unknown>): Promise<void> {
|
||||
const promises = [];
|
||||
for (const { client } of this.#nodeByAddress.values()) {
|
||||
promises.push(fn(client));
|
||||
|
Reference in New Issue
Block a user