// Copyright 2015 CoreOS, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package backend import ( "fmt" "hash/crc32" "io" "io/ioutil" "log" "os" "path" "sync" "sync/atomic" "time" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/boltdb/bolt" ) var ( defaultBatchLimit = 10000 defaultBatchInterval = 100 * time.Millisecond defragLimit = 10000 // InitialMmapSize is the initial size of the mmapped region. Setting this larger than // the potential max db size can prevent writer from blocking reader. // This only works for linux. InitialMmapSize = int64(10 * 1024 * 1024 * 1024) ) type Backend interface { BatchTx() BatchTx Snapshot() Snapshot Hash() (uint32, error) // Size returns the current size of the backend. Size() int64 Defrag() error ForceCommit() Close() error } type Snapshot interface { // Size gets the size of the snapshot. Size() int64 // WriteTo writes the snapshot into the given writer. WriteTo(w io.Writer) (n int64, err error) // Close closes the snapshot. Close() error } type backend struct { // size and commits are used with atomic operations so they must be // 64-bit aligned, otherwise 32-bit tests will crash // size is the number of bytes in the backend size int64 // commits counts number of commits since start commits int64 mu sync.RWMutex db *bolt.DB batchInterval time.Duration batchLimit int batchTx *batchTx stopc chan struct{} donec chan struct{} } func New(path string, d time.Duration, limit int) Backend { return newBackend(path, d, limit) } func NewDefaultBackend(path string) Backend { return newBackend(path, defaultBatchInterval, defaultBatchLimit) } func newBackend(path string, d time.Duration, limit int) *backend { db, err := bolt.Open(path, 0600, boltOpenOptions) if err != nil { log.Panicf("backend: cannot open database at %s (%v)", path, err) } b := &backend{ db: db, batchInterval: d, batchLimit: limit, stopc: make(chan struct{}), donec: make(chan struct{}), } b.batchTx = newBatchTx(b) go b.run() return b } // BatchTx returns the current batch tx in coalescer. The tx can be used for read and // write operations. The write result can be retrieved within the same tx immediately. // The write result is isolated with other txs until the current one get committed. func (b *backend) BatchTx() BatchTx { return b.batchTx } // ForceCommit forces the current batching tx to commit. func (b *backend) ForceCommit() { b.batchTx.Commit() } func (b *backend) Snapshot() Snapshot { b.batchTx.Commit() b.mu.RLock() defer b.mu.RUnlock() tx, err := b.db.Begin(false) if err != nil { log.Fatalf("backend: cannot begin tx (%s)", err) } return &snapshot{tx} } func (b *backend) Hash() (uint32, error) { h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) b.mu.RLock() defer b.mu.RUnlock() err := b.db.View(func(tx *bolt.Tx) error { c := tx.Cursor() for next, _ := c.First(); next != nil; next, _ = c.Next() { b := tx.Bucket(next) if b == nil { return fmt.Errorf("cannot get hash of bucket %s", string(next)) } h.Write(next) b.ForEach(func(k, v []byte) error { h.Write(k) h.Write(v) return nil }) } return nil }) if err != nil { return 0, err } return h.Sum32(), nil } func (b *backend) Size() int64 { return atomic.LoadInt64(&b.size) } func (b *backend) run() { defer close(b.donec) for { select { case <-time.After(b.batchInterval): case <-b.stopc: b.batchTx.CommitAndStop() return } b.batchTx.Commit() } } func (b *backend) Close() error { close(b.stopc) <-b.donec return b.db.Close() } // Commits returns total number of commits since start func (b *backend) Commits() int64 { return atomic.LoadInt64(&b.commits) } func (b *backend) Defrag() error { // TODO: make this non-blocking? // lock batchTx to ensure nobody is using previous tx, and then // close previous ongoing tx. b.batchTx.Lock() defer b.batchTx.Unlock() // lock database after lock tx to avoid deadlock. b.mu.Lock() defer b.mu.Unlock() b.batchTx.commit(true) b.batchTx.tx = nil tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions) if err != nil { return err } err = defragdb(b.db, tmpdb, defragLimit) if err != nil { tmpdb.Close() os.RemoveAll(tmpdb.Path()) return err } dbp := b.db.Path() tdbp := tmpdb.Path() err = b.db.Close() if err != nil { log.Fatalf("backend: cannot close database (%s)", err) } err = tmpdb.Close() if err != nil { log.Fatalf("backend: cannot close database (%s)", err) } err = os.Rename(tdbp, dbp) if err != nil { log.Fatalf("backend: cannot rename database (%s)", err) } b.db, err = bolt.Open(dbp, 0600, boltOpenOptions) if err != nil { log.Panicf("backend: cannot open database at %s (%v)", dbp, err) } b.batchTx.tx, err = b.db.Begin(true) if err != nil { log.Fatalf("backend: cannot begin tx (%s)", err) } return nil } func defragdb(odb, tmpdb *bolt.DB, limit int) error { // open a tx on tmpdb for writes tmptx, err := tmpdb.Begin(true) if err != nil { return err } // open a tx on old db for read tx, err := odb.Begin(false) if err != nil { return err } defer tx.Rollback() c := tx.Cursor() count := 0 for next, _ := c.First(); next != nil; next, _ = c.Next() { b := tx.Bucket(next) if b == nil { return fmt.Errorf("backend: cannot defrag bucket %s", string(next)) } tmpb, berr := tmptx.CreateBucketIfNotExists(next) if berr != nil { return berr } b.ForEach(func(k, v []byte) error { count++ if count > limit { err = tmptx.Commit() if err != nil { return err } tmptx, err = tmpdb.Begin(true) if err != nil { return err } tmpb = tmptx.Bucket(next) } err = tmpb.Put(k, v) if err != nil { return err } return nil }) } return tmptx.Commit() } // NewTmpBackend creates a backend implementation for testing. func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) { dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test") if err != nil { log.Fatal(err) } tmpPath := path.Join(dir, "database") return newBackend(tmpPath, batchInterval, batchLimit), tmpPath } func NewDefaultTmpBackend() (*backend, string) { return NewTmpBackend(defaultBatchInterval, defaultBatchLimit) } type snapshot struct { *bolt.Tx } func (s *snapshot) Close() error { return s.Tx.Rollback() }