You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-12-14 09:42:12 +03:00
* feat(errors): Add specialized timeout error types for maintenance scenarios - Added `SocketTimeoutDuringMaintananceError`, a subclass of `TimeoutError`, to handle socket timeouts during maintenance. - Added `CommandTimeoutDuringMaintenanceError`, another subclass of `TimeoutError`, to address command write timeouts during maintenance. * feat(linked-list): Add EmptyAwareSinglyLinkedList and enhance DoublyLinkedList functionality - Introduced `EmptyAwareSinglyLinkedList`, a subclass of `SinglyLinkedList` that emits an `empty` event when the list becomes empty due to `reset`, `shift`, or `remove` operations. - Added `nodes()` iterator method to `DoublyLinkedList` for iterating over nodes directly. - Enhanced unit tests for `DoublyLinkedList` and `SinglyLinkedList` to cover edge cases and new functionality. - Added comprehensive tests for `EmptyAwareSinglyLinkedList` to validate `empty` event emission under various scenarios. - Improved code formatting and consistency. * refactor(commands-queue): Improve push notification handling - Replaced `setInvalidateCallback` with a more flexible `addPushHandler` method, allowing multiple handlers for push notifications. - Introduced the `PushHandler` type to standardize push notification processing. - Refactored `RedisCommandsQueue` to use a `#pushHandlers` array, enabling dynamic and modular handling of push notifications. - Updated `RedisClient` to leverage the new handler mechanism for `invalidate` push notifications, simplifying and decoupling logic. * feat(commands-queue): Add method to wait for in-flight commands to complete - Introduced `waitForInflightCommandsToComplete` method to asynchronously wait for all in-flight commands to finish processing. - Utilized the `empty` event from `#waitingForReply` to signal when all commands have been completed. 
* feat(commands-queue): Introduce maintenance mode support for commands-queue - Added `#maintenanceCommandTimeout` and `setMaintenanceCommandTimeout` method to dynamically adjust command timeouts during maintenance * refactor(client): Extract socket event listener setup into helper method * refactor(socket): Add maintenance mode support and dynamic timeout handling - Added `#maintenanceTimeout` and `setMaintenanceTimeout` method to dynamically adjust socket timeouts during maintenance. * feat(client): Add Redis Enterprise maintenance configuration options - Added `maintPushNotifications` option to control how the client handles Redis Enterprise maintenance push notifications (`disabled`, `enabled`, `auto`). - Added `maintMovingEndpointType` option to specify the endpoint type for reconnecting during a MOVING notification (`auto`, `internal-ip`, `external-ip`, etc.). - Added `maintRelaxedCommandTimeout` option to define a relaxed timeout for commands during maintenance. - Added `maintRelaxedSocketTimeout` option to define a relaxed timeout for the socket during maintenance. - Enforced RESP3 requirement for maintenance-related features (`maintPushNotifications`). * feat(client): Add socket helpers and pause mechanism - Introduced `#paused` flag with corresponding `_pause` and `_unpause` methods to temporarily halt writing commands to the socket during maintenance windows. - Updated `#write` method to respect the `#paused` flag, preventing new commands from being written during maintenance. - Added `_ejectSocket` method to safely detach from and return the current socket - Added `_insertSocket` method to receive and start using a new socket * feat(client): Add Redis Enterprise maintenance handling capabilities - Introduced `EnterpriseMaintenanceManager` to manage Redis Enterprise maintenance events and push notifications. - Integrated `EnterpriseMaintenanceManager` into `RedisClient` to handle maintenance push notifications and manage socket transitions. 
- Implemented graceful handling of MOVING, MIGRATING, and FAILOVER push notifications, including socket replacement and timeout adjustments. * test: add E2E test infrastructure for Redis maintenance scenarios * test: add E2E tests for Redis Enterprise maintenance timeout handling (#3) * test: add connection handoff test --------- Co-authored-by: Pavel Pashov <pavel.pashov@redis.com> Co-authored-by: Pavel Pashov <60297174+PavelPashov@users.noreply.github.com>
188 lines
4.6 KiB
TypeScript
188 lines
4.6 KiB
TypeScript
import { setTimeout } from "node:timers/promises";
|
|
|
|
/**
 * String identifiers allowed in the `type` field of an {@link ActionRequest}.
 */
export type ActionType =
  | "bind"
  | "dmc_restart"
  | "execute_rladmin_command"
  | "execute_rlutil_command"
  | "failover"
  | "migrate"
  | "network_failure"
  | "reshard"
  | "sequence_of_actions";
|
|
|
|
/**
 * Request body describing one action to trigger.
 */
export interface ActionRequest {
  /** Which action to run. */
  type: ActionType;

  /**
   * Optional action arguments. Only `bdb_id` is declared explicitly; any
   * other key is accepted with an `unknown` value.
   */
  parameters?: {
    [key: string]: unknown;
    bdb_id?: string;
  };
}
|
|
|
|
/**
 * Shape of the status payload returned for a previously triggered action.
 */
export interface ActionStatus {
  /** Textual output reported for the action. */
  output: string;

  /** Error information reported for the action; its shape is not constrained here. */
  error: unknown;

  /** Current state of the action, as a free-form string. */
  status: string;
}
|
|
|
|
/**
 * Small HTTP client for a fault-injector service.
 *
 * Every request is sent to `baseUrl + path` through the injected `fetch`
 * implementation, and every successful response body is parsed as JSON.
 */
export class FaultInjectorClient {
  private readonly baseUrl: string;
  #fetch: typeof fetch;

  /**
   * @param baseUrl Root URL of the service; trailing slashes are removed.
   * @param fetchImpl `fetch`-compatible function used for all requests
   *   (defaults to the global `fetch`). Mainly useful for injecting a stub.
   */
  constructor(baseUrl: string, fetchImpl: typeof fetch = fetch) {
    this.baseUrl = baseUrl.replace(/\/+$/, ""); // trim trailing slashes
    this.#fetch = fetchImpl;
  }

  /**
   * Lists all available actions (`GET /action`).
   * @returns The parsed JSON response.
   * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON
   */
  public listActions<T = unknown>(): Promise<T> {
    return this.#request<T>("GET", "/action");
  }

  /**
   * Triggers a specific action (`POST /action` with the request as JSON).
   * @param action The action request to trigger
   * @returns The parsed JSON response.
   * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON
   */
  public triggerAction<T = unknown>(action: ActionRequest): Promise<T> {
    return this.#request<T>("POST", "/action", action);
  }

  /**
   * Gets the status of a specific action (`GET /action/<actionId>`).
   * @param actionId The ID of the action to check
   * @returns The parsed JSON response.
   * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON
   */
  public getActionStatus<T = ActionStatus>(actionId: string): Promise<T> {
    return this.#request<T>("GET", `/action/${actionId}`);
  }

  /**
   * Executes an rladmin command (`POST /rladmin` with a plain-text body).
   *
   * The body is `rladmin <command>`, or `rladmin -b <bdbId> <command>` when
   * `bdbId` is a non-empty string.
   *
   * @param command The rladmin command to execute
   * @param bdbId Optional database ID to target
   * @returns The parsed JSON response.
   * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON
   */
  public executeRladminCommand<T = unknown>(
    command: string,
    bdbId?: string
  ): Promise<T> {
    const cmd = bdbId ? `rladmin -b ${bdbId} ${command}` : `rladmin ${command}`;
    return this.#request<T>("POST", "/rladmin", cmd);
  }

  /**
   * Polls an action's status until it reaches a terminal state.
   *
   * The status is fetched, and if it is one of `finished`, `failed` or
   * `success` it is returned as-is (a `failed` action is returned, not
   * thrown). Otherwise the method sleeps for `timeoutMs` and polls again,
   * as long as less than `maxWaitTimeMs` has elapsed since the call started.
   *
   * @param actionId The ID of the action to wait for
   * @param options.timeoutMs Delay between two polls, in milliseconds.
   *   Defaults to 1000; `0` or `undefined` also selects the default.
   * @param options.maxWaitTimeMs Overall polling budget, in milliseconds.
   *   Defaults to 60000; `0` or `undefined` also selects the default.
   * @returns The first status whose `status` field is terminal.
   * @throws {Error} When the action does not complete within the max wait time,
   *   or when a status request itself fails.
   */
  public async waitForAction(
    actionId: string,
    {
      timeoutMs,
      maxWaitTimeMs,
    }: {
      timeoutMs?: number;
      maxWaitTimeMs?: number;
    } = {}
  ): Promise<ActionStatus> {
    // `||` is kept on purpose: a zero value falls back to the default.
    const timeout = timeoutMs || 1000;
    const maxWaitTime = maxWaitTimeMs || 60000;

    const startTime = Date.now();

    while (Date.now() - startTime < maxWaitTime) {
      const action = await this.getActionStatus<ActionStatus>(actionId);

      if (["finished", "failed", "success"].includes(action.status)) {
        return action;
      }

      // Promise-based sleep (timers/promises), not the callback setTimeout.
      await setTimeout(timeout);
    }

    throw new Error(`Timeout waiting for action ${actionId}`);
  }

  /**
   * Triggers a `sequence_of_actions` request made of a `migrate` step
   * followed by a `bind` step.
   *
   * Both identifiers are converted to strings before being sent.
   *
   * NOTE(review): the top-level parameter is sent as `bdbId`, while the
   * `bind` step (and `ActionRequest.parameters`) use `bdb_id` — confirm
   * which key the service expects at the top level.
   *
   * @param options.bdbId Database ID to migrate and bind
   * @param options.clusterIndex Cluster index passed to both steps
   * @returns The parsed JSON response, typed as carrying an `action_id`.
   * @throws {Error} When the HTTP request fails or response cannot be parsed as JSON
   */
  async migrateAndBindAction({
    bdbId,
    clusterIndex,
  }: {
    bdbId: string | number;
    clusterIndex: string | number;
  }): Promise<{ action_id: string }> {
    const bdbIdStr = bdbId.toString();
    const clusterIndexStr = clusterIndex.toString();

    return this.triggerAction<{
      action_id: string;
    }>({
      type: "sequence_of_actions",
      parameters: {
        bdbId: bdbIdStr,
        actions: [
          {
            type: "migrate",
            params: {
              cluster_index: clusterIndexStr,
            },
          },
          {
            type: "bind",
            params: {
              cluster_index: clusterIndexStr,
              bdb_id: bdbIdStr,
            },
          },
        ],
      },
    });
  }

  /**
   * Sends one HTTP request and parses the response body as JSON.
   *
   * A string body is sent verbatim as `text/plain`; an object body is
   * serialized with `JSON.stringify` and sent as `application/json`.
   * A missing (or empty-string) body sends no payload.
   *
   * @param method HTTP method
   * @param path Path appended to the base URL
   * @param body Optional request body
   * @returns The parsed JSON response, cast to `T` without validation.
   * @throws {Error} `HTTP <status> - <body>` on a non-OK response (just
   *   `HTTP <status>` when the body is empty or unreadable), or
   *   `HTTP <status> - Unable to parse response as JSON` when parsing fails.
   */
  async #request<T>(
    method: string,
    path: string,
    body?: object | string
  ): Promise<T> {
    const url = `${this.baseUrl}${path}`;
    const headers: Record<string, string> = {
      "Content-Type": "application/json",
    };

    let payload: string | undefined;

    if (body) {
      if (typeof body === "string") {
        headers["Content-Type"] = "text/plain";
        payload = body;
      } else {
        payload = JSON.stringify(body);
      }
    }

    const response = await this.#fetch(url, { method, headers, body: payload });

    if (!response.ok) {
      // Read the body first and throw *outside* the try block: throwing the
      // detailed error inside it would be caught by the same catch and
      // replaced with the status-only message.
      let detail = "";
      try {
        detail = await response.text();
      } catch {
        // Body could not be read; report the status code alone.
      }

      throw new Error(
        detail ? `HTTP ${response.status} - ${detail}` : `HTTP ${response.status}`
      );
    }

    try {
      return (await response.json()) as T;
    } catch {
      throw new Error(
        `HTTP ${response.status} - Unable to parse response as JSON`
      );
    }
  }
}
|