Browse Source

Merge pull request #7934 from heyitsanthony/bench-rpc-mutex

benchmark: add rpc mutexes to stm benchmark
Anthony Romano 8 years ago
parent
commit
201fd70afc
1 changed files with 49 additions and 9 deletions
  1. 49 9
      tools/benchmark/cmd/stm.go

+ 49 - 9
tools/benchmark/cmd/stm.go

@@ -17,16 +17,19 @@ package cmd
 import (
 	"encoding/binary"
 	"fmt"
+	"math"
 	"math/rand"
 	"os"
 	"time"
 
 	v3 "github.com/coreos/etcd/clientv3"
 	v3sync "github.com/coreos/etcd/clientv3/concurrency"
+	"github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
 	"github.com/coreos/etcd/pkg/report"
 
 	"github.com/spf13/cobra"
 	"golang.org/x/net/context"
+	"golang.org/x/time/rate"
 	"gopkg.in/cheggaaa/pb.v1"
 )
 
@@ -49,7 +52,8 @@ var (
 	stmKeyCount     int
 	stmValSize      int
 	stmWritePercent int
-	stmMutex        bool
+	stmLocker       string
+	stmRate         int
 )
 
 func init() {
@@ -60,8 +64,9 @@ func init() {
 	stmCmd.Flags().IntVar(&stmTotal, "total", 10000, "Total number of completed STM transactions")
 	stmCmd.Flags().IntVar(&stmKeysPerTxn, "keys-per-txn", 1, "Number of keys to access per transaction")
 	stmCmd.Flags().IntVar(&stmWritePercent, "txn-wr-percent", 50, "Percentage of keys to overwrite per transaction")
-	stmCmd.Flags().BoolVar(&stmMutex, "use-mutex", false, "Wrap STM transaction in a distributed mutex")
+	stmCmd.Flags().StringVar(&stmLocker, "stm-locker", "stm", "Wrap STM transaction with a custom locking mechanism (stm, lock-client, lock-rpc)")
 	stmCmd.Flags().IntVar(&stmValSize, "val-size", 8, "Value size of each STM put request")
+	stmCmd.Flags().IntVar(&stmRate, "rate", 0, "Maximum STM transactions per second (0 is no limit)")
 }
 
 func stmFunc(cmd *cobra.Command, args []string) {
@@ -94,6 +99,11 @@ func stmFunc(cmd *cobra.Command, args []string) {
 		os.Exit(1)
 	}
 
+	if stmRate == 0 {
+		stmRate = math.MaxInt32
+	}
+	limit := rate.NewLimiter(rate.Limit(stmRate), 1)
+
 	requests := make(chan stmApply, totalClients)
 	clients := mustCreateClients(totalClients, totalConns)
 
@@ -118,6 +128,7 @@ func stmFunc(cmd *cobra.Command, args []string) {
 			}
 
 			applyf := func(s v3sync.STM) error {
+				limit.Wait(context.Background())
 				wrs := int(float32(len(kset)*stmWritePercent) / 100.0)
 				for k := range kset {
 					s.Get(k)
@@ -144,23 +155,52 @@ func stmFunc(cmd *cobra.Command, args []string) {
 func doSTM(client *v3.Client, requests <-chan stmApply, results chan<- report.Result) {
 	defer wg.Done()
 
-	var m *v3sync.Mutex
-	if stmMutex {
+	lock, unlock := func() error { return nil }, func() error { return nil }
+	switch stmLocker {
+	case "lock-client":
+		s, err := v3sync.NewSession(client)
+		if err != nil {
+			panic(err)
+		}
+		defer s.Close()
+		m := v3sync.NewMutex(s, "stmlock")
+		lock = func() error { return m.Lock(context.TODO()) }
+		unlock = func() error { return m.Unlock(context.TODO()) }
+	case "lock-rpc":
+		var lockKey []byte
 		s, err := v3sync.NewSession(client)
 		if err != nil {
 			panic(err)
 		}
-		m = v3sync.NewMutex(s, "stmlock")
+		defer s.Close()
+		lc := v3lockpb.NewLockClient(client.ActiveConnection())
+		lock = func() error {
+			req := &v3lockpb.LockRequest{Name: []byte("stmlock"), Lease: int64(s.Lease())}
+			resp, err := lc.Lock(context.TODO(), req)
+			if resp != nil {
+				lockKey = resp.Key
+			}
+			return err
+		}
+		unlock = func() error {
+			req := &v3lockpb.UnlockRequest{Key: lockKey}
+			_, err := lc.Unlock(context.TODO(), req)
+			return err
+		}
+	case "stm":
+	default:
+		fmt.Fprintf(os.Stderr, "unexpected stm locker %q\n", stmLocker)
+		os.Exit(1)
 	}
 
 	for applyf := range requests {
 		st := time.Now()
-		if m != nil {
-			m.Lock(context.TODO())
+		if lerr := lock(); lerr != nil {
+			panic(lerr)
 		}
 		_, err := v3sync.NewSTM(client, applyf, v3sync.WithIsolation(stmIso))
-		if m != nil {
-			m.Unlock(context.TODO())
+		if lerr := unlock(); lerr != nil {
+			panic(lerr)
 		}
 		results <- report.Result{Err: err, Start: st, End: time.Now()}
 		bar.Increment()