Переглянути джерело

raft: fix memory storage

Memory storage should append all entries that have greater index
than the snap.Matedata.Index. We first truncate the old parts of
incoming entries. Then truncate the existing entries in the storage.
At last, we append the incoming entries to the existing entries.
Xiang Li 11 роки тому
батько
коміт
312db7f0f3
1 змінених файлів з 19 додано та 5 видалено
  1. 19 5
      raft/storage.go

+ 19 - 5
raft/storage.go

@@ -179,13 +179,27 @@ func (ms *MemoryStorage) Append(entries []pb.Entry) {
 	if len(entries) == 0 {
 		return
 	}
-	offset := entries[0].Index - ms.snapshot.Metadata.Index
-	// do not append out of date entries
-	if offset < 0 {
+	first := ms.snapshot.Metadata.Index + 1
+	last := entries[0].Index + uint64(len(entries)) - 1
+
+	// shortcut if there is no new entry.
+	if last < first {
 		return
 	}
-	if uint64(len(ms.ents)) > offset {
+	// truncate old entries
+	if first > entries[0].Index {
+		entries = entries[first-entries[0].Index:]
+	}
+
+	offset := entries[0].Index - ms.snapshot.Metadata.Index
+	switch {
+	case uint64(len(ms.ents)) > offset:
 		ms.ents = append([]pb.Entry{}, ms.ents[:offset]...)
+		ms.ents = append(ms.ents, entries...)
+	case uint64(len(ms.ents)) == offset:
+		ms.ents = append(ms.ents, entries...)
+	default:
+		log.Panicf("missing log entry [last: %d, append at: %d]",
+			ms.snapshot.Metadata.Index+uint64(len(ms.ents)), entries[0].Index)
 	}
-	ms.ents = append(ms.ents, entries...)
 }