Browse Source

storage: add a simple backend and kv example

Xiang Li 10 years ago
parent
commit
4b0d9f69c7

+ 107 - 0
storage/backend/backend.go

@@ -0,0 +1,107 @@
+package backend
+
+import (
+	"log"
+	"time"
+
+	"github.com/boltdb/bolt"
+)
+
+type Backend interface {
+	BatchTx() BatchTx
+	ForceCommit()
+	Close() error
+}
+
+type backend struct {
+	db *bolt.DB
+
+	batchInterval time.Duration
+	batchLimit    int
+	batchTx       *batchTx
+
+	stopc  chan struct{}
+	startc chan struct{}
+	donec  chan struct{}
+}
+
+func New(path string, d time.Duration, limit int) Backend {
+	db, err := bolt.Open(path, 0600, nil)
+	if err != nil {
+		log.Panicf("backend: cannot open database at %s (%v)", path, err)
+	}
+
+	b := &backend{
+		db: db,
+
+		batchInterval: d,
+		batchLimit:    limit,
+		batchTx:       &batchTx{},
+
+		stopc:  make(chan struct{}),
+		startc: make(chan struct{}),
+		donec:  make(chan struct{}),
+	}
+	b.batchTx.backend = b
+	go b.run()
+	<-b.startc
+	return b
+}
+
+// BatchTnx returns the current batch tx in coalescer. The tx can be used for read and
+// write operations. The write result can be retrieved within the same tx immediately.
+// The write result is isolated with other txs until the current one get committed.
+func (b *backend) BatchTx() BatchTx {
+	return b.batchTx
+}
+
+// force commit the current batching tx.
+func (b *backend) ForceCommit() {
+	b.batchTx.Lock()
+	b.commitAndBegin()
+	b.batchTx.Unlock()
+}
+
+func (b *backend) run() {
+	defer close(b.donec)
+
+	b.batchTx.Lock()
+	b.commitAndBegin()
+	b.batchTx.Unlock()
+	b.startc <- struct{}{}
+
+	for {
+		select {
+		case <-time.After(b.batchInterval):
+		case <-b.stopc:
+			return
+		}
+		b.batchTx.Lock()
+		b.commitAndBegin()
+		b.batchTx.Unlock()
+	}
+}
+
+func (b *backend) Close() error {
+	close(b.stopc)
+	<-b.donec
+	return b.db.Close()
+}
+
+// commitAndBegin commits a previous tx and begins a new writable one.
+func (b *backend) commitAndBegin() {
+	var err error
+	// commit the last batchTx
+	if b.batchTx.tx != nil {
+		err = b.batchTx.tx.Commit()
+		if err != nil {
+			log.Fatalf("storage: cannot commit tx (%s)", err)
+		}
+	}
+
+	// begin a new tx
+	b.batchTx.tx, err = b.db.Begin(true)
+	if err != nil {
+		log.Fatalf("storage: cannot begin tx (%s)", err)
+	}
+}

+ 36 - 0
storage/backend/backend_bench_test.go

@@ -0,0 +1,36 @@
+package backend
+
+import (
+	"crypto/rand"
+	"os"
+	"testing"
+	"time"
+)
+
+func BenchmarkBackendPut(b *testing.B) {
+	backend := New("test", 100*time.Millisecond, 10000)
+	defer backend.Close()
+	defer os.Remove("test")
+
+	// prepare keys
+	keys := make([][]byte, b.N)
+	for i := 0; i < b.N; i++ {
+		keys[i] = make([]byte, 64)
+		rand.Read(keys[i])
+	}
+	value := make([]byte, 128)
+	rand.Read(value)
+
+	batchTx := backend.BatchTx()
+
+	batchTx.Lock()
+	batchTx.UnsafeCreateBucket([]byte("test"))
+	batchTx.Unlock()
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		batchTx.Lock()
+		batchTx.UnsafePut([]byte("test"), keys[i], value)
+		batchTx.Unlock()
+	}
+}

+ 61 - 0
storage/backend/backend_test.go

@@ -0,0 +1,61 @@
+package backend
+
+import (
+	"os"
+	"reflect"
+	"testing"
+	"time"
+)
+
+func TestBackendPut(t *testing.T) {
+	backend := New("test", 10*time.Second, 10000)
+	defer backend.Close()
+	defer os.Remove("test")
+
+	v := []byte("foo")
+
+	batchTx := backend.BatchTx()
+	batchTx.Lock()
+
+	batchTx.UnsafeCreateBucket([]byte("test"))
+
+	batchTx.UnsafePut([]byte("test"), []byte("foo"), v)
+	gv := batchTx.UnsafeRange([]byte("test"), v, nil, -1)
+	if !reflect.DeepEqual(gv[0], v) {
+		t.Errorf("v = %s, want %s", string(gv[0]), string(v))
+	}
+
+	batchTx.Unlock()
+}
+
+func TestBackendForceCommit(t *testing.T) {
+	backend := New("test", 10*time.Second, 10000)
+	defer backend.Close()
+	defer os.Remove("test")
+
+	v := []byte("foo")
+	batchTx := backend.BatchTx()
+
+	batchTx.Lock()
+
+	batchTx.UnsafeCreateBucket([]byte("test"))
+	batchTx.UnsafePut([]byte("test"), []byte("foo"), v)
+
+	batchTx.Unlock()
+
+	// expect to see nothing that the batch tx created
+	tx := backend.ReadTnx()
+	gbucket := tx.Bucket([]byte("test"))
+	if gbucket != nil {
+		t.Errorf("readtx.bu = %p, want nil", gbucket)
+	}
+	tx.Commit()
+
+	// commit batch tx
+	backend.ForceCommit()
+	tx = backend.ReadTnx()
+	gbucket = tx.Bucket([]byte("test"))
+	if gbucket == nil {
+		t.Errorf("readtx.bu = nil, want not nil")
+	}
+}

