From e3e241e622bf8c333ec6318a0474b59dee1763e3 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 22 May 2019 15:23:08 +0800 Subject: [PATCH] rpc: add rate limit in rpc from dm-master to dm-worker --- dm/ctl/common/config.go | 26 +++++++++- dm/ctl/common/util.go | 14 ++++- dm/ctl/ctl.go | 2 +- dm/ctl/dmctl.toml | 3 ++ dm/ctl/master/query_status.go | 4 +- dm/master/agent_pool.go | 98 +++++++++++++++++++++++++---------- dm/master/agent_pool_test.go | 59 +++++++++++++++------ dm/master/config.go | 14 +++++ dm/master/dm-master.toml | 3 ++ dm/master/server.go | 68 +++++++++++++++++++----- go.mod | 4 +- go.sum | 5 +- 12 files changed, 234 insertions(+), 66 deletions(-) diff --git a/dm/ctl/common/config.go b/dm/ctl/common/config.go index dd1d53b4c8..9bb6d7a9a2 100644 --- a/dm/ctl/common/config.go +++ b/dm/ctl/common/config.go @@ -18,12 +18,17 @@ import ( "flag" "fmt" "net" + "time" "github.com/BurntSushi/toml" "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/errors" ) +var ( + defaultRPCTimeout = "10m" +) + // NewConfig creates a new base config for dmctl. func NewConfig() *Config { cfg := &Config{} @@ -33,6 +38,7 @@ func NewConfig() *Config { fs.BoolVar(&cfg.printVersion, "V", false, "prints version and exit") fs.StringVar(&cfg.ConfigFile, "config", "", "path to config file") fs.StringVar(&cfg.MasterAddr, "master-addr", "", "master API server addr") + fs.StringVar(&cfg.RPCTimeoutStr, "rpc-timeout", defaultRPCTimeout, "rpc timeout, default is 10m") fs.StringVar(&cfg.encrypt, "encrypt", "", "encrypt plaintext to ciphertext") return cfg @@ -44,6 +50,9 @@ type Config struct { MasterAddr string `toml:"master-addr" json:"master-addr"` + RPCTimeoutStr string `toml:"rpc-timeout" json:"rpc-timeout"` + RPCTimeout time.Duration `json:"-"` + ConfigFile string `json:"config-file"` printVersion bool @@ -106,7 +115,11 @@ func (c *Config) Parse(arguments []string) error { return errors.Annotatef(err, "specify master addr %s", c.MasterAddr) } - c.adjust() + err = c.adjust() + if err != nil { + return errors.Trace(err) + } + return nil } @@ -117,7 +130,16 @@ func (c *Config) configFromFile(path string) error { } // adjust adjusts configs -func (c *Config) adjust() { +func (c *Config) adjust() error { + if c.RPCTimeoutStr == "" { + c.RPCTimeoutStr = defaultRPCTimeout + } + timeout, err := time.ParseDuration(c.RPCTimeoutStr) + if err != nil { + return errors.Trace(err) + } + c.RPCTimeout = timeout + return nil } // validate host:port format address diff --git a/dm/ctl/common/util.go b/dm/ctl/common/util.go index e0ed6f1aa1..cad15734ac 100644 --- a/dm/ctl/common/util.go +++ b/dm/ctl/common/util.go @@ -32,9 +32,16 @@ import ( var ( masterClient pb.MasterClient + globalConfig *Config ) -// InitClient initializes dm-worker client or dm-master client +// InitUtils inits necessary dmctl utils +func InitUtils(cfg *Config) error { + globalConfig = cfg + return errors.Trace(InitClient(cfg.MasterAddr)) +} + +// InitClient initializes dm-master client func InitClient(addr string) error { conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(3*time.Second)) if err != nil { @@ -44,6 +51,11 @@ func InitClient(addr string) error { return nil } +// GlobalConfig returns global dmctl config +func GlobalConfig() *Config { + return globalConfig +} + // MasterClient returns dm-master client func MasterClient() pb.MasterClient { return masterClient diff --git a/dm/ctl/ctl.go b/dm/ctl/ctl.go index 210f822026..a922c32548 100644 --- a/dm/ctl/ctl.go +++ b/dm/ctl/ctl.go @@ -36,7 +36,7 @@ type CommandMasterFlags struct { func Init(cfg *common.Config) error { // set the log level temporarily log.SetLevelByString("info") - return errors.Trace(common.InitClient(cfg.MasterAddr)) + return errors.Trace(common.InitUtils(cfg)) } // Start starts running a command diff --git a/dm/ctl/dmctl.toml b/dm/ctl/dmctl.toml index 511b4e64ae..69f3450de1 100644 --- a/dm/ctl/dmctl.toml +++ b/dm/ctl/dmctl.toml @@ -1,3 +1,6 @@ # dmctl Configuration. +# rpc configuration +rpc-timeout = "5s" + master-addr = ":8261" diff --git a/dm/ctl/master/query_status.go b/dm/ctl/master/query_status.go index cdfdcd49d0..db4a7a9f85 100644 --- a/dm/ctl/master/query_status.go +++ b/dm/ctl/master/query_status.go @@ -47,9 +47,9 @@ func queryStatusFunc(cmd *cobra.Command, _ []string) { return } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() cli := common.MasterClient() + ctx, cancel := context.WithTimeout(context.Background(), common.GlobalConfig().RPCTimeout) + defer cancel() resp, err := cli.QueryStatus(ctx, &pb.QueryStatusListRequest{ Name: taskName, Workers: workers, diff --git a/dm/master/agent_pool.go b/dm/master/agent_pool.go index 1dd495d798..6089a0baa9 100644 --- a/dm/master/agent_pool.go +++ b/dm/master/agent_pool.go @@ -14,20 +14,37 @@ package master import ( + "context" + "math" "sync" + + "golang.org/x/time/rate" ) var ( - pool *AgentPool // singleton instance - once sync.Once - agentlimit = 20 + pool *AgentPool // singleton instance + once sync.Once + defalutRate float64 = 10 + defaultBurst = 40 ) +type emitFunc func(args ...interface{}) + // AgentPool is a pool to control communication with dm-workers +// It provides rate limit control for agent acquire, including dispatch rate r +// and permits bursts of at most b tokens. // caller shouldn't to hold agent to avoid deadlock type AgentPool struct { - limit int - agents chan *Agent + requests chan int + agents chan *Agent + cfg *RateLimitConfig + limiter *rate.Limiter +} + +// RateLimitConfig holds rate limit config +type RateLimitConfig struct { + rate float64 // dispatch rate + burst int // max permits bursts } // Agent communicate with dm-workers @@ -36,42 +53,67 @@ type Agent struct { } // NewAgentPool returns a agent pool -func NewAgentPool(limit int) *AgentPool { - agents := make(chan *Agent, limit) - for i := 0; i < limit; i++ { - agents <- &Agent{ID: i + 1} - } +func NewAgentPool(cfg *RateLimitConfig) *AgentPool { + requests := make(chan int, int(math.Ceil(1/cfg.rate))+cfg.burst) + agents := make(chan *Agent, cfg.burst) + limiter := rate.NewLimiter(rate.Limit(cfg.rate), cfg.burst) return &AgentPool{ - limit: limit, - agents: agents, + requests: requests, + agents: agents, + cfg: cfg, + limiter: limiter, } } // Apply applies for a agent -func (pool *AgentPool) Apply() *Agent { - agent := <-pool.agents - return agent -} +func (pool *AgentPool) Apply(ctx context.Context, id int) *Agent { + select { + case <-ctx.Done(): + return nil + case pool.requests <- id: + } -// Recycle recycles agent -func (pool *AgentPool) Recycle(agent *Agent) { - pool.agents <- agent + select { + case <-ctx.Done(): + return nil + case agent := <-pool.agents: + return agent + } } -// GetAgentPool a singleton agent pool -func GetAgentPool() *AgentPool { +// InitAgentPool initials agent pool singleton +func InitAgentPool(cfg *RateLimitConfig) *AgentPool { once.Do(func() { - pool = NewAgentPool(agentlimit) + pool = NewAgentPool(&RateLimitConfig{rate: cfg.rate, burst: cfg.burst}) + go pool.dispatch() }) return pool } -// Emit apply for a agent to communicates with dm-worker -func Emit(fn func(args ...interface{}), args ...interface{}) { - ap := GetAgentPool() - agent := ap.Apply() - defer ap.Recycle(agent) +func (pool *AgentPool) dispatch() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for { + select { + case <-ctx.Done(): + return + case id := <-pool.requests: + err := pool.limiter.Wait(ctx) + if err == context.Canceled { + return + } + pool.agents <- &Agent{ID: id} + } + } +} - fn(args...) +// Emit applies for an agent to communicates with dm-worker +func Emit(ctx context.Context, id int, fn emitFunc, errFn emitFunc, args ...interface{}) { + agent := pool.Apply(ctx, id) + if agent == nil { + errFn(args...) + } else { + fn(args...) + } } diff --git a/dm/master/agent_pool_test.go b/dm/master/agent_pool_test.go index ea4a115b48..c32fa058d1 100644 --- a/dm/master/agent_pool_test.go +++ b/dm/master/agent_pool_test.go @@ -14,6 +14,9 @@ package master import ( + "context" + "time" + . "github.com/pingcap/check" ) @@ -23,33 +26,39 @@ func (t *testMaster) TestAgentPool(c *C) { } func (t *testMaster) testPool(c *C) { + var ( + rate = 10 + burst = 100 + ) // test limit - agentlimit = 2 + InitAgentPool(&RateLimitConfig{rate: float64(rate), burst: burst}) pc := make(chan *Agent) go func() { - ap := GetAgentPool() - pc <- ap.Apply() - pc <- ap.Apply() - pc <- ap.Apply() - + for i := 0; i < rate+burst; i++ { + pc <- pool.Apply(context.Background(), i) + } }() - agent1 := <-pc - c.Assert(agent1.ID, Equals, 1) - agent2 := <-pc - c.Assert(agent2.ID, Equals, 2) + for i := 0; i < burst; i++ { + agent := <-pc + c.Assert(agent.ID, Equals, i) + } select { case <-pc: c.FailNow() default: } - GetAgentPool().Recycle(agent1) - agent := <-pc - c.Assert(agent.ID, Equals, 1) - GetAgentPool().Recycle(agent2) - GetAgentPool().Recycle(agent) + for i := 0; i < rate; i++ { + select { + case agent := <-pc: + c.Assert(agent.ID, Equals, i+burst) + case <-time.After(time.Millisecond * 150): + // add 50ms time drift here + c.FailNow() + } + } } func (t *testMaster) testEmit(c *C) { @@ -60,7 +69,7 @@ func (t *testMaster) testEmit(c *C) { worker testWorkerType = 1 ) - Emit(func(args ...interface{}) { + Emit(context.Background(), 1, func(args ...interface{}) { if len(args) != 2 { c.Fatalf("args count is not 2, args %v", args) } @@ -80,6 +89,22 @@ func (t *testMaster) testEmit(c *C) { if worker1 != worker { c.Fatalf("args[1] is not expected worker, args[1] %v vs %v", worker1, worker) } - }, []interface{}{id, worker}...) + }, func(args ...interface{}) {}, []interface{}{id, worker}...) + counter := 0 + ctx, cancel := context.WithCancel(context.Background()) + cancel() + Emit(ctx, 1, func(args ...interface{}) { + c.FailNow() + }, func(args ...interface{}) { + if len(args) != 1 { + c.Fatalf("args count is not 1, args %v", args) + } + pCounter, ok := args[0].(*int) + if !ok { + c.Fatalf("args[0] is not *int, args %+v", args) + } + *pCounter++ + }, []interface{}{&counter}...) + c.Assert(counter, Equals, 1) } diff --git a/dm/master/config.go b/dm/master/config.go index 45f79eba76..325c91b8c3 100644 --- a/dm/master/config.go +++ b/dm/master/config.go @@ -20,6 +20,7 @@ import ( "fmt" "io/ioutil" "strings" + "time" "github.com/BurntSushi/toml" "github.com/pingcap/dm/pkg/log" @@ -32,6 +33,8 @@ import ( // and assign it to SampleConfigFile while we build dm-master var SampleConfigFile string +var defaultRPCTimeout = "30s" + // NewConfig creates a config for dm-master func NewConfig() *Config { cfg := &Config{} @@ -73,6 +76,9 @@ type Config struct { LogFile string `toml:"log-file" json:"log-file"` LogRotate string `toml:"log-rotate" json:"log-rotate"` + RPCTimeoutStr string `toml:"rpc-timeout" json:"rpc-timeout"` + RPCTimeout time.Duration `json:"-"` + MasterAddr string `toml:"master-addr" json:"master-addr"` Deploy []*DeployMapper `toml:"deploy" json:"-"` @@ -161,6 +167,14 @@ func (c *Config) adjust() error { c.DeployMap[item.Source] = item.Worker } + if c.RPCTimeoutStr == "" { + c.RPCTimeoutStr = defaultRPCTimeout + } + timeout, err := time.ParseDuration(c.RPCTimeoutStr) + if err != nil { + return errors.Trace(err) + } + c.RPCTimeout = timeout return nil } diff --git a/dm/master/dm-master.toml b/dm/master/dm-master.toml index 4937ecdfc0..74c9a859c0 100644 --- a/dm/master/dm-master.toml +++ b/dm/master/dm-master.toml @@ -1,5 +1,8 @@ # Master Configuration. +# rpc configuration +rpc-timeout = "30s" + #log configuration log-level = "info" log-file = "dm-master.log" diff --git a/dm/master/server.go b/dm/master/server.go index 099c8605aa..94477228f0 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -80,6 +80,9 @@ func NewServer(cfg *Config) *Server { sqlOperatorHolder: operator.NewHolder(), idGen: tracing.NewIDGen(), } + + InitAgentPool(&RateLimitConfig{rate: defalutRate, burst: defaultBurst}) + return &server } @@ -220,23 +223,49 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S } } + handleErr := func(err error) bool { + log.Error(err) + workerRespCh <- &pb.CommonWorkerResponse{ + Result: false, + Msg: err.Error(), + } + return false + } + + argsExtractor := func(args ...interface{}) (*config.SubTaskConfig, bool) { + if len(args) != 1 { + return nil, handleErr(errors.Errorf("fail to start task, miss argument %v", args)) + } + cfg, ok := args[0].(*config.SubTaskConfig) + if !ok { + return nil, handleErr(errors.Errorf("fail to start task, can't get task config, arguments %v", args)) + } + return cfg, true + } + validWorkerCh := make(chan string, len(stCfgs)) var wg sync.WaitGroup for _, stCfg := range stCfgs { wg.Add(1) - go func(stCfg *config.SubTaskConfig) { + go Emit(ctx, 0, func(args ...interface{}) { defer wg.Done() - worker, ok1 := s.cfg.DeployMap[stCfg.SourceID] + + stCfg2, ok := argsExtractor(args...) + if !ok { + return + } + + worker, ok1 := s.cfg.DeployMap[stCfg2.SourceID] cli, ok2 := s.workerClients[worker] if !ok1 || !ok2 { workerRespCh <- &pb.CommonWorkerResponse{ Result: false, - Msg: fmt.Sprintf("%s relevant worker not found", stCfg.SourceID), + Msg: fmt.Sprintf("%s relevant worker not found", stCfg2.SourceID), } return } validWorkerCh <- worker - stCfgToml, err := stCfg.Toml() // convert to TOML format + stCfg2Toml, err := stCfg2.Toml() // convert to TOML format if err != nil { workerRespCh <- &pb.CommonWorkerResponse{ Result: false, @@ -245,11 +274,23 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S } return } - workerResp, err := cli.StartSubTask(ctx, &pb.StartSubTaskRequest{Task: stCfgToml}) - workerResp = s.handleOperationResult(ctx, cli, stCfg.Name, err, workerResp) + workerResp, err := cli.StartSubTask(ctx, &pb.StartSubTaskRequest{Task: stCfg2Toml}) + workerResp = s.handleOperationResult(ctx, cli, stCfg2.Name, err, workerResp) workerResp.Meta.Worker = worker workerRespCh <- workerResp.Meta - }(stCfg) + + }, func(args ...interface{}) { + defer wg.Done() + + stCfg2, ok := argsExtractor(args...) + if !ok { + return + } + workerRespCh <- &pb.CommonWorkerResponse{ + Result: false, + Msg: fmt.Sprintf("fail to get emit opporunity for source %s", stCfg2.SourceID), + } + }, stCfg) } wg.Wait() @@ -1051,7 +1092,9 @@ func (s *Server) getStatusFromWorkers(ctx context.Context, workers []string, tas go func(worker string) { defer wg.Done() cli := s.workerClients[worker] - workerStatus, err := cli.QueryStatus(ctx, workerReq) + cctx, cancel := context.WithTimeout(ctx, s.cfg.RPCTimeout) + defer cancel() + workerStatus, err := cli.QueryStatus(cctx, workerReq) if err != nil { workerStatus = &pb.QueryStatusResponse{ Result: false, @@ -1602,15 +1645,13 @@ func (s *Server) allWorkerConfigs(ctx context.Context) (map[string]config.DBConf err error ) handErr := func(err2 error) { - if err2 != nil { - log.Error(err2) - } + log.Error(err2) errCh <- errors.Trace(err2) } for id, worker := range s.workerClients { wg.Add(1) - go Emit(func(args ...interface{}) { + go Emit(ctx, 0, func(args ...interface{}) { defer wg.Done() if len(args) != 2 { handErr(errors.Errorf("fail to call emit to fetch worker config, miss some arguments %v", args)) @@ -1661,6 +1702,9 @@ func (s *Server) allWorkerConfigs(ctx context.Context) (map[string]config.DBConf workerCfgs[resp.SourceID] = *dbCfg workerMutex.Unlock() + }, func(args ...interface{}) { + defer wg.Done() + handErr(errors.Errorf("fail to get emit opporunity for worker %v", args)) }, []interface{}{id, worker}...) } diff --git a/go.mod b/go.mod index 7da2a2af70..4f8fe02567 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,9 @@ require ( github.com/soheilhy/cmux v0.1.4 github.com/spf13/cobra v0.0.4 github.com/syndtr/goleveldb v1.0.0 - golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5 + golang.org/x/crypto v0.0.0-20190103213133-ff983b9c42bc // indirect + golang.org/x/sys v0.0.0-20190422165155-953cdadca894 + golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 google.golang.org/grpc v1.17.0 gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/yaml.v2 v2.2.2 diff --git a/go.sum b/go.sum index 55b502982b..d219687ea7 100644 --- a/go.sum +++ b/go.sum @@ -323,6 +323,8 @@ golang.org/x/crypto v0.0.0-20180608092829-8ac0e0d97ce4/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190103213133-ff983b9c42bc h1:F5tKCVGp+MUAHhKp5MZtGqAlGX3+oCsiL1Q629FL90M= +golang.org/x/crypto v0.0.0-20190103213133-ff983b9c42bc/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -344,9 +346,8 @@ golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5 h1:sM3evRHxE/1RuMe1FYAL3j7C7fUfIjkbE+NiDAYUF8U= -golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=