From e9b7cb150bb397944c51a235780dbc5ddd5ff174 Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Wed, 30 Nov 2016 12:13:11 -0800 Subject: [PATCH] raft: Avoid returning errors from ProcessRaftMessage If the ProcessRaftMessage RPC returns an error, the client treats that as a potential transport-level error, and tries to reestablish a connection. In some cases this can cause a feedback loop. If ProcessRaftMessage can't successfully check the health of the sending node, it returns an error. That causes the sending node to bounce its outgoing connection, which results in another health check failure. To solve this, only return an error from ProcessRaftMessage when it is necessary to communicate to the client that it has been removed from the cluster. Ideally, I would fix this by having the client check specifically for a transport-level error before bouncing the connection, but there doesn't seem to be a reliable way to do this. Transport errors can end up with many different codes that are commonly returned by RPC handlers, including Internal, Unavailable, FailedPrecondition, DeadlineExceeded, and Cancelled. Signed-off-by: Aaron Lehmann --- manager/state/raft/raft.go | 45 ++++++++++++++++++++++++++------------ 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/manager/state/raft/raft.go b/manager/state/raft/raft.go index be3afcf89c..d7ce237200 100644 --- a/manager/state/raft/raft.go +++ b/manager/state/raft/raft.go @@ -892,18 +892,36 @@ func (n *Node) RemoveMember(ctx context.Context, id uint64) error { return n.removeMember(ctx, id) } +// processRaftMessageLogger is used to lazily create a logger for +// ProcessRaftMessage. Usually nothing will be logged, so it is useful to avoid +// formatting strings and allocating a logger when it won't be used. +func (n *Node) processRaftMessageLogger(ctx context.Context, msg *api.ProcessRaftMessageRequest) *logrus.Entry { + fields := logrus.Fields{ + "method": "(*Node).ProcessRaftMessage", + "raft_id": fmt.Sprintf("%x", n.Config.ID), + } + + if msg != nil && msg.Message != nil { + fields["from"] = fmt.Sprintf("%x", msg.Message.From) + } + + return log.G(ctx).WithFields(fields) +} + // ProcessRaftMessage calls 'Step' which advances the // raft state machine with the provided message on the // receiving node func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessageRequest) (*api.ProcessRaftMessageResponse, error) { if msg == nil || msg.Message == nil { - return nil, grpc.Errorf(codes.InvalidArgument, "no message provided") + n.processRaftMessageLogger(ctx, msg).Debug("received empty message") + return &api.ProcessRaftMessageResponse{}, nil } // Don't process the message if this comes from // a node in the remove set if n.cluster.IsIDRemoved(msg.Message.From) { - return nil, ErrMemberRemoved + n.processRaftMessageLogger(ctx, msg).Debug("received message from removed member") + return nil, grpc.Errorf(codes.NotFound, "%s", ErrMemberRemoved.Error()) } var sourceHost string @@ -921,16 +939,16 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa if msg.Message.Type == raftpb.MsgVote { member := n.cluster.GetMember(msg.Message.From) if member == nil || member.Conn == nil { - log.G(ctx).Errorf("received vote request from unknown member %x", msg.Message.From) - return nil, ErrMemberUnknown + n.processRaftMessageLogger(ctx, msg).Debug("received message from unknown member") + return &api.ProcessRaftMessageResponse{}, nil } healthCtx, cancel := context.WithTimeout(ctx, time.Duration(n.Config.ElectionTick)*n.opts.TickInterval) defer cancel() if err := member.HealthCheck(healthCtx); err != nil { - log.G(ctx).WithError(err).Warningf("member %x which sent vote request failed health check", msg.Message.From) - return nil, errors.Wrap(err, "member unreachable") + n.processRaftMessageLogger(ctx, msg).Debug("member which sent vote request failed health check") + return &api.ProcessRaftMessageResponse{}, nil } } @@ -939,19 +957,18 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa // current architecture depends on only the leader // making proposals, so in-flight proposals can be // guaranteed not to conflict. - return nil, grpc.Errorf(codes.InvalidArgument, "proposals not accepted") + n.processRaftMessageLogger(ctx, msg).Debug("dropped forwarded proposal") + return &api.ProcessRaftMessageResponse{}, nil } // can't stop the raft node while an async RPC is in progress n.stopMu.RLock() defer n.stopMu.RUnlock() - if !n.IsMember() { - return nil, ErrNoRaftMember - } - - if err := n.raftNode.Step(ctx, *msg.Message); err != nil { - return nil, err + if n.IsMember() { + if err := n.raftNode.Step(ctx, *msg.Message); err != nil { + n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("raft Step failed") + } } return &api.ProcessRaftMessageResponse{}, nil @@ -1337,7 +1354,7 @@ func (n *Node) sendToMember(ctx context.Context, members map[uint64]*membership. _, err := api.NewRaftClient(conn.Conn).ProcessRaftMessage(ctx, &api.ProcessRaftMessageRequest{Message: &m}) if err != nil { - if grpc.ErrorDesc(err) == ErrMemberRemoved.Error() { + if grpc.Code(err) == codes.NotFound && grpc.ErrorDesc(err) == ErrMemberRemoved.Error() { n.removeRaftFunc() } if m.Type == raftpb.MsgSnap {