backend.go 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package backend
  2. import (
  3. "io"
  4. "log"
  5. "time"
  6. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/boltdb/bolt"
  7. )
  8. type Backend interface {
  9. BatchTx() BatchTx
  10. Snapshot(w io.Writer) (n int64, err error)
  11. ForceCommit()
  12. Close() error
  13. }
  14. type backend struct {
  15. db *bolt.DB
  16. batchInterval time.Duration
  17. batchLimit int
  18. batchTx *batchTx
  19. stopc chan struct{}
  20. startc chan struct{}
  21. donec chan struct{}
  22. }
  23. func New(path string, d time.Duration, limit int) Backend {
  24. db, err := bolt.Open(path, 0600, nil)
  25. if err != nil {
  26. log.Panicf("backend: cannot open database at %s (%v)", path, err)
  27. }
  28. b := &backend{
  29. db: db,
  30. batchInterval: d,
  31. batchLimit: limit,
  32. batchTx: &batchTx{},
  33. stopc: make(chan struct{}),
  34. startc: make(chan struct{}),
  35. donec: make(chan struct{}),
  36. }
  37. b.batchTx.backend = b
  38. go b.run()
  39. <-b.startc
  40. return b
  41. }
  42. // BatchTnx returns the current batch tx in coalescer. The tx can be used for read and
  43. // write operations. The write result can be retrieved within the same tx immediately.
  44. // The write result is isolated with other txs until the current one get committed.
  45. func (b *backend) BatchTx() BatchTx {
  46. return b.batchTx
  47. }
  48. // force commit the current batching tx.
  49. func (b *backend) ForceCommit() {
  50. b.batchTx.Commit()
  51. }
  52. func (b *backend) Snapshot(w io.Writer) (n int64, err error) {
  53. b.db.View(func(tx *bolt.Tx) error {
  54. n, err = tx.WriteTo(w)
  55. return nil
  56. })
  57. return n, err
  58. }
  59. func (b *backend) run() {
  60. defer close(b.donec)
  61. b.batchTx.Commit()
  62. b.startc <- struct{}{}
  63. for {
  64. select {
  65. case <-time.After(b.batchInterval):
  66. case <-b.stopc:
  67. b.batchTx.Commit()
  68. return
  69. }
  70. b.batchTx.Commit()
  71. }
  72. }
  73. func (b *backend) Close() error {
  74. close(b.stopc)
  75. <-b.donec
  76. return b.db.Close()
  77. }