瀏覽代碼

mvcc: add "etcd_mvcc_db_total_size_in_use_in_bytes"

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 7 年之前
父節點
當前提交
a410463a0b
共有 5 個文件被更改,包括 64 次插入10 次删除
  1. 23 2
      mvcc/backend/backend.go
  2. 16 5
      mvcc/backend/batch_tx.go
  3. 5 1
      mvcc/kvstore.go
  4. 1 0
      mvcc/kvstore_test.go
  5. 19 2
      mvcc/metrics.go

+ 23 - 2
mvcc/backend/backend.go

@@ -54,6 +54,10 @@ type Backend interface {
 	Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
 	// Size returns the current size of the backend.
 	Size() int64
+	// SizeInUse returns the current size of the backend logically in use.
+	// Since the backend can manage free space in a non-byte unit such as
+	// number of pages, the returned value can be not exactly accurate in bytes.
+	SizeInUse() int64
 	Defrag() error
 	ForceCommit()
 	Close() error
@@ -74,6 +78,10 @@ type backend struct {
 
 	// size is the number of bytes in the backend
 	size int64
+
+	// sizeInUse is the number of bytes actually used in the backend
+	sizeInUse int64
+
 	// commits counts number of commits since start
 	commits int64
 
@@ -247,6 +255,10 @@ func (b *backend) Size() int64 {
 	return atomic.LoadInt64(&b.size)
 }
 
+func (b *backend) SizeInUse() int64 {
+	return atomic.LoadInt64(&b.sizeInUse)
+}
+
 func (b *backend) run() {
 	defer close(b.donec)
 	t := time.NewTimer(b.batchInterval)
@@ -344,7 +356,11 @@ func (b *backend) defrag() error {
 
 	b.readTx.reset()
 	b.readTx.tx = b.unsafeBegin(false)
-	atomic.StoreInt64(&b.size, b.readTx.tx.Size())
+
+	size := b.readTx.tx.Size()
+	db := b.db
+	atomic.StoreInt64(&b.size, size)
+	atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
 
 	return nil
 }
@@ -405,7 +421,12 @@ func (b *backend) begin(write bool) *bolt.Tx {
 	b.mu.RLock()
 	tx := b.unsafeBegin(write)
 	b.mu.RUnlock()
-	atomic.StoreInt64(&b.size, tx.Size())
+
+	size := tx.Size()
+	db := tx.DB()
+	atomic.StoreInt64(&b.size, size)
+	atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
+
 	return tx
 }
 

+ 16 - 5
mvcc/backend/batch_tx.go

@@ -161,11 +161,22 @@ func (t *batchTx) commit(stop bool) {
 			t.backend.mu.RLock()
 			defer t.backend.mu.RUnlock()
 
-			// t.tx.DB()==nil if 'CommitAndStop' calls 'batchTx.commit(true)',
-			// which initializes *bolt.Tx.db and *bolt.Tx.meta as nil; panics t.tx.Size().
-			// Server must make sure 'batchTx.commit(false)' does not follow
-			// 'batchTx.commit(true)' (e.g. stopping backend, and inflight Hash call).
-			atomic.StoreInt64(&t.backend.size, t.tx.Size())
+			// batchTx.commit(true) calls *bolt.Tx.Commit, which
+			// initializes *bolt.Tx.db and *bolt.Tx.meta as nil,
+			// and subsequent *bolt.Tx.Size() call panics.
+			//
+			// This nil pointer reference panic happens when:
+			//   1. batchTx.commit(false) from newBatchTx
+			//   2. batchTx.commit(true) from stopping backend
+			//   3. batchTx.commit(false) from inflight mvcc Hash call
+			//
+			// Check if db is nil to prevent this panic
+			if t.tx.DB() != nil {
+				size := t.tx.Size()
+				db := t.tx.DB()
+				atomic.StoreInt64(&t.backend.size, size)
+				atomic.StoreInt64(&t.backend.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
+			}
 			return
 		}
 

+ 5 - 1
mvcc/kvstore.go

@@ -300,10 +300,14 @@ func (s *store) Restore(b backend.Backend) error {
 }
 
 func (s *store) restore() error {
-	reportDbTotalSizeInBytesMu.Lock()
 	b := s.b
+
+	reportDbTotalSizeInBytesMu.Lock()
 	reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) }
 	reportDbTotalSizeInBytesMu.Unlock()
+	reportDbTotalSizeInUseInBytesMu.Lock()
+	reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) }
+	reportDbTotalSizeInUseInBytesMu.Unlock()
 
 	min, max := newRevBytes(), newRevBytes()
 	revToBytes(revision{main: 1}, min)

+ 1 - 0
mvcc/kvstore_test.go

@@ -741,6 +741,7 @@ func (b *fakeBackend) BatchTx() backend.BatchTx
 func (b *fakeBackend) ReadTx() backend.ReadTx                                      { return b.tx }
 func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil }
 func (b *fakeBackend) Size() int64                                                 { return 0 }
+func (b *fakeBackend) SizeInUse() int64                                            { return 0 }
 func (b *fakeBackend) Snapshot() backend.Snapshot                                  { return nil }
 func (b *fakeBackend) ForceCommit()                                                {}
 func (b *fakeBackend) Defrag() error                                               { return nil }

+ 19 - 2
mvcc/metrics.go

@@ -143,7 +143,7 @@ var (
 		Namespace: "etcd_debugging",
 		Subsystem: "mvcc",
 		Name:      "db_total_size_in_bytes",
-		Help:      "Total size of the underlying database in bytes.",
+		Help:      "Total size of the underlying database physically allocated in bytes. Use etcd_mvcc_db_total_size_in_bytes",
 	},
 		func() float64 {
 			reportDbTotalSizeInBytesMu.RLock()
@@ -155,7 +155,7 @@ var (
 		Namespace: "etcd",
 		Subsystem: "mvcc",
 		Name:      "db_total_size_in_bytes",
-		Help:      "Total size of the underlying database in bytes.",
+		Help:      "Total size of the underlying database physically allocated in bytes.",
 	},
 		func() float64 {
 			reportDbTotalSizeInBytesMu.RLock()
@@ -166,6 +166,22 @@ var (
 	// overridden by mvcc initialization
 	reportDbTotalSizeInBytesMu sync.RWMutex
 	reportDbTotalSizeInBytes   = func() float64 { return 0 }
+
+	dbTotalSizeInUse = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+		Namespace: "etcd",
+		Subsystem: "mvcc",
+		Name:      "db_total_size_in_use_in_bytes",
+		Help:      "Total size of the underlying database logically in use in bytes.",
+	},
+		func() float64 {
+			reportDbTotalSizeInUseInBytesMu.RLock()
+			defer reportDbTotalSizeInUseInBytesMu.RUnlock()
+			return reportDbTotalSizeInUseInBytes()
+		},
+	)
+	// overridden by mvcc initialization
+	reportDbTotalSizeInUseInBytesMu sync.RWMutex
+	reportDbTotalSizeInUseInBytes   = func() float64 { return 0 }
 )
 
 func init() {
@@ -185,6 +201,7 @@ func init() {
 	prometheus.MustRegister(dbCompactionKeysCounter)
 	prometheus.MustRegister(dbTotalSizeDebugging)
 	prometheus.MustRegister(dbTotalSize)
+	prometheus.MustRegister(dbTotalSizeInUse)
 }
 
 // ReportEventReceived reports that an event is received.