1
0
mirror of https://github.com/redis/node-redis.git synced 2025-08-06 02:15:48 +03:00

fix client pubsub and uncomment tests

This commit is contained in:
Leibale
2023-12-05 12:05:12 -05:00
parent 0b3d5fa9e0
commit b99502b874
3 changed files with 190 additions and 191 deletions

View File

@@ -211,9 +211,7 @@ export default class RedisCommandsQueue {
signal.addEventListener('abort', value.abort.listener, { once: true });
}
node = options?.asap ?
this._toWrite.unshift(value) :
this._toWrite.push(value);
node = this._toWrite.add(value, options?.asap);
});
}
@@ -272,7 +270,7 @@ export default class RedisCommandsQueue {
if (command === undefined) return;
return new Promise<void>((resolve, reject) => {
(asap ? this._toWrite.unshift : this._toWrite.push)({
this._toWrite.add({
args: command.args,
chainId: undefined,
abort: undefined,
@@ -287,7 +285,7 @@ export default class RedisCommandsQueue {
},
channelsCounter: command.channelsCounter,
typeMapping: PUSH_TYPE_MAPPING
});
}, asap);
});
}

View File

@@ -525,222 +525,217 @@ describe('Client', () => {
assert.deepEqual(map, results);
}, GLOBAL.SERVERS.OPEN);
// describe('PubSub', () => {
// testUtils.testWithClient('should be able to publish and subscribe to messages', async publisher => {
// function assertStringListener(message: string, channel: string) {
// assert.equal(typeof message, 'string');
// assert.equal(typeof channel, 'string');
// }
describe('PubSub', () => {
testUtils.testWithClient('should be able to publish and subscribe to messages', async publisher => {
function assertStringListener(message: string, channel: string) {
assert.equal(typeof message, 'string');
assert.equal(typeof channel, 'string');
}
// function assertBufferListener(message: Buffer, channel: Buffer) {
// assert.ok(message instanceof Buffer);
// assert.ok(channel instanceof Buffer);
// }
function assertBufferListener(message: Buffer, channel: Buffer) {
assert.ok(message instanceof Buffer);
assert.ok(channel instanceof Buffer);
}
// const subscriber = publisher.duplicate();
const subscriber = await publisher.duplicate().connect();
// await subscriber.connect();
try {
const channelListener1 = spy(assertBufferListener),
channelListener2 = spy(assertStringListener),
patternListener = spy(assertStringListener);
// try {
// const channelListener1 = spy(assertBufferListener),
// channelListener2 = spy(assertStringListener),
// patternListener = spy(assertStringListener);
await Promise.all([
subscriber.subscribe('channel', channelListener1, true),
subscriber.subscribe('channel', channelListener2),
subscriber.pSubscribe('channel*', patternListener)
]);
await Promise.all([
waitTillBeenCalled(channelListener1),
waitTillBeenCalled(channelListener2),
waitTillBeenCalled(patternListener),
publisher.publish(Buffer.from('channel'), Buffer.from('message'))
]);
assert.ok(channelListener1.calledOnceWithExactly(Buffer.from('message'), Buffer.from('channel')));
assert.ok(channelListener2.calledOnceWithExactly('message', 'channel'));
assert.ok(patternListener.calledOnceWithExactly('message', 'channel'));
// await Promise.all([
// subscriber.subscribe('channel', channelListener1, true),
// subscriber.subscribe('channel', channelListener2),
// subscriber.pSubscribe('channel*', patternListener)
// ]);
// await Promise.all([
// waitTillBeenCalled(channelListener1),
// waitTillBeenCalled(channelListener2),
// waitTillBeenCalled(patternListener),
// publisher.publish(Buffer.from('channel'), Buffer.from('message'))
// ]);
await subscriber.unsubscribe('channel', channelListener1, true);
await Promise.all([
waitTillBeenCalled(channelListener2),
waitTillBeenCalled(patternListener),
publisher.publish('channel', 'message')
]);
assert.ok(channelListener1.calledOnce);
assert.ok(channelListener2.calledTwice);
assert.ok(channelListener2.secondCall.calledWithExactly('message', 'channel'));
assert.ok(patternListener.calledTwice);
assert.ok(patternListener.secondCall.calledWithExactly('message', 'channel'));
await subscriber.unsubscribe('channel');
await Promise.all([
waitTillBeenCalled(patternListener),
publisher.publish('channel', 'message')
]);
assert.ok(channelListener1.calledOnce);
assert.ok(channelListener2.calledTwice);
assert.ok(patternListener.calledThrice);
assert.ok(patternListener.thirdCall.calledWithExactly('message', 'channel'));
// assert.ok(channelListener1.calledOnceWithExactly(Buffer.from('message'), Buffer.from('channel')));
// assert.ok(channelListener2.calledOnceWithExactly('message', 'channel'));
// assert.ok(patternListener.calledOnceWithExactly('message', 'channel'));
await subscriber.pUnsubscribe();
await publisher.publish('channel', 'message');
assert.ok(channelListener1.calledOnce);
assert.ok(channelListener2.calledTwice);
assert.ok(patternListener.calledThrice);
// await subscriber.unsubscribe('channel', channelListener1, true);
// await Promise.all([
// waitTillBeenCalled(channelListener2),
// waitTillBeenCalled(patternListener),
// publisher.publish('channel', 'message')
// ]);
// assert.ok(channelListener1.calledOnce);
// assert.ok(channelListener2.calledTwice);
// assert.ok(channelListener2.secondCall.calledWithExactly('message', 'channel'));
// assert.ok(patternListener.calledTwice);
// assert.ok(patternListener.secondCall.calledWithExactly('message', 'channel'));
// await subscriber.unsubscribe('channel');
// await Promise.all([
// waitTillBeenCalled(patternListener),
// publisher.publish('channel', 'message')
// ]);
// assert.ok(channelListener1.calledOnce);
// assert.ok(channelListener2.calledTwice);
// assert.ok(patternListener.calledThrice);
// assert.ok(patternListener.thirdCall.calledWithExactly('message', 'channel'));
// await subscriber.pUnsubscribe();
// await publisher.publish('channel', 'message');
// assert.ok(channelListener1.calledOnce);
// assert.ok(channelListener2.calledTwice);
// assert.ok(patternListener.calledThrice);
// // should be able to send commands when unsubsribed from all channels (see #1652)
// await assert.doesNotReject(subscriber.ping());
// } finally {
// await subscriber.disconnect();
// }
// }, GLOBAL.SERVERS.OPEN);
// should be able to send commands when unsubsribed from all channels (see #1652)
await assert.doesNotReject(subscriber.ping());
} finally {
subscriber.destroy();
}
}, GLOBAL.SERVERS.OPEN);
// testUtils.testWithClient('should resubscribe', async publisher => {
// const subscriber = publisher.duplicate();
testUtils.testWithClient('should resubscribe', async publisher => {
const subscriber = await publisher.duplicate().connect();
// await subscriber.connect();
try {
const channelListener = spy();
await subscriber.subscribe('channel', channelListener);
// try {
// const channelListener = spy();
// await subscriber.subscribe('channel', channelListener);
const patternListener = spy();
await subscriber.pSubscribe('channe*', patternListener);
// const patternListener = spy();
// await subscriber.pSubscribe('channe*', patternListener);
await Promise.all([
once(subscriber, 'error'),
publisher.clientKill({
filter: 'SKIPME',
skipMe: true
})
]);
// await Promise.all([
// once(subscriber, 'error'),
// publisher.clientKill({
// filter: ClientKillFilters.SKIP_ME,
// skipMe: true
// })
// ]);
await once(subscriber, 'ready');
// await once(subscriber, 'ready');
await Promise.all([
waitTillBeenCalled(channelListener),
waitTillBeenCalled(patternListener),
publisher.publish('channel', 'message')
]);
} finally {
subscriber.destroy();
}
}, GLOBAL.SERVERS.OPEN);
// await Promise.all([
// waitTillBeenCalled(channelListener),
// waitTillBeenCalled(patternListener),
// publisher.publish('channel', 'message')
// ]);
// } finally {
// await subscriber.disconnect();
// }
// }, GLOBAL.SERVERS.OPEN);
testUtils.testWithClient('should not fail when message arrives right after subscribe', async publisher => {
const subscriber = await publisher.duplicate().connect();
// testUtils.testWithClient('should not fail when message arrives right after subscribe', async publisher => {
// const subscriber = publisher.duplicate();
try {
await assert.doesNotReject(Promise.all([
subscriber.subscribe('channel', () => {
// noop
}),
publisher.publish('channel', 'message')
]));
} finally {
subscriber.destroy();
}
}, GLOBAL.SERVERS.OPEN);
// await subscriber.connect();
testUtils.testWithClient('should be able to quit in PubSub mode', async client => {
await client.subscribe('channel', () => {
// noop
});
// try {
// await assert.doesNotReject(Promise.all([
// subscriber.subscribe('channel', () => {
// // noop
// }),
// publisher.publish('channel', 'message')
// ]));
// } finally {
// await subscriber.disconnect();
// }
// }, GLOBAL.SERVERS.OPEN);
await assert.doesNotReject(client.quit());
// testUtils.testWithClient('should be able to quit in PubSub mode', async client => {
// await client.subscribe('channel', () => {
// // noop
// });
assert.equal(client.isOpen, false);
}, GLOBAL.SERVERS.OPEN);
});
// await assert.doesNotReject(client.quit());
testUtils.testWithClient('ConnectionTimeoutError', async client => {
const promise = assert.rejects(client.connect(), ConnectionTimeoutError),
start = process.hrtime.bigint();
// assert.equal(client.isOpen, false);
// }, GLOBAL.SERVERS.OPEN);
// });
while (process.hrtime.bigint() - start < 1_000_000) {
// block the event loop for 1ms, to make sure the connection will timeout
}
// testUtils.testWithClient('ConnectionTimeoutError', async client => {
// const promise = assert.rejects(client.connect(), ConnectionTimeoutError),
// start = process.hrtime.bigint();
await promise;
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
socket: {
connectTimeout: 1
}
},
disableClientSetup: true
});
// while (process.hrtime.bigint() - start < 1_000_000) {
// // block the event loop for 1ms, to make sure the connection will timeout
// }
testUtils.testWithClient('client.quit', async client => {
await client.connect();
// await promise;
// }, {
// ...GLOBAL.SERVERS.OPEN,
// clientOptions: {
// socket: {
// connectTimeout: 1
// }
// },
// disableClientSetup: true
// });
const pingPromise = client.ping(),
quitPromise = client.quit();
assert.equal(client.isOpen, false);
// testUtils.testWithClient('client.quit', async client => {
// await client.connect();
const [ping, quit] = await Promise.all([
pingPromise,
quitPromise,
assert.rejects(client.ping(), ClientClosedError)
]);
// const pingPromise = client.ping(),
// quitPromise = client.quit();
// assert.equal(client.isOpen, false);
assert.equal(ping, 'PONG');
assert.equal(quit, 'OK');
}, {
...GLOBAL.SERVERS.OPEN,
disableClientSetup: true
});
// const [ping, quit] = await Promise.all([
// pingPromise,
// quitPromise,
// assert.rejects(client.ping(), ClientClosedError)
// ]);
testUtils.testWithClient('client.disconnect', async client => {
const pingPromise = client.ping(),
disconnectPromise = client.disconnect();
assert.equal(client.isOpen, false);
await Promise.all([
assert.rejects(pingPromise, DisconnectsClientError),
assert.doesNotReject(disconnectPromise),
assert.rejects(client.ping(), ClientClosedError)
]);
}, GLOBAL.SERVERS.OPEN);
// assert.equal(ping, 'PONG');
// assert.equal(quit, 'OK');
// }, {
// ...GLOBAL.SERVERS.OPEN,
// disableClientSetup: true
// });
testUtils.testWithClient('should be able to connect after disconnect (see #1801)', async client => {
await client.disconnect();
await client.connect();
}, GLOBAL.SERVERS.OPEN);
// testUtils.testWithClient('client.disconnect', async client => {
// const pingPromise = client.ping(),
// disconnectPromise = client.disconnect();
// assert.equal(client.isOpen, false);
// await Promise.all([
// assert.rejects(pingPromise, DisconnectsClientError),
// assert.doesNotReject(disconnectPromise),
// assert.rejects(client.ping(), ClientClosedError)
// ]);
// }, GLOBAL.SERVERS.OPEN);
testUtils.testWithClient('should be able to use ref and unref', client => {
client.unref();
client.ref();
}, GLOBAL.SERVERS.OPEN);
// testUtils.testWithClient('should be able to connect after disconnect (see #1801)', async client => {
// await client.disconnect();
// await client.connect();
// }, GLOBAL.SERVERS.OPEN);
testUtils.testWithClient('pingInterval', async client => {
assert.deepEqual(
await once(client, 'ping-interval'),
['PONG']
);
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
pingInterval: 1
}
});
// testUtils.testWithClient('should be able to use ref and unref', client => {
// client.unref();
// client.ref();
// }, GLOBAL.SERVERS.OPEN);
// testUtils.testWithClient('pingInterval', async client => {
// assert.deepEqual(
// await once(client, 'ping-interval'),
// ['PONG']
// );
// }, {
// ...GLOBAL.SERVERS.OPEN,
// clientOptions: {
// pingInterval: 1
// }
// });
// testUtils.testWithClient('should reject commands in connect phase when `disableOfflineQueue`', async client => {
// const connectPromise = client.connect();
// await assert.rejects(
// client.ping(),
// ClientOfflineError
// );
// await connectPromise;
// await client.disconnect();
// }, {
// ...GLOBAL.SERVERS.OPEN,
// clientOptions: {
// disableOfflineQueue: true
// },
// disableClientSetup: true
// });
testUtils.testWithClient('should reject commands in connect phase when `disableOfflineQueue`', async client => {
const connectPromise = client.connect();
await assert.rejects(
client.ping(),
ClientOfflineError
);
await connectPromise;
await client.disconnect();
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
disableOfflineQueue: true
},
disableClientSetup: true
});
describe('MONITOR', () => {
testUtils.testWithClient('should be able to monitor commands', async client => {

View File

@@ -59,6 +59,12 @@ export class DoublyLinkedList<T> {
};
}
add(value: T, prepend = false) {
return prepend ?
this.unshift(value) :
this.push(value);
}
shift() {
if (this._head === undefined) return undefined;