You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-06 02:15:48 +03:00
611 lines
18 KiB
TypeScript
611 lines
18 KiB
TypeScript
import { createConnection, Socket } from 'node:net';
|
|
import { setTimeout } from 'node:timers/promises';
|
|
import { once } from 'node:events';
|
|
import { promisify } from 'node:util';
|
|
import { exec } from 'node:child_process';
|
|
import { RedisSentinelOptions, RedisSentinelType } from './types';
|
|
import RedisClient from '../client';
|
|
import RedisSentinel from '.';
|
|
import { RedisArgument, RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from '../RESP/types';
|
|
const execAsync = promisify(exec);
|
|
import RedisSentinelModule from './module'
|
|
|
|
interface ErrorWithCode extends Error {
|
|
code: string;
|
|
}
|
|
|
|
async function isPortAvailable(port: number): Promise<boolean> {
|
|
var socket: Socket | undefined = undefined;
|
|
try {
|
|
socket = createConnection({ port });
|
|
await once(socket, 'connect');
|
|
} catch (err) {
|
|
if (err instanceof Error && (err as ErrorWithCode).code === 'ECONNREFUSED') {
|
|
return true;
|
|
}
|
|
} finally {
|
|
if (socket !== undefined) {
|
|
socket.end();
|
|
}
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
const portIterator = (async function* (): AsyncIterableIterator<number> {
|
|
for (let i = 6379; i < 65535; i++) {
|
|
if (await isPortAvailable(i)) {
|
|
yield i;
|
|
}
|
|
}
|
|
|
|
throw new Error('All ports are in use');
|
|
})();
|
|
|
|
export interface RedisServerDockerConfig {
|
|
image: string;
|
|
version: string;
|
|
}
|
|
|
|
export interface RedisServerDocker {
|
|
port: number;
|
|
dockerId: string;
|
|
}
|
|
|
|
abstract class DockerBase {
|
|
async spawnRedisServerDocker({ image, version }: RedisServerDockerConfig, serverArguments: Array<string>, environment?: string): Promise<RedisServerDocker> {
|
|
const port = (await portIterator.next()).value;
|
|
let cmdLine = `docker run --init -d --network host `;
|
|
if (environment !== undefined) {
|
|
cmdLine += `-e ${environment} `;
|
|
}
|
|
cmdLine += `${image}:${version} ${serverArguments.join(' ')}`;
|
|
cmdLine = cmdLine.replace('{port}', `--port ${port.toString()}`);
|
|
// console.log("spawnRedisServerDocker: cmdLine = " + cmdLine);
|
|
const { stdout, stderr } = await execAsync(cmdLine);
|
|
|
|
if (!stdout) {
|
|
throw new Error(`docker run error - ${stderr}`);
|
|
}
|
|
|
|
while (await isPortAvailable(port)) {
|
|
await setTimeout(50);
|
|
}
|
|
|
|
return {
|
|
port,
|
|
dockerId: stdout.trim()
|
|
};
|
|
}
|
|
|
|
async dockerRemove(dockerId: string): Promise<void> {
|
|
try {
|
|
await this.dockerStop(dockerId); ``
|
|
} catch (err) {
|
|
// its ok if stop failed, as we are just going to remove, will just be slower
|
|
console.log(`dockerStop failed in remove: ${err}`);
|
|
}
|
|
|
|
const { stderr } = await execAsync(`docker rm -f ${dockerId}`);
|
|
if (stderr) {
|
|
console.log("docker rm failed");
|
|
throw new Error(`docker rm error - ${stderr}`);
|
|
}
|
|
}
|
|
|
|
async dockerStop(dockerId: string): Promise<void> {
|
|
/* this is an optimization to get around slow docker stop times, but will fail if container is already stopped */
|
|
try {
|
|
await execAsync(`docker exec ${dockerId} /bin/bash -c "kill -SIGINT 1"`);
|
|
} catch (err) {
|
|
/* this will fail if container is already not running, can be ignored */
|
|
}
|
|
|
|
let ret = await execAsync(`docker stop ${dockerId}`);
|
|
if (ret.stderr) {
|
|
throw new Error(`docker stop error - ${ret.stderr}`);
|
|
}
|
|
}
|
|
|
|
async dockerStart(dockerId: string): Promise<void> {
|
|
const { stderr } = await execAsync(`docker start ${dockerId}`);
|
|
if (stderr) {
|
|
throw new Error(`docker start error - ${stderr}`);
|
|
}
|
|
}
|
|
}
|
|
|
|
export interface RedisSentinelConfig {
|
|
numberOfNodes?: number;
|
|
nodeDockerConfig?: RedisServerDockerConfig;
|
|
nodeServerArguments?: Array<string>
|
|
|
|
numberOfSentinels?: number;
|
|
sentinelDockerConfig?: RedisServerDockerConfig;
|
|
sentinelServerArgument?: Array<string>
|
|
|
|
sentinelName: string;
|
|
sentinelQuorum?: number;
|
|
|
|
password?: string;
|
|
}
|
|
|
|
type ArrayElement<ArrayType extends readonly unknown[]> =
|
|
ArrayType extends readonly (infer ElementType)[] ? ElementType : never;
|
|
|
|
export interface SentinelController {
|
|
getMaster(): Promise<string>;
|
|
getMasterPort(): Promise<number>;
|
|
getRandomNode(): string;
|
|
getRandonNonMasterNode(): Promise<string>;
|
|
getNodePort(id: string): number;
|
|
getAllNodesPort(): Array<number>;
|
|
getSentinelPort(id: string): number;
|
|
getAllSentinelsPort(): Array<number>;
|
|
getSetinel(i: number): string;
|
|
stopNode(id: string): Promise<void>;
|
|
restartNode(id: string): Promise<void>;
|
|
stopSentinel(id: string): Promise<void>;
|
|
restartSentinel(id: string): Promise<void>;
|
|
getSentinelClient(opts?: Partial<RedisSentinelOptions<{}, {}, {}, 2, {}>>): RedisSentinelType<{}, {}, {}, 2, {}>;
|
|
}
|
|
|
|
export class SentinelFramework extends DockerBase {
|
|
#nodeList: Awaited<ReturnType<SentinelFramework['spawnRedisSentinelNodes']>> = [];
|
|
/* port -> docker info/client */
|
|
#nodeMap: Map<string, ArrayElement<Awaited<ReturnType<SentinelFramework['spawnRedisSentinelNodes']>>>>;
|
|
#sentinelList: Awaited<ReturnType<SentinelFramework['spawnRedisSentinelSentinels']>> = [];
|
|
/* port -> docker info/client */
|
|
#sentinelMap: Map<string, ArrayElement<Awaited<ReturnType<SentinelFramework['spawnRedisSentinelSentinels']>>>>;
|
|
|
|
config: RedisSentinelConfig;
|
|
|
|
#spawned: boolean = false;
|
|
|
|
get spawned() {
|
|
return this.#spawned;
|
|
}
|
|
|
|
constructor(config: RedisSentinelConfig) {
|
|
super();
|
|
|
|
this.config = config;
|
|
|
|
this.#nodeMap = new Map<string, ArrayElement<Awaited<ReturnType<SentinelFramework['spawnRedisSentinelNodes']>>>>();
|
|
this.#sentinelMap = new Map<string, ArrayElement<Awaited<ReturnType<SentinelFramework['spawnRedisSentinelSentinels']>>>>();
|
|
}
|
|
|
|
getSentinelClient(opts?: Partial<RedisSentinelOptions<RedisModules,
|
|
RedisFunctions,
|
|
RedisScripts,
|
|
RespVersions,
|
|
TypeMapping>>, errors = true) {
|
|
// remove this safeguard
|
|
// we want to test the case when
|
|
// we cannot connect to sentinel
|
|
|
|
// if (opts?.sentinelRootNodes !== undefined) {
|
|
// throw new Error("cannot specify sentinelRootNodes here");
|
|
// }
|
|
|
|
if (opts?.name !== undefined) {
|
|
throw new Error("cannot specify sentinel db name here");
|
|
}
|
|
|
|
const options: RedisSentinelOptions<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping> = {
|
|
name: this.config.sentinelName,
|
|
sentinelRootNodes: this.#sentinelList.map((sentinel) => { return { host: '127.0.0.1', port: sentinel.docker.port } }),
|
|
passthroughClientErrorEvents: errors
|
|
}
|
|
|
|
if (this.config.password !== undefined) {
|
|
options.nodeClientOptions = {password: this.config.password};
|
|
options.sentinelClientOptions = {password: this.config.password};
|
|
}
|
|
|
|
if (opts) {
|
|
Object.assign(options, opts);
|
|
}
|
|
|
|
return RedisSentinel.create(options);
|
|
}
|
|
|
|
async spawnRedisSentinel() {
|
|
if (this.#spawned) {
|
|
return;
|
|
}
|
|
|
|
if (this.#nodeMap.size != 0 || this.#sentinelMap.size != 0) {
|
|
throw new Error("inconsistent state with partial setup");
|
|
}
|
|
|
|
this.#nodeList = await this.spawnRedisSentinelNodes();
|
|
this.#nodeList.map((value) => this.#nodeMap.set(value.docker.port.toString(), value));
|
|
|
|
this.#sentinelList = await this.spawnRedisSentinelSentinels();
|
|
this.#sentinelList.map((value) => this.#sentinelMap.set(value.docker.port.toString(), value));
|
|
|
|
this.#spawned = true;
|
|
}
|
|
|
|
async cleanup() {
|
|
if (!this.#spawned) {
|
|
return;
|
|
}
|
|
|
|
return Promise.all(
|
|
[...this.#nodeMap!.values(), ...this.#sentinelMap!.values()].map(
|
|
async ({ docker, client }) => {
|
|
if (client.isOpen) {
|
|
client.destroy();
|
|
}
|
|
this.dockerRemove(docker.dockerId);
|
|
}
|
|
)
|
|
).finally(async () => {
|
|
this.#spawned = false;
|
|
this.#nodeMap.clear();
|
|
this.#sentinelMap.clear();
|
|
});
|
|
}
|
|
|
|
protected async spawnRedisSentinelNodeDocker() {
|
|
const imageInfo: RedisServerDockerConfig = this.config.nodeDockerConfig ?? { image: "redis/redis-stack-server", version: "latest" };
|
|
const serverArguments: Array<string> = this.config.nodeServerArguments ?? [];
|
|
let environment;
|
|
if (this.config.password !== undefined) {
|
|
environment = `REDIS_ARGS="{port} --requirepass ${this.config.password}"`;
|
|
} else {
|
|
environment = 'REDIS_ARGS="{port}"';
|
|
}
|
|
|
|
const docker = await this.spawnRedisServerDocker(imageInfo, serverArguments, environment);
|
|
const client = await RedisClient.create({
|
|
password: this.config.password,
|
|
socket: {
|
|
port: docker.port
|
|
}
|
|
}).on("error", () => { }).connect();
|
|
|
|
return {
|
|
docker,
|
|
client
|
|
};
|
|
}
|
|
|
|
protected async spawnRedisSentinelNodes() {
|
|
const master = await this.spawnRedisSentinelNodeDocker();
|
|
|
|
const promises: Array<ReturnType<SentinelFramework['spawnRedisSentinelNodeDocker']>> = [];
|
|
|
|
for (let i = 0; i < (this.config.numberOfNodes ?? 0) - 1; i++) {
|
|
promises.push(
|
|
this.spawnRedisSentinelNodeDocker().then(async node => {
|
|
if (this.config.password !== undefined) {
|
|
await node.client.configSet({'masterauth': this.config.password})
|
|
}
|
|
await node.client.replicaOf('127.0.0.1', master.docker.port);
|
|
return node;
|
|
})
|
|
);
|
|
}
|
|
|
|
return [
|
|
master,
|
|
...await Promise.all(promises)
|
|
];
|
|
}
|
|
|
|
protected async spawnRedisSentinelSentinelDocker() {
|
|
const imageInfo: RedisServerDockerConfig = this.config.sentinelDockerConfig ?? { image: "redis", version: "latest" }
|
|
let serverArguments: Array<string>;
|
|
if (this.config.password === undefined) {
|
|
serverArguments = this.config.sentinelServerArgument ??
|
|
[
|
|
"/bin/bash",
|
|
"-c",
|
|
"\"touch /tmp/sentinel.conf ; /usr/local/bin/redis-sentinel /tmp/sentinel.conf {port} \""
|
|
];
|
|
} else {
|
|
serverArguments = this.config.sentinelServerArgument ??
|
|
[
|
|
"/bin/bash",
|
|
"-c",
|
|
`"touch /tmp/sentinel.conf ; /usr/local/bin/redis-sentinel /tmp/sentinel.conf {port} --requirepass ${this.config.password}"`
|
|
];
|
|
}
|
|
|
|
const docker = await this.spawnRedisServerDocker(imageInfo, serverArguments);
|
|
const client = await RedisClient.create({
|
|
modules: RedisSentinelModule,
|
|
password: this.config.password,
|
|
socket: {
|
|
port: docker.port
|
|
}
|
|
}).on("error", () => { }).connect();
|
|
|
|
return {
|
|
docker,
|
|
client
|
|
};
|
|
}
|
|
|
|
protected async spawnRedisSentinelSentinels() {
|
|
const quorum = this.config.sentinelQuorum?.toString() ?? "2";
|
|
const node = this.#nodeList[0];
|
|
|
|
const promises: Array<ReturnType<SentinelFramework['spawnRedisSentinelSentinelDocker']>> = [];
|
|
|
|
for (let i = 0; i < (this.config.numberOfSentinels ?? 3); i++) {
|
|
promises.push(
|
|
this.spawnRedisSentinelSentinelDocker().then(async sentinel => {
|
|
await sentinel.client.sentinel.sentinelMonitor(this.config.sentinelName, '127.0.0.1', node.docker.port.toString(), quorum);
|
|
const options: Array<{option: RedisArgument, value: RedisArgument}> = [];
|
|
options.push({ option: "down-after-milliseconds", value: "100" });
|
|
options.push({ option: "failover-timeout", value: "5000" });
|
|
if (this.config.password !== undefined) {
|
|
options.push({ option: "auth-pass", value: this.config.password });
|
|
}
|
|
await sentinel.client.sentinel.sentinelSet(this.config.sentinelName, options)
|
|
return sentinel;
|
|
})
|
|
);
|
|
}
|
|
|
|
return [
|
|
...await Promise.all(promises)
|
|
]
|
|
}
|
|
|
|
async getAllRunning() {
|
|
for (const port of this.getAllNodesPort()) {
|
|
let first = true;
|
|
while (await isPortAvailable(port)) {
|
|
if (!first) {
|
|
console.log(`problematic restart ${port}`);
|
|
await setTimeout(500);
|
|
} else {
|
|
first = false;
|
|
}
|
|
await this.restartNode(port.toString());
|
|
}
|
|
}
|
|
|
|
for (const port of this.getAllSentinelsPort()) {
|
|
let first = true;
|
|
while (await isPortAvailable(port)) {
|
|
if (!first) {
|
|
await setTimeout(500);
|
|
} else {
|
|
first = false;
|
|
}
|
|
await this.restartSentinel(port.toString());
|
|
}
|
|
}
|
|
}
|
|
|
|
async addSentinel() {
|
|
const quorum = this.config.sentinelQuorum?.toString() ?? "2";
|
|
const node = this.#nodeList[0];
|
|
const sentinel = await this.spawnRedisSentinelSentinelDocker();
|
|
|
|
await sentinel.client.sentinel.sentinelMonitor(this.config.sentinelName, '127.0.0.1', node.docker.port.toString(), quorum);
|
|
const options: Array<{option: RedisArgument, value: RedisArgument}> = [];
|
|
options.push({ option: "down-after-milliseconds", value: "100" });
|
|
options.push({ option: "failover-timeout", value: "5000" });
|
|
if (this.config.password !== undefined) {
|
|
options.push({ option: "auth-pass", value: this.config.password });
|
|
}
|
|
await sentinel.client.sentinel.sentinelSet(this.config.sentinelName, options);
|
|
|
|
this.#sentinelList.push(sentinel);
|
|
this.#sentinelMap.set(sentinel.docker.port.toString(), sentinel);
|
|
}
|
|
|
|
async addNode() {
|
|
const masterPort = await this.getMasterPort();
|
|
const newNode = await this.spawnRedisSentinelNodeDocker();
|
|
|
|
if (this.config.password !== undefined) {
|
|
await newNode.client.configSet({'masterauth': this.config.password})
|
|
}
|
|
await newNode.client.replicaOf('127.0.0.1', masterPort);
|
|
|
|
this.#nodeList.push(newNode);
|
|
this.#nodeMap.set(newNode.docker.port.toString(), newNode);
|
|
}
|
|
|
|
async getMaster(tracer?: Array<string>): Promise<string | undefined> {
|
|
for (const sentinel of this.#sentinelMap!.values()) {
|
|
let info;
|
|
|
|
try {
|
|
if (!sentinel.client.isReady) {
|
|
continue;
|
|
}
|
|
|
|
info = await sentinel.client.sentinel.sentinelMaster(this.config.sentinelName);
|
|
if (tracer) {
|
|
tracer.push('getMaster: master data returned from sentinel');
|
|
tracer.push(JSON.stringify(info, undefined, '\t'))
|
|
}
|
|
} catch (err) {
|
|
console.log("getMaster: sentinelMaster call failed: " + err);
|
|
continue;
|
|
}
|
|
|
|
const master = this.#nodeMap.get(info.port);
|
|
if (master === undefined) {
|
|
throw new Error(`couldn't find master node for ${info.port}`);
|
|
}
|
|
|
|
if (tracer) {
|
|
tracer.push(`getMaster: master port is either ${info.port} or ${master.docker.port}`);
|
|
}
|
|
|
|
if (!master.client.isOpen) {
|
|
throw new Error(`Sentinel's expected master node (${info.port}) is now down`);
|
|
}
|
|
|
|
return info.port;
|
|
}
|
|
|
|
throw new Error("Couldn't get master");
|
|
}
|
|
|
|
async getMasterPort(tracer?: Array<string>): Promise<number> {
|
|
const data = await this.getMaster(tracer)
|
|
|
|
return this.#nodeMap.get(data!)!.docker.port;
|
|
}
|
|
|
|
getRandomNode() {
|
|
return this.#nodeList[Math.floor(Math.random() * this.#nodeList.length)].docker.port.toString();
|
|
}
|
|
|
|
async getRandonNonMasterNode(): Promise<string> {
|
|
const masterPort = await this.getMasterPort();
|
|
while (true) {
|
|
const node = this.#nodeList[Math.floor(Math.random() * this.#nodeList.length)];
|
|
if (node.docker.port != masterPort) {
|
|
return node.docker.port.toString();
|
|
}
|
|
}
|
|
}
|
|
|
|
async stopNode(id: string) {
|
|
// console.log(`stopping node ${id}`);
|
|
let node = this.#nodeMap.get(id);
|
|
if (node === undefined) {
|
|
throw new Error("unknown node: " + id);
|
|
}
|
|
|
|
if (node.client.isOpen) {
|
|
node.client.destroy();
|
|
}
|
|
|
|
return await this.dockerStop(node.docker.dockerId);
|
|
}
|
|
|
|
async restartNode(id: string) {
|
|
let node = this.#nodeMap.get(id);
|
|
if (node === undefined) {
|
|
throw new Error("unknown node: " + id);
|
|
}
|
|
|
|
await this.dockerStart(node.docker.dockerId);
|
|
if (!node.client.isOpen) {
|
|
node.client = await RedisClient.create({
|
|
password: this.config.password,
|
|
socket: {
|
|
port: node.docker.port
|
|
}
|
|
}).on("error", () => { }).connect();
|
|
}
|
|
}
|
|
|
|
async stopSentinel(id: string) {
|
|
let sentinel = this.#sentinelMap.get(id);
|
|
if (sentinel === undefined) {
|
|
throw new Error("unknown sentinel: " + id);
|
|
}
|
|
|
|
if (sentinel.client.isOpen) {
|
|
sentinel.client.destroy();
|
|
}
|
|
|
|
return await this.dockerStop(sentinel.docker.dockerId);
|
|
}
|
|
|
|
async restartSentinel(id: string) {
|
|
let sentinel = this.#sentinelMap.get(id);
|
|
if (sentinel === undefined) {
|
|
throw new Error("unknown sentinel: " + id);
|
|
}
|
|
|
|
await this.dockerStart(sentinel.docker.dockerId);
|
|
if (!sentinel.client.isOpen) {
|
|
sentinel.client = await RedisClient.create({
|
|
modules: RedisSentinelModule,
|
|
password: this.config.password,
|
|
socket: {
|
|
port: sentinel.docker.port
|
|
}
|
|
}).on("error", () => { }).connect();
|
|
}
|
|
}
|
|
|
|
getNodePort(id: string) {
|
|
let node = this.#nodeMap.get(id);
|
|
if (node === undefined) {
|
|
throw new Error("unknown node: " + id);
|
|
}
|
|
|
|
return node.docker.port;
|
|
}
|
|
|
|
getAllNodesPort() {
|
|
let ports: Array<number> = [];
|
|
for (const node of this.#nodeList) {
|
|
ports.push(node.docker.port);
|
|
}
|
|
|
|
return ports
|
|
}
|
|
|
|
getAllDockerIds() {
|
|
let ids = new Map<string, number>();
|
|
for (const node of this.#nodeList) {
|
|
ids.set(node.docker.dockerId, node.docker.port);
|
|
}
|
|
|
|
return ids;
|
|
}
|
|
|
|
getSentinelPort(id: string) {
|
|
let sentinel = this.#sentinelMap.get(id);
|
|
if (sentinel === undefined) {
|
|
throw new Error("unknown sentinel: " + id);
|
|
}
|
|
|
|
return sentinel.docker.port;
|
|
}
|
|
|
|
getAllSentinelsPort() {
|
|
let ports: Array<number> = [];
|
|
for (const sentinel of this.#sentinelList) {
|
|
ports.push(sentinel.docker.port);
|
|
}
|
|
|
|
return ports
|
|
}
|
|
|
|
getSetinel(i: number): string {
|
|
return this.#sentinelList[i].docker.port.toString();
|
|
}
|
|
|
|
sentinelSentinels() {
|
|
for (const sentinel of this.#sentinelList) {
|
|
if (sentinel.client.isReady) {
|
|
return sentinel.client.sentinel.sentinelSentinels(this.config.sentinelName);
|
|
}
|
|
}
|
|
}
|
|
|
|
sentinelMaster() {
|
|
for (const sentinel of this.#sentinelList) {
|
|
if (sentinel.client.isReady) {
|
|
return sentinel.client.sentinel.sentinelMaster(this.config.sentinelName);
|
|
}
|
|
}
|
|
}
|
|
|
|
sentinelReplicas() {
|
|
for (const sentinel of this.#sentinelList) {
|
|
if (sentinel.client.isReady) {
|
|
return sentinel.client.sentinel.sentinelReplicas(this.config.sentinelName);
|
|
}
|
|
}
|
|
}
|
|
}
|