Browse Source

stroage: add tnx id

Xiang Li 10 years ago
parent
commit
cbb8b9bb08
2 changed files with 87 additions and 27 deletions
  1. 51 15
      storage/kvstore.go
  2. 36 12
      storage/kvstore_test.go

+ 51 - 15
storage/kvstore.go

@@ -3,7 +3,9 @@ package storage
 import (
 	"bytes"
 	"encoding/binary"
+	"errors"
 	"log"
+	"math/rand"
 	"sync"
 	"time"
 
@@ -15,6 +17,8 @@ var (
 	batchLimit    = 10000
 	batchInterval = 100 * time.Millisecond
 	keyBucketName = []byte("key")
+
+	ErrTnxIDMismatch = errors.New("storage: tnx id mismatch")
 )
 
 type store struct {
@@ -24,8 +28,10 @@ type store struct {
 	kvindex index
 
 	currentIndex uint64
+	subIndex     uint32 // tracks next subIndex to put into backend
 
-	subIndex uint32 // tracks next subIndex to put into backend
+	tmu   sync.Mutex // protect the tnxID field
+	tnxID int64      // tracks the current tnxID to verify tnx operations
 }
 
 func newStore(path string) *store {
@@ -45,57 +51,87 @@ func newStore(path string) *store {
 }
 
 func (s *store) Put(key, value []byte) int64 {
-	s.TnxBegin()
+	id := s.TnxBegin()
 	s.put(key, value, s.currentIndex+1)
-	s.TnxEnd()
+	s.TnxEnd(id)
 
 	return int64(s.currentIndex)
 }
 
 func (s *store) Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) {
-	s.TnxBegin()
+	id := s.TnxBegin()
 	kvs, index = s.rangeKeys(key, end, limit, rangeIndex)
-	s.TnxEnd()
+	s.TnxEnd(id)
 
 	return kvs, index
 }
 
 func (s *store) DeleteRange(key, end []byte) (n, index int64) {
-	s.TnxBegin()
+	id := s.TnxBegin()
 	n = s.deleteRange(key, end, s.currentIndex+1)
-	s.TnxEnd()
+	s.TnxEnd(id)
 
 	return n, int64(s.currentIndex)
 }
 
-func (s *store) TnxBegin() {
+func (s *store) TnxBegin() int64 {
 	s.mu.Lock()
 	s.subIndex = 0
+
+	s.tmu.Lock()
+	defer s.tmu.Unlock()
+	s.tnxID = rand.Int63()
+	return s.tnxID
 }
 
-func (s *store) TnxEnd() {
+func (s *store) TnxEnd(tnxID int64) error {
+	s.tmu.Lock()
+	defer s.tmu.Unlock()
+	if tnxID != s.tnxID {
+		return ErrTnxIDMismatch
+	}
+
 	if s.subIndex != 0 {
 		s.currentIndex += 1
 	}
 	s.subIndex = 0
 	s.mu.Unlock()
+	return nil
 }
 
-func (s *store) TnxRange(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) {
-	return s.rangeKeys(key, end, limit, rangeIndex)
+func (s *store) TnxRange(tnxID int64, key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64, err error) {
+	s.tmu.Lock()
+	defer s.tmu.Unlock()
+	if tnxID != s.tnxID {
+		return nil, 0, ErrTnxIDMismatch
+	}
+	kvs, index = s.rangeKeys(key, end, limit, rangeIndex)
+	return kvs, index, nil
 }
 
-func (s *store) TnxPut(key, value []byte) int64 {
+func (s *store) TnxPut(tnxID int64, key, value []byte) (index int64, err error) {
+	s.tmu.Lock()
+	defer s.tmu.Unlock()
+	if tnxID != s.tnxID {
+		return 0, ErrTnxIDMismatch
+	}
+
 	s.put(key, value, s.currentIndex+1)
-	return int64(s.currentIndex + 1)
+	return int64(s.currentIndex + 1), nil
 }
 
-func (s *store) TnxDeleteRange(key, end []byte) (n, index int64) {
+func (s *store) TnxDeleteRange(tnxID int64, key, end []byte) (n, index int64, err error) {
+	s.tmu.Lock()
+	defer s.tmu.Unlock()
+	if tnxID != s.tnxID {
+		return 0, 0, ErrTnxIDMismatch
+	}
+
 	n = s.deleteRange(key, end, s.currentIndex+1)
 	if n != 0 || s.subIndex != 0 {
 		index = int64(s.currentIndex + 1)
 	}
-	return n, index
+	return n, index, nil
 }
 
 // range is a keyword in Go, add Keys suffix.

+ 36 - 12
storage/kvstore_test.go

@@ -156,54 +156,78 @@ func TestOneTnx(t *testing.T) {
 	s := newStore("test")
 	defer os.Remove("test")
 
-	s.TnxBegin()
+	id := s.TnxBegin()
 	for i := 0; i < 3; i++ {
-		s.TnxPut([]byte("foo"), []byte("bar"))
-		s.TnxPut([]byte("foo1"), []byte("bar1"))
-		s.TnxPut([]byte("foo2"), []byte("bar2"))
+		s.TnxPut(id, []byte("foo"), []byte("bar"))
+		s.TnxPut(id, []byte("foo1"), []byte("bar1"))
+		s.TnxPut(id, []byte("foo2"), []byte("bar2"))
 
 		// remove foo
-		n, index := s.TnxDeleteRange([]byte("foo"), nil)
+		n, index, err := s.TnxDeleteRange(id, []byte("foo"), nil)
+		if err != nil {
+			t.Fatal(err)
+		}
 		if n != 1 || index != 1 {
 			t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1)
 		}
 
-		kvs, index := s.TnxRange([]byte("foo"), []byte("foo3"), 0, 0)
+		kvs, index, err := s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0)
+		if err != nil {
+			t.Fatal(err)
+		}
 		if len(kvs) != 2 {
 			t.Fatalf("len(kvs) = %d, want %d", len(kvs), 2)
 		}
 
 		// remove again -> expect nothing
-		n, index = s.TnxDeleteRange([]byte("foo"), nil)
+		n, index, err = s.TnxDeleteRange(id, []byte("foo"), nil)
+		if err != nil {
+			t.Fatal(err)
+		}
 		if n != 0 || index != 1 {
 			t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 0, 1)
 		}
 
 		// remove foo1
-		n, index = s.TnxDeleteRange([]byte("foo"), []byte("foo2"))
+		n, index, err = s.TnxDeleteRange(id, []byte("foo"), []byte("foo2"))
+		if err != nil {
+			t.Fatal(err)
+		}
 		if n != 1 || index != 1 {
 			t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1)
 		}
 
 		// after removal foo1
-		kvs, index = s.TnxRange([]byte("foo"), []byte("foo3"), 0, 0)
+		kvs, index, err = s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0)
+		if err != nil {
+			t.Fatal(err)
+		}
 		if len(kvs) != 1 {
 			t.Fatalf("len(kvs) = %d, want %d", len(kvs), 1)
 		}
 
 		// remove foo2
-		n, index = s.TnxDeleteRange([]byte("foo2"), []byte("foo3"))
+		n, index, err = s.TnxDeleteRange(id, []byte("foo2"), []byte("foo3"))
+		if err != nil {
+			t.Fatal(err)
+		}
 		if n != 1 || index != 1 {
 			t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1)
 		}
 
 		// after removal foo2
-		kvs, index = s.TnxRange([]byte("foo"), []byte("foo3"), 0, 0)
+		kvs, index, err = s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0)
+		if err != nil {
+			t.Fatal(err)
+		}
 		if len(kvs) != 0 {
 			t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0)
 		}
 	}
-	s.TnxEnd()
+	err := s.TnxEnd(id)
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	// After tnx
 	kvs, index := s.Range([]byte("foo"), []byte("foo3"), 0, 1)