1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-04 15:02:09 +03:00
Files
node-redis/packages/test-utils/lib/dockers.ts
Leibale Eidelman 3b1bad2296 Add support for sharded PubSub (#2373)
* refactor pubsub, add support for sharded pub sub

* run tests in redis 7 only, fix PUBSUB SHARDCHANNELS test

* add some comments and fix some bugs

* PubSubType, not PubSubTypes 🤦‍♂️

* remove test.txt

* fix some bugs, add tests

* add some tests

* fix #2345 - allow PING in PubSub mode (remove client side validation)

* remove .only

* revert changes in cluster/index.ts

* fix tests minimum version

* handle server sunsubscribe

* add 'sharded-channel-moved' event to docs, improve the events section in the main README (fix #2302)

* exit "resubscribe" if pubsub not active

* Update commands-queue.ts

* Release client@1.5.0-rc.0

* WIP

* use `node:util` instead of `node:util/types` (to support node 14)

* run PubSub resharding test with Redis 7+

* fix inconsistency in live resharding test

* add some tests

* fix iterateAllNodes when starting from a replica

* fix iterateAllNodes random

* fix slotNodesIterator

* fix slotNodesIterator

* clear pubSubNode when node in use

* wait for all nodes cluster state to be ok before testing

* `cluster.minimizeConections` tests

* `client.reconnectStrategry = false | 0` tests

* sharded pubsub + cluster 🎉

* add minimum version to sharded pubsub tests

* add cluster sharded pubsub live reshard test, use stable dockers for tests, make sure to close pubsub clients when a node disconnects from the cluster

* fix "ssubscribe & sunsubscribe" test

* lock search docker to 2.4.9

* change numberOfMasters default to 2

* use edge for bloom

* add tests

* add back getMasters and getSlotMaster as deprecated functions

* add some tests

* fix reconnect strategy + docs

* sharded pubsub docs

* Update pub-sub.md

* some jsdoc, docs, cluster topology test

* clean pub-sub docs

Co-authored-by: Simon Prickett <simon@redislabs.com>

* reconnect startegy docs and bug fix

Co-authored-by: Simon Prickett <simon@redislabs.com>

* refine jsdoc and some docs

Co-authored-by: Simon Prickett <simon@redislabs.com>

* I'm stupid

* fix cluster topology test

* fix cluster topology test

* Update README.md

* Update clustering.md

* Update pub-sub.md

Co-authored-by: Simon Prickett <simon@redislabs.com>
2023-01-25 11:00:39 -05:00

261 lines
7.4 KiB
TypeScript

import { createConnection } from 'net';
import { once } from 'events';
import RedisClient from '@redis/client/dist/lib/client';
import { promiseTimeout } from '@redis/client/dist/lib/utils';
import { ClusterSlotsReply } from '@redis/client/dist/lib/commands/CLUSTER_SLOTS';
import * as path from 'path';
import { promisify } from 'util';
import { exec } from 'child_process';
const execAsync = promisify(exec);
interface ErrorWithCode extends Error {
code: string;
}
async function isPortAvailable(port: number): Promise<boolean> {
try {
const socket = createConnection({ port });
await once(socket, 'connect');
socket.end();
} catch (err) {
if (err instanceof Error && (err as ErrorWithCode).code === 'ECONNREFUSED') {
return true;
}
}
return false;
}
const portIterator = (async function*(): AsyncIterableIterator<number> {
for (let i = 6379; i < 65535; i++) {
if (await isPortAvailable(i)) {
yield i;
}
}
throw new Error('All ports are in use');
})();
export interface RedisServerDockerConfig {
image: string;
version: string;
}
export interface RedisServerDocker {
port: number;
dockerId: string;
}
// ".." cause it'll be in `./dist`
const DOCKER_FODLER_PATH = path.join(__dirname, '../docker');
async function spawnRedisServerDocker({ image, version }: RedisServerDockerConfig, serverArguments: Array<string>): Promise<RedisServerDocker> {
const port = (await portIterator.next()).value,
{ stdout, stderr } = await execAsync(
'docker run -d --network host $(' +
`docker build ${DOCKER_FODLER_PATH} -q ` +
`--build-arg IMAGE=${image}:${version} ` +
`--build-arg REDIS_ARGUMENTS="--save '' --port ${port.toString()} ${serverArguments.join(' ')}"` +
')'
);
if (!stdout) {
throw new Error(`docker run error - ${stderr}`);
}
while (await isPortAvailable(port)) {
await promiseTimeout(50);
}
return {
port,
dockerId: stdout.trim()
};
}
const RUNNING_SERVERS = new Map<Array<string>, ReturnType<typeof spawnRedisServerDocker>>();
export function spawnRedisServer(dockerConfig: RedisServerDockerConfig, serverArguments: Array<string>): Promise<RedisServerDocker> {
const runningServer = RUNNING_SERVERS.get(serverArguments);
if (runningServer) {
return runningServer;
}
const dockerPromise = spawnRedisServerDocker(dockerConfig, serverArguments);
RUNNING_SERVERS.set(serverArguments, dockerPromise);
return dockerPromise;
}
async function dockerRemove(dockerId: string): Promise<void> {
const { stderr } = await execAsync(`docker rm -f ${dockerId}`);
if (stderr) {
throw new Error(`docker rm error - ${stderr}`);
}
}
after(() => {
return Promise.all(
[...RUNNING_SERVERS.values()].map(async dockerPromise =>
await dockerRemove((await dockerPromise).dockerId)
)
);
});
export interface RedisClusterDockersConfig extends RedisServerDockerConfig {
numberOfMasters?: number;
numberOfReplicas?: number;
}
async function spawnRedisClusterNodeDockers(
dockersConfig: RedisClusterDockersConfig,
serverArguments: Array<string>,
fromSlot: number,
toSlot: number
) {
const range: Array<number> = [];
for (let i = fromSlot; i < toSlot; i++) {
range.push(i);
}
const master = await spawnRedisClusterNodeDocker(
dockersConfig,
serverArguments
);
await master.client.clusterAddSlots(range);
if (!dockersConfig.numberOfReplicas) return [master];
const replicasPromises: Array<ReturnType<typeof spawnRedisClusterNodeDocker>> = [];
for (let i = 0; i < (dockersConfig.numberOfReplicas ?? 0); i++) {
replicasPromises.push(
spawnRedisClusterNodeDocker(dockersConfig, [
...serverArguments,
'--cluster-enabled',
'yes',
'--cluster-node-timeout',
'5000'
]).then(async replica => {
await replica.client.clusterMeet('127.0.0.1', master.docker.port);
while ((await replica.client.clusterSlots()).length === 0) {
await promiseTimeout(50);
}
await replica.client.clusterReplicate(
await master.client.clusterMyId()
);
return replica;
})
);
}
return [
master,
...await Promise.all(replicasPromises)
];
}
async function spawnRedisClusterNodeDocker(
dockersConfig: RedisClusterDockersConfig,
serverArguments: Array<string>
) {
const docker = await spawnRedisServerDocker(dockersConfig, [
...serverArguments,
'--cluster-enabled',
'yes',
'--cluster-node-timeout',
'5000'
]),
client = RedisClient.create({
socket: {
port: docker.port
}
});
await client.connect();
return {
docker,
client
};
}
const SLOTS = 16384;
async function spawnRedisClusterDockers(
dockersConfig: RedisClusterDockersConfig,
serverArguments: Array<string>
): Promise<Array<RedisServerDocker>> {
const numberOfMasters = dockersConfig.numberOfMasters ?? 2,
slotsPerNode = Math.floor(SLOTS / numberOfMasters),
spawnPromises: Array<ReturnType<typeof spawnRedisClusterNodeDockers>> = [];
for (let i = 0; i < numberOfMasters; i++) {
const fromSlot = i * slotsPerNode,
toSlot = i === numberOfMasters - 1 ? SLOTS : fromSlot + slotsPerNode;
spawnPromises.push(
spawnRedisClusterNodeDockers(
dockersConfig,
serverArguments,
fromSlot,
toSlot
)
);
}
const nodes = (await Promise.all(spawnPromises)).flat(),
meetPromises: Array<Promise<unknown>> = [];
for (let i = 1; i < nodes.length; i++) {
meetPromises.push(
nodes[i].client.clusterMeet('127.0.0.1', nodes[0].docker.port)
);
}
await Promise.all(meetPromises);
await Promise.all(
nodes.map(async ({ client }) => {
while (totalNodes(await client.clusterSlots()) !== nodes.length) {
await promiseTimeout(50);
}
return client.disconnect();
})
);
return nodes.map(({ docker }) => docker);
}
function totalNodes(slots: ClusterSlotsReply) {
let total = slots.length;
for (const slot of slots) {
total += slot.replicas.length;
}
return total;
}
const RUNNING_CLUSTERS = new Map<Array<string>, ReturnType<typeof spawnRedisClusterDockers>>();
export function spawnRedisCluster(dockersConfig: RedisClusterDockersConfig, serverArguments: Array<string>): Promise<Array<RedisServerDocker>> {
const runningCluster = RUNNING_CLUSTERS.get(serverArguments);
if (runningCluster) {
return runningCluster;
}
const dockersPromise = spawnRedisClusterDockers(dockersConfig, serverArguments);
RUNNING_CLUSTERS.set(serverArguments, dockersPromise);
return dockersPromise;
}
after(() => {
return Promise.all(
[...RUNNING_CLUSTERS.values()].map(async dockersPromise => {
return Promise.all(
(await dockersPromise).map(({ dockerId }) => dockerRemove(dockerId))
);
})
);
});