Browse Source

Merge pull request #10063 from tschottdorf/fix-commit-pagination

raft: fix correctness bug in CommittedEntries pagination
Ben Darnell 7 years ago
parent
commit
08e88c6693
5 changed files with 188 additions and 22 deletions
  1. 20 11
      raft/node.go
  2. 71 0
      raft/node_test.go
  3. 8 11
      raft/rawnode.go
  4. 79 0
      raft/rawnode_test.go
  5. 10 0
      raft/util.go

+ 20 - 11
raft/node.go

@@ -109,6 +109,19 @@ func (rd Ready) containsUpdates() bool {
 		len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0
 }
 
+// appliedCursor extracts from the Ready the highest index the client has
+// applied (once the Ready is confirmed via Advance). If no information is
+// contained in the Ready, returns zero.
+func (rd Ready) appliedCursor() uint64 {
+	if n := len(rd.CommittedEntries); n > 0 {
+		return rd.CommittedEntries[n-1].Index
+	}
+	if index := rd.Snapshot.Metadata.Index; index > 0 {
+		return index
+	}
+	return 0
+}
+
 // Node represents a node in a raft cluster.
 type Node interface {
 	// Tick increments the internal logical clock for the Node by a single tick. Election
@@ -282,6 +295,7 @@ func (n *node) run(r *raft) {
 	var prevLastUnstablei, prevLastUnstablet uint64
 	var havePrevLastUnstablei bool
 	var prevSnapi uint64
+	var applyingToI uint64
 	var rd Ready
 
 	lead := None
@@ -381,13 +395,17 @@ func (n *node) run(r *raft) {
 			if !IsEmptySnap(rd.Snapshot) {
 				prevSnapi = rd.Snapshot.Metadata.Index
 			}
+			if index := rd.appliedCursor(); index != 0 {
+				applyingToI = index
+			}
 
 			r.msgs = nil
 			r.readStates = nil
 			advancec = n.advancec
 		case <-advancec:
-			if prevHardSt.Commit != 0 {
-				r.raftLog.appliedTo(prevHardSt.Commit)
+			if applyingToI != 0 {
+				r.raftLog.appliedTo(applyingToI)
+				applyingToI = 0
 			}
 			if havePrevLastUnstablei {
 				r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
@@ -559,15 +577,6 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
 	}
 	if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
 		rd.HardState = hardSt
-		// If we hit a size limit when loadaing CommittedEntries, clamp
-		// our HardState.Commit to what we're actually returning. This is
-		// also used as our cursor to resume for the next Ready batch.
-		if len(rd.CommittedEntries) > 0 {
-			lastCommit := rd.CommittedEntries[len(rd.CommittedEntries)-1]
-			if rd.HardState.Commit > lastCommit.Index {
-				rd.HardState.Commit = lastCommit.Index
-			}
-		}
 	}
 	if r.raftLog.unstable.snapshot != nil {
 		rd.Snapshot = *r.raftLog.unstable.snapshot

+ 71 - 0
raft/node_test.go

@@ -17,6 +17,8 @@ package raft
 import (
 	"bytes"
 	"context"
+	"fmt"
+	"math"
 	"reflect"
 	"strings"
 	"testing"
@@ -926,3 +928,72 @@ func TestCommitPagination(t *testing.T) {
 	s.Append(rd.Entries)
 	n.Advance()
 }
+
+type ignoreSizeHintMemStorage struct {
+	*MemoryStorage
+}
+
+func (s *ignoreSizeHintMemStorage) Entries(lo, hi uint64, maxSize uint64) ([]raftpb.Entry, error) {
+	return s.MemoryStorage.Entries(lo, hi, math.MaxUint64)
+}
+
+// TestNodeCommitPaginationAfterRestart regression tests a scenario in which the
+// Storage's Entries size limitation is slightly more permissive than Raft's
+// internal one. The original bug was the following:
+//
+// - node learns that index 11 (or 100, doesn't matter) is committed
+// - nextEnts returns index 1..10 in CommittedEntries due to size limiting. However,
+//   index 10 already exceeds maxBytes, due to a user-provided impl of Entries.
+// - Commit index gets bumped to 10
+// - the node persists the HardState, but crashes before applying the entries
+// - upon restart, the storage returns the same entries, but `slice` takes a different code path
+//   (since it is now called with an upper bound of 10) and removes the last entry.
+// - Raft emits a HardState with a regressing commit index.
+//
+// A simpler version of this test would have the storage return a lot less entries than dictated
+// by maxSize (for example, exactly one entry) after the restart, resulting in a larger regression.
+// This wouldn't need to exploit anything about Raft-internal code paths to fail.
+func TestNodeCommitPaginationAfterRestart(t *testing.T) {
+	s := &ignoreSizeHintMemStorage{
+		MemoryStorage: NewMemoryStorage(),
+	}
+	persistedHardState := raftpb.HardState{
+		Term:   1,
+		Vote:   1,
+		Commit: 10,
+	}
+
+	s.hardState = persistedHardState
+	s.ents = make([]raftpb.Entry, 10)
+	var size uint64
+	for i := range s.ents {
+		ent := raftpb.Entry{
+			Term:  1,
+			Index: uint64(i + 1),
+			Type:  raftpb.EntryNormal,
+			Data:  []byte("a"),
+		}
+
+		s.ents[i] = ent
+		size += uint64(ent.Size())
+	}
+
+	cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
+	// Set a MaxSizePerMsg that would suggest to Raft that the last committed entry should
+	// not be included in the initial rd.CommittedEntries. However, our storage will ignore
+	// this and *will* return it (which is how the Commit index ended up being 10 initially).
+	cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1
+
+	r := newRaft(cfg)
+	n := newNode()
+	go n.run(r)
+	defer n.Stop()
+
+	rd := readyWithTimeout(&n)
+	if !IsEmptyHardState(rd.HardState) && rd.HardState.Commit < persistedHardState.Commit {
+		t.Errorf("HardState regressed: Commit %d -> %d\nCommitting:\n%+v",
+			persistedHardState.Commit, rd.HardState.Commit,
+			DescribeEntries(rd.CommittedEntries, func(data []byte) string { return fmt.Sprintf("%q", data) }),
+		)
+	}
+}

+ 8 - 11
raft/rawnode.go

@@ -47,18 +47,15 @@ func (rn *RawNode) commitReady(rd Ready) {
 	if !IsEmptyHardState(rd.HardState) {
 		rn.prevHardSt = rd.HardState
 	}
-	if rn.prevHardSt.Commit != 0 {
-		// In most cases, prevHardSt and rd.HardState will be the same
-		// because when there are new entries to apply we just sent a
-		// HardState with an updated Commit value. However, on initial
-		// startup the two are different because we don't send a HardState
-		// until something changes, but we do send any un-applied but
-		// committed entries (and previously-committed entries may be
-		// incorporated into the snapshot, even if rd.CommittedEntries is
-		// empty). Therefore we mark all committed entries as applied
-		// whether they were included in rd.HardState or not.
-		rn.raft.raftLog.appliedTo(rn.prevHardSt.Commit)
+
+	// If entries were applied (or a snapshot), update our cursor for
+	// the next Ready. Note that if the current HardState contains a
+	// new Commit index, this does not mean that we're also applying
+	// all of the new entries due to commit pagination by size.
+	if index := rd.appliedCursor(); index > 0 {
+		rn.raft.raftLog.appliedTo(index)
 	}
+
 	if len(rd.Entries) > 0 {
 		e := rd.Entries[len(rd.Entries)-1]
 		rn.raft.raftLog.stableTo(e.Index, e.Term)

+ 79 - 0
raft/rawnode_test.go

@@ -405,3 +405,82 @@ func TestRawNodeStatus(t *testing.T) {
 		t.Errorf("expected status struct, got nil")
 	}
 }
+
+// TestRawNodeCommitPaginationAfterRestart is the RawNode version of
+// TestNodeCommitPaginationAfterRestart. The anomaly here was even worse as the
+// Raft group would forget to apply entries:
+//
+// - node learns that index 11 is committed
+// - nextEnts returns index 1..10 in CommittedEntries (but index 10 already
+//   exceeds maxBytes), which isn't noticed internally by Raft
+// - Commit index gets bumped to 10
+// - the node persists the HardState, but crashes before applying the entries
+// - upon restart, the storage returns the same entries, but `slice` takes a
+//   different code path and removes the last entry.
+// - Raft does not emit a HardState, but when the app calls Advance(), it bumps
+//   its internal applied index cursor to 10 (when it should be 9)
+// - the next Ready asks the app to apply index 11 (omitting index 10), losing a
+//    write.
+func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
+	s := &ignoreSizeHintMemStorage{
+		MemoryStorage: NewMemoryStorage(),
+	}
+	persistedHardState := raftpb.HardState{
+		Term:   1,
+		Vote:   1,
+		Commit: 10,
+	}
+
+	s.hardState = persistedHardState
+	s.ents = make([]raftpb.Entry, 10)
+	var size uint64
+	for i := range s.ents {
+		ent := raftpb.Entry{
+			Term:  1,
+			Index: uint64(i + 1),
+			Type:  raftpb.EntryNormal,
+			Data:  []byte("a"),
+		}
+
+		s.ents[i] = ent
+		size += uint64(ent.Size())
+	}
+
+	cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
+	// Set a MaxSizePerMsg that would suggest to Raft that the last committed entry should
+	// not be included in the initial rd.CommittedEntries. However, our storage will ignore
+	// this and *will* return it (which is how the Commit index ended up being 10 initially).
+	cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1
+
+	s.ents = append(s.ents, raftpb.Entry{
+		Term:  1,
+		Index: uint64(11),
+		Type:  raftpb.EntryNormal,
+		Data:  []byte("boom"),
+	})
+
+	rawNode, err := NewRawNode(cfg, []Peer{{ID: 1}})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for highestApplied := uint64(0); highestApplied != 11; {
+		rd := rawNode.Ready()
+		n := len(rd.CommittedEntries)
+		if n == 0 {
+			t.Fatalf("stopped applying entries at index %d", highestApplied)
+		}
+		if next := rd.CommittedEntries[0].Index; highestApplied != 0 && highestApplied+1 != next {
+			t.Fatalf("attempting to apply index %d after index %d, leaving a gap", next, highestApplied)
+		}
+		highestApplied = rd.CommittedEntries[n-1].Index
+		rawNode.Advance(rd)
+		rawNode.Step(raftpb.Message{
+			Type:   raftpb.MsgHeartbeat,
+			To:     1,
+			From:   1, // illegal, but we get away with it
+			Term:   1,
+			Commit: 11,
+		})
+	}
+}

+ 10 - 0
raft/util.go

@@ -113,6 +113,16 @@ func DescribeEntry(e pb.Entry, f EntryFormatter) string {
 	return fmt.Sprintf("%d/%d %s %s", e.Term, e.Index, e.Type, formatted)
 }
 
+// DescribeEntries calls DescribeEntry for each Entry, adding a newline to
+// each.
+func DescribeEntries(ents []pb.Entry, f EntryFormatter) string {
+	var buf bytes.Buffer
+	for _, e := range ents {
+		_, _ = buf.WriteString(DescribeEntry(e, f) + "\n")
+	}
+	return buf.String()
+}
+
 func limitSize(ents []pb.Entry, maxSize uint64) []pb.Entry {
 	if len(ents) == 0 {
 		return ents