Skip to content

Commit

Permalink
feat(cluster): add NAT support
Browse files Browse the repository at this point in the history
Closes #693
  • Loading branch information
luin committed Dec 8, 2018
1 parent ab63994 commit 49149d0
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 7 deletions.
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ used in the world's biggest online commerce company [Alibaba](http://www.alibaba
0. Support for ES6 types, such as `Map` and `Set`.
0. Support for GEO commands (Redis 3.2 Unstable).
0. Sophisticated error handling strategy.
0. Support for NAT mapping.

# Links
* [API Documentation](API.md)
Expand Down Expand Up @@ -836,6 +837,26 @@ Promise.all(masters.map(function (node) {
});
```

### NAT Mapping
Sometimes the cluster is hosted within a internal network that can only be accessed via a NAT (Network Address Translation) instance. See [Accessing ElastiCache from outside AWS](https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/accessing-elasticache.html) as an example.

You can specify nat mapping rules via `natMap` option:

```javascript
const cluster = new Redis.Cluster([{
host: '203.0.113.73',
port: 30001
}], {
natMap: {
'10.0.1.230:30001': {host: '203.0.113.73', port: 30001},
'10.0.1.231:30001': {host: '203.0.113.73', port: 30002},
'10.0.1.232:30001': {host: '203.0.113.73', port: 30003}
}
})
```

This option is also useful when the cluster is running inside a Docker container.

### Transaction and pipeline in Cluster mode
Almost all features that are supported by `Redis` are also supported by `Redis.Cluster`, e.g. custom commands, transaction and pipeline.
However there are some differences when using transaction and pipeline in Cluster mode:
Expand Down
2 changes: 2 additions & 0 deletions lib/cluster/ClusterOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {NodeRole} from './util'
import {lookup} from 'dns'

export type DNSLookupFunction = (hostname: string, callback: (err: NodeJS.ErrnoException, address: string, family: number) => void) => void
export type NatMap = {[key: string]: {host: string, port: number}}

/**
* Options for Cluster constructor
Expand Down Expand Up @@ -116,6 +117,7 @@ export interface IClusterOptions {
* @default require('dns').lookup
*/
dnsLookup?: DNSLookupFunction
natMap?: NatMap
}

export const DEFAULT_CLUSTER_OPTIONS: IClusterOptions = {
Expand Down
25 changes: 18 additions & 7 deletions lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {EventEmitter} from 'events'
import ClusterAllFailedError from '../errors/ClusterAllFailedError'
import {defaults, noop} from '../utils/lodash'
import ConnectionPool from './ConnectionPool'
import {NodeKey, IRedisOptions, normalizeNodeOptions, NodeRole, getUniqueHostnamesFromOptions} from './util'
import {NodeKey, IRedisOptions, normalizeNodeOptions, NodeRole, getUniqueHostnamesFromOptions, nodeKeyToRedisOptions} from './util'
import ClusterSubscriber from './ClusterSubscriber'
import DelayQueue from './DelayQueue'
import ScanStream from '../ScanStream'
Expand Down Expand Up @@ -416,6 +416,18 @@ class Cluster extends EventEmitter {
}
}

natMapper(nodeKey: NodeKey | IRedisOptions): IRedisOptions {
if (this.options.natMap && typeof this.options.natMap === 'object') {
const key = typeof nodeKey === 'string' ? nodeKey : `${nodeKey.host}:${nodeKey.port}`
const mapped = this.options.natMap[key]
if (mapped) {
debug('NAT mapping %s -> %O', key, mapped)
return mapped
}
}
return typeof nodeKey === 'string' ? nodeKeyToRedisOptions(nodeKey) : nodeKey
}

sendCommand(command, stream, node) {
if (this.status === 'wait') {
this.connect().catch(noop)
Expand Down Expand Up @@ -449,16 +461,15 @@ class Cluster extends EventEmitter {
} else {
_this.slots[slot] = [key]
}
const splitKey = key.split(':')
_this.connectionPool.findOrCreate({host: splitKey[0], port: Number(splitKey[1])})
_this.connectionPool.findOrCreate(_this.natMapper(key))
tryConnection()
_this.refreshSlotsCache()
},
ask: function (slot, key) {
debug('command %s is required to ask %s:%s', command.name, key)
const splitKey = key.split(':')
_this.connectionPool.findOrCreate({host: splitKey[0], port: Number(splitKey[1])})
tryConnection(false, key)
const mapped = _this.natMapper(key)
_this.connectionPool.findOrCreate(mapped)
tryConnection(false, `${mapped.host}:${mapped.port}`)
},
tryagain: partialTry,
clusterDown: partialTry,
Expand Down Expand Up @@ -610,7 +621,7 @@ class Cluster extends EventEmitter {

const keys = []
for (let j = 2; j < items.length; j++) {
items[j] = {host: items[j][0], port: items[j][1]}
items[j] = this.natMapper({host: items[j][0], port: items[j][1]})
items[j].readOnly = j !== 2
nodes.push(items[j])
keys.push(items[j].host + ':' + items[j].port)
Expand Down
11 changes: 11 additions & 0 deletions lib/cluster/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ export function getNodeKey(node: IRedisOptions): NodeKey {
return node.host + ':' + node.port
}

export function nodeKeyToRedisOptions(nodeKey: NodeKey): IRedisOptions {
const portIndex = nodeKey.lastIndexOf(':')
if (portIndex === -1) {
throw new Error(`Invalid node key ${nodeKey}`)
}
return {
host: nodeKey.slice(0, portIndex),
port: Number(nodeKey.slice(portIndex + 1))
}
}

export function normalizeNodeOptions(nodes: Array<string | number | object>): IRedisOptions[] {
return nodes.map((node) => {
const options: any = {}
Expand Down
100 changes: 100 additions & 0 deletions test/functional/cluster/nat.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
const calculateSlot = require('cluster-key-slot')

describe('NAT', () => {
it('works for normal case', (done) => {
const slotTable = [
[0, 1, ['192.168.1.1', 30001]],
[2, 16383, ['192.168.1.2', 30001]]
]

let cluster
new MockServer(30001, null, slotTable)
new MockServer(30002, ([command, arg]) => {
if (command === 'get' && arg === 'foo') {
cluster.disconnect()
done()
}
}, slotTable)

cluster = new Redis.Cluster([{
host: '127.0.0.1',
port: 30001
}], {
natMap: {
'192.168.1.1:30001': {host: '127.0.0.1', port: 30001},
'192.168.1.2:30001': {host: '127.0.0.1', port: 30002}
}
})

cluster.get('foo')
})

it('works for moved', (done) => {
const slotTable = [
[0, 16383, ['192.168.1.1', 30001]]
]

let cluster
new MockServer(30001, ([command, arg]) => {
if (command === 'get' && arg === 'foo') {
return new Error('MOVED ' + calculateSlot('foo') + ' 192.168.1.2:30001');
}
}, slotTable)
new MockServer(30002, ([command, arg]) => {
if (command === 'get' && arg === 'foo') {
cluster.disconnect()
done()
}
}, slotTable)

cluster = new Redis.Cluster([{
host: '127.0.0.1',
port: 30001
}], {
natMap: {
'192.168.1.1:30001': {host: '127.0.0.1', port: 30001},
'192.168.1.2:30001': {host: '127.0.0.1', port: 30002}
}
})

cluster.get('foo')
})

it('works for ask', (done) => {
const slotTable = [
[0, 16383, ['192.168.1.1', 30001]]
]

let cluster
let asked = false
new MockServer(30001, ([command, arg]) => {
if (command === 'get' && arg === 'foo') {
return new Error('ASK ' + calculateSlot('foo') + ' 192.168.1.2:30001');
}
}, slotTable)
new MockServer(30002, ([command, arg]) => {
if (command === 'asking') {
asked = true
}
if (command === 'get' && arg === 'foo') {
if (!asked) {
throw new Error('expected asked to be true')
}
cluster.disconnect()
done()
}
}, slotTable)

cluster = new Redis.Cluster([{
host: '127.0.0.1',
port: 30001
}], {
natMap: {
'192.168.1.1:30001': {host: '127.0.0.1', port: 30001},
'192.168.1.2:30001': {host: '127.0.0.1', port: 30002}
}
})

cluster.get('foo')
})
})

0 comments on commit 49149d0

Please sign in to comment.