
storage: use only one mutex for store struct

A mutex is a value, so only one mutex is needed per scope. We
don't need a separate mutex inside the store struct **if we assume
that `TxnPut` and `TxnRange` are called ONLY ONCE per transaction
(between `TxnBegin` and `TxnEnd`)**, as documented.
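For reference, the assumed usage is the same pattern the new
benchmark below exercises: a single write between `TxnBegin` and
`TxnEnd`. A minimal sketch (illustrative only, not code from this
commit; `s` is a `*store` from `newStore`):

	// one TxnPut per transaction, as the interface documents
	id := s.TxnBegin()
	if _, err := s.TxnPut(id, key, value); err != nil {
		// ErrTxnIDMismatch is returned when id is not the current txn
	}
	if err := s.TxnEnd(id); err != nil {
		// TxnEnd performs the same txnID check
	}
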
Gyu-Ho Lee, 10 years ago
commit 84d777305d
3 changed files with 101 additions and 32 deletions
  1. storage/kvstore.go (+4 -15)
  2. storage/kvstore_bench_test.go (+57 -0)
  3. storage/kvstore_test.go (+40 -17)

+ 4 - 15
storage/kvstore.go

@@ -49,7 +49,7 @@ var (
 )
 
 type store struct {
-	mu sync.RWMutex
+	mu sync.Mutex // guards the following
 
 	b       backend.Backend
 	kvindex index
@@ -59,8 +59,7 @@ type store struct {
 	compactMainRev int64
 
 	tx    backend.BatchTx
-	tmu   sync.Mutex // protect the txnID field
-	txnID int64      // tracks the current txnID to verify txn operations
+	txnID int64 // tracks the current txnID to verify txn operations
 
 	wg    sync.WaitGroup
 	stopc chan struct{}
@@ -86,8 +85,8 @@ func newStore(path string) *store {
 }
 
 func (s *store) Rev() int64 {
-	s.mu.RLock()
-	defer s.mu.RUnlock()
+	s.mu.Lock()
+	defer s.mu.Unlock()
 
 	return s.currentRev.main
 }
@@ -128,8 +127,6 @@ func (s *store) TxnBegin() int64 {
 	s.tx = s.b.BatchTx()
 	s.tx.Lock()
 
-	s.tmu.Lock()
-	defer s.tmu.Unlock()
 	s.txnID = rand.Int63()
 	return s.txnID
 }
@@ -147,8 +144,6 @@ func (s *store) TxnEnd(txnID int64) error {
 // txnEnd is used for unlocking an internal txn. It does
 // not increase the txnCounter.
 func (s *store) txnEnd(txnID int64) error {
-	s.tmu.Lock()
-	defer s.tmu.Unlock()
 	if txnID != s.txnID {
 		return ErrTxnIDMismatch
 	}
@@ -165,8 +160,6 @@ func (s *store) txnEnd(txnID int64) error {
 }
 
 func (s *store) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
-	s.tmu.Lock()
-	defer s.tmu.Unlock()
 	if txnID != s.txnID {
 		return nil, 0, ErrTxnIDMismatch
 	}
@@ -174,8 +167,6 @@ func (s *store) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (k
 }
 
 func (s *store) TxnPut(txnID int64, key, value []byte) (rev int64, err error) {
-	s.tmu.Lock()
-	defer s.tmu.Unlock()
 	if txnID != s.txnID {
 		return 0, ErrTxnIDMismatch
 	}
@@ -185,8 +176,6 @@ func (s *store) TxnPut(txnID int64, key, value []byte) (rev int64, err error) {
 }
 
 func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
-	s.tmu.Lock()
-	defer s.tmu.Unlock()
 	if txnID != s.txnID {
 		return 0, 0, ErrTxnIDMismatch
 	}

+ 57 - 0
storage/kvstore_bench_test.go

@@ -0,0 +1,57 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package storage
+
+import (
+	"log"
+	"os"
+	"testing"
+)
+
+func BenchmarkStorePut(b *testing.B) {
+	s := newStore(tmpPath)
+	defer os.Remove(tmpPath)
+
+	// arbitrary number of bytes
+	bytesN := 64
+	keys := createBytesSlice(bytesN, b.N)
+	vals := createBytesSlice(bytesN, b.N)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		s.Put(keys[i], vals[i])
+	}
+}
+
+// BenchmarkStoreTxnPut benchmarks the Put operation
+// with transaction begin and end, where transaction involves
+// some synchronization operations, such as mutex locking.
+func BenchmarkStoreTxnPut(b *testing.B) {
+	s := newStore(tmpPath)
+	defer cleanup(s, tmpPath)
+
+	// arbitrary number of bytes
+	bytesN := 64
+	keys := createBytesSlice(bytesN, b.N)
+	vals := createBytesSlice(bytesN, b.N)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		id := s.TxnBegin()
+		if _, err := s.TxnPut(id, keys[i], vals[i]); err != nil {
+			log.Fatalf("txn put error: %v", err)
+		}
+		s.TxnEnd(id)
+	}
+}

+ 40 - 17
storage/kvstore_test.go

@@ -18,6 +18,7 @@ import (
 	"crypto/rand"
 	"encoding/binary"
 	"math"
+	mrand "math/rand"
 	"os"
 	"reflect"
 	"testing"
@@ -665,6 +666,32 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 	t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes))
 }
 
+func TestTxnPut(t *testing.T) {
+	// assign arbitrary size
+	bytesN := 30
+	sliceN := 100
+	keys := createBytesSlice(bytesN, sliceN)
+	vals := createBytesSlice(bytesN, sliceN)
+
+	s := newStore(tmpPath)
+	defer cleanup(s, tmpPath)
+
+	for i := 0; i < sliceN; i++ {
+		id := s.TxnBegin()
+		base := int64(i + 1)
+
+		rev, err := s.TxnPut(id, keys[i], vals[i])
+		if err != nil {
+			t.Error("txn put error")
+		}
+		if rev != base {
+			t.Errorf("#%d: rev = %d, want %d", i, rev, base)
+		}
+
+		s.TxnEnd(id)
+	}
+}
+
 func TestTxnBlockBackendForceCommit(t *testing.T) {
 	s := newStore(tmpPath)
 	defer os.Remove(tmpPath)
@@ -691,23 +718,6 @@ func TestTxnBlockBackendForceCommit(t *testing.T) {
 
 }
 
-func BenchmarkStorePut(b *testing.B) {
-	s := newStore(tmpPath)
-	defer os.Remove(tmpPath)
-
-	// prepare keys
-	keys := make([][]byte, b.N)
-	for i := 0; i < b.N; i++ {
-		keys[i] = make([]byte, 64)
-		rand.Read(keys[i])
-	}
-
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		s.Put(keys[i], []byte("foo"))
-	}
-}
-
 func newTestRevBytes(rev revision) []byte {
 	bytes := newRevBytes()
 	revToBytes(rev, bytes)
@@ -831,3 +841,16 @@ func (i *fakeIndex) Compact(rev int64) map[revision]struct{} {
 	return <-i.indexCompactRespc
 }
 func (i *fakeIndex) Equal(b index) bool { return false }
+
+func createBytesSlice(bytesN, sliceN int) [][]byte {
+	rs := [][]byte{}
+	for len(rs) != sliceN {
+		mrand.Seed(time.Now().UnixNano())
+		v := make([]byte, bytesN)
+		if _, err := rand.Read(v); err != nil {
+			panic(err)
+		}
+		rs = append(rs, v)
+	}
+	return rs
+}