1
0
mirror of https://github.com/redis/node-redis.git synced 2025-12-09 21:21:11 +03:00

refactor(test): improve test scenario reliability and maintainability (#3077)

* refactor(test): improve test scenario reliability and maintainability

* tests: add resp3 check test (#1)

* test: refactor connection handoff tests with enhanced spy utility (#2)

* test: add comprehensive push notification disabled scenarios (#3)

* tests: add params config tests (#4)

* tests: add feature enablement tests (#5)

---------

Co-authored-by: Nikolay Karadzhov <nikolay.karadzhov@redis.com>
This commit is contained in:
Pavel Pashov
2025-09-18 12:45:55 +03:00
committed by GitHub
parent 073db12dbb
commit b97bbbe8c6
10 changed files with 960 additions and 444 deletions

View File

@@ -51,9 +51,10 @@ interface Client {
_pause: () => void;
_unpause: () => void;
_maintenanceUpdate: (update: MaintenanceUpdate) => void;
duplicate: (options: RedisClientOptions) => Client;
duplicate: () => Client;
connect: () => Promise<Client>;
destroy: () => void;
on: (event: string, callback: (value: unknown) => void) => void;
}
export default class EnterpriseMaintenanceManager {
@@ -211,21 +212,25 @@ export default class EnterpriseMaintenanceManager {
dbgMaintenance("Creating new tmp client");
let start = performance.now();
const tmpOptions = this.#options;
// If the URL is provided, it takes precedense
if(tmpOptions.url) {
const u = new URL(tmpOptions.url);
// the options object could just be mutated
if(this.#options.url) {
const u = new URL(this.#options.url);
u.hostname = host;
u.port = String(port);
tmpOptions.url = u.toString();
this.#options.url = u.toString();
} else {
tmpOptions.socket = {
...tmpOptions.socket,
this.#options.socket = {
...this.#options.socket,
host,
port
}
}
const tmpClient = this.#client.duplicate(tmpOptions);
const tmpClient = this.#client.duplicate();
tmpClient.on('error', (error: unknown) => {
//We dont know how to handle tmp client errors
dbgMaintenance(`[ERR]`, error)
});
dbgMaintenance(`Tmp client created in ${( performance.now() - start ).toFixed(2)}ms`);
dbgMaintenance(
`Set timeout for tmp client to ${this.#options.maintRelaxedSocketTimeout}`,

View File

@@ -0,0 +1,201 @@
import assert from "node:assert";
import diagnostics_channel from "node:diagnostics_channel";
import { DiagnosticsEvent } from "../../client/enterprise-maintenance-manager";
import {
RedisConnectionConfig,
createTestClient,
getDatabaseConfig,
getDatabaseConfigFromEnv,
getEnvConfig,
} from "./test-scenario.util";
import { createClient } from "../../..";
import { FaultInjectorClient } from "./fault-injector-client";
import { MovingEndpointType } from "../../../dist/lib/client/enterprise-maintenance-manager";
import { RedisTcpSocketOptions } from "../../client/socket";
describe("Client Configuration and Handshake", () => {
let clientConfig: RedisConnectionConfig;
let client: ReturnType<typeof createClient<any, any, any, any>>;
let faultInjectorClient: FaultInjectorClient;
let log: DiagnosticsEvent[] = [];
before(() => {
const envConfig = getEnvConfig();
const redisConfig = getDatabaseConfigFromEnv(
envConfig.redisEndpointsConfigPath,
);
faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl);
clientConfig = getDatabaseConfig(redisConfig);
diagnostics_channel.subscribe("redis.maintenance", (event) => {
log.push(event as DiagnosticsEvent);
});
});
beforeEach(() => {
log.length = 0;
});
afterEach(async () => {
if (client && client.isOpen) {
await client.flushAll();
client.destroy();
}
});
describe("Parameter Configuration", () => {
const endpoints: MovingEndpointType[] = [
"auto",
// "internal-ip",
// "internal-fqdn",
"external-ip",
"external-fqdn",
"none",
];
for (const endpointType of endpoints) {
it(`clientHandshakeWithEndpointType '${endpointType}'`, async () => {
try {
client = await createTestClient(clientConfig, {
maintMovingEndpointType: endpointType,
});
client.on("error", () => {});
//need to copy those because they will be mutated later
const oldOptions = JSON.parse(JSON.stringify(client.options));
assert.ok(oldOptions);
const { action_id } = await faultInjectorClient.migrateAndBindAction({
bdbId: clientConfig.bdbId,
clusterIndex: 0,
});
await faultInjectorClient.waitForAction(action_id);
const movingEvent = log.find((event) => event.type === "MOVING");
assert(!!movingEvent, "Didnt receive moving PN");
let endpoint: string | undefined;
try {
//@ts-ignore
endpoint = movingEvent.data.push[3];
} catch (err) {
assert(
false,
`couldnt get endpoint from event ${JSON.stringify(movingEvent)}`,
);
}
assert(endpoint !== undefined, "no endpoint");
const newOptions = client.options;
assert.ok(newOptions);
if (oldOptions?.url) {
if (endpointType === "none") {
assert.equal(
newOptions!.url,
oldOptions.url,
"For movingEndpointTpe 'none', we expect old and new url to be the same",
);
} else {
assert.equal(
newOptions.url,
endpoint,
"Expected what came through the wire to be set in the new client",
);
assert.notEqual(
newOptions!.url,
oldOptions.url,
`For movingEndpointTpe ${endpointType}, we expect old and new url to be different`,
);
}
} else {
const oldSocket = oldOptions.socket as RedisTcpSocketOptions;
const newSocket = newOptions.socket as RedisTcpSocketOptions;
assert.ok(oldSocket);
assert.ok(newSocket);
if (endpointType === "none") {
assert.equal(
newSocket.host,
oldSocket.host,
"For movingEndpointTpe 'none', we expect old and new host to be the same",
);
} else {
assert.equal(
newSocket.host + ":" + newSocket.port,
endpoint,
"Expected what came through the wire to be set in the new client",
);
assert.notEqual(
newSocket.host,
oldSocket.host,
`For movingEndpointTpe ${endpointType}, we expect old and new host to be different`,
);
}
}
} catch (error: any) {
if (
endpointType === "internal-fqdn" ||
endpointType === "internal-ip"
) {
// errors are expected here, because we cannot connect to internal endpoints unless we are deployed in the same place as the server
} else {
assert(false, error);
}
}
});
}
});
describe("Feature Enablement", () => {
it("connectionHandshakeIncludesEnablingNotifications", async () => {
client = await createTestClient(clientConfig, {
maintPushNotifications: "enabled",
});
const { action_id } = await faultInjectorClient.migrateAndBindAction({
bdbId: clientConfig.bdbId,
clusterIndex: 0,
});
await faultInjectorClient.waitForAction(action_id);
let movingEvent = false;
let migratingEvent = false;
let migratedEvent = false;
for (const event of log) {
if (event.type === "MOVING") movingEvent = true;
if (event.type === "MIGRATING") migratingEvent = true;
if (event.type === "MIGRATED") migratedEvent = true;
}
assert.ok(movingEvent, "didnt receive MOVING PN");
assert.ok(migratingEvent, "didnt receive MIGRATING PN");
assert.ok(migratedEvent, "didnt receive MIGRATED PN");
});
it("disabledDontReceiveNotifications", async () => {
try {
client = await createTestClient(clientConfig, {
maintPushNotifications: "disabled",
socket: {
reconnectStrategy: false
}
});
client.on('error', console.log.bind(console))
const { action_id } = await faultInjectorClient.migrateAndBindAction({
bdbId: clientConfig.bdbId,
clusterIndex: 0,
});
await faultInjectorClient.waitForAction(action_id);
assert.equal(log.length, 0, "received a PN while feature is disabled");
} catch (error: any) { }
});
});
});

View File

@@ -1,78 +1,162 @@
import diagnostics_channel from "node:diagnostics_channel";
import { FaultInjectorClient } from "./fault-injector-client";
import {
createTestClient,
getDatabaseConfig,
getDatabaseConfigFromEnv,
getEnvConfig,
RedisConnectionConfig,
} from "./test-scenario.util";
import { createClient } from "../../..";
import { DiagnosticsEvent } from "../../client/enterprise-maintenance-manager";
import { createClient, RedisClientOptions } from "../../..";
import { before } from "mocha";
import { spy } from "sinon";
import Sinon, { SinonSpy, spy, stub } from "sinon";
import assert from "node:assert";
import { TestCommandRunner } from "./test-command-runner";
import net from "node:net";
describe("Connection Handoff", () => {
const diagnosticsLog: DiagnosticsEvent[] = [];
/**
* Creates a spy on a duplicated client method
* @param client - The Redis client instance
* @param funcName - The name of the method to spy on
* @returns Object containing the promise that resolves with the spy and restore function
*/
const spyOnTemporaryClientInstanceMethod = (
client: ReturnType<typeof createClient<any, any, any, any>>,
methodName: string
) => {
const { promise, resolve } = (
Promise as typeof Promise & {
withResolvers: () => {
promise: Promise<{ spy: SinonSpy<any[], any>; restore: () => void }>;
resolve: (value: any) => void;
};
}
).withResolvers();
const onMessageHandler = (message: unknown) => {
diagnosticsLog.push(message as DiagnosticsEvent);
const originalDuplicate = client.duplicate.bind(client);
const duplicateStub: Sinon.SinonStub<any[], any> = stub(
// Temporary clients (in the context of hitless upgrade)
// are created by calling the duplicate method on the client.
Object.getPrototypeOf(client),
"duplicate"
).callsFake((opts) => {
const tmpClient = originalDuplicate(opts);
resolve({
spy: spy(tmpClient, methodName),
restore: duplicateStub.restore,
});
return tmpClient;
});
return {
getSpy: () => promise,
};
};
describe("Connection Handoff", () => {
let clientConfig: RedisConnectionConfig;
let client: ReturnType<typeof createClient<any, any, any, 3>>;
let client: ReturnType<typeof createClient<any, any, any, any>>;
let faultInjectorClient: FaultInjectorClient;
let connectSpy = spy(net, "createConnection");
before(() => {
const envConfig = getEnvConfig();
const redisConfig = getDatabaseConfigFromEnv(
envConfig.redisEndpointsConfigPath,
envConfig.redisEndpointsConfigPath
);
faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl);
clientConfig = getDatabaseConfig(redisConfig);
});
beforeEach(async () => {
diagnosticsLog.length = 0;
diagnostics_channel.subscribe("redis.maintenance", onMessageHandler);
connectSpy.resetHistory();
client = createClient({
socket: {
host: clientConfig.host,
port: clientConfig.port,
...(clientConfig.tls === true ? { tls: true } : {}),
},
password: clientConfig.password,
username: clientConfig.username,
RESP: 3,
maintPushNotifications: "auto",
maintMovingEndpointType: "external-ip",
maintRelaxedCommandTimeout: 10000,
maintRelaxedSocketTimeout: 10000,
});
client.on("error", (err: Error) => {
throw new Error(`Client error: ${err.message}`);
});
await client.connect();
afterEach(async () => {
if (client && client.isOpen) {
await client.flushAll();
});
afterEach(() => {
diagnostics_channel.unsubscribe("redis.maintenance", onMessageHandler);
client.destroy();
}
});
describe("New Connection Establishment", () => {
it("should establish new connection", async () => {
assert.equal(connectSpy.callCount, 1);
describe("New Connection Establishment & Traffic Resumption", () => {
const cases: Array<{
name: string;
clientOptions: Partial<RedisClientOptions>;
}> = [
{
name: "default options",
clientOptions: {},
},
{
name: "external-ip",
clientOptions: {
maintMovingEndpointType: "external-ip",
},
},
{
name: "external-fqdn",
clientOptions: {
maintMovingEndpointType: "external-fqdn",
},
},
{
name: "auto",
clientOptions: {
maintMovingEndpointType: "auto",
},
},
{
name: "none",
clientOptions: {
maintMovingEndpointType: "none",
},
},
];
for (const { name, clientOptions } of cases) {
it(`should establish new connection and resume traffic afterwards - ${name}`, async () => {
client = await createTestClient(clientConfig, clientOptions);
const spyObject = spyOnTemporaryClientInstanceMethod(client, "connect");
// PART 1 Establish initial connection
const { action_id: lowTimeoutBindAndMigrateActionId } =
await faultInjectorClient.migrateAndBindAction({
bdbId: clientConfig.bdbId,
clusterIndex: 0,
});
await faultInjectorClient.waitForAction(
lowTimeoutBindAndMigrateActionId
);
const spyResult = await spyObject.getSpy();
assert.strictEqual(spyResult.spy.callCount, 1);
// PART 2 Verify traffic resumption
const currentTime = Date.now().toString();
await client.set("key", currentTime);
const result = await client.get("key");
assert.strictEqual(result, currentTime);
spyResult.restore();
});
}
});
describe("TLS Connection Handoff", () => {
it.skip("TODO receiveMessagesWithTLSEnabledTest", async () => {
//
});
it.skip("TODO connectionHandoffWithStaticInternalNameTest", async () => {
//
});
it.skip("TODO connectionHandoffWithStaticExternalNameTest", async () => {
//
});
});
describe("Connection Cleanup", () => {
it("should shut down old connection", async () => {
const spyObject = spyOnTemporaryClientInstanceMethod(client, "destroy");
const { action_id: lowTimeoutBindAndMigrateActionId } =
await faultInjectorClient.migrateAndBindAction({
@@ -80,47 +164,13 @@ describe("Connection Handoff", () => {
clusterIndex: 0,
});
const lowTimeoutWaitPromise = faultInjectorClient.waitForAction(
lowTimeoutBindAndMigrateActionId,
);
await faultInjectorClient.waitForAction(lowTimeoutBindAndMigrateActionId);
await lowTimeoutWaitPromise;
assert.equal(connectSpy.callCount, 2);
});
});
const spyResult = await spyObject.getSpy();
describe("TLS Connection Handoff", () => {
it("TODO receiveMessagesWithTLSEnabledTest", async () => {
//
});
it("TODO connectionHandoffWithStaticInternalNameTest", async () => {
//
});
it("TODO connectionHandoffWithStaticExternalNameTest", async () => {
//
});
});
assert.equal(spyResult.spy.callCount, 1);
describe("Traffic Resumption", () => {
it("Traffic resumed after handoff", async () => {
const { action_id } = await faultInjectorClient.migrateAndBindAction({
bdbId: clientConfig.bdbId,
clusterIndex: 0,
});
const workloadPromise = faultInjectorClient.waitForAction(action_id);
const commandPromises =
await TestCommandRunner.fireCommandsUntilStopSignal(
client,
workloadPromise,
);
const rejected = (
await Promise.all(commandPromises.commandPromises)
).filter((result) => result.status === "rejected");
assert.ok(rejected.length === 0);
spyResult.restore();
});
});
});

View File

@@ -9,7 +9,8 @@ export type ActionType =
| "execute_rlutil_command"
| "execute_rladmin_command"
| "migrate"
| "bind";
| "bind"
| "update_cluster_config";
export interface ActionRequest {
type: ActionType;
@@ -47,7 +48,9 @@ export class FaultInjectorClient {
* @param action The action request to trigger
* @throws {Error} When the HTTP request fails or response cannot be parsed as JSON
*/
public triggerAction<T = unknown>(action: ActionRequest): Promise<T> {
public triggerAction<T extends { action_id: string }>(
action: ActionRequest
): Promise<T> {
return this.#request<T>("POST", "/action", action);
}
@@ -60,20 +63,6 @@ export class FaultInjectorClient {
return this.#request<T>("GET", `/action/${actionId}`);
}
/**
* Executes an rladmin command.
* @param command The rladmin command to execute
* @param bdbId Optional database ID to target
* @throws {Error} When the HTTP request fails or response cannot be parsed as JSON
*/
public executeRladminCommand<T = unknown>(
command: string,
bdbId?: string
): Promise<T> {
const cmd = bdbId ? `rladmin -b ${bdbId} ${command}` : `rladmin ${command}`;
return this.#request<T>("POST", "/rladmin", cmd);
}
/**
* Waits for an action to complete.
* @param actionId The ID of the action to wait for

View File

@@ -0,0 +1,15 @@
import assert from "assert";
import { createClient } from "../../..";
describe("Negative tests", () => {
it("should only be enabled with RESP3", () => {
assert.throws(
() =>
createClient({
RESP: 2,
maintPushNotifications: "enabled",
}),
"Error: Graceful Maintenance is only supported with RESP3",
);
});
});

View File

@@ -2,6 +2,7 @@ import assert from "node:assert";
import diagnostics_channel from "node:diagnostics_channel";
import { FaultInjectorClient } from "./fault-injector-client";
import {
createTestClient,
getDatabaseConfig,
getDatabaseConfigFromEnv,
getEnvConfig,
@@ -12,14 +13,21 @@ import { DiagnosticsEvent } from "../../client/enterprise-maintenance-manager";
import { before } from "mocha";
describe("Push Notifications", () => {
const diagnosticsLog: DiagnosticsEvent[] = [];
const onMessageHandler = (message: unknown) => {
diagnosticsLog.push(message as DiagnosticsEvent);
const createNotificationMessageHandler = (
result: Record<DiagnosticsEvent["type"], number>,
notifications: Array<DiagnosticsEvent["type"]>
) => {
return (message: unknown) => {
if (notifications.includes((message as DiagnosticsEvent).type)) {
const event = message as DiagnosticsEvent;
result[event.type] = (result[event.type] ?? 0) + 1;
}
};
};
let onMessageHandler: ReturnType<typeof createNotificationMessageHandler>;
let clientConfig: RedisConnectionConfig;
let client: ReturnType<typeof createClient<any, any, any, 3>>;
let client: ReturnType<typeof createClient<any, any, any, any>>;
let faultInjectorClient: FaultInjectorClient;
before(() => {
@@ -32,63 +40,310 @@ describe("Push Notifications", () => {
clientConfig = getDatabaseConfig(redisConfig);
});
beforeEach(async () => {
diagnosticsLog.length = 0;
diagnostics_channel.subscribe("redis.maintenance", onMessageHandler);
client = createClient({
socket: {
host: clientConfig.host,
port: clientConfig.port,
...(clientConfig.tls === true ? { tls: true } : {}),
},
password: clientConfig.password,
username: clientConfig.username,
RESP: 3,
maintPushNotifications: "auto",
maintMovingEndpointType: "external-ip",
maintRelaxedCommandTimeout: 10000,
maintRelaxedSocketTimeout: 10000,
});
client.on("error", (err: Error) => {
throw new Error(`Client error: ${err.message}`);
});
await client.connect();
});
afterEach(() => {
if (onMessageHandler!) {
diagnostics_channel.unsubscribe("redis.maintenance", onMessageHandler);
}
if (client && client.isOpen) {
client.destroy();
}
});
describe("Push Notifications Enabled", () => {
beforeEach(async () => {
client = await createTestClient(clientConfig);
await client.flushAll();
});
it("should receive MOVING, MIGRATING, and MIGRATED push notifications", async () => {
const { action_id: migrateActionId } =
await faultInjectorClient.triggerAction<{ action_id: string }>({
type: "migrate",
const notifications: Array<DiagnosticsEvent["type"]> = [
"MOVING",
"MIGRATING",
"MIGRATED",
];
const diagnosticsMap: Record<DiagnosticsEvent["type"], number> = {};
onMessageHandler = createNotificationMessageHandler(
diagnosticsMap,
notifications
);
diagnostics_channel.subscribe("redis.maintenance", onMessageHandler);
const { action_id: bindAndMigrateActionId } =
await faultInjectorClient.migrateAndBindAction({
bdbId: clientConfig.bdbId,
clusterIndex: 0,
});
await faultInjectorClient.waitForAction(bindAndMigrateActionId);
assert.strictEqual(
diagnosticsMap.MOVING,
1,
"Should have received exactly one MOVING notification"
);
assert.strictEqual(
diagnosticsMap.MIGRATING,
1,
"Should have received exactly one MIGRATING notification"
);
assert.strictEqual(
diagnosticsMap.MIGRATED,
1,
"Should have received exactly one MIGRATED notification"
);
});
it("should receive FAILING_OVER and FAILED_OVER push notifications", async () => {
const notifications: Array<DiagnosticsEvent["type"]> = [
"FAILING_OVER",
"FAILED_OVER",
];
const diagnosticsMap: Record<DiagnosticsEvent["type"], number> = {};
onMessageHandler = createNotificationMessageHandler(
diagnosticsMap,
notifications
);
diagnostics_channel.subscribe("redis.maintenance", onMessageHandler);
const { action_id: failoverActionId } =
await faultInjectorClient.triggerAction({
type: "failover",
parameters: {
cluster_index: "0",
bdb_id: clientConfig.bdbId.toString(),
cluster_index: 0,
},
});
await faultInjectorClient.waitForAction(migrateActionId);
await faultInjectorClient.waitForAction(failoverActionId);
const { action_id: bindActionId } =
await faultInjectorClient.triggerAction<{ action_id: string }>({
type: "bind",
assert.strictEqual(
diagnosticsMap.FAILING_OVER,
1,
"Should have received exactly one FAILING_OVER notification"
);
assert.strictEqual(
diagnosticsMap.FAILED_OVER,
1,
"Should have received exactly one FAILED_OVER notification"
);
});
});
describe("Push Notifications Disabled - Client", () => {
beforeEach(async () => {
client = await createTestClient(clientConfig, {
maintPushNotifications: "disabled",
});
client.on("error", (_err) => {
// Expect the socket to be closed
// Ignore errors
});
await client.flushAll();
});
it("should NOT receive MOVING, MIGRATING, and MIGRATED push notifications", async () => {
const notifications: Array<DiagnosticsEvent["type"]> = [
"MOVING",
"MIGRATING",
"MIGRATED",
];
const diagnosticsMap: Record<DiagnosticsEvent["type"], number> = {};
onMessageHandler = createNotificationMessageHandler(
diagnosticsMap,
notifications
);
diagnostics_channel.subscribe("redis.maintenance", onMessageHandler);
const { action_id: bindAndMigrateActionId } =
await faultInjectorClient.migrateAndBindAction({
bdbId: clientConfig.bdbId,
clusterIndex: 0,
});
await faultInjectorClient.waitForAction(bindAndMigrateActionId);
assert.strictEqual(
diagnosticsMap.MOVING,
undefined,
"Should NOT have received exactly one MOVING notification"
);
assert.strictEqual(
diagnosticsMap.MIGRATING,
undefined,
"Should NOT have received exactly one MIGRATING notification"
);
assert.strictEqual(
diagnosticsMap.MIGRATED,
undefined,
"Should NOT have received exactly one MIGRATED notification"
);
});
it("should NOT receive FAILING_OVER and FAILED_OVER push notifications", async () => {
const notifications: Array<DiagnosticsEvent["type"]> = [
"FAILING_OVER",
"FAILED_OVER",
];
const diagnosticsMap: Record<DiagnosticsEvent["type"], number> = {};
onMessageHandler = createNotificationMessageHandler(
diagnosticsMap,
notifications
);
diagnostics_channel.subscribe("redis.maintenance", onMessageHandler);
const { action_id: failoverActionId } =
await faultInjectorClient.triggerAction({
type: "failover",
parameters: {
cluster_index: "0",
bdb_id: `${clientConfig.bdbId}`,
bdb_id: clientConfig.bdbId.toString(),
cluster_index: 0,
},
});
await faultInjectorClient.waitForAction(bindActionId);
await faultInjectorClient.waitForAction(failoverActionId);
const pushNotificationLogs = diagnosticsLog.filter((log) => {
return ["MOVING", "MIGRATING", "MIGRATED"].includes(log?.type);
assert.strictEqual(
diagnosticsMap.FAILING_OVER,
undefined,
"Should have received exactly one FAILING_OVER notification"
);
assert.strictEqual(
diagnosticsMap.FAILED_OVER,
undefined,
"Should have received exactly one FAILED_OVER notification"
);
});
});
assert.strictEqual(pushNotificationLogs.length, 3);
describe("Push Notifications Disabled - Server", () => {
beforeEach(async () => {
client = await createTestClient(clientConfig);
client.on("error", (_err) => {
// Expect the socket to be closed
// Ignore errors
});
await client.flushAll();
});
before(async () => {
const { action_id: disablePushNotificationsActionId } =
await faultInjectorClient.triggerAction({
type: "update_cluster_config",
parameters: {
config: { client_maint_notifications: false },
},
});
await faultInjectorClient.waitForAction(disablePushNotificationsActionId);
});
after(async () => {
const { action_id: enablePushNotificationsActionId } =
await faultInjectorClient.triggerAction({
type: "update_cluster_config",
parameters: {
config: { client_maint_notifications: true },
},
});
await faultInjectorClient.waitForAction(enablePushNotificationsActionId);
});
it("should NOT receive MOVING, MIGRATING, and MIGRATED push notifications", async () => {
const notifications: Array<DiagnosticsEvent["type"]> = [
"MOVING",
"MIGRATING",
"MIGRATED",
];
const diagnosticsMap: Record<DiagnosticsEvent["type"], number> = {};
onMessageHandler = createNotificationMessageHandler(
diagnosticsMap,
notifications
);
diagnostics_channel.subscribe("redis.maintenance", onMessageHandler);
const { action_id: bindAndMigrateActionId } =
await faultInjectorClient.migrateAndBindAction({
bdbId: clientConfig.bdbId,
clusterIndex: 0,
});
await faultInjectorClient.waitForAction(bindAndMigrateActionId);
assert.strictEqual(
diagnosticsMap.MOVING,
undefined,
"Should NOT have received exactly one MOVING notification"
);
assert.strictEqual(
diagnosticsMap.MIGRATING,
undefined,
"Should NOT have received exactly one MIGRATING notification"
);
assert.strictEqual(
diagnosticsMap.MIGRATED,
undefined,
"Should NOT have received exactly one MIGRATED notification"
);
});
it("should NOT receive FAILING_OVER and FAILED_OVER push notifications", async () => {
const notifications: Array<DiagnosticsEvent["type"]> = [
"FAILING_OVER",
"FAILED_OVER",
];
const diagnosticsMap: Record<DiagnosticsEvent["type"], number> = {};
onMessageHandler = createNotificationMessageHandler(
diagnosticsMap,
notifications
);
diagnostics_channel.subscribe("redis.maintenance", onMessageHandler);
const { action_id: failoverActionId } =
await faultInjectorClient.triggerAction({
type: "failover",
parameters: {
bdb_id: clientConfig.bdbId.toString(),
cluster_index: 0,
},
});
await faultInjectorClient.waitForAction(failoverActionId);
assert.strictEqual(
diagnosticsMap.FAILING_OVER,
undefined,
"Should have received exactly one FAILING_OVER notification"
);
assert.strictEqual(
diagnosticsMap.FAILED_OVER,
undefined,
"Should have received exactly one FAILED_OVER notification"
);
});
});
});

View File

@@ -1,108 +0,0 @@
import { randomUUID } from "node:crypto";
import { setTimeout } from "node:timers/promises";
import { createClient } from "../../..";
/**
* Options for the `fireCommandsUntilStopSignal` method
*/
type FireCommandsUntilStopSignalOptions = {
/**
* Number of commands to fire in each batch
*/
batchSize: number;
/**
* Timeout between batches in milliseconds
*/
timeoutMs: number;
/**
* Function that creates the commands to be executed
*/
createCommands: (
client: ReturnType<typeof createClient<any, any, any, any>>
) => Array<() => Promise<unknown>>;
};
/**
* Utility class for running test commands until a stop signal is received
*/
export class TestCommandRunner {
private static readonly defaultOptions: FireCommandsUntilStopSignalOptions = {
batchSize: 60,
timeoutMs: 10,
createCommands: (
client: ReturnType<typeof createClient<any, any, any, any>>
) => [
() => client.set(randomUUID(), Date.now()),
() => client.get(randomUUID()),
],
};
static #toSettled<T>(p: Promise<T>) {
return p
.then((value) => ({ status: "fulfilled" as const, value, error: null }))
.catch((reason) => ({
status: "rejected" as const,
value: null,
error: reason,
}));
}
static async #racePromises<S, T>({
timeout,
stopper,
}: {
timeout: Promise<S>;
stopper: Promise<T>;
}) {
return Promise.race([
TestCommandRunner.#toSettled<S>(timeout).then((result) => ({
...result,
stop: false,
})),
TestCommandRunner.#toSettled<T>(stopper).then((result) => ({
...result,
stop: true,
})),
]);
}
/**
* Fires a batch of test commands until a stop signal is received
* @param client - The Redis client to use
* @param stopSignalPromise - Promise that resolves when the execution should stop
* @param options - Options for the command execution
* @returns An object containing the promises of all executed commands and the result of the stop signal
*/
static async fireCommandsUntilStopSignal(
client: ReturnType<typeof createClient<any, any, any, any>>,
stopSignalPromise: Promise<unknown>,
options?: Partial<FireCommandsUntilStopSignalOptions>
) {
const executeOptions = {
...TestCommandRunner.defaultOptions,
...options,
};
const commandPromises = [];
while (true) {
for (let i = 0; i < executeOptions.batchSize; i++) {
for (const command of executeOptions.createCommands(client)) {
commandPromises.push(TestCommandRunner.#toSettled(command()));
}
}
const result = await TestCommandRunner.#racePromises({
timeout: setTimeout(executeOptions.timeoutMs),
stopper: stopSignalPromise,
});
if (result.stop) {
return {
commandPromises,
stopResult: result,
};
}
}
}
}

View File

@@ -110,8 +110,18 @@ export function getDatabaseConfig(
};
}
// TODO this should be moved in the tests utils package
export async function blockSetImmediate(fn: () => Promise<unknown>) {
/**
* Executes the provided function in a context where setImmediate is stubbed to not do anything.
* This blocks setImmediate callbacks from executing
*
* @param command - The command to execute
* @returns The error and duration of the command execution
*/
export async function blockCommand(command: () => Promise<unknown>) {
let error: any;
const start = performance.now();
let setImmediateStub: any;
try {
@@ -119,79 +129,46 @@ export async function blockSetImmediate(fn: () => Promise<unknown>) {
setImmediateStub.callsFake(() => {
//Dont call the callback, effectively blocking execution
});
await fn();
await command();
} catch (err: any) {
error = err;
} finally {
if (setImmediateStub) {
setImmediateStub.restore();
}
}
return {
error,
duration: performance.now() - start,
};
}
/**
* Factory class for creating and managing Redis clients
*/
export class ClientFactory {
private readonly clients = new Map<
string,
ReturnType<typeof createClient<any, any, any, any>>
>();
constructor(private readonly config: RedisConnectionConfig) {}
/**
* Creates a new client with the specified options and connects it to the database
* @param key - The key to store the client under
* Creates a test client with the provided configuration, connects it and attaches an error handler listener
* @param clientConfig - The Redis connection configuration
* @param options - Optional client options
* @returns The created and connected client
* @returns The created Redis client
*/
async create(key: string, options: Partial<RedisClientOptions> = {}) {
export async function createTestClient(
clientConfig: RedisConnectionConfig,
options: Partial<RedisClientOptions> = {}
) {
const client = createClient({
socket: {
host: this.config.host,
port: this.config.port,
...(this.config.tls === true ? { tls: true } : {}),
host: clientConfig.host,
port: clientConfig.port,
...(clientConfig.tls === true ? { tls: true } : {}),
},
password: this.config.password,
username: this.config.username,
password: clientConfig.password,
username: clientConfig.username,
RESP: 3,
maintPushNotifications: "auto",
maintMovingEndpointType: "auto",
...options,
});
client.on("error", (err: Error) => {
throw new Error(`Client error: ${err.message}`);
});
await client.connect();
this.clients.set(key, client);
return client;
}
/**
* Gets an existing client by key or the first one if no key is provided
* @param key - The key of the client to retrieve
* @returns The client if found, undefined otherwise
*/
get(key?: string) {
if (key) {
return this.clients.get(key);
}
// Get the first one if no key is provided
return this.clients.values().next().value;
}
/**
* Destroys all created clients
*/
destroyAll() {
this.clients.forEach((client) => {
if (client && client.isOpen) {
client.destroy();
}
});
}
}

View File

@@ -2,22 +2,48 @@ import assert from "node:assert";
import { FaultInjectorClient } from "./fault-injector-client";
import {
ClientFactory,
getDatabaseConfig,
getDatabaseConfigFromEnv,
getEnvConfig,
RedisConnectionConfig,
blockSetImmediate
blockCommand,
createTestClient,
} from "./test-scenario.util";
import { createClient } from "../../..";
import { before } from "mocha";
import { TestCommandRunner } from "./test-command-runner";
import diagnostics_channel from "node:diagnostics_channel";
import { DiagnosticsEvent } from "../../client/enterprise-maintenance-manager";
describe("Timeout Handling During Notifications", () => {
let clientConfig: RedisConnectionConfig;
let clientFactory: ClientFactory;
let faultInjectorClient: FaultInjectorClient;
let defaultClient: ReturnType<typeof createClient<any, any, any, any>>;
let client: ReturnType<typeof createClient<any, any, any, any>>;
const NORMAL_COMMAND_TIMEOUT = 50;
const RELAXED_COMMAND_TIMEOUT = 2000;
/**
* Creates a handler for the `redis.maintenance` channel that will execute and block a command on the client
* when a notification is received and save the result in the `result` object.
* This is used to test that the command timeout is relaxed during notifications.
*/
const createNotificationMessageHandler = (
client: ReturnType<typeof createClient<any, any, any, any>>,
result: Record<DiagnosticsEvent["type"], { error: any; duration: number }>,
notifications: Array<DiagnosticsEvent["type"]>
) => {
return (message: unknown) => {
if (notifications.includes((message as DiagnosticsEvent).type)) {
setImmediate(async () => {
result[(message as DiagnosticsEvent).type] = await blockCommand(
async () => {
await client.set("key", "value");
}
);
});
}
};
};
before(() => {
const envConfig = getEnvConfig();
@@ -27,96 +53,64 @@ describe("Timeout Handling During Notifications", () => {
clientConfig = getDatabaseConfig(redisConfig);
faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl);
clientFactory = new ClientFactory(clientConfig);
});
beforeEach(async () => {
defaultClient = await clientFactory.create("default");
await defaultClient.flushAll();
client = await createTestClient(clientConfig, {
commandOptions: { timeout: NORMAL_COMMAND_TIMEOUT },
maintRelaxedCommandTimeout: RELAXED_COMMAND_TIMEOUT,
});
afterEach(async () => {
clientFactory.destroyAll();
await client.flushAll();
});
it("should relax command timeout on MOVING, MIGRATING, and MIGRATED", async () => {
afterEach(() => {
if (client && client.isOpen) {
client.destroy();
}
});
it("should relax command timeout on MOVING, MIGRATING", async () => {
// PART 1
// Set very low timeout to trigger errors
const lowTimeoutClient = await clientFactory.create("lowTimeout", {
maintRelaxedCommandTimeout: 50,
// Normal command timeout
const { error, duration } = await blockCommand(async () => {
await client.set("key", "value");
});
const { action_id: lowTimeoutBindAndMigrateActionId } =
await faultInjectorClient.migrateAndBindAction({
bdbId: clientConfig.bdbId,
clusterIndex: 0,
});
const lowTimeoutWaitPromise = faultInjectorClient.waitForAction(
lowTimeoutBindAndMigrateActionId
assert.ok(
error instanceof Error,
"Command Timeout error should be instanceof Error"
);
const lowTimeoutCommandPromises =
await TestCommandRunner.fireCommandsUntilStopSignal(
lowTimeoutClient,
lowTimeoutWaitPromise
assert.ok(
duration > NORMAL_COMMAND_TIMEOUT &&
duration < NORMAL_COMMAND_TIMEOUT * 1.1,
`Normal command should timeout within normal timeout ms`
);
const lowTimeoutRejectedCommands = (
await Promise.all(lowTimeoutCommandPromises.commandPromises)
).filter((result) => result.status === "rejected");
assert.ok(lowTimeoutRejectedCommands.length > 0);
assert.strictEqual(
lowTimeoutRejectedCommands.filter((rejected) => {
return (
// TODO instanceof doesn't work for some reason
rejected.error.constructor.name ===
"CommandTimeoutDuringMaintananceError"
);
}).length,
lowTimeoutRejectedCommands.length
error?.constructor?.name,
"TimeoutError",
"Command Timeout error should be TimeoutError"
);
// PART 2
// Set high timeout to avoid errors
const highTimeoutClient = await clientFactory.create("highTimeout", {
maintRelaxedCommandTimeout: 10000,
});
// Command timeout during maintenance
const notifications: Array<DiagnosticsEvent["type"]> = [
"MOVING",
"MIGRATING",
];
const { action_id: highTimeoutBindAndMigrateActionId } =
await faultInjectorClient.migrateAndBindAction({
bdbId: clientConfig.bdbId,
clusterIndex: 0,
});
const result: Record<
DiagnosticsEvent["type"],
{ error: any; duration: number }
> = {};
const highTimeoutWaitPromise = faultInjectorClient.waitForAction(
highTimeoutBindAndMigrateActionId
const onMessageHandler = createNotificationMessageHandler(
client,
result,
notifications
);
const highTimeoutCommandPromises =
await TestCommandRunner.fireCommandsUntilStopSignal(
highTimeoutClient,
highTimeoutWaitPromise
);
const highTimeoutRejectedCommands = (
await Promise.all(highTimeoutCommandPromises.commandPromises)
).filter((result) => result.status === "rejected");
assert.strictEqual(highTimeoutRejectedCommands.length, 0);
});
it("should unrelax command timeout after MAINTENANCE", async () => {
const clientWithCommandTimeout = await clientFactory.create(
"clientWithCommandTimeout",
{
commandOptions: {
timeout: 100,
},
}
);
diagnostics_channel.subscribe("redis.maintenance", onMessageHandler);
const { action_id: bindAndMigrateActionId } =
await faultInjectorClient.migrateAndBindAction({
@@ -124,36 +118,173 @@ describe("Timeout Handling During Notifications", () => {
clusterIndex: 0,
});
const lowTimeoutWaitPromise = faultInjectorClient.waitForAction(
bindAndMigrateActionId
await faultInjectorClient.waitForAction(bindAndMigrateActionId);
diagnostics_channel.unsubscribe("redis.maintenance", onMessageHandler);
notifications.forEach((notification) => {
assert.ok(
result[notification]?.error instanceof Error,
`${notification} notification error should be instanceof Error`
);
assert.ok(
result[notification]?.duration > RELAXED_COMMAND_TIMEOUT &&
result[notification]?.duration < RELAXED_COMMAND_TIMEOUT * 1.1,
`${notification} notification should timeout within relaxed timeout`
);
assert.strictEqual(
result[notification]?.error?.constructor?.name,
"CommandTimeoutDuringMaintenanceError",
`${notification} notification error should be CommandTimeoutDuringMaintenanceError`
);
});
});
it("should unrelax command timeout after MIGRATED and MOVING", async () => {
const { action_id: migrateActionId } =
await faultInjectorClient.triggerAction({
type: "migrate",
parameters: {
cluster_index: 0,
},
});
await faultInjectorClient.waitForAction(migrateActionId);
// PART 1
// After migration
const { error: errorMigrate, duration: durationMigrate } =
await blockCommand(async () => {
await client.set("key", "value");
});
assert.ok(
errorMigrate instanceof Error,
"Command Timeout error should be instanceof Error"
);
assert.ok(
durationMigrate > NORMAL_COMMAND_TIMEOUT &&
durationMigrate < NORMAL_COMMAND_TIMEOUT * 1.1,
`Normal command should timeout within normal timeout ms`
);
assert.strictEqual(
errorMigrate?.constructor?.name,
"TimeoutError",
"Command Timeout error should be TimeoutError"
);
const relaxedTimeoutCommandPromises =
await TestCommandRunner.fireCommandsUntilStopSignal(
clientWithCommandTimeout,
lowTimeoutWaitPromise
);
const relaxedTimeoutRejectedCommands = (
await Promise.all(relaxedTimeoutCommandPromises.commandPromises)
).filter((result) => result.status === "rejected");
assert.ok(relaxedTimeoutRejectedCommands.length === 0);
const start = performance.now();
let error: any;
await blockSetImmediate(async () => {
try {
await clientWithCommandTimeout.set("key", "value");
} catch (err: any) {
error = err;
const { action_id: bindActionId } = await faultInjectorClient.triggerAction(
{
type: "bind",
parameters: {
bdb_id: clientConfig.bdbId.toString(),
cluster_index: 0,
},
}
);
await faultInjectorClient.waitForAction(bindActionId);
// PART 2
// After bind
const { error: errorBind, duration: durationBind } = await blockCommand(
async () => {
await client.set("key", "value");
}
);
assert.ok(
errorBind instanceof Error,
"Command Timeout error should be instanceof Error"
);
assert.ok(
durationBind > NORMAL_COMMAND_TIMEOUT &&
durationBind < NORMAL_COMMAND_TIMEOUT * 1.1,
`Normal command should timeout within normal timeout ms`
);
assert.strictEqual(
errorBind?.constructor?.name,
"TimeoutError",
"Command Timeout error should be TimeoutError"
);
});
// Make sure it took less than 1sec to fail
assert.ok(performance.now() - start < 1000);
assert.ok(error instanceof Error);
assert.ok(error.constructor.name === "TimeoutError");
it("should relax command timeout on FAILING_OVER", async () => {
const notifications: Array<DiagnosticsEvent["type"]> = ["FAILING_OVER"];
const result: Record<
DiagnosticsEvent["type"],
{ error: any; duration: number }
> = {};
const onMessageHandler = createNotificationMessageHandler(
client,
result,
notifications
);
diagnostics_channel.subscribe("redis.maintenance", onMessageHandler);
const { action_id: failoverActionId } =
await faultInjectorClient.triggerAction({
type: "failover",
parameters: {
bdb_id: clientConfig.bdbId.toString(),
cluster_index: 0,
},
});
await faultInjectorClient.waitForAction(failoverActionId);
diagnostics_channel.unsubscribe("redis.maintenance", onMessageHandler);
notifications.forEach((notification) => {
assert.ok(
result[notification]?.error instanceof Error,
`${notification} notification error should be instanceof Error`
);
assert.ok(
result[notification]?.duration > RELAXED_COMMAND_TIMEOUT &&
result[notification]?.duration < RELAXED_COMMAND_TIMEOUT * 1.1,
`${notification} notification should timeout within relaxed timeout`
);
assert.strictEqual(
result[notification]?.error?.constructor?.name,
"CommandTimeoutDuringMaintenanceError",
`${notification} notification error should be CommandTimeoutDuringMaintenanceError`
);
});
});
it("should unrelax command timeout after FAILED_OVER", async () => {
const { action_id: failoverActionId } =
await faultInjectorClient.triggerAction({
type: "failover",
parameters: {
bdb_id: clientConfig.bdbId.toString(),
cluster_index: 0,
},
});
await faultInjectorClient.waitForAction(failoverActionId);
const { error, duration } = await blockCommand(async () => {
await client.set("key", "value");
});
assert.ok(
error instanceof Error,
"Command Timeout error should be instanceof Error"
);
assert.ok(
duration > NORMAL_COMMAND_TIMEOUT &&
duration < NORMAL_COMMAND_TIMEOUT * 1.1,
`Normal command should timeout within normal timeout ms`
);
assert.strictEqual(
error?.constructor?.name,
"TimeoutError",
"Command Timeout error should be TimeoutError"
);
});
});

View File

@@ -11,7 +11,8 @@
"exclude": [
"./lib/test-utils.ts",
"./lib/**/*.spec.ts",
"./lib/sentinel/test-util.ts"
"./lib/sentinel/test-util.ts",
"./lib/tests/**/*.ts"
],
"typedocOptions": {
"entryPoints": [