diff --git a/packages/client/lib/test-utils.ts b/packages/client/lib/test-utils.ts index 809ee788e9..b9b906e943 100644 --- a/packages/client/lib/test-utils.ts +++ b/packages/client/lib/test-utils.ts @@ -93,6 +93,12 @@ export const GLOBAL = { password: 'password' } }, + OPEN_RESP_3: { + serverArguments: [...DEBUG_MODE_ARGS], + clientOptions: { + RESP: 3, + } + }, ASYNC_BASIC_AUTH: { serverArguments: ['--requirepass', 'password', ...DEBUG_MODE_ARGS], clientOptions: { diff --git a/packages/test-utils/lib/index.ts b/packages/test-utils/lib/index.ts index aab1c700f5..64b9abc7f4 100644 --- a/packages/test-utils/lib/index.ts +++ b/packages/test-utils/lib/index.ts @@ -26,6 +26,7 @@ import { hideBin } from 'yargs/helpers'; import * as fs from 'node:fs'; import * as os from 'node:os'; import * as path from 'node:path'; +import { RedisProxy, getFreePortNumber } from './redis-proxy'; interface TestUtilsConfig { /** @@ -296,7 +297,44 @@ export default class TestUtils { } }); } + testWithProxiedClient( + title: string, + fn: (proxiedClient: RedisClientType, proxy: RedisProxy) => unknown, + options: ClientTestOptions + ) { + this.testWithClient(title, async (client) => { + const freePort = await getFreePortNumber() + const socketOptions = client?.options?.socket; + const proxy = new RedisProxy({ + listenHost: '127.0.0.1', + listenPort: freePort, + //@ts-ignore + targetPort: socketOptions.port, + //@ts-ignore + targetHost: socketOptions.host, + enableLogging: true + }); + + + await proxy.start(); + const proxyClient = client.duplicate({ + socket: { + port: proxy.config.listenPort, + host: proxy.config.listenHost + }, + }); + + await proxyClient.connect(); + + try { + await fn(proxyClient, proxy); + } finally { + await proxyClient.destroy(); + await proxy.stop() + } + }, options); + } testWithClientSentinel< M extends RedisModules = {}, F extends RedisFunctions = {}, diff --git a/packages/test-utils/lib/redis-proxy-spec.ts b/packages/test-utils/lib/redis-proxy-spec.ts new file mode 100644 index 0000000000..89b3b28c35 --- /dev/null +++ b/packages/test-utils/lib/redis-proxy-spec.ts @@ -0,0 +1,111 @@ +import { strict as assert } from 'node:assert'; +import { Buffer } from 'node:buffer'; +import { testUtils, GLOBAL } from './test-utils'; +import { RedisProxy } from './redis-proxy'; +import type { RedisClientType } from '@redis/client/lib/client/index.js'; + +describe('RedisSocketProxy', function () { + testUtils.testWithClient('basic proxy functionality', async (client: RedisClientType) => { + const socketOptions = client?.options?.socket; + //@ts-ignore + assert(socketOptions?.port, 'Test requires a TCP connection to Redis'); + + const proxyPort = 50000 + Math.floor(Math.random() * 10000); + const proxy = new RedisProxy({ + listenHost: '127.0.0.1', + listenPort: proxyPort, + //@ts-ignore + targetPort: socketOptions.port, + //@ts-ignore + targetHost: socketOptions.host || '127.0.0.1', + enableLogging: true + }); + + const proxyEvents = { + connections: [] as any[], + dataTransfers: [] as any[] + }; + + proxy.on('connection', (connectionInfo) => { + proxyEvents.connections.push(connectionInfo); + }); + + proxy.on('data', (connectionId, direction, data) => { + proxyEvents.dataTransfers.push({ connectionId, direction, dataLength: data.length }); + }); + + try { + await proxy.start(); + + const proxyClient = client.duplicate({ + socket: { + port: proxyPort, + host: '127.0.0.1' + }, + }); + + await proxyClient.connect(); + + const stats = proxy.getStats(); + assert.equal(stats.activeConnections, 1, 'Should have one active connection'); + assert.equal(proxyEvents.connections.length, 1, 'Should have recorded one connection event'); + + const pingResult = await proxyClient.ping(); + assert.equal(pingResult, 'PONG', 'Client should be able to communicate with Redis through the proxy'); + + const clientToServerTransfers = proxyEvents.dataTransfers.filter(t => t.direction === 'client->server'); + const serverToClientTransfers = proxyEvents.dataTransfers.filter(t => t.direction === 'server->client'); + + assert(clientToServerTransfers.length > 0, 'Should have client->server data transfers'); + assert(serverToClientTransfers.length > 0, 'Should have server->client data transfers'); + + const testKey = `test:proxy:${Date.now()}`; + const testValue = 'proxy-test-value'; + + await proxyClient.set(testKey, testValue); + const retrievedValue = await proxyClient.get(testKey); + assert.equal(retrievedValue, testValue, 'Should be able to set and get values through proxy'); + + proxyClient.destroy(); + + + } finally { + await proxy.stop(); + } + }, GLOBAL.SERVERS.OPEN_RESP_3); + + testUtils.testWithProxiedClient('custom message injection via proxy client', + async (proxiedClient: RedisClientType, proxy: RedisProxy) => { + const customMessageTransfers: any[] = []; + + proxy.on('data', (connectionId, direction, data) => { + if (direction === 'server->client') { + customMessageTransfers.push({ connectionId, dataLength: data.length, data }); + } + }); + + + const stats = proxy.getStats(); + assert.equal(stats.activeConnections, 1, 'Should have one active connection'); + + // Send a resp3 push + const customMessage = Buffer.from('>4\r\n$6\r\nMOVING\r\n:1\r\n:2\r\n$6\r\nhost:3\r\n'); + + const sendResults = proxy.sendToAllClients(customMessage); + assert.equal(sendResults.length, 1, 'Should send to one client'); + assert.equal(sendResults[0].success, true, 'Custom message send should succeed'); + + + const customMessageFound = customMessageTransfers.find(transfer => + transfer.dataLength === customMessage.length + ); + assert(customMessageFound, 'Should have recorded the custom message transfer'); + + assert.equal(customMessageFound.dataLength, customMessage.length, + 'Custom message length should match'); + + const pingResult = await proxiedClient.ping(); + assert.equal(pingResult, 'PONG', 'Client should be able to communicate with Redis through the proxy'); + + }, GLOBAL.SERVERS.OPEN_RESP_3) +}); diff --git a/packages/test-utils/lib/redis-proxy.ts b/packages/test-utils/lib/redis-proxy.ts new file mode 100644 index 0000000000..217ec528a3 --- /dev/null +++ b/packages/test-utils/lib/redis-proxy.ts @@ -0,0 +1,329 @@ +import * as net from 'net'; +import { EventEmitter } from 'events'; + +interface ProxyConfig { + readonly listenPort: number; + readonly listenHost?: string; + readonly targetHost: string; + readonly targetPort: number; + readonly timeout?: number; + readonly enableLogging?: boolean; +} + +interface ConnectionInfo { + readonly id: string; + readonly clientAddress: string; + readonly clientPort: number; + readonly connectedAt: Date; +} + +interface ActiveConnection extends ConnectionInfo { + readonly clientSocket: net.Socket; + readonly serverSocket: net.Socket; +} + +type SendResult = + | { readonly success: true; readonly connectionId: string } + | { readonly success: false; readonly error: string; readonly connectionId: string }; + +type DataDirection = 'client->server' | 'server->client'; + +interface ProxyStats { + readonly activeConnections: number; + readonly totalConnections: number; + readonly connections: readonly ConnectionInfo[]; +} + +interface ProxyEvents { + /** Emitted when a new client connects */ + 'connection': (connectionInfo: ConnectionInfo) => void; + /** Emitted when a connection is closed */ + 'disconnect': (connectionInfo: ConnectionInfo) => void; + /** Emitted when data is transferred */ + 'data': (connectionId: string, direction: DataDirection, data: Buffer) => void; + /** Emitted when an error occurs */ + 'error': (error: Error, connectionId?: string) => void; + /** Emitted when the proxy server starts */ + 'listening': (host: string, port: number) => void; + /** Emitted when the proxy server stops */ + 'close': () => void; +} + +export class RedisProxy extends EventEmitter { + private readonly server: net.Server; + public readonly config: Required; + private readonly connections: Map; + private isRunning: boolean; + + constructor(config: ProxyConfig) { + super(); + + + this.config = { + listenHost: '127.0.0.1', + timeout: 30000, + enableLogging: false, + ...config + }; + + this.connections = new Map(); + this.isRunning = false; + this.server = this.createServer(); + } + + public async start(): Promise { + return new Promise((resolve, reject) => { + if (this.isRunning) { + reject(new Error('Proxy is already running')); + return; + } + + this.server.listen(this.config.listenPort, this.config.listenHost, () => { + this.isRunning = true; + this.log(`Proxy listening on ${this.config.listenHost}:${this.config.listenPort}`); + this.log(`Forwarding to Redis server at ${this.config.targetHost}:${this.config.targetPort}`); + this.emit('listening', this.config.listenHost, this.config.listenPort); + resolve(); + }); + + this.server.on('error', (error) => { + this.emit('error', error); + reject(error); + }); + }); + } + + public async stop(): Promise { + return new Promise((resolve) => { + if (!this.isRunning) { + resolve(); + return; + } + + Array.from(this.connections.keys()).forEach((connectionId) => { + this.closeConnection(connectionId); + }); + + this.server.close(() => { + this.isRunning = false; + this.log('Proxy server stopped'); + this.emit('close'); + resolve(); + }); + }); + } + + public getStats(): ProxyStats { + const connections = Array.from(this.connections.values()); + + return { + activeConnections: connections.length, + totalConnections: connections.length, + connections: connections.map((conn) => ({ + id: conn.id, + clientAddress: conn.clientAddress, + clientPort: conn.clientPort, + connectedAt: conn.connectedAt, + })) + }; + } + + public closeConnection(connectionId: string): boolean { + const connection = this.connections.get(connectionId); + if (!connection) { + return false; + } + + connection.clientSocket.destroy(); + connection.serverSocket.destroy(); + this.connections.delete(connectionId); + this.emit('disconnect', connection); + return true; + } + + public sendToClient(connectionId: string, data: Buffer): SendResult { + const connection = this.connections.get(connectionId); + if (!connection) { + return { + success: false, + error: 'Connection not found', + connectionId + }; + } + + if (connection.clientSocket.destroyed || !connection.clientSocket.writable) { + return { + success: false, + error: 'Client socket is not writable', + connectionId + }; + } + + try { + connection.clientSocket.write(data); + + this.log(`Sent ${data.length} bytes to client ${connectionId}`); + this.emit('data', connectionId, 'server->client', data); + + return { + success: true, + connectionId + }; + } catch (error) { + const errorMessage = error instanceof Error ? error.message : 'Unknown error'; + this.log(`Failed to send data to client ${connectionId}: ${errorMessage}`); + return { + success: false, + error: errorMessage, + connectionId + }; + } + } + + public sendToAllClients(data: Buffer): readonly SendResult[] { + const connectionIds = Array.from(this.connections.keys()); + const results = connectionIds.map((connectionId) => + this.sendToClient(connectionId, data) + ); + + const successCount = results.filter((result) => result.success).length; + const totalCount = results.length; + + this.log(`Sent ${data.length} bytes to ${successCount}/${totalCount} clients`); + + return results; + } + + public sendToClients(connectionIds: readonly string[], data: Buffer): readonly SendResult[] { + const results = connectionIds.map((connectionId) => + this.sendToClient(connectionId, data) + ); + + const successCount = results.filter((result) => result.success).length; + const totalCount = results.length; + + this.log(`Sent ${data.length} bytes to ${successCount}/${totalCount} specified clients`); + + return results; + } + + public getActiveConnectionIds(): readonly string[] { + return Array.from(this.connections.keys()); + } + + private createServer(): net.Server { + return net.createServer((clientSocket) => { + this.handleClientConnection(clientSocket); + }); + } + + private handleClientConnection(clientSocket: net.Socket): void { + const connectionId = this.generateConnectionId(); + const serverSocket = net.createConnection({ + host: this.config.targetHost, + port: this.config.targetPort + }); + + const connectionInfo: ActiveConnection = { + id: connectionId, + clientAddress: clientSocket.remoteAddress || 'unknown', + clientPort: clientSocket.remotePort || 0, + connectedAt: new Date(), + clientSocket, + serverSocket + }; + + this.connections.set(connectionId, connectionInfo); + this.log(`New connection ${connectionId} from ${connectionInfo.clientAddress}:${connectionInfo.clientPort}`); + + clientSocket.setTimeout(this.config.timeout); + + serverSocket.on('connect', () => { + this.log(`Connected to Redis server for connection ${connectionId}`); + this.emit('connection', connectionInfo); + }); + + clientSocket.on('data', (data) => { + this.emit('data', connectionId, 'client->server', data); + serverSocket.write(data); + }); + + serverSocket.on('data', (data) => { + this.emit('data', connectionId, 'server->client', data); + clientSocket.write(data); + }); + + clientSocket.on('close', () => { + this.log(`Client disconnected for connection ${connectionId}`); + serverSocket.destroy(); + this.cleanupConnection(connectionId); + }); + + serverSocket.on('close', () => { + this.log(`Server disconnected for connection ${connectionId}`); + clientSocket.destroy(); + this.cleanupConnection(connectionId); + }); + + clientSocket.on('error', (error) => { + this.log(`Client error for connection ${connectionId}: ${error.message}`); + this.emit('error', error, connectionId); + serverSocket.destroy(); + this.cleanupConnection(connectionId); + }); + + serverSocket.on('error', (error) => { + this.log(`Server error for connection ${connectionId}: ${error.message}`); + this.emit('error', error, connectionId); + clientSocket.destroy(); + this.cleanupConnection(connectionId); + }); + + clientSocket.on('timeout', () => { + this.log(`Connection ${connectionId} timed out`); + clientSocket.destroy(); + serverSocket.destroy(); + this.cleanupConnection(connectionId); + }); + } + + private cleanupConnection(connectionId: string): void { + const connection = this.connections.get(connectionId); + if (connection) { + this.connections.delete(connectionId); + this.emit('disconnect', connection); + } + } + + private generateConnectionId(): string { + return `conn_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; + } + + private log(message: string): void { + if (this.config.enableLogging) { + console.log(`[RedisProxy] ${new Date().toISOString()} - ${message}`); + } + } +} +import { createServer } from 'net'; + +export function getFreePortNumber(): Promise { + return new Promise((resolve, reject) => { + const server = createServer(); + + server.listen(0, () => { + const address = server.address(); + server.close(() => { + if (address && typeof address === 'object') { + resolve(address.port); + } + }); + }); + + server.on('error', reject); + }); +} + +export { RedisProxy as RedisTransparentProxy }; +export type { ProxyConfig, ConnectionInfo, ProxyEvents, SendResult, DataDirection, ProxyStats }; + diff --git a/packages/test-utils/lib/test-utils.ts b/packages/test-utils/lib/test-utils.ts new file mode 100644 index 0000000000..fe27bd93d3 --- /dev/null +++ b/packages/test-utils/lib/test-utils.ts @@ -0,0 +1,25 @@ +import TestUtils from './index' + +export const testUtils = TestUtils.createFromConfig({ + dockerImageName: 'redislabs/client-libs-test', + dockerImageVersionArgument: 'redis-version', + defaultDockerVersion: '8.2-M01-pre' +}); + + + +export const DEBUG_MODE_ARGS = testUtils.isVersionGreaterThan([7]) ? + ['--enable-debug-command', 'yes'] : + []; + +export const GLOBAL = { + SERVERS: { + + OPEN_RESP_3: { + serverArguments: [...DEBUG_MODE_ARGS], + clientOptions: { + RESP: 3, + } + }, + } +}