
Client Side Caching (#2947)

* CSC POC ontop of Parser

* add csc file that weren't merged after patch

* address review comments

* nits to try and fix github

* last change from review

* Update client-side cache and improve documentation

* Add client side caching RESP3 validation

* Add documentation for RESP and unstableResp3 options

* Add comprehensive cache statistics

The `CacheStats` class provides detailed metrics like hit/miss counts,
load success/failure counts, total load time, and eviction counts.
It also offers derived metrics such as hit/miss rates, load failure rate,
and average load penalty. The design is inspired by Caffeine.

`BasicClientSideCache` now uses a `StatsCounter` to accumulate these
statistics, exposed via a new `stats()` method. The previous
`cacheHits()` and `cacheMisses()` methods have been removed.

A `recordStats` option (default: true) in `ClientSideCacheConfig`
allows disabling statistics collection.
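For illustration only (not part of the commit message), a minimal sketch of the API described above, assuming the `BasicClientSideCache` export and the `CacheStats` accessors added by this commit:

```typescript
// Sketch only; names are taken from the diff below (BasicClientSideCache,
// recordStats, stats(), hitCount, missCount, hitRate()).
import { createClient, BasicClientSideCache } from 'redis';

const cache = new BasicClientSideCache({
  maxEntries: 100,
  recordStats: true // default; set to false to disable statistics collection
});

const client = createClient({ RESP: 3, clientSideCache: cache });
await client.connect(); // top-level await assumes an ES module

await client.set('x', '1');
await client.get('x'); // miss + successful load
await client.get('x'); // hit

const stats = cache.stats(); // immutable CacheStats snapshot
console.log(stats.hitCount, stats.missCount); // 1 1
console.log(stats.hitRate()); // hitCount / (hitCount + missCount)
```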

---------

Co-authored-by: Shaya Potter <shaya@redislabs.com>
Authored by Bobby I. on 2025-05-19 15:11:47 +03:00, committed by GitHub
parent 6f961bd715
commit f01f1014cb
25 changed files with 2330 additions and 101 deletions

View File

@@ -235,6 +235,24 @@ of sending a `QUIT` command to the server, the client can simply close the netwo
client.destroy();
```
### Client Side Caching
Node Redis v5 adds support for [Client Side Caching](https://redis.io/docs/manual/client-side-caching/), which enables clients to cache query results locally. The Redis server will notify the client when cached results are no longer valid.
```typescript
// Enable client side caching with RESP3
const client = createClient({
RESP: 3,
clientSideCache: {
ttl: 0, // Time-to-live (0 = no expiration)
maxEntries: 0, // Maximum entries (0 = unlimited)
evictPolicy: "LRU" // Eviction policy: "LRU" or "FIFO"
}
});
```
See the [V5 documentation](./docs/v5.md#client-side-caching) for more details and advanced usage.
### Auto-Pipelining
Node Redis will automatically pipeline requests that are made during the same "tick".

View File

@@ -89,3 +89,100 @@ await multi.exec(); // Array<ReplyUnion>
await multi.exec<'typed'>(); // [string]
await multi.execTyped(); // [string]
```
# Client Side Caching
Node Redis v5 adds support for [Client Side Caching](https://redis.io/docs/manual/client-side-caching/), which enables clients to cache query results locally. The server will notify the client when cached results are no longer valid.
Client Side Caching is only supported with RESP3.
## Usage
There are two ways to implement client side caching:
### Anonymous Cache
```javascript
const client = createClient({
RESP: 3,
clientSideCache: {
ttl: 0, // Time-to-live in milliseconds (0 = no expiration)
maxEntries: 0, // Maximum entries to store (0 = unlimited)
evictPolicy: "LRU" // Eviction policy: "LRU" or "FIFO"
}
});
```
In this instance, the cache is managed internally by the client.
### Controllable Cache
```javascript
import { BasicClientSideCache } from 'redis';
const cache = new BasicClientSideCache({
ttl: 0,
maxEntries: 0,
evictPolicy: "LRU"
});
const client = createClient({
RESP: 3,
clientSideCache: cache
});
```
With this approach, you have direct access to the cache object for more control:
```javascript
// Manually invalidate keys
cache.invalidate(key);
// Clear the entire cache
cache.clear();
// Get cache metrics
// `cache.stats()` returns a `CacheStats` object with comprehensive statistics.
const statistics = cache.stats();
// Key metrics:
const hits = statistics.hitCount; // Number of cache hits
const misses = statistics.missCount; // Number of cache misses
const hitRate = statistics.hitRate(); // Cache hit rate (0.0 to 1.0)
// Many other metrics are available on the `statistics` object, e.g.:
// statistics.missRate(), statistics.loadSuccessCount,
// statistics.averageLoadPenalty(), statistics.requestCount()
```
## Pooled Caching
Client side caching also works with client pools. For pooled clients, the cache is shared across all clients in the pool:
```javascript
const client = createClientPool({RESP: 3}, {
clientSideCache: {
ttl: 0,
maxEntries: 0,
evictPolicy: "LRU"
},
minimum: 5
});
```
For a controllable pooled cache:
```javascript
import { BasicPooledClientSideCache } from 'redis';
const cache = new BasicPooledClientSideCache({
ttl: 0,
maxEntries: 0,
evictPolicy: "LRU"
});
const client = createClientPool({RESP: 3}, {
clientSideCache: cache,
minimum: 5
});
```
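The examples above do not show it, but the `ClientSideCacheConfig` added in this commit also accepts a `recordStats` flag (default `true`). A hedged sketch of turning statistics collection off:

```typescript
import { createClient, BasicClientSideCache } from 'redis';

const cache = new BasicClientSideCache({
  ttl: 0,
  maxEntries: 0,
  evictPolicy: "LRU",
  recordStats: false // cache.stats() will return an empty CacheStats snapshot
});

const client = createClient({
  RESP: 3,
  clientSideCache: cache
});
```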

View File

@@ -34,3 +34,6 @@ export { GEO_REPLY_WITH, GeoReplyWith } from './lib/commands/GEOSEARCH_WITH';
export { SetOptions } from './lib/commands/SET';
export { REDIS_FLUSH_MODES } from './lib/commands/FLUSHALL';
export { BasicClientSideCache, BasicPooledClientSideCache } from './lib/client/cache';

View File

@@ -314,11 +314,17 @@ export interface CommanderConfig<
functions?: F;
scripts?: S;
/**
* Specifies the Redis Serialization Protocol version to use.
* RESP2 is the default (value 2), while RESP3 (value 3) provides
* additional data types and features introduced in Redis 6.0.
*/
RESP?: RESP;
/**
* When set to true, enables commands that have unstable RESP3 implementations.
* When using RESP3 protocol, commands marked as having unstable RESP3 support
* will throw an error unless this flag is explicitly set to true.
* This primarily affects modules like Redis Search where response formats
* in RESP3 mode may change in future versions.
*/
unstableResp3?: boolean;
}
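As a small illustrative sketch (assuming the usual `createClient` entry point), the two options documented in this hunk would be set like this:

```typescript
import { createClient } from 'redis';

// Opt into RESP3 and explicitly allow commands whose RESP3 reply formats
// are still marked unstable (primarily some Redis Search commands).
const client = createClient({
  RESP: 3,
  unstableResp3: true
});
```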

View File

@@ -0,0 +1,700 @@
import assert from "assert";
import testUtils, { GLOBAL } from "../test-utils"
import { BasicClientSideCache, BasicPooledClientSideCache, CacheStats } from "./cache"
import { REDIS_FLUSH_MODES } from "../commands/FLUSHALL";
import { once } from 'events';
describe("Client Side Cache", () => {
describe('Basic Cache', () => {
const csc = new BasicClientSideCache({ maxEntries: 10 });
testUtils.testWithClient('Basic Cache Miss', async client => {
csc.clear();
await client.set("x", 1);
await client.get("x");
assert.equal(csc.stats().missCount, 1, "Cache Misses");
assert.equal(csc.stats().hitCount, 0, "Cache Hits");
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3,
clientSideCache: csc
}
});
testUtils.testWithClient('Basic Cache Hit', async client => {
csc.clear();
await client.set("x", 1);
assert.equal(await client.get("x"), '1');
assert.equal(await client.get("x"), '1');
assert.equal(csc.stats().missCount, 1, "Cache Misses");
assert.equal(csc.stats().hitCount, 1, "Cache Hits");
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3,
clientSideCache: csc
}
});
testUtils.testWithClient('Max Cache Entries', async client => {
csc.clear();
await client.set('1', 1);
assert.equal(await client.get('1'), '1');
assert.equal(await client.get('2'), null);
assert.equal(await client.get('3'), null);
assert.equal(await client.get('4'), null);
assert.equal(await client.get('5'), null);
assert.equal(await client.get('6'), null);
assert.equal(await client.get('7'), null);
assert.equal(await client.get('8'), null);
assert.equal(await client.get('9'), null);
assert.equal(await client.get('10'), null);
assert.equal(await client.get('11'), null);
assert.equal(await client.get('1'), '1');
assert.equal(csc.stats().missCount, 12, "Cache Misses");
assert.equal(csc.stats().hitCount, 0, "Cache Hits");
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3,
clientSideCache: csc
}
});
testUtils.testWithClient('LRU works correctly', async client => {
csc.clear();
await client.set('1', 1);
assert.equal(await client.get('1'), '1');
assert.equal(await client.get('2'), null);
assert.equal(await client.get('3'), null);
assert.equal(await client.get('4'), null);
assert.equal(await client.get('5'), null);
assert.equal(await client.get('1'), '1');
assert.equal(await client.get('6'), null);
assert.equal(await client.get('7'), null);
assert.equal(await client.get('8'), null);
assert.equal(await client.get('9'), null);
assert.equal(await client.get('10'), null);
assert.equal(await client.get('11'), null);
assert.equal(await client.get('1'), '1');
assert.equal(csc.stats().missCount, 11, "Cache Misses");
assert.equal(csc.stats().hitCount, 2, "Cache Hits");
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3,
clientSideCache: csc
}
});
testUtils.testWithClient('Basic Cache Clear', async client => {
csc.clear();
await client.set("x", 1);
await client.get("x");
csc.clear();
await client.get("x");
assert.equal(csc.stats().missCount, 1, "Cache Misses");
assert.equal(csc.stats().hitCount, 0, "Cache Hits");
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3,
clientSideCache: csc
}
});
testUtils.testWithClient('Null Invalidate acts as clear', async client => {
csc.clear();
await client.set("x", 1);
await client.get("x");
csc.invalidate(null);
await client.get("x");
assert.equal(2, csc.stats().missCount, "Cache Misses");
assert.equal(0, csc.stats().hitCount, "Cache Hits");
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3,
clientSideCache: csc
}
});
testUtils.testWithClient('flushdb causes an invalidate null', async client => {
csc.clear();
await client.set("x", 1);
assert.equal(await client.get("x"), '1');
await client.flushDb(REDIS_FLUSH_MODES.SYNC);
assert.equal(await client.get("x"), null);
assert.equal(csc.stats().missCount, 2, "Cache Misses");
assert.equal(csc.stats().hitCount, 0, "Cache Hits");
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3,
clientSideCache: csc
}
});
testUtils.testWithClient('Basic Cache Invalidate', async client => {
csc.clear();
await client.set("x", 1);
assert.equal(await client.get("x"), '1', 'first get');
await client.set("x", 2);
assert.equal(await client.get("x"), '2', 'second get');
await client.set("x", 3);
assert.equal(await client.get("x"), '3', 'third get');
assert.equal(csc.stats().missCount, 3, "Cache Misses");
assert.equal(csc.stats().hitCount, 0, "Cache Hits");
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3,
clientSideCache: csc
}
});
testUtils.testWithClient("Cached Replies Don't Mutate", async client => {
csc.clear();
await client.set("x", 1);
await client.set('y', 2);
const ret1 = await client.mGet(['x', 'y']);
assert.deepEqual(ret1, ['1', '2'], 'first mGet');
ret1[0] = '4';
const ret2 = await client.mGet(['x', 'y']);
assert.deepEqual(ret2, ['1', '2'], 'second mGet');
ret2[0] = '8';
const ret3 = await client.mGet(['x', 'y']);
assert.deepEqual(ret3, ['1', '2'], 'third mGet');
assert.equal(csc.stats().missCount, 1, "Cache Misses");
assert.equal(csc.stats().hitCount, 2, "Cache Hits");
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3,
clientSideCache: csc
}
});
testUtils.testWithClient("Cached cleared on disconnect", async client => {
csc.clear();
await client.set("x", 1);
await client.set('y', 2);
const ret1 = await client.mGet(['x', 'y']);
assert.deepEqual(ret1, ['1', '2'], 'first mGet');
assert.equal(csc.stats().missCount, 1, "first Cache Misses");
assert.equal(csc.stats().hitCount, 0, "first Cache Hits");
await client.close();
await client.connect();
const ret2 = await client.mGet(['x', 'y']);
assert.deepEqual(ret2, ['1', '2'], 'second mGet');
assert.equal(csc.stats().missCount, 1, "second Cache Misses");
assert.equal(csc.stats().hitCount, 0, "second Cache Hits");
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3,
clientSideCache: csc
}
});
});
describe("Pooled Cache", () => {
const csc = new BasicPooledClientSideCache();
testUtils.testWithClient('Virtual Pool Disconnect', async client1 => {
const client2 = client1.duplicate();
await client2.connect()
assert.equal(await client2.get("x"), null);
assert.equal(await client1.get("x"), null);
assert.equal(1, csc.stats().missCount, "Cache Misses");
assert.equal(1, csc.stats().hitCount, "Cache Hits");
await client2.close();
assert.equal(await client1.get("x"), null);
assert.equal(await client1.get("x"), null);
assert.equal(2, csc.stats().missCount, "Cache Misses");
assert.equal(2, csc.stats().hitCount, "Cache Hits");
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3,
clientSideCache: csc
}
});
testUtils.testWithClientPool('Basic Cache Miss and Clear', async client => {
csc.clear();
await client.set("x", 1);
assert.equal(await client.get("x"), '1');
assert.equal(1, csc.stats().missCount, "Cache Misses");
assert.equal(0, csc.stats().hitCount, "Cache Hits");
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3,
},
poolOptions: {
minimum: 5,
maximum: 5,
acquireTimeout: 0,
cleanupDelay: 1,
clientSideCache: csc
}
})
testUtils.testWithClientPool('Basic Cache Hit', async client => {
csc.clear();
await client.set("x", 1);
assert.equal(await client.get("x"), '1');
assert.equal(await client.get("x"), '1');
assert.equal(await client.get("x"), '1');
assert.equal(csc.stats().missCount, 1, "Cache Misses");
assert.equal(csc.stats().hitCount, 2, "Cache Hits");
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3,
},
poolOptions: {
minimum: 5,
maximum: 5,
acquireTimeout: 0,
cleanupDelay: 1,
clientSideCache: csc
}
})
testUtils.testWithClientPool('Basic Cache Manually Invalidate', async client => {
csc.clear();
await client.set("x", 1);
assert.equal(await client.get("x"), '1', 'first get');
let p: Promise<Array<string>> = once(csc, 'invalidate');
await client.set("x", 2);
let [i] = await p;
assert.equal(await client.get("x"), '2', 'second get');
p = once(csc, 'invalidate');
await client.set("x", 3);
[i] = await p;
assert.equal(await client.get("x"), '3');
assert.equal(csc.stats().missCount, 3, "Cache Misses");
assert.equal(csc.stats().hitCount, 0, "Cache Hits");
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3,
},
poolOptions: {
minimum: 5,
maximum: 5,
acquireTimeout: 0,
cleanupDelay: 1,
clientSideCache: csc
}
})
testUtils.testWithClientPool('Basic Cache Invalidate via message', async client => {
csc.clear();
await client.set('x', 1);
await client.set('y', 2);
assert.deepEqual(await client.mGet(['x', 'y']), ['1', '2'], 'first mGet');
assert.equal(csc.stats().missCount, 1, "Cache Misses");
assert.equal(csc.stats().hitCount, 0, "Cache Hits");
let p: Promise<Array<string>> = once(csc, 'invalidate');
await client.set("x", 3);
let [i] = await p;
assert.equal(i, 'x');
assert.deepEqual(await client.mGet(['x', 'y']), ['3', '2'], 'second mGet');
assert.equal(csc.stats().missCount, 2, "Cache Misses");
assert.equal(csc.stats().hitCount, 0, "Cache Hits");
p = once(csc, 'invalidate');
await client.set("y", 4);
[i] = await p;
assert.equal(i, 'y');
assert.deepEqual(await client.mGet(['x', 'y']), ['3', '4'], 'second mGet');
assert.equal(csc.stats().missCount, 3, "Cache Misses");
assert.equal(csc.stats().hitCount, 0, "Cache Hits");
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3,
},
poolOptions: {
minimum: 5,
maximum: 5,
acquireTimeout: 0,
cleanupDelay: 1,
clientSideCache: csc
}
})
});
describe('Cluster Caching', () => {
const csc = new BasicPooledClientSideCache();
testUtils.testWithCluster('Basic Cache Miss and Clear', async client => {
csc.clear();
await client.set("x", 1);
await client.get("x");
await client.set("y", 1);
await client.get("y");
assert.equal(2, csc.stats().missCount, "Cache Misses");
assert.equal(0, csc.stats().hitCount, "Cache Hits");
}, {
...GLOBAL.CLUSTERS.OPEN,
clusterConfiguration: {
RESP: 3,
clientSideCache: csc
}
})
testUtils.testWithCluster('Basic Cache Hit', async client => {
csc.clear();
await client.set("x", 1);
assert.equal(await client.get("x"), '1');
assert.equal(await client.get("x"), '1');
assert.equal(await client.get("x"), '1');
await client.set("y", 1);
assert.equal(await client.get("y"), '1');
assert.equal(await client.get("y"), '1');
assert.equal(await client.get("y"), '1');
assert.equal(2, csc.stats().missCount, "Cache Misses");
assert.equal(4, csc.stats().hitCount, "Cache Hits");
}, {
...GLOBAL.CLUSTERS.OPEN,
clusterConfiguration: {
RESP: 3,
clientSideCache: csc
}
})
testUtils.testWithCluster('Basic Cache Invalidate', async client => {
csc.clear();
await client.set("x", 1);
assert.equal(await client.get("x"), '1');
await client.set("x", 2);
assert.equal(await client.get("x"), '2');
await client.set("x", 3);
assert.equal(await client.get("x"), '3');
await client.set("y", 1);
assert.equal(await client.get("y"), '1');
await client.set("y", 2);
assert.equal(await client.get("y"), '2');
await client.set("y", 3);
assert.equal(await client.get("y"), '3');
assert.equal(6, csc.stats().missCount, "Cache Misses");
assert.equal(0, csc.stats().hitCount, "Cache Hits");
}, {
...GLOBAL.CLUSTERS.OPEN,
clusterConfiguration: {
RESP: 3,
clientSideCache: csc
}
})
});
describe("CacheStats", () => {
describe("CacheStats.of()", () => {
it("should correctly initialize stats and calculate derived values", () => {
const stats = CacheStats.of(10, 5, 8, 2, 100, 3);
assert.strictEqual(stats.hitCount, 10, "hitCount should be 10");
assert.strictEqual(stats.missCount, 5, "missCount should be 5");
assert.strictEqual(stats.loadSuccessCount, 8, "loadSuccessCount should be 8");
assert.strictEqual(stats.loadFailureCount, 2, "loadFailureCount should be 2");
assert.strictEqual(stats.totalLoadTime, 100, "totalLoadTime should be 100");
assert.strictEqual(stats.evictionCount, 3, "evictionCount should be 3");
assert.strictEqual(stats.requestCount(), 15, "requestCount should be 15 (10 hits + 5 misses)");
assert.strictEqual(stats.hitRate(), 10 / 15, "hitRate should be 10/15");
assert.strictEqual(stats.missRate(), 5 / 15, "missRate should be 5/15");
assert.strictEqual(stats.loadCount(), 10, "loadCount should be 10 (8 success + 2 failure)");
assert.strictEqual(stats.loadFailureRate(), 2 / 10, "loadFailureRate should be 2/10");
assert.strictEqual(stats.averageLoadPenalty(), 100 / 10, "averageLoadPenalty should be 10 (100 time / 10 loads)");
});
it("should handle zero values and division by zero for derived values", () => {
const stats = CacheStats.of(0, 0, 0, 0, 0, 0);
assert.strictEqual(stats.hitCount, 0, "hitCount");
assert.strictEqual(stats.missCount, 0, "missCount");
assert.strictEqual(stats.loadSuccessCount, 0, "loadSuccessCount");
assert.strictEqual(stats.loadFailureCount, 0, "loadFailureCount");
assert.strictEqual(stats.totalLoadTime, 0, "totalLoadTime");
assert.strictEqual(stats.evictionCount, 0, "evictionCount");
assert.strictEqual(stats.requestCount(), 0, "requestCount should be 0");
assert.strictEqual(stats.hitRate(), 1, "hitRate should be 1 for 0 requests");
assert.strictEqual(stats.missRate(), 0, "missRate should be 0 for 0 requests");
assert.strictEqual(stats.loadCount(), 0, "loadCount should be 0");
assert.strictEqual(stats.loadFailureRate(), 0, "loadFailureRate should be 0 for 0 loads");
assert.strictEqual(stats.averageLoadPenalty(), 0, "averageLoadPenalty should be 0 for 0 loads");
});
});
describe("CacheStats.empty()", () => {
it("should return stats with all zero counts and 0 for rates/penalties", () => {
const stats = CacheStats.empty();
assert.strictEqual(stats.hitCount, 0, "empty.hitCount");
assert.strictEqual(stats.missCount, 0, "empty.missCount");
assert.strictEqual(stats.loadSuccessCount, 0, "empty.loadSuccessCount");
assert.strictEqual(stats.loadFailureCount, 0, "empty.loadFailureCount");
assert.strictEqual(stats.totalLoadTime, 0, "empty.totalLoadTime");
assert.strictEqual(stats.evictionCount, 0, "empty.evictionCount");
assert.strictEqual(stats.requestCount(), 0, "empty.requestCount");
assert.strictEqual(stats.hitRate(), 1, "empty.hitRate should be 1");
assert.strictEqual(stats.missRate(), 0, "empty.missRate should be 0");
assert.strictEqual(stats.loadCount(), 0, "empty.loadCount");
assert.strictEqual(stats.loadFailureRate(), 0, "empty.loadFailureRate should be 0");
assert.strictEqual(stats.averageLoadPenalty(), 0, "empty.averageLoadPenalty should be 0");
});
});
describe("instance methods", () => {
const stats1 = CacheStats.of(10, 5, 8, 2, 100, 3);
const stats2 = CacheStats.of(20, 10, 12, 3, 200, 5);
describe("plus()", () => {
it("should correctly add two CacheStats instances", () => {
const sum = stats1.plus(stats2);
assert.strictEqual(sum.hitCount, 30);
assert.strictEqual(sum.missCount, 15);
assert.strictEqual(sum.loadSuccessCount, 20);
assert.strictEqual(sum.loadFailureCount, 5);
assert.strictEqual(sum.totalLoadTime, 300);
assert.strictEqual(sum.evictionCount, 8);
});
it("should correctly sum large numbers", () => {
const statsC = CacheStats.of(Number.MAX_VALUE, 1, 1, 1, 1, 1);
const statsD = CacheStats.of(Number.MAX_VALUE, 1, 1, 1, 1, 1);
const sum = statsC.plus(statsD);
assert.strictEqual(sum.hitCount, Infinity, "Summing MAX_VALUE should result in Infinity");
});
});
describe("minus()", () => {
it("should correctly subtract one CacheStats instance from another, flooring at 0", () => {
const diff = stats2.minus(stats1);
assert.strictEqual(diff.hitCount, 10);
assert.strictEqual(diff.missCount, 5);
assert.strictEqual(diff.loadSuccessCount, 4);
assert.strictEqual(diff.loadFailureCount, 1);
assert.strictEqual(diff.totalLoadTime, 100);
assert.strictEqual(diff.evictionCount, 2);
});
it("should floor results at 0 if minuend is smaller than subtrahend", () => {
const sSmall = CacheStats.of(5, 2, 1, 0, 10, 1);
const sLarge = CacheStats.of(10, 5, 2, 1, 20, 2);
const diff = sSmall.minus(sLarge);
assert.strictEqual(diff.hitCount, 0, "hitCount should be floored at 0 (5 - 10)");
assert.strictEqual(diff.missCount, 0, "missCount should be floored at 0 (2 - 5)");
assert.strictEqual(diff.loadSuccessCount, 0, "loadSuccessCount should be floored at 0 (1 - 2)");
assert.strictEqual(diff.loadFailureCount, 0, "loadFailureCount should be floored at 0 (0 - 1)");
assert.strictEqual(diff.totalLoadTime, 0, "totalLoadTime should be floored at 0 (10 - 20)");
assert.strictEqual(diff.evictionCount, 0, "evictionCount should be floored at 0 (1 - 2)");
});
});
describe("hitRate()", () => {
it("should return 0 if requestCount is 0", () => {
const stats = CacheStats.of(0, 0, 0, 0, 0, 0);
assert.strictEqual(stats.hitRate(), 1);
});
it("should return 0 if hitCount is 0 but missCount > 0", () => {
const stats = CacheStats.of(0, 1, 0, 0, 0, 0);
assert.strictEqual(stats.hitRate(), 0);
});
it("should return 1 if missCount is 0 but hitCount > 0", () => {
const stats = CacheStats.of(1, 0, 0, 0, 0, 0);
assert.strictEqual(stats.hitRate(), 1);
});
});
describe("missRate()", () => {
it("should return 0 if requestCount is 0", () => {
const stats = CacheStats.of(0, 0, 0, 0, 0, 0);
assert.strictEqual(stats.missRate(), 0);
});
it("should return 1 if hitCount is 0 but missCount > 0", () => {
const stats = CacheStats.of(0, 1, 0, 0, 0, 0);
assert.strictEqual(stats.missRate(), 1);
});
it("should return 0 if missCount is 0 but hitCount > 0", () => {
const stats = CacheStats.of(1, 0, 0, 0, 0, 0);
assert.strictEqual(stats.missRate(), 0);
});
});
describe("loadFailureRate()", () => {
it("should return 0 if loadCount is 0", () => {
const stats = CacheStats.of(0, 0, 0, 0, 0, 0);
assert.strictEqual(stats.loadFailureRate(), 0);
});
it("should return 0 if loadFailureCount is 0 but loadSuccessCount > 0", () => {
const stats = CacheStats.of(0, 0, 1, 0, 10, 0);
assert.strictEqual(stats.loadFailureRate(), 0);
});
it("should return 1 if loadSuccessCount is 0 but loadFailureCount > 0", () => {
const stats = CacheStats.of(0, 0, 0, 1, 10, 0);
assert.strictEqual(stats.loadFailureRate(), 1);
});
});
describe("averageLoadPenalty()", () => {
it("should return 0 if loadCount is 0, even if totalLoadTime > 0", () => {
const stats = CacheStats.of(0, 0, 0, 0, 100, 0);
assert.strictEqual(stats.averageLoadPenalty(), 0);
});
it("should return 0 if totalLoadTime is 0 and loadCount > 0", () => {
const stats = CacheStats.of(0, 0, 1, 1, 0, 0);
assert.strictEqual(stats.averageLoadPenalty(), 0);
});
});
});
});
it('should reflect comprehensive cache operations in stats via BasicClientSideCache', async function () {
const csc = new BasicClientSideCache({
maxEntries: 2, // Small size to easily trigger evictions
});
testUtils.testWithClient('comprehensive_stats_run', async client => {
// --- Phase 1: Initial misses and loads ---
await client.set('keyA', 'valueA_1');
assert.strictEqual(await client.get('keyA'), 'valueA_1', "Get keyA first time");
assert.strictEqual(csc.stats().missCount, 1);
assert.strictEqual(csc.stats().loadSuccessCount, 1);
await client.set('keyB', 'valueB_1');
assert.strictEqual(await client.get('keyB'), 'valueB_1', "Get keyB first time");
assert.strictEqual(csc.stats().missCount, 2);
assert.strictEqual(csc.stats().loadSuccessCount, 2);
// --- Phase 2: Cache hits ---
assert.strictEqual(await client.get('keyA'), 'valueA_1', "Get keyA second time (hit)");
assert.strictEqual(csc.stats().hitCount, 1);
assert.strictEqual(await client.get('keyB'), 'valueB_1', "Get keyB second time (hit)");
assert.strictEqual(csc.stats().hitCount, 2);
// --- Phase 3: Trigger evictions and more misses/loads ---
await client.set('keyC', 'valueC_1');
assert.strictEqual(await client.get('keyC'), 'valueC_1', "Get keyC first time (evicts keyA)");
assert.strictEqual(csc.stats().missCount, 3);
assert.strictEqual(csc.stats().loadSuccessCount, 3);
assert.strictEqual(csc.stats().evictionCount, 1);
assert.strictEqual(await client.get('keyA'), 'valueA_1', "Get keyA again (miss after eviction)");
assert.strictEqual(csc.stats().missCount, 4);
assert.strictEqual(csc.stats().loadSuccessCount, 4);
assert.strictEqual(csc.stats().evictionCount, 2);
// --- Phase 4: More hits ---
assert.strictEqual(await client.get('keyC'), 'valueC_1', "Get keyC again (hit)");
assert.strictEqual(csc.stats().hitCount, 3);
// --- Phase 5: Update a key (results in invalidation, then miss/load on next GET) ---
// Note: A SET operation on an existing cached key should invalidate it.
// The invalidation itself isn't directly a "hit" or "miss" for stats,
// but the *next* GET will be a miss.
await client.set('keyA', 'valueA_2');
assert.strictEqual(await client.get('keyA'), 'valueA_2', "Get keyA after SET (miss due to invalidation)");
assert.strictEqual(csc.stats().hitCount, 3);
assert.strictEqual(csc.stats().loadSuccessCount, 5);
const stats = csc.stats()
assert.strictEqual(stats.hitCount, 3, "Final hitCount");
assert.strictEqual(stats.missCount, 5, "Final missCount");
assert.strictEqual(stats.loadSuccessCount, 5, "Final loadSuccessCount");
assert.strictEqual(stats.loadFailureCount, 0, "Final loadFailureCount (expected 0 for this test)");
assert.strictEqual(stats.evictionCount, 2, "Final evictionCount");
assert.ok(stats.totalLoadTime >= 0, "Final totalLoadTime should be non-negative");
assert.strictEqual(stats.requestCount(), 8, "Final requestCount (5 misses + 3 hits)");
assert.strictEqual(stats.hitRate(), 3 / 8, "Final hitRate");
assert.strictEqual(stats.missRate(), 5 / 8, "Final missRate");
assert.strictEqual(stats.loadCount(), 5, "Final loadCount (5 success + 0 failure)");
assert.strictEqual(stats.loadFailureRate(), 0, "Final loadFailureRate (0 failures / 5 loads)");
if (stats.loadCount() > 0) {
assert.ok(stats.averageLoadPenalty() >= 0, "Final averageLoadPenalty should be non-negative");
assert.strictEqual(stats.averageLoadPenalty(), stats.totalLoadTime / stats.loadCount(), "Average load penalty calculation");
} else {
assert.strictEqual(stats.averageLoadPenalty(), 0, "Final averageLoadPenalty should be 0 if no loads");
}
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3,
clientSideCache: csc
}
});
});
});

View File

@@ -0,0 +1,870 @@
import { EventEmitter } from 'stream';
import RedisClient from '.';
import { RedisArgument, ReplyUnion, TransformReply, TypeMapping } from '../RESP/types';
import { BasicCommandParser } from './parser';
/**
* A snapshot of cache statistics.
*
* This class provides an immutable view of the cache's operational statistics at a particular
* point in time. It is heavily inspired by the statistics reporting capabilities found in
* Ben Manes's Caffeine cache (https://github.com/ben-manes/caffeine).
*
* Instances of `CacheStats` are typically obtained from a {@link StatsCounter} and can be used
* for performance monitoring, debugging, or logging. It includes metrics such as hit rate,
* miss rate, load success/failure rates, average load penalty, and eviction counts.
*
* All statistics are non-negative. Rates and averages are typically in the range `[0.0, 1.0]`,
* or `0` if an operation has not occurred (e.g. the average load penalty is `0` when nothing has been loaded).
*
* Cache statistics are incremented according to specific rules:
* - When a cache lookup encounters an existing entry, hitCount is incremented.
* - When a cache lookup encounters a missing entry, missCount is incremented.
* - When a new entry is successfully loaded, loadSuccessCount is incremented and the
* loading time is added to totalLoadTime.
* - When an entry fails to load, loadFailureCount is incremented and the
* loading time is added to totalLoadTime.
* - When an entry is evicted due to size constraints or expiration,
* evictionCount is incremented.
*/
export class CacheStats {
/**
* Creates a new CacheStats instance with the specified statistics.
*/
private constructor(
public readonly hitCount: number,
public readonly missCount: number,
public readonly loadSuccessCount: number,
public readonly loadFailureCount: number,
public readonly totalLoadTime: number,
public readonly evictionCount: number
) {
if (
hitCount < 0 ||
missCount < 0 ||
loadSuccessCount < 0 ||
loadFailureCount < 0 ||
totalLoadTime < 0 ||
evictionCount < 0
) {
throw new Error('All statistics values must be non-negative');
}
}
/**
* Creates a new CacheStats instance with the specified statistics.
*
* @param hitCount - Number of cache hits
* @param missCount - Number of cache misses
* @param loadSuccessCount - Number of successful cache loads
* @param loadFailureCount - Number of failed cache loads
* @param totalLoadTime - Total load time in milliseconds
* @param evictionCount - Number of cache evictions
*/
static of(
hitCount = 0,
missCount = 0,
loadSuccessCount = 0,
loadFailureCount = 0,
totalLoadTime = 0,
evictionCount = 0
): CacheStats {
return new CacheStats(
hitCount,
missCount,
loadSuccessCount,
loadFailureCount,
totalLoadTime,
evictionCount
);
}
/**
* Returns a statistics instance where no cache events have been recorded.
*
* @returns An empty statistics instance
*/
static empty(): CacheStats {
return CacheStats.EMPTY_STATS;
}
/**
* An empty stats instance with all counters set to zero.
*/
private static readonly EMPTY_STATS = new CacheStats(0, 0, 0, 0, 0, 0);
/**
* Returns the total number of times cache lookup methods have returned
* either a cached or uncached value.
*
* @returns Total number of requests (hits + misses)
*/
requestCount(): number {
return this.hitCount + this.missCount;
}
/**
* Returns the hit rate of the cache.
* This is defined as hitCount / requestCount, or 1.0 when requestCount is 0.
*
* @returns The ratio of cache requests that were hits (between 0.0 and 1.0)
*/
hitRate(): number {
const requestCount = this.requestCount();
return requestCount === 0 ? 1.0 : this.hitCount / requestCount;
}
/**
* Returns the miss rate of the cache.
* This is defined as missCount / requestCount, or 0.0 when requestCount is 0.
*
* @returns The ratio of cache requests that were misses (between 0.0 and 1.0)
*/
missRate(): number {
const requestCount = this.requestCount();
return requestCount === 0 ? 0.0 : this.missCount / requestCount;
}
/**
* Returns the total number of load operations (successful + failed).
*
* @returns Total number of load operations
*/
loadCount(): number {
return this.loadSuccessCount + this.loadFailureCount;
}
/**
* Returns the ratio of cache loading attempts that failed.
* This is defined as loadFailureCount / loadCount, or 0.0 when loadCount is 0.
*
* @returns Ratio of load operations that failed (between 0.0 and 1.0)
*/
loadFailureRate(): number {
const loadCount = this.loadCount();
return loadCount === 0 ? 0.0 : this.loadFailureCount / loadCount;
}
/**
* Returns the average time spent loading new values, in milliseconds.
* This is defined as totalLoadTime / loadCount, or 0.0 when loadCount is 0.
*
* @returns Average load time in milliseconds
*/
averageLoadPenalty(): number {
const loadCount = this.loadCount();
return loadCount === 0 ? 0.0 : this.totalLoadTime / loadCount;
}
/**
* Returns a new CacheStats representing the difference between this CacheStats
* and another. Negative values are rounded up to zero.
*
* @param other - The statistics to subtract from this instance
* @returns The difference between this instance and other
*/
minus(other: CacheStats): CacheStats {
return CacheStats.of(
Math.max(0, this.hitCount - other.hitCount),
Math.max(0, this.missCount - other.missCount),
Math.max(0, this.loadSuccessCount - other.loadSuccessCount),
Math.max(0, this.loadFailureCount - other.loadFailureCount),
Math.max(0, this.totalLoadTime - other.totalLoadTime),
Math.max(0, this.evictionCount - other.evictionCount)
);
}
/**
* Returns a new CacheStats representing the sum of this CacheStats and another.
*
* @param other - The statistics to add to this instance
* @returns The sum of this instance and other
*/
plus(other: CacheStats): CacheStats {
return CacheStats.of(
this.hitCount + other.hitCount,
this.missCount + other.missCount,
this.loadSuccessCount + other.loadSuccessCount,
this.loadFailureCount + other.loadFailureCount,
this.totalLoadTime + other.totalLoadTime,
this.evictionCount + other.evictionCount
);
}
}
/**
* An accumulator for cache statistics.
*
* This interface defines the contract for objects that record cache-related events
* such as hits, misses, loads (successes and failures), and evictions. The design
* is inspired by the statistics collection mechanisms in Ben Manes's Caffeine cache
* (https://github.com/ben-manes/caffeine).
*
* Implementations of this interface are responsible for aggregating these events.
* A snapshot of the current statistics can be obtained by calling the `snapshot()`
* method, which returns an immutable {@link CacheStats} object.
*
* Common implementations include `DefaultStatsCounter` for active statistics collection
* and `DisabledStatsCounter` for a no-op version when stats are not needed.
*/
export interface StatsCounter {
/**
* Records cache hits. This should be called when a cache request returns a cached value.
*
* @param count - The number of hits to record
*/
recordHits(count: number): void;
/**
* Records cache misses. This should be called when a cache request returns a value that was not
* found in the cache.
*
* @param count - The number of misses to record
*/
recordMisses(count: number): void;
/**
* Records the successful load of a new entry. This method should be called when a cache request
* causes an entry to be loaded and the loading completes successfully.
*
* @param loadTime - The number of milliseconds the cache spent computing or retrieving the new value
*/
recordLoadSuccess(loadTime: number): void;
/**
* Records the failed load of a new entry. This method should be called when a cache request
* causes an entry to be loaded, but an exception is thrown while loading the entry.
*
* @param loadTime - The number of milliseconds the cache spent computing or retrieving the new value
* prior to the failure
*/
recordLoadFailure(loadTime: number): void;
/**
* Records the eviction of an entry from the cache. This should only be called when an entry is
* evicted due to the cache's eviction strategy, and not as a result of manual invalidations.
*
* @param count - The number of evictions to record
*/
recordEvictions(count: number): void;
/**
* Returns a snapshot of this counter's values. Note that this may be an inconsistent view, as it
* may be interleaved with update operations.
*
* @return A snapshot of this counter's values
*/
snapshot(): CacheStats;
}
/**
* A StatsCounter implementation that does nothing and always returns empty stats.
*/
class DisabledStatsCounter implements StatsCounter {
static readonly INSTANCE = new DisabledStatsCounter();
private constructor() { }
recordHits(count: number): void { }
recordMisses(count: number): void { }
recordLoadSuccess(loadTime: number): void { }
recordLoadFailure(loadTime: number): void { }
recordEvictions(count: number): void { }
snapshot(): CacheStats { return CacheStats.empty(); }
}
/**
* Returns a StatsCounter that does not record any cache events.
*
* @return A StatsCounter that does not record metrics
*/
function disabledStatsCounter(): StatsCounter {
return DisabledStatsCounter.INSTANCE;
}
/**
* A StatsCounter implementation that maintains cache statistics.
*/
class DefaultStatsCounter implements StatsCounter {
#hitCount = 0;
#missCount = 0;
#loadSuccessCount = 0;
#loadFailureCount = 0;
#totalLoadTime = 0;
#evictionCount = 0;
/**
* Records cache hits.
*
* @param count - The number of hits to record
*/
recordHits(count: number): void {
this.#hitCount += count;
}
/**
* Records cache misses.
*
* @param count - The number of misses to record
*/
recordMisses(count: number): void {
this.#missCount += count;
}
/**
* Records the successful load of a new entry.
*
* @param loadTime - The number of milliseconds spent loading the entry
*/
recordLoadSuccess(loadTime: number): void {
this.#loadSuccessCount++;
this.#totalLoadTime += loadTime;
}
/**
* Records the failed load of a new entry.
*
* @param loadTime - The number of milliseconds spent attempting to load the entry
*/
recordLoadFailure(loadTime: number): void {
this.#loadFailureCount++;
this.#totalLoadTime += loadTime;
}
/**
* Records cache evictions.
*
* @param count - The number of evictions to record
*/
recordEvictions(count: number): void {
this.#evictionCount += count;
}
/**
* Returns a snapshot of the current statistics.
*
* @returns A snapshot of the current statistics
*/
snapshot(): CacheStats {
return CacheStats.of(
this.#hitCount,
this.#missCount,
this.#loadSuccessCount,
this.#loadFailureCount,
this.#totalLoadTime,
this.#evictionCount
);
}
/**
* Creates a new DefaultStatsCounter.
*
* @returns A new DefaultStatsCounter instance
*/
static create(): DefaultStatsCounter {
return new DefaultStatsCounter();
}
}
type CachingClient = RedisClient<any, any, any, any, any>;
type CmdFunc = () => Promise<ReplyUnion>;
type EvictionPolicy = "LRU" | "FIFO"
/**
* Configuration options for Client Side Cache
*/
export interface ClientSideCacheConfig {
/**
* Time-to-live in milliseconds for cached entries.
* Use 0 for no expiration.
* @default 0
*/
ttl?: number;
/**
* Maximum number of entries to store in the cache.
* Use 0 for unlimited entries.
* @default 0
*/
maxEntries?: number;
/**
* Eviction policy to use when the cache reaches its capacity.
* - "LRU" (Least Recently Used): Evicts least recently accessed entries first
* - "FIFO" (First In First Out): Evicts oldest entries first
* @default "LRU"
*/
evictPolicy?: EvictionPolicy;
/**
* Whether to collect statistics about cache operations.
* @default true
*/
recordStats?: boolean;
}
interface CacheCreator {
epoch: number;
client: CachingClient;
}
interface ClientSideCacheEntry {
invalidate(): void;
validate(): boolean;
}
/**
* Generates a unique cache key from Redis command arguments
*
* @param redisArgs - Array of Redis command arguments
* @returns A unique string key for caching
*/
function generateCacheKey(redisArgs: ReadonlyArray<RedisArgument>): string {
const tmp = new Array(redisArgs.length * 2);
for (let i = 0; i < redisArgs.length; i++) {
tmp[i] = redisArgs[i].length;
tmp[i + redisArgs.length] = redisArgs[i];
}
return tmp.join('_');
}
abstract class ClientSideCacheEntryBase implements ClientSideCacheEntry {
#invalidated = false;
readonly #expireTime: number;
constructor(ttl: number) {
if (ttl == 0) {
this.#expireTime = 0;
} else {
this.#expireTime = Date.now() + ttl;
}
}
invalidate(): void {
this.#invalidated = true;
}
validate(): boolean {
return !this.#invalidated && (this.#expireTime == 0 || (Date.now() < this.#expireTime))
}
}
class ClientSideCacheEntryValue extends ClientSideCacheEntryBase {
readonly #value: any;
get value() {
return this.#value;
}
constructor(ttl: number, value: any) {
super(ttl);
this.#value = value;
}
}
class ClientSideCacheEntryPromise extends ClientSideCacheEntryBase {
readonly #sendCommandPromise: Promise<ReplyUnion>;
get promise() {
return this.#sendCommandPromise;
}
constructor(ttl: number, sendCommandPromise: Promise<ReplyUnion>) {
super(ttl);
this.#sendCommandPromise = sendCommandPromise;
}
}
export abstract class ClientSideCacheProvider extends EventEmitter {
abstract handleCache(client: CachingClient, parser: BasicCommandParser, fn: CmdFunc, transformReply: TransformReply | undefined, typeMapping: TypeMapping | undefined): Promise<any>;
abstract trackingOn(): Array<RedisArgument>;
abstract invalidate(key: RedisArgument | null): void;
abstract clear(): void;
abstract stats(): CacheStats;
abstract onError(): void;
abstract onClose(): void;
}
export class BasicClientSideCache extends ClientSideCacheProvider {
#cacheKeyToEntryMap: Map<string, ClientSideCacheEntry>;
#keyToCacheKeySetMap: Map<string, Set<string>>;
readonly ttl: number;
readonly maxEntries: number;
readonly lru: boolean;
#statsCounter: StatsCounter;
recordEvictions(count: number): void {
this.#statsCounter.recordEvictions(count);
}
recordHits(count: number): void {
this.#statsCounter.recordHits(count);
}
recordMisses(count: number): void {
this.#statsCounter.recordMisses(count);
}
constructor(config?: ClientSideCacheConfig) {
super();
this.#cacheKeyToEntryMap = new Map<string, ClientSideCacheEntry>();
this.#keyToCacheKeySetMap = new Map<string, Set<string>>();
this.ttl = config?.ttl ?? 0;
this.maxEntries = config?.maxEntries ?? 0;
this.lru = config?.evictPolicy !== "FIFO";
const recordStats = config?.recordStats !== false;
this.#statsCounter = recordStats ? DefaultStatsCounter.create() : disabledStatsCounter();
}
/* logic of how caching works:
1. commands use a CommandParser
it enables us to define/retrieve
cacheKey - a unique key that corresponds to this command and its arguments
redisKeys - an array of redis keys (as strings); if any of these keys is modified, redis will invalidate this cached result
2. check if cacheKey is in our cache
2b1. if it's a value cacheEntry - return it
2b2. if it's a promise cacheEntry - wait on the promise and then continue at 3b
3. if the cacheEntry is not in the cache
3a. send the command, save the promise into a cacheEntry, and then wait on the result
3b. transform the reply (if required) based on transformReply
3c. check that the cacheEntry is still valid (in the cache and hasn't been deleted)
3d. if valid - overwrite with a value entry
4. return the previously non-cached result
*/
override async handleCache(
client: CachingClient,
parser: BasicCommandParser,
fn: CmdFunc,
transformReply?: TransformReply,
typeMapping?: TypeMapping
) {
let reply: ReplyUnion;
const cacheKey = generateCacheKey(parser.redisArgs);
// "2"
let cacheEntry = this.get(cacheKey);
if (cacheEntry) {
// If instanceof is "too slow", can add a "type" and then use an "as" cast to call proper getters.
if (cacheEntry instanceof ClientSideCacheEntryValue) { // "2b1"
this.#statsCounter.recordHits(1);
return structuredClone(cacheEntry.value);
} else if (cacheEntry instanceof ClientSideCacheEntryPromise) { // 2b2
// This counts as a miss since the value hasn't been fully loaded yet.
this.#statsCounter.recordMisses(1);
reply = await cacheEntry.promise;
} else {
throw new Error("unknown cache entry type");
}
} else { // 3/3a
this.#statsCounter.recordMisses(1);
const startTime = performance.now();
const promise = fn();
cacheEntry = this.createPromiseEntry(client, promise);
this.set(cacheKey, cacheEntry, parser.keys);
try {
reply = await promise;
const loadTime = performance.now() - startTime;
this.#statsCounter.recordLoadSuccess(loadTime);
} catch (err) {
const loadTime = performance.now() - startTime;
this.#statsCounter.recordLoadFailure(loadTime);
if (cacheEntry.validate()) {
this.delete(cacheKey!);
}
throw err;
}
}
// 3b
let val;
if (transformReply) {
val = transformReply(reply, parser.preserve, typeMapping);
} else {
val = reply;
}
// 3c
if (cacheEntry.validate()) { // revalidating promise entry (don't save the value if the promise entry has been invalidated)
// 3d
cacheEntry = this.createValueEntry(client, val);
this.set(cacheKey, cacheEntry, parser.keys);
this.emit("cached-key", cacheKey);
} else {
// cache entry for key got invalidated between execution and saving, so not saving
}
return structuredClone(val);
}
override trackingOn() {
return ['CLIENT', 'TRACKING', 'ON'];
}
override invalidate(key: RedisArgument | null) {
if (key === null) {
this.clear(false);
this.emit("invalidate", key);
return;
}
const keySet = this.#keyToCacheKeySetMap.get(key.toString());
if (keySet) {
for (const cacheKey of keySet) {
const entry = this.#cacheKeyToEntryMap.get(cacheKey);
if (entry) {
entry.invalidate();
}
this.#cacheKeyToEntryMap.delete(cacheKey);
}
this.#keyToCacheKeySetMap.delete(key.toString());
}
this.emit('invalidate', key);
}
override clear(resetStats = true) {
const oldSize = this.#cacheKeyToEntryMap.size;
this.#cacheKeyToEntryMap.clear();
this.#keyToCacheKeySetMap.clear();
if (resetStats) {
if (!(this.#statsCounter instanceof DisabledStatsCounter)) {
this.#statsCounter = DefaultStatsCounter.create();
}
} else {
// If old entries were evicted due to clear, record them as evictions
if (oldSize > 0) {
this.#statsCounter.recordEvictions(oldSize);
}
}
}
get(cacheKey: string) {
const val = this.#cacheKeyToEntryMap.get(cacheKey);
if (val && !val.validate()) {
this.delete(cacheKey);
this.#statsCounter.recordEvictions(1);
this.emit("cache-evict", cacheKey);
return undefined;
}
if (val !== undefined && this.lru) {
this.#cacheKeyToEntryMap.delete(cacheKey);
this.#cacheKeyToEntryMap.set(cacheKey, val);
}
return val;
}
delete(cacheKey: string) {
const entry = this.#cacheKeyToEntryMap.get(cacheKey);
if (entry) {
entry.invalidate();
this.#cacheKeyToEntryMap.delete(cacheKey);
}
}
has(cacheKey: string) {
return this.#cacheKeyToEntryMap.has(cacheKey);
}
set(cacheKey: string, cacheEntry: ClientSideCacheEntry, keys: Array<RedisArgument>) {
let count = this.#cacheKeyToEntryMap.size;
const oldEntry = this.#cacheKeyToEntryMap.get(cacheKey);
if (oldEntry) {
count--; // overwriting, so not incrementing
oldEntry.invalidate();
}
if (this.maxEntries > 0 && count >= this.maxEntries) {
this.deleteOldest();
this.#statsCounter.recordEvictions(1);
}
this.#cacheKeyToEntryMap.set(cacheKey, cacheEntry);
for (const key of keys) {
if (!this.#keyToCacheKeySetMap.has(key.toString())) {
this.#keyToCacheKeySetMap.set(key.toString(), new Set<string>());
}
const cacheKeySet = this.#keyToCacheKeySetMap.get(key.toString());
cacheKeySet!.add(cacheKey);
}
}
size() {
return this.#cacheKeyToEntryMap.size;
}
createValueEntry(client: CachingClient, value: any): ClientSideCacheEntryValue {
return new ClientSideCacheEntryValue(this.ttl, value);
}
createPromiseEntry(client: CachingClient, sendCommandPromise: Promise<ReplyUnion>): ClientSideCacheEntryPromise {
return new ClientSideCacheEntryPromise(this.ttl, sendCommandPromise);
}
override stats(): CacheStats {
return this.#statsCounter.snapshot();
}
override onError(): void {
this.clear();
}
override onClose() {
this.clear();
}
/**
* @internal
*/
deleteOldest() {
const it = this.#cacheKeyToEntryMap[Symbol.iterator]();
const n = it.next();
if (!n.done) {
const key = n.value[0];
const entry = this.#cacheKeyToEntryMap.get(key);
if (entry) {
entry.invalidate();
}
this.#cacheKeyToEntryMap.delete(key);
}
}
/**
* Get cache entries for debugging
* @internal
*/
entryEntries(): IterableIterator<[string, ClientSideCacheEntry]> {
return this.#cacheKeyToEntryMap.entries();
}
/**
* Get key set entries for debugging
* @internal
*/
keySetEntries(): IterableIterator<[string, Set<string>]> {
return this.#keyToCacheKeySetMap.entries();
}
}
export abstract class PooledClientSideCacheProvider extends BasicClientSideCache {
#disabled = false;
disable(): void {
this.#disabled = true;
}
enable(): void {
this.#disabled = false;
}
override get(cacheKey: string): ClientSideCacheEntry | undefined {
if (this.#disabled) {
return undefined;
}
return super.get(cacheKey);
}
override has(cacheKey: string): boolean {
if (this.#disabled) {
return false;
}
return super.has(cacheKey);
}
onPoolClose(): void {
this.clear();
}
}
export class BasicPooledClientSideCache extends PooledClientSideCacheProvider {
override onError() {
this.clear(false);
}
override onClose() {
this.clear(false);
}
}
class PooledClientSideCacheEntryValue extends ClientSideCacheEntryValue {
#creator: CacheCreator;
constructor(ttl: number, creator: CacheCreator, value: any) {
super(ttl, value);
this.#creator = creator;
}
override validate(): boolean {
let ret = super.validate();
if (this.#creator) {
ret = ret && this.#creator.client.isReady && this.#creator.client.socketEpoch == this.#creator.epoch
}
return ret;
}
}
class PooledClientSideCacheEntryPromise extends ClientSideCacheEntryPromise {
#creator: CacheCreator;
constructor(ttl: number, creator: CacheCreator, sendCommandPromise: Promise<ReplyUnion>) {
super(ttl, sendCommandPromise);
this.#creator = creator;
}
override validate(): boolean {
let ret = super.validate();
return ret && this.#creator.client.isReady && this.#creator.client.socketEpoch == this.#creator.epoch
}
}
export class PooledNoRedirectClientSideCache extends BasicPooledClientSideCache {
override createValueEntry(client: CachingClient, value: any): ClientSideCacheEntryValue {
const creator = {
epoch: client.socketEpoch,
client: client
};
return new PooledClientSideCacheEntryValue(this.ttl, creator, value);
}
override createPromiseEntry(client: CachingClient, sendCommandPromise: Promise<ReplyUnion>): ClientSideCacheEntryPromise {
const creator = {
epoch: client.socketEpoch,
client: client
};
return new PooledClientSideCacheEntryPromise(this.ttl, creator, sendCommandPromise);
}
override onError() { }
override onClose() { }
}
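Not part of the file above, but worth noting: the cache providers extend `EventEmitter` and emit `"cached-key"` and `"invalidate"` events (see the `emit(...)` calls in `handleCache()` and `invalidate()`). A hedged sketch of observing them on a controllable cache:

```typescript
import { createClient, BasicClientSideCache } from 'redis';

const cache = new BasicClientSideCache({ maxEntries: 1000 });

// Emitted after a reply is stored under its cache key (see handleCache above).
cache.on('cached-key', (cacheKey: string) => {
  console.log('cached', cacheKey);
});

// Emitted when the server invalidates a key, or with null when the whole
// cache is flushed (see invalidate(null) above).
cache.on('invalidate', (key: string | Buffer | null) => {
  console.log('invalidated', key === null ? '<all>' : key.toString());
});

const client = createClient({ RESP: 3, clientSideCache: cache });
```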

View File

@@ -56,6 +56,8 @@ export default class RedisCommandsQueue {
return this.#pubSub.isActive;
}
#invalidateCallback?: (key: RedisArgument | null) => unknown;
constructor(
respVersion: RespVersions,
maxLength: number | null | undefined,
@@ -107,15 +109,34 @@ export default class RedisCommandsQueue {
return new Decoder({
onReply: reply => this.#onReply(reply),
onErrorReply: err => this.#onErrorReply(err),
//TODO: we can shave off a few cycles by not adding onPush handler at all if CSC is not used
onPush: push => {
if (!this.#onPush(push)) {
// currently only supporting "invalidate" over RESP3 push messages
switch (push[0].toString()) {
case "invalidate": {
if (this.#invalidateCallback) {
if (push[1] !== null) {
for (const key of push[1]) {
this.#invalidateCallback(key);
}
} else {
this.#invalidateCallback(null);
}
}
break;
}
}
}
},
getTypeMapping: () => this.#getTypeMapping()
});
}
setInvalidateCallback(callback?: (key: RedisArgument | null) => unknown) {
this.#invalidateCallback = callback;
}
addCommand<T>(
args: ReadonlyArray<RedisArgument>,
options?: CommandOptions
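To restate the push handling added above (as an illustrative standalone helper, not the actual module): a RESP3 `invalidate` push carries either an array of touched keys or `null`, and the callback is invoked once per key, or once with `null` for a full flush.

```typescript
// Illustrative paraphrase of the onPush branch above.
type InvalidateCallback = (key: string | Buffer | null) => unknown;

function dispatchInvalidate(push: Array<any>, onInvalidate?: InvalidateCallback): void {
  if (!onInvalidate || push[0].toString() !== 'invalidate') return;

  if (push[1] !== null) {
    for (const key of push[1]) {
      onInvalidate(key); // one callback per invalidated key
    }
  } else {
    onInvalidate(null); // e.g. after FLUSHDB: invalidate everything
  }
}
```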

View File

@@ -24,6 +24,40 @@ export const SQUARE_SCRIPT = defineScript({
});
describe('Client', () => {
describe('initialization', () => {
describe('clientSideCache validation', () => {
const clientSideCacheConfig = { ttl: 0, maxEntries: 0 };
it('should throw error when clientSideCache is enabled with RESP 2', () => {
assert.throws(
() => new RedisClient({
clientSideCache: clientSideCacheConfig,
RESP: 2,
}),
new Error('Client Side Caching is only supported with RESP3')
);
});
it('should throw error when clientSideCache is enabled with RESP undefined', () => {
assert.throws(
() => new RedisClient({
clientSideCache: clientSideCacheConfig,
}),
new Error('Client Side Caching is only supported with RESP3')
);
});
it('should not throw when clientSideCache is enabled with RESP 3', () => {
assert.doesNotThrow(() =>
new RedisClient({
clientSideCache: clientSideCacheConfig,
RESP: 3,
})
);
});
});
});
describe('parseURL', () => {
it('redis://user:secret@localhost:6379/0', async () => {
const result = RedisClient.parseURL('redis://user:secret@localhost:6379/0');
@@ -51,8 +85,8 @@ describe('Client', () => {
// Compare non-function properties
assert.deepEqual(resultRest, expectedRest);
if (result?.credentialsProvider?.type === 'async-credentials-provider'
&& expected?.credentialsProvider?.type === 'async-credentials-provider') {
// Compare the actual output of the credentials functions
const resultCreds = await result.credentialsProvider.credentials();
@@ -91,10 +125,10 @@ describe('Client', () => {
// Compare non-function properties
assert.deepEqual(resultRest, expectedRest);
assert.equal(resultCredProvider?.type, expectedCredProvider?.type);
if (result?.credentialsProvider?.type === 'async-credentials-provider' &&
expected?.credentialsProvider?.type === 'async-credentials-provider') {
// Compare the actual output of the credentials functions
const resultCreds = await result.credentialsProvider.credentials();

View File

@@ -16,6 +16,7 @@ import { ScanOptions, ScanCommonOptions } from '../commands/SCAN';
import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode';
import { RedisPoolOptions, RedisClientPool } from './pool';
import { RedisVariadicArgument, parseArgs, pushVariadicArguments } from '../commands/generic-transformers';
import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider } from './cache';
import { BasicCommandParser, CommandParser } from './parser';
import SingleEntryCache from '../single-entry-cache';
@@ -78,9 +79,62 @@ export interface RedisClientOptions<
*/
pingInterval?: number;
/**
* Default command options to be applied to all commands executed through this client.
*
* These options can be overridden on a per-command basis when calling specific commands.
*
* @property {symbol} [chainId] - Identifier for chaining commands together
* @property {boolean} [asap] - When true, the command is executed as soon as possible
* @property {AbortSignal} [abortSignal] - AbortSignal to cancel the command
* @property {TypeMapping} [typeMapping] - Custom type mappings between RESP and JavaScript types
*
* @example Setting default command options
* ```
* const client = createClient({
* commandOptions: {
* asap: true,
* typeMapping: {
* // Custom type mapping configuration
* }
* }
* });
* ```
*/
commandOptions?: CommandOptions<TYPE_MAPPING>;
/**
* Client Side Caching configuration.
*
* Enables Redis Servers and Clients to work together to cache results from commands
* sent to a server. The server will notify the client when cached results are no longer valid.
*
* Note: Client Side Caching is only supported with RESP3.
*
* @example Anonymous cache configuration
* ```
* const client = createClient({
* RESP: 3,
* clientSideCache: {
* ttl: 0,
* maxEntries: 0,
* evictPolicy: "LRU"
* }
* });
* ```
*
* @example Using a controllable cache
* ```
* const cache = new BasicClientSideCache({
* ttl: 0,
* maxEntries: 0,
* evictPolicy: "LRU"
* });
* const client = createClient({
* RESP: 3,
* clientSideCache: cache
* });
* ```
*/
clientSideCache?: ClientSideCacheProvider | ClientSideCacheConfig;
} }
type WithCommands< type WithCommands<
@@ -313,10 +367,13 @@ export default class RedisClient<
// was in a watch transaction when // was in a watch transaction when
// a topology change occurred // a topology change occurred
#dirtyWatch?: string; #dirtyWatch?: string;
#epoch: number;
#watchEpoch?: number; #watchEpoch?: number;
#clientSideCache?: ClientSideCacheProvider;
#credentialsSubscription: Disposable | null = null; #credentialsSubscription: Disposable | null = null;
get clientSideCache() {
return this._self.#clientSideCache;
}
get options(): RedisClientOptions<M, F, S, RESP> | undefined { get options(): RedisClientOptions<M, F, S, RESP> | undefined {
return this._self.#options; return this._self.#options;
@@ -334,6 +391,10 @@ export default class RedisClient<
return this._self.#queue.isPubSubActive; return this._self.#queue.isPubSubActive;
} }
get socketEpoch() {
return this._self.#socket.socketEpoch;
}
get isWatching() { get isWatching() {
return this._self.#watchEpoch !== undefined; return this._self.#watchEpoch !== undefined;
} }
@@ -358,12 +419,28 @@ export default class RedisClient<
constructor(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>) { constructor(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>) {
super(); super();
this.#validateOptions(options)
this.#options = this.#initiateOptions(options); this.#options = this.#initiateOptions(options);
this.#queue = this.#initiateQueue(); this.#queue = this.#initiateQueue();
this.#socket = this.#initiateSocket(); this.#socket = this.#initiateSocket();
this.#epoch = 0;
if (options?.clientSideCache) {
if (options.clientSideCache instanceof ClientSideCacheProvider) {
this.#clientSideCache = options.clientSideCache;
} else {
const cscConfig = options.clientSideCache;
this.#clientSideCache = new BasicClientSideCache(cscConfig);
}
this.#queue.setInvalidateCallback(this.#clientSideCache.invalidate.bind(this.#clientSideCache));
}
} }
#validateOptions(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>) {
if (options?.clientSideCache && options?.RESP !== 3) {
throw new Error('Client Side Caching is only supported with RESP3');
}
}
#initiateOptions(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> | undefined { #initiateOptions(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>): RedisClientOptions<M, F, S, RESP, TYPE_MAPPING> | undefined {
// Convert username/password to credentialsProvider if no credentialsProvider is already in place // Convert username/password to credentialsProvider if no credentialsProvider is already in place
@@ -522,6 +599,10 @@ export default class RedisClient<
); );
} }
if (this.#clientSideCache) {
commands.push(this.#clientSideCache.trackingOn());
}
return commands; return commands;
} }
@@ -575,6 +656,7 @@ export default class RedisClient<
}) })
.on('error', err => { .on('error', err => {
this.emit('error', err); this.emit('error', err);
this.#clientSideCache?.onError();
if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) { if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) {
this.#queue.flushWaitingForReply(err); this.#queue.flushWaitingForReply(err);
} else { } else {
@@ -583,7 +665,6 @@ export default class RedisClient<
}) })
.on('connect', () => this.emit('connect')) .on('connect', () => this.emit('connect'))
.on('ready', () => { .on('ready', () => {
this.#epoch++;
this.emit('ready'); this.emit('ready');
this.#setPingTimer(); this.#setPingTimer();
this.#maybeScheduleWrite(); this.#maybeScheduleWrite();
@@ -711,15 +792,22 @@ export default class RedisClient<
commandOptions: CommandOptions<TYPE_MAPPING> | undefined, commandOptions: CommandOptions<TYPE_MAPPING> | undefined,
transformReply: TransformReply | undefined, transformReply: TransformReply | undefined,
) { ) {
const reply = await this.sendCommand(parser.redisArgs, commandOptions); const csc = this._self.#clientSideCache;
const defaultTypeMapping = this._self.#options?.commandOptions === commandOptions;
const fn = () => { return this.sendCommand(parser.redisArgs, commandOptions) };
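// Only route through the client side cache for commands marked CACHEABLE,
// and only when the client's default command options (and therefore the
// default type mapping) are in effect.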
if (csc && command.CACHEABLE && defaultTypeMapping) {
return await csc.handleCache(this._self, parser as BasicCommandParser, fn, transformReply, commandOptions?.typeMapping);
} else {
const reply = await fn();
if (transformReply) { if (transformReply) {
const res = transformReply(reply, parser.preserve, commandOptions?.typeMapping); return transformReply(reply, parser.preserve, commandOptions?.typeMapping);
return res
} }
return reply; return reply;
} }
}
/** /**
* @internal * @internal
@@ -883,7 +971,7 @@ export default class RedisClient<
const reply = await this._self.sendCommand( const reply = await this._self.sendCommand(
pushVariadicArguments(['WATCH'], key) pushVariadicArguments(['WATCH'], key)
); );
this._self.#watchEpoch ??= this._self.#epoch; this._self.#watchEpoch ??= this._self.socketEpoch;
return reply as unknown as ReplyWithTypeMapping<SimpleStringReply<'OK'>, TYPE_MAPPING>; return reply as unknown as ReplyWithTypeMapping<SimpleStringReply<'OK'>, TYPE_MAPPING>;
} }
@@ -986,7 +1074,7 @@ export default class RedisClient<
throw new WatchError(dirtyWatch); throw new WatchError(dirtyWatch);
} }
if (watchEpoch && watchEpoch !== this._self.#epoch) { if (watchEpoch && watchEpoch !== this._self.socketEpoch) {
throw new WatchError('Client reconnected after WATCH'); throw new WatchError('Client reconnected after WATCH');
} }
@@ -1210,6 +1298,7 @@ export default class RedisClient<
return new Promise<void>(resolve => { return new Promise<void>(resolve => {
clearTimeout(this._self.#pingTimer); clearTimeout(this._self.#pingTimer);
this._self.#socket.close(); this._self.#socket.close();
this._self.#clientSideCache?.onClose();
if (this._self.#queue.isEmpty()) { if (this._self.#queue.isEmpty()) {
this._self.#socket.destroySocket(); this._self.#socket.destroySocket();
@@ -1236,6 +1325,7 @@ export default class RedisClient<
clearTimeout(this._self.#pingTimer); clearTimeout(this._self.#pingTimer);
this._self.#queue.flushAll(new DisconnectsClientError()); this._self.#queue.flushAll(new DisconnectsClientError());
this._self.#socket.destroy(); this._self.#socket.destroy();
this._self.#clientSideCache?.onClose();
this._self.#credentialsSubscription?.dispose(); this._self.#credentialsSubscription?.dispose();
this._self.#credentialsSubscription = null; this._self.#credentialsSubscription = null;
} }

View File

@@ -114,6 +114,7 @@ export class DoublyLinkedList<T> {
export interface SinglyLinkedNode<T> { export interface SinglyLinkedNode<T> {
value: T; value: T;
next: SinglyLinkedNode<T> | undefined; next: SinglyLinkedNode<T> | undefined;
removed: boolean;
} }
export class SinglyLinkedList<T> { export class SinglyLinkedList<T> {
@@ -140,7 +141,8 @@ export class SinglyLinkedList<T> {
const node = { const node = {
value, value,
next: undefined next: undefined,
removed: false
}; };
if (this.#head === undefined) { if (this.#head === undefined) {
@@ -151,6 +153,9 @@ export class SinglyLinkedList<T> {
} }
remove(node: SinglyLinkedNode<T>, parent: SinglyLinkedNode<T> | undefined) { remove(node: SinglyLinkedNode<T>, parent: SinglyLinkedNode<T> | undefined) {
if (node.removed) {
throw new Error("node already removed");
}
--this.#length; --this.#length;
if (this.#head === node) { if (this.#head === node) {
@@ -165,6 +170,8 @@ export class SinglyLinkedList<T> {
} else { } else {
parent!.next = node.next; parent!.next = node.next;
} }
node.removed = true;
} }
shift() { shift() {
@@ -177,6 +184,7 @@ export class SinglyLinkedList<T> {
this.#head = node.next; this.#head = node.next;
} }
node.removed = true;
return node.value; return node.value;
} }

View File

@@ -33,6 +33,17 @@ export class BasicCommandParser implements CommandParser {
return this.#keys[0]; return this.#keys[0];
} }
get cacheKey() {
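// Cache key layout: every argument's length first, then the arguments
// themselves, all joined by '_'.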
const tmp = new Array(this.#redisArgs.length*2);
for (let i = 0; i < this.#redisArgs.length; i++) {
tmp[i] = this.#redisArgs[i].length;
tmp[i+this.#redisArgs.length] = this.#redisArgs[i];
}
return tmp.join('_');
}
push(...arg: Array<RedisArgument>) { push(...arg: Array<RedisArgument>) {
this.#redisArgs.push(...arg); this.#redisArgs.push(...arg);
}; };

View File

@@ -7,6 +7,7 @@ import { TimeoutError } from '../errors';
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import { CommandOptions } from './commands-queue'; import { CommandOptions } from './commands-queue';
import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command'; import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command';
import { BasicPooledClientSideCache, ClientSideCacheConfig, PooledClientSideCacheProvider } from './cache';
import { BasicCommandParser } from './parser'; import { BasicCommandParser } from './parser';
import SingleEntryCache from '../single-entry-cache'; import SingleEntryCache from '../single-entry-cache';
@@ -24,11 +25,55 @@ export interface RedisPoolOptions {
*/ */
acquireTimeout: number; acquireTimeout: number;
/** /**
* TODO * The delay in milliseconds before a cleanup operation is performed on idle clients.
*
* After this delay, the pool will check if there are too many idle clients and destroy
* excess ones to maintain optimal pool size.
*/ */
cleanupDelay: number; cleanupDelay: number;
/** /**
* TODO * Client Side Caching configuration for the pool.
*
* Enables Redis Servers and Clients to work together to cache results from commands
* sent to a server. The server will notify the client when cached results are no longer valid.
* In pooled mode, the cache is shared across all clients in the pool.
*
* Note: Client Side Caching is only supported with RESP3.
*
* @example Anonymous cache configuration
* ```
* const client = createClientPool({RESP: 3}, {
* clientSideCache: {
* ttl: 0,
* maxEntries: 0,
* evictPolicy: "LRU"
* },
* minimum: 5
* });
* ```
*
* @example Using a controllable cache
* ```
* const cache = new BasicPooledClientSideCache({
* ttl: 0,
* maxEntries: 0,
* evictPolicy: "LRU"
* });
* const client = createClientPool({RESP: 3}, {
* clientSideCache: cache,
* minimum: 5
* });
* ```
*/
clientSideCache?: PooledClientSideCacheProvider | ClientSideCacheConfig;
/**
* Enable experimental support for RESP3 module commands.
*
* When enabled, allows the use of module commands that have been adapted
* for the RESP3 protocol. This is an unstable feature and may change in
* future versions.
*
* @default false
*/ */
unstableResp3Modules?: boolean; unstableResp3Modules?: boolean;
} }
@@ -120,7 +165,7 @@ export class RedisClientPool<
RESP extends RespVersions, RESP extends RespVersions,
TYPE_MAPPING extends TypeMapping = {} TYPE_MAPPING extends TypeMapping = {}
>( >(
clientOptions?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>, clientOptions?: Omit<RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>, "clientSideCache">,
options?: Partial<RedisPoolOptions> options?: Partial<RedisPoolOptions>
) { ) {
@@ -142,7 +187,7 @@ export class RedisClientPool<
// returning a "proxy" to prevent the namespaces._self to leak between "proxies" // returning a "proxy" to prevent the namespaces._self to leak between "proxies"
return Object.create( return Object.create(
new Pool( new Pool(
RedisClient.factory(clientOptions).bind(undefined, clientOptions), clientOptions,
options options
) )
) as RedisClientPoolType<M, F, S, RESP, TYPE_MAPPING>; ) as RedisClientPoolType<M, F, S, RESP, TYPE_MAPPING>;
@@ -216,22 +261,41 @@ export class RedisClientPool<
return this._self.#isClosing; return this._self.#isClosing;
} }
#clientSideCache?: PooledClientSideCacheProvider;
get clientSideCache() {
return this._self.#clientSideCache;
}
/** /**
* You are probably looking for {@link RedisClient.createPool `RedisClient.createPool`}, * You are probably looking for {@link RedisClient.createPool `RedisClient.createPool`},
* {@link RedisClientPool.fromClient `RedisClientPool.fromClient`}, * {@link RedisClientPool.fromClient `RedisClientPool.fromClient`},
* or {@link RedisClientPool.fromOptions `RedisClientPool.fromOptions`}... * or {@link RedisClientPool.fromOptions `RedisClientPool.fromOptions`}...
*/ */
constructor( constructor(
clientFactory: () => RedisClientType<M, F, S, RESP, TYPE_MAPPING>, clientOptions?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>,
options?: Partial<RedisPoolOptions> options?: Partial<RedisPoolOptions>
) { ) {
super(); super();
this.#clientFactory = clientFactory;
this.#options = { this.#options = {
...RedisClientPool.#DEFAULTS, ...RedisClientPool.#DEFAULTS,
...options ...options
}; };
if (options?.clientSideCache) {
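// A single shared cache instance is kept on the pool and injected into the
// client options, so every client created by the pool uses the same cache.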
if (clientOptions === undefined) {
clientOptions = {};
}
if (options.clientSideCache instanceof PooledClientSideCacheProvider) {
this.#clientSideCache = clientOptions.clientSideCache = options.clientSideCache;
} else {
const cscConfig = options.clientSideCache;
this.#clientSideCache = clientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig);
// this.#clientSideCache = clientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig);
}
}
this.#clientFactory = RedisClient.factory(clientOptions).bind(undefined, clientOptions) as () => RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
} }
private _self = this; private _self = this;
@@ -295,7 +359,6 @@ export class RedisClientPool<
async connect() { async connect() {
if (this._self.#isOpen) return; // TODO: throw error? if (this._self.#isOpen) return; // TODO: throw error?
this._self.#isOpen = true; this._self.#isOpen = true;
const promises = []; const promises = [];
@@ -305,11 +368,12 @@ export class RedisClientPool<
try { try {
await Promise.all(promises); await Promise.all(promises);
return this as unknown as RedisClientPoolType<M, F, S, RESP, TYPE_MAPPING>;
} catch (err) { } catch (err) {
this.destroy(); this.destroy();
throw err; throw err;
} }
return this as unknown as RedisClientPoolType<M, F, S, RESP, TYPE_MAPPING>;
} }
async #create() { async #create() {
@@ -319,7 +383,8 @@ export class RedisClientPool<
); );
try { try {
await node.value.connect(); const client = node.value;
await client.connect();
} catch (err) { } catch (err) {
this._self.#clientsInUse.remove(node); this._self.#clientsInUse.remove(node);
throw err; throw err;
@@ -408,7 +473,8 @@ export class RedisClientPool<
const toDestroy = Math.min(this.#idleClients.length, this.totalClients - this.#options.minimum); const toDestroy = Math.min(this.#idleClients.length, this.totalClients - this.#options.minimum);
for (let i = 0; i < toDestroy; i++) { for (let i = 0; i < toDestroy; i++) {
// TODO: shift vs pop // TODO: shift vs pop
this.#idleClients.shift()!.destroy(); const client = this.#idleClients.shift()!
client.destroy();
} }
} }
@@ -449,6 +515,8 @@ export class RedisClientPool<
await Promise.all(promises); await Promise.all(promises);
this.#clientSideCache?.onPoolClose();
this._self.#idleClients.reset(); this._self.#idleClients.reset();
this._self.#clientsInUse.reset(); this._self.#clientsInUse.reset();
} catch (err) { } catch (err) {
@@ -467,6 +535,9 @@ export class RedisClientPool<
for (const client of this._self.#clientsInUse) { for (const client of this._self.#clientsInUse) {
client.destroy(); client.destroy();
} }
this._self.#clientSideCache?.onPoolClose();
this._self.#clientsInUse.reset(); this._self.#clientsInUse.reset();
this._self.#isOpen = false; this._self.#isOpen = false;

View File

@@ -72,6 +72,12 @@ export default class RedisSocket extends EventEmitter {
#isSocketUnrefed = false; #isSocketUnrefed = false;
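// Incremented each time the socket becomes ready; lets callers detect that a
// reconnect happened (e.g. WATCH is invalidated when the epoch changes).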
#socketEpoch = 0;
get socketEpoch() {
return this.#socketEpoch;
}
constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) { constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) {
super(); super();
@@ -212,6 +218,7 @@ export default class RedisSocket extends EventEmitter {
throw err; throw err;
} }
this.#isReady = true; this.#isReady = true;
this.#socketEpoch++;
this.emit('ready'); this.emit('ready');
} catch (err) { } catch (err) {
const retryIn = this.#shouldReconnect(retries++, err as Error); const retryIn = this.#shouldReconnect(retries++, err as Error);

View File

@@ -0,0 +1,48 @@
import { strict as assert } from 'node:assert';
import { EventEmitter } from 'node:events';
import { RedisClusterOptions, RedisClusterClientOptions } from './index';
import RedisClusterSlots from './cluster-slots';
describe('RedisClusterSlots', () => {
describe('initialization', () => {
describe('clientSideCache validation', () => {
const mockEmit = ((_event: string | symbol, ..._args: any[]): boolean => true) as EventEmitter['emit'];
const clientSideCacheConfig = { ttl: 0, maxEntries: 0 };
const rootNodes: Array<RedisClusterClientOptions> = [
{ socket: { host: 'localhost', port: 30001 } }
];
it('should throw error when clientSideCache is enabled with RESP 2', () => {
assert.throws(
() => new RedisClusterSlots({
rootNodes,
clientSideCache: clientSideCacheConfig,
RESP: 2 as const,
}, mockEmit),
new Error('Client Side Caching is only supported with RESP3')
);
});
it('should throw error when clientSideCache is enabled with RESP undefined', () => {
assert.throws(
() => new RedisClusterSlots({
rootNodes,
clientSideCache: clientSideCacheConfig,
}, mockEmit),
new Error('Client Side Caching is only supported with RESP3')
);
});
it('should not throw when clientSideCache is enabled with RESP 3', () => {
assert.doesNotThrow(() =>
new RedisClusterSlots({
rootNodes,
clientSideCache: clientSideCacheConfig,
RESP: 3 as const,
}, mockEmit)
);
});
});
});
});

View File

@@ -6,6 +6,7 @@ import { ChannelListeners, PUBSUB_TYPE, PubSubTypeListeners } from '../client/pu
import { RedisArgument, RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from '../RESP/types'; import { RedisArgument, RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from '../RESP/types';
import calculateSlot from 'cluster-key-slot'; import calculateSlot from 'cluster-key-slot';
import { RedisSocketOptions } from '../client/socket'; import { RedisSocketOptions } from '../client/socket';
import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache';
interface NodeAddress { interface NodeAddress {
host: string; host: string;
@@ -111,6 +112,7 @@ export default class RedisClusterSlots<
replicas = new Array<ShardNode<M, F, S, RESP, TYPE_MAPPING>>(); replicas = new Array<ShardNode<M, F, S, RESP, TYPE_MAPPING>>();
readonly nodeByAddress = new Map<string, MasterNode<M, F, S, RESP, TYPE_MAPPING> | ShardNode<M, F, S, RESP, TYPE_MAPPING>>(); readonly nodeByAddress = new Map<string, MasterNode<M, F, S, RESP, TYPE_MAPPING> | ShardNode<M, F, S, RESP, TYPE_MAPPING>>();
pubSubNode?: PubSubNode<M, F, S, RESP, TYPE_MAPPING>; pubSubNode?: PubSubNode<M, F, S, RESP, TYPE_MAPPING>;
clientSideCache?: PooledClientSideCacheProvider;
#isOpen = false; #isOpen = false;
@@ -118,12 +120,28 @@ export default class RedisClusterSlots<
return this.#isOpen; return this.#isOpen;
} }
#validateOptions(options?: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING>) {
if (options?.clientSideCache && options?.RESP !== 3) {
throw new Error('Client Side Caching is only supported with RESP3');
}
}
constructor( constructor(
options: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING>, options: RedisClusterOptions<M, F, S, RESP, TYPE_MAPPING>,
emit: EventEmitter['emit'] emit: EventEmitter['emit']
) { ) {
this.#validateOptions(options);
this.#options = options; this.#options = options;
this.#clientFactory = RedisClient.factory(options);
if (options?.clientSideCache) {
if (options.clientSideCache instanceof PooledClientSideCacheProvider) {
this.clientSideCache = options.clientSideCache;
} else {
this.clientSideCache = new BasicPooledClientSideCache(options.clientSideCache)
}
}
this.#clientFactory = RedisClient.factory(this.#options);
this.#emit = emit; this.#emit = emit;
} }
@@ -164,6 +182,8 @@ export default class RedisClusterSlots<
} }
async #discover(rootNode: RedisClusterClientOptions) { async #discover(rootNode: RedisClusterClientOptions) {
this.clientSideCache?.clear();
this.clientSideCache?.disable();
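// Clear and pause the shared cache while the topology is rediscovered;
// it is re-enabled once all node clients have been created.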
try { try {
const addressesInUse = new Set<string>(), const addressesInUse = new Set<string>(),
promises: Array<Promise<unknown>> = [], promises: Array<Promise<unknown>> = [],
@@ -219,6 +239,7 @@ export default class RedisClusterSlots<
} }
await Promise.all(promises); await Promise.all(promises);
this.clientSideCache?.enable();
return true; return true;
} catch (err) { } catch (err) {
@@ -314,6 +335,8 @@ export default class RedisClusterSlots<
#createClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>, readonly = node.readonly) { #createClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>, readonly = node.readonly) {
return this.#clientFactory( return this.#clientFactory(
this.#clientOptionsDefaults({ this.#clientOptionsDefaults({
clientSideCache: this.clientSideCache,
RESP: this.#options.RESP,
socket: this.#getNodeAddress(node.address) ?? { socket: this.#getNodeAddress(node.address) ?? {
host: node.host, host: node.host,
port: node.port port: node.port

View File

@@ -9,11 +9,10 @@ import RedisClusterMultiCommand, { RedisClusterMultiCommandType } from './multi-
import { PubSubListener } from '../client/pub-sub'; import { PubSubListener } from '../client/pub-sub';
import { ErrorReply } from '../errors'; import { ErrorReply } from '../errors';
import { RedisTcpSocketOptions } from '../client/socket'; import { RedisTcpSocketOptions } from '../client/socket';
import ASKING from '../commands/ASKING'; import { ClientSideCacheConfig, PooledClientSideCacheProvider } from '../client/cache';
import { BasicCommandParser } from '../client/parser'; import { BasicCommandParser } from '../client/parser';
import { parseArgs } from '../commands/generic-transformers'; import { ASKING_CMD } from '../commands/ASKING';
import SingleEntryCache from '../single-entry-cache'; import SingleEntryCache from '../single-entry-cache'
interface ClusterCommander< interface ClusterCommander<
M extends RedisModules, M extends RedisModules,
F extends RedisFunctions, F extends RedisFunctions,
@@ -67,6 +66,41 @@ export interface RedisClusterOptions<
* Useful when the cluster is running on another network * Useful when the cluster is running on another network
*/ */
nodeAddressMap?: NodeAddressMap; nodeAddressMap?: NodeAddressMap;
/**
* Client Side Caching configuration for the cluster.
*
* Enables Redis Servers and Clients to work together to cache results from commands
* sent to a server. The server will notify the client when cached results are no longer valid.
* The cache is shared across all node clients in the cluster.
*
* Note: Client Side Caching is only supported with RESP3.
*
* @example Anonymous cache configuration
* ```
* const client = createCluster({
* rootNodes: [{ socket: { host: 'localhost', port: 30001 } }],
* RESP: 3,
* clientSideCache: {
* ttl: 0,
* maxEntries: 0,
* evictPolicy: "LRU"
* }
* });
* ```
*
* @example Using a controllable cache
* ```
* const cache = new BasicPooledClientSideCache({
* ttl: 0,
* maxEntries: 0,
* evictPolicy: "LRU"
* });
* const client = createCluster({
* rootNodes: [{ socket: { host: 'localhost', port: 30001 } }],
* RESP: 3,
* clientSideCache: cache
* });
* ```
*/
clientSideCache?: PooledClientSideCacheProvider | ClientSideCacheConfig;
} }
// remove once request & response policies are ready // remove once request & response policies are ready
@@ -149,6 +183,7 @@ export default class RedisCluster<
> extends EventEmitter { > extends EventEmitter {
static #createCommand(command: Command, resp: RespVersions) { static #createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp); const transformReply = getTransformReply(command, resp);
return async function (this: ProxyCluster, ...args: Array<unknown>) { return async function (this: ProxyCluster, ...args: Array<unknown>) {
const parser = new BasicCommandParser(); const parser = new BasicCommandParser();
command.parseCommand(parser, ...args); command.parseCommand(parser, ...args);
@@ -273,6 +308,10 @@ export default class RedisCluster<
return this._self.#slots.slots; return this._self.#slots.slots;
} }
get clientSideCache() {
return this._self.#slots.clientSideCache;
}
/** /**
* An array of the cluster masters. * An array of the cluster masters.
* Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific master node. * Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific master node.
@@ -390,6 +429,27 @@ export default class RedisCluster<
// return this._commandOptionsProxy('policies', policies); // return this._commandOptionsProxy('policies', policies);
// } // }
#handleAsk<T>(
fn: (client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>, opts?: ClusterCommandOptions) => Promise<T>
) {
return async (client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>, options?: ClusterCommandOptions) => {
const chainId = Symbol("asking chain");
const opts = options ? {...options} : {};
opts.chainId = chainId;
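// Send ASKING and the redirected command under the same chain id so they are
// pipelined back to back to the target node.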
const ret = await Promise.all(
[
client.sendCommand([ASKING_CMD], {chainId: chainId}),
fn(client, opts)
]
);
return ret[1];
};
}
async #execute<T>( async #execute<T>(
firstKey: RedisArgument | undefined, firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined, isReadonly: boolean | undefined,
@@ -399,14 +459,15 @@ export default class RedisCluster<
const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16; const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16;
let client = await this.#slots.getClient(firstKey, isReadonly); let client = await this.#slots.getClient(firstKey, isReadonly);
let i = 0; let i = 0;
let myOpts = options;
let myFn = fn;
while (true) { while (true) {
try { try {
return await fn(client, myOpts); return await myFn(client, options);
} catch (err) { } catch (err) {
// reset to passed in options, if changed by an ask request myFn = fn;
myOpts = options;
// TODO: error class // TODO: error class
if (++i > maxCommandRedirections || !(err instanceof Error)) { if (++i > maxCommandRedirections || !(err instanceof Error)) {
throw err; throw err;
@@ -425,13 +486,7 @@ export default class RedisCluster<
} }
client = redirectTo; client = redirectTo;
myFn = this.#handleAsk(fn);
const chainId = Symbol('Asking Chain');
myOpts = options ? {...options} : {};
myOpts.chainId = chainId;
client.sendCommand(parseArgs(ASKING), {chainId: chainId}).catch(err => { console.log(`Asking Failed: ${err}`) } );
continue; continue;
} }
@@ -582,10 +637,12 @@ export default class RedisCluster<
} }
close() { close() {
this._self.#slots.clientSideCache?.onPoolClose();
return this._self.#slots.close(); return this._self.#slots.close();
} }
destroy() { destroy() {
this._self.#slots.clientSideCache?.onPoolClose();
return this._self.#slots.destroy(); return this._self.#slots.destroy();
} }

View File

@@ -29,12 +29,7 @@ export function parseGeoSearchArguments(
from: GeoSearchFrom, from: GeoSearchFrom,
by: GeoSearchBy, by: GeoSearchBy,
options?: GeoSearchOptions, options?: GeoSearchOptions,
store?: RedisArgument
) { ) {
if (store !== undefined) {
parser.pushKey(store);
}
parser.pushKey(key); parser.pushKey(key);
if (typeof from === 'string' || from instanceof Buffer) { if (typeof from === 'string' || from instanceof Buffer) {

View File

@@ -17,7 +17,12 @@ export default {
options?: GeoSearchStoreOptions options?: GeoSearchStoreOptions
) { ) {
parser.push('GEOSEARCHSTORE'); parser.push('GEOSEARCHSTORE');
parseGeoSearchArguments(parser, source, from, by, options, destination);
if (destination !== undefined) {
parser.pushKey(destination);
}
parseGeoSearchArguments(parser, source, from, by, options);
if (options?.STOREDIST) { if (options?.STOREDIST) {
parser.push('STOREDIST'); parser.push('STOREDIST');

View File

@@ -5,11 +5,59 @@ import { RESP_TYPES } from '../RESP/decoder';
import { WatchError } from "../errors"; import { WatchError } from "../errors";
import { RedisSentinelConfig, SentinelFramework } from "./test-util"; import { RedisSentinelConfig, SentinelFramework } from "./test-util";
import { RedisSentinelEvent, RedisSentinelType, RedisSentinelClientType, RedisNode } from "./types"; import { RedisSentinelEvent, RedisSentinelType, RedisSentinelClientType, RedisNode } from "./types";
import RedisSentinel from "./index";
import { RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping, NumberReply } from '../RESP/types'; import { RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping, NumberReply } from '../RESP/types';
import { promisify } from 'node:util'; import { promisify } from 'node:util';
import { exec } from 'node:child_process'; import { exec } from 'node:child_process';
import { BasicPooledClientSideCache } from '../client/cache'
import { once } from 'node:events'
const execAsync = promisify(exec); const execAsync = promisify(exec);
describe('RedisSentinel', () => {
describe('initialization', () => {
describe('clientSideCache validation', () => {
const clientSideCacheConfig = { ttl: 0, maxEntries: 0 };
const options = {
name: 'mymaster',
sentinelRootNodes: [
{ host: 'localhost', port: 26379 }
]
};
it('should throw error when clientSideCache is enabled with RESP 2', () => {
assert.throws(
() => RedisSentinel.create({
...options,
clientSideCache: clientSideCacheConfig,
RESP: 2 as const,
}),
new Error('Client Side Caching is only supported with RESP3')
);
});
it('should throw error when clientSideCache is enabled with RESP undefined', () => {
assert.throws(
() => RedisSentinel.create({
...options,
clientSideCache: clientSideCacheConfig,
}),
new Error('Client Side Caching is only supported with RESP3')
);
});
it('should not throw when clientSideCache is enabled with RESP 3', () => {
assert.doesNotThrow(() =>
RedisSentinel.create({
...options,
clientSideCache: clientSideCacheConfig,
RESP: 3 as const,
})
);
});
});
});
});
[GLOBAL.SENTINEL.OPEN, GLOBAL.SENTINEL.PASSWORD].forEach(testOptions => { [GLOBAL.SENTINEL.OPEN, GLOBAL.SENTINEL.PASSWORD].forEach(testOptions => {
const passIndex = testOptions.serverArguments.indexOf('--requirepass')+1; const passIndex = testOptions.serverArguments.indexOf('--requirepass')+1;
let password: string | undefined = undefined; let password: string | undefined = undefined;
@@ -967,6 +1015,34 @@ describe.skip('legacy tests', () => {
tracer.push("added node and waiting on added promise"); tracer.push("added node and waiting on added promise");
await nodeAddedPromise; await nodeAddedPromise;
}) })
it('with client side caching', async function() {
this.timeout(30000);
const csc = new BasicPooledClientSideCache();
sentinel = frame.getSentinelClient({nodeClientOptions: {RESP: 3}, clientSideCache: csc, masterPoolSize: 5});
await sentinel.connect();
await sentinel.set('x', 1);
await sentinel.get('x');
await sentinel.get('x');
await sentinel.get('x');
await sentinel.get('x');
assert.equal(1, csc.cacheMisses());
assert.equal(3, csc.cacheHits());
const invalidatePromise = once(csc, 'invalidate');
await sentinel.set('x', 2);
await invalidatePromise;
await sentinel.get('x');
await sentinel.get('x');
await sentinel.get('x');
await sentinel.get('x');
assert.equal(csc.cacheMisses(), 2);
assert.equal(csc.cacheHits(), 6);
})
}); });
}); });

View File

@@ -16,6 +16,7 @@ import { RedisVariadicArgument } from '../commands/generic-transformers';
import { WaitQueue } from './wait-queue'; import { WaitQueue } from './wait-queue';
import { TcpNetConnectOpts } from 'node:net'; import { TcpNetConnectOpts } from 'node:net';
import { RedisTcpSocketOptions } from '../client/socket'; import { RedisTcpSocketOptions } from '../client/socket';
import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache';
interface ClientInfo { interface ClientInfo {
id: number; id: number;
@@ -301,6 +302,10 @@ export default class RedisSentinel<
#masterClientCount = 0; #masterClientCount = 0;
#masterClientInfo?: ClientInfo; #masterClientInfo?: ClientInfo;
get clientSideCache() {
return this._self.#internal.clientSideCache;
}
constructor(options: RedisSentinelOptions<M, F, S, RESP, TYPE_MAPPING>) { constructor(options: RedisSentinelOptions<M, F, S, RESP, TYPE_MAPPING>) {
super(); super();
@@ -617,7 +622,7 @@ class RedisSentinelInternal<
readonly #name: string; readonly #name: string;
readonly #nodeClientOptions: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING, RedisTcpSocketOptions>; readonly #nodeClientOptions: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING, RedisTcpSocketOptions>;
readonly #sentinelClientOptions: RedisClientOptions<typeof RedisSentinelModule, F, S, RESP, TYPE_MAPPING, RedisTcpSocketOptions>; readonly #sentinelClientOptions: RedisClientOptions<typeof RedisSentinelModule, RedisFunctions, RedisScripts, RespVersions, TypeMapping, RedisTcpSocketOptions>;
readonly #scanInterval: number; readonly #scanInterval: number;
readonly #passthroughClientErrorEvents: boolean; readonly #passthroughClientErrorEvents: boolean;
@@ -650,9 +655,22 @@ class RedisSentinelInternal<
#trace: (msg: string) => unknown = () => { }; #trace: (msg: string) => unknown = () => { };
#clientSideCache?: PooledClientSideCacheProvider;
get clientSideCache() {
return this.#clientSideCache;
}
#validateOptions(options?: RedisSentinelOptions<M, F, S, RESP, TYPE_MAPPING>) {
if (options?.clientSideCache && options?.RESP !== 3) {
throw new Error('Client Side Caching is only supported with RESP3');
}
}
constructor(options: RedisSentinelOptions<M, F, S, RESP, TYPE_MAPPING>) { constructor(options: RedisSentinelOptions<M, F, S, RESP, TYPE_MAPPING>) {
super(); super();
this.#validateOptions(options);
this.#name = options.name; this.#name = options.name;
this.#sentinelRootNodes = Array.from(options.sentinelRootNodes); this.#sentinelRootNodes = Array.from(options.sentinelRootNodes);
@@ -662,11 +680,21 @@ class RedisSentinelInternal<
this.#scanInterval = options.scanInterval ?? 0; this.#scanInterval = options.scanInterval ?? 0;
this.#passthroughClientErrorEvents = options.passthroughClientErrorEvents ?? false; this.#passthroughClientErrorEvents = options.passthroughClientErrorEvents ?? false;
this.#nodeClientOptions = options.nodeClientOptions ? Object.assign({} as RedisClientOptions<M, F, S, RESP, TYPE_MAPPING, RedisTcpSocketOptions>, options.nodeClientOptions) : {}; this.#nodeClientOptions = (options.nodeClientOptions ? {...options.nodeClientOptions} : {}) as RedisClientOptions<M, F, S, RESP, TYPE_MAPPING, RedisTcpSocketOptions>;
if (this.#nodeClientOptions.url !== undefined) { if (this.#nodeClientOptions.url !== undefined) {
throw new Error("invalid nodeClientOptions for Sentinel"); throw new Error("invalid nodeClientOptions for Sentinel");
} }
if (options.clientSideCache) {
if (options.clientSideCache instanceof PooledClientSideCacheProvider) {
this.#clientSideCache = this.#nodeClientOptions.clientSideCache = options.clientSideCache;
} else {
const cscConfig = options.clientSideCache;
this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig);
// this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig);
}
}
this.#sentinelClientOptions = options.sentinelClientOptions ? Object.assign({} as RedisClientOptions<typeof RedisSentinelModule, F, S, RESP, TYPE_MAPPING, RedisTcpSocketOptions>, options.sentinelClientOptions) : {}; this.#sentinelClientOptions = options.sentinelClientOptions ? Object.assign({} as RedisClientOptions<typeof RedisSentinelModule, F, S, RESP, TYPE_MAPPING, RedisTcpSocketOptions>, options.sentinelClientOptions) : {};
this.#sentinelClientOptions.modules = RedisSentinelModule; this.#sentinelClientOptions.modules = RedisSentinelModule;
@@ -904,6 +932,8 @@ class RedisSentinelInternal<
this.#isReady = false; this.#isReady = false;
this.#clientSideCache?.onPoolClose();
if (this.#scanTimer) { if (this.#scanTimer) {
clearInterval(this.#scanTimer); clearInterval(this.#scanTimer);
this.#scanTimer = undefined; this.#scanTimer = undefined;
@@ -952,6 +982,8 @@ class RedisSentinelInternal<
this.#isReady = false; this.#isReady = false;
this.#clientSideCache?.onPoolClose();
if (this.#scanTimer) { if (this.#scanTimer) {
clearInterval(this.#scanTimer); clearInterval(this.#scanTimer);
this.#scanTimer = undefined; this.#scanTimer = undefined;

View File

@@ -188,18 +188,22 @@ export class SentinelFramework extends DockerBase {
} }
const options: RedisSentinelOptions<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping> = { const options: RedisSentinelOptions<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping> = {
...opts,
name: this.config.sentinelName, name: this.config.sentinelName,
sentinelRootNodes: this.#sentinelList.map((sentinel) => { return { host: '127.0.0.1', port: sentinel.docker.port } }), sentinelRootNodes: this.#sentinelList.map((sentinel) => { return { host: '127.0.0.1', port: sentinel.docker.port } }),
passthroughClientErrorEvents: errors passthroughClientErrorEvents: errors
} }
if (this.config.password !== undefined) { if (this.config.password !== undefined) {
options.nodeClientOptions = {password: this.config.password}; if (!options.nodeClientOptions) {
options.sentinelClientOptions = {password: this.config.password}; options.nodeClientOptions = {};
} }
options.nodeClientOptions.password = this.config.password;
if (opts) { if (!options.sentinelClientOptions) {
Object.assign(options, opts); options.sentinelClientOptions = {};
}
options.sentinelClientOptions.password = this.config.password;
} }
return RedisSentinel.create(options); return RedisSentinel.create(options);

View File

@@ -4,6 +4,7 @@ import { CommandSignature, CommanderConfig, RedisFunctions, RedisModules, RedisS
import COMMANDS from '../commands'; import COMMANDS from '../commands';
import RedisSentinel, { RedisSentinelClient } from '.'; import RedisSentinel, { RedisSentinelClient } from '.';
import { RedisTcpSocketOptions } from '../client/socket'; import { RedisTcpSocketOptions } from '../client/socket';
import { ClientSideCacheConfig, PooledClientSideCacheProvider } from '../client/cache';
export interface RedisNode { export interface RedisNode {
host: string; host: string;
@@ -67,6 +68,41 @@ export interface RedisSentinelOptions<
* When `false`, the sentinel object will wait for the first available client from the pool. * When `false`, the sentinel object will wait for the first available client from the pool.
*/ */
reserveClient?: boolean; reserveClient?: boolean;
/**
* Client Side Caching configuration for the pool.
*
* Enables Redis Servers and Clients to work together to cache results from commands
* sent to a server. The server will notify the client when cached results are no longer valid.
* In pooled mode, the cache is shared across all clients in the pool.
*
* Note: Client Side Caching is only supported with RESP3.
*
* @example Anonymous cache configuration
* ```
* const client = createSentinel({
* name: 'mymaster',
* sentinelRootNodes: [{ host: 'localhost', port: 26379 }],
* RESP: 3,
* clientSideCache: {
* ttl: 0,
* maxEntries: 0,
* evictPolicy: "LRU"
* },
* masterPoolSize: 5
* });
* ```
*
* @example Using a controllable cache
* ```
* const cache = new BasicPooledClientSideCache({
* ttl: 0,
* maxEntries: 0,
* evictPolicy: "LRU"
* });
* const client = createSentinel({
* name: 'mymaster',
* sentinelRootNodes: [{ host: 'localhost', port: 26379 }],
* RESP: 3,
* clientSideCache: cache,
* masterPoolSize: 5
* });
* ```
*/
clientSideCache?: PooledClientSideCacheProvider | ClientSideCacheConfig;
} }
export interface SentinelCommander< export interface SentinelCommander<

View File

@@ -1,5 +1,5 @@
import { BasicCommandParser } from '../client/parser';
import { ArrayReply, Command, RedisFunction, RedisScript, RespVersions, UnwrapReply } from '../RESP/types'; import { ArrayReply, Command, RedisFunction, RedisScript, RespVersions, UnwrapReply } from '../RESP/types';
import { BasicCommandParser } from '../client/parser';
import { RedisSocketOptions, RedisTcpSocketOptions } from '../client/socket'; import { RedisSocketOptions, RedisTcpSocketOptions } from '../client/socket';
import { functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; import { functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import { NamespaceProxySentinel, NamespaceProxySentinelClient, ProxySentinel, ProxySentinelClient, RedisNode } from './types'; import { NamespaceProxySentinel, NamespaceProxySentinelClient, ProxySentinel, ProxySentinelClient, RedisNode } from './types';

View File

@@ -234,6 +234,23 @@ of sending a `QUIT` command to the server, the client can simply close the netwo
```typescript ```typescript
client.destroy(); client.destroy();
``` ```
### Client Side Caching
Node Redis v5 adds support for [Client Side Caching](https://redis.io/docs/manual/client-side-caching/), which enables clients to cache query results locally. The Redis server will notify the client when cached results are no longer valid.
```typescript
// Enable client side caching with RESP3
const client = createClient({
RESP: 3,
clientSideCache: {
ttl: 0, // Time-to-live (0 = no expiration)
maxEntries: 0, // Maximum entries (0 = unlimited)
evictPolicy: "LRU" // Eviction policy: "LRU" or "FIFO"
}
});
```
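To see how effective the cache is, pass in your own cache instance and read its statistics. A minimal sketch (the exact shape of the returned stats snapshot may differ):
```typescript
import { createClient, BasicClientSideCache } from 'redis';

const cache = new BasicClientSideCache({ ttl: 0, maxEntries: 0, evictPolicy: "LRU" });
const client = createClient({ RESP: 3, clientSideCache: cache });
await client.connect();

await client.set('key', 'value');
await client.get('key'); // first read loads the value from the server
await client.get('key'); // second read is served from the local cache

console.log(cache.stats()); // hit/miss counts, load times, eviction counts
```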
See the [V5 documentation](../../docs/v5.md#client-side-caching) for more details and advanced usage.
### Auto-Pipelining ### Auto-Pipelining

View File

@@ -450,7 +450,7 @@ export default class TestUtils {
await fn(pool); await fn(pool);
} finally { } finally {
await pool.flushAll(); await pool.flushAll();
pool.destroy(); pool.close();
} }
}); });
} }