
backend: don't hold boltdb read txn lock on cursor scanning

Large fetches hold the lock when they do not need to do so.
Anthony Romano
commit 0506f49f9e
2 changed files with 18 additions and 7 deletions
  1. mvcc/backend/batch_tx.go (+17 -4)
  2. mvcc/backend/read_tx.go (+1 -3)
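
For context on the diff below: the change holds the lock guarding the shared *bolt.Tx only while looking up the bucket and creating the cursor, and releases it before the range scan itself. A minimal sketch of that pattern, assuming the github.com/boltdb/bolt package; the package name backend and the function name rangeScan are placeholders for illustration (the real helper is unsafeRange in mvcc/backend/batch_tx.go):

// Sketch only, not the etcd code: the locker is held while touching the
// bolt.Tx (bucket lookup, cursor creation) and released before iterating.
package backend

import (
	"bytes"
	"fmt"
	"sync"

	"github.com/boltdb/bolt"
)

func rangeScan(tx *bolt.Tx, bucketName, key, endKey []byte, l sync.Locker) ([][]byte, [][]byte, error) {
	l.Lock()
	b := tx.Bucket(bucketName)
	if b == nil {
		l.Unlock()
		return nil, nil, fmt.Errorf("bucket %s does not exist", bucketName)
	}
	c := b.Cursor()
	l.Unlock() // the scan below runs without holding the tx lock
	var keys, vals [][]byte
	for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() {
		keys = append(keys, ck)
		vals = append(vals, cv)
	}
	return keys, vals, nil
}

A write (batch) transaction can pass a no-op sync.Locker, since it already holds a lock over its tx, while a shared read transaction passes its real mutex; that is the split the diff below introduces.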

+ 17 - 4
mvcc/backend/batch_tx.go

@@ -45,6 +45,13 @@ type batchTx struct {
 	pending int
 }
 
+var nopLock sync.Locker = &nopLocker{}
+
+type nopLocker struct{}
+
+func (*nopLocker) Lock()   {}
+func (*nopLocker) Unlock() {}
+
 func (t *batchTx) UnsafeCreateBucket(name []byte) {
 	_, err := t.tx.CreateBucket(name)
 	if err != nil && err != bolt.ErrBucketExists {
@@ -81,28 +88,34 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
 
 // UnsafeRange must be called holding the lock on the tx.
 func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
-	k, v, err := unsafeRange(t.tx, bucketName, key, endKey, limit)
+	// nop lock since a write txn should already hold a lock over t.tx
+	k, v, err := unsafeRange(t.tx, bucketName, key, endKey, limit, nopLock)
 	if err != nil {
 		plog.Fatal(err)
 	}
 	return k, v
 }
 
-func unsafeRange(tx *bolt.Tx, bucketName, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte, err error) {
+func unsafeRange(tx *bolt.Tx, bucketName, key, endKey []byte, limit int64, l sync.Locker) (keys [][]byte, vs [][]byte, err error) {
+	l.Lock()
 	bucket := tx.Bucket(bucketName)
 	if bucket == nil {
+		l.Unlock()
 		return nil, nil, fmt.Errorf("bucket %s does not exist", bucketName)
 	}
 	if len(endKey) == 0 {
-		if v := bucket.Get(key); v != nil {
+		v := bucket.Get(key)
+		l.Unlock()
+		if v != nil {
 			return append(keys, key), append(vs, v), nil
 		}
 		return nil, nil, nil
 	}
+	c := bucket.Cursor()
+	l.Unlock()
 	if limit <= 0 {
 		limit = math.MaxInt64
 	}
-	c := bucket.Cursor()
 	for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() {
 		vs = append(vs, cv)
 		keys = append(keys, ck)

+ 1 - 3
mvcc/backend/read_tx.go

@@ -63,10 +63,8 @@ func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]
 	if int64(len(keys)) == limit {
 		return keys, vals
 	}
-	rt.txmu.Lock()
 	// ignore error since bucket may have been created in this batch
-	k2, v2, _ := unsafeRange(rt.tx, bucketName, key, endKey, limit-int64(len(keys)))
-	rt.txmu.Unlock()
+	k2, v2, _ := unsafeRange(rt.tx, bucketName, key, endKey, limit-int64(len(keys)), &rt.txmu)
 	return append(k2, keys...), append(v2, vals...)
 }
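
Net effect on the read path: rt.txmu is now acquired and released inside unsafeRange, around the bucket lookup and cursor creation only, so a large fetch no longer holds the lock for the whole cursor walk, per the commit message above.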