diff --git a/packages/client/lib/client/README-cache.md b/packages/client/lib/client/README-cache.md new file mode 100644 index 0000000000..01a43cf459 --- /dev/null +++ b/packages/client/lib/client/README-cache.md @@ -0,0 +1,64 @@ +# Client Side Caching Support + +Client Side Caching enables Redis Servers and Clients to work together to enable a client to cache results from command sent to a server and be informed by the server when the cached result is no longer valid. + +## Usage + +node-redis supports two ways of instantiating client side caching support + +Note: Client Side Caching is only supported with RESP3. + +### Anonymous Cache + +```javascript +const client = createClient({RESP: 3, clientSideCache: {ttl: 0, maxEntries: 0, lru: false}}) +``` + +In this instance, the cache is opaque to the user, and they have no control over it. + +### Controllable Cache + +```javascript +const ttl = 0, maxEntries = 0, lru = false; +const cache = new BasicClientSideCache(ttl, maxEntries, lru); +const client = createClient({RESP: 3, clientSideCache: cache}); +``` + +In this instance, the user has full control over the cache, as they have access to the cache object. + +They can manually invalidate keys + +```javascript +cache.invalidate(key); +``` + +they can clear the entire cache +g +```javascript +cache.clear(); +``` + +as well as get cache metrics + +```typescript +const hits: number = cache.cacheHits(); +const misses: number = cache.cacheMisses(); +``` + +## Pooled Caching + +Similar to individual clients, node-redis also supports caching for its pooled client object, with the cache being able to be instantiated in an anonymous manner or a controllable manner. + +### Anonymous Cache + +```javascript +const client = createClientPool({RESP: 3}, {clientSideCache: {ttl: 0, maxEntries: 0, lru: false}, minimum: 8}); +``` + +### Controllable Cache + +```javascript +const ttl = 0, maxEntries = 0, lru = false; +const cache = new BasicPooledClientSideCache(ttl, maxEntries, lru); +const client = createClientPool({RESP: 3}, {clientSideCache: cache, minimum: 8}); +``` \ No newline at end of file diff --git a/packages/client/lib/client/cache.spec.ts b/packages/client/lib/client/cache.spec.ts new file mode 100644 index 0000000000..ee08d3a69f --- /dev/null +++ b/packages/client/lib/client/cache.spec.ts @@ -0,0 +1,460 @@ +import assert from "assert"; +import testUtils, { GLOBAL } from "../test-utils" +import { BasicClientSideCache, BasicPooledClientSideCache } from "./cache" +import { REDIS_FLUSH_MODES } from "../commands/FLUSHALL"; +import { once } from 'events'; + +describe("Client Side Cache", () => { + describe('Basic Cache', () => { + const csc = new BasicClientSideCache({ maxEntries: 10 }); + + /* cacheNotEmpty */ + testUtils.testWithClient('Basic Cache Miss', async client => { + csc.clear(); + + await client.set("x", 1); + await client.get("x"); + + assert.equal(csc.cacheMisses(), 1, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + /* cacheUsedTest */ + testUtils.testWithClient('Basic Cache Hit', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + assert.equal(await client.get("x"), '1'); + + assert.equal(csc.cacheMisses(), 1, "Cache Misses"); + assert.equal(csc.cacheHits(), 1, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('Max Cache Entries', async client => { + csc.clear(); + + await client.set('1', 1); + assert.equal(await client.get('1'), '1'); + assert.equal(await client.get('2'), null); + assert.equal(await client.get('3'), null); + assert.equal(await client.get('4'), null); + assert.equal(await client.get('5'), null); + assert.equal(await client.get('6'), null); + assert.equal(await client.get('7'), null); + assert.equal(await client.get('8'), null); + assert.equal(await client.get('9'), null); + assert.equal(await client.get('10'), null); + assert.equal(await client.get('11'), null); + assert.equal(await client.get('1'), '1'); + + assert.equal(csc.cacheMisses(), 12, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('LRU works correctly', async client => { + csc.clear(); + + await client.set('1', 1); + assert.equal(await client.get('1'), '1'); + assert.equal(await client.get('2'), null); + assert.equal(await client.get('3'), null); + assert.equal(await client.get('4'), null); + assert.equal(await client.get('5'), null); + assert.equal(await client.get('1'), '1'); + assert.equal(await client.get('6'), null); + assert.equal(await client.get('7'), null); + assert.equal(await client.get('8'), null); + assert.equal(await client.get('9'), null); + assert.equal(await client.get('10'), null); + assert.equal(await client.get('11'), null); + assert.equal(await client.get('1'), '1'); + + assert.equal(csc.cacheMisses(), 11, "Cache Misses"); + assert.equal(csc.cacheHits(), 2, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('Basic Cache Clear', async client => { + csc.clear(); + + await client.set("x", 1); + await client.get("x"); + csc.clear(); + await client.get("x"); + + assert.equal(csc.cacheMisses(), 1, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('Null Invalidate acts as clear', async client => { + csc.clear(); + + await client.set("x", 1); + await client.get("x"); + csc.invalidate(null); + await client.get("x"); + + assert.equal(2, csc.cacheMisses(), "Cache Misses"); + assert.equal(0, csc.cacheHits(), "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('flushdb causes an invalidate null', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + await client.flushDb(REDIS_FLUSH_MODES.SYNC); + assert.equal(await client.get("x"), null); + + assert.equal(csc.cacheMisses(), 2, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('Basic Cache Invalidate', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1', 'first get'); + await client.set("x", 2); + assert.equal(await client.get("x"), '2', 'second get'); + await client.set("x", 3); + assert.equal(await client.get("x"), '3', 'third get'); + + assert.equal(csc.cacheMisses(), 3, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + /* immutableCacheEntriesTest */ + testUtils.testWithClient("Cached Replies Don't Mutate", async client => { + csc.clear(); + + await client.set("x", 1); + await client.set('y', 2); + const ret1 = await client.mGet(['x', 'y']); + assert.deepEqual(ret1, ['1', '2'], 'first mGet'); + ret1[0] = '4'; + const ret2 = await client.mGet(['x', 'y']); + assert.deepEqual(ret2, ['1', '2'], 'second mGet'); + ret2[0] = '8'; + const ret3 = await client.mGet(['x', 'y']); + assert.deepEqual(ret3, ['1', '2'], 'third mGet'); + + assert.equal(csc.cacheMisses(), 1, "Cache Misses"); + assert.equal(csc.cacheHits(), 2, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + /* clearOnDisconnectTest */ + testUtils.testWithClient("Cached cleared on disconnect", async client => { + csc.clear(); + + await client.set("x", 1); + await client.set('y', 2); + const ret1 = await client.mGet(['x', 'y']); + assert.deepEqual(ret1, ['1', '2'], 'first mGet'); + + assert.equal(csc.cacheMisses(), 1, "first Cache Misses"); + assert.equal(csc.cacheHits(), 0, "first Cache Hits"); + + await client.close(); + + await client.connect(); + + const ret2 = await client.mGet(['x', 'y']); + assert.deepEqual(ret2, ['1', '2'], 'second mGet'); + + assert.equal(csc.cacheMisses(), 1, "second Cache Misses"); + assert.equal(csc.cacheHits(), 0, "second Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + }); + + describe.only("Pooled Cache", () => { + const csc = new BasicPooledClientSideCache(); + + testUtils.testWithClient('Virtual Pool Disconnect', async client1 => { + const client2 = client1.duplicate(); + await client2.connect() + + assert.equal(await client2.get("x"), null); + assert.equal(await client1.get("x"), null); + + assert.equal(1, csc.cacheMisses(), "Cache Misses"); + assert.equal(1, csc.cacheHits(), "Cache Hits"); + + await client2.close(); + + assert.equal(await client1.get("x"), null); + assert.equal(await client1.get("x"), null); + + assert.equal(2, csc.cacheMisses(), "Cache Misses"); + assert.equal(2, csc.cacheHits(), "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + /* cacheNotEmpty */ + testUtils.testWithClientPool('Basic Cache Miss and Clear', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + + assert.equal(1, csc.cacheMisses(), "Cache Misses"); + assert.equal(0, csc.cacheHits(), "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + }, + poolOptions: { + minimum: 5, + maximum: 5, + acquireTimeout: 0, + cleanupDelay: 1, + clientSideCache: csc + } + }) + + /* cacheUsedTest */ + testUtils.testWithClientPool('Basic Cache Hit', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + assert.equal(await client.get("x"), '1'); + assert.equal(await client.get("x"), '1'); + + assert.equal(csc.cacheMisses(), 1, "Cache Misses"); + assert.equal(csc.cacheHits(), 2, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + }, + poolOptions: { + minimum: 5, + maximum: 5, + acquireTimeout: 0, + cleanupDelay: 1, + clientSideCache: csc + } + }) + + /* invalidationTest 1 */ + testUtils.testWithClientPool('Basic Cache Manually Invalidate', async client => { + csc.clear(); + + await client.set("x", 1); + + assert.equal(await client.get("x"), '1', 'first get'); + + let p: Promise> = once(csc, 'invalidate'); + await client.set("x", 2); + let [i] = await p; + + assert.equal(await client.get("x"), '2', 'second get'); + + p = once(csc, 'invalidate'); + await client.set("x", 3); + [i] = await p; + + assert.equal(await client.get("x"), '3'); + + assert.equal(csc.cacheMisses(), 3, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + }, + poolOptions: { + minimum: 5, + maximum: 5, + acquireTimeout: 0, + cleanupDelay: 1, + clientSideCache: csc + } + }) + + /* invalidationTest 2 */ + testUtils.testWithClientPool('Basic Cache Invalidate via message', async client => { + csc.clear(); + + await client.set('x', 1); + await client.set('y', 2); + + assert.deepEqual(await client.mGet(['x', 'y']), ['1', '2'], 'first mGet'); + + assert.equal(csc.cacheMisses(), 1, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + + let p: Promise> = once(csc, 'invalidate'); + await client.set("x", 3); + let [i] = await p; + + assert.equal(i, 'x'); + + assert.deepEqual(await client.mGet(['x', 'y']), ['3', '2'], 'second mGet'); + + assert.equal(csc.cacheMisses(), 2, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + + p = once(csc, 'invalidate'); + await client.set("y", 4); + [i] = await p; + + assert.equal(i, 'y'); + + assert.deepEqual(await client.mGet(['x', 'y']), ['3', '4'], 'second mGet'); + + assert.equal(csc.cacheMisses(), 3, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + }, + poolOptions: { + minimum: 5, + maximum: 5, + acquireTimeout: 0, + cleanupDelay: 1, + clientSideCache: csc + } + }) + }); + + describe('Cluster Caching', () => { + const csc = new BasicPooledClientSideCache(); + + testUtils.testWithCluster('Basic Cache Miss and Clear', async client => { + csc.clear(); + + await client.set("x", 1); + await client.get("x"); + await client.set("y", 1); + await client.get("y"); + + assert.equal(2, csc.cacheMisses(), "Cache Misses"); + assert.equal(0, csc.cacheHits(), "Cache Hits"); + }, { + ...GLOBAL.CLUSTERS.OPEN, + clusterConfiguration: { + RESP: 3, + clientSideCache: csc + } + }) + + testUtils.testWithCluster('Basic Cache Hit', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + assert.equal(await client.get("x"), '1'); + assert.equal(await client.get("x"), '1'); + await client.set("y", 1); + assert.equal(await client.get("y"), '1'); + assert.equal(await client.get("y"), '1'); + assert.equal(await client.get("y"), '1'); + + assert.equal(2, csc.cacheMisses(), "Cache Misses"); + assert.equal(4, csc.cacheHits(), "Cache Hits"); + }, { + ...GLOBAL.CLUSTERS.OPEN, + clusterConfiguration: { + RESP: 3, + clientSideCache: csc + } + }) + + testUtils.testWithCluster('Basic Cache Invalidate', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + await client.set("x", 2); + assert.equal(await client.get("x"), '2'); + await client.set("x", 3); + assert.equal(await client.get("x"), '3'); + + await client.set("y", 1); + assert.equal(await client.get("y"), '1'); + await client.set("y", 2); + assert.equal(await client.get("y"), '2'); + await client.set("y", 3); + assert.equal(await client.get("y"), '3'); + + assert.equal(6, csc.cacheMisses(), "Cache Misses"); + assert.equal(0, csc.cacheHits(), "Cache Hits"); + }, { + ...GLOBAL.CLUSTERS.OPEN, + clusterConfiguration: { + RESP: 3, + clientSideCache: csc + } + }) + }); +}); \ No newline at end of file diff --git a/packages/client/lib/client/cache.ts b/packages/client/lib/client/cache.ts new file mode 100644 index 0000000000..3a8fd951b4 --- /dev/null +++ b/packages/client/lib/client/cache.ts @@ -0,0 +1,533 @@ +import { EventEmitter } from 'stream'; +import RedisClient, { RedisClientType } from '.'; +import { RedisArgument, ReplyUnion, TransformReply, TypeMapping } from '../RESP/types'; +import { BasicCommandParser } from './parser'; + +type CachingClient = RedisClient; +type CachingClientType = RedisClientType; +type CmdFunc = () => Promise; + +export interface ClientSideCacheConfig { + ttl?: number; + maxEntries?: number; + lru?: boolean; +} + +type CacheCreator = { + epoch: number; + client: CachingClient; +}; + +interface ClientSideCacheEntry { + invalidate(): void; + validate(): boolean; +} + +abstract class ClientSideCacheEntryBase implements ClientSideCacheEntry { + #invalidated = false; + readonly #expireTime: number; + + constructor(ttl: number) { + if (ttl == 0) { + this.#expireTime = 0; + } else { + this.#expireTime = Date.now() + ttl; + } + } + + invalidate(): void { + this.#invalidated = true; + } + + validate(): boolean { + return !this.#invalidated && (this.#expireTime == 0 || (Date.now() < this.#expireTime)) + } +} + +export class ClientSideCacheEntryValue extends ClientSideCacheEntryBase { + readonly #value: any; + + get value() { + return this.#value; + } + + constructor(ttl: number, value: any) { + super(ttl); + this.#value = value; + } +} + +export class ClientSideCacheEntryPromise extends ClientSideCacheEntryBase { + readonly #sendCommandPromise: Promise; + + get promise() { + return this.#sendCommandPromise; + } + + constructor(ttl: number, sendCommandPromise: Promise) { + super(ttl); + this.#sendCommandPromise = sendCommandPromise; + } +} + +export abstract class ClientSideCacheProvider extends EventEmitter { + abstract handleCache(client: CachingClient, parser: BasicCommandParser, fn: CmdFunc, transformReply: TransformReply | undefined, typeMapping: TypeMapping | undefined): Promise; + abstract trackingOn(): Array; + abstract invalidate(key: RedisArgument | null): void; + abstract clear(): void; + abstract cacheHits(): number; + abstract cacheMisses(): number; + abstract onError(): void; + abstract onClose(): void; +} + +export class BasicClientSideCache extends ClientSideCacheProvider { + #cacheKeyToEntryMap: Map; + #keyToCacheKeySetMap: Map>; + readonly ttl: number; + readonly #maxEntries: number; + readonly #lru: boolean; + #cacheHits = 0; + #cacheMisses = 0; + + constructor(config?: ClientSideCacheConfig) { + super(); + + this.#cacheKeyToEntryMap = new Map(); + this.#keyToCacheKeySetMap = new Map>(); + this.ttl = config?.ttl ?? 0; + this.#maxEntries = config?.maxEntries ?? 0; + this.#lru = config?.lru ?? true; + } + + /* logic of how caching works: + + 1. commands use a CommandParser + it enables us to define/retrieve + cacheKey - a unique key that corresponds to this command and its arguments + redisKeys - an array of redis keys as strings that if the key is modified, will cause redis to invalidate this result when cached + 2. check if cacheKey is in our cache + 2b1. if its a value cacheEntry - return it + 2b2. if it's a promise cache entry - wait on promise and then go to 3c. + 3. if cacheEntry is not in cache + 3a. send the command save the promise into a a cacheEntry and then wait on result + 3b. transform reply (if required) based on transformReply + 3b. check the cacheEntry is still valid - in cache and hasn't been deleted) + 3c. if valid - overwrite with value entry + 4. return previously non cached result + */ + override async handleCache( + client: CachingClient, + parser: BasicCommandParser, + fn: CmdFunc, + transformReply: TransformReply | undefined, + typeMapping: TypeMapping | undefined + ) { + let reply: ReplyUnion; + + const cacheKey = parser.cacheKey; + + // "2" + let cacheEntry = this.get(cacheKey); + if (cacheEntry) { + // If instanceof is "too slow", can add a "type" and then use an "as" cast to call proper getters. + if (cacheEntry instanceof ClientSideCacheEntryValue) { // "2b1" + this.#cacheHit(); + + return structuredClone(cacheEntry.value); + } else if (cacheEntry instanceof ClientSideCacheEntryPromise) { // 2b2 + // unsure if this should be considered a cache hit, a miss, or neither? + reply = await cacheEntry.promise; + } else { + throw new Error("unknown cache entry type"); + } + } else { // 3/3a + this.#cacheMiss(); + + const promise = fn(); + + cacheEntry = this.createPromiseEntry(client, promise); + this.set(cacheKey, cacheEntry, parser.keys); + + try { + reply = await promise; + } catch (err) { + if (cacheEntry.validate()) { // on error, have to remove promise from cache + this.delete(cacheKey!); + } + throw err; + } + } + + // 3b + let val; + if (transformReply) { + val = transformReply(reply, parser.preserve, typeMapping); + } else { + val = reply; + } + + // 3c + if (cacheEntry.validate()) { // revalidating promise entry (dont save value, if promise entry has been invalidated) + // 3d + cacheEntry = this.createValueEntry(client, val); + this.set(cacheKey, cacheEntry, parser.keys); + this.emit("cached-key", cacheKey); + } else { +// console.log("cache entry for key got invalidated between execution and saving, so not saving"); + } + + return structuredClone(val); + } + + override trackingOn() { + return ['CLIENT', 'TRACKING', 'ON']; + } + + override invalidate(key: RedisArgument | null) { + if (key === null) { + this.clear(false); + this.emit("invalidate", key); + + return; + } + + const keySet = this.#keyToCacheKeySetMap.get(key.toString()); + if (keySet) { + for (const cacheKey of keySet) { + const entry = this.#cacheKeyToEntryMap.get(cacheKey); + if (entry) { + entry.invalidate(); + } + this.#cacheKeyToEntryMap.delete(cacheKey); + } + this.#keyToCacheKeySetMap.delete(key.toString()); + } + + this.emit('invalidate', key); + } + + override clear(reset = true) { + this.#cacheKeyToEntryMap.clear(); + this.#keyToCacheKeySetMap.clear(); + if (reset) { + this.#cacheHits = 0; + this.#cacheMisses = 0; + } + } + + get(cacheKey?: string | undefined) { + if (cacheKey === undefined) { + return undefined + } + + const val = this.#cacheKeyToEntryMap.get(cacheKey); + + if (val && !val.validate()) { + this.delete(cacheKey); + this.emit("invalidate", cacheKey); + + return undefined; + } + + if (val !== undefined && this.#lru) { + this.#cacheKeyToEntryMap.delete(cacheKey); + this.#cacheKeyToEntryMap.set(cacheKey, val); + } + + return val; + } + + delete(cacheKey: string) { + const entry = this.#cacheKeyToEntryMap.get(cacheKey); + if (entry) { + entry.invalidate(); + this.#cacheKeyToEntryMap.delete(cacheKey); + } + } + + has(cacheKey: string) { + return this.#cacheKeyToEntryMap.has(cacheKey); + } + + set(cacheKey: string, cacheEntry: ClientSideCacheEntry, keys: Array) { + let count = this.#cacheKeyToEntryMap.size; + const oldEntry = this.#cacheKeyToEntryMap.get(cacheKey); + if (oldEntry) { + count--; // overwriting, so not incrementig + oldEntry.invalidate(); + } + + if (this.#maxEntries > 0 && count >= this.#maxEntries) { + this.deleteOldest(); + } + + this.#cacheKeyToEntryMap.set(cacheKey, cacheEntry); + + for (const key of keys) { + if (!this.#keyToCacheKeySetMap.has(key.toString())) { + this.#keyToCacheKeySetMap.set(key.toString(), new Set()); + } + + const cacheKeySet = this.#keyToCacheKeySetMap.get(key.toString()); + cacheKeySet!.add(cacheKey); + } + } + + size() { + return this.#cacheKeyToEntryMap.size; + } + + createValueEntry(client: CachingClient, value: any): ClientSideCacheEntryValue { + return new ClientSideCacheEntryValue(this.ttl, value); + } + + createPromiseEntry(client: CachingClient, sendCommandPromise: Promise): ClientSideCacheEntryPromise { + return new ClientSideCacheEntryPromise(this.ttl, sendCommandPromise); + } + + #cacheHit(): void { + this.#cacheHits++; + } + + #cacheMiss(): void { + this.#cacheMisses++; + } + + override cacheHits(): number { + return this.#cacheHits; + } + + override cacheMisses(): number { + return this.#cacheMisses; + } + + override onError(): void { + this.clear(); + } + + override onClose() { + this.clear(); + } + + /** + * @internal + */ + deleteOldest() { + const it = this.#cacheKeyToEntryMap[Symbol.iterator](); + const n = it.next(); + if (!n.done) { + this.#cacheKeyToEntryMap.delete(n.value[0]); + } + } + + /** + * @internal + */ + entryEntries() { + return this.#cacheKeyToEntryMap.entries(); + } + + /** + * @internal + */ + keySetEntries() { + return this.#keyToCacheKeySetMap.entries(); + } +} + +export abstract class PooledClientSideCacheProvider extends BasicClientSideCache { + #disabled = false; + + abstract updateRedirect(id: number): void; + abstract addClient(client: CachingClientType): void; + abstract removeClient(client: CachingClientType): void; + + disable() { + this.#disabled = true; + } + + enable() { + this.#disabled = false; + } + + override get(cacheKey: string) { + if (this.#disabled) { + return undefined; + } + + return super.get(cacheKey); + } + + override has(cacheKey: string) { + if (this.#disabled) { + return false; + } + + return super.has(cacheKey); + } + + onPoolConnect(factory: () => CachingClientType) {}; + + onPoolClose() { + this.clear(); + }; +} + +// doesn't do anything special in pooling, clears cache on every client disconnect +export class BasicPooledClientSideCache extends PooledClientSideCacheProvider { + + override updateRedirect(id: number): void { + return; + } + + override addClient(client: CachingClientType): void { + return; + } + override removeClient(client: CachingClientType): void { + return; + } + + override onError() { + this.clear(false); + } + + override onClose() { + this.clear(false); + } +} + +class PooledClientSideCacheEntryValue extends ClientSideCacheEntryValue { + #creator: CacheCreator; + + constructor(ttl: number, creator: CacheCreator, value: any) { + super(ttl, value); + + this.#creator = creator; + } + + override validate(): boolean { + let ret = super.validate(); + if (this.#creator) { + ret = ret && this.#creator.client.isReady && this.#creator.client.socketEpoch == this.#creator.epoch + } + + return ret; + } +} + +class PooledClientSideCacheEntryPromise extends ClientSideCacheEntryPromise { + #creator: CacheCreator; + + constructor(ttl: number, creator: CacheCreator, sendCommandPromise: Promise) { + super(ttl, sendCommandPromise); + + this.#creator = creator; + } + + override validate(): boolean { + let ret = super.validate(); + if (this.#creator) { + ret = ret && this.#creator.client.isReady && this.#creator.client.socketEpoch == this.#creator.epoch + } + + return ret; + } +} + +// Doesn't clear cache on client disconnect, validates entries on retrieval +export class PooledNoRedirectClientSideCache extends BasicPooledClientSideCache { + override createValueEntry(client: CachingClient, value: any): ClientSideCacheEntryValue { + const creator = { + epoch: client.socketEpoch, + client: client + }; + + return new PooledClientSideCacheEntryValue(this.ttl, creator, value); + } + + override createPromiseEntry(client: CachingClient, sendCommandPromise: Promise): ClientSideCacheEntryPromise { + const creator = { + epoch: client.socketEpoch, + client: client + }; + + return new PooledClientSideCacheEntryPromise(this.ttl, creator, sendCommandPromise); + } + + // don't clear cache on error here + override onError() {} + + override onClose() {} +} + +// Only clears cache on "management"/"redirect" client disconnect +export class PooledRedirectClientSideCache extends PooledClientSideCacheProvider { + #id?: number; + #clients: Set = new Set(); + #redirectClient?: CachingClientType; + + constructor(config: ClientSideCacheConfig) { + super(config); + this.disable(); + } + + override trackingOn(): string[] { + if (this.#id) { + return ['CLIENT', 'TRACKING', 'ON', 'REDIRECT', this.#id.toString()]; + } else { + return []; + } + } + + override updateRedirect(id: number) { + this.#id = id; + for (const client of this.#clients) { + client.sendCommand(this.trackingOn()).catch(() => {}); + } + } + + override addClient(client: CachingClientType) { + this.#clients.add(client); + } + + override removeClient(client: CachingClientType) { + this.#clients.delete(client); + } + + override onError(): void {}; + + override async onPoolConnect(factory: () => CachingClientType) { + const client = factory(); + this.#redirectClient = client; + + client.on("error", () => { + this.disable(); + this.clear(); + }).on("ready", async () => { + const clientId = await client.withTypeMapping({}).clientId(); + this.updateRedirect(clientId); + this.enable(); + }) + + try { + await client.connect(); + } catch (err) { + throw err; + } + } + + override onClose() {}; + + override onPoolClose() { + super.onPoolClose(); + + if (this.#redirectClient) { + this.#id = undefined; + const client = this.#redirectClient; + this.#redirectClient = undefined; + + return client.close(); + } + } +}