1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-04 15:02:09 +03:00

fix isolationPool after reconnect (#2409)

* fix #2406 - fix isolationPool after reconnect

* revert breaking change

* fix
This commit is contained in:
Leibale Eidelman
2023-04-26 14:44:09 -04:00
committed by GitHub
parent 986a510237
commit dc920d3b67
2 changed files with 59 additions and 20 deletions

View File

@@ -607,11 +607,41 @@ describe('Client', () => {
}
});
testUtils.testWithClient('executeIsolated', async client => {
const id = await client.clientId(),
isolatedId = await client.executeIsolated(isolatedClient => isolatedClient.clientId());
assert.ok(id !== isolatedId);
}, GLOBAL.SERVERS.OPEN);
describe('isolationPool', () => {
testUtils.testWithClient('executeIsolated', async client => {
const id = await client.clientId(),
isolatedId = await client.executeIsolated(isolatedClient => isolatedClient.clientId());
assert.ok(id !== isolatedId);
}, GLOBAL.SERVERS.OPEN);
testUtils.testWithClient('should be able to use pool even before connect', async client => {
await client.executeIsolated(() => Promise.resolve());
// make sure to destroy isolation pool
await client.connect();
await client.disconnect();
}, {
...GLOBAL.SERVERS.OPEN,
disableClientSetup: true
});
testUtils.testWithClient('should work after reconnect (#2406)', async client => {
await client.disconnect();
await client.connect();
await client.executeIsolated(() => Promise.resolve());
}, GLOBAL.SERVERS.OPEN);
testUtils.testWithClient('should throw ClientClosedError after disconnect', async client => {
await client.connect();
await client.disconnect();
await assert.rejects(
client.executeIsolated(() => Promise.resolve()),
ClientClosedError
);
}, {
...GLOBAL.SERVERS.OPEN,
disableClientSetup: true
});
});
async function killClient<
M extends RedisModules,
@@ -731,7 +761,7 @@ describe('Client', () => {
members.map<MemberTuple>(member => [member.value, member.score]).sort(sort)
);
}, GLOBAL.SERVERS.OPEN);
describe('PubSub', () => {
testUtils.testWithClient('should be able to publish and subscribe to messages', async publisher => {
function assertStringListener(message: string, channel: string) {

View File

@@ -15,7 +15,6 @@ import { ClientClosedError, ClientOfflineError, DisconnectsClientError } from '.
import { URL } from 'url';
import { TcpSocketConnectOpts } from 'net';
import { PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub';
import { callbackify } from 'util';
export interface RedisClientOptions<
M extends RedisModules = RedisModules,
@@ -190,7 +189,7 @@ export default class RedisClient<
readonly #options?: RedisClientOptions<M, F, S>;
readonly #socket: RedisSocket;
readonly #queue: RedisCommandsQueue;
readonly #isolationPool: Pool<RedisClientType<M, F, S>>;
#isolationPool?: Pool<RedisClientType<M, F, S>>;
readonly #v4: Record<string, any> = {};
#selectedDB = 0;
@@ -223,16 +222,9 @@ export default class RedisClient<
this.#options = this.#initiateOptions(options);
this.#queue = this.#initiateQueue();
this.#socket = this.#initiateSocket();
this.#isolationPool = createPool({
create: async () => {
const duplicate = this.duplicate({
isolationPoolOptions: undefined
}).on('error', err => this.emit('error', err));
await duplicate.connect();
return duplicate;
},
destroy: client => client.disconnect()
}, options?.isolationPoolOptions);
// should be initiated in connect, not here
// TODO: consider breaking in v5
this.#isolationPool = this.#initiateIsolationPool();
this.#legacyMode();
}
@@ -337,6 +329,19 @@ export default class RedisClient<
.on('end', () => this.emit('end'));
}
#initiateIsolationPool() {
return createPool({
create: async () => {
const duplicate = this.duplicate({
isolationPoolOptions: undefined
}).on('error', err => this.emit('error', err));
await duplicate.connect();
return duplicate;
},
destroy: client => client.disconnect()
}, this.#options?.isolationPoolOptions);
}
#legacyMode(): void {
if (!this.#options?.legacyMode) return;
@@ -422,6 +427,8 @@ export default class RedisClient<
}
connect(): Promise<void> {
// see comment in constructor
this.#isolationPool ??= this.#initiateIsolationPool();
return this.#socket.connect();
}
@@ -704,6 +711,7 @@ export default class RedisClient<
}
executeIsolated<T>(fn: (client: RedisClientType<M, F, S>) => T | Promise<T>): Promise<T> {
if (!this.#isolationPool) return Promise.reject(new ClientClosedError());
return this.#isolationPool.use(fn);
}
@@ -802,8 +810,9 @@ export default class RedisClient<
}
async #destroyIsolationPool(): Promise<void> {
await this.#isolationPool.drain();
await this.#isolationPool.clear();
await this.#isolationPool!.drain();
await this.#isolationPool!.clear();
this.#isolationPool = undefined;
}
ref(): void {