Jelajahi Sumber

Merge pull request #5508 from heyitsanthony/bench-stm-lock

concurrency, benchmark: additional stm support
Anthony Romano 9 tahun lalu
induk
melakukan
5cb7400cee
2 mengubah file dengan 32 tambahan dan 4 penghapusan
  1. 14 0
      clientv3/concurrency/stm.go
  2. 18 4
      tools/benchmark/cmd/stm.go

+ 14 - 0
clientv3/concurrency/stm.go

@@ -56,6 +56,12 @@ func NewSTMSerializable(ctx context.Context, c *v3.Client, apply func(STM) error
 	return runSTM(s, apply)
 }
 
+// NewSTMReadCommitted initiates a new read committed transaction.
+func NewSTMReadCommitted(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error) {
+	s := &stmReadCommitted{stm{client: c, ctx: ctx, getOpts: []v3.OpOption{v3.WithSerializable()}}}
+	return runSTM(s, apply)
+}
+
 type stmResponse struct {
 	resp *v3.TxnResponse
 	err  error
@@ -234,6 +240,14 @@ func (s *stmSerializable) commit() *v3.TxnResponse {
 	return nil
 }
 
+type stmReadCommitted struct{ stm }
+
+// commit always goes through when read committed
+func (s *stmReadCommitted) commit() *v3.TxnResponse {
+	s.rset = nil
+	return s.stm.commit()
+}
+
 func isKeyCurrent(k string, r *v3.GetResponse) v3.Cmp {
 	rev := r.Header.Revision + 1
 	if len(r.Kvs) != 0 {

+ 18 - 4
tools/benchmark/cmd/stm.go

@@ -45,17 +45,19 @@ var (
 	stmKeyCount     int
 	stmValSize      int
 	stmWritePercent int
+	stmMutex        bool
 	mkSTM           func(context.Context, *v3.Client, func(v3sync.STM) error) (*v3.TxnResponse, error)
 )
 
 func init() {
 	RootCmd.AddCommand(stmCmd)
 
-	stmCmd.Flags().StringVar(&stmIsolation, "isolation", "r", "Repeatable Reads (r) or Serializable (s)")
+	stmCmd.Flags().StringVar(&stmIsolation, "isolation", "r", "Read Committed (c), Repeatable Reads (r), or Serializable (s)")
 	stmCmd.Flags().IntVar(&stmKeyCount, "keys", 1, "Total unique keys accessible by the benchmark")
 	stmCmd.Flags().IntVar(&stmTotal, "total", 10000, "Total number of completed STM transactions")
 	stmCmd.Flags().IntVar(&stmKeysPerTxn, "keys-per-txn", 1, "Number of keys to access per transaction")
 	stmCmd.Flags().IntVar(&stmWritePercent, "txn-wr-percent", 50, "Percentage of keys to overwrite per transaction")
+	stmCmd.Flags().BoolVar(&stmMutex, "use-mutex", false, "Wrap STM transaction in a distributed mutex")
 	stmCmd.Flags().IntVar(&stmValSize, "val-size", 8, "Value size of each STM put request")
 }
 
@@ -76,9 +78,11 @@ func stmFunc(cmd *cobra.Command, args []string) {
 	}
 
 	switch stmIsolation {
+	case "c":
+		mkSTM = v3sync.NewSTMReadCommitted
 	case "r":
 		mkSTM = v3sync.NewSTMRepeatable
-	case "l":
+	case "s":
 		mkSTM = v3sync.NewSTMSerializable
 	default:
 		fmt.Fprintln(os.Stderr, cmd.Usage())
@@ -139,10 +143,20 @@ func stmFunc(cmd *cobra.Command, args []string) {
 func doSTM(ctx context.Context, client *v3.Client, requests <-chan stmApply) {
 	defer wg.Done()
 
+	var m *v3sync.Mutex
+	if stmMutex {
+		m = v3sync.NewMutex(client, "stmlock")
+	}
+
 	for applyf := range requests {
 		st := time.Now()
-		_, err := v3sync.NewSTMRepeatable(context.TODO(), client, applyf)
-
+		if m != nil {
+			m.Lock(context.TODO())
+		}
+		_, err := mkSTM(context.TODO(), client, applyf)
+		if m != nil {
+			m.Unlock(context.TODO())
+		}
 		var errStr string
 		if err != nil {
 			errStr = err.Error()