backend.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package backend
  2. import (
  3. "log"
  4. "time"
  5. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/boltdb/bolt"
  6. )
  7. type Backend interface {
  8. BatchTx() BatchTx
  9. ForceCommit()
  10. Close() error
  11. }
  12. type backend struct {
  13. db *bolt.DB
  14. batchInterval time.Duration
  15. batchLimit int
  16. batchTx *batchTx
  17. stopc chan struct{}
  18. startc chan struct{}
  19. donec chan struct{}
  20. }
  21. func New(path string, d time.Duration, limit int) Backend {
  22. db, err := bolt.Open(path, 0600, nil)
  23. if err != nil {
  24. log.Panicf("backend: cannot open database at %s (%v)", path, err)
  25. }
  26. b := &backend{
  27. db: db,
  28. batchInterval: d,
  29. batchLimit: limit,
  30. batchTx: &batchTx{},
  31. stopc: make(chan struct{}),
  32. startc: make(chan struct{}),
  33. donec: make(chan struct{}),
  34. }
  35. b.batchTx.backend = b
  36. go b.run()
  37. <-b.startc
  38. return b
  39. }
  40. // BatchTnx returns the current batch tx in coalescer. The tx can be used for read and
  41. // write operations. The write result can be retrieved within the same tx immediately.
  42. // The write result is isolated with other txs until the current one get committed.
  43. func (b *backend) BatchTx() BatchTx {
  44. return b.batchTx
  45. }
  46. // force commit the current batching tx.
  47. func (b *backend) ForceCommit() {
  48. b.batchTx.Commit()
  49. }
  50. func (b *backend) run() {
  51. defer close(b.donec)
  52. b.batchTx.Commit()
  53. b.startc <- struct{}{}
  54. for {
  55. select {
  56. case <-time.After(b.batchInterval):
  57. case <-b.stopc:
  58. return
  59. }
  60. b.batchTx.Commit()
  61. }
  62. }
  63. func (b *backend) Close() error {
  64. close(b.stopc)
  65. <-b.donec
  66. return b.db.Close()
  67. }