This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

rpc: add rate limit and rpc client manage in dm-master #157

Merged on Jun 21, 2019 (31 commits)
Changes from 13 commits
Commits (31)
f4d1a3d
rpc: add rate limit in rpc from dm-master to dm-worker
amyangfei May 22, 2019
2a44dbc
add rpc connection manage
amyangfei May 29, 2019
5b4d354
fix make check
amyangfei May 29, 2019
6507f43
apply rpc conn management to all dm-worker conn
amyangfei May 29, 2019
2baedc5
fix worker client extractor in allWorkerConfigs
amyangfei May 29, 2019
b731a87
Merge branch 'master' into rpc-process-tuning
amyangfei May 29, 2019
6ab4d6c
Merge branch 'master' into rpc-process-tuning
amyangfei May 30, 2019
25f9b59
fix format error raised by merge conflict resolve
amyangfei May 30, 2019
0823299
check whether rpc client closed before real call
amyangfei May 30, 2019
5379a34
Merge branch 'master' into rpc-process-tuning
amyangfei May 30, 2019
a34497c
fix dm-master unit test
amyangfei May 30, 2019
f187d0d
Merge branch 'master' into rpc-process-tuning
amyangfei May 31, 2019
4028455
Merge branch 'master' into rpc-process-tuning
IANTHEREAL Jun 4, 2019
bd0ef43
address comment, change AgentPool to common struct, not singleton
amyangfei Jun 4, 2019
ba40a6f
address comment
amyangfei Jun 4, 2019
437bd6e
address comment
amyangfei Jun 5, 2019
fc3b80c
address comment
amyangfei Jun 5, 2019
6d3bc26
address comment
amyangfei Jun 5, 2019
f940c59
increase wait time to avoid ci failure
amyangfei Jun 5, 2019
caf1e10
revert temp update for passing ut
amyangfei Jun 6, 2019
1e7d535
Merge branch 'master' into rpc-process-tuning
amyangfei Jun 10, 2019
29c9853
Update dm/master/agent_pool.go
amyangfei Jun 11, 2019
e067772
address comment
amyangfei Jun 11, 2019
146e7b9
Merge branch 'master' into rpc-process-tuning
amyangfei Jun 16, 2019
aab1ddc
address comment
amyangfei Jun 19, 2019
6f6375b
address comment and run go mod tidy
amyangfei Jun 20, 2019
97b1420
address comment
amyangfei Jun 20, 2019
6c0d022
address comment
amyangfei Jun 20, 2019
c70d62a
Merge branch 'master' into rpc-process-tuning
amyangfei Jun 20, 2019
0e1d64d
address comment
amyangfei Jun 20, 2019
789988e
address comment
amyangfei Jun 21, 2019
26 changes: 24 additions & 2 deletions dm/ctl/common/config.go
@@ -18,12 +18,17 @@ import (
"flag"
"fmt"
"net"
"time"

"github.com/BurntSushi/toml"
"github.com/pingcap/dm/pkg/utils"
"github.com/pingcap/errors"
)

var (
defaultRPCTimeout = "10m"
)

// NewConfig creates a new base config for dmctl.
func NewConfig() *Config {
cfg := &Config{}
@@ -33,6 +38,7 @@ func NewConfig() *Config {
fs.BoolVar(&cfg.printVersion, "V", false, "prints version and exit")
fs.StringVar(&cfg.ConfigFile, "config", "", "path to config file")
fs.StringVar(&cfg.MasterAddr, "master-addr", "", "master API server addr")
fs.StringVar(&cfg.RPCTimeoutStr, "rpc-timeout", defaultRPCTimeout, "rpc timeout, default is 10m")
Contributor comment: Since defaultRPCTimeout, maybe we should use it in the description string to make sure it's always in sync with the variable.

Contributor comment: We may use fs.DurationVar with &cfg.RPCTimeout here. (A sketch of both suggestions follows this file's diff.)

fs.StringVar(&cfg.encrypt, "encrypt", "", "encrypt plaintext to ciphertext")

return cfg
@@ -44,6 +50,9 @@ type Config struct {

MasterAddr string `toml:"master-addr" json:"master-addr"`

RPCTimeoutStr string `toml:"rpc-timeout" json:"rpc-timeout"`
RPCTimeout time.Duration `json:"-"`

ConfigFile string `json:"config-file"`

printVersion bool
@@ -106,7 +115,11 @@ func (c *Config) Parse(arguments []string) error {
return errors.Annotatef(err, "specify master addr %s", c.MasterAddr)
}

c.adjust()
err = c.adjust()
if err != nil {
return errors.Trace(err)
}

return nil
}

@@ -117,7 +130,16 @@ func (c *Config) configFromFile(path string) error {
}

// adjust adjusts configs
func (c *Config) adjust() {
func (c *Config) adjust() error {
if c.RPCTimeoutStr == "" {
c.RPCTimeoutStr = defaultRPCTimeout
}
timeout, err := time.ParseDuration(c.RPCTimeoutStr)
if err != nil {
return errors.Trace(err)
}
c.RPCTimeout = timeout
return nil
}

// validate host:port format address
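
A minimal sketch of what the two review comments above suggest (hypothetical code, not part of this diff): declare the default once as a time.Duration, build the usage string from it with fmt.Sprintf so the description cannot drift, and let fs.DurationVar write straight into cfg.RPCTimeout so adjust() needs no string parsing. Only Config.RPCTimeout and the flag names come from the PR; everything else here is assumed.

package common

import (
	"flag"
	"fmt"
	"time"
)

// newFlagSet is a hypothetical illustration of the reviewers' suggestions.
func newFlagSet(cfg *Config) *flag.FlagSet {
	// assumed duration form of the "10m" default used elsewhere in this file
	const defaultTimeout = 10 * time.Minute

	fs := flag.NewFlagSet("dmctl", flag.ContinueOnError)
	fs.StringVar(&cfg.MasterAddr, "master-addr", "", "master API server addr")
	fs.DurationVar(&cfg.RPCTimeout, "rpc-timeout", defaultTimeout,
		fmt.Sprintf("rpc timeout, default is %s", defaultTimeout))
	return fs
}
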
14 changes: 13 additions & 1 deletion dm/ctl/common/util.go
@@ -32,9 +32,16 @@ import (

var (
masterClient pb.MasterClient
globalConfig *Config
)

// InitClient initializes dm-worker client or dm-master client
// InitUtils inits necessary dmctl utils
func InitUtils(cfg *Config) error {
globalConfig = cfg
return errors.Trace(InitClient(cfg.MasterAddr))
}

// InitClient initializes dm-master client
func InitClient(addr string) error {
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(3*time.Second))
if err != nil {
@@ -44,6 +51,11 @@ func InitClient(addr string) error {
return nil
}

// GlobalConfig returns global dmctl config
func GlobalConfig() *Config {
return globalConfig
}

// MasterClient returns dm-master client
func MasterClient() pb.MasterClient {
return masterClient
2 changes: 1 addition & 1 deletion dm/ctl/ctl.go
@@ -36,7 +36,7 @@ type CommandMasterFlags struct {
func Init(cfg *common.Config) error {
// set the log level temporarily
log.SetLevelByString("info")
return errors.Trace(common.InitClient(cfg.MasterAddr))
return errors.Trace(common.InitUtils(cfg))
}

// Start starts running a command
3 changes: 3 additions & 0 deletions dm/ctl/dmctl.toml
@@ -1,3 +1,6 @@
# dmctl Configuration.

# rpc configuration
rpc-timeout = "5s"

master-addr = ":8261"
4 changes: 2 additions & 2 deletions dm/ctl/master/query_status.go
@@ -47,9 +47,9 @@ func queryStatusFunc(cmd *cobra.Command, _ []string) {
return
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cli := common.MasterClient()
ctx, cancel := context.WithTimeout(context.Background(), common.GlobalConfig().RPCTimeout)
defer cancel()
resp, err := cli.QueryStatus(ctx, &pb.QueryStatusListRequest{
Name: taskName,
Workers: workers,
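
The query_status change above shows the intended pattern: each dmctl command builds a per-call context from the rpc-timeout that InitUtils stored in the global config, instead of an unbounded context. A hedged sketch of a small helper capturing that pattern (the helper itself is not part of this PR):

package master

import (
	"context"

	"github.com/pingcap/dm/dm/ctl/common"
)

// withRPCTimeout is a hypothetical wrapper: it derives a per-call context
// from the configured rpc-timeout and cancels it once the RPC returns.
func withRPCTimeout(fn func(ctx context.Context) error) error {
	ctx, cancel := context.WithTimeout(context.Background(), common.GlobalConfig().RPCTimeout)
	defer cancel()
	return fn(ctx)
}

queryStatusFunc above could then run its cli.QueryStatus call inside such a wrapper, and other commands could reuse it unchanged.
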
100 changes: 72 additions & 28 deletions dm/master/agent_pool.go
@@ -14,20 +14,38 @@
package master

import (
"context"
"math"
"sync"

"golang.org/x/time/rate"
)

var (
pool *AgentPool // singleton instance
once sync.Once
agentlimit = 20
pool *AgentPool // singleton instance
once sync.Once
defalutRate float64 = 10
defaultBurst = 40
errorNoEmitToken = "fail to get emit opporunity for %s"
)

type emitFunc func(args ...interface{})

// AgentPool is a pool to control communication with dm-workers.
// It provides rate limit control for agent acquisition: dispatch rate r
// and bursts of at most b tokens.
// Callers shouldn't hold agents, to avoid deadlock.
type AgentPool struct {
limit int
agents chan *Agent
requests chan int
agents chan *Agent
cfg *RateLimitConfig
limiter *rate.Limiter
}

// RateLimitConfig holds rate limit config
type RateLimitConfig struct {
rate float64 // dispatch rate
burst int // max permits bursts
}

// Agent communicate with dm-workers
@@ -36,42 +54,68 @@ type Agent struct {
}

// NewAgentPool returns an agent pool
func NewAgentPool(limit int) *AgentPool {
agents := make(chan *Agent, limit)
for i := 0; i < limit; i++ {
agents <- &Agent{ID: i + 1}
}
func NewAgentPool(cfg *RateLimitConfig) *AgentPool {
requests := make(chan int, int(math.Ceil(1/cfg.rate))+cfg.burst)
Contributor comment: Do we need some logic to make sure cfg.rate is positive? (A sketch of such a guard follows this file's diff.)

agents := make(chan *Agent, cfg.burst)
limiter := rate.NewLimiter(rate.Limit(cfg.rate), cfg.burst)

return &AgentPool{
limit: limit,
agents: agents,
requests: requests,
agents: agents,
cfg: cfg,
limiter: limiter,
}
}

// Apply applies for an agent
func (pool *AgentPool) Apply() *Agent {
agent := <-pool.agents
return agent
}
// if ctx is canceled before we get an agent, returns nil
func (pool *AgentPool) Apply(ctx context.Context, id int) *Agent {
select {
case <-ctx.Done():
return nil
case pool.requests <- id:
}

// Recycle recycles agent
func (pool *AgentPool) Recycle(agent *Agent) {
pool.agents <- agent
select {
case <-ctx.Done():
return nil
case agent := <-pool.agents:
return agent
}
}

// GetAgentPool a singleton agent pool
func GetAgentPool() *AgentPool {
// InitAgentPool initializes the agent pool singleton
func InitAgentPool(cfg *RateLimitConfig) *AgentPool {
once.Do(func() {
pool = NewAgentPool(agentlimit)
pool = NewAgentPool(&RateLimitConfig{rate: cfg.rate, burst: cfg.burst})
go pool.dispatch()
})
return pool
}

// Emit apply for a agent to communicates with dm-worker
func Emit(fn func(args ...interface{}), args ...interface{}) {
ap := GetAgentPool()
agent := ap.Apply()
defer ap.Recycle(agent)
func (pool *AgentPool) dispatch() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
select {
case <-ctx.Done():
return
case id := <-pool.requests:
err := pool.limiter.Wait(ctx)
if err == context.Canceled {
return
}
pool.agents <- &Agent{ID: id}
}
}
}

fn(args...)
// Emit applies for an agent to communicate with dm-worker
func Emit(ctx context.Context, id int, fn emitFunc, errFn emitFunc, args ...interface{}) {
agent := pool.Apply(ctx, id)
if agent == nil {
errFn(args...)
} else {
fn(args...)
}
}
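
A minimal sketch of the guard the review comment above asks about, assuming the RateLimitConfig, defalutRate, and defaultBurst definitions from this file; whether to fall back to defaults or return an error is a policy choice the PR does not specify:

// checkRateLimitConfig is a hypothetical guard: a non-positive rate would make
// math.Ceil(1/cfg.rate) and the limiter arguments meaningless, and a
// non-positive burst would leave the agents channel unbuffered or invalid.
func checkRateLimitConfig(cfg *RateLimitConfig) {
	if cfg.rate <= 0 {
		cfg.rate = defalutRate
	}
	if cfg.burst <= 0 {
		cfg.burst = defaultBurst
	}
}

NewAgentPool (or InitAgentPool) could call this first, so the requests channel capacity and the rate.NewLimiter arguments are always well defined.
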
59 changes: 42 additions & 17 deletions dm/master/agent_pool_test.go
@@ -14,6 +14,9 @@
package master

import (
"context"
"time"

. "github.com/pingcap/check"
)

@@ -23,33 +26,39 @@ func (t *testMaster) TestAgentPool(c *C) {
}

func (t *testMaster) testPool(c *C) {
var (
rate = 10
burst = 100
)
// test limit
agentlimit = 2
InitAgentPool(&RateLimitConfig{rate: float64(rate), burst: burst})
pc := make(chan *Agent)

go func() {
ap := GetAgentPool()
pc <- ap.Apply()
pc <- ap.Apply()
pc <- ap.Apply()

for i := 0; i < rate+burst; i++ {
pc <- pool.Apply(context.Background(), i)
}
}()

agent1 := <-pc
c.Assert(agent1.ID, Equals, 1)
agent2 := <-pc
c.Assert(agent2.ID, Equals, 2)
for i := 0; i < burst; i++ {
agent := <-pc
c.Assert(agent.ID, Equals, i)
}
select {
case <-pc:
c.FailNow()
default:
}

GetAgentPool().Recycle(agent1)
agent := <-pc
c.Assert(agent.ID, Equals, 1)
GetAgentPool().Recycle(agent2)
GetAgentPool().Recycle(agent)
for i := 0; i < rate; i++ {
select {
case agent := <-pc:
c.Assert(agent.ID, Equals, i+burst)
case <-time.After(time.Millisecond * 150):
// add 50ms time drift here
c.FailNow()
Contributor comment: We can add some failing message to explain what's failing. (A sketch follows this file's diff.)

}
}
}

func (t *testMaster) testEmit(c *C) {
@@ -60,7 +69,7 @@ func (t *testMaster) testEmit(c *C) {
worker testWorkerType = 1
)

Emit(func(args ...interface{}) {
Emit(context.Background(), 1, func(args ...interface{}) {
if len(args) != 2 {
c.Fatalf("args count is not 2, args %v", args)
}
@@ -80,6 +89,22 @@ func (t *testMaster) testEmit(c *C) {
if worker1 != worker {
c.Fatalf("args[1] is not expected worker, args[1] %v vs %v", worker1, worker)
}
}, []interface{}{id, worker}...)
}, func(args ...interface{}) {}, []interface{}{id, worker}...)

counter := 0
ctx, cancel := context.WithCancel(context.Background())
cancel()
Emit(ctx, 1, func(args ...interface{}) {
c.FailNow()
}, func(args ...interface{}) {
if len(args) != 1 {
c.Fatalf("args count is not 1, args %v", args)
}
pCounter, ok := args[0].(*int)
if !ok {
c.Fatalf("args[0] is not *int, args %+v", args)
}
*pCounter++
}, []interface{}{&counter}...)
c.Assert(counter, Equals, 1)
}
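
For the review comment about the bare c.FailNow() in testPool, the failing branch could carry a message explaining what timed out. A sketch of that select case, reusing i, burst, and the 150ms wait from the test above (the wording is an assumption):

select {
case agent := <-pc:
	c.Assert(agent.ID, Equals, i+burst)
case <-time.After(time.Millisecond * 150):
	// add 50ms time drift here; Fatalf says what failed instead of a bare FailNow
	c.Fatalf("timed out waiting for agent %d: limiter did not emit a token within 150ms", i+burst)
}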