batch_tx.go 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package backend
  2. import (
  3. "bytes"
  4. "log"
  5. "sync"
  6. "github.com/boltdb/bolt"
  7. )
  8. type BatchTx interface {
  9. Lock()
  10. Unlock()
  11. UnsafeCreateBucket(name []byte)
  12. UnsafePut(bucketName []byte, key []byte, value []byte)
  13. UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) [][]byte
  14. UnsafeDelete(bucketName []byte, key []byte)
  15. }
  16. type batchTx struct {
  17. mu sync.Mutex
  18. tx *bolt.Tx
  19. backend *backend
  20. pending int
  21. }
  22. func (t *batchTx) Lock() {
  23. t.mu.Lock()
  24. }
  25. func (t *batchTx) Unlock() {
  26. t.mu.Unlock()
  27. }
  28. func (t *batchTx) UnsafeCreateBucket(name []byte) {
  29. _, err := t.tx.CreateBucket(name)
  30. if err != nil && err != bolt.ErrBucketExists {
  31. log.Fatalf("storage: cannot create bucket %s (%v)", string(name), err)
  32. }
  33. }
  34. // before calling unsafePut, the caller MUST hold the lock on tnx.
  35. func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
  36. bucket := t.tx.Bucket(bucketName)
  37. if bucket == nil {
  38. log.Fatalf("storage: bucket %s does not exist", string(bucketName))
  39. }
  40. if err := bucket.Put(key, value); err != nil {
  41. log.Fatalf("storage: cannot put key into bucket (%v)", err)
  42. }
  43. t.pending++
  44. if t.pending > t.backend.batchLimit {
  45. t.backend.commitAndBegin()
  46. t.pending = 0
  47. }
  48. }
  49. // before calling unsafeRange, the caller MUST hold the lock on tnx.
  50. func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) [][]byte {
  51. bucket := t.tx.Bucket(bucketName)
  52. if bucket == nil {
  53. log.Fatalf("storage: bucket %s does not exist", string(bucketName))
  54. }
  55. var vs [][]byte
  56. if len(endKey) == 0 {
  57. if v := bucket.Get(key); v == nil {
  58. return vs
  59. } else {
  60. return append(vs, v)
  61. }
  62. }
  63. c := bucket.Cursor()
  64. for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() {
  65. vs = append(vs, cv)
  66. }
  67. return vs
  68. }
  69. // before calling unsafeDelete, the caller MUST hold the lock on tnx.
  70. func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
  71. bucket := t.tx.Bucket(bucketName)
  72. if bucket == nil {
  73. log.Fatalf("storage: bucket %s does not exist", string(bucketName))
  74. }
  75. err := bucket.Delete(key)
  76. if err != nil {
  77. log.Fatalf("storage: cannot delete key from bucket (%v)", err)
  78. }
  79. t.pending++
  80. if t.pending > t.backend.batchLimit {
  81. t.backend.commitAndBegin()
  82. t.pending = 0
  83. }
  84. }