|
|
@@ -136,15 +136,15 @@ func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error)
|
|
|
// Commit commits a previous tx and begins a new writable one.
|
|
|
func (t *batchTx) Commit() {
|
|
|
t.Lock()
|
|
|
- defer t.Unlock()
|
|
|
t.commit(false)
|
|
|
+ t.Unlock()
|
|
|
}
|
|
|
|
|
|
// CommitAndStop commits the previous tx and does not create a new one.
|
|
|
func (t *batchTx) CommitAndStop() {
|
|
|
t.Lock()
|
|
|
- defer t.Unlock()
|
|
|
t.commit(true)
|
|
|
+ t.Unlock()
|
|
|
}
|
|
|
|
|
|
func (t *batchTx) Unlock() {
|
|
|
@@ -162,9 +162,11 @@ func (t *batchTx) commit(stop bool) {
|
|
|
}
|
|
|
|
|
|
start := time.Now()
|
|
|
+
|
|
|
// gofail: var beforeCommit struct{}
|
|
|
err := t.tx.Commit()
|
|
|
// gofail: var afterCommit struct{}
|
|
|
+
|
|
|
commitDurations.Observe(time.Since(start).Seconds())
|
|
|
atomic.AddInt64(&t.backend.commits, 1)
|
|
|
|
|
|
@@ -209,21 +211,21 @@ func (t *batchTxBuffered) Unlock() {
|
|
|
|
|
|
func (t *batchTxBuffered) Commit() {
|
|
|
t.Lock()
|
|
|
- defer t.Unlock()
|
|
|
t.commit(false)
|
|
|
+ t.Unlock()
|
|
|
}
|
|
|
|
|
|
func (t *batchTxBuffered) CommitAndStop() {
|
|
|
t.Lock()
|
|
|
- defer t.Unlock()
|
|
|
t.commit(true)
|
|
|
+ t.Unlock()
|
|
|
}
|
|
|
|
|
|
func (t *batchTxBuffered) commit(stop bool) {
|
|
|
// all read txs must be closed to acquire boltdb commit rwlock
|
|
|
t.backend.readTx.mu.Lock()
|
|
|
- defer t.backend.readTx.mu.Unlock()
|
|
|
t.unsafeCommit(stop)
|
|
|
+ t.backend.readTx.mu.Unlock()
|
|
|
}
|
|
|
|
|
|
func (t *batchTxBuffered) unsafeCommit(stop bool) {
|