Browse Source

raft: move all unstable stuff into one struct for future cleanup

Xiang Li 11 years ago
parent
commit
66252c7d62
4 changed files with 63 additions and 57 deletions
  1. 45 39
      raft/log.go
  2. 4 4
      raft/log_test.go
  3. 4 4
      raft/node.go
  4. 10 10
      raft/raft_test.go

+ 45 - 39
raft/log.go

@@ -27,16 +27,10 @@ type raftLog struct {
 	// storage contains all stable entries since the last snapshot.
 	// storage contains all stable entries since the last snapshot.
 	storage Storage
 	storage Storage
 
 
-	// the incoming unstable snapshot, if any.
-	unstableSnapshot *pb.Snapshot
-	// unstableEnts contains all entries that have not yet been written
-	// to storage.
-	unstableEnts []pb.Entry
-	// unstableEnts[i] has raft log position i+unstable.  Note that
-	// unstable may be less than the highest log position in storage;
-	// this means that the next write to storage will truncate the log
-	// before persisting unstableEnts.
-	unstable uint64
+	// unstable contains all unstable entries and the incoming unstable
+	// snapshot, if any. They will be saved into storage.
+	unstable unstable
+
 	// committed is the highest log position that is known to be in
 	// committed is the highest log position that is known to be in
 	// stable storage on a quorum of nodes.
 	// stable storage on a quorum of nodes.
 	// Invariant: committed < unstable
 	// Invariant: committed < unstable
@@ -47,6 +41,18 @@ type raftLog struct {
 	applied uint64
 	applied uint64
 }
 }
 
 
+// unstable.entries[i] has raft log position i+unstable.offset.
+// Note that unstable.offset may be less than the highest log
+// position in storage; this means that the next write to storage
+// might need to truncate the log before persisting unstable.entries.
+type unstable struct {
+	// the incoming unstable snapshot, if any.
+	snapshot *pb.Snapshot
+	// all entries that have not yet been written to storage.
+	entries []pb.Entry
+	offset  uint64
+}
+
 // newLog returns log using the given storage. It recovers the log to the state
 // newLog returns log using the given storage. It recovers the log to the state
 // that it just commits and applies the lastest snapshot.
 // that it just commits and applies the lastest snapshot.
 func newLog(storage Storage) *raftLog {
 func newLog(storage Storage) *raftLog {
@@ -64,7 +70,7 @@ func newLog(storage Storage) *raftLog {
 	if err != nil {
 	if err != nil {
 		panic(err) // TODO(bdarnell)
 		panic(err) // TODO(bdarnell)
 	}
 	}
-	log.unstable = lastIndex + 1
+	log.unstable.offset = lastIndex + 1
 	// Initialize our committed and applied pointers to the time of the last compaction.
 	// Initialize our committed and applied pointers to the time of the last compaction.
 	log.committed = firstIndex - 1
 	log.committed = firstIndex - 1
 	log.applied = firstIndex - 1
 	log.applied = firstIndex - 1
@@ -73,7 +79,7 @@ func newLog(storage Storage) *raftLog {
 }
 }
 
 
 func (l *raftLog) String() string {
 func (l *raftLog) String() string {
-	return fmt.Sprintf("unstable=%d committed=%d applied=%d len(unstableEntries)=%d", l.unstable, l.committed, l.applied, len(l.unstableEnts))
+	return fmt.Sprintf("unstable=%d committed=%d applied=%d len(unstableEntries)=%d", l.unstable.offset, l.committed, l.applied, len(l.unstable.entries))
 }
 }
 
 
 // maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
 // maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
@@ -100,15 +106,15 @@ func (l *raftLog) append(after uint64, ents ...pb.Entry) uint64 {
 	if after < l.committed {
 	if after < l.committed {
 		log.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
 		log.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
 	}
 	}
-	if after < l.unstable {
+	if after < l.unstable.offset {
 		// The log is being truncated to before our current unstable
 		// The log is being truncated to before our current unstable
 		// portion, so discard it and reset unstable.
 		// portion, so discard it and reset unstable.
-		l.unstableEnts = nil
-		l.unstable = after + 1
+		l.unstable.entries = nil
+		l.unstable.offset = after + 1
 	}
 	}
 	// Truncate any unstable entries that are being replaced, then
 	// Truncate any unstable entries that are being replaced, then
 	// append the new ones.
 	// append the new ones.
-	l.unstableEnts = append(l.unstableEnts[:after+1-l.unstable], ents...)
+	l.unstable.entries = append(l.unstable.entries[:after+1-l.unstable.offset], ents...)
 	return l.lastIndex()
 	return l.lastIndex()
 }
 }
 
 
@@ -134,11 +140,11 @@ func (l *raftLog) findConflict(from uint64, ents []pb.Entry) uint64 {
 }
 }
 
 
 func (l *raftLog) unstableEntries() []pb.Entry {
 func (l *raftLog) unstableEntries() []pb.Entry {
-	if len(l.unstableEnts) == 0 {
+	if len(l.unstable.entries) == 0 {
 		return nil
 		return nil
 	}
 	}
 	// copy unstable entries to an empty slice
 	// copy unstable entries to an empty slice
-	return append([]pb.Entry{}, l.unstableEnts...)
+	return append([]pb.Entry{}, l.unstable.entries...)
 }
 }
 
 
 // nextEnts returns all the available entries for execution.
 // nextEnts returns all the available entries for execution.
