Selaa lähdekoodia

raft: fix bug in unbounded log growth prevention mechanism

The previous code was using the proto-generated `Size()` method to
track the size of an incoming proposal at the leader. This includes
the Index and Term, which were mutated after the call to `Size()`
when appending to the log. Additionally, it was not taking into
account that an ignored configuration change would ignore the
original proposal and append an empty entry instead.

As a result, a fully committed Raft group could end up with a non-
zero tracked uncommitted Raft log counter that would eventually hit
the ceiling and drop all future proposals indiscriminately. It would
also immediately imply that proposals exceeding the threshold alone
would get refused (as the "first uncommitted proposal" gets special
treatment and is always allowed in).

Track only the size of the payload actually appended to the Raft log
instead.

For context, see:
https://github.com/cockroachdb/cockroach/issues/31618#issuecomment-431374938
Tobias Schottdorf 7 vuotta sitten
vanhempi
commit
ad49c8fd98
6 muutettua tiedostoa jossa 65 lisäystä ja 20 poistoa
  1. 1 1
      raft/node_test.go
  2. 27 9
      raft/raft.go
  3. 1 1
      raft/raft_paper_test.go
  4. 29 8
      raft/raft_test.go
  5. 1 1
      raft/rawnode_test.go
  6. 6 0
      raft/util.go

+ 1 - 1
raft/node_test.go

@@ -1006,7 +1006,7 @@ func TestNodeBoundedLogGrowthWithPartition(t *testing.T) {
 	const maxEntries = 16
 	const maxEntries = 16
 	data := []byte("testdata")
 	data := []byte("testdata")
 	testEntry := raftpb.Entry{Data: data}
 	testEntry := raftpb.Entry{Data: data}
-	maxEntrySize := uint64(maxEntries * testEntry.Size())
+	maxEntrySize := uint64(maxEntries * PayloadSize(testEntry))
 
 
 	s := NewMemoryStorage()
 	s := NewMemoryStorage()
 	cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
 	cfg := newTestConfig(1, []uint64{1}, 10, 1, s)

+ 27 - 9
raft/raft.go

@@ -635,17 +635,27 @@ func (r *raft) reset(term uint64) {
 	r.readOnly = newReadOnly(r.readOnly.option)
 	r.readOnly = newReadOnly(r.readOnly.option)
 }
 }
 
 
-func (r *raft) appendEntry(es ...pb.Entry) {
+func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
 	li := r.raftLog.lastIndex()
 	li := r.raftLog.lastIndex()
 	for i := range es {
 	for i := range es {
 		es[i].Term = r.Term
 		es[i].Term = r.Term
 		es[i].Index = li + 1 + uint64(i)
 		es[i].Index = li + 1 + uint64(i)
 	}
 	}
+	// Track the size of this uncommitted proposal.
+	if !r.increaseUncommittedSize(es) {
+		r.logger.Debugf(
+			"%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
+			r.id,
+		)
+		// Drop the proposal.
+		return false
+	}
 	// use latest "last" index after truncate/append
 	// use latest "last" index after truncate/append
 	li = r.raftLog.append(es...)
 	li = r.raftLog.append(es...)
 	r.getProgress(r.id).maybeUpdate(li)
 	r.getProgress(r.id).maybeUpdate(li)
 	// Regardless of maybeCommit's return, our caller will call bcastAppend.
 	// Regardless of maybeCommit's return, our caller will call bcastAppend.
 	r.maybeCommit()
 	r.maybeCommit()
+	return true
 }
 }
 
 
 // tickElection is run by followers and candidates after r.electionTimeout.
 // tickElection is run by followers and candidates after r.electionTimeout.
@@ -739,7 +749,16 @@ func (r *raft) becomeLeader() {
 	// could be expensive.
 	// could be expensive.
 	r.pendingConfIndex = r.raftLog.lastIndex()
 	r.pendingConfIndex = r.raftLog.lastIndex()
 
 
-	r.appendEntry(pb.Entry{Data: nil})
+	emptyEnt := pb.Entry{Data: nil}
+	if !r.appendEntry(emptyEnt) {
+		// This won't happen because we just called reset() above.
+		r.logger.Panic("empty entry was dropped")
+	}
+	// As a special case, don't count the initial empty entry towards the
+	// uncommitted log quota. This is because we want to preserve the
+	// behavior of allowing one entry larger than quota if the current
+	// usage is zero.
+	r.reduceUncommittedSize([]pb.Entry{emptyEnt})
 	r.logger.Infof("%x became leader at term %d", r.id, r.Term)
 	r.logger.Infof("%x became leader at term %d", r.id, r.Term)
 }
 }
 
 
@@ -970,10 +989,6 @@ func stepLeader(r *raft, m pb.Message) error {
 			r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
 			r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
 			return ErrProposalDropped
 			return ErrProposalDropped
 		}
 		}
