You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-04 15:02:09 +03:00
@@ -2,20 +2,14 @@ import { strict as assert } from 'assert';
|
||||
import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils';
|
||||
import RedisClient, { RedisClientType } from '.';
|
||||
import { RedisClientMultiCommandType } from './multi-command';
|
||||
import { RedisCommandRawReply, RedisModules, RedisFunctions, RedisScripts } from '../commands';
|
||||
import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, ErrorReply, SocketClosedUnexpectedlyError, WatchError } from '../errors';
|
||||
import { RedisCommandArguments, RedisCommandRawReply, RedisModules, RedisFunctions, RedisScripts } from '../commands';
|
||||
import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, SocketClosedUnexpectedlyError, WatchError } from '../errors';
|
||||
import { defineScript } from '../lua-script';
|
||||
import { spy } from 'sinon';
|
||||
import { once } from 'events';
|
||||
import { ClientKillFilters } from '../commands/CLIENT_KILL';
|
||||
import { ClusterSlotStates } from '../commands/CLUSTER_SETSLOT';
|
||||
import { promisify } from 'util';
|
||||
|
||||
// We need to use 'require', because it's not possible with Typescript to import
|
||||
// function that are exported as 'module.exports = function`, without esModuleInterop
|
||||
// set to true.
|
||||
const calculateSlot = require('cluster-key-slot');
|
||||
|
||||
export const SQUARE_SCRIPT = defineScript({
|
||||
SCRIPT: 'return ARGV[1] * ARGV[1];',
|
||||
NUMBER_OF_KEYS: 0,
|
||||
@@ -171,6 +165,28 @@ describe('Client', () => {
|
||||
}
|
||||
});
|
||||
|
||||
testUtils.testWithClient('client.sendCommand should reply with error', async client => {
|
||||
await assert.rejects(
|
||||
promisify(client.sendCommand).call(client, '1', '2')
|
||||
);
|
||||
}, {
|
||||
...GLOBAL.SERVERS.OPEN,
|
||||
clientOptions: {
|
||||
legacyMode: true
|
||||
}
|
||||
});
|
||||
|
||||
testUtils.testWithClient('client.hGetAll should reply with error', async client => {
|
||||
await assert.rejects(
|
||||
promisify(client.hGetAll).call(client)
|
||||
);
|
||||
}, {
|
||||
...GLOBAL.SERVERS.OPEN,
|
||||
clientOptions: {
|
||||
legacyMode: true
|
||||
}
|
||||
});
|
||||
|
||||
testUtils.testWithClient('client.v4.sendCommand should return a promise', async client => {
|
||||
assert.equal(
|
||||
await client.v4.sendCommand(['PING']),
|
||||
@@ -347,19 +363,6 @@ describe('Client', () => {
|
||||
legacyMode: true
|
||||
}
|
||||
});
|
||||
|
||||
testUtils.testWithClient('pingInterval', async client => {
|
||||
assert.deepEqual(
|
||||
await once(client, 'ping-interval'),
|
||||
['PONG']
|
||||
);
|
||||
}, {
|
||||
...GLOBAL.SERVERS.OPEN,
|
||||
clientOptions: {
|
||||
legacyMode: true,
|
||||
pingInterval: 1
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe('events', () => {
|
||||
@@ -823,34 +826,7 @@ describe('Client', () => {
|
||||
}
|
||||
}, GLOBAL.SERVERS.OPEN);
|
||||
|
||||
testUtils.testWithClient('should be able to PING in PubSub mode', async client => {
|
||||
await client.connect();
|
||||
|
||||
try {
|
||||
await client.subscribe('channel', () => {
|
||||
// noop
|
||||
});
|
||||
|
||||
const [string, buffer, customString, customBuffer] = await Promise.all([
|
||||
client.ping(),
|
||||
client.ping(client.commandOptions({ returnBuffers: true })),
|
||||
client.ping('custom'),
|
||||
client.ping(client.commandOptions({ returnBuffers: true }), 'custom')
|
||||
]);
|
||||
|
||||
assert.equal(string, 'pong');
|
||||
assert.deepEqual(buffer, Buffer.from('pong'));
|
||||
assert.equal(customString, 'custom');
|
||||
assert.deepEqual(customBuffer, Buffer.from('custom'));
|
||||
} finally {
|
||||
await client.disconnect();
|
||||
}
|
||||
}, {
|
||||
...GLOBAL.SERVERS.OPEN,
|
||||
disableClientSetup: true
|
||||
});
|
||||
|
||||
testUtils.testWithClient('should be able to QUIT in PubSub mode', async client => {
|
||||
testUtils.testWithClient('should be able to quit in PubSub mode', async client => {
|
||||
await client.subscribe('channel', () => {
|
||||
// noop
|
||||
});
|
||||
@@ -859,122 +835,6 @@ describe('Client', () => {
|
||||
|
||||
assert.equal(client.isOpen, false);
|
||||
}, GLOBAL.SERVERS.OPEN);
|
||||
|
||||
testUtils.testWithClient('should reject GET in PubSub mode', async client => {
|
||||
await client.connect();
|
||||
|
||||
try {
|
||||
await client.subscribe('channel', () => {
|
||||
// noop
|
||||
});
|
||||
|
||||
await assert.rejects(client.get('key'), ErrorReply);
|
||||
} finally {
|
||||
await client.disconnect();
|
||||
}
|
||||
}, {
|
||||
...GLOBAL.SERVERS.OPEN,
|
||||
disableClientSetup: true
|
||||
});
|
||||
|
||||
describe('shareded PubSub', () => {
|
||||
testUtils.isVersionGreaterThanHook([7]);
|
||||
|
||||
testUtils.testWithClient('should be able to receive messages', async publisher => {
|
||||
const subscriber = publisher.duplicate();
|
||||
|
||||
await subscriber.connect();
|
||||
|
||||
try {
|
||||
const listener = spy();
|
||||
await subscriber.sSubscribe('channel', listener);
|
||||
|
||||
await Promise.all([
|
||||
waitTillBeenCalled(listener),
|
||||
publisher.sPublish('channel', 'message')
|
||||
]);
|
||||
|
||||
assert.ok(listener.calledOnceWithExactly('message', 'channel'));
|
||||
|
||||
await subscriber.sUnsubscribe();
|
||||
|
||||
// should be able to send commands
|
||||
await assert.doesNotReject(subscriber.ping());
|
||||
} finally {
|
||||
await subscriber.disconnect();
|
||||
}
|
||||
}, {
|
||||
...GLOBAL.SERVERS.OPEN
|
||||
});
|
||||
|
||||
testUtils.testWithClient('should emit sharded-channel-moved event', async publisher => {
|
||||
await publisher.clusterAddSlotsRange({ start: 0, end: 16383 });
|
||||
|
||||
const subscriber = publisher.duplicate();
|
||||
|
||||
await subscriber.connect();
|
||||
|
||||
try {
|
||||
await subscriber.sSubscribe('channel', () => {});
|
||||
|
||||
await Promise.all([
|
||||
publisher.clusterSetSlot(
|
||||
calculateSlot('channel'),
|
||||
ClusterSlotStates.NODE,
|
||||
await publisher.clusterMyId()
|
||||
),
|
||||
once(subscriber, 'sharded-channel-moved')
|
||||
]);
|
||||
|
||||
assert.equal(
|
||||
await subscriber.ping(),
|
||||
'PONG'
|
||||
);
|
||||
} finally {
|
||||
await subscriber.disconnect();
|
||||
}
|
||||
}, {
|
||||
serverArguments: ['--cluster-enabled', 'yes']
|
||||
});
|
||||
});
|
||||
|
||||
testUtils.testWithClient('should handle errors in SUBSCRIBE', async publisher => {
|
||||
const subscriber = publisher.duplicate();
|
||||
|
||||
await subscriber.connect();
|
||||
|
||||
try {
|
||||
const listener1 = spy();
|
||||
await subscriber.subscribe('1', listener1);
|
||||
|
||||
await publisher.aclSetUser('default', 'resetchannels');
|
||||
|
||||
|
||||
const listener2 = spy();
|
||||
await assert.rejects(subscriber.subscribe('2', listener2));
|
||||
|
||||
await Promise.all([
|
||||
waitTillBeenCalled(listener1),
|
||||
publisher.aclSetUser('default', 'allchannels'),
|
||||
publisher.publish('1', 'message'),
|
||||
]);
|
||||
assert.ok(listener1.calledOnceWithExactly('message', '1'));
|
||||
|
||||
await subscriber.subscribe('2', listener2);
|
||||
|
||||
await Promise.all([
|
||||
waitTillBeenCalled(listener2),
|
||||
publisher.publish('2', 'message'),
|
||||
]);
|
||||
assert.ok(listener2.calledOnceWithExactly('message', '2'));
|
||||
} finally {
|
||||
await subscriber.disconnect();
|
||||
}
|
||||
}, {
|
||||
// this test change ACL rules, running in isolated server
|
||||
serverArguments: [],
|
||||
minimumDockerVersion: [6 ,2] // ACL PubSub rules were added in Redis 6.2
|
||||
});
|
||||
});
|
||||
|
||||
testUtils.testWithClient('ConnectionTimeoutError', async client => {
|
||||
|
@@ -15,6 +15,7 @@ import { ClientClosedError, ClientOfflineError, DisconnectsClientError } from '.
|
||||
import { URL } from 'url';
|
||||
import { TcpSocketConnectOpts } from 'net';
|
||||
import { PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub';
|
||||
import { callbackify } from 'util';
|
||||
|
||||
export interface RedisClientOptions<
|
||||
M extends RedisModules = RedisModules,
|
||||
@@ -343,7 +344,9 @@ export default class RedisClient<
|
||||
(this as any).sendCommand = (...args: Array<any>): void => {
|
||||
const result = this.#legacySendCommand(...args);
|
||||
if (result) {
|
||||
result.promise.then(reply => result.callback(null, reply));
|
||||
result.promise
|
||||
.then(reply => result.callback(null, reply))
|
||||
.catch(err => result.callback(err));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -380,18 +383,18 @@ export default class RedisClient<
|
||||
promise.catch(err => this.emit('error', err));
|
||||
}
|
||||
|
||||
#defineLegacyCommand(this: any, name: string, command?: RedisCommand): void {
|
||||
this.#v4[name] = this[name].bind(this);
|
||||
this[name] = command && command.TRANSFORM_LEGACY_REPLY && command.transformReply ?
|
||||
#defineLegacyCommand(name: string, command?: RedisCommand): void {
|
||||
this.#v4[name] = (this as any)[name].bind(this);
|
||||
(this as any)[name] = command && command.TRANSFORM_LEGACY_REPLY && command.transformReply ?
|
||||
(...args: Array<unknown>) => {
|
||||
const result = this.#legacySendCommand(name, ...args);
|
||||
if (result) {
|
||||
result.promise.then((reply: any) => {
|
||||
result.callback(null, command.transformReply!(reply));
|
||||
});
|
||||
result.promise
|
||||
.then(reply => result.callback(null, command.transformReply!(reply)))
|
||||
.catch(err => result.callback(err));
|
||||
}
|
||||
} :
|
||||
(...args: Array<unknown>) => this.sendCommand(name, ...args);
|
||||
(...args: Array<unknown>) => (this as any).sendCommand(name, ...args);
|
||||
}
|
||||
|
||||
#pingTimer?: NodeJS.Timer;
|
||||
|
Reference in New Issue
Block a user