浏览代码

storage: hold batchTx lock during KV txn

One txn is treated as atomic, and might contain multiple Put/Delete/Range
operations. For now, between these operations, we might call forecCommit
to sync the change to disk, or backend may commit it in background.
Thus the snapshot state might contains an unfinished multiple objects
transaction, which is dangerous if database is restored from the snapshot.

This PR makes KV txn hold batchTx lock during the process and avoids
commit to happen.
Yicheng Qin 10 年之前
父节点
当前提交
c97dda766e
共有 2 个文件被更改,包括 36 次插入13 次删除
  1. 7 13
      storage/kvstore.go
  2. 29 0
      storage/kvstore_test.go

+ 7 - 13
storage/kvstore.go

@@ -52,6 +52,7 @@ type store struct {
 	// the main revision of the last compaction
 	compactMainRev int64
 
+	tx    backend.BatchTx
 	tmu   sync.Mutex // protect the txnID field
 	txnID int64      // tracks the current txnID to verify txn operations
 
@@ -122,6 +123,8 @@ func (s *store) DeleteRange(key, end []byte) (n, rev int64) {
 func (s *store) TxnBegin() int64 {
 	s.mu.Lock()
 	s.currentRev.sub = 0
+	s.tx = s.b.BatchTx()
+	s.tx.Lock()
 
 	s.tmu.Lock()
 	defer s.tmu.Unlock()
@@ -148,6 +151,7 @@ func (s *store) txnEnd(txnID int64) error {
 		return ErrTxnIDMismatch
 	}
 
+	s.tx.Unlock()
 	if s.currentRev.sub != 0 {
 		s.currentRev.main += 1
 	}
@@ -392,14 +396,11 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
 		return nil, rev, nil
 	}
 
-	tx := s.b.BatchTx()
-	tx.Lock()
-	defer tx.Unlock()
 	for _, revpair := range revpairs {
 		revbytes := newRevBytes()
 		revToBytes(revpair, revbytes)
 
-		_, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
+		_, vs := s.tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
 		if len(vs) != 1 {
 			log.Fatalf("storage: range cannot find rev (%d,%d)", revpair.main, revpair.sub)
 		}
@@ -446,10 +447,7 @@ func (s *store) put(key, value []byte) {
 		log.Fatalf("storage: cannot marshal event: %v", err)
 	}
 
-	tx := s.b.BatchTx()
-	tx.Lock()
-	defer tx.Unlock()
-	tx.UnsafePut(keyBucketName, ibytes, d)
+	s.tx.UnsafePut(keyBucketName, ibytes, d)
 	s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub})
 	s.currentRev.sub += 1
 }
@@ -474,10 +472,6 @@ func (s *store) deleteRange(key, end []byte) int64 {
 func (s *store) delete(key []byte) {
 	mainrev := s.currentRev.main + 1
 
-	tx := s.b.BatchTx()
-	tx.Lock()
-	defer tx.Unlock()
-
 	ibytes := newRevBytes()
 	revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes)
 
@@ -493,7 +487,7 @@ func (s *store) delete(key []byte) {
 		log.Fatalf("storage: cannot marshal event: %v", err)
 	}
 
-	tx.UnsafePut(keyBucketName, ibytes, d)
+	s.tx.UnsafePut(keyBucketName, ibytes, d)
 	err = s.kvindex.Tombstone(key, revision{main: mainrev, sub: s.currentRev.sub})
 	if err != nil {
 		log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err)

+ 29 - 0
storage/kvstore_test.go

@@ -103,6 +103,7 @@ func TestStorePut(t *testing.T) {
 	for i, tt := range tests {
 		s, b, index := newFakeStore()
 		s.currentRev = tt.rev
+		s.tx = b.BatchTx()
 		index.indexGetRespc <- tt.r
 
 		s.put([]byte("foo"), []byte("bar"))
@@ -164,6 +165,7 @@ func TestStoreRange(t *testing.T) {
 	for i, tt := range tests {
 		s, b, index := newFakeStore()
 		s.currentRev = currev
+		s.tx = b.BatchTx()
 		b.tx.rangeRespc <- tt.r
 		index.indexRangeRespc <- tt.idxr
 
@@ -223,6 +225,7 @@ func TestStoreDeleteRange(t *testing.T) {
 	for i, tt := range tests {
 		s, b, index := newFakeStore()
 		s.currentRev = tt.rev
+		s.tx = b.BatchTx()
 		index.indexRangeRespc <- tt.r
 
 		n := s.deleteRange([]byte("foo"), []byte("goo"))
@@ -651,6 +654,32 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 	tx.Unlock()
 }
 
+func TestTxnBlockBackendForceCommit(t *testing.T) {
+	s := newStore(tmpPath)
+	defer os.Remove(tmpPath)
+
+	id := s.TxnBegin()
+
+	done := make(chan struct{})
+	go func() {
+		s.b.ForceCommit()
+		done <- struct{}{}
+	}()
+	select {
+	case <-done:
+		t.Fatalf("failed to block ForceCommit")
+	case <-time.After(100 * time.Millisecond):
+	}
+
+	s.TxnEnd(id)
+	select {
+	case <-done:
+	case <-time.After(100 * time.Millisecond):
+		t.Fatalf("failed to execute ForceCommit")
+	}
+
+}
+
 func BenchmarkStorePut(b *testing.B) {
 	s := newStore(tmpPath)
 	defer os.Remove(tmpPath)