Browse Source

etcdserver: add buffer to the sender queue

Xiang Li 11 years ago
parent
commit
7c4b84a6cd
2 changed files with 15 additions and 17 deletions
  1. 2 1
      etcdserver/sender.go
  2. 13 16
      etcdserver/sender_test.go

+ 2 - 1
etcdserver/sender.go

@@ -34,6 +34,7 @@ import (
 const (
 	raftPrefix    = "/raft"
 	connPerSender = 4
+	senderBufSize = connPerSender * 4
 )
 
 type sendHub struct {
@@ -150,7 +151,7 @@ func newSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerS
 		u:          u,
 		cid:        cid,
 		fs:         fs,
-		q:          make(chan []byte),
+		q:          make(chan []byte, senderBufSize),
 		shouldstop: shouldstop,
 	}
 	s.wg.Add(connPerSender)

+ 13 - 16
etcdserver/sender_test.go

@@ -97,9 +97,6 @@ func TestSendHubShouldStop(t *testing.T) {
 	cl := newTestCluster(membs)
 	ls := stats.NewLeaderStats("")
 	h := newSendHub(tr, cl, nil, ls)
-	// wait for handle goroutines start
-	// TODO: wait for goroutines ready before return newSender
-	time.Sleep(10 * time.Millisecond)
 
 	shouldstop := h.ShouldStopNotify()
 	select {
@@ -123,9 +120,7 @@ func TestSenderSend(t *testing.T) {
 	tr := &roundTripperRecorder{}
 	fs := &stats.FollowerStats{}
 	s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
-	// wait for handle goroutines start
-	// TODO: wait for goroutines ready before return newSender
-	time.Sleep(10 * time.Millisecond)
+
 	if err := s.send([]byte("some data")); err != nil {
 		t.Fatalf("unexpect send error: %v", err)
 	}
@@ -145,22 +140,26 @@ func TestSenderExceedMaximalServing(t *testing.T) {
 	tr := newRoundTripperBlocker()
 	fs := &stats.FollowerStats{}
 	s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
-	// wait for handle goroutines start
-	// TODO: wait for goroutines ready before return newSender
-	time.Sleep(10 * time.Millisecond)
-	// It could handle that many requests at the same time.
-	for i := 0; i < connPerSender; i++ {
+
+	// keep the sender busy and make the buffer full
+	// nothing can go out as we block the sender
+	for i := 0; i < connPerSender+senderBufSize; i++ {
 		if err := s.send([]byte("some data")); err != nil {
 			t.Errorf("send err = %v, want nil", err)
 		}
+		// force the sender to grab data
+		testutil.ForceGosched()
 	}
-	// This one exceeds its maximal serving ability
+
+	// try to send a data when we are sure the buffer is full
 	if err := s.send([]byte("some data")); err == nil {
 		t.Errorf("unexpect send success")
 	}
+
+	// unblock the senders and force them to send out the data
 	tr.unblock()
-	// Make handles finish their post
 	testutil.ForceGosched()
+
 	// It could send new data after previous ones succeed
 	if err := s.send([]byte("some data")); err != nil {
 		t.Errorf("send err = %v, want nil", err)
@@ -173,9 +172,7 @@ func TestSenderExceedMaximalServing(t *testing.T) {
 func TestSenderSendFailed(t *testing.T) {
 	fs := &stats.FollowerStats{}
 	s := newSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil)
-	// wait for handle goroutines start
-	// TODO: wait for goroutines ready before return newSender
-	time.Sleep(10 * time.Millisecond)
+
 	if err := s.send([]byte("some data")); err != nil {
 		t.Fatalf("unexpect send error: %v", err)
 	}