Browse Source

Merge pull request #4070 from mitake/storage-bench

tools: a new tool for benchmarking storage backends
Xiang Li 10 years ago
parent
commit
a74147384d

+ 18 - 18
storage/kv_test.go

@@ -89,7 +89,7 @@ func TestKVRange(t *testing.T)    { testKVRange(t, normalRangeFunc) }
 func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) }
 
 func testKVRange(t *testing.T, f rangeFunc) {
-	s := newStore(tmpPath)
+	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"))
@@ -156,7 +156,7 @@ func TestKVRangeRev(t *testing.T)    { testKVRangeRev(t, normalRangeFunc) }
 func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) }
 
 func testKVRangeRev(t *testing.T, f rangeFunc) {
-	s := newStore(tmpPath)
+	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"))
@@ -198,7 +198,7 @@ func TestKVRangeBadRev(t *testing.T)    { testKVRangeBadRev(t, normalRangeFunc)
 func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc) }
 
 func testKVRangeBadRev(t *testing.T, f rangeFunc) {
-	s := newStore(tmpPath)
+	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"))
@@ -230,7 +230,7 @@ func TestKVRangeLimit(t *testing.T)    { testKVRangeLimit(t, normalRangeFunc) }
 func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) }
 
 func testKVRangeLimit(t *testing.T, f rangeFunc) {
-	s := newStore(tmpPath)
+	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"))
@@ -274,7 +274,7 @@ func TestKVPutMultipleTimes(t *testing.T)    { testKVPutMultipleTimes(t, normalP
 func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutFunc) }
 
 func testKVPutMultipleTimes(t *testing.T, f putFunc) {
-	s := newStore(tmpPath)
+	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	for i := 0; i < 10; i++ {
@@ -335,7 +335,7 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) {
 	}
 
 	for i, tt := range tests {
-		s := newStore(tmpPath)
+		s := newDefaultStore(tmpPath)
 
 		s.Put([]byte("foo"), []byte("bar"))
 		s.Put([]byte("foo1"), []byte("bar1"))
@@ -354,7 +354,7 @@ func TestKVDeleteMultipleTimes(t *testing.T)    { testKVDeleteMultipleTimes(t, n
 func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, txnDeleteRangeFunc) }
 
 func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
-	s := newStore(tmpPath)
+	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"))
@@ -374,7 +374,7 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
 
 // test that range, put, delete on single key in sequence repeatedly works correctly.
 func TestKVOperationInSequence(t *testing.T) {
-	s := newStore(tmpPath)
+	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	for i := 0; i < 10; i++ {
@@ -420,7 +420,7 @@ func TestKVOperationInSequence(t *testing.T) {
 }
 
 func TestKVTxnBlockNonTnxOperations(t *testing.T) {
-	s := newStore(tmpPath)
+	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	tests := []func(){
@@ -451,7 +451,7 @@ func TestKVTxnBlockNonTnxOperations(t *testing.T) {
 }
 
 func TestKVTxnWrongID(t *testing.T) {
-	s := newStore(tmpPath)
+	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	id := s.TxnBegin()
@@ -487,7 +487,7 @@ func TestKVTxnWrongID(t *testing.T) {
 
 // test that txn range, put, delete on single key in sequence repeatedly works correctly.
 func TestKVTnxOperationInSequence(t *testing.T) {
-	s := newStore(tmpPath)
+	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	for i := 0; i < 10; i++ {
@@ -542,7 +542,7 @@ func TestKVTnxOperationInSequence(t *testing.T) {
 }
 
 func TestKVCompactReserveLastValue(t *testing.T) {
-	s := newStore(tmpPath)
+	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar0"))
@@ -595,7 +595,7 @@ func TestKVCompactReserveLastValue(t *testing.T) {
 }
 
 func TestKVCompactBad(t *testing.T) {
-	s := newStore(tmpPath)
+	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar0"))
@@ -627,7 +627,7 @@ func TestKVHash(t *testing.T) {
 
 	for i := 0; i < len(hashes); i++ {
 		var err error
-		kv := newStore(tmpPath)
+		kv := newDefaultStore(tmpPath)
 		kv.Put([]byte("foo0"), []byte("bar0"))
 		kv.Put([]byte("foo1"), []byte("bar0"))
 		hashes[i], err = kv.Hash()
@@ -663,7 +663,7 @@ func TestKVRestore(t *testing.T) {
 		},
 	}
 	for i, tt := range tests {
-		s := newStore(tmpPath)
+		s := newDefaultStore(tmpPath)
 		tt(s)
 		var kvss [][]storagepb.KeyValue
 		for k := int64(0); k < 10; k++ {
@@ -672,7 +672,7 @@ func TestKVRestore(t *testing.T) {
 		}
 		s.Close()
 
-		ns := newStore(tmpPath)
+		ns := newDefaultStore(tmpPath)
 		ns.Restore()
 		// wait for possible compaction to finish
 		testutil.WaitSchedule()
@@ -690,7 +690,7 @@ func TestKVRestore(t *testing.T) {
 }
 
 func TestKVSnapshot(t *testing.T) {
-	s := newStore(tmpPath)
+	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"))
@@ -714,7 +714,7 @@ func TestKVSnapshot(t *testing.T) {
 	}
 	f.Close()
 
-	ns := newStore("new_test")
+	ns := newDefaultStore("new_test")
 	defer cleanup(ns, "new_test")
 	ns.Restore()
 	kvs, rev, err := ns.Range([]byte("a"), []byte("z"), 0, 0)

+ 5 - 1
storage/kvstore.go

@@ -65,7 +65,7 @@ type store struct {
 	stopc chan struct{}
 }
 
-func newStore(path string) *store {
+func NewStore(path string, bachInterval time.Duration, batchLimit int) KV {
 	s := &store{
 		b:              backend.New(path, batchInterval, batchLimit),
 		kvindex:        newTreeIndex(),
@@ -84,6 +84,10 @@ func newStore(path string) *store {
 	return s
 }
 
+func newDefaultStore(path string) *store {
+	return (NewStore(path, batchInterval, batchLimit)).(*store)
+}
+
 func (s *store) Rev() int64 {
 	s.mu.Lock()
 	defer s.mu.Unlock()

+ 2 - 2
storage/kvstore_bench_test.go

@@ -20,7 +20,7 @@ import (
 )
 
 func BenchmarkStorePut(b *testing.B) {
-	s := newStore(tmpPath)
+	s := newDefaultStore(tmpPath)
 	defer os.Remove(tmpPath)
 
 	// arbitrary number of bytes
@@ -38,7 +38,7 @@ func BenchmarkStorePut(b *testing.B) {
 // with transaction begin and end, where transaction involves
 // some synchronization operations, such as mutex locking.
 func BenchmarkStoreTxnPut(b *testing.B) {
-	s := newStore(tmpPath)
+	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	// arbitrary number of bytes

+ 1 - 1
storage/kvstore_compaction_test.go

@@ -58,7 +58,7 @@ func TestScheduleCompaction(t *testing.T) {
 		},
 	}
 	for i, tt := range tests {
-		s := newStore(tmpPath)
+		s := newDefaultStore(tmpPath)
 		tx := s.b.BatchTx()
 
 		tx.Lock()

+ 5 - 5
storage/kvstore_test.go

@@ -29,7 +29,7 @@ import (
 )
 
 func TestStoreRev(t *testing.T) {
-	s := newStore(tmpPath)
+	s := newDefaultStore(tmpPath)
 	defer os.Remove(tmpPath)
 
 	for i := 0; i < 3; i++ {
@@ -354,7 +354,7 @@ func TestStoreRestore(t *testing.T) {
 }
 
 func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
-	s0 := newStore(tmpPath)
+	s0 := newDefaultStore(tmpPath)
 	defer os.Remove(tmpPath)
 
 	s0.Put([]byte("foo"), []byte("bar"))
@@ -371,7 +371,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 
 	s0.Close()
 
-	s1 := newStore(tmpPath)
+	s1 := newDefaultStore(tmpPath)
 	s1.Restore()
 
 	// wait for scheduled compaction to be finished
@@ -409,7 +409,7 @@ func TestTxnPut(t *testing.T) {
 	keys := createBytesSlice(bytesN, sliceN)
 	vals := createBytesSlice(bytesN, sliceN)
 
-	s := newStore(tmpPath)
+	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	for i := 0; i < sliceN; i++ {
@@ -429,7 +429,7 @@ func TestTxnPut(t *testing.T) {
 }
 
 func TestTxnBlockBackendForceCommit(t *testing.T) {
-	s := newStore(tmpPath)
+	s := newDefaultStore(tmpPath)
 	defer os.Remove(tmpPath)
 
 	id := s.TxnBegin()

+ 1 - 1
storage/watchable_store.go

@@ -55,7 +55,7 @@ type watchableStore struct {
 
 func newWatchableStore(path string) *watchableStore {
 	s := &watchableStore{
-		store:    newStore(path),
+		store:    newDefaultStore(path),
 		unsynced: make(map[*watching]struct{}),
 		synced:   make(map[string]map[*watching]struct{}),
 		stopc:    make(chan struct{}),

+ 1 - 1
storage/watchable_store_bench_test.go

@@ -33,7 +33,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 	// method to sync watchers in unsynced map. We want to keep watchers
 	// in unsynced for this benchmark.
 	s := &watchableStore{
-		store:    newStore(tmpPath),
+		store:    newDefaultStore(tmpPath),
 		unsynced: make(map[*watching]struct{}),
 
 		// to make the test not crash from assigning to nil map.

+ 2 - 2
storage/watchable_store_test.go

@@ -66,7 +66,7 @@ func TestCancelUnsynced(t *testing.T) {
 	// method to sync watchers in unsynced map. We want to keep watchers
 	// in unsynced to test if syncWatchers works as expected.
 	s := &watchableStore{
-		store:    newStore(tmpPath),
+		store:    newDefaultStore(tmpPath),
 		unsynced: make(map[*watching]struct{}),
 
 		// to make the test not crash from assigning to nil map.
@@ -120,7 +120,7 @@ func TestCancelUnsynced(t *testing.T) {
 // watchings to synced.
 func TestSyncWatchings(t *testing.T) {
 	s := &watchableStore{
-		store:    newStore(tmpPath),
+		store:    newDefaultStore(tmpPath),
 		unsynced: make(map[*watching]struct{}),
 		synced:   make(map[string]map[*watching]struct{}),
 	}

+ 112 - 0
tools/benchmark/cmd/storage-put.go

@@ -0,0 +1,112 @@
+// Copyright 2015 Nippon Telegraph and Telephone Corporation.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cmd
+
+import (
+	"crypto/rand"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
+)
+
+// storagePutCmd represents a storage put performance benchmarking tool
+var storagePutCmd = &cobra.Command{
+	Use:   "put",
+	Short: "Benchmark put performance of storage",
+
+	Run: storagePutFunc,
+}
+
+var (
+	totalNrKeys    int
+	storageKeySize int
+	valueSize      int
+	txn            bool
+)
+
+func init() {
+	storageCmd.AddCommand(storagePutCmd)
+
+	storagePutCmd.Flags().IntVar(&totalNrKeys, "total", 100, "a total number of keys to put")
+	storagePutCmd.Flags().IntVar(&storageKeySize, "key-size", 64, "a size of key (Byte)")
+	storagePutCmd.Flags().IntVar(&valueSize, "value-size", 64, "a size of value (Byte)")
+	storagePutCmd.Flags().BoolVar(&txn, "txn", false, "put a key in transaction or not")
+}
+
+func createBytesSlice(bytesN, sliceN int) [][]byte {
+	rs := make([][]byte, sliceN)
+	for i := range rs {
+		rs[i] = make([]byte, bytesN)
+		if _, err := rand.Read(rs[i]); err != nil {
+			panic(err)
+		}
+	}
+	return rs
+}
+
+func storagePutFunc(cmd *cobra.Command, args []string) {
+	keys := createBytesSlice(storageKeySize, totalNrKeys)
+	vals := createBytesSlice(valueSize, totalNrKeys)
+
+	latencies := make([]time.Duration, totalNrKeys)
+
+	minLat := time.Duration(1<<63 - 1)
+	maxLat := time.Duration(0)
+
+	for i := 0; i < totalNrKeys; i++ {
+		begin := time.Now()
+
+		if txn {
+			id := s.TxnBegin()
+			if _, err := s.TxnPut(id, keys[i], vals[i]); err != nil {
+				fmt.Errorf("txn put error: %v", err)
+				os.Exit(1)
+			}
+			s.TxnEnd(id)
+		} else {
+			s.Put(keys[i], vals[i])
+		}
+
+		end := time.Now()
+
+		lat := end.Sub(begin)
+		latencies[i] = lat
+		if maxLat < lat {
+			maxLat = lat
+		}
+		if lat < minLat {
+			minLat = lat
+		}
+	}
+
+	total := time.Duration(0)
+
+	for _, lat := range latencies {
+		total += lat
+	}
+
+	fmt.Printf("total: %v\n", total)
+	fmt.Printf("average: %v\n", total/time.Duration(totalNrKeys))
+	fmt.Printf("rate: %4.4f\n", float64(totalNrKeys)/total.Seconds())
+	fmt.Printf("minimum latency: %v\n", minLat)
+	fmt.Printf("maximum latency: %v\n", maxLat)
+
+	// TODO: Currently this benchmark doesn't use the common histogram infrastructure.
+	// This is because an accuracy of the infrastructure isn't suitable for measuring
+	// performance of kv storage:
+	// https://github.com/coreos/etcd/pull/4070#issuecomment-167954149
+}

+ 56 - 0
tools/benchmark/cmd/storage.go

@@ -0,0 +1,56 @@
+// Copyright 2015 Nippon Telegraph and Telephone Corporation.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cmd
+
+import (
+	"os"
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
+	"github.com/coreos/etcd/storage"
+)
+
+var (
+	batchInterval int
+	batchLimit    int
+
+	s storage.KV
+)
+
+func initStorage() {
+	s = storage.NewStore("storage-bench", time.Duration(batchInterval), batchLimit)
+	os.Remove("storage-bench") // boltDB has an opened fd, so removing the file is ok
+}
+
+// storageCmd represents the storage benchmarking tools
+var storageCmd = &cobra.Command{
+	Use:   "storage",
+	Short: "Benchmark storage",
+	Long: `storage subcommand is a set of various benchmark tools for storage subsystem of etcd.
+Actual benchmarks are implemented as its subcommands.`,
+
+	PersistentPreRun: storagePreRun,
+}
+
+func init() {
+	RootCmd.AddCommand(storageCmd)
+
+	storageCmd.PersistentFlags().IntVar(&batchInterval, "batch-interval", 100, "Interval of batching (milliseconds)")
+	storageCmd.PersistentFlags().IntVar(&batchLimit, "batch-limit", 10000, "A limit of batched transaction")
+}
+
+func storagePreRun(cmd *cobra.Command, args []string) {
+	initStorage()
+}