Browse Source

raft: Add a test for MaxSizePerMsg feature

Ensure that this limit is respected when generating MsgApp messages.
Ben Darnell 7 years ago
parent
commit
bc14deecca
2 changed files with 53 additions and 0 deletions
  1. 44 0
      raft/node_test.go
  2. 9 0
      raft/raft_test.go

+ 44 - 0
raft/node_test.go

@@ -831,3 +831,47 @@ func TestNodeProposeAddLearnerNode(t *testing.T) {
 	close(stop)
 	<-done
 }
+
+func TestAppendPagination(t *testing.T) {
+	const maxSizePerMsg = 2048
+	n := newNetworkWithConfig(func(c *Config) {
+		c.MaxSizePerMsg = maxSizePerMsg
+	}, nil, nil, nil)
+
+	seenFullMessage := false
+	// Inspect all messages to see that we never exceed the limit, but
+	// we do see messages of larger than half the limit.
+	n.msgHook = func(m raftpb.Message) bool {
+		if m.Type == raftpb.MsgApp {
+			size := 0
+			for _, e := range m.Entries {
+				size += len(e.Data)
+			}
+			if size > maxSizePerMsg {
+				t.Errorf("sent MsgApp that is too large: %d bytes", size)
+			}
+			if size > maxSizePerMsg/2 {
+				seenFullMessage = true
+			}
+		}
+		return true
+	}
+
+	n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})
+
+	// Partition the network while we make our proposals. This forces
+	// the entries to be batched into larger messages.
+	n.isolate(1)
+	blob := []byte(strings.Repeat("a", 1000))
+	for i := 0; i < 5; i++ {
+		n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgProp, Entries: []raftpb.Entry{{Data: blob}}})
+	}
+	n.recover()
+
+	// After the partition recovers, tick the clock to wake everything
+	// back up and send the messages.
+	n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgBeat})
+	if !seenFullMessage {
+		t.Error("didn't see any messages more than half the max size; something is wrong with this test")
+	}
+}

+ 9 - 0
raft/raft_test.go

@@ -3986,6 +3986,10 @@ type network struct {
 	storage map[uint64]*MemoryStorage
 	dropm   map[connem]float64
 	ignorem map[pb.MessageType]bool
+
+	// msgHook is called for each message sent. It may inspect the
+	// message and return true to send it or false to drop it.
+	msgHook func(pb.Message) bool
 }
 
 // newNetwork initializes a network from peers.
@@ -4104,6 +4108,11 @@ func (nw *network) filter(msgs []pb.Message) []pb.Message {
 				continue
 			}
 		}
+		if nw.msgHook != nil {
+			if !nw.msgHook(m) {
+				continue
+			}
+		}
 		mm = append(mm, m)
 	}
 	return mm