Browse Source

Merge pull request #2167 from bdarnell/send-after-response

raft: Send any waiting appends after receiving MsgAppResp.
Xiang Li 11 years ago
parent
commit
67d141a0af
2 changed files with 71 additions and 1 deletions
  1. 5 0
      raft/raft.go
  2. 66 1
      raft/raft_test.go

+ 5 - 0
raft/raft.go

@@ -491,9 +491,14 @@ func stepLeader(r *raft, m pb.Message) {
 				r.sendAppend(m.From)
 			}
 		} else {
+			oldWait := r.prs[m.From].shouldWait()
 			r.prs[m.From].update(m.Index)
 			if r.maybeCommit() {
 				r.bcastAppend()
+			} else if oldWait {
+				// update() reset the wait state on this node. If we had delayed sending
+				// an update before, send it now.
+				r.sendAppend(m.From)
 			}
 		}
 	case pb.MsgHeartbeatResp:

+ 66 - 1
raft/raft_test.go

@@ -856,17 +856,82 @@ func TestHandleHeartbeatResp(t *testing.T) {
 		Type:  pb.MsgAppResp,
 		Index: msgs[1].Index + uint64(len(msgs[1].Entries)),
 	})
+	// Consume the message sent in response to MsgAppResp
+	sm.readMessages()
+
 	sm.bcastHeartbeat() // reset wait state
 	sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
 	msgs = sm.readMessages()
 	if len(msgs) != 1 {
-		t.Fatalf("len(msgs) = %d, want 1", len(msgs))
+		t.Fatalf("len(msgs) = %d, want 1: %+v", len(msgs), msgs)
 	}
 	if msgs[0].Type != pb.MsgHeartbeat {
 		t.Errorf("type = %v, want MsgHeartbeat", msgs[0].Type)
 	}
 }
 
+// TestMsgAppRespWaitReset verifies the waitReset behavior of a leader
+// MsgAppResp.
+func TestMsgAppRespWaitReset(t *testing.T) {
+	sm := newRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage(), 0)
+	sm.becomeCandidate()
+	sm.becomeLeader()
+
+	// The new leader has just emitted a new Term 4 entry; consume those messages
+	// from the outgoing queue.
+	sm.bcastAppend()
+	sm.readMessages()
+
+	// Node 2 acks the first entry, making it committed.
+	sm.Step(pb.Message{
+		From:  2,
+		Type:  pb.MsgAppResp,
+		Index: 1,
+	})
+	if sm.Commit != 1 {
+		t.Fatalf("expected Commit to be 1, got %d", sm.Commit)
+	}
+	// Also consume the MsgApp messages that update Commit on the followers.
+	sm.readMessages()
+
+	// A new command is now proposed on node 1.
+	sm.Step(pb.Message{
+		From:    1,
+		Type:    pb.MsgProp,
+		Entries: []pb.Entry{{}},
+	})
+
+	// The command is broadcast to all nodes not in the wait state.
+	// Node 2 left the wait state due to its MsgAppResp, but node 3 is still waiting.
+	msgs := sm.readMessages()
+	if len(msgs) != 1 {
+		t.Fatalf("expected 1 message, got %d: %+v", len(msgs), msgs)
+	}
+	if msgs[0].Type != pb.MsgApp || msgs[0].To != 2 {
+		t.Errorf("expected MsgApp to node 2, got %s to %d", msgs[0].Type, msgs[0].To)
+	}
+	if len(msgs[0].Entries) != 1 || msgs[0].Entries[0].Index != 2 {
+		t.Errorf("expected to send entry 2, but got %v", msgs[0].Entries)
+	}
+
+	// Now Node 3 acks the first entry. This releases the wait and entry 2 is sent.
+	sm.Step(pb.Message{
+		From:  3,
+		Type:  pb.MsgAppResp,
+		Index: 1,
+	})
+	msgs = sm.readMessages()
+	if len(msgs) != 1 {
+		t.Fatalf("expected 1 message, got %d: %+v", len(msgs), msgs)
+	}
+	if msgs[0].Type != pb.MsgApp || msgs[0].To != 3 {
+		t.Errorf("expected MsgApp to node 3, got %s to %d", msgs[0].Type, msgs[0].To)
+	}
+	if len(msgs[0].Entries) != 1 || msgs[0].Entries[0].Index != 2 {
+		t.Errorf("expected to send entry 2, but got %v", msgs[0].Entries)
+	}
+}
+
 func TestRecvMsgVote(t *testing.T) {
 	tests := []struct {
 		state   StateType