Browse Source

Merge pull request #6248 from xiang90/fix_mvcc

mvcc: only write txn should update index
Xiang Li 9 years ago
parent
commit
83e66d2962
1 changed files with 15 additions and 3 deletions
  1. 15 3
      mvcc/kvstore.go

+ 15 - 3
mvcc/kvstore.go

@@ -74,8 +74,9 @@ type store struct {
 	// the main revision of the last compaction
 	// the main revision of the last compaction
 	compactMainRev int64
 	compactMainRev int64
 
 
-	tx    backend.BatchTx
-	txnID int64 // tracks the current txnID to verify txn operations
+	tx        backend.BatchTx
+	txnID     int64 // tracks the current txnID to verify txn operations
+	txnModify bool
 
 
 	// bytesBuf8 is a byte slice of length 8
 	// bytesBuf8 is a byte slice of length 8
 	// to avoid a repetitive allocation in saveIndex.
 	// to avoid a repetitive allocation in saveIndex.
@@ -180,7 +181,6 @@ func (s *store) TxnBegin() int64 {
 	s.currentRev.sub = 0
 	s.currentRev.sub = 0
 	s.tx = s.b.BatchTx()
 	s.tx = s.b.BatchTx()
 	s.tx.Lock()
 	s.tx.Lock()
-	s.saveIndex()
 
 
 	s.txnID = rand.Int63()
 	s.txnID = rand.Int63()
 	return s.txnID
 	return s.txnID
@@ -203,6 +203,14 @@ func (s *store) txnEnd(txnID int64) error {
 		return ErrTxnIDMismatch
 		return ErrTxnIDMismatch
 	}
 	}
 
 
+	// only update index if the txn modifies the mvcc state.
+	// read only txn might execute with one write txn concurrently,
+	// it should not write its index to mvcc.
+	if s.txnModify {
+		s.saveIndex()
+	}
+	s.txnModify = false
+
 	s.tx.Unlock()
 	s.tx.Unlock()
 	if s.currentRev.sub != 0 {
 	if s.currentRev.sub != 0 {
 		s.currentRev.main += 1
 		s.currentRev.main += 1
@@ -502,6 +510,8 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64, countOnly bool
 }
 }
 
 
 func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
 func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
+	s.txnModify = true
+
 	rev := s.currentRev.main + 1
 	rev := s.currentRev.main + 1
 	c := rev
 	c := rev
 	oldLease := lease.NoLease
 	oldLease := lease.NoLease
@@ -568,6 +578,8 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
 }
 }
 
 
 func (s *store) deleteRange(key, end []byte) int64 {
 func (s *store) deleteRange(key, end []byte) int64 {
+	s.txnModify = true
+
 	rrev := s.currentRev.main
 	rrev := s.currentRev.main
 	if s.currentRev.sub > 0 {
 	if s.currentRev.sub > 0 {
 		rrev += 1
 		rrev += 1