Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trainer library discover master by etcd #2551

Merged
merged 17 commits into from
Jun 29, 2017

Conversation

Yancey1989
Copy link
Contributor

Fixed #2514

Copy link
Contributor

@helinwang helinwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great PR! Could you move the code to /~https://github.com/PaddlePaddle/Paddle/blob/develop/go/master/c/client.go#L58 ? We already have function paddle_new_master_client which takes master address rather than etcd address, we can just add a new function paddle_new_etcd_master_client. Sorry that I have not mentioned to you earlier!

Copy link
Contributor

@typhoonzero typhoonzero left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to #2559?


I think this PR will implement in the master client with paddle_new_etcd_master_client

--Yancey1989

@Yancey1989
Copy link
Contributor Author

Updated with @helinwang 's comment.


mAddr := kvs[0].Value
log.Debugf("Fetched master address: %s\n", mAddr)
return NewClient(MasterAddresser(mAddr), bufSize)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you take a look at this piece of code: /~https://github.com/PaddlePaddle/Paddle/blob/develop/go/master/client.go#L80

The idea is the client takes a Addresser and query it to know if it have changed.
This current implementation is create a "constant" Addresser that returns the same address every time. The thing is master server could be killed and restarts with a different address.

So I think a better approach could be this etcd query function be the implementation of the Addresser interface. So the master server will know the updated address when calling Address.

Another detail is when implementing the Addresser, it don't need to create a client in every call to Address, it should reuse the same client.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, move GET operator to Address method.

@@ -20,6 +25,51 @@ type Client struct {
ch chan []byte
}

// MasterAddresser provide master address
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment function name must be the same to function name masterAddresser

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.
Delete MasterAddresser.

time.Sleep(m.timeout)
continue
}
// reconnect to etcd server
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seem that etcd have a some kind of retry method for grpc but don't know how to use yet, may be help here /~https://github.com/coreos/etcd/blob/master/clientv3/retry.go#L28

