Browse Source

mvcc: allow large concurrent reads under light write workload

Xiang 7 years ago
parent
commit
3ebee21407
2 changed files with 11 additions and 6 deletions
  1. 4 6
      internal/mvcc/backend/backend.go
  2. 7 0
      internal/mvcc/backend/batch_tx.go

+ 4 - 6
internal/mvcc/backend/backend.go

@@ -271,7 +271,9 @@ func (b *backend) run() {
 			b.batchTx.CommitAndStop()
 			return
 		}
-		b.batchTx.Commit()
+		if b.batchTx.safePending() != 0 {
+			b.batchTx.Commit()
+		}
 		t.Reset(b.batchInterval)
 	}
 }
@@ -302,11 +304,7 @@ func (b *backend) defrag() error {
 	b.mu.Lock()
 	defer b.mu.Unlock()
 
-	// block concurrent read requests while resetting tx
-	b.readTx.mu.Lock()
-	defer b.readTx.mu.Unlock()
-
-	b.batchTx.unsafeCommit(true)
+	b.batchTx.commit(true)
 	b.batchTx.tx = nil
 
 	tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions)

+ 7 - 0
internal/mvcc/backend/batch_tx.go

@@ -98,6 +98,7 @@ func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte
 		isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
 		limit = 1
 	}
+
 	for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
 		vs = append(vs, cv)
 		keys = append(keys, ck)
@@ -154,6 +155,12 @@ func (t *batchTx) Unlock() {
 	t.Mutex.Unlock()
 }
 
+func (t *batchTx) safePending() int {
+	t.Mutex.Lock()
+	defer t.Mutex.Unlock()
+	return t.pending
+}
+
 func (t *batchTx) commit(stop bool) {
 	// commit the last tx
 	if t.tx != nil {