@@ -65,6 +65,8 @@ type Backend interface {
 	// Since the backend can manage free space in a non-byte unit such as
 	// number of pages, the returned value can be not exactly accurate in bytes.
 	SizeInUse() int64
+	// OpenReadTxN returns the number of currently open read transactions in the backend.
+	OpenReadTxN() int64
 	Defrag() error
 	ForceCommit()
 	Close() error
@@ -89,6 +91,8 @@ type backend struct {
 	sizeInUse int64
 	// commits counts number of commits since start
 	commits int64
+	// openReadTxN is the number of currently open read transactions in the backend
+	openReadTxN int64
 
 	mu sync.RWMutex
 	db *bolt.DB
@@ -198,6 +202,7 @@ func (b *backend) ConcurrentReadTx() ReadTx {
 	defer b.readTx.RUnlock()
 	// prevent boltdb read Tx from been rolled back until store read Tx is done.
 	b.readTx.txWg.Add(1)
+	// TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.
 	return &concurrentReadTx{
 		buf: b.readTx.buf.unsafeCopy(),
 		tx:  b.readTx.tx,
@@ -513,6 +518,7 @@ func (b *backend) begin(write bool) *bolt.Tx {
 	db := tx.DB()
 	atomic.StoreInt64(&b.size, size)
 	atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
+	atomic.StoreInt64(&b.openReadTxN, int64(db.Stats().OpenTxN))
 
 	return tx
 }
@@ -529,6 +535,10 @@ func (b *backend) unsafeBegin(write bool) *bolt.Tx {
 	return tx
 }
 
+func (b *backend) OpenReadTxN() int64 {
+	return atomic.LoadInt64(&b.openReadTxN)
+}
+
 // NewTmpBackend creates a backend implementation for testing.
 func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
 	dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
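
The new accessor snapshots boltdb's Stats().OpenTxN into openReadTxN each time a batch transaction begins, so callers can read it cheaply without touching the database. A minimal sketch of how a caller might surface it, for example as a Prometheus gauge, follows; the helper name registerOpenReadTxGauge, the metric namespace/name, and the go.etcd.io/etcd/mvcc/backend import path are illustrative assumptions, not part of this change.

package metricsexample

import (
	"github.com/prometheus/client_golang/prometheus"

	"go.etcd.io/etcd/mvcc/backend"
)

// registerOpenReadTxGauge registers a GaugeFunc that reports
// Backend.OpenReadTxN() each time the gauge is scraped.
// The metric name and namespace below are placeholders for illustration.
func registerOpenReadTxGauge(b backend.Backend) {
	prometheus.MustRegister(prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{
			Namespace: "etcd_debugging",
			Subsystem: "mvcc",
			Name:      "open_read_transactions",
			Help:      "Number of currently open read transactions in the backend.",
		},
		func() float64 { return float64(b.OpenReadTxN()) },
	))
}

Because OpenReadTxN only loads an atomically maintained counter, the gauge callback stays non-blocking even while long-running concurrent read transactions are in flight.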