func (m masterAddresser) Address() string {
for {
ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
resp, err := m.client.Get(ctx, masterAddrPath)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clients should use watch to retrieve notification about master node events. masterMonitor should also change to handle these events.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clients should use watch to retrieve notification about master node events
I think it's a good idea, it will reduce the count for GET operator to etcd.

masterMonitor should also change to handle these events

I do not think masterMonitor will handle the events, we can handle these in a new method named masterAddresser.init(), and masterAddresser.Addresser() only return the latest master address.

Copy link
Contributor

@helinwang helinwang Jun 23, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally thought client can poll etcd to get the latest master address. But I think @typhoonzero 's idea is better.
Because Addresser is an polling interface, to get notification about the change rather than polling to get the change. We need to change the interface. One possible solution could be:
Change:

func NewClient(addr Addresser, bufSize int)

To:

func NewClient(addrCh <-chan string, bufSize int)

And maybe change moniterMaster to:

func (c *Client) monitorMaster(addrCh <-chan string) {
        lastMaster := ""
        for curMaster := range addrCh {
                // connect to the new address once address changed.                                                                                                                                                                         
                if curMaster != lastMaster {
                        if curMaster == "" {
                                err := c.conn.Close()
                                if err != nil {
                                        log.Errorln(err)
                                }
                        } else {
                                err := c.conn.Connect(curMaster)
                                if err != nil {
                                        log.Errorln(err)

                                        // connect to addr failed, set                                                                                                                                                                      
                                        // to last known addr in order                                                                                                                                                                      
                                        // to retry next time.                                                                                                                                                                              
                                        curMaster = lastMaster
                                }

                        }
                }
	        lastMaster = curMaster
        }
}

Copy link
Contributor Author

@Yancey1989 Yancey1989 Jun 26, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.
I think we also need an interface with etcd operator, so that we can do some unit test easier, I'll add this feature in the next PR.

Copy link
Contributor

@helinwang helinwang Jun 26, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Yancey1989 Thanks!
We can use interface to make unit test easier. For example, the master server is taking an interface Store as the parameter: /~https://github.com/PaddlePaddle/Paddle/blob/develop/go/master/service.go#L94.
The definition of store is:

type Store interface {
	Save([]byte) error
	Load() ([]byte, error)
}

We have an implementation of Store using etcd. But to test the master server, it's easy, we don't to run a etcd cluster. We can just write a stub interface for Store.
However this only tests master service. Does not test the etcd Store implementation. But that's a good start.

@typhoonzero typhoonzero mentioned this pull request Jun 26, 2017
go/.gitignore Outdated
@@ -0,0 +1,4 @@
CMakeCache.txt
Copy link
Member

@jacquesqiao jacquesqiao Jun 26, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need this files? Since the build result is now moved to build dir

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, done.


// Client is the client of the master server.
type Client struct {
conn *connection.Conn
ch chan []byte
}

// EtcdClient is the client of
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

complete this comment

Copy link
Contributor Author

@Yancey1989 Yancey1989 Jun 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.
Move to etcd_client.go

//export paddle_new_etcd_master_client
func paddle_new_etcd_master_client(etcdEndpoints *C.char, bufSize int) C.paddle_master_client {
p := C.GoString(etcdEndpoints)
c := master.NewEtcdClient(addresser(p), bufSize)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just c := master.NewEtcdClient(p), bufSize) (remove addresser)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

if curMaster != lastMaster {
if curMaster == "" {
err := c.conn.Close()
fmt.Printf("close conn error: %s", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use log.Errorf instead of fmt.Printf.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

})
if err != nil {
log.Errorf("Init etcd connection failed: %v", err)
panic(err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please return error here instead of panic, and don't need to log error. Caller will get the error and log it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
etcdClient.ch = make(chan string)
c := NewClient(etcdClient.ch, bufSize)
//go etcdClient.monitorMasterAddr()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No commented out code please.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


// Client is the client of the master server.
type Client struct {
conn *connection.Conn
ch chan []byte
}

// EtcdClient is the client of
type EtcdClient struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps put EtcdClient into etcd_client.go.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

e.ch <- mAddr
break
}
fmt.Println("init master addr finished.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use log rather than fmt.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

fmt.Println("init master addr finished.")
}
func (e *EtcdClient) monitorMasterAddr() {
rch := e.client.Watch(context.Background(), masterAddrPath)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

第一次调用watch的时候能返回现有的值吗,如果可以的话,就不需要initMasterAddr这个函数了。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

看起来是不行的,还是得先Getwatch
initMasterAddr 改为了 Get这样更通用一些。

func paddle_new_etcd_master_client(etcdEndpoints *C.char, bufSize int) C.paddle_master_client {
p := C.GoString(etcdEndpoints)
c := master.NewEtcdClient(addresser(p), bufSize)
return add(c)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感觉把NewEtcdClient里面的c := NewClient(etcdClient.ch, bufSize)放在这里比较好。这样master.EtcdClient就和master.Client不耦合了。EtcdClient负责和etcd交互,Client只读chan string里的新地址。这样unit test也比较好写。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.
直接用了etcd_clint.go中的EtcdClient,因为想要复用NewEtcdClient所以将原有初始化时写入master地址的逻辑独立到一个Put函数中。

}

//export paddle_new_master_client
func paddle_new_master_client(addr *C.char, bufSize int) C.paddle_master_client {
a := C.GoString(addr)
c := master.NewClient(addresser(a), bufSize)
ch := make(chan string)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better use:

ch := make(chan string, 1)
ch <- a
c := master.NewClient(ch, bufSize)

Otherwise this function will not return unless master.Client read the value. We don't want to add unnecessary requirement.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good comment, thanks @helinwang !!
Done.

@@ -82,9 +75,11 @@ func TestGetFinishTask(t *testing.T) {
// Manually intialize client to avoid calling c.getRecords()
c := &Client{}
c.conn = connection.New()
go c.monitorMaster(TestAddresser(fmt.Sprintf(":%d", p)))
addr := fmt.Sprintf(":%d", p)
ch := make(chan string)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe ch := make(chan string, 1) is better, so ch <- addr does not need to wait for channel to be read.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -63,10 +74,10 @@ func TestNextRecord(t *testing.T) {
}
w.Close()
f.Close()

c := master.NewClient(master.TestAddresser(fmt.Sprintf(":%d", p)), 10)
curAddr := make(chan string)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe curAddr := make(chan string, 1)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

go c.getRecords()
return c
}

// NewEtcdMasterClient creates a new Client by etcd
func NewEtcdMasterClient(db DatabaseOperator, bufSize int) *Client {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

db is only used here to create ch, which is used as input for NewClient. this is a sign that maybe we don't need the function NewEtcdMasterClient.
I think we need to follow rule: less code is better.
Here is a possible way to do it:

func paddle_new_master_client(...) {
  p := C.GoString(etcdEndpoints)
  e, err := master.NewEtcdClientWithoutLock(strings.Split(p, ","))
  if err != nil {
    panic(err)
  }
  ch := make(chan string, 1)
  ch <- string(db.WaitMasterReady(DefaultAddrPath, 3))
  go db.WatchWithKey(DefaultAddrPath, ch)
  c := master.NewClient(ch, bufSize)
  return add(c)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Delete function NewEtcdMasterClient


// EtcdClientWithoutLock is the etcd client the master users
// for service discovery
type EtcdClientWithoutLock struct {
Copy link
Contributor

@helinwang helinwang Jun 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EtcdClientWithoutLock is a very confusing name, it's hard to figure out what does "WithoutLock" means.
Considering the two function are only few lines, maybe just remove this type, and add two helper functions in go/master/etcd_client.go:

func GetKey(c *clientv3.Client, key string) string {
  // ...
}

func WatchKey(c *clientv3.Client, key string, ch chan<- string) {
  // ...
}

So paddle_new_etcd_master_client could be changed to:

func paddle_new_etcd_master_client(...) {
  p := C.GoString(etcdEndpoints)
  cli, err := clientv3.New(...)
  if err != nil {
    panic(err)
  }
  ch := make(chan string, 1)
  ch <- master.GetKey(cli, DefaultAddrPath)
  go master.WatchKey(cli, DefaultAddrPath, ch)
  c := master.NewClient(ch, bufSize)
  return add(c)
}

Just an idea, feel free to modify!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.
WaitMasterReady => GetKey, and I also think this function does not need to be blocked, if the master address is empty, the function monitorMaster will not connect to master.

@@ -142,3 +172,38 @@ func (e *EtcdClient) Load() ([]byte, error) {
state := kvs[0].Value
return state, nil
}

// WaitMasterReady will wait for master is ready
func (e *EtcdClientWithoutLock) WaitMasterReady(key string, timeout int) []byte {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WaitMasterReady is very long, maybe Getkey would be sufficient?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

// WatchWithKey watch the specify key and send to valChan
func (e *EtcdClientWithoutLock) WatchWithKey(key string, valChan chan string) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WatchWithKey is very long, maybe WatchKey would be sufficient?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

// WatchWithKey watch the specify key and send to valChan
func (e *EtcdClientWithoutLock) WatchWithKey(key string, valChan chan string) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

valChan chan string can be changed to valChan chan<- string

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -18,6 +18,19 @@ const (
DefaultAddrPath = "/master/addr"
)

// DatabaseOperator is an interface fo database operator,
// it's useful for unittest
type DatabaseOperator interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see my other comment on "removing NewEtcdMasterClient". Maybe we don't need this interface anymore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Contributor

@helinwang helinwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM++ after the doc string has been fixed.

@@ -27,7 +27,7 @@ type EtcdClient struct {
lock *concurrency.Mutex
}

// NewEtcdClient creates a new EtcdClient.
// NewEtcdClient creates a new EtcdClient
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Subsequent sentences and/or paragraphs can give more details. Sentences should be properly punctuated.

/~https://github.com/golang/go/wiki/Comments

The comment should be a sentence (ends with ".").

@@ -142,3 +142,31 @@ func (e *EtcdClient) Load() ([]byte, error) {
state := kvs[0].Value
return state, nil
}

// GetKey gets the value by the specify key
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Subsequent sentences and/or paragraphs can give more details. Sentences should be properly punctuated.

/~https://github.com/golang/go/wiki/Comments

The comment should be a sentence (ends with ".").

return string(v), nil
}

// WatchKey watches the specify key and send to valChan if there is some event
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Subsequent sentences and/or paragraphs can give more details. Sentences should be properly punctuated.

/~https://github.com/golang/go/wiki/Comments

The comment should be a sentence (ends with ".").

@Yancey1989 Yancey1989 merged commit 9af8d86 into PaddlePaddle:develop Jun 29, 2017
@Yancey1989 Yancey1989 deleted the trainer_client branch June 29, 2017 05:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants