Browse Source

Merge pull request #1800 from xiang90/unstable

raft: move unstable related function to log_unstable.go
Xiang Li 11 years ago
parent
commit
35cf7b5a31
3 changed files with 170 additions and 72 deletions
  1. 35 69
      raft/log.go
  2. 134 0
      raft/log_unstable.go
  3. 1 3
      raft/node.go

+ 35 - 69
raft/log.go

@@ -41,18 +41,6 @@ type raftLog struct {
 	applied uint64
 	applied uint64
 }
 }
 
 
-// unstable.entris[i] has raft log position i+unstable.offset.
-// Note that unstable.offset may be less than the highest log
-// position in storage; this means that the next write to storage
-// might need to truncate the log before persisting unstable.entries.
-type unstable struct {
-	// the incoming unstable snapshot, if any.
-	snapshot *pb.Snapshot
-	// all entries that have not yet been written to storage.
-	entries []pb.Entry
-	offset  uint64
-}
-
 // newLog returns log using the given storage. It recovers the log to the state
 // newLog returns log using the given storage. It recovers the log to the state
 // that it just commits and applies the lastest snapshot.
 // that it just commits and applies the lastest snapshot.
 func newLog(storage Storage) *raftLog {
 func newLog(storage Storage) *raftLog {
@@ -106,15 +94,7 @@ func (l *raftLog) append(after uint64, ents ...pb.Entry) uint64 {
 	if after < l.committed {
 	if after < l.committed {
 		log.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
 		log.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
 	}
 	}
-	if after < l.unstable.offset {
-		// The log is being truncated to before our current unstable
-		// portion, so discard it and reset unstable.
-		l.unstable.entries = nil
-		l.unstable.offset = after + 1
-	}
-	// Truncate any unstable entries that are being replaced, then
-	// append the new ones.
-	l.unstable.entries = append(l.unstable.entries[:after+1-l.unstable.offset], ents...)
+	l.unstable.truncateAndAppend(after, ents)
 	return l.lastIndex()
 	return l.lastIndex()
 }
 }
 
 
@@ -166,8 +146,8 @@ func (l *raftLog) snapshot() (pb.Snapshot, error) {
 }
 }
 
 
 func (l *raftLog) firstIndex() uint64 {
 func (l *raftLog) firstIndex() uint64 {
-	if l.unstable.snapshot != nil {
-		return l.unstable.snapshot.Metadata.Index + 1
+	if i, ok := l.unstable.maybeFirstIndex(); ok {
+		return i
 	}
 	}
 	index, err := l.storage.FirstIndex()
 	index, err := l.storage.FirstIndex()
 	if err != nil {
 	if err != nil {
@@ -177,7 +157,14 @@ func (l *raftLog) firstIndex() uint64 {
 }
 }
 
 
 func (l *raftLog) lastIndex() uint64 {
 func (l *raftLog) lastIndex() uint64 {
-	return l.unstable.offset + uint64(len(l.unstable.entries)) - 1
+	if i, ok := l.unstable.maybeLastIndex(); ok {
+		return i
+	}
+	i, err := l.storage.LastIndex()
+	if err != nil {
+		panic(err) // TODO(bdarnell)
+	}
+	return i
 }
 }
 
 
 func (l *raftLog) commitTo(tocommit uint64) {
 func (l *raftLog) commitTo(tocommit uint64) {
@@ -200,52 +187,35 @@ func (l *raftLog) appliedTo(i uint64) {
 	l.applied = i
 	l.applied = i
 }
 }
 
 
-func (l *raftLog) stableTo(i uint64) {
-	if i < l.unstable.offset || i+1-l.unstable.offset > uint64(len(l.unstable.entries)) {
-		log.Panicf("stableTo(%d) is out of range [unstable(%d), len(unstableEnts)(%d)]",
-			i, l.unstable.offset, len(l.unstable.entries))
-	}
-	l.unstable.entries = l.unstable.entries[i+1-l.unstable.offset:]
-	l.unstable.offset = i + 1
-}
+func (l *raftLog) stableTo(i uint64) { l.unstable.stableTo(i) }
 
 
-func (l *raftLog) lastTerm() uint64 {
-	return l.term(l.lastIndex())
-}
+func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
+
+func (l *raftLog) lastTerm() uint64 { return l.term(l.lastIndex()) }
 
 
 func (l *raftLog) term(i uint64) uint64 {
 func (l *raftLog) term(i uint64) uint64 {
-	switch {
-	case i > l.lastIndex():
+	if i > l.lastIndex() {
 		return 0
 		return 0
-	case i < l.unstable.offset:
-		if snap := l.unstable.snapshot; snap != nil {
-			if i == snap.Metadata.Index {
-				return snap.Metadata.Term
-			}
-			return 0
-		}
-		t, err := l.storage.Term(i)
-		switch err {
-		case nil:
-			return t
-		case ErrCompacted:
-			return 0
-		default:
-			panic(err) // TODO(bdarnell)
-		}
-	default:
-		return l.unstable.entries[i-l.unstable.offset].Term
 	}
 	}
-}
 
 
-func (l *raftLog) entries(i uint64) []pb.Entry {
-	return l.slice(i, l.lastIndex()+1)
+	if t, ok := l.unstable.maybeTerm(i); ok {
+		return t
+	}
+
+	t, err := l.storage.Term(i)
+	if err == nil {
+		return t
+	}
+	if err == ErrCompacted {
+		return 0
+	}
+	panic(err) // TODO(bdarnell)
 }
 }
 
 
+func (l *raftLog) entries(i uint64) []pb.Entry { return l.slice(i, l.lastIndex()+1) }
+
 // allEntries returns all entries in the log.
 // allEntries returns all entries in the log.
-func (l *raftLog) allEntries() []pb.Entry {
-	return l.entries(l.firstIndex())
-}
+func (l *raftLog) allEntries() []pb.Entry { return l.entries(l.firstIndex()) }
 
 
 // isUpToDate determines if the given (lastIndex,term) log is more up-to-date
 // isUpToDate determines if the given (lastIndex,term) log is more up-to-date
 // by comparing the index and term of the last entries in the existing logs.
 // by comparing the index and term of the last entries in the existing logs.
@@ -257,9 +227,7 @@ func (l *raftLog) isUpToDate(lasti, term uint64) bool {
 	return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex())
 	return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex())
 }
 }
 
 
-func (l *raftLog) matchTerm(i, term uint64) bool {
-	return l.term(i) == term
-}
+func (l *raftLog) matchTerm(i, term uint64) bool { return l.term(i) == term }
 
 
 func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
 func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
 	if maxIndex > l.committed && l.term(maxIndex) == term {
 	if maxIndex > l.committed && l.term(maxIndex) == term {
@@ -271,9 +239,7 @@ func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
 
 
 func (l *raftLog) restore(s pb.Snapshot) {
 func (l *raftLog) restore(s pb.Snapshot) {
 	l.committed = s.Metadata.Index
 	l.committed = s.Metadata.Index
-	l.unstable.offset = l.committed + 1
-	l.unstable.entries = nil
-	l.unstable.snapshot = &s
+	l.unstable.restore(s)
 }
 }
 
 
 // slice returns a slice of log entries from lo through hi-1, inclusive.
 // slice returns a slice of log entries from lo through hi-1, inclusive.
@@ -297,8 +263,8 @@ func (l *raftLog) slice(lo uint64, hi uint64) []pb.Entry {
 		ents = append(ents, storedEnts...)
 		ents = append(ents, storedEnts...)
 	}
 	}
 	if hi > l.unstable.offset {
 	if hi > l.unstable.offset {
-		firstUnstable := max(lo, l.unstable.offset)
-		ents = append(ents, l.unstable.entries[firstUnstable-l.unstable.offset:hi-l.unstable.offset]...)
+		unstable := l.unstable.slice(max(lo, l.unstable.offset), hi)
+		ents = append(ents, unstable...)
 	}
 	}
 	return ents
 	return ents
 }
 }

