backend.go 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package backend
  2. import (
  3. "io"
  4. "log"
  5. "time"
  6. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/boltdb/bolt"
  7. )
  8. type Backend interface {
  9. BatchTx() BatchTx
  10. Snapshot(w io.Writer) (n int64, err error)
  11. ForceCommit()
  12. Close() error
  13. }
  14. type backend struct {
  15. db *bolt.DB
  16. batchInterval time.Duration
  17. batchLimit int
  18. batchTx *batchTx
  19. stopc chan struct{}
  20. donec chan struct{}
  21. }
  22. func New(path string, d time.Duration, limit int) Backend {
  23. return newBackend(path, d, limit)
  24. }
  25. func newBackend(path string, d time.Duration, limit int) *backend {
  26. db, err := bolt.Open(path, 0600, nil)
  27. if err != nil {
  28. log.Panicf("backend: cannot open database at %s (%v)", path, err)
  29. }
  30. b := &backend{
  31. db: db,
  32. batchInterval: d,
  33. batchLimit: limit,
  34. stopc: make(chan struct{}),
  35. donec: make(chan struct{}),
  36. }
  37. b.batchTx = newBatchTx(b)
  38. go b.run()
  39. return b
  40. }
  41. // BatchTx returns the current batch tx in coalescer. The tx can be used for read and
  42. // write operations. The write result can be retrieved within the same tx immediately.
  43. // The write result is isolated with other txs until the current one get committed.
  44. func (b *backend) BatchTx() BatchTx {
  45. return b.batchTx
  46. }
  47. // force commit the current batching tx.
  48. func (b *backend) ForceCommit() {
  49. b.batchTx.Commit()
  50. }
  51. func (b *backend) Snapshot(w io.Writer) (n int64, err error) {
  52. b.db.View(func(tx *bolt.Tx) error {
  53. n, err = tx.WriteTo(w)
  54. return nil
  55. })
  56. return n, err
  57. }
  58. func (b *backend) run() {
  59. defer close(b.donec)
  60. for {
  61. select {
  62. case <-time.After(b.batchInterval):
  63. case <-b.stopc:
  64. b.batchTx.CommitAndStop()
  65. return
  66. }
  67. b.batchTx.Commit()
  68. }
  69. }
  70. func (b *backend) Close() error {
  71. close(b.stopc)
  72. <-b.donec
  73. return b.db.Close()
  74. }