浏览代码

etcdserver: keep a min number of entries in memory

Do not aggressively compact raft log entries. After a snapshot, the
etcd server can compact the raft log up to the snapshot index. Instead, it
compacts to an index smaller than the snapshot index to keep some entries in memory.
The leader can still read out the in-memory entries to send to a slightly
slow follower. If all the entries are compacted, the leader will send the
whole snapshot or read entries from disk if possible.
Xiang Li 11 年之前
父节点
当前提交
428b77afc3
共有 2 个文件被更改,包括 17 次插入2 次删除
  1. 9 0
      etcdserver/raft.go
  2. 8 2
      etcdserver/server.go

+ 9 - 0
etcdserver/raft.go

@@ -32,6 +32,15 @@ import (
 	"github.com/coreos/etcd/wal/walpb"
 )
 
+const (
+	// numberOfCatchUpEntries is the number of entries kept for a slow follower
+	// to catch up after compacting the raft storage entries.
+	// We expect the follower to have millisecond-level latency with the leader.
+	// The max throughput is around 10K. Keeping 5K entries is enough to help
+	// a follower catch up.
+	numberOfCatchUpEntries = 5000
+)
+
 var (
 	// indirection for expvar func interface
 	// expvar panics when publishing duplicate name

+ 8 - 2
etcdserver/server.go

@@ -836,8 +836,14 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 		if err := s.r.storage.SaveSnap(snap); err != nil {
 			log.Fatalf("etcdserver: save snapshot error: %v", err)
 		}
+		log.Printf("etcdserver: saved snapshot at index %d", snap.Metadata.Index)
 
-		err = s.r.raftStorage.Compact(snapi)
+		// keep some in memory log entries for slow followers.
+		compacti := uint64(1)
+		if snapi > numberOfCatchUpEntries {
+			compacti = snapi - numberOfCatchUpEntries
+		}
+		err = s.r.raftStorage.Compact(compacti)
 		if err != nil {
 			// the compaction was done asynchronously with the progress of raft.
 			// raft log might already have been compacted.
@@ -846,7 +852,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 			}
 			log.Panicf("etcdserver: unexpected compaction error %v", err)
 		}
-		log.Printf("etcdserver: saved snapshot at index %d", snap.Metadata.Index)
+		log.Printf("etcdserver: compacted raft log at %d", compacti)
 	}()
 }