Browse Source

Merge pull request #9997 from gyuho/snap-metrics

*: add v3 snapshot metrics (fsync, network)
Gyuho Lee 7 years ago
parent
commit
2a6bc7d113

+ 23 - 8
etcdserver/api/rafthttp/http.go

@@ -22,6 +22,7 @@ import (
 	"net/http"
 	"net/http"
 	"path"
 	"path"
 	"strings"
 	"strings"
+	"time"
 
 
 	"github.com/coreos/etcd/etcdserver/api/snap"
 	"github.com/coreos/etcd/etcdserver/api/snap"
 	pioutil "github.com/coreos/etcd/pkg/ioutil"
 	pioutil "github.com/coreos/etcd/pkg/ioutil"
@@ -185,6 +186,8 @@ func newSnapshotHandler(t *Transport, r Raft, snapshotter *snap.Snapshotter, cid
 	}
 	}
 }
 }
 
 
+const unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER"
+
 // ServeHTTP serves HTTP request to receive and process snapshot message.
 // ServeHTTP serves HTTP request to receive and process snapshot message.
 //
 //
 // If request sender dies without closing underlying TCP connection,
 // If request sender dies without closing underlying TCP connection,
@@ -195,9 +198,12 @@ func newSnapshotHandler(t *Transport, r Raft, snapshotter *snap.Snapshotter, cid
 // received and processed.
 // received and processed.
 // 2. this case should happen rarely, so no further optimization is done.
 // 2. this case should happen rarely, so no further optimization is done.
 func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	start := time.Now()
+
 	if r.Method != "POST" {
 	if r.Method != "POST" {
 		w.Header().Set("Allow", "POST")
 		w.Header().Set("Allow", "POST")
 		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
 		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+		snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
 		return
 		return
 	}
 	}
 
 
@@ -205,6 +211,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 
 
 	if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil {
 	if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil {
 		http.Error(w, err.Error(), http.StatusPreconditionFailed)
 		http.Error(w, err.Error(), http.StatusPreconditionFailed)
+		snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
 		return
 		return
 	}
 	}
 
 
@@ -213,13 +220,14 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	dec := &messageDecoder{r: r.Body}
 	dec := &messageDecoder{r: r.Body}
 	// let snapshots be very large since they can exceed 512MB for large installations
 	// let snapshots be very large since they can exceed 512MB for large installations
 	m, err := dec.decodeLimit(uint64(1 << 63))
 	m, err := dec.decodeLimit(uint64(1 << 63))
+	from := types.ID(m.From).String()
 	if err != nil {
 	if err != nil {
 		msg := fmt.Sprintf("failed to decode raft message (%v)", err)
 		msg := fmt.Sprintf("failed to decode raft message (%v)", err)
 		if h.lg != nil {
 		if h.lg != nil {
 			h.lg.Warn(
 			h.lg.Warn(
 				"failed to decode Raft message",
 				"failed to decode Raft message",
 				zap.String("local-member-id", h.localID.String()),
 				zap.String("local-member-id", h.localID.String()),
-				zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
+				zap.String("remote-snapshot-sender-id", from),
 				zap.Error(err),
 				zap.Error(err),
 			)
 			)
 		} else {
 		} else {
@@ -227,24 +235,26 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		}
 		}
 		http.Error(w, msg, http.StatusBadRequest)
 		http.Error(w, msg, http.StatusBadRequest)
 		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
 		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
+		snapshotReceiveFailures.WithLabelValues(from).Inc()
 		return
 		return
 	}
 	}
 
 
 	msgSize := m.Size()
 	msgSize := m.Size()
-	receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(msgSize))
+	receivedBytes.WithLabelValues(from).Add(float64(msgSize))
 
 
 	if m.Type != raftpb.MsgSnap {
 	if m.Type != raftpb.MsgSnap {
 		if h.lg != nil {
 		if h.lg != nil {
 			h.lg.Warn(
 			h.lg.Warn(
 				"unexpected Raft message type",
 				"unexpected Raft message type",
 				zap.String("local-member-id", h.localID.String()),
 				zap.String("local-member-id", h.localID.String()),
-				zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
+				zap.String("remote-snapshot-sender-id", from),
 				zap.String("message-type", m.Type.String()),
 				zap.String("message-type", m.Type.String()),
 			)
 			)
 		} else {
 		} else {
 			plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
 			plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
 		}
 		}
 		http.Error(w, "wrong raft message type", http.StatusBadRequest)
 		http.Error(w, "wrong raft message type", http.StatusBadRequest)
+		snapshotReceiveFailures.WithLabelValues(from).Inc()
 		return
 		return
 	}
 	}
 
 
