Trainer library discover master by etcd #2551
Great PR! Could you move the code to https://github.com/PaddlePaddle/Paddle/blob/develop/go/master/c/client.go#L58 ? We already have the function paddle_new_master_client, which takes the master address rather than the etcd address, so we can just add a new function, paddle_new_etcd_master_client. Sorry that I did not mention this earlier!
Related to #2559?
I think this PR will implement this in the master client with paddle_new_etcd_master_client.
--Yancey1989
Updated per @helinwang's comment.
go/master/client.go
Outdated
mAddr := kvs[0].Value
log.Debugf("Fetched master address: %s\n", mAddr)
return NewClient(MasterAddresser(mAddr), bufSize)
Could you take a look at this piece of code: https://github.com/PaddlePaddle/Paddle/blob/develop/go/master/client.go#L80
The idea is that the client takes an Addresser and queries it to learn whether the address has changed.
The current implementation creates a "constant" Addresser that returns the same address every time. The problem is that the master server could be killed and restart with a different address.
So I think a better approach is to make this etcd query function the implementation of the Addresser interface. Then the client will learn the master's updated address when calling Address.
Another detail: when implementing the Addresser, there is no need to create a client in every call to Address; it should reuse the same client.
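A minimal sketch of that suggestion, assuming Addresser is an interface with a single Address() string method, as the discussion implies. The kvGetter interface and the mapStore type are hypothetical stand-ins for a real etcd client so that the example stays self-contained; the point is only that the addresser holds one client and queries it on every call.

```go
package main

import "fmt"

// Addresser is assumed to look like this, based on the discussion above.
type Addresser interface {
	Address() string
}

// kvGetter is a hypothetical stand-in for the part of an etcd client
// that reads a single key.
type kvGetter interface {
	Get(key string) (string, error)
}

// storeAddresser queries the store on every call instead of caching a
// constant address, and reuses the same client each time.
type storeAddresser struct {
	client kvGetter
	key    string
}

// Address returns the current value of the key, or "" if the lookup fails.
func (a storeAddresser) Address() string {
	v, err := a.client.Get(a.key)
	if err != nil {
		return ""
	}
	return v
}

// mapStore is an in-memory kvGetter used only for illustration.
type mapStore map[string]string

func (m mapStore) Get(key string) (string, error) {
	v, ok := m[key]
	if !ok {
		return "", fmt.Errorf("key %q not found", key)
	}
	return v, nil
}

func main() {
	store := mapStore{"/master/addr": "10.0.0.1:8080"}
	var a Addresser = storeAddresser{client: store, key: "/master/addr"}
	fmt.Println(a.Address())

	// Simulate the master restarting with a different address.
	store["/master/addr"] = "10.0.0.2:8080"
	fmt.Println(a.Address())
}
```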
Done, moved the GET operation to the Address method.
go/master/client.go
Outdated
@@ -20,6 +25,51 @@ type Client struct {
	ch chan []byte
}

// MasterAddresser provide master address
The name in the comment must match the function name, masterAddresser.
Done.
Deleted MasterAddresser.
go/master/client.go
Outdated
time.Sleep(m.timeout)
continue
}
// reconnect to etcd server
It seems that etcd has some kind of retry mechanism for gRPC, but I don't know how to use it yet. This may help: https://github.com/coreos/etcd/blob/master/clientv3/retry.go#L28
go/master/client.go
Outdated
func (m masterAddresser) Address() string {
for {
ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
resp, err := m.client.Get(ctx, masterAddrPath)
Clients should use watch to receive notifications about master node events. masterMonitor should also change to handle these events.
> Clients should use watch to retrieve notification about master node events
I think it's a good idea; it will reduce the number of GET operations against etcd.
> masterMonitor should also change to handle these events
I do not think masterMonitor should handle the events. We can handle them in a new method named masterAddresser.init(), and masterAddresser.Addresser() only returns the latest master address.
I originally thought the client could poll etcd to get the latest master address, but I think @typhoonzero's idea is better.
Because Addresser is a polling interface, we need to change the interface in order to be notified of a change rather than poll for it. One possible solution could be:
Change:
func NewClient(addr Addresser, bufSize int)
To:
func NewClient(addrCh <-chan string, bufSize int)
And maybe change monitorMaster to:
func (c *Client) monitorMaster(addrCh <-chan string) {
	lastMaster := ""
	for curMaster := range addrCh {
		// connect to the new address once address changed.
		if curMaster != lastMaster {
			if curMaster == "" {
				err := c.conn.Close()
				if err != nil {
					log.Errorln(err)
				}
			} else {
				err := c.conn.Connect(curMaster)
				if err != nil {
					log.Errorln(err)
					// connect to addr failed, set
					// to last known addr in order
					// to retry next time.
					curMaster = lastMaster
				}
			}
		}
		lastMaster = curMaster
	}
}
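To see how the proposed loop behaves, here is a self-contained sketch that runs the same logic against a fake connection. The conn interface, recordingConn, and the free function monitor are hypothetical names introduced for illustration; the real Client uses its own connection type.

```go
package main

import (
	"errors"
	"fmt"
)

// conn is a hypothetical stand-in for the connection type used by Client.
type conn interface {
	Connect(addr string) error
	Close() error
}

// recordingConn records every call so the loop's behavior can be inspected.
type recordingConn struct {
	calls []string
	fail  map[string]bool // addresses for which Connect should fail
}

func (r *recordingConn) Connect(addr string) error {
	r.calls = append(r.calls, "connect "+addr)
	if r.fail[addr] {
		return errors.New("connect failed: " + addr)
	}
	return nil
}

func (r *recordingConn) Close() error {
	r.calls = append(r.calls, "close")
	return nil
}

// monitor mirrors the proposed loop: it acts only when the address read
// from addrCh differs from the last known one.
func monitor(c conn, addrCh <-chan string) {
	lastMaster := ""
	for curMaster := range addrCh {
		if curMaster != lastMaster {
			if curMaster == "" {
				if err := c.Close(); err != nil {
					fmt.Println(err)
				}
			} else if err := c.Connect(curMaster); err != nil {
				fmt.Println(err)
				// Keep the last known address so the next value retries.
				curMaster = lastMaster
			}
		}
		lastMaster = curMaster
	}
}

func main() {
	ch := make(chan string, 4)
	for _, a := range []string{":8080", ":8080", "", ":9090"} {
		ch <- a
	}
	close(ch)

	c := &recordingConn{}
	monitor(c, ch) // returns once the closed channel is drained
	fmt.Println(c.calls)
}
```

The repeated ":8080" triggers no second connect, and the empty address closes the connection rather than dialing.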
Done.
I think we also need an interface for the etcd operations so that unit tests are easier to write. I'll add this in the next PR.
@Yancey1989 Thanks!
We can use an interface to make unit tests easier. For example, the master server takes an interface, Store, as a parameter: https://github.com/PaddlePaddle/Paddle/blob/develop/go/master/service.go#L94.
The definition of Store is:
type Store interface {
	Save([]byte) error
	Load() ([]byte, error)
}
We have an implementation of Store using etcd. But testing the master server is easy: we don't need to run an etcd cluster, we can just write a stub implementation of Store.
However, this only tests the master service, not the etcd Store implementation. But that's a good start.
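A stub along those lines could look like the sketch below. The Store interface is copied from the comment above; memStore is a hypothetical in-memory implementation written for illustration, not code from the repository.

```go
package main

import "fmt"

// Store matches the interface quoted in the review comment.
type Store interface {
	Save([]byte) error
	Load() ([]byte, error)
}

// memStore is a hypothetical in-memory stub: it keeps the last saved
// state in a byte slice instead of writing it to etcd.
type memStore struct {
	buf []byte
}

// Save stores a copy of b so later changes by the caller do not leak in.
func (m *memStore) Save(b []byte) error {
	m.buf = append([]byte(nil), b...)
	return nil
}

// Load returns whatever was saved last (nil if nothing was saved).
func (m *memStore) Load() ([]byte, error) {
	return m.buf, nil
}

func main() {
	var s Store = &memStore{}
	if err := s.Save([]byte("task queue state")); err != nil {
		fmt.Println("save failed:", err)
		return
	}
	b, err := s.Load()
	if err != nil {
		fmt.Println("load failed:", err)
		return
	}
	fmt.Printf("%s\n", b)
}
```

A test for the master service can then pass &memStore{} wherever a Store is expected and run without an etcd cluster.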
go/.gitignore
Outdated
@@ -0,0 +1,4 @@
CMakeCache.txt
Do we still need these files, since the build output has now moved to the build directory?
Thanks, done.
go/master/client.go
Outdated
// Client is the client of the master server.
type Client struct {
	conn *connection.Conn
	ch chan []byte
}

// EtcdClient is the client of
complete this comment
Done.
Moved to etcd_client.go.
go/master/c/client.go
Outdated
//export paddle_new_etcd_master_client
func paddle_new_etcd_master_client(etcdEndpoints *C.char, bufSize int) C.paddle_master_client {
p := C.GoString(etcdEndpoints)
c := master.NewEtcdClient(addresser(p), bufSize)
Maybe just c := master.NewEtcdClient(p, bufSize) (remove addresser).
Done.
go/master/client.go
Outdated
if curMaster != lastMaster {
if curMaster == "" {
err := c.conn.Close()
fmt.Printf("close conn error: %s", err)
Please use log.Errorf instead of fmt.Printf.
Done.
go/master/client.go
Outdated
})
if err != nil {
log.Errorf("Init etcd connection failed: %v", err)
panic(err)
Please return the error here instead of panicking; there is no need to log it. The caller will get the error and log it.
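As a rough illustration of that request, the sketch below returns a wrapped error from a constructor and leaves reporting to the caller. The etcdClient type and the dial and newEtcdClient functions are hypothetical placeholders, not the PR's actual code.

```go
package main

import (
	"errors"
	"fmt"
)

// etcdClient and dial are hypothetical placeholders for a real etcd
// client and its constructor.
type etcdClient struct {
	endpoints []string
}

func dial(endpoints []string) (*etcdClient, error) {
	if len(endpoints) == 0 {
		return nil, errors.New("no endpoints given")
	}
	return &etcdClient{endpoints: endpoints}, nil
}

// newEtcdClient hands the error back to its caller instead of logging
// it and panicking.
func newEtcdClient(endpoints []string) (*etcdClient, error) {
	c, err := dial(endpoints)
	if err != nil {
		return nil, fmt.Errorf("init etcd connection: %w", err)
	}
	return c, nil
}

func main() {
	if _, err := newEtcdClient(nil); err != nil {
		// The caller decides how to report the failure.
		fmt.Println("error:", err)
	}
}
```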
Done.
go/master/client.go
Outdated
}
etcdClient.ch = make(chan string)
c := NewClient(etcdClient.ch, bufSize)
//go etcdClient.monitorMasterAddr()
No commented out code please.
Done.
go/master/client.go
Outdated
// Client is the client of the master server.
type Client struct {
	conn *connection.Conn
	ch chan []byte
}

// EtcdClient is the client of
type EtcdClient struct {
Perhaps put EtcdClient into etcd_client.go.
Done.
go/master/client.go
Outdated
e.ch <- mAddr
break
}
fmt.Println("init master addr finished.")
Please use log rather than fmt.
Done.
go/master/client.go
Outdated
fmt.Println("init master addr finished.")
}
func (e *EtcdClient) monitorMasterAddr() {
rch := e.client.Watch(context.Background(), masterAddrPath)
Can the first call to watch return the existing value? If so, the initMasterAddr function would not be needed.
It looks like that doesn't work; we still have to Get first and then watch.
I renamed initMasterAddr to Get, which is more general.
go/master/c/client.go
Outdated
func paddle_new_etcd_master_client(etcdEndpoints *C.char, bufSize int) C.paddle_master_client {
p := C.GoString(etcdEndpoints)
c := master.NewEtcdClient(addresser(p), bufSize)
return add(c)
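I think it would be better to move the c := NewClient(etcdClient.ch, bufSize) call from inside NewEtcdClient to here. That way master.EtcdClient is decoupled from master.Client: EtcdClient is responsible for talking to etcd, and Client only reads new addresses from the chan string. This also makes unit tests easier to write.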
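Done.
I used the EtcdClient from etcd_client.go directly. Because I wanted to reuse NewEtcdClient, I moved the logic that writes the master address during initialization into a separate Put function.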
go/master/c/client.go
Outdated
}

//export paddle_new_master_client
func paddle_new_master_client(addr *C.char, bufSize int) C.paddle_master_client {
a := C.GoString(addr)
c := master.NewClient(addresser(a), bufSize)
ch := make(chan string)
Better to use:
ch := make(chan string, 1)
ch <- a
c := master.NewClient(ch, bufSize)
Otherwise this function will not return until master.Client reads the value. We don't want to add an unnecessary requirement.
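The difference comes down to channel buffering: a send on an unbuffered channel blocks until a receiver is ready, while a channel with a buffer of one accepts a single value immediately. A small sketch, where newAddrChan is a hypothetical helper named for this example:

```go
package main

import "fmt"

// newAddrChan returns a channel that already holds addr. The buffer of
// one lets the send complete before any reader exists.
func newAddrChan(addr string) <-chan string {
	ch := make(chan string, 1)
	ch <- addr // does not block: the buffer has room for one value
	return ch
}

func main() {
	ch := newAddrChan(":8080")
	// The reader can pick the value up later, for example from another
	// goroutine.
	fmt.Println(<-ch)
}
```

With make(chan string) instead, the send inside newAddrChan would block forever because nothing is receiving yet.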
Good comment, thanks @helinwang !!
Done.
go/master/client_internal_test.go
Outdated
@@ -82,9 +75,11 @@ func TestGetFinishTask(t *testing.T) {
// Manually intialize client to avoid calling c.getRecords()
c := &Client{}
c.conn = connection.New()
go c.monitorMaster(TestAddresser(fmt.Sprintf(":%d", p)))
addr := fmt.Sprintf(":%d", p)
ch := make(chan string)
Maybe ch := make(chan string, 1) is better, so that ch <- addr does not need to wait for the channel to be read.
Done.
go/master/client_test.go
Outdated
@@ -63,10 +74,10 @@ func TestNextRecord(t *testing.T) {
}
w.Close()
f.Close()

c := master.NewClient(master.TestAddresser(fmt.Sprintf(":%d", p)), 10)
curAddr := make(chan string)
Maybe curAddr := make(chan string, 1)
Done.
go/master/client.go
Outdated
go c.getRecords()
return c
}

// NewEtcdMasterClient creates a new Client by etcd
func NewEtcdMasterClient(db DatabaseOperator, bufSize int) *Client {
db is only used here to create ch, which is used as the input for NewClient. This is a sign that maybe we don't need the function NewEtcdMasterClient.
I think we should follow the rule that less code is better.
Here is a possible way to do it:
func paddle_new_master_client(...) {
	p := C.GoString(etcdEndpoints)
	e, err := master.NewEtcdClientWithoutLock(strings.Split(p, ","))
	if err != nil {
		panic(err)
	}
	ch := make(chan string, 1)
	ch <- string(e.WaitMasterReady(DefaultAddrPath, 3))
	go e.WatchWithKey(DefaultAddrPath, ch)
	c := master.NewClient(ch, bufSize)
	return add(c)
}
Done. Deleted the function NewEtcdMasterClient.
go/master/etcd_client.go
Outdated
// EtcdClientWithoutLock is the etcd client the master users
// for service discovery
type EtcdClientWithoutLock struct {
EtcdClientWithoutLock is a very confusing name; it's hard to figure out what "WithoutLock" means.
Considering that the two functions are only a few lines each, maybe just remove this type and add two helper functions in go/master/etcd_client.go:
func GetKey(c *clientv3.Client, key string) string {
	// ...
}
func WatchKey(c *clientv3.Client, key string, ch chan<- string) {
	// ...
}
So paddle_new_etcd_master_client could be changed to:
func paddle_new_etcd_master_client(...) {
	p := C.GoString(etcdEndpoints)
	cli, err := clientv3.New(...)
	if err != nil {
		panic(err)
	}
	ch := make(chan string, 1)
	ch <- master.GetKey(cli, DefaultAddrPath)
	go master.WatchKey(cli, DefaultAddrPath, ch)
	c := master.NewClient(ch, bufSize)
	return add(c)
}
Just an idea, feel free to modify!
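The get-then-watch wiring can be exercised without etcd. In the sketch below, the kv interface and fakeKV type are hypothetical stand-ins for the etcd client, and getKey and watchKey are simplified versions of the proposed helpers that take that interface instead of *clientv3.Client.

```go
package main

import "fmt"

// kv is a hypothetical abstraction over the two etcd operations the
// helpers need; a real version would wrap an etcd client.
type kv interface {
	Get(key string) string
	Watch(key string) <-chan string
}

// getKey returns the current value for key ("" if unset).
func getKey(c kv, key string) string {
	return c.Get(key)
}

// watchKey forwards every update for key to ch until the watch ends.
func watchKey(c kv, key string, ch chan<- string) {
	for v := range c.Watch(key) {
		ch <- v
	}
}

// fakeKV is an in-memory kv used only to make the sketch runnable.
type fakeKV struct {
	current string
	updates chan string
}

func (f *fakeKV) Get(string) string          { return f.current }
func (f *fakeKV) Watch(string) <-chan string { return f.updates }

func main() {
	store := &fakeKV{current: ":8080", updates: make(chan string)}

	// Get first, then watch: the initial value is delivered through the
	// buffered channel, later changes by the watcher goroutine.
	ch := make(chan string, 1)
	ch <- getKey(store, "/master/addr")
	go watchKey(store, "/master/addr", ch)

	fmt.Println(<-ch)        // initial address
	store.updates <- ":9090" // simulate the master moving
	fmt.Println(<-ch)        // updated address
}
```

A real implementation may also need to consider updates that land between the Get and the start of the Watch; this sketch ignores that window.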
Done.
WaitMasterReady => GetKey. I also think this function does not need to block: if the master address is empty, monitorMaster will not connect to the master.
go/master/etcd_client.go
Outdated
@@ -142,3 +172,38 @@ func (e *EtcdClient) Load() ([]byte, error) {
state := kvs[0].Value
return state, nil
}

// WaitMasterReady will wait for master is ready
func (e *EtcdClientWithoutLock) WaitMasterReady(key string, timeout int) []byte {
WaitMasterReady is very long; maybe GetKey would be sufficient?
Done.
go/master/etcd_client.go
Outdated
}

// WatchWithKey watch the specify key and send to valChan
func (e *EtcdClientWithoutLock) WatchWithKey(key string, valChan chan string) {
WatchWithKey is very long; maybe WatchKey would be sufficient?
Done.
go/master/etcd_client.go
Outdated
}

// WatchWithKey watch the specify key and send to valChan
func (e *EtcdClientWithoutLock) WatchWithKey(key string, valChan chan string) {
valChan chan string can be changed to valChan chan<- string
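Declaring the parameter as chan<- string documents that the function only sends, and the compiler enforces it. A short sketch with hypothetical produce and consume functions:

```go
package main

import "fmt"

// produce can only send on out; receiving from it here would not compile.
func produce(out chan<- string, vals ...string) {
	for _, v := range vals {
		out <- v
	}
	close(out)
}

// consume can only receive from in.
func consume(in <-chan string) []string {
	var got []string
	for v := range in {
		got = append(got, v)
	}
	return got
}

func main() {
	ch := make(chan string) // bidirectional; converts implicitly at each call
	go produce(ch, ":8080", ":9090")
	fmt.Println(consume(ch))
}
```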
Done.
go/master/etcd_client.go
Outdated
@@ -18,6 +18,19 @@ const (
DefaultAddrPath = "/master/addr"
)

// DatabaseOperator is an interface fo database operator,
// it's useful for unittest
type DatabaseOperator interface {
Please see my other comment on "removing NewEtcdMasterClient". Maybe we don't need this interface anymore.
Done.
LGTM++ after the doc string has been fixed.
go/master/etcd_client.go
Outdated
@@ -27,7 +27,7 @@ type EtcdClient struct {
	lock *concurrency.Mutex
}

// NewEtcdClient creates a new EtcdClient.
// NewEtcdClient creates a new EtcdClient
Subsequent sentences and/or paragraphs can give more details. Sentences should be properly punctuated.
https://github.com/golang/go/wiki/Comments
The comment should be a sentence (ends with ".").
go/master/etcd_client.go
Outdated
@@ -142,3 +142,31 @@ func (e *EtcdClient) Load() ([]byte, error) {
state := kvs[0].Value
return state, nil
}

// GetKey gets the value by the specify key
Subsequent sentences and/or paragraphs can give more details. Sentences should be properly punctuated.
https://github.com/golang/go/wiki/Comments
The comment should be a sentence (ends with ".").
go/master/etcd_client.go
Outdated
return string(v), nil
}

// WatchKey watches the specify key and send to valChan if there is some event
Subsequent sentences and/or paragraphs can give more details. Sentences should be properly punctuated.
https://github.com/golang/go/wiki/Comments
The comment should be a sentence (ends with ".").
Fixed #2514