From 49149d0fb040216689478374de3078b5b84c3e2a Mon Sep 17 00:00:00 2001 From: luin Date: Sun, 9 Dec 2018 00:52:47 +0800 Subject: [PATCH] feat(cluster): add NAT support Closes #693 --- README.md | 21 +++++++ lib/cluster/ClusterOptions.ts | 2 + lib/cluster/index.ts | 25 ++++++--- lib/cluster/util.ts | 11 ++++ test/functional/cluster/nat.js | 100 +++++++++++++++++++++++++++++++++ 5 files changed, 152 insertions(+), 7 deletions(-) create mode 100644 test/functional/cluster/nat.js diff --git a/README.md b/README.md index b8a057e0..40edf724 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,7 @@ used in the world's biggest online commerce company [Alibaba](http://www.alibaba 0. Support for ES6 types, such as `Map` and `Set`. 0. Support for GEO commands (Redis 3.2 Unstable). 0. Sophisticated error handling strategy. +0. Support for NAT mapping. # Links * [API Documentation](API.md) @@ -836,6 +837,26 @@ Promise.all(masters.map(function (node) { }); ``` +### NAT Mapping +Sometimes the cluster is hosted within a internal network that can only be accessed via a NAT (Network Address Translation) instance. See [Accessing ElastiCache from outside AWS](https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/accessing-elasticache.html) as an example. + +You can specify nat mapping rules via `natMap` option: + +```javascript +const cluster = new Redis.Cluster([{ + host: '203.0.113.73', + port: 30001 +}], { + natMap: { + '10.0.1.230:30001': {host: '203.0.113.73', port: 30001}, + '10.0.1.231:30001': {host: '203.0.113.73', port: 30002}, + '10.0.1.232:30001': {host: '203.0.113.73', port: 30003} + } +}) +``` + +This option is also useful when the cluster is running inside a Docker container. + ### Transaction and pipeline in Cluster mode Almost all features that are supported by `Redis` are also supported by `Redis.Cluster`, e.g. custom commands, transaction and pipeline. However there are some differences when using transaction and pipeline in Cluster mode: diff --git a/lib/cluster/ClusterOptions.ts b/lib/cluster/ClusterOptions.ts index 3fe9c8e6..b3cfb558 100644 --- a/lib/cluster/ClusterOptions.ts +++ b/lib/cluster/ClusterOptions.ts @@ -2,6 +2,7 @@ import {NodeRole} from './util' import {lookup} from 'dns' export type DNSLookupFunction = (hostname: string, callback: (err: NodeJS.ErrnoException, address: string, family: number) => void) => void +export type NatMap = {[key: string]: {host: string, port: number}} /** * Options for Cluster constructor @@ -116,6 +117,7 @@ export interface IClusterOptions { * @default require('dns').lookup */ dnsLookup?: DNSLookupFunction + natMap?: NatMap } export const DEFAULT_CLUSTER_OPTIONS: IClusterOptions = { diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index 3221bd4c..bf403c9a 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -2,7 +2,7 @@ import {EventEmitter} from 'events' import ClusterAllFailedError from '../errors/ClusterAllFailedError' import {defaults, noop} from '../utils/lodash' import ConnectionPool from './ConnectionPool' -import {NodeKey, IRedisOptions, normalizeNodeOptions, NodeRole, getUniqueHostnamesFromOptions} from './util' +import {NodeKey, IRedisOptions, normalizeNodeOptions, NodeRole, getUniqueHostnamesFromOptions, nodeKeyToRedisOptions} from './util' import ClusterSubscriber from './ClusterSubscriber' import DelayQueue from './DelayQueue' import ScanStream from '../ScanStream' @@ -416,6 +416,18 @@ class Cluster extends EventEmitter { } } + natMapper(nodeKey: NodeKey | IRedisOptions): IRedisOptions { + if (this.options.natMap && typeof this.options.natMap === 'object') { + const key = typeof nodeKey === 'string' ? nodeKey : `${nodeKey.host}:${nodeKey.port}` + const mapped = this.options.natMap[key] + if (mapped) { + debug('NAT mapping %s -> %O', key, mapped) + return mapped + } + } + return typeof nodeKey === 'string' ? nodeKeyToRedisOptions(nodeKey) : nodeKey + } + sendCommand(command, stream, node) { if (this.status === 'wait') { this.connect().catch(noop) @@ -449,16 +461,15 @@ class Cluster extends EventEmitter { } else { _this.slots[slot] = [key] } - const splitKey = key.split(':') - _this.connectionPool.findOrCreate({host: splitKey[0], port: Number(splitKey[1])}) + _this.connectionPool.findOrCreate(_this.natMapper(key)) tryConnection() _this.refreshSlotsCache() }, ask: function (slot, key) { debug('command %s is required to ask %s:%s', command.name, key) - const splitKey = key.split(':') - _this.connectionPool.findOrCreate({host: splitKey[0], port: Number(splitKey[1])}) - tryConnection(false, key) + const mapped = _this.natMapper(key) + _this.connectionPool.findOrCreate(mapped) + tryConnection(false, `${mapped.host}:${mapped.port}`) }, tryagain: partialTry, clusterDown: partialTry, @@ -610,7 +621,7 @@ class Cluster extends EventEmitter { const keys = [] for (let j = 2; j < items.length; j++) { - items[j] = {host: items[j][0], port: items[j][1]} + items[j] = this.natMapper({host: items[j][0], port: items[j][1]}) items[j].readOnly = j !== 2 nodes.push(items[j]) keys.push(items[j].host + ':' + items[j].port) diff --git a/lib/cluster/util.ts b/lib/cluster/util.ts index e40a73cb..8c2c943f 100644 --- a/lib/cluster/util.ts +++ b/lib/cluster/util.ts @@ -18,6 +18,17 @@ export function getNodeKey(node: IRedisOptions): NodeKey { return node.host + ':' + node.port } +export function nodeKeyToRedisOptions(nodeKey: NodeKey): IRedisOptions { + const portIndex = nodeKey.lastIndexOf(':') + if (portIndex === -1) { + throw new Error(`Invalid node key ${nodeKey}`) + } + return { + host: nodeKey.slice(0, portIndex), + port: Number(nodeKey.slice(portIndex + 1)) + } +} + export function normalizeNodeOptions(nodes: Array): IRedisOptions[] { return nodes.map((node) => { const options: any = {} diff --git a/test/functional/cluster/nat.js b/test/functional/cluster/nat.js new file mode 100644 index 00000000..c1f7c226 --- /dev/null +++ b/test/functional/cluster/nat.js @@ -0,0 +1,100 @@ +const calculateSlot = require('cluster-key-slot') + +describe('NAT', () => { + it('works for normal case', (done) => { + const slotTable = [ + [0, 1, ['192.168.1.1', 30001]], + [2, 16383, ['192.168.1.2', 30001]] + ] + + let cluster + new MockServer(30001, null, slotTable) + new MockServer(30002, ([command, arg]) => { + if (command === 'get' && arg === 'foo') { + cluster.disconnect() + done() + } + }, slotTable) + + cluster = new Redis.Cluster([{ + host: '127.0.0.1', + port: 30001 + }], { + natMap: { + '192.168.1.1:30001': {host: '127.0.0.1', port: 30001}, + '192.168.1.2:30001': {host: '127.0.0.1', port: 30002} + } + }) + + cluster.get('foo') + }) + + it('works for moved', (done) => { + const slotTable = [ + [0, 16383, ['192.168.1.1', 30001]] + ] + + let cluster + new MockServer(30001, ([command, arg]) => { + if (command === 'get' && arg === 'foo') { + return new Error('MOVED ' + calculateSlot('foo') + ' 192.168.1.2:30001'); + } + }, slotTable) + new MockServer(30002, ([command, arg]) => { + if (command === 'get' && arg === 'foo') { + cluster.disconnect() + done() + } + }, slotTable) + + cluster = new Redis.Cluster([{ + host: '127.0.0.1', + port: 30001 + }], { + natMap: { + '192.168.1.1:30001': {host: '127.0.0.1', port: 30001}, + '192.168.1.2:30001': {host: '127.0.0.1', port: 30002} + } + }) + + cluster.get('foo') + }) + + it('works for ask', (done) => { + const slotTable = [ + [0, 16383, ['192.168.1.1', 30001]] + ] + + let cluster + let asked = false + new MockServer(30001, ([command, arg]) => { + if (command === 'get' && arg === 'foo') { + return new Error('ASK ' + calculateSlot('foo') + ' 192.168.1.2:30001'); + } + }, slotTable) + new MockServer(30002, ([command, arg]) => { + if (command === 'asking') { + asked = true + } + if (command === 'get' && arg === 'foo') { + if (!asked) { + throw new Error('expected asked to be true') + } + cluster.disconnect() + done() + } + }, slotTable) + + cluster = new Redis.Cluster([{ + host: '127.0.0.1', + port: 30001 + }], { + natMap: { + '192.168.1.1:30001': {host: '127.0.0.1', port: 30001}, + '192.168.1.2:30001': {host: '127.0.0.1', port: 30002} + } + }) + + cluster.get('foo') + }) +})