Skip to content

Commit

Permalink
feat(pubsub): Add shared context among Redis instances (#1110)
Browse files Browse the repository at this point in the history
* feat(pubsub): add shared context among redis instances, fix #773

* fix: command transformers

Co-authored-by: Sibelius Seraphini <sibeliusseraphini@gmail.com>
  • Loading branch information
fersilva16 and sibelius authored Jan 22, 2022
1 parent cb17f5a commit 3da06de
Show file tree
Hide file tree
Showing 13 changed files with 363 additions and 128 deletions.
1 change: 1 addition & 0 deletions jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ module.exports = {
testMatch: ['**/test/**/*.js'],
coverageDirectory: 'coverage',
setupFiles: ['<rootDir>/testSetupBabel.js'],
setupFilesAfterEnv: ['<rootDir>/testSetupAfterEnv.js'],
};
18 changes: 17 additions & 1 deletion src/commands/flushall.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,20 @@
import contextMap, { createContext } from '../context';
import { createData } from '../data';
import { createExpires } from '../expires';

export function flushall() {
this.data.clear();
const oldContext = contextMap.get(this.keyData);
const newContext = createContext(oldContext.keyPrefix);

contextMap.set(this.keyData, newContext);

this.expires = createExpires(newContext.expires, newContext.keyPrefix);
this.data = createData(
newContext.data,
this.expires,
{},
newContext.keyPrefix
);

return 'OK';
}
2 changes: 1 addition & 1 deletion src/commands/flushdb.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export function flushdb() {
this.data.clear();
this.flushall();
return 'OK';
}
19 changes: 19 additions & 0 deletions src/context.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { EventEmitter } from 'events';
import { createSharedData } from './data';
import { createSharedExpires } from './expires';

const contextMap = new Map();

export default contextMap;

export function createContext(keyPrefix) {
const expires = createSharedExpires();

return {
channels: new EventEmitter(),
expires,
data: createSharedData(expires),
patternChannels: new EventEmitter(),
keyPrefix,
};
}
144 changes: 80 additions & 64 deletions src/data.js
Original file line number Diff line number Diff line change
@@ -1,79 +1,95 @@
import { assign } from 'lodash';

export default function createData(
expiresInstance,
initial = {},
keyPrefix = ''
) {
export function createSharedData(sharedExpires) {
let raw = {};

function createInstance(prefix, expires) {
return Object.freeze({
clear() {
raw = {};
},
delete(key) {
if (expires.has(key)) {
expires.delete(key);
}
delete raw[`${prefix}${key}`];
},
get(key) {
if (expires.has(key) && expires.isExpired(key)) {
this.delete(key);
}
return Object.freeze({
clear() {
raw = {};
},
delete(key) {
if (sharedExpires.has(key)) {
sharedExpires.delete(key);
}
delete raw[key];
},
get(key) {
if (sharedExpires.has(key) && sharedExpires.isExpired(key)) {
this.delete(key);
}

const value = raw[`${prefix}${key}`];
const value = raw[key];

if (Array.isArray(value)) {
return value.slice();
}
if (Array.isArray(value)) {
return value.slice();
}

if (Buffer.isBuffer(value)) {
return Buffer.from(value);
}
if (Buffer.isBuffer(value)) {
return Buffer.from(value);
}

if (value instanceof Set) {
return new Set(value);
}
if (value instanceof Set) {
return new Set(value);
}

if (value instanceof Map) {
return new Map(value);
}
if (value instanceof Map) {
return new Map(value);
}

if (typeof value === 'object' && value) {
return assign({}, value);
}
if (typeof value === 'object' && value) {
return assign({}, value);
}

return value;
},
has(key) {
if (expires.has(key) && expires.isExpired(key)) {
this.delete(key);
}
return value;
},
has(key) {
if (sharedExpires.has(key) && sharedExpires.isExpired(key)) {
this.delete(key);
}

return {}.hasOwnProperty.call(raw, `${prefix}${key}`);
},
keys() {
return Object.keys(raw);
},
set(key, val) {
let item = val;

if (Array.isArray(val)) {
item = val.slice();
} else if (Buffer.isBuffer(val)) {
item = Buffer.from(val);
} else if (val instanceof Set) {
item = new Set(val);
} else if (val instanceof Map) {
item = new Map(val);
} else if (typeof val === 'object' && val) {
item = assign({}, val);
}

raw[`${prefix}${key}`] = item;
},
return {}.hasOwnProperty.call(raw, key);
},
keys(prefix) {
const keys = Object.keys(raw);

if (!prefix) return keys;

return keys.filter((key) => key.startsWith(prefix));
},
set(key, val) {
let item = val;

if (Array.isArray(val)) {
item = val.slice();
} else if (Buffer.isBuffer(val)) {
item = Buffer.from(val);
} else if (val instanceof Set) {
item = new Set(val);
} else if (val instanceof Map) {
item = new Map(val);
} else if (typeof val === 'object' && val) {
item = assign({}, val);
}

raw[key] = item;
},
});
}

export function createData(
sharedData,
expiresInstance,
initial = {},
keyPrefix = ''
) {
function createInstance(prefix, expires) {
return Object.freeze({
clear: () => sharedData.clear(),
delete: (key) => sharedData.delete(`${prefix}${key}`),
get: (key) => sharedData.get(`${prefix}${key}`),
has: (key) => sharedData.has(`${prefix}${key}`),
keys: () => sharedData.keys(prefix),
set: (key, val) => sharedData.set(`${prefix}${key}`, val),
withKeyPrefix(newKeyPrefix) {
if (newKeyPrefix === prefix) return this;
return createInstance(
Expand Down
44 changes: 28 additions & 16 deletions src/expires.js
Original file line number Diff line number Diff line change
@@ -1,27 +1,39 @@
export default function createExpires(keyPrefix = '') {
export function createSharedExpires() {
const expires = {};

return Object.freeze({
get(key) {
return expires[key];
},
set(key, timestamp) {
expires[key] = +timestamp;
},
has(key) {
return {}.hasOwnProperty.call(expires, key);
},
isExpired(key) {
return expires[key] <= Date.now();
},
delete(key) {
delete expires[key];
},
});
}

export function createExpires(sharedExpires, keyPrefix = '') {
function createInstance(prefix) {
return {
get(key) {
return expires[`${prefix}${key}`];
},
set(key, timestamp) {
expires[`${prefix}${key}`] = +timestamp;
},
has(key) {
return {}.hasOwnProperty.call(expires, `${prefix}${key}`);
},
isExpired(key) {
return expires[`${prefix}${key}`] <= Date.now();
},
delete(key) {
delete expires[`${prefix}${key}`];
},
get: (key) => sharedExpires.get(`${prefix}${key}`),
set: (key, timestamp) => sharedExpires.set(`${prefix}${key}`, timestamp),
has: (key) => sharedExpires.has(`${prefix}${key}`),
isExpired: (key) => sharedExpires.isExpired(`${prefix}${key}`),
delete: (key) => sharedExpires.delete(`${prefix}${key}`),
withKeyPrefix(newPrefix) {
if (newPrefix === prefix) return this;
return createInstance(newPrefix);
},
};
}

return createInstance(keyPrefix);
}
54 changes: 48 additions & 6 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@ import redisCommands from 'redis-commands';
import * as commands from './commands';
import * as commandsStream from './commands-stream';
import createCommand, { Command } from './command';
import createData from './data';
import createExpires from './expires';
import emitConnectEvent from './commands-utils/emitConnectEvent';
import Pipeline from './pipeline';
import promiseContainer from './promise-container';
import parseKeyspaceEvents from './keyspace-notifications';
import contextMap, { createContext } from './context';
import { createExpires } from './expires';
import { createData } from './data';

const defaultOptions = {
data: {},
keyPrefix: '',
lazyConnect: false,
notifyKeyspaceEvents: '', // string pattern as specified in https://redis.io/topics/notifications#configuration e.g. 'gxK'
host: 'localhost',
port: '6379',
};

class RedisMock extends EventEmitter {
Expand All @@ -29,8 +32,7 @@ class RedisMock extends EventEmitter {

constructor(options = {}) {
super();
this.channels = new EventEmitter();
this.patternChannels = new EventEmitter();

this.batch = undefined;
this.connected = false;
this.subscriberMode = false;
Expand All @@ -41,9 +43,19 @@ class RedisMock extends EventEmitter {
// eslint-disable-next-line prefer-object-spread
const optionsWithDefault = Object.assign({}, defaultOptions, options);

this.expires = createExpires(optionsWithDefault.keyPrefix);
this.keyData = `${optionsWithDefault.host}:${optionsWithDefault.port}`;

if (!contextMap.get(this.keyData)) {
const context = createContext(optionsWithDefault.keyPrefix);

contextMap.set(this.keyData, context);
}

const context = contextMap.get(this.keyData);

this.expires = createExpires(context.expires, optionsWithDefault.keyPrefix);
this.data = createData(
context.data,
this.expires,
optionsWithDefault.data,
optionsWithDefault.keyPrefix
Expand All @@ -61,6 +73,36 @@ class RedisMock extends EventEmitter {
}
}

get channels() {
return contextMap.get(this.keyData).channels;
}

set channels(channels) {
const oldContext = contextMap.get(this.keyData);

const newContext = {
...oldContext,
channels,
};

contextMap.set(this.keyData, newContext);
}

get patternChannels() {
return contextMap.get(this.keyData).patternChannels;
}

set patternChannels(patternChannels) {
const oldContext = contextMap.get(this.keyData);

const newContext = {
...oldContext,
patternChannels,
};

contextMap.set(this.keyData, newContext);
}

multi(batch = []) {
this.batch = new Pipeline(this);
// eslint-disable-next-line no-underscore-dangle
Expand Down Expand Up @@ -106,7 +148,7 @@ class RedisMock extends EventEmitter {
}

duplicate() {
return this.createConnectedClient()
return this.createConnectedClient();
}

// eslint-disable-next-line class-methods-use-this
Expand Down
12 changes: 0 additions & 12 deletions test/command.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,6 @@ import _ from 'lodash';
import Redis from 'ioredis';
import command from '../src/command';

// Ensure that we're getting the correct instance of Command when running in test:jest.js, as jest.js isn't designed to test code directly imported private functions like src/command
jest.mock('ioredis', () => {
const { Command } = jest.requireActual('ioredis');
const RedisMock = jest.requireActual('../src/index');

return {
__esModule: true,
Command,
default: RedisMock,
};
});

describe('basic command', () => {
const stub = command((...args) => args, 'testCommandName', {
Command: { transformers: { argument: {}, reply: {} } },
Expand Down
Loading

0 comments on commit 3da06de

Please sign in to comment.