Browse Source

rafthttp: add metrics for sending message

Xiang Li 10 years ago
parent
commit
17aa3cf7db
3 changed files with 50 additions and 0 deletions
  1. 45 0
      rafthttp/metrics.go
  2. 1 0
      rafthttp/pipeline.go
  3. 4 0
      rafthttp/stream.go

+ 45 - 0
rafthttp/metrics.go

@@ -0,0 +1,45 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+var (
+	msgWriteDuration = prometheus.NewSummaryVec(
+		prometheus.SummaryOpts{
+			Name: "rafthttp_message_sending_latency_microseconds",
+			Help: "message sending latency distributions.",
+		},
+		[]string{"channel", "remoteID", "msgType"},
+	)
+)
+
+func init() {
+	prometheus.MustRegister(msgWriteDuration)
+}
+
+func reportSendingDuration(channel string, m raftpb.Message, duration time.Duration) {
+	typ := m.Type.String()
+	if isLinkHeartbeatMessage(m) {
+		typ = "MsgLinkHeartbeat"
+	}
+	msgWriteDuration.WithLabelValues(channel, types.ID(m.To).String(), typ).Observe(float64(duration.Nanoseconds() / int64(time.Microsecond)))
+}

+ 1 - 0
rafthttp/pipeline.go

@@ -121,6 +121,7 @@ func (p *pipeline) handle() {
 			if isMsgSnap(m) {
 			if isMsgSnap(m) {
 				p.r.ReportSnapshot(m.To, raft.SnapshotFinish)
 				p.r.ReportSnapshot(m.To, raft.SnapshotFinish)
 			}
 			}
+			reportSendingDuration(pipelineMsg, m, time.Since(start))
 		}
 		}
 		p.Unlock()
 		p.Unlock()
 	}
 	}

+ 4 - 0
rafthttp/stream.go

@@ -100,6 +100,7 @@ func (cw *streamWriter) run() {
 	for {
 	for {
 		select {
 		select {
 		case <-heartbeatc:
 		case <-heartbeatc:
+			start := time.Now()
 			if err := enc.encode(linkHeartbeatMessage); err != nil {
 			if err := enc.encode(linkHeartbeatMessage); err != nil {
 				log.Printf("rafthttp: failed to heartbeat on stream %s due to %v. waiting for a new stream to be established.", t, err)
 				log.Printf("rafthttp: failed to heartbeat on stream %s due to %v. waiting for a new stream to be established.", t, err)
 				cw.resetCloser()
 				cw.resetCloser()
@@ -107,6 +108,7 @@ func (cw *streamWriter) run() {
 				continue
 				continue
 			}
 			}
 			flusher.Flush()
 			flusher.Flush()
+			reportSendingDuration(string(t), linkHeartbeatMessage, time.Since(start))
 		case m := <-msgc:
 		case m := <-msgc:
 			if t == streamTypeMsgApp && m.Term != msgAppTerm {
 			if t == streamTypeMsgApp && m.Term != msgAppTerm {
 				// TODO: reasonable retry logic
 				// TODO: reasonable retry logic
@@ -116,6 +118,7 @@ func (cw *streamWriter) run() {
 				}
 				}
 				continue
 				continue
 			}
 			}
+			start := time.Now()
 			if err := enc.encode(m); err != nil {
 			if err := enc.encode(m); err != nil {
 				log.Printf("rafthttp: failed to send message on stream %s due to %v. waiting for a new stream to be established.", t, err)
 				log.Printf("rafthttp: failed to send message on stream %s due to %v. waiting for a new stream to be established.", t, err)
 				cw.resetCloser()
 				cw.resetCloser()
@@ -124,6 +127,7 @@ func (cw *streamWriter) run() {
 				continue
 				continue
 			}
 			}
 			flusher.Flush()
 			flusher.Flush()
+			reportSendingDuration(string(t), m, time.Since(start))
 		case conn := <-cw.connc:
 		case conn := <-cw.connc:
 			cw.resetCloser()
 			cw.resetCloser()
 			t = conn.t
 			t = conn.t