
Merge pull request #10167 from nvanbenschoten/nvanbenschoten/limitUncommitted

raft: provide protection against unbounded Raft log growth
Xiang Li committed 7 years ago
parent commit dac8c6fcc0
10 changed files with 281 additions and 32 deletions
  1. contrib/raftexample/raft.go (+7 -6)
  2. raft/README.md (+1 -0)
  3. raft/doc.go (+3 -3)
  4. raft/node.go (+1 -0)
  5. raft/node_test.go (+54 -0)
  6. raft/raft.go (+74 -11)
  7. raft/raft_test.go (+65 -0)
  8. raft/rafttest/node.go (+14 -12)
  9. raft/rawnode.go (+1 -0)
  10. raft/rawnode_test.go (+61 -0)

+ 7 - 6
contrib/raftexample/raft.go

@@ -274,12 +274,13 @@ func (rc *raftNode) startRaft() {
 		rpeers[i] = raft.Peer{ID: uint64(i + 1)}
 	}
 	c := &raft.Config{
-		ID:              uint64(rc.id),
-		ElectionTick:    10,
-		HeartbeatTick:   1,
-		Storage:         rc.raftStorage,
-		MaxSizePerMsg:   1024 * 1024,
-		MaxInflightMsgs: 256,
+		ID:                        uint64(rc.id),
+		ElectionTick:              10,
+		HeartbeatTick:             1,
+		Storage:                   rc.raftStorage,
+		MaxSizePerMsg:             1024 * 1024,
+		MaxInflightMsgs:           256,
+		MaxUncommittedEntriesSize: 1 << 30,
 	}
 
 	if oldwal {

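With MaxUncommittedEntriesSize set (1 << 30 bytes, i.e. 1 GiB, in this example), a leader that cannot commit sheds proposals instead of memory. A minimal sketch of how an application might absorb that backpressure; the helper name, backoff policy, and import path are illustrative assumptions, not part of this change:

	package app

	import (
		"context"
		"time"

		"go.etcd.io/etcd/raft" // import path assumed from this tree
	)

	// proposeWithRetry re-submits a proposal that was dropped because the
	// leader's uncommitted log tail exceeded MaxUncommittedEntriesSize.
	// The fixed backoff is illustrative; pick a policy that fits your app.
	func proposeWithRetry(ctx context.Context, n raft.Node, data []byte) error {
		for {
			err := n.Propose(ctx, data)
			if err != raft.ErrProposalDropped {
				return err // nil on success, or an unrelated failure
			}
			select {
			case <-time.After(10 * time.Millisecond):
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}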
+ 1 - 0
raft/README.md

@@ -41,6 +41,7 @@ This raft implementation also includes a few optional enhancements:
 - Writing to leader's disk in parallel
 - Internal proposal redirection from followers to leader
 - Automatic stepping down when the leader loses quorum
+- Protection against unbounded log growth when quorum is lost
 
 ## Notable Users
 

+ 3 - 3
raft/doc.go

@@ -87,7 +87,7 @@ large).
 
 Note: Marshalling messages is not thread-safe; it is important that you
 make sure that no new entries are persisted while marshalling.
-The easiest way to achieve this is to serialise the messages directly inside
+The easiest way to achieve this is to serialize the messages directly inside
 your main raft loop.
 
 3. Apply Snapshot (if any) and CommittedEntries to the state machine.
@@ -153,7 +153,7 @@ If the proposal is committed, data will appear in committed entries with type
 raftpb.EntryNormal. There is no guarantee that a proposed command will be
 committed; you may have to re-propose after a timeout.
 
-To add or remove node in a cluster, build ConfChange struct 'cc' and call:
+To add or remove a node in a cluster, build ConfChange struct 'cc' and call:
 
 	n.ProposeConfChange(ctx, cc)
 
@@ -260,7 +260,7 @@ stale log entries:
 	'MsgPreVote' and 'MsgPreVoteResp' are used in an optional two-phase election
 	protocol. When Config.PreVote is true, a pre-election is carried out first
 	(using the same rules as a regular election), and no node increases its term
-	number unless the pre-election indicates that the campaigining node would win.
+	number unless the pre-election indicates that the campaigning node would win.
 	This minimizes disruption when a partitioned node rejoins the cluster.
 
 	'MsgSnap' requests to install a snapshot message. When a node has just

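Expanding the ProposeConfChange excerpt above into a complete call, a minimal sketch; the helper name and import paths are illustrative assumptions:

	package app

	import (
		"context"

		"go.etcd.io/etcd/raft"
		"go.etcd.io/etcd/raft/raftpb"
	)

	// addNode proposes the addition of the node with the given ID. Like any
	// proposal, the ConfChange only takes effect once it is committed and the
	// application passes it back through ApplyConfChange.
	func addNode(ctx context.Context, n raft.Node, id uint64) error {
		cc := raftpb.ConfChange{
			Type:   raftpb.ConfChangeAddNode,
			NodeID: id,
		}
		return n.ProposeConfChange(ctx, cc)
	}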
+ 1 - 0
raft/node.go

@@ -401,6 +401,7 @@ func (n *node) run(r *raft) {
 
 			r.msgs = nil
 			r.readStates = nil
+			r.reduceUncommittedSize(rd.CommittedEntries)
 			advancec = n.advancec
 		case <-advancec:
 			if applyingToI != 0 {

+ 54 - 0
raft/node_test.go

@@ -997,3 +997,57 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) {
 		)
 	}
 }
+
+// TestNodeBoundedLogGrowthWithPartition tests a scenario where a leader is
+// partitioned from a quorum of nodes. It verifies that the leader's log is
+// protected from unbounded growth even as new entries continue to be proposed.
+// This protection is provided by the MaxUncommittedEntriesSize configuration.
+func TestNodeBoundedLogGrowthWithPartition(t *testing.T) {
+	const maxEntries = 16
+	data := []byte("testdata")
+	testEntry := raftpb.Entry{Data: data}
+	maxEntrySize := uint64(maxEntries * testEntry.Size())
+
+	s := NewMemoryStorage()
+	cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
+	cfg.MaxUncommittedEntriesSize = maxEntrySize
+	r := newRaft(cfg)
+	n := newNode()
+	go n.run(r)
+	defer n.Stop()
+	n.Campaign(context.TODO())
+
+	rd := readyWithTimeout(&n)
+	if len(rd.CommittedEntries) != 1 {
+		t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries))
+	}
+	s.Append(rd.Entries)
+	n.Advance()
+
+	// Simulate a network partition while we make our proposals by never
+	// committing anything. These proposals should not cause the leader's
+	// log to grow indefinitely.
+	for i := 0; i < 1024; i++ {
+		n.Propose(context.TODO(), data)
+	}
+
+	// Check the size of leader's uncommitted log tail. It should not exceed the
+	// MaxUncommittedEntriesSize limit.
+	checkUncommitted := func(exp uint64) {
+		t.Helper()
+		if a := r.uncommittedSize; exp != a {
+			t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a)
+		}
+	}
+	checkUncommitted(maxEntrySize)
+
+	// Recover from the partition. The uncommitted tail of the Raft log should
+	// disappear as entries are committed.
+	rd = readyWithTimeout(&n)
+	if len(rd.CommittedEntries) != maxEntries {
+		t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries))
+	}
+	s.Append(rd.Entries)
+	n.Advance()
+	checkUncommitted(0)
+}
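The readyWithTimeout helper used above predates this diff; for context, a sketch of its likely shape (the one-second timeout and panic-on-timeout behavior are assumptions):

	func readyWithTimeout(n Node) Ready {
		select {
		case rd := <-n.Ready():
			return rd
		case <-time.After(time.Second):
			panic("timed out waiting for ready")
		}
	}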

+ 74 - 11
raft/raft.go

@@ -148,12 +148,17 @@ type Config struct {
 	// applied entries. This is a very application dependent configuration.
 	Applied uint64
 
-	// MaxSizePerMsg limits the max size of each append message. Smaller value
-	// lowers the raft recovery cost(initial probing and message lost during normal
-	// operation). On the other side, it might affect the throughput during normal
-	// replication. Note: math.MaxUint64 for unlimited, 0 for at most one entry per
-	// message.
+	// MaxSizePerMsg limits the max byte size of each append message. Smaller
+	// value lowers the raft recovery cost(initial probing and message lost
+	// during normal operation). On the other side, it might affect the
+	// throughput during normal replication. Note: math.MaxUint64 for unlimited,
+	// 0 for at most one entry per message.
 	MaxSizePerMsg uint64
+	// MaxUncommittedEntriesSize limits the aggregate byte size of the
+	// uncommitted entries that may be appended to a leader's log. Once this
+	// limit is exceeded, proposals will begin to return ErrProposalDropped
+	// errors. Note: 0 for no limit.
+	MaxUncommittedEntriesSize uint64
 	// MaxInflightMsgs limits the max number of in-flight append messages during
 	// optimistic replication phase. The application transportation layer usually
 	// has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid
@@ -215,6 +220,10 @@ func (c *Config) validate() error {
 		return errors.New("storage cannot be nil")
 	}
 
+	if c.MaxUncommittedEntriesSize == 0 {
+		c.MaxUncommittedEntriesSize = noLimit
+	}
+
 	if c.MaxInflightMsgs <= 0 {
 		return errors.New("max inflight messages must be greater than 0")
 	}
@@ -241,11 +250,12 @@ type raft struct {
 	// the log
 	raftLog *raftLog
 
-	maxInflight int
-	maxMsgSize  uint64
-	prs         map[uint64]*Progress
-	learnerPrs  map[uint64]*Progress
-	matchBuf    uint64Slice
+	maxMsgSize         uint64
+	maxUncommittedSize uint64
+	maxInflight        int
+	prs                map[uint64]*Progress
+	learnerPrs         map[uint64]*Progress
+	matchBuf           uint64Slice
 
 	state StateType
 
@@ -268,6 +278,10 @@ type raft struct {
 	// be proposed if the leader's applied index is greater than this
 	// value.
 	pendingConfIndex uint64
+	// an estimate of the size of the uncommitted tail of the Raft log. Used to
+	// prevent unbounded log growth. Only maintained by the leader. Reset on
+	// term changes.
+	uncommittedSize uint64
 
 	readOnly *readOnly
 
@@ -326,6 +340,7 @@ func newRaft(c *Config) *raft {
 		raftLog:                   raftlog,
 		maxMsgSize:                c.MaxSizePerMsg,
 		maxInflight:               c.MaxInflightMsgs,
+		maxUncommittedSize:        c.MaxUncommittedEntriesSize,
 		prs:                       make(map[uint64]*Progress),
 		learnerPrs:                make(map[uint64]*Progress),
 		electionTimeout:           c.ElectionTick,
@@ -514,7 +529,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
 	return true
 }
 
-// sendHeartbeat sends an empty MsgApp
+// sendHeartbeat sends a heartbeat RPC to the given peer.
 func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
 	// Attach the commit as min(to.matched, r.committed).
 	// When the leader sends out heartbeat message,
@@ -616,6 +631,7 @@ func (r *raft) reset(term uint64) {
 	})
 
 	r.pendingConfIndex = 0
+	r.uncommittedSize = 0
 	r.readOnly = newReadOnly(r.readOnly.option)
 }
 
@@ -954,6 +970,10 @@ func stepLeader(r *raft, m pb.Message) error {
 			r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
 			return ErrProposalDropped
 		}
+		if !r.increaseUncommittedSize(m.Entries) {
+			r.logger.Debugf("%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id)
+			return ErrProposalDropped
+		}
 
 		for i, e := range m.Entries {
 			if e.Type == pb.EntryConfChange {
@@ -1462,6 +1482,49 @@ func (r *raft) abortLeaderTransfer() {
 	r.leadTransferee = None
 }
 
+// increaseUncommittedSize computes the size of the proposed entries and
+// determines whether they would push leader over its maxUncommittedSize limit.
+// If the new entries would exceed the limit, the method returns false. If not,
+// the increase in uncommitted entry size is recorded and the method returns
+// true.
+func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
+	var s uint64
+	for _, e := range ents {
+		s += uint64(e.Size())
+	}
+
+	if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
+		// If the uncommitted tail of the Raft log is empty, allow any size
+		// proposal. Otherwise, limit the size of the uncommitted tail of the
+		// log and drop any proposal that would push the size over the limit.
+		return false
+	}
+	r.uncommittedSize += s
+	return true
+}
+
+// reduceUncommittedSize accounts for the newly committed entries by decreasing
+// the uncommitted entry size limit.
+func (r *raft) reduceUncommittedSize(ents []pb.Entry) {
+	if r.uncommittedSize == 0 {
+		// Fast-path for followers, who do not track or enforce the limit.
+		return
+	}
+
+	var s uint64
+	for _, e := range ents {
+		s += uint64(e.Size())
+	}
+	if s > r.uncommittedSize {
+		// uncommittedSize may underestimate the size of the uncommitted Raft
+		// log tail but will never overestimate it. Saturate at 0 instead of
+		// allowing overflow.
+		r.uncommittedSize = 0
+	} else {
+		r.uncommittedSize -= s
+	}
+}
+
 func numOfPendingConf(ents []pb.Entry) int {
 	n := 0
 	for i := range ents {

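Two asymmetries in the accounting above are easy to miss: an empty uncommitted tail admits a proposal of any size, so a single oversized entry can never wedge the leader, and the reduction saturates at zero because followers (and new leaders after a term change) may commit entries they never counted. A self-contained sketch of just that arithmetic; the names echo the diff, but the harness is illustrative:

	package main

	import "fmt"

	type uncommittedCounter struct {
		max, size uint64
	}

	// add mirrors increaseUncommittedSize: an empty tail admits any proposal;
	// a non-empty tail rejects proposals that would push it past max.
	func (c *uncommittedCounter) add(s uint64) bool {
		if c.size > 0 && c.size+s > c.max {
			return false
		}
		c.size += s
		return true
	}

	// sub mirrors reduceUncommittedSize: saturate at zero rather than
	// underflowing, since the counter may underestimate the true tail size.
	func (c *uncommittedCounter) sub(s uint64) {
		if s > c.size {
			c.size = 0
			return
		}
		c.size -= s
	}

	func main() {
		c := uncommittedCounter{max: 100}
		fmt.Println(c.add(150)) // true: empty tail admits an oversized entry
		fmt.Println(c.add(1))   // false: tail is non-empty and over the limit
		c.sub(150)              // the oversized entry commits
		fmt.Println(c.add(60))  // true: back under the limit
	}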
+ 65 - 0
raft/raft_test.go

@@ -362,6 +362,71 @@ func TestProgressFlowControl(t *testing.T) {
 	}
 }
 
+func TestUncommittedEntryLimit(t *testing.T) {
+	const maxEntries = 16
+	testEntry := pb.Entry{Data: []byte("testdata")}
+	maxEntrySize := maxEntries * testEntry.Size()
+
+	cfg := newTestConfig(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage())
+	cfg.MaxUncommittedEntriesSize = uint64(maxEntrySize)
+	r := newRaft(cfg)
+	r.becomeCandidate()
+	r.becomeLeader()
+
+	// Set the two followers to the replicate state. Commit to tail of log.
+	const numFollowers = 2
+	r.prs[2].becomeReplicate()
+	r.prs[3].becomeReplicate()
+	r.uncommittedSize = 0
+
+	// Send proposals to r1. All maxEntries proposals should be appended to the log.
+	propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{testEntry}}
+	propEnts := make([]pb.Entry, maxEntries)
+	for i := 0; i < maxEntries; i++ {
+		if err := r.Step(propMsg); err != nil {
+			t.Fatalf("proposal resulted in error: %v", err)
+		}
+		propEnts[i] = testEntry
+	}
+
+	// Send one more proposal to r1. It should be rejected.
+	if err := r.Step(propMsg); err != ErrProposalDropped {
+		t.Fatalf("proposal not dropped: %v", err)
+	}
+
+	// Read messages and reduce the uncommitted size as if we had committed
+	// these entries.
+	ms := r.readMessages()
+	if e := maxEntries * numFollowers; len(ms) != e {
+		t.Fatalf("expected %d messages, got %d", e, len(ms))
+	}
+	r.reduceUncommittedSize(propEnts)
+
+	// Send a single large proposal to r1. Should be accepted even though it
+	// pushes us above the limit because we were beneath it before the proposal.
+	propEnts = make([]pb.Entry, 2*maxEntries)
+	for i := range propEnts {
+		propEnts[i] = testEntry
+	}
+	propMsgLarge := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: propEnts}
+	if err := r.Step(propMsgLarge); err != nil {
+		t.Fatalf("proposal resulted in error: %v", err)
+	}
+
+	// Send one more proposal to r1. It should be rejected, again.
+	if err := r.Step(propMsg); err != ErrProposalDropped {
+		t.Fatalf("proposal not dropped: %v", err)
+	}
+
+	// Read messages and reduce the uncommitted size as if we had committed
+	// these entries.
+	ms = r.readMessages()
+	if e := 1 * numFollowers; len(ms) != e {
+		t.Fatalf("expected %d messages, got %d", e, len(ms))
+	}
+	r.reduceUncommittedSize(propEnts)
+}
+
 func TestLeaderElection(t *testing.T) {
 	testLeaderElection(t, false)
 }

+ 14 - 12
raft/rafttest/node.go

@@ -41,12 +41,13 @@ type node struct {
 func startNode(id uint64, peers []raft.Peer, iface iface) *node {
 	st := raft.NewMemoryStorage()
 	c := &raft.Config{
-		ID:              id,
-		ElectionTick:    10,
-		HeartbeatTick:   1,
-		Storage:         st,
-		MaxSizePerMsg:   1024 * 1024,
-		MaxInflightMsgs: 256,
+		ID:                        id,
+		ElectionTick:              10,
+		HeartbeatTick:             1,
+		Storage:                   st,
+		MaxSizePerMsg:             1024 * 1024,
+		MaxInflightMsgs:           256,
+		MaxUncommittedEntriesSize: 1 << 30,
 	}
 	rn := raft.StartNode(c, peers)
 	n := &node{
@@ -125,12 +126,13 @@ func (n *node) restart() {
 	// wait for the shutdown
 	<-n.stopc
 	c := &raft.Config{
-		ID:              n.id,
-		ElectionTick:    10,
-		HeartbeatTick:   1,
-		Storage:         n.storage,
-		MaxSizePerMsg:   1024 * 1024,
-		MaxInflightMsgs: 256,
+		ID:                        n.id,
+		ElectionTick:              10,
+		HeartbeatTick:             1,
+		Storage:                   n.storage,
+		MaxSizePerMsg:             1024 * 1024,
+		MaxInflightMsgs:           256,
+		MaxUncommittedEntriesSize: 1 << 30,
 	}
 	n.Node = raft.RestartNode(c)
 	n.start()

+ 1 - 0
raft/rawnode.go

@@ -198,6 +198,7 @@ func (rn *RawNode) Step(m pb.Message) error {
 func (rn *RawNode) Ready() Ready {
 	rd := rn.newReady()
 	rn.raft.msgs = nil
+	rn.raft.reduceUncommittedSize(rd.CommittedEntries)
 	return rd
 }
 

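Note where the reduction runs in each API: the channel-based node calls reduceUncommittedSize in its run loop (raft/node.go above), while RawNode calls it inside Ready() itself, so a conventional Ready/Advance loop needs no extra bookkeeping. A minimal sketch; the helper name, storage, and apply callback are illustrative assumptions:

	// processReady drains a RawNode through the usual Ready/Advance cycle.
	// Uncommitted-size accounting is already handled inside Ready().
	func processReady(rn *raft.RawNode, s *raft.MemoryStorage, apply func(raftpb.Entry)) {
		for rn.HasReady() {
			rd := rn.Ready()
			if err := s.Append(rd.Entries); err != nil {
				panic(err) // MemoryStorage.Append fails only on misuse
			}
			// ... send rd.Messages to peers here ...
			for _, e := range rd.CommittedEntries {
				apply(e)
			}
			rn.Advance(rd)
		}
	}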
+ 61 - 0
raft/rawnode_test.go

@@ -484,3 +484,64 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
 		})
 	}
 }
+
+// TestRawNodeBoundedLogGrowthWithPartition tests a scenario where a leader is
+// partitioned from a quorum of nodes. It verifies that the leader's log is
+// protected from unbounded growth even as new entries continue to be proposed.
+// This protection is provided by the MaxUncommittedEntriesSize configuration.
+func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) {
+	const maxEntries = 16
+	data := []byte("testdata")
+	testEntry := raftpb.Entry{Data: data}
+	maxEntrySize := uint64(maxEntries * testEntry.Size())
+
+	s := NewMemoryStorage()
+	cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
+	cfg.MaxUncommittedEntriesSize = maxEntrySize
+	rawNode, err := NewRawNode(cfg, []Peer{{ID: 1}})
+	if err != nil {
+		t.Fatal(err)
+	}
+	rd := rawNode.Ready()
+	s.Append(rd.Entries)
+	rawNode.Advance(rd)
+
+	// Become the leader.
+	rawNode.Campaign()
+	for {
+		rd = rawNode.Ready()
+		s.Append(rd.Entries)
+		if rd.SoftState.Lead == rawNode.raft.id {
+			rawNode.Advance(rd)
+			break
+		}
+		rawNode.Advance(rd)
+	}
+
+	// Simulate a network partition while we make our proposals by never
+	// committing anything. These proposals should not cause the leader's
+	// log to grow indefinitely.
+	for i := 0; i < 1024; i++ {
+		rawNode.Propose(data)
+	}
+
+	// Check the size of leader's uncommitted log tail. It should not exceed the
+	// MaxUncommittedEntriesSize limit.
+	checkUncommitted := func(exp uint64) {
+		t.Helper()
+		if a := rawNode.raft.uncommittedSize; exp != a {
+			t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a)
+		}
+	}
+	checkUncommitted(maxEntrySize)
+
+	// Recover from the partition. The uncommitted tail of the Raft log should
+	// disappear as entries are committed.
+	rd = rawNode.Ready()
+	if len(rd.CommittedEntries) != maxEntries {
+		t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries))
+	}
+	s.Append(rd.Entries)
+	rawNode.Advance(rd)
+	checkUncommitted(0)
+}