Browse Source

Merge pull request #9982 from bdarnell/pagination

raft: Introduce CommittedEntries pagination
Xiang Li 7 years ago
parent
commit
11dd0b583b
5 changed files with 129 additions and 6 deletions
  1. 15 5
      raft/log.go
  2. 9 0
      raft/node.go
  3. 95 0
      raft/node_test.go
  4. 1 1
      raft/raft.go
  5. 9 0
      raft/raft_test.go

+ 15 - 5
raft/log.go

@@ -38,17 +38,27 @@ type raftLog struct {
 	applied uint64
 
 	logger Logger
+
+	maxMsgSize uint64
 }
 
-// newLog returns log using the given storage. It recovers the log to the state
-// that it just commits and applies the latest snapshot.
+// newLog returns log using the given storage and default options. It
+// recovers the log to the state that it just commits and applies the
+// latest snapshot.
 func newLog(storage Storage, logger Logger) *raftLog {
+	return newLogWithSize(storage, logger, noLimit)
+}
+
+// newLogWithSize returns a log using the given storage and max
+// message size.
+func newLogWithSize(storage Storage, logger Logger, maxMsgSize uint64) *raftLog {
 	if storage == nil {
 		log.Panic("storage must not be nil")
 	}
 	log := &raftLog{
-		storage: storage,
-		logger:  logger,
+		storage:    storage,
+		logger:     logger,
+		maxMsgSize: maxMsgSize,
 	}
 	firstIndex, err := storage.FirstIndex()
 	if err != nil {
@@ -139,7 +149,7 @@ func (l *raftLog) unstableEntries() []pb.Entry {
 func (l *raftLog) nextEnts() (ents []pb.Entry) {
 	off := max(l.applied+1, l.firstIndex())
 	if l.committed+1 > off {
-		ents, err := l.slice(off, l.committed+1, noLimit)
+		ents, err := l.slice(off, l.committed+1, l.maxMsgSize)
 		if err != nil {
 			l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
 		}

+ 9 - 0
raft/node.go

@@ -559,6 +559,15 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
 	}
 	if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
 		rd.HardState = hardSt
+		// If we hit a size limit when loadaing CommittedEntries, clamp
+		// our HardState.Commit to what we're actually returning. This is
+		// also used as our cursor to resume for the next Ready batch.
+		if len(rd.CommittedEntries) > 0 {
+			lastCommit := rd.CommittedEntries[len(rd.CommittedEntries)-1]
+			if rd.HardState.Commit > lastCommit.Index {
+				rd.HardState.Commit = lastCommit.Index
+			}
+		}
 	}
 	if r.raftLog.unstable.snapshot != nil {
 		rd.Snapshot = *r.raftLog.unstable.snapshot

+ 95 - 0
raft/node_test.go

@@ -26,6 +26,19 @@ import (
 	"github.com/coreos/etcd/raft/raftpb"
 )
 
+// readyWithTimeout selects from n.Ready() with a 1-second timeout. It
+// panics on timeout, which is better than the indefinite wait that
+// would occur if this channel were read without being wrapped in a
+// select.
+func readyWithTimeout(n Node) Ready {
+	select {
+	case rd := <-n.Ready():
+		return rd
+	case <-time.After(time.Second):
+		panic("timed out waiting for ready")
+	}
+}
+
 // TestNodeStep ensures that node.Step sends msgProp to propc chan
 // and other kinds of messages to recvc chan.
 func TestNodeStep(t *testing.T) {
@@ -831,3 +844,85 @@ func TestNodeProposeAddLearnerNode(t *testing.T) {
 	close(stop)
 	<-done
 }
+
+func TestAppendPagination(t *testing.T) {
+	const maxSizePerMsg = 2048
+	n := newNetworkWithConfig(func(c *Config) {
+		c.MaxSizePerMsg = maxSizePerMsg
+	}, nil, nil, nil)
+
+	seenFullMessage := false
+	// Inspect all messages to see that we never exceed the limit, but
+	// we do see messages of larger than half the limit.
+	n.msgHook = func(m raftpb.Message) bool {
+		if m.Type == raftpb.MsgApp {
+			size := 0
+			for _, e := range m.Entries {
+				size += len(e.Data)
+			}
+			if size > maxSizePerMsg {
+				t.Errorf("sent MsgApp that is too large: %d bytes", size)
+			}
+			if size > maxSizePerMsg/2 {
+				seenFullMessage = true
+			}
+		}
+		return true
+	}
+
+	n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})
+
+	// Partition the network while we make our proposals. This forces
+	// the entries to be batched into larger messages.
+	n.isolate(1)
+	blob := []byte(strings.Repeat("a", 1000))
+	for i := 0; i < 5; i++ {
+		n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgProp, Entries: []raftpb.Entry{{Data: blob}}})
+	}
+	n.recover()
+
+	// After the partition recovers, tick the clock to wake everything
+	// back up and send the messages.
+	n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgBeat})
+	if !seenFullMessage {
+		t.Error("didn't see any messages more than half the max size; something is wrong with this test")
+	}
+}
+
+func TestCommitPagination(t *testing.T) {
+	s := NewMemoryStorage()
+	cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
+	cfg.MaxSizePerMsg = 2048
+	r := newRaft(cfg)
+	n := newNode()
+	go n.run(r)
+	n.Campaign(context.TODO())
+
+	rd := readyWithTimeout(&n)
+	if len(rd.CommittedEntries) != 1 {
+		t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries))
+	}
+	s.Append(rd.Entries)
+	n.Advance()
+
+	blob := []byte(strings.Repeat("a", 1000))
+	for i := 0; i < 3; i++ {
+		if err := n.Propose(context.TODO(), blob); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// The 3 proposals will commit in two batches.
+	rd = readyWithTimeout(&n)
+	if len(rd.CommittedEntries) != 2 {
+		t.Fatalf("expected 2 entries in first batch, got %d", len(rd.CommittedEntries))
+	}
+	s.Append(rd.Entries)
+	n.Advance()
+	rd = readyWithTimeout(&n)
+	if len(rd.CommittedEntries) != 1 {
+		t.Fatalf("expected 1 entry in second batch, got %d", len(rd.CommittedEntries))
+	}
+	s.Append(rd.Entries)
+	n.Advance()
+}

+ 1 - 1
raft/raft.go

@@ -302,7 +302,7 @@ func newRaft(c *Config) *raft {
 	if err := c.validate(); err != nil {
 		panic(err.Error())
 	}
-	raftlog := newLog(c.Storage, c.Logger)
+	raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxSizePerMsg)
 	hs, cs, err := c.Storage.InitialState()
 	if err != nil {
 		panic(err) // TODO(bdarnell)

+ 9 - 0
raft/raft_test.go

@@ -4055,6 +4055,10 @@ type network struct {
 	storage map[uint64]*MemoryStorage
 	dropm   map[connem]float64
 	ignorem map[pb.MessageType]bool
+
+	// msgHook is called for each message sent. It may inspect the
+	// message and return true to send it or false to drop it.
+	msgHook func(pb.Message) bool
 }
 
 // newNetwork initializes a network from peers.
@@ -4173,6 +4177,11 @@ func (nw *network) filter(msgs []pb.Message) []pb.Message {
 				continue
 			}
 		}
+		if nw.msgHook != nil {
+			if !nw.msgHook(m) {
+				continue
+			}
+		}
 		mm = append(mm, m)
 	}
 	return mm