@@ -153,15 +159,15 @@ func (l *raftLog) nextEnts() (ents []pb.Entry) {
 }
 }
 
 
 func (l *raftLog) snapshot() (pb.Snapshot, error) {
 func (l *raftLog) snapshot() (pb.Snapshot, error) {
-	if l.unstableSnapshot != nil {
-		return *l.unstableSnapshot, nil
+	if l.unstable.snapshot != nil {
+		return *l.unstable.snapshot, nil
 	}
 	}
 	return l.storage.Snapshot()
 	return l.storage.Snapshot()
 }
 }
 
 
 func (l *raftLog) firstIndex() uint64 {
 func (l *raftLog) firstIndex() uint64 {
-	if l.unstableSnapshot != nil {
-		return l.unstableSnapshot.Metadata.Index + 1
+	if l.unstable.snapshot != nil {
+		return l.unstable.snapshot.Metadata.Index + 1
 	}
 	}
 	index, err := l.storage.FirstIndex()
 	index, err := l.storage.FirstIndex()
 	if err != nil {
 	if err != nil {
@@ -171,7 +177,7 @@ func (l *raftLog) firstIndex() uint64 {
 }
 }
 
 
 func (l *raftLog) lastIndex() uint64 {
 func (l *raftLog) lastIndex() uint64 {
-	return l.unstable + uint64(len(l.unstableEnts)) - 1
+	return l.unstable.offset + uint64(len(l.unstable.entries)) - 1
 }
 }
 
 
 func (l *raftLog) commitTo(tocommit uint64) {
 func (l *raftLog) commitTo(tocommit uint64) {
@@ -195,12 +201,12 @@ func (l *raftLog) appliedTo(i uint64) {
 }
 }
 
 
 func (l *raftLog) stableTo(i uint64) {
 func (l *raftLog) stableTo(i uint64) {
-	if i < l.unstable || i+1-l.unstable > uint64(len(l.unstableEnts)) {
+	if i < l.unstable.offset || i+1-l.unstable.offset > uint64(len(l.unstable.entries)) {
 		log.Panicf("stableTo(%d) is out of range [unstable(%d), len(unstableEnts)(%d)]",
 		log.Panicf("stableTo(%d) is out of range [unstable(%d), len(unstableEnts)(%d)]",
-			i, l.unstable, len(l.unstableEnts))
+			i, l.unstable.offset, len(l.unstable.entries))
 	}
 	}
-	l.unstableEnts = l.unstableEnts[i+1-l.unstable:]
-	l.unstable = i + 1
+	l.unstable.entries = l.unstable.entries[i+1-l.unstable.offset:]
+	l.unstable.offset = i + 1
 }
 }
 
 
 func (l *raftLog) lastTerm() uint64 {
 func (l *raftLog) lastTerm() uint64 {
@@ -211,8 +217,8 @@ func (l *raftLog) term(i uint64) uint64 {
 	switch {
 	switch {
 	case i > l.lastIndex():
 	case i > l.lastIndex():
 		return 0
 		return 0
-	case i < l.unstable:
-		if snap := l.unstableSnapshot; snap != nil {
+	case i < l.unstable.offset:
+		if snap := l.unstable.snapshot; snap != nil {
 			if i == snap.Metadata.Index {
 			if i == snap.Metadata.Index {
 				return snap.Metadata.Term
 				return snap.Metadata.Term
 			}
 			}
@@ -228,7 +234,7 @@ func (l *raftLog) term(i uint64) uint64 {
 			panic(err) // TODO(bdarnell)
 			panic(err) // TODO(bdarnell)
 		}
 		}
 	default:
 	default:
-		return l.unstableEnts[i-l.unstable].Term
+		return l.unstable.entries[i-l.unstable.offset].Term
 	}
 	}
 }
 }
 
 
@@ -265,9 +271,9 @@ func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
 
 
 func (l *raftLog) restore(s pb.Snapshot) {
 func (l *raftLog) restore(s pb.Snapshot) {
 	l.committed = s.Metadata.Index
 	l.committed = s.Metadata.Index
-	l.unstable = l.committed
-	l.unstableEnts = []pb.Entry{{Index: s.Metadata.Index, Term: s.Metadata.Term}}
-	l.unstableSnapshot = &s
+	l.unstable.offset = l.committed
+	l.unstable.entries = []pb.Entry{{Index: s.Metadata.Index, Term: s.Metadata.Term}}
+	l.unstable.snapshot = &s
 }
 }
 
 
 // slice returns a slice of log entries from lo through hi-1, inclusive.
 // slice returns a slice of log entries from lo through hi-1, inclusive.
@@ -279,20 +285,20 @@ func (l *raftLog) slice(lo uint64, hi uint64) []pb.Entry {
 		return nil
 		return nil
 	}
 	}
 	var ents []pb.Entry
 	var ents []pb.Entry
-	if lo < l.unstable {
-		storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable))
+	if lo < l.unstable.offset {
+		storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset))
 		if err == ErrCompacted {
 		if err == ErrCompacted {
 			// This should never fail because it has been checked before.
 			// This should never fail because it has been checked before.
-			log.Panicf("entries[%d:%d) from storage is out of bound", lo, min(hi, l.unstable))
+			log.Panicf("entries[%d:%d) from storage is out of bound", lo, min(hi, l.unstable.offset))
 			return nil
 			return nil
 		} else if err != nil {
 		} else if err != nil {
 			panic(err) // TODO(bdarnell)
 			panic(err) // TODO(bdarnell)
 		}
 		}
 		ents = append(ents, storedEnts...)
 		ents = append(ents, storedEnts...)
 	}
 	}
-	if hi > l.unstable {
-		firstUnstable := max(lo, l.unstable)
-		ents = append(ents, l.unstableEnts[firstUnstable-l.unstable:hi-l.unstable]...)
+	if hi > l.unstable.offset {
+		firstUnstable := max(lo, l.unstable.offset)
+		ents = append(ents, l.unstable.entries[firstUnstable-l.unstable.offset:hi-l.unstable.offset]...)
 	}
 	}
 	return ents
 	return ents
 }
 }

