Browse Source

raft: resume paused followers on receipt of MsgHeartbeatResp

Previously, paused followers were resumed upon sending a MsgHeartbeat.

Fixes #7037
Peter Mattis 9 years ago
parent
commit
e625400f1d
2 changed files with 48 additions and 36 deletions
  1. 1 1
      raft/raft.go
  2. 47 35
      raft/raft_test.go

+ 1 - 1
raft/raft.go

@@ -469,7 +469,6 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
 			continue
 			continue
 		}
 		}
 		r.sendHeartbeat(id, ctx)
 		r.sendHeartbeat(id, ctx)
-		r.prs[id].resume()
 	}
 	}
 }
 }
 
 
@@ -898,6 +897,7 @@ func stepLeader(r *raft, m pb.Message) {
 		}
 		}
 	case pb.MsgHeartbeatResp:
 	case pb.MsgHeartbeatResp:
 		pr.RecentActive = true
 		pr.RecentActive = true
+		pr.resume()
 
 
 		// free one slot for the full inflights window to allow progress.
 		// free one slot for the full inflights window to allow progress.
 		if pr.State == ProgressStateReplicate && pr.ins.full() {
 		if pr.State == ProgressStateReplicate && pr.ins.full() {

+ 47 - 35
raft/raft_test.go

@@ -260,14 +260,20 @@ func TestProgressResume(t *testing.T) {
 	}
 	}
 }
 }
 
 
-// TestProgressResumeByHeartbeat ensures raft.heartbeat reset progress.paused by heartbeat.
-func TestProgressResumeByHeartbeat(t *testing.T) {
+// TestProgressResumeByHeartbeatResp ensures that progress.paused is reset by a heartbeat response, not by sending a heartbeat.
+func TestProgressResumeByHeartbeatResp(t *testing.T) {
 	r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
 	r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
 	r.becomeCandidate()
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.becomeLeader()
 	r.prs[2].Paused = true
 	r.prs[2].Paused = true
 
 
 	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
 	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
+	if !r.prs[2].Paused {
+		t.Errorf("paused = %v, want false", r.prs[2].Paused)
+	}
+
+	r.prs[2].becomeReplicate()
+	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
 	if r.prs[2].Paused {
 	if r.prs[2].Paused {
 		t.Errorf("paused = %v, want false", r.prs[2].Paused)
 		t.Errorf("paused = %v, want false", r.prs[2].Paused)
 	}
 	}
@@ -1139,44 +1145,29 @@ func TestHandleHeartbeatResp(t *testing.T) {
 		t.Errorf("type = %v, want MsgApp", msgs[0].Type)
 		t.Errorf("type = %v, want MsgApp", msgs[0].Type)
 	}
 	}
 
 
-	// A second heartbeat response with no AppResp does not re-send because we are in the wait state.
-	sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
-	msgs = sm.readMessages()
-	if len(msgs) != 0 {
-		t.Fatalf("len(msgs) = %d, want 0", len(msgs))
-	}
-
-	// Send a heartbeat to reset the wait state; next heartbeat will re-send MsgApp.
-	sm.bcastHeartbeat()
+	// A second heartbeat response generates another MsgApp re-send
 	sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
 	sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
 	msgs = sm.readMessages()
 	msgs = sm.readMessages()
-	if len(msgs) != 2 {
-		t.Fatalf("len(msgs) = %d, want 2", len(msgs))
-	}
-	if msgs[0].Type != pb.MsgHeartbeat {
-		t.Errorf("type = %v, want MsgHeartbeat", msgs[0].Type)
+	if len(msgs) != 1 {
+		t.Fatalf("len(msgs) = %d, want 1", len(msgs))
 	}
 	}
-	if msgs[1].Type != pb.MsgApp {
-		t.Errorf("type = %v, want MsgApp", msgs[1].Type)
+	if msgs[0].Type != pb.MsgApp {
+		t.Errorf("type = %v, want MsgApp", msgs[0].Type)
 	}
 	}
 
 
 	// Once we have an MsgAppResp, heartbeats no longer send MsgApp.
 	// Once we have an MsgAppResp, heartbeats no longer send MsgApp.
 	sm.Step(pb.Message{
 	sm.Step(pb.Message{
 		From:  2,
 		From:  2,
 		Type:  pb.MsgAppResp,
 		Type:  pb.MsgAppResp,
-		Index: msgs[1].Index + uint64(len(msgs[1].Entries)),
+		Index: msgs[0].Index + uint64(len(msgs[0].Entries)),
 	})
 	})
 	// Consume the message sent in response to MsgAppResp
 	// Consume the message sent in response to MsgAppResp
 	sm.readMessages()
 	sm.readMessages()
 
 
-	sm.bcastHeartbeat() // reset wait state
 	sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
 	sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
 	msgs = sm.readMessages()
 	msgs = sm.readMessages()
-	if len(msgs) != 1 {
-		t.Fatalf("len(msgs) = %d, want 1: %+v", len(msgs), msgs)
-	}
-	if msgs[0].Type != pb.MsgHeartbeat {
-		t.Errorf("type = %v, want MsgHeartbeat", msgs[0].Type)
+	if len(msgs) != 0 {
+		t.Fatalf("len(msgs) = %d, want 0: %+v", len(msgs), msgs)
 	}
 	}
 }
 }
 
 