@@ -252,7 +262,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		h.lg.Info(
 		h.lg.Info(
 			"receiving database snapshot",
 			"receiving database snapshot",
 			zap.String("local-member-id", h.localID.String()),
 			zap.String("local-member-id", h.localID.String()),
-			zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
+			zap.String("remote-snapshot-sender-id", from),
 			zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
 			zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
 			zap.Int("incoming-snapshot-message-size-bytes", msgSize),
 			zap.Int("incoming-snapshot-message-size-bytes", msgSize),
 			zap.String("incoming-snapshot-message-size", humanize.Bytes(uint64(msgSize))),
 			zap.String("incoming-snapshot-message-size", humanize.Bytes(uint64(msgSize))),
@@ -269,7 +279,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 			h.lg.Warn(
 			h.lg.Warn(
 				"failed to save incoming database snapshot",
 				"failed to save incoming database snapshot",
 				zap.String("local-member-id", h.localID.String()),
 				zap.String("local-member-id", h.localID.String()),
-				zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
+				zap.String("remote-snapshot-sender-id", from),
 				zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
 				zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
 				zap.Error(err),
 				zap.Error(err),
 			)
 			)
@@ -277,16 +287,17 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 			plog.Error(msg)
 			plog.Error(msg)
 		}
 		}
 		http.Error(w, msg, http.StatusInternalServerError)
 		http.Error(w, msg, http.StatusInternalServerError)
+		snapshotReceiveFailures.WithLabelValues(from).Inc()
 		return
 		return
 	}
 	}
 
 
-	receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(n))
+	receivedBytes.WithLabelValues(from).Add(float64(n))
 
 
 	if h.lg != nil {
 	if h.lg != nil {
 		h.lg.Info(
 		h.lg.Info(
 			"received and saved database snapshot",
 			"received and saved database snapshot",
 			zap.String("local-member-id", h.localID.String()),
 			zap.String("local-member-id", h.localID.String()),
-			zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
+			zap.String("remote-snapshot-sender-id", from),
 			zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
 			zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
 			zap.Int64("incoming-snapshot-size-bytes", n),
 			zap.Int64("incoming-snapshot-size-bytes", n),
 			zap.String("incoming-snapshot-size", humanize.Bytes(uint64(n))),
 			zap.String("incoming-snapshot-size", humanize.Bytes(uint64(n))),
@@ -307,13 +318,14 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 				h.lg.Warn(
 				h.lg.Warn(
 					"failed to process Raft message",
 					"failed to process Raft message",
 					zap.String("local-member-id", h.localID.String()),
 					zap.String("local-member-id", h.localID.String()),
-					zap.String("remote-snapshot-sender-id", types.ID(m.From).String()),
+					zap.String("remote-snapshot-sender-id", from),
 					zap.Error(err),
 					zap.Error(err),
 				)
 				)
 			} else {
 			} else {
 				plog.Error(msg)
 				plog.Error(msg)
 			}
 			}
 			http.Error(w, msg, http.StatusInternalServerError)
 			http.Error(w, msg, http.StatusInternalServerError)
+			snapshotReceiveFailures.WithLabelValues(from).Inc()
 		}
 		}
 		return
 		return
 	}
 	}
@@ -321,6 +333,9 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	// Write StatusNoContent header after the message has been processed by
 	// Write StatusNoContent header after the message has been processed by
 	// raft, which facilitates the client to report MsgSnap status.
 	// raft, which facilitates the client to report MsgSnap status.
 	w.WriteHeader(http.StatusNoContent)
 	w.WriteHeader(http.StatusNoContent)
