Browse Source

Merge pull request #9761 from gyuho/mmm

etcdserver,mvcc: add more storage layer metrics
Gyuho Lee 7 years ago
parent
commit
c444c1f327
8 changed files with 89 additions and 20 deletions
  1. 14 0
      etcdserver/metrics.go
  2. 4 2
      etcdserver/raft.go
  3. 2 1
      etcdserver/util.go
  4. 4 1
      mvcc/backend/backend.go
  5. 9 13
      mvcc/backend/batch_tx.go
  6. 20 2
      mvcc/backend/metrics.go
  7. 10 1
      mvcc/kvstore.go
  8. 26 0
      mvcc/metrics.go

+ 14 - 0
etcdserver/metrics.go

@@ -42,6 +42,18 @@ var (
 		Name:      "leader_changes_seen_total",
 		Help:      "The number of leader changes seen.",
 	})
+	heartbeatSendFailures = prometheus.NewCounter(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "server",
+		Name:      "heartbeat_send_failures_total",
+		Help:      "The total number of leader heartbeat send failures (likely overloaded from slow disk).",
+	})
+	slowApplies = prometheus.NewCounter(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "server",
+		Name:      "slow_apply_total",
+		Help:      "The total number of slow apply requests (likely overloaded from slow disk).",
+	})
 	proposalsCommitted = prometheus.NewGauge(prometheus.GaugeOpts{
 		Namespace: "etcd",
 		Subsystem: "server",
@@ -85,6 +97,8 @@ func init() {
 	prometheus.MustRegister(hasLeader)
 	prometheus.MustRegister(isLeader)
 	prometheus.MustRegister(leaderChanges)
+	prometheus.MustRegister(heartbeatSendFailures)
+	prometheus.MustRegister(slowApplies)
 	prometheus.MustRegister(proposalsCommitted)
 	prometheus.MustRegister(proposalsApplied)
 	prometheus.MustRegister(proposalsPending)

+ 4 - 2
etcdserver/raft.go

@@ -368,14 +368,16 @@ func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
 				// TODO: limit request rate.
 				if r.lg != nil {
 					r.lg.Warn(
-						"heartbeat took too long to send out; server is overloaded, likely from slow disk",
-						zap.Duration("exceeded", exceed),
+						"leader failed to send out heartbeat on time; took too long, leader is overloaded likely from slow disk",
 						zap.Duration("heartbeat-interval", r.heartbeat),
+						zap.Duration("expected-duration", 2*r.heartbeat),
+						zap.Duration("exceeded-duration", exceed),
 					)
 				} else {
 					plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
 					plog.Warningf("server is likely overloaded")
 				}
+				heartbeatSendFailures.Inc()
 			}
 		}
 	}

+ 2 - 1
etcdserver/util.go

@@ -113,7 +113,7 @@ func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, stringer fmt.S
 	if d > warnApplyDuration {
 		if lg != nil {
 			lg.Warn(
-				"request took too long",
+				"apply request took too long",
 				zap.Duration("took", d),
 				zap.Duration("expected-duration", warnApplyDuration),
 				zap.String("prefix", prefix),
@@ -122,5 +122,6 @@ func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, stringer fmt.S
 		} else {
 			plog.Warningf("%srequest %q took too long (%v) to execute", prefix, stringer.String(), d)
 		}
+		slowApplies.Inc()
 	}
 }

+ 4 - 1
mvcc/backend/backend.go

@@ -415,6 +415,9 @@ func (b *backend) defrag() error {
 	atomic.StoreInt64(&b.size, size)
 	atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
 
+	took := time.Since(now)
+	defragDurations.Observe(took.Seconds())
+
 	size2, sizeInUse2 := b.Size(), b.SizeInUse()
 	if b.lg != nil {
 		b.lg.Info(
@@ -426,7 +429,7 @@ func (b *backend) defrag() error {
 			zap.Int64("current-db-size-in-use-bytes-diff", sizeInUse2-sizeInUse1),
 			zap.Int64("current-db-size-in-use-bytes", sizeInUse2),
 			zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse2))),
-			zap.Duration("took", time.Since(now)),
+			zap.Duration("took", took),
 		)
 	}
 	return nil

+ 9 - 13
mvcc/backend/batch_tx.go

