Browse Source

raft: clean up storage

Xiang Li 11 years ago
parent
commit
cbef6ab152
2 changed files with 32 additions and 8 deletions
  1. 19 8
      raft/storage.go
  2. 13 0
      raft/storage_test.go

+ 19 - 8
raft/storage.go

@@ -100,6 +100,9 @@ func (ms *MemoryStorage) Entries(lo, hi uint64) ([]pb.Entry, error) {
 	if lo <= offset {
 	if lo <= offset {
 		return nil, ErrCompacted
 		return nil, ErrCompacted
 	}
 	}
+	if hi > ms.lastIndex()+1 {
+		log.Panicf("entries's hi(%d) is out of bound lastindex(%d)", hi, ms.lastIndex())
+	}
 	// only contains dummy entries.
 	// only contains dummy entries.
 	if len(ms.ents) == 1 {
 	if len(ms.ents) == 1 {
 		return nil, ErrUnavailable
 		return nil, ErrUnavailable
@@ -122,14 +125,22 @@ func (ms *MemoryStorage) Term(i uint64) (uint64, error) {
 func (ms *MemoryStorage) LastIndex() (uint64, error) {
 func (ms *MemoryStorage) LastIndex() (uint64, error) {
 	ms.Lock()
 	ms.Lock()
 	defer ms.Unlock()
 	defer ms.Unlock()
-	return ms.ents[0].Index + uint64(len(ms.ents)) - 1, nil
+	return ms.lastIndex(), nil
+}
+
+func (ms *MemoryStorage) lastIndex() uint64 {
+	return ms.ents[0].Index + uint64(len(ms.ents)) - 1
 }
 }
 
 
 // FirstIndex implements the Storage interface.
 // FirstIndex implements the Storage interface.
 func (ms *MemoryStorage) FirstIndex() (uint64, error) {
 func (ms *MemoryStorage) FirstIndex() (uint64, error) {
 	ms.Lock()
 	ms.Lock()
 	defer ms.Unlock()
 	defer ms.Unlock()
-	return ms.ents[0].Index + 1, nil
+	return ms.firstIndex(), nil
+}
+
+func (ms *MemoryStorage) firstIndex() uint64 {
+	return ms.ents[0].Index + 1
 }
 }
 
 
 // Snapshot implements the Storage interface.
 // Snapshot implements the Storage interface.
@@ -163,8 +174,8 @@ func (ms *MemoryStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte)
 	}
 	}
 
 
 	offset := ms.ents[0].Index
 	offset := ms.ents[0].Index
-	if i > offset+uint64(len(ms.ents))-1 {
-		log.Panicf("snapshot %d is out of bound lastindex(%d)", i, offset+uint64(len(ms.ents))-1)
+	if i > ms.lastIndex() {
+		log.Panicf("snapshot %d is out of bound lastindex(%d)", i, ms.lastIndex())
 	}
 	}
 
 
 	ms.snapshot.Metadata.Index = i
 	ms.snapshot.Metadata.Index = i
@@ -184,8 +195,8 @@ func (ms *MemoryStorage) Compact(compactIndex uint64) error {
 	if compactIndex <= offset {
 	if compactIndex <= offset {
 		return ErrCompacted
 		return ErrCompacted
 	}
 	}
-	if compactIndex > offset+uint64(len(ms.ents))-1 {
-		log.Panicf("compact %d is out of bound lastindex(%d)", compactIndex, offset+uint64(len(ms.ents))-1)
+	if compactIndex > ms.lastIndex() {
+		log.Panicf("compact %d is out of bound lastindex(%d)", compactIndex, ms.lastIndex())
 	}
 	}
 
 
 	i := compactIndex - offset
 	i := compactIndex - offset
@@ -213,7 +224,7 @@ func (ms *MemoryStorage) Append(entries []pb.Entry) error {
 	if last < first {
 	if last < first {
 		return nil
 		return nil
 	}
 	}
-	// truncate old entries
+	// truncate compacted entries
 	if first > entries[0].Index {
 	if first > entries[0].Index {
 		entries = entries[first-entries[0].Index:]
 		entries = entries[first-entries[0].Index:]
 	}
 	}
@@ -227,7 +238,7 @@ func (ms *MemoryStorage) Append(entries []pb.Entry) error {
 		ms.ents = append(ms.ents, entries...)
 		ms.ents = append(ms.ents, entries...)
 	default:
 	default:
 		log.Panicf("missing log entry [last: %d, append at: %d]",
 		log.Panicf("missing log entry [last: %d, append at: %d]",
-			ms.ents[0].Index+uint64(len(ms.ents)), entries[0].Index)
+			ms.lastIndex(), entries[0].Index)
 	}
 	}
 	return nil
 	return nil
 }
 }

+ 13 - 0
raft/storage_test.go

@@ -203,6 +203,19 @@ func TestStorageAppend(t *testing.T) {
 			nil,
 			nil,
 			[]pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 5}},
 			[]pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 5}},
 		},
 		},
+		// truncate incoming entries, truncate the existing entries and append
+		{
+			[]pb.Entry{{Index: 2, Term: 3}, {Index: 3, Term: 3}, {Index: 4, Term: 5}},
+			nil,
+			[]pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 5}},
+		},
+		// tunncate the existing entries and append
+		{
+			[]pb.Entry{{Index: 4, Term: 5}},
+			nil,
+			[]pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 5}},
+		},
+		// direct append
 		{
 		{
 			[]pb.Entry{{Index: 6, Term: 5}},
 			[]pb.Entry{{Index: 6, Term: 5}},
 			nil,
 			nil,