+ 134 - 0
raft/log_unstable.go

@@ -0,0 +1,134 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package raft
+
+import (
+	"log"
+
+	pb "github.com/coreos/etcd/raft/raftpb"
+)
+
+// unstable.entris[i] has raft log position i+unstable.offset.
+// Note that unstable.offset may be less than the highest log
+// position in storage; this means that the next write to storage
+// might need to truncate the log before persisting unstable.entries.
+type unstable struct {
+	// the incoming unstable snapshot, if any.
+	snapshot *pb.Snapshot
+	// all entries that have not yet been written to storage.
+	entries []pb.Entry
+	offset  uint64
+}
+
+// maybeFirstIndex returns the first index if it has a snapshot.
+func (u *unstable) maybeFirstIndex() (uint64, bool) {
+	if u.snapshot != nil {
+		return u.snapshot.Metadata.Index, true
+	}
+	return 0, false
+}
+
+// maybeLastIndex returns the last index if it has at least one
+// unstable entry or snapshot.
+func (u *unstable) maybeLastIndex() (uint64, bool) {
+	if l := len(u.entries); l != 0 {
+		return u.offset + uint64(l) - 1, true
+	}
+	if u.snapshot != nil {
+		return u.snapshot.Metadata.Index, true
+	}
+	return 0, false
+}
+
+// myabeTerm returns the term of the entry at index i, if there
+// is any.
+func (u *unstable) maybeTerm(i uint64) (uint64, bool) {
+	if i < u.offset {
+		if u.snapshot == nil {
+			return 0, false
+		}
+		if u.snapshot.Metadata.Index == i {
+			return u.snapshot.Metadata.Term, true
+		}
+		return 0, false
+	}
+
+	last, ok := u.maybeLastIndex()
+	if !ok {
+		return 0, false
+	}
+	if i > last {
+		return 0, false
+	}
+	return u.entries[i-u.offset].Term, true
+}
+
+func (u *unstable) stableTo(i uint64) {
+	if i < u.offset || i+1-u.offset > uint64(len(u.entries)) {
+		log.Panicf("stableTo(%d) is out of range [unstable(%d), len(unstableEnts)(%d)]",
+			i, u.offset, len(u.entries))
+	}
+	u.entries = u.entries[i+1-u.offset:]
+	u.offset = i + 1
+}
+
+func (u *unstable) stableSnapTo(i uint64) {
+	if u.snapshot != nil && u.snapshot.Metadata.Index == i {
+		u.snapshot = nil
+	}
+}
+
+func (u *unstable) restore(s pb.Snapshot) {
+	u.offset = s.Metadata.Index + 1
+	u.entries = nil
+	u.snapshot = &s
+}
+
+func (u *unstable) resetEntries(offset uint64) {
+	u.entries = nil
+	u.offset = offset
+}
+
+func (u *unstable) truncateAndAppend(after uint64, ents []pb.Entry) {
+	if after < u.offset {
+		// The log is being truncated to before our current unstable
+		// portion, so discard it and reset unstable.
+		u.resetEntries(after + 1)
+	}
+	u.entries = append(u.slice(u.offset, after+1), ents...)
+}
+
+func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry {
+	if lo >= hi {
+		return nil
+	}
+	if u.isOutOfBounds(lo) || u.isOutOfBounds(hi-1) {
+		return nil
+	}
+	return u.entries[lo-u.offset : hi-u.offset]
+}
+
+func (u *unstable) isOutOfBounds(i uint64) bool {
+	if len(u.entries) == 0 {
+		return true
+	}
+	last := u.offset + uint64(len(u.entries)) - 1
+	if i < u.offset || i > last {
+		return true
+	}
+	return false
+}

+ 1 - 3
raft/node.go

@@ -306,9 +306,7 @@ func (n *node) run(r *raft) {
 				r.raftLog.stableTo(prevLastUnstablei)
 				r.raftLog.stableTo(prevLastUnstablei)
 				havePrevLastUnstablei = false
 				havePrevLastUnstablei = false
 			}
 			}
-			if r.raftLog.unstable.snapshot != nil && r.raftLog.unstable.snapshot.Metadata.Index == prevSnapi {
-				r.raftLog.unstable.snapshot = nil
-			}
+			r.raftLog.stableSnapTo(prevSnapi)
 			advancec = nil
 			advancec = nil
 		case <-n.stop:
 		case <-n.stop:
 			close(n.done)
 			close(n.done)