Browse Source

Merge pull request #1901 from yichengq/260

rafthttp: batch MsgProp
Yicheng Qin 11 years ago
parent
commit
e89cc25c50
4 changed files with 97 additions and 35 deletions
  1. 16 12
      raft/raft.go
  2. 5 4
      raft/raft_test.go
  3. 31 0
      rafthttp/batcher.go
  4. 45 19
      rafthttp/sender.go

+ 16 - 12
raft/raft.go

@@ -298,10 +298,13 @@ func (r *raft) reset(term uint64) {
 	r.pendingConf = false
 }
 
-func (r *raft) appendEntry(e pb.Entry) {
-	e.Term = r.Term
-	e.Index = r.raftLog.lastIndex() + 1
-	r.raftLog.append(e)
+func (r *raft) appendEntry(es ...pb.Entry) {
+	li := r.raftLog.lastIndex()
+	for i := range es {
+		es[i].Term = r.Term
+		es[i].Index = li + 1 + uint64(i)
+	}
+	r.raftLog.append(es...)
 	r.prs[r.id].update(r.raftLog.lastIndex())
 	r.maybeCommit()
 }
@@ -444,17 +447,18 @@ func stepLeader(r *raft, m pb.Message) {
 	case pb.MsgBeat:
 		r.bcastHeartbeat()
 	case pb.MsgProp:
-		if len(m.Entries) != 1 {
-			panic("unexpected length(entries) of a MsgProp")
+		if len(m.Entries) == 0 {
+			log.Panicf("raft: %x stepped empty MsgProp", r.id)
 		}
-		e := m.Entries[0]
-		if e.Type == pb.EntryConfChange {
-			if r.pendingConf {
-				return
+		for i, e := range m.Entries {
+			if e.Type == pb.EntryConfChange {
+				if r.pendingConf {
+					m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
+				}
+				r.pendingConf = true
 			}
-			r.pendingConf = true
 		}
-		r.appendEntry(e)
+		r.appendEntry(m.Entries...)
 		r.bcastAppend()
 	case pb.MsgAppResp:
 		if m.Reject {

+ 5 - 4
raft/raft_test.go

@@ -1245,8 +1245,8 @@ func TestStepConfig(t *testing.T) {
 }
 
 // TestStepIgnoreConfig tests that if raft step the second msgProp in
-// EntryConfChange type when the first one is uncommitted, the node will deny
-// the proposal and keep its original state.
+// EntryConfChange type when the first one is uncommitted, the node will set
+// the proposal to noop and keep its original state.
 func TestStepIgnoreConfig(t *testing.T) {
 	// a raft that cannot make progress
 	r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
@@ -1256,8 +1256,9 @@ func TestStepIgnoreConfig(t *testing.T) {
 	index := r.raftLog.lastIndex()
 	pendingConf := r.pendingConf
 	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
-	if g := r.raftLog.lastIndex(); g != index {
-		t.Errorf("index = %d, want %d", g, index)
+	wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}}
+	if ents := r.raftLog.entries(index + 1); !reflect.DeepEqual(ents, wents) {
+		t.Errorf("ents = %+v, want %+v", ents, wents)
 	}
 	if r.pendingConf != pendingConf {
 		t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf)

+ 31 - 0
rafthttp/batcher.go

@@ -6,6 +6,10 @@ import (
 	"github.com/coreos/etcd/raft/raftpb"
 )
 
+var (
+	emptyMsgProp = raftpb.Message{Type: raftpb.MsgProp}
+)
+
 type Batcher struct {
 	batchedN int
 	batchedT time.Time
@@ -39,3 +43,30 @@ func (b *Batcher) Reset(t time.Time) {
 func canBatch(m raftpb.Message) bool {
 	return m.Type == raftpb.MsgAppResp && m.Reject == false
 }
+
+type ProposalBatcher struct {
+	*Batcher
+	raftpb.Message
+}
+
+func NewProposalBatcher(n int, d time.Duration) *ProposalBatcher {
+	return &ProposalBatcher{
+		Batcher: NewBatcher(n, d),
+		Message: emptyMsgProp,
+	}
+}
+
+func (b *ProposalBatcher) Batch(m raftpb.Message) {
+	b.Message.From = m.From
+	b.Message.To = m.To
+	b.Message.Entries = append(b.Message.Entries, m.Entries...)
+}
+
+func (b *ProposalBatcher) IsEmpty() bool {
+	return len(b.Message.Entries) == 0
+}
+
+func (b *ProposalBatcher) Reset(t time.Time) {
+	b.Batcher.Reset(t)
+	b.Message = emptyMsgProp
+}

+ 45 - 19
rafthttp/sender.go

@@ -39,6 +39,7 @@ const (
 	senderBufSize = 64
 
 	appRespBatchMs = 50
+	propBatchMs    = 10
 
 	ConnReadTimeout  = 5 * time.Second
 	ConnWriteTimeout = 5 * time.Second
@@ -66,14 +67,15 @@ type Sender interface {
 
 func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *stats.FollowerStats, shouldstop chan struct{}) *sender {
 	s := &sender{
-		tr:         tr,
-		u:          u,
-		cid:        cid,
-		p:          p,
-		fs:         fs,
-		shouldstop: shouldstop,
-		batcher:    NewBatcher(100, appRespBatchMs*time.Millisecond),
-		q:          make(chan []byte, senderBufSize),
+		tr:          tr,
+		u:           u,
+		cid:         cid,
+		p:           p,
+		fs:          fs,
+		shouldstop:  shouldstop,
+		batcher:     NewBatcher(100, appRespBatchMs*time.Millisecond),
+		propBatcher: NewProposalBatcher(100, propBatchMs*time.Millisecond),
+		q:           make(chan []byte, senderBufSize),
 	}
 	s.wg.Add(connPerSender)
 	for i := 0; i < connPerSender; i++ {
@@ -90,11 +92,12 @@ type sender struct {
 	fs         *stats.FollowerStats
 	shouldstop chan struct{}
 
-	strmCln   *streamClient
-	batcher   *Batcher
-	strmSrv   *streamServer
-	strmSrvMu sync.Mutex
-	q         chan []byte
+	strmCln     *streamClient
+	batcher     *Batcher
+	propBatcher *ProposalBatcher
+	strmSrv     *streamServer
+	strmSrvMu   sync.Mutex
+	q           chan []byte
 
 	paused bool
 	mu     sync.RWMutex
@@ -136,16 +139,37 @@ func (s *sender) Send(m raftpb.Message) error {
 		s.initStream(types.ID(m.From), types.ID(m.To), m.Term)
 		s.batcher.Reset(time.Now())
 	}
-	if canBatch(m) && s.hasStreamClient() {
-		if s.batcher.ShouldBatch(time.Now()) {
-			return nil
+
+	var err error
+	switch {
+	case isProposal(m):
+		s.propBatcher.Batch(m)
+	case canBatch(m) && s.hasStreamClient():
+		if !s.batcher.ShouldBatch(time.Now()) {
+			err = s.send(m)
+		}
+	case canUseStream(m):
+		if ok := s.tryStream(m); !ok {
+			err = s.send(m)
 		}
+	default:
+		err = s.send(m)
 	}
-	if canUseStream(m) {
-		if ok := s.tryStream(m); ok {
-			return nil
+	// send out batched MsgProp if needed
+	// TODO: it is triggered by all outcoming send now, and it needs
+	// more clear solution. Either use separate goroutine to trigger it
+	// or use streaming.
+	if !s.propBatcher.IsEmpty() {
+		t := time.Now()
+		if !s.propBatcher.ShouldBatch(t) {
+			s.send(s.propBatcher.Message)
+			s.propBatcher.Reset(t)
 		}
 	}
+	return err
+}
+
+func (s *sender) send(m raftpb.Message) error {
 	// TODO: don't block. we should be able to have 1000s
 	// of messages out at a time.
 	data := pbutil.MustMarshal(&m)
@@ -282,3 +306,5 @@ func (s *sender) post(data []byte) error {
 		return fmt.Errorf("unhandled status %s", http.StatusText(resp.StatusCode))
 	}
 }
+
+func isProposal(m raftpb.Message) bool { return m.Type == raftpb.MsgProp }