Browse Source

tools/benchmark: revive watch benchmark

Current watch benchmark seems to be broken. This commit revives it.
Hitoshi Mitake 10 years ago
parent
commit
99e7449f44
1 changed files with 48 additions and 12 deletions
  1. 48 12
      tools/benchmark/cmd/watch.go

+ 48 - 12
tools/benchmark/cmd/watch.go

@@ -17,6 +17,7 @@ package cmd
 import (
 import (
 	"fmt"
 	"fmt"
 	"os"
 	"os"
+	"sync/atomic"
 	"time"
 	"time"
 
 
 	"github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/etcdserver/etcdserverpb"
@@ -51,6 +52,14 @@ var (
 
 
 	watchPutRate  int
 	watchPutRate  int
 	watchPutTotal int
 	watchPutTotal int
+
+	eventsTotal int
+
+	nrWatchCompleted       int32
+	nrRecvCompleted        int32
+	watchCompletedNotifier chan struct{}
+	putStartNotifier       chan struct{}
+	recvCompletedNotifier  chan struct{}
 )
 )
 
 
 func init() {
 func init() {
@@ -83,10 +92,7 @@ func watchFunc(cmd *cobra.Command, args []string) {
 		}
 		}
 	}
 	}
 
 
-	for i := range streams {
-		wg.Add(1)
-		go doWatch(streams[i], requests)
-	}
+	putStartNotifier = make(chan struct{})
 
 
 	// watching phase
 	// watching phase
 	results = make(chan result)
 	results = make(chan result)
@@ -97,6 +103,12 @@ func watchFunc(cmd *cobra.Command, args []string) {
 
 
 	pdoneC := printRate(results)
 	pdoneC := printRate(results)
 
 
+	atomic.StoreInt32(&nrWatchCompleted, int32(0))
+	watchCompletedNotifier = make(chan struct{})
+	for i := range streams {
+		go doWatch(streams[i], requests)
+	}
+
 	go func() {
 	go func() {
 		for i := 0; i < watchTotal; i++ {
 		for i := 0; i < watchTotal; i++ {
 			requests <- etcdserverpb.WatchRequest{
 			requests <- etcdserverpb.WatchRequest{
@@ -107,7 +119,7 @@ func watchFunc(cmd *cobra.Command, args []string) {
 		close(requests)
 		close(requests)
 	}()
 	}()
 
 
-	wg.Wait()
+	<-watchCompletedNotifier
 	bar.Finish()
 	bar.Finish()
 
 
 	fmt.Printf("Watch creation summary:\n")
 	fmt.Printf("Watch creation summary:\n")
@@ -116,19 +128,21 @@ func watchFunc(cmd *cobra.Command, args []string) {
 
 
 	// put phase
 	// put phase
 	// total number of puts * number of watchers on each key
 	// total number of puts * number of watchers on each key
-	eventsTotal := watchPutTotal * (watchTotal / watchedKeyTotal)
-
+	eventsTotal = watchPutTotal * (watchTotal / watchedKeyTotal)
 	results = make(chan result)
 	results = make(chan result)
 	bar = pb.New(eventsTotal)
 	bar = pb.New(eventsTotal)
 
 
 	bar.Format("Bom !")
 	bar.Format("Bom !")
 	bar.Start()
 	bar.Start()
 
 
+	atomic.StoreInt32(&nrRecvCompleted, 0)
+	recvCompletedNotifier = make(chan struct{})
+	close(putStartNotifier)
+
 	putreqc := make(chan etcdserverpb.PutRequest)
 	putreqc := make(chan etcdserverpb.PutRequest)
 
 
 	for i := 0; i < watchPutTotal; i++ {
 	for i := 0; i < watchPutTotal; i++ {
-		wg.Add(1)
-		go doPut(context.TODO(), clients[i%len(clients)].KV, putreqc)
+		go doPutForWatch(context.TODO(), clients[i%len(clients)].KV, putreqc)
 	}
 	}
 
 
 	pdoneC = printRate(results)
 	pdoneC = printRate(results)
@@ -145,7 +159,7 @@ func watchFunc(cmd *cobra.Command, args []string) {
 		close(putreqc)
 		close(putreqc)
 	}()
 	}()
 
 
-	wg.Wait()
+	<-recvCompletedNotifier
 	bar.Finish()
 	bar.Finish()
 	fmt.Printf("Watch events received summary:\n")
 	fmt.Printf("Watch events received summary:\n")
 	close(results)
 	close(results)
@@ -163,14 +177,36 @@ func doWatch(stream etcdserverpb.Watch_WatchClient, requests <-chan etcdserverpb
 		results <- result{errStr: errStr, duration: time.Since(st)}
 		results <- result{errStr: errStr, duration: time.Since(st)}
 		bar.Increment()
 		bar.Increment()
 	}
 	}
+	atomic.AddInt32(&nrWatchCompleted, 1)
+	if atomic.LoadInt32(&nrWatchCompleted) == int32(watchTotalStreams) {
+		watchCompletedNotifier <- struct{}{}
+	}
+
+	<-putStartNotifier
+
 	for {
 	for {
+		st := time.Now()
 		_, err := stream.Recv()
 		_, err := stream.Recv()
 		var errStr string
 		var errStr string
 		if err != nil {
 		if err != nil {
 			errStr = err.Error()
 			errStr = err.Error()
 		}
 		}
-		results <- result{errStr: errStr}
+		results <- result{errStr: errStr, duration: time.Since(st)}
 		bar.Increment()
 		bar.Increment()
+
+		atomic.AddInt32(&nrRecvCompleted, 1)
+		if atomic.LoadInt32(&nrRecvCompleted) == int32(eventsTotal) {
+			recvCompletedNotifier <- struct{}{}
+		}
+	}
+}
+
+func doPutForWatch(ctx context.Context, client etcdserverpb.KVClient, requests <-chan etcdserverpb.PutRequest) {
+	for r := range requests {
+		_, err := client.Put(ctx, &r)
+		if err != nil {
+			fmt.Fprintln(os.Stderr, "failed to Put for watch benchmark: %s", err)
+			os.Exit(1)
+		}
 	}
 	}
-	wg.Done()
 }
 }