Merge pull request #1768 from xiang90/batcher

rafthttp: add batcher
Xiang Li, 11 years ago
parent commit 68e79868cc
3 changed files with 113 additions and 1 deletion
  1. rafthttp/batcher.go (+41, -0)
  2. rafthttp/batcher_test.go (+61, -0)
  3. rafthttp/sender.go (+11, -1)

rafthttp/batcher.go (+41, -0)

@@ -0,0 +1,41 @@
+package rafthttp
+
+import (
+	"time"
+
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+type Batcher struct {
+	batchedN int
+	batchedT time.Time
+	batchN   int
+	batchD   time.Duration
+}
+
+func NewBatcher(n int, d time.Duration) *Batcher {
+	return &Batcher{
+		batchN:   n,
+		batchD:   d,
+		batchedT: time.Now(),
+	}
+}
+
+func (b *Batcher) ShouldBatch(now time.Time) bool {
+	b.batchedN++
+	batchedD := now.Sub(b.batchedT)
+	if b.batchedN >= b.batchN || batchedD >= b.batchD {
+		b.Reset(now)
+		return false
+	}
+	return true
+}
+
+func (b *Batcher) Reset(t time.Time) {
+	b.batchedN = 0
+	b.batchedT = t
+}
+
+func canBatch(m raftpb.Message) bool {
+	return m.Type == raftpb.MsgAppResp
+}

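The batcher is a simple count-or-time gate: every call to ShouldBatch counts one message, and it only returns false ("send now") once either batchN calls have accumulated or batchD has elapsed since the last reset. A minimal sketch of how a caller might drive it, using the same limits the sender configures below; the loop and the github.com/coreos/etcd/rafthttp import path are assumptions for illustration, not part of this change:

package main

import (
	"fmt"
	"time"

	"github.com/coreos/etcd/rafthttp"
)

func main() {
	// Same limits the sender uses below: flush after 100 calls or 50ms,
	// whichever comes first.
	b := rafthttp.NewBatcher(100, 50*time.Millisecond)

	sent := 0
	for i := 0; i < 1000; i++ {
		if b.ShouldBatch(time.Now()) {
			continue // coalesce: skip this message, a later one supersedes it
		}
		sent++ // would actually send here
	}
	// With fast iterations only the count limit triggers, so this prints
	// roughly 1000/100 = 10 sends.
	fmt.Println("sent", sent, "of 1000")
}
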
rafthttp/batcher_test.go (+61, -0)

@@ -0,0 +1,61 @@
+package rafthttp
+
+import (
+	"testing"
+	"time"
+)
+
+func TestBatcherNum(t *testing.T) {
+	n := 100
+	largeD := time.Minute
+	tests := []struct {
+		n         int
+		wnotbatch int
+	}{
+		{n - 1, 0},
+		{n, 1},
+		{n + 1, 1},
+		{2*n + 1, 2},
+		{3*n + 1, 3},
+	}
+
+	for i, tt := range tests {
+		b := NewBatcher(n, largeD)
+		notbatched := 0
+		for j := 0; j < tt.n; j++ {
+			if !b.ShouldBatch(time.Now()) {
+				notbatched++
+			}
+		}
+		if notbatched != tt.wnotbatch {
+			t.Errorf("#%d: notbatched = %d, want %d", i, notbatched, tt.wnotbatch)
+		}
+	}
+}
+
+func TestBatcherTime(t *testing.T) {
+	largeN := 10000
+	tests := []struct {
+		nms       int
+		wnotbatch int
+	}{
+		{0, 0},
+		{1, 1},
+		{2, 2},
+		{3, 3},
+	}
+
+	for i, tt := range tests {
+		b := NewBatcher(largeN, time.Millisecond)
+		baseT := b.batchedT
+		notbatched := 0
+		for j := 0; j < tt.nms+1; j++ {
+			if !b.ShouldBatch(baseT.Add(time.Duration(j) * time.Millisecond)) {
+				notbatched++
+			}
+		}
+		if notbatched != tt.wnotbatch {
+			t.Errorf("#%d: notbatched = %d, want %d", i, notbatched, tt.wnotbatch)
+		}
+	}
+}

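Both tests avoid sleeping: TestBatcherNum uses a one-minute window so only the count limit can trigger a flush, while TestBatcherTime feeds synthetic timestamps (baseT.Add(j*time.Millisecond)) against a very large count limit so only the time limit can. They can be run on their own with go test -run Batcher ./rafthttp (assuming the usual Go toolchain and repository layout).
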
rafthttp/sender.go (+11, -1)

@@ -33,6 +33,8 @@ import (
 const (
 	connPerSender = 4
 	senderBufSize = connPerSender * 4
+
+	appRespBatchMs = 50
 )
 
 type Sender interface {
@@ -56,6 +58,7 @@ func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *st
 		p:          p,
 		fs:         fs,
 		shouldstop: shouldstop,
+		batcher:    NewBatcher(100, appRespBatchMs*time.Millisecond),
 		q:          make(chan []byte, senderBufSize),
 	}
 	s.wg.Add(connPerSender)
@@ -74,6 +77,7 @@ type sender struct {
 	shouldstop chan struct{}
 
 	strmCln   *streamClient
+	batcher   *Batcher
 	strmSrv   *streamServer
 	strmSrvMu sync.Mutex
 	q         chan []byte
@@ -106,8 +110,14 @@ func (s *sender) Update(u string) {
 // TODO (xiangli): reasonable retry logic
 func (s *sender) Send(m raftpb.Message) error {
 	s.maybeStopStream(m.Term)
-	if !s.hasStreamClient() && shouldInitStream(m) {
+	if shouldInitStream(m) && !s.hasStreamClient() {
 		s.initStream(types.ID(m.From), types.ID(m.To), m.Term)
+		s.batcher.Reset(time.Now())
+	}
+	if canBatch(m) && s.hasStreamClient() {
+		if s.batcher.ShouldBatch(time.Now()) {
+			return nil
+		}
 	}
 	if canUseStream(m) {
 		if ok := s.tryStream(m); ok {
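
The net effect in Send: while a stream client is up, MsgAppResp messages are coalesced, with Send returning nil (dropping the message) until the batcher has seen 100 of them or 50ms has passed, at which point one is allowed through the normal path. A condensed sketch of that gate; shouldDrop is a hypothetical helper, not part of the actual change, and the real method also initializes the stream and falls back to the send queue:

package rafthttp

import (
	"time"

	"github.com/coreos/etcd/raft/raftpb"
)

// shouldDrop condenses the new gate in (*sender).Send: only MsgAppResp
// is batched (see canBatch), and only while a stream client is active.
func shouldDrop(b *Batcher, m raftpb.Message, hasStream bool) bool {
	return hasStream && canBatch(m) && b.ShouldBatch(time.Now())
}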