Ver Fonte

raft: Only call stableTo when we have ready entries or a snapshot.

The first Ready after RestartNode (with no snapshot) will have no
unstable entries, so we don't have the correct prevLastUnstablei
when Advance is called. This would cause raftLog.unstable to move
backwards and previously-stable entries would be returned to
the application again.

This should have been caught by the "unexpected Ready" portion of
TestNodeRestart, but it went unnoticed because the Node's goroutine
takes some time to read from advancec and prepare the write to read to
readyc. Added a small (1ms) delay to all such tests to ensure that the
goroutine has time to enter its select wait.
Ben Darnell há 11 anos atrás
pai
commit
32824e053c
2 ficheiros alterados com 11 adições e 5 exclusões
  1. 7 1
      raft/node.go
  2. 4 4
      raft/node_test.go

+ 7 - 1
raft/node.go

@@ -226,6 +226,7 @@ func (n *node) run(r *raft) {
 	var readyc chan Ready
 	var readyc chan Ready
 	var advancec chan struct{}
 	var advancec chan struct{}
 	var prevLastUnstablei uint64
 	var prevLastUnstablei uint64
+	var havePrevLastUnstablei bool
 	var rd Ready
 	var rd Ready
 
 
 	lead := None
 	lead := None
@@ -294,6 +295,7 @@ func (n *node) run(r *raft) {
 			}
 			}
 			if len(rd.Entries) > 0 {
 			if len(rd.Entries) > 0 {
 				prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
 				prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
+				havePrevLastUnstablei = true
 			}
 			}
 			if !IsEmptyHardState(rd.HardState) {
 			if !IsEmptyHardState(rd.HardState) {
 				prevHardSt = rd.HardState
 				prevHardSt = rd.HardState
@@ -302,6 +304,7 @@ func (n *node) run(r *raft) {
 				prevSnapi = rd.Snapshot.Index
 				prevSnapi = rd.Snapshot.Index
 				if prevSnapi > prevLastUnstablei {
 				if prevSnapi > prevLastUnstablei {
 					prevLastUnstablei = prevSnapi
 					prevLastUnstablei = prevSnapi
+					havePrevLastUnstablei = true
 				}
 				}
 			}
 			}
 			r.msgs = nil
 			r.msgs = nil
@@ -310,7 +313,10 @@ func (n *node) run(r *raft) {
 			if prevHardSt.Commit != 0 {
 			if prevHardSt.Commit != 0 {
 				r.raftLog.appliedTo(prevHardSt.Commit)
 				r.raftLog.appliedTo(prevHardSt.Commit)
 			}
 			}
-			r.raftLog.stableTo(prevLastUnstablei)
+			if havePrevLastUnstablei {
+				r.raftLog.stableTo(prevLastUnstablei)
+				havePrevLastUnstablei = false
+			}
 			advancec = nil
 			advancec = nil
 		case <-n.stop:
 		case <-n.stop:
 			close(n.done)
 			close(n.done)

+ 4 - 4
raft/node_test.go

@@ -336,7 +336,7 @@ func TestNodeStart(t *testing.T) {
 	select {
 	select {
 	case rd := <-n.Ready():
 	case rd := <-n.Ready():
 		t.Errorf("unexpected Ready: %+v", rd)
 		t.Errorf("unexpected Ready: %+v", rd)
-	default:
+	case <-time.After(time.Millisecond):
 	}
 	}
 }
 }
 
 
@@ -364,7 +364,7 @@ func TestNodeRestart(t *testing.T) {
 	select {
 	select {
 	case rd := <-n.Ready():
 	case rd := <-n.Ready():
 		t.Errorf("unexpected Ready: %+v", rd)
 		t.Errorf("unexpected Ready: %+v", rd)
-	default:
+	case <-time.After(time.Millisecond):
 	}
 	}
 }
 }
 
 
@@ -432,12 +432,12 @@ func TestNodeAdvance(t *testing.T) {
 	select {
 	select {
 	case rd := <-n.Ready():
 	case rd := <-n.Ready():
 		t.Fatalf("unexpected Ready before Advance: %+v", rd)
 		t.Fatalf("unexpected Ready before Advance: %+v", rd)
-	default:
+	case <-time.After(time.Millisecond):
 	}
 	}
 	n.Advance()
 	n.Advance()
 	select {
 	select {
 	case <-n.Ready():
 	case <-n.Ready():
-	default:
+	case <-time.After(time.Millisecond):
 		t.Errorf("expect Ready after Advance, but there is no Ready available")
 		t.Errorf("expect Ready after Advance, but there is no Ready available")
 	}
 	}
 }
 }