Browse Source

rafthttp: pause peer should also pause its reader

Xiang Li 9 years ago
parent
commit
5d256b7b86
2 changed files with 25 additions and 0 deletions
  1. 4 0
      rafthttp/peer.go
  2. 21 0
      rafthttp/stream.go

+ 4 - 0
rafthttp/peer.go

@@ -217,6 +217,8 @@ func (p *peer) Pause() {
 	p.mu.Lock()
 	defer p.mu.Unlock()
 	p.paused = true
+	p.msgAppReader.pause()
+	p.msgAppV2Reader.pause()
 }
 
 // Resume resumes a paused peer.
@@ -224,6 +226,8 @@ func (p *peer) Resume() {
 	p.mu.Lock()
 	defer p.mu.Unlock()
 	p.paused = false
+	p.msgAppReader.resume()
+	p.msgAppV2Reader.resume()
 }
 
 func (p *peer) stop() {

+ 21 - 0
rafthttp/stream.go

@@ -252,6 +252,7 @@ type streamReader struct {
 	errorc        chan<- error
 
 	mu     sync.Mutex
+	paused bool
 	cancel func()
 	closer io.Closer
 	stopc  chan struct{}
@@ -331,6 +332,14 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
 			return err
 		}
 
+		cr.mu.Lock()
+		paused := cr.paused
+		cr.mu.Unlock()
+
+		if paused {
+			continue
+		}
+
 		if isLinkHeartbeatMessage(m) {
 			// raft is not interested in link layer
 			// heartbeat message, so we should ignore
@@ -463,6 +472,18 @@ func (cr *streamReader) close() {
 	cr.closer = nil
 }
 
+func (cr *streamReader) pause() {
+	cr.mu.Lock()
+	defer cr.mu.Unlock()
+	cr.paused = true
+}
+
+func (cr *streamReader) resume() {
+	cr.mu.Lock()
+	defer cr.mu.Unlock()
+	cr.paused = false
+}
+
 func isClosedConnectionError(err error) bool {
 	operr, ok := err.(*net.OpError)
 	return ok && operr.Err.Error() == "use of closed network connection"