// read_tx.go (5.5 KB)
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package backend
  15. import (
  16. "bytes"
  17. "math"
  18. "sync"
  19. bolt "go.etcd.io/bbolt"
  20. )
  // safeRangeBucket is the only bucket that UnsafeRange will serve with a
  // limit greater than one; any other bucket makes UnsafeRange panic.
  //
  // This is a hack to avoid inadvertently reading duplicate keys. Buckets whose
  // keys can be overwritten should only be fetched with limit=1. This bucket is
  // known never to overwrite a key, so a multi-key range over it is safe.
  var safeRangeBucket = []byte("key")
  // ReadTx is the read-side transaction interface of the backend. It combines
  // a set of lock methods with two "Unsafe" read methods.
  //
  // In this file it is implemented by readTx and concurrentReadTx:
  //   - readTx's locks guard its read buffer.
  //   - concurrentReadTx's Lock, Unlock and RLock are no-ops, and its RUnlock
  //     marks the read as finished.
  //
  // The Unsafe methods do not acquire the lock exposed by Lock/RLock. It looks
  // like callers are expected to bracket them with RLock/RUnlock; confirm this
  // against the call sites.
  type ReadTx interface {
  	// RLock and RUnlock bracket a shared read.
  	RLock()
  	RUnlock()

  	// Lock and Unlock are the exclusive counterparts of RLock/RUnlock.
  	// They may be no-ops for some implementations.
  	Lock()
  	Unlock()

  	// UnsafeRange returns up to limit key/value pairs of bucketName, starting
  	// at key and bounded by endKey.
  	//   - A nil endKey caps the result at one pair.
  	//   - A limit <= 0 means unlimited.
  	// Implementations in this file panic on a multi-pair range over any
  	// bucket other than safeRangeBucket.
  	UnsafeRange(bucketName, key, endKey []byte, limit int64) (keys, vals [][]byte)

  	// UnsafeForEach calls visitor for each key/value pair in bucketName and
  	// returns the first error reported along the way.
  	UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
  }
  33. type readTx struct {
  34. // mu protects accesses to the txReadBuffer
  35. mu sync.RWMutex
  36. buf txReadBuffer
  37. // TODO: group and encapsulate {txMu, tx, buckets, txWg}, as they share the same lifecycle.
  38. // txMu protects accesses to buckets and tx on Range requests.
  39. txMu sync.RWMutex
  40. tx *bolt.Tx
  41. buckets map[string]*bolt.Bucket
  42. // txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
  43. txWg *sync.WaitGroup
  44. }
  45. func (rt *readTx) Lock() { rt.mu.Lock() }
  46. func (rt *readTx) Unlock() { rt.mu.Unlock() }
  47. func (rt *readTx) RLock() { rt.mu.RLock() }
  48. func (rt *readTx) RUnlock() { rt.mu.RUnlock() }
  49. func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
  50. if endKey == nil {
  51. // forbid duplicates for single keys
  52. limit = 1
  53. }
  54. if limit <= 0 {
  55. limit = math.MaxInt64
  56. }
  57. if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
  58. panic("do not use unsafeRange on non-keys bucket")
  59. }
  60. keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
  61. if int64(len(keys)) == limit {
  62. return keys, vals
  63. }
  64. // find/cache bucket
  65. bn := string(bucketName)
  66. rt.txMu.RLock()
  67. bucket, ok := rt.buckets[bn]
  68. rt.txMu.RUnlock()
  69. if !ok {
  70. rt.txMu.Lock()
  71. bucket = rt.tx.Bucket(bucketName)
  72. rt.buckets[bn] = bucket
  73. rt.txMu.Unlock()
  74. }
  75. // ignore missing bucket since may have been created in this batch
  76. if bucket == nil {
  77. return keys, vals
  78. }
  79. rt.txMu.Lock()
  80. c := bucket.Cursor()
  81. rt.txMu.Unlock()
  82. k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
  83. return append(k2, keys...), append(v2, vals...)
  84. }
  85. func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
  86. dups := make(map[string]struct{})
  87. getDups := func(k, v []byte) error {
  88. dups[string(k)] = struct{}{}
  89. return nil
  90. }
  91. visitNoDup := func(k, v []byte) error {
  92. if _, ok := dups[string(k)]; ok {
  93. return nil
  94. }
  95. return visitor(k, v)
  96. }
  97. if err := rt.buf.ForEach(bucketName, getDups); err != nil {
  98. return err
  99. }
  100. rt.txMu.Lock()
  101. err := unsafeForEach(rt.tx, bucketName, visitNoDup)
  102. rt.txMu.Unlock()
  103. if err != nil {
  104. return err
  105. }
  106. return rt.buf.ForEach(bucketName, visitor)
  107. }
  108. func (rt *readTx) reset() {
  109. rt.buf.reset()
  110. rt.buckets = make(map[string]*bolt.Bucket)
  111. rt.tx = nil
  112. rt.txWg = new(sync.WaitGroup)
  113. }
  114. // TODO: create a base type for readTx and concurrentReadTx to avoid duplicated function implementation?
  115. type concurrentReadTx struct {
  116. buf txReadBuffer
  117. txMu *sync.RWMutex
  118. tx *bolt.Tx
  119. buckets map[string]*bolt.Bucket
  120. txWg *sync.WaitGroup
  121. }
  122. func (rt *concurrentReadTx) Lock() {}
  123. func (rt *concurrentReadTx) Unlock() {}
  124. // RLock is no-op. concurrentReadTx does not need to be locked after it is created.
  125. func (rt *concurrentReadTx) RLock() {}
  126. // RUnlock signals the end of concurrentReadTx.
  127. func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() }
  128. func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
  129. dups := make(map[string]struct{})
  130. getDups := func(k, v []byte) error {
  131. dups[string(k)] = struct{}{}
  132. return nil
  133. }
  134. visitNoDup := func(k, v []byte) error {
  135. if _, ok := dups[string(k)]; ok {
  136. return nil
  137. }
  138. return visitor(k, v)
  139. }
  140. if err := rt.buf.ForEach(bucketName, getDups); err != nil {
  141. return err
  142. }
  143. rt.txMu.Lock()
  144. err := unsafeForEach(rt.tx, bucketName, visitNoDup)
  145. rt.txMu.Unlock()
  146. if err != nil {
  147. return err
  148. }
  149. return rt.buf.ForEach(bucketName, visitor)
  150. }
  151. func (rt *concurrentReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
  152. if endKey == nil {
  153. // forbid duplicates for single keys
  154. limit = 1
  155. }
  156. if limit <= 0 {
  157. limit = math.MaxInt64
  158. }
  159. if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
  160. panic("do not use unsafeRange on non-keys bucket")
  161. }
  162. keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
  163. if int64(len(keys)) == limit {
  164. return keys, vals
  165. }
  166. // find/cache bucket
  167. bn := string(bucketName)
  168. rt.txMu.RLock()
  169. bucket, ok := rt.buckets[bn]
  170. rt.txMu.RUnlock()
  171. if !ok {
  172. rt.txMu.Lock()
  173. bucket = rt.tx.Bucket(bucketName)
  174. rt.buckets[bn] = bucket
  175. rt.txMu.Unlock()
  176. }
  177. // ignore missing bucket since may have been created in this batch
  178. if bucket == nil {
  179. return keys, vals
  180. }
  181. rt.txMu.Lock()
  182. c := bucket.Cursor()
  183. rt.txMu.Unlock()
  184. k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
  185. return append(k2, keys...), append(v2, vals...)
  186. }