+ 4 - 4
raft/log_test.go

@@ -143,7 +143,7 @@ func TestAppend(t *testing.T) {
 		if g := raftLog.entries(1); !reflect.DeepEqual(g, tt.wents) {
 		if g := raftLog.entries(1); !reflect.DeepEqual(g, tt.wents) {
 			t.Errorf("#%d: logEnts = %+v, want %+v", i, g, tt.wents)
 			t.Errorf("#%d: logEnts = %+v, want %+v", i, g, tt.wents)
 		}
 		}
-		if g := raftLog.unstable; g != tt.wunstable {
+		if g := raftLog.unstable.offset; g != tt.wunstable {
 			t.Errorf("#%d: unstable = %d, want %d", i, g, tt.wunstable)
 			t.Errorf("#%d: unstable = %d, want %d", i, g, tt.wunstable)
 		}
 		}
 	}
 	}
@@ -398,7 +398,7 @@ func TestUnstableEnts(t *testing.T) {
 			t.Errorf("#%d: unstableEnts = %+v, want %+v", i, ents, tt.wents)
 			t.Errorf("#%d: unstableEnts = %+v, want %+v", i, ents, tt.wents)
 		}
 		}
 		w := previousEnts[len(previousEnts)-1].Index + 1
 		w := previousEnts[len(previousEnts)-1].Index + 1
