瀏覽代碼

storage: support tnx

Xiang Li 10 年之前
父節點
當前提交
93ecf36855
共有 3 個文件被更改,包括 149 次插入37 次删除
  1. 1 0
      storage/kv.go
  2. 85 37
      storage/kvstore.go
  3. 63 0
      storage/kvstore_test.go

+ 1 - 0
storage/kv.go

@@ -26,6 +26,7 @@ type KV interface {
 	// until tnx ends. Only one on-going tnx is allowed.
 	TnxBegin()
 	// TnxEnd ends the on-going tnx.
+	// TODO: generate and verify tnx id for safty.
 	TnxEnd()
 	TnxRange(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64)
 	TnxPut(key, value []byte) (index int64)

+ 85 - 37
storage/kvstore.go

@@ -18,15 +18,14 @@ var (
 )
 
 type store struct {
-	// read operation MUST hold read lock
-	// write opeartion MUST hold write lock
-	// tnx operation MUST hold write lock
-	sync.RWMutex
+	mu sync.RWMutex
 
 	b       backend.Backend
 	kvindex index
 
 	currentIndex uint64
+
+	subIndex uint32 // tracks next subIndex to put into backend
 }
 
 func newStore(path string) *store {
@@ -46,20 +45,48 @@ func newStore(path string) *store {
 }
 
 func (s *store) Put(key, value []byte) int64 {
-	s.Lock()
-	defer s.Unlock()
+	s.TnxBegin()
+	s.put(key, value, s.currentIndex+1)
+	s.TnxEnd()
 
-	s.put(key, value, s.currentIndex+1, 0)
-	s.currentIndex = s.currentIndex + 1
 	return int64(s.currentIndex)
 }
 
 func (s *store) Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) {
-	s.RLock()
-	defer s.RUnlock()
+	s.TnxBegin()
+	kvs, index = s.TnxRange(key, end, limit, rangeIndex)
+	s.TnxEnd()
+
+	return kvs, index
+}
+
+func (s *store) DeleteRange(key, end []byte) (n, index int64) {
+	s.TnxBegin()
+	n = s.deleteRange(key, end, s.currentIndex+1)
+	s.TnxEnd()
+
+	return n, int64(s.currentIndex)
+}
+
+func (s *store) TnxBegin() {
+	s.mu.Lock()
+	s.subIndex = 0
+}
 
+func (s *store) TnxEnd() {
+	if s.subIndex != 0 {
+		s.currentIndex += 1
+	}
+	s.subIndex = 0
+	s.mu.Unlock()
+}
+
+func (s *store) TnxRange(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) {
 	if rangeIndex <= 0 {
 		index = int64(s.currentIndex)
+		if s.subIndex > 0 {
+			index += 1
+		}
 	} else {
 		index = rangeIndex
 	}
@@ -83,6 +110,7 @@ func (s *store) Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb
 		binary.BigEndian.PutUint64(endbytes, pair.index+1)
 
 		found := false
+		var kv *storagepb.KeyValue
 
 		vs := tx.UnsafeRange(keyBucketName, ibytes, endbytes, 0)
 		for _, v := range vs {
@@ -93,46 +121,40 @@ func (s *store) Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb
 			}
 			if bytes.Equal(e.Kv.Key, pair.key) {
 				if e.Type == storagepb.PUT {
-					kvs = append(kvs, e.Kv)
+					kv = &e.Kv
+				} else {
+					kv = nil
 				}
 				found = true
-				break
 			}
 		}
 
 		if !found {
 			log.Fatalf("storage: range cannot find key %s at index %d", string(pair.key), pair.index)
 		}
+		if kv != nil {
+			kvs = append(kvs, *kv)
+		}
 	}
 	return kvs, index
 }
 
