Browse Source

Merge pull request #11009 from gyuho/snapshot

*: add inflight snapshot metrics
Gyuho Lee 6 years ago
parent
commit
046c705f97

+ 5 - 0
etcdserver/api/rafthttp/http.go

@@ -258,6 +258,11 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
+	snapshotReceiveInflights.WithLabelValues(from).Inc()
+	defer func() {
+		snapshotReceiveInflights.WithLabelValues(from).Dec()
+	}()
+
 	if h.lg != nil {
 		h.lg.Info(
 			"receiving database snapshot",

+ 20 - 0
etcdserver/api/rafthttp/metrics.go

@@ -80,6 +80,15 @@ var (
 		[]string{"To"},
 	)
 
+	snapshotSendInflights = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "snapshot_send_inflights_total",
+		Help:      "Total number of inflight snapshot sends",
+	},
+		[]string{"To"},
+	)
+
 	snapshotSendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Namespace: "etcd",
 		Subsystem: "network",
@@ -111,6 +120,15 @@ var (
 		[]string{"From"},
 	)
 
+	snapshotReceiveInflights = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "snapshot_receive_inflights_total",
+		Help:      "Total number of inflight snapshot receives",
+	},
+		[]string{"From"},
+	)
+
 	snapshotReceiveFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Namespace: "etcd",
 		Subsystem: "network",
@@ -156,9 +174,11 @@ func init() {
 	prometheus.MustRegister(recvFailures)
 
 	prometheus.MustRegister(snapshotSend)
+	prometheus.MustRegister(snapshotSendInflights)
 	prometheus.MustRegister(snapshotSendFailures)
 	prometheus.MustRegister(snapshotSendSeconds)
 	prometheus.MustRegister(snapshotReceive)
+	prometheus.MustRegister(snapshotReceiveInflights)
 	prometheus.MustRegister(snapshotReceiveFailures)
 	prometheus.MustRegister(snapshotReceiveSeconds)
 

+ 5 - 1
etcdserver/api/rafthttp/snapshot_sender.go

@@ -90,6 +90,11 @@ func (s *snapshotSender) send(merged snap.Message) {
 		plog.Infof("start to send database snapshot [index: %d, to %s]...", m.Snapshot.Metadata.Index, types.ID(m.To))
 	}
 
+	snapshotSendInflights.WithLabelValues(to).Inc()
+	defer func() {
+		snapshotSendInflights.WithLabelValues(to).Dec()
+	}()
+
 	err := s.post(req)
 	defer merged.CloseWithError(err)
 	if err != nil {
@@ -139,7 +144,6 @@ func (s *snapshotSender) send(merged snap.Message) {
 	}
 
 	sentBytes.WithLabelValues(to).Add(float64(merged.TotalSize))
-
 	snapshotSend.WithLabelValues(to).Inc()
 	snapshotSendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds())
 }

+ 7 - 0
etcdserver/metrics.go

@@ -76,6 +76,12 @@ var (
 		Name:      "slow_apply_total",
 		Help:      "The total number of slow apply requests (likely overloaded from slow disk).",
 	})
+	applySnapshotInProgress = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: "etcd",
+		Subsystem: "server",
+		Name:      "snapshot_apply_in_progress_total",
+		Help:      "1 if the server is applying the incoming snapshot. 0 if none.",
+	})
 	proposalsCommitted = prometheus.NewGauge(prometheus.GaugeOpts{
 		Namespace: "etcd",
 		Subsystem: "server",
@@ -153,6 +159,7 @@ func init() {
 	prometheus.MustRegister(leaderChanges)
 	prometheus.MustRegister(heartbeatSendFailures)
 	prometheus.MustRegister(slowApplies)
+	prometheus.MustRegister(applySnapshotInProgress)
 	prometheus.MustRegister(proposalsCommitted)
 	prometheus.MustRegister(proposalsApplied)
 	prometheus.MustRegister(proposalsPending)

+ 2 - 0
etcdserver/server.go

@@ -1113,6 +1113,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 	if raft.IsEmptySnap(apply.snapshot) {
 		return
 	}
+	applySnapshotInProgress.Inc()
 
 	lg := s.getLogger()
 	if lg != nil {
@@ -1138,6 +1139,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 		} else {
 			plog.Infof("finished applying incoming snapshot at index %d", ep.snapi)
 		}
+		applySnapshotInProgress.Dec()
 	}()
 
 	if apply.snapshot.Metadata.Index <= ep.appliedi {

+ 19 - 1
integration/v3_watch_restore_test.go

@@ -71,7 +71,25 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
 	// trigger snapshot send from leader to this slow follower
 	// which then calls watchable store Restore
 	clus.Members[0].RecoverPartition(t, clus.Members[1:]...)
-	clus.WaitLeader(t)
+	lead := clus.WaitLeader(t)
+
+	sends, err := clus.Members[lead].Metric("etcd_network_snapshot_send_inflights_total")
+	if err != nil {
+		t.Fatal(err)
+	}
+	if sends != "0" && sends != "1" {
+		// 0 if already sent, 1 if sending
+		t.Fatalf("inflight snapshot sends expected 0 or 1, got %q", sends)
+	}
+	receives, err := clus.Members[(lead+1)%3].Metric("etcd_network_snapshot_receive_inflights_total")
+	if err != nil {
+		t.Fatal(err)
+	}
+	if receives != "0" && receives != "1" {
+		// 0 if already received, 1 if receiving
+		t.Fatalf("inflight snapshot receives expected 0 or 1, got %q", receives)
+	}
+
 	time.Sleep(2 * time.Second)
 
 	// slow follower now applies leader snapshot