Browse Source

Merge pull request #9985 from bdarnell/flow-control

raft: Make flow control more aggressive
Xiang Li 7 years ago
parent
commit
6002bf37ba
2 changed files with 97 additions and 6 deletions
  1. +28 -6
      raft/raft.go
  2. +69 -0
      raft/raft_test.go

+ 28 - 6
raft/raft.go

@@ -441,22 +441,35 @@ func (r *raft) getProgress(id uint64) *Progress {
 	return r.learnerPrs[id]
 	return r.learnerPrs[id]
 }
 }
 
 
-// sendAppend sends RPC, with entries to the given peer.
+// sendAppend sends an append RPC with new entries (if any) and the
+// current commit index to the given peer.
 func (r *raft) sendAppend(to uint64) {
 func (r *raft) sendAppend(to uint64) {
+	r.maybeSendAppend(to, true)
+}
+
+// maybeSendAppend sends an append RPC with new entries to the given peer,
+// if necessary. Returns true if a message was sent. The sendIfEmpty
+// argument controls whether messages with no entries will be sent
+// ("empty" messages are useful to convey updated Commit indexes, but
+// are undesirable when we're sending multiple messages in a batch).
+func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
 	pr := r.getProgress(to)
 	pr := r.getProgress(to)
 	if pr.IsPaused() {
 	if pr.IsPaused() {
-		return
+		return false
 	}
 	}
 	m := pb.Message{}
 	m := pb.Message{}
 	m.To = to
 	m.To = to
 
 
 	term, errt := r.raftLog.term(pr.Next - 1)
 	term, errt := r.raftLog.term(pr.Next - 1)
 	ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
 	ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
+	if len(ents) == 0 && !sendIfEmpty {
+		return false
+	}
 
 
 	if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
 	if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
 		if !pr.RecentActive {
 		if !pr.RecentActive {
 			r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
 			r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
-			return
+			return false
 		}
 		}
 
 
 		m.Type = pb.MsgSnap
 		m.Type = pb.MsgSnap
@@ -464,7 +477,7 @@ func (r *raft) sendAppend(to uint64) {
 		if err != nil {
 		if err != nil {
 			if err == ErrSnapshotTemporarilyUnavailable {
 			if err == ErrSnapshotTemporarilyUnavailable {
 				r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
 				r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
-				return
+				return false
 			}
 			}
 			panic(err) // TODO(bdarnell)
 			panic(err) // TODO(bdarnell)
 		}
 		}
@@ -498,6 +511,7 @@ func (r *raft) sendAppend(to uint64) {
 		}
 		}
 	}
 	}
 	r.send(m)
 	r.send(m)
+	return true
 }
 }
 
 
 // sendHeartbeat sends an empty MsgApp
 // sendHeartbeat sends an empty MsgApp
@@ -1020,10 +1034,18 @@ func stepLeader(r *raft, m pb.Message) error {
 				if r.maybeCommit() {
 				if r.maybeCommit() {
 					r.bcastAppend()
 					r.bcastAppend()
 				} else if oldPaused {
 				} else if oldPaused {
-					// update() reset the wait state on this node. If we had delayed sending
-					// an update before, send it now.
+					// If we were paused before, this node may be missing the
+					// latest commit index, so send it.
 					r.sendAppend(m.From)
 					r.sendAppend(m.From)
 				}
 				}
+				// We've updated flow control information above, which may
+				// allow us to send multiple (size-limited) in-flight messages
+				// at once (such as when transitioning from probe to
+				// replicate, or when freeTo() covers multiple messages). If
+				// we have more entries to send, send as many messages as we
+				// can (without sending empty messages for the commit index).
+				for r.maybeSendAppend(m.From, false) {
+				}
 				// Transfer leadership is in progress.
 				// Transfer leadership is in progress.
 				if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
 				if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
 					r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
 					r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)

+ 69 - 0
raft/raft_test.go

@@ -20,6 +20,7 @@ import (
 	"math"
 	"math"
 	"math/rand"
 	"math/rand"
 	"reflect"
 	"reflect"
+	"strings"
 	"testing"
 	"testing"
 
 
 	pb "github.com/coreos/etcd/raft/raftpb"
 	pb "github.com/coreos/etcd/raft/raftpb"
@@ -293,6 +294,74 @@ func TestProgressPaused(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestProgressFlowControl(t *testing.T) {
+	cfg := newTestConfig(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
+	cfg.MaxInflightMsgs = 3
+	cfg.MaxSizePerMsg = 2048
+	r := newRaft(cfg)
+	r.becomeCandidate()
+	r.becomeLeader()
+
+	// Throw away all the messages relating to the initial election.
+	r.readMessages()
+
+	// While node 2 is in probe state, propose a bunch of entries.
+	r.prs[2].becomeProbe()
+	blob := []byte(strings.Repeat("a", 1000))
+	for i := 0; i < 10; i++ {
+		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}})
+	}
+
+	ms := r.readMessages()
+	// First append has two entries: the empty entry to confirm the
+	// election, and the first proposal (only one proposal gets sent
+	// because we're in probe state).
+	if len(ms) != 1 || ms[0].Type != pb.MsgApp {
+		t.Fatalf("expected 1 MsgApp, got %v", ms)
+	}
+	if len(ms[0].Entries) != 2 {
+		t.Fatalf("expected 2 entries, got %d", len(ms[0].Entries))
+	}
+	if len(ms[0].Entries[0].Data) != 0 || len(ms[0].Entries[1].Data) != 1000 {
+		t.Fatalf("unexpected entry sizes: %v", ms[0].Entries)
+	}
+
+	// When this append is acked, we change to replicate state and can
+	// send multiple messages at once.
+	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[0].Entries[1].Index})
+	ms = r.readMessages()
+	if len(ms) != 3 {
+		t.Fatalf("expected 3 messages, got %d", len(ms))
+	}
+	for i, m := range ms {
+		if m.Type != pb.MsgApp {
+			t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
+		}
+		if len(m.Entries) != 2 {
+			t.Errorf("%d: expected 2 entries, got %d", i, len(m.Entries))
+		}
+	}
+
+	// Ack all three of those messages together and get the last two
+	// messages (containing three entries).
+	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[2].Entries[1].Index})
+	ms = r.readMessages()
+	if len(ms) != 2 {
+		t.Fatalf("expected 2 messages, got %d", len(ms))
+	}
+	for i, m := range ms {
+		if m.Type != pb.MsgApp {
+			t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
+		}
+	}
+	if len(ms[0].Entries) != 2 {
+		t.Errorf("%d: expected 2 entries, got %d", 0, len(ms[0].Entries))
+	}
+	if len(ms[1].Entries) != 1 {
+		t.Errorf("%d: expected 1 entry, got %d", 1, len(ms[1].Entries))
+	}
+}
+
 func TestLeaderElection(t *testing.T) {
 func TestLeaderElection(t *testing.T) {
 	testLeaderElection(t, false)
 	testLeaderElection(t, false)
 }
 }