+
+	snapshotReceive.WithLabelValues(from).Inc()
+	snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds())
 }
 }
 
 
 type streamHandler struct {
 type streamHandler struct {

+ 70 - 0
etcdserver/api/rafthttp/metrics.go

@@ -71,6 +71,68 @@ var (
 		[]string{"From"},
 		[]string{"From"},
 	)
 	)
 
 
+	snapshotSend = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "snapshot_send_success",
+		Help:      "Total number of successful snapshot sends",
+	},
+		[]string{"To"},
+	)
+
+	snapshotSendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "snapshot_send_failures",
+		Help:      "Total number of snapshot send failures",
+	},
+		[]string{"To"},
+	)
+
+	snapshotSendSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "snapshot_send_total_duration_seconds",
+		Help:      "Total latency distributions of v3 snapshot sends",
+
+		// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
+		// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
+		Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
+	},
+		[]string{"To"},
+	)
+
+	snapshotReceive = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "snapshot_receive_success",
+		Help:      "Total number of successful snapshot receives",
+	},
+		[]string{"From"},
+	)
+
+	snapshotReceiveFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "snapshot_receive_failures",
+		Help:      "Total number of snapshot receive failures",
+	},
+		[]string{"From"},
+	)
+
+	snapshotReceiveSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "snapshot_receive_total_duration_seconds",
+		Help:      "Total latency distributions of v3 snapshot receives",
+
+		// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
+		// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
+		Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
+	},
+		[]string{"From"},
+	)
+
 	rttSec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
 	rttSec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
 		Namespace: "etcd",
 		Namespace: "etcd",
 		Subsystem: "network",
 		Subsystem: "network",
@@ -92,5 +154,13 @@ func init() {
 	prometheus.MustRegister(receivedBytes)
 	prometheus.MustRegister(receivedBytes)
 	prometheus.MustRegister(sentFailures)
 	prometheus.MustRegister(sentFailures)
 	prometheus.MustRegister(recvFailures)
 	prometheus.MustRegister(recvFailures)
+
+	prometheus.MustRegister(snapshotSend)
+	prometheus.MustRegister(snapshotSendFailures)
+	prometheus.MustRegister(snapshotSendSeconds)
+	prometheus.MustRegister(snapshotReceive)
+	prometheus.MustRegister(snapshotReceiveFailures)
+	prometheus.MustRegister(snapshotReceiveSeconds)
+
 	prometheus.MustRegister(rttSec)
 	prometheus.MustRegister(rttSec)
 }
 }

+ 12 - 5
etcdserver/api/rafthttp/snapshot_sender.go

