Browse Source

Merge pull request #3976 from xiang90/snap_fix

Only send snapshot when member is online
Xiang Li 10 years ago
parent
commit
791c5344b1
3 changed files with 53 additions and 11 deletions
  1. 2 2
      raft/progress.go
  2. 9 4
      raft/raft.go
  3. 42 5
      raft/raft_test.go

+ 2 - 2
raft/progress.go

@@ -59,7 +59,7 @@ type Progress struct {
 	// recentActive is true if the progress is recently active. Receiving any messages
 	// from the corresponding follower indicates the progress is active.
 	// recentActive can be reset to false after an election timeout.
-	recentActive bool
+	RecentActive bool
 
 	// inflights is a sliding window for the inflight messages.
 	// When inflights is full, no more message should be sent.
@@ -73,7 +73,7 @@ type Progress struct {
 
 func (pr *Progress) resetState(state ProgressStateType) {
 	pr.Paused = false
-	pr.recentActive = false
+	pr.RecentActive = false
 	pr.PendingSnapshot = 0
 	pr.State = state
 	pr.ins.reset()

+ 9 - 4
raft/raft.go

@@ -275,6 +275,11 @@ func (r *raft) sendAppend(to uint64) {
 	ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
 
 	if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
+		if !pr.RecentActive {
+			r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
+			return
+		}
+
 		m.Type = pb.MsgSnap
 		snapshot, err := r.raftLog.snapshot()
 		if err != nil {
@@ -600,7 +605,7 @@ func stepLeader(r *raft, m pb.Message) {
 	}
 	switch m.Type {
 	case pb.MsgAppResp:
-		pr.recentActive = true
+		pr.RecentActive = true
 
 		if m.Reject {
 			r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
@@ -635,7 +640,7 @@ func stepLeader(r *raft, m pb.Message) {
 			}
 		}
 	case pb.MsgHeartbeatResp:
-		pr.recentActive = true
+		pr.RecentActive = true
 
 		// free one slot for the full inflights window to allow progress.
 		if pr.State == ProgressStateReplicate && pr.ins.full() {
@@ -867,11 +872,11 @@ func (r *raft) checkQuorumActive() bool {
 			continue
 		}
 
-		if r.prs[id].recentActive {
+		if r.prs[id].RecentActive {
 			act += 1
 		}
 
-		r.prs[id].recentActive = false
+		r.prs[id].RecentActive = false
 	}
 
 	return act >= r.q()

+ 42 - 5
raft/raft_test.go

@@ -1563,8 +1563,7 @@ func TestRestoreIgnoreSnapshot(t *testing.T) {
 }
 
 func TestProvideSnap(t *testing.T) {
-	// restore the statemachin from a snapshot
-	// so it has a compacted log and a snapshot
+	// restore the state machine from a snapshot so it has a compacted log and a snapshot
 	s := pb.Snapshot{
 		Metadata: pb.SnapshotMetadata{
 			Index:     11, // magic number
@@ -1579,11 +1578,10 @@ func TestProvideSnap(t *testing.T) {
 	sm.becomeCandidate()
 	sm.becomeLeader()
 
-	// force set the next of node 1, so that
-	// node 1 needs a snapshot
+	// force set the next of node 2, so that node 2 needs a snapshot
 	sm.prs[2].Next = sm.raftLog.firstIndex()
-
 	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].Next - 1, Reject: true})
+
 	msgs := sm.readMessages()
 	if len(msgs) != 1 {
 		t.Fatalf("len(msgs) = %d, want 1", len(msgs))
@@ -1594,6 +1592,35 @@ func TestProvideSnap(t *testing.T) {
 	}
 }
 
+func TestIgnoreProvidingSnap(t *testing.T) {
+	// restore the state machine from a snapshot so it has a compacted log and a snapshot
+	s := pb.Snapshot{
+		Metadata: pb.SnapshotMetadata{
+			Index:     11, // magic number
+			Term:      11, // magic number
+			ConfState: pb.ConfState{Nodes: []uint64{1, 2}},
+		},
+	}
+	storage := NewMemoryStorage()
+	sm := newTestRaft(1, []uint64{1}, 10, 1, storage)
+	sm.restore(s)
+
+	sm.becomeCandidate()
+	sm.becomeLeader()
+
+	// force the next index of node 2 below the leader's first log index so that node 2 needs a snapshot,
+	// then mark node 2 as inactive and expect node 1 to skip sending the snapshot to it
+	sm.prs[2].Next = sm.raftLog.firstIndex() - 1
+	sm.prs[2].RecentActive = false
+
+	sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
+
+	msgs := sm.readMessages()
+	if len(msgs) != 0 {
+		t.Errorf("len(msgs) = %d, want 0", len(msgs))
+	}
+}
+
 func TestRestoreFromSnapMsg(t *testing.T) {
 	s := pb.Snapshot{
 		Metadata: pb.SnapshotMetadata{
@@ -1624,8 +1651,18 @@ func TestSlowNodeRestore(t *testing.T) {
 	nt.storage[1].Compact(lead.raftLog.applied)
 
 	nt.recover()
+	// send heartbeats so that the leader can learn everyone is active.
+	// node 3 will only be considered as active when node 1 receives a reply from it.
+	for {
+		nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
+		if lead.prs[3].RecentActive {
+			break
+		}
+	}
+
 	// trigger a snapshot
 	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
+
 	follower := nt.peers[3].(*raft)
 
 	// trigger a commit