Browse Source

rafthttp: use go-routine for MsgProp processing

MsgProp process is blocking when there is no leader, which blocks the peer
loop totally.
Yicheng Qin 10 years ago
parent
commit
51397a6423
3 changed files with 67 additions and 16 deletions
  1. 29 2
      rafthttp/peer.go
  2. 8 2
      rafthttp/stream.go
  3. 30 12
      rafthttp/stream_test.go

+ 29 - 2
rafthttp/peer.go

@@ -38,6 +38,13 @@ const (
 	ConnWriteTimeout = 5 * time.Second
 
 	recvBufSize = 4096
+	// maxPendingProposals holds the proposals during one leader election process.
+	// Generally one leader election takes at most 1 sec. It should have
+	// 0-2 election conflicts, and each one takes 0.5 sec.
+	// We assume the number of concurrent proposers is smaller than 4096.
+	// One client blocks on its proposal for at least 1 sec, so 4096 is enough
+	// to hold all proposals.
+	maxPendingProposals = 4096
 
 	streamApp   = "streamMsgApp"
 	streamMsg   = "streamMsg"
@@ -91,6 +98,7 @@ type peer struct {
 
 	sendc    chan raftpb.Message
 	recvc    chan raftpb.Message
+	propc    chan raftpb.Message
 	newURLsC chan types.URLs
 
 	// for testing
@@ -110,16 +118,34 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
 		pipeline:     newPipeline(tr, picker, to, cid, fs, r, errorc),
 		sendc:        make(chan raftpb.Message),
 		recvc:        make(chan raftpb.Message, recvBufSize),
+		propc:        make(chan raftpb.Message, maxPendingProposals),
 		newURLsC:     make(chan types.URLs),
 		pausec:       make(chan struct{}),
 		resumec:      make(chan struct{}),
 		stopc:        make(chan struct{}),
 		done:         make(chan struct{}),
 	}
+
+	// Use go-routine for process of MsgProp because it is
+	// blocking when there is no leader.
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		for {
+			select {
+			case mm := <-p.propc:
+				if err := r.Process(ctx, mm); err != nil {
+					log.Printf("peer: process raft message error: %v", err)
+				}
+			case <-p.stopc:
+				return
+			}
+		}
+	}()
+
 	go func() {
 		var paused bool
-		msgAppReader := startStreamReader(tr, picker, streamTypeMsgApp, local, to, cid, p.recvc)
-		reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, p.recvc)
+		msgAppReader := startStreamReader(tr, picker, streamTypeMsgApp, local, to, cid, p.recvc, p.propc)
+		reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, p.recvc, p.propc)
 		for {
 			select {
 			case m := <-p.sendc:
@@ -147,6 +173,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
 			case <-p.resumec:
 				paused = false
 			case <-p.stopc:
+				cancel()
 				p.msgAppWriter.stop()
 				p.writer.stop()
 				p.pipeline.stop()

+ 8 - 2
rafthttp/stream.go

@@ -199,6 +199,7 @@ type streamReader struct {
 	from, to types.ID
 	cid      types.ID
 	recvc    chan<- raftpb.Message
+	propc    chan<- raftpb.Message
 
 	mu         sync.Mutex
 	msgAppTerm uint64
@@ -208,7 +209,7 @@ type streamReader struct {
 	done       chan struct{}
 }
 
-func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, from, to, cid types.ID, recvc chan<- raftpb.Message) *streamReader {
+func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, from, to, cid types.ID, recvc chan<- raftpb.Message, propc chan<- raftpb.Message) *streamReader {
 	r := &streamReader{
 		tr:     tr,
 		picker: picker,
@@ -217,6 +218,7 @@ func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, fr
 		to:     to,
 		cid:    cid,
 		recvc:  recvc,
+		propc:  propc,
 		stopc:  make(chan struct{}),
 		done:   make(chan struct{}),
 	}
@@ -271,8 +273,12 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser) error {
 		case isLinkHeartbeatMessage(m):
 			// do nothing for linkHeartbeatMessage
 		default:
+			recvc := cr.recvc
+			if m.Type == raftpb.MsgProp {
+				recvc = cr.propc
+			}
 			select {
-			case cr.recvc <- m:
+			case recvc <- m:
 			default:
 				log.Printf("rafthttp: dropping %s from %x because receive buffer is blocked",
 					m.Type, m.From)

+ 30 - 12
rafthttp/stream_test.go

@@ -6,6 +6,7 @@ import (
 	"net/http/httptest"
 	"reflect"
 	"testing"
+	"time"
 
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/pkg/testutil"
@@ -154,28 +155,41 @@ func TestStreamReaderDialResult(t *testing.T) {
 // TestStream tests that streamReader and streamWriter can build stream to
 // send messages between each other.
 func TestStream(t *testing.T) {
+	recvc := make(chan raftpb.Message)
+	propc := make(chan raftpb.Message)
+	msgapp := raftpb.Message{
+		Type:    raftpb.MsgApp,
+		From:    2,
+		To:      1,
+		Term:    1,
+		LogTerm: 1,
+		Index:   3,
+		Entries: []raftpb.Entry{{Term: 1, Index: 4}},
+	}
+
 	tests := []struct {
 		t    streamType
 		term uint64
 		m    raftpb.Message
+		wc   chan raftpb.Message
 	}{
 		{
 			streamTypeMessage,
 			0,
 			raftpb.Message{Type: raftpb.MsgProp, To: 2},
+			propc,
+		},
+		{
+			streamTypeMessage,
+			0,
+			msgapp,
+			recvc,
 		},
 		{
 			streamTypeMsgApp,
 			1,
-			raftpb.Message{
-				Type:    raftpb.MsgApp,
-				From:    2,
-				To:      1,
-				Term:    1,
-				LogTerm: 1,
-				Index:   3,
-				Entries: []raftpb.Entry{{Term: 1, Index: 4}},
-			},
+			msgapp,
+			recvc,
 		},
 	}
 	for i, tt := range tests {
@@ -187,16 +201,20 @@ func TestStream(t *testing.T) {
 		defer sw.stop()
 		h.sw = sw
 
-		recvc := make(chan raftpb.Message)
 		picker := mustNewURLPicker(t, []string{srv.URL})
-		sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), recvc)
+		sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), recvc, propc)
 		defer sr.stop()
 		if tt.t == streamTypeMsgApp {
 			sr.updateMsgAppTerm(tt.term)
 		}
 
 		sw.msgc <- tt.m
-		m := <-recvc
+		var m raftpb.Message
+		select {
+		case m = <-tt.wc:
+		case <-time.After(time.Second):
+			t.Errorf("#%d: failed to receive message from the channel", i)
+		}
 		if !reflect.DeepEqual(m, tt.m) {
 			t.Errorf("#%d: message = %+v, want %+v", i, m, tt.m)
 		}