-		if g := raftLog.unstable; g != w {
+		if g := raftLog.unstable.offset; g != w {
 			t.Errorf("#%d: unstable = %d, want %d", i, g, w)
 			t.Errorf("#%d: unstable = %d, want %d", i, g, w)
 		}
 		}
 	}
 	}
@@ -448,7 +448,7 @@ func TestStableTo(t *testing.T) {
 		raftLog := newLog(NewMemoryStorage())
 		raftLog := newLog(NewMemoryStorage())
 		raftLog.append(0, []pb.Entry{{}, {}}...)
 		raftLog.append(0, []pb.Entry{{}, {}}...)
 		raftLog.stableTo(tt.stable)
 		raftLog.stableTo(tt.stable)
-		if raftLog.unstable != tt.wunstable {
+		if raftLog.unstable.offset != tt.wunstable {
 			t.Errorf("#%d: unstable = %d, want %d", i, raftLog.unstable, tt.wunstable)
 			t.Errorf("#%d: unstable = %d, want %d", i, raftLog.unstable, tt.wunstable)
 		}
 		}
 	}
 	}
@@ -520,7 +520,7 @@ func TestLogRestore(t *testing.T) {
 	if raftLog.committed != index {
 	if raftLog.committed != index {
 		t.Errorf("comitted = %d, want %d", raftLog.committed, index)
 		t.Errorf("comitted = %d, want %d", raftLog.committed, index)
 	}
 	}
-	if raftLog.unstable != index+1 {
+	if raftLog.unstable.offset != index+1 {
 		t.Errorf("unstable = %d, want %d", raftLog.unstable, index+1)
 		t.Errorf("unstable = %d, want %d", raftLog.unstable, index+1)
 	}
 	}
 	if raftLog.term(index) != term {
 	if raftLog.term(index) != term {

+ 4 - 4
raft/node.go

@@ -306,8 +306,8 @@ func (n *node) run(r *raft) {
 				r.raftLog.stableTo(prevLastUnstablei)
 				r.raftLog.stableTo(prevLastUnstablei)
 				havePrevLastUnstablei = false
 				havePrevLastUnstablei = false
 			}
 			}
-			if r.raftLog.unstableSnapshot != nil && r.raftLog.unstableSnapshot.Metadata.Index == prevSnapi {
-				r.raftLog.unstableSnapshot = nil
+			if r.raftLog.unstable.snapshot != nil && r.raftLog.unstable.snapshot.Metadata.Index == prevSnapi {
+				r.raftLog.unstable.snapshot = nil
 			}
 			}
 			advancec = nil
 			advancec = nil
 		case <-n.stop:
 		case <-n.stop:
@@ -405,8 +405,8 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
 	if !isHardStateEqual(r.HardState, prevHardSt) {
 	if !isHardStateEqual(r.HardState, prevHardSt) {
 		rd.HardState = r.HardState
 		rd.HardState = r.HardState
 	}
 	}
-	if r.raftLog.unstableSnapshot != nil {
-		rd.Snapshot = *r.raftLog.unstableSnapshot
+	if r.raftLog.unstable.snapshot != nil {
+		rd.Snapshot = *r.raftLog.unstable.snapshot
 	}
 	}
 	return rd
 	return rd
 }
 }

+ 10 - 10
raft/raft_test.go

@@ -337,7 +337,7 @@ func TestDuelingCandidates(t *testing.T) {
 	wlog := &raftLog{
 	wlog := &raftLog{
 		storage:   &MemoryStorage{ents: []pb.Entry{{}, pb.Entry{Data: nil, Term: 1, Index: 1}}},
 		storage:   &MemoryStorage{ents: []pb.Entry{{}, pb.Entry{Data: nil, Term: 1, Index: 1}}},
 		committed: 1,
 		committed: 1,
-		unstable:  2,
+		unstable:  unstable{offset: 2},
 	}
 	}
 	tests := []struct {
 	tests := []struct {
 		sm      *raft
 		sm      *raft
@@ -394,7 +394,7 @@ func TestCandidateConcede(t *testing.T) {
 		storage: &MemoryStorage{
 		storage: &MemoryStorage{
 			ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
 			ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
 		},
 		},
-		unstable:  3,
+		unstable:  unstable{offset: 3},
 		committed: 2,
 		committed: 2,
 	})
 	})
 	for i, p := range tt.peers {
 	for i, p := range tt.peers {
@@ -435,7 +435,7 @@ func TestOldMessages(t *testing.T) {
 				{Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3},
 				{Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3},
 			},
 			},
 		},
 		},
