|
|
@@ -42,10 +42,13 @@ type readTx struct {
|
|
|
mu sync.RWMutex
|
|
|
buf txReadBuffer
|
|
|
|
|
|
- // txmu protects accesses to buckets and tx on Range requests.
|
|
|
- txmu sync.RWMutex
|
|
|
+ // TODO: group and encapsulate {txMu, tx, buckets, txWg}, as they share the same lifecycle.
|
|
|
+ // txMu protects accesses to buckets and tx on Range requests.
|
|
|
+ txMu sync.RWMutex
|
|
|
tx *bolt.Tx
|
|
|
buckets map[string]*bolt.Bucket
|
|
|
+ // txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
|
|
|
+ txWg *sync.WaitGroup
|
|
|
}
|
|
|
|
|
|
func (rt *readTx) Lock() { rt.mu.Lock() }
|
|
|
@@ -71,23 +74,23 @@ func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]
|
|
|
|
|
|
// find/cache bucket
|
|
|
bn := string(bucketName)
|
|
|
- rt.txmu.RLock()
|
|
|
+ rt.txMu.RLock()
|
|
|
bucket, ok := rt.buckets[bn]
|
|
|
- rt.txmu.RUnlock()
|
|
|
+ rt.txMu.RUnlock()
|
|
|
if !ok {
|
|
|
- rt.txmu.Lock()
|
|
|
+ rt.txMu.Lock()
|
|
|
bucket = rt.tx.Bucket(bucketName)
|
|
|
rt.buckets[bn] = bucket
|
|
|
- rt.txmu.Unlock()
|
|
|
+ rt.txMu.Unlock()
|
|
|
}
|
|
|
|
|
|
// ignore missing bucket since may have been created in this batch
|
|
|
if bucket == nil {
|
|
|
return keys, vals
|
|
|
}
|
|
|
- rt.txmu.Lock()
|
|
|
+ rt.txMu.Lock()
|
|
|
c := bucket.Cursor()
|
|
|
- rt.txmu.Unlock()
|
|
|
+ rt.txMu.Unlock()
|
|
|
|
|
|
k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
|
|
|
return append(k2, keys...), append(v2, vals...)
|
|
|
@@ -108,9 +111,9 @@ func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) err
|
|
|
if err := rt.buf.ForEach(bucketName, getDups); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- rt.txmu.Lock()
|
|
|
+ rt.txMu.Lock()
|
|
|
err := unsafeForEach(rt.tx, bucketName, visitNoDup)
|
|
|
- rt.txmu.Unlock()
|
|
|
+ rt.txMu.Unlock()
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
@@ -121,4 +124,83 @@ func (rt *readTx) reset() {
|
|
|
rt.buf.reset()
|
|
|
rt.buckets = make(map[string]*bolt.Bucket)
|
|
|
rt.tx = nil
|
|
|
+ rt.txWg = new(sync.WaitGroup)
|
|
|
+}
|
|
|
+
|
|
|
+// TODO: create a base type for readTx and concurrentReadTx to avoid duplicated function implementation?
|
|
|
+type concurrentReadTx struct {
|
|
|
+ buf txReadBuffer
|
|
|
+ txMu *sync.RWMutex
|
|
|
+ tx *bolt.Tx
|
|
|
+ buckets map[string]*bolt.Bucket // note: A map value is a pointer
|
|
|
+ txWg *sync.WaitGroup
|
|
|
+}
|
|
|
+
|
|
|
+func (rt *concurrentReadTx) Lock() {}
|
|
|
+func (rt *concurrentReadTx) Unlock() {}
|
|
|
+func (rt *concurrentReadTx) RLock() {}
|
|
|
+func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() }
|
|
|
+
|
|
|
+func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
|
|
|
+ dups := make(map[string]struct{})
|
|
|
+ getDups := func(k, v []byte) error {
|
|
|
+ dups[string(k)] = struct{}{}
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ visitNoDup := func(k, v []byte) error {
|
|
|
+ if _, ok := dups[string(k)]; ok {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return visitor(k, v)
|
|
|
+ }
|
|
|
+ if err := rt.buf.ForEach(bucketName, getDups); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ rt.txMu.Lock()
|
|
|
+ err := unsafeForEach(rt.tx, bucketName, visitNoDup)
|
|
|
+ rt.txMu.Unlock()
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ return rt.buf.ForEach(bucketName, visitor)
|
|
|
+}
|
|
|
+
|
|
|
+func (rt *concurrentReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
|
|
|
+ if endKey == nil {
|
|
|
+ // forbid duplicates for single keys
|
|
|
+ limit = 1
|
|
|
+ }
|
|
|
+ if limit <= 0 {
|
|
|
+ limit = math.MaxInt64
|
|
|
+ }
|
|
|
+ if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
|
|
|
+ panic("do not use unsafeRange on non-keys bucket")
|
|
|
+ }
|
|
|
+ keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
|
|
|
+ if int64(len(keys)) == limit {
|
|
|
+ return keys, vals
|
|
|
+ }
|
|
|
+
|
|
|
+ // find/cache bucket
|
|
|
+ bn := string(bucketName)
|
|
|
+ rt.txMu.RLock()
|
|
|
+ bucket, ok := rt.buckets[bn]
|
|
|
+ rt.txMu.RUnlock()
|
|
|
+ if !ok {
|
|
|
+ rt.txMu.Lock()
|
|
|
+ bucket = rt.tx.Bucket(bucketName)
|
|
|
+ rt.buckets[bn] = bucket
|
|
|
+ rt.txMu.Unlock()
|
|
|
+ }
|
|
|
+
|
|
|
+ // ignore missing bucket since may have been created in this batch
|
|
|
+ if bucket == nil {
|
|
|
+ return keys, vals
|
|
|
+ }
|
|
|
+ rt.txMu.Lock()
|
|
|
+ c := bucket.Cursor()
|
|
|
+ rt.txMu.Unlock()
|
|
|
+
|
|
|
+ k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
|
|
|
+ return append(k2, keys...), append(v2, vals...)
|
|
|
}
|