浏览代码

storage: support Range

Xiang Li 10 年之前
父节点
当前提交
9db360387d
共有 5 个文件被更改,包括 404 次插入155 次删除
  1. 38 0
      storage/index.go
  2. 1 131
      storage/kv.go
  3. 0 24
      storage/kv_test.go
  4. 195 0
      storage/kvstore.go
  5. 170 0
      storage/kvstore_test.go

+ 38 - 0
storage/index.go

@@ -9,11 +9,17 @@ import (
 
 type index interface {
 	Get(key []byte, atIndex uint64) (index uint64, err error)
+	Range(key, end []byte, atIndex uint64) []kipair
 	Put(key []byte, index uint64)
 	Tombstone(key []byte, index uint64) error
 	Compact(index uint64) map[uint64]struct{}
 }
 
+type kipair struct {
+	index uint64
+	key   []byte
+}
+
 type treeIndex struct {
 	sync.RWMutex
 	tree *btree.BTree
@@ -54,6 +60,38 @@ func (ti *treeIndex) Get(key []byte, atIndex uint64) (index uint64, err error) {
 	return keyi.get(atIndex)
 }
 
+func (ti *treeIndex) Range(key, end []byte, atIndex uint64) []kipair {
+	if end == nil {
+		index, err := ti.Get(key, atIndex)
+		if err != nil {
+			return nil
+		}
+		return []kipair{{key: key, index: index}}
+	}
+
+	keyi := &keyIndex{key: key}
+	endi := &keyIndex{key: end}
+	pairs := make([]kipair, 0)
+
+	ti.RLock()
+	defer ti.RUnlock()
+
+	ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool {
+		if !item.Less(endi) {
+			return false
+		}
+		curKeyi := item.(*keyIndex)
+		index, err := curKeyi.get(atIndex)
+		if err != nil {
+			return true
+		}
+		pairs = append(pairs, kipair{index, curKeyi.key})
+		return true
+	})
+
+	return pairs
+}
+
 func (ti *treeIndex) Tombstone(key []byte, index uint64) error {
 	keyi := &keyIndex{key: key}
 

+ 1 - 131
storage/kv.go

@@ -1,20 +1,6 @@
 package storage
 
-import (
-	"encoding/binary"
-	"log"
-	"sync"
-	"time"
-
-	"github.com/coreos/etcd/storage/backend"
-	"github.com/coreos/etcd/storage/storagepb"
-)
-
-var (
-	batchLimit    = 10000
-	batchInterval = 100 * time.Millisecond
-	keyBucketName = []byte("key")
-)
+import "github.com/coreos/etcd/storage/storagepb"
 
 type KV interface {
 	// Range gets the keys in the range at rangeIndex.
@@ -45,119 +31,3 @@ type KV interface {
 	TnxPut(key, value []byte) (index int64)
 	TnxDeleteRange(key, end []byte) (n, index int64)
 }
-
-type store struct {
-	// read operation MUST hold read lock
-	// write opeartion MUST hold write lock
-	// tnx operation MUST hold write lock
-	sync.RWMutex
-
-	b       backend.Backend
-	kvindex index
-
-	currentIndex uint64
-	marshalBuf   []byte // buffer for marshal protobuf
-}
-
-func newStore(path string) *store {
-	s := &store{
-		b:            backend.New(path, batchInterval, batchLimit),
-		kvindex:      newTreeIndex(),
-		currentIndex: 0,
-		marshalBuf:   make([]byte, 1024*1024),
-	}
-
-	tx := s.b.BatchTx()
-	tx.Lock()
-	tx.UnsafeCreateBucket(keyBucketName)
-	tx.Unlock()
-	s.b.ForceCommit()
-
-	return s
-}
-
-func (s *store) Put(key, value []byte) {
-	s.Lock()
-	defer s.Unlock()
-
-	currentIndex := s.currentIndex + 1
-
-	ibytes := make([]byte, 8)
-	binary.BigEndian.PutUint64(ibytes, currentIndex)
-
-	tx := s.b.BatchTx()
-	tx.Lock()
-	defer tx.Unlock()
-	s.currentIndex = currentIndex
-
-	event := storagepb.Event{
-		Type: storagepb.PUT,
-		Kv: storagepb.KeyValue{
-			Key:   key,
-			Value: value,
-		},
-	}
-
-	var (
-		d   []byte
-		err error
-		n   int
-	)
-
-	if event.Size() < len(s.marshalBuf) {
-		n, err = event.MarshalTo(s.marshalBuf)
-		d = s.marshalBuf[:n]
-	} else {
-		d, err = event.Marshal()
-	}
-	if err != nil {
-		log.Fatalf("storage: cannot marshal event: %v", err)
-	}
-
-	tx.UnsafePut(keyBucketName, ibytes, d)
-
-	s.kvindex.Put(key, currentIndex)
-}
-
-func (s *store) Get(key []byte) []byte {
-	s.RLock()
-	defer s.RUnlock()
-
-	index, err := s.kvindex.Get(key, s.currentIndex)
-	if err != nil {
-		return nil
-	}
-
-	ibytes := make([]byte, 8)
-	binary.BigEndian.PutUint64(ibytes, index)
-	tx := s.b.BatchTx()
-	tx.Lock()
-	defer tx.Unlock()
-	vs := tx.UnsafeRange(keyBucketName, ibytes, nil, 0)
-	// TODO: the value will be an event type.
-	// TODO: copy out the bytes, decode it, return the value.
-	return vs[0]
-}
-
-func (s *store) Delete(key []byte) error {
-	s.Lock()
-	defer s.Unlock()
-
-	_, err := s.kvindex.Get(key, s.currentIndex)
-	if err != nil {
-		return nil
-	}
-
-	currentIndex := s.currentIndex + 1
-
-	ibytes := make([]byte, 8)
-	binary.BigEndian.PutUint64(ibytes, currentIndex)
-	tx := s.b.BatchTx()
-	tx.Lock()
-	defer tx.Unlock()
-	// TODO: the value will be an event type.
-	// A tombstone is simple a "Delete" type event.
-	tx.UnsafePut(keyBucketName, key, []byte("tombstone"))
-
-	return s.kvindex.Tombstone(key, currentIndex)
-}

+ 0 - 24
storage/kv_test.go

@@ -1,24 +0,0 @@
-package storage
-
-import (
-	"crypto/rand"
-	"os"
-	"testing"
-)
-
-func BenchmarkStorePut(b *testing.B) {
-	s := newStore("test")
-	defer os.Remove("test")
-
-	// prepare keys
-	keys := make([][]byte, b.N)
-	for i := 0; i < b.N; i++ {
-		keys[i] = make([]byte, 64)
-		rand.Read(keys[i])
-	}
-
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		s.Put(keys[i], []byte("foo"))
-	}
-}

+ 195 - 0
storage/kvstore.go

@@ -0,0 +1,195 @@
+package storage
+
+import (
+	"bytes"
+	"encoding/binary"
+	"log"
+	"sync"
+	"time"
+
+	"github.com/coreos/etcd/storage/backend"
+	"github.com/coreos/etcd/storage/storagepb"
+)
+
+var (
+	batchLimit    = 10000
+	batchInterval = 100 * time.Millisecond
+	keyBucketName = []byte("key")
+)
+
+type store struct {
+	// read operation MUST hold read lock
+	// write opeartion MUST hold write lock
+	// tnx operation MUST hold write lock
+	sync.RWMutex
+
+	b       backend.Backend
+	kvindex index
+
+	currentIndex uint64
+}
+
+func newStore(path string) *store {
+	s := &store{
+		b:            backend.New(path, batchInterval, batchLimit),
+		kvindex:      newTreeIndex(),
+		currentIndex: 0,
+	}
+
+	tx := s.b.BatchTx()
+	tx.Lock()
+	tx.UnsafeCreateBucket(keyBucketName)
+	tx.Unlock()
+	s.b.ForceCommit()
+
+	return s
+}
+
+func (s *store) Put(key, value []byte) int64 {
+	s.Lock()
+	defer s.Unlock()
+
+	s.put(key, value, s.currentIndex+1, 0)
+	s.currentIndex = s.currentIndex + 1
+	return int64(s.currentIndex)
+}
+
+func (s *store) Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) {
+	s.RLock()
+	defer s.RUnlock()
+
+	if rangeIndex <= 0 {
+		index = int64(s.currentIndex)
+	} else {
+		index = rangeIndex
+	}
+
+	pairs := s.kvindex.Range(key, end, uint64(index))
+	if len(pairs) == 0 {
+		return nil, index
+	}
+	if limit > 0 && len(pairs) > int(limit) {
+		pairs = pairs[:limit]
+	}
+
+	tx := s.b.BatchTx()
+	tx.Lock()
+	defer tx.Unlock()
+
+	for _, pair := range pairs {
+		ibytes := make([]byte, 8)
+		endbytes := make([]byte, 8)
+		binary.BigEndian.PutUint64(ibytes, pair.index)
+		binary.BigEndian.PutUint64(endbytes, pair.index+1)
+
+		found := false
+
+		vs := tx.UnsafeRange(keyBucketName, ibytes, endbytes, 0)
+		for _, v := range vs {
+			var e storagepb.Event
+			err := e.Unmarshal(v)
+			if err != nil {
+				log.Fatalf("storage: range cannot unmarshal event: %v", err)
+			}
+			if bytes.Equal(e.Kv.Key, pair.key) {
+				if e.Type == storagepb.PUT {
+					kvs = append(kvs, e.Kv)
+				}
+				found = true
+				break
+			}
+		}
+
+		if !found {
+			log.Fatalf("storage: range cannot find key %s at index %d", string(pair.key), pair.index)
+		}
+	}
+	return kvs, index
+}
+
+func (s *store) DeleteRange(key, end []byte) (n, index int64) {
+	s.Lock()
+	defer s.Unlock()
+
+	index = int64(s.currentIndex) + 1
+
+	pairs := s.kvindex.Range(key, end, s.currentIndex)
+	if len(pairs) == 0 {
+		return 0, int64(s.currentIndex)
+	}
+
+	for i, pair := range pairs {
+		ok := s.delete(pair.key, uint64(index), uint32(i))
+		if ok {
+			n++
+		}
+	}
+	if n != 0 {
+		s.currentIndex = s.currentIndex + 1
+	}
+	return n, int64(s.currentIndex)
+}
+
+func (s *store) put(key, value []byte, index uint64, subindex uint32) {
+	ibytes := make([]byte, 8+1+4)
+	indexToBytes(index, subindex, ibytes)
+
+	event := storagepb.Event{
+		Type: storagepb.PUT,
+		Kv: storagepb.KeyValue{
+			Key:   key,
+			Value: value,
+		},
+	}
+
+	d, err := event.Marshal()
+	if err != nil {
+		log.Fatalf("storage: cannot marshal event: %v", err)
+	}
+
+	tx := s.b.BatchTx()
+	tx.Lock()
+	defer tx.Unlock()
+	tx.UnsafePut(keyBucketName, ibytes, d)
+	s.kvindex.Put(key, index)
+}
+
+func (s *store) delete(key []byte, index uint64, subindex uint32) bool {
+	_, err := s.kvindex.Get(key, index)
+	if err != nil {
+		// key not exist
+		return false
+	}
+
+	ibytes := make([]byte, 8+1+4)
+	indexToBytes(index, subindex, ibytes)
+
+	event := storagepb.Event{
+		Type: storagepb.DELETE,
+		Kv: storagepb.KeyValue{
+			Key: key,
+		},
+	}
+
+	d, err := event.Marshal()
+	if err != nil {
+		log.Fatalf("storage: cannot marshal event: %v", err)
+	}
+
+	tx := s.b.BatchTx()
+	tx.Lock()
+	defer tx.Unlock()
+	tx.UnsafePut(keyBucketName, ibytes, d)
+	err = s.kvindex.Tombstone(key, index)
+	if err != nil {
+		log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err)
+	}
+
+	return true
+}
+
+func indexToBytes(index uint64, subindex uint32, bytes []byte) {
+	binary.BigEndian.PutUint64(bytes, index)
+	bytes[8] = '_'
+	binary.BigEndian.PutUint32(bytes[9:], subindex)
+}

