read_tx.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package backend
  15. import (
  16. "bytes"
  17. "math"
  18. "sync"
  19. bolt "go.etcd.io/bbolt"
  20. )
  // safeRangeBucket names the single bucket on which multi-key ranges are
  // permitted. It is a hack to avoid inadvertently reading duplicate keys: a
  // bucket whose keys can be overwritten should only ever be fetched with
  // limit=1, whereas this bucket is known to never overwrite a key, so ranging
  // over it is safe.
  var safeRangeBucket = []byte("key")
  // ReadTx is the read-side transaction interface of the backend package: four
  // locking hooks plus two read primitives that operate on a named bucket.
  type ReadTx interface {
  	// Lock and Unlock are the exclusive-lock hooks. What they guard is up to
  	// the implementation.
  	Lock()
  	Unlock()
  	// RLock and RUnlock are the shared-lock hooks. What they guard is up to
  	// the implementation.
  	RLock()
  	RUnlock()

  	// UnsafeRange returns the keys and values found in bucketName for key,
  	// or for the range delimited by key and endKey when endKey is non-nil.
  	// The implementations in this file treat a nil endKey as a single-key
  	// lookup (limit forced to 1), treat limit <= 0 as "no limit", and panic
  	// when more than one result is requested from a bucket other than "key".
  	// NOTE(review): the "Unsafe" prefix presumably means the caller must
  	// already hold the lock — confirm against the callers.
  	UnsafeRange(bucketName, key, endKey []byte, limit int64) (keys, vals [][]byte)

  	// UnsafeForEach hands the key/value pairs of bucketName to visitor and
  	// returns an error if the traversal reports one.
  	UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
  }
  33. type readTx struct {
  34. // mu protects accesses to the txReadBuffer
  35. mu sync.RWMutex
  36. buf txReadBuffer
  37. // TODO: group and encapsulate {txMu, tx, buckets, txWg}, as they share the same lifecycle.
  38. // txMu protects accesses to buckets and tx on Range requests.
  39. txMu sync.RWMutex
  40. tx *bolt.Tx
  41. buckets map[string]*bolt.Bucket
  42. // txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
  43. txWg *sync.WaitGroup
  44. }
  45. func (rt *readTx) Lock() { rt.mu.Lock() }
  46. func (rt *readTx) Unlock() { rt.mu.Unlock() }
  47. func (rt *readTx) RLock() { rt.mu.RLock() }
  48. func (rt *readTx) RUnlock() { rt.mu.RUnlock() }
  49. func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
  50. if endKey == nil {
  51. // forbid duplicates for single keys
  52. limit = 1
  53. }
  54. if limit <= 0 {
  55. limit = math.MaxInt64
  56. }
  57. if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
  58. panic("do not use unsafeRange on non-keys bucket")
  59. }
  60. keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
  61. if int64(len(keys)) == limit {
  62. return keys, vals
  63. }
  64. // find/cache bucket
  65. bn := string(bucketName)
  66. rt.txMu.RLock()
  67. bucket, ok := rt.buckets[bn]
  68. rt.txMu.RUnlock()
  69. if !ok {
  70. rt.txMu.Lock()
  71. bucket = rt.tx.Bucket(bucketName)
  72. rt.buckets[bn] = bucket
  73. rt.txMu.Unlock()
  74. }
  75. // ignore missing bucket since may have been created in this batch
  76. if bucket == nil {
  77. return keys, vals
  78. }
  79. rt.txMu.Lock()
  80. c := bucket.Cursor()
  81. rt.txMu.Unlock()
  82. k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
  83. return append(k2, keys...), append(v2, vals...)
  84. }
  85. func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
  86. dups := make(map[string]struct{})
  87. getDups := func(k, v []byte) error {
  88. dups[string(k)] = struct{}{}
  89. return nil
  90. }
  91. visitNoDup := func(k, v []byte) error {
  92. if _, ok := dups[string(k)]; ok {
  93. return nil
  94. }
  95. return visitor(k, v)
  96. }
  97. if err := rt.buf.ForEach(bucketName, getDups); err != nil {
  98. return err
  99. }
  100. rt.txMu.Lock()
  101. err := unsafeForEach(rt.tx, bucketName, visitNoDup)
  102. rt.txMu.Unlock()
  103. if err != nil {
  104. return err
  105. }
  106. return rt.buf.ForEach(bucketName, visitor)
  107. }
  108. func (rt *readTx) reset() {
  109. rt.buf.reset()
  110. rt.buckets = make(map[string]*bolt.Bucket)
  111. rt.tx = nil
  112. rt.txWg = new(sync.WaitGroup)
  113. }
  114. // TODO: create a base type for readTx and concurrentReadTx to avoid duplicated function implementation?
  115. type concurrentReadTx struct {
  116. buf txReadBuffer
  117. txMu *sync.RWMutex
  118. tx *bolt.Tx
  119. buckets map[string]*bolt.Bucket // note: A map value is a pointer
  120. txWg *sync.WaitGroup
  121. }
  122. func (rt *concurrentReadTx) Lock() {}
  123. func (rt *concurrentReadTx) Unlock() {}
  124. func (rt *concurrentReadTx) RLock() {}
  125. func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() }
  126. func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
  127. dups := make(map[string]struct{})
  128. getDups := func(k, v []byte) error {
  129. dups[string(k)] = struct{}{}
  130. return nil
  131. }
  132. visitNoDup := func(k, v []byte) error {
  133. if _, ok := dups[string(k)]; ok {
  134. return nil
  135. }
  136. return visitor(k, v)
  137. }
  138. if err := rt.buf.ForEach(bucketName, getDups); err != nil {
  139. return err
  140. }
  141. rt.txMu.Lock()
  142. err := unsafeForEach(rt.tx, bucketName, visitNoDup)
  143. rt.txMu.Unlock()
  144. if err != nil {
  145. return err
  146. }
  147. return rt.buf.ForEach(bucketName, visitor)
  148. }
  149. func (rt *concurrentReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
  150. if endKey == nil {
  151. // forbid duplicates for single keys
  152. limit = 1
  153. }
  154. if limit <= 0 {
  155. limit = math.MaxInt64
  156. }
  157. if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
  158. panic("do not use unsafeRange on non-keys bucket")
  159. }
  160. keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
  161. if int64(len(keys)) == limit {
  162. return keys, vals
  163. }
  164. // find/cache bucket
  165. bn := string(bucketName)
  166. rt.txMu.RLock()
  167. bucket, ok := rt.buckets[bn]
  168. rt.txMu.RUnlock()
  169. if !ok {
  170. rt.txMu.Lock()
  171. bucket = rt.tx.Bucket(bucketName)
  172. rt.buckets[bn] = bucket
  173. rt.txMu.Unlock()
  174. }
  175. // ignore missing bucket since may have been created in this batch
  176. if bucket == nil {
  177. return keys, vals
  178. }
  179. rt.txMu.Lock()
  180. c := bucket.Cursor()
  181. rt.txMu.Unlock()
  182. k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
  183. return append(k2, keys...), append(v2, vals...)
  184. }