Browse Source

backend: cache buckets in read tx

Saves an alloc and about 10% of Range() time.
Anthony Romano 8 years ago
parent
commit
8b872196d0
3 changed files with 53 additions and 42 deletions
  1. mvcc/backend/backend.go (+6 -3)
  2. mvcc/backend/batch_tx.go (+15 -34)
  3. mvcc/backend/read_tx.go (+32 -5)

+ 6 - 3
mvcc/backend/backend.go

@@ -139,8 +139,11 @@ func newBackend(bcfg BackendConfig) *backend {
 		batchInterval: bcfg.BatchInterval,
 		batchLimit:    bcfg.BatchLimit,
 
-		readTx: &readTx{buf: txReadBuffer{
-			txBuffer: txBuffer{make(map[string]*bucketBuffer)}},
+		readTx: &readTx{
+			buf: txReadBuffer{
+				txBuffer: txBuffer{make(map[string]*bucketBuffer)},
+			},
+			buckets: make(map[string]*bolt.Bucket),
 		},
 
 		stopc: make(chan struct{}),
@@ -339,7 +342,7 @@ func (b *backend) defrag() error {
 		plog.Fatalf("cannot begin tx (%s)", err)
 	}
 
-	b.readTx.buf.reset()
+	b.readTx.reset()
 	b.readTx.tx = b.unsafeBegin(false)
 	atomic.StoreInt64(&b.size, b.readTx.tx.Size())
 

+ 15 - 34
mvcc/backend/batch_tx.go

@@ -16,7 +16,6 @@ package backend
 
 import (
 	"bytes"
-	"fmt"
 	"math"
 	"sync"
 	"sync/atomic"
@@ -45,13 +44,6 @@ type batchTx struct {
 	pending int
 }
 
-var nopLock sync.Locker = &nopLocker{}
-
-type nopLocker struct{}
-
-func (*nopLocker) Lock()   {}
-func (*nopLocker) Unlock() {}
-
 func (t *batchTx) UnsafeCreateBucket(name []byte) {
 	_, err := t.tx.CreateBucket(name)
 	if err != nil && err != bolt.ErrBucketExists {
@@ -88,42 +80,32 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
 
 // UnsafeRange must be called holding the lock on the tx.
 func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
-	// nop lock since a write txn should already hold a lock over t.tx
-	k, v, err := unsafeRange(t.tx, bucketName, key, endKey, limit, nopLock)
-	if err != nil {
-		plog.Fatal(err)
+	bucket := t.tx.Bucket(bucketName)
+	if bucket == nil {
+		plog.Fatalf("bucket %s does not exist", bucketName)
 	}
-	return k, v
+	return unsafeRange(bucket.Cursor(), key, endKey, limit)
 }
 
-func unsafeRange(tx *bolt.Tx, bucketName, key, endKey []byte, limit int64, l sync.Locker) (keys [][]byte, vs [][]byte, err error) {
-	l.Lock()
-	bucket := tx.Bucket(bucketName)
-	if bucket == nil {
-		l.Unlock()
-		return nil, nil, fmt.Errorf("bucket %s does not exist", bucketName)
-	}
-	if len(endKey) == 0 {
-		v := bucket.Get(key)
-		l.Unlock()
-		if v != nil {
-			return append(keys, key), append(vs, v), nil
-		}
-		return nil, nil, nil
-	}
-	c := bucket.Cursor()
-	l.Unlock()
+func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
 	if limit <= 0 {
 		limit = math.MaxInt64
 	}
-	for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() {
+	var isMatch func(b []byte) bool
+	if len(endKey) > 0 {
+		isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
+	} else {
+		isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
+		limit = 1
+	}
+	for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
 		vs = append(vs, cv)
 		keys = append(keys, ck)
 		if limit == int64(len(keys)) {
 			break
 		}
 	}
-	return keys, vs, nil
+	return keys, vs
 }
 
 // UnsafeDelete must be called holding the lock on the tx.
@@ -257,8 +239,7 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) {
 		if err := t.backend.readTx.tx.Rollback(); err != nil {
 			plog.Fatalf("cannot rollback tx (%s)", err)
 		}
-		t.backend.readTx.buf.reset()
-		t.backend.readTx.tx = nil
+		t.backend.readTx.reset()
 	}
 
 	t.batchTx.commit(stop)

+ 32 - 5
mvcc/backend/read_tx.go

@@ -40,9 +40,10 @@ type readTx struct {
 	mu  sync.RWMutex
 	buf txReadBuffer
 
-	// txmu protects accesses to the Tx on Range requests
-	txmu sync.Mutex
-	tx   *bolt.Tx
+	// txmu protects accesses to buckets and tx on Range requests.
+	txmu    sync.RWMutex
+	tx      *bolt.Tx
+	buckets map[string]*bolt.Bucket
 }
 
 func (rt *readTx) Lock()   { rt.mu.RLock() }
@@ -63,8 +64,28 @@ func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]
 	if int64(len(keys)) == limit {
 		return keys, vals
 	}
-	// ignore error since bucket may have been created in this batch
-	k2, v2, _ := unsafeRange(rt.tx, bucketName, key, endKey, limit-int64(len(keys)), &rt.txmu)
+
+	// find/cache bucket
+	bn := string(bucketName)
+	rt.txmu.RLock()
+	bucket, ok := rt.buckets[bn]
+	rt.txmu.RUnlock()
+	if !ok {
+		rt.txmu.Lock()
+		bucket = rt.tx.Bucket(bucketName)
+		rt.buckets[bn] = bucket
+		rt.txmu.Unlock()
+	}
+
+	// ignore missing bucket since may have been created in this batch
+	if bucket == nil {
+		return keys, vals
+	}
+	rt.txmu.Lock()
+	c := bucket.Cursor()
+	rt.txmu.Unlock()
+
+	k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
 	return append(k2, keys...), append(v2, vals...)
 }
 
@@ -91,3 +112,9 @@ func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) err
 	}
 	return rt.buf.ForEach(bucketName, visitor)
 }
+
+func (rt *readTx) reset() {
+	rt.buf.reset()
+	rt.buckets = make(map[string]*bolt.Bucket)
+	rt.tx = nil
+}