Browse Source

etcdserver/api: add "etcd_network_snapshot_send_inflights_total", "etcd_network_snapshot_receive_inflights_total"

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
Gyuho Lee 6 years ago
parent
commit
1c8fab7365
3 changed files with 28 additions and 0 deletions
  1. 4 0
      rafthttp/http.go
  2. 20 0
      rafthttp/metrics.go
  3. 4 0
      rafthttp/snapshot_sender.go

+ 4 - 0
rafthttp/http.go

@@ -203,6 +203,10 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
+	snapshotReceiveInflights.WithLabelValues(from).Inc()
+	defer func() {
+		snapshotReceiveInflights.WithLabelValues(from).Dec()
+	}()
 	plog.Infof("receiving database snapshot [index:%d, from %s] ...", m.Snapshot.Metadata.Index, types.ID(m.From))
 	// save incoming database snapshot.
 	n, err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index)

+ 20 - 0
rafthttp/metrics.go

@@ -62,6 +62,15 @@ var (
 		[]string{"To"},
 	)
 
+	snapshotSendInflights = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "snapshot_send_inflights_total",
+		Help:      "Total number of inflight snapshot sends",
+	},
+		[]string{"To"},
+	)
+
 	snapshotSendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Namespace: "etcd",
 		Subsystem: "network",
@@ -93,6 +102,15 @@ var (
 		[]string{"From"},
 	)
 
+	snapshotReceiveInflights = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "snapshot_receive_inflights_total",
+		Help:      "Total number of inflight snapshot receives",
+	},
+		[]string{"From"},
+	)
+
 	snapshotReceiveFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Namespace: "etcd",
 		Subsystem: "network",
@@ -133,9 +151,11 @@ func init() {
 	prometheus.MustRegister(recvFailures)
 
 	prometheus.MustRegister(snapshotSend)
+	prometheus.MustRegister(snapshotSendInflights)
 	prometheus.MustRegister(snapshotSendFailures)
 	prometheus.MustRegister(snapshotSendSeconds)
 	prometheus.MustRegister(snapshotReceive)
+	prometheus.MustRegister(snapshotReceiveInflights)
 	prometheus.MustRegister(snapshotReceiveFailures)
 	prometheus.MustRegister(snapshotReceiveSeconds)
 

+ 4 - 0
rafthttp/snapshot_sender.go

@@ -76,6 +76,10 @@ func (s *snapshotSender) send(merged snap.Message) {
 	req := createPostRequest(u, RaftSnapshotPrefix, body, "application/octet-stream", s.tr.URLs, s.from, s.cid)
 
 	plog.Infof("start to send database snapshot [index: %d, to %s]...", m.Snapshot.Metadata.Index, types.ID(m.To))
+	snapshotSendInflights.WithLabelValues(to).Inc()
+	defer func() {
+		snapshotSendInflights.WithLabelValues(to).Dec()
+	}()
 
 	err := s.post(req)
 	defer merged.CloseWithError(err)