You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-12-11 09:22:35 +03:00
fix(socket): prevent false-ready state when socket errors during handshake (#3128)
* fix(socket): prevent false-ready state when socket errors during handshake Fixes race condition where async socket errors during connection handshake don't trigger reconnection. Validates socket state after initiator completes to catch errors swallowed by command handlers. fixes: #3108 * remove comments
This commit is contained in:
committed by
GitHub
parent
b1c39fe02f
commit
f8841c880e
@@ -4,12 +4,16 @@ import RedisClient, { RedisClientOptions, RedisClientType } from '.';
|
||||
import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, ErrorReply, MultiErrorReply, TimeoutError, WatchError } from '../errors';
|
||||
import { defineScript } from '../lua-script';
|
||||
import { spy, stub } from 'sinon';
|
||||
import { once } from 'node:events';
|
||||
import EventEmitter, { once } from 'node:events';
|
||||
import { MATH_FUNCTION, loadMathFunction } from '../commands/FUNCTION_LOAD.spec';
|
||||
import { RESP_TYPES } from '../RESP/decoder';
|
||||
import { BlobStringReply, NumberReply } from '../RESP/types';
|
||||
import { SortedSetMember } from '../commands/generic-transformers';
|
||||
import { CommandParser } from './parser';
|
||||
import { RedisSocketOptions } from './socket';
|
||||
import { getFreePortNumber } from '@redis/test-utils/lib/proxy/redis-proxy';
|
||||
import { createClient } from '../../';
|
||||
import net from 'node:net'
|
||||
|
||||
export const SQUARE_SCRIPT = defineScript({
|
||||
SCRIPT:
|
||||
@@ -1008,6 +1012,89 @@ describe('Client', () => {
|
||||
}
|
||||
}, GLOBAL.SERVERS.OPEN);
|
||||
});
|
||||
|
||||
describe("socket errors during handshake", () => {
|
||||
|
||||
it("should successfully connect when server accepts connection immediately", async () => {
|
||||
const { log, client, teardown } = await setup({}, 0);
|
||||
await client.connect();
|
||||
assert.deepEqual(["connect", "ready"], log);
|
||||
teardown();
|
||||
});
|
||||
|
||||
it("should reconnect after multiple connection drops during handshake", async () => {
|
||||
const { log, client, teardown } = await setup({}, 2);
|
||||
await client.connect();
|
||||
assert.deepEqual(
|
||||
[
|
||||
"connect",
|
||||
"error",
|
||||
"reconnecting",
|
||||
"connect",
|
||||
"error",
|
||||
"reconnecting",
|
||||
"connect",
|
||||
"ready",
|
||||
],
|
||||
log,
|
||||
);
|
||||
teardown();
|
||||
});
|
||||
|
||||
async function setup(
|
||||
socketOptions: Partial<RedisSocketOptions>,
|
||||
dropCount: number,
|
||||
) {
|
||||
const port = await getFreePortNumber();
|
||||
const server = setupMockServer(dropCount);
|
||||
const options = {
|
||||
...{
|
||||
socket: {
|
||||
host: "localhost",
|
||||
port,
|
||||
},
|
||||
...socketOptions,
|
||||
},
|
||||
};
|
||||
const client = createClient(options);
|
||||
const log = setupLog(client);
|
||||
await once(server.listen(port), "listening");
|
||||
return {
|
||||
log,
|
||||
client,
|
||||
server,
|
||||
teardown: async function () {
|
||||
client.destroy();
|
||||
server.close();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function setupLog(client: EventEmitter): string[] {
|
||||
const log: string[] = [];
|
||||
client.on("connect", () => log.push("connect"));
|
||||
client.on("ready", () => log.push("ready"));
|
||||
client.on("reconnecting", () => log.push("reconnecting"));
|
||||
client.on("error", () => log.push("error"));
|
||||
return log;
|
||||
}
|
||||
|
||||
// Create a TCP server that accepts connections but immediately drops them <dropImmediately> times
|
||||
// This simulates what happens when Docker container is stopped:
|
||||
// - TCP connection succeeds (OS accepts it)
|
||||
// - But socket is immediately destroyed, causing ECONNRESET during handshake
|
||||
function setupMockServer(dropImmediately: number) {
|
||||
const server = net.createServer(async (socket) => {
|
||||
if (dropImmediately > 0) {
|
||||
dropImmediately--;
|
||||
socket.destroy();
|
||||
}
|
||||
socket.write("+OK\r\n+OK\r\n");
|
||||
});
|
||||
return server;
|
||||
}
|
||||
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
|
||||
@@ -220,6 +220,15 @@ export default class RedisSocket extends EventEmitter {
|
||||
|
||||
try {
|
||||
await this.#initiator();
|
||||
|
||||
// Check if socket was closed/destroyed during initiator execution
|
||||
if (!this.#socket || this.#socket.destroyed || !this.#socket.readable || !this.#socket.writable) {
|
||||
const retryIn = this.#shouldReconnect(retries++, new SocketClosedUnexpectedlyError());
|
||||
if (typeof retryIn !== 'number') { throw retryIn; }
|
||||
await setTimeout(retryIn);
|
||||
this.emit('reconnecting');
|
||||
continue;
|
||||
}
|
||||
} catch (err) {
|
||||
this.#socket.destroy();
|
||||
this.#socket = undefined;
|
||||
@@ -312,6 +321,7 @@ export default class RedisSocket extends EventEmitter {
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
write(iterable: Iterable<ReadonlyArray<RedisArgument>>) {
|
||||
if (!this.#socket) return;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user