12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- package backend
- import (
- "io"
- "log"
- "time"
- "github.com/coreos/etcd/Godeps/_workspace/src/github.com/boltdb/bolt"
- )
- type Backend interface {
- BatchTx() BatchTx
- Snapshot(w io.Writer) (n int64, err error)
- ForceCommit()
- Close() error
- }
- type backend struct {
- db *bolt.DB
- batchInterval time.Duration
- batchLimit int
- batchTx *batchTx
- stopc chan struct{}
- startc chan struct{}
- donec chan struct{}
- }
- func New(path string, d time.Duration, limit int) Backend {
- db, err := bolt.Open(path, 0600, nil)
- if err != nil {
- log.Panicf("backend: cannot open database at %s (%v)", path, err)
- }
- b := &backend{
- db: db,
- batchInterval: d,
- batchLimit: limit,
- batchTx: &batchTx{},
- stopc: make(chan struct{}),
- startc: make(chan struct{}),
- donec: make(chan struct{}),
- }
- b.batchTx.backend = b
- go b.run()
- <-b.startc
- return b
- }
- // BatchTnx returns the current batch tx in coalescer. The tx can be used for read and
- // write operations. The write result can be retrieved within the same tx immediately.
- // The write result is isolated with other txs until the current one get committed.
- func (b *backend) BatchTx() BatchTx {
- return b.batchTx
- }
- // force commit the current batching tx.
- func (b *backend) ForceCommit() {
- b.batchTx.Commit()
- }
- func (b *backend) Snapshot(w io.Writer) (n int64, err error) {
- b.db.View(func(tx *bolt.Tx) error {
- n, err = tx.WriteTo(w)
- return nil
- })
- return n, err
- }
- func (b *backend) run() {
- defer close(b.donec)
- b.batchTx.Commit()
- b.startc <- struct{}{}
- for {
- select {
- case <-time.After(b.batchInterval):
- case <-b.stopc:
- b.batchTx.Commit()
- return
- }
- b.batchTx.Commit()
- }
- }
- func (b *backend) Close() error {
- close(b.stopc)
- <-b.donec
- return b.db.Close()
- }
|