Browse Source

Merge pull request #10199 from tschottdorf/fix-max-uncommitted-size

raft: fix bug in unbounded log growth prevention mechanism
Tobias Schottdorf 7 years ago
parent
commit
b42b39446b
6 changed files with 65 additions and 20 deletions
  1. 1 1
      raft/node_test.go
  2. 27 9
      raft/raft.go
  3. 1 1
      raft/raft_paper_test.go
  4. 29 8
      raft/raft_test.go
  5. 1 1
      raft/rawnode_test.go
  6. 6 0
      raft/util.go

+ 1 - 1
raft/node_test.go

@@ -1006,7 +1006,7 @@ func TestNodeBoundedLogGrowthWithPartition(t *testing.T) {
 	const maxEntries = 16
 	data := []byte("testdata")
 	testEntry := raftpb.Entry{Data: data}
-	maxEntrySize := uint64(maxEntries * testEntry.Size())
+	maxEntrySize := uint64(maxEntries * PayloadSize(testEntry))
 
 	s := NewMemoryStorage()
 	cfg := newTestConfig(1, []uint64{1}, 10, 1, s)

+ 27 - 9
raft/raft.go

@@ -635,17 +635,27 @@ func (r *raft) reset(term uint64) {
 	r.readOnly = newReadOnly(r.readOnly.option)
 }
 
-func (r *raft) appendEntry(es ...pb.Entry) {
+func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
 	li := r.raftLog.lastIndex()
 	for i := range es {
 		es[i].Term = r.Term
 		es[i].Index = li + 1 + uint64(i)
 	}
+	// Track the size of this uncommitted proposal.
+	if !r.increaseUncommittedSize(es) {
+		r.logger.Debugf(
+			"%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
+			r.id,
+		)
+		// Drop the proposal.
+		return false
+	}
 	// use latest "last" index after truncate/append
 	li = r.raftLog.append(es...)
 	r.getProgress(r.id).maybeUpdate(li)
 	// Regardless of maybeCommit's return, our caller will call bcastAppend.
 	r.maybeCommit()
+	return true
 }
 
 // tickElection is run by followers and candidates after r.electionTimeout.
@@ -739,7 +749,16 @@ func (r *raft) becomeLeader() {
 	// could be expensive.
 	r.pendingConfIndex = r.raftLog.lastIndex()
 
-	r.appendEntry(pb.Entry{Data: nil})
+	emptyEnt := pb.Entry{Data: nil}
+	if !r.appendEntry(emptyEnt) {
+		// This won't happen because we just called reset() above.
+		r.logger.Panic("empty entry was dropped")
+	}
+	// As a special case, don't count the initial empty entry towards the
+	// uncommitted log quota. This is because we want to preserve the
+	// behavior of allowing one entry larger than quota if the current
+	// usage is zero.
+	r.reduceUncommittedSize([]pb.Entry{emptyEnt})
 	r.logger.Infof("%x became leader at term %d", r.id, r.Term)
 }
 
@@ -970,10 +989,6 @@ func stepLeader(r *raft, m pb.Message) error {
 			r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
 			return ErrProposalDropped
 		}
-		if !r.increaseUncommittedSize(m.Entries) {
-			r.logger.Debugf("%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id)
-			return ErrProposalDropped
-		}
 
 		for i, e := range m.Entries {
 			if e.Type == pb.EntryConfChange {
@@ -986,7 +1001,10 @@ func stepLeader(r *raft, m pb.Message) error {
 				}
 			}
 		}
-		r.appendEntry(m.Entries...)
+
+		if !r.appendEntry(m.Entries...) {
+			return ErrProposalDropped
+		}
 		r.bcastAppend()
 		return nil
 	case pb.MsgReadIndex:
@@ -1490,7 +1508,7 @@ func (r *raft) abortLeaderTransfer() {
 func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
 	var s uint64
 	for _, e := range ents {
-		s += uint64(e.Size())
+		s += uint64(PayloadSize(e))
 	}
 
 	if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
@@ -1513,7 +1531,7 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) {
 
 	var s uint64
 	for _, e := range ents {
-		s += uint64(e.Size())
+		s += uint64(PayloadSize(e))
 	}
 	if s > r.uncommittedSize {
 		// uncommittedSize may underestimate the size of the uncommitted Raft

+ 1 - 1
raft/raft_paper_test.go

@@ -114,7 +114,7 @@ func TestLeaderBcastBeat(t *testing.T) {
 	r.becomeCandidate()
 	r.becomeLeader()
 	for i := 0; i < 10; i++ {
-		r.appendEntry(pb.Entry{Index: uint64(i) + 1})
+		mustAppendEntry(r, pb.Entry{Index: uint64(i) + 1})
 	}
 
 	for i := 0; i < hi; i++ {

+ 29 - 8
raft/raft_test.go

@@ -37,6 +37,12 @@ func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) {
 	return ents
 }
 
+func mustAppendEntry(r *raft, ents ...pb.Entry) {
+	if !r.appendEntry(ents...) {
+		panic("entry unexpectedly dropped")
+	}
+}
+
 type stateMachine interface {
 	Step(m pb.Message) error
 	readMessages() []pb.Message
@@ -363,15 +369,24 @@ func TestProgressFlowControl(t *testing.T) {
 }
 
 func TestUncommittedEntryLimit(t *testing.T) {
-	const maxEntries = 16
+	// Use a relatively large number of entries here to prevent regression of a
+	// bug which computed the size before it was fixed. This test would fail
+	// with the bug, either because we'd get dropped proposals earlier than we
+	// expect them, or because the final tally ends up nonzero. (At the time of
+	// writing, the former).
+	const maxEntries = 1024
 	testEntry := pb.Entry{Data: []byte("testdata")}
-	maxEntrySize := maxEntries * testEntry.Size()
+	maxEntrySize := maxEntries * PayloadSize(testEntry)
 
 	cfg := newTestConfig(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage())
 	cfg.MaxUncommittedEntriesSize = uint64(maxEntrySize)
+	cfg.MaxInflightMsgs = 2 * 1024 // avoid interference
 	r := newRaft(cfg)
 	r.becomeCandidate()
 	r.becomeLeader()
+	if n := r.uncommittedSize; n != 0 {
+		t.Fatalf("expected zero uncommitted size, got %d bytes", n)
+	}
 
 	// Set the two followers to the replicate state. Commit to tail of log.
 	const numFollowers = 2
@@ -401,6 +416,9 @@ func TestUncommittedEntryLimit(t *testing.T) {
 		t.Fatalf("expected %d messages, got %d", e, len(ms))
 	}
 	r.reduceUncommittedSize(propEnts)
+	if r.uncommittedSize != 0 {
+		t.Fatalf("committed everything, but still tracking %d", r.uncommittedSize)
+	}
 
 	// Send a single large proposal to r1. Should be accepted even though it
 	// pushes us above the limit because we were beneath it before the proposal.
@@ -425,6 +443,9 @@ func TestUncommittedEntryLimit(t *testing.T) {
 		t.Fatalf("expected %d messages, got %d", e, len(ms))
 	}
 	r.reduceUncommittedSize(propEnts)
+	if n := r.uncommittedSize; n != 0 {
+		t.Fatalf("expected zero uncommitted size, got %d", n)
+	}
 }
 
 func TestLeaderElection(t *testing.T) {
@@ -2585,7 +2606,7 @@ func TestBcastBeat(t *testing.T) {
 	sm.becomeCandidate()
 	sm.becomeLeader()
 	for i := 0; i < 10; i++ {
-		sm.appendEntry(pb.Entry{Index: uint64(i) + 1})
+		mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1})
 	}
 	// slow follower
 	sm.prs[2].Match, sm.prs[2].Next = 5, 6
@@ -2709,7 +2730,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
 			// we expect that raft will only send out one msgAPP on the first
 			// loop. After that, the follower is paused until a heartbeat response is
 			// received.
-			r.appendEntry(pb.Entry{Data: []byte("somedata")})
+			mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
 			r.sendAppend(2)
 			msg := r.readMessages()
 			if len(msg) != 1 {
@@ -2724,7 +2745,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
 			t.Errorf("paused = %v, want true", r.prs[2].Paused)
 		}
 		for j := 0; j < 10; j++ {
-			r.appendEntry(pb.Entry{Data: []byte("somedata")})
+			mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
 			r.sendAppend(2)
 			if l := len(r.readMessages()); l != 0 {
 				t.Errorf("len(msg) = %d, want %d", l, 0)
@@ -2771,7 +2792,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) {
 	r.prs[2].becomeReplicate()
 
 	for i := 0; i < 10; i++ {
-		r.appendEntry(pb.Entry{Data: []byte("somedata")})
+		mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
 		r.sendAppend(2)
 		msgs := r.readMessages()
 		if len(msgs) != 1 {
@@ -2788,7 +2809,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) {
 	r.prs[2].becomeSnapshot(10)
 
 	for i := 0; i < 10; i++ {
-		r.appendEntry(pb.Entry{Data: []byte("somedata")})
+		mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
 		r.sendAppend(2)
 		msgs := r.readMessages()
 		if len(msgs) != 0 {
@@ -3182,7 +3203,7 @@ func TestNewLeaderPendingConfig(t *testing.T) {
 	for i, tt := range tests {
 		r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
 		if tt.addEntry {
-			r.appendEntry(pb.Entry{Type: pb.EntryNormal})
+			mustAppendEntry(r, pb.Entry{Type: pb.EntryNormal})
 		}
 		r.becomeCandidate()
 		r.becomeLeader()

+ 1 - 1
raft/rawnode_test.go

@@ -493,7 +493,7 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) {
 	const maxEntries = 16
 	data := []byte("testdata")
 	testEntry := raftpb.Entry{Data: data}
-	maxEntrySize := uint64(maxEntries * testEntry.Size())
+	maxEntrySize := uint64(maxEntries * PayloadSize(testEntry))
 
 	s := NewMemoryStorage()
 	cfg := newTestConfig(1, []uint64{1}, 10, 1, s)

+ 6 - 0
raft/util.go

@@ -101,6 +101,12 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string {
 	return buf.String()
 }
 
+// PayloadSize is the size of the payload of this Entry. Notably, it does not
+// depend on its Index or Term.
+func PayloadSize(e pb.Entry) int {
+	return len(e.Data)
+}
+
 // DescribeEntry returns a concise human-readable description of an
 // Entry for debugging.
 func DescribeEntry(e pb.Entry, f EntryFormatter) string {