Browse Source

Merge pull request #3069 from yichengq/init-term

rafthttp: support to init term when adding peer
Yicheng Qin 10 years ago
parent
commit
2afa6688ab
4 changed files with 19 additions and 20 deletions
  1. 3 3
      rafthttp/peer.go
  2. 14 13
      rafthttp/stream.go
  3. 1 1
      rafthttp/stream_test.go
  4. 1 3
      rafthttp/transport.go

+ 3 - 3
rafthttp/peer.go

@@ -107,7 +107,7 @@ type peer struct {
 	done  chan struct{}
 }
 
-func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer {
+func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error, term uint64) *peer {
 	picker := newURLPicker(urls)
 	status := newPeerStatus(to)
 	p := &peer{
@@ -143,8 +143,8 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
 		}
 	}()
 
-	p.msgAppReader = startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc)
-	reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc)
+	p.msgAppReader = startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc, term)
+	reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc, term)
 	go func() {
 		var paused bool
 		for {

+ 14 - 13
rafthttp/stream.go

@@ -266,20 +266,21 @@ type streamReader struct {
 	done       chan struct{}
 }
 
-func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, local, remote, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error) *streamReader {
+func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, local, remote, cid types.ID, status *peerStatus, recvc chan<- raftpb.Message, propc chan<- raftpb.Message, errorc chan<- error, term uint64) *streamReader {
 	r := &streamReader{
-		tr:     tr,
-		picker: picker,
-		t:      t,
-		local:  local,
-		remote: remote,
-		cid:    cid,
-		status: status,
-		recvc:  recvc,
-		propc:  propc,
-		errorc: errorc,
-		stopc:  make(chan struct{}),
-		done:   make(chan struct{}),
+		tr:         tr,
+		picker:     picker,
+		t:          t,
+		local:      local,
+		remote:     remote,
+		cid:        cid,
+		status:     status,
+		recvc:      recvc,
+		propc:      propc,
+		errorc:     errorc,
+		msgAppTerm: term,
+		stopc:      make(chan struct{}),
+		done:       make(chan struct{}),
 	}
 	go r.run()
 	return r

+ 1 - 1
rafthttp/stream_test.go

@@ -274,7 +274,7 @@ func TestStream(t *testing.T) {
 		h.sw = sw
 
 		picker := mustNewURLPicker(t, []string{srv.URL})
-		sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), newPeerStatus(types.ID(1)), recvc, propc, nil)
+		sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), newPeerStatus(types.ID(1)), recvc, propc, nil, 1)
 		defer sr.stop()
 		if tt.t == streamTypeMsgApp {
 			sr.updateMsgAppTerm(tt.term)

+ 1 - 3
rafthttp/transport.go

@@ -194,9 +194,7 @@ func (t *transport) AddPeer(id types.ID, us []string) {
 		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
 	}
 	fs := t.leaderStats.Follower(id.String())
-	p := startPeer(t.roundTripper, urls, t.id, id, t.clusterID, t.raft, fs, t.errorc)
-	p.setTerm(t.term)
-	t.peers[id] = p
+	t.peers[id] = startPeer(t.roundTripper, urls, t.id, id, t.clusterID, t.raft, fs, t.errorc, t.term)
 }
 
 func (t *transport) RemovePeer(id types.ID) {