浏览代码

Merge pull request #5532 from xiang90/rh

rafthttp: simplify initialization funcs
Xiang Li 9 年之前
父节点
当前提交
29d2caf14a
共有 8 个文件被更改,包括 62 次插入61 次删除
  1. 13 11
      rafthttp/peer.go
  2. 4 4
      rafthttp/pipeline.go
  3. 3 4
      rafthttp/pipeline_test.go
  4. 6 6
      rafthttp/remote.go
  5. 5 5
      rafthttp/snapshot_sender.go
  6. 24 24
      rafthttp/stream.go
  7. 5 5
      rafthttp/stream_test.go
  8. 2 2
      rafthttp/transport.go

+ 13 - 11
rafthttp/peer.go

@@ -117,14 +117,16 @@ type peer struct {
 	stopc  chan struct{}
 }
 
-func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer {
-	plog.Infof("starting peer %s...", to)
-	defer plog.Infof("started peer %s", to)
+func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer {
+	plog.Infof("starting peer %s...", peerID)
+	defer plog.Infof("started peer %s", peerID)
 
-	status := newPeerStatus(to)
+	status := newPeerStatus(peerID)
 	picker := newURLPicker(urls)
+	errorc := transport.ErrorC
+	r := transport.Raft
 	pipeline := &pipeline{
-		to:            to,
+		peerID:        peerID,
 		tr:            transport,
 		picker:        picker,
 		status:        status,
@@ -135,14 +137,14 @@ func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r
 	pipeline.start()
 
 	p := &peer{
-		id:             to,
+		id:             peerID,
 		r:              r,
 		status:         status,
 		picker:         picker,
-		msgAppV2Writer: startStreamWriter(to, status, fs, r),
-		writer:         startStreamWriter(to, status, fs, r),
+		msgAppV2Writer: startStreamWriter(peerID, status, fs, r),
+		writer:         startStreamWriter(peerID, status, fs, r),
 		pipeline:       pipeline,
-		snapSender:     newSnapshotSender(transport, picker, local, to, cid, status, r, errorc),
+		snapSender:     newSnapshotSender(transport, picker, peerID, status),
 		sendc:          make(chan raftpb.Message),
 		recvc:          make(chan raftpb.Message, recvBufSize),
 		propc:          make(chan raftpb.Message, maxPendingProposals),
@@ -181,19 +183,19 @@ func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r
 	}()
 
 	p.msgAppV2Reader = &streamReader{
+		peerID: peerID,
 		typ:    streamTypeMsgAppV2,
 		tr:     transport,
 		picker: picker,
-		to:     to,
 		status: status,
 		recvc:  p.recvc,
 		propc:  p.propc,
 	}
 	p.msgAppReader = &streamReader{
+		peerID: peerID,
 		typ:    streamTypeMessage,
 		tr:     transport,
 		picker: picker,
-		to:     to,
 		status: status,
 		recvc:  p.recvc,
 		propc:  p.propc,

+ 4 - 4
rafthttp/pipeline.go

@@ -41,7 +41,7 @@ const (
 var errStopped = errors.New("stopped")
 
 type pipeline struct {
-	to types.ID
+	peerID types.ID
 
 	tr     *Transport
 	picker *urlPicker
@@ -64,13 +64,13 @@ func (p *pipeline) start() {
 	for i := 0; i < connPerPipeline; i++ {
 		go p.handle()
 	}
-	plog.Infof("started HTTP pipelining with peer %s", p.to)
+	plog.Infof("started HTTP pipelining with peer %s", p.peerID)
 }
 
 func (p *pipeline) stop() {
 	close(p.stopc)
 	p.wg.Wait()
-	plog.Infof("stopped HTTP pipelining with peer %s", p.to)
+	plog.Infof("stopped HTTP pipelining with peer %s", p.peerID)
 }
 
 func (p *pipeline) handle() {
@@ -140,7 +140,7 @@ func (p *pipeline) post(data []byte) (err error) {
 	}
 	resp.Body.Close()
 
-	err = checkPostResponse(resp, b, req, p.to)
+	err = checkPostResponse(resp, b, req, p.peerID)
 	if err != nil {
 		p.picker.unreachable(u)
 		// errMemberRemoved is a critical error since a removed member should

+ 3 - 4
rafthttp/pipeline_test.go

@@ -301,10 +301,9 @@ func (n *nopReadCloser) Close() error               { return nil }
 
 func startTestPipeline(tr *Transport, picker *urlPicker) *pipeline {
 	p := &pipeline{
-		tr:     tr,
-		picker: picker,
-
-		to:            types.ID(1),
+		peerID:        types.ID(1),
+		tr:            tr,
+		picker:        picker,
 		status:        newPeerStatus(types.ID(1)),
 		raft:          &fakeRaft{},
 		followerStats: &stats.FollowerStats{},

+ 6 - 6
rafthttp/remote.go

@@ -25,21 +25,21 @@ type remote struct {
 	pipeline *pipeline
 }
 
-func startRemote(tr *Transport, urls types.URLs, to types.ID, r Raft, errorc chan error) *remote {
+func startRemote(tr *Transport, urls types.URLs, id types.ID) *remote {
 	picker := newURLPicker(urls)
-	status := newPeerStatus(to)
+	status := newPeerStatus(id)
 	pipeline := &pipeline{
-		to:     to,
+		peerID: id,
 		tr:     tr,
 		picker: picker,
 		status: status,
-		raft:   r,
-		errorc: errorc,
+		raft:   tr.Raft,
+		errorc: tr.ErrorC,
 	}
 	pipeline.start()
 
 	return &remote{
-		id:       to,
+		id:       id,
 		status:   status,
 		pipeline: pipeline,
 	}

+ 5 - 5
rafthttp/snapshot_sender.go

@@ -46,16 +46,16 @@ type snapshotSender struct {
 	stopc chan struct{}
 }
 
-func newSnapshotSender(tr *Transport, picker *urlPicker, from, to, cid types.ID, status *peerStatus, r Raft, errorc chan error) *snapshotSender {
+func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *peerStatus) *snapshotSender {
 	return &snapshotSender{
-		from:   from,
+		from:   tr.ID,
 		to:     to,
-		cid:    cid,
+		cid:    tr.ClusterID,
 		tr:     tr,
 		picker: picker,
 		status: status,
-		r:      r,
-		errorc: errorc,
+		r:      tr.Raft,
+		errorc: tr.ErrorC,
 		stopc:  make(chan struct{}),
 	}
 }

+ 24 - 24
rafthttp/stream.go

@@ -97,7 +97,7 @@ type outgoingConn struct {
 
 // streamWriter writes messages to the attached outgoingConn.
 type streamWriter struct {
-	id     types.ID
+	peerID types.ID
 	status *peerStatus
 	fs     *stats.FollowerStats
 	r      Raft
@@ -116,7 +116,7 @@ type streamWriter struct {
 // messages and writes to the attached outgoing connection.
 func startStreamWriter(id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
 	w := &streamWriter{
-		id:     id,
+		peerID: id,
 		status: status,
 		fs:     fs,
 		r:      r,
@@ -141,7 +141,7 @@ func (cw *streamWriter) run() {
 	tickc := time.Tick(ConnReadTimeout / 3)
 	unflushed := 0
 
-	plog.Infof("started streaming with peer %s (writer)", cw.id)
+	plog.Infof("started streaming with peer %s (writer)", cw.peerID)
 
 	for {
 		select {
@@ -151,7 +151,7 @@ func (cw *streamWriter) run() {
 			if err == nil {
 				flusher.Flush()
 				batched = 0
-				sentBytes.WithLabelValues(cw.id.String()).Add(float64(unflushed))
+				sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
 				unflushed = 0
 				continue
 			}
@@ -159,7 +159,7 @@ func (cw *streamWriter) run() {
 			cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())
 
 			cw.close()
-			plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.id, t)
+			plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
 			heartbeatc, msgc = nil, nil
 
 		case m := <-msgc:
@@ -169,7 +169,7 @@ func (cw *streamWriter) run() {
 
 				if len(msgc) == 0 || batched > streamBufSize/2 {
 					flusher.Flush()
-					sentBytes.WithLabelValues(cw.id.String()).Add(float64(unflushed))
+					sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
 					unflushed = 0
 					batched = 0
 				} else {
@@ -181,13 +181,13 @@ func (cw *streamWriter) run() {
 
 			cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
 			cw.close()
-			plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.id, t)
+			plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
 			heartbeatc, msgc = nil, nil
 			cw.r.ReportUnreachable(m.To)
 
 		case conn := <-cw.connc:
 			if cw.close() {
-				plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.id, t)
+				plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
 			}
 			t = conn.t
 			switch conn.t {
@@ -205,14 +205,14 @@ func (cw *streamWriter) run() {
 			cw.closer = conn.Closer
 			cw.working = true
 			cw.mu.Unlock()
-			plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.id, t)
+			plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
 			heartbeatc, msgc = tickc, cw.msgc
 		case <-cw.stopc:
 			if cw.close() {
-				plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.id, t)
+				plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
 			}
 			close(cw.done)
-			plog.Infof("stopped streaming with peer %s (writer)", cw.id)
+			plog.Infof("stopped streaming with peer %s (writer)", cw.peerID)
 			return
 		}
 	}
@@ -232,7 +232,7 @@ func (cw *streamWriter) close() bool {
 	}
 	cw.closer.Close()
 	if len(cw.msgc) > 0 {
-		cw.r.ReportUnreachable(uint64(cw.id))
+		cw.r.ReportUnreachable(uint64(cw.peerID))
 	}
 	cw.msgc = make(chan raftpb.Message, streamBufSize)
 	cw.working = false
@@ -256,11 +256,11 @@ func (cw *streamWriter) stop() {
 // streamReader is a long-running go-routine that dials to the remote stream
 // endpoint and reads messages from the response body returned.
 type streamReader struct {
-	typ streamType
+	peerID types.ID
+	typ    streamType
 
 	tr     *Transport
 	picker *urlPicker
-	to     types.ID
 	status *peerStatus
 	recvc  chan<- raftpb.Message
 	propc  chan<- raftpb.Message
@@ -288,7 +288,7 @@ func (r *streamReader) start() {
 
 func (cr *streamReader) run() {
 	t := cr.typ
-	plog.Infof("started streaming with peer %s (%s reader)", cr.to, t)
+	plog.Infof("started streaming with peer %s (%s reader)", cr.peerID, t)
 	for {
 		rc, err := cr.dial(t)
 		if err != nil {
@@ -297,9 +297,9 @@ func (cr *streamReader) run() {
 			}
 		} else {
 			cr.status.activate()
-			plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.to, cr.typ)
+			plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
 			err := cr.decodeLoop(rc, t)
-			plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.to, cr.typ)
+			plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
 			switch {
 			// all data is read out
 			case err == io.EOF:
@@ -315,7 +315,7 @@ func (cr *streamReader) run() {
 		case <-time.After(100 * time.Millisecond):
 		case <-cr.stopc:
 			close(cr.done)
-			plog.Infof("stopped streaming with peer %s (%s reader)", cr.to, t)
+			plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
 			return
 		}
 	}
@@ -326,7 +326,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
 	cr.mu.Lock()
 	switch t {
 	case streamTypeMsgAppV2:
-		dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.to)
+		dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
 	case streamTypeMessage:
 		dec = &messageDecoder{r: rc}
 	default:
@@ -402,7 +402,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 	req.Header.Set("X-Server-Version", version.Version)
 	req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
 	req.Header.Set("X-Etcd-Cluster-ID", cr.tr.ClusterID.String())
-	req.Header.Set("X-Raft-To", cr.to.String())
+	req.Header.Set("X-Raft-To", cr.peerID.String())
 
 	setPeerURLsHeader(req, cr.tr.URLs)
 
@@ -445,7 +445,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 	case http.StatusNotFound:
 		httputil.GracefulClose(resp)
 		cr.picker.unreachable(u)
-		return nil, fmt.Errorf("remote member %s could not recognize local member", cr.to)
+		return nil, fmt.Errorf("peer %s faild to fine local node %s", cr.peerID, cr.tr.ID)
 	case http.StatusPreconditionFailed:
 		b, err := ioutil.ReadAll(resp.Body)
 		if err != nil {
@@ -457,11 +457,11 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
 
 		switch strings.TrimSuffix(string(b), "\n") {
 		case errIncompatibleVersion.Error():
-			plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.to)
+			plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.peerID)
 			return nil, errIncompatibleVersion
 		case errClusterIDMismatch.Error():
-			plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)",
-				cr.to, resp.Header.Get("X-Etcd-Cluster-ID"), cr.tr.ClusterID)
+			plog.Errorf("request sent was ignored (cluster ID mismatch: peer[%s]=%s, local=%s)",
+				cr.peerID, resp.Header.Get("X-Etcd-Cluster-ID"), cr.tr.ClusterID)
 			return nil, errClusterIDMismatch
 		default:
 			return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b))

+ 5 - 5
rafthttp/stream_test.go

@@ -116,9 +116,9 @@ func TestStreamReaderDialRequest(t *testing.T) {
 	for i, tt := range []streamType{streamTypeMessage, streamTypeMsgAppV2} {
 		tr := &roundTripperRecorder{}
 		sr := &streamReader{
+			peerID: types.ID(2),
 			tr:     &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)},
 			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
-			to:     types.ID(2),
 		}
 		sr.dial(tt)
 
@@ -164,9 +164,9 @@ func TestStreamReaderDialResult(t *testing.T) {
 			err:    tt.err,
 		}
 		sr := &streamReader{
+			peerID: types.ID(2),
 			tr:     &Transport{streamRt: tr, ClusterID: types.ID(1)},
 			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
-			to:     types.ID(2),
 			errorc: make(chan error, 1),
 		}
 
@@ -190,9 +190,9 @@ func TestStreamReaderDialDetectUnsupport(t *testing.T) {
 			header: http.Header{},
 		}
 		sr := &streamReader{
+			peerID: types.ID(2),
 			tr:     &Transport{streamRt: tr, ClusterID: types.ID(1)},
 			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
-			to:     types.ID(2),
 		}
 
 		_, err := sr.dial(typ)
@@ -251,11 +251,11 @@ func TestStream(t *testing.T) {
 		tr := &Transport{streamRt: &http.Transport{}, ClusterID: types.ID(1)}
 
 		sr := &streamReader{
+			peerID: types.ID(2),
 			typ:    tt.t,
 			tr:     tr,
 			picker: picker,
-			to:     types.ID(2),
-			status: newPeerStatus(types.ID(1)),
+			status: newPeerStatus(types.ID(2)),
 			recvc:  recvc,
 			propc:  propc,
 		}

+ 2 - 2
rafthttp/transport.go

@@ -225,7 +225,7 @@ func (t *Transport) AddRemote(id types.ID, us []string) {
 	if err != nil {
 		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
 	}
-	t.remotes[id] = startRemote(t, urls, id, t.Raft, t.ErrorC)
+	t.remotes[id] = startRemote(t, urls, id)
 }
 
 func (t *Transport) AddPeer(id types.ID, us []string) {
@@ -243,7 +243,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
 		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
 	}
 	fs := t.LeaderStats.Follower(id.String())
-	t.peers[id] = startPeer(t, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC)
+	t.peers[id] = startPeer(t, urls, id, fs)
 	addPeerToProber(t.prober, id.String(), us)
 
 	plog.Infof("added peer %s", id)