@@ -1988,15 +1979,19 @@ func TestSendAppendForProgressProbe(t *testing.T) {
 
 
 	// each round is a heartbeat
 	// each round is a heartbeat
 	for i := 0; i < 3; i++ {
 	for i := 0; i < 3; i++ {
-		// we expect that raft will only send out one msgAPP per heartbeat timeout
-		r.appendEntry(pb.Entry{Data: []byte("somedata")})
-		r.sendAppend(2)
-		msg := r.readMessages()
-		if len(msg) != 1 {
-			t.Errorf("len(msg) = %d, want %d", len(msg), 1)
-		}
-		if msg[0].Index != 0 {
-			t.Errorf("index = %d, want %d", msg[0].Index, 0)
+		if i == 0 {
+			// we expect that raft will only send out one msgAPP on the first
+			// loop. After that, the follower is paused until a heartbeat response is
+			// received.
+			r.appendEntry(pb.Entry{Data: []byte("somedata")})
+			r.sendAppend(2)
+			msg := r.readMessages()
+			if len(msg) != 1 {
+				t.Errorf("len(msg) = %d, want %d", len(msg), 1)
+			}
+			if msg[0].Index != 0 {
+				t.Errorf("index = %d, want %d", msg[0].Index, 0)
+			}
 		}
 		}
 
 
 		if !r.prs[2].Paused {
 		if !r.prs[2].Paused {
@@ -2014,8 +2009,12 @@ func TestSendAppendForProgressProbe(t *testing.T) {
 		for j := 0; j < r.heartbeatTimeout; j++ {
 		for j := 0; j < r.heartbeatTimeout; j++ {
 			r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
 			r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
 		}
 		}
+		if !r.prs[2].Paused {
+			t.Errorf("paused = %v, want true", r.prs[2].Paused)
+		}
+
 		// consume the heartbeat
 		// consume the heartbeat
-		msg = r.readMessages()
+		msg := r.readMessages()
 		if len(msg) != 1 {
 		if len(msg) != 1 {
 			t.Errorf("len(msg) = %d, want %d", len(msg), 1)
 			t.Errorf("len(msg) = %d, want %d", len(msg), 1)
 		}
 		}
@@ -2023,6 +2022,19 @@ func TestSendAppendForProgressProbe(t *testing.T) {
 			t.Errorf("type = %v, want %v", msg[0].Type, pb.MsgHeartbeat)
 			t.Errorf("type = %v, want %v", msg[0].Type, pb.MsgHeartbeat)
 		}
 		}
 	}
 	}
+
+	// a heartbeat response will allow another message to be sent
+	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
+	msg := r.readMessages()
+	if len(msg) != 1 {
+		t.Errorf("len(msg) = %d, want %d", len(msg), 1)
+	}
+	if msg[0].Index != 0 {
+		t.Errorf("index = %d, want %d", msg[0].Index, 0)
+	}
+	if !r.prs[2].Paused {
+		t.Errorf("paused = %v, want true", r.prs[2].Paused)
+	}
 }
 }
 
 
 func TestSendAppendForProgressReplicate(t *testing.T) {
 func TestSendAppendForProgressReplicate(t *testing.T) {