Browse Source

Merge pull request #2439 from xiang90/metrics

Metrics
Xiang Li 10 years ago
parent
commit
8e76ccf979
3 changed files with 31 additions and 9 deletions
  1. 22 6
      rafthttp/metrics.go
  2. 3 1
      rafthttp/pipeline.go
  3. 6 2
      rafthttp/stream.go

+ 22 - 6
rafthttp/metrics.go

@@ -23,23 +23,39 @@ import (
 )
 
 var (
-	msgWriteDuration = prometheus.NewSummaryVec(
+	msgSentDuration = prometheus.NewSummaryVec(
 		prometheus.SummaryOpts{
-			Name: "rafthttp_message_sending_latency_microseconds",
-			Help: "message sending latency distributions.",
+			Name: "rafthttp_message_sent_latency_microseconds",
+			Help: "message sent latency distributions.",
 		},
 		[]string{"channel", "remoteID", "msgType"},
 	)
+
+	msgSentFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: "rafthttp_message_sent_failed_total",
+		Help: "The total number of failed messages sent.",
+	},
+		[]string{"channel", "remoteID", "msgType"},
+	)
 )
 
 func init() {
-	prometheus.MustRegister(msgWriteDuration)
+	prometheus.MustRegister(msgSentDuration)
+	prometheus.MustRegister(msgSentFailed)
+}
+
+func reportSentDuration(channel string, m raftpb.Message, duration time.Duration) {
+	typ := m.Type.String()
+	if isLinkHeartbeatMessage(m) {
+		typ = "MsgLinkHeartbeat"
+	}
+	msgSentDuration.WithLabelValues(channel, types.ID(m.To).String(), typ).Observe(float64(duration.Nanoseconds() / int64(time.Microsecond)))
 }
 
-func reportSendingDuration(channel string, m raftpb.Message, duration time.Duration) {
+func reportSentFailure(channel string, m raftpb.Message) {
 	typ := m.Type.String()
 	if isLinkHeartbeatMessage(m) {
 		typ = "MsgLinkHeartbeat"
 	}
-	msgWriteDuration.WithLabelValues(channel, types.ID(m.To).String(), typ).Observe(float64(duration.Nanoseconds() / int64(time.Microsecond)))
+	msgSentFailed.WithLabelValues(channel, types.ID(m.To).String(), typ).Inc()
 }

+ 3 - 1
rafthttp/pipeline.go

@@ -91,6 +91,8 @@ func (p *pipeline) handle() {
 
 		p.Lock()
 		if err != nil {
+			reportSentFailure(pipelineMsg, m)
+
 			if p.errored == nil || p.errored.Error() != err.Error() {
 				log.Printf("pipeline: error posting to %s: %v", p.id, err)
 				p.errored = err
@@ -118,7 +120,7 @@ func (p *pipeline) handle() {
 			if isMsgSnap(m) {
 				p.r.ReportSnapshot(m.To, raft.SnapshotFinish)
 			}
-			reportSendingDuration(pipelineMsg, m, time.Since(start))
+			reportSentDuration(pipelineMsg, m, time.Since(start))
 		}
 		p.Unlock()
 	}

+ 6 - 2
rafthttp/stream.go

@@ -101,13 +101,15 @@ func (cw *streamWriter) run() {
 		case <-heartbeatc:
 			start := time.Now()
 			if err := enc.encode(linkHeartbeatMessage); err != nil {
+				reportSentFailure(string(t), linkHeartbeatMessage)
+
 				log.Printf("rafthttp: failed to heartbeat on stream %s due to %v. waiting for a new stream to be established.", t, err)
 				cw.resetCloser()
 				heartbeatc, msgc = nil, nil
 				continue
 			}
 			flusher.Flush()
-			reportSendingDuration(string(t), linkHeartbeatMessage, time.Since(start))
+			reportSentDuration(string(t), linkHeartbeatMessage, time.Since(start))
 		case m := <-msgc:
 			if t == streamTypeMsgApp && m.Term != msgAppTerm {
 				// TODO: reasonable retry logic
@@ -119,6 +121,8 @@ func (cw *streamWriter) run() {
 			}
 			start := time.Now()
 			if err := enc.encode(m); err != nil {
+				reportSentFailure(string(t), m)
+
 				log.Printf("rafthttp: failed to send message on stream %s due to %v. waiting for a new stream to be established.", t, err)
 				cw.resetCloser()
 				heartbeatc, msgc = nil, nil
@@ -126,7 +130,7 @@ func (cw *streamWriter) run() {
 				continue
 			}
 			flusher.Flush()
-			reportSendingDuration(string(t), m, time.Since(start))
+			reportSentDuration(string(t), m, time.Since(start))
 		case conn := <-cw.connc:
 			cw.resetCloser()
 			t = conn.t