Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
rpc: add rate limit in rpc from dm-master to dm-worker
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei committed May 27, 2019
1 parent f04a93f commit e3e241e
Show file tree
Hide file tree
Showing 12 changed files with 234 additions and 66 deletions.
26 changes: 24 additions & 2 deletions dm/ctl/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ import (
"flag"
"fmt"
"net"
"time"

"github.com/BurntSushi/toml"
"github.com/pingcap/dm/pkg/utils"
"github.com/pingcap/errors"
)

var (
defaultRPCTimeout = "10m"
)

// NewConfig creates a new base config for dmctl.
func NewConfig() *Config {
cfg := &Config{}
Expand All @@ -33,6 +38,7 @@ func NewConfig() *Config {
fs.BoolVar(&cfg.printVersion, "V", false, "prints version and exit")
fs.StringVar(&cfg.ConfigFile, "config", "", "path to config file")
fs.StringVar(&cfg.MasterAddr, "master-addr", "", "master API server addr")
fs.StringVar(&cfg.RPCTimeoutStr, "rpc-timeout", defaultRPCTimeout, "rpc timeout, default is 10m")
fs.StringVar(&cfg.encrypt, "encrypt", "", "encrypt plaintext to ciphertext")

return cfg
Expand All @@ -44,6 +50,9 @@ type Config struct {

MasterAddr string `toml:"master-addr" json:"master-addr"`

RPCTimeoutStr string `toml:"rpc-timeout" json:"rpc-timeout"`
RPCTimeout time.Duration `json:"-"`

ConfigFile string `json:"config-file"`

printVersion bool
Expand Down Expand Up @@ -106,7 +115,11 @@ func (c *Config) Parse(arguments []string) error {
return errors.Annotatef(err, "specify master addr %s", c.MasterAddr)
}

c.adjust()
err = c.adjust()
if err != nil {
return errors.Trace(err)
}

return nil
}

Expand All @@ -117,7 +130,16 @@ func (c *Config) configFromFile(path string) error {
}

// adjust adjusts configs
func (c *Config) adjust() {
func (c *Config) adjust() error {
if c.RPCTimeoutStr == "" {
c.RPCTimeoutStr = defaultRPCTimeout
}
timeout, err := time.ParseDuration(c.RPCTimeoutStr)
if err != nil {
return errors.Trace(err)
}
c.RPCTimeout = timeout
return nil
}

// validate host:port format address
Expand Down
14 changes: 13 additions & 1 deletion dm/ctl/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,16 @@ import (

var (
masterClient pb.MasterClient
globalConfig *Config
)

// InitClient initializes dm-worker client or dm-master client
// InitUtils inits necessary dmctl utils
func InitUtils(cfg *Config) error {
globalConfig = cfg
return errors.Trace(InitClient(cfg.MasterAddr))
}

// InitClient initializes dm-master client
func InitClient(addr string) error {
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(3*time.Second))
if err != nil {
Expand All @@ -44,6 +51,11 @@ func InitClient(addr string) error {
return nil
}

// GlobalConfig returns global dmctl config
func GlobalConfig() *Config {
return globalConfig
}

// MasterClient returns dm-master client
func MasterClient() pb.MasterClient {
return masterClient
Expand Down
2 changes: 1 addition & 1 deletion dm/ctl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type CommandMasterFlags struct {
func Init(cfg *common.Config) error {
// set the log level temporarily
log.SetLevelByString("info")
return errors.Trace(common.InitClient(cfg.MasterAddr))
return errors.Trace(common.InitUtils(cfg))
}

// Start starts running a command
Expand Down
3 changes: 3 additions & 0 deletions dm/ctl/dmctl.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# dmctl Configuration.

# rpc configuration
rpc-timeout = "5s"

master-addr = ":8261"
4 changes: 2 additions & 2 deletions dm/ctl/master/query_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ func queryStatusFunc(cmd *cobra.Command, _ []string) {
return
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cli := common.MasterClient()
ctx, cancel := context.WithTimeout(context.Background(), common.GlobalConfig().RPCTimeout)
defer cancel()
resp, err := cli.QueryStatus(ctx, &pb.QueryStatusListRequest{
Name: taskName,
Workers: workers,
Expand Down
98 changes: 70 additions & 28 deletions dm/master/agent_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,37 @@
package master

import (
"context"
"math"
"sync"

"golang.org/x/time/rate"
)

var (
pool *AgentPool // singleton instance
once sync.Once
agentlimit = 20
pool *AgentPool // singleton instance
once sync.Once
defalutRate float64 = 10
defaultBurst = 40
)

type emitFunc func(args ...interface{})

// AgentPool is a pool to control communication with dm-workers
// It provides rate limit control for agent acquire, including dispatch rate r
// and permits bursts of at most b tokens.
// caller shouldn't to hold agent to avoid deadlock
type AgentPool struct {
limit int
agents chan *Agent
requests chan int
agents chan *Agent
cfg *RateLimitConfig
limiter *rate.Limiter
}

// RateLimitConfig holds rate limit config
type RateLimitConfig struct {
rate float64 // dispatch rate
burst int // max permits bursts
}

// Agent communicate with dm-workers
Expand All @@ -36,42 +53,67 @@ type Agent struct {
}

// NewAgentPool returns a agent pool
func NewAgentPool(limit int) *AgentPool {
agents := make(chan *Agent, limit)
for i := 0; i < limit; i++ {
agents <- &Agent{ID: i + 1}
}
func NewAgentPool(cfg *RateLimitConfig) *AgentPool {
requests := make(chan int, int(math.Ceil(1/cfg.rate))+cfg.burst)
agents := make(chan *Agent, cfg.burst)
limiter := rate.NewLimiter(rate.Limit(cfg.rate), cfg.burst)

return &AgentPool{
limit: limit,
agents: agents,
requests: requests,
agents: agents,
cfg: cfg,
limiter: limiter,
}
}

// Apply applies for a agent
func (pool *AgentPool) Apply() *Agent {
agent := <-pool.agents
return agent
}
func (pool *AgentPool) Apply(ctx context.Context, id int) *Agent {
select {
case <-ctx.Done():
return nil
case pool.requests <- id:
}

// Recycle recycles agent
func (pool *AgentPool) Recycle(agent *Agent) {
pool.agents <- agent
select {
case <-ctx.Done():
return nil
case agent := <-pool.agents:
return agent
}
}

// GetAgentPool a singleton agent pool
func GetAgentPool() *AgentPool {
// InitAgentPool initials agent pool singleton
func InitAgentPool(cfg *RateLimitConfig) *AgentPool {
once.Do(func() {
pool = NewAgentPool(agentlimit)
pool = NewAgentPool(&RateLimitConfig{rate: cfg.rate, burst: cfg.burst})
go pool.dispatch()
})
return pool
}

// Emit apply for a agent to communicates with dm-worker
func Emit(fn func(args ...interface{}), args ...interface{}) {
ap := GetAgentPool()
agent := ap.Apply()
defer ap.Recycle(agent)
func (pool *AgentPool) dispatch() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
select {
case <-ctx.Done():
return
case id := <-pool.requests:
err := pool.limiter.Wait(ctx)
if err == context.Canceled {
return
}
pool.agents <- &Agent{ID: id}
}
}
}

fn(args...)
// Emit applies for an agent to communicates with dm-worker
func Emit(ctx context.Context, id int, fn emitFunc, errFn emitFunc, args ...interface{}) {
agent := pool.Apply(ctx, id)
if agent == nil {
errFn(args...)
} else {
fn(args...)
}
}
59 changes: 42 additions & 17 deletions dm/master/agent_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
package master

import (
"context"
"time"

. "github.com/pingcap/check"
)

Expand All @@ -23,33 +26,39 @@ func (t *testMaster) TestAgentPool(c *C) {
}

func (t *testMaster) testPool(c *C) {
var (
rate = 10
burst = 100
)
// test limit
agentlimit = 2
InitAgentPool(&RateLimitConfig{rate: float64(rate), burst: burst})
pc := make(chan *Agent)

go func() {
ap := GetAgentPool()
pc <- ap.Apply()
pc <- ap.Apply()
pc <- ap.Apply()

for i := 0; i < rate+burst; i++ {
pc <- pool.Apply(context.Background(), i)
}
}()

agent1 := <-pc
c.Assert(agent1.ID, Equals, 1)
agent2 := <-pc
c.Assert(agent2.ID, Equals, 2)
for i := 0; i < burst; i++ {
agent := <-pc
c.Assert(agent.ID, Equals, i)
}
select {
case <-pc:
c.FailNow()
default:
}

GetAgentPool().Recycle(agent1)
agent := <-pc
c.Assert(agent.ID, Equals, 1)
GetAgentPool().Recycle(agent2)
GetAgentPool().Recycle(agent)
for i := 0; i < rate; i++ {
select {
case agent := <-pc:
c.Assert(agent.ID, Equals, i+burst)
case <-time.After(time.Millisecond * 150):
// add 50ms time drift here
c.FailNow()
}
}
}

func (t *testMaster) testEmit(c *C) {
Expand All @@ -60,7 +69,7 @@ func (t *testMaster) testEmit(c *C) {
worker testWorkerType = 1
)

Emit(func(args ...interface{}) {
Emit(context.Background(), 1, func(args ...interface{}) {
if len(args) != 2 {
c.Fatalf("args count is not 2, args %v", args)
}
Expand All @@ -80,6 +89,22 @@ func (t *testMaster) testEmit(c *C) {
if worker1 != worker {
c.Fatalf("args[1] is not expected worker, args[1] %v vs %v", worker1, worker)
}
}, []interface{}{id, worker}...)
}, func(args ...interface{}) {}, []interface{}{id, worker}...)

counter := 0
ctx, cancel := context.WithCancel(context.Background())
cancel()
Emit(ctx, 1, func(args ...interface{}) {
c.FailNow()
}, func(args ...interface{}) {
if len(args) != 1 {
c.Fatalf("args count is not 1, args %v", args)
}
pCounter, ok := args[0].(*int)
if !ok {
c.Fatalf("args[0] is not *int, args %+v", args)
}
*pCounter++
}, []interface{}{&counter}...)
c.Assert(counter, Equals, 1)
}
14 changes: 14 additions & 0 deletions dm/master/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"io/ioutil"
"strings"
"time"

"github.com/BurntSushi/toml"
"github.com/pingcap/dm/pkg/log"
Expand All @@ -32,6 +33,8 @@ import (
// and assign it to SampleConfigFile while we build dm-master
var SampleConfigFile string

var defaultRPCTimeout = "30s"

// NewConfig creates a config for dm-master
func NewConfig() *Config {
cfg := &Config{}
Expand Down Expand Up @@ -73,6 +76,9 @@ type Config struct {
LogFile string `toml:"log-file" json:"log-file"`
LogRotate string `toml:"log-rotate" json:"log-rotate"`

RPCTimeoutStr string `toml:"rpc-timeout" json:"rpc-timeout"`
RPCTimeout time.Duration `json:"-"`

MasterAddr string `toml:"master-addr" json:"master-addr"`

Deploy []*DeployMapper `toml:"deploy" json:"-"`
Expand Down Expand Up @@ -161,6 +167,14 @@ func (c *Config) adjust() error {

c.DeployMap[item.Source] = item.Worker
}
if c.RPCTimeoutStr == "" {
c.RPCTimeoutStr = defaultRPCTimeout
}
timeout, err := time.ParseDuration(c.RPCTimeoutStr)
if err != nil {
return errors.Trace(err)
}
c.RPCTimeout = timeout
return nil
}

Expand Down
Loading

0 comments on commit e3e241e

Please sign in to comment.