You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-06 02:15:48 +03:00
Add Redis transparent proxy test utilities (#3019)
This commit is contained in:
@@ -93,6 +93,12 @@ export const GLOBAL = {
|
||||
password: 'password'
|
||||
}
|
||||
},
|
||||
OPEN_RESP_3: {
|
||||
serverArguments: [...DEBUG_MODE_ARGS],
|
||||
clientOptions: {
|
||||
RESP: 3,
|
||||
}
|
||||
},
|
||||
ASYNC_BASIC_AUTH: {
|
||||
serverArguments: ['--requirepass', 'password', ...DEBUG_MODE_ARGS],
|
||||
clientOptions: {
|
||||
|
@@ -26,6 +26,7 @@ import { hideBin } from 'yargs/helpers';
|
||||
import * as fs from 'node:fs';
|
||||
import * as os from 'node:os';
|
||||
import * as path from 'node:path';
|
||||
import { RedisProxy, getFreePortNumber } from './redis-proxy';
|
||||
|
||||
interface TestUtilsConfig {
|
||||
/**
|
||||
@@ -296,7 +297,44 @@ export default class TestUtils {
|
||||
}
|
||||
});
|
||||
}
|
||||
testWithProxiedClient(
|
||||
title: string,
|
||||
fn: (proxiedClient: RedisClientType<any, any, any, any, any>, proxy: RedisProxy) => unknown,
|
||||
options: ClientTestOptions<any, any, any, any, any>
|
||||
) {
|
||||
|
||||
this.testWithClient(title, async (client) => {
|
||||
const freePort = await getFreePortNumber()
|
||||
const socketOptions = client?.options?.socket;
|
||||
const proxy = new RedisProxy({
|
||||
listenHost: '127.0.0.1',
|
||||
listenPort: freePort,
|
||||
//@ts-ignore
|
||||
targetPort: socketOptions.port,
|
||||
//@ts-ignore
|
||||
targetHost: socketOptions.host,
|
||||
enableLogging: true
|
||||
});
|
||||
|
||||
|
||||
await proxy.start();
|
||||
const proxyClient = client.duplicate({
|
||||
socket: {
|
||||
port: proxy.config.listenPort,
|
||||
host: proxy.config.listenHost
|
||||
},
|
||||
});
|
||||
|
||||
await proxyClient.connect();
|
||||
|
||||
try {
|
||||
await fn(proxyClient, proxy);
|
||||
} finally {
|
||||
await proxyClient.destroy();
|
||||
await proxy.stop()
|
||||
}
|
||||
}, options);
|
||||
}
|
||||
testWithClientSentinel<
|
||||
M extends RedisModules = {},
|
||||
F extends RedisFunctions = {},
|
||||
|
111
packages/test-utils/lib/redis-proxy-spec.ts
Normal file
111
packages/test-utils/lib/redis-proxy-spec.ts
Normal file
@@ -0,0 +1,111 @@
|
||||
import { strict as assert } from 'node:assert';
|
||||
import { Buffer } from 'node:buffer';
|
||||
import { testUtils, GLOBAL } from './test-utils';
|
||||
import { RedisProxy } from './redis-proxy';
|
||||
import type { RedisClientType } from '@redis/client/lib/client/index.js';
|
||||
|
||||
describe('RedisSocketProxy', function () {
|
||||
testUtils.testWithClient('basic proxy functionality', async (client: RedisClientType<any, any, any, any, any>) => {
|
||||
const socketOptions = client?.options?.socket;
|
||||
//@ts-ignore
|
||||
assert(socketOptions?.port, 'Test requires a TCP connection to Redis');
|
||||
|
||||
const proxyPort = 50000 + Math.floor(Math.random() * 10000);
|
||||
const proxy = new RedisProxy({
|
||||
listenHost: '127.0.0.1',
|
||||
listenPort: proxyPort,
|
||||
//@ts-ignore
|
||||
targetPort: socketOptions.port,
|
||||
//@ts-ignore
|
||||
targetHost: socketOptions.host || '127.0.0.1',
|
||||
enableLogging: true
|
||||
});
|
||||
|
||||
const proxyEvents = {
|
||||
connections: [] as any[],
|
||||
dataTransfers: [] as any[]
|
||||
};
|
||||
|
||||
proxy.on('connection', (connectionInfo) => {
|
||||
proxyEvents.connections.push(connectionInfo);
|
||||
});
|
||||
|
||||
proxy.on('data', (connectionId, direction, data) => {
|
||||
proxyEvents.dataTransfers.push({ connectionId, direction, dataLength: data.length });
|
||||
});
|
||||
|
||||
try {
|
||||
await proxy.start();
|
||||
|
||||
const proxyClient = client.duplicate({
|
||||
socket: {
|
||||
port: proxyPort,
|
||||
host: '127.0.0.1'
|
||||
},
|
||||
});
|
||||
|
||||
await proxyClient.connect();
|
||||
|
||||
const stats = proxy.getStats();
|
||||
assert.equal(stats.activeConnections, 1, 'Should have one active connection');
|
||||
assert.equal(proxyEvents.connections.length, 1, 'Should have recorded one connection event');
|
||||
|
||||
const pingResult = await proxyClient.ping();
|
||||
assert.equal(pingResult, 'PONG', 'Client should be able to communicate with Redis through the proxy');
|
||||
|
||||
const clientToServerTransfers = proxyEvents.dataTransfers.filter(t => t.direction === 'client->server');
|
||||
const serverToClientTransfers = proxyEvents.dataTransfers.filter(t => t.direction === 'server->client');
|
||||
|
||||
assert(clientToServerTransfers.length > 0, 'Should have client->server data transfers');
|
||||
assert(serverToClientTransfers.length > 0, 'Should have server->client data transfers');
|
||||
|
||||
const testKey = `test:proxy:${Date.now()}`;
|
||||
const testValue = 'proxy-test-value';
|
||||
|
||||
await proxyClient.set(testKey, testValue);
|
||||
const retrievedValue = await proxyClient.get(testKey);
|
||||
assert.equal(retrievedValue, testValue, 'Should be able to set and get values through proxy');
|
||||
|
||||
proxyClient.destroy();
|
||||
|
||||
|
||||
} finally {
|
||||
await proxy.stop();
|
||||
}
|
||||
}, GLOBAL.SERVERS.OPEN_RESP_3);
|
||||
|
||||
testUtils.testWithProxiedClient('custom message injection via proxy client',
|
||||
async (proxiedClient: RedisClientType<any, any, any, any, any>, proxy: RedisProxy) => {
|
||||
const customMessageTransfers: any[] = [];
|
||||
|
||||
proxy.on('data', (connectionId, direction, data) => {
|
||||
if (direction === 'server->client') {
|
||||
customMessageTransfers.push({ connectionId, dataLength: data.length, data });
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
const stats = proxy.getStats();
|
||||
assert.equal(stats.activeConnections, 1, 'Should have one active connection');
|
||||
|
||||
// Send a resp3 push
|
||||
const customMessage = Buffer.from('>4\r\n$6\r\nMOVING\r\n:1\r\n:2\r\n$6\r\nhost:3\r\n');
|
||||
|
||||
const sendResults = proxy.sendToAllClients(customMessage);
|
||||
assert.equal(sendResults.length, 1, 'Should send to one client');
|
||||
assert.equal(sendResults[0].success, true, 'Custom message send should succeed');
|
||||
|
||||
|
||||
const customMessageFound = customMessageTransfers.find(transfer =>
|
||||
transfer.dataLength === customMessage.length
|
||||
);
|
||||
assert(customMessageFound, 'Should have recorded the custom message transfer');
|
||||
|
||||
assert.equal(customMessageFound.dataLength, customMessage.length,
|
||||
'Custom message length should match');
|
||||
|
||||
const pingResult = await proxiedClient.ping();
|
||||
assert.equal(pingResult, 'PONG', 'Client should be able to communicate with Redis through the proxy');
|
||||
|
||||
}, GLOBAL.SERVERS.OPEN_RESP_3)
|
||||
});
|
329
packages/test-utils/lib/redis-proxy.ts
Normal file
329
packages/test-utils/lib/redis-proxy.ts
Normal file
@@ -0,0 +1,329 @@
|
||||
import * as net from 'net';
|
||||
import { EventEmitter } from 'events';
|
||||
|
||||
interface ProxyConfig {
|
||||
readonly listenPort: number;
|
||||
readonly listenHost?: string;
|
||||
readonly targetHost: string;
|
||||
readonly targetPort: number;
|
||||
readonly timeout?: number;
|
||||
readonly enableLogging?: boolean;
|
||||
}
|
||||
|
||||
interface ConnectionInfo {
|
||||
readonly id: string;
|
||||
readonly clientAddress: string;
|
||||
readonly clientPort: number;
|
||||
readonly connectedAt: Date;
|
||||
}
|
||||
|
||||
interface ActiveConnection extends ConnectionInfo {
|
||||
readonly clientSocket: net.Socket;
|
||||
readonly serverSocket: net.Socket;
|
||||
}
|
||||
|
||||
type SendResult =
|
||||
| { readonly success: true; readonly connectionId: string }
|
||||
| { readonly success: false; readonly error: string; readonly connectionId: string };
|
||||
|
||||
type DataDirection = 'client->server' | 'server->client';
|
||||
|
||||
interface ProxyStats {
|
||||
readonly activeConnections: number;
|
||||
readonly totalConnections: number;
|
||||
readonly connections: readonly ConnectionInfo[];
|
||||
}
|
||||
|
||||
interface ProxyEvents {
|
||||
/** Emitted when a new client connects */
|
||||
'connection': (connectionInfo: ConnectionInfo) => void;
|
||||
/** Emitted when a connection is closed */
|
||||
'disconnect': (connectionInfo: ConnectionInfo) => void;
|
||||
/** Emitted when data is transferred */
|
||||
'data': (connectionId: string, direction: DataDirection, data: Buffer) => void;
|
||||
/** Emitted when an error occurs */
|
||||
'error': (error: Error, connectionId?: string) => void;
|
||||
/** Emitted when the proxy server starts */
|
||||
'listening': (host: string, port: number) => void;
|
||||
/** Emitted when the proxy server stops */
|
||||
'close': () => void;
|
||||
}
|
||||
|
||||
export class RedisProxy extends EventEmitter {
|
||||
private readonly server: net.Server;
|
||||
public readonly config: Required<ProxyConfig>;
|
||||
private readonly connections: Map<string, ActiveConnection>;
|
||||
private isRunning: boolean;
|
||||
|
||||
constructor(config: ProxyConfig) {
|
||||
super();
|
||||
|
||||
|
||||
this.config = {
|
||||
listenHost: '127.0.0.1',
|
||||
timeout: 30000,
|
||||
enableLogging: false,
|
||||
...config
|
||||
};
|
||||
|
||||
this.connections = new Map();
|
||||
this.isRunning = false;
|
||||
this.server = this.createServer();
|
||||
}
|
||||
|
||||
public async start(): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (this.isRunning) {
|
||||
reject(new Error('Proxy is already running'));
|
||||
return;
|
||||
}
|
||||
|
||||
this.server.listen(this.config.listenPort, this.config.listenHost, () => {
|
||||
this.isRunning = true;
|
||||
this.log(`Proxy listening on ${this.config.listenHost}:${this.config.listenPort}`);
|
||||
this.log(`Forwarding to Redis server at ${this.config.targetHost}:${this.config.targetPort}`);
|
||||
this.emit('listening', this.config.listenHost, this.config.listenPort);
|
||||
resolve();
|
||||
});
|
||||
|
||||
this.server.on('error', (error) => {
|
||||
this.emit('error', error);
|
||||
reject(error);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
public async stop(): Promise<void> {
|
||||
return new Promise((resolve) => {
|
||||
if (!this.isRunning) {
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
|
||||
Array.from(this.connections.keys()).forEach((connectionId) => {
|
||||
this.closeConnection(connectionId);
|
||||
});
|
||||
|
||||
this.server.close(() => {
|
||||
this.isRunning = false;
|
||||
this.log('Proxy server stopped');
|
||||
this.emit('close');
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
public getStats(): ProxyStats {
|
||||
const connections = Array.from(this.connections.values());
|
||||
|
||||
return {
|
||||
activeConnections: connections.length,
|
||||
totalConnections: connections.length,
|
||||
connections: connections.map((conn) => ({
|
||||
id: conn.id,
|
||||
clientAddress: conn.clientAddress,
|
||||
clientPort: conn.clientPort,
|
||||
connectedAt: conn.connectedAt,
|
||||
}))
|
||||
};
|
||||
}
|
||||
|
||||
public closeConnection(connectionId: string): boolean {
|
||||
const connection = this.connections.get(connectionId);
|
||||
if (!connection) {
|
||||
return false;
|
||||
}
|
||||
|
||||
connection.clientSocket.destroy();
|
||||
connection.serverSocket.destroy();
|
||||
this.connections.delete(connectionId);
|
||||
this.emit('disconnect', connection);
|
||||
return true;
|
||||
}
|
||||
|
||||
public sendToClient(connectionId: string, data: Buffer): SendResult {
|
||||
const connection = this.connections.get(connectionId);
|
||||
if (!connection) {
|
||||
return {
|
||||
success: false,
|
||||
error: 'Connection not found',
|
||||
connectionId
|
||||
};
|
||||
}
|
||||
|
||||
if (connection.clientSocket.destroyed || !connection.clientSocket.writable) {
|
||||
return {
|
||||
success: false,
|
||||
error: 'Client socket is not writable',
|
||||
connectionId
|
||||
};
|
||||
}
|
||||
|
||||
try {
|
||||
connection.clientSocket.write(data);
|
||||
|
||||
this.log(`Sent ${data.length} bytes to client ${connectionId}`);
|
||||
this.emit('data', connectionId, 'server->client', data);
|
||||
|
||||
return {
|
||||
success: true,
|
||||
connectionId
|
||||
};
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
|
||||
this.log(`Failed to send data to client ${connectionId}: ${errorMessage}`);
|
||||
return {
|
||||
success: false,
|
||||
error: errorMessage,
|
||||
connectionId
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public sendToAllClients(data: Buffer): readonly SendResult[] {
|
||||
const connectionIds = Array.from(this.connections.keys());
|
||||
const results = connectionIds.map((connectionId) =>
|
||||
this.sendToClient(connectionId, data)
|
||||
);
|
||||
|
||||
const successCount = results.filter((result) => result.success).length;
|
||||
const totalCount = results.length;
|
||||
|
||||
this.log(`Sent ${data.length} bytes to ${successCount}/${totalCount} clients`);
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
public sendToClients(connectionIds: readonly string[], data: Buffer): readonly SendResult[] {
|
||||
const results = connectionIds.map((connectionId) =>
|
||||
this.sendToClient(connectionId, data)
|
||||
);
|
||||
|
||||
const successCount = results.filter((result) => result.success).length;
|
||||
const totalCount = results.length;
|
||||
|
||||
this.log(`Sent ${data.length} bytes to ${successCount}/${totalCount} specified clients`);
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
public getActiveConnectionIds(): readonly string[] {
|
||||
return Array.from(this.connections.keys());
|
||||
}
|
||||
|
||||
private createServer(): net.Server {
|
||||
return net.createServer((clientSocket) => {
|
||||
this.handleClientConnection(clientSocket);
|
||||
});
|
||||
}
|
||||
|
||||
private handleClientConnection(clientSocket: net.Socket): void {
|
||||
const connectionId = this.generateConnectionId();
|
||||
const serverSocket = net.createConnection({
|
||||
host: this.config.targetHost,
|
||||
port: this.config.targetPort
|
||||
});
|
||||
|
||||
const connectionInfo: ActiveConnection = {
|
||||
id: connectionId,
|
||||
clientAddress: clientSocket.remoteAddress || 'unknown',
|
||||
clientPort: clientSocket.remotePort || 0,
|
||||
connectedAt: new Date(),
|
||||
clientSocket,
|
||||
serverSocket
|
||||
};
|
||||
|
||||
this.connections.set(connectionId, connectionInfo);
|
||||
this.log(`New connection ${connectionId} from ${connectionInfo.clientAddress}:${connectionInfo.clientPort}`);
|
||||
|
||||
clientSocket.setTimeout(this.config.timeout);
|
||||
|
||||
serverSocket.on('connect', () => {
|
||||
this.log(`Connected to Redis server for connection ${connectionId}`);
|
||||
this.emit('connection', connectionInfo);
|
||||
});
|
||||
|
||||
clientSocket.on('data', (data) => {
|
||||
this.emit('data', connectionId, 'client->server', data);
|
||||
serverSocket.write(data);
|
||||
});
|
||||
|
||||
serverSocket.on('data', (data) => {
|
||||
this.emit('data', connectionId, 'server->client', data);
|
||||
clientSocket.write(data);
|
||||
});
|
||||
|
||||
clientSocket.on('close', () => {
|
||||
this.log(`Client disconnected for connection ${connectionId}`);
|
||||
serverSocket.destroy();
|
||||
this.cleanupConnection(connectionId);
|
||||
});
|
||||
|
||||
serverSocket.on('close', () => {
|
||||
this.log(`Server disconnected for connection ${connectionId}`);
|
||||
clientSocket.destroy();
|
||||
this.cleanupConnection(connectionId);
|
||||
});
|
||||
|
||||
clientSocket.on('error', (error) => {
|
||||
this.log(`Client error for connection ${connectionId}: ${error.message}`);
|
||||
this.emit('error', error, connectionId);
|
||||
serverSocket.destroy();
|
||||
this.cleanupConnection(connectionId);
|
||||
});
|
||||
|
||||
serverSocket.on('error', (error) => {
|
||||
this.log(`Server error for connection ${connectionId}: ${error.message}`);
|
||||
this.emit('error', error, connectionId);
|
||||
clientSocket.destroy();
|
||||
this.cleanupConnection(connectionId);
|
||||
});
|
||||
|
||||
clientSocket.on('timeout', () => {
|
||||
this.log(`Connection ${connectionId} timed out`);
|
||||
clientSocket.destroy();
|
||||
serverSocket.destroy();
|
||||
this.cleanupConnection(connectionId);
|
||||
});
|
||||
}
|
||||
|
||||
private cleanupConnection(connectionId: string): void {
|
||||
const connection = this.connections.get(connectionId);
|
||||
if (connection) {
|
||||
this.connections.delete(connectionId);
|
||||
this.emit('disconnect', connection);
|
||||
}
|
||||
}
|
||||
|
||||
private generateConnectionId(): string {
|
||||
return `conn_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
|
||||
}
|
||||
|
||||
private log(message: string): void {
|
||||
if (this.config.enableLogging) {
|
||||
console.log(`[RedisProxy] ${new Date().toISOString()} - ${message}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
import { createServer } from 'net';
|
||||
|
||||
export function getFreePortNumber(): Promise<number> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const server = createServer();
|
||||
|
||||
server.listen(0, () => {
|
||||
const address = server.address();
|
||||
server.close(() => {
|
||||
if (address && typeof address === 'object') {
|
||||
resolve(address.port);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
server.on('error', reject);
|
||||
});
|
||||
}
|
||||
|
||||
export { RedisProxy as RedisTransparentProxy };
|
||||
export type { ProxyConfig, ConnectionInfo, ProxyEvents, SendResult, DataDirection, ProxyStats };
|
||||
|
25
packages/test-utils/lib/test-utils.ts
Normal file
25
packages/test-utils/lib/test-utils.ts
Normal file
@@ -0,0 +1,25 @@
|
||||
import TestUtils from './index'
|
||||
|
||||
export const testUtils = TestUtils.createFromConfig({
|
||||
dockerImageName: 'redislabs/client-libs-test',
|
||||
dockerImageVersionArgument: 'redis-version',
|
||||
defaultDockerVersion: '8.2-M01-pre'
|
||||
});
|
||||
|
||||
|
||||
|
||||
export const DEBUG_MODE_ARGS = testUtils.isVersionGreaterThan([7]) ?
|
||||
['--enable-debug-command', 'yes'] :
|
||||
[];
|
||||
|
||||
export const GLOBAL = {
|
||||
SERVERS: {
|
||||
|
||||
OPEN_RESP_3: {
|
||||
serverArguments: [...DEBUG_MODE_ARGS],
|
||||
clientOptions: {
|
||||
RESP: 3,
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user