Browse Source

raft: Introduce CommittedEntries pagination

The MaxSizePerMsg setting is now used to limit the size of
Ready.CommittedEntries. This prevents out-of-memory errors if the raft
log has become very large and commits all at once.
Ben Darnell 7 years ago
parent
commit
0a670b7c9b
4 changed files with 76 additions and 6 deletions
  1. 15 5
      raft/log.go
  2. 9 0
      raft/node.go
  3. 51 0
      raft/node_test.go
  4. 1 1
      raft/raft.go

+ 15 - 5
raft/log.go

@@ -38,17 +38,27 @@ type raftLog struct {
 	applied uint64
 
 	logger Logger
+
+	maxMsgSize uint64
 }
 
-// newLog returns log using the given storage. It recovers the log to the state
-// that it just commits and applies the latest snapshot.
+// newLog returns log using the given storage and default options. It
+// recovers the log to the state that it just commits and applies the
+// latest snapshot.
 func newLog(storage Storage, logger Logger) *raftLog {
+	return newLogWithSize(storage, logger, noLimit)
+}
+
+// newLogWithSize returns a log using the given storage and max
+// message size.
+func newLogWithSize(storage Storage, logger Logger, maxMsgSize uint64) *raftLog {
 	if storage == nil {
 		log.Panic("storage must not be nil")
 	}
 	log := &raftLog{
-		storage: storage,
-		logger:  logger,
+		storage:    storage,
+		logger:     logger,
+		maxMsgSize: maxMsgSize,
 	}
 	firstIndex, err := storage.FirstIndex()
 	if err != nil {
@@ -139,7 +149,7 @@ func (l *raftLog) unstableEntries() []pb.Entry {
 func (l *raftLog) nextEnts() (ents []pb.Entry) {
 	off := max(l.applied+1, l.firstIndex())
 	if l.committed+1 > off {
-		ents, err := l.slice(off, l.committed+1, noLimit)
+		ents, err := l.slice(off, l.committed+1, l.maxMsgSize)
 		if err != nil {
 			l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
 		}

+ 9 - 0
raft/node.go

@@ -559,6 +559,15 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
 	}
 	if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
 		rd.HardState = hardSt
+		// If we hit a size limit when loadaing CommittedEntries, clamp
+		// our HardState.Commit to what we're actually returning. This is
+		// also used as our cursor to resume for the next Ready batch.
+		if len(rd.CommittedEntries) > 0 {
+			lastCommit := rd.CommittedEntries[len(rd.CommittedEntries)-1]
+			if rd.HardState.Commit > lastCommit.Index {
+				rd.HardState.Commit = lastCommit.Index
+			}
+		}
 	}
 	if r.raftLog.unstable.snapshot != nil {
 		rd.Snapshot = *r.raftLog.unstable.snapshot

+ 51 - 0
raft/node_test.go

@@ -26,6 +26,19 @@ import (
 	"github.com/coreos/etcd/raft/raftpb"
 )
 
+// readyWithTimeout selects from n.Ready() with a 1-second timeout. It
+// panics on timeout, which is better than the indefinite wait that
+// would occur if this channel were read without being wrapped in a
+// select.
+func readyWithTimeout(n Node) Ready {
+	select {
+	case rd := <-n.Ready():
+		return rd
+	case <-time.After(time.Second):
+		panic("timed out waiting for ready")
+	}
+}
+
 // TestNodeStep ensures that node.Step sends msgProp to propc chan
 // and other kinds of messages to recvc chan.
 func TestNodeStep(t *testing.T) {
@@ -875,3 +888,41 @@ func TestAppendPagination(t *testing.T) {
 		t.Error("didn't see any messages more than half the max size; something is wrong with this test")
 	}
 }
+
+func TestCommitPagination(t *testing.T) {
+	s := NewMemoryStorage()
+	cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
+	cfg.MaxSizePerMsg = 2048
+	r := newRaft(cfg)
+	n := newNode()
+	go n.run(r)
+	n.Campaign(context.TODO())
+
+	rd := readyWithTimeout(&n)
+	if len(rd.CommittedEntries) != 1 {
+		t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries))
+	}
+	s.Append(rd.Entries)
+	n.Advance()
+
+	blob := []byte(strings.Repeat("a", 1000))
+	for i := 0; i < 3; i++ {
+		if err := n.Propose(context.TODO(), blob); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// The 3 proposals will commit in two batches.
+	rd = readyWithTimeout(&n)
+	if len(rd.CommittedEntries) != 2 {
+		t.Fatalf("expected 2 entries in first batch, got %d", len(rd.CommittedEntries))
+	}
+	s.Append(rd.Entries)
+	n.Advance()
+	rd = readyWithTimeout(&n)
+	if len(rd.CommittedEntries) != 1 {
+		t.Fatalf("expected 1 entry in second batch, got %d", len(rd.CommittedEntries))
+	}
+	s.Append(rd.Entries)
+	n.Advance()
+}

+ 1 - 1
raft/raft.go

@@ -301,7 +301,7 @@ func newRaft(c *Config) *raft {
 	if err := c.validate(); err != nil {
 		panic(err.Error())
 	}
-	raftlog := newLog(c.Storage, c.Logger)
+	raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxSizePerMsg)
 	hs, cs, err := c.Storage.InitialState()
 	if err != nil {
 		panic(err) // TODO(bdarnell)