Browse Source

Merge pull request #2960 from yichengq/fix-drop-flood

rafthttp: pretty print message drop info
Yicheng Qin 10 years ago
parent
commit
ea3c7d1d31
4 changed files with 25 additions and 8 deletions
  1. 5 2
      rafthttp/peer.go
  2. 6 0
      rafthttp/peer_status.go
  3. 9 3
      rafthttp/remote.go
  4. 5 3
      rafthttp/stream.go

+ 5 - 2
rafthttp/peer.go

@@ -161,8 +161,11 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
 					if isMsgSnap(m) {
 						p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
 					}
-					// TODO: log start and end of message dropping
-					plog.Warningf("dropping %s to %s since %s's sending buffer is full", m.Type, p.id, name)
+					if status.isActive() {
+						plog.Warningf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
+					} else {
+						plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
+					}
 				}
 			case mm := <-p.recvc:
 				if err := r.Process(context.TODO(), mm); err != nil {

+ 6 - 0
rafthttp/peer_status.go

@@ -65,3 +65,9 @@ func (s *peerStatus) deactivate(failure failureType, reason string) {
 	s.failureMap[failure] = reason
 	plog.Errorf(logline)
 }
+
+func (s *peerStatus) isActive() bool {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.active
+}

+ 9 - 3
rafthttp/remote.go

@@ -23,14 +23,17 @@ import (
 
 type remote struct {
 	id       types.ID
+	status   *peerStatus
 	pipeline *pipeline
 }
 
 func startRemote(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, errorc chan error) *remote {
 	picker := newURLPicker(urls)
+	status := newPeerStatus(to)
 	return &remote{
 		id:       to,
-		pipeline: newPipeline(tr, picker, local, to, cid, newPeerStatus(to), nil, r, errorc),
+		status:   status,
+		pipeline: newPipeline(tr, picker, local, to, cid, status, nil, r, errorc),
 	}
 }
 
@@ -38,8 +41,11 @@ func (g *remote) Send(m raftpb.Message) {
 	select {
 	case g.pipeline.msgc <- m:
 	default:
-		// TODO: log start and end of message dropping
-		plog.Warningf("dropping %s to %s since sending buffer is full", m.Type, g.id)
+		if g.status.isActive() {
+			plog.Warningf("dropped %s to %s since sending buffer is full", m.Type, g.id)
+		} else {
+			plog.Debugf("dropped %s to %s since sending buffer is full", m.Type, g.id)
+		}
 	}
 }
 

+ 5 - 3
rafthttp/stream.go

@@ -359,9 +359,11 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
 			select {
 			case recvc <- m:
 			default:
-				// TODO: log start and end of message dropping
-				plog.Warningf("dropping %s from %x because receiving buffer is full",
-					m.Type, m.From)
+				if cr.status.isActive() {
+					plog.Warningf("dropped %s from %s since receiving buffer is full", m.Type, m.From)
+				} else {
+					plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, m.From)
+				}
 			}
 		}
 	}