Browse Source

Merge pull request #1831 from xiang90/fix_unstable

raft: fix unstable
Xiang Li 11 years ago
parent
commit
79014556e9
3 changed files with 18 additions and 8 deletions
  1. 2 0
      raft/log.go
  2. 7 6
      raft/log_unstable.go
  3. 9 2
      raft/storage.go

+ 2 - 0
raft/log.go

@@ -263,6 +263,8 @@ func (l *raftLog) slice(lo uint64, hi uint64) []pb.Entry {
 			// This should never fail because it has been checked before.
 			log.Panicf("entries[%d:%d) from storage is out of bound", lo, min(hi, l.unstable.offset))
 			return nil
+		} else if err == ErrUnavailable {
+			return nil
 		} else if err != nil {
 			panic(err) // TODO(bdarnell)
 		}

+ 7 - 6
raft/log_unstable.go

@@ -34,10 +34,11 @@ type unstable struct {
 	offset  uint64
 }
 
-// maybeFirstIndex returns the first index if it has a snapshot.
+// maybeFirstIndex returns the index of the first possible entry in entries
+// if it has a snapshot.
 func (u *unstable) maybeFirstIndex() (uint64, bool) {
 	if u.snapshot != nil {
-		return u.snapshot.Metadata.Index, true
+		return u.snapshot.Metadata.Index + 1, true
 	}
 	return 0, false
 }
@@ -106,16 +107,16 @@ func (u *unstable) restore(s pb.Snapshot) {
 func (u *unstable) truncateAndAppend(ents []pb.Entry) {
 	after := ents[0].Index - 1
 	switch {
+	case after == u.offset+uint64(len(u.entries))-1:
+		// after is the last index in the u.entries
+		// directly append
+		u.entries = append(u.entries, ents...)
 	case after < u.offset:
 		log.Printf("raftlog: replace the unstable entries from index %d", after+1)
 		// The log is being truncated to before our current offset
 		// portion, so set the offset and replace the entries
 		u.offset = after + 1
 		u.entries = ents
-	case after == u.offset+uint64(len(u.entries))-1:
-		// after is the last index in the u.entries
-		// directly append
-		u.entries = append(u.entries, ents...)
 	default:
 		// truncate to after and copy to u.entries
 		// then append

+ 9 - 2
raft/storage.go

@@ -28,6 +28,8 @@ import (
 // index is unavailable because it predates the last snapshot.
 var ErrCompacted = errors.New("requested index is unavailable due to compaction")
 
+var ErrUnavailable = errors.New("requested entry at index is unavailable")
+
 // Storage is an interface that may be implemented by the application
 // to retrieve log entries from storage.
 //
@@ -47,8 +49,9 @@ type Storage interface {
 	// LastIndex returns the index of the last entry in the log.
 	LastIndex() (uint64, error)
 	// FirstIndex returns the index of the first log entry that is
-	// available via Entries (older entries have been incorporated
-	// into the latest Snapshot).
+	// possibly available via Entries (older entries have been incorporated
+	// into the latest Snapshot; if storage only contains the dummy entry the
+	// first log entry is not available).
 	FirstIndex() (uint64, error)
 	// Snapshot returns the most recent snapshot.
 	Snapshot() (pb.Snapshot, error)
@@ -95,6 +98,10 @@ func (ms *MemoryStorage) Entries(lo, hi uint64) ([]pb.Entry, error) {
 	if lo <= offset {
 		return nil, ErrCompacted
 	}
+	// only contains dummy entries.
+	if len(ms.ents) == 1 {
+		return nil, ErrUnavailable
+	}
 	return ms.ents[lo-offset : hi-offset], nil
 }