Browse Source

Merge pull request #3616 from yichengq/storage-txn

storage: hold batchTx lock during KV txn
Xiang Li 10 years ago
parent
commit
21179d929f
2 changed files with 36 additions and 13 deletions
  1. 7 13
      storage/kvstore.go
  2. 29 0
      storage/kvstore_test.go

+ 7 - 13
storage/kvstore.go

@@ -51,6 +51,7 @@ type store struct {
 	// the main revision of the last compaction
 	compactMainRev int64
 
+	tx    backend.BatchTx
 	tmu   sync.Mutex // protect the txnID field
 	txnID int64      // tracks the current txnID to verify txn operations
 
@@ -121,6 +122,8 @@ func (s *store) DeleteRange(key, end []byte) (n, rev int64) {
 func (s *store) TxnBegin() int64 {
 	s.mu.Lock()
 	s.currentRev.sub = 0
+	s.tx = s.b.BatchTx()
+	s.tx.Lock()
 
 	s.tmu.Lock()
 	defer s.tmu.Unlock()
@@ -147,6 +150,7 @@ func (s *store) txnEnd(txnID int64) error {
 		return ErrTxnIDMismatch
 	}
 
+	s.tx.Unlock()
 	if s.currentRev.sub != 0 {
 		s.currentRev.main += 1
 	}
@@ -393,14 +397,11 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
 		return nil, rev, nil
 	}
 
-	tx := s.b.BatchTx()
-	tx.Lock()
-	defer tx.Unlock()
 	for _, revpair := range revpairs {
 		revbytes := newRevBytes()
 		revToBytes(revpair, revbytes)
 
-		_, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
+		_, vs := s.tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
 		if len(vs) != 1 {
 			log.Fatalf("storage: range cannot find rev (%d,%d)", revpair.main, revpair.sub)
 		}
@@ -447,10 +448,7 @@ func (s *store) put(key, value []byte) {
 		log.Fatalf("storage: cannot marshal event: %v", err)
 	}
 
-	tx := s.b.BatchTx()
-	tx.Lock()
-	defer tx.Unlock()
-	tx.UnsafePut(keyBucketName, ibytes, d)
+	s.tx.UnsafePut(keyBucketName, ibytes, d)
 	s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub})
 	s.currentRev.sub += 1
 }
@@ -475,10 +473,6 @@ func (s *store) deleteRange(key, end []byte) int64 {
 func (s *store) delete(key []byte) {
 	mainrev := s.currentRev.main + 1
 
-	tx := s.b.BatchTx()
-	tx.Lock()
-	defer tx.Unlock()
-
 	ibytes := newRevBytes()
 	revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes)
 
@@ -494,7 +488,7 @@ func (s *store) delete(key []byte) {
 		log.Fatalf("storage: cannot marshal event: %v", err)
 	}
 
-	tx.UnsafePut(keyBucketName, ibytes, d)
+	s.tx.UnsafePut(keyBucketName, ibytes, d)
 	err = s.kvindex.Tombstone(key, revision{main: mainrev, sub: s.currentRev.sub})
 	if err != nil {
 		log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err)

+ 29 - 0
storage/kvstore_test.go

@@ -101,6 +101,7 @@ func TestStorePut(t *testing.T) {
 	for i, tt := range tests {
 		s, b, index := newFakeStore()
 		s.currentRev = tt.rev
+		s.tx = b.BatchTx()
 		index.indexGetRespc <- tt.r
 
 		s.put([]byte("foo"), []byte("bar"))
@@ -162,6 +163,7 @@ func TestStoreRange(t *testing.T) {
 	for i, tt := range tests {
 		s, b, index := newFakeStore()
 		s.currentRev = currev
+		s.tx = b.BatchTx()
 		b.tx.rangeRespc <- tt.r
 		index.indexRangeRespc <- tt.idxr
 
@@ -221,6 +223,7 @@ func TestStoreDeleteRange(t *testing.T) {
 	for i, tt := range tests {
 		s, b, index := newFakeStore()
 		s.currentRev = tt.rev
+		s.tx = b.BatchTx()
 		index.indexRangeRespc <- tt.r
 
 		n := s.deleteRange([]byte("foo"), []byte("goo"))
@@ -649,6 +652,32 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 	tx.Unlock()
 }
 
+func TestTxnBlockBackendForceCommit(t *testing.T) {
+	s := newStore(tmpPath)
+	defer os.Remove(tmpPath)
+
+	id := s.TxnBegin()
+
+	done := make(chan struct{})
+	go func() {
+		s.b.ForceCommit()
+		done <- struct{}{}
+	}()
+	select {
+	case <-done:
+		t.Fatalf("failed to block ForceCommit")
+	case <-time.After(100 * time.Millisecond):
+	}
+
+	s.TxnEnd(id)
+	select {
+	case <-done:
+	case <-time.After(100 * time.Millisecond):
+		t.Fatalf("failed to execute ForceCommit")
+	}
+
+}
+
 func BenchmarkStorePut(b *testing.B) {
 	s := newStore(tmpPath)
 	defer os.Remove(tmpPath)