-func (s *store) DeleteRange(key, end []byte) (n, index int64) {
-	s.Lock()
-	defer s.Unlock()
-
-	index = int64(s.currentIndex) + 1
-
-	pairs := s.kvindex.Range(key, end, s.currentIndex)
-	if len(pairs) == 0 {
-		return 0, int64(s.currentIndex)
-	}
+func (s *store) TnxPut(key, value []byte) int64 {
+	s.put(key, value, s.currentIndex+1)
+	return int64(s.currentIndex + 1)
+}
 
-	for i, pair := range pairs {
-		ok := s.delete(pair.key, uint64(index), uint32(i))
-		if ok {
-			n++
-		}
-	}
-	if n != 0 {
-		s.currentIndex = s.currentIndex + 1
+func (s *store) TnxDeleteRange(key, end []byte) (n, index int64) {
+	n = s.deleteRange(key, end, s.currentIndex+1)
+	if n != 0 || s.subIndex != 0 {
+		index = int64(s.currentIndex + 1)
 	}
-	return n, int64(s.currentIndex)
+	return n, index
 }
 
-func (s *store) put(key, value []byte, index uint64, subindex uint32) {
+func (s *store) put(key, value []byte, index uint64) {
 	ibytes := make([]byte, 8+1+4)
-	indexToBytes(index, subindex, ibytes)
+	indexToBytes(index, s.subIndex, ibytes)
 
 	event := storagepb.Event{
 		Type: storagepb.PUT,
@@ -152,17 +174,43 @@ func (s *store) put(key, value []byte, index uint64, subindex uint32) {
 	defer tx.Unlock()
 	tx.UnsafePut(keyBucketName, ibytes, d)
 	s.kvindex.Put(key, index)
+	s.subIndex += 1
+}
+
+func (s *store) deleteRange(key, end []byte, index uint64) int64 {
+	var n int64
+	rindex := index
+	if s.subIndex > 0 {
+		rindex += 1
+	}
+	pairs := s.kvindex.Range(key, end, rindex)
+
+	if len(pairs) == 0 {
+		return 0
+	}
+
+	for _, pair := range pairs {
+		ok := s.delete(pair.key, index)
+		if ok {
+			n++
+		}
+	}
+	return n
 }
 
-func (s *store) delete(key []byte, index uint64, subindex uint32) bool {
-	_, err := s.kvindex.Get(key, index)
+func (s *store) delete(key []byte, index uint64) bool {
+	gindex := index
+	if s.subIndex > 0 {
+		gindex += 1
+	}
+	_, err := s.kvindex.Get(key, gindex)
 	if err != nil {
 		// key not exist
 		return false
 	}
 
 	ibytes := make([]byte, 8+1+4)
-	indexToBytes(index, subindex, ibytes)
+	indexToBytes(index, s.subIndex, ibytes)
 
 	event := storagepb.Event{
 		Type: storagepb.DELETE,
@@ -184,7 +232,7 @@ func (s *store) delete(key []byte, index uint64, subindex uint32) bool {
 	if err != nil {
 		log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err)
 	}
-
+	s.subIndex += 1
 	return true
 }
 

+ 63 - 0
storage/kvstore_test.go

@@ -152,6 +152,69 @@ func TestRangeInSequence(t *testing.T) {
 	}
 }
 
+func TestOneTnx(t *testing.T) {
+	s := newStore("test")
+	defer os.Remove("test")
+
+	s.TnxBegin()
+	for i := 0; i < 3; i++ {
+		s.TnxPut([]byte("foo"), []byte("bar"))
+		s.TnxPut([]byte("foo1"), []byte("bar1"))
+		s.TnxPut([]byte("foo2"), []byte("bar2"))
+
+		// remove foo
+		n, index := s.TnxDeleteRange([]byte("foo"), nil)
+		if n != 1 || index != 1 {
+			t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1)
+		}
+
+		kvs, index := s.TnxRange([]byte("foo"), []byte("foo3"), 0, 0)
+		if len(kvs) != 2 {
+			t.Fatalf("len(kvs) = %d, want %d", len(kvs), 2)
+		}
+
+		// remove again -> expect nothing
+		n, index = s.TnxDeleteRange([]byte("foo"), nil)
+		if n != 0 || index != 1 {
+			t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 0, 1)
+		}
+
+		// remove foo1
+		n, index = s.TnxDeleteRange([]byte("foo"), []byte("foo2"))
+		if n != 1 || index != 1 {
+			t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1)
+		}
+
+		// after removal foo1
+		kvs, index = s.TnxRange([]byte("foo"), []byte("foo3"), 0, 0)
+		if len(kvs) != 1 {
+			t.Fatalf("len(kvs) = %d, want %d", len(kvs), 1)
+		}
+
+		// remove foo2
+		n, index = s.TnxDeleteRange([]byte("foo2"), []byte("foo3"))
+		if n != 1 || index != 1 {
+			t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1)
+		}
+
+		// after removal foo2
+		kvs, index = s.TnxRange([]byte("foo"), []byte("foo3"), 0, 0)
+		if len(kvs) != 0 {
+			t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0)
+		}
+	}
+	s.TnxEnd()
+
+	// After tnx
+	kvs, index := s.Range([]byte("foo"), []byte("foo3"), 0, 1)
+	if len(kvs) != 0 {
+		t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0)
+	}
+	if index != 1 {
+		t.Fatalf("index = %d, want %d", index, 1)
+	}
+}
+
 func BenchmarkStorePut(b *testing.B) {
 	s := newStore("test")
 	defer os.Remove("test")