Skip to content

Commit

Permalink
raft: Avoid returning errors from ProcessRaftMessage
Browse files Browse the repository at this point in the history
If the ProcessRaftMessage RPC returns an error, the client treats that
as a potential transport-level error, and tries to reestablish a
connection.

In some cases this can cause a feedback loop. If ProcessRaftMessage
can't successfully check the health of the sending node, it returns an
error. That causes the sending node to bounce its outgoing connection,
which results in another health check failure.

To solve this, only return an error from ProcessRaftMessage when it is
necessary to communicate to the client that it has been removed from the
cluster. Ideally, I would fix this by having the client check
specifically for a transport-level error before bouncing the connection,
but there doesn't seem to be a reliable way to do this. Transport errors
can end up with many different codes that are commonly returned by RPC
handlers, including Internal, Unavailable, FailedPrecondition,
DeadlineExceeded, and Canceled.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
  • Loading branch information
aaronlehmann committed Nov 30, 2016
1 parent a1801a7 commit e9b7cb1
Showing 1 changed file with 31 additions and 14 deletions.
45 changes: 31 additions & 14 deletions manager/state/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,18 +892,36 @@ func (n *Node) RemoveMember(ctx context.Context, id uint64) error {
return n.removeMember(ctx, id)
}

// processRaftMessageLogger builds the log entry used by ProcessRaftMessage.
// It is meant to be called only on the paths that actually emit a log line,
// so the field formatting and entry allocation below are skipped whenever
// nothing is logged.
//
// The entry always carries the handler name ("method") and this node's raft
// ID in hex ("raft_id"). When msg and msg.Message are both non-nil, the
// sender's raft ID is added in hex as "from".
func (n *Node) processRaftMessageLogger(ctx context.Context, msg *api.ProcessRaftMessageRequest) *logrus.Entry {
	logFields := make(logrus.Fields, 3)
	logFields["method"] = "(*Node).ProcessRaftMessage"
	logFields["raft_id"] = fmt.Sprintf("%x", n.Config.ID)

	// The sender is only known when a message body was actually supplied.
	if hasSender := msg != nil && msg.Message != nil; hasSender {
		logFields["from"] = fmt.Sprintf("%x", msg.Message.From)
	}

	return log.G(ctx).WithFields(logFields)
}

// ProcessRaftMessage calls 'Step' which advances the
// raft state machine with the provided message on the
// receiving node
func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessageRequest) (*api.ProcessRaftMessageResponse, error) {
if msg == nil || msg.Message == nil {
return nil, grpc.Errorf(codes.InvalidArgument, "no message provided")
n.processRaftMessageLogger(ctx, msg).Debug("received empty message")
return &api.ProcessRaftMessageResponse{}, nil
}

// Don't process the message if this comes from
// a node in the remove set
if n.cluster.IsIDRemoved(msg.Message.From) {
return nil, ErrMemberRemoved
n.processRaftMessageLogger(ctx, msg).Debug("received message from removed member")
return nil, grpc.Errorf(codes.NotFound, "%s", ErrMemberRemoved.Error())
}

var sourceHost string
Expand All @@ -921,16 +939,16 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa
if msg.Message.Type == raftpb.MsgVote {
member := n.cluster.GetMember(msg.Message.From)
if member == nil || member.Conn == nil {
log.G(ctx).Errorf("received vote request from unknown member %x", msg.Message.From)
return nil, ErrMemberUnknown
n.processRaftMessageLogger(ctx, msg).Debug("received message from unknown member")
return &api.ProcessRaftMessageResponse{}, nil
}

healthCtx, cancel := context.WithTimeout(ctx, time.Duration(n.Config.ElectionTick)*n.opts.TickInterval)
defer cancel()

if err := member.HealthCheck(healthCtx); err != nil {
log.G(ctx).WithError(err).Warningf("member %x which sent vote request failed health check", msg.Message.From)
return nil, errors.Wrap(err, "member unreachable")
n.processRaftMessageLogger(ctx, msg).Debug("member which sent vote request failed health check")
return &api.ProcessRaftMessageResponse{}, nil
}
}

Expand All @@ -939,19 +957,18 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa
// current architecture depends on only the leader
// making proposals, so in-flight proposals can be
// guaranteed not to conflict.
return nil, grpc.Errorf(codes.InvalidArgument, "proposals not accepted")
n.processRaftMessageLogger(ctx, msg).Debug("dropped forwarded proposal")
return &api.ProcessRaftMessageResponse{}, nil
}

// can't stop the raft node while an async RPC is in progress
n.stopMu.RLock()
defer n.stopMu.RUnlock()

if !n.IsMember() {
return nil, ErrNoRaftMember
}

if err := n.raftNode.Step(ctx, *msg.Message); err != nil {
return nil, err
if n.IsMember() {
if err := n.raftNode.Step(ctx, *msg.Message); err != nil {
n.processRaftMessageLogger(ctx, msg).WithError(err).Debug("raft Step failed")
}
}

return &api.ProcessRaftMessageResponse{}, nil
Expand Down Expand Up @@ -1337,7 +1354,7 @@ func (n *Node) sendToMember(ctx context.Context, members map[uint64]*membership.

_, err := api.NewRaftClient(conn.Conn).ProcessRaftMessage(ctx, &api.ProcessRaftMessageRequest{Message: &m})
if err != nil {
if grpc.ErrorDesc(err) == ErrMemberRemoved.Error() {
if grpc.Code(err) == codes.NotFound && grpc.ErrorDesc(err) == ErrMemberRemoved.Error() {
n.removeRaftFunc()
}
if m.Type == raftpb.MsgSnap {
Expand Down

0 comments on commit e9b7cb1

Please sign in to comment.