backend.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package backend
  2. import (
  3. "log"
  4. "time"
  5. "github.com/boltdb/bolt"
  6. )
  7. type Backend interface {
  8. BatchTx() BatchTx
  9. ForceCommit()
  10. Close() error
  11. }
  12. type backend struct {
  13. db *bolt.DB
  14. batchInterval time.Duration
  15. batchLimit int
  16. batchTx *batchTx
  17. stopc chan struct{}
  18. startc chan struct{}
  19. donec chan struct{}
  20. }
  21. func New(path string, d time.Duration, limit int) Backend {
  22. db, err := bolt.Open(path, 0600, nil)
  23. if err != nil {
  24. log.Panicf("backend: cannot open database at %s (%v)", path, err)
  25. }
  26. b := &backend{
  27. db: db,
  28. batchInterval: d,
  29. batchLimit: limit,
  30. batchTx: &batchTx{},
  31. stopc: make(chan struct{}),
  32. startc: make(chan struct{}),
  33. donec: make(chan struct{}),
  34. }
  35. b.batchTx.backend = b
  36. go b.run()
  37. <-b.startc
  38. return b
  39. }
  40. // BatchTnx returns the current batch tx in coalescer. The tx can be used for read and
  41. // write operations. The write result can be retrieved within the same tx immediately.
  42. // The write result is isolated with other txs until the current one get committed.
  43. func (b *backend) BatchTx() BatchTx {
  44. return b.batchTx
  45. }
  46. // force commit the current batching tx.
  47. func (b *backend) ForceCommit() {
  48. b.batchTx.Lock()
  49. b.commitAndBegin()
  50. b.batchTx.Unlock()
  51. }
  52. func (b *backend) run() {
  53. defer close(b.donec)
  54. b.batchTx.Lock()
  55. b.commitAndBegin()
  56. b.batchTx.Unlock()
  57. b.startc <- struct{}{}
  58. for {
  59. select {
  60. case <-time.After(b.batchInterval):
  61. case <-b.stopc:
  62. return
  63. }
  64. b.batchTx.Lock()
  65. b.commitAndBegin()
  66. b.batchTx.Unlock()
  67. }
  68. }
  69. func (b *backend) Close() error {
  70. close(b.stopc)
  71. <-b.donec
  72. return b.db.Close()
  73. }
  74. // commitAndBegin commits a previous tx and begins a new writable one.
  75. func (b *backend) commitAndBegin() {
  76. var err error
  77. // commit the last batchTx
  78. if b.batchTx.tx != nil {
  79. err = b.batchTx.tx.Commit()
  80. if err != nil {
  81. log.Fatalf("storage: cannot commit tx (%s)", err)
  82. }
  83. }
  84. // begin a new tx
  85. b.batchTx.tx, err = b.db.Begin(true)
  86. if err != nil {
  87. log.Fatalf("storage: cannot begin tx (%s)", err)
  88. }
  89. }