batch_tx.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package backend
  15. import (
  16. "bytes"
  17. "math"
  18. "sync"
  19. "sync/atomic"
  20. "time"
  21. bolt "go.etcd.io/bbolt"
  22. "go.uber.org/zap"
  23. )
  24. type BatchTx interface {
  25. ReadTx
  26. UnsafeCreateBucket(name []byte)
  27. UnsafePut(bucketName []byte, key []byte, value []byte)
  28. UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
  29. UnsafeDelete(bucketName []byte, key []byte)
  30. // Commit commits a previous tx and begins a new writable one.
  31. Commit()
  32. // CommitAndStop commits the previous tx and does not create a new one.
  33. CommitAndStop()
  34. }
  35. type batchTx struct {
  36. sync.Mutex
  37. tx *bolt.Tx
  38. backend *backend
  39. pending int
  40. }
  41. func (t *batchTx) Lock() {
  42. t.Mutex.Lock()
  43. }
  44. func (t *batchTx) Unlock() {
  45. if t.pending >= t.backend.batchLimit {
  46. t.commit(false)
  47. }
  48. t.Mutex.Unlock()
  49. }
  50. // BatchTx interface embeds ReadTx interface. But RLock() and RUnlock() do not
  51. // have appropriate semantics in BatchTx interface. Therefore should not be called.
  52. // TODO: might want to decouple ReadTx and BatchTx
  53. func (t *batchTx) RLock() {
  54. panic("unexpected RLock")
  55. }
  56. func (t *batchTx) RUnlock() {
  57. panic("unexpected RUnlock")
  58. }
  59. func (t *batchTx) UnsafeCreateBucket(name []byte) {
  60. _, err := t.tx.CreateBucket(name)
  61. if err != nil && err != bolt.ErrBucketExists {
  62. if t.backend.lg != nil {
  63. t.backend.lg.Fatal(
  64. "failed to create a bucket",
  65. zap.String("bucket-name", string(name)),
  66. zap.Error(err),
  67. )
  68. } else {
  69. plog.Fatalf("cannot create bucket %s (%v)", name, err)
  70. }
  71. }
  72. t.pending++
  73. }
  74. // UnsafePut must be called holding the lock on the tx.
  75. func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
  76. t.unsafePut(bucketName, key, value, false)
  77. }
  78. // UnsafeSeqPut must be called holding the lock on the tx.
  79. func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
  80. t.unsafePut(bucketName, key, value, true)
  81. }
  82. func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
  83. bucket := t.tx.Bucket(bucketName)
  84. if bucket == nil {
  85. if t.backend.lg != nil {
  86. t.backend.lg.Fatal(
  87. "failed to find a bucket",
  88. zap.String("bucket-name", string(bucketName)),
  89. )
  90. } else {
  91. plog.Fatalf("bucket %s does not exist", bucketName)
  92. }
  93. }
  94. if seq {
  95. // it is useful to increase fill percent when the workloads are mostly append-only.
  96. // this can delay the page split and reduce space usage.
  97. bucket.FillPercent = 0.9
  98. }
  99. if err := bucket.Put(key, value); err != nil {
  100. if t.backend.lg != nil {
  101. t.backend.lg.Fatal(
  102. "failed to write to a bucket",
  103. zap.String("bucket-name", string(bucketName)),
  104. zap.Error(err),
  105. )
  106. } else {
  107. plog.Fatalf("cannot put key into bucket (%v)", err)
  108. }
  109. }
  110. t.pending++
  111. }
  112. // UnsafeRange must be called holding the lock on the tx.
  113. func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
  114. bucket := t.tx.Bucket(bucketName)
  115. if bucket == nil {
  116. if t.backend.lg != nil {
  117. t.backend.lg.Fatal(
  118. "failed to find a bucket",
  119. zap.String("bucket-name", string(bucketName)),
  120. )
  121. } else {
  122. plog.Fatalf("bucket %s does not exist", bucketName)
  123. }
  124. }
  125. return unsafeRange(bucket.Cursor(), key, endKey, limit)
  126. }
  127. func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
  128. if limit <= 0 {
  129. limit = math.MaxInt64
  130. }
  131. var isMatch func(b []byte) bool
  132. if len(endKey) > 0 {
  133. isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
  134. } else {
  135. isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
  136. limit = 1
  137. }
  138. for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
  139. vs = append(vs, cv)
  140. keys = append(keys, ck)
  141. if limit == int64(len(keys)) {
  142. break
  143. }
  144. }
  145. return keys, vs
  146. }
  147. // UnsafeDelete must be called holding the lock on the tx.
  148. func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
  149. bucket := t.tx.Bucket(bucketName)
  150. if bucket == nil {
  151. if t.backend.lg != nil {
  152. t.backend.lg.Fatal(
  153. "failed to find a bucket",
  154. zap.String("bucket-name", string(bucketName)),
  155. )
  156. } else {
  157. plog.Fatalf("bucket %s does not exist", bucketName)
  158. }
  159. }
  160. err := bucket.Delete(key)
  161. if err != nil {
  162. if t.backend.lg != nil {
  163. t.backend.lg.Fatal(
  164. "failed to delete a key",
  165. zap.String("bucket-name", string(bucketName)),
  166. zap.Error(err),
  167. )
  168. } else {
  169. plog.Fatalf("cannot delete key from bucket (%v)", err)
  170. }
  171. }
  172. t.pending++
  173. }
  174. // UnsafeForEach must be called holding the lock on the tx.
  175. func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
  176. return unsafeForEach(t.tx, bucketName, visitor)
  177. }
  178. func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error) error {
  179. if b := tx.Bucket(bucket); b != nil {
  180. return b.ForEach(visitor)
  181. }
  182. return nil
  183. }
  184. // Commit commits a previous tx and begins a new writable one.
  185. func (t *batchTx) Commit() {
  186. t.Lock()
  187. t.commit(false)
  188. t.Unlock()
  189. }
  190. // CommitAndStop commits the previous tx and does not create a new one.
  191. func (t *batchTx) CommitAndStop() {
  192. t.Lock()
  193. t.commit(true)
  194. t.Unlock()
  195. }
  196. func (t *batchTx) safePending() int {
  197. t.Mutex.Lock()
  198. defer t.Mutex.Unlock()
  199. return t.pending
  200. }
  201. func (t *batchTx) commit(stop bool) {
  202. // commit the last tx
  203. if t.tx != nil {
  204. if t.pending == 0 && !stop {
  205. return
  206. }
  207. start := time.Now()
  208. // gofail: var beforeCommit struct{}
  209. err := t.tx.Commit()
  210. // gofail: var afterCommit struct{}
  211. rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
  212. spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
  213. writeSec.Observe(t.tx.Stats().WriteTime.Seconds())
  214. commitSec.Observe(time.Since(start).Seconds())
  215. atomic.AddInt64(&t.backend.commits, 1)
  216. t.pending = 0
  217. if err != nil {
  218. if t.backend.lg != nil {
  219. t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
  220. } else {
  221. plog.Fatalf("cannot commit tx (%s)", err)
  222. }
  223. }
  224. }
  225. if !stop {
  226. t.tx = t.backend.begin(true)
  227. }
  228. }
  229. type batchTxBuffered struct {
  230. batchTx
  231. buf txWriteBuffer
  232. }
  233. func newBatchTxBuffered(backend *backend) *batchTxBuffered {
  234. tx := &batchTxBuffered{
  235. batchTx: batchTx{backend: backend},
  236. buf: txWriteBuffer{
  237. txBuffer: txBuffer{make(map[string]*bucketBuffer)},
  238. seq: true,
  239. },
  240. }
  241. tx.Commit()
  242. return tx
  243. }
  244. func (t *batchTxBuffered) Unlock() {
  245. if t.pending != 0 {
  246. t.backend.readTx.Lock() // blocks txReadBuffer for writing.
  247. t.buf.writeback(&t.backend.readTx.buf)
  248. t.backend.readTx.Unlock()
  249. if t.pending >= t.backend.batchLimit {
  250. t.commit(false)
  251. }
  252. }
  253. t.batchTx.Unlock()
  254. }
  255. func (t *batchTxBuffered) Commit() {
  256. t.Lock()
  257. t.commit(false)
  258. t.Unlock()
  259. }
  260. func (t *batchTxBuffered) CommitAndStop() {
  261. t.Lock()
  262. t.commit(true)
  263. t.Unlock()
  264. }
  265. func (t *batchTxBuffered) commit(stop bool) {
  266. // all read txs must be closed to acquire boltdb commit rwlock
  267. t.backend.readTx.Lock()
  268. t.unsafeCommit(stop)
  269. t.backend.readTx.Unlock()
  270. }
  271. func (t *batchTxBuffered) unsafeCommit(stop bool) {
  272. if t.backend.readTx.tx != nil {
  273. // wait all store read transactions using the current boltdb tx to finish,
  274. // then close the boltdb tx
  275. go func(tx *bolt.Tx, wg *sync.WaitGroup) {
  276. wg.Wait()
  277. if err := tx.Rollback(); err != nil {
  278. if t.backend.lg != nil {
  279. t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
  280. } else {
  281. plog.Fatalf("cannot rollback tx (%s)", err)
  282. }
  283. }
  284. }(t.backend.readTx.tx, t.backend.readTx.txWg)
  285. t.backend.readTx.reset()
  286. }
  287. t.batchTx.commit(stop)
  288. if !stop {
  289. t.backend.readTx.tx = t.backend.begin(false)
  290. }
  291. }
  292. func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) {
  293. t.batchTx.UnsafePut(bucketName, key, value)
  294. t.buf.put(bucketName, key, value)
  295. }
  296. func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
  297. t.batchTx.UnsafeSeqPut(bucketName, key, value)
  298. t.buf.putSeq(bucketName, key, value)
  299. }