Browse Source

rafthttp: simplify initialization funcs

Xiang Li 9 years ago
parent
commit
c25c00fcf9
4 changed files with 23 additions and 21 deletions
  1. 13 11
      rafthttp/peer.go
  2. 3 3
      rafthttp/remote.go
  3. 5 5
      rafthttp/snapshot_sender.go
  4. 2 2
      rafthttp/transport.go

+ 13 - 11
rafthttp/peer.go

@@ -117,14 +117,16 @@ type peer struct {
 	stopc  chan struct{}
 }
 
-func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer {
-	plog.Infof("starting peer %s...", to)
-	defer plog.Infof("started peer %s", to)
+func startPeer(transport *Transport, urls types.URLs, id types.ID, fs *stats.FollowerStats) *peer {
+	plog.Infof("starting peer %s...", id)
+	defer plog.Infof("started peer %s", id)
 
-	status := newPeerStatus(to)
+	status := newPeerStatus(id)
 	picker := newURLPicker(urls)
+	errorc := transport.ErrorC
+	r := transport.Raft
 	pipeline := &pipeline{
-		to:            to,
+		to:            id,
 		tr:            transport,
 		picker:        picker,
 		status:        status,
@@ -135,14 +137,14 @@ func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r
 	pipeline.start()
 
 	p := &peer{
-		id:             to,
+		id:             id,
 		r:              r,
 		status:         status,
 		picker:         picker,
-		msgAppV2Writer: startStreamWriter(to, status, fs, r),
-		writer:         startStreamWriter(to, status, fs, r),
+		msgAppV2Writer: startStreamWriter(id, status, fs, r),
+		writer:         startStreamWriter(id, status, fs, r),
 		pipeline:       pipeline,
-		snapSender:     newSnapshotSender(transport, picker, local, to, cid, status, r, errorc),
+		snapSender:     newSnapshotSender(transport, picker, id, status),
 		sendc:          make(chan raftpb.Message),
 		recvc:          make(chan raftpb.Message, recvBufSize),
 		propc:          make(chan raftpb.Message, maxPendingProposals),
@@ -184,7 +186,7 @@ func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r
 		typ:    streamTypeMsgAppV2,
 		tr:     transport,
 		picker: picker,
-		to:     to,
+		to:     id,
 		status: status,
 		recvc:  p.recvc,
 		propc:  p.propc,
@@ -193,7 +195,7 @@ func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r
 		typ:    streamTypeMessage,
 		tr:     transport,
 		picker: picker,
-		to:     to,
+		to:     id,
 		status: status,
 		recvc:  p.recvc,
 		propc:  p.propc,

+ 3 - 3
rafthttp/remote.go

@@ -25,7 +25,7 @@ type remote struct {
 	pipeline *pipeline
 }
 
-func startRemote(tr *Transport, urls types.URLs, to types.ID, r Raft, errorc chan error) *remote {
+func startRemote(tr *Transport, urls types.URLs, to types.ID) *remote {
 	picker := newURLPicker(urls)
 	status := newPeerStatus(to)
 	pipeline := &pipeline{
@@ -33,8 +33,8 @@ func startRemote(tr *Transport, urls types.URLs, to types.ID, r Raft, errorc cha
 		tr:     tr,
 		picker: picker,
 		status: status,
-		raft:   r,
-		errorc: errorc,
+		raft:   tr.Raft,
+		errorc: tr.ErrorC,
 	}
 	pipeline.start()
 

+ 5 - 5
rafthttp/snapshot_sender.go

@@ -46,16 +46,16 @@ type snapshotSender struct {
 	stopc chan struct{}
 }
 
-func newSnapshotSender(tr *Transport, picker *urlPicker, from, to, cid types.ID, status *peerStatus, r Raft, errorc chan error) *snapshotSender {
+func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *peerStatus) *snapshotSender {
 	return &snapshotSender{
-		from:   from,
+		from:   tr.ID,
 		to:     to,
-		cid:    cid,
+		cid:    tr.ClusterID,
 		tr:     tr,
 		picker: picker,
 		status: status,
-		r:      r,
-		errorc: errorc,
+		r:      tr.Raft,
+		errorc: tr.ErrorC,
 		stopc:  make(chan struct{}),
 	}
 }

+ 2 - 2
rafthttp/transport.go

@@ -225,7 +225,7 @@ func (t *Transport) AddRemote(id types.ID, us []string) {
 	if err != nil {
 		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
 	}
-	t.remotes[id] = startRemote(t, urls, id, t.Raft, t.ErrorC)
+	t.remotes[id] = startRemote(t, urls, id)
 }
 
 func (t *Transport) AddPeer(id types.ID, us []string) {
@@ -243,7 +243,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
 		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
 	}
 	fs := t.LeaderStats.Follower(id.String())
-	t.peers[id] = startPeer(t, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC)
+	t.peers[id] = startPeer(t, urls, id, fs)
 	addPeerToProber(t.prober, id.String(), us)
 
 	plog.Infof("added peer %s", id)