Browse Source

rafthttp: remove unncessary go routine

Xiang Li 9 years ago
parent
commit
2a3cacb60c
1 changed files with 47 additions and 74 deletions
  1. 47 74
      rafthttp/peer.go

+ 47 - 74
rafthttp/peer.go

@@ -15,6 +15,7 @@
 package rafthttp
 
 import (
+	"sync"
 	"time"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
@@ -104,18 +105,17 @@ type peer struct {
 	pipeline       *pipeline
 	snapSender     *snapshotSender // snapshot sender to send v3 snapshot messages
 	msgAppV2Reader *streamReader
+	msgAppReader   *streamReader
 
-	sendc    chan raftpb.Message
-	recvc    chan raftpb.Message
-	propc    chan raftpb.Message
-	newURLsC chan types.URLs
+	sendc chan raftpb.Message
+	recvc chan raftpb.Message
+	propc chan raftpb.Message
 
-	// for testing
-	pausec  chan struct{}
-	resumec chan struct{}
+	mu     sync.Mutex
+	paused bool
 
-	stopc chan struct{}
-	done  chan struct{}
+	cancel context.CancelFunc // cancel pending works in go routine created by peer.
+	stopc  chan struct{}
 }
 
 func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error, v3demo bool) *peer {
@@ -134,16 +134,11 @@ func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r
 		sendc:          make(chan raftpb.Message),
 		recvc:          make(chan raftpb.Message, recvBufSize),
 		propc:          make(chan raftpb.Message, maxPendingProposals),
-		newURLsC:       make(chan types.URLs),
-		pausec:         make(chan struct{}),
-		resumec:        make(chan struct{}),
 		stopc:          make(chan struct{}),
-		done:           make(chan struct{}),
 	}
 
-	// Use go-routine for process of MsgProp because it is
-	// blocking when there is no leader.
 	ctx, cancel := context.WithCancel(context.Background())
+	p.cancel = cancel
 	go func() {
 		for {
 			select {
@@ -151,66 +146,43 @@ func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r
 				if err := r.Process(ctx, mm); err != nil {
 					plog.Warningf("failed to process raft message (%v)", err)
 				}
-			case <-p.stopc:
-				return
-			}
-		}
-	}()
-
-	p.msgAppV2Reader = startStreamReader(transport, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc)
-	reader := startStreamReader(transport, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc)
-	go func() {
-		var paused bool
-		for {
-			select {
-			case m := <-p.sendc:
-				if paused {
-					continue
-				}
-				writec, name := p.pick(m)
-				select {
-				case writec <- m:
-				default:
-					p.r.ReportUnreachable(m.To)
-					if isMsgSnap(m) {
-						p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
-					}
-					if status.isActive() {
-						plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full (bad/overloaded network)", p.id, name)
-					}
-					plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
-				}
 			case mm := <-p.recvc:
-				if err := r.Process(context.TODO(), mm); err != nil {
+				if err := r.Process(ctx, mm); err != nil {
 					plog.Warningf("failed to process raft message (%v)", err)
 				}
-			case urls := <-p.newURLsC:
-				picker.update(urls)
-			case <-p.pausec:
-				paused = true
-			case <-p.resumec:
-				paused = false
 			case <-p.stopc:
-				cancel()
-				p.msgAppV2Writer.stop()
-				p.writer.stop()
-				p.pipeline.stop()
-				p.snapSender.stop()
-				p.msgAppV2Reader.stop()
-				reader.stop()
-				close(p.done)
 				return
 			}
 		}
 	}()
 
+	p.msgAppV2Reader = startStreamReader(transport, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc)
+	p.msgAppReader = startStreamReader(transport, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc)
+
 	return p
 }
 
 func (p *peer) send(m raftpb.Message) {
+	p.mu.Lock()
+	paused := p.paused
+	p.mu.Unlock()
+
+	if paused {
+		return
+	}
+
+	writec, name := p.pick(m)
 	select {
-	case p.sendc <- m:
-	case <-p.done:
+	case writec <- m:
+	default:
+		p.r.ReportUnreachable(m.To)
+		if isMsgSnap(m) {
+			p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
+		}
+		if p.status.isActive() {
+			plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full (bad/overloaded network)", p.id, name)
+		}
+		plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
 	}
 }
 
@@ -219,10 +191,7 @@ func (p *peer) sendSnap(m snap.Message) {
 }
 
 func (p *peer) update(urls types.URLs) {
-	select {
-	case p.newURLsC <- urls:
-	case <-p.done:
-	}
+	p.picker.update(urls)
 }
 
 func (p *peer) attachOutgoingConn(conn *outgoingConn) {
@@ -245,23 +214,27 @@ func (p *peer) activeSince() time.Time { return p.status.activeSince }
 // Pause pauses the peer. The peer will simply drops all incoming
 // messages without returning an error.
 func (p *peer) Pause() {
-	select {
-	case p.pausec <- struct{}{}:
-	case <-p.done:
-	}
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.paused = true
 }
 
 // Resume resumes a paused peer.
 func (p *peer) Resume() {
-	select {
-	case p.resumec <- struct{}{}:
-	case <-p.done:
-	}
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.paused = false
 }
 
 func (p *peer) stop() {
 	close(p.stopc)
-	<-p.done
+	p.cancel()
+	p.msgAppV2Writer.stop()
+	p.writer.stop()
+	p.pipeline.stop()
+	p.snapSender.stop()
+	p.msgAppV2Reader.stop()
+	p.msgAppReader.stop()
 }
 
 // pick picks a chan for sending the given message. The picked chan and the picked chan