Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition (also a regression of the PR 19139) #19221

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 39 additions & 7 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,23 @@

Server *etcdserver.EtcdServer

cfg Config
stopc chan struct{}
errc chan error
cfg Config

// closeOnce is to ensure `stopc` is closed only once, no matter
// how many times the Close() method is called.
closeOnce sync.Once
wg sync.WaitGroup
// stopc is used to notify the sub goroutines not to send
// any errors to `errc`.
stopc chan struct{}
ahrtr marked this conversation as resolved.
Show resolved Hide resolved
// errc is used to receive error from sub goroutines (including
// client handler, peer handler and metrics handler). It's closed
// after all these sub goroutines exit (checked via `wg`). Writers
// should avoid writing after `stopc` is closed by selecting on
// reading from `stopc`.
errc chan error

// wg is used to track the lifecycle of all sub goroutines created by `StartEtcd`.
wg sync.WaitGroup
}

type peerListener struct {
Expand Down Expand Up @@ -388,6 +399,24 @@
// Close gracefully shuts down all servers/listeners.
// Client requests will be terminated with request timeout.
// After timeout, enforce remaning requests be closed immediately.
//
// The rough workflow to shut down etcd:
ahrtr marked this conversation as resolved.
Show resolved Hide resolved
// 1. close the `stopc` channel, so that all error handlers (child
// goroutines) won't send back any errors anymore;
// 2. stop the http and grpc servers gracefully, within request timeout;
// 3. close all client and metrics listeners, so that etcd server
// stops receiving any new connection;
// 4. call the cancel function to close the gateway context, so that
ahrtr marked this conversation as resolved.
Show resolved Hide resolved
// all gateway connections are closed.
// 5. stop etcd server gracefully, and ensure the main raft loop
// goroutine is stopped;
// 6. stop all peer listeners, so that it stops receiving peer connections
// and messages (wait up to 1-second);
// 7. wait for all child goroutines (i.e. client handlers, peer handlers
// and metrics handlers) to exit;
// 8. close the `errc` channel to release the resource. Note that it's only
// safe to close the `errc` after step 7 above is done, otherwise the
// child goroutines may send errors back to already closed `errc` channel.
func (e *Etcd) Close() {
fields := []zap.Field{
zap.String("name", e.cfg.Name),
Expand Down Expand Up @@ -607,7 +636,9 @@

// start peer servers in a goroutine
for _, pl := range e.Peers {
e.wg.Add(1)
go func(l *peerListener) {
defer e.wg.Done()
u := l.Addr().String()
e.cfg.logger.Info(
"serving peer traffic",
Expand Down Expand Up @@ -774,7 +805,9 @@

// start client servers in each goroutine
for _, sctx := range e.sctxs {
e.wg.Add(1)
go func(s *serveCtx) {
defer e.wg.Done()
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, e.grpcGatewayDial(splitHTTP), splitHTTP, gopts...))
}(sctx)
}
Expand Down Expand Up @@ -859,7 +892,9 @@
return err
}
e.metricsListeners = append(e.metricsListeners, ml)
e.wg.Add(1)

Check warning on line 895 in server/embed/etcd.go

View check run for this annotation

Codecov / codecov/patch

server/embed/etcd.go#L895

Added line #L895 was not covered by tests
go func(u url.URL, ln net.Listener) {
defer e.wg.Done()

Check warning on line 897 in server/embed/etcd.go

View check run for this annotation

Codecov / codecov/patch

server/embed/etcd.go#L897

Added line #L897 was not covered by tests
e.cfg.logger.Info(
"serving metrics",
zap.String("address", u.String()),
Expand All @@ -872,9 +907,6 @@
}

func (e *Etcd) errHandler(err error) {
e.wg.Add(1)
defer e.wg.Done()

if err != nil {
e.GetLogger().Error("setting up serving from embedded etcd failed.", zap.Error(err))
}
Expand Down
36 changes: 29 additions & 7 deletions server/embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,23 @@
insecure bool
httpOnly bool

// ctx is used to control the grpc gateway. Terminate the grpc gateway
// by calling `cancel` when shutting down the etcd.
ctx context.Context
cancel context.CancelFunc

userHandlers map[string]http.Handler
serviceRegister func(*grpc.Server)
serversC chan *servers
closeOnce sync.Once

// serversC is used to receive the http and grpc server objects (created
// in `serve`), both of which will be closed when shutting down the etcd.
// Close it when `serve` returns or when etcd fails to bootstrap.
serversC chan *servers
// closeOnce is to ensure `serversC` is closed only once.
closeOnce sync.Once

// wg is used to track the lifecycle of all sub goroutines created by `serve`.
wg sync.WaitGroup
}

type servers struct {
Expand Down Expand Up @@ -182,13 +192,17 @@
server = m.Serve

httpl := m.Match(cmux.HTTP1())
sctx.wg.Add(1)
go func(srvhttp *http.Server, tlsLis net.Listener) {
defer sctx.wg.Done()
errHandler(srvhttp.Serve(tlsLis))
}(srv, httpl)

if grpcEnabled {
grpcl := m.Match(cmux.HTTP2())
sctx.wg.Add(1)
go func(gs *grpc.Server, l net.Listener) {
defer sctx.wg.Done()
errHandler(gs.Serve(l))
}(gs, grpcl)
}
Expand Down Expand Up @@ -237,7 +251,7 @@
TLSConfig: tlscfg,
ErrorLog: logger, // do not log user error
}
if err := configureHTTPServer(srv, s.Cfg); err != nil {
if err = configureHTTPServer(srv, s.Cfg); err != nil {
sctx.lg.Error("Configure https server failed", zap.Error(err))
return err
}
Expand All @@ -248,11 +262,13 @@
} else {
server = m.Serve

tlsl, err := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
if err != nil {
return err
tlsl, tlsErr := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
if tlsErr != nil {
return tlsErr

Check warning on line 267 in server/embed/serve.go

View check run for this annotation

Codecov / codecov/patch

server/embed/serve.go#L267

Added line #L267 was not covered by tests
}
sctx.wg.Add(1)
go func(srvhttp *http.Server, tlsl net.Listener) {
defer sctx.wg.Done()
errHandler(srvhttp.Serve(tlsl))
}(srv, tlsl)
}
Expand All @@ -265,7 +281,11 @@
)
}

return server()
err = server()
sctx.close()
// ensure all goroutines, which are created by this method, to complete before this method returns.
sctx.wg.Wait()
return err
}

func configureHTTPServer(srv *http.Server, cfg config.ServerConfig) error {
Expand Down Expand Up @@ -334,7 +354,9 @@
return nil, err
}
}
sctx.wg.Add(1)
go func() {
defer sctx.wg.Done()
<-ctx.Done()
if cerr := conn.Close(); cerr != nil {
sctx.lg.Warn(
Expand Down
Loading