@@ -67,7 +67,10 @@ func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *pe
 func (s *snapshotSender) stop() { close(s.stopc) }
 func (s *snapshotSender) stop() { close(s.stopc) }
 
 
 func (s *snapshotSender) send(merged snap.Message) {
 func (s *snapshotSender) send(merged snap.Message) {
+	start := time.Now()
+
 	m := merged.Message
 	m := merged.Message
+	to := types.ID(m.To).String()
 
 
 	body := createSnapBody(s.tr.Logger, merged)
 	body := createSnapBody(s.tr.Logger, merged)
 	defer body.Close()
 	defer body.Close()
@@ -79,7 +82,7 @@ func (s *snapshotSender) send(merged snap.Message) {
 		s.tr.Logger.Info(
 		s.tr.Logger.Info(
 			"sending database snapshot",
 			"sending database snapshot",
 			zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
 			zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
-			zap.String("remote-peer-id", types.ID(m.To).String()),
+			zap.String("remote-peer-id", to),
 			zap.Int64("bytes", merged.TotalSize),
 			zap.Int64("bytes", merged.TotalSize),
 			zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
 			zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
 		)
 		)
@@ -94,7 +97,7 @@ func (s *snapshotSender) send(merged snap.Message) {
 			s.tr.Logger.Warn(
 			s.tr.Logger.Warn(
 				"failed to send database snapshot",
 				"failed to send database snapshot",
 				zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
 				zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
-				zap.String("remote-peer-id", types.ID(m.To).String()),
+				zap.String("remote-peer-id", to),
 				zap.Int64("bytes", merged.TotalSize),
 				zap.Int64("bytes", merged.TotalSize),
 				zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
 				zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
 				zap.Error(err),
 				zap.Error(err),
@@ -116,7 +119,8 @@ func (s *snapshotSender) send(merged snap.Message) {
 		// machine knows about it, it would pause a while and retry sending
 		// machine knows about it, it would pause a while and retry sending
 		// new snapshot message.
 		// new snapshot message.
 		s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
 		s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
-		sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
+		sentFailures.WithLabelValues(to).Inc()
+		snapshotSendFailures.WithLabelValues(to).Inc()
 		return
 		return
 	}
 	}
 	s.status.activate()
 	s.status.activate()
@@ -126,7 +130,7 @@ func (s *snapshotSender) send(merged snap.Message) {
 		s.tr.Logger.Info(
 		s.tr.Logger.Info(
 			"sent database snapshot",
 			"sent database snapshot",
 			zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
 			zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
-			zap.String("remote-peer-id", types.ID(m.To).String()),
+			zap.String("remote-peer-id", to),
 			zap.Int64("bytes", merged.TotalSize),
 			zap.Int64("bytes", merged.TotalSize),
 			zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
 			zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
 		)
 		)
@@ -134,7 +138,10 @@ func (s *snapshotSender) send(merged snap.Message) {
 		plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
 		plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
 	}
 	}
 
 
-	sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(merged.TotalSize))
+	sentBytes.WithLabelValues(to).Add(float64(merged.TotalSize))
+
+	snapshotSend.WithLabelValues(to).Inc()
+	snapshotSendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds())
 }
 }
 
 
 // post posts the given request.
 // post posts the given request.

+ 6 - 0
etcdserver/api/snap/db.go

@@ -21,6 +21,7 @@ import (
 	"io/ioutil"
 	"io/ioutil"
 	"os"
 	"os"
 	"path/filepath"
 	"path/filepath"
+	"time"
 
 
 	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/coreos/etcd/pkg/fileutil"
 
 
@@ -33,6 +34,8 @@ var ErrNoDBSnapshot = errors.New("snap: snapshot file doesn't exist")
 // SaveDBFrom saves snapshot of the database from the given reader. It
 // SaveDBFrom saves snapshot of the database from the given reader. It
 // guarantees the save operation is atomic.
 // guarantees the save operation is atomic.
 func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
 func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
+	start := time.Now()
+
 	f, err := ioutil.TempFile(s.dir, "tmp")
 	f, err := ioutil.TempFile(s.dir, "tmp")
 	if err != nil {
 	if err != nil {
 		return 0, err
 		return 0, err
@@ -40,7 +43,9 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
 	var n int64
 	var n int64
 	n, err = io.Copy(f, r)
 	n, err = io.Copy(f, r)
 	if err == nil {
 	if err == nil {
+		fsyncStart := time.Now()
 		err = fileutil.Fsync(f)
 		err = fileutil.Fsync(f)
+		snapDBFsyncSec.Observe(time.Since(fsyncStart).Seconds())
 	}
 	}
 	f.Close()
 	f.Close()
 	if err != nil {
 	if err != nil {
@@ -69,6 +74,7 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
 		plog.Infof("saved database snapshot to disk [total bytes: %d]", n)
 		plog.Infof("saved database snapshot to disk [total bytes: %d]", n)
 	}
 	}
 
 
+	snapDBSaveSec.Observe(time.Since(start).Seconds())
 	return n, nil
 	return n, nil
 }
 }
 
 

+ 24 - 0
etcdserver/api/snap/metrics.go

@@ -49,10 +49,34 @@ var (
 		// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
 		// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
 		Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
 		Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
 	})
 	})
+
+	snapDBSaveSec = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "snap_db",
+		Name:      "save_total_duration_seconds",
+		Help:      "The total latency distributions of v3 snapshot save",
+
+		// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
+		// highest bucket start of 0.1 sec * 2^9 == 51.2 sec
+		Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
+	})
+
+	snapDBFsyncSec = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "snap_db",
+		Name:      "fsync_duration_seconds",
+		Help:      "The latency distributions of fsyncing .snap.db file",
+
+		// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
+		// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
+		Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
+	})
 )
 )
 
 
 func init() {
 func init() {
 	prometheus.MustRegister(snapMarshallingSec)
 	prometheus.MustRegister(snapMarshallingSec)
 	prometheus.MustRegister(snapSaveSec)
 	prometheus.MustRegister(snapSaveSec)
 	prometheus.MustRegister(snapFsyncSec)
 	prometheus.MustRegister(snapFsyncSec)
+	prometheus.MustRegister(snapDBSaveSec)
+	prometheus.MustRegister(snapDBFsyncSec)
 }
 }