Skip to content

Commit

Permalink
raft: Avoid returning errors from ProcessRaftMessage
Browse files Browse the repository at this point in the history
If the ProcessRaftMessage RPC returns an error, the client treats that
as a potential transport-level error, and tries to reestablish a
connection.

In some cases this can cause a feedback loop. If ProcessRaftMessage
can't successfully check the health of the sending node, it returns an
error. That causes the sending node to bounce its outgoing connection,
which results in another health check failure.

To solve this, only return an error from ProcessRaftMessage when it is
necessary to communicate to the client that it has been removed from the
cluster. Ideally, I would fix this by having the client check
specifically for a transport-level error before bouncing the connection,
but there doesn't seem to be a reliable way to do this. Transport errors
can end up with many different codes that are commonly returned by RPC
handlers, including Internal, Unavailable, FailedPrecondition,
DeadlineExceeded, and Cancelled.
  • Loading branch information
aaronlehmann committed Nov 30, 2016
1 parent a1801a7 commit a3f7705
Showing 1 changed file with 8 additions and 12 deletions.
20 changes: 8 additions & 12 deletions manager/state/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,13 +897,13 @@ func (n *Node) RemoveMember(ctx context.Context, id uint64) error {
// receiving node
func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessageRequest) (*api.ProcessRaftMessageResponse, error) {
if msg == nil || msg.Message == nil {
return nil, grpc.Errorf(codes.InvalidArgument, "no message provided")
return &api.ProcessRaftMessageResponse{}, nil
}

// Don't process the message if this comes from
// a node in the remove set
if n.cluster.IsIDRemoved(msg.Message.From) {
return nil, ErrMemberRemoved
return nil, grpc.Errorf(codes.NotFound, "%s", ErrMemberRemoved.Error())
}

var sourceHost string
Expand All @@ -922,15 +922,15 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa
member := n.cluster.GetMember(msg.Message.From)
if member == nil || member.Conn == nil {
log.G(ctx).Errorf("received vote request from unknown member %x", msg.Message.From)
return nil, ErrMemberUnknown
return &api.ProcessRaftMessageResponse{}, nil
}

healthCtx, cancel := context.WithTimeout(ctx, time.Duration(n.Config.ElectionTick)*n.opts.TickInterval)
defer cancel()

if err := member.HealthCheck(healthCtx); err != nil {
log.G(ctx).WithError(err).Warningf("member %x which sent vote request failed health check", msg.Message.From)
return nil, errors.Wrap(err, "member unreachable")
return &api.ProcessRaftMessageResponse{}, nil
}
}

Expand All @@ -939,19 +939,15 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa
// current architecture depends on only the leader
// making proposals, so in-flight proposals can be
// guaranteed not to conflict.
return nil, grpc.Errorf(codes.InvalidArgument, "proposals not accepted")
return &api.ProcessRaftMessageResponse{}, nil
}

// can't stop the raft node while an async RPC is in progress
n.stopMu.RLock()
defer n.stopMu.RUnlock()

if !n.IsMember() {
return nil, ErrNoRaftMember
}

if err := n.raftNode.Step(ctx, *msg.Message); err != nil {
return nil, err
if n.IsMember() {
n.raftNode.Step(ctx, *msg.Message)
}

return &api.ProcessRaftMessageResponse{}, nil
Expand Down Expand Up @@ -1337,7 +1333,7 @@ func (n *Node) sendToMember(ctx context.Context, members map[uint64]*membership.

_, err := api.NewRaftClient(conn.Conn).ProcessRaftMessage(ctx, &api.ProcessRaftMessageRequest{Message: &m})
if err != nil {
if grpc.ErrorDesc(err) == ErrMemberRemoved.Error() {
if grpc.Code(err) == codes.NotFound && grpc.ErrorDesc(err) == ErrMemberRemoved.Error() {
n.removeRaftFunc()
}
if m.Type == raftpb.MsgSnap {
Expand Down

0 comments on commit a3f7705

Please sign in to comment.