|
@@ -27,16 +27,10 @@ type raftLog struct {
|
|
|
// storage contains all stable entries since the last snapshot.
|
|
// storage contains all stable entries since the last snapshot.
|
|
|
storage Storage
|
|
storage Storage
|
|
|
|
|
|
|
|
- // the incoming unstable snapshot, if any.
|
|
|
|
|
- unstableSnapshot *pb.Snapshot
|
|
|
|
|
- // unstableEnts contains all entries that have not yet been written
|
|
|
|
|
- // to storage.
|
|
|
|
|
- unstableEnts []pb.Entry
|
|
|
|
|
- // unstableEnts[i] has raft log position i+unstable. Note that
|
|
|
|
|
- // unstable may be less than the highest log position in storage;
|
|
|
|
|
- // this means that the next write to storage will truncate the log
|
|
|
|
|
- // before persisting unstableEnts.
|
|
|
|
|
- unstable uint64
|
|
|
|
|
|
|
+ // unstable contains all unstable entries and snapshot.
|
|
|
|
|
+ // they will be saved into storage.
|
|
|
|
|
+ unstable unstable
|
|
|
|
|
+
|
|
|
// committed is the highest log position that is known to be in
|
|
// committed is the highest log position that is known to be in
|
|
|
// stable storage on a quorum of nodes.
|
|
// stable storage on a quorum of nodes.
|
|
|
// Invariant: committed < unstable
|
|
// Invariant: committed < unstable
|
|
@@ -47,6 +41,18 @@ type raftLog struct {
|
|
|
applied uint64
|
|
applied uint64
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// unstable.entris[i] has raft log position i+unstable.offset.
|
|
|
|
|
+// Note that unstable.offset may be less than the highest log
|
|
|
|
|
+// position in storage; this means that the next write to storage
|
|
|
|
|
+// might need to truncate the log before persisting unstable.entries.
|
|
|
|
|
+type unstable struct {
|
|
|
|
|
+ // the incoming unstable snapshot, if any.
|
|
|
|
|
+ snapshot *pb.Snapshot
|
|
|
|
|
+ // all entries that have not yet been written to storage.
|
|
|
|
|
+ entries []pb.Entry
|
|
|
|
|
+ offset uint64
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
// newLog returns log using the given storage. It recovers the log to the state
|
|
// newLog returns log using the given storage. It recovers the log to the state
|
|
|
// that it just commits and applies the lastest snapshot.
|
|
// that it just commits and applies the lastest snapshot.
|
|
|
func newLog(storage Storage) *raftLog {
|
|
func newLog(storage Storage) *raftLog {
|
|
@@ -64,7 +70,7 @@ func newLog(storage Storage) *raftLog {
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
panic(err) // TODO(bdarnell)
|
|
panic(err) // TODO(bdarnell)
|
|
|
}
|
|
}
|
|
|
- log.unstable = lastIndex + 1
|
|
|
|
|
|
|
+ log.unstable.offset = lastIndex + 1
|
|
|
// Initialize our committed and applied pointers to the time of the last compaction.
|
|
// Initialize our committed and applied pointers to the time of the last compaction.
|
|
|
log.committed = firstIndex - 1
|
|
log.committed = firstIndex - 1
|
|
|
log.applied = firstIndex - 1
|
|
log.applied = firstIndex - 1
|
|
@@ -73,7 +79,7 @@ func newLog(storage Storage) *raftLog {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (l *raftLog) String() string {
|
|
func (l *raftLog) String() string {
|
|
|
- return fmt.Sprintf("unstable=%d committed=%d applied=%d len(unstableEntries)=%d", l.unstable, l.committed, l.applied, len(l.unstableEnts))
|
|
|
|
|
|
|
+ return fmt.Sprintf("unstable=%d committed=%d applied=%d len(unstableEntries)=%d", l.unstable.offset, l.committed, l.applied, len(l.unstable.entries))
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
|
|
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
|
|
@@ -100,15 +106,15 @@ func (l *raftLog) append(after uint64, ents ...pb.Entry) uint64 {
|
|
|
if after < l.committed {
|
|
if after < l.committed {
|
|
|
log.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
|
|
log.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
|
|
|
}
|
|
}
|
|
|
- if after < l.unstable {
|
|
|
|
|
|
|
+ if after < l.unstable.offset {
|
|
|
// The log is being truncated to before our current unstable
|
|
// The log is being truncated to before our current unstable
|
|
|
// portion, so discard it and reset unstable.
|
|
// portion, so discard it and reset unstable.
|
|
|
- l.unstableEnts = nil
|
|
|
|
|
- l.unstable = after + 1
|
|
|
|
|
|
|
+ l.unstable.entries = nil
|
|
|
|
|
+ l.unstable.offset = after + 1
|
|
|
}
|
|
}
|
|
|
// Truncate any unstable entries that are being replaced, then
|
|
// Truncate any unstable entries that are being replaced, then
|
|
|
// append the new ones.
|
|
// append the new ones.
|
|
|
- l.unstableEnts = append(l.unstableEnts[:after+1-l.unstable], ents...)
|
|
|
|
|
|
|
+ l.unstable.entries = append(l.unstable.entries[:after+1-l.unstable.offset], ents...)
|
|
|
return l.lastIndex()
|
|
return l.lastIndex()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -134,11 +140,11 @@ func (l *raftLog) findConflict(from uint64, ents []pb.Entry) uint64 {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (l *raftLog) unstableEntries() []pb.Entry {
|
|
func (l *raftLog) unstableEntries() []pb.Entry {
|
|
|
- if len(l.unstableEnts) == 0 {
|
|
|
|
|
|
|
+ if len(l.unstable.entries) == 0 {
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
// copy unstable entries to an empty slice
|
|
// copy unstable entries to an empty slice
|
|
|
- return append([]pb.Entry{}, l.unstableEnts...)
|
|
|
|
|
|
|
+ return append([]pb.Entry{}, l.unstable.entries...)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// nextEnts returns all the available entries for execution.
|
|
// nextEnts returns all the available entries for execution.
|
|
@@ -153,15 +159,15 @@ func (l *raftLog) nextEnts() (ents []pb.Entry) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (l *raftLog) snapshot() (pb.Snapshot, error) {
|
|
func (l *raftLog) snapshot() (pb.Snapshot, error) {
|
|
|
- if l.unstableSnapshot != nil {
|
|
|
|
|
- return *l.unstableSnapshot, nil
|
|
|
|
|
|
|
+ if l.unstable.snapshot != nil {
|
|
|
|
|
+ return *l.unstable.snapshot, nil
|
|
|
}
|
|
}
|
|
|
return l.storage.Snapshot()
|
|
return l.storage.Snapshot()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (l *raftLog) firstIndex() uint64 {
|
|
func (l *raftLog) firstIndex() uint64 {
|
|
|
- if l.unstableSnapshot != nil {
|
|
|
|
|
- return l.unstableSnapshot.Metadata.Index + 1
|
|
|
|
|
|
|
+ if l.unstable.snapshot != nil {
|
|
|
|
|
+ return l.unstable.snapshot.Metadata.Index + 1
|
|
|
}
|
|
}
|
|
|
index, err := l.storage.FirstIndex()
|
|
index, err := l.storage.FirstIndex()
|
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -171,7 +177,7 @@ func (l *raftLog) firstIndex() uint64 {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (l *raftLog) lastIndex() uint64 {
|
|
func (l *raftLog) lastIndex() uint64 {
|
|
|
- return l.unstable + uint64(len(l.unstableEnts)) - 1
|
|
|
|
|
|
|
+ return l.unstable.offset + uint64(len(l.unstable.entries)) - 1
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (l *raftLog) commitTo(tocommit uint64) {
|
|
func (l *raftLog) commitTo(tocommit uint64) {
|
|
@@ -195,12 +201,12 @@ func (l *raftLog) appliedTo(i uint64) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (l *raftLog) stableTo(i uint64) {
|
|
func (l *raftLog) stableTo(i uint64) {
|
|
|
- if i < l.unstable || i+1-l.unstable > uint64(len(l.unstableEnts)) {
|
|
|
|
|
|
|
+ if i < l.unstable.offset || i+1-l.unstable.offset > uint64(len(l.unstable.entries)) {
|
|
|
log.Panicf("stableTo(%d) is out of range [unstable(%d), len(unstableEnts)(%d)]",
|
|
log.Panicf("stableTo(%d) is out of range [unstable(%d), len(unstableEnts)(%d)]",
|
|
|
- i, l.unstable, len(l.unstableEnts))
|
|
|
|
|
|
|
+ i, l.unstable.offset, len(l.unstable.entries))
|
|
|
}
|
|
}
|
|
|
- l.unstableEnts = l.unstableEnts[i+1-l.unstable:]
|
|
|
|
|
- l.unstable = i + 1
|
|
|
|
|
|
|
+ l.unstable.entries = l.unstable.entries[i+1-l.unstable.offset:]
|
|
|
|
|
+ l.unstable.offset = i + 1
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (l *raftLog) lastTerm() uint64 {
|
|
func (l *raftLog) lastTerm() uint64 {
|
|
@@ -211,8 +217,8 @@ func (l *raftLog) term(i uint64) uint64 {
|
|
|
switch {
|
|
switch {
|
|
|
case i > l.lastIndex():
|
|
case i > l.lastIndex():
|
|
|
return 0
|
|
return 0
|
|
|
- case i < l.unstable:
|
|
|
|
|
- if snap := l.unstableSnapshot; snap != nil {
|
|
|
|
|
|
|
+ case i < l.unstable.offset:
|
|
|
|
|
+ if snap := l.unstable.snapshot; snap != nil {
|
|
|
if i == snap.Metadata.Index {
|
|
if i == snap.Metadata.Index {
|
|
|
return snap.Metadata.Term
|
|
return snap.Metadata.Term
|
|
|
}
|
|
}
|
|
@@ -228,7 +234,7 @@ func (l *raftLog) term(i uint64) uint64 {
|
|
|
panic(err) // TODO(bdarnell)
|
|
panic(err) // TODO(bdarnell)
|
|
|
}
|
|
}
|
|
|
default:
|
|
default:
|
|
|
- return l.unstableEnts[i-l.unstable].Term
|
|
|
|
|
|
|
+ return l.unstable.entries[i-l.unstable.offset].Term
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -265,9 +271,9 @@ func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
|
|
|
|
|
|
|
|
func (l *raftLog) restore(s pb.Snapshot) {
|
|
func (l *raftLog) restore(s pb.Snapshot) {
|
|
|
l.committed = s.Metadata.Index
|
|
l.committed = s.Metadata.Index
|
|
|
- l.unstable = l.committed
|
|
|
|
|
- l.unstableEnts = []pb.Entry{{Index: s.Metadata.Index, Term: s.Metadata.Term}}
|
|
|
|
|
- l.unstableSnapshot = &s
|
|
|
|
|
|
|
+ l.unstable.offset = l.committed
|
|
|
|
|
+ l.unstable.entries = []pb.Entry{{Index: s.Metadata.Index, Term: s.Metadata.Term}}
|
|
|
|
|
+ l.unstable.snapshot = &s
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// slice returns a slice of log entries from lo through hi-1, inclusive.
|
|
// slice returns a slice of log entries from lo through hi-1, inclusive.
|
|
@@ -279,20 +285,20 @@ func (l *raftLog) slice(lo uint64, hi uint64) []pb.Entry {
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
var ents []pb.Entry
|
|
var ents []pb.Entry
|
|
|
- if lo < l.unstable {
|
|
|
|
|
- storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable))
|
|
|
|
|
|
|
+ if lo < l.unstable.offset {
|
|
|
|
|
+ storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset))
|
|
|
if err == ErrCompacted {
|
|
if err == ErrCompacted {
|
|
|
// This should never fail because it has been checked before.
|
|
// This should never fail because it has been checked before.
|
|
|
- log.Panicf("entries[%d:%d) from storage is out of bound", lo, min(hi, l.unstable))
|
|
|
|
|
|
|
+ log.Panicf("entries[%d:%d) from storage is out of bound", lo, min(hi, l.unstable.offset))
|
|
|
return nil
|
|
return nil
|
|
|
} else if err != nil {
|
|
} else if err != nil {
|
|
|
panic(err) // TODO(bdarnell)
|
|
panic(err) // TODO(bdarnell)
|
|
|
}
|
|
}
|
|
|
ents = append(ents, storedEnts...)
|
|
ents = append(ents, storedEnts...)
|
|
|
}
|
|
}
|
|
|
- if hi > l.unstable {
|
|
|
|
|
- firstUnstable := max(lo, l.unstable)
|
|
|
|
|
- ents = append(ents, l.unstableEnts[firstUnstable-l.unstable:hi-l.unstable]...)
|
|
|
|
|
|
|
+ if hi > l.unstable.offset {
|
|
|
|
|
+ firstUnstable := max(lo, l.unstable.offset)
|
|
|
|
|
+ ents = append(ents, l.unstable.entries[firstUnstable-l.unstable.offset:hi-l.unstable.offset]...)
|
|
|
}
|
|
}
|
|
|
return ents
|
|
return ents
|
|
|
}
|
|
}
|