-		unstable:  4,
+		unstable:  unstable{offset: 4},
 		committed: 3,
 		committed: 3,
 	}
 	}
 	base := ltoa(l)
 	base := ltoa(l)
@@ -492,7 +492,7 @@ func TestProposal(t *testing.T) {
 				storage: &MemoryStorage{
 				storage: &MemoryStorage{
 					ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
 					ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
 				},
 				},
-				unstable:  3,
+				unstable:  unstable{offset: 3},
 				committed: 2}
 				committed: 2}
 		}
 		}
 		base := ltoa(wantLog)
 		base := ltoa(wantLog)
@@ -531,7 +531,7 @@ func TestProposalByProxy(t *testing.T) {
 			storage: &MemoryStorage{
 			storage: &MemoryStorage{
 				ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}},
 				ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}},
 			},
 			},
-			unstable:  3,
+			unstable:  unstable{offset: 3},
 			committed: 2}
 			committed: 2}
 		base := ltoa(wantLog)
 		base := ltoa(wantLog)
 		for i, p := range tt.peers {
 		for i, p := range tt.peers {
@@ -585,7 +585,7 @@ func TestCommit(t *testing.T) {
 			prs[uint64(j)] = &progress{tt.matches[j], tt.matches[j] + 1}
 			prs[uint64(j)] = &progress{tt.matches[j], tt.matches[j] + 1}
 		}
 		}
 		sm := &raft{
 		sm := &raft{
-			raftLog:   &raftLog{storage: &MemoryStorage{ents: tt.logs}, unstable: uint64(len(tt.logs))},
+			raftLog:   &raftLog{storage: &MemoryStorage{ents: tt.logs}, unstable: unstable{offset: uint64(len(tt.logs))}},
 			prs:       prs,
 			prs:       prs,
 			HardState: pb.HardState{Term: tt.smTerm},
 			HardState: pb.HardState{Term: tt.smTerm},
 		}
 		}
@@ -681,7 +681,7 @@ func TestHandleMsgApp(t *testing.T) {
 			raftLog: &raftLog{
 			raftLog: &raftLog{
 				committed: 0,
 				committed: 0,
 				storage:   &MemoryStorage{ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}},
 				storage:   &MemoryStorage{ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}},
-				unstable:  3,
+				unstable:  unstable{offset: 3},
 			},
 			},
 		}
 		}
 
 
@@ -781,7 +781,7 @@ func TestRecvMsgVote(t *testing.T) {
 		sm.HardState = pb.HardState{Vote: tt.voteFor}
 		sm.HardState = pb.HardState{Vote: tt.voteFor}
 		sm.raftLog = &raftLog{
 		sm.raftLog = &raftLog{
 			storage:  &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 2}, {Index: 2, Term: 2}}},
 			storage:  &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 2}, {Index: 2, Term: 2}}},
-			unstable: 3,
+			unstable: unstable{offset: 3},
 		}
 		}
 
 
 		sm.Step(pb.Message{Type: pb.MsgVote, From: 2, Index: tt.i, LogTerm: tt.term})
 		sm.Step(pb.Message{Type: pb.MsgVote, From: 2, Index: tt.i, LogTerm: tt.term})
@@ -929,7 +929,7 @@ func TestLeaderAppResp(t *testing.T) {
 		sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 		sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 		sm.raftLog = &raftLog{
 		sm.raftLog = &raftLog{
 			storage:  &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}},
 			storage:  &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}},
-			unstable: 3,
+			unstable: unstable{offset: 3},
 		}
 		}
 		sm.becomeCandidate()
 		sm.becomeCandidate()
 		sm.becomeLeader()
 		sm.becomeLeader()
@@ -1351,7 +1351,7 @@ func ents(terms ...uint64) *raft {
 	sm := &raft{
 	sm := &raft{
 		raftLog: &raftLog{
 		raftLog: &raftLog{
 			storage:  &MemoryStorage{ents: ents},
 			storage:  &MemoryStorage{ents: ents},
-			unstable: uint64(len(ents)),
+			unstable: unstable{offset: uint64(len(ents))},
 		},
 		},
 	}
 	}
 	sm.reset(0)
 	sm.reset(0)