فهرست منبع

raft: stableTo checks term matching

stableTo should only mark the index stable if the term is matched. After raft sends out unstable
entries to application, raft makes progress without waiting for reply. When the appliaction
calls the stableTo to notify the entries up to "index" are stable, raft might have truncated
some entries before "index" due to leader lost. raft must verify the (index,term) of stableTo,
before marking the entries as stable.
Xiang Li 11 سال پیش
والد
کامیت
3c0fbe285c
6فایلهای تغییر یافته به همراه16 افزوده شده و 18 حذف شده
  1. 1 1
      raft/log.go
  2. 3 3
      raft/log_test.go
  3. 7 11
      raft/log_unstable.go
  4. 3 1
      raft/node.go
  5. 1 1
      raft/raft_paper_test.go
  6. 1 1
      raft/raft_test.go

+ 1 - 1
raft/log.go

@@ -187,7 +187,7 @@ func (l *raftLog) appliedTo(i uint64) {
 	l.applied = i
 	l.applied = i
 }
 }
 
 
-func (l *raftLog) stableTo(i uint64) { l.unstable.stableTo(i) }
+func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) }
 
 
 func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
 func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
 
 

+ 3 - 3
raft/log_test.go

@@ -392,7 +392,7 @@ func TestUnstableEnts(t *testing.T) {
 
 
 		ents := raftLog.unstableEntries()
 		ents := raftLog.unstableEntries()
 		if l := len(ents); l > 0 {
 		if l := len(ents); l > 0 {
-			raftLog.stableTo(ents[l-1].Index)
+			raftLog.stableTo(ents[l-1].Index, ents[l-i].Term)
 		}
 		}
 		if !reflect.DeepEqual(ents, tt.wents) {
 		if !reflect.DeepEqual(ents, tt.wents) {
 			t.Errorf("#%d: unstableEnts = %+v, want %+v", i, ents, tt.wents)
 			t.Errorf("#%d: unstableEnts = %+v, want %+v", i, ents, tt.wents)
@@ -446,8 +446,8 @@ func TestStableTo(t *testing.T) {
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
 		raftLog := newLog(NewMemoryStorage())
 		raftLog := newLog(NewMemoryStorage())
-		raftLog.append(0, []pb.Entry{{}, {}}...)
-		raftLog.stableTo(tt.stable)
+		raftLog.append(0, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}...)
+		raftLog.stableTo(tt.stable, 1)
 		if raftLog.unstable.offset != tt.wunstable {
 		if raftLog.unstable.offset != tt.wunstable {
 			t.Errorf("#%d: unstable = %d, want %d", i, raftLog.unstable, tt.wunstable)
 			t.Errorf("#%d: unstable = %d, want %d", i, raftLog.unstable, tt.wunstable)
 		}
 		}

+ 7 - 11
raft/log_unstable.go

@@ -16,11 +16,7 @@
 
 
 package raft
 package raft
 
 
-import (
-	"log"
-
-	pb "github.com/coreos/etcd/raft/raftpb"
-)
+import pb "github.com/coreos/etcd/raft/raftpb"
 
 
 // unstable.entris[i] has raft log position i+unstable.offset.
 // unstable.entris[i] has raft log position i+unstable.offset.
 // Note that unstable.offset may be less than the highest log
 // Note that unstable.offset may be less than the highest log
@@ -77,13 +73,13 @@ func (u *unstable) maybeTerm(i uint64) (uint64, bool) {
 	return u.entries[i-u.offset].Term, true
 	return u.entries[i-u.offset].Term, true
 }
 }
 
 
-func (u *unstable) stableTo(i uint64) {
-	if i < u.offset || i+1-u.offset > uint64(len(u.entries)) {
-		log.Panicf("stableTo(%d) is out of range [unstable(%d), len(unstableEnts)(%d)]",
-			i, u.offset, len(u.entries))
+func (u *unstable) stableTo(i, t uint64) {
+	if gt, ok := u.maybeTerm(i); ok {
+		if gt == t && i >= u.offset {
+			u.entries = u.entries[i+1-u.offset:]
+			u.offset = i + 1
+		}
 	}
 	}
-	u.entries = u.entries[i+1-u.offset:]
-	u.offset = i + 1
 }
 }
 
 
 func (u *unstable) stableSnapTo(i uint64) {
 func (u *unstable) stableSnapTo(i uint64) {

+ 3 - 1
raft/node.go

@@ -209,6 +209,7 @@ func (n *node) run(r *raft) {
 	var readyc chan Ready
 	var readyc chan Ready
 	var advancec chan struct{}
 	var advancec chan struct{}
 	var prevLastUnstablei uint64
 	var prevLastUnstablei uint64
+	var prevLastUnstablet uint64
 	var havePrevLastUnstablei bool
 	var havePrevLastUnstablei bool
 	var prevSnapi uint64
 	var prevSnapi uint64
 	var rd Ready
 	var rd Ready
@@ -284,6 +285,7 @@ func (n *node) run(r *raft) {
 			}
 			}
 			if len(rd.Entries) > 0 {
 			if len(rd.Entries) > 0 {
 				prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
 				prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
+				prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
 				havePrevLastUnstablei = true
 				havePrevLastUnstablei = true
 			}
 			}
 			if !IsEmptyHardState(rd.HardState) {
 			if !IsEmptyHardState(rd.HardState) {
@@ -303,7 +305,7 @@ func (n *node) run(r *raft) {
 				r.raftLog.appliedTo(prevHardSt.Commit)
 				r.raftLog.appliedTo(prevHardSt.Commit)
 			}
 			}
 			if havePrevLastUnstablei {
 			if havePrevLastUnstablei {
-				r.raftLog.stableTo(prevLastUnstablei)
+				r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
 				havePrevLastUnstablei = false
 				havePrevLastUnstablei = false
 			}
 			}
 			r.raftLog.stableSnapTo(prevSnapi)
 			r.raftLog.stableSnapTo(prevSnapi)

+ 1 - 1
raft/raft_paper_test.go

@@ -902,7 +902,7 @@ func commitNoopEntry(r *raft, s *MemoryStorage) {
 	r.readMessages()
 	r.readMessages()
 	s.Append(r.raftLog.unstableEntries())
 	s.Append(r.raftLog.unstableEntries())
 	r.raftLog.appliedTo(r.raftLog.committed)
 	r.raftLog.appliedTo(r.raftLog.committed)
-	r.raftLog.stableTo(r.raftLog.lastIndex())
+	r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm())
 }
 }
 
 
 func acceptAndReply(m pb.Message) pb.Message {
 func acceptAndReply(m pb.Message) pb.Message {

+ 1 - 1
raft/raft_test.go

@@ -31,7 +31,7 @@ import (
 func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) {
 func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) {
 	// Transfer all unstable entries to "stable" storage.
 	// Transfer all unstable entries to "stable" storage.
 	s.Append(r.raftLog.unstableEntries())
 	s.Append(r.raftLog.unstableEntries())
-	r.raftLog.stableTo(r.raftLog.lastIndex())
+	r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm())
 
 
 	ents = r.raftLog.nextEnts()
 	ents = r.raftLog.nextEnts()
 	r.raftLog.appliedTo(r.raftLog.committed)
 	r.raftLog.appliedTo(r.raftLog.committed)