-		if !r.increaseUncommittedSize(m.Entries) {
-			r.logger.Debugf("%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id)
-			return ErrProposalDropped
-		}
 
 
 		for i, e := range m.Entries {
 		for i, e := range m.Entries {
 			if e.Type == pb.EntryConfChange {
 			if e.Type == pb.EntryConfChange {
@@ -986,7 +1001,10 @@ func stepLeader(r *raft, m pb.Message) error {
 				}
 				}
 			}
 			}
 		}
 		}
-		r.appendEntry(m.Entries...)
+
+		if !r.appendEntry(m.Entries...) {
+			return ErrProposalDropped
+		}
 		r.bcastAppend()
 		r.bcastAppend()
 		return nil
 		return nil
 	case pb.MsgReadIndex:
 	case pb.MsgReadIndex:
@@ -1490,7 +1508,7 @@ func (r *raft) abortLeaderTransfer() {
 func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
 func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
 	var s uint64
 	var s uint64
 	for _, e := range ents {
 	for _, e := range ents {
-		s += uint64(e.Size())
+		s += uint64(PayloadSize(e))
 	}
 	}
 
 
 	if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
 	if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
@@ -1513,7 +1531,7 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) {
 
 
 	var s uint64
 	var s uint64
 	for _, e := range ents {
 	for _, e := range ents {
-		s += uint64(e.Size())
+		s += uint64(PayloadSize(e))
 	}
 	}
 	if s > r.uncommittedSize {
 	if s > r.uncommittedSize {
 		// uncommittedSize may underestimate the size of the uncommitted Raft
 		// uncommittedSize may underestimate the size of the uncommitted Raft

+ 1 - 1
raft/raft_paper_test.go

@@ -114,7 +114,7 @@ func TestLeaderBcastBeat(t *testing.T) {
 	r.becomeCandidate()
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.becomeLeader()
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
-		r.appendEntry(pb.Entry{Index: uint64(i) + 1})
+		mustAppendEntry(r, pb.Entry{Index: uint64(i) + 1})
 	}
 	}
 
 
 	for i := 0; i < hi; i++ {
 	for i := 0; i < hi; i++ {

+ 29 - 8
raft/raft_test.go

@@ -37,6 +37,12 @@ func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) {
 	return ents
 	return ents
 }
 }
 
 
+func mustAppendEntry(r *raft, ents ...pb.Entry) {
+	if !r.appendEntry(ents...) {
+		panic("entry unexpectedly dropped")
+	}
+}
+
 type stateMachine interface {
 type stateMachine interface {
 	Step(m pb.Message) error
 	Step(m pb.Message) error
 	readMessages() []pb.Message
 	readMessages() []pb.Message
@@ -363,15 +369,24 @@ func TestProgressFlowControl(t *testing.T) {
 }
 }
 
 
 func TestUncommittedEntryLimit(t *testing.T) {
 func TestUncommittedEntryLimit(t *testing.T) {
-	const maxEntries = 16
+	// Use a relatively large number of entries here to prevent regression of a
+	// bug which computed the size before it was fixed. This test would fail
+	// with the bug, either because we'd get dropped proposals earlier than we
+	// expect them, or because the final tally ends up nonzero. (At the time of
+	// writing, the former).
+	const maxEntries = 1024
 	testEntry := pb.Entry{Data: []byte("testdata")}
 	testEntry := pb.Entry{Data: []byte("testdata")}
-	maxEntrySize := maxEntries * testEntry.Size()
+	maxEntrySize := maxEntries * PayloadSize(testEntry)
 
 
 	cfg := newTestConfig(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage())
 	cfg := newTestConfig(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage())
 	cfg.MaxUncommittedEntriesSize = uint64(maxEntrySize)
 	cfg.MaxUncommittedEntriesSize = uint64(maxEntrySize)
+	cfg.MaxInflightMsgs = 2 * 1024 // avoid interference
 	r := newRaft(cfg)
 	r := newRaft(cfg)
 	r.becomeCandidate()
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.becomeLeader()
+	if n := r.uncommittedSize; n != 0 {
+		t.Fatalf("expected zero uncommitted size, got %d bytes", n)
+	}
 
 
 	// Set the two followers to the replicate state. Commit to tail of log.
 	// Set the two followers to the replicate state. Commit to tail of log.
 	const numFollowers = 2
 	const numFollowers = 2
@@ -401,6 +416,9 @@ func TestUncommittedEntryLimit(t *testing.T) {
 		t.Fatalf("expected %d messages, got %d", e, len(ms))
 		t.Fatalf("expected %d messages, got %d", e, len(ms))
 	}
 	}
 	r.reduceUncommittedSize(propEnts)
 	r.reduceUncommittedSize(propEnts)
+	if r.uncommittedSize != 0 {
+		t.Fatalf("committed everything, but still tracking %d", r.uncommittedSize)
+	}
 
 
 	// Send a single large proposal to r1. Should be accepted even though it
 	// Send a single large proposal to r1. Should be accepted even though it
 	// pushes us above the limit because we were beneath it before the proposal.
 	// pushes us above the limit because we were beneath it before the proposal.
@@ -425,6 +443,9 @@ func TestUncommittedEntryLimit(t *testing.T) {
 		t.Fatalf("expected %d messages, got %d", e, len(ms))
 		t.Fatalf("expected %d messages, got %d", e, len(ms))
 	}
 	}
 	r.reduceUncommittedSize(propEnts)
 	r.reduceUncommittedSize(propEnts)
+	if n := r.uncommittedSize; n != 0 {
+		t.Fatalf("expected zero uncommitted size, got %d", n)
+	}
 }
 }
 
 
 func TestLeaderElection(t *testing.T) {
 func TestLeaderElection(t *testing.T) {
@@ -2585,7 +2606,7 @@ func TestBcastBeat(t *testing.T) {
 	sm.becomeCandidate()
 	sm.becomeCandidate()
 	sm.becomeLeader()
 	sm.becomeLeader()
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
-		sm.appendEntry(pb.Entry{Index: uint64(i) + 1})
+		mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1})
 	}
 	}
 	// slow follower
 	// slow follower
 	sm.prs[2].Match, sm.prs[2].Next = 5, 6
 	sm.prs[2].Match, sm.prs[2].Next = 5, 6