@@ -183,15 +183,15 @@ func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error)
 // Commit commits a previous tx and begins a new writable one.
 func (t *batchTx) Commit() {
 	t.Lock()
-	defer t.Unlock()
 	t.commit(false)
+	t.Unlock()
 }
 
 // CommitAndStop commits the previous tx and does not create a new one.
 func (t *batchTx) CommitAndStop() {
 	t.Lock()
-	defer t.Unlock()
 	t.commit(true)
+	t.Unlock()
 }
 
 func (t *batchTx) Unlock() {
@@ -215,19 +215,18 @@ func (t *batchTx) commit(stop bool) {
 		}
 
 		start := time.Now()
+
 		// gofail: var beforeCommit struct{}
 		err := t.tx.Commit()
 		// gofail: var afterCommit struct{}
+
 		commitDurations.Observe(time.Since(start).Seconds())
 		atomic.AddInt64(&t.backend.commits, 1)
 
 		t.pending = 0
 		if err != nil {
 			if t.backend.lg != nil {
-				t.backend.lg.Fatal(
-					"failed to commit tx",
-					zap.Error(err),
-				)
+				t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
 			} else {
 				plog.Fatalf("cannot commit tx (%s)", err)
 			}
@@ -269,31 +268,28 @@ func (t *batchTxBuffered) Unlock() {
 
 func (t *batchTxBuffered) Commit() {
 	t.Lock()
-	defer t.Unlock()
 	t.commit(false)
+	t.Unlock()
 }
 
 func (t *batchTxBuffered) CommitAndStop() {
 	t.Lock()
-	defer t.Unlock()
 	t.commit(true)
+	t.Unlock()
 }
 
 func (t *batchTxBuffered) commit(stop bool) {
 	// all read txs must be closed to acquire boltdb commit rwlock
 	t.backend.readTx.mu.Lock()
-	defer t.backend.readTx.mu.Unlock()
 	t.unsafeCommit(stop)
+	t.backend.readTx.mu.Unlock()
 }
 
 func (t *batchTxBuffered) unsafeCommit(stop bool) {
 	if t.backend.readTx.tx != nil {
 		if err := t.backend.readTx.tx.Rollback(); err != nil {
 			if t.backend.lg != nil {
-				t.backend.lg.Fatal(
-					"failed to rollback tx",
-					zap.Error(err),
-				)
+				t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
 			} else {
 				plog.Fatalf("cannot rollback tx (%s)", err)
 			}

+ 20 - 2
mvcc/backend/metrics.go

@@ -22,7 +22,22 @@ var (
 		Subsystem: "disk",
 		Name:      "backend_commit_duration_seconds",
 		Help:      "The latency distributions of commit called by backend.",
-		Buckets:   prometheus.ExponentialBuckets(0.001, 2, 14),
+
+		// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
+		// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
+		Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
+	})
+
+	defragDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "disk",
+		Name:      "backend_defrag_duration_seconds",
+		Help:      "The latency distribution of backend defragmentation.",
+
+		// 100 MB usually takes 1 sec, so start with 10 MB of 100 ms
+		// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
+		// highest bucket start of 0.1 sec * 2^12 == 409.6 sec
+		Buckets: prometheus.ExponentialBuckets(.1, 2, 13),
 	})
 
 	snapshotDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
@@ -30,12 +45,15 @@ var (
 		Subsystem: "disk",
 		Name:      "backend_snapshot_duration_seconds",
 		Help:      "The latency distribution of backend snapshots.",
-		// 10 ms -> 655 seconds
+
+		// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
+		// highest bucket start of 0.01 sec * 2^16 == 655.36 sec
 		Buckets: prometheus.ExponentialBuckets(.01, 2, 17),
 	})
 )
 
 func init() {
 	prometheus.MustRegister(commitDurations)
+	prometheus.MustRegister(defragDurations)
 	prometheus.MustRegister(snapshotDurations)
 }

+ 10 - 1
mvcc/kvstore.go

@@ -163,12 +163,18 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
 }
 
 func (s *store) Hash() (hash uint32, revision int64, err error) {
+	start := time.Now()
+
 	s.b.ForceCommit()
 	h, err := s.b.Hash(DefaultIgnores)
+
+	hashDurations.Observe(time.Since(start).Seconds())
 	return h, s.currentRev, err
 }
 
 func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
+	start := time.Now()
+
 	s.mu.RLock()
 	s.revMu.RLock()
 	compactRev, currentRev = s.compactMainRev, s.currentRev
@@ -213,7 +219,10 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev
 		h.Write(v)
 		return nil
 	})
-	return h.Sum32(), currentRev, compactRev, err
+	hash = h.Sum32()
+
+	hashRevDurations.Observe(time.Since(start).Seconds())
+	return hash, currentRev, compactRev, err
 }
 
 func (s *store) Compact(rev int64) (<-chan struct{}, error) {

+ 26 - 0
mvcc/metrics.go

@@ -170,6 +170,30 @@ var (
 	// overridden by mvcc initialization
 	reportDbTotalSizeInUseInBytesMu sync.RWMutex
 	reportDbTotalSizeInUseInBytes   func() float64 = func() float64 { return 0 }
+
+	hashDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "mvcc",
+		Name:      "hash_duration_seconds",
+		Help:      "The latency distribution of storage hash operation.",
+
+		// 100 MB usually takes 100 ms, so start with 10 MB of 10 ms
+		// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
+		// highest bucket start of 0.01 sec * 2^14 == 163.84 sec
+		Buckets: prometheus.ExponentialBuckets(.01, 2, 15),
+	})
+
+	hashRevDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "mvcc",
+		Name:      "hash_rev_duration_seconds",
+		Help:      "The latency distribution of storage hash by revision operation.",
+
+		// 100 MB usually takes 100 ms, so start with 10 MB of 10 ms
+		// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
+		// highest bucket start of 0.01 sec * 2^14 == 163.84 sec
+		Buckets: prometheus.ExponentialBuckets(.01, 2, 15),
+	})
 )
 
 func init() {
@@ -189,6 +213,8 @@ func init() {
 	prometheus.MustRegister(dbCompactionKeysCounter)
 	prometheus.MustRegister(dbTotalSize)
 	prometheus.MustRegister(dbTotalSizeInUse)
+	prometheus.MustRegister(hashDurations)
+	prometheus.MustRegister(hashRevDurations)
 }
 
 // ReportEventReceived reports that an event is received.