Browse Source

rafthttp/metrics.go:fixed TODO: record write/recv failures.

Vimal Kumar 9 years ago
parent
commit
64e1a327ee
6 changed files with 30 additions and 1 deletions
  1. 2 0
      Documentation/metrics.md
  2. 3 0
      rafthttp/http.go
  3. 20 1
      rafthttp/metrics.go
  4. 1 0
      rafthttp/pipeline.go
  5. 1 0
      rafthttp/snapshot_sender.go
  6. 3 0
      rafthttp/stream.go

+ 2 - 0
Documentation/metrics.md

@@ -70,6 +70,8 @@ All these metrics are prefixed with `etcd_network_`
 |---------------------------|--------------------------------------------------------------------|---------------|
 | peer_sent_bytes_total           | The total number of bytes sent to the peer with ID `To`.         | Counter(To)   |
 | peer_received_bytes_total       | The total number of bytes received from the peer with ID `From`. | Counter(From) |
+| peer_sent_failures_total        | The total number of send failures from the peer with ID `To`.         | Counter(To)   |
+| peer_received_failures_total    | The total number of receive failures from the peer with ID `From`. | Counter(From) |
 | peer_round_trip_time_seconds    | Round-Trip-Time histogram between peers.                         | Histogram(To) |
 | client_grpc_sent_bytes_total    | The total number of bytes sent to grpc clients.                  | Counter   |
 | client_grpc_received_bytes_total| The total number of bytes received to grpc clients.              | Counter   |

+ 3 - 0
rafthttp/http.go

@@ -104,6 +104,7 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	if err != nil {
 		plog.Errorf("failed to read raft message (%v)", err)
 		http.Error(w, "error reading raft message", http.StatusBadRequest)
+		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
 		return
 	}
 
@@ -111,6 +112,7 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	if err := m.Unmarshal(b); err != nil {
 		plog.Errorf("failed to unmarshal raft message (%v)", err)
 		http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
+		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
 		return
 	}
 
@@ -186,6 +188,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		msg := fmt.Sprintf("failed to decode raft message (%v)", err)
 		plog.Errorf(msg)
 		http.Error(w, msg, http.StatusBadRequest)
+		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
 		return
 	}
 

+ 20 - 1
rafthttp/metrics.go

@@ -16,7 +16,6 @@ package rafthttp
 
 import "github.com/prometheus/client_golang/prometheus"
 
-// TODO: record write/recv failures.
 var (
 	sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Namespace: "etcd",
@@ -36,6 +35,24 @@ var (
 		[]string{"From"},
 	)
 
+	sentFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "peer_sent_failures_total",
+		Help:      "The total number of send failures from peers.",
+	},
+		[]string{"To"},
+	)
+
+	recvFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "peer_received_failures_total",
+		Help:      "The total number of receive failures from peers.",
+	},
+		[]string{"From"},
+	)
+
 	rtts = prometheus.NewHistogramVec(prometheus.HistogramOpts{
 		Namespace: "etcd",
 		Subsystem: "network",
@@ -50,5 +67,7 @@ var (
 func init() {
 	prometheus.MustRegister(sentBytes)
 	prometheus.MustRegister(receivedBytes)
+	prometheus.MustRegister(sentFailures)
+	prometheus.MustRegister(recvFailures)
 	prometheus.MustRegister(rtts)
 }

+ 1 - 0
rafthttp/pipeline.go

@@ -93,6 +93,7 @@ func (p *pipeline) handle() {
 				if isMsgSnap(m) {
 					p.raft.ReportSnapshot(m.To, raft.SnapshotFailure)
 				}
+				sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
 				continue
 			}
 

+ 1 - 0
rafthttp/snapshot_sender.go

@@ -91,6 +91,7 @@ func (s *snapshotSender) send(merged snap.Message) {
 		// machine knows about it, it would pause a while and retry sending
 		// new snapshot message.
 		s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
+		sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
 		return
 	}
 	s.status.activate()

+ 3 - 0
rafthttp/stream.go

@@ -158,6 +158,7 @@ func (cw *streamWriter) run() {
 
 			cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())
 
+			sentFailures.WithLabelValues(cw.peerID.String()).Inc()
 			cw.close()
 			plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
 			heartbeatc, msgc = nil, nil
@@ -184,6 +185,7 @@ func (cw *streamWriter) run() {
 			plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
 			heartbeatc, msgc = nil, nil
 			cw.r.ReportUnreachable(m.To)
+			sentFailures.WithLabelValues(cw.peerID.String()).Inc()
 
 		case conn := <-cw.connc:
 			cw.mu.Lock()
@@ -388,6 +390,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
 				plog.MergeWarningf("dropped internal raft message from %s since receiving buffer is full (overloaded network)", types.ID(m.From))
 			}
 			plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
+			recvFailures.WithLabelValues(types.ID(m.From).String()).Inc()
 		}
 	}
 }