Browse Source

backend: support shrink db

Xiang Li 9 years ago
parent
commit
558640d91e
4 changed files with 163 additions and 1 deletions
  1. 118 1
      storage/backend/backend.go
  2. 42 0
      storage/backend/backend_test.go
  3. 2 0
      storage/backend/batch_tx.go
  4. 1 0
      storage/kvstore_test.go

+ 118 - 1
storage/backend/backend.go

@@ -22,6 +22,7 @@ import (
 	"log"
 	"os"
 	"path"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -32,6 +33,8 @@ var (
 	defaultBatchLimit    = 10000
 	defaultBatchInterval = 100 * time.Millisecond
 
+	defragLimit = 10000
+
 	// InitialMmapSize is the initial size of the mmapped region. Setting this larger than
 	// the potential max db size can prevent writer from blocking reader.
 	// This only works for linux.
@@ -44,6 +47,7 @@ type Backend interface {
 	Hash() (uint32, error)
 	// Size returns the current size of the backend.
 	Size() int64
+	Defrag() error
 	ForceCommit()
 	Close() error
 }
@@ -58,6 +62,7 @@ type Snapshot interface {
 }
 
 type backend struct {
+	mu sync.RWMutex
 	db *bolt.DB
 
 	batchInterval time.Duration
@@ -114,9 +119,12 @@ func (b *backend) ForceCommit() {
 
 func (b *backend) Snapshot() Snapshot {
 	b.batchTx.Commit()
+
+	b.mu.RLock()
+	defer b.mu.RUnlock()
 	tx, err := b.db.Begin(false)
 	if err != nil {
-		log.Fatalf("storage: cannot begin tx (%s)", err)
+		log.Fatalf("backend: cannot begin tx (%s)", err)
 	}
 	return &snapshot{tx}
 }
@@ -124,6 +132,8 @@ func (b *backend) Snapshot() Snapshot {
 func (b *backend) Hash() (uint32, error) {
 	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
 
+	b.mu.RLock()
+	defer b.mu.RUnlock()
 	err := b.db.View(func(tx *bolt.Tx) error {
 		c := tx.Cursor()
 		for next, _ := c.First(); next != nil; next, _ = c.Next() {
@@ -177,6 +187,113 @@ func (b *backend) Commits() int64 {
 	return atomic.LoadInt64(&b.commits)
 }
 
+// Defrag copies the whole database into a temporary file next to it
+// ("<db path>.tmp"), then replaces the original file with the copy and
+// reopens it. The batch tx lock and the backend lock are held for the whole
+// operation, so the call blocks other users of those locks until it returns.
+//
+// A non-nil error is returned when the copy could not be prepared or written;
+// in that case the original database is left in place and a fresh batch write
+// transaction is started on it. Failures that happen once the original
+// database has started to be swapped out are fatal.
+func (b *backend) Defrag() error {
+	// TODO: make this non-blocking?
+	// lock batchTx to ensure nobody is using previous tx, and then
+	// close previous ongoing tx.
+	b.batchTx.Lock()
+	defer b.batchTx.Unlock()
+
+	// lock database after lock tx to avoid deadlock.
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	b.batchTx.commit(true)
+	b.batchTx.tx = nil
+
+	dbp := b.db.Path()
+	tdbp := dbp + ".tmp"
+
+	// restoreTx re-creates the batch write tx on the untouched original
+	// database. Without it, an early error return would leave batchTx.tx
+	// nil (it was cleared above) for whoever uses the batch tx next.
+	restoreTx := func() {
+		tx, terr := b.db.Begin(true)
+		if terr != nil {
+			log.Fatalf("backend: cannot begin tx (%s)", terr)
+		}
+		b.batchTx.tx = tx
+	}
+
+	// A leftover tmp file from an interrupted run must not be reused:
+	// defragdb only adds keys, so stale content would be merged into the
+	// new database.
+	if err := os.RemoveAll(tdbp); err != nil {
+		restoreTx()
+		return err
+	}
+
+	tmpdb, err := bolt.Open(tdbp, 0600, boltOpenOptions)
+	if err != nil {
+		restoreTx()
+		return err
+	}
+
+	if err = defragdb(b.db, tmpdb, defragLimit); err != nil {
+		tmpdb.Close()
+		os.RemoveAll(tdbp)
+		restoreTx()
+		return err
+	}
+
+	// From here on the original file is being replaced; there is no safe
+	// way back, so every failure is fatal.
+	err = b.db.Close()
+	if err != nil {
+		log.Fatalf("backend: cannot close database (%s)", err)
+	}
+	err = tmpdb.Close()
+	if err != nil {
+		log.Fatalf("backend: cannot close database (%s)", err)
+	}
+	err = os.Rename(tdbp, dbp)
+	if err != nil {
+		log.Fatalf("backend: cannot rename database (%s)", err)
+	}
+
+	b.db, err = bolt.Open(dbp, 0600, boltOpenOptions)
+	if err != nil {
+		log.Panicf("backend: cannot open database at %s (%v)", dbp, err)
+	}
+	b.batchTx.tx, err = b.db.Begin(true)
+	if err != nil {
+		log.Fatalf("backend: cannot begin tx (%s)", err)
+	}
+
+	return nil
+}
+
+func defragdb(odb, tmpdb *bolt.DB, limit int) error {
+	// open a tx on tmpdb for writes
+	tmptx, err := tmpdb.Begin(true)
+	if err != nil {
+		return err
+	}
+
+	// open a tx on old db for read
+	tx, err := odb.Begin(false)
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback()
+
+	c := tx.Cursor()
+
+	count := 0
+	for next, _ := c.First(); next != nil; next, _ = c.Next() {
+		b := tx.Bucket(next)
+		if b == nil {
+			return fmt.Errorf("backend: cannot defrag bucket %s", string(next))
+		}
+
+		tmpb, berr := tmptx.CreateBucketIfNotExists(next)
+		if berr != nil {
+			return berr
+		}
+
+		b.ForEach(func(k, v []byte) error {
+			count++
+			if count > limit {
+				err = tmptx.Commit()
+				if err != nil {
+					return err
+				}
+				tmptx, err = tmpdb.Begin(true)
+				if err != nil {
+					return err
+				}
+				tmpb = tmptx.Bucket(next)
+			}
+			err = tmpb.Put(k, v)
+			if err != nil {
+				return err
+			}
+			return nil
+		})
+	}
+
+	return tmptx.Commit()
+}
+
 // NewTmpBackend creates a backend implementation for testing.
 func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
 	dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")

+ 42 - 0
storage/backend/backend_test.go

@@ -15,6 +15,7 @@
 package backend
 
 import (
+	"fmt"
 	"io/ioutil"
 	"os"
 	"testing"
@@ -115,6 +116,47 @@ func TestBackendBatchIntervalCommit(t *testing.T) {
 	})
 }
 
+// TestBackendDefrag fills a bucket with more than defragLimit keys, runs
+// Defrag, and checks that the backend hash is the same before and after.
+// It then writes one more key to make sure the backend is still usable.
+func TestBackendDefrag(t *testing.T) {
+	be, tmpPath := NewDefaultTmpBackend()
+	defer cleanup(be, tmpPath)
+
+	// write enough keys to go past the defrag batch limit.
+	btx := be.BatchTx()
+	btx.Lock()
+	btx.UnsafeCreateBucket([]byte("test"))
+	total := defragLimit + 100
+	for n := 0; n < total; n++ {
+		key := fmt.Sprintf("foo_%d", n)
+		btx.UnsafePut([]byte("test"), []byte(key), []byte("bar"))
+	}
+	btx.Unlock()
+	be.ForceCommit()
+
+	// hash before defrag.
+	before, err := be.Hash()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err = be.Defrag(); err != nil {
+		t.Fatal(err)
+	}
+
+	// hash after defrag must match.
+	after, err := be.Hash()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if before != after {
+		t.Errorf("hash = %v, want %v", after, before)
+	}
+
+	// the backend must still accept writes after defrag.
+	btx = be.BatchTx()
+	btx.Lock()
+	btx.UnsafeCreateBucket([]byte("test"))
+	btx.UnsafePut([]byte("test"), []byte("more"), []byte("bar"))
+	btx.Unlock()
+	be.ForceCommit()
+}
+
 func cleanup(b Backend, path string) {
 	b.Close()
 	os.Remove(path)

+ 2 - 0
storage/backend/batch_tx.go

@@ -149,6 +149,8 @@ func (t *batchTx) commit(stop bool) {
 		return
 	}
 
+	t.backend.mu.RLock()
+	defer t.backend.mu.RUnlock()
 	// begin a new tx
 	t.tx, err = t.backend.db.Begin(true)
 	if err != nil {

+ 1 - 0
storage/kvstore_test.go

@@ -593,6 +593,7 @@ func (b *fakeBackend) Hash() (uint32, error)      { return 0, nil }
 func (b *fakeBackend) Size() int64                { return 0 }
 func (b *fakeBackend) Snapshot() backend.Snapshot { return nil }
 func (b *fakeBackend) ForceCommit()               {}
+func (b *fakeBackend) Defrag() error              { return nil }
 func (b *fakeBackend) Close() error               { return nil }
 
 type indexGetResp struct {