@@ -2709,7 +2730,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
 			// we expect that raft will only send out one msgAPP on the first
 			// we expect that raft will only send out one msgAPP on the first
 			// loop. After that, the follower is paused until a heartbeat response is
 			// loop. After that, the follower is paused until a heartbeat response is
 			// received.
 			// received.
-			r.appendEntry(pb.Entry{Data: []byte("somedata")})
+			mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
 			r.sendAppend(2)
 			r.sendAppend(2)
 			msg := r.readMessages()
 			msg := r.readMessages()
 			if len(msg) != 1 {
 			if len(msg) != 1 {
@@ -2724,7 +2745,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
 			t.Errorf("paused = %v, want true", r.prs[2].Paused)
 			t.Errorf("paused = %v, want true", r.prs[2].Paused)
 		}
 		}
 		for j := 0; j < 10; j++ {
 		for j := 0; j < 10; j++ {
-			r.appendEntry(pb.Entry{Data: []byte("somedata")})
+			mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
 			r.sendAppend(2)
 			r.sendAppend(2)
 			if l := len(r.readMessages()); l != 0 {
 			if l := len(r.readMessages()); l != 0 {
 				t.Errorf("len(msg) = %d, want %d", l, 0)
 				t.Errorf("len(msg) = %d, want %d", l, 0)
@@ -2771,7 +2792,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) {
 	r.prs[2].becomeReplicate()
 	r.prs[2].becomeReplicate()
 
 
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
-		r.appendEntry(pb.Entry{Data: []byte("somedata")})
+		mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
 		r.sendAppend(2)
 		r.sendAppend(2)
 		msgs := r.readMessages()
 		msgs := r.readMessages()
 		if len(msgs) != 1 {
 		if len(msgs) != 1 {
@@ -2788,7 +2809,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) {
 	r.prs[2].becomeSnapshot(10)
 	r.prs[2].becomeSnapshot(10)
 
 
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
-		r.appendEntry(pb.Entry{Data: []byte("somedata")})
+		mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
 		r.sendAppend(2)
 		r.sendAppend(2)
 		msgs := r.readMessages()
 		msgs := r.readMessages()
 		if len(msgs) != 0 {
 		if len(msgs) != 0 {
@@ -3182,7 +3203,7 @@ func TestNewLeaderPendingConfig(t *testing.T) {
 	for i, tt := range tests {
 	for i, tt := range tests {
 		r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
 		r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
 		if tt.addEntry {
 		if tt.addEntry {
-			r.appendEntry(pb.Entry{Type: pb.EntryNormal})
+			mustAppendEntry(r, pb.Entry{Type: pb.EntryNormal})
 		}
 		}
 		r.becomeCandidate()
 		r.becomeCandidate()
 		r.becomeLeader()
 		r.becomeLeader()

+ 1 - 1
raft/rawnode_test.go

@@ -493,7 +493,7 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) {
 	const maxEntries = 16
 	const maxEntries = 16
 	data := []byte("testdata")
 	data := []byte("testdata")
 	testEntry := raftpb.Entry{Data: data}
 	testEntry := raftpb.Entry{Data: data}
-	maxEntrySize := uint64(maxEntries * testEntry.Size())
+	maxEntrySize := uint64(maxEntries * PayloadSize(testEntry))
 
 
 	s := NewMemoryStorage()
 	s := NewMemoryStorage()
 	cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
 	cfg := newTestConfig(1, []uint64{1}, 10, 1, s)

+ 6 - 0
raft/util.go

@@ -101,6 +101,12 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string {
 	return buf.String()
 	return buf.String()
 }
 }
 
 
+// PayloadSize is the size of the payload of this Entry. Notably, it does not
+// depend on its Index or Term.
+func PayloadSize(e pb.Entry) int {
+	return len(e.Data)
+}
+
 // DescribeEntry returns a concise human-readable description of an
 // DescribeEntry returns a concise human-readable description of an
 // Entry for debugging.
 // Entry for debugging.
 func DescribeEntry(e pb.Entry, f EntryFormatter) string {
 func DescribeEntry(e pb.Entry, f EntryFormatter) string {