From 3da06de737a1d7040ce296a13c0da46ae73834e6 Mon Sep 17 00:00:00 2001 From: Fernando Silva Date: Sat, 22 Jan 2022 01:20:19 -0300 Subject: [PATCH] feat(pubsub): Add shared context among Redis instances (#1110) * feat(pubsub): add shared context among redis instances, fix #773 * fix: command transformers Co-authored-by: Sibelius Seraphini --- jest.config.js | 1 + src/commands/flushall.js | 18 ++++- src/commands/flushdb.js | 2 +- src/context.js | 19 ++++++ src/data.js | 144 ++++++++++++++++++++++----------------- src/expires.js | 44 +++++++----- src/index.js | 54 +++++++++++++-- test/command.js | 12 ---- test/data.js | 91 ++++++++++++++++++++++--- test/expires.js | 83 ++++++++++++++++++---- test/keyprefix.js | 7 +- testSetupAfterEnv.js | 5 ++ testSetupJest.js | 11 ++- 13 files changed, 363 insertions(+), 128 deletions(-) create mode 100644 src/context.js create mode 100644 testSetupAfterEnv.js diff --git a/jest.config.js b/jest.config.js index 005e929cc..e0a862a99 100644 --- a/jest.config.js +++ b/jest.config.js @@ -3,4 +3,5 @@ module.exports = { testMatch: ['**/test/**/*.js'], coverageDirectory: 'coverage', setupFiles: ['/testSetupBabel.js'], + setupFilesAfterEnv: ['/testSetupAfterEnv.js'], }; diff --git a/src/commands/flushall.js b/src/commands/flushall.js index 82048d094..bdf783356 100644 --- a/src/commands/flushall.js +++ b/src/commands/flushall.js @@ -1,4 +1,20 @@ +import contextMap, { createContext } from '../context'; +import { createData } from '../data'; +import { createExpires } from '../expires'; + export function flushall() { - this.data.clear(); + const oldContext = contextMap.get(this.keyData); + const newContext = createContext(oldContext.keyPrefix); + + contextMap.set(this.keyData, newContext); + + this.expires = createExpires(newContext.expires, newContext.keyPrefix); + this.data = createData( + newContext.data, + this.expires, + {}, + newContext.keyPrefix + ); + return 'OK'; } diff --git a/src/commands/flushdb.js b/src/commands/flushdb.js index c8283fa53..deeaabdfb 100644 --- a/src/commands/flushdb.js +++ b/src/commands/flushdb.js @@ -1,4 +1,4 @@ export function flushdb() { - this.data.clear(); + this.flushall(); return 'OK'; } diff --git a/src/context.js b/src/context.js new file mode 100644 index 000000000..56d6c4c2a --- /dev/null +++ b/src/context.js @@ -0,0 +1,19 @@ +import { EventEmitter } from 'events'; +import { createSharedData } from './data'; +import { createSharedExpires } from './expires'; + +const contextMap = new Map(); + +export default contextMap; + +export function createContext(keyPrefix) { + const expires = createSharedExpires(); + + return { + channels: new EventEmitter(), + expires, + data: createSharedData(expires), + patternChannels: new EventEmitter(), + keyPrefix, + }; +} diff --git a/src/data.js b/src/data.js index 917c05b19..dcbeea3cd 100644 --- a/src/data.js +++ b/src/data.js @@ -1,79 +1,95 @@ import { assign } from 'lodash'; -export default function createData( - expiresInstance, - initial = {}, - keyPrefix = '' -) { +export function createSharedData(sharedExpires) { let raw = {}; - function createInstance(prefix, expires) { - return Object.freeze({ - clear() { - raw = {}; - }, - delete(key) { - if (expires.has(key)) { - expires.delete(key); - } - delete raw[`${prefix}${key}`]; - }, - get(key) { - if (expires.has(key) && expires.isExpired(key)) { - this.delete(key); - } + return Object.freeze({ + clear() { + raw = {}; + }, + delete(key) { + if (sharedExpires.has(key)) { + sharedExpires.delete(key); + } + delete raw[key]; + }, + get(key) { + if (sharedExpires.has(key) && sharedExpires.isExpired(key)) { + this.delete(key); + } - const value = raw[`${prefix}${key}`]; + const value = raw[key]; - if (Array.isArray(value)) { - return value.slice(); - } + if (Array.isArray(value)) { + return value.slice(); + } - if (Buffer.isBuffer(value)) { - return Buffer.from(value); - } + if (Buffer.isBuffer(value)) { + return Buffer.from(value); + } - if (value instanceof Set) { - return new Set(value); - } + if (value instanceof Set) { + return new Set(value); + } - if (value instanceof Map) { - return new Map(value); - } + if (value instanceof Map) { + return new Map(value); + } - if (typeof value === 'object' && value) { - return assign({}, value); - } + if (typeof value === 'object' && value) { + return assign({}, value); + } - return value; - }, - has(key) { - if (expires.has(key) && expires.isExpired(key)) { - this.delete(key); - } + return value; + }, + has(key) { + if (sharedExpires.has(key) && sharedExpires.isExpired(key)) { + this.delete(key); + } - return {}.hasOwnProperty.call(raw, `${prefix}${key}`); - }, - keys() { - return Object.keys(raw); - }, - set(key, val) { - let item = val; - - if (Array.isArray(val)) { - item = val.slice(); - } else if (Buffer.isBuffer(val)) { - item = Buffer.from(val); - } else if (val instanceof Set) { - item = new Set(val); - } else if (val instanceof Map) { - item = new Map(val); - } else if (typeof val === 'object' && val) { - item = assign({}, val); - } - - raw[`${prefix}${key}`] = item; - }, + return {}.hasOwnProperty.call(raw, key); + }, + keys(prefix) { + const keys = Object.keys(raw); + + if (!prefix) return keys; + + return keys.filter((key) => key.startsWith(prefix)); + }, + set(key, val) { + let item = val; + + if (Array.isArray(val)) { + item = val.slice(); + } else if (Buffer.isBuffer(val)) { + item = Buffer.from(val); + } else if (val instanceof Set) { + item = new Set(val); + } else if (val instanceof Map) { + item = new Map(val); + } else if (typeof val === 'object' && val) { + item = assign({}, val); + } + + raw[key] = item; + }, + }); +} + +export function createData( + sharedData, + expiresInstance, + initial = {}, + keyPrefix = '' +) { + function createInstance(prefix, expires) { + return Object.freeze({ + clear: () => sharedData.clear(), + delete: (key) => sharedData.delete(`${prefix}${key}`), + get: (key) => sharedData.get(`${prefix}${key}`), + has: (key) => sharedData.has(`${prefix}${key}`), + keys: () => sharedData.keys(prefix), + set: (key, val) => sharedData.set(`${prefix}${key}`, val), withKeyPrefix(newKeyPrefix) { if (newKeyPrefix === prefix) return this; return createInstance( diff --git a/src/expires.js b/src/expires.js index e05b6467a..27241db4b 100644 --- a/src/expires.js +++ b/src/expires.js @@ -1,27 +1,39 @@ -export default function createExpires(keyPrefix = '') { +export function createSharedExpires() { const expires = {}; + + return Object.freeze({ + get(key) { + return expires[key]; + }, + set(key, timestamp) { + expires[key] = +timestamp; + }, + has(key) { + return {}.hasOwnProperty.call(expires, key); + }, + isExpired(key) { + return expires[key] <= Date.now(); + }, + delete(key) { + delete expires[key]; + }, + }); +} + +export function createExpires(sharedExpires, keyPrefix = '') { function createInstance(prefix) { return { - get(key) { - return expires[`${prefix}${key}`]; - }, - set(key, timestamp) { - expires[`${prefix}${key}`] = +timestamp; - }, - has(key) { - return {}.hasOwnProperty.call(expires, `${prefix}${key}`); - }, - isExpired(key) { - return expires[`${prefix}${key}`] <= Date.now(); - }, - delete(key) { - delete expires[`${prefix}${key}`]; - }, + get: (key) => sharedExpires.get(`${prefix}${key}`), + set: (key, timestamp) => sharedExpires.set(`${prefix}${key}`, timestamp), + has: (key) => sharedExpires.has(`${prefix}${key}`), + isExpired: (key) => sharedExpires.isExpired(`${prefix}${key}`), + delete: (key) => sharedExpires.delete(`${prefix}${key}`), withKeyPrefix(newPrefix) { if (newPrefix === prefix) return this; return createInstance(newPrefix); }, }; } + return createInstance(keyPrefix); } diff --git a/src/index.js b/src/index.js index bf12b7ef4..5434e8871 100644 --- a/src/index.js +++ b/src/index.js @@ -4,18 +4,21 @@ import redisCommands from 'redis-commands'; import * as commands from './commands'; import * as commandsStream from './commands-stream'; import createCommand, { Command } from './command'; -import createData from './data'; -import createExpires from './expires'; import emitConnectEvent from './commands-utils/emitConnectEvent'; import Pipeline from './pipeline'; import promiseContainer from './promise-container'; import parseKeyspaceEvents from './keyspace-notifications'; +import contextMap, { createContext } from './context'; +import { createExpires } from './expires'; +import { createData } from './data'; const defaultOptions = { data: {}, keyPrefix: '', lazyConnect: false, notifyKeyspaceEvents: '', // string pattern as specified in https://redis.io/topics/notifications#configuration e.g. 'gxK' + host: 'localhost', + port: '6379', }; class RedisMock extends EventEmitter { @@ -29,8 +32,7 @@ class RedisMock extends EventEmitter { constructor(options = {}) { super(); - this.channels = new EventEmitter(); - this.patternChannels = new EventEmitter(); + this.batch = undefined; this.connected = false; this.subscriberMode = false; @@ -41,9 +43,19 @@ class RedisMock extends EventEmitter { // eslint-disable-next-line prefer-object-spread const optionsWithDefault = Object.assign({}, defaultOptions, options); - this.expires = createExpires(optionsWithDefault.keyPrefix); + this.keyData = `${optionsWithDefault.host}:${optionsWithDefault.port}`; + + if (!contextMap.get(this.keyData)) { + const context = createContext(optionsWithDefault.keyPrefix); + + contextMap.set(this.keyData, context); + } + + const context = contextMap.get(this.keyData); + this.expires = createExpires(context.expires, optionsWithDefault.keyPrefix); this.data = createData( + context.data, this.expires, optionsWithDefault.data, optionsWithDefault.keyPrefix @@ -61,6 +73,36 @@ class RedisMock extends EventEmitter { } } + get channels() { + return contextMap.get(this.keyData).channels; + } + + set channels(channels) { + const oldContext = contextMap.get(this.keyData); + + const newContext = { + ...oldContext, + channels, + }; + + contextMap.set(this.keyData, newContext); + } + + get patternChannels() { + return contextMap.get(this.keyData).patternChannels; + } + + set patternChannels(patternChannels) { + const oldContext = contextMap.get(this.keyData); + + const newContext = { + ...oldContext, + patternChannels, + }; + + contextMap.set(this.keyData, newContext); + } + multi(batch = []) { this.batch = new Pipeline(this); // eslint-disable-next-line no-underscore-dangle @@ -106,7 +148,7 @@ class RedisMock extends EventEmitter { } duplicate() { - return this.createConnectedClient() + return this.createConnectedClient(); } // eslint-disable-next-line class-methods-use-this diff --git a/test/command.js b/test/command.js index aecfa1842..ea0892b95 100644 --- a/test/command.js +++ b/test/command.js @@ -2,18 +2,6 @@ import _ from 'lodash'; import Redis from 'ioredis'; import command from '../src/command'; -// Ensure that we're getting the correct instance of Command when running in test:jest.js, as jest.js isn't designed to test code directly imported private functions like src/command -jest.mock('ioredis', () => { - const { Command } = jest.requireActual('ioredis'); - const RedisMock = jest.requireActual('../src/index'); - - return { - __esModule: true, - Command, - default: RedisMock, - }; -}); - describe('basic command', () => { const stub = command((...args) => args, 'testCommandName', { Command: { transformers: { argument: {}, reply: {} } }, diff --git a/test/data.js b/test/data.js index 05eed746a..ece001f80 100644 --- a/test/data.js +++ b/test/data.js @@ -1,34 +1,92 @@ -import createData from '../src/data'; -import createExpires from '../src/expires'; +import { createData, createSharedData } from '../src/data'; +import { createExpires, createSharedExpires } from '../src/expires'; + +describe('createSharedData', () => { + const sharedExpires = createSharedExpires(); + const sharedData = createSharedData(sharedExpires); + const expires = createExpires(sharedExpires); + const data1 = createData(sharedData, expires, { foo: 'bar1' }, 'data1:'); + const data2 = createData(sharedData, expires, { foo: 'bar2' }, 'data2:'); + + it('should return all keys and filter by data keyprefix', () => { + expect(data1.keys()).toEqual(['data1:foo']); + expect(data2.keys()).toEqual(['data2:foo']); + expect(sharedData.keys()).toEqual(['data1:foo', 'data2:foo']); + }); + + it('should has all data keys', () => { + expect(data1.has('foo')).toBe(true); + expect(data2.has('foo')).toBe(true); + expect(sharedData.has('data1:foo')).toBe(true); + expect(sharedData.has('data2:foo')).toBe(true); + }); + + it('should get all data keys', () => { + expect(data1.get('foo')).toEqual('bar1'); + expect(data2.get('foo')).toEqual('bar2'); + expect(sharedData.get('data1:foo')).toEqual('bar1'); + expect(sharedData.get('data2:foo')).toEqual('bar2'); + }); + + it('should be synced with client datas and vice-versa', () => { + sharedData.set('data1:bar', 'test1'); + sharedData.set('data2:bar', 'test2'); + data1.set('baz', 'foo1'); + data2.set('baz', 'foo2'); + + expect(data1.has('bar')).toBe(true); + expect(data1.get('bar')).toEqual('test1'); + expect(data2.has('bar')).toBe(true); + expect(data2.get('bar')).toEqual('test2'); + + expect(sharedData.has('data1:baz')).toBe(true); + expect(sharedData.get('data1:baz')).toEqual('foo1'); + expect(sharedData.has('data2:baz')).toBe(true); + expect(sharedData.get('data2:baz')).toEqual('foo2'); + }); +}); describe('createData', () => { - const expires = createExpires(); - const data = createData(expires, { foo: 'bar' }); + const sharedExpires = createSharedExpires(); + const sharedData = createSharedData(sharedExpires); + const expires = createExpires(sharedExpires); + const data = createData(sharedData, expires, { foo: 'bar' }); + it('should check expiry on get', () => { expires.set('foo', Date.now()); expect(data.get('foo')).toBeFalsy(); + expect(sharedData.get('foo')).toBeFalsy(); }); }); describe('createData with keyprefix', () => { - const expires = createExpires('test:'); - const data = createData(expires, { foo: 'bar' }, 'test:'); + const sharedExpires = createSharedExpires(); + const sharedData = createSharedData(sharedExpires); + const expires = createExpires(sharedExpires, 'test:'); + const data = createData(sharedData, expires, { foo: 'bar' }, 'test:'); - it('should return array keys of data that with keyprefix', () => { + it('should return array keys of data that with keyprefix', () => { expect(data.keys()).toEqual(['test:foo']); + expect(sharedData.keys()).toEqual(['test:foo']); }); it('should check expiry on get with', () => { expires.set('foo', Date.now()); expect(data.get('foo')).toBeFalsy(); + expect(sharedData.get('foo')).toBeFalsy(); }); }); describe('get', () => { + let sharedData; let data; beforeEach(() => { - data = createData(createExpires(), { + const sharedExpires = createSharedExpires(); + const expires = createExpires(sharedExpires); + + sharedData = createSharedData(sharedExpires); + data = createData(sharedData, expires, { myString: 'qwerty', mySet: new Set([1, 2, 3]), myBuffer: Buffer.from([0x31, 0x32, 0x33]), @@ -39,43 +97,54 @@ describe('get', () => { it('should return string values from the cache', () => { expect(data.get('myString')).toEqual('qwerty'); + expect(sharedData.get('myString')).toEqual('qwerty'); }); it('should return array copies from the cache', () => { const myArray = data.get('myArray'); myArray.push(4); expect(data.get('myArray')).toEqual([1, 2, 3]); + expect(sharedData.get('myArray')).toEqual([1, 2, 3]); }); it('should return object copies in the cache', () => { const myObject = data.get('myObject'); myObject.d = 4; expect(data.get('myObject')).toEqual({ a: 1, b: 2, c: 3 }); + expect(sharedData.get('myObject')).toEqual({ a: 1, b: 2, c: 3 }); }); it('should return set copies from the cache', () => { const mySet = data.get('mySet'); mySet.add(4); expect(data.get('mySet')).toEqual(new Set([1, 2, 3])); + expect(sharedData.get('mySet')).toEqual(new Set([1, 2, 3])); }); it('should return buffer copies from the cache', () => { const myBuffer = data.get('myBuffer'); myBuffer[0] = 0x32; expect(data.get('myBuffer')).toEqual(Buffer.from([0x31, 0x32, 0x33])); + expect(sharedData.get('myBuffer')).toEqual(Buffer.from([0x31, 0x32, 0x33])); }); }); describe('set', () => { + let sharedData; let data; beforeEach(() => { - data = createData(createExpires(), {}); + const sharedExpires = createSharedExpires(); + const expires = createExpires(sharedExpires); + + sharedData = createSharedData(sharedExpires); + data = createData(sharedData, expires, {}); }); it('should set string values in the cache', () => { data.set('myString', 'qwerty'); expect(data.get('myString')).toEqual('qwerty'); + expect(sharedData.get('myString')).toEqual('qwerty'); }); it('should set copies of arrays in the cache', () => { @@ -83,6 +152,7 @@ describe('set', () => { data.set('myArray', myArray); myArray.push(4); expect(data.get('myArray')).toEqual([1, 2, 3]); + expect(sharedData.get('myArray')).toEqual([1, 2, 3]); }); it('should set copies of objects in the cache', () => { @@ -90,6 +160,7 @@ describe('set', () => { data.set('myObject', myObject); myObject.d = 4; expect(data.get('myObject')).toEqual({ a: 1, b: 2, c: 3 }); + expect(sharedData.get('myObject')).toEqual({ a: 1, b: 2, c: 3 }); }); it('should set copies of sets in the cache', () => { @@ -97,6 +168,7 @@ describe('set', () => { data.set('mySet', mySet); mySet.add(4); expect(data.get('mySet')).toEqual(new Set([1, 2, 3])); + expect(sharedData.get('mySet')).toEqual(new Set([1, 2, 3])); }); it('should set copies of buffers in the cache', () => { @@ -104,5 +176,6 @@ describe('set', () => { data.set('myBuffer', myBuffer); myBuffer[0] = 0x32; expect(data.get('myBuffer')).toEqual(Buffer.from([0x31, 0x32, 0x33])); + expect(sharedData.get('myBuffer')).toEqual(Buffer.from([0x31, 0x32, 0x33])); }); }); diff --git a/test/expires.js b/test/expires.js index 02f7189bf..d9a02bccf 100644 --- a/test/expires.js +++ b/test/expires.js @@ -1,23 +1,82 @@ -import createExpires from '../src/expires'; +import { createExpires, createSharedExpires } from '../src/expires'; + +describe('createSharedExpires', () => { + const sharedExpires = createSharedExpires(); + + it('should implement get, set, has and delete lifecycle hooks', () => { + expect(sharedExpires.has('foo')).toBe(false); + + const expireAt = Date.now(); + sharedExpires.set('foo', expireAt); + + expect(sharedExpires.has('foo')).toBe(true); + + expect(sharedExpires.get('foo')).toBe(expireAt); + + expect(sharedExpires.isExpired('foo')).toBe(true); + + sharedExpires.delete('foo'); + expect(sharedExpires.has('foo')).toBe(false); + + sharedExpires.set('foo', Date.now() + 1000); + expect(sharedExpires.isExpired('foo')).toBe(false); + }); +}); + +describe('createSharedExpires', () => { + const sharedExpires = createSharedExpires(); -describe('createExpires', () => { - const expires = createExpires(); it('should implement get, set, has and delete lifecycle hooks', () => { - expect(expires.has('foo')).toBe(false); + const expires = createExpires(sharedExpires); + + expect(expires.has('bar')).toBe(false); + expect(sharedExpires.has('bar')).toBe(false); + + const expireAt = Date.now(); + expires.set('bar', expireAt); + + expect(expires.has('bar')).toBe(true); + expect(sharedExpires.has('bar')).toBe(true); + + expect(expires.get('bar')).toBe(expireAt); + expect(sharedExpires.get('bar')).toBe(expireAt); + + expect(expires.isExpired('bar')).toBe(true); + expect(sharedExpires.isExpired('bar')).toBe(true); + + expires.delete('bar'); + expect(expires.has('bar')).toBe(false); + expect(sharedExpires.has('bar')).toBe(false); + + expires.set('bar', Date.now() + 1000); + expect(expires.isExpired('bar')).toBe(false); + expect(sharedExpires.isExpired('bar')).toBe(false); + }); + + it('should implement get, set, has and delete lifecycle hooks with keyprefix', () => { + const expires = createExpires(sharedExpires, 'foo:'); + + expect(expires.has('bar')).toBe(false); + expect(sharedExpires.has('foo:bar')).toBe(false); const expireAt = Date.now(); - expires.set('foo', expireAt); + expires.set('bar', expireAt); - expect(expires.has('foo')).toBe(true); + expect(expires.has('bar')).toBe(true); + expect(sharedExpires.has('foo:bar')).toBe(true); - expect(expires.get('foo')).toBe(expireAt); + expect(expires.get('bar')).toBe(expireAt); + expect(sharedExpires.get('foo:bar')).toBe(expireAt); - expect(expires.isExpired('foo')).toBe(true); + expect(expires.isExpired('bar')).toBe(true); + expect(sharedExpires.isExpired('foo:bar')).toBe(true); - expires.delete('foo'); - expect(expires.has('foo')).toBe(false); + expires.delete('bar'); + expect(expires.has('bar')).toBe(false); + expect(sharedExpires.has('foo:bar')).toBe(false); - expires.set('foo', Date.now() + 1000); - expect(expires.isExpired('foo')).toBe(false); + expires.set('bar', Date.now() + 1000); + expect(expires.isExpired('bar')).toBe(false); + expect(sharedExpires.isExpired('foo:bar')).toBe(false); }); }); diff --git a/test/keyprefix.js b/test/keyprefix.js index 0cae8cc07..c45ddaab8 100644 --- a/test/keyprefix.js +++ b/test/keyprefix.js @@ -14,12 +14,7 @@ describe('keyprefix', () => { describe('get', () => { it('should return null on keys that do not exist', () => { const redis = new Redis({ keyPrefix: 'test:' }); - return redis.get('foo').then((result) => expect(result).toBe(null)); - }); - - it('should return null on keys that do not exist', () => { - const redis = new Redis({ keyPrefix: 'test:' }); - return redis.get('foo').then((result) => expect(result).toBe(null)); + return redis.get('bar').then((result) => expect(result).toBe(null)); }); it('should return value of key', () => { diff --git a/testSetupAfterEnv.js b/testSetupAfterEnv.js new file mode 100644 index 000000000..c02d2ca6a --- /dev/null +++ b/testSetupAfterEnv.js @@ -0,0 +1,5 @@ +import Redis from 'ioredis'; + +afterEach((done) => { + new Redis().flushall().then(() => done()); +}); diff --git a/testSetupJest.js b/testSetupJest.js index a7a766138..ad6176f43 100644 --- a/testSetupJest.js +++ b/testSetupJest.js @@ -1,4 +1,13 @@ // Redirects 'ioredis' imports to the compiled jest.js file that inlines the Command API from 'ioredis' // eslint-disable-next-line global-require, import/no-unresolved -jest.mock('ioredis', () => require('./jest')); +jest.mock('ioredis', () => { + const { Command } = jest.requireActual('ioredis'); + const Redis = jest.requireActual('./jest'); + + return { + __esModule: true, + Command, + default: Redis, + }; +});