Browse Source

rafthttp: record the number of failed messages

Xiang Li 10 years ago
parent
commit
ab33c068b7
3 changed files with 22 additions and 0 deletions
  1. 16 0
      rafthttp/metrics.go
  2. 2 0
      rafthttp/pipeline.go
  3. 4 0
      rafthttp/stream.go

+ 16 - 0
rafthttp/metrics.go

@@ -30,10 +30,18 @@ var (
 		},
 		[]string{"channel", "remoteID", "msgType"},
 	)
+
+	msgWriteFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: "rafthttp_message_sent_failed_total",
+		Help: "The total number of failed messages sent.",
+	},
+		[]string{"channel", "remoteID", "msgType"},
+	)
 )
 
 func init() {
 	prometheus.MustRegister(msgWriteDuration)
+	prometheus.MustRegister(msgWriteFailed)
 }
 
 func reportSendingDuration(channel string, m raftpb.Message, duration time.Duration) {
@@ -43,3 +51,11 @@ func reportSendingDuration(channel string, m raftpb.Message, duration time.Durat
 	}
 	msgWriteDuration.WithLabelValues(channel, types.ID(m.To).String(), typ).Observe(float64(duration.Nanoseconds() / int64(time.Microsecond)))
 }
+
+func reportMessageFailure(channel string, m raftpb.Message) {
+	typ := m.Type.String()
+	if isLinkHeartbeatMessage(m) {
+		typ = "MsgLinkHeartbeat"
+	}
+	msgWriteFailed.WithLabelValues(channel, types.ID(m.To).String(), typ).Inc()
+}

+ 2 - 0
rafthttp/pipeline.go

@@ -94,6 +94,8 @@ func (p *pipeline) handle() {
 
 		p.Lock()
 		if err != nil {
+			reportMessageFailure(pipelineMsg, m)
+
 			if p.errored == nil || p.errored.Error() != err.Error() {
 				log.Printf("pipeline: error posting to %s: %v", p.id, err)
 				p.errored = err

+ 4 - 0
rafthttp/stream.go

@@ -102,6 +102,8 @@ func (cw *streamWriter) run() {
 		case <-heartbeatc:
 			start := time.Now()
 			if err := enc.encode(linkHeartbeatMessage); err != nil {
+				reportMessageFailure(string(t), linkHeartbeatMessage)
+
 				log.Printf("rafthttp: failed to heartbeat on stream %s due to %v. waiting for a new stream to be established.", t, err)
 				cw.resetCloser()
 				heartbeatc, msgc = nil, nil
@@ -120,6 +122,8 @@ func (cw *streamWriter) run() {
 			}
 			start := time.Now()
 			if err := enc.encode(m); err != nil {
+				reportMessageFailure(string(t), m)
+
 				log.Printf("rafthttp: failed to send message on stream %s due to %v. waiting for a new stream to be established.", t, err)
 				cw.resetCloser()
 				heartbeatc, msgc = nil, nil