+ 170 - 0
storage/kvstore_test.go

@@ -0,0 +1,170 @@
+package storage
+
+import (
+	"crypto/rand"
+	"os"
+	"testing"
+)
+
+func TestRange(t *testing.T) {
+	s := newStore("test")
+	defer os.Remove("test")
+
+	s.Put([]byte("foo"), []byte("bar"))
+	s.Put([]byte("foo1"), []byte("bar1"))
+	s.Put([]byte("foo2"), []byte("bar2"))
+
+	tests := []struct {
+		key, end []byte
+		index    int64
+
+		windex int64
+		// TODO: change this to the actual kv
+		wN int64
+	}{
+		{
+			[]byte("foo"), []byte("foo3"), 0,
+			3, 3,
+		},
+		{
+			[]byte("foo"), []byte("foo1"), 0,
+			3, 1,
+		},
+		{
+			[]byte("foo"), []byte("foo3"), 1,
+			1, 1,
+		},
+		{
+			[]byte("foo"), []byte("foo3"), 2,
+			2, 2,
+		},
+	}
+
+	for i, tt := range tests {
+		kvs, index := s.Range(tt.key, tt.end, 0, tt.index)
+		if len(kvs) != int(tt.wN) {
+			t.Errorf("#%d: len(kvs) = %d, want %d", i, len(kvs), tt.wN)
+		}
+		if index != tt.windex {
+			t.Errorf("#%d: index = %d, wang %d", i, tt.index, tt.windex)
+		}
+	}
+}
+
+func TestSimpleDeleteRange(t *testing.T) {
+	tests := []struct {
+		key, end []byte
+
+		windex int64
+		wN     int64
+	}{
+		{
+			[]byte("foo"), []byte("foo1"),
+			4, 1,
+		},
+		{
+			[]byte("foo"), []byte("foo2"),
+			4, 2,
+		},
+		{
+			[]byte("foo"), []byte("foo3"),
+			4, 3,
+		},
+		{
+			[]byte("foo3"), []byte("foo8"),
+			3, 0,
+		},
+	}
+
+	for i, tt := range tests {
+		s := newStore("test")
+
+		s.Put([]byte("foo"), []byte("bar"))
+		s.Put([]byte("foo1"), []byte("bar1"))
+		s.Put([]byte("foo2"), []byte("bar2"))
+
+		n, index := s.DeleteRange(tt.key, tt.end)
+		if n != tt.wN {
+			t.Errorf("#%d: n = %d, want %d", i, n, tt.wN)
+		}
+		if index != tt.windex {
+			t.Errorf("#%d: index = %d, wang %d", i, index, tt.windex)
+		}
+
+		os.Remove("test")
+	}
+}
+
+func TestRangeInSequence(t *testing.T) {
+	s := newStore("test")
+	defer os.Remove("test")
+
+	s.Put([]byte("foo"), []byte("bar"))
+	s.Put([]byte("foo1"), []byte("bar1"))
+	s.Put([]byte("foo2"), []byte("bar2"))
+
+	// remove foo
+	n, index := s.DeleteRange([]byte("foo"), nil)
+	if n != 1 || index != 4 {
+		t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 4)
+	}
+
+	// before removal foo
+	kvs, index := s.Range([]byte("foo"), []byte("foo3"), 0, 3)
+	if len(kvs) != 3 {
+		t.Fatalf("len(kvs) = %d, want %d", len(kvs), 3)
+	}
+
+	// after removal foo
+	kvs, index = s.Range([]byte("foo"), []byte("foo3"), 0, 4)
+	if len(kvs) != 2 {
+		t.Fatalf("len(kvs) = %d, want %d", len(kvs), 2)
+	}
+
+	// remove again -> expect nothing
+	n, index = s.DeleteRange([]byte("foo"), nil)
+	if n != 0 || index != 4 {
+		t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 0, 4)
+	}
+
+	// remove foo1
+	n, index = s.DeleteRange([]byte("foo"), []byte("foo2"))
+	if n != 1 || index != 5 {
+		t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 5)
+	}
+
+	// after removal foo1
+	kvs, index = s.Range([]byte("foo"), []byte("foo3"), 0, 5)
+	if len(kvs) != 1 {
+		t.Fatalf("len(kvs) = %d, want %d", len(kvs), 1)
+	}
+
+	// remove foo2
+	n, index = s.DeleteRange([]byte("foo2"), []byte("foo3"))
+	if n != 1 || index != 6 {
+		t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 6)
+	}
+
+	// after removal foo2
+	kvs, index = s.Range([]byte("foo"), []byte("foo3"), 0, 6)
+	if len(kvs) != 0 {
+		t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0)
+	}
+}
+
+func BenchmarkStorePut(b *testing.B) {
+	s := newStore("test")
+	defer os.Remove("test")
+
+	// prepare keys
+	keys := make([][]byte, b.N)
+	for i := 0; i < b.N; i++ {
+		keys[i] = make([]byte, 64)
+		rand.Read(keys[i])
+	}
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		s.Put(keys[i], []byte("foo"))
+	}
+}