+ 98 - 0
storage/backend/batch_tx.go

@@ -0,0 +1,98 @@
+package backend
+
+import (
+	"bytes"
+	"log"
+	"sync"
+
+	"github.com/boltdb/bolt"
+)
+
+type BatchTx interface {
+	Lock()
+	Unlock()
+	UnsafeCreateBucket(name []byte)
+	UnsafePut(bucketName []byte, key []byte, value []byte)
+	UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) [][]byte
+	UnsafeDelete(bucketName []byte, key []byte)
+}
+
+type batchTx struct {
+	mu      sync.Mutex
+	tx      *bolt.Tx
+	backend *backend
+	pending int
+}
+
+func (t *batchTx) Lock() {
+	t.mu.Lock()
+}
+
+func (t *batchTx) Unlock() {
+	t.mu.Unlock()
+}
+
+func (t *batchTx) UnsafeCreateBucket(name []byte) {
+	_, err := t.tx.CreateBucket(name)
+	if err != nil && err != bolt.ErrBucketExists {
+		log.Fatalf("storage: cannot create bucket %s (%v)", string(name), err)
+	}
+}
+
+// before calling unsafePut, the caller MUST hold the lock on tnx.
+func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
+	bucket := t.tx.Bucket(bucketName)
+	if bucket == nil {
+		log.Fatalf("storage: bucket %s does not exist", string(bucketName))
+	}
+	if err := bucket.Put(key, value); err != nil {
+		log.Fatalf("storage: cannot put key into bucket (%v)", err)
+	}
+	t.pending++
+	if t.pending > t.backend.batchLimit {
+		t.backend.commitAndBegin()
+		t.pending = 0
+	}
+}
+
+// before calling unsafeRange, the caller MUST hold the lock on tnx.
+func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) [][]byte {
+	bucket := t.tx.Bucket(bucketName)
+	if bucket == nil {
+		log.Fatalf("storage: bucket %s does not exist", string(bucketName))
+	}
+
+	var vs [][]byte
+
+	if len(endKey) == 0 {
+		if v := bucket.Get(key); v == nil {
+			return vs
+		} else {
+			return append(vs, v)
+		}
+	}
+
+	c := bucket.Cursor()
+	for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() {
+		vs = append(vs, cv)
+	}
+
+	return vs
+}
+
+// before calling unsafeDelete, the caller MUST hold the lock on tnx.
+func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
+	bucket := t.tx.Bucket(bucketName)
+	if bucket == nil {
+		log.Fatalf("storage: bucket %s does not exist", string(bucketName))
+	}
+	err := bucket.Delete(key)
+	if err != nil {
+		log.Fatalf("storage: cannot delete key from bucket (%v)", err)
+	}
+	t.pending++
+	if t.pending > t.backend.batchLimit {
+		t.backend.commitAndBegin()
+		t.pending = 0
+	}
+}

+ 66 - 0
storage/kv.go

@@ -0,0 +1,66 @@
+package storage
+
+import (
+	"encoding/binary"
+	"time"
+
+	"github.com/coreos/etcd/storage/backend"
+)
+
+var (
+	batchLimit    = 10000
+	batchInterval = 100 * time.Millisecond
+	keyBucketName = []byte("key")
+)
+
+type store struct {
+	b       backend.Backend
+	kvindex index
+
+	now uint64 // current index of the store
+}
+
+func newStore(path string) *store {
+	s := &store{
+		b:       backend.New(path, batchInterval, batchLimit),
+		kvindex: newTreeIndex(),
+		now:     0,
+	}
+
+	tx := s.b.BatchTx()
+	tx.Lock()
+	tx.UnsafeCreateBucket(keyBucketName)
+	tx.Unlock()
+	s.b.ForceCommit()
+
+	return s
+}
+
+func (s *store) Put(key, value []byte) {
+	now := s.now + 1
+
+	s.kvindex.Put(key, now)
+	ibytes := make([]byte, 8)
+	binary.BigEndian.PutUint64(ibytes, now)
+
+	tx := s.b.BatchTx()
+	tx.Lock()
+	defer tx.Unlock()
+	s.now = now
+	tx.UnsafePut(keyBucketName, ibytes, value)
+}
+
+func (s *store) Get(key []byte) []byte {
+	index, err := s.kvindex.Get(key, s.now)
+	if err != nil {
+		return nil
+	}
+
+	ibytes := make([]byte, 8)
+	binary.BigEndian.PutUint64(ibytes, index)
+	tx := s.b.BatchTx()
+	tx.Lock()
+	defer tx.Unlock()
+	vs := tx.UnsafeRange(keyBucketName, ibytes, nil, 0)
+	return vs[0]
+}

+ 24 - 0
storage/kv_test.go

@@ -0,0 +1,24 @@
+package storage
+
+import (
+	"crypto/rand"
+	"os"
+	"testing"
+)
+
+func BenchmarkStorePut(b *testing.B) {
+	s := newStore("test")
+	defer os.Remove("test")
+
+	// prepare keys
+	keys := make([][]byte, b.N)
+	for i := 0; i < b.N; i++ {
+		keys[i] = make([]byte, 64)
+		rand.Read(keys[i])
+	}
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		s.Put(keys[i], []byte("foo"))
+	}
+}